hadoop_rpc之RPC(四)
hadoop_rpc之RPC(4)
有了Client,有了Server,那整个过程怎么运行起来?
先说一下基本原理:
上面说的这些,都全部在hadoop的这个RPC里进行了实现。
客户端的主要代理实现方法如下:
其中是调用RpcEngine的下面这个接口方法来进行实现的:
对应的,可以查看一个具体实现的代码,WritableRpcEngine类的实现:
真正的代理处理在InVoker类里实现(关于JDK的动态代理,可参看http://jimmee.iteye.com/admin/blogs/776820)
服务器端真正的实现,也在RpcEngine的一个具体实现里:
一个简单的例子:
客户端和服务器端的协议及实现:
服务器代码:
客户端的代码:
有了Client,有了Server,那整个过程怎么运行起来?
先说一下基本原理:
- 1. 首先客户端和服务器端之间要有一个协议,这里的协议就是以java接口类的方式暴露出来的
- 2. 虽然Client类和Server类之间已经具有通信的能力,也有了协议,那么一个真正的客户端要调用服务器端rpc调用的实现,只需要解决参数及具体的调用实现两个问题即可
- 3. 客户端要做的,就是要将参数(这个一般称为存根)通过网络传递到服务器端。这个自然而然想到使用代理模式,因为Client已经具备网络通信的能力,只要通过代理,实现获取参数进行传输即可,为什么不在Client这里实现参数的获取,如果这样的话,就违反了单一职责的原则,且扩展性不行,总不能一个客户端的调用实现一个特定的Client类吧。因此,将Client的功能单一独立出来,只负责将参数通过网络传递到服务器端
- 4. 服务器要做的工作,只需要进行调用的真正的实现即可,当然了, 最后需要能够返回正确的结果。
上面说的这些,都全部在hadoop的这个RPC里进行了实现。
客户端的主要代理实现方法如下:
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout); }
其中是调用RpcEngine的下面这个接口方法来进行实现的:
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException
对应的,可以查看一个具体实现的代码,WritableRpcEngine类的实现:
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); return new ProtocolProxy<T>(protocol, proxy, true); }
真正的代理处理在InVoker类里实现(关于JDK的动态代理,可参看http://jimmee.iteye.com/admin/blogs/776820)
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { long startTime = 0; if (LOG.isDebugEnabled()) { startTime = System.currentTimeMillis(); } ObjectWritable value = (ObjectWritable) // 这里取得要调用的方法,参数列表,之后通过Client对象传递给服务器端 client.call(new Invocation(method, args), remoteId); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); }
服务器端真正的实现,也在RpcEngine的一个具体实现里:
public Writable call(Class<?> protocol, Writable param, long receivedTime) throws IOException { …. Invocation call = (Invocation)param; if (verbose) log("Call: " + call); Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); // Verify rpc version …. //Verify protocol version. …… long startTime = System.currentTimeMillis(); // 真正的调用 Object value = method.invoke(instance, call.getParameters()); int processingTime = (int) (System.currentTimeMillis() - startTime); int qTime = (int) (startTime-receivedTime); if (LOG.isDebugEnabled()) { LOG.debug("Served: " + call.getMethodName() + " queueTime= " + qTime + " procesingTime= " + processingTime); } rpcMetrics.addRpcQueueTime(qTime); rpcMetrics.addRpcProcessingTime(processingTime); rpcDetailedMetrics.addProcessingTime(call.getMethodName(), processingTime); if (verbose) log("Return: "+value); return new ObjectWritable(method.getReturnType(), value); ….. }
一个简单的例子:
客户端和服务器端的协议及实现:
package cn.edu.jimmee; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol; /** * rpc的协议接口 * @author jimmee */ public interface RpcProtocol extends VersionedProtocol { public BooleanWritable printMsg(IntWritable id, Text msg); }
package cn.edu.jimmee; import java.io.IOException; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; /** * rpc的协议实现接口 * @author jimmee */ public class RpcProtocolImpl implements RpcProtocol { @Override public BooleanWritable printMsg(IntWritable id, Text msg) { System.out.println("id=" + id.get() + ", msg=" + msg.toString()); if (Math.random() < 0.5) { return new BooleanWritable(true); } else { return new BooleanWritable(false); } } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return 0; } }
服务器代码:
package cn.edu.jimmee; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; /** * rpc的server * @author jimmee */ public class RpcServer { public static void main(String[] args) throws IOException { RpcProtocol instance = new RpcProtocolImpl(); Configuration conf = new Configuration(); Server server = RPC.getServer(instance, "127.0.0.1", 7777, conf); server.start(); } }
客户端的代码:
package cn.edu.jimmee; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; /** * rpc的client端的实现 * @author jimmee */ public class RpcClient { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); RpcProtocol rpcClientImpl = (RpcProtocol) RPC.getProxy(RpcProtocol.class, 0, new InetSocketAddress("127.0.0.1", 7777), conf); for (int i = 0; i < 10; i++) { System.out.println(rpcClientImpl.printMsg(new IntWritable(i), new Text("hello" + i))); } } }