深度解析字節(jié)跳動開源數(shù)據(jù)集成引擎 BitSail
精選1. 導讀
BitSail 是字節(jié)跳動開源數(shù)據(jù)集成引擎,支持多種異構(gòu)數(shù)據(jù)源間的數(shù)據(jù)同步,并提供離線、實時、全量、增量場景下全域數(shù)據(jù)集成解決方案,目前支撐了字節(jié)內(nèi)部和火山引擎多個客戶的數(shù)據(jù)集成需求。經(jīng)過字節(jié)跳動各大業(yè)務線海量數(shù)據(jù)的考驗,在性能、穩(wěn)定性上得到較好驗證。
10 月 26 日,字節(jié)跳動宣布 BitSail 項目正式在 GitHub 開源,為更多的企業(yè)和開發(fā)者帶來便利,降低數(shù)據(jù)建設的成本,讓數(shù)據(jù)高效地創(chuàng)造價值。本篇內(nèi)容將圍繞 BitSail 演講歷程及重點能力解析展開,主要包括以下四個部分:
- 字節(jié)跳動內(nèi)部數(shù)據(jù)集成背景
- BitSail 技術(shù)演進歷程
- BitSail 能力解析
- 未來展望
2. 字節(jié)跳動內(nèi)部數(shù)據(jù)集成背景
一直以來,字節(jié)跳動都非常重視并貫徹“數(shù)據(jù)驅(qū)動”這一理念,作為數(shù)據(jù)驅(qū)動的一環(huán),數(shù)據(jù)中臺能力的建設至關(guān)重要,而這其中,數(shù)據(jù)集成作為數(shù)據(jù)中臺建設的基礎,主要解決了異構(gòu)數(shù)據(jù)源的數(shù)據(jù)傳輸、加工和處理的問題。
BitSail 源自字節(jié)跳動數(shù)據(jù)平臺團隊自研的數(shù)據(jù)集成引擎 DTS(全稱 Data Transmission Service,即數(shù)據(jù)傳輸服務),最初基于 Apache Flink 實現(xiàn),至今已經(jīng)服務于字節(jié)內(nèi)部業(yè)務接近五年,現(xiàn)已具備批式集成、流式集成和增量集成三類同步模式,并支持分布式水平擴展和流批一體架構(gòu),在各種數(shù)據(jù)量和各種場景下,一個框架即可解決數(shù)據(jù)集成需求。此外,BitSail 采用插件式架構(gòu),支持運行時解耦,從而具備極強的靈活性,企業(yè)可以很方便地接入新的數(shù)據(jù)源。
3. BitSail 演進歷程
3.1 全域數(shù)據(jù)集成引擎演進三階段
字節(jié)跳動數(shù)據(jù)集成引擎 BitSail 演進的歷程可以分為三個階段:
① 初始期:2018 年以前公司沒有統(tǒng)一的數(shù)據(jù)集成框架,對每個通道都是各自實現(xiàn),因此依賴的大數(shù)據(jù)引擎也比較零散,如 MapReduce 、Spark ,數(shù)據(jù)源之間的連接也是網(wǎng)狀連接,整體的開發(fā)和運維成本都比較高。
② 成長期:可以分為三個小階段。
- 2018 - 2019 :隨著 Flink 生態(tài)不斷完善,越來越多的公司將 Flink 作為大數(shù)據(jù)計算引擎的首選,字節(jié)跳動也不例外,并在 Flink 上持續(xù)探索,并于 2019 年提出基于 Flink 的異構(gòu)數(shù)據(jù)源間傳輸,完成批式場景的統(tǒng)一。
- 2020 - 2021 :隨著 Flink 批流一體的完善,字節(jié)跳動對原有架構(gòu)進行較大升級,并覆蓋了流式場景,完成批流場景的統(tǒng)一。
- 2021 - 2022 :接入了 Hudi 數(shù)據(jù)湖引擎,解決 CDC 數(shù)據(jù)實時同步問題,并提供湖倉一體解決方案。
③ 成熟期:2022 年開始全域數(shù)據(jù)集成引擎的整體架構(gòu)已經(jīng)穩(wěn)定,并經(jīng)過字節(jié)跳動內(nèi)部各業(yè)務線生產(chǎn)環(huán)境的考驗,在性能和穩(wěn)定性上也得到充分的保障,于是團隊希望能夠?qū)⒛芰ν廨敵?,為更多的企業(yè)和開發(fā)者帶來便利,降低數(shù)據(jù)建設的成本,讓數(shù)據(jù)高效地創(chuàng)造價值。
3.2 BitSail 數(shù)據(jù)集成引擎技術(shù)架構(gòu)演進
3.2.1 基于 Flink 的異構(gòu)數(shù)據(jù)源傳輸架構(gòu)
基于 Flink 1.5 DataSet API 實現(xiàn)的異構(gòu)數(shù)據(jù)源傳輸架構(gòu),只支持批式場景??蚣芎诵乃枷胧?,對原始輸入層數(shù)據(jù)抽象為 BaseInput,主要用于拉取源端的數(shù)據(jù);對輸出層抽象為 BaseOutput,負責將數(shù)據(jù)寫到外部系統(tǒng)。同時,框架層提供了基礎服務,包括類型系統(tǒng)(Type System)、自動并發(fā)度(Auto Parallelism)、流控(Flow Control)、臟數(shù)據(jù)檢測(Dirty Data)等等,并對所有的數(shù)據(jù)源通道生效。
以下介紹一個批次場景上比較有意思的功能,也是實際業(yè)務中面臨的一些痛點。
上圖左上部分是原始的 Flink 運行日志,從這個日志里看不到任務進度數(shù)據(jù)和預測數(shù)據(jù),如當前任務運行的百分比、運行完成所需時間。
左下部分則是 Flink UI 界面提供的任務運行的元信息,可以看到讀寫條數(shù)都是 0 ,從 Flink 引擎角度,由于所有算子作為一個整體是沒有輸入和輸出的,這是合理的,但從用戶角度就無法看到任務整體進度信息和當前處理記錄條數(shù),從而導致用戶懷疑這個任務是否已經(jīng)卡住。圖中右邊是改造之后的效果,日志中明確輸出當前處理了多少條數(shù)、實時進度展示、消耗時間等等,該功能在字節(jié)內(nèi)部上線后,得到了很多業(yè)務的好評。
下面介紹一下具體的實現(xiàn)。
首先回顧 Flink Task 的執(zhí)行過程,與傳統(tǒng)的 MapReduce、Spark 的驅(qū)動模型不一樣,F(xiàn)link 是以任務驅(qū)動,JM 創(chuàng)建好 Split 之后,Task 是常駐運行,不斷向 JM 請求新的 Split,只有所有的 Split 處理完之后,Task 才會退出。此時,如果用總的完成的 Task 個數(shù)除以總的 Task 個數(shù),進度將出現(xiàn)一定程度的失真。最開始,所有的 Task 都在運行,不斷地去拉取 Split,我們看到的進度會是 0,等到 JM 的 Split 處理完之后,所有的 Task 會集中退出,可以看到進度會突然跳動到 100%,中間是缺少進度信息的。
為了解決這個問題,我們還是要回到數(shù)據(jù)驅(qū)動本身,以 Split 的維度來衡量整個 Job 的運行過程。圖中右邊所展示的是,通過 Flink UI 提供的 API,可以拿到整個任務的拓撲信息,將其分為兩層算子并進行改造,分別是 Source 層和 Operator 層。
Source 層?
我們修改了原生的 Source API,具體的話包括兩個部分,第一個是創(chuàng)建 Split 之后,我們會去拿到 Total Split 的個數(shù),將它上載到 Metric 里;其次是 Source里的每個 Task 每處理完一個 Split 之后,我們會上報一個 CompletedSplit。最終我們通過 Flink UI 是可以拿到當前已經(jīng)完成的 Split 個數(shù)以及總共的 Split 個數(shù),并用完成的 Split 個數(shù)來除以總共的 Split 個數(shù)來衡量 Source 節(jié)點的進度。
Operator 層?
首先我們會看當前 Operator 上游節(jié)點的輸出多少條,以及當前節(jié)點它讀取了多少條,并用當前節(jié)點讀取的條數(shù)除以它的上游節(jié)點的輸出條數(shù)作為當前 Operator 的進度。同時,這里我們做了一個梯度限制,就是當前節(jié)點的進度只能小于等于它的上游節(jié)點進度。
3.2.2 基于 Flink 批流一體的架構(gòu)
以下是批流一體的架構(gòu),相對于原有架構(gòu),字節(jié)跳動數(shù)據(jù)平臺團隊完成如下升級:
- 將 Flink 版本從 1.5 升級到 1.9,同時我們分析了 DataSet API,統(tǒng)一升級到 DataStream API,以支持批流一體架構(gòu)。
- 對數(shù)據(jù)源支持進行擴充,除了原有的離線數(shù)據(jù)源之外,增加了實時數(shù)據(jù)源,如消息隊列。
- 對框架層完成拓展,支持 Exactly Once、支持 Event Time 寫入、Auto DDL 等功能。
- 對引擎層進行改進,增加推測執(zhí)行、Region Failover 等功能。
- 在 Runtime 層也做了進一步的擴充,支持云原生架構(gòu)。
我們分析一個實時場景中比較典型的鏈路,MQ 到 Hive 這個鏈路。
左圖(Shuffle)是目前社區(qū)的實現(xiàn)方式,很多數(shù)據(jù)湖的寫入,比如 Hudi、Iceberg 基本上也是這個結(jié)構(gòu)。這套結(jié)構(gòu)分為兩層算子,第一層是我們的數(shù)據(jù)處理層,負責數(shù)據(jù)的讀取和寫入;第二層算子是一個單節(jié)點的提交層,它是一個單并發(fā),主要負責元信息的提交,比如去生成 Hive 的分區(qū)或者做一些其他的元信息動作。
這個架構(gòu)的優(yōu)勢是其整體拓撲(數(shù)據(jù)處理流程)比較清晰,算子功能定位也比較清楚,但是它有一個明顯的缺陷,加入一個單并發(fā)節(jié)點后,導致整個任務變成 Shuffle 連接。而 Shuffle 連接天然的弱勢是,當遇到 Task Failover 的時候,它會直接進行全局重啟。?
右圖(Pipelined)是改造之后的數(shù)據(jù)處理流程,數(shù)據(jù)寫入部分沒有變化,變化的是后面的提交部分,這樣的設計考慮是是保持原有 Pipeline 架構(gòu),以實現(xiàn) Task 容錯時不會進行全局重啟。廢棄了原有的單并發(fā)提交節(jié)點,把所有元信息的提交拿到 JM 端處理,同時 Task 和 JM 的通訊是通過 Aggregate Manager 來實現(xiàn)。改為這套架構(gòu)之后,在大數(shù)據(jù)量場景下,其穩(wěn)定性得到了顯著的提升。
3.2.3 基于 Flink 湖倉一體的架構(gòu)
引入湖倉一體架構(gòu)的目的是解決 CDC 數(shù)據(jù)的近實時同步。
右圖是原有架構(gòu),處理流程包括三個模塊:
- 拉取批次任務:用來拉取 CDC 全量的數(shù)據(jù),寫到 Hive 里作為一個基礎的鏡像。
- 實時任務:拉取 CDC 的 Changelog,并實時寫入 HDFS,作為一個增量數(shù)據(jù)。
- 離線調(diào)度任務:周期性地進行 Merge,將全量數(shù)據(jù)和增量數(shù)據(jù)進行合并,形成新的全量數(shù)據(jù)。
上述架構(gòu)比較復雜,并依賴 Flink、Spark 等多種計算引擎,在實時性方面,只能做到 T+1,最快也只能做到小時級延遲,無法有效支撐近實時分析場景。從效率來說,存儲開銷比較大,每個分區(qū)都是一個全量鏡像,而且計算成本較高,每次 Merge 都需要進行全局 Shuffle。
右圖是升級后的架構(gòu),主要的升級點包括:??
- 將 Flink 1.9 升級到 Flink 1.11,接入了 Hudi 數(shù)據(jù)湖引擎,以支持 CDC 數(shù)據(jù)近實時同步。這是因為 Hudi 引擎有完備的索引機制以及高效的 Upsert 性能。
- 對 Hudi 引擎也進行了多項基礎改進,以提高整體的寫入效率和穩(wěn)定性。
最終實施的效果,近實時寫入,整體的延遲在 10 分鐘以內(nèi),綜合性能比原有架構(gòu)提升 70% 以上。至此,完成了全域數(shù)據(jù)集成架構(gòu)統(tǒng)一,實現(xiàn)一套系統(tǒng)覆蓋所有同步場景。
3.3 架構(gòu)演進過程實踐經(jīng)驗分享
下面介紹實際演進過程中的一些思考、問題和改進方案。
表類型選擇?
數(shù)據(jù)湖是支持多種表格式的,比如 CopyOnWrite(簡稱COW)表、MergeOnRead(簡稱MOR)表。COW 表的優(yōu)勢在于讀性能比較好,但是會導致寫放大,MOR 表正好相反,寫的性能比較好的,會導致讀放大。具體選擇哪種表格式,更多要根據(jù)大家的業(yè)務場景來決定。
我們的業(yè)務場景是為了解決 CDC 數(shù)據(jù)的近實時同步,CDC 數(shù)據(jù)有個明顯的特點,是存在大量的隨機更新。這個場景下選擇 COW,會導致寫放大的問題比較嚴重,所以我們選擇了 MOR 表。上圖就是一個 MOR 表查詢和寫入的流程。第一個是列存儲的基礎鏡像文件,我們稱之為 Base 文件,第二個是行存儲的增量日志,我們稱之為 Log 文件。
每次查詢時,需要將 Log 文件和 Base 文件合并,為了解決 MOR 表讀放大的問題,通常我們會建一個 Compaction 的服務,通過周期性的調(diào)度,將 Log 文件和 Base 文件合并,生成一個新的 Base 文件。
Hudi 實時寫入痛點?
如圖所示,這是原生的 Hudi 實時寫入的流程圖。
首先,我們接入 Hudi 數(shù)據(jù),會進入 Flink State,它的作用是索引。Hudi 提供了很多索引機制,比如 BloomIndex。但是 BloomIndex 有個缺陷,它會出現(xiàn)假陽性,降級去遍歷整個文件,在效率上有一定的影響。Flink State 的優(yōu)勢是支持增量更新,同時它讀取的性能會比較高。經(jīng)過 Flink State 之后,我們就可以確認這條記錄是 Upsert,還是 Insert 記錄,同時會分配一個 File Id。
緊接著,我們通過這個 File Id 會做一層 KeyBy,將相同 File 的數(shù)據(jù)分配到同一個Task。Task 會為每一個 File Id 在本地做一次緩存,當緩存達到上限后,會將這批數(shù)據(jù) Flush 出去到 hoodie client 端。Hoodie client 主要是負責以塊的方式來寫增量的 Log 數(shù)據(jù),以 Mini Batch 的方式將數(shù)據(jù)刷新到 HDFS。
再之后,我們會接一個單并發(fā)的提交節(jié)點,最新的版本是基于 Coordinator 來做的,當所有的算子 Checkpoint 完成之后,會提交元信息做一次 Commit,認為這次寫入成功。同時 Checkpoint 時,我們會刷新 Task 的緩存和 hoodie client 的緩存,同時寫到 HDFS。通常,我們還會接一個 Compaction 的算子,主要用來解決 MOR 表讀放大的問題。
這個架構(gòu)在實際的生產(chǎn)環(huán)境會遇到如下問題:?
(1)當數(shù)據(jù)量比較大的時候,Flink State 的膨脹會比較厲害,相應地會影響 Task 的速度以及 Checkpoint 的成功率。
(2)關(guān)于 Compaction 算子,F(xiàn)link 的流式任務資源是常駐的,Compaction 本身是一個周期性的調(diào)度,如果并發(fā)度設置比較高,往往就意味著資源的浪費比較多。
(3)Flink 提供了很多資源優(yōu)化的策略,比如 Slot Sharing,來提高整體的資源利用率,這就會導致資源搶占的問題,Compaction 會和真正的數(shù)據(jù)讀寫算子來進行資源的搶占。Compaction 本身也是一個重 I/O、CPU 密集型操作,需要不斷地讀取增量日志、全量日志,同時再輸出一個全量數(shù)據(jù)。
針對上述問題,我們優(yōu)化了 Hudi 的寫入流程。
首先我們會采集 CDC 的 Change Log,并發(fā)送到消息隊列,然后消費消息隊列中的 Change Log,然后我們進行如下三個優(yōu)化:?
(1)廢棄了原先的 Flink State,替換為 Hash Index。Hash Index 的優(yōu)勢是不依賴外部存儲。來了一個 Hoodie Record 之后,只需要一個簡單的哈希處理,就知道它對應的 Bucket。
(2)將 Compaction 服務獨立成一個離線的任務,并且是周期性的調(diào)度,用來解決資源浪費和資源搶占的問題。
(3)將 Task 緩存和 Hudi 緩存做了合并,因為每次 Checkpoint 都需要刷新 Task 緩存,Hudi 緩存需要寫入 HDFS,如果緩存的數(shù)據(jù)量比較多,會導致整個 Checkpoint 時間比較長。
優(yōu)化之后,穩(wěn)定性方面,可以支持百萬級的 QPS;端到端的 Checkpoint 延時控制在 1 分鐘以內(nèi),Checkpoint 成功率可以做到 99%。
4. BitSail 能力解析
目前技術(shù)架構(gòu)比較成熟,并經(jīng)過字節(jié)跳動各業(yè)務線的驗證,在數(shù)據(jù)的穩(wěn)定性和效率上都能得到一定的保障。因此,我們希望能把自己沉淀的經(jīng)驗對外輸出,給更多企業(yè)和開發(fā)者帶來便利,降低大家數(shù)據(jù)建設的成本,讓數(shù)據(jù)創(chuàng)造高效的價值。為了達到這個目標,我們要解決兩個能力的構(gòu)建。
4.1 低成本共建能力
數(shù)據(jù)集成有一個明顯的網(wǎng)絡效應,每個用戶所面臨的數(shù)據(jù)集成的場景也是不一樣的,因此需要大家的共同參與,完善數(shù)據(jù)集成的功能和生態(tài),這就需要解決共建成本的問題,讓大家都能低成本地參與整個項目的共建和迭代。
在 BitSail 中,我們通過兩個思路推進這個能力建設。
4.1.1 模塊拆分
所有的模塊糅合在一個大的 jar 包中,包括引擎層、數(shù)據(jù)源層、基礎框架層,模塊耦合比較嚴重,數(shù)據(jù)處理流程也不清晰。針對這個問題,我們按照功能模塊進行劃分,將基礎框架和數(shù)據(jù)源從引擎中獨立出來,同時我們的技術(shù)組件采取可插拔的設計,以應對不同的用戶環(huán)境,比如臟數(shù)據(jù)檢測、Schema 同步、監(jiān)控等等,在不同的環(huán)境中會有不同的實現(xiàn)方式。
4.1.2 接口抽象
框架對 Flink API 是深度綁定,用戶需要深入到 Flink 引擎內(nèi)部,這會導致整體 Connector 接入成本比較高。為了解決這個問題,我們抽象了新的讀寫接口,該接口與引擎無關(guān),用戶只要開發(fā)新的接口即可。同時在內(nèi)部會做一層新的抽象接口與引擎接口的轉(zhuǎn)換,這個轉(zhuǎn)換對用戶是屏蔽的,用戶不需要了解底層引擎細節(jié)。
4.2 架構(gòu)的兼容能力
不同公司依賴的大數(shù)據(jù)組件和數(shù)據(jù)源的版本不一樣,同時還會遇到版本前后不兼容問題,因此需要完善架構(gòu)的兼容能力,以解決不同環(huán)境下的快速安裝、部署和驗證。我們同樣有兩個思路來建設這個能力。
4.2.1 多引擎架構(gòu)
當前架構(gòu)和 Flink 引擎深度綁定,在使用場景方面受到一定的限制,比如有些客戶用了 Spark 引擎或者其他引擎。Flink 引擎依賴比較重的情況下,對于簡單場景和小數(shù)據(jù)量場景,整體的資源浪費比較嚴重。
為解決此問題,我們在引擎層預留了多引擎入口,在已經(jīng)預留的 Flink 引擎基礎之上,接下來會擴展到 Spark 引擎或者 Local Engine。? 具體實現(xiàn)方面,我們對執(zhí)行的環(huán)境進行了一層抽象,不同的引擎會去實現(xiàn)我們的抽象類。同時,我們探索 Local 執(zhí)行方式,對小數(shù)據(jù)量在本地通過線程的方式來解決,不用去啟動 Flink Job 或類似的處理,提高整體資源的使用效率。
4.2.2 依賴隔離
目前系統(tǒng)存在一些外部環(huán)境中沒有的內(nèi)部依賴,大數(shù)據(jù)底座也是綁定的公司內(nèi)部版本,我們進行了三個方面的優(yōu)化:
- 剔除公司內(nèi)部依賴,采取開源的通用解決方案,以應對不同的業(yè)務場景。
- 大數(shù)據(jù)底座方面,采用 Provided 依賴,不綁定固定底座,運行時由外部指定,針對不兼容的場景,通過 Maven Profile 和 Maven Shade 隔離。
- 針對數(shù)據(jù)源多版本和版本不兼容的問題,采取動態(tài)加載的策略,將數(shù)據(jù)源做成獨立的組件,每次只會加載需要的數(shù)據(jù)源,以達到隔離的目標。
5. 未來展望
BitSail 希望數(shù)據(jù)暢通無阻地航行到有價值的地方,期待和大家共同合作,完善數(shù)據(jù)集成的功能和生態(tài)。同時未來我們將在三個方面繼續(xù)深化:
① 多引擎架構(gòu):探索 Local Engine 落地,支持本地執(zhí)行,對簡單場景和小數(shù)據(jù)量場景提高資源利用率;實現(xiàn)引擎智能選擇策略,針對簡單場景使用 Local Engine;針對復雜場景復用大數(shù)據(jù)引擎的能力。
② 通用能力建設:推廣新接口,對用戶屏蔽引擎細節(jié),降低 Connector 開發(fā)成本
探索 Connector 多語言方案。
③ 流式數(shù)據(jù)湖:統(tǒng)一 CDC 數(shù)據(jù)入湖解決方案,在性能上穩(wěn)定支撐千萬級 QPS
在數(shù)據(jù)湖平臺能力構(gòu)建方面,全面覆蓋批式、流式、增量使用場景。
??本文感謝 DataFun 志愿者鐘曉華整理?