C#分布式消息隊(duì)列 EQueue 2.0 發(fā)布啦
前言
最近花了我?guī)讉€(gè)月的業(yè)余時(shí)間,對(duì)EQueue做了一個(gè)重大的改造,消息持久化采用本地寫文件的方式。到現(xiàn)在為止,總算完成了,所以第一時(shí)間寫文章分享給大家這段時(shí)間我所積累的一些成果。
-
EQueue開源地址:https://github.com/tangxuehua/equeue
-
EQueue相關(guān)文檔:http://www.cnblogs.com/netfocus/category/598000.html
-
EQueue Nuget地址:http://www.nuget.org/packages/equeue
昨天,我寫過一篇關(guān)于EQueue 2.0性能測(cè)試結(jié)果的文章,有興趣的可以看看。
文章地址:http://www.cnblogs.com/netfocus/p/4926305.html
為什么要改為文件存儲(chǔ)?
SQL Server的問題
之前EQueue的消息持久化是采用SQL Server的。一開始我覺得沒什么問題,采用的是異步定時(shí)批量持久化,使用SqlBulkCopy的方法,這個(gè)方法測(cè)試下來,批量插入消息的性能還不 錯(cuò),就決定使用了。一開始我并沒有在使用到EQueue后做集成的性能測(cè)試。在功能上確實(shí)沒什么問題了。而且使用DB持久化也有很多好處,比如消息查詢很 簡(jiǎn)單,DB天生支持各種方式的查詢。刪除消息也非常簡(jiǎn)單,一條DELETE語句即可。所以功能實(shí)現(xiàn)比較順利。但后來當(dāng)我對(duì)EQueue做性能測(cè)試時(shí),發(fā)現(xiàn) 一些問題。當(dāng)數(shù)據(jù)庫服務(wù)器和Broker本身部署在不同的服務(wù)器上時(shí),持久化消息也會(huì)走網(wǎng)卡,消耗帶寬,影響消息的發(fā)送和消費(fèi)的TPS。而如果數(shù)據(jù)庫服務(wù) 器部署在Broker同一臺(tái)服務(wù)器上,則因?yàn)镾QLServer本身也會(huì)消耗CPU以及內(nèi)存,也會(huì)影響B(tài)roker的消息發(fā)送和消費(fèi)的TPS。另外 SqlBulkCopy的速度,再本身機(jī)器正在接收大量的發(fā)送消息和拉取消息的請(qǐng)求時(shí),會(huì)不太穩(wěn)定。經(jīng)過一些測(cè)試,發(fā)現(xiàn)整個(gè)EQueue Broker的性能不太理想。然后又想想,Broker服務(wù)器有有一個(gè)硬件一直沒有好好利用起來,那就是硬盤。假設(shè)我們的消息是持久化到本地硬盤的,順序 寫文件,就應(yīng)該能解決SQL Server的問題了。所以,開始調(diào)研如何實(shí)現(xiàn)文件持久化消息的方案了。
消息緩存在托管內(nèi)存的GC的問題
之前消息存儲(chǔ)在SQL Server,如果消費(fèi)者每次讀取消息時(shí),總是從數(shù)據(jù)庫去讀取,那對(duì)數(shù)據(jù)庫就是不斷的寫入和讀取,性能不太理想。所以當(dāng)初的思路是,盡量把最近可能要被消 費(fèi)的消息緩存在本地內(nèi)存中。當(dāng)初的做法是設(shè)計(jì)了一個(gè)很大的ConcurrentDictionary<long, Message>,這個(gè)字典就是存放了所有可能會(huì)被消費(fèi)的消息。如果要消費(fèi)的消息當(dāng)前不在這個(gè)字典里,就批量從DB拉取一批出來消費(fèi)。這個(gè)設(shè)計(jì)可以 盡可能的避免讀取DB的情況。但是帶來了另一個(gè)問題。就是我們對(duì)這個(gè)字典在高并發(fā)不斷的寫入和讀取。且這個(gè)字典里緩存的消息又很多,到到達(dá)幾百上千萬 時(shí),GC的壓力過大,導(dǎo)致很多線程都會(huì)被阻塞。嚴(yán)重影響B(tài)roker的TPS。
所以,基于上面的兩個(gè)主要原因,我想到了兩個(gè)思路來解決:1)采用寫文件的方式來持久化消息;2)使用非托管內(nèi)存來緩存將要被消費(fèi)的消息;下面我們來看看這兩個(gè)設(shè)計(jì)的一些關(guān)鍵問題的設(shè)計(jì)思路。
文件存儲(chǔ)的關(guān)鍵問題設(shè)計(jì)
心路背景
之前一直無法駕馭寫文件的設(shè)計(jì)。因?yàn)榫?xì)化的將數(shù)據(jù)寫入文件,并能要精確的讀取想要的數(shù)據(jù),實(shí)在沒什么經(jīng)驗(yàn)。之前雖然也知道阿里的RocketMQ 的消息持久化也是采用順序?qū)懳募姆绞降?,但是看了代碼,發(fā)現(xiàn)設(shè)計(jì)很復(fù)雜,一下子也比較難懂。嘗試看了多次也無法完全理解。所以一直無法掌握這種方式。有 一天不經(jīng)意間想到之前看過的EventStore這個(gè)開源項(xiàng)目中,也有寫文件的設(shè)計(jì)。這個(gè)項(xiàng)目是CQRS架構(gòu)之父greg young所主導(dǎo)的開源項(xiàng)目,是一個(gè)專門為ES(Event Sourcing)設(shè)計(jì)模式中提供保存事件流支持的事件流存儲(chǔ)系統(tǒng)。于是下定決心專研其源碼,看C#代碼肯定還是比Java容易,呵呵。經(jīng)過一段時(shí)間的摸 索之后,基本學(xué)到了它是如何寫文件以及如何讀文件的。了解了很多設(shè)計(jì)思路。然后,在看懂了EventStore的文件存儲(chǔ)設(shè)計(jì)之后,再去看 RocketMQ的文件持久化的設(shè)計(jì),發(fā)現(xiàn)驚人的相似。原來看不懂的代碼現(xiàn)在也能看懂了,因?yàn)樗悸凡畈欢嗟?。所以,這給我開始動(dòng)手提供了很大的信心。經(jīng)過 自己的一些準(zhǔn)備(文件讀寫的性能驗(yàn)證)和設(shè)計(jì)思路整理后,終于開始動(dòng)手了。
如何寫消息到文件?
其實(shí)說出來也很簡(jiǎn)單。之前一直以為寫文件就是一個(gè)消息一行唄。這樣當(dāng)我們要找哪個(gè)消息時(shí),只需要知道行號(hào)即可。確實(shí),理論上這樣也挺好。但上面這兩 個(gè)開源項(xiàng)目都不是這樣做的,而是都是采用更精細(xì)化的直接寫二進(jìn)制的方式。搞清楚寫入的格式之后,還要考慮一個(gè)文件寫不下的時(shí)候怎么辦?因?yàn)橐粋€(gè)文件總是有 大小的,比如1G,那超過1G后,必然要?jiǎng)?chuàng)建新的文件,然后把消息寫入新的文件。所以,我們就又有了Chunk的概念。一個(gè)Chunk就是一個(gè)文件,假設(shè) 我們現(xiàn)在實(shí)現(xiàn)了一個(gè)FileMessageStore,表示對(duì)文件持久化的封裝,那這個(gè)FileMessageStore肯定維護(hù)了一堆的Chunk。然 后我們也很容易想到一點(diǎn),就是Chunk有3種狀態(tài):1)New,表示剛創(chuàng)建的Chunk,這種Chunk我們可以寫入新消息進(jìn) 去;2)Completed,已寫入完成的Chunk,這種Chunk是只讀的;3)OnGoing的Chunk,就是當(dāng) FileMessageStore初始化時(shí),要從磁盤的某個(gè)chunk的目錄下加載所有的Chunk文件,那不難理解,最后一個(gè)文件之前的Chunk文件 應(yīng)該都是Completed的;最后一個(gè)Chunk文件可能寫入了一半,就是之前沒完全用完的。所以,本質(zhì)上New和Ongoing的Chunk其實(shí)是一 樣的,只是初始化的方式不同。
至此,我們知道了寫文件的兩個(gè)關(guān)鍵思路:1)按二進(jìn)制寫;2)拆分為Chunk文件,且每個(gè)Chunk文件有狀態(tài);按二進(jìn)制寫主要的思路是,假如我 們當(dāng)前要寫入的消息的二進(jìn)制數(shù)組大小為100個(gè)字節(jié),也就是說消息的長(zhǎng)度為100,那我們可以先把消息的長(zhǎng)度寫入文件,再接著寫入消息本身。這樣我們讀取 消息時(shí),只要知道了寫入消息長(zhǎng)度時(shí)的那個(gè)Position,就能先讀取到消息的長(zhǎng)度,然后就能知道接下來要讀取多少字節(jié)為消息內(nèi)容。從而能正確讀取消息出 來。
另外再分享一點(diǎn),EventStore中,寫入一個(gè)事件到文件中時(shí),還會(huì)在寫入消息內(nèi)容后再寫入這個(gè)消息的長(zhǎng)度到文件里。也就是說,寫入一個(gè)數(shù)據(jù)到 文件時(shí),會(huì)在頭尾都寫入該數(shù)據(jù)的長(zhǎng)度。這樣做的好處是什么呢?就是當(dāng)我們想從后往前讀數(shù)據(jù)時(shí),也能方便的做到,因?yàn)槊總€(gè)數(shù)據(jù)的前后都記錄了該數(shù)據(jù)的長(zhǎng)度。 這點(diǎn)應(yīng)該不難理解吧?而EventStore是一個(gè)面向流的存儲(chǔ)系統(tǒng),我們對(duì)事件流確實(shí)可能從前往后讀,也可能是從后往前讀。另外這個(gè)設(shè)計(jì)還有一個(gè)好處, 就是起到了校驗(yàn)數(shù)據(jù)合法性的目的。當(dāng)我們根據(jù)長(zhǎng)度讀取數(shù)據(jù)后,再數(shù)據(jù)之后再讀取一個(gè)長(zhǎng)度,如果這兩個(gè)長(zhǎng)度一致,那數(shù)據(jù)應(yīng)該就沒問題的。在RocketMQ 中,是通過CRC校驗(yàn)的方式來保證讀取的數(shù)據(jù)沒有問題。我個(gè)人還是比較喜歡EventStore的做法。所以EQueue里現(xiàn)在寫入數(shù)據(jù)就是這樣做的。
上面我介紹了一種寫入不定長(zhǎng)數(shù)據(jù)到文件的設(shè)計(jì)思路,這種設(shè)計(jì)是為了解決寫入消息到文件的情況,因?yàn)橄⒌拈L(zhǎng)度是不定的。在EQueue中,我們還有 一另一種寫文件的場(chǎng)景。就是隊(duì)列信息的持久化。EQueue的架構(gòu)是一個(gè)Topic下有多個(gè)Queue,每個(gè)Queue里有很多消息,消費(fèi)者負(fù)載均衡是通 過給消費(fèi)者分配均勻數(shù)量的Queue的方式來達(dá)到的。這樣我們只要確保寫入Queue的消息是均勻的,那每個(gè)Consumer消費(fèi)到的消息數(shù)就是均勻的。 那一個(gè)Queue里記錄的是什么呢?就是一個(gè)消息和其在隊(duì)列的位置的對(duì)應(yīng)關(guān)系。假設(shè)消息寫入在文件的物理位置為10000,然后這個(gè)消息在Queue里的 索引是100,那這個(gè)隊(duì)列就會(huì)把這兩個(gè)位置對(duì)應(yīng)起來。這樣當(dāng)我們要消費(fèi)這個(gè)Queue中索引為100的消息時(shí),就能找到這個(gè)消息在文件中的物理位置為 10000,就能根據(jù)這個(gè)位置找到消息的內(nèi)容了。如果是托管內(nèi)存,我們只需要弄一個(gè)Dictionary,key是消息在隊(duì)列中的 Offset,value是消息在文件中的物理Offset即可。這樣我們有了這個(gè)dict,就能輕松建立起對(duì)應(yīng)關(guān)系了。但上面我說過,這種巨大的 dict是要占用內(nèi)存的,會(huì)有GC的問題。所以更好的辦法是,把這個(gè)對(duì)應(yīng)關(guān)系也寫入文件,那怎么做呢?這時(shí)就又需要更精細(xì)化的設(shè)計(jì)了。想到了其實(shí)也很簡(jiǎn) 單,這個(gè)設(shè)計(jì)我是從RocketMQ中學(xué)到的。就是我們?cè)O(shè)計(jì)一種固定長(zhǎng)度的結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體里就存放一個(gè)數(shù)據(jù),就是消息在文件的物理位置(為了后面好表 達(dá),我命名為MessagePosition),一個(gè)Long值,一個(gè)Long的長(zhǎng)度是8個(gè)字節(jié)。也就是說,這個(gè)文件中,每個(gè)寫入的數(shù)據(jù)的長(zhǎng)度都是8個(gè)字 節(jié)。假設(shè)我們一個(gè)文件要保存100W個(gè)MessagePosition。那這個(gè)文件的長(zhǎng)度就是100W * 8這么多字節(jié),大概為7.8MB。那么這樣做有什么好處呢?好處就是,假如我們現(xiàn)在要消費(fèi)這個(gè)Queue里的第一個(gè)消息,那這個(gè)消息的 MessagePosition在這個(gè)文件中的位置0,第二個(gè)消息在這個(gè)文件中的位置是8,第三個(gè)就是16,以此類推,第N 個(gè)消息就是(N-1) * 8。也就是說,我們無須顯式的把消息在隊(duì)列中的位置信息也寫入到文件,而是通過這樣的固定算法,就能精確的算出Queue中某個(gè)消息的 MessagePosition是寫入在文件的哪個(gè)位置。然后拿到了MessagePosition之后,就能從Message的Chunk文件中讀取到 這個(gè)消息了。
通過上面的分析,我們知道了,Producer發(fā)送一個(gè)消息到Broker時(shí),Broker會(huì)寫兩次磁盤。一次是現(xiàn)將消息本身寫入磁盤 (Message Chunk里),另一次是將消息的寫入位置寫入到磁盤(Queue Chunk里)。細(xì)心的朋友可能會(huì)問,假如我第一次寫入成功,但第二次寫入時(shí)失敗,比如正好機(jī)器斷電或者當(dāng)前Broker服務(wù)器正好出啥問題 了,沒有寫入成功。那怎么辦呢?這個(gè)沒有什么大的影響。因?yàn)槭紫冗@種情況會(huì)被認(rèn)為是消息發(fā)送失敗。所以Producer還會(huì)重新發(fā)送該消息,然后 Broker收到消息后還會(huì)再做一次這兩個(gè)寫入操作。也就是說,第一次寫入的消息內(nèi)容永遠(yuǎn)也不會(huì)用到了,因?yàn)槟莻€(gè)寫入位置永遠(yuǎn)也不會(huì)在Queue Chunk里有記錄。
下面的代碼展示了寫消息到文件的核心代碼:
- //消息寫文件需要加鎖,確保順序?qū)懳募?/span>
- MessageStoreResult result = null;
- lock (_syncObj)
- {
- var queueOffset = queue.NextOffset;
- var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message);
- queue.AddMessage(messageRecord.LogPosition, message.Tag);
- queue.IncrementNextOffset();
- result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag);
- }
StoreMessage方法內(nèi)部實(shí)現(xiàn):
- public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message)
- {
- var record = new MessageLogRecord(
- message.Topic,
- message.Code,
- message.Body,
- queueId,
- queueOffset,
- message.CreatedTime,
- DateTime.Now,
- message.Tag);
- _chunkWriter.Write(record);
- return record;
- }
queue.AddMessage方法的內(nèi)部實(shí)現(xiàn):
- public void AddMessage(long messagePosition, string messageTag)
- {
- _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2()));
- }
ChunkWriter的內(nèi)部實(shí)現(xiàn):
- public long Write(ILogRecord record)
- {
- lock (_lockObj)
- {
- if (_isClosed)
- {
- throw new ChunkWriteException(_currentChunk.ToString(), "Chunk writer is closed.");
- }
- //如果當(dāng)前文件已經(jīng)寫完,則需要新建文件
- if (_currentChunk.IsCompleted)
- {
- _currentChunk = _chunkManager.AddNewChunk();
- }
- //先嘗試寫文件
- var result = _currentChunk.TryAppend(record);
- //如果當(dāng)前文件已滿
- if (!result.Success)
- {
- //結(jié)束當(dāng)前文件
- _currentChunk.Complete();
- //新建新的文件
- _currentChunk = _chunkManager.AddNewChunk();
- //再嘗試寫入新的文件
- result = _currentChunk.TryAppend(record);
- //如果還是不成功,則報(bào)錯(cuò)
- if (!result.Success)
- {
- throw new ChunkWriteException(_currentChunk.ToString(), "Write record to chunk failed.");
- }
- }
- //如果需要同步刷盤,則立即同步刷盤
- if (_chunkManager.Config.SyncFlush)
- {
- _currentChunk.Flush();
- }
- //返回?cái)?shù)據(jù)寫入位置
- return result.Position;
- }
- }
當(dāng)然,我上面為了簡(jiǎn)化問題的復(fù)雜度。所以沒有引入關(guān)于如何根據(jù)某個(gè)全局的MessagePosition找到其在哪個(gè)Message Chunk的問題。這個(gè)其實(shí)也很好做,我們首先固定好每個(gè)Message Chunk文件的大小。比如大小為256MB,然后我們?yōu)槊總€(gè)Chunk文件設(shè)計(jì)一個(gè)ChunkHeader,每個(gè)Chunk文件總是先把這個(gè) ChunkHeader寫入文件,這個(gè)Header里記錄了這個(gè)文件的起始位置和結(jié)束位置,以及文件的大小。這樣我們根據(jù)某個(gè) MessagePosition計(jì)算其在哪個(gè)Chunk文件時(shí),只需要把這個(gè)MessagePositon對(duì)Chunk的大小做取摸操作即可。根據(jù)數(shù)據(jù)的 位置找其在哪個(gè)Chunk的代碼看起來如下面這樣這樣:
- public Chunk GetChunkFor(long dataPosition)
- {
- var chunkNum = (int)(dataPosition / _config.GetChunkDataSize());
- return GetChunk(chunkNum);
- }
- public Chunk GetChunk(int chunkNum)
- {
- if (_chunks.ContainsKey(chunkNum))
- {
- return _chunks[chunkNum];
- }
- return null;
- }
代碼很簡(jiǎn)單,就不多講了。拿到了Chunk對(duì)象后,我們就可以把dataPosition傳給Chunk,然后Chunk內(nèi)部把這個(gè)全局的 dataPosition轉(zhuǎn)換為本地的一個(gè)位置,就能準(zhǔn)確的定位到這個(gè)數(shù)據(jù)在當(dāng)前Chunk文件的實(shí)際位置了。將全局位置轉(zhuǎn)換為本地的位置的算法也很簡(jiǎn)單 直接:
- public int GetLocalDataPosition(long globalDataPosition)
- {
- if (globalDataPosition < ChunkDataStartPosition || globalDataPosition > ChunkDataEndPosition)
- {
- throw new Exception(string.Format("globalDataPosition {0} is out of chunk data positions [{1}, {2}].",
- globalDataPosition, ChunkDataStartPosition, ChunkDataEndPosition));
- }
- return (int)(globalDataPosition - ChunkDataStartPosition);
- }
只需要把這個(gè)全局的位置減去當(dāng)前Chunk的數(shù)據(jù)開始位置,就能知道這個(gè)全局位置相對(duì)于當(dāng)前Chunk的本地位置了。
好了,上面介紹了消息如何寫入的主要思路以及如何讀取數(shù)據(jù)的思路。
另外一點(diǎn)還想提一下,就是關(guān)于刷盤的策略。一般我們寫數(shù)據(jù)到文件后,是需要調(diào)用文件流的Flush方法的,確保數(shù)據(jù)最終刷入到了磁盤上。否則數(shù)據(jù)就 還是在緩沖區(qū)里。當(dāng)然,我們需要注意到,即便調(diào)用了Flush方法,數(shù)據(jù)可能也還沒真正邏輯到磁盤,而只是在操作系統(tǒng)內(nèi)部的緩沖區(qū)里。這個(gè)我們就無法控制 了,我們能做到的是調(diào)用了Flush方法即可。那當(dāng)我們每次寫入一個(gè)數(shù)據(jù)到文件都要調(diào)用Flush方法的話,無疑性能是低下的,所以就有了所謂的異步刷盤 的設(shè)計(jì)。就是我們寫入消息后不立即調(diào)用Flush方法,而是采用一個(gè)獨(dú)立的線程,定時(shí)調(diào)用Flush方法來實(shí)現(xiàn)刷盤。目前EQueue支持同步刷盤和異步 刷盤,開發(fā)者可以自己配置決定采用哪一種。異步刷盤的間隔默認(rèn)是100ms。當(dāng)我們?cè)谧非蟾咄掏铝繒r(shí),應(yīng)該考慮異步刷盤,但要求數(shù)據(jù)可靠性更高但對(duì)吞吐量 可以低一點(diǎn)時(shí),則可以使用同步刷盤。如果又要高吞吐又要數(shù)據(jù)高可靠,那就只有一個(gè)辦法了,呵呵。就是多增加一些Broker機(jī)器,通過集群來彌補(bǔ)單臺(tái) Broker寫入數(shù)據(jù)的瓶頸。
#p#
如何從文件讀取消息?
假設(shè)我們現(xiàn)在要從一個(gè)文件讀取數(shù)據(jù),且是多線程并發(fā)的讀取,要怎么設(shè)計(jì)?一個(gè)辦法是,每次讀取時(shí),創(chuàng)建文件流,然后創(chuàng)建StreamReader, 然后讀取文件,讀取完成后釋放StreamReader并關(guān)閉文件流。但每次要讀取文件的一個(gè)數(shù)據(jù)都要這樣做的話性能不是太好,因?yàn)槲覀兎磸?fù)的創(chuàng)建這樣的 對(duì)象。所以,這里我們可以使用對(duì)象池的概念。就是Chunk內(nèi)部,預(yù)先創(chuàng)建好一些Reader,當(dāng)需要讀文件時(shí),獲取一個(gè)可用的Reader,讀取完成 后,再把Reader歸還到對(duì)象池里?;谶@個(gè)思路,我設(shè)計(jì)了一個(gè)簡(jiǎn)單的對(duì)象池:
- private readonly ConcurrentQueue<ReaderWorkItem> _readerWorkItemQueue = new ConcurrentQueue<ReaderWorkItem>();
- private void InitializeReaderWorkItems()
- {
- for (var i = 0; i < _chunkConfig.ChunkReaderCount; i++)
- {
- _readerWorkItemQueue.Enqueue(CreateReaderWorkItem());
- }
- _isReadersInitialized = true;
- }
- private ReaderWorkItem CreateReaderWorkItem()
- {
- var stream = default(Stream);
- if (_isMemoryChunk)
- {
- stream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength);
- }
- else
- {
- stream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None);
- }
- return new ReaderWorkItem(stream, new BinaryReader(stream));
- }
- private ReaderWorkItem GetReaderWorkItem()
- {
- ReaderWorkItem readerWorkItem;
- while (!_readerWorkItemQueue.TryDequeue(out readerWorkItem))
- {
- Thread.Sleep(1);
- }
- return readerWorkItem;
- }
- private void ReturnReaderWorkItem(ReaderWorkItem readerWorkItem)
- {
- _readerWorkItemQueue.Enqueue(readerWorkItem);
- }
當(dāng)一個(gè)Chunk初始化時(shí),我們預(yù)先初始化好固定數(shù)量(可配置)的Reader對(duì)象,并把這些對(duì)象放入一個(gè)ConcurrentQueue里(對(duì)象 池的作用),然后要讀取數(shù)據(jù)時(shí),從從ConcurrentQueue里拿一個(gè)可用的Reader即可,如果當(dāng)前并發(fā)太高拿不到怎么辦,就等待直到拿到為 止,目前我是等待1ms后繼續(xù)嘗試拿,直到最后拿到為止。然后ReturnReaderWorkItem就是數(shù)據(jù)讀取完之后歸還Reader到對(duì)象池。就 是不是很簡(jiǎn)單哦。這樣的設(shè)計(jì),可以避免不斷的創(chuàng)建文件流和Reader對(duì)象,可以避免GC的副作用。
Broker重啟時(shí)如何做?
大家知道,當(dāng)Broker重啟時(shí),我們是需要掃描磁盤上Chunk目錄下的所有Chunk文件的。那怎么掃描呢?上面其實(shí)我也簡(jiǎn)單提到過。首先,我 們可以對(duì)每個(gè)Chunk文件的文件名的命名定義一個(gè)規(guī)則,第一個(gè)Chunk文件的文件名比如為:message-chunk-000000000,第二個(gè) 為:message-chunk-000000001,以此類推。那我們掃描時(shí),只要先把所有的文件名獲取到,然后對(duì)文件名升序排序。那最后一個(gè)文件之前 的文件肯定都是寫入完全了的,即上面我說的Completed狀態(tài)的,而最后一個(gè)文件是還沒有寫入完的,還可以接著寫。所以我們初始化時(shí),只需要先初始化 最后一個(gè)之前的所有Chunk文件,最后再初始化最后一個(gè)文件即可。這里我所說的初始化不是要把整個(gè)Chunk文件的內(nèi)容都加載到內(nèi)存,而是只是讀取這個(gè) 文件的ChunkHeader的信息維護(hù)在內(nèi)存即可。有了Header信息,我們就可以為后續(xù)的數(shù)據(jù)讀取提供位置計(jì)算了。所以,整個(gè)加載過程是很快的,讀 取100個(gè)Chunk文件的ChunkHeader也不過一兩秒的時(shí)間,完全不影響B(tài)roker的啟動(dòng)時(shí)間。對(duì)于初始化Completed的Chunk比 較簡(jiǎn)單,只需要讀取ChunkHeader信息即可。但是初始化最后一個(gè)文件比較麻煩,因?yàn)槲覀冞€要知道這個(gè)文件當(dāng)前寫入到哪里了?從而我們可以從這個(gè)位 置的下一個(gè)位置接著往下寫。那怎么知道這個(gè)文件當(dāng)前寫入到哪里了呢?其實(shí)比較復(fù)雜。有很多技術(shù),我看到RocketMQ和EventStore這兩個(gè)開源 項(xiàng)目中都采用了Checkpoint的技術(shù)。就是當(dāng)我們每次寫入一個(gè)數(shù)據(jù)到文件后,都會(huì)更新一下Checkpoint,即表示當(dāng)前寫入到這個(gè)文件的哪里 了。然后這個(gè)Checkpoint值我們也是定時(shí)異步保存到某個(gè)獨(dú)立的小文件里,這個(gè)文件里只保存了這個(gè)Checkpoint。這樣的設(shè)計(jì)有一個(gè)問題,就 是假如數(shù)據(jù)寫入了,但由于Checkpoint的保存不是實(shí)時(shí)的,所以理論上會(huì)出現(xiàn)Checkpint值會(huì)小于實(shí)際文件寫入的位置的情況。一般我們忽略這 種情況即可,即可能會(huì)存在初始化時(shí),下次寫入可能會(huì)覆蓋一定的之前已經(jīng)寫入的數(shù)據(jù),因?yàn)镃heckpoint可能是稍微老一點(diǎn)的。
而我在設(shè)計(jì)時(shí),希望能再嚴(yán)謹(jǐn)一點(diǎn),取消Checkpoint的設(shè)計(jì),而是采用在初始化Ongoing狀態(tài)的Chunk文件時(shí),從文件的頭開始不斷往 下讀,當(dāng)最后無法往下讀時(shí),我們就知道這個(gè)文件我們當(dāng)前寫入到哪里了。那怎么知道無法往下讀了呢?也就是說怎么確定后續(xù)的文件內(nèi)容不是我們寫入的?也很簡(jiǎn) 單。對(duì)于不固定數(shù)據(jù)長(zhǎng)度的Chunk來說,由于我們每次寫入一個(gè)數(shù)據(jù)時(shí)都是同時(shí)在前后寫入這個(gè)數(shù)據(jù)的長(zhǎng)度;所以我們?cè)俪跏蓟x取這個(gè)文件時(shí),可以借助這一 點(diǎn)來校驗(yàn),但出現(xiàn)不符合這個(gè)規(guī)則的數(shù)據(jù)時(shí),就認(rèn)為后續(xù)不是正常的數(shù)據(jù)了。對(duì)于固定長(zhǎng)度的Chunk來說,我們只要保證每次寫入的數(shù)據(jù)的數(shù)據(jù)是非0了。而對(duì) 于EQueue的場(chǎng)景,固定數(shù)據(jù)的Chunk里存儲(chǔ)的都是消息在Message Chunk中的全局位置,一個(gè)Long值;但這個(gè)Long值我們正常是從0開始的,怎么辦呢?很簡(jiǎn)單,我們寫入MessagePosition時(shí),總是加 1即可。即假如當(dāng)前的MessagePosition為0,那我們實(shí)際寫入1,如果為100,則實(shí)際寫入的值是101。這樣我們就能確保這個(gè)固定長(zhǎng)度的 Chunk文件里每個(gè)數(shù)據(jù)都是非0的。然后我們?cè)诔跏蓟@樣的Chunk文件時(shí),只要不斷讀取固定長(zhǎng)度(8個(gè)字節(jié))的數(shù)據(jù),當(dāng)出現(xiàn)讀取到的數(shù)據(jù)為0時(shí),就 認(rèn)為已經(jīng)到頭了,即后續(xù)的不是我們寫入的數(shù)據(jù)了。然后我們就能知道接下來要從哪里開始讀取了哦。
#p#
如何盡量避免讀文件?
上面我介紹了如何讀文件的思路。我們也知道了,我們是在消費(fèi)者要消費(fèi)消息時(shí),從文件讀取消息的。但對(duì)從文件讀取消息總是沒有比從內(nèi)存讀取消息來的 快。我們前面的設(shè)計(jì)都沒有把內(nèi)存好好利用起來。所以我們能否考慮把未來可能要消費(fèi)的Chunk文件的內(nèi)容直接緩存在內(nèi)存呢?這樣我們就可以避免對(duì)文件的讀 取了??隙梢缘?。那怎么做呢?前面我提高多,曾經(jīng)我們用托管內(nèi)存中的ConcurrentDictionary<long, Message>這樣的字典來緩存消息。我也提到這會(huì)帶來垃圾回收而影響性能的問題。所以我們不能直接這樣簡(jiǎn)單的設(shè)計(jì)。經(jīng)過我的一些嘗試,以及從 EventStore中的源碼中學(xué)到的,我們可以使用非托管內(nèi)存來緩存Chunk文件。我們可以使用Marshal.AllocHGlobal來申請(qǐng)一塊 完整的非托管內(nèi)存,然后再需要釋放時(shí),通過Marshal.FreeHGlobal來釋放。然后,我們可以通過 UnmanagedMemoryStream來訪問這個(gè)非托管內(nèi)存。這個(gè)是核心思路。那么怎樣把一個(gè)Chunk文件緩存到非托管內(nèi)存呢?很簡(jiǎn)單了,就是掃 描這個(gè)文件的所有內(nèi)容,把內(nèi)容都寫入內(nèi)存即可。代碼如下:
- private void LoadFileChunkToMemory()
- {
- using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.None))
- {
- var cachedLength = (int)fileStream.Length;
- var cachedData = Marshal.AllocHGlobal(cachedLength);
- try
- {
- using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, cachedLength, cachedLength, FileAccess.ReadWrite))
- {
- fileStream.Seek(0, SeekOrigin.Begin);
- var buffer = new byte[65536];
- int toRead = cachedLength;
- while (toRead > 0)
- {
- int read = fileStream.Read(buffer, 0, Math.Min(toRead, buffer.Length));
- if (read == 0)
- {
- break;
- }
- toRead -= read;
- unmanagedStream.Write(buffer, 0, read);
- }
- }
- }
- catch
- {
- Marshal.FreeHGlobal(cachedData);
- throw;
- }
- _cachedData = cachedData;
- _cachedLength = cachedLength;
- }
- }
代碼很簡(jiǎn)單,不用多解釋了。需要注意的是,上面這個(gè)方法針對(duì)的是Completed狀態(tài)的Chunk,即已經(jīng)寫入完成的Chunk的。已經(jīng)寫入完全的Chunk是只讀的,不會(huì)再發(fā)生更改,所以我們可以隨便緩存在內(nèi)存中。
那對(duì)于新創(chuàng)建出來的Chunk文件呢?正常情況下,消費(fèi)者來得及消費(fèi)時(shí),我們總是在不斷的寫入最新的Chunk文件,也在不斷的從這個(gè)最新的 Chunk文件讀取消息。那我們?cè)趺创_保消費(fèi)最新的消息時(shí),也不需要從文件讀取呢?也很簡(jiǎn)單,就是在新建一個(gè)Chunk文件時(shí),如果內(nèi)存足夠,也同時(shí)創(chuàng)建 一個(gè)一樣大小的基于非托管內(nèi)存的Chunk。然后我們?cè)賹懭胂⒌轿募﨏hunk成功后,再同時(shí)寫入這個(gè)消息到非托管內(nèi)存的Chunk。這樣,我們?cè)谙M(fèi) 消息,讀取消息時(shí)總是首先判斷當(dāng)前Chunk是否關(guān)聯(lián)了一個(gè)非托管內(nèi)存的Chunk,如果有,就優(yōu)先從內(nèi)存讀取即可。如果沒有才從文件Chunk讀取。
但是從文件讀取時(shí),可能會(huì)遇到一個(gè)問題。就是我們剛寫入到文件的數(shù)據(jù)可能無法立即讀取到。因?yàn)閷懭氲臄?shù)據(jù)沒有立即刷盤,所以無法通過Reader讀 取到。所以,我們不能僅通過判斷當(dāng)前寫入的位置來判斷當(dāng)前是否還有數(shù)據(jù)可以被讀取,而是考慮當(dāng)前的最后一次刷盤的位置。理論上只能讀取刷盤之前的數(shù)據(jù)。但 即便這樣設(shè)計(jì)了,在如果當(dāng)前硬盤不是SSD的情況下,好像也會(huì)出現(xiàn)讀不到數(shù)據(jù)的問題。偶爾會(huì)報(bào)錯(cuò),有朋友在測(cè)試時(shí)已經(jīng)遇到了這樣的問題。那怎么辦呢?我想 了一個(gè)辦法。因?yàn)檫@種情況歸根接地還是因?yàn)槲覀冞壿嬌险J(rèn)為已經(jīng)寫入到文件的數(shù)據(jù)由于未及時(shí)刷盤或者操作系統(tǒng)本身的內(nèi)部緩存的問題,導(dǎo)致數(shù)據(jù)未能及時(shí)寫入磁 盤。出現(xiàn)這種情況一定是最近的一些數(shù)據(jù)。所以我們?nèi)绻軌虬驯热缱罱鼘懭氲?0000(可配置)個(gè)數(shù)據(jù)都緩存在本地托管內(nèi)存中,然后讀取時(shí)先看本地緩存的 托管內(nèi)存中有沒有這個(gè)位置的數(shù)據(jù),如果有,就不需要讀文件了。這樣就能很好的解決這個(gè)問題了。那怎么確保我們只緩存了最新的10000個(gè)數(shù)據(jù)且不會(huì)超出 10000個(gè)呢?答案是環(huán)形隊(duì)列。這個(gè)名字聽起來很高大上,其實(shí)就是一個(gè)數(shù)組,數(shù)組的長(zhǎng)度為10000,然后我們?cè)趯懭霐?shù)據(jù)時(shí),我們肯定知道這個(gè)數(shù)據(jù)在文 件中的位置的,我們可以把這個(gè)位置(一個(gè)long值)對(duì)10000取摸,就能知道該把這個(gè)數(shù)據(jù)緩存在這個(gè)數(shù)組的哪個(gè)位置了。通過這個(gè)設(shè)計(jì)確保緩存的數(shù)據(jù)不 會(huì)超過10000個(gè),且確保一定只緩存最新的數(shù)據(jù),如果新的數(shù)據(jù)保存到數(shù)組的某個(gè)下標(biāo)時(shí),該下標(biāo)已經(jīng)存在以前已經(jīng)保存過的數(shù)據(jù)了,就自動(dòng)覆蓋掉即可。由于 這個(gè)數(shù)組的長(zhǎng)度不是很長(zhǎng),所以每什么GC的問題。
但是光這樣還不夠,我們這個(gè)數(shù)組中的每個(gè)元素至少要記錄這個(gè)元素對(duì)應(yīng)的數(shù)據(jù)在文件中的位置。這個(gè)是為了我們?cè)趶臄?shù)組中獲取到數(shù)據(jù)后,進(jìn)一步校驗(yàn)這個(gè)數(shù)據(jù)是否是我想要的那個(gè)位置的數(shù)據(jù)。這點(diǎn)大家應(yīng)該可以理解的吧。下面這段代碼展示了如何從環(huán)形數(shù)組中讀取想要的數(shù)據(jù):
- if (_cacheItems != null)
- {
- var index = dataPosition % _chunkConfig.ChunkLocalCacheSize;
- var cacheItem = _cacheItems[index];
- if (cacheItem != null && cacheItem.RecordPosition == dataPosition)
- {
- var record = readRecordFunc(cacheItem.RecordBuffer);
- if (record == null)
- {
- throw new ChunkReadException(
- string.Format("Cannot read a record from data position {0}. Something is seriously wrong in chunk {1}.",
- dataPosition, this));
- }
- if (_chunkConfig.EnableChunkReadStatistic)
- {
- _chunkStatisticService.AddCachedReadCount(ChunkHeader.ChunkNumber);
- }
- return record;
- }
- }
_cacheItems是當(dāng)前Chunk內(nèi)的一個(gè)環(huán)形數(shù)組,然后假如當(dāng)前我們要讀取的數(shù)據(jù)的位置是dataPosition,那我們只需要先對(duì)環(huán)形 數(shù)據(jù)的長(zhǎng)度取摸,得到一個(gè)下標(biāo),即上面代碼中的index。然后就能從數(shù)組中拿到對(duì)應(yīng)的數(shù)據(jù)了,然后如果這個(gè)數(shù)據(jù)存在,就進(jìn)一步判斷這個(gè)數(shù)據(jù) dataPosition是否和要求的dataPosition,如果一致,我們就能確定這個(gè)數(shù)據(jù)確實(shí)是我們想要的數(shù)據(jù)了。就可以返回了。
所以,通過上面的兩種緩存(非托管內(nèi)存+托管內(nèi)存環(huán)形數(shù)組)的設(shè)計(jì),我們可以確保幾乎不用再從文件讀取消息了。那什么時(shí)候還是會(huì)從文件讀取呢?就是 在1)內(nèi)存不夠用了;2)當(dāng)前要讀取的數(shù)據(jù)不是最近的10000個(gè);這兩個(gè)前提下,才會(huì)從文件讀取。一般我們線上服務(wù)器,肯定會(huì)保證內(nèi)存是可用的。 EQueue現(xiàn)在有兩個(gè)內(nèi)存使用的水位。一個(gè)是當(dāng)物理內(nèi)存使用到多少百分比(默認(rèn)值為40%)時(shí),開始清理已經(jīng)不再活躍的Chunk文件的非托管內(nèi)存 Chunk;那什么是不活躍呢?就是在最近5s內(nèi)沒有發(fā)生過讀寫的Chunk。這個(gè)設(shè)計(jì)我覺得是非常有效的,因?yàn)榧偃缫粋€(gè)Chunk有5s沒有發(fā)生過讀 寫,那一般肯定是沒有消費(fèi)者在消費(fèi)它了。另一個(gè)水位是指,最多EQueue Broker最多使用物理內(nèi)存的多少百分比(默認(rèn)值為75%),這個(gè)應(yīng)該好理解。這個(gè)水位是為了保證EQueue不會(huì)把所有物理內(nèi)存都吃光,是為了確保服 務(wù)器不會(huì)因?yàn)閮?nèi)存耗盡而宕機(jī)或?qū)е路?wù)不可用。
那什么時(shí)候會(huì)出現(xiàn)大量使用服務(wù)器內(nèi)存的情況呢?我們可以推導(dǎo)出來的。正常情況下,消息寫入第一個(gè)Chunk,我們就在讀取第一個(gè)Chunk;寫入第 二個(gè)Chunk我們也會(huì)跟著讀取第二個(gè)Chunk;假設(shè)當(dāng)前寫入到了第10個(gè)Chunk,那理論上前面的9個(gè)Chunk之前緩存的非托管內(nèi)存都可以釋放 了。因?yàn)榭隙ǔ^5s沒有發(fā)生讀寫了。但是假如現(xiàn)在消費(fèi)者有很多,且每個(gè)消費(fèi)者的消費(fèi)進(jìn)度都不同,有些很快,有些很慢,當(dāng)所有的消費(fèi)者的消費(fèi)進(jìn)度正好覆蓋 到所有的Chunk文件時(shí),就意味著每個(gè)Chunk文件都在發(fā)生讀取。也就是說,每個(gè)Chunk都是活躍的。那此時(shí)就無法釋放任何一個(gè)Chunk的非托管 內(nèi)存了。這樣就會(huì)導(dǎo)致占用大量非托管內(nèi)存了。但由于75%的水位的設(shè)計(jì),Broker內(nèi)存的使用是不會(huì)超過物理內(nèi)存75%的。在創(chuàng)建新的Chunk或者嘗 試緩存一個(gè)Completed的Chunk時(shí),總是會(huì)判斷當(dāng)前使用的物理內(nèi)存是否已經(jīng)超過75%,如果已經(jīng)超過,就不會(huì)分配對(duì)應(yīng)的非托管內(nèi)存了。
如何刪除消息?
刪除消息的設(shè)計(jì)比較簡(jiǎn)單。主要的思路是,當(dāng)我們的消息已經(jīng)被所有的消費(fèi)者都消費(fèi)過了,且滿足我們的刪除策略了,就可以刪除了。RocketMQ刪除 消息的策略比較粗暴,沒有考慮消息是否經(jīng)被消費(fèi),而是直接到了一定的時(shí)間就刪除了,比如最多只保留2天。這個(gè)是RocketMQ的設(shè)計(jì)。EQueue中, 會(huì)確保消息一定是被所有的消費(fèi)者都消費(fèi)了才會(huì)考慮刪除。然后目前我設(shè)計(jì)的刪除策略有兩種:
-
按Chunk文件數(shù);即設(shè)計(jì)一個(gè)閥值,表示磁盤上最多保存多少個(gè)Chunk文件。目前默認(rèn)值為100,每個(gè)Chunk文件的大小為256MB。也 就是大概總磁盤占用25G。一般我們的硬盤肯定有25G的。當(dāng)我們不關(guān)心消息保存多久而只從文件數(shù)的角度來決定消息是否要?jiǎng)h除時(shí),可以使用這個(gè)策略;
-
按時(shí)間來刪除,默認(rèn)是7天,即當(dāng)某個(gè)Chunk是7天前創(chuàng)建的,那我們就可以創(chuàng)建了。這種策略是不關(guān)心Chunk總共有多少,完全根據(jù)時(shí)間的維度來判斷。
實(shí)際上,應(yīng)該可能還有一些需求希望能把兩個(gè)策略合起來考慮的。這個(gè)目前我沒有做,我覺得這兩種應(yīng)該夠了。如果大家想做,可以自己擴(kuò)展的。
另外,上面我說過EQueue中目前有兩種Chunk文件,一種是存儲(chǔ)消息本身的,我叫做Message Chunk;一種是存儲(chǔ)隊(duì)列信息的,我叫做Queue Chunk;而Queue Chunk的數(shù)據(jù)是依賴于Message Chunk的。上面我說的兩種刪除策略是針對(duì)Message Chunk而言的。而Queue Chunk,由于這個(gè)依賴性,我覺得比較合理的方式是,只需要判斷當(dāng)前Queue Chunk中的所有的消息對(duì)應(yīng)的Message Chunk是否已經(jīng)都刪除了,如果是,難說明這個(gè)Queue Chunk也已經(jīng)沒意義了,就可以刪除了。但只要這個(gè)Queue Chunk中至少還有一個(gè)消息的Chunk文件沒刪除,那這個(gè)Queue Chunk就不會(huì)刪除。
上面這個(gè)只是思路哦,真實(shí)的代碼肯定比這個(gè)復(fù)雜,呵呵。有興趣的朋友還是要看代碼的。
如何查消息?
之前用SQL Server的方式,由于DB很容易查消息,所以查詢消息不是大問題。但是現(xiàn)在我們的消息是放在文件里的,那要怎么查詢呢?目前支持根據(jù)消息ID來查詢。 當(dāng)Producer發(fā)送一個(gè)消息到Broker,Broker返回結(jié)果里會(huì)包含消息的ID。Producer的正確做法應(yīng)該是要用日志或其他方式記錄這個(gè) ID,并最好和自己的當(dāng)前業(yè)務(wù)消息的某個(gè)業(yè)務(wù)ID一起記錄,比如CommandId或者EventId,這樣我們就能根據(jù)我們的業(yè)務(wù)ID找到消息ID,然 后根據(jù)消息ID找到消息內(nèi)容了。那消息ID現(xiàn)在是怎么設(shè)計(jì)的呢?也是受到RocketMQ的啟發(fā),消息ID由兩部分組成:1)Broker的IP;2)消 息在Broker的文件中的全局位置;這樣,當(dāng)我們要根據(jù)某個(gè)消息ID查詢時(shí),就可以先定位到這個(gè)消息在哪個(gè)Broker上,也同時(shí)知道了消息在文件的哪 個(gè)位置了,這樣就能最終讀取這個(gè)消息的內(nèi)容了。
為什么要這樣設(shè)計(jì)呢?如果我們的Broker沒有集群,那其實(shí)不需要包含Broker的IP;這個(gè)設(shè)計(jì)是為了未來EQueue Broker會(huì)支持集群的,那個(gè)時(shí)候,我們就必須要知道某個(gè)消息ID對(duì)應(yīng)的Broker是哪個(gè)了。
如何保存隊(duì)列消費(fèi)進(jìn)度?
EQueue中,每個(gè)Queue,都會(huì)有一個(gè)對(duì)應(yīng)的Consumer。消費(fèi)進(jìn)度就是這個(gè)Queue當(dāng)前被消費(fèi)到哪里了,一個(gè)Offset值。比如 Offset為100,就表示當(dāng)前這個(gè)Queue已經(jīng)消費(fèi)到第99(因?yàn)槭菑?開始的)個(gè)位置的消息了。因?yàn)橐粋€(gè)Broker上有很多的Queue,比如 有100個(gè)。而我們現(xiàn)在是使用文件的方式來存儲(chǔ)信息了。所以自然消費(fèi)進(jìn)度也是用文件了。但由于消費(fèi)進(jìn)度的信息很少,也不是遞增的形式。所以我們可以簡(jiǎn)單設(shè) 計(jì),目前EQueue采用一個(gè)文件的方式來保存所有Queue的消費(fèi)進(jìn)度,文件內(nèi)容為JSON,JSON里記錄了每個(gè)Queue的消費(fèi)進(jìn)度。文件內(nèi)容看起 來像下面這樣:
{"SampleGroup":{"topic1-3":89055,"topic1-2":89599,"topic1-1":89471,"topic1-0":89695}}
上面的JSON標(biāo)識(shí)一個(gè)名為SampleGroup的ConsumerGroup,他消費(fèi)了一個(gè)名為topic1的topic,然后這個(gè)topic 下的每個(gè)Queue的消費(fèi)進(jìn)度記錄了下來。如果有另一個(gè)ConsumerGroup,也消費(fèi)了這個(gè)topic,那消費(fèi)進(jìn)度是隔離的。如果還不清楚 ConsumerGroup的同學(xué),要去看一下我之前寫的EQueue的文章了。
還有沒有可以優(yōu)化的地方?
到目前為止,還有沒有其他可優(yōu)化的大的地方呢?有。之前我做EQueue時(shí),總是把消息從數(shù)據(jù)庫讀取出來,然后構(gòu)造出消息對(duì)象,再把消息對(duì)象序列化 為二進(jìn)制,再返回給Consumer。這里涉及到從DB拿出來,再序列化為二進(jìn)制。學(xué)習(xí)了RocketMQ的代碼后,我們可以做的更聰明一點(diǎn)。因?yàn)槠鋵?shí)基 于文件存儲(chǔ)時(shí),我們從文件里拿出來的已經(jīng)是二進(jìn)制了。所以可以直接把二進(jìn)制返回給消費(fèi)者即可。不需要先轉(zhuǎn)換為對(duì)象再做序列化了。通過這個(gè)設(shè)計(jì)的改進(jìn),我們 現(xiàn)在的消費(fèi)者消費(fèi)消息,可以說無任何瓶頸了,非常快。
#p#
如何統(tǒng)計(jì)消息讀寫情況?
在測(cè)試寫文件的這個(gè)版本時(shí),我們很希望知道每個(gè)Chunk的讀寫情況的統(tǒng)計(jì),從而確定設(shè)計(jì)是正確的。所以,我給EQueue的Chunk增加了實(shí)時(shí) 統(tǒng)計(jì)Chunk讀寫的統(tǒng)計(jì)服務(wù)。目前我們?cè)谶\(yùn)行EQueue自帶的例子時(shí),Broker會(huì)每個(gè)一秒打印出所有Chunk的讀寫情況,這個(gè)特性極大的方便我 們判斷消息的發(fā)送和消費(fèi)是否正常,消費(fèi)是否有延遲等。
其他新增功能
更完善和安全的隊(duì)列擴(kuò)容和縮容設(shè)計(jì)
這次我給EQueue的Web后臺(tái)管理控制臺(tái)也完善了一下隊(duì)列的增加和減少的設(shè)計(jì)。增加隊(duì)列(即隊(duì)列的擴(kuò)容)比較簡(jiǎn)單,直接新增即可。但是當(dāng)我們要 刪除一個(gè)隊(duì)列時(shí),怎樣安全的刪除呢?主要是要確保刪除這個(gè)隊(duì)列時(shí),已經(jīng)沒有Producer或Consumer在使用這個(gè)隊(duì)列了。要怎么做到呢?我的思路 是,為每個(gè)Queue對(duì)象設(shè)計(jì)兩個(gè)屬性,表示對(duì)Producer是否可見,對(duì)Consumer是否可見。當(dāng)我們要?jiǎng)h除某個(gè)Queue時(shí),可以:1)先讓其 對(duì)Producer不可見,這樣Producer后續(xù)就不會(huì)再發(fā)送新的消息到這個(gè)隊(duì)列了;然后等待,直到這個(gè)隊(duì)列里的消息都被所有的消費(fèi)者消費(fèi)掉了;然后 再設(shè)置為對(duì)Consumer不可見。然后再過幾秒,確保每個(gè)消費(fèi)者都不會(huì)再向這個(gè)隊(duì)列發(fā)出拉取消息的請(qǐng)求了。這樣我們就能安全的刪除這個(gè)隊(duì)列了。刪除隊(duì)列 的邏輯大概如如下:
- public void DeleteQueue(string topic, int queueId)
- {
- lock (this)
- {
- var key = QueueKeyUtil.CreateQueueKey(topic, queueId);
- Queue queue;
- if (!_queueDict.TryGetValue(key, out queue))
- {
- return;
- }
- //檢查隊(duì)列對(duì)Producer或Consumer是否可見,如果可見是不允許刪除的
- if (queue.Setting.ProducerVisible || queue.Setting.ConsumerVisible)
- {
- throw new Exception("Queue is visible to producer or consumer, cannot be delete.");
- }
- //檢查是否有未消費(fèi)完的消息
- var minConsumedOffset = _consumeOffsetStore.GetMinConsumedOffset(topic, queueId);
- var queueCurrentOffset = queue.NextOffset - 1;
- if (minConsumedOffset < queueCurrentOffset)
- {
- throw new Exception(string.Format("Queue is not allowed to delete, there are not consumed messages: {0}",
- queueCurrentOffset - minConsumedOffset));
- }
- //刪除隊(duì)列的消費(fèi)進(jìn)度信息
- _consumeOffsetStore.DeleteConsumeOffset(queue.Key);
- //刪除隊(duì)列本身,包括所有的文件
- queue.Delete();
- //最后將隊(duì)列從字典中移除
- _queueDict.Remove(key);
- }
- }
代碼應(yīng)該很簡(jiǎn)單直接,不多解釋了。隊(duì)列的動(dòng)態(tài)新增和刪除,可以方便我們線上應(yīng)付在線活動(dòng)時(shí),隨時(shí)為消費(fèi)者提供更高的并行消費(fèi)能力,以及活動(dòng)結(jié)束后去掉多余的隊(duì)列。是非常實(shí)用的功能。
支持Tag功能
這個(gè)功能,也是非常實(shí)用的。這個(gè)版本我加了上去。以前EQueue只有Topic的概念,沒有Tag的概念。Tag是對(duì)Topic的二級(jí)過濾。比如 當(dāng)某個(gè)Producer發(fā)送了3個(gè)消息,Topic都是topic,然后tag分別是01,02,03。然后Consumer訂閱了這個(gè)Topic,但是 訂閱這個(gè)Topic時(shí)同時(shí)制定了Tag,比如指定為02,那這個(gè)Consumer就只會(huì)收到一個(gè)消息。Tag為01,03的消息是不會(huì)收到的。這個(gè)就是 Tag的功能。我覺得Tag對(duì)我們是非常有用的,它可以極大的減少我們定義Topic。本來我們必須要定義一個(gè)新的Topic時(shí),現(xiàn)在可能只需要定義一個(gè) Tag即可。關(guān)于Tag的實(shí)現(xiàn),我就不展開了。
支持消息堆積報(bào)警
終于到最后一點(diǎn)了,終于堅(jiān)持快寫完了,呵呵。EQueue Web后臺(tái)管理控制臺(tái)現(xiàn)在支持消息堆積的報(bào)警了。當(dāng)EQueue Broker上當(dāng)前所有未消費(fèi)的消息數(shù)達(dá)到一定的閥值時(shí),就會(huì)發(fā)送郵件進(jìn)行報(bào)警。我們可以把我們的郵件和我們的手機(jī)短信進(jìn)行綁定,比如移動(dòng)的139郵箱我 記得就有這個(gè)功能。這樣我們就能第一時(shí)間知道Broker上是否有大量消息堆積了,可以讓我們第一時(shí)間處理問題。
結(jié)束語
這篇文章感覺是我有史以來寫過的最有干貨的一篇了,呵呵。一氣呵成,也是對(duì)我前面幾個(gè)月的所有積累知識(shí)經(jīng)驗(yàn)的一次性釋放吧。希望能給大家一些幫助。 我寫文章比較喜歡寫思路,不太喜歡介紹如何用。我覺得一個(gè)程序員,最重要的是要學(xué)會(huì)如何思考去解決自己想解決的問題。而不是別人直接告訴你如何去解決。通 過做EQueue這個(gè)分布式消息隊(duì)列,也算是我自己的一個(gè)實(shí)踐過程。我非常鼓勵(lì)大家寫開源項(xiàng)目哦,當(dāng)你專注于實(shí)現(xiàn)某個(gè)你感興趣的開源項(xiàng)目時(shí),你就會(huì)有目標(biāo) 性的去學(xué)習(xí)相關(guān)的知識(shí),你的學(xué)習(xí)就不會(huì)迷茫,不會(huì)為了學(xué)技術(shù)而學(xué)技術(shù)了。我在做EQuque時(shí),要考慮各種東西,比如通信層的設(shè)計(jì)、消息持久化、整個(gè)架構(gòu) 設(shè)計(jì),等等。我覺得是非常鍛煉人的。
一個(gè)人時(shí)間短暫,如果能用有限的時(shí)間做出好的東西可以造福后人,那我們來到這個(gè)世上也算沒白來了,你說對(duì)嗎?所以,我們千萬不要放棄我們的理想,雖然堅(jiān)持理想很難,但也要堅(jiān)持。