性能提升!@Async與CompletableFuture優(yōu)雅應(yīng)用
1. 簡介
@Async 和 CompletableFuture 是實(shí)現(xiàn)異步處理的強(qiáng)大工具組合。@Async 是Spring框架提供的一個注解,用于標(biāo)記方法以表明它將在Spring管理的線程池中的另一個線程上異步執(zhí)行。這使得開發(fā)人員能夠在不阻塞主線程的情況下執(zhí)行耗時的任務(wù),從而提高應(yīng)用程序的整體性能和響應(yīng)速度。
CompletableFuture 是Java 8引入的一個強(qiáng)大的類,它代表了一個可能尚未完成的計算的結(jié)果。CompletableFuture 提供了豐富的API來支持異步編程模式,如回調(diào)、組合操作、錯誤處理等。通過將@Async與CompletableFuture結(jié)合使用,可以實(shí)現(xiàn)更高效的異步任務(wù)處理。
接下來,我們將介紹@Async與CompletableFuture結(jié)合的使用。
2. 實(shí)戰(zhàn)案例
2.1 @EnableAsync and @Async
Spring 自帶 @EnableAsync 注解,可應(yīng)用于 @Configuration 類以實(shí)現(xiàn)異步行為。@EnableAsync 注解會查找標(biāo)有 @Async 注解的方法,并在后臺線程池中運(yùn)行這些方法。
@Async 注解方法在單獨(dú)的線程中執(zhí)行,并返回 CompletableFuture 來保存異步計算的結(jié)果。
開啟異步功能
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
int core = Runtime.getRuntime().availableProcessors() ;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(core) ;
executor.setMaxPoolSize(core) ;
executor.setQueueCapacity(100) ;
executor.setThreadNamePrefix("PackAsync-") ;
executor.initialize() ;
return executor ;
}
}
如上我們自定義了線程池,該線程池用來執(zhí)行我們的異步任務(wù)。你也可以不用配置,使用系統(tǒng)默認(rèn)的線程池。
創(chuàng)建異步任務(wù)
@Async("asyncExecutor")
public CompletableFuture<EmployeeNames> task() {
// TODO
}
用 @Async 對方法進(jìn)行注解,該方法應(yīng)異步運(yùn)行。該方法必須是公共的,可以返回值,也可以不返回值。如果返回值,則應(yīng)使用 Future 接口實(shí)現(xiàn)對其進(jìn)行封裝。
這里指定了使用我們自定義的線程池執(zhí)行異步任務(wù)。
多個異步任務(wù)同時執(zhí)行
CompletableFuture.allOf(
asyncMethodOne,
asyncMethodTwo,
asyncMethodThree
).join() ;
要合并多個異步任務(wù)的結(jié)果,通過使用 join() 方法,這將等待所有異步任務(wù)執(zhí)行完成才會繼續(xù)往后執(zhí)行。
2.2 Rest Controller中調(diào)用異步任務(wù)
接下來,我們將創(chuàng)建一個 REST API,從三個遠(yuǎn)程服務(wù)異步獲取數(shù)據(jù),當(dāng)所有三個服務(wù)的響應(yīng)都可用時,再匯總響應(yīng)。
- 調(diào)用/addresses接口獲取所有地址信息
- 調(diào)用/phones接口獲取所有電話數(shù)據(jù)
- 調(diào)用/names接口獲取所有姓名
- 等待以上3個接口都返回結(jié)果后再進(jìn)行處理
- 匯總所有三個應(yīng)用程序接口的響應(yīng),并生成最終響應(yīng)發(fā)送回客戶端
遠(yuǎn)程接口準(zhǔn)備
@RestController
public class EmployeeController {
@GetMapping("/addresses")
public EmployeeAddresses addresses() {
// TODO
}
@GetMapping("/phones")
public EmployeePhone phones() {
// TODO
}
@GetMapping("/names")
public EmployeeNames names() {
// TODO
}
}
我們將通過異步的方式調(diào)用上面定義的3個接口。
異步調(diào)用REST API
這些服務(wù)方法將從遠(yuǎn)程應(yīng)用程序接口或數(shù)據(jù)庫中提取數(shù)據(jù),必須在不同的線程中并行運(yùn)行,以加快處理速度。
@Service
public class AsyncService {
private static Logger logger = LoggerFactory.getLogger(AsyncService.class);
private final RestTemplate restTemplate;
public AsyncService(RestTemplate restTemplate) {
this.restTemplate = restTemplate ;
}
@Async("asyncExecutor")
public CompletableFuture<EmployeeNames> names() {
logger.info("getEmployeeName starts");
EmployeeNames employeeNameData = restTemplate.getForObject("http://localhost:8080/names", EmployeeNames.class) ;
logger.info("employeeNameData, {}", employeeNameData) ;
logger.info("employeeNameData completed");
return CompletableFuture.completedFuture(employeeNameData);
}
@Async("asyncExecutor")
public CompletableFuture<EmployeeAddresses> addresses() {
logger.info("getEmployeeAddress starts");
EmployeeAddresses employeeAddressData = restTemplate.getForObject("http://localhost:8080/addresses", EmployeeAddresses.class);
logger.info("employeeAddressData, {}", employeeAddressData) ;
logger.info("employeeAddressData completed");
return CompletableFuture.completedFuture(employeeAddressData);
}
@Async("asyncExecutor")
public CompletableFuture<EmployeePhone> phones() {
logger.info("getEmployeePhone starts") ;
EmployeePhone employeePhoneData = restTemplate.getForObject("http://localhost:8080/phones", EmployeePhone.class) ;
logger.info("employeePhoneData, {}", employeePhoneData) ;
logger.info("employeePhoneData completed") ;
return CompletableFuture.completedFuture(employeePhoneData) ;
}
}
注意:你可不能如下方式來執(zhí)行遠(yuǎn)程接口的調(diào)用。
CompletableFuture.supplyAsync(() -> {
return restTemplate.getForObject("http://localhost:8080/phones", EmployeePhone.class) ;
}) ;
如果你這樣寫,你的遠(yuǎn)程接口并非在你的異步線程中執(zhí)行,而是在CompletableFuturue的線程池中執(zhí)行(ForkJoinPool)。
2.3 聚合異步任務(wù)
接下來在REST API中調(diào)用上面的異步方法、消耗和聚合其響應(yīng)并返回客戶端。
@RestController
public class AsyncController {
private final AsyncService asyncService;
public AsyncController(AsyncService service) {
this.asyncService = asyncService ;
}
@GettMapping("/profile/infos")
public EmployeeDTO infos() throws Exception {
CompletableFuture<EmployeeAddresses> addresses = asyncService.addresses() ;
CompletableFuture<EmployeeNames> names = asyncService.names() ;
CompletableFuture<EmployeePhone> phones = asyncService.phones() ;
// 等待所有異步任務(wù)都執(zhí)行完成
CompletableFuture.allOf(addresses, names, phones).join() ;
return new EmployeeDTO(addresses.get(), names.get(), phones.get()) ;
}
}
整個請求的耗時將會是請求最長REST API調(diào)用所用的時間,這大大提升該接口的性能。
2.4 異常處理
當(dāng)方法的返回類型是 Future 時,F(xiàn)uture.get() 方法會拋出異常,我們應(yīng)該在聚合結(jié)果之前使用 try-catch 塊捕獲并處理異常。
問題是,如果異步方法不返回任何值,那么就很難知道方法執(zhí)行時是否發(fā)生了異常。我們可以使用 AsyncUncaughtExceptionHandler 實(shí)現(xiàn)來捕獲和處理此類異常。
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler() ;
}
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
private final Logger logger = LoggerFactory.getLogger(AsyncExceptionHandler.class);
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error("Unexpected asynchronous exception at : "
+ method.getDeclaringClass().getName() + "." + method.getName(), ex);
}
}
}