從零到一搭建 TCC 分布式事務(wù)框架
本著“理論先行,實踐緊隨”的理念,這里強烈建議大家先完成上期理論篇內(nèi)容的學習,再跟隨我的思路一起進入本期實戰(zhàn)篇的學習.
在寫這篇文章的過程中,我在另一邊并行完成了一個開源項目的搭建. 這個項目是基于 golang 從零到一實現(xiàn)的 TCC 分布式事務(wù)框架,當前于 github 的開源地址為:https://github.com/xiaoxuxiansheng/gotcc
本期分享內(nèi)容將會緊密圍繞著這個開源項目展開. 受限于個人水平,在項目實現(xiàn)以及文章講解中有著諸多不當之處,權(quán)當在此拋磚引玉,歡迎大家多多批評指正.
1 架構(gòu)設(shè)計
1.1 整體架構(gòu)
首先我們簡單回顧一下有關(guān)于分布式事務(wù)以及 TCC 的概念.
所謂事務(wù),對應(yīng)的語義是“要么什么都不做,要么全都做到位”,需要針對多個執(zhí)行動作,建立一套一氣呵成、不可拆解的運行機制.
在事務(wù)中包括的一些執(zhí)行動作,倘若涉及到跨數(shù)據(jù)庫、跨組件、跨服務(wù)等分布式操作,那我們就稱這樣的事務(wù)是分布式事務(wù).
分布式事務(wù)在實現(xiàn)上存在很多技術(shù)難點,是一個頗具挑戰(zhàn)的有趣話題. 目前業(yè)界也形成了一套相對成熟且普遍認同的解決方案,就是——TCC:Try-Confirm/Cancel.
TCC 本質(zhì)上是一種 2PC(two phase commitment protocal 兩階段提交)的實現(xiàn):
? 把分布式事務(wù)中,負責維護狀態(tài)數(shù)據(jù)變更的模塊,封裝成一個 TCC 組件
? 把數(shù)據(jù)的變更狀態(tài)拆分為對應(yīng) Try 操作的【凍結(jié)】、對應(yīng) Confirm 操作的【成功】以及對應(yīng) Cancel 操作的【失敗回滾】
? 抽出一個統(tǒng)籌全局的事務(wù)協(xié)調(diào)者角色 TXManager. 在執(zhí)行分布式事務(wù)時,分為兩個階段:
? 階段 I:先對所有組件執(zhí)行 Try 操作
- ? 階段 II:根據(jù)上階段 Try 操作的執(zhí)行結(jié)果,決定本輪執(zhí)行 Confirm 還是 Cancel 操作
圖片
在我們實現(xiàn) TCC 框架的實戰(zhàn)環(huán)節(jié)中,首先需要明確的事情是:
? 哪部分內(nèi)容在 TCC 架構(gòu)中屬于通用的流程,這部分內(nèi)容可以抽取出來放在 sdk 中,以供后續(xù)復(fù)用
? 哪部分內(nèi)容需要給使用方預(yù)留出足夠的自由度,由使用方自行實現(xiàn),然后和通用 sdk 進行接軌.
最終,這兩部分內(nèi)容明確如下:
? 在 TCC sdk 中實現(xiàn)的通用邏輯包含了和事務(wù)協(xié)調(diào)器 txManager 有關(guān)的核心流程
? 事務(wù)協(xié)調(diào)器 TXManager 開啟事務(wù)以及 try-confirm/cancel 的 2PC 流程串聯(lián)
? 事務(wù)協(xié)調(diào)器 TXManager 異步輪詢?nèi)蝿?wù),用于推進事務(wù)從中間態(tài)走向終態(tài)
? TCC 組件的注冊流程
? 需要預(yù)定義事務(wù)日志存儲模塊 TXStore 的實現(xiàn)規(guī)范(聲明 interface)
? 需要預(yù)定義 TCC 組件 TCCComponent 的實現(xiàn)規(guī)范(聲明 interface)
? TCC 組件和 TXStore 兩部分內(nèi)容需要由使用方自行實現(xiàn):
? 使用方自行實現(xiàn) TCCComponent 類,包括其 Try、Confirm、Cancel 方法的執(zhí)行邏輯
? 使用方自行實現(xiàn)具體的 TXStore 日志存儲模塊. 可以根據(jù)實際需要,選型合適的存儲組件和存儲方式
圖片
1.2 TCC Component
下面是關(guān)于 TCC 組件的定位:
? 這部分內(nèi)容需要由用戶自行實現(xiàn),并在 TXManager 啟動時將其注冊到注冊中心 RegistryCenter 當中.
? 當使用方調(diào)用 TXManager 開啟事務(wù)時,會通過 RegistryCenter 獲取這些組件,并對其進行使用
? TCC 組件需要具備的能力包括如下幾項:
圖片
1.3 TX Manager
下面是關(guān)于事務(wù)協(xié)調(diào)器 TXManager 的定位.
- ? TXManager 是整個 TCC 架構(gòu)中最核心的角色
- ? TXManager 作為 gotcc 的統(tǒng)一入口,供使用方執(zhí)行啟動事務(wù)和注冊組件的操作
- ? TXManager 作為中樞系統(tǒng)分別和 RegisterCenter、TXStore 交互
- ? TXManager 需要串聯(lián)起整個 Try-Confirm/Canel 的 2PC 調(diào)用流程
- ? TXManager 需要運行異步輪詢?nèi)蝿?wù),推進未完成的事務(wù)走向終態(tài)
圖片
1.4 TX Store
TXStore 是用于存儲和管理事務(wù)日志明細記錄的模塊:
? 需要支持事務(wù)明細數(shù)據(jù)的 CRUD 能力
? 通常情況下,底層需要應(yīng)用到實際的存儲組件作為支持
? TXStore 在 gotcc 的 sdk 中體現(xiàn)為一個抽象的 interface. 需要由用戶完成具體類的實現(xiàn),并將其注入到 TXManager 當中.
圖片
1.5 RegistryCenter
最后是 TCC 組件的注冊管理中心 RegistryCenter,負責給 txManager 提供出注冊和查詢 TCC 組件的能力.
圖片
2 TXManager 核心源碼講解
理完了基本的流程和概念,下面我們一起開啟一線實戰(zhàn)環(huán)節(jié).
2.1 類圖
首先捋一下,在 gotcc 核心 sdk 中,涉及到的幾個核心類:
- ? TXManager:事務(wù)協(xié)調(diào)器,class
- ? TXStore:事務(wù)日志存儲模塊,interface
- ? registryCenter:TCC 組件注冊管理中心,class
- ? TCCComponent:TCC 組件,interface
通過下面的 UML 類圖,展示一下幾個核心類之間的關(guān)聯(lián)性:
圖片
2.2 核心類定義
圖片
2.2.1 TXManager
下面是關(guān)于事務(wù)協(xié)調(diào)器 TXManager 的幾個核心字段:
? txStore:內(nèi)置的事務(wù)日志存儲模塊,需要由使用方實現(xiàn)并完成注入
? registryCenter:TCC 組件的注冊管理中心
? opts:內(nèi)聚了一些 TXManager 的配置項,可以由使用方自定義,并通過 option 注入
? ctx:用于反映 TXManager 運行生命周期的的 context,當 ctx 終止時,異步輪詢?nèi)蝿?wù)也會隨之退出
? stop:用于停止 txManager 的控制器. 當 stop 被調(diào)用后,異步輪詢?nèi)蝿?wù)會被終止
type TXManager struct {
ctx context.Context
stop context.CancelFunc
opts *Options
txStore TXStore
registryCenter *registryCenter
}
func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
ctx, cancel := context.WithCancel(context.Background())
txManager := TXManager{
opts: &Options{},
txStore: txStore,
registryCenter: newRegistryCenter(),
ctx: ctx,
stop: cancel,
}
for _, opt := range opts {
opt(txManager.opts)
}
repair(txManager.opts)
go txManager.run()
return &txManager
}
2.2.2 RegistryCenter
注冊中心 registryCenter 中的內(nèi)容很簡單,通過 map 存儲所有注冊進來的 TCC 組件,要求各組件都有獨立的組件 ID;通過一把讀寫鎖 rwMutex 保護 map 的并發(fā)安全性
type registryCenter struct {
mux sync.RWMutex
components map[string]component.TCCComponent
}
func newRegistryCenter() *registryCenter {
return ?istryCenter{
components: make(map[string]component.TCCComponent),
}
}
2.2.3 TXStore
下面 gotcc sdk 中,對事務(wù)日志存儲模塊 TXStore interface 的定義,這個點很重要,要求后續(xù)使用方在實現(xiàn)具體的 TXStore 模塊時,需要實現(xiàn)這里所羅列出來的所有方法,并且要保證實現(xiàn)方法滿足預(yù)期的功能:
? CreateTX:創(chuàng)建一條事務(wù)明細記錄,會在入?yún)⒅袀魅氡臼聞?wù)涉及的 TCC 組件列表,同時需要在出參中返回全局唯一的事務(wù) id
? TXUpdate:更新一條事務(wù)明細記錄. 這里指的更新,針對于,事務(wù)中某個 TCC 組件 Try 響應(yīng)狀態(tài)的更新
? TXSubmit:提交一條事務(wù)的執(zhí)行結(jié)果. 要么置為成功,要么置為失敗
? GetHangingTXs:獲取所有未完成的事務(wù)明細記錄
? GetTX:根據(jù)事務(wù) id,獲取指定的一條事務(wù)明細記錄
? Lock:鎖住整個事務(wù)日志存儲模塊(要求為分布式鎖)
? Unlock:解鎖整個事務(wù)日志存儲模塊
type TXStore interface {
// 創(chuàng)建一條事務(wù)明細記錄
CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)
// 更新事務(wù)進度:
// 規(guī)則為:倘若有一個 component try 操作執(zhí)行失敗,則整個事務(wù)失??;倘若所有 component try 操作執(zhí)行成功,則事務(wù)成功
TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error
// 提交事務(wù)的最終狀態(tài)
TXSubmit(ctx context.Context, txID string, success bool) error
// 獲取到所有處于中間態(tài)的事務(wù)
GetHangingTXs(ctx context.Context) ([]*Transaction, error)
// 獲取指定的一筆事務(wù)
GetTX(ctx context.Context, txID string) (*Transaction, error)
// 鎖住事務(wù)日志表
Lock(ctx context.Context, expireDuration time.Duration) error
// 解鎖事務(wù)日志表
Unlock(ctx context.Context) error
}
2.3 注冊組件
下面是注冊 TCC 組件的處理流程:
首先,使用方通過 TXManager 對外暴露的公開方法 Register,開啟注冊流程,傳入對應(yīng)的 TCCComponent:
func (t *TXManager) Register(component component.TCCComponent) error {
return t.registryCenter.register(component)
}
? TXManager 會調(diào)用注冊中心 registeryCenter 的 register 方法,將對應(yīng) component 注入到 map 中. 這里有兩個點值得一提:
? Register 方法可以并發(fā)使用,其內(nèi)部會通過 rwMutex 維護 map 的并發(fā)安全性
? TCC 組件不能重復(fù)注冊,即不能存在重復(fù)的 component id
func (r *registryCenter) register(component component.TCCComponent) error {
r.mux.Lock()
defer r.mux.Unlock()
if _, ok := r.components[component.ID()]; ok {
return errors.New("repeat component id")
}
r.components[component.ID()] = component
return nil
}
上游 TXManager 可以通過 component id,進行 TCC 組件的查詢. 倘若某個 component id 不存在,則會拋出錯誤:
func (r *registryCenter) getComponents(componentIDs ...string) ([]component.TCCComponent, error) {
components := make([]component.TCCComponent, 0, len(componentIDs))
r.mux.RLock()
defer r.mux.RUnlock()
for _, componentID := range componentIDs {
component, ok := r.components[componentID]
if !ok {
return nil, fmt.Errorf("component id: %s not existed", componentID)
}
components = append(components, component)
}
return components, nil
}
2.4 事務(wù)主流程
下面進入最核心的部分,介紹一下整個分布式事務(wù)的運行流程.
2.4.1 主流程
用戶可以通過 txManager.Transaction 方法,一鍵啟動動一個分布式事務(wù)流程,其中包含的幾個核心步驟展示如下圖:
圖片
txManager.Transaction 方法是用戶啟動分布式事務(wù)的入口,需要在入?yún)⒅新暶鞅敬问聞?wù)涉及到的組件以及需要在 Try 流程中傳遞給對應(yīng)組件的請求參數(shù):
type RequestEntity struct {
// 組件名稱
ComponentID string `json:"componentName"`
// Try 請求時傳遞的參數(shù)
Request map[string]interface{} `json:"request"`
}
txManager.Transaction 對應(yīng)源碼如下,核心步驟均給出了注釋. 核心的 try-confirm/cancel 流程,會在后續(xù)的 txManager.twoPhaseCommit 方法中展開.
// 啟動事務(wù)
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {
// 1 限制分布式事務(wù)執(zhí)行時長
tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)
defer cancel()
// 2 獲得所有的涉及使用的 tcc 組件
componentEntities, err := t.getComponents(tctx, reqs...)
if err != nil {
return false, err
}
// 3 調(diào)用 txStore 模塊,創(chuàng)建新的事務(wù)明細記錄,并取得全局唯一的事務(wù) id
txID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)
if err != nil {
return false, err
}
// 4. 開啟兩階段提交流程:try-confirm/cancel
return t.twoPhaseCommit(ctx, txID, componentEntities)
}
2.4.2 2PC 串聯(lián)
此處涉及 try-confirm/cancel 流程的串聯(lián),可以說是整個 gotcc 框架的精髓所在,請大家細品斟酌.
對應(yīng)流程圖展示如下,方法源碼中也給出了相對詳細的注釋:
圖片
func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {
// 1 創(chuàng)建子 context 用于管理子 goroutine 生命周期
// 手握 cancel 終止器,能保證在需要的時候終止所有子 goroutine 生命周期
cctx, cancel := context.WithCancel(ctx)
defer cancel()
// 2 創(chuàng)建一個 chan,用于接收子 goroutine 傳遞的錯誤
errCh := make(chan error)
// 3 并發(fā)啟動,批量執(zhí)行各 tcc 組件的 try 流程
go func() {
// 通過 waitGroup 進行多個子 goroutine 的匯總
var wg sync.WaitGroup
for _, componentEntity := range componentEntities {
// shadow
componentEntity := componentEntity
wg.Add(1)
// 并發(fā)執(zhí)行各組件的 try 流程
go func() {
defer wg.Done()
resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{
ComponentID: componentEntity.Component.ID(),
TXID: txID,
Data: componentEntity.Request,
})
// 出現(xiàn) tcc 組件執(zhí)行 try 操作失敗,則需要對事務(wù)明細記錄進行更新,同時把錯誤通過 chan 拋給父 goroutine
if err != nil || !resp.ACK {
// 對對應(yīng)的事務(wù)進行更新
_ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), false)
errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())
return
}
// try 請求成功,則對事務(wù)明細記錄進行更新. 倘若更新失敗,也要視為錯誤,拋給父 goroutine
if err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), true); err != nil {
errCh <- err
}
}()
}
// 等待所有子 goroutine 運行完成
wg.Wait()
// 關(guān)閉 errCh,告知父 goroutine 所有任務(wù)已運行完成的信息
close(errCh)
}()
successful := true
// 4 通過 chan,阻塞子 goroutine 執(zhí)行完成
// 4.1 但凡出現(xiàn)一個子 goroutine 遇到了錯誤,則會提前接收到錯誤,并調(diào)用 cancel 方法熔斷其他所有子 goroutine 流程
// 4.2 倘若所有子 goroutine 都執(zhí)行成功,則會通過 chan 的關(guān)閉事件推進流程,對應(yīng) err 為 nil
if err := <-errCh; err != nil {
// 只要有一筆 try 請求出現(xiàn)問題,其他的都進行終止
cancel()
successful = false
}
// 5 異步執(zhí)行第二階段的 confirm/cancel 流程
// 之所以是異步,是因為實際上在第一階段 try 的響應(yīng)結(jié)果塵埃落定時,對應(yīng)事務(wù)的成敗已經(jīng)有了定論
// 第二階段能夠容忍異步執(zhí)行的原因在于,執(zhí)行失敗時,還有輪詢?nèi)蝿?wù)進行兜底
go t.advanceProgressByTXID(txID)
// 6 響應(yīng)結(jié)果
// 6.1 倘若所有 try 請求都成功,則 successful 為 try,事務(wù)成功
// 6.2 但凡有一個 try 請求處理出現(xiàn)問題,successful 為 false,事務(wù)失敗
return successful, nil
}
2.4.3 事務(wù)進度推進
當一筆事務(wù)在第一階段中所有的 Try 請求都有了響應(yīng)后,就需要根據(jù)第一階段的結(jié)果,執(zhí)行第二階段的 Confirm 或者 Cancel 操作,并且將事務(wù)狀態(tài)推進為成功或失敗的終態(tài):
- ? 倘若所有組件的 Try 響應(yīng)都是成功,則需要批量調(diào)用組件的 Confirm 接口,并在這之后將事務(wù)狀態(tài)更新為成功
- ? 倘若存在某個組件 Try 響應(yīng)為失敗,則需要批量調(diào)用組件的 Cancel 接口,并在這之后將事務(wù)狀態(tài)更新為失敗
- ? 倘若當前事務(wù)已執(zhí)行超時,同樣需要批量調(diào)用組件的 Cancel 接口,并在這之后將事務(wù)狀態(tài)更新為失敗
圖片
// 傳入一個事務(wù) id 推進其進度
func (t *TXManager) advanceProgressByTXID(txID string) error {
// 獲取事務(wù)日志明細
tx, err := t.txStore.GetTX(t.ctx, txID)
if err != nil {
return err
}
// 推進進度
return t.advanceProgress(tx)
}
// 傳入一個事務(wù) id 推進其進度
func (t *TXManager) advanceProgress(tx *Transaction) error {
// 1 推斷出事務(wù)當前的狀態(tài)
// 1.1 倘若所有組件 try 都成功,則為 successful
// 1.2 倘若存在組件 try 失敗,則為 failure
// 1.3 倘若事務(wù)超時了,則為 failure
// 1.4 否則事務(wù)狀態(tài)為 hanging
txStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))
// hanging 狀態(tài)的事務(wù)暫時不處理
if txStatus == TXHanging {
return nil
}
// 2 根據(jù)事務(wù)是否成功,定制不同的處理函數(shù)
success := txStatus == TXSuccessful
var confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)
var txAdvanceProgress func(ctx context.Context) error
if success {
// 如果事務(wù)成功,則需要對組件進行 confirm
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Confirm(ctx, tx.TXID)
}
// 如果事務(wù)成功,則需要在最后更新事務(wù)日志記錄的狀態(tài)為成功
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, true)
}
} else {
// 如果事務(wù)失敗,則需要對組件進行 cancel
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Cancel(ctx, tx.TXID)
}
// 如果事務(wù)失敗,則需要在最后更新事務(wù)日志記錄的狀態(tài)為失敗
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, false)
}
}
// 3 批量調(diào)用組件,執(zhí)行第二階段的 confirm/cancel 操作
for _, component := range tx.Components {
// 獲取對應(yīng)的 tcc component
components, err := t.registryCenter.getComponents(component.ComponentID)
if err != nil || len(components) == 0 {
return errors.New("get tcc component failed")
}
resp, err := confirmOrCancel(t.ctx, components[0])
if err != nil {
return err
}
if !resp.ACK {
return fmt.Errorf("component: %s ack failed", component.ComponentID)
}
}
// 4 二階段 confirm/cancel 操作都執(zhí)行完成后,對事務(wù)狀態(tài)進行提交
return txAdvanceProgress(t.ctx)
}
2.5 異步輪詢流程
接下來聊聊 txManager 的異步輪詢流程. 這個流程同樣非常重要,是支撐 txManager 魯棒性的重要機制.
倘若存在事務(wù)已經(jīng)完成第一階段 Try 操作的執(zhí)行,但是第二階段沒執(zhí)行成功,則需要由異步輪詢流程進行兜底處理,為事務(wù)補齊第二階段的操作,并將事務(wù)狀態(tài)更新為終態(tài)
2.5.1 啟動時機
異步輪詢?nèi)蝿?wù)是在 txManager 的初始化流程中啟動的,通過異步 goroutine 持久運行:
func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
ctx, cancel := context.WithCancel(context.Background())
txManager := TXManager{
opts: &Options{},
txStore: txStore,
registryCenter: NewRegistryCenter(),
ctx: ctx,
stop: cancel,
}
for _, opt := range opts {
opt(txManager.opts)
}
repair(txManager.opts)
go txManager.run()
return &txManager
}
2.5.2 輪詢流程
異步輪詢?nèi)蝿?wù)運行時,基于 for 循環(huán) + select 多路復(fù)用的方式,實現(xiàn)定時任務(wù)的執(zhí)行.
輪詢的時間間隔會根據(jù)一輪任務(wù)處理過程中是否出現(xiàn)錯誤,而進行動態(tài)調(diào)整. 這里調(diào)整規(guī)則指的是:當一次處理流程中發(fā)生了錯誤,就需要調(diào)大當前節(jié)點輪詢的時間間隔,讓其他節(jié)點的異步輪詢?nèi)蝿?wù)得到更大的執(zhí)行機會.
圖片
func (t *TXManager) run() {
var tick time.Duration
var err error
// 1 for 循環(huán)自旋式運行任務(wù)
for {
// 如果處理過程中出現(xiàn)了錯誤,需要增長輪詢時間間隔
if err == nil {
tick = t.opts.MonitorTick
} else {
tick = t.backOffTick(tick)
}
// select 多路復(fù)用
select {
// 倘若 txManager.ctx 被終止,則異步輪詢?nèi)蝿?wù)退出
case <-t.ctx.Done():
return
// 2 等待 tick 對應(yīng)時長后,開始執(zhí)行任務(wù)
case <-time.After(tick):
// 對 txStore 加分布式鎖,避免分布式服務(wù)下多個服務(wù)節(jié)點的輪詢?nèi)蝿?wù)重復(fù)執(zhí)行
if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {
// 取鎖失敗時(大概率被其他節(jié)點占有),不需要增加 tick 時長
err = nil
continue
}
// 3 獲取處于 hanging 狀態(tài)的事務(wù)
var txs []*Transaction
if txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {
_ = t.txStore.Unlock(t.ctx)
continue
}
// 4 批量推進事務(wù)進度
err = t.batchAdvanceProgress(txs)
_ = t.txStore.Unlock(t.ctx)
}
}
}
有關(guān)于輪詢時間間隔的退避謙讓策略為:每次對時間間隔進行翻倍,封頂為初始時長的 8 倍:
func (t *TXManager) backOffTick(tick time.Duration) time.Duration {
tick <<= 1
if threshold := t.opts.MonitorTick << 3; tick > threshold {
return threshold
}
return tick
}
2.5.3 批量推進事務(wù)進度
下面是異步輪詢?nèi)蝿?wù)批量推進事務(wù)第二階段執(zhí)行的流程,核心是開啟多個 goroutine 并發(fā)對多項事務(wù)進行處理:
func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {
// 1 創(chuàng)建一個 chan,用于接收子 goroutine 傳輸?shù)?err
errCh := make(chan error)
go func() {
// 2 通過 waitGroup 聚合多個子 groutine
var wg sync.WaitGroup
for _, tx := range txs {
// shadow
tx := tx
wg.Add(1)
go func() {
defer wg.Done()
// 3 推進每筆事務(wù)的進度
if err := t.advanceProgress(tx); err != nil {
// 遇到錯誤則投遞到 errCh
errCh <- err
}
}()
}
// 4 收口等待所有子 goroutine 執(zhí)行完成
wg.Wait()
// 5 所有子 goroutine 執(zhí)行完成后關(guān)閉 chan,喚醒阻塞等待的父 goroutine
close(errCh)
}()
// 記錄遇到的第一個錯誤
var firstErr error
// 6 父 goroutine 通過 chan 阻塞在這里,直到所有 goroutine 執(zhí)行完成,chan 被 close 才能往下
for err := range errCh {
// 記錄遇到的第一個錯誤
if firstErr != nil {
continue
}
firstErr = err
}
// 7 返回錯誤,核心是標識執(zhí)行過程中,是否發(fā)生過錯誤
return firstErr
}
3 GOTCC 使用案例講解
從第 3 章開始,我們從實際應(yīng)用 gotcc 框架的使用方視角出發(fā),對所需要實現(xiàn)的模塊進行定義,然后給出應(yīng)用 gotcc 框架的代碼示例.
3.1 TCC 組件實現(xiàn)
首先,我們對 TCC 組件的具體實現(xiàn)類進行定義:
3.1.1 類定義
定義一個 MockComponent 類,其中內(nèi)置了 redis 客戶端,用于完成一些狀態(tài)數(shù)據(jù)的存取.
圖片
// 實現(xiàn)的 tcc 組件
type MockComponent struct {
// tcc 組件唯一標識 id,構(gòu)造時由使用方傳入
id string
// redis 客戶端
client *redis_lock.Client
}
func NewMockComponent(id string, client *redis_lock.Client) *MockComponent {
return &MockComponent{
id: id,
client: client,
}
}
// 返回 tcc 組件的唯一標識 id
func (m *MockComponent) ID() string {
return m.id
}
3.1.2 Try 流程
下面實現(xiàn)一下 TCC 組件的 Try 方法,關(guān)鍵要點已于代碼中通過注釋的形式給出:
圖片
func (m *MockComponent) Try(ctx context.Context, req *component.TCCReq) (*component.TCCResp, error) {
// 1 基于 txID 維度加 redis 分布式鎖
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, req.TXID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2 基于 txID 冪等性去, 需要對事務(wù)的狀態(tài)進行檢查
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, req.TXID))
if err != nil && !errors.Is(err, redis_lock.ErrNil) {
return nil, err
}
res := component.TCCResp{
ComponentID: m.id,
TXID: req.TXID,
}
switch txStatus {
case TXTried.String(), TXConfirmed.String(): // 重復(fù)的 try 請求,給予成功的響應(yīng)
res.ACK = true
return &res, nil
case TXCanceled.String(): // 此前該事務(wù)已 cancel,則拒絕本次 try 請求
return &res, nil
default:
}
// 3 建立 txID 與 bizID 的關(guān)聯(lián)
bizID := gocast.ToString(req.Data["biz_id"])
if _, err = m.client.Set(ctx, pkg.BuildTXDetailKey(m.id, req.TXID), bizID); err != nil {
return nil, err
}
// 4 把 bizID 對應(yīng)的業(yè)務(wù)數(shù)據(jù)置為凍結(jié)態(tài)
reply, err := m.client.SetNX(ctx, pkg.BuildDataKey(m.id, req.TXID, bizID), DataFrozen.String())
if err != nil {
return nil, err
}
// 倘若數(shù)據(jù)此前已凍結(jié)或已使用,則拒絕本次 try 請求
if reply != 1 {
return &res, nil
}
// 5 更新當前組件下的事務(wù)狀態(tài)為 tried
_, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, req.TXID), TXTried.String())
if err != nil {
return nil, err
}
// 6 給予接收 try 請求的響應(yīng)
res.ACK = true
return &res, nil
}
3.1.3 Confirm 流程
下面實現(xiàn)一下 TCC 組件的 Confirm 方法,關(guān)鍵要點已于代碼中通過注釋的形式給出:
圖片
func (m *MockComponent) Confirm(ctx context.Context, txID string) (*component.TCCResp, error) {
// 1 基于 txID 維度加鎖
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2. 校驗事務(wù)狀態(tài),要求對應(yīng)組件下,事務(wù)此前的狀態(tài)為 tried
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
if err != nil {
return nil, err
}
res := component.TCCResp{
ComponentID: m.id,
TXID: txID,
}
switch txStatus {
case TXConfirmed.String(): // 事務(wù)狀態(tài)已 confirm,直接冪等響應(yīng)為成功
res.ACK = true
return &res, nil
case TXTried.String(): // 只有事務(wù)狀態(tài)為 try 才是合法的,會對程序放行
default: // 其他情況直接拒絕,ack 為 false
return &res, nil
}
// 3 獲取事務(wù)對應(yīng)的 bizID
bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
if err != nil {
return nil, err
}
// 4. 校驗業(yè)務(wù)數(shù)據(jù)此前狀態(tài)是否為凍結(jié)
dataStatus, err := m.client.Get(ctx, pkg.BuildDataKey(m.id, txID, bizID))
if err != nil {
return nil, err
}
// 如果此前非凍結(jié)態(tài),則拒絕本次請求
if dataStatus != DataFrozen.String() {
return &res, nil
}
// 5 把業(yè)務(wù)數(shù)據(jù)的更新操作置為 successful
if _, err = m.client.Set(ctx, pkg.BuildDataKey(m.id, txID, bizID), DataSuccessful.String()); err != nil {
return nil, err
}
// 6 把對應(yīng)組件下的事務(wù)狀態(tài)更新為成功,這一步哪怕失敗了也不阻塞主流程
_, _ = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXConfirmed.String())
// 7 處理成功,給予成功的響應(yīng)
res.ACK = true
return &res, nil
}
3.1.4 Cancel 流程
下面實現(xiàn)一下 TCC 組件的 Cancel 方法,關(guān)鍵要點已于代碼中通過注釋的形式給出:
func (m *MockComponent) Cancel(ctx context.Context, txID string) (*component.TCCResp, error) {
// 1 基于 txID 維度加鎖
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2 校驗事務(wù)狀態(tài),只要不是 confirmed,都允許被置為 canceld
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
if err != nil && !errors.Is(err, redis_lock.ErrNil) {
return nil, err
}
// 倘若組件內(nèi)事務(wù)此前的狀態(tài)為 confirmed,則說明流程有異常.
if txStatus == TXConfirmed.String() {
return nil, fmt.Errorf("invalid tx status: %s, txid: %s", txStatus, txID)
}
// 3 根據(jù)事務(wù)獲取對應(yīng)的 bizID
bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
if err != nil {
return nil, err
}
// 4 刪除對應(yīng)的 frozen 凍結(jié)記錄,代表對數(shù)據(jù)執(zhí)行了回滾操作
if err = m.client.Del(ctx, pkg.BuildDataKey(m.id, txID, bizID)); err != nil {
return nil, err
}
// 5 把事務(wù)狀態(tài)更新為 canceled
_, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXCanceled.String())
if err != nil {
return nil, err
}
// 6 給予處理成功的 ack
return &component.TCCResp{
ACK: true,
ComponentID: m.id,
TXID: txID,
}, nil
}
3.2 TX Store 實現(xiàn)
接下來是關(guān)于事務(wù)日志存儲模塊 TXStore 的具體實現(xiàn):
3.2.1 類定義
聲明了一個 MockTXStore 類,里面通過 mysql 存儲事務(wù)日志明細數(shù)據(jù),通過 redis 實現(xiàn) TXStore 模塊的分布式鎖.
其中和事務(wù)日志明細數(shù)據(jù)庫直接交互的操作被封裝在 TXRecordDAO 當中.
圖片
// TXStore 模塊具體實現(xiàn)
type MockTXStore struct {
// redis 客戶端,用于實現(xiàn)分布式鎖
client *redis_lock.Client
// 事務(wù)日志存儲 DAO 層
dao *expdao.TXRecordDAO
}
func NewMockTXStore(dao *expdao.TXRecordDAO, client *redis_lock.Client) *MockTXStore {
return &MockTXStore{
dao: dao,
client: client,
}
}
事務(wù)日志存儲 DAO 層:
type TXRecordDAO struct {
db *gorm.DB
}
func NewTXRecordDAO(db *gorm.DB) *TXRecordDAO {
return &TXRecordDAO{
db: db,
}
}
接下來是關(guān)于事務(wù)日志明細記錄的持久化對象(PO,Persistent Object)模型定義:
? 內(nèi)置了 gorm.Model,包含了主鍵 ID、創(chuàng)建時間 CreatedAt、更新時間 UpdatedAt、刪除時間 DeletedAt 幾個字段
? 事務(wù)狀態(tài) Status,標識事務(wù)所處的狀態(tài),分為進行中 hanging、成功 successful、失敗 failure
? 組件 Try 響應(yīng)明細記錄 ComponentTryStatuses: 記錄了事務(wù)下各組件 Try 請求響應(yīng)結(jié)果,會以一個 json 字符串的格式存儲,其真實的類型為 map[string]*ComponentTryStatus
type TXRecordPO struct {
gorm.Model
Status string `gorm:"status"`
ComponentTryStatuses string `gorm:"component_try_statuses"`
}
func (t TXRecordPO) TableName() string {
return "tx_record"
}
type ComponentTryStatus struct {
ComponentID string `json:"componentID"`
TryStatus string `json:"tryStatus"`
}
下面是事務(wù)日志明細表的建表語句:
CREATE TABLE IF NOT EXISTS `tx_record`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`status` varchar(16) NOT NULL COMMENT '事務(wù)狀態(tài) hanging/successful/failure',
`component_try_statuses` json DEFAULT NULL COMMENT '各組件 try 接口請求狀態(tài) hanging/successful/failure',
`deleted_at` datetime DEFAULT NULL COMMENT '刪除時間',
`created_at` datetime NOT NULL COMMENT '創(chuàng)建時間',
`updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`id`) USING BTREE COMMENT '主鍵索引',
KEY `idx_status` (`status`) COMMENT '事務(wù)狀態(tài)索引'
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT '事務(wù)日志記錄';
3.2.2 創(chuàng)建事務(wù)記錄
接下來是通過過 TXStore 模塊創(chuàng)建一條事務(wù)明細記錄的實現(xiàn)代碼:
func (m *MockTXStore) CreateTX(ctx context.Context, components ...component.TCCComponent) (string, error) {
// 創(chuàng)建一個記錄組件 try 響應(yīng)結(jié)果的 map,其中以組件 id 為 key
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus, len(components))
for _, component := range components {
componentTryStatuses[component.ID()] = &expdao.ComponentTryStatus{
ComponentID: component.ID(),
TryStatus: txmanager.TryHanging.String(),
}
}
statusesBody, _ := json.Marshal(componentTryStatuses)
// 創(chuàng)建事務(wù)明細記錄 po 示例,調(diào)用 dao 模塊將記錄落庫
txID, err := m.dao.CreateTXRecord(ctx, &expdao.TXRecordPO{
Status: txmanager.TXHanging.String(),
ComponentTryStatuses: string(statusesBody),
})
if err != nil {
return "", err
}
return gocast.ToString(txID), nil
}
dao 層創(chuàng)建事務(wù)明細記錄的實現(xiàn)代碼:
func (t *TXRecordDAO) CreateTXRecord(ctx context.Context, record *TXRecordPO) (uint, error) {
return record.ID, t.db.WithContext(ctx).Model(&TXRecordPO{}).Create(record).Error
}
3.2.3 事務(wù)明細更新
下面是更新一筆事務(wù)明細的方法,其處理流程是:
? 針對這筆事務(wù)記錄加寫鎖
? 根據(jù)組件的 try 響應(yīng)結(jié)果,對 json字符串進行更新
? 將事務(wù)記錄寫回表中.
func (m *MockTXStore) TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error {
// 后續(xù)需要閉包傳入執(zhí)行函數(shù)
do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
if accept {
componentTryStatuses[componentID].TryStatus = txmanager.TrySucceesful.String()
} else {
componentTryStatuses[componentID].TryStatus = txmanager.TryFailure.String()
}
newBody, _ := json.Marshal(componentTryStatuses)
record.ComponentTryStatuses = string(newBody)
return dao.UpdateTXRecord(ctx, record)
}
_txID := gocast.ToUint(txID)
return m.dao.LockAndDo(ctx, _txID, do)
}
// 通過 gorm 實現(xiàn)數(shù)據(jù)記錄加寫鎖,并執(zhí)行閉包函數(shù)的操作:
func (t *TXRecordDAO) LockAndDo(ctx context.Context, id uint, do func(ctx context.Context, dao *TXRecordDAO, record *TXRecordPO) error) error {
// 開啟事務(wù)
return t.db.Transaction(func(tx *gorm.DB) error {
defer func() {
if err := recover(); err != nil {
tx.Rollback()
}
}()
// 加寫鎖
var record TXRecordPO
if err := tx.Set("gorm:query_option", "FOR UPDATE").WithContext(ctx).First(&record, id).Error; err != nil {
return err
}
txDAO := NewTXRecordDAO(tx)
// 執(zhí)行閉包函數(shù)
return do(ctx, txDAO, &record)
})
}
// 更新一條事務(wù)日志數(shù)據(jù)記錄
func (t *TXRecordDAO) UpdateTXRecord(ctx context.Context, record *TXRecordPO) error {
return t.db.WithContext(ctx).Updates(record).Error
}
3.2.4 查詢事務(wù)
接下來是查詢事務(wù)的兩個方法:
// 根據(jù)事務(wù) id 查詢指定的一筆事務(wù)明細記錄:
func (m *MockTXStore) GetTX(ctx context.Context, txID string) (*txmanager.Transaction, error) {
// 通過 option 在查詢條件中注入事務(wù) id
records, err := m.dao.GetTXRecords(ctx, expdao.WithID(gocast.ToUint(txID)))
if err != nil {
return nil, err
}
if len(records) != 1 {
return nil, errors.New("get tx failed")
}
// 對各組件 try 明細內(nèi)容進行反序列化
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(records[0].ComponentTryStatuses), &componentTryStatuses)
components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
for _, tryItem := range componentTryStatuses {
components = append(components, &txmanager.ComponentTryEntity{
ComponentID: tryItem.ComponentID,
TryStatus: txmanager.ComponentTryStatus(tryItem.TryStatus),
})
}
return &txmanager.Transaction{
TXID: txID,
Status: txmanager.TXStatus(records[0].Status),
Components: components,
CreatedAt: records[0].CreatedAt,
}, nil
}
// 獲取全量處于中間態(tài)的事務(wù)明細記錄
func (m *MockTXStore) GetHangingTXs(ctx context.Context) ([]*txmanager.Transaction, error) {
// 通過 option 在查詢條件中指定事務(wù)狀態(tài)為 hanging
records, err := m.dao.GetTXRecords(ctx, expdao.WithStatus(txmanager.TryHanging))
if err != nil {
return nil, err
}
txs := make([]*txmanager.Transaction, 0, len(records))
for _, record := range records {
// 對各組件 try 響應(yīng)結(jié)果進行反序列化
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
for _, component := range componentTryStatuses {
components = append(components, &txmanager.ComponentTryEntity{
ComponentID: component.ComponentID,
TryStatus: txmanager.ComponentTryStatus(component.TryStatus),
})
}
txs = append(txs, &txmanager.Transaction{
TXID: gocast.ToString(record.ID),
Status: txmanager.TXHanging,
CreatedAt: record.CreatedAt,
Components: components,
})
}
return txs, nil
}
在 dao 層實現(xiàn)了一個通用的事務(wù)日志查詢方法,通過 option 模式實現(xiàn)查詢條件的靈活組裝:
func (t *TXRecordDAO) GetTXRecords(ctx context.Context, opts ...QueryOption) ([]*TXRecordPO, error) {
db := t.db.WithContext(ctx).Model(&TXRecordPO{})
for _, opt := range opts {
db = opt(db)
}
var records []*TXRecordPO
return records, db.Scan(&records).Error
}
下面是關(guān)于 option 的具體定義,更多有關(guān)于這種模式的設(shè)計實現(xiàn)思路,可以參見我之前發(fā)表的文章——Golang 設(shè)計模式之建造者模式
type QueryOption func(db *gorm.DB) *gorm.DB
// 通過事務(wù)主鍵 id 進行查詢
func WithID(id uint) QueryOption {
return func(db *gorm.DB) *gorm.DB {
return db.Where("id = ?", id)
}
}
// 通過事務(wù)狀態(tài)進行查詢
func WithStatus(status txmanager.ComponentTryStatus) QueryOption {
return func(db *gorm.DB) *gorm.DB {
return db.Where("status = ?", status.String())
}
}
3.2.5 提交事務(wù)結(jié)果
接下來是在事務(wù)執(zhí)行完成后,將執(zhí)行結(jié)果更新到事務(wù)明細記錄中的處理方法:
// 提交事務(wù)的最終狀態(tài)
func (m *MockTXStore) TXSubmit(ctx context.Context, txID string, success bool) error {
do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
if success {
record.Status = txmanager.TXSuccessful.String()
} else {
record.Status = txmanager.TXFailure.String()
}
return dao.UpdateTXRecord(ctx, record)
}
return m.dao.LockAndDo(ctx, gocast.ToUint(txID), do)
}
3.2.6 加/解全局鎖
最后,是實現(xiàn)整個 txStore 模塊加/解鎖的處理方法,內(nèi)部是基于 redis 實現(xiàn)的分布式鎖:
func (m *MockTXStore) Lock(ctx context.Context, expireDuration time.Duration) error {
lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client, redis_lock.WithExpireSeconds(int64(expireDuration.Seconds())))
return lock.Lock(ctx)
}
func (m *MockTXStore) Unlock(ctx context.Context) error {
lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client)
return lock.Unlock(ctx)
}
到這里為止,所有前置準備工作都已經(jīng)處理完成,接下來我們展示一個應(yīng)用到 gotcc 框架的使用示例。
3.3 使用代碼示例
由于我實現(xiàn)的 txStore 和 tccComponent 需要依賴到 mysql 和 redis 兩個組件,因此在這里需要輸入對應(yīng)的信息.
單測代碼相對比較簡單,其中一些要點通過注釋給出:
const (
dsn = "請輸入你的 mysql dsn"
network = "tcp"
address = "請輸入你的 redis ip"
password = "請輸入你的 redis 密碼"
)
// 使用 tcc 單測代碼
func Test_TCC(t *testing.T) {
// 創(chuàng)建 redis 客戶端
redisClient := pkg.NewRedisClient(network, address, password)
// 創(chuàng)建 mysql 客戶端
mysqlDB, err := pkg.NewDB(dsn)
if err != nil {
t.Error(err)
return
}
// 構(gòu)造三個 tcc 組件
componentAID := "componentA"
componentBID := "componentB"
componentCID := "componentC"
componentA := NewMockComponent(componentAID, redisClient)
componentB := NewMockComponent(componentBID, redisClient)
componentC := NewMockComponent(componentCID, redisClient)
// 構(gòu)造出事務(wù)日志存儲模塊
txRecordDAO := dao.NewTXRecordDAO(mysqlDB)
txStore := NewMockTXStore(txRecordDAO, redisClient)
// 構(gòu)造出 txManager 模塊
txManager := txmanager.NewTXManager(txStore, txmanager.WithMonitorTick(time.Second))
defer txManager.Stop()
// 完成三個組件的注冊
if err := txManager.Register(componentA); err != nil {
t.Error(err)
return
}
if err := txManager.Register(componentB); err != nil {
t.Error(err)
return
}
if err := txManager.Register(componentC); err != nil {
t.Error(err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// 啟動分布式事務(wù)
success, err := txManager.Transaction(ctx, []*txmanager.RequestEntity{
{ComponentID: componentAID,
Request: map[string]interface{}{
"biz_id": componentAID + "_biz",
},
},
{ComponentID: componentBID,
Request: map[string]interface{}{
"biz_id": componentBID + "_biz",
},
},
{ComponentID: componentCID,
Request: map[string]interface{}{
"biz_id": componentCID + "_biz",
},
},
}...)
if err != nil {
t.Errorf("tx failed, err: %v", err)
return
}
if !success {
t.Error("tx failed")
return
}
// 分布式事務(wù)處理成功
t.Log("success")
}
4 總結(jié)
到這里,本文正文內(nèi)容全部結(jié)束. 這里回頭再對本期分享的內(nèi)容做個總結(jié):
? 本期我基于 golang 從零到一搭建了一個 TCC 分布式事務(wù)框架的開源項目 gotcc,并圍繞著這個項目展開源碼級別的講解。
? 在 gotcc 中,對事務(wù)協(xié)調(diào)器 TXManager 相關(guān)的核心處理邏輯,如 Try-Confirm/Cancel 兩階段流程串聯(lián)、TCC 組件注冊、異步輪詢?nèi)蝿?wù)等內(nèi)容進行實現(xiàn),并將這部分核心內(nèi)容抽出放在了 SDK 中,供應(yīng)用方使用。
? 在 gotcc 中還定義了 TCC 組件和事務(wù)日志存儲模塊的抽象 interface,這部分內(nèi)容需要由應(yīng)用方自行實現(xiàn),并在使用 gotcc 時將其注入到 TXManager 當中。