自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Rocketmq-Spring:實(shí)戰(zhàn)與源碼解析一網(wǎng)打盡

開發(fā) 項(xiàng)目管理
這篇文章會(huì)介紹 Spring Boot 項(xiàng)目使用 rocketmq-spring SDK 實(shí)現(xiàn)消息收發(fā)的操作流程,同時(shí)筆者會(huì)從開發(fā)者的角度解讀 SDK 的設(shè)計(jì)邏輯。

RocketMQ 是大家耳熟能詳?shù)南㈥?duì)列,開源項(xiàng)目 rocketmq-spring 可以幫助開發(fā)者在 Spring Boot 項(xiàng)目中快速整合 RocketMQ。

這篇文章會(huì)介紹 Spring Boot 項(xiàng)目使用 rocketmq-spring SDK 實(shí)現(xiàn)消息收發(fā)的操作流程,同時(shí)筆者會(huì)從開發(fā)者的角度解讀 SDK 的設(shè)計(jì)邏輯。

圖片

一、SDK 簡(jiǎn)介

圖片

項(xiàng)目地址:??https://github.com/apache/rocketmq-spring??

rocketmq-spring 的本質(zhì)是一個(gè) Spring Boot starter 。

Spring Boot 基于“約定大于配置”(Convention over configuration)這一理念來快速地開發(fā)、測(cè)試、運(yùn)行和部署 Spring 應(yīng)用,并能通過簡(jiǎn)單地與各種啟動(dòng)器(如 spring-boot-web-starter)結(jié)合,讓應(yīng)用直接以命令行的方式運(yùn)行,不需再部署到獨(dú)立容器中。

Spring Boot starter 構(gòu)造的啟動(dòng)器使用起來非常方便,開發(fā)者只需要在 pom.xml 引入 starter 的依賴定義,在配置文件中編寫約定的配置即可。

下面我們看下 rocketmq-spring-boot-starter 的配置:

1、引入依賴

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

2、約定配置

圖片

接下來,我們分別按照生產(chǎn)者和消費(fèi)者的順序,詳細(xì)的講解消息收發(fā)的操作過程。

二、生產(chǎn)者

首先我們添加依賴后,進(jìn)行如下三個(gè)步驟:

1、配置文件中配置如下

rocketmq:
name-server: 127.0.0.1:9876
producer:
group: platform-sms-server-group
# access-key: myaccesskey
# secret-key: mysecretkey
topic: sms-common-topic

生產(chǎn)者配置非常簡(jiǎn)單,主要配置名字服務(wù)地址和生產(chǎn)者組。

2、需要發(fā)送消息的類中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.topic}")
private String smsTopic;

3、發(fā)送消息,消息體可以是自定義對(duì)象,也可以是 Message 對(duì)象

rocketMQTemplate 類包含多鐘發(fā)送消息的方法:

  • 同步發(fā)送 syncSend
  • 異步發(fā)送 asyncSend
  • 順序發(fā)送 syncSendOrderly
  • oneway發(fā)送 sendOneWay

下面的代碼展示如何同步發(fā)送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
rocketMQTemplate.syncSend(
destination,
MessageBuilder.withPayload(messageContent).
setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
build()
);
if (sendResult != null) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// send message success ,do something
}
}

syncSend 方法的第一個(gè)參數(shù)是發(fā)送的目標(biāo),格式是:topic + ":" + tags ,

第二個(gè)參數(shù)是:spring-message 規(guī)范的 message 對(duì)象 ,而 MessageBuilder 是一個(gè)工具類,方法鏈?zhǔn)秸{(diào)用創(chuàng)建消息對(duì)象。

三、消費(fèi)者

1、配置文件中配置如下

rocketmq:
name-server: 127.0.0.1:9876
consumer1:
group: platform-sms-worker-common-group
topic: sms-common-topic

2、實(shí)現(xiàn)消息監(jiān)聽器

@Component
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer1.group}", //消費(fèi)組
topic = "${rocketmq.consumer1.topic}" //主題
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.println("普通短信:" + message);
}
}

消費(fèi)者實(shí)現(xiàn)類也可以實(shí)現(xiàn) RocketMQListener<MessageExt>, 在 onMessage 方法里通過 RocketMQ 原生消息對(duì)象 MessageExt 獲取更詳細(xì)的消息數(shù)據(jù) 。

public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), "UTF-8");
logger.info("普通短信:" + message);
} catch (Exception e) {
logger.error("common onMessage error:", e);
}
}

四、源碼概覽

圖片

最新源碼中,我們可以看到源碼中包含四個(gè)模塊:

1、rocketmq-spring-boot-parent

該模塊是父模塊,定義項(xiàng)目所有依賴的 jar 包。

2、rocketmq-spring-boot

核心模塊,實(shí)現(xiàn)了 starter 的核心邏輯。

3、rocketmq-spring-boot-starter

SDK 模塊,簡(jiǎn)單封裝,外部項(xiàng)目引用。

4、rocketmq-spring-boot-samples

示例代碼模塊。這個(gè)模塊非常重要,當(dāng)用戶使用 SDK 時(shí),可以參考示例快速開發(fā)。

五、starter 實(shí)現(xiàn)

我們重點(diǎn)分析下 rocketmq-spring-boot 模塊的核心源碼:

圖片


spring-boot-starter 實(shí)現(xiàn)需要包含如下三個(gè)部分:

1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;

2、定義spring.factories 文件

在 resources 包下創(chuàng)建 META-INF 目錄后,新建 spring.factories 文件,并在文件中定義自動(dòng)加載類,文件內(nèi)容是:

org.springframework.boot.autoconfigure.EnableAutoCnotallow=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 會(huì)根據(jù)文件中配置的自動(dòng)化配置類來自動(dòng)初始化相關(guān)的 Bean、Component 或 Service。

3、實(shí)現(xiàn)自動(dòng)加載類

在 RocketMQAutoConfiguration 類的具體實(shí)現(xiàn)中,我們重點(diǎn)分析下生產(chǎn)者和消費(fèi)者是如何分別啟動(dòng)的。

▍生產(chǎn)者發(fā)送模板類:RocketMQTemplate

RocketMQAutoConfiguration 類定義了兩個(gè)默認(rèn)的 Bean :

圖片

圖片

首先SpringBoot項(xiàng)目中配置文件中的配置值會(huì)根據(jù)屬性條件綁定到 RocketMQProperties 對(duì)象 中,然后使用 RocketMQ 的原生 API 分別創(chuàng)建生產(chǎn)者 Bean 和拉取消費(fèi)者 Bean , 分別將兩個(gè) bean 設(shè)置到 RocketMQTemplate 對(duì)象中。

兩個(gè)重點(diǎn)需要強(qiáng)調(diào):

  • 發(fā)送消息時(shí),將 spring-message 規(guī)范下的消息對(duì)象封裝成 RocketMQ 消息對(duì)象

圖片

  • 默認(rèn)拉取消費(fèi)者 litePullConsumer 。拉取消費(fèi)者一般用于大數(shù)據(jù)批量處理場(chǎng)景 。

圖片

  • 原生使用方式

RocketMQTemplate 類封裝了拉取消費(fèi)者的receive方法,以方便開發(fā)者使用。

圖片

▍自定義消費(fèi)者類

下圖是并發(fā)消費(fèi)者的例子:

圖片

消費(fèi)者示例代碼

那么 rocketmq-spring 是如何自動(dòng)啟動(dòng)消費(fèi)者呢 ?

圖片

spring 容器首先注冊(cè)了消息監(jiān)聽器后置處理器,然后調(diào)用 ListenerContainerConfiguration 類的 registerContainer 方法 。

對(duì)比并發(fā)消費(fèi)者的例子,我們可以看到:DefaultRocketMQListenerContainer 是對(duì) DefaultMQPushConsumer 消費(fèi)邏輯的封裝。

圖片

封裝消費(fèi)消息的邏輯,同時(shí)滿足 RocketMQListener 泛化接口支持不同參數(shù),比如 String 、MessageExt 、自定義對(duì)象 。

首先DefaultRocketMQListenerContainer初始化之后, 獲取 onMessage 方法的參數(shù)類型 。

圖片

然后消費(fèi)者調(diào)用 consumeMessage 處理消息時(shí),封裝了一個(gè) handleMessage 方法 ,將原生 RocketMQ 消息對(duì)象 MessageExt 轉(zhuǎn)換成 onMessage 方法定義的參數(shù)對(duì)象,然后調(diào)用 rocketMQListener 的 onMessage  方法。

圖片

mnjh9

上圖右側(cè)標(biāo)紅的代碼也就是該方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

六、寫到最后

開源項(xiàng)目 rocketmq-spring 有很多值得學(xué)習(xí)的地方 ,我們可以從如下四個(gè)層面逐層進(jìn)階:

1、學(xué)會(huì)如何使用 :參考 rocketmq-spring-boot-samples 模塊的示例代碼,學(xué)會(huì)如何發(fā)送和接收消息,快速編碼;

2、模塊設(shè)計(jì):學(xué)習(xí)項(xiàng)目的模塊分層 (父模塊、SDK 模塊、核心實(shí)現(xiàn)模塊、示例代碼模塊);

3、starter 設(shè)計(jì)思路 :定義自動(dòng)配置文件 spring.factories 、設(shè)計(jì)配置屬性類 、在 RocketMQ client 的基礎(chǔ)上實(shí)現(xiàn)優(yōu)雅的封裝、深入理解 RocketMQ 源碼等;

4、舉一反三:當(dāng)我們理解了 rocketmq-spring 的源碼,我們可以嘗試模仿該項(xiàng)目寫一個(gè)簡(jiǎn)單的 spring boot starter。

責(zé)任編輯:武曉燕 來源: 勇哥java實(shí)戰(zhàn)分享
相關(guān)推薦

2024-04-26 00:25:52

Rust語(yǔ)法生命周期

2024-04-07 08:41:34

2021-08-05 06:54:05

流程控制default

2024-02-27 10:11:36

前端CSS@規(guī)則

2021-10-11 07:55:42

瀏覽器語(yǔ)法Webpack

2013-08-02 10:52:10

Android UI控件

2024-08-26 10:01:50

2024-06-12 00:00:05

2010-08-25 01:59:00

2011-12-02 09:22:23

網(wǎng)絡(luò)管理NetQos

2021-10-26 16:15:26

Spring 事務(wù)隔離性

2013-10-16 14:18:02

工具圖像處理

2023-04-06 09:08:41

BPM流程引擎

2022-09-15 10:47:19

數(shù)據(jù)庫(kù)事務(wù)工作單元

2024-02-23 08:14:01

項(xiàng)目開發(fā)Spring

2019-07-24 15:30:00

SQL注入數(shù)據(jù)庫(kù)

2020-02-21 08:45:45

PythonWeb開發(fā)框架

2021-05-20 11:17:49

加密貨幣區(qū)塊鏈印度

2021-10-29 09:32:33

springboot 靜態(tài)變量項(xiàng)目

2023-09-06 18:37:45

CSS選擇器符號(hào)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)