RocketMQ 常見問題與深度解析
在當今的分布式系統(tǒng)架構(gòu)中,消息隊列作為解耦、異步處理和流量削峰的核心組件,扮演著至關(guān)重要的角色。而RocketMQ作為阿里巴巴開源的一款高性能、高可用的分布式消息中間件,憑借其強大的功能和廣泛的應用場景,成為眾多企業(yè)和開發(fā)者關(guān)注的焦點。
無論是面試中還是實際工作中,掌握RocketMQ的核心原理和常見問題都顯得尤為重要。本文將從基礎(chǔ)到高級,全面總結(jié)RocketMQ的面試高頻問題,并結(jié)合實際應用場景和源碼解析,幫助讀者深入理解其設(shè)計思想和實現(xiàn)機制。無論你是準備面試的技術(shù)人,還是希望提升對RocketMQ理解的開發(fā)者,相信這篇文章都能為你提供有價值的參考。
接下來,我們將從RocketMQ的基本概念、核心組件、消息存儲與消費、高可用性設(shè)計以及性能優(yōu)化等方面展開詳細解析,助你輕松應對面試中的各種挑戰(zhàn)!
RocketMQ的架構(gòu)是怎么樣的
整體來說,RocketMQ是由如下幾個部分組成:
producer:投遞消息的生產(chǎn)者,主要負責將消息投遞到broker上。
- broker:可以理解為消息中轉(zhuǎn)服務(wù)器,主要負責消息存儲,以及消息路由的轉(zhuǎn)發(fā),RocketMQ支持多個broker構(gòu)成集群,每個broker都有獨立的存儲空間和隊列。
- consumer:訂閱topic并從broker中獲得消息并消費。
- nameserver:提供服務(wù)發(fā)現(xiàn)和路由功能,負責維護broker的數(shù)據(jù)信息,包括broker地址、topic和queue等,對應的producer和consumer在啟動時都需要通過nameserver獲取broker的地址信息。
- topic:消費主題,對于消息的邏輯分類的單位,producer消息都會發(fā)送到特定的topic上,對應的consumer就會從這些topic拿到消費的消息。
RocketMQ的事務(wù)消息是如何實現(xiàn)的
大體是通過half消息完成,大體工作流程為:
- 生產(chǎn)者即應用程序像mq的broker發(fā)送一條half消息,mq收到該消息后在事務(wù)消息日志中將其標記為prepared狀態(tài),然后回復ack確認。
- 生產(chǎn)者執(zhí)行本地事務(wù),將事務(wù)執(zhí)行結(jié)果發(fā)送提交指令告知mq可以提交事務(wù)消息給消費者。
- mq若收到提交通知后,將消息從prepared改為commited,然后將消息提交給消費者,當然如果mq長時間沒有收到提交通知則發(fā)送回查給生產(chǎn)者詢問該事務(wù)的執(zhí)行情況。
- 基于事務(wù)結(jié)果若成功則將事務(wù)消息提交給消費者,反之回滾該消息即將消息設(shè)置為rollback并將該消息從消息日志中刪除,從而保證消息不被消費。
RocketMQ如何保證消息的順序性
針對保證消費順序性的問題,我們可以基于下面這樣的一個場景來分析,假設(shè)我們有一個下單請求,生產(chǎn)者要求按需投遞下面這些消息讓消費者消費:
- 創(chuàng)建訂單
- 用戶付款
- 庫存扣減
同理消費者也得嚴格按照這個順序完成消費,此時如果按照簡單維度的架構(gòu)來說,我們可以全局設(shè)置一個topic讓生產(chǎn)者準確有序的投遞每一個消息,然后消費者準確依次消費消息即可,但是這樣做對于并發(fā)的場景下性能表現(xiàn)就會非常差勁:
為了適當提升兩端性能比對消息堆積,我們選擇增加隊列用多個隊列處理這個原子業(yè)務(wù):
有了這樣的架構(gòu)基礎(chǔ),我們就需要考慮生產(chǎn)者和消費者的有序生產(chǎn)和有序消費的落地思路了,先來說說生產(chǎn)者有序投遞,這樣做比較簡單,我們可以直接通過訂單號進行hash并結(jié)合topic隊列數(shù)進行取模的方式將一個訂單的創(chuàng)建、余額扣減、庫存扣減的消息有序投遞到某個隊列中,這樣就能保證單個訂單的業(yè)務(wù)消息有序性:
對應的我們也給出生產(chǎn)者的代碼使用示例:
//基于訂單號orderNo進行哈希取模發(fā)送訂單消息
Message<Order> message = MessageBuilder.withPayload(order).build();
rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());
這塊哈希取模的實現(xiàn)可以從底層源碼DefaultMQProducerImpl的sendSelectImpl看到,它會將arg(也就是我們的orderNo)通過selector的select進行運算獲得單topic下的某個隊列:
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//......
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//傳入arg也就是我們的orderNo基于selector算法進行哈希取模
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
//......
}
//......
}
這個調(diào)用會來到SelectMessageQueueByHash的select,從源碼可以看出這塊代碼看出,它的算法就是通過參數(shù)哈希運算后結(jié)合隊列數(shù)(默認為4)進行取模:
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//數(shù)值哈希運算
int value = arg.hashCode();
//......
//結(jié)合隊列數(shù)取模得到隊列返回
value = value % mqs.size();
return mqs.get(value);
}
}
消費者端就比較簡單了,consumeMode指定為有序消費即可:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
topic = "ORDER_ADD",
consumeMode = ConsumeMode.ORDERLY//同一個topic下同一個隊列只有一個消費者線程消費
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
}
}
這里我們也基于源碼解析一下這個有序消費的實現(xiàn),本質(zhì)上消費者啟動的時候會開啟一個定時任務(wù)嘗試獲取分布式上鎖隊列信息的執(zhí)行如下步驟:
- 獲取知道的broker及其隊列。
- 獲取對應broker的master地址。
- 發(fā)送請求到服務(wù)端詢問master獲取所有隊列的分布式鎖。
- 基于請求結(jié)果獲取查看那些隊列上鎖成功。
- 更新本地結(jié)果。
完成后,消費者就拉取到全局可唯一消費的隊列信息,因為每個消費者都是基于多線程執(zhí)行,所以為了保證本地多線程消費有序性,每一個線程進行消費時都會以消息隊列messageQueue作為key用synchronized上鎖后才能消費。
代碼如下所示,可以看到上鎖成功后就會執(zhí)行messageListener.consumeMessage方法,該方法就會走到我們上文中聲明的監(jiān)聽上了:
public void run() {
//......
//消費請求線程消費前會獲取消息隊列鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
//上鎖
synchronized (objLock) {
//......
//將消息發(fā)送給實現(xiàn)有序監(jiān)聽的監(jiān)聽器線程
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//......
} finally {
//......
}
//......
}
RocketMQ有幾種集群方式
- 多master模式:該模式僅由多master構(gòu)成,配置比較簡單,單個master宕機或重啟對于應用全局沒有任何影響,尤其在磁盤為RAID10的情況下,即使服務(wù)器不可恢復,只要我們使用同步刷盤策略,基本上消息都不會丟失,而缺點也非常明顯,單臺機器宕機期間,這臺機器未被消費的消息在恢復之前不可訂閱,消息的實時性會受影響。
- 多master-多slave異步復制:多個master和多個slave構(gòu)成,即使master宕機后slave依然可以對外提供服務(wù),所以消息實時性不會受影響,缺點是主從復制是異步的,如果master宕機時同步的消息可能丟失部分,且沒有解決slave自動切換為master。
- 多master-多slave同步復制:上者的優(yōu)化版即同步策略采用同步的方式,保證在RAID10的情況下消息基本不丟失,但因采用的是同步復制,所以發(fā)送單個消息的RT可能略高,且同樣沒有解決slave自動切換為master。
- Dledger模式:該集群模式要求至少由3個broker構(gòu)成,即一個master必須對應兩個slave,一旦某個master宕機后通過raft一致性算法選舉新的master對外提供服務(wù)。具體實踐可以參考:https://rocketmq.apache.org/zh/docs/bestPractice/02dledger/
RocketMQ消息堆積了怎么解決
消息堆積的原因比較多,大體是客戶端隊列并發(fā)度不夠或者客戶端消費能力不足,所以我們可以針對以下幾個角度進行針對性的優(yōu)化:
- 增加消費者實例:如果是消費速度過慢導致的消息堆積,則建議增加消費者數(shù)量,讓更多的實例來消費這些消息。
- 提升消費者消費速度:如果是消息消費處理耗時長,則針對性的業(yè)務(wù)流程調(diào)優(yōu),例如引入線程池、本地消息存儲后立即返回成功等方式提前消息進行批量的預消化。
- 降低生產(chǎn)者速度:如果生產(chǎn)端可控且消費者已經(jīng)沒有調(diào)優(yōu)的空間時,我們建議降低生產(chǎn)者生產(chǎn)速度。
- 清理過期消息:對于一些過期且一直無法處理成功的消息,在進行業(yè)務(wù)上的評估后,我們可以針對性的進行清理。
- 增加topic隊列數(shù):如果是因為隊列少導致并發(fā)度不夠可以考慮增加一下消費者隊列,來提升消息隊列的并發(fā)度。
- 參數(shù)調(diào)優(yōu):我們可以針對各個節(jié)點耗時針對:消費模式、消息拉取間隔等參數(shù)進行優(yōu)化。
RocketMQ的工作流程詳解
上文已經(jīng)介紹了幾個基本的概念,我們這里直接將其串聯(lián)起來:
- 啟動nameServer,等待broker、producer和consumer的接入。
- 啟動broker和nameserver建立連接,并定時發(fā)送心跳包,心跳包中包含broker信息(ip、端口號等)以及topic以及broker與topic的映射關(guān)系。
- 啟動producer,producer啟動時會隨機通過nameserver集群中的一臺建立長連接,并從中獲取發(fā)送的topic和所有broker地址信息,基于這些信息拿到topic對應的隊列,與隊列所在的broker建立長連接,自此開始消息發(fā)送。
- broker接收producer發(fā)送的消息時,會根據(jù)配置同步和刷盤策略進行狀態(tài)回復:
1. 若為同步復制則master需要復制到slave節(jié)點后才能返回寫狀態(tài)成功
2. 若配置同步刷盤,還需要基于上述步驟再將數(shù)據(jù)寫入磁盤才能返回寫成功
3. 若是異步刷盤和異步復制,則消息一到master就直接回復成功
- 啟動consumer,和nameserver建立連接然后訂閱信息,然后對感興趣的broker建立連接,獲取消息并消費。
RocketMQ的消息是采用推模式還是采用拉模式
消費模式分為3種:
- push:服務(wù)端主動推送消息給客戶端。
- pull:客戶端主動到服務(wù)端輪詢獲取數(shù)據(jù)。
- pop:5.0之后的新模式,后文會介紹。
總的來說push模式是性能比較好,但是客戶端沒有做好留空,可能會出現(xiàn)大量消息把客戶端打死的情況。 而poll模式同理,頻繁拉取服務(wù)端可能會造成服務(wù)器壓力,若設(shè)置不好輪詢間隔,可能也會出現(xiàn)消費不及時的情況,
整體來說RocketMQ本質(zhì)上還是采用pull模式,具體后文會有介紹。
用了RocketMQ一定能做到削峰嗎
削峰本質(zhì)就是將高并發(fā)場景下短時間涌入的消息平攤通過消息隊列構(gòu)成緩沖區(qū)然后平攤到各個時間點進行消費,從而實現(xiàn)平滑處理。
這也不意味著用mq就一定可以解決問題,假如用push模式,這就意味著你的消息都是mq立即收到立即推送的,本質(zhì)上只是加了一個無腦轉(zhuǎn)發(fā)的中間層,并沒有實際解決問題。 所以要想做到削峰,就必須用拉模式,通過主動拉去保證消費的速度,讓消息堆積在mq隊列中作為緩沖。
常見消息隊列的消息模型有哪些?RocketMQ用的是那種消息模型
消息隊列的消息模型有兩種,一種是隊列模型,生產(chǎn)者負責把消息扔到消息隊列中,消費者去消息隊列中搶消息,消息只有一個,先到者先得:
還有一種就是發(fā)布/訂閱模型了,發(fā)布訂閱模型的消息只要消費者有訂閱就能消費消息:
RocketMQ是支持發(fā)布訂閱模式的,如下所示,筆者又新建一個監(jiān)聽者,用的是不同的消費者組consumerGroup,運行時即可看到兩組訂閱的消費者消費一份消息:
@Component
@RocketMQMessageListener(consumerGroup = "gourp2", topic = "ORDER_ADD")
public class OrderMqListener2 implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener2.class);
@Override
public void onMessage(Order order) {
logger.info("訂閱者2收到消息,訂單信息:[{}],進行新春福利活動.....", JSON.toJSONString(order));
}
}
RocketMQ消息的消費模式有哪些
有兩種消費模式:
集群消費:這種是RocketMQ默認模式,一個主題下的多個隊列都會被消費者組中的某個消費者消費掉。
廣播消費:廣播消費模式會讓每個消費者組中的每個消費者都能使用這個消息。
如何保證消息可用性和可靠性呢?
這個問題我們要從3個角度考慮:
對于生產(chǎn)階段,生產(chǎn)者發(fā)送消息要想確??煽勘仨氉裱韵?點:
- 沒有發(fā)送成功就需要進行重試:
SendResult result = producer.send(message);
if (!"SEND_OK".equals(result.getSendStatus().name())){
logger.warn("消息發(fā)送失敗,執(zhí)行重試的邏輯");
}
- 如果發(fā)送超時,我們可以從日志相關(guān)API中查看是否存到Broker中。
- 如果是異步消息,則需要到回調(diào)接口中做相應處理。
針對存儲階段,存儲階段要保證可靠性就需要從以下幾個角度保證:
- 開啟主從復制模式,使得Master掛了還有Slave可以用。
- 為了確保發(fā)送期間服務(wù)器宕機的情況,我們建議刷盤機制改為同步刷盤,確保消息發(fā)送并寫到CommitLog中再返回成功。
這里補充一下同步刷盤和異步刷盤的區(qū)別:
- 同步刷盤,生產(chǎn)者投遞的消息持久化時必須真正寫到磁盤才會返回成功,可靠性高,但是因為IO問題會使得組件處理效率下降。
- 異步刷盤,如下圖所示,可以僅僅是存到page cache即可返回成功,至于何時持久化操磁盤由操作系統(tǒng)后臺異步的頁緩存置換算法決定。
對應的刷盤策略,我們只需修改broker.conf的配置文件即可:
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
對于消費階段,消費者編碼邏輯一定要確保消費成功了再返回消費成功:
consumer.registerMessageListener((List<MessageExt> msgs,
ConsumeConcurrentlyContext context) -> {
String msg = new String(msgs.stream().findFirst().get().getBody());
logger.info("消費收到消息,消息內(nèi)容={}", msg);
//消費完全成功再返回成功狀態(tài)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
如果避免消息重復消費問題(重點)
這個我們可以分不同的情況討論,有些場景下,我們只需保證業(yè)務(wù)冪等即可,例如:我們需要給訂單服務(wù)發(fā)送一個用戶下單成功的消息,無論發(fā)送多少次訂單服務(wù)只是將訂單表狀態(tài)設(shè)置成已完成。
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener.class);
@Override
public void onMessage(Order order) {
logger.info("消費者收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
updateOrderFinish(order);
}
private void updateOrderFinish(Order order){
logger.info("執(zhí)行dao層邏輯,將訂單設(shè)置下單完成,無論多少次,執(zhí)行到這個消費邏輯都是將訂單設(shè)置為處理完成");
}
}
還有一種方式就是業(yè)務(wù)去重,例如我們現(xiàn)在要創(chuàng)建訂單,每次訂單創(chuàng)建完都會往一張記錄消費信息表中插入數(shù)據(jù)。一旦我們收到重復的消息,只需帶著唯一標識去數(shù)據(jù)庫中查,如果有則直接返回成功即可:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {
//......
@Override
public void onMessage(Order order) {
logger.info("消費者收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
//消費者消費時判斷訂單是否存在,如果存在則直接返回
if (isExist(order)){
return;
}
updateOrderFinish(order);
}
}
延時消息底層是怎么實現(xiàn)
我們都知道投遞消息到消息隊列的時候,消息都會寫入到commitLog上,在此之前MQ會檢查當前消息延遲等級是否大于0,如果是則說明該消息是延遲消息,則會將其topic設(shè)置為RMQ_SYS_SCHEDULE_TOPIC并基于延遲等級獲取對應的隊列,最后基于零拷貝的方式寫入磁盤,注意此時消息還不可被消費:
對此我們也給出這段投遞消息的源碼,即位于CommitLog的asyncPutMessage異步投遞消息的方法:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
//......
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
//如果大于0則說明是延遲消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//設(shè)置topic設(shè)置為SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//基于等級獲取延遲消息隊列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
//......
//基于上述設(shè)置topic和隊列信息
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
//......
//基于零拷貝的方式添加
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
//......
}
MQ啟動的時候底層的消息調(diào)度服務(wù)會基于延遲消息的等級初始化幾個任務(wù),這些任務(wù)會基于定時的間隔檢查是否有到期的消息到來,如果到期則將其投遞到真正topic的隊列中供消費者消費:
基于此邏輯我們也給出ScheduleMessageService的start方法查看調(diào)度器的初始化邏輯,可以看到初始化階段,它會遍歷所有延遲級別并為其初始化一個定時任務(wù):
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
//遍歷所有延遲級別
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
//設(shè)置timer定時器
if (timeDelay != null) {
//投遞給定時器對應等級的定時任務(wù)
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
//......
}
查看DeliverDelayedMessageTimerTask的核心邏輯即run方法,也就是我們所說的定時檢查是否有到期消息,若存在則將其存入原本的topic上,消費者就可以消費了:
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
//......
}
}
public void executeOnTimeup() {
//基于topic和隊列id獲取延遲隊列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
//根據(jù)偏移量獲取有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
//......
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//......
long now = System.currentTimeMillis();
//計算可消費時間
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
//如果小于0說明可消費
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//清除延遲級別恢復到真正的topic和隊列id
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//......
//放到消息隊列上
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
//......
} else {
//......
} catch (Exception e) {
//......
} else {
//......
}
} // end of for
//......
}
什么是死信隊列
通俗來說一個消息消費失敗并重試達到最大次數(shù)后,MQ就會將其放到死信隊列中。超過三天該消息就會被銷毀。 需要補充的時死信隊列是針對一個group id為單位創(chuàng)建的隊列,如果一個gourp中都沒有死信的話,那么MQ就不會為這個組創(chuàng)建死信隊列。
Broker是進行消息持久化的
要想了解Broker如何保存數(shù)據(jù),我們必須了解RocketMQ三大文件:
首先是commitlog,producer發(fā)送的消息最終都會通過刷盤機制存到commitlog文件夾下。commitlog下一個文件名為00000000000000000000一旦寫滿,就會再創(chuàng)建一個文件寫,一般來說第二個文件名為00000000001073741824,名稱即是第一個文件的字節(jié)數(shù)。文件大小一般是1G:
然后是consumequeue文件夾,這個文件夾下記錄的都是commitlog中每個topic下的隊列信息物理偏移量、消息大小、hashCode值,如下圖,consumequeue文件夾下會為每個topic創(chuàng)建一個文件夾:
打開任意一個文件夾就會看到這樣一個名為00000000000000000000的文件:
而這個文件內(nèi)部最多維護30w個條目,注意文件中每個條目大約20字節(jié),8字節(jié)代表當前消息在commitLog中的偏移量,4字節(jié)存放消息大小,8字節(jié)存放tag和hashCode的值。
最后就算index,維護消息的索引,基于HashMap結(jié)構(gòu),這個文件使得我們可以通過key或者時間區(qū)間查詢消息:
文件名基本用時間戳生成的,大小一般為400M,差不多維護2000w個索引:
簡單小結(jié)一下RocketMQ持久化的物理文件:MQ會為每個broker維護一個commitlog,一旦文件存放到commitlog,消息就不會丟失。當無法拉取消息時,broker允許producer在30s內(nèi)發(fā)送一個消息,然后直接給消費者消費。
后兩個索引文件的維護是基于一個線程ReputMessageService進行異步維護consumeQueue(邏輯消費隊列)和IndexFile(索引文件)數(shù)據(jù):
RocketMQ如何進行文件讀寫的呢?
對于讀寫IO處理有以下兩種:
- pageCache:在RocketMQ中,ConsumeQueue存儲數(shù)據(jù)較少,并且是順序讀取,在pageCache預讀的機制下讀取速率是非??陀^的(即使有大量的消息堆積)。操作系統(tǒng)會將一部分內(nèi)存用作pageCache,當數(shù)據(jù)寫入磁盤會先經(jīng)過pageCache然后通過內(nèi)核線程pdflush寫入物理磁盤。 針對ConsumeQueue下關(guān)于消息索引的數(shù)據(jù)查詢時,會先去pageCache查詢是否有數(shù)據(jù),若有則直接返回。若沒有則去ConsumeQueue文件中讀取需要的數(shù)據(jù)以及這個數(shù)據(jù)附近的數(shù)據(jù)一起加載到pageCache中,這樣后續(xù)的讀取就是走緩存,效率自然上去了,這種磁盤預讀目標數(shù)據(jù)的附近數(shù)據(jù)就是我們常說的局部性原理。而commitLog隨機性比較強特定情況下讀寫性能會相對差一些,所以在操作系統(tǒng)層面IO讀寫調(diào)度算法可以改為deadline并選用SSD盤以保證操作系統(tǒng)在指定時間完成數(shù)據(jù)讀寫保證性能。
- 零拷貝技術(shù):這是MQ基于NIO的FileChannel模型的一種直接將物理文件映射到用戶態(tài)內(nèi)存地址的一種技術(shù),通過MappedByteBuffer,它的工作機制是直接建立內(nèi)存映射,文件數(shù)據(jù)并沒有經(jīng)過JVM和操作系統(tǒng)直接復制的過程,相當于直接操作內(nèi)存,所以效率就非常高??梢詤⒖? 能不能給我講講零拷貝:https://mp.weixin.qq.com/s/zS2n2a4h3YQifBYKFgcUCA
消息刷盤如何實現(xiàn)呢?
兩種方式分別是同步刷盤和異步刷盤
- 同步刷盤: producer發(fā)送的消息經(jīng)過broker后必須寫入到物理磁盤commitLog后才會返回成功。
- 異步刷盤:producer發(fā)送的消息到達broker之后,直接返回成功,刷盤的邏輯交給一個異步線程實現(xiàn)。
而上面說的刷盤都是通過MappedByteBuffer.force() 這個方法完成的,需要補充異步刷盤是通過一個異步線程FlushCommitLogService實現(xiàn)的,其底層通過MappedFileQueue針對內(nèi)存中的隊列消息調(diào)用flush進行刷盤從而完成消息寫入:
public boolean flush(final int flushLeastPages) {
boolean result = true;
//拉取文件處理偏移量信息
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
//基于mmap零拷貝技術(shù)進行刷盤
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
//如果刷盤后的進度和預期一樣說明刷盤成功
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
//維護處理時間
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
RocketMQ負載均衡
MQ中負載均衡的主要是體現(xiàn)在生產(chǎn)者端和消費者端,Producer負載均衡算法在上述中有序消費中的源碼已經(jīng)說明,這里就不多做贅述,本質(zhì)上就是通過底層的selector進行輪詢投遞:
Message<Order> message = MessageBuilder.withPayload(order).build();
rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());
再來consumer負載均衡算法,mq客戶端啟動時會開啟一個負載均衡服務(wù)執(zhí)行負載均衡隊列輪詢邏輯,通過負載均衡算法得出每個消費者應該處理的隊列信息后生產(chǎn)拉取消息的請求,交由有MQ客戶端去拉取消息:
默認情況下,負載均衡算法選定隊列后拉取消息進行消費,默認情況下它會根據(jù)隊列數(shù)和消費者數(shù)決定如何進行負載分擔,按照平均算法:
- 如果消費者數(shù)大于隊列數(shù),則將隊列分配給有限的幾個消費者。
- 如果消費者數(shù)小于隊列數(shù),默認情況下會按照隊列數(shù)/消費者數(shù)取下限+1進行分配,例如隊列為4,消費者為3,那么每個消費者就會拿到2個隊列,其中第三個消費者則沒有處理任何數(shù)據(jù)。
對應的我們給出MQ客戶端初始化的代碼RebalanceService的run方法,可以看到它會調(diào)用mqClientFactory執(zhí)行負載均衡方法doRebalance:
@Override
public void run() {
//.......
while (!this.isStopped()) {
//.......
//客戶端執(zhí)行負載均衡
this.mqClientFactory.doRebalance();
}
//.......
}
步入其內(nèi)部邏輯會走到RebalanceImpl的doRebalance,它遍歷每個topic進行負載均衡運算:
public void doRebalance(final boolean isOrder) {
//......
if (subTable != null) {
//遍歷每個topic
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//計算該topic中當前消費者要處理的隊列
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
//......
}
}
}
//......
}
最終我們來到了核心邏輯rebalanceByTopic方法,可以看到它會基于我們查到的topic的隊列和消費者通過策略模式找到對應的消息分配策略AllocateMessageQueueStrategy從而算得當前消費者需要處理的隊列,然后在基于這份結(jié)果調(diào)用updateProcessQueueTableInRebalance生成pullRequest告知客戶端為該消費者拉取消息:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
//......
}
case CLUSTERING: {
//根據(jù)主題獲取消息隊列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
//......
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//// 排序后才能保證消費者負載策略相對穩(wěn)定
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//按負載策略進行分配,返回當前消費者實際訂閱的messageQueue集合
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
//......
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//......
break;
}
default:
break;
}
}
對應的我們也給出負載均衡算法AllocateMessageQueueAveragely的源碼,大體算法和筆者上述說明的基本一致,讀者可以參考上圖講解了解一下:
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
//......
//獲取消費者id對應的索引
int index = cidAll.indexOf(currentCID);
//基于隊列總數(shù)和客戶端總數(shù)進行取模
int mod = mqAll.size() % cidAll.size();
/**
*計算每個消費者的可消費的平均值:
* 1. 如果消費者多于隊列就取1
* 2. 如果消費者少于隊列就按照取模結(jié)果來計算
*/
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
//基于當前客戶端的索引定位其處理的隊列位置
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
//獲取消費者的隊列消費范圍
int range = Math.min(averageSize, mqAll.size() - startIndex);
//遍歷隊列存入結(jié)果集
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
完成后基于這份結(jié)果生成pullRequest存入pullRequestQueue中:
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
//......
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//......
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
//......
} else {
//生成pullRequest
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//存入pullRequestQueue中
this.dispatchPullRequest(pullRequestList);
return changed;
}
最后消費者的PullMessageService這個線程就會從隊列中取出該請求向MQ發(fā)起消息拉取請求:
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//獲取一個拉消息的請求pullRequest
PullRequest pullRequest = this.pullRequestQueue.take();
//拉消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
RocketMQ消息長輪詢
消費者獲取消息大體有兩種方案:
- 消息隊列主動push:由消息隊列主動去推送消息給消費者,高并發(fā)場景下,對于服務(wù)端性能開銷略大。
- 消費者定期pull消息:由客戶端主動去拉取消息,但是需要客戶端設(shè)置好拉取的間隔,太頻繁對于消息隊列開銷還是很大,間隔太長消息實時性又無法保證。
對此RocketMQ采用長輪詢機制保證了實時性同時又降低了服務(wù)端的開銷,總的來說,它的整體思路為:
- 消費者發(fā)起一個消費請求,內(nèi)容傳入topic、queueId和客戶端socket、pullFromThisOffset等數(shù)據(jù)。
- 服務(wù)端收到請求后查看該隊列是否有數(shù)據(jù),若沒有則掛起。
- 在一個最大超時時間內(nèi)定時輪詢,如果有則將結(jié)果返回給客戶端。
- 反之處理超時,也直接告知客戶端超時了也沒有消息。
對應的我們再次給出消費者拉取消費的源碼PullMessageService的run方法,可以看到其內(nèi)部不斷從阻塞隊列中拉取請求并發(fā)起消息拉?。?/p>
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//獲取一個拉消息的請求
PullRequest pullRequest = this.pullRequestQueue.take();
//拉消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
服務(wù)端的PullMessageProcessor的processRequest就是處理請求的入口,可以看到該方法如果發(fā)現(xiàn)broker沒有看到新的消息就會調(diào)用suspendPullRequest將客戶端連接hold?。?/p>
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
//.......
switch (response.getCode()) {
case ResponseCode.SUCCESS:
//.......
} else {
//.......
break;
case ResponseCode.PULL_NOT_FOUND://沒拉取到消息
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
//定位請求的topic以及offset和隊列id
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//基于上述數(shù)據(jù)生成pullRequest并調(diào)用suspendPullRequest將其hold住
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
//.......
break;
default:
assert false;
}
} else {
//.......
}
//.......
return response;
}
然后PullRequestHoldService就會基于上述一部掛起的數(shù)據(jù)定時檢查是否有新消息到來,直到超期:
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
//等待
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
//基于超時時限內(nèi)定時查看topic中的隊列是否有新消息,如果有或者超時則返回
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
//......
}
}
//......
}
這里我們也給出checkHoldRequest的調(diào)用可以看到,如果查到隊列offset大于用戶傳的說明就有新消息則返回,超時則直接返回:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
//......
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
//......
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
//拉取到新消息就返回
if (newestOffset > request.getPullFromThisOffset()) {
//......
if (match) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
//超時也返回
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
//......
}
}
}