RocketMQ順序消息解析!
順序消息是消息隊(duì)列 RocketMQ 提供的一種高級(jí)消息類型。
對(duì)于一個(gè)指定的Topic,消息嚴(yán)格按照先進(jìn)先出(FIFO)的原則進(jìn)行消息發(fā)布和消費(fèi)。
- 即先發(fā)送的消息先消費(fèi),后發(fā)送的消息后消費(fèi)。
順序消息適用于對(duì)消息發(fā)送和消費(fèi)順序有嚴(yán)格要求的情況。
應(yīng)用場(chǎng)景
順序消息和普通消息的對(duì)比如下:
消息類型 | 消費(fèi)順序 | 性能 | 適用場(chǎng)景 |
普通消息 | 無順序 | 高 | 適用于對(duì)吞吐量要求高,且對(duì)生產(chǎn)和消費(fèi)順序無要求 |
順序消息 | 指定的 Topic 內(nèi)的消息遵循先入先出(FIFO)規(guī)則 | 一般 | 吞吐量要求一般 但是要求特定的 |
訂單創(chuàng)建場(chǎng)景:
在一些電商系統(tǒng)中,同一個(gè)訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息。
必須嚴(yán)格按照先后順序來進(jìn)行生產(chǎn)或者消費(fèi),否則消費(fèi)中傳遞訂單狀態(tài)會(huì)發(fā)生紊亂,影響業(yè)務(wù)的正常進(jìn)行。
因此,該訂單的消息必須按照一定的順序在客戶端和消息隊(duì)列中進(jìn)行生產(chǎn)和消費(fèi)。
- 同時(shí)消息之間有先后的依賴關(guān)系,后一條消息需要依賴于前一條消息的處理結(jié)果。
順序消息分為全局有序和局部有序。
全局有序
可以為Topic設(shè)置一個(gè)消息隊(duì)列,使用一個(gè)生產(chǎn)者單線程發(fā)送數(shù)據(jù),消費(fèi)者端也使用單線程進(jìn)行消費(fèi)。
從而保證消息的全局有序,但是這種方式效率低,一般不使用。
局部有序
假設(shè)一個(gè)Topic分配了兩個(gè)消息隊(duì)列,生產(chǎn)者在發(fā)送消息的時(shí)候,可以對(duì)消息設(shè)置一個(gè)路由ID。
- 比如想保證一個(gè)訂單的相關(guān)消息有序,那么就使用訂單ID當(dāng)做路由ID。
在發(fā)送消息的時(shí)候,通過訂單ID對(duì)消息隊(duì)列的個(gè)數(shù)取余,根據(jù)取余結(jié)果選擇消息隊(duì)列。
- 這樣同一個(gè)訂單的數(shù)據(jù)就可以保證發(fā)送到一個(gè)消息隊(duì)列中。
消費(fèi)者端使用MessageListenerOrderly處理有序消息。
這就是RocketMQ的局部有序,保證消息在某個(gè)消息隊(duì)列中有序。
圖片
實(shí)現(xiàn)原理
消費(fèi)者在啟動(dòng)時(shí)會(huì)調(diào)用DefaultMQPushConsumerImpl的start方法。
圖片
在DefaultMQPushConsumerImpl的start方法中,對(duì)消息監(jiān)聽器類型進(jìn)行了判斷。
如果類型是MessageListenerOrderly表示要進(jìn)行順序消費(fèi)。
此時(shí)使用ConsumeMessageOrderlyService對(duì)ConsumeMessageService進(jìn)行實(shí)例化。
- 然后調(diào)用它的start方法進(jìn)行啟動(dòng)。
圖片
加鎖定時(shí)任務(wù)
進(jìn)入到ConsumeMessageOrderlyService的start方法中。
可以看到,如果是集群模式,會(huì)啟動(dòng)一個(gè)定時(shí)加鎖的任務(wù),周期性的對(duì)訂閱的消息隊(duì)列進(jìn)行加鎖。
具體是通過調(diào)用RebalanceImpl的lockAll方法實(shí)現(xiàn)的。
圖片
為什么集群模式下需要加鎖?
因?yàn)閺V播模式下,消息隊(duì)列會(huì)分配給消費(fèi)者下的每一個(gè)消費(fèi)者。
而在集群模式下,一個(gè)消息隊(duì)列同一時(shí)刻只能被同一個(gè)消費(fèi)組下的某一個(gè)消費(fèi)者進(jìn)行。
- 所以在廣播模式下不存在競(jìng)爭關(guān)系,也就不需要對(duì)消息隊(duì)列進(jìn)行加鎖。
而在集群模式下,有可能因?yàn)樨?fù)載均衡等原因?qū)⒛骋粋€(gè)消息隊(duì)列分配到了另外一個(gè)消費(fèi)者中。
- 因此在集群模式下就要加鎖,當(dāng)某個(gè)消息隊(duì)列被鎖定時(shí),其他的消費(fèi)者不能進(jìn)行消費(fèi)。
整個(gè)順序消費(fèi)過程涉及了三把鎖,它們分別對(duì)應(yīng)不同的情況。
向Broker申請(qǐng)的消息隊(duì)列鎖
集群模式下一個(gè)消息隊(duì)列同一時(shí)刻只能被同一個(gè)消費(fèi)組下的某一個(gè)消費(fèi)者進(jìn)行。
為了避免負(fù)載均衡等原因引起的變動(dòng),消費(fèi)者會(huì)向Broker發(fā)送請(qǐng)求對(duì)消息隊(duì)列進(jìn)行加鎖。
如果加鎖成功,記錄到消息隊(duì)列對(duì)應(yīng)的ProcessQueue中的locked變量中,它是boolean類型的。
public class ProcessQueue {
private volatile boolean locked = false;
}
消費(fèi)者處理拉取消息時(shí)的消息隊(duì)列鎖
消費(fèi)者在處理拉取到的消息時(shí),由于可以開啟多線程進(jìn)行處理。
所以處理消息前通過MessageQueueLock中的mqLockTable獲取到了消息隊(duì)列對(duì)應(yīng)的鎖。
鎖住要處理的消息隊(duì)列,這里加消息隊(duì)列鎖主要是處理多線程之間的競(jìng)爭。
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();
消息消費(fèi)鎖
消費(fèi)者在調(diào)用consumeMessage方法之前會(huì)加消費(fèi)鎖。
主要是為了避免在消費(fèi)消息時(shí),由于負(fù)載均衡等原因,ProcessQueue被刪除。
public class ProcessQueue {
private final Lock consumeLock = new ReentrantLock();
}
圖片
順序消息缺陷
消費(fèi)順序消息的并行度依賴于隊(duì)列的數(shù)量。
隊(duì)列熱點(diǎn)問題,個(gè)別隊(duì)列由于哈希不均導(dǎo)致消息過多,消費(fèi)速度跟不上,產(chǎn)生消息堆積問題。
遇到消息失敗的消息,無法跳過,當(dāng)前隊(duì)列消費(fèi)暫停。
熱點(diǎn)問題,只能通過拆分MessageQueue和優(yōu)化路由方法來盡量均衡的將消息分配到不同的MessageQueue。
消費(fèi)并行度理論上不會(huì)有太大問題,因?yàn)镸essageQueue的數(shù)量可以調(diào)整。
消費(fèi)失敗的無法跳過是不可避免的。
因?yàn)樘^可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯(cuò)誤的。
不過可以提供一些策略,由用戶根據(jù)錯(cuò)誤類型來決定是否跳過,并且提供重試隊(duì)列之類的功能。
- 在跳過之后用戶可以在其他地方重新消費(fèi)到這條消息。
資料分享:
參考:
丁威、周繼鋒《RocketMQ技術(shù)內(nèi)幕》
https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage/