自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

消息隊(duì)列批量收發(fā)消息,請(qǐng)避開這五個(gè)坑!

開發(fā) 架構(gòu)
使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實(shí)也會(huì)存在一些問題,使用的時(shí)候要結(jié)合業(yè)務(wù)場(chǎng)景避開這些坑。

大家好,我是君哥。

使用消息隊(duì)列時(shí),為了提高生產(chǎn)和消費(fèi)的性能,有時(shí)會(huì)開啟批量處理。

在生產(chǎn)端,生產(chǎn)者發(fā)送的消息先發(fā)送到一個(gè)消息列表,積累到一定的消息量之后再批量發(fā)送給 Broker,如下圖:

在消費(fèi)端,消費(fèi)者拉取消息后先不立即處理,而是把消息轉(zhuǎn)存到一個(gè)內(nèi)存隊(duì)列或數(shù)據(jù)庫(kù),由業(yè)務(wù)線程去處理,如下圖:

無論是生產(chǎn)者做批量發(fā)送,還是消費(fèi)者做批量處理,都需要考慮使用批量消息的業(yè)務(wù)場(chǎng)景,避免踩坑。下面看一下批量操作可能會(huì)遇到哪些坑。

批量大小

當(dāng)生產(chǎn)者采用批量發(fā)送的方式來提高發(fā)送性能時(shí),一定要考慮發(fā)送消息的批量大小。下面是 RocketMQ 批量發(fā)送的官方示例:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

RocketMQ 默認(rèn)消息大小是 4M,由 maxMessageSize 參數(shù)控制,如果批量消息大小超過 maxMessageSize,則會(huì)拋出異常。

如果遇到消息大小超過 maxMessageSize 的情況時(shí),可以用下面方法進(jìn)行處理:

  • 把這個(gè)參數(shù)改大,但需要考慮 Broker 的性能和網(wǎng)絡(luò)帶寬;
  • 將消息進(jìn)行拆分后分批發(fā)送;
  • 對(duì)消息進(jìn)行壓縮處理。

RabbitMQ 相關(guān)的 API 則提供了更加靈活的批量控制,對(duì)消息數(shù)量和消息大小都做了控制,下面看一下源碼:

冪等

消費(fèi)端可以批量拉取消息進(jìn)行消費(fèi),這樣可以減少拉取消息時(shí)的 RPC 次數(shù),提升消費(fèi)性能。比如在 RocketMQ 中,可以通過 Consumer 中的 pullBatchSize 來設(shè)置一次拉取的消息數(shù)量,通過 consumeMessageBatchMaxSize 參數(shù)來設(shè)置一次消費(fèi)的消息數(shù)量。

但需要注意的是,如果批量消息中一條消息消費(fèi)失敗了,這一批消息都需要進(jìn)行重試,已經(jīng)消費(fèi)成功的消息會(huì)被重復(fù)消費(fèi),帶來業(yè)務(wù)問題。

為了不對(duì)業(yè)務(wù)造成影響,必須考慮冪等。一個(gè)簡(jiǎn)單的方法是在消息中增加全局唯一 id 屬性,對(duì)消息消費(fèi)結(jié)果進(jìn)行記錄,消費(fèi)成功后保存 id。這樣在消費(fèi)消息之前先查詢是否存在消費(fèi)成功的記錄,如果存在則直接返回處理成功。

時(shí)延

在使用消息隊(duì)列進(jìn)行批量操作時(shí),必須要考慮到時(shí)延問題。比如我們?cè)O(shè)置一個(gè)批次 100 條消息,積累夠 100 條消息后再發(fā)送,在消息量小的情況下,可能積累夠 100 條消息會(huì)很長(zhǎng)時(shí)間,導(dǎo)致消費(fèi)端拉取到一條消息時(shí)延很大。

雖然消息隊(duì)列的一個(gè)重要作用是削峰填谷,但在一些場(chǎng)景下,對(duì)消息的實(shí)時(shí)性也有要求。比如在車聯(lián)網(wǎng)的充電場(chǎng)景,車聯(lián)網(wǎng)平臺(tái)需要實(shí)時(shí)感知充電樁的狀態(tài),如果充電樁積累夠一批消息再上報(bào)平臺(tái),平臺(tái)獲取到的狀態(tài)會(huì)不準(zhǔn)確,如果心跳消息延時(shí)太久,平臺(tái)會(huì)認(rèn)為充電樁離線。

對(duì)于有時(shí)延要求又需要批量操作的場(chǎng)景,可以設(shè)置一個(gè)超時(shí)時(shí)間,超時(shí)后即使消息數(shù)量不夠,也會(huì)發(fā)送出去。看下 RabbitMQ 的處理:

public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
  throws AmqpException {
 if (correlationData != null) {
  //...
  super.send(exchange, routingKey, message, correlationData);
 }
 else {
  if (this.scheduledTask != null) {
   this.scheduledTask.cancel(false);
  }
  MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);
  if (batch != null) {
   super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);
  }
  //這里獲取到超時(shí)時(shí)間,到達(dá)超時(shí)時(shí)間后使用定時(shí)器將消息發(fā)送出去
  Date next = this.batchingStrategy.nextRelease();
  if (next != null) {
   this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);
  }
 }
}

可靠性

使用批處理一定要考慮可靠性的問題。

在消費(fèi)端,消費(fèi)者批量拉取一批消息后把消息暫存到一個(gè)內(nèi)存臨時(shí)隊(duì)列,然后多線程去臨時(shí)隊(duì)列消費(fèi)消息,如果服務(wù)宕機(jī),臨時(shí)隊(duì)列中的消息會(huì)丟失。

為了避免宕機(jī)引發(fā)的損失,可以拉取一批消息后保存到數(shù)據(jù)庫(kù),然后給 Broker 返回 ACK,之后業(yè)務(wù)代碼去數(shù)據(jù)庫(kù)查詢消息并消費(fèi),不過要考慮數(shù)據(jù)庫(kù)大事務(wù)、鎖競(jìng)爭(zhēng)等問題。

當(dāng)然,對(duì)于一些消息丟失不敏感的場(chǎng)景,比如日志收集之類的,可靠性這個(gè)指標(biāo)是不用太關(guān)注的。

特殊場(chǎng)景

因?yàn)榕肯⒂幸恍?fù)雜性,消息隊(duì)列的部分特性不支持。

事務(wù)消息

批量消息會(huì)增加消息重試的難度,所以對(duì)于事務(wù)消息,建議使用單條消息,一條消息對(duì)應(yīng)一個(gè)事務(wù)。

順序消息

順序消息的實(shí)現(xiàn)思路一般是生產(chǎn)者將消息發(fā)送到同一個(gè)分區(qū),消費(fèi)者綁定這個(gè)分區(qū)并使用單線程消費(fèi)這個(gè)分區(qū)的消息。如果對(duì)同一個(gè) Topic 下的同一個(gè)分區(qū)來實(shí)現(xiàn)批量發(fā)送,難度會(huì)增大。所以建議順序消息使用單條消息進(jìn)行發(fā)送。

延時(shí)消息

如果延時(shí)消息使用批量進(jìn)行發(fā)送,這一批消息的延時(shí)時(shí)間必須相同,同時(shí)要考慮批量消息的超時(shí)時(shí)間,超時(shí)時(shí)間太大會(huì)影響延時(shí)時(shí)間的準(zhǔn)確性,生產(chǎn)端實(shí)現(xiàn)復(fù)雜度大大增加。

總結(jié)

使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實(shí)也會(huì)存在一些問題,使用的時(shí)候要結(jié)合業(yè)務(wù)場(chǎng)景避開這些坑。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
相關(guān)推薦

2022-07-26 20:00:35

場(chǎng)景RabbitMQMQ

2020-09-14 11:50:21

SpringBootRabbitMQJava

2017-07-28 09:30:55

2017-10-11 15:08:28

消息隊(duì)列常見

2025-03-28 10:06:01

架構(gòu)輪詢延時(shí)

2022-08-22 08:45:57

Kafka網(wǎng)絡(luò)層源碼實(shí)現(xiàn)

2023-09-26 08:20:12

消息隊(duì)列RabbitMQ

2020-10-09 15:00:56

實(shí)時(shí)消息編程語言

2016-08-24 15:43:01

2015-08-12 10:10:21

2025-03-28 12:20:00

代碼C#異步編程

2020-10-10 12:46:17

編程指南誤區(qū)

2019-07-19 07:56:13

消息隊(duì)列消息代理消息中間件

2010-04-21 12:39:48

Unix 消息隊(duì)列

2009-12-07 09:23:05

2017-02-27 14:25:50

Java隊(duì)列Web

2022-04-12 11:15:31

Redis消息隊(duì)列數(shù)據(jù)庫(kù)

2024-11-11 00:00:10

2012-09-24 11:48:05

IBMdw

2009-11-09 11:15:06

WCF消息隊(duì)列
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)