自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink CDC MongoDB Connector 的實(shí)現(xiàn)原理和使用實(shí)踐

數(shù)據(jù)庫
MongoDB 是一種面向文檔的非關(guān)系型數(shù)據(jù)庫,支持半結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ);也是一種分布式的數(shù)據(jù)庫,提供副本集和分片集兩種集群部署模式,具有高可用和水平擴(kuò)展的能力,比較適合大規(guī)模的數(shù)據(jù)存儲(chǔ)。

摘要:本文整理自 XTransfer 資深 Java 開發(fā)工程師、Flink CDC Maintainer 孫家寶在 Flink CDC Meetup 的演講。主要內(nèi)容包括:

  1. MongoDB Change Stream 技術(shù)簡介
  2. MongoDB CDC Connector 業(yè)務(wù)實(shí)踐
  3. MongoDB CDC Connector 生產(chǎn)調(diào)優(yōu)
  4. MongoDB CDC Connector 并行化 Snapshot 改進(jìn)
  5. 后續(xù)規(guī)劃

01MongoDB Change Stream 技術(shù)簡介

圖片

MongoDB 是一種面向文檔的非關(guān)系型數(shù)據(jù)庫,支持半結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ);也是一種分布式的數(shù)據(jù)庫,提供副本集和分片集兩種集群部署模式,具有高可用和水平擴(kuò)展的能力,比較適合大規(guī)模的數(shù)據(jù)存儲(chǔ)。另外, MongoDB 4.0 版本還提供了多文檔事務(wù)的支持,對(duì)于一些比較復(fù)雜的業(yè)務(wù)場(chǎng)景更加友好。

圖片

MongoDB 使用了弱結(jié)構(gòu)化的存儲(chǔ)模式,支持靈活的數(shù)據(jù)結(jié)構(gòu)和豐富的數(shù)據(jù)類型,適合 Json 文檔、標(biāo)簽、快照、地理位置、內(nèi)容存儲(chǔ)等業(yè)務(wù)場(chǎng)景。它天然的分布式架構(gòu)提供了開箱即用的分片機(jī)制和自動(dòng) rebalance 能力,適合大規(guī)模數(shù)據(jù)存儲(chǔ)。另外, MongoDB 還提供了分布式網(wǎng)格文件存儲(chǔ)的功能,即 GridFS,適合圖片、音頻、視頻等大文件存儲(chǔ)。

圖片

MongoDB 提供了副本集和分片集兩種集群模部署模式。

副本集:高可用的部署模式,次要節(jié)點(diǎn)通過拷貝主要節(jié)點(diǎn)的操作日志來進(jìn)行數(shù)據(jù)的復(fù)制。當(dāng)主要節(jié)點(diǎn)發(fā)生故障時(shí),次要節(jié)點(diǎn)和仲裁節(jié)點(diǎn)會(huì)重新發(fā)起投票來選出新的主要節(jié)點(diǎn),實(shí)現(xiàn)故障轉(zhuǎn)移。另外,次要節(jié)點(diǎn)還能分擔(dān)查詢請(qǐng)求,減輕主要節(jié)點(diǎn)的查詢壓力。

分片集:水平擴(kuò)展的部署模式,將數(shù)據(jù)均勻分散在不同 Shard 上,每個(gè) Shard 可以部署為一個(gè)副本集,Shard 中主要節(jié)點(diǎn)承載讀寫請(qǐng)求,次要節(jié)點(diǎn)會(huì)復(fù)制主要節(jié)點(diǎn)的操作日志,能夠根據(jù)指定的分片索引和分片策略將數(shù)據(jù)切分成多個(gè) 16MB 的數(shù)據(jù)塊,并將這些數(shù)據(jù)塊交給不同 Shard 進(jìn)行存儲(chǔ)。Config Servers 中會(huì)記錄 Shard 和數(shù)據(jù)塊的對(duì)應(yīng)關(guān)系。

圖片

MongoDB 的 Oplog 與 MySQL 的 Binlog 類似,記錄了數(shù)據(jù)在 MongoDB 中所有的操作日志。Oplog 是一個(gè)有容量的集合,如果超出預(yù)設(shè)的容量范圍,則會(huì)丟棄先前的信息。

圖片

與 MySQL 的 Binlog 不同, Oplog 并不會(huì)記錄變更前/后的完整信息。遍歷 Oplog 的確可以捕獲 MongoDB 的數(shù)據(jù)變更,但是想要轉(zhuǎn)換成 Flink 支持的 Changelog 依然存在一些限制。

首先,訂閱 Oplog 難度較大。每個(gè)副本集會(huì)維護(hù)自己的 Oplog, 對(duì)于分片集群來說,每個(gè) Shard 可能是一個(gè)獨(dú)立的副本集,需要遍歷每個(gè) Shard 的 Oplog 并按照操作時(shí)間進(jìn)行排序。另外, Oplog 沒有包含變更文檔前和變更后的完整狀態(tài),因此既不能轉(zhuǎn)換成 Flink 標(biāo)準(zhǔn)的 Changelog ,也不能轉(zhuǎn)換成 Upsert 類型的 Changelog 。這亦是我們?cè)趯?shí)現(xiàn) MongoDB CDC Connector 的時(shí)候沒有采用直接訂閱 Oplog 方案的主要原因。

圖片


最終我們選擇使用 MongoDB Change Streams 方案來實(shí)現(xiàn) MongoDB CDC Connector。

Change Streams 是 MongoDB 3.6 版本提供的新特性,它提供了更簡單的變更數(shù)據(jù)捕獲接口,屏蔽了直接遍歷 Oplog 的復(fù)雜度。Change Streams 還提供了變更后文檔完整狀態(tài)的提取功能,可以輕松轉(zhuǎn)換成 Flink Upsert 類型的 Changelog。它還提供了比較完整的故障恢復(fù)能力,每一條變更記錄數(shù)據(jù)都會(huì)包含一個(gè) resume token 來記錄當(dāng)前變更流的位置。故障發(fā)生后,可以通過 resume token 從當(dāng)前消費(fèi)點(diǎn)進(jìn)行恢復(fù)。

另外, Change Streams 支持變更事件的篩選和定制化的功能。比如可以將數(shù)據(jù)庫和集合名稱的正則過濾器下推到 MongoDB 來完成,可以明顯減少網(wǎng)絡(luò)開銷。它還提供了對(duì)集合庫以及整個(gè)集群級(jí)別的變更訂閱,能夠支持相應(yīng)的權(quán)限控制。

圖片

使用 MongoDB Change Streams 特性實(shí)現(xiàn)的 CDC Connector 如上圖所示。首先通過 Change Streams 訂閱 MongoDB 的變更。比如有 insert、update、delete、replace 四種變更類型,先將其轉(zhuǎn)換成 Flink 支持的 upsert  Changelog,便可以在其之上定義成一張動(dòng)態(tài)表,使用 Flink SQL 進(jìn)行處理。

目前 MongoDB CDC Connector 支持 Exactly-Once 語義,支持全量加增量的訂閱,支持從檢查點(diǎn)、保存點(diǎn)恢復(fù),支持 Snapshot 數(shù)據(jù)的過濾,支持?jǐn)?shù)據(jù)庫的 Database、Collection 等元數(shù)據(jù)的提取,也支持庫集合的正則篩選功能。

02MongoDB CDC Connector 業(yè)務(wù)實(shí)踐

圖片

XTransfer 成立于 2017 年,聚焦于 B2B 跨境支付業(yè)務(wù),為從事跨境電商出口的中小微企業(yè)提供外貿(mào)收款以及風(fēng)控服務(wù)??缇?B 類業(yè)務(wù)結(jié)算場(chǎng)景涉及的業(yè)務(wù)鏈路很長,從詢盤到最終的成交,過程中涉及物流條款、支付條款等,需要在每個(gè)環(huán)節(jié)上做好風(fēng)險(xiǎn)管控,以符合跨境資金交易的監(jiān)管要求。

以上種種因素對(duì) XTransfer 的數(shù)據(jù)處理安全性和準(zhǔn)確性都提出了更高的要求。在此基礎(chǔ)上,XTransfer 基于 Flink 搭建了自己的大數(shù)據(jù)平臺(tái),能夠有效保障在跨境 B2B 全鏈路上的數(shù)據(jù)能夠被有效地采集、加工和計(jì)算,并滿足了高安全、低延遲、高精度的需求。

圖片

變更數(shù)據(jù)采集 CDC 是數(shù)據(jù)集成的關(guān)鍵環(huán)節(jié)。在沒有使用 Flink CDC  之前,一般使用 Debezium、Canal 等傳統(tǒng) CDC 工具來抽取數(shù)據(jù)庫的變更日志,并將其轉(zhuǎn)發(fā)到 Kafka 中,下游讀取 Kafka 中的變更日志進(jìn)行消費(fèi)。這種架構(gòu)存在以下痛點(diǎn):

  • 部署組件多,運(yùn)維成本較高;
  • 下游數(shù)據(jù)消費(fèi)邏輯需要根據(jù)寫入端進(jìn)行適配,存在一定的開發(fā)成本;
  • 數(shù)據(jù)訂閱配置較復(fù)雜,無法像 Flink CDC 一樣僅通過 SQL 語句便定義出一個(gè)完整的數(shù)據(jù)同步邏輯;
  • 難以全部滿足全量 + 增量采集,可能需要引入 DataX 等全量采集組件;
  • 比較偏向于對(duì)變更數(shù)據(jù)的采集,對(duì)數(shù)據(jù)的處理過濾能力較為薄弱;
  • 難以滿足異構(gòu)數(shù)據(jù)源打?qū)挼膱?chǎng)景。

目前我們的大數(shù)據(jù)平臺(tái)主要使用 Flink CDC 來進(jìn)行變更數(shù)據(jù)捕獲,它具有如下優(yōu)勢(shì):

1. 實(shí)時(shí)數(shù)據(jù)集成

圖片


  • 無須額外部署 Debezium、Canal、Datax 等組件,運(yùn)維成本大幅降低;
  • 支持豐富的數(shù)據(jù)源,也可復(fù)用 Flink 既有的 connectors 進(jìn)行數(shù)據(jù)采集寫入,可以覆蓋大多數(shù)業(yè)務(wù)場(chǎng)景;
  • 降低了開發(fā)難度,僅通過 Flink SQL 就可以定義出完整的數(shù)據(jù)集成工作流程;
  • 數(shù)據(jù)處理能力較強(qiáng),依托于 Flink 平臺(tái)強(qiáng)大的計(jì)算能力可以實(shí)現(xiàn)流式 ETL 甚至異構(gòu)數(shù)據(jù)源的 join、group by 等。

2. 構(gòu)建實(shí)時(shí)數(shù)倉

圖片

  • 大幅簡化實(shí)時(shí)數(shù)倉的部署難度,通過 Flink CDC 實(shí)時(shí)采集數(shù)據(jù)庫的變更,并寫入 Kafka、Iceberg、Hudi、TiDB 等數(shù)據(jù)庫中,即可使用 Flink 進(jìn)行深度的數(shù)據(jù)挖掘和數(shù)據(jù)處理。
  • Flink 的計(jì)算引擎可以支持流批一體的計(jì)算模式,不用再維護(hù)多套計(jì)算引擎,可以大幅降低數(shù)據(jù)的開發(fā)成本。

3. 實(shí)時(shí)風(fēng)控

圖片

  • 實(shí)時(shí)風(fēng)控以往一般采取往 Kafka 中發(fā)業(yè)務(wù)事件的方式實(shí)現(xiàn),而使用 Flink CDC 之后,可以直接從業(yè)務(wù)庫中捕獲風(fēng)控事件,然后通過 Flink CDC 來進(jìn)行復(fù)雜的事件處理。
  • 可以運(yùn)行模型,以通過 Flink ML、Alink 來豐富機(jī)器學(xué)習(xí)的能力。最后將這些實(shí)時(shí)風(fēng)控的處置結(jié)果回落進(jìn) Kafka,下達(dá)風(fēng)控指令。

03MongoDB CDC Connector 生產(chǎn)調(diào)優(yōu)

圖片

MongoDB CDC Connector 的使用有如下幾點(diǎn)要求:

  • 鑒于使用了 Change Streams 的特性來實(shí)現(xiàn) MongoDB CDC Connector, 因此要求 MongoDB 的最小可用版本是 3.6,比較推薦 4.0.8 及以上版本。
  • 必須使用集群部署模式。由于訂閱 MongoDB 的 Change Streams 要求節(jié)點(diǎn)之間能夠進(jìn)行相互復(fù)制數(shù)據(jù),單機(jī) MongoDB 無法進(jìn)行數(shù)據(jù)的互相拷貝,也沒有 Oplog,只有副本集或分片集的情況下才有數(shù)據(jù)復(fù)制機(jī)制。
  • 需要使用 WireTiger 存儲(chǔ)引擎,使用 pv1 復(fù)制協(xié)議。
  • 需要擁有 ChangeStream 和 find 用戶權(quán)限。

圖片

使用 MongoDB CDC Connector 時(shí)要注意設(shè)置 Oplog 的容量和過期時(shí)間。MongoDB oplog 是一個(gè)特殊的有容量集合,容量達(dá)到最大值后,會(huì)丟棄歷史數(shù)據(jù)。而 Change Streams 通過 resume token 來進(jìn)行恢復(fù),太小的 oplog 容量可能會(huì)導(dǎo)致 resume token 對(duì)應(yīng)的 oplog 記錄不再存在,即 resume token 過期,進(jìn)而導(dǎo)致 Change Streams 無法被恢復(fù)。

可以使用 replSetResizeOplog 設(shè)置 oplog 容量和最短保留時(shí)間,MongoDB 4.4 版本之后也支持設(shè)置最小時(shí)間。一般而言,生產(chǎn)環(huán)境中建議 oplog 保留不小于 7 天。

圖片

對(duì)一些變更較慢的表,建議在配置中開啟心跳事件。變更事件和心跳事件可以同時(shí)向前推進(jìn) resume token,對(duì)于變更較慢的表,可以通過心跳事件來刷新 resume token 避免其過期。

可以通過 heartbeat.interval.ms 設(shè)置心跳的間隔。

圖片

由于只能將 MongoDB 的 Change Streams 轉(zhuǎn)換成 Flink 的 Upsert changelog,它類似于 Upsert Kafka 形式,為了補(bǔ)齊 –U 前置鏡像值,會(huì)增加一個(gè)算子 ChangelogNormalize,而這會(huì)帶來額外的狀態(tài)開銷。因此在生產(chǎn)環(huán)境中比較推薦使用 RocksDB State Backend。

圖片

當(dāng)默認(rèn)連接的參數(shù)無法滿足使用需求時(shí),可以通過設(shè)置 connection.options 配置項(xiàng)來傳遞 MongoDB 支持的連接參數(shù)。

比如連接 MongoDB 的用戶創(chuàng)建的數(shù)據(jù)庫不在 admin 中,可以設(shè)置參數(shù)來指定需要使用哪個(gè)數(shù)據(jù)庫來認(rèn)證當(dāng)前用戶,也可以設(shè)置連接池的最大連接參數(shù)等,MongoDB 的連接字符串默認(rèn)支持這些參數(shù)。

圖片

正則匹配多庫、多表是 MongoDB CDC Connector 在 2.0 版本之后提供的新功能。需要注意,如果數(shù)據(jù)庫名稱使用了正則參數(shù),則需要擁有 readAnyDatabase 角色。因?yàn)?MongoDB 的 Change Streams 只能在整個(gè)集群、數(shù)據(jù)庫以及 collection 粒度上開啟。如果需要對(duì)整個(gè)數(shù)據(jù)庫進(jìn)行過濾,那么數(shù)據(jù)庫進(jìn)行正則匹配時(shí)只能在整個(gè)集群上開啟 Change Streams ,然后通過 Pipeline 過濾數(shù)據(jù)庫的變更??梢酝ㄟ^在 Ddatabase 和 Collection 兩個(gè)參數(shù)中寫入正則表達(dá)式進(jìn)行多庫、多表的訂閱。

04MongoDB CDC Connector并行化 Snapshot 改進(jìn)

圖片

為了加速 Snapshot 的速度,可以使用 Flip-27 引入的 source 來進(jìn)行并行化改造。首先使用一個(gè) split 枚舉器,根據(jù)一定的切分策略,將一個(gè)完整的 Snapshot 任務(wù)拆分成若干個(gè)子任務(wù),然后分配給多個(gè) split reader 并行做 Snapshot ,以此提升整體任務(wù)的運(yùn)行速度。

但是在 MongoDB 里,大多情況下組件是 ObjectID,其中前面四個(gè)字節(jié)是 UNIX 描述,中間五個(gè)字節(jié)是一個(gè)隨機(jī)值,后面三個(gè)字節(jié)是一個(gè)自增量。在相同描述里插入的文檔并不是嚴(yán)格遞增的,中間的隨機(jī)值可能會(huì)影響局部的嚴(yán)格遞增,但從總體來看,依然能夠滿足遞增趨勢(shì)。

因此,不同于 MySQL 的遞增組件,MongoDB 并不適合采用 offset + limit 的切分策略對(duì)其集合進(jìn)行簡單拆分,需要針對(duì) ObjectID 采用針對(duì)性的切分策略。

圖片

最終,我們采取了以下三種 MongoDB 切分策略:

  • Sample 采樣分桶:原理是利用 $sample 命令對(duì) collection 進(jìn)行隨機(jī)采樣,通過平均文檔大小和每個(gè) chunk 的大小來預(yù)估需要的分桶數(shù)。要求相應(yīng)集合的查詢權(quán)限,其優(yōu)點(diǎn)是速度較快,適用于數(shù)據(jù)量大但是沒有分片的集合;缺點(diǎn)是由于使用了抽樣預(yù)估模式,分桶的結(jié)果不能做到絕對(duì)均勻。
  • SplitVector 索引切分:SplitVector 是 MongoDB 計(jì)算 chunk 分裂點(diǎn)的內(nèi)部命令,通過訪問指定的索引計(jì)算出每個(gè) chunk 的邊界。要求擁有 SplitVector 權(quán)限,其優(yōu)點(diǎn)是速度快,chunk 結(jié)果均勻;缺點(diǎn)是對(duì)于數(shù)據(jù)量大且已經(jīng)分片的集合,不如直接讀取 config 庫中已經(jīng)分好的 chunks 元數(shù)據(jù)。
  • Chunks 元數(shù)據(jù)讀?。阂?yàn)?MongoDB 在 config 數(shù)據(jù)庫會(huì)存儲(chǔ)分片集合的實(shí)際分片結(jié)果,因此可以直接從 config 中讀取分片集合的實(shí)際分片結(jié)果。要求擁有 config 庫讀取權(quán)限,僅限于分片集合使用。其優(yōu)點(diǎn)是速度快,無須重新計(jì)算 chunk 分裂點(diǎn),chunk 結(jié)果均勻,默認(rèn)情況下為 64MB;缺點(diǎn)是不能滿足所有場(chǎng)景,僅限分片場(chǎng)景。

圖片

上圖為 sample 采樣分桶示例。左側(cè)是一個(gè)完整的集合,從完整的集合中設(shè)定樣本數(shù)量,然后將整個(gè)樣本縮小,并根據(jù)采樣以后的樣本進(jìn)行分桶,最終結(jié)果就是我們希望的 chunks 邊界。

sample 命令是 MongoDB 采樣的一個(gè)內(nèi)置命令。在樣本值小于 5% 的情況下,使用偽隨機(jī)算法進(jìn)行采樣;樣本值大于 5% 的情況下,先使用隨機(jī)排序,然后選擇前 N 個(gè)文檔。它的均勻度和耗時(shí)主要取決于隨機(jī)算法和樣本的數(shù)量,是一種均勻程度和切分速度的折中策略,適合于要求切分速度快,但可以容忍切分結(jié)果不太均勻的場(chǎng)景。

在實(shí)際測(cè)試中,sample 采樣的均勻程度有著不錯(cuò)的表現(xiàn)。

圖片

上圖為 SplitVector 索引切分示例。左側(cè)是原始集合,通過 SplitVector 命令指定需要訪問的索引,為 ID 索引。可以設(shè)置每個(gè) chunk 的大小,單位為 MB,然后使用 SplitVector 命令訪問索引,并通過索引計(jì)算每個(gè)塊的邊界。

它速度快,chunk 結(jié)果也很均勻,適用于大部分場(chǎng)景。

圖片

上圖為 config.chuncks 讀取示例,即直接讀取 MongoDB 已經(jīng)分好的 chunks 元數(shù)據(jù)。在 Config Server 中會(huì)存儲(chǔ)每個(gè) Shard、其所在機(jī)器以及每個(gè) Shard 的邊界。對(duì)于分片集合,可以直接在 chunks 中讀取它的邊界信息,無須重復(fù)計(jì)算這些分裂點(diǎn),也可以保證每一個(gè) chunk 的讀取在單臺(tái)機(jī)器上就能完成,速度極快,在大規(guī)模的分片集合場(chǎng)景下有著很好的表現(xiàn)。

05后續(xù)規(guī)劃

圖片

Flink CDC 的后續(xù)規(guī)劃主要分為以下五個(gè)方面:

  • 第一,協(xié)助完善 Flink CDC 增量 Snapshot 框架;
  • 第二,使用 MongoDB CDC 對(duì)接 Flink CDC 增量 Snapshot 框架,使其能夠支持并行 Snapshot 改進(jìn);
  • 第三,MongoDB CDC 支持 Flink RawType。對(duì)于一些比較靈活的存儲(chǔ)結(jié)構(gòu)提供 RawType 轉(zhuǎn)換,用戶可以通過 UDF 的形式對(duì)其進(jìn)行自定義解析;
  • 第四,MongoDB CDC 支持從指定位置進(jìn)行變更數(shù)據(jù)的采集;
  • 第五,MongoDB CDC 穩(wěn)定性的優(yōu)化。

提問&解答

Q1?:MongoDB CDC 延遲高嗎?是否需要通過犧牲性能來降低延遲?

MongoDB CDC 延遲不高,在全量采集的時(shí)候經(jīng)過 changelog normalize 可能會(huì)對(duì)于 CDC 的增量采集造成一些背壓,但是這種情況可以通過 MongoDB 并行化改造、增加資源的方式來避免。

Q2?:默認(rèn)連接什么時(shí)候無法滿足要求?

MongoDB 的用戶可以在任何數(shù)據(jù)庫、任何子庫中進(jìn)行創(chuàng)建。如果不是在 admin 的數(shù)據(jù)庫中創(chuàng)建用戶,認(rèn)證的時(shí)候需要顯示地指定要在哪個(gè)數(shù)據(jù)庫中認(rèn)證用戶,也可以設(shè)置最大的連接大小等參數(shù)。

Q3?:MongoDB 目前的 DBlog 支持無鎖并發(fā)讀取嗎?

DBlog 的無鎖并發(fā)擁有增量快照的能力,但是因?yàn)?MongoDB 難以獲取當(dāng)前 changelog 的位點(diǎn),所以增量快照無法立刻實(shí)現(xiàn),但無鎖并發(fā)的 Snapshot 即將支持。

責(zé)任編輯:未麗燕 來源: Apache Flink
相關(guān)推薦

2022-06-10 15:21:15

MySQL CDCSqlServer數(shù)據(jù)庫

2021-07-16 10:05:34

項(xiàng)目企業(yè)系統(tǒng)

2021-08-16 08:44:54

Pravega Fli項(xiàng)目協(xié)議

2022-07-20 23:15:11

Flink數(shù)據(jù)集CDC

2022-06-09 14:19:46

順豐數(shù)據(jù)集成Flink

2017-07-07 14:30:27

Flink架構(gòu)拓?fù)?/a>

2017-07-25 16:34:06

數(shù)據(jù)庫sqlmongodb

2022-11-01 08:02:04

2024-02-01 12:32:35

MySQL數(shù)據(jù)鎖數(shù)據(jù)庫

2019-11-12 11:15:39

setTimeout前端代碼

2025-02-11 10:13:05

2018-05-16 15:26:43

數(shù)據(jù)庫MySQL主從復(fù)制

2022-10-24 00:48:58

Go語言errgroup

2022-09-16 08:23:22

Flink數(shù)據(jù)湖優(yōu)化

2021-10-20 09:58:46

開發(fā)視圖系統(tǒng)

2022-05-06 09:22:25

Go泛型

2022-03-17 08:55:43

本地線程變量共享全局變量

2021-09-17 07:51:24

Keepalived服務(wù)高可用

2021-06-04 07:24:14

Flink CDC數(shù)據(jù)

2021-05-06 11:54:40

大數(shù)據(jù)Flink
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)