CompletionStage 接口分析
接口能力概述
CompletionStage 是 Java 8 引入的接口,用于表示异步计算的一个阶段,它提供了强大的异步编程能力:
- 链式异步操作:允许将一个异步操作的结果传递给下一个操作
- 组合操作:可以组合多个 CompletionStage
- 异常处理:提供对异步计算中异常的处理机制
- 多种执行方式:支持同步、默认异步和自定义执行器的异步执行
主要功能分类
1. 单阶段依赖操作
thenApply()
- 转换结果thenAccept()
- 消费结果thenRun()
- 执行无结果操作
2. 双阶段组合操作
thenCombine()
- 两个阶段都完成后合并结果thenAcceptBoth()
- 两个阶段都完成后消费结果runAfterBoth()
- 两个阶段都完成后执行操作
3. 任一阶段完成操作
applyToEither()
- 任一阶段完成后转换结果acceptEither()
- 任一阶段完成后消费结果runAfterEither()
- 任一阶段完成后执行操作
4. 异常处理
exceptionally()
- 异常时提供替代值handle()
- 无论成功或异常都处理whenComplete()
- 无论成功或异常都执行操作
5. 组合其他 CompletionStage
thenCompose()
- 扁平化嵌套的 CompletionStage
默认方法
从 Java 12 开始,CompletionStage 接口新增了一些默认方法,主要用于更灵活的异常处理:
-
exceptionallyAsync(Function<Throwable, ? extends T> fn)
- 异步处理异常的默认方法
-
exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
- 使用指定执行器异步处理异常的默认方法
-
exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn)
- 异常时返回新的 CompletionStage 的默认方法
-
exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn)
- 异步方式异常时返回新的 CompletionStage 的默认方法
-
exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor)
- 使用指定执行器异步方式异常时返回新的 CompletionStage 的默认方法
执行模式
大多数方法都有三种变体:
- 同步:基本方法(如
thenApply
) - 默认异步:带
Async
后缀(如thenApplyAsync
) - 自定义执行器异步:带
Async
后缀和 Executor 参数(如thenApplyAsync(fn, executor)
)
总结
CompletionStage 接口为 Java 异步编程提供了强大的构建块,允许开发者以声明式的方式组合异步操作,处理成功和失败情况,并控制操作的执行方式(同步或异步)。从 Java 12 开始,通过新增的默认方法进一步增强了异常处理的灵活性。
CompletionStage 默认方法实现分析
CompletionStage 接口从 Java 12 开始引入了一系列默认方法,主要增强了异常处理的灵活性。这些默认方法提供了开箱即用的实现,但它们的正确运行依赖于接口中其他基本方法的正确实现。
默认方法分类与实现分析
1. 异步异常处理 (exceptionallyAsync
)
public default CompletionStage<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) {
return handle((r, ex) -> (ex == null)
? this
: this.<T>handleAsync((r1, ex1) -> fn.apply(ex1)))
.thenCompose(Function.identity());
}
实现分析:
- 首先调用
handle
方法检查是否有异常 - 如果没有异常(
ex == null
),返回当前阶段本身 - 如果有异常,使用
handleAsync
异步执行异常处理函数 - 最后通过
thenCompose
扁平化结果
依赖的子类实现:
handle()
handleAsync()
thenCompose()
2. 带执行器的异步异常处理 (exceptionallyAsync
with Executor)
public default CompletionStage<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
return handle((r, ex) -> (ex == null)
? this
: this.<T>handleAsync((r1, ex1) -> fn.apply(ex1), executor))
.thenCompose(Function.identity());
}
实现分析:
与上一个方法类似,但使用指定的 Executor
来执行异步操作
依赖的子类实现:
handle()
handleAsync(Executor)
thenCompose()
3. 异常组合处理 (exceptionallyCompose
)
public default CompletionStage<T> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn) {
return handle((r, ex) -> (ex == null)
? this
: fn.apply(ex))
.thenCompose(Function.identity());
}
实现分析:
- 检查是否有异常
- 无异常时返回当前阶段
- 有异常时调用函数生成新的 CompletionStage
- 使用
thenCompose
扁平化结果
依赖的子类实现:
handle()
thenCompose()
4. 异步异常组合处理 (exceptionallyComposeAsync
)
public default CompletionStage<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn) {
return handle((r, ex) -> (ex == null)
? this
: this.handleAsync((r1, ex1) -> fn.apply(ex1))
.thenCompose(Function.identity()))
.thenCompose(Function.identity());
}
实现分析:
类似 exceptionallyCompose
,但使用异步方式处理异常
依赖的子类实现:
handle()
handleAsync()
thenCompose()
子类需要提供的内容
虽然这些是默认方法,但它们的正确运行依赖于接口中其他基本方法的正确实现。子类需要确保以下方法的正确实现:
-
基本处理方法:
handle()
handleAsync()
handleAsync(Executor)
-
组合方法:
thenCompose()
thenComposeAsync()
thenComposeAsync(Executor)
-
其他基础方法:
- 所有非默认的 CompletionStage 方法,因为默认方法构建在这些基础方法之上
实现注意事项
-
线程安全:子类实现必须保证线程安全,因为 CompletionStage 可能被多个线程访问
-
执行保证:子类必须确保异步方法(
...Async
)确实在另一个线程执行 -
异常传播:必须正确实现异常传播机制,确保异常能通过依赖链传递
-
完成状态:必须正确维护阶段的完成状态(正常完成/异常完成)
-
执行顺序:必须保证依赖操作的执行顺序符合接口规范
示例:自定义实现的关键点
如果创建自定义的 CompletionStage 实现,必须特别注意:
class MyCompletionStage<T> implements CompletionStage<T> {
// 必须实现所有非默认方法
@Override
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn) {
// 实现转换逻辑
}
@Override
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
// 实现处理逻辑
}
// 其他必要方法的实现...
// 默认方法会自动继承,但依赖于上述方法的正确实现
}
CompletionStage 默认方法的使用场景
CompletionStage 接口的默认方法(特别是 Java 12 引入的异常处理方法)在以下场景中特别有用:
1. 异步异常处理场景
典型场景:当需要异步处理异常且不希望阻塞当前线程时
CompletionStage<String> stage = someAsyncOperation()
.exceptionallyAsync(ex -> {
// 异步执行异常恢复逻辑
log.error("Operation failed, using fallback", ex);
return "fallback-value";
});
优势:
- 异常处理不会阻塞调用线程
- 适合处理耗时的异常恢复逻辑(如远程调用备用服务)
2. 需要自定义线程池的异常处理
典型场景:当异常处理需要特定线程池资源时
ExecutorService recoveryExecutor = Executors.newFixedThreadPool(2);
CompletionStage<String> stage = someAsyncOperation()
.exceptionallyAsync(ex -> {
// 在专用线程池中执行恢复逻辑
return callBackendServiceB();
}, recoveryExecutor);
优势:
- 避免异常处理占用主业务线程池
- 可以为不同类型的恢复操作分配不同的线程资源
3. 需要返回新 CompletionStage 的异常恢复
典型场景:当异常发生时需要触发另一个异步操作来恢复
CompletionStage<String> stage = someAsyncOperation()
.exceptionallyCompose(ex -> {
// 当主操作失败时,尝试备用方案
return fallbackAsyncOperation();
});
实际应用:
- 主数据库查询失败时尝试从缓存获取
- 主服务不可用时调用备用服务
4. 复杂的异常处理流水线
典型场景:需要多级异常恢复策略时
CompletionStage<String> stage = someAsyncOperation()
.exceptionallyComposeAsync(ex -> {
// 第一级恢复:尝试本地备用方案
return tryLocalRecovery();
})
.exceptionallyComposeAsync(ex -> {
// 第二级恢复:尝试远程恢复
return tryRemoteRecovery();
}, remoteRecoveryExecutor)
.exceptionally(ex -> {
// 最后兜底方案
return "ultimate-fallback";
});
优势:
- 构建多层次的弹性恢复策略
- 每级恢复可以使用不同的执行策略(同步/异步/特定线程池)
5. 与现有代码的集成
典型场景:当需要将异常处理函数封装为 CompletionStage 时
Function<Throwable, String> legacyRecovery = ex -> {
// 传统的同步恢复逻辑
return LegacyRecoveryService.recover(ex);
};
// 将传统恢复逻辑适配为异步处理
CompletionStage<String> stage = someAsyncOperation()
.exceptionallyAsync(legacyRecovery);
优势:
- 将同步恢复逻辑自动提升为异步处理
- 无需修改原有恢复逻辑的实现
6. 需要保留堆栈信息的场景
典型场景:当异步操作链中需要保留原始异常信息时
CompletionStage<String> stage = someAsyncOperation()
.handleAsync((result, ex) -> {
if (ex != null) {
// 在此添加额外上下文信息
throw new RecoveryException("Failed in async operation", ex);
}
return result;
})
.exceptionallyComposeAsync(ex -> {
// 可以访问到包装后的异常信息
RecoveryException re = (RecoveryException) ex.getCause();
return recoveryWithContext(re.getContext());
});
优势:
- 避免异步操作链中异常信息丢失
- 可以在不同阶段添加诊断上下文
何时选择默认方法 vs 基础方法
场景 | 使用默认方法 | 使用基础方法 |
---|---|---|
简单的同步异常处理 | - | exceptionally() |
异步异常处理 | exceptionallyAsync() | - |
需要控制异常处理线程 | exceptionallyAsync(fn, executor) | - |
异常时需要触发新异步操作 | exceptionallyCompose() | - |
简单的异常转换 | - | handle() |
需要同时处理成功和失败 | - | whenComplete() |
实际案例:服务降级策略
public CompletionStage<Response> handleRequest(Request request) {
return primaryService.callAsync(request)
.exceptionallyComposeAsync(ex -> {
if (ex instanceof TimeoutException) {
// 超时快速失败,不尝试降级
throw new ServiceException("Timeout", ex);
}
// 其他异常尝试降级
return fallbackService.callAsync(request);
}, fallbackExecutor)
.exceptionally(ex -> {
// 记录最终失败
metrics.recordFailure(ex);
return Response.failure("Service unavailable");
});
}
在这个案例中,我们:
- 首先尝试主服务调用
- 如果失败(非超时),异步尝试降级服务
- 使用专门的线程池执行降级逻辑
- 最后兜底记录指标并返回友好错误
这些默认方法让复杂的异步错误处理模式能够以声明式的方式简洁表达。