Rocketmq-Spring:實(shí)戰(zhàn)與源碼解析一網(wǎng)打盡
RocketMQ 是大家耳熟能詳?shù)南㈥?duì)列,開源項(xiàng)目 rocketmq-spring 可以幫助開發(fā)者在 Spring Boot 項(xiàng)目中快速整合 RocketMQ。
這篇文章會(huì)介紹 Spring Boot 項(xiàng)目使用 rocketmq-spring SDK 實(shí)現(xiàn)消息收發(fā)的操作流程,同時(shí)筆者會(huì)從開發(fā)者的角度解讀 SDK 的設(shè)計(jì)邏輯。
一、SDK 簡(jiǎn)介
項(xiàng)目地址:??https://github.com/apache/rocketmq-spring??
rocketmq-spring 的本質(zhì)是一個(gè) Spring Boot starter 。
Spring Boot 基于“約定大于配置”(Convention over configuration)這一理念來快速地開發(fā)、測(cè)試、運(yùn)行和部署 Spring 應(yīng)用,并能通過簡(jiǎn)單地與各種啟動(dòng)器(如 spring-boot-web-starter)結(jié)合,讓應(yīng)用直接以命令行的方式運(yùn)行,不需再部署到獨(dú)立容器中。
Spring Boot starter 構(gòu)造的啟動(dòng)器使用起來非常方便,開發(fā)者只需要在 pom.xml 引入 starter 的依賴定義,在配置文件中編寫約定的配置即可。
下面我們看下 rocketmq-spring-boot-starter 的配置:
1、引入依賴
2、約定配置
接下來,我們分別按照生產(chǎn)者和消費(fèi)者的順序,詳細(xì)的講解消息收發(fā)的操作過程。
二、生產(chǎn)者
首先我們添加依賴后,進(jìn)行如下三個(gè)步驟:
1、配置文件中配置如下
生產(chǎn)者配置非常簡(jiǎn)單,主要配置名字服務(wù)地址和生產(chǎn)者組。
2、需要發(fā)送消息的類中注入 RcoketMQTemplate
3、發(fā)送消息,消息體可以是自定義對(duì)象,也可以是 Message 對(duì)象
rocketMQTemplate 類包含多鐘發(fā)送消息的方法:
- 同步發(fā)送 syncSend
- 異步發(fā)送 asyncSend
- 順序發(fā)送 syncSendOrderly
- oneway發(fā)送 sendOneWay
下面的代碼展示如何同步發(fā)送消息。
syncSend 方法的第一個(gè)參數(shù)是發(fā)送的目標(biāo),格式是:topic + ":" + tags ,
第二個(gè)參數(shù)是:spring-message 規(guī)范的 message 對(duì)象 ,而 MessageBuilder 是一個(gè)工具類,方法鏈?zhǔn)秸{(diào)用創(chuàng)建消息對(duì)象。
三、消費(fèi)者
1、配置文件中配置如下
2、實(shí)現(xiàn)消息監(jiān)聽器
消費(fèi)者實(shí)現(xiàn)類也可以實(shí)現(xiàn) RocketMQListener<MessageExt>, 在 onMessage 方法里通過 RocketMQ 原生消息對(duì)象 MessageExt 獲取更詳細(xì)的消息數(shù)據(jù) 。
四、源碼概覽
最新源碼中,我們可以看到源碼中包含四個(gè)模塊:
1、rocketmq-spring-boot-parent
該模塊是父模塊,定義項(xiàng)目所有依賴的 jar 包。
2、rocketmq-spring-boot
核心模塊,實(shí)現(xiàn)了 starter 的核心邏輯。
3、rocketmq-spring-boot-starter
SDK 模塊,簡(jiǎn)單封裝,外部項(xiàng)目引用。
4、rocketmq-spring-boot-samples
示例代碼模塊。這個(gè)模塊非常重要,當(dāng)用戶使用 SDK 時(shí),可以參考示例快速開發(fā)。
五、starter 實(shí)現(xiàn)
我們重點(diǎn)分析下 rocketmq-spring-boot 模塊的核心源碼:
spring-boot-starter 實(shí)現(xiàn)需要包含如下三個(gè)部分:
1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;
2、定義spring.factories 文件
在 resources 包下創(chuàng)建 META-INF 目錄后,新建 spring.factories 文件,并在文件中定義自動(dòng)加載類,文件內(nèi)容是:
spring boot 會(huì)根據(jù)文件中配置的自動(dòng)化配置類來自動(dòng)初始化相關(guān)的 Bean、Component 或 Service。
3、實(shí)現(xiàn)自動(dòng)加載類
在 RocketMQAutoConfiguration 類的具體實(shí)現(xiàn)中,我們重點(diǎn)分析下生產(chǎn)者和消費(fèi)者是如何分別啟動(dòng)的。
▍生產(chǎn)者發(fā)送模板類:RocketMQTemplate
RocketMQAutoConfiguration 類定義了兩個(gè)默認(rèn)的 Bean :
首先SpringBoot項(xiàng)目中配置文件中的配置值會(huì)根據(jù)屬性條件綁定到 RocketMQProperties 對(duì)象 中,然后使用 RocketMQ 的原生 API 分別創(chuàng)建生產(chǎn)者 Bean 和拉取消費(fèi)者 Bean , 分別將兩個(gè) bean 設(shè)置到 RocketMQTemplate 對(duì)象中。
兩個(gè)重點(diǎn)需要強(qiáng)調(diào):
- 發(fā)送消息時(shí),將 spring-message 規(guī)范下的消息對(duì)象封裝成 RocketMQ 消息對(duì)象
- 默認(rèn)拉取消費(fèi)者 litePullConsumer 。拉取消費(fèi)者一般用于大數(shù)據(jù)批量處理場(chǎng)景 。
- 原生使用方式
RocketMQTemplate 類封裝了拉取消費(fèi)者的receive方法,以方便開發(fā)者使用。
▍自定義消費(fèi)者類
下圖是并發(fā)消費(fèi)者的例子:
消費(fèi)者示例代碼
那么 rocketmq-spring 是如何自動(dòng)啟動(dòng)消費(fèi)者呢 ?
spring 容器首先注冊(cè)了消息監(jiān)聽器后置處理器,然后調(diào)用 ListenerContainerConfiguration 類的 registerContainer 方法 。
對(duì)比并發(fā)消費(fèi)者的例子,我們可以看到:DefaultRocketMQListenerContainer 是對(duì) DefaultMQPushConsumer 消費(fèi)邏輯的封裝。
封裝消費(fèi)消息的邏輯,同時(shí)滿足 RocketMQListener 泛化接口支持不同參數(shù),比如 String 、MessageExt 、自定義對(duì)象 。
首先DefaultRocketMQListenerContainer初始化之后, 獲取 onMessage 方法的參數(shù)類型 。
然后消費(fèi)者調(diào)用 consumeMessage 處理消息時(shí),封裝了一個(gè) handleMessage 方法 ,將原生 RocketMQ 消息對(duì)象 MessageExt 轉(zhuǎn)換成 onMessage 方法定義的參數(shù)對(duì)象,然后調(diào)用 rocketMQListener 的 onMessage 方法。
mnjh9
上圖右側(cè)標(biāo)紅的代碼也就是該方法的精髓:
六、寫到最后
開源項(xiàng)目 rocketmq-spring 有很多值得學(xué)習(xí)的地方 ,我們可以從如下四個(gè)層面逐層進(jìn)階:
1、學(xué)會(huì)如何使用 :參考 rocketmq-spring-boot-samples 模塊的示例代碼,學(xué)會(huì)如何發(fā)送和接收消息,快速編碼;
2、模塊設(shè)計(jì):學(xué)習(xí)項(xiàng)目的模塊分層 (父模塊、SDK 模塊、核心實(shí)現(xiàn)模塊、示例代碼模塊);
3、starter 設(shè)計(jì)思路 :定義自動(dòng)配置文件 spring.factories 、設(shè)計(jì)配置屬性類 、在 RocketMQ client 的基礎(chǔ)上實(shí)現(xiàn)優(yōu)雅的封裝、深入理解 RocketMQ 源碼等;
4、舉一反三:當(dāng)我們理解了 rocketmq-spring 的源碼,我們可以嘗試模仿該項(xiàng)目寫一個(gè)簡(jiǎn)單的 spring boot starter。