聊聊 Spring 異步任務(wù)教程
阿粉最近碰到一個(gè)場(chǎng)景,用戶注冊(cè)之后需要發(fā)送郵件給其郵箱。原先設(shè)計(jì)中,這是一個(gè)同步過程,注冊(cè)方法需要等待郵件發(fā)送成功才能返回。
由于郵件發(fā)送流程對(duì)于注冊(cè)來說并不是一個(gè)關(guān)鍵節(jié)點(diǎn),我們可以將郵件發(fā)送異步執(zhí)行,減少注冊(cè)方法執(zhí)行時(shí)間。
我們可以自己創(chuàng)建線程池,然后執(zhí)行異步任務(wù),示例代碼如下:
- // 生產(chǎn)使用線程池的最佳實(shí)踐,一定要自定義線程池,不要嫌麻煩,使用 Executors 創(chuàng)建線程池
- private ThreadPoolExecutor threadPool =
- new ThreadPoolExecutor(5,
- 10,
- 60l,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque<>(200),
- new ThreadFactoryBuilder().setNameFormat("register-%d").build());
- /**
- * 使用線程池執(zhí)行發(fā)送郵件的任務(wù)
- */
- private void sendEmailByThreadPool() {
- threadPool.submit(() -> emailService.sendEmail());
- }
ps: 生產(chǎn)使用線程池的最佳實(shí)踐,一定要自定義線程池,根據(jù)業(yè)務(wù)場(chǎng)景設(shè)置合理的線程池參數(shù),另外給線程設(shè)置具有明確意義的前綴,這樣排查問題就非常簡單。
千萬不要為了方便,使用 Executors 相關(guān)方法創(chuàng)建線程池。
上面代碼中使用線程池完成了發(fā)送郵件的異步任務(wù),可以看到這個(gè)示例還是有點(diǎn)麻煩,我們不僅要自定義線程池,還需要在創(chuàng)建相關(guān)任務(wù)執(zhí)行類。
Spring 提供執(zhí)行異步任務(wù)功能,我們使用一個(gè)注解就可以輕松完成上面的功能。
今天阿粉就來講解一下如何使用 Spring 異步任務(wù),以及 Spring 異步任務(wù)使用過程中一些注意點(diǎn)。
異步任務(wù)使用方式
Spring 異步任務(wù)需要在相關(guān)的方法上設(shè)置 @Async 注解,這里為了舉例,我們創(chuàng)建一個(gè) EmailService 類,專用完成郵件服務(wù)。
代碼如下所示:
- @Slf4j
- @Service
- public class EmailService {
- /**
- * 異步發(fā)送任務(wù)
- *
- * @throws InterruptedException
- */
- @SneakyThrows
- @Async
- public void sendEmailAsync() {
- log.info("使用 Spring 異步任務(wù)發(fā)送郵件示例");
- // 模擬郵件發(fā)送耗時(shí)
- TimeUnit.SECONDS.sleep(2l);
- }
- }
這里要注意了,Spring 異步任務(wù)默認(rèn)關(guān)閉的,我們需要使用 @EnableAsync開啟異步任務(wù)。
如果還在使用 Spring XML 配置,我們需要配置如下配置:
- <task:annotation-driven/>
上述配置完成之后,我們只需要在調(diào)用方,比如上一層 Controller 注入這個(gè) EmailService ,然后直接調(diào)用這個(gè)方法,該方法將會(huì)在異步線程中執(zhí)行。
- @Slf4j
- @RestController
- public class RegisterController {
- @Autowired
- EmailService emailService;
- @RequestMapping("register")
- public String register() {
- log.info("注冊(cè)流程開始");
- emailService.sendEmailAsync();
- return "success";
- }
- }
輸出日志如下:
從日志上可以看到,兩個(gè)方法執(zhí)行線程不一樣,這就說明了EmailService#sendEmailAsync 被異步線程成功執(zhí)行。
帶有返回值的異步任務(wù)
上面的異步任務(wù)比較簡單,但是有時(shí)我們有需要獲取異步任務(wù)返回值。
如果使用線程池執(zhí)行異步任務(wù),我們可以使用 threadPool#submit 獲取返回對(duì)象 Future,接著我們就可以調(diào)用其內(nèi) get 方法,獲取返回結(jié)果。
在 Spring 異步任務(wù)中,我們也可以使用 Future 獲取返回結(jié)果,示例代碼如下:
- @Async
- @SneakyThrows
- public Future<String> sendEmailAsyncWithResult() {
- log.info("使用 Spring 異步任務(wù)發(fā)送郵件,并且獲取任務(wù)返回結(jié)果示例");
- TimeUnit.SECONDS.sleep(2l);
- return AsyncResult.forValue("success");
- }
這里需要注意,這里返回對(duì)象我們需要使用 Spring 內(nèi)部類 AsyncResult。
Controller 層調(diào)用代碼如下所示:
- private void sendEmailWithResult() {
- Future<String> future = emailService.sendEmailAsyncWithResult();
- try {
- String result = future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
我們知道 Future#get 方法將會(huì)一直阻塞,直到異步任務(wù)執(zhí)行成功。
有時(shí)候我們獲取異步任務(wù)的返回值是為了做一下后續(xù)業(yè)務(wù),但是主流程方法是無需返回異步任務(wù)的返回值。如果我們使用了 Future#get方法,主流程就會(huì)一直被阻塞。
對(duì)于這種場(chǎng)景,我們可以使用 org.springframework.util.concurrent.ListenableFuture稍微改造一下上面的方法。
ListenableFuture 這個(gè)類允許我們注冊(cè)回調(diào)函數(shù),一旦異步任務(wù)執(zhí)行成功,或者執(zhí)行異常,將會(huì)立刻執(zhí)行回調(diào)函數(shù)。通過這種方式就可以不用阻塞執(zhí)行的主線程。
示例代碼如下:
- @Async
- @SneakyThrows
- public ListenableFuture<String> sendEmailAsyncWithListenableFuture() {
- log.info("使用 Spring 異步任務(wù)發(fā)送郵件,并且獲取任務(wù)返回結(jié)果示例");
- TimeUnit.SECONDS.sleep(2l);
- return AsyncResult.forValue("success");
- }
Controller 層代碼如下所示:
- ListenableFuture<String> listenableFuture = emailService.sendEmailAsyncWithListenableFuture();
- // 異步回調(diào)處理
- listenableFuture.addCallback(new SuccessCallback<String>() {
- @Override
- public void onSuccess(String result) {
- log.info("異步回調(diào)處理返回值");
- }
- }, new FailureCallback() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("異步回調(diào)處理異常",ex);
- }
- });
看到這里,如果有同學(xué)有疑惑,我們返回對(duì)象是 AsyncResult,為什么方法返回類可以是 Future,又可以是 ListenableFuture?
看完這張類繼承關(guān)系,大家應(yīng)該就知道答案了。
異常處理方式
異步任務(wù)中異常處理方式,不是很難,我們只要在方法中將整個(gè)代碼塊 try...catch 即可。
- try {
- // 其他代碼
- } catch (Exception e) {
- e.printStackTrace();
- }
一般來說,我們只需要捕獲 Exception 異常,就可以應(yīng)對(duì)大部分情況
但是極端情況下,比如方法內(nèi)發(fā)生 OOM,將會(huì)拋出 OutOfMemoryError。如果發(fā)生Error 錯(cuò)誤,以上的捕獲代碼就會(huì)失效。
Spring 的異步任務(wù),默認(rèn)提供幾種異常處理方式,可以統(tǒng)一處理異步任務(wù)中的發(fā)生的異常。
帶有返回值的異常處理方式
如果我們使用帶有返回值的異步任務(wù),處理方式就比較簡單了,我們只需要捕獲 Future#get 拋出的異常就好了。
- Future<String> future = emailService.sendEmailAsyncWithResult();
- try {
- String result = future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
如果我們使用 ListenableFuture 注冊(cè)回調(diào)函數(shù)處理,那我們?cè)诜椒▋?nèi)增加一個(gè) FailureCallback,在這個(gè)實(shí)現(xiàn)類處理相關(guān)異常即可。
- ListenableFuture<String> listenableFuture = emailService.sendEmailAsyncWithListenableFuture();
- // 異步回調(diào)處理
- listenableFuture.addCallback(new SuccessCallback<String>() {
- @Override
- public void onSuccess(String result) {
- log.info("異步回調(diào)處理返回值");
- }
- // 異常處理
- }, new FailureCallback() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("異步回調(diào)處理異常",ex);
- }
- });
統(tǒng)一異常處理方式
沒有返回值的異步任務(wù)處理方式就比較復(fù)雜了,我們需要繼承 AsyncConfigurerSupport,實(shí)現(xiàn) getAsyncUncaughtExceptionHandler 方法,示例代碼如下:
- @Slf4j
- @Configuration
- public class AsyncErrorHandler extends AsyncConfigurerSupport {
- @Override
- public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
- AsyncUncaughtExceptionHandler handler = (throwable, method, objects) -> {
- log.error("全局異常捕獲", throwable);
- };
- return handler;
- }
- }
ps:這個(gè)異常處理方式只能處理未帶返回值的異步任務(wù)。
異步任務(wù)使用注意點(diǎn)
異步線程池設(shè)置
Spring 異步任務(wù)默認(rèn)使用 Spring 內(nèi)部線程池 SimpleAsyncTaskExecutor 。
這個(gè)線程池比較坑爹,不會(huì)復(fù)用線程。也就是說來一個(gè)請(qǐng)求,將會(huì)新建一個(gè)線程。極端情況下,如果調(diào)用次數(shù)過多,將會(huì)創(chuàng)建大量線程。
Java 中的線程是會(huì)占用一定的內(nèi)存空間 ,所以創(chuàng)建大量的線程將會(huì)導(dǎo)致 OOM 錯(cuò)誤。
所以如果需要使用異步任務(wù),我們需要一定要使用自定義線程池替換默認(rèn)線程池。
XML 配置方式
如果當(dāng)前使用 Spring XML 配置方式,我們可以使用如下配置設(shè)置線程池:
- <task:annotation-driven/>
- <task:executor id="executor" pool-size="10" queue-capacity="200"/>
注解方式
如果注解方式配置,配置方式如下:
- @Configuration
- public class AsyncConfiguration {
- @Bean
- public ThreadPoolTaskExecutor taskExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setThreadNamePrefix("task-Executor-");
- executor.setMaxPoolSize(10);
- executor.setCorePoolSize(5);
- executor.setQueueCapacity(200);
- // 還有其他參數(shù)可以設(shè)置
- return executor;
- }
- }
只要我們配置了這個(gè)線程池Bean,Spring 的異步任務(wù)都將會(huì)使用該線程池執(zhí)行。
如果我們應(yīng)用配置了多個(gè)線程池Bean,異步任務(wù)需要指定使用某個(gè)線程池執(zhí)行,我們只需要在 @Async注解上設(shè)置相應(yīng) Bean 的名字即可。示例代碼如下:
- @Async("taskExecutor")
- public void sendEmailAsync() {
- log.info("使用 Spring 異步任務(wù)發(fā)送郵件示例");
- TimeUnit.SECONDS.sleep(2l);
- }
Spring Boot 方式
如果是 SpringBoot 項(xiàng)目,從阿粉的測(cè)試情況來看,默認(rèn)將會(huì)創(chuàng)建核心線程數(shù)為 8,最大線程數(shù)為 Integer.MAX_VALUE,隊(duì)列數(shù)也為 Integer.MAX_VALUE線程池。
雖然上面的線程池不用擔(dān)心創(chuàng)建過多線程的問題,不是還是有可能隊(duì)列任務(wù)過多,導(dǎo)致 OOM 的問題。所以還是建議使用自定義線程池嗎,或者在配置文件修改默認(rèn)配置,例如:
- spring.task.execution.pool.core-size=10
- spring.task.execution.pool.max-size=20
- spring.task.execution.pool.queue-capacity=200
ps:如果我們使用注解方式自定義了一個(gè)線程池,那么 Spring 異步任務(wù)都將會(huì)使用這個(gè)線程池。通過 SpringBoot 配置文件創(chuàng)建的線程池將會(huì)失效。
異步方法失效
Spring 異步任務(wù)背后原理是使用 AOP ,而使用 Spring AOP 時(shí)我們需要注意,切勿在方法內(nèi)部調(diào)用其他使用 AOP 的方法,可能有點(diǎn)拗口,我們來看下代碼:
- @Async
- @SneakyThrows
- public ListenableFuture<String> sendEmailAsyncWithListenableFuture() {
- // 這樣調(diào)用,sendEmailAsync 不會(huì)異步執(zhí)行
- sendEmailAsync();
- log.info("使用 Spring 異步任務(wù)發(fā)送郵件,并且獲取任務(wù)返回結(jié)果示例");
- TimeUnit.SECONDS.sleep(2l);
- return AsyncResult.forValue("success");
- }
- /**
- * 異步發(fā)送任務(wù)
- *
- * @throws InterruptedException
- */
- @SneakyThrows
- @Async("taskExecutor")
- public void sendEmailAsync() {
- log.info("使用 Spring 異步任務(wù)發(fā)送郵件示例");
- TimeUnit.SECONDS.sleep(2l);
- }
上面兩個(gè)方法都處于同一個(gè)類中,這樣調(diào)用將會(huì)導(dǎo)致 AOP 失效,無法起到 AOP 的效果。
其他類似的 @Transactional,以及自定義的 AOP 注解都會(huì)有這個(gè)問題,大家使用過程,千萬需要注意這一點(diǎn)。
總結(jié)
Spring 異步任務(wù)幫我們大大解決簡化開發(fā)了流程,只要使用一個(gè)@Async就可以輕松解決異步任務(wù)。
不過,雖然使用方式比較簡單,大家使用過程一定要注意設(shè)置合理的線程池。