MQ選型:一文詳解Kafka與RocketMQ區(qū)別
引言
在做MQ技術(shù)選型的時(shí)候,Kafka和RocketMQ是常用的兩個(gè)消息隊(duì)列中間件,今天就從架構(gòu)設(shè)計(jì)、性能分析、使用場(chǎng)景來比較一下兩者的區(qū)別,到底該使用哪個(gè)MQ?
Kafka最初由LinkedIn開發(fā),后來成為Apache的一個(gè)頂級(jí)項(xiàng)目,它設(shè)計(jì)之初就是為處理大規(guī)模數(shù)據(jù)而生,特別擅長于高吞吐量的場(chǎng)景。Kafka廣泛應(yīng)用于日志收集、流式處理、事件驅(qū)動(dòng)架構(gòu)等多種場(chǎng)景,被許多知名企業(yè)采用,如Netflix、Uber和Twitter等。
RocketMQ原為阿里巴巴的內(nèi)部消息中間件,后來同樣成為了Apache的頂級(jí)項(xiàng)目。它在保證消息的高可靠性和順序性方面表現(xiàn)出色,非常適合金融行業(yè)等對(duì)數(shù)據(jù)一致性和可靠性要求極高的場(chǎng)景。除此之外,RocketMQ也支持多種消息傳遞模式,包括順序消息、延時(shí)消息和批量消息,能夠滿足復(fù)雜應(yīng)用場(chǎng)景的需求。
架構(gòu)設(shè)計(jì)
Kafka的基礎(chǔ)架構(gòu)
Apache Kafka 是一個(gè)分布式發(fā)布-訂閱消息系統(tǒng),設(shè)計(jì)之初的目標(biāo)是處理大規(guī)模的數(shù)據(jù)流,并且以高吞吐量和低延遲為特點(diǎn)。Kafka 的架構(gòu)主要由以下幾個(gè)部分組成:
- Producer(生產(chǎn)者):生產(chǎn)者是發(fā)送消息到Broker集群。生產(chǎn)者將消息發(fā)送到指定的主題,Kafka根據(jù)配置的分區(qū)策略(如輪詢、按鍵哈希等)將消息分配到不同的分區(qū)。
- Consumer(消費(fèi)者):消費(fèi)者從Broker讀取消息。消費(fèi)者可以獨(dú)立運(yùn)行或分組在一起運(yùn)行。分組中的消費(fèi)者共享訂閱的主題,Kafka平衡各個(gè)消費(fèi)者的負(fù)載,確保每個(gè)分區(qū)只被組內(nèi)的一個(gè)消費(fèi)者讀取。
- Broker(消息代理服務(wù)器):Broker是Kafka集群中的一個(gè)服務(wù)器,負(fù)責(zé)存儲(chǔ)數(shù)據(jù)和處理對(duì)數(shù)據(jù)的讀寫請(qǐng)求。每個(gè)Broker可以存儲(chǔ)一個(gè)或多個(gè)主題的數(shù)據(jù)。一個(gè)Kafka集群可以包含多個(gè)Broker,以提高容量和提供容錯(cuò)能力。
- Topic(主題): Topic是邏輯概念。生產(chǎn)者寫入消息到指定的Topic,消費(fèi)者從Topic讀取消息。Topic在邏輯上被分割為一個(gè)或多個(gè)分區(qū),這允許數(shù)據(jù)在多個(gè)Broker之間進(jìn)行負(fù)載均衡。
- Partition(分區(qū)): 分區(qū)是Topic的物理分段,每個(gè)分區(qū)是一個(gè)有序的、不可變的消息日志。分區(qū)可以分布在集群中的不同Broker上。每個(gè)分區(qū)都由一系列有序的、不斷增加的消息組成,每條消息都被分配一個(gè)順序的標(biāo)識(shí)符稱為偏移量。
- ZooKeeper:Kafka使用ZooKeeper來維護(hù)集群狀態(tài)、配置信息和進(jìn)行領(lǐng)導(dǎo)者選舉。
圖片
RocketMQ的基礎(chǔ)架構(gòu)
RocketMQ主要由四個(gè)基本組件構(gòu)成:
- NameServer(命名服務(wù)器):NameServer是RocketMQ網(wǎng)絡(luò)中的注冊(cè)中心和路由中心,提供輕量級(jí)服務(wù)發(fā)現(xiàn)和路由功能。每個(gè)Broker啟動(dòng)時(shí)都會(huì)在所有NameServer上注冊(cè)自己的路由信息,包括當(dāng)前Broker的IP地址、提供的Topic等信息。消費(fèi)者和生產(chǎn)者通過查詢NameServer來獲取Topic的路由信息。
- Broker(消息代理服務(wù)器):Broker是消息處理的核心節(jié)點(diǎn),負(fù)責(zé)存儲(chǔ)消息、驗(yàn)證和服務(wù)消息傳輸。RocketMQ支持多個(gè)Broker配置,可以是同步或異步復(fù)制數(shù)據(jù)以確保高可用性。Broker處理大量的數(shù)據(jù)寫入操作,并支持消息的順序和并行處理。
- Producer(生產(chǎn)者):生產(chǎn)者負(fù)責(zé)發(fā)布消息到指定的Topic。RocketMQ支持多種消息發(fā)送模式,包括同步發(fā)送、異步發(fā)送和單向發(fā)送(不等待服務(wù)器響應(yīng))。
- Consumer(消費(fèi)者):消費(fèi)者從Broker訂閱消息并處理它們。RocketMQ支持集群消費(fèi)和廣播消費(fèi)兩種模式。在集群模式下,同一個(gè)Consumer Group中的不同Consumer實(shí)例平均分?jǐn)傁?,而在廣播模式下,每個(gè)Consumer實(shí)例都會(huì)接收到所有的消息。
- Topic和Queue: Topic是消息的分類,每個(gè)Topic可以分為若干個(gè)Queue。RocketMQ通過增加Queue數(shù)量來水平擴(kuò)展Topic的處理能力。
RocketMQ 支持多種消息模式,包括順序消息、定時(shí)/延時(shí)消息和批量消息等。此外,RocketMQ 提供了豐富的消息過濾功能,消費(fèi)者可以根據(jù)Tag或者SQL92標(biāo)準(zhǔn)進(jìn)行消息過濾,極大地增加了其靈活性和應(yīng)用場(chǎng)景。
圖片
Kafka與RocketMQ在設(shè)計(jì)哲學(xué)和優(yōu)化點(diǎn)上有所不同。Kafka更注重于處理高吞吐量的數(shù)據(jù)流,而RocketMQ則提供了更為豐富的消息模式和高級(jí)功能,特別適合需要高可靠性和復(fù)雜消息處理場(chǎng)景的業(yè)務(wù)。
消息存儲(chǔ)機(jī)制
Kafka的日志存儲(chǔ)機(jī)制
**日志文件: **Kafka 所有的消息以日志的形式存儲(chǔ)在磁盤上,并且每個(gè)Partition都是一個(gè)連續(xù)的日志文件。 **追加寫入: **Kafka采用追加寫入的方式存儲(chǔ)消息到日志文件中,新消息被添加到文件的末尾,這種方式對(duì)于磁盤I/O是非常高效的,因?yàn)樗蟛糠质琼樞驅(qū)懭?,從而極大地提高了寫入速度。但是當(dāng)Partition數(shù)量過多時(shí),順序?qū)懢妥兂闪穗S機(jī)寫,性能下降。索引文件:為了快速查找和讀取特定消息,Kafka為每個(gè)日志文件維護(hù)一個(gè)索引文件。索引文件存儲(chǔ)消息在日志文件中的偏移量和其對(duì)應(yīng)在文件中的物理位置,這樣可以在不讀取整個(gè)日志文件的情況下直接跳轉(zhuǎn)到特定的消息。
RocketMQ的存儲(chǔ)設(shè)計(jì)
RocketMQ的存儲(chǔ)系統(tǒng)主要由以下幾部分構(gòu)成:CommitLog
- 統(tǒng)一存儲(chǔ): 所有Topic的消息都存儲(chǔ)在一個(gè)名為CommitLog的文件中,每個(gè)消息都有一個(gè)全局唯一的偏移量。這種設(shè)計(jì)簡化了消息存儲(chǔ)的管理,但也要求高效的索引機(jī)制來支持快速消息查找。
- 順序?qū)懭耄?與Kafka類似,RocketMQ的CommitLog也采用順序?qū)懭氲姆绞?,以提高寫入效率和減少磁盤I/O操作。順序?qū)懭肽茱@著提高消息存儲(chǔ)的性能。
- 定期切割: RocketMQ定期切分CommitLog和消費(fèi)隊(duì)列文件,新的消息寫入到新文件中。老舊文件在滿足一定條件后可以刪除或者歸檔,以釋放存儲(chǔ)空間。
- 刷盤策略: RocketMQ提供了同步刷盤和異步刷盤兩種策略,用戶可以根據(jù)業(yè)務(wù)需求和對(duì)性能的要求選擇合適的刷盤方式。
消費(fèi)隊(duì)列(Consume Queue)
- 索引機(jī)制:為了快速檢索到CommitLog中的消息,RocketMQ為每個(gè)隊(duì)列(Queue)維護(hù)一個(gè)消費(fèi)隊(duì)列(Consume Queue)。消費(fèi)隊(duì)列存儲(chǔ)了消息在CommitLog中的偏移量、消息長度和消息標(biāo)簽的哈希碼等信息。
- 輕量級(jí)設(shè)計(jì):消費(fèi)隊(duì)列相比于CommitLog要小很多,因?yàn)樗鼉H僅存儲(chǔ)索引信息,這使得加載和查找效率更高。
索引文件(Index File)
- 可選的索引服務(wù):RocketMQ提供了一個(gè)獨(dú)立的索引服務(wù),用于快速檢索具有特定鍵(如ID、Key或是業(yè)務(wù)屬性)的消息。索引文件存儲(chǔ)了鍵到消息物理位置的映射。
- 快速查詢:索引文件加速了基于鍵的消息查詢操作,使得RocketMQ能在大數(shù)據(jù)量中快速定位消息。
文件回收與存儲(chǔ)清理RocketMQ通過定期清理舊的CommitLog文件和消費(fèi)隊(duì)列文件來回收磁盤空間,這些操作基于消息的存儲(chǔ)時(shí)間和消費(fèi)狀態(tài)。定期刪除:系統(tǒng)根據(jù)配置的文件保留策略(如時(shí)間間隔、文件大?。┳詣?dòng)刪除舊文件。數(shù)據(jù)壓縮:在必要時(shí),RocketMQ可以對(duì)存儲(chǔ)的數(shù)據(jù)進(jìn)行壓縮,以節(jié)省存儲(chǔ)空間。
高可用設(shè)計(jì)
Kafka高可用
副本機(jī)制: Kafka通過副本(replicas)機(jī)制確保數(shù)據(jù)的安全性。每個(gè)Topic可以被配置為一個(gè)或多個(gè)分區(qū)(partitions),每個(gè)分區(qū)可以有一個(gè)或多個(gè)副本。副本分布在不同的Broker上,這樣即使一個(gè)或多個(gè)Broker發(fā)生故障,Topic的數(shù)據(jù)也不會(huì)丟失。
領(lǐng)導(dǎo)者和追隨者: 每個(gè)分區(qū)有一個(gè)領(lǐng)導(dǎo)者(leader)和多個(gè)追隨者(followers)。所有的讀寫請(qǐng)求都由領(lǐng)導(dǎo)者處理,而追隨者則從領(lǐng)導(dǎo)者那里復(fù)制數(shù)據(jù)。如果領(lǐng)導(dǎo)者發(fā)生故障,系統(tǒng)會(huì)從追隨者中選舉出新的領(lǐng)導(dǎo)者。
控制器(Controller): 控制器是一個(gè)特殊的Broker節(jié)點(diǎn),負(fù)責(zé)維護(hù)領(lǐng)導(dǎo)者的選舉和副本狀態(tài)的管理。如果控制器出現(xiàn)故障,集群中的其他Broker將通過選舉產(chǎn)生新的控制器。
ZooKeeper協(xié)調(diào): Kafka使用ZooKeeper來管理集群元數(shù)據(jù)和進(jìn)行Broker之間的協(xié)調(diào),包括領(lǐng)導(dǎo)者選舉和集群成員管理。
高水位標(biāo)記(high watermark): Kafka為每個(gè)分區(qū)維護(hù)一個(gè)“高水位”(high watermark)標(biāo)記,這是所有同步副本已確認(rèn)寫入的最小偏移量。只有高于高水位的消息才被認(rèn)為是“提交”的,消費(fèi)者只能讀取到這些已提交的消息。這保證了即使在發(fā)生故障的情況下,消費(fèi)者也不會(huì)讀取到可能因故障而回滾的消息。
RocketMQ高可用
主從架構(gòu): 在Broker級(jí)別,RocketMQ采用主從架構(gòu),其中主Broker負(fù)責(zé)處理讀寫請(qǐng)求,而從Broker則負(fù)責(zé)復(fù)制主Broker的數(shù)據(jù)。如果主Broker宕機(jī),從Broker可以迅速升級(jí)為新的主Broker,接管消息服務(wù)。
NameServer的高可用: RocketMQ使用NameServer管理元數(shù)據(jù)和路由信息,NameServer采用了無狀態(tài)設(shè)計(jì),之間互不備份,每個(gè)NameServer獨(dú)立提供服務(wù)。即使部分NameServer出現(xiàn)故障,其他NameServer仍能繼續(xù)提供服務(wù)。
同步復(fù)制與異步復(fù)制: RocketMQ支持同步和異步兩種數(shù)據(jù)復(fù)制方式。在同步復(fù)制模式下,生產(chǎn)者發(fā)送的消息必須被存儲(chǔ)在所有同步副本中Broker確認(rèn)后才返回成功響應(yīng),確保了數(shù)據(jù)的強(qiáng)一致性。異步復(fù)制則強(qiáng)調(diào)高吞吐量,犧牲了一部分?jǐn)?shù)據(jù)安全性。
主要區(qū)別:
- 元數(shù)據(jù)管理:
Kafka強(qiáng)依賴Zookeeper進(jìn)行集群管理和元數(shù)據(jù)存儲(chǔ),而RocketMQ則依賴輕量級(jí)的NameServer進(jìn)行路由信息管理,不涉及集群狀態(tài)管理。
- 數(shù)據(jù)復(fù)制與故障恢復(fù):
Kafka是基于分區(qū),側(cè)重于通過領(lǐng)導(dǎo)者和追隨者模式實(shí)現(xiàn)數(shù)據(jù)復(fù)制,依賴自動(dòng)的領(lǐng)導(dǎo)者選舉來恢復(fù)服務(wù)。RocketMQ是基于Broker,提供了主從同步或異步復(fù)制,通常需要更多手動(dòng)干預(yù)來切換Master。
- 架構(gòu)和擴(kuò)展性:
Kafka的架構(gòu)設(shè)計(jì)為分布式系統(tǒng)帶來了強(qiáng)大的水平擴(kuò)展性,適合處理大規(guī)模數(shù)據(jù)流。RocketMQ通過主從復(fù)制機(jī)制提供高可靠性,適用于交易等對(duì)數(shù)據(jù)一致性要求極高的場(chǎng)景。
消息可靠性保證
Kafka消息可靠性
- 復(fù)制(Replication)
Kafka通過在多個(gè)Broker中復(fù)制每個(gè)Topic的Partition來增加數(shù)據(jù)的可靠性和系統(tǒng)的容錯(cuò)性。這意味著每個(gè)Partition都有一個(gè)Leader和多個(gè)Follower。所有的寫操作都通過Leader進(jìn)行,而Follower從Leader同步數(shù)據(jù)。如果Leader失敗,一個(gè)Follower將被自動(dòng)選舉為新的Leader,確保服務(wù)的連續(xù)性和數(shù)據(jù)的可用性。
- 確認(rèn)機(jī)制(Acknowledgments)
生產(chǎn)者在發(fā)送消息時(shí)可以指定不同級(jí)別的確認(rèn)機(jī)制來保證消息的可靠傳遞:
acks=0:生產(chǎn)者在寫入消息后不會(huì)等待任何服務(wù)器的確認(rèn),這種模式下消息可能會(huì)丟失,但延遲最低。 acks=1:生產(chǎn)者會(huì)等待Leader確認(rèn)消息已被寫入本地日志后才考慮完成請(qǐng)求。這種模式下,如果在Follower復(fù)制之前Leader發(fā)生故障,消息可能會(huì)丟失。 acks=all 或 acks=-1:生產(chǎn)者會(huì)等待所有同步副本都確認(rèn)消息已被接收,才認(rèn)為消息發(fā)送成功。這提供了最高的數(shù)據(jù)可靠性保證。
- 事務(wù)支持
從0.11版本開始,Kafka引入了事務(wù)API,支持跨多個(gè)Partition的原子寫操作。這意味著生產(chǎn)者可以發(fā)送一批消息,這些消息要么全部成功寫入,要么全部失敗,從而防止了在處理復(fù)雜業(yè)務(wù)邏輯時(shí)出現(xiàn)部分更新的情況。
- 持久化
Kafka默認(rèn)將所有消息持久化到磁盤,這不僅確保了數(shù)據(jù)在系統(tǒng)重啟后的可恢復(fù)性,還能保護(hù)數(shù)據(jù)不受系統(tǒng)故障的影響。Kafka通過順序?qū)懘疟P的方式優(yōu)化了I/O性能,即使是在高負(fù)載下也能保持高吞吐量。
- 高水位(High Watermark)
Kafka為每個(gè)Partition維護(hù)一個(gè)高水位標(biāo)記,這表示所有同步副本都確認(rèn)接收到的最小偏移量。消費(fèi)者只能讀取到高水位之前的消息,這保證了消費(fèi)者只看到已經(jīng)被所有同步副本確認(rèn)的消息,增加了讀操作的一致性。
RocketMQ消息可靠性
- 復(fù)制
同步雙寫:在默認(rèn)設(shè)置下,消息被同時(shí)寫入到主Broker(Master)和從Broker(Slave)。只有當(dāng)主從Broker都成功寫入消息后,生產(chǎn)者才會(huì)收到一個(gè)成功的響應(yīng)。這確保了即使主Broker發(fā)生故障,消息也不會(huì)丟失,因?yàn)樗呀?jīng)存在于至少一個(gè)從Broker中。異步復(fù)制:除了同步雙寫,RocketMQ還支持異步復(fù)制模式,這在提高性能和吞吐量方面更為有效,尤其是在對(duì)延遲要求不是特別嚴(yán)格的場(chǎng)景下。
- 確認(rèn)機(jī)制(Acknowledgments)
消息確認(rèn):RocketMQ 支持端到端的消息確認(rèn)機(jī)制。生產(chǎn)者在發(fā)送消息后會(huì)等待Broker的確認(rèn),只有收到確認(rèn)后才認(rèn)為消息發(fā)送成功。重試機(jī)制:如果消息在傳輸過程中失?。ɡ?,因?yàn)榫W(wǎng)絡(luò)問題或Broker處理能力達(dá)到瓶頸),RocketMQ提供了自動(dòng)重試機(jī)制。生產(chǎn)者可以配置消息的重試次數(shù)和重試間隔,以增加消息傳遞成功的概率。
- 事務(wù)消息
RocketMQ 支持事務(wù)消息,采用半消息機(jī)制(half message),允許生產(chǎn)者在發(fā)送消息的同時(shí)執(zhí)行本地事務(wù),然后根據(jù)本地事務(wù)執(zhí)行的結(jié)果來提交或回滾消息。
延遲消息
- Kafka不支持延遲消息
- RocketMQ提供了多個(gè)預(yù)定義級(jí)別的延遲消息。
可選級(jí)別有:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
示例代碼:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class DelayedMessageProducer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建生產(chǎn)者,并指定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
// 設(shè)置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 啟動(dòng)生產(chǎn)者
producer.start();
// 創(chuàng)建消息實(shí)例,指定Topic、Tag和消息體
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ, I will be sent after 30 seconds".getBytes());
// 設(shè)置延遲級(jí)別為4,即30秒
message.setDelayTimeLevel(4);
// 發(fā)送消息
producer.send(message);
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
順序消息
- Kafka支持單分區(qū)有序
- RocketMQ 提供兩種類型的順序消息:
全局順序消息:全局順序消息確保消息全局嚴(yán)格按照發(fā)送順序被消費(fèi)。實(shí)現(xiàn)方式是將所有消息路由到同一個(gè)隊(duì)列(Queue)中。
分區(qū)順序消息:分區(qū)順序消息確保同一分區(qū)內(nèi)的消息嚴(yán)格按照發(fā)送順序被消費(fèi)。每個(gè)主題可以有多個(gè)隊(duì)列,每個(gè)隊(duì)列保證隊(duì)列內(nèi)消息的順序性。
消息過濾
- Kafka不支持在Broker層面進(jìn)行消息過濾
- RocketMQ在Broker層面提供了兩種消息過濾機(jī)制,分別是標(biāo)簽過濾和SQL表達(dá)式過濾。
1. 標(biāo)簽過濾(Tag Filtering)這是RocketMQ最基本也是最常用的消息過濾方式。生產(chǎn)者在發(fā)送消息時(shí)可以指定一個(gè)標(biāo)簽(Tag),消費(fèi)者在訂閱消息時(shí)可以指定感興趣的標(biāo)簽,Broker僅將符合標(biāo)簽的消息推送給消費(fèi)者。 發(fā)送消息時(shí)指定標(biāo)簽:
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes());
消費(fèi)者訂閱指定標(biāo)簽:
consumer.subscribe("TopicTest", "TagA || TagB");
2. SQL表達(dá)式過濾(SQL92 Filtering)RocketMQ 4.4.0 版本以上支持基于 SQL92 的表達(dá)式進(jìn)行消息過濾,這提供了更為強(qiáng)大和靈活的消息選擇能力。生產(chǎn)者在發(fā)送消息時(shí)可以設(shè)置消息屬性,消費(fèi)者可以通過 SQL92 表達(dá)式對(duì)這些屬性進(jìn)行篩選。 發(fā)送消息時(shí)設(shè)置屬性:
Message msg = new Message("TopicTest", "TagA", "OrderID002", "Hello world".getBytes());
msg.putUserProperty("a", String.valueOf(10));
消費(fèi)者使用 SQL 表達(dá)式訂閱:
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5 AND b <> 'abc'"));
對(duì)于使用SQL表達(dá)式過濾,RocketMQ需要配置Broker啟用此功能。在broker.conf中設(shè)置:
enablePropertyFilter=true
消息重試
Kafka消息重試機(jī)制
Kafka支持生產(chǎn)者發(fā)送消息失敗的時(shí)候自動(dòng)重試,不支持消費(fèi)者消費(fèi)消息失敗時(shí)重試。生產(chǎn)者重試配置:
- retries:
這個(gè)參數(shù)設(shè)置了生產(chǎn)者在發(fā)送消息時(shí)可以重試的次數(shù)。默認(rèn)值通常是 0,表示不進(jìn)行重試。如果設(shè)置為大于0的值,生產(chǎn)者將在發(fā)送失敗后嘗試重新發(fā)送消息指定的次數(shù)。
- retry.backoff.ms:
這個(gè)參數(shù)用來設(shè)置每次重試之間的時(shí)間間隔(以毫秒為單位)。這可以避免在出現(xiàn)暫時(shí)性問題時(shí)過于頻繁地重試,給系統(tǒng)帶來不必要的負(fù)擔(dān)。默認(rèn)值通常是 100ms。
- max.in.flight.requests.per.connection:
此參數(shù)定義了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送的最大請(qǐng)求數(shù)。如果設(shè)置為1,這將保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的,即使進(jìn)行了重試。如果大于1,則在高吞吐量的情況下可以提高性能,但可能會(huì)導(dǎo)致重試后消息順序的改變。
生產(chǎn)者重試代碼示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 啟用重試,重試3次
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 設(shè)置重試的時(shí)間間隔為300ms
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保持消息順序
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record, (RecordMetadata metadata, Exception exception) -> {
if (exception != null) {
exception.printStackTrace(); // 處理發(fā)送異常
} else {
System.out.println("Message sent successfully: " + metadata);
}
});
producer.close();
}
}
注意事項(xiàng)
- 冪等性:從Kafka 0.11版本開始,支持冪等性生產(chǎn)者,通過配置enable.idempotence=true,可以保證即使進(jìn)行重試,消息也不會(huì)在Kafka中重復(fù)。
- 錯(cuò)誤處理:應(yīng)用程序應(yīng)適當(dāng)處理重試后仍然失敗的情況,比如記錄日志、發(fā)送告警等。
RocketMQ消息重試機(jī)制
RocketMQ 既支持生產(chǎn)者發(fā)送消息失敗的時(shí)候自動(dòng)重試,也支持消費(fèi)者消費(fèi)消息失敗時(shí)重試。RocketMQ 生產(chǎn)者發(fā)送失敗重試機(jī)制:默認(rèn)重試策略
- 重試次數(shù): RocketMQ 生產(chǎn)者默認(rèn)會(huì)在消息發(fā)送失敗時(shí)自動(dòng)重試,通常默認(rèn)重試次數(shù)為 2 次(總共發(fā)送 3 次:首次發(fā)送加上兩次重試)。
- 重試間隔: 默認(rèn)的重試間隔時(shí)間是 3000 毫秒(3 秒),即在初次發(fā)送失敗后,會(huì)在 3 秒后進(jìn)行第一次重試。
可以根據(jù)實(shí)際需要配置生產(chǎn)者的重試次數(shù)和重試間隔。這通常在創(chuàng)建生產(chǎn)者實(shí)例時(shí)進(jìn)行設(shè)置:
public static void main(String[] args) throws MQClientException {
// 創(chuàng)建消息生產(chǎn)者,指定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
// 設(shè)置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 設(shè)置重試次數(shù),默認(rèn)2次,設(shè)置為5次
producer.setRetryTimesWhenSendFailed(5);
// 設(shè)置消息發(fā)送超時(shí)時(shí)間,超過這個(gè)時(shí)間未發(fā)送成功則不再重試,默認(rèn)為3000ms
producer.setSendMsgTimeout(4000);
// 啟動(dòng)生產(chǎn)者
producer.start();
// 創(chuàng)建消息
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發(fā)送消息
try {
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
RocketMQ 消費(fèi)者消費(fèi)失敗重試機(jī)制:當(dāng)消費(fèi)者從隊(duì)列中拉取到消息并且在處理過程中失?。ㄍǔJ菢I(yè)務(wù)邏輯拋出異常或者返回了錯(cuò)誤狀態(tài)),RocketMQ 提供了自動(dòng)的消息重試機(jī)制。這意味著消息不會(huì)被立即標(biāo)記為“已消費(fèi)”,而是會(huì)重新被放入隊(duì)列,稍后再次投遞給消費(fèi)者。
- 重試延遲:
RocketMQ 為消費(fèi)失敗的消息設(shè)置了一系列遞增的延遲時(shí)間等級(jí),例如,首次重試可能延遲 10 秒,隨后 30 秒、1 分鐘、2 分鐘等。 默認(rèn)情況下,RocketMQ 提供了 16 級(jí)延遲時(shí)間,最長可以延遲兩個(gè)小時(shí)。
- 重試次數(shù):
如果消息連續(xù)多次重試仍然失敗,當(dāng)重試次數(shù)達(dá)到上限后(默認(rèn)是 16 次),消息將不再進(jìn)入重試隊(duì)列。 這些“死信消息”會(huì)被轉(zhuǎn)移到一個(gè)特殊的死信隊(duì)列(DLQ,Dead-Letter Queue),開發(fā)者可以對(duì)這些消息進(jìn)行特殊處理。非順序消息重試間隔如下:
第幾次重試 | 與上次重試的間隔時(shí)間 |
1 | 10秒 |
2 | 30秒 |
3 | 1分鐘 |
4 | 2分鐘 |
5 | 3分鐘 |
6 | 4分鐘 |
7 | 5分鐘 |
8 | 6分鐘 |
9 | 7分鐘 |
10 | 8分鐘 |
11 | 9分鐘 |
12 | 10分鐘 |
13 | 20分鐘 |
14 | 30分鐘 |
15 | 1小時(shí) |
16 | 2小時(shí) |
消費(fèi)者重試代碼示例:
public class ConsumerDemo {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody(), "UTF-8");
System.out.println("Receive message: " + body);
// 業(yè)務(wù)邏輯處理
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
// 告訴MQ這條消息處理失敗,需要稍后重新消費(fèi)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
消息回溯
消息回溯指的是消費(fèi)者重新消費(fèi)已經(jīng)消費(fèi)過的消息。這個(gè)功能對(duì)于處理消費(fèi)失敗、消息處理錯(cuò)誤或者需要重新處理數(shù)據(jù)的場(chǎng)景非常有用。 Kafka和RocketMQ都支持以下兩種消息回溯:
- 基于偏移量回溯:
消費(fèi)者可以直接指定隊(duì)列的偏移量來回溯消息。這種方式需要消費(fèi)者知道具體的偏移量。
- 基于時(shí)間戳回溯:
消費(fèi)者根據(jù)時(shí)間戳來重置消費(fèi)進(jìn)度。這種方式適用于希望從某一特定時(shí)間點(diǎn)重新開始消費(fèi)消息的場(chǎng)景。
事務(wù)消息
Kafka事務(wù)消息
Kafka 0.11 版本開始支持事務(wù)消息,允許生產(chǎn)者在單個(gè)或多個(gè)分區(qū)中原子地寫入消息。通過事務(wù)消息,生產(chǎn)者可以確保消息要么全被發(fā)送,要么全不發(fā)送,從而避免了在失敗時(shí)消息部分發(fā)送的問題。
Kafka 事務(wù)消息的關(guān)鍵概念
- 事務(wù)ID:每個(gè)事務(wù)生產(chǎn)者都被分配一個(gè)唯一的事務(wù)ID。這個(gè)ID用來標(biāo)識(shí)生產(chǎn)者的事務(wù)狀態(tài),并保證即使在發(fā)生故障后,也能恢復(fù)并繼續(xù)處理事務(wù)。
- 事務(wù)協(xié)調(diào)器:Kafka集群中的每個(gè)事務(wù)生產(chǎn)者都有一個(gè)事務(wù)協(xié)調(diào)器(Transaction Coordinator)與之對(duì)應(yīng)。協(xié)調(diào)器負(fù)責(zé)管理所有與其事務(wù)ID相關(guān)的事務(wù)狀態(tài)。
- 生產(chǎn)者冪等性:事務(wù)生產(chǎn)者在 Kafka 中自動(dòng)啟用冪等性。冪等性保證了即使生產(chǎn)者發(fā)送相同消息多次,消息也只會(huì)被寫入一次。
如何使用 Kafka 事務(wù)消息
- 配置事務(wù)生產(chǎn)者:啟用事務(wù)需要在生產(chǎn)者配置中設(shè)置 transactional.id 和開啟冪等性。
- 初始化事務(wù):通過調(diào)用 initTransactions() 方法初始化事務(wù)環(huán)境。
- 開始事務(wù):通過調(diào)用 beginTransaction() 開始一個(gè)新的事務(wù)。
- 發(fā)送消息:在事務(wù)中正常發(fā)送消息。
- 提交或中止事務(wù):根據(jù)業(yè)務(wù)邏輯處理結(jié)果,調(diào)用 commitTransaction() 或 abortTransaction() 來提交或中止事務(wù)。
示例代碼
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaTransactionalProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1"); // 指定事務(wù)ID
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 啟用冪等性
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 1. 初始化事務(wù)
producer.initTransactions();
try {
// 2. 開啟事務(wù)
producer.beginTransaction();
for (int i = 0; i < 100; i++) {
// 3. 同時(shí)發(fā)送到多個(gè)分區(qū)
producer.send(new ProducerRecord<>("your-topic", Integer.toString(i), "test message - " + i));
}
// 4. 提交事務(wù)
producer.commitTransaction();
} catch (Exception e) {
// 5. 中止事務(wù)
producer.abortTransaction();
} finally {
producer.close();
}
}
}
注意事項(xiàng)
- 事務(wù)超時(shí):事務(wù)生產(chǎn)者必須在配置的事務(wù)超時(shí)時(shí)間內(nèi)完成事務(wù),否則 Kafka 會(huì)認(rèn)為事務(wù)失敗并自動(dòng)中止它。
- 單一生產(chǎn)者規(guī)則:事務(wù)ID 應(yīng)唯一對(duì)應(yīng)單一生產(chǎn)者實(shí)例,以避免并發(fā)沖突和潛在的數(shù)據(jù)不一致問題。
- 事務(wù)與消費(fèi)者:確保消費(fèi)者正確處理事務(wù)消息,例如使用 read_committed 配置來只消費(fèi)已提交的消息。
RocketMQ 事務(wù)消息
RocketMQ采用半消息機(jī)制,實(shí)現(xiàn)了事務(wù)消息,就是把本地事務(wù)和生產(chǎn)者發(fā)送消息放在一個(gè)事務(wù)中。RocketMQ 事務(wù)消息工作原理
- 半消息(Half Message): 事務(wù)消息首先被發(fā)送為“半消息”,這意味著消息被Broker接收但對(duì)消費(fèi)者不可見。
- 執(zhí)行本地事務(wù): 一旦半消息被成功發(fā)送,生產(chǎn)者客戶端將執(zhí)行本地事務(wù)邏輯(如數(shù)據(jù)庫操作)。
- 提交或回滾: 根據(jù)本地事務(wù)執(zhí)行的結(jié)果,生產(chǎn)者決定是提交還是回滾事務(wù)消息:
如果本地事務(wù)成功,生產(chǎn)者發(fā)送提交消息指令給Broker,使得半消息對(duì)消費(fèi)者可見。
如果本地事務(wù)失敗,生產(chǎn)者發(fā)送回滾消息指令給Broker,Broker將刪除半消息。
- 消息狀態(tài)回查: 如果Broker沒有收到最終的提交或回滾指令(可能由于生產(chǎn)者崩潰等原因),Broker將向生產(chǎn)者查詢?cè)摪胂?duì)應(yīng)的本地事務(wù)狀態(tài)。
示例代碼
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
// 創(chuàng)建事務(wù)生產(chǎn)者,指定生產(chǎn)者組名
TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
// 設(shè)置事務(wù)監(jiān)聽器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務(wù)邏輯
// 返回事務(wù)狀態(tài)
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 提供事務(wù)狀態(tài)的檢查邏輯
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
// 創(chuàng)建消息
Message msg = new Message("TopicTest", "TagA", "Key", "Transaction Message".getBytes());
// 發(fā)送事務(wù)消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
集群擴(kuò)展
Kafka 集群擴(kuò)展
- 統(tǒng)一的 Broker 角色:
Kafka 的 Broker 同時(shí)負(fù)責(zé)消息存儲(chǔ)、處理和路由信息的管理。每個(gè) Broker 都可以處理客戶端的請(qǐng)求,存儲(chǔ)消息數(shù)據(jù),以及處理其他 Broker 的數(shù)據(jù)復(fù)制請(qǐng)求。
- 復(fù)制機(jī)制:
Kafka 采用分區(qū)和副本的方式進(jìn)行數(shù)據(jù)復(fù)制。每個(gè)主題被分為多個(gè)分區(qū),每個(gè)分區(qū)可以有一個(gè)或多個(gè)副本,復(fù)制策略可以配置為同步或異步。
- 擴(kuò)展性:
擴(kuò)展 Kafka 集群主要是通過增加更多的 Broker 來完成。新的 Broker 加入后,可以通過重新平衡分區(qū)來分散數(shù)據(jù)和請(qǐng)求負(fù)載。
- 依賴于 Zookeeper:
Kafka 使用 Zookeeper 進(jìn)行集群管理和協(xié)調(diào),雖然最新版本的 Kafka 正在嘗試去除對(duì) Zookeeper 的依賴。
RocketMQ 集群擴(kuò)展
- 角色區(qū)分:
RocketMQ 明確區(qū)分了 Broker 和 NameServer 的角色。Broker 負(fù)責(zé)消息存儲(chǔ)和傳輸,而 NameServer 提供路由信息和服務(wù)發(fā)現(xiàn)功能,不參與消息傳遞。
- 主從同步:
RocketMQ 支持主從同步,其中 Master Broker 處理讀寫請(qǐng)求,Slave Broker 主要用于數(shù)據(jù)同步和故障恢復(fù)。
- 擴(kuò)展方式:
擴(kuò)展 RocketMQ 集群通常涉及添加更多的 Broker(Master/Slave)和可能的 NameServer。這種方式有助于提升集群的容錯(cuò)能力和數(shù)據(jù)的可用性。
- 靈活的部署:
可以靈活地部署多個(gè) NameServer 來提高服務(wù)的可用性,但 NameServer 之間不進(jìn)行數(shù)據(jù)同步。
核心區(qū)別
- 依賴服務(wù):RocketMQ 使用 NameServer 作為獨(dú)立的路由和服務(wù)發(fā)現(xiàn)層,而 Kafka 使用 Zookeeper 作為協(xié)調(diào)服務(wù)。
- 數(shù)據(jù)同步:RocketMQ 的主從架構(gòu)與 Kafka 的分區(qū)副本策略提供了不同的數(shù)據(jù)同步和故障恢復(fù)機(jī)制。
- 擴(kuò)展操作:RocketMQ 在擴(kuò)展時(shí)可能需要同時(shí)增加 Broker 和 NameServer,而 Kafka 的擴(kuò)展更多關(guān)注于增加 Broker 和分區(qū)重新平衡。
使用場(chǎng)景
RocketMQ 使用場(chǎng)景
- 事務(wù)消息:
RocketMQ 提供原生支持的事務(wù)消息特別適合需要處理復(fù)雜業(yè)務(wù)邏輯的場(chǎng)景,如電子商務(wù)中的訂單系統(tǒng),可以在處理業(yè)務(wù)邏輯失敗時(shí)進(jìn)行消息回滾。
- 順序消息:
RocketMQ 支持嚴(yán)格的順序消息,非常適合需要消息嚴(yán)格順序消費(fèi)的場(chǎng)景,如金融行業(yè)的交易和支付系統(tǒng)。
- 廣播消息:
RocketMQ 支持廣播消息發(fā)送,適用于發(fā)送如廣告信息、系統(tǒng)通知等到多個(gè)接收者的場(chǎng)景。
- 定時(shí)、延遲消息:
RocketMQ 支持定時(shí)或延遲消息傳遞,適合需要在指定時(shí)間執(zhí)行任務(wù)的應(yīng)用,例如定時(shí)推送、預(yù)約提醒等。
- 可靠性和可用性較高的應(yīng)用:
RocketMQ 的設(shè)計(jì)注重高可用性和服務(wù)的穩(wěn)定性,適合銀行、股票交易和電信運(yùn)營商等對(duì)消息丟失敏感度極高的行業(yè)。
Kafka 使用場(chǎng)景
- 日志聚合:
Kafka 常用于日志數(shù)據(jù)的收集和聚合,適用于需要高吞吐量處理日志文件的場(chǎng)景,如中大型網(wǎng)站的用戶活動(dòng)跟蹤、應(yīng)用日志集中管理等。
- 流式處理:
Kafka 與流處理框架(如 Apache Flink、Apache Storm 和 Kafka Streams)結(jié)合,提供實(shí)時(shí)數(shù)據(jù)流處理能力,適合實(shí)時(shí)分析和監(jiān)控系統(tǒng)。
- 事件驅(qū)動(dòng)架構(gòu):
Kafka 支持高吞吐的事件發(fā)布和訂閱,適用于構(gòu)建微服務(wù)架構(gòu)中的事件驅(qū)動(dòng)系統(tǒng),可以作為各個(gè)服務(wù)之間解耦的通信中間件。
- 數(shù)據(jù)湖或數(shù)據(jù)倉庫的數(shù)據(jù)集成:
Kafka 可以作為數(shù)據(jù)管道,將數(shù)據(jù)從多個(gè)源頭實(shí)時(shí)傳輸?shù)酱髷?shù)據(jù)平臺(tái)(如 Hadoop 或 Spark),支持大數(shù)據(jù)分析和數(shù)據(jù)挖掘。
- 分布式系統(tǒng)的冗余備份:
Kafka 的數(shù)據(jù)復(fù)制特性適用于需要在多個(gè)地理位置進(jìn)行冗余備份的系統(tǒng),以提高數(shù)據(jù)的可靠性和系統(tǒng)的災(zāi)難恢復(fù)能力。
區(qū)別
選擇 RocketMQ 或 Kafka 主要取決于具體的業(yè)務(wù)需求、系統(tǒng)要求以及團(tuán)隊(duì)的技術(shù)棧偏好。如果需要處理具有復(fù)雜業(yè)務(wù)邏輯的事務(wù)性消息,或需要精確控制消息順序和定時(shí)發(fā)送的功能,RocketMQ 可能是更合適的選擇。 而如果應(yīng)用場(chǎng)景更側(cè)重于高吞吐量的數(shù)據(jù)處理,如日志收集、實(shí)時(shí)數(shù)據(jù)流處理或事件驅(qū)動(dòng)的微服務(wù)架構(gòu),Kafka 則可能是更優(yōu)的選擇。