程序員過關(guān)斬將--真的可以用版本號的方式來保證MQ消費(fèi)消息的冪等性?
靈魂拷問
- MQ消息的消費(fèi)為什么有時候要求冪等性?
- 你們都說可以用版本號來解決冪等性消費(fèi)?
- 什么才是消息冪等性消費(fèi)的根本性問題?
隨著系統(tǒng)的復(fù)雜性不斷增加,多數(shù)系統(tǒng)都會引入MQ來進(jìn)行解耦,其實(shí)從引入MQ的初衷來說,多數(shù)系統(tǒng)是為了解耦多個模塊帶來的復(fù)雜性,而有些“架構(gòu)師”卻說的:為了解決性能問題。。。當(dāng)然我不排除MQ有流量削峰的作用,我只是說大部分系統(tǒng)引入MQ最初的初衷應(yīng)該是系統(tǒng)解耦。
當(dāng)一個大的單體系統(tǒng)逐漸被拆分為多個小系統(tǒng),也就是所謂的微服務(wù)拆分之后,無論是微服務(wù)之間的通信,還是分布式事務(wù),幾乎都需要MQ的支持,這也充分體現(xiàn)了分布式系統(tǒng)中MQ的重要性。這個時候整個系統(tǒng)間的交互就類似于下圖所示
image
生產(chǎn)消息
既然引入了MQ這個組件,必然意味著同時存在消息的生產(chǎn)者和消費(fèi)者,這也是典型的訂閱模式。在消息數(shù)據(jù)的整個生命周期中,會依次經(jīng)過生產(chǎn)者=》MQ=》消費(fèi)者,三個主要部分。在生產(chǎn)者角度,消息的可靠投遞是首要的任務(wù),由于網(wǎng)絡(luò)的不可靠性,所以消息理論上是不可能100%都投遞成功的,針對這種情況,一般的解決方案就是消息重傳。
當(dāng)然重傳機(jī)制并非無限制的重傳,可以根據(jù)業(yè)務(wù)制定具體的重傳策略,比如:可以設(shè)置最大重傳次數(shù)為10次,而重傳的時間間隔依次增加。這種方案雖然簡單,但是帶來的副作用就是消息重復(fù)投遞的問題。
為什么需要冪等性消費(fèi)
冪等是一個數(shù)學(xué)上的概念理論,它的意思是多次執(zhí)行同一個操作和執(zhí)行一次操作,最終得到的結(jié)果是相同的。
舉一個業(yè)務(wù)不恰當(dāng)?shù)呛軠?zhǔn)確的栗子:你的女朋友出軌一次和出軌多次,對于你來說,結(jié)果其實(shí)是一樣的:你被綠了。所以出軌一次和出軌多次的結(jié)果對于你來說是相同的。
對于MQ來說,退一萬步講,就算MQ的消息無重復(fù)投遞的問題,在消費(fèi)端的業(yè)務(wù)中,那些對于消息消費(fèi)敏感的業(yè)務(wù),我們在設(shè)計程序架構(gòu)的時候也要把消息的冪等性消費(fèi)考慮在其中,比如:用戶購買商品贈送紅包或者積分的業(yè)務(wù)場景,這樣的場景對于消息的重復(fù)消費(fèi)很敏感,如果程序處理不當(dāng),出現(xiàn)重復(fù)給用戶送紅包的情況,估計程序員又要背鍋來祭天了。
冪等性其實(shí)很好做
任何業(yè)務(wù)場景接口的冪等性設(shè)計,都要找出冪等性產(chǎn)生的數(shù)據(jù)標(biāo)識。
MQ消息的重復(fù)性問題,從消息的整個流轉(zhuǎn)過程來看,大體上可以在兩個方向來解決:
- 消息產(chǎn)生的時候避免投遞重復(fù)性消息,既:消息生產(chǎn)者來保證消息唯一性
- MQ本身提供重復(fù)消息的過濾功能
- 消息被消費(fèi)的時候避免被重復(fù)消費(fèi)
image
在消息被消費(fèi)之前的前半部分流程中,生產(chǎn)者可以利用唯一的消息id和ACK機(jī)制來做消息被重復(fù)投遞的保證工作,但是這樣會大大降低生產(chǎn)者業(yè)務(wù)的性能,一般情況下生產(chǎn)者都需要異步的來發(fā)送MQ消息,如果在發(fā)送的時候還需要檢查消息是否被發(fā)送過,這無疑不是一個好的設(shè)計,而且你這樣做的檢查效果,只為命中很渺小的一部分?jǐn)?shù)據(jù),得不償失,所以在生產(chǎn)者很少有人主動去做消息的重復(fù)投遞檢查工作。
至于在MQ的內(nèi)部,有的MQ確實(shí)會提供冪等性的存儲設(shè)計,比如Kafka引入了Producer ID(即PID)和Sequence Number。
- PID。每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對用戶是不可見的。
- Sequence Numbler。(對于每個PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個都對應(yīng)一個從0開始單調(diào)遞增的Sequence Number。
Broker端在緩存中保存了這seqnumber,對于接收的每條消息,如果其序號比Broker緩存中序號大于1則接受它,否則將其丟棄。這樣就可以實(shí)現(xiàn)了消息重復(fù)提交了。但是,只能保證單個Producer對于同一個的Exactly Once語義。不能保證同一個Producer一個topic不同的partion冪等。
然而這些都不是我們今天要說的重點(diǎn),實(shí)際的業(yè)務(wù)中,消息的冪等性消費(fèi)也更傾向于在消費(fèi)端做,在消息的終點(diǎn)徹底解決問題,無論是在系統(tǒng)設(shè)計,還是在可擴(kuò)展性上無疑都是最好的。
剛才也提到,消息既然要做到冪等性消費(fèi),必須要提供一個用于判斷重復(fù)的標(biāo)識,可以是自定義的消息ID,也可以是消息中幾個字段聯(lián)合起來的類似數(shù)據(jù)表中的主鍵,目前主流的做法是在生產(chǎn)方根據(jù)業(yè)務(wù)特點(diǎn)生成消息id,例如:給用戶添加因為下單而贈送積分的消息id,就可以根據(jù)userid_orderId_積分?jǐn)?shù)量來生成唯一的消息id。
有了唯一的消息id,消費(fèi)者就可以把已經(jīng)消費(fèi)的消息id,本地存儲下來用于過濾重復(fù)消息,當(dāng)然如果數(shù)據(jù)量比較大的話,很早之前的歷史數(shù)據(jù)完全可以刪除或者轉(zhuǎn)移到其他的備份表,畢竟同一個消息不可能過了很長時間再次被投遞。以下是一個本地消息表的例子:
字段 | 說明 |
---|---|
MsgId | 消息id |
CreateTime | 創(chuàng)建時間 |
... | 其他有用的業(yè)務(wù)字段 |
當(dāng)消費(fèi)新消息的時候,執(zhí)行以下類似以下的sql語句,拿到消息是否已經(jīng)消費(fèi)過的結(jié)果來判斷當(dāng)前消息是否需要重復(fù)消費(fèi)
select count(0) from table where MsgId='消息id'
當(dāng)然,這里還會有問題,如果只有一個消費(fèi)者進(jìn)行消費(fèi),不會有任何問題,如果有多個消費(fèi)者在并行的進(jìn)行消費(fèi),在判斷重復(fù)消息的時候你會需要鎖來保證同樣數(shù)據(jù)的順序化,這個時候你可能需要分布式鎖。
鄭重提示
除了生成消息id這種方式之外,網(wǎng)上有很多文章指出可以利用版本號來解決冪等性問題,試問:這種方案又有多少人親自實(shí)踐過?今天我們就以給用戶添加積分這個案例來庖丁解牛一下這個方案的做法:
- 用戶的積分表中需要添加版本號(Version)字段
- 消息的生產(chǎn)者在消息投遞中添加版本號字段
- 消費(fèi)者根據(jù)消息的版本號來執(zhí)行sql具體的sql類似:
- update user set amount=amount+10 ,version=version+1 where userid=100 and version=1
對于同一條消息的重復(fù)投遞來說,這樣做確實(shí)可以做到冪等性消費(fèi),畢竟程序利用數(shù)據(jù)庫的鎖機(jī)制來保證了一致性。那有什么問題呢?
消息的版本號問題
所有的分布式系統(tǒng)都面臨著同樣的問題,就是數(shù)據(jù)的一致性問題,MQ的消費(fèi)場景也不例外。以上邊用戶加積分為案例,因為消息的生產(chǎn)者在投遞消息的時候需要查詢當(dāng)前的版本號,類似于以下sql
- select version from table where userid=100
當(dāng)查詢到版本號信息自后,會把版本號作為消息體的一部分投遞到MQ,那在并發(fā)的情況下會發(fā)生什么情況呢?假設(shè)當(dāng)前的版本號為1:
線程A查詢版本號為1,然后投遞了版本號為1,消息id為x的消息,于此同時線程B也查詢了當(dāng)前用戶版本,數(shù)值也為1,然后投遞了消息id為Y的消息,這個時候消費(fèi)端無論是先消費(fèi)消息X還是消息Y,數(shù)據(jù)庫的版本號都會增加,則導(dǎo)致了另外一個消息由于版本號的不符而消費(fèi)失敗。
image
本文轉(zhuǎn)載自微信公眾號「架構(gòu)師修行之路」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系架構(gòu)師修行之路公眾號。