伴魚基于 Flink 構建數(shù)據(jù)集成平臺的設計與實現(xiàn)
數(shù)據(jù)倉庫有四個基本的特征:面向主題的、集成的、相對穩(wěn)定的、反映歷史變化的。其中數(shù)據(jù)集成是數(shù)據(jù)倉庫構建的首要前提,指將多個分散的、異構的數(shù)據(jù)源整合在一起以便于后續(xù)的數(shù)據(jù)分析。將數(shù)據(jù)集成過程平臺化,將極大提升數(shù)據(jù)開發(fā)人員的效率,本文主要內容為:
- 數(shù)據(jù)集成 VS 數(shù)據(jù)同步
- 集成需求
- 數(shù)據(jù)集成 V1
- 數(shù)據(jù)集成 V2
- 線上效果
- 總結
A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.
—— Bill Inmon
一、數(shù)據(jù)集成 VS 數(shù)據(jù)同步
- 「數(shù)據(jù)集成」往往和「數(shù)據(jù)同步」在概念上存在一定的混淆,為此我們對這二者進行了區(qū)分:
- 「數(shù)據(jù)集成」特指面向數(shù)據(jù)倉庫 ODS 層的數(shù)據(jù)同步過程;
- 「數(shù)據(jù)同步」面向的是一般化的 Source 到 Sink 的數(shù)據(jù)傳輸過程。
二者的關系如下圖所示:
「數(shù)據(jù)同步平臺」提供基礎能力,不摻雜具體的業(yè)務邏輯。
「數(shù)據(jù)集成平臺」是構建在「數(shù)據(jù)同步平臺」之上的,除了將原始數(shù)據(jù)同步之外還包含了一些聚合的邏輯 (如通過數(shù)據(jù)庫的日志數(shù)據(jù)對快照數(shù)據(jù)進行恢復,下文將會詳細展開) 以及數(shù)倉規(guī)范相關的內容 (如數(shù)倉 ODS 層庫表命名規(guī)范) 等。
目前「數(shù)據(jù)同步平臺」的建設正在我們的規(guī)劃之中,但這并不影響「數(shù)據(jù)集成平臺」的搭建,一些同步的需求可提前在「實時計算平臺」創(chuàng)建,以「約定」的方式解耦。
值得一提的是「數(shù)據(jù)集成」也應當涵蓋「數(shù)據(jù)采集」(由特定的工具支持) 和「數(shù)據(jù)清洗」(由采集粒度、日志規(guī)范等因素決定) 兩部分內容,這兩部分內容各個公司都有自己的實現(xiàn),本文將不做詳細介紹。
二、集成需求
目前伴魚內部數(shù)據(jù)的集成需求主要體現(xiàn)在三塊:Stat Log (業(yè)務標準化日志或稱統(tǒng)計日志)、TiDB 及 MongoDB。除此之外還有一些 Service Log、Nginx Log 等,此類不具備代表性不在本文介紹。另外,由于實時數(shù)倉正處于建設過程中,目前「數(shù)據(jù)集成平臺」只涵蓋離線數(shù)倉 (Hive)。
- Stat Log:業(yè)務落盤的日志將由 FileBeat 組件收集至 Kafka。由于日志為 Append Only 類型, 因此 Stat Log 集成相對簡單,只需將 Kafka 數(shù)據(jù)同步至 Hive 即可。
- DB (TiDB、MongoDB):DB 數(shù)據(jù)相對麻煩,核心訴求是數(shù)倉中能夠存在業(yè)務數(shù)據(jù)庫的鏡像,即存在業(yè)務數(shù)據(jù)庫中某一時刻(天級 or 小時級)的數(shù)據(jù)快照,當然有時也有對數(shù)據(jù)變更過程的分析需求。因此 DB 數(shù)據(jù)集成需要將這兩個方面都考慮進去。
由于以上兩種類型的數(shù)據(jù)集成方式差異較大,下文將分別予以討論。
三、數(shù)據(jù)集成 V1
伴魚早期「數(shù)據(jù)集成平臺」已具備雛形,這個階段主要是借助一系列開源的工具實現(xiàn)。隨著時間推進,這個版本暴露的問題也逐漸增多,接下來將主要從數(shù)據(jù)流的角度對 V1 進行闡述,更多的細節(jié)問題將在 V2 版本的設計中體現(xiàn)。
3.1 Stat Log
日志的集成并未接入平臺,而是煙囪式的開發(fā)方式,數(shù)據(jù)集成的鏈路如下圖所示:
Kafka 中的數(shù)據(jù)先經過 Flume 同步至 HDFS,再由 Spark 任務將數(shù)據(jù)從 HDFS 導入至 Hive 并創(chuàng)建分區(qū)。整體鏈路較長且引入了第三方組件(Flume)增加了運維的成本,另外 Kafka 的原始數(shù)據(jù)在 HDFS 冗余存儲也增加了存儲的開銷。
3.2 DB
DB 數(shù)據(jù)的集成主要是基于查詢的方式(批的方式,通過 Select 查詢進行全表掃描得到快照數(shù)據(jù))實現(xiàn),其鏈路如下圖所示:
用戶通過平臺提交集成任務,由 Airflow 定時任務掃描集成平臺元數(shù)據(jù)庫,生成對應的取數(shù)任務 (TiDB 的數(shù)據(jù)通過 Sqoop 工具,MongoDB 的數(shù)據(jù)則通過 Mongoexport 工具)。可以看到 V1 版本并沒有獲取數(shù)據(jù)庫的變更的日志數(shù)據(jù),不能滿足對數(shù)據(jù)變更過程的分析訴求。
由于 Sqoop 任務最終要從 TiDB 生產環(huán)境的業(yè)務數(shù)據(jù)庫獲取數(shù)據(jù),數(shù)據(jù)量大的情況下勢必對業(yè)務數(shù)據(jù)庫造成一定的影響。Mongoexport 任務直接作用在 MongoDB 的隱藏節(jié)點 (無業(yè)務數(shù)據(jù)請求),對于線上業(yè)務的影響可以忽略不計?;诖?,DBA 單獨搭建了一套 TiDB 大數(shù)據(jù)集群,用于將體量較大的業(yè)務數(shù)據(jù)庫同步至此 (基于 TiDB Pump 和 Drainer 組件),因此部分 Sqoop 任務可以從此集群拉群數(shù)據(jù)以消除對業(yè)務數(shù)據(jù)庫的影響。從數(shù)據(jù)流的角度,整個過程如下圖所示:
是否將生產環(huán)境 TiDB 業(yè)務數(shù)據(jù)庫同步至 TiDB 大數(shù)據(jù)集群由數(shù)倉的需求以及 DBA 對于數(shù)據(jù)量評估決定??梢钥闯觯@種形式也存在著大量數(shù)據(jù)的冗余,集群的資源隨著同步任務的增加時長達到瓶頸。并且隨著后續(xù)的演進,TiDB 大數(shù)據(jù)集群也涵蓋一部分數(shù)據(jù)應用生產環(huán)境的業(yè)務數(shù)據(jù)庫,集群作用域逐漸模糊。
四、數(shù)據(jù)集成 V2
V2 版本我們引入了 Flink,將同步的鏈路進行了簡化,DB 數(shù)據(jù)集成從之前的基于查詢的方式改成了基于日志的方式 (流的方式),大大降低了冗余的存儲。
4.1 Stat Log
借助 Flink 1.11 版本后對于 Hive Integration 的支持,我們可以輕松的將 Kafka 的數(shù)據(jù)寫入 Hive,因此 Stat Log 的集成也就變得異常簡單 (相比 V1 版本,去除了對 Flume 組件的依賴,數(shù)據(jù)冗余也消除了),同時 Flink Exactly-Once 的語義也確保了數(shù)據(jù)的準確性。從數(shù)據(jù)流的角度,整個過程如下圖所示:
目前按照小時粒度生成日志分區(qū),幾項 Flink 任務配置參數(shù)如下:
- checkpoint: 10 min
- watermark: 1 min
- partition.time-extractor.kind: ‘custom’
- sink.partition-commit.delay: ‘3600s’
- sink.partition-commit.policy.kind: ‘metastore,success-file’
- sink.partition-commit.trigger: ‘partition-time’
4.2 DB
基于日志的方式對 DB 數(shù)據(jù)進行集成,意味著需要采集 DB 的日志數(shù)據(jù),在我們目前的實現(xiàn)中 TiDB 基于 Pump 和 Drainer 組件(目前生產環(huán)境數(shù)據(jù)庫集群版本暫不支持開啟 TICDC),MongoDB 基于 MongoShake 組件,采集的數(shù)據(jù)將輸送至 Kafka。
采用這種方式,一方面降低了業(yè)務數(shù)據(jù)庫的查詢壓力,另一方面可以捕捉數(shù)據(jù)的變更過程,同時冗余的數(shù)據(jù)存儲也消除了。不過由于原始數(shù)據(jù)是日志數(shù)據(jù),需要通過一定的手段還原出快照數(shù)據(jù)。新的鏈路如下圖所示:
用戶提交集成任務后將同步創(chuàng)建三個任務:
- 增量任務 (流):「增量任務」將 DB 日志數(shù)據(jù)由 Kafka 同步至 Hive。由于采集組件都是按照集群粒度進行采集,且集群數(shù)量有限,目前都是手動的方式將同步的任務在「實時計算平臺」創(chuàng)建,集成任務創(chuàng)建時默認假定同步任務已經 ready,待「數(shù)據(jù)同步平臺」落地后可以同步做更多的自動化操作和校驗。
- 存量任務 (批):要想還原出快照數(shù)據(jù)則至少需要一份初始的快照數(shù)據(jù),因此「存量任務」的目的是從業(yè)務數(shù)據(jù)庫拉取集成時數(shù)據(jù)的初始快照數(shù)據(jù)。
- Merge 任務 (批):「Merge 任務」將存量數(shù)據(jù)和增量數(shù)據(jù)進行聚合以還原快照數(shù)據(jù)。還原后的快照數(shù)據(jù)可作為下一日的存量,因此「存量任務」只需調度執(zhí)行一次,獲取初始快照數(shù)據(jù)即可。
「存量任務」和「Merge 任務」由離線調度平臺 Dolphinscheduler (簡稱 DS) 調度執(zhí)行,任務執(zhí)行過程中將從集成任務的元數(shù)據(jù)庫中獲取所需的信息。目前「Merge 任務」按小時粒度調度,即每小時還原快照數(shù)據(jù)。
從數(shù)據(jù)流的角度,整個過程如下圖所示:
DB 的數(shù)據(jù)集成相較于 Stat Log 復雜性高,接下來以 TiDB 的數(shù)據(jù)集成為例講述設計過程中的一些要點 (MongoDB 流程類似,區(qū)別在于存量同步工具及數(shù)據(jù)解析)。
4.2.1 需求表達
對于用戶而言,集成任務需要提供以下兩類信息:
- TiDB 源信息:包括集群、庫、表。
- 集成方式:集成方式表示的是快照數(shù)據(jù)的聚合粒度,包括全量和增量。全量表示需要將存量的快照數(shù)據(jù)與今日的增量日志數(shù)據(jù)聚合,而增量表示只需要將今日的增量日志數(shù)據(jù)聚合 (即便增量方式無需和存量的快照數(shù)據(jù)聚合,但初始存量的獲取依舊是有必要的,具體的使用形式由數(shù)倉人員自行決定)。
4.2.2 存量任務
存量任務雖然有且僅執(zhí)行一次,但為了完全消除數(shù)據(jù)集成對業(yè)務數(shù)據(jù)庫的影響,我們選擇數(shù)據(jù)庫的備份-恢復機制來實現(xiàn)。公司內部數(shù)據(jù)庫的備份和恢復操作已經平臺化,集群將定期進行備份 (天粒度),通過平臺可以查詢到集群的最新備份,并且可由接口觸發(fā)備份恢復操作,故存量的獲取可直接作用于恢復的數(shù)據(jù)庫。
由于數(shù)據(jù)庫備份的時間點與集成任務提交的時間點并不一定是同一天,這之間存在著一定的時間差將導致存量快照數(shù)據(jù)不符合我們的預期,各時間點的關系如下圖所示:
按照我們的設定,存量快照數(shù)據(jù)應當是包含 T4 之前的全部數(shù)據(jù),而實際備份的快照數(shù)據(jù)僅包含 T1 之前的全部數(shù)據(jù),這之間存在這 N 天的數(shù)據(jù)差。
注:這里之所以不說數(shù)據(jù)差集為 T1 至 T4 區(qū)間的數(shù)據(jù),是因為增量的 Binlog 數(shù)據(jù)是以整點為分區(qū)的,在 Merge 的時候也是將整點的分區(qū)數(shù)據(jù)與存量數(shù)據(jù)進行聚合,并支持了數(shù)據(jù)去重。因此 T1 時刻的存量數(shù)據(jù)與 T0-T3 之間的增量數(shù)據(jù)的 Merge 結果等效于 T0 時刻的存量數(shù)據(jù)與 T0-T3 之間的增量數(shù)據(jù)的 Merge 結果。所以 T1 至 T4 的數(shù)據(jù)差集等效 T0 至 T3 的數(shù)據(jù)差集,即圖示中的 N 天數(shù)據(jù)。
對于缺失的這部分數(shù)據(jù)實則是可以在「存量任務」中進行補全,仔細分析這其實是可以通過執(zhí)行的 「Merge 任務」的補數(shù)操作實現(xiàn)。
整個「存量任務」的工作流如下圖所示:
- 同步觸發(fā)數(shù)據(jù)庫平臺進行備份恢復,產生回執(zhí) ID;
- 通過回執(zhí) ID 輪訓備份恢復狀態(tài),恢復失敗需要 DBA 定位異常,故將下線整個工作流,待恢復成功可在平臺重新恢復執(zhí)行「存量任務」?;謴瓦M行中,工作流直接退出,借助 DS 定時調度等待下次喚醒。恢復成功,進入后續(xù)邏輯;
- 從恢復庫中拉取存量,判定存量是否存在數(shù)據(jù)差,若存在則執(zhí)行 Merge 任務的補數(shù)操作,整個操作可冪等執(zhí)行,如若失敗退出此次工作流,等待下次調度;
- 成功,下線整個工作流,任務完成。
4.2.3 Merge 任務
Merge 任務的前提是存量數(shù)據(jù)與增量數(shù)據(jù)都已經 ready,我們通過 _SUCCESS 文件進行標記。整個「Merge 任務」的工作流如下圖所示:
- 校驗文件標記是否存在,若不存在說明數(shù)據(jù)未 ready ,進行報警并退出工作流等待下次調度;
- 執(zhí)行 Merge 操作,失敗報警并退出工作流等待下次調度;
- 成功,退出工作流等待下次調度。
Merge 操作通過 Flink DataSet API 實現(xiàn)。核心邏輯如下:
- 加載存量、增量數(shù)據(jù),統(tǒng)一數(shù)據(jù)格式(核心字段:主鍵 Key 作為同一條數(shù)據(jù)的聚合字段;CommitTs 標識 binlog 的提交時間,存量數(shù)據(jù)默認為 0 早于增量數(shù)據(jù);OpType 標識數(shù)據(jù)操作類型,包括:Insert、Update、Delete,存量數(shù)據(jù)默認為 Insert 類型),將兩份數(shù)據(jù)進行 union;
- 按照主鍵聚合;
- 保留聚合后 CommitTs 最大的數(shù)據(jù)條目,其余丟棄;
- 過濾 OpType 為 Delete 類型的數(shù)據(jù)條目;
- 輸出聚合結果。
核心代碼:
- allMergedData.groupBy(x -> x.getKeyCols())
- .reduce(new ReduceFunction<MergeTransform>() {
- public MergeTransform reduce(MergeTransform value1, MergeTransform value2) throws Exception {
- if (value1.getCommitTS() > value2.getCommitTS()){
- return value1;
- }
- return value2;
- }
- })
- .filter(new FilterFunction<MergeTransform>() { //增量:過濾掉 op=delete
- public boolean filter(MergeTransform merge) throws Exception {
- if (merge.getOpType().equals(OPType.DELETE)){
- return false;
- }
- return true;
- }
- })
- .map(x -> x.getHiveColsText())
- .writeAsText(outPath);
主要思想為「后來者居上」,針對于 Insert、Update 操作,最新值直接覆蓋舊值,針對 Delete 操作,直接丟棄。這種方式也天然的實現(xiàn)了數(shù)據(jù)去重操作。
4.2.4 容錯性與數(shù)據(jù)一致性保證
我們大體可以從三個任務故障場景下的處理方式來驗證方案的容錯性。
- 「存量任務」異常失?。和ǔJ莻浞莼謴褪е?,DS 任務將發(fā)送失敗報警,因「數(shù)據(jù)庫平臺」暫不支持恢復重試,需人工介入處理。同時「Merge 任務」檢測不到存量的 _SUCCESS 標記,工作流不會向后推進。
- 「增量任務」異常失敗:Flink 自身的容錯機制以及「實時計算平臺」的外部檢測機制保障「增量任務」的容錯性。若在「Merge 任務」調度執(zhí)行期間「增量任務」尚未恢復,將誤以為該小時無增量數(shù)據(jù)跳過執(zhí)行,此時相當于快照更新延遲(Merge 是將全天的增量數(shù)據(jù)與存量聚合,在之后的調度時間點如果「增量任務」恢復又可以聚合得到最新的快照),或者在「增量任務」恢復后可人為觸發(fā)「Merge 任務」補數(shù)。
- 「Merge 任務」異常失敗:任務具有冪等性,通過設置 DS 任務失敗后的重試機制保障容錯性,同時發(fā)送失敗報警。
以上,通過自動恢復機制和報警機制確保了整個工作流的正確執(zhí)行。接下來我們可以從數(shù)據(jù)的角度看一下方案對于一致性的保障。
數(shù)據(jù)的一致性體現(xiàn)在 Merge 操作。兩份數(shù)據(jù)聚合,從代碼層面一定可以確保算法的正確性 (這是可驗證的、可測試的),那么唯一可能導致數(shù)據(jù)不一致的情況出現(xiàn)在兩份輸入的數(shù)據(jù)上,即存量和增量,存在兩種情況:
- 存量和增量數(shù)據(jù)有交疊:體現(xiàn)在初始存量與整點的增量數(shù)據(jù)聚合場景,由于算法天然的去重性可以保證數(shù)據(jù)的一致。
- 存量和增量數(shù)據(jù)有缺失:體現(xiàn)在增量數(shù)據(jù)的缺失上,而增量數(shù)據(jù)是由 Flink 將 Kafka 數(shù)據(jù)寫入 Hive 的,這個過程中是有一定的可能性造成數(shù)據(jù)的不一致,即分區(qū)提交后的亂序數(shù)據(jù)。雖然說亂序數(shù)據(jù)到來后的下一次 checkpoint 時間點分區(qū)將再次提交,但下游任務一般是檢測到首次分區(qū)提交就會觸發(fā)執(zhí)行,造成下游任務的數(shù)據(jù)不一致。
針對 Flink 流式寫 Hive 過程中的亂序數(shù)據(jù)處理可以采取兩種手段:
- 一是 Kafka 設置單分區(qū),多分區(qū)是產生導致亂序的根因,通過避免多分區(qū)消除數(shù)據(jù)亂序。
- 二是報警補償,亂序一旦產生流式任務是無法完全避免的 (可通過 watermark 設置亂序容忍時間,但終有一個界限),那么只能通過報警做事后補償。
問題轉換成了如何感知到亂序,我們可以進一步分析,既然亂序數(shù)據(jù)會觸發(fā)前一個分區(qū)的二次提交,那么只需要在提交分區(qū)的時候檢測前一個分區(qū)是否存在 _SUCCESS 標記便可以知曉是否是亂序數(shù)據(jù)以及觸發(fā)報警。
五、線上效果
總覽
存量任務
Merge 任務
六、總結
本文闡述了伴魚「數(shù)據(jù)集成平臺」核心設計思路,整個方案還有一些細節(jié)未在文章中體現(xiàn),如數(shù)據(jù) Schema 的變更、DB 日志數(shù)據(jù)的解析等,這些細節(jié)對于平臺構建也至關重要。目前伴魚絕大部分的集成任務已切換至新的方式并穩(wěn)定運行。我們也正在推進實時數(shù)倉集成任務的接入,以提供更統(tǒng)一的體驗。