通过使用Netty实现RPC

目标:通过使用Netty框架实现RPC(远程过程调用协议),技术储备为以后实现分布式服务框架做技术储备。在这里实现自定义协议主要实现远程方法调用。

技术分析:

    1.通过Java的反射技术我们可以获取对象的属性以及调用指定的方法所以,只要指定对象的名字以及所对应的方法名和参数值以及参数类型我们就可以实现动他的调用对象。

    2.通过Netty我们可以实现数据的NIO(非阻塞异步传输)高并发高效率传递。

    3.通过代理(JDK或CGLIb)来实现动态代理。

代码实现:

一 jdk动态代理实现

  思路:在这里考虑到代理方式的多样所以用抽象的代理工厂。

AbstractProxyFactory.java

public abstract class AbstractProxyFactory {

//    public static final String JDK_PROXY_NAME = "jdkProxy";
    public static final String JDK_PROXY_NAME = "com.jewel.proxy.JdkProxyFactory";

    public static AbstractProxyFactory create(String name) {
        AbstractProxyFactory apf = null;
        try {
            apf = (AbstractProxyFactory) Class.forName(name).newInstance();
        } catch (Exception e) {
            e.printStackTrace();
        } 
        /*if (name.equals(JDK_PROXY_NAME)) {
            return new JdkProxyFactory();
        }*/
        return apf;
    }

    public abstract Object proxy(String interfaceName, String version, AbstractLoadCluster abstractLoadCluster);

    public abstract Object proxy(String interfaceName, String version);
}

实现jdk动态代理

JdkProxyFactory.java

public class JdkProxyFactory extends AbstractProxyFactory {
    public Object proxy(String interfaceName, String version,
            AbstractLoadCluster abstractLoadCluster) {
        try {
            return Proxy.newProxyInstance(Class.forName(interfaceName)
                    .getClassLoader(), new Class<?>[] { Class
                    .forName(interfaceName) }, new JewelInvocationHandler(
                    version, interfaceName, abstractLoadCluster));
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Object proxy(String interfaceName, String version) {
        try {
            return Proxy.newProxyInstance(Class.forName(interfaceName)
                    .getClassLoader(), new Class<?>[] { Class
                    .forName(interfaceName) }, new JewelInvocationHandler(
                    version, interfaceName));
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}

二 netty传输搭建

  思路:通过使用自定义事件来监听数据传输层状态,通过使用protostuff来对数据编解码。

2.1 编解码

  protostuff帮助类:SerializationUtil.java

public class SerializationUtil {

	@SuppressWarnings("unchecked")
	public static <T> byte[] serialize(Object obj, Class<T> clazz) {
		Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(clazz);
		LinkedBuffer buffer = LinkedBuffer.allocate(4096);
		byte[] protostuff = null;
		try {
			protostuff = ProtostuffIOUtil.toByteArray((T)obj, schema, buffer);
		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			buffer.clear();
		}
		return protostuff;
	}

	public static <T> T deserialize(byte[] buff, Class<T> clazz) {
		T object = null;
		try {
			object = (T) clazz.newInstance();
			Schema<T> schema = RuntimeSchema.getSchema(clazz);
			ProtostuffIOUtil.mergeFrom(buff, object, schema);
		} catch (InstantiationException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IllegalAccessException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		return object;
	}

  编码:RPCEncoder.java

public class RPCEncoder extends MessageToByteEncoder {

	private Class<?> clazz;

	public RPCEncoder(Class<?> clazz) {
		this.clazz = clazz;
	}
	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
		if (clazz.isInstance(msg)) {
			// 序列化
			byte[] buff = SerializationUtil.serialize(msg, clazz);
			out.writeInt(buff.length);
			out.writeBytes(buff);
		}

	}
}

  解码:RPCDecoder.java

public class RPCDecoder extends ByteToMessageDecoder {
	private Class<?> clazz;

	public RPCDecoder(Class<?> clazz) {
		this.clazz = clazz;
	}

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		if (in.readableBytes() < 4) {
			return;
		}
		in.markReaderIndex();
		int dataLength = in.readInt();
		if (dataLength < 0) {
			ctx.close();
		}
		if (in.readableBytes() < dataLength) {
			in.resetReaderIndex();
		}
		byte[] data = new byte[dataLength];
		in.readBytes(data);
		Object object = null;
		//反序列化
		object = SerializationUtil.deserialize(data, clazz);
		out.add(object);
	}
}

  2.2 传输层状态事件监听

    StateEvent.java

public class StateEvent {

	public String clientId;// 客户端编号,和jewel协议层一致

	private boolean isActice = true;// 通道是可用

	private boolean isClose = false;// 连接关闭不可用

	private boolean isValid = false;// 通道是空闲的

	public boolean isClose() {
		return isClose;
	}

	public void setValid(boolean isValid) {
		this.isValid = isValid;
	}

	public boolean isValid() {
		return this.isValid;
	}

	public void setClose(boolean isClose) {
		this.isClose = isClose;
	}

	private boolean reConnect;

	public void setReConnect(boolean reConnect) {
		this.reConnect = reConnect;
	}

	public void setClientId(String clientId) {
		this.clientId = clientId;
	}

	public String getClientId() {
		return clientId;
	}

	public boolean isReConnect() {
		return reConnect;
	}

	public boolean isActive() {
		return isActice;
	}

	public void setActice(boolean isActice) {
		this.isActice = isActice;
	}

	@Override
	public String toString() {
		return "StateEvent [clientId=" + clientId + ", isActice=" + isActice + ", isClose=" + isClose + ", isVal
				+ reConnect + "]";
	}

}

  ProtocolActiveListener.java

/**
 * 
 * @Description: netty客户端连接状态监听器类
 * @author maybo
 * @date 2016年5月13日 下午3:12:24
 *
 */
public interface ProtocolActiveListener {
	/**
	 * 
	 * @Description: TODO(事件方法,客户端连接状态)
	 * @param @param stateEvent 状态类
	 * @return void 返回类型
	 */
	public void clientStateEvent(StateEvent stateEvent);

}

2.3 Server层

RPCServer.java

public class RPCServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(RPCServer.class);

    public static  String IP = "127.0.0.1";

    public static  int PORT = 9998;

    public static int READ_IDLE_TIME = 60;// 读空闲时间

    public static int WRITE_IDLE_TIME = 30;// 写空闲时间

    public static int ALL_IDLE_TIME = 10;// 读写空闲时间

    public static int TIMEOUT_SECONDS = 300;//

    /** 用于分配处理业务线程的线程组个数 */
    protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; // 默认
    /** 业务出现线程大小 */
    protected static final int BIZTHREADSIZE = 4;

    /*
     * NioEventLoopGroup实际上就是个线程池,
     * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,
     * 每一个NioEventLoop负责处理m个Channel,
     * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel
     */
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

    public static void run() throws Exception {
        LOGGER.info("----------------------------------------开启服务端连接--IP:"+IP+PORT+"------------------------");
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        b.channel(NioServerSocketChannel.class);
        b.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME, WRITE_IDLE_TIME, ALL_IDLE_TIME, TimeUnit.SECONDS));
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                pipeline.addLast(new RPCDecoder(RPCRequest.class));
                pipeline.addLast(new RPCEncoder(RPCResponse.class));
                pipeline.addLast(new RPCServerHandler());

            }
        });
        b.bind(IP, PORT).sync();
        LOGGER.info("----------------------------------------开启服务端连接成功-------------------------");
    }

    protected static void shutdown() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        RPCServer.run();

    }
}

RPCServerHandler.java

public class RPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {

    private static FutureTaskService taskService = FutureTaskService.newInstance();

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, RPCRequest request) {
        if (request.getType() != 1) {// 调用服务
            try {
                logger.info("Channel"+ctx.channel().id().asShortText()+"-------------"+"From:"+"jewel:"+ctx.channel().localAddress()+"/"+request.getInterfaceName()+"/"+request.getMethodName());
                taskService.invoke(request, ctx.channel().id().asShortText(), ctx);
            } catch (InterruptedException e) {
                RPCResponse response = new RPCResponse();
                response.setDate(new Date());
                response.setError(new RuntimeException());
                response.setResponseId(request.getRequestId());
                response.setState("505");
                if (ctx.channel().isActive()) {
                    ctx.channel().writeAndFlush(response);
                }
                e.printStackTrace();
            } catch (ExecutionException e) {
                RPCResponse response = new RPCResponse();
                response.setDate(new Date());
                response.setError(new RuntimeException());
                response.setResponseId(request.getRequestId());
                response.setState("510");
                if (ctx.channel().isActive()) {
                    ctx.channel().writeAndFlush(response);
                }
                e.printStackTrace();
            }
        }
    }

    /**
     * 一段时间未进行读写操作 回调
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // TODO Auto-generated method stub
        super.userEventTriggered(ctx, evt);

        if (evt instanceof IdleStateEvent) {

            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {// 超时关闭通道
                // 超时关闭channel
                ctx.close();

            } else if (event.state().equals(IdleState.WRITER_IDLE)) {
                // 写超时
            } else if (event.state().equals(IdleState.ALL_IDLE)) {// 心跳检测
                // 未进行读写
                RPCResponse response = new RPCResponse();
                response.setType(1);
                if (ctx.channel().isActive()) {
                    ctx.channel().writeAndFlush(response);
                }

            }

        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        super.exceptionCaught(ctx, e);
        if (e instanceof IOException) {
            ctx.channel().close();
        }
        e.printStackTrace();
    }

}

2.4 Client层

RPCClient.java

public class RPCClient {

	private Logger logger = LoggerFactory.getLogger(this.getClass());

	private String id;// 编号

	public static String HOST = "127.0.0.1";

	public static int PORT = 9999;

	public static int TIMEOUT__SECONDS = 120;// 超时时间

	public static int CONNECT_TIMEOUT_SECONDS = 3;// 连接超时时间

	public static int RECONN_TIME_SECONDS = 3;// 重新建立连接时间

	private ChannelFuture channelFuture;

	private int connectAmount = 0;

	private RPCClient client = null;

	private Bootstrap bootstrap = null;

	private EventLoopGroup group;

	public static Map<String, Object> responseMap = new ConcurrentHashMap<String, Object>();

	public ProtocolActiveListener activeListener = null;

	public ProtocolActiveListener getActiveListener() {
		return activeListener;
	}

	public RPCClient(String id) {
		this.id = id;
	}

	/**
	 * 初始化Bootstrap
	 * 
	 * @return
	 */
	private Bootstrap getBootstrap() {
		group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_SECONDS * 1000);
		b.group(group).channel(NioSocketChannel.class);
		b.handler(new ChannelInitializer<Channel>() {
			@Override
			protected void initChannel(Channel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));// 防止丢包
				pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
				pipeline.addLast(new RPCEncoder(RPCRequest.class));// 编码
				pipeline.addLast(new RPCDecoder(RPCResponse.class));// 转码
				pipeline.addLast(new RPCClientHandler(responseMap, client));// 处理
			}
		});
		b.option(ChannelOption.SO_KEEPALIVE, true);
		return b;
	}

	/**
	 * 
	 * @Description: TODO(初始化建立连接)
	 * @param @param id
	 * @param @return 设定文件
	 * @return RPCClient 返回类型
	 */
	public static RPCClient init(String id) {
		RPCClient client = new RPCClient(id);
		client.client = client;
		client.bootstrap = client.getBootstrap();
		return client;
	}

	public RPCClient getClient() {
		return client;
	}

	public boolean isActive() {
		return client.getChannelFuture().channel().isActive();
	}

	public RPCClient doConnection() throws InterruptedException {
		client.connection(HOST, PORT);
		return client;
	}

	/**
	 * 
	 * @Description: TODO(连接)
	 * @param @param host
	 * @param @param port
	 * @param @return 设定文件
	 * @return RPCClient 返回类型
	 * @throws InterruptedException 
	 */
	public RPCClient connection(String host, int port) throws InterruptedException {
		
			channelFuture = client.bootstrap.connect(host, port).sync();
			channelFuture.addListener(new ChannelFutureListener() {

				public void operationComplete(ChannelFuture future) throws Exception {// 重连机制
					if (future.isSuccess()) {
						// 连接成功连接指数为0
						connectAmount = 0;

						if (null != activeListener) {// 接听客户状态
							StateEvent stateEvent = new StateEvent();
							stateEvent.setActice(true);
							stateEvent.setClientId(id);
							stateEvent.setClose(false);
							stateEvent.setReConnect(true);
							activeListener.clientStateEvent(stateEvent);
						}
						logger.info("客户端连接成功。");

					} else {
						connectAmount++;
						if (connectAmount == 3) {// 连接数大于3次停止连接
							connectAmount = 0;
							shutdown();// 关闭连接
						} else {
							future.channel().eventLoop().schedule(new Runnable() {
								public void run() {
									try {
										doConnection();
									} catch (InterruptedException e) {
										e.printStackTrace();
										logger.error("------------重新连接服务器失败--------");
									}
								}
							}, RECONN_TIME_SECONDS, TimeUnit.SECONDS);

							if (null != activeListener) {// 接听客户状态
								StateEvent stateEvent = new StateEvent();
								stateEvent.setActice(false);
								stateEvent.setClientId(id);
								stateEvent.setClose(false);
								stateEvent.setReConnect(false);
								activeListener.clientStateEvent(stateEvent);
							}
						}
					}
				}
			});
		
		return client;
	}

	public ChannelFuture getChannelFuture() {
		return channelFuture;
	}

	/**
	 * 
	 * @Description: TODO(发送消息)
	 * @param @param request
	 * @param @return
	 * @param @throws Exception 设定文件
	 * @return RPCResponse 返回类型
	 */
	public RPCResponse sendMsg(RPCRequest request) throws Exception {
		if (null == request) {
			throw new NullPointerException();
		}
		try {
			RPCResponse response = new RPCResponse();
			responseMap.put(request.getRequestId(), response);

			if (channelFuture.channel().isOpen() && channelFuture.channel().isActive()) {
				if (null != this.activeListener) {// 发送监听通道
					StateEvent stateEvent = new StateEvent();
					stateEvent.setActice(true);
					stateEvent.setClientId(this.id);
					stateEvent.setClose(false);
					stateEvent.setReConnect(false);
					stateEvent.setValid(false);
					activeListener.clientStateEvent(stateEvent);
				}
				channelFuture.channel().writeAndFlush(request);
				synchronized (response) {
					if (null == response.getResponseId()) {
						response.wait(TIMEOUT__SECONDS * 1000);
					} else {
						response.notifyAll();
					}
					if (null == response.getResponseId()) {
						channelFuture.channel().close();
						response = new RPCResponse();
						response.setDate(new Date());
						response.setResponseId(request.getRequestId());
						response.setState("305");
						response.setError(new RequestTimeOutException("请求超时"));
						this.shutdown();// 关闭连接

					} else {
						if (null != this.activeListener) {// 发送监听通道
							StateEvent stateEvent = new StateEvent();
							stateEvent.setActice(true);
							stateEvent.setClientId(this.id);
							stateEvent.setClose(false);
							stateEvent.setReConnect(false);
							stateEvent.setValid(true);
							activeListener.clientStateEvent(stateEvent);
						}
						response = (RPCResponse) responseMap.get(request.getRequestId());
						System.out.println(response.toString());
						responseMap.remove(request.getRequestId());
					}
				}

			}

			return response;
		} finally {

		}
	}

	public void shutdown() throws InterruptedException {
		channelFuture.channel().close();
		group.shutdownGracefully();

		if (null != activeListener) {// 接听客户状态
			StateEvent stateEvent = new StateEvent();
			stateEvent.setActice(false);
			stateEvent.setClientId(this.id);
			stateEvent.setClose(true);
			activeListener.clientStateEvent(stateEvent);
		}
		logger.info("客户端关闭连接。");
	}

	public void addActiveListener(ProtocolActiveListener listener) {
		this.activeListener = listener;
	}

	public String id() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

}

RPCClientHandler.java

public class RPCClientHandler extends SimpleChannelInboundHandler<RPCResponse> {

	private RPCClient client = null;

	private Logger logger = LoggerFactory.getLogger(RPCClientHandler.class);

	public void setClient(RPCClient client) {
		this.client = client;
	}

	private Map<String, Object> responseMap;

	public RPCClientHandler(Map<String, Object> responseMap) {
		this.responseMap = responseMap;
	}

	public RPCClientHandler(Map<String, Object> responseMap, RPCClient client) {
		this.responseMap = responseMap;
		this.client = client;
	}

	@Override
	protected void messageReceived(ChannelHandlerContext ctx, RPCResponse response) throws Exception {
		if (response.getType() == 1) {// 发送心跳回文

			if (null != this.client.getActiveListener()) {// 监听通道空闲
				StateEvent stateEvent = new StateEvent();
				stateEvent.setActice(true);
				stateEvent.setClientId(this.client.id());
				stateEvent.setClose(false);
				stateEvent.setReConnect(false);
				stateEvent.setValid(true);
				this.client.getActiveListener().clientStateEvent(stateEvent);
			}

			RPCRequest request = new RPCRequest();
			request.setType(1);
			ctx.channel().writeAndFlush(request);
		} else {// 接收消息
			RPCResponse rpcResponse = (RPCResponse) responseMap.get(response.getResponseId());
			synchronized (rpcResponse) {
				BeanUtils.copyProperties(rpcResponse, response);
				rpcResponse.notifyAll();
			}
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
		e.printStackTrace();
		super.exceptionCaught(ctx, e);
		if (e instanceof IOException) {// 发生异常关闭通道
			ctx.channel().close();
			client.shutdown();
		}
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		super.channelInactive(ctx);
		// 重新连接服务器
		ctx.channel().eventLoop().schedule(new Runnable() {
			public void run() {
				try {
					client.doConnection();
				} catch (InterruptedException e) {
					logger.error("断开连接后重新建立连接出现异常."+e.getMessage());
					e.printStackTrace();
				}
			}
		}, RPCClient.RECONN_TIME_SECONDS, TimeUnit.SECONDS);
	}

}

三 反射实现方法调用

RequestMethodInvoker.java
public class RequestMethodInvoker {

    public RPCResponse invoke(RPCRequest request) {
        if (null == request) {
            throw new NullPointerException();
        }

        RPCResponse response = new RPCResponse();
        response.setResponseId(request.getRequestId());
        response.setDate(new Date());

        String interfaceName = request.getInterfaceName();// 接口名字
        if (null == interfaceName || interfaceName.length() <= 0) {// 接口名为空
            response.setState("400");
            response.setError(new NullPointerException("接口名字不可以为空"));
        } else {
            Object object = null;
            if (null != request.getVersion() && request.getVersion().length() > 0) {// 存在版本号
                object = ContextBeanUtil.getBean(interfaceName, request.getVersion());// 获取服务对象
            } else {
                object = ContextBeanUtil.getBean(interfaceName);// 获取服务对象
            }
            Object result;
            try {
                if (null == object) {
                    response.setError(new NotFindServiceException("没有找到服务实现类异常"));
                    response.setState("405");
                } else {
                    result = MethodInvokeUtil.invoke(object, request.getParams(), request.getMethodName(), request.getParamTypes());
                    response.setResult(result);
                    response.setState("200");
                }
            } catch (IllegalAccessException e) {
                response.setState("410");
                response.setError(e);
            } catch (IllegalArgumentException e) {
                response.setState("415");
                response.setError(e);
            } catch (InvocationTargetException e) {
                response.setState("420");
                response.setError(e);
            } catch (NoSuchMethodException e) {
                response.setState("425");
                response.setError(e);
            } catch (SecurityException e) {
                response.setState("430");
                response.setError(e);
            } catch (ClassNotFoundException e) {
                response.setState("435");
                response.setError(e);
            }

        }
        return response;
    }

}

总结:通过以上方式实现RPC功能。