这是我参与8月更文挑战的第7天,活动详情查看:8月更文挑战
一、概述
方案1. 常规的同步任务流程如下图
方案2. 单机DeferredResult异步任务
方案3. DeferredResult加上Redis的pub/sub异步任务
方案2和3都能处理异步任务,如果服务是单机的服务,直接方案2即可,简单高效。
但是如果是微服务架构,异步任务可能是发生在多个服务之间甚至是多点集群服务中。比如A服务负责逻辑计算,B服务负责存储等慢业务时。
像这种多服务多点的微服务架构中,需要的异步场景往往是需要在服务之间的。
这也是本文要描述的异步方案:DeferredResult与Redis pub/sub实现简单异步任务。
二、DeferredResult
DeferredResult提供了一种使用Callable进行异步请求处理的替代方法。当Callable代表应用程序并发执行时,应用程序可以使用DeferredResult从其选择的线程生成结果。
这是官方文档的释义,简单的说,它允许在请求接收线程以外的另一个线程中处理HTTP请求。虽然另一个线程会占用一些资源进行计算,但辅助线程在此期间不会被阻塞,并且可以处理传入的客户端请求。
异步请求处理模型非常有用,因为它有助于在高负载期间很好地扩展应用程序,特别是对于IO密集型操作。
@GetMapping(value = "/handlerReqSync")
public String handlerReqSync(Model model) {
return "success";
}
@GetMapping(value = "/handlerReqAsync")
public DeferredResult<String> handlerReqAsync(Model model) {
DeferredResult<String> deferredResult = new DeferredResult<>();
...
//业务处理
...
return deferredResult;
}
复制代码
DeferredResult作为返回参数时,内部的执行逻辑相当于被挂起(转交给其它线程处理),发送过来的http请求线程会被释放。当需要触发返回事件时,只需要调用DeferredResult的setResult方法就会立刻唤醒被挂起的任务,setResult的参数就是请求的返回值。
三、Redis的pub/sub
pub/sub即生产者消费者模式。
生产者(pub)负责生产产品(sendMsg)给工厂(Redis)。
而消费者(sub)收到从工厂(Redis)发的货(onMessage),然后进行消费(handle message)。
生产者
@Component
public class MsgPublisher {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void sendMsg(String msg){
redisTemplate.convertAndSend("anyTopic", "Message: " + msg +
";Time:" + Calendar.getInstance().getTime());
}
}
复制代码
- 第一个参数
channel习惯使用mq的也称它为topic,相当于工厂生产的产品名称(消费者根据产品名称消费对应的产品)。 - 第二个参数
message就是发送的消息,可以称之为生产者生产的产品。
消费者
通过RedisMessageListenerContainer创建监听容器,在里面配置要消费的topic也即产品名称。
@Configuration
public class SubConfig {
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter( new MsgListener() );
}
@Bean
RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(messageListener(), new ChannelTopic( "pubsub:queue" ));
return container;
}
}
复制代码
然后实现对应产品的listener。
@Component
public class MsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
Console.log( "Message received: {}",message.toString());
}
}
复制代码
下一节描述怎么结合二者实现微服务之间的异步通信。




近期评论