自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ基礎(chǔ)概念剖析&源碼解析

開(kāi)發(fā) 前端
Topic是一類消息的集合,是一種邏輯上的分區(qū)。為什么說(shuō)是邏輯分區(qū)呢?因?yàn)樽罱K數(shù)據(jù)是存儲(chǔ)到Broker上的,而且為了滿足高可用,采用了分布式的存儲(chǔ)。

[[389254]]

 Topic

Topic是一類消息的集合,是一種邏輯上的分區(qū)。為什么說(shuō)是邏輯分區(qū)呢?因?yàn)樽罱K數(shù)據(jù)是存儲(chǔ)到Broker上的,而且為了滿足高可用,采用了分布式的存儲(chǔ)。

這和Kafka中的實(shí)現(xiàn)如出一轍,Kafka的Topic也是一種邏輯概念,每個(gè)Topic的數(shù)據(jù)會(huì)分成很多份,然后存儲(chǔ)在不同的Broker上,這個(gè)「份」叫Partition。而在RocketMQ中,Topic的數(shù)據(jù)也會(huì)分布式的存儲(chǔ),這個(gè)「份」叫MessageQueue。

其分布可以用下圖來(lái)表示。

這樣一來(lái),如果某個(gè)Broker所在的機(jī)器意外宕機(jī),而且剛好MessageQueue中的數(shù)據(jù)還沒(méi)有持久化到磁盤(pán),那么該Topic下的這部分消息就會(huì)完全丟失。此時(shí)如果有備份的話,MQ就可以繼續(xù)對(duì)外提供服務(wù)。

為什么還會(huì)出現(xiàn)沒(méi)有持久化到磁盤(pán)的情況呢?現(xiàn)在的OS當(dāng)中,程序?qū)懭霐?shù)據(jù)到文件之后,并不會(huì)立馬寫(xiě)入到磁盤(pán),因?yàn)榇疟P(pán)I/O是非常耗時(shí)的操作,在計(jì)算機(jī)來(lái)看是非常慢的一種操作。所以寫(xiě)入文件的數(shù)據(jù)會(huì)先寫(xiě)入到OS自己的緩存中去,然后擇機(jī)異步的將Buffer中的數(shù)據(jù)刷入磁盤(pán)。

通過(guò)多副本冗余的機(jī)制,使得RocketMQ具有了高可用的特性。除此之外,分布式存儲(chǔ)能夠應(yīng)對(duì)后期業(yè)務(wù)大量的數(shù)據(jù)存儲(chǔ)。如果不使用分布式進(jìn)行存儲(chǔ),那么隨著后期業(yè)務(wù)發(fā)展,消息量越來(lái)越大,單機(jī)是無(wú)論如何也滿足不了RocketMQ消息的存儲(chǔ)需求的。如果不做處理,那么一臺(tái)機(jī)器的磁盤(pán)總有被塞滿的時(shí)候,此時(shí)的系統(tǒng)就不具備可伸縮的特性,也無(wú)法滿足業(yè)務(wù)的使用要求了。

但是這里的可伸縮,和微服務(wù)中的服務(wù)可伸縮還不太一樣。因?yàn)樵谖⒎?wù)中,各個(gè)服務(wù)是無(wú)狀態(tài)的。而B(niǎo)roker是有狀態(tài)的,每個(gè)Broker上存儲(chǔ)的數(shù)據(jù)都不太一樣,因?yàn)镻roducer在發(fā)送消息的時(shí)候會(huì)通過(guò)指定的算法,從Message Queue列表中選出一個(gè)MessageQueue發(fā)送消息。

如果不是很理解這個(gè)橫向擴(kuò)展,那么可以把它當(dāng)成Redis的Cluster,通過(guò)一致性哈希,選擇到Redis Cluster中的具體某個(gè)節(jié)點(diǎn),然后將數(shù)據(jù)寫(xiě)入Redis Master中去。如果此時(shí)想要擴(kuò)容很方便,只需要往Redis Cluster中新增Master節(jié)點(diǎn)就好了。

所以,數(shù)據(jù)分布式的存儲(chǔ)本質(zhì)上是一種數(shù)據(jù)分片的機(jī)制。在此基礎(chǔ)上,通過(guò)冗余多副本,達(dá)成了高可用。

Broker

Broker可以理解為我們微服務(wù)中的一個(gè)服務(wù)的某個(gè)實(shí)例,因?yàn)槲⒎?wù)中我們的服務(wù)一般來(lái)說(shuō)都會(huì)多實(shí)例部署,而RocketMQ也同理,多實(shí)例部署可以幫助系統(tǒng)扛住更多的流量,也從某種方面提高了系統(tǒng)的健壯性。

在RocketMQ4.5之前,它使用主從架構(gòu),每一個(gè)Master Broker都有一個(gè)自己的Slave Broker。

那RocketMQ的主從Broker是如何進(jìn)行數(shù)據(jù)同步的呢?

Broker啟動(dòng)的時(shí)候,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),定期的從Master Broker同步全量的數(shù)據(jù)。

這塊可以先不用糾結(jié),后面我們會(huì)通過(guò)源碼來(lái)驗(yàn)證這個(gè)主從同步邏輯。

上面提到了Broker會(huì)部署很多個(gè)實(shí)例,那么既然多實(shí)例部署,那必然會(huì)存在一個(gè)問(wèn)題,客戶端是如何得知自己是連接的哪個(gè)服務(wù)器?如何得知對(duì)應(yīng)的Broker的IP地址和端口?如果某個(gè)Broker突然掛了怎么辦?

NameServer

這就需要NameServer了,NameServer是什么?

這里先拿Spring Cloud舉例子——Spring Cloud中服務(wù)啟動(dòng)的時(shí)候會(huì)將自己注冊(cè)到Eureka注冊(cè)中心上。當(dāng)服務(wù)實(shí)例啟動(dòng)的時(shí)候,會(huì)從Eureka拉取全量的注冊(cè)表,并且之后定期的從Eureka增量同步,并且每隔30秒發(fā)送心跳到Eureka去續(xù)約。如果Eureka檢測(cè)到某個(gè)服務(wù)超過(guò)了90秒沒(méi)有發(fā)送心跳,那么就會(huì)該服務(wù)宕機(jī),就會(huì)將其從注冊(cè)表中移除。

RocketMQ中,NameServer充當(dāng)?shù)囊彩穷愃频慕巧?。兩者從功能上也有一定的區(qū)別。

Broker在啟動(dòng)的時(shí)候會(huì)向NameServer注冊(cè)自己,并且每隔30秒向NameServerv發(fā)送心跳。如果某個(gè)Broker超過(guò)了120秒沒(méi)有發(fā)送心跳,那么就會(huì)認(rèn)為該Broker宕機(jī),就會(huì)將其從維護(hù)的信息中移除。這塊后面也會(huì)從源碼層面驗(yàn)證。

當(dāng)然NameServer不僅僅是存儲(chǔ)了各個(gè)Broker的IP地址和端口,還存儲(chǔ)了對(duì)應(yīng)的Topic的路由數(shù)據(jù)。什么是路由數(shù)據(jù)呢?那就是某個(gè)Topic下的哪個(gè)Message Queue在哪臺(tái)Broker上。

Producer

總體流程

接下來(lái),我們來(lái)看看Producer發(fā)送一條消息到Broker的時(shí)候會(huì)做什么事情,整體的流程如下。

檢查消息合法性

整體來(lái)看,其實(shí)是個(gè)很簡(jiǎn)單的操作,跟我們平時(shí)寫(xiě)代碼是一樣的,來(lái)請(qǐng)求了先校驗(yàn)請(qǐng)求是否合法。Producer啟動(dòng)這里會(huì)去校驗(yàn)當(dāng)前Topic數(shù)據(jù)的合法性。

  • Topic名稱中是否包含了非法字符
  • Topic名稱長(zhǎng)度是否超過(guò)了最大的長(zhǎng)度限制,由常量TOPIC_MAX_LENGTH來(lái)決定,其默認(rèn)值為127
  • 當(dāng)前消息體是否是NULL或者是空消息
  • 當(dāng)前消息體是否超過(guò)了最大限制,由常量maxMessageSize決定,值為1024 * 1024 * 4,也就是4M。

都是些很常規(guī)的操作,和我們平時(shí)寫(xiě)的checker都差不多。

獲取Topic的詳情

當(dāng)通過(guò)了消息的合法性校驗(yàn)之后,就需要繼續(xù)往下走。此時(shí)的關(guān)注點(diǎn)就應(yīng)該從消息是否合法轉(zhuǎn)移到我要發(fā)消息給誰(shuí)。

此時(shí)就需要通過(guò)當(dāng)前消息所屬的Topic拿到Topic的詳細(xì)數(shù)據(jù)。

獲取Topic的方法源碼在上面已經(jīng)給出來(lái)了,首先會(huì)從內(nèi)存中維護(hù)的一份Map中獲取數(shù)據(jù)。順帶一提,這里的Map是ConcurrentHashMap,是線程安全的,和Golang中的Sync.Map類似。

當(dāng)然,首次發(fā)送的話,這個(gè)Map肯定是空的,此時(shí)會(huì)調(diào)用NameServer的接口,通過(guò)Topic去獲取詳情的Topic數(shù)據(jù),此時(shí)會(huì)在上面的方法中將其加入到Map中去,這樣一來(lái)下次再往該Topic發(fā)送消息就能夠直接從內(nèi)存中獲取。這里就是簡(jiǎn)單的實(shí)現(xiàn)的緩存機(jī)制 。

從方法名稱來(lái)看,是通過(guò)Topic獲取路由數(shù)據(jù)。實(shí)際上該方法,通過(guò)調(diào)用NameServer提供的API,更新了兩部分?jǐn)?shù)據(jù),分別是:

  • Topic路由信息
  • Topic下的Broker相關(guān)信息

而這兩部分?jǐn)?shù)據(jù)都來(lái)源于同一個(gè)結(jié)構(gòu)體TopicRouteData。其結(jié)構(gòu)如下。

通過(guò)源碼可以看到,就包含了該Topic下所有Broker下的Message Queue相關(guān)的數(shù)據(jù)、所有Broker的地址信息。

發(fā)送的具體Queue

此時(shí)我們獲取到了需要發(fā)送到的Broker詳情,包括地址和MessageQueue,那么此時(shí)問(wèn)題的關(guān)注點(diǎn)又該從「消息發(fā)送給誰(shuí)」轉(zhuǎn)移到「消息具體發(fā)送到哪兒」。

什么叫發(fā)送到哪兒?

開(kāi)篇提到過(guò)一個(gè)Topic下會(huì)被分為很多個(gè)MessageQueue,「發(fā)送到哪兒」指的就是具體發(fā)送到哪一個(gè)Message Queue中去。

Message Queue選擇機(jī)制

核心的選擇邏輯

還是先給出流程圖

核心邏輯,用大白話講就是將一個(gè)隨機(jī)數(shù)和Message Queue的容量取模。這個(gè)隨機(jī)數(shù)存儲(chǔ)在Thread Local中,首次計(jì)算的時(shí)候,會(huì)直接隨機(jī)一個(gè)數(shù)。

此后,都直接從ThreadLocal中取出該值,并且+1返回,拿到了MessageQueue的數(shù)量和隨機(jī)數(shù)兩個(gè)關(guān)鍵的參數(shù)之后,就會(huì)執(zhí)行最終的計(jì)算邏輯。

接下來(lái),我們來(lái)看看選擇Message Queue的方法SelectOneMessageQueue都做了什么操作吧。

可以看到,主邏輯被變量sendLatencyFaultEnable分為了兩部分。

容錯(cuò)機(jī)制下的選擇邏輯

該變量表意為發(fā)送延遲故障。本質(zhì)上是一種容錯(cuò)的策略,在原有的MessageQueue選擇基礎(chǔ)上,再過(guò)濾掉不可用的Broker,對(duì)之前失敗的Broker,按一定的時(shí)間做退避。

可以看到,如果調(diào)用Broker信息發(fā)生了異常,那么就會(huì)調(diào)用updateFault這個(gè)方法,來(lái)更新Broker的Aviable情況。注意這個(gè)參數(shù)isolation的值為true。接下來(lái)我們從源碼級(jí)別來(lái)驗(yàn)證上面說(shuō)的退避3000ms的事實(shí)。

可以看到,isolation值是true,則duration通過(guò)三元運(yùn)算符計(jì)算出來(lái)結(jié)果為30000,也就是30秒。所以我們可以得出結(jié)論,如果發(fā)送消息拋出了異常,那么直接會(huì)將該Broker設(shè)置為30秒內(nèi)不可用。

而如果只是發(fā)送延遲較高,則會(huì)根據(jù)如下的map,根據(jù)延遲的具體時(shí)間,來(lái)判斷該設(shè)置多少時(shí)間的不可用。

例如,如果上次請(qǐng)求的latency超過(guò)550ms,就退避3000ms;超過(guò)1000,就退避60000;

正常情況下的選擇邏輯

而正常情況下,如果當(dāng)前發(fā)送故障延遲沒(méi)有啟用,則會(huì)走常規(guī)邏輯,同樣的會(huì)去for循環(huán)計(jì)算,循環(huán)中取到了MessageQueue之后會(huì)去判斷是否和上次選擇的MessageQueue屬于同一個(gè)Broker,如果是同一個(gè)Broker,則會(huì)重新選擇,直到選擇到不屬于同一個(gè)Broker的MessageQueue,或者直到循環(huán)結(jié)束。這也是為了將消息均勻的分發(fā)存儲(chǔ),防止數(shù)據(jù)傾斜。

發(fā)送消息

選到了具體的Message Queue之后就會(huì)開(kāi)始執(zhí)行發(fā)送消息的邏輯,就會(huì)調(diào)用底層Netty的接口給發(fā)送出去,這塊暫時(shí)沒(méi)啥可看的。

Broker的啟動(dòng)流程

主從同步

在上面提到過(guò),RocketMQ有自己的主從同步,但是有兩個(gè)不同的版本,版本的分水嶺是在4.5版本。這兩個(gè)版本區(qū)別是什么呢?

  • 4.5之前:有點(diǎn)類似于Redis中,我們手動(dòng)的將某臺(tái)機(jī)器通過(guò)命令slave of 變成另一臺(tái)Redis的Slave節(jié)點(diǎn),這樣一來(lái)就變成了一個(gè)較為原始的一主一從的架構(gòu)。為什么說(shuō)原始呢?因?yàn)槿绻藭r(shí)Master節(jié)點(diǎn)宕機(jī),我們需要人肉的去做故障轉(zhuǎn)移。RocketMQ的主從架構(gòu)也是這種情況。
  • 4.5之后:引入了Dleger,可以實(shí)現(xiàn)一主多從,并且實(shí)現(xiàn)自動(dòng)的故障轉(zhuǎn)移。這就跟Redis后續(xù)推出了Sentinel是一樣的。Dleger也是類似的作用。

下圖是Broker啟動(dòng)代碼中的源碼。

可以看到判斷了是否開(kāi)啟了Dleger,默認(rèn)是不開(kāi)啟的。所以就會(huì)執(zhí)行其中的邏輯。

剛好我們就看到了,里面有Rocket主從同步數(shù)據(jù)的相關(guān)代碼。

如果當(dāng)前Broker節(jié)點(diǎn)的角色是Slave,則會(huì)啟動(dòng)一個(gè)周期性的定時(shí)任務(wù),定期(也就是10秒)去Master Broker同步全量的數(shù)據(jù)。同步的數(shù)據(jù)包括:

  • Topic的相關(guān)配置
  • Cosumer的消費(fèi)偏移量
  • 延遲消息的Offset
  • 訂閱組的相關(guān)數(shù)據(jù)和配置

注冊(cè)Broker

完成了主動(dòng)同步定時(shí)任務(wù)的啟動(dòng)之后,就會(huì)去調(diào)用registerBrokerAll去注冊(cè)Broker。可能這里會(huì)有點(diǎn)疑問(wèn),我這里是Broker啟動(dòng),只有當(dāng)前一個(gè)Broker實(shí)例,那這個(gè)All是什么意思呢?

All是指所有的NameServer,Broker啟動(dòng)的時(shí)候會(huì)將自己注冊(cè)到每一個(gè)NameServer上去。為什么不只注冊(cè)到一個(gè)NameServer就完事了呢?這樣一來(lái)還可以提高效率。歸根結(jié)底還是高可用的問(wèn)題。

如果Broker只注冊(cè)到了一臺(tái)NameServer上,萬(wàn)一這臺(tái)NameServer掛了呢?這個(gè)Broker對(duì)所有客戶端就都不可見(jiàn)了。實(shí)際上Broker還在正常的運(yùn)行。

進(jìn)到registerBrokerAll中去。

可以看到,這里會(huì)判斷是否需要進(jìn)行注冊(cè)。通過(guò)上面的截圖可以看到,此時(shí)forceRegister的值為true,而是否要注冊(cè),決定權(quán)就交給了needRegister

為什么需要判斷是否需要注冊(cè)呢?因?yàn)锽roker一旦注冊(cè)到了NameServer之后,由于Producer不停的在寫(xiě)入數(shù)據(jù),Consumer也在不停的消費(fèi)數(shù)據(jù),Broker也可能因?yàn)楣收蠈?dǎo)致某些Topic下的Message Queue等關(guān)鍵的路由信息發(fā)生變動(dòng)。

這樣一來(lái),NameServer中的數(shù)據(jù)和Broker中的數(shù)據(jù)就會(huì)不一致。

如何判斷是否需要注冊(cè)

大致的思路是,Broker會(huì)從每一個(gè)NameServer中獲取到當(dāng)前Broker的數(shù)據(jù),并和當(dāng)前Broker節(jié)點(diǎn)中的數(shù)據(jù)做對(duì)比。但凡有一臺(tái)NameServer數(shù)據(jù)和當(dāng)前Broker不一致,都會(huì)進(jìn)行注冊(cè)操作。

接下來(lái),我們從源碼層面驗(yàn)證這個(gè)邏輯。關(guān)鍵的邏輯我在圖中也標(biāo)注了出來(lái)。

可以看到, 就是通過(guò)對(duì)比Broker中的數(shù)據(jù)版本和NameServer中的數(shù)據(jù)版本來(lái)實(shí)現(xiàn)的。這個(gè)版本,注冊(cè)的時(shí)候會(huì)寫(xiě)到注冊(cè)的數(shù)據(jù)中存入NameServer中。

這里由于是有多個(gè),所以RocketMQ用線程池來(lái)實(shí)現(xiàn)了多線程操作,并且用CountDownLatch來(lái)等待所有的返回結(jié)果。經(jīng)典的用空間換時(shí)間,Golang里面也有類似的操作,那就是sync.waitGroup。

關(guān)于任何一個(gè)數(shù)據(jù)不匹配,都會(huì)進(jìn)行重新注冊(cè)的事實(shí),我們也從源碼層面來(lái)驗(yàn)證一下。

可以看到,如果任何一臺(tái)NameServer的數(shù)據(jù)發(fā)生了Change,都會(huì)break,返回true。

這里的結(jié)果列表使用的是CopyOnWriteList來(lái)實(shí)現(xiàn)的。

因?yàn)檫@里是多線程去執(zhí)行的判斷邏輯,而正常的列表不是線程安全的。CopyOnWriteArrayList之所以是線程安全的,這歸功于COW(Copy On Write),讀請(qǐng)求時(shí)共用同一個(gè)List,涉及到寫(xiě)請(qǐng)求時(shí),會(huì)復(fù)制出一個(gè)List,并在寫(xiě)入數(shù)據(jù)的時(shí)候加入獨(dú)占鎖。比起直接對(duì)所有操作加鎖,讀寫(xiě)鎖的形式分離了讀、寫(xiě)請(qǐng)求,使其互不影響,只對(duì)寫(xiě)請(qǐng)求加鎖,降低了加鎖的次數(shù)、減少了加鎖的消耗,提升了整體操作的并發(fā)。

執(zhí)行注冊(cè)邏輯

這塊就是構(gòu)建數(shù)據(jù),然后多線程并發(fā)的去發(fā)送請(qǐng)求,用CopyOnWriteArrayList來(lái)保存結(jié)果。不過(guò),上面我們提到過(guò),Broker注冊(cè)的時(shí)候,會(huì)把數(shù)據(jù)版本發(fā)送到NameServer并且存儲(chǔ)起來(lái),這塊我們可以看看發(fā)送到NameServer的數(shù)據(jù)結(jié)構(gòu)。

可以看到,Topic的數(shù)據(jù)分為了兩部分,一部分是核心的邏輯,另一部分是DataVersion,也就是我們剛剛一直提到的數(shù)據(jù)版本。

Broker如何存儲(chǔ)數(shù)據(jù)

剛剛在聊Producer最后提到的是,發(fā)送消息到Broker就完了。不知道大家有沒(méi)有想過(guò)Broker是如何存儲(chǔ)消息的?

Commit log

先給出流程圖

然后給出結(jié)論,Producer發(fā)送的消息是存儲(chǔ)在一種叫commit log的文件中的,Producer端每次寫(xiě)入的消息是不等長(zhǎng)的,當(dāng)該CommitLog文件寫(xiě)入滿1G,就會(huì)新建另一個(gè)新的CommitLog,繼續(xù)寫(xiě)入。此次采取的是順序?qū)懭搿?/p>

那么問(wèn)題來(lái)了,Consumer來(lái)消費(fèi)的時(shí)候,Broker是如何快速找到對(duì)應(yīng)的消息的呢?我們首先排除遍歷文件查找的方法, 因?yàn)镽ocketMQ是以高吞吐、高性能著稱的,肯定不可能采取這種對(duì)于很慢的操作。那RocketMQ是如何做的呢?

答案是ConsumerQueue

ConsumerQueue

ConsumerQueue是什么?是文件。引入的目的是什么呢?提高消費(fèi)的性能。

Broker在收到一條消息的時(shí)候,寫(xiě)入Commit Log的同時(shí),還會(huì)將當(dāng)前這條消息在commit log中的offset、消息的size和對(duì)應(yīng)的Tag的Hash寫(xiě)入到consumer queue文件中去。

每個(gè)MessageQueue都會(huì)有對(duì)應(yīng)的ConsumerQueue文件存儲(chǔ)在磁盤(pán)上,每個(gè)ConsumerQueue文件包含了30W條消息,每條消息的size大小為20字節(jié),包含了8字節(jié)CommitLog的Offset、4字節(jié)的消息長(zhǎng)度、8字節(jié)的Tag的哈希值。這樣一來(lái),每個(gè)ConsumerQueue的文件大小就約為5.72M。

當(dāng)該ConsumerQueue文件寫(xiě)滿了之后,就會(huì)再新建一個(gè)ConsumerQueue文件,繼續(xù)寫(xiě)入。

所以,ConsumerQueue文件可以看成是CommitLog文件的索引。

負(fù)載均衡

什么意思呢?假設(shè)我們總共有6個(gè)MessageQueue,然后此時(shí)分布在了3臺(tái)Broker上,每個(gè)Broker上包含了兩個(gè)queue。此時(shí)Consumer有3臺(tái),我們可以大致的認(rèn)為每個(gè)Consumer負(fù)責(zé)2個(gè)MessageQueue的消費(fèi)。但是這里有一個(gè)原則,那就是一個(gè)MessageQueue只能被一臺(tái)Consumer消費(fèi),而一臺(tái)Consumer可以消費(fèi)多個(gè)MessageQueue。

為什么?道理很簡(jiǎn)單,RocketMQ支持的順序消費(fèi),是指的分區(qū)順序性,也就是在單個(gè)MessageQueue中,消息是具有順序性的,而如果多臺(tái)Consumer去消費(fèi)同一個(gè)MessageQueue,就很難去保證順序消費(fèi)了。

由于有很多個(gè)Consumer在消費(fèi)多個(gè)MessageQueue,所以為了不出現(xiàn)數(shù)據(jù)傾斜,也為了資源的合理分配利用,在Producer發(fā)送消息的時(shí)候,需要盡可能的將消息均勻的分發(fā)給多個(gè)MessageQueue。

同時(shí),上面那種一個(gè)Consumer消費(fèi)了2個(gè)MessageQueue的情況,萬(wàn)一這臺(tái)Consumer掛了呢?這兩個(gè)MessageQueue不就沒(méi)人消費(fèi)了?

以上兩種情況分別是Producer端的負(fù)載均衡、Consumer端的負(fù)載均衡。

Producer端負(fù)載均衡

關(guān)于Producer端上面的負(fù)載均衡,上面的流程圖已經(jīng)給了出來(lái),并且給出了源碼的驗(yàn)證。首先是容錯(cuò)策略,會(huì)去避開(kāi)一段時(shí)間有問(wèn)題的Broker,并且加上如果選擇了上次的Broker,就會(huì)重新進(jìn)行選擇。

Consumer端負(fù)載均衡

首先Consumer端的負(fù)責(zé)均衡可以由兩個(gè)對(duì)象觸發(fā):

  • Broker
  • Consumer自身

Consumer也會(huì)向所有的Broker發(fā)送心跳,將消息的消費(fèi)組名稱、訂閱關(guān)系集合、消息的通信模式和客戶端的ID等等。Broker收到了Consumer的心跳之后,會(huì)將其存在Broker維護(hù)的一個(gè)Manager中,名字叫ConsumerManager。當(dāng)Broker監(jiān)聽(tīng)到了Consumer數(shù)量發(fā)生了變動(dòng),就會(huì)通知Consumer進(jìn)行Rebalance。

但是如果Broker通知Consumer進(jìn)行Rebalance的消息丟了呢?這也就是為什么需要對(duì)Consumer自身進(jìn)行觸發(fā)的原因。Consumer會(huì)在啟動(dòng)的時(shí)候啟動(dòng)定時(shí)任務(wù),周期性的執(zhí)行rebalance操作。

默認(rèn)是20秒執(zhí)行一次。具體的代碼如下。

具體流程

首先,Consumer的Rebalance會(huì)獲取到本地緩存的Topic的全部數(shù)據(jù),然后向Broker發(fā)起請(qǐng)求,拉取該Topic和ConsumerGroup下的所有的消費(fèi)者信息。此處的Broker數(shù)據(jù)來(lái)源就是Consumer之前的心跳發(fā)送過(guò)去的數(shù)據(jù)。然后會(huì)對(duì)Topic中MessageQueue和消費(fèi)者ID進(jìn)行排序,然后用消息隊(duì)列默認(rèn)分配算法來(lái)進(jìn)行分配,這里的默認(rèn)分配策略是平均分配。

首先會(huì)均勻的按照類似分頁(yè)的思想,將MessageQueue分配給Consumer,如果分配的不均勻,則會(huì)依次的將剩下的MessageQueue按照排序的順序,從上往下的分配。所以在這里Consumer 1被分配到了4個(gè)MessageQueue,而Consumer 2被分配到了3個(gè)MessageQueue。

Rebalance完了之后,會(huì)將結(jié)果和Consumer緩存的數(shù)據(jù)做對(duì)比,移除不在ReBalance結(jié)果中的MessageQueue,將原本沒(méi)有的MessageQueue給新增到緩存中。

觸發(fā)時(shí)機(jī)

  • Consumer啟動(dòng)時(shí) 啟動(dòng)之后會(huì)立馬進(jìn)行Rebalance
  • Consumer運(yùn)行中 運(yùn)行中會(huì)監(jiān)聽(tīng)Broker發(fā)送過(guò)來(lái)的Rebalance消息,以及Consumer自身的定時(shí)任務(wù)觸發(fā)的Rebalance
  • Consumer停止運(yùn)行 停止時(shí)沒(méi)有直接地調(diào)用Rebalance,而是會(huì)通知Broker自己下線了,然后Broker會(huì)通知其余的Consumer進(jìn)行Rebalance。

換一個(gè)角度來(lái)分析,其實(shí)就是兩個(gè)方面,一個(gè)是隊(duì)列信息發(fā)生了變化,另一種是消費(fèi)者發(fā)生了變化。

源碼驗(yàn)證

然后給出核心的代碼驗(yàn)證,獲取數(shù)據(jù)的邏輯如下


驗(yàn)證了我們剛剛說(shuō)的獲取了本地的Topic數(shù)據(jù)緩存,和從Broker端拉取所有的ConsumerID。

接下來(lái)是驗(yàn)證剛說(shuō)的排序邏輯。

接下來(lái)是看判斷結(jié)果是否發(fā)生了變化的源碼。

可以看到,Consumer通知Broker策略,其本質(zhì)上就是發(fā)送心跳,將更新后的數(shù)據(jù)通過(guò)心跳發(fā)送給所有的Broker。

Consumer更多的細(xì)節(jié)

可能關(guān)于Consumer,我們使用的更多一點(diǎn)。例如我們知道我們可以設(shè)置集群消費(fèi)和廣播消息,分別對(duì)應(yīng)RocketMQ中的CLUSTERING和BROADCASTING**。

再比如我們知道,我們可以設(shè)置順序消費(fèi)和并發(fā)消費(fèi)等等,接下來(lái)就讓我們用源碼來(lái)看看這些功能在RocketMQ中是怎么實(shí)現(xiàn)的。

消費(fèi)模型

在Consumer中,默認(rèn)都是采用集群消費(fèi),這塊在Consumer的代碼中也有體現(xiàn)。

而消費(fèi)模式的不同,會(huì)影響到管理offset的具體實(shí)現(xiàn)。

可以看到,當(dāng)消費(fèi)模型是廣播模式時(shí),Offset的持久化管理會(huì)使用實(shí)現(xiàn)LocalFileOffsetStorage

當(dāng)消費(fèi)模式是集群消費(fèi)時(shí),則會(huì)使用RemoteBrokerOffsetStore。

具體原因是什么呢?首先我們得知道廣播模式和集群模式的區(qū)別在哪兒:

  • 廣播模式下,一條消息會(huì)被ConsumerGroup中的每一臺(tái)機(jī)器所消費(fèi)
  • 集群模式下,一條消息只會(huì)被ConsumerGroup中的一臺(tái)機(jī)器消費(fèi)

所以在廣播模式下,每個(gè)ConsumerGroup的消費(fèi)進(jìn)度都不一樣,所以需要由Consumer自身來(lái)管理Offset。而集群模式下,同個(gè)ConsumerGroup下的消費(fèi)進(jìn)度其實(shí)是一樣的,所以可以交由Broker統(tǒng)一管理。

消費(fèi)模式

消費(fèi)模式則分為順序消費(fèi)和并發(fā)消費(fèi),分別對(duì)應(yīng)實(shí)現(xiàn)MessageListenerOrderly和MessageListenerConcurrently兩種方式。

不同的消費(fèi)方式會(huì)采取不同的底層實(shí)現(xiàn),配置完成之后就會(huì)調(diào)用start。

拉取消息

接下來(lái)我們來(lái)看一個(gè)跟我們最最相關(guān)的問(wèn)題,那就是我們平時(shí)消費(fèi)的消息到底是怎么樣從Broker發(fā)到的Consumer。在靠近啟動(dòng)Rebalance的地方,Consumer也開(kāi)啟了一個(gè)定時(shí)拉取消息的線程。

這個(gè)線程做了什么事呢?它會(huì)不停的從一個(gè)維護(hù)在內(nèi)存中的Queue中獲取一個(gè)在寫(xiě)入的時(shí)候就構(gòu)建好的PullRequest對(duì)象,調(diào)用具體實(shí)現(xiàn)去不停的拉取消息了。

處理消費(fèi)結(jié)果

在這里是否開(kāi)啟AutoCommit,所做的處理差不了很多,大家也都知道,唯一區(qū)別就在于是否自動(dòng)的提交Offset。對(duì)于處理成功的邏輯也差不多,我們平時(shí)業(yè)務(wù)邏輯中可能也并不關(guān)心消費(fèi)成功的消息。我們更多關(guān)注的是如果消費(fèi)失敗了,RocketMQ是怎么處理的?

這是在AutoCommit下,如果消費(fèi)失敗了的處理邏輯。會(huì)記錄一個(gè)失敗的TPS,然后這里有一個(gè)非常關(guān)鍵的邏輯,那就是checkReconsumeTimes。

如果當(dāng)前消息的重試次數(shù),如果大于了最大的重試消費(fèi)次數(shù),就會(huì)把消費(fèi)發(fā)回給Broker。那最大重試次數(shù)是如何定義的。

如果值為-1,那么最大次數(shù)就是MAX_VALUE,也就是2147483647。這里有點(diǎn)奇怪啊,按照我們平常的認(rèn)知,難道不是重試16次嗎?然后就看到了很騷的一句注釋。

-1 means 16 times,這代碼確實(shí)有點(diǎn),一言難盡。

然后,如果超過(guò)了最大的次數(shù)限制,就會(huì)將該消息調(diào)用Prodcuer的默認(rèn)實(shí)現(xiàn),將其發(fā)送到死信隊(duì)列中。當(dāng)然,死信隊(duì)列也不是什么特殊的存在,就是一個(gè)單獨(dú)的Topic而已。


通過(guò)getRetryTopic來(lái)獲取的,默認(rèn)是給當(dāng)前的ConsumerGroup名稱加上一個(gè)前綴。

 

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2021-02-26 13:59:41

RocketMQProducer底層

2021-11-04 12:42:55

RocketMQ啟動(dòng)消費(fèi)

2021-07-09 07:15:48

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2020-12-01 15:00:20

Java 基礎(chǔ)

2010-07-19 09:52:04

Perl標(biāo)量

2010-06-30 16:00:01

FTP協(xié)議

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2010-01-14 14:56:07

2022-09-27 18:56:28

ArrayList數(shù)組源代碼

2010-08-16 11:19:31

DIV

2024-11-14 09:10:13

消費(fèi)者RocketMQ負(fù)載均衡

2023-04-03 08:30:54

項(xiàng)目源碼操作流程

2022-04-29 14:56:40

通話應(yīng)用源碼剖析

2010-01-06 16:45:42

.Net Framew

2023-10-17 09:36:32

Spark大數(shù)據(jù)

2010-09-26 11:31:03

DHCP服務(wù)

2010-07-19 08:39:14

Perl包

2024-09-25 08:32:05

2024-08-22 18:49:23

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)