自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

五張圖帶你了解分布式事務(wù) Saga 模式中的狀態(tài)機

開發(fā) 架構(gòu)
本文講解了分布式事務(wù)中間件 Seata 給 Saga 模式設(shè)計的狀態(tài)機使用方式和原理。狀態(tài)機在我們的日常工作中使用非常廣泛,希望 Seata 的設(shè)計能對我們設(shè)計狀態(tài)機提供思路和參考。

大家好,我是君哥。

狀態(tài)機在我們的工作中應(yīng)用非常廣泛,今天聊一聊分布式事務(wù)中間件 Seata 中 Saga 模式的狀態(tài)機。

1.狀態(tài)機簡介

狀態(tài)機是一個數(shù)學(xué)模型,它將工作中的運行狀態(tài)和流轉(zhuǎn)規(guī)則抽象出來,可以協(xié)調(diào)相關(guān)信號來完成預(yù)先設(shè)定的操作。

下面介紹狀態(tài)機中的幾個概念:

  • 狀態(tài):狀態(tài)機目前的狀態(tài)標識;
  • 狀態(tài)轉(zhuǎn)移:定義狀態(tài)之間的轉(zhuǎn)移路由;
  • 動作(Action):狀態(tài)轉(zhuǎn)移需要的操作;
  • 事件:要執(zhí)行某個操作時的觸發(fā)器或者口令。

狀態(tài)機一般用在狀態(tài)類型比較多(超過 3 個),分支流程比較多,初始狀態(tài)經(jīng)過多個流程的流轉(zhuǎn)達到最終狀態(tài)的場景。

2.Saga 模式

Saga 模式是分布式事務(wù)中長事務(wù)的一種解決方案,Seata 中 Saga 模式的理論基礎(chǔ)是 Hector & Kenneth 在 1987 年發(fā)表的論文 Sagas。下圖(來自官網(wǎng))是 Seata 中 Saga 模型:

在 Saga 模式中,如果一部分分支事務(wù)已經(jīng)提交成功,當(dāng)其中一個分支事務(wù)提交失敗,狀態(tài)機就會觸發(fā)所有提交成功的分支事務(wù)進行回滾。

分支事務(wù)中提交和回滾的邏輯需要由業(yè)務(wù)代碼來實現(xiàn)。

3.Saga 實現(xiàn)

Seata 中 Saga 模式是基于狀態(tài)機來實現(xiàn)的,使用 Saga 模式時,先畫一張狀態(tài)圖,這個狀態(tài)圖定義服務(wù)調(diào)用流程,每個節(jié)點調(diào)用一個分支事務(wù),并且每個節(jié)點需要配備一個補償節(jié)點用于分支事務(wù)失敗后的補償動作。

以經(jīng)典電商案例來講,一個分布式事務(wù)中有三個分支事務(wù)參數(shù)者:

分支事務(wù)

動作

狀態(tài)

訂單服務(wù)

保存訂單

保存成功、失敗

賬戶服務(wù)

扣減金額

扣減成功、失敗

庫存服務(wù)

扣減庫存

扣減成功、失敗

在這個分布式事務(wù)中,只有訂單、賬戶、庫存這三個分支事務(wù)都提交成功,整個事務(wù)才能成功。每一個分支事務(wù)提交失敗,其他執(zhí)行成功的事務(wù)都需要反向補償。如下圖:

圖片圖片

比如扣減金額這個分支事務(wù)失敗了,需要反向補償扣減金額、保存訂單這兩個分支事務(wù)。那 Seata 是怎么做到事件觸發(fā)、狀態(tài)流轉(zhuǎn)和補償操作的呢?

使用 Seata 狀態(tài)機,首先需要定義一個 Json 文件,這個 Json 文件把圖中的每個節(jié)點都定義成一個 State,State 的類型共有四種:

  • ServiceTask:對應(yīng)分支事務(wù)的提交操作。
  • Choice:對應(yīng)流程中下一個 State 的選擇。
  • CompensationTrigger:觸發(fā)補償服務(wù)。
  • Succeed:成功狀態(tài),當(dāng)所有分支事務(wù)都成功后才會流轉(zhuǎn)到這個狀態(tài)。
  • Fail:失敗狀態(tài)。

(1)ServiceTask

下面我們看"保存訂單"這個狀態(tài):

"SaveOrder": {
 "Type": "ServiceTask",
 "ServiceName": "orderSave",
 "ServiceMethod": "saveOrder",
 "CompensateState": "DeleteOrder",
 "Next": "ChoiceAccountState",
 "Input": [
  "$.[businessKey]",
  "$.[order]"
 ],
 "Output": {
  "SaveOrderResult": "$.#root"
 },
 "Status": {
  "#root == true": "SU",
  "#root == false": "FA",
  "$Exception{java.lang.Throwable}": "UN"
 },
 "Catch": [
  {
   "Exceptions": [
    "java.lang.Throwable"
   ],
   "Next": "CompensationTrigger"
  }
 ]
},

這個 State 的類型是 ServiceTask,上面圖中的分支服務(wù)和補償服務(wù)都是這種類型,也對應(yīng)代碼中的一個 Service。上面的 Json 中主要定義了三個內(nèi)容:

  • 這個 state 調(diào)用的 Service 方法.
  • 提交失敗后的補償 State(CompensateState)。
  • 提交成功后應(yīng)該跳轉(zhuǎn)的下一個 State(ChoiceAccountState)。

(2)Choice

下面來看 ChoiceAccountState 這個狀態(tài)節(jié)點,Json 文件定義如下:

"ChoiceAccountState":{
 "Type": "Choice",
 "Choices":[
  {
   "Expression":"[SaveOrderResult] == true",
   "Next":"ReduceAccount"
  }
 ],
 "Default":"Fail"
}

對應(yīng)的下個節(jié)點是 ReduceAccount,如果失敗就會跳轉(zhuǎn) Fail 狀態(tài)。

(3)Fail

上面 orderSave 這個狀態(tài)節(jié)點如果發(fā)生異常,會跳轉(zhuǎn)到 CompensationTrigger,CompensationTrigger 狀態(tài)節(jié)點定義如下:

"CompensationTrigger": {
 "Type": "CompensationTrigger",
 "Next": "Fail"
}

這個節(jié)點會觸發(fā) SaveOrder 中定義的補償服務(wù),然后將最終狀態(tài)流轉(zhuǎn)到 Fail。同時我們也看到,只要到了 CompensationTrigger 這個狀態(tài)節(jié)點,最終狀態(tài)就會流轉(zhuǎn)到 Fail。

下面我們把整個 Json 文件的定義貼出來看一下:

{
    "Name": "buyGoodsOnline",
    "Comment": "buy a goods on line, add order, deduct account, deduct storage ",
    "StartState": "SaveOrder",
    "Version": "0.0.1",
 #定義狀態(tài)
    "States": {
        "SaveOrder": {
            "Type": "ServiceTask",
            "ServiceName": "orderSave",
            "ServiceMethod": "saveOrder",
            "CompensateState": "DeleteOrder",
            "Next": "ChoiceAccountState",
            "Input": [
                "$.[businessKey]",
                "$.[order]"
            ],
            "Output": {
                "SaveOrderResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
   "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ]
        },
        "ChoiceAccountState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[SaveOrderResult] == true",
                    "Next":"ReduceAccount"
                }
            ],
            "Default":"Fail"
        },
        "ReduceAccount": {
            "Type": "ServiceTask",
            "ServiceName": "accountService",
            "ServiceMethod": "decrease",
            "CompensateState": "CompensateReduceAccount",
            "Next": "ChoiceStorageState",
            "Input": [
                "$.[businessKey]",
                "$.[userId]",
                "$.[money]",
                {
                    "throwException" : "$.[mockReduceAccountFail]"
                }
            ],
            "Output": {
                "ReduceAccountResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ]
        },
        "ChoiceStorageState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[ReduceAccountResult] == true",
                    "Next":"ReduceStorage"
                }
            ],
            "Default":"Fail"
        },
        "ReduceStorage": {
            "Type": "ServiceTask",
            "ServiceName": "storageService",
            "ServiceMethod": "decrease",
            "CompensateState": "CompensateReduceStorage",
            "Input": [
                "$.[businessKey]",
                "$.[productId]",
                "$.[count]",
                {
                    "throwException" : "$.[mockReduceStorageFail]"
                }
            ],
            "Output": {
                "ReduceStorageResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ],
            "Next": "Succeed"
        },
        "DeleteOrder": {
            "Type": "ServiceTask",
            "ServiceName": "orderSave",
            "ServiceMethod": "deleteOrder",
            "Input": [
                "$.[businessKey]",
                "$.[order]"
            ]
        },
        "CompensateReduceAccount": {
            "Type": "ServiceTask",
            "ServiceName": "accountService",
            "ServiceMethod": "compensateDecrease",
            "Input": [
                "$.[businessKey]",
                "$.[userId]",
                "$.[money]"
            ]
        },
        "CompensateReduceStorage": {
            "Type": "ServiceTask",
            "ServiceName": "storageService",
            "ServiceMethod": "compensateDecrease",
            "Input": [
                "$.[businessKey]",
                "$.[productId]",
                "$.[count]"
            ]
        },
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"
        },
        "Succeed": {
            "Type":"Succeed"
        },
        "Fail": {
            "Type":"Fail",
            "ErrorCode": "PURCHASE_FAILED",
            "Message": "purchase failed"
        }
    }
}

上面 Json 文件中定義的 buyGoodsOnline,是狀態(tài)機加載的入口,狀態(tài)機會找到這個 name,然后把狀態(tài)加載到自己的內(nèi)存中。下面,我們再來總結(jié)一下電商案例中分布式事務(wù)狀態(tài)流轉(zhuǎn)過程:

4.狀態(tài)機應(yīng)用

上面的電商例子中,三個分支服務(wù)分別定義了三個 State,對應(yīng)的 ServiceMethod 如下:

  • SaveOrder#saveOrder:
public boolean saveOrder(String businessKey, Order order) {
 logger.info("保存訂單, businessKey:{}, order: {}", businessKey, order);
 orderDao.create(order);
 return true;
}
  • ReduceAccount#decrease
public boolean decrease(String businessKey, Long userId, BigDecimal money) {
 return accountApi.decrease(businessKey, userId, money);
}
  • ReduceStorage#decrease
public boolean decrease(String businessKey, Long productId, Integer count) {
 return storageApi.decrease(businessKey, productId, count);
}

狀態(tài)機在啟動的時候,需要把上面方法中的參數(shù)都傳入,實例代碼如下:

StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine");
Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("order", order);
startParams.put("mockReduceAccountFail", "true");
startParams.put("userId", order.getUserId());
startParams.put("money", order.getPayAmount());
startParams.put("productId", order.getProductId());
startParams.put("count", order.getCount());
//這里采用同步方法
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);

5.狀態(tài)機原理

下面這張圖來自于 Seata 官網(wǎng),主要講解了狀態(tài)機的工作原理:

圖片圖片

  • 狀態(tài)機啟動時,首先啟動了全局事務(wù)。
  • 將狀態(tài)機的參數(shù)記錄在本地 seata_state_machine_inst 表。
  • 向 Seata Server 注冊分支事務(wù)。
  • 執(zhí)行 StateA 并記錄狀態(tài)到本地數(shù)據(jù)庫,同時會產(chǎn)生路由事件放入 EventQueue,執(zhí)行 StateB 時取出路由消息觸發(fā)執(zhí)行。同樣 StateB 執(zhí)行時也會產(chǎn)生路由消息放入 EventQueue。
  • 從 EventQueue 取出路由消息執(zhí)行 StateC。
  1. 狀態(tài)機結(jié)束流程,提交或回滾全局事務(wù)。

6.高可用

Seata 中的狀態(tài)機并不是獨立部署,而是內(nèi)嵌在應(yīng)用中,由于狀態(tài)機上下文和執(zhí)行日志都記錄在本地數(shù)據(jù)庫中,所以狀態(tài)機本身是無狀態(tài)的。

狀態(tài)機啟動時,會發(fā)送狀態(tài)到 Seata Server,當(dāng)一個應(yīng)用宕機后,Seata Server 能感知到,并會把恢復(fù)請求發(fā)送到存活的實例,收到請求的實例從數(shù)據(jù)庫取出狀態(tài)機上下文和執(zhí)行日志進行恢復(fù)。如下圖:

7.總結(jié)

本文講解了分布式事務(wù)中間件 Seata 給 Saga 模式設(shè)計的狀態(tài)機使用方式和原理。狀態(tài)機在我們的日常工作中使用非常廣泛,希望 Seata 的設(shè)計能對我們設(shè)計狀態(tài)機提供思路和參考。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
相關(guān)推薦

2020-10-16 06:30:45

分布式場景方案

2021-08-19 09:00:00

微服務(wù)開發(fā)架構(gòu)

2021-04-27 07:52:18

分布式事務(wù)系統(tǒng)

2021-06-01 12:45:19

數(shù)據(jù)庫分布式OceanBase

2021-09-07 09:26:13

Python 開發(fā)編程語言

2022-01-26 13:46:40

分布式事務(wù)集合,這

2019-06-10 14:53:15

分布式架構(gòu)應(yīng)用服務(wù)

2020-06-28 07:39:44

Kafka分布式消息

2024-01-08 09:46:47

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2025-04-28 00:44:04

2021-08-16 15:40:04

分布式架構(gòu)系統(tǒng)

2022-06-21 08:27:22

Seata分布式事務(wù)

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2021-03-18 09:18:39

分布式事務(wù)Saga

2022-12-21 08:40:05

限流器分布式限流

2021-07-07 07:14:48

分布式ID分布式系統(tǒng)

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2014-10-09 09:43:05

虛擬機遷移

2022-07-03 14:03:57

分布式Seata
點贊
收藏

51CTO技術(shù)棧公眾號