AQS源码解析 —共享模式_CyclicBarrier重复屏障
简介
CyclicBarrier:循环屏障、循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。
应用:可以实现多线程中,某个任务在等待其他线程执行完毕以后触发。
CyclicBarrier 与 CountDownLatch 的异同
- 两者都能实现阻塞一组线程等待被唤醒;
- 前者是最后一个线程到达时自动唤醒;
- 后者是通过显式地调用 countDown() 实现的;
- 前者是通过重入锁及其条件锁实现的,后者是直接基于 AQS 实现的;
- 前者具有“代”的概念,可以重复使用,后者只能使用一次;
- 前者只能实现多个线程到达栅栏处一起运行;
- 后者不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立(详见 CountDownLatch 那章使用案例);
与 CountDownLatch 主要区别:CyclicBarrier 是可以重用的。
-
CountDownLatch 是一次性的,循环多次使用,每次都需要重新 new 一次(因为会清空计数),无法重用
ExecutorService service = Executors.newFixedThreadPool(2); for (int i = 0; i < 3; i++) { CountDownLatch latch = new CountDownLatch(2); // 每循环一次就需要重新 new 一次 service.submit(() -> { log.debug("task1 start..."); sleep(1); latch.countDown(); }); service.submit(() -> { log.debug("task2 start..."); sleep(2); latch.countDown(); }); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("task1 task2 finish..."); } service.shutdown();
-
CyclicBarrier,可以重用
ExecutorService service = Executors.newFixedThreadPool(2); // 可重用 下次被调用不会清空计数 恢复为 2 CyclicBarrier barrier = new CyclicBarrier(2, () -> { // 个数为2时才会继续执行 System.out.println("task1, task2 finish..."); // 这里的任务执行时机是在其余的所有任务都执行完成后 }); for (int i = 0; i < 3; i++) { service.submit(() -> { log.debug("task1 begin..."); sleep(1); try { barrier.await(); // 2-1=1 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); service.submit(() -> { log.debug("task2 begin..."); sleep(2); try { barrier.await(); // 1-1=0 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } service.shutdown(); }
执行结果
21:20:03.323 c.TestCyclicBarrier [pool-1-thread-2] - task2 begin... 21:20:03.323 c.TestCyclicBarrier [pool-1-thread-1] - task1 begin... 21:20:03.323 c.TestCyclicBarrier [pool-1-thread-3] - task1 begin... 21:20:04.340 c.TestCyclicBarrier [pool-1-thread-1] - task1, task2 finish... 21:20:04.340 c.TestCyclicBarrier [pool-1-thread-1] - task2 begin... 21:20:04.340 c.TestCyclicBarrier [pool-1-thread-3] - task1 begin... 21:20:05.347 c.TestCyclicBarrier [pool-1-thread-3] - task1, task2 finish... 21:20:05.347 c.TestCyclicBarrier [pool-1-thread-3] - task2 begin... 21:20:07.356 c.TestCyclicBarrier [pool-1-thread-3] - task1, task2 finish...
可以看到 只有 task1 和 task2 这两个任务都完成时,才会打破 CyclicBarrier 的屏障,执行 await() 后续代码逻辑。并且屏障可以重用。
工作原理图
入门案例
public class CyclicBarrierDemo {
/**
* 案例:模拟英雄联盟 游戏开始逻辑
*/
public static void main(String[] args) {
// 第1步:定义5个英雄
String[] heroes = { "青钢影", "武器大师", "剑姬", "腕豪", "剑魔" };
// 第2步:创建固定的线程池,线程数量为5
ExecutorService service = Executors.newFixedThreadPool(5);
// 第3步:创建barrier,parties 设置为5
CyclicBarrier barrier = new CyclicBarrier(5);
// 第4步:通过for循环开启5任务,模拟开始游戏,传递给每个任务 英雄名称和barrier
for (int i = 0; i < 5; i++) {
service.execute(new Player(heroes[i], barrier));
}
service.shutdown();
}
static class Player implements Runnable {
private String hero;
private CyclicBarrier barrier;
public Player(String hero, CyclicBarrier barrier) {
this.hero = hero;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 每个玩家加载进度不一样,这里使用随机数来模拟
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(hero + " 加载进度100% 等待其他玩家加载中...");
barrier.await();
System.out.println(hero + " 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
执行结果
武器大师 加载进度100% 等待其他玩家加载中...
青钢影 加载进度100% 等待其他玩家加载中...
腕豪 加载进度100% 等待其他玩家加载中...
剑姬 加载进度100% 等待其他玩家加载中...
剑魔 加载进度100% 等待其他玩家加载中...
剑魔 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
武器大师 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
剑姬 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
腕豪 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
青钢影 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
源码解析
属性及构造方法
public class CyclicBarrier {
// 静态内部类 Generation 表示 “代” 这个概念
// 一开始到来的线程 都会在某一个代中挂起,当最后一个线程到达时,这个代中的线程就可以全部通过了,随后会开启新的一个代。
private static class Generation {
/*
* 表示当前代是否被打破,如果代被打破,那么再来到这一代的线程,就会直接抛出BrokenException异常,
* 且在这一代挂起的线程都会被唤醒,然后抛出异常BrokenException。
*
* 在await()方法时,正常情况下被阻塞的线程被唤醒后,如果跳出await()就会判断
* 原代和新代是否是一个,因为最后一个达到的线程会将创建新代。
*/
boolean broken = false;
}
// 因为CyclicBarrier的实现是依赖于Condition等待队列的,而Condition等待队列必须依赖lock
private final ReentrantLock lock = new ReentrantLock();
/*
* 线程挂起实现使用的等待队列,条件:当前代所有线程到位(count = 0),这个等待队列的线程才会被唤醒
*/
private final Condition trip = lock.newCondition();
// barrier需要参与进来的线程数量(可以比作『人满发车』)
private final int parties;
// 当前代 最后一个到位的线程需要执行的事件
private final Runnable barrierCommand;
// 表示barrier对象,当前代
private Generation generation = new Generation();
// 当前代还有多少个线程未到位(初始值为parties)
private int count;
/*
* 构造器
* @param parties 表示需要参与的线程数量,每次屏障需要参与的线程数
* @param barrierAction 当前 “代” 最后一个到位的线程,需要执行的事件(可以为null)
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
// parties <= 0 抛出异常
if (parties <= 0) throw new IllegalArgumentException();
// 为内部属性赋值
this.parties = parties;
this.count = parties; // count的初始值 就是parties,后面当前代每到位一个线程,count--
this.barrierCommand = barrierAction;
}
简单小方法
/*
* 开启下一代,当这一代 所有线程到位后(假设barrierCommond不为空,还需要最后一个线程执行完事件), 会调用nextGeneration()开启下一代。
*/
private void nextGeneration() {
// 将在trip条件队列内挂起的线程 全部唤醒
trip.signalAll();
// 重置count为parties
count = parties;
// 开启下一代 使用一个新的generation对象 表示新的一代,新的一代和上一代没有任何关系
generation = new Generation();
}
/*
* 打破barrier屏障,在屏障内部的线程 都会抛出异常
*/
private void breakBarrier() {
// 将"代"中的broken设置为true,表示这一代是被打破了的,再来到这一代的线程直接抛出异常
generation.broken = true;
// 重置count为parties
count = parties;
/*
* 将等待队列中的线程全部唤醒,唤醒后的线程会检查当前代是否是被打破的,
* 如果是被打破的话,接下来的逻辑和开启下一代唤醒的逻辑不一样。
*/
trip.signalAll();
}
await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 底层调用的是dowait()方法,这里分析一个不带超时时间的dowait()
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
||
||
\/
dowait()
/*
* @param timed 表示当前调用await()方法的线程是否指定了超时时长
* @param nanos 表示线程等待超时时长,如果timed == false,那么nanos == 0
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException { // 抛出3种异常:中断、打破、超时
// 获取全局锁
final ReentrantLock lock = this.lock;
// 加锁
// 为什么要加锁呢?
// 因为 barrier的挂起 和 唤醒 依赖的组件是 Condition
lock.lock();
try {
// 获取barrier当前的 “代”
final Generation g = generation;
// 如果当前代已经被打破状态,则当前调用await()方法的线程,直接抛出Broken异常
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程的中断标志位为true,则打破当前代,然后当前线程抛出中断异常
if (Thread.interrupted()) {
/*
* 此方法将代中的broken打破标志设置为true,并重置count,唤醒等待队列的所有节点
*/
breakBarrier();
// 抛出中断异常
throw new InterruptedException();
}
/*
* 线程执行到这里,说明当前线程中断状态是正常的(false),并且当前“代”的broken为false(未打破状态)
*/
// 将count - 1 赋值给 index
int index = --count;
/*
* 条件成立:表示当前线程是最后一个到达barrier的线程
*/
if (index == 0) { // tripped
// ranAction -> true 表示最后一个到达barrier的线程在执行内部的barrierCommand任务时没有抛出异常,否则抛出了异常。
boolean ranAction = false;
try {
// 拿到创建的barrierCommand
final Runnable command = barrierCommand;
// barrierCommand不为null,将其执行
if (command != null)
command.run();
// 如果执行barrierCommand => command.run()未抛出异常,线程会执行到这里
// 执行完成,设置标记位为true
ranAction = true;
/*
* 开启新一代
* 1.唤醒等待队列内的线程,被唤醒的线程会依次获取到锁(state),然后依次退出await方法
* 2.重置count为parties
* 3.创建一个新的generation,表示新的一代
*/
nextGeneration();
// 返回0,因为当前线程是此代最后一个到达的线程,所以index == 0
return 0;
} finally {
// 如果执行barrierCommand => command.run()出现异常,会进入到这里
if (!ranAction)
// 打破屏障
breakBarrier();
}
}
/*
* 执行到这里,说明当前线程并不是最后一个到达barrier的线程,此时需要进入自旋。
*/
// 自旋,一直到条件满足 或者 当前代被打破、线程被中断、等待超时
for (;;) {
try {
// 条件成立:说明当前线程是不指定超时时间的
if (!timed)
// 当前线程会释放掉lock,然后进入等待队列的尾部,然后挂起 等待被唤醒
trip.await();
// 响应超时时间
else if (nanos > 0L)
// 调用awaitNanos方法(带超时的阻塞)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
/*
* 抛出中断异常,会进来这里
* 什么时候会抛出中断异常呢?
* Node节点在等待队列内收到中断信号,会抛出中断异常
*/
/*
* 条件1:g == generation 成立,说明当前代并没有变化
* 条件2:!g.broken,当前代如果没有被打破,那么当前线程就去打破,并且抛出异常
*/
if (g == generation && ! g.broken) {
// 打破barrier
breakBarrier();
// 抛出中断异常
throw ie;
} else {
/*
* 执行到else有几种情况?
* 1.代发生了变化,这个时候就不需要抛出中断异常了,因为代已经更新了,这里唤醒后就走正常逻辑了,只不过设置下中断标记
* 2.代未发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出brokenBarrier异常,也记录下中断标志位
*/
Thread.currentThread().interrupt();
}
}
/*
* 唤醒后执行到这里 有几种情况?
* 1.正常情况,当前barrier开启了新的一代
* 2.当前generation被打破,此时也会唤醒所有在trip等待队列上挂起的线程
* 3.当前线程在等待队列中超时,然后主动转移到同步队列,然后获取到锁 唤醒
*/
// 表示当前代已经被打破
if (g.broken)
// 线程唤醒后依次抛出BrokenBarrier异常
throw new BrokenBarrierException();
// 条件成立:说明当前线程挂起期间,最后一个线程到位了,然后触发了开启新一轮的逻辑,此时唤醒等待队列中的线程
// 这是一次正常的线程被唤醒后退出的逻辑
if (g != generation)
return index;
// 超时判断
if (timed && nanos <= 0L) {
// 打破barrier
breakBarrier();
// 抛出超时异常
throw new TimeoutException();
}
}
} finally {
// 解锁
lock.unlock();
}
}
总结
- CyclicBarrier 有一个 “代” 的概念,一开始到来的线程,都会在某一个代中挂起,当最后一个线程到达时,这个代中的线程就可以全部通过了,随后会开启新的一个代。
- CyclicBarrier 会使一组线程阻塞在 await() 处,当最后一个线程到达时唤醒(只是从条件队列转移到 AQS 队列中)前面的线程大家再继续往下走;
- CyclicBarrier 不是直接使用 AQS 实现的一个同步器,是基于 Lock 和 Condition 的一个实践案例,实现整个同步逻辑;
- 整体来说,只有一个方法 dowait() 。
参考
- 视频参考
- b站_小刘讲源码付费课
- b站_黑马程序员深入学习Java并发编程,JUC并发编程全套教程
- 文章参考
- shstart7_AQS共享模式之CyclicBarrier源码解析
- 兴趣使然的草帽路飞_AQS源码探究_08 CyclicBarrier源码分析
- 肆华_CyclicBarrier阅读理解