自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

自適應(yīng)批作業(yè)調(diào)度器:為 Flink 批作業(yè)自動(dòng)推導(dǎo)并行度

開(kāi)發(fā)
為了控制批作業(yè)的執(zhí)行時(shí)長(zhǎng),算子的并行度應(yīng)該和其需要處理的數(shù)據(jù)量成正比。用戶需要通過(guò)預(yù)估算子需要處理的數(shù)據(jù)量來(lái)配置并行度。但準(zhǔn)確預(yù)估算子需要處理的數(shù)據(jù)量是一件很困難的事情。

?01引言

對(duì)大部分用戶來(lái)說(shuō),為 Flink 算子配置合適的并行度并不是一件容易的事。對(duì)于批作業(yè),小的并行度會(huì)導(dǎo)致作業(yè)運(yùn)行時(shí)間長(zhǎng),故障恢復(fù)慢,而不必要的大并行度會(huì)導(dǎo)致資源浪費(fèi),任務(wù)部署和數(shù)據(jù) shuffle 開(kāi)銷也會(huì)變大。

為了控制批作業(yè)的執(zhí)行時(shí)長(zhǎng),算子的并行度應(yīng)該和其需要處理的數(shù)據(jù)量成正比。用戶需要通過(guò)預(yù)估算子需要處理的數(shù)據(jù)量來(lái)配置并行度。但準(zhǔn)確預(yù)估算子需要處理的數(shù)據(jù)量是一件很困難的事情:需要處理的數(shù)據(jù)量可能每天都在變化,作業(yè)中可能會(huì)存在大量的 UDF 和復(fù)雜算子導(dǎo)致難以判斷其產(chǎn)出的數(shù)據(jù)量。

為了解決這個(gè)問(wèn)題,我們?cè)?Flink 1.15 中引入了一種新的調(diào)度器:自適應(yīng)批作業(yè)調(diào)度器(Adaptive Batch Scheduler)。自適應(yīng)批作業(yè)調(diào)度器會(huì)在作業(yè)運(yùn)行時(shí)根據(jù)每個(gè)算子需要處理的實(shí)際數(shù)據(jù)量來(lái)自動(dòng)推導(dǎo)并行度。它會(huì)帶來(lái)以下好處:

  1. 大大降低批處理作業(yè)并發(fā)度調(diào)優(yōu)的繁瑣程度;
  2. 可以根據(jù)處理的數(shù)據(jù)量為不同的算子配置不同的并行度,這對(duì)于之前只能配置全局并行度的 SQL 作業(yè)尤其有益;
  3. 可以更好的適應(yīng)每日變化的數(shù)據(jù)量。

02用法

使 Flink 自動(dòng)推導(dǎo)算子的并行度,需要進(jìn)行以下配置:

  1. 啟用自適應(yīng)批作業(yè)調(diào)度器;
  2. 配置算子的并行度為 -1。

2.1 啟用自適應(yīng)批作業(yè)調(diào)度器

啟用自適應(yīng)批作業(yè)調(diào)度器,需要進(jìn)行以下配置:

  1. 配置 jobmanager.scheduler: AdaptiveBatch;
  2. 將 execution.batch-shuffle-mode 配置為 ALL-EXCHANGES-BLOCKING (默認(rèn)值)。因?yàn)槟壳白赃m應(yīng)批作業(yè)調(diào)度器只支持 shuffle mode 為 ALL-EXCHANGES-BLOCKING 的作業(yè)。

此外,還有一些相關(guān)配置來(lái)指定自動(dòng)推導(dǎo)的算子并行度的上下限、預(yù)期每個(gè)算子處理的數(shù)據(jù)量以及 source 算子的默認(rèn)并行度,詳情請(qǐng)參閱 Flink 文檔 [1]。

2.2 配置算子的并行度為 -1

自適應(yīng)批作業(yè)調(diào)度器只會(huì)為用戶未指定并行度的算子(即并行度為默認(rèn)值 -1)推導(dǎo)并行度。所以需要進(jìn)行以下配置:

  1. 配置 parallelism.default: -1;
  2. 對(duì)于 SQL 作業(yè),需要配置 table.exec.resource.default-parallelism: -1;
  3. 對(duì)于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過(guò)算子的 setParallelism() 方法來(lái)指定并行度;
  4. 對(duì)于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過(guò) StreamExecutionEnvironment/ExecutionEnvironment 的 setParallelism() 方法來(lái)指定并行度。

03實(shí)現(xiàn)細(xì)節(jié)

接下來(lái)我們將介紹自適應(yīng)批作業(yè)調(diào)度器的實(shí)現(xiàn)細(xì)節(jié)。在此之前,我們簡(jiǎn)要介紹一下涉及到的一些術(shù)語(yǔ)概念:

  1. 邏輯節(jié)點(diǎn)(JobVertex)[2] 和邏輯拓?fù)洌↗obGraph)[3]:邏輯節(jié)點(diǎn)是為了更優(yōu)的性能而將幾個(gè)算子鏈接到一起形成的算子鏈,邏輯拓?fù)鋭t是多個(gè)邏輯節(jié)點(diǎn)連接組成的數(shù)據(jù)流圖。
  2. 執(zhí)行節(jié)點(diǎn)(ExecutionVertex)[4] 和執(zhí)行拓?fù)洌‥xecutionGraph)[5]:執(zhí)行節(jié)點(diǎn)對(duì)應(yīng)一個(gè)可部署物理任務(wù),是邏輯節(jié)點(diǎn)根據(jù)并行度進(jìn)行展開(kāi)生成的。例如,如果一個(gè)邏輯節(jié)點(diǎn)的并行度為 100,就會(huì)生成 100 個(gè)對(duì)應(yīng)的執(zhí)行節(jié)點(diǎn)。執(zhí)行拓?fù)鋭t是所有執(zhí)行節(jié)點(diǎn)連接組成的物理執(zhí)行圖。

以上概念的介紹可以參見(jiàn) Flink 文檔 [6]。需要注意的是,自適應(yīng)批作業(yè)調(diào)度器是通過(guò)推導(dǎo)邏輯節(jié)點(diǎn)的并行度來(lái)決定該節(jié)點(diǎn)包含的算子的并行度的。

實(shí)現(xiàn)細(xì)節(jié)主要包括以下幾部分:

  1. 使調(diào)度器能夠收集執(zhí)行節(jié)點(diǎn)產(chǎn)出數(shù)據(jù)的大??;
  2. 引入一個(gè)新組件 VertexParallelismDecider [7] 來(lái)負(fù)責(zé)根據(jù)邏輯節(jié)點(diǎn)需要處理的數(shù)據(jù)量計(jì)算其并行度;
  3. 支持動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)?,即?zhí)行拓?fù)鋸囊粋€(gè)空的執(zhí)行拓?fù)溟_(kāi)始,然后隨著作業(yè)調(diào)度逐漸添加執(zhí)行節(jié)點(diǎn);
  4. 引入自適應(yīng)批作業(yè)調(diào)度器來(lái)更新和調(diào)度執(zhí)行拓?fù)洹?/li>

后續(xù)章節(jié)會(huì)對(duì)以上內(nèi)容進(jìn)行詳細(xì)介紹。

圖片

圖 1 - 自動(dòng)推導(dǎo)并行度的整體結(jié)構(gòu)

3.1 收集執(zhí)行節(jié)點(diǎn)產(chǎn)出的數(shù)據(jù)量

自適應(yīng)批作業(yè)調(diào)度器是根據(jù)邏輯節(jié)點(diǎn)需要處理的數(shù)據(jù)量來(lái)決定其并行度的,因此需要收集上游節(jié)點(diǎn)產(chǎn)出的數(shù)據(jù)量。為此,我們引入了一個(gè) numBytesProduced 計(jì)數(shù)器來(lái)記錄每個(gè)執(zhí)行節(jié)點(diǎn)產(chǎn)出的數(shù)據(jù)分區(qū)(ResultPartition)的數(shù)據(jù)量,并在執(zhí)行節(jié)點(diǎn)運(yùn)行完成時(shí)將累計(jì)值發(fā)送給調(diào)度器。

3.2 為邏輯節(jié)點(diǎn)決定合適的并行度

我們引入了一個(gè)新組件 VertexParallelismDecider 來(lái)負(fù)責(zé)為邏輯節(jié)點(diǎn)計(jì)算并行度。計(jì)算算法如下:

假設(shè)

  1. V 是用戶配置的期望每個(gè)執(zhí)行節(jié)點(diǎn)處理的數(shù)據(jù)量;
  2. totalBytenon-broadcast 是邏輯節(jié)點(diǎn)需要處理的非廣播數(shù)據(jù)的總量;
  3. totalBytesbroadcast 是邏輯節(jié)點(diǎn)需要處理的廣播數(shù)據(jù)的總量;
  4. maxBroadcastRatio 是每個(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)的比例上限;
  5. normalize(x) 是一個(gè)輸出與 x 最接近的 2 的冪的函數(shù)。

計(jì)算并行度的公式如下:

圖片

值得注意的是,我們?cè)谶@個(gè)公式中引入了兩個(gè)特殊處理:

  1. 限制每個(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)的比例;
  2. 將并行度調(diào)整為 2 的冪。

此外,上述公式不能直接用來(lái)決定 source 節(jié)點(diǎn)的并行度,因?yàn)?source 節(jié)點(diǎn)不會(huì)消費(fèi)數(shù)據(jù)。為了解決這個(gè)問(wèn)題,我們引入了配置選項(xiàng) jobmanager.adaptive-batch-scheduler.default-source-parallelism,允許用戶手動(dòng)配置 source 節(jié)點(diǎn)的并行度。請(qǐng)注意,并非所有 source 都需要此選項(xiàng),因?yàn)槟承?source 可以自己推導(dǎo)并行度(例如,HiveTableSource,詳情請(qǐng)參閱 HiveParallelismInference),對(duì)于這些source,更推薦由它們自己推導(dǎo)并行度。

3.2.1 限制每個(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)的比例

我們?cè)诠较拗泼總€(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)上限比例為 maxBroadcastRatio。 即每個(gè)執(zhí)行節(jié)點(diǎn)處理的非廣播數(shù)據(jù)至少為 (1-maxBroadcastRatio) * V。如果不這樣做,當(dāng)廣播數(shù)據(jù)的數(shù)據(jù)量接近 V 時(shí),即使非廣播數(shù)據(jù)的量非常小,也可能會(huì)被計(jì)算出很大的并行度,這是不必要的,會(huì)導(dǎo)致資源浪費(fèi)和任務(wù)部署的開(kāi)銷變大。

通常情況下,一個(gè)執(zhí)行節(jié)點(diǎn)需要處理的廣播數(shù)據(jù)量會(huì)小于要處理的非廣播數(shù)據(jù)。 因此,我們將 maxBroadcastRatio 默認(rèn)設(shè)置為 0.5。目前,這個(gè)值是硬編碼在代碼中的,我們后續(xù)會(huì)考慮將其改為可配置的。

3.2.2 將并行度調(diào)整為 2 的冪

normalize 函數(shù)會(huì)將并行度調(diào)整為最近的 2 的冪,這樣做是為了避免引入數(shù)據(jù)傾斜。為了更好的理解本節(jié),我們建議您先閱讀子分區(qū)動(dòng)態(tài)映射部分。

以圖 4(b)為例,A1/A2 產(chǎn)生 4 個(gè)子分區(qū),B 最終被決定的并行度為 3。這種情況下,B1 將消費(fèi) 1 個(gè)子分區(qū),B2 將消費(fèi) 1 個(gè)子分區(qū),B3 將消費(fèi) 2 個(gè)子分區(qū)。我們假設(shè)不同子分區(qū)的數(shù)據(jù)量都相同,這樣 B3 需要消費(fèi)的數(shù)據(jù)量是 B1/B2 的 2 倍,從而導(dǎo)致了數(shù)據(jù)傾斜。

為了解決這個(gè)問(wèn)題,我們需要讓所有下游執(zhí)行節(jié)點(diǎn)消費(fèi)的子分區(qū)數(shù)量都一樣,也就是說(shuō)上游產(chǎn)出的子分區(qū)數(shù)量應(yīng)該是下游邏輯節(jié)點(diǎn)并行度的整數(shù)倍。為簡(jiǎn)單起見(jiàn),我們希望用戶指定的最大并行度為 2^N(如果不是則會(huì)被自動(dòng)調(diào)整到不超過(guò)配置值的 2^N),然后將下游邏輯節(jié)點(diǎn)的并行度調(diào)整到最接近的 2^M(M <= N),這樣就可以保證子分區(qū)被下游均勻消費(fèi)。

不過(guò)這只是一個(gè)臨時(shí)的解決方案,最終應(yīng)該通過(guò)自動(dòng)負(fù)載均衡來(lái)解決,我們將在后續(xù)版本中實(shí)現(xiàn)。

3.3 動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)?/h3>

在引入自適應(yīng)批作業(yè)調(diào)度器之前,執(zhí)行拓?fù)涫且造o態(tài)方式構(gòu)建的,也就是在調(diào)度開(kāi)始前執(zhí)行拓?fù)渚捅煌耆珓?chuàng)建出來(lái)了。為了使邏輯節(jié)點(diǎn)并行度可以在運(yùn)行時(shí)決定,執(zhí)行拓?fù)湫枰С謩?dòng)態(tài)構(gòu)建。

3.3.1 向執(zhí)行拓?fù)鋭?dòng)態(tài)添加節(jié)點(diǎn)和邊

動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)涫侵敢粋€(gè) Flink 作業(yè)從一個(gè)空的執(zhí)行拓?fù)溟_(kāi)始,然后隨著調(diào)度逐步附加執(zhí)行節(jié)點(diǎn),如圖 2 所示。

執(zhí)行拓?fù)溆蓤?zhí)行節(jié)點(diǎn)和執(zhí)行邊(ExecutionEdge)組成。只有在以下情況下,才會(huì)將邏輯節(jié)點(diǎn)展開(kāi)創(chuàng)建執(zhí)行節(jié)點(diǎn)并將其添加到執(zhí)行拓?fù)洌?/p>

  1. 對(duì)應(yīng)邏輯節(jié)點(diǎn)的并行度已經(jīng)被確定(以便 Flink 知道應(yīng)該創(chuàng)建多少個(gè)執(zhí)行節(jié)點(diǎn));
  2. 所有上游邏輯節(jié)點(diǎn)都已經(jīng)被展開(kāi)(以便 Flink 通過(guò)執(zhí)行邊將新創(chuàng)建的執(zhí)行節(jié)點(diǎn)和上游執(zhí)行節(jié)點(diǎn)連接起來(lái))。

圖片

圖 2 - 動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)?/p>

3.3.2 子分區(qū)動(dòng)態(tài)映射

在引入自適應(yīng)批作業(yè)調(diào)度器之前,在部署執(zhí)行節(jié)點(diǎn)時(shí),F(xiàn)link 需要知道其下游邏輯節(jié)點(diǎn)的并行度。因?yàn)橄掠芜壿嫻?jié)點(diǎn)的并行度決定了上游執(zhí)行節(jié)點(diǎn)需要產(chǎn)出的子分區(qū)數(shù)量。以圖 3 為例,下游 B 的并行度為 2,因此上游的 A1/A2 需要產(chǎn)生 2 個(gè)子分區(qū),索引為 0 的子分區(qū)被 B1 消費(fèi),索引為 1 的子分區(qū)被 B2 消費(fèi)。

圖片

圖 3 - 靜態(tài)執(zhí)行拓?fù)湎M(fèi)子分區(qū)的方式

但顯然,這不適用于動(dòng)態(tài)圖,因?yàn)楫?dāng)部署上游執(zhí)行節(jié)點(diǎn)時(shí),下游邏輯節(jié)點(diǎn)的并行度可能尚未確定(即部署 A1/A2 時(shí),B 的并行度還未確定)。為了解決這個(gè)問(wèn)題,我們需要使上游執(zhí)行節(jié)點(diǎn)產(chǎn)生的子分區(qū)數(shù)量與下游邏輯節(jié)點(diǎn)的并行度解耦。

我們通過(guò)以下方法實(shí)現(xiàn)解耦:將上游執(zhí)行節(jié)點(diǎn)產(chǎn)生子分區(qū)的數(shù)量設(shè)置為下游邏輯節(jié)點(diǎn)的最大并行度(最大并行度是一個(gè)可配置的固定值),然后在下游邏輯節(jié)點(diǎn)并行度被確定后,將這些子分區(qū)均分給不同的下游執(zhí)行節(jié)點(diǎn)進(jìn)行消費(fèi)。也就是說(shuō),部署下游執(zhí)行節(jié)點(diǎn)時(shí),每個(gè)下游執(zhí)行節(jié)點(diǎn)都會(huì)被分配到一個(gè)子分區(qū)范圍來(lái)消費(fèi)。假設(shè) N 是下游邏輯節(jié)點(diǎn)并行度,P 是子分區(qū)的數(shù)量。對(duì)于第 k 個(gè)下游執(zhí)行節(jié)點(diǎn),消費(fèi)的子分區(qū)范圍應(yīng)該是:

圖片

以圖 4 為例,B 的最大并行度為 4,因此 A1/A2 有 4 個(gè)子分區(qū)。然后如果B的確定并行度為 2,則子分區(qū)映射將為圖 4(a),如果B的確定并行度為 3,則子分區(qū)映射將為圖 4(b)。

圖片

圖 4 - 動(dòng)態(tài)執(zhí)行拓?fù)湎M(fèi)子分區(qū)的方式

3.4 動(dòng)態(tài)更新并調(diào)度執(zhí)行拓?fù)?/h3>

自適應(yīng)批作業(yè)調(diào)度器調(diào)度作業(yè)的方式和默認(rèn)調(diào)度器基本相同,唯一的區(qū)別是:自適應(yīng)批作業(yè)調(diào)度器是從一個(gè)空的執(zhí)行拓?fù)溟_(kāi)始調(diào)度,在處理任何調(diào)度事件之前,都會(huì)嘗試決定所有邏輯節(jié)點(diǎn)的并行度,然后嘗試為邏輯節(jié)點(diǎn)生成對(duì)應(yīng)的執(zhí)行節(jié)點(diǎn),并通過(guò)執(zhí)行邊連接上游節(jié)點(diǎn),更新執(zhí)行拓?fù)洹?/p>

調(diào)度器會(huì)在每次調(diào)度之前嘗試按照拓?fù)漤樞驔Q定所有邏輯節(jié)點(diǎn)的并行度:

  1. 對(duì)于 source 節(jié)點(diǎn),其并行度會(huì)在開(kāi)始調(diào)度之前就進(jìn)行確定;
  2. 對(duì)于非 source 節(jié)點(diǎn),需要在其所有上游節(jié)點(diǎn)數(shù)據(jù)產(chǎn)出完成后才能確定其并行度。

然后,調(diào)度程序?qū)L試按照拓?fù)漤樞驅(qū)⑦壿嫻?jié)點(diǎn)展開(kāi)生成執(zhí)行節(jié)點(diǎn)。一個(gè)可以被展開(kāi)的邏輯節(jié)點(diǎn)應(yīng)該滿足以下條件:

  1. 該邏輯節(jié)點(diǎn)并行度已確定;
  2. 所有上游邏輯節(jié)點(diǎn)都已經(jīng)被展開(kāi)。

04未來(lái)展望 - 自動(dòng)負(fù)載均衡

運(yùn)行批作業(yè)時(shí),可能會(huì)出現(xiàn)數(shù)據(jù)傾斜(某個(gè)執(zhí)行節(jié)點(diǎn)需要處理的數(shù)據(jù)遠(yuǎn)多于其他執(zhí)行節(jié)點(diǎn)),這會(huì)導(dǎo)作業(yè)出現(xiàn)長(zhǎng)尾現(xiàn)象,拖慢作業(yè)的完成速度。如果 Flink 可以自動(dòng)改善或者解決這個(gè)問(wèn)題,可以給用戶很大的幫助。

一種典型的數(shù)據(jù)傾斜情況是某些子分區(qū)的數(shù)據(jù)量明顯大于其他子分區(qū)。這種情況可以通過(guò)劃分更細(xì)粒度的子分區(qū),并根據(jù)子分區(qū)大小來(lái)平衡工作負(fù)載來(lái)解決(如圖 5)。自適應(yīng)批作業(yè)調(diào)度器的工作可以被認(rèn)為是邁向它的第一步,因?yàn)樽詣?dòng)重新平衡的要求類似于自適應(yīng)批作業(yè)調(diào)度器,它們都需要?jiǎng)討B(tài)圖的支持和結(jié)果分區(qū)大小的采集。

基于自適應(yīng)批作業(yè)調(diào)度器的實(shí)現(xiàn),我們可以通過(guò)增加最大并行度(為了更細(xì)粒度的子分區(qū))和簡(jiǎn)單地更改子分區(qū)范圍劃分算法(為了平衡工作負(fù)載)來(lái)解決上述問(wèn)題。在目前的設(shè)計(jì)中,子分區(qū)范圍是按照子分區(qū)的個(gè)數(shù)來(lái)劃分的,我們可以改成按照子分區(qū)中的數(shù)據(jù)量來(lái)劃分,這樣每個(gè)子分區(qū)范圍內(nèi)的數(shù)據(jù)量可以大致相同,從而平衡下游執(zhí)行節(jié)點(diǎn)的工作量。

圖片

圖 5 - 自動(dòng)負(fù)載均衡

注釋

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/elastic_scaling/#adaptive-batch-scheduler

[2] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java

[3] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java

[4] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java

[5] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

[6] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/internals/job_scheduling/#jobmanager-數(shù)據(jù)結(jié)構(gòu)

[7] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java?

責(zé)任編輯:未麗燕 來(lái)源: Apache Flink
相關(guān)推薦

2011-04-06 14:16:49

SQL Server自動(dòng)備份

2021-11-05 15:55:35

作業(yè)幫Kubernetes調(diào)度器

2024-03-15 15:09:28

2010-04-15 10:41:13

2011-03-30 14:29:13

QuartzJava

2022-07-26 16:54:08

QuartzJava

2024-07-08 00:00:02

.NET系統(tǒng)調(diào)度器

2017-06-06 10:30:12

前端Web寬度自適應(yīng)

2012-05-16 11:13:35

傲游瀏覽器手機(jī)版

2022-11-09 17:12:38

AI模型

2023-08-28 08:00:45

2021-04-18 12:12:29

systemd定時(shí)器系統(tǒng)運(yùn)維

2011-10-19 08:04:12

2023-07-31 08:24:34

MySQL索引計(jì)數(shù)

2010-08-30 10:26:20

DIV自適應(yīng)高度

2015-08-12 15:10:46

Ubuntucronlinux

2012-05-09 10:58:25

JavaMEJava

2010-08-30 09:52:03

DIV高度自適應(yīng)

2014-09-05 10:10:32

Android自適應(yīng)布局設(shè)計(jì)

2011-12-13 20:08:54

云計(jì)算BMC
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)