一、Semaphore
Semaphore 通过设置一个固定数值的信号量,并发时线程通过 acquire() 获取一个信号量,如果能成功获得则可以继续执行,否则将阻塞等待,当某个线程使用 release() 释放一个信号量时,被等待的线程如果可以成功抢到信号量则继续执行。根据该特征可以有效控制线程的并发数。
那 Semaphore 是如何控制并发的呢,本篇文章带领大家一起解读下 Semaphore 的源码。
在进行源码分析前,先回顾下 Semaphore 是如何使用的,例如下面一个案例:
public class Test {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程:" + Thread.currentThread().getName() + " 执行, 当前时间:" + LocalDateTime.now().toString());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
运行之后,可以看到下面日志:

可以看到每次都是 3 个并发。
下面我们通过源码看下 Semaphore 是如何实现的等待唤醒。
二、Semaphore 源码解读
2.1. 对象的创建过程
如果需要一个Semaphore 对象,可以直接通过 new 创建对象,并传入一个信号量的值,那这个值到底给到哪去了呢?
我们可以点到 Semaphore 的构造函数中,构造函数中可以通过 fair 参数控制 Sync 的类型,其实就是控制后面所使用的锁是公平锁还是非公平锁,如果使用的是只有一个 permits 参数的构造函数,则 Sync 的类型是 NonfairSync 也就是非公平锁:

这里以 NonfairSync 为例,可以点到 NonfairSync 的构造函数中,可以看到信号量值给了父类 Sync 的构造函数:

Sync 则继承了 AQS ,因此 Semaphore 是依赖于 AQS 阻塞队列的,这里将信号量的大小给到了 AQS 中的 state 变量。


2.2. acquire() 获取信号量过程
点到 acquire() 方法中,可以看到又调用了 sync.acquireSharedInterruptibly(1) 也就是 AQS 下的 acquireSharedInterruptibly 方法中:


在该方法中首先使用了 tryAcquireShared 方法,可以理解为获取锁的操作,不过不同的是,这里获取的是信号量的剩余,如果剩余小于0了,则表示获取锁失败,需要进行阻塞,下面来看下是如果获取锁的。
这里的 tryAcquireShared 会调用到子类的 tryAcquireShared ,这里以 NonfairSync 为例:


下面继续进到 nonfairTryAcquireShared 方法中,这里的逻辑很明了,使用乐观锁自旋的方式,对 state 也就是记录信号量值的变量,进行减去 acquires, acquires在前面就已经看到,写死的 1 ,因此这里将 state 进行了 -1 并赋值,返回的值也就是state - 1 之后的值:

看到这之后再回到前面的 AQS 下的 acquireSharedInterruptibly 方法中,如果 tryAcquireShared 获取锁成功,也就是信号量余额大于等于 0 ,那这种情况下也无需进行阻塞,但如果小于 0 了此时则表示信号量已经被使用完了,该线程就需要阻塞等待了,下面继续进到 doAcquireSharedInterruptibly 方法中,看是如何实现阻塞等待的:

进到 doAcquireSharedInterruptibly 方法中,主要使用了 AQS中的几个常用方法,在该方法中首先使用 addWaiter 将当前线程加入到了 AQS 的阻塞队列中:


然后使用自旋的方式,找到当前节点的前驱节点,如果前置节点已经是 head 了,那就可以尝试获取锁了,也就是信号量的剩余,如果获取到了锁,也就是信号量大于等于0,则可以唤醒node节点,释放p :

如果前面没有成功获取到锁,下面则需要进行阻塞等待,当被唤醒时,又会接着进行自旋直到获取到锁后结束:

2.3. release() 释放信号量过程
首先进入到 release() 方法中,可以看到调用了 AQS 下的 releaseShared 方法,并传入了固定值 1 :


这里 tryReleaseShared ,其实也是一种获取锁操作,下面可以进到该方法中看下逻辑:

在该方法中同样和之前 nonfairTryAcquireShared 方法类似,都是使用乐观锁自旋的方式,不过这里做的是 + releases 也就是 +1,并将 +1 后的值进行替换,并返回 true:

下面再回到 AQS 下的 releaseShared 方法,下面会进入到 doReleaseShared 方法中,该方法则是对已经阻塞的线程进行唤醒:


三、总结
阅读了 Semaphore 的源码,可以发现内部基于AQS共享方式实现,因此在并发量大的场景下可以起到较好的性能。


















