使用消息中間件時(shí),如何保證消息僅僅被消費(fèi)一次?
消息中間件使用廣泛,常用來削峰填谷、系統(tǒng)解耦、異步處理。異步處理可能是使用的最多的場(chǎng)景了,比如現(xiàn)在的技術(shù)博客網(wǎng)站,都采用積分制,用戶發(fā)表一篇文章后,可以獲取想要的積分,為了提升系統(tǒng)的性能,給用戶加積分的操作可以異步處理,并不需要放在同步流程中。
我們可以把用戶ID,需要增加的積分封裝成一條消息投遞到消息系統(tǒng)中,異步處理加積分操作,由于這是發(fā)生在不同服務(wù)器之間,消息有可能投遞失敗、處理失敗等問題,從而導(dǎo)致用戶加積分失敗,還有一種可能是消息重復(fù)投遞,那么用戶就有可能重復(fù)加積分,不管出現(xiàn)那種情況,都是不正常的情況。
要避免上面的兩種情況,就需要我們盡量保證消息不丟失和消息只被消費(fèi)一次,這篇文章拋開具體的消息中間件,從消息系統(tǒng)的通用層面上,談?wù)勅绾伪苊膺@兩種情況。
1、保證消息不丟失
一條消息從生產(chǎn)到消費(fèi)這條鏈路中,有三個(gè)地方可能會(huì)造成消息丟失,分別如下:
- 消息從生產(chǎn)者寫入到消息隊(duì)列的過程投遞失敗。
- 消息在消息隊(duì)列中,持久化失敗。
- 消息被消費(fèi)者消費(fèi)的過程出現(xiàn)異常。
在消息生產(chǎn)的過程中投遞失敗
消息生產(chǎn)者和消息系統(tǒng)一般都是獨(dú)立部署在不同的服務(wù)器上,兩臺(tái)服務(wù)器之間要通信就要通過網(wǎng)絡(luò)來完成,網(wǎng)絡(luò)是不穩(wěn)定,可能會(huì)發(fā)生抖動(dòng),那么數(shù)據(jù)就有可能丟失。網(wǎng)絡(luò)發(fā)生抖動(dòng)會(huì)有以下兩種情況。
在消息生產(chǎn)的過程中丟失消息
情景一:消息在傳送給消息系統(tǒng)的過程中發(fā)生網(wǎng)絡(luò)抖動(dòng),數(shù)據(jù)直接丟失。
情景二:消息已經(jīng)到達(dá)消息系統(tǒng),但是在消息系統(tǒng)給生產(chǎn)者服務(wù)器返回信息時(shí),網(wǎng)絡(luò)發(fā)生抖動(dòng),此時(shí)的數(shù)據(jù)不一定真正的丟失,很可能只是生產(chǎn)者認(rèn)為數(shù)據(jù)丟失。
針對(duì)消息在消息生產(chǎn)時(shí)丟失,可以采取重投機(jī)制,當(dāng)程序檢測(cè)到網(wǎng)絡(luò)異常時(shí),將消息再次投遞到消息系統(tǒng)。但是重新投遞在情景二情況下,可能造成數(shù)據(jù)重復(fù),如何解決這個(gè)問題,在后面會(huì)提到。
在消息隊(duì)列中持久化失敗
消息系統(tǒng)是可以對(duì)消息進(jìn)行持久化,一般都是將消息存儲(chǔ)到本地磁盤中,當(dāng)然也有少數(shù)消息中間件支持將數(shù)據(jù)持久化到數(shù)據(jù)庫中,那么消息系統(tǒng)的性能可能就會(huì)下降。
如果你對(duì) Redis 的持久化有一定的了解話,你會(huì)發(fā)現(xiàn) Redis 在持久化數(shù)據(jù)時(shí)并不是每新增一條就立即存入到本地磁盤,而是會(huì)將數(shù)據(jù)先寫入到操作系統(tǒng)的 Page Cache 中,當(dāng)滿足一定條件時(shí),再將 Page Cache 中的數(shù)據(jù)刷入磁盤,因?yàn)檫@樣可以減少對(duì)磁盤的隨機(jī) I/O 操作,我們知道隨機(jī) I/O 是非常耗時(shí)的,這樣也提高了系統(tǒng)性能,消息中間件也不例外,在持久化時(shí)也是采用這種方式。
在某些極端情況下,可能會(huì)造成 Page Cache 中的數(shù)據(jù)丟失,比如突然停電或者機(jī)器異常重啟操作。要解決 Page Cache 中數(shù)據(jù)丟失問題,可以采用集群部署的方式,來盡量保證數(shù)據(jù)不丟失。
在消費(fèi)的過程中存在消息丟失
消息在消費(fèi)過程中也是會(huì)發(fā)生丟失的,而且在消費(fèi)過程中丟失的概率要比前兩種情況大很多。一條消息消費(fèi)過程大概分成三步:消費(fèi)者拉取消息,消費(fèi)者處理消息,消息系統(tǒng)更新消費(fèi)進(jìn)度。
圖片描述
第一步在拉取消息的時(shí)候可能發(fā)生網(wǎng)絡(luò)抖動(dòng)異常,第二步在處理消息的時(shí)候可能發(fā)生一些業(yè)務(wù)異常,而導(dǎo)致流程并沒有走完,如果在第一步、第二步發(fā)生異常的情況下,通知消息系統(tǒng)更新消費(fèi)進(jìn)度,那么這條失敗的消息就永遠(yuǎn)不會(huì)在被處理了,自然就丟失了,其實(shí)我們的業(yè)務(wù)并沒有跑完。
要避免消息在消費(fèi)時(shí)丟失的情況,可以在消息接收和處理完成之后才更新消費(fèi)進(jìn)度,但是在極端的情況下,會(huì)出現(xiàn)消息重復(fù)消費(fèi)的問題,比如某一條消息在處理完成之后,消費(fèi)者宕機(jī)了,這時(shí)還沒有更新消費(fèi)進(jìn)度,消費(fèi)者重啟后,這條消息還是會(huì)被消費(fèi)到。
2、保證消息只被消費(fèi)一次
消息系統(tǒng)本身不能保證消息僅被消費(fèi)一次,因?yàn)橄M(fèi)本身可能重復(fù)、下游系統(tǒng)啟動(dòng)拉取重復(fù)、失敗重試帶來的重復(fù)、補(bǔ)償邏輯導(dǎo)致的重復(fù)都有可能造重復(fù)消息,要保證消息僅被消費(fèi)一次可以利用等冪性來實(shí)現(xiàn)。
等冪是數(shù)學(xué)上的一個(gè)概念,就是多次執(zhí)行同一個(gè)操作和執(zhí)行一次操作,最終得到的結(jié)果是相同的。
從等冪的概念上就可以看出來,就算消息執(zhí)行多次也不會(huì)對(duì)系統(tǒng)造成影響,那么在使用消息系統(tǒng)時(shí)如何保證等冪性呢?因?yàn)樯a(chǎn)者和消費(fèi)者都有可能產(chǎn)生重復(fù)消息,所以要在生產(chǎn)者和消費(fèi)者兩端都保證等冪性。
保證生產(chǎn)者等冪性,在生產(chǎn)消息的時(shí)候,利用雪花算法給消息生成一個(gè)全局 ID,在消息系統(tǒng)中維護(hù)消息已 ID 映射關(guān)系,如果在映射表中已經(jīng)存在相同 ID,這丟棄這條消息,雖然消息被投遞了兩次,但是實(shí)際上就保存了一條,避免了消息重復(fù)問題。
生產(chǎn)者等冪性跟所選者的消息中間件有關(guān)系,因?yàn)榻^大數(shù)情況下消息系統(tǒng)不需要我們自己實(shí)現(xiàn),所以等冪性是不太好控制的,消費(fèi)者等冪性才是我們開發(fā)人員控制的重點(diǎn)方向。
在消費(fèi)者端可以從通用層和業(yè)務(wù)層兩個(gè)方面來做等冪操作,取決于我們的業(yè)務(wù)要求。
在通用層面中,利用好消息生成是產(chǎn)生的全局唯一ID,消息被處理成功后,把這個(gè)全局 ID 存入到數(shù)據(jù)中,在處理下一條消息之前,先從數(shù)據(jù)庫中查詢這個(gè)全局 ID 是否存在,如果已經(jīng)存在,則直接放棄該消息。
利用這個(gè)全局唯一ID就實(shí)現(xiàn)了消息等冪性,偽代碼如下:
- boolean isIDExisted = selectByID(ID); // 判斷ID是否存在
- if(isIDExisted) {
- return; //存在則直接返回
- } else {
- process(message); //不存在,則處理消息
- saveID(ID); //存儲(chǔ)ID
- }
但是在極端情況下,這種方式還是會(huì)出問題,如果消息在處理之后,還沒來得及保存到數(shù)據(jù)庫,消費(fèi)者就宕機(jī)重啟了,重啟之后還會(huì)再次獲取該消息,執(zhí)行時(shí)查詢?cè)撓⒉⑽幢幌M(fèi)過,還是會(huì)執(zhí)行兩次消費(fèi)??梢砸霐?shù)據(jù)庫事務(wù)來解決這個(gè)問題,但是會(huì)降低系統(tǒng)性能。如果對(duì)消息重復(fù)消費(fèi)沒有特別嚴(yán)格要求的話,直接使用這種沒有引入事務(wù)的通用方案就好了,畢竟這也是極小概率的事情。
在業(yè)務(wù)層面上,我們可選擇性就變多了,比如樂觀鎖、悲觀鎖、內(nèi)存去重(https://github.com/RoaringBitmap/RoaringBitmap)等方法。
我們拿樂觀鎖來舉例,比如我們要給一個(gè)用戶加積分,因?yàn)榧臃e分操作并不需要放在主業(yè)務(wù)中,所以就可以使用消息系統(tǒng)來異步通知,要使用樂觀鎖,就需要給積分表添加一個(gè)版本號(hào)字段。并且在生產(chǎn)消息的時(shí)候先查詢這個(gè)賬號(hào)的版本號(hào)并且連同消息一起發(fā)送到消息系統(tǒng)中。
圖片描述
消費(fèi)者拿到消息和版本號(hào)后,在執(zhí)行更新積分操作的 SQL 時(shí)帶上版本號(hào),類似于:
- update score set score = score + 20, version=version+1 where userId=1 and version=1;
這條消息消費(fèi)成功后,version 就變成了 2,那么如果有重復(fù)的 version=1 的消息再次被消費(fèi)者拉取到,SQL 語句并不會(huì)執(zhí)行成功,從而保證了消息的冪等性。
要保證消息僅被消費(fèi)一次,我們需要把重點(diǎn)放在消費(fèi)者這一段,利用等冪性來保證消息被消費(fèi)一次。
今天站在消息中間件的通用層面上,聊了聊如何保證數(shù)據(jù)不丟失和僅被消費(fèi)一次,希望今天的文章對(duì)您的學(xué)習(xí)或者工作有所幫助,如果您認(rèn)為文章有價(jià)值,歡迎點(diǎn)個(gè)贊,謝謝。