Servlet 3.0 规范新增了对异步请求的支持,Spring MVC 也在此基础上对异步请求提供了方便。异步请求是在处理比较耗时的业务时先将 request 返回,然后另起线程处理耗时的业务,处理完后再返回给用户。
HTTP 协议是单向的,只能客户端自己拉不能服务器主动推,Servlet 对异步请求的支持并没有修改 HTTP 协议,而是对 HTTP 的巧妙利用。异步请求的核心原理主要分为两大类,一类是轮询,另一类是长连接。轮询就是定时自动发起请求检查有没有需要返回的数据,这种方式对资源的浪费是比较大的;长连接的原理是在客户端发起请求,服务端处理并返回后并不结束连接,这样就可以在后面再次返回给客户端数据。Servlet 对异步请求的支持其实采用的是长连接的方式,也就是说,异步请求中在原始的请求返回的时候并没有关闭连接,关闭的,只是处理请求的那个线程(一般是回收的线程池里了),只有在异步请求全部处理完之后才会关闭连接。
Servlet 3.0 对异步请求的支持
在 Servlet 3.0 规范中使用异步处理请求非常简单,只需要在请求处理过程中调用 request 的 startAsync 方法即可,其返回值是 AsyncContext。
AsyncContext 是保存与异步请求相关的所有信息,类似于 Servlet 中的 ServletContext。异步请求主要是使用 AsyncContext 进行操作,它是在请求处理的过程中调用 Request 的 startAsync 方法返回的,需要注意的是多次调用 startAsync 方法返回的是同一个 AsyncContext。AsyncContext 接口定义如下:
public interface AsyncContext {
String ASYNC_REQUEST_URI = "javax.servlet.async.request_uri";
String ASYNC_CONTEXT_PATH = "javax.servlet.async.context_path";
String ASYNC_MAPPING = "javax.servlet.async.mapping";
String ASYNC_PATH_INFO = "javax.servlet.async.path_info";
String ASYNC_SERVLET_PATH = "javax.servlet.async.servlet_path";
String ASYNC_QUERY_STRING = "javax.servlet.async.query_string";
ServletRequest getRequest();
ServletResponse getResponse();
boolean hasOriginalRequestAndResponse();
void dispatch();
void dispatch(String var1);
void dispatch(ServletContext var1, String var2);
void complete();
void start(Runnable var1);
void addListener(AsyncListener var1);
void addListener(AsyncListener var1, ServletRequest var2, ServletResponse var3);
<T extends AsyncListener> T createListener(Class<T> var1) throws ServletException;
void setTimeout(long var1);
long getTimeout();
}
SpringMVC 中的异步请求
Spring MVC 为了方便使用异步请求专门提供了 AsyncWebRequest 类型的 request,并且提供了处理异步请求的管理器 WebAsyncManager 和工具 WebAsyncUtils。
Spring MVC 将异步请求细分为了 Callable、WebAsyncTask、DeferredResult 和 ListenableFuture 四种类型。Spring MVC 专门提供了 AsyncRestTemplate 方法调用别的资源,并返回 ListenableFuture 类型。
异步请求示例
Callable
@RequestMapping("/hello")
public Callable<String> hello() {
return new Callable<String>() {
@Override
public String call() throws Exception {
sleep5s();
return "Complete";
}
};
}
Callable 的处理其实是在 WebAsyncManager 内部封装成 WebAsyncTask 后再处理的。核心逻辑在返回值处理器 CallableMethodReturnValueHandler。
WebAsyncTask
@RequestMapping("/hello")
public WebAsyncTask<String> hello() {
WebAsyncTask<String> task = new WebAsyncTask<>(() -> {
Thread.sleep(5000);
return "Complete";
});
return task;
}
WebAsyncTask 比 Callable 更好的是支持设置 timeout 和 Executor,以及支持 Timeout、Complete、Error 事件回调。
DeferredResult
DeferredResult 是 Spring 提供的一种用于封装延迟处理结果的类,当一个处理器返回
DeferredResult 类型的返回值时将启动异步处理。其含义是当结果好了 set 就可以了。
@RequestMapping("/hello")
public DeferredResult<String> hello() {
final DeferredResult<String> result = new DeferredResult<String>(5 * 1000L, "Timeout");
// 一般使用线程池
new Thread(()->{
sleep5s();
result.setResult("Complete");
}).start();
return result;
}
其支持设置 timeout 和 结果处理器,以及支持 Timeout、Complete、Error 事件回调。
ListenableFuture
ListenableFuture 继承自 Future,ListenableFuture 在 Future 阻塞获取结果的基础上,增加了可以添加处理成功和处理失败回调方法的方法,代码如下:
public interface ListenableFuture<T> extends Future<T> {
/**
* Register the given {@code ListenableFutureCallback}.
* @param callback the callback to register
*/
void addCallback(ListenableFutureCallback<? super T> callback);
/**
* Java 8 lambda-friendly alternative with success and failure callbacks.
* @param successCallback the success callback
* @param failureCallback the failure callback
* @since 4.1
*/
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
/**
* Expose this {@link ListenableFuture} as a JDK {@link CompletableFuture}.
* @since 5.0
*/
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}
ListenableFutureCallback 继承自 SuccessCallback 和 FailureCallback 接口,后两个接口分别有一个 onSuccess 方法和 onFailure 方法,用于处理异步处理成功的返回值和异步处理失败的返回值。
异步请求组件
AsyncWebRequest
AsyncWebRequest 是专门处理异步请求的 request。
public interface AsyncWebRequest extends NativeWebRequest {
void setTimeout(Long timeout);
void dispatch();
void startAsync();
boolean isAsyncStarted();
boolean isAsyncComplete();
void addTimeoutHandler(Runnable runnable);
void addCompletionHandler(Runnable runnable);
}
其实现类有两个,一个是 NoSupportAsyncWebRequest,另一个是 StandardServletAsyncWebRequest,前者不支持异步请求,所以在 Spring MVC 中实际用作异步请求的 request 是 StandardServletAsyncWebRequest。
StandardServletAsyncWebRequest 还实现了 AsyncListener 接口,其本质就是一个监听器。并且在 startAsync 方法中创建出 AsyncContext 后会将自己当作监听器注册进去。
WebAsyncManager
WebAsyncManager 是 Spring MVC 处理异步请求过程中最核心的类,它管理着整个异步
处理的过程。
WebAsyncManager 中最重要的两个方法是 startCallableProcessing 和 startDeferredResultProcessing,这两个方法是启动异步处理的入口方法,它们一共做了三件事:① 启动异步处理; ② 给 Request 设置相应属性(主要包括 timeout、timeoutHandler 和 completionHandler); ③ 在相应位置调用相应的拦截器。这里的拦截器是 Spring MVC 自己定义的。
startCallableProcessing 方法用于处理 Callable 和 WebAsyncTask 类型的异步请求,使用的拦截器类型是 CallableProcessingInterceptor,拦截器封装在 CallableInterceptorChain 类型的拦截器链中统一调用。
startDeferredResultProcessing 方法用于处理 DeferredResult 和 ListenableFuture 类型的异步请求,使用的拦截器是 DeferredResultProcessingInterceptor 拦截器,拦截器封装在 DeferredResultInterceptorChain 类型的拦截器链中统一调用。
下面是 Callable 类型的拦截器
// 用作拦截 Callable 类型的异步请求
public interface CallableProcessingInterceptor {
static final Object RESULT_NONE = new Object();
static final Object RESPONSE_HANDLED = new Object();
// 并发处理前事件
<T> void beforeConcurrentHandling(NativeWebRequest request, Callable<T> task) throws Exception;
<T> void preProcess(NativeWebRequest request, Callable<T> task) throws Exception;
<T> void postProcess(NativeWebRequest request, Callable<T> task, Object concurrentResult) throws Exception;
<T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception;
// Complete 事件:包含超时、网络异常、正常完成等的 Complete 事件
<T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception;
}
下面是 DeferredResult 类型的拦截器
// 用作拦截 DeferredResult 类型的异步请求
public interface DeferredResultProcessingInterceptor {
<T> void beforeConcurrentHandling(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;
<T> void preProcess(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;
<T> void postProcess(NativeWebRequest request, DeferredResult<T> deferredResult, Object concurrentResult) throws Exception;
<T> boolean handleTimeout(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;
<T> void afterCompletion(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;
}
CallableInterceptorChain 和 DeferredResultInterceptorChain 分别用于封装两个 Interceptor,它们都是将多个相应的拦截器封装到一个 List 类型的属性,然后在相应的方法中调用所封装的 Interceptor 相应方法进行处理。
下面分析 WebAsyncManager 的核心代码:
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {
// 设置超时时间
Long timeout = webAsyncTask.getTimeout();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
// 设置线程池
AsyncTaskExecutor executor = webAsyncTask.getExecutor();
if (executor != null) {
this.taskExecutor = executor;
}
// 设置拦截器
List<CallableProcessingInterceptor> interceptors = new ArrayList<CallableProcessingInterceptor>();
interceptors.add(webAsyncTask.getInterceptor());
interceptors.addAll(this.callableInterceptors.values());
interceptors.add(timeoutCallableInterceptor);
final Callable<?> callable = webAsyncTask.getCallable();
final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
// 添加超时请求处理器
this.asyncWebRequest.addTimeoutHandler(new Runnable() {
@Override
public void run() {
Object result = interceptorChain.triggerAfterTimeout(asyncWebRequest, callable);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
setConcurrentResultAndDispatch(result);
}
}
});
// 添加请求完成处理器
this.asyncWebRequest.addCompletionHandler(new Runnable() {
@Override
public void run() {
interceptorChain.triggerAfterCompletion(asyncWebRequest, callable);
}
});
// 执行 applu 方法
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
// 启动异步处理
startAsyncProcessing(processingContext);
try {
this.taskExecutor.submit(new Runnable() {
@Override
public void run() {
Object result = null;
try {
interceptorChain.applyPreProcess(asyncWebRequest, callable);
result = callable.call();
}
catch (Throwable ex) {
result = ex;
}
finally {
result = interceptorChain.applyPostProcess(asyncWebRequest, callable, result);
}
// 设置处理结果并发送请求
setConcurrentResultAndDispatch(result);
}
});
}
catch (RejectedExecutionException ex) {
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
setConcurrentResultAndDispatch(result);
throw ex;
}
}
concurrentResult 是 WebAsyncManager 中用来保存异步处理结果的属性,hasConcurrentResult 方法用来判断 concurrentResult 是否已经存在返回值。整个方法过程是:如果 concurrentResult 已经有返回值则直接返回,否则将传入的参数设置到 concurrentResult,然后调用 asyncWebRequest.isAsyneComplete()
检查 Request 是否已设置为异步处理完成状态(网络中断会造成 Request 设置为异步处理完成状态),如果是则保存错误日志并返回,否则调用。
asyncWebRequest.dispatch()
发送请求。Spring MVC 中异步请求处理完成后会再次发起一个相同的请求,然后在 HandlerAdapter 中使用一个特殊的 HandlerMethod 来处理它,具体过程后面再讲解,不过通过 Request 的 dispatch 方法发起的请求使用的还是原来的 Request,也就是说原来保存在 Request 中的属性不会丢失。
startDeferredResultProcessing 方法和 startCallableProcessing 方法执行过程类似,只是并没有使用 taskExecutor 来提交执行,这是因为 DeferredResult 并不需要执行处理,在后面讲了 DeferredResult 的用法后大家就明白了。
WebAsyncUtils
提供了对于 Async Manager、Request 等的操作支持。
对异步请求的支持
SpringMVC 对异步的支持可以拆分为两个部分:异步请求包装、返回结果处理。
主要处理流程可总结为:首先在处理器中返回需要启动异步处理的类型时(四种类型)相应返回值处理器会调用 WebAsyncManager 的相关方法启动异步处理,然后在 DispatcherServlet 中将原来请求直接返回,当异步处理完成后会重新发出一个相同的请求,这时在 RequestMappingHandlerAdapter 中会使用特殊的 ServletInvocableHandlerMethod 来处理请求,处理方法是:如果异步处理返回的结果是异常类型则抛出异常,否则直接返回异步处理结果,然后使用返回值处理器处理,接着返回 DispatcherServlet 中按正常流程往下处理。
异步处理完成后会重新发起一个请求,这时会重新查找 HandlerMethod 并初始化 PathVariable、MatrixVariable 等参数,重新初始化 Model 中的数据并再次执行 HandlerInterceptor 中相应的方法。这么做主要是可以复用原来的那套组件进行处理而不需要重新定义。
异步请求包装
异步请求包装的核心逻辑主要由 RequestMappingHandlerAdapter#invokeHandleMethod 方法提供:
创建 AsyncWebRequest 并初始化当前请求的 WebAsyncManager
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
返回结果处理
如果是该请求第一次进入 DispatcherServlet,会直接通过 ServletInvocableHandlerMethod 桥接到 Controller 层获取到需要异步执行的任务(如 Callable 等),再通过 ReturnValueHandler 做异步处理后返回 null。
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
下面主要解释一下 ReturnValueHandler 的行为(以 Callable 为例):
先通过 HandlerMethodReturnValueHandlerComposite 匹配出 Callable 对应的处理器 CallableMethodReturnValueHandler,并调用 handleReturnValue 做异步处理。
// WebAsyncManager
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
throws Exception {
// ...设置基本属性
// ... before concurrent handling
startAsyncProcessing(processingContext);
// ... pre process
// ... process
// ... post process
// 异步结果计算完成后,再次发送一个请求返回数据,处理通过下述解决
setConcurrentResultAndDispatch(result);
// ... exception process
}
如果当前请求是异步请求而且已经处理出了结果,则将异步处理结果与之前保存到 WebAsyncManager 里的 ModelAndViewContainer 取出来,并将 WebAsyncManager 里的结果清空,然后调用 ServletInvocableHandlerMethod 的 wrapConcurrentResult 方法创建 ConcurrentResultHandlerMethod 类型的 ServletInvocableHandlerMethod 来替换自己,创建出来的 ConcurrentResultHandlerMethod 并不执行请求,它的主要功能是判断异步处理的结果是不是异常类型,如果是则抛出,如果不是则使用 ReturmValueHandler 对其进行解析并返回。
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
超时、线程池等属性都可以在 RequestMappingHandlerAdapter 中设置
异步请求的问题
第一是异步请求相应的拦截器里的方法会被调用两次。这是不合适的,而且有的时候还会出问题,比如,如果用了拦截器来检查 Token,那么第一次检查通过后就会将相应内容删除,第二次再检查的时候就检查失败了。
第二个是通过 FlashMap 传递 Redirect 参数的情况。FlashMapManager 获取 FlashMap 的时候说过,每次获取后就会将相应的 FlashMap 删除,但异步请求会获取两次,如果异步处理器是 Redirect 到的结果处理器,并且使用 FlashMap 传递了参数,这种情况下如果在第二次获取 FlashMap 的时候(异步请求处理完了)正好用户又发了一个相同的请求,而且 RedirectView 已经将 FlashMap 设置到了 Session,在获取之前可能被前面的请求获取删除,导致自己获取不到,示例如下:
请求1 | 请求2 |
---|---|
saveOutputFlashMap 设置到 FM1 | |
saveOutputFlashMap 设置到 FM2 | |
retrieveAndUpdate 获取到 FM1 | |
retrieveAndUpdate 获取到 FM2 | |
retrieveAndUpdate 获取到 null | |
retrieveAndUpdate 获取到 null |
这样请求 2 设置的 FlashMap 就会被请求 1 的第二次 retrieveAndUpdate 获取到并从 Session 中删除,请求 2 就获取不到了,这样造成了两个请求的值都出了问题。