DeferredResult与Redispub/sub实现

这是我参与8月更文挑战的第7天,活动详情查看:8月更文挑战

一、概述

方案1. 常规的同步任务流程如下图

http请求

分配容器线程

同步处理完成

客户端

Web容器

等待逻辑处理

方案2. 单机DeferredResult异步任务

http请求

分配容器线程

释放容器线程

异步处理完成

客户端

Web容器

DefferredResult

其它线程后台处理

方案3. DeferredResult加上Redispub/sub异步任务

1. http请求

2.1 分配容器线程

3 释放容器线程

2.2 sub订阅等待响应结果

4. 处理完成后pub发布

5. setResult

6. 唤醒容器线程

客户端

Web容器

DefferredResult

其它线程后台处理

Redis

方案2和3都能处理异步任务,如果服务是单机的服务,直接方案2即可,简单高效。

但是如果是微服务架构,异步任务可能是发生在多个服务之间甚至是多点集群服务中。比如A服务负责逻辑计算,B服务负责存储等慢业务时。

1. HTTP请求a

2. 逻辑计算

3. 存储

1. HTTP请求b

2. 逻辑计算

3. 存储

1. HTTP请求c

2. 逻辑计算

3. 存储

客户端

A

B1

结束

B2

B3

像这种多服务多点的微服务架构中,需要的异步场景往往是需要在服务之间的。

这也是本文要描述的异步方案:DeferredResult与Redis pub/sub实现简单异步任务

二、DeferredResult

DeferredResult提供了一种使用Callable进行异步请求处理的替代方法。当Callable代表应用程序并发执行时,应用程序可以使用DeferredResult从其选择的线程生成结果。

这是官方文档的释义,简单的说,它允许在请求接收线程以外的另一个线程中处理HTTP请求。虽然另一个线程会占用一些资源进行计算,但辅助线程在此期间不会被阻塞,并且可以处理传入的客户端请求。

异步请求处理模型非常有用,因为它有助于在高负载期间很好地扩展应用程序,特别是对于IO密集型操作。


@GetMapping(value = "/handlerReqSync")
public String handlerReqSync(Model model) {
    return "success";
}

@GetMapping(value = "/handlerReqAsync")
public DeferredResult<String> handlerReqAsync(Model model) {
    DeferredResult<String> deferredResult = new DeferredResult<>();
    ...
    //业务处理
    ...
    return deferredResult;
}

复制代码

DeferredResult作为返回参数时,内部的执行逻辑相当于被挂起(转交给其它线程处理),发送过来的http请求线程会被释放。当需要触发返回事件时,只需要调用DeferredResultsetResult方法就会立刻唤醒被挂起的任务setResult的参数就是请求的返回值。

三、Redispub/sub

pub/sub即生产者消费者模式。

生产者(pub)负责生产产品(sendMsg)给工厂(Redis)。

生产

发送

生产者

产品

工厂

而消费者(sub)收到从工厂(Redis)发的货(onMessage),然后进行消费(handle message)。

发货

处理或消费

工厂

产品

消费者

生产者

@Component
public class MsgPublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void sendMsg(String msg){
        redisTemplate.convertAndSend("anyTopic", "Message: " + msg +
                ";Time:" + Calendar.getInstance().getTime());
    }

}
复制代码

image.png

  • 第一个参数channel习惯使用mq的也称它为topic,相当于工厂生产的产品名称(消费者根据产品名称消费对应的产品)。
  • 第二个参数message就是发送的消息,可以称之为生产者生产的产品

消费者

通过RedisMessageListenerContainer创建监听容器,在里面配置要消费的topic也即产品名称。

@Configuration
public class SubConfig {

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter( new MsgListener() );
    }

    @Bean
    RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(messageListener(), new ChannelTopic( "pubsub:queue" ));
        return container;
    }

}
复制代码

然后实现对应产品的listener

@Component
public class MsgListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        Console.log( "Message received: {}",message.toString());
    }
}
复制代码

下一节描述怎么结合二者实现微服务之间的异步通信。