消息的存儲(chǔ)-RocketMQ知識(shí)體系之三
RocketMQ存儲(chǔ)概要設(shè)計(jì)
RocketMQ主要存儲(chǔ)的文件包括commitlog文件、consumeQueue文件、IndexFile文件。
CommitLog是消息存儲(chǔ)文件,所有消息主題的消息都存儲(chǔ)在CommitLog文件中;該文件默認(rèn)最大為1GB,超過(guò)1GB后會(huì)輪到下一個(gè)CommitLog文件。通過(guò)CommitLog,RocketMQ將所有消息存儲(chǔ)在一起,以順序IO的方式寫(xiě)入磁盤(pán),充分利用了磁盤(pán)順序?qū)憸p少了IO爭(zhēng)用提高數(shù)據(jù)存儲(chǔ)的性能。
RocketMQ的Broker機(jī)器磁盤(pán)上的文件存儲(chǔ)結(jié)構(gòu)
【CommitLog】
消息在CommitLog中的存儲(chǔ)格式如下:
存儲(chǔ)所有消息內(nèi)容,寫(xiě)滿一個(gè)文件后生成新的 commitlog 文件。所有 topic 的數(shù)據(jù)存儲(chǔ)在一起,邏輯視圖如下:
CommitLog代碼
- private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- /**
- * MAGIC_CODE - MESSAGE
- * Message's MAGIC CODE daa320a7
- * 標(biāo)記某一段為消息,即:[msgId, MESSAGE_MAGIC_CODE, 消息]
- */
- public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
- /**
- * MAGIC_CODE - BLANK
- * End of file empty MAGIC CODE cbd43194
- * 標(biāo)記某一段為空白,即:[msgId, BLANK_MAGIC_CODE, 空白]
- * 當(dāng)CommitLog無(wú)法容納消息時(shí),使用該類型結(jié)尾
- */
- private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
- /**
- * 映射文件隊(duì)列
- */
- private final MappedFileQueue mappedFileQueue;
- /**
- * 消息存儲(chǔ)
- */
- private final DefaultMessageStore defaultMessageStore;
- /**
- * flush commitLog 線程服務(wù)
- */
- private final FlushCommitLogService flushCommitLogService;
- /**
- * If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
- * commit commitLog 線程服務(wù)
- */
- private final FlushCommitLogService commitLogService;
- /**
- * 寫(xiě)入消息到Buffer Callback
- */
- private final AppendMessageCallback appendMessageCallback;
- /**
- * topic消息隊(duì)列 與 offset 的Map
- */
- private HashMap<String/* topic-queue_id */, Long/* offset */> topicQueueTable = new HashMap<>(1024);
- /**
- * TODO
- */
- private volatile long confirmOffset = -1L;
- /**
- * 當(dāng)前獲取lock時(shí)間。
- * 如果當(dāng)前解鎖,則為0
- */
- private volatile long beginTimeInLock = 0;
- /**
- * true: Can lock, false : in lock.
- * 添加消息 螺旋鎖(通過(guò)while循環(huán)實(shí)現(xiàn))
- */
- private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
- /**
- * 添加消息重入鎖
- */
- private ReentrantLock putMessageNormalLock = new ReentrantLock(); // Non fair Sync
【ConsumeQueue】
ConsumeQueue是消息消費(fèi)隊(duì)列文件,消息達(dá)到commitlog文件后將被異步轉(zhuǎn)發(fā)到消息消費(fèi)隊(duì)列,供消息消費(fèi)者消費(fèi);一個(gè)ConsumeQueue表示一個(gè)topic的一個(gè)queue,類似于kafka的一個(gè)partition,但是rocketmq在消息存儲(chǔ)上與kafka有著非常大的不同,RocketMQ的ConsumeQueue中不存儲(chǔ)具體的消息,具體的消息由CommitLog存儲(chǔ),ConsumeQueue中只存儲(chǔ)路由到該queue中的消息在CommitLog中的offset,消息的大小以及消息所屬的tag的hash(tagCode),一共只占20個(gè)字節(jié),整個(gè)數(shù)據(jù)包如下:
ConsumeQueue代碼
- public static final int CQ_STORE_UNIT_SIZE = 20;
- private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
- private final DefaultMessageStore defaultMessageStore;
- /**
- * 映射文件隊(duì)列
- */
- private final MappedFileQueue mappedFileQueue;
- /**
- * Topic
- */
- private final String topic;
- /**
- * 隊(duì)列編號(hào)
- */
- private final int queueId;
- /**
- * 消息位置信息ByteBuffer
- */
- private final ByteBuffer byteBufferIndex;
- /**
- * 文件存儲(chǔ)地址
- */
- private final String storePath;
- /**
- * 每個(gè)映射文件大小
- */
- private final int mappedFileSize;
- /**
- * 最大重放消息commitLog存儲(chǔ)位置
- */
- private long maxPhysicOffset = -1;
- private volatile long minLogicOffset = 0;
Consume Queue文件組織,如圖所示:
Consume Queue文件組織示意圖
- 根據(jù)topic和queueId來(lái)組織文件,圖中TopicA有兩個(gè)隊(duì)列0,1,那么TopicA和QueueId=0組成一個(gè)ConsumeQueue,TopicA和QueueId=1組成另一個(gè)ConsumeQueue。
- 按照消費(fèi)端的GroupName來(lái)分組重試隊(duì)列,如果消費(fèi)端消費(fèi)失敗,消息將被發(fā)往重試隊(duì)列中,比如圖中的%RETRY%ConsumerGroupA。
- 按照消費(fèi)端的GroupName來(lái)分組死信隊(duì)列,如果消費(fèi)端消費(fèi)失敗,并重試指定次數(shù)后,仍然失敗,則發(fā)往死信隊(duì)列,比如圖中的%DLQ%ConsumerGroupA。
死信隊(duì)列(Dead Letter Queue)一般用于存放由于某種原因無(wú)法傳遞的消息,比如處理失敗或者已經(jīng)過(guò)期的消息。
【IndexFile】
IndexFile是消息索引文件,主要存儲(chǔ)的是key和offset的對(duì)應(yīng)關(guān)系。
IndexFile(索引文件)提供了一種可以通過(guò)key或時(shí)間區(qū)間來(lái)查詢消息的方法。
文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實(shí)現(xiàn)為hash索引。
- private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private static int hashSlotSize = 4;
- private static int indexSize = 20;
- private static int invalidIndex = 0;
- private final int hashSlotNum;
- private final int indexNum;
- private final MappedFile mappedFile;
- private final FileChannel fileChannel;
- private final MappedByteBuffer mappedByteBuffer;
- private final IndexHeader indexHeader;
IndexFile的存儲(chǔ)結(jié)構(gòu):
從上面的分析可以看出,RocketMQ采用的是混合型的存儲(chǔ)結(jié)構(gòu),即為Broker單個(gè)實(shí)例下所有的隊(duì)列共用一個(gè)日志數(shù)據(jù)文件(即為CommitLog)來(lái)存儲(chǔ)。RocketMQ的混合型存儲(chǔ)結(jié)構(gòu)(多個(gè)Topic的消息實(shí)體內(nèi)容都存儲(chǔ)于一個(gè)CommitLog中)針對(duì)Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲(chǔ)結(jié)構(gòu),Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對(duì)消息刷盤(pán)持久化,保存至CommitLog中。
只要消息被刷盤(pán)持久化至磁盤(pán)文件CommitLog中,那么Producer發(fā)送的消息就不會(huì)丟失。
正因?yàn)槿绱?,Consumer也就肯定有機(jī)會(huì)去消費(fèi)這條消息。當(dāng)無(wú)法拉取到消息后,可以等下一次消息拉取,同時(shí)服務(wù)端也支持長(zhǎng)輪詢模式,如果一個(gè)消息拉取請(qǐng)求未拉取到消息,Broker允許等待30s的時(shí)間,只要這段時(shí)間內(nèi)有新消息到達(dá),將直接返回給消費(fèi)端。這里,RocketMQ的具體做法是,使用Broker端的后臺(tái)服務(wù)線程—ReputMessageService不停地分發(fā)請(qǐng)求并異步構(gòu)建ConsumeQueue(邏輯消費(fèi)隊(duì)列)和IndexFile(索引文件)數(shù)據(jù)。
【全局的角度來(lái)看消息的存儲(chǔ)】
【消息存儲(chǔ)流程】
Broker端收到消息后,將消息原始信息保存在CommitLog文件對(duì)應(yīng)的MappedFile中,然后異步刷新到磁盤(pán)
ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
ConsumerQueue和IndexFile只是原始文件的索引信息
內(nèi)存映射和數(shù)據(jù)刷盤(pán)
【內(nèi)存映射流程】
- 內(nèi)存映射文件MappedFile通過(guò)AllocateMappedFileService創(chuàng)建
- MappedFile的創(chuàng)建是典型的生產(chǎn)者-消費(fèi)者模型
- MappedFileQueue調(diào)用getLastMappedFile獲取MappedFile時(shí),將請(qǐng)求放入隊(duì)列中
- AllocateMappedFileService線程持續(xù)監(jiān)聽(tīng)隊(duì)列,隊(duì)列有請(qǐng)求時(shí),創(chuàng)建出MappedFile對(duì)象
- 最后將MappedFile對(duì)象預(yù)熱,底層調(diào)用force方法和mlock方法。
【刷盤(pán)機(jī)制】
- 異步刷盤(pán):消息被寫(xiě)入內(nèi)存的PAGECACHE,返回寫(xiě)成功狀態(tài),當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫(xiě)磁盤(pán)操作,快速寫(xiě)入 。吞吐量高,當(dāng)磁盤(pán)損壞時(shí),會(huì)丟失消息
- 同步刷盤(pán):消息寫(xiě)入內(nèi)存的PAGECACHE后,立刻通知刷盤(pán)線程刷盤(pán),然后等待刷盤(pán)完成,刷盤(pán)線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫(xiě)成功的狀態(tài)。吞吐量低,但不會(huì)造成消息丟失。
【刷盤(pán)流程】
producer發(fā)送給broker的消息保存在MappedFile中,然后通過(guò)刷盤(pán)機(jī)制同步到磁盤(pán)中。
刷盤(pán)分為同步刷盤(pán)和異步刷盤(pán)。
異步刷盤(pán)后臺(tái)線程按一定時(shí)間間隔執(zhí)行。
同步刷盤(pán)也是生產(chǎn)者-消費(fèi)者模型。broker保存消息到MappedFile后,創(chuàng)建GroupCommitRequest請(qǐng)求放入列表,并阻塞等待。后臺(tái)線程從列表中獲取請(qǐng)求并刷新磁盤(pán),成功刷盤(pán)后通知等待線程。
RocketMQ 文件存儲(chǔ)模型層次結(jié)構(gòu)
文件存儲(chǔ)模型層次結(jié)構(gòu)圖
RocketMQ文件存儲(chǔ)模型層次結(jié)構(gòu)如上圖所示,根據(jù)類別和作用從概念模型上大致可以劃分為5層,下面將從各個(gè)層次分別進(jìn)行分析和闡述:
RocketMQ業(yè)務(wù)處理器層:Broker端對(duì)消息進(jìn)行讀取和寫(xiě)入的業(yè)務(wù)邏輯入口,比如前置的檢查和校驗(yàn)步驟、構(gòu)造MessageExtBrokerInner對(duì)象、decode反序列化、構(gòu)造Response返回對(duì)象等;
RocketMQ數(shù)據(jù)存儲(chǔ)組件層;該層主要是RocketMQ的存儲(chǔ)核心類—DefaultMessageStore,其為RocketMQ消息數(shù)據(jù)文件的訪問(wèn)入口,通過(guò)該類的“putMessage()”和“getMessage()”方法完成對(duì)CommitLog消息存儲(chǔ)的日志數(shù)據(jù)文件進(jìn)行讀寫(xiě)操作(具體的讀寫(xiě)訪問(wèn)操作還是依賴下一層中CommitLog對(duì)象模型提供的方法);另外,在該組件初始化時(shí)候,還會(huì)啟動(dòng)很多存儲(chǔ)相關(guān)的后臺(tái)服務(wù)線程,包括AllocateMappedFileService(MappedFile預(yù)分配服務(wù)線程)、ReputMessageService(回放存儲(chǔ)消息服務(wù)線程)、HAService(Broker主從同步高可用服務(wù)線程)、StoreStatsService(消息存儲(chǔ)統(tǒng)計(jì)服務(wù)線程)、IndexService(索引文件服務(wù)線程)等;
RocketMQ存儲(chǔ)邏輯對(duì)象層:該層主要包含了RocketMQ數(shù)據(jù)文件存儲(chǔ)直接相關(guān)的三個(gè)模型類IndexFile、ConsumerQueue和CommitLog。IndexFile為索引數(shù)據(jù)文件提供訪問(wèn)服務(wù),ConsumerQueue為邏輯消息隊(duì)列提供訪問(wèn)服務(wù),CommitLog則為消息存儲(chǔ)的日志數(shù)據(jù)文件提供訪問(wèn)服務(wù)。這三個(gè)模型類也是構(gòu)成了RocketMQ存儲(chǔ)層的整體結(jié)構(gòu)(對(duì)于這三個(gè)模型類的深入分析將放在后續(xù)篇幅中);
封裝的文件內(nèi)存映射層:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數(shù)據(jù)文件的讀寫(xiě)。其中,采用MappedByteBuffer這種內(nèi)存映射磁盤(pán)文件的方式完成對(duì)大文件的讀寫(xiě),在RocketMQ中將該類封裝成MappedFile類。這里限制的問(wèn)題在上面已經(jīng)講過(guò);對(duì)于每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲(chǔ)時(shí)分隔成多個(gè)固定大小的文件(單個(gè)IndexFile文件大小約為400M、單個(gè)ConsumerQueue文件大小約5.72M、單個(gè)CommitLog文件大小為1G),其中每個(gè)分隔文件的文件名為前面所有文件的字節(jié)大小數(shù)+1,即為文件的起始偏移量,從而實(shí)現(xiàn)了整個(gè)大文件的串聯(lián)。這里,每一種類的單個(gè)文件均由MappedFile類提供讀寫(xiě)操作服務(wù)(其中,MappedFile類提供了順序?qū)?隨機(jī)讀、內(nèi)存數(shù)據(jù)刷盤(pán)、內(nèi)存清理等和文件相關(guān)的服務(wù));
磁盤(pán)存儲(chǔ)層:主要指的是部署RocketMQ服務(wù)器所用的磁盤(pán)。這里,需要考慮不同磁盤(pán)類型(如SSD或者普通的HDD)特性以及磁盤(pán)的性能參數(shù)(如IOPS、吞吐量和訪問(wèn)時(shí)延等指標(biāo))對(duì)順序?qū)?隨機(jī)讀操作帶來(lái)的影響;
文件存儲(chǔ)的高可用
【分布式存儲(chǔ)】
同一個(gè)topic 上的數(shù)據(jù)會(huì)分成多個(gè)queue 分布在不同的 broker 上,而且每個(gè)queue 都有副本機(jī)制。
【副本的主從同步(HA)】
RocketMQ 的主從同步機(jī)制如下:
1.首先啟動(dòng)Master并在指定端口監(jiān)聽(tīng);
2.客戶端啟動(dòng),主動(dòng)連接Master,建立TCP連接;
3.客戶端以每隔5s的間隔時(shí)間向服務(wù)端拉取消息,如果是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量,以該偏移量向服務(wù)端拉取消息;
4.服務(wù)端解析請(qǐng)求,并返回一批數(shù)據(jù)給客戶端;
5.客戶端收到一批消息后,將消息寫(xiě)入本地commitlog文件中,然后向Master匯報(bào)拉取進(jìn)度,并更新下一次待拉取偏移量;
6.然后重復(fù)第3步;
文件存儲(chǔ)的優(yōu)化技術(shù)
RocketMQ存儲(chǔ)層采用的幾項(xiàng)優(yōu)化技術(shù)方案在一定程度上可以減少PageCache的缺點(diǎn)帶來(lái)的影響,主要包括內(nèi)存預(yù)分配,文件預(yù)熱和mlock系統(tǒng)調(diào)用。
【預(yù)先分配MappedFile】
在消息寫(xiě)入過(guò)程中(調(diào)用CommitLog的putMessage()方法),CommitLog會(huì)先從MappedFileQueue隊(duì)列中獲取一個(gè) MappedFile,如果沒(méi)有就新建一個(gè)。
RocketMQ中預(yù)分配MappedFile的設(shè)計(jì)非常巧妙,下次獲取時(shí)候直接返回就可以不用等待MappedFile創(chuàng)建分配所產(chǎn)生的時(shí)間延遲。
【文件預(yù)熱&&mlock系統(tǒng)調(diào)用】
(1)mlock系統(tǒng)調(diào)用:其可以將進(jìn)程使用的部分或者全部的地址空間鎖定在物理內(nèi)存中,防止其被交換到swap空間。對(duì)于RocketMQ這種的高吞吐量的分布式消息隊(duì)列來(lái)說(shuō),追求的是消息讀寫(xiě)低延遲,那么肯定希望盡可能地多使用物理內(nèi)存,提高數(shù)據(jù)讀寫(xiě)訪問(wèn)的操作效率。
(2)文件預(yù)熱:預(yù)熱的目的主要有兩點(diǎn);第一點(diǎn),由于僅分配內(nèi)存并進(jìn)行mlock系統(tǒng)調(diào)用后并不會(huì)為程序完全鎖定這些內(nèi)存,因?yàn)槠渲械姆猪?yè)可能是寫(xiě)時(shí)復(fù)制的。因此,就有必要對(duì)每個(gè)內(nèi)存頁(yè)面中寫(xiě)入一個(gè)假的值。其中,RocketMQ是在創(chuàng)建并分配MappedFile的過(guò)程中,預(yù)先寫(xiě)入一些隨機(jī)值至Mmap映射出的內(nèi)存空間里。第二,調(diào)用Mmap進(jìn)行內(nèi)存映射后,OS只是建立虛擬內(nèi)存地址至物理地址的映射表,而實(shí)際并沒(méi)有加載任何文件至內(nèi)存中。程序要訪問(wèn)數(shù)據(jù)時(shí)OS會(huì)檢查該部分的分頁(yè)是否已經(jīng)在內(nèi)存中,如果不在,則發(fā)出一次缺頁(yè)中斷。這里,可以想象下1G的CommitLog需要發(fā)生多少次缺頁(yè)中斷,才能使得對(duì)應(yīng)的數(shù)據(jù)才能完全加載至物理內(nèi)存中(ps:X86的Linux中一個(gè)標(biāo)準(zhǔn)頁(yè)面大小是4KB)?RocketMQ的做法是,在做Mmap內(nèi)存映射的同時(shí)進(jìn)行madvise系統(tǒng)調(diào)用,目的是使OS做一次內(nèi)存映射后對(duì)應(yīng)的文件數(shù)據(jù)盡可能多的預(yù)加載至內(nèi)存中,從而達(dá)到內(nèi)存預(yù)熱的效果。
本文轉(zhuǎn)載自微信公眾號(hào)「小汪哥寫(xiě)代碼」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系小汪哥寫(xiě)代碼公眾號(hào)。