MongoDB Change Streams性能優(yōu)化實踐
簡介:基于MongoDB的應(yīng)用程序通過Change Streams功能可以方便的實現(xiàn)對某個集合,數(shù)據(jù)庫或者整個集群的數(shù)據(jù)變更的訂閱,極大的方便了應(yīng)用對數(shù)據(jù)庫變化的感知,但是當(dāng)前Change Streams對部分?jǐn)?shù)據(jù)的變化并沒有提供對應(yīng)的事件(創(chuàng)建索引,刪除索引,shardCollection)等,本文介紹一種新的事件訂閱方式,來完善上述不足,并探討通過并發(fā)預(yù)讀的方式,來提升原生Change Streams的性能。
一、前言
MongoDB作為一款優(yōu)秀的NOSQL數(shù)據(jù)庫,支持海量存儲,查詢能力豐富以及優(yōu)秀的性能和可靠性,當(dāng)前大部分云廠商都提供了兼容MongoDB協(xié)議的服務(wù),用戶使用廣泛,深受國內(nèi)外用戶和企業(yè)的認(rèn)可。
MongoDB從3.6版本開始提供了Change Stream特性,通過該特性,應(yīng)用程序可以實時的訂閱特定集合、庫、或整個集群的數(shù)據(jù)變更事件,相比該特性推出之前通過監(jiān)聽oplog的變化來實現(xiàn)對數(shù)據(jù)變更的感知,非常的易用,該特性同時支持副本集和集群場景。
Change Streams功能目前支持大部分?jǐn)?shù)據(jù)操作的事件,但是對于與部分其他操作,如創(chuàng)建索引,刪除索引,ColMod, shardCollection并不支持,而且目前Change Streams內(nèi)部實現(xiàn)是通過Aggregate命令的方式完成的, 對于分片集群場景下, 在mongos節(jié)點是通過單線程匯聚的方式完成從shard節(jié)點上oplog的拉取和處理,當(dāng)實例寫入壓力很大的情況下,感知數(shù)據(jù)的實時變化會有延遲,性能有待提升,對于ChangeStreams目前的性能問題,官方也有過探討https://jira.mongodb.org/browse/SERVER-46979。
本文通過深入分析當(dāng)前的Change Stream實現(xiàn)機制,結(jié)合客戶實際使用場景,提出了一種新的多并發(fā)預(yù)讀的事件監(jiān)聽方式,來解決上述問題,并應(yīng)用到客戶實際遷移和數(shù)據(jù)庫容災(zāi)的場景中。
二、Change Steams 機制介紹
Change Streams支持對單個集合,DB,集群進行事件訂閱,當(dāng)業(yè)務(wù)程序通過watch的方式發(fā)起訂閱后,背后發(fā)生了什么,讓我們一起來分析一下。
Change Streams內(nèi)部實現(xiàn)是通過Aggregate的方式實現(xiàn)的,所以watch背后,對應(yīng)的是客戶端向MongoDB Server發(fā)起了一個Aggregate命令,且對Aggregate的pipeline 參數(shù)中,添加了一個$changeStream的Stage, 結(jié)合客戶端其他參數(shù),一起發(fā)給MongoDB Server。
當(dāng)Mongo Server收到Aggregate命令后,解析后,會根據(jù)具體的請求,組合一個新的Aggregate命令,并把該命令發(fā)給對應(yīng)的Shard節(jié)點,同時會在游標(biāo)管理器(CursorManger)中注冊一個新的游標(biāo)(cursor),并把游標(biāo)Id返回給客戶端。
當(dāng)Shard Server端收到Aggregate命令后,構(gòu)建pipeline流水線,并根據(jù)pipeline參數(shù)中包括了Change Steams參數(shù),確定原始掃描的集合為oplog,并創(chuàng)建對該集合上掃描數(shù)據(jù)的原始cursor, 和對應(yīng)的查詢計劃執(zhí)行器(PlanExecutor),構(gòu)建PlanExecutor時候,用了一個特殊的執(zhí)行Stage, 即ProxyStage完成對整個Pipeline的封裝,此外也會把對應(yīng)的游標(biāo)ID返回給Mongos節(jié)點。
客戶端利用從Mongos節(jié)點拿到游標(biāo)ID, 在該游標(biāo)上不斷的執(zhí)行g(shù)etMore請求,服務(wù)端收到getMore請求后,最后通過cursor的next調(diào)用,轉(zhuǎn)發(fā)請求到shard節(jié)點,拿到數(shù)據(jù)后,歸并后返回可客戶端,完成了整個Change Streams事件的訂閱。
Shard上pipeline具體執(zhí)行的細(xì)節(jié)不在本文重點介紹范圍,這些就不詳細(xì)展開了。
原生Change Stream目前使用上有如下限制:
1. 支持DDL事件不完善
Change Stream目前支持的事件如下:
- Insert Event
- Update Event
- Replace Event
- Delete Event
- Drop Event
- Rename Event
- DropDatabase Event
- invalidate Event
顯然上述事件并沒有完全覆蓋MongoDB內(nèi)部全部的數(shù)據(jù)變更的事件。
此外,對于在集合上監(jiān)聽的Change Streams, 當(dāng)出現(xiàn)集合或者所屬的DB被刪除后,會觸發(fā)一個invalidate Event, 該事件會把Change Streams的cursor關(guān)閉掉,導(dǎo)致Change Streams無法繼續(xù)進行,對于通過Change Streams來實現(xiàn)容災(zāi)的場景,顯然是不夠友好的,需要重新建立新的Change Streams監(jiān)聽。
2. 事件拉取性能有待提升
如上述分析,當(dāng)前的Change Streams請求發(fā)到Mongos節(jié)點后,通過單線程的方式向每個Shard節(jié)點發(fā)送異步請求命令來完成數(shù)據(jù)的拉取,并做數(shù)據(jù)歸并,如果將該方式替換為多線程并發(fā)拉取,對于分片表來說,性能會有提升。
三、 并行Change Streams架構(gòu)和原理
3.1 并發(fā)Change Streams架構(gòu)介紹
針對上述的一些使用限制,我們結(jié)合實際客戶使用需求,提出一種新的并發(fā)Change Streams(Parallel Change Streams)的方式,來嘗試解決上述問題。
為了提升原生Change Streams的性能,我們在Mongos 節(jié)點引入如下幾個新的組件:
- Change Streams Buffer
與Shard是一對一的關(guān)系。每個Change Streams Buffer 默認(rèn)1GB,在Buffer滿之前,該Buffer無條件的向?qū)?yīng)的Shard(secondary節(jié)點)拉取Change Streams數(shù)據(jù)。
- Merged Queue
Merged Queue是一個內(nèi)存隊列,是Change Streams Buffer的消費者,是 Bucket的生產(chǎn)者。Merged Queue 歸并所有Shard的Change Streams Buffer,并等待合適的時機按照規(guī)則放入對應(yīng)Client的Bucket。
- Bucket
Bucket 是一個內(nèi)存隊列,是MergedQueue的消費者,是Client的生產(chǎn)者。每個Client對應(yīng)一個Bucket。每個Bucket維護該Bucket內(nèi)所有文檔的的集合。
- Merged Queue 與Bucket的交互過程
Merged Queue不停的從頭部拿出盡可能多的數(shù)據(jù),并從前往后的按照hash(document.ns)%n的規(guī)則放入對應(yīng)的Bucket, document.ns是指這個文檔的NameSpace, 所以同一個集合的數(shù)據(jù)一定在一個Bucket里面。
3.2 對DDL事件的增強
并發(fā)Change Stream除了支持原生的Change Stream外,還新增支持如下事件:
- CreateCollection Event
- CollMod Event
- CreateIndex Event
- Drop Index Event
- CreateView Event
- DropView Event
- ShardCollection Event
本文以ShardCollection為例來說明如何實現(xiàn)新增DDL事件的支持:
當(dāng)執(zhí)行ShardCollection命令的時候,Config節(jié)點會向該集合的主Shard發(fā)送一個shardsvrShardCollection命令,主Shard收到改請求后,我們在該命令的處理流程中記錄了一個type為noop的oplog, 并把該命令的詳細(xì)內(nèi)容寫入到oplog的o2字段里面,以此來實現(xiàn)shardcollecton事件的追蹤。
之后在處理Change Streams流程的pipeline中,我們對noop事件進行分析,如果其中內(nèi)容包括了shardCollection事件相關(guān)的標(biāo)記,則提取該事件,并返回給上層。
3.3 如何使用
1 如果想創(chuàng)建并發(fā)change Stream,需要先通過如下命令創(chuàng)建bucket和cursor:
- db.runCommand(
- {
- parallelChangeStream: 1,
- nBuckets: Required,<int>,
- nsRegex: Optional,<Regex>,
- startAtOperationTime: Optional,<Timestamp>,
- })
參數(shù)說明如下:
parallelChangeStream :開啟并行changeStream
nBuckets:要創(chuàng)建的bucket的數(shù)目
nsRegex:可選,定義要訂閱的集合,一個正則表達式。
startAtOperationTime:可選,表示訂閱的事件從哪個時間點開始。
返回值:
- "cursors" : [
- NumberLong("2286048776922859088"),
- NumberLong("2286048779108179584"),
- NumberLong("2286048780088774662"),
- NumberLong("2286048777169702425"),
- NumberLong("2286048779233363970"),
- NumberLong("2286048779250024945"),
- NumberLong("2286048776628281242"),
- NumberLong("2286048778209018113"),
- NumberLong("2286048778833886224"),
- NumberLong("2286048777951363227")
- ]
Cursors :返回的Mongos側(cè)的Cursor ID。
當(dāng)獲取到所有Cursor ID后,客戶端就可以并發(fā)的(每個CursorId一個線程)通過getMore命令不斷的從服務(wù)端拉取結(jié)果了。
斷點續(xù)傳
ParallelChangeStream的斷點續(xù)傳通過startAtOperationTime實現(xiàn),由于每個cursor的消費進度不一樣,恢復(fù)的斷點應(yīng)該選用n個cursor的消費值的最小值。
四、性能對比
針對新的Parallel Change Stream和原生的Change Streams ,我們做了較長時間的對比測試分析,所有測試場景采用的測試實例如下:
實例規(guī)格:4U16G, 2個Shard(副本集) ,2個Mongos,
磁盤容量:500G
測試數(shù)據(jù)模型:通過YCSB 預(yù)置數(shù)據(jù),單條記錄1K , 單個分片表1000w條記錄。
下面分幾個場景分別介紹:
1. 集群模式1分片表場景測試
測試方法:
1) 創(chuàng)建一個Hash分片的集合,預(yù)置16 Chunk
2) 啟動YCSB , 對該集合進行Load數(shù)據(jù)操作,Load數(shù)據(jù)量為1000w ,設(shè)置的Oplog足夠大,保證這些操作還在Oplog中
3) 分別啟動原生Change Streams 和 Parallel Change Streams,通過指定startAtOperationTime來觀察訂閱1000w條記錄分別需要花費的時間。
4) 由于是單個表, nBuckets 為1
測試數(shù)據(jù)如下:
讀取總數(shù)據(jù)量 | 花費總時間(ms ) | TPS( 個/s) | |
Change Streams | 1000w | 432501 | 23148 |
Parallel Change Streams(1 bucket) | 1000w | 184437 | 54361 |
2. 集群模2分片表場景測試
測試方法:
1) 創(chuàng)建2個Hash分片的集合,預(yù)置16 Chunk
2) 啟動YCSB , 同時對這2個集合進行Load數(shù)據(jù)操作,每個集合Load數(shù)據(jù)量為1000w ,設(shè)置的Oplog足夠大,保證這些操作還在Oplog中
3) 分別啟動原生Change Streams和Parallel Change Streams,通過指定startAtOperationTime來觀察訂閱4000w條記錄分別需要花費的時間。
4) 由于是2個表, nBuckets 為2
測試數(shù)據(jù)如下:
讀取總數(shù)據(jù)量 | 花費總時間(ms ) | TPS( 個/s) | |
Change Streams | 4000w | 2151792 | 18484 |
Parallel Change Streams | 4000w | 690776 | 55248 |
3. 集群模式4分片表場景測試
測試方法:
1) 創(chuàng)建4個Hash分片的集合,預(yù)置16 Chunk
2) 啟動YCSB , 同時對這4個集合進行Load數(shù)據(jù)操作,每個集合Load數(shù)據(jù)量為1000w ,設(shè)置的Oplog足夠大,保證這些操作還在Oplog中
3) 分別啟動原生Change Streams和Parallel Change Streams,通過指定startAtOperationTime來觀察訂閱4000w條記錄分別需要花費的時間。
4) 由于是4個表, nBuckets 為4
測試數(shù)據(jù)如下:
讀取總數(shù)據(jù)量 | 花費總時間(ms ) | TPS( 個/s) | |
Change Streams | 4000w | 2151792 | 18596 |
Parallel Change Streams | 4000w | 690776 | 56577 |
總結(jié):通過實際測試可以看出來, Parallel Change Streams這種方式性能有極大的提升,實際上我們后續(xù)根據(jù)實例規(guī)格,通過調(diào)整內(nèi)部Bucket和Buffer的緩存大小,性能還可以繼續(xù)提升,同時隨著分片表數(shù)據(jù)量和Shard節(jié)點數(shù)量的變多,和原生Change Streams 的性能優(yōu)勢會更加明顯。
五、并發(fā)Change Streams使用場景分析
并發(fā)Change Streams非常適合在MongoDB集群的容災(zāi)場景,應(yīng)用可以有針對性的設(shè)置對特定的集合或者DB進行監(jiān)聽,可以實時的感知到源端實例的數(shù)據(jù)變化,并快速的應(yīng)用到目標(biāo)端,整體實現(xiàn)較低RPO。
此外,并發(fā)Change Streams也可以應(yīng)用到PITR場景中, 通過并發(fā)Change Streams良好的性能,實時實現(xiàn)動態(tài)數(shù)據(jù)的跟蹤并記錄,使得PITR的可恢復(fù)時間更短。
六、未來展望
當(dāng)前的并行Change Streams的實現(xiàn)中,merge queue中的事件分發(fā)到bucket的事件中,我們采用的策略是基于事件的NameSpace的HASH值,傳遞給對應(yīng)的bucket中,這種策略對于單集合的場景,性能優(yōu)化有限,后續(xù)我們計劃同時提供基于事件的ID內(nèi)容的HASH值,把事件分發(fā)到不同的bucket中,這種方式能進一步的提升系統(tǒng)并發(fā)性能,帶來更好的性能優(yōu)化效果。
七、總結(jié)
通過引入一種新的并發(fā)Change Streams的方式,支持更多類別的MongoDB事件的訂閱,同時在事件監(jiān)聽的性能方面相比原生有較大的提高,可以廣泛應(yīng)用在數(shù)據(jù)庫實例容災(zāi), PITR,數(shù)據(jù)在線遷移業(yè)務(wù)場景中,為客戶帶來更好的體驗。