异步请求和异步调用

异步请求与同步请求

当我们接口中遇到长耗时操作时(例如长时间的运算),异步请求可以快速释放容器分配给请求处理线程的资源,提高系统的吞吐量,该接口的响应将被延后,可以在耗时处理完成时再对客户端进行响应。一句话:增加了服务器对客户端请求的吞吐量,但是客户端其实还是处于等待状态(实际生产上我们用的比较少,如果并发请求量很大的情况下,我们会通过nginx把请求负载到集群服务的各个节点上来分摊请求压力,当然还可以通过消息队列来做请求的缓冲)

区别如下:

  • 同步请求:后端接受到请求后,直接在请求处理线程中处理业务逻辑,并返回,整个过程客户端等待

  • 异步请求:后端接受到请求后,创建处理线程来执行业务逻辑,这样就可以释放请求线程,避免请求线程被大量耗时的请求占满,导致服务不可用。而后台处理线程执行完成后会通过监听器响应客户端,整个过程客户端等待

  • 异步调用:后端接受到请求后,创建处理线程来执行业务逻辑,同时处理线程直接返回

场景分析

异步请求适用于耗时的请求,快速的释放请求处理线程资源,避免web容器的请求线程被打满,导致服务不可用。举个例子,假设我们有个业务需要导出操作,而其耗时长而且又占用内存,这时我们就可以结合异步请求、线程池、消息队列来完成业务需求

异步请求的实现方式

方式一:Servlet方式实现异步请求

比如我们现在有个接口耗时为2s,如果有大量并发请求过来势必会拖垮系统,这时我们可以考虑采用异步请求快速释放Web请求线程,从而提高系统的吞吐量,一般步骤如下:

  • javax.servlet.http.HttpServletRequest#startAsync()获取AsyncContext

  • asyncContext.addListener添加监听器(可选), 可设置其开始、完成、异常、超时等事件的回调处理

  • asyncContext.setTimeout设置超时时间(可选)

  • 异步任务asyncContext.start(Runnable)

    public void asyncContext(HttpServletRequest request) {
    AsyncContext asyncContext = request.startAsync();
    // 添加监听器(可选), 可设置其开始、完成、异常、超时等事件的回调处理
    asyncContext.addListener(new AsyncListener() {
    @Override
    public void onComplete(AsyncEvent asyncEvent) throws IOException {
    log.info("AsyncController.onComplete {} 操作完成", Thread.currentThread().getName());
    }
    @Override
    public void onTimeout(AsyncEvent asyncEvent) throws IOException {
    log.info("AsyncController.onTimeout {} 用户请求超时", Thread.currentThread().getName());
    }
    @Override
    public void onError(AsyncEvent asyncEvent) throws IOException {
    log.info("AsyncController.onError {} 用户请求异常", Thread.currentThread().getName());
    }
    @Override
    public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
    log.info("AsyncController.onStartAsync {} 用户请求开始", Thread.currentThread().getName());
    }
    });
    asyncContext.setTimeout(3000L); // 设置超时时间(可选), 比如任务计划耗时30s,但是可能出现超时
    asyncContext.start(() -> {
    try {
    Thread.sleep(2000L);
    System.out.println("内部线程: " + Thread.currentThread().getName());
    asyncContext.getResponse().setCharacterEncoding("utf-8");
    asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
    asyncContext.getResponse().getWriter().println("异步请求");
    asyncContext.getResponse().getWriter().flush();
    // 异步完成,释放
    asyncContext.complete();
    } catch (Exception e) {
    e.printStackTrace();
    }
    });
    log.info("主线程 end: {}", Thread.currentThread().getName());
    }

但是异步请求有一个缺点就是客户端需要等待,但是如果我们需要的是发起一个任务而不需要实时响应给用户,那么我们可以通过异步调用的方式来完成。在下面的例子中,我们可以直接返回响应告诉用户任务已经创建成功,而任务则由后台处理进程处理,而处理结果可以通过监听器的完成、超时、异常回调函数设置到数据库中

public BaseResponse asyncContext2(HttpServletRequest request) {
    AsyncContext asyncContext = request.startAsync();
    // 添加监听器(可选), 用户请求开始、超时、
    asyncContext.addListener(new AsyncListener() {
        @Override
        public void onComplete(AsyncEvent asyncEvent) throws IOException {
            log.info("AsyncController.onComplete {} 操作完成", Thread.currentThread().getName());
            // TODO: 更新任务的状态(已完成)
        }
        @Override
        public void onTimeout(AsyncEvent asyncEvent) throws IOException {
            log.info("AsyncController.onTimeout {} 用户请求超时", Thread.currentThread().getName());
            // TODO: 更新任务的状态(超时未完成)
        }
        @Override
        public void onError(AsyncEvent asyncEvent) throws IOException {
            log.info("AsyncController.onError {} 用户请求异常", Thread.currentThread().getName());
            // TODO: 更新任务的状态(任务执行出错)
        }
        @Override
        public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
            log.info("AsyncController.onStartAsync {} 用户请求开始", Thread.currentThread().getName());
        }
    });
    asyncContext.setTimeout(10000L); // 设置超时时间(可选), 比如任务计划耗时30s,但是可能出现超时
    asyncContext.start(() -> {
        try {
            Thread.sleep(2 * 60 * 1000L); // 执行2分钟
            // TODO: 执行异步调用                
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    log.info("主线程 end: {}", Thread.currentThread().getName());
    return BaseResponse.success();
}
复制代码

方式二:Callable

将返回参数包裹一层callable即可,可以继承WebMvcConfigurationSupport类来设置默认线程池和超时处理

@GetMapping(value = "/asyncCallable")
public Callable<BaseResponse>  asyncCallable() {
    return new Callable<BaseResponse>() {
        @Override
        public BaseResponse call() throws Exception {
            Thread.sleep(5000L); // do something
            return BaseResponse.success();
        }
    };
}

@Configuration
public class WebConfig extends WebMvcConfigurationSupport {

    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry.addResourceHandler("/**").addResourceLocations("classpath:/static/");
        registry.addResourceHandler("swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/");
        registry.addResourceHandler("doc.html").addResourceLocations("classpath:/META-INF/resources/");
        registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/");
    }

    @Resource
    private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;

    @Override
    public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
        //处理 callable超时
        configurer.setDefaultTimeout(1000); // 这里会抛出异常
        configurer.setTaskExecutor(myThreadPoolTaskExecutor);
        configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
    }
    @Bean
    public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
        return new TimeoutCallableProcessingInterceptor();
    }
}
复制代码

方式三:WebAsyncTask

Callable的方式非常直观简单,但是我们经常关注的超时和异常的处理却不太好,这个时候我们可以用WebAsyncTask,实现起来也很简单,包装一下callable然后设置各种回调函数即可

@GetMapping(value = "/asyncWebAsyncTask")
public WebAsyncTask<BaseResponse> asyncWebAsyncTask() {

    Callable<BaseResponse> callable = new Callable<BaseResponse>() {
        @Override
        public BaseResponse call() throws Exception {
            TimeUnit.SECONDS.sleep(4); // do something
            return BaseResponse.success();
        }
    };

    WebAsyncTask<BaseResponse> webAsyncTask = new WebAsyncTask<>(3000L, callable);
    webAsyncTask.onTimeout(() -> {
        log.error("请求超时!");
        return BaseResponse.wrap(ResponseCodeConst.SYSTEM_ERROR, "请求超时");
    });
    webAsyncTask.onError(() -> {
        log.error("请求异常!");
        return BaseResponse.wrap(ResponseCodeConst.SYSTEM_ERROR, "请求异常");
    });
    webAsyncTask.onCompletion(() -> {
        log.info("成功回调!");
    });
    return webAsyncTask;
}
复制代码

方式四:DeferredResult

DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。

private Map<String, DeferredResult> cache = new ConcurrentHashMap<>();
@Resource
private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
@GetMapping(value = "/asyncDeferredResult")
public BaseResponse asyncDeferredResult(String id) {
    DeferredResult<BaseResponse> deferredResult = new DeferredResult<>();
    cache.put(id, deferredResult);
    // 处理超时事件,采用委派机制?
    deferredResult.onTimeout(() -> {
        log.info("请求超时! {}", Thread.currentThread().getName());
    });
    deferredResult.onCompletion(() -> {
        log.info("成功回调!");
    });
    // 创建线程处理业务逻辑
    myThreadPoolTaskExecutor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(10);
            log.info("do something {}", Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    return BaseResponse.successData("任务创建成功");
}

@GetMapping(value = "/getTaskResult")
public BaseResponse getTaskResult(String id, String content) {
    DeferredResult<BaseResponse> deferredResult = new DeferredResult<>();
    if (deferredResult == null)
        return BaseResponse.successData("no consumer");
    deferredResult.setResult(BaseResponse.successData(content));
    return BaseResponse.successData(content);
}
复制代码

思考下,这些异步方法有哪些应用场景呢