Thread_大批量数据的分页处置(生产者-消费者-阻塞队列)

Thread_大批量数据的分页处理(生产者-消费者-阻塞队列)

java应用中通常会有处理大批量数据的场景,这里介绍一种分页处理的方式,仅供参考。

大批量数据通常不能一次读取或者一次写入,我的思路是参照分页查询的模式缩小每次操作的数据集,循环执行,直到处理完毕。

 

实现方式有两种:

1种是采用单线程模式顺序执行,1种是采用多线程模式,1个线程读取,另1个线程负责处理数据,这里介绍的是后一种。

 

具体说明:

1. 采用固定大小的队列来存储分页数据;

2. 队列满了,则开始执行操作(入库等)操作;

3. 队列空了,则开始向队列中插入数据;

 

 

/**
 * 批量数据分页处理
 * 
 * 场景:大批量数据查询、入库
 * 解决:拆分大批量为小数据集合
 * 
 * 关键点:指定长度的queue/是否可读取read/存、取数据线程间的通讯
 * 
 * @author charles
 *
 */
public class PageDataHandler {
	boolean parseComplete = false;
	DataBuffer dataBuffer = new DataBuffer();
	
	private static String[] data = null;
	private static int rowCount = 0;
	private static int pageCount = 0;
	
	
	// 准备数据
	static {
		data = new String[]{"0-0","1-1","2-2","3-3","4-4","5-5","6-6","7-7","8-8"};
		rowCount = data.length;
		pageCount = rowCount / DataBuffer.ROWS_PER_PAGE + ((rowCount % DataBuffer.ROWS_PER_PAGE) == 0 ? 0 : 1);
	}
	
	
	/**
	 * 分页查询数据
	 * @param page
	 * @return
	 */
	private void findByPage() throws Exception {
		
		Thread t = new Thread() {
			public void run() {
				try {
					for(int i = 0; i < pageCount; i++){
						int begin = i * DataBuffer.ROWS_PER_PAGE;
						//装满queue为止
						for(int j = 0; j < DataBuffer.ROWS_PER_PAGE; j++){
							if(begin+j < data.length)
								dataBuffer.put(data[begin+j]);
						}
					}
					parseComplete = true;
					dataBuffer.triggerRead(true);
					
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};
		
		t.start();
	}
	
	
	/**
	 * 消费queue中的数据
	 */
	private void bizz(){
		Thread t = new Thread() {
			public void run() {
				try {
					int bufferSize = 0;
					while (true) {
						bufferSize = dataBuffer.queue.size();
						if(bufferSize > 0)
							dataBuffer.oper();
						
						if(parseComplete == true)
							break;
					}
					
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};
		
		t.start();
	}
	
	
	public static void main(String args[]) throws Exception {
		PageDataHandler pageHandler = new PageDataHandler();		
		pageHandler.findByPage();//1.分页获取数据
		pageHandler.bizz();//2.操作分页数据
	}
}



 

 

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class DataBuffer {
	public static final int ROWS_PER_PAGE = 3;
	// 指定长度的queue
	public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(ROWS_PER_PAGE);
	// 是否可读取
	private boolean read = false;
	
	
	public synchronized void put(String data) throws Exception {
		queue.put(data);
		System.out.println("** put : "+ data);
		
		if(queue.size() == ROWS_PER_PAGE)
			triggerRead(true);
		
		while(read){
			try {
				this.wait();
			} catch (InterruptedException e) {
			}
		}
	}
	
	
	public synchronized void triggerRead(boolean read) {
		this.read = read;
		this.notifyAll();
	}
	
	
	public synchronized void oper() throws Exception {
		while(!read){
			try {
				this.wait();
			} catch (InterruptedException e) {
			}
		}
		
		System.out.println("** queue size : "+ queue.size());
		
		for(Iterator it = queue.iterator(); it.hasNext();){
			String data = (String)it.next();
			//操作数据
			System.out.println("** get : "+ queue.take());
		}
		triggerRead(false);
	}
}