自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

從零到一搭建 TCC 分布式事務(wù)框架

云計算 分布式
在 gotcc 中,對事務(wù)協(xié)調(diào)器 TXManager 相關(guān)的核心處理邏輯,如 Try-Confirm/Cancel 兩階段流程串聯(lián)、TCC 組件注冊、異步輪詢?nèi)蝿?wù)等內(nèi)容進行實現(xiàn),并將這部分核心內(nèi)容抽出放在了 SDK 中,供應(yīng)用方使用。

本著“理論先行,實踐緊隨”的理念,這里強烈建議大家先完成上期理論篇內(nèi)容的學習,再跟隨我的思路一起進入本期實戰(zhàn)篇的學習.

在寫這篇文章的過程中,我在另一邊并行完成了一個開源項目的搭建. 這個項目是基于 golang 從零到一實現(xiàn)的 TCC 分布式事務(wù)框架,當前于 github 的開源地址為:https://github.com/xiaoxuxiansheng/gotcc

圖片

本期分享內(nèi)容將會緊密圍繞著這個開源項目展開. 受限于個人水平,在項目實現(xiàn)以及文章講解中有著諸多不當之處,權(quán)當在此拋磚引玉,歡迎大家多多批評指正.

1 架構(gòu)設(shè)計

1.1 整體架構(gòu)

首先我們簡單回顧一下有關(guān)于分布式事務(wù)以及 TCC 的概念.

所謂事務(wù),對應(yīng)的語義是“要么什么都不做,要么全都做到位”,需要針對多個執(zhí)行動作,建立一套一氣呵成、不可拆解的運行機制.

在事務(wù)中包括的一些執(zhí)行動作,倘若涉及到跨數(shù)據(jù)庫、跨組件、跨服務(wù)等分布式操作,那我們就稱這樣的事務(wù)是分布式事務(wù).

分布式事務(wù)在實現(xiàn)上存在很多技術(shù)難點,是一個頗具挑戰(zhàn)的有趣話題. 目前業(yè)界也形成了一套相對成熟且普遍認同的解決方案,就是——TCC:Try-Confirm/Cancel.

TCC 本質(zhì)上是一種 2PC(two phase commitment protocal 兩階段提交)的實現(xiàn):

? 把分布式事務(wù)中,負責維護狀態(tài)數(shù)據(jù)變更的模塊,封裝成一個 TCC 組件

? 把數(shù)據(jù)的變更狀態(tài)拆分為對應(yīng) Try 操作的【凍結(jié)】、對應(yīng) Confirm 操作的【成功】以及對應(yīng) Cancel 操作的【失敗回滾】

? 抽出一個統(tǒng)籌全局的事務(wù)協(xié)調(diào)者角色 TXManager. 在執(zhí)行分布式事務(wù)時,分為兩個階段:

? 階段 I:先對所有組件執(zhí)行 Try 操作

  • ? 階段 II:根據(jù)上階段 Try 操作的執(zhí)行結(jié)果,決定本輪執(zhí)行 Confirm 還是 Cancel 操作

圖片圖片

在我們實現(xiàn) TCC 框架的實戰(zhàn)環(huán)節(jié)中,首先需要明確的事情是:

? 哪部分內(nèi)容在 TCC 架構(gòu)中屬于通用的流程,這部分內(nèi)容可以抽取出來放在 sdk 中,以供后續(xù)復(fù)用

? 哪部分內(nèi)容需要給使用方預(yù)留出足夠的自由度,由使用方自行實現(xiàn),然后和通用 sdk 進行接軌.

最終,這兩部分內(nèi)容明確如下:

? 在 TCC sdk 中實現(xiàn)的通用邏輯包含了和事務(wù)協(xié)調(diào)器 txManager 有關(guān)的核心流程

? 事務(wù)協(xié)調(diào)器 TXManager 開啟事務(wù)以及 try-confirm/cancel 的 2PC 流程串聯(lián)

? 事務(wù)協(xié)調(diào)器 TXManager 異步輪詢?nèi)蝿?wù),用于推進事務(wù)從中間態(tài)走向終態(tài)

? TCC 組件的注冊流程

? 需要預(yù)定義事務(wù)日志存儲模塊 TXStore 的實現(xiàn)規(guī)范(聲明 interface)

? 需要預(yù)定義 TCC 組件 TCCComponent 的實現(xiàn)規(guī)范(聲明 interface)

? TCC 組件和 TXStore 兩部分內(nèi)容需要由使用方自行實現(xiàn):

? 使用方自行實現(xiàn) TCCComponent 類,包括其 Try、Confirm、Cancel 方法的執(zhí)行邏輯

? 使用方自行實現(xiàn)具體的 TXStore 日志存儲模塊. 可以根據(jù)實際需要,選型合適的存儲組件和存儲方式

圖片圖片

 

1.2 TCC Component

下面是關(guān)于 TCC 組件的定位:

? 這部分內(nèi)容需要由用戶自行實現(xiàn),并在 TXManager 啟動時將其注冊到注冊中心 RegistryCenter 當中.

? 當使用方調(diào)用 TXManager 開啟事務(wù)時,會通過 RegistryCenter 獲取這些組件,并對其進行使用

? TCC 組件需要具備的能力包括如下幾項:

圖片圖片

1.3 TX Manager

下面是關(guān)于事務(wù)協(xié)調(diào)器 TXManager 的定位.

  • ? TXManager 是整個 TCC 架構(gòu)中最核心的角色
  • ? TXManager 作為 gotcc 的統(tǒng)一入口,供使用方執(zhí)行啟動事務(wù)和注冊組件的操作
  • ? TXManager 作為中樞系統(tǒng)分別和 RegisterCenter、TXStore 交互
  • ? TXManager 需要串聯(lián)起整個 Try-Confirm/Canel 的 2PC 調(diào)用流程
  • ? TXManager 需要運行異步輪詢?nèi)蝿?wù),推進未完成的事務(wù)走向終態(tài)

圖片圖片

1.4 TX Store

TXStore 是用于存儲和管理事務(wù)日志明細記錄的模塊:

? 需要支持事務(wù)明細數(shù)據(jù)的 CRUD 能力

? 通常情況下,底層需要應(yīng)用到實際的存儲組件作為支持

? TXStore 在 gotcc 的 sdk 中體現(xiàn)為一個抽象的 interface. 需要由用戶完成具體類的實現(xiàn),并將其注入到 TXManager 當中.

圖片圖片

1.5 RegistryCenter

最后是 TCC 組件的注冊管理中心 RegistryCenter,負責給 txManager 提供出注冊和查詢 TCC 組件的能力.

圖片圖片

 

2 TXManager 核心源碼講解

理完了基本的流程和概念,下面我們一起開啟一線實戰(zhàn)環(huán)節(jié).

2.1 類圖

首先捋一下,在 gotcc 核心 sdk 中,涉及到的幾個核心類:

  • ? TXManager:事務(wù)協(xié)調(diào)器,class
  • ? TXStore:事務(wù)日志存儲模塊,interface
  • ? registryCenter:TCC 組件注冊管理中心,class
  • ? TCCComponent:TCC 組件,interface

通過下面的 UML 類圖,展示一下幾個核心類之間的關(guān)聯(lián)性:

圖片圖片

 

2.2 核心類定義

圖片圖片

 

2.2.1 TXManager

下面是關(guān)于事務(wù)協(xié)調(diào)器 TXManager 的幾個核心字段:

? txStore:內(nèi)置的事務(wù)日志存儲模塊,需要由使用方實現(xiàn)并完成注入

? registryCenter:TCC 組件的注冊管理中心

? opts:內(nèi)聚了一些 TXManager 的配置項,可以由使用方自定義,并通過 option 注入

? ctx:用于反映 TXManager 運行生命周期的的 context,當 ctx 終止時,異步輪詢?nèi)蝿?wù)也會隨之退出

? stop:用于停止 txManager 的控制器. 當 stop 被調(diào)用后,異步輪詢?nèi)蝿?wù)會被終止

type TXManager struct {
    ctx            context.Context
    stop           context.CancelFunc
    opts           *Options
    txStore        TXStore
    registryCenter *registryCenter
}


func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
    ctx, cancel := context.WithCancel(context.Background())
    txManager := TXManager{
        opts:           &Options{},
        txStore:        txStore,
        registryCenter: newRegistryCenter(),
        ctx:            ctx,
        stop:           cancel,
    }


    for _, opt := range opts {
        opt(txManager.opts)
    }


    repair(txManager.opts)


    go txManager.run()
    return &txManager
}

2.2.2 RegistryCenter

注冊中心 registryCenter 中的內(nèi)容很簡單,通過 map 存儲所有注冊進來的 TCC 組件,要求各組件都有獨立的組件 ID;通過一把讀寫鎖 rwMutex 保護 map 的并發(fā)安全性

type registryCenter struct {
    mux        sync.RWMutex
    components map[string]component.TCCComponent
}


func newRegistryCenter() *registryCenter {
    return ?istryCenter{
        components: make(map[string]component.TCCComponent),
    }
}

2.2.3 TXStore

下面 gotcc sdk 中,對事務(wù)日志存儲模塊 TXStore interface 的定義,這個點很重要,要求后續(xù)使用方在實現(xiàn)具體的 TXStore 模塊時,需要實現(xiàn)這里所羅列出來的所有方法,并且要保證實現(xiàn)方法滿足預(yù)期的功能:

? CreateTX:創(chuàng)建一條事務(wù)明細記錄,會在入?yún)⒅袀魅氡臼聞?wù)涉及的 TCC 組件列表,同時需要在出參中返回全局唯一的事務(wù) id

? TXUpdate:更新一條事務(wù)明細記錄. 這里指的更新,針對于,事務(wù)中某個 TCC 組件 Try 響應(yīng)狀態(tài)的更新

? TXSubmit:提交一條事務(wù)的執(zhí)行結(jié)果. 要么置為成功,要么置為失敗

? GetHangingTXs:獲取所有未完成的事務(wù)明細記錄

? GetTX:根據(jù)事務(wù) id,獲取指定的一條事務(wù)明細記錄

? Lock:鎖住整個事務(wù)日志存儲模塊(要求為分布式鎖)

? Unlock:解鎖整個事務(wù)日志存儲模塊

type TXStore interface {
    // 創(chuàng)建一條事務(wù)明細記錄
    CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)
    // 更新事務(wù)進度:
    // 規(guī)則為:倘若有一個 component try 操作執(zhí)行失敗,則整個事務(wù)失??;倘若所有 component try 操作執(zhí)行成功,則事務(wù)成功
    TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error
    // 提交事務(wù)的最終狀態(tài)
    TXSubmit(ctx context.Context, txID string, success bool) error
    // 獲取到所有處于中間態(tài)的事務(wù)
    GetHangingTXs(ctx context.Context) ([]*Transaction, error)
    // 獲取指定的一筆事務(wù)
    GetTX(ctx context.Context, txID string) (*Transaction, error)
    // 鎖住事務(wù)日志表
    Lock(ctx context.Context, expireDuration time.Duration) error
    // 解鎖事務(wù)日志表
    Unlock(ctx context.Context) error
}

2.3 注冊組件

下面是注冊 TCC 組件的處理流程:

首先,使用方通過 TXManager 對外暴露的公開方法 Register,開啟注冊流程,傳入對應(yīng)的 TCCComponent:

func (t *TXManager) Register(component component.TCCComponent) error {
    return t.registryCenter.register(component)
}

? TXManager 會調(diào)用注冊中心 registeryCenter 的 register 方法,將對應(yīng) component 注入到 map 中. 這里有兩個點值得一提:

? Register 方法可以并發(fā)使用,其內(nèi)部會通過 rwMutex 維護 map 的并發(fā)安全性

? TCC 組件不能重復(fù)注冊,即不能存在重復(fù)的 component id

func (r *registryCenter) register(component component.TCCComponent) error {
    r.mux.Lock()
    defer r.mux.Unlock()
    if _, ok := r.components[component.ID()]; ok {
        return errors.New("repeat component id")
    }
    r.components[component.ID()] = component
    return nil
}

上游 TXManager 可以通過 component id,進行 TCC 組件的查詢. 倘若某個 component id 不存在,則會拋出錯誤:

func (r *registryCenter) getComponents(componentIDs ...string) ([]component.TCCComponent, error) {
    components := make([]component.TCCComponent, 0, len(componentIDs))


    r.mux.RLock()
    defer r.mux.RUnlock()


    for _, componentID := range componentIDs {
        component, ok := r.components[componentID]
        if !ok {
            return nil, fmt.Errorf("component id: %s not existed", componentID)
        }
        components = append(components, component)
    }


    return components, nil
}

2.4 事務(wù)主流程

下面進入最核心的部分,介紹一下整個分布式事務(wù)的運行流程.

2.4.1 主流程

用戶可以通過 txManager.Transaction 方法,一鍵啟動動一個分布式事務(wù)流程,其中包含的幾個核心步驟展示如下圖:

圖片圖片

txManager.Transaction 方法是用戶啟動分布式事務(wù)的入口,需要在入?yún)⒅新暶鞅敬问聞?wù)涉及到的組件以及需要在 Try 流程中傳遞給對應(yīng)組件的請求參數(shù):

type RequestEntity struct {
    // 組件名稱
    ComponentID string `json:"componentName"`
    // Try 請求時傳遞的參數(shù)
    Request map[string]interface{} `json:"request"`
}

txManager.Transaction 對應(yīng)源碼如下,核心步驟均給出了注釋. 核心的 try-confirm/cancel 流程,會在后續(xù)的 txManager.twoPhaseCommit 方法中展開.

// 啟動事務(wù)
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {
    // 1 限制分布式事務(wù)執(zhí)行時長
    tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)
    defer cancel()


    // 2 獲得所有的涉及使用的 tcc 組件
    componentEntities, err := t.getComponents(tctx, reqs...)
    if err != nil {
        return false, err
    }


    // 3 調(diào)用 txStore 模塊,創(chuàng)建新的事務(wù)明細記錄,并取得全局唯一的事務(wù) id
    txID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)
    if err != nil {
        return false, err
    }


    // 4. 開啟兩階段提交流程:try-confirm/cancel
    return t.twoPhaseCommit(ctx, txID, componentEntities)
}

2.4.2 2PC 串聯(lián)

此處涉及 try-confirm/cancel 流程的串聯(lián),可以說是整個 gotcc 框架的精髓所在,請大家細品斟酌.

對應(yīng)流程圖展示如下,方法源碼中也給出了相對詳細的注釋:

圖片圖片

func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {
    // 1 創(chuàng)建子 context 用于管理子 goroutine 生命周期
    // 手握 cancel 終止器,能保證在需要的時候終止所有子 goroutine 生命周期
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()


    // 2 創(chuàng)建一個 chan,用于接收子 goroutine 傳遞的錯誤
    errCh := make(chan error)
    // 3 并發(fā)啟動,批量執(zhí)行各 tcc 組件的 try 流程
    go func() {
        // 通過 waitGroup 進行多個子 goroutine 的匯總
        var wg sync.WaitGroup
        for _, componentEntity := range componentEntities {
            // shadow
            componentEntity := componentEntity
            wg.Add(1)
            // 并發(fā)執(zhí)行各組件的 try 流程
            go func() {
                defer wg.Done()
                resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{
                    ComponentID: componentEntity.Component.ID(),
                    TXID:        txID,
                    Data:        componentEntity.Request,
                })
                // 出現(xiàn) tcc 組件執(zhí)行 try 操作失敗,則需要對事務(wù)明細記錄進行更新,同時把錯誤通過 chan 拋給父 goroutine
                if err != nil || !resp.ACK {
                    // 對對應(yīng)的事務(wù)進行更新
                    _ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), false)
                    errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())
                    return
                }
                // try 請求成功,則對事務(wù)明細記錄進行更新. 倘若更新失敗,也要視為錯誤,拋給父 goroutine
                if err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), true); err != nil {
                    errCh <- err
                }
            }()
        }


        // 等待所有子 goroutine 運行完成
        wg.Wait()
        // 關(guān)閉 errCh,告知父 goroutine 所有任務(wù)已運行完成的信息
        close(errCh)
    }()




    successful := true
    // 4 通過 chan,阻塞子 goroutine 執(zhí)行完成
    // 4.1 但凡出現(xiàn)一個子 goroutine 遇到了錯誤,則會提前接收到錯誤,并調(diào)用 cancel 方法熔斷其他所有子 goroutine 流程
    // 4.2 倘若所有子 goroutine 都執(zhí)行成功,則會通過 chan 的關(guān)閉事件推進流程,對應(yīng) err 為 nil
    if err := <-errCh; err != nil {
        // 只要有一筆 try 請求出現(xiàn)問題,其他的都進行終止
        cancel()
        successful = false
    }


    // 5 異步執(zhí)行第二階段的 confirm/cancel 流程
    // 之所以是異步,是因為實際上在第一階段 try 的響應(yīng)結(jié)果塵埃落定時,對應(yīng)事務(wù)的成敗已經(jīng)有了定論
    // 第二階段能夠容忍異步執(zhí)行的原因在于,執(zhí)行失敗時,還有輪詢?nèi)蝿?wù)進行兜底
    go t.advanceProgressByTXID(txID)
    
    // 6 響應(yīng)結(jié)果
    // 6.1 倘若所有 try 請求都成功,則 successful 為 try,事務(wù)成功
    // 6.2 但凡有一個 try 請求處理出現(xiàn)問題,successful 為 false,事務(wù)失敗
    return successful, nil
}

2.4.3 事務(wù)進度推進

當一筆事務(wù)在第一階段中所有的 Try 請求都有了響應(yīng)后,就需要根據(jù)第一階段的結(jié)果,執(zhí)行第二階段的 Confirm 或者 Cancel 操作,并且將事務(wù)狀態(tài)推進為成功或失敗的終態(tài):

  • ? 倘若所有組件的 Try 響應(yīng)都是成功,則需要批量調(diào)用組件的 Confirm 接口,并在這之后將事務(wù)狀態(tài)更新為成功
  • ? 倘若存在某個組件 Try 響應(yīng)為失敗,則需要批量調(diào)用組件的 Cancel 接口,并在這之后將事務(wù)狀態(tài)更新為失敗
  • ? 倘若當前事務(wù)已執(zhí)行超時,同樣需要批量調(diào)用組件的 Cancel 接口,并在這之后將事務(wù)狀態(tài)更新為失敗

圖片圖片

// 傳入一個事務(wù) id 推進其進度
func (t *TXManager) advanceProgressByTXID(txID string) error {
    // 獲取事務(wù)日志明細
    tx, err := t.txStore.GetTX(t.ctx, txID)
    if err != nil {
        return err
    }
    // 推進進度
    return t.advanceProgress(tx)
}
// 傳入一個事務(wù) id 推進其進度
func (t *TXManager) advanceProgress(tx *Transaction) error {
    // 1 推斷出事務(wù)當前的狀態(tài)
    // 1.1 倘若所有組件 try 都成功,則為 successful
    // 1.2 倘若存在組件 try 失敗,則為 failure
    // 1.3 倘若事務(wù)超時了,則為 failure
    // 1.4 否則事務(wù)狀態(tài)為 hanging
    txStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))
    // hanging 狀態(tài)的事務(wù)暫時不處理
    if txStatus == TXHanging {
        return nil
    }


    // 2 根據(jù)事務(wù)是否成功,定制不同的處理函數(shù)
    success := txStatus == TXSuccessful
    var confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)
    var txAdvanceProgress func(ctx context.Context) error
    if success {
        // 如果事務(wù)成功,則需要對組件進行 confirm
        confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
            return component.Confirm(ctx, tx.TXID)
        }
        // 如果事務(wù)成功,則需要在最后更新事務(wù)日志記錄的狀態(tài)為成功
        txAdvanceProgress = func(ctx context.Context) error {
            return t.txStore.TXSubmit(ctx, tx.TXID, true)
        }




    } else {
        // 如果事務(wù)失敗,則需要對組件進行 cancel
        confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {           
            return component.Cancel(ctx, tx.TXID)
        }


        // 如果事務(wù)失敗,則需要在最后更新事務(wù)日志記錄的狀態(tài)為失敗
        txAdvanceProgress = func(ctx context.Context) error {           
            return t.txStore.TXSubmit(ctx, tx.TXID, false)
        }
    }


    // 3 批量調(diào)用組件,執(zhí)行第二階段的 confirm/cancel 操作
    for _, component := range tx.Components {
        // 獲取對應(yīng)的 tcc component
        components, err := t.registryCenter.getComponents(component.ComponentID)
        if err != nil || len(components) == 0 {
            return errors.New("get tcc component failed")
        }      
        resp, err := confirmOrCancel(t.ctx, components[0])
        if err != nil {
            return err
        }
        if !resp.ACK {
            return fmt.Errorf("component: %s ack failed", component.ComponentID)
        }
    }


    // 4 二階段 confirm/cancel 操作都執(zhí)行完成后,對事務(wù)狀態(tài)進行提交
    return txAdvanceProgress(t.ctx)
}

2.5 異步輪詢流程

接下來聊聊 txManager 的異步輪詢流程. 這個流程同樣非常重要,是支撐 txManager 魯棒性的重要機制.

倘若存在事務(wù)已經(jīng)完成第一階段 Try 操作的執(zhí)行,但是第二階段沒執(zhí)行成功,則需要由異步輪詢流程進行兜底處理,為事務(wù)補齊第二階段的操作,并將事務(wù)狀態(tài)更新為終態(tài)

2.5.1 啟動時機

異步輪詢?nèi)蝿?wù)是在 txManager 的初始化流程中啟動的,通過異步 goroutine 持久運行:

func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
    ctx, cancel := context.WithCancel(context.Background())
    txManager := TXManager{
        opts:           &Options{},
        txStore:        txStore,
        registryCenter: NewRegistryCenter(),
        ctx:            ctx,
        stop:           cancel,
    }


    for _, opt := range opts {
        opt(txManager.opts)
    }


    repair(txManager.opts)


    go txManager.run()
    return &txManager
}

2.5.2 輪詢流程

異步輪詢?nèi)蝿?wù)運行時,基于 for 循環(huán) + select 多路復(fù)用的方式,實現(xiàn)定時任務(wù)的執(zhí)行.

輪詢的時間間隔會根據(jù)一輪任務(wù)處理過程中是否出現(xiàn)錯誤,而進行動態(tài)調(diào)整. 這里調(diào)整規(guī)則指的是:當一次處理流程中發(fā)生了錯誤,就需要調(diào)大當前節(jié)點輪詢的時間間隔,讓其他節(jié)點的異步輪詢?nèi)蝿?wù)得到更大的執(zhí)行機會.

圖片圖片

func (t *TXManager) run() {
    var tick time.Duration
    var err error
    // 1 for 循環(huán)自旋式運行任務(wù)
    for {
        // 如果處理過程中出現(xiàn)了錯誤,需要增長輪詢時間間隔
        if err == nil {
            tick = t.opts.MonitorTick
        } else {
            tick = t.backOffTick(tick)
        }
        
        // select 多路復(fù)用
        select {
        // 倘若 txManager.ctx 被終止,則異步輪詢?nèi)蝿?wù)退出
        case <-t.ctx.Done():
            return


        // 2 等待 tick 對應(yīng)時長后,開始執(zhí)行任務(wù)
        case <-time.After(tick):
            // 對 txStore 加分布式鎖,避免分布式服務(wù)下多個服務(wù)節(jié)點的輪詢?nèi)蝿?wù)重復(fù)執(zhí)行
            if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {
                // 取鎖失敗時(大概率被其他節(jié)點占有),不需要增加 tick 時長
                err = nil
                continue
            }


            // 3 獲取處于 hanging 狀態(tài)的事務(wù)
            var txs []*Transaction
            if txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {
                _ = t.txStore.Unlock(t.ctx)
                continue
            }
   
            // 4 批量推進事務(wù)進度
            err = t.batchAdvanceProgress(txs)
            _ = t.txStore.Unlock(t.ctx)
        }
    }
}

 

有關(guān)于輪詢時間間隔的退避謙讓策略為:每次對時間間隔進行翻倍,封頂為初始時長的 8 倍:

func (t *TXManager) backOffTick(tick time.Duration) time.Duration {
    tick <<= 1
    if threshold := t.opts.MonitorTick << 3; tick > threshold {
        return threshold
    }
    return tick
}

2.5.3 批量推進事務(wù)進度

下面是異步輪詢?nèi)蝿?wù)批量推進事務(wù)第二階段執(zhí)行的流程,核心是開啟多個 goroutine 并發(fā)對多項事務(wù)進行處理:

func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {
    // 1 創(chuàng)建一個 chan,用于接收子 goroutine 傳輸?shù)?err
    errCh := make(chan error)
    go func() {
        // 2 通過 waitGroup 聚合多個子 groutine
        var wg sync.WaitGroup
        for _, tx := range txs {
            // shadow
            tx := tx
            wg.Add(1)
            go func() {
                defer wg.Done()
                // 3 推進每筆事務(wù)的進度
                if err := t.advanceProgress(tx); err != nil {
                    // 遇到錯誤則投遞到 errCh
                    errCh <- err
                }
            }()
        }
        
        // 4 收口等待所有子 goroutine 執(zhí)行完成
        wg.Wait()
        // 5 所有子 goroutine 執(zhí)行完成后關(guān)閉 chan,喚醒阻塞等待的父 goroutine
        close(errCh)
    }()


    // 記錄遇到的第一個錯誤
    var firstErr error
    // 6 父 goroutine 通過 chan 阻塞在這里,直到所有 goroutine 執(zhí)行完成,chan 被 close 才能往下
    for err := range errCh {
        // 記錄遇到的第一個錯誤
        if firstErr != nil {
            continue
        }
        firstErr = err
    }


    // 7 返回錯誤,核心是標識執(zhí)行過程中,是否發(fā)生過錯誤
    return firstErr
}

3 GOTCC 使用案例講解

從第 3 章開始,我們從實際應(yīng)用 gotcc 框架的使用方視角出發(fā),對所需要實現(xiàn)的模塊進行定義,然后給出應(yīng)用 gotcc 框架的代碼示例.

3.1 TCC 組件實現(xiàn)

首先,我們對 TCC 組件的具體實現(xiàn)類進行定義:

3.1.1 類定義

定義一個 MockComponent 類,其中內(nèi)置了 redis 客戶端,用于完成一些狀態(tài)數(shù)據(jù)的存取.

圖片圖片

// 實現(xiàn)的 tcc 組件
type MockComponent struct {
    // tcc 組件唯一標識 id,構(gòu)造時由使用方傳入
    id     string
    // redis 客戶端
    client *redis_lock.Client
}


func NewMockComponent(id string, client *redis_lock.Client) *MockComponent {
    return &MockComponent{
        id:     id,
        client: client,
    }
}


// 返回 tcc 組件的唯一標識 id
func (m *MockComponent) ID() string {
    return m.id
}

3.1.2 Try 流程

下面實現(xiàn)一下 TCC 組件的 Try 方法,關(guān)鍵要點已于代碼中通過注釋的形式給出:

圖片圖片

func (m *MockComponent) Try(ctx context.Context, req *component.TCCReq) (*component.TCCResp, error) {
    // 1 基于 txID 維度加 redis 分布式鎖
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, req.TXID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()


    // 2 基于 txID 冪等性去, 需要對事務(wù)的狀態(tài)進行檢查
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, req.TXID))
    if err != nil && !errors.Is(err, redis_lock.ErrNil) {
        return nil, err
    }


    res := component.TCCResp{
        ComponentID: m.id,
        TXID:        req.TXID,
    }
    switch txStatus {
    case TXTried.String(), TXConfirmed.String(): // 重復(fù)的 try 請求,給予成功的響應(yīng)
        res.ACK = true
        return &res, nil
    case TXCanceled.String(): // 此前該事務(wù)已 cancel,則拒絕本次 try 請求
        return &res, nil
    default:
    }


    // 3 建立 txID 與 bizID 的關(guān)聯(lián)
    bizID := gocast.ToString(req.Data["biz_id"])
    if _, err = m.client.Set(ctx, pkg.BuildTXDetailKey(m.id, req.TXID), bizID); err != nil {
        return nil, err
    }


    // 4 把 bizID 對應(yīng)的業(yè)務(wù)數(shù)據(jù)置為凍結(jié)態(tài)
    reply, err := m.client.SetNX(ctx, pkg.BuildDataKey(m.id, req.TXID, bizID), DataFrozen.String())
    if err != nil {
        return nil, err
    }
    // 倘若數(shù)據(jù)此前已凍結(jié)或已使用,則拒絕本次 try 請求
    if reply != 1 {
        return &res, nil
    }


    // 5 更新當前組件下的事務(wù)狀態(tài)為 tried
    _, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, req.TXID), TXTried.String())
    if err != nil {
        return nil, err
    }


    // 6 給予接收 try 請求的響應(yīng)
    res.ACK = true
    return &res, nil
}

3.1.3 Confirm 流程

下面實現(xiàn)一下 TCC 組件的 Confirm 方法,關(guān)鍵要點已于代碼中通過注釋的形式給出:

圖片圖片

func (m *MockComponent) Confirm(ctx context.Context, txID string) (*component.TCCResp, error) {
    // 1 基于 txID 維度加鎖
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()




    // 2. 校驗事務(wù)狀態(tài),要求對應(yīng)組件下,事務(wù)此前的狀態(tài)為 tried
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    res := component.TCCResp{
        ComponentID: m.id,
        TXID:        txID,
    }
    switch txStatus {
    case TXConfirmed.String(): // 事務(wù)狀態(tài)已 confirm,直接冪等響應(yīng)為成功
        res.ACK = true
        return &res, nil
    case TXTried.String(): // 只有事務(wù)狀態(tài)為 try 才是合法的,會對程序放行
    default: // 其他情況直接拒絕,ack 為 false
        return &res, nil
    }


    // 3 獲取事務(wù)對應(yīng)的 bizID
    bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    // 4. 校驗業(yè)務(wù)數(shù)據(jù)此前狀態(tài)是否為凍結(jié)
    dataStatus, err := m.client.Get(ctx, pkg.BuildDataKey(m.id, txID, bizID))
    if err != nil {
        return nil, err
    }
    
    // 如果此前非凍結(jié)態(tài),則拒絕本次請求
    if dataStatus != DataFrozen.String() {
        return &res, nil
    }




    // 5 把業(yè)務(wù)數(shù)據(jù)的更新操作置為 successful
    if _, err = m.client.Set(ctx, pkg.BuildDataKey(m.id, txID, bizID), DataSuccessful.String()); err != nil {
        return nil, err
    }


    // 6 把對應(yīng)組件下的事務(wù)狀態(tài)更新為成功,這一步哪怕失敗了也不阻塞主流程
    _, _ = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXConfirmed.String())


    // 7 處理成功,給予成功的響應(yīng)
    res.ACK = true
    return &res, nil
}

3.1.4 Cancel 流程

下面實現(xiàn)一下 TCC 組件的 Cancel 方法,關(guān)鍵要點已于代碼中通過注釋的形式給出:

func (m *MockComponent) Cancel(ctx context.Context, txID string) (*component.TCCResp, error) {
    // 1 基于 txID 維度加鎖
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()


    // 2 校驗事務(wù)狀態(tài),只要不是 confirmed,都允許被置為 canceld
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
    if err != nil && !errors.Is(err, redis_lock.ErrNil) {
        return nil, err
    }
    // 倘若組件內(nèi)事務(wù)此前的狀態(tài)為 confirmed,則說明流程有異常.
    if txStatus == TXConfirmed.String() {
        return nil, fmt.Errorf("invalid tx status: %s, txid: %s", txStatus, txID)
    }


    // 3 根據(jù)事務(wù)獲取對應(yīng)的 bizID
    bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    // 4 刪除對應(yīng)的 frozen 凍結(jié)記錄,代表對數(shù)據(jù)執(zhí)行了回滾操作
    if err = m.client.Del(ctx, pkg.BuildDataKey(m.id, txID, bizID)); err != nil {
        return nil, err
    }


    // 5 把事務(wù)狀態(tài)更新為 canceled
    _, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXCanceled.String())
    if err != nil {
        return nil, err
    }


    // 6 給予處理成功的 ack 
    return &component.TCCResp{
        ACK:         true,
        ComponentID: m.id,
        TXID:        txID,
    }, nil
}

3.2 TX Store 實現(xiàn)

接下來是關(guān)于事務(wù)日志存儲模塊 TXStore 的具體實現(xiàn):

3.2.1 類定義

聲明了一個 MockTXStore 類,里面通過 mysql 存儲事務(wù)日志明細數(shù)據(jù),通過 redis 實現(xiàn) TXStore 模塊的分布式鎖.

其中和事務(wù)日志明細數(shù)據(jù)庫直接交互的操作被封裝在 TXRecordDAO 當中.

圖片圖片

// TXStore 模塊具體實現(xiàn)
type MockTXStore struct {
    // redis 客戶端,用于實現(xiàn)分布式鎖
    client *redis_lock.Client
    // 事務(wù)日志存儲 DAO 層
    dao    *expdao.TXRecordDAO
}


func NewMockTXStore(dao *expdao.TXRecordDAO, client *redis_lock.Client) *MockTXStore {
    return &MockTXStore{
        dao:    dao,
        client: client,
    }
}

事務(wù)日志存儲 DAO 層:

type TXRecordDAO struct {
    db *gorm.DB
}


func NewTXRecordDAO(db *gorm.DB) *TXRecordDAO {
    return &TXRecordDAO{
        db: db,
    }
}

接下來是關(guān)于事務(wù)日志明細記錄的持久化對象(PO,Persistent Object)模型定義:

? 內(nèi)置了 gorm.Model,包含了主鍵 ID、創(chuàng)建時間 CreatedAt、更新時間 UpdatedAt、刪除時間 DeletedAt 幾個字段

? 事務(wù)狀態(tài) Status,標識事務(wù)所處的狀態(tài),分為進行中 hanging、成功 successful、失敗 failure

? 組件 Try 響應(yīng)明細記錄 ComponentTryStatuses: 記錄了事務(wù)下各組件 Try 請求響應(yīng)結(jié)果,會以一個 json 字符串的格式存儲,其真實的類型為 map[string]*ComponentTryStatus

type TXRecordPO struct {
    gorm.Model
    Status               string `gorm:"status"`
    ComponentTryStatuses string `gorm:"component_try_statuses"`
}


func (t TXRecordPO) TableName() string {
    return "tx_record"
}


type ComponentTryStatus struct {
    ComponentID string `json:"componentID"`
    TryStatus   string `json:"tryStatus"`
}

下面是事務(wù)日志明細表的建表語句:

CREATE TABLE IF NOT EXISTS `tx_record`
(
    `id`                       bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
    `status`                   varchar(16) NOT NULL COMMENT '事務(wù)狀態(tài) hanging/successful/failure',
    `component_try_statuses`   json DEFAULT NULL COMMENT '各組件 try 接口請求狀態(tài) hanging/successful/failure',
    `deleted_at`        datetime     DEFAULT NULL COMMENT '刪除時間',
    `created_at`        datetime     NOT NULL COMMENT '創(chuàng)建時間',
    `updated_at`        datetime     DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
    PRIMARY KEY (`id`) USING BTREE COMMENT '主鍵索引',
    KEY `idx_status` (`status`) COMMENT '事務(wù)狀態(tài)索引'
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT '事務(wù)日志記錄';

3.2.2 創(chuàng)建事務(wù)記錄

接下來是通過過 TXStore 模塊創(chuàng)建一條事務(wù)明細記錄的實現(xiàn)代碼:

func (m *MockTXStore) CreateTX(ctx context.Context, components ...component.TCCComponent) (string, error) {
    // 創(chuàng)建一個記錄組件 try 響應(yīng)結(jié)果的 map,其中以組件 id 為 key
    componentTryStatuses := make(map[string]*expdao.ComponentTryStatus, len(components))
    for _, component := range components {
        componentTryStatuses[component.ID()] = &expdao.ComponentTryStatus{
            ComponentID: component.ID(),
            TryStatus:   txmanager.TryHanging.String(),
        }
    }


    statusesBody, _ := json.Marshal(componentTryStatuses)
    // 創(chuàng)建事務(wù)明細記錄 po 示例,調(diào)用 dao 模塊將記錄落庫
    txID, err := m.dao.CreateTXRecord(ctx, &expdao.TXRecordPO{
        Status:               txmanager.TXHanging.String(),
        ComponentTryStatuses: string(statusesBody),
    })
    if err != nil {
        return "", err
    }


    return gocast.ToString(txID), nil
}

dao 層創(chuàng)建事務(wù)明細記錄的實現(xiàn)代碼:

func (t *TXRecordDAO) CreateTXRecord(ctx context.Context, record *TXRecordPO) (uint, error) {
    return record.ID, t.db.WithContext(ctx).Model(&TXRecordPO{}).Create(record).Error
}

3.2.3 事務(wù)明細更新

下面是更新一筆事務(wù)明細的方法,其處理流程是:

? 針對這筆事務(wù)記錄加寫鎖

? 根據(jù)組件的 try 響應(yīng)結(jié)果,對 json字符串進行更新

? 將事務(wù)記錄寫回表中.

func (m *MockTXStore) TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error {
    // 后續(xù)需要閉包傳入執(zhí)行函數(shù)
    do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
        componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
        _ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
        if accept {
            componentTryStatuses[componentID].TryStatus = txmanager.TrySucceesful.String()
        } else {
            componentTryStatuses[componentID].TryStatus = txmanager.TryFailure.String()
        }
        newBody, _ := json.Marshal(componentTryStatuses)
        record.ComponentTryStatuses = string(newBody)
        return dao.UpdateTXRecord(ctx, record)
    }


    _txID := gocast.ToUint(txID)
    return m.dao.LockAndDo(ctx, _txID, do)
}
// 通過 gorm 實現(xiàn)數(shù)據(jù)記錄加寫鎖,并執(zhí)行閉包函數(shù)的操作:
func (t *TXRecordDAO) LockAndDo(ctx context.Context, id uint, do func(ctx context.Context, dao *TXRecordDAO, record *TXRecordPO) error) error {
    // 開啟事務(wù)
    return t.db.Transaction(func(tx *gorm.DB) error {
        defer func() {
            if err := recover(); err != nil {
                tx.Rollback()
            }
        }()


        // 加寫鎖
        var record TXRecordPO
        if err := tx.Set("gorm:query_option", "FOR UPDATE").WithContext(ctx).First(&record, id).Error; err != nil {
            return err
        }


        txDAO := NewTXRecordDAO(tx)
        // 執(zhí)行閉包函數(shù)
        return do(ctx, txDAO, &record)
    })
}
// 更新一條事務(wù)日志數(shù)據(jù)記錄
func (t *TXRecordDAO) UpdateTXRecord(ctx context.Context, record *TXRecordPO) error {
    return t.db.WithContext(ctx).Updates(record).Error
}

3.2.4 查詢事務(wù)

接下來是查詢事務(wù)的兩個方法:

// 根據(jù)事務(wù) id 查詢指定的一筆事務(wù)明細記錄:
func (m *MockTXStore) GetTX(ctx context.Context, txID string) (*txmanager.Transaction, error) {
    // 通過 option 在查詢條件中注入事務(wù) id
    records, err := m.dao.GetTXRecords(ctx, expdao.WithID(gocast.ToUint(txID)))
    if err != nil {
        return nil, err
    }
    if len(records) != 1 {
        return nil, errors.New("get tx failed")
    }
 
    // 對各組件 try 明細內(nèi)容進行反序列化
    componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
    _ = json.Unmarshal([]byte(records[0].ComponentTryStatuses), &componentTryStatuses)


    components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
    for _, tryItem := range componentTryStatuses {
        components = append(components, &txmanager.ComponentTryEntity{
            ComponentID: tryItem.ComponentID,
            TryStatus:   txmanager.ComponentTryStatus(tryItem.TryStatus),
        })
    }
    return &txmanager.Transaction{
        TXID:       txID,
        Status:     txmanager.TXStatus(records[0].Status),
        Components: components,
        CreatedAt:  records[0].CreatedAt,
    }, nil
}
// 獲取全量處于中間態(tài)的事務(wù)明細記錄
func (m *MockTXStore) GetHangingTXs(ctx context.Context) ([]*txmanager.Transaction, error) {
    // 通過 option 在查詢條件中指定事務(wù)狀態(tài)為 hanging
    records, err := m.dao.GetTXRecords(ctx, expdao.WithStatus(txmanager.TryHanging))
    if err != nil {
        return nil, err
    }


    txs := make([]*txmanager.Transaction, 0, len(records))
    for _, record := range records {
        // 對各組件 try 響應(yīng)結(jié)果進行反序列化
        componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
        _ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
        components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
        for _, component := range componentTryStatuses {
            components = append(components, &txmanager.ComponentTryEntity{
                ComponentID: component.ComponentID,
                TryStatus:   txmanager.ComponentTryStatus(component.TryStatus),
            })
        }


        txs = append(txs, &txmanager.Transaction{
            TXID:       gocast.ToString(record.ID),
            Status:     txmanager.TXHanging,
            CreatedAt:  record.CreatedAt,
            Components: components,
        })
    }


    return txs, nil
}

在 dao 層實現(xiàn)了一個通用的事務(wù)日志查詢方法,通過 option 模式實現(xiàn)查詢條件的靈活組裝:

func (t *TXRecordDAO) GetTXRecords(ctx context.Context, opts ...QueryOption) ([]*TXRecordPO, error) {
    db := t.db.WithContext(ctx).Model(&TXRecordPO{})
    for _, opt := range opts {
        db = opt(db)
    }


    var records []*TXRecordPO
    return records, db.Scan(&records).Error
}

下面是關(guān)于 option 的具體定義,更多有關(guān)于這種模式的設(shè)計實現(xiàn)思路,可以參見我之前發(fā)表的文章——Golang 設(shè)計模式之建造者模式

type QueryOption func(db *gorm.DB) *gorm.DB


// 通過事務(wù)主鍵 id 進行查詢
func WithID(id uint) QueryOption {
    return func(db *gorm.DB) *gorm.DB {
        return db.Where("id = ?", id)
    }
}


// 通過事務(wù)狀態(tài)進行查詢
func WithStatus(status txmanager.ComponentTryStatus) QueryOption {
    return func(db *gorm.DB) *gorm.DB {
        return db.Where("status = ?", status.String())
    }
}

3.2.5 提交事務(wù)結(jié)果

接下來是在事務(wù)執(zhí)行完成后,將執(zhí)行結(jié)果更新到事務(wù)明細記錄中的處理方法:

// 提交事務(wù)的最終狀態(tài)
func (m *MockTXStore) TXSubmit(ctx context.Context, txID string, success bool) error {
    do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
        if success {
            record.Status = txmanager.TXSuccessful.String()
        } else {
            record.Status = txmanager.TXFailure.String()
        }
        return dao.UpdateTXRecord(ctx, record)
    }
    return m.dao.LockAndDo(ctx, gocast.ToUint(txID), do)
}

3.2.6 加/解全局鎖

最后,是實現(xiàn)整個 txStore 模塊加/解鎖的處理方法,內(nèi)部是基于 redis 實現(xiàn)的分布式鎖:

func (m *MockTXStore) Lock(ctx context.Context, expireDuration time.Duration) error {
    lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client, redis_lock.WithExpireSeconds(int64(expireDuration.Seconds())))
    return lock.Lock(ctx)
}


func (m *MockTXStore) Unlock(ctx context.Context) error {
    lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client)
    return lock.Unlock(ctx)
}

到這里為止,所有前置準備工作都已經(jīng)處理完成,接下來我們展示一個應(yīng)用到 gotcc 框架的使用示例。

3.3 使用代碼示例

由于我實現(xiàn)的 txStore 和 tccComponent 需要依賴到 mysql 和 redis 兩個組件,因此在這里需要輸入對應(yīng)的信息.

單測代碼相對比較簡單,其中一些要點通過注釋給出:

const (
    dsn      = "請輸入你的 mysql dsn"
    network  = "tcp"
    address  = "請輸入你的 redis ip"
    password = "請輸入你的 redis 密碼"
)


// 使用 tcc 單測代碼
func Test_TCC(t *testing.T) {
    // 創(chuàng)建 redis 客戶端
    redisClient := pkg.NewRedisClient(network, address, password)
    // 創(chuàng)建 mysql 客戶端
    mysqlDB, err := pkg.NewDB(dsn)
    if err != nil {
        t.Error(err)
        return
    }


    // 構(gòu)造三個 tcc 組件
    componentAID := "componentA"
    componentBID := "componentB"
    componentCID := "componentC"
    componentA := NewMockComponent(componentAID, redisClient)
    componentB := NewMockComponent(componentBID, redisClient)
    componentC := NewMockComponent(componentCID, redisClient)


    // 構(gòu)造出事務(wù)日志存儲模塊
    txRecordDAO := dao.NewTXRecordDAO(mysqlDB)
    txStore := NewMockTXStore(txRecordDAO, redisClient)


    // 構(gòu)造出 txManager 模塊
    txManager := txmanager.NewTXManager(txStore, txmanager.WithMonitorTick(time.Second))
    defer txManager.Stop()


    // 完成三個組件的注冊
    if err := txManager.Register(componentA); err != nil {
        t.Error(err)
        return
    }


    if err := txManager.Register(componentB); err != nil {
        t.Error(err)
        return
    }


    if err := txManager.Register(componentC); err != nil {
        t.Error(err)
        return
    }


    
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
    defer cancel()
    // 啟動分布式事務(wù)
    success, err := txManager.Transaction(ctx, []*txmanager.RequestEntity{
        {ComponentID: componentAID,
            Request: map[string]interface{}{
                "biz_id": componentAID + "_biz",
            },
        },
        {ComponentID: componentBID,
            Request: map[string]interface{}{
                "biz_id": componentBID + "_biz",
            },
        },
        {ComponentID: componentCID,
            Request: map[string]interface{}{
                "biz_id": componentCID + "_biz",
            },
        },
    }...)
    if err != nil {
        t.Errorf("tx failed, err: %v", err)
        return
    }
    if !success {
        t.Error("tx failed")
        return
    }
    
    // 分布式事務(wù)處理成功
    t.Log("success")
}

4 總結(jié)

到這里,本文正文內(nèi)容全部結(jié)束. 這里回頭再對本期分享的內(nèi)容做個總結(jié):

? 本期我基于 golang 從零到一搭建了一個 TCC 分布式事務(wù)框架的開源項目 gotcc,并圍繞著這個項目展開源碼級別的講解。

? 在 gotcc 中,對事務(wù)協(xié)調(diào)器 TXManager 相關(guān)的核心處理邏輯,如 Try-Confirm/Cancel 兩階段流程串聯(lián)、TCC 組件注冊、異步輪詢?nèi)蝿?wù)等內(nèi)容進行實現(xiàn),并將這部分核心內(nèi)容抽出放在了 SDK 中,供應(yīng)用方使用。

? 在 gotcc 中還定義了 TCC 組件和事務(wù)日志存儲模塊的抽象 interface,這部分內(nèi)容需要由應(yīng)用方自行實現(xiàn),并在使用 gotcc 時將其注入到 TXManager 當中。

責任編輯:武曉燕 來源: 小徐先生的編程世界
相關(guān)推薦

2021-06-08 12:46:27

分布式阿里TCC

2024-10-09 14:14:07

2024-06-28 09:07:19

2024-12-09 09:35:00

2024-01-05 07:28:50

分布式事務(wù)框架

2017-10-24 11:39:29

銀行轉(zhuǎn)賬數(shù)據(jù)庫分布式事務(wù)

2021-10-25 10:33:29

Python 開發(fā)編程語言

2024-06-04 10:58:30

2021-10-11 09:24:14

分布式架構(gòu)系統(tǒng)

2022-07-20 06:55:10

TCC分布式事務(wù)微服務(wù)

2018-11-23 09:25:00

TCC分布式事務(wù)

2022-01-12 10:02:02

TCC模式 Seata

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2022-06-21 08:27:22

Seata分布式事務(wù)

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2021-11-26 06:43:19

Java分布式

2019-12-27 16:00:56

分布式事務(wù)框架Java

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2020-12-09 09:14:57

SpringCloudSeata 分布式

2021-12-09 10:45:19

分布式事務(wù)框架
點贊
收藏

51CTO技術(shù)棧公眾號