沒想到RocketMQ的tag還有這個“坑”!
RocketMQ提供了基于Tag的消息過濾機制,但在使用過程中有很多朋友或多或少會有一些疑問,我不經(jīng)意在RocketMQ官方釘釘群,我記得有好多朋友都有問到如下問題:
今天我就與RocketMQ Tag幾個值得關(guān)注的問題,和大家來做一個分享,看過后的朋友,如果覺得有幫助,期待你的點贊支持。
- 消費組訂閱關(guān)系不一致為什么會到來消息丟失?
- 如果一個tag的消息數(shù)量很少,是否會顯示很高的延遲?
1.消費組訂閱關(guān)系不一致導致消息丟失
從消息消費的視角來看消費組是一個基本的物理隔離單位,每一個消費組擁有自己的消費位點、消費線程池等。
RocketMQ的初學者容易犯這樣一個錯誤:消費組中的不同消費者,訂閱同一個topic的不同的tag,這樣會導致消息丟失(部分消息沒有消費),在思考這個問題時,我們不妨先來看一張圖:
簡單闡述一下其核心關(guān)鍵點:
- 例如一個Topic共有4個隊列。
- 消息發(fā)送者連續(xù)發(fā)送4條tagA的消息后,再連續(xù)發(fā)送4條tagb的消息,消息發(fā)送者端默認采取輪循的負載均衡機制,這樣topic的每一個隊列中都存在tagA、tabB兩個tag的消息。
- 消費組dw_tag_test的IP為192.168.3.10的消費者訂閱tagA,另外一個IP為192.168.3.11的消費者訂閱tagB。
- 消費組內(nèi)的消費者在進行消息消費之前,首先會進行隊列負載,默認為平均分配,分配結(jié)果:
消費者然后向Broker發(fā)起消息拉取請求,192.168.3.10消費者會由于只訂閱了tagA,這樣存在q0、q1中的tagB的消息會被過濾,但被過濾的tagB并不會投遞到另外一個訂閱了tagB的消費者,造成這部分消息沒有被投遞,從而導致消息丟失。
同樣192.168.3.11消費者會由于只訂閱了tagB,這樣存在q2、q3中的tagA的消息會被過濾,但被過濾的tagA并不會投遞到另外一個訂閱了tagA的消費者,造成這部分消息沒有被投遞,從而導致消息丟失。
192.168.3.10 分配到q0、q1。
192.168.3.11 分配到q2、q3。
2.如果一個tag的消息數(shù)量很少,是否會顯示很高的延遲?
開篇有群友會存在這樣一個擔憂,其場景大概如下圖所示:
消費者在消費offset=100的這條tag1消息后,后面連續(xù)出現(xiàn)1000W條非tag1的消息,這個消費組的積壓會持續(xù)增加,直接到1000W嗎?
要想明白這個問題,我們至少應(yīng)該要重點去查看如下幾個功能的源碼:
- 消息拉取流程
- 位點提交機制
本文將從以問題為導向,經(jīng)過自己的思考,并找到關(guān)鍵源碼加以求證,最后進行簡單的示例代碼進行驗證。
遇到問題之前,我們可以先嘗試思考一下,如果這個功能要我們實現(xiàn),我們大概會怎么去思考?
要判斷消費組在消費為offset=100的消息后,在接下來1000W條消息都會被過濾的情況下,如果我們希望位點能夠提交,我們應(yīng)該怎么設(shè)計?我覺得應(yīng)該至少有如下幾個關(guān)鍵點:
- 消息消息拉取時連續(xù)1000W條消息找不到合適的消息,服務(wù)端會如何處理
- 客戶端拉取到消息與未拉取到消息兩種情況如何提交位點
2.1 消息拉取流程中的關(guān)鍵設(shè)計
客戶端向服務(wù)端拉取消息,連續(xù)1000W條消息都不符合條件,一次過濾查找這么多消息,肯定非常耗時,客戶端也不能等待這么久,那服務(wù)端必須采取措施,必須觸發(fā)一個停止查找的條件并向客戶端返回NO_MESSAGE,客戶端在消息查找時會等待多久呢?
核心關(guān)鍵點一:客戶端在向服務(wù)端發(fā)起消息拉取請求時會設(shè)置超時時間,代碼如下所示:
其中與超時時間相關(guān)的兩個變量,其含義分別:
- long brokerSuspendMaxTimeMillis 在當前沒有符合的消息時在Broker端允許掛起的時間,默認為15s,暫時不支持自定義。
- long timeoutMillis 消息拉取的超時時間,默認為30s,暫時不支持自定義。
即一次消息拉取最大的超時時間為30s。
核心關(guān)鍵點二:Broker端在處理消息拉取時設(shè)置了完備的退出條件,具體由DefaultMessageStore的getMessage方法事項,具體代碼如下所述:
核心要點:
- 首先客戶端在發(fā)起時會傳入一個本次期望拉取的消息數(shù)量,對應(yīng)上述代碼中的maxMsgNums,如果拉取到指定條數(shù)到消息(讀者朋友們?nèi)珞w代碼讀者可以查閱isTheBatchFull方法),則正常退出。
- 另外一個非常關(guān)鍵的過濾條件,即一次消息拉取過程中,服務(wù)端最大掃描的索引字節(jié)數(shù),即一次拉取掃描ConsumeQueue的字節(jié)數(shù)量,取16000與期望拉取條數(shù)乘以20,因為一個consumequeue條目占20個字節(jié)。
- 服務(wù)端還蘊含了一個長輪循機制,即如果掃描了指定的字節(jié)數(shù),但一條消息都沒查詢到,會在broker端掛起一段時間,如果有新消息到來并符合過濾條件,則會喚醒,向客戶端返回消息。
回到這個問題,如果服務(wù)端連續(xù)1000W條非tag1的消息,拉取請求不會一次性篩選,而是會返回,不至于讓客戶端超時。
從這里可以打消第一個顧慮:服務(wù)端在沒有找到消息時不會傻傻等待不返回,接下來看是否會有積壓的關(guān)鍵是看如何提交位點。
2.2 位點提交機制
2.2.1 客戶端拉取到合適的消息位點提交機制
Pull線程從服務(wù)端拉取到結(jié)構(gòu)后會將消息提交到消費組線程池,主要定義在DefaultMQPushConsumerImpl的PullTask類中,具體代碼如下所示:
眾所周知,RocketMQ是在消費成功后進行位點提交,代碼在ConsumeMessageConcurrentlyService中,如下所示:
這里的核心要點:
消費端成功消息完消費后,會采用最小位點提交機制,確保消費不丟失。
最小位點提交機制,其實就是將拉取到的消息放入一個TreeMap中,然后消費線程成功消費一條消息后,將該消息從TreeMap中移除,再計算位點:
如果當前TreeMap中還有消息在處理,則返回TreeMap中的第一條消息(最小位點)
如果當前TreeMap中已沒有消息處理,返回的位點為this.queueOffsetMax,queueOffsetMax的表示的是當前消費隊列中拉取到的最大消費位點,因為此時拉取到的消息全部消費了。
最后調(diào)用updateoffset方法,更新本地的位點緩存(有定時持久機制)
2.2.2 客戶端沒有拉取到合適的消息位點提交機制
客戶端如果沒有拉取到合適的消息,例如全部被tag過濾了,在DefaultMqPushConsumerImpl的PullTask中定義了處理方式,具體如下所示:
其關(guān)鍵代碼在correctTasOffset中,具體代碼請看:
核心要點:如果此時處理隊列中的消息為0時,則會將下一次拉取偏移量當成位點,而這個值在服務(wù)端進行消息查找時會向前驅(qū)動,代碼在DefaultMessageStore的getMessage中:
故從這里可以看到,就算消息全部過濾掉了,位點還是會向前驅(qū)動的,不會造成大量積壓。
2.2.3 消息拉取時會附帶一次位點提交
其實RocketMQ的位點提交,客戶端提交位點時會先存儲在本地緩存中,然后定時將位點信息一次性提交到Broker端,其實還存在另外一種較為隱式位點提交機制:
即在消息拉取時,如果本地緩存中存在位點信息,會設(shè)置一個系統(tǒng)標記:FLAG_COMMIT_OFFSET,該標記在服務(wù)端會觸發(fā)一次位點提交,具體代碼如下:
2.2.4 總結(jié)與驗證
綜上述所述,使用TAG并不會因為對應(yīng)tag數(shù)量比較少,從而造成大量積壓的情況。
為了驗證這個觀點,我也做了一個簡單的驗證,具體方法是啟動一個消息發(fā)送者,向指定topic發(fā)送tag B的消息,而消費者只訂閱tag A,但消費者并不會出現(xiàn)消費積壓,測試代碼如下圖所示:
查看消費組積壓情況如下圖所示: