(十一) Nepxion分布式RPC框架 - 负载均衡跟同步中心
Nepxion Thuder (QQ 1394997)发布在淘宝代码基地 http://code.taobao.org/p/Thunder/
-
loadbalance - 负载均衡
consistenthash - 一致性Hash算法
random - 随机算法
roundrobin - 轮询算法 - consistency - 同步中心
1. 负载均衡
负载均衡接口LoadBalanceExecutor.java,根据提供的接口名负载均衡到某一个ApplicationEntity(服务host和port)。代码如下:
public interface LoadBalanceExecutor extends ThunderDelegate { // 负载均衡 ApplicationEntity loadBalance(String interfaze) throws Exception; }负载均衡抽象类AbstractLoadBalanceExecutor.java。代码如下:
public abstract class AbstractLoadBalanceExecutor extends ThunderDelegateImpl implements LoadBalanceExecutor { private static final Logger LOG = LoggerFactory.getLogger(AbstractLoadBalanceExecutor.class); @Override public ApplicationEntity loadBalance(String interfaze) throws Exception { StrategyEntity strategyEntity = cacheContainer.getStrategyEntity(); LoadBalanceType loadBalanceType = strategyEntity.getLoadBalanceType(); Map<String, List<ApplicationEntity>> applicationEntityMap = cacheContainer.getApplicationEntityMap(); List<ApplicationEntity> applicationEntityList = applicationEntityMap.get(interfaze); if (CollectionUtils.isEmpty(applicationEntityList)) { throw new LoadBalanceException("Service instance [" + interfaze + "] can't be retrieved at Registry Center"); } ApplicationEntity applicationEntity = loadBalance(applicationEntityList, interfaze); LOG.info("{} - Loadbalance to host={}, port={}, service={}", loadBalanceType.getCriterion(), applicationEntity.getHost(), applicationEntity.getPort(), interfaze); return applicationEntity; } protected abstract ApplicationEntity loadBalance(List<ApplicationEntity> applicationEntityList, String interfaze) throws Exception; }
-
一致性Hash算法
一致性哈希算法是分布式系统中常用的算法。比如,一个分布式的存储系统,要将数据存储到具体的节点上,如果采用普通的hash方法,将数据映射到具体的节 点上,如key%N,key是数据的key,N是机器节点数,如果有一个机器加入或退出这个集群,则所有的数据映射都无效了,如果是持久化存储则要做数据 迁移,如果是分布式缓存,则其他缓存就失效了。
因此,引入了一致性哈希算法:把数据用hash函数(如MD5),映射到一个很大的空间里,如图所示。数据的存储时,先得到一个hash值,对应到这个环中的每个位置,如k1对应到了图中所示的位置,然后沿顺时针找到一个机器节点B,将k1存储到B这个节点中。
如果B节点宕机了,则B上的数据就会落到C节点上,如下图所示:
这样,只会影响C节点,对其他的节点A,D的数据不会造成影响。然而,这又会造成一个“雪崩”的情况,即C节点由于承担了B节点的数据,所以C节点的负载会变高,C节点很容易也宕机,这样依次下去,这样造成整个集群都挂了。
为此,引入了“虚拟节点”的概念:即把想象在这个环上有很多“虚拟节点”,数据的存储是沿着环的顺时针方向找一个虚拟节点,每个虚拟节点都会关联到一个真实节点,如下图所使用:
图中的A1、A2、B1、B2、C1、C2、D1、D2都是虚拟节点,机器A负载存储A1、A2的数据,机器B负载存储B1、B2的数据,机器C负载存储C1、C2的数据。由于这些虚拟节点数量很多,均匀分布,因此不会造成“雪崩”现象。
Thunder实现的一致性Hash算法,基于MemCache Spy的ketama源码改造而来的,具体实现不介绍了,可以参考它的源码。这里讲解把ketama应用于Thunder的一致性hash。代码如下:
由初始化的服务列表(ApplicationEntity)构造KetamaNodeLocator,算法是DefaultHashAlgorithm.KETAMA_HASH。当本地服务列表和远程服务列表不一致时,执行locator.updateLocator
locator.getPrimary(UUID.randomUUID().toString())是核心部分,用随机字符串去碰撞一致性Hash环里的节点,最终达到负载均衡public class ConsistentHashLoadBalanceExecutor extends AbstractLoadBalanceExecutor { private NodeLocator<ApplicationEntity> locator; @Override protected ApplicationEntity loadBalance(List<ApplicationEntity> applicationEntityList, String interfaze) throws Exception { if (locator == null) { locator = new KetamaNodeLocator<ApplicationEntity>(Lists.newCopyOnWriteArrayList(applicationEntityList), DefaultHashAlgorithm.KETAMA_HASH); } else { List<ApplicationEntity> entityList = locator.getAll(); if (!CollectionUtils.isEqualCollection(applicationEntityList, entityList)) { locator.updateLocator(Lists.newCopyOnWriteArrayList(applicationEntityList)); } } return locator.getPrimary(UUID.randomUUID().toString()); } }
-
随机算法
随机从一群服务中挑选一个可用服务返回。代码如下:public class RandomLoadBalanceExecutor extends AbstractLoadBalanceExecutor { @Override protected ApplicationEntity loadBalance(List<ApplicationEntity> applicationEntityList, String interfaze) throws Exception { int random = MathsUtil.random(applicationEntityList.size() - 1, 0); return applicationEntityList.get(random); } }
-
轮询算法
把一群有序服务,从头到尾方式访问,如果达到尾部后,再回到头部方法,周而复始的循环。代码如下:public class RoundRobinLoadBalanceExecutor extends AbstractLoadBalanceExecutor { private Map<String, AtomicLong> indexMap = Maps.newConcurrentMap(); @Override protected ApplicationEntity loadBalance(List<ApplicationEntity> applicationEntityList, String interfaze) { AtomicLong atomicLong = indexMap.get(interfaze); if (atomicLong == null) { atomicLong = new AtomicLong(0); indexMap.put(interfaze, atomicLong); } int index = (int) (atomicLong.getAndAdd(1) % applicationEntityList.size()); return applicationEntityList.get(Math.abs(index)); } }
2. 同步中心
同步中心接口ConsistencyExecutor.java,实现本地服务列表和远程注册中心保持同步一致。代码如下:
public interface ConsistencyExecutor extends ThunderDelegate { // 被动接受注册中心上下线事件,本地缓存与注册中心保持一致 void update(String interfaze, List<ApplicationEntity> applicationEntityList) throws Exception; // 本地上线,更新缓存 void online(String interfaze, ApplicationEntity applicationEntity) throws Exception; // 本地下线,更新缓存 void offline(String interfaze, ApplicationEntity applicationEntity) throws Exception; // 获得本地所有可用连接 List<ApplicationEntity> getApplicationEntityList(String interfaze) throws Exception; }接口中的update方法,将被注册中心的ZookeeperServiceInstancesWatcher.java所调用