Condition底层机制剖析:多线程等待与通知机制 _
在使用Lock之前使用的最多的同步方式应该是synchronized关键字来实现同步方式了。配合Object的wait()、notify()系列方法可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法与Lock配合可以实现等待/通知模式但是这两者在使用方式以及功能特性上还是有差别的。Object和Condition接口的一些对比。对比项Object 监视器方法Condition前置条件获取对象的监视器锁调用 Lock.lock() 获取锁调用 Lock.newCondition() 获取 Condition 对象调用方法直接调用如object.wait()直接调用如condition.await()等待队列个数一个多个当前线程释放锁并进入等待队列支持支持当前线程释放锁并进入等待队列在等待状态中不响应中断不支持支持当前线程释放锁并进入超时等待状态支持支持当前线程释放锁并进入等待状态到将来的某个时间不支持支持唤醒等待队列中的一个线程支持支持唤醒等待队列中的全部线程支持支持接口的介绍与示例首先需要明白condition对象是依赖于lock对象的意思就是说condition对象需要通过lock对象进行创建出来(调用Lock对象的newCondition()方法)。condition的使用方式非常的简单。但是需要注意在调用方法前获取锁。java/** * condition使用示例 * 1、condition的使用必须要配合锁使用调用方法时必须要获取锁 * 2、condition的创建依赖于Lock lock.newCondition() */ public class ConditionUseCase { /** * 创建锁 */ public Lock readLock new ReentrantLock(); /** * 创建条件 */ public Condition condition readLock.newCondition(); public static void main(String[] args) { ConditionUseCase useCase new ConditionUseCase(); ExecutorService executorService Executors.newFixedThreadPool(2); executorService.execute(() - { //获取锁进行等待 useCase.conditionWait(); }); executorService.execute(() - { //获取锁进行唤起读锁 useCase.conditionSignal(); }); } /** * 等待线程 */ public void conditionWait() { readLock.lock(); try { System.out.println(Thread.currentThread().getName() 拿到锁了); System.out.println(Thread.currentThread().getName() 等待信号); condition.await(); System.out.println(Thread.currentThread().getName() 拿到信号); } catch (Exception e) { } finally { readLock.unlock(); } } /** * 唤起线程 */ public void conditionSignal() { readLock.lock(); try { //睡眠5s 线程1启动 Thread.sleep(5000); System.out.println(Thread.currentThread().getName() 拿到锁了); condition.signal(); System.out.println(Thread.currentThread().getName() 发出信号); } catch (Exception e) { } finally { //释放锁 readLock.unlock(); } } } //执行结果 1 pool-1-thread-1拿到锁了 2 pool-1-thread-1等待信号 ---释放锁-线程等待 t1 3 pool-1-thread-2拿到锁了 4 pool-1-thread-2发出信号 --- 唤起线程t2释放锁 5 pool-1-thread-1拿到信号---t1继续执行如示例所示一般都会将Condition对象作为成员变量。当调用await()方法后当前线程会释放锁并在此等待而其他线程调用Condition对象的signal()方法通知当前线程后当前线程才从await()方法返回并且在返回前已经获取了锁。接口常用方法condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定因此Condition一般都是作为Lock的内部实现。await()造成当前线程在接到信号或被中断之前一直处于等待状态。boolean await(long time, TimeUnit unit)造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态---》是否超时超时异常awaitNanos(long nanosTimeout)造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间如果在nanosTimesout之前唤醒那么返回值 nanosTimeout - 消耗时间如果返回值 0 ,则可以认定它已经超时了。awaitUninterruptibly()造成当前线程在接到信号之前一直处于等待状态。【注意该方法对中断不敏感】。awaitUntil(Date deadline)造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知则返回true否则表示到了指定时间返回返回false。signal()唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。signalAll()唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。这里顺便回顾一下 Object 类的主要方法wait()线程等待直到被通知或者中断。wait(long timeout)线程等待指定的时间或被通知或被中断。wait(long timeout, int nanos)线程等待指定的时间或被通知或被中断。notify()唤醒一个等待的线程。notifyAll()唤醒所有等待的线程。原理解析Condition是AQS的内部类。可以通过 Lock.newCondition() 方法获取 Condition 对象而 Lock 对于同步状态的实现都是通过内部的自定义同步器实现的newCondition() 方法也不例外所以Condition 接口的唯一实现类是同步器 AQS 的内部类 ConditionObject因为 Condition 的操作需要获取相关的锁所以作为同步器的内部类也比较合理该类定义如下javapublic class ConditionObject implements Condition, java.io.Serializable等待队列前面我们学过AQS 内部维护了一个先进先出FIFO的双端队列并使用了两个引用 head 和 tail 用于标识队列的头部和尾部。Condition 内部也使用了同样的方式内部维护了一个先进先出FIFO的单向队列我们把它称为等待队列。该队列是 Condition 对象实现等待 / 通知功能的关键。等待队列是一个FIFO的队列在队列中的每个节点都包含了一个线程引用该线程就是在Condition对象上等待的线程如果一个线程调用了Condition.await()方法那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上节点的定义复用了 AQS 中 Node 节点的定义也就是说同步队列和等待队列中节点类型都是 AQS 的静态内部类 AbstractQueuedSynchronized.Node。一个 Condition 包含一个等待队列Condition 拥有首节点firstWaiter和尾节点lastWaiter。当前线程调用 Condition.await() 方法之后将会以当前线程构造节点并将节点从尾部加入等待队列等待队列的基本结构如下所示。等待队列分为首节点和尾节点。当一个线程调用Condition.await()方法将会以当前线程构造节点并将节点从尾部加入等待队列。新增节点就是将尾部节点指向新增的节点。节点引用更新本来就是在获取锁以后的操作所以不需要CAS保证。同时也是线程安全的操作。javapublic class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; }在 Object 的监视器模型上一个对象拥有一个同步队列和等待队列而并发包中的 Lock更确切地说是同步器拥有一个同步队列和多个等待队列等待 await 方法当线程调用了await方法以后。线程就作为队列中的一个节点被加入到等待队列中去了。同时会释放锁的拥有。当从await方法返回的时候。一定会获取condition相关联的锁。当等待队列中的节点被唤醒的时候则唤醒节点的线程开始尝试获取同步状态。如果不是通过 其他线程调用Condition.signal()方法唤醒而是对等待线程进行中断则会抛出InterruptedException异常信息。通知调用Condition的signal()方法将会唤醒在等待队列中等待最长时间的节点条件队列里的首节点在唤醒节点前会将节点移到同步队列中。当前线程加入到等待队列中如图所示源码如下javapublic final void await() throws InterruptedException { // 检测线程中断状态 if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程包装为Node节点加入等待队列 Node node addConditionWaiter(); // 释放同步状态也就是释放锁 int savedState fullyRelease(node); int interruptMode 0; // 检测该节点是否在同步队中如果不在则说明该线程还不具备竞争锁的资格则继续等待 while (!isOnSyncQueue(node)) { // 挂起线程 LockSupport.park(this); if ((interruptMode checkInterruptWhileWaiting(node)) ! 0) break; } // 竞争同步状态 if (acquireQueued(node, savedState) interruptMode ! THROW_IE) interruptMode REINTERRUPT; // 清理条件队列中的不是在等待条件的节点 if (node.nextWaiter ! null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode ! 0) reportInterruptAfterWait(interruptMode); }调用该方法的线程是成功获取了锁的线程也就是同步队列中的首节点该方法会将当前线程构造节点并加入等待队列中然后释放同步状态唤醒同步队列中的后继节点然后当前线程会进入等待状态。可能会有这样几个问题怎样将当前线程添加到等待队列中释放锁的过程是怎样才能从 await 方法中退出问题1怎样将当前线程添加到等待队列中调用 addConditionWaiter 方法会将当前线程添加到等待队列中源码如下javaprivate Node addConditionWaiter() { // 尾节点 Node t lastWaiter; // 尾节点如果不是CONDITION状态则表示该节点不处于等待状态需要清理节点 if (t ! null t.waitStatus ! Node.CONDITION) { unlinkCancelledWaiters(); t lastWaiter; } // 根据当前线程创建Node节点 Node node new Node(Thread.currentThread(), Node.CONDITION); // 将该节点加入等待队列的末尾 if (t null) firstWaiter node; else t.nextWaiter node; lastWaiter node; return node; }首先将 t 指向尾节点如果尾节点不为空并且它的waitStatus!-2-2 为 CONDITION表示正在等待 Condition 条件则将不处于等待状态的节点从等待队列中移除并且将 t 指向新的尾节点。然后将当前线程封装成 waitStatus 为-2 的节点追加到等待队列尾部。如果尾节点为空则表明队列为空将首尾节点都指向当前节点。如果尾节点不为空表明队列中有其他节点则将当前尾节点的 nextWaiter 指向当前节点将当前节点置为尾节点。简单总结一下这段代码的作用就是通过尾插入的方式将当前线程封装的 Node 插入到等待队列中同时可以看出Condtion 的等待队列是一个不带头节点的链式队列之前我们学习 AQS 时知道同步队列是一个带头节点的链式队列这是两者的一个区别。关于头节点的作用我们这里简单说明一下。不带头节点是指在链表数据结构中链表的第一个节点就是实际存储的第一个数据元素而不是一个特定的头节点该节点不包含实际的数据。1不带头节点的链表链表的第一个节点就是第一个实际的数据节点。当链表为空时头引用通常称为 head指向 null。2带头节点的链表链表有一个特殊的节点作为链表的开头这个特殊的节点称为头节点。头节点通常不存储任何实际数据或者它的数据字段不被使用。无论链表是否为空头节点总是存在的。当链表为空时头节点的下一个节点指向 null。使用头节点可以简化某些链表操作因为你不必特殊处理第一个元素的插入和删除。为了更好地解释这两种链表结构我将为每种结构提供一个简单的整数链表插入方法的示例。1不带头节点的链表javapublic class Node { public int data; public Node next; public Node(int data) { this.data data; this.next null; } } public class LinkedListWithoutHead { public Node head; public void insert(int value) { Node newNode new Node(value); if (head null) { head newNode; } else { Node temp head; while (temp.next ! null) { temp temp.next; } temp.next newNode; } } }2带头节点的链表javapublic class NodeWithHead { public int data; public NodeWithHead next; public NodeWithHead(int data) { this.data data; this.next null; } } public class LinkedListWithHead { private NodeWithHead head; public LinkedListWithHead() { head new NodeWithHead(-1); // 初始化头节点 } public void insert(int value) { NodeWithHead newNode new NodeWithHead(value); NodeWithHead temp head; while (temp.next ! null) { temp temp.next; } temp.next newNode; } }这下是不是就彻底明白了说明白了头节点我们再回到 Condition 的 await 方法。问题 2释放锁的过程是将当前线程加入到等待队列之后需要释放同步状态该操作通过 fullyRelease(Node) 方法来完成javafinal int fullyRelease(Node node) { boolean failed true; try { // 获取同步状态 int savedState getState(); // 释放锁 if (release(savedState)) { failed false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus Node.CANCELLED; } }这段代码也很容易理解调用 AQS 的模板方法 release 释放 AQS 的同步状态并且唤醒在同步队列中头节点的后继节点引用的线程如果释放成功则正常返回若失败的话就抛出异常。问题3怎样才能从 await 方法中退出怎样从 await 方法退出呢现在回过头再来看 await 方法其中有这样一段逻辑javawhile (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态 LockSupport.park(this); if ((interruptMode checkInterruptWhileWaiting(node)) ! 0) break; }isOnSyncQueue 方法用于判断当前线程所在的 Node 是否在同步队列中javafinal boolean isOnSyncQueue(Node node) { // 节点状态为CONDITION或者前驱节点为null返回false if (node.waitStatus Node.CONDITION || node.prev null) return false; // 后继节点不为null那么肯定在同步队列中 if (node.next ! null) // If has successor, it must be on queue return true; return findNodeFromTail(node); }如果当前节点的 waitStatus-2说明它在等待队列中返回 false如果当前节点有前驱节点则证明它在 AQS 队列中但是前驱节点为空说明它是头节点而头节点是不参与锁竞争的也返回 false。如果当前节点既不在等待队列中又不是 AQS 中的头节点且存在 next 节点说明它存在于 AQS 中直接返回 true。这里有必要给大家看一下同步队列与等待队列的关系图了。当线程第一次调用 condition.await 方法时会进入到这个 while 循环然后通过LockSupport.park(this)使当前线程进入等待状态那么要想退出 await第一个前提条件就是要先退出这个 while 循环出口就只两个地方走到 break 退出 while 循环while 循环中的逻辑判断为 false。出现第 1 种情况的条件是当前等待的线程被中断后代码会走到 break 退出第 2 种情况是当前节点被移动到了同步队列中即另外一个线程调用了 condition 的 signal 或者 signalAll 方法while 中逻辑判断为 false 后结束 while 循环。总结一下退出 await 方法的前提条件是当前线程被中断或者调用 condition.signal 或者 condition.signalAll 使当前节点移动到同步队列后。当退出 while 循环后会调用acquireQueued(node, savedState)该方法的作用是在自旋过程中线程不断尝试获取同步状态直到成功线程获取到 lock。这样也说明了退出 await 方法必须是已经获得了 condition 引用关联的 lock。到目前为止上文提到的三个问题我们都通过阅读源码的方式找到了答案也加深了对 await 方法的理解。await 方法示意图如下如图调用 condition.await 方法的线程必须是已经获得了 lock 的线程也就是当前线程是同步队列中的头节点。调用该方法后会使得当前线程所封装的 Node 尾插入到等待队列中。超时机制的支持condition 还额外支持超时机制使用者可调用 awaitNanos、awaitUtil 这两个方法实现原理基本上与 AQS 中的 tryAcquire 方法如出一辙。不响应中断的支持要想不响应中断可以调用condition.awaitUninterruptibly()方法该方法的源码如下javapublic final void awaitUninterruptibly() { Node node addConditionWaiter(); int savedState fullyRelease(node); boolean interrupted false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }这段方法与上面的 await 方法基本一致只不过减少了对中断的处理。通知-signal/signalAll 实现原理调用 condition 的 signal 或者 signalAll 方法可以将等待队列中等待时间最长的节点移动到同步队列中使得该节点能够有机会获得 lock。等待队列是先进先出FIFO的所以等待队列的头节点必然会是等待时间最长的节点也就是每次调用 condition 的 signal 方法都会将头节点移动到同步队列中。在调用signal()方法之前必须先判断是否获取到了锁。接着获取等待队列的首节点将其移动到同步队列并且利用LockSupport唤醒节点中的线程。节点从等待队列移动到同步队列如下图所示被唤醒的线程将从await方法中的while循环中退出。随后加入到同步状态的竞争当中去。成功获取到竞争的线程则会返回到await方法之前的状态。源码如下调用 Condition 的 signal() 方法将会唤醒在等待队列中等待时间最长的节点首节点在唤醒节点之前会将节点移到同步队列中。Condition 的 signal() 方法如下所示javapublic final void signal() { // 判断是否是当前线程获取了锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 唤醒等待队列的首节点 Node first firstWaiter; if (first ! null) doSignal(first); }该方法最终调用 doSignal(Node) 方法来唤醒节点javaprivate void doSignal(Node first) { do { // 把等待队列的首节点移除之后要修改首结点 if ( (firstWaiter first.nextWaiter) null) lastWaiter null; first.nextWaiter null; } while (!transferForSignal(first) (first firstWaiter) ! null); }真正对头节点做处理的逻辑将节点移动到同步队列是通过 transferForSignal(Node) 方法完成的
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2483780.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!