自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

為什么消息會(huì)重復(fù)消費(fèi),我從RocketMQ源碼中扒出了7種原因,有點(diǎn)小坑

開發(fā) 前端
為了解決這個(gè)問題,RocketMQ引入了定時(shí)清理的機(jī)制,定時(shí)清理長時(shí)間消費(fèi)的消息,這樣消費(fèi)進(jìn)度就可以提交了。

大家好,我是三友~~

在眾多關(guān)于MQ的面試八股文中有這么一道題,“如何保證MQ消息消費(fèi)的冪等性”。

為什么需要保證冪等性呢?是因?yàn)橄?huì)重復(fù)消費(fèi)。

為什么消息會(huì)重復(fù)消費(fèi)?

明明已經(jīng)消費(fèi)了,為什么消息會(huì)被再次被消費(fèi)呢?

不同的MQ產(chǎn)生的原因可能不一樣

本文就以RocketMQ為例,來扒一扒RocketMQ中會(huì)導(dǎo)致消息重復(fù)消息的原因,最終你會(huì)發(fā)現(xiàn),其實(shí)消息重復(fù)消費(fèi)算是RocketMQ無奈的“bug”。

如果有對(duì)RocketMQ不熟悉的小伙伴,可以看看我之前寫的 RocketMQ保姆級(jí)教程? 和 RocketMQ消息短暫而又精彩的一生 這兩篇文章。

圖片

消息發(fā)送異常時(shí)重復(fù)發(fā)送

首先,我們來瞅瞅RocketMQ發(fā)送消息和消費(fèi)消息的基本原理。

如圖,簡單說一下上圖中的概念:

  • Broker,就是RocketMQ的服務(wù)端,如上圖就有兩個(gè)服務(wù)實(shí)例
  • Topic就是一類消息集合的名字
  • Queue就是Topic的對(duì)應(yīng)的隊(duì)列,消息都存在Queue上,每個(gè)Topic都會(huì)有自己的幾個(gè)Queue

所以,整個(gè)消息發(fā)送和消費(fèi)過程大致如下:

  • 生產(chǎn)者在發(fā)送消息之前根據(jù)負(fù)載均衡策略(默認(rèn)是輪詢)選擇一個(gè)Queue,然后跟這個(gè)Queue所在的機(jī)器建立連接,把消息發(fā)送到這個(gè)Queue上
  • 消費(fèi)者只要消費(fèi)這個(gè)Queue,那么就能消費(fèi)到消息

在正常情況下,生產(chǎn)者的確是按照這個(gè)方式來發(fā)送消息的

但是當(dāng)出現(xiàn)了異常時(shí),這種異常包括消息發(fā)送超時(shí)、響應(yīng)超時(shí)等等,RocketMQ為了保證消息成功發(fā)送,會(huì)進(jìn)行消息發(fā)送的重試操作,默認(rèn)情況下會(huì)最多會(huì)重試兩次

圖片

重試操作比較簡單,就是選擇另一臺(tái)機(jī)器的Queue來發(fā)送。

雖然重試操作可以很大程度保證消息能夠發(fā)送成功,但是同時(shí)也會(huì)帶來消息重復(fù)發(fā)送的問題。

舉個(gè)例子,假設(shè)生產(chǎn)者向A機(jī)器發(fā)送消息,發(fā)生了異常,響應(yīng)超時(shí)了,但是就一定代表消息沒發(fā)成功么?

不一定,有可能會(huì)出現(xiàn)服務(wù)端的確接受到并處理了消息,但是由于網(wǎng)絡(luò)波動(dòng)等等,導(dǎo)致生產(chǎn)者接收不到服務(wù)端響應(yīng)的情況,此時(shí)消息處理成功了,但是生成者還是以為發(fā)生了異常

此時(shí)如果發(fā)生重試操作,那么勢(shì)必會(huì)導(dǎo)致消息被發(fā)送了兩次甚至更多次,導(dǎo)致服務(wù)端存了多條相同的消息,那么就一定會(huì)導(dǎo)致消費(fèi)者重復(fù)消費(fèi)消息。

消費(fèi)消息拋出異常

在RocketMQ的并發(fā)消費(fèi)消息的模式下,需要用戶實(shí)現(xiàn)MessageListenerConcurrently接口來處理消息

圖片

當(dāng)消費(fèi)者獲取到消息之后會(huì)調(diào)用MessageListenerConcurrently?的實(shí)現(xiàn),傳入需要消費(fèi)的消息集合msgs?,這里提到的msgs很重要

圖片

如上代碼,當(dāng)消息消費(fèi)出現(xiàn)異常的時(shí)候,status?就會(huì)為null,后面就會(huì)將status?設(shè)置成為RECONSUME_LATER。

RECONSUME_LATER翻譯成功中文就是稍后重新消費(fèi)的意思

所以從這可以看出,一旦拋出異常,那么消息之后就可以被重復(fù)消息。

到這其實(shí)可能有小伙伴覺得消息消費(fèi)失敗重新消費(fèi)很正常,保證消息盡可能消費(fèi)成功。

對(duì),這句話不錯(cuò),的確可以在一定程度上保證消費(fèi)異常的消息可以消費(fèi)成功。

但是坑不在這,而是前面提到的消費(fèi)時(shí)傳入的整個(gè)集合中的消息都需要被重新消費(fèi)。

具體的原因我們接著往下看

當(dāng)消息處理之后,不論是成功還是異常,都需要對(duì)結(jié)果進(jìn)行處理,代碼如下

圖片

當(dāng)處理結(jié)果為RECONSUME_LATER?的時(shí)候(異常會(huì)設(shè)置為RECONSUME_LATER?),此時(shí)ackIndex?會(huì)設(shè)置成-1?,后面循環(huán)遍歷的時(shí)候就會(huì)遍歷到所有這次消費(fèi)的消息,然后調(diào)用sendMessageBack?方法,sendMessageBack方式是用來實(shí)現(xiàn)消息重新消費(fèi)的邏輯,這里就不展開說了。

所以,一旦被消費(fèi)的一批消息中出現(xiàn)一個(gè)消費(fèi)異常的情況,那么就會(huì)導(dǎo)致整批消息被重新消費(fèi),從而會(huì)導(dǎo)致在出現(xiàn)異常之前的成功處理的消息都會(huì)被重復(fù)消費(fèi),非???。

不過好在消費(fèi)時(shí)傳入的消息集合中的消息數(shù)量是可以設(shè)置的,并且默認(rèn)就是1

圖片

也就說默認(rèn)情況下那個(gè)集合中就一條消息,所以默認(rèn)情況下不會(huì)出現(xiàn)消費(fèi)成功的消息被重復(fù)消費(fèi)的情況。

所以這個(gè)參數(shù)不要輕易設(shè)置,一旦設(shè)置大了,就可能導(dǎo)致消息被重新消費(fèi)。

除了并發(fā)消費(fèi)消息的模式以外,RocketMQ還支持順序消費(fèi)消息的模式,也會(huì)造成重復(fù)消費(fèi),邏輯其實(shí)差不多,但是在實(shí)現(xiàn)消息重新消費(fèi)的邏輯不一樣。

消費(fèi)者提交offset失敗

首先來講一講什么是offset。

前面說過,消息在發(fā)送的時(shí)候需要指定發(fā)送到,消息最后會(huì)被放到Queue中,其實(shí)真正的消息不是在Queue中,Queue存的是每個(gè)消息的位置,但是你可以理解為Queue存的是消息。

而消息在Queue中是有序號(hào)的,這個(gè)序號(hào)就被稱為offset,從0開始,單調(diào)遞增1。

比如說,如上圖,消息1的offset就是0,消息2的offset就是1,依次類推。

這個(gè)offset的一個(gè)作用就是用來管理消費(fèi)者的消費(fèi)進(jìn)度。

當(dāng)消費(fèi)者在成功消費(fèi)消息之后,需要將所消費(fèi)的消息的offset提交給RocketMQ服務(wù)端,告訴RocketMQ,這個(gè)Queue的消息我已經(jīng)消費(fèi)到了這個(gè)位置了。

提交offset的代碼就在上述第二節(jié)提到的處理結(jié)果的后面

這樣有一個(gè)好處,那么一旦消費(fèi)者重啟了或者其它啥的要從這個(gè)Queue拉取消息的時(shí)候,此時(shí)他只需要問問RocketMQ服務(wù)端上次這個(gè)Queue消息消費(fèi)到哪個(gè)位置了,之后消費(fèi)者只需要從這個(gè)位置開始消費(fèi)消息就行了,這樣就解決了接著消費(fèi)的問題。

但是RocketMQ在設(shè)計(jì)的時(shí)候,當(dāng)消費(fèi)完消息的時(shí)候并不是同步告訴RocketMQ服務(wù)端offset,而是定時(shí)發(fā)送。

如圖,當(dāng)消費(fèi)者消費(fèi)完消息的時(shí)候,會(huì)將offset保存到內(nèi)存中的一個(gè)Map數(shù)據(jù)結(jié)構(gòu)中,所以上面截圖的那段代碼其實(shí)是更新內(nèi)存中的offset

而在消費(fèi)者啟動(dòng)的時(shí)候會(huì)開啟一個(gè)定時(shí)任務(wù),默認(rèn)是5s一次,會(huì)通過網(wǎng)絡(luò)請(qǐng)求將內(nèi)存中的每個(gè)Queue的消費(fèi)進(jìn)度offset發(fā)送給RocketMQ服務(wù)端。

由于是定時(shí)任務(wù),所以就可能出現(xiàn)服務(wù)器一旦宕機(jī),導(dǎo)致最新消費(fèi)的offset沒有成功告訴RocketMQ服務(wù)端的情況

此時(shí),消費(fèi)進(jìn)度offset就丟了,那么消費(fèi)者重啟的時(shí)候只能從RocketMQ中獲取到上一次提交的offset,從這里開始消費(fèi),而不是最新的offset,出現(xiàn)明明消費(fèi)到了第8個(gè)消息,RocketMQ卻告訴他只消費(fèi)到了第5個(gè)消息的情況,此時(shí)必然會(huì)導(dǎo)致消息又出現(xiàn)重復(fù)消費(fèi)的情況。

服務(wù)端持久化offset失敗

上一節(jié)說到,消費(fèi)者會(huì)有一個(gè)每隔5s鐘的定時(shí)任務(wù)將每個(gè)隊(duì)列的消費(fèi)進(jìn)度offset提交到RocketMQ服務(wù)端

當(dāng)RocketMQ服務(wù)端接收到提交請(qǐng)求之后,會(huì)將這個(gè)消費(fèi)進(jìn)度offset保存到內(nèi)存中

同時(shí)為了保證RocketMQ服務(wù)端重啟消費(fèi)進(jìn)度不會(huì)丟失,也會(huì)開啟一個(gè)定時(shí)任務(wù),默認(rèn)也是5s一次,將內(nèi)存中的消費(fèi)進(jìn)度持久化到磁盤文件中

所以,整個(gè)消費(fèi)進(jìn)度offset的數(shù)據(jù)流轉(zhuǎn)過程如下

當(dāng)RocketMQ服務(wù)端重啟之后,會(huì)從磁盤中讀取文件的數(shù)據(jù)加載到內(nèi)存中。

跟消費(fèi)者產(chǎn)生的問題一樣,一旦RocketMQ發(fā)生宕機(jī),那么offset就有可能丟失5s鐘的數(shù)據(jù),RocketMQ服務(wù)端一旦重啟,消費(fèi)者從RocketMQ服務(wù)端獲取到的消息消費(fèi)進(jìn)度就比實(shí)際消費(fèi)的進(jìn)度低,同樣也會(huì)導(dǎo)致消息重復(fù)消費(fèi)。

主從同步offset失敗

在RocketMQ的高可用模式中,有一種名叫主從同步的模式,當(dāng)主節(jié)點(diǎn)掛了之后,從節(jié)點(diǎn)可以手動(dòng)升級(jí)為主節(jié)點(diǎn)對(duì)外提供訪問,保證高可用。

在主從同步模式下,從節(jié)點(diǎn)默認(rèn)每隔10s會(huì)向主節(jié)點(diǎn)發(fā)送請(qǐng)求,同步一些元數(shù)據(jù),這些元數(shù)據(jù)就包括消費(fèi)進(jìn)度

當(dāng)從節(jié)點(diǎn)獲取到主節(jié)點(diǎn)的消費(fèi)進(jìn)度之后,會(huì)將主節(jié)點(diǎn)的消費(fèi)進(jìn)度設(shè)置到自己的內(nèi)存中,同時(shí)也會(huì)持久化到磁盤。

所以整個(gè)消費(fèi)進(jìn)度offset的數(shù)據(jù)的流轉(zhuǎn)過程就會(huì)變成如下

同樣,由于也是定時(shí)任務(wù),那么一旦主節(jié)點(diǎn)掛了,從節(jié)點(diǎn)就會(huì)丟10s鐘的消費(fèi)進(jìn)度,此時(shí)如果從節(jié)點(diǎn)升級(jí)為主節(jié)點(diǎn)對(duì)外提供訪問,就會(huì)出現(xiàn)跟上面提到的一樣的情況,消費(fèi)者從這個(gè)新的主節(jié)點(diǎn)中拿到的消費(fèi)進(jìn)度比實(shí)際的低,自然而然就會(huì)重復(fù)消費(fèi)消息。

所以,總的來說,在消費(fèi)進(jìn)度數(shù)據(jù)流轉(zhuǎn)的過程中,只要某個(gè)環(huán)節(jié)出現(xiàn)了問題,都有很有可能會(huì)導(dǎo)致消息重復(fù)消費(fèi)。

重平衡

先來講一講什么是重平衡,其實(shí)重平衡很好理解,我說一下你就明白了。

前面說到,消費(fèi)者是從隊(duì)列中獲取消息的

在RocketMQ中,有個(gè)消費(fèi)者組的概念,一個(gè)消費(fèi)者組中可以有多個(gè)消費(fèi)者,不同消費(fèi)者組之間消費(fèi)消息是互不干擾的,所以前面提到的消費(fèi)者其實(shí)都在消費(fèi)組下

在同一個(gè)消費(fèi)者組中,消息消費(fèi)有兩種模式:

  • 集群消費(fèi)模式
  • 廣播消費(fèi)模式

由于RocketMQ默認(rèn)是集群消費(fèi)模式,并且絕大多數(shù)業(yè)務(wù)場景都是使用集群消費(fèi)模式,所以這里就不討論廣播消費(fèi)模式了,感興趣的同學(xué)可以看看RocketMQ消息短暫而又精彩的一生 這篇文章。

集群消費(fèi)模式是指同一條消息只能被這個(gè)消費(fèi)者組消費(fèi)一次,這就叫集群消費(fèi)。

并且前面提到提交消費(fèi)進(jìn)度給RocketMQ服務(wù)端的情況只會(huì)集群消費(fèi)模式下才會(huì)有,在廣播消費(fèi)模式不會(huì)提給到RocketMQ服務(wù)端,僅僅持久化到本地磁盤

同時(shí)前面說的消費(fèi)者提交消費(fèi)進(jìn)度真正提交的是消費(fèi)者組對(duì)于這個(gè)Queue的消費(fèi)進(jìn)度,而不是指具體的某個(gè)消費(fèi)者對(duì)于Queue消費(fèi)進(jìn)度。

雖然說這里將前面提到的一些含義更深一步,但是并不妨礙前面的理解。

集群消費(fèi)的實(shí)現(xiàn)就是將隊(duì)列按照一定的算法分配給消費(fèi)者,默認(rèn)是按照平均分配的。

如圖所示,假設(shè)某個(gè)topic有4個(gè)Queue,有個(gè)消費(fèi)者組訂閱了這個(gè)topic,這個(gè)消費(fèi)者組有兩個(gè)消費(fèi)者1和消費(fèi)者2,此時(shí)每個(gè)消費(fèi)者就可以被分配兩個(gè)隊(duì)列,這樣就能保證消息正常情況下只會(huì)被消費(fèi)一次。如果只有一個(gè)消費(fèi)者,那么這個(gè)消費(fèi)者就會(huì)消費(fèi)所有隊(duì)列,很好理解。

接著后面又啟動(dòng)了一個(gè)消費(fèi)者3,此時(shí)為了保證剛上線的消費(fèi)者3能夠消費(fèi)消息,就要進(jìn)行重平衡操作,重新分配每個(gè)消費(fèi)者消費(fèi)的隊(duì)列。

在重平衡之后就可能會(huì)出現(xiàn)下面這種情況

如上圖,原本被消費(fèi)者2消費(fèi)的Queue4被分配給消費(fèi)者3,此時(shí)消費(fèi)者3就能消費(fèi)到消息了,這就是重平衡。

除了新增消費(fèi)者會(huì)導(dǎo)致重平衡之外,消費(fèi)者數(shù)量減少,隊(duì)列的數(shù)量增加或者減少都會(huì)觸發(fā)重平衡。

在了解了重平衡概念之后,接下來分析一下為什么重平衡會(huì)導(dǎo)致消息的重復(fù)消費(fèi)。

假設(shè)在進(jìn)行重平衡時(shí),還未重平衡完之前,消費(fèi)者2此時(shí)還是會(huì)按照上面第二節(jié)提到的消費(fèi)消息的邏輯來消費(fèi)Queue4的消息

當(dāng)消費(fèi)者2已經(jīng)重平衡完成了,發(fā)現(xiàn)Queue4自己已經(jīng)不能消費(fèi)了,那么此時(shí)就會(huì)把這個(gè)Queue4設(shè)置為dropped,就是丟棄的意思

但是由于重平衡進(jìn)行時(shí)消費(fèi)者2仍然在消費(fèi)Queue4的消息,但是當(dāng)消費(fèi)完之后,發(fā)現(xiàn)隊(duì)列被設(shè)置成dropped,那么此時(shí)被消費(fèi)者2消費(fèi)消息的offset就不會(huì)被提交,原因如下代碼

這段代碼前面已經(jīng)出現(xiàn)過,一旦dropped被設(shè)置成true,這個(gè)if條件就通不過,消費(fèi)進(jìn)度就不會(huì)被提交。

成功消費(fèi)消息了,但是卻不提交消費(fèi)進(jìn)度,這就非??恿?。。

于是當(dāng)消費(fèi)者3開始消費(fèi)Queue4的消息的時(shí)候,他就會(huì)問問RocketMQ服務(wù)端,我消費(fèi)者3所在的消費(fèi)者組對(duì)于Queue4這個(gè)隊(duì)列消費(fèi)到哪了,我接著消費(fèi)就行了。

此時(shí)由于沒有提交消費(fèi)進(jìn)度,RocketMQ服務(wù)端告訴消費(fèi)者3的消費(fèi)進(jìn)度就會(huì)比實(shí)際的低,這就造成了消息重復(fù)消費(fèi)的情況。

清理長時(shí)間消費(fèi)的消息

在RocketMQ中有這么一個(gè)機(jī)制,會(huì)定時(shí)清理長時(shí)間正在消費(fèi)的消息。

如圖,假設(shè)有5條消息現(xiàn)在正在被消費(fèi)者處理,這5條消息會(huì)被存在一個(gè)集合中,并且是按照offset的大小排序,消息1的offset最小,消息5的offset最大。

RocketMQ消費(fèi)者啟動(dòng)時(shí)會(huì)開啟一個(gè)默認(rèn)15分鐘執(zhí)行一次的定時(shí)任務(wù)

圖片

這個(gè)定時(shí)任務(wù)會(huì)去檢查正在處理的消息的第一條消息,也就是圖中的消息1,一旦發(fā)現(xiàn)消息1已經(jīng)處理了超過15分鐘了,那么此時(shí)就會(huì)將消息1從集合中移除,之后會(huì)隔一定時(shí)間再次消費(fèi)消息1。

這也會(huì)有坑,雖然消息1從集合中被移除了,但是消息1并沒有消失,仍然被消費(fèi)者繼續(xù)處理,但是消息1隔一定時(shí)間就會(huì)再次被消費(fèi),就會(huì)出現(xiàn)消息1被重復(fù)消費(fèi)的情況。

這就是清理長時(shí)間消費(fèi)的消息導(dǎo)致重復(fù)消費(fèi)的原因。

但此時(shí)又會(huì)引出一個(gè)新的疑問,為什么要移除這個(gè)處理超過15分鐘的消息呢?

這就又跟前面提到的消費(fèi)進(jìn)度提交有關(guān)!

前面說過消息被消費(fèi)完成之后會(huì)提交消費(fèi)進(jìn)度,提交的消費(fèi)進(jìn)度實(shí)際會(huì)有兩種情況:

第一種就是某個(gè)線程消費(fèi)了所有的消息,當(dāng)把所有的消息都消費(fèi)完成之后,就會(huì)把消息從集合中全部移除,此時(shí)提交的消費(fèi)進(jìn)度offset就是圖中消息5的offset+1

加1的操作是為了保證如果發(fā)生重啟,那么消費(fèi)者下次消費(fèi)的起始位置就是消息5后面的消息,保證消息5不被重復(fù)消費(fèi)

第二種情況就不太一樣了

假設(shè)現(xiàn)在有兩個(gè)線程來處理這5條消息,線程1處理前2條,線程2處理后3條,如圖

圖片

現(xiàn)在線程1出現(xiàn)了長時(shí)間處理消息的情況。

此時(shí)線程2處理完消息之后,移除后面三條消息,準(zhǔn)備提交offset的時(shí)候發(fā)現(xiàn)集合中還有元素,就是線程1正在處理的前兩條消息,此時(shí)線程2提交的offset并不是消息5對(duì)應(yīng)的offset,而是消息1的offset,代碼如下

圖片

這么做的主要原因就是保證消息1和消息2至少被消費(fèi)一次。

因?yàn)橐坏┨峤涣讼?對(duì)應(yīng)的offset,如果消費(fèi)者重啟了,下次消費(fèi)就會(huì)接著從消息5的后面開始消費(fèi),而對(duì)于消息1和消息2來說,并不知道有沒有被消費(fèi)成功,就有可能出現(xiàn)消息丟失的情況。

所以,一旦集合中最前面的消息長時(shí)間處理,那么就會(huì)導(dǎo)致后面被消費(fèi)的消息進(jìn)度無法提交,那么重啟之后就會(huì)導(dǎo)致大量消息被重復(fù)消費(fèi)。

為了解決這個(gè)問題,RocketMQ引入了定時(shí)清理的機(jī)制,定時(shí)清理長時(shí)間消費(fèi)的消息,這樣消費(fèi)進(jìn)度就可以提交了。

最后

總得來說,RocketMQ中還是存在很多種導(dǎo)致消息重讀消費(fèi)的情況,并且官方也說了,只是在大多數(shù)情況下消息不會(huì)重復(fù)

圖片

所以如果你的業(yè)務(wù)場景中需要保證消息不能重復(fù)消費(fèi),那么就需要根據(jù)業(yè)務(wù)場景合理的設(shè)計(jì)冪等技術(shù)方案。

責(zé)任編輯:武曉燕 來源: 三友的java日記
相關(guān)推薦

2024-03-12 00:00:00

RocketMQ服務(wù)端磁盤

2021-08-23 08:19:48

辭職Google工程師

2023-12-25 19:28:59

RocketMQ大數(shù)據(jù)

2013-06-18 10:55:26

PhoneGap

2024-05-23 12:11:39

2025-03-21 10:33:22

2021-09-07 15:41:35

Bug誘因代碼

2024-06-05 06:37:19

2010-03-09 13:16:11

LinuxWindows

2021-03-08 10:19:59

MQ消息磁盤

2021-03-13 11:23:51

源碼邏輯框架

2024-04-09 09:08:09

Kafka消息架構(gòu)

2022-03-27 09:06:04

React類型定義前端

2021-05-10 09:35:58

Kubernetes節(jié)點(diǎn)Join

2012-03-19 20:52:55

小米

2019-11-20 09:00:52

Linux 開發(fā)操作系統(tǒng)

2017-10-19 12:45:07

PHP

2022-10-31 08:40:06

消息RocketMQ消息重復(fù)

2011-05-27 09:19:32

Windows 7崩潰

2016-12-14 08:30:14

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)