消息隊列 RocketMQ 入門指南
在當今的分布式系統(tǒng)中,消息隊列(Message Queue)作為解耦、異步通信和流量削峰的重要組件,扮演著不可或缺的角色。而RocketMQ,作為阿里巴巴開源的一款高性能、高可靠的分布式消息中間件,憑借其強大的功能和穩(wěn)定的性能,成為了眾多開發(fā)者和企業(yè)的首選。
無論你是剛剛接觸消息隊列的新手,還是希望深入了解RocketMQ的開發(fā)者,這篇文章都將為你提供一個清晰的入門指南。我們將從RocketMQ的基本概念出發(fā),逐步講解其核心功能,并通過簡單的實踐示例,幫助你快速上手。
一、詳解RocketMQ基礎(chǔ)概念
1. 為什么要用RocketMQ
相比于市場上的各種消息隊列,它有如下優(yōu)勢:
- 性能好。
- 穩(wěn)定可靠。
- 中文社區(qū)活躍。
當然缺點也是有那么一些些的,兼容性確實不太行。
2. RocketMQ優(yōu)缺點是什么
優(yōu)點:
- 單機吞吐量為10w級。
- 可用性很高,支持分布式架構(gòu)。
- 擴展性好。
- 支持10億級別的消息堆積,而且不會因為堆積導(dǎo)致性能下降。
- 源碼是用Java寫的,對于Java程序員來說非常方便改造。
- 參數(shù)優(yōu)化配置,消息基本可以做到0丟失。
- 使用于對可靠性要求高的金融行業(yè)。
缺點:
- 目前只支持Java、C++客戶端,而且C++還不算完善。
- 沒有在MQ核心實現(xiàn)JMS相關(guān)接口,有些遷移改造就比較麻煩了。
3. 消息隊列使用場景
解耦: 例如用戶完成下單除了必要的庫存扣減和訂單狀態(tài)更新外,我們還需要處理一些積分系統(tǒng)、推送系統(tǒng)的無關(guān)緊要的業(yè)務(wù)處理,如果全部順序執(zhí)行,等待時間就會變得很漫長,所以我們需要借助MQ將邊角業(yè)務(wù)從業(yè)務(wù)模塊中解耦開來。
- 異步: 這點不必多說,上述的解耦方案就會使得積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)任務(wù)異步執(zhí)行。
- 削峰: 可以理解為一個漏斗,例如我們的某個服務(wù)只能抗住10wQPS,可是當前請求卻達到20w的QPS,那么我們就可以將請求全部先扔到MQ中,讓服務(wù)慢慢消化處理。
二、RocketMQ基礎(chǔ)安裝與實踐
1. 安裝并啟動RocketMQ
在編寫業(yè)務(wù)代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個版本:https://rocketmq.apache.org/download/
完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環(huán)境變量,完成后鍵入source /etc/profile使之生效,對應(yīng)的配置內(nèi)容如下所示:
export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin
需要注意的是筆者本次采用WSL的Ubuntu子系統(tǒng)時啟動時腳本會拋出runserver.sh: 70: [[: Exec format error錯誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報錯找到runserver.sh這行對應(yīng)的腳本內(nèi)容,該括弧本質(zhì)上就是基于JDK內(nèi)容配置對應(yīng)的GC算法:
以筆者為里系統(tǒng)是jdk8,所以直接去掉判斷用走JDK8的配置即可:
choose_gc_options()
{
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
}
完成后鍵入./mqnamesrv &將MQ啟動,如果彈窗輸出下面這條結(jié)果,則說明mq的NameServer啟動成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動broker,需要注意的是默認情況下broker占用堆內(nèi)存差不多是4g,所以讀者本地部署時建議修改一下runbroker.sh的堆內(nèi)存,如下圖所示:
若彈窗輸出下面所示的文字,則說明broker啟動成功,自此mq就在windows環(huán)境部署成功了。我們就可以開始編碼工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2. 訂單系統(tǒng)改造
本次的示例是關(guān)于訂單系統(tǒng)改造,用戶下單完成后,服務(wù)器需要進行庫存扣減、訂單狀態(tài)更新、以及優(yōu)惠券、積分等邊邊角角的業(yè)務(wù),如果順序執(zhí)行這些邏輯+網(wǎng)絡(luò)開銷,接口耗時對于用戶體驗是非常不友好的。
所以我們在將非核心業(yè)務(wù)邏輯從接口串行調(diào)用中抽出,下單業(yè)務(wù)只需關(guān)注完成我們庫存扣減、訂單狀態(tài)更新就行了,剩下的業(yè)務(wù)用MQ發(fā)個消息給積分系統(tǒng)、促銷系統(tǒng)告知他們自己處理一下就行了:
首先我們引入MQ依賴腳手架:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
同時這里我們也給出配置信息:
# mq地址端口
rocketmq.name-server=127.0.0.1:9876
# 生產(chǎn)者配置
rocketmq.producer.isOnOff=on
# 發(fā)送同一類消息設(shè)置為同一個group,保證唯一
rocketmq.producer.group=rocketmq-group
rocketmq.producer.groupName=rocketmq-group
# namesrv地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# 設(shè)置消息最大長度 4M
rocketmq.producer.maxMessageSize=4096
# 消息發(fā)送超時時間
rocketmq.producer.sendMsgTimeout=3000
# 消息發(fā)送失敗重試次數(shù)
rocketmq.producer.retryTimesWhenSendFailed=2
隨后我們設(shè)置監(jiān)聽處理關(guān)于訂單創(chuàng)建的topic消息:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
}
}
完成后我們基于CommandLineRunner 測試一下消息發(fā)送:
@Component
@Slf4j
public class MQTest implements CommandLineRunner {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void run(String... args) throws Exception {
Order order = new Order();
order.setOrderNo("20221217001002003");
order.setUserId(1);
order.setPrice(500.00);
rocketMQTemplate.asyncSend("ORDER_ADD", MessageBuilder.withPayload(order).build(), getDefaultSendCallBack());
}
/**
* 消息處理默認回調(diào)
* @return
*/
private SendCallback getDefaultSendCallBack() {
return new SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("MQ消息發(fā)送成功。result:{}", JSONUtil.toJsonStr(sendResult));
}
@Override
public void onException(Throwable throwable) {
log.error("MQ消息發(fā)送失敗,失敗原因:{}" + throwable.getMessage());
}
};
}
}
日志如下,可以看到消息消費成功了:
2025-02-11 10:03:14.577 INFO 14420 --- [MessageThread_1] com.sharkChili.config.OrderMsgListener : 收到訂單,訂單信息:[{"userId":1,"orderNo":"20221217001002003","price":500}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....
2025-02-11 10:03:14.577 INFO 14420 --- [ublicExecutor_2] com.sharkChili.runner.MQTest : MQ消息發(fā)送成功。result:{"sendStatus":"SEND_OK","msgId":"AC1E1001385418B4AAC235A7E0190000","messageQueue":{"topic":"ORDER_ADD","brokerName":"DESKTOP-DC9PSUS","queueId":2},"queueOffset":1,"offsetMsgId":"AC15733800002A9F0000000000000558","regionId":"DefaultRegion","traceOn":true}
3. 如何實現(xiàn)消息過濾
設(shè)置tag消息的方式常見的是有兩種,一種是基于tag標簽過濾,如下代碼所示,我們希望發(fā)送訂單業(yè)務(wù)即ORDER_ADD這個主題下tag標簽為tagA的用戶收到消息,那么我們就可以通過ORDER_ADD:tagA針對topic進行更進一步劃分:
//創(chuàng)建訂單消息
Order order = new Order();
order.setUserId(1);
order.setOrderNo(UUID.randomUUID().toString());
order.setPrice(500);
//生成消息
Message<Order> message = MessageBuilder.withPayload(order)
.build();
//同步發(fā)送
rocketMQTemplate.syncSend("ORDER_ADD:tagA", message);
對應(yīng)的監(jiān)聽者通過selectorExpression 指定標簽即可:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
topic = "ORDER_ADD",
selectorExpression = "tagA"http://訂閱tagA的消息
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
}
}
還有一種就是基于SQL過濾,因為表達式靈活,相對更強大一些,例如我們的消費者只處理userId為10以內(nèi)的數(shù)據(jù),那么消費者的監(jiān)聽就可以按照如下姿勢進行配置:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
topic = "ORDER_ADD",
selectorType = SelectorType.SQL92,//指令類型為sql表達式
selectorExpression = "userId<10"http://過濾出id小于10的用戶的訂單
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到訂單,訂單信息:[{}],進行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
}
}
發(fā)送消息時,通過headers 指定本消息條件并通過convertAndSend發(fā)送即可:
//創(chuàng)建訂單消息
Order order = new Order();
order.setUserId(1);
order.setOrderNo(UUID.randomUUID().toString());
order.setPrice(500);
//通過header攜帶條件告知當前userId為1
Map<String, Object> headers = new HashMap<>();
headers.put("userId", 1);
//生成消息
Message<Order> message = MessageBuilder.withPayload(order)
.build();
//發(fā)送
rocketMQTemplate.convertAndSend("ORDER_ADD", message, headers);
需要注意的是默認情況下,MQ是不支持SQL表達過濾,我們需要到conf目錄下的broker.conf文件,添加enablePropertyFilter=true,然后鍵入如下指令降broker啟動:
./mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c ../conf/broker.conf
4. 如何提交延時消息
延遲消息即需要消費者過一段時間后才能消費的消息,例如我們現(xiàn)在有個消息要求消費者10s后才能消費,那么我們就可以使用延遲消息,如下代碼所示:
// 創(chuàng)建延遲消息
Message<String> rocketMessage = MessageBuilder.withPayload("this is delay msg").build();
// 發(fā)送延遲消息,timeout設(shè)置為10000即10s,delayLevel表示延遲等級,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,所以 3為10s
rocketMQTemplate.syncSend("delay_topic", rocketMessage, 10000,3);
log.info("延遲消息發(fā)送完成");
消費者代碼:
@Component
@RocketMQMessageListener(consumerGroup = "delay_msg_group", topic = "delay_topic")
@Slf4j
public class DelayMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
log.info("收到延遲消息,消息內(nèi)容:{}", JSONUtil.toJsonStr(msg));
}
}
輸出結(jié)果,可以看到確實是10s后消費者采納看到消息并消費:
2025-02-11 10:56:58.300 INFO 18568 --- [ main] com.sharkChili.runner.MQTest : 延遲消息發(fā)送完成
2025-02-11 10:57:08.307 INFO 18568 --- [MessageThread_1] com.sharkChili.config.DelayMsgListener : 收到延遲消息,消息內(nèi)容:this is delay msg