【并发编程】异步编程CompletableFuture实战

news2025/6/18 5:14:19

文章目录

        • 1.CompletableFuture简介
        • 2.CompletableFuture核心API实战
        • 3.CompletableFuture嵌套案例实战
        • 4.合并两个CompletableFuture案例实战
        • 5.多个CompletableFuture任务组合调度实战

1.CompletableFuture简介

在JDK8之前,我们使用的Java多线程变成,主要是 Thread+Runnable 来完成,但是这种方式有个弊端就是没有返回值。如果想要返回值怎么办呢,大多数人就会想到 Callable + Thread 的方式来获取到返回值。

在这里插入图片描述

通过上面的类继承关系图可以知道 CompletableFuture 实现了 Future 接口和 CompletionStage 。因此 CompletableFuture是对 Futrue的功能增强包含了Future的功能。从继承的另一个 CompletionStage 的名称来看完成阶段性的接口。

CompletableFuture的核心用途:

  • 在项目开发中,由于业务规划逻辑的原因,业务需要从多个不同的地方获取数据,
  • 然后汇总处理为最终的结果,再返回给请求的调用方,就是聚合信息处理类的处理逻辑
  • 如果常用串行请求,则接口响应时间长;利用CompletableFuture则可以大大提升性能
  • 针对多任务,需要进行任务编排调度,也可以使用CompletableFuture进行完成

CompletableFuture类实现了Future和CompletionStage接口,相当于一个Task编排工具

  • Future

    • 表示异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成
    • 计算完成后只能使用 get 方法来获取结果,有cancel、get、isDone、isCancelled等方法
  • CompletionStage

    • 是Java8新增接口,用于异步执行中的阶段处理,CompletableFuture是其中的一个实现类
    • 对任务处理可以构造一条结果传递链,在结果传递过程中任何一个CompletionStage都可以对结果进行处理
      • 包括异常处理、类型转换,可以构造非常简单的传递链也可以构造很复杂的传递链
    • 几个CompletionStage可以串联起来,一个完成的阶段可以触发下一阶段的执行
  • 当前的Task到底由那个Thread执行,使用的不好可能会有性能问题, 根据CompletableFuture的方法命名可以掌握

    • 不带Async的方法,比如thenAccept,表示该方法将继续在当前执行CompletableFuture的方法线程中执行
    • 带Async的方法,比如thenAcceptAsync,表示异步,在线程池中执行
    • 在没有指定线程池的情况下,使用的是CompletableFuture内部的线程池 ForkJoinPool ,线程数默认是 CPU 的核心数
      • 一般不要所有业务共用一个线程池,避免有任务执行一些很慢的 I/O 操作,
      • 会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,影响整个系统的性能

方法API

  • CompletableFuture静态方法,执行异步任务的API
//无返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
public static CompletableFuture<Void>   runAsync(Runnable runnable)

//无返回值,可以自定义线程池
public static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)


//有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)

//有返回值,可以自定义线程池
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)
  • CompletableFuture对象,获取结果的API
//如果返回值没有返回,一直阻塞
V get()

//设置等待超时的时间
V get(long timeout,Timeout unit);

//有返回值就返回, 线程抛出异常就返回设置的默认值
T getNow(T defaultValue);
  • CompletableFuture对象,其他重点API
//方法无返回值,当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数
thenAccept

//方法有返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数
thenApply

//对不关心上一步的计算结果,执行下一个操作
thenRun

2.CompletableFuture核心API实战

(1)supplyAsync方法实战,有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

    public static void testCompletableFuture1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            return "lixiang";
        });
        System.out.println(future.get());
    }

在这里插入图片描述

(2)runAsync方法实战,无返回值

    public static void testCompletableFuture2() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            System.out.println("runAsync方法执行。。。");
        });
        System.out.println(future.get());
    }

在这里插入图片描述

(3)thenApply 组合调度,能拿到上步执行的结果,并且当前执行完有任务返回值的

    public static void testCompletableFuture3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            System.out.println("supplyAsync方法执行。。。");
            return "lixiang";
        }).thenApply((ele)->{
            System.out.println("thenApply方法执行。。。,拿到上一步的执行结果:"+ele);
            return ele + " is a java工程师";
        });
        System.out.println(future.get());
    }

在这里插入图片描述

(4)thenAccept 组合调度,能拿到上步执行的结果,当前执行完无任务返回值的

    public static void testCompletableFuture4() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
            System.out.println("supplyAsync方法执行。。。");
            return "lixiang";
        }).thenAccept((ele)->{
            System.out.println("thenAccept方法执行。。。,拿到上一步的执行结果:"+ele);
        });
        System.out.println(future.get());
    }

在这里插入图片描述

3.CompletableFuture嵌套案例实战

  • 需求

    • 日常的任务中,通常定义的方法都会返回 CompletableFuture 类型,方便后续操作
    • 然后将该任务的执行结果Future作为方法入参然后执行指定的方法, 返回一个新的CompletableFuture
    • 任务它们之间存在着业务逻辑上的先后顺序
  • thenCompose

    • 用来连接两个CompletableFuture,是生成一个新的CompletableFuture,用于组合多个CompletableFuture
    • 也可以使用 thenApply() 方法来描述关系,但返回的结果就会发生 CompletableFuture 的嵌套
      • CompletableFuture<CompletableFuture< Product >> 这样的情况,需要get两次

(1)编写商品类

@Data
public class Product {

    private int id;

    private String title;

    private String detail;
}

(2)模拟商品service

public class ProductService {

    private static final Map<Integer,String> map = new HashMap<>();
    static {
        map.put(1,"iphone14");
        map.put(2,"iphone 蓝牙耳机");
        map.put(3,"Mac Book Pro-详情图内容");
        map.put(4,"小香风深蓝色大衣");
        map.put(5,"清热解火菊花茶");
        map.put(6,"补肝养肾枸杞大枣茶");
        map.put(7,"颈椎病康复指南");
    }

    public String getById(int id){
        try {
            Thread.sleep(1000);
            System.out.println("ProductService#getById方法运行线程:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return map.get(id);
    }

}

(3)模拟商品详情service

public class ProductDetailService {

    private static final Map<Integer,String> map = new HashMap<>();
    static {
        map.put(1,"iphone14-详情图内容");
        map.put(2,"iphone 蓝牙耳机-详情图内容");
        map.put(3,"Mac Book Pro-详情图内容");
        map.put(4,"小香风深蓝色大衣-详情图内容");
        map.put(5,"清热解火菊花茶-详情图内容");
        map.put(6,"补肝养肾枸杞大枣茶-详情图内容");
        map.put(7,"颈椎病康复指南-详情图内容");
    }

    public String getById(int id){
        try {
            Thread.sleep(1000);
            System.out.println("DetailService # getById方法运行线程:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return map.get(id);
    }
}

(4)测试方法,正常用thenApply,需要get两次

    public static void test1() throws ExecutionException, InterruptedException {

        ProductService productService = new ProductService();

        ProductDetailService productDetailService = new ProductDetailService();

        //第一步异步 ,第二部异步
        CompletableFuture<CompletableFuture<Product>> future = CompletableFuture.supplyAsync(() -> {
            Product product = new Product();
            String title = productService.getById(1);
            product.setId(product.getId());
            product.setTitle(title);
            return product;
        }).thenApply(product -> CompletableFuture.supplyAsync(() -> {
            String detail = productDetailService.getById(1);
            product.setDetail(detail);
            return product;
        }));

        //这块 获取 商品信息要get两次
        System.out.println("获取商品信息:"+future.get().get());

    }

(5)测试方法,用thenCompose,只需要get一次即可

	public static void test2() throws ExecutionException, InterruptedException {

        ProductService productService = new ProductService();

        ProductDetailService productDetailService = new ProductDetailService();

        //第一步异步 ,第二部异步
        CompletableFuture<Product> compose = CompletableFuture.supplyAsync(() -> {
            Product product = new Product();
            String title = productService.getById(1);
            product.setId(product.getId());
            product.setTitle(title);
            return product;
        }).thenCompose(product -> CompletableFuture.supplyAsync(() -> {
            String detail = productDetailService.getById(1);
            product.setDetail(detail);
            return product;
        }));

        //这块 获取 商品信息要get两次
        System.out.println("获取商品信息:"+compose.get());

    }

在这里插入图片描述

4.合并两个CompletableFuture案例实战

  • 需求

    • 需要请求两个个接口,然后把对应的CompletableFuture进行合并,返回一个新的CompletableFuture
  • thenCombine

    • 在两个任务都执行完成后,把两个任务的结果合并,有返回值。
  • 编码实战

	public static void test3() throws ExecutionException, InterruptedException {
        ProductService productService = new ProductService();
        ProductDetailService detailService = new ProductDetailService();

        int id = 1;
        //第1个任务
        CompletableFuture<Product> baseProductFuture = CompletableFuture.supplyAsync(() -> {
            String title = productService.getById(id);
            Product product = new Product();
            product.setTitle(title);
            product.setId(id);
            return product;
        });

        //第2个任务
        CompletableFuture<Product> detailProductFuture = CompletableFuture.supplyAsync(() -> {
            String detail = detailService.getById(id);
            Product product = new Product();
            product.setDetail(detail);
            product.setId(id);
            return product;
        });

        CompletableFuture<Product> compose = baseProductFuture.thenCombine(detailProductFuture,
                (base, detail) -> {
                    base.setDetail(detail.getDetail());
                    return base;
                });
        System.out.println(compose.get());
    }

在这里插入图片描述

  • thenAccepetBoth
    • 在两个任务都执行完成后,把两个任务的结果合并,有返回值。
  • 编码实战
	public static void test3() throws ExecutionException, InterruptedException {
        ProductService productService = new ProductService();
        ProductDetailService detailService = new ProductDetailService();

        int id = 1;
        //第1个任务
        CompletableFuture<Product> baseProductFuture = CompletableFuture.supplyAsync(() -> {
            String title = productService.getById(id);
            Product product = new Product();
            product.setTitle(title);
            product.setId(id);
            return product;
        });

        //第2个任务
        CompletableFuture<Product> detailProductFuture = CompletableFuture.supplyAsync(() -> {
            String detail = detailService.getById(id);
            Product product = new Product();
            product.setDetail(detail);
            product.setId(id);
            return product;
        });

        CompletableFuture<Void> acceptBoth = baseProductFuture.thenAcceptBoth(detailProductFuture, (base, detail) -> {
            base.setDetail(detail.getDetail());
            System.out.println(base);
        });
        System.out.println(acceptBoth.get());
    }

在这里插入图片描述

5.多个CompletableFuture任务组合调度实战

背景

  • 前面学习处理两个 Future 的关系,如果超过两个Future,如何处理他们的一些聚合关系呢
  • 方法 allOf 和 anyOf
    • 两个函数都是静态函数,参数是变长的 CompletableFuture 的集合,前者是「与」,后者是「或」
    • allOf
      • 返回值是 CompletableFuture< Void >类型
      • 因为allOf没有返回值,所以通过thenApply,获取每个 CompletableFuture 的执行结果
    • anyOf
      • 只要有任意一个 CompletableFuture 结束,就可以做接下来的事情,不像 allOf 要等待所有的 CompletableFuture 结束
      • 每个 CompletableFuture 的返回值类型都可能不同,无法判断是什么类型
      • 所以 anyOf 的返回值是 CompletableFuture< Object >类型

(1)allOf编码实战

	public static void testAllOf() throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future1执行完成");
            return "future1执行完成";
        });


        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future2执行完成");
            return "future2执行完成";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future3执行完成");
            return "future3执行完成";
        });

        CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);

        //阻塞,直到所有任务结束。
        System.out.println(Thread.currentThread().getName() + ":" + LocalDateTime.now() + ":阻塞");
        //调用join方法等待全部任务完成
        all.join();

        if (all.isDone()) {
            //一个需要耗时2秒,一个需要耗时3秒,只有当最长的耗时3秒的完成后,才会结束。
            System.out.println("全部任务执行完成");
        }
        System.out.println(Thread.currentThread().getName() + ":" + LocalDateTime.now() + ":阻塞结束");
    }

在这里插入图片描述

(2)anyOf编码实战

	public static void testAnyOf() throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future1执行完成");
            return "future1执行完成";
        });


        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future2执行完成");
            return "future2执行完成";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future3执行完成");
            return "future3执行完成";
        });

        CompletableFuture<Object> any = CompletableFuture.anyOf(future1, future2, future3);

        //阻塞,最快任务执行完成 任务结束。
        System.out.println(Thread.currentThread().getName() + ":" + LocalDateTime.now() + ":阻塞");
        //调用join方法等待最快的一个任务执行
        any.join();

        if (any.isDone()) {
            //一个需要耗时2秒,一个需要耗时3秒,一个耗时5秒 当最短的完成则会结束
            System.out.println(any.get()+"任务执行完成");
        }
        System.out.println(Thread.currentThread().getName() + ":" + LocalDateTime.now() + ":阻塞结束");
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/411830.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Golang GORM入门

一、GORM入门 1.1 什么是ORM&#xff1f; orm是一种术语而不是软件 orm英文全称object relational mapping&#xff0c;就是对象映射关系简单来说类似python这种面向对象的程序来说一切皆对象&#xff0c;但是我们使用的数据库却都是关系型的 为了保证一致的使用习惯&#xff…

NumPy 数组学习手册:1~5

原文&#xff1a;Learning NumPy Array 协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 一、NumPy 入门 让我们开始吧。 我们将在不同的操作系统上安装 NumPy 和相关软件&#xff0c;并查看一些使用 NumPy 的简单代码。 正如“序言”所述&#xff0c;SciPy 与 NumPy 密…

开放式蓝牙耳机推荐,推荐几款具有代表性的骨传导耳机

骨传导耳机是一种骨传导技术和音频技术结合的产品&#xff0c;通过声音将振动从骨头传到听觉神经&#xff0c;从而达到听音的效果。相比于传统的入耳式耳机&#xff0c;骨传导耳机佩戴更舒适&#xff0c;不会对耳朵造成损伤。然而市面上有很多不同类型的骨传导耳机&#xff0c;…

机器学习和深度学习在气象中的应用(台风预报只能订正、风速预报订正、LSTM 方法预测 ENSO)

查看原文>>>Python人工智能在气象中的实践技术应用 目录 专题一、Python 和科学计算基础 专题二、机器学习和深度学习基础理论和实操 2.1 机器学习和深度学习基础理论 2.2 sklearn 和pytorch 库 专题三 、气象领域中的机器学习应用实例 3.1 GFS 数值模式的风速…

【玩转RT-Thread】RT-Thread网络框架:BSD网络接口SAL套接字抽象层

文章目录RT-Thread网络框架&#xff1a;BSD网络接口&SAL套接字抽象层基础知识1.TCP与UDP的区别2.TCP编程 服务端配置过程3.TCP编程 客户端配置过程4.UDP编程 客户端配置过程SAL套接字抽象层1.SAL组件主要功能特点&#xff1a;2.SAL网络框架3.工作原理4.多协议接入与接口函数…

“成年人”的数据库,既要又要也要!

欢迎访问 OceanBase 官网获取更多信息&#xff1a;https://www.oceanbase.com/ 3 月 25 日&#xff0c;第一届 OceanBase 开发者大会在北京举行&#xff0c;《明说三人行》访谈栏目创始人兼主持人卢东明、沃趣科技创始人兼 CEO 陈栋、DBAplus 社群联合创始人杨建荣、PostgreSQL…

7.1 基本运算电路(2)

七、集成运放性能指标对运算误差的影响 在上述各电路运算关系的分析中&#xff0c;均认为集成运放为理想运放。而实际上&#xff0c;当利用运放构成运算电路时&#xff0c;由于开环差模增益 AodA_{od}Aod​、差模输入电阻 ridr_{id}rid​ 和共模抑制比 KCMRK_{CMR}KCMR​ 为有…

【计算机网络-应用层】域名系统 DNS、文件传输协议 FTP、电子邮件

文章目录1 域名系统 DNS1.1 域名结构1.2 域名服务器1.2.1 根域名服务器1.2.2 顶级域名服务器1.2.3 权限域名服务器1.2.4 本地域名服务器1.3 域名解析过程1.3.1 递归查询1.3.2 递归与迭代相结合查询1.3.3 本地域名服务器的高速缓存2 文件传输协议 FTP2.1 主动模式&#xff08;建…

java编译和运行带有包名的类

写在前面 对于习惯了使用ide的我们似乎早已经忘记了如何通过命令行来编译和运行java类了&#xff0c;至少我是这样的&#xff0c;本文就一起来回顾下吧&#xff01; 1&#xff1a;运行不带包的类 这种相信大多数朋友都记得&#xff0c;直接javac yourCode.java,然后java you…

Camtasia studio2023录屏和后期剪辑的软件

Camtasia 2023是专门用于屏幕录制的软件&#xff0c;功能十分丰富&#xff0c;不仅可以录制电脑屏幕、局部区域和摄像头等&#xff0c;而且还能即时编辑视频&#xff0c;给视频添加转场、旁白、字幕等&#xff0c;能够轻松制作更优秀的视频。 兼顾录屏和后期剪辑的软件—Camtas…

Oracle_EBS_核心功能(MFG)(第二部分)

BOM: Routing工艺路线应用&#xff1a;Bills of Material 职责&#xff1a;Bills of Material 基础业务学习总体说明 Routing&#xff08;工艺路线&#xff09;最终解决的问题是生产过程中加工顺序、资源和用量的标准化。准确度要求在98%以上&#xff0c;要不断与现场比对&…

【离散数学】图论

1、有n个点没有边 零图 2、有1个点没有边 平凡图 3、含有平行边的图 多重图 4、简单图 不含有平行边和自回环的图 5、任意两个结点之间都有边 完全图 6、环贡献 两度 7、所有顶点的度数之和等于边数的两倍 8、在有向图中所有顶点的出度之和 或者 入度之和 等于边数 9、度数为…

特斯拉和OpenAI的加持,马斯克简直人生赢家

赢家已定 商人行事&#xff0c;最重要的因素之一是利益驱动。这里&#xff0c;最服“马斯克”。 以马斯克为首的特斯拉公司周日宣布&#xff0c;将在上海新建一家超级工厂&#xff0c;专门生产该公司的储能产品Megapack。签约的特斯拉储能超级工厂项目也是该公司在美国本土以…

【论文笔记】CRN: Camera Radar Net for Accurate, Robust, Efficient 3D Perception

原文链接&#xff1a;https://arxiv.org/abs/2304.00670 1. 引言 本文提出两阶段融合方法CRN&#xff0c;能使用相机和雷达生成语义丰富且位置精确的BEV特征。具体来说&#xff0c;首先将图像透视特征转换到BEV下&#xff0c;该步骤依赖雷达&#xff0c;称为雷达辅助的视图变换…

大数据技术(入门篇) --- centos7安装CDH6.2集群

随着信息化时代的进步&#xff0c;业务系统的数据量出现了爆发式的增长&#xff0c;带来的不良结果就是数据库的数据量剧增&#xff0c;而部分业务系统需要实时数据&#xff0c;有些业务系统需要离线计算后的数据&#xff0c;所以就产生了大数据技术&#xff0c;因此最近在学习…

面试官:说一说mysql的varchar字段最大长度?

在mysql建表sql里&#xff0c;我们经常会有定义字符串类型的需求。 CREATE TABLE user (name varchar(100) NOT NULL DEFAULT COMMENT 名字 ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 ;比方说user表里的名字&#xff0c;就是个字符串。mysql里有两个类型比较适合这个场景。 ch…

剧本拆分如何用ai人工智能辅助完成

随着现代技术的发展&#xff0c;人工智能在电影制作领域中的应用已经越来越普遍。其中&#xff0c;辅助剧本拆分是人工智能技术的一种重要应用。人工智能可以帮助电影制作人员更快速、更准确地进行剧本拆分&#xff0c;提高制作效率和创作质量。 剧本拆分是电影制作中非常重要的…

二叉树的链式结构

思维导图 二叉树的创建 先定义一个二叉树链式结构的结构体 typedef int BTDatatype; typedef struct BinaryTreeNode {struct BinaryTreeNode* left;struct BinaryTreeNode* right;BTDatatype data; }BTNode; 手搓一个二叉树&#xff08;前序遍历的方式创建二叉树放到OJ题…

nm命令 以及 C++11 编译出现找不到stringstream 以及 undefined reference to `std::runtime_error

最近在学习ZLMediaKit 源码 里面用到了很多C11 的知识 本地有一个 ubuntu18.04 的服务器 源码下下来发现 直接编译报很多错误 比如 找不到 std::runtime_error 找不到 stringstream 等等等 后来偶然的机会发现 是libstdc.so.6 太老了 找一个新的 替换掉这个就可以 …

新 Nano(五)自己写个库,读 DHT11 / DHT22

DHT11 这款温湿度传感器 几乎是所有 MCU 入门第一个传感器&#xff0c; 现在看来有些不合时宜&#xff0c; 毕竟过于廉价&#xff0c;数据不太靠谱&#xff0c;远不如 AHT10 好用。早年买了两个&#xff0c;按例程读出数据后就吃灰了。某日看到有人说自己按datasheet去读&#…