MQ黃金三劍客 Rabbit Rocket Kafka深入解密常見(jiàn)問(wèn)題及功能對(duì)比指南
1、消息丟失問(wèn)題
RabbitMQ解決消息丟失的問(wèn)題:
- RabbitMQ通過(guò)消息持久化和消息確認(rèn)機(jī)制來(lái)確保消息的可靠傳遞。生產(chǎn)者可以選擇將消息標(biāo)記為持久化,使得即使在消息隊(duì)列服務(wù)器故障后,消息也能被保存并傳遞給消費(fèi)者。
- RabbitMQ還提供了多種消息確認(rèn)機(jī)制,如發(fā)布確認(rèn)(Publish Confirm)和事務(wù)機(jī)制(Transaction),生產(chǎn)者可以通過(guò)這些機(jī)制獲取消息是否成功被RabbitMQ接收和處理的確認(rèn)。
RocketMQ解決消息丟失的問(wèn)題:
- RocketMQ通過(guò)持久化存儲(chǔ)和副本機(jī)制來(lái)保證消息的可靠傳遞。消息在發(fā)送前會(huì)被持久化存儲(chǔ)到磁盤上,即使在消息服務(wù)器故障時(shí)也能夠恢復(fù)消息。
- RocketMQ支持多副本機(jī)制,將消息復(fù)制到多個(gè)Broker節(jié)點(diǎn)上,即使其中一個(gè)Broker節(jié)點(diǎn)發(fā)生故障,仍然可以從其他副本節(jié)點(diǎn)讀取和傳遞消息。
Kafka解決消息丟失的問(wèn)題:
- Kafka通過(guò)持久化存儲(chǔ)和副本機(jī)制來(lái)保證消息的可靠傳遞。消息在發(fā)送前被持久化存儲(chǔ)到磁盤上,即使在服務(wù)器重啟后也不會(huì)丟失。
- Kafka采用多副本機(jī)制,將消息復(fù)制到多個(gè)Broker節(jié)點(diǎn)上,即使其中一個(gè)Broker節(jié)點(diǎn)故障,仍然可以從其他副本節(jié)點(diǎn)讀取和傳遞消息。
2、消息積壓?jiǎn)栴}
RabbitMQ解決消息積壓的問(wèn)題:
- RabbitMQ通過(guò)調(diào)整消費(fèi)者的消費(fèi)速率來(lái)控制消息積壓??梢允褂肣oS(Quality of Service)機(jī)制設(shè)置每個(gè)消費(fèi)者的預(yù)取計(jì)數(shù),限制每次從隊(duì)列中獲取的消息數(shù)量,以提升消費(fèi)者的處理速度。
- RabbitMQ還支持消費(fèi)者端的流量控制,通過(guò)設(shè)置basic.qos參數(shù)來(lái)提升消費(fèi)者的處理速度,避免消息過(guò)多導(dǎo)致積壓。
RocketMQ解決消息積壓的問(wèn)題:
- RocketMQ通過(guò)動(dòng)態(tài)提升消費(fèi)者的消費(fèi)速率來(lái)控制消息積壓??梢愿鶕?jù)系統(tǒng)的負(fù)載情況和消息隊(duì)列的堆積情況,動(dòng)態(tài)調(diào)整消費(fèi)者的并發(fā)消費(fèi)線程數(shù),以適應(yīng)消息的處理需求。
- RocketMQ還提供了消息拉取和推拉模式,消費(fèi)者可以根據(jù)自身的處理能力主動(dòng)拉取消息,避免消息積壓過(guò)多。
Kafka解決消息積壓的問(wèn)題:
- Kafka通過(guò)分區(qū)和副本機(jī)制來(lái)實(shí)現(xiàn)消息的并行處理和負(fù)載均衡。可以根據(jù)消息的負(fù)載情況和消費(fèi)者的處理能力,通過(guò)增加分區(qū)數(shù)量、調(diào)整副本分配策略等方式來(lái)提高系統(tǒng)的處理能力。
- Kafka還提供了消息清理(compaction)和數(shù)據(jù)保留策略,可以根據(jù)時(shí)間或者數(shù)據(jù)大小來(lái)自動(dòng)刪除過(guò)期的消息,避免消息積壓過(guò)多。
3、消息重復(fù)消費(fèi)問(wèn)題
RabbitMQ:
- 冪等性處理:在消費(fèi)者端實(shí)現(xiàn)冪等性邏輯,即無(wú)論消息被消費(fèi)多少次,最終的結(jié)果應(yīng)該保持一致。這可以通過(guò)在消費(fèi)端進(jìn)行唯一標(biāo)識(shí)的檢查或者記錄已經(jīng)處理過(guò)的消息來(lái)實(shí)現(xiàn)。
- 消息確認(rèn)機(jī)制:消費(fèi)者在處理完消息后,發(fā)送確認(rèn)消息(ACK)給RabbitMQ,告知消息已經(jīng)成功處理。RabbitMQ根據(jù)接收到的確認(rèn)消息來(lái)判斷是否需要重新投遞消息給其他消費(fèi)者。
RocketMQ:
- 使用消息唯一標(biāo)識(shí)符(Message ID):在消息發(fā)送時(shí),為每條消息附加一個(gè)唯一標(biāo)識(shí)符。消費(fèi)者在處理消息時(shí),可以通過(guò)判斷消息唯一標(biāo)識(shí)符來(lái)避免重復(fù)消費(fèi)??梢詫⑾D記錄在數(shù)據(jù)庫(kù)或緩存中,用于去重檢查。
- 消費(fèi)者端去重處理:消費(fèi)者在消費(fèi)消息時(shí),可以通過(guò)維護(hù)一個(gè)已消費(fèi)消息的列表或緩存,來(lái)避免重復(fù)消費(fèi)已經(jīng)處理過(guò)的消息。
Kafka:
- 冪等性處理:在消費(fèi)者端實(shí)現(xiàn)冪等性邏輯,即多次消費(fèi)同一條消息所產(chǎn)生的結(jié)果與單次消費(fèi)的結(jié)果一致。這可以通過(guò)在業(yè)務(wù)邏輯中引入唯一標(biāo)識(shí)符或記錄已處理消息的狀態(tài)來(lái)實(shí)現(xiàn)。
- 消息確認(rèn)機(jī)制:消費(fèi)者在處理完消息后,提交已消費(fèi)的偏移量(Offset)給Kafka,Kafka會(huì)記錄已提交的偏移量,以便在消費(fèi)者重新啟動(dòng)時(shí)從正確的位置繼續(xù)消費(fèi)。消費(fèi)者可以定期提交偏移量,確保消息只被消費(fèi)一次。
4、消息順序性
rabbitmq 的消息順序性主要依賴于以下幾個(gè)方面:
- 單個(gè)隊(duì)列:rabbitmq 保證了同一個(gè)隊(duì)列中的消息按照發(fā)布的順序進(jìn)入和出隊(duì)。
rokcetmq 的消息順序性主要依賴于以下幾個(gè)方面:
- 有序分區(qū):rokcetmq 保證了同一個(gè)隊(duì)列(topic + queueId)中的消息按照發(fā)布的順序存儲(chǔ)和消費(fèi)。
kafka 的消息順序性主要依賴于以下幾個(gè)方面:
- 有序分區(qū):kafka 保證了同一個(gè)分區(qū)(topic + partition)中的消息按照發(fā)布的順序存儲(chǔ)和消費(fèi)。
5、事務(wù)消息
RabbitMQ的事務(wù)消息:
- RabbitMQ支持事務(wù)消息的發(fā)送和確認(rèn)。在發(fā)送消息之前,可以通過(guò)調(diào)用"channel.txSelect()"來(lái)開(kāi)啟事務(wù),然后將要發(fā)送的消息發(fā)布到交換機(jī)中。如果事務(wù)成功提交,消息將被發(fā)送到隊(duì)列,否則事務(wù)會(huì)回滾,消息不會(huì)被發(fā)送。
- 在消費(fèi)端,可以通過(guò)"channel.txSelect()"開(kāi)啟事務(wù),然后使用"basicAck"手動(dòng)確認(rèn)消息的處理結(jié)果。如果事務(wù)成功提交,消費(fèi)端會(huì)發(fā)送ACK確認(rèn)消息的處理;否則,事務(wù)回滾,消息將被重新投遞。
public class RabbitMQTransactionDemo {
private static final String QUEUE_NAME = "transaction_queue";
public static void main(String[] args) {
try {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 創(chuàng)建信道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
// 開(kāi)啟事務(wù)
channel.txSelect();
// 發(fā)送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 提交事務(wù)
channel.txCommit();
} catch (Exception e) {
// 事務(wù)回滾
channel.txRollback();
e.printStackTrace();
}
// 關(guān)閉信道和連接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
RocketMQ的事務(wù)消息:
- RocketMQ提供了事務(wù)消息的機(jī)制,確保消息的可靠性和一致性。發(fā)送事務(wù)消息時(shí),需要將消息發(fā)送到半消息隊(duì)列,然后執(zhí)行本地事務(wù)邏輯。事務(wù)執(zhí)行成功后,通過(guò)調(diào)用"TransactionStatus.CommitTransaction"提交事務(wù)消息;若事務(wù)執(zhí)行失敗,則通過(guò)調(diào)用"TransactionStatus.RollbackTransaction"回滾事務(wù)消息。事務(wù)消息的最終狀態(tài)由消息生產(chǎn)者根據(jù)事務(wù)執(zhí)行結(jié)果進(jìn)行確認(rèn)。
public class RocketMQTransactionDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建事務(wù)消息生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("group_name");
producer.setNamesrvAddr("localhost:9876");
// 設(shè)置事務(wù)監(jiān)聽(tīng)器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務(wù)邏輯,根據(jù)業(yè)務(wù)邏輯結(jié)果返回相應(yīng)的狀態(tài)
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務(wù)提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務(wù)回滾
// 返回 LocalTransactionState.UNKNOW 表示事務(wù)狀態(tài)未知
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根據(jù)消息的狀態(tài),來(lái)判斷本地事務(wù)的最終狀態(tài)
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務(wù)提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務(wù)回滾
// 返回 LocalTransactionState.UNKNOW 表示事務(wù)狀態(tài)未知
}
});
// 啟動(dòng)事務(wù)消息生產(chǎn)者
producer.start();
// 構(gòu)造消息
Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes());
// 發(fā)送事務(wù)消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("Send Result: " + sendResult);
// 關(guān)閉事務(wù)消息生產(chǎn)者
producer.shutdown();
}
}
Kafka的事務(wù)消息:
- Kafka引入了事務(wù)功能來(lái)確保消息的原子性和一致性。事務(wù)消息的發(fā)送和確認(rèn)在生產(chǎn)者端進(jìn)行。生產(chǎn)者可以通過(guò)初始化事務(wù),將一系列的消息寫入事務(wù),然后通過(guò)"commitTransaction()"提交事務(wù),或者通過(guò)"abortTransaction()"中止事務(wù)。Kafka會(huì)保證在事務(wù)提交之前,寫入的所有消息不會(huì)被消費(fèi)者可見(jiàn),以保持事務(wù)的一致性。
public class KafkaTransactionDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id");
Producer<String, String> producer = new KafkaProducer<>(props);
// 初始化事務(wù)
producer.initTransactions();
try {
// 開(kāi)啟事務(wù)
producer.beginTransaction();
// 發(fā)送消息
ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!");
producer.send(record);
// 提交事務(wù)
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 處理異常情況
producer.close();
} finally {
producer.close();
}
}
}
6、ACK機(jī)制
RabbitMQ的ACK機(jī)制:
RabbitMQ使用ACK(消息確認(rèn))機(jī)制來(lái)確保消息的可靠傳遞。消費(fèi)者收到消息后,需要向RabbitMQ發(fā)送ACK來(lái)確認(rèn)消息的處理狀態(tài)。只有在收到ACK后,RabbitMQ才會(huì)將消息標(biāo)記為已成功傳遞,否則會(huì)將消息重新投遞給其他消費(fèi)者或者保留在隊(duì)列中。
以下是RabbitMQ ACK的Java示例:
public class RabbitMQAckDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 創(chuàng)建信道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
String queueName = "queue_name";
channel.queueDeclare(queueName, false, false, false, null);
// 創(chuàng)建消費(fèi)者
String consumerTag = "consumer_tag";
boolean autoAck = false; // 關(guān)閉自動(dòng)ACK
// 消費(fèi)消息
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費(fèi)消息
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
// 模擬處理消息的業(yè)務(wù)邏輯
processMessage(message);
// 手動(dòng)發(fā)送ACK確認(rèn)消息
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 處理消息異常,可以選擇重試或者記錄日志等操作
System.out.println("Failed to process message: " + message);
e.printStackTrace();
// 手動(dòng)發(fā)送NACK拒絕消息,并可選是否重新投遞
long deliveryTag = envelope.getDeliveryTag();
boolean requeue = true; // 重新投遞消息
channel.basicNack(deliveryTag, false, requeue);
}
}
});
}
private static void processMessage(String message) {
// 模擬處理消息的業(yè)務(wù)邏輯
}
}
RocketMQ的ACK機(jī)制:
RocketMQ的ACK機(jī)制由消費(fèi)者控制,消費(fèi)者從消息隊(duì)列中消費(fèi)消息后,可以手動(dòng)發(fā)送ACK確認(rèn)消息的處理狀態(tài)。只有在收到ACK后,RocketMQ才會(huì)將消息標(biāo)記為已成功消費(fèi),否則會(huì)將消息重新投遞給其他消費(fèi)者。
以下是RocketMQ ACK的Java示例:
public class RocketMQAckDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
// 訂閱消息
consumer.subscribe("topic_name", "*");
// 注冊(cè)消息監(jiān)聽(tīng)器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt message : msgs) {
try {
// 消費(fèi)消息
String msgBody = new String(message.getBody(), "UTF-8");
System.out.println("Received message: " + msgBody);
// 模擬處理消息的業(yè)務(wù)邏輯
processMessage(msgBody);
// 手動(dòng)發(fā)送ACK確認(rèn)消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 處理消息異常,可以選擇重試或者記錄日志等操作
System.out.println("Failed to process message: " + new String(message.getBody()));
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 啟動(dòng)消費(fèi)者
consumer.start();
}
private static void processMessage(String message) {
// 模擬處理消息的業(yè)務(wù)邏輯
}
}
Kafka的ACK機(jī)制:
Kafka的ACK機(jī)制用于控制生產(chǎn)者在發(fā)送消息后,需要等待多少個(gè)副本確認(rèn)才視為消息發(fā)送成功。這個(gè)機(jī)制可以通過(guò)設(shè)置acks參數(shù)來(lái)進(jìn)行配置。
在Kafka中,acks參數(shù)有三個(gè)可選值:
- acks=0:生產(chǎn)者在發(fā)送消息后不需要等待任何確認(rèn),直接將消息發(fā)送給Kafka集群。這種方式具有最高的吞吐量,但是也存在數(shù)據(jù)丟失的風(fēng)險(xiǎn),因?yàn)樯a(chǎn)者不會(huì)知道消息是否成功發(fā)送給任何副本。
- acks=1:生產(chǎn)者在發(fā)送消息后只需要等待首領(lǐng)副本(leader replica)確認(rèn)。一旦首領(lǐng)副本成功接收到消息,生產(chǎn)者就會(huì)收到確認(rèn)。這種方式提供了一定的可靠性,但是如果首領(lǐng)副本在接收消息后但在確認(rèn)之前發(fā)生故障,仍然可能會(huì)導(dǎo)致數(shù)據(jù)丟失。
- acks=all:生產(chǎn)者在發(fā)送消息后需要等待所有副本都確認(rèn)。只有當(dāng)所有副本都成功接收到消息后,生產(chǎn)者才會(huì)收到確認(rèn)。這是最安全的確認(rèn)機(jī)制,確保了消息不會(huì)丟失,但是需要更多的時(shí)間和資源。acks=-1與acks=all是等效的。
下面是一個(gè)使用Java編寫的Kafka生產(chǎn)者示例代碼:
public class KafkaProducerDemo {
public static void main(String[] args) {
// 配置Kafka生產(chǎn)者的參數(shù)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群的地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 鍵的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
props.put("acks", "all"); // 設(shè)置ACK機(jī)制為所有副本都確認(rèn)
// 創(chuàng)建生產(chǎn)者實(shí)例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 構(gòu)造消息
String topic = "my_topic";
String key = "my_key";
String value = "Hello, Kafka!";
// 創(chuàng)建消息記錄
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// 發(fā)送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("發(fā)送消息出現(xiàn)異常:" + exception.getMessage());
} else {
System.out.println("消息發(fā)送成功!位于分區(qū) " + metadata.partition() + ",偏移量 " + metadata.offset());
}
}
});
// 關(guān)閉生產(chǎn)者
producer.close();
}
}