Java 响应式编程:Reactor 框架深度解析
Java 响应式编程Reactor 框架深度解析核心概念响应式编程是一种编程范式关注数据的异步流和变化传播。在 Java 中Reactor 框架提供了强大的响应式编程支持基于 Reactive Streams 规范实现。Reactor 核心组件Mono表示 0 或 1 个元素的异步序列Flux表示 0 到 N 个元素的异步序列Scheduler调度器控制任务执行线程Operator操作符用于转换和处理数据流创建流// 创建 Flux FluxString flux Flux.just(Hello, World, Reactor); // 创建 Mono MonoString mono Mono.just(Hello); // 从集合创建 ListString list Arrays.asList(A, B, C); FluxString fromList Flux.fromIterable(list); // 从数组创建 String[] array {X, Y, Z}; FluxString fromArray Flux.fromArray(array); // 创建范围 FluxInteger range Flux.range(1, 10); // 创建空流 FluxString empty Flux.empty(); MonoString monoEmpty Mono.empty(); // 创建错误流 FluxString error Flux.error(new RuntimeException(Something went wrong));订阅流// 订阅并消费数据 Flux.just(A, B, C) .subscribe( item - System.out.println(Received: item), error - System.err.println(Error: error.getMessage()), () - System.out.println(Completed) ); // 简化订阅 Flux.just(1, 2, 3) .subscribe(System.out::println); // 使用 Disposable 控制订阅 Disposable disposable Flux.interval(Duration.ofSeconds(1)) .subscribe(tick - System.out.println(Tick: tick)); // 5 秒后取消订阅 Thread.sleep(5000); disposable.dispose();操作符// 转换操作符 Flux.just(hello, world) .map(String::toUpperCase) .subscribe(System.out::println); // HELLO, WORLD // 过滤操作符 Flux.range(1, 10) .filter(n - n % 2 0) .subscribe(System.out::println); // 2, 4, 6, 8, 10 // 映射操作符 Flux.just(a, b, c) .flatMap(s - Flux.just(s.toUpperCase(), s.toLowerCase())) .subscribe(System.out::println); // A, a, B, b, C, c // 组合操作符 FluxString flux1 Flux.just(A, B); FluxString flux2 Flux.just(X, Y); Flux.concat(flux1, flux2) .subscribe(System.out::println); // A, B, X, Y Flux.zip(flux1, flux2, (a, b) - a - b) .subscribe(System.out::println); // A-X, B-Y // 聚合操作符 Flux.range(1, 10) .reduce(0, Integer::sum) .subscribe(sum - System.out.println(Sum: sum)); // Sum: 55 Flux.range(1, 10) .collectList() .subscribe(list - System.out.println(List: list)); // List: [1, 2, ..., 10]错误处理// 错误处理操作符 Flux.error(new RuntimeException(Original error)) .onErrorReturn(Fallback value) .subscribe(System.out::println); // Fallback value Flux.error(new RuntimeException(Original error)) .onErrorResume(e - Flux.just(Recovered from: e.getMessage())) .subscribe(System.out::println); // Recovered from: Original error Flux.just(1, 2, 0, 3) .map(n - 10 / n) .onErrorContinue((e, value) - System.out.println(Skipping value)) .subscribe(System.out::println); // 10, 5, 3 Flux.just(1, 2, 3) .doOnError(e - System.err.println(Error: e.getMessage())) .subscribe(); // 使用 retry Flux.error(new RuntimeException(Retry error)) .retry(3) .subscribe( System.out::println, e - System.err.println(Failed after retries: e.getMessage()) );调度器// 使用调度器 Flux.range(1, 10) .subscribeOn(Schedulers.parallel()) // 在并行线程池执行 .subscribe(System.out::println); Flux.range(1, 10) .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 .observeOn(Schedulers.single()) // 在单线程池观察结果 .subscribe(System.out::println); // 创建自定义调度器 Scheduler customScheduler Schedulers.newParallel(custom, 4); Flux.range(1, 10) .subscribeOn(customScheduler) .subscribe(System.out::println); // 使用虚拟线程Java 21 Flux.range(1, 10) .subscribeOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor())) .subscribe(System.out::println);背压处理// 背压策略 Flux.range(1, 1000) .onBackpressureBuffer(100) // 缓冲最多 100 个元素 .subscribe(); Flux.range(1, 1000) .onBackpressureDrop(dropped - System.out.println(Dropped: dropped)) .subscribe(); Flux.range(1, 1000) .onBackpressureLatest() // 只保留最新元素 .subscribe(); // 使用 request 控制流速 Flux.range(1, 10) .subscribe(new BaseSubscriberInteger() { Override protected void hookOnSubscribe(Subscription subscription) { request(2); // 初始请求 2 个元素 } Override protected void hookOnNext(Integer value) { System.out.println(Received: value); request(1); // 处理完一个再请求一个 } });实际应用示例// 响应式 REST 客户端 Service public class UserService { private final WebClient webClient; public UserService(WebClient.Builder webClientBuilder) { this.webClient webClientBuilder.baseUrl(https://api.example.com).build(); } public MonoUser getUserById(Long id) { return webClient.get() .uri(/users/{id}, id) .retrieve() .onStatus(HttpStatus::isError, response - response.bodyToMono(String.class) .flatMap(body - Mono.error(new UserNotFoundException(body))) ) .bodyToMono(User.class); } public FluxUser getUsersByDepartment(String department) { return webClient.get() .uri(/users?department{department}, department) .retrieve() .bodyToFlux(User.class); } public MonoUser createUser(UserCreateRequest request) { return webClient.post() .uri(/users) .bodyValue(request) .retrieve() .onStatus(HttpStatus::isError, response - response.bodyToMono(String.class) .flatMap(body - Mono.error(new UserCreationException(body))) ) .bodyToMono(User.class); } } // 响应式数据访问 Repository public class ReactiveUserRepository { private final MongoClient mongoClient; public ReactiveUserRepository(MongoClient mongoClient) { this.mongoClient mongoClient; } public MonoUser findById(String id) { return getCollection().findById(id); } public FluxUser findByDepartment(String department) { return getCollection().find(query(where(department).is(department))); } public MonoUser save(User user) { return getCollection().save(user); } public MonoVoid deleteById(String id) { return getCollection().deleteById(id); } private MongoCollectionUser getCollection() { return mongoClient.getDatabase(example).getCollection(users, User.class); } } // 组合多个异步操作 Service public class OrderService { private final UserService userService; private final ProductService productService; private final OrderRepository orderRepository; public OrderService(UserService userService, ProductService productService, OrderRepository orderRepository) { this.userService userService; this.productService productService; this.orderRepository orderRepository; } public MonoOrder createOrder(Long userId, Long productId, int quantity) { return Mono.zip( userService.getUserById(userId), productService.getProductById(productId) ) .flatMap(tuple - { User user tuple.getT1(); Product product tuple.getT2(); if (product.getStock() quantity) { return Mono.error(new InsufficientStockException()); } Order order new Order(); order.setUserId(userId); order.setProductId(productId); order.setQuantity(quantity); order.setTotalPrice(product.getPrice() * quantity); order.setStatus(PENDING); return orderRepository.save(order); }) .doOnSuccess(order - { // 更新库存 productService.updateStock(productId, -quantity).subscribe(); }) .doOnError(e - { // 记录错误日志 System.err.println(Failed to create order: e.getMessage()); }); } }测试响应式代码// 测试 Mono Test void monoTest() { MonoString mono Mono.just(Hello); StepVerifier.create(mono) .expectNext(Hello) .expectComplete() .verify(); } // 测试 Flux Test void fluxTest() { FluxInteger flux Flux.range(1, 5); StepVerifier.create(flux) .expectNext(1, 2, 3, 4, 5) .expectComplete() .verify(); } // 测试错误处理 Test void errorTest() { FluxString errorFlux Flux.error(new RuntimeException(Test error)); StepVerifier.create(errorFlux) .expectError(RuntimeException.class) .verify(); } // 测试转换操作 Test void transformTest() { FluxString flux Flux.just(a, b, c) .map(String::toUpperCase); StepVerifier.create(flux) .expectNext(A, B, C) .expectComplete() .verify(); }最佳实践避免阻塞不要在响应式链中调用阻塞方法正确处理背压根据消费者能力控制流速使用合适的调度器根据任务类型选择调度器组合操作符使用 compose 和 transform 组合操作符错误处理在适当的位置处理错误资源管理使用 using 管理资源生命周期避免嵌套订阅使用 flatMap 代替嵌套 subscribe测试验证使用 StepVerifier 测试响应式代码实际应用场景高并发 API处理大量并发请求实时数据流如 WebSocket 通信数据处理管道批量数据处理异步组合组合多个异步操作总结Reactor 框架为 Java 提供了强大的响应式编程能力。通过 Mono 和 Flux 两个核心类型可以优雅地处理异步数据流。合理使用操作符和调度器可以构建高性能、高并发的应用程序。别叫我大神叫我 Alex 就好。这其实可以更优雅一点响应式编程让异步操作变得更加简洁和高效。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2599071.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!