自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

如何在優(yōu)雅地Spring中實(shí)現(xiàn)消息的發(fā)送和消費(fèi)

開(kāi)發(fā) 后端
本文將對(duì)當(dāng)前的設(shè)計(jì)實(shí)現(xiàn)做一個(gè)簡(jiǎn)單的介紹,讀者可以通過(guò)本文了解將RocketMQ Client端集成為spring-boot-starter框架的開(kāi)發(fā)細(xì)節(jié)。

前言

上世紀(jì)90年代末,隨著Java EE(Enterprise Edition)的出現(xiàn),特別是Enterprise Java Beans的使用需要復(fù)雜的描述符配置和死板復(fù)雜的代碼實(shí)現(xiàn),增加了廣大開(kāi)發(fā)者的學(xué)習(xí)曲線和開(kāi)發(fā)成本,由此基于簡(jiǎn)單的XML配置和普通Java對(duì)象(Plain Old Java Objects)的Spring技術(shù)應(yīng)運(yùn)而生,依賴(lài)注入(Dependency Injection), 控制反轉(zhuǎn)(Inversion of Control)和面向切面編程(AOP)的技術(shù)更加敏捷地解決了傳統(tǒng)Java企業(yè)及版本的不足。

隨著Spring的持續(xù)演進(jìn),基于注解(Annotation)的配置逐漸取代了XML文件配置, 2014年4月1日,Spring Boot 1.0.0正式發(fā)布,它基于“約定大于配置”(Convention over configuration)這一理念來(lái)快速地開(kāi)發(fā)、測(cè)試、運(yùn)行和部署Spring應(yīng)用,并能通過(guò)簡(jiǎn)單地與各種啟動(dòng)器(如 spring-boot-web-starter)結(jié)合,讓?xiě)?yīng)用直接以命令行的方式運(yùn)行,不需再部署到獨(dú)立容器中。這種簡(jiǎn)便直接快速構(gòu)建和開(kāi)發(fā)應(yīng)用的過(guò)程,可以使用約定的配置并且簡(jiǎn)化部署,受到越來(lái)越多的開(kāi)發(fā)者的歡迎。

Apache RocketMQ是業(yè)界知名的分布式消息和流處理中間件,簡(jiǎn)單地理解,它由Broker服務(wù)器和客戶端兩部分組成:

其中客戶端一個(gè)是消息發(fā)布者客戶端(Producer),它負(fù)責(zé)向Broker服務(wù)器發(fā)送消息;

另外一個(gè)是消息的消費(fèi)者客戶端(Consumer),多個(gè)消費(fèi)者可以組成一個(gè)消費(fèi)組,來(lái)訂閱和拉取消費(fèi)Broker服務(wù)器上存儲(chǔ)的消息。

為了利用Spring Boot的快速開(kāi)發(fā)和讓用戶能夠更靈活地使用RocketMQ消息客戶端,Apache RocketMQ社區(qū)推出了spring-boot-starter實(shí)現(xiàn)。隨著分布式事務(wù)消息功能在RocketMQ 4.3.0版本的發(fā)布,近期升級(jí)了相關(guān)的spring-boot代碼,通過(guò)注解方式支持分布式事務(wù)的回查和事務(wù)消息的發(fā)送。

本文將對(duì)當(dāng)前的設(shè)計(jì)實(shí)現(xiàn)做一個(gè)簡(jiǎn)單的介紹,讀者可以通過(guò)本文了解將RocketMQ Client端集成為spring-boot-starter框架的開(kāi)發(fā)細(xì)節(jié),然后通過(guò)一個(gè)簡(jiǎn)單的示例來(lái)一步一步的講解如何使用這個(gè)spring-boot-starter工具包來(lái)配置,發(fā)送和消費(fèi)RocketMQ消息。

Spring 中的消息框架

順便在這里討論一下在Spring中關(guān)于消息的兩個(gè)主要的框架,即Spring Messaging和Spring Cloud Stream。它們都能夠與Spring Boot整合并提供了一些參考的實(shí)現(xiàn)。和所有的實(shí)現(xiàn)框架一樣,消息框架的目的是實(shí)現(xiàn)輕量級(jí)的消息驅(qū)動(dòng)的微服務(wù),可以有效地簡(jiǎn)化開(kāi)發(fā)人員對(duì)消息中間件的使用復(fù)雜度,讓系統(tǒng)開(kāi)發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理。

2.1 Spring Messaging

Spring Messaging是Spring Framework 4中添加的模塊,是Spring與消息系統(tǒng)集成的一個(gè)擴(kuò)展性的支持。它實(shí)現(xiàn)了從基于JmsTemplate的簡(jiǎn)單的使用JMS接口到異步接收消息的一整套完整的基礎(chǔ)架構(gòu),Spring AMQP提供了該協(xié)議所要求的類(lèi)似的功能集。 在與Spring Boot的集成后,它擁有了自動(dòng)配置能力,能夠在測(cè)試和運(yùn)行時(shí)與相應(yīng)的消息傳遞系統(tǒng)進(jìn)行集成。

單純對(duì)于客戶端而言,Spring Messaging提供了一套抽象的API或者說(shuō)是約定的標(biāo)準(zhǔn),對(duì)消息發(fā)送端和消息接收端的模式進(jìn)行規(guī)定,不同的消息中間件提供商可以在這個(gè)模式下提供自己的Spring實(shí)現(xiàn):在消息發(fā)送端需要實(shí)現(xiàn)的是一個(gè)XXXTemplate形式的Java Bean,結(jié)合Spring Boot的自動(dòng)化配置選項(xiàng)提供多個(gè)不同的發(fā)送消息方法;在消息的消費(fèi)端是一個(gè)XXXMessageListener接口(實(shí)現(xiàn)方式通常會(huì)使用一個(gè)注解來(lái)聲明一個(gè)消息驅(qū)動(dòng)的POJO),提供回調(diào)方法來(lái)監(jiān)聽(tīng)和消費(fèi)消息,這個(gè)接口同樣可以使用Spring Boot的自動(dòng)化選項(xiàng)和一些定制化的屬性。

如果有興趣深入的了解Spring Messaging及針對(duì)不同的消息產(chǎn)品的使用,推薦閱讀這個(gè)文件。參考Spring Messaging的既有實(shí)現(xiàn),RocketMQ的spring-boot-starter中遵循了相關(guān)的設(shè)計(jì)模式并結(jié)合RocketMQ自身的功能特點(diǎn)提供了相應(yīng)的API(如,順序,異步和事務(wù)半消息等)。

2.2 Spring Cloud Stream

Spring Cloud Stream結(jié)合了Spring Integration的注解和功能,它的應(yīng)用模型如下:

該圖片引自spring cloud stream

Spring Cloud Stream框架中提供一個(gè)獨(dú)立的應(yīng)用內(nèi)核,它通過(guò)輸入(@Input)和輸出(@Output)通道與外部世界進(jìn)行通信,消息源端(Source)通過(guò)輸入通道發(fā)送消息,消費(fèi)目標(biāo)端(Sink)通過(guò)監(jiān)聽(tīng)輸出通道來(lái)獲取消費(fèi)的消息。這些通道通過(guò)專(zhuān)用的Binder實(shí)現(xiàn)與外部代理連接。開(kāi)發(fā)人員的代碼只需要針對(duì)應(yīng)用內(nèi)核提供的固定的接口和注解方式進(jìn)行編程,而不需要關(guān)心運(yùn)行時(shí)具體的Binder綁定的消息中間件。在運(yùn)行時(shí),Spring Cloud Stream能夠自動(dòng)探測(cè)并使用在classpath下找到的Binder。

這樣開(kāi)發(fā)人員可以輕松地在相同的代碼中使用不同類(lèi)型的中間件:僅僅需要在構(gòu)建時(shí)包含進(jìn)不同的Binder。在更加復(fù)雜的使用場(chǎng)景中,也可以在應(yīng)用中打包多個(gè)Binder并讓它自己選擇Binder,甚至在運(yùn)行時(shí)為不同的通道使用不同的Binder。

Binder抽象使得Spring Cloud Stream應(yīng)用可以靈活的連接到中間件,加之Spring Cloud Stream使用利用了Spring Boot的靈活配置配置能力,這樣的配置可以通過(guò)外部配置的屬性和Spring Boo支持的任何形式來(lái)提供(包括應(yīng)用啟動(dòng)參數(shù)、環(huán)境變量和application.yml或者application.properties文件),部署人員可以在運(yùn)行時(shí)動(dòng)態(tài)選擇通道連接destination(例如,Kafka的topic或者RabbitMQ的exchange)。

Binder SPI的方式來(lái)讓消息中間件產(chǎn)品使用可擴(kuò)展的API來(lái)編寫(xiě)相應(yīng)的Binder,并集成到Spring Cloud Steam環(huán)境,目前RocketMQ還沒(méi)有提供相關(guān)的Binder,我們計(jì)劃在下一步將完善這一功能,也希望社區(qū)里有這方面經(jīng)驗(yàn)的同學(xué)積極嘗試,貢獻(xiàn)PR或建議。

spring-boot-starter的實(shí)現(xiàn)

在開(kāi)始的時(shí)候我們已經(jīng)知道,spring boot starter構(gòu)造的啟動(dòng)器對(duì)于使用者是非常方便的,使用者只要在pom.xml引入starter的依賴(lài)定義,相應(yīng)的編譯,運(yùn)行和部署功能就全部自動(dòng)引入。因此常用的開(kāi)源組件都會(huì)為Spring的用戶提供一個(gè)spring-boot-starter封裝給開(kāi)發(fā)者,讓開(kāi)發(fā)者非常方便集成和使用,這里我們?cè)敿?xì)的介紹一下RocketMQ(客戶端)的starter實(shí)現(xiàn)過(guò)程。

3.1. spring-boot-starter的實(shí)現(xiàn)步驟

對(duì)于一個(gè)spring-boot-starter實(shí)現(xiàn)需要包含如下幾個(gè)部分:

   1.  在pom.xml的定義

  •  定義最終要生成的starter組件信息 
  1. <groupId>org.apache.rocketmq</groupId>  
  2. <artifactId>spring-boot-starter-rocketmq</artifactId>  
  3. <version>1.0.0-SNAPSHOT</version> 
  •  定義依賴(lài)包,

它分為兩個(gè)部分: A、Spring自身的依賴(lài)包; B、RocketMQ的依賴(lài)包 

  1. <dependencies>  
  2.     <!-- spring-boot-start internal depdencies -->  
  3.     <dependency>  
  4.         <groupId>org.springframework.boot</groupId>  
  5.         <artifactId>spring-boot-starter</artifactId>  
  6.     </dependency>           
  7.     <dependency>  
  8.         <groupId>org.springframework.boot</groupId>  
  9.         <artifactId>spring-boot-starter-test</artifactId>  
  10.         <scope>test</scope>  
  11.     </dependency> 
  12.     <!-- rocketmq dependencies -->  
  13.     <dependency> 
  14.          <groupId>org.apache.rocketmq</groupId>  
  15.         <artifactId>rocketmq-client</artifactId>  
  16.         <version>${rocketmq-version}</version>  
  17.     </dependency>  
  18. </dependencies>      
  19.     <dependencyManagement>  
  20.     <dependencies> 
  21.          <!-- spring-boot-start parent depdency definition -->   
  22.         <dependency>  
  23.             <groupId>org.springframework.boot</groupId>  
  24.             <artifactId>spring-boot-starter-parent</artifactId>  
  25.             <version>${spring.boot.version}</version>  
  26.             <type>pom</type>  
  27.             <scope>import</scope>  
  28.         </dependency>  
  29.     </dependencies>  
  30. </dependencyManagement> 

    2.  配置文件類(lèi)

定義應(yīng)用屬性配置文件類(lèi)RocketMQProperties,這個(gè)Bean定義一組默認(rèn)的屬性值。用戶在使用最終的starter時(shí),可以根據(jù)這個(gè)類(lèi)定義的屬性來(lái)修改取值,當(dāng)然不是直接修改這個(gè)類(lèi)的配置,而是spring-boot應(yīng)用中對(duì)應(yīng)的配置文件:src/main/resources/application.properties.

    3.  定義自動(dòng)加載類(lèi)

定義 src/resources/META-INF/spring.factories文件中的自動(dòng)加載類(lèi), 其目的是讓spring boot更具文中中所指定的自動(dòng)化配置類(lèi)來(lái)自動(dòng)初始化相關(guān)的Bean,Component或Service,它的內(nèi)容如下: 

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\  
  2. org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration 

在RocketMQAutoConfiguration類(lèi)的具體實(shí)現(xiàn)中,定義開(kāi)放給用戶直接使用的Bean對(duì)象. 包括:

  •  RocketMQProperties 加載應(yīng)用屬性配置文件的處理類(lèi);
  •  RocketMQTemplate 發(fā)送端用戶發(fā)送消息的發(fā)送模板類(lèi);
  •  ListenerContainerConfiguration 容器Bean負(fù)責(zé)發(fā)現(xiàn)和注冊(cè)消費(fèi)端消費(fèi)實(shí)現(xiàn)接口類(lèi),這個(gè)類(lèi)要求:由@RocketMQMessageListener注解標(biāo)注;實(shí)現(xiàn)RocketMQListener泛化接口。

    4.  最后具體的RocketMQ相關(guān)的封裝

在發(fā)送端(producer)和消費(fèi)端(consumer)客戶端分別進(jìn)行封裝,在當(dāng)前的實(shí)現(xiàn)版本提供了對(duì)Spring Messaging接口的兼容方式。

3.2. 消息發(fā)送端實(shí)現(xiàn)

    1.  普通發(fā)送端

發(fā)送端的代碼封裝在RocketMQTemplate POJO中,下圖是發(fā)送端的相關(guān)代碼的調(diào)用關(guān)系圖:

為了與Spring Messaging的發(fā)送模板兼容,在RocketMQTemplate集成了AbstractMessageSendingTemplate抽象類(lèi),來(lái)支持相關(guān)的消息轉(zhuǎn)換和發(fā)送方法,這些方法最終會(huì)代理給doSend()方法;doSend()以及RocoketMQ所特有的一些方法如異步,單向和順序等方法直接添加到RoketMQTempalte中,這些方法直接代理調(diào)用到RocketMQ的Producer API來(lái)進(jìn)行消息的發(fā)送。

    2.  事務(wù)消息發(fā)送端

對(duì)于事務(wù)消息的處理,在消息發(fā)送端進(jìn)行了部分的擴(kuò)展,參考下圖的調(diào)用關(guān)系類(lèi)圖:

RocketMQTemplate里加入了一個(gè)發(fā)送事務(wù)消息的方法sendMessageInTransaction(), 并且最終這個(gè)方法會(huì)代理到RocketMQ的TransactionProducer進(jìn)行調(diào)用,在這個(gè)Producer上會(huì)注冊(cè)其關(guān)聯(lián)的TransactionListener實(shí)現(xiàn)類(lèi),以便在發(fā)送消息后能夠?qū)ransactionListener里的方法實(shí)現(xiàn)進(jìn)行調(diào)用。

3.3. 消息消費(fèi)端實(shí)現(xiàn)

在消費(fèi)端Spring-Boot應(yīng)用啟動(dòng)后,會(huì)掃描所有包含@RocketMQMessageListener注解的類(lèi)(這些類(lèi)需要集成RocketMQListener接口,并實(shí)現(xiàn)onMessage()方法),這個(gè)Listener會(huì)一對(duì)一的被放置到DefaultRocketMQListenerContainer容器對(duì)象中,容器對(duì)象會(huì)根據(jù)消費(fèi)的方式(并發(fā)或順序),將RocketMQListener封裝到具體的RocketMQ內(nèi)部的并發(fā)或者順序接口實(shí)現(xiàn)。在容器中創(chuàng)建RocketMQ Consumer對(duì)象,啟動(dòng)并監(jiān)聽(tīng)定制的Topic消息,如果有消費(fèi)消息,則回調(diào)到Listener的onMessage()方法。

使用示例

上面的一章介紹了RocketMQ在spring-boot-starter方式的實(shí)現(xiàn),這里通過(guò)一個(gè)最簡(jiǎn)單的消息發(fā)送和消費(fèi)的例子來(lái)介紹如何使這個(gè)rocketmq-spring-boot-starter。

4.1 RocketMQ服務(wù)端的準(zhǔn)備

  1.  啟動(dòng)NameServer和Broker

要驗(yàn)證RocketMQ的Spring-Boot客戶端,首先要確保RocketMQ服務(wù)正確的下載并啟動(dòng)。可以參考RocketMQ主站的快速開(kāi)始來(lái)進(jìn)行操作。確保啟動(dòng)NameServer和Broker已經(jīng)正確啟動(dòng)。

      2.  創(chuàng)建實(shí)例中所需要的Topics

在執(zhí)行啟動(dòng)命令的目錄下執(zhí)行下面的命令行操作 

  1. bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic 

4.2. 編譯rocketmq-spring-boot-starter

目前的spring-boot-starter依賴(lài)還沒(méi)有提交的Maven的中心庫(kù),用戶使用前需要自行下載git源碼,然后執(zhí)行mvn clean install 安裝到本地倉(cāng)庫(kù)。 

  1. git clone https://github.com/apache/rocketmq-externals.git  
  2. cd rocketmq-spring-boot-starter  
  3. mvn clean install 

4.3. 編寫(xiě)客戶端代碼

用戶如果使用它,需要在消息的發(fā)布和消費(fèi)客戶端的maven配置文件pom.xml中添加如下的依賴(lài): 

  1. <properties>   <spring-boot-starter-rocketmq-version>1.0.0-SNAPSHOT</spring-boot-starter-rocketmq-version>  
  2. </properties>  
  3. <dependency>  
  4.    <groupId>org.apache.rocketmq</groupId>  
  5.    <artifactId>spring-boot-starter-rocketmq</artifactId>  
  6.    <version>${spring-boot-starter-rocketmq-version}</version>  
  7. </dependency> 

屬性spring-boot-starter-rocketmq-version的取值為:1.0.0-SNAPSHOT, 這與上一步驟中執(zhí)行安裝到本地倉(cāng)庫(kù)的版本一致。

  1.  消息發(fā)送端的代碼

發(fā)送端的配置文件application.properties 

  1. # 定義name-server地址  
  2. spring.rocketmq.name-server=localhost:9876  
  3. # 定義發(fā)布者組名  
  4. spring.rocketmq.producer.group=my-group1  
  5. # 定義要發(fā)送的topic  
  6. spring.rocketmq.topic=string-topic 

發(fā)送端的Java代碼 

  1. import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;  
  2. ...  
  3. @SpringBootApplication  
  4. public class ProducerApplication implements CommandLineRunner {  
  5.     // 聲明并引用RocketMQTemplate  
  6.     @Resource  
  7.     private RocketMQTemplate rocketMQTemplate;  
  8.     // 使用application.properties里定義的topic屬性  
  9.     @Value("${spring.rocketmq.springTopic}")  
  10.     private String springTopic;  
  11.     public static void main(String[] args){  
  12.         SpringApplication.run(ProducerApplication.class, args);  
  13.     }  
  14.     public void run(String... args) throws Exception {  
  15.         // 以同步的方式發(fā)送字符串消息給指定的topic  
  16.         SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");  
  17.         // 打印發(fā)送結(jié)果信息  
  18.         System.out.printf("string-topic syncSend1 sendResult=%s %n", sendResult);  
  19.     }  

     2.   消息消費(fèi)端代碼

消費(fèi)端的配置文件application.properties 

  1. # 定義name-server地址  
  2. spring.rocketmq.name-server=localhost:9876  
  3. # 定義發(fā)布者組名  
  4. spring.rocketmq.consumer.group=my-customer-group1  
  5. # 定義要發(fā)送的topic  
  6. spring.rocketmq.topic=string-topic 

消費(fèi)端的Java代碼 

  1. @SpringBootApplication  
  2. public class ConsumerApplication {  
  3.     public static void main(String[] args) {  
  4.         SpringApplication.run(ConsumerApplication.class, args);  
  5.     } 
  6.  
  7. // 聲明消費(fèi)消息的類(lèi),并在注解中指定,相關(guān)的消費(fèi)信息  
  8. @Service  
  9. @RocketMQMessageListener(topic = "${spring.rocketmq.topic}"consumerGroup = "${spring.rocketmq.consumer.group}" 
  10. class StringConsumer implements RocketMQListener{  
  11.     @Override  
  12.     public void onMessage(String message) {  
  13.         System.out.printf("------- StringConsumer received: %s %f", message);  
  14.     }  

這里只是簡(jiǎn)單的介紹了使用spring-boot來(lái)編寫(xiě)最基本的消息發(fā)送和接收的代碼,如果需要了解更多的調(diào)用方式,如: 異步發(fā)送,對(duì)象消息體,指定tag標(biāo)簽以及指定事務(wù)消息,請(qǐng)參看github的說(shuō)明文檔和詳細(xì)的代碼。我們后續(xù)還會(huì)對(duì)這些高級(jí)功能進(jìn)行陸續(xù)的介紹。 

 

責(zé)任編輯:龐桂玉 來(lái)源: JAVA高級(jí)架構(gòu)
相關(guān)推薦

2025-03-28 08:34:34

2024-01-05 16:43:30

數(shù)據(jù)庫(kù)線程

2020-08-26 07:17:19

通信

2021-04-15 00:16:18

JavaString字符串

2024-04-24 12:34:08

Spring事務(wù)編程

2021-05-12 22:07:43

并發(fā)編排任務(wù)

2022-04-01 12:51:44

命令Containerd

2020-12-08 08:08:51

Java接口數(shù)據(jù)

2021-03-24 10:20:50

Fonts前端代碼

2024-01-30 12:08:31

Go框架停止服務(wù)

2020-04-03 13:45:16

刪除Linux垃圾文件

2020-02-24 11:12:01

Linux電腦數(shù)據(jù)

2024-11-21 09:00:00

Python字典代碼

2019-11-18 15:50:11

AjaxJavascript前端

2022-05-24 06:07:48

JShack用戶代碼

2020-09-25 11:30:20

Java判空代碼

2020-04-10 10:22:12

Java判空編程語(yǔ)言

2024-05-23 12:11:39

2023-06-06 08:51:06

2009-12-15 13:41:49

Ruby向?qū)ο蟀l(fā)送消息
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)