Flink 原理與實(shí)現(xiàn):架構(gòu)和拓?fù)涓庞[
要了解一個(gè)系統(tǒng),一般都是從架構(gòu)開始。我們關(guān)心的問題是:系統(tǒng)部署成功后各個(gè)節(jié)點(diǎn)都啟動(dòng)了哪些服務(wù),各個(gè)服務(wù)之間又是怎么交互和協(xié)調(diào)的。下方是 Flink 集群啟動(dòng)后架構(gòu)圖。
當(dāng) Flink 集群啟動(dòng)后,首先會啟動(dòng)一個(gè) JobManger 和一個(gè)或多個(gè)的 TaskManager。由 Client 提交任務(wù)給 JobManager,JobManager 再調(diào)度任務(wù)到各個(gè) TaskManager 去執(zhí)行,然后 TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager。TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸。上述三者均為獨(dú)立的 JVM 進(jìn)程。
- Client 為提交 Job 的客戶端,可以是運(yùn)行在任何機(jī)器上(與 JobManager 環(huán)境連通即可)。提交 Job 后,Client 可以結(jié)束進(jìn)程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回。
- JobManager 主要負(fù)責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后,會生成優(yōu)化后的執(zhí)行計(jì)劃,并以 Task 的單元調(diào)度到各個(gè) TaskManager 去執(zhí)行。
- TaskManager 在啟動(dòng)的時(shí)候就設(shè)置好了槽位數(shù)(Slot),每個(gè) slot 能啟動(dòng)一個(gè) Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動(dòng)后,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理。
可以看到 Flink 的任務(wù)調(diào)度是多線程模型,并且不同Job/Task混合在一個(gè) TaskManager 進(jìn)程中。雖然這種方式可以有效提高 CPU 利用率,但是個(gè)人不太喜歡這種設(shè)計(jì),因?yàn)椴粌H缺乏資源隔離機(jī)制,同時(shí)也不方便調(diào)試。類似 Storm 的進(jìn)程模型,一個(gè)JVM 中只跑該 Job 的 Tasks 實(shí)際應(yīng)用中更為合理。
Job 例子
本文所示例子為 flink-1.0.x 版本
我們使用 Flink 自帶的 examples 包中的 SocketTextStreamWordCount ,這是一個(gè)從 socket 流中統(tǒng)計(jì)單詞出現(xiàn)次數(shù)的例子。
-
首先,使用 netcat 啟動(dòng)本地服務(wù)器:
$ nc -l 9000
-
然后提交 Flink 程序
$ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \ --hostname 10.218.130.9 \ --port 9000
在netcat端輸入單詞并監(jiān)控 taskmanager 的輸出可以看到單詞統(tǒng)計(jì)的結(jié)果。
SocketTextStreamWordCount 的具體代碼如下:
public static void main(String[] args) throws Exception{ // 檢查輸入 final ParameterTool params = ParameterTool.fromArgs(args); ... // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data DataStream<String> text = env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0); DataStream<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .keyBy(0) .sum(1); counts.print(); // execute program env.execute("WordCount from SocketTextStream Example"); }
我們將***一行代碼 env.execute 替換成 System.out.println(env.getExecutionPlan()); 并在本地運(yùn)行該代碼(并發(fā)度設(shè)為2),可以得到該拓?fù)涞倪壿媹?zhí)行計(jì)劃圖的 JSON 串,將該 JSON 串粘貼到 http://flink.apache.org/visualizer/ 中,能可視化該執(zhí)行圖。
但這并不是最終在 Flink 中運(yùn)行的執(zhí)行圖,只是一個(gè)表示拓?fù)涔?jié)點(diǎn)關(guān)系的計(jì)劃圖,在 Flink 中對應(yīng)了 SteramGraph。另外,提交拓?fù)浜螅úl(fā)度設(shè)為2)還能在 UI 中看到另一張執(zhí)行計(jì)劃圖,如下所示,該圖對應(yīng)了 Flink 中的 JobGraph。
Graph
看起來有點(diǎn)亂,怎么有這么多不一樣的圖。實(shí)際上,還有更多的圖。Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖。
- StreamGraph: 是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)。
- JobGraph: StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn),這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗。
- ExecutionGraph: JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
- 物理執(zhí)行圖: JobManager 根據(jù) ExecutionGraph 對 Job 進(jìn)行調(diào)度后,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。
例如上文中的2個(gè)并發(fā)度(Source為1個(gè)并發(fā)度)的 SocketTextStreamWordCount 四層執(zhí)行圖的演變過程如下圖所示(點(diǎn)擊查看大圖):
這里對一些名詞進(jìn)行簡單的解釋。
-
StreamGraph:根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。
- StreamNode:用來代表 operator 的類,并具有所有相關(guān)的屬性,如并發(fā)度、入邊和出邊等。
- StreamEdge:表示連接兩個(gè)StreamNode的邊。
-
JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。
- JobVertex:經(jīng)過優(yōu)化后符合條件的多個(gè)StreamNode可能會chain在一起生成一個(gè)JobVertex,即一個(gè)JobVertex包含一個(gè)或多個(gè)operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。
- IntermediateDataSet:表示JobVertex的輸出,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。producer是JobVertex,consumer是JobEdge。
- JobEdge:代表了job graph中的一條數(shù)據(jù)傳輸通道。source 是 IntermediateDataSet,target 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標(biāo)JobVertex。
-
ExecutionGraph:JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
- ExecutionJobVertex:和JobGraph中的JobVertex一一對應(yīng)。每一個(gè)ExecutionJobVertex都有和并發(fā)度一樣多的 ExecutionVertex。
- ExecutionVertex:表示ExecutionJobVertex的其中一個(gè)并發(fā)子任務(wù),輸入是ExecutionEdge,輸出是IntermediateResultPartition。
- IntermediateResult:和JobGraph中的IntermediateDataSet一一對應(yīng)。每一個(gè)IntermediateResult的IntermediateResultPartition個(gè)數(shù)等于該operator的并發(fā)度。
- IntermediateResultPartition:表示ExecutionVertex的一個(gè)輸出分區(qū),producer是ExecutionVertex,consumer是若干個(gè)ExecutionEdge。
- ExecutionEdge:表示ExecutionVertex的輸入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一個(gè)。
- Execution:是執(zhí)行一個(gè) ExecutionVertex 的一次嘗試。當(dāng)發(fā)生故障或者數(shù)據(jù)需要重算的情況下 ExecutionVertex 可能會有多個(gè) ExecutionAttemptID。一個(gè) Execution 通過 ExecutionAttemptID 來唯一標(biāo)識。JM和TM之間關(guān)于 task 的部署和 task status 的更新都是通過 ExecutionAttemptID 來確定消息接受者。
-
物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對 Job 進(jìn)行調(diào)度后,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。
- Task:Execution被調(diào)度后在分配的 TaskManager 中啟動(dòng)對應(yīng)的 Task。Task 包裹了具有用戶執(zhí)行邏輯的 operator。
- ResultPartition:代表由一個(gè)Task的生成的數(shù)據(jù),和ExecutionGraph中的IntermediateResultPartition一一對應(yīng)。
- ResultSubpartition:是ResultPartition的一個(gè)子分區(qū)。每個(gè)ResultPartition包含多個(gè)ResultSubpartition,其數(shù)目要由下游消費(fèi) Task 數(shù)和 DistributionPattern 來決定。
- InputGate:代表Task的輸入封裝,和JobGraph中JobEdge一一對應(yīng)。每個(gè)InputGate消費(fèi)了一個(gè)或多個(gè)的ResultPartition。
- InputChannel:每個(gè)InputGate會包含一個(gè)以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一對應(yīng),也和ResultSubpartition一對一地相連,即一個(gè)InputChannel接收一個(gè)ResultSubpartition的輸出。
那么 Flink 為什么要設(shè)計(jì)這4張圖呢,其目的是什么呢?Spark 中也有多張圖,數(shù)據(jù)依賴圖以及物理執(zhí)行的DAG。其目的都是一樣的,就是解耦,每張圖各司其職,每張圖對應(yīng)了 Job 不同的階段,更方便做該階段的事情。我們給出更完整的 Flink Graph 的層次圖。
首先我們看到,JobGraph 之上除了 StreamGraph 還有 OptimizedPlan。OptimizedPlan 是由 Batch API 轉(zhuǎn)換而來的。StreamGraph 是由 Stream API 轉(zhuǎn)換而來的。為什么 API 不直接轉(zhuǎn)換成 JobGraph?因?yàn)?,Batch 和 Stream 的圖結(jié)構(gòu)和優(yōu)化方法有很大的區(qū)別,比如 Batch 有很多執(zhí)行前的預(yù)分析用來優(yōu)化圖的執(zhí)行,而這種優(yōu)化并不普適于 Stream,所以通過 OptimizedPlan 來做 Batch 的優(yōu)化會更方便和清晰,也不會影響 Stream。JobGraph 的責(zé)任就是統(tǒng)一 Batch 和 Stream 的圖,用來描述清楚一個(gè)拓?fù)鋱D的結(jié)構(gòu),并且做了 chaining 的優(yōu)化,chaining 是普適于 Batch 和 Stream 的,所以在這一層做掉。ExecutionGraph 的責(zé)任是方便調(diào)度和各個(gè) tasks 狀態(tài)的監(jiān)控和跟蹤,所以 ExecutionGraph 是并行化的 JobGraph。而“物理執(zhí)行圖”就是最終分布式在各個(gè)機(jī)器上運(yùn)行著的tasks了。所以可以看到,這種解耦方式極大地方便了我們在各個(gè)層所做的工作,各個(gè)層之間是相互隔離的。