自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

4 張圖,9 個(gè)維度告訴你怎么做能確保 RocketMQ 不丟失消息

開發(fā) 架構(gòu)
引入消息隊(duì)列可以方便地實(shí)現(xiàn)系統(tǒng)解耦、削峰填谷等作用。但是消息隊(duì)列使用不當(dāng),可能會(huì)引起消息丟失,在一些消息敏感的業(yè)務(wù)場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

大家好,我是君哥。

引入消息隊(duì)列可以方便地實(shí)現(xiàn)系統(tǒng)解耦、削峰填谷等作用。但是消息隊(duì)列使用不當(dāng),可能會(huì)引起消息丟失,在一些消息敏感的業(yè)務(wù)場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

1 RocketMQ 簡介

RocketMQ 是阿里巴巴開源的分布式消息中間件,整體架構(gòu)如下圖:

RocketMQ 主要包括 Producer、Consumer 和 Broker,同時(shí) Name Server 進(jìn)行集群注冊(cè)管理和保存元數(shù)據(jù)。

2 消息不丟失

要想保證消息不丟失,需要從以下幾個(gè)方面考慮:

  • Producer 發(fā)送消息
  • Broker 保存消息
  • Consumer 消費(fèi)消息
  • Broker 主從切換

維度 1:同步發(fā)送,代碼如下:

public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}

同步發(fā)送會(huì)返回 4 個(gè)狀態(tài)碼:

  • SEND_OK:消息發(fā)送成功。需要注意的是,消息發(fā)送到 broker 后,還有兩個(gè)操作:消息刷盤和消息同步到 slave 節(jié)點(diǎn),默認(rèn)這兩個(gè)操作都是異步的,只有把這兩個(gè)操作都改為同步,SEND_OK 這個(gè)狀態(tài)才能真正表示發(fā)送成功。
  • FLUSH_DISK_TIMEOUT:消息發(fā)送成功但是消息刷盤超時(shí)。
  • FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功但是消息同步到 slave 節(jié)點(diǎn)時(shí)超時(shí)。
  • SLAVE_NOT_AVAILABLE:消息發(fā)送成功但是 broker 的 slave 節(jié)點(diǎn)不可用。

根據(jù)返回的狀態(tài)碼,可以做消息重試,這里設(shè)置的重試次數(shù)是 3。

消息重試時(shí),消費(fèi)端一定要做好冪等處理。

維度 2:異步發(fā)送,代碼如下:

public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {

}

@Override
public void onException(Throwable e) {
// TODO 可以在這里加入重試邏輯
}
});
}

異步發(fā)送,可以重寫回調(diào)函數(shù),回調(diào)函數(shù)捕獲到 Exception 時(shí)表示發(fā)送失敗,這時(shí)可以進(jìn)行重試,這里設(shè)置的重試次數(shù)是 3。

維度 3:刷盤策略

  • 異步刷盤:默認(rèn)。消息寫入 CommitLog 時(shí),并不會(huì)直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺(tái)線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會(huì)有消息丟失的情況,比如斷點(diǎn)導(dǎo)致機(jī)器停機(jī),PageCache 中沒來得及刷盤的消息就會(huì)丟失。
  • 同步刷盤:消息寫入內(nèi)存后,立刻請(qǐng)求刷盤線程進(jìn)行刷盤,如果消息未在約定的時(shí)間內(nèi)(默認(rèn) 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個(gè)響應(yīng)后,可以進(jìn)行重試。同步刷盤策略保證了消息的可靠性,同時(shí)降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面配置:
flushDiskType=SYNC_FLUSH

維度 4:Broker 多副本和高可用

Broker 為了保證高可用,采用一主多從的方式部署。如下圖:

消息發(fā)送到 master 節(jié)點(diǎn)后,slave 節(jié)點(diǎn)會(huì)從 master 拉取消息保持跟 master 的一致。這個(gè)過程默認(rèn)是異步的,即 master 收到消息后,不等 slave 節(jié)點(diǎn)復(fù)制消息就直接給 Producer 返回成功。

這樣會(huì)有一個(gè)問題,如果 slave 節(jié)點(diǎn)還沒有完成消息復(fù)制,這時(shí) master 宕機(jī)了,進(jìn)行主備切換后就會(huì)有消息丟失。為了避免這個(gè)問題,可以采用 slave 節(jié)點(diǎn)同步復(fù)制消息,即等 slave 節(jié)點(diǎn)復(fù)制消息成功后再給 Producer 返回發(fā)送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

改為同步復(fù)制后,消息復(fù)制流程如下:

  • slave 初始化后,跟 master 建立連接并向 master 發(fā)送自己的 offset;
  • master 收到 slave 發(fā)送的 offset 后,將 offset 后面的消息批量發(fā)送給 slave;
  • slave 把收到的消息寫入 commitLog 文件,并給 master 發(fā)送新的 offset;
  • master 收到新的 offset 后,如果 offset >= producer 發(fā)送消息后的 offset,給 Producer 返回 SEND_OK。

維度 5:消息確認(rèn)

Consumer 消費(fèi)消息的代碼如下:

public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}

如果 Consumer 消費(fèi)成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息。

維度 6:Consumer 重試

Consumer 消費(fèi)失敗,這里有 3 種情況:

  • 返回 RECONSUME_LATER
  • 返回 null
  • 拋出異常

Broker 收到這個(gè)響應(yīng)后,會(huì)把這條消息放入重試隊(duì)列,重新發(fā)送給 Consumer。

注意:

  • Broker 默認(rèn)最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入死信隊(duì)列,Consumer 可以訂閱死信隊(duì)列進(jìn)行消費(fèi)。
  • 重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。
  • Consumer 端一定要做好冪等處理。

其實(shí)重試 3 次都失敗就可以說明代碼有問題,這時(shí) Consumer 可以把消息存入本地,給 Broker 返回CONSUME_SUCCESS 來結(jié)束重試。代碼如下:

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把消息寫入本地存儲(chǔ)
System.out.println("重試次數(shù)超過3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

維度7:事務(wù)消息

RocketMQ支持事務(wù)消息,整體流程如下圖:

  • Producer 發(fā)送 half 消息;
  • Broker 先把消息寫入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的隊(duì)列,之后給 Producer 返回成功;
  • Producer 執(zhí)行本地事務(wù),成功后給 Broker 發(fā)送 commit 命令(本地事務(wù)執(zhí)行失敗則發(fā)送 rollback);
  • Broker 收到 commit 請(qǐng)求后把消息狀態(tài)更改為成功并把消息推到真正的 topic;
  • Consumer 拉取消息進(jìn)行消費(fèi)。

代碼如下:

public class ProducerTransactionListenerImpl implements TransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 這里執(zhí)行本地事務(wù),執(zhí)行成功返回LocalTransactionState.COMMIT_MESSAGE,執(zhí)行失敗返回
* LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
* Broker會(huì)回來查詢,所以需要記錄事務(wù)執(zhí)行狀態(tài)
*/
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
/**
* 這里查詢事務(wù)執(zhí)行狀態(tài),根據(jù)事務(wù)狀態(tài)返回LocalTransactionState.COMMIT_MESSAGE或
* LocalTransactionState.ROLLBACK_MESSAGE,如果沒有查詢到返回LocalTransactionState.UNKNOW,
* Broker會(huì)再次查詢,可以記錄查詢次數(shù),超過次數(shù)后返回ROLLBACK_MESSAGE
*/
return LocalTransactionState.UNKNOW;
}
}

維度 8:消息索引

我們知道,RocketMQ 核心的數(shù)據(jù)文件有 3 個(gè):CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一個(gè)索引文件,結(jié)構(gòu)如下圖:

查找消息時(shí),首先根據(jù)消息 key 的 hashcode 計(jì)算出 Hash 槽的位置,然后讀取 Hash 槽的值計(jì)算 Index 條目的位置,從Index 條目位置讀取到消息在 CommitLog 文件中的 offset,從而查找到消息。

在 Producer 發(fā)送消息時(shí),可以指定一個(gè) key,代碼如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

這樣可以通過 RocketMQ 提供的命令或者管理控制臺(tái)來查詢消息是否發(fā)送成功。

維度 9:極端情況

如果對(duì)消息丟失零容忍,我們必須要考慮極端情況,比如整個(gè) RocketMQ 集群掛了,這時(shí) Producer 端發(fā)送消息一定會(huì)失敗,可以考慮在 Producer 端做降級(jí),把要發(fā)送的消息保存到本地?cái)?shù)據(jù)庫或磁盤,等 RocketMQ 恢復(fù)以后再把本地消息推送出去。

3 總結(jié)

在一些特殊的業(yè)務(wù)場景,比如支付、銀行核算等,需要確保消息不丟失,但是同時(shí)也要看到,消息不丟失的方案會(huì)大大降低 RocketMQ 的吞吐量,需要綜合考慮。


責(zé)任編輯:武曉燕 來源: 君哥聊技術(shù)
相關(guān)推薦

2022-09-26 10:43:13

RocketMQ保存消息

2022-08-01 10:43:11

RocketMQZookeeper注冊(cè)中心

2024-08-06 09:55:25

2022-08-15 10:45:34

RocketMQ消息隊(duì)列

2020-10-09 06:55:23

監(jiān)控告警日志

2022-09-26 11:32:14

用戶分層服務(wù)業(yè)務(wù)

2021-03-18 12:16:44

用戶分層業(yè)務(wù)

2022-03-31 08:26:44

RocketMQ消息排查

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2022-07-11 11:06:11

RocketMQ函數(shù).消費(fèi)端

2022-12-16 17:15:33

MQRabbitMQ

2022-12-19 17:44:25

MQ技術(shù)RabbitMQ

2022-07-04 11:06:02

RocketMQ事務(wù)消息實(shí)現(xiàn)

2022-06-27 11:04:24

RocketMQ順序消息

2022-04-25 15:01:07

系統(tǒng)程序員調(diào)度

2022-09-16 15:42:00

數(shù)據(jù)Kafka

2020-04-06 14:53:05

MySQL數(shù)據(jù)庫字符串

2021-04-13 15:51:46

服務(wù)治理流量

2021-04-13 18:16:07

多線程安全代碼

2012-07-20 17:24:51

HTML5
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)