多线程系列(十五) -常用并发工具类详解

news2025/6/17 12:31:37

一、摘要

在前几篇文章中,我们讲到了线程、线程池、BlockingQueue 等核心组件,其实 JDK 给开发者还提供了比synchronized更加高级的线程同步组件,比如 CountDownLatch、CyclicBarrier、Semaphore、Exchanger 等并发工具类。

下面我们一起来了解一下这些常用的并发工具类!

二、常用并发工具类

2.1、CountDownLatch

CountDownLatch是 JDK5 之后加入的一种并发流程控制工具类,它允许一个或多个线程一直等待,直到其他线程运行完成后再执行。

它的工作原理主要是通过一个计数器来实现,初始化的时候需要指定线程的数量;每当一个线程完成了自己的任务,计数器的值就相应得减 1;当计数器到达 0 时,表示所有的线程都已经执行完毕,处于等待的线程就可以恢复继续执行任务。

根据CountDownLatch的工作原理,它的应用场景一般可以划分为两种:

  • 场景一:某个线程需要在其他 n 个线程执行完毕后,再继续执行
  • 场景二:多个工作线程等待某个线程的命令,同时执行同一个任务

下面我们先来看下两个简单的示例。

示例1:某个线程等待 n 个工作线程

比如某项任务,先采用多线程去执行,最后需要在主线程中进行汇总处理,这个时候CountDownLatch就可以发挥作用了,具体应用如下!

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        // 采用 10 个工作线程去执行任务
        final int threadCount = 10;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 执行具体任务
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",执行完毕!");
                    // 计数器减 1
                    countDownLatch.countDown();
                }
            }).start();
        }

        // 阻塞等待 10 个工作线程执行完毕
        countDownLatch.await();
        System.out.println("所有任务线程已执行完毕,准备进行结果汇总");
    }
}

运行结果如下:

thread name:Thread-0,执行完毕!
thread name:Thread-2,执行完毕!
thread name:Thread-1,执行完毕!
thread name:Thread-3,执行完毕!
thread name:Thread-4,执行完毕!
thread name:Thread-5,执行完毕!
thread name:Thread-6,执行完毕!
thread name:Thread-7,执行完毕!
thread name:Thread-8,执行完毕!
thread name:Thread-9,执行完毕!
所有任务线程执行完毕,准备进行结果汇总
示例2:n 个工作线程等待某个线程

比如田径赛跑,10 个同学准备开跑,但是需要等工作人员发出枪声才允许开跑,使用CountDownLatch可以实现这一功能,具体应用如下!

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        // 使用一个计数器
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final int threadCount = 10;
        // 采用 10 个工作线程去执行任务
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 阻塞等待计数器为 0
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 发起某个服务请求,省略
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",开始执行!");

                }
            }).start();
        }

        Thread.sleep(1000);
        System.out.println("thread name:" +  Thread.currentThread().getName() + " 准备开始!");
        // 将计数器减 1,运行完成后为 0
        countDownLatch.countDown();
    }
}

运行结果如下:

thread name:main 准备开始!
thread name:Thread-0,开始执行!
thread name:Thread-1,开始执行!
thread name:Thread-2,开始执行!
thread name:Thread-3,开始执行!
thread name:Thread-5,开始执行!
thread name:Thread-6,开始执行!
thread name:Thread-8,开始执行!
thread name:Thread-7,开始执行!
thread name:Thread-4,开始执行!
thread name:Thread-9,开始执行!

从上面的示例可以很清晰的看到,CountDownLatch类似于一个倒计数器,当计数器为 0 的时候,调用await()方法的线程会被解除等待状态,然后继续执行。

CountDownLatch类的主要方法,有以下几个:

  • public CountDownLatch(int count):核心构造方法,初始化的时候需要指定线程数
  • countDown():每调用一次,计数器值 -1,直到 count 被减为 0,表示所有线程全部执行完毕
  • await():等待计数器变为 0,即等待所有异步线程执行完毕,否则一直阻塞
  • await(long timeout, TimeUnit unit):支持指定时间内的等待,避免永久阻塞,await()的一个重载方法

从以上的分析可以得出,当计数器为 1 的时候,即由一个线程来通知其他线程,效果等同于对象的wait()notifyAll();当计时器大于 1 的时候,可以实现多个工作线程完成任务后通知一个或者多个等待线程继续工作,CountDownLatch可以看成是一种进阶版的等待/通知机制,在实际中应用比较多见。

2.2、CyclicBarrier

CyclicBarrier从字面上很容易理解,表示可循环使用的屏障,它真正的作用是让一组线程到达一个屏障时被阻塞,直到满足要求的线程数都到达屏障时,屏障才会解除,此时所有被屏障阻塞的线程就可以继续执行。

下面我们还是先看一个简单的示例,以便于更好的理解这个工具类。

public class CyclicBarrierTest {

    public static void main(String[] args) {
        // 设定参与线程的个数为 5
        int threadCount = 5;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有的线程都已经准备就绪...");
            }
        });
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",已达到屏障!");
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",阻塞解除,继续执行!");
                }
            }).start();
        }
    }
}

输出结果:

thread name:Thread-0,已达到屏障!
thread name:Thread-1,已达到屏障!
thread name:Thread-2,已达到屏障!
thread name:Thread-3,已达到屏障!
thread name:Thread-4,已达到屏障!
所有的线程都已经准备就绪...
thread name:Thread-4,阻塞解除,继续执行!
thread name:Thread-0,阻塞解除,继续执行!
thread name:Thread-3,阻塞解除,继续执行!
thread name:Thread-1,阻塞解除,继续执行!
thread name:Thread-2,阻塞解除,继续执行!

从上面的示例可以很清晰的看到,CyclicBarrier中设定的线程数相当于一个屏障,当所有的线程数达到时,此时屏障就会解除,线程继续执行剩下的逻辑。

CyclicBarrier类的主要方法,有以下几个:

  • public CyclicBarrier(int parties):构造方法,parties参数表示参与线程的个数
  • public CyclicBarrier(int parties, Runnable barrierAction):核心构造方法,barrierAction参数表示线程到达屏障时的回调方法
  • public void await():核心方法,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞,直到屏障解除,继续执行剩下的逻辑

从以上的示例中,可以看到CyclicBarrierCountDownLatch有很多的相似之处,都能够实现线程之间的等待,但是它们的侧重点不同:

  • CountDownLatch一般用于一个或多个线程,等待其他的线程执行完任务后再执行
  • CyclicBarrier一般用于一组线程等待至某个状态,当状态解除之后,这一组线程再继续执行
  • CyclicBarrier中的计数器可以反复使用,而CountDownLatch用完之后只能重新初始化
2.3、Semaphore

Semaphore通常我们把它称之为信号计数器,它可以保证同一时刻最多有 N 个线程能访问某个资源,比如同一时刻最多允许 10 个用户访问某个服务,同一时刻最多创建 100 个数据库连接等等。

Semaphore可以用于控制并发的线程数,实际应用场景非常的广,比如流量控制、服务限流等等。

下面我们看一个简单的示例。

public class SemaphoreTest {

    public static void main(String[] args) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // 同一时刻仅允许最多3个线程获取许可
        final Semaphore semaphore = new Semaphore(3);
        // 初始化 5 个线程生成
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 如果超过了许可数量,其他线程将在此等待
                        semaphore.acquire();
                        System.out.println(format.format(new Date()) +  " thread name:" +  Thread.currentThread().getName() + " 获取许可,开始执行任务");
                        // 假设执行某项任务的耗时
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // 使用完后释放许可
                        semaphore.release();
                    }
                }
            }).start();
        }
    }
}

输出结果:

2023-11-22 17:32:01 thread name:Thread-0 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-1 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-2 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-4 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-3 获取许可,开始执行任务

从上面的示例可以很清晰的看到,同一时刻前 3 个线程获得了许可优先执行, 2 秒过后许可被释放,剩下的 2 个线程获取释放的许可继续执行。

Semaphore类的主要方法,有以下几个:

  • public Semaphore(int permits):构造方法,permits参数表示同一时间能访问某个资源的线程数量
  • acquire():获取一个许可,在获取到许可之前或者被其他线程调用中断之前,线程将一直处于阻塞状态
  • tryAcquire(long timeout, TimeUnit unit):表示在指定时间内尝试获取一个许可,如果获取成功,返回true;反之false
  • release():释放一个许可,同时唤醒一个获取许可不成功的阻塞线程。

通过permits参数的设定,可以实现限制多个线程同时访问服务的效果,当permits参数为 1 的时候,表示同一时刻只有一个线程能访问服务,相当于一个互斥锁,效果等同于synchronized

使用Semaphore的时候,通常需要先调用acquire()或者tryAcquire()获取许可,然后通过try ... finally模块在finally中释放许可。

例如如下方式,尝试在 3 秒内获取许可,如果没有获取就退出,防止程序一直阻塞。

// 尝试 3 秒内获取许可
if(semaphore.tryAcquire(3, TimeUnit.SECONDS)){
    try {
       // ...业务逻辑
    }  finally {
        // 释放许可
        semaphore.release();
    }
}
2.4、Exchanger

Exchanger从字面上很容易理解表示交换,它主要用途在两个线程之间进行数据交换,注意也只能在两个线程之间进行数据交换。

Exchanger提供了一个exchange()同步交换方法,当两个线程调用exchange()方法时,无论调用时间先后,会互相等待线程到达exchange()方法同步点,此时两个线程进行交换数据,将本线程产出数据传递给对方。

简单的示例如下。

public class ExchangerTest {

    public static void main(String[] args) {
        // 交换同步器
        Exchanger<String> exchanger = new Exchanger<>();

        // 线程1
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String value = "A";
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 原数据:" + value);
                    String newValue = exchanger.exchange(value);
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 线程2
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String value = "B";
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 原数据:" + value);
                    String newValue = exchanger.exchange(value);
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出结果:

thread name:Thread-0 原数据:A
thread name:Thread-1 原数据:B
thread name:Thread-0 交换后的数据:B
thread name:Thread-1 交换后的数据:A

从上面的示例可以很清晰的看到,当线程Thread-0Thread-1都到达了exchange()方法的同步点时,进行了数据交换。

Exchanger类的主要方法,有以下几个:

  • exchange(V x):等待另一个线程到达此交换点,然后将给定的对象传送给该线程,并接收该线程的对象,除非当前线程被中断,否则一直阻塞等待
  • exchange(V x, long timeout, TimeUnit unit):表示在指定的时间内等待另一个线程到达此交换点,如果超时会自动退出并抛超时异常

如果多个线程调用exchange()方法,数据交换可能会出现混乱,因此实际上Exchanger应用并不多见。

三、小结

本文主要围绕 Java 多线程中常见的并发工具类进行了简单的用例介绍,这些工具类都可以实现线程同步的效果,底层原理实现主要是基于 AQS 队列式同步器来实现,关于 AQS 我们会在后期的文章中再次介绍。

本文篇幅稍有所长,内容难免有所遗漏,欢迎大家留言指出!

四、参考

1.https://www.cnblogs.com/xrq730/p/4869671.html

2.https://zhuanlan.zhihu.com/p/97055716

五、写到最后

最近无意间获得一份阿里大佬写的技术笔记,内容涵盖 Spring、Spring Boot/Cloud、Dubbo、JVM、集合、多线程、JPA、MyBatis、MySQL 等技术知识。需要的小伙伴可以点击如下链接获取,资源地址:技术资料笔记。

不会有人刷到这里还想白嫖吧?点赞对我真的非常重要!在线求赞。加个关注我会非常感激!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1496490.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式工程师必备知识点

嵌入式工程师必备知识点 一、基础知识与理论二、编程语言与工具三、嵌入式操作系统四、通信协议与接口技术五、设计与测试 嵌入式工程师作为现代电子工程领域的核心角色&#xff0c;其职责涵盖了从硬件设计、软件开发到系统测试等多个方面。为了胜任这一职位&#xff0c;嵌入式…

一次一对一服务引起的沉思和笑话

前情提要 客户需求&#xff1a; 分析页面代码和接口请求协议和参数需求&#xff0c;将人工下载视频怎么获得最终的视频链接&#xff0c;这一逻辑清晰的展示并讲解清除。我询问了是否需要成品爬虫&#xff0c;他说代码他自己能搞定。 我给的价格选择&#xff1a; 第一种、首…

mac 下redis

安装 Redis brew install redis 安装完成后&#xff0c;我们可以使用以下命令来确认 Redis 是否正确安装&#xff1a; redis-cli ping 启动 Redis redis-server 后台启动 Redis&#xff0c;可以使用以下命令&#xff1a; redis-server --daemonize yes 指定配置文件启动…

Sentinel 面试题及答案整理,最新面试题

Sentinel的流量控制规则有哪些&#xff0c;各自的作用是什么&#xff1f; Sentinel的流量控制规则主要包括以下几种&#xff1a; 1、QPS&#xff08;每秒查询量&#xff09;限流&#xff1a; 限制资源每秒的请求次数&#xff0c;适用于控制高频访问。 2、线程数限流&#xf…

C--8--解决因配置文件中字符编码不一致问题导致的错误

1、介绍&#xff1a; ASCII 1个字节1个文字 ANSI 2个字节1个文字 UNICODE 3或4个甚至更多字节1个文字 ASCII、ANSI、UNICODE这三种压缩后&#xff0c;变为UTF-8、UTF-16、UTF-32。 ASCII、ANSI、UNICODE 是字符编码&#xff0c;即给全世界每…

1.2_3 TCP/IP参考模型

文章目录 1.2_3 TCP/IP参考模型&#xff08;一&#xff09;OSI参考模型与TCP/IP参考模型&#xff08;二&#xff09;5层参考模型&#xff08;三&#xff09;5层参考模型的数据封装与解封装 1.2_3 TCP/IP参考模型 &#xff08;一&#xff09;OSI参考模型与TCP/IP参考模型 TCP/I…

open-spider开源爬虫工具:抖音数据采集

在当今信息爆炸的时代&#xff0c;网络爬虫作为一种自动化的数据收集工具&#xff0c;其重要性不言而喻。它能够帮助我们从互联网上高效地提取和处理数据&#xff0c;为数据分析、市场研究、内容监控等领域提供支持。抖音作为一个全球性的短视频平台&#xff0c;拥有海量的用户…

哈希专题 - leetcode 1. 两数之和 - 简单难度

leetcode 1. 两数之和 leetcode 1. 两数之和 简单难度 哈希1. 题目详情1. 原题链接2. 基础框架 2. 解题思路1. 题目分析2. 算法原理3. 时间复杂度 3. 代码实现4. 知识与收获 leetcode 1. 两数之和 简单难度 哈希 1. 题目详情 给定一个整数数组 nums 和一个整数目标值 target…

万马合一之js解答

输入m和n 两个数&#xff0c;m和n表示一个mn 的棋盘。输入棋盘内的数据。棋盘中存在数字和".“两种字符&#xff0c;如果是数字表示Q该位置是一匹马&#xff0c;如果是”."表示该位置为空的&#xff0c;棋盘内的数字表示为该马能走的最大步数。例如棋盘内某个位置一个…

ICCV 2023 超分辨率(Super-Resolution)论文汇总

文章目录 图像超分辨率&#xff08;Image Super-Resolution&#xff09; 1、经典图像超分辨率&#xff08;Classical image SR&#xff09; 2、基于参考的图像超分辨率&#xff08;Reference-Based image SR&#xff09; 3、高效&轻量化图像超分辨率&#xff08;Efficient/…

Unity引擎关于APP后台下载支持的实现问题

1&#xff09;Unity引擎关于APP后台下载支持的实现问题 2&#xff09;Prefab对DLL中脚本的引用丢失 3&#xff09;Unity DOTS资源加载问题 4&#xff09;UnitySendMessage和_MultiplyMatrixArrayWithBase4x4_NEON调用导致崩溃 这是第376篇UWA技术知识分享的推送&#xff0c;精选…

Unity背景模糊图片高斯模糊高性能的实现方案

环境&#xff1a; unity2021.3.x 效果&#xff1a; 模糊前&#xff1a; 模糊后&#xff1a; 模糊前&#xff1a; 模糊后&#xff1a; 实现核心思路(shader)&#xff1a; SubShader {CGINCLUDE#include "UnityCG.cginc"sampler2D _MainTex; // 主纹理half4 _MainTe…

CubeMX入门教程(1)——环境搭建

目录 1、为什么要用CubeMX&#xff1f; 2、如何安装CubeMX? 3、HAL固件库的安装 1、为什么要用CubeMX&#xff1f; STM32CubeMX 是 ST 意法半导体近几年来大力推荐的STM32 芯片图形化配置工具&#xff0c;目的就是为了方便开发者&#xff0c; 允许用户使用图形化向导生成C …

BUUCTF---[极客大挑战 2019]BabySQL1

1.这道题和之前做的几道题是相似的&#xff0c;这道题考的知识点更多。难度也比之前的大一些 2.尝试万能密码 or 1#发现过滤了or,使用1和1,发现他对单引号也进行了过滤。于是我尝试进行双写绕过&#xff0c;发现可以通过了。 3.由之前的做题经验可知&#xff0c;这道题会涉及到…

wps没保存关闭了怎么恢复数据?恢复文件教程

Microsoft Word是我们不可或缺的工具。很多小伙伴都遇到在WPS中编辑文件时&#xff0c;它可能会突然闪退&#xff0c;或者忘记及时保存文件就直接关闭了&#xff0c;导致我们辛苦编辑的文档丢失。面对这种情况我们该如何应对&#xff0c;尽量减小损失呢&#xff1f;接下来让我为…

Java - Spring MVC 实现跨域资源 CORS 请求

据我所知道的是有三种方式&#xff1a;Tomcat 配置、拦截器设置响应头和使用 Spring MVC 4.2。 设置 Tomcat 这种方式就是引用别人封装好的两个 jar 包&#xff0c;配置一下web.xml就行了。我也并不推荐&#xff0c;这里放两个我在网上找到的配置相关文章&#xff0c;感兴趣可…

遥感分析时什么情况下需要做大气校正?

经常会遇到这样的问题&#xff1a;什么情况需要做大气校正产生&#xff1f;这个问题取决于传感器和应用目标&#xff0c;总的来说&#xff0c;如果要做光谱分析&#xff0c;那么大气校正是必须要做的。本文对于在什么情况下选择什么样的大气校正方法&#xff0c;给出了一些依据…

(3)(3.3) MAVLink高延迟协议

文章目录 前言 1 配置 2 说明 3 消息说明 前言 ArduPilot 支持 MAVLink 高延迟协议(MAVLink High Latency)。该协议专为卫星或 LoRA 等低带宽或高成本链路而设计。 在此协议中&#xff0c;每 5s 只发送一次 HIGH_LATENCY2 MAVLink 信息。对 MAVLink 命令或请求&#xff08…

今日头条 _signature逆向分析

声明&#xff1a;本文仅作学习交流&#xff0c;请遵守法律法规&#xff0c;不要恶意爬取网站。 网址&#xff1a;aHR0cHM6Ly93d3cudG91dGlhby5jb20v 接口&#xff1a;aHR0cHM6Ly93d3cudG91dGlhby5jb20vaG90LWV2ZW50L2hvdC1ib2FyZC8 本文提到的接口是头条的今日热榜接口&…

linux安装ngnix

一、将nginx-1.20.1.tar.gz上传至linux服务器目录下 二、将nginx安装包解压到/usr/local目录下 tar -zxvf /home/local/nginx-1.20.1.tar.gz -C /usr/local/三、预先安装依赖 yum -y install pcre-devel yum -y install openssl openssl-devel yum -y install gcc gcc-c auto…