核心操作符详解与实战优化1.map与flatMap转换与异步展开map用于同步的一对一转换而flatMap用于将每个元素异步映射为一个新的PublisherMono或Flux并展开合并结果是处理异步依赖操作如网络请求、数据库查询的核心 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; public class TransformOperators { // 场景1: map - 同步数据转换 public FluxString toUpperCase(FluxString names) { return names.map(String::toUpperCase); // 同步转换保持顺序 } // 场景2: flatMap异步一对多展开 (顺序不保证) public FluxString fetchDetailsAsync(ListInteger ids) { return Flux.fromIterable(ids) .flatMap(id - fetchFromRemoteService(id) // 返回 MonoString .subscribeOn(Schedulers.boundedElastic()) // 指定异步执行线程池 ); } // 场景3: concatMap - 异步一对多展开 (保证顺序) public FluxString fetchDetailsInOrder(ListInteger ids) { return Flux.fromIterable(ids) .concatMap(id - fetchFromRemoteService(id)); // 顺序执行保证输出顺序与输入一致 } // 场景4: flatMapSequential - 异步并发但按序输出 public FluxString fetchDetailsConcurrentOrdered(ListInteger ids) { return Flux.fromIterable(ids) .flatMapSequential(id - fetchFromRemoteService(id).subscribeOn(Schedulers.parallel()), 5 // 最大并发数 ); } private MonoString fetchFromRemoteService(Integer id) { // 模拟异步网络请求 return Mono.fromCallable(() - { Thread.sleep(100); // 模拟IO延迟 return Data for ID: id; }) .subscribeOn(Schedulers.boundedElastic()); // 在弹性线程池执行阻塞调用 } public static void main(String[] args) throws InterruptedException { TransformOperators demo new TransformOperators(); ListInteger ids List.of(1, 2, 3, 4, 5); System.out.println( flatMap (并发顺序不确定) ); demo.fetchDetailsAsync(ids).subscribe(System.out::println); System.out.println( concatMap (顺序执行) ); demo.fetchDetailsInOrder(ids).subscribe(System.out::println); System.out.println( flatMapSequential (并发但输出有序) ); demo.fetchDetailsConcurrentOrdered(ids).subscribe(System.out::println); Thread.sleep(2000); // 等待异步任务完成 } }关键区别操作符执行方式顺序保证适用场景map同步严格保证简单的数据格式转换、计算flatMap异步并发不保证独立的异步操作如并行API调用concatMap异步顺序严格保证有顺序依赖的异步操作如流水线处理flatMapSequential异步并发保证需要并发提升性能但结果需保持原序2.filter与distinct数据筛选filter根据条件过滤元素distinct用于去重常结合使用以净化数据流 。import reactor.core.publisher.Flux; import java.time.Duration; import java.util.Objects; public class FilteringOperators { // 场景1: 基础过滤与去重 public FluxInteger filterAndDistinct(FluxInteger numbers) { return numbers .filter(n - n % 2 0) // 过滤偶数 .distinct() // 去重 .filter(n - n 10); // 过滤大于10的数 } // 场景2: 基于时间的去重 (防抖变体) public FluxString distinctUntilChangedWithDebounce(FluxString searchStream) { return searchStream .filter(term - term ! null term.length() 2) .distinctUntilChanged() // 仅当与前一个元素不同时才发射 .sample(Duration.ofMillis(300)); // 采样实现防抖效果 } // 场景3: 条件取用 takeUntil / skipWhile public FluxInteger takeUntilCondition(FluxInteger infiniteStream) { return infiniteStream .takeUntil(n - n 50); // 发射元素直到条件为真包含使条件为真的元素 } public FluxInteger skipWhileCondition(FluxInteger numbers) { return numbers .skipWhile(n - n 10); // 跳过元素直到条件为假从第一个不满足条件的元素开始发射 } public static void main(String[] args) { FilteringOperators demo new FilteringOperators(); FluxInteger numbers Flux.just(1, 2, 2, 3, 4, 4, 5, 12, 12, 15); demo.filterAndDistinct(numbers).subscribe(n - System.out.print(n )); // 输出: 12 System.out.println( ---); FluxString searches Flux.just(java, java, jav, javascript, java); demo.distinctUntilChangedWithDebounce(searches).subscribe(System.out::println); // 输出: java (连续重复的java被过滤) - jav - javascript } }3.switchMap与concatMap高级映射与取消语义switchMap在收到新元素时会取消并丢弃前一个内部Publisher的未完成结果非常适合实现搜索建议、取消过时请求等场景 。concatMap则保证顺序且不会取消。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; public class AdvancedMappingOperators { private AtomicInteger requestCounter new AtomicInteger(0); // 场景1: switchMap - 用于搜索建议取消前一个请求 public FluxString liveSearch(FluxString queryFlux) { return queryFlux .debounce(Duration.ofMillis(300)) // 防抖 .distinctUntilChanged() .switchMap(query - { int reqId requestCounter.incrementAndGet(); System.out.println(发起搜索请求[ reqId ]: query); // 模拟网络请求每个请求耗时不同 return simulateSearchApi(query) .doOnCancel(() - System.out.println(取消请求[ reqId ]: query)); }); } // 场景2: concatMap - 用于有严格顺序要求的异步写入 public FluxString sequentialWrite(FluxString dataFlux) { return dataFlux .concatMap(data - simulateWriteToDatabase(data) // 返回MonoVoid .thenReturn(写入成功: data) // 转换结果 ); } private FluxString simulateSearchApi(String query) { // 模拟一个延迟的API响应 long delay (long) (Math.random() * 500) 100; return Flux.just(结果1 for query, 结果2 for query) .delayElements(Duration.ofMillis(delay)) .doOnComplete(() - System.out.println(请求完成: query)); } private MonoVoid simulateWriteToDatabase(String data) { return Mono.fromRunnable(() - { try { Thread.sleep(200); // 模拟写入延迟 System.out.println(已写入数据库: data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }) .subscribeOn(Schedulers.boundedElastic()) .then(); } public static void main(String[] args) throws InterruptedException { AdvancedMappingOperators demo new AdvancedMappingOperators(); System.out.println( switchMap 演示快速输入只保留最后一次 ); // 模拟快速输入 a, ab, abc FluxString rapidInput Flux.just(a, ab, abc) .delayElements(Duration.ofMillis(150)); // 间隔小于防抖时间 demo.liveSearch(rapidInput).subscribe( result - System.out.println(收到结果: result), error - System.err.println(错误: error), () - System.out.println(搜索流结束) ); Thread.sleep(1000); System.out.println( concatMap 演示保证顺序 ); FluxString data Flux.just(记录A, 记录B, 记录C); demo.sequentialWrite(data).subscribe(System.out::println); Thread.sleep(1500); } }4. 错误处理操作符实战响应式流的错误处理是声明式的操作符如onErrorReturn,onErrorResume,retry等允许优雅地定义错误恢复策略 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; public class ErrorHandlingDemo { private AtomicInteger callCount new AtomicInteger(0); // 场景1: 基础错误恢复 public MonoString fetchDataWithFallback(String id) { return unstableExternalService(id) .onErrorReturn(默认数据) // 1. 出错时返回静态值 .onErrorResume(IllegalArgumentException.class, e - fetchFromBackupService(id) // 2. 根据异常类型切换到备用流 ) .onErrorResume(e - { logError(e); return Mono.error(new BusinessException(业务异常, e)); // 3. 转换异常 }); } // 场景2: 重试策略 public MonoString fetchWithRetry(String id) { return unstableExternalService(id) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // 指数退避重试 .maxBackoff(Duration.ofSeconds(10)) .filter(throwable - !(throwable instanceof IllegalArgumentException)) // 特定异常不重试 .doBeforeRetry(rs - System.out.println(第 rs.totalRetries() 次重试...)) .onRetryExhaustedThrow((spec, signal) - new ServiceUnavailableException(服务不可用重试耗尽) ) ); } // 场景3: 超时控制 public MonoString fetchWithTimeout(String id) { return unstableExternalService(id) .timeout(Duration.ofSeconds(2), Mono.defer(() - Mono.just(降级数据)) // 超时后的降级 ) .onErrorMap(TimeoutException.class, e - new BusinessException(请求超时, e) ); } // 场景4: 在Flux流中处理错误继续处理后续元素 public FluxString processBatchWithErrorContinue(ListString ids) { return Flux.fromIterable(ids) .flatMap(id - processItem(id) .onErrorResume(e - { System.err.println(处理项 id 失败: e.getMessage()); return Mono.empty(); // 错误时返回空流不影响其他项 }) ); } private MonoString unstableExternalService(String id) { return Mono.defer(() - { int count callCount.incrementAndGet(); if (count % 3 0) { // 模拟每第三次调用失败 return Mono.error(new RuntimeException(模拟服务异常)); } return Mono.just(数据来自服务: id) .delayElement(Duration.ofMillis(500)); }); } private MonoString fetchFromBackupService(String id) { return Mono.just(数据来自备份: id); } private MonoString processItem(String id) { if (error.equals(id)) { return Mono.error(new RuntimeException(处理错误)); } return Mono.just(已处理: id); } private void logError(Throwable e) { System.err.println([ERROR] e.getMessage()); } public static void main(String[] args) throws InterruptedException { ErrorHandlingDemo demo new ErrorHandlingDemo(); System.out.println( 测试重试与超时 ); demo.fetchWithRetry(test123) .subscribe( data - System.out.println(成功: data), err - System.out.println(最终失败: err.getClass().getSimpleName() : err.getMessage()) ); Thread.sleep(5000); } }5. 背压Backpressure控制策略背压是响应式流的核心特性用于协调生产者和消费者的速度差异防止下游被数据淹没 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.reactivestreams.Subscription; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; public class BackpressureDemo { // 场景1: 响应式拉取基础背压 public void demonstrateRequestPull() { Flux.range(1, 100) .log() // 使用log()观察请求事件 .subscribe( data - System.out.println(收到: data), err - err.printStackTrace(), () - System.out.println(完成), subscription - { subscription.request(5); // 初始请求5个 // 后续可根据处理能力动态调用 subscription.request(n) } ); } // 场景2: 缓冲策略 (onBackpressureBuffer) public FluxInteger handleFastProducer() { return Flux.interval(Duration.ofMillis(10)) // 快速生产者每10ms一个 .map(i - i.intValue()) .onBackpressureBuffer(50, // 缓冲区大小 dropped - System.out.println(元素被丢弃: dropped), // 缓冲区满时的回调 BufferOverflowStrategy.DROP_LATEST) // 策略丢弃最新的 .doOnNext(i - { try { Thread.sleep(100); // 慢速消费者每100ms处理一个 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(处理: i); }); } // 场景3: 丢弃策略 (onBackpressureDrop) public FluxLong handleDropStrategy() { return Flux.interval(Duration.ofMillis(1)) .onBackpressureDrop(dropped - System.out.println(下游忙丢弃: dropped) ) .doOnNext(i - { // 模拟不稳定的处理速度 try { Thread.sleep((long) (Math.random() * 10)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } // 场景4: 最新值策略 (onBackpressureLatest) public FluxLong handleLatestStrategy() { return Flux.interval(Duration.ofMillis(1)) .onBackpressureLatest() // 只保留最新的元素供下游消费 .doOnNext(i - { try { Thread.sleep(100); // 慢消费者 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(处理最新值: i); }); } // 场景5: 使用 limitRate 进行预取优化 public FluxInteger optimizeWithLimitRate() { return Flux.range(1, 1000) .log(before-limit) .limitRate(10) // 预取10个处理75%后即7个再请求10个 .log(after-limit) .doOnNext(i - { try { Thread.sleep(10); // 模拟处理 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } public static void main(String[] args) throws InterruptedException { BackpressureDemo demo new BackpressureDemo(); System.out.println( 演示缓冲策略 ); CountDownLatch latch new CountDownLatch(1); demo.handleFastProducer().take(10).subscribe( i - {}, err - err.printStackTrace(), latch::countDown ); latch.await(); } }6. 线程调度Schedulers最佳实践正确的线程调度是避免阻塞、提升性能的关键。subscribeOn指定上游执行线程publishOn影响下游执行线程 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.util.concurrent.CompletableFuture; public class SchedulerDemo { // 场景1: subscribeOn - 指定源头执行线程 public MonoString fetchDataWithBlockingCall() { return Mono.fromCallable(() - { // 模拟阻塞IO操作如JDBC、文件读取 Thread.sleep(1000); return 阻塞IO数据; }) .subscribeOn(Schedulers.boundedElastic()) // 将阻塞操作移交到弹性线程池 .doOnSubscribe(s - System.out.println(订阅发生在线程: Thread.currentThread().getName())); } // 场景2: publishOn切换下游操作线程 public FluxInteger processWithThreadSwitch() { return Flux.range(1, 5) .doOnNext(i - System.out.println(生成 i 在线程: Thread.currentThread().getName())) .publishOn(Schedulers.parallel()) // 切换下游到并行线程池 .map(i - { System.out.println(映射 i 在线程: Thread.currentThread().getName()); return i * 2; }) .publishOn(Schedulers.single()) // 再次切换到单一线程 .doOnNext(i - System.out.println(最终处理 i 在线程: Thread.currentThread().getName())); } // 场景3: 并行处理 (parallel runOn) public FluxInteger parallelComputation(FluxInteger heavyTasks) { return heavyTasks .parallel(4) // 将流并行化为4个轨道 .runOn(Schedulers.parallel()) // 在每个轨道上使用并行调度器 .map(i - computeHeavy(i)) // 并行执行计算密集型任务 .sequential(); // 将并行流合并回顺序流 } // 场景4: 超时与调度器 public MonoString callWithTimeout() { return Mono.fromFuture(CompletableFuture.supplyAsync(() - { try { Thread.sleep(2000); // 模拟长时间任务 return 结果; } catch (InterruptedException e) { throw new RuntimeException(e); } })) .timeout(Duration.ofSeconds(1), Mono.defer(() - Mono.just(超时降级)) .subscribeOn(Schedulers.boundedElastic()) ) .subscribeOn(Schedulers.boundedElastic()); } private int computeHeavy(int input) { try { Thread.sleep(100); // 模拟计算 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return input * input; } public static void main(String[] args) throws InterruptedException { SchedulerDemo demo new SchedulerDemo(); System.out.println( publishOn 线程切换演示 ); demo.processWithThreadSwitch().subscribe(); Thread.sleep(100); System.out.println( 并行计算演示 ); demo.parallelComputation(Flux.range(1, 8)) .subscribe(i - System.out.println(结果: i)); Thread.sleep(2000); } }7. 组合操作符zip,merge,concat用于将多个流组合成一个流是处理多源数据的利器 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; public class CombiningOperators { // 场景1: zip - 一对一组合像拉链 public FluxString zipUserAndProfile(FluxLong userIds, FluxString userNames) { return Flux.zip(userIds, userNames) .map(tuple - ID: tuple.getT1() , Name: tuple.getT2()); // 只有当两个流都有元素时才会组合发射 } // 场景2: combineLatest - 任一更新即组合最新值 public FluxString combineLatestInput(FluxString searchBox, FluxString category) { return Flux.combineLatest(searchBox, category, (s, c) - 搜索: s 在分类: c ); // 常用于实时仪表盘任一数据源更新都重新计算视图 } // 场景3: merge合并多个流按实际发射时间交错 public FluxString mergeNewsFeeds(FluxString feed1, FluxString feed2) { return Flux.merge(feed1, feed2).log(merge); // 结果顺序取决于元素实际到达时间 } // 场景4: concat顺序连接流先消费完第一个再消费第二个 public FluxString concatSequentialOperations(MonoString op1, MonoString op2) { return Flux.concat(op1, op2); // 严格保证 op1 在 op2 之前 } // 场景5: flatMap merge 实现并行合并 public FluxString fetchMultipleInParallel(ListString urls) { return Flux.fromIterable(urls) .flatMap(url - fetchUrl(url)) // 异步获取每个URL .map(response - Data: response); // 合并结果 } // 场景6: zip 等待所有异步操作完成 public MonoAggregatedResult aggregateUserData(Long userId) { MonoUser user fetchUser(userId); MonoListOrder orders fetchOrders(userId); MonoProfile profile fetchProfile(userId); return Mono.zip(user, orders, profile) .map(tuple - new AggregatedResult(tuple.getT1(), tuple.getT2(), tuple.getT3())); // 等待所有三个Mono完成然后组合结果 } private MonoString fetchUrl(String url) { return Mono.just(Response from url) .delayElement(Duration.ofMillis((long) (Math.random() * 500))); } private MonoUser fetchUser(Long id) { return Mono.just(new User(id, User id)); } private MonoListOrder fetchOrders(Long id) { return Mono.just(List.of(new Order(id))); } private MonoProfile fetchProfile(Long id) { return Mono.just(new Profile(id)); } static class User { /* 省略 */ } static class Order { /* 省略 */ } static class Profile { /* 省略 */ } static class AggregatedResult { /* 省略 */ } public static void main(String[] args) throws InterruptedException { CombiningOperators demo new CombiningOperators(); System.out.println( zip 演示 ); FluxLong ids Flux.just(1L, 2L, 3L).delayElements(Duration.ofMillis(100)); FluxString names Flux.just(Alice, Bob, Charlie).delayElements(Duration.ofMillis(150)); demo.zipUserAndProfile(ids, names).subscribe(System.out::println); Thread.sleep(1000); } }8. 综合实战优化后的 WebFlux REST API 端点结合上述操作符构建健壮、高效的响应式API 。import org.springframework.web.bind.annotation.*; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import java.time.Duration; import java.util.List; RestController RequestMapping(/api/v1) public class OptimizedReactiveController { private final ProductReactiveRepository productRepository; private final WebClient externalServiceClient; // API 1: 获取产品详情并行获取库存与评价 GetMapping(/products/{id}/enhanced) public MonoProductDetailResponse getProductDetail(PathVariable String id) { return productRepository.findById(id) .switchIfEmpty(Mono.error(new ProductNotFoundException(id))) .flatMap(product - Mono.zip( getStockLevel(product.getSku()).subscribeOn(Schedulers.boundedElastic()), getExternalRating(product.getSku()).subscribeOn(Schedulers.boundedElastic()), (stock, rating) - buildDetailResponse(product, stock, rating) ) ) .timeout(Duration.ofSeconds(3), fallbackDetail(id)) .retryWhen(Retry.backoff(2, Duration.ofMillis(500))) .onErrorResume(ProductNotFoundException.class, e - Mono.just(ProductDetailResponse.notFound(id))) .onErrorResume(e - { log.error(获取产品详情失败ID: {}, id, e); return Mono.just(ProductDetailResponse.error(id)); }); } // API 2: 产品搜索带防抖、缓存、分页 GetMapping(/products/search) public FluxProduct searchProducts( RequestParam String q, RequestParam(defaultValue 0) int page, RequestParam(defaultValue 20) int size) { return Mono.just(q) .filter(term - term.length() 2) .switchIfEmpty(Mono.error(new IllegalArgumentException(搜索词过短))) .flatMapMany(term - productRepository.findByKeyword(term) .filter(Product::isActive) .filter(p - p.getStock() 0) .sort((p1, p2) - p2.getScore().compareTo(p1.getScore())) // 按相关性降序 .skip((long) page * size) .take(size) .switchIfEmpty(Flux.defer(() - productRepository.findFromCache(term).take(size) )) ) .onErrorResume(IllegalArgumentException.class, e - Flux.empty()) .onBackpressureBuffer(50, BufferOverflowStrategy.DROP_OLDEST); } // API 3: 批量订单创建并行处理限制并发优雅降级 PostMapping(/orders/batch) public FluxOrderResult createOrdersBatch(RequestBody FluxOrderRequest requests) { return requests .index() // 为每个请求附加索引便于追踪和保持顺序 .flatMap(tuple - { Long index tuple.getT1(); OrderRequest req tuple.getT2(); return processSingleOrder(req) .map(result - new IndexedResult(index, result)) .onErrorResume(e - Mono.just(new IndexedResult(index, OrderResult.failed(req.getId(), e.getMessage()))) ); }, 5) // 最大并发数5 .sort((ir1, ir2) - Long.compare(ir1.index(), ir2.index())) // 按原始请求顺序排序 .map(IndexedResult::result); } // API 4: 服务器发送事件 (Server-Sent Events) GetMapping(value /products/stream, produces text/event-stream) public FluxProductEvent streamProductEvents(RequestParam ListString productIds) { return Flux.fromIterable(productIds) .flatMap(productId - productRepository.watchProductChanges(productId) // 返回一个无限流 .timeout(Duration.ofMinutes(10)) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))) ) .distinctUntilChanged(ProductEvent::getVersion) // 去重 .sample(Duration.ofMillis(500)) // 采样控制发射频率 .doOnCancel(() - log.info(客户端断开产品流)) .doOnError(e - log.error(产品事件流异常, e)); } // --- 辅助方法 --- private MonoInteger getStockLevel(String sku) { return externalServiceClient.get() .uri(/inventory/{sku}, sku) .retrieve() .bodyToMono(Integer.class) .onErrorReturn(0); } private MonoRating getExternalRating(String sku) { return externalServiceClient.get() .uri(/ratings/{sku}, sku) .retrieve() .bodyToMono(Rating.class) .timeout(Duration.ofSeconds(2)) .onErrorReturn(Rating.defaultRating()); } private MonoProductDetailResponse fallbackDetail(String id) { return Mono.fromCallable(() - ProductDetailResponse.fallback(id) ).subscribeOn(Schedulers.boundedElastic()); } private MonoOrderResult processSingleOrder(OrderRequest req) { // 验证 - 扣库存 - 支付 - 持久化 的异步链 return validateRequest(req) .flatMap(validated - reserveStock(validated)) .flatMap(reserved - processPayment(reserved)) .flatMap(paid - saveOrder(paid)) .map(order - OrderResult.success(order)) .timeout(Duration.ofSeconds(30)) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) .filter(e - e instanceof TemporaryServiceException) ); } // 内部类与异常定义省略... static class ProductDetailResponse { /* ... */ } static class OrderResult { /* ... */ } static class IndexedResult { /* ... */ } static class ProductNotFoundException extends RuntimeException { /* ... */ } static class TemporaryServiceException extends RuntimeException { /* ... */ } }9. 响应式流测试 (StepVerifier)使用StepVerifier是测试响应式流的行业标准它能验证流中元素的顺序、数量和完成信号 。import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; import java.time.Duration; import java.util.List; public class ReactiveServiceUnitTest { Test void testMapAndFilter() { FluxString source Flux.just(apple, banana, cherry, date); StepVerifier.create(source .map(String::toUpperCase) .filter(s - s.length() 5) ) .expectNext(BANANA, CHERRY) .verifyComplete(); } Test void testFlatMapConcurrency() { FluxInteger ids Flux.just(1, 2, 3); StepVerifier.create(ids.flatMap(id - Mono.just(id * 10).delayElement(Duration.ofMillis(100))) ) .expectNextCount(3) // 不保证顺序但数量是3 .verifyComplete(); } Test void testErrorHandlingAndRetry() { AtomicInteger attempt new AtomicInteger(); MonoString unreliable Mono.fromCallable(() - { if (attempt.incrementAndGet() 3) { throw new RuntimeException(临时失败); } return 成功; }); StepVerifier.create(unreliable.retry(2)) .expectNext(成功) .verifyComplete(); } Test void testSwitchMapCancellation() { TestPublisherString publisher TestPublisher.create(); FluxString source publisher.flux(); StepVerifier.create(source.switchMap(s - Mono.just(processed- s).delayElement(Duration.ofMillis(200))) ) .then(() - { publisher.next(first); publisher.next(second); // 第二个元素会取消第一个元素的处理 }) .expectNext(processed-second) // 只期望收到第二个的处理结果 .thenCancel() .verify(); } Test void testVirtualTimeForSlowOperations() { StepVerifier.withVirtualTime(() - Flux.interval(Duration.ofSeconds(1)).take(5) ) .expectSubscription() .thenAwait(Duration.ofSeconds(5)) // 虚拟时间快进5秒 .expectNextCount(5) .verifyComplete(); } Test void testBackpressure() { FluxInteger source Flux.range(1, 100); StepVerifier.create(source.onBackpressureBuffer(10)) .expectSubscription() .thenRequest(1) .expectNext(1) .thenRequest(5) .expectNext(2, 3, 4, 5, 6) .thenCancel() .verify(); } }参考来源响应式编程基石 Project Reactor源码解读Spring WebFlux响应式编程Spring-WebFlux使用一文带你从0开始学明白Spring-WebFlux学明白响应式编程Project Reactor 响应式编程Spring响应式编程之Reactor操作符
响应式编程:map与flatMap实战解析
核心操作符详解与实战优化1.map与flatMap转换与异步展开map用于同步的一对一转换而flatMap用于将每个元素异步映射为一个新的PublisherMono或Flux并展开合并结果是处理异步依赖操作如网络请求、数据库查询的核心 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; public class TransformOperators { // 场景1: map - 同步数据转换 public FluxString toUpperCase(FluxString names) { return names.map(String::toUpperCase); // 同步转换保持顺序 } // 场景2: flatMap异步一对多展开 (顺序不保证) public FluxString fetchDetailsAsync(ListInteger ids) { return Flux.fromIterable(ids) .flatMap(id - fetchFromRemoteService(id) // 返回 MonoString .subscribeOn(Schedulers.boundedElastic()) // 指定异步执行线程池 ); } // 场景3: concatMap - 异步一对多展开 (保证顺序) public FluxString fetchDetailsInOrder(ListInteger ids) { return Flux.fromIterable(ids) .concatMap(id - fetchFromRemoteService(id)); // 顺序执行保证输出顺序与输入一致 } // 场景4: flatMapSequential - 异步并发但按序输出 public FluxString fetchDetailsConcurrentOrdered(ListInteger ids) { return Flux.fromIterable(ids) .flatMapSequential(id - fetchFromRemoteService(id).subscribeOn(Schedulers.parallel()), 5 // 最大并发数 ); } private MonoString fetchFromRemoteService(Integer id) { // 模拟异步网络请求 return Mono.fromCallable(() - { Thread.sleep(100); // 模拟IO延迟 return Data for ID: id; }) .subscribeOn(Schedulers.boundedElastic()); // 在弹性线程池执行阻塞调用 } public static void main(String[] args) throws InterruptedException { TransformOperators demo new TransformOperators(); ListInteger ids List.of(1, 2, 3, 4, 5); System.out.println( flatMap (并发顺序不确定) ); demo.fetchDetailsAsync(ids).subscribe(System.out::println); System.out.println( concatMap (顺序执行) ); demo.fetchDetailsInOrder(ids).subscribe(System.out::println); System.out.println( flatMapSequential (并发但输出有序) ); demo.fetchDetailsConcurrentOrdered(ids).subscribe(System.out::println); Thread.sleep(2000); // 等待异步任务完成 } }关键区别操作符执行方式顺序保证适用场景map同步严格保证简单的数据格式转换、计算flatMap异步并发不保证独立的异步操作如并行API调用concatMap异步顺序严格保证有顺序依赖的异步操作如流水线处理flatMapSequential异步并发保证需要并发提升性能但结果需保持原序2.filter与distinct数据筛选filter根据条件过滤元素distinct用于去重常结合使用以净化数据流 。import reactor.core.publisher.Flux; import java.time.Duration; import java.util.Objects; public class FilteringOperators { // 场景1: 基础过滤与去重 public FluxInteger filterAndDistinct(FluxInteger numbers) { return numbers .filter(n - n % 2 0) // 过滤偶数 .distinct() // 去重 .filter(n - n 10); // 过滤大于10的数 } // 场景2: 基于时间的去重 (防抖变体) public FluxString distinctUntilChangedWithDebounce(FluxString searchStream) { return searchStream .filter(term - term ! null term.length() 2) .distinctUntilChanged() // 仅当与前一个元素不同时才发射 .sample(Duration.ofMillis(300)); // 采样实现防抖效果 } // 场景3: 条件取用 takeUntil / skipWhile public FluxInteger takeUntilCondition(FluxInteger infiniteStream) { return infiniteStream .takeUntil(n - n 50); // 发射元素直到条件为真包含使条件为真的元素 } public FluxInteger skipWhileCondition(FluxInteger numbers) { return numbers .skipWhile(n - n 10); // 跳过元素直到条件为假从第一个不满足条件的元素开始发射 } public static void main(String[] args) { FilteringOperators demo new FilteringOperators(); FluxInteger numbers Flux.just(1, 2, 2, 3, 4, 4, 5, 12, 12, 15); demo.filterAndDistinct(numbers).subscribe(n - System.out.print(n )); // 输出: 12 System.out.println( ---); FluxString searches Flux.just(java, java, jav, javascript, java); demo.distinctUntilChangedWithDebounce(searches).subscribe(System.out::println); // 输出: java (连续重复的java被过滤) - jav - javascript } }3.switchMap与concatMap高级映射与取消语义switchMap在收到新元素时会取消并丢弃前一个内部Publisher的未完成结果非常适合实现搜索建议、取消过时请求等场景 。concatMap则保证顺序且不会取消。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; public class AdvancedMappingOperators { private AtomicInteger requestCounter new AtomicInteger(0); // 场景1: switchMap - 用于搜索建议取消前一个请求 public FluxString liveSearch(FluxString queryFlux) { return queryFlux .debounce(Duration.ofMillis(300)) // 防抖 .distinctUntilChanged() .switchMap(query - { int reqId requestCounter.incrementAndGet(); System.out.println(发起搜索请求[ reqId ]: query); // 模拟网络请求每个请求耗时不同 return simulateSearchApi(query) .doOnCancel(() - System.out.println(取消请求[ reqId ]: query)); }); } // 场景2: concatMap - 用于有严格顺序要求的异步写入 public FluxString sequentialWrite(FluxString dataFlux) { return dataFlux .concatMap(data - simulateWriteToDatabase(data) // 返回MonoVoid .thenReturn(写入成功: data) // 转换结果 ); } private FluxString simulateSearchApi(String query) { // 模拟一个延迟的API响应 long delay (long) (Math.random() * 500) 100; return Flux.just(结果1 for query, 结果2 for query) .delayElements(Duration.ofMillis(delay)) .doOnComplete(() - System.out.println(请求完成: query)); } private MonoVoid simulateWriteToDatabase(String data) { return Mono.fromRunnable(() - { try { Thread.sleep(200); // 模拟写入延迟 System.out.println(已写入数据库: data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }) .subscribeOn(Schedulers.boundedElastic()) .then(); } public static void main(String[] args) throws InterruptedException { AdvancedMappingOperators demo new AdvancedMappingOperators(); System.out.println( switchMap 演示快速输入只保留最后一次 ); // 模拟快速输入 a, ab, abc FluxString rapidInput Flux.just(a, ab, abc) .delayElements(Duration.ofMillis(150)); // 间隔小于防抖时间 demo.liveSearch(rapidInput).subscribe( result - System.out.println(收到结果: result), error - System.err.println(错误: error), () - System.out.println(搜索流结束) ); Thread.sleep(1000); System.out.println( concatMap 演示保证顺序 ); FluxString data Flux.just(记录A, 记录B, 记录C); demo.sequentialWrite(data).subscribe(System.out::println); Thread.sleep(1500); } }4. 错误处理操作符实战响应式流的错误处理是声明式的操作符如onErrorReturn,onErrorResume,retry等允许优雅地定义错误恢复策略 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; public class ErrorHandlingDemo { private AtomicInteger callCount new AtomicInteger(0); // 场景1: 基础错误恢复 public MonoString fetchDataWithFallback(String id) { return unstableExternalService(id) .onErrorReturn(默认数据) // 1. 出错时返回静态值 .onErrorResume(IllegalArgumentException.class, e - fetchFromBackupService(id) // 2. 根据异常类型切换到备用流 ) .onErrorResume(e - { logError(e); return Mono.error(new BusinessException(业务异常, e)); // 3. 转换异常 }); } // 场景2: 重试策略 public MonoString fetchWithRetry(String id) { return unstableExternalService(id) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // 指数退避重试 .maxBackoff(Duration.ofSeconds(10)) .filter(throwable - !(throwable instanceof IllegalArgumentException)) // 特定异常不重试 .doBeforeRetry(rs - System.out.println(第 rs.totalRetries() 次重试...)) .onRetryExhaustedThrow((spec, signal) - new ServiceUnavailableException(服务不可用重试耗尽) ) ); } // 场景3: 超时控制 public MonoString fetchWithTimeout(String id) { return unstableExternalService(id) .timeout(Duration.ofSeconds(2), Mono.defer(() - Mono.just(降级数据)) // 超时后的降级 ) .onErrorMap(TimeoutException.class, e - new BusinessException(请求超时, e) ); } // 场景4: 在Flux流中处理错误继续处理后续元素 public FluxString processBatchWithErrorContinue(ListString ids) { return Flux.fromIterable(ids) .flatMap(id - processItem(id) .onErrorResume(e - { System.err.println(处理项 id 失败: e.getMessage()); return Mono.empty(); // 错误时返回空流不影响其他项 }) ); } private MonoString unstableExternalService(String id) { return Mono.defer(() - { int count callCount.incrementAndGet(); if (count % 3 0) { // 模拟每第三次调用失败 return Mono.error(new RuntimeException(模拟服务异常)); } return Mono.just(数据来自服务: id) .delayElement(Duration.ofMillis(500)); }); } private MonoString fetchFromBackupService(String id) { return Mono.just(数据来自备份: id); } private MonoString processItem(String id) { if (error.equals(id)) { return Mono.error(new RuntimeException(处理错误)); } return Mono.just(已处理: id); } private void logError(Throwable e) { System.err.println([ERROR] e.getMessage()); } public static void main(String[] args) throws InterruptedException { ErrorHandlingDemo demo new ErrorHandlingDemo(); System.out.println( 测试重试与超时 ); demo.fetchWithRetry(test123) .subscribe( data - System.out.println(成功: data), err - System.out.println(最终失败: err.getClass().getSimpleName() : err.getMessage()) ); Thread.sleep(5000); } }5. 背压Backpressure控制策略背压是响应式流的核心特性用于协调生产者和消费者的速度差异防止下游被数据淹没 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.reactivestreams.Subscription; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; public class BackpressureDemo { // 场景1: 响应式拉取基础背压 public void demonstrateRequestPull() { Flux.range(1, 100) .log() // 使用log()观察请求事件 .subscribe( data - System.out.println(收到: data), err - err.printStackTrace(), () - System.out.println(完成), subscription - { subscription.request(5); // 初始请求5个 // 后续可根据处理能力动态调用 subscription.request(n) } ); } // 场景2: 缓冲策略 (onBackpressureBuffer) public FluxInteger handleFastProducer() { return Flux.interval(Duration.ofMillis(10)) // 快速生产者每10ms一个 .map(i - i.intValue()) .onBackpressureBuffer(50, // 缓冲区大小 dropped - System.out.println(元素被丢弃: dropped), // 缓冲区满时的回调 BufferOverflowStrategy.DROP_LATEST) // 策略丢弃最新的 .doOnNext(i - { try { Thread.sleep(100); // 慢速消费者每100ms处理一个 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(处理: i); }); } // 场景3: 丢弃策略 (onBackpressureDrop) public FluxLong handleDropStrategy() { return Flux.interval(Duration.ofMillis(1)) .onBackpressureDrop(dropped - System.out.println(下游忙丢弃: dropped) ) .doOnNext(i - { // 模拟不稳定的处理速度 try { Thread.sleep((long) (Math.random() * 10)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } // 场景4: 最新值策略 (onBackpressureLatest) public FluxLong handleLatestStrategy() { return Flux.interval(Duration.ofMillis(1)) .onBackpressureLatest() // 只保留最新的元素供下游消费 .doOnNext(i - { try { Thread.sleep(100); // 慢消费者 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(处理最新值: i); }); } // 场景5: 使用 limitRate 进行预取优化 public FluxInteger optimizeWithLimitRate() { return Flux.range(1, 1000) .log(before-limit) .limitRate(10) // 预取10个处理75%后即7个再请求10个 .log(after-limit) .doOnNext(i - { try { Thread.sleep(10); // 模拟处理 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } public static void main(String[] args) throws InterruptedException { BackpressureDemo demo new BackpressureDemo(); System.out.println( 演示缓冲策略 ); CountDownLatch latch new CountDownLatch(1); demo.handleFastProducer().take(10).subscribe( i - {}, err - err.printStackTrace(), latch::countDown ); latch.await(); } }6. 线程调度Schedulers最佳实践正确的线程调度是避免阻塞、提升性能的关键。subscribeOn指定上游执行线程publishOn影响下游执行线程 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.util.concurrent.CompletableFuture; public class SchedulerDemo { // 场景1: subscribeOn - 指定源头执行线程 public MonoString fetchDataWithBlockingCall() { return Mono.fromCallable(() - { // 模拟阻塞IO操作如JDBC、文件读取 Thread.sleep(1000); return 阻塞IO数据; }) .subscribeOn(Schedulers.boundedElastic()) // 将阻塞操作移交到弹性线程池 .doOnSubscribe(s - System.out.println(订阅发生在线程: Thread.currentThread().getName())); } // 场景2: publishOn切换下游操作线程 public FluxInteger processWithThreadSwitch() { return Flux.range(1, 5) .doOnNext(i - System.out.println(生成 i 在线程: Thread.currentThread().getName())) .publishOn(Schedulers.parallel()) // 切换下游到并行线程池 .map(i - { System.out.println(映射 i 在线程: Thread.currentThread().getName()); return i * 2; }) .publishOn(Schedulers.single()) // 再次切换到单一线程 .doOnNext(i - System.out.println(最终处理 i 在线程: Thread.currentThread().getName())); } // 场景3: 并行处理 (parallel runOn) public FluxInteger parallelComputation(FluxInteger heavyTasks) { return heavyTasks .parallel(4) // 将流并行化为4个轨道 .runOn(Schedulers.parallel()) // 在每个轨道上使用并行调度器 .map(i - computeHeavy(i)) // 并行执行计算密集型任务 .sequential(); // 将并行流合并回顺序流 } // 场景4: 超时与调度器 public MonoString callWithTimeout() { return Mono.fromFuture(CompletableFuture.supplyAsync(() - { try { Thread.sleep(2000); // 模拟长时间任务 return 结果; } catch (InterruptedException e) { throw new RuntimeException(e); } })) .timeout(Duration.ofSeconds(1), Mono.defer(() - Mono.just(超时降级)) .subscribeOn(Schedulers.boundedElastic()) ) .subscribeOn(Schedulers.boundedElastic()); } private int computeHeavy(int input) { try { Thread.sleep(100); // 模拟计算 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return input * input; } public static void main(String[] args) throws InterruptedException { SchedulerDemo demo new SchedulerDemo(); System.out.println( publishOn 线程切换演示 ); demo.processWithThreadSwitch().subscribe(); Thread.sleep(100); System.out.println( 并行计算演示 ); demo.parallelComputation(Flux.range(1, 8)) .subscribe(i - System.out.println(结果: i)); Thread.sleep(2000); } }7. 组合操作符zip,merge,concat用于将多个流组合成一个流是处理多源数据的利器 。import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; public class CombiningOperators { // 场景1: zip - 一对一组合像拉链 public FluxString zipUserAndProfile(FluxLong userIds, FluxString userNames) { return Flux.zip(userIds, userNames) .map(tuple - ID: tuple.getT1() , Name: tuple.getT2()); // 只有当两个流都有元素时才会组合发射 } // 场景2: combineLatest - 任一更新即组合最新值 public FluxString combineLatestInput(FluxString searchBox, FluxString category) { return Flux.combineLatest(searchBox, category, (s, c) - 搜索: s 在分类: c ); // 常用于实时仪表盘任一数据源更新都重新计算视图 } // 场景3: merge合并多个流按实际发射时间交错 public FluxString mergeNewsFeeds(FluxString feed1, FluxString feed2) { return Flux.merge(feed1, feed2).log(merge); // 结果顺序取决于元素实际到达时间 } // 场景4: concat顺序连接流先消费完第一个再消费第二个 public FluxString concatSequentialOperations(MonoString op1, MonoString op2) { return Flux.concat(op1, op2); // 严格保证 op1 在 op2 之前 } // 场景5: flatMap merge 实现并行合并 public FluxString fetchMultipleInParallel(ListString urls) { return Flux.fromIterable(urls) .flatMap(url - fetchUrl(url)) // 异步获取每个URL .map(response - Data: response); // 合并结果 } // 场景6: zip 等待所有异步操作完成 public MonoAggregatedResult aggregateUserData(Long userId) { MonoUser user fetchUser(userId); MonoListOrder orders fetchOrders(userId); MonoProfile profile fetchProfile(userId); return Mono.zip(user, orders, profile) .map(tuple - new AggregatedResult(tuple.getT1(), tuple.getT2(), tuple.getT3())); // 等待所有三个Mono完成然后组合结果 } private MonoString fetchUrl(String url) { return Mono.just(Response from url) .delayElement(Duration.ofMillis((long) (Math.random() * 500))); } private MonoUser fetchUser(Long id) { return Mono.just(new User(id, User id)); } private MonoListOrder fetchOrders(Long id) { return Mono.just(List.of(new Order(id))); } private MonoProfile fetchProfile(Long id) { return Mono.just(new Profile(id)); } static class User { /* 省略 */ } static class Order { /* 省略 */ } static class Profile { /* 省略 */ } static class AggregatedResult { /* 省略 */ } public static void main(String[] args) throws InterruptedException { CombiningOperators demo new CombiningOperators(); System.out.println( zip 演示 ); FluxLong ids Flux.just(1L, 2L, 3L).delayElements(Duration.ofMillis(100)); FluxString names Flux.just(Alice, Bob, Charlie).delayElements(Duration.ofMillis(150)); demo.zipUserAndProfile(ids, names).subscribe(System.out::println); Thread.sleep(1000); } }8. 综合实战优化后的 WebFlux REST API 端点结合上述操作符构建健壮、高效的响应式API 。import org.springframework.web.bind.annotation.*; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import java.time.Duration; import java.util.List; RestController RequestMapping(/api/v1) public class OptimizedReactiveController { private final ProductReactiveRepository productRepository; private final WebClient externalServiceClient; // API 1: 获取产品详情并行获取库存与评价 GetMapping(/products/{id}/enhanced) public MonoProductDetailResponse getProductDetail(PathVariable String id) { return productRepository.findById(id) .switchIfEmpty(Mono.error(new ProductNotFoundException(id))) .flatMap(product - Mono.zip( getStockLevel(product.getSku()).subscribeOn(Schedulers.boundedElastic()), getExternalRating(product.getSku()).subscribeOn(Schedulers.boundedElastic()), (stock, rating) - buildDetailResponse(product, stock, rating) ) ) .timeout(Duration.ofSeconds(3), fallbackDetail(id)) .retryWhen(Retry.backoff(2, Duration.ofMillis(500))) .onErrorResume(ProductNotFoundException.class, e - Mono.just(ProductDetailResponse.notFound(id))) .onErrorResume(e - { log.error(获取产品详情失败ID: {}, id, e); return Mono.just(ProductDetailResponse.error(id)); }); } // API 2: 产品搜索带防抖、缓存、分页 GetMapping(/products/search) public FluxProduct searchProducts( RequestParam String q, RequestParam(defaultValue 0) int page, RequestParam(defaultValue 20) int size) { return Mono.just(q) .filter(term - term.length() 2) .switchIfEmpty(Mono.error(new IllegalArgumentException(搜索词过短))) .flatMapMany(term - productRepository.findByKeyword(term) .filter(Product::isActive) .filter(p - p.getStock() 0) .sort((p1, p2) - p2.getScore().compareTo(p1.getScore())) // 按相关性降序 .skip((long) page * size) .take(size) .switchIfEmpty(Flux.defer(() - productRepository.findFromCache(term).take(size) )) ) .onErrorResume(IllegalArgumentException.class, e - Flux.empty()) .onBackpressureBuffer(50, BufferOverflowStrategy.DROP_OLDEST); } // API 3: 批量订单创建并行处理限制并发优雅降级 PostMapping(/orders/batch) public FluxOrderResult createOrdersBatch(RequestBody FluxOrderRequest requests) { return requests .index() // 为每个请求附加索引便于追踪和保持顺序 .flatMap(tuple - { Long index tuple.getT1(); OrderRequest req tuple.getT2(); return processSingleOrder(req) .map(result - new IndexedResult(index, result)) .onErrorResume(e - Mono.just(new IndexedResult(index, OrderResult.failed(req.getId(), e.getMessage()))) ); }, 5) // 最大并发数5 .sort((ir1, ir2) - Long.compare(ir1.index(), ir2.index())) // 按原始请求顺序排序 .map(IndexedResult::result); } // API 4: 服务器发送事件 (Server-Sent Events) GetMapping(value /products/stream, produces text/event-stream) public FluxProductEvent streamProductEvents(RequestParam ListString productIds) { return Flux.fromIterable(productIds) .flatMap(productId - productRepository.watchProductChanges(productId) // 返回一个无限流 .timeout(Duration.ofMinutes(10)) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))) ) .distinctUntilChanged(ProductEvent::getVersion) // 去重 .sample(Duration.ofMillis(500)) // 采样控制发射频率 .doOnCancel(() - log.info(客户端断开产品流)) .doOnError(e - log.error(产品事件流异常, e)); } // --- 辅助方法 --- private MonoInteger getStockLevel(String sku) { return externalServiceClient.get() .uri(/inventory/{sku}, sku) .retrieve() .bodyToMono(Integer.class) .onErrorReturn(0); } private MonoRating getExternalRating(String sku) { return externalServiceClient.get() .uri(/ratings/{sku}, sku) .retrieve() .bodyToMono(Rating.class) .timeout(Duration.ofSeconds(2)) .onErrorReturn(Rating.defaultRating()); } private MonoProductDetailResponse fallbackDetail(String id) { return Mono.fromCallable(() - ProductDetailResponse.fallback(id) ).subscribeOn(Schedulers.boundedElastic()); } private MonoOrderResult processSingleOrder(OrderRequest req) { // 验证 - 扣库存 - 支付 - 持久化 的异步链 return validateRequest(req) .flatMap(validated - reserveStock(validated)) .flatMap(reserved - processPayment(reserved)) .flatMap(paid - saveOrder(paid)) .map(order - OrderResult.success(order)) .timeout(Duration.ofSeconds(30)) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) .filter(e - e instanceof TemporaryServiceException) ); } // 内部类与异常定义省略... static class ProductDetailResponse { /* ... */ } static class OrderResult { /* ... */ } static class IndexedResult { /* ... */ } static class ProductNotFoundException extends RuntimeException { /* ... */ } static class TemporaryServiceException extends RuntimeException { /* ... */ } }9. 响应式流测试 (StepVerifier)使用StepVerifier是测试响应式流的行业标准它能验证流中元素的顺序、数量和完成信号 。import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; import java.time.Duration; import java.util.List; public class ReactiveServiceUnitTest { Test void testMapAndFilter() { FluxString source Flux.just(apple, banana, cherry, date); StepVerifier.create(source .map(String::toUpperCase) .filter(s - s.length() 5) ) .expectNext(BANANA, CHERRY) .verifyComplete(); } Test void testFlatMapConcurrency() { FluxInteger ids Flux.just(1, 2, 3); StepVerifier.create(ids.flatMap(id - Mono.just(id * 10).delayElement(Duration.ofMillis(100))) ) .expectNextCount(3) // 不保证顺序但数量是3 .verifyComplete(); } Test void testErrorHandlingAndRetry() { AtomicInteger attempt new AtomicInteger(); MonoString unreliable Mono.fromCallable(() - { if (attempt.incrementAndGet() 3) { throw new RuntimeException(临时失败); } return 成功; }); StepVerifier.create(unreliable.retry(2)) .expectNext(成功) .verifyComplete(); } Test void testSwitchMapCancellation() { TestPublisherString publisher TestPublisher.create(); FluxString source publisher.flux(); StepVerifier.create(source.switchMap(s - Mono.just(processed- s).delayElement(Duration.ofMillis(200))) ) .then(() - { publisher.next(first); publisher.next(second); // 第二个元素会取消第一个元素的处理 }) .expectNext(processed-second) // 只期望收到第二个的处理结果 .thenCancel() .verify(); } Test void testVirtualTimeForSlowOperations() { StepVerifier.withVirtualTime(() - Flux.interval(Duration.ofSeconds(1)).take(5) ) .expectSubscription() .thenAwait(Duration.ofSeconds(5)) // 虚拟时间快进5秒 .expectNextCount(5) .verifyComplete(); } Test void testBackpressure() { FluxInteger source Flux.range(1, 100); StepVerifier.create(source.onBackpressureBuffer(10)) .expectSubscription() .thenRequest(1) .expectNext(1) .thenRequest(5) .expectNext(2, 3, 4, 5, 6) .thenCancel() .verify(); } }参考来源响应式编程基石 Project Reactor源码解读Spring WebFlux响应式编程Spring-WebFlux使用一文带你从0开始学明白Spring-WebFlux学明白响应式编程Project Reactor 响应式编程Spring响应式编程之Reactor操作符