生产者消费者模式(转)

原文地址:http://www.cnblogs.com/luxh/p/3300956.html

第一种实现方式:

package com.rainy.pools.threads;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo {

    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();

    public static void main(String[] args) {
        Demo test = new Demo();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();


        consumer.start();
        producer.start();
    }

    class Consumer extends Thread{

        @Override
        public void run() {
            consume();
        }

        private void consume() {

            try {
                lock.lock();
                System.out.println("我在等一个新信号"+this.currentThread().getName());
                condition.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally{
                System.out.println("拿到一个信号"+this.currentThread().getName());
                lock.unlock();
            }

        }
    }

    class Producer extends Thread{

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            try {
                lock.lock();
                System.out.println("我拿到锁"+this.currentThread().getName());
                condition.signalAll();
                System.out.println("我发出了一个信号:"+this.currentThread().getName());
            } finally{
                lock.unlock();
            }
        }
    }

}

第二种:

package com.rainy.pools.threads;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingDemo {

    Executor pool = Executors.newFixedThreadPool(10);

    //仓库
    private BlockingQueue<String> storageQueue = new LinkedBlockingQueue<String>(5);

    //仓库容量
    private int MAX_SIZE = 3;

    //仓库为空
    private int ZERO = 0;

    //生产者线程
    private class Producer implements Runnable{

        //生产方法,需同步
        private void produce(){
            try {
                System.out.println(Thread.currentThread().getName()+"进入仓库,准备生产!");
                if(storageQueue.size()==MAX_SIZE) {
                    System.out.println("仓库已满!等待消费者消费");
                    Thread.sleep(1000);
                }
                if(storageQueue.size()<=MAX_SIZE) {
                    String product = "产品"+new Random().nextInt();
                    storageQueue.put(product);
                    System.out.println(Thread.currentThread().getName()+"往仓库中生产了一个产品!");
                }
                Thread.sleep(1000);
            }catch(InterruptedException ie) {
                System.out.println("中断异常");
                ie.printStackTrace();
            }
        }

        public void run() {
            while(true) {
                produce();
            }
        }
    }

    //消费者线程
    private class Customer implements Runnable{

        //消费方法,需同步
        private void consume() {
            try {
                System.out.println(Thread.currentThread().getName()+"进入仓库,准备消费!");
                if(storageQueue.size()==ZERO) {
                    System.out.println("仓库已空!等待生产者生产");
                    Thread.sleep(1000);
                }
                if(storageQueue.size()!=ZERO) {
                    System.out.println(Thread.currentThread().getName()+"从仓库取得产品:"+storageQueue.take());
                }
                Thread.sleep(1000);
            }catch(InterruptedException ie) {
                System.out.println("中断异常");
                ie.printStackTrace();
            }
        }

        public void run() {
            while(true) {
                consume();
            }
        }

    }

    //启动生产者和消费者线程
    public void start() {
        for(int i=1;i<5;i++) {
            //new Thread(new Producer()).start();
            ///new Thread(new Customer()).start();
            pool.execute(new Producer());
            pool.execute(new Customer());
        }

    }

    public static void main(String[] args) {
        BlockingDemo pc = new BlockingDemo();
        pc.start();
    }

}

第三种:

package com.rainy.pools.threads;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrentDemo {

    Executor pool = Executors.newFixedThreadPool(10);

    //仓库
    private List<String> storageList = new LinkedList<String>();

    //仓库容量
    private int MAX_SIZE = 3;

    //仓库为空
    private int ZERO = 0;

    //获取锁对象
    private Lock lock = new ReentrantLock();

    //仓库满了,绑定生产者线程
    private Condition full = lock.newCondition();

    //仓库为空,绑定消费者线程
    private Condition empty = lock.newCondition();

    //生产者线程
    private class Producer implements Runnable{

        //生产方法,需同步
        private void produce(){
            if(lock.tryLock()) {
                System.out.println(Thread.currentThread().getName()+"进入仓库,准备生产!");
                try {
                    if(storageList.size()==MAX_SIZE) {
                        System.out.println("仓库已满!等待消费者消费");
                        Thread.sleep(1000);
                        full.await();//生产者线程加入线程等待池
                    }
                    if(storageList.size()<MAX_SIZE){
                        String name = "产品"+new Random().nextInt();
                        storageList.add(name);
                        System.out.println(Thread.currentThread().getName()+"往仓库中生产了一个产品!");
                    }
                    Thread.sleep(1000);
                    System.out.println("发送唤醒消费者信号");
                    empty.signalAll();//唤醒消费者线程

                }catch(InterruptedException ie) {
                    System.out.println("中断异常");
                    ie.printStackTrace();
                }finally{
                    lock.unlock();
                }
            }
        }

        public void run() {
            while(true) {
                produce();
            }
        }
    }

    //消费者线程
    private class Customer implements Runnable{

        //消费方法,需同步
        private void consume() {
            if(lock.tryLock()) {
                System.out.println(Thread.currentThread().getName()+"进入仓库,准备消费!");
                try {
                    if(storageList.size()==ZERO) {
                        System.out.println("仓库已空!等待生产者生产");
                        Thread.sleep(1000);
                        empty.await();//消费者线程加入线程等待池
                    }
                    if(storageList.size()!=ZERO) {
                        System.out.println(Thread.currentThread().getName()+"从仓库取得产品:"+storageList.remove(0));
                    }

                    Thread.sleep(1000);
                    System.out.println("发送唤醒生产者信号");
                    full.signalAll();//唤醒生产者线程
                }catch(InterruptedException ie) {
                    System.out.println("中断异常");
                    ie.printStackTrace();
                }finally{
                    lock.unlock();
                }
            }
        }

        public void run() {
            while(true) {
                consume();
            }
        }

    }

    //启动生产者和消费者线程
    public void start() {
        for(int i=1;i<5;i++) {
            //new Thread(new Producer()).start();
            //new Thread(new Customer()).start();
            pool.execute(new Producer());
            pool.execute(new Customer());
        }

    }

    public static void main(String[] args) {
        ConcurrentDemo pc = new ConcurrentDemo();
        pc.start();
    }

}

自测Demo:

package com.rainy.pools.threads;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Pools {

    protected Condition notEmpty;
    protected Condition empty;
    protected ReentrantLock lock;
    private int poolingCount = 0;

    public Pools () {
        lock = new ReentrantLock(false);
        notEmpty = lock.newCondition();
        empty = lock.newCondition();
    }

    public void get() {
        GetThread get = new GetThread();
        get.start();
    }

    public void set() {
        SetThread set = new SetThread();
        set.start();
    }

    public class SetThread extends Thread {
        public void run() {
            while(true){
                set();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void set() {
            if(lock.tryLock()) {
                try {
                    System.out.println("set poolingCount : " + poolingCount);
                    if (poolingCount > 10) {
                        System.out.println("仓库满");
                        notEmpty.await();
                    }
                    poolingCount++;
                    System.out.println("set poolingCount : " + poolingCount);
                } catch (InterruptedException e) {
                    System.out.println("set InterruptedException ");
                    e.printStackTrace();
                } finally {
                    empty.signal();
                }
                System.out.println("set poolingCount : " + poolingCount);
                System.out.println();
                System.out.println();
                lock.unlock();
            }
        }
    }

    public class GetThread extends Thread {
        public void run() {
            while(true){
                get();
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void get() {
            // tryLock()
            // lock()
            // lockInterruptibly()
            // 方法的区别:
            /**
                (*) lock()方法获取锁。
                如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
                如果当前线程已经保持该锁,则将保持计数加 1,并且该方法立即返回。
                如果该锁被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。

                (*) lockInterruptibly()方法获取锁。
                1) 如果当前线程未被中断,则获取锁。
                2)如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
                3)如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
                4)如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:
                    1)锁由当前线程获得;或者
                    2)其他某个线程中断当前线程。
                5)如果当前线程获得该锁,则将锁保持计数设置为 1。
                如果当前线程:
                    1)在进入此方法时已经设置了该线程的中断状态;或者
                    2)在等待获取锁的同时被中断。
                    则抛出 InterruptedException,并且清除当前线程的已中断状态。
                6)在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或重入获取。
                    指定者: 接口 Lock 中的 lockInterruptibly
                    抛出:   InterruptedException   如果当前线程已中断。

                (*) tryLock()方法获取锁。
                仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
                1)如果该锁没有被另一个线程保持,并且立即返回 true 值,则将锁的保持计数设置为 1。
                    即使已将此锁设置为使用公平排序策略,但是调用 tryLock() 仍将 立即获取锁(如果有可用的),
                    而不管其他线程当前是否正在等待该锁。在某些情况下,此“闯入”行为可能很有用,即使它会打破公
                    平性也如此。如果希望遵守此锁的公平设置,则使用 tryLock(0, TimeUnit.SECONDS)
                    ,它几乎是等效的(也检测中断)。
                2)如果当前线程已经保持此锁,则将保持计数加 1,该方法将返回 true。
                3)如果锁被另一个线程保持,则此方法将立即返回 false 值。
                指定者:
                    接口 Lock 中的  tryLock
                    返回:
                    如果锁是*的并且被当前线程获取,或者当前线程已经保持该锁,则返回true;否则返回false

                总结:
                     1)lock(), 拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。
                     2)tryLock(),马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。
                     3)lockInterruptibly()lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。
                        并且如果线程已经被interrupt,再使用lockInterruptibly的时候,此线程也会被要求处理interruptedException
             */
            if(lock.tryLock()) {
                try {
                    System.out.println("get poolingCount : " + poolingCount);
                    if (0 == poolingCount) {
                        System.out.println("仓库空");
                        empty.await();
                    }
                } catch (InterruptedException e) {
                    System.out.println("get InterruptedException ");
                    e.printStackTrace();
                } finally {
                    notEmpty.signal();
                }

                System.out.println("get poolingCount : " + poolingCount);
                poolingCount--;
                System.out.println("get poolingCount : " + poolingCount);
                System.out.println();
                System.out.println();

                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Pools p = new Pools();
        p.set();
        p.get();
    }

}