自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

消息隊(duì)列技術(shù)選型:這七種消息場景一定要考慮!

開發(fā) 架構(gòu)
消息隊(duì)列技術(shù)選型,要考慮的因素很多,本文主要從業(yè)務(wù)場景來分析需要考慮的因素,同時(shí)技術(shù)上也需要考慮運(yùn)維復(fù)雜度、業(yè)務(wù)規(guī)模、社區(qū)活躍度、學(xué)習(xí)成本等因素。希望本文對(duì)你使用消息隊(duì)列有所幫助。

大家好,我是君哥。

我們?cè)谧鱿㈥?duì)列的技術(shù)選型時(shí),往往會(huì)結(jié)合業(yè)務(wù)場景進(jìn)行考慮。今天來聊一聊消息隊(duì)列可能會(huì)用到的 7 種消息場景。

1、普通消息

消息隊(duì)列最基礎(chǔ)的功能就是生產(chǎn)者發(fā)送消息、Broker 保存消息,消費(fèi)者來消費(fèi)消息,以此實(shí)現(xiàn)系統(tǒng)解耦、削峰填谷的作用。

普通消息是消息隊(duì)列必備的消息類型,也是系統(tǒng)使用場景最多的一種消息。

2、順序消息

順序消息是指生產(chǎn)者發(fā)送消息的順序和消費(fèi)者消費(fèi)消息的順序是一致的。比如在一個(gè)電商場景,同一個(gè)用戶提交訂單、訂單支付、訂單出庫,這三個(gè)消息消費(fèi)者需要按照順序來進(jìn)行消費(fèi)。如下圖:

順序消息的實(shí)現(xiàn)并不容易,原因如下:

  • 生產(chǎn)者集群中,有多個(gè)生產(chǎn)者發(fā)送消息,網(wǎng)絡(luò)延遲不一樣,很難保證發(fā)送到 Broker 的消息落盤順序是一致的。
  • 如果 Broker 有多個(gè)分區(qū)或隊(duì)列,生產(chǎn)者發(fā)送的消息會(huì)進(jìn)入多個(gè)分區(qū),也無法保證順序消費(fèi)。
  • 如果有多個(gè)消費(fèi)者來異步消費(fèi)同一個(gè)分區(qū),很難保證消費(fèi)順序跟生產(chǎn)者發(fā)送順序一致。

要保證消息有序,需要滿足兩個(gè)條件:

  • 同一個(gè)生產(chǎn)者必須同步發(fā)送消息到同一個(gè)分區(qū)。
  • 一個(gè)分區(qū)只能給同一個(gè)消費(fèi)者消費(fèi)。

如下圖:

上面第二個(gè)條件是比較容易實(shí)現(xiàn)的,一個(gè)分區(qū)綁定一個(gè)消費(fèi)者就可以,主要是第一個(gè)條件。

在主流消息隊(duì)列的實(shí)現(xiàn)中,Kafka 和 Pulsar 的實(shí)現(xiàn)方式類似,生產(chǎn)者給消息賦值一個(gè) key,對(duì) key 做 Hash 運(yùn)算來指定消息發(fā)送到哪一個(gè)分區(qū)。比如上面電商的例子,對(duì)同一個(gè)用戶的一筆訂單,提交訂單、訂單支付、訂單出庫這三個(gè)消息賦值同一個(gè) key,就可以把這三條消息發(fā)送到同一個(gè)分區(qū)。

對(duì)于 RocketMQ,生產(chǎn)者在發(fā)送消息的時(shí)候,可以通過 MessageQueueSelector 指定把消息投遞到那個(gè) MessageQueue,如下圖:

示例代碼如下:

public static void main(String[] args) throws UnsupportedEncodingException {
 try {
  DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  producer.start();

  String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  for (int i = 0; i < 100; i++) {
   int orderId = i % 10;
   Message msg =
    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
     Integer id = (Integer) arg;
     int index = id % mqs.size();
     return mqs.get(index);
    }
   }, orderId);

   System.out.printf("%s%n", sendResult);
  }

  producer.shutdown();
 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  e.printStackTrace();
 }
}

RabbitMQ 的實(shí)現(xiàn)是 Exchange 根據(jù)設(shè)置好的 Route Key 將數(shù)據(jù)路由到不同的 Queue 中。示例代碼如下:

@Resource
private AmqpTemplate rabbitTemplate;

public void send1(String message) {
 rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);
}

3、延時(shí)消息

或者也叫定時(shí)消息,是指消息發(fā)送后不會(huì)立即被消費(fèi),而是指定一個(gè)時(shí)間,到時(shí)間后再消費(fèi)。經(jīng)典的場景比如電商購物時(shí),30 分鐘未支付訂單,讓訂單自動(dòng)失效。

(1)RocketMQ 實(shí)現(xiàn)

RocketMQ 定義了 18 個(gè)延時(shí)級(jí)別,每個(gè)延時(shí)級(jí)別對(duì)應(yīng)一個(gè)延時(shí)時(shí)間。下面如果延遲級(jí)別是 3,則消息會(huì)延遲 10s 才會(huì)拉取。

//MessageStoreConfig類
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ 的延時(shí)消息如下圖:

生產(chǎn)者把消費(fèi)發(fā)送到 Broker 后,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 這個(gè) Topic,然后調(diào)度任務(wù)會(huì)判斷是否到期,如果到期,會(huì)把消息從 SCHEDULE_TOPIC_XXXX 取出投遞到原始的 queue,這樣消費(fèi)者就可以消費(fèi)到了。

RocketMQ 的延時(shí)消息只支持最大兩個(gè)小時(shí)的延時(shí),不過 RocketMQ5.0 基于時(shí)間輪算法實(shí)現(xiàn)了定時(shí)消息,解決了這個(gè)問題。

(2)Pulsar 實(shí)現(xiàn)

Pulsar 的實(shí)現(xiàn)如下圖:

Pulsar 的延時(shí)消息首先會(huì)寫入一個(gè) Delayed Message Tracker 的數(shù)據(jù)結(jié)構(gòu)中,Delayed Message Tracker 根據(jù)延時(shí)時(shí)間構(gòu)建 delayed index 優(yōu)先級(jí)隊(duì)列。消費(fèi)者拉取消息時(shí),首先去 Delayed Message Tracker 檢查是否有到期的消息。如果有則直接拉取進(jìn)行消費(fèi)。

(3)RabbitMQ 實(shí)現(xiàn)

RabbitMQ 的實(shí)現(xiàn)方式有兩種,一種是投遞到普通隊(duì)列都不消費(fèi),等消息過期后被投遞到死信隊(duì)列,消費(fèi)者消費(fèi)死信隊(duì)列。如下圖:

第二種方式是生產(chǎn)者發(fā)送消息時(shí),先發(fā)送到本地 Mnesia 數(shù)據(jù)庫,消息到期后定時(shí)器再將消息投遞到 broker。

(4)Kafka 實(shí)現(xiàn)

Kafka 本身并沒有延時(shí)隊(duì)列,不過可以通過生產(chǎn)者攔截器來實(shí)現(xiàn)消息延時(shí)發(fā)送,也可以定義延時(shí) Topic,利用類似 RocketMQ 的方案來實(shí)現(xiàn)延時(shí)消息。

4、事務(wù)消息

事務(wù)消息是指生產(chǎn)消息和消費(fèi)消息滿足事務(wù)的特性。

RabbitMQ 和 Kafka 的事務(wù)消息都是只支持生產(chǎn)消息的事務(wù)特性,即一批消息要不全部發(fā)送成功,要不全部發(fā)送失敗。

RabbitMQ 通過 Channel 來開啟事務(wù)消息,代碼如下:

ConnectionFactory factory=new ConnectionFactory();
cnotallow=factory.newConnection();
Channel channel=connection.createChannel();
//開啟事務(wù)
channel.txSelect();
channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8"));
//提交事務(wù) 或者 channel.txRollback()回滾事務(wù)
channel.txCommit();

Kafka 可以給多個(gè)生產(chǎn)者設(shè)置同一個(gè)事務(wù) ID ,從而把多個(gè) Topic 、多個(gè) Partition 放在一個(gè)事務(wù)中,實(shí)現(xiàn)原子性寫入。

Pulsar 的事務(wù)消息對(duì)于事務(wù)語義的定義是:允許事件流應(yīng)用將消費(fèi)、處理、生產(chǎn)消息整個(gè)過程定義為一個(gè)原子操作。可見,Pulsar 的事務(wù)消息可以覆蓋消息流整個(gè)過程。

RocketMQ 的事務(wù)消息是通過 half 消息來實(shí)現(xiàn)的。以電商購物場景來看,賬戶服務(wù)扣減賬戶金額后,發(fā)送消息給 Broker,庫存服務(wù)來消費(fèi)這條消息進(jìn)行扣減庫存。如下圖:

可見,RocketMQ 只能保證生產(chǎn)者發(fā)送消息和本地事務(wù)的原子性,并不能保證消費(fèi)消息的原子性。

5、軌跡消息

軌跡消息主要用于跟蹤消息的生命周期,當(dāng)消息丟失時(shí)可以很方便地找出原因。

軌跡消息也跟普通消息一樣,也需要存儲(chǔ)和查詢,也會(huì)占用消息隊(duì)列的資源,所以選擇軌跡消息要考慮下面幾點(diǎn):

  • 消息生命周期的關(guān)鍵節(jié)點(diǎn)一定要記錄。
  • 不能影響正常消息的發(fā)送和消費(fèi)性能。
  • 不能影響 Broker 的消息存儲(chǔ)性能。
  • 要考慮消息查詢維度和性能。

RabbitMQ Broker 實(shí)現(xiàn)了軌跡消息的功能,打開 Trace 開關(guān),就可以把軌跡消息發(fā)送到 amq.rabbitmq.trace 這個(gè) exchange,但是要考慮軌跡消息會(huì)不會(huì)給 Broker 造成 壓力進(jìn)而導(dǎo)致消息積壓。RabbitMQ 的生產(chǎn)者和消費(fèi)者都沒有實(shí)現(xiàn)軌跡消息,需要開發(fā)者自己來實(shí)現(xiàn)。

RocketMQ 生產(chǎn)者、Broker 和消費(fèi)者都實(shí)現(xiàn)了軌跡消息,不過默認(rèn)是關(guān)閉的,需要手工開啟。

使用軌跡消息,需要考慮記錄哪些節(jié)點(diǎn)、存儲(chǔ)介質(zhì)、性能、查詢方式等問題。

6、死信隊(duì)列

在消息隊(duì)列中,死信隊(duì)列主要應(yīng)對(duì)一些異常的情況,如下圖:

RocketMQ 實(shí)現(xiàn)了消費(fèi)端的死信隊(duì)列,當(dāng)消費(fèi)者消費(fèi)失敗時(shí),會(huì)進(jìn)行重試,如果重試 16 次還是失敗,則這條消息會(huì)被發(fā)送到死信隊(duì)列。

RabbitMQ 實(shí)現(xiàn)了生產(chǎn)者和 Broker 的死信隊(duì)列,下面三種情況,消息會(huì)被發(fā)送到死信隊(duì)列:

  • 生產(chǎn)者發(fā)送消息被拒絕,并且 requeue 參數(shù)設(shè)置為 false。
  • Broker 消息過期了。
  • 隊(duì)列達(dá)到最大長度。

RabbitMQ 消息變成死信消息后,會(huì)被發(fā)送到死信交換機(jī)(Dead-Letter-Exchange)。

7、優(yōu)先級(jí)消息

有一些業(yè)務(wù)場景下,我們需要優(yōu)先處理一些消息,比如銀行里面的金卡客戶、銀卡客戶優(yōu)先級(jí)高于普通客戶,他們的業(yè)務(wù)需要優(yōu)先處理。如下圖:

主流消息隊(duì)列中,RabbitMQ 是支持優(yōu)先級(jí)隊(duì)列的,代碼如下:

ConnectionFactory factory=new ConnectionFactory();
cnotallow=factory.newConnection();
Channel channel=connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
//設(shè)置優(yōu)先級(jí)為 5
args.put("x-max-priority", 5);
channel.queueDeclare("my-priority-queue", true, false, false, args);

8、總結(jié)

消息隊(duì)列技術(shù)選型,要考慮的因素很多,本文主要從業(yè)務(wù)場景來分析需要考慮的因素,同時(shí)技術(shù)上也需要考慮運(yùn)維復(fù)雜度、業(yè)務(wù)規(guī)模、社區(qū)活躍度、學(xué)習(xí)成本等因素。希望本文對(duì)你使用消息隊(duì)列有所幫助。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
相關(guān)推薦

2024-03-29 08:33:10

應(yīng)用場景存儲(chǔ)搜索

2023-11-27 13:42:00

消息隊(duì)列RocketMQ

2023-11-13 08:37:33

消息中間件分布式架構(gòu)

2023-09-19 15:33:50

Web實(shí)時(shí)消息推送

2018-03-13 09:00:01

IT架構(gòu)

2022-07-30 10:08:06

MQTT?協(xié)議物聯(lián)網(wǎng)

2019-01-29 11:02:30

消息中間件Java互聯(lián)網(wǎng)

2024-09-23 08:00:00

消息隊(duì)列MQ分布式系統(tǒng)

2024-09-18 07:00:00

消息隊(duì)列中間件消息隊(duì)列

2023-12-18 08:36:39

消息隊(duì)列微服務(wù)開發(fā)

2025-01-02 09:23:05

2017-07-04 08:00:23

熱門文章技術(shù)

2023-11-30 08:34:29

批量消息消息隊(duì)列

2017-10-11 15:08:28

消息隊(duì)列常見

2020-01-14 11:09:36

CIO IT技術(shù)

2022-05-31 08:21:07

MQ使用場景消費(fèi)消息

2021-02-19 09:19:11

消息隊(duì)列場景

2020-12-16 10:34:51

Linux 命令運(yùn)維

2024-01-16 08:24:59

消息隊(duì)列KafkaRocketMQ

2022-07-14 13:27:40

數(shù)據(jù)安全網(wǎng)絡(luò)安全
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)