RocketMQ基礎(chǔ)概念剖析,并分析一下Producer的底層源碼
本文轉(zhuǎn)載自微信公眾號「SH的全棧筆記」,作者SH。轉(zhuǎn)載本文請聯(lián)系SH的全棧筆記公眾號。
基礎(chǔ)概念Broker
首先我們要知道,使用RocketMQ時我們經(jīng)歷了什么。那就是生產(chǎn)者發(fā)送一條消息給RocketMQ,RocketMQ拿到這條消息之后將其持久化存儲起來,然后消費者去找MQ消費這條消息。
RocketMQ操作
上圖中,RocketMQ被標(biāo)識為了一個單點,但事實上肯定不是如此,對于可以隨時橫向擴展的服務(wù)來說,生產(chǎn)者向MQ生產(chǎn)消息的數(shù)量也會隨之而變化,所以一個合格成熟的MQ必然是要能夠處理這種情況的;而且MQ自身需要做到高可用,否則一旦這個單點宕機,那所有存儲在MQ中的消息就全部丟失且無法找回了。
所以在實際的生產(chǎn)環(huán)境中,肯定是會部署一個MQ的集群。而在RocketMQ中,這個“實例”有個專屬名詞,叫做Broker。并且,每個Broker都會部署一個Slave Broker,Master Broker會定時的向Slave Broker同步數(shù)據(jù),形成一個Broker的主從架構(gòu)。
那么問題來了,在微服務(wù)的架構(gòu)中,部署的服務(wù)也存在多實例部署的情況,服務(wù)之間相互調(diào)用是通過注冊中心來獲取對應(yīng)服務(wù)的實例列表的。
拿Spring Cloud舉例,服務(wù)通過Eureka注冊中心獲取到某個服務(wù)的全部實例,然后交給Ribbon,Ribbon聯(lián)動Eureka,從Eureka處獲取到服務(wù)實例的列表,然后通過負(fù)載均衡算法選出一個實例,最后發(fā)起請求。
同理,此時MQ中存在多個Broker實例,那生產(chǎn)者如何得知MQ集群中有多少Broker實例呢?自己應(yīng)該連接哪個實例?
首先我們直接排除在代碼里Hard Code,具體原因我覺得應(yīng)該不用再贅述了。RocketMQ是如何解決這個問題呢?這就是接下來我們要介紹的NameServer了。
NameServer
NameServer可以被簡單的理解為上一小節(jié)中提到的注冊中心,所有的Broker的在啟動的時候都會向NameServer進行注冊,將自己的信息上報。這些信息除了Broker的IP、端口相關(guān)數(shù)據(jù),還有RocketMQ集群的路由信息,路由信息后面再聊。
RocketMQ操作
有了NameServer,客戶端啟動之后會和NameServer交互,獲取到當(dāng)前RocketMQ集群中所有的Broker信息、路由信息。這樣一來,生產(chǎn)者就知道自己需要連接的Broker信息了,就可以進行消息投遞。
那么問題來了,如果在運行過程中,如果某個Broker突然宕機,NameServer會如何處理?
這需要提到RocketMQ的這續(xù)約機制和故障感知機制。Broker在完成向NameServer的注冊之后,會每隔30秒向NameServer發(fā)送心跳進行續(xù)約;如果NameServer感知到了某個Broker超過了120秒都沒有發(fā)送心跳,則會認(rèn)為這個Broker不可用,將其從自己維護的信息中移除。
這套機制,和Spring Cloud中的Eureka的實現(xiàn)如出一轍。Eureka中的Service在啟動之后也會向Eureka注冊自己,這樣一來其他的服務(wù)就可以向該服務(wù)發(fā)起請求,交換數(shù)據(jù)。Service每隔30秒會向Eureka發(fā)送心跳續(xù)約,如果某個Service超過了90秒沒有發(fā)送心跳,Eureka就會認(rèn)為該服務(wù)宕機,將其從Eureka維護的注冊表中移除。
上面圖中我聊到了多實例部署,這個多實例部署和微服務(wù)中的多實例部署還不太一樣,微服務(wù)中,所有的服務(wù)都是無狀態(tài)的,可以橫向的擴展,而在RocketMQ中,每個Broker所存的數(shù)據(jù)可能都不一樣。
我們來看一下RocketMQ的簡單使用。
- Message msg = new Message(
- "TopicTest",
- "TagA",
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
- );
- SendResult sendResult = producer.send(msg);
可以看到,Message的第一個參數(shù),為當(dāng)前這條消息指定了一個Topic,那Topic又是什么呢?
Topic
Topic是對發(fā)送到RocketMQ中的消息的邏輯分類,例如我們的訂單系統(tǒng)、積分系統(tǒng)、倉儲系統(tǒng)都會用到這個MQ,為了對其進行區(qū)分,我們就可以為不同的系統(tǒng)建立不同的Topic。
那為什么說是邏輯分區(qū)呢?因為RocketMQ在真實存儲中,并不是一個Broker就存儲一個Topic的數(shù)據(jù),道理很簡單,如果當(dāng)前這個Broker宕機,甚至極端情況磁盤壞了,那這個Topic的數(shù)據(jù)就會永久丟失。
所以在真實存儲中,消息是分布式的存儲在多個Broker上的,這這些分散在多個Broker上的存儲介質(zhì)叫MessageQueue,如果你熟悉Kafka的底層原理,就知道這個跟Kafka中的Partition是同類的實現(xiàn)。
Message Queue存儲
通過上圖可以看出,同一個Topic的數(shù)據(jù),被分成了好幾份,分別存儲在不同的Broker上,那RocketMQ為什么要這么實現(xiàn)?
首先,一個Topic中如果只有一個Queue,那么消費者在消費時的速度必然受到影響;而如果一個Topic有很多個Queue,那么Consumer就可以將消費操作同時進行,從而扛住更多的并發(fā)。
除此之外,單臺機器的資源是有限的。一個Topic的消息量可能會非常之巨大,一臺機器的磁盤很快就會被塞滿。所以RocketMQ將一個Topic的數(shù)據(jù)分?jǐn)偨o了多臺機器,進行分散存儲。其本質(zhì)上就是一個數(shù)據(jù)分片存儲的一種機制。
所以我們知道了,發(fā)送到某個Topic的數(shù)據(jù)是分布式的存儲在多個Broker中的MessageQueue上的。
Broker消息存儲原理
那Producer發(fā)送到Broker中的消息,到底是以什么方式存儲的呢?答案是Commit Log,Broker收到消息,會將該消息采用順序?qū)懭氲姆绞?,追加到磁盤上的Commit Log文件中,每個Commit Log大小為1G,如果寫滿了1G則會新建一個Commit Log繼續(xù)寫,Commit Log文件的特點是順序?qū)憽㈦S機讀。
Topic詳情
這就是最底層的存儲的方式,那么問題來了,Consumer來取消息的時候,Broker是如何從這一堆的Commit Log中找到相應(yīng)的數(shù)據(jù)呢?眾所周知,一提到磁盤的I/O操作,就會聯(lián)想到耗時這兩個字,而RocketMQ的一大特點就是高吞吐,看似很矛盾,RocketMQ是如何做的呢?
答案是ConsumeQueue,Broker在寫入Commit Log的同時,還會將當(dāng)前這條消息在Commit Log中的Offset、消息的Size和對應(yīng)的Tag的Hash寫入到ConsumeQueue文件中。每個Message Queue會有相對應(yīng)的ConsumeQueue文件存儲在磁盤上。
和Commit Log一樣,一個ConsumeQueue包含了30W條消息,每條消息的大小為20字節(jié),所以每個ConsumeQueue文件的大小約為5.72M;當(dāng)其寫滿了之后,會再新建一個ConsumeQueue文件繼續(xù)寫入。
ConsumeQueue是一種邏輯隊列,更是一種索引,讓Consumer來消費的時候可以快速的從磁盤文件中定位到這條消息。
看到這你可能會想,上面提到的Tag又是個什么東西?
Tag
Tag,標(biāo)簽,用于對同一個Topic內(nèi)的消息進行分類,為什么還需要對Topic進行消息類型劃分呢?
舉一個極端的例子,某一個新的服務(wù),需要去消費訂單系統(tǒng)的MQ,但是由于業(yè)務(wù)的特殊性,只需要去消費商品類型為數(shù)碼產(chǎn)品的訂單消息,如果沒有Tag,那么該Consumer就會去做判斷,該訂單消息是否是數(shù)碼產(chǎn)品類,如果不是,則丟棄,如果是則進行消費。
這樣一來,Consumer側(cè)就執(zhí)行了大量的無用功。引入了Tag之后,Producer在生產(chǎn)消息的時候會給訂單打上Tag,Consumer進行消費的時候,可以配置只消費指定的Tag的消息。這樣一來就不需要Consumer自己去做這個事情了,RocketMQ會幫我們實現(xiàn)這個過濾。
那其過濾的原理是什么?首先在Broker側(cè)是通過消息中保存的Tag的Hash值進行過濾,然后Consumer側(cè)在去拉取消息的時候還需要再過濾一次。
為什么在Broker過濾了,還需要在Consumer側(cè)再過濾一次?因為Hash沖突,不同的Tag經(jīng)過Hash算法之后可能會得到一樣的值,所以Consumer側(cè)在拉取消息的時候會通過字符串進行二次過濾。
Producer發(fā)送消息源碼分析
流程總覽
首先給出整個發(fā)送消息的大致流程,先熟悉這個流程看源碼,會更加的清晰一點。
總體流程
初始化Prodcuer
還是按照下面這個例子出發(fā)。
producer使用樣例
首先我們會初始化一個DefaultMQProducer,RocketMQ會給這個Producer一個默認(rèn)的實現(xiàn)DefaultMQProducerImpl。然后producer.start()會啟動一個線程池。
合法性校驗
接下來就是比較核心的producer.send(msg)了,首先RocketMQ會調(diào)用checkMessage來檢測發(fā)送的消息是否合法。
send消息
這些檢測包含了待發(fā)送的消息是否為空,Topic是否為空、Topic是否包含了非法的字符串、Topic的長度是否超過了最大限制127,然后會去檢查Body是否符合發(fā)送要求,例如msg的Body是否為空、msg的Body是否超過了最大的限制等等,這里消息的Body最大不能超過4M。
檢查消息合法性源碼
調(diào)用發(fā)送消息
對于msg的Topic,RocketMQ會用NameSpace將其包裝一層,然后就會調(diào)用DefaultMQProducerImpl中的sendDefaultImpl默認(rèn)實現(xiàn),發(fā)送消息給Broker,默認(rèn)的發(fā)送消息Timeout是3秒。
發(fā)送消息默認(rèn)實現(xiàn)
發(fā)送消息中,MQ會再次調(diào)用checkMessage對消息的合法性再次進行檢查,然后就會去嘗試獲取Topic的詳細(xì)信息。
所有的Topic的信息都會存在一個叫topicPublishInfoTable的 ConcurrentHashMap中,這個Map中Key就是Topic的字符串,而Value則是TopicPublishInfo。
這個TopicPublishInfo中就包含了之前在基礎(chǔ)概念中提到的,從Broker中獲取到的相應(yīng)的元數(shù)據(jù),其中就包含了關(guān)鍵的MessageQueue和集群元數(shù)據(jù),其基礎(chǔ)的結(jié)構(gòu)如下。
Topic詳情
messageQueueList包含了該Topic下的所有的MessageQueue,每個MessageQueue的所屬Topic,每個MessageQueue所在的Broker的名稱以及專屬的queueId。
topicRouteData包含了該Topic下的所有的Queue、Broker相關(guān)的數(shù)據(jù)。
獲取Topic詳細(xì)數(shù)據(jù)
在最終發(fā)送消息前,需要獲取到Topic的詳情,例如像Broker地址這樣的數(shù)據(jù),Producer中是通過tryToFindTopicPublishInfo方法獲取的,詳細(xì)的注釋我已經(jīng)寫在了下圖中。
獲取topic詳情
對于首次使用的Topic,在上面的Map肯定是不存在的。所以RocketMQ會將其加入到Map中去,并且調(diào)用方法updateTopicRouteInfoFromNameServer從NameServer處獲取該Topic的元數(shù)據(jù),將其一并寫入Map。初次之外,還會將路由信息、Broker的詳細(xì)信息分別放入topicRouteTable和brokerAddrTable中,這兩個都是Producer維護在內(nèi)存中的ConcurrentHashMap。
獲取到了Topic的詳細(xì)信息之后,接下來會確認(rèn)一個發(fā)送的重試次數(shù)timesTotal,假設(shè)timesTotal為N,那么發(fā)送消息如果失敗就會重試N次。不過當(dāng)且僅當(dāng)發(fā)送失敗的時候才會進行重試,其余的case都不會,例如超時、或者沒有選擇到合適的MessageQueue。
這個重試的次數(shù)timesTotal受到參數(shù)communicationMode的影響;CommunicationMode有三個值,分別是SYNC、ASYNC和ONEWAY。RocketMQ默認(rèn)的實現(xiàn)中,選擇了SYNC同步。
計算重試次數(shù)
通過代碼我們可以看到,如果是communicationMode是SYNC的話,timesTotal的值為1+retryTimesWhenSendFailed,而retryTimesWhenSendFailed的值默認(rèn)為2,代表在消息發(fā)送失敗之后的重試次數(shù)。
這樣一來,如果我們選擇了SYNC的方式,Producer在發(fā)送消息的時候默認(rèn)的重試次數(shù)就為3。不過當(dāng)且僅當(dāng)發(fā)送失敗的時候才會進行重試,其余的case都不會。
MessageQueue選擇機制
我們之前聊過,一個Topic的數(shù)據(jù)是分片存儲在一個或者多個Broker上的,底層的存儲介質(zhì)為MessageQueue,之前的圖中,我們沒有給出Producer是如何選擇具體發(fā)送到哪個MessageQueue,這里我們通過源碼來看一下。
Producer中是通過selectOneMessageQueue來進行的Message Queue選擇,該方法通過Topic的詳細(xì)元數(shù)據(jù)和上次選擇的MessageQueue所在的Broker,來決定下一個的選擇。
核心的選擇邏輯
其核心的選擇邏輯是什么呢?用大白話來說,就是選出一個index,然后將其和當(dāng)前Topic的MessageQueue數(shù)量取模。這個index在首次選擇的時候,肯定是沒有的, RocketMQ會搞一個隨機數(shù)出來。然后在該值的基礎(chǔ)上+1,因為為了通用,在外層看來,這個index上次已經(jīng)用過了,所以每次獲取你都直接幫我+1就好了。
核心的選擇機制
上圖就是MessageQueue最核心的、最底層的原則機制了。但是由于實際的業(yè)務(wù)情況十分復(fù)雜, RocketMQ在實現(xiàn)中還額外的做了很多的事情。
發(fā)送故障延遲下的選擇邏輯
在實際的選擇過程中,會判斷當(dāng)前是否啟用了發(fā)送延遲故障,這個由變量sendLatencyFaultEnable的值決定,其默認(rèn)值是false,也就是默認(rèn)是不開啟的,從代碼里我暫時沒找到其開啟的位置。
不過我們可以聊聊開啟之后,會發(fā)生什么。它同樣會開啟for循環(huán),次數(shù)為MessageQueue的數(shù)量,計算拿到確定的Queue之后,會通過內(nèi)存的一張表faultItemTable去判斷當(dāng)前這個Broker是否可用,該表是每次發(fā)送消息的時候都會去更新它。
如果當(dāng)前沒有可用的Broker,則會觸發(fā)其兜底的邏輯,再選擇一個MessageQueue出來。
選擇queue的源碼
常規(guī)的選擇邏輯
如果當(dāng)前發(fā)送故障延遲沒有啟用,則會走常規(guī)邏輯,同樣的會去for循環(huán)計算,循環(huán)中取到了MessageQueue之后會去判斷是否和上次選擇的MessageQueue屬于同一個Broker,如果是同一個Broker,則會重新選擇,直到選擇到不屬于同一個Broker的MessageQueue,或者直到循環(huán)結(jié)束。這也是為了將消息均勻的分發(fā)存儲,防止數(shù)據(jù)傾斜。
常規(guī)邏輯下的選擇邏輯
消息發(fā)送
最后就會調(diào)用Netty相關(guān)的組件,將消息發(fā)送出去了。
EOF關(guān)于RocketMQ中的一些基礎(chǔ)的概念,和RocketMQ的Producer發(fā)送消息的源碼就先分析到這里,后續(xù)看緣分再分享其他部分的源碼吧。