四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?
微服務(wù)可以設(shè)計(jì)成消息驅(qū)動(dòng)的微服務(wù),響應(yīng)式系統(tǒng)也可以基于消息中間件來做,從這個(gè)角度來說,在互聯(lián)網(wǎng)應(yīng)用開發(fā)中,消息中間件真的是太重要了。
今天,以 RabbitMQ 為例,松哥來和大家聊一聊消息中間消息發(fā)送可靠性的問題。
注意,以下內(nèi)容我主要和大家討論如何確保消息生產(chǎn)者將消息發(fā)送成功,并不涉及消息消費(fèi)的問題。
1. RabbitMQ 消息發(fā)送機(jī)制
大家知道,RabbitMQ 中的消息發(fā)送引入了 Exchange(交換機(jī))的概念,消息的發(fā)送首先到達(dá)交換機(jī)上,然后再根據(jù)既定的路由規(guī)則,由交換機(jī)將消息路由到不同的 Queue(隊(duì)列)中,再由不同的消費(fèi)者去消費(fèi)。
大致的流程就是這樣,所以要確保消息發(fā)送的可靠性,主要從兩方面去確認(rèn):
- 消息成功到達(dá) Exchange
- 消息成功到達(dá) Queue
如果能確認(rèn)這兩步,那么我們就可以認(rèn)為消息發(fā)送成功了。
如果這兩步中任一步驟出現(xiàn)問題,那么消息就沒有成功送達(dá),此時(shí)我們可能要通過重試等方式去重新發(fā)送消息,多次重試之后,如果消息還是不能到達(dá),則可能就需要人工介入了。
經(jīng)過上面的分析,我們可以確認(rèn),要確保消息成功發(fā)送,我們只需要做好三件事就可以了:
- 確認(rèn)消息到達(dá) Exchange。
- 確認(rèn)消息到達(dá) Queue。
- 開啟定時(shí)任務(wù),定時(shí)投遞那些發(fā)送失敗的消息。
2. RabbitMQ 的努力
上面提出的三個(gè)步驟,第三步需要我們自己實(shí)現(xiàn),前兩步 RabbitMQ 則有現(xiàn)成的解決方案。
如何確保消息成功到達(dá) RabbitMQ?RabbitMQ 給出了兩種方案:
- 開啟事務(wù)機(jī)制
- 發(fā)送方確認(rèn)機(jī)制
這是兩種不同的方案,不可以同時(shí)開啟,只能選擇其中之一,如果兩者同時(shí)開啟,則會(huì)報(bào)如下錯(cuò)誤:
我們分別來看。以下所有案例都在 Spring Boot 中展開,文末可以下載相關(guān)源碼。
2.1 開啟事務(wù)機(jī)制
Spring Boot 中開啟 RabbitMQ 事務(wù)機(jī)制的方式如下:
首先需要先提供一個(gè)事務(wù)管理器,如下:
- @Bean
- RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
- return new RabbitTransactionManager(connectionFactory);
- }
接下來,在消息生產(chǎn)者上面做兩件事:添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式:
- @Service
- public class MsgService {
- @Autowired
- RabbitTemplate rabbitTemplate;
- @Transactional
- public void send() {
- rabbitTemplate.setChannelTransacted(true);
- rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
- int i = 1 / 0;
- }
- }
這里注意兩點(diǎn):
- 發(fā)送消息的方法上添加 @Transactional 注解標(biāo)記事務(wù)。
- 調(diào)用 setChannelTransacted 方法設(shè)置為 true 開啟事務(wù)模式。
這就 OK 了。
在上面的案例中,我們在結(jié)尾來了個(gè) 1/0 ,這在運(yùn)行時(shí)必然拋出異常,我們可以嘗試運(yùn)行該方法,發(fā)現(xiàn)消息并未發(fā)送成功。
當(dāng)我們開啟事務(wù)模式之后,RabbitMQ 生產(chǎn)者發(fā)送消息會(huì)多出四個(gè)步驟:
- 客戶端發(fā)出請求,將信道設(shè)置為事務(wù)模式。
- 服務(wù)端給出回復(fù),同意將信道設(shè)置為事務(wù)模式。
- 客戶端發(fā)送消息。
- 客戶端提交事務(wù)。
- 服務(wù)端給出響應(yīng),確認(rèn)事務(wù)提交。
上面的步驟,除了第三步是本來就有的,其他幾個(gè)步驟都是平白無故多出來的。所以大家看到,事務(wù)模式其實(shí)效率有點(diǎn)低,這并非一個(gè)最佳解決方案。我們可以想想,什么項(xiàng)目會(huì)用到消息中間件?一般來說都是一些高并發(fā)的項(xiàng)目,這個(gè)時(shí)候并發(fā)性能尤為重要。
所以,RabbitMQ 還提供了發(fā)送方確認(rèn)機(jī)制(publisher confirm)來確保消息發(fā)送成功,這種方式,性能要遠(yuǎn)遠(yuǎn)高于事務(wù)模式,一起來看下。
2.2 發(fā)送方確認(rèn)機(jī)制
2.2.1 單條消息處理
首先我們移除剛剛關(guān)于事務(wù)的代碼,然后在 application.properties 中配置開啟消息發(fā)送方確認(rèn)機(jī)制,如下:
- spring.rabbitmq.publisher-confirm-type=correlated
- spring.rabbitmq.publisher-returns=true
第一行是配置消息到達(dá)交換器的確認(rèn)回調(diào),第二行則是配置消息到達(dá)隊(duì)列的回調(diào)。
第一行屬性的配置有三個(gè)取值:
- none:表示禁用發(fā)布確認(rèn)模式,默認(rèn)即此。
- correlated:表示成功發(fā)布消息到交換器后會(huì)觸發(fā)的回調(diào)方法。
- simple:類似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的調(diào)用。
接下來我們要開啟兩個(gè)監(jiān)聽,具體配置如下:
- @Configuration
- public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
- public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
- public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
- private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @Bean
- Queue queue() {
- return new Queue(JAVABOY_QUEUE_NAME);
- }
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange(JAVABOY_EXCHANGE_NAME);
- }
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(directExchange())
- .with(JAVABOY_QUEUE_NAME);
- }
- @PostConstruct
- public void initRabbitTemplate() {
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnsCallback(this);
- }
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack) {
- logger.info("{}:消息成功到達(dá)交換器",correlationData.getId());
- }else{
- logger.error("{}:消息發(fā)送失敗", correlationData.getId());
- }
- }
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- logger.error("{}:消息未成功路由到隊(duì)列",returned.getMessage().getMessageProperties().getMessageId());
- }
- }
關(guān)于這個(gè)配置類,我說如下幾點(diǎn):
- 定義配置類,實(shí)現(xiàn) RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 兩個(gè)接口,這兩個(gè)接口,前者的回調(diào)用來確定消息到達(dá)交換器,后者則會(huì)在消息路由到隊(duì)列失敗時(shí)被調(diào)用。
- 定義 initRabbitTemplate 方法并添加 @PostConstruct 注解,在該方法中為 rabbitTemplate 分別配置這兩個(gè) Callback。
這就可以了。
接下來我們對(duì)消息發(fā)送進(jìn)行測試。
首先我們嘗試將消息發(fā)送到一個(gè)不存在的交換機(jī)中,像下面這樣:
- rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
注意第一個(gè)參數(shù)是一個(gè)字符串,不是變量,這個(gè)交換器并不存在,此時(shí)控制臺(tái)會(huì)報(bào)如下錯(cuò)誤:
接下來我們給定一個(gè)真實(shí)存在的交換器,但是給一個(gè)不存在的隊(duì)列,像下面這樣:
- rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
注意此時(shí)第二個(gè)參數(shù)是一個(gè)字符串,不是變量。
可以看到,消息雖然成功達(dá)到交換器了,但是沒有成功路由到隊(duì)列(因?yàn)殛?duì)列不存在)。
這是一條消息的發(fā)送,我們再來看看消息的批量發(fā)送。
2.2.2 消息批量處理
如果是消息批量處理,那么發(fā)送成功的回調(diào)監(jiān)聽是一樣的,這里不再贅述。
這就是 publisher-confirm 模式。
相比于事務(wù),這種模式下的消息吞吐量會(huì)得到極大的提升。
3. 失敗重試
失敗重試分兩種情況,一種是壓根沒找到 MQ 導(dǎo)致的失敗重試,另一種是找到 MQ 了,但是消息發(fā)送失敗了。
兩種重試我們分別來看。
3.1 自帶重試機(jī)制
前面所說的事務(wù)機(jī)制和發(fā)送方確認(rèn)機(jī)制,都是發(fā)送方確認(rèn)消息發(fā)送成功的辦法。如果發(fā)送方一開始就連不上 MQ,那么 Spring Boot 中也有相應(yīng)的重試機(jī)制,但是這個(gè)重試機(jī)制就和 MQ 本身沒有關(guān)系了,這是利用 Spring 中的 retry 機(jī)制來完成的,具體配置如下:
- spring.rabbitmq.template.retry.enabled=true
- spring.rabbitmq.template.retry.initial-interval=1000ms
- spring.rabbitmq.template.retry.max-attempts=10
- spring.rabbitmq.template.retry.max-interval=10000ms
- spring.rabbitmq.template.retry.multiplier=2
從上往下配置含義依次是:
- 開啟重試機(jī)制。
- 重試起始間隔時(shí)間。
- 最大重試次數(shù)。
- 最大重試間隔時(shí)間。
間隔時(shí)間乘數(shù)。(這里配置間隔時(shí)間乘數(shù)為 2,則第一次間隔時(shí)間 1 秒,第二次重試間隔時(shí)間 2 秒,第三次 4 秒,以此類推)
配置完成后,再次啟動(dòng) Spring Boot 項(xiàng)目,然后關(guān)掉 MQ,此時(shí)嘗試發(fā)送消息,就會(huì)發(fā)送失敗,進(jìn)而導(dǎo)致自動(dòng)重試。
3.2 業(yè)務(wù)重試
業(yè)務(wù)重試主要是針對(duì)消息沒有到達(dá)交換器的情況。
如果消息沒有成功到達(dá)交換器,根據(jù)我們第二小節(jié)的講解,此時(shí)就會(huì)觸發(fā)消息發(fā)送失敗回調(diào),在這個(gè)回調(diào)中,我們就可以做文章了!
整體思路是這樣:
首先創(chuàng)建一張表,用來記錄發(fā)送到中間件上的消息,像下面這樣:
每次發(fā)送消息的時(shí)候,就往數(shù)據(jù)庫中添加一條記錄。這里的字段都很好理解,有三個(gè)我額外說下:
- status:表示消息的狀態(tài),有三個(gè)取值,0,1,2 分別表示消息發(fā)送中、消息發(fā)送成功以及消息發(fā)送失敗。
- tryTime:表示消息的第一次重試時(shí)間(消息發(fā)出去之后,在 tryTime 這個(gè)時(shí)間點(diǎn)還未顯示發(fā)送成功,此時(shí)就可以開始重試了)。
- count:表示消息重試次數(shù)。
其他字段都很好理解,我就不一一啰嗦了。
在消息發(fā)送的時(shí)候,我們就往該表中保存一條消息發(fā)送記錄,并設(shè)置狀態(tài) status 為 0,tryTime 為 1 分鐘之后。
在 confirm 回調(diào)方法中,如果收到消息發(fā)送成功的回調(diào),就將該條消息的 status 設(shè)置為1(在消息發(fā)送時(shí)為消息設(shè)置 msgId,在消息發(fā)送成功回調(diào)時(shí),通過 msgId 來唯一鎖定該條消息)。
另外開啟一個(gè)定時(shí)任務(wù),定時(shí)任務(wù)每隔 10s 就去數(shù)據(jù)庫中撈一次消息,專門去撈那些 status 為 0 并且已經(jīng)過了 tryTime 時(shí)間記錄,把這些消息拎出來后,首先判斷其重試次數(shù)是否已超過 3 次,如果超過 3 次,則修改該條消息的 status 為 2,表示這條消息發(fā)送失敗,并且不再重試。對(duì)于重試次數(shù)沒有超過 3 次的記錄,則重新去發(fā)送消息,并且為其 count 的值+1。
大致的思路就是上面這樣,松哥這里就不給出代碼了,松哥的 vhr 里邊郵件發(fā)送就是這樣的思路來處理的,完整代碼大家可以參考 vhr 項(xiàng)目(https://github.com/lenve/vhr)。
當(dāng)然這種思路有兩個(gè)弊端:
- 去數(shù)據(jù)庫走一遭,可能拖慢 MQ 的 Qos,不過有的時(shí)候我們并不需要 MQ 有很高的 Qos,所以這個(gè)應(yīng)用時(shí)要看具體情況。
- 按照上面的思路,可能會(huì)出現(xiàn)同一條消息重復(fù)發(fā)送的情況,不過這都不是事,我們在消息消費(fèi)時(shí),解決好冪等性問題就行了。
當(dāng)然,大家也要注意,消息是否要確保 100% 發(fā)送成功,也要看具體情況。
4. 小結(jié)
好啦,這就是關(guān)于消息生產(chǎn)者的一些常見問題以及對(duì)應(yīng)的解決方案,下篇文章松哥和大家探討如果保證消息消費(fèi)成功并解決冪等性問題。
本文涉及到的相關(guān)源代碼大家可以在這里下載:https://github.com/lenve/javaboy-code-samples。
本文轉(zhuǎn)載自微信公眾號(hào)「江南一點(diǎn)雨」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系江南一點(diǎn)雨公眾號(hào)。