靈魂發(fā)問:重復(fù)消費(fèi)順序消費(fèi)分布式事務(wù)
hello大家好
我是大家的學(xué)習(xí)成長小伙伴Captain
我們繼續(xù)學(xué)習(xí)RocketMQ,上一篇我們學(xué)習(xí)了廣播消息、延遲消息、批量消息、過濾消息這些在RocketMQ中的特性,這一篇我們繼續(xù)來學(xué)習(xí)RocketMQ中的那些奇奇怪怪的特性,讓你在開發(fā)中如魚得水
這一篇我們要說的是重復(fù)消費(fèi)、順序消費(fèi)這兩個(gè)在消息隊(duì)列中常見的問題,以及一種事務(wù)消息,這種事務(wù)消息可以在消息隊(duì)列中完成分布式事務(wù)的特性
把之前的這些技術(shù)點(diǎn)有關(guān)的文章貼到這里,大家可以先讀一讀
- 搞懂什么是RocketMQ
- 我怎么不知道RocketMQ生產(chǎn)者有這么多用法?(圖片在末尾,不謝)
- 面試官問我:分布式事務(wù)是什么?
像這種啊,應(yīng)該都是面試場上非Ban必選的技術(shù)點(diǎn),除非面試官忘記了,否則他大概率會問起這些問題相關(guān)的技術(shù)棧,到時(shí)候可以到了發(fā)揮大家技術(shù)海和技術(shù)深度的時(shí)候了
01 重復(fù)消費(fèi)問題
問題開始
我們來聊一聊消息隊(duì)列中的重復(fù)消費(fèi)問題吧
這種問題應(yīng)該是必然存在的,也是大家使用消費(fèi)隊(duì)列必須考慮的問題之一,反正我用消息隊(duì)列這個(gè)問題都是首先考慮的,因?yàn)檫@個(gè)問題如果不去考慮,可能會造成業(yè)務(wù)上的不可接受的問題
重復(fù)消費(fèi),大家肯定也明白啥意思,就是同樣的消息消費(fèi)了多次
為什么說這種問題必然存在呢,因?yàn)橄㈥?duì)列一定有它的重試機(jī)制,也就是消息重發(fā),一旦消費(fèi)端出現(xiàn)異常的情況下,消息隊(duì)列會進(jìn)行消息的重發(fā)
你重發(fā)消息重新處理沒問題,但是一般一個(gè)消息的監(jiān)聽者不止一個(gè),也就是可能多個(gè)系統(tǒng)都在監(jiān)聽著處理這個(gè)消息,別的系統(tǒng)要是不支持重復(fù)消費(fèi),那豈不很糟糕
別的系統(tǒng)的數(shù)據(jù)就會出現(xiàn)混亂,各個(gè)系統(tǒng)之間的數(shù)據(jù)便會出現(xiàn)不一致的情況
舉個(gè)例子,電商系統(tǒng)中的支付成功消息,支付成功之后發(fā)送一個(gè)消息,積分系統(tǒng)、物流系統(tǒng)多個(gè)系統(tǒng)監(jiān)聽這一消息,積分系統(tǒng)處理出現(xiàn)異常,該支付成功的消息重新發(fā)送了一條,物流系統(tǒng)要是不支持消息的重試,那就出現(xiàn)了兩個(gè)物流單子,那可能會造成客戶買了一件商品,付了一件商品的錢,結(jié)果呢,給用戶發(fā)了多個(gè)該商品
???
啊這...
這樣豈不糟糕透了?你也可能該收拾東西了
其實(shí)出現(xiàn)消息重試這真的真的是很常見的情況,也是大家在使用消息隊(duì)列必須必須要考慮的,比如網(wǎng)絡(luò)抖動、系統(tǒng)業(yè)務(wù)的處理bug等,這個(gè)問題不處理,系統(tǒng)后患無窮
那這種重復(fù)消費(fèi)問題如何避免呢
解決方案:冪等
???
簡單來說,冪等是一個(gè)數(shù)學(xué)上的概念,通俗的解釋就是同樣的參數(shù)多次調(diào)用同樣的接口,調(diào)用的結(jié)果都是一樣的,也就是你支付成功的消息發(fā)送多少次,最終生成的物流數(shù)據(jù)還是一條
這樣就沒問題了
那如何去保證冪等呢
像這種問題我一般是分為兩種場景去回答的,一種是生產(chǎn)端的冪等,另一種是消費(fèi)端的冪等
生產(chǎn)者端的冪等一般都是通過第三方的存儲來完成的,比如Redis,或者是流水表,在消息發(fā)送之后,將記錄暫時(shí)保存起來,下次發(fā)送消息之前,在Redis中檢查該消息是否發(fā)送過,不過這種在很多場景下是不合適的,這種會在生產(chǎn)端就限制了重試這一機(jī)制
如果生產(chǎn)端發(fā)送成功,消費(fèi)失敗,則不會重新發(fā)送該消息
另一種消費(fèi)者端的冪等,這種是屬于最常見的,生產(chǎn)者無論發(fā)送多少次同樣的消息,最終的執(zhí)行結(jié)果都是一樣的,可以分為強(qiáng)冪等和弱冪等來處理
強(qiáng)冪等其實(shí)就是用于必須冪等的業(yè)務(wù)場景,不允許出現(xiàn)差錯的,這種更為謹(jǐn)慎些,比如上面的支付成功的這種消息,物流消費(fèi)方的處理肯定要是強(qiáng)冪等咯
這里可以引進(jìn)一個(gè)三方存儲,流水表或者Redis都可以,支付成功之后,記錄到流水表中,這里用Redis可能會丟失,把支付成功和記錄到流水表放入到同一個(gè)事務(wù)中,要么一起成功,要么一起失敗
每次消息過來之后根據(jù)訂單號去流水表中檢查是否有這條流水,有流水則直接return就可以了
也可以直接用數(shù)據(jù)庫的唯一約束來做insert操作
還以一種屬于是弱冪等性,這種不能保證百分百情況下冪等,比如用Redis來存儲業(yè)務(wù)ID作為唯一key來處理,Redis宕機(jī)可能導(dǎo)致短信發(fā)送情況的丟失,不過問題不大,用戶也是可以接受的,我們來看一下實(shí)例代碼
String idempotentValue = RedisUtil.get(RedisConstant.IDEMPOTENT.concat(msgId), String.class); if (!StringUtils.isEmpty(idempotentValue)) { log.info("========該消息已經(jīng)被消費(fèi):【{}】", msgBody); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //業(yè)務(wù)代碼 //冪等處理 RedisUtil.setEx(RedisConstant.IDEMPOTENT.concat(msgId), "1", 5, TimeUnit.DAYS);
02 順序消費(fèi)
那你說一下你有沒有遇到過順序消費(fèi)這個(gè)場景呢?
順序消費(fèi)這個(gè)場景其實(shí)不是特別的常見,但是也是必不可少的,因?yàn)樵谀承I(yè)務(wù)場景下順序是很關(guān)鍵的,保證消息的消費(fèi)順序也是很關(guān)鍵的
比如我們有一個(gè)操作需要對數(shù)據(jù)進(jìn)行刪除、增加、修改三個(gè)操作,這種在一般的系統(tǒng)中我們都會采用SQL來進(jìn)行操作,但是當(dāng)數(shù)據(jù)量很大的時(shí)候,我們做備份同步數(shù)據(jù)的時(shí)候,這種同步有的時(shí)候會通過消息隊(duì)列來慢慢的去執(zhí)行,這個(gè)時(shí)候就很有必要保證消息的順序性,如果上面的三個(gè)操作變成了修改、刪除、增加這樣的順序,那就不是我們想要的效果了
普通的消息的消費(fèi)當(dāng)然是沒有固定順序的,消息發(fā)送的時(shí)候默認(rèn)是采用的輪詢的方式發(fā)送到不同的分區(qū)中
???
而消費(fèi)端消費(fèi)的時(shí)候則是會分配到多個(gè)分區(qū)的,多個(gè)分區(qū)是同時(shí)拉取提交消費(fèi)的,在同一個(gè)分區(qū)queue中,是可以保證FIFO的,但是普通消息是沒法達(dá)到順序消費(fèi)的,只需要將消息投遞到同一條queue中即可
???
按照上面所說,我們只需要保證需要保持順序的消息投遞到相同的queue中即可,這樣同一個(gè)queue中的消息肯定會投遞到同一個(gè)消費(fèi)實(shí)例,同一個(gè)消費(fèi)實(shí)例肯定是順序拉取消息,然后順序的去消費(fèi)
即使觸發(fā)重排導(dǎo)致queue分配給了別的消費(fèi)者也沒有關(guān)系,由于queue的消息永遠(yuǎn)是FIFO的,所以只需要保證消息的重復(fù)消費(fèi)的冪等性即可,queue的內(nèi)部順序還是沒問題的
順序消費(fèi)分配全局順序和分區(qū)順序
- 全局順序:對于指定的一個(gè)Topic,所有消息按照嚴(yán)格的先入先出FIFO(First In First Out)的順序進(jìn)行發(fā)布和消費(fèi)。
- 分區(qū)順序:對于指定的一個(gè)Topic,所有消息根據(jù)Sharding Key進(jìn)行區(qū)塊分區(qū)。同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費(fèi)。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的Key是完全不同的概念。
為什么全局魂虛順序消息消費(fèi)性能一般
全局順序消息是嚴(yán)格按照FIFO的消息阻塞原則,即上一條消息沒有被成功消費(fèi),那么下一條消息會一直被存儲到Topic隊(duì)列中。如果想提高全局順序消息的TPS,可以升級實(shí)例配置,同時(shí)消息客戶端應(yīng)用盡量減少處理本地業(yè)務(wù)邏輯的耗時(shí)。
在rocketmq中,一個(gè)topic下有多個(gè)隊(duì)列queue,于是乎為了保證消息的順序性,將消息發(fā)送到同一個(gè)queue中,rocketmq提供了MessageQueueSelector隊(duì)列選擇機(jī)制,有三種實(shí)現(xiàn)
???
使用Hash取模法讓需要順序消費(fèi)的消息發(fā)送到同一個(gè)queue中,再使用同步發(fā)送,當(dāng)然這個(gè)取模根據(jù)的是這些消息的共同屬性
rocketmq僅僅保證了發(fā)送的順序性,至于最終的順序消費(fèi)還是要由消費(fèi)者業(yè)務(wù)來保證,就是我保證我發(fā)給你的是按照順序的消息,但是你要是自己給處理亂了就不關(guān)我rocketmq的事了,那就是你自己的代碼問題了
其實(shí)還是存在一些異常的場景會導(dǎo)致出現(xiàn)亂序的情況,比如master宕機(jī),導(dǎo)致寫入隊(duì)列的數(shù)量發(fā)生了變化,你想啊,采用上面的hash取模就會出現(xiàn)消息分散到其它的queue中,這樣就不能保證有序了,除非選擇master如果掛了就無法發(fā)送接下來的消息
03 分布式事務(wù)
聊一下分布式事務(wù)吧
大家看一下這篇面試官問我:分布式事務(wù)是什么?
簡單來說就是,事務(wù)是要么全部執(zhí)行成功,要么全部執(zhí)行失敗;而分布式事務(wù)就是跨機(jī)器的,跨服務(wù)的,跨系統(tǒng)的事務(wù)保證,現(xiàn)在的系統(tǒng)都是拆分成很多的服務(wù),每個(gè)服務(wù)最少部署兩臺,分別部署在不同的機(jī)器上
這樣系統(tǒng)之間的事務(wù)保證就是分布式事務(wù)
而rocketmq中的事務(wù)消息則天然支持分布式事務(wù)
事務(wù)消息:實(shí)現(xiàn)類似X或者Open XA的分布式事務(wù)功能,以達(dá)到最終一致性
消息隊(duì)列RocketMQ版提供類似X或Open XA的分布式事務(wù)功能,通過消息隊(duì)列RocketMQ版事務(wù)消息,能達(dá)到分布式事務(wù)的最終一致。
半事務(wù)消息:暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊(duì)列RocketMQ版服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。
消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊(duì)列RocketMQ版服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時(shí),需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback),該詢問過程即消息回查。
???
跟Captain來看看事務(wù)消息發(fā)送步驟:
1、發(fā)送方將半事務(wù)消息發(fā)送到服務(wù)端Broker,服務(wù)端會將消息持久化,成功之后會返回ACK確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半事務(wù)消息
2、發(fā)送方開始執(zhí)行本地事務(wù)的邏輯
3、發(fā)送方會根據(jù)本地事務(wù)的執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn),決定Commit還是Rollback,服務(wù)端收到Commit之后則把這個(gè)消息標(biāo)記為可投遞,發(fā)送到消費(fèi)方;服務(wù)端收到Rollback之后則刪除半事務(wù)消息,服務(wù)端不會發(fā)送,則消費(fèi)方也不會收到
如可是如果斷網(wǎng)或者應(yīng)用重啟這些情況,上述的步驟的二次確認(rèn)信息無法到達(dá)服務(wù)端,怎么辦?
這里其實(shí)有個(gè)回查機(jī)制,發(fā)送方發(fā)送消息之后,需要本地執(zhí)行事務(wù),如果事務(wù)執(zhí)行的過程出現(xiàn)卡死的情況,或者事務(wù)執(zhí)行結(jié)果因?yàn)榫W(wǎng)絡(luò)等問題,無法傳遞事務(wù)結(jié)果到服務(wù)端,服務(wù)端會執(zhí)行一個(gè)回查機(jī)制,來確認(rèn)這個(gè)半事務(wù)消息的最終提交情況
本文轉(zhuǎn)載自微信公眾號「Java賊船」
???