聽(tīng)叔一句勸,消息隊(duì)列的水太深,你把握不??!
本文轉(zhuǎn)載自微信公眾號(hào)「JAVA日知錄」,作者單一色調(diào) 。轉(zhuǎn)載本文請(qǐng)聯(lián)系JAVA日知錄公眾號(hào)。
很多人在做架構(gòu)設(shè)計(jì)時(shí)往往會(huì)“過(guò)度設(shè)計(jì)”,簡(jiǎn)單問(wèn)題復(fù)雜化,上來(lái)就引一堆中間件,我想大概原因主要有下面兩點(diǎn):
為了秀(學(xué))技術(shù)而架構(gòu)
我們常說(shuō)技術(shù)是為業(yè)務(wù)服務(wù)的,不能為了技術(shù)而技術(shù),為了秀技術(shù)引入一堆復(fù)雜架構(gòu)這是要不得的。
考慮問(wèn)題不全面,或者說(shuō)廣度不夠,不知道如何簡(jiǎn)單化
舉個(gè)栗子,假設(shè)有一個(gè)高并發(fā)的用戶平臺(tái)需要處理注冊(cè)(寫(xiě))及登錄查詢(讀)功能,在數(shù)據(jù)庫(kù)層做了主從同步。
有人為了解決主從同步延時(shí)問(wèn)題引入了一個(gè)Redis,想實(shí)現(xiàn)寫(xiě)主庫(kù)的時(shí)候同時(shí)寫(xiě)Redis,然后讀的時(shí)候直接讀Redis,用以避免了主從延時(shí)同步問(wèn)題,這就是典型的考慮問(wèn)題不全面,這雖然可以解決主從延時(shí)問(wèn)題,但是又會(huì)導(dǎo)致雙寫(xiě)一致性事務(wù)問(wèn)題的產(chǎn)生,那不如直接把主從同步的方式改成強(qiáng)同步復(fù)制直接從數(shù)據(jù)庫(kù)層面保證了一致性。
那你可能會(huì)說(shuō)改成強(qiáng)同步復(fù)制不是會(huì)增加響應(yīng)時(shí)間進(jìn)而影響系統(tǒng)吞吐量嗎,那咱還可以對(duì)用戶做個(gè)分庫(kù),多做幾個(gè)主從同步出來(lái)不就可以了嗎?
誒誒誒,跑題了,今天咱不是說(shuō)消息隊(duì)列嗎?
哦,言歸正傳。今天我們說(shuō)說(shuō)消息隊(duì)列的問(wèn)題,希望看完本文大家在引入消息隊(duì)列的時(shí)候先想一想,是不是一定要引入?引入消息隊(duì)列后產(chǎn)生的問(wèn)題能不能解決?
消息隊(duì)列的作用
在微服務(wù)開(kāi)發(fā)中我們經(jīng)常會(huì)引入消息中間件實(shí)現(xiàn)業(yè)務(wù)解耦,執(zhí)行異步操作, 現(xiàn)在讓我們來(lái)看看使用消息中間件的好處和弊端。
首先需要肯定是使用消息組件有很多好處,其中最核心的三個(gè)是:解耦、異步、削峰。
- 解耦:客戶端只要講請(qǐng)求發(fā)送給特定的通道即可,不需要感知接收請(qǐng)求實(shí)例的情況。
- 異步:將消息寫(xiě)入消息隊(duì)列,非必要的業(yè)務(wù)邏輯以異步的方式運(yùn)行,加快響應(yīng)速度。
- 削峰:消息中間件在消息被消費(fèi)之前一直緩存消息,消息處理端可以按照自己處理的并發(fā)量從消息隊(duì)列中慢慢處理消息,不會(huì)一瞬間壓垮業(yè)務(wù)。
當(dāng)然消息中間件并不是銀彈,引入消息機(jī)制后也會(huì)有如下一些弊端:
- 潛在的性能瓶頸:消息代理可能會(huì)存在性能瓶頸。幸運(yùn)的是目前主流的消息中間件都支持高度的橫向擴(kuò)展。
- 潛在的單點(diǎn)故障:消息代理的高可用性至關(guān)重要,否則系統(tǒng)整體的可靠性將受到影響,幸運(yùn)的是大多數(shù)消息中間件都是高可用的。
- 額外的操作復(fù)雜性:消息系統(tǒng)是一個(gè)必須獨(dú)立安裝、配置和運(yùn)維的系統(tǒng)組件,增加了運(yùn)維的復(fù)雜度。
這些弊端我們借助消息中間件本身提供的擴(kuò)展、高可用能力可以解決,但是要真正用好消息中間件我們還需要關(guān)注可能會(huì)遇到的一些設(shè)計(jì)難題。
消息隊(duì)列的設(shè)計(jì)難題
處理并發(fā)和順序消息
在生產(chǎn)環(huán)境中為了提高消息處理的能力以及應(yīng)用程序的吞吐量,一般會(huì)將消費(fèi)者部署多個(gè)實(shí)例節(jié)點(diǎn)。那么帶來(lái)的挑戰(zhàn)就是 如何確保每個(gè)消息只被處理一次,并且是按照他們的發(fā)送順序來(lái)處理的。
例如:假設(shè)有3個(gè)相同的接收方實(shí)例從同一個(gè)點(diǎn)對(duì)點(diǎn)通道讀取消息,發(fā)送方按順序發(fā)布了 Order Created、Order Updated 和 Order Cancelled 這3個(gè)事件消息。簡(jiǎn)單的消息實(shí)現(xiàn)可能就會(huì)同事講每個(gè)消息給不同的接收方。若由于網(wǎng)絡(luò)問(wèn)題導(dǎo)致延遲,消息可能沒(méi)有按照他們發(fā)出時(shí)的順序被處理,這將導(dǎo)致奇怪的行為,服務(wù)實(shí)例可能在另一個(gè)服務(wù)器處理 Order Created 消息之前處理 Order Cancelled消息。
Kafka 使用的解決方案是使用分片(分區(qū))通道。整體解決方案分為三個(gè)部分:
- 一個(gè)主題通道由多個(gè)分片組成,每個(gè)分片的行為類(lèi)似一個(gè)通道。
- 發(fā)送方在消息頭部指定分片鍵如orderId,Kafka使用分片鍵將消息分配給特定的分片。
- 將接收方的多個(gè)實(shí)例組合在一起,并將他們視為相同的邏輯接收方(消費(fèi)者組)。kafka將每個(gè)分片分配給單個(gè)接收器,它在接收方啟動(dòng)和關(guān)閉時(shí)重新分配分片。
如上圖所示,每個(gè)Order事件消息都將orderId作為其分片鍵。特定訂單的每個(gè)事件都發(fā)布到同一個(gè)分片。而且該分片中的消息始終由同一個(gè)接收方實(shí)例讀取,因此這樣就能夠保證按順序處理這些消息。
處理重復(fù)消息
引入消息架構(gòu)必須要解決的另一個(gè)挑戰(zhàn)是處理重復(fù)消息。在理想情況下,消息代理應(yīng)該只傳遞一次消息,但保證消息有且僅有一次的消息傳遞的成本通常很高。相反,很多消息組件承諾至少保證成功傳遞一次消息。
在正常情況下,消息組件只會(huì)傳遞一次消息。但是當(dāng)客戶端、網(wǎng)絡(luò)或消息組件故障可能導(dǎo)致消息被多次傳遞。假設(shè)客戶端在處理消息后發(fā)送確認(rèn)消息前,他的數(shù)據(jù)庫(kù)崩潰了,這時(shí)消息組件將再次發(fā)送未確認(rèn)的消息,在數(shù)據(jù)庫(kù)重新啟動(dòng)時(shí)向該客戶端發(fā)送。
處理重復(fù)消息有以下兩種不同的方法:
- 編寫(xiě)冪等消息處理程序
- 跟蹤消息并丟棄重復(fù)項(xiàng)
編寫(xiě)冪等消息處理器
如果應(yīng)用程序處理消息的邏輯是滿足冪等的,那么重復(fù)消息就是無(wú)害的。程序的冪等性是指,即使這個(gè)應(yīng)用被相同輸入?yún)?shù)多次重復(fù)調(diào)用時(shí),也不會(huì)產(chǎn)生額外的效果。例如:取消一個(gè)已經(jīng)取消的訂單,就是一個(gè)冪等性操作。同樣,創(chuàng)建一個(gè)已經(jīng)存在的訂單操作也必是這樣。滿足冪等的消息處理程序可以被放心的執(zhí)行多次,只要消息組件在傳遞消息時(shí)保持相同的消息順序。
但是不幸的是,應(yīng)用程序通常不是冪等的?;蛘吣悻F(xiàn)在正在使用的消息組件在重新傳遞消息時(shí)不會(huì)保留排序。重復(fù)或無(wú)序消息可能會(huì)導(dǎo)致錯(cuò)誤。在這種情況下,你需要編寫(xiě)跟蹤消息并丟棄重復(fù)消息的消息處理程序。
跟蹤消息并丟棄重復(fù)消息
考慮一個(gè)授權(quán)消費(fèi)者信用卡的消息處理程序。它必須為每個(gè)訂單僅執(zhí)行一次信用卡授權(quán)操作。這段應(yīng)用程序每次調(diào)用時(shí)都會(huì)產(chǎn)生不同的效果。如果重復(fù)消息導(dǎo)致消息處理程序多次執(zhí)行該邏輯,則應(yīng)用程序的行為將不正確。執(zhí)行此類(lèi)應(yīng)用程序邏輯的消息處理程序必須通過(guò)檢測(cè)和丟棄重復(fù)消息而讓它成為冪等的。
一個(gè)簡(jiǎn)單的解決方案是消息接收方使用 message id 跟蹤他已處理的消息并丟棄任何重復(fù)項(xiàng)。例如,在數(shù)據(jù)庫(kù)表中存儲(chǔ)它消費(fèi)的每條消息的 message id。
當(dāng)接收方處理消息時(shí),它將消息的 message id 作為創(chuàng)建和變更業(yè)務(wù)實(shí)體的事務(wù)的一部分記錄在數(shù)據(jù)表里。如上圖所示,接收方將包含message id 的行插入 PROCESSED_MESSAGE表。如果消息是重復(fù)的,則INSERT將失敗,接收方可以選擇丟棄該消息。
另一個(gè)解決方案是消息處理程序在應(yīng)用程序表,而不是專(zhuān)門(mén)表中記錄 message id。當(dāng)時(shí)用具有受限事務(wù)模型的NoSQL數(shù)據(jù)庫(kù)時(shí),此方法特別有用,因?yàn)? NoSQL數(shù)據(jù)庫(kù)通常不支持將針對(duì)兩個(gè)表的更新作為數(shù)據(jù)庫(kù)事務(wù)。
處理事務(wù)性消息
服務(wù)通常需要在更新數(shù)據(jù)庫(kù)的事務(wù)中發(fā)布消息,數(shù)據(jù)庫(kù)更新和消息發(fā)送都必須在事務(wù)中進(jìn)行,否則服務(wù)可能會(huì)更新數(shù)據(jù)庫(kù)然后在發(fā)送消息之前崩潰。
如果服務(wù)不以原子方式執(zhí)行者兩個(gè)操作,則類(lèi)似的故障可能使系統(tǒng)處于不一致?tīng)顟B(tài)。
接下來(lái)我們看一下常用的保證事務(wù)消息的兩種解決方案,最后再看看現(xiàn)代消息組件RocketMQ的事務(wù)性消息解決方案。
使用數(shù)據(jù)庫(kù)表作為消息隊(duì)列
如果你的應(yīng)用程序正在使用關(guān)系型數(shù)據(jù)庫(kù),要保證數(shù)據(jù)的更新和消息發(fā)送之間的事務(wù)可以直接使用 事務(wù)性發(fā)件箱模式,Transactional Outbox。
此模式使用數(shù)據(jù)庫(kù)表作為臨時(shí)消息隊(duì)列。如上圖所示,發(fā)送消息的服務(wù)有個(gè)OUTBOX數(shù)據(jù)表,在進(jìn)行INSERT、UPDATE、DELETE 業(yè)務(wù)操作時(shí)也會(huì)給OUTBOX數(shù)據(jù)表INSERT一條消息記錄,這樣可以保證原子性,因?yàn)檫@是基于本地的ACID事務(wù)。
OUTBOX表充當(dāng)臨時(shí)消息隊(duì)列,然后我們?cè)谝胍粋€(gè)消息中繼(MessageRelay)的服務(wù),由他從OUTBOX表中讀取數(shù)據(jù)并發(fā)布消息到消息組件。
消息中繼的實(shí)現(xiàn)可以很簡(jiǎn)單,只需要通過(guò)定時(shí)任務(wù)定期從OUTBOX表中拉取最新未發(fā)布的數(shù)據(jù),獲取到數(shù)據(jù)后將數(shù)據(jù)發(fā)送給消息組件,最后將完成發(fā)送的消息從OUTBOX表中刪除即可。
使用事務(wù)日志發(fā)布事件
另外一種保證事務(wù)性消息的方式是基于數(shù)據(jù)庫(kù)的事務(wù)日志,也就是所謂的數(shù)據(jù)變更捕獲,Change Data Capture,簡(jiǎn)稱CDC。
一般數(shù)據(jù)庫(kù)在數(shù)據(jù)發(fā)生變更的時(shí)候都會(huì)記錄事務(wù)日志(Transaction Log),比如MySQL的binlog。事務(wù)日志可以簡(jiǎn)單的理解成數(shù)據(jù)庫(kù)本地的一個(gè)文件隊(duì)列,它主要記錄按時(shí)間順序發(fā)生的數(shù)據(jù)庫(kù)表變更記錄。
這里我們利用alibaba開(kāi)源的組件canal結(jié)合MySQL來(lái)說(shuō)明下這種模式的工作原理。
更多操作說(shuō)明可以參考官方文檔:https://github.com/alibaba/canal
canal工作原理
- canal 模擬 MySQL slave 的交互協(xié)議,把自己偽裝成一個(gè)MySQL的 slave節(jié)點(diǎn) ,向 MySQL master 發(fā)送dump 協(xié)議;
- MySQL master 收到 dump 請(qǐng)求,開(kāi)始推送 binary log 給 slave (即 canal );
- canal 解析 binary log 對(duì)象(原始為 byte 流),然后可以將解析后的數(shù)據(jù)直接發(fā)送給消息組件。
RocketMQ事務(wù)消息解決方案
Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息,如下圖所示。
RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個(gè)階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程。
整體流程為:
- 正常事務(wù)發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個(gè)半消息給MQServer(半消息是指消費(fèi)者暫時(shí)不能消費(fèi)的消息)
2、服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果,半消息發(fā)送成功
3、開(kāi)始執(zhí)行本地事務(wù)
4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
- 事務(wù)信息的補(bǔ)償流程
1、如果MQServer長(zhǎng)時(shí)間沒(méi)收到本地事務(wù)的執(zhí)行狀態(tài)會(huì)向生產(chǎn)者發(fā)起一個(gè)確認(rèn)回查的操作請(qǐng)求
2、生產(chǎn)者收到確認(rèn)回查請(qǐng)求后,檢查本地事務(wù)的執(zhí)行狀態(tài)
3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作 補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時(shí)發(fā)生超時(shí)或失敗的情況。
在生產(chǎn)者使用RocketMQ發(fā)送事務(wù)消息的時(shí)候我們也會(huì)借鑒第一種方案即自建一張事務(wù)日志表,然后在執(zhí)行本地事務(wù)的時(shí)候同時(shí)生成一條事務(wù)日志記錄,讓本地事務(wù)與日志事務(wù)在同一個(gè)方法中,同時(shí)添加 @Transactional 注解,保證兩個(gè)操作事務(wù)是一個(gè)原子操作。這樣如果事務(wù)日志表中有這個(gè)本地事務(wù)的信息,那就代表本地事務(wù)執(zhí)行成功,需要Commit,相反如果沒(méi)有對(duì)應(yīng)的事務(wù)日志,則表示沒(méi)執(zhí)行成功,需要Rollback。