Adaptive Execution 讓 Spark SQL 更智能更高效
本文轉(zhuǎn)發(fā)自技術(shù)世界,原文鏈接 http://www.jasongj.com/spark/adaptive_execution/
1 背景
前面《Spark SQL / Catalyst 內(nèi)部原理 與 RBO》與《Spark SQL 性能優(yōu)化再進一步 CBO 基于代價的優(yōu)化》介紹的優(yōu)化,從查詢本身與目標(biāo)數(shù)據(jù)的特點的角度盡可能保證了最終生成的執(zhí)行計劃的高效性。但是執(zhí)行計劃一旦生成,便不可更改,即使執(zhí)行過程中發(fā)現(xiàn)后續(xù)執(zhí)行計劃可以進一步優(yōu)化,也只能按原計劃執(zhí)行。
CBO 基于統(tǒng)計信息生成***執(zhí)行計劃,需要提前生成統(tǒng)計信息,成本較大,且不適合數(shù)據(jù)更新頻繁的場景。
CBO 基于基礎(chǔ)表的統(tǒng)計信息與操作對數(shù)據(jù)的影響推測中間結(jié)果的信息,只是估算,不夠精確。
本文介紹的 Adaptive Execution 將可以根據(jù)執(zhí)行過程中的中間數(shù)據(jù)優(yōu)化后續(xù)執(zhí)行,從而提高整體執(zhí)行效率。核心在于兩點:
- 執(zhí)行計劃可動態(tài)調(diào)整
- 調(diào)整的依據(jù)是中間結(jié)果的精確統(tǒng)計信息
2 動態(tài)設(shè)置 Shuffle Partition
2.1 Spark Shuffle 原理
Spark Shuffle 一般用于將上游 Stage 中的數(shù)據(jù)按 Key 分區(qū),保證來自不同 Mapper (表示上游 Stage 的 Task)的相同的 Key 進入相同的 Reducer (表示下游 Stage 的 Task)。一般用于 group by 或者 Join 操作。
如上圖所示,該 Shuffle 總共有 2 個 Mapper 與 5 個 Reducer。每個 Mapper 會按相同的規(guī)則(由 Partitioner 定義)將自己的數(shù)據(jù)分為五份。每個 Reducer 從這兩個 Mapper 中拉取屬于自己的那一份數(shù)據(jù)。
2.2 原有 Shuffle 的問題
使用 Spark SQL 時,可通過 spark.sql.shuffle.partitions 指定 Shuffle 時 Partition 個數(shù),也即 Reducer 個數(shù)。
該參數(shù)決定了一個 Spark SQL Job 中包含的所有 Shuffle 的 Partition 個數(shù)。如下圖所示,當(dāng)該參數(shù)值為 3 時,所有 Shuffle 中 Reducer 個數(shù)都為 3。
這種方法有如下問題:
- Partition 個數(shù)不宜設(shè)置過大
- Reducer(代指 Spark Shuffle 過程中執(zhí)行 Shuffle Read 的 Task) 個數(shù)過多,每個 Reducer 處理的數(shù)據(jù)量過小。大量小 Task 造成不必要的 Task 調(diào)度開銷與可能的資源調(diào)度開銷(如果開啟了 Dynamic Allocation)
- Reducer 個數(shù)過大,如果 Reducer 直接寫 HDFS 會生成大量小文件,從而造成大量 addBlock RPC,Name node 可能成為瓶頸,并影響其它使用 HDFS 的應(yīng)用
- 過多 Reducer 寫小文件,會造成后面讀取這些小文件時產(chǎn)生大量 getBlock RPC,對 Name node 產(chǎn)生沖擊
- Partition 個數(shù)不宜設(shè)置過小
- 每個 Reducer 處理的數(shù)據(jù)量太大,Spill 到磁盤開銷增大
- Reducer GC 時間增長
- Reducer 如果寫 HDFS,每個 Reducer 寫入數(shù)據(jù)量較大,無法充分發(fā)揮并行處理優(yōu)勢
- 很難保證所有 Shuffle 都***
- 不同的 Shuffle 對應(yīng)的數(shù)據(jù)量不一樣,因此***的 Partition 個數(shù)也不一樣。使用統(tǒng)一的 Partition 個數(shù)很難保證所有 Shuffle 都***
- 定時任務(wù)不同時段數(shù)據(jù)量不一樣,相同的 Partition 數(shù)設(shè)置無法保證所有時間段執(zhí)行時都***
2.3 自動設(shè)置 Shuffle Partition 原理
如 Spark Shuffle 原理 一節(jié)圖中所示,Stage 1 的 5 個 Partition 數(shù)據(jù)量分別為 60MB,40MB,1MB,2MB,50MB。其中 1MB 與 2MB 的 Partition 明顯過小(實際場景中,部分小 Partition 只有幾十 KB 及至幾十字節(jié))。
開啟 Adaptive Execution 后
- Spark 在 Stage 0 的 Shuffle Write 結(jié)束后,根據(jù)各 Mapper 輸出,統(tǒng)計得到各 Partition 的數(shù)據(jù)量,即 60MB,40MB,1MB,2MB,50MB。
- 通過 ExchangeCoordinator 計算出合適的 post-shuffle Partition 個數(shù)(即 Reducer)個數(shù)(本例中 Reducer 個數(shù)設(shè)置為 3)。
- 啟動相應(yīng)個數(shù)的 Reducer 任務(wù)。
- 每個 Reducer 讀取一個或多個 Shuffle Write Partition 數(shù)據(jù)(如下圖所示,Reducer 0 讀取 Partition 0,Reducer 1 讀取 Partition 1、2、3,Reducer 2 讀取 Partition 4)。
三個 Reducer 這樣分配是因為:
- targetPostShuffleInputSize 默認(rèn)為 64MB,每個 Reducer 讀取數(shù)據(jù)量不超過 64MB
- 如果 Partition 0 與 Partition 2 結(jié)合,Partition 1 與 Partition 3 結(jié)合,雖然也都不超過 64 MB。但讀完 Partition 0 再讀 Partition 2,對于同一個 Mapper 而言,如果每個 Partition 數(shù)據(jù)比較少,跳著讀多個 Partition 相當(dāng)于隨機讀,在 HDD 上性能不高
- 目前的做法是只結(jié)合相臨的 Partition,從而保證順序讀,提高磁盤 IO 性能
- 該方案只會合并多個小的 Partition,不會將大的 Partition 拆分,因為拆分過程需要引入一輪新的 Shuffle
- 基于上面的原因,默認(rèn) Partition 個數(shù)(本例中為 5)可以大一點,然后由 ExchangeCoordinator 合并。如果設(shè)置的 Partition 個數(shù)太小,Adaptive Execution 在此場景下無法發(fā)揮作用
由上圖可見,Reducer 1 從每個 Mapper 讀取 Partition 1、2、3 都有三根線,是因為原來的 Shuffle 設(shè)計中,每個 Reducer 每次通過 Fetch 請求從一個特定 Mapper 讀數(shù)據(jù)時,只能讀一個 Partition 的數(shù)據(jù)。也即在上圖中,Reducer 1 讀取 Mapper 0 的數(shù)據(jù),需要 3 輪 Fetch 請求。對于 Mapper 而言,需要讀三次磁盤,相當(dāng)于隨機 IO。
為了解決這個問題,Spark 新增接口,一次 Shuffle Read 可以讀多個 Partition 的數(shù)據(jù)。如下圖所示,Task 1 通過一輪請求即可同時讀取 Task 0 內(nèi) Partition 0、1 和 2 的數(shù)據(jù),減少了網(wǎng)絡(luò)請求數(shù)量。同時 Mapper 0 一次性讀取并返回三個 Partition 的數(shù)據(jù),相當(dāng)于順序 IO,從而提升了性能。
由于 Adaptive Execution 的自動設(shè)置 Reducer 是由 ExchangeCoordinator 根據(jù) Shuffle Write 統(tǒng)計信息決定的,因此即使在同一個 Job 中不同 Shuffle 的 Reducer 個數(shù)都可以不一樣,從而使得每次 Shuffle 都盡可能***。
上文 原有 Shuffle 的問題 一節(jié)中的例子,在啟用 Adaptive Execution 后,三次 Shuffle 的 Reducer 個數(shù)從原來的全部為 3 變?yōu)?2、4、3。
2.4 使用與優(yōu)化方法
可通過 spark.sql.adaptive.enabled=true 啟用 Adaptive Execution 從而啟用自動設(shè)置 Shuffle Reducer 這一特性。
通過 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可設(shè)置每個 Reducer 讀取的目標(biāo)數(shù)據(jù)量,其單位是字節(jié),默認(rèn)值為 64 MB。上文例子中,如果將該值設(shè)置為 50 MB,最終效果仍然如上文所示,而不會將 Partition 0 的 60MB 拆分。具體原因上文已說明。
3 動態(tài)調(diào)整執(zhí)行計劃
3.1 固定執(zhí)行計劃的不足
在不開啟 Adaptive Execution 之前,執(zhí)行計劃一旦確定,即使發(fā)現(xiàn)后續(xù)執(zhí)行計劃可以優(yōu)化,也不可更改。如下圖所示,SortMergJoin 的 Shuffle Write 結(jié)束后,發(fā)現(xiàn) Join 一方的 Shuffle 輸出只有 46.9KB,仍然繼續(xù)執(zhí)行 SortMergeJoin。
此時完全可將 SortMergeJoin 變更為 BroadcastJoin 從而提高整體執(zhí)行效率。
3.2 SortMergeJoin 原理
SortMergeJoin 是常用的分布式 Join 方式,它幾乎可使用于所有需要 Join 的場景。但有些場景下,它的性能并不是***的。
SortMergeJoin 的原理如下圖所示:
- 將 Join 雙方以 Join Key 為 Key 按照 HashPartitioner 分區(qū),且保證分區(qū)數(shù)一致
- Stage 0 與 Stage 1 的所有 Task 在 Shuffle Write 時,都將數(shù)據(jù)分為 5 個 Partition,并且每個 Partition 內(nèi)按 Join Key 排序
- Stage 2 啟動 5 個 Task 分別去 Stage 0 與 Stage 1 中所有包含 Partition 分區(qū)數(shù)據(jù)的 Task 中取對應(yīng) Partition 的數(shù)據(jù)。(如果某個 Mapper 不包含該 Partition 的數(shù)據(jù),則 Redcuer 無須向其發(fā)起讀取請求)。
- Stage 2 的 Task 2 分別從 Stage 0 的 Task 0、1、2 中讀取 Partition 2 的數(shù)據(jù),并且通過 MergeSort 對其進行排序
- Stage 2 的 Task 2 分別從 Stage 1 的 Task 0、1 中讀取 Partition 2 的數(shù)據(jù),且通過 MergeSort 對其進行排序
- Stage 2 的 Task 2 在上述兩步 MergeSort 的同時,使用 SortMergeJoin 對二者進行 Join
3.3 BroadcastJoin 原理
當(dāng)參與 Join 的一方足夠小,可全部置于 Executor 內(nèi)存中時,可使用 Broadcast 機制將整個 RDD 數(shù)據(jù)廣播到每一個 Executor 中,該 Executor 上運行的所有 Task 皆可直接讀取其數(shù)據(jù)。(本文中,后續(xù)配圖,為了方便展示,會將整個 RDD 的數(shù)據(jù)置于 Task 框內(nèi),而隱藏 Executor)
對于大 RDD,按正常方式,每個 Task 讀取并處理一個 Partition 的數(shù)據(jù),同時讀取 Executor 內(nèi)的廣播數(shù)據(jù),該廣播數(shù)據(jù)包含了小 RDD 的全量數(shù)據(jù),因此可直接與每個 Task 處理的大 RDD 的部分?jǐn)?shù)據(jù)直接 Join。
根據(jù) Task 內(nèi)具體的 Join 實現(xiàn)的不同,又可分為 BroadcastHashJoin 與 BroadcastNestedLoopJoin。后文不區(qū)分這兩種實現(xiàn),統(tǒng)稱為 BroadcastJoin。
與 SortMergeJoin 相比,BroadcastJoin 不需要 Shuffle,減少了 Shuffle 帶來的開銷,同時也避免了 Shuffle 帶來的數(shù)據(jù)傾斜,從而極大地提升了 Job 執(zhí)行效率。
同時,BroadcastJoin 帶來了廣播小 RDD 的開銷。另外,如果小 RDD 過大,無法存于 Executor 內(nèi)存中,則無法使用 BroadcastJoin。
對于基礎(chǔ)表的 Join,可在生成執(zhí)行計劃前,直接通過 HDFS 獲取各表的大小,從而判斷是否適合使用 BroadcastJoin。但對于中間表的 Join,無法提前準(zhǔn)確判斷中間表大小從而精確判斷是否適合使用 BroadcastJoin。
《Spark SQL 性能優(yōu)化再進一步 CBO 基于代價的優(yōu)化》一文介紹的 CBO 可通過表的統(tǒng)計信息與各操作對數(shù)據(jù)統(tǒng)計信息的影響,推測出中間表的統(tǒng)計信息,但是該方法得到的統(tǒng)計信息不夠準(zhǔn)確。同時該方法要求提前分析表,具有較大開銷。
而開啟 Adaptive Execution 后,可直接根據(jù) Shuffle Write 數(shù)據(jù)判斷是否適用 BroadcastJoin。
3.4 動態(tài)調(diào)整執(zhí)行計劃原理
如上文 SortMergeJoin 原理 中配圖所示,SortMergeJoin 需要先對 Stage 0 與 Stage 1 按同樣的 Partitioner 進行 Shuffle Write。
Shuffle Write 結(jié)束后,可從每個 ShuffleMapTask 的 MapStatus 中統(tǒng)計得到按原計劃執(zhí)行時 Stage 2 各 Partition 的數(shù)據(jù)量以及 Stage 2 需要讀取的總數(shù)據(jù)量。(一般來說,Partition 是 RDD 的屬性而非 Stage 的屬性,本文為了方便,不區(qū)分 Stage 與 RDD??梢院唵握J(rèn)為一個 Stage 只有一個 RDD,此時 Stage 與 RDD 在本文討論范圍內(nèi)等價)。
如果其中一個 Stage 的數(shù)據(jù)量較小,適合使用 BroadcastJoin,無須繼續(xù)執(zhí)行 Stage 2 的 Shuffle Read。相反,可利用 Stage 0 與 Stage 1 的數(shù)據(jù)進行 BroadcastJoin,如下圖所示:
具體做法是
- 將 Stage 1 全部 Shuffle Write 結(jié)果廣播出去
- 啟動 Stage 2,Partition 個數(shù)與 Stage 0 一樣,都為 3
- 每個 Stage 2 每個 Task 讀取 Stage 0 每個 Task 的 Shuffle Write 數(shù)據(jù),同時與廣播得到的 Stage 1 的全量數(shù)據(jù)進行 Join
注:廣播數(shù)據(jù)存于每個 Executor 中,其上所有 Task 共享,無須為每個 Task 廣播一份數(shù)據(jù)。上圖中,為了更清晰展示為什么能夠直接 Join 而將 Stage 2 每個 Task 方框內(nèi)都放置了一份 Stage 1 的全量數(shù)據(jù)
雖然 Shuffle Write 已完成,將后續(xù)的 SortMergeJoin 改為 Broadcast 仍然能提升執(zhí)行效率
- SortMergeJoin 需要在 Shuffle Read 時對來自 Stage 0 與 Stage 1 的數(shù)據(jù)進行 Merge Sort,并且可能需要 Spill 到磁盤,開銷較大
- SortMergeJoin 時,Stage 2 的所有 Task 需要取 Stage 0 與 Stage 1 的所有 Task 的輸出數(shù)據(jù)(如果有它要的數(shù)據(jù) ),會造成大量的網(wǎng)絡(luò)連接。且當(dāng) Stage 2 的 Task 較多時,會造成大量的磁盤隨機讀操作,效率不高,且影響相同機器上其它 Job 的執(zhí)行效率
- SortMergeJoin 時,Stage 2 每個 Task 需要從幾乎所有 Stage 0 與 Stage 1 的 Task 取數(shù)據(jù),無法很好利用 Locality
- Stage 2 改用 Broadcast,每個 Task 直接讀取 Stage 0 的每個 Task 的數(shù)據(jù)(一對一),可很好利用 Locality 特性。***在 Stage 0 使用的 Executor 上直接啟動 Stage 2 的 Task。如果 Stage 0 的 Shuffle Write 數(shù)據(jù)并未 Spill 而是在內(nèi)存中,則 Stage 2 的 Task 可直接讀取內(nèi)存中的數(shù)據(jù),效率非常高。如果有 Spill,那可直接從本地文件中讀取數(shù)據(jù),且是順序讀取,效率遠比通過網(wǎng)絡(luò)隨機讀數(shù)據(jù)效率高
3.5 使用與優(yōu)化方法
該特性的使用方式如下
- 當(dāng) spark.sql.adaptive.enabled 與 spark.sql.adaptive.join.enabled 都設(shè)置為 true 時,開啟 Adaptive Execution 的動態(tài)調(diào)整 Join 功能
- spark.sql.adaptiveBroadcastJoinThreshold 設(shè)置了 SortMergeJoin 轉(zhuǎn) BroadcastJoin 的閾值。如果不設(shè)置該參數(shù),該閾值與 spark.sql.autoBroadcastJoinThreshold 的值相等
- 除了本文所述 SortMergeJoin 轉(zhuǎn) BroadcastJoin,Adaptive Execution 還可提供其它 Join 優(yōu)化策略。部分優(yōu)化策略可能會需要增加 Shuffle。spark.sql.adaptive.allowAdditionalShuffle 參數(shù)決定了是否允許為了優(yōu)化 Join 而增加 Shuffle。其默認(rèn)值為 false
4 自動處理數(shù)據(jù)傾斜
4.1 解決數(shù)據(jù)傾斜典型方案
《Spark性能優(yōu)化之道——解決Spark數(shù)據(jù)傾斜(Data Skew)的N種姿勢》一文講述了數(shù)據(jù)傾斜的危害,產(chǎn)生原因,以及典型解決方法
- 保證文件可 Split 從而避免讀 HDFS 時數(shù)據(jù)傾斜
- 保證 Kafka 各 Partition 數(shù)據(jù)均衡從而避免讀 Kafka 引起的數(shù)據(jù)傾斜
- 調(diào)整并行度或自定義 Partitioner 從而分散分配給同一 Task 的大量不同 Key
- 使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 從而避免 Shuffle 引起的數(shù)據(jù)傾斜
- 對傾斜 Key 使用隨機前綴或后綴從而分散大量傾斜 Key,同時將參與 Join 的小表擴容,從而保證 Join 結(jié)果的正確性
4.2 自動解決數(shù)據(jù)傾斜
目前 Adaptive Execution 可解決 Join 時數(shù)據(jù)傾斜問題。其思路可理解為將部分傾斜的 Partition (傾斜的判斷標(biāo)準(zhǔn)為該 Partition 數(shù)據(jù)是所有 Partition Shuffle Write 中位數(shù)的 N 倍) 進行單獨處理,類似于 BroadcastJoin,如下圖所示:
在上圖中,左右兩邊分別是參與 Join 的 Stage 0 與 Stage 1 (實際應(yīng)該是兩個 RDD 進行 Join,但如同上文所述,這里不區(qū)分 RDD 與 Stage),中間是獲取 Join 結(jié)果的 Stage 2
明顯 Partition 0 的數(shù)據(jù)量較大,這里假設(shè) Partition 0 符合“傾斜”的條件,其它 4 個 Partition 未傾斜
以 Partition 對應(yīng)的 Task 2 為例,它需獲取 Stage 0 的三個 Task 中所有屬于 Partition 2 的數(shù)據(jù),并使用 MergeSort 排序。同時獲取 Stage 1 的兩個 Task 中所有屬于 Partition 2 的數(shù)據(jù)并使用 MergeSort 排序。然后對二者進行 SortMergeJoin
對于 Partition 0,可啟動多個 Task
- 在上圖中,啟動了兩個 Task 處理 Partition 0 的數(shù)據(jù),分別名為 Task 0-0 與 Task 0-1
- Task 0-0 讀取 Stage 0 Task 0 中屬于 Partition 0 的數(shù)據(jù)
- Task 0-1 讀取 Stage 0 Task 1 與 Task 2 中屬于 Partition 0 的數(shù)據(jù),并進行 MergeSort
- Task 0-0 與 Task 0-1 都從 Stage 1 的兩個 Task 中所有屬于 Partition 0 的數(shù)據(jù)
- Task 0-0 與 Task 0-1 使用 Stage 0 中屬于 Partition 0 的部分?jǐn)?shù)據(jù)與 Stage 1 中屬于 Partition 0 的全量數(shù)據(jù)進行 Join
通過該方法,原本由一個 Task 處理的 Partition 0 的數(shù)據(jù)由多個 Task 共同處理,每個 Task 需處理的數(shù)據(jù)量減少,從而避免了 Partition 0 的傾斜
對于 Partition 0 的處理,有點類似于 BroadcastJoin 的做法。但區(qū)別在于,Stage 2 的 Task 0-0 與 Task 0-1 同時獲取 Stage 1 中屬于 Partition 0 的全量數(shù)據(jù),是通過正常的 Shuffle Read 機制實現(xiàn),而非 BroadcastJoin 中的變量廣播實現(xiàn)
4.3 使用與優(yōu)化方法
開啟與調(diào)優(yōu)該特性的方法如下:
- 將 spark.sql.adaptive.skewedJoin.enabled 設(shè)置為 true 即可自動處理 Join 時數(shù)據(jù)傾斜
- spark.sql.adaptive.skewedPartitionMaxSplits 控制處理一個傾斜 Partition 的 Task 個數(shù)上限,默認(rèn)值為 5
- spark.sql.adaptive.skewedPartitionRowCountThreshold 設(shè)置了一個 Partition 被視為傾斜 Partition 的行數(shù)下限,也即行數(shù)低于該值的 Partition 不會被當(dāng)作傾斜 Partition 處理。其默認(rèn)值為 10L * 1000 * 1000 即一千萬
- spark.sql.adaptive.skewedPartitionSizeThreshold設(shè)置了一個 Partition 被視為傾斜 Partition 的大小下限,也即大小小于該值的 Partition 不會被視作傾斜 Partition。其默認(rèn)值為 64 * 1024 * 1024 也即 64MB
- spark.sql.adaptive.skewedPartitionFactor 該參數(shù)設(shè)置了傾斜因子。如果一個 Partition 的大小大于 spark.sql.adaptive.skewedPartitionSizeThreshold的同時大于各 Partition 大小中位數(shù)與該因子的乘積,或者行數(shù)大于 spark.sql.adaptive.skewedPartitionRowCountThreshold 的同時大于各 Partition 行數(shù)中位數(shù)與該因子的乘積,則它會被視為傾斜的 Partition