縱騰湖倉全鏈路落地實踐
一、總體架構(gòu)
面對日益增長的數(shù)據(jù)量,Lambda 架構(gòu)使用離線/實時兩條鏈路和兩種存儲完成數(shù)據(jù)的保存和處理。這種繁雜的架構(gòu)體系帶來了不一致的問題,需要通過修數(shù)、補數(shù)等一系列監(jiān)控運維手段去彌補。為了統(tǒng)一簡化架構(gòu),提高開發(fā)效率,減少運維負(fù)擔(dān),我們實施了基于數(shù)據(jù)湖 Hudi+Flink 的流批一體架構(gòu),達(dá)到了降本增效的目的。
如下圖所示,總體架構(gòu)包括數(shù)據(jù)采集、ETL、查詢、調(diào)度、監(jiān)控、數(shù)據(jù)服務(wù)等。要解決的是數(shù)據(jù)從哪里來到哪里去,怎么過去,怎么用,以及過程中的調(diào)度和監(jiān)控、元數(shù)據(jù)管理、權(quán)限管理等問題。
“數(shù)據(jù)從哪里來”,我們的數(shù)據(jù)來自 MySQL、MongoDB、Tablestore、Hana?!皵?shù)據(jù)到哪里去”,我們的數(shù)據(jù)會寫入到 Hudi、Doris,其中 Doris 負(fù)責(zé)存儲部分應(yīng)用層的數(shù)據(jù)。“數(shù)據(jù)怎么過去”,將在后面的實時入湖部分進(jìn)行介紹。“數(shù)據(jù)用在哪里”,我們的數(shù)據(jù)會被 OLAP、機(jī)器學(xué)習(xí)、API、BI 查詢使用,其中 OLAP 和 BI 都通過 Kyuubi 的服務(wù)進(jìn)行查詢。
任務(wù)的調(diào)度主要通過 DolpuinScheduler 來執(zhí)行,基于 quartz 的 cronTrigger 完成 shell、SQL 等調(diào)度。監(jiān)控部分則是通過 Prometheus 和 Grafana,這是業(yè)界通用的解決方案。元數(shù)據(jù)采集通過 DataHub 完成,采用了 datahub 的 ingestion framework 框架來采集各種數(shù)據(jù)源的元數(shù)據(jù)。權(quán)限管理主要包括 Kyuubi 服務(wù)端的統(tǒng)一認(rèn)證和引擎端的獨立鑒權(quán)。
二、入湖方案選型
數(shù)據(jù)入湖方案設(shè)計上,我們比較了三種入湖的實現(xiàn)思路。
1、入湖方案一
如下圖所示,包含了兩條支線:
- 分支①:Flink SQL 通過 MySQL-CDC connector 和 Hudi connector 完成 source 和 sink 端讀寫。這樣 MySQL 每張表由單獨的 binlog dump 線程讀取 binlog。
- 分支②:通過 MySQL 多庫表配置一個 Debezium Connector 實現(xiàn)單獨 binlog dump 線程讀取多庫表,解析后發(fā)送到 Kafka 的多個 topic。即一張表一個 topic。之后用 Flink SQL通過 Kafka connector 和 Hudi connector 完成 source 和 sink 端讀寫。
這種方案的主要優(yōu)點是 Flink 和 CDC 組件都經(jīng)過了充分驗證,已經(jīng)非常穩(wěn)定成熟了。而主要缺點是 Flink SQL 需要定義表 DDL。但我們已經(jīng)開發(fā) DDL 列信息從元數(shù)據(jù)系統(tǒng)獲取,無須自定義。并且寫 Hudi 是每張表一個 Flink 任務(wù),這樣會導(dǎo)致資源占用過多。另外 Flink CDC 還不支持 Schema 演變,一旦 Schema 變更,需要重新拉取數(shù)據(jù)。
2、入湖方案二
這一方案是在前一個方案分支二的基礎(chǔ)上進(jìn)行了一定的改進(jìn),通過 Dinky 完成整庫數(shù)據(jù)同步,其優(yōu)點是同源數(shù)據(jù)合并成一個 source 節(jié)點,減輕源庫壓力,根據(jù) schema、database、table 分流 sink 到對應(yīng)表。其缺點是不支持 schema 演變,表結(jié)構(gòu)變更須重新導(dǎo)數(shù)。如下圖所示,mysql_biz 庫中有3張表,從 flink dag 圖看到 mysql cdc source 分3條流 sink 到 Hudi 的3張表。
3、入湖方案三
主要流程如下圖所示。其主要優(yōu)點是支持 Schema 演變。Schema 變更的信息由 Debezium 注冊到 Confluence Schema Registry,schema change 的信息通過 DeltaStreamer 執(zhí)行任務(wù)變更到 Hudi,使得任務(wù)執(zhí)行過程中不需要重新拉起。其主要缺點是依賴于 Spark 計算引擎,而我們部門主要用 Flink,當(dāng)然,這會因各個公司實際情況而不同。
下圖分別是 Yarn 的 deltastreamer 任務(wù), Kafka schema-change topic 的 DML message 和 Hudi 表變更后的數(shù)據(jù)。
4、入湖方案總結(jié)
在方案選型時,可以根據(jù)下面的流程圖進(jìn)行比較選擇:
(1) 先看計算框架是 Spark 還是Flink,如果是Spark 則選擇方案三,即 Deltastreamer,這一方案適用于表結(jié)構(gòu)變更頻繁,重新拉取代價高,主要技術(shù)棧是Spark 的情況。
(2) 如果是 Flink,再看數(shù)據(jù)量是否較少,以及表結(jié)構(gòu)是否較穩(wěn)定,如果是的話,選擇方案二,Dinky 整庫同步方案支持表名過濾,適用數(shù)據(jù)量較少且表結(jié)構(gòu)較穩(wěn)定的表。
(3) 如果否,再考慮 mysql 能否抗較大壓力,如果否,那么選擇方案一下分支,即 Kafka Connect,Debezium 拉取發(fā)送 Kafka,從 Kafka 讀取后寫 Hudi。適用數(shù)據(jù)量較大的多張表。
(4) 如果是,則選擇方案一上分支,即 Flink SQL mysql-cdc 寫 Hudi,適用于對實時穩(wěn)定要求高于資源敏感的重要業(yè)務(wù)場景。
三、實時入湖優(yōu)化
我們的入湖場景是 Flink Stream API 讀取Pulsar 寫 Hudi MOR 表,特點是數(shù)據(jù)量大,并且源端的每條消息都只包含了部分的列數(shù)據(jù)。我們通過使用 Hudi 的 MOR 表格式和 PartialUpdateAvroPayload 實現(xiàn)了這個需求。使用 Hudi 的 MOR 格式,是因為 COW 的寫放大問題,不適合數(shù)據(jù)量大的實時場景,而 MOR 是增量數(shù)據(jù)寫行存 Avro 格式log,通過在線或離線方式壓縮合并至列存格式 parquet。在保證寫效率的同時也兼顧了查詢的性能。不過需要通過合并任務(wù)定期地對數(shù)據(jù)進(jìn)行合并處理,這是引入復(fù)雜度的地方。
以下面這張圖為例,recordKey 是 ID1 的3條 msg,每條分別包含一個列值,其余字段為空,按 ts 列 precombine,當(dāng) ts3 > ts2 > ts1時,最終 Hudi 存的 ID1 行的值是 v1,v2,v3,ts3。
此入湖場景痛點包括,MOR 表索引選擇不當(dāng),壓縮異常導(dǎo)致越寫越慢,直至 checkpoint 超時,某分區(qū)存在重復(fù)文件導(dǎo)致寫任務(wù)出錯,MOR 表某個壓縮計劃 pending阻礙此 bucket 的壓縮及后續(xù)的壓縮計劃生成,以及如何平衡效率與資源等。
我們在實踐過程中針對一些痛點實施了相應(yīng)的解決方案。
Hudi 表索引類型選擇不當(dāng),導(dǎo)致越寫越慢至 CK 超時,這是因為 Bucket 索引通過 hash 映射 recordKey 到 fileGroup。而 Bloom 索引是保存 recordKey 和 partition、fileGroup 值來實現(xiàn),因此 checkpoint size 會隨數(shù)據(jù)量的增加而增長。Bloom Filter 索引基于布隆過濾器實現(xiàn),索引信息存儲在 parquet 的 footer 中,Bloom 的假陽性問題也會導(dǎo)致更新越來越慢,假陽性是指只能判斷數(shù)據(jù)一定不在某個文件而不能保證數(shù)據(jù)一定在某個文件,因此存在多個文件都可能存在某條數(shù)據(jù),即須讀取多個文件才能準(zhǔn)確判斷。
我們做的優(yōu)化是使用 Bucket 索引代替 Bloom 索引,Hudi 目前也支持了可以動態(tài)擴(kuò)容的 Bucket 參數(shù)。
MOR 表壓縮執(zhí)行異常,具體來說有以下三個場景:
- 單 log 超過1G,使寫延遲提高,導(dǎo)致越寫越慢至 checkpoint 超時,checkpoint 端到端耗時增長至3-6分鐘。
- 在 inline schedule 的壓縮模式下,offline execute 出現(xiàn)報錯:log文件不存在。
- Compaction 一直處于 Infight 狀態(tài),即進(jìn)行中,不能完成;同時存在無效 compaction,既不能被壓縮,也不能被取消。
此3種現(xiàn)象的原因都是 Sink:compact_commit 算子的并行度 > 1,我們做的優(yōu)化是降低壓縮過程的并發(fā)度,設(shè)置 compact_commit Parallelism = 1。并行度改成1后1G的 log 壓縮正常。整張表size 明顯減少。log 到 parquet 的壓縮比默認(rèn)是0.35。
MOR 表某分區(qū)存在重復(fù)文件,導(dǎo)致寫任務(wù)出錯。出現(xiàn)這個問題的原因是某個 instant 已寫 log 文件但未成功提交到 timeline 時,發(fā)生異常重啟后未 rollback 這個 instant,即未清理已有 log,繼續(xù)寫此 instant 則有重復(fù)。
我們做的優(yōu)化是在遇到重復(fù)文件時,通過 Hudi-Cli 執(zhí)行去重任務(wù),再恢復(fù)執(zhí)行。具體來說,需要拆分成以下四個步驟:
- 停止當(dāng)前的 Flink 任務(wù)。
- 通過 Hudi-cli 執(zhí)行去重命令。
repair deduplicate --duplicatedPartitionPath 20220604 --repairedOutputPath hdfs:///hudi/hudi_tis.db/track_detail_3_repair/20220604 --dedupeType upsert_type --sparkMaster local
- 刪除 partition 文件,修復(fù)文件移到原分區(qū)。
- 重新啟動 Flink 任務(wù)。
MOR 表某個壓縮計劃 pending,阻礙此 bucket 的壓縮及后續(xù)的壓縮計劃生成。這個問題是由于環(huán)境問題導(dǎo)致的 zombie compaction 或 bug。上圖中第一列是compaction instant time,即壓縮計劃生成時間,第二列是狀態(tài),第三列是此壓縮計劃包含的文件數(shù)。8181的 instant 卡住,且此壓縮計劃包含2198個文件,即涉及到大量的 file group,涉及的 file group不會有新的壓縮計劃生成。導(dǎo)致表的 size 增加,寫延時。
我們做的優(yōu)化是回滾不正常的合并任務(wù),重新處理。即利用較多資源快速離線壓縮完。保證之后啟動的 Flink 任務(wù)在相對少的資源情況下仍然可以保證更新和在線壓縮的效率。
具體來說,包括下面的命令:
- 執(zhí)行HoodieFlinkCompactor把所有inflight instant回滾成requested狀態(tài)。
- 執(zhí)行compaction unschedule命。
sh bin/hudi-compactor.sh hudi_tis track_detail_3 100
compaction unschedule --instant 20230613180604970 --parallelism 200 --sparkMaster local --sparkMemory 5g
經(jīng)過多次的修改和驗證,我們的入湖任務(wù)在性能和穩(wěn)定性上取得了明顯的改善。在穩(wěn)定性上,做到了在十幾天內(nèi)任務(wù)無異常。在時延上,做到了分鐘級別的 checkpoint 和數(shù)據(jù)可見。在資源使用上,對 Hadoop YARN 資源的占用明顯減少。
下圖總結(jié)了我們對實時入湖做的參數(shù)優(yōu)化方案,包括:
- 索引選擇GLOBAL_BLOOM-> BUCKET_INDEX #Bucket索引較布隆索引寫吞吐性能高。
- BUCKET_NUM 20 #Bucket數(shù)量:根據(jù)單分區(qū)數(shù)據(jù)量評估,保證File Slice2GB,平衡讀寫性能。
Flink增量checkpoint:Rockdb #Flink ck存儲,rockdb支持增量ck,減少單ck數(shù)據(jù)量,提高寫吞吐。
- Yarn資源:
jobmanager 5G #Flink jobmanager內(nèi)存,減少oom,保證穩(wěn)定。
taskmanager 50G 20S #Flink taskmanager內(nèi)存與slot數(shù),slot與并發(fā)度、bucket數(shù)一致。
- write.rate.limit 30000 #寫速度限制,過載保護(hù),保證作業(yè)穩(wěn)定運行。
- write.max.size 2560 #寫用到最大內(nèi)存,于taskmanager每個slot內(nèi)存一致。
- write.batch.size 512 #批量寫,適量調(diào)大減少刷盤頻率。
- compaction.max.memory 2048 #壓縮用到的最大內(nèi)存,適量調(diào)大提升壓縮速度。
- compaction.trigger.strategy num_and_time #壓縮策略 增量提交個數(shù)或時間達(dá)標(biāo)觸發(fā)生成壓縮策略。
- compaction.delta_seconds 30 #壓縮策略之時間,減少時間間隔,減少單個壓縮文件數(shù)。
- compaction.delta_commits 2 #壓縮策略之增量提交個數(shù),減少個數(shù),減少單個壓縮文件數(shù)。
實時任務(wù)入湖的優(yōu)化思路流程包括下面幾個步驟:
- 先確定 bucket 數(shù)量,觀察 fileSlice 大小,估算調(diào)整。
- 根據(jù) bucket 數(shù)確定 Flink job 并行度,與 bucket 數(shù)保持一致。
- 根據(jù)并行度確定 tm 資源,即并行度 = 總 slot 數(shù)。
- 根據(jù)總 slot 數(shù)確定內(nèi)存,即總內(nèi)存 = num(slot) * (write.max.size)。
- 根據(jù) pulsar topic 流量確定 write.rate.limit,一般峰值 * 1.5。
- 根據(jù) Flink job 內(nèi)存使用情況及平穩(wěn)度確定 write.max.size,可拿追存量數(shù)據(jù)測試,一般內(nèi)存降到寫速度明顯下降即為內(nèi)存最低值。
- 根據(jù)write.max.size 確定 write.batch.size 和 compaction.max.memory,前者是后兩個的和。
- 根據(jù) pulsar topic 流量確定壓縮類型,一般超過 10w/s 考慮使用 inline schedule 和 offline execution。
四、數(shù)據(jù)湖上的查詢
在引入 Kyuubi 前,我們通過 JDBC、Beeline、Spark Client、Flink Client 等客戶端訪問服務(wù)層執(zhí)行查詢,沒有統(tǒng)一入口,多個平臺不互通,多賬號權(quán)限體系。用戶的痛點是跨多平臺開發(fā)體驗差,低效率。平臺層的痛點是問題定位運維復(fù)雜,存在資源浪費。
在引入 Kyuubi 后,我們基于社區(qū)版 Kyuubi 做了一定的改造,包括 JDBC 引擎開發(fā)、JDBC 引擎 Ranger 鑒權(quán)開發(fā)、BI、JDBC 客戶端元數(shù)據(jù)適配修改、Spark 引擎大結(jié)果集存 HDFS、支持導(dǎo)數(shù)開發(fā)、JDBC 引擎 SQL 攔截控流開發(fā)等,實現(xiàn)了統(tǒng)一數(shù)據(jù)服務(wù)入口,做到了統(tǒng)一認(rèn)證權(quán)限管理和統(tǒng)一易用原則。
下圖展示了 Kyuubi 的架構(gòu)和權(quán)限管控:
Kyuubi 查詢流程是:客戶端請求通過 LDAP 認(rèn)證后,連接 Kyuubi Server 生成 Kyuubi session,之后 Kyuubi server 根據(jù)連接用戶以及用戶隔離級別路由到已經(jīng)啟動的 engine 或啟動一個新的 engine。Spark 引擎會先申請 container 運行 AppMaster,后申請 container 運行 executor 執(zhí)行 task。Flink 引擎會完成 StreamGraph 至 JobGraph 至 executionGraph 構(gòu)建并通過 Jobmanager 和 taskmanager 運行。其中 engine 端 RangerPlungin 會在 SQL 解析后拉取 RangerAdmin 由用戶配置的策略進(jìn)行鑒權(quán)。RangerAdmin 完成用戶同步,策略刷新等。
Kyuubi on Flink 跨庫查詢的目的是嘗試基于 Flink實現(xiàn)流批一體,支持跨數(shù)據(jù)源導(dǎo)數(shù) SQL 化。我們的實現(xiàn)方案是通過 Flink Metadata Catalog Connector 的開發(fā),即基于元數(shù)據(jù)系統(tǒng)以統(tǒng)一 datasource.db.table 的格式查詢所有數(shù)據(jù)源,且讓用戶免于自定義 DDL。其中元數(shù)據(jù)采集是用 datahub 的 ingestion framework 采集各種數(shù)據(jù)源的元數(shù)據(jù),并生成對應(yīng) Flink 表屬性。Flink 端是擴(kuò)展 AbstractCatalog 查詢 metadata DB,實現(xiàn) CatalogFactory 接口。
其基本流程如下圖所示:
完整流程是1 發(fā)起采集請求2和3是采集服務(wù)調(diào) Datahub ingestion framework 完成元數(shù)據(jù)采集并寫到 metadata DB 同時寫 Flink 表屬性。4是 用戶發(fā)送 SQL 到 Kyuubi server 5是 Kyuubi server 發(fā)送 SQL 到 Flink engine 6和7是 Flink metadata catalog 會讀取 metadata DB 根據(jù) Flink 表屬性讀取對應(yīng)數(shù)據(jù)源。
Kyuubi on JDBC Doris 可以通過外表查詢 Hudi,但在 Doris 1.2 版本,仍然有一定的限制,Hudi 目前僅支持Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Read Optimized Query。后續(xù)將支持 Incremental Query 和 Merge On Read 表的 Snapshot Query。
Doris 的架構(gòu)示意和其基本使用流程如下圖所示: