How the namenode represents and manages Datanodes: the Datanode* classes

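DatanodeID implements WritableComparable and encapsulates the basic information of a DN (Datanode):
  public String name;      /// hostname:portNumber
  public String storageID; /// unique per cluster storageID
  protected int infoPort;  /// port of the info server (the HTTP servlet port)
  public int ipcPort;      /// port of the IPC server running on the Datanode
The class also provides getters and setters for all of these fields.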
==================================================================================
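DatanodeInfo extends DatanodeID and represents the status of a Datanode. It contains the following fields:
  protected long capacity;	// total capacity
  protected long dfsUsed;	// space used by DFS
  protected long remaining;	// remaining free space
  protected long lastUpdate;// time of the last update (heartbeat)
  protected int xceiverCount;// number of DataXceivers (active data-transfer threads) on the Datanode
  protected String location = NetworkTopology.DEFAULT_RACK;// network location, i.e. which rack the node is on
  protected String hostName = null;// the datanode's host name without the port, set when the datanode registers
Like DatanodeID, the class provides getters and setters for these fields.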
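A small sketch helps separate the three capacity fields. It assumes that space counted in neither dfsUsed nor remaining is taken by non-DFS data (capacity - dfsUsed - remaining); the class and its helper methods are hypothetical, not part of DatanodeInfo.

```java
// Hypothetical helper illustrating the DatanodeInfo capacity fields (not Hadoop code).
class DatanodeUsageSketch {
    final long capacity;  // total capacity in bytes
    final long dfsUsed;   // bytes occupied by HDFS blocks
    final long remaining; // bytes still free for HDFS

    DatanodeUsageSketch(long capacity, long dfsUsed, long remaining) {
        this.capacity = capacity;
        this.dfsUsed = dfsUsed;
        this.remaining = remaining;
    }

    // Space used by something other than HDFS (assumed: capacity - dfsUsed - remaining).
    long nonDfsUsed() {
        long v = capacity - dfsUsed - remaining;
        return v > 0 ? v : 0;
    }

    // Share of capacity used by HDFS, in percent; a node with no capacity counts as full.
    float dfsUsedPercent() {
        return capacity <= 0 ? 100f : dfsUsed * 100f / capacity;
    }

    // Share of capacity still free for HDFS, in percent.
    float remainingPercent() {
        return capacity <= 0 ? 0f : remaining * 100f / capacity;
    }
}
```

For example, capacity 100, dfsUsed 30 and remaining 50 give nonDfsUsed() == 20 and dfsUsedPercent() == 30.0f, which is why remaining should be read as free space rather than reserved space.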
==================================================================================
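DatanodeDescriptor extends DatanodeInfo. It stores and manages the block records of a Datanode and is the namenode's record of one specific Datanode. It contains the following fields:
 private volatile BlockInfo blockList = null;// head of the block list
DatanodeDescriptor.BlockIterator walks the block list of DatanodeDescriptor.BlockIterator.node, starting from the block DatanodeDescriptor.BlockIterator.current. With blockList and BlockIterator we can therefore traverse every block recorded for the Datanode.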
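The list behind blockList is intrusive. As far as I understand the Hadoop 0.20-era code, each BlockInfo stores, for every datanode holding a replica, a "triplet" of (datanode, previous block, next block), so a DatanodeDescriptor only needs the head pointer and BlockIterator just follows that datanode's next pointers. The sketch below assumes this layout; all class names are hypothetical and removal is omitted for brevity.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Minimal sketch of an intrusive per-datanode block list (hypothetical names).
class BlockListSketch {
    // Stand-in for DatanodeDescriptor: only the head of its block list is stored.
    static final class Datanode {
        BlockInfo blockList = null; // head of this datanode's list

        // Insert b at the head; false if already present or no free triplet slot.
        boolean addBlock(BlockInfo b) {
            if (b.findDatanode(this) >= 0) return false;
            int i = b.freeSlot();
            if (i < 0) return false;
            b.triplets[3 * i] = this;          // datanode
            b.triplets[3 * i + 1] = null;      // prev
            b.triplets[3 * i + 2] = blockList; // next
            if (blockList != null) {
                // old head now points back to the new head
                blockList.triplets[3 * blockList.findDatanode(this) + 1] = b;
            }
            blockList = b;
            return true;
        }

        // Walk the list by following this datanode's "next" pointer in each block.
        Iterator<BlockInfo> iterator() {
            return new Iterator<BlockInfo>() {
                private BlockInfo current = blockList;

                public boolean hasNext() { return current != null; }

                public BlockInfo next() {
                    if (current == null) throw new NoSuchElementException();
                    BlockInfo result = current;
                    current = result.getNext(result.findDatanode(Datanode.this));
                    return result;
                }
            };
        }
    }

    // Stand-in for BlockInfo: 3 slots (datanode, prev, next) per expected replica.
    static final class BlockInfo {
        final long blockId;
        final Object[] triplets;

        BlockInfo(long blockId, int replication) {
            this.blockId = blockId;
            this.triplets = new Object[3 * replication];
        }

        int findDatanode(Datanode dn) {
            for (int i = 0; i < triplets.length / 3; i++) {
                if (triplets[3 * i] == dn) return i;
            }
            return -1;
        }

        int freeSlot() {
            for (int i = 0; i < triplets.length / 3; i++) {
                if (triplets[3 * i] == null) return i;
            }
            return -1;
        }

        BlockInfo getNext(int i) { return (BlockInfo) triplets[3 * i + 2]; }
    }
}
```

Because the links live inside the blocks, moving a block to the head of one datanode's list (as reportDiff does) only touches that block's own triplet and its neighbours.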
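  private BlockQueue replicateBlocks = new BlockQueue();// queue of <block, DatanodeDescriptor[]> pairs that this Datanode should replicate. The namenode fills it in FSNamesystem.computeReplicationWorkForBlock; the queued blocks are later packed into a DatanodeCommand and sent to the Datanode, which receives them in Datanode.processCommand as DatanodeProtocol.DNA_TRANSFER (copy the given blocks to the specified target datanodes).
  private BlockQueue recoverBlocks = new BlockQueue();// queue of <block, DatanodeDescriptor[]> pairs that this Datanode should recover. The flow is roughly the same as for replicateBlocks, except that the command sent is DatanodeProtocol.DNA_RECOVERBLOCK.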
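Both queues behave like a synchronized FIFO of <block, targets> pairs that heartbeat handling drains a bounded number at a time. Here is a minimal sketch under that assumption; block IDs as long and targets as host-name strings are simplifications, and the class is a stand-in, not Hadoop's BlockQueue.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Simplified FIFO of <block, targets> pairs (hypothetical stand-in for BlockQueue).
class BlockQueueSketch {
    static final class BlockTargetPair {
        final long blockId;     // block to replicate or recover
        final String[] targets; // datanodes involved (host names here)

        BlockTargetPair(long blockId, String[] targets) {
            this.blockId = blockId;
            this.targets = targets;
        }
    }

    private final Queue<BlockTargetPair> blockq = new LinkedList<>();

    synchronized int size() { return blockq.size(); }

    // Enqueue one block together with its target datanodes.
    synchronized boolean offer(long blockId, String[] targets) {
        return blockq.offer(new BlockTargetPair(blockId, targets));
    }

    // Dequeue at most numBlocks pairs; null means "no command needed".
    synchronized List<BlockTargetPair> poll(int numBlocks) {
        if (numBlocks <= 0 || blockq.isEmpty()) {
            return null;
        }
        List<BlockTargetPair> results = new ArrayList<>();
        while (!blockq.isEmpty() && results.size() < numBlocks) {
            results.add(blockq.poll());
        }
        return results;
    }
}
```

Returning null instead of an empty list matches the null check in getLeaseRecoveryCommand, which simply skips building a BlockCommand when nothing is queued.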

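  private Set<Block> invalidateBlocks = new TreeSet<Block>();// set of blocks this Datanode should invalidate (delete). The Datanode periodically reports all of its blocks to the Namenode; on receiving the report, the Namenode calls DatanodeDescriptor.reportDiff to find the blocks whose state differs from its own record of that Datanode and handles them according to the corresponding policy. A reported block of which the namenode has no record at all (not even one replica) is invalidated. Blocks to invalidate are first added to recentInvalidateSets and then periodically moved in batches into invalidateBlocks by DatanodeDescriptor.addBlocksToBeInvalidated; each time FSNamesystem.handleHeartbeat processes a heartbeat from the Datanode, these blocks are put into a DatanodeCommand and sent to it.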

void addBlocksToBeInvalidated(List<Block> blocklist) {
    assert(blocklist != null && blocklist.size() > 0);
    synchronized (invalidateBlocks) {
      for(Block blk : blocklist) {
        invalidateBlocks.add(blk);
      }
    }
  }
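The invalidation path therefore has two stages: a namenode-wide recentInvalidateSets, then the per-node invalidateBlocks that heartbeat handling drains. The following is a minimal model of that flow, assuming blocks are identified by long IDs and datanodes by storageID strings; the method names, the per-heartbeat limit and the single lock are illustrative simplifications, not Hadoop's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the two-stage invalidation path; names are illustrative.
class InvalidatePipelineSketch {
    // Stage 1 (namenode-wide): storageID -> blocks to invalidate, like recentInvalidateSets.
    private final Map<String, Set<Long>> recentInvalidateSets = new HashMap<>();
    // Stage 2 (per datanode): like DatanodeDescriptor.invalidateBlocks.
    private final Map<String, Set<Long>> invalidateBlocks = new HashMap<>();

    // Called when a block report marks a block as invalid.
    synchronized void addToInvalidates(String storageID, long blockId) {
        recentInvalidateSets.computeIfAbsent(storageID, k -> new TreeSet<>()).add(blockId);
    }

    // Periodic batch step, playing the role of addBlocksToBeInvalidated.
    synchronized void moveToDatanode(String storageID) {
        Set<Long> recent = recentInvalidateSets.remove(storageID);
        if (recent != null) {
            invalidateBlocks.computeIfAbsent(storageID, k -> new TreeSet<>()).addAll(recent);
        }
    }

    // Heartbeat step: hand out at most limit blocks; null if there is nothing to send.
    synchronized List<Long> takeForHeartbeat(String storageID, int limit) {
        Set<Long> pending = invalidateBlocks.get(storageID);
        if (pending == null || pending.isEmpty() || limit <= 0) {
            return null;
        }
        List<Long> out = new ArrayList<>();
        Iterator<Long> it = pending.iterator();
        while (it.hasNext() && out.size() < limit) {
            out.add(it.next());
            it.remove();
        }
        return out;
    }
}
```

Blocks only reach a heartbeat reply after the batch step has run, and a large backlog is spread over several heartbeats by the limit.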

  protected boolean isAlive = false;// whether the Datanode is alive. When processing a heartbeat, if the namenode finds DatanodeDescriptor.isAlive == false, it replies with DatanodeCommand.REGISTER so that the Datanode registers again.


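The following fields keep an approximate count of the blocks scheduled to be written to this Datanode, which the namenode takes into account when choosing target nodes. The count is rolled over every BLOCKS_SCHEDULED_ROLL_INTERVAL (10 minutes), and the roll is driven by heartbeat processing: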
  private int currApproxBlocksScheduled = 0;
  private int prevApproxBlocksScheduled = 0;
  private long lastBlocksScheduledRollTime = 0;
  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
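The two counters form a coarse sliding window: new schedules go into the current window, and each roll moves the current count into the previous one, so a scheduled block that is never acknowledged stops being counted after roughly two roll intervals. A minimal sketch reusing the field names above; the class, the inc/dec methods and the order in which decrements are applied are assumptions for illustration.

```java
// Sketch of the two-window "blocks scheduled" counter (hypothetical class).
class BlocksScheduledSketch {
    static final long ROLL_INTERVAL_MS = 600 * 1000; // 10 minutes

    private int currApproxBlocksScheduled = 0;
    private int prevApproxBlocksScheduled = 0;
    private long lastBlocksScheduledRollTime = 0;

    // A block was scheduled to be written to this datanode.
    void incBlocksScheduled() {
        currApproxBlocksScheduled++;
    }

    // A scheduled block arrived: drain the older window first; never go below zero.
    void decBlocksScheduled() {
        if (prevApproxBlocksScheduled > 0) {
            prevApproxBlocksScheduled--;
        } else if (currApproxBlocksScheduled > 0) {
            currApproxBlocksScheduled--;
        }
    }

    // Called with the heartbeat time; shifts the window once the interval has passed.
    void rollBlocksScheduled(long now) {
        if (now - lastBlocksScheduledRollTime > ROLL_INTERVAL_MS) {
            prevApproxBlocksScheduled = currApproxBlocksScheduled;
            currApproxBlocksScheduled = 0;
            lastBlocksScheduledRollTime = now;
        }
    }

    int getBlocksScheduled() {
        return currApproxBlocksScheduled + prevApproxBlocksScheduled;
    }
}
```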

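As mentioned earlier, Datanode.offerService() periodically calls namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport)) through the RPC framework to report the state of all its local blocks to the namenode. namenode.blockReport calls namesystem.processReport(nodeReg, blist) to update the namenode's block records for that datanode. namesystem.processReport first calls node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate) to classify the reported blocks: which ones must be added to the namenode's block list for this Datanode, which ones must be removed from it, and which ones are invalid. It then removes the toRemove blocks from that block list, adds the toAdd blocks, and puts the invalid blocks into recentInvalidateSets, from where the Datanode is notified at a suitable time.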
 
void reportDiff(BlocksMap blocksMap,
                  BlockListAsLongs newReport,
                  Collection<Block> toAdd,
                  Collection<Block> toRemove,
                  Collection<Block> toInvalidate) {
    // place a delimiter in the list which separates blocks 
    // that have been reported from those that have not
    BlockInfo delimiter = new BlockInfo(new Block(), 1);
    boolean added = this.addBlock(delimiter);// put the delimiter into this Datanode's block list; it separates reported from unreported blocks
    assert added : "Delimiting block cannot be present in the node";
    if(newReport == null)
      newReport = new BlockListAsLongs( new long[0]);
    // scan the report and collect newly reported blocks
    // Note we are taking special precaution to limit tmp blocks allocated
    // as part of this block report, which is why the block list is stored as longs
    Block iblk = new Block(); // a fixed new'ed block to be reused with index i
    for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
      iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i), 
               newReport.getBlockGenStamp(i));
      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
      if(storedBlock == null) {
        toInvalidate.add(new Block(iblk));// the namenode has no record of this block at all (not even one replica): invalidate it
        continue;
      }
      if(storedBlock.findDatanode(this) < 0) {// the block exists on the namenode but is not yet associated with this DatanodeDescriptor
        // if the size differs from what is in the blockmap, then return
        // the new block. addStoredBlock will then pick up the right size of this
        // block and will update the block object in the BlocksMap
        if (storedBlock.getNumBytes() != iblk.getNumBytes()) {// the reported length differs from the namenode's record: add the reported block
          toAdd.add(new Block(iblk));
        } else {
          toAdd.add(storedBlock);
        }
        continue;
      }
      this.moveBlockToHead(storedBlock);// already on this DatanodeDescriptor: move it to the head, in front of the delimiter
    }
    // collect blocks that have not been reported
    // all of them are next to the delimiter
    // The Datanode reports all of its blocks, so anything still behind the delimiter is stale on the namenode and must be removed
    Iterator<Block> it = new BlockIterator(delimiter.getNext(0), this);// walk the remaining blocks of this Datanode with BlockIterator
    while(it.hasNext())
      toRemove.add(it.next());
    this.removeBlock(delimiter);// remove the delimiter
  }
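The delimiter trick can be reproduced with an ordinary list. The sketch below is a simplified model, not the Hadoop method: blocks are plain long IDs, blocksMap is just the set of IDs the namenode knows, lengths and generation stamps are ignored, and Long.MIN_VALUE is assumed never to be a real block ID.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

// Simplified model of reportDiff's delimiter trick (hypothetical names).
class ReportDiffSketch {
    private static final long DELIMITER = Long.MIN_VALUE; // sentinel id, assumed never reported

    // Block ids the namenode currently associates with this datanode, head first.
    final LinkedList<Long> blockList = new LinkedList<>();

    void reportDiff(Set<Long> blocksMap, long[] report,
                    List<Long> toAdd, List<Long> toRemove, List<Long> toInvalidate) {
        blockList.addFirst(DELIMITER); // blocks behind it count as "not reported yet"
        for (long id : report) {
            if (!blocksMap.contains(id)) {        // namenode has no record at all
                toInvalidate.add(id);
            } else if (!blockList.contains(id)) { // known block, new for this datanode
                toAdd.add(id);
            } else {                              // already listed: move in front of the delimiter
                blockList.remove(Long.valueOf(id));
                blockList.addFirst(id);
            }
        }
        // Everything still behind the delimiter was missing from the report.
        int d = blockList.indexOf(DELIMITER);
        toRemove.addAll(blockList.subList(d + 1, blockList.size()));
        blockList.remove(d); // drop the sentinel; applying toAdd/toRemove is left to the caller
    }
}
```

This LinkedList version spends O(n) on each contains/remove and only illustrates the ordering; the real code gets a cheap moveBlockToHead from the intrusive block list, so one pass over the report plus one walk behind the delimiter suffices.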


Datanode.offerService() also reports its current status through the heartbeat mechanism by calling namenode.sendHeartbeat(...); the namenode then calls updateHeartbeat to refresh its record of that Datanode:
  void updateHeartbeat(long capacity, long dfsUsed, long remaining,
      int xceiverCount) {
    this.capacity = capacity;
    this.dfsUsed = dfsUsed;
    this.remaining = remaining;
    this.lastUpdate = System.currentTimeMillis();
    this.xceiverCount = xceiverCount;
    rollBlocksScheduled(lastUpdate);
  }

[Figure: call sequence]


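When handling a heartbeat in FSNamesystem.handleHeartbeat, the namenode calls the DatanodeDescriptor.getXXXCommand methods to obtain BlockCommands. A BlockCommand bundles the blocks to be processed together with the corresponding DatanodeInfo targets (XXX can be Replication, LeaseRecovery or Invalidate).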
BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
    List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
    return blocktargetlist == null? null:
        new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
  }
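The heartbeat reply can be pictured as draining these per-node queues in a fixed order. The sketch below is modeled on my reading of Hadoop 0.20's FSNamesystem.handleHeartbeat: a node that is not alive only gets a re-register command, lease recovery is answered on its own, and otherwise replication (bounded by maxTransfers) and invalidation (bounded by invalidateLimit) are combined. Treat that ordering, the limits and all names here as assumptions; the real method may handle other work as well.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical model of turning per-datanode queues into heartbeat replies.
class HeartbeatCommandSketch {
    enum Action { REGISTER, RECOVER_BLOCK, TRANSFER, INVALIDATE }

    static final class Command {
        final Action action;
        final List<Long> blocks;

        Command(Action action, List<Long> blocks) {
            this.action = action;
            this.blocks = blocks;
        }
    }

    boolean isAlive = true;
    final Deque<Long> recoverBlocks = new ArrayDeque<>();
    final Deque<Long> replicateBlocks = new ArrayDeque<>();
    final Deque<Long> invalidateBlocks = new ArrayDeque<>();

    // Take up to max ids in FIFO order; null when there is nothing to send.
    private static List<Long> take(Deque<Long> q, int max) {
        if (max <= 0 || q.isEmpty()) return null;
        List<Long> out = new ArrayList<>();
        while (!q.isEmpty() && out.size() < max) out.add(q.poll());
        return out;
    }

    List<Command> handleHeartbeat(int maxTransfers, int invalidateLimit) {
        if (!isAlive) { // node not known as alive: ask it to register again
            return Collections.singletonList(new Command(Action.REGISTER, Collections.<Long>emptyList()));
        }
        List<Long> recover = take(recoverBlocks, Integer.MAX_VALUE);
        if (recover != null) { // assumed: lease recovery is answered on its own
            return Collections.singletonList(new Command(Action.RECOVER_BLOCK, recover));
        }
        List<Command> cmds = new ArrayList<>();
        List<Long> transfer = take(replicateBlocks, maxTransfers);
        if (transfer != null) cmds.add(new Command(Action.TRANSFER, transfer));
        List<Long> invalid = take(invalidateBlocks, invalidateLimit);
        if (invalid != null) cmds.add(new Command(Action.INVALIDATE, invalid));
        return cmds;
    }
}
```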


The readFieldsFromFSEditLog method restores a datanode's information from the edits log file:

 void readFieldsFromFSEditLog(DataInput in) throws IOException {
    this.name = UTF8.readString(in);
    this.storageID = UTF8.readString(in);
    this.infoPort = in.readShort() & 0x0000ffff;

    this.capacity = in.readLong();
    this.dfsUsed = in.readLong();
    this.remaining = in.readLong();
    this.lastUpdate = in.readLong();
    this.xceiverCount = in.readInt();
    this.location = Text.readString(in);
    this.hostName = Text.readString(in);
    setAdminState(WritableUtils.readEnum(in, AdminStates.class));
  }
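readFieldsFromFSEditLog must read the fields in exactly the order they were written, and infoPort is stored as a 16-bit short. DataInput.readShort() sign-extends, so a port above 32767 such as 50075 would come back negative without the & 0x0000ffff mask (DataInput.readUnsignedShort() does the same job). The round trip below illustrates this with a hypothetical subset of the fields; java.io's writeUTF/readUTF stand in for Hadoop's UTF8 and Text helpers, so the byte layout is not the real edits format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical subset of the datanode fields, written and read in a fixed order.
class EditLogFieldsSketch {
    String name;
    String storageID;
    int infoPort;
    long capacity;

    void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(storageID);
        out.writeShort(infoPort); // only the low 16 bits are stored
        out.writeLong(capacity);
    }

    void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        storageID = in.readUTF();
        // readShort() sign-extends; the mask restores the unsigned 16-bit port
        infoPort = in.readShort() & 0x0000ffff;
        capacity = in.readLong();
    }

    // Serialize to memory and read back into a fresh object.
    static EditLogFieldsSketch roundTrip(EditLogFieldsSketch src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            EditLogFieldsSketch dst = new EditLogFieldsSketch();
            dst.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return dst;
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory streams should not fail
        }
    }
}
```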