异步请求与同步请求
当我们接口中遇到长耗时操作时(例如长时间的运算),异步请求可以快速释放容器分配给请求处理线程的资源,提高系统的吞吐量,该接口的响应将被延后,可以在耗时处理完成时再对客户端进行响应。一句话:增加了服务器对客户端请求的吞吐量,但是客户端其实还是处于等待状态(实际生产上我们用的比较少,如果并发请求量很大的情况下,我们会通过nginx把请求负载到集群服务的各个节点上来分摊请求压力,当然还可以通过消息队列来做请求的缓冲)
区别如下:
-
同步请求:后端接受到请求后,直接在请求处理线程中处理业务逻辑,并返回,整个过程客户端等待
-
异步请求:后端接受到请求后,创建处理线程来执行业务逻辑,这样就可以释放请求线程,避免请求线程被大量耗时的请求占满,导致服务不可用。而后台处理线程执行完成后会通过监听器响应客户端,整个过程客户端等待
-
异步调用:后端接受到请求后,创建处理线程来执行业务逻辑,同时处理线程直接返回
场景分析
异步请求适用于耗时的请求,快速的释放请求处理线程资源,避免web容器的请求线程被打满,导致服务不可用。举个例子,假设我们有个业务需要导出操作,而其耗时长而且又占用内存,这时我们就可以结合异步请求、线程池、消息队列来完成业务需求
异步请求的实现方式
方式一:Servlet方式实现异步请求
比如我们现在有个接口耗时为2s,如果有大量并发请求过来势必会拖垮系统,这时我们可以考虑采用异步请求快速释放Web请求线程,从而提高系统的吞吐量,一般步骤如下:
-
javax.servlet.http.HttpServletRequest#startAsync()获取AsyncContext
-
asyncContext.addListener添加监听器(可选), 可设置其开始、完成、异常、超时等事件的回调处理
-
asyncContext.setTimeout设置超时时间(可选)
-
异步任务asyncContext.start(Runnable)
public void asyncContext(HttpServletRequest request) {
AsyncContext asyncContext = request.startAsync();
// 添加监听器(可选), 可设置其开始、完成、异常、超时等事件的回调处理
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onComplete {} 操作完成", Thread.currentThread().getName());
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onTimeout {} 用户请求超时", Thread.currentThread().getName());
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onError {} 用户请求异常", Thread.currentThread().getName());
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onStartAsync {} 用户请求开始", Thread.currentThread().getName());
}
});
asyncContext.setTimeout(3000L); // 设置超时时间(可选), 比如任务计划耗时30s,但是可能出现超时
asyncContext.start(() -> {
try {
Thread.sleep(2000L);
System.out.println("内部线程: " + Thread.currentThread().getName());
asyncContext.getResponse().setCharacterEncoding("utf-8");
asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
asyncContext.getResponse().getWriter().println("异步请求");
asyncContext.getResponse().getWriter().flush();
// 异步完成,释放
asyncContext.complete();
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("主线程 end: {}", Thread.currentThread().getName());
}
但是异步请求有一个缺点就是客户端需要等待,但是如果我们需要的是发起一个任务而不需要实时响应给用户,那么我们可以通过异步调用的方式来完成。在下面的例子中,我们可以直接返回响应告诉用户任务已经创建成功,而任务则由后台处理进程处理,而处理结果可以通过监听器的完成、超时、异常回调函数设置到数据库中
public BaseResponse asyncContext2(HttpServletRequest request) {
AsyncContext asyncContext = request.startAsync();
// 添加监听器(可选), 用户请求开始、超时、
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onComplete {} 操作完成", Thread.currentThread().getName());
// TODO: 更新任务的状态(已完成)
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onTimeout {} 用户请求超时", Thread.currentThread().getName());
// TODO: 更新任务的状态(超时未完成)
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onError {} 用户请求异常", Thread.currentThread().getName());
// TODO: 更新任务的状态(任务执行出错)
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
log.info("AsyncController.onStartAsync {} 用户请求开始", Thread.currentThread().getName());
}
});
asyncContext.setTimeout(10000L); // 设置超时时间(可选), 比如任务计划耗时30s,但是可能出现超时
asyncContext.start(() -> {
try {
Thread.sleep(2 * 60 * 1000L); // 执行2分钟
// TODO: 执行异步调用
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("主线程 end: {}", Thread.currentThread().getName());
return BaseResponse.success();
}
复制代码
方式二:Callable
将返回参数包裹一层callable即可,可以继承WebMvcConfigurationSupport类来设置默认线程池和超时处理
@GetMapping(value = "/asyncCallable")
public Callable<BaseResponse> asyncCallable() {
return new Callable<BaseResponse>() {
@Override
public BaseResponse call() throws Exception {
Thread.sleep(5000L); // do something
return BaseResponse.success();
}
};
}
@Configuration
public class WebConfig extends WebMvcConfigurationSupport {
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/**").addResourceLocations("classpath:/static/");
registry.addResourceHandler("swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("doc.html").addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/");
}
@Resource
private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
@Override
public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
//处理 callable超时
configurer.setDefaultTimeout(1000); // 这里会抛出异常
configurer.setTaskExecutor(myThreadPoolTaskExecutor);
configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
}
@Bean
public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
return new TimeoutCallableProcessingInterceptor();
}
}
复制代码
方式三:WebAsyncTask
Callable的方式非常直观简单,但是我们经常关注的超时和异常的处理却不太好,这个时候我们可以用WebAsyncTask,实现起来也很简单,包装一下callable然后设置各种回调函数即可
@GetMapping(value = "/asyncWebAsyncTask")
public WebAsyncTask<BaseResponse> asyncWebAsyncTask() {
Callable<BaseResponse> callable = new Callable<BaseResponse>() {
@Override
public BaseResponse call() throws Exception {
TimeUnit.SECONDS.sleep(4); // do something
return BaseResponse.success();
}
};
WebAsyncTask<BaseResponse> webAsyncTask = new WebAsyncTask<>(3000L, callable);
webAsyncTask.onTimeout(() -> {
log.error("请求超时!");
return BaseResponse.wrap(ResponseCodeConst.SYSTEM_ERROR, "请求超时");
});
webAsyncTask.onError(() -> {
log.error("请求异常!");
return BaseResponse.wrap(ResponseCodeConst.SYSTEM_ERROR, "请求异常");
});
webAsyncTask.onCompletion(() -> {
log.info("成功回调!");
});
return webAsyncTask;
}
复制代码
方式四:DeferredResult
DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。
private Map<String, DeferredResult> cache = new ConcurrentHashMap<>();
@Resource
private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
@GetMapping(value = "/asyncDeferredResult")
public BaseResponse asyncDeferredResult(String id) {
DeferredResult<BaseResponse> deferredResult = new DeferredResult<>();
cache.put(id, deferredResult);
// 处理超时事件,采用委派机制?
deferredResult.onTimeout(() -> {
log.info("请求超时! {}", Thread.currentThread().getName());
});
deferredResult.onCompletion(() -> {
log.info("成功回调!");
});
// 创建线程处理业务逻辑
myThreadPoolTaskExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
log.info("do something {}", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return BaseResponse.successData("任务创建成功");
}
@GetMapping(value = "/getTaskResult")
public BaseResponse getTaskResult(String id, String content) {
DeferredResult<BaseResponse> deferredResult = new DeferredResult<>();
if (deferredResult == null)
return BaseResponse.successData("no consumer");
deferredResult.setResult(BaseResponse.successData(content));
return BaseResponse.successData(content);
}
复制代码
思考下,这些异步方法有哪些应用场景呢
近期评论