CompletableFuture:异步编程的“智能机械臂”
机械臂这个词很亲切这可是上家公司我们的仓储物流系统大功臣如果说Future是一个只会让你“傻等”的取餐牌拿到牌 - 去旁边站着 - 叫号了去取那么CompletableFuture就是工厂里的智能流水线机械臂传统 Future 痛点阻塞调用future.get()会卡住当前线程直到任务完成。无法编排很难实现“任务 A 做完自动做 BB 做完并行做 C 和 D最后汇总”这种复杂逻辑。代码会变成地狱般的嵌套回调Callback Hell或者一堆杂乱的线程管理。CompletableFuture 核心能力非阻塞任务提交后立即返回不占用主线程。函数式编排像搭积木一样链式调用 (thenApply,thenCompose)。异常处理统一的exceptionally或handle机制。多任务聚合轻松实现allOf(全做完) 或anyOf(谁快用谁)。第一部分核心概念——从“取餐牌”到“流水线”1. 为什么需要它场景对比比如练手做小型电商项目构建一个电商详情页需要获取用户信息(User Service) - 耗时 50ms商品信息(Product Service) - 耗时 80ms推荐列表(Recommend Service) - 依赖用户信息耗时 60ms优惠券(Coupon Service) - 依赖用户 商品耗时 40ms当然真实的业务场景不是这么用的只是用于概念的讲解// 代码混乱难以维护容易忘记关闭线程池异常处理麻烦 FutureUser f1 pool.submit(() - userService.getUser()); FutureProduct f2 pool.submit(() - productService.getProduct()); User user f1.get(); // 阻塞 Product product f2.get(); // 阻塞 ——————————————————————————————————————————————————————————————————————————————————— CompletableFuture (虽是异步编排、环环相扣、逻辑相连 // 总耗时 ≈ max(50, 80) max(60, 40) ≈ 80 60 140ms (性能提升 40%) // 且代码像流水账一样清晰 CompletableFutureUser userFuture CompletableFuture.supplyAsync(() - userService.getUser()); CompletableFutureProduct productFuture CompletableFuture.supplyAsync(() - productService.getProduct()); // 依赖 user 的任务自动触发 CompletableFutureListRec recFuture userFuture.thenApply(user - recService.getRecs(user)); // 依赖 user 和 product 的任务等两者都完成后自动触发 CompletableFutureListCoupon couponFuture userFuture.thenCombine(productFuture, (u, p) - couponService.getCoupons(u, p)); // 最后汇总所有结果 CompletableFuturePageData allDone CompletableFuture.allOf(recFuture, couponFuture) .thenApply(v - assemblePage(recFuture.join(), couponFuture.join()));这就像饭店上菜一样作为服务员你不可能一直站在厨房门口等菜。你下单后submit厨师线程池开始做菜。菜做好了complete自动传送到下一个工位thenApply最后打包好直接端到你面前join/get或者回调通知第二部分核心 API 详解与实战方法描述是否有返回值线程池supplyAsync(SupplierU)异步执行有返回值的任务✅ 有默认 ForkJoinPool.commonPool()runAsync(Runnable)异步执行无返回值的任务❌ 无 (Void)默认 ForkJoinPool.commonPool()supplyAsync(..., Executor)推荐指定自定义线程池✅ 有自定义重要警告生产环境永远不要使用默认的commonPool()原因它是全局共享的。如果你的任务里有 IO 阻塞查库、调接口会把公共池的线程占满导致整个 JVM 其他使用并行流或 CF 的地方全部卡死。最佳实践 always provide a customExecutor(e.g.,ThreadPoolExecutor).自定义2. 转换结果thenApply(流水线加工)场景上一步的结果经过计算变成下一步的输入。签名thenApply(FunctionT, U)行为当前任务完成后在当前线程或指定线程执行转换函数返回新结果。CompletableFutureInteger future CompletableFuture.supplyAsync(() - { System.out.println(Step 1: 获取原始数据 (Thread: Thread.currentThread().getName() )); return 10; }, customExecutor); // 链式调用10 - 20 CompletableFutureInteger result future.thenApply(data - { System.out.println(Step 2: 数据翻倍 (Thread: Thread.currentThread().getName() )); return data * 2; }); // 注意thenApply 默认复用上一个任务的线程如果上一个刚结束除非指定 executor3. 依赖另一个异步任务thenCompose(扁平化/串联)场景第二步也是一个异步任务返回CompletableFuture你需要把两个未来“拍平”成一个签名thenCompose(FunctionT, CompletableFutureU)比喻第一步拿到了“订单 ID”第二步要用这个 ID 去异步查询“订单详情”。区别thenApply:T-U(同步转换)thenCompose:T-CompletableFutureU(异步依赖避免嵌套CompletableFutureCompletableFutureU)// 模拟异步获取用户 ID CompletableFutureString userIdFuture CompletableFuture.supplyAsync(() - U1001); // 错误写法 (会得到 CompletableFutureCompletableFutureUser) // userIdFuture.thenApply(id - getUserAsync(id)); // 正确写法 (thenCompose 拍平) CompletableFutureUser userFuture userIdFuture.thenCompose(id - { System.out.println(拿到 ID: id , 开始异步查询用户详情...); return getUserAsync(id); // 返回一个新的 CompletableFuture });4. 合并两个任务thenCombine(并联汇聚)任务 C 依赖 任务 A和任务 B 的结果。A 和 B 并行执行都完成后触发 C签名thenCombine(OtherFuture, BiFunctionT, U, V)CompletableFutureInteger taskA CompletableFuture.supplyAsync(() - 10); CompletableFutureInteger taskB CompletableFuture.supplyAsync(() - 20); // 等 A 和 B 都做完执行相加 CompletableFutureInteger sumFuture taskA.thenCombine(taskB, (a, b) - { System.out.println(A a , B b , 计算总和); return a b; });5. 等待所有/任意任务allOf/anyOfallOf(f1, f2, ...): 等待所有任务完成。返回CompletableFutureVoid。用法通常配合join()提取各个任务的结果。anyOf(f1, f2, ...):任意一个完成任务即返回常用于多源兜底谁快用谁6. 异常处理不让流水线崩塌传统的try-catch在异步链式中很难写。CF 提供了专门的钩子。exceptionally(FunctionThrowable, T): 类似 catch返回一个默认值。handle(BiFunctionT, Throwable, U): 无论成功还是异常都会执行类似 finally 判断。最推荐CompletableFutureString future CompletableFuture.supplyAsync(() - { if (true) throw new RuntimeException(服务挂了!); return OK; }).handle((result, ex) - { if (ex ! null) { System.err.println(出错了: ex.getMessage()); return 默认降级数据; // 返回兜底值 } return result; // 正常返回 });第三部分微服务实战——编排复杂调用链场景构建一个聚合接口需要并行调用三个下游服务其中两个有依赖关系最后汇总。Task A: 获取基础配置 (独立)Task B: 获取用户信息 (独立)Task C: 获取用户订单 (依赖 B)Task D: 组装最终结果 (依赖 A, C)import java.util.concurrent.*; import java.util.stream.Collectors; public class MicroserviceOrchestration { // 自定义线程池 (关键隔离业务控制资源) private static final ExecutorService executor new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(100), new ThreadFactoryBuilder().setNameFormat(async-pool-%d).build(), new ThreadPoolExecutor.CallerRunsPolicy() ); public CompletableFutureResponseDTO buildPage(String userId) { // 1. 并行启动独立任务 A 和 B CompletableFutureConfig configFuture CompletableFuture .supplyAsync(() - mockConfigService(), executor) .exceptionally(ex - Config.defaultConfig()); // 降级 CompletableFutureUser userFuture CompletableFuture .supplyAsync(() - mockUserService(userId), executor) .exceptionally(ex - User.guestUser()); // 降级 // 2. 任务 C 依赖 B (thenCompose) CompletableFutureOrder orderFuture userFuture.thenCompose(user - { if (guest.equals(user.getType())) { return CompletableFuture.completedFuture(Order.empty()); // 快速返回不再调下游 } return CompletableFuture.supplyAsync(() - mockOrderService(user.getId()), executor); }); // 3. 任务 D 依赖 A 和 C (thenCombine) CompletableFutureResponseDTO finalResult configFuture.thenCombine(orderFuture, (config, order) - { // 这里执行最后的组装逻辑 return new ResponseDTO(config, order); }); return finalResult; } // 模拟调用 private Config mockConfigService() { /* sleep 50ms */ return new Config(); } private User mockUserService(String id) { /* sleep 80ms */ return new User(id); } private Order mockOrderService(String uid) { /* sleep 60ms */ return new Order(uid); } public static void main(String[] args) throws Exception { MicroserviceOrchestration service new MicroserviceOrchestration(); long start System.currentTimeMillis(); // 发起请求 (非阻塞) CompletableFutureResponseDTO future service.buildPage(U123); // 在主线程等待结果 (实际 Web 容器中框架会帮你处理这个等待直接返回 DeferredResult) ResponseDTO response future.join(); System.out.println(总耗时: (System.currentTimeMillis() - start) ms); System.out.println(结果: response); executor.shutdown(); } }自定义线程池避免了污染commonPool且可以针对该业务调整队列大小和拒绝策略。异常降级每个远程调用都加了exceptionally保证单个服务挂掉不会导致整个页面白屏而是显示默认值。短路优化在thenCompose中判断如果是 Guest 用户直接返回空订单不再发起多余的 RPC 调用。自动并行configFuture和userFuture同时启动互不阻塞。第四部分避坑指南与实践慎用get()和join()原则尽量将逻辑写在链式调用 (thenApply,thenAccept) 中让框架自动回调。例外只有在最外层如 Controller 出口或单元测试才调用join()等待最终结果。在链条中间调用join()会阻塞当前线程破坏异步优势。线程池隔离IO 密集型调 RPC、查 DB线程数可以设大一点如 CPU 核数 * 2 或更多因为线程大部分时间在 wait。CPU 密集型计算线程数 CPU 核数 1。不同业务隔离核心业务如下单和非核心业务如推荐使用不同的线程池防止非核心业务把线程池占满拖垮核心业务。上下文传递 (ThreadLocal)问题supplyAsync会切换线程导致ThreadLocal(如 TraceID, UserContext)丢失。解决方案 A在supplyAsync之前手动把变量取出来作为参数传进去推荐最简单。方案 B使用InheritableThreadLocal(仅限线程池复用线程时有效且有风险)。方案 C使用阿里TransmittableThreadLocal(TTl) 等专门库在包装 Runnable/Supplier 时传递上下文。避免回调地狱 (Callback Hell)虽然 CF 比原生 Callback 好但如果链式调用超过 5-6 层代码也会难读。建议将长链条拆分成多个方法每个方法返回一个CompletableFuture// 坏超长链 f1.thenApply(...).thenCompose(...).thenApply(...).thenCombine(...)... // 好拆分 CompletableFutureA step1() { ... } CompletableFutureB step2(A a) { ... } public CompletableFutureResult orchestrate() { return step1().thenCompose(this::step2)... }场景推荐方案理由简单异步无需结果executor.submit(Runnable)轻量够用简单异步需阻塞等待结果Future传统简单多服务并行调用需编排依赖CompletableFuture唯一真神。支持 DAG (有向无环图) 编排非阻塞异常友好响应式流 (背压海量事件)Project Reactor (Mono/Flux)/RxJavaCF 是单次任务Reactor 是流式数据。如果涉及流控、重试、复杂流变换选 Reactor
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2410415.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!