Hadoop源码分析HDFS ClientProtocol——getBlockLocations
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
Class?ClientProtocol是HDFS客戶端與NameNode之間的接口。Client通過調(diào)用ClientProtocol的方法完成與NameNode之間的交互。本文分析方法getBlockLocations。該方法的方法聲明如下:
public?LocatedBlocks?getBlockLocations(String?src,long?offset,?long?length)throws?IOException
該方法用于獲取一個(gè)給定文件的偏移和數(shù)據(jù)長度的所有數(shù)據(jù)庫塊(block)的數(shù)據(jù)節(jié)點(diǎn)(datanode)。該方法的參數(shù)如下:
src:文件的名稱
offset:文件的開始位置,文件開始的偏移
length:需要獲取文件數(shù)據(jù)的長度
該方法返回LocatedBlocks,LocatedBlocks包含文件長度、數(shù)據(jù)塊blocks、數(shù)據(jù)塊block的datanode等信息。注意,每一個(gè)block的所有datanode按照其距離client的距離進(jìn)行排序。
下面我們看一下具體是怎么獲得文件給定范圍內(nèi)數(shù)據(jù)的數(shù)據(jù)塊和數(shù)據(jù)塊所在的datanode的。首先,HDFS在內(nèi)存中包含了整個(gè)文件系統(tǒng)的一個(gè)映像,包括INodeFile(對(duì)應(yīng)于硬盤上的文件)、INodeDirectory(對(duì)應(yīng)于硬盤上的文件夾)等??梢蚤喿x文件《FSNamesystem中有幾個(gè)非常重要的變量》。
為了獲取文件的數(shù)據(jù)塊,我們首先需要從文件系統(tǒng)中命名空間中獲取代表該文件的INodeFile。通過該INodeFile我們可以獲取該文件所包含的所有block。獲取INodeFile所有的block之后,我們從中查找包含offset的第一個(gè)block。獲取到第一個(gè)block之后,就可以獲取該block所在的datanode。
為了獲取block所在的datanode,首先判斷該block是不是在文件系統(tǒng)正在構(gòu)建。如果是正在構(gòu)建的block,比如當(dāng)前正在創(chuàng)建該block,正在往block中寫入數(shù)據(jù)。我們可以用INodeFileUnderConstruction.getTargets()來獲取該block所在的datanode。因?yàn)轭怚NodeFileUnderConstruction保存了最后一個(gè)block所存放的datanode。如果不是正在構(gòu)建的block,我們首先判斷該block是不是損壞。如果是損壞的。如果該block沒有損壞,我們從blocksMap中獲取該block所在的所有DatanodeDescriptor并確定該DatanodeDescriptor存儲(chǔ)的block是沒有損壞的。
以此類推直到找到所有的block。具體代碼如下:
private?synchronized?LocatedBlocks?getBlockLocationsInternal(String?src,
long?offset,?long?length,?int?nrBlocksToReturn,
boolean?doAccessTime,?boolean?needBlockToken)?throws?IOException?{
INodeFile?inode?=?dir.getFileINode(src);//獲取INodeFile
if?(inode?==?null)?{
return?null;
}
if?(doAccessTime?&&?isAccessTimeSupported())?{
dir.setTimes(src,?inode,?-1,?now(),?false);
}
Block[]?blocks?=?inode.getBlocks();//獲取該INodeFile所有的block
if?(blocks?==?null)?{
return?null;
}
if?(blocks.length?==?0)?{
return?inode.createLocatedBlocks(new?ArrayList<LocatedBlock>(
blocks.length));
}
List<LocatedBlock>?results;
results?=?new?ArrayList<LocatedBlock>(blocks.length);
/**下面的for循環(huán)獲取offset開始的block**/
int?curBlk?=?0;
long?curPos?=?0,?blkSize?=?0;
int?nrBlocks?=?(blocks[0].getNumBytes()?==?0)???0?:?blocks.length;
for?(curBlk?=?0;?curBlk?<?nrBlocks;?curBlk++)?{
blkSize?=?blocks[curBlk].getNumBytes();
assert?blkSize?>?0?:?"Block?of?size?0";
if?(curPos?+?blkSize?>?offset)?{
break;
}
curPos?+=?blkSize;
}
if?(nrBlocks?>?0?&&?curBlk?==?nrBlocks)?//?offset?>=?end?of?file
return?null;
long?endOff?=?offset?+?length;
do?{
//?get?block?locations
int?numNodes?=?blocksMap.numNodes(blocks[curBlk]);//block存儲(chǔ)備份的數(shù)目
int?numCorruptNodes?=?countNodes(blocks[curBlk]).corruptReplicas();//損壞節(jié)點(diǎn)??//數(shù)目
int?numCorruptReplicas?=?corruptReplicas
.numCorruptReplicas(blocks[curBlk]);//損壞備份存儲(chǔ)的數(shù)量
if?(numCorruptNodes?!=?numCorruptReplicas)?{
LOG.warn("Inconsistent?number?of?corrupt?replicas?for?"
+?blocks[curBlk]?+?"blockMap?has?"?+?numCorruptNodes
+?"?but?corrupt?replicas?map?has?"?+?numCorruptReplicas);
}
DatanodeDescriptor[]?machineSet?=?null;
boolean?blockCorrupt?=?false;
if?(inode.isUnderConstruction()?&&?curBlk?==?blocks.length?-?1
&&?blocksMap.numNodes(blocks[curBlk])?==?0)?{
//?get?unfinished?block?locations
//獲取正在構(gòu)建的block的存儲(chǔ)位置datanode
INodeFileUnderConstruction?cons?=?(INodeFileUnderConstruction)?inode;
machineSet?=?cons.getTargets();
blockCorrupt?=?false;
}?else?{
blockCorrupt?=?(numCorruptNodes?==?numNodes);
int?numMachineSet?=?blockCorrupt???numNodes
:?(numNodes?-?numCorruptNodes);
machineSet?=?new?DatanodeDescriptor[numMachineSet];
if?(numMachineSet?>?0)?{
numNodes?=?0;
//從blocksMap中獲取該block所有未損壞的datanode的位置
for?(Iterator<DatanodeDescriptor>?it?=?blocksMap
.nodeIterator(blocks[curBlk]);?it.hasNext();)?{
DatanodeDescriptor?dn?=?it.next();
boolean?replicaCorrupt?=?corruptReplicas
.isReplicaCorrupt(blocks[curBlk],?dn);
if?(blockCorrupt?||?(!blockCorrupt?&&?!replicaCorrupt))
machineSet[numNodes++]?=?dn;
}
}
}
LocatedBlock?b?=?new?LocatedBlock(blocks[curBlk],?machineSet,
curPos,?blockCorrupt);
if?(isAccessTokenEnabled?&&?needBlockToken)?{
b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
}
results.add(b);
curPos?+=?blocks[curBlk].getNumBytes();
curBlk++;
}?while?(curPos?<?endOff?&&?curBlk?<?blocks.length
&&?results.size()?<?nrBlocksToReturn);
return?inode.createLocatedBlocks(results);
}
轉(zhuǎn)載于:https://my.oschina.net/sdzzboy/blog/164141
總結(jié)
以上是生活随笔為你收集整理的Hadoop源码分析HDFS ClientProtocol——getBlockLocations的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CA knowledge study
- 下一篇: 猎豹浏览器怎么不能设置背景着颜色护眼?