自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ順序消息解析!

開發(fā) 前端
順序消息是消息隊(duì)列 RocketMQ 提供的一種高級(jí)消息類型。對(duì)于一個(gè)指定的Topic,消息嚴(yán)格按照先進(jìn)先出(FIFO)的原則進(jìn)行消息發(fā)布和消費(fèi)。

順序消息是消息隊(duì)列 RocketMQ 提供的一種高級(jí)消息類型。

對(duì)于一個(gè)指定的Topic,消息嚴(yán)格按照先進(jìn)先出(FIFO)的原則進(jìn)行消息發(fā)布和消費(fèi)。

  • 即先發(fā)送的消息先消費(fèi),后發(fā)送的消息后消費(fèi)。

順序消息適用于對(duì)消息發(fā)送和消費(fèi)順序有嚴(yán)格要求的情況。

應(yīng)用場(chǎng)景

順序消息和普通消息的對(duì)比如下:

消息類型

消費(fèi)順序

性能

適用場(chǎng)景

普通消息

無順序

適用于對(duì)吞吐量要求高,且對(duì)生產(chǎn)和消費(fèi)順序無要求

順序消息

指定的 Topic 內(nèi)的消息遵循先入先出(FIFO)規(guī)則

一般

吞吐量要求一般

但是要求特定的 Topic 嚴(yán)格地按照 FIFO 原則進(jìn)行消息發(fā)布和消費(fèi)的場(chǎng)景

訂單創(chuàng)建場(chǎng)景:

在一些電商系統(tǒng)中,同一個(gè)訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息。

必須嚴(yán)格按照先后順序來進(jìn)行生產(chǎn)或者消費(fèi),否則消費(fèi)中傳遞訂單狀態(tài)會(huì)發(fā)生紊亂,影響業(yè)務(wù)的正常進(jìn)行。

因此,該訂單的消息必須按照一定的順序在客戶端和消息隊(duì)列中進(jìn)行生產(chǎn)和消費(fèi)。

  • 同時(shí)消息之間有先后的依賴關(guān)系,后一條消息需要依賴于前一條消息的處理結(jié)果。

順序消息分為全局有序和局部有序。

全局有序

可以為Topic設(shè)置一個(gè)消息隊(duì)列,使用一個(gè)生產(chǎn)者單線程發(fā)送數(shù)據(jù),消費(fèi)者端也使用單線程進(jìn)行消費(fèi)。

從而保證消息的全局有序,但是這種方式效率低,一般不使用。

圖片

局部有序

假設(shè)一個(gè)Topic分配了兩個(gè)消息隊(duì)列,生產(chǎn)者在發(fā)送消息的時(shí)候,可以對(duì)消息設(shè)置一個(gè)路由ID。

  • 比如想保證一個(gè)訂單的相關(guān)消息有序,那么就使用訂單ID當(dāng)做路由ID。

在發(fā)送消息的時(shí)候,通過訂單ID對(duì)消息隊(duì)列的個(gè)數(shù)取余,根據(jù)取余結(jié)果選擇消息隊(duì)列。

  • 這樣同一個(gè)訂單的數(shù)據(jù)就可以保證發(fā)送到一個(gè)消息隊(duì)列中。

消費(fèi)者端使用MessageListenerOrderly處理有序消息。

這就是RocketMQ的局部有序,保證消息在某個(gè)消息隊(duì)列中有序。

圖片圖片

實(shí)現(xiàn)原理

消費(fèi)者在啟動(dòng)時(shí)會(huì)調(diào)用DefaultMQPushConsumerImpl的start方法。

圖片圖片

在DefaultMQPushConsumerImpl的start方法中,對(duì)消息監(jiān)聽器類型進(jìn)行了判斷。

如果類型是MessageListenerOrderly表示要進(jìn)行順序消費(fèi)。

此時(shí)使用ConsumeMessageOrderlyService對(duì)ConsumeMessageService進(jìn)行實(shí)例化。

  • 然后調(diào)用它的start方法進(jìn)行啟動(dòng)。

圖片圖片

加鎖定時(shí)任務(wù)

進(jìn)入到ConsumeMessageOrderlyService的start方法中。

可以看到,如果是集群模式,會(huì)啟動(dòng)一個(gè)定時(shí)加鎖的任務(wù),周期性的對(duì)訂閱的消息隊(duì)列進(jìn)行加鎖。

具體是通過調(diào)用RebalanceImpl的lockAll方法實(shí)現(xiàn)的。

圖片圖片

為什么集群模式下需要加鎖?

因?yàn)閺V播模式下,消息隊(duì)列會(huì)分配給消費(fèi)者下的每一個(gè)消費(fèi)者。

而在集群模式下,一個(gè)消息隊(duì)列同一時(shí)刻只能被同一個(gè)消費(fèi)組下的某一個(gè)消費(fèi)者進(jìn)行。

  • 所以在廣播模式下不存在競(jìng)爭關(guān)系,也就不需要對(duì)消息隊(duì)列進(jìn)行加鎖。

而在集群模式下,有可能因?yàn)樨?fù)載均衡等原因?qū)⒛骋粋€(gè)消息隊(duì)列分配到了另外一個(gè)消費(fèi)者中。

  • 因此在集群模式下就要加鎖,當(dāng)某個(gè)消息隊(duì)列被鎖定時(shí),其他的消費(fèi)者不能進(jìn)行消費(fèi)。

整個(gè)順序消費(fèi)過程涉及了三把鎖,它們分別對(duì)應(yīng)不同的情況。

向Broker申請(qǐng)的消息隊(duì)列鎖

集群模式下一個(gè)消息隊(duì)列同一時(shí)刻只能被同一個(gè)消費(fèi)組下的某一個(gè)消費(fèi)者進(jìn)行。

為了避免負(fù)載均衡等原因引起的變動(dòng),消費(fèi)者會(huì)向Broker發(fā)送請(qǐng)求對(duì)消息隊(duì)列進(jìn)行加鎖。

如果加鎖成功,記錄到消息隊(duì)列對(duì)應(yīng)的ProcessQueue中的locked變量中,它是boolean類型的。

public class ProcessQueue {
    private volatile boolean locked = false;
}

消費(fèi)者處理拉取消息時(shí)的消息隊(duì)列鎖

消費(fèi)者在處理拉取到的消息時(shí),由于可以開啟多線程進(jìn)行處理。

所以處理消息前通過MessageQueueLock中的mqLockTable獲取到了消息隊(duì)列對(duì)應(yīng)的鎖。

鎖住要處理的消息隊(duì)列,這里加消息隊(duì)列鎖主要是處理多線程之間的競(jìng)爭。

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

消息消費(fèi)鎖

消費(fèi)者在調(diào)用consumeMessage方法之前會(huì)加消費(fèi)鎖。

主要是為了避免在消費(fèi)消息時(shí),由于負(fù)載均衡等原因,ProcessQueue被刪除。

public class ProcessQueue {
    private final Lock consumeLock = new ReentrantLock();
}

圖片圖片

順序消息缺陷

消費(fèi)順序消息的并行度依賴于隊(duì)列的數(shù)量。

隊(duì)列熱點(diǎn)問題,個(gè)別隊(duì)列由于哈希不均導(dǎo)致消息過多,消費(fèi)速度跟不上,產(chǎn)生消息堆積問題。

遇到消息失敗的消息,無法跳過,當(dāng)前隊(duì)列消費(fèi)暫停。

熱點(diǎn)問題,只能通過拆分MessageQueue和優(yōu)化路由方法來盡量均衡的將消息分配到不同的MessageQueue。

消費(fèi)并行度理論上不會(huì)有太大問題,因?yàn)镸essageQueue的數(shù)量可以調(diào)整。

消費(fèi)失敗的無法跳過是不可避免的。

因?yàn)樘^可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯(cuò)誤的。

不過可以提供一些策略,由用戶根據(jù)錯(cuò)誤類型來決定是否跳過,并且提供重試隊(duì)列之類的功能。

  • 在跳過之后用戶可以在其他地方重新消費(fèi)到這條消息。

資料分享:

參考:

丁威、周繼鋒《RocketMQ技術(shù)內(nèi)幕》

https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage/

責(zé)任編輯:武曉燕 來源: 月伴飛魚
相關(guān)推薦

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2021-04-15 09:17:01

SpringBootRocketMQ

2024-09-25 08:32:05

2024-08-22 18:49:23

2023-12-15 13:08:00

RocketMQ中間件消費(fèi)順序

2023-09-04 08:00:53

提交事務(wù)消息

2021-07-13 11:52:47

順序消息RocketMQkafka

2024-11-11 00:00:10

2022-06-27 11:04:24

RocketMQ順序消息

2024-10-11 09:15:33

2022-12-22 10:03:18

消息集成

2023-12-04 09:23:49

分布式消息

2023-09-21 09:02:03

RocketMQ全局有序局部有序

2023-07-17 08:34:03

RocketMQ消息初體驗(yàn)

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-18 09:03:01

RocketMQ場(chǎng)景消息

2025-04-09 08:20:00

RocketMQ消息隊(duì)列開發(fā)

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)