研发必会-异步编程利器之CompletableFuture(含源码 中)

news2025/6/20 10:44:26

近期热推文章:

    1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表;

    2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;

    3、基于Redis的Geo实现附近商铺搜索(含源码)

    4、基于Redis实现关注、取关、共同关注及消息推送(含源码)

    5、SpringBoot整合多数据源,并支持动态新增与切换(详细教程)

    6、基于Redis实现点赞及排行榜功能

一、多任务组合回调

备注:源码获取方式在文底。

1.1、AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来只有这两个都正常执行完了,才会执行某个任务。也即:当任务一和任务二都完成再执行任务三(异步任务)。

区别在于:

    1、runAfterBoth:不会把执行结果当做方法入参,且没有返回值。

    2、thenAcceptBoth:会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值。

    3、thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值。

代码案例:

/**     * 功能描述:多任务组合回调:AND组合关系     * @MethodName: testCompleteAnd     * @MethodParam: []     * @Return: void     * @Author: yyalin     * @CreateDate: 2023/10/11 17:30     */    public void  testCompleteAnd() throws ExecutionException, InterruptedException {        //创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(10);        long startTime = System.currentTimeMillis();        //1、使用自定义线程池,开启异步任务01        CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{            int res=1;            try {                //执行任务1 开始执行任务01,当前线程为:12                log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                res+=1; //模拟加1            } catch (InterruptedException e) {                e.printStackTrace();            }            //返回结果            return res;        },executorService);        //2、使用自定义线程池,开启异步任务02        CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{            int res=1;            try {                //执行任务02 开始执行任务02,当前线程为:13                log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                res+=2; //模拟加2            } catch (InterruptedException e) {                e.printStackTrace();            }            //返回结果            return res;        });        //3、任务02:将任务1与任务2开始任务组合        CompletableFuture<Integer> thenCombineAsyncRes=supplyAsyncRes01.thenCombineAsync(supplyAsyncRes02,(res01, res02)->{            //始执行任务03,当前线程为:14            log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());            log.info("任务01返回值:"+res01);            log.info("任务02返回值:"+res02);            //任务组合返回值 可以拿到任务01和任务02的返回结果进行相关操作,然后统一返回结果            return res01+res02;        },executorService);        //4、最终返回结果        log.info("最终返回结果为:"+thenCombineAsyncRes.get());        log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");    }

运行结果:

1.2、OR组合关系

将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。(两个任务,只要有一个任务完成,就执行任务三

区别在于:

    1、runAfterEither:不会把执行结果当做方法入参,且没有返回值。

    2、acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值。

    3、applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值。(个人推荐)

参考代码:

 /**     * 功能描述:OR组合关系     * @MethodName: testCompleteOr     * @MethodParam: []     * @Return: void     * @Author: yyalin     * @CreateDate: 2023/10/11 18:14     */    public void  testCompleteOr(){        //创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(10);        long startTime = System.currentTimeMillis();        //1、使用自定义线程池,开启异步任务01        CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{            int res=1;            try {                //执行任务1 开始执行任务01,当前线程为:12                log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                res+=2; //模拟加1            } catch (InterruptedException e) {                e.printStackTrace();            }            //返回结果            return res;        },executorService);        //2、使用自定义线程池,开启异步任务02        CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{            int res=1;            try {                //执行任务02 开始执行任务02,当前线程为:13                log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                res+=3; //模拟加2            } catch (InterruptedException e) {                e.printStackTrace();            }            //返回结果            return res;        },executorService);        //3、任务组合or        supplyAsyncRes01.acceptEitherAsync(supplyAsyncRes02,(res)->{            try {                log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                log.info("上一个任务返回值:"+res);                log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");            } catch (InterruptedException e) {                e.printStackTrace();            }        },executorService);    }

返回结果:

若将异步任务02中的Thread.sleep(600)改为300,将输出的结果为:

从结果中不难对比发现,任务03的参数是任务01和任务02中执行最快的返回结果。

注意:若把核心线程数量改为1,会是什么样的呢?

ExecutorService executorService = Executors.newFixedThreadPool(1);

运行结果:

从上面看出,改为1就变成单线程执行了。

1.3、多任务组合(allOf\anyOf)

  1.allOf:等待所有任务都执行完成后,才会执行 allOf 返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常。(等待所有任务完成才会执行)

  2.anyOf:任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常。(只要有一个任务完成)

参考案例:

public void testAllOfOrAnyOf() throws ExecutionException, InterruptedException {        //创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(10);        long startTime = System.currentTimeMillis();        //1、使用自定义线程池,开启异步任务01        CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{            int res=1;            try {                //执行任务1 开始执行任务01,当前线程为:12                log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                res+=3; //模拟加1            } catch (InterruptedException e) {                e.printStackTrace();            }            //返回结果            return res;        },executorService);        //2、使用自定义线程池,开启异步任务02        CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{            int res=1;            try {                //执行任务02 开始执行任务02,当前线程为:13                log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                res+=4; //模拟加2            } catch (InterruptedException e) {                e.printStackTrace();            }            //返回结果            return res;        },executorService);        //3、使用自定义线程池,开启异步任务03        CompletableFuture<Integer> supplyAsyncRes03=CompletableFuture.supplyAsync(()->{            int res=1;            try {                //执行任务02 开始执行任务02,当前线程为:13                log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());                //执行具体的事务                Thread.sleep(600);                res+=5; //模拟加2            } catch (InterruptedException e) {                e.printStackTrace();            }            //返回结果            return res;        },executorService);        //4、开始任务组合        CompletableFuture<Void> allOfRes=CompletableFuture.allOf(supplyAsyncRes01,supplyAsyncRes02,supplyAsyncRes03);        //等待所有任务完成        log.info("所有任务执行完成,组合后返回结果为:"+allOfRes.get());        //获取所有任务的返回结果        log.info("任务01返回值:"+supplyAsyncRes01.get());        log.info("任务02返回值:"+supplyAsyncRes02.get());        log.info("任务03返回值:"+supplyAsyncRes03.get());        log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");    }

结果返回:

从结果中看出:等待所有任务都执行完成后,才会执行 allOf 返回的CompletableFuture。

同理anyOf,只需要调整代码:

 CompletableFuture<Object> allOfRes=CompletableFuture.anyOf(supplyAsyncRes01,supplyAsyncRes02,supplyAsyncRes03);

运行结果:

1.4、thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例。

1、如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;

2、如果该CompletableFuture实例为null,然后就执行这个新任务。

代码案例:

    /**     * 功能描述:thenCompose     * @MethodName: testThenCompose     * @MethodParam: []     * @Return: void     * @Author: yyalin     * @CreateDate: 2023/10/12 9:38     */    public void testThenCompose() throws ExecutionException, InterruptedException {        CompletableFuture<String> res01=CompletableFuture.completedFuture("任务01");        ExecutorService executor = Executors.newSingleThreadExecutor();        //第二个任务 在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法,        // 该方法会返回一个新的CompletableFuture实例。        CompletableFuture<String> futureRes =CompletableFuture.supplyAsync(()-> "第二个任务02"        ,executor).thenComposeAsync(data->{            log.info("data数据为:"+data);            return res01;        },executor);        log.info("最终返回:"+futureRes.get());        executor.shutdown();    }

结果:

、使用注意点

CompletableFuture 使异步编程更加便利的、代码更加优雅的同时,也要关注使用的一些注意点。

2.1、Future需要获取返回值,才能获取异常信息

代码案例:

/**     * 功能描述:使用注意点     * @MethodName: testFuture     * @MethodParam: []     * @Return: void     * @Author: yyalin     * @CreateDate: 2023/10/12 9:54     */    public void testFuture() throws ExecutionException, InterruptedException {        //自定义线程池        ExecutorService executorService = new ThreadPoolExecutor(                5,                10,                5L,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(10));        //创建任务        CompletableFuture<Void> res01=CompletableFuture.supplyAsync(()->{            int sum=1/0;            return "分母不能为0";        },executorService).thenAccept((res)->{  //3、异常捕获            log.info("系统出现异常,需要处理:"+res);        });        log.info("返回结果:"+res01.get());    }

输出结果:

Future需要获取返回值(res01.get()),才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。使用的时候,注意一下,考虑是否加try…catch…或者使用exceptionally方法。

若改成exceptionally方法,无需get或join也可以捕获异常信息:

 CompletableFuture<String> res01=CompletableFuture.supplyAsync(()->{            int sum=1/0;            return "分母不能为0";        },executorService).exceptionally((throwable)->{  //3、异常捕获            log.info("系统出现异常,需要处理:"+throwable.getMessage());            return "00";        });//        log.info("返回结果:"+res01.get());

结果:

2.2、CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间

推荐使用:

log.info("返回结果:"+res01.get(5,TimeUnit.SECONDS));

2.3、建议使用自定义线程池,不要使用默认的

CompletableFuture代码中使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数

参考案例:

 //自定义线程池ExecutorService executorService = new ThreadPoolExecutor(                 5,                 10,                 5L,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(10));

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离。

/*** 参数信息:* int corePoolSize     核心线程大小* int maximumPoolSize  线程池最大容量大小* long keepAliveTime   线程空闲时,线程存活的时间* TimeUnit unit        时间单位* BlockingQueue<Runnable> workQueue  任务队列。一个阻塞队列* AbortPolicy(默认):直接抛弃*/ThreadPoolExecutor pool = new ThreadPoolExecutor(4,        4,        0L,        TimeUnit.MILLISECONDS,        new LinkedBlockingDeque<>(10),        new ThreadPoolExecutor.AbortPolicy());

说明:

AbortPolicy(默认):直接抛弃

CallerRunsPolicy:用调用者的线程执行任务

DiscardOldestPolicy:抛弃队列中最久的任务

DiscardPolicy:抛弃当前任务。

三、源码获取方式

     更多优秀文章,请关注个人微信公众号或搜索“程序猿小杨”查阅。然后回复:源码,可以获取对应的源码,开箱即可使用。

       如果大家对相关文章感兴趣,可以关注微信公众号"程序猿小杨",会持续更新优秀文章!欢迎大家 分享、收藏、点赞、在看,您的支持就是我坚持下去的最大动力!谢谢!


参考网站:

https://blog.csdn.net/ThinkWon/article/details/123390393

https://mp.weixin.qq.com/s/shjANruBk6VL492JaWLTEg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1086795.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Avalonia常用小控件Charts

1.项目下载地址&#xff1a;https://gitee.com/confusedkitten/avalonia-demo 2.UI库Semi.Avalonia&#xff0c;项目地址 https://github.com/irihitech/Semi.Avalonia 3.Charts库&#xff0c;LiveChartsCore.SkiaSharpView.Avalonia&#xff0c;Nuget获取只有预览库&#x…

Vue3模块找不到问题解决:找不到模块‘vue ‘。你的意思是将“模块解决方案”选项设置为“节点”,还是添加ali

Vue3 vite 项目引入 vue 报错 Cannot find module ‘vue‘. Did you mean to set the ‘moduleResolution‘ option to ‘node‘, or to add ali 在项目中找到 tsconfig.json 文件 找到配置项里的 "moduleResolution": "bundler", 将其改成 &q…

简述WPF中MVVM的设计思想

近年来&#xff0c;随着WPF在生产、制造、工控等领域应用越来越广泛&#xff0c;对WPF的开发需求也在逐渐增多&#xff0c;有很多人不断的从Web、WinForm开发转向了WPF开发。 WPF开发有很多新的概念及设计思想&#xff0c;如数据驱动、数据绑定、依赖属性、命令、控件模板、数…

win10 配置静态IP脚本

静态IP配置方法&#xff1a; 1、新建一个txt文本&#xff0c;加入以下代码 netsh interface ip set address name"以太网" sourcestatic addr192.168.101.11 mask255.255.255.0 gateway192.168.101.12、将TXT另存为ANSI编码&#xff01;将TXT另存为ANSI编码&#x…

2023年中国舞台烟雾机产量、销量及市场规模分析[图]

舞台烟雾机是一种用于舞台表演和演出的设备&#xff0c;它能够产生各种形式的烟雾效果&#xff0c;以增强舞台表演的视觉效果和氛围。舞台烟雾机通常由气泵、烟雾发生器、控制器和烟雾管道等组成&#xff0c;可以通过控制器调节烟雾的浓度、颜色和流量&#xff0c;以满足不同演…

C++ 位图与布隆过滤器

目录 前言位图场景演示应用场景模拟实现问题例题 布隆过滤器例子理解应用 例题 前言 位图与布隆过滤器是用来在海量数据中判断一个数据在不在的问题的数据结构&#xff0c;这种数据结构在存储空间上大大的优于红黑树、哈希等数据结构 位图 我们为了处理一个数据在海量数据中…

Linux CentOS8安装gitlab_ce步骤

1 下载安装包 wget --content-disposition https://packages.gitlab.com/gitlab/gitlab-ce/packages/el/8/gitlab-ce-15.0.2-ce.0.el8.x86_64.rpm/download.rpm2 安装gitlab yum install policycoreutils-python-utilsrpm -Uvh gitlab-ce-15.0.2-ce.0.el8.x86_64.rpm3 更新配…

Gpt-4多模态功能强势上线,景联文科技多模态数据采集标注服务等您来体验!

就在上个月&#xff0c;OpenAI 宣布对ChatGPT 进行重大更新&#xff0c;该模型不仅能够通过文字输入进行识别和分析&#xff0c;还能够通过语音、图像甚至视频等多种模态的输入来获取、识别、分析和输出信息。这一重要技术突破&#xff0c;将促进多模态自然语言处理的发展&…

ESP32-WROOM-32无法进入下载模式进行程序上传的问题

结论 先说结论&#xff0c;ESP32-WROOM-32无法进入下载模式通过串口进行程序上传&#xff0c;可能是GPIO2引脚没有通过下拉电阻拉低&#xff0c;导致无法进入正确的启动模式。 启动模式 ESP32启动时会打印rst:0x1 (POWERON_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT) 复位源rs…

mysql面试题49:MySQL中不同text数据类型的最大长度

该文章专注于面试&#xff0c;面试只要回答关键点即可&#xff0c;不需要对框架有非常深入的回答&#xff0c;如果你想应付面试&#xff0c;是足够了&#xff0c;抓住关键点 面试官&#xff1a;MySQL中TEXT数据类型的最大长度 在MySQL中&#xff0c;TEXT数据类型用于存储较大…

idea怎么设置作者信息(详细)

目录 一&#xff1a;在Java类的开头自动注释作者名字和日期等信息 二&#xff1a;给Java的方法注释作者名字和日期等信息 1. 不可修改的模板&#xff1a;Postfix Completion 2. 可修改的模板&#xff1a;Live Templates tips&#xff1a;首先给大家推荐两款好用的免费软件&…

【Linux】多进程编程

目录 1. 进程基础知识 2. 查看进程 3. 杀死进程 4. 获取进程标识符 5. 进程创建 6. 进程终止 7. 进程等待 8. 进程程序替换 9. 进程间通信之管道 9.1 匿名管道 9.2 命名管道&#xff08;FIFO&#xff09; 10. 进程间通信之共享内存 11. 进程间通信之信号 11.1 Li…

Linux传统跨进程通信原理

文章目录 前言一、进程隔离二、进程空间划分&#xff1a;用户空间(User Space)/内核空间(Kernel Space)三、系统调用&#xff1a;用户态与内核态四、Linux下传统IPC跨进程通信原理1、发送进程通过系统调用&#xff0c;将需要发送的数据拷贝到Linux进程的内核空间中的缓存区(数据…

百度智能云千帆大模型平台 2.0 产品技术解析

本文整理自 2023 年 9 月 5 日百度云智大会 - 智能计算&大模型技术分论坛&#xff0c;百度智能云 AI &大数据平台总经理忻舟的主题演讲《百度智能云千帆大模型平台 2.0 产品技术解析》。 这是关于技术主题的论坛&#xff0c;我首先问大家三个开发者的小问题。 第一个问…

tez作业运行慢

文章目录 问题现象&#xff1a;排查思路查看task运行概况查看map和reduce container的日志初步结论 继续排查container数量差异大分片计算异常 结论 问题现象&#xff1a; 每天调度的一个任务在某天突然运行时长多了好几倍&#xff0c;平时30m左右&#xff0c;那天运行了4个小…

Ubuntu 22.04‘Temporary failure resolving‘ 解决方案

终极解决方案 首先安装resolvconf sudo apt-get install resolvconf 使用 cd /etc/resolvconf/resolv.conf.d/ 进入文件夹&#xff0c;使用 ls 查看目录&#xff0c;会显示 base head tail 使用 sudo vim base 编辑base文件&#xff0c; 进入时为空&#xff0c;点击 i 添加 …

【架构艺术】(零) 环境搭建

写在前面 今天尝试了如systemC,Chisel,MyHDL等方式来进行功能仿真&#xff0c;并生成波形到Wavedrom格式&#xff0c;后来发现对于学习这些简单架构&#xff0c;还是脑子里面根据规则进行仿真或者是编写verilog代码进行仿真即可。 所以我们的环境依赖只有&#xff1a;安装waved…

【PostgreSQL启动,停止命令(重启)】

找到 /usr/lib/systemd/system文件夹路径看是否包含 postgresql服务 关闭服务&#xff1a; systemctl stop postgresql-12.service启动服务 systemctl start postgresql-12.service重启服务 systemctl restart postgresql-12查看状态 systemctl status postgresql-12.servi…

区分Cookie,Session,Token

Cookie 由于HTTP 协议是一个无状态协议&#xff0c;客户端向服务器发请求&#xff0c;服务器返回响应。并且你每次都要输入账号和密码进行登录&#xff0c;对于用户来说非常的麻烦&#xff01;这种背景下&#xff0c;就产生了 Cookie cookie 存储在客户端&#xff1a; cookie…

Essential Steps in Natural Language Processing (NLP)

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…