ReactiveSpring实战–WebFlux使用

WebFlux是Spring 5提供的响应式Web应用框架。
它是完全非阻塞的,可以在Netty,Undertow和Servlet 3.1+等非阻塞服务器上运行。
本文主要介绍WebFlux的使用。

FluxWeb vs noFluxWeb

WebFlux是完全非阻塞的。
在FluxWeb前,我们可以使用DeferredResult和AsyncRestTemplate等方式实现非阻塞的Web通信。
我们先来比较一下这两者。

注意:关于同步阻塞与异步非阻塞的性能差异,本文不再阐述。
阻塞即浪费。我们通过异步实现非阻塞。只有存在阻塞时,异步才能提高性能。如果不存在阻塞,使用异步反而可能由于线程调度等开销导致性能下降。

下面例子模拟一种业务场景。
订单服务提供接口查找订单信息,同时,该接口实现还需要调用仓库服务查询仓库信息,商品服务查询商品信息,并过滤,取前5个商品数据。

OrderService提供如下方法

public void getOrderByRest(DeferredResult<Order> rs, long orderId) {
    // [1]
    Order order = mockOrder(orderId);
    // [2]
    ListenableFuture<ResponseEntity<User>> userLister = asyncRestTemplate.getForEntity("http://user-service/user/mock/" + 1, User.class);
    ListenableFuture<ResponseEntity<List<Goods>>> goodsLister =
                    asyncRestTemplate.exchange("http://goods-service/goods/mock/list?ids=" + StringUtils.join(order.getGoodsIds(), ","),
                            HttpMethod.GET,  null, new ParameterizedTypeReference<List<Goods>>(){});
    // [3]
    CompletableFuture<ResponseEntity<User>> userFuture = userLister.completable().exceptionally(err -> {
        logger.warn("get user err", err);
        return new ResponseEntity(new User(), HttpStatus.OK);
    });
    CompletableFuture<ResponseEntity<List<Goods>>> goodsFuture = goodsLister.completable().exceptionally(err -> {
        logger.warn("get goods err", err);
        return new ResponseEntity(new ArrayList<>(), HttpStatus.OK);
    });
    // [4]
    warehouseFuture.thenCombineAsync(goodsFuture, (warehouseRes, goodsRes)-> {
            order.setWarehouse(warehouseRes.getBody());
            List<Goods> goods = goodsRes.getBody().stream()
                    .filter(g -> g.getPrice() > 10).limit(5)
                    .collect(Collectors.toList());
            order.setGoods(goods);
        return order;
    }).whenCompleteAsync((o, err)-> {
        // [5]
        if(err != null) {
            logger.warn("err happen:", err);
        }
        rs.setResult(o);
    });
}
复制代码
  1. 加载订单数据,这里mack了一个数据。
  2. 通过asyncRestTemplate获取仓库,产品信息,得到ListenableFuture。
  3. 设置ListenableFuture异常处理,避免因为某个请求报错导致接口失败。
  4. 合并仓库,产品请求结果,组装订单数据
  5. 通过DeferredResult设置接口返回数据。

可以看到,代码较繁琐,通过DeferredResult返回数据的方式也与我们同步接口通过方法返回值返回数据的方式大相径庭。

这里实际存在两处非阻塞

  1. 使用AsyncRestTemplate实现发送异步Http请求,也就是说通过其他线程调用仓库服务和产品服务,并返回CompletableFuture,所以不阻塞getOrderByRest方法线程。
  2. DeferredResult负责异步返回Http响应。

getOrderByRest方法中并不阻塞等待AsyncRestTemplate返回,而是直接返回,等到AsyncRestTemplate返回后通过回调函数设置DeferredResult的值将数据返回给Http,可对比以下阻塞等待的代码

ResponseEntity<Warehouse> warehouseRes = warehouseFuture.get();
ResponseEntity<List<Goods>> goodsRes = goodsFuture.get();
order.setWarehouse(warehouseRes.getBody());
order.setGoods(goodsRes.getBody());
return order;
复制代码

下面我们使用WebFlux实现。
pom引入依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
复制代码

服务启动类OrderServiceReactive

@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceReactive
{
    public static void main( String[] args )
    {
        new SpringApplicationBuilder(
                OrderServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}
复制代码

WebApplicationType.REACTIVE启动WebFlux。

OrderController实现如下

@GetMapping("/{id}")
public Mono<Order> getById(@PathVariable long id) {
    return service.getOrder(id);
}
复制代码

注意返回一个Mono数据,Mono与Flux是Spring Reactor提供的异步数据流。
WebFlux中通常使用Mono,Flux作为数据输入,输出值。
当接口返回Mono,Flux,Spring知道这是一个异步请求结果。
关于Spring Reactor,可参考《理解Reactor的设计与实现

OrderService实现如下

public Mono<Order> getOrder(long orderId) {
    // [1]
    Mono<Order> orderMono = mockOrder(orderId);
    // [2]
    return orderMono.flatMap(o -> {
        // [3]
        Mono<User> userMono =  getMono("http://user-service/user/mock/" + o.getUserId(), User.class).onErrorReturn(new User());
        Flux<Goods> goodsFlux = getFlux("http://goods-service/goods/mock/list?ids=" +
                StringUtils.join(o.getGoodsIds(), ","), Goods.class)
                .filter(g -> g.getPrice() > 10)
                .take(5)
                .onErrorReturn(new Goods());
        // [4]
        return userMono.zipWith(goodsFlux.collectList(), (u, gs) -> {
            o.setUser(u);
            o.setGoods(gs);
            return o;
        });
    });
}

private <T> Mono<T> getMono(String url, Class<T> resType) {
    return webClient.get().uri(url).retrieve().bodyToMono(resType);
}

// getFlux
复制代码
  1. 加载订单数据,这里mock了一个Mono数据
  2. flatMap方法可以将Mono中的数据转化类型,这里转化后的结果还是Order。
  3. 获取仓库,产品数据。这里可以看到,对产品过滤,取前5个的操作可以直接添加到Flux上。
  4. zipWith方法可以组合两个Mono,并返回新的Mono类型,这里组合仓库、产品数据,最后返回Mono。

可以看到,代码整洁不少,并且接口返回Mono,与我们在同步接口中直接数据的做法类似,不需要借助DeferredResult这样的工具类。

我们通过WebClient发起异步请求,WebClient返回Mono结果,虽然它并不是真正的数据(它是一个数据发布者,等请求数据返回后,它才把数据送过来),但我们可以通过操作符方法对他添加逻辑,如过滤,排序,组合,就好像同步操作时已经拿到数据那样。
而在AsyncRestTemplate,则所有的逻辑都要写到回调函数中。

WebFlux是完全非阻塞的。
Mono、Flux的组合函数非常有用。
上面方法中先获取订单数据,再同时获取仓库,产品数据,
如果接口参数同时传入了订单id,仓库id,产品id,我们也可以同时获取这三个数据,再组装起来

public Mono<Order> getOrder(long orderId, long warehouseId, List<Long> goodsIds) {
    Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWith(getMono("http://warehouse-service/warehouse/mock/" + warehouseId, Warehouse.class), (o,w) -> {
        o.setWarehouse(w);
        return o;
    }).zipWith(getFlux("http://goods-service/goods/mock/list?ids=" +
            StringUtils.join(goodsIds, ","), Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
        o.setGoods(gs);
        return o;
    });
}
复制代码

如果我们需要串行获取订单,仓库,商品这三个数据,实现如下

public Mono<Order> getOrderInLabel(long orderId) {
    Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWhen(o -> getMono("http://warehouse-service/warehouse/mock/" + o.getWarehouseId(), Warehouse.class), (o, w) -> {
        o.setWarehouse(w);
        return o;
    }).zipWhen(o -> getFlux("http://goods-service/goods/mock/list?ids=" +
                    StringUtils.join(o.getGoodsIds(), ",") + "&label=" + o.getWarehouse().getLabel() , Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
        o.setGoods(gs);
        return o;
    });
}
复制代码

zipWith方法会同时请求待合并的两个Mono数据,而zipWhen方法则会阻塞等待第一个Mono数据到达在请求第二个Mono数据。
orderMono.zipWhen(...).zipWhen(...),第一个zipWhen方法会阻塞等待orderMono数据返回再使用order数据构造新的Mono数据,第二个zipWhen方法也会等待前面zipWhen构建的Mono数据返回再构建新Mono,
所以在第二个zipWhen方法中,可以调用o.getWarehouse().getLabel(),因为第一个zipWhen已经获取到仓库信息。

下面说一个WebFlux的使用。
分为两部分,WebFlux服务端与WebClient。

WebFlux服务端

底层容器切换

WebFlux默认使用Netty实现服务端异步通信,可以通过更换依赖包切换底层容器

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
    <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-netty</artifactId>
    </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
复制代码

注解

WebFlux支持SpringMvc大部分的注解,如
映射:@Controller,@GetMapping,@PostMapping,@PutMapping,@DeleteMapping
参数绑定:@PatchMapping,@RequestParam,@RequestBody,@RequestHeader,@PathVariable,@RequestAttribute,@SessionAttribute
结果解析:@ResponseBody,@ModelAttribute
这些注解的使用方式与springMvc相同

命令式映射

WebFlux支持使用命令式编程指定映射关系

@Bean
public RouterFunction<ServerResponse> monoRouterFunction(InvoiceHandler invoiceHandler) {
    return route()
            .GET("/invoice/{orderId}",  accept(APPLICATION_JSON), invoiceHandler::get)
            .build();
}
复制代码

调用"/invoice/{orderId}",请求会转发到invoiceHandler#get方法

invoiceHandler#get方法实现如下

public Mono<ServerResponse> get(ServerRequest request) {
    Invoice invoice = new Invoice();
    invoice.setId(999L);
    invoice.setOrderId(Long.parseLong(request.pathVariable("orderId")));
    return ok().contentType(APPLICATION_JSON).body(Mono.just(invoice), Warehouse.class);
}
复制代码

Filter

可以通过实现WebFilter接口添加过滤器

@Component
public class TokenCheckFilter implements WebFilter {
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if(!exchange.getRequest().getHeaders().containsKey("token")) {
            ServerHttpResponse response =  exchange.getResponse();
            response.setStatusCode(HttpStatus.FORBIDDEN);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
            return response.writeWith(Mono.just(response.bufferFactory().wrap("{\"msg\":\"no token\"}".getBytes())));
        } else {
            exchange.getAttributes().put("auth", "true");
            return chain.filter(exchange);
        }
    }
}
复制代码

上面实现的是前置过滤器,在调用逻辑方法前的检查请求token

实现后置过滤器代码如下

@Component
public class LogFilter  implements WebFilter {
    private static final Logger logger = LoggerFactory.getLogger(LogFilter.class);
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        // [1]
        logger.info("request before, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
        return chain.filter(exchange)
            .doFinally(s -> {
                // [2]
                logger.info("request after, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
            });
    }
}
复制代码

注意,[1]处exchange.getResponse()返回的是初始化状态的response,并不是请求处理后返回的response。

异常处理

通过@ExceptionHandler注解定义一个全局的异常处理器

@ControllerAdvice
public class ErrorController {
    private static final Logger logger = LoggerFactory.getLogger(ErrorController.class);

    @ResponseBody
    @ExceptionHandler({NullPointerException.class})
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public String nullException(NullPointerException e) {
        logger.error("global err handler", e);
        return "{\"msg\":\"There is a problem\"}";
    }
}
复制代码

WebFluxConfigurer

WebFlux中可以通过WebFluxConfigurer做自定义配置,如配置自定义的结果解析

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {
    public void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {
        configurer.addCustomResolver(new HandlerMethodArgumentResolver() {
            ...
        });
    }

    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.customCodecs().register(new HttpMessageWriter() {
            ...
        });
    }
}
复制代码

configureArgumentResolvers方法配置参数绑定处理器
configureHttpMessageCodecs方法配置Http请求报文,响应报文解析器

@EnableWebFlux要求Spring从WebFluxConfigurationSupport引入Spring WebFlux 配置。如果你的依赖中引入了spring-boot-starter-webflux,Spring WebFlux 将自动配置,不需要添加该注解。
但如果你只使用Spring WebFlux而没有使用Spring Boot,这是需要添加@EnableWebFlux启动Spring WebFlux自动化配置。

Spring Flux支持CORS,Spring Security,HTTP/2,更多内容不再列出,请参考官方文档。

WebClient

WebClient可以发送异步Web请求,并支持响应式编程。
下面说一个WebClient的使用。

底层框架

WebClient底层使用的Netty实现异步Http请求,我们可以切换底层库,如Jetty

@Bean
public JettyResourceFactory resourceFactory() {
    return new JettyResourceFactory();
}

@Bean
public WebClient webClient() {
    HttpClient httpClient = HttpClient.create();
    ClientHttpConnector connector =
            new JettyClientHttpConnector(httpClient, resourceFactory());
    return WebClient.builder().clientConnector(connector).build();
}
复制代码

连接池

WebClient默认是每个请求创建一个连接。
我们可以配置连接池复用连接,以提高性能。

ConnectionProvider provider = ConnectionProvider.builder("order")
    .maxConnections(100)
    .maxIdleTime(Duration.ofSeconds(30))
    .pendingAcquireTimeout(Duration.ofMillis(100))  
    .build();
return WebClient
    .builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider)));
复制代码

maxConnections:允许的最大连接数
pendingAcquireTimeout:没有连接可用时,请求等待的最长时间
maxIdleTime:连接最大闲置时间

超时

底层使用Netty时,可以如下配置超时时间

import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

HttpClient httpClient = HttpClient.create()
        .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(10))
                .addHandlerLast(new WriteTimeoutHandler(10)));
复制代码

或者直接使用responseTimeout

HttpClient httpClient = HttpClient.create()
        .responseTimeout(Duration.ofSeconds(2));
复制代码
Post Json

WebClient可以发送json,form,文件等请求报文,
看一个最常用的Post Json请求

webClient.post().uri("http://localhost:9004/order/")
    .contentType(MediaType.APPLICATION_JSON)
    .body(Mono.just(order), Order.class)
    .retrieve().bodyToMono(String.class)
复制代码

异常处理

可以在ResponseSpec中指定异常处理

private <T> Mono<T> getMono(String url, Class<T> resType) {
return webClient
    .get().uri(url).retrieve()
    .onStatus(HttpStatus::is5xxServerError, clientResponse -> {
        return Mono.error(...);
    })
    .onStatus(HttpStatus::is4xxClientError, clientResponse -> {
        return Mono.error(...);
    })
    .onStatus(HttpStatus::isError, clientResponse -> {
        return Mono.error(...);
    })
    .bodyToMono(resType)
}
复制代码

也可以在HttpClient上配置

HttpClient httpClient = HttpClient.create()
        .doOnError((req, err) -> {
            log.error("err on request:{}", req.uri(), err);
        }, (res, err) -> {
            log.error("err on response:{}", res.uri(), err);
        })
复制代码

同步返回结果

使用block方法可以阻塞线程,等待请求返回

private <T> T syncGetMono(String url, Class<T> resType) {
    return webClient
            .get().uri(url).retrieve()
            .bodyToMono(resType).block();
}
复制代码

获取响应信息

exchangeToMono可以获取到响应的header,statusCode等信息

private <T> Mono<T> getMonoWithInfo(String url, Class<T> resType) {
    return webClient
            .get()
            .uri(url)
            .exchangeToMono(response -> {
                logger.info("request url:{},statusCode:{},headers:{}", url, response.statusCode(), response.headers());
                return response.bodyToMono(resType);
            });
}
复制代码

注册中心与Ribbon

经验证,WebClient支持Eureka注册中心与Ribbon转发,使用方式与restTemplate相同。
不过@LoadBalanced需要添加在WebClient.Builder上

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
    return WebClient.builder();
}
复制代码

官方文档:docs.spring.io/spring-fram…
文章完整代码:gitee.com/binecy/bin-…

实际项目中,线程阻塞场景往往不只有Http请求阻塞,还有Mysql请求,Redis请求,Kafka请求等等导致的阻塞。从这些数据源中获取数据时,大多数都是阻塞直到数据源返回数据。
而Reactive Spring强大在于,它也支持这些数据源的非阻塞响应式编程。
下一篇文章,我们来看一个如何实现Redis的非阻塞响应式编程。

如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力。