作者 | 張靜
編輯 | 姚佳新
本文整理自快手數(shù)據(jù)架構(gòu)研發(fā)專家張靜在WOT2023大會上的主題分享,更多精彩內(nèi)容及現(xiàn)場PPT,請關(guān)注51CTO技術(shù)棧公眾號,發(fā)消息【W(wǎng)OT2023PPT】即可直接領(lǐng)取。
今天的分享分為四個部分:首先介紹傳統(tǒng)離線鏈路,它存在哪些痛點;第二部分引入數(shù)據(jù)湖的特性;第三部分是通過快手數(shù)據(jù)湖幾個典型的業(yè)務場景來說明如何基于數(shù)據(jù)湖技術(shù)重塑離線鏈路的生產(chǎn);最后一部分介紹近期工作和長遠規(guī)劃。希望通過本次分享能夠讓大家了解數(shù)據(jù)湖技術(shù)在重塑離線生產(chǎn)方式中的關(guān)鍵作用。
一、傳統(tǒng)離線鏈路的缺點
快手的傳統(tǒng)離線鏈路和很多公司是一致的,基于 Hive做離線分層數(shù)倉的建設。在入倉環(huán)節(jié)和層與層之間是基于 Spark 或者 Hive做清洗加工和計算。這個鏈路有以下四個痛點:
更新成本高:Hive 表最細的更新粒度是分區(qū)級,需要先掃出分區(qū)的全量數(shù)據(jù),關(guān)聯(lián)這次更新的增量數(shù)據(jù)得到這次的全量數(shù)據(jù)并覆蓋原來的分區(qū)。這個過程導致計算開銷比較大,且降低時效性;
缺少索引:不僅影響更新,也影響讀取。因為查詢大部分會以掃描為主,由此會導致查詢效率低;
缺少事務:多個寫入任務之間,寫入任務和讀取任務之間缺少事務機制,需要讀寫鎖來避免數(shù)據(jù)的不一致;
啟動調(diào)度晚:目前離線任務調(diào)度最細粒度是小時級別,會影響下游各層的數(shù)據(jù)可見性;
圖片
二、HUDI 數(shù)據(jù)湖的特性
針對傳統(tǒng)離線鏈路的缺點,我們決定引入數(shù)據(jù)湖來解決上述的痛點??焓质窃?1年開始探索數(shù)據(jù)湖方向,我們進行了技術(shù)選型,考慮到HUDI 對更新能力的支持,以及活躍的社區(qū)生態(tài),由此便選擇了HUDI。HUDI 具備如下幾個特點:
寫入:由于 HUDI提供多種內(nèi)置的索引,基于這些索引可以提供高效的更新能力;寫入支持流式入湖,也支持離線入湖;支持多種的寫入操作,比如插入、更新、刪除、覆蓋;支持多種輸入源,比如更新流,日志流。
查詢:支持多種的查詢方式,比如讀優(yōu)化查詢、快照查詢和增量查詢;提供時間旅行的特點解鎖查詢歷史版本的能力;社區(qū)做了很多優(yōu)化提高查詢的效率。
并發(fā)控制:HUDI 引入 MVCC 來控制寫入任務和查詢?nèi)蝿罩g的并發(fā)。引入 OCC 來控制多個寫入任務之間的并發(fā)。同時社區(qū)也有一些關(guān)于無鎖的并發(fā)控制。
豐富的表服務:Compaction、Clustering、Clean 等。
開放性:適配多種計算引擎和查詢引擎,比如 Spark,F(xiàn)link,Presto,Trino,Starrokcs,Doris 等
Schema Evolution:提供Schema 演進的能力。
圖片
三、快手數(shù)據(jù)湖的典型業(yè)務場景
下面通過快手在數(shù)據(jù)湖上的幾個典型業(yè)務場景介紹如何用 HUDI重塑離線鏈路產(chǎn)生。分為三個方向:數(shù)據(jù)同步、數(shù)據(jù)更新、寬表拼接。每個方向都會介紹兩類最有代表性的場景。
圖片
1.數(shù)據(jù)同步 – 日志流入湖
首先是數(shù)據(jù)同步里日志流入湖??焓謨?nèi)部的數(shù)據(jù)同步工具有一個限制:只支持日期和小時兩級分區(qū)。所以一個日志流從 Kafka 到入倉整個鏈路需要多個離線任務加工,這就導致了鏈路長,重復計算和冗余存儲的問題。
圖片
基于 HUDI 改進后的方案,整個鏈路得到極大的簡化。直接用 Flink 任務做日志流數(shù)據(jù)入湖。最后一層將 HUDI 表落到 DWD 層數(shù)據(jù)主要是做兼容性,這樣下游業(yè)務依然可以訪問原來的 Hive 表,同時獲得時效性的提升,在資源持平情況下,時效性從之前1h40min縮減到40min,也降低了了鏈路的復雜度。
圖片
2.數(shù)據(jù)同步 – CDC 數(shù)據(jù)入湖
第二個場景是更新場景入湖。歷史上 Mysql to Hive的方案有兩個鏈路,一個全量初始化任務,一個是增量同步任務。初始化任務把全量數(shù)據(jù)落到一個HIVE 全量快照表,完成后啟動增量同步任務把增量binlog 數(shù)據(jù)落到一個 HIVE增量表,每天合并前一天的全量和今天的增量生成一個新的全量快照表。
圖片
Mysql to Hive 方案的痛點是時效低。時效低有兩方面原因:第一個是離線任務調(diào)度周期是T+1級別,第二個是任務調(diào)度以后才做全量和增量的合并。
圖片
改造后的Mysql to HUDI,鏈路得到了簡化,直接把 CDC 更新數(shù)據(jù)落到一個 HUDI 表里,這個 HUDI 表是沒有日期或者小時分區(qū)的。內(nèi)部的 MySQL to HUDI 和其他公司的 CDC 更新流入湖比較起來有一些差異化的需求,因此我們在設計上也是有所不同。
避免在全量同步完成后再啟動增量同步任務:因為采用傳統(tǒng)的串行調(diào)度,如果全量同步任務執(zhí)行很久才結(jié)束,增量同步啟動后可能發(fā)現(xiàn)最開始的一些 Kafka 數(shù)據(jù)已經(jīng)被清理了,導致數(shù)據(jù)丟失。因此,支持全量初始化任務和增量同步任務的并行,不需要等全量初始化任務完成后再去調(diào)度增量同步任務。
按照事件時間來查詢某個版本:HUDI 的版本是一個 processing time 的語義,但是用戶需要能按照 event time 語義來訪問某個 HUDI 版本。為了支持按照事件時間方案,在元數(shù)據(jù)里維護 Processing time 到 Event time 的映射關(guān)系。收到按照事件時間的快照查詢請求,先做一下映射得到 processing time,再基于time travel能力查詢對應的版本。
數(shù)據(jù)就緒后盡快發(fā)布對應版本:如果完全依賴周期性的 checkpoint 來做分區(qū)發(fā)布會導致數(shù)據(jù)就緒后不能立刻發(fā)布對應的版本。這里修改了 Flink 引擎的邏輯,除了周期性的 checkpoint 以外,又增加一種非周期性的checkpoint 用于監(jiān)聽到整點數(shù)據(jù)就緒以后立刻發(fā)布分區(qū)。
兼容當前 HIVE 表的使用方式:1. Mysql to HUDI 鏈路里的HUDI 表是沒有日期分區(qū),如何能按照日期分區(qū)查詢。2.長生命周期管理,用戶可能需要訪問很久以前的數(shù)據(jù)。為了支持這兩個需求,Mysql to HUDI 的鏈路會輸出兩個表,一個是無時間分區(qū)的 HUDI 表,一個是HIVE 表。在發(fā)布分區(qū)時,會在HIVE 表里添加一個新分區(qū),這個時候分區(qū) location下是沒有數(shù)據(jù),分區(qū)元數(shù)據(jù)里維護了它對應哪個 HUDI 表的哪個版本。無時間分區(qū)的HUDI 表是沒有辦法直接做長生命周期的,所以定期把HUDI 數(shù)據(jù)同步到Hive 表中去。歸檔后的 HIVE 表分區(qū)就是一個普通的 HIVE 分區(qū),它的 location 下有對應的分區(qū)數(shù)據(jù)。因此,這個HIVE 表是一個異構(gòu)的HIVE 表。異構(gòu)性體現(xiàn)在兩個方面,第一個元數(shù)據(jù)是異構(gòu)的,第二個是數(shù)據(jù)是異構(gòu)的。這個異構(gòu)設計對用戶是透明的。當用戶查詢HIVE分區(qū)的時候,引擎通過 Hive 元數(shù)據(jù)判斷這個日期是否被歸檔,如果還沒有被歸檔,會通過分區(qū)元數(shù)據(jù)里的HUDI 表和版本把請求路有到HUDI 表上。如果是歸檔后的分區(qū),直接走正常的HIVE查詢流程把分區(qū)數(shù)據(jù)返回給用戶。
圖片
Mysql to HUDI的整個鏈路如上圖。分為左右兩部分。左邊是必選的,做CDC 入湖;右邊是可選的,為了支持兼容HIVE 的需求。
3.數(shù)據(jù)更新
數(shù)據(jù)更新的第一個業(yè)務場景是人群包圈選。每次活動DAU 是一個非常重要的指標,人群圈選業(yè)務是根據(jù)用戶的歷史行為來圈選出一些潛在的目標用戶。歷史方案是基于天級離線數(shù)據(jù)和小時級離線數(shù)據(jù)組合計算生成。這種方式存在的最大痛點就是時效性問題,某些場景下的小時級產(chǎn)出的數(shù)據(jù)延遲在3-4 小時左右,對于除夕活動來說,這種延遲是不能忍受的。基于 HUDI 改造后的鏈路是用一個實時的 Flink 任務,在入湖過程中完成更新。這使得整條鏈得到簡化,不僅時效性從3h ~ 4h左右縮短到15min左右,而且資源也有節(jié)約。
第二個業(yè)務場景是基于HUDI 自定義的payload能力的N天留存標簽更新。歷史的留存鏈路加工流程需要大規(guī)模Join 并且需要與行為數(shù)據(jù)進行整合,并且需要大規(guī)模數(shù)據(jù)回刷。具體過程是用當天的日活數(shù)據(jù)和歷史N天的日活數(shù)據(jù)算出當天日活用戶在過去 180 天的留存標簽,存一個中間表。然后分別用過去N天的行為數(shù)據(jù)關(guān)聯(lián)這個中間表得到最新的標簽覆蓋回對應的分區(qū)。這個方案的缺點是時效低,重復計算和重復存儲。
基于HUDI 改造后的鏈路從剛才的多層關(guān)聯(lián)升級為單表生產(chǎn),時效性也是有了很大的提升,從2.5h縮短到1.5h。資源開銷也是有收益的。這里最重要的就是基于 HUDI 的 MOR 表能力和自定義payload 的特點。寫入流程非常輕量,將當天的日活數(shù)據(jù)產(chǎn)生的增量數(shù)據(jù)寫到歷史N 天的分區(qū)里。合并流程做在分區(qū)內(nèi)部做局部關(guān)聯(lián)只更新對應的留存標簽。
圖片
圖片
4.寬表拼接
第三個方向是寬表拼接,也介紹兩個典型的業(yè)務場景,一個是離線寬表模型,一個是準實時的多流拼接。
寬表模型是指把業(yè)務主題相關(guān)的指標、維度、屬性關(guān)聯(lián)在一起的一張大寬表。寬表模型因為結(jié)構(gòu)簡單,模型可復用度高,數(shù)據(jù)訪問效率等優(yōu)勢,廣泛地使用在 BI 和 AI 場景。
圖片
基于 HUDI 的寬表拼接之前有很多公司也有分享,我們內(nèi)部的寬表拼接有一些差異化的需求。
支持多個寫入任務并行:允許多個寫入任務并行加工一張寬表,每個寫入任務加工這個寬表中的部分列。
支持 Schema Evolution:在業(yè)務演進過程中可能隨時需要有更多的列加進來。用戶希望在創(chuàng)建表的時候,只需要定義必要的列,比如主鍵列、分區(qū)列、排序列。后續(xù)可以很靈活地添加新的列。
支持 Implicit Schema Evolution:顯式的 Schema Evolution 是指通過類似于 Alter table add column 這種DDL 語句來修改表。Implicit Schema Evolution,是指在寫入任務的 Schema里包含了表里不存在的列,會在寫入任務提交時追加到這個表的最后。
支持 Partial Insert:寫入任務不需要指定表里的所有列,允許只插入表里的部分。
支持不同分區(qū)設置不同的桶個數(shù):有一些業(yè)務分區(qū)存在非常大的數(shù)據(jù)量差異,所以需要能支持不同子分區(qū)設置不同的桶個數(shù)。
支持快照隔離:讀取任務和寫入任務之間支持快照隔離,上游加工好部分列以后,下游就可以先讀這些加工好的部分列。
圖片
上圖是一個簡單的寬表拼接的例子。兩個寫入任務加工一個寬表,第一個寫入任務加工 id, ts 和name。第二個寫入任務加工 id, ts 和 price。每個寫入任務只需要寫入部分列,這個是 partial insert 的能力。最后合并流程做拼接。另外,這個圖也可以說明 schema evolution。建表時,只定義了主鍵、排序鍵和分區(qū)鍵。第一個寫入任務提交的時候追加了name 列,第二個寫入任務提交的時候追加 price 列。
圖片
寫入階段分為兩個階段,第一個階段寫入數(shù)據(jù),第二個階段提交數(shù)據(jù)。第一個階段是無鎖方案的設計,第二個階段是有鎖的設計。第一個階段,寫入任務是在加工同一個文件組的同一個數(shù)據(jù)版本下不同的增量文件來避免多個任務把一個文件寫花。在提交階段引入一種特殊的沖突檢查機制,允許在不同分區(qū)或者是相同分區(qū)的不同列上的并發(fā)寫入,另外這個階段按需更新 schema,發(fā)現(xiàn)有新增的列需要更新schema 。
這個方案也可以用在實時寬表拼接場景,這里因為時間關(guān)系,不再做贅述。最后說一下在目前的寬表拼接實現(xiàn)里有一個限制,即寫入任務正在進行時不可以生成合并計劃,可能存在丟數(shù)據(jù)的風險。在用戶角度這個限制有三點影響:第一個是離線寬表拼接場景需要依賴任務以來關(guān)系來避免寫入任務和 schedule compaction 的并行。第二個是對實時寬表拼接場景,只能在同一個 Flink 作業(yè)的多個 pipeline 里共同加工一個寬表,不能多個 Flink 作業(yè)同時加工一個寬表。第三個是不能滿足實時和離線任務共同加工一張寬表的需求。
四、未來規(guī)劃
圖片
近期的工作有四點:
(1)Schedule Compaction 和 Writer 的并發(fā)。
(2)可擴展的 Bucket index,實現(xiàn)根據(jù)數(shù)據(jù)量自動適配 bucket number 個數(shù)。
(3)加速寫入流程:這里涉及到多個優(yōu)化點,一個是優(yōu)化寫入鏈路,一個是減少序列化和反序列化開銷
(4)服務化建設。包括 MetaStore Service 和 Table Service。
圖片
中長期的工作圍繞兩個方向,第一個是建設實時數(shù)據(jù)湖。對于實時數(shù)據(jù)湖也會有很多挑戰(zhàn),需要把它補充齊才可以把實時化做起來,這塊會引入流計算領(lǐng)域領(lǐng)域通用的概念,比如事件時間和watermark。第二個是基于HUDI的分析查詢場景。我們會參與到社區(qū)的建設中,通過構(gòu)建物化視圖減少重復計算加速查詢,后續(xù)也會引入緩存加速分析查詢的場景。這兩個方向都有很多地方需要探索和完善。