Hadoop asynchronous RPC communication: org.apache.hadoop.ipc.Server

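Java NIO non-blocking I/O does not start a thread to wait for a response on each port. Instead it uses the Reactor (or Observer) pattern to monitor I/O channels: when a channel is ready we are notified automatically, which keeps I/O reads and writes flowing smoothly.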

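In Java NIO, a Selector can be seen as an observer. Once we tell the Selector which SocketChannels to watch (by registering them), we can get on with other work. When something happens on a registered channel, the Selector notifies us by returning a set of SelectionKeys, and by going through those keys we can read the data on the channels.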
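The register, select, and iterate-over-keys cycle described above can be sketched as follows. This is a minimal illustration and not Hadoop code: the class name `SelectorSketch` and the method `roundTrip` are made up for this example, and a `java.nio.channels.Pipe` stands in for a SocketChannel so that the sketch runs without any network setup.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class: a Pipe replaces the SocketChannel of a real server.
public class SelectorSketch {
    /** Writes msg into a pipe and returns what the selector loop reads back. */
    public static String roundTrip(String msg) throws Exception {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            // Tell the selector which channel to watch, and for which event.
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            // Produce some data (the sink is blocking by default).
            // Assumption: the payload is small enough to fit in the pipe buffer.
            ByteBuffer out = ByteBuffer.wrap(payload);
            while (out.hasRemaining()) {
                sink.write(out);
            }

            ByteBuffer in = ByteBuffer.allocate(payload.length);
            while (in.hasRemaining()) {
                selector.select();  // blocks until a registered channel is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();    // selected keys must be removed by hand
                    if (key.isValid() && key.isReadable()) {
                        ((ReadableByteChannel) key.channel()).read(in);
                    }
                }
            }
            in.flip();              // switch the buffer from writing to reading
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }
}
```

Calling `SelectorSketch.roundTrip("ping")` returns the same string after it has passed through the selector loop.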

The Client side uses plain blocking I/O for its low-level communication, while the Server uses Java NIO for RPC communication:

Java NIO references:

http://www.iteye.com/topic/834447

http://weixiaolu.iteye.com/blog/1479656

=========================================================================================================================

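Server is an abstract class; the abstract part is the call method. RPC.Server is the implementation of ipc.Server, and the RPC.Server constructor calls the ipc.Server constructor. During initialization the NameNode calls RPC.getServer to create an RPC.Server: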

public static Server getServer(final Object instance, final String bindAddress, final int port,
                                 final int numHandlers,
                                 final boolean verbose, Configuration conf,
                                 SecretManager<? extends TokenIdentifier> secretManager) 
    throws IOException {
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  }

Server.Call is a request class, similar to Client.Call, with a timestamp added:

private static class Call {
    private int id;                       // request id
    private Writable param;               // request parameters
    private Connection connection;        // as in Client, a connection between client and server
    private long timestamp;               // timestamp
    private ByteBuffer response;          // the server's response to this request
...
}

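Having seen Client.Connection, it is clear that Server.Connection is the server's side of a connection to a client. Server.Connection stores the client's address, which is used for failure recovery. Through readAndProcess, Server.Connection handles the client's input in three steps: it checks the version, reads the header with processHeader (obtaining the communication protocol and creating the user object from the ugi information in the header), and reads the data with processData (obtaining the Call.id and params sent by the client, building a call from them, and putting that call on callQueue). [readAndProcess is called from Listener.doRead, when the listener sees a read event on a connection.]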

    public int readAndProcess() throws IOException, InterruptedException {
        // First check the connection's version; on success read the header (which gives the client's protocol and its user identity),
        // then read the data (Call.id and the parameters params) and build a Call
      while (true) {
        /* Read at most one RPC. If the header is not read completely yet
         * then iterate until we read first RPC or until there is no data left.
         */    
        int count = -1;
        if (dataLengthBuffer.remaining() > 0) {
          count = channelRead(channel, dataLengthBuffer);       
          if (count < 0 || dataLengthBuffer.remaining() > 0) 
            return count;
        }
      
        if (!versionRead) {// version not yet verified
          //Every connection is expected to send the header.
          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
          count = channelRead(channel, versionBuffer);
          if (count <= 0) {
            return count;
          }
          int version = versionBuffer.get(0);
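          // A ByteBuffer must be flipped before it is read.
          dataLengthBuffer.flip();// Required: without flip(), reading would start at the current position (the end of the written data) and never see the header bytes.
          // flip() sets the limit to the current position and resets the position to the start of the buffer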
          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
            //Warning is ok since this is not supposed to happen.
            LOG.warn("Incorrect header or version mismatch from " + 
                     hostAddress + ":" + remotePort +
                     " got version " + version + 
                     " expected version " + CURRENT_VERSION);
            return -1;
          }
          dataLengthBuffer.clear();// reset the buffer for reuse
          versionRead = true;// the version has been verified
          continue;
        }
        
        if (data == null) {// allocate a new data buffer
          dataLengthBuffer.flip();
          dataLength = dataLengthBuffer.getInt();
       
          if (dataLength == Client.PING_CALL_ID) {
            dataLengthBuffer.clear();
            return 0;  //ping message
          }
          data = ByteBuffer.allocate(dataLength);
          incRpcCount();  // Increment the rpc count
        }
        
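        count = channelRead(channel, data);// read the data
        
        if (data.remaining() == 0) {// data was allocated with exactly dataLength bytes, so a complete read leaves no space remaining
          dataLengthBuffer.clear();
          data.flip();
          if (headerRead) {// the header has already been processed
            processData();// process the data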
            data = null;
            return count;
          } else {
            processHeader();
            headerRead = true;
            data = null;
            
            // Authorize the connection
            try {// try to authorize
              authorize(user, header);// user has already been created
              
              if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully authorized " + header);
              }
            } catch (AuthorizationException ae) {// authorization failed
              authFailedCall.connection = this;
              setupResponse(authFailedResponse, authFailedCall, 
                            Status.FATAL, null, 
                            ae.getClass().getName(), ae.getMessage());
              responder.doRespond(authFailedCall);
              
              // Close this connection
              return -1;
            }

            continue;
          }
        } 
        return count;
      }
    }
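The core of readAndProcess is length-prefixed framing that survives partial reads: a 4-byte length is accumulated first, then a buffer of exactly that size is filled, and only a full buffer counts as one message. The sketch below isolates that idea. It is not the Hadoop implementation: `FrameDecoder`, `feed`, and `decodeOneByteAtATime` are names invented for this example, and the version check, header handling, and ping messages are left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the dataLengthBuffer / data pattern, without any I/O.
public class FrameDecoder {
    private final ByteBuffer lengthBuf = ByteBuffer.allocate(4); // like dataLengthBuffer
    private ByteBuffer data;                                     // null until the length is known

    /** Consumes one chunk of bytes and returns every frame it completes. */
    public List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (chunk.hasRemaining()) {
            if (data == null) {
                copy(chunk, lengthBuf);
                if (lengthBuf.hasRemaining()) {
                    break;                      // length prefix still incomplete
                }
                lengthBuf.flip();               // flip before reading what was written
                // Assumption: the length is non-negative and reasonably small.
                data = ByteBuffer.allocate(lengthBuf.getInt());
                lengthBuf.clear();              // ready for the next frame
            }
            copy(chunk, data);
            if (!data.hasRemaining()) {         // buffer full: one complete frame
                frames.add(data.array());
                data = null;
            }
        }
        return frames;
    }

    private static void copy(ByteBuffer src, ByteBuffer dst) {
        while (src.hasRemaining() && dst.hasRemaining()) {
            dst.put(src.get());
        }
    }

    /** Demo helper: frames the messages, then feeds the stream one byte at a time. */
    public static List<String> decodeOneByteAtATime(String... messages) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (String m : messages) {
            byte[] body = m.getBytes(StandardCharsets.UTF_8);
            wire.write(ByteBuffer.allocate(4).putInt(body.length).array(), 0, 4);
            wire.write(body, 0, body.length);
        }
        FrameDecoder decoder = new FrameDecoder();
        List<String> out = new ArrayList<>();
        for (byte b : wire.toByteArray()) {
            for (byte[] frame : decoder.feed(ByteBuffer.wrap(new byte[] {b}))) {
                out.add(new String(frame, StandardCharsets.UTF_8));
            }
        }
        return out;
    }
}
```

Feeding the stream one byte at a time is the worst case for a non-blocking read, and the decoder still yields whole messages because its state lives in the two buffers between calls, just as dataLengthBuffer and data persist between calls to readAndProcess.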

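Server.Listener is a thread that listens for new connections. Its run method loops over the events that occur on its channels. When the event is a new connection (key.isAcceptable() == true), it calls doAccept to accept it: the new channel is registered for OP_READ, a new Server.Connection is created from the resulting SelectionKey and the channel, and the connection is added to connectionList [the Listener walks this list later in cleanupConnections; Handler threads take their work from callQueue, not from this list]. When the event is a read event (key.isReadable() == true), it calls doRead to read the data on the connection (which in turn calls connection.readAndProcess). At the end of every pass through the loop it calls cleanupConnections(false) to clean up connections that have been unresponsive for a long time.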

  public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
            /* select() tells us that events we are interested in have occurred.
            nKeys = selector.select();
            Its return value is greater than 0 if any registered event has occurred. */
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
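              if (key.isValid()) {// is this key valid?
                if (key.isAcceptable())// is the channel ready to accept a new socket connection?
                  doAccept(key);// accept the connection, register the channel for reads, create a Connection and add it to connectionList
                else if (key.isReadable())// is this key's channel ready for reading?
                  doRead(key);// call Connection.readAndProcess to read the call data and build a new call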
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give 
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (InterruptedException e) {
          if (running) {                          // unexpected -- log it
            LOG.info(getName() + " caught: " +
                     StringUtils.stringifyException(e));
          }
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {// clean up
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;
        
        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }



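A Server.Handler thread's main job is to process requests. Because several requests may need processing at once, there is usually more than one Handler. Each Handler loops, taking a call from callQueue and executing the IPC call as the user Subject of the Server.Connection associated with that Server.Call. It then serializes the result into the call and adds the call to the responseQueue of its connection [responseQueue is also a collection of Calls; every connection has its own responseQueue, which holds the responses for the corresponding client]: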

  /** Handles queued calls . */
  private class Handler extends Thread {
    public Handler(int instanceNumber) {
      this.setDaemon(true);
      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
    }

    @Override
    public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);// store the server in this handler thread's thread-local variable
      ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
      while (running) {
        try {
          final Call call = callQueue.take(); // pop the queue; maybe blocked here

          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": has #" + call.id + " from " +
                      call.connection);
          
          String errorClass = null;
          String error = null;
          Writable value = null;

          CurCall.set(call);// set this thread's thread-local CurCall to the call just dequeued
          try {
            // Make the call as the user via Subject.doAs, thus associating
            // the call with the Subject
            // i.e. execute the IPC call as the user Subject of the Server.Connection associated with this Server.Call
            value = 
              Subject.doAs(call.connection.user, 
                           new PrivilegedExceptionAction<Writable>() {
                              @Override
                              public Writable run() throws Exception {
                                // make the call
                                return call(call.connection.protocol, 
                                            call.param, call.timestamp);
                              }
                           }
                          );
              
          } catch (PrivilegedActionException pae) {
            Exception e = pae.getException();
            LOG.info(getName()+", call "+call+": error: " + e, e);
            errorClass = e.getClass().getName();
            error = StringUtils.stringifyException(e);
          } catch (Throwable e) {
            LOG.info(getName()+", call "+call+": error: " + e, e);
            errorClass = e.getClass().getName();
            error = StringUtils.stringifyException(e);
          }
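          CurCall.set(null); // this Handler has finished the call; clear its thread-local CurCall
          // build the response for the call just handled
          setupResponse(buf, call, 
                        (error == null) ? Status.SUCCESS : Status.ERROR, 
                        value, errorClass, error);// serialize the result into the call
          responder.doRespond(call); // add the call to the response queue so the response can be sent to the client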
        } catch (InterruptedException e) {
          if (running) {                          // unexpected -- log it
            LOG.info(getName() + " caught: " +
                     StringUtils.stringifyException(e));
          }
        } catch (Exception e) {
          LOG.info(getName() + " caught: " +
                   StringUtils.stringifyException(e));
        }
      }
      LOG.info(getName() + ": exiting");
    }
  }
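
Responder.doRespond, which the Handler calls last, looks like this:

    void doRespond(Call call) throws IOException {
      synchronized (call.connection.responseQueue) {
        call.connection.responseQueue.addLast(call);// add the call to the responseQueue of its connection
        if (call.connection.responseQueue.size() == 1) {// The queue was empty before this call was added, so call processResponse, which can register write interest on call.connection's channel and attach the call to it. This lets doAsyncWrite (invoked from Responder.run) retrieve the attached call from the selection key.
          processResponse(call.connection.responseQueue, true);
        }
      }
    }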
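The Handler arrangement is a classic producer/consumer setup: one side puts calls on a blocking queue, and a fixed set of threads takes them off, runs them, and hands back a result or an error. The sketch below shows only that shape. It is not Hadoop code: `HandlerPoolSketch`, `submit`, and the `Function<String, String>` service are assumptions made for this example, and a `CompletableFuture` stands in for the per-connection responseQueue and the Responder.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical sketch: several handler threads draining one shared call queue.
public class HandlerPoolSketch {
    /** A queued request: id, parameter, and a slot for the response. */
    public static final class Call {
        final int id;
        final String param;
        final CompletableFuture<String> response = new CompletableFuture<>();
        Call(int id, String param) { this.id = id; this.param = param; }
    }

    private final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();
    private final Function<String, String> service;   // stands in for the abstract call()
    private final Thread[] handlers;
    private volatile boolean running = true;

    public HandlerPoolSketch(int handlerCount, Function<String, String> service) {
        this.service = service;
        this.handlers = new Thread[handlerCount];
    }

    public synchronized void start() {
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new Thread(this::handleLoop, "sketch handler " + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }
    }

    /** Producer side: what the reader does after it has parsed a request. */
    public CompletableFuture<String> submit(int id, String param) {
        Call call = new Call(id, param);
        callQueue.add(call);
        return call.response;
    }

    public synchronized void stop() {
        running = false;
        for (Thread t : handlers) {
            if (t != null) t.interrupt();   // wake handlers blocked in take()
        }
    }

    private void handleLoop() {
        while (running) {
            Call call;
            try {
                call = callQueue.take();    // blocks while the queue is empty
            } catch (InterruptedException e) {
                return;                     // interrupted by stop()
            }
            try {
                call.response.complete(service.apply(call.param));
            } catch (Throwable t) {
                call.response.completeExceptionally(t); // error travels back like Status.ERROR
            }
        }
    }
}
```

A possible usage: create `new HandlerPoolSketch(3, s -> s.toUpperCase())`, call `start()`, then `submit(1, "ping").join()` yields `"PING"`. As in the Handler above, a failure inside the service is caught per call, so one bad request does not stop the handler thread.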

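The Responder thread writes the responses in the response queues back to the clients.
Inside run it periodically calls doPurge, which uses Call.timestamp to decide whether a call has gone unanswered for too long; if so, the corresponding connection is closed.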

/**
     * Calls doAsyncWrite to write responses back asynchronously, and periodically calls doPurge to clean up unresponsive connections
     */
    public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);// store the server
      long lastPurgeTime = 0;   // last check for old calls.

      while (running) {
        try {
          waitPending();     // If a channel is being registered, wait.
          writeSelector.select(PURGE_INTERVAL);// select the keys whose channels are ready for I/O, blocking for at most PURGE_INTERVAL milliseconds
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {// is this key's channel ready for writing?
                  doAsyncWrite(key);// asynchronous write
              }
            } catch (IOException e) {
              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
            }
          }
          long now = System.currentTimeMillis();
          if (now < lastPurgeTime + PURGE_INTERVAL) {
            continue;
          }
          lastPurgeTime = now;
          //
          // If there were some calls that have not been sent out for a
          // long time, discard them.
          //
          LOG.debug("Checking for old call responses.");
          ArrayList<Call> calls;
          
          // get the list of channels from list of keys.
          synchronized (writeSelector.keys()) {// collect the valid calls into calls
            calls = new ArrayList<Call>(writeSelector.keys().size());
            iter = writeSelector.keys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              Call call = (Call)key.attachment();
              if (call != null && key.channel() == call.connection.channel) { 
                calls.add(call);
              }
            }
          }
          // close the connections of the collected calls that have gone unanswered for too long
          for(Call call : calls) {
            try {
              doPurge(call, now);
            } catch (IOException e) {
              LOG.warn("Error in purging old calls " + e);
            }
          }
        } catch (OutOfMemoryError e) {
          //
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          //
          LOG.warn("Out of Memory in server select", e);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          LOG.warn("Exception in Responder " + 
                   StringUtils.stringifyException(e));
        }
      }
      LOG.info("Stopping " + this.getName());
    }

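/** Calls processResponse to write the response asynchronously
     * */
    private void doAsyncWrite(SelectionKey key) throws IOException {
      Call call = (Call)key.attachment();// get the attached call; processResponse attaches the Call when it registers the channel for writes
      if (call == null) {// call is non-null only if a Handler thread registered write interest and attached a call to the channel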
        return;
      }
      // Using the attached call, write the responses on its connection's responseQueue back to the client
      if (key.channel() != call.connection.channel) {
        throw new IOException("doAsyncWrite: bad channel");
      }
      synchronized(call.connection.responseQueue) {
        if (processResponse(call.connection.responseQueue, false)) {// process the response data associated with the call
          try {
            key.interestOps(0);// set this key's interest set to 0, i.e. stop watching for write readiness
          } catch (CancelledKeyException e) {
            /* The Listener/reader might have closed the socket.
             * We don't explicitly cancel the key, so not sure if this will
             * ever fire.
             * This warning could be removed.
             */
            LOG.warn("Exception while changing ops : " + e);
          }
        }
      }
    }
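The interplay between the attachment, the write-ready event, and `key.interestOps(0)` can be tried in isolation. The sketch below is only an illustration under simplifying assumptions: `WriteInterestSketch` and `drain` are invented names, a `Pipe.SinkChannel` replaces the client socket, and a queue of ByteBuffers is attached to the key where Hadoop attaches a Call. Nothing reads from the pipe, so the queued data has to fit into the pipe's buffer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical sketch: write queued responses when the channel is writable,
// then clear the interest set once nothing is left to send.
public class WriteInterestSketch {
    /** Drains the queue through a selector loop; returns the number of bytes written. */
    @SuppressWarnings("unchecked")
    public static int drain(Deque<ByteBuffer> responseQueue) throws IOException {
        Pipe pipe = Pipe.open();
        int written = 0;
        try (Selector writeSelector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false);
            // Register write interest and attach the queue to the key.
            SelectionKey registration =
                sink.register(writeSelector, SelectionKey.OP_WRITE, responseQueue);

            while (registration.interestOps() != 0) {
                writeSelector.select();
                Iterator<SelectionKey> it = writeSelector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid() || !key.isWritable()) {
                        continue;
                    }
                    // Recover the work from the key, as doAsyncWrite does with its Call.
                    Deque<ByteBuffer> queue = (Deque<ByteBuffer>) key.attachment();
                    ByteBuffer head = queue.peekFirst();
                    if (head != null) {
                        written += ((WritableByteChannel) key.channel()).write(head);
                        if (!head.hasRemaining()) {
                            queue.removeFirst();    // this response is fully sent
                        }
                    }
                    if (queue.isEmpty()) {
                        key.interestOps(0);         // nothing left: stop write notifications
                    }
                }
            }
        }
        return written;
    }
}
```

Clearing the interest set matters because a channel is writable almost all the time; leaving OP_WRITE registered with an empty queue would make `select()` return immediately on every pass.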

    // Cleanup.
    // Remove calls that have been pending in the responseQueue 
    // for a long time.
    //
    private void doPurge(Call call, long now) throws IOException {
      LinkedList<Call> responseQueue = call.connection.responseQueue;// one connection carries many requests,
      // and therefore needs many responses
      synchronized (responseQueue) {
        Iterator<Call> iter = responseQueue.listIterator(0);
        while (iter.hasNext()) {// if any response has been pending too long, close this connection
          call = iter.next();
          if (now > call.timestamp + PURGE_INTERVAL) {
            closeConnection(call.connection);
            break;
          }
        }
      }
    }
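The purge rule itself is small: a connection is closed as soon as any response in its queue is older than the purge interval. A sketch of just that rule, with time passed in explicitly so it is easy to check, could look like this. `PurgeSketch`, `PendingResponse`, and the constructor parameter are assumptions made for this example and do not come from Hadoop.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the doPurge rule for a single connection.
public class PurgeSketch {
    static final class PendingResponse {
        final int callId;
        final long timestamp;   // when the call was received, in milliseconds
        PendingResponse(int callId, long timestamp) {
            this.callId = callId;
            this.timestamp = timestamp;
        }
    }

    private final Deque<PendingResponse> responseQueue = new ArrayDeque<>();
    private final long purgeIntervalMs;
    private boolean closed;

    public PurgeSketch(long purgeIntervalMs) {
        this.purgeIntervalMs = purgeIntervalMs;
    }

    public synchronized void enqueue(int callId, long timestamp) {
        responseQueue.addLast(new PendingResponse(callId, timestamp));
    }

    /** Closes the connection if any queued response is older than the interval. */
    public synchronized boolean purge(long now) {
        for (PendingResponse r : responseQueue) {
            if (now > r.timestamp + purgeIntervalMs) {
                responseQueue.clear();  // closing drops everything still queued
                closed = true;
                return true;            // one stale response is enough
            }
        }
        return false;
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}
```

With an interval of 1000 ms and a response stamped at time 0, `purge(1000)` leaves the connection open and `purge(1001)` closes it, because the comparison is strict, as in doPurge.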


Finally, here is how the threads are started:

public synchronized void start() throws IOException {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {// there can be several handler threads
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }
stop() works similarly and is not covered here.

=========================================================================================================================

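[Summary]

The Server uses Java NIO non-blocking I/O.
The Listener accepts client connections and adds them to connectionList; it also calls Connection methods to check the version, build Calls, and put them on callQueue.

Handler is the processing thread: it takes requests from callQueue, processes them, and calls Responder.doRespond to add the call to responseQueue.

Responder is the response thread: it writes the responses on responseQueue to the corresponding client, and its doPurge cleans up connections whose responses have been pending for too long.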