hadoop学习札记(二)——RPC代理机制解析
RPC/代理机制分析
在hadoop的通信机制中不得不提到rpc通信机制,Client与namenode,namenode与datanode,datanode之间很多通信都是基于rpc机制。
提到RPC机制,其实根据我的理解,他就是一种代理,只不过它和我们平常的代理不同的地方是它是一种远程代理!首先介绍一下代理机制!
一、代理机制
提到代理机制又不得不提到Java中的一个非常重要的概念,反射机制!
那么什么是反射呢?
简单的说就是应用程序在运行是通过JavaReflection api取得一个类名的class信息。请看代码示例:
public interface people{ public void study(); } //内部类; public static class Student implements people{ private String name;//名字; private int age; //构造方法; public Student(){} //构造方法2; public Student(String name,int age){ this.name=name; this.age=age; } //set和get方法; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public void study(){ System.out.println("学生在学习"); } } //程序的主方法; public static void main(String[] args) { Student stu=new Student("陆亮",20); Class<? extends Student> tmp=stu.getClass(); String cName=tmp.getName(); System.out.println("类的名字是"+cName); try { Class c=Class.forName(cName); //得到类中的方法; java.lang.reflect.Method[] ms=c.getMethods(); for(java.lang.reflect.Method m:ms){ System.out.println("方法的名字是"+m.getName()); System.out.println("方法的返回值类型是"+ m.getReturnType().toString()); System.out.println("方法的参数类型是 " +m.getParameterTypes()); } //得到属性; java.lang.reflect.Field[] fields=c.getFields(); for(java.lang.reflect.Field f:fields){ System.out.println("参数类型是"+f.getType()); } //得到父接口 Class[] is=c.getInterfaces(); for(Class s:is){ System.out.println("父接口的名字是"+s.getName()); } } catch (ClassNotFoundException e) { e.printStackTrace(); } }
部分结果如下:
下面介绍代理机制:
其实代理机制我的理解就是一个对象代替另外一个对象实现某项功能!其中前一个对象我们称之为代理对象,后面一个对象才是我们的真实对象!
对于代理类我们必须实现InvocationHandler接口,必须实现它的invoke(Object proxy, Method method, Object[] args)方法;在这个方法中我们调用真实对象的方法。
public class TestProxy implements java.lang.reflect.InvocationHandler{ //被代理对象 private Object obj; //构造方法 public TestProxy(Object obj){ this.obj=obj; } /** * 得到被代理对象; * @param obj * @return */ public static Object getProxy(Object obj){ //return调用此方法; return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(), obj.getClass().getInterfaces(), new TestProxy(obj)); } /** * 调用此方法执行 */ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //结果参数; Object result; try{ System.out.println("代理对象"+proxy.getClass().getName()); System.out.println("代理方法"+method.getName()); //调用对象的实际方法; result=method.invoke(obj, args); } catch (InvocationTargetException e) { throw e.getTargetException(); } catch (Exception e) { throw new RuntimeException("代理 : " + e.getMessage()); } finally { System.out.println("代理结束:" + method.getName()); } return result; } }
真实的对象中的那个方法,
//构造方法重载;
public SumInt(Integer a,Integer b){
this.a=a;
this.b=b;
}
其调用关系如下:
二、RPC(远程方法调用)
RPC的本质也是代理,但是RPC是一种远程代理,在我们的代理类的实现方法中我们不是直接invoke调用真实类的方法,而是通过socket或是其他向server发送信息,server接受信息,调用方法然后返回结果:在客户端看来就好像真的调用了代理对象的方法一样!神奇吧。。。
请看示例,在这里我们实现了一个简单的演示,客户端发送一条消息给server,server端调用方法返回另外一条消息。
public class Client { //客户端和服务器建立socket连接 private java.net.Socket sc; //输入输出流; private java.io.InputStream ins; private java.io.OutputStream ous; //字符缓冲输出流和输入流 private java.io.BufferedReader br; private java.io.BufferedWriter bw; //代理对象; ProxyClient pc; //测试方法; public void test(){ System.out.println("开始测试"); //建立socket连接; try { sc=new Socket("localhost",9090); System.out.println("建立了连接!"); //得到输出流; ins=sc.getInputStream(); ous=sc.getOutputStream(); br=new java.io.BufferedReader(new java.io.InputStreamReader(ins)); bw=new java.io.BufferedWriter(new java.io.OutputStreamWriter(ous)); //构造代理对象; pc=new ProxyClient(br,bw); ifconfig ic=(ifconfig) java.lang.reflect.Proxy.newProxyInstance(ifconfig.class.getClassLoader(), new Class[] {ifconfig.class}, pc); String result=ic.sayWord("hello"); System.out.println("返回的结果是"+result); try { br.close(); bw.flush(); bw.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (UnknownHostException e) { System.out.println("连接错误!"); e.printStackTrace(); } catch (IOException e) { System.out.println("IO错误"); e.printStackTrace(); } } } 在代理类的invoke方法中我们发送信息,然后等待结果: //代理方法; public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //再次发送类名,方法名,参数列表; Object result; //写入类名; bw.write("cn.luliang.rpc.rpcServer.ProxyServer"+"\r\n"); //写入方法名; bw.write(method.getName()+"\r\n"); //写入参数名; for(Object obj:args){ if(obj instanceof String){//如果是String对象的示例 String str=(String)obj; bw.write(str+"\r\n"); } } bw.flush(); //阻塞式读取结果; result=br.readLine(); System.out.println("返回了结果"); return result; } 再来看看server端的实现代码: public void readMsg(){ String s=null; String result=""; try { //读取类名; String cName=br.readLine(); //读取方法名; String fName=br.readLine(); s=br.readLine(); Class c; ProxyServer ts; Method m; try { c=Class.forName(cName); //产生示例; try { ts=(ProxyServer) c.newInstance(); Method[] ms=c.getMethods(); for(Method mt:ms){ if(mt.getName().equals(fName)){ m=mt; try { result=(String) mt.invoke(ts, result); System.out.println("服务器返回结果"+result); bw.write(result+"\r\n"); bw.flush(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } } catch (IllegalAccessException e) { e.printStackTrace(); } } catch (ClassNotFoundException e) { System.out.println("类没有找到!"); e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } }
结果如下:
<!--EndFragment-->