Flink Task調(diào)度部署機制
1背景
在日常Flink使用過程中,我們經(jīng)常遇到Flink任務(wù)中某些Slot或者TM負載過重的問題,對日常的資源調(diào)配、運維以及降本都帶來了很大的影響,所以我們對Flink的task部署機制進行了梳理和調(diào)研,準備在后續(xù)的工作中進行優(yōu)化。由于jobGraph的生成以及任務(wù)提交流程因任務(wù)部署方式而不同,對我們后續(xù)的分析也沒有影響,這里忽略前置流程,直接從Dispatcher出發(fā),重點關(guān)注submit后executionGraph構(gòu)建以及后續(xù)的任務(wù)部署過程。
2Flink Scheduling Components 構(gòu)成
2.1 SchedulerNG
在Dispatcher收到submit請求后,先是啟動了JobManagerRunner,再啟動JobMaster,在初始化jobMaster的過程中,我們注意到這里開始了整個作業(yè)的Scheduling第一步,創(chuàng)建SchedulerNG。
我們看下SchedulerNG的職責,可以看到調(diào)度的發(fā)起,作業(yè)狀態(tài)的跟蹤以及我們熟悉的cp,sp的trigger都是在這里:
我們這次主要跟蹤構(gòu)建executionGraph,然后根據(jù)Scheduling策略發(fā)起的整個部署過程。
2.2 ExecutionGraph
現(xiàn)階段(1.13)SchedulerNG默認實現(xiàn)是DefaultScheduler,初始化過程中就會開始構(gòu)建我們的ExecutionGraph,ExecutionGraph中有幾個重要元素
- ExecutionJobVertex: 代表jobGraph中的一個JobVertex,是所有并行Task的集合
- ExecutionVertex: 代表ExecutionJobVertex中并行task中的一個,一個ExecutionJobVertex可能同時有很多并行運行的ExecutionVertex
- Execution: 代表ExecutionVertex的一次部署/執(zhí)行,一個ExecutionVertex可能會有很多次Execution
這里executionGraph通過jobGraph的拓撲圖構(gòu)建了自己的核心結(jié)構(gòu),看下從JobVertex到ExecutionJobVertex 的轉(zhuǎn)換流程:
2.3 執(zhí)行層拓撲結(jié)構(gòu)
我們知道Flink引擎在不停的致力于批流一體建設(shè),調(diào)度層的統(tǒng)一也是其中核心的一層。為了提高failover后recovery速度,減少對Flink任務(wù)的影響,現(xiàn)在Flink對于批、流的任務(wù)task調(diào)度都是以pipeline region為基礎(chǔ)。
Pipeline region的構(gòu)建內(nèi)嵌在executionGraph的初始化過程中,我們知道Flink中各個節(jié)點之間的鏈接都會有IntermediateDataSet這一種邏輯結(jié)構(gòu),用來表示JobVertex的輸出,即該JobVertex中包含的算子會產(chǎn)生的數(shù)據(jù)集。這個數(shù)據(jù)集的ResultPartitionType有幾種類型:
接下來我們看看executionGraph的核心拓撲結(jié)構(gòu)ExecutionTopology是如何構(gòu)建的:
2.4 Scheduling 策略
SchedulerNG Scheduling策略默認為PipelinedRegionSchedulingStrategy,在executionGraph完成之后,就可以根據(jù)生成的剛剛executionTopology來初步構(gòu)建初步的Scheduling策略了。這里看下startScheduling代碼,可以看到Scheduling過程就是我們常說的基于pipeline region的Scheduling。
2.5 Execution Slot 分配器
默認實現(xiàn)是SlotSharingExecutionSlotAllocator,在schedulerNG完成executionGraph構(gòu)建完成后,需要進一步構(gòu)建Execution Slot 分配器。用于將physical shared slots分配到我們的logical slots 上,并將logical slot 分配給我們executionGraph中的execution(task)。通過代碼我們可以看到ExecutionSlotAllocator的職責非常簡單,只有簡單的allocate和cancel。
但在實現(xiàn)上這里有幾個重要元素需要了解:
LocalInputPreferredSlotSharingStrategy :在Flink內(nèi)部,所有的slot分配都是基于sharingslot來操作的,在滿足co-location的基礎(chǔ)上,F(xiàn)link期望將producer和consumeNode task盡可能的分布在一起,以減少數(shù)據(jù)傳輸成本。
SlotProfile:slot的資源信息,對task -> logical slot -> physical slot的mapping有非常重要的作用,包含了task的資源信息,slot的物理資源信息,傾向的location(TaskManagerLocation),傾向的allocation以及整個executionGraph之前分配過的allocation(用于黑名單,重啟后盡量避免分配在之前的slot里)。
ResourceProfileRetriever: 用于獲取executionVertex的實際資源信息。默認是unknown,如果有明細配置會用于后續(xù)的executionSlotSharingGroup資源構(gòu)建。
ExecutionSlotSharingGroup:Flink task資源申請的最終邏輯載體,用于將sharing到一起的task(execution group)組合成一個group用于生成資源,后續(xù)部署也會綁定對應(yīng)的task。
3Scheduling 主要過程
在JobMaster完成自身構(gòu)建之后,就委托SchedulerNG來開始了整個job的Scheduling:
可以看到這里是由schedulingStrategy來負責整個調(diào)度過程的,也就是我們的PipelinedRegionSchedulingStrategy,
one by one將pipeline region進行部署
遍歷region中的ExecutionVertex依次進行部署
將vertexDeployment交給SlotSharingExecutionSlotAllocator處理
通過SlotSharingStrategy拿到每個execution對應(yīng)的ExecutionSlotSharingGroup
- 先從 corresponding co-location constraint 去mapping中尋找是否有存在的slot sharing group
- 接著從producer 的角度來逐一檢查是否可以合并到同一個slot sharing group.
- 最后嘗試所有剩下的slot sharing group看是否符合execution 的要求(如同屬于一個job vertex的task不能分配到同一個 slot sharing group).
- 如果以上都沒有滿足條件的就創(chuàng)建一個新的slot sharing group
- 檢查ExecutionSlotSharingGroup是否已經(jīng)有了對應(yīng)的sharedSlot
- 遍歷尚未得到分配的ExecutionSlotSharingGroup
- 計算對應(yīng)的SlotProfile
- 向PhysicalSlotProvider申請新的physical slot
rm側(cè)會先檢查是否已經(jīng)有滿足條件的excess slot
如果沒有嘗試會申請新的woker以提供資源
由sharedSlotProfileRetriever來創(chuàng)建對應(yīng)的slotProfile并構(gòu)建PhysicalSlotRequest
PhysicalSlotProvider向slotPool申請新的slot
slotPool會向rm側(cè)申請新的slot
利用physical slot future提前創(chuàng)建sharedSlotFutrue
將sharedSlotFutrue 分配給所有相關(guān)的executions
最后生成所有的SlotExecutionVertexAssignments
在完成所有的SlotExecutionVertexAssignment之后,生成對應(yīng)的DeploymentHandle并等待所有的assignedSlot創(chuàng)建完畢,正式開始部署對應(yīng)的任務(wù)。?
4問題思考
我們對整個Flink task的部署過程完成梳理后,重新對我們一開始的問題進行思考:
4.1 為什么會出現(xiàn)slot負載過重的情況?如何避免?
問題的產(chǎn)生在于大量的task集中分配到了統(tǒng)一個sharedSlot,這個我們可以發(fā)現(xiàn)其實是在ExecutionSlotSharingGroup的構(gòu)建過程中產(chǎn)生的。我們看下源碼,可以很直接的看到整個group的分配是一個roundRobin過程,而executionVertices來自于有序拓撲結(jié)構(gòu),中間傳遞過程也保證了有序性,所以最終會導(dǎo)致大量的task分配的index靠前的group中,最后落到了同一個slot。
為了避免這種情況,我們的做法其實有比較多,一種是在保證各種constraint的同時添加隨機性,以打散各個不均勻的task;還有一種就是構(gòu)建基于load-balance的分配過程,以盡可能的將task分布均勻。
附Flink部分源碼:
這個問題主要是在于說有一些過重的task對應(yīng)的slot都分配在了同一個tm上,導(dǎo)致整個tm壓力過大,資源難以協(xié)調(diào)。在整個過程中其實我們有看到tm信息的交互,在co-location constraint上。我們看下該hint職責:
The co-location group is used to make sure that the i-th subtasks for iteration head and iteration tail are scheduled on the same TaskManager.
也就是說其實是為了解決算子間相同index的task數(shù)據(jù)傳遞之類的問題,但對于task的均衡負載無法介入。對此我們嘗試去做的事情:
在當前不使用細粒度資源配置的情況下,考慮task-slot之間均衡分布的同事,task-tm也能做到一定的負載均衡。這種情況可以通過tm單slot來解決,也可以在保證task-slotSharingGroup足夠隨機性的同時,保證slotSharingGroup-tm的足夠隨機性。
在后續(xù)使用使用細粒度資源配置的情況下,不使用slotsharing,且將相同jobVertex對應(yīng)的task盡量分布在同一個task當中。這個我們后續(xù)準備在slotProfile中加入jobVertex相關(guān)的tag,SlotAllocator做slot matching的時候加入jobVertex constraint來保證task的位置分配。
5寫在最后
Flink開源社區(qū)較活躍,Task側(cè)的部署鏈路也一直在演進中,持續(xù)跟進并深入了解內(nèi)部實現(xiàn)邏輯能更好的支持我們解決Flink個性化調(diào)度策略上的一些問題。后續(xù)我們也準備進一步完善Flink在operator級別的細粒度資源配置能力,降低資源使用率的同時進一步提高Flink作業(yè)穩(wěn)定性。