java 并发学习札记(二)生产者消费者队列
java 并发学习笔记(二)生产者消费者队列
消费者是LiftOffRunner,他将每个LiftOff对象从BlockingQueue中推出并直接运行,即通过显示的调用run(),使用自己的线程来运行,而不是为每个任务启动一个新线程
各个任务由main()放置到了BlockingQueue中,并且由LiftOffRunner从BlockingQueue中取出,LiftOffRunner可以忽略同步问题,由BlockingQueue解决了。
消费者是LiftOffRunner,他将每个LiftOff对象从BlockingQueue中推出并直接运行,即通过显示的调用run(),使用自己的线程来运行,而不是为每个任务启动一个新线程
package producer_consumer_Queue; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.SynchronousQueue; public class TestBlockingQueues { /** * @param args */ public static void main(String[] args) { //unlimited size test("LinkedBlockingQueue",new LinkedBlockingDeque<LiftOff>()); //fixed size //test("ArrayBlockingQueue",new ArrayBlockingQueue<LiftOff>(6)); //size of 1 //test("SynchronousQueue",new SynchronousQueue<LiftOff>()); } static void getKey() { try { new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (IOException e) { // TODO Auto-generated catch block //e.printStackTrace(); throw new RuntimeException(e); } } static void getKey(String message) { System.out.println(message); getKey(); } static void test(String message,BlockingQueue<LiftOff> queue) { System.out.println(message); LiftOffRunner runner=new LiftOffRunner(queue); Thread t=new Thread(runner); t.start(); for(int i=0;i<5;i++) { runner.add(new LiftOff(i)); } getKey("Press 'Enter' ("+message+")"); t.interrupt(); System.out.println("Finished "+message+" test"); } } //消费者 class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> queue) { rockets=queue; } public void add(LiftOff lo) { rockets.add(lo); } public void run() { try { while(!Thread.interrupted()) { LiftOff rocket=rockets.take(); rocket.run(); //use this thread } } catch (InterruptedException e) { //e.printStackTrace(); System.out.println("Waking from taken()"); } System.out.println("Exiting LiftOffRunner"); } } class LiftOff implements Runnable { private int count; public LiftOff(int i) { this.count=i; } public void run() { System.out.println("wawa:"+count); } }
各个任务由main()放置到了BlockingQueue中,并且由LiftOffRunner从BlockingQueue中取出,LiftOffRunner可以忽略同步问题,由BlockingQueue解决了。