汽車之家:基于 Flink + Iceberg 的湖倉(cāng)一體架構(gòu)實(shí)踐
內(nèi)容簡(jiǎn)要:
一、數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)升級(jí)的背景
二、基于 Iceberg 的湖倉(cāng)一體架構(gòu)實(shí)踐
三、總結(jié)與收益
四、后續(xù)規(guī)劃
一、數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)升級(jí)的背景
1. 基于 Hive 的數(shù)據(jù)倉(cāng)庫(kù)的痛點(diǎn)
原有的數(shù)據(jù)倉(cāng)庫(kù)完全基于 Hive 建造而成,主要存在三大痛點(diǎn):
痛點(diǎn)一:不支持 ACID
1)不支持 Upsert 場(chǎng)景;
2)不支持 Row-level delete,數(shù)據(jù)修正成本高。
痛點(diǎn)二:時(shí)效性難以提升
1)數(shù)據(jù)難以做到準(zhǔn)實(shí)時(shí)可見;
2)無法增量讀取,無法實(shí)現(xiàn)存儲(chǔ)層面的流批統(tǒng)一;
3)無法支持分鐘級(jí)延遲的數(shù)據(jù)分析場(chǎng)景。
痛點(diǎn)三:Table Evolution
1)寫入型 Schema,對(duì) Schema 變更支持不好;
2)Partition Spec 變更支持不友好。
2. Iceberg 關(guān)鍵特性
Iceberg 主要有四大關(guān)鍵特性:支持 ACID 語(yǔ)義、增量快照機(jī)制、開放的表格式和流批接口支持。
支持 ACID 語(yǔ)義不會(huì)讀到不完整的 Commit;基于樂觀鎖支持并發(fā) Commit;Row-level delete,支持 Upsert。
增量快照機(jī)制Commit 后數(shù)據(jù)即可見(分鐘級(jí));可回溯歷史快照。
開放的表格式數(shù)據(jù)格式:parquet、orc、avro計(jì)算引擎:Spark、Flink、Hive、Trino/Presto
流批接口支持支持流、批寫入;支持流、批讀取。
二、基于 Iceberg 的湖倉(cāng)一體架構(gòu)實(shí)踐
湖倉(cāng)一體的意義就是說我不需要看見湖和倉(cāng),數(shù)據(jù)有著打通的元數(shù)據(jù)的格式,它可以自由的流動(dòng),也可以對(duì)接上層多樣化的計(jì)算生態(tài)。
——賈揚(yáng)清(阿里云計(jì)算平臺(tái)高級(jí)研究員)
1. Append 流入湖的鏈路
上圖為日志類數(shù)據(jù)入湖的鏈路,日志類數(shù)據(jù)包含客戶端日志、用戶端日志以及服務(wù)端日志。這些日志數(shù)據(jù)會(huì)實(shí)時(shí)錄入到 Kafka,然后通過 Flink 任務(wù)寫到 Iceberg 里面,最終存儲(chǔ)到 HDFS。
2. Flink SQL 入湖鏈路打通
我們的 Flink SQL 入湖鏈路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,對(duì)接 Iceberg Catalog 我們主要做了以下內(nèi)容:
1)Meta Server 增加對(duì) Iceberg Catalog 的支持;
2)SQL SDK 增加 Iceberg Catalog 支持。
然后在這基礎(chǔ)上,平臺(tái)開放 Iceberg 表的管理功能,使得用戶可以自己在平臺(tái)上建 SQL 的表。
3. 入湖 - 支持代理用戶
第二步是內(nèi)部的實(shí)踐,對(duì)接現(xiàn)有預(yù)算體系、權(quán)限體系。
因?yàn)橹捌脚_(tái)做實(shí)時(shí)作業(yè)的時(shí)候,平臺(tái)都是默認(rèn)為 Flink 用戶去運(yùn)行的,之前存儲(chǔ)不涉及 HDFS 存儲(chǔ),因此可能沒有什么問題,也就沒有思考預(yù)算劃分方面的問題。
但是現(xiàn)在寫 Iceberg 的話,可能就會(huì)涉及一些問題。比如數(shù)倉(cāng)團(tuán)隊(duì)有自己的集市,數(shù)據(jù)就應(yīng)該寫到他們的目錄下面,預(yù)算也是劃到他們的預(yù)算下,同時(shí)權(quán)限和離線團(tuán)隊(duì)賬號(hào)的體系打通。
如上所示,這塊主要是在平臺(tái)上做了代理用戶的功能,用戶可以去指定用哪個(gè)賬號(hào)去把這個(gè)數(shù)據(jù)寫到 Iceberg 里面,實(shí)現(xiàn)過程主要有以下三個(gè)。
增加 Table 級(jí)別配置:'iceberg.user.proxy' = 'targetUser’1)啟用 Superuser2)團(tuán)隊(duì)賬號(hào)鑒權(quán)
訪問 HDFS 時(shí)啟用代理用戶:
訪問 Hive Metastore 時(shí)指定代理用戶1)參考 Spark 的相關(guān)實(shí)現(xiàn):org.apache.spark.deploy.security.HiveDelegationTokenProvider2)動(dòng)態(tài)代理 HiveMetaStoreClient,使用代理用戶訪問 Hive metastore
4. Flink SQL 入湖示例
DDL + DML
5. CDC 數(shù)據(jù)入湖鏈路
如上所示,我們有一個(gè) AutoDTS 平臺(tái),負(fù)責(zé)業(yè)務(wù)庫(kù)數(shù)據(jù)的實(shí)時(shí)接入。我們會(huì)把這些業(yè)務(wù)庫(kù)的數(shù)據(jù)接入到 Kafka 里面,同時(shí)它還支持在平臺(tái)上配置分發(fā)任務(wù),相當(dāng)于把進(jìn) Kafka 的數(shù)據(jù)分發(fā)到不同的存儲(chǔ)引擎里,在這個(gè)場(chǎng)景下是分發(fā)到 Iceberg 里。
6. Flink SQL CDC 入湖鏈路打通
下面是我們基于 “Flink1.11 + Iceberg 0.11” 支持 CDC 入湖所做的改動(dòng):
改進(jìn) Iceberg Sink:
Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改并適配。
表管理1)支持 Primary key(PR1978)2)開啟 V2 版本:'iceberg.format.version' = '2'
7. CDC 數(shù)據(jù)入湖
1. 支持 Bucket
Upsert 場(chǎng)景下,需要確保同一條數(shù)據(jù)寫入到同一 Bucket 下,這又如何實(shí)現(xiàn)?
目前 Flink SQL 語(yǔ)法不支持聲明 bucket 分區(qū),通過配置的方式聲明 Bucket:
'partition.bucket.source'='id', // 指定 bucket 字段
'partition.bucket.num'='10', // 指定 bucket 數(shù)量
2. Copy-on-write sink
做 Copy-on-Write 的原因是原本社區(qū)的 Merge-on-Read 不支持合并小文件,所以我們臨時(shí)去做了 Copy-on-write sink 的實(shí)現(xiàn)。目前業(yè)務(wù)一直在測(cè)試使用,效果良好。
上方為 Copy-on-Write 的實(shí)現(xiàn),其實(shí)跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多并行度寫入和 FileCommitter 單并行度順序提交。
在 Copy-on-Write 里面,需要根據(jù)表的數(shù)據(jù)量合理設(shè)置 Bucket 數(shù),無需額外做小文件合并。
StreamWriter 在 snapshotState 階段多并行度寫入1)增加 Buffer;2)寫入前需要判斷上次 checkpoint 已經(jīng) commit 成功;3)按 bucket 分組、合并,逐個(gè) Bucket 寫入。
FileCommitter 單并行度順序提交1)table.newOverwrite()2)Flink.last.committed.checkpoint.id
8. 示例 - CDC 數(shù)據(jù)配置入湖
如上圖所示,在實(shí)際使用中,業(yè)務(wù)方可以在 DTS 平臺(tái)上創(chuàng)建或配置分發(fā)任務(wù)即可。
實(shí)例類型選擇 Iceberg 表,然后選擇目標(biāo)庫(kù),表明要把哪個(gè)表的數(shù)據(jù)同步到 Iceberg 里,然后可以選原表和目標(biāo)表的字段的映射關(guān)系是什么樣的,配置之后就可以啟動(dòng)分發(fā)任務(wù)。啟動(dòng)之后,會(huì)在實(shí)時(shí)計(jì)算平臺(tái) Flink 里面提交一個(gè)實(shí)時(shí)任務(wù),接著用 Copy-on-write sink 去實(shí)時(shí)地把數(shù)據(jù)寫到 Iceberg 表里面。
9. 入湖其他實(shí)踐
實(shí)踐一:減少 empty commit
問題描述:
在上游 Kafka 長(zhǎng)期沒有數(shù)據(jù)的情況下,每次 Checkpoint 依舊會(huì)生成新的 Snapshot,導(dǎo)致大量的空文件和不必要的 Snapshot。
解決方案(PR - 2042):
增加配置 Flink.max-continuousempty-commits,在連續(xù)指定次數(shù) Checkpoint 都沒有數(shù)據(jù)后才真正觸發(fā) Commit,生成 Snapshot。
實(shí)踐二:記錄 watermark
問題描述:
目前 Iceberg 表本身無法直接反映數(shù)據(jù)寫入的進(jìn)度,離線調(diào)度難以精準(zhǔn)觸發(fā)下游任務(wù)。
解決方案( PR - 2109 ):
在 Commit 階段將 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直觀的反映端到端的延遲情況,同時(shí)可以用來判斷分區(qū)數(shù)據(jù)完整性,用于調(diào)度觸發(fā)下游任務(wù)。
實(shí)踐三:刪表優(yōu)化
問題描述:
刪除 Iceberg 可能會(huì)很慢,導(dǎo)致平臺(tái)接口相應(yīng)超時(shí)。因?yàn)?Iceberg 是面向?qū)ο蟠鎯?chǔ)來抽象 IO 層的,沒有快速清除目錄的方法。
解決方案:
擴(kuò)展 FileIO,增加 deleteDir 方法,在 HDFS 上快速刪除表數(shù)據(jù)。
10. 小文件合并及數(shù)據(jù)清理
定期為每個(gè)表執(zhí)行批處理任務(wù)(spark 3),分為以下三個(gè)步驟:
1. 定期合并新增分區(qū)的小文件:
rewriteDataFilesAction.execute(); 僅合并小文件,不會(huì)刪除舊文件。
2. 刪除過期的 snapshot,清理元數(shù)據(jù)及數(shù)據(jù)文件:
table.expireSnapshots().expireOld erThan(timestamp).commit();
3. 清理 orphan 文件,默認(rèn)清理 3 天前,且無法觸及的文件:
removeOrphanFilesAction.older Than(timestamp).execute();
11. 計(jì)算引擎 – Flink
Flink 是實(shí)時(shí)平臺(tái)的核心計(jì)算引擎,目前主要支持?jǐn)?shù)據(jù)入湖場(chǎng)景,主要有以下幾個(gè)方面的特點(diǎn)。
數(shù)據(jù)準(zhǔn)實(shí)時(shí)入湖:Flink 和 Iceberg 在數(shù)據(jù)入湖方面集成度最高,F(xiàn)link 社區(qū)主動(dòng)擁抱數(shù)據(jù)湖技術(shù)。
平臺(tái)集成:AutoStream 引入 IcebergCatalog,支持通過 SQL 建表、入湖 AutoDTS 支持將 MySQL、SQLServer、TiDB 表配置入湖。
流批一體:在流批一體的理念下,F(xiàn)link 的優(yōu)勢(shì)會(huì)逐漸體現(xiàn)出來。
12. 計(jì)算引擎 – Hive
Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 集成度更高,主要提供以下三個(gè)方面的功能。
定期小文件合并及 meta 信息查詢:SELECT * FROM prod.db.table.history 還可查看 snapshots, files, manifests。
離線數(shù)據(jù)寫入:1)Insert into 2)Insert overwrite 3)Merge into
分析查詢:主要支持日常的準(zhǔn)實(shí)時(shí)分析查詢場(chǎng)景。
13. 計(jì)算引擎 – Trino/Presto
AutoBI 已經(jīng)和 Presto 集成,用于報(bào)表、分析型查詢場(chǎng)景。
Trino1)直接將 Iceberg 作為報(bào)表數(shù)據(jù)源2)需要增加元數(shù)據(jù)緩存機(jī)制:https://github.com/trinodb/trino/issues/7551
Presto社區(qū)集成中:https://github.com/prestodb/presto/pull/15836
14. 踩過的坑
1. 訪問 Hive Metastore 異常
問題描述:HiveConf 的構(gòu)造方法的誤用,導(dǎo)致 Hive 客戶端中聲明的配置被覆蓋,導(dǎo)致訪問 Hive metastore 時(shí)異常
解決方案(PR-2075):修復(fù) HiveConf 的構(gòu)造,顯示調(diào)用 addResource 方法,確保配置不會(huì)被覆蓋:hiveConf.addResource(conf);
2.Hive metastore 鎖未釋放
問題描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會(huì)導(dǎo)致上面異常。
解決方案(PR-2263):優(yōu)化 HiveTableOperations#acquireLock 方法,在獲取鎖失敗的情況下顯示調(diào)用 unlock 來釋放鎖。
3. 元數(shù)據(jù)文件丟失
問題描述:Iceberg 表無法訪問,報(bào) “NotFoundException Failed to open input stream for file : xxx.metadata.json”
解決方案(PR-2328):當(dāng)調(diào)用 Hive metastore 更新 iceberg 表的 metadata_location 超時(shí)后,增加檢查機(jī)制,確認(rèn)元數(shù)據(jù)未保存成功后再刪除元數(shù)據(jù)文件。
三、收益與總結(jié)
1. 總結(jié)
通過對(duì)湖倉(cāng)一體、流批融合的探索,我們分別做了總結(jié)。
湖倉(cāng)一體1)Iceberg 支持 Hive Metastore;2)總體使用上與 Hive 表類似:相同數(shù)據(jù)格式、相同的計(jì)算引擎。
流批融合準(zhǔn)實(shí)時(shí)場(chǎng)景下實(shí)現(xiàn)流批統(tǒng)一:同源、同計(jì)算、同存儲(chǔ)。
2. 業(yè)務(wù)收益
數(shù)據(jù)時(shí)效性提升:入倉(cāng)延遲從 2 小時(shí)以上降低到 10 分鐘以內(nèi);算法核心任務(wù) SLA 提前 2 小時(shí)完成。
準(zhǔn)實(shí)時(shí)的分析查詢:結(jié)合 Spark 3 和 Trino,支持準(zhǔn)實(shí)時(shí)的多維分析查詢。
特征工程提效:提供準(zhǔn)實(shí)時(shí)的樣本數(shù)據(jù),提高模型訓(xùn)練時(shí)效性。
CDC 數(shù)據(jù)準(zhǔn)實(shí)時(shí)入倉(cāng):可以在數(shù)倉(cāng)針對(duì)業(yè)務(wù)表做準(zhǔn)實(shí)時(shí)分析查詢。
3. 架構(gòu)收益 - 準(zhǔn)實(shí)時(shí)數(shù)倉(cāng)
上方也提到了,我們支持準(zhǔn)實(shí)時(shí)的入倉(cāng)和分析,相當(dāng)于是為后續(xù)的準(zhǔn)實(shí)時(shí)數(shù)倉(cāng)建設(shè)提供了基礎(chǔ)的架構(gòu)驗(yàn)證。準(zhǔn)實(shí)時(shí)數(shù)倉(cāng)的優(yōu)勢(shì)是一次開發(fā)、口徑統(tǒng)一、統(tǒng)一存儲(chǔ),是真正的批流一體。劣勢(shì)是實(shí)時(shí)性較差,原來可能是秒級(jí)、毫秒級(jí)的延遲,現(xiàn)在是分鐘級(jí)的數(shù)據(jù)可見性。
但是在架構(gòu)層面上,這個(gè)意義還是很大的,后續(xù)我們能看到一些希望,可以把整個(gè)原來 “T + 1” 的數(shù)倉(cāng),做成準(zhǔn)實(shí)時(shí)的數(shù)倉(cāng),提升數(shù)倉(cāng)整體的數(shù)據(jù)時(shí)效性,然后更好地支持上下游的業(yè)務(wù)。
四、后續(xù)規(guī)劃
1. 跟進(jìn) Iceberg 版本
全面開放 V2 格式,支持 CDC 數(shù)據(jù)的 MOR 入湖。
2. 建設(shè)準(zhǔn)實(shí)時(shí)數(shù)倉(cāng)
基于 Flink 通過 Data pipeline 模式對(duì)數(shù)倉(cāng)各層表全面提速。
3. 流批一體
隨著 upsert 功能的逐步完善,持續(xù)探索存儲(chǔ)層面流批一體。
4. 多維分析
基于 Presto/Spark3 輸出準(zhǔn)實(shí)時(shí)多維分析。