SpringBoot的四種異步處理,寫這篇文章,我自己先學(xué)到了
本文轉(zhuǎn)載自微信公眾號(hào)「程序新視界」,作者丑胖俠二師兄。轉(zhuǎn)載本文請(qǐng)聯(lián)系程序新視界公眾號(hào)。
前言
在網(wǎng)絡(luò)上有關(guān)于SpringBoot的異步請(qǐng)求和異步調(diào)有兩種說(shuō)法,經(jīng)過(guò)調(diào)用這兩種說(shuō)法本質(zhì)上就是一回事,在《異步請(qǐng)求和異步調(diào)用有區(qū)別?》一種,已經(jīng)做過(guò)解釋了。
同時(shí),我們也知道了“服務(wù)實(shí)現(xiàn)的異步與同步特性完全獨(dú)立于客戶端調(diào)用的異步和同步特性。也就是說(shuō)客戶端可以異步的去調(diào)用同步服務(wù),而且客戶端也可以同步的去調(diào)用異步服務(wù)。”
本篇文章我們以SpringBoot中異步的使用(包括:異步調(diào)用和異步方法兩個(gè)維度)來(lái)進(jìn)行講解。
異步請(qǐng)求與同步請(qǐng)求
我們先通過(guò)一張圖來(lái)區(qū)分一下異步請(qǐng)求和同步請(qǐng)求的區(qū)別:
異步與同步
在上圖中有三個(gè)角色:客戶端、Web容器和業(yè)務(wù)處理線程。
兩個(gè)流程中客戶端對(duì)Web容器的請(qǐng)求,都是同步的。因?yàn)樗鼈冊(cè)谡?qǐng)求客戶端時(shí)都處于阻塞等待狀態(tài),并沒(méi)有進(jìn)行異步處理。
在Web容器部分,第一個(gè)流程采用同步請(qǐng)求,第二個(gè)流程采用異步回調(diào)的形式。
通過(guò)異步處理,可以先釋放容器分配給請(qǐng)求的線程與相關(guān)資源,減輕系統(tǒng)負(fù)擔(dān),從而增加了服務(wù)器對(duì)客戶端請(qǐng)求的吞吐量。但并發(fā)請(qǐng)求量較大時(shí),通常會(huì)通過(guò)負(fù)載均衡的方案來(lái)解決,而不是異步。
Servlet3.0中的異步
Servlet 3.0之前,Servlet采用Thread-Per-Request的方式處理請(qǐng)求,即每一次Http請(qǐng)求都由一個(gè)線程從頭到尾處理。當(dāng)涉及到耗時(shí)操作時(shí),性能問(wèn)題便比較明顯。
Servlet 3.0中提供了異步處理請(qǐng)求??梢韵柔尫湃萜鞣峙浣o請(qǐng)求的線程與相關(guān)資源,減輕系統(tǒng)負(fù)擔(dān),從而增加服務(wù)的吞吐量。
Servlet 3.0的異步是通過(guò)AsyncContext對(duì)象來(lái)完成的,它可以從當(dāng)前線程傳給另一個(gè)線程,并歸還初始線程。新的線程處理完業(yè)務(wù)可以直接返回結(jié)果給客戶端。
AsyncContext對(duì)象可以從HttpServletRequest中獲?。?/p>
- @RequestMapping("/async")
- public void async(HttpServletRequest request) {
- AsyncContext asyncContext = request.getAsyncContext();
- }
在AsyncContext中提供了獲取ServletRequest、ServletResponse和添加監(jiān)聽(tīng)(addListener)等功能:
- public interface AsyncContext {
- ServletRequest getRequest();
- ServletResponse getResponse();
- void addListener(AsyncListener var1);
- void setTimeout(long var1);
- // 省略其他方法
- }
不僅可以通過(guò)AsyncContext獲取Request和Response等信息,還可以設(shè)置異步處理超時(shí)時(shí)間。通常,超時(shí)時(shí)間(單位毫秒)是需要設(shè)置的,不然無(wú)限等下去不就與同步處理一樣了。
通過(guò)AsyncContext的addListener還可以添加監(jiān)聽(tīng)事件,用來(lái)處理異步線程的開(kāi)始、完成、異常、超時(shí)等事件回調(diào)。
addListener方法的參數(shù)AsyncListener的源碼如下:
- public interface AsyncListener extends EventListener {
- // 異步執(zhí)行完畢時(shí)調(diào)用
- void onComplete(AsyncEvent var1) throws IOException;
- // 異步線程執(zhí)行超時(shí)調(diào)用
- void onTimeout(AsyncEvent var1) throws IOException;
- // 異步線程出錯(cuò)時(shí)調(diào)用
- void onError(AsyncEvent var1) throws IOException;
- // 異步線程開(kāi)始時(shí)調(diào)用
- void onStartAsync(AsyncEvent var1) throws IOException;
- }
通常,異?;虺瑫r(shí)時(shí)返回調(diào)用方錯(cuò)誤信息,而異常時(shí)會(huì)處理一些清理和關(guān)閉操作或記錄異常日志等。
基于Servlet方式實(shí)現(xiàn)異步請(qǐng)求
下面直接看一個(gè)基于Servlet方式的異步請(qǐng)求示例:
- @GetMapping(value = "/email/send")
- public void servletReq(HttpServletRequest request) {
- AsyncContext asyncContext = request.startAsync();
- // 設(shè)置監(jiān)聽(tīng)器:可設(shè)置其開(kāi)始、完成、異常、超時(shí)等事件的回調(diào)處理
- asyncContext.addListener(new AsyncListener() {
- @Override
- public void onTimeout(AsyncEvent event) {
- System.out.println("處理超時(shí)了...");
- }
- @Override
- public void onStartAsync(AsyncEvent event) {
- System.out.println("線程開(kāi)始執(zhí)行");
- }
- @Override
- public void onError(AsyncEvent event) {
- System.out.println("執(zhí)行過(guò)程中發(fā)生錯(cuò)誤:" + event.getThrowable().getMessage());
- }
- @Override
- public void onComplete(AsyncEvent event) {
- System.out.println("執(zhí)行完成,釋放資源");
- }
- });
- //設(shè)置超時(shí)時(shí)間
- asyncContext.setTimeout(6000);
- asyncContext.start(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(5000);
- System.out.println("內(nèi)部線程:" + Thread.currentThread().getName());
- asyncContext.getResponse().getWriter().println("async processing");
- } catch (Exception e) {
- System.out.println("異步處理發(fā)生異常:" + e.getMessage());
- }
- // 異步請(qǐng)求完成通知,整個(gè)請(qǐng)求完成
- asyncContext.complete();
- }
- });
- //此時(shí)request的線程連接已經(jīng)釋放了
- System.out.println("主線程:" + Thread.currentThread().getName());
- }
啟動(dòng)項(xiàng)目,訪問(wèn)對(duì)應(yīng)的URL,打印日志如下:
- 主線程:http-nio-8080-exec-4
- 內(nèi)部線程:http-nio-8080-exec-5
- 執(zhí)行完成,釋放資源
可以看出,上述代碼先執(zhí)行完了主線程,也就是程序的最后一行代碼的日志打印,然后才是內(nèi)部線程的執(zhí)行。內(nèi)部線程執(zhí)行完成,AsyncContext的onComplete方法被調(diào)用。
如果通過(guò)瀏覽器訪問(wèn)對(duì)應(yīng)的URL,還可以看到該方法的返回值“async processing”。說(shuō)明內(nèi)部線程的結(jié)果同樣正常的返回到客戶端了。
基于Spring實(shí)現(xiàn)異步請(qǐng)求
基于Spring可以通過(guò)Callable、DeferredResult或者WebAsyncTask等方式實(shí)現(xiàn)異步請(qǐng)求。
基于Callable實(shí)現(xiàn)
對(duì)于一次請(qǐng)求(/email),基于Callable的處理流程如下:
1、Spring MVC開(kāi)啟副線程處理業(yè)務(wù)(將Callable提交到TaskExecutor);
2、DispatcherServlet和所有的Filter退出Web容器的線程,但是response保持打開(kāi)狀態(tài);
3、Callable返回結(jié)果,SpringMVC將原始請(qǐng)求重新派發(fā)給容器(再重新請(qǐng)求一次/email),恢復(fù)之前的處理;
4、DispatcherServlet重新被調(diào)用,將結(jié)果返回給用戶;
代碼實(shí)現(xiàn)示例如下:
- @GetMapping("/email")
- public Callable<String> order() {
- System.out.println("主線程開(kāi)始:" + Thread.currentThread().getName());
- Callable<String> result = () -> {
- System.out.println("副線程開(kāi)始:" + Thread.currentThread().getName());
- Thread.sleep(1000);
- System.out.println("副線程返回:" + Thread.currentThread().getName());
- return "success";
- };
- System.out.println("主線程返回:" + Thread.currentThread().getName());
- return result;
- }
訪問(wèn)對(duì)應(yīng)URL,控制臺(tái)輸入日志如下:
- 主線程開(kāi)始:http-nio-8080-exec-1
- 主線程返回:http-nio-8080-exec-1
- 副線程開(kāi)始:task-1
- 副線程返回:task-1
通過(guò)日志可以看出,主線程已經(jīng)完成了,副線程才進(jìn)行執(zhí)行。同時(shí),URL返回結(jié)果“success”。這也說(shuō)明一個(gè)問(wèn)題,服務(wù)器端的異步處理對(duì)客戶端來(lái)說(shuō)是不可見(jiàn)的。
Callable默認(rèn)使用SimpleAsyncTaskExecutor類來(lái)執(zhí)行,這個(gè)類非常簡(jiǎn)單而且沒(méi)有重用線程。在實(shí)踐中,需要使用AsyncTaskExecutor類來(lái)對(duì)線程進(jìn)行配置。
這里通過(guò)實(shí)現(xiàn)WebMvcConfigurer接口來(lái)完成線程池的配置。
- @Configuration
- public class WebConfig implements WebMvcConfigurer {
- @Resource
- private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
- /**
- * 配置線程池
- */
- @Bean(name = "asyncPoolTaskExecutor")
- public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
- ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
- taskExecutor.setCorePoolSize(2);
- taskExecutor.setMaxPoolSize(10);
- taskExecutor.setQueueCapacity(25);
- taskExecutor.setKeepAliveSeconds(200);
- taskExecutor.setThreadNamePrefix("thread-pool-");
- // 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認(rèn)為后者
- taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- taskExecutor.initialize();
- return taskExecutor;
- }
- @Override
- public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
- // 處理callable超時(shí)
- configurer.setDefaultTimeout(60 * 1000);
- configurer.setTaskExecutor(myThreadPoolTaskExecutor);
- configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
- }
- @Bean
- public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
- return new TimeoutCallableProcessingInterceptor();
- }
- }
為了驗(yàn)證打印的線程,我們將實(shí)例代碼中的System.out.println替換成日志輸出,會(huì)發(fā)現(xiàn)在使用線程池之前,打印日志如下:
- 2021-02-21 09:45:37.144 INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主線程開(kāi)始:http-nio-8080-exec-1
- 2021-02-21 09:45:37.144 INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主線程返回:http-nio-8080-exec-1
- 2021-02-21 09:45:37.148 INFO 8312 --- [ task-1] c.s.learn.controller.AsynController : 副線程開(kāi)始:task-1
- 2021-02-21 09:45:38.153 INFO 8312 --- [ task-1] c.s.learn.controller.AsynController : 副線程返回:task-1
線程名稱為“task-1”。讓線程池生效之后,打印日志如下:
- 2021-02-21 09:50:28.950 INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主線程開(kāi)始:http-nio-8080-exec-1
- 2021-02-21 09:50:28.951 INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主線程返回:http-nio-8080-exec-1
- 2021-02-21 09:50:28.955 INFO 8339 --- [ thread-pool-1] c.s.learn.controller.AsynController : 副線程開(kāi)始:thread-pool-1
- 2021-02-21 09:50:29.956 INFO 8339 --- [ thread-pool-1] c.s.learn.controller.AsynController : 副線程返回:thread-pool-1
線程名稱為“thread-pool-1”,其中前面的“thread-pool”正是我們配置的線程池前綴。
除了線程池的配置,還可以配置統(tǒng)一異常處理,這里就不再演示了。
基于WebAsyncTask實(shí)現(xiàn)
Spring提供的WebAsyncTask是對(duì)Callable的包裝,提供了更強(qiáng)大的功能,比如:處理超時(shí)回調(diào)、錯(cuò)誤回調(diào)、完成回調(diào)等。
- @GetMapping("/webAsyncTask")
- public WebAsyncTask<String> webAsyncTask() {
- log.info("外部線程:" + Thread.currentThread().getName());
- WebAsyncTask<String> result = new WebAsyncTask<>(60 * 1000L, new Callable<String>() {
- @Override
- public String call() {
- log.info("內(nèi)部線程:" + Thread.currentThread().getName());
- return "success";
- }
- });
- result.onTimeout(new Callable<String>() {
- @Override
- public String call() {
- log.info("timeout callback");
- return "timeout callback";
- }
- });
- result.onCompletion(new Runnable() {
- @Override
- public void run() {
- log.info("finish callback");
- }
- });
- return result;
- }
訪問(wèn)對(duì)應(yīng)請(qǐng)求,打印日志:
- 2021-02-21 10:22:33.028 INFO 8547 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 外部線程:http-nio-8080-exec-1
- 2021-02-21 10:22:33.033 INFO 8547 --- [ thread-pool-1] c.s.learn.controller.AsynController : 內(nèi)部線程:thread-pool-1
- 2021-02-21 10:22:33.055 INFO 8547 --- [nio-8080-exec-2] c.s.learn.controller.AsynController : finish callback
基于DeferredResult實(shí)現(xiàn)
DeferredResult使用方式與Callable類似,但在返回結(jié)果時(shí)不一樣,它返回的時(shí)實(shí)際結(jié)果可能沒(méi)有生成,實(shí)際的結(jié)果可能會(huì)在另外的線程里面設(shè)置到DeferredResult中去。
DeferredResult的這個(gè)特性對(duì)實(shí)現(xiàn)服務(wù)端推技術(shù)、訂單過(guò)期時(shí)間處理、長(zhǎng)輪詢、模擬MQ的功能等高級(jí)應(yīng)用非常重要。
關(guān)于DeferredResult的使用先來(lái)看一下官方的例子和說(shuō)明:
- @RequestMapping("/quotes")
- @ResponseBody
- public DeferredResult<String> quotes() {
- DeferredResult<String> deferredResult = new DeferredResult<String>();
- // Save the deferredResult in in-memory queue ...
- return deferredResult;
- }
- // In some other thread...
- deferredResult.setResult(data);
上述示例中我們可以發(fā)現(xiàn)DeferredResult的調(diào)用并不一定在Spring MVC當(dāng)中,它可以是別的線程。官方的解釋也是如此:
In this case the return value will also be produced from a separate thread. However, that thread is not known to Spring MVC. For example the result may be produced in response to some external event such as a JMS message, a scheduled task, etc.
也就是說(shuō),DeferredResult返回的結(jié)果也可能是由MQ、定時(shí)任務(wù)或其他線程觸發(fā)。來(lái)個(gè)實(shí)例:
- @Controller
- @RequestMapping("/async/controller")
- public class AsyncHelloController {
- private List<DeferredResult<String>> deferredResultList = new ArrayList<>();
- @ResponseBody
- @GetMapping("/hello")
- public DeferredResult<String> helloGet() throws Exception {
- DeferredResult<String> deferredResult = new DeferredResult<>();
- //先存起來(lái),等待觸發(fā)
- deferredResultList.add(deferredResult);
- return deferredResult;
- }
- @ResponseBody
- @GetMapping("/setHelloToAll")
- public void helloSet() throws Exception {
- // 讓所有hold住的請(qǐng)求給與響應(yīng)
- deferredResultList.forEach(d -> d.setResult("say hello to all"));
- }
- }
第一個(gè)請(qǐng)求/hello,會(huì)先將deferredResult存起來(lái),前端頁(yè)面是一直等待(轉(zhuǎn)圈)狀態(tài)。直到發(fā)第二個(gè)請(qǐng)求:setHelloToAll,所有的相關(guān)頁(yè)面才會(huì)有響應(yīng)。
整個(gè)執(zhí)行流程如下:
- controller返回一個(gè)DeferredResult,把它保存到內(nèi)存里或者List里面(供后續(xù)訪問(wèn));
- Spring MVC調(diào)用request.startAsync(),開(kāi)啟異步處理;與此同時(shí)將DispatcherServlet里的攔截器、Filter等等都馬上退出主線程,但是response仍然保持打開(kāi)的狀態(tài);
- 應(yīng)用通過(guò)另外一個(gè)線程(可能是MQ消息、定時(shí)任務(wù)等)給DeferredResult#setResult值。然后SpringMVC會(huì)把這個(gè)請(qǐng)求再次派發(fā)給servlet容器;
- DispatcherServlet再次被調(diào)用,然后處理后續(xù)的標(biāo)準(zhǔn)流程;
通過(guò)上述流程可以發(fā)現(xiàn):利用DeferredResult可實(shí)現(xiàn)一些長(zhǎng)連接的功能,比如當(dāng)某個(gè)操作是異步時(shí),可以先保存對(duì)應(yīng)的DeferredResult對(duì)象,當(dāng)異步通知回來(lái)時(shí),再找到這個(gè)DeferredResult對(duì)象,在setResult處理結(jié)果即可。從而提高性能。
SpringBoot中的異步實(shí)現(xiàn)
在SpringBoot中將一個(gè)方法聲明為異步方法非常簡(jiǎn)單,只需兩個(gè)注解即可@EnableAsync和@Async。其中@EnableAsync用于開(kāi)啟SpringBoot支持異步的功能,用在SpringBoot的啟動(dòng)類上。@Async用于方法上,標(biāo)記該方法為異步處理方法。
需要注意的是@Async并不支持用于被@Configuration注解的類的方法上。同一個(gè)類中,一個(gè)方法調(diào)用另外一個(gè)有@Async的方法,注解也是不會(huì)生效的。
@EnableAsync的使用示例:
- @SpringBootApplication
- @EnableAsync
- public class App {
- public static void main(String[] args) {
- SpringApplication.run(App.class, args);
- }
- }
@Async的使用示例:
- @Service
- public class SyncService {
- @Async
- public void asyncEvent() {
- // 業(yè)務(wù)處理
- }
- }
@Async注解的使用與Callable有類似之處,在默認(rèn)情況下使用的都是SimpleAsyncTaskExecutor線程池,可參考Callable中的方式來(lái)自定義線程池。
下面通過(guò)一個(gè)實(shí)例來(lái)驗(yàn)證一下,啟動(dòng)類上使用@EnableAsync,然后定義Controller類:
- @RestController
- public class IndexController {
- @Resource
- private UserService userService;
- @RequestMapping("/async")
- public String async(){
- System.out.println("--IndexController--1");
- userService.sendSms();
- System.out.println("--IndexController--4");
- return "success";
- }
- }
定義Service及異步方法:
- @Service
- public class UserService {
- @Async
- public void sendSms(){
- System.out.println("--sendSms--2");
- IntStream.range(0, 5).forEach(d -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- System.out.println("--sendSms--3");
- }
- }
如果先注釋掉@EnableAsync和@Async注解,即正常情況下的業(yè)務(wù)請(qǐng)求,打印日志為:
- --IndexController--1
- --sendSms--2
- --sendSms--3
- --IndexController--4
使用@EnableAsync和@Async注解時(shí),打印日志如下:
- --IndexController--1
- --IndexController--4
- --sendSms--2
- --sendSms--3
通過(guò)日志的對(duì)比我們可以看出,使用了@Async的方法,會(huì)被當(dāng)成一個(gè)子線程。所以,整個(gè)sendSms方法會(huì)在主線程執(zhí)行完了之后執(zhí)行。
這樣的效果是不是跟我們上面使用的其他形式的異步異曲同工?所以在文章最開(kāi)始已經(jīng)說(shuō)到,網(wǎng)絡(luò)上所謂的“異步調(diào)用與異步請(qǐng)求的區(qū)別”是并不存儲(chǔ)在的,本質(zhì)上都是一回事,只不過(guò)實(shí)現(xiàn)形式不同而已。這里所提到異步方法,也就是將方法進(jìn)行異步處理而已。
@Async、WebAsyncTask、Callable、DeferredResult的區(qū)別
所在的包不同:
- @Async:org.springframework.scheduling.annotation;
- WebAsyncTask:org.springframework.web.context.request.async;
- Callable:java.util.concurrent;
- DeferredResult:org.springframework.web.context.request.async;
通過(guò)所在的包,我們應(yīng)該隱隱約約感到一些區(qū)別,比如@Async是位于scheduling包中,而WebAsyncTask和DeferredResult是用于Web(Spring MVC)的,而Callable是用于concurrent(并發(fā))處理的。
對(duì)于Callable,通常用于Controller方法的異步請(qǐng)求,當(dāng)然也可以用于替換Runable的方式。在方法的返回上與正常的方法有所區(qū)別:
- // 普通方法
- public String aMethod(){
- }
- // 對(duì)照Callable方法
- public Callable<String> aMethod(){
- }
而WebAsyncTask是對(duì)Callable的封裝,提供了一些事件回調(diào)的處理,本質(zhì)上區(qū)別不大。
DeferredResult使用方式與Callable類似,重點(diǎn)在于跨線程之間的通信。
@Async也是替換Runable的一種方式,可以代替我們自己創(chuàng)建線程。而且適用的范圍更廣,并不局限于Controller層,而可以是任何層的方法上。
當(dāng)然,大家也可以從返回結(jié)果,異常處理等角度來(lái)分析一下,這里就不再展開(kāi)了。
小結(jié)
經(jīng)過(guò)上述的一步步分析,大家應(yīng)該對(duì)于Servlet3.0及Spring中對(duì)異步處理有所了解。當(dāng)了解了這些基礎(chǔ)理論,實(shí)戰(zhàn)實(shí)例,使用方法及注意事項(xiàng)之后,想必更能夠?qū)W(wǎng)絡(luò)上的相關(guān)知識(shí)能夠進(jìn)一步的去偽存真。
盡信書則不如無(wú)書,帶大家一起學(xué)習(xí),一起研究,一起去偽存真,追求真正有用的知識(shí)。
參考文章:
https://blog.csdn.net/f641385712/article/details/88692534
https://blog.lqdev.cn/2018/08/16/springboot/chapter-twenty/
https://blog.lqdev.cn/2018/08/17/springboot/chapter-twenty-one/
https://cloud.tencent.com/developer/article/1559230
https://docs.spring.io/spring-framework/docs/3.2.1.RELEASE/spring-framework-reference/html/mvc.html
https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/annotation/Async.html
https://www.cnblogs.com/xuwenjin/p/8858050.html
https://stackoverflow.com/questions/17167020/when-to-use-spring-async-vs-callable-controller-async-controller-servlet-3
https://stackoverflow.com/questions/17855852/difference-between-spring-mvcs-async-deferredresult-and-callable