提高系統(tǒng)性能的必備技能:異步任務(wù)完全指南
環(huán)境:Spring5.3.23
本文將介紹Spring框架中的異步任務(wù),闡述為什么要使用異步任務(wù)以及異步任務(wù)帶來的好處。通過對(duì)Spring異步任務(wù)的深入了解,我們將掌握如何在Spring應(yīng)用程序中實(shí)現(xiàn)高效的異步處理,并利用異步任務(wù)提高應(yīng)用程序的性能和響應(yīng)能力。
1. 前言
為什么要使用異步任務(wù)?
在傳統(tǒng)的同步應(yīng)用程序中,每個(gè)請(qǐng)求都需要等待處理完成后再返回結(jié)果。這種方式在處理耗時(shí)操作時(shí)會(huì)導(dǎo)致應(yīng)用程序性能下降,響應(yīng)時(shí)間增加。為了解決這個(gè)問題,異步任務(wù)應(yīng)運(yùn)而生。通過將耗時(shí)操作移至后臺(tái)執(zhí)行,異步任務(wù)可以避免阻塞主線程,提高應(yīng)用程序的并發(fā)能力和響應(yīng)速度。
異步任務(wù)的好處:
提高性能:異步任務(wù)可以避免阻塞主線程,使得應(yīng)用程序能夠同時(shí)處理多個(gè)請(qǐng)求,提高了系統(tǒng)的吞吐量和性能。
改善用戶體驗(yàn):由于異步任務(wù)無需等待耗時(shí)操作完成,因此可以快速返回結(jié)果給用戶。這對(duì)于改善用戶體驗(yàn)非常有益,用戶可以在短暫的等待時(shí)間后獲得響應(yīng),而無需長時(shí)間等待。
高效利用資源:異步任務(wù)可以充分利用系統(tǒng)資源,例如CPU和內(nèi)存。在多核CPU系統(tǒng)中,異步任務(wù)可以同時(shí)運(yùn)行多個(gè)任務(wù),提高了資源的利用率。
降低系統(tǒng)負(fù)載:通過將耗時(shí)操作移至后臺(tái)執(zhí)行,異步任務(wù)可以減輕前臺(tái)系統(tǒng)的負(fù)載,使其專注于處理核心業(yè)務(wù)邏輯。
適應(yīng)高并發(fā)場(chǎng)景:在面對(duì)大量并發(fā)請(qǐng)求時(shí),異步任務(wù)能夠更好地應(yīng)對(duì)負(fù)載壓力,保證系統(tǒng)的穩(wěn)定性和可用性。
總之,Spring異步任務(wù)為我們提供了一種高效處理耗時(shí)操作的方法,通過提高性能、改善用戶體驗(yàn)、高效利用資源、降低系統(tǒng)負(fù)載以及適應(yīng)高并發(fā)場(chǎng)景等方面的優(yōu)勢(shì),幫助我們構(gòu)建更加出色的應(yīng)用程序。
2. 實(shí)戰(zhàn)代碼
為了演示的方便,所有示例代碼我都將在一個(gè)類中完成。
在項(xiàng)目中要使用異步任務(wù)非常的簡單,我們只需要通過一個(gè)注解開啟即可,剩下的就只需要在需要異步執(zhí)行的方法上添加上@Async注解即可。示例代碼如下:
通過@EnableAsync開啟異步任務(wù)
// 該配置類就作用就是開啟異步任務(wù)的能力
@Configuration
@EnableAsync
static class AppConfig {
}
測(cè)試使用的組件類
@Component
static class AsyncService {
// 我們只需要在我們的方法上添加@Async即可
// 這樣該方法的執(zhí)行將會(huì)在另外的線程中執(zhí)行
@Async
public void calc() {
System.out.printf("執(zhí)行線程: %s - 開始執(zhí)行%n", Thread.currentThread().getName()) ;
try {
// 模擬耗時(shí)的操作
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("線程: %s - 執(zhí)行完成%n", Thread.currentThread().getName()) ;
}
}
測(cè)試代碼
try (GenericApplicationContext context = new GenericApplicationContext()) {
// 容器中注冊(cè)相關(guān)的Bean
context.registerBean(ConfigurationClassPostProcessor.class) ;
context.registerBean(AppConfig.class) ;
context.registerBean(AsyncService.class) ;
context.refresh() ;
// 從容器中獲取組件
AsyncService as = context.getBean(AsyncService.class) ;
// 下面調(diào)用3次任務(wù)
as.calc() ;
as.calc() ;
as.calc() ;
System.out.println("主線程結(jié)束...") ;
System.in.read() ;
}
執(zhí)行結(jié)果
主線程結(jié)束...
執(zhí)行線程: SimpleAsyncTaskExecutor-1 - 開始執(zhí)行
執(zhí)行線程: SimpleAsyncTaskExecutor-2 - 開始執(zhí)行
執(zhí)行線程: SimpleAsyncTaskExecutor-3 - 開始執(zhí)行
線程: SimpleAsyncTaskExecutor-2 - 執(zhí)行完成
線程: SimpleAsyncTaskExecutor-1 - 執(zhí)行完成
線程: SimpleAsyncTaskExecutor-3 - 執(zhí)行完成
主線程早早的執(zhí)行完了,每次方法的調(diào)用都在不同的線程,與阻塞執(zhí)行相比大大提高了系統(tǒng)的吞吐量。
使用就是這么簡單,但是我們還需要更加的深入了解這里異步執(zhí)行的線程是什么樣的一個(gè)線程池?是否可以自定義自己的線程池?接下來就從這2個(gè)問題來更加的深入學(xué)習(xí)異步任務(wù)執(zhí)行的原理。
3. 異步任務(wù)使用的線程池
在Spring中使用異步任務(wù)的底層原理主要是通過Spring AOP(面向切面編程)來實(shí)現(xiàn)的。AOP是一種編程思想,它通過在程序執(zhí)行的關(guān)鍵點(diǎn)上添加橫切關(guān)注點(diǎn),來提高代碼的復(fù)用性和可維護(hù)性。
在Spring異步任務(wù)中,AOP被用于攔截方法的執(zhí)行,將耗時(shí)的任務(wù)放入線程池中執(zhí)行,從而避免阻塞主線程。具體來說,Spring異步任務(wù)底層使用了Java的Future和Callable接口,以及線程池技術(shù)來實(shí)現(xiàn)異步執(zhí)行。
首先,當(dāng)我們?cè)赟pring中定義一個(gè)異步方法時(shí),實(shí)際上該方法并不會(huì)立即執(zhí)行,而是會(huì)被封裝為一個(gè)Callable對(duì)象。Callable接口與Runnable接口類似,但它可以返回結(jié)果,并可以拋出異常。
異步任務(wù)執(zhí)行的核心處理器類是:AsyncAnnotationBeanPostProcessor
該處理器的創(chuàng)建是在@EnableAsync注解中的@Import導(dǎo)入的類
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
// 線程池是引用的父類中的成員
bpp.configure(this.executor, this.exceptionHandler);
return bpp;
}
}
// 父類AbstractAsyncConfiguration
public abstract class AbstractAsyncConfiguration implements ImportAware {
protected Supplier<Executor> executor;
// 這里的入?yún)⑹俏覀兛梢宰远x實(shí)現(xiàn)的地方,后面會(huì)講到
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
// 如果系統(tǒng)中定義了多個(gè)AsyncConfigurer將會(huì)拋出異常
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
// 如果沒有自定義,則調(diào)用AsyncConfigurer#getAsyncExecutor,默認(rèn)這個(gè)方法返回的是null
// 所以,在默認(rèn)情況下,這里的executor還是為null
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
}
}
接著進(jìn)入核心的處理器類AsyncAnnotationBeanPostProcessor 該類中現(xiàn)在設(shè)置的executor還是為null。
public class AsyncAnnotationBeanPostProcessor {
// 在示例化當(dāng)前處理器過程中會(huì)執(zhí)行setBeanFactory方法
// 該方法中會(huì)定義AOP的切面(低級(jí)切面)Advisor
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
// 該構(gòu)造方法中會(huì)構(gòu)建相應(yīng)的通知及切入點(diǎn)
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
}
}
// 切面
public class AsyncAnnotationAdvisor {
public AsyncAnnotationAdvisor(...) {
// 構(gòu)建通知攔截器
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
protected Advice buildAdvice() {
// 該攔截器說下繼承關(guān)系
// 1. AnnotationAsyncExecutionInterceptor繼承 AsyncExecutionInterceptor
// 2. AsyncExecutionInterceptor 繼承 AsyncExecutionAspectSupport
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
// 在該方法中進(jìn)行初始化線程池
// 調(diào)用父類AsyncExecutionAspectSupport#configure方法
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
}
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport {
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
// 先調(diào)用父類,默認(rèn)情況下父類返回null,下面有分析
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
// 當(dāng)為null,這里就創(chuàng)建默認(rèn)的線程池SimpleAsyncTaskExecutor
// 這也就是上面的示例代碼中默認(rèn)線程池名稱打印的是SimpleAsyncTaskExecutor-*
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
}
public abstract class AsyncExecutionAspectSupport {
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// defaultExecutor為null,則會(huì)獲取系統(tǒng)默認(rèn)的getDefaultExecutor
// getDefaultExecutor這里的方法被子類AsyncExecutionInterceptor重寫了
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
}
// 初始化系統(tǒng)默認(rèn)的線程池
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 從容器中查找TaskExcutor類型的Bean
return beanFactory.getBean(TaskExecutor.class);
} catch (NoUniqueBeanDefinitionException ex) {
try {
// 如果容器中有多個(gè)這種Bean,則在通過beanName獲取
// beanName = taskExecutor
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
} catch (NoSuchBeanDefinitionException ex) {
try {
// 如果指定beanName=taskExecutor類型為TaskExecutor的Bean
// 則在獲取beanName=taskExecutor類型為Executor類型的Bean
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
}
}
return null;
}
}
分析到這,在我們當(dāng)前的環(huán)境下是沒有TaskExecutor或Executor類型的Bean。所以程序這里最終返回還是null。那這個(gè)默認(rèn)線程池是誰呢?繼續(xù)向下看
在上面的buildAdvice方法中構(gòu)建攔截器AnnotationAsyncExecutionInterceptor該攔截器是執(zhí)行的核心
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor {
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 確定任務(wù)執(zhí)行的線程池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
}
}
到此分析完了Spring的異步任務(wù)執(zhí)行使用線程池的情況?,F(xiàn)總結(jié)下查找線程池的流程步驟:
- 容器中查找AsyncConfigurer
- 在1中沒有,則容器中查找TaskExecutor類型的Bean,如果正好有一個(gè)則使用,如果有多個(gè)則從容器中查找beanName=taskExecutor,類型為Executor,如果沒有則返回null。
- 在2中如果沒有TaskExecutor類型的Bean,則從容器中查找beanName=taskExecutor,類型為Executor,如果沒有則返回null。
- 到此都還是沒有,則直接創(chuàng)建SimpleAsyncTaskExecutor對(duì)象作為線程池。
4. 自定義線程池
通過上面的分析你應(yīng)該知道了如何自定義線程池了。
自定義AsyncConfigurer
@Component
static class CustomAsyncConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
return new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() {
private final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group = Thread.currentThread().getThreadGroup() ;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "pack-" + poolNumber.getAndIncrement() +"-thread-" ;
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}) ;
}
}
在容器中注冊(cè)上面的bean后,執(zhí)行結(jié)果如下:
主線程結(jié)束...
執(zhí)行線程: pack-1-thread-1 - 開始執(zhí)行
執(zhí)行線程: pack-1-thread-2 - 開始執(zhí)行
線程: pack-1-thread-2 - 執(zhí)行完成
線程: pack-1-thread-1 - 執(zhí)行完成
執(zhí)行線程: pack-1-thread-2 - 開始執(zhí)行
線程: pack-1-thread-2 - 執(zhí)行完成
自定義線程池生效了。
其它方式就不嘗試了。