hadoop实现原理 (3) 远程过程调用
我们通过一个简单的例子来看Hadoop 远程过程调用怎么使用。
服务端启动
IPCQueryStatusImpl queryService = new IPCQueryStatusImpl(); Server server = RPC.getServer(queryService, "0.0.0.0",IPC_PORT, new Configuration()); server.start();
客户端调用
InetSocketAddress addr = new InetSocketAddress("localhost",IPCQueryServer.IPC_PORT); IPCQueryStatus query = (IPCQueryStatus) RPC.getProxy(RPCQueryStatus.class, IPCQueryServer.IPC_VER, addr, new Configuration()); IPCFileStatus status = query.getFileStatus("/tmp/testipc");
-----------------------
服务端分析
先来分析一下服务器端为什么能够提供服务,以及是怎么提供服务的。
我们将接口IPCQueryStatus 的一个实现类IPCQueryStatusImpl作为一个实例,封装成RPC.Server 为客户端提供服务,我们从RPC.getServer方法进入逐步分析。
1. getServer()方法
getServer方法调用RPC.Server的构造函数
public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager);创建一个RPC.Server对象。
由于RPC.Server继承自Server,首先需要调用super()构造函数,在super中设置了ip地址,端口号,配置等成员变量信息。然后,最主要的是创建了Listener,Responder对象,这个两个对象在RPC.Server对象启动后分别起到监听客户端请求和响应客户端请求的作用。Server中还包含一个重要的成员变量是handler,而且它的个数>=1,这个变量没有在构造函数中初始化,而是在Server启动的时候(server.start())才进行初始化的,这应该是为了避免资源浪费吧。
创建好Server后,启动Server,server.start()开始为外界提供服务。
2. server.start()
public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }这个方法启动了responder, listener。它们都是一个继承自Thread的线程类,因此主要分析各自的run方法。
3. listener.run()
run方法采用IO多路复用处理链接请求,在while循环中反复调用
selector.select() Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException e) { } key = null; }我们再进入doAccept方法进行研究
首先从slectionKey中获得ServerSocketChannel server = key.channel()。然后调用server.accept()方法获得SocketChannel channel,获得客户端到服务器的管道连接。在源码中,获得channel是通过 while ((channel = server.accept()) != null)来获得的,这是因为OP_ACCEPT事件(这个事件是在Listner的构造函数中注册的)可能有多个。获得了SocketChannel channel,需要将它(SocketChannel)与读通道管理器(Selector readSelector, Reader的成员变量)绑定,并为该通道注册OP_READ事件,这样调用readerSelector.select()后就可以阻塞获得OP_READ事件了。
我们还为这个OP_READ事件加一个attachment, 也就是Connection, 在doRead()方法中,获得这个connection进行真正的读写。
4. Reader类分析
Listener类中,有一个数组成员变量readers,在Listener构造函数中,创建readThreads个Reader,Reader是一个线程,将它放到线程池中实现线程共享。在Listener的doAccept方法中,有一个OP_ACCEPT事件发生,就创建一个新的Connection进行读处理,这时候需要从readThreads个Reader中轮询获得一个Reader用于这个Connection的读取工作。因为Reader实现了Runnable,我们主要分析run方法做了那些工作。
run中使用注册了OP_READ事件的Selector进行IO多路复用,当有读事件发生时,调用doRead(key)方法。在doRead方法中,调用Connection的readAndProcess()方法进行读处理。
5. Connection.readAndProcess()方法分析
采用数据分帧的方式进行数据读去,首先处理帧的头部,头部包括用户的信息和权限方法,还有版本,如果开启了权限检查,而权限信息不匹配,或者存在版本信息,就会报告错误信息。根据读源码,我绘制帧的格式如下(如有误,请指证):
在读取ipc header的时候,如果发现数据长度为dataLength=Client.PING_CALL_ID(-1),则说明是客户端发送过来的PING消息。如果版本不匹配,汇报错误。读取完ipc header,就要读取connection header了, connection header是客户端发送过来的远程调用,利用这个构造对应的方法调用类。其中有两个重要的方法,processHeader() 就是处理connection header的, processData()方法就是处理远程调用的,将读取出来的调用信息封装成Call类,放在Call Queue中,等待被处理。
6. Handler 分析
我们之前分析到Listener中的readers将各个远程调用以Call类封装在Call Queue中,放到里面去之后,这里采用了生产者,消费者的模式,Handler作为一个消费者,以线程的方式运行从队列中取出Call对象进行处理。
在run方法中,先从CallQueue中取出一个Call对象来进行处理,调用call(Class<?> protocol, Writable param, long receivedTime)方法。在call方法中,首先将param对象cast成Invocation对象,并构造Method,然后调用invoke方法,调用的方式是Object value = method.invoke(instance, call.getParameters()),其中instatnce是我们在获得server的时候传进去的实例对象,也就是如本文最开始的queryService:
IPCQueryStatusImpl queryService = new IPCQueryStatusImpl(); Server server = RPC.getServer(queryService, "0.0.0.0",IPC_PORT, new Configuration());这就是一个远程过程调用在服务端的调用了。
调用完成之后,调用了两个重要的方法,一个是setupResponse, 这个是将结果以合理的帧格式存放在Call中,然后调用responder.doResponse()方法处理队列中的Call,将产生结果的Call输出到客户端。源码中这两个方法做了很多优化工作,值得再深入研究。
至此,服务端的主要过程分析完毕,接下来分析客户端过程了。
-------------------------------------------
客户端分析
我们从RPC.getProxy方法开始
1. RPC.getProxy()分析
getProxy方法最终会调用Proxy.ewProxyInstance(ClassLoader loader, Class<?>[] interfaces,InvocationHandler h)方法,这是用到了动态代理了,动态代理可以参考我之前写的博客,生成一个继承interfaces,并且包含invocationHandler成员变量的一个代理类,直接生成.class文件,因为这个.class是动态生成的,所以叫做动态代理。获得动态代理后返回给一个接口保存:
<pre name="code" class="java">IPCQueryStatus query = (IPCQueryStatus) RPC.getProxy(RPCQueryStatus.class, IPCQueryServer.IPC_VER, addr, new Configuration());
这里query这个实例其实就是动态生成的代理类了,因此调用后面的方法getFileStatus其实就是调用动态实例的对应方法,只不过再这个方法中,InvocationHandler,在这里也就是Invoker类对这个方法做了控制。接下来分析一下Invoker这个类了。
2. Invoker类分析
Invoker类中有两个重要的变量,一个是Client.ConnectionId remoteId ,一个是Client client,其中Client.ConnectionId用来标识客户端到服务端的连接,其中包括底层网络连接的一些属性和参数。在Client类中,保存了一个Hashtable<ConnectionId, Connection> connections变量,可以根据connectionId获得一个Connection对象,如果connections中没有保存,就新建一个并放进去。
Client 类管理了connections,还有调用的一些整体情况。
在Invoker.invoke方法中,会调用Client.call方法:
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);我们进去分析call方法
3. Client.call()方法分析
call方法中,将Invocation封装成Call对象,然后根据remoteId获得Connection对象:
Connection connection = getConnection(remoteId, call);这个方法不是简单地查看hashtable connections里面有没有对应remoteId的 Connection对象,当得到或创建了一个Connection对象后,会调用connection.setupIOstreams()方法,这个方法主要完成到服务器端的连接(底层socket实现),然后获得socket的输入输出流,调用writeRpcHeader(outStream)输出rpc header,构造header信息,调用writeHeader()方法将远程调用发送到服务器端,由于Connection是一个线程,发送了调用后,在运行的线程中要等待结果的到来。
4. Connection.run()方法分析
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); while (waitForWork()) {//wait here for work - read or close connection receiveResponse(); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }在waitForWork中,如果目前没有正在处理的远程调用,同时shouldCloseConnection没被置位,且客户端的running标志位为true时,计算等待时间,并调用wait()方法等待。
下面集中情况结束在wait()方法上的等待,包括:
- 发生一次远程调用,也就是说,需要等待连接上返回的远程调用结果;
- markClosed()被调用,shouldCloseConnection为true,可以关闭连接;
- wait()方法超时,这意味着IPC连接长时间处于空闲状态;
- Clinet.stop()关闭整个客户端,这时running被设置为false,也可以关闭连接。