自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

MongoDB Stream是如何實(shí)現(xiàn)完美數(shù)據(jù)增量遷移的?

數(shù)據(jù)庫 其他數(shù)據(jù)庫 MongoDB
MongoDB 3.6版本開始便提供了Change Stream功能,支持對(duì)數(shù)據(jù)變更記錄做監(jiān)聽。這為實(shí)現(xiàn)數(shù)據(jù)同步及轉(zhuǎn)換處理提供了更大的便利,下面將探討如何利用Change Stream實(shí)現(xiàn)數(shù)據(jù)的增量遷移。

一、背景介紹

最近微服務(wù)架構(gòu)火得不行,但本質(zhì)上也只是風(fēng)口上的一個(gè)熱點(diǎn)詞匯。

作為筆者的經(jīng)驗(yàn)來說,想要應(yīng)用一個(gè)新的架構(gòu)需要帶來的變革成本是非常高的。

盡管如此,目前還是有許多企業(yè)踏上了服務(wù)化改造的道路,這其中則免不了“舊改”的各種繁雜事。

所謂的“舊改”,就是把現(xiàn)有的系統(tǒng)架構(gòu)來一次重構(gòu),拆分成多個(gè)細(xì)粒度的服務(wù)后,然后找時(shí)間升級(jí)割接一把,讓新系統(tǒng)上線。這其中,數(shù)據(jù)的遷移往往會(huì)成為一個(gè)非常重要且繁雜的活兒。

拆分服務(wù)時(shí)數(shù)據(jù)遷移的挑戰(zhàn)在哪?

  • 首先是難度大,做一個(gè)遷移方案需要了解項(xiàng)目的前身今世,評(píng)估遷移方案、技術(shù)工具等;
  • 其次是成本高。由于新舊系統(tǒng)數(shù)據(jù)結(jié)構(gòu)是不一樣的,需要定制開發(fā)遷移轉(zhuǎn)化功能,很難有一個(gè)通用的工具能一鍵遷移;
  • 再者對(duì)于一些容量大、可靠性要求高的系統(tǒng),要能夠不影響業(yè)務(wù),出了問題還能追溯,因此方案上還得往復(fù)雜了想。

二、常見方案

按照遷移的方案及流程,可將數(shù)據(jù)遷移分為三類:

1、停機(jī)遷移

最簡單的方案,停機(jī)遷移的順序如下:

采用停機(jī)遷移的好處是流程操作簡單,工具成本低,然而缺點(diǎn)也很明顯,遷移過程中業(yè)務(wù)是無法訪問的,因此只適合于規(guī)格小、允許停服的場景。

2、業(yè)務(wù)雙寫

業(yè)務(wù)雙寫是指對(duì)現(xiàn)有系統(tǒng)先進(jìn)行改造升級(jí),支持同時(shí)對(duì)新庫和舊庫進(jìn)行寫入。之后再通過數(shù)據(jù)遷移工具對(duì)舊數(shù)據(jù)做全量遷移,待所有數(shù)據(jù)遷移轉(zhuǎn)換完成后切換到新系統(tǒng)。

示意圖:

 

業(yè)務(wù)雙寫的方案是平滑的,對(duì)線上業(yè)務(wù)影響極小,在出現(xiàn)問題的情況下可重新來過,操作壓力也會(huì)比較小。

筆者在早些年前嘗試過這樣的方案,整個(gè)遷移過程確實(shí)非常順利,但實(shí)現(xiàn)該方案比較復(fù)雜,需要對(duì)現(xiàn)有的代碼進(jìn)行改造并完成新數(shù)據(jù)的轉(zhuǎn)換及寫入,對(duì)于開發(fā)人員的要求較高。在業(yè)務(wù)邏輯清晰、團(tuán)隊(duì)對(duì)系統(tǒng)有足夠的把控能力的場景下適用。

3、增量遷移

增量遷移的基本思路是先進(jìn)行全量的遷移轉(zhuǎn)換,待完成后持續(xù)進(jìn)行增量數(shù)據(jù)的處理,直到數(shù)據(jù)追平后切換系統(tǒng)。

示意圖:

 

關(guān)鍵點(diǎn):

要求系統(tǒng)支持增量數(shù)據(jù)的記錄。對(duì)于MongoDB可以利用oplog實(shí)現(xiàn)這點(diǎn),為避免全量遷移過程中oplog被沖掉,在開始遷移前就必須開始監(jiān)聽oplog,并將變更全部記錄下來;如果沒有辦法,需要從應(yīng)用層上考慮,比如為所有的表(集合)記錄下updateTime這樣的時(shí)間戳,或者升級(jí)應(yīng)用并支持將修改操作單獨(dú)記錄下來。

增量數(shù)據(jù)的回放是持續(xù)的。在所有的增量數(shù)據(jù)回放轉(zhuǎn)換過程中,系統(tǒng)仍然會(huì)產(chǎn)生新的增量數(shù)據(jù),這要求遷移工具能做到將增量數(shù)據(jù)持續(xù)回放并將之追平,之后才能做系統(tǒng)切換。

MongoDB 3.6版本開始便提供了Change Stream功能,支持對(duì)數(shù)據(jù)變更記錄做監(jiān)聽。這為實(shí)現(xiàn)數(shù)據(jù)同步及轉(zhuǎn)換處理提供了更大的便利,下面將探討如何利用Change Stream實(shí)現(xiàn)數(shù)據(jù)的增量遷移。

三、Change Stream介紹

Chang Stream(變更記錄流)是指collection(數(shù)據(jù)庫集合)的變更事件流,應(yīng)用程序通過db.collection.watch()這樣的命令可以獲得被監(jiān)聽對(duì)象的實(shí)時(shí)變更。

在該特性出現(xiàn)之前,你可以通過拉取oplog達(dá)到同樣的目的;但oplog的處理及解析相對(duì)復(fù)雜且存在被回滾的風(fēng)險(xiǎn),如果使用不當(dāng)?shù)脑掃€會(huì)帶來性能問題。Change Stream可以與aggregate framework結(jié)合使用,對(duì)變更集進(jìn)行進(jìn)一步的過濾或轉(zhuǎn)換。

參考鏈接:https://docs.mongodb.com/manual/aggregation/

由于Change Stream利用了存儲(chǔ)在oplog中的信息,因此對(duì)于單進(jìn)程部署的MongoDB無法支持Change Stream功能,其只能用于啟用了副本集的獨(dú)立集群或分片集群。

監(jiān)聽的目標(biāo)

變更事件

一個(gè)Change Stream Event的基本結(jié)構(gòu)如下所示:

字段說明:

Change Steram支持的變更類型有以下幾個(gè):

利用以下的shell腳本,可以打印出集合 T_USER上的變更事件:

下面提供一些樣例,感受一下:

insert事件

update事件

replace事件

delete事件

invalidate事件

更多的Change Event信息可以參考:https://docs.mongodb.com/manual/reference/change-events/

四、實(shí)現(xiàn)增量遷移

本次設(shè)計(jì)了一個(gè)簡單的論壇帖子遷移樣例,用于演示如何利用Change Stream實(shí)現(xiàn)***的增量遷移方案。

背景如下:

現(xiàn)有的系統(tǒng)中有一批帖子,每個(gè)帖子都屬于一個(gè)頻道(channel),如下表:

新系統(tǒng)中頻道字段將采用英文簡稱,同時(shí)要求能支持平滑升級(jí)。根據(jù)前面篇幅的敘述,我們將使用Change Stream功能實(shí)現(xiàn)一個(gè)增量遷移的方案。

相關(guān)表的轉(zhuǎn)換如下圖:

原理

topic是帖子原表,在遷移開始前將開啟watch任務(wù)持續(xù)獲得增量數(shù)據(jù),并記錄到 topic_incr表中;接著執(zhí)行全量的遷移轉(zhuǎn)換,之后再持續(xù)對(duì)增量表數(shù)據(jù)進(jìn)行遷移,直到無新的增量為止。

接下來我們使用Java程序來完成相關(guān)代碼,mongodb-java--driver在3.6版本后才支持watch功能,需要確保升級(jí)到對(duì)應(yīng)版本:

定義Channel頻道的轉(zhuǎn)換表:

 

  1. public static enum Channel {  
  2.     Food("美食"),  
  3.     Emotion("情感"), 
  4.     Pet("寵物"),  
  5.     House("家居"),  
  6.     Marriage("征婚"),  
  7.     Education("教育"),  
  8.     Travel("旅游"
  9.  
  10.     private final String oldName;  
  11.     public String getOldName() {  
  12.         return oldName;  
  13.     }  
  14.     private Channel(String oldName) {  
  15.         this.oldName = oldName;  
  16.     }  
  17.     /**  
  18.      * 轉(zhuǎn)換為新的名稱  
  19.      *  
  20.      * @param oldName  
  21.      * @return  
  22.      */  
  23.     public static String toNewName(String oldName) {  
  24.         for (Channel channel : values()) {  
  25.             if (channel.oldName.equalsIgnoreCase(oldName)) {  
  26.                 return channel.name();  
  27.             }  
  28.         }  
  29.         return "" 
  30.     }  
  31.     /**  
  32.      * 返回一個(gè)隨機(jī)頻道  
  33.      *  
  34.      * @return  
  35.      */  
  36.     public static Channel random() {  
  37.         Channel[] channels = values();  
  38.         int idx = (int) (Math.random() * channels.length);  
  39.         return channels[idx];  
  40.     }  

為topic表預(yù)寫入1w條記錄:

開啟監(jiān)聽任務(wù),將topic上的所有變更寫入到增量表:

代碼中通過watch命令獲得一個(gè)MongoCursor對(duì)象,用于遍歷所有的變更。

FullDocument.UPDATE_LOOKUP選項(xiàng)啟用后,在update變更事件中將攜帶完整的文檔數(shù)據(jù)(FullDocument)。

watch()命令提交后,mongos會(huì)與分片上的mongod(主節(jié)點(diǎn))建立訂閱通道,這可能需要花費(fèi)一點(diǎn)時(shí)間。

為了模擬線上業(yè)務(wù)的真實(shí)情況,啟用幾個(gè)線程對(duì)topic表進(jìn)行持續(xù)寫操作:

 

ChangeTask實(shí)現(xiàn)邏輯如下:

每一個(gè)變更任務(wù)會(huì)不斷對(duì)topic產(chǎn)生寫操作,觸發(fā)一系列ChangeEvent產(chǎn)生:

  • doInsert:生成隨機(jī)頻道的topic后,執(zhí)行insert;
  • doUpdate:隨機(jī)取得一個(gè)topic,將其channel字段改為隨機(jī)值,執(zhí)行update;
  • doReplace:隨機(jī)取得一個(gè)topic,將其channel字段改為隨機(jī)值,執(zhí)行replace;
  • doDelete:隨機(jī)取得一個(gè)topic,執(zhí)行delete。

以doUpdate為例,實(shí)現(xiàn)代碼如下:

 

啟動(dòng)一個(gè)全量遷移任務(wù),將topic表中數(shù)據(jù)遷移到topic_new新表:

在全量遷移開始前,先獲得當(dāng)前時(shí)刻的的*** _id 值(可以將此值記錄下來)作為終點(diǎn),隨后逐個(gè)完成遷移轉(zhuǎn)換。

在全量遷移完成后,便開始***一步:增量遷移。

注:增量遷移過程中,變更操作仍然在進(jìn)行。

 

  1. final MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);  
  2. final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);  
  3. ObjectId currentId = null 
  4. Document sort = new Document("_id", 1);  
  5. MongoCursor<Document> cursor = null 
  6. // 批量大小  
  7. int batchSize = 100;AtomicInteger count = new AtomicInteger(0);  
  8. try {  
  9.     while (true) {  
  10.         boolean isWatchTaskStillRunning = watchFlag.getCount() > 0;  
  11.         // 按ID增量分段拉取  
  12.         if (currentId == null) {  
  13.             cursor = topicIncrCollection.find().sort(sort).limit(batchSize).iterator();  
  14.         } else {  
  15.             cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))  
  16.                     .sort(sort).limit(batchSize).iterator();  
  17.         }  
  18.         boolean hasIncrRecord = false 
  19.         while (cursor.hasNext()) {  
  20.             hasIncrRecord = true 
  21.             Document incrDoc = cursor.next();  
  22.             OperationType opType = OperationType.fromString(incrDoc.getString(field_op));  
  23.             ObjectId docId = incrDoc.getObjectId(field_key);  
  24.             // 記錄當(dāng)前ID  
  25.             currentId = incrDoc.getObjectId("_id"); 
  26.             if (opType == OperationType.DELETE) {  
  27.                 topicNewCollection.deleteOne(new Document("_id", docId));  
  28.             } else {  
  29.                 Document doc = incrDoc.get(field_data, Document.class);  
  30.                 // channel轉(zhuǎn)換  
  31.                 String oldChannel = doc.getString(field_channel);  
  32.                 doc.put(field_channel, Channel.toNewName(oldChannel));  
  33.                 // 啟用upsert  
  34.                 UpdateOptions options = new UpdateOptions().upsert(true);  
  35.                 topicNewCollection.replaceOne(new Document("_id", docId),  
  36.                         incrDoc.get(field_data, Document.class), options);  
  37.             }  
  38.             if (count.incrementAndGet() % 10 == 0) {  
  39.                 logger.info("IncrTransferTask progress, count: {}"count.get());  
  40.             }  
  41.         }  
  42.         // 當(dāng)watch停止工作(沒有更多變更),同時(shí)也沒有需要處理的記錄時(shí),跳出  
  43.         if (!isWatchTaskStillRunning && !hasIncrRecord) {  
  44.             break;  
  45.         }  
  46.         sleep(200);  
  47.     } 
  48.  } catch (Exception e) {  
  49.     logger.error("IncrTransferTask ERROR", e);  

增量遷移的實(shí)現(xiàn)是一個(gè)不斷tail的過程,利用 **_id 字段的有序特性 ** 進(jìn)行分段遷移;即記錄下當(dāng)前處理的_id值,循環(huán)拉取在該_id值之后的記錄進(jìn)行處理。

增量表(topic_incr)中除了DELETE變更之外,其余的類型都保留了整個(gè)文檔,因此可直接利用replace + upsert追加到新表。

***,運(yùn)行整個(gè)程序。

查看topic表和topic_new表,發(fā)現(xiàn)兩者數(shù)量是相同的。為了進(jìn)一步確認(rèn)一致性,我們對(duì)兩個(gè)表的分別做一次聚合統(tǒng)計(jì):

topic表

topic_new表

前者輸出結(jié)果:

后者輸出結(jié)果:

前后對(duì)比的結(jié)果是一致的。

五、后續(xù)優(yōu)化

前面的章節(jié)演示了一個(gè)增量遷移的樣例,在投入到線上運(yùn)行之前,這些代碼還得繼續(xù)優(yōu)化:

  • 寫入性能,線上的數(shù)據(jù)量可能會(huì)達(dá)到億級(jí),在全量、增量遷移時(shí)應(yīng)采用合理的批量化處理;另外可以通過增加并發(fā)線程,添置更多的Worker,分別對(duì)不同業(yè)務(wù)庫、不同表進(jìn)行處理以提升效率。增量表存在冪等性,即回放多次其最終結(jié)果還是一致的,但需要保證表級(jí)有序,即一個(gè)表同時(shí)只有一個(gè)線程在進(jìn)行增量回放。
  • 容錯(cuò)能力,一旦watch監(jiān)聽任務(wù)出現(xiàn)異常,要能夠從更早的時(shí)間點(diǎn)開始(使用startAtOperationTime參數(shù)),而如果寫入時(shí)發(fā)生失敗,要支持重試。
  • 回溯能力,做好必要的跟蹤記錄,比如將轉(zhuǎn)換失敗的ID號(hào)記錄下來,舊系統(tǒng)的數(shù)據(jù)需要保留,以免在事后追究某個(gè)數(shù)據(jù)問題時(shí)找不著北。
  • 數(shù)據(jù)轉(zhuǎn)換,新舊業(yè)務(wù)的差異不會(huì)很簡單,通常需要借助大量的轉(zhuǎn)換表來完成。

一致性檢查,需要根據(jù)業(yè)務(wù)特點(diǎn)開發(fā)自己的一致性檢查工具,用來證明遷移后數(shù)據(jù)達(dá)到想要的一致性級(jí)別。

BTW,數(shù)據(jù)遷移一定要結(jié)合業(yè)務(wù)特性、架構(gòu)差異來做考慮,否則還是在耍流氓。

六、小結(jié)

服務(wù)化系統(tǒng)中擴(kuò)容、升級(jí)往往會(huì)進(jìn)行數(shù)據(jù)遷移,對(duì)于業(yè)務(wù)量大,中斷敏感的系統(tǒng)通常會(huì)采用平滑遷移的方式。

MongoDB 3.6版本后提供了Change Stream功能以支持應(yīng)用訂閱數(shù)據(jù)的變更事件流,本文使用Stream功能實(shí)現(xiàn)了增量平滑遷移的例子,這是一次嘗試,相信后續(xù)這樣的應(yīng)用場景會(huì)越來越多。

附參考文檔

https://docs.mongodb.com/manual/changeStreams/

  • Use-ChangeStream To Handle Temperature

https://www.percona.com/blog/2017/11/22/mongodb-3-6-change-streams-nest-temperature-fan-control-use-case/ 

 

責(zé)任編輯:龐桂玉 來源: DBAplus社群
相關(guān)推薦

2015-01-26 14:08:37

USP服務(wù)器數(shù)據(jù)中心

2023-07-27 07:35:55

HTTP持久化服務(wù)器

2021-07-09 18:26:41

PythonMySQL MongoDB

2019-01-02 16:40:13

MongoDBPostgres數(shù)據(jù)庫

2024-04-19 08:32:07

Redis緩存數(shù)據(jù)庫

2009-01-16 14:22:27

ETLExtract數(shù)據(jù)增量抽取

2011-04-14 10:18:20

數(shù)據(jù)遷移

2023-08-11 08:34:40

開發(fā)工具

2017-10-20 08:45:15

數(shù)據(jù)庫MongoDBMySQL

2009-03-30 14:30:52

2021-04-22 11:22:12

云計(jì)算數(shù)據(jù)遷移混合云

2015-07-27 09:32:36

BAT數(shù)據(jù)中心

2024-11-05 15:02:41

2014-11-24 09:23:22

華為數(shù)據(jù)中心

2010-08-12 09:43:31

CassandraMongoDB

2014-01-16 16:58:06

cdn

2010-09-15 14:45:30

數(shù)據(jù)保護(hù)

2024-08-09 08:00:00

2023-10-19 16:39:38

2022-05-16 08:22:37

零拷貝Netty
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)