Spring任務調度&異步任務&Web異步請求三者如何配置線程池?
一、任務調度
注解類:@Scheduled
核心處理類:ScheduledAnnotationBeanPostProcessor
使用的線程池:從容器中查詢TaskScheduler。
- 首先在容器中通過類型查找TaskScheduler Bean,如果沒有則拋出NoSuchBeanDefinitionException異常。
- 在這一步中,如果找到多個,那么會再通過beanName=taskScheduler在容器中查找
- 在上一步中拋出異常后會繼續查找java.util.concurrent.ScheduledExecutorService 類型的Bean。
- 在這一步中,如果找到多個,那么會再通過beanName=taskScheduler在容器中查找
- 在上一步中還是沒有則結束(程序并不會報錯)
如果上面流程都沒有找到,則會通過如下方式創建一個。
// Fallback when no scheduler was resolved: create a single-threaded scheduled executor
// and wrap it in a ConcurrentTaskScheduler.
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
在Spring Boot中有個自動配置類TaskSchedulingAutoConfiguration,會配置一個ThreadPoolTaskScheduler。
/**
 * Auto-configuration excerpt that contributes a {@link ThreadPoolTaskScheduler} and the
 * {@link TaskSchedulerBuilder} used to create it.
 */
public class TaskSchedulingAutoConfiguration {

    /**
     * Creates the scheduler from the supplied builder.
     *
     * <p>Registered only when the scheduled-annotation processor bean exists and no
     * {@code SchedulingConfigurer}, {@code TaskScheduler} or {@code ScheduledExecutorService}
     * bean is present.
     *
     * @param builder the builder holding the scheduler settings
     * @return the scheduler produced by the builder
     */
    @Bean
    @ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
    public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
        ThreadPoolTaskScheduler scheduler = builder.build();
        return scheduler;
    }

    /**
     * Creates the builder from the scheduling properties: pool size, shutdown behaviour,
     * thread name prefix and any registered customizers.
     *
     * @param properties the task scheduling properties
     * @param taskSchedulerCustomizers customizers to apply to the builder
     * @return the configured builder
     */
    @Bean
    @ConditionalOnMissingBean
    public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
            ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {
        TaskSchedulerBuilder sized = new TaskSchedulerBuilder().poolSize(properties.getPool().getSize());
        Shutdown shutdownSettings = properties.getShutdown();
        return sized
                .awaitTermination(shutdownSettings.isAwaitTermination())
                .awaitTerminationPeriod(shutdownSettings.getAwaitTerminationPeriod())
                .threadNamePrefix(properties.getThreadNamePrefix())
                .customizers(taskSchedulerCustomizers);
    }
}
二、異步任務
- 注解類:@Async。
- 核心處理類:AsyncAnnotationBeanPostProcessor。
通過ProxyAsyncConfiguration配置,該類繼承AbstractAsyncConfiguration。
在父類中會初始化下面兩個成員變量:
// Base async configuration (excerpt): captures the executor and the uncaught-exception
// handler exposed by the AsyncConfigurer bean, when one exists.
@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {
// Supplier of the executor obtained from the AsyncConfigurer; assigned in setConfigurers.
@Nullable
protected Supplier<Executor> executor;
// Supplier of the exception handler obtained from the AsyncConfigurer; assigned in setConfigurers.
@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
// Looks up AsyncConfigurer beans in the container; at most one may exist.
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
// No AsyncConfigurer registered: the supplier yields null.
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
// More than one AsyncConfigurer is rejected.
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
}
// Returns a supplier that applies the provider to the configurer when it is invoked,
// and yields null when the configurer supplier returns null.
private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {
return () -> {
AsyncConfigurer configurer = supplier.get();
return (configurer != null ? provider.apply(configurer) : null);
};
}
}
使用的線程池:
- 首先在容器中通過類型查找AsyncConfigurer Bean。
- 如果沒有則設置默認的AsyncConfigurer::getAsyncExecutor 該方法是接口中默認方法,返回的是null。
- 在上一步中如果容器中沒有AsyncConfigurer,那么設置到AsyncAnnotationBeanPostProcessor中的也就是null。
- 初始化AsyncAnnotationBeanPostProcessor。
// Bean post-processor (excerpt) that creates the AsyncAnnotationAdvisor once the bean factory is set.
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
// Executor supplier handed to the advisor; may be null.
@Nullable
private Supplier<Executor> executor;
// Exception handler supplier handed to the advisor; may be null.
@Nullable
private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
// Build the aspect Advisor
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
// Apply a custom async annotation type only when one has been set.
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
// 構(gòu)建通知類
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
// 調(diào)用父類方法
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
}
// Base class (excerpt) holding the default executor and exception handler for async execution.
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
// Stores the given suppliers, each paired with a fallback supplier.
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// If defaultExecutor is not provided, getDefaultExecutor is used as the fallback; that method is overridden in the subclass
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
// Resolves an executor from the bean factory: first by type TaskExecutor, then by the
// DEFAULT_TASK_EXECUTOR_BEAN_NAME bean name with type Executor. Returns null when
// beanFactory is null or neither lookup succeeds.
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// Look up a bean of type TaskExecutor in the container
return beanFactory.getBean(TaskExecutor.class);
} catch (NoUniqueBeanDefinitionException ex) {
// Several TaskExecutor beans exist: try the bean named DEFAULT_TASK_EXECUTOR_BEAN_NAME.
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
} catch (NoSuchBeanDefinitionException ex2) {
// No bean with that name either; fall through to return null.
}
} catch (NoSuchBeanDefinitionException ex) {
// No TaskExecutor bean at all: try the bean named DEFAULT_TASK_EXECUTOR_BEAN_NAME as an Executor.
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
} catch (NoSuchBeanDefinitionException ex2) {
// No bean with that name either; fall through to return null.
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
}
/**
 * Interceptor excerpt that guarantees a non-null default executor.
 */
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

    /**
     * Resolves the default executor through the parent class and, when that lookup
     * returns nothing, creates a new {@code SimpleAsyncTaskExecutor}.
     *
     * @param beanFactory the bean factory passed to the parent lookup, may be {@code null}
     * @return the executor found by the parent, or a new {@code SimpleAsyncTaskExecutor}
     */
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        Executor resolved = super.getDefaultExecutor(beanFactory);
        if (resolved == null) {
            // Nothing found by the parent lookup: create the default executor.
            return new SimpleAsyncTaskExecutor();
        }
        return resolved;
    }
}
總結:
- 從容器中查找TaskExecutor Bean。
- 上一步沒有找到,則查找beanName=taskExecutor,類型為java.util.concurrent.Executor的Bean。
- 如果上一步還是沒有找到,那么最終創建默認的SimpleAsyncTaskExecutor,這是個沒有上限的線程池,來一個任務就創建一個新線程。
如果執行的異步任務很多,而配置的線程池線程數有限,則多出的任務會等待。
三、Web異步接口
核心處理類:RequestMappingHandlerAdapter。
// Handler adapter (excerpt) showing which executor is handed to the WebAsyncManager.
public class RequestMappingHandlerAdapter {
// Default executor: a SimpleAsyncTaskExecutor created with the "MvcAsync" name prefix
// (described by the article as a thread pool with no upper limit)
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("MvcAsync");
// Excerpt: the surrounding logic and the return statement are elided.
protected ModelAndView invokeHandlerMethod(
HttpServletRequest request,
HttpServletResponse response,
HandlerMethod handlerMethod) throws Exception {
// ...
// The async manager for this request is given this adapter's task executor.
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
// ...
}
}
創建RequestMappingHandlerAdapter Bean對象。
繼承關係。
// Nested configuration class that extends DelegatingWebMvcConfiguration; no members are shown in this excerpt.
public static class EnableWebMvcConfiguration extends DelegatingWebMvcConfiguration{}
/**
 * Configuration excerpt that forwards MVC customization callbacks to every registered
 * {@code WebMvcConfigurer} through a composite.
 */
public class DelegatingWebMvcConfiguration extends WebMvcConfigurationSupport {

    private final WebMvcConfigurerComposite configurers = new WebMvcConfigurerComposite();

    /**
     * Collects the {@code WebMvcConfigurer} implementations into the composite.
     * An absent or empty list leaves the composite untouched.
     *
     * @param configurers the configurers to register, may be {@code null} or empty
     */
    @Autowired(required = false)
    public void setConfigurers(List<WebMvcConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        this.configurers.addWebMvcConfigurers(configurers);
    }

    /**
     * Passes the async support configurer on to the composite of registered configurers.
     *
     * @param configurer the async support configurer to populate
     */
    @Override
    protected void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        this.configurers.configureAsyncSupport(configurer);
    }
}
// MVC configuration base class (excerpt) showing how the adapter's task executor is chosen.
public class WebMvcConfigurationSupport implements ApplicationContextAware, ServletContextAware {
// Created on first use by getAsyncSupportConfigurer and reused afterwards.
private AsyncSupportConfigurer asyncSupportConfigurer;
// Excerpt: parameters and the rest of the body are elided.
@Bean
public RequestMappingHandlerAdapter requestMappingHandlerAdapter(...) {
// ...
AsyncSupportConfigurer configurer = getAsyncSupportConfigurer();
// Override the adapter's executor only when the configurer supplies one.
if (configurer.getTaskExecutor() != null) {
adapter.setTaskExecutor(configurer.getTaskExecutor());
}
// ...
}
// Returns the AsyncSupportConfigurer, creating it and running configureAsyncSupport
// on it the first time this method is called.
protected AsyncSupportConfigurer getAsyncSupportConfigurer() {
if (this.asyncSupportConfigurer == null) {
this.asyncSupportConfigurer = new AsyncSupportConfigurer();
configureAsyncSupport(this.asyncSupportConfigurer);
}
return this.asyncSupportConfigurer;
}
}
/**
 * Auto-configuration excerpt showing how the MVC async executor and timeout are chosen.
 */
public class WebMvcAutoConfiguration {

    /**
     * Configurer excerpt that applies the application task executor and the configured
     * request timeout to MVC async support.
     */
    public static class WebMvcAutoConfigurationAdapter implements WebMvcConfigurer, ServletContextAware {

        /**
         * Uses the bean named by {@code APPLICATION_TASK_EXECUTOR_BEAN_NAME} as the async
         * executor when it exists and is an {@code AsyncTaskExecutor}, then applies the
         * async request timeout from the MVC properties when one is set.
         *
         * @param configurer the async support configurer to populate
         */
        public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
            final String executorBeanName = TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME;
            // Only look the bean up when the container actually holds one under that name.
            if (this.beanFactory.containsBean(executorBeanName)) {
                Object candidate = this.beanFactory.getBean(executorBeanName);
                // Beans of any other type are ignored.
                if (candidate instanceof AsyncTaskExecutor) {
                    configurer.setTaskExecutor((AsyncTaskExecutor) candidate);
                }
            }
            Duration requestTimeout = this.mvcProperties.getAsync().getRequestTimeout();
            if (requestTimeout != null) {
                configurer.setDefaultTimeout(requestTimeout.toMillis());
            }
        }
    }
}
總結:
- 默認使用SimpleAsyncTaskExecutor。
- 如果容器中存在beanName = applicationTaskExecutor 且類型是 AsyncTaskExecutor 的Bean,則使用該bean。
到這你應該知道了這三者在線程池方面該如何正確配置及使用了。