Spark AQE SkewedJoin 在字節(jié)跳動的實踐和優(yōu)化
精選1. 概述
本文將首先介紹 Spark AQE SkewedJoin 的基本原理以及字節(jié)跳動在使用 AQE SkewedJoin 的實踐中遇到的一些問題;其次介紹針對遇到的問題所做的相關優(yōu)化和功能增強,以及相關優(yōu)化在字節(jié)跳動的收益;此外,我們還將分享 SkewedJoin 的使用經驗。
2. 背景
首先對 Spark AQE SkewedJoin 做一個簡單的介紹。Spark Adaptive Query Execution, 簡稱 Spark AQE,總體思想是動態(tài)優(yōu)化和修改 stage 的物理執(zhí)行計劃。利用執(zhí)行結束的上游 stage 的統(tǒng)計信息(主要是數據量和記錄數),來優(yōu)化下游 stage 的物理執(zhí)行計劃。
Spark AQE 能夠在 stage 提交執(zhí)行之前,根據上游 stage 的所有 MapTask 的統(tǒng)計信息,計算得到下游每個 ReduceTask 的 shuffle 輸入,因此 Spark AQE 能夠自動發(fā)現發(fā)生數據傾斜的 Join,并且做出優(yōu)化處理,該功能就是 Spark AQE SkewedJoin。
例如 A 表 inner join B 表,并且 A 表中第 0 個 partition(A0)是一個傾斜的 partition,正常情況下,A0 會和 B 表的第 0 個 partition(B0)發(fā)生 join,由于此時 A0 傾斜,task 0 就會成為長尾 task。
SkewedJoin 在執(zhí)行 A Join B 之前,通過上游 stage 的統(tǒng)計信息,發(fā)現 partition A0 明顯超過平均值的數倍,即判斷 A Join B 發(fā)生了數據傾斜,且傾斜分區(qū)為 partition A0。Spark AQE 會將 A0 的數據拆成 N 份,使用 N 個 task 去處理該 partition,每個 task 只讀取若干個 MapTask 的 shuffle 輸出文件,如下圖所示,A0-0 只會讀取 Stage0#MapTask0 中屬于 A0 的數據。這 N 個 Task 然后都讀取 B 表 partition 0 的數據做 join。這 N 個 task 執(zhí)行的結果和 A 表的 A0 join B0 的結果是等價的。
不難看出,在這樣的處理中,B 表的 partition 0 會被讀取 N 次,雖然這增加了一定的額外成本,但是通過 N 個任務處理傾斜數據帶來的收益仍然大于這樣的成本。
Spark 從3.0 版本開始支持了 AQE SkewedJoin 功能,但是我們在實踐中發(fā)現了一些問題。
- 不準確的統(tǒng)計數據可能導致 Spark 無法識別數據傾斜。
- 切分不均勻導致優(yōu)化處理效果不理想。
- 不支持復雜場景例如同一個字段發(fā)生連續(xù) join。
我將在【優(yōu)化增強】中詳述這些問題以及我們的優(yōu)化和解決方案。
3. 優(yōu)化增強
3.1 提高數據傾斜的識別能力
由 Spark AQE 處理數據傾斜的原理不難發(fā)現,Spark AQE 識別傾斜以及切分數據傾斜的功能依賴于上游 Stage 的統(tǒng)計數據,統(tǒng)計數據越準確,傾斜的識別能力和處理能力就越高,直觀表現就是傾斜數據被拆分的非常平均,拆分后的數據大小幾乎和中位數一致,將長尾Task的影響降到最低。
MapStage 執(zhí)行結束之后,每一個 MapTask 會生成統(tǒng)計結果 MapStatus,并將其發(fā)送給 Driver。MapStatus維護了一個 Array[Long],記錄了該 MapTask 中屬于下游每一個 ReduceTask 的數據大小。當 Driver 收集到了所有的 MapTask 的MapStatu之后,就能夠計算得到每一個 ReduceTask 的輸入數據量,以及分屬于每一個上游 MapTask 的數據大小。根據每一個 ReduceTask 的數據大小,Spark AQE 能夠判斷出數據傾斜,并根據上游 MapTask 的統(tǒng)計信息,合理切分 Reducetask,盡可能保證切分的均勻性。
如下圖描述,ReduceTask0 的 ShuffleRead(shuffle 過程中讀取的數據量) 為 200,明顯大于 ReduceTask1 和 ReduceTask2 的 100,發(fā)生了數據傾斜。我們可以將 ReduceTask0 拆成 2 份,ReduceTask0-0 讀取 MapTask0 和 MapTask1 的數據,ReduceTask0-1 讀取 MapTask2 和 MapTask3 的數據,拆分后的兩個 task 的 ShuffleRead 均為 100。
我們可以看出,統(tǒng)計信息的大小的空間復雜度是 O(M*R),對于大任務而言,會占據大量的 Driver 內存,所以 Spark 原生做了限制,對于 MapTask,當下游 ReduceTask 個數大于某一閾值(spark.shuffle.minNumPartitionsToHighlyCompress?,默認 2000),就會將MapStatus進行壓縮,所有小于 spark.shuffle.accurateBlockThreshold(默認100M)的值都會被一個平均值所代替填充。
舉個例子,下圖是我們遇到的一個 SkewedJoin 沒有生效的作業(yè),從運行 metrics 來看,ShuffleRead 發(fā)生了很嚴重的傾斜,符合 SkewedJoin 生效的場景,但實際運行時并沒有生效。
通過閱讀日志,可以看到,Spark AQE 在運行時,獲取的 join 兩側的 shuffle partitions 的中位數和最大值都是一樣的,所以沒有識別到任何的傾斜。這就是由于壓縮后 MapStatus 的統(tǒng)計數據的不準確造成的。
我們在實踐中,遇到很多大作業(yè)由于統(tǒng)計數據不準確,無法識別傾斜。而當我們嘗試提高這一閾值之后,部分大作業(yè)由于 Driver 內存使用上漲而失敗,為了解決這一問題,我們做了以下優(yōu)化:
- Driver 收到詳細的 MapStatus之后,先將數據用于更新每個 ReduceTask 的累計輸入數據,然后將 MapStatus壓縮,這樣就不會占用太多內存。此時,雖然壓縮后的 MapStatus無法讓我們獲得 ReduceTask 準確的上游分布,但是能夠獲得準確的 ReduceTask 的輸入數據總大小,這樣我們就能夠準確的識別發(fā)生傾斜的 ReduceTask。
- 上述優(yōu)化增加了一次 MapStatus 的解壓操作,而 MapStatus 的解壓是一個比較耗CPU的操作,對于大作業(yè)可能出現 Driver CPU 被打滿,無法處理 Executor 心跳導致作業(yè)失敗的情況。對此,我們使用緩存保證Driver端在消費 MapStatus 時,每個 MapStatus 只會被解壓一次,大大降低了優(yōu)化帶來的 Overhead。
通過上述優(yōu)化,我們成功在線上將默認閾值從 2000 調整為 5000,保證了線上 96.6% 的 Spark 作業(yè)能夠準確的識別數據傾斜(如果存在)。
3.2 提高傾斜數據的切分均勻程度
由于 HighlyCompressMapStatus 用平均值填充所有低于 spark.shuffle.accurateBlockThreshold 的值,每個 ReduceTask 通過壓縮后的 MapStatus 累加計算得到的總數據大小和數據分布,就和實際差距很大。
舉個簡單的例子:我們得到 ReduceTask0 的實際總數據是 1G,而中位數是 100M,因此我們的期望是將 ReduceTask0 拆成 10 份,每一份是 100M。此時上游的 MapStage 一共有 100 個 MapTask,除了 MapTask0 中屬于 ReduceTask0 的數據是 100M,其他 99 個 MapStak 的數據都是 10M。當我們將所有的 MapStatus 壓縮之后,AQE 獲取的 ReduceTask0 的上游分布,就是 MapTask0 有 100M (因為大塊數據所以被保留),其他 99 個 MapTask 的數據都是 1M(在壓縮時使用平均值填充)。這時,Spark AQE 按照 100M 的期望值來切分,只會切分成兩個 ReduceTask:ReduceTask0-0(讀取MapTask0)和 ReduceTask0-1(讀取剩下99個MapTask)。
基于此,我們改進后的方法是利用精確的 ReduceTask 數據量來反推每個 MapperTask 對應的數據量,得到盡可能準確的數據分布。同樣是剛才的例子,我們已知 ReduceTask0 的實際總數據是 1G,MapStatus 壓縮的閾值是 100M,那么可以確定的是,MapTask0 關于 ReduceTask0 的數據 100M 是準確被保留的(因為大于等于閾值),而其他 99 個 MapTask 的數據都是不準確的。此時 AQE 就不會使用被壓縮的數據,而是通過 1G 的總數據反推得到其他 99 個 MapTask 中屬于 ReduceTask0 的數據是 10M,雖然同樣是存在誤差的平均值,但是相比壓縮數據,通過準確的總量反推得到的平均值會更加準確。這個時候 Spark 按照 100M 的期望值來切分,就會切成 10 個 ReduceTask,符合我們的預期。
而在實際應用中,利用新方案,AQE SkewedJoin 切分傾斜數據更加平均,優(yōu)化效果有明顯的提升。
下圖是某個傾斜處理效果不理想的作業(yè),SkewedJoin 生效后,該 Stage ShuffleReadSize 的中位數和最大值分別為 4M 和 9.9G。
經過我們的優(yōu)化后,該 Stage 的 ShuffleReadSize 的中位數和最大值分別為 149M 和 1427M,傾斜分區(qū)的切分更加均勻,該 Stage 的運行時間也由原來的 2h 降為 20m。
3.3 支持更多的場景
場景1:JoinWithAggOrWin
以下圖為例,Stage10 雖然只有一個 SortMergeJoin,但是 join 的一邊并不是 Sort+Exchange 的組合,而是存在 Aggregate 算子或者 Window 算子,因此不屬于社區(qū)實現的范圍內。
場景2:MultipleSkewedJoin
在用戶的業(yè)務邏輯中,經常出現這樣一種場景:一張表的主鍵需要連續(xù)的 join 多張表,這種場景體現在 Spark 的具體執(zhí)行上,就是連續(xù)的 join 存在于同一個 Stage 當中。如下圖所示 Stage21 中存在連續(xù)的多個 SortMergeJoin,而這種場景也是社區(qū)的實現無法優(yōu)化的。
場景3:JoinWithUnion
Stage 中有 Union 算子,且 Union 的 children 中有 SMJ。
此外,我們還支持了 ShuffleHashJoin、 BucketJoin、MultipleJoinWithAggOrWin 等更多場景。
4. 字節(jié)的實踐
上面介紹的 LAS 對 Spark AQE SkewedJoin 的優(yōu)化功能在字節(jié)跳動內部已使用 1 年左右,截止 2022年8月,優(yōu)化日均覆蓋1.8萬+ Spark 作業(yè),優(yōu)化命中作業(yè)平均性能提升 35% 左右,其中 30% 被優(yōu)化的 Spark 作業(yè)所屬于的場景是 LAS 自研支持的,大家可以通過火山引擎開通 LAS 服務并體驗這些優(yōu)化功能。
5. 用戶指南
5.1 哪些場景 AQE SkewedJoin 不支持
AQE SkewedJoin 功能并不能處理所有發(fā)生數據傾斜的 Join,這是由它的實現邏輯所決定的。
第一,如果傾斜的分區(qū)的大部分數據來自于上游的同一個 Mapper,AQE SkewedJoin 無法處理,原因是 Spark 不支持 Reduce Task 只讀取上游 Mapper 的一個 block 的部分數據。
第二,如果 Join 的發(fā)生傾斜的一側存在 Agg 或者 Window 這類有指定 requiredChildDistribution 的算子,那么 SkewedJoin 優(yōu)化無法處理,因為將分區(qū)切分會破壞 RDD 的 outputPartitioning,導致不再滿足 requiredChildDistribution。
第三,對于 Outer/Semi Join,AQE SkewedJoin 是無法處理非 Outer/Semi 側的數據傾斜。比如,對于 LeftOuter Join,SkewedJoin 無法處理右側的數據傾斜。
第四,AQE 無法處理傾斜的 BroadcastHashJoin。
5.2 AQE SkewedJoin 優(yōu)化效果不明顯時的措施
如果遇到了符合應用場景但是 SkewedJoin 沒有生效或者傾斜處理效果不理想的情況,有以下調優(yōu)手段:
- 提高spark.shuffle.minNumPartitionsToHighlyCompress,保證值大于等于 shuffle 并發(fā)(當開啟 AQE 時,即為spark.sql.adaptive.coalescePartitions.initialPartitionNum)。
- 調小spark.shuffle.accurateBlockThreshold,比如 4M。但是需要注意的是,這會增加 Driver 的內存消耗,需要同步增加 Driver 的 cpu 和內存。
- 降低spark.sql.adaptive.skewJoin.skewedPartitionFactor,降低定義發(fā)生傾斜的閾值。
6. 總結
本文首先簡單介紹了 Spark AQE 的基本思想以及 SkewedJoin 功能的原理,接著提出了我們在應用 SkewedJoin的過程中遇到的一些問題。針對這些問題,我們介紹了對 AQE SkewedJoin 做的優(yōu)化和增強——提高統(tǒng)計的準確度;提高傾斜數據的切分均勻程度;支持了更多的場景。接著,本文介紹了 AQE SkewedJoin 在字節(jié)跳動的使用情況,包括日均優(yōu)化覆蓋作業(yè)和優(yōu)化效果,其中30%被優(yōu)化的 Spark 作業(yè)所屬于的場景是字節(jié)自研支持的。最后分享了我們關于 AQE SkewedJoin 的用戶指南:哪些場景 AQE SkewedJoin 不支持;當 AQE SkewedJoin 效果不明顯時,可以采取哪些措施。
7. 附錄A :本文涉及的關于 AQE SkewedJoin 優(yōu)化的相關參數配置
參數配置名 | 默認值 | 參數意義 |
spark.shuffle.minNumPartitionsToHighlyCompress | 2000 | 決定 Mapstatus 使用 HighlyCompressedMapStatus還是 CompressedMapStatus 的閾值,如果 huffle partition 大于該值,則使用 HighlyCompressedMapStatus。 |
spark.shuffle.accurateBlockThreshold | 100M | HighlyCompressedMapStatus 中記錄 shuffle blcok 準確大小的閾值,當 block 小于該值則用平均值代替。 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 10 | 如果一個 partition 大于該因子乘以分區(qū)大小的中位數,那么它就是傾斜的 partition。 |