等待条件ConditionObject源码浅析 1.引子 2. Condition接口中的抽象方法用法说明 3 Condition用法示例 4 条件队列结构 5 等待——休眠 6 通知——唤醒
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式,这种实现主要体现在在虚拟机层面(对象头)和字节码(monitoreter monitorexit和synchronized方法修饰符)层面的支持 。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,这种实现主要通过一些数据结构和算法使用java代码实现。这两者之间有些差别如下:
对比项 | Object Monitor | Condition |
前置条件 | 获取对象的锁 | 先获取到显示锁,再根据显式锁获取条件Condition对象。Lock.lock();Lock.newCondition() |
调用方式 | 调用对象的wait方法,obj.wait() | 调用Condition对象的awaitXX()方法 |
等待条件个数 | 一个 | 多个 |
当线程释放锁并进入等待条件 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的所在线程 | 支持 | 支持 |
Java对象自带监视器与Condition接口的监视器的对比
从以上对比表可以看出它们再者之间的最大不同就是一个支持多个等待队列、另一个却不支持,在复杂的并发编程中Codition明显有更大的优势与便利。
2. Condition接口中的抽象方法用法说明
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
await()方法:当前线程直到被通知或中断
awaitUninterruptibly():当前线程进入等待状态直到被通知(不响应中断)
awaitNanos(long):当前线程进入等待状态直到被通知或中断或超时。参数表示限定的纳秒数,返回值表示剩余时间,若返回值小于等于零,表明已超时。
await(long,TimeUnit):当前线程进入等待状态直到被通知或中断或超时。如果超时仍未被通知就返回false,否则返回true.
awaitUntil(Date):当前线程进入等待状态直到通知或中断或到了指定的某个时间点。如果到了某个时间点仍未获被通知就返回false,否则返回true。
singal():唤醒一个等待在Condition上的线程。
sinaglAll():唤醒所有等待在些Condtion上的线程。
3 Condition用法示例
下面这个Printer示例中,只有当charList中元素个数大于22,out()方法才输出并返回,否则一直等待。
class Printer { private final Lock lock = new ReentrantLock(); private final Condition lessItem = lock.newCondition(); private ArrayList<Character> charList = new ArrayList<>(Arrays.asList('1', '3', '4')); public void out() { lock.lock(); try { if (charList.size() < 22) { lessItem.await(); } System.out.println(charList.toString()); charList.clear(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void addChar() { lock.lock(); try { int i = 0; while (i < new Random().nextInt(3) + 3) { charList.add(String.valueOf(System.currentTimeMillis()).charAt(i)); i++; } if (charList.size() >= 22) { lessItem.signal(); } } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { final Printer p = new Printer(); new Thread(p::out).start(); new Thread(() -> { for (int i = 0; i < 7; i++) { System.out.println("第" + (i + 1) + "次addChar()"); p.addChar(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
下面的 BoundBlockQueue是一个有界阻塞队列,当队列为空时,出队列会被阻塞等待,直到队列中有元素;当队列中已满时,入队列也会被阻塞,直到队列中空出一个位置。
package thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundBlockQueue<T> { private int addIndex = 0; private int removeIndex = 0; private int count = 0; private final Object[] items; private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); public BoundBlockQueue(int size) { if (size < 0) throw new IllegalArgumentException("size must large than zero"); this.items = new Object[size]; } public void add(T o) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[addIndex++] = o; count++; if (addIndex == items.length) addIndex = 0; notEmpty.signal(); } finally { lock.unlock(); } } public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object o = items[removeIndex]; items[removeIndex++] = null; count--; if (removeIndex == items.length) removeIndex = 0; notFull.signal(); return (T) o; } finally { lock.unlock(); } } public static void main(String[] args) { BoundBlockQueue<String> queue = new BoundBlockQueue<>(9); /* new Thread(() -> { try { int i = 0; while(i<=5){ queue.add(String.valueOf(i)); System.out.println("add-digital: " + i); Thread.sleep(1000); i++; } } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ new Thread(() -> { try { int i = 1; while (i <= 10) { queue.add("0" + i); System.out.println("add-digital: 0" + i); Thread.sleep(500); i++; } } catch (InterruptedException e) { e.printStackTrace(); } }, "addItemThread1").start(); new Thread(() -> { try { int i = 0; while (i <= 8) { String s = queue.remove(); System.out.println("remove-digital:" + s); Thread.sleep(1000); i++; } } catch (InterruptedException e) { e.printStackTrace(); } }, "removeItemThread1").start(); } }
4 条件队列结构
ConditionObject是Condition接口的实现类,它是AQS的成员内部类。每个Condition对象都包含着一个队列(等待队列,也可称为条件队列),该队列是Condition对象实现等待/通知功能的关键。ConditionObject有两个实例变量,firstWaiter lastWaiter分别表示等待条件的第一个节点(线程)和最后一个节点(线程),这里Node类型就是AQS中的静态内部类Node。
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
当前线程调用Condition的awaitXXX()方法,会将当前线程构造成一个新的结点添加的条件队列的尾部。这里不再使用Node的next属性(它对于ASQ的实例变量head、 tail很有用),而只使用nextWaiter属性,而且现在的Node的prev属性也不重要了,我们关注的是下一个等待条件唤醒的节点(线程)。Node类的设计就是如此巧妙,可以在AQS的两个地方扮演着不同的角色。
条件队列的主要结构
上图可看出新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。这里没有用CAS的方式添加尾节点,因为在调用awaitXXX()方法时,已经获取到锁了,锁可以保证此更新过程是线程安全的。
ConditionObject是AQS的成员内部类,因为成员内部类的实例对象必须依赖于外部实例而存在,所以每个ConditionObject都与一个AQS对象(准确说应该是AQS子类的对象,因为抽象类不可实例化)相绑定,ConditionObject对象可以访问AQS同步器的所有成员变量和方法。因为Condition.newCondition()方法可以调用多次,每次都产生一个与AQS对象绑定的Condition条件对象。因为ReentrantLock等锁都将AQS的子类类型的变量作为自身的实例变量,那么很明显在监视器模型上一个(锁)对象拥有一个同步队列和多个条件队列。
同步队列与条件队列的关系模型
5 等待——休眠
调用Condition的awaitXXX()方法,会使当前线程进入条件队列并释放锁,同时线程状态变为等待状态(线程挂起休眠)。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter();//将当前线程加入条件队列,node代表当前线程的节点 int savedState = fullyRelease(node);//释放当前线程的锁(即释放同步状态,并通知后继节点),并保存锁释放前的状态state int interruptMode = 0; //方法isOnSyncQueue(Node)用来判断节点所代表的线程是否在同步队列中 while (!isOnSyncQueue(node)) { //当前线程(节点)不在同步队列中,就休息当前线程 LockSupport.park(this); /** * 方法checkInterruptWhileWaiting(Node) * 在通知前发生中断返回THROW_IE,在通知后发生中断返回 REINTERRUPT,在等待通知的过程中没发生中断返回0 */ if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;//发生了中断就停止当前线程是否在同步队列的检测 } /** * 执行到此,说明退出了上前的while循环,即从休眠状态中被唤醒了(从LockSupport.park()方法返回了),且当前线程(节点)在同步队列中。 * 在同步队列中了,当前线程又调用acquireQueued(Node,int)抢锁 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters();//将被成功通知(即从休眠中唤醒的)的线程对应的节点从条件队列中移除 if (interruptMode != 0) /** *记录中断状态 * 如果interruptMode的值是THROW_IE,直接抛出中断异常 * 如果interruptMode的值是REINTERRUPT,则调用Thread.interrupt()中断当前线程(实际上只是置中断标志位,可能根本不会真正中断当前线程) */ reportInterruptAfterWait(interruptMode); }
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
入等待队列的方法addCoditionWaiter()
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
根据addCoditionWaiter()方法可以看出,这里没有重用同步队列中的节点,而是使用构造方法Node(Thread,Node)重新构造的一个节点加入条件队列。而且此处没有使用Node的next 和prev属性,这里的Node类型已经从双向链表型队列退化为单向链表型队列了。
添加新节点进入等待队列的示意图
6 通知——唤醒
通知方法signal():先要进行监视器状态验证,如果没取到锁,就通知其他线程唤醒,显然是非法的,这将抛出异常。在通过监视器状态验证后才开始做实际通知的相关处理
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
doSigale(Node)主要逻辑是:按需更新条件队列的首尾节点,尝试通知条件队列的首节点代表的线程,如果通知失败,则通知条件队列中下一个节点的线程。
private void doSignal(Node first) { do { //将待通知节点的后继节点作为新的首节点,即更新首节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null;//if条件成立,说明新的首节点为null,表明条件队列中任何节点了,此时还需要更新尾节点为null first.nextWaiter = null;//将待通知节点的后继节点赋null,此后待通知节点不会在条件队列中使用用,因为新的首节点有自己的nextWaiter属性。 } while (!transferForSignal(first) && (first = firstWaiter) != null);//更新state状态并唤醒first代表的线程,不成功则继续循环处理(尝试通知条件队列中下一个节点代表的线程) }
transferForSignal(Node)方法的主要逻辑:尝试更新待通知节点的waitStatus,再将待通知节点加入同步队列,最后将待通知节点代表的线程从休眠中唤醒。
final boolean transferForSignal(Node node) { /* * CAS尝试将node节点的waitState属性从代表等待条件的CONDITION状态更新为代表初始状态的0 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /** * 添加到同步队列上并尝试设置其前驱的waitStatus来指示线程(可能)在waiting。 * 如果节点是CANCELL或尝试设置waitStatus失败,请唤醒以重新同步(在这种情况下,waitStatus可能会短暂而无害地出现错误)。 */ Node p = enq(node);//重新新node代表的线程节点加入同步队列,p为node的前驱 int ws = p.waitStatus;//waitStatus>0的值只有 CANCELL //如果前驱节点是CANCELL状态,或前驱节点更新状态失败,就唤醒node节点代表的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
条件队列节点移动至同步队列
被唤醒后的线程,await()方法中while()循环片断的LockSupport.park(this)方法将得以返回,而接下来的while循环的条件表达式“isOnSyncQueue(Node )”也将返回true(节点已经在同步队列中),while循环得以退出,进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程!