Spring Boot + RabbitMQ 消息100%可靠?3大絕招 + 手動 ACK,徹底告別丟失!
在分布式系統(tǒng)架構(gòu)中,RabbitMQ 作為強大的消息中間件,廣泛應(yīng)用于訂單、庫存、支付等核心業(yè)務(wù)場景。然而,消息丟失問題時有發(fā)生,例如:
- 訂單支付通知丟失導(dǎo)致客戶已付款但系統(tǒng)未更新狀態(tài);
- 庫存扣減消息丟失導(dǎo)致庫存數(shù)據(jù)與實際銷量不一致;
- 消息無法到達(dá)消費者導(dǎo)致業(yè)務(wù)流程中斷,嚴(yán)重影響用戶體驗。
要解決這些問題,我們需要建立 高可靠性的 RabbitMQ 消息傳輸機制,確保消息 生產(chǎn)、存儲、消費 三個環(huán)節(jié)的穩(wěn)定性。本文將基于 Spring Boot 3.4
,介紹 生產(chǎn)者確認(rèn)機制、消息持久化、手動 ACK 三大核心策略,并提供完整的可運行代碼示例,幫助你徹底告別 RabbitMQ 消息丟失。
消息丟失的3大“案發(fā)現(xiàn)場”
生產(chǎn)者消息投遞失敗
- 問題原因網(wǎng)絡(luò)抖動、RabbitMQ 服務(wù)宕機、路由配置錯誤;
- 后果消息未成功發(fā)送,導(dǎo)致數(shù)據(jù)不一致;
- 解決方案生產(chǎn)者 Confirm 模式 + Return 回調(diào) 機制。
MQ 服務(wù)崩潰
- 問題原因RabbitMQ 服務(wù)器故障、磁盤損壞、未開啟消息持久化;
- 后果未持久化的消息在 RabbitMQ 宕機后丟失;
- 解決方案交換機、隊列、消息持久化,保證消息不會因重啟而丟失。
消費者崩潰
- 問題原因:消費者在處理消息時異常退出,或者自動 ACK 機制導(dǎo)致 RabbitMQ 認(rèn)為消息已消費;
- 后果:消息被 RabbitMQ 移除,但實際業(yè)務(wù)未處理成功;
- 解決方案:手動 ACK + 冪等性控制,確保消息消費的可靠性。
生產(chǎn)者可靠性:Confirm模式 + Return機制
啟用Confirm與Return機制
spring:
rabbitmq:
publisher-confirm-type: correlated # 啟用Confirm模式
publisher-returns: true # 啟用Return機制
template:
mandatory: true # 讓生產(chǎn)者接收未被路由的消息通知
實現(xiàn)Confirm回調(diào)(確保消息成功落庫)
package com.icoderoad.mq;
import com.icoderoad.mapper.MessageLogMapper;
import com.icoderoad.model.MessageStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private MessageLogMapper messageLogMapper;
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("? 消息到達(dá) Exchange,ID:{}", correlationData.getId());
messageLogMapper.updateStatus(correlationData.getId(), MessageStatus.SENT);
} else {
log.error("? 消息投遞失敗,ID:{},原因:{}", correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("?? 消息路由失??!交換機:{},路由鍵:{},消息:{}",
returned.getExchange(), returned.getRoutingKey(), new String(returned.getMessage().getBody()));
}
}
發(fā)送消息(攜帶唯一消息ID)
public void sendOrder(Order order) {
String msgId = UUID.randomUUID().toString();
messageLogMapper.insert(new MessageLog(msgId, order, MessageStatus.SENDING));
rabbitTemplate.convertAndSend(
"order-exchange",
"order.create",
order,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setMessageId(msgId);
return message;
},
new CorrelationData(msgId)
);
}
MQ可靠性:隊列/消息持久化
package com.icoderoad;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order-exchange", true, false);
}
@Bean
public Queue orderQueue() {
return new Queue("order.queue", true);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.create");
}
}
消費者可靠性:手動ACK + 冪等性
關(guān)閉自動ACK,改為手動
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 開啟手動ACK
prefetch: 10 # 限制單次拉取消息數(shù)
處理訂單消息(確保冪等性 + 手動ACK
package com.icoderoad.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class OrderConsumer {
@Autowired
private OrderService orderService;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
String redisKey = "order:" + order.getOrderNo();
if (redisTemplate.opsForValue().setIfAbsent(redisKey, "processing", 30, TimeUnit.MINUTES)) {
orderService.process(order);
redisTemplate.delete(redisKey);
channel.basicAck(tag, false);
log.info("?? 訂單處理成功:{}", order.getOrderNo());
} else {
log.warn("?? 訂單正在處理中,直接ACK:{}", order.getOrderNo());
channel.basicAck(tag, false);
}
} catch (Exception e) {
log.error("? 訂單處理失?。簕}", order.getOrderNo(), e);
channel.basicNack(tag, false, true);
}
}
}
全鏈路監(jiān)控(生產(chǎn)環(huán)境推薦)
- 消息追蹤記錄消息流轉(zhuǎn)狀態(tài)
- 死信隊列避免消息無限重試
- 消息監(jiān)控Grafana監(jiān)控消息積壓、ACK率、重試次數(shù)
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.deadLetterExchange("dlx.exchange")
.deadLetterRoutingKey("dlx.order")
.build();
}
核心配置一覽:
配置項 | 作用 |
| 確認(rèn)消息到達(dá)Exchange |
| 監(jiān)聽未路由消息 |
| 確保消息重啟后不丟失 |
| 關(guān)閉自動ACK,使用手動確認(rèn) |
| 確保消息持久化 |
結(jié)論
在高并發(fā)分布式系統(tǒng)中,RabbitMQ 消息可靠性 直接關(guān)系到業(yè)務(wù)數(shù)據(jù)的完整性和一致性。本篇文章介紹了 Spring Boot 3.4 下 RabbitMQ 100% 可靠消息傳輸方案,核心思路包括:
- 生產(chǎn)者端采用 Confirm 機制 + Return 回調(diào),確保消息成功到達(dá) RabbitMQ;
- RabbitMQ 服務(wù)器端開啟 隊列、交換機持久化,防止消息因宕機丟失;
- 消費者端使用 手動 ACK + 冪等控制,確保消息被正確消費。
此外,我們還提到 死信隊列 及 消息軌跡監(jiān)控 作為生產(chǎn)級增強方案,進(jìn)一步提升系統(tǒng)穩(wěn)定性。如果你在生產(chǎn)環(huán)境中使用 RabbitMQ,建議你根據(jù)本篇內(nèi)容進(jìn)行配置,確保消息 零丟失、零誤判,構(gòu)建更加健壯的消息隊列架構(gòu)。