自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

沒想到RocketMQ的tag還有這個“坑”!

開發(fā) 開發(fā)工具
客戶端向服務(wù)端拉取消息,連續(xù)1000W條消息都不符合條件,一次過濾查找這么多消息,肯定非常耗時,客戶端也不能等待這么久,那服務(wù)端必須采取措施,必須觸發(fā)一個停止查找的條件并向客戶端返回NO_MESSAGE,客戶端在消息查找時會等待多久呢?

RocketMQ提供了基于Tag的消息過濾機制,但在使用過程中有很多朋友或多或少會有一些疑問,我不經(jīng)意在RocketMQ官方釘釘群,我記得有好多朋友都有問到如下問題:

今天我就與RocketMQ Tag幾個值得關(guān)注的問題,和大家來做一個分享,看過后的朋友,如果覺得有幫助,期待你的點贊支持。

  • 消費組訂閱關(guān)系不一致為什么會到來消息丟失?
  • 如果一個tag的消息數(shù)量很少,是否會顯示很高的延遲?

1.消費組訂閱關(guān)系不一致導致消息丟失

從消息消費的視角來看消費組是一個基本的物理隔離單位,每一個消費組擁有自己的消費位點、消費線程池等。

RocketMQ的初學者容易犯這樣一個錯誤:消費組中的不同消費者,訂閱同一個topic的不同的tag,這樣會導致消息丟失(部分消息沒有消費),在思考這個問題時,我們不妨先來看一張圖:

簡單闡述一下其核心關(guān)鍵點:

  • 例如一個Topic共有4個隊列。
  • 消息發(fā)送者連續(xù)發(fā)送4條tagA的消息后,再連續(xù)發(fā)送4條tagb的消息,消息發(fā)送者端默認采取輪循的負載均衡機制,這樣topic的每一個隊列中都存在tagA、tabB兩個tag的消息。
  • 消費組dw_tag_test的IP為192.168.3.10的消費者訂閱tagA,另外一個IP為192.168.3.11的消費者訂閱tagB。
  • 消費組內(nèi)的消費者在進行消息消費之前,首先會進行隊列負載,默認為平均分配,分配結(jié)果:

消費者然后向Broker發(fā)起消息拉取請求,192.168.3.10消費者會由于只訂閱了tagA,這樣存在q0、q1中的tagB的消息會被過濾,但被過濾的tagB并不會投遞到另外一個訂閱了tagB的消費者,造成這部分消息沒有被投遞,從而導致消息丟失。

同樣192.168.3.11消費者會由于只訂閱了tagB,這樣存在q2、q3中的tagA的消息會被過濾,但被過濾的tagA并不會投遞到另外一個訂閱了tagA的消費者,造成這部分消息沒有被投遞,從而導致消息丟失。

192.168.3.10 分配到q0、q1。

192.168.3.11 分配到q2、q3。

2.如果一個tag的消息數(shù)量很少,是否會顯示很高的延遲?

開篇有群友會存在這樣一個擔憂,其場景大概如下圖所示:

消費者在消費offset=100的這條tag1消息后,后面連續(xù)出現(xiàn)1000W條非tag1的消息,這個消費組的積壓會持續(xù)增加,直接到1000W嗎?

要想明白這個問題,我們至少應(yīng)該要重點去查看如下幾個功能的源碼:

  • 消息拉取流程
  • 位點提交機制

本文將從以問題為導向,經(jīng)過自己的思考,并找到關(guān)鍵源碼加以求證,最后進行簡單的示例代碼進行驗證。

遇到問題之前,我們可以先嘗試思考一下,如果這個功能要我們實現(xiàn),我們大概會怎么去思考?

要判斷消費組在消費為offset=100的消息后,在接下來1000W條消息都會被過濾的情況下,如果我們希望位點能夠提交,我們應(yīng)該怎么設(shè)計?我覺得應(yīng)該至少有如下幾個關(guān)鍵點:

  • 消息消息拉取時連續(xù)1000W條消息找不到合適的消息,服務(wù)端會如何處理
  • 客戶端拉取到消息與未拉取到消息兩種情況如何提交位點

2.1 消息拉取流程中的關(guān)鍵設(shè)計

客戶端向服務(wù)端拉取消息,連續(xù)1000W條消息都不符合條件,一次過濾查找這么多消息,肯定非常耗時,客戶端也不能等待這么久,那服務(wù)端必須采取措施,必須觸發(fā)一個停止查找的條件并向客戶端返回NO_MESSAGE,客戶端在消息查找時會等待多久呢?

核心關(guān)鍵點一:客戶端在向服務(wù)端發(fā)起消息拉取請求時會設(shè)置超時時間,代碼如下所示:

其中與超時時間相關(guān)的兩個變量,其含義分別:

  • long brokerSuspendMaxTimeMillis 在當前沒有符合的消息時在Broker端允許掛起的時間,默認為15s,暫時不支持自定義。
  • long timeoutMillis 消息拉取的超時時間,默認為30s,暫時不支持自定義。

即一次消息拉取最大的超時時間為30s。

核心關(guān)鍵點二:Broker端在處理消息拉取時設(shè)置了完備的退出條件,具體由DefaultMessageStore的getMessage方法事項,具體代碼如下所述:

核心要點:

  • 首先客戶端在發(fā)起時會傳入一個本次期望拉取的消息數(shù)量,對應(yīng)上述代碼中的maxMsgNums,如果拉取到指定條數(shù)到消息(讀者朋友們?nèi)珞w代碼讀者可以查閱isTheBatchFull方法),則正常退出。
  • 另外一個非常關(guān)鍵的過濾條件,即一次消息拉取過程中,服務(wù)端最大掃描的索引字節(jié)數(shù),即一次拉取掃描ConsumeQueue的字節(jié)數(shù)量,取16000與期望拉取條數(shù)乘以20,因為一個consumequeue條目占20個字節(jié)。
  • 服務(wù)端還蘊含了一個長輪循機制,即如果掃描了指定的字節(jié)數(shù),但一條消息都沒查詢到,會在broker端掛起一段時間,如果有新消息到來并符合過濾條件,則會喚醒,向客戶端返回消息。

回到這個問題,如果服務(wù)端連續(xù)1000W條非tag1的消息,拉取請求不會一次性篩選,而是會返回,不至于讓客戶端超時。

從這里可以打消第一個顧慮:服務(wù)端在沒有找到消息時不會傻傻等待不返回,接下來看是否會有積壓的關(guān)鍵是看如何提交位點。

2.2 位點提交機制

2.2.1 客戶端拉取到合適的消息位點提交機制

Pull線程從服務(wù)端拉取到結(jié)構(gòu)后會將消息提交到消費組線程池,主要定義在DefaultMQPushConsumerImpl的PullTask類中,具體代碼如下所示:

眾所周知,RocketMQ是在消費成功后進行位點提交,代碼在ConsumeMessageConcurrentlyService中,如下所示:

這里的核心要點:

消費端成功消息完消費后,會采用最小位點提交機制,確保消費不丟失。

最小位點提交機制,其實就是將拉取到的消息放入一個TreeMap中,然后消費線程成功消費一條消息后,將該消息從TreeMap中移除,再計算位點:

如果當前TreeMap中還有消息在處理,則返回TreeMap中的第一條消息(最小位點)

如果當前TreeMap中已沒有消息處理,返回的位點為this.queueOffsetMax,queueOffsetMax的表示的是當前消費隊列中拉取到的最大消費位點,因為此時拉取到的消息全部消費了。

最后調(diào)用updateoffset方法,更新本地的位點緩存(有定時持久機制)

2.2.2 客戶端沒有拉取到合適的消息位點提交機制

客戶端如果沒有拉取到合適的消息,例如全部被tag過濾了,在DefaultMqPushConsumerImpl的PullTask中定義了處理方式,具體如下所示:

其關(guān)鍵代碼在correctTasOffset中,具體代碼請看:

核心要點:如果此時處理隊列中的消息為0時,則會將下一次拉取偏移量當成位點,而這個值在服務(wù)端進行消息查找時會向前驅(qū)動,代碼在DefaultMessageStore的getMessage中:

故從這里可以看到,就算消息全部過濾掉了,位點還是會向前驅(qū)動的,不會造成大量積壓。

2.2.3 消息拉取時會附帶一次位點提交

其實RocketMQ的位點提交,客戶端提交位點時會先存儲在本地緩存中,然后定時將位點信息一次性提交到Broker端,其實還存在另外一種較為隱式位點提交機制:

即在消息拉取時,如果本地緩存中存在位點信息,會設(shè)置一個系統(tǒng)標記:FLAG_COMMIT_OFFSET,該標記在服務(wù)端會觸發(fā)一次位點提交,具體代碼如下:

2.2.4 總結(jié)與驗證

綜上述所述,使用TAG并不會因為對應(yīng)tag數(shù)量比較少,從而造成大量積壓的情況。

為了驗證這個觀點,我也做了一個簡單的驗證,具體方法是啟動一個消息發(fā)送者,向指定topic發(fā)送tag B的消息,而消費者只訂閱tag A,但消費者并不會出現(xiàn)消費積壓,測試代碼如下圖所示:

查看消費組積壓情況如下圖所示:

責任編輯:武曉燕 來源: 中間件興趣圈
相關(guān)推薦

2017-02-09 17:00:00

iOSSwiftKVC

2023-10-30 08:16:33

數(shù)據(jù)庫插件Mybatis

2016-10-11 14:19:07

2018-01-26 23:23:23

JDBC MySQL數(shù)據(jù)庫

2021-01-27 18:13:35

日志nginx信息

2021-08-31 09:35:01

TCPIP漏洞

2017-12-26 15:41:26

2022-07-05 13:56:21

模式Spring注入

2021-08-01 12:34:01

跳水人工智能AI

2012-12-28 13:47:36

Raspberry PGeek

2022-01-05 17:13:28

監(jiān)控HTTPS網(wǎng)站

2019-12-09 10:13:20

HashMap選擇容量

2019-03-08 10:08:41

網(wǎng)絡(luò)程序猿代碼

2022-07-26 01:00:12

Eureka延遲注冊

2021-11-29 05:37:24

Windows Def操作系統(tǒng)微軟

2020-08-14 08:19:25

Shell命令行數(shù)據(jù)

2009-04-28 07:48:29

蓋茨打工基金會

2022-05-09 17:12:32

元宇宙技術(shù)生活

2018-10-22 15:29:50

2023-09-07 06:48:38

Intel顯卡AMD
點贊
收藏

51CTO技術(shù)棧公眾號