自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink Task調(diào)度部署機制

開發(fā) 前端
Flink開源社區(qū)較活躍,Task側(cè)的部署鏈路也一直在演進中,持續(xù)跟進并深入了解內(nèi)部實現(xiàn)邏輯能更好的支持我們解決Flink個性化調(diào)度策略上的一些問題。

1背景

在日常Flink使用過程中,我們經(jīng)常遇到Flink任務(wù)中某些Slot或者TM負載過重的問題,對日常的資源調(diào)配、運維以及降本都帶來了很大的影響,所以我們對Flink的task部署機制進行了梳理和調(diào)研,準備在后續(xù)的工作中進行優(yōu)化。由于jobGraph的生成以及任務(wù)提交流程因任務(wù)部署方式而不同,對我們后續(xù)的分析也沒有影響,這里忽略前置流程,直接從Dispatcher出發(fā),重點關(guān)注submit后executionGraph構(gòu)建以及后續(xù)的任務(wù)部署過程。

2Flink Scheduling Components 構(gòu)成

2.1   SchedulerNG

在Dispatcher收到submit請求后,先是啟動了JobManagerRunner,再啟動JobMaster,在初始化jobMaster的過程中,我們注意到這里開始了整個作業(yè)的Scheduling第一步,創(chuàng)建SchedulerNG。

this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);

我們看下SchedulerNG的職責,可以看到調(diào)度的發(fā)起,作業(yè)狀態(tài)的跟蹤以及我們熟悉的cp,sp的trigger都是在這里:

圖片

我們這次主要跟蹤構(gòu)建executionGraph,然后根據(jù)Scheduling策略發(fā)起的整個部署過程。

2.2   ExecutionGraph

現(xiàn)階段(1.13)SchedulerNG默認實現(xiàn)是DefaultScheduler,初始化過程中就會開始構(gòu)建我們的ExecutionGraph,ExecutionGraph中有幾個重要元素

  1. ExecutionJobVertex: 代表jobGraph中的一個JobVertex,是所有并行Task的集合
  2. ExecutionVertex: 代表ExecutionJobVertex中并行task中的一個,一個ExecutionJobVertex可能同時有很多并行運行的ExecutionVertex
  3. Execution: 代表ExecutionVertex的一次部署/執(zhí)行,一個ExecutionVertex可能會有很多次Execution

這里executionGraph通過jobGraph的拓撲圖構(gòu)建了自己的核心結(jié)構(gòu),看下從JobVertex到ExecutionJobVertex 的轉(zhuǎn)換流程:

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology){
1. executionGraph第一步拿到了jobGraph中的有序JobVertex列表
2. 接著一對一創(chuàng)建ExecutionJobVertex
3. 根據(jù)producer并行度生成producedDataSets(IntermediateDataSet)
4. 再根據(jù)自身并行度生成所屬的ExecutionVertex[]
5. 構(gòu)建stateBackend信息和checkpointStorage信息等
6. 最后完成executionGraph的拓撲構(gòu)建executionTopology
}

2.3   執(zhí)行層拓撲結(jié)構(gòu)

我們知道Flink引擎在不停的致力于批流一體建設(shè),調(diào)度層的統(tǒng)一也是其中核心的一層。為了提高failover后recovery速度,減少對Flink任務(wù)的影響,現(xiàn)在Flink對于批、流的任務(wù)task調(diào)度都是以pipeline region為基礎(chǔ)。

Pipeline region的構(gòu)建內(nèi)嵌在executionGraph的初始化過程中,我們知道Flink中各個節(jié)點之間的鏈接都會有IntermediateDataSet這一種邏輯結(jié)構(gòu),用來表示JobVertex的輸出,即該JobVertex中包含的算子會產(chǎn)生的數(shù)據(jù)集。這個數(shù)據(jù)集的ResultPartitionType有幾種類型:

BLOCKING:都上游處理完數(shù)據(jù)后,再交給下游處理。這個數(shù)據(jù)分區(qū)可以被消費多次,也可以并發(fā)消費。這個分區(qū)并不會被自動銷毀,而是交給調(diào)度器判斷。
BLOCKING_PERSISTENT:類似于Blocking,但是其生命周期由用戶端指定。調(diào)用JobMaster或者ResourceManager的API來銷毀,而不是由調(diào)度器控制。
PIPELINED:流交換模式。可以用于有界和無界流。這種分區(qū)類型的數(shù)據(jù)只能被每個消費者消費一次。且這種分區(qū)可以保留任意數(shù)據(jù)。
PIPELINED_BOUNDED:該策略在PIPELINED的基礎(chǔ)上保留有限制的buffer,避免對barrier造成阻塞。
PIPELINED_APPROXIMATE:和PIPELINED_BOUNDED類似,可以支持下游task重啟后繼續(xù)消費,用來支持task failover后的Approximate Local-Recovery策略。

接下來我們看看executionGraph的核心拓撲結(jié)構(gòu)ExecutionTopology是如何構(gòu)建的:

第一步 先根據(jù)executionTopology構(gòu)建rawPipelinedRegions,多個vertex能否組合成一個pipeline region的關(guān)鍵在于這個vertex的consumedResult.getResultType().isReconnectable(),如果支持重連,那么兩個vertex之間就會進行拆分,劃到不同的region。這里的isReconnectable就和我們的ResultPartitionType類型有關(guān),流處理中的PIPELINED和PIPELINED_BOUNDED都是默認的false,在這種情況下所有的vertex其實都會放入同一個region。故我們?nèi)粘5膄link作業(yè)其實都只會生成一個pipeline region。
第二步 根據(jù)不同的pipeline region構(gòu)建自己的resultPartition信息,這個是為了構(gòu)建后續(xù)的PartitionReleaseStrategy,決定一個resultPartition何時finish以及被release
第三步 對vertex的coLocation情況進行校驗,保證co-located tasks必須在同一個pipeline Region里。這里是因為后續(xù)的scheduling strategy里會保證不同pipeline region的調(diào)度部署是階段隔離的,可能無法滿足colocation-constraint

2.4   Scheduling 策略

SchedulerNG Scheduling策略默認為PipelinedRegionSchedulingStrategy,在executionGraph完成之后,就可以根據(jù)生成的剛剛executionTopology來初步構(gòu)建初步的Scheduling策略了。這里看下startScheduling代碼,可以看到Scheduling過程就是我們常說的基于pipeline region的Scheduling。

@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(this::isSourceRegion)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}

2.5   Execution Slot 分配器

默認實現(xiàn)是SlotSharingExecutionSlotAllocator,在schedulerNG完成executionGraph構(gòu)建完成后,需要進一步構(gòu)建Execution Slot 分配器。用于將physical shared slots分配到我們的logical slots 上,并將logical slot 分配給我們executionGraph中的execution(task)。通過代碼我們可以看到ExecutionSlotAllocator的職責非常簡單,只有簡單的allocate和cancel。

圖片

但在實現(xiàn)上這里有幾個重要元素需要了解:

LocalInputPreferredSlotSharingStrategy :在Flink內(nèi)部,所有的slot分配都是基于sharingslot來操作的,在滿足co-location的基礎(chǔ)上,F(xiàn)link期望將producer和consumeNode task盡可能的分布在一起,以減少數(shù)據(jù)傳輸成本。

SlotProfile:slot的資源信息,對task -> logical slot -> physical slot的mapping有非常重要的作用,包含了task的資源信息,slot的物理資源信息,傾向的location(TaskManagerLocation),傾向的allocation以及整個executionGraph之前分配過的allocation(用于黑名單,重啟后盡量避免分配在之前的slot里)。

ResourceProfileRetriever: 用于獲取executionVertex的實際資源信息。默認是unknown,如果有明細配置會用于后續(xù)的executionSlotSharingGroup資源構(gòu)建。

ExecutionSlotSharingGroup:Flink task資源申請的最終邏輯載體,用于將sharing到一起的task(execution group)組合成一個group用于生成資源,后續(xù)部署也會綁定對應(yīng)的task。

3Scheduling 主要過程

在JobMaster完成自身構(gòu)建之后,就委托SchedulerNG來開始了整個job的Scheduling:

@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
transitionToRunning();
schedulingStrategy.startScheduling();
}

可以看到這里是由schedulingStrategy來負責整個調(diào)度過程的,也就是我們的PipelinedRegionSchedulingStrategy

one by one將pipeline region進行部署

private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
final List<SchedulingPipelinedRegion> regionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
schedulingTopology, regions);


final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
for (SchedulingPipelinedRegion region : regionsSorted) {
maybeScheduleRegion(region, consumableStatusCache);
}
}

遍歷region中的ExecutionVertex依次進行部署

final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);

將vertexDeployment交給SlotSharingExecutionSlotAllocator處理

private List<SlotExecutionVertexAssignment> allocateSlots(
final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
return executionSlotAllocator.allocateSlotsFor(
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList()));
}
接下來整個allocate的主要過程如下(忽略physical fail等情況)

通過SlotSharingStrategy拿到每個execution對應(yīng)的ExecutionSlotSharingGroup

  1. 先從 corresponding co-location constraint 去mapping中尋找是否有存在的slot sharing group
  2. 接著從producer 的角度來逐一檢查是否可以合并到同一個slot sharing group.
  3. 最后嘗試所有剩下的slot sharing group看是否符合execution 的要求(如同屬于一個job vertex的task不能分配到同一個 slot sharing group).
  4. 如果以上都沒有滿足條件的就創(chuàng)建一個新的slot sharing group
  1. 檢查ExecutionSlotSharingGroup是否已經(jīng)有了對應(yīng)的sharedSlot
  2. 遍歷尚未得到分配的ExecutionSlotSharingGroup
  3. 計算對應(yīng)的SlotProfile
  4. 向PhysicalSlotProvider申請新的physical slot
  1. rm側(cè)會先檢查是否已經(jīng)有滿足條件的excess slot

  2. 如果沒有嘗試會申請新的woker以提供資源

  3. 由sharedSlotProfileRetriever來創(chuàng)建對應(yīng)的slotProfile并構(gòu)建PhysicalSlotRequest

  4. PhysicalSlotProvider向slotPool申請新的slot

  5. slotPool會向rm側(cè)申請新的slot

  1. 利用physical slot  future提前創(chuàng)建sharedSlotFutrue

  2. 將sharedSlotFutrue 分配給所有相關(guān)的executions

  3. 最后生成所有的SlotExecutionVertexAssignments

在完成所有的SlotExecutionVertexAssignment之后,生成對應(yīng)的DeploymentHandle并等待所有的assignedSlot創(chuàng)建完畢,正式開始部署對應(yīng)的任務(wù)。?

4問題思考

我們對整個Flink task的部署過程完成梳理后,重新對我們一開始的問題進行思考:

4.1   為什么會出現(xiàn)slot負載過重的情況?如何避免?

問題的產(chǎn)生在于大量的task集中分配到了統(tǒng)一個sharedSlot,這個我們可以發(fā)現(xiàn)其實是在ExecutionSlotSharingGroup的構(gòu)建過程中產(chǎn)生的。我們看下源碼,可以很直接的看到整個group的分配是一個roundRobin過程,而executionVertices來自于有序拓撲結(jié)構(gòu),中間傳遞過程也保證了有序性,所以最終會導(dǎo)致大量的task分配的index靠前的group中,最后落到了同一個slot。

為了避免這種情況,我們的做法其實有比較多,一種是在保證各種constraint的同時添加隨機性,以打散各個不均勻的task;還有一種就是構(gòu)建基于load-balance的分配過程,以盡可能的將task分布均勻。

附Flink部分源碼:

private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {


for (SchedulingExecutionVertex executionVertex : executionVertices) {
final SlotSharingGroup slotSharingGroup =
getSlotSharingGroup(executionVertex.getId());
final List<ExecutionSlotSharingGroup> groups =
executionSlotSharingGroups.computeIfAbsent(
slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());


ExecutionSlotSharingGroup group = null;
for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
if (isGroupAvailableForVertex(
executionSlotSharingGroup, executionVertex.getId())) {
group = executionSlotSharingGroup;
break;
}
}


if (group == null) {
group = new ExecutionSlotSharingGroup();
group.setResourceProfile(slotSharingGroup.getResourceProfile());
groups.add(group);
}


addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
4.2   如何避免tm級別的負載過重?

這個問題主要是在于說有一些過重的task對應(yīng)的slot都分配在了同一個tm上,導(dǎo)致整個tm壓力過大,資源難以協(xié)調(diào)。在整個過程中其實我們有看到tm信息的交互,在co-location constraint上。我們看下該hint職責:

The co-location group is used to make sure that the i-th subtasks for iteration head and iteration tail are scheduled on the same TaskManager.

也就是說其實是為了解決算子間相同index的task數(shù)據(jù)傳遞之類的問題,但對于task的均衡負載無法介入。對此我們嘗試去做的事情:

在當前不使用細粒度資源配置的情況下,考慮task-slot之間均衡分布的同事,task-tm也能做到一定的負載均衡。這種情況可以通過tm單slot來解決,也可以在保證task-slotSharingGroup足夠隨機性的同時,保證slotSharingGroup-tm的足夠隨機性。

在后續(xù)使用使用細粒度資源配置的情況下,不使用slotsharing,且將相同jobVertex對應(yīng)的task盡量分布在同一個task當中。這個我們后續(xù)準備在slotProfile中加入jobVertex相關(guān)的tag,SlotAllocator做slot matching的時候加入jobVertex constraint來保證task的位置分配。

5寫在最后

Flink開源社區(qū)較活躍,Task側(cè)的部署鏈路也一直在演進中,持續(xù)跟進并深入了解內(nèi)部實現(xiàn)邏輯能更好的支持我們解決Flink個性化調(diào)度策略上的一些問題。后續(xù)我們也準備進一步完善Flink在operator級別的細粒度資源配置能力,降低資源使用率的同時進一步提高Flink作業(yè)穩(wěn)定性。

責任編輯:武曉燕 來源: 得物技術(shù)
相關(guān)推薦

2024-02-27 08:05:32

Flink分區(qū)機制數(shù)據(jù)傳輸

2014-01-06 17:09:10

ApacheMesos

2022-01-14 07:56:38

Checkpoint機制Flink

2025-01-15 09:13:53

2021-11-02 06:58:55

FlinkWindow機制

2022-12-20 10:22:16

計算函數(shù)

2024-06-04 15:56:48

Task?.NET異步編程

2015-03-24 16:29:55

默認線程池java

2020-10-10 14:21:49

CDH6.3.2flink部署

2013-08-05 17:09:57

2023-06-20 07:32:04

2021-07-30 19:44:51

AndroidJava線程

2020-03-03 08:29:07

時延敏感網(wǎng)絡(luò)TSN網(wǎng)絡(luò)

2022-06-20 06:38:50

Flink批作業(yè)算子

2021-11-29 08:48:00

K8S KubernetesAirflow

2009-03-02 14:19:33

CiscoWi-Fi通話調(diào)度

2021-02-01 11:30:13

React前端調(diào)度

2022-05-19 08:47:30

Flinkwatermark窗口計算

2019-06-04 13:44:53

架構(gòu)運維技術(shù)

2015-01-14 17:20:35

點贊
收藏

51CTO技術(shù)棧公眾號