實時數(shù)據(jù)湖在字節(jié)跳動的實踐
?導讀:今天分享的主題是實時數(shù)據(jù)湖在字節(jié)跳動的實踐,將圍繞下面四點展開:
- 對實時數(shù)據(jù)湖的解讀
- 在落地實時數(shù)據(jù)湖的過程中遇到的一些挑戰(zhàn)和應對方式
- 結合場景介紹實時數(shù)據(jù)湖在字節(jié)內(nèi)部的一些實踐案例
- 數(shù)據(jù)湖發(fā)展的一些規(guī)劃
01對實時數(shù)據(jù)湖的解讀
數(shù)據(jù)湖的概念是比較寬泛的,不同的人可能有著不同的解讀。這個名詞誕生以來,在不同的階段被賦予了不同的含義。
數(shù)據(jù)湖的概念最早是在Hadoop World大會上提出的。當時的提出者給數(shù)據(jù)湖賦予了一個非常抽象的含義,他認為它能解決數(shù)據(jù)集市面臨的一些重要問題。其中最主要的兩個問題是:首先,數(shù)據(jù)集市只保留了部分屬性,只能解決預先定義好的問題;另外,數(shù)據(jù)集市中反映細節(jié)的原始數(shù)據(jù)丟失了,限制了通過數(shù)據(jù)解決問題。
從解決問題的角度出發(fā),希望有一個合適的存儲來保存這些明細的、未加工的數(shù)據(jù)。因此在這個階段,人們對數(shù)據(jù)湖的解讀更多是聚焦在中心化的存儲之上。
不同的云廠商也把自己的對象產(chǎn)存儲產(chǎn)品稱為數(shù)據(jù)湖。比如AWS在那個階段就強調(diào)數(shù)據(jù)湖的存儲屬性,對應的就是自家的對象存儲S3。在Wiki的定義中也是強調(diào)數(shù)據(jù)湖是一個中心化存儲,可以存海量的不同種類的數(shù)據(jù)。但是當對象存儲滿足了大家對存儲海量數(shù)據(jù)的訴求之后,人們對數(shù)據(jù)湖的解讀又發(fā)生了變化。
第二階段,對數(shù)據(jù)湖的解讀更多是從開源社區(qū)和背后的商業(yè)公司發(fā)起的。比如Databricks 作為一個云中立的產(chǎn)品,它將云廠商的這個對象存儲稱為 data lakes storage,然后把自己的重心聚焦在如何基于一個中心化的存儲構建一個數(shù)據(jù)分析、數(shù)據(jù)科學和機器學習的數(shù)據(jù)湖解決方案,并且把這個方案稱之為lake。他們認為在這個中心化的存儲之上構建事務層、索引層、元數(shù)據(jù)層,可以解決數(shù)據(jù)湖上的可靠性、性能和安全的問題。
與此同時,Uber最初也將Hudi對外稱為一個事務型的數(shù)據(jù)湖,名字實際上也是由 Hadoop Updates and Incrementals縮寫而來,最早也是被用于解決Uber內(nèi)部離線數(shù)據(jù)的合規(guī)問題?,F(xiàn)在他們更傾向的定義是一個流式數(shù)據(jù)湖平臺,Iceberg也常常被人們納入數(shù)據(jù)湖的討論。盡管Ryan Blue一直宣稱 Iceberg 是一個Open Table Format。這三者有一些共同點,一個是對 ACID的支持,引入了一個事務層,第二是對 streaming 和 batch的同等支持,第三就是聚焦在如何能更快地查詢數(shù)據(jù)。國內(nèi)也有人將 Hudi、Iceberg、Delta Lake稱為數(shù)據(jù)湖的三劍客。
講完了業(yè)界的解讀,來看一下字節(jié)跳動對數(shù)據(jù)湖的解讀。我們是結合字節(jié)的業(yè)務場景來解讀的。通過實踐總結,我們發(fā)現(xiàn)數(shù)據(jù)湖需要具備六大能力:
- 高效的并發(fā)更新能力
因為它能夠改變我們在 Hive 數(shù)倉中遇到的數(shù)據(jù)更新成本高的問題,支持對海量的離線數(shù)據(jù)做更新刪除。
- 智能的查詢加速
用戶使用數(shù)據(jù)湖的時候,不希望感知到數(shù)據(jù)湖的底層實現(xiàn)細節(jié),數(shù)據(jù)湖的解決方案應該能夠自動地優(yōu)化數(shù)據(jù)分布,提供穩(wěn)定的產(chǎn)品性能。
- 批流一體的存儲
數(shù)據(jù)湖這個技術出現(xiàn)以來,被數(shù)倉行業(yè)給予了厚望,他們認為數(shù)據(jù)湖可以最終去解決一份存儲流批兩種使用方式的問題,從而從根本上提升開發(fā)效率和數(shù)據(jù)質(zhì)量。
- 統(tǒng)一的元數(shù)據(jù)和權限
在一個企業(yè)級的數(shù)據(jù)湖當中,元數(shù)據(jù)和權限肯定是不能少的。同時在湖倉共存的情況下,用戶不希望元數(shù)據(jù)和權限在湖倉兩種情況下是割裂的。
- 極致的查詢性能
用戶對于數(shù)據(jù)湖的期望就是能夠在數(shù)據(jù)實時入湖的同時還能做到數(shù)據(jù)的秒級可視化。
- AI + BI
數(shù)據(jù)湖數(shù)據(jù)的對外輸出,不只局限于BI,同時AI也是數(shù)據(jù)湖的一等公民,數(shù)據(jù)湖也被應用在了字節(jié)的整個推薦體系,尤其是特征工程當中。實時數(shù)據(jù)湖其實是數(shù)據(jù)湖之上,更加注重數(shù)據(jù)的實時屬性或者說流屬性的一個數(shù)據(jù)湖發(fā)展方向。當然,正如業(yè)界對于數(shù)據(jù)湖的解讀一直在演變,我們對數(shù)據(jù)湖的解讀也不會局限于以上場景和功能。
02落地實時數(shù)據(jù)湖過程中的挑戰(zhàn)和應對方式
接下來介紹數(shù)據(jù)湖落地的挑戰(zhàn)和應對。字節(jié)內(nèi)部的數(shù)據(jù)湖最初是基于開源的數(shù)據(jù)湖框架Hudi構建的,選擇Hudi,最簡單的一個原因是相比于 Iceberg 和 Delta Lake,Hudi原生支持可擴展的索引系統(tǒng),能夠幫助數(shù)據(jù)快速定位到所在的位置,達到高效更新的效果。
在嘗試規(guī)模化落地的過程中,我們主要遇到了四個挑戰(zhàn):數(shù)據(jù)難管理、并發(fā)更新弱、更新性能差,以及日志難入湖。
接下來會一一介紹這些挑戰(zhàn)背后出現(xiàn)的原因以及我們應對的策略。
1. 數(shù)據(jù)難管理
下圖是一個典型的基于中心化存儲構建數(shù)倉機器學習和數(shù)據(jù)科學的架構。這里將加工過后的數(shù)據(jù)保存在數(shù)倉中,通過數(shù)倉的元數(shù)據(jù)進行組織。數(shù)據(jù)科學家和機器學習框架都會直接去這個中心化的存儲中獲取原始數(shù)據(jù)。因此在這個中心化存儲之上的數(shù)據(jù)對用戶來說是完全分散的,沒有一個全局的視圖。
為了解決這個數(shù)據(jù)難管理的問題,Databricks 提出了一個Lakehouse 的架構,就是在存儲層之上去構建統(tǒng)一的元數(shù)據(jù)緩存和索引層,所有對數(shù)據(jù)湖之上數(shù)據(jù)的使用都會經(jīng)過這個統(tǒng)一的一層。這和我們的目標很相似,但是現(xiàn)實比較殘酷,我們面臨的是海量存量數(shù)據(jù),這些存量數(shù)據(jù)不管是數(shù)據(jù)格式的遷移,還是使用方式的遷移,亦或是元數(shù)據(jù)的遷移,都意味著巨大的投入。因此在很長一段時間里,我們都會面臨數(shù)倉和數(shù)據(jù)湖共存這樣一個階段。在這一階段,兩者的連通性是用戶最為關心的。
我們在數(shù)據(jù)湖和數(shù)倉之上,構建了一層統(tǒng)一的元數(shù)據(jù)層,這層元數(shù)據(jù)層屏蔽了下層各個系統(tǒng)的元數(shù)據(jù)的異構性,由統(tǒng)一的元數(shù)據(jù)層去對接 BI 工具,對接計算引擎,以及數(shù)據(jù)開發(fā)、治理和權限管控的一系列數(shù)據(jù)工具。而這一層對外暴露的 API 是與 Hive 兼容的。
盡管 Hive 這個引擎已經(jīng)逐漸被其他更新的計算引擎代替了,比如Spark、Presto、Flink,但是它的源數(shù)據(jù)管理依舊是業(yè)界的事實標準。另外,一些云廠商即使選擇構建了自己的元數(shù)據(jù)服務,也都同時提供了和 HMS 兼容的元數(shù)據(jù)查詢接口,各個計算引擎也都內(nèi)置了Hive Catalog 這一層。
解決了上層的訪問統(tǒng)一的問題,但依舊沒有解決數(shù)據(jù)湖和數(shù)倉元數(shù)據(jù)本身的異構問題。這個異構問題是如何導致的呢?為什么Hive Matestore 沒有辦法去滿足元數(shù)據(jù)管理的這個訴求?
這就涉及到數(shù)據(jù)湖管理元數(shù)據(jù)的特殊性。以Hudi為例,作為一個典型的事務型數(shù)據(jù)湖,Hudi使用時間線 Timeline 來追蹤針對表的各種操作。比如commit compaction clean,Timeline 類似于數(shù)據(jù)湖里的事務管理器,記錄對表的更改情況。這些更改或事務記錄了每次更新的操作是發(fā)生在哪些文件當中,哪些文件為新增,哪些文件失效,哪些數(shù)據(jù)新增,哪些數(shù)據(jù)更新。
總結下來,數(shù)據(jù)湖是通過追蹤文件來管理元數(shù)據(jù)。管理的力度更細了,自然也就避免了無效的讀寫放大,從而提供了高效的更新刪除、增量消費、時間旅行等一系列的能力。但這其實也意味著另外一個問題,就是一個目錄中可以包含多個版本的文件,這與 Hive 管理元數(shù)據(jù)的方式產(chǎn)生了分歧,因為 Hive Metastore 是通過目錄的形式來管理元數(shù)據(jù)的,數(shù)據(jù)更新也是通過覆蓋目錄來保證事務。
由于對元信息的管理力度不同,基于 Hive Metastore的元數(shù)據(jù)管理其實是沒有辦法實現(xiàn)數(shù)據(jù)湖剛剛提到的一系列能力。針對這個問題,Hudi社區(qū)的解決方案是使用一個分布式存儲來管理這個 Timeline 。Timeline 里面記錄了每次操作的元數(shù)據(jù),也記錄了一些表的 schema 和分區(qū)的信息,通過同步到 Hive Metastore 來做元數(shù)據(jù)的展示。這個過程中我們發(fā)現(xiàn)了三個問題。
第一個問題是分區(qū)的元數(shù)據(jù)是分散在兩個系統(tǒng)當中的,缺乏 single source of true。
第二個問題是分區(qū)的元數(shù)據(jù)的獲取需要從 HDFS 拉取多個文件,沒有辦法給出類似于 HMS 這樣的秒級訪問響應。服務在線的數(shù)據(jù)應用和開發(fā)工具時,這個延遲沒有辦法滿足需求。
第三個問題是讀表的時候需要拉取大量的目錄和 Timeline 上記錄的表操作對應的元數(shù)據(jù)進行比對,找出最新的這個版本包含的文件。元數(shù)據(jù)讀取本身就很重,并且缺乏裁剪能力,這在近實時的場景下帶來了比較大的overhead。
Hudi Metastore Server 融合了 Hive Metastore 和 Hudi MetaData管理的優(yōu)勢。
首先,Hudi Metastore Server 提供了多租戶的、中心化的元數(shù)據(jù)管理服務,將文件一級的元數(shù)據(jù)保存在適合隨機讀寫的存儲中,讓數(shù)據(jù)湖的元數(shù)據(jù)不再分散在多個文件當中,滿足了single source of true。
其次,Hudi Metastore Server 針對元數(shù)據(jù)的查詢,尤其是一些變更操作。比如Job position 提供了與 Hive Metastore完全兼容的接口,用戶在使用一張數(shù)據(jù)湖上的表的時候,享受到這些增加的高效更新、刪除、增量消費等能力的同時,也能享受到一張 Hive 表所具備的功能,例如通過Spark、Flink、Presto查詢,以及在一些數(shù)據(jù)開發(fā)工具上在線的去獲取到元數(shù)據(jù)以及一些分區(qū) TTL清理的能力。
此外,Hudi Metastore Server還解決了一個關鍵性的問題,就是多任務并發(fā)更新弱的問題。
2. 并發(fā)更新弱
我們最早是基于Hudi社區(qū)的0.7版本的內(nèi)核進行研發(fā)的,當時Hudi的Timeline中的操作必須是完全順序的,每一個新的事務都會去回滾之前未完成的事務,因此無法支持并發(fā)寫入。后續(xù)社區(qū)也實現(xiàn)了一個并發(fā)寫入的方案,整體是基于分布式鎖實現(xiàn)的,并且只支持了Spark COW表的并發(fā)寫,并不適用于 Flink 或者實時的MOR表。但是多任務的并發(fā)寫入是我們內(nèi)部實踐當中一個非常通用的訴求。因此我們在Hudi Metastore Server的Timeline之上,使用樂觀鎖去重新實現(xiàn)了這個并發(fā)的更新能力。同時我們這個并發(fā)控制模塊還能支持更靈活的行列級別并發(fā)寫策略,為后續(xù)要介紹到的實時數(shù)據(jù)關聯(lián)的場景的落地提供了一個可能。
除了多任務的并發(fā)寫入之外,我們在單個 Flink 任務的并發(fā)寫入也遇到了瓶頸。由于Hudi設計之初嚴重依賴Spark。0.7.0的版本才剛剛支持Flink。不管是在穩(wěn)定性還是在功能上都和 Spark On Hudi有非常大的差距。因此在進行高QPS入湖的情況下,我們就遇到了單個Flink任務的擴展性問題。
我們通過在Flink的 embedding term server上支持對當前進行中的事務元信息進行一下緩存,大幅提升了單個任務能夠并發(fā)寫入的文件量級,基本上是在80倍的量級。結合分區(qū)級別的并發(fā)寫入,我們整體支撐了近千萬QPS的數(shù)據(jù)量的增量入湖。
下一步的并發(fā)問題是批流并發(fā)沖突的問題。批流并發(fā)沖突問題類似于一個我們在傳統(tǒng)數(shù)據(jù)湖中遇到的場景,就是有一連串的小事務和一個周期比較長的長事務,如果這兩者發(fā)生沖突,應該如何處理。
如果讓短事務等長事務完成之后再進行,那對一個實時的鏈路來說,意味著數(shù)據(jù)的可見性變低了。同時如果在等待過程中失敗了,還會有非常高的fail over成本。但是如果我們讓這個長事務失敗了,成本又會很高,因為這個長事務往往需要耗費更多的資源和時間。而在批流并發(fā)沖突的這個場景下,最好是兩個都不失敗,但這從語義上來講又不符合我們認知中的隔離級別。
為了解決批流沖突的問題,我們的思路是提供更靈活的沖突檢查和數(shù)據(jù)合并策略。最基礎的就是行級并發(fā)。首先兩個獨立的writer寫入的數(shù)據(jù)在物理上是隔離的,借助文件系統(tǒng)的租約機制也能夠保證對于一個文件同時只有一個writer。所以這個沖突實際上不是發(fā)生在數(shù)據(jù)層面的,而是發(fā)生在元數(shù)據(jù)層面。那數(shù)據(jù)的沖突與否,就可以交由用戶來定義。很多時候入湖的數(shù)據(jù)實際上并不是一個現(xiàn)實中正在發(fā)生的事情,而是一個現(xiàn)實操作的回放。比如圖中的這個場景,我們假設刪除的作業(yè)是針對一個特定的 Snapshot。即使有沖突,我們可以認為整個刪除的過程是瞬時完成的,后續(xù)的新事物可以追加發(fā)生在這次刪除作業(yè)之后。
第二是列級并發(fā)。比如接下來在實踐實際案例中,我們要介紹的這個實時數(shù)據(jù)關聯(lián)場景,每個writer實際上只是根據(jù)主鍵去更新部分的列。因此這些數(shù)據(jù)其實在行級別看起來是沖突的,但是從列的角度來看是完全不沖突的。配合我們的一些確定性索引,數(shù)據(jù)能被寫入到同一個文件組中,這樣就不會出現(xiàn)一致性的問題。
最后是沖突合并。假如兩個數(shù)據(jù)真的是在行級別和列級別都發(fā)生了沖突,那真的只能通過 fail 掉一個事務才能完成嗎?我覺得是不一定的,這里我們受到了git的啟發(fā)。假如兩次 commit沖突了,我們可以提供merge值的策略,比如數(shù)據(jù)中帶有時間戳,在合并時就可以按照時間戳的先后順序來做合并。
3. 更新性能差
我們最早選擇基于Hudi也是因為可擴展的索引系統(tǒng),通過這個索引系統(tǒng)可以快速地定位到需要跟新的文件。這帶來了三點好處:
一個是避免讀取不需要的文件;二是避免更新不必要的文件;三是避免將更新的數(shù)據(jù)和歷史的數(shù)據(jù)做分布式關聯(lián),而是通過提前將文件分好組的方式直接在文件組內(nèi)進行合并。
在早期的落地過程當中,我們嘗試盡可能復用Hudi的一些原生能力,比如Boom Filter index。但是隨著數(shù)據(jù)規(guī)模的不停增長,當達到了千億的量級之后,upsert的數(shù)據(jù)隨著數(shù)據(jù)量的增長逐漸放緩,到了數(shù)千億的量級后,消費的速度甚至趕不上生產(chǎn)者的速度。即使我們?nèi)樗鼣U充了資源,而這時的數(shù)據(jù)總量其實也只是在 TB 級別。我們分析了每個文件組的大小,發(fā)現(xiàn)其實文件組的大小也是一個比較合理的值,基本上是在0.5g到1g之間。進一步分析,我們發(fā)現(xiàn)隨著數(shù)據(jù)量的增長,新的導入在通過索引定位數(shù)據(jù)的這一步花費的時間越來越長。
根本原因是Bloom Filter存在假陽性,一旦命中假陽性的case,我們就需要把整個文件組中的主鍵鏈讀取上來,再進一步判斷這個數(shù)據(jù)是否已經(jīng)存在。通過這種方式來區(qū)分這個到底是 update 還是 insert。upsert本身就是update和insert兩個操作的結合,如果發(fā)現(xiàn)相同組件數(shù)據(jù)不存在,就進行insert。如果存在,我們就進行 update。而 Bloom Filter由于假陽性的存在,只能加速數(shù)據(jù)的insert而沒有辦法去加速update。這就和我們觀察到的現(xiàn)象很一致。因為這個 pipeline 在運行初期,大部分數(shù)據(jù)都是第一次入湖,是insert操作,因此可以被索引加速。但是規(guī)模達到一定量級之后,大部分數(shù)據(jù)都是更新操作,沒有辦法再被索引加速。為了解決這個問題,我們急需一個更穩(wěn)定更高效的索引。
Bloom Filter索引的問題,根因是讀取歷史數(shù)據(jù)進行定位,導致定位的時間越來越長。那有沒有什么辦法是無需讀歷史數(shù)據(jù),也可以快速定位到數(shù)據(jù)所在位置呢?我們想到了類似于 Hive的bucket,也就是哈希的方法來解決這個問題。
Bucket Index原理比較簡單,整個表或者分區(qū)相當于是一張哈希表,文件名中記錄的這個哈希值,就相當于哈希表中這個數(shù)組的值??梢愿鶕?jù)這個數(shù)據(jù)中的主鍵哈希值快速定位到文件組。一個文件組就類似于哈希表中的一個鏈表,可以將數(shù)據(jù)追加到這個文件組當中。Bucket Index成功地解決了流式更新性能的問題。由于極低的定位數(shù)據(jù)的成本,只要設置了一個合適的bucket桶大小,就能解決導入性能的問題,將流式更新能覆蓋的場景從 TB 級別擴展到了百 TB 級別。除了導入的性能,Bucket Index 還加速了數(shù)據(jù)的查詢,其中比較有代表性的就是 bucket Pruning和bucket join。
當然這種索引方式也遇到了擴展性的問題,用戶需要提前一步做桶數(shù)的容量規(guī)劃,給一個比較安全的值,避免單個桶擴大,以便應對接下來的數(shù)據(jù)增長。在數(shù)據(jù)傾斜的場景下,為了讓傾斜值盡可能分散在不同的bucket,會將bucket的數(shù)量調(diào)到很大。而每個bucket平均大小很小,會帶來大量的小文件,給文件系統(tǒng)帶來沖擊的同時也會帶來查詢側性能下滑和寫入側的資源浪費。同時在一線快速增長的業(yè)務,很難對容量有一個精準的預估。如果估算少了,數(shù)據(jù)量飛速增長,單個的bucket的平均大小就會很大,這就會導致寫入和查詢的并發(fā)度不足,影響性能。如果估算多了,就會和傾斜的場景一樣出現(xiàn)大量的小文件。整體的rehash又是一個很重的運維操作,會直接影響業(yè)務側對數(shù)據(jù)的生產(chǎn)和使用。因此不管從業(yè)務的易用性出發(fā),還是考慮到資源的使用率和查詢的效率,我們認為兼具高效導入和查詢性能,也能支持彈性擴展的索引系統(tǒng)是一個重要的方向。
這時我們想到了可擴展hash這個數(shù)據(jù)結構。利用這個結構,我們可以很自然地做桶的分裂和合并,讓整個bucket的索引從手動駕駛進化到自動駕駛。在數(shù)據(jù)寫入的時候,也可以快速地根據(jù)現(xiàn)有的總數(shù),推斷出最深的有效哈希值的長度,通過不斷地對 2 的桶深度次方進行取余的方式,匹配到最接近的分桶寫入。我們將Bucket Index這個索引貢獻到了社區(qū),已在Hudi的0.11版本對外發(fā)布。
4. 日志難入湖
本質(zhì)原因也是因為Hudi的索引系統(tǒng)。因為這個索引系統(tǒng)要求數(shù)據(jù)按照組件聚集,一個最簡單的方式就是把這個組件設成UUID,但這樣就會帶來性能上的問題以及資源上的浪費。因此我們在Hudi之內(nèi)實現(xiàn)了一套新的機制,我們認為是無索引,即繞過Hudi的索引機制,做到數(shù)據(jù)的實時入湖。同時因為沒有主鍵,Upsert 的能力也失效了。我們提供了用更通用的 update 能力,通過shuffle hash join和 broadcast join 去完成數(shù)據(jù)實時更新。
03結合場景介紹實時數(shù)據(jù)湖在字節(jié)內(nèi)部的一些實踐案例
接下來詳細介紹實時數(shù)據(jù)湖在字節(jié)的實踐場景。電商是字節(jié)發(fā)展非常快速的業(yè)務之一,數(shù)據(jù)增長非常快,這也對數(shù)倉的建設提出了較高的要求。目前電商業(yè)務數(shù)據(jù)還是典型的lambda架構,分為是離線數(shù)倉和實時數(shù)倉建設。在實際場景中,lambda架構的問題相信大家都已經(jīng)比較了解了,我就不多做贅述了。這次的場景介紹是圍繞一個主題,通過數(shù)據(jù)湖來構建實時數(shù)倉,使實時數(shù)據(jù)湖切入到實時數(shù)倉的建設當中。這不是一蹴而就的,是分階段一步一步滲透到實時數(shù)倉的建設當中,而實時數(shù)據(jù)湖的終極目標也是在存儲側形成一個真正意義上的批流一體的架構。
我們切入的第一個階段是實時數(shù)據(jù)的近實時可見可測。
坦白說,在實時數(shù)據(jù)湖的落地初期,對于數(shù)據(jù)湖是否能在實時數(shù)倉中真正勝任,大家都是存疑的。因此最早的切入點也比較保守,用在數(shù)據(jù)的驗證環(huán)節(jié)。在電商的實時數(shù)倉中,由于業(yè)務發(fā)展快,上游系統(tǒng)變更,以及數(shù)據(jù)產(chǎn)品需求都非常多。導致實時數(shù)倉開發(fā)周期短,上線變更頻繁。當前這個實時的數(shù)據(jù)的新增字段和指標邏輯變更,或者在任務重構優(yōu)化時,都要對新版本的作業(yè)生成的指標進行驗證。驗證的目標主要有兩點,一是原有指標,數(shù)據(jù)是否一致,二是新增指標的數(shù)據(jù)是否合理。
在采用數(shù)據(jù)湖的方案之前,數(shù)據(jù)湖的驗證環(huán)節(jié)需要將結果導入到Kafka然后再dump到 Hive,進行全量數(shù)據(jù)校驗。這里存在的一個問題就是數(shù)據(jù)無法實時或者近實時可見可檢的,基本上都是一個小時級的延遲。在很多緊急上線的場景下,因為延時的問題,只能去抽測數(shù)據(jù)進行測試驗證,就會影響數(shù)據(jù)質(zhì)量。實時數(shù)據(jù)湖的方案,是通過將實時數(shù)據(jù)低成本的增量導入到數(shù)據(jù)湖中,然后通過Presto進行查詢,然后進行實時計算匯總,計算的結果做到近實時的全面的可見可測。
當然在這個階段中,我們也暴露出了很多數(shù)據(jù)湖上易用性的問題。業(yè)務側的同學反饋最多的問題就是數(shù)據(jù)湖的配置過于復雜。比如要寫一個數(shù)據(jù)湖的任務,Hudi自身就存在十多個參數(shù)需要在寫入任務中配置。這增加了業(yè)務側同學的學習成本和引擎?zhèn)韧瑢W的解釋成本。同時還需要在Flink SQL里定義一個sync table 的DDL,寫一個完整的 schema,很容易會因為頁的順序或者拼寫錯誤導致任務失敗。
我們借助了Hudi Metastore Server 的能力,封裝了大量的參數(shù)。同時使用Flink Catalog的能力,對Meta Server進一步封裝,讓用戶在配置一個 Fink SQL任務的時候,從最初的寫DDL配置十多個參數(shù),到現(xiàn)在只要寫一條 create table like的語句,配置一張臨時表,用戶對這種方式的接受度普遍是比較高的。
第二個階段,也就是第二個應用場景是數(shù)據(jù)的實時入湖和實時分析。
數(shù)據(jù)湖可以同時滿足高效的實時數(shù)據(jù)增量導入和交互式分析的需求,讓數(shù)據(jù)分析師可以自助搭建看板,同時也可以進行低成本的數(shù)據(jù)回刷,真正做到一份數(shù)據(jù)批流兩種使用方式。在這個階段,由于數(shù)據(jù)實際上已經(jīng)開始生產(chǎn)了,用戶對于數(shù)據(jù)入湖的穩(wěn)定性和查詢性能都有很高的要求。我們通過將Compaction任務與實時導入任務拆分,首先解決了資源搶占導致的入湖時效性比較低的問題,同時設計了compaction service,負責compaction任務的調(diào)度,整個過程對業(yè)務側同學完全屏蔽。我們在服務層面也對報警和監(jiān)控進行了加強,能夠做到先于業(yè)務去發(fā)現(xiàn)問題,處理問題,進一步提升了任務的穩(wěn)定性,也讓我們的使用方能夠更有信心地去使用實時數(shù)據(jù)湖。
在查詢的優(yōu)化上面,我們優(yōu)化了讀文件系統(tǒng)的長尾問題,支持了實時表的列裁剪。同時我們對Avro日志進行了短序列化和序列化的case by case的優(yōu)化,還引入了列存的 log進一步提升查詢性能。除了實時數(shù)據(jù)分析之外,這種能力還可以用于機器學習。在特征過程當中,有些label是可以快速地從日志中實時獲取到的。比如對一個視頻點了個贊,和特征是可以關聯(lián)上的。
有些label的生成則是長周期的,比如在抖音上買了一個東西,或者把一個東西加入購物車,到最后的購買,這整個鏈路是很長的,可能涉及到天級別或者周級別的一個不定周期。但是在這兩種情況下,它的特征數(shù)據(jù)基本上都是相同的,這也使底層的存儲有了批流兩種使用方式的訴求,以往都是通過冗余的存儲和計算來解決的。通過數(shù)據(jù)湖可以將短周期的特征和標簽實時地入湖,長周期的每天做一次調(diào)度,做一個批式入湖,真正能做到一份數(shù)據(jù)去適用多個模型。
第三個階段的應用場景是數(shù)據(jù)的實時多維匯總。
在這個階短最重要的目標是實時數(shù)據(jù)的普惠。因為很多的實時數(shù)據(jù)使用方都是通過可視化查詢或者是數(shù)據(jù)服務去消費一個特定的匯總數(shù)據(jù),而這些重度匯總過后的實時數(shù)據(jù)使用率相對來說是比較低的。因此我們和數(shù)倉的同學共同推進了一個實時多維匯總的方案落地。數(shù)倉的同學通過實時計算引擎完成數(shù)據(jù)的多維度的輕度匯總,并且實時地更新入湖。下游可以靈活地按需獲取重度匯總的數(shù)據(jù),這種方式可以縮短數(shù)據(jù)鏈路,提升研發(fā)效能。
在實際的業(yè)務場景中,對于不同的業(yè)務訴求,又可以細分成三個不同的子場景。
第一個場景是內(nèi)部用戶的可視化查詢和報表這一類場景。它的特點是查詢頻率不高,但是維度和指標的組合靈活,同時用戶也能容忍數(shù)秒的延遲。在這種場景下,上層的數(shù)據(jù)應用直接調(diào)用底層的 Presto 引擎行為實時入庫的數(shù)據(jù)進行多維度的重度聚合之后,再做展現(xiàn)。
另外一個主要的場景就是面向在線的數(shù)據(jù)產(chǎn)品。這種場景對高查詢頻率、低查詢延遲的訴求比較高,但是對數(shù)據(jù)可見性的要求反而不那么高。而且,經(jīng)過重度匯總的數(shù)據(jù)量也比較小,這就對數(shù)據(jù)分析工具提出了比較大的挑戰(zhàn)。因此在當前階段,我們通過增加了一個預計算鏈路來解決。
下面一個問題,多維重度匯總的多維計算結果是從我們湖里批量讀出來,然后定時地去寫入 KV存儲,由存儲去直接對接數(shù)據(jù)產(chǎn)品。從長期來看,我們下一步計劃就是對實時數(shù)據(jù)湖之上的表去進行自動地構建物化視圖,并且加載進緩存,以此來兼顧靈活性和查詢性能,讓用戶在享受這種低運維成本的同時,又能滿足低延低查詢延遲、高查詢頻率和靈活使用的訴求。
第四個典型的場景是實時數(shù)據(jù)關聯(lián)。數(shù)據(jù)的關聯(lián)在數(shù)倉中是一個非?;A的訴求,數(shù)倉的同學需要將多個流的指標和維度列進行關聯(lián),形成一張寬表。但是使用維表join,尤其是通過緩存加速的方式,數(shù)據(jù)準確性往往很難保障。而使用多流join 的方式又需要維持一個大狀態(tài),尤其是對于一些關聯(lián)周期不太確定的場景,穩(wěn)定性和準確性之間往往很難取舍。
基于以上背景,我們的實時數(shù)據(jù)湖方案通過了這個列級的并發(fā)寫入和確定性的索引。我們支持多個流式任務并發(fā)地去寫入同一張表中,每個任務只寫表中的部分列。數(shù)據(jù)寫入的 log 件在物理上其實是隔離的,每個log文件當中也只包含了寬表中的部分列,實際上不會產(chǎn)生互相影響。再異步地通過compaction任務定期的對之前對log數(shù)據(jù)進行合并,在這個階段對數(shù)據(jù)進行真正的實際的關聯(lián)操作。通過這種方式,提供一個比較穩(wěn)定的性能。使用這一套方案,實時關聯(lián)用戶也不用再關注狀態(tài)大小和TTL該如何設置這個問題了,寬表的數(shù)據(jù)也可以做到實時可查。
最后一個階段是實時數(shù)據(jù)湖的終極階段,目前仍在探索中。我們只在部分場景開啟了驗證。在這個架構里面,數(shù)據(jù)可以從外部的不同數(shù)據(jù)源中實時或者批量的入湖和出湖,而流批作業(yè)完成湖內(nèi)的數(shù)據(jù)實時流轉,形成真正意義上的存儲層批流一體。
同時在這套架構中,為了解決實時數(shù)據(jù)湖從分鐘級到秒級的最后一公里,我們在實時引擎與數(shù)據(jù)湖的表之間增加了一層數(shù)據(jù)加速服務。在這層數(shù)據(jù)加速服務之上,多個實時作業(yè)可以做到秒級的數(shù)據(jù)流轉,而這個服務也會解決頻繁流式寫入頻繁提交導致的小文件問題,為實時數(shù)據(jù)的交互查詢進一步提速。
除此之外,由于流批作業(yè)的特性不同,批計算往往會需要更高的瞬時吞吐。因此這些批計算任務也可以直接讀寫底層的池化文件系統(tǒng),做到極強的擴展性,真正意義上做到批流寫入的隔離,批作業(yè)的寫入不會受限于加速服務的帶寬。在這個批流一體的架構中,數(shù)據(jù)湖之上的用戶,不管是SQL查詢,還是BI 、AI ,都可以通過一個統(tǒng)一的 table format 享受到數(shù)據(jù)湖之上數(shù)據(jù)的開放性。
04數(shù)據(jù)湖發(fā)展的一些規(guī)劃
未來規(guī)劃主要聚焦于三個維度:功能層面的規(guī)劃,開源層面的規(guī)劃,以及商業(yè)化輸出相關的一些規(guī)劃。
1. 功能層面
首先是功能維度,我們認為一個更智能的實時數(shù)據(jù)湖的加速系統(tǒng)是我們最重要的目標之一。
- 元數(shù)據(jù)層面的加速
數(shù)據(jù)湖托管了文件級別的元數(shù)據(jù),元數(shù)據(jù)的數(shù)據(jù)量相比數(shù)倉有了幾個量級的增長,但同時也給我們帶來了一些優(yōu)化的機會。比如我們未來計劃將查詢的謂詞直接下推到元數(shù)據(jù)系統(tǒng)當中,讓這個引擎在scan階段無需訪問系統(tǒng),直接去跳過無效文件來提升查詢的性能。
- 數(shù)據(jù)的加速
當前的實時數(shù)據(jù)湖由于其 serverless 架構對文件系統(tǒng)的重度依賴,在生產(chǎn)實踐中還是處于分鐘級,秒級依舊處于驗證階段。那我們接下來計劃將這個數(shù)據(jù)湖加速服務不斷地去打磨成熟,用來做實時數(shù)據(jù)的交換和熱數(shù)據(jù)的存儲,以解決分鐘級到秒級的最后一公里問題。智能加速層面臨的最大的挑戰(zhàn)是批流數(shù)據(jù)寫入的一致性問題,這也是我們接下來重點要解決的問題。例如在這種端到端的實時生產(chǎn)鏈路中,如何在提供秒級延時的前提下解決類似于跨表事務的問題。
- 索引加速
通過bucket, zorder等一系列的主鍵索引,進一步提升數(shù)據(jù)湖之上的數(shù)據(jù)的查詢性能,過濾掉大量的原始數(shù)據(jù),避免無效的數(shù)據(jù)交換。同時我們接下來也會非常注重二級索引的支持,因為二級索引的支持可以延伸湖上數(shù)據(jù)的更新能力,從而去加速非主線更新的效率。
- 智能優(yōu)化
我們接下來會通過一套表優(yōu)化服務來實現(xiàn)智能優(yōu)化,因為對于兩個類似的查詢能否去提供一個穩(wěn)定的查詢性能,表的數(shù)據(jù)分布是一個關鍵因素。從用戶的角度來看,用戶只要查詢快、寫入快,像類似于compaction或clustering、索引構建等一系列的表優(yōu)化的方式,只會提升用戶的使用門檻。我們的計劃是通過一個智能的表優(yōu)化服務分析用戶的查詢特征,同時監(jiān)聽這個數(shù)據(jù)湖上數(shù)據(jù)的變化,自適應地觸發(fā)這個表的一系列優(yōu)化操作,可以做到在用戶不需要了解過多細節(jié)的情況下,做到智能的互加速。
2. 開源層面
第二個維度是開源貢獻。我們現(xiàn)在一直在積極地投入到Hudi的社區(qū)貢獻當中,參與了多個Hudi的核心feature的開發(fā)和設計。其中Bucket index是我們合入到社區(qū)的第一個核心功能,而當下我們也在同時貢獻著多個重要的功能,比如最早提到的解決數(shù)據(jù)難管理的Hudi MetaStore Server,我們已經(jīng)貢獻到社區(qū)了,去普惠到開源社區(qū)。因為我們發(fā)現(xiàn)Hudi MetaStore Server不止解決我們在生產(chǎn)實踐中遇到的問題,也是業(yè)界普遍遇到的一個問題?,F(xiàn)在也在跟Hudi社區(qū)的PMC共同探討數(shù)據(jù)湖的元數(shù)據(jù)管理系統(tǒng)制定標準。
其它一些功能我們也計劃分兩個階段貢獻到社區(qū)。比如 RPC 42,將我們的湖表管理服務與大家共享,長期來看能夠做到數(shù)據(jù)湖上的表的自動優(yōu)化。還有 Trino 和Presto DB 的 Hudi Connector,目前也是在和Hudi背后的生態(tài)公司共同推進投入到開源社區(qū)當中。
3. 商業(yè)化輸出
當前在火山引擎之上,我們將內(nèi)部的數(shù)據(jù)湖技術實踐同時通過LAS和EMR這兩個產(chǎn)品向外部企業(yè)輸出。其中LAS湖倉一體分析服務是一個整體面向湖倉一體架構的Serverless數(shù)據(jù)處理分析服務,提供一站式的海量數(shù)據(jù)存儲計算和交互分析能力,完全兼容 Spark、Presto和Flink生態(tài)。同時這個產(chǎn)品具備了完整的字節(jié)內(nèi)部的實時數(shù)據(jù)湖的成熟能力,能夠幫助企業(yè)輕松完成湖倉的構建和數(shù)據(jù)價值的洞察。
另外一個產(chǎn)品 EMR 是一個Stateless的云原生數(shù)倉,100%開源兼容,在這個產(chǎn)品當中也會包含字節(jié)數(shù)據(jù)湖實踐中一些開源兼容的優(yōu)化,以及一些引擎的企業(yè)級增強,以及云上便捷的運維能力。
最后,歡迎大家關注字節(jié)跳動數(shù)據(jù)平臺公眾號,在這里有非常多的技術干貨、產(chǎn)品動態(tài)和招聘信息。
05 問答環(huán)節(jié)
Q:可擴展性的 Bucket Index 具體是怎么做的?
A:可控擴展性的Bucket Index其實是把哈希值的 String 用一個字典樹的思路去解決。我們把它當成一個一個的 bit ,比如說當我們把兩個 bucket 合并了之后,我們就可以少用一個 bit,如果我們把一個 bucket 分裂之后,就會增加一個 bit。
然后這里面其實主要是兩點,一個是查詢層我們怎么去識別它到底屬于哪個 bucket。這個我們是可以通過一個當前的桶數(shù)算出一個最大的這個哈希深度。然后我們?nèi)V岛瓦@個桶的深度的N次方去進行取余。如果取余能匹配上,就說明這個桶是存在的。如果匹配不上,我們就把這個深度減1,然后再進行取余,直到能匹配上為止,這個是在寫入的時候。
第二個就是在查詢層面,我們會找一個合理的并行度,比如說我們這個桶的深度可能是6,但是這個6的文件占的數(shù)量特別少,那我們可能就再把它減少一位。然后從整個查詢的這個角度來看,我們減少一位的話,這個數(shù)據(jù)分布其實應該是更為合理的。我們把文件先分好組,讓每個 task 去拿到對應的一個特定的哈希值上的一個文件。
還有一個就是當數(shù)據(jù)真正發(fā)生這merge 和 split 的時候,這個階段我們是如何處理的?這個階段其實這樣的,當一個文件發(fā)生分裂的時候,它原始的數(shù)據(jù)是不用動的。我們可以認為它就是一個引用,因為我們匹配到了新的file group。我們可以找到之前它引用的原生沒有擴容的這個bucket,然后我們依舊還是可以去把這個數(shù)據(jù)拿到,并且在這個沒有擴容的file group上,我們可以套一層 hash filter ,然后可以保證這個數(shù)據(jù)不會有重復。最后我們異步地去做一個 clustering這個時候真正地去對數(shù)據(jù)物理上面去完成一個歷史數(shù)據(jù)的重分布。
Q:這邊對數(shù)據(jù)湖的應用主要是實時數(shù)倉嗎?
A:實時數(shù)倉是我們非常重要的一個落地場景。這次為什么著重介紹實時數(shù)倉,也是這次的這個整體的 topic 是字節(jié)跳動實時數(shù)據(jù)湖的引用。這個數(shù)據(jù)湖在我們內(nèi)部其實也會用于離線數(shù)倉,可能也會用于推薦系統(tǒng),很多場景都會有相應的一個應用。
Q:感覺schema on read的這種特性的實踐和預期并不一致。
A:其實是這樣的,schema on read目前的實踐整體來說是比較少的,但是其實我們是有一些預期的。我可以大概講一下我的理解,首先我們在數(shù)據(jù)入湖的時候,對數(shù)據(jù)的期望還是它要是結構化的。但是我們schema on read的核心可能不是說去支持這種類似于非結構化或者說是沒有辦法去結構化的數(shù)據(jù),我們的核心可能是要去支持數(shù)據(jù)的一個靈活的演變能力。那這里面其實有幾種思路。
第一種思路的話就是我們在表的schema層,去做一個靈活演變的支持。第二個思路也非常的類似于 git 的思路,就是我們的這個用戶其實對同一份數(shù)據(jù)它有不同視圖的需求。我們可以把這個數(shù)據(jù)以git 的思路去把它做成分支。每個人在同一份數(shù)據(jù)上面,有一個自己的數(shù)據(jù)的視圖,這個我認為可能也是 schema on read 的下一個重要的發(fā)展方向,我們可能有一張表,這張表每個人他看到的這個視圖可能是不一樣的。然后每個人可以往自己的視圖里頭去加上一些自己想要的數(shù)據(jù)。這個在實際的業(yè)務場景中其實也是存在的。比如說一個實時數(shù)據(jù),它進來的時候,它可能這個指標不是很全的,但是我們有些指標可能是需要在這個離線加工完之后再回灌進去。那這樣的話,其實這一張表對用戶呈現(xiàn)的就是兩個視圖。那我們接下來可能要做的就是如何去解決這個不同視圖之間的這個隔離的問題。不管是存儲上面的這個隔離,還是權限上面的隔離,還是元數(shù)據(jù)上面的隔離。
Q:數(shù)據(jù)湖里面是否還需要考慮類似數(shù)倉的分層架構,如果需要的話是如何實現(xiàn)的?
A:這主要取決于上層用戶如何使用數(shù)據(jù)湖,目前來看實際依舊還是有分層架構的,但是從底層來看,不管用戶是否分層,數(shù)據(jù)湖提供的能力是一樣的。
今天的分享就到這里,謝謝大家。?