在電商等系統(tǒng)中,下單后需要進行一系列的處理,包括庫存扣減、支付通知等??梢詫⒂唵蜗嚓P(guān)的消息發(fā)送到消息隊列中,由消費者異步地進行處理,從而提高系統(tǒng)的處理能力和可靠性。

Spring Boot 是一個基于 Spring 框架的快速開發(fā)框架,而 RabbitMQ 和 RocketMQ 則是常用的消息隊列中間件。下面是它們常用的一些用法和場景。
1、訂單處理
在電商等系統(tǒng)中,下單后需要進行一系列的處理,包括庫存扣減、支付通知等??梢詫⒂唵蜗嚓P(guān)的消息發(fā)送到消息隊列中,由消費者異步地進行處理,從而提高系統(tǒng)的處理能力和可靠性。
使用 RabbitMQ 實現(xiàn)訂單處理的示例代碼:
// 發(fā)送訂單消息
rabbitTemplate.convertAndSend("order-exchange", "order-routing-key", order);
// 消費訂單消息
@RabbitListener(queues = "order-queue")
public void handleOrderMessage(Order order) {
// 處理訂單相關(guān)的業(yè)務(wù)邏輯
}
使用 RocketMQ 實現(xiàn)訂單處理的示例代碼:
// 發(fā)送訂單消息
DefaultMQProducer producer = new DefaultMQProducer("order-producer-group");
producer.setNamesrvAddr("localhost:9876");
Message message = new Message("order-topic", "order-tag", order.toString().getBytes());
producer.send(message);
// 消費訂單消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order-topic", "order-tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// 處理訂單相關(guān)的業(yè)務(wù)邏輯
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
2、日志處理
在分布式系統(tǒng)中,各個節(jié)點產(chǎn)生的日志需要進行集中處理和存儲,以便后續(xù)的分析和監(jiān)控??梢允褂孟㈥犃袑⑷罩鞠l(fā)送到中心化的日志系統(tǒng)中。
使用 RabbitMQ 實現(xiàn)日志處理的示例代碼:
// 發(fā)送日志消息
rabbitTemplate.convertAndSend("log-exchange", "log-routing-key", log);
// 消費日志消息
@RabbitListener(queues = "log-queue")
public void handleLogMessage(Log log) {
// 處理日志相關(guān)的業(yè)務(wù)邏輯
}
使用 RocketMQ 實現(xiàn)日志處理的示例代碼:
// 發(fā)送日志消息
DefaultMQProducer producer = new DefaultMQProducer("log-producer-group");
producer.setNamesrvAddr("localhost:9876");
Message message = new Message("log-topic", "log-tag", log.toString().getBytes());
producer.send(message);
// 消費日志消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("log-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("log-topic", "log-tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// 處理日志相關(guān)的業(yè)務(wù)邏輯
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
3、分布式事務(wù)
在分布式系統(tǒng)中,不同的服務(wù)之間需要進行事務(wù)管理,以保證數(shù)據(jù)的一致性和可靠性??梢允褂孟㈥犃衼韺崿F(xiàn)分布式事務(wù)的消息確認(rèn)和回滾。
使用 RabbitMQ 實現(xiàn)分布式事務(wù)的示例代碼:
// 發(fā)送訂單消息和庫存消息
rabbitTemplate.invoke(new RabbitCallback<Void>() {
public Void doInRabbit(Channel channel) throws Exception {
channel.txSelect();
channel.basicPublish("order-exchange", "order-routing-key", null, order.toString().getBytes());
channel.basicPublish("stock-exchange", "stock-routing-key", null, stock.toString().getBytes());
channel.txCommit();
return null;
}
});
// 消費訂單消息和庫存消息
@RabbitListener(queues = "order-queue")
@Transactional
public void handleOrderMessage(Order order) {
// 處理訂單相關(guān)的業(yè)務(wù)邏輯
}
@RabbitListener(queues = "stock-queue")
@Transactional
public void handleStockMessage(Stock stock) {
// 處理庫存相關(guān)的業(yè)務(wù)邏輯
}
使用 RocketMQ 實現(xiàn)分布式事務(wù)的示例代碼:
// 發(fā)送訂單消息和庫存消息
TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
// 執(zhí)行本地事務(wù)
return LocalTransactionState.COMMIT_MESSAGE;
}
public LocalTransactionState checkLocalTransaction(MessageExt message) {
// 檢查本地事務(wù)狀態(tài)
return LocalTransactionState.COMMIT_MESSAGE;
}
});
Message message1 = new Message("order-topic", "order-tag", order.toString().getBytes());
Message message2 = new Message("stock-topic", "stock-tag", stock.toString().getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(Arrays.asList(message1, message2), null);
// 消費訂單消息和庫存消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order-topic", "order-tag");
consumer.subscribe("stock-topic", "stock-tag");
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages, ConsumeOrderlyContext context) {
for (MessageExt message : messages) {
// 處理訂單或庫存相關(guān)的業(yè)務(wù)邏輯
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
4、消息隊列比較
RabbitMQ 和 RocketMQ 都是常用的消息隊列中間件,它們在特點和使用上有些區(qū)別。
- RabbitMQ 采用 AMQP(高級消息隊列協(xié)議),支持多種編程語言和多種操作系統(tǒng)。RabbitMQ 的消息傳輸可靠性較高,但性能較低。
- RocketMQ 采用自定義的協(xié)議,適合 Java 應(yīng)用,支持高吞吐量和高可用性。RocketMQ 的消息傳輸可靠性較低,但性能較高。