自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink CDC + Hudi 海量數(shù)據(jù)入湖在順豐的實踐

大數(shù)據(jù)
順豐的數(shù)據(jù)集成經(jīng)歷了幾年的發(fā)展,主要分為兩塊,一塊是離線數(shù)據(jù)集成,一塊是實時數(shù)據(jù)集成。離線數(shù)據(jù)集成以 DataX 為主,本文主要介紹實時數(shù)據(jù)集成方案。

?摘要:本文整理自順豐大數(shù)據(jù)研發(fā)工程師覃立輝在 5月 21 日 Flink CDC Meetup 的演講。主要內(nèi)容包括:

  • 順豐數(shù)據(jù)集成背景
  • Flink CDC 實踐問題與優(yōu)化
  • 未來規(guī)劃

一、順豐數(shù)據(jù)集成背景

順豐是快遞物流服務提供商,主營業(yè)務包含了時效快遞、經(jīng)濟快遞、同城配送以及冷鏈運輸?shù)取?/p>

運輸流程背后需要一系列系統(tǒng)的支持,比如訂單管理系統(tǒng)、智慧物業(yè)系統(tǒng)、以及很多中轉(zhuǎn)場、汽車或飛機上的很多傳感器,都會產(chǎn)生大量數(shù)據(jù)。如果需要對這些數(shù)據(jù)進行數(shù)據(jù)分析,那么數(shù)據(jù)集成是其中很重要的一步。

順豐的數(shù)據(jù)集成經(jīng)歷了幾年的發(fā)展,主要分為兩塊,一塊是離線數(shù)據(jù)集成,一塊是實時數(shù)據(jù)集成。離線數(shù)據(jù)集成以 DataX 為主,本文主要介紹實時數(shù)據(jù)集成方案。

2017 年,基于 Jstorm + Canal 的方式實現(xiàn)了第一個版本的實時數(shù)據(jù)集成方案。但是此方案存在諸多問題,比如無法保證數(shù)據(jù)的一致性、吞吐率較低、難以維護。2019 年,隨著 Flink 社區(qū)的不斷發(fā)展,它補齊了很多重要特性,因此基于 Flink + Canal 的方式實現(xiàn)了第二個版本的實時數(shù)據(jù)集成方案。但是此方案依然不夠完美,經(jīng)歷了內(nèi)部調(diào)研與實踐,2022 年初,我們?nèi)孓D(zhuǎn)向 Flink CDC 。

上圖為 Flink + Canal 的實時數(shù)據(jù)入湖架構(gòu)。

Flink 啟動之后,首先讀取當前的 Binlog 信息,標記為 StartOffset ,通過 select 方式將全量數(shù)據(jù)采集上來,發(fā)往下游 Kafka。全量采集完畢之后,再從 startOffset 采集增量的日志信息,發(fā)往 Kafka。最終 Kafka 的數(shù)據(jù)由 Spark 消費后寫往 Hudi。

但是此架構(gòu)存在以下三個問題:

  • 全量與增量數(shù)據(jù)存在重復:因為采集過程中不會進行鎖表,如果在全量采集過程中有數(shù)據(jù)變更,并且采集到了這些數(shù)據(jù),那么這些數(shù)據(jù)會與 Binlog 中的數(shù)據(jù)存在重復;
  • 需要下游進行 Upsert 或 Merge 寫入才能剔除重復的數(shù)據(jù),確保數(shù)據(jù)的最終一致性;
  • 需要兩套計算引擎,再加上消息隊列 Kafka 才能將數(shù)據(jù)寫入到數(shù)據(jù)湖 Hudi 中,過程涉及組件多、鏈路長,且消耗資源大。

基于以上問題,我們整理出了數(shù)據(jù)集成的核心需求:

  1. 全量增量自動切換,并保證數(shù)據(jù)的準確性。Flink + Canal 的架構(gòu)能實現(xiàn)全量和增量自動切換,但無法保證數(shù)據(jù)的準確性;
  2. 最大限度地減少對源數(shù)據(jù)庫的影響,比如同步過程中盡量不使用鎖、能流控等;
  3. 能在已存在的任務中添加新表的數(shù)據(jù)采集,這是非常核心的需求,因為在復雜的生產(chǎn)環(huán)境中,等所有表都準備好之后再進行數(shù)據(jù)集成會導致效率低下。此外,如果不能做到任務的合并,需要起很多次任務,采集很多次 Binlog 的數(shù)據(jù),可能會導致 DB 機器帶寬被打滿;
  4. 能同時進行全量和增量日志采集,新增表不能暫停日志采集來確保數(shù)據(jù)的準確性,這種方式會給其他表日志采集帶來延遲;
  5. 能確保數(shù)據(jù)在同一主鍵 ID 下按歷史順序發(fā)生,不會出現(xiàn)后發(fā)生的事件先發(fā)送到下游。

Flink CDC 很好地解決了業(yè)務痛點,并且在可擴展性、穩(wěn)定性、社區(qū)活躍度方面都非常優(yōu)秀。

  • 首先,它能無縫對接 Flink 生態(tài),復用 Flink 眾多 sink 能力,使用 Flink 數(shù)據(jù)清理轉(zhuǎn)換的能力;
  • 其次,它能進行全量與增量自動切換,并且保證數(shù)據(jù)的準確性;
  • 第三,它能支持無鎖讀取、斷點續(xù)傳、水平擴展,特別是在水平擴展方面,理論上來說,給的資源足夠多時,性能瓶頸一般不會出現(xiàn)在 CDC 側(cè),而是在于數(shù)據(jù)源/目標源是否能支持讀/寫這么多數(shù)據(jù)。

二、Flink CDC 實踐問題與優(yōu)化

上圖為 Flink CDC 2.0 的架構(gòu)原理。它基于 FLIP-27 實現(xiàn),核心步驟如下:

  1. Enumerator 先將全量數(shù)據(jù)拆分成多個 SnapshotSplit,然后按照上圖中第一步將 SnapshotSplit 發(fā)送給 SourceReader 執(zhí)行。執(zhí)行過程中會對數(shù)據(jù)進行修正來保證數(shù)據(jù)的一致性;
  2. SnapshotSplit 讀取完成后向 Enumerator 匯報已讀取完成的塊信息;
  3. 重復執(zhí)行 (1) (2) 兩個步驟,直到將全量數(shù)據(jù)讀取完畢;
  4. 全量數(shù)據(jù)讀取完畢之后,Enumerator 會根據(jù)之前全量完成的 split 信息, 構(gòu)造一個 BinlogSplit。發(fā)送給 SourceRead 執(zhí)行,讀取增量日志數(shù)據(jù)。

問題一:新增表會停止 Binlog 日志流

?

在已存在的任務中添加新表是非常重要的需求, Flink CDC 2.0 也支持了這一功能。但是為了確保數(shù)據(jù)的一致性,F(xiàn)link CDC 2.0 在新增表的流程中,需要停止 Binlog 日志流的讀取,再進行新增表的全量數(shù)據(jù)讀取。等新增表的全量數(shù)據(jù)讀取完畢之后,再將之前停止的 Binlog 任務重新啟動。這也意味著新增表會影響其他表的日志采集進度。然而我們希望全量和增量兩個任務能夠同時進行,為了解決這一問題,我們對 Flink CDC 進行了拓展,支持了全量和增量日志流并行讀取,步驟如下:

  1. 程序啟動后,在 Enumerator 中創(chuàng)建 BinlogSplit ,放在分配列表的第一位,分配給 SourceReader 執(zhí)行增量數(shù)據(jù)采集;
  2. 與原有的全量數(shù)據(jù)采集一樣,Enumerator 將全量采集切分成多個 split 塊,然后將切分好的塊分配給 SourceReader 去執(zhí)行全量數(shù)據(jù)的采集;
  3. 全量數(shù)據(jù)采集完成之后,SourceReader 向 Enumerator 匯報已經(jīng)完成的全量數(shù)據(jù)采集塊的信息;
  4. 重復 (2) (3) 步,將全量的表采集完畢。

以上就是第一次啟動任務,全量與增量日志并行讀取的流程。新增表后,并行讀取實現(xiàn)步驟如下:

  1. 恢復任務時,F(xiàn)link CDC 會從 state 中獲取用戶新表的配置信息;
  2. 通過對比用戶配置信息與狀態(tài)信息,捕獲到要新增的表。對于 BinlogSplit 任務,會增加新表 binlog 數(shù)據(jù)的采集;對于 Enumerator 任務,會對新表進行全量切分;
  3. Enumerator 將切分好的 SnapshotSplit 分配給 SourceReader 執(zhí)行全量數(shù)據(jù)采集;
  4. 重復步驟 (3),直到所有全量數(shù)據(jù)讀取完畢。

然而,實現(xiàn)全量和增量日志并行讀取后,又出現(xiàn)了數(shù)據(jù)沖突問題。

如上圖所示, Flink CDC 在讀取全量數(shù)據(jù)之前,會先讀取當前 Binlog 的位置信息,將其標記為 LW,接著通過 select 的方式讀取全量數(shù)據(jù),讀取到上圖中 s1、s2、 s3、s4 四條數(shù)據(jù)。再讀取當前的 Binlog 位置,標記為 HW, 然后將 LW 和 HW 中變更的數(shù)據(jù) merge 到之前全量采集上來的數(shù)據(jù)中。經(jīng)過一系列操作后,最終全量采集到的數(shù)據(jù)是 s1、s2、s3、s4 和 s5。

而增量采集的進程也會讀取 Binlog 中的日志信息,會將 LW 和 HW 中的 s2、s2、s4、s5 四條數(shù)據(jù)發(fā)往下游。

上述整個流程中存在兩個問題:首先,數(shù)據(jù)多取,存在數(shù)據(jù)重復,上圖中紅色標識即存在重復的數(shù)據(jù);其次,全量和增量在兩個不同的線程中,也有可能是在兩個不同的 JVM 中,因此先發(fā)往下游的數(shù)據(jù)可能是全量數(shù)據(jù),也有可能是增量數(shù)據(jù),意味著同一主鍵 ID 到達下游的先后順序不是按歷史順序,與核心需求不符。

針對數(shù)據(jù)沖突問題,我們提供了基于 GTID 實現(xiàn)的處理方案。

首先,為全量數(shù)據(jù)打上 Snapshot 標簽,增量數(shù)據(jù)打上 Binlog 標簽;其次,為全量數(shù)據(jù)補充一個高水位 GTID 信息,而增量數(shù)據(jù)本身攜帶有 GTID 信息,因此不需要補充。將數(shù)據(jù)下發(fā),下游會接上一個 KeyBy 算子,再接上數(shù)據(jù)沖突處理算子,數(shù)據(jù)沖突的核心是保證發(fā)往下游的數(shù)據(jù)不重復,并且按歷史順序產(chǎn)生。

如果下發(fā)的是全量采集到的數(shù)據(jù),且此前沒有 Binlog 數(shù)據(jù)下發(fā),則將這條數(shù)據(jù)的 GTID 存儲到 state 并把這條數(shù)據(jù)下發(fā);如果 state 不為空且此條記錄的 GTID 大于等于狀態(tài)中的 GTID ,也將這條數(shù)據(jù)的 GTID 存儲到 state 并把這條數(shù)據(jù)下發(fā);

通過這種方式,很好地解決了數(shù)據(jù)沖突的問題,最終輸出到下游的數(shù)據(jù)是不重復且按歷史順序發(fā)生的。

然而,新的問題又產(chǎn)生了。在處理算法中可以看出,為了確保數(shù)據(jù)的不重復并且按歷史順序下發(fā),會將所有記錄對應的 GTID 信息存儲在狀態(tài)中,導致狀態(tài)一直遞增。

清理狀態(tài)一般首選 TTL,但 TTL 難以控制時間,且無法將數(shù)據(jù)完全清理掉。第二種方式是手動清理,全量表完成之后,可以下發(fā)一條記錄告訴下游清理 state 中的數(shù)據(jù)。

解決了以上所有問題,并行讀取的最終方案如下圖所示。

首先,給數(shù)據(jù)打上四種標簽,分別代表不同的狀態(tài):

  • SNAPSHOT:全量采集到的數(shù)據(jù)信息。
  • STATE_BINLOG:還未完成全量采集, Binlog 已采集到這張表的變更數(shù)據(jù)。
  • BINLOG:全量數(shù)據(jù)采集完畢之后,Binlog 再采集到這張表的變更數(shù)據(jù)。
  • TABLE_FINISHED:全量數(shù)據(jù)采集完成之后通知下游,可以清理 state。

具體實現(xiàn)步驟如下:

  1. 分配 Binlog ,此時 Binlog 采集到的數(shù)據(jù)都為 STATE_BINLOG 標簽;
  2. 分配 SnapshotSplit 任務,此時全量采集到的數(shù)據(jù)都為 SNAPSHOT 標簽;
  3. Enumerator 實時監(jiān)控表的狀態(tài),某一張表執(zhí)行完成并完成 checkpoint 后,通知 Binlog 任務。Binlog 任務收到通知后,將此表后續(xù)采集到的 Binlog 信息都打上 BINLOG 標簽;此外,它還會構(gòu)造一條 TABLE_FINISHED 記錄發(fā)往下游做處理;
  4. 數(shù)據(jù)采集完成后,除了接上數(shù)據(jù)沖突處理算子,此處還新增了一個步驟:從主流中篩選出來的 TABLE_FINISHED 事件記錄,通過廣播的方式將其發(fā)往下游,下游根據(jù)具體信息清理對應表的狀態(tài)信息。

問題二:寫 Hudi 時存在數(shù)據(jù)傾斜

如上圖,F(xiàn)link CDC 采集三張表數(shù)據(jù)的時候,會先讀取完 tableA 的全量數(shù)據(jù),再讀取tableB 的全量數(shù)據(jù)。讀取 tableA 的過程中,下游只有 tableA 的 sink 有數(shù)據(jù)流入。

我們通過多表混合讀取的方式來解決數(shù)據(jù)傾斜的問題。

引入多表混合之前,F(xiàn)link CDC 讀取完 tableA 的所有 chunk,再讀取 tableB 的所有 chunk。實現(xiàn)了多表混合讀取后,讀取的順序變?yōu)樽x取 tableA 的 chunk1、tableB 的 chunk1、tableC 的 chunk1,再讀取 tableA 的 chunk2,以此類推,最終很好地解決了下游 sink 數(shù)據(jù)傾斜的問題,保證每個 sink 都有數(shù)據(jù)流入。

我們對多表混合讀取的性能進行了測試,由 TPCC 工具構(gòu)造的測試數(shù)據(jù),讀取了 4。張表,總并行度為 8,每個 sink 的并行度為 2,寫入時間由原來的 46 分鐘降至 20 分鐘,性能提升 2.3 倍。

需要注意的是,如果 sink 的并行度和總并行度相等,則性能不會有明顯提升,多表混合讀取主要的作用是更快地獲取到每張表下發(fā)的數(shù)據(jù)。

問題三:需要用戶手動指定 schema 信息

用戶手動執(zhí)行 DB schema 與 sink 之間 schema 映射關(guān)系,開發(fā)效率低,耗時長且容易出錯。

為了降低用戶的使用門檻,提升開發(fā)效率,我們實現(xiàn)了 Oracle catalog ,讓用戶能以低代碼的方式、無需指定 DB schema 信息與 sink schema 信息的映射關(guān)系,即可通過 Flink CDC 將數(shù)據(jù)寫入到 Hudi。

三、未來規(guī)劃

  • 第一, 支持 schema 信息變更同步。比如數(shù)據(jù)源發(fā)生了 schema 信息變更,能夠?qū)⑵渫降?Kafka 和 Hudi 中;支持平臺接入更多數(shù)據(jù)源類型,增強穩(wěn)定性,實現(xiàn)更多應用場景的落地。
  • 第二, 支持 SQL 化的方式,使用 Flink CDC 將數(shù)據(jù)同步到 Hudi 中,降低用戶的使用門檻。
  • 第三, 希望技術(shù)更開放,與社區(qū)共同成長,為社區(qū)貢獻出自己的一份力量。

提問&解答

Q1斷點續(xù)傳采集如何處理?

斷點續(xù)傳有兩種,分為全量和 Binlog。但它們都是基于 Flink state 的能力,同步的過程中會將進度存儲到 state 中。如果失敗了,下一次再從 state 中恢復即可。

Q2MySQL 在監(jiān)控多表使用 SQL 寫入 Hudi 表中的時候,存在多個 job,維護很麻煩,如何通過單 job 同步整庫?

我們基于 GTID 的方式對 Flink CDC 進行了拓展,支持任務中新增表,且不影響其他表的采集進度。不考慮新增表影響到其他表進度的情況下,也可以基于 Flink CDC 2.2 做新增表的能力。

Q3順豐這些特性會在 CDC 開源版本中實現(xiàn)嗎?

目前我們的方案還存在一些局限性,比如必須用 MySQL 的 GTID,需要下游有數(shù)據(jù)沖突處理的算子,因此較難實現(xiàn)在社區(qū)中開源。

Q4Flink CDC 2.0 新增表支持全量 + 增量嗎?

是的。

Q5GTID 去重算子會不會成為性能瓶頸?

經(jīng)過實踐,不存在性能瓶頸,它只是做了一些數(shù)據(jù)的判斷和過濾。

責任編輯:未麗燕 來源: Apache Flink
相關(guān)推薦

2021-06-04 07:24:14

Flink CDC數(shù)據(jù)

2025-02-11 10:13:05

2021-09-07 10:41:21

CDC數(shù)據(jù)湖Apache Hud

2021-08-31 10:07:16

Flink Hud數(shù)據(jù)湖阿里云

2021-09-13 13:46:29

Apache HudiB 站數(shù)據(jù)湖

2022-06-10 15:21:15

MySQL CDCSqlServer數(shù)據(jù)庫

2023-12-14 13:01:00

Hudivivo

2022-07-20 23:15:11

Flink數(shù)據(jù)集CDC

2023-02-26 00:12:10

Hadoop數(shù)據(jù)湖存儲

2024-06-04 07:29:13

2023-07-12 16:07:50

鏈路數(shù)據(jù)湖技術(shù)

2022-05-23 13:30:48

數(shù)據(jù)胡實踐

2022-06-21 14:02:29

MongoDB數(shù)據(jù)庫存儲

2022-10-24 00:26:51

大數(shù)據(jù)Hadoop存儲層

2023-02-13 14:01:32

2024-02-01 12:32:35

MySQL數(shù)據(jù)鎖數(shù)據(jù)庫

2020-03-26 10:05:18

大數(shù)據(jù)IT互聯(lián)網(wǎng)

2022-08-06 08:23:47

云計算公有云廠商成本

2023-04-04 12:38:50

GPT機器人LLM

2017-12-15 09:20:20

IT運維順豐
點贊
收藏

51CTO技術(shù)棧公眾號