聊聊Flink:Flink的運(yùn)行時(shí)架構(gòu)
一、運(yùn)行時(shí)架構(gòu)
上一篇我們可以看到Flink的核心組件的Deploy層,該層主要涉及了Flink的部署模式,F(xiàn)link支持多種部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)。
圖片
- Local(本地):?jiǎn)螜C(jī)模式,一般本地開(kāi)發(fā)調(diào)試使用,像我們程序?qū)懙腤ordCountStream那個(gè)例子,直接運(yùn)行main方法啟動(dòng)。
- Cluster(集群)
- Standalone(獨(dú)立模式):Flink自帶集群,自己管理資源調(diào)度,生產(chǎn)環(huán)境也會(huì)有所應(yīng)用。
- YARN(YARN模式):計(jì)算資源統(tǒng)一由Hadoop YARN管理,生產(chǎn)環(huán)境應(yīng)用較多。
- Cloud(云端):AliCloud Realtime Compute、Amazon EMR、Huawei Cloud Stream Service 等。
我們這里主要來(lái)介紹Cluster集群的兩種模式Standalone、YARN。
二、YARN集群架構(gòu)
在講解Flink集群架構(gòu)之前,我們先了解一下YARN集群架構(gòu),我覺(jué)得是很有必要的。YARN集群總體上是經(jīng)典的主/從(Master/Slave)架構(gòu),主要由ResourceManager、NodeManager、ApplicationMaster和Container等幾個(gè)組件構(gòu)成。
圖片
2.1 ResourceManager
以后臺(tái)進(jìn)程的形式運(yùn)行,負(fù)責(zé)對(duì)集群資源進(jìn)行統(tǒng)一管理和任務(wù)調(diào)度。ResourceManager的主要職責(zé)如下:
- 接收來(lái)自客戶端的請(qǐng)求。
- 啟動(dòng)和管理各個(gè)應(yīng)用程序的ApplicationMaster。
- 接收來(lái)自ApplicationMaster的資源申請(qǐng),并為其分配Container。
- 管理NodeManager,接收來(lái)自NodeManager的資源和節(jié)點(diǎn)健康情況匯報(bào)。
2.2 NodeManager
集群中每個(gè)節(jié)點(diǎn)上的資源和任務(wù)管理器,以后臺(tái)進(jìn)程的形式運(yùn)行。它會(huì)定時(shí)向ResourceManager匯報(bào)本節(jié)點(diǎn)上的資源(內(nèi)存、CPU)使用情況和各個(gè)Container的運(yùn)行狀態(tài),同時(shí)會(huì)接收并處理來(lái)自ApplicationMaster的Container啟動(dòng)/停止等請(qǐng)求。NodeManager不會(huì)監(jiān)視任務(wù),它僅監(jiān)視Container中的資源使用情況,例如。如果一個(gè)Container消耗的內(nèi)存比最初分配的更多,就會(huì)結(jié)束該Container。
2.3 Task
應(yīng)用程序具體執(zhí)行的任務(wù)。一個(gè)應(yīng)用程序可能有多個(gè)任務(wù),例如一個(gè)MapReduce程序可以有多個(gè)Map任務(wù)和多個(gè)Reduce任務(wù)。
2.4 Container
YARN中資源分配的基本單位,封裝了CPU和內(nèi)存資源的一個(gè)容器,相當(dāng)于一個(gè)Task運(yùn)行環(huán)境的抽象。從實(shí)現(xiàn)上看,Container是一個(gè)Java抽象類,定義了資源信息。應(yīng)用程序的Task將會(huì)被發(fā)布到Container中運(yùn)行,從而限定了Task使用的資源量。
一個(gè)應(yīng)用程序所需的Container分為兩類:運(yùn)行ApplicationMaster的Container和運(yùn)行各類Task的Container。前者是由ResourceManager向內(nèi)部的資源調(diào)度器申請(qǐng)和啟動(dòng)的,后者是由ApplicationMaster向ResourceManager申請(qǐng)的,并由ApplicationMaster請(qǐng)求NodeManager進(jìn)行啟動(dòng)。
我們可以將Container類比成數(shù)據(jù)庫(kù)連接池中的連接,需要的時(shí)候進(jìn)行申請(qǐng),使用完畢后進(jìn)行釋放,而不需要每次獨(dú)自創(chuàng)建。
2.5 ApplicationMaster
ApplicationMaster可在Container內(nèi)運(yùn)行任何類型的Task。例如,MapReduce ApplicationMaster請(qǐng)求一個(gè)容器來(lái)啟動(dòng)Map Task或Reduce Task。也可以實(shí)現(xiàn)一個(gè)自定義的ApplicationMaster來(lái)運(yùn)行特定的Task,以便任何分布式框架都可以受YARN支持,只要實(shí)現(xiàn)了相應(yīng)的ApplicationMaster即可。
我們可以這樣認(rèn)為:ResourceManager管理整個(gè)集群,NodeManager管理集群中的單個(gè)節(jié)點(diǎn),ApplicationMaster管理單個(gè)應(yīng)用程序(集群中可能同時(shí)有多個(gè)應(yīng)用程序在運(yùn)行,每個(gè)應(yīng)用程序都有各自的ApplicationMaster)。
YARN集群中應(yīng)用程序的執(zhí)行流程如下圖所示:
- 客戶端提交應(yīng)用程序(可以是MapReduce程序、Spark程序等)到ResourceManager。
- ResourceManager分配用于運(yùn)行ApplicationMaster的Container,然后與NodeManager通信,要求它在該Container中啟動(dòng)ApplicationMaster。ApplicationMaster啟動(dòng)后,它將負(fù)責(zé)此應(yīng)用程序的整個(gè)生命周期。
- ApplicationMaster向ResourceManager注冊(cè)(注冊(cè)后可以通過(guò)ResourceManager查看應(yīng)用程序的運(yùn)行狀態(tài))并請(qǐng)求運(yùn)行應(yīng)用程序各個(gè)Task所需的Container(資源請(qǐng)求是對(duì)一些Container的請(qǐng)求)。如果符合條件,ResourceManager會(huì)分配給ApplicationMaster所需的Container(表達(dá)為Container ID和主機(jī)名)。
- ApplicationMaster請(qǐng)求NodeManager使用這些Container來(lái)運(yùn)行應(yīng)用程序的相應(yīng)Task(即將Task發(fā)布到指定的Container中運(yùn)行)。
此外,各個(gè)運(yùn)行中的Task會(huì)通過(guò)RPC協(xié)議向ApplicationMaster匯報(bào)自己的狀態(tài)和進(jìn)度,這樣一旦某個(gè)Task運(yùn)行失敗,ApplicationMaster就可以對(duì)其重新啟動(dòng)。當(dāng)應(yīng)用程序運(yùn)行完成時(shí),ApplicationMaster會(huì)向ResourceManager申請(qǐng)注銷自己。
圖片
三、Flink Standalone模式
Flink Standalone模式為經(jīng)典的主從(Master/Slave)架構(gòu),資源調(diào)度是Flink自己實(shí)現(xiàn)的。集群?jiǎn)?dòng)后,主節(jié)點(diǎn)上會(huì)啟動(dòng)一個(gè)JobManager進(jìn)程,類似YARN集群的ResourceManager,因此主節(jié)點(diǎn)也稱為JobManager節(jié)點(diǎn);各個(gè)從節(jié)點(diǎn)上會(huì)啟動(dòng)一個(gè)TaskManager進(jìn)程,類似YARN集群的NodeManager,因此從節(jié)點(diǎn)也稱為TaskManager節(jié)點(diǎn)。從Flink 1.6版本開(kāi)始,將主節(jié)點(diǎn)上的進(jìn)程名稱改為了StandaloneSessionClusterEntrypoint,從節(jié)點(diǎn)的進(jìn)程名稱改為了TaskManagerRunner,在這里為了方便使用,仍然沿用之前版本的稱呼,即JobManager和TaskManager。
Client接收到Flink應(yīng)用程序后,將作業(yè)提交給JobManager。JobManager要做的第一件事就是分配Task(任務(wù))所需的資源。完成資源分配后,Task將被JobManager提交給相應(yīng)的TaskManager,TaskManager會(huì)啟動(dòng)線程開(kāi)始執(zhí)行。在執(zhí)行過(guò)程中,TaskManager會(huì)持續(xù)向JobManager匯報(bào)狀態(tài)信息,例如開(kāi)始執(zhí)行、進(jìn)行中或完成等狀態(tài)。作業(yè)執(zhí)行完成后,結(jié)果將通過(guò)JobManager發(fā)送給Client。
Flink所有組件之間的通信使用的是Akka框架,組件之間的數(shù)據(jù)交互使用的是Netty框架。
圖片
Client 不是運(yùn)行時(shí)和程序執(zhí)行的一部分,而是用于準(zhǔn)備數(shù)據(jù)流并將其發(fā)送給 JobManager。之后,客戶端可以斷開(kāi)連接(分離模式),或保持連接來(lái)接收進(jìn)程報(bào)告(附加模式)。客戶端可以作為觸發(fā)執(zhí)行 Java/Scala 程序的一部分運(yùn)行,也可以在命令行進(jìn)程./bin/flink run …中運(yùn)行。
可以通過(guò)多種方式啟動(dòng) JobManager 和 TaskManager:直接在機(jī)器上作為standalone 集群?jiǎn)?dòng)、在容器中啟動(dòng)、或者通過(guò)YARN等資源框架管理并啟動(dòng)。TaskManager 連接到 JobManagers,宣布自己可用,并被分配工作。
3.1 JobManager
JobManager 具有許多與協(xié)調(diào) Flink 應(yīng)用程序的分布式執(zhí)行有關(guān)的職責(zé):它決定何時(shí)調(diào)度下一個(gè) task(或一組 task)、對(duì)完成的 task 或執(zhí)行失敗做出反應(yīng)、協(xié)調(diào) checkpoint、并且協(xié)調(diào)從失敗中恢復(fù)等等。這個(gè)進(jìn)程由三個(gè)不同的組件組成:
- ResourceManager
ResourceManager 負(fù)責(zé) Flink 集群中的資源提供、回收、分配 - 它管理 task slots,這是 Flink 集群中資源調(diào)度的單位(請(qǐng)參考TaskManagers)。Flink 為不同的環(huán)境和資源提供者(例如 YARN、Kubernetes 和 standalone 部署)實(shí)現(xiàn)了對(duì)應(yīng)的 ResourceManager。在 standalone 設(shè)置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行啟動(dòng)新的 TaskManager。 - Dispatcher
Dispatcher 提供了一個(gè) REST 接口,用來(lái)提交 Flink 應(yīng)用程序執(zhí)行,并為每個(gè)提交的作業(yè)啟動(dòng)一個(gè)新的 JobMaster。它還運(yùn)行 Flink WebUI 用來(lái)提供作業(yè)執(zhí)行信息。 - JobMaster
JobMaster 負(fù)責(zé)管理單個(gè)JobGraph的執(zhí)行。Flink 集群中可以同時(shí)運(yùn)行多個(gè)作業(yè),每個(gè)作業(yè)都有自己的 JobMaster。
始終至少有一個(gè) JobManager。高可用(HA)設(shè)置中可能有多個(gè) JobManager,其中一個(gè)始終是 leader,其他的則是 standby。
3.2 TaskManager
TaskManager是Flink集群的工作進(jìn)程。Task被調(diào)度到TaskManager上執(zhí)行。TaskManager相互通信,只為在后續(xù)的Task之間交換數(shù)據(jù)。
TaskManager的主要作用如下:
- 接收J(rèn)obManager分配的任務(wù),負(fù)責(zé)具體的任務(wù)執(zhí)行。
- TaskManager會(huì)在同一個(gè)JVM進(jìn)程內(nèi)以多線程的方式執(zhí)行任務(wù)。· 負(fù)責(zé)對(duì)應(yīng)任務(wù)在每個(gè)節(jié)點(diǎn)上的資源申請(qǐng),管理任務(wù)的啟動(dòng)、停止、銷毀、異?;謴?fù)等生命周期。
- 負(fù)責(zé)對(duì)數(shù)據(jù)進(jìn)行緩存。TaskManager之間采用數(shù)據(jù)流的形式進(jìn)行數(shù)據(jù)交互。
3.3 Tasks 和算子鏈
對(duì)于分布式執(zhí)行,F(xiàn)link 將算子的 subtasks 鏈接成 tasks。每個(gè) task 由一個(gè)線程執(zhí)行。將算子鏈接成 task 是個(gè)有用的優(yōu)化:它減少線程間切換、緩沖的開(kāi)銷,并且減少延遲的同時(shí)增加整體吞吐量。鏈行為是可以配置的。
下圖中樣例數(shù)據(jù)流用 5 個(gè) subtask 執(zhí)行,因此有 5 個(gè)并行線程。
3.4 Task Slots 和資源
每個(gè) worker(TaskManager)都是一個(gè) JVM 進(jìn)程,可以在單獨(dú)的線程中執(zhí)行一個(gè)或多個(gè) subtask。為了控制一個(gè) TaskManager 中接受多少個(gè) task,就有了所謂的 task slots(至少一個(gè))。
每個(gè) task slot 代表 TaskManager 中資源的固定子集。例如,具有 3 個(gè) slot 的 TaskManager,會(huì)將其托管內(nèi)存 1/3 用于每個(gè) slot。分配資源意味著 subtask 不會(huì)與其他作業(yè)的 subtask 競(jìng)爭(zhēng)托管內(nèi)存,而是具有一定數(shù)量的保留托管內(nèi)存。注意此處沒(méi)有 CPU 隔離;當(dāng)前 slot 僅分離 task 的托管內(nèi)存。
通過(guò)調(diào)整 task slot 的數(shù)量,用戶可以定義 subtask 如何互相隔離。每個(gè) TaskManager 有一個(gè) slot,這意味著每個(gè) task 組都在單獨(dú)的 JVM 中運(yùn)行(例如,可以在單獨(dú)的容器中啟動(dòng))。具有多個(gè) slot 意味著更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 連接(通過(guò)多路復(fù)用)和心跳信息。它們還可以共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),從而減少了每個(gè) task 的開(kāi)銷。
圖片
默認(rèn)情況下,F(xiàn)link 允許 subtask 共享 slot,即便它們是不同的 task 的 subtask,只要是來(lái)自于同一作業(yè)即可。結(jié)果就是一個(gè) slot 可以持有整個(gè)作業(yè)管道。允許 slot 共享有兩個(gè)主要優(yōu)點(diǎn):
- Flink 集群所需的 task slot 和作業(yè)中使用的最大并行度恰好一樣。無(wú)需計(jì)算程序總共包含多少個(gè) task(具有不同并行度)。
- 容易獲得更好的資源利用。如果沒(méi)有 slot 共享,非密集 subtask(source/map())將阻塞和密集型 subtask(window) 一樣多的資源。通過(guò) slot 共享,我們示例中的基本并行度從 2 增加到 6,可以充分利用分配的資源,同時(shí)確保繁重的 subtask 在 TaskManager 之間公平分配。
圖片
四、Flink On YARN模式
Flink On YARN模式遵循YARN的官方規(guī)范,YARN只負(fù)責(zé)資源的管理和調(diào)度,運(yùn)行哪種應(yīng)用程序由用戶自己實(shí)現(xiàn),因此可能在YARN上同時(shí)運(yùn)行MapReduce程序、Spark程序、Flink程序等。YARN很好地對(duì)每一個(gè)程序?qū)崿F(xiàn)了資源的隔離,這使得Spark、MapReduce、Flink等可以運(yùn)行于同一個(gè)集群中,共享集群存儲(chǔ)資源與計(jì)算資源。Flink On YARN模式的運(yùn)行架構(gòu)如下圖所示。
圖片
- 當(dāng)啟動(dòng)一個(gè)Client(客戶端)會(huì)話時(shí),Client首先會(huì)上傳Flink應(yīng)用程序JAR包和配置文件到HDFS。
- Client向ResourceManager申請(qǐng)用于運(yùn)行ApplicationMaster的Container。
- ResourceManager分配用于運(yùn)行ApplicationMaster的Container,然后與NodeManager通信,要求它在該Container中啟動(dòng)ApplicationMaster(ApplicationMaster與Flink JobManager運(yùn)行于同一Container中,這樣ApplicationMaster就能知道Flink JobManager的地址)。ApplicationMaster啟動(dòng)后,它將負(fù)責(zé)此應(yīng)用程序的整個(gè)生命周期。另外,ApplicationMaster還提供了Flink的WebUI服務(wù)。
- ApplicationMaster向ResourceManager注冊(cè)(注冊(cè)后可以通過(guò)ResourceManager查看應(yīng)用程序的運(yùn)行狀態(tài))并請(qǐng)求運(yùn)行Flink TaskManager所需的Container(資源請(qǐng)求是對(duì)一些Container的請(qǐng)求)。如果符合條件,ResourceManager會(huì)分配給ApplicationMaster所需的Container(表達(dá)為Container ID和主機(jī)名)。ApplicationMaster請(qǐng)求NodeManager使用這些Container來(lái)運(yùn)行Flink TaskManager。各個(gè)NodeManager從HDFS中下載Flink JAR包和配置文件。至此,F(xiàn)link相關(guān)任務(wù)就可以運(yùn)行了。
此外,各個(gè)運(yùn)行中的Flink TaskManager會(huì)通過(guò)RPC協(xié)議向ApplicationMaster匯報(bào)自己的狀態(tài)和進(jìn)度。