百万级数据-程序迁徙后续

百万级数据-程序迁移后续
百万级数据-程序迁移:http://donald-draper.iteye.com/blog/2327909
    在上面这一篇文章中,内存为2G的情况下,单线程分页数为10万,批量保存为5000的情况下,更新120万的数据,需要耗时20分钟左右,同时JVM被占满,由于以前认为数据更新一次就少,就没有优化;后来一次更新的记录达到百万,应用扛不住,现在总于抽出时间,来做一些优化。以前是单线程处理所有分页,现在每页通过一个线程去更新,每个线程获取一个jdbc连接,注意数据量过大,分页数小的情况下,jdbc连个可能同时需要建立多个,我们要保证数据库允许最大连接数够用,Oracle默认为100。
今天打算每一页用一个线程去更新,主要思路如下:
#######需要用到的变量
pageUpateSize:分页数
threadPoolSize:线程数
batchSize:批量保存数

sums为需要更新的记录数,我们测试的为126万,大于10万才分页更新
##############################
主线程核心代码:
ExecutorService exec = null;
int batches = 0;
if( sums > 100000){
	if(sums % pageUpateSize ==0){
		batches = sums/pageUpateSize;
	}
	else{
		batches = sums/pageUpateSize  + 1;
	}
}
AtomicInteger counts = new AtomicInteger(0);//更新记录数计数器
CountDownLatch doneSignal = new CountDownLatch(batches); 
exec = Executors.newFixedThreadPool(threadPoolSize);
for(int i =1;i<=batches;i++){
        //getConnection(),获取数据库连接
	exec.submit(new PageUpdateThread(getConnection(), tableName,(i-1)*pageUpateSize+1,(i)*pageUpateSize,counts,doneSignal));
}
doneSignal.await();//等待所有分页线程结束
logger.info("============All Insert Sizes:"+counts.get());




分页更新线程:
/**
 * 分页更新线程
 * @author donald
 * @date 2017-4-13
 * @time 下午4:37:07
 */
public class PageUpdateThread implements Runnable {
	private static final Logger log = LoggerFactory.getLogger(PageUpdateThread.class);
	private static int batchSize = 2500;
	private Connection con;
	private String tableName;
	private int startPos;
	private int endPos;
	private final  AtomicInteger totalCount;
	private final CountDownLatch doneSignal;
	private SynService synService = null;
	private String threadName;
	/**
	 * 
	 * @param con
	 * @param tableName
	 * @param startPos
	 * @param endPos
	 * @param totalCount
	 * @param doneSignal
	 */
	public PageUpdateAllThread(Connection con, String tableName,
			 int startPos, int endPos,
			AtomicInteger totalCount,CountDownLatch doneSignal) {
		super();
		this.con = con;
		this.startPos = startPos;
		this.endPos = endPos;
		this.totalCount = totalCount;
		this.doneSignal = doneSignal;
	}
	/**
	 * 
	 */
	private void init(){
		synService = new SynService();
		threadName = Thread.currentThread().getName();
	}
	@Override
	public void run() {
		init();
		try {
			log.info(threadName+"正在更新记录:"+startPos+","+endPos);
			work();
			log.info(threadName+"更新记录完毕:"+startPos+","+endPos);
		} catch (BatchUpdateException e) {
			e.printStackTrace();
		} catch (SQLException e) {
			e.printStackTrace();
		}
		finally{
			doneSignal.countDown();
		}
	}
	/**
	 * 
	 * @throws BatchUpdateException
	 * @throws SQLException
	 */
	private void work() throws BatchUpdateException, SQLException{
		ResultSet addRs = null;
		PreparedStatement ps = null;
		List<PageData> insertList = new ArrayList<PageData>();
		//分页语句
		String sql = "SELECT * FROM (SELECT t.*, ROWNUM as rowno FROM ( SELECT * FROM "
				+ tableName
				+ " ORDER BY CREATETIME"
				+ " ) t WHERE ROWNUM <= ?)" + " WHERE rowno >= ?";
		log.info(threadName+"======Search insert records sql:" + sql + ",startPos:"
				+ startPos + ",endPos:" + endPos);
		int counts = 0;// 记录数
		try {
			ps = con.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE,
					ResultSet.CONCUR_READ_ONLY);
			ps.setInt(1, endPos);
			ps.setInt(2, startPos);
			addRs = ps.executeQuery();
			while (addRs.next()) {
				HashMap dataMap = null;
				dataMap = switch(addRs);//将记录放在Map中
				insertList.add(pd);
				if (counts % batchSize == 0 && counts > 0) {
					long childStartTime = System.currentTimeMillis();
					synService.batchInsertSync(tableName + "Mapper.save", insertList);
					long childEndTime = System.currentTimeMillis();
					log.info(threadName+"保存2500记录所用时间s:"
							+ (childEndTime - childStartTime) / 1000.00);
					insertList.clear();
					log.info(threadName+"============Records:" + counts);
				}
				if (addRs.isLast()) {
					synService.batchInsertSync(tableName + "Mapper.save", insertList);
					insertList.clear();
				}

				pd = null;
				counts++;
				totalCount.incrementAndGet();
			}
		} catch (SQLException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		finally {
		        insertList = null;
			sql = null;
			if (addRs != null) {
				addRs.close();
				addRs = null;
			}
			if (ps != null) {
				ps.close();
				ps = null;
			}
			if (con != null) {
				con.close();
			}
		}
	}
}

下面我们来测试:
########################################
硬件环境如下:
硬件酷睿i7,4核处理器,JVM内存2G,记录数126万,数据库oracle
#########################################
JVM虚拟机参数配置:
-server
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-Xloggc:E:\gc.log


测试的过程中,我们主要调试着3个参数:
pageUpateSize:分页数
threadPoolSize:线程数
batchSize:批量保存数


先调线程数:
参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,30000,  5000, 1.039, 353.661

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 14:50:43
已用: 
 1,023,612 KB
已提交: 
 1,155,084 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      45.212 秒 (4,582收集)
ConcurrentMarkSweep上的       0.620 秒 (20收集)

VisualVM-内存使用情况图:

百万级数据-程序迁徙后续


参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
12,30000,  5000, 1.645, 254.612

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 15:09:40
已用: 
   296,974 KB
已提交: 
 1,741,000 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      42.666 秒 (3,767收集)
ConcurrentMarkSweep上的       0.177 秒 (15收集)

VisualVM-内存使用情况图:


百万级数据-程序迁徙后续

相对于8个线程,使用时间减少99秒,新生代和老年代的垃圾回收次数和时间减少,但同时内存的最大使用量增大了500M左后,
这是因为线程内更新的有记录,线程数增量,相应的峰值内存增加,内存占用变化较大。

参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
16,30000,  5000, 1.607,187.303

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 15:23:08
已用: 
   840,564 KB
已提交: 
 1,770,688 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      29.268 秒 (2,560收集)
ConcurrentMarkSweep上的       0.163 秒 (13收集)

VisualVM-内存使用情况图:

百万级数据-程序迁徙后续

相对于12个线程,使用时间减少67秒,新生代和老年代的垃圾回收次数和时间减少,但同时内存的最大使用量没有多大变化,
线程数增量。
小节:
在分页数和批量保存数相同的情况下,线程数越多,所用时间越少,
同时所耗内存最大值变大,新生代和老年代的垃圾回收次数和时间减少;

下面来调分页数量参数:
参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,30000,  5000, 1.039, 353.661
Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 14:50:43
已用: 
 1,023,612 KB
已提交: 
 1,155,084 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      45.212 秒 (4,582收集)
ConcurrentMarkSweep上的       0.620 秒 (20收集)
以上面参数配置,内存使用量,耗时作为对比基础


参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,20000,  5000, 0.851, 411.734

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 15:40:23
已用: 
   202,855 KB
已提交: 
   893,508 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      42.290 秒 (4,530收集)
ConcurrentMarkSweep上的       0.236 秒 (23收集)

VisualVM-内存使用情况图:

百万级数据-程序迁徙后续

在8个线程和批量保存数为5000的情况下,分页数量减少10000,内存的峰值减少了188M,
但所耗时间增加了58秒,新生代和老年代的垃圾回收次数和时间增加;

参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,10000,  5000, 0.622, 696.168

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 16:07:15
已用: 
   398,122 KB
已提交: 
   622,284 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      41.930 秒 (4,970收集)
ConcurrentMarkSweep上的       0.301 秒 (29收集)

VisualVM-内存使用情况图:

百万级数据-程序迁徙后续

在线程数和批量保存数相同情况下,
在8个线程和批量保存数为5000的情况下,分页数量减少,内存的峰值减少,
但所耗时间增加,新生代和老年代的垃圾回收次数和时间增加;

再来调批量保存数量:
以上一个参数配置,内存使用量,耗时作为对比基础作为对比基础
参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,10000,  2500, 0.474, 664.131

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 16:31:18
已用: 
   256,886 KB
已提交: 
   535,884 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      38.173 秒 (4,693收集)
ConcurrentMarkSweep上的       0.380 秒 (32收集)

VisualVM-内存使用情况图:

百万级数据-程序迁徙后续

在8个线程,分页数为10000的情况下,批量保存数减少,内存的峰值减少,
但所耗时间增加,新生代和老年代的垃圾回收次数和时间增加;


总结:
在内存充足的情况,线程数量越多,所耗时间越少,新生代和老年代的垃圾回收次数和时间减少,但同时内存的峰值越大,在本次测试中,更新126万数据,内存峰值为1.607G,所耗时间为187.303s;平均每秒处理6737条记录,这个是在硬件酷睿i7,4核处理器,JVM内存2G情况下取得,假设有2个CPU,CPU为8核的,总共有16线程,内存为8G,保守估计,再想同时记录数和批量保存记录数的情况下每秒可以处理的记录数为(2x8x8)/(1x4x2)x6737,约每秒11万,拿相同时间可以处理的记录数来看,187.303s可以处理2057万数据。当然线程不是越多要好,凡是要有一个度,以前看过一篇文章,线程数最好为2xSum(CPU)xCore(CPU);从测试来看当线程数为Sum(CPU)xCore(CPU)的4倍时,性能相当好,不过这个要看具体的场景,仁者见仁智者见智啦;
如果内存不够用的情况,对处理耗时没有要求的话,我们可以减少线程数,分页数和批量保存数;现在有一台服务,奶奶的普通PC,4G的内存,由于服务器上还有其他数据库在跑,最后JVM只有650M的内存可用,由于应用服务为Server模式,默认为内存的1/4,但现在只有650M,由于应用没有时间上的要求,所以取线程数为8,分页数为10000,批量保存数为2500,更新120万的数据,10分钟还是可以接受的,内存峰值还不到500M。