自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

深入理解 RocketMQ 廣播消費(fèi)

開發(fā) 前端
同一 Topic 下的一條消息只會(huì)被同一消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)。也就是說,消息被負(fù)載均衡到了同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者實(shí)例上。

這篇文章我們聊聊廣播消費(fèi),因?yàn)閺V播消費(fèi)在某些場景下真的有奇效。筆者會(huì)從基礎(chǔ)概念、實(shí)現(xiàn)機(jī)制、實(shí)戰(zhàn)案例三個(gè)方面一一展開,希望能幫助到大家。

1 基礎(chǔ)概念

RocketMQ 支持兩種消息模式:集群消費(fèi)( Clustering )和廣播消費(fèi)( Broadcasting )。

集群消費(fèi):

同一 Topic 下的一條消息只會(huì)被同一消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)。也就是說,消息被負(fù)載均衡到了同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者實(shí)例上。

圖片圖片

廣播消費(fèi):

當(dāng)使用廣播消費(fèi)模式時(shí),每條消息推送給集群內(nèi)所有的消費(fèi)者,保證消息至少被每個(gè)消費(fèi)者消費(fèi)一次。

圖片圖片

2 源碼解析

首先下圖展示了廣播消費(fèi)的代碼示例。

public class PushConsumer {
    public static final String CONSUMER_GROUP = "myconsumerGroup";
    public static final String DEFAULT_NAMESRVADDR = "localhost:9876";
    public static final String TOPIC = "mytest";
    public static final String SUB_EXPRESSION = "TagA || TagC || TagD";

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 定義 DefaultPushConsumer 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        // 定義名字服務(wù)地址
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        // 定義消費(fèi)讀取位點(diǎn)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 定義消費(fèi)模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 訂閱主題信息
        consumer.subscribe(TOPIC, SUB_EXPRESSION);
        // 訂閱消息監(jiān)聽器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                for (MessageExt messageExt : msgs) {
                    System.out.println(new String(messageExt.getBody()));
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

和集群消費(fèi)不同的點(diǎn)在于下面的代碼:

consumer.setMessageModel(MessageModel.BROADCASTING);

接下來,我們從源碼角度來看看廣播消費(fèi)和集群消費(fèi)有哪些差異點(diǎn) ?

首先進(jìn)入 DefaultMQPushConsumerImpl 類的 start 方法 , 分析啟動(dòng)流程中他們兩者的差異點(diǎn):

圖片圖片

▍ 差異點(diǎn)1:拷貝訂閱關(guān)系

private void copySubscription() throws MQClientException {
    try {
       Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
       if (sub != null) {
          for (final Map.Entry<String, String> entry : sub.entrySet()) {
              final String topic = entry.getKey();
              final String subString = entry.getValue();
              SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
       if (null == this.messageListenerInner) {
          this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
       }
       // 注意下面的代碼 , 集群模式下自動(dòng)訂閱重試主題 
       switch (this.defaultMQPushConsumer.getMessageModel()) {
           case BROADCASTING:
               break;
           case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

在集群模式下,會(huì)自動(dòng)訂閱重試隊(duì)列,而廣播模式下,并沒有這段代碼。也就是說廣播模式下,不支持消息重試。

▍ 差異點(diǎn)2:本地進(jìn)度存儲(chǔ)

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

我們可以看到消費(fèi)進(jìn)度存儲(chǔ)的對(duì)象是:LocalFileOffsetStore , 進(jìn)度文件存儲(chǔ)在如下的主目錄 /{用戶主目錄}/.rocketmq_offsets。

public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
    "rocketmq.client.localOffsetStoreDir",
    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

進(jìn)度文件是 /mqClientId/{consumerGroupName}/offsets.json 。

this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";

筆者創(chuàng)建了一個(gè)主題 mytest , 包含4個(gè)隊(duì)列,進(jìn)度文件內(nèi)容如下:

圖片圖片

消費(fèi)者啟動(dòng)后,我們可以將整個(gè)流程簡化如下圖,并繼續(xù)整理差異點(diǎn):

圖片圖片

▍ 差異點(diǎn)3:負(fù)載均衡消費(fèi)該主題的所有 MessageQueue

進(jìn)入負(fù)載均衡抽象類 RebalanceImpl 的rebalanceByTopic方法 。

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                // 省略代碼
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            // 省略代碼
            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                     allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        // 省略日志打印代碼
                        return;
                    }
                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                //省略代碼
            }
            break;
        }
        default:
            break;
    }
}

從上面代碼我們可以看到消息模式為廣播消費(fèi)模式時(shí),消費(fèi)者會(huì)消費(fèi)該主題下所有的隊(duì)列,這一點(diǎn)也可以從本地的進(jìn)度文件 offsets.json 得到印證。

▍ 差異點(diǎn)4:不支持順序消息

我們知道消費(fèi)消息順序服務(wù)會(huì)向 Borker 申請鎖 。消費(fèi)者根據(jù)分配的隊(duì)列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會(huì)拉取消息,如果失敗,則定時(shí)任務(wù)每隔 20 秒會(huì)重新嘗試。

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
            }
        }
    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}

但是從上面的代碼,我們發(fā)現(xiàn)只有在集群消費(fèi)的時(shí)候才會(huì)定時(shí)申請鎖,這樣就會(huì)導(dǎo)致廣播消費(fèi)時(shí),無法為負(fù)載均衡的隊(duì)列申請鎖,導(dǎo)致拉取消息服務(wù)一直無法獲取消息數(shù)據(jù)。

筆者修改消費(fèi)例子,在消息模式為廣播模式的場景下,將消費(fèi)模式從并發(fā)消費(fèi)修改為順序消費(fèi)。

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    try {
        for (MessageExt messageExt : msgs) {
            System.out.println(new String(messageExt.getBody()));
        }
    }catch (Exception e) {
        e.printStackTrace();
    }
    return ConsumeOrderlyStatus.SUCCESS;
});

圖片圖片

通過 IDEA DEBUG 圖,筆者觀察到因?yàn)樨?fù)載均衡后的隊(duì)列無法獲取到鎖,所以拉取消息的線程無法發(fā)起拉取消息請求到 Broker , 也就不會(huì)走到消費(fèi)消息的流程。

因此,廣播消費(fèi)模式并不支持順序消息。

▍ 差異點(diǎn)5:并發(fā)消費(fèi)消費(fèi)失敗時(shí),沒有重試

進(jìn)入并發(fā)消息消費(fèi)類ConsumeMessageConcurrentlyService 的處理消費(fèi)結(jié)果方法 processConsumeResult。

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}

消費(fèi)消息失敗后,集群消費(fèi)時(shí),消費(fèi)者實(shí)例會(huì)通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發(fā)回到 Broker 端。

但在廣播模式下,僅僅是打印了消息信息。因此,廣播模式下,并沒有消息重試。

3 實(shí)戰(zhàn)案例

廣播消費(fèi)主要用于兩種場景:消息推送和緩存同步。

3.1 消息推送

筆者第一次接觸廣播消費(fèi)的業(yè)務(wù)場景是神州專車司機(jī)端的消息推送。

用戶下單之后,訂單系統(tǒng)生成專車訂單,派單系統(tǒng)會(huì)根據(jù)相關(guān)算法將訂單派給某司機(jī),司機(jī)端就會(huì)收到派單推送。

圖片圖片

推送服務(wù)是一個(gè) TCP 服務(wù)(自定義協(xié)議),同時(shí)也是一個(gè)消費(fèi)者服務(wù),消息模式是廣播消費(fèi)。

司機(jī)打開司機(jī)端 APP 后,APP 會(huì)通過負(fù)載均衡和推送服務(wù)創(chuàng)建長連接,推送服務(wù)會(huì)保存 TCP 連接引用 (比如司機(jī)編號(hào)和 TCP channel 的引用)。

派單服務(wù)是生產(chǎn)者,將派單數(shù)據(jù)發(fā)送到 MetaQ ,  每個(gè)推送服務(wù)都會(huì)消費(fèi)到該消息,推送服務(wù)判斷本地內(nèi)存中是否存在該司機(jī)的 TCP channel , 若存在,則通過 TCP 連接將數(shù)據(jù)推送給司機(jī)端。

肯定有同學(xué)會(huì)問:假如網(wǎng)絡(luò)原因,推送失敗怎么處理 ?有兩個(gè)要點(diǎn):

  1. 司機(jī)端 APP 定時(shí)主動(dòng)拉取派單信息;
  2. 當(dāng)推送服務(wù)沒有收到司機(jī)端的 ACK 時(shí) ,也會(huì)一定時(shí)限內(nèi)再次推送,達(dá)到閾值后,不再推送。

3.2 緩存同步

高并發(fā)場景下,很多應(yīng)用使用本地緩存,提升系統(tǒng)性能 。

本地緩存可以是 HashMap 、ConcurrentHashMap ,也可以是緩存框架 Guava Cache 或者 Caffeine cache 。

圖片圖片

如上圖,應(yīng)用A啟動(dòng)后,作為一個(gè) RocketMQ 消費(fèi)者,消息模式設(shè)置為廣播消費(fèi)。為了提升接口性能,每個(gè)應(yīng)用節(jié)點(diǎn)都會(huì)將字典表加載到本地緩存里。

當(dāng)字典表數(shù)據(jù)變更時(shí),可以通過業(yè)務(wù)系統(tǒng)發(fā)送一條消息到 RocketMQ ,每個(gè)應(yīng)用節(jié)點(diǎn)都會(huì)消費(fèi)消息,刷新本地緩存。

4 總結(jié)

集群消費(fèi)和廣播消費(fèi)模式下,各功能的支持情況如下:

功能

集群消費(fèi)

廣播消費(fèi)

順序消息

支持

不支持

重置消費(fèi)位點(diǎn)

支持

不支持

消息重試

支持

不支持

消費(fèi)進(jìn)度

服務(wù)端維護(hù)

客戶端維護(hù)

廣播消費(fèi)主要用于兩種場景:消息推送和緩存同步。

參考資料 :

http://www.scjtxx.cn/article/714277.html

https://ost.51cto.com/posts/21100

責(zé)任編輯:武曉燕 來源: 勇哥java實(shí)戰(zhàn)分享
相關(guān)推薦

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)

2010-06-01 15:25:27

JavaCLASSPATH

2016-12-08 15:36:59

HashMap數(shù)據(jù)結(jié)構(gòu)hash函數(shù)

2020-07-21 08:26:08

SpringSecurity過濾器

2013-09-22 14:57:19

AtWood

2009-09-25 09:14:35

Hibernate日志

2023-10-19 11:12:15

Netty代碼

2021-02-17 11:25:33

前端JavaScriptthis

2020-09-23 10:00:26

Redis數(shù)據(jù)庫命令

2017-01-10 08:48:21

2017-08-15 13:05:58

Serverless架構(gòu)開發(fā)運(yùn)維

2024-02-21 21:14:20

編程語言開發(fā)Golang

2019-06-25 10:32:19

UDP編程通信

2022-11-04 09:43:05

Java線程

2015-11-04 09:57:18

JavaScript原型

2021-05-13 21:27:24

ThreadLocal多線程多線程并發(fā)安全

2013-06-14 09:27:51

Express.jsJavaScript

2021-04-20 23:25:16

執(zhí)行函數(shù)變量

2023-02-10 08:11:43

Linux系統(tǒng)調(diào)用

2017-01-13 22:42:15

iosswift
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)