AQS源码解析 7.共享模式_CyclicBarrier重复屏障

news2025/7/19 16:53:23

AQS源码解析 —共享模式_CyclicBarrier重复屏障

简介

CyclicBarrier:循环屏障、循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。

应用:可以实现多线程中,某个任务在等待其他线程执行完毕以后触发。

CyclicBarrier 与 CountDownLatch 的异同

  • 两者都能实现阻塞一组线程等待被唤醒;
  • 前者是最后一个线程到达时自动唤醒;
  • 后者是通过显式地调用 countDown() 实现的;
  • 前者是通过重入锁及其条件锁实现的,后者是直接基于 AQS 实现的;
  • 前者具有“代”的概念,可以重复使用,后者只能使用一次;
  • 前者只能实现多个线程到达栅栏处一起运行;
  • 后者不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立(详见 CountDownLatch 那章使用案例);

与 CountDownLatch 主要区别:CyclicBarrier 是可以重用的。

  • CountDownLatch 是一次性的,循环多次使用,每次都需要重新 new 一次(因为会清空计数),无法重用

    ExecutorService service = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 3; i++) {
        CountDownLatch latch = new CountDownLatch(2); // 每循环一次就需要重新 new 一次
        service.submit(() -> {
            log.debug("task1 start...");
            sleep(1);
            latch.countDown();
        });
        service.submit(() -> {
            log.debug("task2 start...");
            sleep(2);
            latch.countDown();
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("task1 task2 finish...");
    }
    service.shutdown();
    
  • CyclicBarrier,可以重用

    ExecutorService service = Executors.newFixedThreadPool(2);
        // 可重用 下次被调用不会清空计数 恢复为 2
        CyclicBarrier barrier = new CyclicBarrier(2, () -> { // 个数为2时才会继续执行
            System.out.println("task1, task2 finish..."); // 这里的任务执行时机是在其余的所有任务都执行完成后
        });
        for (int i = 0; i < 3; i++) {
            service.submit(() -> {
                log.debug("task1 begin...");
                sleep(1);
                try {
                    barrier.await(); // 2-1=1
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            service.submit(() -> {
                log.debug("task2 begin...");
                sleep(2);
                try {
                    barrier.await(); // 1-1=0
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }
    

    执行结果

    21:20:03.323 c.TestCyclicBarrier [pool-1-thread-2] - task2 begin...
    21:20:03.323 c.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
    21:20:03.323 c.TestCyclicBarrier [pool-1-thread-3] - task1 begin...
    21:20:04.340 c.TestCyclicBarrier [pool-1-thread-1] - task1, task2 finish...
    21:20:04.340 c.TestCyclicBarrier [pool-1-thread-1] - task2 begin...
    21:20:04.340 c.TestCyclicBarrier [pool-1-thread-3] - task1 begin...
    21:20:05.347 c.TestCyclicBarrier [pool-1-thread-3] - task1, task2 finish...
    21:20:05.347 c.TestCyclicBarrier [pool-1-thread-3] - task2 begin...
    21:20:07.356 c.TestCyclicBarrier [pool-1-thread-3] - task1, task2 finish...
    

    可以看到 只有 task1 和 task2 这两个任务都完成时,才会打破 CyclicBarrier 的屏障,执行 await() 后续代码逻辑。并且屏障可以重用。

工作原理图

image-20221120145216352

入门案例

public class CyclicBarrierDemo {
    /**
     * 案例:模拟英雄联盟 游戏开始逻辑
     */
    public static void main(String[] args) {
        // 第1步:定义5个英雄
        String[] heroes = { "青钢影", "武器大师", "剑姬", "腕豪", "剑魔" };
        // 第2步:创建固定的线程池,线程数量为5
        ExecutorService service = Executors.newFixedThreadPool(5);
        // 第3步:创建barrier,parties 设置为5
        CyclicBarrier barrier = new CyclicBarrier(5);
        // 第4步:通过for循环开启5任务,模拟开始游戏,传递给每个任务 英雄名称和barrier
        for (int i = 0; i < 5; i++) {
            service.execute(new Player(heroes[i], barrier));
        }
        service.shutdown();
    }

    static class Player implements Runnable {
        private String hero;
        private CyclicBarrier barrier;

        public Player(String hero, CyclicBarrier barrier) {
            this.hero = hero;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                // 每个玩家加载进度不一样,这里使用随机数来模拟
                TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                System.out.println(hero + " 加载进度100% 等待其他玩家加载中...");
                barrier.await();
                System.out.println(hero + " 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果

武器大师 加载进度100% 等待其他玩家加载中...
青钢影 加载进度100% 等待其他玩家加载中...
腕豪 加载进度100% 等待其他玩家加载中...
剑姬 加载进度100% 等待其他玩家加载中...
剑魔 加载进度100% 等待其他玩家加载中...
剑魔 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
武器大师 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
剑姬 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
腕豪 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
青钢影 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛

源码解析

属性及构造方法

public class CyclicBarrier {  
    
    // 静态内部类 Generation 表示 “代” 这个概念
    // 一开始到来的线程 都会在某一个代中挂起,当最后一个线程到达时,这个代中的线程就可以全部通过了,随后会开启新的一个代。
	private static class Generation {
        /*
         * 表示当前代是否被打破,如果代被打破,那么再来到这一代的线程,就会直接抛出BrokenException异常,
         * 且在这一代挂起的线程都会被唤醒,然后抛出异常BrokenException。
         *
         * 在await()方法时,正常情况下被阻塞的线程被唤醒后,如果跳出await()就会判断
         * 原代和新代是否是一个,因为最后一个达到的线程会将创建新代。
         */
        boolean broken = false;
    }
	
    // 因为CyclicBarrier的实现是依赖于Condition等待队列的,而Condition等待队列必须依赖lock
    private final ReentrantLock lock = new ReentrantLock();
	
    /*
     * 线程挂起实现使用的等待队列,条件:当前代所有线程到位(count = 0),这个等待队列的线程才会被唤醒
     */
    private final Condition trip = lock.newCondition();
	
    // barrier需要参与进来的线程数量(可以比作『人满发车』)
    private final int parties;
	
    // 当前代 最后一个到位的线程需要执行的事件
    private final Runnable barrierCommand;
	
    // 表示barrier对象,当前代
    private Generation generation = new Generation();
	
    // 当前代还有多少个线程未到位(初始值为parties)
    private int count;
    
    /*
     * 构造器
     * @param parties 表示需要参与的线程数量,每次屏障需要参与的线程数
     * @param barrierAction 当前 “代” 最后一个到位的线程,需要执行的事件(可以为null)
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        // parties <= 0 抛出异常
        if (parties <= 0) throw new IllegalArgumentException();
        // 为内部属性赋值
        this.parties = parties;
        this.count = parties; // count的初始值 就是parties,后面当前代每到位一个线程,count--
        this.barrierCommand = barrierAction;
    }

简单小方法

    /*
     * 开启下一代,当这一代 所有线程到位后(假设barrierCommond不为空,还需要最后一个线程执行完事件), 会调用nextGeneration()开启下一代。
     */
	private void nextGeneration() {
        // 将在trip条件队列内挂起的线程 全部唤醒
        trip.signalAll();
        // 重置count为parties
        count = parties;  
        // 开启下一代 使用一个新的generation对象 表示新的一代,新的一代和上一代没有任何关系
        generation = new Generation();
    }
	/*
	 * 打破barrier屏障,在屏障内部的线程 都会抛出异常
	 */
    private void breakBarrier() {
        // 将"代"中的broken设置为true,表示这一代是被打破了的,再来到这一代的线程直接抛出异常
        generation.broken = true;
        // 重置count为parties
        count = parties;
        /*
         * 将等待队列中的线程全部唤醒,唤醒后的线程会检查当前代是否是被打破的,
         * 如果是被打破的话,接下来的逻辑和开启下一代唤醒的逻辑不一样。
         */
        trip.signalAll();
    }

await()

	public int await() throws InterruptedException, BrokenBarrierException {
        try {
            // 底层调用的是dowait()方法,这里分析一个不带超时时间的dowait()
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); 
        }
    }
                   ||
                   ||
                   \/

dowait()

	/*
     * @param timed 表示当前调用await()方法的线程是否指定了超时时长
     * @param nanos 表示线程等待超时时长,如果timed == false,那么nanos == 0
     */
	private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException { // 抛出3种异常:中断、打破、超时
        // 获取全局锁
        final ReentrantLock lock = this.lock;        
        // 加锁                 
        // 为什么要加锁呢?
        // 因为 barrier的挂起 和 唤醒 依赖的组件是 Condition
        lock.lock();
        try {
            // 获取barrier当前的 “代”
            final Generation g = generation;
            // 如果当前代已经被打破状态,则当前调用await()方法的线程,直接抛出Broken异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果当前线程的中断标志位为true,则打破当前代,然后当前线程抛出中断异常
            if (Thread.interrupted()) {
                /*
                 * 此方法将代中的broken打破标志设置为true,并重置count,唤醒等待队列的所有节点
                 */
                breakBarrier();
                // 抛出中断异常
                throw new InterruptedException();
            }
            
            /*
             * 线程执行到这里,说明当前线程中断状态是正常的(false),并且当前“代”的broken为false(未打破状态)
             */
            
            // 将count - 1 赋值给 index
            int index = --count;
            /*
             * 条件成立:表示当前线程是最后一个到达barrier的线程
             */
            if (index == 0) {  // tripped  
                // ranAction -> true 表示最后一个到达barrier的线程在执行内部的barrierCommand任务时没有抛出异常,否则抛出了异常。
                boolean ranAction = false;
                try {
                    // 拿到创建的barrierCommand
                    final Runnable command = barrierCommand;     
                    // barrierCommand不为null,将其执行
                    if (command != null)
                        command.run();
                    // 如果执行barrierCommand => command.run()未抛出异常,线程会执行到这里
                    // 执行完成,设置标记位为true
                    ranAction = true;   
                    /*
                     * 开启新一代
                     * 1.唤醒等待队列内的线程,被唤醒的线程会依次获取到锁(state),然后依次退出await方法
                     * 2.重置count为parties
                     * 3.创建一个新的generation,表示新的一代
                     */
                    nextGeneration();
                    // 返回0,因为当前线程是此代最后一个到达的线程,所以index == 0
                    return 0;
                } finally {
                    // 如果执行barrierCommand => command.run()出现异常,会进入到这里
                    if (!ranAction)
                        // 打破屏障
                        breakBarrier();
                }
            }
			
            /*
             * 执行到这里,说明当前线程并不是最后一个到达barrier的线程,此时需要进入自旋。
             */
            
            // 自旋,一直到条件满足 或者 当前代被打破、线程被中断、等待超时
            for (;;) {
                try {
                    // 条件成立:说明当前线程是不指定超时时间的
                    if (!timed)
                    	// 当前线程会释放掉lock,然后进入等待队列的尾部,然后挂起 等待被唤醒
                        trip.await();
                    // 响应超时时间
                    else if (nanos > 0L)
                        // 调用awaitNanos方法(带超时的阻塞)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    /*
                     * 抛出中断异常,会进来这里
                     * 什么时候会抛出中断异常呢?
                     * Node节点在等待队列内收到中断信号,会抛出中断异常
                     */
                    /*
                     * 条件1:g == generation 成立,说明当前代并没有变化
                     * 条件2:!g.broken,当前代如果没有被打破,那么当前线程就去打破,并且抛出异常
                     */ 
                    if (g == generation && ! g.broken) {
                        // 打破barrier
                        breakBarrier();
                        // 抛出中断异常
                        throw ie;
                    } else {
						/*
						 * 执行到else有几种情况?
						 * 1.代发生了变化,这个时候就不需要抛出中断异常了,因为代已经更新了,这里唤醒后就走正常逻辑了,只不过设置下中断标记
						 * 2.代未发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出brokenBarrier异常,也记录下中断标志位
						 */
                        Thread.currentThread().interrupt();
                    }
                }
                /*
                 * 唤醒后执行到这里 有几种情况?
                 * 1.正常情况,当前barrier开启了新的一代
                 * 2.当前generation被打破,此时也会唤醒所有在trip等待队列上挂起的线程
                 * 3.当前线程在等待队列中超时,然后主动转移到同步队列,然后获取到锁 唤醒
                 */      
                // 表示当前代已经被打破
                if (g.broken)
                    // 线程唤醒后依次抛出BrokenBarrier异常
                    throw new BrokenBarrierException();
                // 条件成立:说明当前线程挂起期间,最后一个线程到位了,然后触发了开启新一轮的逻辑,此时唤醒等待队列中的线程
                // 这是一次正常的线程被唤醒后退出的逻辑
                if (g != generation)
                    return index;
                // 超时判断
                if (timed && nanos <= 0L) {
                    // 打破barrier
                    breakBarrier();
                    // 抛出超时异常
                    throw new TimeoutException();
                }
            }
        } finally {
            // 解锁
            lock.unlock();
        }
    }

总结

  • CyclicBarrier 有一个 “代” 的概念,一开始到来的线程,都会在某一个代中挂起,当最后一个线程到达时,这个代中的线程就可以全部通过了,随后会开启新的一个代。
  • CyclicBarrier 会使一组线程阻塞在 await() 处,当最后一个线程到达时唤醒(只是从条件队列转移到 AQS 队列中)前面的线程大家再继续往下走;
  • CyclicBarrier 不是直接使用 AQS 实现的一个同步器,是基于 Lock 和 Condition 的一个实践案例,实现整个同步逻辑;
  • 整体来说,只有一个方法 dowait() 。



参考

  • 视频参考
    • b站_小刘讲源码付费课
    • b站_黑马程序员深入学习Java并发编程,JUC并发编程全套教程
  • 文章参考
    • shstart7_AQS共享模式之CyclicBarrier源码解析
    • 兴趣使然的草帽路飞_AQS源码探究_08 CyclicBarrier源码分析
    • 肆华_CyclicBarrier阅读理解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/37845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【多目标进化优化】多目标进化群体的分布性

0 前言 \quad\quad进化算法是模拟生物自然进化的人工方法&#xff0c;与大自然生态环境一样&#xff0c;进化的物种也需要平衡发展。因此&#xff0c;设计者必须制定合适的生存规则来维持种群的多样性和分布性。在多目标进化算法中&#xff0c;对于某些问题&#xff0c;Pareto最…

微机-------可编程并行接口8255A

目录 8255A的内部结构8255A控制信息和传输动作的对应关系⭐8255A的控制字一、方式选择控制字①方式0(基本输入输出工作方式)二、端口C置1/置0控制字8255A的工作方式②方式1(选通的输入输出工作方式)③方式2(双向传输方式)⭐⭐8255的编程及应用8255A的内部结构 ①数据总线…

Steam项目推进(二)—— 在项目中使用FairyGUI

一、遇到的问题 昨天把代码大致清理了一遍之后&#xff0c;发现代码中存在很大的一个问题是数据和表现耦合在一起了&#xff0c;如下&#xff1a; using UnityEngine; using UnityEngine.UI;public enum CardStateType {InDeck, InHand, InBattle, InSave, InAbandon }//卡牌…

Cisco简单配置(十八)—OSPF

开放式最短路径优先&#xff08;Open Shortest Path First&#xff0c;OSPF&#xff09;是广泛使用的一种动态路由协议&#xff0c;它属于链路状态路由协议&#xff0c;具有路由变化收敛速度快、无路由环路、支持变长子网掩码&#xff08;VLSM&#xff09;和汇总、层次区域划分…

设计模式-组合模式(决策树)

一、只如初见 组合模式也许大家第一联想到的就是把两个模块组合起来使用&#xff0c;其实好像是这样也其实不是这样&#xff0c;废话不多说&#xff0c;学习一件新的事物总要先了解一下他的概念&#xff0c;老规矩先上概念&#xff08;摘自百度百科&#xff09;&#xff1a; 组…

【活动预告】金融大数据治理实践分享(12/03)

原创 DAMA数据管理 # 本期主题 金融大数据治理实践分享 数字化时代&#xff0c;数据的价值受到越来越多的关注&#xff0c;有人将其比作黄金&#xff0c;也有人将其比作石油&#xff0c;成为组织中的最重要资产之一。针对数据这种有特殊属性的资产&#xff0c;也存在着采集…

[论文阅读] 颜色迁移-N维pdf迁移

[论文阅读] 颜色迁移-N维pdf迁移 文章: N-Dimensional Probability Density Function Transfer and its Application to Colour Transfer, [paper ][code] 1-算法原理 简单来说, 本文将图像看作是随机变量的一组样本, 图像之间的颜色迁移可以看作是样本之间分布的迁移. 因而…

G1D23-RAGA报名蓝桥Attackg安装cudatorch

昨天太摸鱼啦~不过蛮开心的哈哈 今天主要是把积累的ddl都清理一下&#xff01;&#xff01;&#xff01;第一项就是我和舍友一起读的论文嘿嘿&#xff01;&#xff01; 一、RAGA &#xff08;零&#xff09;总结&#xff08;仅模型&#xff09; 作为数据挖掘顶会2021年的论文…

【MATLAB教程案例46】三维数据的插值和滤波处理matlab仿真

欢迎订阅《FPGA学习入门100例教程》、《MATLAB学习入门100例教程》 本课程学习成果预览: 目录 1.软件版本 2.三维数据插值

openFeign夺命连环9问,这谁受得了?

1、前言 前面介绍了Spring Cloud 中的灵魂摆渡者Nacos&#xff0c;和它的前辈们相比不仅仅功能强大&#xff0c;而且部署非常简单。 今天介绍一款服务调用的组件&#xff1a;OpenFeign&#xff0c;同样是一款超越先辈&#xff08;Ribbon、Feign&#xff09;的狠角色。 文章目…

linux 安装新版傻妞+TG+青龙

一键安装命令 #国内服务器要先设置网络代理set sillyGirl download_prefix https://yanyu.ltd/#一键安装命令ssillyGirl;aarm64;if [[ $(uname -a | grep "x86_64") ! "" ]];then aamd64;fi ;if [ ! -d $s ];then mkdir $s;fi ;cd $s;wget https://yanyu.…

git回滚指定版本相关操作

当提交推送到远程仓库之后&#xff0c;需要回退到特定版本,去修改该代码,然后在推送到远程仓库; 1.查看目前版本状态: git status 2.查看提交日志&#xff0c;找到需要回滚的git版本号 git log 3.将当前分支回滚到id9c45732c5701fc84164bebe3c05760a72a4ece12 #这个是软回滚&am…

一个基于容斥原理的概率模型

为论述概率模型的思想&#xff0c;本部分以下图所描述的情况作为例子讲述&#xff0c;为简化图省略线路开关。 不同于单供网络&#xff0c;双供网络由于多条联络线&#xff0c;存在多个扩展供电方案。设供电路径P(p1,p2,..,pn)P(p_1,p_2,..,p_n)P(p1​,p2​,..,pn​)&#xff…

wodFtpDLX ActiveX 组件--Crack

wodFtpDLX ActiveX 组件 FTP 组件&#xff0c;安全&#xff08;SSL、SSH&#xff09;FTP ActiveX 客户端 FtpDLX 组件是 FTP&#xff08;或者更确切地说&#xff0c;文件传输&#xff09;客户端组件。它不仅提供老式的 FTP 协议&#xff0c;还允许您使用安全的 SFTP&#xff08…

短视频怎么在平台规则之内更快更好的吸引用户粉丝的关注

短视频怎么在平台规则之内更快更好的吸引用户粉丝的关注 每天一组短视频运营小技巧&#xff0c;碎片化学习优化自己的账号&#xff0c;今天来学习内容发布技巧&#xff1a; 内容上: 关心用户喜欢看什么 &#xff0c;在视频中埋下泪点笑点吐槽点以及所有你能想到的可以激发观众…

浅谈Linux系统信息与资源

大家将来应用开发Linux程序&#xff0c;无论是ARM架构的板子&#xff0c;还是在Linux上开发应用程序&#xff0c;相信大家都会用到到一些系统相关的信息&#xff0c;譬如时间、日期、以及其它一些系统相关信息&#xff0c;今天带大家了解一下如何通过 Linux 系统调用或 C 库函数…

springMVC参数绑定源码分析

一、遇到的问题 1. 在请求时get方法路径传参&#xff0c;接收时&#xff0c;用枚举接收&#xff0c;出现参数绑定错误 请求路径&#xff1a;http://localhost:9104/api/sent/test2?type0 后端代码&#xff1a; GetMapping("/test2")public String openNewFile2(…

基于优先级的时间片轮转调度算法(C语言实现)

已剪辑自: http://www.demodashi.com/demo/15341.html 基于优先级的时间片轮转调度算法 1. PCB结构&#xff08;Block&#xff09; 由此定义如下结构体&#xff1a; typedef struct Block {int processID; // 进程号int priority; // 优先级int status; // 状态double arriv…

PyQt5 JavaScript调用PyQt代码

JavaScript调用PyQt代码JavaScript调用PyQt代码,是指PyQt可以与加载的Web页面进行双向的数据交互。1.创建QWebChannel对象&#xff1a;创建QWebChannel对象&#xff0c;注册一个需要桥接的对象&#xff0c;以便Web页面的JavaScript使用。其核心代码如下&#xff1a;channel QW…

JUC并发编程与源码分析笔记01-本课程前置知识及要求说明

JUC是什么 JUC是指java.util.concurrent包&#xff0c;在并发编程中广泛使用。 官方文档搜索java.util.concurrent&#xff0c;可以看到有java.util.concurrent、java.util.concurrent。atomic、java.util.concurrent.locks。 本课程学生对象&#xff08;非零基础&#xff09…