字節(jié)跳動數(shù)據(jù)湖技術(shù)選型的思考
字節(jié)跳動數(shù)據(jù)集成的現(xiàn)狀
在2018年,我們基于Flink構(gòu)造了異構(gòu)數(shù)據(jù)源之間批式同步通道,主要用于將在線數(shù)據(jù)庫導(dǎo)入到離線數(shù)倉,和不同數(shù)據(jù)源之間的批式傳輸。
在2020年,我們基于Flink構(gòu)造了MQ-Hive的實時數(shù)據(jù)集成通道,主要用于將消息隊列中的數(shù)據(jù)實時寫入到Hive和HDFS,在計算引擎上做到了流批統(tǒng)一。
到了2021年,我們基于Flink構(gòu)造了實時數(shù)據(jù)湖集成通道,從而完成了湖倉一體的數(shù)據(jù)集成系統(tǒng)的構(gòu)建。
字節(jié)跳動數(shù)據(jù)集成系統(tǒng)目前支持了幾十條不同的數(shù)據(jù)傳輸管道,涵蓋了線上數(shù)據(jù)庫,例如Mysql Oracle和MangoDB;消息隊列,例如Kafka RocketMQ;大數(shù)據(jù)生態(tài)系統(tǒng)的各種組件,例如HDFS、HIVE和ClickHouse。
在字節(jié)跳動內(nèi)部,數(shù)據(jù)集成系統(tǒng)服務(wù)了幾乎所有的業(yè)務(wù)線,包括抖音、今日頭條等大家耳熟能詳?shù)膽?yīng)用。
整個系統(tǒng)主要分成3種模式——批式集成、流式集成和增量集成。
- 批式集成模式基于Flink Batch模式打造,將數(shù)據(jù)以批的形式在不同系統(tǒng)中傳輸,目前支持了20多種不同數(shù)據(jù)源類型。
- 流式集成模式主要是從MQ將數(shù)據(jù)導(dǎo)入到Hive和HDFS,任務(wù)的穩(wěn)定性和實時性都受到了用戶廣泛的認可。
- 增量模式即CDC模式,用于支持通過數(shù)據(jù)庫變更日志Binlog,將數(shù)據(jù)變更同步到外部組件的數(shù)據(jù)庫。這種模式目前支持5種數(shù)據(jù)源,雖然數(shù)據(jù)源不多,但是任務(wù)數(shù)量非常龐大,其中包含了很多核心鏈路,例如各個業(yè)務(wù)線的計費、結(jié)算等,對數(shù)據(jù)準確性要求非常高。在CDC鏈路的整體鏈路比較長。首先,首次導(dǎo)入為批式導(dǎo)入,我們通過Flink Batch模式直連Mysql庫拉取全量數(shù)據(jù)寫入到Hive,增量Binlog數(shù)據(jù)通過流式任務(wù)導(dǎo)入到HDFS。由于Hive不支持更新操作,我們依舊使用了一條基于Spark的批處理鏈路,通過T-1增量合并的方式,將前一天的Hive表和新增的Binlog進行合并從而產(chǎn)出當天的Hive表。
隨著業(yè)務(wù)的快速發(fā)展,這條鏈路暴露出來的問題也越來越多。
- 首先,這條基于Spark的離線鏈路資源消耗嚴重,每次產(chǎn)出新數(shù)據(jù)都會涉及到一次全量數(shù)據(jù)Shuffle以及一份全量數(shù)據(jù)落盤,中間所消耗的儲存以及計算資源都比較嚴重。
- 同時,隨著字節(jié)跳動業(yè)務(wù)的快速發(fā)展,近實時分析的需求也越來越多。
- 最后,整條鏈路流程太長,涉及到Spark和Flink兩個計算引擎,以及3個不同的任務(wù)類型,用戶使用成本和學(xué)習(xí)成本都比較高,并且?guī)砹瞬恍〉倪\維成本。
為了解決這些問題,我們希望對增量模式做一次徹底的架構(gòu)升級,將增量模式合并到流式集成中,從而可以擺脫對Spark的依賴,在計算引擎層面做到統(tǒng)一。
改造完成后,基于Flink的數(shù)據(jù)集成引擎就能同時支持批式、流式和增量模式,幾乎可以覆蓋所有的數(shù)據(jù)集成場景。
同時,在增量模式上,提供和流式通道相當?shù)臄?shù)據(jù)延遲,賦予用戶近實時分析能力。在達到這些目標的同時,還可以進一步降低計算成本、提高效率。
經(jīng)過一番探索,我們關(guān)注到了正在興起的數(shù)據(jù)湖技術(shù)。
關(guān)于數(shù)據(jù)湖技術(shù)選型的思考我們的目光集中在了Apache軟件基金會旗下的兩款開源數(shù)據(jù)湖框架Iceberg和Hudi中。Iceberg和Hudi兩款數(shù)據(jù)湖框架都非常優(yōu)秀。但兩個項目被創(chuàng)建的目的是為了解決不同的問題,所以在功能上的側(cè)重點也有所不同。
- Iceberg:核心抽象對接新的計算引擎的成本比較低,并且提供先進的查詢優(yōu)化功能和完全的schema變更。
- Hudi:更注重于高效率的Upsert和近實時更新,提供了Merge On Read文件格式,以及便于搭建增量ETL管道的增量查詢功能。
一番對比下來,兩個框架各有千秋,并且離我們想象中的數(shù)據(jù)湖最終形態(tài)都有一定距離,于是我們的核心問題便集中在了以下兩個問題:
- 哪個框架可以更好的支持我們CDC數(shù)據(jù)處理的核心訴求?
- 哪個框架可以更快速補齊另一個框架的功能,從而成長為一個通用并且成熟的數(shù)據(jù)湖框架?
經(jīng)過多次的內(nèi)部討論,我們認為:Hudi在處理CDC數(shù)據(jù)上更為成熟,并且社區(qū)迭代速度非???,特別是最近一年補齊了很多重要的功能,與Flink的集成也愈發(fā)成熟,最終我們選擇了Hudi作為我們的數(shù)據(jù)湖底座。
01 - 索引系統(tǒng)
我們選擇Hudi,最為看重的就是Hudi的索引系統(tǒng)。
這張圖是一個有索引和沒有索引的對比。在CDC數(shù)據(jù)寫入的過程中,為了讓新增的Update數(shù)據(jù)作用在底表上,我們需要明確知道這條數(shù)據(jù)是否出現(xiàn)過、出現(xiàn)在哪里,從而把數(shù)據(jù)寫到正確的地方。
在合并的時候,我們就可以只合并單個文件,而不需要去管全局數(shù)據(jù)。如果沒有索引,合并的操作只能通過合并全局數(shù)據(jù),帶來的就是全局的shuffle。在圖中的例子中,沒有索引的合并開銷是有索引的兩倍,并且如果隨著底表數(shù)據(jù)量的增大,這個性能差距會呈指數(shù)型上升。
所以,在字節(jié)跳動的業(yè)務(wù)數(shù)據(jù)量級下,索引帶來的性能收益是非常巨大的。Hudi提供了多種索引來適配不同的場景,每種索引都有不同的優(yōu)缺點,索引的選擇需要根據(jù)具體的數(shù)據(jù)分布來進行取舍,從而達到寫入和查詢的最優(yōu)解。下面舉兩個不同場景的例子。
日志數(shù)據(jù)去重場景
在日志數(shù)據(jù)去重的場景中,數(shù)據(jù)通常會有一個create_time的時間戳,底表的分布也是按照這個時間戳進行分區(qū),最近幾小時或者幾天的數(shù)據(jù)會有比較頻繁的更新,但是更老的數(shù)據(jù)則不會有太多的變化。冷熱分區(qū)的場景就比較適合布隆索引、帶TTL的State索引和哈希索引。
CDC場景
第二個例子是一個數(shù)據(jù)庫導(dǎo)出的例子,也就是CDC場景。這個場景更新數(shù)據(jù)會隨機分布,沒有什么規(guī)律可言,并且底表的數(shù)據(jù)量會比較大,新增的數(shù)據(jù)量通常相比底表會比較小。在這種場景下,我們可以選用哈希索引、State索引和Hbase索引來做到高效率的全局索引。這兩個例子說明了不同場景下,索引的選擇也會決定了整個表讀寫性能。Hudi提供多種開箱即用的索引,已經(jīng)覆蓋了絕大部分場景,用戶使用成本非常低。
02 - Merge On Read表格式
除了索引系統(tǒng)之外,Hudi的Merge On Read表格式也是一個我們看重的核心功能之一。這種表格式讓實時寫入、近實時查詢成為了可能。在大數(shù)據(jù)體系的建設(shè)中,寫入引擎和查詢引擎存在著天然的沖突:
- 寫入引擎更傾向于寫小文件,以行存的數(shù)據(jù)格式寫入,盡可能避免在寫入過程中有過多的計算包袱,最好是來一條寫一條。
- 查詢引擎則更傾向于讀大文件,以列存的文件格式儲存數(shù)據(jù),比如說parquet和orc,數(shù)據(jù)以某種規(guī)則嚴格分布,比如根據(jù)某個常用字段進行排序,從而做到可以在查詢的時候,跳過掃描無用的數(shù)據(jù),來減少計算開銷。
為了在這種天然的沖突下找到最佳的取舍,Hudi支持了Merge On Read的文件格式。
MOR格式中包含兩種文件:一種是基于行存Avro格式的log文件,一種是基于列存格式的base文件,包括Parquet或者ORC。log文件通常體積較小,包含了新增的更新數(shù)據(jù)。base文件體積較大,包含了所有的歷史數(shù)據(jù)。
- 寫入引擎可以低延遲的將更新的數(shù)據(jù)寫入到log文件中。
- 查詢引擎在讀的時候?qū)og文件與base文件進行合并,從而可以讀到最新的視圖;compaction任務(wù)定時觸發(fā)合并base文件和log文件,避免log文件持續(xù)膨脹。在這個機制下,Merge On Read文件格式做到了實時寫入和近實時查詢。
03 - 增量計算
索引系統(tǒng)和Merge On Read格式給實時數(shù)據(jù)湖打下了非常堅實的基礎(chǔ),增量計算則是這個基礎(chǔ)之上的Hudi的又一個亮眼功能:
?增量計算賦予了Hudi類似于消息隊列的能力。用戶可以通過類似于offset的時間戳,在Hudi的時間線上拉取一段時間內(nèi)的新增數(shù)據(jù)。在一些數(shù)據(jù)延遲容忍度在分鐘級別的場景中,基于Hudi可以統(tǒng)一Lambda架構(gòu),同時服務(wù)于實時場景和離線場景,在儲存上做到流批一體。
結(jié)語在選擇了基于Hudi的數(shù)據(jù)湖框架后,我們基于字節(jié)跳動內(nèi)部的場景,打造定制化落地方案。我們的目標是通過Hudi來支持所有帶Update的數(shù)據(jù)鏈路。