美團面試:對比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常見問題?
三大MQ指標對比
分布式、微服務、高并發(fā)架構中,消息隊列(Message Queue,簡稱MQ)扮演著至關重要的角色。
消息隊列用于實現(xiàn)系統(tǒng)間的異步通信、解耦、削峰填谷等功能。
對比指標 | RabbitMQ | RocketMQ | Kafka |
應用場景 | 中小規(guī)模應用場景 | 分布式事務、實時日志處理 | 大規(guī)模數(shù)據(jù)處理、實時流處理 |
開發(fā)語言 | Erlang | Java | Scala & Java |
消息可靠性 | 最高 (AMQP協(xié)議保證) | 較高 (基于事務保證) | 中等 (基于副本機制保證) |
消息吞吐量 | 低 萬級到十萬級 | 中等 十萬級到百萬級 | 高 百萬級或更高 |
時效性 | 毫秒級 | 毫秒級 | 毫秒級 |
支持的語言和平臺 | Java、C++、Python等 | Java、C++、Go等 | Java、Scala、Python等 |
架構模型 | virtual host、broker、exchange、queue | nameserver、controller、broker | broker、topic、partition、zookeeper/Kraft |
社區(qū)活躍度和生態(tài)建設 | 中等 活躍的開源社區(qū)和豐富的插件生態(tài)系統(tǒng) | 較高 阿里巴巴開源,穩(wěn)定的社區(qū)支持 | 最高 活躍的開源社區(qū)和廣泛的應用 |
github star | 10.8k | 19.4k | 25.2k |
對比分析三大MQ常見問題
下面, 對比分析三大MQ常見問題。
消息丟失問題
image-20250508192254071
1、RocketMQ解決消息丟失問題
生產(chǎn)端: 采用同步發(fā)送(等待Broker確認)并啟用重試機制,結合事務消息(如預提交half消息+二次確認commit)確保消息可靠投遞。
Broker端:配置同步刷盤(消息寫入磁盤后返回確認)和多副本同步機制(主從節(jié)點數(shù)據(jù)冗余)防止宕機丟失,同時通過集群容災保障高可用。
消費端:消費者需手動ACK確認,失敗時觸發(fā)自動重試(默認16次),最終失敗消息轉(zhuǎn)入死信隊列人工處理,避免異常場景下消息丟失。
2、Kafka解決消息丟失問題
生產(chǎn)端:設置acks=all
確保消息被所有副本持久化后才響應,啟用生產(chǎn)者重試(retries
)及冪等性(enable.idempotence=true
)防止網(wǎng)絡抖動或Broker異常導致丟失
Broker端:配置多副本同步(min.insync.replicas≥2
)和ISR(In-Sync Replicas)機制,僅同步成功的副本參與選舉;避免unclean.leader.election.enable=true
(防止數(shù)據(jù)不全的副本成為Leader)
消費端:關閉自動提交位移(enable.auto.commit=false
),手動同步提交(commitSync
)確保消息處理完成后再更新位移,結合消費重試及死信隊列兜底
3、RabbitMQ解決消息丟失問題
生產(chǎn)端:啟用Publisher Confirm模式(異步確認消息持久化)并設置mandatory=true
路由失敗回退,結合備份交換機處理無法路由的消息;事務消息因性能損耗僅限關鍵場景使用。
Broker端:消息與隊列均需持久化(durable=true
)防止宕機丟失,部署鏡像隊列集群實現(xiàn)多節(jié)點冗余;同步刷盤策略確保數(shù)據(jù)落盤后響應。
消費端:關閉自動ACK,采用手動ACK并在業(yè)務處理成功后提交確認;消費失敗時重試(重試次數(shù)可配置)并最終轉(zhuǎn)入死信隊列人工干預,避免消息因異常未處理而丟失。
消息積壓問題
1、RocketMQ解決消息積壓問題
RocketMQ通過橫向擴展(增加消費者實例、隊列數(shù)量)、提升消費能力(線程池調(diào)優(yōu)、批量消費)、動態(tài)擴容、消息預取、死信隊列隔離無效消息,并支持消費限流及監(jiān)控告警,快速定位處理積壓問題。
RocketMQ還提供了消息拉取和推拉模式,消費者可以根據(jù)自身的處理能力主動拉取消息,避免消息積壓過多。
2、Kafka解決消息積壓問題
Kafka通過 橫向擴展(增加分區(qū)及消費者實例)、優(yōu)化消費者參數(shù)(如批量拉取、并發(fā)處理)、提升消費邏輯效率(異步化、減少I/O),并動態(tài)監(jiān)控消費滯后指標。
必要時限流生產(chǎn)者或臨時擴容消費組,結合分區(qū)再平衡策略快速分發(fā)積壓消息負載。
Kafka還提供了消息清理(compaction)和數(shù)據(jù)保留策略,可以根據(jù)時間或者數(shù)據(jù)大小來自動刪除過期的消息,避免消息積壓過多。
3、RabbitMQ解決消息積壓問題
RabbitMQ通過調(diào)整消費者的消費速率來控制消息積壓。
可以使用QoS(Quality of Service)機制設置每個消費者的預取計數(shù),限制每次從隊列中獲取的消息數(shù)量,以控制消費者的處理速度。
RabbitMQ還支持消費者端的流量控制,通過設置basic.qos或basic.consume命令的參數(shù)來控制消費者的處理速度,避免消息過多導致積壓。
消息重復消費問題
1、RocketMQ解決消息重復消費問題
- 使用消息唯一標識符(Message ID):在消息發(fā)送時,為每條消息附加一個唯一標識符。消費者在處理消息時,可以通過判斷消息唯一標識符來避免重復消費??梢詫⑾D記錄在數(shù)據(jù)庫或緩存中,用于去重檢查。
- 消費者端去重處理:消費者在消費消息時,可以通過維護一個已消費消息的列表或緩存,來避免重復消費已經(jīng)處理過的消息。
2、Kafka解決消息重復消費問題
- 冪等性處理:在消費者端實現(xiàn)冪等性邏輯,即多次消費同一條消息所產(chǎn)生的結果與單次消費的結果一致。這可以通過在業(yè)務邏輯中引入唯一標識符或記錄已處理消息的狀態(tài)來實現(xiàn)。
- 消息確認機制:消費者在處理完消息后,提交已消費的偏移量(Offset)給Kafka,Kafka會記錄已提交的偏移量,以便在消費者重新啟動時從正確的位置繼續(xù)消費。消費者可以定期提交偏移量,確保消息只被消費一次。
3、RabbitMQ解決消息重復消費問題
- 冪等性處理:在消費者端實現(xiàn)冪等性邏輯,即無論消息被消費多少次,最終的結果應該保持一致。這可以通過在消費端進行唯一標識的檢查或者記錄已經(jīng)處理過的消息來實現(xiàn)。
- 消息確認機制:消費者在處理完消息后,發(fā)送確認消息(ACK)給RabbitMQ,告知消息已經(jīng)成功處理。RabbitMQ根據(jù)接收到的確認消息來判斷是否需要重新投遞消息給其他消費者。
最為詳細的方案,請參考尼恩團隊的架構方案: 最系統(tǒng)的冪等性方案:一鎖二判三更新
消息有序性
1、Rabbitmq 解決有序性問題
模式一:單隊列單消費者模式
- 將需要保證順序的消息全部發(fā)送到同一個隊列,且消費者設置為單線程處理。
- 原理:RabbitMQ 隊列天然支持 FIFO 順序存儲,單消費者避免并發(fā)處理導致亂序。
示例代:
// 生產(chǎn)者發(fā)送到同一隊列
rabbitTemplate.convertAndSend("order.queue", "message1");
rabbitTemplate.convertAndSend("order.queue", "message2");
// 消費者單線程監(jiān)聽
@RabbitListener(queues = "order.queue")
public void processOrder(String message) {
// 順序處理邏輯
}
缺點:無法橫向擴展消費者,吞吐量受限。
模式二:消息分組策略
按業(yè)務標識分區(qū)(如訂單 ID、用戶 ID),相同分組的消息路由到同一隊列,每個隊列對應一個消費者。
實現(xiàn)方式: 生產(chǎn)者通過哈希算法或自定義路由鍵將關聯(lián)的消息分配到特定隊列。
- 生產(chǎn)者根據(jù)業(yè)務標識生成路由鍵,如
routingKey = orderId.hashCode() % queueCount
。 - 聲明多個隊列,綁定到同一交換機,并根據(jù)路由鍵規(guī)則分發(fā)消息。
代碼示例:
// 生產(chǎn)者發(fā)送消息時指定路由鍵
String orderId = "ORDER_1001";
String routingKey = "order." + (orderId.hashCode() % 3); // 分配到3個隊列之一
rabbitTemplate.convertAndSend("order.exchange", routingKey, message);
優(yōu)勢:在保證同分組順序性的同時,允許不同分組并行處理。
消費者并發(fā)控制 設置
prefetchCount=1
確保每次只處理一個消息,關閉自動應答,手動確認后再獲取新消息:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
效果:防止消費者同時處理多個消息導致亂序。
2、RocketMQ解決有序性問題
RocketMQ實現(xiàn)順序消息的核心是通過生產(chǎn)端和消費端雙重保障:
- 全局順序需單隊列(性能受限),分區(qū)順序通過Sharding Key哈希分散到不同隊列,兼顧吞吐量與局部有序性。需避免異步消費、消息重試亂序,失敗時跳過當前消息防止阻塞
- 生產(chǎn)者使用MessageQueueSelector將同一業(yè)務標識(如訂單ID)的消息強制路由至同一隊列,利用隊列FIFO特性保序;
- 消費端對 同一隊列啟用 單線程拉取 + 分區(qū)鎖機制(ConsumeOrderlyContext),確保串行處理。
3、Kafka解決有序性問題
Kafka實現(xiàn)順序消息的核心在于分區(qū)順序性:
- 生產(chǎn)端:相同業(yè)務標識(如訂單ID)的消息通過固定Key哈希至同一分區(qū)(
Partitioner
),利用分區(qū)內(nèi)消息天然有序性保序; - 消費端:每個分區(qū)僅由同一消費者組的一個線程消費(單線程串行處理),避免并發(fā)消費亂序;
事務消息
1、RabbitMQ的事務消息
- RabbitMQ支持事務消息的發(fā)送和確認。在發(fā)送消息之前,可以通過調(diào)用"channel.txSelect()"來開啟事務,然后將要發(fā)送的消息發(fā)布到交換機中。如果事務成功提交,消息將被發(fā)送到隊列,否則事務會回滾,消息不會被發(fā)送。
- 在消費端,可以通過"channel.txSelect()"開啟事務,然后使用"basicAck"手動確認消息的處理結果。如果事務成功提交,消費端會發(fā)送ACK確認消息的處理;否則,事務回滾,消息將被重新投遞。
public class RabbitMQTransactionDemo {
private static final String QUEUE_NAME = "transaction_queue";
public static void main(String[] args) {
try {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 創(chuàng)建信道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
// 開啟事務
channel.txSelect();
// 發(fā)送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 提交事務
channel.txCommit();
} catch (Exception e) {
// 事務回滾
channel.txRollback();
e.printStackTrace();
}
// 關閉信道和連接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、RocketMQ的事務消息
RocketMQ提供了事務消息的機制,確保消息的可靠性和一致性。
發(fā)送事務消息時,需要將消息發(fā)送到半消息隊列,然后執(zhí)行本地事務邏輯。
事務執(zhí)行成功后,通過調(diào)用"TransactionStatus.CommitTransaction"提交事務消息;若事務執(zhí)行失敗,則通過調(diào)用"TransactionStatus.RollbackTransaction"回滾事務消息。
事務消息的最終狀態(tài)由消息生產(chǎn)者根據(jù)事務執(zhí)行結果進行確認。
public class RocketMQTransactionDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建事務消息生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("group_name");
producer.setNamesrvAddr("localhost:9876");
// 設置事務監(jiān)聽器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務邏輯,根據(jù)業(yè)務邏輯結果返回相應的狀態(tài)
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾
// 返回 LocalTransactionState.UNKNOW 表示事務狀態(tài)未知
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根據(jù)消息的狀態(tài),來判斷本地事務的最終狀態(tài)
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾
// 返回 LocalTransactionState.UNKNOW 表示事務狀態(tài)未知
}
});
// 啟動事務消息生產(chǎn)者
producer.start();
// 構造消息
Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes());
// 發(fā)送事務消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("Send Result: " + sendResult);
// 關閉事務消息生產(chǎn)者
producer.shutdown();
}
}
3、Kafka的事務消息
Kafka引入了事務功能來確保消息的原子性和一致性。事務消息的發(fā)送和確認在生產(chǎn)者端進行。
生產(chǎn)者可以通過初始化事務,將一系列的消息寫入事務,然后通過"commitTransaction()"提交事務,或者通過"abortTransaction()"中止事務。
Kafka會保證在事務提交之前,寫入的所有消息不會被消費者可見,以保持事務的一致性。
public class KafkaTransactionDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id");
Producer<String, String> producer = new KafkaProducer<>(props);
// 初始化事務
producer.initTransactions();
try {
// 開啟事務
producer.beginTransaction();
// 發(fā)送消息
ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!");
producer.send(record);
// 提交事務
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 處理異常情況
producer.close();
} finally {
producer.close();
}
}
}
消息確認 ACK機制
1、RabbitMQ的ACK機制
RabbitMQ使用ACK(消息確認)機制來確保消息的可靠傳遞。
消費者收到消息后,需要向RabbitMQ發(fā)送ACK來確認消息的處理狀態(tài)。
只有在收到ACK后,RabbitMQ才會將消息標記為已成功傳遞,否則會將消息重新投遞給其他消費者或者保留在隊列中。
以下是RabbitMQ ACK的Java示例:
public class RabbitMQAckDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 創(chuàng)建信道
Channel channel = connection.createChannel();
// 聲明隊列
String queueName = "queue_name";
channel.queueDeclare(queueName, false, false, false, null);
// 創(chuàng)建消費者
String consumerTag = "consumer_tag";
boolean autoAck = false; // 關閉自動ACK
// 消費消息
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費消息
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
// 模擬處理消息的業(yè)務邏輯
processMessage(message);
// 手動發(fā)送ACK確認消息
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 處理消息異常,可以選擇重試或者記錄日志等操作
System.out.println("Failed to process message: " + message);
e.printStackTrace();
// 手動發(fā)送NACK拒絕消息,并可選是否重新投遞
long deliveryTag = envelope.getDeliveryTag();
boolean requeue = true; // 重新投遞消息
channel.basicNack(deliveryTag, false, requeue);
}
}
});
}
private static void processMessage(String message) {
// 模擬處理消息的業(yè)務邏輯
}
}
2、RocketMQ的ACK機制
RocketMQ的ACK機制由消費者控制,消費者從消息隊列中消費消息后,可以手動發(fā)送ACK確認消息的處理狀態(tài)。
只有在收到ACK后,RocketMQ才會將消息標記為已成功消費,否則會將消息重新投遞給其他消費者。
public class RocketMQAckDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
// 訂閱消息
consumer.subscribe("topic_name", "*");
// 注冊消息監(jiān)聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt message : msgs) {
try {
// 消費消息
String msgBody = new String(message.getBody(), "UTF-8");
System.out.println("Received message: " + msgBody);
// 模擬處理消息的業(yè)務邏輯
processMessage(msgBody);
// 手動發(fā)送ACK確認消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 處理消息異常,可以選擇重試或者記錄日志等操作
System.out.println("Failed to process message: " + new String(message.getBody()));
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 啟動消費者
consumer.start();
}
private static void processMessage(String message) {
// 模擬處理消息的業(yè)務邏輯
}
}
3、Kafka的ACK機制
Kafka的ACK機制用于控制生產(chǎn)者在發(fā)送消息后,需要等待多少個副本確認才視為消息發(fā)送成功。
這個機制可以通過設置acks參數(shù)來進行配置。在Kafka中,acks參數(shù)有三個可選值:
acks=0:生產(chǎn)者在發(fā)送消息后不需要等待任何確認,直接將消息發(fā)送給Kafka集群。這種方式具有最高的吞吐量,但是也存在數(shù)據(jù)丟失的風險,因為生產(chǎn)者不會知道消息是否成功發(fā)送給任何副本。
acks=1:生產(chǎn)者在發(fā)送消息后只需要等待首領副本(leader replica)確認。一旦首領副本成功接收到消息,生產(chǎn)者就會收到確認。這種方式提供了一定的可靠性,但是如果首領副本在接收消息后但在確認之前發(fā)生故障,仍然可能會導致數(shù)據(jù)丟失。
acks=all:生產(chǎn)者在發(fā)送消息后需要等待所有副本都確認。只有當所有副本都成功接收到消息后,生產(chǎn)者才會收到確認。這是最安全的確認機制,確保了消息不會丟失,但是需要更多的時間和資源。acks=-1與acks=all是等效的。
public classKafkaProducerDemo{
public static void main(String[]args){
// 配置Kafka生產(chǎn)者的參數(shù)
Propertiesprops=newProperties();
props.put("bootstrap.servers","localhost:9092");// Kafka集群的地址和端口
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 鍵的序列化器
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 值的序列化器
props.put("acks","all");// 設置ACK機制為所有副本都確認
// 創(chuàng)建生產(chǎn)者實例
KafkaProducer<String,String>producer=newKafkaProducer<>(props);
// 構造消息
Stringtopic="my_topic";
Stringkey="my_key";
Stringvalue="Hello, Kafka!";
// 創(chuàng)建消息記錄
ProducerRecord<String,String>record=newProducerRecord<>(topic,key,value);
// 發(fā)送消息
producer.send(record,newCallback(){
@Override
publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
if(exception!=null){
System.err.println("發(fā)送消息出現(xiàn)異常:"+exception.getMessage());
}else{
System.out.println("消息發(fā)送成功!位于分區(qū) "+metadata.partition()+",偏移量 "+metadata.offset());
}
}
});
// 關閉生產(chǎn)者
producer.close();
}
}
延遲消息實現(xiàn)
延遲隊列在實際項目中有非常多的應用場景,最常見的比如訂單未支付,超時取消訂單,在創(chuàng)建訂單的時候發(fā)送一條延遲消息,達到延遲時間之后消費者收到消息,如果訂單沒有支付的話,那么就取消訂單。
image-20250508192415995
1、RocketMQ實現(xiàn)延遲消息
RocketMQ 默認時間間隔分為 18 個級別,基本上也能滿足大部分場景的需要了。
默認延遲級別:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
使用起來也非常的簡單,直接通過setDelayTimeLevel
設置延遲級別即可。
setDelayTimeLevel(level)
實現(xiàn)原理說起來比較簡單,Broker 會根據(jù)不同的延遲級別創(chuàng)建出多個不同級別的隊列,當我們發(fā)送延遲消息的時候,根據(jù)不同的延遲級別發(fā)送到不同的隊列中,同時在 Broker 內(nèi)部通過一個定時器去輪詢這些隊列(RocketMQ 會為每個延遲級別分別創(chuàng)建一個定時任務),如果消息達到發(fā)送時間,那么就直接把消息發(fā)送到指 topic 隊列中。
RocketMQ 這種實現(xiàn)方式是放在服務端去做的,同時有個好處就是相同延遲時間的消息是可以保證有序性的。
談到這里就順便提一下關于消息消費重試的原理,這個本質(zhì)上來說其實是一樣的,對于消費失敗需要重試的消息實際上都會被丟到延遲隊列的 topic 里,到期后再轉(zhuǎn)發(fā)到真正的 topic 中。
image-20250508192539070
2、RabbitMQ實現(xiàn)延遲消息
RabbitMQ本身并不存在延遲隊列的概念,在 RabbitMQ 中是通過 DLX 死信交換機和 TTL 消息過期來實現(xiàn)延遲隊列的。
TTL(Time to Live)過期時間
有兩種方式可以設置 TTL。
(1) 通過隊列屬性設置,這樣的話隊列中的所有消息都會擁有相同的過期時間(2) 對消息單獨設置過期時間,這樣每條消息的過期時間都可以不同
那么如果同時設置呢?這樣將會以兩個時間中較小的值為準。
針對隊列的方式通過參數(shù)x-message-ttl
來設置。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
針對消息的方式通過setExpiration
來設置。
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());
DLX(Dead Letter Exchange)死信交換機
一個消息要成為死信消息有 3 種情況:
(1) 消息被拒絕,比如調(diào)用reject
方法,并且需要設置requeue
為false
(2) 消息過期
(3) 隊列達到最大長度
可以通過參數(shù)dead-letter-exchange
設置死信交換機,也可以通過參數(shù)dead-letter- exchange
指定 RoutingKey(未指定則使用原隊列的 RoutingKey)。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
實現(xiàn)原理
當我們對消息設置了 TTL 和 DLX 之后,當消息正常發(fā)送,通過 Exchange 到達 Queue 之后,由于設置了 TTL 過期時間,并且消息沒有被消費(訂閱的是死信隊列),達到過期時間之后,消息就轉(zhuǎn)移到與之綁定的 DLX 死信隊列之中。
這樣的話,就相當于通過 DLX 和 TTL 間接實現(xiàn)了延遲消息的功能,實際使用中我們可以根據(jù)不同的延遲級別綁定設置不同延遲時間的隊列來達到實現(xiàn)不同延遲時間的效果。
如果隊列通過 dead-letter-exchange 屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中,這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)。
image-20250508192505250
3、Kafka實現(xiàn)延遲消息
對于 Kafka 來說,原生并不支持延遲隊列的功能,需要我們手動去實現(xiàn),這里我根據(jù) RocketMQ 的設計提供一個實現(xiàn)思路。
這個設計,我們也不支持任意時間精度的延遲消息,只支持固定級別的延遲,因為對于大部分延遲消息的場景來說足夠使用了。
只創(chuàng)建一個 topic,但是針對該 topic 創(chuàng)建 18 個 partition,每個 partition 對應不同的延遲級別,這樣做和 RocketMQ 一樣有個好處就是能達到相同延遲時間的消息達到有序性。
應用級 Kafka 延遲消息實現(xiàn)原理
首先創(chuàng)建一個單獨針對延遲隊列的 topic,同時創(chuàng)建 18 個 partition 針對不同的延遲級別。
發(fā)送消息的時候根據(jù)延遲參數(shù)發(fā)送到延遲 topic 對應的 partition,對應的key
為延遲時間,同時把原 topic 保存到 header 中。
ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));
內(nèi)嵌的consumer
單獨設置一個ConsumerGroup
去消費延遲 topic 消息,消費到消息之后如果沒有達到延遲時間那么就進行pause
,然后seek
到當前ConsumerRecord
的offset
位置,同時使用定時器去輪詢延遲的TopicPartition
,達到延遲時間之后進行resume
。
如果達到了延遲時間,那么就獲取到header
中的真實 topic ,直接轉(zhuǎn)發(fā)。
這里為什么要進行pause
和resume
呢?
因為如果不這樣的話,如果超時未消費達到max.poll.interval.ms
最大時間(默認300s),那么將會觸發(fā) Rebalance。