关于高并发写数据的性能优化提拔的思考和实践
【背景】由于公司业务的需要,需要开发一个消息转发服务器,总体需求大致为:要求每秒的接入能力不低于1000,需要做数据统计(重点),数据不能缓存,尽可能快的转发,延时尽可能的低,可以允许丢失一些消息。
按照设计规格,每秒产生的统计数据不低于1000,这些数据需要入库,提供给报表做分析。因此,对统计数据t_statistics这表来说,并发写的鸭梨很大。由于要求尽可能的低延时,因此,每一个服务接入线程单独的存储统计数据也不现实:使用数据库线程池或者单线程直接存储都会对延时(需要对请求端有应答)有较大影响。 大致的数据流图如下
数据库使用的是mysql5.5 最终部署OS为linux2.6,对于mysql,经过测试,单线程单表每秒写入1300是比较稳定的速率,因此,基于要高于设计规格的思想,接入速率也要高于1000。那么问题来了-----对于每秒产生1000条统计数据应该如何插入数据库呢?每个服务线程中单独插入肯定不现实,因为这要维持一个庞大的数据库线程池,或者排队,带来的是延迟。
既然接入速率和存储速率能够匹配,因此,换个角度考虑的话,这就是个很基本的 生产者---消费者模型,上文中已经提到,单个线程的写入能力已经能达到要求,因此,重点是协调 N---1 模型的并发竞争。
基于N---1 的生产者---消费者模型,或者消费者比较少的模型,一个很重要的点在于如何通知消费者消费数据。太频繁的通知将带来消费低效,毕竟线程切换是需要付出代价的,因此,批次的概念在这里被很好的运用了。
基于一个批次的存储,也就是当生产者发现一个批次的数据达到一定的阀值时候,通知消费者来消费当前这个批次,问题又来了:当消费者消费某个批次的数据时候,生产者最新生产的数据应该怎么办呢,如果消费者---生产者基于共享队列(链表)的话,将会频繁的上锁、解锁和通知,这对编程来讲也带来复杂度。
实际上,上文中已经提到,当一个批次满的时候,就应该通知消费者写数据库,那么在消费者消费数据的空档期,新产生的数据仍然应该能够快速的消费掉,至少不能让消费者多等,那么是否能再开辟一个批次呢?
OK,N---1 的消费者--生产者模型基本出来了,基于多个链表的缓冲区,消费者集中向一个链表中写数据,当当前链表数据满时,通知消费者消费,后面的消费者开始切换链表,使用新的空的链表来存储数据。
在个人开发的消息转发服务器中,使用了6个List,阀值配置的是1w,也是就是当某个List的size达到1w的时候,就开始切换,聪明的你也许很快就想到了,瓶颈会再次出现在 List 切换的时候,实际上这个可以不算是问题,因为使用了多个List,在特定的时候哪些List是可用的,哪些List是消费者应该消费的,这些需要建立索引, 建立索引的好处是公平、避免单个队列数据过大和通知遗漏。
基于以上一些分析,个人写了如下的一点代码,使用了泛型,可通用。
package com.rockton.gps.router.concurrent; import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * * @author taoyu * * @param <T> */ public class ChunkManager<T> implements Runnable { Log log = LogFactory.getLog(ChunkManager.class); int chunkNum; int chunkSize; List<Chunk<T>> chunkList;// 数据块链表 ChunkConsumer<T> consumer;// 实际的消费者 Object lock = new Object();// 数据块索引切换锁 volatile boolean isStop; final Queue<Integer> fullIndex = new ConcurrentLinkedQueue<Integer>(); final Queue<Integer> emptyIndexs = new ConcurrentLinkedQueue<Integer>(); public ChunkManager(int chunkNum, int chunkSize) { this.chunkNum = chunkNum; this.chunkSize = chunkSize; chunkList = new ArrayList<Chunk<T>>(this.chunkNum); for (int i = 0; i < this.chunkNum; i++) { Chunk<T> c = new Chunk<T>(); c.setLock(false); chunkList.add(c); emptyIndexs.add(Integer.valueOf(i)); } } public void setChunkConsumer(ChunkConsumer<T> consumer) { this.consumer = consumer; } public void start() { if (null == consumer) { throw new IllegalStateException("ChunkConsumer could not be null"); } Thread t = new Thread(this); t.setDaemon(true); t.setName("T_ChunkManager"); t.start(); } public void add(T e) { Integer index = null; while ((index = emptyIndexs.peek()) == null) ; chunkList.get(index).add(e);// 线存储再检查,允许小范围的写入并发 if (chunkList.get(index).size() >= chunkSize) { // 这一步很重要,在临界状态下可以避免很多的锁定操作 if (fullIndex.contains(index)) { return; } synchronized (lock) { if (fullIndex.contains(index)) { return; } emptyIndexs.poll(); fullIndex.offer(index); lock.notify(); } } } @Override public void run() { while (!isStop) { try { doConsume(); } catch (Throwable e) { log.error(e.getMessage(), e); } } } private void doConsume() throws Exception { synchronized (lock) { while (fullIndex.isEmpty()) { lock.notify(); lock.wait(); if (isStop) { return; } } } // 如下的代码应该在synchronized之外,很多人习惯于放在里面,这样会导致锁定时间过长 Integer index = null; while ((index = fullIndex.peek()) != null) { Chunk<T> c = chunkList.get(index); log.info("Persistence the chunk , record is : " + c.size()); try { consumer.consume(c.getList()); } catch (Exception e) { log.error(e.getMessage(), e); } finally { c.getList().clear(); fullIndex.poll();// emptyIndexs.offer(index); } log.info("Persistence the chunk over"); } } public void exitAndFlush() { synchronized (lock) { isStop = Boolean.TRUE; lock.notifyAll(); } for (int i = 0; i < chunkList.size(); i++) { List<T> datas = chunkList.get(i).getList(); synchronized (datas) { consumer.consume(datas); } } } }
如下是spring的配置
<bean id="chunkManager" class="com.rockton.gps.router.concurrent.ChunkManager"> <constructor-arg index="0" value="6" /> <constructor-arg index="1" value="10000" /> </bean>
经过严密的测试,接入速率为 1.3K---1.4K 的时候,数据库操作比较稳定,如下图
上图是对转发数据的统计做的一个简单图表,实际上由于不能做到发送速率稳定,因此上图的数据会有一些波动,但是没有丢包。
也许,你会认为,如果请求数要高于1.4,那么数据库瓶颈就出来了呀,对,你可以考虑增加一个消费线程,但是这样做存储效率会提升但是不会翻番的。对于一个系统,是有最大负载的,都会有瓶颈的,超出部分,应该勇敢 say no.
欢迎拍砖!