自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ Tag在實(shí)際業(yè)務(wù)中有什么作用?

開發(fā)
本文分析了 RocketMQ 的 Tag 功能,它在消息過濾和分類處理方面提供了極大的便利,適用于各種需要高效、低延遲消息傳遞的場景。

Tag 是 RocketMQ 提供的一種消息過濾機(jī)制,允許生產(chǎn)者在發(fā)送消息時(shí)指定一個(gè)或多個(gè)標(biāo)簽,消費(fèi)者則可以根據(jù)這些標(biāo)簽來選擇性地消費(fèi)消息。這篇文章,我們將詳細(xì)介紹 RocketMQ 中 Tag 的原理、源碼分析以及示例。

Tag 的原理

在 RocketMQ 中,Tag 主要用于消息過濾。每個(gè)消息可以攜帶一個(gè) Tag,消費(fèi)者可以根據(jù) Tag 來訂閱特定的消息,從而實(shí)現(xiàn)消息的過濾和分類處理。

(1) 消息發(fā)送階段

生產(chǎn)者在發(fā)送消息時(shí),可以指定一個(gè) Tag。這個(gè) Tag 會被附加到消息的元數(shù)據(jù)中,并存儲在 RocketMQ 的消息存儲系統(tǒng)中。

(2) 消息存儲階段

消息被存儲在 RocketMQ 的 Broker 中,消息的元數(shù)據(jù)(包括 Tag)也會被存儲。

(3) 消息消費(fèi)階段

消費(fèi)者在訂閱消息時(shí),可以指定要消費(fèi)的 Tag。Broker 會根據(jù)消費(fèi)者訂閱的 Tag,將符合條件的消息投遞給消費(fèi)者。

(4) 源碼分析

為了更好的理解 Tag的原理,我們通過 RocketMQ 中Tag 相關(guān)的幾個(gè)主要代碼片段進(jìn)行演示。

生產(chǎn)者發(fā)送消息時(shí)的代碼:

// 創(chuàng)建消息實(shí)例,并指定Topic和Tag
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

// 發(fā)送消息
SendResult sendResult = producer.send(msg);

在 Message 類中,Tag 是通過構(gòu)造函數(shù)傳遞的,并存儲在 Message 對象的 tags 字段中。

消費(fèi)者訂閱消息時(shí)的代碼:

// 創(chuàng)建消費(fèi)者實(shí)例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");

// 訂閱Topic,并指定Tag
consumer.subscribe("TopicTest", "TagA");

// 注冊消息監(jiān)聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 啟動消費(fèi)者
consumer.start();

在 DefaultMQPushConsumer 類中,通過 subscribe 方法指定要訂閱的 Topic 和 Tag,RocketMQ 內(nèi)部會根據(jù)訂閱的 Tag 進(jìn)行消息過濾。

示例

下面是一個(gè)完整的示例,演示如何使用 RocketMQ 的 Tag 功能。

(1) 生產(chǎn)者代碼

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建生產(chǎn)者實(shí)例
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.setNamesrvAddr("localhost:9876");

        // 啟動生產(chǎn)者
        producer.start();

        // 發(fā)送消息
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

(2) 消費(fèi)者代碼

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建消費(fèi)者實(shí)例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");

        // 訂閱Topic,并指定Tag
        consumer.subscribe("TopicTest", "TagA");

        // 注冊消息監(jiān)聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 啟動消費(fèi)者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

盡管 RocketMQ 的 Tag 功能在消息過濾和分類處理方面提供了極大的便利,但也有其優(yōu)缺點(diǎn)。下面詳細(xì)分析一下:

優(yōu)點(diǎn)

  • 簡單易用:Tag 的使用非常簡單,生產(chǎn)者只需在發(fā)送消息時(shí)指定 Tag,消費(fèi)者在訂閱消息時(shí)指定相應(yīng)的 Tag 即可。
  • 高效過濾:通過 Tag 進(jìn)行消息過濾,減少了消費(fèi)者處理不相關(guān)消息的開銷,從而提高了系統(tǒng)的性能。
  • 靈活性高:支持一個(gè) Topic 下多個(gè) Tag,使得消息的分類和過濾更加靈活。
  • 低延遲:Tag 過濾是在 Broker 端進(jìn)行的,不會顯著增加消息傳遞的延遲。
  • 減少網(wǎng)絡(luò)帶寬:消費(fèi)者只會接收到自己感興趣的消息,減少了不必要的網(wǎng)絡(luò)傳輸,從而節(jié)省了帶寬。

缺點(diǎn)

  • 單一維度過濾:Tag 只能提供單一維度的消息過濾,無法進(jìn)行更復(fù)雜的多維度過濾。如果需要多維度過濾,需要結(jié)合其他機(jī)制(如消息屬性)來實(shí)現(xiàn)。
  • 有限的靈活性:Tag 的數(shù)量和種類在設(shè)計(jì)階段需要規(guī)劃好,靈活性有限。如果后期需要添加新的 Tag,可能需要重新設(shè)計(jì)和部署。
  • 不支持復(fù)雜邏輯:Tag 過濾支持的邏輯較為簡單,只能進(jìn)行基于字符串匹配的過濾,無法支持復(fù)雜的過濾邏輯。
  • 管理復(fù)雜性:隨著系統(tǒng)規(guī)模的增大,Tag 的管理和維護(hù)可能變得復(fù)雜,尤其是在多個(gè)應(yīng)用共享同一個(gè) Topic 的情況下。
  • 潛在的性能瓶頸:雖然 Tag 過濾在大多數(shù)場景下性能良好,但在極端情況下(如大量不同 Tag 的消息和高并發(fā)消費(fèi)),可能會帶來性能瓶頸。

適用場景

  • 日志和監(jiān)控:不同類型的日志和監(jiān)控?cái)?shù)據(jù)可以通過 Tag 進(jìn)行分類和過濾。
  • 電商系統(tǒng):不同類型的訂單、商品信息等可以通過 Tag 進(jìn)行分類和過濾,消費(fèi)者只處理自己感興趣的消息。
  • 金融系統(tǒng):不同類型的交易、通知等可以通過 Tag 進(jìn)行分類和過濾,提高系統(tǒng)的處理效率。
  • 社交平臺:不同類型的消息(如評論、點(diǎn)贊、私信等)可以通過 Tag 進(jìn)行分類和過濾,提供更精準(zhǔn)的消息推送。

總結(jié)

本文分析了 RocketMQ 的 Tag 功能,它在消息過濾和分類處理方面提供了極大的便利,適用于各種需要高效、低延遲消息傳遞的場景。然而,它也有一些局限性,如單一維度過濾、管理復(fù)雜性等。

在實(shí)際應(yīng)用中,需要根據(jù)具體需求和系統(tǒng)設(shè)計(jì),合理使用 Tag 功能,結(jié)合其他機(jī)制來實(shí)現(xiàn)更復(fù)雜的消息過濾和處理。

責(zé)任編輯:趙寧寧 來源: 猿java
相關(guān)推薦

2023-06-12 07:02:53

物聯(lián)網(wǎng)數(shù)據(jù)決策

2019-04-28 17:39:06

大數(shù)據(jù)區(qū)塊鏈數(shù)據(jù)隱私安全

2018-11-06 10:51:07

Redis開發(fā)存儲系統(tǒng)

2024-11-28 08:15:44

LLM大型語言模型人工智能

2022-03-02 14:08:35

區(qū)塊鏈供應(yīng)鏈技術(shù)

2010-02-25 17:22:39

WCF服務(wù)行為

2009-12-03 18:21:15

軟路由技術(shù)

2010-01-14 10:35:34

VB.NET指針

2010-01-08 18:02:33

VB.NET事件

2010-01-15 13:30:53

VB.NET Tool

2023-06-25 11:38:31

2022-03-21 08:55:53

RocketMQ客戶端過濾機(jī)制

2010-01-07 16:16:03

VB.NET變量作用域

2010-01-19 15:21:55

VB.NET區(qū)域性

2009-11-19 15:14:43

路由器系統(tǒng)

2009-11-23 17:56:45

業(yè)務(wù)路由器

2010-01-20 18:34:46

VB.NET Syst

2025-02-27 09:30:00

MySQLLog Buffer數(shù)據(jù)庫

2023-12-30 11:01:19

C語言C++編程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號