0%

SpringMVC 异步请求

Servlet 3.0 规范新增了对异步请求的支持,Spring MVC 也在此基础上对异步请求提供了方便。异步请求是在处理比较耗时的业务时先将 request 返回,然后另起线程处理耗时的业务,处理完后再返回给用户。

HTTP 协议是单向的,只能客户端自己拉不能服务器主动推,Servlet 对异步请求的支持并没有修改 HTTP 协议,而是对 HTTP 的巧妙利用。异步请求的核心原理主要分为两大类,一类是轮询,另一类是长连接。轮询就是定时自动发起请求检查有没有需要返回的数据,这种方式对资源的浪费是比较大的;长连接的原理是在客户端发起请求,服务端处理并返回后并不结束连接,这样就可以在后面再次返回给客户端数据。Servlet 对异步请求的支持其实采用的是长连接的方式,也就是说,异步请求中在原始的请求返回的时候并没有关闭连接,关闭的,只是处理请求的那个线程(一般是回收的线程池里了),只有在异步请求全部处理完之后才会关闭连接。

Servlet 3.0 对异步请求的支持

在 Servlet 3.0 规范中使用异步处理请求非常简单,只需要在请求处理过程中调用 request 的 startAsync 方法即可,其返回值是 AsyncContext。

AsyncContext 是保存与异步请求相关的所有信息,类似于 Servlet 中的 ServletContext。异步请求主要是使用 AsyncContext 进行操作,它是在请求处理的过程中调用 Request 的 startAsync 方法返回的,需要注意的是多次调用 startAsync 方法返回的是同一个 AsyncContext。AsyncContext 接口定义如下:

public interface AsyncContext {
    String ASYNC_REQUEST_URI = "javax.servlet.async.request_uri";
    String ASYNC_CONTEXT_PATH = "javax.servlet.async.context_path";
    String ASYNC_MAPPING = "javax.servlet.async.mapping";
    String ASYNC_PATH_INFO = "javax.servlet.async.path_info";
    String ASYNC_SERVLET_PATH = "javax.servlet.async.servlet_path";
    String ASYNC_QUERY_STRING = "javax.servlet.async.query_string";

    ServletRequest getRequest();

    ServletResponse getResponse();

    boolean hasOriginalRequestAndResponse();

    void dispatch();

    void dispatch(String var1);

    void dispatch(ServletContext var1, String var2);

    void complete();

    void start(Runnable var1);

    void addListener(AsyncListener var1);

    void addListener(AsyncListener var1, ServletRequest var2, ServletResponse var3);

    <T extends AsyncListener> T createListener(Class<T> var1) throws ServletException;

    void setTimeout(long var1);

    long getTimeout();
}

SpringMVC 中的异步请求

Spring MVC 为了方便使用异步请求专门提供了 AsyncWebRequest 类型的 request,并且提供了处理异步请求的管理器 WebAsyncManager 和工具 WebAsyncUtils。

Spring MVC 将异步请求细分为了 Callable、WebAsyncTask、DeferredResult 和 ListenableFuture 四种类型。Spring MVC 专门提供了 AsyncRestTemplate 方法调用别的资源,并返回 ListenableFuture 类型。

异步请求示例

Callable

@RequestMapping("/hello")
public Callable<String> hello() {
  return new Callable<String>() {
    @Override
    public String call() throws Exception {
      sleep5s();
      return "Complete";
    }
  };
}

Callable 的处理其实是在 WebAsyncManager 内部封装成 WebAsyncTask 后再处理的。核心逻辑在返回值处理器 CallableMethodReturnValueHandler。

WebAsyncTask

@RequestMapping("/hello")
public WebAsyncTask<String> hello() {
  WebAsyncTask<String> task = new WebAsyncTask<>(() -> {
    Thread.sleep(5000);
    return "Complete";
  });
  return task;
}

WebAsyncTask 比 Callable 更好的是支持设置 timeout 和 Executor,以及支持 Timeout、Complete、Error 事件回调。

DeferredResult

DeferredResult 是 Spring 提供的一种用于封装延迟处理结果的类,当一个处理器返回
DeferredResult 类型的返回值时将启动异步处理。其含义是当结果好了 set 就可以了。

@RequestMapping("/hello")
public DeferredResult<String> hello() {
  final DeferredResult<String> result = new DeferredResult<String>(5 * 1000L, "Timeout");
  // 一般使用线程池
  new Thread(()->{
    sleep5s();
    result.setResult("Complete");
  }).start();
  return result;
}

其支持设置 timeout 和 结果处理器,以及支持 Timeout、Complete、Error 事件回调。

ListenableFuture

ListenableFuture 继承自 Future,ListenableFuture 在 Future 阻塞获取结果的基础上,增加了可以添加处理成功和处理失败回调方法的方法,代码如下:

public interface ListenableFuture<T> extends Future<T> {

   /**
    * Register the given {@code ListenableFutureCallback}.
    * @param callback the callback to register
    */
   void addCallback(ListenableFutureCallback<? super T> callback);

   /**
    * Java 8 lambda-friendly alternative with success and failure callbacks.
    * @param successCallback the success callback
    * @param failureCallback the failure callback
    * @since 4.1
    */
   void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);


   /**
    * Expose this {@link ListenableFuture} as a JDK {@link CompletableFuture}.
    * @since 5.0
    */
   default CompletableFuture<T> completable() {
      CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
      addCallback(completable::complete, completable::completeExceptionally);
      return completable;
   }

}

ListenableFutureCallback 继承自 SuccessCallback 和 FailureCallback 接口,后两个接口分别有一个 onSuccess 方法和 onFailure 方法,用于处理异步处理成功的返回值和异步处理失败的返回值。

异步请求组件

AsyncWebRequest

AsyncWebRequest 是专门处理异步请求的 request。

public interface AsyncWebRequest extends NativeWebRequest {
 
   void setTimeout(Long timeout);
  
   void dispatch();
   void startAsync(); 
   boolean isAsyncStarted();
   boolean isAsyncComplete();

   void addTimeoutHandler(Runnable runnable);
   void addCompletionHandler(Runnable runnable);
}

其实现类有两个,一个是 NoSupportAsyncWebRequest,另一个是 StandardServletAsyncWebRequest,前者不支持异步请求,所以在 Spring MVC 中实际用作异步请求的 request 是 StandardServletAsyncWebRequest。

StandardServletAsyncWebRequest 还实现了 AsyncListener 接口,其本质就是一个监听器。并且在 startAsync 方法中创建出 AsyncContext 后会将自己当作监听器注册进去。

WebAsyncManager

WebAsyncManager 是 Spring MVC 处理异步请求过程中最核心的类,它管理着整个异步
处理的过程。

WebAsyncManager 中最重要的两个方法是 startCallableProcessing 和 startDeferredResultProcessing,这两个方法是启动异步处理的入口方法,它们一共做了三件事:① 启动异步处理; ② 给 Request 设置相应属性(主要包括 timeout、timeoutHandler 和 completionHandler); ③ 在相应位置调用相应的拦截器。这里的拦截器是 Spring MVC 自己定义的。

startCallableProcessing 方法用于处理 Callable 和 WebAsyncTask 类型的异步请求,使用的拦截器类型是 CallableProcessingInterceptor,拦截器封装在 CallableInterceptorChain 类型的拦截器链中统一调用。

startDeferredResultProcessing 方法用于处理 DeferredResult 和 ListenableFuture 类型的异步请求,使用的拦截器是 DeferredResultProcessingInterceptor 拦截器,拦截器封装在 DeferredResultInterceptorChain 类型的拦截器链中统一调用。

下面是 Callable 类型的拦截器

// 用作拦截 Callable 类型的异步请求
public interface CallableProcessingInterceptor {

   static final Object RESULT_NONE = new Object();

   static final Object RESPONSE_HANDLED = new Object();

   // 并发处理前事件
   <T> void  beforeConcurrentHandling(NativeWebRequest request, Callable<T> task) throws Exception;

   <T> void preProcess(NativeWebRequest request, Callable<T> task) throws Exception;

   <T> void postProcess(NativeWebRequest request, Callable<T> task, Object concurrentResult) throws Exception;
 
   <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception;

   // Complete 事件:包含超时、网络异常、正常完成等的 Complete 事件
   <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception;

}

下面是 DeferredResult 类型的拦截器

// 用作拦截 DeferredResult 类型的异步请求
public interface DeferredResultProcessingInterceptor {

   <T> void beforeConcurrentHandling(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;

   <T> void preProcess(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;

   <T> void postProcess(NativeWebRequest request, DeferredResult<T> deferredResult, Object concurrentResult) throws Exception;

   <T> boolean handleTimeout(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;

   <T> void afterCompletion(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;

}

CallableInterceptorChain 和 DeferredResultInterceptorChain 分别用于封装两个 Interceptor,它们都是将多个相应的拦截器封装到一个 List 类型的属性,然后在相应的方法中调用所封装的 Interceptor 相应方法进行处理。

下面分析 WebAsyncManager 的核心代码:

public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {

   // 设置超时时间
   Long timeout = webAsyncTask.getTimeout();
   if (timeout != null) {
      this.asyncWebRequest.setTimeout(timeout);
   }

   // 设置线程池
   AsyncTaskExecutor executor = webAsyncTask.getExecutor();
   if (executor != null) {
      this.taskExecutor = executor;
   }

   // 设置拦截器
   List<CallableProcessingInterceptor> interceptors = new ArrayList<CallableProcessingInterceptor>();
   interceptors.add(webAsyncTask.getInterceptor());
   interceptors.addAll(this.callableInterceptors.values());
   interceptors.add(timeoutCallableInterceptor);

   final Callable<?> callable = webAsyncTask.getCallable();
   final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);

   // 添加超时请求处理器
   this.asyncWebRequest.addTimeoutHandler(new Runnable() {
      @Override
      public void run() {
         Object result = interceptorChain.triggerAfterTimeout(asyncWebRequest, callable);
         if (result != CallableProcessingInterceptor.RESULT_NONE) {
            setConcurrentResultAndDispatch(result);
         }
      }
   });

   // 添加请求完成处理器
   this.asyncWebRequest.addCompletionHandler(new Runnable() {
      @Override
      public void run() {
         interceptorChain.triggerAfterCompletion(asyncWebRequest, callable);
      }
   });
   // 执行 applu 方法
  interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
   
   // 启动异步处理
   startAsyncProcessing(processingContext);
   try {
      this.taskExecutor.submit(new Runnable() {
         @Override
         public void run() {
            Object result = null;
            try {
               interceptorChain.applyPreProcess(asyncWebRequest, callable);
               result = callable.call();
            }
            catch (Throwable ex) {
               result = ex;
            }
            finally {
               result = interceptorChain.applyPostProcess(asyncWebRequest, callable, result);
            }
            // 设置处理结果并发送请求
            setConcurrentResultAndDispatch(result);
         }
      });
   }
   catch (RejectedExecutionException ex) {
      Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
      setConcurrentResultAndDispatch(result);
      throw ex;
   }
}

concurrentResult 是 WebAsyncManager 中用来保存异步处理结果的属性,hasConcurrentResult 方法用来判断 concurrentResult 是否已经存在返回值。整个方法过程是:如果 concurrentResult 已经有返回值则直接返回,否则将传入的参数设置到 concurrentResult,然后调用 asyncWebRequest.isAsyneComplete() 检查 Request 是否已设置为异步处理完成状态(网络中断会造成 Request 设置为异步处理完成状态),如果是则保存错误日志并返回,否则调用。

asyncWebRequest.dispatch() 发送请求。Spring MVC 中异步请求处理完成后会再次发起一个相同的请求,然后在 HandlerAdapter 中使用一个特殊的 HandlerMethod 来处理它,具体过程后面再讲解,不过通过 Request 的 dispatch 方法发起的请求使用的还是原来的 Request,也就是说原来保存在 Request 中的属性不会丢失。

startDeferredResultProcessing 方法和 startCallableProcessing 方法执行过程类似,只是并没有使用 taskExecutor 来提交执行,这是因为 DeferredResult 并不需要执行处理,在后面讲了 DeferredResult 的用法后大家就明白了。

WebAsyncUtils

提供了对于 Async Manager、Request 等的操作支持。

对异步请求的支持

SpringMVC 对异步的支持可以拆分为两个部分:异步请求包装、返回结果处理。

主要处理流程可总结为:首先在处理器中返回需要启动异步处理的类型时(四种类型)相应返回值处理器会调用 WebAsyncManager 的相关方法启动异步处理,然后在 DispatcherServlet 中将原来请求直接返回,当异步处理完成后会重新发出一个相同的请求,这时在 RequestMappingHandlerAdapter 中会使用特殊的 ServletInvocableHandlerMethod 来处理请求,处理方法是:如果异步处理返回的结果是异常类型则抛出异常,否则直接返回异步处理结果,然后使用返回值处理器处理,接着返回 DispatcherServlet 中按正常流程往下处理。

异步处理完成后会重新发起一个请求,这时会重新查找 HandlerMethod 并初始化 PathVariable、MatrixVariable 等参数,重新初始化 Model 中的数据并再次执行 HandlerInterceptor 中相应的方法。这么做主要是可以复用原来的那套组件进行处理而不需要重新定义。

异步请求包装

异步请求包装的核心逻辑主要由 RequestMappingHandlerAdapter#invokeHandleMethod 方法提供:

创建 AsyncWebRequest 并初始化当前请求的 WebAsyncManager

AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);

WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

返回结果处理

如果是该请求第一次进入 DispatcherServlet,会直接通过 ServletInvocableHandlerMethod 桥接到 Controller 层获取到需要异步执行的任务(如 Callable 等),再通过 ReturnValueHandler 做异步处理后返回 null。

invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
   return null;
}

下面主要解释一下 ReturnValueHandler 的行为(以 Callable 为例):

先通过 HandlerMethodReturnValueHandlerComposite 匹配出 Callable 对应的处理器 CallableMethodReturnValueHandler,并调用 handleReturnValue 做异步处理。

// WebAsyncManager
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
      throws Exception {

   // ...设置基本属性
   
   // ... before concurrent handling
   
   startAsyncProcessing(processingContext);
   
   // ... pre process
   
   // ... process
   
   // ... post process
  
   // 异步结果计算完成后,再次发送一个请求返回数据,处理通过下述解决
   setConcurrentResultAndDispatch(result); 
  
   // ... exception process
}

如果当前请求是异步请求而且已经处理出了结果,则将异步处理结果与之前保存到 WebAsyncManager 里的 ModelAndViewContainer 取出来,并将 WebAsyncManager 里的结果清空,然后调用 ServletInvocableHandlerMethod 的 wrapConcurrentResult 方法创建 ConcurrentResultHandlerMethod 类型的 ServletInvocableHandlerMethod 来替换自己,创建出来的 ConcurrentResultHandlerMethod 并不执行请求,它的主要功能是判断异步处理的结果是不是异常类型,如果是则抛出,如果不是则使用 ReturmValueHandler 对其进行解析并返回。

if (asyncManager.hasConcurrentResult()) {
   Object result = asyncManager.getConcurrentResult();
   mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
   asyncManager.clearConcurrentResult();
   LogFormatUtils.traceDebug(logger, traceOn -> {
      String formatted = LogFormatUtils.formatValue(result, !traceOn);
      return "Resume with async result [" + formatted + "]";
   });
   invocableMethod = invocableMethod.wrapConcurrentResult(result);
}

超时、线程池等属性都可以在 RequestMappingHandlerAdapter 中设置

异步请求的问题

第一是异步请求相应的拦截器里的方法会被调用两次。这是不合适的,而且有的时候还会出问题,比如,如果用了拦截器来检查 Token,那么第一次检查通过后就会将相应内容删除,第二次再检查的时候就检查失败了。

第二个是通过 FlashMap 传递 Redirect 参数的情况。FlashMapManager 获取 FlashMap 的时候说过,每次获取后就会将相应的 FlashMap 删除,但异步请求会获取两次,如果异步处理器是 Redirect 到的结果处理器,并且使用 FlashMap 传递了参数,这种情况下如果在第二次获取 FlashMap 的时候(异步请求处理完了)正好用户又发了一个相同的请求,而且 RedirectView 已经将 FlashMap 设置到了 Session,在获取之前可能被前面的请求获取删除,导致自己获取不到,示例如下:

请求1 请求2
saveOutputFlashMap 设置到 FM1
saveOutputFlashMap 设置到 FM2
retrieveAndUpdate 获取到 FM1
retrieveAndUpdate 获取到 FM2
retrieveAndUpdate 获取到 null
retrieveAndUpdate 获取到 null

这样请求 2 设置的 FlashMap 就会被请求 1 的第二次 retrieveAndUpdate 获取到并从 Session 中删除,请求 2 就获取不到了,这样造成了两个请求的值都出了问题。