Blaze RangePartitioning 算子 Native 實現(xiàn)全解析 原創(chuàng)
引言:本文將全面且深入地解析 Blaze RangePartitioning 算子的 Native 實現(xiàn)過程。相較于原生 Spark,RangePartitioning 的 Native 實現(xiàn)在執(zhí)行時間上達到了 30%的顯著下降,同時在資源開銷方面節(jié)省了高達 76%。這一改進大幅降低了運行成本,展現(xiàn)了 Native 實現(xiàn)帶來的巨大優(yōu)勢。
一、算子描述
RangePartitioning 是 shuffle partitioning 的一種分區(qū)類型。它通過根據(jù)數(shù)據(jù)的值范圍將數(shù)據(jù)劃分成多個分區(qū)。每個分區(qū)包含特定范圍內(nèi)的值,通常用于處理有序的數(shù)據(jù)集,能夠根據(jù)數(shù)據(jù)值進行動態(tài)劃分。
RangePartitioning 的基本思想是:先對數(shù)據(jù)采樣找到劃分標志 bounds,根據(jù) bounds 將數(shù)據(jù)劃分成多個近似大小的區(qū)間,然后將數(shù)據(jù)按所屬區(qū)間寫入對應 partition,用于 order by 全排序場景。
二、實現(xiàn)方案
RangePartitioning 實現(xiàn)主要包含采樣和 partition 劃分兩個部分。
步驟一:首先需要獲取每個 partition 對應的區(qū)間劃分范圍 bounds,所以會先對全量數(shù)據(jù)進行采樣,算出 partitionNum - 1 個區(qū)間分割點 bounds。具體流程如下:
1、在 driver 端基于 InternalRow 進行數(shù)據(jù)采樣:
- 通過 spark.sql.execution.rangeExchange.sampleSizePerPartition 參數(shù)控制每個分區(qū)平均采樣數(shù)量,設置一個稍微過采樣一點的采樣數(shù) sampleSizePerPartition。
- 對每個分區(qū)采用蓄水池采樣(Reservoir Sampling)算法進行采樣。
- 對采樣結果評估,記錄采樣不均衡的分區(qū)重新采樣(某個分區(qū)數(shù)據(jù)量過多,按照 sampleSizePerPartition 均值采樣會出現(xiàn)樣本數(shù)少于實際應采樣數(shù)量,即采樣不均衡的情況)。
- 計算每個樣本的權重 weight,通過 sumWeights/numReducer = step 找到每個邊界的步長,類似于直方圖劃分邊界找出 numReducer-1 個分割點 bounds。
2、由于采樣數(shù)據(jù)量可能不足導致 bounds 較少,需要重新設置 partitionNum=bounds.len + 1。因此會出現(xiàn) RangePartitioning 的實際 partition num 與設置數(shù)量不同的情況。
3、定義 rangepartition 的序列化方式,主要包括三個參數(shù):SortExpr、numPartitions、Bounds。進而轉成 native 算子進行后續(xù)處理。
步驟二:在 native 端需要再計算一次全量數(shù)據(jù),將數(shù)據(jù)按分割點 bounds 寫入對應的 partition。具體流程如下:
1、將 bounds 和 input 數(shù)據(jù)都轉成可直接比較的 arrow-row 類型。
2、針對每個 batch,對將數(shù)據(jù)與 bounds 進行比較并確定所在 partition id:
- 如果 bounds.len<=128,直接進行比較。
- 如果 bounds.len>128,進行二分查找提速。
三、優(yōu)化效果
通過構造 sql 語句測試加速效果:
sql 測試例子
11.8GB 數(shù)據(jù)量:
insert overwrite table blaze_t.like_lineitem select * from tpch_parquet_1000.lineitem order by l_quantity
復制代碼
實現(xiàn) Native RangePartitioning
執(zhí)行計劃:
sql 時間 1073.516 s
Stage Total Time Across All Tasks: 8.9h
沒有實現(xiàn) Native RangePartitioning,會回退到 spark 的 RangePartitioning
sql 時間 1357.814 s
Stage Total Time Across All Tasks 38.1h
多個不同 sql 測試取均值
Stage 時間提升:76.94%
四、總結
- 多次測試取均值,RangePartitioning 實現(xiàn) native 相比舊版執(zhí)行時間下降 30%,資源開銷節(jié)約 70%
- 由于采樣結果可能較少導致 bounds 小于 partition num-1,RangePartitioning 可能實際執(zhí)行的 partition num 與設置不同。
