在SpringBoot項(xiàng)目中使用CompletableFuture優(yōu)化并發(fā)REST調(diào)用的正確姿勢
環(huán)境:SpringBoot2.7.18
1. 簡介
在項(xiàng)目開發(fā)時(shí),經(jīng)常會(huì)遇到從不同的接口服務(wù)拉取數(shù)據(jù)并將其匯總到響應(yīng)中。在微服務(wù)中,這些數(shù)據(jù)源通常是外部 REST API。在本篇文章中,我們將使用 Java 的 CompletableFuture 高效地并行請求多個(gè)外部 REST API 中的數(shù)據(jù)。同時(shí),會(huì)對整個(gè)請求過程中的異常處理、請求超時(shí)進(jìn)行詳細(xì)的介紹。
2. 為什么要并行調(diào)用?
假設(shè)我們需要更新對象中的多個(gè)字段,每個(gè)字段的值都來自外部 REST 調(diào)用。一種方法也是最簡單的方式是依次調(diào)用每個(gè) API 來更新每個(gè)字段。
但是,等待一個(gè) REST 調(diào)用完成后再啟動(dòng)另一個(gè)會(huì)增加服務(wù)的整體響應(yīng)時(shí)間。例如,如果我們調(diào)用兩個(gè)應(yīng)用程序接口,每個(gè)需要 5 秒鐘,那么總時(shí)間至少要 10 秒鐘,因?yàn)榈诙€(gè)調(diào)用需要等待第一個(gè)調(diào)用完成。
相反,我們可以并行調(diào)用所有 API,這樣總時(shí)間就是最慢(耗時(shí)最長)的 REST 調(diào)用時(shí)間。例如,一個(gè)調(diào)用需要 7 秒,另一個(gè)需要 5 秒。在這種情況下,我們將等待 7 秒,因?yàn)槲覀円呀?jīng)并行處理了所有內(nèi)容,必須等待所有結(jié)果完成。
因此,并行化是減少服務(wù)響應(yīng)時(shí)間、提高服務(wù)可擴(kuò)展性和改善用戶體驗(yàn)的絕佳選擇。
3. 實(shí)戰(zhàn)案例
3.1 定義用于更新的目標(biāo) POJO
public class Purchase {
private String orderDescription ;
private String paymentDescription ;
private String buyerName ;
private String orderId ;
private String paymentId ;
private String userId ;
// getters and setters
}
該采購類有三個(gè)需要更新的字段,每個(gè)字段都需要通過 ID 進(jìn)行不同的 REST 調(diào)用來查詢。
接下來,先創(chuàng)建一個(gè)類,定義一個(gè) RestTemplate Bean 和一個(gè)用于 REST 調(diào)用的域 URL:
@Component
public class PurchaseRestCallsAsyncExecutor {
private static final String BASE_URL = "http://www.pack.com" ;
private final RestTemplate restTemplate ;
public PurchaseRestCallsAsyncExecutor(RestTemplate restTemplate) {
this.restTemplate = restTemplate ;
}
}
接下來,分別編寫3個(gè)REST接口調(diào)用的方法
現(xiàn)在,讓我們來定義 /orders API 調(diào)用:
public String getOrderDescription(String orderId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/orders/%s", BASE_URL, orderId),
String.class) ;
return result.getBody() ;
}
然后,讓我們定義 /payments API 調(diào)用:
public String getPaymentDescription(String paymentId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/payments/%s", BASE_URL, paymentId),
String.class) ;
return result.getBody() ;
}
最后,我們定義了 /users API 調(diào)用:
public String getUserName(String userId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/users/%s", BASE_URL, userId),
String.class) ;
return result.getBody() ;
}
這三個(gè)接口方法都使用 getForEntity() 方法進(jìn)行 REST 調(diào)用,并將結(jié)果封裝在一個(gè) ResponseEntity 對象中。
3.2 使用 CompletableFuture 進(jìn)行多次 REST 調(diào)用
現(xiàn)在,我們就可以創(chuàng)建一個(gè)方法,用于構(gòu)建和運(yùn)行一組三個(gè) CompletableFutures:
public void updatePurchase(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription),
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription),
CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
.thenAccept(purchase::setBuyerName)
).join() ;
}
我們使用allOf()方法來構(gòu)建CompletableFuture的步驟。每個(gè)參數(shù)都是一個(gè)并行任務(wù),這些任務(wù)以另一個(gè)通過REST調(diào)用及其結(jié)果構(gòu)建的CompletableFuture的形式存在。
我們首先使用supplyAsync()方法提供了一個(gè)Supplier,從這個(gè)Supplier中我們將檢索數(shù)據(jù)。然后,我們使用thenAccept()來消費(fèi)supplyAsync()的結(jié)果,并將其設(shè)置到Purchase類中相應(yīng)的字段上。
在allOf()方法結(jié)束時(shí),我們只是構(gòu)建了這些任務(wù),但尚未執(zhí)行任何操作。
最后,我們在所有任務(wù)構(gòu)建完畢后調(diào)用join()方法來并行運(yùn)行所有任務(wù)并收集它們的結(jié)果。由于join()是一個(gè)線程阻塞操作,我們只在最后調(diào)用它,而不是在每個(gè)任務(wù)步驟之后調(diào)用,這是為了通過減少線程阻塞來優(yōu)化應(yīng)用程序性能。
由于我們沒有為supplyAsync()方法提供一個(gè)自定義的ExecutorService,因此所有任務(wù)都在同一個(gè)Executor中運(yùn)行。默認(rèn)情況下,Java使用ForkJoinPool.commonPool()。
建議為supplyAsync()方法指定一個(gè)自定義的ExecutorService是一個(gè)好習(xí)慣,這樣我們可以對線程池參數(shù)有更多的控制。
3.3 錯(cuò)誤處理
在分布式系統(tǒng)中,服務(wù)不可用或網(wǎng)絡(luò)故障是很常見的。這些故障可能發(fā)生在外部 REST API 中,而我們作為該 API 的客戶端卻并不知情。例如,如果應(yīng)用程序宕機(jī),這就導(dǎo)致發(fā)送的請求將永遠(yuǎn)無法完成。
因此,我們可以使用 handle() 方法單獨(dú)處理每個(gè) REST 調(diào)用異常:
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) ;
該方法的參數(shù)是一個(gè) BiFunction,其中包含作為參數(shù)的上一個(gè)任務(wù)的結(jié)果和異常。 接下來我們將 handle() 步驟添加到 CompletableFuture 的一個(gè)步驟中
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription)
.handle((result, exception) -> {
if (exception != null) {
// 異常處理
return null ;
}
return result ;
})
).join() ;
}
在示例中,handle() 從 thenAccept() 調(diào)用的 setPaymentDescription() 中獲取一個(gè) Void 類型,然后將 thenAccept() 動(dòng)作中拋出的任何錯(cuò)誤存儲(chǔ)到異常中。最后,如果沒有異常拋出,則 handle() 返回作為參數(shù)傳遞的值。否則,返回空值。
3.4 處理 REST 調(diào)用超時(shí)
當(dāng)我們使用 CompletableFuture 時(shí),我們可以指定一個(gè)任務(wù)超時(shí),類似于我們在 REST 調(diào)用中定義的超時(shí)。因此,如果任務(wù)沒有在指定時(shí)間內(nèi)完成,Java 會(huì)以超時(shí)異常(TimeoutException)結(jié)束任務(wù)執(zhí)行,修改代碼如下:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription)
// 設(shè)置超時(shí)時(shí)間5s
.orTimeout(5, TimeUnit.SECONDS)
.handle((result, exception) -> {
if (exception instanceof TimeoutException) {
// 異常處理
return null ;
}
return result ;
})
).join() ;
}
我們在 CompletableFuture 中通過 orTimeout() 方法設(shè)置超時(shí)時(shí)間,如果在 5 秒內(nèi)未完成任務(wù)時(shí)停止任務(wù)執(zhí)行。同時(shí)還在 handle() 方法中添加了 if 語句,以便單獨(dú)處理 TimeoutException。在 CompletableFuture 中添加超時(shí)可確保任務(wù)始終完成。這對于避免線程無限期地等待可能永遠(yuǎn)不會(huì)完成的操作結(jié)果非常重要。因此,它減少了處于長時(shí)間運(yùn)行狀態(tài)的線程數(shù)量,提高了應(yīng)用程序的健康度。