開(kāi)源微服務(wù)編排框架:Netflix Conductor
本文主要介紹netflix conductor的基本概念和主要運(yùn)行機(jī)制。
一 簡(jiǎn)介
netflix conductor是基于JAVA語(yǔ)言編寫(xiě)的開(kāi)源流程引擎,用于架構(gòu)基于微服務(wù)的流程。它具備如下特性:
- 允許創(chuàng)建復(fù)雜的業(yè)務(wù)流程,流程中每個(gè)獨(dú)立的任務(wù)都是由一個(gè)微服務(wù)所實(shí)現(xiàn)。
- 基于JSON DSL 創(chuàng)建工作流,對(duì)任務(wù)的執(zhí)行進(jìn)行編排。
- 工作流在執(zhí)行的過(guò)程中可見(jiàn)、可追溯。
- 提供暫停、恢復(fù)、重啟等多種控制模型。
- 提供一種簡(jiǎn)單的方式來(lái)最大限度重用微服務(wù)。
- 擁有擴(kuò)展到百萬(wàn)流程并發(fā)運(yùn)行的服務(wù)能力。
- 通過(guò)隊(duì)列服務(wù)實(shí)現(xiàn)客戶端與服務(wù)端的分離。
- 支持 HTTP 或其他RPC協(xié)議進(jìn)行數(shù)據(jù)傳送
二 基本概念
1 Task
Task是最小執(zhí)行單元,承載了一段執(zhí)行邏輯,如發(fā)送HTTP請(qǐng)求等。
- System Task:被conductor服務(wù)執(zhí)行,這些任務(wù)的執(zhí)行與引擎在同一個(gè)JVM中。
- Worker Task:被worker服務(wù)執(zhí)行,執(zhí)行與引擎隔離開(kāi),worker通過(guò)隊(duì)列獲取任務(wù)后,執(zhí)行并更新結(jié)果狀態(tài)到引擎。Worker的實(shí)現(xiàn)是跨語(yǔ)言的,其使用Http協(xié)議與Server通信。
conductor提供了若干內(nèi)置SystemTask:
- 功能性Task:
- HTTP:發(fā)送http請(qǐng)求
- JSON_JQ_TRANSFORM:jq命令執(zhí)行,一般用戶json的轉(zhuǎn)換,具體可見(jiàn)jq官方文檔
- KAFKA_PUBLISH: 發(fā)布kafka消息
- 流程控制Task:
- SWITCH(原Decision):條件判斷分支,類(lèi)似于代碼中的switch case
- FORK:?jiǎn)?dòng)并行分支,用于調(diào)度并行任務(wù)
- JOIN:匯總并行分支,用于匯總并行任務(wù)
- DO_WHILE:循環(huán),類(lèi)似于代碼中的do while
- WAIT:一直在運(yùn)行中,直到外部時(shí)間觸發(fā)更新節(jié)點(diǎn)狀態(tài),可用于等待外部操作
- SUB_WORKFLOW:子流程,執(zhí)行其他的流程
- TERMINATE:結(jié)束流程,以指定輸出提前結(jié)束流程,可以與SWITCH節(jié)點(diǎn)配合使用,類(lèi)似代碼中的提前return語(yǔ)句
自定義Task:
- 對(duì)于System Task,Conductor提供了WorkflowSystemTask 抽象類(lèi),可以自定義擴(kuò)展實(shí)現(xiàn)。
- 對(duì)于Worker Task,可以實(shí)現(xiàn)conductor的client Worker接口實(shí)現(xiàn)執(zhí)行邏輯。
2 Workflow
- Workflow由一系列需要執(zhí)行的Task組成,conductor采用json來(lái)描述Task的流轉(zhuǎn)關(guān)系。
- 除基本的順序流程外,借助內(nèi)置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任務(wù),還能實(shí)現(xiàn)分支、并行、循環(huán)、提前結(jié)束等流程控制。
3 Input&Output
Task的輸入是一種映射,其作為工作流實(shí)例化的一部分或某些其他Task的輸出。允許將來(lái)自工作流或其他Task的輸入/輸出作為隨后執(zhí)行的Task的輸入。
- Task有自己的輸入和輸出,輸入輸出都是jsonobject類(lèi)型。
- Task可以引用其他Task的輸入輸出,使用${taskxxx.output}的方式引用。引用語(yǔ)法為json-path,除最基礎(chǔ)的${taskxxx.output}的值解析方式外,還支持其他復(fù)雜操作,如過(guò)濾等,具體見(jiàn)json-path語(yǔ)法。
- 啟動(dòng)Workflow時(shí)可以傳入流程的輸入數(shù)據(jù),Task可以通過(guò)${workflow.input}的方式引用。
Task實(shí)現(xiàn)原子操作的處理以及流程控制操作,Workflow定義描述Task的流轉(zhuǎn)關(guān)系,Task引用Workflow或者其它Task的輸入輸出。通過(guò)這些機(jī)制,conductor實(shí)現(xiàn)了JSON DSL對(duì)流程的描述。
三 整體架構(gòu)
主要分為幾個(gè)部分:
- Orchestrator: 負(fù)責(zé)流程的流轉(zhuǎn)調(diào)度工作;
- Management/Execution Service: 提供流程、任務(wù)的管理更新等操作;
- TaskQueues: 任務(wù)隊(duì)列,Orchestrator解析出來(lái)的待執(zhí)行Task會(huì)放到隊(duì)列中;
- Worker: 任務(wù)執(zhí)行worker,從TaskQueues中獲取任務(wù),通過(guò)Execution Service更新任務(wù)狀態(tài)與結(jié)果數(shù)據(jù);
- Database: 元數(shù)據(jù)&運(yùn)行時(shí)數(shù)據(jù)庫(kù),用于保存運(yùn)行時(shí)的Workflow、Task等狀態(tài)信息,以及流程任務(wù)定義的等原信息;
- Index: 索引數(shù)據(jù)庫(kù),用于存儲(chǔ)執(zhí)行歷史;
四 運(yùn)行模型
1 Task狀態(tài)轉(zhuǎn)移
- SCHEDULED:待調(diào)度,task放到隊(duì)列中還沒(méi)有被poll出來(lái)執(zhí)行時(shí)的狀態(tài)
- IN_PROGRESS:執(zhí)行中,被poll出來(lái)執(zhí)行但還沒(méi)有完成時(shí)的狀態(tài)
- COMPLETED:執(zhí)行完成
- FAILED:執(zhí)行失敗
- CANCELLED:被中止時(shí)為此狀態(tài),一般出現(xiàn)在兩種情況:
- 1.手動(dòng)中止流程時(shí),正在運(yùn)行中的task會(huì)被置為此狀態(tài);
- 2.多個(gè)fork分支,當(dāng)某個(gè)分支的task失敗時(shí),其它分支中正在運(yùn)行的task會(huì)被置為此狀態(tài);
2 任務(wù)隊(duì)列
任務(wù)的執(zhí)行(同步的系統(tǒng)任務(wù)除外)都會(huì)先添加到任務(wù)隊(duì)列中,是典型的生產(chǎn)者消費(fèi)者模式。
- 任務(wù)隊(duì)列,是一個(gè)帶有延遲、優(yōu)先級(jí)功能的隊(duì)列;
- 每種類(lèi)型的Task是一個(gè)單獨(dú)的隊(duì)列,此外,如果配置了domain、isolationGroup,還會(huì)拆分成多個(gè)隊(duì)列實(shí)現(xiàn)執(zhí)行隔離;
- decider service是生產(chǎn)者,其根據(jù)流程配置與當(dāng)前執(zhí)行情況,解析出可執(zhí)行的task后,添加到隊(duì)列;
- 任務(wù)執(zhí)行器(SystemTaskWorker、Worker)是消費(fèi)者,其長(zhǎng)輪詢對(duì)應(yīng)的隊(duì)列,從隊(duì)列中獲取任務(wù)執(zhí)行;
隊(duì)列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的實(shí)現(xiàn)。
3 核心功能實(shí)現(xiàn)機(jī)制
conductor調(diào)度的核心是decider service,其根據(jù)當(dāng)前流程運(yùn)行的狀態(tài),解析出將要執(zhí)行的任務(wù)列表,將任務(wù)入隊(duì)交給worker執(zhí)行。
decide主要流程簡(jiǎn)化如下,詳細(xì)代碼見(jiàn)WorkflowExecutor.java的decide方法:
其中,調(diào)度任務(wù)處理流程簡(jiǎn)化如下,詳細(xì)代碼見(jiàn)WorkflowExecutor.java的scheduleTask方法:
decide的觸發(fā)時(shí)機(jī)
最主要的觸發(fā)時(shí)機(jī):
新啟動(dòng)執(zhí)行時(shí),會(huì)觸發(fā)decide操作
系統(tǒng)任務(wù)執(zhí)行完成時(shí),會(huì)觸發(fā)decide操作
Workder任務(wù)通過(guò)ExecutionService更新任務(wù)狀態(tài)時(shí),會(huì)觸發(fā)decide操作
流程控制節(jié)點(diǎn)的實(shí)現(xiàn)機(jī)制
1)Task & TaskMapper
對(duì)于每一個(gè)Task來(lái)說(shuō),都有Task和TaskMapper兩部分:
Task:任務(wù)的執(zhí)行邏輯代碼,它的作用是Task的執(zhí)行
TaskMapper:任務(wù)的映射邏輯代碼,它通過(guò)Task的定義配置、當(dāng)前實(shí)例的執(zhí)行狀態(tài)等信息,返回實(shí)際需要執(zhí)行的Task列表
對(duì)于一般的任務(wù)來(lái)說(shuō),TaskMapper返回的是就是Task本身,補(bǔ)充一些執(zhí)行實(shí)例的狀態(tài)信息。但是對(duì)于控制節(jié)點(diǎn)來(lái)說(shuō),會(huì)有不同的邏輯。
2)條件分支(SWITCH)的實(shí)現(xiàn)機(jī)制
SWITCH用于根據(jù)條件判斷,執(zhí)行不同的分支。
實(shí)際上,該節(jié)點(diǎn)的Task不做任何操作,TaskMapper根據(jù)分支條件,判斷出要走的分之后,返回對(duì)應(yīng)分支的第一個(gè)Task。
SwitchTaskMapper.java getMappedTasks方法關(guān)鍵代碼:
- // 待調(diào)度的Task list,最終返回結(jié)果
- List<Task> tasksToBeScheduled = new LinkedList<>();
- // evalResult是分支條件變量的值(case)
- // decisionCases是一個(gè)Map結(jié)構(gòu),key為分支的case值,value為對(duì)應(yīng)分支的任務(wù)定義list(分支內(nèi)的任務(wù)定義會(huì)有多個(gè))
- // 根據(jù)分支變量的實(shí)際值,獲取對(duì)應(yīng)分支的任務(wù)定義list
- List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);
- // default的邏輯:如果獲取不到對(duì)應(yīng)的分支或者分支為空,則用默認(rèn)的分支
- if (selectedTasks == null || selectedTasks.isEmpty()) {
- selectedTasks = taskToSchedule.getDefaultCase();
- }
- if (selectedTasks != null && !selectedTasks.isEmpty()) {
- // 獲取分支的第一個(gè)(下標(biāo)0)task,返回給decider service去做調(diào)度(decider會(huì)把任務(wù)添加到隊(duì)列里,交給worker去執(zhí)行)
- WorkflowTask selectedTask = selectedTasks.get(0);
- // 調(diào)用了deciderService的getTasksToBeScheduled方法,此方法里又獲取到TaskMapper調(diào)用了getMappedTasks。這里采用了遞歸調(diào)用的方式,解析嵌套的Task
- List<Task> caseTasks = taskMapperContext.getDeciderService()
- .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());
- tasksToBeScheduled.addAll(caseTasks);
- switchTask.getInputData().put("hasChildren", "true");
- }
- return tasksToBeScheduled;
3)并行(FORK)的實(shí)現(xiàn)機(jī)制
FORK用于開(kāi)啟多個(gè)并行分支。
實(shí)際上,該節(jié)點(diǎn)的Task不做任何操作,TaskMapper返回所有并行分支的第一個(gè)Task。
ForkJoinTaskMapper.java getMappedTasks關(guān)鍵代碼:
- // 待調(diào)度的Task list,最終返回結(jié)果
- List<Task> tasksToBeScheduled = new LinkedList<>();
- // 配置中的所有fork分支
- List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();
- for (List<WorkflowTask> wfts : forkTasks) {
- // 每個(gè)分支取第一個(gè)Task
- WorkflowTask wft = wfts.get(0);
- // 調(diào)用了deciderService的getTasksToBeScheduled方法,此方法里又獲取到TaskMapper調(diào)用了getMappedTasks。這里采用了遞歸調(diào)用的方式,解析嵌套的Task
- List<Task> tasks2 = taskMapperContext.getDeciderService()
- .getTasksToBeScheduled(workflowInstance, wft, retryCount);
- tasksToBeScheduled.addAll(tasks2);
- }
- return tasksToBeScheduled;
總的來(lái)說(shuō),分支(SWITCH)、并行(FORK)節(jié)點(diǎn)本身沒(méi)有執(zhí)行邏輯,其通過(guò)TaskMapper返回到實(shí)際要執(zhí)行的Task,然后交給Decider Service處理。
重試的實(shí)現(xiàn)機(jī)制
重試和其延遲時(shí)間設(shè)置,都是借助任務(wù)隊(duì)列的功能實(shí)現(xiàn)的。
重試:將任務(wù)重新添加到任務(wù)隊(duì)列
重試的延遲時(shí)間:添加到任務(wù)隊(duì)列時(shí)設(shè)置延遲時(shí)間,延遲時(shí)間過(guò)后,任務(wù)才能在隊(duì)列中被poll出來(lái)執(zhí)行
五 完整性保障機(jī)制
由于調(diào)度過(guò)程中可能會(huì)出現(xiàn)因機(jī)器重啟、網(wǎng)絡(luò)異常、JVM崩潰等偶發(fā)情況,這些會(huì)導(dǎo)致的decide過(guò)程意外終止,流程執(zhí)行不完整,展現(xiàn)出如流程一直運(yùn)行中(實(shí)際已經(jīng)沒(méi)有在調(diào)度),或者其它狀態(tài)錯(cuò)誤等異?,F(xiàn)象。
1 WorkflowReconciler
針對(duì)這種情況,conductor有一個(gè)WorkflowReconciler,會(huì)定期嘗試decide所有正在運(yùn)行中的流程,修復(fù)流程執(zhí)行的一致性。此外,它還有一個(gè)作用是校驗(yàn)流程超時(shí)時(shí)間。
2 decideQueue
那么WorkflowReconciler是如何獲取到當(dāng)前運(yùn)行中的流程呢,答案是decideQueue。
decideQueue和任務(wù)隊(duì)列相同,也是一個(gè)具有延遲功能的隊(duì)列,其存放的是正在執(zhí)行中的流程的實(shí)例id。在任務(wù)開(kāi)始執(zhí)行時(shí)(包括新啟動(dòng)執(zhí)行、重試執(zhí)行、恢復(fù)執(zhí)行、重跑執(zhí)行等),會(huì)將實(shí)例id push到decideQueue中;在執(zhí)行結(jié)束(成功、失敗)時(shí),會(huì)從decideQueue中刪除實(shí)例id。
3 ExecutionLockService
WorkflowReconciler會(huì)定期嘗試decide所有正在運(yùn)行中的流程用于超時(shí)判斷、維護(hù)流程一致性。但是流程本身正常執(zhí)行也會(huì)觸發(fā)decide,如果同一個(gè)執(zhí)行同時(shí)觸發(fā)兩個(gè)decide,可能會(huì)導(dǎo)致?tīng)顟B(tài)混亂,執(zhí)行卡住等問(wèn)題。
conductor采用了鎖來(lái)解決這個(gè)問(wèn)題,其提供了單機(jī)LocalOnlyLock(基于信號(hào)量實(shí)現(xiàn))、redis分布式鎖(基于redission實(shí)現(xiàn))、zookeeper分布式鎖三種實(shí)現(xiàn)。
decide方法中最開(kāi)始會(huì)嘗試獲取鎖,如果獲取失敗則直接返回。通過(guò)鎖來(lái)保障不會(huì)對(duì)同一個(gè)流程實(shí)例并發(fā)執(zhí)行decide。
- if (!executionLockService.acquireLock(workflowId)) {
- return false;
- }
由于鎖是可配置的,可能會(huì)導(dǎo)致一個(gè)誤區(qū):?jiǎn)闻_(tái)機(jī)器的話不用配置鎖。其實(shí)單機(jī)也是需要配置鎖的,因?yàn)閃orkflowReconciler和流程正常執(zhí)行會(huì)產(chǎn)生沖突,可能會(huì)導(dǎo)致偶發(fā)的流程狀態(tài)混亂問(wèn)題。
參考:
Github: https://github.com/Netflix/conductor
官方文檔:https://netflix.github.io/conductor/
WorkflowReconciler:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java
WorkflowSystemTask:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java?spm=ata.21736010.0.0.2b501a3cYnrSfT&file=WorkflowSystemTask.java