自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

消息隊列 RocketMQ 入門指南

開發(fā)
本文我們將從RocketMQ的基本概念出發(fā),逐步講解其核心功能,并通過簡單的實踐示例,幫助你快速上手。

在當今的分布式系統(tǒng)中,消息隊列(Message Queue)作為解耦、異步通信和流量削峰的重要組件,扮演著不可或缺的角色。而RocketMQ,作為阿里巴巴開源的一款高性能、高可靠的分布式消息中間件,憑借其強大的功能和穩(wěn)定的性能,成為了眾多開發(fā)者和企業(yè)的首選。

無論你是剛剛接觸消息隊列的新手,還是希望深入了解RocketMQ的開發(fā)者,這篇文章都將為你提供一個清晰的入門指南。我們將從RocketMQ的基本概念出發(fā),逐步講解其核心功能,并通過簡單的實踐示例,幫助你快速上手。

一、詳解RocketMQ基礎(chǔ)概念

1. 為什么要用RocketMQ

相比于市場上的各種消息隊列,它有如下優(yōu)勢:

  • 性能好。
  • 穩(wěn)定可靠。
  • 中文社區(qū)活躍。

當然缺點也是有那么一些些的,兼容性確實不太行。

2. RocketMQ優(yōu)缺點是什么

優(yōu)點:

  • 單機吞吐量為10w級。
  • 可用性很高,支持分布式架構(gòu)。
  • 擴展性好。
  • 支持10億級別的消息堆積,而且不會因為堆積導(dǎo)致性能下降。
  • 源碼是用Java寫的,對于Java程序員來說非常方便改造。
  • 參數(shù)優(yōu)化配置,消息基本可以做到0丟失。
  • 使用于對可靠性要求高的金融行業(yè)。

缺點:

  • 目前只支持Java、C++客戶端,而且C++還不算完善。
  • 沒有在MQ核心實現(xiàn)JMS相關(guān)接口,有些遷移改造就比較麻煩了。

3. 消息隊列使用場景

解耦: 例如用戶完成下單除了必要的庫存扣減和訂單狀態(tài)更新外,我們還需要處理一些積分系統(tǒng)、推送系統(tǒng)的無關(guān)緊要的業(yè)務(wù)處理,如果全部順序執(zhí)行,等待時間就會變得很漫長,所以我們需要借助MQ將邊角業(yè)務(wù)從業(yè)務(wù)模塊中解耦開來。

  • 異步: 這點不必多說,上述的解耦方案就會使得積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)任務(wù)異步執(zhí)行。
  • 削峰: 可以理解為一個漏斗,例如我們的某個服務(wù)只能抗住10wQPS,可是當前請求卻達到20w的QPS,那么我們就可以將請求全部先扔到MQ中,讓服務(wù)慢慢消化處理。

二、RocketMQ基礎(chǔ)安裝與實踐

1. 安裝并啟動RocketMQ

在編寫業(yè)務(wù)代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個版本:https://rocketmq.apache.org/download/

完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環(huán)境變量,完成后鍵入source /etc/profile使之生效,對應(yīng)的配置內(nèi)容如下所示:

export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

需要注意的是筆者本次采用WSL的Ubuntu子系統(tǒng)時啟動時腳本會拋出runserver.sh: 70: [[: Exec format error錯誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報錯找到runserver.sh這行對應(yīng)的腳本內(nèi)容,該括弧本質(zhì)上就是基于JDK內(nèi)容配置對應(yīng)的GC算法:

以筆者為里系統(tǒng)是jdk8,所以直接去掉判斷用走JDK8的配置即可:

choose_gc_options()
{

      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
 
}

完成后鍵入./mqnamesrv &將MQ啟動,如果彈窗輸出下面這條結(jié)果,則說明mq的NameServer啟動成功。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動broker,需要注意的是默認情況下broker占用堆內(nèi)存差不多是4g,所以讀者本地部署時建議修改一下runbroker.sh的堆內(nèi)存,如下圖所示:

若彈窗輸出下面所示的文字,則說明broker啟動成功,自此mq就在windows環(huán)境部署成功了。我們就可以開始編碼工作了。

The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

2. 訂單系統(tǒng)改造

本次的示例是關(guān)于訂單系統(tǒng)改造,用戶下單完成后,服務(wù)器需要進行庫存扣減、訂單狀態(tài)更新、以及優(yōu)惠券、積分等邊邊角角的業(yè)務(wù),如果順序執(zhí)行這些邏輯+網(wǎng)絡(luò)開銷,接口耗時對于用戶體驗是非常不友好的。

所以我們在將非核心業(yè)務(wù)邏輯從接口串行調(diào)用中抽出,下單業(yè)務(wù)只需關(guān)注完成我們庫存扣減、訂單狀態(tài)更新就行了,剩下的業(yè)務(wù)用MQ發(fā)個消息給積分系統(tǒng)、促銷系統(tǒng)告知他們自己處理一下就行了:

首先我們引入MQ依賴腳手架:

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

同時這里我們也給出配置信息:

# mq地址端口
rocketmq.name-server=127.0.0.1:9876
# 生產(chǎn)者配置
rocketmq.producer.isOnOff=on
# 發(fā)送同一類消息設(shè)置為同一個group,保證唯一
rocketmq.producer.group=rocketmq-group
rocketmq.producer.groupName=rocketmq-group
# namesrv地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# 設(shè)置消息最大長度 4M
rocketmq.producer.maxMessageSize=4096
# 消息發(fā)送超時時間
rocketmq.producer.sendMsgTimeout=3000
# 消息發(fā)送失敗重試次數(shù)
rocketmq.producer.retryTimesWhenSendFailed=2

隨后我們設(shè)置監(jiān)聽處理關(guān)于訂單創(chuàng)建的topic消息:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

完成后我們基于CommandLineRunner 測試一下消息發(fā)送:

@Component
@Slf4j
public class MQTest implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @Override
    public void run(String... args) throws Exception {

        Order order = new Order();
        order.setOrderNo("20221217001002003");
        order.setUserId(1);
        order.setPrice(500.00);

        rocketMQTemplate.asyncSend("ORDER_ADD", MessageBuilder.withPayload(order).build(), getDefaultSendCallBack());

    }

    /**
     * 消息處理默認回調(diào)
     * @return
     */
    private SendCallback getDefaultSendCallBack() {
        return new SendCallback() {

            @Override
            public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
                log.info("MQ消息發(fā)送成功。result:{}", JSONUtil.toJsonStr(sendResult));
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("MQ消息發(fā)送失敗,失敗原因:{}" + throwable.getMessage());
            }
        };
    }

}

日志如下,可以看到消息消費成功了:

2025-02-11 10:03:14.577  INFO 14420 --- [MessageThread_1] com.sharkChili.config.OrderMsgListener   : 收到訂單,訂單信息:[{"userId":1,"orderNo":"20221217001002003","price":500}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....
2025-02-11 10:03:14.577  INFO 14420 --- [ublicExecutor_2] com.sharkChili.runner.MQTest             : MQ消息發(fā)送成功。result:{"sendStatus":"SEND_OK","msgId":"AC1E1001385418B4AAC235A7E0190000","messageQueue":{"topic":"ORDER_ADD","brokerName":"DESKTOP-DC9PSUS","queueId":2},"queueOffset":1,"offsetMsgId":"AC15733800002A9F0000000000000558","regionId":"DefaultRegion","traceOn":true}

3. 如何實現(xiàn)消息過濾

設(shè)置tag消息的方式常見的是有兩種,一種是基于tag標簽過濾,如下代碼所示,我們希望發(fā)送訂單業(yè)務(wù)即ORDER_ADD這個主題下tag標簽為tagA的用戶收到消息,那么我們就可以通過ORDER_ADD:tagA針對topic進行更進一步劃分:

//創(chuàng)建訂單消息
        Order order = new Order();
        order.setUserId(1);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setPrice(500);
        //生成消息
        Message<Order> message = MessageBuilder.withPayload(order)
                .build();
        //同步發(fā)送
        rocketMQTemplate.syncSend("ORDER_ADD:tagA", message);

對應(yīng)的監(jiān)聽者通過selectorExpression 指定標簽即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        selectorExpression = "tagA"http://訂閱tagA的消息
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

還有一種就是基于SQL過濾,因為表達式靈活,相對更強大一些,例如我們的消費者只處理userId為10以內(nèi)的數(shù)據(jù),那么消費者的監(jiān)聽就可以按照如下姿勢進行配置:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        selectorType = SelectorType.SQL92,//指令類型為sql表達式
        selectorExpression = "userId<10"http://過濾出id小于10的用戶的訂單
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

發(fā)送消息時,通過headers 指定本消息條件并通過convertAndSend發(fā)送即可:

//創(chuàng)建訂單消息
        Order order = new Order();
        order.setUserId(1);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setPrice(500);
        //通過header攜帶條件告知當前userId為1
        Map<String, Object> headers = new HashMap<>();
        headers.put("userId", 1);
        //生成消息
        Message<Order> message = MessageBuilder.withPayload(order)
                .build();
        //發(fā)送
        rocketMQTemplate.convertAndSend("ORDER_ADD", message, headers);

需要注意的是默認情況下,MQ是不支持SQL表達過濾,我們需要到conf目錄下的broker.conf文件,添加enablePropertyFilter=true,然后鍵入如下指令降broker啟動:

./mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c ../conf/broker.conf

4. 如何提交延時消息

延遲消息即需要消費者過一段時間后才能消費的消息,例如我們現(xiàn)在有個消息要求消費者10s后才能消費,那么我們就可以使用延遲消息,如下代碼所示:

// 創(chuàng)建延遲消息
        Message<String> rocketMessage = MessageBuilder.withPayload("this is delay msg").build();
        // 發(fā)送延遲消息,timeout設(shè)置為10000即10s,delayLevel表示延遲等級,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,所以 3為10s
        rocketMQTemplate.syncSend("delay_topic", rocketMessage, 10000,3);
        log.info("延遲消息發(fā)送完成");

消費者代碼:

@Component
@RocketMQMessageListener(consumerGroup = "delay_msg_group", topic = "delay_topic")
@Slf4j
public class DelayMsgListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        log.info("收到延遲消息,消息內(nèi)容:{}", JSONUtil.toJsonStr(msg));
    }
}

輸出結(jié)果,可以看到確實是10s后消費者采納看到消息并消費:

2025-02-11 10:56:58.300  INFO 18568 --- [           main] com.sharkChili.runner.MQTest             : 延遲消息發(fā)送完成
2025-02-11 10:57:08.307  INFO 18568 --- [MessageThread_1] com.sharkChili.config.DelayMsgListener   : 收到延遲消息,消息內(nèi)容:this is delay msg
責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2022-09-21 21:50:18

Dapr消息隊列

2017-07-11 15:26:57

LocalMQ RocketMQ高性能

2024-10-08 08:52:59

2017-10-11 15:08:28

消息隊列常見

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2023-09-18 08:27:20

RabbitMQRocketMQKafka

2022-12-22 10:03:18

消息集成

2023-08-17 10:20:18

RabbitMQ系統(tǒng)

2023-11-20 09:33:43

開發(fā)指南

2023-07-17 08:34:03

RocketMQ消息初體驗

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-18 09:03:01

RocketMQ場景消息

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2024-09-13 08:49:45

2024-09-25 08:32:05

2024-08-22 18:49:23

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)
點贊
收藏

51CTO技術(shù)棧公眾號