自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBoot+RabbitMQ 實(shí)現(xiàn) RPC 調(diào)用

開發(fā) 架構(gòu)
說到 RPC(Remote Procedure Call Protocol 遠(yuǎn)程過程調(diào)用協(xié)議),小伙伴們腦海里蹦出的估計(jì)都是 RESTful API、Dubbo、WebService、Java RMI、CORBA 等。

[[435082]]

說到 RPC(Remote Procedure Call Protocol 遠(yuǎn)程過程調(diào)用協(xié)議),小伙伴們腦海里蹦出的估計(jì)都是 RESTful API、Dubbo、WebService、Java RMI、CORBA 等。

其實(shí),RabbitMQ 也給我們提供了 RPC 功能,并且使用起來很簡(jiǎn)單。

今天松哥通過一個(gè)簡(jiǎn)單的案例來和大家分享一下 Spring Boot+RabbitMQ 如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 RPC 調(diào)用。

注意

關(guān)于 RabbitMQ 實(shí)現(xiàn) RPC 調(diào)用,有的小伙伴可能會(huì)有一些誤解,心想這還不簡(jiǎn)單?搞兩個(gè)消息隊(duì)列 queue_1 和 queue_2,首先客戶端發(fā)送消息到 queue_1 上,服務(wù)端監(jiān)聽 queue_1 上的消息,收到之后進(jìn)行處理;處理完成后,服務(wù)端發(fā)送消息到 queue_2 隊(duì)列上,然后客戶端監(jiān)聽 queue_2 隊(duì)列上的消息,這樣就知道服務(wù)端的處理結(jié)果了。

這種方式不是不可以,就是有點(diǎn)麻煩!RabbitMQ 中提供了現(xiàn)成的方案可以直接使用,非常方便。接下來我們就一起來學(xué)習(xí)下。

1. 架構(gòu)

先來看一個(gè)簡(jiǎn)單的架構(gòu)圖:

這張圖把問題說的很明白了:

首先 Client 發(fā)送一條消息,和普通的消息相比,這條消息多了兩個(gè)關(guān)鍵內(nèi)容:一個(gè)是 correlation_id,這個(gè)表示這條消息的唯一 id,還有一個(gè)內(nèi)容是 reply_to,這個(gè)表示消息回復(fù)隊(duì)列的名字。

Server 從消息發(fā)送隊(duì)列獲取消息并處理相應(yīng)的業(yè)務(wù)邏輯,處理完成后,將處理結(jié)果發(fā)送到 reply_to 指定的回調(diào)隊(duì)列中。

Client 從回調(diào)隊(duì)列中讀取消息,就可以知道消息的執(zhí)行情況是什么樣子了。

這種情況其實(shí)非常適合處理異步調(diào)用。

2. 實(shí)踐

接下來我們通過一個(gè)具體的例子來看看這個(gè)怎么玩。

2.1 客戶端開發(fā)

首先我們來創(chuàng)建一個(gè) Spring Boot 工程名為 producer,作為消息生產(chǎn)者,創(chuàng)建時(shí)候添加 web 和 rabbitmq 依賴,如下圖:

項(xiàng)目創(chuàng)建成功之后,首先在 application.properties 中配置 RabbitMQ 的基本信息,如下:

  1. spring.rabbitmq.host=localhost 
  2. spring.rabbitmq.port=5672 
  3. spring.rabbitmq.username=guest 
  4. spring.rabbitmq.password=guest 
  5. spring.rabbitmq.publisher-confirm-type=correlated 
  6. spring.rabbitmq.publisher-returns=true 

這個(gè)配置前面四行都好理解,我就不贅述,后面兩行:首先是配置消息確認(rèn)方式,我們通過 correlated 來確認(rèn),只有開啟了這個(gè)配置,將來的消息中才會(huì)帶 correlation_id,只有通過 correlation_id 我們才能將發(fā)送的消息和返回值之間關(guān)聯(lián)起來。最后一行配置則是開啟發(fā)送失敗退回。

接下來我們來提供一個(gè)配置類,如下:

  1. /** 
  2.  * @author 江南一點(diǎn)雨 
  3.  * @微信公眾號(hào) 江南一點(diǎn)雨 
  4.  * @網(wǎng)站 http://www.itboyhub.com 
  5.  * @國(guó)際站 http://www.javaboy.org 
  6.  * @微信 a_java_boy 
  7.  * @GitHub https://github.com/lenve 
  8.  * @Gitee https://gitee.com/lenve 
  9.  */ 
  10. @Configuration 
  11. public class RabbitConfig { 
  12.  
  13.     public static final String RPC_QUEUE1 = "queue_1"
  14.     public static final String RPC_QUEUE2 = "queue_2"
  15.     public static final String RPC_EXCHANGE = "rpc_exchange"
  16.  
  17.     /** 
  18.      * 設(shè)置消息發(fā)送RPC隊(duì)列 
  19.      */ 
  20.     @Bean 
  21.     Queue msgQueue() { 
  22.         return new Queue(RPC_QUEUE1); 
  23.     } 
  24.  
  25.     /** 
  26.      * 設(shè)置返回隊(duì)列 
  27.      */ 
  28.     @Bean 
  29.     Queue replyQueue() { 
  30.         return new Queue(RPC_QUEUE2); 
  31.     } 
  32.  
  33.     /** 
  34.      * 設(shè)置交換機(jī) 
  35.      */ 
  36.     @Bean 
  37.     TopicExchange exchange() { 
  38.         return new TopicExchange(RPC_EXCHANGE); 
  39.     } 
  40.  
  41.     /** 
  42.      * 請(qǐng)求隊(duì)列和交換器綁定 
  43.      */ 
  44.     @Bean 
  45.     Binding msgBinding() { 
  46.         return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1); 
  47.     } 
  48.  
  49.     /** 
  50.      * 返回隊(duì)列和交換器綁定 
  51.      */ 
  52.     @Bean 
  53.     Binding replyBinding() { 
  54.         return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2); 
  55.     } 
  56.  
  57.  
  58.     /** 
  59.      * 使用 RabbitTemplate發(fā)送和接收消息 
  60.      * 并設(shè)置回調(diào)隊(duì)列地址 
  61.      */ 
  62.     @Bean 
  63.     RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
  64.         RabbitTemplate template = new RabbitTemplate(connectionFactory); 
  65.         template.setReplyAddress(RPC_QUEUE2); 
  66.         template.setReplyTimeout(6000); 
  67.         return template; 
  68.     } 
  69.  
  70.  
  71.     /** 
  72.      * 給返回隊(duì)列設(shè)置監(jiān)聽器 
  73.      */ 
  74.     @Bean 
  75.     SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { 
  76.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
  77.         container.setConnectionFactory(connectionFactory); 
  78.         container.setQueueNames(RPC_QUEUE2); 
  79.         container.setMessageListener(rabbitTemplate(connectionFactory)); 
  80.         return container; 
  81.     } 

這個(gè)配置類中我們分別配置了消息發(fā)送隊(duì)列 msgQueue 和消息返回隊(duì)列 replyQueue,然后將這兩個(gè)隊(duì)列和消息交換機(jī)進(jìn)行綁定。這個(gè)都是 RabbitMQ 的常規(guī)操作,沒啥好說的。

在 Spring Boot 中我們負(fù)責(zé)消息發(fā)送的工具是 RabbitTemplate,默認(rèn)情況下,系統(tǒng)自動(dòng)提供了該工具,但是這里我們需要對(duì)該工具重新進(jìn)行定制,主要是添加消息發(fā)送的返回隊(duì)列,最后我們還需要給返回隊(duì)列設(shè)置一個(gè)監(jiān)聽器。

好啦,接下來我們就可以開始具體的消息發(fā)送了:

  1. /** 
  2.  * @author 江南一點(diǎn)雨 
  3.  * @微信公眾號(hào) 江南一點(diǎn)雨 
  4.  * @網(wǎng)站 http://www.itboyhub.com 
  5.  * @國(guó)際站 http://www.javaboy.org 
  6.  * @微信 a_java_boy 
  7.  * @GitHub https://github.com/lenve 
  8.  * @Gitee https://gitee.com/lenve 
  9.  */ 
  10. @RestController 
  11. public class RpcClientController { 
  12.  
  13.     private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class); 
  14.  
  15.     @Autowired 
  16.     private RabbitTemplate rabbitTemplate; 
  17.  
  18.     @GetMapping("/send"
  19.     public String send(String message) { 
  20.         // 創(chuàng)建消息對(duì)象 
  21.         Message newMessage = MessageBuilder.withBody(message.getBytes()).build(); 
  22.  
  23.         logger.info("client send:{}", newMessage); 
  24.  
  25.         //客戶端發(fā)送消息 
  26.         Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage); 
  27.  
  28.         String response = ""
  29.         if (result != null) { 
  30.             // 獲取已發(fā)送的消息的 correlationId 
  31.             String correlationId = newMessage.getMessageProperties().getCorrelationId(); 
  32.             logger.info("correlationId:{}", correlationId); 
  33.  
  34.             // 獲取響應(yīng)頭信息 
  35.             HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders(); 
  36.  
  37.             // 獲取 server 返回的消息 id 
  38.             String msgId = (String) headers.get("spring_returned_message_correlation"); 
  39.  
  40.             if (msgId.equals(correlationId)) { 
  41.                 response = new String(result.getBody()); 
  42.                 logger.info("client receive:{}", response); 
  43.             } 
  44.         } 
  45.         return response; 
  46.     } 

這塊的代碼其實(shí)也都是一些常規(guī)代碼,我挑幾個(gè)關(guān)鍵的節(jié)點(diǎn)說下:

  • 消息發(fā)送調(diào)用 sendAndReceive 方法,該方法自帶返回值,返回值就是服務(wù)端返回的消息。
  • 服務(wù)端返回的消息中,頭信息中包含了 spring_returned_message_correlation 字段,這個(gè)就是消息發(fā)送時(shí)候的 correlation_id,通過消息發(fā)送時(shí)候的 correlation_id 以及返回消息頭中的 spring_returned_message_correlation 字段值,我們就可以將返回的消息內(nèi)容和發(fā)送的消息綁定到一起,確認(rèn)出這個(gè)返回的內(nèi)容就是針對(duì)這個(gè)發(fā)送的消息的。

這就是整個(gè)客戶端的開發(fā),其實(shí)最最核心的就是 sendAndReceive 方法的調(diào)用。調(diào)用雖然簡(jiǎn)單,但是準(zhǔn)備工作還是要做足夠。例如如果我們沒有在 application.properties 中配置 correlated,發(fā)送的消息中就沒有 correlation_id,這樣就無法將返回的消息內(nèi)容和發(fā)送的消息內(nèi)容關(guān)聯(lián)起來。

2.2 服務(wù)端開發(fā)

再來看看服務(wù)端的開發(fā)。

首先創(chuàng)建一個(gè)名為 consumer 的 Spring Boot 項(xiàng)目,創(chuàng)建項(xiàng)目添加的依賴和客戶端開發(fā)創(chuàng)建的依賴是一致的,不再贅述。

然后配置 application.properties 配置文件,該文件的配置也和客戶端中的配置一致,不再贅述。

接下來提供一個(gè) RabbitMQ 的配置類,這個(gè)配置類就比較簡(jiǎn)單,單純的配置一下消息隊(duì)列并將之和消息交換機(jī)綁定起來,如下:

  1. /** 
  2.  * @author 江南一點(diǎn)雨 
  3.  * @微信公眾號(hào) 江南一點(diǎn)雨 
  4.  * @網(wǎng)站 http://www.itboyhub.com 
  5.  * @國(guó)際站 http://www.javaboy.org 
  6.  * @微信 a_java_boy 
  7.  * @GitHub https://github.com/lenve 
  8.  * @Gitee https://gitee.com/lenve 
  9.  */ 
  10. @Configuration 
  11. public class RabbitConfig { 
  12.  
  13.     public static final String RPC_QUEUE1 = "queue_1"
  14.     public static final String RPC_QUEUE2 = "queue_2"
  15.     public static final String RPC_EXCHANGE = "rpc_exchange"
  16.  
  17.     /** 
  18.      * 配置消息發(fā)送隊(duì)列 
  19.      */ 
  20.     @Bean 
  21.     Queue msgQueue() { 
  22.         return new Queue(RPC_QUEUE1); 
  23.     } 
  24.  
  25.     /** 
  26.      * 設(shè)置返回隊(duì)列 
  27.      */ 
  28.     @Bean 
  29.     Queue replyQueue() { 
  30.         return new Queue(RPC_QUEUE2); 
  31.     } 
  32.  
  33.     /** 
  34.      * 設(shè)置交換機(jī) 
  35.      */ 
  36.     @Bean 
  37.     TopicExchange exchange() { 
  38.         return new TopicExchange(RPC_EXCHANGE); 
  39.     } 
  40.  
  41.     /** 
  42.      * 請(qǐng)求隊(duì)列和交換器綁定 
  43.      */ 
  44.     @Bean 
  45.     Binding msgBinding() { 
  46.         return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1); 
  47.     } 
  48.  
  49.     /** 
  50.      * 返回隊(duì)列和交換器綁定 
  51.      */ 
  52.     @Bean 
  53.     Binding replyBinding() { 
  54.         return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2); 
  55.     } 

最后我們?cè)賮砜聪孪⒌南M(fèi):

  1. @Component 
  2. public class RpcServerController { 
  3.     private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class); 
  4.     @Autowired 
  5.     private RabbitTemplate rabbitTemplate; 
  6.  
  7.     @RabbitListener(queues = RabbitConfig.RPC_QUEUE1) 
  8.     public void process(Message msg) { 
  9.         logger.info("server receive : {}",msg.toString()); 
  10.         Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build(); 
  11.         CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId()); 
  12.         rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData); 
  13.     } 

這里的邏輯就比較簡(jiǎn)單了:

  • 服務(wù)端首先收到消息并打印出來。
  • 服務(wù)端提取出原消息中的 correlation_id。
  • 服務(wù)端調(diào)用 sendAndReceive 方法,將消息發(fā)送給 RPC_QUEUE2 隊(duì)列,同時(shí)帶上 correlation_id 參數(shù)。

服務(wù)端的消息發(fā)出后,客戶端將收到服務(wù)端返回的結(jié)果。

OK,大功告成。

2.3 測(cè)試

接下來我們進(jìn)行一個(gè)簡(jiǎn)單測(cè)試。

首先啟動(dòng) RabbitMQ。

接下來分別啟動(dòng) producer 和 consumer,然后在 postman 中調(diào)用 producer 的接口進(jìn)行測(cè)試,如下:

可以看到,已經(jīng)收到了服務(wù)端的返回信息。

來看看 producer 的運(yùn)行日志:

可以看到,消息發(fā)送出去后,同時(shí)也收到了 consumer 返回的信息。

可以看到,consumer 也收到了客戶端發(fā)來的消息。

3. 小結(jié)

好啦,一個(gè)小小的案例,帶小伙伴們體驗(yàn)一把 RabbitMQ 實(shí)現(xiàn) RPC 調(diào)用。

責(zé)任編輯:武曉燕 來源: 江南一點(diǎn)雨
相關(guān)推薦

2020-09-14 11:50:21

SpringBootRabbitMQJava

2014-09-02 10:43:45

RedisRPC

2024-05-31 08:45:24

2024-09-05 08:58:37

2024-12-24 08:44:55

ActiveMQRabbitMQ交換機(jī)

2024-11-04 08:02:23

SpringRabbitMQ中間件

2022-04-02 07:52:47

DubboRPC調(diào)用動(dòng)態(tài)代理

2023-10-10 13:39:53

Spring隊(duì)列優(yōu)化

2020-09-08 07:37:44

springBoot MQ rabbitMQ

2023-08-30 08:33:07

RabbitMQSpringBoot消息信息

2024-11-14 09:40:06

RPC框架NettyJava

2014-07-22 10:42:04

2024-09-02 09:14:36

SpringRabbitMQ數(shù)據(jù)

2022-01-07 06:12:08

RPC框架限流

2023-09-05 15:48:14

RabbitMQ延遲隊(duì)列

2024-12-31 00:00:00

RabbitMQ插件代碼

2021-10-19 08:58:48

Java 語言 Java 基礎(chǔ)

2021-10-21 08:21:10

Java Reflect Java 基礎(chǔ)

2012-11-23 14:26:40

IBMdW

2021-01-19 09:19:33

RPC調(diào)用過程框架
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)