Blaze:快手自研 Spark 向量化引擎從生產(chǎn)實踐到社區(qū)開源
一、關(guān)于向量化的介紹
1. 向量化是什么?
首先需要申明的是,這里的“向量化”并不是機器學(xué)習(xí)領(lǐng)域里的“向量化”,而是特指在大數(shù)據(jù)計算引擎里的一種技術(shù)。
那么這里的向量化(Vectorization)是什么呢?類比于上圖中生產(chǎn)化學(xué)藥劑的流水線,傳統(tǒng)做法是每次拿一個空瓶子,做罐裝,再蓋蓋子,送走,然后下一個瓶子;而一個高級版的生產(chǎn)線,每次可以灌裝十幾個、上百個空瓶子,灌裝完成后,并行發(fā)送到下一個流水線,然后統(tǒng)一把瓶子蓋上,處理速度會大幅提升。大數(shù)據(jù)計算引擎中的向量化也是類似的,通過硬件上的并行計算,一次性處理多條數(shù)據(jù),可以實現(xiàn)非常高效的計算。
2. 列式存儲
提到向量化,不得不提到另一個名詞叫做:列式存儲,指的是數(shù)據(jù)存儲在磁盤上的一種形式。
以上圖為例,一個表有 3 個字段,對應(yīng)到存儲就是3列多行。在通常的關(guān)系型數(shù)據(jù)庫中,會逐行存儲,如圖中的 Row layout 所示,第一列是一個整數(shù),第二列是一個字符串,第三列是一個浮點數(shù),按行存儲結(jié)構(gòu)交叉混亂。而將數(shù)據(jù)以列的形式存儲,先存第一列,存好之后再存第二列,再存第三列,如圖中的 Column layout 所示,就會是一個非常整齊的結(jié)構(gòu)。
列式存儲的優(yōu)勢在于:
- 更高的壓縮率。結(jié)構(gòu)相近的數(shù)據(jù)存放在一起,壓縮比更高。
- 更高效讀取部分數(shù)據(jù)。通常我們在讀某張表的時候,不會一次性讀取所有列,而是只會讀其中的某幾列,如果數(shù)據(jù)按列式存儲,讀的實現(xiàn)會更簡單,要讀哪一列就直接去讀那一列即可。
- 更適合向量化計算。在大數(shù)據(jù)領(lǐng)域,主要的數(shù)據(jù)格式基本都是列式存儲的,如 Parquet、ORC 等都是常用的列式存儲格式。
3. 向量化計算
前文中提到,傳統(tǒng)計算是基于行的,就像灌裝藥劑時一瓶一瓶地處理。而向量化計算是基于列的,每次可以把一整列都加載到內(nèi)存中進行向量計算,性能自然會更優(yōu),因此更適合于數(shù)據(jù)量大、計算復(fù)雜度高的場景。
另外一大優(yōu)勢是內(nèi)存局部性,包括兩種:數(shù)據(jù)局部性和代碼局部性。數(shù)據(jù)局部性如上圖所示,在計算的時候連續(xù)地訪問,現(xiàn)代的 CPU 都會使用一些 Cache,當(dāng)連續(xù)訪問時,對 cache 的利用率是非常高的。代碼局部性,涉及到計算邏輯,如果是行式計算,會先算第一行的第一個值,第二個值,比如先算一個整數(shù)的加法,再算一個字符串操作,再算一個浮點數(shù),那么在代碼里面跳轉(zhuǎn)是比較多的。而如果基于列,連續(xù)多個加法一起算,算好第一行再算第二行,連續(xù)很多個字符串操作,那么在代碼里面的跳轉(zhuǎn)也是非常緊湊的?,F(xiàn)代的 CPU 在執(zhí)行代碼時也會將代碼加載到緩存里面,同樣也會存在代碼的局部性。在列式計算的場景下,兩種局部性都是更優(yōu)的。
硬件支持,主要是 SIMD 指令、GPU 和 DPU 這些硬件的支持。這些硬件有一個常見的設(shè)計,就是希望通過做一些簡單的計算來把吞吐量做更大,這一點上,列式計算其實就對現(xiàn)代硬件設(shè)計表現(xiàn)得更具親和性。
4. SIMD 指令(Single Instruction,Multiple Data)
再來講一下 SIMD 指令。在現(xiàn)代 CPU 上,通過一條指令可以計算多條數(shù)據(jù),比如一次算 4 個數(shù)的加法,這樣比傳統(tǒng)指令一次算一條會更快。近 10 年推出的 CPU 基本上都支持 SIMD 指令。
5. 向量化在 SQL 引擎中的使用
將行式計算變?yōu)榱惺接嬎?,?SQL 里面就是把每次處理一行改成每次處理一個批次,即 Record Batch。一個批次里面可能包含著成千上萬的行,并且在內(nèi)存中是一個列式的組織。計算時,Batch 在 SQL 算子之間傳遞,可以減少算子調(diào)用的開銷,每一次調(diào)用都可以計算成千上萬行。
另外,可以充分利用 SIMD 指令去加速計算,現(xiàn)在流行的 SQL 引擎,如 ClickHouse、Doris、TiDB、DuckDB 等等都已支持向量化計算。
二、Apache Spark 與向量化
1. Apache Spark 介紹
Spark 是目前大數(shù)據(jù)場景下最常用的分布式數(shù)據(jù)引擎之一,廣泛應(yīng)用于 ETL、數(shù)倉建設(shè)、報表分析、機器學(xué)習(xí)等領(lǐng)域。在快手數(shù)據(jù)平臺上,絕大多數(shù)例行作業(yè)為 Spark SQL 作業(yè)。目前每天例行計算有數(shù)十萬個 SQL,處理的數(shù)據(jù)量已經(jīng)達到 EB 級別,每天計算資源開銷會有數(shù)百萬 CU,年化資源開銷超億元。
2. Apache Spark 工作方式
這里簡單介紹一下 Spark SQL 計算的工作方式。首先 SQL 進來會做一些詞法語法的處理,然后做一些優(yōu)化,生成執(zhí)行計劃,其中包括一個個算子,這些算子在執(zhí)行的時候會最終轉(zhuǎn)化為 Spark 的 RDD 去運行。這里特別標明了 RDD 是基于 Internal Row 的,即行式計算,是沒有向量化的。
3. 為什么要研究 Spark+向量化
那么我們?yōu)槭裁匆パ芯?Spark+向量化呢?剛才講到 Spark 是基于行的,沒有做向量化,所以在計算上的開銷非常大,如果能夠把向量化技術(shù)運用到 Spark 上,就可以獲得很大的性能提升和資源上的收益。這里簡單做一個 Spark 行式計算和向量化計算的對比:
- 數(shù)據(jù)讀寫:因為大數(shù)據(jù)存儲一般都是 Parquet、ORC 等列存的格式,那么向量化計算天然就可以比行式計算減少一次列到行的轉(zhuǎn)換。
- 計算模型:基于火山模型,并使用 WholeStageCodeGen 技術(shù)優(yōu)化,這里不做詳細介紹。
- SIMD 特性:向量化對于 SIMD 特性是天然支持的,而行式計算就比較難支持。
- Native 支持:因為 Spark 是用 Java 實現(xiàn)的,是跑在 JVM 上的,如果用向量化技術(shù)重新去實現(xiàn)集成層,可以用一些 native 的語言,比如 C++、Rust,這些語言會比 Java 運行速度更快,因此會帶來一些性能上的收益。
- 硬件支持:除了 SIMD 之外,也可以用到 GPU、DPU。最近比較火的技術(shù),就是把大數(shù)據(jù)計算的一些邏輯用硬件實現(xiàn),那么在向量化下它也是能夠更好的支持。
4. Spark + 向量化在行業(yè)內(nèi)的探索
再來介紹一下業(yè)內(nèi)關(guān)于 Spark+向量化的一些探索。目前業(yè)內(nèi)的向量化技術(shù)解決方案都是通過 Spark 本身的插件機制,把 Spark 算子翻譯成用向量化實現(xiàn)的功能等價的算子。
- 業(yè)界最先開始相關(guān)工作的是 Databricks,他們在 2019 年就開始做 Photon 引擎,目前已商業(yè)化。
- 百度近期也公開了一個 Spark native 的引擎,基于 C++,可以把 Spark 算子轉(zhuǎn)成可以 ClickHouse 的算子,目前正在商業(yè)化試用階段。
- 開源領(lǐng)域名氣比較大的就是 Gluten,是由英特爾和 Kyligence 主導(dǎo)的,與 Facebook 共建的一個項目。它底層也是 C++,有 Facebook Velox 和 ClickHouse 兩個可選的引擎。
- 接下來是快手自研的 Blaze,其底層技術(shù)實現(xiàn)采用的是 Rust,是基于 Apache 的 DataFusion 引擎開發(fā)的。目前在快手內(nèi)部處于大規(guī)模并開源。
五、Blaze 引擎
1. Blaze 引擎是什么?
接下來介紹 Blaze 引擎。它是快手自研的,基于向量化技術(shù)開發(fā)的一套 Native 執(zhí)行引擎,可以充分利用 Native 代碼和 SIMD 指令向量化優(yōu)勢,以實現(xiàn)減少資源開銷、加速執(zhí)行的目的。在公司內(nèi)部已有大規(guī)模的應(yīng)用,對公司降本增效起了很大作用。
簡而言之,只要給 Spark 裝上 Blaze 引擎,就可以在用戶零感知的情況下提升 SQL 的執(zhí)行效率,并極大地減少 SQL 運行的資源開銷。
2. 我們?yōu)槭裁匆?Blaze 引擎?
最初我們做 Blaze 的愿景其實就是降本增效。整個項目于 2021 年底開始調(diào)研,2022 年立項開發(fā)。當(dāng)時也是受到經(jīng)濟環(huán)境的影響,對降本增效的需求比較迫切。項目需要滿足以下幾點要求:
- 正確:系統(tǒng)必須保證計算作業(yè)的正確執(zhí)行,計算結(jié)果與原生 Spark 一致,這是做數(shù)據(jù)的底線。
- 高效:系統(tǒng)能夠?qū)崿F(xiàn)較大的性能提升。如果提升的收益太小,連投入的成本都達不到,就沒有意義了。這也是為什么我們要做底層向量化,而不是在 Spark 自身做開發(fā)的原因了。因為 Spark 本身已經(jīng)開源很多年了,很難將其性能大幅提升。
- 易用:快手內(nèi)部運行的 SQL 非常多,如果一個系統(tǒng)需要用戶做很多調(diào)整,比如改 SQL 或是加一些很復(fù)雜的配置,那么即使新系統(tǒng)能帶來性能提升,整個項目的成本也會非常高,是難以接受的。因此我們希望這套系統(tǒng)對用戶來說是透明的,當(dāng)加上這一系統(tǒng)之后,SQL 可以跑得更快,用的資源更少,并且是無感知。
為什么沒有用已有的開源方案?其實也是與時間點相關(guān),目前做得比較好的開源系統(tǒng) Gluten,在時間點上與我們的項目是重合的,所以當(dāng)時并沒有現(xiàn)成的開源方案可以借鑒。
3. 項目發(fā)展歷程
這里簡單介紹一下整個項目的發(fā)展歷程。
第一階段為“POC 階段“。我們從 2022 年初開始開發(fā),用了三個月的時間做了第一個 POC,跑通了一個簡單 SQL 的用例,驗證了我們這套理論的可行性。
第二階段為“原型版本“階段。也是用三個月的時間,實現(xiàn)了最常用的一些算子,跑通了 TPC-DS 基準測試的所有用例。
第三階段為“生產(chǎn)環(huán)境可用“階段。這個階段持續(xù)了近一年,主要工作是持續(xù)提升表達式和算子的覆蓋度和性能,并且去做一些實際生產(chǎn)環(huán)境的適配,比如支持 UDF、內(nèi)存管理等場景。經(jīng)過近一年的迭代,跑通了線上大多數(shù)作業(yè) SQL,基本達到了生產(chǎn)環(huán)境可用的狀態(tài)。
第四階段為“線上灰度&開源”階段。從 2023 年 4 月一直到現(xiàn)在,持續(xù)放量,并且通過一個雙跑工具來驗證結(jié)果,以保證改造前后計算結(jié)果是一致的。經(jīng)過雙跑之后,加大灰度規(guī)模,同時對 bad case 持續(xù)迭代優(yōu)化。最近,我們也開始做 Blaze 項目的開源和社區(qū)的建設(shè)。
4. Blaze 引擎是如何工作的?
下面介紹 Blaze 引擎的工作原理。上圖中展示了原生 Spark SQL 架構(gòu),從上往下來看,整個架構(gòu)可以分成三層: 前端(Spark Catalyst)、后端(Spark Tungsten)和執(zhí)行層(Spark Core)。
前端主要是負責(zé) SQL 的詞法、語法解析優(yōu)化,然后生成執(zhí)行計劃;后端負責(zé)實現(xiàn)執(zhí)行計劃具體的執(zhí)行邏輯;執(zhí)行層就是對后端的執(zhí)行邏輯去做資源的分配調(diào)度,使用分布式資源完成計算。
5. Blaze 架構(gòu)+Spark SQL
Blaze 利用了 Spark 插件機制,在 Spark 原生架構(gòu)的后端去做改造。當(dāng)前端把執(zhí)行計劃生成好之后,Blaze 會插入一段翻譯邏輯,如果在 Spark 執(zhí)行計劃中的算子能使用 native 向量化算子去做等價替換的話,就去做翻譯,把 Spark 算子翻譯成我的 native 算子,接著通過一個 native 引擎編譯成一個動態(tài)鏈接庫,一個 .so 文件打包到 Java 里面去。在執(zhí)行的時候,它就會把這一套東西發(fā)送到執(zhí)行層,然后使用向量化的邏輯,將翻譯后的執(zhí)行計劃執(zhí)行完成。
6. Blaze 架構(gòu)之 Native Engine 架構(gòu)
下面介紹一下 Native 引擎生成的 .so 文件,其中是一些與 Spark 算子相等價的使用向量化計算的算子。對于這些算子,早期我們完全復(fù)用了 Apache DataFusion 里面的算子,但后來發(fā)現(xiàn),因為其不是專門面向 Spark 開發(fā)的,在某些場景會有局限性,所以重寫了這些算子,使其更適用于 Spark 的場景。
除此之外,我們還對一些公共的模塊進行了重寫,包括內(nèi)存管理、UDF 框架,以及對外部的 IO,如訪問 HDFS、讀 Broadcast,與 Shuffle Service 對接等模塊。
7. Benchmark
上圖中展示了部分測試結(jié)果。
目前 Blaze 已經(jīng)支持了 Spark 3.0-3.5 各版本,均跑通了 TPC-DS 和 TPC-H 測試集。我們專門針對 TPC-H 做了一些優(yōu)化,比如強制使用 Hash Join。第一個圖就是使用了針對性優(yōu)化的測試結(jié)果,相比 Spark3.3,性能提升了近 300%。這種為了測試 Benchmark 而進行的調(diào)優(yōu),其實對生產(chǎn)的意義并不大,得出來的結(jié)果也只是為了跟同類產(chǎn)品做比較。
第二個圖是在實際生產(chǎn)環(huán)境上測試的 Benchmark。我們?nèi)サ袅硕ㄖ频膬?yōu)化,完全使用真實的生產(chǎn)參數(shù)。在這個環(huán)境下再和原生 Spark3.5 做對比,測試結(jié)果顯示,執(zhí)行效率提升了 220%,同時資源開銷也下降了一半以上。
四、從 Benchmark 到實戰(zhàn) Blaze 落地生產(chǎn)環(huán)境的挑戰(zhàn)
1. Benchmark 與生產(chǎn)環(huán)境的區(qū)別
在這一章節(jié)中將介紹 Blaze 是如何落地到快手生產(chǎn)環(huán)境中的。首先需要再次指出,盡管我們很早就跑通了 TPC-DS、TPC-H 這些測試集,但是要從 Benchmark 應(yīng)用到生產(chǎn)環(huán)境,其實還有很多工作要做。
- 輸入數(shù)據(jù)方面:在生產(chǎn)環(huán)境中,我們會面臨各種復(fù)雜的數(shù)據(jù)類型,并且文件格式也可能是 parquet 的各種版本,甚至?xí)恍┊惓?shù)據(jù)。
- 計算邏輯方面:用戶寫的 SQL 各種各樣,可能有成千上萬行,還會包括一些 UDF。
- 配置方面:快手的數(shù)據(jù)平臺允許用戶自定義配置,內(nèi)存大小不一,可能有多種 Spark 參數(shù)。
- 執(zhí)行環(huán)境方面:我們使用的 Hadoop 是內(nèi)部修改過的,一些 Shuffle Service 也是內(nèi)部自己開發(fā)的,沒有直接使用開源的。
- 上線要求方面:上線到生產(chǎn)環(huán)境需要保證數(shù)據(jù)完全一致,并且對用戶無感知。
接下來,將介紹我們?yōu)樯a(chǎn)環(huán)境做的一些開發(fā)和優(yōu)化。
2. 適應(yīng)非標準環(huán)境的存儲系統(tǒng)
快手使用的 HDFS 是經(jīng)過內(nèi)部改造的,對一些開源的客戶端是不適用的,比如說現(xiàn)在 native 實踐在訪問 HDFS 的時候,使用的都是 libhdfs3 庫,但是在快手內(nèi)部,因為我們是修改過的,這個庫沒辦法直接去用。所以為了解決這個問題以及方便后期適配更多的文件系統(tǒng),我們把訪問 IO 改成了直接使用 JNI。這樣就可以完全兼容 Spark 支持的所有存儲系統(tǒng),并且這些關(guān)于存儲系統(tǒng)的配置在以后可以直接復(fù)用。如此對生產(chǎn)環(huán)境的適用就更加泛化了。
3. 支持用戶 UDF、細粒度回退
第二個優(yōu)化是對用戶 UDF 的支持。因為 Spark 的 UDF 是用 Java 寫的,沒有辦法走 native 執(zhí)行。主流的向量化引擎,像 Photon 或者 Gluten 都需要對算子去做回退,也就是當(dāng)算子里面有不支持的表達式,這個算子無法翻譯到 native 去執(zhí)行時,就需要把這個算子放回 Spark 去執(zhí)行。這里的回退就會涉及到一個列轉(zhuǎn)行的操作,因為我們的數(shù)據(jù)在向量化這邊是列式存儲,到了 Spark 里面要轉(zhuǎn)成行才能去計算,而列轉(zhuǎn)行的開銷是非常大的,如果線上用的 UDF 比較多,就會有頻繁的列轉(zhuǎn)行,那么優(yōu)化效果就沒有了,甚至可能就退化了。
所以我們做了一個優(yōu)化,盡量把回退的力度做到最小。比如查 100 個字段,有 1 個 UDF 計算,那么只回退 UDF 的參數(shù),將參數(shù)轉(zhuǎn)回到 Spark,在 Spark 把 UDF 算好,再把結(jié)果轉(zhuǎn)成列,傳到 native 去參與后續(xù)的計算,這樣就可以使行列互轉(zhuǎn)的粒度最小。比如一些 UDF 只有一個參數(shù),那么我們甚至不用做列轉(zhuǎn)行,直接把這個參數(shù)通過 FFI,甚至不需要內(nèi)存拷貝,直接放回到 Spark 去計算。這樣就能夠支持很多線上 UDF 的場景。
4. 小內(nèi)存場景
再來講一下對小內(nèi)存的支持。
在快手內(nèi)部,默認的 Spark SQL 作業(yè)的內(nèi)存配置是比較小的,可能每個 Execute 上就只有幾 GB 的內(nèi)存,并且在 native 代碼里面,由于 JVM 的限制,它只能直接運行在堆外內(nèi)存,是一個特別小的內(nèi)存。為了在這種小內(nèi)存的場景下也能夠用起來,盡量減少用戶去改配置的成本,我們提供了對小內(nèi)存的支持,做了一個多級的內(nèi)存管理。
因為我們知道 Spark 在計算一些如排序聚合這樣的算子的時候,它需要把這個數(shù)據(jù)暫存到內(nèi)存,這種算子是特別吃內(nèi)存的。針對這個問題,我們做了一個多級的 Spill 管理。當(dāng)數(shù)據(jù)占滿了堆外內(nèi)存之后,不是直接去做磁盤溢寫,而是先去檢查堆內(nèi)內(nèi)存是不是還有空間。因為 native 是跑在堆外內(nèi)存,一般堆內(nèi)內(nèi)存它是比較空閑的。我們嘗試把數(shù)據(jù)做一個輕量的壓縮,然后暫存到堆內(nèi)內(nèi)存,這樣可以把 Spark 堆外堆內(nèi)內(nèi)存都充分地利用起來,最終的效果就是即使用戶默認的內(nèi)存配置很小,即便不修改內(nèi)存配置,也能夠有一個很好的優(yōu)化效果。因為我們 native 的代碼是用 C++ 和 Rust 來實現(xiàn)的,它用的內(nèi)存可能比 JVM 要小,甚至在小內(nèi)存下可能跑的比 Spark 默認還要更穩(wěn)定。
5. 針對性優(yōu)化:JSON 解析場景
下面要介紹的是對 JSON 解析的優(yōu)化。
在使用 Spark 做 ET L的時候,經(jīng)常會碰到這樣的場景:有一個特別大的 JSON 字段,需要從字段里面去解析出幾十個 key 出來。這種場景在快手有很多,在這種場景下 Spark 的實現(xiàn)效率比較低,每次解析一個 key,都需要去把字段的 JSON 重新 parse 一下。這里做了一個簡單的 Benchmark,就是解析 1 個字段到解析 5 個字段,可以看到藍色的是 Spark3.5,其開銷增長基本上是線性的。
在 Blaze 里面,我們專門針對這種場景進行了優(yōu)化。在計算的時候,去識別每個表達式是不是有公共的部分,我們發(fā)現(xiàn)解析 JSON 時,它解析的某一部分其實是可以公共的。解析同一個字段,可以取多個 key 的值,這樣就能夠減少重復(fù)解析字段的成本,圖中橙色柱狀是 Blaze 的開銷,可以看到,在做了對重復(fù)解析的優(yōu)化之后,不管解析幾個字段,其開銷基本上持平的,不再是線性增長的情況。
6. 灰度方案
下面介紹一下 Blaze 系統(tǒng)的上線過程,也就是我們的灰度方案。由于數(shù)據(jù)正確性是底線,所以我們開始上線的時候,需要去做嚴格的雙跑對比來確保數(shù)據(jù)是準確的。具體來說,我們會選取用戶的單個時間分區(qū)內(nèi)的一個 SQL,然后將其寫表這部分邏輯去掉,替換成一個校驗邏輯,會對每條數(shù)據(jù)算一個哈希值,然后做一個求和,然后把數(shù)據(jù)的條數(shù)和其哈希值都存下來。同樣的數(shù)據(jù),用 Spark 跑一次,用 Blaze 跑一次,將結(jié)果進行嚴格的一致性對比,并且還要檢驗性能,保證在 Blaze 里的執(zhí)行性能優(yōu)于 Spark,同時資源開銷更小。只有這些完全通過后,才會真正上線。這就是我們早期的初步灰度方案。
后來,隨著 Bad Case 不斷修復(fù),我們對整個系統(tǒng)更加有信心,并且我們希望加快整體進度,所以后面到了大規(guī)模上線階段,我們會引入幾個指標,首先判斷一個作業(yè)是否是核心作業(yè),并對其復(fù)雜程度做一個標識。如果一個作業(yè)不是核心作業(yè),并且比較簡單,我們就考慮將嚴格雙跑改成抽樣雙跑,可能用戶的每個分區(qū)每個表只取其中的一個小文件,用以對比計算的正確性,對于性能可以不做考慮,只要滿足抽樣通過,就可以直接上線。
當(dāng)然,對于核心作業(yè)或者是邏輯較為復(fù)雜的作業(yè),還是要通過嚴格對比才能上線。
7. 上線效果與最終目標
下面從幾個方面來分享上線效果。
- 在資源使用方面,因為快手內(nèi)部資源比較緊張,所以作業(yè)的執(zhí)行時間波動會比較大,這里我們主要考慮資源開銷。目前 Blaze 引擎在快手已覆蓋近一半的例行作業(yè),每天使用的資源開銷占據(jù)整個集群總量的 30%(這里可以看到一些優(yōu)化的效果,本來這部分作業(yè)占的資源達到了 40-50%,切換到 Blaze 之后,開銷下降到了 30%)。
- 上線作業(yè) native 算子占比達到了 93%,剩下尚未實現(xiàn)的部分,一個是 UDAF 用戶自定義的聚合函數(shù),這塊目前還在調(diào)研中,還沒有找到一個很好的辦法去做比如單 UDAF 的回退,另外還有一些用戶自定義的窗口函數(shù)還沒有支持到,可能還有少部分的算子是要回退的。
- 資源開銷方面,我們將上線前 7 天和上線后 7 天的平均資源開銷進行了對比,平均降低 30%,比如上線前可能要跑 10 分鐘,上線后僅需 7 分鐘,那么資源開銷就能夠下降 30%。節(jié)約的資源開銷折算的年化收益已達到數(shù)千萬。
我們最終的目標就是希望快手數(shù)據(jù)平臺的所有 SQL 作業(yè)都默認打開 Blaze。
五、Blaze 開源計劃
1. 為什么要將 Blaze 開源?
我們希望通過開源社區(qū)能夠讓 Blaze 項目有更長遠的發(fā)展。所以我們最近也在做開源社區(qū)的建設(shè),希望借助社區(qū)力量,一起把 Blaze 引擎優(yōu)化效果用起來,同時進一步提升我們的影響力和技術(shù)水平。
2. 當(dāng)前進展
這里是目前我們在社區(qū)取得的一些進展:
- 首先整個項目的構(gòu)建,在經(jīng)過社區(qū)很多同學(xué)的優(yōu)化之后,已經(jīng)逐漸健壯起來。目前項目的構(gòu)建過程也變得相當(dāng)簡單,只要在 GitHub 上提交了代碼,它就會在 GitHub 上直接編譯出一個可用的包,并且可以在 GitHub 上去跑一個小規(guī)模的 TPC-DS 驗證。整個編譯是非常簡單的,歡迎大家試用。
- 另外,對 Spark 多版本提供了支持。目前已覆蓋 Spark 3.0~3.5 版本。
- 第三是對 ORC 格式的支持。因為快手內(nèi)部用的都是 Parquet,原本對 ORC 是沒有支持的,經(jīng)過社區(qū)的運營,有熱心的同學(xué)加上了對 ORC 的支持。
- 最后是我們與阿里的合作,對 Apache Celeborn 提供了支持。Celeborn 是阿里研發(fā)的一個 Shuffle Service 服務(wù),也是由阿里那邊去做開發(fā),完成了 Blaze 對 Celeborn 的支持。
- 當(dāng)前社區(qū)項目已經(jīng)有 1.3k star,有多家公司試用并取得預(yù)期收益。
3. 未來規(guī)劃
未來主要工作包括以下幾個方向:
- 首先,我們的科研重點還是 Blaze 的生態(tài)圈。比較緊迫的是數(shù)據(jù)湖方面,會考慮對 Hudi 或者 Iceberg 這些數(shù)據(jù)湖引擎的支持。在 Shuffle Service 方面,我們已經(jīng)支持了阿里的 Celeborn,后續(xù)也會提供對騰訊 Uniffle 的支持。目前也在調(diào)研,是否能夠把我們的 native 引擎集成到 Flink 上去。
- 第二塊是多版本的支持?,F(xiàn)在 Spark 4 即將推出,我們會保持對 Spark 版本的跟進。
- 第三是性能優(yōu)化,這始終是最核心的工作。
- 最后是提升項目的社區(qū)影響力,我們也有計劃把整個項目加到 Apache 中去,目前正在推進中。
Github地址:https://github.com/kwai/blaze
以上是 Blaze 項目的開源地址,歡迎大家關(guān)注并試用(點擊閱讀原文可直接跳轉(zhuǎn)),也歡迎大家加入我們的技術(shù)交流群,謝謝大家!
六、Q&A
Q1:前面講到的采樣和雙跑,是要對 SQL 進行改造,還是 Blaze 自帶的功能?
A1:這個是我們做在 Spark 里面的一個功能,在讀表的時候,表里面可能有很多文件,但我們采樣只用讀一個就行,目的是驗證它執(zhí)行的邏輯是否正確,如果正確就不用去讀全表。對性能也是有信心的,所以采樣時就不用管。這塊對于 SQL 的改造是 Blaze 自帶的。
Q2:灰度測試以及后面的引擎切換是需要手動切換嗎?
A2:這塊切換是完全在引擎上去做的,只需要改一下參數(shù)配置。這塊對于用戶來說是不可見的,用戶關(guān)注的只是我們執(zhí)行的效率和結(jié)果。當(dāng)然我們會做一些對用戶的通知,告知作業(yè)已切換到 Blaze。
Q3:分享中提到,相比原生的 Spark3.3 和 Spark3.5 的速度,兩次 Benchmark 分別提升 300% 和 220%,速度提升意味著它的時效也會提升,對吧?比如計算時間縮短,只要原來的 1/3 了?
A3:是的。如果在資源的配置不變的條件下,那么假設(shè)原來執(zhí)行 10 分鐘,那么切換到 Blaze 之后,執(zhí)行可能就只要 5 分鐘了,這里就會有一個時效性的提升,資源開銷就下降了一半。
Q4:切換到 Blaze 是有一個開關(guān)一樣的配置是吧?那什么樣的任務(wù)才能去切換,比如基于什么樣的規(guī)則,還是通過人工去篩選?
A4:目前在我們大規(guī)?;叶葴y試的話,其實依據(jù)是我們對于歷史任務(wù)的一個分析。例如我們在記錄例行任務(wù)的時候會加上一些標識信息,就可以知道它的任務(wù)的核心程度、核心等級和復(fù)雜程度和它的子算子等信息。通過這些信息,我們可以去判斷測試用例是否需要雙跑,方式是采樣還是全量,然后分別去做這些驗證,最后在我們引擎這邊把它的作業(yè)加到灰度,這些對于用戶其實也還是沒有感知的。
Q5:如果大規(guī)模的任務(wù)做了灰度切換之后,怎么去保障它的計算結(jié)果是準確的?有沒有好的方法去驗證?
A5:首先剛才講到,我們在切換之前有一個雙跑,如果雙跑通過,那么我們就認為其沒有問題了。當(dāng)然也有過比較極端的 case,特別是 JSON 解析這一塊,因為我們用的是不同的解析庫,它其實會有一些問題,比如有些不規(guī)范的 JSON,里面有一些特殊字符,例如有一些表情包之類在里面,我們已經(jīng)踩過了很多坑,所以現(xiàn)在有充足的信心,如果雙跑通過,就能夠保證后面的數(shù)據(jù)是對的。
Q6:介紹中提到的都是 Spark SQL 的一些案例遷移到Blaze引擎去執(zhí)行。如果是用 Java 或 Scala 寫的那種算子,就是 Jar 包類型的任務(wù),或者是通過 PySpark 去實現(xiàn)的任務(wù),也能應(yīng)用到這個引擎嗎?
A6:目前可以對純 SQL 任務(wù)或者 Spark Jar、PySpark 任務(wù)中的 SQL 部分做優(yōu)化,我們還是對 SQL 生成的算子支持的比較好,因為 SQL 是一些比較標準邏輯的,我們只要去遵守標準即可。但如果是用戶自己實現(xiàn)的一些 RDD 的操作,這塊目前還是做不到。
Q7:在 Spark 切換的時候,Spark 用戶經(jīng)常有很多 UDF 代碼,這些代碼基本上是按行的形式去存儲的,這種情況下很容易打斷向量化執(zhí)行,是否有一些手段讓用戶的這些 UDF 自動轉(zhuǎn)成向量式的處理?
A7:我們針對一些常用的 UDF 比如 Brickhouse 系列的 UDF 做了一個在 native 的實現(xiàn),像這部分是可以直接用 native 去執(zhí)行的。這塊因為目前我們暫時還沒有一個 native 的 UDF 框架,如果需要,可以去改 Blaze 的代碼去編譯,后續(xù)我們會考慮加一個 native 的 UDF 框架。例如剛才問到的,怎么樣把 Java 的 UDF 轉(zhuǎn)成 native,其實目前 ChatGPT 可以做這個事情,我們也試過,還是效果比較好的。