五張圖帶你了解分布式事務(wù) Saga 模式中的狀態(tài)機
大家好,我是君哥。
狀態(tài)機在我們的工作中應(yīng)用非常廣泛,今天聊一聊分布式事務(wù)中間件 Seata 中 Saga 模式的狀態(tài)機。
1.狀態(tài)機簡介
狀態(tài)機是一個數(shù)學(xué)模型,它將工作中的運行狀態(tài)和流轉(zhuǎn)規(guī)則抽象出來,可以協(xié)調(diào)相關(guān)信號來完成預(yù)先設(shè)定的操作。
下面介紹狀態(tài)機中的幾個概念:
- 狀態(tài):狀態(tài)機目前的狀態(tài)標識;
- 狀態(tài)轉(zhuǎn)移:定義狀態(tài)之間的轉(zhuǎn)移路由;
- 動作(Action):狀態(tài)轉(zhuǎn)移需要的操作;
- 事件:要執(zhí)行某個操作時的觸發(fā)器或者口令。
狀態(tài)機一般用在狀態(tài)類型比較多(超過 3 個),分支流程比較多,初始狀態(tài)經(jīng)過多個流程的流轉(zhuǎn)達到最終狀態(tài)的場景。
2.Saga 模式
Saga 模式是分布式事務(wù)中長事務(wù)的一種解決方案,Seata 中 Saga 模式的理論基礎(chǔ)是 Hector & Kenneth 在 1987 年發(fā)表的論文 Sagas。下圖(來自官網(wǎng))是 Seata 中 Saga 模型:
在 Saga 模式中,如果一部分分支事務(wù)已經(jīng)提交成功,當(dāng)其中一個分支事務(wù)提交失敗,狀態(tài)機就會觸發(fā)所有提交成功的分支事務(wù)進行回滾。
分支事務(wù)中提交和回滾的邏輯需要由業(yè)務(wù)代碼來實現(xiàn)。
3.Saga 實現(xiàn)
Seata 中 Saga 模式是基于狀態(tài)機來實現(xiàn)的,使用 Saga 模式時,先畫一張狀態(tài)圖,這個狀態(tài)圖定義服務(wù)調(diào)用流程,每個節(jié)點調(diào)用一個分支事務(wù),并且每個節(jié)點需要配備一個補償節(jié)點用于分支事務(wù)失敗后的補償動作。
以經(jīng)典電商案例來講,一個分布式事務(wù)中有三個分支事務(wù)參數(shù)者:
分支事務(wù) | 動作 | 狀態(tài) |
訂單服務(wù) | 保存訂單 | 保存成功、失敗 |
賬戶服務(wù) | 扣減金額 | 扣減成功、失敗 |
庫存服務(wù) | 扣減庫存 | 扣減成功、失敗 |
在這個分布式事務(wù)中,只有訂單、賬戶、庫存這三個分支事務(wù)都提交成功,整個事務(wù)才能成功。每一個分支事務(wù)提交失敗,其他執(zhí)行成功的事務(wù)都需要反向補償。如下圖:
圖片
比如扣減金額這個分支事務(wù)失敗了,需要反向補償扣減金額、保存訂單這兩個分支事務(wù)。那 Seata 是怎么做到事件觸發(fā)、狀態(tài)流轉(zhuǎn)和補償操作的呢?
使用 Seata 狀態(tài)機,首先需要定義一個 Json 文件,這個 Json 文件把圖中的每個節(jié)點都定義成一個 State,State 的類型共有四種:
- ServiceTask:對應(yīng)分支事務(wù)的提交操作。
- Choice:對應(yīng)流程中下一個 State 的選擇。
- CompensationTrigger:觸發(fā)補償服務(wù)。
- Succeed:成功狀態(tài),當(dāng)所有分支事務(wù)都成功后才會流轉(zhuǎn)到這個狀態(tài)。
- Fail:失敗狀態(tài)。
(1)ServiceTask
下面我們看"保存訂單"這個狀態(tài):
"SaveOrder": {
"Type": "ServiceTask",
"ServiceName": "orderSave",
"ServiceMethod": "saveOrder",
"CompensateState": "DeleteOrder",
"Next": "ChoiceAccountState",
"Input": [
"$.[businessKey]",
"$.[order]"
],
"Output": {
"SaveOrderResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
]
},
這個 State 的類型是 ServiceTask,上面圖中的分支服務(wù)和補償服務(wù)都是這種類型,也對應(yīng)代碼中的一個 Service。上面的 Json 中主要定義了三個內(nèi)容:
- 這個 state 調(diào)用的 Service 方法.
- 提交失敗后的補償 State(CompensateState)。
- 提交成功后應(yīng)該跳轉(zhuǎn)的下一個 State(ChoiceAccountState)。
(2)Choice
下面來看 ChoiceAccountState 這個狀態(tài)節(jié)點,Json 文件定義如下:
"ChoiceAccountState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[SaveOrderResult] == true",
"Next":"ReduceAccount"
}
],
"Default":"Fail"
}
對應(yīng)的下個節(jié)點是 ReduceAccount,如果失敗就會跳轉(zhuǎn) Fail 狀態(tài)。
(3)Fail
上面 orderSave 這個狀態(tài)節(jié)點如果發(fā)生異常,會跳轉(zhuǎn)到 CompensationTrigger,CompensationTrigger 狀態(tài)節(jié)點定義如下:
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
}
這個節(jié)點會觸發(fā) SaveOrder 中定義的補償服務(wù),然后將最終狀態(tài)流轉(zhuǎn)到 Fail。同時我們也看到,只要到了 CompensationTrigger 這個狀態(tài)節(jié)點,最終狀態(tài)就會流轉(zhuǎn)到 Fail。
下面我們把整個 Json 文件的定義貼出來看一下:
{
"Name": "buyGoodsOnline",
"Comment": "buy a goods on line, add order, deduct account, deduct storage ",
"StartState": "SaveOrder",
"Version": "0.0.1",
#定義狀態(tài)
"States": {
"SaveOrder": {
"Type": "ServiceTask",
"ServiceName": "orderSave",
"ServiceMethod": "saveOrder",
"CompensateState": "DeleteOrder",
"Next": "ChoiceAccountState",
"Input": [
"$.[businessKey]",
"$.[order]"
],
"Output": {
"SaveOrderResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
]
},
"ChoiceAccountState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[SaveOrderResult] == true",
"Next":"ReduceAccount"
}
],
"Default":"Fail"
},
"ReduceAccount": {
"Type": "ServiceTask",
"ServiceName": "accountService",
"ServiceMethod": "decrease",
"CompensateState": "CompensateReduceAccount",
"Next": "ChoiceStorageState",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[money]",
{
"throwException" : "$.[mockReduceAccountFail]"
}
],
"Output": {
"ReduceAccountResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
]
},
"ChoiceStorageState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[ReduceAccountResult] == true",
"Next":"ReduceStorage"
}
],
"Default":"Fail"
},
"ReduceStorage": {
"Type": "ServiceTask",
"ServiceName": "storageService",
"ServiceMethod": "decrease",
"CompensateState": "CompensateReduceStorage",
"Input": [
"$.[businessKey]",
"$.[productId]",
"$.[count]",
{
"throwException" : "$.[mockReduceStorageFail]"
}
],
"Output": {
"ReduceStorageResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"DeleteOrder": {
"Type": "ServiceTask",
"ServiceName": "orderSave",
"ServiceMethod": "deleteOrder",
"Input": [
"$.[businessKey]",
"$.[order]"
]
},
"CompensateReduceAccount": {
"Type": "ServiceTask",
"ServiceName": "accountService",
"ServiceMethod": "compensateDecrease",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[money]"
]
},
"CompensateReduceStorage": {
"Type": "ServiceTask",
"ServiceName": "storageService",
"ServiceMethod": "compensateDecrease",
"Input": [
"$.[businessKey]",
"$.[productId]",
"$.[count]"
]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type":"Succeed"
},
"Fail": {
"Type":"Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}
上面 Json 文件中定義的 buyGoodsOnline,是狀態(tài)機加載的入口,狀態(tài)機會找到這個 name,然后把狀態(tài)加載到自己的內(nèi)存中。下面,我們再來總結(jié)一下電商案例中分布式事務(wù)狀態(tài)流轉(zhuǎn)過程:
4.狀態(tài)機應(yīng)用
上面的電商例子中,三個分支服務(wù)分別定義了三個 State,對應(yīng)的 ServiceMethod 如下:
- SaveOrder#saveOrder:
public boolean saveOrder(String businessKey, Order order) {
logger.info("保存訂單, businessKey:{}, order: {}", businessKey, order);
orderDao.create(order);
return true;
}
- ReduceAccount#decrease
public boolean decrease(String businessKey, Long userId, BigDecimal money) {
return accountApi.decrease(businessKey, userId, money);
}
- ReduceStorage#decrease
public boolean decrease(String businessKey, Long productId, Integer count) {
return storageApi.decrease(businessKey, productId, count);
}
狀態(tài)機在啟動的時候,需要把上面方法中的參數(shù)都傳入,實例代碼如下:
StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine");
Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("order", order);
startParams.put("mockReduceAccountFail", "true");
startParams.put("userId", order.getUserId());
startParams.put("money", order.getPayAmount());
startParams.put("productId", order.getProductId());
startParams.put("count", order.getCount());
//這里采用同步方法
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);
5.狀態(tài)機原理
下面這張圖來自于 Seata 官網(wǎng),主要講解了狀態(tài)機的工作原理:
圖片
- 狀態(tài)機啟動時,首先啟動了全局事務(wù)。
- 將狀態(tài)機的參數(shù)記錄在本地 seata_state_machine_inst 表。
- 向 Seata Server 注冊分支事務(wù)。
- 執(zhí)行 StateA 并記錄狀態(tài)到本地數(shù)據(jù)庫,同時會產(chǎn)生路由事件放入 EventQueue,執(zhí)行 StateB 時取出路由消息觸發(fā)執(zhí)行。同樣 StateB 執(zhí)行時也會產(chǎn)生路由消息放入 EventQueue。
- 從 EventQueue 取出路由消息執(zhí)行 StateC。
- 狀態(tài)機結(jié)束流程,提交或回滾全局事務(wù)。
6.高可用
Seata 中的狀態(tài)機并不是獨立部署,而是內(nèi)嵌在應(yīng)用中,由于狀態(tài)機上下文和執(zhí)行日志都記錄在本地數(shù)據(jù)庫中,所以狀態(tài)機本身是無狀態(tài)的。
狀態(tài)機啟動時,會發(fā)送狀態(tài)到 Seata Server,當(dāng)一個應(yīng)用宕機后,Seata Server 能感知到,并會把恢復(fù)請求發(fā)送到存活的實例,收到請求的實例從數(shù)據(jù)庫取出狀態(tài)機上下文和執(zhí)行日志進行恢復(fù)。如下圖:
7.總結(jié)
本文講解了分布式事務(wù)中間件 Seata 給 Saga 模式設(shè)計的狀態(tài)機使用方式和原理。狀態(tài)機在我們的日常工作中使用非常廣泛,希望 Seata 的設(shè)計能對我們設(shè)計狀態(tài)機提供思路和參考。