自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

消息消費(fèi)失敗如何處理?

開(kāi)發(fā) 前端
在介紹消息中間件 MQ 之前,我們先來(lái)簡(jiǎn)單的了解一下,為何要引用消息中間件。

[[384109]]

本文轉(zhuǎn)載自微信公眾號(hào)「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java極客技術(shù)公眾號(hào)。   

一、介紹

在介紹消息中間件 MQ 之前,我們先來(lái)簡(jiǎn)單的了解一下,為何要引用消息中間件。

例如,在電商平臺(tái)中,常見(jiàn)的用戶(hù)下單,會(huì)經(jīng)歷以下幾個(gè)流程。

當(dāng)用戶(hù)下單時(shí),創(chuàng)建完訂單之后,會(huì)調(diào)用第三方支付平臺(tái),對(duì)用戶(hù)的賬戶(hù)金額進(jìn)行扣款,如果平臺(tái)支付扣款成功,會(huì)將結(jié)果通知到對(duì)應(yīng)的業(yè)務(wù)系統(tǒng),接著業(yè)務(wù)系統(tǒng)會(huì)更新訂單狀態(tài),同時(shí)調(diào)用倉(cāng)庫(kù)接口,進(jìn)行減庫(kù)存,通知物流進(jìn)行發(fā)貨!

 

試想一下,從訂單狀態(tài)更新、到扣減庫(kù)存、通知物流發(fā)貨都在一個(gè)方法內(nèi)同步完成,假如用戶(hù)支付成功、訂單狀態(tài)更新也成功,但是在扣減庫(kù)存或者通知物流發(fā)貨步驟失敗了,那么就會(huì)造成一個(gè)問(wèn)題,用戶(hù)已經(jīng)支付成功了,只是在倉(cāng)庫(kù)扣減庫(kù)存方面失敗,從而導(dǎo)致整個(gè)交易失敗!

一單失敗,老板可以假裝看不見(jiàn),但是如果上千個(gè)單子都因此失敗,那么因系統(tǒng)造成的業(yè)務(wù)損失,將是巨大的,老板可能坐不住了!

因此,針對(duì)這種業(yè)務(wù)場(chǎng)景,架構(gòu)師們引入了異步通信技術(shù)方案,從而保證服務(wù)的高可用,大體流程如下:

 

當(dāng)訂單系統(tǒng)收到支付平臺(tái)發(fā)送的扣款結(jié)果之后,會(huì)將訂單消息發(fā)送到 MQ 消息中間件,同時(shí)也會(huì)更新訂單狀態(tài)。

在另一端,由倉(cāng)庫(kù)系統(tǒng)來(lái)異步監(jiān)聽(tīng)訂單系統(tǒng)發(fā)送的消息,當(dāng)收到訂單消息之后,再操作扣減庫(kù)存、通知物流公司發(fā)貨等服務(wù)!

在優(yōu)化后的流程下,即使扣減庫(kù)存服務(wù)失敗,也不會(huì)影響用戶(hù)交易。

正如《人月神話(huà)》中所說(shuō)的,軟件工程,沒(méi)有銀彈!

當(dāng)引入了 MQ 消息中間件之后,同樣也會(huì)帶來(lái)另一個(gè)問(wèn)題,假如 MQ 消息中間件突然宕機(jī)了,導(dǎo)致消息無(wú)法發(fā)送出去,那倉(cāng)庫(kù)系統(tǒng)就無(wú)法接受到訂單消息,進(jìn)而也無(wú)法發(fā)貨!

針對(duì)這個(gè)問(wèn)題,業(yè)界主流的解決辦法是采用集群部署,一主多從模式,從而實(shí)現(xiàn)服務(wù)的高可用,即使一臺(tái)機(jī)器突然宕機(jī)了,也依然能保證服務(wù)可用,在服務(wù)器故障期間,通過(guò)運(yùn)維手段,將服務(wù)重新啟動(dòng),之后服務(wù)依然能正常運(yùn)行!

但是還有另一個(gè)問(wèn)題,假如倉(cāng)庫(kù)系統(tǒng)已經(jīng)收到訂單消息了,但是業(yè)務(wù)處理異常,或者服務(wù)器異常,導(dǎo)致當(dāng)前商品庫(kù)存并沒(méi)有扣減,也沒(méi)有發(fā)貨!

這個(gè)時(shí)候又改如何處理呢?

今天我們所要介紹的正是這種場(chǎng)景,假如消息消費(fèi)失敗,我們應(yīng)該如何處理?

二、解決方案

針對(duì)消息消費(fèi)失敗的場(chǎng)景,我們一般會(huì)通過(guò)如下方式進(jìn)行處理:

  • 當(dāng)消息消費(fèi)失敗時(shí),會(huì)對(duì)消息進(jìn)行重新推送
  • 如果重試次數(shù)超過(guò)最大值,會(huì)將異常消息存儲(chǔ)到數(shù)據(jù)庫(kù),然后人工介入排查問(wèn)題,進(jìn)行手工重試

 

當(dāng)消息在客戶(hù)端消費(fèi)失敗時(shí),我們會(huì)將異常的消息加入到一個(gè)消息重試對(duì)象中,同時(shí)設(shè)置最大重試次數(shù),并將消息重新推送到 MQ 消息中間件里,當(dāng)重試次數(shù)超過(guò)最大值時(shí),會(huì)將異常的消息存儲(chǔ)到 MongoDB數(shù)據(jù)庫(kù)中,方便后續(xù)查詢(xún)異常的信息。

基于以上系統(tǒng)模型,我們可以編寫(xiě)一個(gè)公共重試組件,話(huà)不多說(shuō),直接干!

三、代碼實(shí)踐

本次補(bǔ)償服務(wù)采用 rabbitmq 消息中間件進(jìn)行處理,其他消息中間件處理思路也類(lèi)似!

3.1、創(chuàng)建一個(gè)消息重試實(shí)體類(lèi)

  1. @Data 
  2. @EqualsAndHashCode(callSuper = false
  3. @Accessors(chain = true
  4. public class MessageRetryDTO implements Serializable { 
  5.  
  6.     private static final long serialVersionUID = 1L; 
  7.  
  8.     /** 
  9.      * 原始消息body 
  10.      */ 
  11.     private String bodyMsg; 
  12.  
  13.     /** 
  14.      * 消息來(lái)源ID 
  15.      */ 
  16.     private String sourceId; 
  17.  
  18.     /** 
  19.      * 消息來(lái)源描述 
  20.      */ 
  21.     private String sourceDesc; 
  22.  
  23.     /** 
  24.      * 交換器 
  25.      */ 
  26.     private String exchangeName; 
  27.  
  28.     /** 
  29.      * 路由鍵 
  30.      */ 
  31.     private String routingKey; 
  32.  
  33.     /** 
  34.      * 隊(duì)列 
  35.      */ 
  36.     private String queueName; 
  37.  
  38.     /** 
  39.      * 狀態(tài),1:初始化,2:成功,3:失敗 
  40.      */ 
  41.     private Integer status = 1; 
  42.  
  43.     /** 
  44.      * 最大重試次數(shù) 
  45.      */ 
  46.     private Integer maxTryCount = 3; 
  47.  
  48.     /** 
  49.      * 當(dāng)前重試次數(shù) 
  50.      */ 
  51.     private Integer currentRetryCount = 0; 
  52.  
  53.     /** 
  54.      * 重試時(shí)間間隔(毫秒) 
  55.      */ 
  56.     private Long retryIntervalTime = 0L; 
  57.  
  58.     /** 
  59.      * 任務(wù)失敗信息 
  60.      */ 
  61.     private String errorMsg; 
  62.  
  63.     /** 
  64.      * 創(chuàng)建時(shí)間 
  65.      */ 
  66.     private Date createTime; 
  67.  
  68.     @Override 
  69.     public String toString() { 
  70.         return "MessageRetryDTO{" + 
  71.                 "bodyMsg='" + bodyMsg + '\'' + 
  72.                 ", sourceId='" + sourceId + '\'' + 
  73.                 ", sourceDesc='" + sourceDesc + '\'' + 
  74.                 ", exchangeName='" + exchangeName + '\'' + 
  75.                 ", routingKey='" + routingKey + '\'' + 
  76.                 ", queueName='" + queueName + '\'' + 
  77.                 ", status=" + status + 
  78.                 ", maxTryCount=" + maxTryCount + 
  79.                 ", currentRetryCount=" + currentRetryCount + 
  80.                 ", retryIntervalTime=" + retryIntervalTime + 
  81.                 ", errorMsg='" + errorMsg + '\'' + 
  82.                 ", createTime=" + createTime + 
  83.                 '}'
  84.     } 
  85.  
  86.     /** 
  87.      * 檢查重試次數(shù)是否超過(guò)最大值 
  88.      * 
  89.      * @return 
  90.      */ 
  91.     public boolean checkRetryCount() { 
  92.         retryCountCalculate(); 
  93.         //檢查重試次數(shù)是否超過(guò)最大值 
  94.         if (this.currentRetryCount < this.maxTryCount) { 
  95.             return true
  96.         } 
  97.         return false
  98.     } 
  99.  
  100.     /** 
  101.      * 重新計(jì)算重試次數(shù) 
  102.      */ 
  103.     private void retryCountCalculate() { 
  104.         this.currentRetryCount = this.currentRetryCount + 1; 
  105.     } 
  106.  

3.2、編寫(xiě)服務(wù)重試抽象類(lèi)

  1. public abstract class CommonMessageRetryService { 
  2.  
  3.     private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class); 
  4.  
  5.     @Autowired 
  6.     private RabbitTemplate rabbitTemplate; 
  7.  
  8.     @Autowired 
  9.     private MongoTemplate mongoTemplate; 
  10.  
  11.  
  12.     /** 
  13.      * 初始化消息 
  14.      * 
  15.      * @param message 
  16.      */ 
  17.     public void initMessage(Message message) { 
  18.         log.info("{} 收到消息: {},業(yè)務(wù)數(shù)據(jù):{}", this.getClass().getName(), message.toString(), new String(message.getBody())); 
  19.         try { 
  20.             //封裝消息 
  21.             MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message); 
  22.             if (log.isInfoEnabled()) { 
  23.                 log.info("反序列化消息:{}", messageRetryDto.toString()); 
  24.             } 
  25.             prepareAction(messageRetryDto); 
  26.         } catch (Exception e) { 
  27.             log.warn("處理消息異常,錯(cuò)誤信息:", e); 
  28.         } 
  29.     } 
  30.  
  31.     /** 
  32.      * 準(zhǔn)備執(zhí)行 
  33.      * 
  34.      * @param retryDto 
  35.      */ 
  36.     protected void prepareAction(MessageRetryDTO retryDto) { 
  37.         try { 
  38.             execute(retryDto); 
  39.             doSuccessCallBack(retryDto); 
  40.         } catch (Exception e) { 
  41.             log.error("當(dāng)前任務(wù)執(zhí)行異常,業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e); 
  42.             //執(zhí)行失敗,計(jì)算是否還需要繼續(xù)重試 
  43.             if (retryDto.checkRetryCount()) { 
  44.                 if (log.isInfoEnabled()) { 
  45.                     log.info("重試消息:{}", retryDto.toString()); 
  46.                 } 
  47.                 retrySend(retryDto); 
  48.             } else { 
  49.                 if (log.isWarnEnabled()) { 
  50.                     log.warn("當(dāng)前任務(wù)重試次數(shù)已經(jīng)到達(dá)最大次數(shù),業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e); 
  51.                 } 
  52.                 doFailCallBack(retryDto.setErrorMsg(e.getMessage())); 
  53.             } 
  54.         } 
  55.     } 
  56.  
  57.     /** 
  58.      * 任務(wù)執(zhí)行成功,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫(xiě)) 
  59.      * 
  60.      * @param messageRetryDto 
  61.      */ 
  62.     private void doSuccessCallBack(MessageRetryDTO messageRetryDto) { 
  63.         try { 
  64.             successCallback(messageRetryDto); 
  65.         } catch (Exception e) { 
  66.             log.warn("執(zhí)行成功回調(diào)異常,隊(duì)列描述:{},錯(cuò)誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  67.         } 
  68.     } 
  69.  
  70.     /** 
  71.      * 任務(wù)執(zhí)行失敗,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫(xiě)) 
  72.      * 
  73.      * @param messageRetryDto 
  74.      */ 
  75.     private void doFailCallBack(MessageRetryDTO messageRetryDto) { 
  76.         try { 
  77.             saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg())); 
  78.             failCallback(messageRetryDto); 
  79.         } catch (Exception e) { 
  80.             log.warn("執(zhí)行失敗回調(diào)異常,隊(duì)列描述:{},錯(cuò)誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  81.         } 
  82.     } 
  83.  
  84.     /** 
  85.      * 執(zhí)行任務(wù) 
  86.      * 
  87.      * @param messageRetryDto 
  88.      */ 
  89.     protected abstract void execute(MessageRetryDTO messageRetryDto); 
  90.  
  91.     /** 
  92.      * 成功回調(diào) 
  93.      * 
  94.      * @param messageRetryDto 
  95.      */ 
  96.     protected abstract void successCallback(MessageRetryDTO messageRetryDto); 
  97.  
  98.     /** 
  99.      * 失敗回調(diào) 
  100.      * 
  101.      * @param messageRetryDto 
  102.      */ 
  103.     protected abstract void failCallback(MessageRetryDTO messageRetryDto); 
  104.  
  105.     /** 
  106.      * 構(gòu)建消息補(bǔ)償實(shí)體 
  107.      * @param message 
  108.      * @return 
  109.      */ 
  110.     private MessageRetryDTO buildMessageRetryInfo(Message message){ 
  111.         //如果頭部包含補(bǔ)償消息實(shí)體,直接返回 
  112.         Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders(); 
  113.         if(messageHeaders.containsKey("message_retry_info")){ 
  114.             Object retryMsg = messageHeaders.get("message_retry_info"); 
  115.             if(Objects.nonNull(retryMsg)){ 
  116.                 return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class); 
  117.             } 
  118.         } 
  119.         //自動(dòng)將業(yè)務(wù)消息加入補(bǔ)償實(shí)體 
  120.         MessageRetryDTO messageRetryDto = new MessageRetryDTO(); 
  121.         messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8)); 
  122.         messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange()); 
  123.         messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey()); 
  124.         messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue()); 
  125.         messageRetryDto.setCreateTime(new Date()); 
  126.         return messageRetryDto; 
  127.     } 
  128.  
  129.     /** 
  130.      * 異常消息重新入庫(kù) 
  131.      * @param retryDto 
  132.      */ 
  133.     private void retrySend(MessageRetryDTO retryDto){ 
  134.         //將補(bǔ)償消息實(shí)體放入頭部,原始消息內(nèi)容保持不變 
  135.         MessageProperties messageProperties = new MessageProperties(); 
  136.         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); 
  137.         messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto)); 
  138.         Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties); 
  139.         rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message); 
  140.     } 
  141.  
  142.  
  143.  
  144.     /** 
  145.      * 將異常消息存儲(chǔ)到mongodb中 
  146.      * @param retryDto 
  147.      */ 
  148.     private void saveMessageRetryInfo(MessageRetryDTO retryDto){ 
  149.         try { 
  150.             mongoTemplate.save(retryDto, "message_retry_info"); 
  151.         } catch (Exception e){ 
  152.             log.error("將異常消息存儲(chǔ)到mongodb失敗,消息數(shù)據(jù):" + retryDto.toString(), e); 
  153.         } 
  154.     } 

3.3、編寫(xiě)監(jiān)聽(tīng)服務(wù)類(lèi)

在消費(fèi)端應(yīng)用的時(shí)候,也非常簡(jiǎn)單,例如,針對(duì)扣減庫(kù)存操作,我們可以通過(guò)如下方式進(jìn)行處理!

  1. @Component 
  2. public class OrderServiceListener extends CommonMessageRetryService { 
  3.  
  4.     private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class); 
  5.  
  6.     /** 
  7.      * 監(jiān)聽(tīng)訂單系統(tǒng)下單成功消息 
  8.      * @param message 
  9.      */ 
  10.     @RabbitListener(queues = "mq.order.add"
  11.     public void consume(Message message) { 
  12.         log.info("收到訂單下單成功消息: {}", message.toString()); 
  13.         super.initMessage(message); 
  14.     } 
  15.  
  16.  
  17.     @Override 
  18.     protected void execute(MessageRetryDTO messageRetryDto) { 
  19.         //調(diào)用扣減庫(kù)存服務(wù),將業(yè)務(wù)異常拋出來(lái) 
  20.     } 
  21.  
  22.     @Override 
  23.     protected void successCallback(MessageRetryDTO messageRetryDto) { 
  24.         //業(yè)務(wù)處理成功,回調(diào) 
  25.     } 
  26.  
  27.     @Override 
  28.     protected void failCallback(MessageRetryDTO messageRetryDto) { 
  29.         //業(yè)務(wù)處理失敗,回調(diào) 
  30.     } 

當(dāng)消息消費(fèi)失敗,并超過(guò)最大次數(shù)時(shí),會(huì)將消息存儲(chǔ)到 mongodb 中,然后像常規(guī)數(shù)據(jù)庫(kù)操作一樣,可以通過(guò) web 接口查詢(xún)異常消息,并針對(duì)具體場(chǎng)景進(jìn)行重試!

四、小結(jié)

可能有的同學(xué)會(huì)問(wèn),為啥不將異常消息存在數(shù)據(jù)庫(kù)?

起初的確是存儲(chǔ)在 MYSQL 中,但是隨著業(yè)務(wù)的快速發(fā)展,訂單消息數(shù)據(jù)結(jié)構(gòu)越來(lái)越復(fù)雜,數(shù)據(jù)量也非常的大,甚至大到 MYSQL 中的 text 類(lèi)型都無(wú)法存儲(chǔ),同時(shí)這種數(shù)據(jù)結(jié)構(gòu)也不太適合在 MYSQL 中存儲(chǔ),因此將其遷移到 mongodb!

本文主要圍繞消息消費(fèi)失敗這種場(chǎng)景,進(jìn)行基礎(chǔ)的方案和代碼實(shí)踐講解,可能有理解不到位的地方,歡迎批評(píng)指出!

五、參考

 

1、石杉的架構(gòu)筆記 - 如何處理消息消費(fèi)失敗問(wèn)題

 

責(zé)任編輯:武曉燕 來(lái)源: Java極客技術(shù)
相關(guān)推薦

2024-09-23 08:04:45

MYSQL數(shù)據(jù)存儲(chǔ)

2012-07-03 11:18:20

運(yùn)維disable tab

2018-12-25 09:44:42

2019-08-15 10:20:19

云計(jì)算技術(shù)安全

2017-10-26 08:43:18

JavaScript內(nèi)存處理

2019-12-23 10:20:12

Web圖片優(yōu)化前端

2012-12-12 09:49:41

2017-03-13 13:21:34

Git處理大倉(cāng)庫(kù)

2020-12-29 09:11:33

LinuxLinux內(nèi)核

2022-06-02 10:54:16

BrokerRocketMQ

2024-05-23 12:11:39

2024-12-18 07:43:49

2025-01-09 10:20:53

2011-02-28 14:08:31

網(wǎng)速變慢局域網(wǎng)網(wǎng)速

2024-04-16 13:32:57

2023-01-04 10:01:21

ReactTypeScript元素

2021-03-24 10:40:26

Python垃圾語(yǔ)言

2024-08-26 10:47:22

2022-04-19 09:00:52

ReactTypeScript

2023-07-03 13:50:13

ReactonResize事件
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)