自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ 常見問題與深度解析

開發(fā)
本文我們將從RocketMQ的基本概念、核心組件、消息存儲與消費、高可用性設(shè)計以及性能優(yōu)化等方面展開詳細解析,助你輕松應對面試中的各種挑戰(zhàn)!

在當今的分布式系統(tǒng)架構(gòu)中,消息隊列作為解耦、異步處理和流量削峰的核心組件,扮演著至關(guān)重要的角色。而RocketMQ作為阿里巴巴開源的一款高性能、高可用的分布式消息中間件,憑借其強大的功能和廣泛的應用場景,成為眾多企業(yè)和開發(fā)者關(guān)注的焦點。

無論是面試中還是實際工作中,掌握RocketMQ的核心原理和常見問題都顯得尤為重要。本文將從基礎(chǔ)到高級,全面總結(jié)RocketMQ的面試高頻問題,并結(jié)合實際應用場景和源碼解析,幫助讀者深入理解其設(shè)計思想和實現(xiàn)機制。無論你是準備面試的技術(shù)人,還是希望提升對RocketMQ理解的開發(fā)者,相信這篇文章都能為你提供有價值的參考。

接下來,我們將從RocketMQ的基本概念、核心組件、消息存儲與消費、高可用性設(shè)計以及性能優(yōu)化等方面展開詳細解析,助你輕松應對面試中的各種挑戰(zhàn)!

RocketMQ的架構(gòu)是怎么樣的

整體來說,RocketMQ是由如下幾個部分組成:

producer:投遞消息的生產(chǎn)者,主要負責將消息投遞到broker上。

  • broker:可以理解為消息中轉(zhuǎn)服務(wù)器,主要負責消息存儲,以及消息路由的轉(zhuǎn)發(fā),RocketMQ支持多個broker構(gòu)成集群,每個broker都有獨立的存儲空間和隊列。
  • consumer:訂閱topic并從broker中獲得消息并消費。
  • nameserver:提供服務(wù)發(fā)現(xiàn)和路由功能,負責維護broker的數(shù)據(jù)信息,包括broker地址、topic和queue等,對應的producer和consumer在啟動時都需要通過nameserver獲取broker的地址信息。
  • topic:消費主題,對于消息的邏輯分類的單位,producer消息都會發(fā)送到特定的topic上,對應的consumer就會從這些topic拿到消費的消息。

RocketMQ的事務(wù)消息是如何實現(xiàn)的

大體是通過half消息完成,大體工作流程為:

  • 生產(chǎn)者即應用程序像mq的broker發(fā)送一條half消息,mq收到該消息后在事務(wù)消息日志中將其標記為prepared狀態(tài),然后回復ack確認。
  • 生產(chǎn)者執(zhí)行本地事務(wù),將事務(wù)執(zhí)行結(jié)果發(fā)送提交指令告知mq可以提交事務(wù)消息給消費者。
  • mq若收到提交通知后,將消息從prepared改為commited,然后將消息提交給消費者,當然如果mq長時間沒有收到提交通知則發(fā)送回查給生產(chǎn)者詢問該事務(wù)的執(zhí)行情況。
  • 基于事務(wù)結(jié)果若成功則將事務(wù)消息提交給消費者,反之回滾該消息即將消息設(shè)置為rollback并將該消息從消息日志中刪除,從而保證消息不被消費。

RocketMQ如何保證消息的順序性

針對保證消費順序性的問題,我們可以基于下面這樣的一個場景來分析,假設(shè)我們有一個下單請求,生產(chǎn)者要求按需投遞下面這些消息讓消費者消費:

  • 創(chuàng)建訂單
  • 用戶付款
  • 庫存扣減

同理消費者也得嚴格按照這個順序完成消費,此時如果按照簡單維度的架構(gòu)來說,我們可以全局設(shè)置一個topic讓生產(chǎn)者準確有序的投遞每一個消息,然后消費者準確依次消費消息即可,但是這樣做對于并發(fā)的場景下性能表現(xiàn)就會非常差勁:

為了適當提升兩端性能比對消息堆積,我們選擇增加隊列用多個隊列處理這個原子業(yè)務(wù):

有了這樣的架構(gòu)基礎(chǔ),我們就需要考慮生產(chǎn)者和消費者的有序生產(chǎn)和有序消費的落地思路了,先來說說生產(chǎn)者有序投遞,這樣做比較簡單,我們可以直接通過訂單號進行hash并結(jié)合topic隊列數(shù)進行取模的方式將一個訂單的創(chuàng)建、余額扣減、庫存扣減的消息有序投遞到某個隊列中,這樣就能保證單個訂單的業(yè)務(wù)消息有序性:

對應的我們也給出生產(chǎn)者的代碼使用示例:

//基于訂單號orderNo進行哈希取模發(fā)送訂單消息
Message<Order> message = MessageBuilder.withPayload(order).build();
rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());

這塊哈希取模的實現(xiàn)可以從底層源碼DefaultMQProducerImpl的sendSelectImpl看到,它會將arg(也就是我們的orderNo)通過selector的select進行運算獲得單topic下的某個隊列:

private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      //......
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
             //傳入arg也就是我們的orderNo基于selector算法進行哈希取模
                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
               //......
            }

          //......
    }

這個調(diào)用會來到SelectMessageQueueByHash的select,從源碼可以看出這塊代碼看出,它的算法就是通過參數(shù)哈希運算后結(jié)合隊列數(shù)(默認為4)進行取模:

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    //數(shù)值哈希運算
        int value = arg.hashCode();
      //......
      //結(jié)合隊列數(shù)取模得到隊列返回
        value = value % mqs.size();
        return mqs.get(value);
    }
}

消費者端就比較簡單了,consumeMode指定為有序消費即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        consumeMode = ConsumeMode.ORDERLY//同一個topic下同一個隊列只有一個消費者線程消費
        
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

這里我們也基于源碼解析一下這個有序消費的實現(xiàn),本質(zhì)上消費者啟動的時候會開啟一個定時任務(wù)嘗試獲取分布式上鎖隊列信息的執(zhí)行如下步驟:

  • 獲取知道的broker及其隊列。
  • 獲取對應broker的master地址。
  • 發(fā)送請求到服務(wù)端詢問master獲取所有隊列的分布式鎖。
  • 基于請求結(jié)果獲取查看那些隊列上鎖成功。
  • 更新本地結(jié)果。

完成后,消費者就拉取到全局可唯一消費的隊列信息,因為每個消費者都是基于多線程執(zhí)行,所以為了保證本地多線程消費有序性,每一個線程進行消費時都會以消息隊列messageQueue作為key用synchronized上鎖后才能消費。

代碼如下所示,可以看到上鎖成功后就會執(zhí)行messageListener.consumeMessage方法,該方法就會走到我們上文中聲明的監(jiān)聽上了:

public void run() {
            //......
            //消費請求線程消費前會獲取消息隊列鎖
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            //上鎖
            synchronized (objLock) {
                 //......
                                //將消息發(fā)送給實現(xiàn)有序監(jiān)聽的監(jiān)聽器線程
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                 //......
                            } finally {
                                 //......
                            }
  //......
        }

RocketMQ有幾種集群方式

  • 多master模式:該模式僅由多master構(gòu)成,配置比較簡單,單個master宕機或重啟對于應用全局沒有任何影響,尤其在磁盤為RAID10的情況下,即使服務(wù)器不可恢復,只要我們使用同步刷盤策略,基本上消息都不會丟失,而缺點也非常明顯,單臺機器宕機期間,這臺機器未被消費的消息在恢復之前不可訂閱,消息的實時性會受影響。
  • 多master-多slave異步復制:多個master和多個slave構(gòu)成,即使master宕機后slave依然可以對外提供服務(wù),所以消息實時性不會受影響,缺點是主從復制是異步的,如果master宕機時同步的消息可能丟失部分,且沒有解決slave自動切換為master。
  • 多master-多slave同步復制:上者的優(yōu)化版即同步策略采用同步的方式,保證在RAID10的情況下消息基本不丟失,但因采用的是同步復制,所以發(fā)送單個消息的RT可能略高,且同樣沒有解決slave自動切換為master。
  • Dledger模式:該集群模式要求至少由3個broker構(gòu)成,即一個master必須對應兩個slave,一旦某個master宕機后通過raft一致性算法選舉新的master對外提供服務(wù)。具體實踐可以參考:https://rocketmq.apache.org/zh/docs/bestPractice/02dledger/

RocketMQ消息堆積了怎么解決

消息堆積的原因比較多,大體是客戶端隊列并發(fā)度不夠或者客戶端消費能力不足,所以我們可以針對以下幾個角度進行針對性的優(yōu)化:

  • 增加消費者實例:如果是消費速度過慢導致的消息堆積,則建議增加消費者數(shù)量,讓更多的實例來消費這些消息。
  • 提升消費者消費速度:如果是消息消費處理耗時長,則針對性的業(yè)務(wù)流程調(diào)優(yōu),例如引入線程池、本地消息存儲后立即返回成功等方式提前消息進行批量的預消化。
  • 降低生產(chǎn)者速度:如果生產(chǎn)端可控且消費者已經(jīng)沒有調(diào)優(yōu)的空間時,我們建議降低生產(chǎn)者生產(chǎn)速度。
  • 清理過期消息:對于一些過期且一直無法處理成功的消息,在進行業(yè)務(wù)上的評估后,我們可以針對性的進行清理。
  • 增加topic隊列數(shù):如果是因為隊列少導致并發(fā)度不夠可以考慮增加一下消費者隊列,來提升消息隊列的并發(fā)度。
  • 參數(shù)調(diào)優(yōu):我們可以針對各個節(jié)點耗時針對:消費模式、消息拉取間隔等參數(shù)進行優(yōu)化。

RocketMQ的工作流程詳解

上文已經(jīng)介紹了幾個基本的概念,我們這里直接將其串聯(lián)起來:

  • 啟動nameServer,等待broker、producer和consumer的接入。
  • 啟動broker和nameserver建立連接,并定時發(fā)送心跳包,心跳包中包含broker信息(ip、端口號等)以及topic以及broker與topic的映射關(guān)系。
  • 啟動producer,producer啟動時會隨機通過nameserver集群中的一臺建立長連接,并從中獲取發(fā)送的topic和所有broker地址信息,基于這些信息拿到topic對應的隊列,與隊列所在的broker建立長連接,自此開始消息發(fā)送。
  • broker接收producer發(fā)送的消息時,會根據(jù)配置同步和刷盤策略進行狀態(tài)回復:
1. 若為同步復制則master需要復制到slave節(jié)點后才能返回寫狀態(tài)成功
2. 若配置同步刷盤,還需要基于上述步驟再將數(shù)據(jù)寫入磁盤才能返回寫成功
3. 若是異步刷盤和異步復制,則消息一到master就直接回復成功
  • 啟動consumer,和nameserver建立連接然后訂閱信息,然后對感興趣的broker建立連接,獲取消息并消費。

RocketMQ的消息是采用推模式還是采用拉模式

消費模式分為3種:

  • push:服務(wù)端主動推送消息給客戶端。
  • pull:客戶端主動到服務(wù)端輪詢獲取數(shù)據(jù)。
  • pop:5.0之后的新模式,后文會介紹。

總的來說push模式是性能比較好,但是客戶端沒有做好留空,可能會出現(xiàn)大量消息把客戶端打死的情況。 而poll模式同理,頻繁拉取服務(wù)端可能會造成服務(wù)器壓力,若設(shè)置不好輪詢間隔,可能也會出現(xiàn)消費不及時的情況,

整體來說RocketMQ本質(zhì)上還是采用pull模式,具體后文會有介紹。

用了RocketMQ一定能做到削峰嗎

削峰本質(zhì)就是將高并發(fā)場景下短時間涌入的消息平攤通過消息隊列構(gòu)成緩沖區(qū)然后平攤到各個時間點進行消費,從而實現(xiàn)平滑處理。

這也不意味著用mq就一定可以解決問題,假如用push模式,這就意味著你的消息都是mq立即收到立即推送的,本質(zhì)上只是加了一個無腦轉(zhuǎn)發(fā)的中間層,并沒有實際解決問題。 所以要想做到削峰,就必須用拉模式,通過主動拉去保證消費的速度,讓消息堆積在mq隊列中作為緩沖。

常見消息隊列的消息模型有哪些?RocketMQ用的是那種消息模型

消息隊列的消息模型有兩種,一種是隊列模型,生產(chǎn)者負責把消息扔到消息隊列中,消費者去消息隊列中搶消息,消息只有一個,先到者先得:

還有一種就是發(fā)布/訂閱模型了,發(fā)布訂閱模型的消息只要消費者有訂閱就能消費消息:

RocketMQ是支持發(fā)布訂閱模式的,如下所示,筆者又新建一個監(jiān)聽者,用的是不同的消費者組consumerGroup,運行時即可看到兩組訂閱的消費者消費一份消息:

@Component
@RocketMQMessageListener(consumerGroup = "gourp2", topic = "ORDER_ADD")
public class OrderMqListener2 implements RocketMQListener<Order> {

    private static Logger logger = LoggerFactory.getLogger(OrderMqListener2.class);


    @Override
    public void onMessage(Order order) {

        logger.info("訂閱者2收到消息,訂單信息:[{}],進行新春福利活動.....", JSON.toJSONString(order));

    }
}

RocketMQ消息的消費模式有哪些

有兩種消費模式:

集群消費:這種是RocketMQ默認模式,一個主題下的多個隊列都會被消費者組中的某個消費者消費掉。

廣播消費:廣播消費模式會讓每個消費者組中的每個消費者都能使用這個消息。

如何保證消息可用性和可靠性呢?

這個問題我們要從3個角度考慮:

對于生產(chǎn)階段,生產(chǎn)者發(fā)送消息要想確??煽勘仨氉裱韵?點:

  • 沒有發(fā)送成功就需要進行重試:
SendResult result = producer.send(message);
                if (!"SEND_OK".equals(result.getSendStatus().name())){
                    logger.warn("消息發(fā)送失敗,執(zhí)行重試的邏輯");
                }
  • 如果發(fā)送超時,我們可以從日志相關(guān)API中查看是否存到Broker中。
  • 如果是異步消息,則需要到回調(diào)接口中做相應處理。

針對存儲階段,存儲階段要保證可靠性就需要從以下幾個角度保證:

  • 開啟主從復制模式,使得Master掛了還有Slave可以用。
  • 為了確保發(fā)送期間服務(wù)器宕機的情況,我們建議刷盤機制改為同步刷盤,確保消息發(fā)送并寫到CommitLog中再返回成功。

這里補充一下同步刷盤和異步刷盤的區(qū)別:

  • 同步刷盤,生產(chǎn)者投遞的消息持久化時必須真正寫到磁盤才會返回成功,可靠性高,但是因為IO問題會使得組件處理效率下降。
  • 異步刷盤,如下圖所示,可以僅僅是存到page cache即可返回成功,至于何時持久化操磁盤由操作系統(tǒng)后臺異步的頁緩存置換算法決定。

對應的刷盤策略,我們只需修改broker.conf的配置文件即可:

#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH

對于消費階段,消費者編碼邏輯一定要確保消費成功了再返回消費成功:

consumer.registerMessageListener((List<MessageExt> msgs,
                                          ConsumeConcurrentlyContext context) -> {
            String msg = new String(msgs.stream().findFirst().get().getBody());
            logger.info("消費收到消息,消息內(nèi)容={}", msg);
            
            //消費完全成功再返回成功狀態(tài)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

如果避免消息重復消費問題(重點)

這個我們可以分不同的情況討論,有些場景下,我們只需保證業(yè)務(wù)冪等即可,例如:我們需要給訂單服務(wù)發(fā)送一個用戶下單成功的消息,無論發(fā)送多少次訂單服務(wù)只是將訂單表狀態(tài)設(shè)置成已完成。

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {

    private static Logger logger = LoggerFactory.getLogger(OrderMqListener.class);


    @Override
    public void onMessage(Order order) {
        logger.info("消費者收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
        updateOrderFinish(order);
    }

    private void updateOrderFinish(Order order){
        logger.info("執(zhí)行dao層邏輯,將訂單設(shè)置下單完成,無論多少次,執(zhí)行到這個消費邏輯都是將訂單設(shè)置為處理完成");
    }

}

還有一種方式就是業(yè)務(wù)去重,例如我們現(xiàn)在要創(chuàng)建訂單,每次訂單創(chuàng)建完都會往一張記錄消費信息表中插入數(shù)據(jù)。一旦我們收到重復的消息,只需帶著唯一標識去數(shù)據(jù)庫中查,如果有則直接返回成功即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {

 //......
    @Override
    public void onMessage(Order order) {
        logger.info("消費者收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
        //消費者消費時判斷訂單是否存在,如果存在則直接返回
        if (isExist(order)){
            return;
        }
        updateOrderFinish(order);
    }

   

}

延時消息底層是怎么實現(xiàn)

我們都知道投遞消息到消息隊列的時候,消息都會寫入到commitLog上,在此之前MQ會檢查當前消息延遲等級是否大于0,如果是則說明該消息是延遲消息,則會將其topic設(shè)置為RMQ_SYS_SCHEDULE_TOPIC并基于延遲等級獲取對應的隊列,最后基于零拷貝的方式寫入磁盤,注意此時消息還不可被消費:

對此我們也給出這段投遞消息的源碼,即位于CommitLog的asyncPutMessage異步投遞消息的方法:

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
       //......

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
           
            //如果大于0則說明是延遲消息
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //設(shè)置topic設(shè)置為SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //基于等級獲取延遲消息隊列
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

               //......
                //基于上述設(shè)置topic和隊列信息
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

         //......
            //基于零拷貝的方式添加
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            //......
    }

MQ啟動的時候底層的消息調(diào)度服務(wù)會基于延遲消息的等級初始化幾個任務(wù),這些任務(wù)會基于定時的間隔檢查是否有到期的消息到來,如果到期則將其投遞到真正topic的隊列中供消費者消費:

基于此邏輯我們也給出ScheduleMessageService的start方法查看調(diào)度器的初始化邏輯,可以看到初始化階段,它會遍歷所有延遲級別并為其初始化一個定時任務(wù):

public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            //遍歷所有延遲級別
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                //設(shè)置timer定時器
                if (timeDelay != null) {
                    //投遞給定時器對應等級的定時任務(wù)
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
          //......
    }

查看DeliverDelayedMessageTimerTask的核心邏輯即run方法,也就是我們所說的定時檢查是否有到期消息,若存在則將其存入原本的topic上,消費者就可以消費了:

@Override
        public void run() {
            try {
                if (isStarted()) {
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
              //......
            }
        }


public void executeOnTimeup() {

            //基于topic和隊列id獲取延遲隊列
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                //根據(jù)偏移量獲取有效消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                      //......
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                              //......

                            long now = System.currentTimeMillis();
                            //計算可消費時間
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;
                            //如果小于0說明可消費
                            if (countdown <= 0) {
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        //清除延遲級別恢復到真正的topic和隊列id
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                      //......
                                        //放到消息隊列上
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                         //......
                                        } else {
                                           //......
                                    } catch (Exception e) {
                                       //......
                               
                            } else {
                                //......
                            }
                        } // end of for

                         //......
        }

什么是死信隊列

通俗來說一個消息消費失敗并重試達到最大次數(shù)后,MQ就會將其放到死信隊列中。超過三天該消息就會被銷毀。 需要補充的時死信隊列是針對一個group id為單位創(chuàng)建的隊列,如果一個gourp中都沒有死信的話,那么MQ就不會為這個組創(chuàng)建死信隊列。

Broker是進行消息持久化的

要想了解Broker如何保存數(shù)據(jù),我們必須了解RocketMQ三大文件:

首先是commitlog,producer發(fā)送的消息最終都會通過刷盤機制存到commitlog文件夾下。commitlog下一個文件名為00000000000000000000一旦寫滿,就會再創(chuàng)建一個文件寫,一般來說第二個文件名為00000000001073741824,名稱即是第一個文件的字節(jié)數(shù)。文件大小一般是1G:

然后是consumequeue文件夾,這個文件夾下記錄的都是commitlog中每個topic下的隊列信息物理偏移量、消息大小、hashCode值,如下圖,consumequeue文件夾下會為每個topic創(chuàng)建一個文件夾:

打開任意一個文件夾就會看到這樣一個名為00000000000000000000的文件:

而這個文件內(nèi)部最多維護30w個條目,注意文件中每個條目大約20字節(jié),8字節(jié)代表當前消息在commitLog中的偏移量,4字節(jié)存放消息大小,8字節(jié)存放tag和hashCode的值。

最后就算index,維護消息的索引,基于HashMap結(jié)構(gòu),這個文件使得我們可以通過key或者時間區(qū)間查詢消息:

文件名基本用時間戳生成的,大小一般為400M,差不多維護2000w個索引:

簡單小結(jié)一下RocketMQ持久化的物理文件:MQ會為每個broker維護一個commitlog,一旦文件存放到commitlog,消息就不會丟失。當無法拉取消息時,broker允許producer在30s內(nèi)發(fā)送一個消息,然后直接給消費者消費。

后兩個索引文件的維護是基于一個線程ReputMessageService進行異步維護consumeQueue(邏輯消費隊列)和IndexFile(索引文件)數(shù)據(jù):

RocketMQ如何進行文件讀寫的呢?

對于讀寫IO處理有以下兩種:

  • pageCache:在RocketMQ中,ConsumeQueue存儲數(shù)據(jù)較少,并且是順序讀取,在pageCache預讀的機制下讀取速率是非??陀^的(即使有大量的消息堆積)。操作系統(tǒng)會將一部分內(nèi)存用作pageCache,當數(shù)據(jù)寫入磁盤會先經(jīng)過pageCache然后通過內(nèi)核線程pdflush寫入物理磁盤。 針對ConsumeQueue下關(guān)于消息索引的數(shù)據(jù)查詢時,會先去pageCache查詢是否有數(shù)據(jù),若有則直接返回。若沒有則去ConsumeQueue文件中讀取需要的數(shù)據(jù)以及這個數(shù)據(jù)附近的數(shù)據(jù)一起加載到pageCache中,這樣后續(xù)的讀取就是走緩存,效率自然上去了,這種磁盤預讀目標數(shù)據(jù)的附近數(shù)據(jù)就是我們常說的局部性原理。而commitLog隨機性比較強特定情況下讀寫性能會相對差一些,所以在操作系統(tǒng)層面IO讀寫調(diào)度算法可以改為deadline并選用SSD盤以保證操作系統(tǒng)在指定時間完成數(shù)據(jù)讀寫保證性能。
  • 零拷貝技術(shù):這是MQ基于NIO的FileChannel模型的一種直接將物理文件映射到用戶態(tài)內(nèi)存地址的一種技術(shù),通過MappedByteBuffer,它的工作機制是直接建立內(nèi)存映射,文件數(shù)據(jù)并沒有經(jīng)過JVM和操作系統(tǒng)直接復制的過程,相當于直接操作內(nèi)存,所以效率就非常高??梢詤⒖? 能不能給我講講零拷貝:https://mp.weixin.qq.com/s/zS2n2a4h3YQifBYKFgcUCA

消息刷盤如何實現(xiàn)呢?

兩種方式分別是同步刷盤和異步刷盤

  • 同步刷盤: producer發(fā)送的消息經(jīng)過broker后必須寫入到物理磁盤commitLog后才會返回成功。
  • 異步刷盤:producer發(fā)送的消息到達broker之后,直接返回成功,刷盤的邏輯交給一個異步線程實現(xiàn)。

而上面說的刷盤都是通過MappedByteBuffer.force() 這個方法完成的,需要補充異步刷盤是通過一個異步線程FlushCommitLogService實現(xiàn)的,其底層通過MappedFileQueue針對內(nèi)存中的隊列消息調(diào)用flush進行刷盤從而完成消息寫入:

public boolean flush(final int flushLeastPages) {
        boolean result = true;
        //拉取文件處理偏移量信息
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            //基于mmap零拷貝技術(shù)進行刷盤
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            //如果刷盤后的進度和預期一樣說明刷盤成功
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                //維護處理時間
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

RocketMQ負載均衡

MQ中負載均衡的主要是體現(xiàn)在生產(chǎn)者端和消費者端,Producer負載均衡算法在上述中有序消費中的源碼已經(jīng)說明,這里就不多做贅述,本質(zhì)上就是通過底層的selector進行輪詢投遞:

Message<Order> message = MessageBuilder.withPayload(order).build();
        rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());

再來consumer負載均衡算法,mq客戶端啟動時會開啟一個負載均衡服務(wù)執(zhí)行負載均衡隊列輪詢邏輯,通過負載均衡算法得出每個消費者應該處理的隊列信息后生產(chǎn)拉取消息的請求,交由有MQ客戶端去拉取消息:

默認情況下,負載均衡算法選定隊列后拉取消息進行消費,默認情況下它會根據(jù)隊列數(shù)和消費者數(shù)決定如何進行負載分擔,按照平均算法:

  • 如果消費者數(shù)大于隊列數(shù),則將隊列分配給有限的幾個消費者。
  • 如果消費者數(shù)小于隊列數(shù),默認情況下會按照隊列數(shù)/消費者數(shù)取下限+1進行分配,例如隊列為4,消費者為3,那么每個消費者就會拿到2個隊列,其中第三個消費者則沒有處理任何數(shù)據(jù)。

對應的我們給出MQ客戶端初始化的代碼RebalanceService的run方法,可以看到它會調(diào)用mqClientFactory執(zhí)行負載均衡方法doRebalance:

@Override
    public void run() {
     //.......

        while (!this.isStopped()) {
           //.......
            //客戶端執(zhí)行負載均衡
            this.mqClientFactory.doRebalance();
        }

       //.......
    }

步入其內(nèi)部邏輯會走到RebalanceImpl的doRebalance,它遍歷每個topic進行負載均衡運算:

public void doRebalance(final boolean isOrder) {
       //......
        if (subTable != null) {
            //遍歷每個topic
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    //計算該topic中當前消費者要處理的隊列
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                   //......
                }
            }
        }

       //......
    }

最終我們來到了核心邏輯rebalanceByTopic方法,可以看到它會基于我們查到的topic的隊列和消費者通過策略模式找到對應的消息分配策略AllocateMessageQueueStrategy從而算得當前消費者需要處理的隊列,然后在基于這份結(jié)果調(diào)用updateProcessQueueTableInRebalance生成pullRequest告知客戶端為該消費者拉取消息:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
               //......
            }
            case CLUSTERING: {
                //根據(jù)主題獲取消息隊列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                //......

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    //// 排序后才能保證消費者負載策略相對穩(wěn)定
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        //按負載策略進行分配,返回當前消費者實際訂閱的messageQueue集合
                        allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                        return;
                    }

                  //......

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    //......
                break;
            }
            default:
                break;
        }
    }

對應的我們也給出負載均衡算法AllocateMessageQueueAveragely的源碼,大體算法和筆者上述說明的基本一致,讀者可以參考上圖講解了解一下:

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
       //......
        //獲取消費者id對應的索引
        int index = cidAll.indexOf(currentCID);
        //基于隊列總數(shù)和客戶端總數(shù)進行取模
        int mod = mqAll.size() % cidAll.size();
        /**
         *計算每個消費者的可消費的平均值:
         * 1. 如果消費者多于隊列就取1
         * 2. 如果消費者少于隊列就按照取模結(jié)果來計算
         */
        
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        //基于當前客戶端的索引定位其處理的隊列位置
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        //獲取消費者的隊列消費范圍
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        //遍歷隊列存入結(jié)果集
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

完成后基于這份結(jié)果生成pullRequest存入pullRequestQueue中:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                       final boolean isOrder) {
       //......

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                 //......
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                     //......
                    } else {
                    //生成pullRequest
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
  //存入pullRequestQueue中
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }

最后消費者的PullMessageService這個線程就會從隊列中取出該請求向MQ發(fā)起消息拉取請求:

@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                //獲取一個拉消息的請求pullRequest 
                PullRequest pullRequest = this.pullRequestQueue.take();
                //拉消息
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

RocketMQ消息長輪詢

消費者獲取消息大體有兩種方案:

  • 消息隊列主動push:由消息隊列主動去推送消息給消費者,高并發(fā)場景下,對于服務(wù)端性能開銷略大。
  • 消費者定期pull消息:由客戶端主動去拉取消息,但是需要客戶端設(shè)置好拉取的間隔,太頻繁對于消息隊列開銷還是很大,間隔太長消息實時性又無法保證。

對此RocketMQ采用長輪詢機制保證了實時性同時又降低了服務(wù)端的開銷,總的來說,它的整體思路為:

  • 消費者發(fā)起一個消費請求,內(nèi)容傳入topic、queueId和客戶端socket、pullFromThisOffset等數(shù)據(jù)。
  • 服務(wù)端收到請求后查看該隊列是否有數(shù)據(jù),若沒有則掛起。
  • 在一個最大超時時間內(nèi)定時輪詢,如果有則將結(jié)果返回給客戶端。
  • 反之處理超時,也直接告知客戶端超時了也沒有消息。

對應的我們再次給出消費者拉取消費的源碼PullMessageService的run方法,可以看到其內(nèi)部不斷從阻塞隊列中拉取請求并發(fā)起消息拉?。?/p>

@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                //獲取一個拉消息的請求
                PullRequest pullRequest = this.pullRequestQueue.take();
                //拉消息
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

服務(wù)端的PullMessageProcessor的processRequest就是處理請求的入口,可以看到該方法如果發(fā)現(xiàn)broker沒有看到新的消息就會調(diào)用suspendPullRequest將客戶端連接hold?。?/p>

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
          //.......

            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
     //.......
                    } else {
                        //.......
                    break;
                case ResponseCode.PULL_NOT_FOUND://沒拉取到消息

                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }
                        //定位請求的topic以及offset和隊列id
                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        //基于上述數(shù)據(jù)生成pullRequest并調(diào)用suspendPullRequest將其hold住
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }

                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    break;
                case ResponseCode.PULL_OFFSET_MOVED:
                    //.......
                    break;
                default:
                    assert false;
            }
        } else {
           //.......
        }

         //.......
        return response;
    }

然后PullRequestHoldService就會基于上述一部掛起的數(shù)據(jù)定時檢查是否有新消息到來,直到超期:

@Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                //等待
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                //基于超時時限內(nèi)定時查看topic中的隊列是否有新消息,如果有或者超時則返回
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                //......
            }
        }

       //......
    }

這里我們也給出checkHoldRequest的調(diào)用可以看到,如果查到隊列offset大于用戶傳的說明就有新消息則返回,超時則直接返回:

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        //......
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
              //......

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    //拉取到新消息就返回
                    if (newestOffset > request.getPullFromThisOffset()) {
                        //......

                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }
                    //超時也返回
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    replayList.add(request);
                }

                //......
            }
        }
    }


責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2011-03-29 13:23:54

CACTI

2009-10-15 16:55:05

綜合布線系統(tǒng)測試

2009-09-23 17:52:16

Hibernate概念Hibernate常見

2009-10-26 11:11:22

接入網(wǎng)常見問題

2010-05-13 10:22:45

綜合布線系統(tǒng)測試

2009-07-07 10:13:57

Servlet學習

2010-07-01 17:18:02

UML包圖

2013-11-14 15:47:29

SDN問題答疑

2011-04-01 13:55:24

Java

2011-05-06 15:39:55

硒鼓

2010-07-21 09:10:02

Perl常見問題

2010-08-06 09:30:03

思科IOS升級

2018-03-08 14:00:02

2010-05-19 11:35:13

SVN

2010-03-25 09:08:43

CentOS配置

2009-11-02 17:25:04

ADSL常見問題

2011-02-22 14:00:16

vsftpd

2010-05-13 13:27:23

2013-09-27 10:08:51

VMware虛擬化

2015-12-21 11:45:27

C語言常見問題錯誤
點贊
收藏

51CTO技術(shù)棧公眾號