自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

MQ的數(shù)據(jù)一致性,如何保證?

開發(fā) 架構(gòu)
在消費(fèi)者端消費(fèi)消息時(shí),如果消費(fèi)失敗次數(shù),達(dá)到重試上限后進(jìn)入死信隊(duì)列。這個(gè)方案救了社交系統(tǒng)的推送服務(wù)——通過DLX收集全部異常消息,凌晨用補(bǔ)償Job重跑。

前言

上個(gè)月,我們有個(gè)電商系統(tǒng)出了個(gè)靈異事件:用戶支付成功了,但訂單狀態(tài)死活不改成“已發(fā)貨”。

折騰了半天才定位到問題:訂單服務(wù)的MQ消息,像人間蒸發(fā)一樣消失了。

這個(gè)Bug讓我明白:(MQ)消息隊(duì)列的數(shù)據(jù)一致性設(shè)計(jì),絕對(duì)能排進(jìn)分布式系統(tǒng)三大噩夢(mèng)之一!

今天這篇文章跟大家一起聊聊,MQ如何保證數(shù)據(jù)一致性?希望對(duì)你會(huì)有所幫助。

1.數(shù)據(jù)一致性問題的原因

這些年在Kafka、RabbitMQ、RocketMQ踩過的坑,總結(jié)成四類致命原因:

  • 生產(chǎn)者悲?。合⒊晒M(jìn)Broker,卻沒寫入磁盤就斷電。
  • 消費(fèi)者悲劇:消息消費(fèi)成功,但業(yè)務(wù)執(zhí)行失敗。
  • 輪盤賭局:網(wǎng)絡(luò)抖動(dòng)導(dǎo)致消息重復(fù)投遞。
  • 數(shù)據(jù)孤島:數(shù)據(jù)庫和消息狀態(tài)割裂(下完單沒發(fā)券)

這些情況,都會(huì)導(dǎo)致MQ產(chǎn)生數(shù)據(jù)不一致的問題。

那么,如何解決這些問題呢?

2.消息不丟的方案

我們首先需要解決消息丟失的問題。

2.1 事務(wù)消息的兩階段提交

以RocketMQ的事務(wù)消息為例,工作原理就像雙11的預(yù)售定金偽代碼如下:

// 發(fā)送事務(wù)消息核心代碼
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
    // 執(zhí)行本地事務(wù)(比如扣庫存)
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        return doBiz() ? LocalTransactionState.COMMIT : LocalTransactionState.ROLLBACK;
    }

    // Broker回調(diào)檢查本地事務(wù)狀態(tài)
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return checkDB(msg.getTransactionId()) ? COMMIT : ROLLBACK;
    }
});

真實(shí)場(chǎng)景中,別忘了在checkLocalTransaction里做好妥協(xié)查詢(查流水表或分布式事務(wù)日志)。

去年在物流系統(tǒng)救火,就遇到過事務(wù)超時(shí)的坑——本地事務(wù)成功了,但因網(wǎng)絡(luò)問題沒收到Commit,導(dǎo)致Broker不斷回查。

2.2 持久化配置

RabbitMQ的坑都在配置表里:

配置項(xiàng)

例子

作用

隊(duì)列持久化

durable=true

隊(duì)列元數(shù)據(jù)不丟

消息持久化

deliveryMode=2

消息存入磁盤

Lazy Queue

x-queue-mode=lazy

消息直接寫盤不讀取進(jìn)內(nèi)存

Confirm機(jī)制

publisher-confirm-type

生產(chǎn)者確認(rèn)消息投遞成功

RabbitMQ本地存儲(chǔ)+備份交換機(jī)雙重保護(hù)代碼如下:

channel.queueDeclare("order_queue", true, false, false, 
    new HashMap<String, Object>(){{
        put("x-dead-letter-exchange", "dlx_exchange"); // 死信交換機(jī)
    }});

去年雙十一訂單系統(tǒng)就靠這個(gè)組合拳硬剛流量峰值:主隊(duì)列消息積壓觸發(fā)閾值時(shí),自動(dòng)轉(zhuǎn)移消息到備份隊(duì)列給應(yīng)急服務(wù)處理。

2.3 副本配置

消息隊(duì)列

保命絕招

Kafka

acks=all + 副本數(shù)≥3

RocketMQ

同步刷盤 + 主從同步策略

Pulsar

BookKeeper多副本存儲(chǔ)

上周幫一個(gè)金融系統(tǒng)遷移到Kafka,為了數(shù)據(jù)安全啟用了最高配置。

server.properties配置如下:

acks=all
min.insync.replicas=2
unclean.leader.election.enable=false

結(jié)果發(fā)現(xiàn)吞吐量只剩原來的三分之一,但客戶說“錢比速度重要”——這一行哪有銀彈,全是取舍。

不同的業(yè)務(wù)場(chǎng)景,情況不一樣。

3.應(yīng)對(duì)重復(fù)消費(fèi)的方案

接下來,需要解決消息的重復(fù)消費(fèi)問題。

3.1 唯一ID

訂單系統(tǒng)的架構(gòu)課代表代碼:

// 雪花算法生成全局唯一ID
Snowflake snowflake = new Snowflake(datacenterId, machineId);
String bizId = "ORDER_" + snowflake.nextId();

// 查重邏輯(Redis原子操作)
String key = "msg:" + bizId;
if(redis.setnx(key, "1")) {
    redis.expire(key, 72 * 3600);
    processMsg();
}

先使用雪花算法生成全局唯一ID,然后使用Redis的setnx命令加分布式鎖,來保證請(qǐng)求的唯一性。

某次促銷活動(dòng)因Redis集群抖動(dòng),導(dǎo)致重復(fù)扣款。

后來改用:本地布隆過濾器+分布式Redis 雙校驗(yàn),總算解決這個(gè)世紀(jì)難題。

3.2 冪等設(shè)計(jì)

針對(duì)不同業(yè)務(wù)場(chǎng)景的三種對(duì)策:

場(chǎng)景

代碼示例

關(guān)鍵點(diǎn)

強(qiáng)一致性

SELECT FOR UPDATE先查后更新

數(shù)據(jù)庫行鎖

最終一致性

版本號(hào)控制(類似CAS)

樂觀鎖重試3次

補(bǔ)償型事務(wù)

設(shè)計(jì)反向操作(如退款、庫存回滾)

操作日志必須落庫

去年重構(gòu)用戶積分系統(tǒng)時(shí),就靠著這個(gè)三板斧把錯(cuò)誤率從0.1%降到了0.001%:

積分變更冪等示例如下:

public void addPoints(String userId, String orderId, Long points) {
    if (pointLogDao.exists(orderId)) return;
    
    User user = userDao.selectForUpdate(userId); // 悲觀鎖
    user.setPoints(user.getPoints() + points);
    userDao.update(user);
    pointLogDao.insert(new PointLog(orderId)); // 冪等日志
}

這里使用了數(shù)據(jù)庫行鎖實(shí)現(xiàn)的冪等性。

3.3 死信隊(duì)列

RabbitMQ的終極保命配置如下:

// 消費(fèi)者設(shè)置手動(dòng)ACK
channel.basicConsume(queue, false, deliverCallback, cancelCallback);

// 達(dá)到重試上限后進(jìn)入死信隊(duì)列
public void process(Message msg) {
    try {
        doBiz();
        channel.basicAck(deliveryTag);
    } catch(Exception e) {
        if(retryCount < 3) {
            channel.basicNack(deliveryTag, false, true);
        } else {
            channel.basicNack(deliveryTag, false, false); // 進(jìn)入DLX
        }
    }
}

消費(fèi)者端手動(dòng)ACK消息。

在消費(fèi)者端消費(fèi)消息時(shí),如果消費(fèi)失敗次數(shù),達(dá)到重試上限后進(jìn)入死信隊(duì)列。

這個(gè)方案救了社交系統(tǒng)的推送服務(wù)——通過DLX收集全部異常消息,凌晨用補(bǔ)償Job重跑。

4.系統(tǒng)架構(gòu)設(shè)計(jì)

接下來,從系統(tǒng)架構(gòu)設(shè)計(jì)的角度,聊聊MQ要如何保證數(shù)據(jù)一致性?

4.1 生產(chǎn)者端

對(duì)于實(shí)效性要求不太高的業(yè)務(wù)場(chǎng)景,可以使用:本地事務(wù)表+定時(shí)任務(wù)掃描的補(bǔ)償方案。

流程圖如下:

圖片圖片

4.2 消費(fèi)者端

消費(fèi)者端為了防止消息風(fēng)暴,要設(shè)置合理的并發(fā)消費(fèi)線程數(shù)。

流程圖如下:

圖片圖片

4.3 終極方案

對(duì)于實(shí)時(shí)性要求比較高的業(yè)務(wù)場(chǎng)景,可以使用 事務(wù)消息+本地事件表 的黃金組合.

流程圖如下:

圖片圖片

5.血淚經(jīng)驗(yàn)十條

  1. 消息必加唯一業(yè)務(wù)ID(別用MQ自帶的ID)
  2. 消費(fèi)邏輯一定要冪等(重復(fù)消費(fèi)是必然事件)
  3. 數(shù)據(jù)庫事務(wù)和消息發(fā)送必須二選一(或者用事務(wù)消息)
  4. 消費(fèi)者線程數(shù)不要超過分區(qū)數(shù)*2(Kafka的教訓(xùn))
  5. 死信隊(duì)列必須加監(jiān)控報(bào)警(別等客服找你)
  6. 測(cè)試環(huán)境一定要模擬網(wǎng)絡(luò)抖動(dòng)(chaos engineering)
  7. 消息體要兼容版本號(hào)(血的教訓(xùn)警告)
  8. 不要用消息隊(duì)列做業(yè)務(wù)主流程(它只配當(dāng)輔助)
  9. 消費(fèi)者offset定時(shí)存庫(防止重平衡丟消息)
  10. 業(yè)務(wù)指標(biāo)和MQ監(jiān)控要聯(lián)動(dòng)(比如訂單量和消息量的波動(dòng)要同步)

總結(jié)

(MQ)消息隊(duì)列像金融系統(tǒng)的SWIFT結(jié)算網(wǎng)絡(luò),看似簡(jiǎn)單實(shí)則處處殺機(jī)。

真正的高手不僅要會(huì)調(diào)參,更要設(shè)計(jì)出能兼容可靠性與性能的架構(gòu)。

記住,分布式系統(tǒng)的數(shù)據(jù)一致性不是銀彈,而是通過層層防御達(dá)成的動(dòng)態(tài)平衡。

就像當(dāng)年我在做資金結(jié)算系統(tǒng)時(shí),老板說的那句震耳發(fā)聵的話:“寧可慢十秒,不可錯(cuò)一分”。

責(zé)任編輯:武曉燕 來源: 蘇三說技術(shù)
相關(guān)推薦

2024-12-26 15:01:29

2023-09-07 08:11:24

Redis管道機(jī)制

2024-08-20 16:13:52

2023-05-26 07:34:50

RedisMySQL緩存

2021-12-14 07:15:57

MySQLRedis數(shù)據(jù)

2024-01-22 08:52:00

AQS雙異步數(shù)據(jù)一致性

2024-07-04 12:36:50

2023-09-15 14:24:54

ByteHouseClickHouse開源

2022-08-23 07:46:45

數(shù)據(jù)一致性數(shù)據(jù)庫

2019-08-30 12:46:10

并發(fā)扣款查詢SQL

2022-12-05 08:24:32

mongodb數(shù)據(jù)庫數(shù)據(jù)

2022-10-19 12:22:53

并發(fā)扣款一致性

2023-12-11 12:27:31

并發(fā)Zookeeper數(shù)據(jù)

2022-02-17 21:04:27

數(shù)據(jù)庫MysqlRedis

2018-08-14 10:39:04

數(shù)據(jù)錯(cuò)誤DIX

2021-03-04 06:49:53

RocketMQ事務(wù)

2025-04-27 08:52:21

Redis數(shù)據(jù)庫緩存

2022-09-15 10:37:46

MySQLRedis數(shù)據(jù)一致性

2021-12-05 21:06:27

軟件

2020-08-05 08:46:10

NFS網(wǎng)絡(luò)文件系統(tǒng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)