How the NameNode represents and manages Datanodes: the Datanode* classes
DatanodeID implements WritableComparable and encapsulates the basic information about a Datanode:
    public String name;       /// hostname:portNumber
    public String storageID;  /// unique per cluster storageID
    protected int infoPort;   /// port of the info server (the servlet port)
    public int ipcPort;       /// port of the IPC server running on the Datanode

The class also provides getters and setters for these fields.
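To make the identity fields concrete, here is a minimal Java sketch of a DatanodeID-like value class. It is not the Hadoop class: the name DatanodeIdSketch, the getHost/getPort helpers, and the choice to order by name while testing equality on name plus storageID are assumptions made for illustration.

```java
// Hypothetical sketch of the identity a DatanodeID carries; not the Hadoop class.
// Assumption: ordering compares `name` only, equality uses name + storageID.
final class DatanodeIdSketch implements Comparable<DatanodeIdSketch> {
    final String name;       // "hostname:portNumber"
    final String storageID;  // unique per cluster
    final int infoPort;      // info server (servlet) port
    final int ipcPort;       // IPC server port on the Datanode

    DatanodeIdSketch(String name, String storageID, int infoPort, int ipcPort) {
        this.name = name;
        this.storageID = storageID;
        this.infoPort = infoPort;
        this.ipcPort = ipcPort;
    }

    /** Host part of name: everything before the last ':'. */
    String getHost() {
        int colon = name.lastIndexOf(':');
        return colon < 0 ? name : name.substring(0, colon);
    }

    /** Port encoded in name, or -1 if name carries no port. */
    int getPort() {
        int colon = name.lastIndexOf(':');
        return colon < 0 ? -1 : Integer.parseInt(name.substring(colon + 1));
    }

    @Override
    public int compareTo(DatanodeIdSketch that) {
        return name.compareTo(that.name);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DatanodeIdSketch)) return false;
        DatanodeIdSketch d = (DatanodeIdSketch) o;
        return name.equals(d.name) && storageID.equals(d.storageID);
    }

    @Override
    public int hashCode() {
        return name.hashCode() ^ storageID.hashCode();
    }
}
```

For example, new DatanodeIdSketch("10.0.0.5:50010", "DS-1", 50075, 50020).getHost() returns "10.0.0.5". Because compareTo looks only at name, two IDs with the same name but different storageIDs compare as 0 without being equal; whether that is acceptable is a design decision for any real implementation.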
==================================================================================
DatanodeInfo extends DatanodeID and represents the state of a Datanode. It contains the following fields:
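    protected long capacity;      // total capacity
    protected long dfsUsed;       // space already used
    protected long remaining;     // remaining (free) capacity
    protected long lastUpdate;    // time of the last update
    protected int xceiverCount;   // number of DataXceivers on the Datanode
    protected String location = NetworkTopology.DEFAULT_RACK; // network location, i.e. which rack the node is on
    protected String hostName = null; // the Datanode's host name, without the port; set when the Datanode registers

The class also provides getters and setters for these fields.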
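The capacity fields are usually read as percentages. Below is a hedged sketch of how such derived values could be computed from the fields above; the class name, the "/default-rack" string standing in for NetworkTopology.DEFAULT_RACK, and the decision to report a node with unknown capacity as 100% used are assumptions.

```java
// Hypothetical holder for the per-Datanode statistics DatanodeInfo keeps.
// Assumption: the percentage helpers guard against capacity <= 0 as shown.
class DatanodeStatsSketch {
    long capacity;      // total capacity in bytes
    long dfsUsed;       // bytes used by HDFS blocks
    long remaining;     // bytes still free for HDFS
    long lastUpdate;    // time of the last heartbeat (ms)
    int xceiverCount;   // active DataXceiver threads
    String location = "/default-rack"; // assumed value of the default rack path

    float getDfsUsedPercent() {
        if (capacity <= 0) return 100f;  // unknown capacity: treat the node as full
        return dfsUsed * 100f / capacity;
    }

    float getRemainingPercent() {
        if (capacity <= 0) return 0f;
        return remaining * 100f / capacity;
    }

    /** Space neither used by HDFS nor free for it (e.g. other files on the disk). */
    long getNonDfsUsed() {
        return Math.max(0L, capacity - dfsUsed - remaining);
    }
}
```

With capacity = 1000, dfsUsed = 250 and remaining = 600, the sketch reports 25% used, 60% remaining and 150 bytes of non-DFS usage, which is why remaining is not simply capacity minus dfsUsed.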
==================================================================================
DatanodeDescriptor extends DatanodeInfo and stores and manages the block records of a Datanode; it is the NameNode's record of one particular Datanode. It contains the following fields:
    private volatile BlockInfo blockList = null; // head of the block list

DatanodeDescriptor.BlockIterator traverses the block list of the Datanode BlockIterator.node, starting from the block BlockIterator.current. Together, blockList and BlockIterator let the NameNode walk every block recorded for the Datanode.
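The list is not a separate container: in Hadoop's BlockInfo, each block keeps, for every Datanode holding a replica, a slot with that node plus its previous/next links, so one block object sits in several Datanodes' lists at once. The following is a simplified Java sketch of that idea, not the real class: SketchBlock and SketchDatanode are hypothetical names, three parallel arrays replace BlockInfo's single triplets array, and the slot arrays have a fixed size instead of growing.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical model: slot i of a block records (nodes[i], prev[i], next[i]),
// i.e. which Datanode holds replica i and the neighbours in that node's list.
final class SketchBlock {
    final long id;
    final SketchDatanode[] nodes;
    final SketchBlock[] prev;
    final SketchBlock[] next;

    SketchBlock(long id, int replication) {
        this.id = id;
        this.nodes = new SketchDatanode[replication];
        this.prev = new SketchBlock[replication];
        this.next = new SketchBlock[replication];
    }

    /** Slot used by dn, or -1; indexOf(null) finds the first free slot. */
    int indexOf(SketchDatanode dn) {
        for (int i = 0; i < nodes.length; i++) {
            if (nodes[i] == dn) return i;
        }
        return -1;
    }
}

final class SketchDatanode {
    private SketchBlock blockList; // head of this node's list, like DatanodeDescriptor.blockList

    /** Inserts b at the head; false if b is already here or has no free slot. */
    boolean addBlock(SketchBlock b) {
        if (b.indexOf(this) >= 0) return false;
        int slot = b.indexOf(null);
        if (slot < 0) return false;        // fixed-size sketch: no growth
        b.nodes[slot] = this;
        b.prev[slot] = null;
        b.next[slot] = blockList;
        if (blockList != null) blockList.prev[blockList.indexOf(this)] = b;
        blockList = b;
        return true;
    }

    /** Walks this node's list from start, following only this node's next links. */
    Iterator<SketchBlock> iterator(SketchBlock start) {
        return new Iterator<SketchBlock>() {
            private SketchBlock current = start;

            public boolean hasNext() { return current != null; }

            public SketchBlock next() {
                if (current == null) throw new NoSuchElementException();
                SketchBlock result = current;
                current = current.next[current.indexOf(SketchDatanode.this)];
                return result;
            }
        };
    }

    Iterator<SketchBlock> iterator() { return iterator(blockList); }
}
```

If block 1 is added to two nodes and blocks 2 and 3 to one node each, each node's iterator sees only its own chain (newest first), even though block 1 is a single shared object.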
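    private BlockQueue replicateBlocks = new BlockQueue(); // <block, DatanodeDescriptor[]> pairs to be replicated by this Datanode

In FSNamesystem.computeReplicationWorkForBlock the NameNode queues the blocks that need replication; they are sent to the Datanode in a DatanodeCommand, which Datanode.processCommand receives as DatanodeProtocol.DNA_TRANSFER (copy the given blocks to the specified target Datanodes).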
    private BlockQueue recoverBlocks = new BlockQueue(); // <block, DatanodeDescriptor[]> pairs to be recovered through this Datanode

The flow is much the same as for replicateBlocks, except that the command sent is DatanodeProtocol.DNA_RECOVERBLOCK.
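Both queues hold <block, targets> pairs that are handed out a few at a time. The following Java sketch assumes a FIFO queue whose poll(n) removes at most n pairs and returns null when there is nothing to send, matching how getLeaseRecoveryCommand treats a null result. BlockQueueSketch is a hypothetical name, and block ids and targets are plain longs and strings rather than Block and DatanodeDescriptor objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical per-Datanode work queue of <block, targets> pairs.
// Methods are synchronized on the assumption that several threads touch it.
final class BlockQueueSketch {
    static final class BlockTargetPair {
        final long blockId;
        final List<String> targets;   // Datanodes to copy to / recover with

        BlockTargetPair(long blockId, String... targets) {
            this.blockId = blockId;
            this.targets = Arrays.asList(targets);
        }
    }

    private final Queue<BlockTargetPair> blockq = new ArrayDeque<>();

    synchronized boolean offer(long blockId, String... targets) {
        return blockq.offer(new BlockTargetPair(blockId, targets));
    }

    synchronized int size() {
        return blockq.size();
    }

    /** Removes up to numBlocks pairs in FIFO order; null if there is nothing to do. */
    synchronized List<BlockTargetPair> poll(int numBlocks) {
        if (numBlocks <= 0 || blockq.isEmpty()) return null;
        List<BlockTargetPair> results = new ArrayList<>();
        while (!blockq.isEmpty() && numBlocks-- > 0) {
            results.add(blockq.poll());
        }
        return results;
    }
}
```

Capping each poll keeps a single heartbeat reply from handing a Datanode more transfers than it can run at once; the rest stay queued for later heartbeats.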
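    private Set<Block> invalidateBlocks = new TreeSet<Block>(); // blocks to be invalidated

The Datanode periodically reports all of its blocks to the NameNode. On receiving the report, the NameNode calls DatanodeDescriptor.reportDiff to find the blocks that disagree with its own record of that Datanode and handles them according to the relevant policy. If a reported block has no record on the NameNode at all (not a single replica), the reported block is invalidated. Blocks to be invalidated are first added to recentInvalidateSets and then periodically moved in batches into invalidateBlocks through DatanodeDescriptor.addBlocksToBeInvalidated. Each time the NameNode handles a heartbeat from the Datanode (FSNamesystem.handleHeartbeat), it puts these blocks into a DatanodeCommand and sends it to the Datanode.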
    void addBlocksToBeInvalidated(List<Block> blocklist) {
        assert(blocklist != null && blocklist.size() > 0);
        synchronized (invalidateBlocks) {
            for (Block blk : blocklist) {
                invalidateBlocks.add(blk);
            }
        }
    }
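The second stage of this pipeline, moving batches into invalidateBlocks and later draining a bounded number per heartbeat, can be sketched as follows. The drainForHeartbeat method and its maxBlocks limit are hypothetical; block ids are plain longs, and because the set is a TreeSet the smallest ids are sent first.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical per-Datanode invalidation set. addBlocksToBeInvalidated mirrors
// the method above; drainForHeartbeat is an assumed helper, not Hadoop's API.
final class InvalidateSetSketch {
    private final Set<Long> invalidateBlocks = new TreeSet<>();

    void addBlocksToBeInvalidated(List<Long> blocklist) {
        assert blocklist != null && !blocklist.isEmpty();
        synchronized (invalidateBlocks) {
            invalidateBlocks.addAll(blocklist);
        }
    }

    /** Removes and returns up to maxBlocks ids (smallest first); null if none. */
    long[] drainForHeartbeat(int maxBlocks) {
        synchronized (invalidateBlocks) {
            int n = Math.min(maxBlocks, invalidateBlocks.size());
            if (n <= 0) return null;
            long[] batch = new long[n];
            Iterator<Long> it = invalidateBlocks.iterator();
            for (int i = 0; i < n; i++) {
                batch[i] = it.next();
                it.remove();          // the block leaves the set once it is sent
            }
            return batch;
        }
    }
}
```

Both methods lock the same set, so a batch arriving from the background invalidation work cannot interleave with a drain running on a heartbeat thread.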
    protected boolean isAlive = false; // whether the Datanode is alive

While handling a heartbeat, if the NameNode finds DatanodeDescriptor.isAlive == false, it sends DatanodeCommand.REGISTER so that the Datanode registers again.
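The following fields track block scheduling, namely an approximate count of blocks scheduled to be written to this Datanode; the count is rolled over at most once every 10 minutes as heartbeats arrive (updateHeartbeat calls rollBlocksScheduled):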
    private int currApproxBlocksScheduled = 0;
    private int prevApproxBlocksScheduled = 0;
    private long lastBlocksScheduledRollTime = 0;
    private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
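These four fields form a two-bucket counter that forgets stale counts. A hedged Java sketch follows; the method names incBlocksScheduled, decBlocksScheduled and getBlocksScheduled are assumptions, while rollBlocksScheduled(now) mirrors the call made from updateHeartbeat.

```java
// Hypothetical two-bucket "approximately scheduled blocks" counter,
// modelled on the four fields above. Method names are assumptions.
final class BlocksScheduledSketch {
    private static final long ROLL_INTERVAL_MS = 600 * 1000; // 10 min
    private int currApproxBlocksScheduled = 0;
    private int prevApproxBlocksScheduled = 0;
    private long lastRollTime = 0;

    /** A block write has been scheduled on this Datanode. */
    void incBlocksScheduled() {
        currApproxBlocksScheduled++;
    }

    /** A scheduled write was confirmed; drain the older bucket first. */
    void decBlocksScheduled() {
        if (prevApproxBlocksScheduled > 0) {
            prevApproxBlocksScheduled--;
        } else if (currApproxBlocksScheduled > 0) {
            currApproxBlocksScheduled--;
        }
    }

    int getBlocksScheduled() {
        return currApproxBlocksScheduled + prevApproxBlocksScheduled;
    }

    /** Called per heartbeat: once the interval has passed, curr becomes prev and the old prev is dropped. */
    void rollBlocksScheduled(long now) {
        if (now - lastRollTime > ROLL_INTERVAL_MS) {
            prevApproxBlocksScheduled = currApproxBlocksScheduled;
            currApproxBlocksScheduled = 0;
            lastRollTime = now;
        }
    }
}
```

A Datanode may never confirm a scheduled write (for example when the write fails), so the count is only approximate; rolling the buckets means such stale increments disappear after roughly two roll intervals instead of accumulating forever.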
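As mentioned earlier, Datanode.offerService() uses the RPC framework to call namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport)), periodically reporting the state of all its local blocks to the NameNode. namenode.blockReport calls namesystem.processReport(nodeReg, blist) to update the NameNode's block records for that Datanode. namesystem.processReport first calls node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate) to classify the reported blocks: which ones must be added to the Datanode's block list on the NameNode (toAdd), which ones are recorded there but were not reported (toRemove), and which ones must be invalidated (toInvalidate). It then removes the toRemove blocks from the NameNode's list for this Datanode, adds the toAdd blocks to it, and puts the invalid blocks into recentInvalidateSets, from which the Datanode is notified at a suitable time.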
    void reportDiff(BlocksMap blocksMap,
                    BlockListAsLongs newReport,
                    Collection<Block> toAdd,
                    Collection<Block> toRemove,
                    Collection<Block> toInvalidate) {
        // place a delimiter in the list which separates blocks
        // that have been reported from those that have not
        BlockInfo delimiter = new BlockInfo(new Block(), 1);
        boolean added = this.addBlock(delimiter); // add the marker to this Datanode's list; it goes to the head
        assert added : "Delimiting block cannot be present in the node";
        if (newReport == null)
            newReport = new BlockListAsLongs(new long[0]);
        // scan the report and collect newly reported blocks
        // Note we are taking special precaution to limit tmp blocks allocated
        // as part of this block report, which is why the block list is stored as longs
        Block iblk = new Block(); // a fixed new'ed block to be reused with index i
        for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
            iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
                     newReport.getBlockGenStamp(i));
            BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
            if (storedBlock == null) {
                // the NameNode has no record of this block (not even one replica): invalidate it
                toInvalidate.add(new Block(iblk));
                continue;
            }
            if (storedBlock.findDatanode(this) < 0) {
                // the block exists, but is not yet recorded on this DatanodeDescriptor
                // if the size differs from what is in the blockmap, then return
                // the new block. addStoredBlock will then pick up the right size of this
                // block and will update the block object in the BlocksMap
                if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
                    // the reported length differs from the NameNode's record: use the reported block
                    toAdd.add(new Block(iblk));
                } else {
                    toAdd.add(storedBlock);
                }
                continue;
            }
            // the block is already on this DatanodeDescriptor: move it to the head, in front of the delimiter
            this.moveBlockToHead(storedBlock);
        }
        // collect blocks that have not been reported
        // all of them are next to the delimiter
        // (the Datanode reports all of its blocks, so anything still behind the
        // delimiter is stale on the NameNode and must be removed)
        Iterator<Block> it = new BlockIterator(delimiter.getNext(0), this); // walk the remaining blocks
        while (it.hasNext())
            toRemove.add(it.next());
        this.removeBlock(delimiter); // remove the marker
    }
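The delimiter trick can be reproduced on a toy list. This is a simplified sketch, not Hadoop code: blocks are plain long ids (no length or generation stamp, so the size-mismatch branch disappears), a HashSet stands in for blocksMap, and a HashMap gives direct access to list entries where the real code uses BlockInfo.findDatanode. ReportDiffSketch and its helper methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified reportDiff for a single Datanode.
final class ReportDiffSketch {
    private static final class Entry {             // one block in this node's list
        final long id;
        Entry prev, next;
        Entry(long id) { this.id = id; }
    }

    private Entry head;                                     // like blockList
    private final Map<Long, Entry> onThisNode = new HashMap<>();
    private final Set<Long> knownBlocks = new HashSet<>();  // stands in for blocksMap

    /** The NameNode already records id as stored on this Datanode. */
    void recordOnThisNode(long id) {
        knownBlocks.add(id);
        Entry e = new Entry(id);
        onThisNode.put(id, e);
        addToHead(e);
    }

    /** The NameNode knows id, but only on other Datanodes. */
    void recordElsewhere(long id) { knownBlocks.add(id); }

    private void addToHead(Entry e) {
        e.prev = null;
        e.next = head;
        if (head != null) head.prev = e;
        head = e;
    }

    private void unlink(Entry e) {
        if (e.prev != null) e.prev.next = e.next; else head = e.next;
        if (e.next != null) e.next.prev = e.prev;
        e.prev = e.next = null;
    }

    void reportDiff(long[] report, List<Long> toAdd, List<Long> toRemove, List<Long> toInvalidate) {
        Entry delimiter = new Entry(-1);           // separates reported from unreported blocks
        addToHead(delimiter);
        for (long id : report) {
            if (!knownBlocks.contains(id)) {       // no replica recorded anywhere
                toInvalidate.add(id);
                continue;
            }
            Entry e = onThisNode.get(id);
            if (e == null) {                       // known block, new for this node
                toAdd.add(id);
                continue;
            }
            unlink(e);                             // reported: move in front of the delimiter
            addToHead(e);
        }
        // whatever is still behind the delimiter was not reported
        for (Entry e = delimiter.next; e != null; e = e.next) toRemove.add(e.id);
        unlink(delimiter);
    }
}
```

With blocks 1, 2 and 3 recorded on the node, block 4 recorded only elsewhere, and a report of {1, 3, 4, 99}, the sketch yields toAdd = [4], toRemove = [2] and toInvalidate = [99]. The point of the delimiter is that no second pass or temporary set is needed to find unreported blocks.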
Datanode.offerService() also sends heartbeats by calling namenode.sendHeartbeat(...) to report its current state, and the NameNode calls updateHeartbeat to refresh its record of that Datanode:
    void updateHeartbeat(long capacity, long dfsUsed, long remaining,
                         int xceiverCount) {
        this.capacity = capacity;
        this.dfsUsed = dfsUsed;
        this.remaining = remaining;
        this.lastUpdate = System.currentTimeMillis();
        this.xceiverCount = xceiverCount;
        rollBlocksScheduled(lastUpdate);
    }
The call sequence is shown in the figure below:
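While handling a heartbeat in FSNamesystem.handleHeartbeat, the NameNode calls the DatanodeDescriptor.getXXXCommand methods to obtain BlockCommands; a BlockCommand bundles the blocks to be processed with the corresponding DatanodeInfo targets (XXX can be Replication, LeaseRecovery, or Invalidate).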
    BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
        List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
        return blocktargetlist == null ? null :
            new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
    }
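Putting the pieces together, heartbeat handling turns each Datanode's pending work into commands. The following is an illustration under heavy assumptions, not FSNamesystem.handleHeartbeat: the action strings borrow DatanodeProtocol's constant names, plain deques stand in for the BlockQueues and the invalidate set, and the command order and per-heartbeat limits are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: build the command list for one heartbeat.
// Ordering and limits are assumptions, not Hadoop's actual policy.
final class HeartbeatSketch {
    static final class Command {
        final String action;       // e.g. "DNA_TRANSFER"
        final List<Long> blocks;
        Command(String action, List<Long> blocks) {
            this.action = action;
            this.blocks = blocks;
        }
    }

    boolean isAlive = false;
    final Deque<Long> replicateBlocks = new ArrayDeque<>();
    final Deque<Long> recoverBlocks = new ArrayDeque<>();
    final Deque<Long> invalidateBlocks = new ArrayDeque<>();

    /** Like the getXXXCommand methods: null when there is nothing to send. */
    private static Command drain(String action, Deque<Long> q, int max) {
        if (max <= 0 || q.isEmpty()) return null;
        List<Long> batch = new ArrayList<>();
        while (!q.isEmpty() && batch.size() < max) batch.add(q.poll());
        return new Command(action, batch);
    }

    List<Command> handleHeartbeat(int maxTransfers, int maxInvalidate) {
        List<Command> cmds = new ArrayList<>();
        if (!isAlive) {                    // unknown or dead node: ask it to register again
            cmds.add(new Command("DNA_REGISTER", new ArrayList<>()));
            return cmds;
        }
        Command c;
        if ((c = drain("DNA_RECOVERBLOCK", recoverBlocks, maxTransfers)) != null) cmds.add(c);
        if ((c = drain("DNA_TRANSFER", replicateBlocks, maxTransfers)) != null) cmds.add(c);
        if ((c = drain("DNA_INVALIDATE", invalidateBlocks, maxInvalidate)) != null) cmds.add(c);
        return cmds;
    }
}
```

Returning null from each drain step, as getLeaseRecoveryCommand does, lets the caller skip empty commands without building zero-length BlockCommands.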
The readFieldsFromFSEditLog method restores a Datanode's information from the edits log file:
    void readFieldsFromFSEditLog(DataInput in) throws IOException {
        this.name = UTF8.readString(in);
        this.storageID = UTF8.readString(in);
        this.infoPort = in.readShort() & 0x0000ffff;
        this.capacity = in.readLong();
        this.dfsUsed = in.readLong();
        this.remaining = in.readLong();
        this.lastUpdate = in.readLong();
        this.xceiverCount = in.readInt();
        this.location = Text.readString(in);
        this.hostName = Text.readString(in);
        setAdminState(WritableUtils.readEnum(in, AdminStates.class));
    }
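One detail worth calling out is infoPort = in.readShort() & 0x0000ffff: the port is stored as a 16-bit value, readShort() sign-extends it, and the mask recovers ports above 32767. Below is a minimal round-trip sketch under stated assumptions: DataOutputStream.writeUTF/readUTF replace Hadoop's UTF8 and Text string encodings, only three fields are serialized, and EditLogFieldsSketch is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical round trip for a few Datanode fields. Assumption: writeUTF/readUTF
// stand in for Hadoop's string encodings; fields are read in the order written.
final class EditLogFieldsSketch {
    String name;
    String storageID;
    int infoPort;

    void write(DataOutputStream out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(storageID);
        out.writeShort(infoPort);          // only the low 16 bits are written
    }

    void readFields(DataInputStream in) throws IOException {
        name = in.readUTF();
        storageID = in.readUTF();
        // readShort() is signed, so ports above 32767 would come back negative;
        // masking with 0xffff restores the unsigned 0..65535 value
        infoPort = in.readShort() & 0x0000ffff;
    }

    static EditLogFieldsSketch roundTrip(EditLogFieldsSketch src) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        src.write(new DataOutputStream(bytes));
        EditLogFieldsSketch dst = new EditLogFieldsSketch();
        dst.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return dst;
    }
}
```

For infoPort = 50075, the stored short reads back as -15461 without the mask and as 50075 with it. DataInput.readUnsignedShort() would give the same result as the mask.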