Thread_大批量数据的分页处置(生产者-消费者-阻塞队列)
Thread_大批量数据的分页处理(生产者-消费者-阻塞队列)
java应用中通常会有处理大批量数据的场景,这里介绍一种分页处理的方式,仅供参考。
大批量数据通常不能一次读取或者一次写入,我的思路是参照分页查询的模式缩小每次操作的数据集,循环执行,直到处理完毕。
实现方式有两种:
1种是采用单线程模式顺序执行,1种是采用多线程模式,1个线程读取,另1个线程负责处理数据,这里介绍的是后一种。
具体说明:
1. 采用固定大小的队列来存储分页数据;
2. 队列满了,则开始执行操作(入库等)操作;
3. 队列空了,则开始向队列中插入数据;
/** * 批量数据分页处理 * * 场景:大批量数据查询、入库 * 解决:拆分大批量为小数据集合 * * 关键点:指定长度的queue/是否可读取read/存、取数据线程间的通讯 * * @author charles * */ public class PageDataHandler { boolean parseComplete = false; DataBuffer dataBuffer = new DataBuffer(); private static String[] data = null; private static int rowCount = 0; private static int pageCount = 0; // 准备数据 static { data = new String[]{"0-0","1-1","2-2","3-3","4-4","5-5","6-6","7-7","8-8"}; rowCount = data.length; pageCount = rowCount / DataBuffer.ROWS_PER_PAGE + ((rowCount % DataBuffer.ROWS_PER_PAGE) == 0 ? 0 : 1); } /** * 分页查询数据 * @param page * @return */ private void findByPage() throws Exception { Thread t = new Thread() { public void run() { try { for(int i = 0; i < pageCount; i++){ int begin = i * DataBuffer.ROWS_PER_PAGE; //装满queue为止 for(int j = 0; j < DataBuffer.ROWS_PER_PAGE; j++){ if(begin+j < data.length) dataBuffer.put(data[begin+j]); } } parseComplete = true; dataBuffer.triggerRead(true); } catch (Exception e) { e.printStackTrace(); } } }; t.start(); } /** * 消费queue中的数据 */ private void bizz(){ Thread t = new Thread() { public void run() { try { int bufferSize = 0; while (true) { bufferSize = dataBuffer.queue.size(); if(bufferSize > 0) dataBuffer.oper(); if(parseComplete == true) break; } } catch (Exception e) { e.printStackTrace(); } } }; t.start(); } public static void main(String args[]) throws Exception { PageDataHandler pageHandler = new PageDataHandler(); pageHandler.findByPage();//1.分页获取数据 pageHandler.bizz();//2.操作分页数据 } }
import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class DataBuffer { public static final int ROWS_PER_PAGE = 3; // 指定长度的queue public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(ROWS_PER_PAGE); // 是否可读取 private boolean read = false; public synchronized void put(String data) throws Exception { queue.put(data); System.out.println("** put : "+ data); if(queue.size() == ROWS_PER_PAGE) triggerRead(true); while(read){ try { this.wait(); } catch (InterruptedException e) { } } } public synchronized void triggerRead(boolean read) { this.read = read; this.notifyAll(); } public synchronized void oper() throws Exception { while(!read){ try { this.wait(); } catch (InterruptedException e) { } } System.out.println("** queue size : "+ queue.size()); for(Iterator it = queue.iterator(); it.hasNext();){ String data = (String)it.next(); //操作数据 System.out.println("** get : "+ queue.take()); } triggerRead(false); } }