為什么kafka性能下降這么快,我用RocketMQ的時(shí)候不會(huì)這樣子
?Rocketmq和kafka這兩個(gè)消息隊(duì)列大家應(yīng)該都比較熟悉吧,哪怕不是很熟悉,應(yīng)該也聽(tīng)說(shuō)過(guò)的吧,你別告訴我,作為一個(gè)資深的程序員,你沒(méi)聽(tīng)過(guò)這兩門技術(shù)。
我之前使用這兩個(gè)消息隊(duì)列的時(shí)候就遇到一個(gè)很奇怪的問(wèn)題,就是在kafka里面弄了比較多的topic,性能下降的速度賊快,不知道大家遇到過(guò)沒(méi),而同樣的場(chǎng)景切換到消息隊(duì)列rocketmq中,下降速度卻沒(méi)有那么快。
不熟悉這倆消息隊(duì)列結(jié)構(gòu)的朋友,一聽(tīng)這個(gè)肯定還是不太清楚的,今天我來(lái)給大家分析分析這其中的原因,給大家解惑。
rocketmq的結(jié)構(gòu)
NameServer:主要是對(duì)元數(shù)據(jù)的管理,包括Topic和路由信息的管理,底層由netty實(shí)現(xiàn),是一個(gè)提供路由管理、路由注冊(cè)和發(fā)現(xiàn)的無(wú)狀態(tài)節(jié)點(diǎn),類似于ZooKeeper
Broker:消息中轉(zhuǎn)站,負(fù)責(zé)收發(fā)消息、持久化消息
Producer:消息的生產(chǎn)者,一般由業(yè)務(wù)系統(tǒng)來(lái)產(chǎn)生消息供消費(fèi)者消費(fèi)
Consumer:消息的消費(fèi)者,一般由業(yè)務(wù)系統(tǒng)來(lái)異步消費(fèi)消息
在RocketMQ中的每一條消息,都有一個(gè)Topic,用來(lái)區(qū)分不同的消息。一個(gè)主題一般會(huì)有多個(gè)消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個(gè)主題時(shí),訂閱了這個(gè)主題的消費(fèi)者都可以接收到生產(chǎn)者寫入的新消息。
在Topic中有分為了多個(gè)Queue,這其實(shí)是我們發(fā)送/讀取消息通道的最小單位,我們發(fā)送消息都需要指定某個(gè)寫入某個(gè)Queue,拉取消息的時(shí)候也需要指定拉取某個(gè)Queue,所以我們的順序消息可以基于我們的Queue維度保持隊(duì)列有序,如果想做到全局有序那么需要將Queue大小設(shè)置為1,這樣所有的數(shù)據(jù)都會(huì)在Queue中有序。
我們同一組Consumer會(huì)根據(jù)一些策略來(lái)選Queue,常見(jiàn)的比如平均分配或者一致性Hash分配。
要注意的是當(dāng)Consumer出現(xiàn)下線或者上線的時(shí)候,這里需要做重平衡,也就是Rebalance,RocketMQ的重平衡機(jī)制如下:
定時(shí)拉取broker,topic的最新信息,每隔20s做重平衡,隨機(jī)選取當(dāng)前Topic的一個(gè)主Broker,這里要注意的是不是每次重平衡所有主Broker都會(huì)被選中,因?yàn)闀?huì)存在一個(gè)Broker再多個(gè)Broker的情況。
獲取當(dāng)前Broker,當(dāng)前ConsumerGroup的所有機(jī)器ID。然后進(jìn)行策略分配。
由于重平衡是定時(shí)做的,所以這里有可能會(huì)出現(xiàn)某個(gè)Queue同時(shí)被兩個(gè)Consumer消費(fèi),所以會(huì)出現(xiàn)消息重復(fù)投遞。
Queue讀寫數(shù)量不一致
在RocketMQ中Queue被分為讀和寫兩種,在最開(kāi)始接觸RocketMQ的時(shí)候一直以為讀寫隊(duì)列數(shù)量配置不一致不會(huì)出現(xiàn)什么問(wèn)題的,比如當(dāng)消費(fèi)者機(jī)器很多的時(shí)候我們配置很多讀的隊(duì)列,但是實(shí)際過(guò)程中發(fā)現(xiàn)會(huì)出現(xiàn)消息無(wú)法消費(fèi)和根本沒(méi)有消息消費(fèi)的情況。
當(dāng)寫的隊(duì)列數(shù)量大于讀的隊(duì)列的數(shù)量,當(dāng)大于讀隊(duì)列這部分ID的寫隊(duì)列的數(shù)據(jù)會(huì)無(wú)法消費(fèi),因?yàn)椴粫?huì)將其分配給消費(fèi)者。
當(dāng)讀的隊(duì)列數(shù)量大于寫的隊(duì)列數(shù)量,那么多的隊(duì)列數(shù)量就不會(huì)有消息被投遞進(jìn)來(lái)。
rocketmq中的存儲(chǔ)機(jī)制
RocketMQ憑借其強(qiáng)大的存儲(chǔ)能力和強(qiáng)大的消息索引能力,以及各種類型消息和消息的特性脫穎而出,于是乎,我們這些有夢(mèng)想的程序員學(xué)習(xí)RocketMQ的存儲(chǔ)原理也變得尤為重要
而要說(shuō)起這個(gè)存儲(chǔ)原理,則不得不說(shuō)的就是RocketMQ的消息存儲(chǔ)文件commitLog文件,消費(fèi)方則是憑借著巧妙的設(shè)計(jì)Consumerqueue文件來(lái)進(jìn)行高性能并且不混亂的消費(fèi),還有RocketMQ的強(qiáng)大的支持消息索引的特性,靠的就是indexfile索引文件
我們這篇文章就從這commitLog、Consumerqueue、indexfile這三個(gè)神秘的文件說(shuō)起,搞懂這三個(gè)文件,RocketMQ的核心就被你掏空了
先上個(gè)圖,寫入commitLog文件時(shí)commitLog和Consumerqueue、indexfile文件三者的關(guān)系
commitLog
RocketMQ中的消息存儲(chǔ)文件放在${ROCKET_HOME}/store 目錄下,當(dāng)生產(chǎn)者發(fā)送消息時(shí),broker會(huì)將消息存儲(chǔ)到Commit文件夾下,文件夾下面會(huì)有一個(gè)commitLog文件,但是并不是意味著這個(gè)文件叫這個(gè),文件命名是根據(jù)消息的偏移量來(lái)決定的。
文件有自己的生成規(guī)則,每個(gè)commitLog文件的大小是1G,一般情況下第一個(gè) CommitLog 的起始偏移量為 0,第二個(gè) CommitLog 的起始偏移量為 1073741824 (1G = 1073741824byte)。
commitLog文件的最大的一個(gè)特點(diǎn)就是消息的順序?qū)懭耄S機(jī)讀寫,所有的topic的消息都存儲(chǔ)到commitLog文件中,順序?qū)懭肟梢猿浞值睦么疟P順序減少了IO爭(zhēng)用數(shù)據(jù)存儲(chǔ)的性能,kafka也是通過(guò)硬盤順序存盤的。
大家都常說(shuō)硬盤的速度比內(nèi)存慢,其實(shí)這句話也是有歧義的,當(dāng)硬盤順序?qū)懭牒妥x取的時(shí)候,速度不比內(nèi)存慢,甚至比內(nèi)存速度快,這種存儲(chǔ)方式就好比數(shù)組,我們?nèi)绻罃?shù)組的下標(biāo),則可以直接通過(guò)下標(biāo)計(jì)算出位置,找到內(nèi)存地址,眾所周知,數(shù)組的讀取是很快的,但是數(shù)組的缺點(diǎn)在于插入數(shù)據(jù)比較慢,因?yàn)槿绻谥虚g插入數(shù)據(jù)需要將后面的數(shù)據(jù)往后移動(dòng)。
而對(duì)于數(shù)組來(lái)說(shuō),如果我們只會(huì)順序的往后添加,數(shù)組的速度也是很快的,因?yàn)閿?shù)組沒(méi)有后續(xù)的數(shù)據(jù)的移動(dòng),這一操作很耗時(shí)。
回到RocketMQ中的commitLog文件,也是同樣的道理,順序的寫入文件也就不需要太多的去考慮寫入的位置,直接找到文件往后放就可以了,而取數(shù)據(jù)的時(shí)候,也是和數(shù)組一樣,我們可以通過(guò)文件的大小去精準(zhǔn)的定位到哪一個(gè)文件,然后再精準(zhǔn)的定位到文件的位置。
consumerqueue文件
RocketMQ是分為多個(gè)topic,消息所屬主題,屬于消息類型,每一個(gè)topic有多個(gè)queue,每個(gè)queue放著不同的消息,在同一個(gè)消費(fèi)者組下的消費(fèi)者,可以同時(shí)消費(fèi)同一個(gè)topic下的不同queue隊(duì)列的消息。不同消費(fèi)者下的消費(fèi)者,可以同時(shí)消費(fèi)同一個(gè)topic下的相同的隊(duì)列的消息。而同一個(gè)消費(fèi)者組下的消費(fèi)者,不可以同時(shí)消費(fèi)不同topic下的消息。
而每個(gè)topic下的queue隊(duì)列都會(huì)對(duì)應(yīng)一個(gè)Consumerqueue文件,這個(gè)存儲(chǔ)的就是對(duì)應(yīng)的commitLog文件中的索引位置,而不用存儲(chǔ)真實(shí)的消息。
consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每個(gè)topic默認(rèn)4個(gè)隊(duì)列,里面存放的consumequeue文件。
ConsumeQueue中并不需要存儲(chǔ)消息的內(nèi)容,而存儲(chǔ)的是消息在CommitLog中的offset。也就是說(shuō)ConsumeQueue其實(shí)是CommitLog的一個(gè)索引文件。
consumequeue是定長(zhǎng)結(jié)構(gòu),每個(gè)記錄固定大小20個(gè)字節(jié),單個(gè)consumequeue文件默認(rèn)包含30w個(gè)條目,所以單個(gè)文件大小大概6M左右。
很顯然,Consumer消費(fèi)消息的時(shí)候,要讀2次:先讀ConsumeQueue得到offset,再通過(guò)offset找到CommitLog對(duì)應(yīng)的消息內(nèi)容。
IndexFile
RocketMQ還支持通過(guò)MessageID或者M(jìn)essageKey來(lái)查詢消息,使用ID查詢時(shí),因?yàn)镮D就是用broker+offset生成的(這里msgId指的是服務(wù)端的),所以很容易就找到對(duì)應(yīng)的commitLog文件來(lái)讀取消息。
對(duì)于用MessageKey來(lái)查詢消息,MessageStore通過(guò)構(gòu)建一個(gè)index來(lái)提高讀取速度。
indexfile文件存儲(chǔ)在store目錄下的index文件里面,里面存放的是消息的hashcode和index內(nèi)容,文件由一個(gè)文件頭組成:長(zhǎng)40字節(jié)。500w個(gè)hashslot,每個(gè)4字節(jié)。2000w個(gè)index條目,每個(gè)20字節(jié)。
所以這里我們可以估算每個(gè)indexfile的大小為:40+500w4+2000w20個(gè)字節(jié),大約400M左右。
每放入一個(gè)新消息的index進(jìn)來(lái),首先會(huì)取MessageKey的HashCode,然后用Hashcode對(duì)slot的總數(shù)進(jìn)行取模,決定該消息key的位置,slot的總數(shù)默認(rèn)是500W個(gè)。
只要取hash就必然面臨著hash沖突的問(wèn)題,indexfile也是采用鏈表結(jié)構(gòu)來(lái)解決hash沖突,這一點(diǎn)和HashMap一樣的,不過(guò)這個(gè)不存在紅黑樹(shù)轉(zhuǎn)換這一說(shuō),個(gè)人猜測(cè)這個(gè)的沖突數(shù)量也達(dá)不到很高的級(jí)別,所以進(jìn)行這方面的設(shè)計(jì)也沒(méi)啥必要,甚至變成了強(qiáng)行增加indexfile的文件結(jié)構(gòu)難度。
還有,在indexfile中的slot中放的是最新的index的指針,因?yàn)橐话悴樵兊臅r(shí)候大概率是優(yōu)先查詢最近的消息。
每個(gè)slot中放的指針值是索引在indexfile中的偏移量,也就是后面index的位置,而index中存放的就是該消息在commitlog文件中的offset,每個(gè)index的大小是20字節(jié),所以根據(jù)當(dāng)前索引是這個(gè)文件中的第幾個(gè)偏移量,也就很容易定位到索引的位置,根據(jù)前面的固定大小可以很快把真實(shí)坐標(biāo)算出來(lái),以此類推,形成一個(gè)鏈表的結(jié)構(gòu)。
kafka的結(jié)構(gòu)
Broker:消息中間件處理節(jié)點(diǎn)(服務(wù)器),一個(gè)節(jié)點(diǎn)就是一個(gè)broker,一個(gè)Kafka集群由一個(gè)或多個(gè)broker組成。
Topic:Kafka對(duì)消息進(jìn)行歸類,發(fā)送到集群的每一條消息都要指定一個(gè)topic。
Partition:物理上的概念,每個(gè)topic包含一個(gè)或多個(gè)partition,一個(gè)partition對(duì)應(yīng)一個(gè)文件夾,這個(gè)文件夾下存儲(chǔ)partition的數(shù)據(jù)和索引文件,每個(gè)partition內(nèi)部是有序的。
Producer:生產(chǎn)者,負(fù)責(zé)發(fā)布消息到broker。
Consumer:消費(fèi)者,從broker讀取消息。
ConsumerGroup:每個(gè)consumer屬于一個(gè)特定的consumer group,可為每個(gè)consumer指定group name,若不指定,則屬于默認(rèn)的group,一條消息可以發(fā)送到不同的consumer group,但一個(gè)consumer group中只能有一個(gè)consumer能消費(fèi)這條消息。
kafka存儲(chǔ)機(jī)制
我們的生產(chǎn)者會(huì)決定發(fā)送到哪個(gè) Partition,如果沒(méi)有 Key 值則進(jìn)行輪詢發(fā)送。
如果有 Key 值,對(duì) Key 值進(jìn)行 Hash,然后對(duì)分區(qū)數(shù)量取余,保證了同一個(gè) Key 值的會(huì)被路由到同一個(gè)分區(qū)。(所有系統(tǒng)的partition都是同一個(gè)路數(shù))。
總所周知,topic在物理層面以partition為分組,一個(gè)topic可以分成若干個(gè)partition,那么topic以及partition又是怎么存儲(chǔ)的呢?
其實(shí)partition還可以細(xì)分為logSegment,一個(gè)partition物理上由多個(gè)logSegment組成,那么這些segment又是什么呢?
LogSegment 文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為 Segment 索引文件和數(shù)據(jù)文件。
這兩個(gè)文件的命令規(guī)則為:partition全局的第一個(gè)segment從0開(kāi)始,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值,數(shù)值大小為64位,20位數(shù)字字符長(zhǎng)度,沒(méi)有數(shù)字用0填充,如下:
第一個(gè)segment
00000000000000000000.index
00000000000000000000.log
第二個(gè)segment,文件命名以第一個(gè)segment的最后一條消息的offset組成
00000000000000170410.index
00000000000000170410.log
第三個(gè)segment,文件命名以上一個(gè)segment的最后一條消息的offset組成
00000000000000239430.index
00000000000000239430.log
“.index”索引文件存儲(chǔ)大量的元數(shù)據(jù),“.log”數(shù)據(jù)文件存儲(chǔ)大量的消息,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。
kafka和rocketmq的比較
RocketMQ和Kafka的存儲(chǔ)核心設(shè)計(jì)有很大的不同,所以其在寫入性能方面也有很大的差別,這是16年阿里中間件團(tuán)隊(duì)對(duì)RocketMQ和Kafka不同Topic下做的性能測(cè)試:
從圖上可以看出:
Kafka在Topic數(shù)量由64增長(zhǎng)到256時(shí),吞吐量下降了98.37%。
RocketMQ在Topic數(shù)量由64增長(zhǎng)到256時(shí),吞吐量只下降了16%。
這是為什么呢?
kafka一個(gè)topic下面的所有消息都是以partition的方式分布式的存儲(chǔ)在多個(gè)節(jié)點(diǎn)上。同時(shí)在kafka的機(jī)器上,每個(gè)Partition其實(shí)都會(huì)對(duì)應(yīng)一個(gè)日志目錄,在目錄下面會(huì)對(duì)應(yīng)多個(gè)日志分段。
所以如果Topic很多的時(shí)候Kafka雖然寫文件是順序?qū)懀珜?shí)際上文件過(guò)多,會(huì)造成磁盤IO競(jìng)爭(zhēng)非常激烈。
那RocketMQ為什么在多Topic的情況下,依然還能很好的保持較多的吞吐量呢?我們首先來(lái)看一下RocketMQ中比較關(guān)鍵的文件:
rocketmq中的消息主體數(shù)據(jù)并沒(méi)有像Kafka一樣寫入多個(gè)文件,而是寫入一個(gè)文件,這樣我們的寫入IO競(jìng)爭(zhēng)就非常小,可以在很多Topic的時(shí)候依然保持很高的吞吐量。
有人可能說(shuō)這里的ConsumeQueue寫是在不停的寫入呢,并且ConsumeQueue是以Queue維度來(lái)創(chuàng)建文件,那么文件數(shù)量依然很多,在這里ConsumeQueue的寫入的數(shù)據(jù)量很小,每條消息只有20個(gè)字節(jié),30W條數(shù)據(jù)也才6M左右,所以其實(shí)對(duì)我們的影響相對(duì)Kafka的Topic之間影響是要小很多的。
再順便提一嘴,一個(gè)topic分了一萬(wàn)個(gè)partition和一萬(wàn)個(gè)topic每個(gè)topic都是單partition對(duì)于kafka的負(fù)載是一樣的。