手把手教你 springBoot 整合 rabbitMQ,利用 MQ 實現(xiàn)事務(wù)補償
本文轉(zhuǎn)載自微信公眾號「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請聯(lián)系Java極客技術(shù)公眾號。
rabbitMQ 在互聯(lián)網(wǎng)公司有著大規(guī)模應用,本篇將實戰(zhàn)介紹 springboot 整合 rabbitMQ,同時也將在具體的業(yè)務(wù)場景中介紹利用 MQ 實現(xiàn)事務(wù)補償操作。
一、介紹
本篇我們一起來實操一下SpringBoot整合rabbitMQ,為后續(xù)業(yè)務(wù)處理做鋪墊。
廢話不多說,直奔主題!
二、整合實戰(zhàn)
2.1、創(chuàng)建一個 maven 工程,引入 amqp 包
- <!--amqp 支持-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2.2、在全局文件中配置 rabbitMQ 服務(wù)信息
- spring.rabbitmq.addresses=197.168.24.206:5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- spring.rabbitmq.virtual-host=/
其中,spring.rabbitmq.addresses參數(shù)值為 rabbitmq 服務(wù)器地址
2.3、編寫 rabbitmq 配置類
- @Slf4j
- @Configuration
- public class RabbitConfig {
- /**
- * 初始化連接工廠
- * @param addresses
- * @param userName
- * @param password
- * @param vhost
- * @return
- */
- @Bean
- ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses,
- @Value("${spring.rabbitmq.username}") String userName,
- @Value("${spring.rabbitmq.password}") String password,
- @Value("${spring.rabbitmq.virtual-host}") String vhost) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses(addresses);
- connectionFactory.setUsername(userName);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(vhost);
- return connectionFactory;
- }
- /**
- * 重新實例化 RabbitAdmin 操作類
- * @param connectionFactory
- * @return
- */
- @Bean
- public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
- return new RabbitAdmin(connectionFactory);
- }
- /**
- * 重新實例化 RabbitTemplate 操作類
- * @param connectionFactory
- * @return
- */
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
- RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
- //數(shù)據(jù)轉(zhuǎn)換為json存入消息隊列
- rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
- return rabbitTemplate;
- }
- /**
- * 將 RabbitUtil 操作工具類加入IOC容器
- * @return
- */
- @Bean
- public RabbitUtil rabbitUtil(){
- return new RabbitUtil();
- }
- }
2.4、編寫 RabbitUtil 工具類
- public class RabbitUtil {
- private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);
- @Autowired
- private RabbitAdmin rabbitAdmin;
- @Autowired
- private RabbitTemplate rabbitTemplate;
- /**
- * 創(chuàng)建Exchange
- * @param exchangeName
- */
- public void addExchange(String exchangeType, String exchangeName){
- Exchange exchange = createExchange(exchangeType, exchangeName);
- rabbitAdmin.declareExchange(exchange);
- }
- /**
- * 刪除一個Exchange
- * @param exchangeName
- */
- public boolean deleteExchange(String exchangeName){
- return rabbitAdmin.deleteExchange(exchangeName);
- }
- /**
- * 創(chuàng)建一個指定的Queue
- * @param queueName
- * @return queueName
- */
- public void addQueue(String queueName){
- Queue queue = createQueue(queueName);
- rabbitAdmin.declareQueue(queue);
- }
- /**
- * 刪除一個queue
- * @return queueName
- * @param queueName
- */
- public boolean deleteQueue(String queueName){
- return rabbitAdmin.deleteQueue(queueName);
- }
- /**
- * 按照篩選條件,刪除隊列
- * @param queueName
- * @param unused 是否被使用
- * @param empty 內(nèi)容是否為空
- */
- public void deleteQueue(String queueName, boolean unused, boolean empty){
- rabbitAdmin.deleteQueue(queueName,unused,empty);
- }
- /**
- * 清空某個隊列中的消息,注意,清空的消息并沒有被消費
- * @return queueName
- * @param queueName
- */
- public void purgeQueue(String queueName){
- rabbitAdmin.purgeQueue(queueName, false);
- }
- /**
- * 判斷指定的隊列是否存在
- * @param queueName
- * @return
- */
- public boolean existQueue(String queueName){
- return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;
- }
- /**
- * 綁定一個隊列到一個匹配型交換器使用一個routingKey
- * @param exchangeType
- * @param exchangeName
- * @param queueName
- * @param routingKey
- * @param isWhereAll
- * @param headers EADERS模式類型設(shè)置,其他模式類型傳空
- */
- public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
- Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
- rabbitAdmin.declareBinding(binding);
- }
- /**
- * 聲明綁定
- * @param binding
- */
- public void addBinding(Binding binding){
- rabbitAdmin.declareBinding(binding);
- }
- /**
- * 解除交換器與隊列的綁定
- * @param exchangeType
- * @param exchangeName
- * @param queueName
- * @param routingKey
- * @param isWhereAll
- * @param headers
- */
- public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
- Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
- removeBinding(binding);
- }
- /**
- * 解除交換器與隊列的綁定
- * @param binding
- */
- public void removeBinding(Binding binding){
- rabbitAdmin.removeBinding(binding);
- }
- /**
- * 創(chuàng)建一個交換器、隊列,并綁定隊列
- * @param exchangeType
- * @param exchangeName
- * @param queueName
- * @param routingKey
- * @param isWhereAll
- * @param headers
- */
- public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
- //聲明交換器
- addExchange(exchangeType, exchangeName);
- //聲明隊列
- addQueue(queueName);
- //聲明綁定關(guān)系
- addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
- }
- /**
- * 發(fā)送消息
- * @param exchange
- * @param routingKey
- * @param object
- */
- public void convertAndSend(String exchange, String routingKey, final Object object){
- rabbitTemplate.convertAndSend(exchange, routingKey, object);
- }
- /**
- * 轉(zhuǎn)換Message對象
- * @param messageType
- * @param msg
- * @return
- */
- public Message getMessage(String messageType, Object msg){
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setContentType(messageType);
- Message message = new Message(msg.toString().getBytes(),messageProperties);
- return message;
- }
- /**
- * 聲明交換機
- * @param exchangeType
- * @param exchangeName
- * @return
- */
- private Exchange createExchange(String exchangeType, String exchangeName){
- if(ExchangeType.DIRECT.equals(exchangeType)){
- return new DirectExchange(exchangeName);
- }
- if(ExchangeType.TOPIC.equals(exchangeType)){
- return new TopicExchange(exchangeName);
- }
- if(ExchangeType.HEADERS.equals(exchangeType)){
- return new HeadersExchange(exchangeName);
- }
- if(ExchangeType.FANOUT.equals(exchangeType)){
- return new FanoutExchange(exchangeName);
- }
- return null;
- }
- /**
- * 聲明綁定關(guān)系
- * @param exchangeType
- * @param exchangeName
- * @param queueName
- * @param routingKey
- * @param isWhereAll
- * @param headers
- * @return
- */
- private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
- if(ExchangeType.DIRECT.equals(exchangeType)){
- return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey);
- }
- if(ExchangeType.TOPIC.equals(exchangeType)){
- return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey);
- }
- if(ExchangeType.HEADERS.equals(exchangeType)){
- if(isWhereAll){
- return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match();
- }else{
- return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match();
- }
- }
- if(ExchangeType.FANOUT.equals(exchangeType)){
- return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName));
- }
- return null;
- }
- /**
- * 聲明隊列
- * @param queueName
- * @return
- */
- private Queue createQueue(String queueName){
- return new Queue(queueName);
- }
- /**
- * 交換器類型
- */
- public final static class ExchangeType {
- /**
- * 直連交換機(全文匹配)
- */
- public final static String DIRECT = "DIRECT";
- /**
- * 通配符交換機(兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個)
- */
- public final static String TOPIC = "TOPIC";
- /**
- * 頭交換機(自定義鍵值對匹配,根據(jù)發(fā)送消息內(nèi)容中的headers屬性進行匹配)
- */
- public final static String HEADERS = "HEADERS";
- /**
- * 扇形(廣播)交換機 (將消息轉(zhuǎn)發(fā)到所有與該交互機綁定的隊列上)
- */
- public final static String FANOUT = "FANOUT";
- }
- }
此致, rabbitMQ 核心操作功能操作已經(jīng)開發(fā)完畢!
2.5、編寫隊列監(jiān)聽類(靜態(tài))
- @Slf4j
- @Configuration
- public class DirectConsumeListener {
- /**
- * 監(jiān)聽指定隊列,名稱:mq.direct.1
- * @param message
- * @param channel
- * @throws IOException
- */
- @RabbitListener(queues = "mq.direct.1")
- public void consume(Message message, Channel channel) throws IOException {
- log.info("DirectConsumeListener,收到消息: {}", message.toString());
- }
- }
如果你需要監(jiān)聽指定的隊列,只需要方法上加上@RabbitListener(queues = "")即可,同時填寫對應的隊列名稱。
但是,如果你想動態(tài)監(jiān)聽隊列,而不是通過寫死在方法上呢?
請看下面介紹!
2.6、編寫隊列監(jiān)聽類(動態(tài))
重新實例化一個SimpleMessageListenerContainer對象,這個對象就是監(jiān)聽容器。
- @Slf4j
- @Configuration
- public class DynamicConsumeListener {
- /**
- * 使用SimpleMessageListenerContainer實現(xiàn)動態(tài)監(jiān)聽
- * @param connectionFactory
- * @return
- */
- @Bean
- public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
- container.setMessageListener((MessageListener) message -> {
- log.info("ConsumerMessageListen,收到消息: {}", message.toString());
- });
- return container;
- }
- }
如果想向SimpleMessageListenerContainer添加監(jiān)聽隊列或者移除隊列,只需通過如下方式即可操作。
- @Slf4j
- @RestController
- @RequestMapping("/consumer")
- public class ConsumerController {
- @Autowired
- private SimpleMessageListenerContainer container;
- @Autowired
- private RabbitUtil rabbitUtil;
- /**
- * 添加隊列到監(jiān)聽器
- * @param consumerInfo
- */
- @PostMapping("addQueue")
- public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
- boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
- if(!existQueue){
- throw new CommonExecption("當前隊列不存在");
- }
- //消費mq消息的類
- container.addQueueNames(consumerInfo.getQueueName());
- //打印監(jiān)聽容器中正在監(jiān)聽到隊列
- log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
- }
- /**
- * 移除正在監(jiān)聽的隊列
- * @param consumerInfo
- */
- @PostMapping("removeQueue")
- public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
- //消費mq消息的類
- container.removeQueueNames(consumerInfo.getQueueName());
- //打印監(jiān)聽容器中正在監(jiān)聽到隊列
- log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
- }
- /**
- * 查詢監(jiān)聽容器中正在監(jiān)聽到隊列
- */
- @PostMapping("queryListenerQueue")
- public void queryListenerQueue() {
- log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
- }
- }
2.7、發(fā)送消息到交換器
發(fā)送消息到交換器,非常簡單,只需要通過如下方式即可!
- 先編寫一個請求參數(shù)實體類
- @Data
- public class ProduceInfo implements Serializable {
- private static final long serialVersionUID = 1l;
- /**
- * 交換器名稱
- */
- private String exchangeName;
- /**
- * 路由鍵key
- */
- private String routingKey;
- /**
- * 消息內(nèi)容
- */
- public String msg;
- }
- 編寫接口api
- @RestController
- @RequestMapping("/produce")
- public class ProduceController {
- @Autowired
- private RabbitUtil rabbitUtil;
- /**
- * 發(fā)送消息到交換器
- * @param produceInfo
- */
- @PostMapping("sendMessage")
- public void sendMessage(@RequestBody ProduceInfo produceInfo) {
- rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo);
- }
- }
當然,你也可以直接使用rabbitTemplate操作類,來實現(xiàn)發(fā)送消息。
- rabbitTemplate.convertAndSend(exchange, routingKey, message);
參數(shù)內(nèi)容解釋:
- exchange:表示交換器名稱
- routingKey:表示路由鍵key
- message:表示消息
2.8、交換器、隊列維護操作
如果想通過接口對 rabbitMQ 中的交換器、隊列以及綁定關(guān)系進行維護,通過如下方式接口操作,即可實現(xiàn)!
先編寫一個請求參數(shù)實體類
- @Data
- public class QueueConfig implements Serializable{
- private static final long serialVersionUID = 1l;
- /**
- * 交換器類型
- */
- private String exchangeType;
- /**
- * 交換器名稱
- */
- private String exchangeName;
- /**
- * 隊列名稱
- */
- private String queueName;
- /**
- * 路由鍵key
- */
- private String routingKey;
- }
編寫接口api
- /**
- * rabbitMQ管理操作控制層
- */
- @RestController
- @RequestMapping("/config")
- public class RabbitController {
- @Autowired
- private RabbitUtil rabbitUtil;
- /**
- * 創(chuàng)建交換器
- * @param config
- */
- @PostMapping("addExchange")
- public void addExchange(@RequestBody QueueConfig config) {
- rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());
- }
- /**
- * 刪除交換器
- * @param config
- */
- @PostMapping("deleteExchange")
- public void deleteExchange(@RequestBody QueueConfig config) {
- rabbitUtil.deleteExchange(config.getExchangeName());
- }
- /**
- * 添加隊列
- * @param config
- */
- @PostMapping("addQueue")
- public void addQueue(@RequestBody QueueConfig config) {
- rabbitUtil.addQueue(config.getQueueName());
- }
- /**
- * 刪除隊列
- * @param config
- */
- @PostMapping("deleteQueue")
- public void deleteQueue(@RequestBody QueueConfig config) {
- rabbitUtil.deleteQueue(config.getQueueName());
- }
- /**
- * 清空隊列數(shù)據(jù)
- * @param config
- */
- @PostMapping("purgeQueue")
- public void purgeQueue(@RequestBody QueueConfig config) {
- rabbitUtil.purgeQueue(config.getQueueName());
- }
- /**
- * 添加綁定
- * @param config
- */
- @PostMapping("addBinding")
- public void addBinding(@RequestBody QueueConfig config) {
- rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
- }
- /**
- * 解除綁定
- * @param config
- */
- @PostMapping("removeBinding")
- public void removeBinding(@RequestBody QueueConfig config) {
- rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
- }
- /**
- * 創(chuàng)建頭部類型的交換器
- * 判斷條件是所有的鍵值對都匹配成功才發(fā)送到隊列
- * @param config
- */
- @PostMapping("andExchangeBindingQueueOfHeaderAll")
- public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {
- HashMap<String, Object> header = new HashMap<>();
- header.put("queue", "queue");
- header.put("bindType", "whereAll");
- rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);
- }
- /**
- * 創(chuàng)建頭部類型的交換器
- * 判斷條件是只要有一個鍵值對匹配成功就發(fā)送到隊列
- * @param config
- */
- @PostMapping("andExchangeBindingQueueOfHeaderAny")
- public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {
- HashMap<String, Object> header = new HashMap<>();
- header.put("queue", "queue");
- header.put("bindType", "whereAny");
- rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);
- }
- }
至此,rabbitMQ 管理器基本的 crud 全部開發(fā)完成!
三、利用 MQ 實現(xiàn)事務(wù)補償
當然,我們花了這么大的力氣,絕不僅僅是為了將 rabbitMQ 通過 web 項目將其管理起來,最重要的是能投入業(yè)務(wù)使用中去!
上面的操作只是告訴我們怎么使用 rabbitMQ!
- 當你仔細回想整個過程的時候,其實還是回到最初那個問題,什么時候使用 MQ ?
以常見的訂單系統(tǒng)為例,用戶點擊【下單】按鈕之后的業(yè)務(wù)邏輯可能包括:支付訂單、扣減庫存、生成相應單據(jù)、發(fā)紅包、發(fā)短信通知等等。
在業(yè)務(wù)發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務(wù)的發(fā)展訂單量增長,需要提升系統(tǒng)服務(wù)的性能,這時可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,比如發(fā)放紅包、發(fā)短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據(jù))完成之后發(fā)送一條消息到 MQ 讓主流程快速完結(jié),而由另外的單獨線程拉取 MQ 的消息(或者由 MQ 推送消息),當發(fā)現(xiàn) MQ 中有發(fā)紅包或發(fā)短信之類的消息時,執(zhí)行相應的業(yè)務(wù)邏輯。
這種是利用 MQ 實現(xiàn)業(yè)務(wù)解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。
利用 MQ 實現(xiàn)業(yè)務(wù)解耦的過程其實也很簡單。
- 當主流程結(jié)束之后,將消息推送到發(fā)紅包、發(fā)短信交換器中即可
- @Service
- public class OrderService {
- @Autowired
- private RabbitUtil rabbitUtil;
- /**
- * 創(chuàng)建訂單
- * @param order
- */
- @Transactional
- public void createOrder(Order order){
- //1、創(chuàng)建訂單
- //2、調(diào)用庫存接口,減庫存
- //3、向客戶發(fā)放紅包
- rabbitUtil.convertAndSend("exchange.send.bonus", null, order);
- //4、發(fā)短信通知
- rabbitUtil.convertAndSend("exchange.sms.message", null, order);
- }
- }
- 監(jiān)聽發(fā)紅包操作
- /**
- * 監(jiān)聽發(fā)紅包
- * @param message
- * @param channel
- * @throws IOException
- */
- @RabbitListener(queues = "exchange.send.bonus")
- public void consume(Message message, Channel channel) throws IOException {
- String msgJson = new String(message.getBody(),"UTF-8");
- log.info("收到消息: {}", message.toString());
- //調(diào)用發(fā)紅包接口
- }
監(jiān)聽發(fā)短信操作
- /**
- * 監(jiān)聽發(fā)短信
- * @param message
- * @param channel
- * @throws IOException
- */
- @RabbitListener(queues = "exchange.sms.message")
- public void consume(Message message, Channel channel) throws IOException {
- String msgJson = new String(message.getBody(),"UTF-8");
- log.info("收到消息: {}", message.toString());
- //調(diào)用發(fā)短信接口
- }
既然 MQ 這么好用,那是不是完全可以將以前的業(yè)務(wù)也按照整個模型進行拆分呢?
答案顯然不是!
當引入 MQ 之后業(yè)務(wù)的確是解耦了,但是當 MQ 一旦掛了,所有的服務(wù)基本都掛了,是不是很可怕!
但是沒關(guān)系,俗話說,兵來將擋、水來土掩,這句話同樣適用于 IT 開發(fā)者,有坑填坑!
在下篇文章中,我們會詳細介紹 rabbitMQ 的集群搭建和部署,保證消息幾乎 100% 的投遞和消費。
四、總結(jié)
本篇主要圍繞SpringBoot整合rabbitMQ做內(nèi)容介紹,可能也有理解不到位的地方,歡迎網(wǎng)友批評指出!