自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

手把手教你 springBoot 整合 rabbitMQ,利用 MQ 實現(xiàn)事務(wù)補償

開發(fā) 前端
rabbitMQ 在互聯(lián)網(wǎng)公司有著大規(guī)模應用,本篇將實戰(zhàn)介紹 springboot 整合 rabbitMQ,同時也將在具體的業(yè)務(wù)場景中介紹利用 MQ 實現(xiàn)事務(wù)補償操作。

[[341134]]

本文轉(zhuǎn)載自微信公眾號「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請聯(lián)系Java極客技術(shù)公眾號。  

rabbitMQ 在互聯(lián)網(wǎng)公司有著大規(guī)模應用,本篇將實戰(zhàn)介紹 springboot 整合 rabbitMQ,同時也將在具體的業(yè)務(wù)場景中介紹利用 MQ 實現(xiàn)事務(wù)補償操作。

一、介紹

本篇我們一起來實操一下SpringBoot整合rabbitMQ,為后續(xù)業(yè)務(wù)處理做鋪墊。

廢話不多說,直奔主題!

二、整合實戰(zhàn)

2.1、創(chuàng)建一個 maven 工程,引入 amqp 包

  1. <!--amqp 支持--> 
  2. <dependency> 
  3.     <groupId>org.springframework.boot</groupId> 
  4.     <artifactId>spring-boot-starter-amqp</artifactId> 
  5. </dependency> 

2.2、在全局文件中配置 rabbitMQ 服務(wù)信息

  1. spring.rabbitmq.addresses=197.168.24.206:5672 
  2. spring.rabbitmq.username=guest 
  3. spring.rabbitmq.password=guest 
  4. spring.rabbitmq.virtual-host=/ 

其中,spring.rabbitmq.addresses參數(shù)值為 rabbitmq 服務(wù)器地址

2.3、編寫 rabbitmq 配置類

  1. @Slf4j 
  2. @Configuration 
  3. public class RabbitConfig { 
  4.  
  5.     /** 
  6.      * 初始化連接工廠 
  7.      * @param addresses 
  8.      * @param userName 
  9.      * @param password 
  10.      * @param vhost 
  11.      * @return 
  12.      */ 
  13.     @Bean 
  14.     ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses, 
  15.                                         @Value("${spring.rabbitmq.username}") String userName, 
  16.                                         @Value("${spring.rabbitmq.password}") String password
  17.                                         @Value("${spring.rabbitmq.virtual-host}") String vhost) { 
  18.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 
  19.         connectionFactory.setAddresses(addresses); 
  20.         connectionFactory.setUsername(userName); 
  21.         connectionFactory.setPassword(password); 
  22.         connectionFactory.setVirtualHost(vhost); 
  23.         return connectionFactory; 
  24.     } 
  25.  
  26.     /** 
  27.      * 重新實例化 RabbitAdmin 操作類 
  28.      * @param connectionFactory 
  29.      * @return 
  30.      */ 
  31.     @Bean 
  32.     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ 
  33.         return new RabbitAdmin(connectionFactory); 
  34.     } 
  35.  
  36.     /** 
  37.      * 重新實例化 RabbitTemplate 操作類 
  38.      * @param connectionFactory 
  39.      * @return 
  40.      */ 
  41.     @Bean 
  42.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ 
  43.         RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); 
  44.         //數(shù)據(jù)轉(zhuǎn)換為json存入消息隊列 
  45.         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); 
  46.         return rabbitTemplate; 
  47.     } 
  48.  
  49.     /** 
  50.      * 將 RabbitUtil 操作工具類加入IOC容器 
  51.      * @return 
  52.      */ 
  53.     @Bean 
  54.     public RabbitUtil rabbitUtil(){ 
  55.         return new RabbitUtil(); 
  56.     } 
  57.  

2.4、編寫 RabbitUtil 工具類

  1. public class RabbitUtil { 
  2.  
  3.     private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class); 
  4.  
  5.     @Autowired 
  6.     private RabbitAdmin rabbitAdmin; 
  7.  
  8.     @Autowired 
  9.     private RabbitTemplate rabbitTemplate; 
  10.  
  11.     /** 
  12.      * 創(chuàng)建Exchange 
  13.      * @param exchangeName 
  14.      */ 
  15.     public void addExchange(String exchangeType, String exchangeName){ 
  16.         Exchange exchange = createExchange(exchangeType, exchangeName); 
  17.         rabbitAdmin.declareExchange(exchange); 
  18.     } 
  19.  
  20.     /** 
  21.      * 刪除一個Exchange 
  22.      * @param exchangeName 
  23.      */ 
  24.     public boolean deleteExchange(String exchangeName){ 
  25.         return rabbitAdmin.deleteExchange(exchangeName); 
  26.     } 
  27.  
  28.     /** 
  29.      * 創(chuàng)建一個指定的Queue 
  30.      * @param queueName 
  31.      * @return queueName 
  32.      */ 
  33.     public void addQueue(String queueName){ 
  34.         Queue queue = createQueue(queueName); 
  35.         rabbitAdmin.declareQueue(queue); 
  36.     } 
  37.  
  38.     /** 
  39.      * 刪除一個queue 
  40.      * @return queueName 
  41.      * @param queueName 
  42.      */ 
  43.     public boolean deleteQueue(String queueName){ 
  44.         return rabbitAdmin.deleteQueue(queueName); 
  45.     } 
  46.  
  47.     /** 
  48.      * 按照篩選條件,刪除隊列 
  49.      * @param queueName 
  50.      * @param unused 是否被使用 
  51.      * @param empty 內(nèi)容是否為空 
  52.      */ 
  53.     public void deleteQueue(String queueName, boolean unused, boolean empty){ 
  54.         rabbitAdmin.deleteQueue(queueName,unused,empty); 
  55.     } 
  56.  
  57.     /** 
  58.      * 清空某個隊列中的消息,注意,清空的消息并沒有被消費 
  59.      * @return queueName 
  60.      * @param queueName 
  61.      */ 
  62.     public void purgeQueue(String queueName){ 
  63.         rabbitAdmin.purgeQueue(queueName, false); 
  64.     } 
  65.  
  66.     /** 
  67.      * 判斷指定的隊列是否存在 
  68.      * @param queueName 
  69.      * @return 
  70.      */ 
  71.     public boolean existQueue(String queueName){ 
  72.         return rabbitAdmin.getQueueProperties(queueName) == null ? false : true
  73.     } 
  74.  
  75.     /** 
  76.      * 綁定一個隊列到一個匹配型交換器使用一個routingKey 
  77.      * @param exchangeType 
  78.      * @param exchangeName 
  79.      * @param queueName 
  80.      * @param routingKey 
  81.      * @param isWhereAll 
  82.      * @param headers EADERS模式類型設(shè)置,其他模式類型傳空 
  83.      */ 
  84.     public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  85.         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  86.         rabbitAdmin.declareBinding(binding); 
  87.     } 
  88.  
  89.     /** 
  90.      * 聲明綁定 
  91.      * @param binding 
  92.      */ 
  93.     public void addBinding(Binding binding){ 
  94.         rabbitAdmin.declareBinding(binding); 
  95.     } 
  96.  
  97.     /** 
  98.      * 解除交換器與隊列的綁定 
  99.      * @param exchangeType 
  100.      * @param exchangeName 
  101.      * @param queueName 
  102.      * @param routingKey 
  103.      * @param isWhereAll 
  104.      * @param headers 
  105.      */ 
  106.     public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  107.         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  108.         removeBinding(binding); 
  109.     } 
  110.  
  111.     /** 
  112.      * 解除交換器與隊列的綁定 
  113.      * @param binding 
  114.      */ 
  115.     public void removeBinding(Binding binding){ 
  116.         rabbitAdmin.removeBinding(binding); 
  117.     } 
  118.  
  119.     /** 
  120.      * 創(chuàng)建一個交換器、隊列,并綁定隊列 
  121.      * @param exchangeType 
  122.      * @param exchangeName 
  123.      * @param queueName 
  124.      * @param routingKey 
  125.      * @param isWhereAll 
  126.      * @param headers 
  127.      */ 
  128.     public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  129.         //聲明交換器 
  130.         addExchange(exchangeType, exchangeName); 
  131.         //聲明隊列 
  132.         addQueue(queueName); 
  133.         //聲明綁定關(guān)系 
  134.         addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); 
  135.     } 
  136.  
  137.     /** 
  138.      * 發(fā)送消息 
  139.      * @param exchange 
  140.      * @param routingKey 
  141.      * @param object 
  142.      */ 
  143.     public void convertAndSend(String exchange, String routingKey, final Object object){ 
  144.         rabbitTemplate.convertAndSend(exchange, routingKey, object); 
  145.     } 
  146.  
  147.     /** 
  148.      * 轉(zhuǎn)換Message對象 
  149.      * @param messageType 
  150.      * @param msg 
  151.      * @return 
  152.      */ 
  153.     public Message getMessage(String messageType, Object msg){ 
  154.         MessageProperties messageProperties = new MessageProperties(); 
  155.         messageProperties.setContentType(messageType); 
  156.         Message message = new Message(msg.toString().getBytes(),messageProperties); 
  157.         return message; 
  158.     } 
  159.  
  160.     /** 
  161.      * 聲明交換機 
  162.      * @param exchangeType 
  163.      * @param exchangeName 
  164.      * @return 
  165.      */ 
  166.     private Exchange createExchange(String exchangeType, String exchangeName){ 
  167.         if(ExchangeType.DIRECT.equals(exchangeType)){ 
  168.             return new DirectExchange(exchangeName); 
  169.         } 
  170.         if(ExchangeType.TOPIC.equals(exchangeType)){ 
  171.             return new TopicExchange(exchangeName); 
  172.         } 
  173.         if(ExchangeType.HEADERS.equals(exchangeType)){ 
  174.             return new HeadersExchange(exchangeName); 
  175.         } 
  176.         if(ExchangeType.FANOUT.equals(exchangeType)){ 
  177.             return new FanoutExchange(exchangeName); 
  178.         } 
  179.         return null
  180.     } 
  181.  
  182.     /** 
  183.      * 聲明綁定關(guān)系 
  184.      * @param exchangeType 
  185.      * @param exchangeName 
  186.      * @param queueName 
  187.      * @param routingKey 
  188.      * @param isWhereAll 
  189.      * @param headers 
  190.      * @return 
  191.      */ 
  192.     private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ 
  193.         if(ExchangeType.DIRECT.equals(exchangeType)){ 
  194.             return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey); 
  195.         } 
  196.         if(ExchangeType.TOPIC.equals(exchangeType)){ 
  197.             return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey); 
  198.         } 
  199.         if(ExchangeType.HEADERS.equals(exchangeType)){ 
  200.             if(isWhereAll){ 
  201.                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match(); 
  202.             }else
  203.                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match(); 
  204.             } 
  205.         } 
  206.         if(ExchangeType.FANOUT.equals(exchangeType)){ 
  207.             return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName)); 
  208.         } 
  209.         return null
  210.     } 
  211.  
  212.     /** 
  213.      * 聲明隊列 
  214.      * @param queueName 
  215.      * @return 
  216.      */ 
  217.     private Queue createQueue(String queueName){ 
  218.         return new Queue(queueName); 
  219.     } 
  220.  
  221.  
  222.     /** 
  223.      * 交換器類型 
  224.      */ 
  225.     public final static class ExchangeType { 
  226.  
  227.         /** 
  228.          * 直連交換機(全文匹配) 
  229.          */ 
  230.         public final static String DIRECT = "DIRECT"
  231.  
  232.         /** 
  233.          * 通配符交換機(兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個) 
  234.          */ 
  235.         public final static String TOPIC = "TOPIC"
  236.  
  237.         /** 
  238.          * 頭交換機(自定義鍵值對匹配,根據(jù)發(fā)送消息內(nèi)容中的headers屬性進行匹配) 
  239.          */ 
  240.         public final static String HEADERS = "HEADERS"
  241.  
  242.         /** 
  243.          * 扇形(廣播)交換機 (將消息轉(zhuǎn)發(fā)到所有與該交互機綁定的隊列上) 
  244.          */ 
  245.         public final static String FANOUT = "FANOUT"
  246.     } 

此致, rabbitMQ 核心操作功能操作已經(jīng)開發(fā)完畢!

2.5、編寫隊列監(jiān)聽類(靜態(tài))

  1. @Slf4j 
  2. @Configuration 
  3. public class DirectConsumeListener { 
  4.  
  5.     /** 
  6.      * 監(jiān)聽指定隊列,名稱:mq.direct.1 
  7.      * @param message 
  8.      * @param channel 
  9.      * @throws IOException 
  10.      */ 
  11.     @RabbitListener(queues = "mq.direct.1"
  12.     public void consume(Message message, Channel channel) throws IOException { 
  13.         log.info("DirectConsumeListener,收到消息: {}", message.toString()); 
  14.     } 

如果你需要監(jiān)聽指定的隊列,只需要方法上加上@RabbitListener(queues = "")即可,同時填寫對應的隊列名稱。

但是,如果你想動態(tài)監(jiān)聽隊列,而不是通過寫死在方法上呢?

請看下面介紹!

2.6、編寫隊列監(jiān)聽類(動態(tài))

重新實例化一個SimpleMessageListenerContainer對象,這個對象就是監(jiān)聽容器。

  1. @Slf4j 
  2. @Configuration 
  3. public class DynamicConsumeListener { 
  4.  
  5.     /** 
  6.      * 使用SimpleMessageListenerContainer實現(xiàn)動態(tài)監(jiān)聽 
  7.      * @param connectionFactory 
  8.      * @return 
  9.      */ 
  10.     @Bean 
  11.     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ 
  12.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
  13.         container.setMessageListener((MessageListener) message -> { 
  14.             log.info("ConsumerMessageListen,收到消息: {}", message.toString()); 
  15.         }); 
  16.         return container; 
  17.     } 

如果想向SimpleMessageListenerContainer添加監(jiān)聽隊列或者移除隊列,只需通過如下方式即可操作。

  1. @Slf4j 
  2. @RestController 
  3. @RequestMapping("/consumer"
  4. public class ConsumerController { 
  5.  
  6.     @Autowired 
  7.     private SimpleMessageListenerContainer container; 
  8.  
  9.     @Autowired 
  10.     private RabbitUtil rabbitUtil; 
  11.  
  12.     /** 
  13.      * 添加隊列到監(jiān)聽器 
  14.      * @param consumerInfo 
  15.      */ 
  16.     @PostMapping("addQueue"
  17.     public void addQueue(@RequestBody ConsumerInfo consumerInfo) { 
  18.         boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName()); 
  19.         if(!existQueue){ 
  20.             throw new CommonExecption("當前隊列不存在"); 
  21.         } 
  22.         //消費mq消息的類 
  23.         container.addQueueNames(consumerInfo.getQueueName()); 
  24.         //打印監(jiān)聽容器中正在監(jiān)聽到隊列 
  25.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  26.     } 
  27.  
  28.     /** 
  29.      * 移除正在監(jiān)聽的隊列 
  30.      * @param consumerInfo 
  31.      */ 
  32.     @PostMapping("removeQueue"
  33.     public void removeQueue(@RequestBody ConsumerInfo consumerInfo) { 
  34.         //消費mq消息的類 
  35.         container.removeQueueNames(consumerInfo.getQueueName()); 
  36.         //打印監(jiān)聽容器中正在監(jiān)聽到隊列 
  37.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  38.     } 
  39.  
  40.     /** 
  41.      * 查詢監(jiān)聽容器中正在監(jiān)聽到隊列 
  42.      */ 
  43.     @PostMapping("queryListenerQueue"
  44.     public void queryListenerQueue() { 
  45.         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); 
  46.     } 

2.7、發(fā)送消息到交換器

發(fā)送消息到交換器,非常簡單,只需要通過如下方式即可!

  • 先編寫一個請求參數(shù)實體類
  1. @Data 
  2. public class ProduceInfo implements Serializable { 
  3.  
  4.     private static final long serialVersionUID = 1l; 
  5.  
  6.     /** 
  7.      * 交換器名稱 
  8.      */ 
  9.     private String exchangeName; 
  10.  
  11.     /** 
  12.      * 路由鍵key 
  13.      */ 
  14.     private String routingKey; 
  15.  
  16.     /** 
  17.      * 消息內(nèi)容 
  18.      */ 
  19.     public String msg; 
  • 編寫接口api
  1. @RestController 
  2. @RequestMapping("/produce"
  3. public class ProduceController { 
  4.  
  5.     @Autowired 
  6.     private RabbitUtil rabbitUtil; 
  7.  
  8.     /** 
  9.      * 發(fā)送消息到交換器 
  10.      * @param produceInfo 
  11.      */ 
  12.     @PostMapping("sendMessage"
  13.     public void sendMessage(@RequestBody ProduceInfo produceInfo) { 
  14.         rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo); 
  15.     } 
  16.  

當然,你也可以直接使用rabbitTemplate操作類,來實現(xiàn)發(fā)送消息。

  1. rabbitTemplate.convertAndSend(exchange, routingKey, message); 

參數(shù)內(nèi)容解釋:

  • exchange:表示交換器名稱
  • routingKey:表示路由鍵key
  • message:表示消息

2.8、交換器、隊列維護操作

如果想通過接口對 rabbitMQ 中的交換器、隊列以及綁定關(guān)系進行維護,通過如下方式接口操作,即可實現(xiàn)!

先編寫一個請求參數(shù)實體類

  1. @Data 
  2. public class QueueConfig implements Serializable
  3.  
  4.     private static final long serialVersionUID = 1l; 
  5.  
  6.     /** 
  7.      * 交換器類型 
  8.      */ 
  9.     private String exchangeType; 
  10.  
  11.     /** 
  12.      * 交換器名稱 
  13.      */ 
  14.     private String exchangeName; 
  15.  
  16.     /** 
  17.      * 隊列名稱 
  18.      */ 
  19.     private String queueName; 
  20.  
  21.     /** 
  22.      * 路由鍵key 
  23.      */ 
  24.     private String routingKey; 

編寫接口api

  1. /** 
  2.  * rabbitMQ管理操作控制層 
  3.  */ 
  4. @RestController 
  5. @RequestMapping("/config"
  6. public class RabbitController { 
  7.  
  8.  
  9.     @Autowired 
  10.     private RabbitUtil rabbitUtil; 
  11.  
  12.     /** 
  13.      * 創(chuàng)建交換器 
  14.      * @param config 
  15.      */ 
  16.     @PostMapping("addExchange"
  17.     public void addExchange(@RequestBody QueueConfig config) { 
  18.         rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName()); 
  19.     } 
  20.  
  21.     /** 
  22.      * 刪除交換器 
  23.      * @param config 
  24.      */ 
  25.     @PostMapping("deleteExchange"
  26.     public void deleteExchange(@RequestBody QueueConfig config) { 
  27.         rabbitUtil.deleteExchange(config.getExchangeName()); 
  28.     } 
  29.  
  30.     /** 
  31.      * 添加隊列 
  32.      * @param config 
  33.      */ 
  34.     @PostMapping("addQueue"
  35.     public void addQueue(@RequestBody QueueConfig config) { 
  36.         rabbitUtil.addQueue(config.getQueueName()); 
  37.     } 
  38.  
  39.     /** 
  40.      * 刪除隊列 
  41.      * @param config 
  42.      */ 
  43.     @PostMapping("deleteQueue"
  44.     public void deleteQueue(@RequestBody QueueConfig config) { 
  45.         rabbitUtil.deleteQueue(config.getQueueName()); 
  46.     } 
  47.  
  48.     /** 
  49.      * 清空隊列數(shù)據(jù) 
  50.      * @param config 
  51.      */ 
  52.     @PostMapping("purgeQueue"
  53.     public void purgeQueue(@RequestBody QueueConfig config) { 
  54.         rabbitUtil.purgeQueue(config.getQueueName()); 
  55.     } 
  56.  
  57.     /** 
  58.      * 添加綁定 
  59.      * @param config 
  60.      */ 
  61.     @PostMapping("addBinding"
  62.     public void addBinding(@RequestBody QueueConfig config) { 
  63.         rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), falsenull); 
  64.     } 
  65.  
  66.     /** 
  67.      * 解除綁定 
  68.      * @param config 
  69.      */ 
  70.     @PostMapping("removeBinding"
  71.     public void removeBinding(@RequestBody QueueConfig config) { 
  72.         rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), falsenull); 
  73.     } 
  74.  
  75.     /** 
  76.      * 創(chuàng)建頭部類型的交換器 
  77.      * 判斷條件是所有的鍵值對都匹配成功才發(fā)送到隊列 
  78.      * @param config 
  79.      */ 
  80.     @PostMapping("andExchangeBindingQueueOfHeaderAll"
  81.     public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) { 
  82.         HashMap<String, Object> header = new HashMap<>(); 
  83.         header.put("queue""queue"); 
  84.         header.put("bindType""whereAll"); 
  85.         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), nulltrue, header); 
  86.     } 
  87.  
  88.     /** 
  89.      * 創(chuàng)建頭部類型的交換器 
  90.      * 判斷條件是只要有一個鍵值對匹配成功就發(fā)送到隊列 
  91.      * @param config 
  92.      */ 
  93.     @PostMapping("andExchangeBindingQueueOfHeaderAny"
  94.     public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) { 
  95.         HashMap<String, Object> header = new HashMap<>(); 
  96.         header.put("queue""queue"); 
  97.         header.put("bindType""whereAny"); 
  98.         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), nullfalse, header); 
  99.     } 

至此,rabbitMQ 管理器基本的 crud 全部開發(fā)完成!

三、利用 MQ 實現(xiàn)事務(wù)補償

當然,我們花了這么大的力氣,絕不僅僅是為了將 rabbitMQ 通過 web 項目將其管理起來,最重要的是能投入業(yè)務(wù)使用中去!

上面的操作只是告訴我們怎么使用 rabbitMQ!

  • 當你仔細回想整個過程的時候,其實還是回到最初那個問題,什么時候使用 MQ ?

以常見的訂單系統(tǒng)為例,用戶點擊【下單】按鈕之后的業(yè)務(wù)邏輯可能包括:支付訂單、扣減庫存、生成相應單據(jù)、發(fā)紅包、發(fā)短信通知等等。

在業(yè)務(wù)發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務(wù)的發(fā)展訂單量增長,需要提升系統(tǒng)服務(wù)的性能,這時可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,比如發(fā)放紅包、發(fā)短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據(jù))完成之后發(fā)送一條消息到 MQ 讓主流程快速完結(jié),而由另外的單獨線程拉取 MQ 的消息(或者由 MQ 推送消息),當發(fā)現(xiàn) MQ 中有發(fā)紅包或發(fā)短信之類的消息時,執(zhí)行相應的業(yè)務(wù)邏輯。

這種是利用 MQ 實現(xiàn)業(yè)務(wù)解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。

利用 MQ 實現(xiàn)業(yè)務(wù)解耦的過程其實也很簡單。

  • 當主流程結(jié)束之后,將消息推送到發(fā)紅包、發(fā)短信交換器中即可
  1. @Service 
  2. public class OrderService { 
  3.  
  4.     @Autowired 
  5.     private RabbitUtil rabbitUtil; 
  6.  
  7.     /** 
  8.      * 創(chuàng)建訂單 
  9.      * @param order 
  10.      */ 
  11.     @Transactional 
  12.     public void createOrder(Order order){ 
  13.         //1、創(chuàng)建訂單 
  14.         //2、調(diào)用庫存接口,減庫存 
  15.         //3、向客戶發(fā)放紅包 
  16.         rabbitUtil.convertAndSend("exchange.send.bonus"nullorder); 
  17.         //4、發(fā)短信通知 
  18.         rabbitUtil.convertAndSend("exchange.sms.message"nullorder); 
  19.     } 
  20.  
  • 監(jiān)聽發(fā)紅包操作
  1. /** 
  2.  * 監(jiān)聽發(fā)紅包 
  3.  * @param message 
  4.  * @param channel 
  5.  * @throws IOException 
  6.  */ 
  7. @RabbitListener(queues = "exchange.send.bonus"
  8. public void consume(Message message, Channel channel) throws IOException { 
  9.     String msgJson = new String(message.getBody(),"UTF-8"); 
  10.     log.info("收到消息: {}", message.toString()); 
  11.  
  12.     //調(diào)用發(fā)紅包接口 

監(jiān)聽發(fā)短信操作

  1. /** 
  2.  * 監(jiān)聽發(fā)短信 
  3.  * @param message 
  4.  * @param channel 
  5.  * @throws IOException 
  6.  */ 
  7. @RabbitListener(queues = "exchange.sms.message"
  8. public void consume(Message message, Channel channel) throws IOException { 
  9.     String msgJson = new String(message.getBody(),"UTF-8"); 
  10.     log.info("收到消息: {}", message.toString()); 
  11.  
  12.     //調(diào)用發(fā)短信接口 

既然 MQ 這么好用,那是不是完全可以將以前的業(yè)務(wù)也按照整個模型進行拆分呢?

答案顯然不是!

當引入 MQ 之后業(yè)務(wù)的確是解耦了,但是當 MQ 一旦掛了,所有的服務(wù)基本都掛了,是不是很可怕!

但是沒關(guān)系,俗話說,兵來將擋、水來土掩,這句話同樣適用于 IT 開發(fā)者,有坑填坑!

在下篇文章中,我們會詳細介紹 rabbitMQ 的集群搭建和部署,保證消息幾乎 100% 的投遞和消費。

四、總結(jié)

本篇主要圍繞SpringBoot整合rabbitMQ做內(nèi)容介紹,可能也有理解不到位的地方,歡迎網(wǎng)友批評指出!

 

責任編輯:武曉燕 來源: Java極客技術(shù)
相關(guān)推薦

2023-04-26 12:46:43

DockerSpringKubernetes

2009-11-09 14:57:37

WCF上傳文件

2011-01-06 10:39:25

.NET程序打包

2021-07-14 09:00:00

JavaFX開發(fā)應用

2011-01-10 14:41:26

2011-05-03 15:59:00

黑盒打印機

2011-04-21 10:32:44

MySQL雙機同步

2022-08-04 10:39:23

Jenkins集成CD

2020-05-15 08:07:33

JWT登錄單點

2021-03-12 10:01:24

JavaScript 前端表單驗證

2022-01-08 20:04:20

攔截系統(tǒng)調(diào)用

2022-12-07 08:42:35

2022-07-27 08:16:22

搜索引擎Lucene

2022-03-14 14:47:21

HarmonyOS操作系統(tǒng)鴻蒙

2011-02-22 13:46:27

微軟SQL.NET

2021-12-28 08:38:26

Linux 中斷喚醒系統(tǒng)Linux 系統(tǒng)

2021-02-26 11:54:38

MyBatis 插件接口

2011-02-22 14:36:40

ASP.NETmsdnC#

2015-07-15 13:18:27

附近的人開發(fā)

2016-05-12 11:54:39

點贊
收藏

51CTO技術(shù)棧公眾號