自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ 和 RabbitMQ 的比較以及 RocketMQ 的使用

開發(fā) 架構(gòu)
消息隊列在項目中會經(jīng)常用到,目前我們使用的是 RabbitMQ,但在 Java 技術(shù)棧下,RocketMQ 使用的比較多。下面比較下 RabbitMQ 和 RocketMQ。


RabbitMQ 和 RocketMQ 對比

1.設(shè)計理念和架構(gòu)

RabbitMQ:

基于 AMQP(Advanced Message Queuing Protocol)協(xié)議,使用 Erlang 語言開發(fā)。Erlang 的天生高并發(fā)和容錯性使得 RabbitMQ 在穩(wěn)定性方面表現(xiàn)出色。RabbitMQ 的核心概念是 Exchange(交換機)和 Queue(隊列),消息通過 Exchange 路由到 Queue,再由消費者消費。這種模型非常靈活,支持多種消息路由模式。

RocketMQ:

源于阿里巴巴,后捐獻給 Apache 基金會,所以現(xiàn)在的官網(wǎng)是:https://rocketmq.apache.org/ 。使用 Java 語言開發(fā),更貼近 Java 技術(shù)棧。RocketMQ 的核心概念是 Topic(主題),消息發(fā)送到 Topic,消費者訂閱 Topic 進行消費。RocketMQ 的設(shè)計目標是高吞吐量、低延遲和高可靠性,適合大規(guī)模分布式系統(tǒng)。

RocketMQ 的設(shè)計理念更偏向于解決互聯(lián)網(wǎng)場景下的具體問題,如海量消息處理、消息順序性等。

2.性能

吞吐量:RocketMQ 在吞吐量方面通常優(yōu)于 RabbitMQ,尤其是在高并發(fā)場景下。RocketMQ 的設(shè)計更偏向于高吞吐的消息傳遞,而 RabbitMQ 更注重消息的可靠性和靈活性。

延遲:兩者在延遲方面都表現(xiàn)不錯,但在極端高負載情況下,RocketMQ 的延遲可能更低一些。

不過在 ToB 的一些業(yè)務場景,RabbitMQ 是可以勝任的。

3.特性

消息路由:RabbitMQ 支持多種 Exchange 類型(Direct、Topic、Fanout、Headers),提供更豐富的消息路由策略。RocketMQ 主要使用 Topic 進行消息路由,相對簡單。

消息過濾:RocketMQ 支持基于 Tag 和 SQL 的消息過濾,方便消費者按需訂閱消息。RabbitMQ 的消息過濾相對較弱。

事務消息:RocketMQ 提供了分布式事務消息的支持,可以保證消息生產(chǎn)和本地事務的原子性。RabbitMQ 沒有直接提供事務消息的支持,需要通過其他方式實現(xiàn)。

延遲消息:RocketMQ 支持延遲消息,可以實現(xiàn)定時任務等功能。RabbitMQ 通過插件可以實現(xiàn)類似功能。

監(jiān)控和管理:RocketMQ 和 RabbitMQ 都提供了豐富的監(jiān)控指標和管理工具,相比之下我更喜歡 RocketMQ 的管理工具。

4.創(chuàng)新點

RabbitMQ:

  • 插件系統(tǒng)設(shè)計靈活,易于擴展
  • 虛擬主機(vhost)概念,實現(xiàn)多租戶隔離
  • 內(nèi)存和磁盤節(jié)點的混合部署方案

RocketMQ:

  • 基于文件的消息存儲系統(tǒng),避免了緩存未刷盤導致的消息丟失
  • Pull 模式和長輪詢機制的結(jié)合,平衡了實時性和性能
  • 消息過濾支持在 Broker 端完成,減少網(wǎng)絡傳輸開銷

5.Exchange 和 Topic 的區(qū)別

RabbitMQ的 Exchange 和 RocketMQ 的 Topic 在消息路由機制上有以下主要區(qū)別:

概念和角色

RabbitMQ Exchange 是一個路由組件,負責接收生產(chǎn)者發(fā)送的消息并將其路由到一個或多個隊列,作為消息的"交換機",它不存儲消息,只負責消息的路由轉(zhuǎn)發(fā),需要通過 binding key 與 Queue 建立綁定關(guān)系。

RocketMQ Topic 是消息的邏輯分類,直接作為消息的存儲和投遞單元,包含多個消息隊列(MessageQueue),用于存儲消息,消費者直接訂閱 Topic 即可接收消息。

RabbitMQ 的 key 綁定和 Exchange、隊列的關(guān)系,一開始不太容易理解,相比之下 RocketMQ 的 Topic 和隊列關(guān)系更清晰。

路由方式

RabbitMQ Exchange 支持四種路由策略,路由更加靈活,可以實現(xiàn)復雜的消息分發(fā)模式

  • Direct:根據(jù)routing key精確匹配
  • Topic:根據(jù)routing key的模式匹配
  • Fanout:廣播到所有綁定的隊列
  • Headers:根據(jù)消息屬性匹配

RocketMQ Topic 采用發(fā)布/訂閱模式,更加簡單直接,通過Tag機制實現(xiàn)消息過濾。支持消息隊列的負載均衡。

消息存儲

RabbitMQ Exchange 不存儲消息,消息存儲在 Queue 中,消息一旦被路由到 Queue 就與 Exchange 無關(guān)。

RocketMQ Topic 直接存儲消息,每個 Topic 包含多個消息隊列。消息存儲在 CommitLog 中,通過 ConsumeQueue 建立索引。

Docker-compose 部署 RocketMQ

同樣是使用容器進行部署,RabbitMQ 一個容器搞定,RocketMQ 需要兩個容器(NameServer 和 Broker),如果需要 web 管理工具,還需要再單獨部署一個容器。

當進行集群模式部署時,RocketMQ 的下載包中有各種集群模式的示例配置文件,這對新手非常友好。

下面是部署 RocketMQ 的 docker-comopose 文件的內(nèi)容:

version: '3'

# 定義自定義網(wǎng)絡
networks:
  rmq_network:
    driver: bridge
    ipam:
      config:
        - subnet: 192.168.10.0/24

services:
  # RocketMQ Name Server
  namesrv:
    image: apache/rocketmq:5.1.4
    container_name: rmqnamesrv
    networks:
      rmq_network:
        ipv4_address: 192.168.10.2
    ports:
      - 9876:9876
    volumes:
      - ./data/namesrv/logs:/home/rocketmq/logs
    command: sh mqnamesrv
    environment:
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m

  # RocketMQ Broker
  broker:
    image: apache/rocketmq:5.3.1
    container_name: rmqbroker
    networks:
      rmq_network:
        ipv4_address: 192.168.10.3
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./data/broker/logs:/home/rocketmq/logs
      - ./data/broker/store:/home/rocketmq/store
      - ./conf/broker.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    environment:
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m
    depends_on:
      - namesrv

  # RocketMQ Dashboard 
  dashboard:
    image: apacherocketmq/rocketmq-dashboard:1.0.0
    container_name: rmqdashboard
    networks:
      rmq_network:
        ipv4_address: 192.168.10.4
    ports:
      - 19080:8080
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
    depends_on:
      - namesrv

broker.conf 的內(nèi)容如下:

# broker集群名稱
brokerClusterName = DefaultCluster
# broker名稱
brokerName = broker-a
# broker編號,0表示master,大于0表示slave
brokerId = 0
# 刪除過期文件時間點,默認是凌晨4點
deleteWhen = 04
# 文件保留時間,默認48小時
fileReservedTime = 48
# broker角色,ASYNC_MASTER=異步復制Master,SYNC_MASTER=同步雙寫Master,SLAVE=slave節(jié)點
brokerRole = ASYNC_MASTER
# 刷盤方式,ASYNC_FLUSH=異步刷盤,SYNC_FLUSH=同步刷盤
flushDiskType = ASYNC_FLUSH
# nameServer地址,分號分割
namesrvAddr = namesrv:9876
# 在發(fā)送消息時,自動創(chuàng)建服務器不存在的topic,默認創(chuàng)建的隊列數(shù)
defaultTopicQueueNums = 4
# 是否允許 Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable = true
# 是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup = true
# brokerIP1 注意:本地測試使用本機的宿主機的IP
brokerIP1=192.168.1.109

代碼示例

對于消息隊列,單播、廣播、重試,這三種場景用的比較多。下面就看看這三個場景是怎么實現(xiàn)的。

創(chuàng)建生產(chǎn)者 Service 類來處理消息的發(fā)送:

@Slf4j
@Service
public class MessageProducerService {

    // RocketMQ消息主題
    public static final String TOPIC_UNICAST = "topic-unicast";
    public static final String TOPIC_BROADCAST = "topic-broadcast";
    public static final String TOPIC_RETRY = "topic-retry";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 發(fā)送單播消息(點對點)
     * 單播消息會被消費組中的某一個消費者消費
     */
    public void sendUnicastMessage(MessageEvent message) {
        rocketMQTemplate.convertAndSend(TOPIC_UNICAST, message);
        log.info("Unicast message sent: {}", message);
    }

    /**
     * 發(fā)送廣播消息
     * 廣播消息會被所有訂閱該主題的消費者消費
     */
    public void sendBroadcastMessage(MessageEvent message) {
        rocketMQTemplate.convertAndSend(TOPIC_BROADCAST, message);
        log.info("Broadcast message sent: {}", message);
    }

    /**
     * 發(fā)送需要重試的消息
     * 使用異步發(fā)送方式,并在回調(diào)中處理發(fā)送結(jié)果
     */
    public void sendRetryMessage(MessageEvent message) {
        rocketMQTemplate.asyncSend(TOPIC_RETRY, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Retry message sent successfully: {}, result: {}", message, sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("Failed to send retry message: {}, error: {}", message, throwable.getMessage());
            }
        });
    }
}

創(chuàng)建消費者 Service 類來處理消息的接收:

@Slf4j
@Service
public class MessageConsumerService {

    // RocketMQ消息主題
    public static final String TOPIC_UNICAST = "topic-unicast";
    public static final String TOPIC_BROADCAST = "topic-broadcast";
    public static final String TOPIC_RETRY = "topic-retry";
    /**
     * 單播消息消費者
     * consumeMode默認為CONCURRENTLY(并發(fā)消費)
     */
    @Service
    @RocketMQMessageListener(
            topic = TOPIC_UNICAST,
            consumerGroup = "unicast-consumer-group"
    )
    public class UnicastMessageListener implements RocketMQListener<MessageEvent> {
        @Override
        public void onMessage(MessageEvent message) {

            log.info("Received unicast message: {}", message);
        }
    }

    /**
     * 廣播消息消費者
     * messageModel設(shè)置為BROADCASTING表示廣播模式
     */
    @Service
    @RocketMQMessageListener(
            topic = TOPIC_BROADCAST,
            consumerGroup = "broadcast-consumer-group",
            messageModel = MessageModel.BROADCASTING
    )
    public class BroadcastMessageListener implements RocketMQListener<MessageEvent> {
        @Override
        public void onMessage(MessageEvent message) {
            log.info("Received broadcast message: {}", message);
        }
    }

    /**
     * 重試消息消費者
     * 配置了重試次數(shù)和重試間隔
     */
    @Service
    @RocketMQMessageListener(
            topic = TOPIC_RETRY,
            consumerGroup = "retry-consumer-group",
            maxReconsumeTimes = 3,    // 最大重試次數(shù)
            delayLevelWhenNextConsume = 2  // 重試間隔級別
    )
    public class RetryMessageListener implements RocketMQListener<MessageEvent> {
        @Override
        public void onMessage(MessageEvent message) {
            try {
                // 模擬處理失敗的情況
                if (message.getContent().contains("error")) {
                    throw new RuntimeException("Processing failed, will retry");
                }
                log.info("Received retry message: {}", message);
            } catch (Exception e) {
                log.error("Error processing message: {}, error: {}", message, e.getMessage());
                throw e; // 拋出異常觸發(fā)重試機制
            }
        }
    }
}

創(chuàng)建 MessageController 來進行測試:

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    @Autowired
    private MessageProducerService producerService;

    @PostMapping("/unicast")
    public String sendUnicastMessage(@RequestParam String content) {
        MessageEvent message = new MessageEvent()
                .setId(UUID.randomUUID().toString())
                .setContent(content)
                .setTimestamp(System.currentTimeMillis());
        producerService.sendUnicastMessage(message);
        return "Unicast message sent successfully";
    }

    @PostMapping("/broadcast")
    public String sendBroadcastMessage(@RequestParam String content) {
        MessageEvent message = new MessageEvent()
                .setId(UUID.randomUUID().toString())
                .setContent(content)
                .setTimestamp(System.currentTimeMillis());
        producerService.sendBroadcastMessage(message);
        return "Broadcast message sent successfully";
    }

    @PostMapping("/retry")
    public String sendRetryMessage(@RequestParam String content) {
        MessageEvent message = new MessageEvent()
                .setId(UUID.randomUUID().toString())
                .setContent(content)
                .setTimestamp(System.currentTimeMillis());
        producerService.sendRetryMessage(message);
        return "Retry message sent successfully";
    }
}
責任編輯:姜華 來源: 不止dotNET
相關(guān)推薦

2023-03-10 08:00:03

KafkaActiveMQ

2019-04-11 10:26:15

架構(gòu)運維技術(shù)

2021-07-09 07:15:48

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2020-02-20 16:45:39

RabbitMQKafka架構(gòu)

2020-07-27 08:13:03

RabbitMQ代碼系統(tǒng)

2022-07-27 22:48:29

消息中間件RocketMQ架構(gòu)設(shè)計

2023-10-24 07:50:18

消息中間件MQ

2021-10-03 21:41:13

RocketMQKafkaPulsar

2021-06-11 21:46:31

RocketMQ數(shù)據(jù)JSON

2022-02-23 15:08:18

開發(fā)分布式Java

2024-11-01 13:49:24

RocketMQ消息類型業(yè)務

2024-04-11 09:45:31

2024-04-11 09:45:31

.NETRabbitMQEasyNetQ

2023-08-07 08:32:05

RocketMQ名字服務

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2022-10-08 09:33:00

平臺中間件

2021-05-08 08:33:00

Rocketmq日志數(shù)據(jù)源

2021-01-21 07:16:03

RocketMQKafka中間件

2009-07-02 09:13:25

什么是JSPServlet
點贊
收藏

51CTO技術(shù)棧公眾號