由浅入深了解Thrift之客户端连接池化
一、问题描述
在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中,我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中,RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中呢?实现thrift服务化前,我们先想想这几个问题:服务注册、服务发现、服务健康检测、服务“Load Balance”、隐藏client和server端的交互细节、服务调用端的对象池化。
- 服务的注册、发现和健康检测,我们使用zookeeper可以很好的解决
- 服务“Load Balance”,我们可以使用简单的算法“权重+随机”,当然也可以使用成熟复杂的算法
- 服务调用端的对象池化,我们可以使用common pool,使用简单又可以满足我们的需求
二、实现思路
1、thrift server端启动时,每个实例向zk集群以临时节点方式注册(这样,遍历zk上/server下有多少个临时节点就知道有哪些server实例)
thrift server端可以单机多端口多实例或多机部署多实例方式运行。
2、服务调用方实现一个连接池,连接池初始化时,通过zk将在线的server实例信息同步到本地并缓存,同时监听zk下的节点变化。
3、服务调用方与Server通讯时,从连接池中取一个可用的连接,用它实现RPC调用。
三、具体实现
1、thrift server端
thrift server端,向zk中注册server address
package com.wy.thriftpool.commzkpool; import java.lang.instrument.IllegalClassFormatException; import java.lang.reflect.Constructor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TBinaryProtocol.Factory; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.springframework.beans.factory.InitializingBean; import com.wy.thrift.service.UserService.Processor; import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter; import com.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer; import com.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer; /** * thrift server端,向zk中注册server address * * @author wy * */ public class ThriftServiceServerFactory implements InitializingBean { // thrift server 服务端口 private Integer port; // default 权重 private Integer priority = 1; // service实现类 private Object service; // thrift server 注册路径 private String configPath; private ThriftServerIpTransfer ipTransfer; // thrift server注册类 private ThriftServerAddressReporter addressReporter; // thrift server开启服务 private ServerThread serverThread; @Override public void afterPropertiesSet() throws Exception { if (ipTransfer == null) { ipTransfer = new LocalNetworkIpTransfer(); } String ip = ipTransfer.getIp(); if (ip == null) { throw new NullPointerException("cant find server ip..."); } String hostname = ip + ":" + port + ":" + priority; Class<? 
extends Object> serviceClass = service.getClass(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class<?>[] interfaces = serviceClass.getInterfaces(); if (interfaces.length == 0) { throw new IllegalClassFormatException("service-class should implements Iface"); } // reflect,load "Processor"; Processor<?> processor = null; for (Class<?> clazz : interfaces) { String cname = clazz.getSimpleName(); if (!cname.equals("Iface")) { continue; } String pname = clazz.getEnclosingClass().getName() + "$Processor"; try { Class<?> pclass = classLoader.loadClass(pname); if (!pclass.isAssignableFrom(Processor.class)) { continue; } Constructor<?> constructor = pclass.getConstructor(clazz); processor = (Processor<?>) constructor.newInstance(service); break; } catch (Exception e) { // TODO } } if (processor == null) { throw new IllegalClassFormatException("service-class should implements Iface"); } // 需要单独的线程,因为serve方法是阻塞的. serverThread = new ServerThread(processor, port); serverThread.start(); // report if (addressReporter != null) { addressReporter.report(configPath, hostname); } } class ServerThread extends Thread { private TServer server; ServerThread(Processor<?> processor, int port) throws Exception { // 设置传输通道 TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port); // 设置二进制协议 Factory protocolFactory = new TBinaryProtocol.Factory(); TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport); tArgs.processor(processor); tArgs.transportFactory(new TFramedTransport.Factory()); tArgs.protocolFactory(protocolFactory); int num = Runtime.getRuntime().availableProcessors() * 2 + 1; tArgs.selectorThreads(num); tArgs.workerThreads(num * 10); // 网络服务模型 server = new TThreadedSelectorServer(tArgs); } @Override public void run() { try { server.serve(); } catch (Exception e) { //TODO } } public void stopServer() { server.stop(); } } public void close() { serverThread.stopServer(); } public void setService(Object 
service) { this.service = service; } public void setPriority(Integer priority) { this.priority = priority; } public void setPort(Integer port) { this.port = port; } public void setIpTransfer(ThriftServerIpTransfer ipTransfer) { this.ipTransfer = ipTransfer; } public void setAddressReporter(ThriftServerAddressReporter addressReporter) { this.addressReporter = addressReporter; } public void setConfigPath(String configPath) { this.configPath = configPath; } }
thrift server address注册到zk
package com.wy.thriftpool.commzkpool.support.impl;

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;

import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;

/**
 * Registers a thrift server address in ZooKeeper.
 *
 * <p>The address is written as an EPHEMERAL_SEQUENTIAL child ("i_&lt;seq&gt;") of the
 * service path, so the node disappears automatically when this server's session ends
 * and clients can discover live instances by listing the children.
 *
 * @author wy
 */
public class DynamicAddressReporter implements ThriftServerAddressReporter {

    private CuratorFramework zookeeper;

    public DynamicAddressReporter() {
    }

    public DynamicAddressReporter(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    public void setZookeeper(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    /**
     * Publishes {@code address} under {@code service} as an ephemeral sequential node.
     * Lazily starts the Curator client on first use.
     *
     * @param service zk path of the service (parents created if needed)
     * @param address server address payload, stored UTF-8 encoded
     * @throws Exception on any zk/Curator failure
     */
    @Override
    public void report(String service, String address) throws Exception {
        if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
            zookeeper.start();
            zookeeper.newNamespaceAwareEnsurePath(service);
        }
        zookeeper.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                // StandardCharsets.UTF_8 instead of the "utf-8" string name: no checked
                // UnsupportedEncodingException path and explicit, constant charset.
                .forPath(service + "/i_", address.getBytes(StandardCharsets.UTF_8));
    }

    /** Closes the underlying Curator client (ephemeral nodes are then removed by zk). */
    public void close() {
        zookeeper.close();
    }
}