自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

美團面試:對比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常見問題?

開發(fā) 架構
RocketMQ通過橫向擴展(增加消費者實例、隊列數(shù)量)、提升消費能力(線程池調(diào)優(yōu)、批量消費)、動態(tài)擴容、消息預取、死信隊列隔離無效消息,并支持消費限流及監(jiān)控告警,快速定位處理積壓問題。

三大MQ指標對比

分布式、微服務、高并發(fā)架構中,消息隊列(Message Queue,簡稱MQ)扮演著至關重要的角色。

消息隊列用于實現(xiàn)系統(tǒng)間的異步通信、解耦、削峰填谷等功能。

對比指標

RabbitMQ

RocketMQ

Kafka

應用場景

中小規(guī)模應用場景

分布式事務、實時日志處理

大規(guī)模數(shù)據(jù)處理、實時流處理

開發(fā)語言

Erlang

Java

Scala & Java

消息可靠性

最高 (AMQP協(xié)議保證)

較高 (基于事務保證)

中等 (基于副本機制保證)

消息吞吐量

低 萬級到十萬級

中等 十萬級到百萬級

高 百萬級或更高

時效性

毫秒級

毫秒級

毫秒級

支持的語言和平臺

Java、C++、Python等

Java、C++、Go等

Java、Scala、Python等

架構模型

virtual host、broker、exchange、queue

nameserver、controller、broker

broker、topic、partition、zookeeper/Kraft

社區(qū)活躍度和生態(tài)建設

中等 活躍的開源社區(qū)和豐富的插件生態(tài)系統(tǒng)

較高 阿里巴巴開源,穩(wěn)定的社區(qū)支持

最高 活躍的開源社區(qū)和廣泛的應用

github star

10.8k

19.4k

25.2k

對比分析三大MQ常見問題

下面, 對比分析三大MQ常見問題。

消息丟失問題


image-20250508192254071image-20250508192254071

1、RocketMQ解決消息丟失問題

生產(chǎn)端:  采用同步發(fā)送(等待Broker確認)并啟用重試機制,結合事務消息(如預提交half消息+二次確認commit)確保消息可靠投遞。

Broker端:配置同步刷盤(消息寫入磁盤后返回確認)和多副本同步機制(主從節(jié)點數(shù)據(jù)冗余)防止宕機丟失,同時通過集群容災保障高可用。

消費端:消費者需手動ACK確認,失敗時觸發(fā)自動重試(默認16次),最終失敗消息轉(zhuǎn)入死信隊列人工處理,避免異常場景下消息丟失。

2、Kafka解決消息丟失問題

生產(chǎn)端:設置acks=all確保消息被所有副本持久化后才響應,啟用生產(chǎn)者重試(retries)及冪等性(enable.idempotence=true)防止網(wǎng)絡抖動或Broker異常導致丟失

Broker端:配置多副本同步(min.insync.replicas≥2)和ISR(In-Sync Replicas)機制,僅同步成功的副本參與選舉;避免unclean.leader.election.enable=true(防止數(shù)據(jù)不全的副本成為Leader)

消費端:關閉自動提交位移(enable.auto.commit=false),手動同步提交(commitSync)確保消息處理完成后再更新位移,結合消費重試及死信隊列兜底

3、RabbitMQ解決消息丟失問題

生產(chǎn)端:啟用Publisher Confirm模式(異步確認消息持久化)并設置mandatory=true路由失敗回退,結合備份交換機處理無法路由的消息;事務消息因性能損耗僅限關鍵場景使用。

Broker端:消息與隊列均需持久化(durable=true)防止宕機丟失,部署鏡像隊列集群實現(xiàn)多節(jié)點冗余;同步刷盤策略確保數(shù)據(jù)落盤后響應。

消費端:關閉自動ACK,采用手動ACK并在業(yè)務處理成功后提交確認;消費失敗時重試(重試次數(shù)可配置)并最終轉(zhuǎn)入死信隊列人工干預,避免消息因異常未處理而丟失。

消息積壓問題

1、RocketMQ解決消息積壓問題

RocketMQ通過橫向擴展(增加消費者實例、隊列數(shù)量)、提升消費能力(線程池調(diào)優(yōu)、批量消費)、動態(tài)擴容、消息預取、死信隊列隔離無效消息,并支持消費限流及監(jiān)控告警,快速定位處理積壓問題。

RocketMQ還提供了消息拉取和推拉模式,消費者可以根據(jù)自身的處理能力主動拉取消息,避免消息積壓過多。

2、Kafka解決消息積壓問題

Kafka通過  橫向擴展(增加分區(qū)及消費者實例)、優(yōu)化消費者參數(shù)(如批量拉取、并發(fā)處理)、提升消費邏輯效率(異步化、減少I/O),并動態(tài)監(jiān)控消費滯后指標。

必要時限流生產(chǎn)者或臨時擴容消費組,結合分區(qū)再平衡策略快速分發(fā)積壓消息負載。

Kafka還提供了消息清理(compaction)和數(shù)據(jù)保留策略,可以根據(jù)時間或者數(shù)據(jù)大小來自動刪除過期的消息,避免消息積壓過多。

3、RabbitMQ解決消息積壓問題

RabbitMQ通過調(diào)整消費者的消費速率來控制消息積壓。

可以使用QoS(Quality of Service)機制設置每個消費者的預取計數(shù),限制每次從隊列中獲取的消息數(shù)量,以控制消費者的處理速度。

RabbitMQ還支持消費者端的流量控制,通過設置basic.qos或basic.consume命令的參數(shù)來控制消費者的處理速度,避免消息過多導致積壓。

消息重復消費問題

1、RocketMQ解決消息重復消費問題

  • 使用消息唯一標識符(Message ID):在消息發(fā)送時,為每條消息附加一個唯一標識符。消費者在處理消息時,可以通過判斷消息唯一標識符來避免重復消費??梢詫⑾D記錄在數(shù)據(jù)庫或緩存中,用于去重檢查。
  • 消費者端去重處理:消費者在消費消息時,可以通過維護一個已消費消息的列表或緩存,來避免重復消費已經(jīng)處理過的消息。

2、Kafka解決消息重復消費問題

  • 冪等性處理:在消費者端實現(xiàn)冪等性邏輯,即多次消費同一條消息所產(chǎn)生的結果與單次消費的結果一致。這可以通過在業(yè)務邏輯中引入唯一標識符或記錄已處理消息的狀態(tài)來實現(xiàn)。
  • 消息確認機制:消費者在處理完消息后,提交已消費的偏移量(Offset)給Kafka,Kafka會記錄已提交的偏移量,以便在消費者重新啟動時從正確的位置繼續(xù)消費。消費者可以定期提交偏移量,確保消息只被消費一次。

3、RabbitMQ解決消息重復消費問題

  • 冪等性處理:在消費者端實現(xiàn)冪等性邏輯,即無論消息被消費多少次,最終的結果應該保持一致。這可以通過在消費端進行唯一標識的檢查或者記錄已經(jīng)處理過的消息來實現(xiàn)。
  • 消息確認機制:消費者在處理完消息后,發(fā)送確認消息(ACK)給RabbitMQ,告知消息已經(jīng)成功處理。RabbitMQ根據(jù)接收到的確認消息來判斷是否需要重新投遞消息給其他消費者。

最為詳細的方案,請參考尼恩團隊的架構方案: 最系統(tǒng)的冪等性方案:一鎖二判三更新

消息有序性

1、Rabbitmq 解決有序性問題

模式一:單隊列單消費者模式
  • 將需要保證順序的消息全部發(fā)送到同一個隊列,且消費者設置為單線程處理。
  • 原理:RabbitMQ 隊列天然支持 FIFO 順序存儲,單消費者避免并發(fā)處理導致亂序。

示例代:

// 生產(chǎn)者發(fā)送到同一隊列
  rabbitTemplate.convertAndSend("order.queue", "message1");
  rabbitTemplate.convertAndSend("order.queue", "message2");
  // 消費者單線程監(jiān)聽
  @RabbitListener(queues = "order.queue")
  public void processOrder(String message) {
      // 順序處理邏輯
  }

缺點:無法橫向擴展消費者,吞吐量受限。

模式二:消息分組策略

按業(yè)務標識分區(qū)(如訂單 ID、用戶 ID),相同分組的消息路由到同一隊列,每個隊列對應一個消費者。

實現(xiàn)方式: 生產(chǎn)者通過哈希算法或自定義路由鍵將關聯(lián)的消息分配到特定隊列。

  • 生產(chǎn)者根據(jù)業(yè)務標識生成路由鍵,如 routingKey = orderId.hashCode() % queueCount。
  • 聲明多個隊列,綁定到同一交換機,并根據(jù)路由鍵規(guī)則分發(fā)消息。

代碼示例:

// 生產(chǎn)者發(fā)送消息時指定路由鍵
String orderId = "ORDER_1001";
String routingKey = "order." + (orderId.hashCode() % 3);  // 分配到3個隊列之一
rabbitTemplate.convertAndSend("order.exchange", routingKey, message);

優(yōu)勢:在保證同分組順序性的同時,允許不同分組并行處理。

消費者并發(fā)控制 設置

prefetchCount=1

確保每次只處理一個消息,關閉自動應答,手動確認后再獲取新消息:

spring:
  rabbitmq:
         listener:
           simple:
             prefetch: 1

效果:防止消費者同時處理多個消息導致亂序。

2、RocketMQ解決有序性問題

RocketMQ實現(xiàn)順序消息的核心是通過生產(chǎn)端和消費端雙重保障:

  • 全局順序需單隊列(性能受限),分區(qū)順序通過Sharding Key哈希分散到不同隊列,兼顧吞吐量與局部有序性。需避免異步消費、消息重試亂序,失敗時跳過當前消息防止阻塞
  • 生產(chǎn)者使用MessageQueueSelector將同一業(yè)務標識(如訂單ID)的消息強制路由至同一隊列,利用隊列FIFO特性保序;
  • 消費端對  同一隊列啟用 單線程拉取 + 分區(qū)鎖機制(ConsumeOrderlyContext),確保串行處理。

3、Kafka解決有序性問題

Kafka實現(xiàn)順序消息的核心在于分區(qū)順序性:

  • 生產(chǎn)端:相同業(yè)務標識(如訂單ID)的消息通過固定Key哈希至同一分區(qū)(Partitioner),利用分區(qū)內(nèi)消息天然有序性保序;
  • 消費端:每個分區(qū)僅由同一消費者組的一個線程消費(單線程串行處理),避免并發(fā)消費亂序;

事務消息

1、RabbitMQ的事務消息

  • RabbitMQ支持事務消息的發(fā)送和確認。在發(fā)送消息之前,可以通過調(diào)用"channel.txSelect()"來開啟事務,然后將要發(fā)送的消息發(fā)布到交換機中。如果事務成功提交,消息將被發(fā)送到隊列,否則事務會回滾,消息不會被發(fā)送。
  • 在消費端,可以通過"channel.txSelect()"開啟事務,然后使用"basicAck"手動確認消息的處理結果。如果事務成功提交,消費端會發(fā)送ACK確認消息的處理;否則,事務回滾,消息將被重新投遞。
public class RabbitMQTransactionDemo {
    private static final String QUEUE_NAME = "transaction_queue";
    public static void main(String[] args) {
        try {
            // 創(chuàng)建連接工廠
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // 創(chuàng)建連接
            Connection connection = factory.newConnection();
            // 創(chuàng)建信道
            Channel channel = connection.createChannel();
            // 聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            try {
                // 開啟事務
                channel.txSelect();
                // 發(fā)送消息
                String message = "Hello, RabbitMQ!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                // 提交事務
                channel.txCommit();
            } catch (Exception e) {
                // 事務回滾
                channel.txRollback();
                e.printStackTrace();
            }
            // 關閉信道和連接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、RocketMQ的事務消息

RocketMQ提供了事務消息的機制,確保消息的可靠性和一致性。

發(fā)送事務消息時,需要將消息發(fā)送到半消息隊列,然后執(zhí)行本地事務邏輯。

事務執(zhí)行成功后,通過調(diào)用"TransactionStatus.CommitTransaction"提交事務消息;若事務執(zhí)行失敗,則通過調(diào)用"TransactionStatus.RollbackTransaction"回滾事務消息。

事務消息的最終狀態(tài)由消息生產(chǎn)者根據(jù)事務執(zhí)行結果進行確認。

public class RocketMQTransactionDemo {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建事務消息生產(chǎn)者
        TransactionMQProducer producer = new TransactionMQProducer("group_name");
        producer.setNamesrvAddr("localhost:9876");
        // 設置事務監(jiān)聽器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 執(zhí)行本地事務邏輯,根據(jù)業(yè)務邏輯結果返回相應的狀態(tài)
                // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交
                // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾
                // 返回 LocalTransactionState.UNKNOW 表示事務狀態(tài)未知
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 根據(jù)消息的狀態(tài),來判斷本地事務的最終狀態(tài)
                // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交
                // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾
                // 返回 LocalTransactionState.UNKNOW 表示事務狀態(tài)未知
            }
        });
        // 啟動事務消息生產(chǎn)者
        producer.start();
        // 構造消息
        Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes());
        // 發(fā)送事務消息
        TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println("Send Result: " + sendResult);
        // 關閉事務消息生產(chǎn)者
        producer.shutdown();
    }
}

3、Kafka的事務消息

Kafka引入了事務功能來確保消息的原子性和一致性。事務消息的發(fā)送和確認在生產(chǎn)者端進行。

生產(chǎn)者可以通過初始化事務,將一系列的消息寫入事務,然后通過"commitTransaction()"提交事務,或者通過"abortTransaction()"中止事務。

Kafka會保證在事務提交之前,寫入的所有消息不會被消費者可見,以保持事務的一致性。

public class KafkaTransactionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 初始化事務
        producer.initTransactions();
        try {
            // 開啟事務
            producer.beginTransaction();
            // 發(fā)送消息
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!");
            producer.send(record);
            // 提交事務
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // 處理異常情況
            producer.close();
        } finally {
            producer.close();
        }
    }
}

消息確認 ACK機制

1、RabbitMQ的ACK機制

RabbitMQ使用ACK(消息確認)機制來確保消息的可靠傳遞。

消費者收到消息后,需要向RabbitMQ發(fā)送ACK來確認消息的處理狀態(tài)。

只有在收到ACK后,RabbitMQ才會將消息標記為已成功傳遞,否則會將消息重新投遞給其他消費者或者保留在隊列中。

以下是RabbitMQ ACK的Java示例:

public class RabbitMQAckDemo {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 創(chuàng)建連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建信道
        Channel channel = connection.createChannel();
        // 聲明隊列
        String queueName = "queue_name";
        channel.queueDeclare(queueName, false, false, false, null);
        // 創(chuàng)建消費者
        String consumerTag = "consumer_tag";
        boolean autoAck = false; // 關閉自動ACK
        // 消費消息
        channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費消息
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
                try {
                    // 模擬處理消息的業(yè)務邏輯
                    processMessage(message);
                    // 手動發(fā)送ACK確認消息
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    // 處理消息異常,可以選擇重試或者記錄日志等操作
                    System.out.println("Failed to process message: " + message);
                    e.printStackTrace();
                    // 手動發(fā)送NACK拒絕消息,并可選是否重新投遞
                    long deliveryTag = envelope.getDeliveryTag();
                    boolean requeue = true; // 重新投遞消息
                    channel.basicNack(deliveryTag, false, requeue);
                }
            }
        });
    }
    private static void processMessage(String message) {
        // 模擬處理消息的業(yè)務邏輯
    }
}

2、RocketMQ的ACK機制

RocketMQ的ACK機制由消費者控制,消費者從消息隊列中消費消息后,可以手動發(fā)送ACK確認消息的處理狀態(tài)。

只有在收到ACK后,RocketMQ才會將消息標記為已成功消費,否則會將消息重新投遞給其他消費者。

public class RocketMQAckDemo {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("localhost:9876");
        // 訂閱消息
        consumer.subscribe("topic_name", "*");
        // 注冊消息監(jiān)聽器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt message : msgs) {
                try {
                    // 消費消息
                    String msgBody = new String(message.getBody(), "UTF-8");
                    System.out.println("Received message: " + msgBody);
                    // 模擬處理消息的業(yè)務邏輯
                    processMessage(msgBody);
                    // 手動發(fā)送ACK確認消息
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 處理消息異常,可以選擇重試或者記錄日志等操作
                    System.out.println("Failed to process message: " + new String(message.getBody()));
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 啟動消費者
        consumer.start();
    }
    private static void processMessage(String message) {
        // 模擬處理消息的業(yè)務邏輯
    }
}

3、Kafka的ACK機制

Kafka的ACK機制用于控制生產(chǎn)者在發(fā)送消息后,需要等待多少個副本確認才視為消息發(fā)送成功。

這個機制可以通過設置acks參數(shù)來進行配置。在Kafka中,acks參數(shù)有三個可選值:

acks=0:生產(chǎn)者在發(fā)送消息后不需要等待任何確認,直接將消息發(fā)送給Kafka集群。這種方式具有最高的吞吐量,但是也存在數(shù)據(jù)丟失的風險,因為生產(chǎn)者不會知道消息是否成功發(fā)送給任何副本。

acks=1:生產(chǎn)者在發(fā)送消息后只需要等待首領副本(leader replica)確認。一旦首領副本成功接收到消息,生產(chǎn)者就會收到確認。這種方式提供了一定的可靠性,但是如果首領副本在接收消息后但在確認之前發(fā)生故障,仍然可能會導致數(shù)據(jù)丟失。

acks=all:生產(chǎn)者在發(fā)送消息后需要等待所有副本都確認。只有當所有副本都成功接收到消息后,生產(chǎn)者才會收到確認。這是最安全的確認機制,確保了消息不會丟失,但是需要更多的時間和資源。acks=-1與acks=all是等效的。

public classKafkaProducerDemo{
    public static void main(String[]args){
        // 配置Kafka生產(chǎn)者的參數(shù)
        Propertiesprops=newProperties();
        props.put("bootstrap.servers","localhost:9092");// Kafka集群的地址和端口
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 鍵的序列化器
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 值的序列化器
        props.put("acks","all");// 設置ACK機制為所有副本都確認
        // 創(chuàng)建生產(chǎn)者實例
        KafkaProducer<String,String>producer=newKafkaProducer<>(props);
        // 構造消息
        Stringtopic="my_topic";
        Stringkey="my_key";
        Stringvalue="Hello, Kafka!";
        // 創(chuàng)建消息記錄
        ProducerRecord<String,String>record=newProducerRecord<>(topic,key,value);
        // 發(fā)送消息
        producer.send(record,newCallback(){
            @Override
            publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
                if(exception!=null){
                    System.err.println("發(fā)送消息出現(xiàn)異常:"+exception.getMessage());
                }else{
                    System.out.println("消息發(fā)送成功!位于分區(qū) "+metadata.partition()+",偏移量 "+metadata.offset());
                }
            }
        });
        // 關閉生產(chǎn)者
        producer.close();
    }
}

延遲消息實現(xiàn)

延遲隊列在實際項目中有非常多的應用場景,最常見的比如訂單未支付,超時取消訂單,在創(chuàng)建訂單的時候發(fā)送一條延遲消息,達到延遲時間之后消費者收到消息,如果訂單沒有支付的話,那么就取消訂單。


image-20250508192415995image-20250508192415995

1、RocketMQ實現(xiàn)延遲消息

RocketMQ 默認時間間隔分為 18 個級別,基本上也能滿足大部分場景的需要了。

默認延遲級別:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

使用起來也非常的簡單,直接通過setDelayTimeLevel設置延遲級別即可。

setDelayTimeLevel(level)

實現(xiàn)原理說起來比較簡單,Broker 會根據(jù)不同的延遲級別創(chuàng)建出多個不同級別的隊列,當我們發(fā)送延遲消息的時候,根據(jù)不同的延遲級別發(fā)送到不同的隊列中,同時在 Broker 內(nèi)部通過一個定時器去輪詢這些隊列(RocketMQ 會為每個延遲級別分別創(chuàng)建一個定時任務),如果消息達到發(fā)送時間,那么就直接把消息發(fā)送到指 topic 隊列中。

RocketMQ 這種實現(xiàn)方式是放在服務端去做的,同時有個好處就是相同延遲時間的消息是可以保證有序性的。

談到這里就順便提一下關于消息消費重試的原理,這個本質(zhì)上來說其實是一樣的,對于消費失敗需要重試的消息實際上都會被丟到延遲隊列的 topic 里,到期后再轉(zhuǎn)發(fā)到真正的 topic 中。


image-20250508192539070image-20250508192539070

2、RabbitMQ實現(xiàn)延遲消息

RabbitMQ本身并不存在延遲隊列的概念,在 RabbitMQ 中是通過 DLX 死信交換機和 TTL 消息過期來實現(xiàn)延遲隊列的。

TTL(Time to Live)過期時間

有兩種方式可以設置 TTL。

(1) 通過隊列屬性設置,這樣的話隊列中的所有消息都會擁有相同的過期時間(2) 對消息單獨設置過期時間,這樣每條消息的過期時間都可以不同

那么如果同時設置呢?這樣將會以兩個時間中較小的值為準。

針對隊列的方式通過參數(shù)x-message-ttl來設置。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

針對消息的方式通過setExpiration來設置。

AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());

DLX(Dead Letter Exchange)死信交換機

一個消息要成為死信消息有 3 種情況:

(1) 消息被拒絕,比如調(diào)用reject方法,并且需要設置requeuefalse

(2) 消息過期

(3) 隊列達到最大長度

可以通過參數(shù)dead-letter-exchange設置死信交換機,也可以通過參數(shù)dead-letter- exchange指定 RoutingKey(未指定則使用原隊列的 RoutingKey)。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

實現(xiàn)原理

當我們對消息設置了 TTL 和 DLX 之后,當消息正常發(fā)送,通過 Exchange 到達 Queue 之后,由于設置了 TTL 過期時間,并且消息沒有被消費(訂閱的是死信隊列),達到過期時間之后,消息就轉(zhuǎn)移到與之綁定的 DLX 死信隊列之中。

這樣的話,就相當于通過 DLX 和 TTL 間接實現(xiàn)了延遲消息的功能,實際使用中我們可以根據(jù)不同的延遲級別綁定設置不同延遲時間的隊列來達到實現(xiàn)不同延遲時間的效果。

如果隊列通過 dead-letter-exchange 屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中,這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)。


image-20250508192505250image-20250508192505250

3、Kafka實現(xiàn)延遲消息

對于 Kafka 來說,原生并不支持延遲隊列的功能,需要我們手動去實現(xiàn),這里我根據(jù) RocketMQ 的設計提供一個實現(xiàn)思路。

這個設計,我們也不支持任意時間精度的延遲消息,只支持固定級別的延遲,因為對于大部分延遲消息的場景來說足夠使用了。

只創(chuàng)建一個 topic,但是針對該 topic 創(chuàng)建 18 個 partition,每個 partition 對應不同的延遲級別,這樣做和 RocketMQ 一樣有個好處就是能達到相同延遲時間的消息達到有序性。

應用級 Kafka 延遲消息實現(xiàn)原理

首先創(chuàng)建一個單獨針對延遲隊列的 topic,同時創(chuàng)建 18 個 partition 針對不同的延遲級別。

發(fā)送消息的時候根據(jù)延遲參數(shù)發(fā)送到延遲 topic 對應的 partition,對應的key為延遲時間,同時把原 topic 保存到 header 中。

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));

內(nèi)嵌的consumer單獨設置一個ConsumerGroup去消費延遲 topic 消息,消費到消息之后如果沒有達到延遲時間那么就進行pause,然后seek到當前ConsumerRecordoffset位置,同時使用定時器去輪詢延遲的TopicPartition,達到延遲時間之后進行resume。

如果達到了延遲時間,那么就獲取到header中的真實 topic ,直接轉(zhuǎn)發(fā)。

這里為什么要進行pauseresume呢?

因為如果不這樣的話,如果超時未消費達到max.poll.interval.ms最大時間(默認300s),那么將會觸發(fā) Rebalance。

責任編輯:武曉燕 來源: 技術自由圈
相關推薦

2009-02-16 17:21:46

2023-09-19 08:09:21

RabbitMQRocketMQKafka

2025-02-27 08:50:00

RocketMQ開發(fā)代碼

2017-05-05 10:15:38

深度學習框架對比分析

2010-05-13 13:27:23

2012-11-19 11:30:40

PowerShell常見問題解決方法

2010-06-08 11:15:43

OpenSUSE Ub

2018-04-23 09:50:54

2011-11-23 16:28:07

JavaSpring框架

2018-05-10 12:55:51

大數(shù)據(jù)對比分析面試

2024-08-22 14:49:49

系統(tǒng)設計數(shù)據(jù)庫

2010-06-12 15:36:01

2015-09-22 10:14:57

虛擬化虛擬化問題

2021-08-24 07:57:26

KafkaRocketMQPulsar

2010-08-06 16:15:57

Flex通信

2024-01-09 15:37:46

2010-07-20 16:16:21

SDH

2018-01-26 14:29:01

框架

2018-01-21 14:11:22

人工智能PaddlePaddlTensorflow

2023-05-14 22:00:01

點贊
收藏

51CTO技術棧公眾號