消息冪等(去重)通用解決方案,寫得真好!
消息中間件是分布式系統(tǒng)常用的組件,無(wú)論是異步化、解耦、削峰等都有廣泛的應(yīng)用價(jià)值。我們通常會(huì)認(rèn)為,消息中間件是一個(gè)可靠的組件——這里所謂的可靠是指,只要我把消息成功投遞到了消息中間件,消息就不會(huì)丟失,即消息肯定會(huì)至少保證消息能被消費(fèi)者成功消費(fèi)一次,這是消息中間件最基本的特性之一,也就是我們常說(shuō)的“AT LEAST ONCE”,即消息至少會(huì)被“成功消費(fèi)一遍”。
舉個(gè)例子,一個(gè)消息M發(fā)送到了消息中間件,消息投遞到了消費(fèi)程序A,A接受到了消息,然后進(jìn)行消費(fèi),但在消費(fèi)到一半的時(shí)候程序重啟了,這時(shí)候這個(gè)消息并沒(méi)有標(biāo)記為消費(fèi)成功,這個(gè)消息還會(huì)繼續(xù)投遞給這個(gè)消費(fèi)者,直到其消費(fèi)成功了,消息中間件才會(huì)停止投遞。
然而這種可靠的特性導(dǎo)致,消息可能被多次地投遞。舉個(gè)例子,還是剛剛這個(gè)例子,程序A接受到這個(gè)消息M并完成消費(fèi)邏輯之后,正想通知消息中間件“我已經(jīng)消費(fèi)成功了”的時(shí)候,程序就重啟了,那么對(duì)于消息中間件來(lái)說(shuō),這個(gè)消息并沒(méi)有成功消費(fèi)過(guò),所以他還會(huì)繼續(xù)投遞。這時(shí)候?qū)τ趹?yīng)用程序A來(lái)說(shuō),看起來(lái)就是這個(gè)消息明明消費(fèi)成功了,但是消息中間件還在重復(fù)投遞。
這在RockectMQ的場(chǎng)景來(lái)看,就是同一個(gè)messageId的消息重復(fù)投遞下來(lái)了。
基于消息的投遞可靠(消息不丟)是優(yōu)先級(jí)更高的,所以消息不重的任務(wù)就會(huì)轉(zhuǎn)移到應(yīng)用程序自我實(shí)現(xiàn),這也是為什么RocketMQ的文檔里強(qiáng)調(diào)的,消費(fèi)邏輯需要自我實(shí)現(xiàn)冪等。背后的邏輯其實(shí)就是:不丟和不重是矛盾的(在分布式場(chǎng)景下),但消息重復(fù)是有解決方案的,而消息丟失是很麻煩的。
簡(jiǎn)單的消息去重解決方案
例如:假設(shè)我們業(yè)務(wù)的消息消費(fèi)邏輯是:插入某張訂單表的數(shù)據(jù),然后更新庫(kù)存:
- insert into t_order values .....
- update t_inv set countcount = count-1 where good_id = 'good123';
要實(shí)現(xiàn)消息的冪等,我們可能會(huì)采取這樣的方案:
- select * from t_order where order_no = 'order123'
- if(order != null) {
- return ;//消息重復(fù),直接返回
- }
這對(duì)于很多情況下,的確能起到不錯(cuò)的效果,但是在并發(fā)場(chǎng)景下,還是會(huì)有問(wèn)題。
并發(fā)重復(fù)消息
假設(shè)這個(gè)消費(fèi)的所有代碼加起來(lái)需要1秒,有重復(fù)的消息在這1秒內(nèi)(假設(shè)100毫秒)內(nèi)到達(dá)(例如生產(chǎn)者快速重發(fā),Broker重啟等),那么很可能,上面去重代碼里面會(huì)發(fā)現(xiàn),數(shù)據(jù)依然是空的(因?yàn)樯弦粭l消息還沒(méi)消費(fèi)完,還沒(méi)成功更新訂單狀態(tài)),
那么就會(huì)穿透掉檢查的擋板,最后導(dǎo)致重復(fù)的消息消費(fèi)邏輯進(jìn)入到非冪等安全的業(yè)務(wù)代碼中,從而引發(fā)重復(fù)消費(fèi)的問(wèn)題(如主鍵沖突拋出異常、庫(kù)存被重復(fù)扣減而沒(méi)釋放等)
并發(fā)去重的解決方案之一
要解決上面并發(fā)場(chǎng)景下的消息冪等問(wèn)題,一個(gè)可取的方案是開啟事務(wù)把select 改成 select for update語(yǔ)句,把記錄進(jìn)行鎖定。
- select * from t_order where order_no = 'THIS_ORDER_NO' for update //開啟事務(wù) if(order.status != null) {
- return ;//消息重復(fù),直接返回
- }
但這樣消費(fèi)的邏輯會(huì)因?yàn)橐肓耸聞?wù)包裹而導(dǎo)致整個(gè)消息消費(fèi)可能變長(zhǎng),并發(fā)度下降。
當(dāng)然還有其他更高級(jí)的解決方案,例如更新訂單狀態(tài)采取樂(lè)觀鎖,更新失敗則消息重新消費(fèi)之類的。但這需要針對(duì)具體業(yè)務(wù)場(chǎng)景做更復(fù)雜和細(xì)致的代碼開發(fā)、庫(kù)表設(shè)計(jì),不在本文討論的范圍。
但無(wú)論是select for update, 還是樂(lè)觀鎖這種解決方案,實(shí)際上都是基于業(yè)務(wù)表本身做去重,這無(wú)疑增加了業(yè)務(wù)開發(fā)的復(fù)雜度, 一個(gè)業(yè)務(wù)系統(tǒng)里面很大部分的請(qǐng)求處理都是依賴MQ的,如果每個(gè)消費(fèi)邏輯本身都需要基于業(yè)務(wù)本身而做去重/冪等的開發(fā)的話,這是繁瑣的工作量。本文希望探索出一個(gè)通用的消息冪等處理的方法,從而抽象出一定的工具類用以適用各個(gè)業(yè)務(wù)場(chǎng)景。
Exactly Once
在消息中間件里,有一個(gè)投遞語(yǔ)義的概念,而這個(gè)語(yǔ)義里有一個(gè)叫”Exactly Once”,即消息肯定會(huì)被成功消費(fèi),并且只會(huì)被消費(fèi)一次。以下是阿里云里對(duì)Exactly Once的解釋:
Exactly-Once 是指發(fā)送到消息系統(tǒng)的消息只能被消費(fèi)端處理且僅處理一次,即使生產(chǎn)端重試消息發(fā)送導(dǎo)致某消息重復(fù)投遞,該消息在消費(fèi)端也只被消費(fèi)一次。
在我們業(yè)務(wù)消息冪等處理的領(lǐng)域內(nèi),可以認(rèn)為業(yè)務(wù)消息的代碼肯定會(huì)被執(zhí)行,并且只被執(zhí)行一次,那么我們可以認(rèn)為是Exactly Once。
但這在分布式的場(chǎng)景下想找一個(gè)通用的方案幾乎是不可能的。不過(guò)如果是針對(duì)基于數(shù)據(jù)庫(kù)事務(wù)的消費(fèi)邏輯,實(shí)際上是可行的。
基于關(guān)系數(shù)據(jù)庫(kù)事務(wù)插入消息表
假設(shè)我們業(yè)務(wù)的消息消費(fèi)邏輯是:更新MySQL數(shù)據(jù)庫(kù)的某張訂單表的狀態(tài):
- update t_order set status = 'SUCCESS' where order_no= 'order123';
要實(shí)現(xiàn)Exaclty Once即這個(gè)消息只被消費(fèi)一次(并且肯定要保證能消費(fèi)一次),我們可以這樣做:在這個(gè)數(shù)據(jù)庫(kù)中增加一個(gè)消息消費(fèi)記錄表,把消息插入到這個(gè)表,并且把原來(lái)的訂單更新和這個(gè)插入的動(dòng)作放到同一個(gè)事務(wù)中一起提交,就能保證消息只會(huì)被消費(fèi)一遍了。
- 開啟事務(wù)
- 插入消息表(處理好主鍵沖突的問(wèn)題)
- 更新訂單表(原消費(fèi)邏輯)
- 提交事務(wù)
說(shuō)明:
1. 這時(shí)候如果消息消費(fèi)成功并且事務(wù)提交了,那么消息表就插入成功了,這時(shí)候就算RocketMQ還沒(méi)有收到消費(fèi)位點(diǎn)的更新再次投遞,也會(huì)插入消息失敗而視為已經(jīng)消費(fèi)過(guò),后續(xù)就直接更新消費(fèi)位點(diǎn)了。這保證我們消費(fèi)代碼只會(huì)執(zhí)行一次。
2. 如果事務(wù)提交之前服務(wù)掛了(例如重啟),對(duì)于本地事務(wù)并沒(méi)有執(zhí)行所以訂單沒(méi)有更新,消息表也沒(méi)插入成功;而對(duì)于RocketMQ服務(wù)端來(lái)說(shuō),消費(fèi)位點(diǎn)也沒(méi)更新,所以消息還會(huì)繼續(xù)投遞下來(lái),投遞下來(lái)發(fā)現(xiàn)這個(gè)消息插入消息表也是成功的,所以可以繼續(xù)消費(fèi)。這保證了消息不丟失。
事實(shí)上,阿里云ONS的EXACTLY-ONCE語(yǔ)義的實(shí)現(xiàn)上,就是類似這個(gè)方案基于數(shù)據(jù)庫(kù)的事務(wù)特性實(shí)現(xiàn)的。
基于這種方式,的確這是有能力拓展到不同的應(yīng)用場(chǎng)景,因?yàn)樗膶?shí)現(xiàn)方案與具體業(yè)務(wù)本身無(wú)關(guān)——而是依賴一個(gè)消息表。
但是這里有它的局限性
- 消息的消費(fèi)邏輯必須是依賴于關(guān)系型數(shù)據(jù)庫(kù)事務(wù)。如果消費(fèi)的消費(fèi)過(guò)程中還涉及其他數(shù)據(jù)的修改,例如Redis這種不支持事務(wù)特性的數(shù)據(jù)源,則這些數(shù)據(jù)是不可回滾的。
- 數(shù)據(jù)庫(kù)的數(shù)據(jù)必須是在一個(gè)庫(kù),跨庫(kù)無(wú)法解決
注:業(yè)務(wù)上,消息表的設(shè)計(jì)不應(yīng)該以消息ID作為標(biāo)識(shí),而應(yīng)該以業(yè)務(wù)的業(yè)務(wù)主鍵作為標(biāo)識(shí)更為合理,以應(yīng)對(duì)生產(chǎn)者的重發(fā)。阿里云上的消息去重只是RocketMQ的messageId,在生產(chǎn)者因?yàn)槟承┰蚴謩?dòng)重發(fā)(例如上游針對(duì)一個(gè)交易重復(fù)請(qǐng)求了)的場(chǎng)景下起不到去重/冪等的效果(因消息id不同)。
更復(fù)雜的業(yè)務(wù)場(chǎng)景
如上所述,這種方式Exactly Once語(yǔ)義的實(shí)現(xiàn),實(shí)際上有很多局限性,這種局限性使得這個(gè)方案基本不具備廣泛應(yīng)用的價(jià)值。并且由于基于事務(wù),可能導(dǎo)致鎖表時(shí)間過(guò)長(zhǎng)等性能問(wèn)題。
例如我們以一個(gè)比較常見的一個(gè)訂單申請(qǐng)的消息來(lái)舉例,可能有以下幾步(以下統(tǒng)稱為步驟X):
- 檢查庫(kù)存(RPC)
- 鎖庫(kù)存(RPC)
- 開啟事務(wù),插入訂單表(MySQL)
- 調(diào)用某些其他下游服務(wù)(RPC)
- 更新訂單狀態(tài)
- commit 事務(wù)(MySQL)
這種情況下,我們?nèi)绻扇∠⒈?本地事務(wù)的實(shí)現(xiàn)方式,消息消費(fèi)過(guò)程中很多子過(guò)程是不支持回滾的,也就是說(shuō)就算我們加了事務(wù),實(shí)際上這背后的操作并不是原子性的。怎么說(shuō)呢,就是說(shuō)有可能第一條小在經(jīng)歷了第二步鎖庫(kù)存的時(shí)候,服務(wù)重啟了,這時(shí)候?qū)嶋H上庫(kù)存是已經(jīng)在另外的服務(wù)里被鎖定了,這并不能被回滾。當(dāng)然消息還會(huì)再次投遞下來(lái),要保證消息能至少消費(fèi)一遍,換句話說(shuō),鎖庫(kù)存的這個(gè)RPC接口本身依舊要支持“冪等”。
再者,如果在這個(gè)比較耗時(shí)的長(zhǎng)鏈條場(chǎng)景下加入事務(wù)的包裹,將大大的降低系統(tǒng)的并發(fā)。所以通常情況下,我們處理這種場(chǎng)景的消息去重的方法還是會(huì)使用一開始說(shuō)的業(yè)務(wù)自己實(shí)現(xiàn)去重邏輯的方式,如前面加select for update,或者使用樂(lè)觀鎖。
那我們有沒(méi)有方法抽取出一個(gè)公共的解決方案,能兼顧去重、通用、高性能呢?Java 核心技術(shù)教程和示例源碼:https://github.com/javastacks/javastack
拆解消息執(zhí)行過(guò)程
其中一個(gè)思路是把上面的幾步,拆解成幾個(gè)不同的子消息,例如:
- 庫(kù)存系統(tǒng)消費(fèi)A:檢查庫(kù)存并做鎖庫(kù)存,發(fā)送消息B給訂單服務(wù)
- 訂單系統(tǒng)消費(fèi)消息B:插入訂單表(MySQL),發(fā)送消息C給自己(下游系統(tǒng))消費(fèi)
- 下游系統(tǒng)消費(fèi)消息C:處理部分邏輯,發(fā)送消息D給訂單系統(tǒng)
- 訂單系統(tǒng)消費(fèi)消息D:更新訂單狀態(tài)
注:上述步驟需要保證本地事務(wù)和消息是一個(gè)事務(wù)的(至少是最終一致性的),這其中涉及到分布式事務(wù)消息相關(guān)的話題,不在本文論述。
可以看到這樣的處理方法會(huì)使得每一步的操作都比較原子,而原子則意味著是小事務(wù),小事務(wù)則意味著使用消息表+事務(wù)的方案顯得可行。
分享給你:Spring Boot 學(xué)習(xí)筆記,這個(gè)太全了!
然而,這太復(fù)雜了!這把一個(gè)本來(lái)連續(xù)的代碼邏輯割裂成多個(gè)系統(tǒng)多次消息交互!那還不如業(yè)務(wù)代碼層面上加鎖實(shí)現(xiàn)呢。另外,多線程系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺(tái)發(fā)送:面試,可以在線閱讀。
更通用的解決方案
上面消息表+本地事務(wù)的方案之所以有其局限性和并發(fā)的短板,究其根本是因?yàn)樗蕾囉陉P(guān)系型數(shù)據(jù)庫(kù)的事務(wù),且必須要把事務(wù)包裹于整個(gè)消息消費(fèi)的環(huán)節(jié)。
如果我們能不依賴事務(wù)而實(shí)現(xiàn)消息的去重,那么方案就能推廣到更復(fù)雜的場(chǎng)景例如:RPC、跨庫(kù)等。
例如,我們依舊使用消息表,但是不依賴事務(wù),而是針對(duì)消息表增加消費(fèi)狀態(tài),是否可以解決問(wèn)題呢?
基于消息冪等表的非事務(wù)方案
以上是去事務(wù)化后的消息冪等方案的流程,可以看到,此方案是無(wú)事務(wù)的,而是針對(duì)消息表本身做了狀態(tài)的區(qū)分:消費(fèi)中、消費(fèi)完成。只有消費(fèi)完成的消息才會(huì)被冪等處理掉。
而對(duì)于已有消費(fèi)中的消息,后面重復(fù)的消息會(huì)觸發(fā)延遲消費(fèi)(在RocketMQ的場(chǎng)景下即發(fā)送到RETRY TOPIC),之所以觸發(fā)延遲消費(fèi)是為了控制并發(fā)場(chǎng)景下,第二條消息在第一條消息沒(méi)完成的過(guò)程中,去控制消息不丟(如果直接冪等,那么會(huì)丟失消息(同一個(gè)消息id的話),因?yàn)樯弦粭l消息如果沒(méi)有消費(fèi)完成的時(shí)候,第二條消息你已經(jīng)告訴broker成功了,那么第一條消息這時(shí)候失敗broker也不會(huì)重新投遞了)
上面的流程不再細(xì)說(shuō),后文有g(shù)ithub源碼的地址,讀者可以參考源碼的實(shí)現(xiàn),這里我們回頭看看我們一開始想解決的問(wèn)題是否解決了:
- 消息已經(jīng)消費(fèi)成功了,第二條消息將被直接冪等處理掉(消費(fèi)成功)。
- 并發(fā)場(chǎng)景下的消息,依舊能滿足不會(huì)出現(xiàn)消息重復(fù),即穿透冪等擋板的問(wèn)題。
- 支持上游業(yè)務(wù)生產(chǎn)者重發(fā)的業(yè)務(wù)重復(fù)的消息冪等問(wèn)題。
關(guān)于第一個(gè)問(wèn)題已經(jīng)很明顯已經(jīng)解決了,在此就不討論了。
關(guān)于第二個(gè)問(wèn)題是如何解決的?主要是依靠插入消息表的這個(gè)動(dòng)作做控制的,假設(shè)我們用MySQL作為消息表的存儲(chǔ)媒介(設(shè)置消息的唯一ID為主鍵),那么插入的動(dòng)作只有一條消息會(huì)成功,后面的消息插入會(huì)由于主鍵沖突而失敗,走向延遲消費(fèi)的分支,然后后面延遲消費(fèi)的時(shí)候就會(huì)變成上面第一個(gè)場(chǎng)景的問(wèn)題。
關(guān)于第三個(gè)問(wèn)題,只要我們?cè)O(shè)計(jì)去重的消息鍵讓其支持業(yè)務(wù)的主鍵(例如訂單號(hào)、請(qǐng)求流水號(hào)等),而不僅僅是messageId即可。所以也不是問(wèn)題。另外,MySQL 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺(tái)發(fā)送:面試,可以在線閱讀。
此方案是否有消息丟失的風(fēng)險(xiǎn)?
如果細(xì)心的讀者可能會(huì)發(fā)現(xiàn)這里實(shí)際上是有邏輯漏洞的,問(wèn)題出在上面聊到的個(gè)三問(wèn)題中的第2個(gè)問(wèn)題(并發(fā)場(chǎng)景),在并發(fā)場(chǎng)景下我們依賴于消息狀態(tài)是做并發(fā)控制使得第2條消息重復(fù)的消息會(huì)不斷延遲消費(fèi)(重試)。但如果這時(shí)候第1條消息也由于一些異常原因(例如機(jī)器重啟了、外部異常導(dǎo)致消費(fèi)失敗)沒(méi)有成功消費(fèi)成功呢?也就是說(shuō)這時(shí)候延遲消費(fèi)實(shí)際上每次下來(lái)看到的都是消費(fèi)中的狀態(tài),最后消費(fèi)就會(huì)被視為消費(fèi)失敗而被投遞到死信Topic中(RocketMQ默認(rèn)可以重復(fù)消費(fèi)16次)。
有這種顧慮是正確的!對(duì)于此,我們解決的方法是,插入的消息表必須要帶一個(gè)最長(zhǎng)消費(fèi)過(guò)期時(shí)間,例如10分鐘,意思是如果一個(gè)消息處于消費(fèi)中超過(guò)10分鐘,就需要從消息表中刪除(需要程序自行實(shí)現(xiàn))。所以最后這個(gè)消息的流程會(huì)是這樣的:
更靈活的消息表存儲(chǔ)媒介
我們這個(gè)方案實(shí)際上沒(méi)有事務(wù)的,只需要一個(gè)存儲(chǔ)的中心媒介,那么自然我們可以選擇更靈活的存儲(chǔ)媒介,例如Redis。另外,Redis 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺(tái)發(fā)送:面試,可以在線閱讀。
使用Redis有兩個(gè)好處:
- 性能上損耗更低
- 上面我們講到的超時(shí)時(shí)間可以直接利用Redis本身的ttl實(shí)現(xiàn)
當(dāng)然Redis存儲(chǔ)的數(shù)據(jù)可靠性、一致性等方面是不如MySQL的,需要用戶自己取舍。