Kafka多種跨IDC災(zāi)備方案調(diào)研對比
1.前言
為了盡量減少自然和人為災(zāi)難(如停電、災(zāi)難性軟件故障和網(wǎng)絡(luò)中斷)對業(yè)務(wù)的影響,以及隨著我行基于Kafka的實時業(yè)務(wù)不斷增長,Kafka的重要性日益增長,在我行逐步優(yōu)化跨IDC的Kafka連續(xù)性建設(shè)已經(jīng)成為我們目前亟待解決的問題。
本文就目前已有的災(zāi)備方案在元數(shù)據(jù)同步、數(shù)據(jù)復(fù)制、消費位移同步、災(zāi)備模式等方面進行調(diào)研對比。
2.現(xiàn)有災(zāi)備方案
方案 | 描述 | 使用方 |
MirrorMaker1(簡稱MM1) | 原理是啟動消費者從源集群進行消費,然后發(fā)送到目標集群,功能較簡單 | |
MirrorMaker2(簡稱MM2)或 基于MM2的改進 | 基于Kafka Connect框架實現(xiàn),由LinkedIn工程師貢獻,修復(fù)MM1的局限性,Topic和分區(qū)可自動感知,acl和配置可自動同步,支持雙活,提供offset轉(zhuǎn)換功能 | 360 |
Confluent Replicator | Confluent收費版,與MM2相比,雙活模式更優(yōu)雅,可支持單條消息的修改 | Confluent |
基于Follower的同步機制 | 利用Kafka的副本同步機制創(chuàng)建Fetcher線程同步數(shù)據(jù),需要在原生Kafka上進行二次開發(fā) | 字節(jié)、滴滴 |
uReplicator | 改進MM1,利用分布式的任務(wù)管理框架Apache Helix控制Partition的分配,不需要全部rebalance | Uber |
brooklin | 改進MM1,實現(xiàn)思路和MM2類似,與uReplicator一樣,為了減少rebalance,采用Sticky Assignment控制Partition的分配,除了支持Kafka集群間的復(fù)制,還能作為Azure Event Hubs,AWS Kinesis流式服務(wù)之間的通道,另外還能作為CDC連接器 |
3.各方案的主要設(shè)計點對比分析
3.1 元數(shù)據(jù)同步
元數(shù)據(jù)同步主要是指Topic、Partition、Configuration、ACL的同步,我們需要評估各方案在新增Topic,分區(qū)擴容后、修改Configuration和ACL后能否自動感知,以及評估方案中選擇復(fù)制的Topic是否靈活(比如是否支持白名單、黑名單機制,是否支持正則),目標集群中Topic名稱是否發(fā)生改變(決定是否支持雙向復(fù)制,是否會發(fā)生循環(huán)復(fù)制)。
MM1方案中,選擇復(fù)制的Topic只支持白名單機制(--whitelist或者--include參數(shù)指定),且白名單支持正則寫法,但是當源集群新增Topic后,目標集群的auto.create.topics.enable設(shè)置為true時,才能自動在目標集群創(chuàng)建相同名稱的Topic(可以擴展messagehandler改名),否則必須重啟MirrorMaker才能發(fā)現(xiàn)新增的Topic,關(guān)于目標集群上的Topic的分區(qū)數(shù),MM1是按默認值num.partitions進行配置(其他方案均無該問題),無法和源集群上保持一致,ACL也無法同步。
相比MM1,MM2彌補了上述不足,主要是依賴MirrorSourceConnector里的多個定時任務(wù)實現(xiàn)該功能,更新Topic/Partition、Configuration、ACL的間隔時長分別由三個參數(shù)指定,非常靈活。在MM2中,目前截至3.0.0的版本,支持兩種復(fù)制策略,默認的DefaultReplicationPolicy中目標集群中復(fù)制后Topic名稱發(fā)生變化,前面會加一個源集群的前綴,為了兼容MM1,3.0.0中新增的IdentityReplicationPolicy中目標集群中復(fù)制后Topic名稱不會發(fā)生變化。
Confluent Replicator,根據(jù)官網(wǎng)描述,也同樣具備上述功能,原理和MM2類似,只是檢測更新只由一個參數(shù)確定。Replicator可以定義復(fù)制后Topic的名稱,由參數(shù)topic.rename.format指定,默認值是保持Topic名稱不變。
基于Follower的同步機制的方案,由于網(wǎng)上資料不足,具體實現(xiàn)無法得知,但是原理估計和MM2類似,復(fù)制后在目標集群中Topic名稱保持不變。
uReplicator的實現(xiàn)略有不同,復(fù)制哪些Topic,由參數(shù)enableAutoWhitelist和patternToExcludeTopics一起決定,當enableAutoWhitelist設(shè)置為true時,若源集群和目標集群中存在相同Topic,那么不需要其他設(shè)置即可實現(xiàn)數(shù)據(jù)復(fù)制,若設(shè)置為false,需要將復(fù)制的Topic名稱等信息提交給uReplicator Controller,由該Controller來控制分區(qū)的分配,另外黑名單參數(shù)patternToExcludeTopics控制哪些Topic不用復(fù)制;分區(qū)擴容是否自動感知,是由參數(shù)enableAutoTopicExpansion控制的;關(guān)于Configuration和ACL無法實現(xiàn)同步。
brooklin選擇復(fù)制的Topic只支持白名單機制,可支持正則,新增Topic和分區(qū)擴容后可自動感知,檢測更新由參數(shù)partitionFetchIntervalMs確定,復(fù)制后Topic名稱前可加前綴,由參數(shù)DESTINATION_TOPIC_PFEFIX確定。
總結(jié)如下:
方案 | MM1 | MM2 | Confluent Replicator | 基于Follower的同步機制 | uReplicator | brooklin |
復(fù)制后Topic名稱變化 | 不變,也可自定義 | 可保持不變,也可以增加固定前綴 | 可保持不變,也可以自定義 | 不變 | 不變 | 可保持不變,也可定義前綴 |
自動檢測和復(fù)制新Topic | 部分支持(取決于目標集群的自動創(chuàng)建topic是否開啟) | 支持 | 支持 | 取決于二次開發(fā)的功能 | 不支持 | 支持 |
自動檢測和復(fù)制新分區(qū) | 不支持 | 支持 | 支持 | 取決于二次開發(fā)的功能 | 支持 | 支持 |
源集群和目標集群總Topic配置一致 | 不支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
配置和ACL更新是否同步 | 不支持 | 支持 | 支持 | 取決于二次開發(fā)的功能 | 不支持 | 不支持 |
選擇復(fù)制Topic的靈活度:是否具有白名單、黑名單和正則表達式的主題 | 部分支持 | 支持 | 支持 | 取決于二次開發(fā)的功能 | 部分支持 | 部分支持 |
3.2 數(shù)據(jù)復(fù)制
數(shù)據(jù)復(fù)制是災(zāi)備方案的最核心點之一,我們需要評估各方案中復(fù)制后消息offset能否對齊,復(fù)制期間數(shù)據(jù)的一致性能否保證即是否會丟失數(shù)據(jù)或者會出現(xiàn)重復(fù)數(shù)據(jù)。首先說明一下,由于復(fù)制會有延遲,因此所有這些災(zāi)備方案里RPO都不等于0。
基于Follower的同步機制的方案可以保持offset對齊,由于副本同步存在延遲,當主機房異常時,備機房上仍有丟失部分數(shù)據(jù)的可能性,offset可保持一致,不會出現(xiàn)重復(fù)數(shù)據(jù)的可能性。其他方案均不能保證offset對齊(除非是復(fù)制時源Topic的offset從0開始),關(guān)于每個方案中消費者從源集群消費,再寫入到目標集群的邏輯,我們一一詳細解釋下:
先從MM1開始,這是他的設(shè)計架構(gòu):
在KIP-3 MirrorMaker Enhancement里,設(shè)計了上述架構(gòu),從以下幾處保證不丟數(shù):
1.關(guān)掉消費者的自動提交位移,提交位移之前會調(diào)用producer.flush()刷出緩存里數(shù)據(jù)
2.在producer端,通過設(shè)置這幾個參數(shù)max.in.flight.requests.per.connection=1(多個consumer共享一個producer,這個producer每次只給broker發(fā)一個request),retries=Int.MaxValue(返回是可重試異常,無限次重試直到緩沖區(qū)滿),ack=-1(發(fā)給所有副本)
3.設(shè)置abortOnSendFail,當producer端收到不可重試異常后(比如消息過大之類的異常),停止MirrorMaker進程,否則會丟失發(fā)送失敗的部分數(shù)據(jù)
另外為了避免在consumer發(fā)生rebalance的是時候出現(xiàn)重復(fù)數(shù)據(jù)(rebalance時候有些數(shù)據(jù)位移沒提交),定義了一個新的consumerRebalance監(jiān)聽器,在發(fā)生partitionRevoke的時候,先刷出producer緩存里數(shù)據(jù),再提交位移。
從上面設(shè)計來看,MM1是不丟數(shù),但是還是存在數(shù)據(jù)重復(fù)的可能性,這是Kafka的非冪等Producer決定的,另外MM1的設(shè)計還有很多缺陷,比如只有一個Producer,發(fā)送效率低,另外這個Producer是輪詢發(fā)送,消息發(fā)送到目的Topic上的分區(qū)和源Topic的分區(qū)不一定一致,由于是輪詢,這個Producer和集群里每個broker會建立連接。對比uReplicator,同樣也是在flush之后再提交位移去避免丟數(shù),在MM1的缺陷都得到了改進,每個WorkerInstance里有多個FetcherThread和多個ProducerThread,從源集群fetch數(shù)據(jù)后會放到一個隊列里,ProducerThread從隊列里取走數(shù)據(jù)并發(fā)到目標集群的Topic,每條消息發(fā)送到目的Topic上分區(qū)和源分區(qū)保持一致,可以保持語義上一致。
在brooklin中,每個Brooklin Instance中可以起多個Consumer和Producer,也可保持語義上一致,比uReplicator更有優(yōu)勢的一處就是提供了flushless的生產(chǎn)者(也可提供flush的Producer),哪些消息發(fā)送成功,才會提交這些位移,因為調(diào)用Producer.flush()可以將緩沖區(qū)的數(shù)據(jù)強制發(fā)送,但是代價較高,在清空緩沖前會堵塞發(fā)送線程。
consumer.poll()->producer.send(records)->producer.flush()->consumer.commit()
優(yōu)化為:
consumer.poll()->producer.send(records)->consumer.commit(offsets)
在MirrorMaker2中,采用Kafka Connect框架進行復(fù)制數(shù)據(jù),從源端消費數(shù)據(jù)后,存到一個類型為IdentityHashMap的內(nèi)存結(jié)構(gòu)outstandingMessages中,Producer發(fā)送到目的端成功后,會從該內(nèi)存結(jié)構(gòu)中刪除該消息,另外會定時將從源端消費的進度保存到Kafka Topic中。這種實現(xiàn)機制不會丟失數(shù)據(jù),但是Producer發(fā)送成功后,未將進度持久化前進程異常掛掉,那么會產(chǎn)生重復(fù)消息。目前在KIP-656: MirrorMaker2 Exactly-once Semantics提出了一種可實現(xiàn)Exactly Only Once的方案,思路是將提交消費位移和發(fā)送消息放在一個事務(wù)里,但是相關(guān)Patch KAFKA-10339仍然沒被合進主分支,最后更新停留在20年8月份。
根據(jù)Confluent Replicator官網(wǎng)描述,復(fù)制不會丟數(shù),但是可能會重復(fù),因此和上述MM2、uReplicator、brooklin一樣,提供的都是At least Once Delivery消息傳遞語義。
方案 | MM1 | MM2 | Confluent Replicator | 基于Follower的同步機制 | uReplicator | brooklin |
復(fù)制前后分區(qū)語義一致 | 不支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
offset對齊 | 不能 | 不支持 | 不支持 | 支持 | 不支持 | 不支持 |
消息傳遞語義 | 不丟數(shù),可能重復(fù) At least Once | 不丟數(shù),可能重復(fù) At least Once, 未來會提供EOS語義 | 不丟數(shù),可能重復(fù) At least Once | 取決于二次開發(fā)的功能, 從Kafka副本同步的原理看, 在參數(shù)設(shè)置合理的情況下,在副本之間同步過程中數(shù)據(jù)可保持一致 | 不丟數(shù),可能重復(fù) At least Once | 不丟數(shù),可能重復(fù) At least Once |
3.3 消費位移同步
災(zāi)備方案中除數(shù)據(jù)復(fù)制,消費位移的同步也非常關(guān)鍵,災(zāi)備切換后消費者是否能在新的集群中恢復(fù)消費,取決于consumer offset是否能同步。
在MM1設(shè)計中,若要同步消費位移,只能將__consumer_offsets作為一個普通的Topic進行同步,但是由于源集群和目標集群的offset可能存在不對齊的情況,因此無法進行offset轉(zhuǎn)換。
在MM2設(shè)計中,解決了上述MM1問題,設(shè)計思路是會定期在目標集群的checkpoint Topic中記錄消費位移,包括源端和目標端的已提交位移,消息包括如下字段:
- consumer group id (String) 消費組
- topic (String) – includes source cluster prefix topic名稱
- partition (int) 分區(qū)名稱
- upstream offset (int): latest committed offset in source cluster 源集群的消費位移
- downstream offset (int): latest committed offset translated to target cluster 目標集群的消費位移
- metadata (String) partition元數(shù)據(jù)
- timestamp
另外,還設(shè)計了一個offset sync Topic用于記錄源端和目的端offset的映射。
同時,MM2還提供了MirrorClient接口做位移轉(zhuǎn)換:
// Find the local offsets corresponding to the latest checkpoint from a specific upstream consumer group.
Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
``String remoteClusterAlias, Duration timeout)
在uReplicator中,另外設(shè)計了一個offset Sync的服務(wù),跟MM2類似(可能是MM2參考了uReplicator的設(shè)計),這個服務(wù)可以實時收集不同集群offset 的映射關(guān)系,計算出從一個DC切換到另一個DC后需要從哪個 offset 進行讀取。
在brooklin中,沒有類似uReplicator里的offset Sync服務(wù),需要自己實現(xiàn)。
在Confluent Replicator中,用另外一種思路解決該問題,不同DC的時間是一致的,在Kafka的消息里包含時間戳,5.0 版引入了一項新功能,該功能使用時間戳自動轉(zhuǎn)換偏移量,以便消費者可以故障轉(zhuǎn)移到不同的數(shù)據(jù)中心并開始在目標集群中消費他們在源集群中中斷的數(shù)據(jù)。要使用此功能,需要在Consumer中設(shè)置Consumer Timestamps Interceptor 的攔截器,該攔截器保留消費消息的元數(shù)據(jù),包括:
? Consumer group ID
? Topic name
? Partition
? Committed offset
? Timestamp
此消費者時間戳信息保存在位于源集群中名為 __consumer_timestamps 的 Kafka Topic中。然后Replicator通過以下步驟進行offset轉(zhuǎn)換:
- 從源集群中的 consumer_timestamps 主題中讀取消費者偏移量和時間戳信息,以獲取消費者組的進度
- 將源數(shù)據(jù)中心中的已提交偏移量轉(zhuǎn)換為目標數(shù)據(jù)中心中的相應(yīng)偏移量
- 將轉(zhuǎn)換后的偏移量寫入目標集群中的 __consumer_offsets 主題
那么消費者切換到目標中心的集群后,可繼續(xù)進行消費。
基于Follower的同步機制方案,Topic完全一致,只要將__consumer_offsets也同步,那么消費者故障轉(zhuǎn)移后仍可繼續(xù)消費。
在消費位移同步方面,各方案總結(jié)如下:
方案 | MM1 | MM2 | Confluent Replicator | 基于Follower的同步機制 | uReplicator | brooklin |
復(fù)制消費位移 | 部分支持 | 支持 | 支持 | 支持 | 部分支持 | 部分支持 |
offset轉(zhuǎn)換 | 不支持 | 支持 | 支持 | 不需要 | 支持 | 不支持 |
客戶端切換 | 客戶端自定義 seek offset | 通過接口獲取目標集群 的offset,再seek | 不需要做額外轉(zhuǎn)換, 啟動即可 | 不需要做額外轉(zhuǎn)換, 啟動即可 | 通過sync topic服務(wù) 查看目標集群的offset,再seek | 客戶端自定義 seek offset |
3.4 是否支持雙活
為了提升資源利用率,災(zāi)備模式的選取也是一個重要考量點。
MM1是不支持雙活模式的,兩個集群無法配置為相互復(fù)制(“Active/Active”),主要是因為如果在兩個集群中若存在相同名稱的Topic,無法解決Topic循環(huán)復(fù)制的問題。
MM1這個可能循環(huán)復(fù)制的問題在MM2中解決,解決思路是復(fù)制后的Topic與原Topic名稱不一致,會加上源集群的名稱作為前綴,例如如下示例中,A集群中的topic1在復(fù)制到B集群后,名稱變更為A.topic1。
但是MM2默認的DefaultReplicationPolicy是復(fù)制后Topic名稱改變,對客戶端來說會增加切換代價,可以考慮改成IdentityReplicationPolicy,這種復(fù)制策略只能支持單向復(fù)制,主集群提供業(yè)務(wù)服務(wù),即Active/Standy模式。
在Confluent Replicator 5.0.1中,為了避免循環(huán)復(fù)制,利用了KIP-82 Add Record Headers的特性,在消息的header里加入了消息來源,如果目標集群的集群 ID 與header里的源集群 ID 匹配,并且目標Topic名稱與header的Topic名稱匹配,則 Replicator 不會將消息復(fù)制到目標集群。如下圖所示:
DC-1的m1復(fù)制后DC-2,消息的header里加入了標記,這條消息是從DC-1復(fù)制過來的,那么Replicator不會把DC-2的m1再復(fù)制到DC-1,同理,DC-1的m2也不會復(fù)制到DC-2。因此Confluent Replicator是可以支持Active/Active模式的。
在uReplicator中,通過數(shù)據(jù)的冗余提供Region級別的故障轉(zhuǎn)移,在這種設(shè)計中,每個區(qū)域除部署一套本地Kafka集群,還會部署一套聚合集群,這套聚合集群里存儲了所有區(qū)域的數(shù)據(jù)。
當區(qū)域集群A和B中存在相同Topic,那么匯聚后,在區(qū)域A和B中的消息offset可能不一致,uReplicator設(shè)計了一個offset管理服務(wù),會記錄這個對應(yīng)關(guān)系,示例如下:
這種設(shè)計中,可以支持消費者的Active/Active和Active/Standy模式,前者是每個區(qū)域起一個消費者消費聚合集群的的數(shù)據(jù),只有一個區(qū)域是主區(qū)域,只有主區(qū)域的數(shù)據(jù)可以更新數(shù)據(jù)到后端數(shù)據(jù)庫中,當主區(qū)域故障后,指定新的主區(qū)域,新的主區(qū)域繼續(xù)消費計算,在Active/Standy模式中,所有區(qū)域中只有一個消費者,該區(qū)域故障后,在其他區(qū)域啟動一個消費者,根據(jù)offset管理服務(wù)里記錄的offset對應(yīng)關(guān)系,從每個區(qū)域的區(qū)域集群中找到所有最新的checkpoints,然后根據(jù)該checkpoints在Standy區(qū)域的聚合集群查找最小offset,從Standy區(qū)域的該offset開始消費。
在brooklin中,也可以通過類似uReplicator的設(shè)計利用數(shù)據(jù)的冗余實現(xiàn)Active/Active災(zāi)備模式。
在字節(jié)介紹的災(zāi)備方案中,Producer只能往主集群寫(主備集群中的信息是存儲在配置中心里的,客戶端需要先從配置中心查詢),Producer可以在雙中心部署,但是通過配置中心路由到主集群,Consumer也可在雙中心部署,若采用Active/Standy模式,各自消費本地機房的數(shù)據(jù),但是只有主集群里消費者的消費位移可以生效,在采用Active/Active模式下,消費者只能從主集群進行消費,這兩種模式下,都是將雙中心所有消費者的消費位移采用一個存儲統(tǒng)一存儲。
在災(zāi)備模式方面,各方案總結(jié)如下:
方案 | MM1 | MM2 | Confluent Replicator | 基于Follower的同步機制 | uReplicator | brooklin |
雙集群是否可互相復(fù)制 | 不支持 | 支持 | 支持 | 不支持 | 支持,依靠聚合集群 | 支持,依靠聚合集群 |
Producer Active/Active | 不支持 | 支持 | 支持 | 支持,但是其實只寫入主集群 | 支持 | 支持 |
Consumer Active/Standy | 不支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
Consumer Active/Active | 不支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
4.各方案的主要設(shè)計點總結(jié)
總結(jié)來說,這些方案里歸結(jié)為三類:
1.Kafka社區(qū)的設(shè)計路線方案,從源集群消費,再寫入到目標集群,包含MM1,MM2,uReplicator,brooklin這幾種方案,MM2是參考了uReplicator的設(shè)計,實現(xiàn)方案和brooklin類似,那么在這四種方案中,MM2可以作為優(yōu)先考慮方案。
2.Confluent Replicator的商業(yè)收費方案,也是利用Kafka Connect框架進行消費寫入,在避免Topic循環(huán)復(fù)制和消費位移轉(zhuǎn)換方面做得非常出色,客戶端切換的代價很低。
3.以字節(jié)、滴滴為代表的基于Follower同步機制的方案,這種方案里復(fù)制后的Topic是源Topic的鏡像,客戶端不需要做offset轉(zhuǎn)換,需要改造Kafka代碼,考慮到后續(xù)和原生Kafka代碼的版本融合,技術(shù)要求較高。
目前來說,沒有一個完美的解決方案,各公司可根據(jù)自身實際需求制定。