自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

真實(shí)案例 | 如何處理 MQ 消費(fèi)失敗的消息?

數(shù)據(jù)庫(kù) MySQL
起初的確是存儲(chǔ)在 MYSQL 中,但是隨著業(yè)務(wù)的快速發(fā)展,訂單消息數(shù)據(jù)結(jié)構(gòu)越來(lái)越復(fù)雜,數(shù)據(jù)量也非常的大,甚至大到 MYSQL 中的 text 類型都無(wú)法存儲(chǔ),同時(shí)這種數(shù)據(jù)結(jié)構(gòu)也不太適合在 MYSQL 中存儲(chǔ),因此將其遷移到 mongodb!

01、背景介紹

再次回顧一下當(dāng)時(shí)提過(guò)的問(wèn)題,為何項(xiàng)目中要引用 MQ 消息中間件?

我們知道,在電商平臺(tái)中常見(jiàn)的用戶下單,會(huì)經(jīng)歷以下幾個(gè)流程。

當(dāng)用戶下單時(shí),創(chuàng)建完訂單之后,會(huì)調(diào)用第三方支付平臺(tái),對(duì)用戶的賬戶金額進(jìn)行扣款,如果平臺(tái)支付扣款成功,會(huì)將結(jié)果通知到對(duì)應(yīng)的業(yè)務(wù)系統(tǒng),接著業(yè)務(wù)系統(tǒng)會(huì)更新訂單狀態(tài),同時(shí)調(diào)用倉(cāng)庫(kù)接口,進(jìn)行減庫(kù)存,通知物流進(jìn)行發(fā)貨!

圖片圖片

試想一下,從訂單狀態(tài)更新、到扣減庫(kù)存、通知物流發(fā)貨都在一個(gè)方法內(nèi)同步完成,假如用戶支付成功、訂單狀態(tài)更新也成功,但是在扣減庫(kù)存或者通知物流發(fā)貨步驟失敗了,那么就會(huì)造成一個(gè)問(wèn)題,用戶已經(jīng)支付成功了,只是在倉(cāng)庫(kù)扣減庫(kù)存方面失敗,從而導(dǎo)致整個(gè)交易失??!

一單失敗,老板可以假裝看不見(jiàn),但是如果上千個(gè)單子都因此失敗,那么因系統(tǒng)造成的業(yè)務(wù)損失,將是巨大的,老板可能坐不住了!

因此,針對(duì)這種業(yè)務(wù)場(chǎng)景,架構(gòu)師們引入了異步通信技術(shù)方案,從而保證服務(wù)的高可用,大體流程如下:

圖片圖片

當(dāng)訂單系統(tǒng)收到支付平臺(tái)發(fā)送的扣款結(jié)果之后,會(huì)將訂單消息發(fā)送到 MQ 消息中間件,同時(shí)也會(huì)更新訂單狀態(tài)。

在另一端,由倉(cāng)庫(kù)系統(tǒng)來(lái)異步監(jiān)聽(tīng)訂單系統(tǒng)發(fā)送的消息,當(dāng)收到訂單消息之后,再操作扣減庫(kù)存、通知物流公司發(fā)貨等服務(wù)!

在優(yōu)化后的流程下,即使扣減庫(kù)存服務(wù)失敗,也不會(huì)影響用戶交易。

正如《人月神話》中所說(shuō)的,軟件工程,沒(méi)有銀彈!

當(dāng)引入了 MQ 消息中間件之后,同樣也會(huì)帶來(lái)另一個(gè)問(wèn)題,假如 MQ 消息中間件突然宕機(jī)了,導(dǎo)致消息無(wú)法發(fā)送出去,那倉(cāng)庫(kù)系統(tǒng)就無(wú)法接受到訂單消息,進(jìn)而也無(wú)法發(fā)貨!

針對(duì)這個(gè)問(wèn)題,業(yè)界主流的解決辦法是采用集群部署,一主多從模式,從而實(shí)現(xiàn)服務(wù)的高可用,即使一臺(tái)機(jī)器突然宕機(jī)了,也依然能保證服務(wù)可用,在服務(wù)器故障期間,通過(guò)運(yùn)維手段,將服務(wù)重新啟動(dòng),之后服務(wù)依然能正常運(yùn)行!

但是還有另一個(gè)問(wèn)題,假如倉(cāng)庫(kù)系統(tǒng)已經(jīng)收到訂單消息了,但是業(yè)務(wù)處理異常,或者服務(wù)器異常,導(dǎo)致當(dāng)前商品庫(kù)存并沒(méi)有扣減,也沒(méi)有發(fā)貨!

這個(gè)時(shí)候又改如何處理呢?

今天我們所要介紹的正是這種場(chǎng)景,假如消息消費(fèi)失敗,我們應(yīng)該如何處理?

02、解決方案

針對(duì)消息消費(fèi)失敗的場(chǎng)景,我們一般會(huì)通過(guò)如下方式進(jìn)行處理:

  • 當(dāng)消息消費(fèi)失敗時(shí),會(huì)對(duì)消息進(jìn)行重新推送
  • 如果重試次數(shù)超過(guò)最大值,會(huì)將異常消息存儲(chǔ)到數(shù)據(jù)庫(kù),然后人工介入排查問(wèn)題,進(jìn)行手工重試

圖片圖片

當(dāng)消息在客戶端消費(fèi)失敗時(shí),我們會(huì)將異常的消息加入到一個(gè)消息重試對(duì)象中,同時(shí)設(shè)置最大重試次數(shù),并將消息重新推送到 MQ 消息中間件里,當(dāng)重試次數(shù)超過(guò)最大值時(shí),會(huì)將異常的消息存儲(chǔ)到 MongoDB數(shù)據(jù)庫(kù)中,方便后續(xù)查詢異常的信息。

基于以上系統(tǒng)模型,我們可以編寫一個(gè)公共重試組件,話不多說(shuō),直接干!

03、代碼實(shí)踐

本次補(bǔ)償服務(wù)采用 rabbitmq 消息中間件進(jìn)行處理,其他消息中間件處理思路也類似!

3.1、創(chuàng)建一個(gè)消息重試實(shí)體類

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 原始消息body
     */
    private String bodyMsg;

    /**
     * 消息來(lái)源ID
     */
    private String sourceId;

    /**
     * 消息來(lái)源描述
     */
    private String sourceDesc;

    /**
     * 交換器
     */
    private String exchangeName;

    /**
     * 路由鍵
     */
    private String routingKey;

    /**
     * 隊(duì)列
     */
    private String queueName;

    /**
     * 狀態(tài),1:初始化,2:成功,3:失敗
     */
    private Integer status = 1;

    /**
     * 最大重試次數(shù)
     */
    private Integer maxTryCount = 3;

    /**
     * 當(dāng)前重試次數(shù)
     */
    private Integer currentRetryCount = 0;

    /**
     * 重試時(shí)間間隔(毫秒)
     */
    private Long retryIntervalTime = 0L;

    /**
     * 任務(wù)失敗信息
     */
    private String errorMsg;

    /**
     * 創(chuàng)建時(shí)間
     */
    private Date createTime;

    @Override
    public String toString() {
        return "MessageRetryDTO{" +
                "bodyMsg='" + bodyMsg + '\'' +
                ", sourceId='" + sourceId + '\'' +
                ", sourceDesc='" + sourceDesc + '\'' +
                ", exchangeName='" + exchangeName + '\'' +
                ", routingKey='" + routingKey + '\'' +
                ", queueName='" + queueName + '\'' +
                ", status=" + status +
                ", maxTryCount=" + maxTryCount +
                ", currentRetryCount=" + currentRetryCount +
                ", retryIntervalTime=" + retryIntervalTime +
                ", errorMsg='" + errorMsg + '\'' +
                ", createTime=" + createTime +
                '}';
    }

    /**
     * 檢查重試次數(shù)是否超過(guò)最大值
     *
     * @return
     */
    public boolean checkRetryCount() {
        retryCountCalculate();
        //檢查重試次數(shù)是否超過(guò)最大值
        if (this.currentRetryCount < this.maxTryCount) {
            return true;
        }
        return false;
    }

    /**
     * 重新計(jì)算重試次數(shù)
     */
    private void retryCountCalculate() {
        this.currentRetryCount = this.currentRetryCount + 1;
    }

}

3.2、編寫服務(wù)重試抽象類

public abstract class CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MongoTemplate mongoTemplate;


    /**
     * 初始化消息
     *
     * @param message
     */
    public void initMessage(Message message) {
        log.info("{} 收到消息: {},業(yè)務(wù)數(shù)據(jù):{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
        try {
            //封裝消息
            MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
            if (log.isInfoEnabled()) {
                log.info("反序列化消息:{}", messageRetryDto.toString());
            }
            prepareAction(messageRetryDto);
        } catch (Exception e) {
            log.warn("處理消息異常,錯(cuò)誤信息:", e);
        }
    }

    /**
     * 準(zhǔn)備執(zhí)行
     *
     * @param retryDto
     */
    protected void prepareAction(MessageRetryDTO retryDto) {
        try {
            execute(retryDto);
            doSuccessCallBack(retryDto);
        } catch (Exception e) {
            log.error("當(dāng)前任務(wù)執(zhí)行異常,業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e);
            //執(zhí)行失敗,計(jì)算是否還需要繼續(xù)重試
            if (retryDto.checkRetryCount()) {
                if (log.isInfoEnabled()) {
                    log.info("重試消息:{}", retryDto.toString());
                }
                retrySend(retryDto);
            } else {
                if (log.isWarnEnabled()) {
                    log.warn("當(dāng)前任務(wù)重試次數(shù)已經(jīng)到達(dá)最大次數(shù),業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e);
                }
                doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
            }
        }
    }

    /**
     * 任務(wù)執(zhí)行成功,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫)
     *
     * @param messageRetryDto
     */
    private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
        try {
            successCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("執(zhí)行成功回調(diào)異常,隊(duì)列描述:{},錯(cuò)誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * 任務(wù)執(zhí)行失敗,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫)
     *
     * @param messageRetryDto
     */
    private void doFailCallBack(MessageRetryDTO messageRetryDto) {
        try {
            saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
            failCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("執(zhí)行失敗回調(diào)異常,隊(duì)列描述:{},錯(cuò)誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * 執(zhí)行任務(wù)
     *
     * @param messageRetryDto
     */
    protected abstract void execute(MessageRetryDTO messageRetryDto);

    /**
     * 成功回調(diào)
     *
     * @param messageRetryDto
     */
    protected abstract void successCallback(MessageRetryDTO messageRetryDto);

    /**
     * 失敗回調(diào)
     *
     * @param messageRetryDto
     */
    protected abstract void failCallback(MessageRetryDTO messageRetryDto);

    /**
     * 構(gòu)建消息補(bǔ)償實(shí)體
     * @param message
     * @return
     */
    private MessageRetryDTO buildMessageRetryInfo(Message message){
        //如果頭部包含補(bǔ)償消息實(shí)體,直接返回
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        if(messageHeaders.containsKey("message_retry_info")){
            Object retryMsg = messageHeaders.get("message_retry_info");
            if(Objects.nonNull(retryMsg)){
                return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
            }
        }
        //自動(dòng)將業(yè)務(wù)消息加入補(bǔ)償實(shí)體
        MessageRetryDTO messageRetryDto = new MessageRetryDTO();
        messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
        messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
        messageRetryDto.setCreateTime(new Date());
        return messageRetryDto;
    }

    /**
     * 異常消息重新入庫(kù)
     * @param retryDto
     */
    private void retrySend(MessageRetryDTO retryDto){
        //將補(bǔ)償消息實(shí)體放入頭部,原始消息內(nèi)容保持不變
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
        Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
    }



    /**
     * 將異常消息存儲(chǔ)到mongodb中
     * @param retryDto
     */
    private void saveMessageRetryInfo(MessageRetryDTO retryDto){
        try {
            mongoTemplate.save(retryDto, "message_retry_info");
        } catch (Exception e){
            log.error("將異常消息存儲(chǔ)到mongodb失敗,消息數(shù)據(jù):" + retryDto.toString(), e);
        }
    }
}

3.3、編寫監(jiān)聽(tīng)服務(wù)類

在消費(fèi)端應(yīng)用的時(shí)候,也非常簡(jiǎn)單,例如,針對(duì)扣減庫(kù)存操作,我們可以通過(guò)如下方式進(jìn)行處理!

@Component
public class OrderServiceListener extends CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);

    /**
     * 監(jiān)聽(tīng)訂單系統(tǒng)下單成功消息
     * @param message
     */
    @RabbitListener(queues = "mq.order.add")
    public void consume(Message message) {
        log.info("收到訂單下單成功消息: {}", message.toString());
        super.initMessage(message);
    }


    @Override
    protected void execute(MessageRetryDTO messageRetryDto) {
        //調(diào)用扣減庫(kù)存服務(wù),將業(yè)務(wù)異常拋出來(lái)
    }

    @Override
    protected void successCallback(MessageRetryDTO messageRetryDto) {
        //業(yè)務(wù)處理成功,回調(diào)
    }

    @Override
    protected void failCallback(MessageRetryDTO messageRetryDto) {
        //業(yè)務(wù)處理失敗,回調(diào)
    }
}

當(dāng)消息消費(fèi)失敗,并超過(guò)最大次數(shù)時(shí),會(huì)將消息存儲(chǔ)到 mongodb 中,然后像常規(guī)數(shù)據(jù)庫(kù)操作一樣,可以通過(guò)web接口查詢異常消息,并針對(duì)具體場(chǎng)景進(jìn)行重試!

04、小結(jié)

可能有的同學(xué)會(huì)問(wèn),為啥不將異常消息存在數(shù)據(jù)庫(kù)?

起初的確是存儲(chǔ)在 MYSQL 中,但是隨著業(yè)務(wù)的快速發(fā)展,訂單消息數(shù)據(jù)結(jié)構(gòu)越來(lái)越復(fù)雜,數(shù)據(jù)量也非常的大,甚至大到 MYSQL 中的 text 類型都無(wú)法存儲(chǔ),同時(shí)這種數(shù)據(jù)結(jié)構(gòu)也不太適合在 MYSQL 中存儲(chǔ),因此將其遷移到 mongodb!

本文主要圍繞消息消費(fèi)失敗這種場(chǎng)景,進(jìn)行基礎(chǔ)的方案和代碼實(shí)踐講解,如果有描述不對(duì)的地方,歡迎大家留言指出!

05、參考

1、https://blog.csdn.net/qq_42046105/article/details/114156904

責(zé)任編輯:武曉燕 來(lái)源: 潘志的研發(fā)筆記
相關(guān)推薦

2021-03-01 07:31:53

消息支付高可用

2016-07-04 14:22:47

DevOps案例軟件

2021-09-30 07:26:15

MQ消息丟失

2012-07-03 11:18:20

運(yùn)維disable tab

2025-01-10 08:20:00

MQ消息架構(gòu)

2018-12-25 09:44:42

2024-06-18 14:08:22

2023-07-26 08:21:33

2019-08-15 10:20:19

云計(jì)算技術(shù)安全

2020-10-18 07:25:55

MQ消息冪等架構(gòu)

2021-03-24 10:40:26

Python垃圾語(yǔ)言

2021-03-08 10:19:59

MQ消息磁盤

2024-11-13 00:59:13

2025-01-13 05:00:00

2021-04-20 08:32:51

消息MQ隊(duì)列

2022-06-02 10:54:16

BrokerRocketMQ

2024-05-23 12:11:39

2024-12-18 07:43:49

2023-11-27 17:29:43

Kafka全局順序性

2024-12-12 14:56:48

消息積壓MQ分區(qū)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)