定制一个生产的WebClient

定制一个生产的WebClient 1 为什么要用 WebClient刚开始尝试使用 Spring WebFlux 的时候很多人都会使用 Mono.fromFuture() 将异步请求转成 Mono 对象或者 Mono.fromSupplier() 将请求转成 MOno 对象这两种方式在响应式编程中都是不建议的都会阻塞当前线程。1.1 Mono.fromFuture() VS WebClientMono.fromFuture()方法和使用 WebClient 调用第三方接口之间存在以下区别异步 vs. 非阻塞Mono.fromFuture()方法适用于接收一个 java.util.concurrent.Future 对象并将其转换为响应式的 Mono。这是一个阻塞操作因为它会等待 Future 对象完成。而使用 WebClient 调用第三方接口是异步和非阻塞的它不会直接阻塞应用程序的执行而是使用事件驱动的方式处理响应。可扩展性和灵活性使用 WebClient 可以更灵活地进行配置和处理例如设置超时时间、请求头、重试机制等。WebClient 还可以与许多其他 Spring WebFlux 组件集成如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是适用于单个 Future 对象转化为 Mono 的情况可扩展性较差。错误处理WebClient 提供了更丰富的错误处理机制可以通过 onStatus、onError 等方法来处理不同的 HTTP 状态码或异常。同时WebClient 还提供了更灵活的重试和回退策略。Mono.fromFuture() 方法只能将 Future 对象的结果包装在 Mono 中不提供特定的错误处理机制。阻塞操作Mono.fromFuture() 会阻塞。当调用 Mono.fromFuture() 方法将 Future 转换为 Mono 时它会等待 Future 对象的结果返回。在这个等待的过程中Mono.fromFuture()方法会阻塞当前的线程。这意味着如果 Future 的结果在运行过程中没有返回则当前线程会一直阻塞直到 Future 对象返回结果或者超时。因此在使用 Mono.fromFuture() 时需要注意潜在的阻塞风险。另外需要确保F uture 的任务在后台线程中执行以免阻塞应用程序的主线程。1.2 Mono.fromFuture VS Mono.fromSupplierMono.fromSupplier() 和 Mono.fromFuture() 都是用于将异步执行的操作转换为响应式的 Mono 对象但它们的区别在于Mono.fromSupplier() 适用于一个提供者/生产者可以用来表示某个操作的结果该操作是一些纯计算并且没有阻塞的方法。也就是说Mono.fromSupplier() 将其参数 (Supplier) 所提供的操作异步执行并将其结果打包成一个 Mono 对象。Mono.fromFuture() 适用于一个 java.util.concurrent.Future 对象将其封装成 Mono 对象。这意味着调用 Mono.fromFuture() 方法将阻塞当前线程直到异步操作完成返回一个 Future 对象。因此Mono.fromSupplier() 与 Mono.fromFuture() 的主要区别在于Mono.fromSupplier() 是一个非阻塞的操作不会阻塞当前线程。这个方法用于执行计算型的任务返回一个封装了计算结果的 Mono 对象。 Mono.fromFuture() 是阻塞操作会阻塞当前线程直到异步操作完毕并返回看它适用于处理 java.util.concurrent.Future 对象。需要注意的是如果 Supplier 提供的操作是阻塞的则 Mono.fromSupplier() 方法本身也会阻塞线程。但通常情况下Supplier 提供的操作是纯计算型的不会阻塞线程。因此可以使用 Mono.fromSupplier() 方法将一个纯计算型的操作转换为 Mono 对象而将一个异步返回结果的操作转换为 Mono 对象时可以使用 Mono.fromFuture() 方法。2 定制化自己的 WebClient2.1 初始化 WebClientWebClient 支持建造者模式使用 WebClient 建造者模式支持开发自己的个性化 WebClient比如支持设置接口调用统一耗时、自定义底层 Http 客户端、调用链路、打印接口返回日志、监控接口耗时等等。WebClient builder 支持以下方法interface Builder { /** * 配置请求基础的url如baseUrl https://abc.go.com/v1;和 uriBuilderFactory 冲突如果有 uriBuilderFactory 则忽略 baseUrl */ Builder baseUrl(String baseUrl); /** * URI 请求的默认变量。也和 uriBuilderFactory 冲突如果有 uriBuilderFactory 则忽略 defaultUriVariables */ Builder defaultUriVariables(MapString, ? defaultUriVariables); /** * 提供一个预配置的UriBuilderFactory实例 */ Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory); /** * 默认 header */ Builder defaultHeader(String header, String... values); /** * 默认cookie */ Builder defaultCookie(String cookie, String... values); /** * 提供一个 consumer 来定制每个请求 */ Builder defaultRequest(ConsumerRequestHeadersSpec? defaultRequest); /** * 添加一个filter可以添加多个 */ Builder filter(ExchangeFilterFunction filter); /** * 配置要使用的 ClientHttpConnector。这对于插入或自定义底层HTTP 客户端库(例如SSL)的选项非常有用。 */ Builder clientConnector(ClientHttpConnector connector); /** * Configure the codecs for the {code WebClient} in the * {link #exchangeStrategies(ExchangeStrategies) underlying} * {code ExchangeStrategies}. * param configurer the configurer to apply * since 5.1.13 */ Builder codecs(ConsumerClientCodecConfigurer configurer); /** * 提供一个预先配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。 这是对 clientConnector 的一种替代并且有效地覆盖了它们。 */ Builder exchangeFunction(ExchangeFunction exchangeFunction); /** * Builder the {link WebClient} instance. */ WebClient build(); // 其他方法 }2.2 日志打印及监控打印参数、url、返回参数和返回需要转成json需要打印正常返回日志和异常正常监控、异常监控、总监控以及响应时间.doOnSuccess(response- { log.info(get.success, url{}, response{}, param{}, url, response); }) .doOnError(error- { log.info(get.error, url{}, url, error); // 监控 }) .doFinally(res- { //监控 })2.3 返回处理retrieve() // 声明如何提取响应。例如提取一个ResponseEntity的状态头部和身体: .bodyToMono(clazz) 将返回body内容转成clazz对象clazz 对象可以自己指定类型。如果碰到有问题的无法转化的也可以先转成String然后自己实现一个工具类将String转成 class 对象。2.3.1 getpublic T MonoT get(String url, ClassT clazz, T defaultClass) { long start System.currentTimeMillis(); return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response- { log.info(get.success, url{}, response{}, param{}, url, response); }) .doOnError(error- { log.info(get.param.error, url{}, url, error); }) .onErrorReturn(defaultClass) .doFinally(res- { }) .publishOn(customScheduler); }2.3.2 get param 请求public T MonoT getParam(String url, MultiValueMapString, String param, ClassT clazz, T defaultClass) { long start System.currentTimeMillis(); URI uri UriComponentsBuilder.fromUriString(url) .queryParams(param) .build() .toUri(); return webClient.get() .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response- { log.info(get.param.success, url{}, response{}, param{}, url, response, JsonUtil.toJson(param)); }) .doOnError(error- { log.error(get.param.error, url{}, param{}, url, JsonUtil.toJson(param), error); }) .onErrorReturn(defaultClass) .doFinally(res- { // 监控 or 打印日志 or 耗时 }) .publishOn(customScheduler); }2.3.3 post json 请求public T MonoT postJson(String url, final HttpParameter4Json parameter, ClassT clazz, T defaultClass) { final long start System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .cookies(cookies - cookies.setAll(parameter.getCookies())) .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())) .headers(headers - headers.setAll(parameter.getHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response- { log.info(post.json.success, url{}, response{}, param{}, url, response, parameter.getJsonBody()); }) .doOnError(error- { log.error(get.param.error, url{}, param{}, url, parameter.getJsonBody(), error); }) .onErrorReturn(defaultClass) .doFinally(res- { }) .publishOn(customScheduler); }2.3.4 post form Data 请求public T MonoT postFormData(String url, HttpParameter parameter, ClassT clazz, T defaultClass) { final long start System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .cookies(cookies - cookies.setAll(parameter.getCookies())) .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())) .headers(headers - headers.setAll(parameter.getMapHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response- { log.info(post.fromData.success, url{}, response{}, param{}, url, response, JsonUtil.toJson(parameter)); }) .doOnError(error- { log.info(get.param.error, url{}, param{}, url, JsonUtil.toJson(parameter), error); }) .onErrorReturn(defaultClass) .doFinally(res- { }) .publishOn(customScheduler); }2.4 异常处理2.4.1 异常返回兜底onErrorReturn 发现异常返回兜底数据2.4.2 异常处理状态码转成异常抛出.onStatus(HttpStatus::isError, response - Mono.error(new RuntimeException(Request failed with status code: response.statusCode())))监控异常.doOnError(error - { // log and monitor })3 完整的 WebClientpackage com.geniu.reactor.webclient; import com.geniu.utils.JsonUtil; import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.util.UriComponentsBuilder; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; import reactor.netty.tcp.SslProvider; import reactor.netty.tcp.TcpClient; import java.net.URI; import java.time.Duration; import java.util.function.Function; /** * Author: prepared * Date: 2023/8/15 11:05 */ Slf4j public class CustomerWebClient { public static final CustomerWebClient instance new CustomerWebClient(); /** * 限制并发数 100 */ Scheduler customScheduler Schedulers.newParallel(CustomScheduler, 100); private final WebClient webClient; private CustomerWebClient() { final SslContextBuilder sslBuilder SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE); final SslProvider ssl SslProvider.builder().sslContext(sslBuilder) .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build(); final int cpuCores Runtime.getRuntime().availableProcessors(); final int selectorCount Math.max(cpuCores / 2, 4); final int workerCount Math.max(cpuCores * 2, 8); final LoopResources pool LoopResources.create(HCofSWC, selectorCount, workerCount, true); final Function? super TcpClient, ? extends TcpClient tcpMapper tcp - tcp .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_TIMEOUT, 10000) .secure(ssl) .runOn(pool); ConnectionProvider.Builder httpClientOfSWC ConnectionProvider .builder(HttpClientOfSWC) .maxConnections(100_000) .pendingAcquireTimeout(Duration.ofSeconds(6)); final ConnectionProvider connectionProvider httpClientOfSWC.build(); final HttpClient hc HttpClient.create(connectionProvider) .tcpConfiguration(tcpMapper); final FunctionHttpClient, HttpClient hcMapper rhc - rhc .compress(true); final WebClient.Builder wcb WebClient.builder() .clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc))); // .filter(new TraceRequestFilter()); 可以通过Filter 增加trace追踪 this.webClient wcb.build(); } public T MonoT get(String url, ClassT clazz, T defaultClass) { long start System.currentTimeMillis(); return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .onStatus(HttpStatus::isError, response - Mono.error(new RuntimeException(Request failed with status code: response.statusCode()))) .bodyToMono(clazz) .doOnSuccess(response- { log.info(get.success, url{}, response{}, param{}, url, response); }) .doOnError(error- { log.info(get.param.error, url{}, url, error); }) .onErrorReturn(defaultClass) .doFinally(res- { }) .publishOn(customScheduler); } public T MonoT getParam(String url, MultiValueMapString, String param, ClassT clazz, T defaultClass) { long start System.currentTimeMillis(); URI uri UriComponentsBuilder.fromUriString(url) .queryParams(param) .build() .toUri(); return webClient.get() .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response- { log.info(get.param.success, url{}, response{}, param{}, url, response, JsonUtil.toJson(param)); }) .doOnError(error- { log.error(get.param.error, url{}, param{}, url, JsonUtil.toJson(param), error); }) .onErrorReturn(defaultClass) .doFinally(res- { }) .publishOn(customScheduler); } public T MonoT postJson(String url, final HttpParameter4Json parameter, ClassT clazz, T defaultClass) { final long start System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .cookies(cookies - cookies.setAll(parameter.getCookies())) .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())) .headers(headers - headers.setAll(parameter.getHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response- { log.info(post.json.success, url{}, response{}, param{}, url, response, parameter.getJsonBody()); }) .doOnError(error- { log.error(get.param.error, url{}, param{}, url, parameter.getJsonBody(), error); }) .onErrorReturn(defaultClass) .doFinally(res- { }) .publishOn(customScheduler); } public T MonoT postFormData(String url, HttpParameter parameter, ClassT clazz, T defaultClass) { final long start System.currentTimeMillis(); return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .cookies(cookies - cookies.setAll(parameter.getCookies())) .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())) .headers(headers - headers.setAll(parameter.getMapHeaders())) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(clazz) .doOnSuccess(response- { log.info(post.fromData.success, url{}, response{}, param{}, url, response, JsonUtil.toJson(parameter)); }) .doOnError(error- { log.info(get.param.error, url{}, param{}, url, JsonUtil.toJson(parameter), error); }) .onErrorReturn(defaultClass) .doFinally(res- { }) .publishOn(customScheduler); } }