轉轉基于MQ的分布式重試框架設計方案
1 背景
在分布式場景下,為了保障系統(tǒng)的可用性和數(shù)據(jù)的最終一致性,采用基于消息隊列(MQ)的重試機制是一種常見的解決方案。偽代碼如下:
/**
* 需要保證最終一致性的函數(shù)
*/
public void doSomething(Object args) {
try {
// 執(zhí)行事務的操作
executeTransaction();
// 提交事務
commitTransaction();
} catch (Exception e) {
// 回滾事務
rollbackTransaction();
// 記錄日志
log.error(e);
// 序列化參數(shù)
byte[] body = serialize(args);
// 構建消息, 指定Topic、Body
Message msg = new Message("doSomethingTopic", body);
// 發(fā)送失敗重試消息
mq.send(msg);
}
}
/**
* 消費者,用于失敗重試處理
*/
@Consumer(topic = "doSomethingTopic")
public void consume(Message msg) {
// 反序列化
Object args = msg.deserialize();
// 重試
doSomething(args);
}
在上述示例中,我們需要編寫一系列與業(yè)務無關的代碼來實現(xiàn)業(yè)務邏輯的重試機制。為了減輕開發(fā)人員的負擔并讓其專注于核心業(yè)務,我們可以對這些無關代碼進行抽象和優(yōu)化,以提高開發(fā)效率和代碼質量。
2 方案
通過如下步驟,我們對重試邏輯進行了封裝,開發(fā)人員只需要在需要保證最終一致性的函數(shù)上標注一個重試注解,便擁有基于MQ的分布式重試能力。
1. 使用注解與AOP: 通過使用注解與面向切面編程(AOP)的技術,將重試邏輯模塊與業(yè)務代碼解耦。開發(fā)人員可以在需要保證最終一致性的業(yè)務方法上添加注解,通過AOP將重試邏輯應用到目標方法中,從而自動觸發(fā)重試機制。
2. 提供配置化選項:為重試邏輯提供可配置化的選項,例如設置最大重試次數(shù)、重試間隔時間等。這樣,開發(fā)人員可以根據(jù)具體業(yè)務需求進行調整,而無需修改代碼。
3. 異常處理和日志記錄:在重試邏輯中合理地處理異常,并在必要時記錄相關日志。這樣可以幫助開發(fā)人員及時發(fā)現(xiàn)問題并進行排查。
4. 提供可視化監(jiān)控工具:開發(fā)一個可視化的監(jiān)控工具,用于實時跟蹤重試操作和相關指標。這樣可以幫助開發(fā)人員更好地理解重試的執(zhí)行情況,并進行故障排查和性能優(yōu)化。
圖片
3 效果
我們引入了@MQRetry注解用于標記業(yè)務邏輯函數(shù),一旦該函數(shù)發(fā)生異常,該注解會將服務名、類的完整名稱、方法名稱以及實際參數(shù)列表發(fā)送到消息隊列(MQ)中。同時系統(tǒng)會注冊一個MQ消費者來消費這些消息,并進行重試處理。
舉個例子,假設我們有一個名為doSomething的函數(shù),它包含了需要保證最終一致性執(zhí)行的業(yè)務邏輯。僅需在該函數(shù)上添加@MQRetry注解,當函數(shù)出現(xiàn)異常時,框架會自動發(fā)送一條MQ重試消息。這條消息可以被當前服務的任意一臺服務器消費,并重新執(zhí)行doSomething函數(shù)。
@Service
class Service {
@MQRetry
public void doSomething(String params1, String params2, List<String> params3) {
//throw new RuntimeException(); 拋異常將重試
//RetryContext.markRetryLater(); 標記為需要下次重試
//int retryCount = RetryContext.getRetryCount(); 獲取重試次數(shù)
}
}
@Controller
class Controller {
@Autowired
private Service service;
service.doSomething("1", "2", Arrays.asList("3", "4"));
}
4 可選項
除此之外,我們還為開發(fā)人員提供了一些可選項,提供一些可配置的能力。
/**
* 基于MQ的分布式重試組件
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MQRetry {
/**
* 最大重試次數(shù),默認與上限為16次
*/
int maxAttempts() default 16;
/**
* 忽略的異常類列表,默認所有異常都重試
*/
Class<? extends Throwable>[] exclude() default {};
/**
* 需要重試的異常類列表,默認所有異常都重試
*/
Class<? extends Throwable>[] include() default {};
/**
* 出現(xiàn)異常時的處理函數(shù), 格式: Bean名.方法名. 如: smsService.onError
* 也可以只設置函數(shù)名, 不設置Bean名將執(zhí)行本類的函數(shù). 如: onError
* 要求函數(shù)參數(shù)必須與重試函數(shù)的參數(shù)完全一致
*/
String errorHandler() default "";
/**
* true: 第一次調用時, 同步執(zhí)行@MQRetry函數(shù), 如果失敗再使用MQ
* false: 調用@MQRetry函數(shù)時, 只會發(fā)送MQ
*/
boolean firstSyncCall() default true;
/**
* 消費線程數(shù),默認為20個
*/
int consumeThread() default 20;
}
5 注意事項
- 適用于異步場景,重試函數(shù)不要設置返回值,函數(shù)的返回值將不會有任何的實際意義。
- At lease Once保證,重試函數(shù)需要保證冪等。
- 使用了AOP代理實現(xiàn),因此,@Transactional的注意事項同樣適用于@MQRetry,如this調用、private函數(shù)、final函數(shù)會導致重試失效。
- 如果重試函數(shù)需要增加參數(shù),請在函數(shù)參數(shù)最后位置添加。歷史消息消費時對應參數(shù)將填充為null。
6 總結
在計算機領域中,重試機制的重要性不言而喻。它通常分為兩種模式:客戶端模式和服務端模式。客戶端模式簡單易用,但可靠性較低;而服務端模式雖然相對復雜,但能夠提供更高的可靠性。
無論是客戶端模式還是服務端模式,重試機制都是保障系統(tǒng)正常運行的重要一環(huán)。選擇適合您業(yè)務需求的模式,并通過合理的配置項進行優(yōu)化,將為您的系統(tǒng)帶來更好的表現(xiàn)和用戶體驗。
圖片
關于作者
苑沖,轉轉架構部存儲服務負責人,負責MQ、監(jiān)控系統(tǒng)、KV存儲、時序數(shù)據(jù)庫、Redis、KMS秘鑰管理等基礎組件。喜歡深入思考問題,對探索新領域和解決問題充滿熱情。