你用錯(cuò)了!詳解SpringBoot異步任務(wù)&任務(wù)調(diào)度&異步請(qǐng)求線程池的使用及原理
環(huán)境:SpringBoot2.7.12
1. 簡介
異步任務(wù): 它允許將耗時(shí)的任務(wù)異步執(zhí)行,從而提高系統(tǒng)的并發(fā)能力和響應(yīng)速度。異步任務(wù)可以在啟動(dòng)類上使用注解@EnableAsync進(jìn)行啟用,并在需要異步執(zhí)行的任務(wù)上使用@Async標(biāo)注該方法為異步任務(wù)。通過這種方式,可以快速地在SpringBoot應(yīng)用程序中實(shí)現(xiàn)異步處理。
任務(wù)調(diào)度: 是SpringBoot中用于管理和執(zhí)行任務(wù)的機(jī)制。通過任務(wù)調(diào)用,可以輕松地調(diào)度、并行處理任務(wù)??梢栽趩?dòng)類上添加@EnableScheduling注解進(jìn)行開啟。在需要執(zhí)行的調(diào)度方法上使用@Scheduled注解。
異步請(qǐng)求: 是Web請(qǐng)求的一種處理方式,它允許后端在接收到請(qǐng)求后,新開一個(gè)線程來執(zhí)行業(yè)務(wù)邏輯,釋放請(qǐng)求線程,避免請(qǐng)求線程被大量耗時(shí)的請(qǐng)求沾滿,導(dǎo)致服務(wù)不可用。在SpringBoot中,異步請(qǐng)求可以通過Controller的返回值來控制,支持多種類型。
以上不管是任務(wù)還是請(qǐng)求都會(huì)在一個(gè)異步線程中執(zhí)行,這異步線程是使用的同一個(gè)嗎?還是各自都有各自的線程池?接下來將通過源碼的方式分析他們各自使用的線程池及自定義線程池。
2. 源碼分析
2.1 異步任務(wù)(@EnableAsync)
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}
核心類AsyncConfigurationSelector
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
// 向容器注冊(cè)ProxyAsyncConfiguration
return new String[] {ProxyAsyncConfiguration.class.getName()};
// ...
}
}
}
ProxyAsyncConfiguration配置類中主要就是注冊(cè)了BeanPostProcessor用來處理@Async注解的方法。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
// 處理器,該處理器獲取父類中的Supplier<Executor>
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
// ...
return bpp;
}
}
AbstractAsyncConfiguration
@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable
protected Supplier<Executor> executor;
// 自動(dòng)裝配AsyncConfigurer,我們可以通過實(shí)現(xiàn)該接口來實(shí)現(xiàn)自己的Executor
// 默認(rèn)情況下系統(tǒng)也沒有提供默認(rèn)的AsyncConfigurer.
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
// 如果存在多個(gè)將拋出異常(也就相當(dāng)于容器中有多個(gè)Executor)
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
// 調(diào)用AsyncConfigurer#getAsyncExecutor方法,默認(rèn)返回的null
// 所以最終這里返回的null
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
}
private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {
return () -> {
AsyncConfigurer configurer = supplier.get();
return (configurer != null ? provider.apply(configurer) : null);
};
}
}
AsyncConfigurer
public interface AsyncConfigurer {
@Nullable
default Executor getAsyncExecutor() {
return null;
}
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
到此,默認(rèn)情況下通過ProxyAsyncConfiguration是不能得到Executor執(zhí)行器,也就是到目前為止AsyncAnnotationBeanPostProcessor中的線程池對(duì)象還是null。接下來進(jìn)入到AsyncAnnotationBeanPostProcessor處理器中。
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
public void setBeanFactory(BeanFactory beanFactory) {
// 創(chuàng)建切面類,而這里的executor根據(jù)上文的分析還是null。
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
// ...
}
}
AsyncAnnotationAdvisor
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 構(gòu)建通知類,這里傳入的executor為null
this.advice = buildAdvice(executor, exceptionHandler);
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler)
// 創(chuàng)建攔截器
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
// executor為null, configure是父類(AsyncExecutionAspectSupport )方法
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
// 這個(gè)方法非常重要,我們的AnnotationAsyncExecutionInterceptor 并沒有
// 被容器管理,所以內(nèi)部的BeanFactory對(duì)象沒法注入是null,所以進(jìn)行了設(shè)置
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
}
AsyncExecutionAspectSupport
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 這里的defaultExecutor獲取到d餓還是null,所以這里提供了默認(rèn)的獲取Executor的方法
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
}
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 從容器中查找TaskExecutor類型的Bean對(duì)象
// 由于SpringBoot提供了一個(gè)自動(dòng)配置TaskExecutionAutoConfiguration
// 所以這里直接就返回了
return beanFactory.getBean(TaskExecutor.class);
}
// 如果容器中有多個(gè)TaskExecutor,那么將會(huì)拋出下面這個(gè)異常
catch (NoUniqueBeanDefinitionException ex) {
try {
// 獲取beanName=taskExecutor 獲取Executor
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
// 如果沒有這樣的Bean則結(jié)束了
catch (NoSuchBeanDefinitionException ex2) {
}
}
// 如果容器中沒有TaskExecutor類型的Bean拋出該異常
catch (NoSuchBeanDefinitionException ex) {
try {
// 這里與上面一樣了
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
}
}
// 最終返回null,那么程序最終也會(huì)報(bào)錯(cuò)。
return null;
}
}
如果Executor返回的null,那么最終程序拋出異常
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport {
public Object invoke(final MethodInvocation invocation) throws Throwable {
// ...
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
}
}
到此應(yīng)該說清楚了系統(tǒng)是如何查找Executor執(zhí)行器的(線程池對(duì)象)。接下來通過實(shí)例演示:
@Service
public class AsyncService {
@Async
public void async() {
System.out.printf("%s - 異步任務(wù)執(zhí)行...%n", Thread.currentThread().getName()) ;
}
}
默認(rèn)情況
taskExecutor-2 - 異步任務(wù)執(zhí)行...
可以通過如下配置修改默認(rèn)配置
spring:
task:
execution:
pool:
core-size: 2
max-size: 2
allow-core-thread-timeout: false
thread-name-prefix: pack-
輸出
pack-2 - 異步任務(wù)執(zhí)行...
自定義TaskExecutor
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心線程數(shù)
pool.setMaxPoolSize(10);//最大線程數(shù)
pool.setQueueCapacity(25);//線程隊(duì)列
pool.setThreadNamePrefix("pack-custom-") ;
pool.initialize();//線程初始化
return pool;
}
根據(jù)上面源碼的分析,也可以自定義AsyncConfigurer
@Component
public class PackAsyncConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
return new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)) ;
}
}
以上就是關(guān)于異步任務(wù)的源碼分析及自定義實(shí)現(xiàn)。
2.2 異步請(qǐng)求
在SpringMVC中我們可以將Controller接口的返回值定義為:DeferredResult、Callable、Reactive Types等。只要返回值是這些,我們耗時(shí)的代碼就可以放到一個(gè)異步的線程中執(zhí)行。這里我們一Callable為例,先看看效果
@GetMapping("/callable")
public Callable<Map<String, Object>> callable() {
long start = System.currentTimeMillis() ;
System.out.printf("%s - 開始時(shí)間:%d%n", Thread.currentThread().getName(), start) ;
Callable<Map<String, Object>> callable = new Callable<Map<String, Object>>() {
public Map<String, Object> call() throws Exception {
Map<String, Object> result = new HashMap<>() ;
try {
// 這里模擬耗時(shí)操作
TimeUnit.SECONDS.sleep(1) ;
// 將執(zhí)行結(jié)果保存
result.put("code", 1) ;
result.put("data", "你的業(yè)務(wù)數(shù)據(jù)") ;
System.out.println(Thread.currentThread().getName()) ;
} catch (InterruptedException e) {}
return result ;
}
} ;
long end = System.currentTimeMillis() ;
System.out.printf("%s - 結(jié)束時(shí)間:%d%n", Thread.currentThread().getName(), end) ;
System.out.printf("總耗時(shí):%d毫秒%n", (end - start)) ;
return callable ;
}
執(zhí)行結(jié)果
http-nio-8882-exec-4 - 開始時(shí)間:1705560641226
http-nio-8882-exec-4 - 結(jié)束時(shí)間:1705560641227
總耗時(shí):1毫秒
pack-2
根據(jù)最后一行的輸出,我們?cè)谂渲梦募信渲玫纳Я恕R簿褪钱惒秸?qǐng)求與異步任務(wù)這一點(diǎn)是相同的,默認(rèn)會(huì)使用系統(tǒng)默認(rèn)提供的線程池(上面介紹了默認(rèn)的自動(dòng)配置)。
源碼分析
在SpringMVC中具體Controller接口的方法的調(diào)用是通過HandlerMapping,而這個(gè)具體實(shí)現(xiàn)是RequestMappingHandlerAdapter,所以我們就先從這里開始看
public class RequestMappingHandlerAdapter {
// 根據(jù)輸出實(shí)際并沒有使用這里默認(rèn)的
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("MvcAsync");
protected ModelAndView handleInternal(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
invokeHandlerMethod(request, response, handlerMethod);
}
protected ModelAndView invokeHandlerMethod(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
// ...
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
// 設(shè)置異步任務(wù)執(zhí)行器(線程池)
asyncManager.setTaskExecutor(this.taskExecutor);
// ...
}
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
既然沒有使用默認(rèn)的線程池對(duì)象,那么這里是如果設(shè)置的系統(tǒng)默認(rèn)TaskExecutor? 這是在容器實(shí)例化RequestMappingHandlerAdapter Bean對(duì)象時(shí)設(shè)置。
public class WebMvcConfigurationSupport {
@Bean
public RequestMappingHandlerAdapter requestMappingHandlerAdapter(...) {
RequestMappingHandlerAdapter adapter = createRequestMappingHandlerAdapter();
// ...
AsyncSupportConfigurer configurer = getAsyncSupportConfigurer();
if (configurer.getTaskExecutor() != null) {
adapter.setTaskExecutor(configurer.getTaskExecutor());
}
// ...
return adapter;
}
protected AsyncSupportConfigurer getAsyncSupportConfigurer() {
// 默認(rèn)為null
if (this.asyncSupportConfigurer == null) {
this.asyncSupportConfigurer = new AsyncSupportConfigurer();
// 調(diào)用子類DelegatingWebMvcConfiguration方法(子類重寫了)
configureAsyncSupport(this.asyncSupportConfigurer);
}
return this.asyncSupportConfigurer;
}
}
// 子類
public class DelegatingWebMvcConfiguration extends WebMvcConfigurationSupport {
private final WebMvcConfigurerComposite configurers = new WebMvcConfigurerComposite();
// 獲取當(dāng)前環(huán)境下所有自定義的WebMvcConfigurer
@Autowired(required = false)
public void setConfigurers(List<WebMvcConfigurer> configurers) {
if (!CollectionUtils.isEmpty(configurers)) {
this.configurers.addWebMvcConfigurers(configurers);
}
}
protected void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 在組合器中依次調(diào)用自定義的WebMvcConfigurer(如果有重寫對(duì)應(yīng)的configureAsyncSupport方法)
this.configurers.configureAsyncSupport(configurer);
}
}
SpringBoot默認(rèn)提供了一個(gè)自定義的WebMvcConfigurer且重寫了configureAsyncSupport方法。
public static class WebMvcAutoConfigurationAdapter implements WebMvcConfigurer {
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 判斷容器中是否以applicationTaskExecutor為beanName的bean
// SpringBoot自動(dòng)配置有提供這樣的bean(TaskExecutionAutoConfiguration)
if (this.beanFactory.containsBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)) {
// 獲取bean對(duì)象
Object taskExecutor = this.beanFactory
.getBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME);
if (taskExecutor instanceof AsyncTaskExecutor) {
configurer.setTaskExecutor(((AsyncTaskExecutor) taskExecutor));
}
}
// ...
}
}
到此,異步請(qǐng)求使用的線程池應(yīng)該清楚了使用的是系統(tǒng)默認(rèn)的線程池可通過配置文件修改默認(rèn)值。我們也可以通過自定義WebMvcConfigurer來重寫對(duì)應(yīng)的方法實(shí)現(xiàn)自己的線程池對(duì)象。
總結(jié):在默認(rèn)的情況下,異步任務(wù)與異步請(qǐng)求使用的是同一個(gè)線程池對(duì)象。
2.3 任務(wù)調(diào)用
先通過一個(gè)示例,查看默認(rèn)執(zhí)行情況
@Service
public class SchedueService {
@Scheduled(cron = "*/2 * * * * *")
public void task() {
System.out.printf("%s: %d - 任務(wù)調(diào)度%n", Thread.currentThread().getName(), System.currentTimeMillis()) ;
}
}
輸出
scheduling-1: 1705564144014 - 任務(wù)調(diào)度
scheduling-1: 1705564146010 - 任務(wù)調(diào)度
scheduling-1: 1705564148003 - 任務(wù)調(diào)度
scheduling-1: 1705564150005 - 任務(wù)調(diào)度
scheduling-1: 1705564152001 - 任務(wù)調(diào)度
使用的scheduling相應(yīng)的線程池,每隔2s執(zhí)行任務(wù)。
源碼分析
@Import(SchedulingConfiguration.class)
public @interface EnableScheduling {}
核心類SchedulingConfiguration
@Configuration(proxyBeanMethods = false)
public class SchedulingConfiguration {
// 該bean專門用來處理@Scheduled注解的核心處理器類
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
ScheduledAnnotationBeanPostProcessor
public class ScheduledAnnotationBeanPostProcessor {
// 看到這個(gè)就能猜到,系統(tǒng)默認(rèn)是獲取的taskScheduler bean對(duì)象
public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是同一個(gè)容器
if (event.getApplicationContext() == this.applicationContext) {
// 注冊(cè)任務(wù)
finishRegistration();
}
}
private void finishRegistration() {
if (this.beanFactory instanceof ListableBeanFactory) {
// 獲取所有SchedulingConfigurer,我們可以實(shí)現(xiàn)該接口自定義配置線程池
Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
// 在這里可以自定義的不僅僅是線程池了,還有其它的東西
configurer.configureTasks(this.registrar);
}
}
// 在默認(rèn)情況下,上面是沒有自定義的SchedulingConfigurer。
// 有調(diào)度的任務(wù)并且當(dāng)前的調(diào)度任務(wù)注冊(cè)器中沒有線程池對(duì)象
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
try {
// 通過類型查找TaskScheduler類型的bean對(duì)象。這里就會(huì)獲取到系統(tǒng)默認(rèn)的
// 由TaskSchedulingAutoConfiguration自動(dòng)配置
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
// 如果查找到了多個(gè)則進(jìn)入這個(gè)catch(如你自己也定義了多個(gè))
catch (NoUniqueBeanDefinitionException ex) {
try {
// 通過beanName進(jìn)行查找
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
}
}
catch (NoSuchBeanDefinitionException ex) {
try {
// 如果不存在則在查找ScheduledExecutorService
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
// 如果有多個(gè)則再根據(jù)名字查找
catch (NoUniqueBeanDefinitionException ex2) {
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
}
}
}
this.registrar.afterPropertiesSet();
}
}
以上就是任務(wù)調(diào)用如何查找使用線程池對(duì)象的。根據(jù)上面的分析我們也可以通過如下的方式進(jìn)行自定義處理。
自定義SchedulingConfigurer
@Component
public class PackSchedulingConfigurer implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler() ;
taskScheduler.setThreadNamePrefix("pack-schedule-") ;
taskScheduler.afterPropertiesSet() ;
taskRegistrar.setTaskScheduler(taskScheduler ) ;
}
}
執(zhí)行結(jié)果
pack-schedule-1: 1705567234013 - 任務(wù)調(diào)度
pack-schedule-1: 1705567236011 - 任務(wù)調(diào)度
自定義ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler() ;
scheduler.setThreadNamePrefix("pack-custom-scheduler") ;
return scheduler ;
}
執(zhí)行結(jié)果
pack-custom-scheduler1: 1705567672013 - 任務(wù)調(diào)度
pack-custom-scheduler1: 1705567674011 - 任務(wù)調(diào)度
通過配置文件自定義默認(rèn)配置
spring:
task:
scheduling:
pool:
size: 2
thread-name-prefix: pack-custom-
注意:系統(tǒng)默認(rèn)的任務(wù)調(diào)用pool.size=1。所以如果你有多個(gè)調(diào)度任務(wù)要當(dāng)心了。