字節(jié)跳動(dòng)基于Doris的湖倉(cāng)分析探索實(shí)踐
01 Doris簡(jiǎn)介
Apache Doris具備以下幾個(gè)特點(diǎn):
- 良好的架構(gòu)設(shè)計(jì),支持高并發(fā)低延時(shí)的查詢服務(wù),支持高吞吐量的交互式分析。多FE均可對(duì)外提供服務(wù),并發(fā)增加時(shí),線性擴(kuò)充FE和BE即可支持高并發(fā)的查詢請(qǐng)求。
- 支持批量數(shù)據(jù)load和流式數(shù)據(jù)load,支持?jǐn)?shù)據(jù)更新。支持Update/Delete語(yǔ)法,unique/aggregate數(shù)據(jù)模型,支持動(dòng)態(tài)更新數(shù)據(jù),實(shí)時(shí)更新聚合指標(biāo)。
- 提供了高可用,容錯(cuò)處理,高擴(kuò)展的企業(yè)級(jí)特性。FE Leader錯(cuò)誤異常,F(xiàn)E Follower秒級(jí)切換為新Leader繼續(xù)對(duì)外提供服務(wù)。
- 支持聚合表和物化視圖。多種數(shù)據(jù)模型,支持aggregate, replace等多種數(shù)據(jù)模型,支持創(chuàng)建rollup表,支持創(chuàng)建物化視圖。rollup表和物化視圖支持動(dòng)態(tài)更新,無(wú)需用戶手動(dòng)處理。
- MySQL協(xié)議兼容,支持直接使用MySQL客戶端連接,非常易用的數(shù)據(jù)應(yīng)用對(duì)接。
Doris 由 Frontend(以下簡(jiǎn)稱FE)和 Backend(以下簡(jiǎn)稱BE)組成,其中FE負(fù)責(zé)接受用戶請(qǐng)求、編譯、優(yōu)化、分發(fā)執(zhí)行計(jì)劃、元數(shù)據(jù)管理、BE節(jié)點(diǎn)的管理等功能,BE負(fù)責(zé)執(zhí)行由FE下發(fā)的執(zhí)行計(jì)劃,存儲(chǔ)和管理用戶數(shù)據(jù)。
?
02 數(shù)據(jù)湖格式Hudi簡(jiǎn)介
Hudi是下一代流式數(shù)據(jù)湖平臺(tái),為數(shù)據(jù)湖提供了表格式管理的能力,提供事務(wù),ACID,MVCC,數(shù)據(jù)更新刪除,增量數(shù)據(jù)讀取等功能。支持Spark, Flink, Presto, Trino等多種計(jì)算引擎。
?
Hudi根據(jù)數(shù)據(jù)更新時(shí)行為不同分為兩種表類型:
針對(duì)Hudi的兩種表格式,存在3種不同的查詢類型:
03
Doris分析Hudi數(shù)據(jù)的技術(shù)背景
在數(shù)倉(cāng)業(yè)務(wù)中,隨著業(yè)務(wù)對(duì)數(shù)據(jù)實(shí)時(shí)性的要求越來(lái)越高,T+1數(shù)倉(cāng)業(yè)務(wù)逐漸往小時(shí)級(jí)、分鐘級(jí),甚至秒級(jí)演進(jìn)。實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用也越來(lái)越廣,也經(jīng)歷了多個(gè)發(fā)展階段。目前存在著多種解決方案。
1. Lambda架構(gòu)
Lambda將數(shù)據(jù)處理流分為在線分析和離線分析兩條不同的處理路徑,兩條路徑互相獨(dú)立,互不影響。
離線分析處理T+1數(shù)據(jù),使用Hive/Spark處理大數(shù)據(jù)量,不可變數(shù)據(jù),數(shù)據(jù)一般存儲(chǔ)在HDFS等系統(tǒng)上。如果遇到數(shù)據(jù)更新,需要overwrite整張表或整個(gè)分區(qū),成本比較高。
在線分析處理實(shí)時(shí)數(shù)據(jù),使用Flink/Spark Streaming處理流式數(shù)據(jù),分析處理秒級(jí)或分鐘級(jí)流式數(shù)據(jù),數(shù)據(jù)保存在Kafka或定期(分鐘級(jí))保存到HDFS中。?
該套方案存在以下缺點(diǎn):
- 同一套指標(biāo)可能需要開發(fā)兩份代碼來(lái)進(jìn)行在線分析和離線分析,維護(hù)復(fù)雜。
- 數(shù)據(jù)應(yīng)用查詢指標(biāo)時(shí)可能需要同時(shí)查詢離線數(shù)據(jù)和在線數(shù)據(jù),開發(fā)復(fù)雜。
- 同時(shí)部署批處理和流式計(jì)算兩套引擎,運(yùn)維復(fù)雜。
- 數(shù)據(jù)更新需要overwrite整張表或分區(qū),成本高。
2. Kappa架構(gòu)
隨著在線分析業(yè)務(wù)越來(lái)越多,Lambda架構(gòu)的弊端就越來(lái)越明顯,增加一個(gè)指標(biāo)需要在線離線分別開發(fā),維護(hù)困難,離線指標(biāo)可能和在線指標(biāo)對(duì)不齊,部署復(fù)雜,組件繁多。于是Kappa架構(gòu)應(yīng)運(yùn)而生。
Kappa架構(gòu)使用一套架構(gòu)處理在線數(shù)據(jù)和離線數(shù)據(jù),使用同一套引擎同時(shí)處理在線和離線數(shù)據(jù),數(shù)據(jù)存儲(chǔ)在消息隊(duì)列上。?
Kappa架構(gòu)也有一定的局限:
- 流式計(jì)算引擎批處理能力較弱,處理大數(shù)據(jù)量性能較弱。
- 數(shù)據(jù)存儲(chǔ)使用消息隊(duì)列,消息隊(duì)列對(duì)數(shù)據(jù)存儲(chǔ)有有效性限制,歷史數(shù)據(jù)無(wú)法回溯。
- 數(shù)據(jù)時(shí)序可能亂序,可能對(duì)部分在時(shí)序要求方面比較嚴(yán)格的應(yīng)用造成數(shù)據(jù)錯(cuò)誤。
- 數(shù)據(jù)應(yīng)用需要從消息隊(duì)列中取數(shù),需要開發(fā)適配接口,開發(fā)復(fù)雜。
3. 基于數(shù)據(jù)湖的實(shí)時(shí)數(shù)倉(cāng)
針對(duì)Lambda架構(gòu)和Kappa架構(gòu)的缺陷,業(yè)界基于數(shù)據(jù)湖開發(fā)了Iceberg, Hudi, DeltaLake這些數(shù)據(jù)湖技術(shù),使得數(shù)倉(cāng)支持ACID, Update/Delete,數(shù)據(jù)Time Travel, Schema Evolution等特性,使得數(shù)倉(cāng)的時(shí)效性從小時(shí)級(jí)提升到分鐘級(jí),數(shù)據(jù)更新也支持部分更新,大大提高了數(shù)據(jù)更新的性能。兼具流式計(jì)算的實(shí)時(shí)性和批計(jì)算的吞吐量,支持的是近實(shí)時(shí)的場(chǎng)景。
以上方案中其中基于數(shù)據(jù)湖的應(yīng)用最廣,但數(shù)據(jù)湖模式無(wú)法支撐更高的秒級(jí)實(shí)時(shí)性,也無(wú)法直接對(duì)外提供數(shù)據(jù)服務(wù),需要搭建其他的數(shù)據(jù)服務(wù)組件,系統(tǒng)較為復(fù)雜?;诖吮尘跋?,部分業(yè)務(wù)開始使用Doris來(lái)承接,業(yè)務(wù)數(shù)據(jù)分析師需要對(duì)Doris與Hudi中的數(shù)據(jù)進(jìn)行聯(lián)邦分析,此外在Doris對(duì)外提供數(shù)據(jù)服務(wù)時(shí)既要能查詢Doris中數(shù)據(jù),也要能加速查詢離線業(yè)務(wù)中的數(shù)據(jù)湖數(shù)據(jù),因此我們開發(fā)了Doris訪問(wèn)數(shù)據(jù)湖Hudi中數(shù)據(jù)的特性。
04 Doris分析Hudi數(shù)據(jù)的設(shè)計(jì)原理
基于以上背景,我們?cè)O(shè)計(jì)了Apache Doris中查詢數(shù)據(jù)湖格式Hudi數(shù)據(jù),因Hudi生態(tài)為java語(yǔ)言,而Apache Doris的執(zhí)行節(jié)點(diǎn)BE為C++環(huán)境,C++ 無(wú)法直接調(diào)用Hudi java SDK,針對(duì)這一點(diǎn),我們有三種解決方案。
1.實(shí)現(xiàn)Hudi C++ client,在BE中直接調(diào)用Hudi C++ client去讀寫Hudi表。?
該方案需要完整實(shí)現(xiàn)一套Hudi C++ client,開發(fā)周期較長(zhǎng),后期Hudi行為變更需要同步修改Hudi C++ client,維護(hù)較為困難。
2.BE通過(guò)thrift協(xié)議發(fā)送讀寫請(qǐng)求至Broker,由Broker調(diào)用Hudi java client讀取Hudi表。
該方案需要在Broker中增加讀寫Hudi數(shù)據(jù)的功能,目前Broker定位僅為fs的操作接口,引入Hudi打破了Broker的定位。第二,數(shù)據(jù)需要在BE和Broker之間傳輸,性能較低。
3.在BE中使用JNI創(chuàng)建JVM,加載Hudi java client去讀寫Hudi表。
該方案需要在BE進(jìn)程中維護(hù)JVM,有JVM調(diào)用Hudi java client對(duì)Hudi進(jìn)行讀寫。讀寫邏輯使用Hudi社區(qū)java實(shí)現(xiàn),可以維護(hù)與社區(qū)同步;同時(shí)數(shù)據(jù)在同一個(gè)進(jìn)程中進(jìn)行處理,性能較高。但需要在BE維護(hù)一個(gè)JVM,管理較為復(fù)雜。
4.使用BE arrow parquet c++ api讀取hudi parquet base file,hudi表中的delta file暫不處理。?
該方案可以由BE直接讀取hudi表的parquet文件,性能最高。但當(dāng)前不支持base file和delta file的合并讀取,因此僅支持COW表Snapshot Queries和MOR表的Read Optimized Queries,不支持Incremental Queries。
綜上,我們選擇方案四,第一期實(shí)現(xiàn)了COW表Snapshot Queries和MOR表的Read Optimized Queries,后面聯(lián)合Hudi社區(qū)開發(fā)base file和delta file合并讀取的C++接口。
05 Doris分析Hudi數(shù)據(jù)的技術(shù)實(shí)現(xiàn)
Doris中查詢分析Hudi外表使用步驟非常簡(jiǎn)單。
1. 創(chuàng)建Hudi外表
建表時(shí)指定engine為Hudi,同時(shí)指定Hudi外表的相關(guān)信息,如hive metastore uri,在hive metastore中的database和table名字等。
建表僅僅在Doris的元數(shù)據(jù)中增加一張表,無(wú)任何數(shù)據(jù)移動(dòng)。
建表時(shí)支持指定全部或部分hudi schema,也支持不指定schema創(chuàng)建hudi外表。指定schema時(shí)必須與hiveMetaStore中hudi表的列名,類型一致。
Example:
Plaintext
CREATE TABLE example_db.t_hudi
ENGINE=HUDI
PROPERTIES (
"hudi.database" = "hudi_db",
"hudi.table" = "hudi_table",
"hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
CREATE TABLE example_db.t_hudi (
column1 int,
column2 string)
ENGINE=HUDI
PROPERTIES (
"hudi.database" = "hudi_db",
"hudi.table" = "hudi_table",
"hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
2. 查詢Hudi外表
查詢Hudi數(shù)據(jù)表時(shí),F(xiàn)E在analazy階段會(huì)查詢?cè)獢?shù)據(jù)獲取到Hudi外表的的hive metastore地址,從Hive metastore中獲取hudi表的schema信息與文件路徑。
- 獲取hudi表的數(shù)據(jù)地址。
- FE規(guī)劃fragment增加HudiScanNode。HudiScanNode中獲取Hudi table對(duì)應(yīng)的data file文件列表。
- 根據(jù)Hudi table獲取的data file列表生成scanRange。
- 下發(fā)HudiScan 任務(wù)至BE節(jié)點(diǎn)。
- BE節(jié)點(diǎn)根據(jù)HudiScanNode指定的Hudi外表文件路徑調(diào)用native parquet reader進(jìn)行數(shù)據(jù)讀取。
?
06 后期規(guī)劃
目前Apche Doris查詢Hudi表已合入社區(qū),當(dāng)前已支持COW表的Snapshot Query,支持MOR表的Read Optimized Query。對(duì)MOR表的Snapshot Query暫時(shí)還未支持,流式場(chǎng)景中的Incremental Query也沒(méi)有支持。
后續(xù)還有幾項(xiàng)工作需要處理,我們和社區(qū)也在積極合作進(jìn)行中:
- MOR表的Snapshot Query。MOR表實(shí)時(shí)讀需要合并讀取Data file與對(duì)應(yīng)的Delta file,BE需要支持Delta file AVRO格式的讀取,需要增加avro的native讀取方式。
- COW/MOR表的Incremental Query。支持實(shí)時(shí)業(yè)務(wù)中的增量讀取。
- BE讀取Hudi base file和delta file的native接口。目前BE讀取Hudi數(shù)據(jù)時(shí),僅能讀取data file,使用的是parquet的C++ SDK。后期我們和聯(lián)合Hudi社區(qū)提供Huid base file和delta file的C++/Rust等語(yǔ)言的讀取接口,在Doris BE中直接使用native接口來(lái)查詢Hudi數(shù)據(jù)。
今天的分享就到這里,謝謝大家。