火山引擎 ByteHouse:ClickHouse 如何保證海量數(shù)據(jù)一致性
背景
ClickHouse是一個(gè)開源的OLAP引擎,不僅被全球開發(fā)者廣泛使用,在字節(jié)各個(gè)應(yīng)用場(chǎng)景中也可以看到它的身影?;诟咝阅?、分布式特點(diǎn),ClickHouse可以滿足大規(guī)模數(shù)據(jù)的分析和查詢需求,因此字節(jié)研發(fā)團(tuán)隊(duì)以開源ClickHouse為基礎(chǔ),推出火山引擎云原生數(shù)據(jù)倉(cāng)庫(kù)ByteHouse。
在日常工作中,研發(fā)人員經(jīng)常會(huì)遇到業(yè)務(wù)鏈路過長(zhǎng),導(dǎo)致流程穩(wěn)定性和數(shù)據(jù)一致性難保障的問題,這在分布式、跨服務(wù)的場(chǎng)景中更為明顯。本篇文章提出針對(duì)這一問題的解決思路:在火山引擎ByteHouse中構(gòu)建輕量級(jí)流程引擎,來解決數(shù)據(jù)一致性問題。
使用輕量級(jí)流程引擎可以幫我們使用統(tǒng)一的標(biāo)準(zhǔn)來解決復(fù)雜業(yè)務(wù)鏈路的編排問題,不僅提高業(yè)務(wù)代碼的可讀性和復(fù)用性,還能更專注業(yè)務(wù)核心邏輯的開發(fā),讓整體流程更加標(biāo)準(zhǔn)化、規(guī)范化。
總結(jié)來說,使用流程引擎有以下優(yōu)勢(shì):
- 輕量級(jí),接入方便,內(nèi)存操作,性能有保障
- 易維護(hù),流程配置與業(yè)務(wù)分離,支持熱更新
- 易擴(kuò)展,豐富的執(zhí)行策略及算子支持
大體思路
圖片
上圖為ByteHouse企業(yè)版管理平臺(tái)功能架構(gòu)圖。從該功能架構(gòu)圖可以看出,ByteHouse核心能力都是依賴ClickHouse集群,對(duì)于集群節(jié)點(diǎn)多、數(shù)據(jù)計(jì)算量大的業(yè)務(wù)場(chǎng)景,容易出現(xiàn)節(jié)點(diǎn)狀態(tài)不一致的問題,因此保證ClickHouse集群間的狀態(tài)一致性是我們的核心訴求。
圖片
為了保證數(shù)據(jù)一致性,ByteHouse提供了以下能力:
- event engine: 事件處理中心
- workflow engine:輕量級(jí)流程引擎
- 對(duì)賬系統(tǒng)
保障數(shù)據(jù)一致性最簡(jiǎn)單的方式是通過狀態(tài)機(jī)來監(jiān)聽流程執(zhí)行過程:
- 首先,將所有的任務(wù)請(qǐng)求下發(fā)到event engine,由event engine將任務(wù)分發(fā)對(duì)應(yīng)的handler執(zhí)行,統(tǒng)一管理所有下發(fā)任務(wù)的生命周期,并提供異步重試、回滾補(bǔ)償?shù)裙δ?。流量匯總到event engine以后,會(huì)讓服務(wù)后續(xù)的業(yè)務(wù)擴(kuò)展更加便捷。
- 其次,對(duì)于比較復(fù)雜的任務(wù)請(qǐng)求,我們可以下發(fā)到workflow engine執(zhí)行,由workflow生成實(shí)例,并編排任務(wù)隊(duì)列,管理流程執(zhí)行實(shí)例的生命周期,統(tǒng)一失敗回滾,失敗重試。
- 最后,對(duì)于服務(wù)不可用等特殊場(chǎng)景產(chǎn)生的臟數(shù)據(jù),由對(duì)賬服務(wù)兜底。
圖片
架構(gòu)設(shè)計(jì)
在流程監(jiān)控的架構(gòu)設(shè)計(jì)中,主要包含以下:
- 流程管理層:主要負(fù)責(zé)流程配置的解析初始化,并完成編排策略的工作
- 策略behavior層:編排執(zhí)行節(jié)點(diǎn),并下發(fā)執(zhí)行任務(wù)到執(zhí)行器
- 執(zhí)行器:管理執(zhí)行節(jié)點(diǎn)執(zhí)行
- 執(zhí)行節(jié)點(diǎn):負(fù)責(zé)業(yè)務(wù)具體實(shí)現(xiàn)
圖片
實(shí)現(xiàn)方案
執(zhí)行節(jié)點(diǎn)
圖片
流程引擎的核心為“責(zé)任鏈”,按照責(zé)任鏈上的節(jié)點(diǎn)順序依次執(zhí)行所有任務(wù),所以我們需要的三個(gè)基本單元分別為:
- request:入?yún)?/li>
- processlist:流程執(zhí)行節(jié)點(diǎn)list
- response:出參
在研發(fā)工作中,我們時(shí)常會(huì)遇到以下問題:
- 如果同時(shí)出現(xiàn)了一個(gè)問題,node1、node2、node3之間的數(shù)據(jù)交互如何實(shí)現(xiàn)?
- 如果node1入?yún)?、出參與node2,node3不一樣該如何處理?
- 參數(shù)類型不同的node又該如何統(tǒng)一調(diào)度?
最簡(jiǎn)單的處理辦法,是讓node使用相同的上下文信息,將整個(gè)執(zhí)行node模版化。我們讓所有的執(zhí)行節(jié)點(diǎn)node實(shí)現(xiàn)相同的接口Delegation,統(tǒng)一使用相同的上下文executionContext作為執(zhí)行方法的入?yún)ⅰ?/p>
對(duì)于流程中的request和response,我們可以放入executionContext中,讓每個(gè)執(zhí)行節(jié)點(diǎn)都可以通過上下文操作response。
// Delegation -
type Delegation interface {
Execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
TryExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
ConfirmExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
CancelExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
Code() string
Type() value.DelegationType
}
執(zhí)行策略
如果確定好了最小的執(zhí)行節(jié)點(diǎn),我們需要考慮到,業(yè)務(wù)場(chǎng)景并不會(huì)永遠(yuǎn)順序執(zhí)行node,再返回結(jié)果,流程執(zhí)行過程中跳轉(zhuǎn)、循環(huán)、并發(fā)執(zhí)行都是比較常見的操作??紤]不同業(yè)務(wù)場(chǎng)景復(fù)用性,我們?cè)趫?zhí)行節(jié)點(diǎn)之上加了一層執(zhí)行策略,用策略behaivor來重新編排觸發(fā)執(zhí)行節(jié)點(diǎn)的任務(wù)。
- 下圖將流程分成了behavior1和behavior2,分別對(duì)應(yīng)不同的策略。
- 簡(jiǎn)單的策略舉例:按順序執(zhí)行、并發(fā)執(zhí)行、循環(huán)執(zhí)行、條件跳轉(zhuǎn)執(zhí)行等。
- 我們可以根據(jù)自身業(yè)務(wù)實(shí)際需要定制,后續(xù)會(huì)有實(shí)例介紹。
圖片
// ActivityBehavior -
type ActivityBehavior interface {
Enter(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError
Execute(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError
Leave(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError
Code() value.ActivityBehaviorCode
}
策略behavior提供有Enter,Execute,Leave三個(gè)接口,Enter負(fù)責(zé)生成執(zhí)行節(jié)點(diǎn)任務(wù)instance,Execute負(fù)責(zé)編排并觸發(fā)執(zhí)行任務(wù)instance操作,Leave負(fù)責(zé)跳轉(zhuǎn)到下一個(gè)behavior。
可以看出來策略behaivor的跳轉(zhuǎn)方式類似于鏈表,不斷執(zhí)行next方法,所以編碼過程中需要注意不要出現(xiàn)死循環(huán),小心stackoverflow。
Executor
執(zhí)行器Executor的主要作用是串聯(lián)執(zhí)行策略和執(zhí)行節(jié)點(diǎn),策略behavior將執(zhí)行的命令下發(fā)給Executor,由Executor對(duì)執(zhí)行節(jié)點(diǎn)的觸發(fā)操作。這里會(huì)根據(jù)執(zhí)行節(jié)點(diǎn)的type,映射到三種執(zhí)行節(jié)點(diǎn)的執(zhí)行方式,包含tcc,執(zhí)行一次,重試多次。
// DelegationExecutor -
type DelegationExecutor interface {
execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
postExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
}
func (de *DefaultDelegationExecutor) execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError {
delegationCode := executionContext.GetExecutionInstance().GetDelegationCode()
if len(delegationCode) == 0 || de.DelegationMap[delegationCode] == nil {
logger.Info(ctx, "DefaultDelegationExecutor delegation code not found,use default delegation", zap.String("delegationCode", delegationCode))
delegationCode = string(value.DefaultDelegation)
executionContext.GetExecutionInstance().SetDelegationCode(delegationCode)
}
return de.dumpExecute(ctx, executionContext, delegationCode)
}
func (de *DefaultDelegationExecutor) dumpExecute(ctx context.Context, executionContext ExecutionContextInterface, delegationCode string) apperror.AppError {
FireEvent(ctx, executionContext, value.ExecutionStart)
var err apperror.AppError
delegation := de.DelegationMap[delegationCode]
switch delegation.Type() {
case value.TccDelegation:
err = tccExecute(ctx, executionContext, delegation)
case value.SingleDelegation:
err = singleExecute(ctx, executionContext, delegation)
case value.RetryDelegation:
err = retryExecute(ctx, executionContext, delegation)
}
if err != nil {
logger.Error(ctx, "delegation.Execute_err", zap.Error(err))
return apperror.Trace(err)
}
FireEvent(ctx, executionContext, value.ExecutionEnd)
return nil
}
ExecutionContext
ExecutionContext上下文是用來記錄了流程執(zhí)行的所有細(xì)節(jié),包含以下:
- ProcessEngineConfigurationInterface: 流程定義信息
- ExecutionInstanceInterface: 執(zhí)行節(jié)點(diǎn)實(shí)例
- ActivityInstanceInterface: 執(zhí)行策略實(shí)例
- ProcessInstanceInterface: 流程實(shí)例
- request:入?yún)?/li>
- response:返回值
為了保證整個(gè)流程執(zhí)行的穩(wěn)定性,這里除了response之外,所以其他的實(shí)例參數(shù)都不建議開放寫接口,response可以用來存儲(chǔ)流程實(shí)例執(zhí)行過程中會(huì)產(chǎn)生的變量信息。
對(duì)于整個(gè)流程的定義ProcessEngineConfiguration,我們可以選擇最簡(jiǎn)單的方式,即在數(shù)據(jù)庫(kù)里,將配置信息映射成json字符串。當(dāng)然也可以選擇讀取配置文件,只要能滿足讀取方便,數(shù)據(jù)不丟即可。
// ExecutionContextInterface -
type ExecutionContextInterface interface {
GetProcessEngineConfiguration() ProcessEngineConfigurationInterface
SetProcessEngineConfiguration(processEngineConfiguration ProcessEngineConfigurationInterface)
GetExecutionInstance() instance.ExecutionInstanceInterface
SetExecutionInstance(executionInstance instance.ExecutionInstanceInterface)
GetActivityInstance() instance.ActivityInstanceInterface
SetActivityInstance(activityInstance instance.ActivityInstanceInterface)
GetProcessInstance() instance.ProcessInstanceInterface
SetProcessInstance(processInstance instance.ProcessInstanceInterface)
SetNeedPause(needPause bool)
IsNeedPause() bool
SetActivityIndex(activityIndex int)
GetActivityIndex() int
SetActivityBehaviorCode(activityBehaviorCode value.ActivityBehaviorCode)
GetActivityBehaviorCode() value.ActivityBehaviorCode
SetBizUniqueKey(bizUniqueKey string)
GetBizUniqueKey() string
GetRequest() map[string]interface{}
SetRequest(request map[string]interface{})
GetResponse() map[string]string
SetResponse(response map[string]string)
AtomicAddResponse(key string, value string)
}
Listener
監(jiān)聽器的主要作用是用來監(jiān)聽流程執(zhí)行中的重要參數(shù)信息。從上述executor接口可以看到fireEvent,它的作用是發(fā)送消息event,讓listener監(jiān)聽到對(duì)應(yīng)的event類型,完成一些定制化的行為。
類似于面向切面編程,我們可以在執(zhí)行節(jié)點(diǎn)的前后增加定制化的邏輯,如打日志、監(jiān)聽節(jié)點(diǎn)執(zhí)行時(shí)間,持久化流程中產(chǎn)生的response信息、增加鏈路追蹤等。
API
圖片
最后,我們將上述的內(nèi)容拼接串聯(lián)起來,主要提供三個(gè)接口:
- Start: 啟動(dòng)流程
- Signal: 暫停或是異常退出后,繼續(xù)執(zhí)行流程
- Abort: 強(qiáng)制中斷流程
process start(){
//1.get and create ProcessEngineConfigurationInterface 解析流程定義
//2.create processInstance 創(chuàng)建流程實(shí)例
//3.create ExecutionContext 創(chuàng)建執(zhí)行上下文
//4. lockstrategy trylock
//5. invoke process start
processinstance.start()
//6. persist processInstance and return
//7. lockstrategy unlock
}
processinstance start(){
// get behavior
// behavior enter
behavior.Enter(ctx, executionContext)
//behavior execute
behavior.Execute(ctx, executionContext)
//behavior leave
behavior.Leave(ctx, executionContext)
}
相比于start,signal需要讀取執(zhí)行的細(xì)節(jié)信息,找到之前失敗的執(zhí)行節(jié)點(diǎn)位置,并加載到上下文中,再繼續(xù)執(zhí)行。
對(duì)于失敗節(jié)點(diǎn)信息的持久化有兩種方式:第一,可以選擇在流程執(zhí)行結(jié)束持久化;第二,可以通過listener在每個(gè)執(zhí)行節(jié)點(diǎn)結(jié)束持久化。具體根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景對(duì)于性能、數(shù)據(jù)一致性的要求做出抉擇。
并發(fā)場(chǎng)景考慮
- behavior策略中肯定會(huì)出現(xiàn)定制、并發(fā)、處理多個(gè)執(zhí)行節(jié)點(diǎn)到場(chǎng)景的問題,如果同時(shí)修改必定會(huì)造成數(shù)據(jù)錯(cuò)亂。簡(jiǎn)單的方法推薦使用帶鎖的容器存儲(chǔ),可以被修改的信息(response),此處使用的是github.com/bytedance/gopkg包里面封裝的skipmap。
- lockstrategy可以自己定義最適配業(yè)務(wù)場(chǎng)景的,最簡(jiǎn)單的方案是redis鎖,同時(shí)也考慮到系統(tǒng)異常退出后的恢復(fù)問題??梢詤⒖紃edis官網(wǎng)解決特殊情況下的鎖異常解決方案:https://redis.io/commands/setnx/
后續(xù)的工作
輕量級(jí)流程引擎的基本功能到此已經(jīng)實(shí)現(xiàn),后續(xù)的擴(kuò)展優(yōu)化可以圍繞以下方向進(jìn)行:
- 界面化展示,可以將鏈路執(zhí)行情況展示出來
- 策略behavior維度擴(kuò)展,適配各種業(yè)務(wù)場(chǎng)景
- 增加子流程的維度,可以復(fù)用原先的執(zhí)行邏輯
Demo示例
以下為簡(jiǎn)單的processconfiguration的配置信息,此處使用DefaultBehavior,即同步順序執(zhí)行策略。
{
"ProcessContentList":[
{
"Behavior":"DefaultBehavior",
"DelegationList":[
{
"Code":"sample1"
},
{
"Code":"sample2"
},
{
"Code":"sample3"
}
]
},
{
"Behavior":"DefaultBehavior",
"DelegationList":[
{
"Code":"sample4"
},
{
"Code":"sample5"
}
]
}
]
}
圖片
在listener里面加入日志,這樣可以追溯出整個(gè)流程的執(zhí)行流程,以便更好的監(jiān)控整個(gè)流程的運(yùn)行狀態(tài)。
實(shí)際使用
以ClickHouse集群縮容為例:
圖片
{
"ProcessContentList":[
// 查詢所有需要重分布的table
{
"Behavior":"DefaultBehavior",// 順序執(zhí)行
"DelegationList":[
{
"Code":"hor_reshard_table_loop"
}
]
},
// 遍歷所有table進(jìn)行數(shù)據(jù)的重分布
{
"LoopKey":"reshard_table_loop_key",
"Behavior":"NonBlockLoopBehavior",// 非阻塞循環(huán)處理
"DelegationList":[
{
"Code":"hor_reshard_table"
}
]
},
// 進(jìn)行刪除節(jié)點(diǎn)操作
{
"Behavior":"DefaultBehavior",
"DelegationList":[
{
"Code":"hor_start_remove_node"
},
{
"Code":"hor_prepare_node_vcloud",
"PostCode":"hor_rollback_remove_node_vcloud"http:// 統(tǒng)一失敗回滾處理
},
{
"Code":"hor_update_config_vcloud",
"PostCode":"hor_rollback_remove_node_vcloud"
},
{
"Code":"hor_set_cluster_running",
"PostCode":"hor_rollback_remove_node_vcloud"
},
{
"Code":"hor_release_node"
},
{
"Code":"hor_callback_bill"
}
]
}
]
}
總結(jié)
一個(gè)流程引擎適配所有的業(yè)務(wù)場(chǎng)景幾乎是不可能,除非接受復(fù)雜的方案設(shè)計(jì),而第三方流程引擎對(duì)于日常的業(yè)務(wù)開發(fā)顯得太笨重。輕量級(jí)流程引擎則會(huì)簡(jiǎn)化接入方式,減少了過多http請(qǐng)求帶來的性能損耗,更加靈活多變,追述問題也變得簡(jiǎn)單。
在ByteHouse中加入流程引擎的能力,能以較小的代價(jià)給業(yè)務(wù)更多重試的可能性,而不需要反復(fù)回滾,特別對(duì)于耗時(shí)很長(zhǎng)的任務(wù),能帶來更好用戶使用體驗(yàn)。除此之外,流程引擎還能將業(yè)務(wù)流程模版化,增加接口服務(wù)的復(fù)用性,使得業(yè)務(wù)代碼的可讀性、擴(kuò)展性得到提升,方便后期維護(hù)。
火山引擎云原生數(shù)據(jù)倉(cāng)庫(kù)ByteHouse是火山引擎旗下的一款云原生數(shù)據(jù)倉(cāng)庫(kù),為用戶提供極速分析體驗(yàn),能夠支撐實(shí)時(shí)數(shù)據(jù)分析和海量數(shù)據(jù)離線分析,同時(shí)還具備便捷的彈性擴(kuò)縮容能力,極致分析性能和豐富的企業(yè)級(jí)特性,助力客戶數(shù)字化轉(zhuǎn)型。