貨拉拉Flink CDC實(shí)踐:穩(wěn)定性建設(shè)與數(shù)據(jù)入湖新探索
一、貨拉拉業(yè)務(wù)背景介紹
1. 貨拉拉背景介紹
貨拉拉是一家拉貨搬家跑腿發(fā)長途平臺(tái),創(chuàng)立于 2013 年,成長于粵港澳大灣區(qū),是從事同城/跨城貨運(yùn)、企業(yè)版物流服務(wù)、搬家、零擔(dān)、跑腿、冷運(yùn)、汽車租售及車后市場服務(wù)的互聯(lián)網(wǎng)物流商城。通過共享模式整合社會(huì)運(yùn)力資源,完成海量運(yùn)力儲(chǔ)備,并依托移動(dòng)互聯(lián)、大數(shù)據(jù)和人工智能技術(shù),搭建“方便、科技、可靠”的貨運(yùn)平臺(tái),實(shí)現(xiàn)多種車型的即時(shí)智能調(diào)度,為個(gè)人、商戶及企業(yè)提供高效的物流解決方案。
2. 業(yè)務(wù)整體增長情況
截至 2023 年 12 月,貨拉拉業(yè)務(wù)范圍覆蓋全球 11 個(gè)市場,包括中國及東南亞、南亞、南美洲等地區(qū),其中中國內(nèi)地總共覆蓋 363 座城市,月活司機(jī)達(dá) 90 萬,月活用戶達(dá) 1200 萬,每天產(chǎn)生訂單、司機(jī)、汽車物聯(lián)網(wǎng)數(shù)據(jù)量達(dá)到 PB 級別。如何穩(wěn)定、高效、快速采集到這些數(shù)據(jù),挖掘業(yè)務(wù)數(shù)據(jù)價(jià)值,釋放新質(zhì)生產(chǎn)力成為公司運(yùn)營和決策的關(guān)鍵。
3. 業(yè)務(wù)攀升的穩(wěn)定性挑戰(zhàn)
隨著企業(yè)業(yè)務(wù)量的急速攀升,逐漸遇到新的挑戰(zhàn),首先是實(shí)時(shí)抽數(shù)延遲嚴(yán)重,導(dǎo)致下游 Flink 的雙流 Join 產(chǎn)生問題,并帶來數(shù)據(jù)時(shí)效性、數(shù)據(jù)鏈路穩(wěn)定性等問題。早期使用 Canal 作為實(shí)時(shí)數(shù)采集主要存在以下問題:
- 架構(gòu)陳舊:單節(jié)點(diǎn)部且非分布式運(yùn)行,維護(hù)頻率低。
- Canal 維護(hù)性差:可維護(hù)性差,Canal 社區(qū)的整體上下游處于不活躍,導(dǎo)致維護(hù)性成本特別高。
- 上游數(shù)據(jù)采集穩(wěn)定性差,結(jié)合歷史故障以及冒煙測試,發(fā)現(xiàn)實(shí)時(shí)數(shù)據(jù)采集穩(wěn)定性主要集中在上游數(shù)據(jù)采集端。
接下來將介紹貨拉拉實(shí)時(shí)數(shù)據(jù)采集改造為什么選擇 Flink CDC 作為新的實(shí)時(shí)數(shù)據(jù)采集和同步框架。
二、貨拉拉為何選擇 Flink CDC
1. 選擇四象限作為思考切入點(diǎn)
首先我們會(huì)從上述四點(diǎn)去考慮到底需要一款什么工具作為貨拉拉的實(shí)時(shí)數(shù)據(jù)同步工具。
- 功能性:實(shí)時(shí)數(shù)據(jù)平臺(tái)首先考慮完善的功能性,F(xiàn)link SQL 目前開源版本僅支持單表單庫同步,如果業(yè)務(wù)方想完成其同步作業(yè)的話,必須使用 SQL 或 Flink CDC3.0 的 yaml 配置化方式才能完成整庫同步開發(fā)。
- 對標(biāo) Canal 兼容性:歷史業(yè)務(wù)方使用 Canal 進(jìn)行數(shù)據(jù)采集,以及下游不限于大數(shù)據(jù)團(tuán)隊(duì)的消費(fèi)方均使用 Canal,因此要對部分 Canal 功能進(jìn)行兼容性對標(biāo),已實(shí)現(xiàn)業(yè)務(wù)感知和改動(dòng)最小化。
- 鏈路穩(wěn)定性保障:涉及下游任務(wù)方的改造,當(dāng)前只能通過 Kafka 消費(fèi)組獲取下游消費(fèi)方,因此希望下游消費(fèi)方無需做過多改動(dòng),如 SQL 任務(wù)下游僅需切換 CDC 數(shù)據(jù)源即可;同時(shí)包裝了一個(gè)消費(fèi) CDC 的 SDK 供業(yè)務(wù)使用,依據(jù)相關(guān) topic 命名規(guī)則即可完成整個(gè)鏈路切換,保障鏈路切換的穩(wěn)定性。
- 保障數(shù)據(jù)一致性:鏈路切換時(shí)希望保障數(shù)據(jù)的一致性,即最終數(shù)據(jù)結(jié)果是等價(jià)的。因此需要通過一些科學(xué)的數(shù)據(jù)驗(yàn)證手段,如雙跑驗(yàn)證、采用對數(shù)工具,保證數(shù)據(jù)最終一致。
2. 開源組件對比
我們在進(jìn)行實(shí)時(shí)數(shù)據(jù)同步調(diào)研時(shí)對一些開源組件的功能、使用場景、穩(wěn)定性以及社區(qū)生態(tài)等多方面進(jìn)行了對比,包括 Flink CDC、Canal、Apache SeaTunnel 以及 DataX。
- CDC 同步機(jī)制:傳統(tǒng)數(shù)據(jù)同步方面,DataX 只支持查詢的 CDC 操作。Flink CDC 只需要訂閱 binlog 即可完成數(shù)據(jù)采集比較服務(wù)業(yè)務(wù)訴求。
- 全量+增量同步:只有 Flink CDC 支持全量+增量數(shù)據(jù)同步,滿足貨拉拉某些場景下采集全量數(shù)據(jù)構(gòu)建湖倉一體,業(yè)務(wù)需要持續(xù)性地對歷史數(shù)據(jù)進(jìn)行全量采集并加上增量數(shù)據(jù)同步,而其他組件在此方面表現(xiàn)為不支持或部分支持。
- 部署形態(tài):由于 Flink CDC 是依托于 Flink 的底層架構(gòu),F(xiàn)link 本身采用分布式部署,架構(gòu)選型會(huì)考慮 Flink CDC 在數(shù)據(jù)采集階段以及下游消費(fèi)階段的整體的一些協(xié)調(diào)性。
- 穩(wěn)定性:Flink CDC 依靠于 Flink 的 HA 機(jī)制,包括 ZooKeeper 以及 on K8s 的高可用,整體上會(huì)更加傾向于 Flink CDC 作為實(shí)時(shí)鏈路的數(shù)據(jù)同步工具。
3. 未來數(shù)據(jù)入湖需求
我們正在建設(shè)的數(shù)據(jù)入湖,也做了一些面向未來的設(shè)計(jì),包括 CDC 數(shù)據(jù)入湖分析,數(shù)據(jù)時(shí)效性高且為結(jié)構(gòu)化數(shù)據(jù),而埋點(diǎn)數(shù)據(jù)時(shí)效性低且非結(jié)構(gòu)化數(shù)據(jù),以及日志數(shù)據(jù)需要間接性統(tǒng)計(jì)和分析,并且為非結(jié)構(gòu)樹數(shù)據(jù)。這里我們需要通過引入 CDC pipeline 機(jī)制對接 Paimon Yaml 配置,便可通過 CDC 將傳統(tǒng) MySQL 數(shù)據(jù)庫直接訂閱入湖到 Paimon,然后進(jìn)行數(shù)據(jù)加工等 ETL 相關(guān)操作。
經(jīng)過前期的深度思考、對比與總結(jié)最終形成了如上圖所示的架構(gòu),主要包括數(shù)據(jù)來源、業(yè)務(wù)場景、數(shù)據(jù)服務(wù)以及數(shù)據(jù)湖平臺(tái)、數(shù)據(jù)引擎、湖倉格式、數(shù)據(jù)存儲(chǔ)層以及業(yè)務(wù)等。數(shù)據(jù)內(nèi)部開發(fā)平臺(tái)主要是元數(shù)據(jù)平臺(tái)(元初)、離線數(shù)據(jù)平臺(tái)(IDP)以及實(shí)時(shí)數(shù)據(jù)開發(fā)平臺(tái)(飛流);數(shù)據(jù)湖平臺(tái)主要包含數(shù)據(jù)集成服務(wù)和湖倉優(yōu)化服務(wù)。數(shù)據(jù)集成服務(wù)采用 Flink CDC 實(shí)時(shí)采集把數(shù)據(jù)源的數(shù)據(jù)訂閱到湖倉里面,并通過 Amoro 進(jìn)行自動(dòng)優(yōu)化湖倉,從而達(dá)到湖倉一體的整體架構(gòu)。在執(zhí)行引擎方面當(dāng)前只是完成了基于 Flink Engine 的建設(shè),對于灰色的 Doris Engine、Spark Engine 以及 Presto Engine 將是 2025 年的建設(shè)重點(diǎn),數(shù)據(jù)加工完成后將輸送給業(yè)務(wù)方,如埋點(diǎn)業(yè)務(wù)、業(yè)務(wù)畫像以及實(shí)時(shí)大屏、同時(shí)也會(huì)輸出給內(nèi)部 GPT 項(xiàng)目等提供給業(yè)務(wù)方去使用。
三、貨拉拉 CDC 生產(chǎn)實(shí)踐
1. 飛流實(shí)時(shí)計(jì)算平臺(tái)能力建設(shè)
飛流作為貨拉拉的實(shí)時(shí)計(jì)算平臺(tái),為了很好的對接 Flink CDC,實(shí)時(shí)數(shù)據(jù)計(jì)算平臺(tái)進(jìn)行了升級優(yōu)化,主要包括以下幾個(gè)方面:
- 平臺(tái)感知能力:修改了很多底層代碼,新增了 Metrics 的一些能力,如把 DB 底層的 Metrics 進(jìn)行了封裝,連同 Flink 的 Metrics 一并上報(bào),形成報(bào)警能力,便于業(yè)務(wù)及時(shí)發(fā)現(xiàn) DB 底層的整體采集狀況。
- 平臺(tái)配置化能力:對 Flink CDC 的 catalog 做了一層封裝,同時(shí)支持 Flink Yaml 的配置化方式,提供了更多的靈活性。
- 平臺(tái)數(shù)據(jù)協(xié)議優(yōu)化:由于采用 Flink CDC Connector 進(jìn)行二次開發(fā),當(dāng)前對數(shù)據(jù)協(xié)議進(jìn)行了二次封裝,把內(nèi)部的 DB 層數(shù)據(jù)進(jìn)行打?qū)?,并增加了一些原始字段,支持業(yè)務(wù)方消費(fèi)這些數(shù)據(jù),同時(shí)做到了傳統(tǒng)數(shù)據(jù)庫的采集數(shù)據(jù)落庫。
- 數(shù)據(jù)解析優(yōu)化:通過增加元數(shù)據(jù)字段的一些信息,提高了在數(shù)據(jù)協(xié)議和數(shù)據(jù)解析的速度。
- SDK 封裝:由于 CDC 數(shù)據(jù)的使用者不僅包括大數(shù)據(jù)內(nèi)部平臺(tái),還包含很多線上業(yè)務(wù)方,因此封裝了一套 SDK,屏蔽 CDC 相對業(yè)務(wù)方比較復(fù)雜的概念與邏輯,交付業(yè)務(wù)方使用。
從數(shù)據(jù)架構(gòu)層面,目前正在做的是統(tǒng)一數(shù)據(jù)采集的工作,如海內(nèi)網(wǎng)逐步推進(jìn)整體使用 Flink CDC 替換掉 Canal,以及一鍵入倉、一鍵入湖的工作,甚至一些流量回放業(yè)務(wù)場景。在數(shù)據(jù)遷移方面,我們也會(huì)用到 Flink CDC。
穩(wěn)定性方面,引入了限流的能力,如會(huì)限制 sink 的采集速度,避免在采集高風(fēng)險(xiǎn)期引起數(shù)據(jù)庫的整體壓力。采集性能方面引入了多線程處理,提升解析能力。同時(shí)做了全局血緣的關(guān)聯(lián),用于快速感知業(yè)務(wù)方使用 CDC 表,以及 CDC 采集數(shù)據(jù)影響下游任務(wù),可以快速讓業(yè)務(wù)方感知采集出現(xiàn)問題時(shí)會(huì)導(dǎo)致哪些業(yè)務(wù)受到影響。
以上就是對飛流實(shí)時(shí)計(jì)算平臺(tái)整體能力的介紹。
2. 常規(guī)對數(shù)方法校驗(yàn)
由于采用 Flink CDC 代替了 Canal 進(jìn)行實(shí)時(shí)數(shù)據(jù)采集,因此需要進(jìn)行數(shù)據(jù)校驗(yàn)和對比。首先在常規(guī)對數(shù)方面,對特殊字段類型,如時(shí)間類型、bigInt、dynamic 等特殊字段的數(shù)據(jù)一致性校驗(yàn),同時(shí)基于時(shí)間切片做了 count 統(tǒng)計(jì)操作。由于消費(fèi)方在大數(shù)據(jù)內(nèi)部,因此還會(huì)涉及到數(shù)倉分層逐層對數(shù)的校驗(yàn),這里我們使用 Flink batch task 在維度時(shí)間對齊、最終切片對齊的最大差異、差異占比以及差異分布等方面進(jìn)行統(tǒng)一對數(shù)。
3. 數(shù)據(jù)科學(xué)方法校驗(yàn)
上文提到使用 Flink batch task 進(jìn)行統(tǒng)一對數(shù),主要會(huì)在基于差異率的正負(fù)進(jìn)行分布式對數(shù),差異統(tǒng)計(jì)表、全局指標(biāo)的差值以及與 Canal 對比差異的趨勢率。如上圖可以看到,可通過總條數(shù)以及每一個(gè)時(shí)間切片上面每一個(gè)數(shù)據(jù)的準(zhǔn)確性進(jìn)行整體對比,確保從 ODS 到 DWD 以及 DWS 層整體鏈路數(shù)據(jù)準(zhǔn)確性和最終一致性,如果出現(xiàn)數(shù)據(jù)缺少將會(huì)主動(dòng)進(jìn)行排查。
4. 數(shù)據(jù)雙跑校驗(yàn)
還會(huì)通過數(shù)據(jù)雙跑進(jìn)行數(shù)據(jù)校驗(yàn),如通過生產(chǎn) Kafka 和驗(yàn)證 Kafka 去進(jìn)行數(shù)據(jù)交叉鏈路驗(yàn)證對比,然后基于 binlog 采集時(shí)間對比這一段時(shí)間的數(shù)據(jù)總數(shù)以及數(shù)據(jù)的準(zhǔn)確性進(jìn)而得出一個(gè)交叉率,當(dāng)兩部分?jǐn)?shù)據(jù)完全一致時(shí)交叉率應(yīng)該是 100%,最終會(huì)輸出一份報(bào)告給到業(yè)務(wù)方,使業(yè)務(wù)方信任,并推動(dòng)業(yè)務(wù)使用鏈路切換工作順利開展。
5. Schema Change 信息變更處理
由于基于 Flink CDC Connector 進(jìn)行開發(fā),只有 3.0 才支持 Schema 變更操作,當(dāng)前做法是把 Schema Change 通過一個(gè)測流發(fā)送到對應(yīng)告警的 Kafka topic,并通過消費(fèi)再發(fā)出一個(gè)告警卡片,同時(shí)會(huì)將此任務(wù)告警和下一個(gè)任務(wù) Flink taskId 進(jìn)行關(guān)聯(lián),通知下游業(yè)務(wù)方 Schema 變更消息。后續(xù)我們將接入 CDC3.X Pipeline Connector,進(jìn)行定制化開發(fā),提供分流告警和下游支持等。
6. Canal VS Flink CDC 穩(wěn)定性對比
下面介紹一下切換后的整體穩(wěn)定性。以某一真實(shí)在線業(yè)務(wù)為例,在下午高峰期采集的時(shí)候,使用 Canal 最大的延遲在 3030s 左右,而使用 Flink CDC 基本維持在毫秒級別。在采集的整體穩(wěn)定性方面,可以看到 CDC 整體采集穩(wěn)定性要比 Canal 有顯著提升,最高可提高 80 倍。采集波動(dòng)率方面,Canal 采集按照 Batch 作業(yè)有批量的波動(dòng),而 CDC 則保持在一個(gè)穩(wěn)定的水平。
截止到目前,我們已經(jīng)有 100+ 個(gè) CDC 采集業(yè)務(wù),其中有 70+ 是之前的 Canal 任務(wù)切換到 Flink CDC,后續(xù)海外一些 Canal 采集也將會(huì)采用 Flink CDC 代替。
整體上延遲最高下降了 80%,同時(shí)我們基于協(xié)議進(jìn)行改造,因此消息中間件的數(shù)據(jù)存量也下降了 30%,并且完成了一些核心應(yīng)用加關(guān)鍵線上業(yè)務(wù)的接入。上圖給出了整體延遲的 1h 截圖,可發(fā)現(xiàn)使用 Flink CDC 的數(shù)據(jù)采集基本上穩(wěn)定保持在 1s 左右,可以比較好地保持?jǐn)?shù)據(jù)的新鮮度。
7. 建設(shè)成果
整體建設(shè)成果方面,當(dāng)前通過訂閱關(guān)系型數(shù)據(jù)庫,通過飛流平臺(tái)使用 Flink 作業(yè)進(jìn)行數(shù)據(jù)采集,寫入到 Kafka 或流入數(shù)據(jù)湖組件上,后續(xù)經(jīng)過離線 ETL 加工輸出后生成一些報(bào)表。目前公司內(nèi)部業(yè)務(wù)包括小伙拉行、貨拉拉、跑腿等多個(gè)業(yè)務(wù)線使用 Flink CDC 代替了原先的 Canal 進(jìn)行實(shí)時(shí)數(shù)據(jù)鏈路采集,整體業(yè)務(wù)數(shù)據(jù)量達(dá)到 TB-PB 級別,并且多個(gè)實(shí)時(shí)看板、云臺(tái)、BI 報(bào)表以及交易 2.0 等業(yè)務(wù)也使用 Flink CDC 進(jìn)行實(shí)時(shí)數(shù)據(jù)采集。最終我們希望可以實(shí)現(xiàn)數(shù)據(jù)訂閱鏈路的“以舊換新“,后續(xù)將持續(xù)對老鏈路的替換,最終完成平臺(tái)化工程建設(shè)。
四、CDC 數(shù)據(jù)入湖&未來展望
結(jié)合公司內(nèi)部使用場景以及阿里最新發(fā)布的 Fluss 項(xiàng)目,為我們帶來了一些新的想法。如上圖,業(yè)務(wù)數(shù)據(jù)經(jīng)過 CDC 訂閱同步后進(jìn)入到 Fluss,F(xiàn)luss 將消費(fèi) CDC 的數(shù)據(jù)產(chǎn)生 changeLog,并將這個(gè) changeLog 給到 Flink 下游繼續(xù)去消費(fèi)。同時(shí)也會(huì)通過 Compaction Service 生成數(shù)據(jù)到 LakeHouse Storage,這一部分?jǐn)?shù)據(jù)通過 Compaction Service 生成一些湖格式的表,如 Paimon 或 Iceberg 表,這些表可以通過外表的形式給到 OLAP 引擎或流計(jì)算引擎進(jìn)行查詢。同時(shí)在 Flink 的 source 一端做合并讀的操作,如把 LakeHouse storage 進(jìn)行合并讀從而屏蔽掉用戶對流和批的差異。
當(dāng)然這樣將數(shù)據(jù)引入到 LakeHouse storage 會(huì)帶來讀放大的問題,可以引入 Amoro 持續(xù)優(yōu)化 Paimon 和 Iceberg 表減少小文件的數(shù)量,同時(shí)在為下游消費(fèi)這部分 CDC 數(shù)據(jù)時(shí)帶來更好的體驗(yàn)。
當(dāng)前我們正在探索 Flink CDC+數(shù)據(jù)湖(Paimon 和 Iceberg),并結(jié)合 Apache Amoro 實(shí)現(xiàn)全自動(dòng)數(shù)據(jù)入湖,形成完整的數(shù)據(jù)入湖生態(tài)體系,進(jìn)一步提升數(shù)據(jù)時(shí)效性和準(zhǔn)確性,以滿足業(yè)務(wù)方對數(shù)據(jù)新鮮度的需求。并將與數(shù)據(jù)湖開源社區(qū)開展深度合作與探討,把場景固化,加速湖倉一體落地的進(jìn)程。
我們還會(huì)考慮多數(shù)據(jù)源訂閱的需求,滿足關(guān)系型和非慣性數(shù)據(jù)的訂閱查詢,如支持 MongoDB 數(shù)據(jù)的訂閱,構(gòu)建貨拉拉統(tǒng)一實(shí)時(shí)采集和湖倉數(shù)據(jù)生態(tài)。