hadoop_rpc之RPC(四)

hadoop_rpc之RPC(4)
有了Client,有了Server,那整个过程怎么运行起来?
先说一下基本原理:
  • 1. 首先客户端和服务器端之间要有一个协议,这里的协议就是以java接口类的方式暴露出来的
  • 2. 虽然Client类和Server类之间已经具有通信的能力,也有了协议,那么一个真正的客户端要调用服务器端rpc调用的实现,只需要解决参数及具体的调用实现两个问题即可
  • 3. 客户端要做的,就是要将参数(这个一般称为存根)通过网络传递到服务器端。这个自然而然想到使用代理模式,因为Client已经具备网络通信的能力,只要通过代理,实现获取参数进行传输即可,为什么不在Client这里实现参数的获取,如果这样的话,就违反了单一职责的原则,且扩展性不行,总不能一个客户端的调用实现一个特定的Client类吧。因此,将Client的功能单一独立出来,只负责将参数通过网络传递到服务器端
  • 4. 服务器要做的工作,只需要进行调用的真正的实现即可,当然了, 最后需要能够返回正确的结果。


上面说的这些,都全部在hadoop的这个RPC里进行了实现。
客户端的主要代理实现方法如下:
 
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout) throws IOException {    
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    return getProtocolEngine(protocol,conf).getProxy(protocol,
        clientVersion, addr, ticket, conf, factory, rpcTimeout);
  }


其中是调用RpcEngine的下面这个接口方法来进行实现的:
 
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout)
    throws IOException

对应的,可以查看一个具体实现的代码,WritableRpcEngine类的实现:
 public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout)
    throws IOException {    

    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
            factory, rpcTimeout));
    return new ProtocolProxy<T>(protocol, proxy, true);
}


真正的代理处理在InVoker类里实现(关于JDK的动态代理,可参看http://jimmee.iteye.com/admin/blogs/776820)
  
public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      long startTime = 0;
      if (LOG.isDebugEnabled()) {
        startTime = System.currentTimeMillis();
      }
		
      ObjectWritable value = (ObjectWritable)
      // 这里取得要调用的方法,参数列表,之后通过Client对象传递给服务器端
client.call(new Invocation(method, args), remoteId);
      if (LOG.isDebugEnabled()) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }


服务器端真正的实现,也在RpcEngine的一个具体实现里:
public Writable call(Class<?> protocol, Writable param, long receivedTime) 
    throws IOException {
		….
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);

        Method method = protocol.getMethod(call.getMethodName(),
                                           call.getParameterClasses());
        method.setAccessible(true);

        // Verify rpc version
        ….
        
        //Verify protocol version.
       ……

        long startTime = System.currentTimeMillis();
		  // 真正的调用
        Object value = method.invoke(instance, call.getParameters());
        int processingTime = (int) (System.currentTimeMillis() - startTime);
        int qTime = (int) (startTime-receivedTime);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Served: " + call.getMethodName() +
                    " queueTime= " + qTime +
                    " procesingTime= " + processingTime);
        }
        rpcMetrics.addRpcQueueTime(qTime);
        rpcMetrics.addRpcProcessingTime(processingTime);
        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
                                             processingTime);
        if (verbose) log("Return: "+value);

        return new ObjectWritable(method.getReturnType(), value);
		…..     
  }

一个简单的例子:
客户端和服务器端的协议及实现:
package cn.edu.jimmee;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
 * rpc的协议接口
 * @author jimmee
 */
public interface RpcProtocol extends VersionedProtocol {
	public BooleanWritable printMsg(IntWritable id, Text msg);
}

package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
 * rpc的协议实现接口
 * @author jimmee
 */
public class RpcProtocolImpl implements RpcProtocol {
	@Override
	public BooleanWritable printMsg(IntWritable id, Text msg) {
		System.out.println("id=" + id.get() + ", msg=" + msg.toString());	
		if (Math.random() < 0.5) {
			return new BooleanWritable(true);
		} else {
			return new BooleanWritable(false);
		}
	}

	@Override
	public long getProtocolVersion(String protocol,
            long clientVersion)
			throws IOException {
		return 0;
	}
}


服务器代码:
package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
/**
 * rpc的server
 * @author jimmee
 */
public class RpcServer {
	public static void main(String[] args) throws IOException {
		RpcProtocol instance = new RpcProtocolImpl();
		Configuration conf = new Configuration();
		Server server = RPC.getServer(instance, "127.0.0.1", 7777, conf);
		server.start();
	}
}

客户端的代码:
package cn.edu.jimmee;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
/**
 * rpc的client端的实现
 * @author jimmee
 */
public class RpcClient {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		RpcProtocol rpcClientImpl = (RpcProtocol) RPC.getProxy(RpcProtocol.class, 0, new InetSocketAddress("127.0.0.1", 7777), conf);
		for (int i = 0; i < 10; i++) {
			System.out.println(rpcClientImpl.printMsg(new IntWritable(i), new Text("hello" + i)));
		}
	}
}