DDD與微服務集成的第一戰(zhàn)役:客戶端重試&服務端冪等
當一個接口從簡單的內(nèi)部調(diào)用升級為遠程方法調(diào)用(RPC)會面臨很多問題,比如:
- 本地事務失效。在內(nèi)部調(diào)用時,多個方法通常在同一事務中執(zhí)行,可以使用本地數(shù)據(jù)庫事務來確保數(shù)據(jù)的一致性。但是,在遠程方法調(diào)用中,由于涉及到網(wǎng)絡通信,事務的邊界會擴展到多個系統(tǒng)之間,因此無法直接使用本地事務。如果遠程方法調(diào)用出現(xiàn)異常,可能會導致事務提交失敗,從而產(chǎn)生數(shù)據(jù)不一致;
- 第三狀態(tài)影響。網(wǎng)絡不確定性可能導致遠程調(diào)用無法成功獲得結(jié)果,例如網(wǎng)絡連接中斷、網(wǎng)絡超時等。在這種情況下,客戶端無法獲得期望的結(jié)果,調(diào)用會以網(wǎng)絡錯誤或超時的方式結(jié)束;
- 服務版本兼容性問題。如果服務接口發(fā)生變化,客戶端和服務端的版本不匹配可能導致調(diào)用失?。?/li>
- 性能問題、可用性問題、安全問題等;
由于涉及的問題比較多,這里重點分析和解決 RPC 調(diào)用時的第三狀態(tài)問題。
1. 什么是第三狀態(tài)
當一個客戶端發(fā)起一個RPC請求時,服務端可能會返回不同的狀態(tài),包括:
- 成功:服務端成功完成了客戶端發(fā)送的請求,并返回對應的響應結(jié)果;
- 失?。悍斩藷o法成功處理客戶端發(fā)送的請求,并返回錯誤信息或異常;
- 超時:調(diào)用方無法在規(guī)定時間收到服務端的處理結(jié)果,所以無法知道請求的最終處理結(jié)果;
如下圖所示:
圖片
一般情況下,調(diào)用方在規(guī)定時間收到被調(diào)用方的返回結(jié)果,能夠非常明確的知道處理結(jié)果是成功還是失敗。
當網(wǎng)絡或被調(diào)用方出問題,就會觸發(fā)超時,比如下圖所示:
圖片
如果被調(diào)用方異?;蛘呔W(wǎng)絡發(fā)生阻塞,調(diào)用方發(fā)送的 Request 請求沒有被正常處理,那調(diào)用方只能在等待若干時間后拋出異常進行流程中斷。
又或者如下圖所示:
圖片
被調(diào)用方處理時間過長或者網(wǎng)絡發(fā)生阻塞,調(diào)用方無法在規(guī)定時間獲得最終結(jié)果,也只能觸發(fā)超時中斷。
可見,如果發(fā)生 超時 情況,對于處理結(jié)果就處于未知狀態(tài),這就是所謂的“第三狀態(tài)”:
- 被調(diào)用方成功接收到請求并完成了處理;
- 被調(diào)用方完全沒有接收到請求;
在出現(xiàn)第三狀態(tài)時,在不做任何處理前,根本就無法獲取最終的處理結(jié)果。在該場景下,最通用的解決方案便是 客戶端重試 + 服務端冪等。
- 客戶端重試。指的是在RPC調(diào)用出現(xiàn)超時的情況下,客戶端自動重新發(fā)送相同的請求。然而,在客戶端重試時需要注意避免重復執(zhí)行有副作用的操作,比如避免重復插入數(shù)據(jù);
- 服務端冪等。指的是對于相同的輸入請求,服務端能夠產(chǎn)生相同的結(jié)果,而且不會對系統(tǒng)狀態(tài)造成影響。冪等性是為了應對由于重試等原因?qū)е轮貜蛨?zhí)行的副作用;
通過客戶端重試和服務端冪等的方式,可以增加RPC調(diào)用的可靠性和數(shù)據(jù)一致性??蛻舳酥卦嚳梢蕴幚砭W(wǎng)絡不穩(wěn)定、服務端故障等導致的失敗情況,而服務端冪等性能保證相同的請求不會重復執(zhí)行或引起數(shù)據(jù)不一致的問題。這兩個技術(shù)結(jié)合使用,可以提高分布式系統(tǒng)中RPC調(diào)用的健壯性和可靠性。
2. 客戶端重試
客戶端重試指的是在RPC調(diào)用失敗的情況下,客戶端自動重新發(fā)送相同的請求??蛻舳丝梢栽O置重試的次數(shù)和時間間隔,直到得到預期的成功響應或達到最大重試次數(shù)??蛻舳酥卦嚈C制可以彌補網(wǎng)絡不穩(wěn)定性或服務端異常導致的調(diào)用失敗。然而,在客戶端重試時需要注意避免重復執(zhí)行有副作用的操作,比如避免重復插入數(shù)據(jù)。此外,還需要合理設置重試策略,避免因頻繁的重試導致網(wǎng)絡負荷增加或服務端壓力過大。
如下圖所示:
圖片
- 第一次獲取商品信息時,由于網(wǎng)絡異常導致請求超時;
- 網(wǎng)絡異常觸發(fā) retry 機制,重新發(fā)起新的請求;
- 新請求成功發(fā)送至商品服務并獲取信息,從而使得流程從異常中恢復,最終正常執(zhí)行完成;
由此可見,重試的工作原理非常簡單。
Spring Retry是Spring Framework提供的一個模塊,用于簡化和增強應用程序中的重試操作。它提供了一種聲明式的方式來處理方法調(diào)用的重試,以應對在分布式系統(tǒng)或有限資源環(huán)境下可能出現(xiàn)的失敗情況。
Spring Retry模塊的主要特性包括:
- 聲明式注解:通過使用注解(例如@Retryable、@Recover等),可以將重試行為與方法關(guān)聯(lián)起來。通過在方法上添加@Retryable注解,可以指定需要進行重試的異常類型,最大重試次數(shù),重試間隔等信息;
- 容錯策略:Spring Retry提供了多種容錯策略,包括簡單重試、指數(shù)退避重試、固定時間間隔重試等,開發(fā)者可以根據(jù)具體需求選擇合適的策略;
- 回退機制:如果重試次數(shù)超過限定值仍然失敗,Spring Retry可以通過@Recover注解來指定一個回退方法,用于執(zhí)行備選邏輯或返回默認值;
要使用Spring Retry,需要在項目中引入相應的依賴:
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
2.1. Retryable
重試行為主要是由 @Retryable 注解完成,通過為目標方法添加注解,將其納入重試管理,Spring Retry將負責在出現(xiàn)失敗情況時自動進行重試。
以下是 @Retryable 注解代碼示例:
@Service
public class MyService {
@Retryable(value = {SomeException.class}, maxAttempts = 3)
public void doSomething() {
// 需要進行重試的業(yè)務邏輯
}
}
上述示例中的 doSomething() 方法標記為需要進行重試,在捕獲到 SomeException 異常時觸發(fā)重試,最多重試3次。
下表列出了 @Retryable 注解的所有屬性及其說明:
屬性名 | 說明 |
value | 定義重試的異常類型。默認情況下,將捕獲所有 |
maxAttempts | 定義最大重試次數(shù),默認為3次。當達到最大重試次數(shù)后,如果仍然失敗,則不再進行重試。 |
backoff | 定義重試間隔策略??梢允褂?code style="background-color: rgb(231, 243, 237); padding: 1px 3px; border-radius: 4px; overflow-wrap: break-word; text-indent: 0px; display: inline-block;">@Backoff注解來配置重試間隔的退避策略,包括 |
delayExpression | 定義重試間隔的SpEL表達式??梢允褂?code style="background-color: rgb(231, 243, 237); padding: 1px 3px; border-radius: 4px; overflow-wrap: break-word; text-indent: 0px; display: inline-block;">#lastException表示上一次重試時捕獲的異常對象。 |
multiplier | 定義指數(shù)退避策略的倍數(shù),默認為1。在使用指數(shù)退避策略時,每次重試的間隔時間將是 |
random | 定義是否啟用隨機化延遲。默認為 |
exceptionExpression | 定義一個SpEL表達式,用于決定是否進行重試。該表達式可以使用 |
include | 定義需要包含在重試行為中的異常類型,默認為空數(shù)組,表示包括所有異常類型??梢灾付ㄒ粋€或多個異常類來明確指定需要處理的異常類型。例如, |
exclude | 定義需要排除在重試行為之外的異常類型,默認為空數(shù)組,表示排除所有異常類型。可以指定一個或多個異常類來明確排除某些異常類型。例如, |
這些屬性可以根據(jù)具體的需求進行配置,以實現(xiàn)靈活的重試策略。
2.2. Recover
Spring Retry 的 fallback 機制是在重試失敗后,執(zhí)行備選邏輯的一種處理方式。通過使用 @Recover 注解,在重試失敗后,可以調(diào)用備選邏輯方法來完成錯誤處理、數(shù)據(jù)清理等操作。
具體來說,當 @Retryable 注解標記的方法達到最大重試次數(shù)或者拋出了無法重試的異常時,Spring Retry將會嘗試查找與該方法參數(shù)類型相同的方法,并在找到時調(diào)用它。
@Retryable(maxAttempts = 3)
public void someMethod(String arg1, int arg2) {
// 重試業(yè)務邏輯
}
@Recover
public void recover(String arg1, int arg2) {
// 備選邏輯
}
當 someMethod() 方法達到最大重試次數(shù)或拋出無法重試的異常時,Spring Retry 將會查找并調(diào)用 recover() 方法,并傳遞相同的方法參數(shù)。在 recover() 方法中,應該針對重試失敗的情況編寫備選邏輯,例如記錄日志、發(fā)送通知等操作。
需要注意的是,@Recover 注解必須放置在其對應的重試方法所屬的類中,并且方法參數(shù)類型必須與重試方法一致。如果有多個重試方法,每個重試方法都可以對應一個@Recover方法,以滿足不同的備選邏輯需求。
在使用fallback機制時,應該仔細考慮備選邏輯的實現(xiàn)方式,確保其能夠正確處理重試失敗的情況,并對數(shù)據(jù)的一致性和完整性產(chǎn)生最小的影響。
2.3. 不同場景下的 Retry 和 Fallback
@Retryable 和 @Recover 都是添加在類方法上的注解,不管是什么場景下的請求只會走固定流程。這樣的設計在復雜場景下是否夠用?
之前,我們看到通過 Retry 恢復網(wǎng)絡抖動的場景;接下來讓我們看另一個場景,如下圖所示:
圖片
image
商品服務流量激增,導致 DB CPU 飆升,出現(xiàn)大量的慢 SQL,這時觸發(fā)了系統(tǒng)的 Retry 會是怎樣的結(jié)果?
- 在獲取商品失敗后,系統(tǒng)自動觸發(fā) Retry 機制;
- 由于是商品服務本身出了問題,第二次請求仍舊失??;
- 服務又觸發(fā)了第三次請求,仍未獲取結(jié)果;
- 達到最大重試次數(shù),仍舊無法獲取商品,只能通過異常中斷用戶請求;
通過 Retry 機制未能將流程從異常中恢復過來,反而給下游的 商品服務 造成了巨大傷害。
- 商品服務壓力大,響應時間長;
- 上游系統(tǒng)由于超時觸發(fā)自動重試;
- 自動重試增大了對商品服務的調(diào)用;
- 商品服務請求量更大,更難以從故障中恢復;
這就是常說的“讀放大”,假設用戶驗證是否能夠購買請求的請求量為 n,那極端情況下 商品服務的請求量為 3n (其中 2n 是由 Retry 機制造成)
此時,最優(yōu)解不是進行 Retry 而是直接走 Fallback,給下游服務一定的恢復機會。
同樣是對商品服務接口(同一個接口)的調(diào)用,在不同的場景需要使用不同的策略用以適配不同的業(yè)務流程,通常情況下:
- Command 場景優(yōu)先使用 Retry 策略
這種流量即為重要,最好能保障流程的完整性
通常寫流量比較小,小范圍 Retry 不會對下游系統(tǒng)造成巨大影響
- Query 場景優(yōu)選使用 Fallback 策略
大多數(shù)展示場景,哪怕部分信息沒有獲取到對整體的影響也比較小
通常讀場景流量較高,Retry 對下游系統(tǒng)的傷害不容忽視
面對不同的業(yè)務場景,你會怎么做呢?準備兩組不同的方法根據(jù)業(yè)務場景分別調(diào)用?
2.4. SmartFailure
SmartFailure 不是為不同的場景使用不同的方法,而是根據(jù)請求上下文信息,動態(tài)的走 Retry 或 Fallback,從而更好的適應復雜的業(yè)務場景。
整體設計如下:
圖片
整體流程如下:
- 讀取配置信息,將請求類型(ActionType)綁定到線程上下文;
- 然后執(zhí)行正常業(yè)務邏輯
- 當調(diào)用 @SmartFault 注解的方法時,會被 SmartFaultMethodInterceptor 攔截器攔截
攔截器通過 ActionTypeProvider 獲取當前的 ActionType;
根據(jù) ActionType 對請求進行路由;
如果是 COMMAND 操作,將使用 RetryTemplate 執(zhí)行請求,在發(fā)生異常時,通過重試配置進行請求重發(fā),從而最大限度的獲得遠程結(jié)果;
如果是 QUERY 操作,將使用 FallbackTemplate(重試次數(shù)為0的 RetryTemplate)執(zhí)行請求,當發(fā)生異常時,調(diào)用 fallback 方法,執(zhí)行配置的 recover 方法,直接使用返回結(jié)果;
- 獲取遠程結(jié)果后,執(zhí)行后續(xù)的業(yè)務邏輯;
- 最后,ActionAspect 將 ActionType 從線程上下文中移除;
使用前需添加 lego 依賴,具體如下:
<dependency>
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>最新版本</version>
</dependency>
2.4.1. ActionTypeProvider
首先,需要準備一個 ActionTypeProvider 用以提供上下文信息。ActionTypeProvider 接口定義如下:
public interface ActionTypeProvider {
ActionType get();
}
public enum ActionType {
COMMAND, QUERY
}
通常情況下,我們使用 ThreadLocal 組件將 ActionType 存儲于線程上下文,在使用時從上下中獲取相關(guān)信息。
public class ActionContext {
private static final ThreadLocal<ActionType> ACTION_TYPE_THREAD_LOCAL = new ThreadLocal<>();
public static void set(ActionType actionType){
ACTION_TYPE_THREAD_LOCAL.set(actionType);
}
public static ActionType get(){
return ACTION_TYPE_THREAD_LOCAL.get();
}
public static void clear(){
ACTION_TYPE_THREAD_LOCAL.remove();
}
}
有了上下文之后,ActionBasedActionTypeProvider 直接從 Context 中獲取 ActionType 具體如下:
@Component
public class ActionBasedActionTypeProvider implements ActionTypeProvider {
@Override
public ActionType get() {
return ActionContext.get();
}
}
如何對請求進行標記?如何對 ActionType 進行管理(包括信息綁定和信息清理)?最常用的方式便是:
- 提供一個注解,在方法上添加注解用于對 ActionType 的配置;
- 提供一個攔截器,對方法調(diào)用進行攔截:
方法調(diào)用前,從注解中獲取配置信息并綁定到上下文;
執(zhí)行具體的業(yè)務方法;
方法調(diào)用后,主動清理上下文信息;
核心實現(xiàn)為:
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface Action {
ActionType type();
}
@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class ActionAspect {
@Pointcut("@annotation(com.geekhalo.lego.faultrecovery.smart.Action)")
public void pointcut() {
}
@Around(value = "pointcut()")
public Object action(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Action annotation = methodSignature.getMethod().getAnnotation(Action.class);
ActionContext.set(annotation.type());
try {
return joinPoint.proceed();
}finally {
ActionContext.clear();
}
}
}
在這些組件的幫助下,我們只需在方法上基于 @Action 注解進行標記,便能夠?qū)?ActionType 綁定到上下文。
2.4.2. @SmartFault
在完成 ActionType 綁定到上下文之后,接下來要做的便是對 遠程接口 進行配置。遠程接口的配置工作主要由 @SmartFault 來完成。其核心配置項包括:
配置項 | 含義 | 默認配置 |
recover | fallback 方法名稱 | |
maxRetry | 最大重試次數(shù) | 3 |
include | 觸發(fā)重試的異常類型 | |
exclude | 不需要重新的異常類型 |
測試 Demo 如下:
@Service
@Slf4j
@Getter
public class RetryService3 {
private int count = 0;
private int retryCount = 0;
private int fallbackCount = 0;
private int recoverCount = 0;
public void clean(){
this.retryCount = 0;
this.fallbackCount = 0;
this.recoverCount = 0;
}
/**
* Command 請求,啟動重試機制
*/
@Action(type = ActionType.COMMAND)
@SmartFault(recover = "recover")
public Long retry(Long input) throws Throwable{
this.retryCount ++;
return doSomething(input);
}
/**
* Query 請求,啟動Fallback機制
*/
@Action(type = ActionType.QUERY)
@SmartFault(recover = "recover")
public Long fallback(Long input) throws Throwable{
this.fallbackCount ++;
return doSomething(input);
}
@Recover
public Long recover(Throwable e, Long input){
this.recoverCount ++;
log.info("recover-{}", input);
return input;
}
private Long doSomething(Long input) {
// 偶數(shù)拋出異常
if (count ++ % 2 == 0){
log.info("Error-{}", input);
throw new RuntimeException();
}
log.info("Success-{}", input);
return input;
}
}
2.4.3. 測試
最后,對代碼進行簡單測試:
@SpringBootTest(classes = DemoApplication.class)
public class RetryService3Test {
@Autowired
private RetryService3 retryService;
@BeforeEach
public void setup(){
retryService.clean();
}
@Test
public void retry() throws Throwable{
for (int i = 0; i < 100; i++){
retryService.retry(i + 0L);
}
Assertions.assertTrue(retryService.getRetryCount() > 0);
Assertions.assertTrue(retryService.getRecoverCount() == 0);
Assertions.assertTrue(retryService.getFallbackCount() == 0);
}
@Test
public void fallback() throws Throwable{
for (int i = 0; i < 100; i++){
retryService.fallback(i + 0L);
}
Assertions.assertTrue(retryService.getRetryCount() == 0);
Assertions.assertTrue(retryService.getRecoverCount() > 0);
Assertions.assertTrue(retryService.getFallbackCount() > 0);
}
}
運行 retry 測試,日志如下:
[main] c.g.l.c.f.smart.SmartFaultExecutor : action type is COMMAND
[main] c.g.l.faultrecovery.smart.RetryService3 : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor : Retry method public java.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.retry(java.lang.Long) throws java.lang.Throwable use [0]
[main] c.g.l.faultrecovery.smart.RetryService3 : Success-0
可見,當 action type 為 COMMAND 時:
- 第一次調(diào)用時,觸發(fā)異常,打印:Error-0
- 此時 SmartFaultExecutor 主動進行重試,打?。篟etry method xxxx
- 方法重試成功,RetryService3 打?。篠uccess-0
方法主動進行重試,流程從異常中恢復,處理過程和效果符合預期。
運行 fallback 測試,日志如下:
[main] c.g.l.c.f.smart.SmartFaultExecutor : action type is QUERY
[main] c.g.l.faultrecovery.smart.RetryService3 : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor : recover From ERROR for method ReflectiveMethodInvocation: public java.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.fallback(java.lang.Long) throws java.lang.Throwable; target is of class [com.geekhalo.lego.faultrecovery.smart.RetryService3]
[main] c.g.l.faultrecovery.smart.RetryService3 : recover-0
可見,當 action type 為 QUERY 時:
- 第一次調(diào)用時,觸發(fā)異常,打印:Error-0
- SmartFaultExecutor 執(zhí)行 Fallback 策略,打印:recover From ERROR for method xxxx
- 調(diào)用RetryService3的 recover 方法,獲取最終返回值。RetryService3 打印:recover-0
異常后自動執(zhí)行 fallback,將流程從異常中恢復過來,處理過程和效果符合預期。
3. 服務端冪等
服務端冪等指的是對于相同的輸入請求,服務端能夠產(chǎn)生相同的結(jié)果,而且不會對系統(tǒng)狀態(tài)造成影響。冪等性是為了防止由于重試等原因?qū)е碌闹貜蛨?zhí)行帶來的副作用。在RPC中,服務端需要設計和實現(xiàn)冪等性的方法,確保多次接收到相同的請求時能正確處理,而不會重復執(zhí)行對系統(tǒng)狀態(tài)有改變的操作。
3.1. 冪等與冪等接口
冪等是指對同一個操作的多次執(zhí)行所產(chǎn)生的影響與一次執(zhí)行的影響相同,即不會因為多次執(zhí)行而產(chǎn)生額外的副作用。在分布式系統(tǒng)中,由于網(wǎng)絡等各種因素的存在,可能會導致對同一個操作進行多次執(zhí)行,此時如果該操作是冪等的,那么就可以避免數(shù)據(jù)沖突和重復處理問題。
冪等接口是指對外提供的接口,它們所提供的業(yè)務操作具有冪等性。也就是說,在客戶端多次調(diào)用該接口時,每次調(diào)用都會產(chǎn)生相同的結(jié)果,并且不會產(chǎn)生額外的副作用。
例如,銀行轉(zhuǎn)賬操作就應該是一個冪等接口。假設客戶端已經(jīng)成功地向銀行發(fā)起了一次轉(zhuǎn)賬請求,但由于網(wǎng)絡不穩(wěn)定等原因,導致該請求的響應沒有及時返回給客戶端。此時客戶端可能會誤以為轉(zhuǎn)賬請求失敗,進而再次發(fā)送同樣的轉(zhuǎn)賬請求。如果銀行的轉(zhuǎn)賬接口是冪等的,那么無論客戶端發(fā)送多少次轉(zhuǎn)賬請求,最終的結(jié)果都應該是相同的——只有一次轉(zhuǎn)賬操作會被執(zhí)行,其他的請求都會被忽略。
3.2. 天然冪接口
有些接口天然就具備冪等性。這些接口通常是對資源的查詢操作,不會對資源進行修改。以下是一些天然具備冪等性的場景:
- 查詢接口:對于只用于查詢數(shù)據(jù)的接口,例如獲取用戶信息、獲取訂單詳情等,多次調(diào)用不會對數(shù)據(jù)產(chǎn)生影響;
- 獲取接口:對于獲取資源的接口,例如獲取圖片、獲取文件等,無論調(diào)用多少次,得到的都是相同的資源副本;
- 計算接口:對于純計算的接口,例如加法、乘法等數(shù)學運算接口,多次調(diào)用得到的結(jié)果是相同的;
- 驗證接口:對于驗證某個條件的接口,例如檢查用戶名是否已存在、驗證手機號是否有效等,多次調(diào)用得到的驗證結(jié)果都是一致的;
這些接口在設計上更容易滿足冪等性的要求,因為它們的操作本身并沒有產(chǎn)生副作用,不會對數(shù)據(jù)進行修改。在分布式系統(tǒng)中,可以安全地多次調(diào)用這些接口,而不會引發(fā)數(shù)據(jù)沖突。
3.3. 非冪等接口
非冪等接口是指對資源進行修改、狀態(tài)進行變更而產(chǎn)生副作用的接口,多次調(diào)用可能會導致不同的結(jié)果。以下是一些常見的非冪等接口:
- 創(chuàng)建資源接口:例如創(chuàng)建用戶、創(chuàng)建訂單等操作,多次調(diào)用會生成多個相同的資源實例,每次調(diào)用都會產(chǎn)生不同的結(jié)果;
- 修改資源接口:例如更新用戶信息、修改文章內(nèi)容等操作,多次調(diào)用會對同一個資源進行多次修改,每次修改都會產(chǎn)生不同的結(jié)果;
- 刪除資源接口:例如刪除文件、刪除訂單等操作,多次調(diào)用會多次刪除同一個資源,每次調(diào)用都會產(chǎn)生不同的結(jié)果;
這些非冪等接口在設計上需要特別注意,并采取合適的措施來確保數(shù)據(jù)的一致性和操作的正確性。
3.4. 重復請求的處理方式
當系統(tǒng)檢測出當前請求是重復請求時,通常會有兩種處理策略:
- 直接拋出冪等異常,用以說明該請求為重復請求;
- 直接返回上次請求一致的返回結(jié)果;
這兩種方案在實現(xiàn)上差異不大,但在客戶端使用中差異巨大。
- 如果是異常方案,客戶端在調(diào)用冪等接口時需要對異常進行捕獲,然后通過其他 API 獲取上次請求的處理結(jié)果,根據(jù)結(jié)果不同來決定接下來的處理流程;
- 如果是直接返回上次請求的處理結(jié)果,則客戶端不需要做額外的處理,直接使用重試機制對請求進行重新發(fā)送即可,獲取結(jié)果后自然進入到下面的流程;
所以,在冪等設計中,優(yōu)先使用 “直接返回上次請求結(jié)果” 方案。
綜上分析,冪等可以做為一種通用能力與業(yè)務處理邏輯進行充分解構(gòu),這就是組件封裝的前提。
3.5. 冪等組件
我們可以將冪等封裝成一種能力,然后在需要冪等保護的業(yè)務方法上進行配置,從而實現(xiàn)冪等能力與業(yè)務方法的徹底解耦。
整體設計如下圖所示:
圖片
整體設計比較簡單,運行流程如下:
- IdempotentInterceptor 會對 @Idempotent 注解標記的方法進行攔截;
- 當方法第一次被調(diào)用時,會讀取 @Idempotent 注解上的配置信息,使用 IdempotentExecutorFactoris 為每個方法創(chuàng)建一個 IdempotentExecutor 實例;
- 在方法調(diào)用時,將請求直接路由到 IdempotentExecutor 實例,由 IdempotentExecutor 完成核心流程;
- 其中,IdempotentExecutorFactories 擁有多個 IdempotentExecutorFactory 實例,并根據(jù) @Idempotent 上配置的 executorFactory 屬性使用對應的實例完成創(chuàng)建工作;
從設計上看,系統(tǒng)中可以同時配置多個 IdempotentExecutorFactory,然后根據(jù)不同的業(yè)務場景設置不同的 executorFactory。
冪等處理的核心流程如下:
圖片
IdempotentExecutor 處理核心流程如下:
- 通過 SpringEL 表達式從入?yún)⒅刑崛?unique key 信息;
- 根據(jù) group 和 unique key 從 ExecutionRecordRepository 中讀取執(zhí)行記錄 ExecutionRecord;
- 如果 ExecutionRecord 為已完成狀態(tài),則根據(jù)配置直接返回 ExecutionRecord 的執(zhí)行結(jié)果 或者 直接拋出 RepeatedSubmitException 異常;
- 如果 ExecutionRecord 為執(zhí)行中,則出現(xiàn)并發(fā)問題,直接拋出 ConcurrentRequestException 異常;
- 如果 ExecutionRecord 為未執(zhí)行,先執(zhí)行方法獲取返回值,然后使用 ExecutionRecordRepository 對 ExecutionRecord 進行更新,然后返回執(zhí)行結(jié)果;
使用前需要引入 lego starter,在 maven pom 中添加如下信息:
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>最新版本</version>
3.5.1. 配置 dbExecutorFactory
以 JpaRepository 為例實現(xiàn)對 IdempotentExecutorFactory 的配置,具體如下:
@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {
@Bean("dbExecutorFactory")
public IdempotentExecutorFactory dbExecutorFactory(JpaBasedExecutionRecordRepository recordRepository){
return createExecutorFactory(recordRepository);
}
}
其中,IdempotentConfigurationSupport 已經(jīng)提供 idempotent 所需的很多 Bean,同時提供 createExecutorFactory(repository) 方法,用以完成 IdempotentExecutorFactory 的創(chuàng)建。
使用 Jpa 需要調(diào)整 EnableJpaRepositories 相關(guān)配置,具體如下:
@Configuration
@EnableJpaRepositories(basePackages = {
"com.geekhalo.lego.core.idempotent.support.repository"
}, repositoryFactoryBeanClass = JpaBasedQueryObjectRepositoryFactoryBean.class)
public class SpringDataJpaConfiguration {
}
其中,com.geekhalo.lego.core.idempotent.support.repository 為固定包名,指向 Jpa 默認實現(xiàn) JpaBasedExecutionRecordRepository,Spring Data Jpa 會自動生成實現(xiàn)的代理對象。
最后,在數(shù)據(jù)庫中增加 冪等所需表,sql 如下:
CREATE TABLE `idempotent_execution_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`type` int(11) NOT NULL,
`unique_key` varchar(64) NOT NULL,
`status` int(11) NOT NULL,
`result` varchar(1024) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unq_type_key` (`type`,`unique_key`)
) ENGINE=InnoDB;
至此,便完成了基本配置。
【注】關(guān)于 Spring data jpa 配置,可以自行到網(wǎng)上進行檢索。
3.5.2. 冪等保護示例
在方法上增加 @Idempotent 注解便可以使其具備冪等保護,示例如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putForResult(String key, Long data){
return put(key, data);
}
其中 @Idempotent 為核心配置,詳細信息如下:
- executorFactory 為 IdempotentExecutorFactory,及在 IdempotentConfiguration 中配置的bean,默認為 DEFAULT_EXECUTOR_FACTORY
- group 為組信息,用于區(qū)分不同的業(yè)務場景,同一業(yè)務場景使用相同的配置;
- keyEl 為提取冪等鍵所用的 SpringEl 表達式,#key 說明入?yún)⒌?key 將作為冪等鍵,group + key 為一個完整的冪等鍵,唯一識別一次請求;
- handleType 是處理類型,及重復提交時如何處理請求
RESULT,直接返回上次的執(zhí)行結(jié)果
ERROR,直接拋出 RepeatedSubmitException 異常
編寫簡單的測試用例如下:
@Test
void putForResult() {
BaseIdempotentService idempotentService = getIdempotentService();
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
{ // 第一次操作,返回值和最終結(jié)果符合預期
Long result = idempotentService.putForResult(key, value);
Assertions.assertEquals(value, result);
Assertions.assertEquals(value, idempotentService.getValue(key));
}
{ // 第二次操作,返回值和最終結(jié)果 與第一次一致(直接獲取返回值,沒有執(zhí)行業(yè)務邏輯)
Long valueNew = RandomUtils.nextLong();
Long result = idempotentService.putForResult(key, valueNew);
Assertions.assertEquals(value, result);
Assertions.assertEquals(value, idempotentService.getValue(key));
}
}
運行測試用例,測試通過,可得出如下結(jié)論:
- 第一次操作,與正常方法一致,成功返回結(jié)果值;
- 第二次操作,邏輯方法未執(zhí)行,直接返回第一次的運行結(jié)果;
這是最常見的一種工作模式,除直接返回上次執(zhí)行結(jié)果外,當發(fā)生重復提交時也可以拋出異常中斷流程,只需將 handleType 設置為 ERROR 即可,具體如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.ERROR)
@Transactional
public Long putForError(String key, Long data){
return put(key, data);
}
編寫測試用例,具體如下:
@Test
void putForError() {
BaseIdempotentService idempotentService = getIdempotentService();
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
{ // 第一次操作,返回值和最終結(jié)果符合預期
Long result = idempotentService.putForError(key, value);
Assertions.assertEquals(value, result);
Assertions.assertEquals(value, idempotentService.getValue(key));
}
{ // 第二次操作,直接拋出異常,結(jié)果與第一次一致
Assertions.assertThrows(RepeatedSubmitException.class, () ->{
Long valueNew = RandomUtils.nextLong();
idempotentService.putForError(key, valueNew);
});
Assertions.assertEquals(value, idempotentService.getValue(key));
}
}
運行測試用例,測試通過,可以得出:
- 第一次操作,與正常方法一致,成功返回結(jié)果值;
- 第二次操作,直接拋出 RepeatedSubmitException 異常,同時方法未執(zhí)行,結(jié)果與第一次調(diào)用一致;
3.5.3. 冪等與異常
異常是一種特殊的返回值?。?!
如果將異常看做是一種特殊的返回值,那冪等接口在第二次請求時同樣需要拋出異常,示例代碼如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putExceptionForResult(String key, Long data) {
return putException(key, data);
}
protected Long putException(String key, Long data){
this.data.put(key, data);
throw new IdempotentTestException();
}
@Idempotent 注解沒有變化,只是在 putException 方法執(zhí)行后拋出 IdempotentTestException 異常。
編寫簡單測試用例如下:
@Test
void putExceptionForResult(){
BaseIdempotentService idempotentService = getIdempotentService();
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
{ // 第一次操作,拋出異常
Assertions.assertThrows(IdempotentTestException.class,
()->idempotentService.putExceptionForResult(key, value));
Assertions.assertEquals(value, idempotentService.getValue(key));
}
{ // 第二次操作,返回值和最終結(jié)果 與第一一致(直接獲取返回值,沒有執(zhí)行業(yè)務邏輯)
Long valueNew = RandomUtils.nextLong();
Assertions.assertThrows(IdempotentTestException.class,
()->idempotentService.putExceptionForResult(key, valueNew));
Assertions.assertEquals(value, idempotentService.getValue(key));
}
}
運行測試用例,用例通過,可知:
- 第一次操作,與方法邏輯一致,更新數(shù)據(jù)并拋出 IdempotentTestException 異常;
- 第二次操作,直接拋出 IdempotentTestException 異常,同時方法未執(zhí)行,結(jié)果與第一次一致;
3.5.4. 并發(fā)保護
如果上一個請求執(zhí)行尚未結(jié)束,新的請求已經(jīng)開啟,那會如何?
這就是最常見的并發(fā)場景,idempotent 對其也進行了支持,當出現(xiàn)并發(fā)請求時會直接拋出 ConcurrentRequestException,用于中斷處理。
首先,使用 sleep 模擬一個耗時的方法,具體如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putWaitForResult(String key, Long data) {
return putForWait(key, data);
}
protected Long putForWait(String key, Long data){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return put(key, data);
}
putWaitForResult 方法調(diào)用時會主動 sleep 3 秒,然后才執(zhí)行真正的邏輯。
編寫測試代碼如下:
@Test
void putWaitForResult(){
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
// 主線程拋出 ConcurrentRequestException
Assertions.assertThrows(ConcurrentRequestException.class, () ->
testForConcurrent(baseIdempotentService ->
baseIdempotentService.putWaitForResult(key, value))
);
}
private void testForConcurrent(Consumer<BaseIdempotentService> consumer) throws InterruptedException {
// 啟動一個線程執(zhí)行任務,模擬并發(fā)場景
Thread thread = new Thread(() -> consumer.accept(getIdempotentService()));
thread.start();
// 主線程 sleep 1 秒,與異步線程并行執(zhí)行任務
TimeUnit.SECONDS.sleep(1);
consumer.accept(getIdempotentService());
}
運行單元測試,測試通過,核心測試邏輯如下:
- 創(chuàng)建一個線程,執(zhí)行耗時方法;
- 等待 1 秒后,主線程也執(zhí)行耗時方法;
- 此時,兩個線程并發(fā)執(zhí)行耗時方法,后進入的主線程直接拋出 ConcurrentRequestException;
3.5.5. Redis 支持
DB 具有非常好的一致性,但性能存在一定的問題。在一致性要求不高,性能要求高的場景,可以使用 Redis 作為 ExecutionRecord 的存儲引擎。
引入 redis 非常簡單,大致分兩步:
- 在 IdempotentConfiguration 中注冊 redisExecutorFactory bean;
- @Idempotent 注解中使用 redisExecutorFactory 即可;
添加 redisExecutorFactory Bean,具體如下:
@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {
@Bean("redisExecutorFactory")
public IdempotentExecutorFactory redisExecutorFactory(ExecutionRecordRepository executionRecordRepository){
return createExecutorFactory(executionRecordRepository);
}
@Bean
public ExecutionRecordRepository executionRecordRepository(RedisTemplate<String, ExecutionRecord> recordRedisTemplate){
return new RedisBasedExecutionRecordRepository("ide-%s-%s", Duration.ofDays(7), recordRedisTemplate);
}
@Bean
public RedisTemplate<String, ExecutionRecord> recordRedisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String, ExecutionRecord> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
Jackson2JsonRedisSerializer<ExecutionRecord> executionRecordJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(ExecutionRecord.class);
executionRecordJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
redisTemplate.setValueSerializer(executionRecordJackson2JsonRedisSerializer);
return redisTemplate;
}
}
@Idempotent 注解調(diào)整如下:
@Idempotent(executorFactory = "redisExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Override
public Long putForResult(String key, Long data){
return put(key, data);
}
這樣,所有的冪等信息都會存儲在 redis 中。
【注】一般 redis 不會對數(shù)據(jù)進行持久存儲,只能保障在一段時間內(nèi)的冪等性,超出時間后,由于 key 被自動清理,冪等將不再生效。對于業(yè)務場景不太嚴格但性能要求較高的場景才可使用,比如為過濾系統(tǒng)中由于 retry 機制造成的重復請求。
4. 小結(jié)
當系統(tǒng)開啟微服務化后,服務調(diào)用的第三狀態(tài)就成為了不可回避的話題。通常情況下,會綜合使用 客戶端重試 和 服務端冪等 兩個方案來解決:
- 客戶端重試。需要根據(jù)不同的場景選擇不同的 Retry 和 Fallback 機制:
寫請求,建設使用 Retry 機制,保障最重要的流量不被浪費
讀請求,建議使用 Fallback 機制,避免重試流量對下游服務造成巨大壓力
- 服務端冪等。冪等作為一種通用能力,建議與業(yè)務邏輯進行分離,在需要的時候直接在業(yè)務方法上進行配置即可,但仍舊有一些最佳實踐:
- 使用直接返回上次的處理結(jié)果替代異常中斷,使調(diào)用方更加簡潔;
- 業(yè)務異常是一種特殊的返回值,也需要進行冪等保護;
- 有冪等保護后,仍舊需要對并發(fā)請求進行處理,此時直接通過異常對重復流程進行中斷即可;
以上兩種場景,lego 對其都進行了封裝,可以方便的應用于業(yè)務系統(tǒng),從而降低微服務的傷害。