前言
jdk 中提供了许多的并发工具类,大家可能比较熟悉的有CountDownLatch,主要用来阻塞一个线程运行,直到其他线程运行完毕。而 jdk 还有一个功能类似并发工具类CyclicBarrier,你知道它的作用吗?和CountDownLatch有什么区别呢?
对于 CountDownLatch 不了解的可以参考# CountDownLatch源码硬核解析
介绍和使用
CyclicBarrier,循环屏障,用来进行线程协作,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,会触发自己运行,运行完后,屏障又会开门,所有被屏障拦截的线程又可以继续运行。所以CyclicBarrier 是可以重用的。
为了更好的理解,我们举个例子,如下图所示:

我们将屏障想成栅栏,5 个线程想成 5 头猪。5 头猪开始往前跑,直到都跑到栅栏前,栅栏开始做个自己的任务,比如看看猪多重。然后打开栅栏,猪又会继续跑,跑到下一个栅栏,就这样循环....
API 介绍
构造方法
-
public CyclicBarrier(int parties): 创建parties个线程任务的循环CyclicBarrier -
public CyclicBarrier(int parties, Runnable barrierAction): 当parties个线程到到屏障出,自己执行任务barrierAction
常用 API
-
int await():线程调用await方法通知CyclicBarrier本线程已经到达屏障
基本使用
我们将上面猪猪的例子通过CyclicBarrier简单做一个实现。
@Slf4j(topic = "c.CyclicBarrierPig")public class CyclicBarrierPig {public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(5);CyclicBarrier barrier = new CyclicBarrier(5, () -> {System.out.println("主人看看哪个猪跑最快,最肥...");});// 循环跑3次for (int i = 0; i < 3; i++) {// 5条猪开始跑for(int j = 0; j<5; j++) {int finalJ = j;service.submit(() -> {log.info("pig{} is run .....", finalJ);try {// 随机时间,模拟跑花费的时间Thread.sleep(new Random().nextInt(5000));log.info("pig{} reach barrier .....", finalJ);barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});}}service.shutdown();}}
复制代码
运行结果:

实现原理
我们已经明白CyclicBarrier的基本使用了,那我们看看它是如何实现的。
成员属性
-
全局锁:利用可重入锁实现的工具类
// barrier 实现是依赖于Condition条件队列,condition 条件队列必须依赖lock才能使用private final ReentrantLock lock = new ReentrantLock();// 线程挂起实现使用的 condition 队列,当前代所有线程到位,这个条件队列内的线程才会被唤醒private final Condition trip = lock.newCondition();
复制代码
-
线程数量
// 代表多少个线程到达屏障开始触发线程任务private final int parties;// 表示当前“代”还有多少个线程未到位,初始值为 partiesprivate int count;
复制代码
-
当前代中最后一个线程到位后要执行的任务
private final Runnable barrierCommand;
复制代码
-
代, 也是用标记一次循环
// 表示 barrier 对象当前 代private Generation generation = new Generation();private static class Generation {// 表示当前“代”是否被打破,如果被打破再来到这一代的线程 就会直接抛出 BrokenException 异常// 且在这一代挂起的线程都会被唤醒,然后抛出 BrokerException 异常。boolean broken = false;}
复制代码
构造方法
public CyclicBarrie(int parties, Runnable barrierAction) {// 因为小于等于 0 的 barrier 没有任何意义if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;// 可以为 nullthis.barrierCommand = barrierAction;}
复制代码
成员方法
-
await():阻塞等待所有线程到位
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}// timed:表示当前调用await方法的线程是否指定了超时时长,如果 true 表示线程是响应超时的// nanos:线程等待超时时长,单位是纳秒private int dowait(boolean timed, long nanos) {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取当前代final Generation g = generation;// 【如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常】if (g.broken)throw new BrokenBarrierException();// 如果当前线程被中断了,则打破当前代,然后当前线程抛出中断异常if (Thread.interrupted()) {// 设置当前代的状态为 broken 状态,唤醒在 trip 条件队列内的线程breakBarrier();throw new InterruptedException();}// 逻辑到这说明,当前线程中断状态是 false, 当前代的 broken 为 false(未打破状态)// 假设 parties 给的是 5,那么index对应的值为 4,3,2,1,0int index = --count;// 条件成立说明当前线程是最后一个到达 barrier 的线程,【需要开启新代,唤醒阻塞线程】if (index == 0) {// 栅栏任务启动标记boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)// 启动触发的任务command.run();// run()未抛出异常的话,启动标记设置为 trueranAction = true;// 开启新的一代,这里会【唤醒所有的阻塞队列】nextGeneration();// 返回 0 因为当前线程是此代最后一个到达的线程,index == 0return 0;} finally {// 如果 command.run() 执行抛出异常的话,会进入到这里if (!ranAction)breakBarrier();}}// 自旋,一直到条件满足、当前代被打破、线程被中断,等待超时for (;;) {try {// 根据是否需要超时等待选择阻塞方法if (!timed)// 当前线程释放掉 lock,【进入到 trip 条件队列的尾部挂起自己】,等待被唤醒trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 被中断后来到这里的逻辑// 当前代没有变化并且没有被打破if (g == generation && !g.broken) {// 打破屏障breakBarrier();// node 节点在【条件队列】内收到中断信号时 会抛出中断异常throw ie;} else {// 等待过程中代变化了,完成一次自我打断Thread.currentThread().interrupt();}}// 唤醒后的线程,【判断当前代已经被打破,线程唤醒后依次抛出 BrokenBarrier 异常】if (g.broken)throw new BrokenBarrierException();// 当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑if (g != generation)return index;// 当前线程 trip 中等待超时,然后主动转移到阻塞队列if (timed && nanos <= 0L) {breakBarrier();// 抛出超时异常throw new TimeoutException();}}} finally {// 解锁lock.unlock();}}
复制代码
-
breakBarrier():打破 Barrier 屏障
private void breakBarrier() {// 将代中的 broken 设置为 true,表示这一代是被打破了,再来到这一代的线程,直接抛出异常generation.broken = true;// 重置 count 为 partiescount = parties;// 将在trip条件队列内挂起的线程全部唤醒,唤醒后的线程会检查当前是否是打破的,然后抛出异常trip.signalAll();}
复制代码
-
nextGeneration():开启新的下一代
private void nextGeneration() {// 将在 trip 条件队列内挂起的线程全部唤醒trip.signalAll();// 重置 count 为 partiescount = parties;// 开启新的一代,使用一个新的generation对象,表示新的一代,新的一代和上一代【没有任何关系】generation = new Generation();}
复制代码
和 CountDownLatch 的区别
相同点
二者都能让一个或多个线程阻塞等待,都可以用在多个线程间的协调,起到线程同步的作用。
不同点
-
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以反复 使用。 -
CountDownLatch.await一般阻塞工作线程,所有的进行预备工作的线程执行countDown,而CyclicBarrier通过工作线程调用await从而自行阻塞,直到所有工作线程达到指定屏障,所有的线程才会返回各自执行自己的工作。 -
CyclicBarrier还可以提供一个barrierAction,合并多线程计算结果。 -
CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
总结
本文讲解了CyclicBarrier 的基本功能和使用,同时讲解了它大致的实现。关于CyclicBarrier 具体有什么使用场景,你可能还是比较迷惑,我再举个例子,比如CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。
如果本文对你有帮助的话,请留下一个赞吧


![[附源码]Python计算机毕业设计SSM开心鲜花系统(程序+LW)](https://img-blog.csdnimg.cn/78a2249d76c74b7faaf73082b5d309c5.png)









![[附源码]Python计算机毕业设计SSM绝味鸭脖连锁店信息系统(程序+LW)](https://img-blog.csdnimg.cn/f4a0c865d0b04218b3efec392e386edd.png)






