自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RabbitMQ 高可用之如何確保消息成功消費

開發(fā) 架構(gòu)
為了保證消息能夠可靠的到達消息消費者,RabbitMQ 中提供了消息消費確認機制。當消費者去消費消息的時候,可以通過指定 autoAck 參數(shù)來表示消息消費的確認方式。
  • 1. 兩種消費思路
  • 2. 確保消費成功兩種思路
  • 3. 消息拒絕
  • 4. 消息確認
    • 4.1 自動確認
    • 4.2 手動確認
  • 5. 冪等性問題
  • 6. 小結(jié)

前面一篇文章松哥和大家聊了 MQ 高可用之如何確保消息成功發(fā)送,各種配置齊上陣,最終確保了消息的成功發(fā)送,甚至在一些極端情況下還可能發(fā)生同一條消息重復(fù)發(fā)送的情況,不管怎么樣,消息總算發(fā)送出去了,如果小伙伴們還沒看過上篇文章,建議先看看,再來學習本文:

四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?

今天我們就來聊一聊消息消費的問題,看看如何確保消息消費成功,并且確保冪等性。

1. 兩種消費思路

RabbitMQ 的消息消費,整體上來說有兩種不同的思路:

  • 推(push):MQ 主動將消息推送給消費者,這種方式需要消費者設(shè)置一個緩沖區(qū)去緩存消息,對于消費者而言,內(nèi)存中總是有一堆需要處理的消息,所以這種方式的效率比較高,這也是目前大多數(shù)應(yīng)用采用的消費方式。
  • 拉(pull):消費者主動從 MQ 拉取消息,這種方式效率并不是很高,不過有的時候如果服務(wù)端需要批量拉取消息,倒是可以采用這種方式。

兩種方式我都舉個例子看下。

先來看推(push):

這種方式大家比較常見,就是通過 @RabbitListener 注解去標記消費者,如下:

  1. @Component 
  2. public class ConsumerDemo { 
  3.     @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) 
  4.     public void handle(String msg) { 
  5.         System.out.println("msg = " + msg); 
  6.     } 

當監(jiān)聽的隊列中有消息時,就會觸發(fā)該方法。

再來看拉(pull):

  1. @Test 
  2. public void test01() throws UnsupportedEncodingException { 
  3.     Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME); 
  4.     System.out.println("o = " + new String(((byte[]) o),"UTF-8")); 

調(diào)用 receiveAndConvert 方法,方法參數(shù)為隊列名稱,方法執(zhí)行完成后,會從 MQ 上拉取一條消息下來,如果該方法返回值為 null,表示該隊列上沒有消息了。receiveAndConvert 方法有一個重載方法,可以在重載方法中傳入一個等待超時時間,例如 3 秒。此時,假設(shè)隊列中沒有消息了,則 receiveAndConvert 方法會阻塞 3 秒,3 秒內(nèi)如果隊列中有了新消息就返回,3 秒后如果隊列中還是沒有新消息,就返回 null,這個等待超時時間要是不設(shè)置的話,默認為 0。

這是消息兩種不同的消費模式。

如果需要從消息隊列中持續(xù)獲得消息,就可以使用推模式;如果只是單純的消費一條消息,則使用拉模式即可。切忌將拉模式放到一個死循環(huán)中,變相的訂閱消息,這會嚴重影響 RabbitMQ 的性能。

2. 確保消費成功兩種思路

在上篇文章中,我們想盡辦法確保消息能夠發(fā)送成功,對于消息消費成功,其實官方提供了相關(guān)的機制,我們一起來看下。

為了保證消息能夠可靠的到達消息消費者,RabbitMQ 中提供了消息消費確認機制。當消費者去消費消息的時候,可以通過指定 autoAck 參數(shù)來表示消息消費的確認方式。

當 autoAck 為 false 的時候,此時即使消費者已經(jīng)收到消息了,RabbitMQ 也不會立馬將消息移除,而是等待消費者顯式的回復(fù)確認信號后,才會將消息打上刪除標記,然后再刪除。

當 autoAck 為 true 的時候,此時消息消費者就會自動把發(fā)送出去的消息設(shè)置為確認,然后將消息移除(從內(nèi)存或者磁盤中),即使這些消息并沒有到達消費者。

我們來看一張圖:

如上圖所示,在 RabbitMQ 的 web 管理頁面:

  • Ready 表示待消費的消息數(shù)量。
  • Unacked 表示已經(jīng)發(fā)送給消費者但是還沒收到消費者 ack 的消息數(shù)量。

這是我們可以從 UI 層面觀察消息的消費情況確認情況。

當我們將 autoAck 設(shè)置為 false 的時候,對于 RabbitMQ 而言,消費分成了兩個部分:

  • 待消費的消息
  • 已經(jīng)投遞給消費者,但是還沒有被消費者確認的消息

換句話說,當設(shè)置 autoAck 為 false 的時候,消費者就變得非常從容了,它將有足夠的時間去處理這條消息,當消息正常處理完成后,再手動 ack,此時 RabbitMQ 才會認為這條消息消費成功了。如果 RabbitMQ 一直沒有收到客戶端的反饋,并且此時客戶端也已經(jīng)斷開連接了,那么 RabbitMQ 就會將剛剛的消息重新放回隊列中,等待下一次被消費。

綜上所述,確保消息被成功消費,無非就是手動 Ack 或者自動 Ack,無他。當然,無論這兩種中的哪一種,最終都有可能導致消息被重復(fù)消費,所以一般來說我們還需要在處理消息時,解決冪等性問題。

3. 消息拒絕

當客戶端收到消息時,可以選擇消費這條消息,也可以選擇拒絕這條消息。我們來看下拒絕的方式:

  1. @Component 
  2. public class ConsumerDemo { 
  3.     @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) 
  4.     public void handle(Channel channel, Message message) { 
  5.         //獲取消息編號 
  6.         long deliveryTag = message.getMessageProperties().getDeliveryTag(); 
  7.         try { 
  8.             //拒絕消息 
  9.             channel.basicReject(deliveryTag, true); 
  10.         } catch (IOException e) { 
  11.             e.printStackTrace(); 
  12.         } 
  13.     } 

消費者收到消息之后,可以選擇拒絕消費該條消息,拒絕的步驟分兩步:

  1. 獲取消息編號 deliveryTag。
  2. 調(diào)用 basicReject 方法拒絕消息。

調(diào)用 basicReject 方法時,第二個參數(shù)是 requeue,即是否重新入隊。如果第二個參數(shù)為 true,則這條被拒絕的消息會重新進入到消息隊列中,等待下一次被消費;如果第二個參數(shù)為 false,則這條被拒絕的消息就會被丟掉,不會有新的消費者去消費它了。

需要注意的是,basicReject 方法一次只能拒絕一條消息。

4. 消息確認

消息確認分為自動確認和手動確認,我們分別來看。

4.1 自動確認

先來看看自動確認,在 Spring Boot 中,默認情況下,消息消費就是自動確認的。

我們來看如下一個消息消費方法:

  1. @Component 
  2. public class ConsumerDemo { 
  3.     @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) 
  4.     public void handle2(String msg) { 
  5.         System.out.println("msg = " + msg); 
  6.         int i = 1 / 0; 
  7.     } 

通過 @Componet 注解將當前類注入到 Spring 容器中,然后通過 @RabbitListener 注解來標記一個消息消費方法,默認情況下,消息消費方法自帶事務(wù),即如果該方法在執(zhí)行過程中拋出異常,那么被消費的消息會重新回到隊列中等待下一次被消費,如果該方法正常執(zhí)行完沒有拋出異常,則這條消息就算是被消費了。

4.2 手動確認

手動確認我又把它分為兩種:推模式手動確認與拉模式手動確認。

4.2.1 推模式手動確認

要開啟手動確認,需要我們首先關(guān)閉自動確認,關(guān)閉方式如下:

  1. spring.rabbitmq.listener.simple.acknowledge-mode=manual 

這個配置表示將消息的確認模式改為手動確認。

接下來我們來看下消費者中的代碼:

  1. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) 
  2. public void handle3(Message message,Channel channel) { 
  3.     long deliveryTag = message.getMessageProperties().getDeliveryTag(); 
  4.     try { 
  5.         //消息消費的代碼寫到這里 
  6.         String s = new String(message.getBody()); 
  7.         System.out.println("s = " + s); 
  8.         //消費完成后,手動 ack 
  9.         channel.basicAck(deliveryTag, false); 
  10.     } catch (Exception e) { 
  11.         //手動 nack 
  12.         try { 
  13.             channel.basicNack(deliveryTag, falsetrue); 
  14.         } catch (IOException ex) { 
  15.             ex.printStackTrace(); 
  16.         } 
  17.     } 

將消費者要做的事情放到一個 try..catch 代碼塊中。

如果消息正常消費成功,則執(zhí)行 basicAck 完成確認。

如果消息消費失敗,則執(zhí)行 basicNack 方法,告訴 RabbitMQ 消息消費失敗。

這里涉及到兩個方法:

  • basicAck:這個是手動確認消息已經(jīng)成功消費,該方法有兩個參數(shù):第一個參數(shù)表示消息的 id;第二個參數(shù) multiple 如果為 false,表示僅確認當前消息消費成功,如果為 true,則表示當前消息之前所有未被當前消費者確認的消息都消費成功。
  • basicNack:這個是告訴 RabbitMQ 當前消息未被成功消費,該方法有三個參數(shù):第一個參數(shù)表示消息的 id;第二個參數(shù) multiple 如果為 false,表示僅拒絕當前消息的消費,如果為 true,則表示拒絕當前消息之前所有未被當前消費者確認的消息;第三個參數(shù) requeue 含義和前面所說的一樣,被拒絕的消息是否重新入隊。

當 basicNack 中最后一個參數(shù)設(shè)置為 false 的時候,還涉及到一個死信隊列的問題,這個松哥以后再專門寫文章和大家細聊。

4.2.2 拉模式手動確認

拉模式手動 ack 比較麻煩一些,在 Spring 中封裝的 RabbitTemplate 中并未找到對應(yīng)的方法,所以我們得用原生的辦法,如下:

  1. public void receive2() { 
  2.     Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); 
  3.     long deliveryTag = 0L; 
  4.     try { 
  5.         GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false); 
  6.         deliveryTag = getResponse.getEnvelope().getDeliveryTag(); 
  7.         System.out.println("o = " + new String((getResponse.getBody()), "UTF-8")); 
  8.         channel.basicAck(deliveryTag, false); 
  9.     } catch (IOException e) { 
  10.         try { 
  11.             channel.basicNack(deliveryTag, falsetrue); 
  12.         } catch (IOException ex) { 
  13.             ex.printStackTrace(); 
  14.         } 
  15.     } 

這里涉及到的 basicAck 和 basicNack 方法跟前面的一樣,我就不再贅述。

5. 冪等性問題

最后我們再來說說消息的冪等性問題。

大家設(shè)想下面一個場景:

消費者在消費完一條消息后,向 RabbitMQ 發(fā)送一個 ack 確認,此時由于網(wǎng)絡(luò)斷開或者其他原因?qū)е?RabbitMQ 并沒有收到這個 ack,那么此時 RabbitMQ 并不會將該條消息刪除,當重新建立起連接后,消費者還是會再次收到該條消息,這就造成了消息的重復(fù)消費。同時,由于類似的原因,消息在發(fā)送的時候,同一條消息也可能會發(fā)送兩次(參見四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?)。種種原因?qū)е挛覀冊谙M消息時,一定要處理好冪等性問題。

冪等性問題的處理倒也不難,基本上都是從業(yè)務(wù)上來處理,我來大概說說思路。

采用 Redis,在消費者消費消息之前,現(xiàn)將消息的 id 放到 Redis 中,存儲方式如下:

  • id-0(正在執(zhí)行業(yè)務(wù))
  • id-1(執(zhí)行業(yè)務(wù)成功)

如果 ack 失敗,在 RabbitMQ 將消息交給其他的消費者時,先執(zhí)行 setnx,如果 key 已經(jīng)存在(說明之前有人消費過該消息),獲取他的值,如果是 0,當前消費者就什么都不做,如果是 1,直接 ack。

極端情況:第一個消費者在執(zhí)行業(yè)務(wù)時,出現(xiàn)了死鎖,在 setnx 的基礎(chǔ)上,再給 key 設(shè)置一個生存時間。生產(chǎn)者,發(fā)送消息時,指定 messageId。

當然這只是一個簡單思路供大家參考。

松哥在 vhr 項目中也處理了消息冪等性問題,感興趣的小伙伴可以查看 vhr 源碼(https://github.com/lenve/vhr),代碼在 mailserver 中。

6. 小結(jié)

好啦,今天就和小伙伴們聊了下 RabbitMQ 中和消息消費相關(guān)的幾個話題,感興趣的小伙伴可以實踐下哦~

本文轉(zhuǎn)載自微信公眾號「江南一點雨」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系江南一點雨公眾號。

 

責任編輯:武曉燕 來源: 江南一點雨
相關(guān)推薦

2024-05-23 12:11:39

2024-12-18 07:43:49

2013-08-28 10:30:39

vSphere

2022-05-16 13:46:38

Redis高可用Sentinel

2024-08-12 12:17:03

2018-06-21 08:23:35

云存儲高可用應(yīng)用

2021-09-30 07:26:15

MQ消息丟失

2020-03-18 09:00:06

SQL Server云計算數(shù)據(jù)庫

2022-09-21 16:09:28

消息中間件

2023-03-06 08:16:04

SpringRabbitMQ

2019-03-28 09:07:05

MySQLRedisHBase

2020-10-28 11:20:18

RabbitMQHAProxy運維

2010-12-07 15:30:15

Exchange Se

2019-03-25 09:09:57

MySQLRedisHBase

2021-03-01 07:31:53

消息支付高可用

2024-05-09 08:04:23

RabbitMQ消息可靠性

2022-07-26 20:00:35

場景RabbitMQMQ

2020-10-14 08:36:10

RabbitMQ消息

2022-08-02 11:27:25

RabbitMQ消息路由

2021-08-10 09:59:15

RabbitMQ消息微服務(wù)
點贊
收藏

51CTO技術(shù)棧公眾號