并发编程AQS之ReentrantLock/Semaphore/CountDownLatch/CyclicBarrier

news2026/4/30 16:11:13
一、管程——Java线程同步的设计思想管程指的是管理共享变量以及对共享变量的操作过程让他们支持并发。互斥同一时刻只允许一个线程访问共享资源同步线程之间如何通信、协作。MESA模型在管程的发展史上先后出现过三种不同的管程模型分别是Hasen模型、Hoare模型和MESA模型。现在正在广泛使用的是MESA模型。管程中引入了条件变量的概念而且每个条件变量都对应有一个等待队列。条件变量和等待队列的作用是解决线程之间的同步问题。Java中针对管程有两种实现一种是基于Object的Monitor机制用于synchronized内置锁的实现一种是抽象队列同步器AQS用于JUC包下Lock锁机制的实现Slf4j public class ConditionDemo2 { private static final ReentrantLock lock new ReentrantLock(); private static final Condition condition lock.newCondition(); public static void main(String[] args) throws InterruptedException { new Thread(() - { log.debug(t1开始执行....); lock.lock(); try { log.debug(t1获取锁....); // 让线程在obj上一直等待下去 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); log.debug(t1执行完成....); } }, t1).start(); new Thread(() - { log.debug(t2开始执行....); lock.lock(); try { log.debug(t2获取锁....); // 让线程在obj上一直等待下去 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); log.debug(t2执行完成....); } }, t2).start(); // 主线程两秒后执行 Thread.sleep(2000); log.debug(准备获取锁去唤醒 condition上阻塞的线程); lock.lock(); try { // 唤醒condition上所有阻塞的线程 condition.signalAll(); log.debug(唤醒condition上阻塞的线程); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }二、AQS原理分析1、什么是AQSjava.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为比如等待队列、条件队列、独占获取、共享获取等而这些行为的抽象就是基于AbstractQueuedSynchronizer简称AQS实现的AQS是一个抽象同步框架可以用来实现一个依赖状态的同步器。JDK中提供的大多数的同步器如Lock, Latch, Barrier等都是基于AQS框架来实现的一般是通过一个内部类Sync继承 AQS将同步器所有调用都映射到Sync对应的方法AQS具备的特性阻塞等待队列共享/独占公平/非公平可重入允许中断2、AQS核心结构private volatile int state;//共享变量使用volatile修饰保证线程可见性 //返回同步状态的当前值 protected final int getState() { return state; } // 设置同步状态的值 protected final void setState(int newState) { state newState; } //原子地CAS操作将同步状态值设置为给定值update如果当前同步状态的值等于expect期望值 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }2.1、AQS内部维护属性volatile int statestate表示资源的可用状态2.2、State访问方式getState()setState()compareAndSetState()2.3、资源访问方式Exclusive-独占只有一个线程能执行如ReentrantLockShare-共享多个线程可以同时执行如Semaphore/CountDownLatch2.4、AQS实现方法isHeldExclusively()该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)独占方式。尝试获取资源成功则返回true失败则返回false。tryRelease(int)独占方式。尝试释放资源成功则返回true失败则返回false。tryAcquireShared(int)共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且有剩余资源。tryReleaseShared(int)共享方式。尝试释放资源如果释放后允许唤醒后续等待结点返回true否则返回false。2.5、AQS定义两种队列同步等待队列 主要用于维护获取锁失败时入队的线程。条件等待队列 调用await()的时候会释放锁然后线程会加入到条件队列调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中等待再次获得锁。2.6、AQS定义了5个队列中节点状态值为0初始化状态表示当前节点在sync队列中等待着获取锁。CANCELLED值为1表示当前的线程被取消SIGNAL值为-1表示当前节点的后继节点包含的线程需要运行也就是unparkCONDITION值为-2表示当前节点在等待condition也就是在condition队列中PROPAGATE值为-3表示当前场景下后续的acquireShared能够得以执行3、同步等待队列AQS当中的同步等待队列也称CLH队列CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列是FIFO先进先出线程等待队列Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。AQS 依赖CLH同步队列来完成同步状态的管理当前线程如果获取同步状态失败时AQS则会将当前线程已经等待状态等信息构造成一个节点Node并将其加入到CLH同步队列同时会阻塞当前线程当同步状态释放时会把首节点唤醒公平锁使其再次尝试获取同步状态。通过signal或signalAll将条件队列中的节点转移到同步队列。由条件队列转化为同步队列4、条件等待队列AQS中条件队列是使用单向列表保存的用nextWaiter来连接:调用await方法阻塞线程当前线程存在于同步队列的头结点调用await方法进行阻塞从同步队列转化到条件队列5、基于AQS实现一把独占锁/** * author * 基于AQS实现一把独占锁 */ public class TulingLock extends AbstractQueuedSynchronizer{ Override protected boolean tryAcquire(int unused) { //cas 加锁 state0 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } Override protected boolean tryRelease(int unused) { //释放锁 setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return getState() ! 0; } }三、ReentrantLockReentrantLock是一种基于AQS框架的应用实现是JDK中的一种线程并发访问的同步手段它的功能类似于synchronized是一种互斥锁可以保证线程安全。1、ReentrantLock使用方式public class ReentrantLockTest { private final ReentrantLock lock new ReentrantLock(); // ... public void doSomething() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock(); } } }2、ReentrantLock原理ReentrantLock基于 AQS CAS 实现。lock()流程图ReentrantLock基于抽象队列同步器AQS CAS 实现的加锁、释放锁。ReentrantLock实现了公平锁、非公平锁公平锁与非公平锁唯一的区别在于非公平锁不会判断等待队列中是否节点等待获取锁而是直接尝试获取锁获取不到再将当前线程节点添加进等待队列的尾节点判断当前线程节点是否挂起。unlock()流程图ReentrantLock释放锁的流程较为简单优先判断持有锁资源的线程是否为当前线程若不为当前线程抛出异常若为当前线程AQS的state的属性值减1再判断减1后的值是否为0若为0表示当前线程彻底释放锁资源唤醒等待队列中的挂起线程节点开始抢占锁资源。3、ReentrantLock源码分析3.1 构造函数private final Sync sync; // 默认使用非公平锁 public ReentrantLock() { sync new NonfairSync(); } // fairtrue公平锁否则非公平锁 public ReentrantLock(boolean fair) { sync fair ? new FairSync() : new NonfairSync(); }Sync是ReentrantLock的抽象静态内部类继承自AQS(AbstractQueuedSynchronizer) - 抽象队列同步器AQS中定义了锁的基本行为AQS中用volatile修饰的state表示当前锁重入的次数。NonfairSync、FairSync是ReentrantLock的静态内部类继承ReentrantLock$SyncNonfairSync实现非公平锁FairSync实现公平锁。3.2 lock()加锁private final Sync sync; // 加锁 public void lock() { sync.lock(); }3.3 公平锁调用AQS的acquire方法。ReentrantLock$FairSync#lock() 核心代码// 加锁 final void lock() { acquire(1); }3.4 非公平锁通过CAS尝试获取锁(将AQS的state由0修改为1)若成功代表当前线程获取锁资源成功若失败调用AQS的acquire方法。ReentrantLock$NonfairSync#lock() 核心代码// 加锁 final void lock() { // 获取锁资源CAS 修改 AQS 的 state 属性值,获取成功设置当前线程 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); // 获取失败执行AQS的acquire else acquire(1); }3.5 acquire()acquire()方法是Sync父类AQS中的方法AbstractQueuedSynchronizer#acquire() 核心代码// 获取锁资源 public final void acquire(int arg) { // 尝试获取锁资源 if (!tryAcquire(arg) // 当前线程为获取到锁资源加入等待队列同时挂起线程等待唤醒 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }3.6 tryAcquire()tryAcquire()方法在FairSync、NonFairSync中均有实现尝试获取锁资源核心代码如下// 公平锁 FairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current Thread.currentThread(); // 获取AQS的 state int c getState(); // state 0 当前没有线程占用锁资源 if (c 0) { // 判断是否有线程在排队若有线程在排队返回true if (!hasQueuedPredecessors() // 尝试抢锁 compareAndSetState(0, acquires)) { // 无线程排队将线程属性设置为当前线程 setExclusiveOwnerThread(current); return true; } } // state ! 0 有线程占用锁资源 // 占用锁资源的线程是否为当前线程 else if (current getExclusiveOwnerThread()) { // state 1 int nextc c acquires; // 锁重入超出最大限制 (int的最大值)抛异常 if (nextc 0) throw new Error(Maximum lock count exceeded); // 将 state 1 设置给 state setState(nextc); // 当前线程拿到锁资源返回true return true; } return false; } // 非公平锁 NonFairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 非公平锁 Sync#nonfairTryAcquire() 方法 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current Thread.currentThread(); // 获取AQS的 state int c getState(); // 无线程占用锁资源 if (c 0) { // CAS 修改 state 的值修改成功设置线程属性为当前线程返回占用锁资源标识 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 有线程占用锁资源 // 占用锁资源的线程是当前线程重入 else if (current getExclusiveOwnerThread()) { // AQS 的 state acquires int nextc c acquires; // 超出锁重入的上限(int的最大值)抛异常 if (nextc 0) throw new Error(Maximum lock count exceeded); // 将 state acquires 设置到 state 属性 setState(nextc); return true; } return false; }获取当前线程AQS的stateAQS的state属性值为0表示无线程占用锁资源判断等待队列中是否有线程在排队若有线程在排队返回尝试抢锁失败标识将线程添加进等待队列中。若state属性值不为0判断持有锁资源的线程是否为当前线程若为当前线程AQS的state属性值 1返回尝试抢锁成功标识。公平锁与非公平锁的整体实现流程类似唯一不同的是AQS的state属性值为0无线程占用锁资源时非公平锁不会判断是否有线程在等待队列中排队而是直接通过CAS抢锁。3.7 addWaiter()为当前线程创建入队节点AbstractQueuedSynchronizer$Node入参mode表示锁类型在AQS的静态内部类Node中有SHARE、EXCLUSIVE两个属性SHARE代表共享锁、EXCLUSIVE代表排它锁。AbstractQueuedSynchronizer#addWaiter() 核心代码// 等待队列的尾节点懒加载只能通过enq方法添加节点 private transient volatile Node tail; private Node addWaiter(Node mode) { // 当前线程、获取的锁类型封装为Node对象 Node node new Node(Thread.currentThread(), mode); // 获取等待队列的尾节点 Node pred tail; // 尾节点不为null if (pred ! null) { // 将当前节点设置为等待队列的尾节点 node.prev pred; if (compareAndSetTail(pred, node)) { pred.next node; return node; } } // 等待队列为空初始化等待队列节点信息 enq(node); // 返回当前线程节点 return node; }等待队列不为空将当前线程封装的Node节点添加进队列尾部若等待队列为空先初始化等待队列然后在将Node节点添加进队列尾部。3.8 enq()等待队列尾节点为空时执行enq()方法初始化等待队列并将Node节点添加进等待队列中。private Node enq(final Node node) { for (;;) { // 获取等待队列的尾节点 Node t tail; // 等待队列为空初始化等待队列 if (t null) { // 初始化等待队列头尾节点 if (compareAndSetHead(new Node())) tail head; } else { // 当前线程的Node添加到等待队列中 node.prev t; if (compareAndSetTail(t, node)) { t.next node; return t; } } } }3.9 acquireQueued()当前线程是否挂起AbstractQueuedSynchronizer#acquireQueued() 核心代码final boolean acquireQueued(final Node node, int arg) { // 获取锁资源标识 boolean failed true; try { boolean interrupted false; // 自旋 for (;;) { // 获取当前节点的前驱节点 final Node p node.predecessor(); // 当前节点的前驱节点为头节点并获取锁资源成功 if (p head tryAcquire(arg)) { // 将当前节点设置到head - 头节点 setHead(node); // 原头节点的下一节点指向设置为nullGC回收 p.next null; // 设置获取锁资源成功 failed false; // 不管线程GC return interrupted; } // 如果当前节点不是head的下一节点获取锁资源失败,尝试将线程挂起 if (shouldParkAfterFailedAcquire(p, node) // 线程挂起 UNSAFE.park() parkAndCheckInterrupt()) interrupted true; } } finally { if (failed) cancelAcquire(node); } }查看当前排队的Node是否是head的next 如果是尝试获取锁资源 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起unsafe.park()。3.10 shouldParkAfterFailedAcquire检查并更新未成功获取锁资源的状态返回true表示线程被挂起。AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() 核心代码static final class Node { // 线程被取消 static final int CANCELLED 1; // 等待队列中存在待被唤醒的挂起线程 static final int SIGNAL -1; // 当前线程在Condition队列中未在AQS对列中 static final int CONDITION -2; // 解决JDK1.5的BUG。共享锁在释放资源后若头节点为0无法确定真的没有后继节点 // 如果头节点为0需要将头节点的状态改为 -3 当最新拿到锁资源的线程查看 // 是否有后继节点并且为当前锁为共享锁需唤醒排队的线程。 static final int PROPAGATE -3; } // 获取锁资源失败挂起线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取当前节点的上一个节点的状态 int ws pred.waitStatus; // 上一节点被挂起 if (ws Node.SIGNAL) // 返回true挂起当前线程 return true; if (ws 0) { // 上一节点被取消获取最近的线程挂起节点 // 并将当前节点的上一节点指向最近的线程挂起节点 do { node.prev pred pred.prev; } while (pred.waitStatus 0); // 最近线程挂起节点的下一节点指向当前节点 pred.next node; } else { // 上一节点状态小于等于0存在线程处于等待状态但未被挂起的场景 // 通过CAS将处于等待的线程挂起避免在挂起前节点获取到锁资源 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回true不挂起当前线程 return false; }在挂起线程前确认当前节点的上一个节点的状态。若为1代表是取消的节点不能挂起若为-1代表后续节点中有挂起的线程若为-2 (线程在等待队列 - Condition队列中)、-3 (避免线程无法唤醒的一个状态)需要将状态改为-1之后才能挂起当前线程。3.11 unlock()释放锁释放锁ReentrantLock#unlock() 核心代码// 释放锁 public void unlock() { sync.release(1); }unlock方法实际调用的是AQS的release方法AbstractQueuedSynchronizer#release() 核心代码// 等待队列的头节点懒加载通过setHead方法初始化 private transient volatile Node head; // 释放锁 public final boolean release(int arg) { // 当前线程释放锁资源的计数值 if (tryRelease(arg)) { // 当前线程玩去释放锁资源获取等待队列头节点 Node h head; if (h ! null h.waitStatus ! 0) // 唤醒等待队列中待唤醒的节点 unparkSuccessor(h); // 完全释放锁资源 return true; } // 当前线程未完全释放锁资源 return false; }3.12 tryRelease()释放锁Reenttrant$Sync#tryRelease()的核心代码// 释放锁 protected final boolean tryRelease(int releases) { // 修改 AQS 的 state int c getState() - releases; // 当前线程不是持有锁的线程抛出异常 if (Thread.currentThread() ! getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否成功的将锁资源完全释放标识 state 0 boolean free false; // 锁资源完全释放 if (c 0) { // 修改标识 free true; // 将占用锁资源的属性设置为null setExclusiveOwnerThread(null); } // state赋值 setState(c); // 返回true表示当前线程完全释放锁资源 // 返回false标识当前线程是由锁资源持有计数值减少 return free; }4、ReentrantLock非公平锁执行流程四、SemaphoreSemaphore基于 AQS CAS 实现的可根据构造参数的布尔值选择使用公平锁还是非公平锁。Semaphore默认使用非公平锁。Semaphore详情如下1、Semaphore构造函数// AQS的实现 private final Sync sync; // 默认使用非公平锁 public Semaphore(int permits) { sync new NonfairSync(permits); } // 根据fair布尔值选择使用公平锁还是非公平锁 public Semaphore(int permits, boolean fair) { sync fair ? new FairSync(permits) : new NonfairSync(permits); }2、公平锁与非公平锁Semaphore中公平锁与非公平锁的实现可以在tryAcquireShared()方法中找到两种锁的区别。3、NonfairSyncSemaphore#NonfairSync#tryAcquireShared() 详情如下// 非公平锁 获取信号量 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }Semaphore#Sync#nonfairTryAcquireShared() 详情如下// 非公平锁 获取信号量 final int nonfairTryAcquireShared(int acquires) { // 自旋 for (;;) { // 获取Semaphore中可用的信号量数 int available getState(); // 当前可用信号量数 - acquires int remaining available - acquires; // 可用信号量数不足 或 CAS操作获取信号量失败返回 当前可用信号量数 - acquires if (remaining 0 || compareAndSetState(available, remaining)) return remaining; } }4、FairSyncSemaphore#FairSync#tryAcquireShared() 详情如下protected int tryAcquireShared(int acquires) { // 自旋 for (;;) { // 等待队列中挂起线程返回-1 (根据返回的-1将当前线程添加到等待队列中) if (hasQueuedPredecessors()) return -1; // 尝试获取Semaphore的信号量下面与非公平锁逻辑相同 int available getState(); int remaining available - acquires; if (remaining 0 || compareAndSetState(available, remaining)) return remaining; } }5、acquire()Semaphore默认实现的是非公平锁acquire()按非公平锁的实现进行源码分析。Semaphore 中获取一个信号量Semaphore#acquire() 详情如下// Semaphore 中无信号量阻塞 public void acquire() throws InterruptedException { // 获取 Semaphore 信号量 sync.acquireSharedInterruptibly(1); }AbstractQueuedSynchronizer#acquireSharedInterruptibly() 详情如下public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 线程中断抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取Semaphore的信号量 if (tryAcquireShared(arg) 0) // 尝试获取信号量失败再次获取Semaphore信号量 doAcquireSharedInterruptibly(arg); }private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node addWaiter(Node.SHARED); boolean failed true; try { // 自旋 for (;;) { final Node p node.predecessor(); // 当前节点的前驱节点为等待队列头节点 if (p head) { // 尝试获取信号量 int r tryAcquireShared(arg); // 获取信号量成功 if (r 0) { // 唤醒等待队列中的待唤醒线程 setHeadAndPropagate(node, r); p.next null; failed false; return; } } // 获取信号量失败挂起线程 线程阻塞待唤醒进行下一轮自旋 if (shouldParkAfterFailedAcquire(p, node) // 若当前线程被中断抛出InterruptedException异常 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }AbstractQueuedSynchronizer#setHeadAndPropagate()// node 当前节点propagate 剩余资源 private void setHeadAndPropagate(Node node, int propagate) { // 获取等待队列中的头节点 Node h head; // 将当前Node节点设置为等待队列的头节点 setHead(node); // 剩余资源大于0 || 原等待队列中的头节点为null || 原等待队列中 Node 的 ws 为 -1 或者 -3(共享锁) if (propagate 0 || h null || h.waitStatus 0 || (h head) null || h.waitStatus 0) { // 获取当前等待队列头节点的后继节点 Node s node.next; // 当前节点的后继节点为null 或 当前节点的后继节点为共享锁 if (s null || s.isShared()) doReleaseShared(); } }6、release()Semaphore默认实现的是非公平锁release()按非公平锁的实现进行源码分析。归还Semaphore的信号量Semaphore#release() 详情如下// 归还Semaphore的信号量 public void release() { sync.releaseShared(1); }Semaphore#Sync#releaseShared() 详情如下public final boolean releaseShared(int arg) { // 尝试归还信号量 if (tryReleaseShared(arg)) { // 归还信号量 doReleaseShared(); // 归还成功 return true; } // 归还失败 return false; }Semaphore#Sync#releaseShared() 详情如下// 尝试归还信号量 protected final boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取Semaphore中可用的信号量数 int current getState(); // 当前可用信号量数 归还的信号量 releases int next current releases; // 超出了int的最大值变成了负数 if (next current) throw new Error(Maximum permit count exceeded); // cas操作将信号量归还给Semaphore if (compareAndSetState(current, next)) return true; } }归还信号量成功唤醒等待队列中的挂起线程AbstractQueuedSynchronizer#doReleaseShared() :private void doReleaseShared() { // 自旋 for (;;) { // 获取等待队列头节点 Node h head; // 等待队列中有排队的线程 if (h ! null h ! tail) { int ws h.waitStatus; // 等待队列头节点ws -1说明其后继节点中有待唤醒的线程 if (ws Node.SIGNAL) { // cas 操作等待队列头节点的 ws 由 -1 更新为 0 cas失败继续下一次自旋 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒头节点的后继节点中待唤醒线程 unparkSuccessor(h); } // 解决共享锁JDK1.5的bug头节点的 ws 为0将头节点的 ws 设置为 -3 代表后继节点中可能有待唤醒的线程 else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h head) break; } }7、总结不难看出公平锁与非公平锁的区别在于当线程尝试获取Semaphore中的信号量时公平锁优先判断等待队列中是否有挂起的线程如果有则将当前线程添加到等待队列中等待唤醒后抢夺信号量非公平锁不管等待队列中是否有挂起线程优先尝试获取信号量获取失败将当前线程添加到等待队列。五、CountDownLatchCountDownLatch让一个或多个线程等待其他线程执行完成后再执行。在创建CountDownLatch对象时必须指定线程数count每当一个线程执行完成调用countDown()方法线程数count减1当count减到0时await()方法就不再阻塞。CountDownLatch详情1、构造函数CountDownLatch没有无参构造函数在有参构造函数中初始化了sync属性。public CountDownLatch(int count) { // count 合法校验 if (count 0) throw new IllegalArgumentException(count 0); // 初始化sync属性 this.sync new Sync(count); }2、Sync - 队列同步器// 抽象队列同步器 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID 4982264981922014374L; // 将 count 赋值给 AQS 的 state 属性 Sync(int count) { setState(count); } // 获取 AQS 的 state 属性 int getCount() { return getState(); } // 判断所有线程是否都执行完成 1 - 全部执行完成-1 - 仍有线程在执行 protected int tryAcquireShared(int acquires) { return (getState() 0) ? 1 : -1; } // 释放锁 protected boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取 AQS 的 state int c getState(); // 锁资源已经释放完毕再次进入直接返回false什么也不做 if (c 0) return false; // state - 1 int nextc c-1; // CAS 赋值操作 if (compareAndSetState(c, nextc)) // 最后一个线程执行完state 0 返回true。 // countDown() 唤醒等待队列中的其他挂起线程 return nextc 0; } } }3、await() - 阻塞等待// AQS的state属性不为0 阻塞 public void await() throws InterruptedException { // 调用AQS提供的获取共享锁并允许中断的方法 sync.acquireSharedInterruptibly(1); }AbstractQueuedSynchronizer#acquireSharedInterruptibly()详情如下// 获取共享锁并允许其中断 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 线程中断抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 获取共享锁由CountDownLatch实现 if (tryAcquireShared(arg) 0) // state 0说明有线程在持有锁资源将当前线程添加到AQS等待队列中 doAcquireSharedInterruptibly(arg); }CountDownLatch#Sync#tryAcquireShared()详情如下// 获取共享锁 protected int tryAcquireShared(int acquires) { // 线程全部执行完成返回 1未全部执行完成返回-1 return (getState() 0) ? 1 : -1; }AbstractQueuedSynchronizer#acquireSharedInterruptibly()详情如下// 将当前线程添加到AQS等待队列中 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 当前线程封装成Node添加到AQS等待队列中 final Node node addWaiter(Node.SHARED); boolean failed true; try { // 自旋 for (;;) { // 获取当前线程节点的前驱节点 final Node p node.predecessor(); // 前驱节点为等待队列头节点 if (p head) { // 调用 CountDownLatch 实现的方法 int r tryAcquireShared(arg); // 返回值为1表示 state 为 0 所有线程都释放了锁无其他线程持有锁资源 if (r 0) { // state 0将当前线程和后面所有排队的线程都唤醒。 setHeadAndPropagate(node, r); p.next null; failed false; return; } } // *** 线程在此处被挂起待所有线程释放锁资源后即state 0 线程被唤醒再继续往下执行 // 挂起获取锁资源失败的线程并且挂起的线程被中断抛出InterruptedException异常 if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }4、countDown() - 释放锁资源// countDown方法, 实际上调用了AQS的释放共享锁操作 public void countDown() { sync.releaseShared(1); }AbstractQueuedSynchronizer#releaseShared()详情如下// AQS提供的释放共享锁方法CountDownLatch实现了 tryReleaseShared 方法 public final boolean releaseShared(int arg) { // 尝试释放锁资源 if (tryReleaseShared(arg)) { // 没有线程持有锁资源唤醒等待队列中的其他挂起线程 doReleaseShared(); return true; } return false; }CountDownLatch#Sync#tryReleaseShared()详情如下protected boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取当前持有锁资源的线程数 int c getState(); // state已为0返回false那么再次执行countDown什么事情也不做 if (c 0) return false; // count - 1 int nextc c-1; // CAS 完成赋值操作 if (compareAndSetState(c, nextc)) // 没有线程持有锁资源返回true return nextc 0; } }AbstractQueuedSynchronizer#doReleaseShared()详情如下// 没有线程持有锁资源的处理 private void doReleaseShared() { // 自旋 for (;;) { // 获取等待队列的头节点 Node h head; // 等待队列中有挂起线程待唤醒 if (h ! null h ! tail) { int ws h.waitStatus; // 线程待唤醒 if (ws Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒线程 unparkSuccessor(h); } // CAS失败 else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 等待队列头节点被改变结束循环 if (h head) break; } }5、总结CountDownLatch基于 AQS CAS 实现CountDownLatch的构造函数中必须指定count同时初始继承AQS的内部类Sync通过Sync对象将count赋值给AQS的state属性这样就可以基于AQS提供的方法完成CountDownLatch的功能。调用countDown()方法实际上是将AQS中 state 减 1。所有线程执行完成state 会被修改为 0 在countDown()中会唤醒等待队列中挂起的线程。调用await()方法实际上是判断AQS中的 state 是否为 0。state 0表示有线程仍在执行此时await()会阻塞线程。当最后一个线程执行结束state 变为 0countDown()唤醒线程后await()正常执行结束不再阻塞。六、CyclicBarrier与CountDownLatch、Semaphore直接基于AQS实现不同CyclicBarrier 是基于 ReentrantLock ConditionObject 实现的间接基于AQS实现的。1、CyclicBarrier内部结构Generation静态内部类持有布尔类型的属性broken默认为false只有在重置方法reset()、执行出现异常或中断调用breakBarrier() 属性会被设置为true。nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。breakBarrier() 任务执行中断、异常、被重置将Generation中的布尔类型属性设置为true将Waiter队列中的线程转移到AQS队列中待执行完unlock方法后唤醒AQS队列中的挂起线程。await() CyclicBarrier的核心方法计数器递减处理。2、构造函数构造参数重载最终调用的是CyclicBarrier(int, Runnable)详情如下public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { // 参数合法性校验 if (parties 0) throw new IllegalArgumentException(); // final修饰所有线程执行完成归为或重置时 使用 this.parties parties; // 在await方法中计数值表示还有多少线程待执行await this.count parties; // 当计数count为0时 执行此Runnnable再唤醒被阻塞的线程 this.barrierCommand barrierAction; }CyclicBarrier属性3、await()在CyclicBarrier中await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法)await(timout, unit)表示等待timeout时间后指定数量的线程未准备就绪抛出TimeoutException超时异常。CyclicBarrier#await 详情如下// 执行没有超时时间的await public int await() throws InterruptedException, BrokenBarrierException { try { // 执行dowait() return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } // 执行有超时时间的await public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }await最终调用dowait()方法CyclicBarrier#dowait 详情如下private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 获取锁对象 final ReentrantLock lock this.lock; // 加锁 lock.lock(); try { // 获取generation对象 final Generation g generation; // 这组线程中在执行过程中是否异常、超时、中断、重置 if (g.broken) throw new BrokenBarrierException(); // 这组线程被中断重置标识与计数值 // 将Waiter队列中的线程转移到AQS队列抛出InterruptedException if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 计数值 - 1 int index --count; // 这组线程都已准备就绪 if (index 0) { // 执行结果标识 boolean ranAction false; try { // 若使用2个参数的有参构造就传入了自实现任务index 0先执行CyclicBarrier有参的任务 // 此处设计与 FutureTask 构造参数设计类似 final Runnable command barrierCommand; if (command ! null) // 执行任务 command.run(); // 执行完成设置为true ranAction true; // CyclicBarrier属性归位 nextGeneration(); return 0; } finally { // 执行过程中出现问题 if (!ranAction) // 重置标识与计数值将Waiter队列中的线程转移到AQS队列 breakBarrier(); } } // -- 之后count不为0表示还有线程在等待 // 自旋 直到被中断、超时、异常、count 0 for (;;) { try { // 未设置超时时间 if (!timed) // 挂起线程将线程转移到 Condition 队列 trip.await(); // 未达到等待时间 else if (nanos 0L) // 挂起线程并返回剩余等待时间 nanos trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 中断异常 if (g generation ! g.broken) { breakBarrier(); throw ie; } else { // 线程中断 Thread.currentThread().interrupt(); } } // 该组线程被中断、执行异常、超时抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); if (g ! generation) return index; // 超时抛出异常TimeoutException if (timed nanos 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放锁资源 lock.unlock(); } }4、breakBarrier()结束CyclicBarrier的执行// 结束CyclicBarrier的执行 private void breakBarrier() { // 设置线程执行过程中是否异常、中断、重置标识 generation.broken true; // 重置计数值 count parties; // 将Condition队列中的Node转移到AQS队列中等到执行完unlockAQS队列中的挂起线程会被唤醒 // 有后继节点的设置ws -1; // 无后继节点的设置ws 0 trip.signalAll(); }5、reset()重置CyclicBarrier// 重置CyclicBarrier public void reset() { // 获取锁对象 final ReentrantLock lock this.lock; // 加锁 lock.lock(); try { // 设置当前generation属性并将Waiter队列中线程转移到AQS队列 breakBarrier(); // 重置generation 属性、计数值 nextGeneration(); } finally { // 释放锁 lock.unlock(); } }6、nextGeneration()CyclicBarrier归位private void nextGeneration() { // 将Waiter队列中线程转移到AQS队列 trip.signalAll(); // 计数值、generation 归位 count parties; generation new Generation(); }7、总结CyclicBarrier基于 ReentrantLock ConditionObject实现CyclicBarrier的构造函数中必须指定parties同时对象generation内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。parties是初始待执行线程数在构造函数中会将parties赋给计数值count每当一个线程执行await()count就会减1。当count被减为0时代表所有线程都准备就绪此时判断构造函数是否初始化了barrierCommand属性若对barrierCommand属性做了赋值优先执行barrierCommand任务barrierCommand任务执行完成再将Waiter队列中的线程转移到AQS队列中执行完unlock唤醒AQS队列中的线程计数值count、generation归位。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2569275.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…