AQS
AQS 是 AbstractQueuedSynchronizer 的缩写,即 抽象队列同步器,是 Java.util.concurrent 中的一个基础工具类,用于实现同步器(Synchronizer)的开发。
AQS 提供了一种实现锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器,使得开发者能够更方便地编写线程安全的代码。
AQS 的使用涉及到复杂的同步机制和多线程协调,需要仔细考虑并发控制的细节。通常情况下,直接使用基于 AQS 实现的高级同步器(如 ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask、CountDownLatch 等)更为常见,而不需要直接使用 AQS。
AQS 的数据结构
AQS 内部使用了一个先进先出(FIFO)的 双端队列,被称为,被称为 CLH 队列(Craig, Landin, and Hagersten queues)。它并不直接储存线程,而是储存拥有线程的 Node 节点。
使用了两个虚拟的引用 head 和 tail 用于标识队列的头部和尾部,管理等待线程。
它的实现方式和普通的双端队列略有不同,主要涉及到以下两个方面:
-
节点的处理方式:在 CLH 队列中,每个 Node 都代表了一个等待线程,同时它还包含一个
state属性。当一个线程尝试获取同步状态时,它会创建一个节点插入到 CLH 队列的尾部,然后自旋等待获取同步状态。当同步状态可用时,AQS 会使用 CAS 操作将当前头结点改为自己,并返回原头结点,从而唤醒自己。
-
队列的管理方式:CLH 队列采用的是轻量级锁的思路,当一个线程到达尾节点时它只需自旋等待即可。此时,节点的
state属性会发挥重要作用,它能够指示当前节点是处于等待还是已经被唤醒出队并不再使用,从而避免了队列中节点的浪费。
![![[Craig, Landin, and Hagersten queues.png]]](https://i-blog.csdnimg.cn/direct/55c1741e1fb24daead67411f3a20e631.png)
AQS 内部使用了一个 [[JMM Java内存模型#volatile|volatile]] 的变量 state 来作为资源的标识。同时定义了几个获取和改变 state 的 [[Protected]] 方法 getState()、setState()、compareAndSetState(),均为原子操作,子类可以覆盖这些方法来实现自己的逻辑。
compareAndSetState 的实现依赖于 [[实现线程安全#Unsafe 类|Unsafe]] 的 compareAndSwapInt() 方法
线程的 Node 节点
![![[AQS Node Source Code.png]]](https://i-blog.csdnimg.cn/direct/59ac983d2f294d88b450a6b22ac6e04f.png)
waitStatus 用来标记当前节点的状态:
-
CANCELLED:表示当前节点(对应的线程)已被取消。当等待超时或被中断,会触发进入为此状态,进入该状态后节点状态不再变化; -
SIGNAL:后面节点等待当前节点唤醒; -
CONDITION:Condition类 中使用,当前线程阻塞在Condition,如果其他线程调用了 Condition 的signal方法,这个节点将从等待队列转移到同步队列队尾,等待获取同步锁; -
PROPAGATE:共享模式,前置节点唤醒后面节点后,唤醒操作无条件传播下去; -
0:中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。
实现队列
通过 Node 可以实现两种队列:
-
通过 prev 和 next 实现 CLH 队列。
在 CLH 锁中,每个等待的线程都会有一个关联的 Node,每个 Node 有一个 prev 和 next 指针。当一个线程尝试获取锁并失败时,它会将自己添加到队列的尾部并自旋,等待前一个节点的线程释放锁。
![![[CLTLock instance.png]]](https://i-blog.csdnimg.cn/direct/33fcfb54084747458866590a07e7c27b.png)
- 通过
nextWaiter实现Condition上的等待线程队列(单向队列),这个 Condition 主要用在ReentrantLock类中
常用方法
AQS 主要提供了以下几个重要方法(均为 protected):
-
isHeldExclusively():该线程是否正在独占资源。只有用到Condition类才需要去实现它。 -
acquire(int arg):尝试获取资源,如果获取失败则等待,直到成功获取同步状态或被中断。 -
release(int arg):释放资源,唤醒等待队列中的其他线程。 -
tryAcquire(int arg):独占方式,尝试获取资源,成功返回 true,失败返回 false。 -
tryRelease(int arg):独占方式,尝试释放资源,成功返回 true,失败返回 false。 -
tryAcquireShared(int arg):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 -
tryReleaseShared(int arg):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。
arg 为要 获取/释放 的资源个数,在独占模式下始终为 1
这些方法虽然都是 protected 的,但是它们并没有在 AQS 具体实现,而是直接抛出异常:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,如 信号 Semaphoreopen 只需要实现 tryAcquire 方法而不用实现其余不需要用到的模版方法
设计思想
AQS 的设计思想是基于模板方法模式,它将同步器的核心操作定义为抽象方法,由子类去实现具体细节。AQS 内部维护了一个等待队列,用于管理等待获取同步状态的线程。
AQS 通过 CAS(Compare and Swap)操作、队列等待、线程阻塞等机制来实现线程的协作和同步。
获取资源
获取资源的入口是 acquire(int arg) 方法
![![[AQS accquire source code.png]]](https://i-blog.csdnimg.cn/direct/6af153a9f58746a583adcc2e0a410b12.png)
首先调用 tryAcquire 尝试去获取资源,如果获取资源失败,就通过 addWaiter(Node.EXCLUSIVE) 方法把这个线程插入到等待队列(在队列的尾部插入新的 Node 节点)中。其中传入的参数代表要插入的 Node 是独占式的。通过 CAS 自旋的方式保证操作的线程安全性。
之后,处于等待队列的 Node 从头结点一个一个去获取资源。
![![[AQS acquire resource.png]]](https://i-blog.csdnimg.cn/direct/d71d8f883630440db03af267d3786904.png)
此外,还可以通过如下方法获取资源:
-
acquireInterruptibly:申请可中断(在线程中断时可能会抛出InterruptedException)的资源(独占模式) -
acquireShared:申请共享模式的资源 -
acquireSharedInterruptibly:申请可中断的资源(共享模式)
释放资源
释放资源相比于获取资源要简单得多
在 java.util.concurrent.locks.ReentrantLock 的实现中,tryRelease(arg) 会减少持有锁的数量,如果持有锁的数量变为0,释放锁并返回 true。
如果 tryRelease(arg) 成功释放了锁,那么接下来会检查队列的头结点。如果头结点存在并且 waitStatus 不为0(这意味着有线程在等待),那么会调用 unparkSuccessor(Node h) 方法来唤醒等待的线程。
AQS实现自定义锁
下面是一个简单的示例,展示如何使用 AQS 来实现一个自定义的可重入锁。
这个简单的锁实现没有考虑重入性和公平性等问题,实际应用中可能还需要更复杂的逻辑来满足更高级的需求。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class CustomReentrantLock implements Lock {
private final Sync sync = new Sync();
private static final class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
代码说明:
-
CustomReentrantLock 类实现了
Lock接口,提供了锁的基本操作。 -
Sync 类继承自
AbstractQueuedSynchronizer,并重写了tryAcquire和tryRelease方法来控制锁的状态。tryAcquire: 尝试获取锁,通过compareAndSetState方法原子地更新状态值。如果当前状态为0(表示锁未被持有),则设置状态为1并返回true,否则返回false。tryRelease: 释放锁,简单地将状态设置为0。
-
lock/unlock 方法: 使用
sync实例来调用AbstractQueuedSynchronizer中定义的acquire和release方法。 -
Condition: 通过
newCondition方法创建条件变量。
AQLS
AQS 里面的“资源”是用一个 int 类型的数据来表示的,有时候业务需求的资源数超出了 int 的范围,所以在 JDK 1.6 中,多了一个 AQLS(AbstractQueuedLongSynchronizer)。它的代码跟 AQS 几乎一样,只是把资源的类型变成了 long 类型。
AOS
AQS 和 AQLS 都继承了一个类 AOS(AbstractOwnableSynchronizer)。这个类于 JDK 1.6 引入。用于表示锁与持有者之间的关系(独占模式)
![![[AbstractOwnableSynchronizer.png]]](https://i-blog.csdnimg.cn/direct/ce4d38cced3041d2a9d73e84e8e936ca.png)
-
字段:
private transient Thread exclusiveOwnerThread;:这个字段用于存储当前独占访问权的线程。由于它是transient的,因此在序列化时不会被保存。
-
方法:
-
protected final void setExclusiveOwnerThread(Thread thread);:此方法用于设置当前拥有独占访问权的线程。如果传入null,则表示没有线程拥有访问权。 -
protected final Thread getExclusiveOwnerThread();:此方法返回最后通过setExclusiveOwnerThread方法设置的线程,如果没有设置过,则返回null。
-
AbstractOwnableSynchronizer 提供了一种机制来允许线程独占某个资源或同步器,通常被用作创建锁和其他同步器的基类。例如,在Java的 ReentrantLock 类中,就使用了 AbstractOwnableSynchronizer 来跟踪哪个线程当前拥有锁。这使得 ReentrantLock 能够支持重入性,即同一个线程可以多次获得同一个锁。



















