线程间的交互

线程之间的交互有以下几种选择:

  1. synchronized,object.wait(),object.notify()
  2. lock,condition.await(),condition.signal()

synchronized,object.wait(),object.notify()

object.wait()是将当前线程置为等待状态,等待其他线程调用object.notify()或object.notifyAll()将其唤醒。

object.wait(),object.notify(),notifyAll(),wait(long),wait(long, int)等方法在用时必须保证调用主体被synchronized锁定中,否则会报java.lang.IllegalMonitorStateException。比如:

synchronized(value){
	value.notifyAll();
}
public synchronized void method(){
	this.notifyAll();
}
public static synchronized void staticMethod() {
    MyClass.class.notifyAll();
}

wait()存在的问题:方法注释上说:可能出现中断和虚假唤醒问题,这个方法应该始终被用在循环中。

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:

	synchronized (obj) {
        while (<condition does not hold>)
            obj.wait();
        ... // Perform action appropriate to condition
    }
class CakeProducer{
    private int cakeCount = 0;

    /**
     * 在蛋糕没消费之前不生产。
     */
    public synchronized void produce() {
        if(cakeCount != 0){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        cakeCount++;
        System.out.println(Thread.currentThread().getName()+" #####produce 	"+cakeCount);
        this.notifyAll();
    }

    /**
     * 在蛋糕没生产之前不消费。
     */
    public synchronized void consume(){
        if(cakeCount != 1){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        cakeCount--;
        System.out.println(Thread.currentThread().getName()+" consume 	"+cakeCount);
        this.notifyAll();
    }
}

捋一下上面这串代码的逻辑:线程a、b负责循环生产,线程c负责循环消费。

  1. a=>加锁,cakeCount为0,生产,cakeCount+1,notifyAll,循环第2遍,cakeCount为1,wait,解锁,停在第10行代码

    b=>等待a的锁=>加锁,cakeCount为1,wait,解锁,停在第10行代码

    c=>等待a的锁=>等待b的锁=>加锁,cakeCount为1,消费,cakeCount-1,notifyAll,循环第2遍,cakeCount为0,wait,解锁,停在第26行代码

  2. a和b争抢锁,假设a先获得锁

    a=>加锁,从第10行代码继续,生产,cakeCount+1,notifyAll,循环第3遍,cakeCount为1,wait,解锁,停在第10行代码

    b=>等待a的锁=>加锁,从第10行代码继续,生产,cakeCount+1,notifyAll,循环第2遍,cakeCount为2,wait,解锁,停在第10行代码

    c=>等待a的锁=>等待b的锁=>加锁,从第26行代码继续,消费,cakeCount-1,notifyAll,循环第3遍,cakeCount为1,消费,cakeCount-1,notifyAll,循环第4遍,cakeCount为0,wait,解锁,停在第26行代码

  3. ……

我们想要的结果是一次生产一个蛋糕,但上面的代码出现了连续生产两个蛋糕的情况,此时按照注释说明,用while替换if就会就解决问题。不信再捋一下。

lock,condition.await(),condition.signal()

这种方式比第一种更灵活一些。

引用Condition 接口的注释:

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

Condition将Object监视器方法( wait 、 notify和notifyAll )分解为不同的对象,通过将它们与任意Lock实例相结合的方法使用,产生每个对象具有多个等待集的效果。 Lock代替了synchronized方法和语句的使用,而Condition代替了对象监视器方法的使用。

说人话就是,用Lock代替synchronized加锁时,需要使用condition来代替wait(),notify()等方法。与wait()和notify()不同的是,condition可以给加锁的对象添加多个条件。当被加锁的对象相同时,notifyAll会唤醒所有wait的线程,notify也只是会随机唤醒一个线程。对于condition,当被加锁的对象相同时,可以有因为不满足condition1而await的线程,也有不满足condition2而await的线程……当condition1满足时,可以只唤醒因为condition1而等待的线程,其他线程仍然等待。

接口注释中提供的示例:

一个带边界的缓存,当缓存满的时候,put就置为等待状态,直到有可用空间,当缓存为空时,take就置为等待状态,直到缓存中有可用数据。

这里用了两个条件分别来限制put线程和take线程。

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    //当缓存非空条件满足时,唤醒take线程,不满足时使take线程等待。
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            //校验缓存是否已满
            while (count == items.length)
                //是,则不满足notFull条件,则通过此condition将线程置为等待状态
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            //写入数据后,缓存不为空,则唤醒因为缓存为空而等待的读取线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            //校验缓存是否为空
            while (count == 0)
                //是,则不满足notEmpty条件,则通过此condition将线程置为等待状态
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            //读取数据后,缓存未满,则唤醒因为缓存已满而等待的写入线程
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}