java自带API实现生产者消费者形式
java自带API实现生产者消费者模式
通过JDK自带的BlockingQueue来实现生产者与消费者模式
①生产者
package org.fenxisoft.quence; import java.util.concurrent.BlockingQueue; public class Producer implements Runnable{ private BlockingQueue<Integer> queue; private int count; Producer(BlockingQueue<Integer> queue, int count){ this.queue = queue; this.count = count; } @Override public void run() { try { for(int i =0; i < count; i++){ /** * 如果队列满了,则会阻塞 * 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间 */ queue.put(i); System.out.println("生产者生产数据:"+i); } } catch (InterruptedException e) { e.printStackTrace(); } } }
②消费者
package org.fenxisoft.quence; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable{ private BlockingQueue<Integer> queue; private int count; Consumer(BlockingQueue queue, int count){ this.queue = queue; this.count = count; } @Override public void run() { try { for(int i =0; i<count; i++){ /** * 如果队列为空,则会阻塞 * 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要) */ Integer in= queue.take(); System.out.println("消费者消费数据:"+in); } } catch (InterruptedException e) { e.printStackTrace(); } } }
③测试类
package org.fenxisoft.quence; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class BlockingQuenceTest { public static final int MAX_VALUE = 100; public static void main(String[] args) { /** * 创建线程池 */ ExecutorService service = Executors.newCachedThreadPool(); BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10); Producer producer = new Producer(blockingQueue, MAX_VALUE); Consumer consumer = new Consumer(blockingQueue, MAX_VALUE); service.submit(producer); service.submit(consumer); service.shutdown(); } }