自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

從RocketMQ的Broker源碼層面驗(yàn)證一下這兩個(gè)點(diǎn)

開(kāi)發(fā) 前端
Producer從啟動(dòng)到發(fā)送消息的整個(gè)過(guò)程,從源碼級(jí)別分析了Producer在發(fā)送消息到Broker的時(shí)候,是如何拿到Broker的數(shù)據(jù)的,如何從多個(gè)MessageQueue中選擇對(duì)應(yīng)的Queue發(fā)送消息。

 [[387560]]

本篇博客會(huì)從源碼層面,驗(yàn)證在RocketMQ基礎(chǔ)概念剖析,并分析一下Producer的底層源碼中提到的結(jié)論,分別是:

  • Broker在啟動(dòng)時(shí),會(huì)將自己注冊(cè)到所有的NameServer上
  • Broker在啟動(dòng)之后,會(huì)每隔30S向NameServer發(fā)送心跳

之前的文章中,我們知道了RocketMQ中的一些核心概念,例如Broker、NameServer、Topic和Tag等等。Producer從啟動(dòng)到發(fā)送消息的整個(gè)過(guò)程,從源碼級(jí)別分析了Producer在發(fā)送消息到Broker的時(shí)候,是如何拿到Broker的數(shù)據(jù)的,如何從多個(gè)MessageQueue中選擇對(duì)應(yīng)的Queue發(fā)送消息。

但是由于篇幅原因,文章開(kāi)頭提到的兩個(gè)已知結(jié)論在上篇博客里并沒(méi)沒(méi)有對(duì)其進(jìn)行驗(yàn)證,這次就從源碼層面來(lái)驗(yàn)證一下。

一開(kāi)頭就看到Broker主從架構(gòu)相關(guān)的源碼

在上篇博客中提到過(guò),Broker為了保證自身的高可用,會(huì)采取一主一從的架構(gòu)。即使Master Broker因?yàn)橐馔庠驋炝?,Slave Broker上還有一份完整的數(shù)據(jù),Broker可以繼續(xù)提供服務(wù)。

isEnableDLegerCommitLog中提到的DLeger可以先不管,我們目前只需要知道其默認(rèn)返回的結(jié)果是false。所以Broker首次啟動(dòng)的時(shí)候,就會(huì)執(zhí)行被If包裹住的邏輯。

RocketMQ本身是有主從架構(gòu)的,但是功能不夠完善,如果Master Broker出現(xiàn)了故障,需要人工的將Slave Broker切換成Master。

就有點(diǎn)類(lèi)似于手動(dòng)的將一臺(tái)Redis設(shè)置成另一臺(tái)Redis的Slave節(jié)點(diǎn),如果此時(shí)Redis的Master掛了,還需要手動(dòng)的進(jìn)行切換一樣。為了解決這個(gè)問(wèn)題,Redis搞出了Sentinel,可以在發(fā)生故障的時(shí)候自動(dòng)的實(shí)現(xiàn)故障轉(zhuǎn)移。所以RocketMQ在4.5版本之后推出的Dleger差不多也是這么個(gè)東西,除此之外,Dleger還可以實(shí)現(xiàn)多副本。

不使用Dleger時(shí),主從數(shù)據(jù)如何進(jìn)行同步

先給出結(jié)論,在RocketMQ的主從架構(gòu)下,主從同步采取的是Slave主動(dòng)拉取的方式。

如果當(dāng)前執(zhí)行注冊(cè)的Broker角色是Slave,那就會(huì)使用ScheduledExecutorService啟動(dòng)一個(gè)周期性的定時(shí)任務(wù),每隔10秒就會(huì)去Master同步一次,同步的數(shù)據(jù)包括Topic的相關(guān)配置、Consumer的消費(fèi)偏移量、延遲消息的Offset、訂閱組的相關(guān)數(shù)據(jù)和配置。

ScheduledExecutorService的作用和原理下面會(huì)做簡(jiǎn)單介紹。

首次啟動(dòng)時(shí)強(qiáng)制進(jìn)行Broker注冊(cè)

因?yàn)槭鞘状螁?dòng),所以參數(shù)forceRegister被直接設(shè)置成了true。

使用ScheduledExecutorService啟動(dòng)定時(shí)任務(wù)

通過(guò)入口進(jìn)來(lái)之后,Broker會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),周期性的去注冊(cè)。ScheduledExecutorService底層就是一個(gè)newSingleThreadScheduledExecutor,只有一個(gè)線程的線程池,其關(guān)鍵的參數(shù)corePoolSize值為1,然后按照指定的頻率周期性的執(zhí)行某個(gè)任務(wù)。

ScheduledExecutorService主要的功能有兩個(gè),分別是:

  • ScheduledExecutorService 以固定的頻率執(zhí)行任務(wù)
  • ScheduledExecutorService 執(zhí)行完之后,間隔制定的時(shí)間后再執(zhí)行下一個(gè)任務(wù)

使用scheduleAtFixedRate實(shí)現(xiàn)心跳機(jī)制

此處我們使用的是scheduleAtFixedRate,如下圖。

至于執(zhí)行的頻率,我們能夠配置的范圍最大不能超過(guò)一分鐘,也就是說(shuō)這個(gè)范圍是在10-60秒之間,默認(rèn)30秒執(zhí)行一次,這也就驗(yàn)證了每30秒,Broker會(huì)向NameServer發(fā)送一次心跳。

獲取執(zhí)行頻率的這個(gè)判斷有點(diǎn)意思,甚至看起來(lái)有那么一絲絲簡(jiǎn)潔,但是理解其具體可配置的時(shí)間范圍可能需要花點(diǎn)時(shí)間。在實(shí)際業(yè)務(wù)性代碼中,個(gè)人建議還是不要這么寫(xiě),業(yè)務(wù)中代碼的可讀性和可維護(hù)性我認(rèn)為是需要放在首位的。

值得注意的是,此處啟動(dòng)心跳,給了一個(gè)10秒的延遲,因?yàn)樵诓皇褂肈leger的情況下,在之前的邏輯中已經(jīng)執(zhí)行過(guò)一次注冊(cè)了。如果不做延遲,那么幾乎是同一個(gè)時(shí)間就會(huì)有兩次注冊(cè)操作,而這明顯是不符合預(yù)期的;同時(shí)forceRegister也從true變成了通過(guò)函數(shù)isForceRegister來(lái)進(jìn)行獲取。

調(diào)用registerBrokerAll注冊(cè)

定時(shí)任務(wù)注冊(cè)完成之后,之后的每次觸發(fā)都會(huì)執(zhí)行registerBrokerAll方法來(lái)執(zhí)行注冊(cè),你可能會(huì)有疑問(wèn),我當(dāng)前不就是一個(gè)Broker嗎,怎么名字有個(gè)后綴All?那是因?yàn)镹ameServer會(huì)有多個(gè),Broker啟動(dòng)的時(shí)候會(huì)將自己注冊(cè)到所有的NameServer上去。當(dāng)然,口說(shuō)無(wú)憑,我們繼續(xù)看下去。

繼續(xù)往里走,如果當(dāng)前滿足注冊(cè)條件,則會(huì)實(shí)際的執(zhí)行注冊(cè)操作。那具體滿足什么條件呢?由變量forceRegister和一個(gè)needRegister方法來(lái)決定,forceRegister默認(rèn)是true,所以當(dāng)?shù)谝粓?zhí)行這個(gè)邏輯的時(shí)候是一定會(huì)執(zhí)行注冊(cè)操作的。

通過(guò)對(duì)比數(shù)據(jù)版本判斷當(dāng)前Broker是否需要進(jìn)行注冊(cè)

感興趣的話,可以繼續(xù)跟隨文章了解一下,needRegister是根據(jù)什么來(lái)判斷是否需要注冊(cè)的。

首先,Broker一旦注冊(cè)到了NameServer之后,由于Producer不停的在寫(xiě)入數(shù)據(jù),Consumer也在不停的消費(fèi)數(shù)據(jù),Broker也可能因?yàn)楣收蠈?dǎo)致MessageQueue等關(guān)鍵路由信息發(fā)生變動(dòng),NameServer中的數(shù)據(jù)和Broker中實(shí)際的數(shù)據(jù)就會(huì)不一致,如果不及時(shí)更新,Producer拉取到的路由數(shù)據(jù)就可能有誤。

所以每次定時(shí)任務(wù)觸發(fā)的時(shí)候會(huì)去對(duì)比NameServer和Broker的數(shù)據(jù),如果發(fā)現(xiàn)數(shù)據(jù)版本不一致,Broker會(huì)重新進(jìn)行注冊(cè),將最新的數(shù)據(jù)更新到NameServer。說(shuō)直白一點(diǎn),就是做一個(gè)數(shù)據(jù)定時(shí)更新。以下紅框中的代碼就是數(shù)據(jù)對(duì)比的核心代碼。

當(dāng)Broker和所有的NameServer節(jié)點(diǎn)一一完成數(shù)據(jù)對(duì)比之后,就會(huì)進(jìn)行結(jié)果判定,但凡有一個(gè)節(jié)點(diǎn)數(shù)據(jù)不一致,都需要進(jìn)行重新注冊(cè),把最新的數(shù)據(jù)更新到NameServer,核心判斷邏輯同樣用紅框標(biāo)出。

至此,其實(shí)我們就已經(jīng)完成了 Broker在啟動(dòng)的時(shí)候會(huì)向所有NameServer進(jìn)行注冊(cè) 的驗(yàn)證。但是由于后續(xù)仍然有值得關(guān)注發(fā)光點(diǎn),我們繼續(xù)后續(xù)的源碼閱讀。

使用CountDownLatch獲取所有注冊(cè)異步任務(wù)的返回結(jié)果

除此之外,還值得注意的是在needRegister中,對(duì)于和多個(gè)NameServer的交互,RocketMQ是通過(guò)線程池異步實(shí)現(xiàn)的,同時(shí)使用了CountDownLatch來(lái)等待所有的請(qǐng)求結(jié)束,返回結(jié)果給主線程。

既然聊到了CountDownLatch,就順帶提一下。假設(shè)我們有5個(gè)互不依賴(lài)的計(jì)算任務(wù),如果快速的計(jì)算出結(jié)果并返回呢?那當(dāng)然是5個(gè)任務(wù)并發(fā)執(zhí)行,這就需要通過(guò)新開(kāi)線程實(shí)現(xiàn),結(jié)果就無(wú)法一起返回了。

而CountDownLatch可以讓主線程等待,等待這5個(gè)計(jì)算任務(wù)全部結(jié)束之后,喚醒主線程再繼續(xù)后面的邏輯。這就是CountDownLatch的作用,如果平時(shí)只是單純的CRUD功能的話,可能連CountDownLatch是什么都做不知道,這也是為什么大廠面試會(huì)問(wèn)這些問(wèn)題,因?yàn)樵诖髲S的復(fù)雜業(yè)務(wù)背景下,你必須要會(huì)使用它們。

指定需要注冊(cè)之后,接下來(lái)就是核心的注冊(cè)方法了,核心邏輯由registerBrokerAll來(lái)實(shí)現(xiàn)。Broker同樣會(huì)去每一個(gè)NameServer節(jié)點(diǎn)上注冊(cè)自己,并且為了提前執(zhí)行的效率,同樣開(kāi)線程采用了異步的方式。在獲取所有結(jié)果時(shí),同樣的使用了CountDownLatch。

使用CopyOnWriteArrayList存儲(chǔ)注冊(cè)請(qǐng)求的返回

除此之外,用于保存注冊(cè)結(jié)果的列表,使用的是CopyOnWriteArrayList,被面試虐過(guò)的同學(xué)應(yīng)該熟悉。我們知道此處開(kāi)啟了多線程去不同的NameServer注冊(cè),寫(xiě)入注冊(cè)結(jié)果的時(shí)候,多線程對(duì)同一個(gè)列表進(jìn)行寫(xiě)入,會(huì)產(chǎn)生線程安全的問(wèn)題。

而我們知道ArrayList是非線程安全的,這也是為什么此處要使用CopyOnWriteArrayList來(lái)保存注冊(cè)結(jié)果。為什么CopyOnWriteArrayList能夠保證線程安全?

這歸功于COW(Copy On Write),讀請(qǐng)求時(shí)共用同一個(gè)List,涉及到寫(xiě)請(qǐng)求時(shí),會(huì)復(fù)制出一個(gè)List,并在寫(xiě)入數(shù)據(jù)的時(shí)候加入獨(dú)占鎖。比起直接對(duì)所有操作加鎖,讀寫(xiě)鎖的形式分離了讀、寫(xiě)請(qǐng)求,使其互不影響,只對(duì)寫(xiě)請(qǐng)求加鎖,降低了加鎖的消耗,提升了整體操作的并發(fā)。

上面并發(fā)執(zhí)行的注冊(cè)操作,具體做了哪些事情呢?先看代碼。

上面就是單個(gè)注冊(cè)的所有邏輯,可以看到在構(gòu)建完請(qǐng)求之后,有一個(gè)oneway的判斷。

oneway值為false,表示單向通信,Broker不關(guān)心NameServer的返回,也不會(huì)觸發(fā)任何回調(diào)函數(shù)。接下來(lái)Broker就會(huì)把已經(jīng)寫(xiě)進(jìn)request body的所有數(shù)據(jù)發(fā)送給NameServer。請(qǐng)求數(shù)據(jù)統(tǒng)一由一個(gè)叫TopicConfigSerializeWrapper的Wrapper給包裹住。

其可以看為兩部分:

  • 存在該Broker節(jié)點(diǎn)上的所有Topic的數(shù)據(jù)
  • 數(shù)據(jù)版本

然后帶著這些數(shù)據(jù),Broker會(huì)同步的調(diào)用invokeSync發(fā)送請(qǐng)求給NameServe,并且在執(zhí)行之后觸發(fā)實(shí)現(xiàn)特定功能的回調(diào)函數(shù)。

EOF

至此,我們完成了對(duì)開(kāi)篇所提結(jié)論的驗(yàn)證,同時(shí)也發(fā)現(xiàn)了RocketMQ的主從架構(gòu)、Master和Slave同步數(shù)據(jù)的方式、心跳機(jī)制的實(shí)現(xiàn)等等,也基本從源碼中看完了Broker啟動(dòng)的所有流程??催@些老哥寫(xiě)的源碼還是挺有意思的,之后有時(shí)間隨緣再看看NameServer端相關(guān)的源碼吧。

 

責(zé)任編輯:武曉燕 來(lái)源: SH的全棧筆記
相關(guān)推薦

2024-09-26 00:22:24

2018-08-17 08:56:03

WindowsLinux系統(tǒng)

2021-02-26 13:59:41

RocketMQProducer底層

2020-12-10 10:32:33

區(qū)塊鏈比特幣數(shù)字貨幣

2022-05-02 16:18:22

RocketMQBrokertopic

2022-02-21 23:08:50

Kubernetes集群容器

2023-01-13 16:57:50

SpringBoot配置核心

2022-06-23 08:01:48

hookSetMap

2016-10-25 13:58:36

數(shù)據(jù)圖表化大數(shù)據(jù)

2021-12-02 07:50:30

字節(jié)緩沖流使用

2020-02-27 14:05:26

SQLServer數(shù)據(jù)庫(kù)

2021-05-11 16:44:42

Windows工具軟件

2020-12-09 09:39:52

SaaSLTV軟件

2017-08-28 14:47:54

NASSAN存儲(chǔ)

2025-03-12 00:22:00

2021-04-15 09:07:52

hotspotJavaC++

2018-03-21 12:13:47

工具數(shù)據(jù)開(kāi)發(fā)

2017-01-13 09:07:48

大數(shù)據(jù)互聯(lián)網(wǎng)數(shù)據(jù)挖掘

2022-04-22 13:04:43

微軟Windows 10

2024-07-29 09:30:23

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)