自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

日常工作,MQ的八種常用使用場(chǎng)景

開(kāi)發(fā) 前端
在用戶注冊(cè)場(chǎng)景中,當(dāng)用戶信息保存成功后,系統(tǒng)需要發(fā)送一個(gè)短信、或者郵箱消息,通知用戶注冊(cè)成功。如果這個(gè)短信或者郵箱消息發(fā)送比較耗時(shí),則可能拖垮注冊(cè)接口。又或者如果調(diào)用第三方短信、郵件發(fā)送接口失敗,也會(huì)影響注冊(cè)接口。

前言

大家好,我是田螺。

我們?nèi)粘i_(kāi)發(fā)中,經(jīng)常跟MQ(消息隊(duì)列)打交道。本文田螺哥梳理了MQ的8種使用場(chǎng)景。

圖片圖片

1. 異步處理

面試官在問(wèn)我們MQ作用時(shí),很多伙伴馬上想到異步處理、解耦、流量削鋒等等。

MQ 最常見(jiàn)的應(yīng)用場(chǎng)景之一就是異步處理。

比如,在用戶注冊(cè)場(chǎng)景中,當(dāng)用戶信息保存成功后,系統(tǒng)需要發(fā)送一個(gè)短信、或者郵箱消息,通知用戶注冊(cè)成功。如果這個(gè)短信或者郵箱消息發(fā)送比較耗時(shí),則可能拖垮注冊(cè)接口。又或者如果調(diào)用第三方短信、郵件發(fā)送接口失敗,也會(huì)影響注冊(cè)接口。一般我們不希望一個(gè)通知類(lèi)的功能,去影響注冊(cè)主流程,這時(shí)候,則可以使用MQ來(lái)實(shí)現(xiàn)異步處理。

簡(jiǎn)要代碼如下:先保存用戶信息,然后發(fā)送注冊(cè)成功的MQ消息

// 用戶注冊(cè)方法
  public void registerUser(String username, String email, String phoneNumber) {
      // 保存用戶信息(簡(jiǎn)化版)
      userService.add(buildUser(username,email,phoneNumber))
      // 發(fā)送消息
      String registrationMessage = "User " + username + " has registered successfully.";
      // 發(fā)送消息到隊(duì)列
      rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
  }

消費(fèi)者從隊(duì)列中讀取消息并發(fā)送短信或郵件:

@Service
public class NotificationService {

    // 監(jiān)聽(tīng)消息隊(duì)列中的消息并發(fā)送短信/郵件
    @RabbitListener(queues = "registrationQueue")
    public void handleRegistrationNotification(String message) {
        // 這里可以進(jìn)行短信或郵件的發(fā)送操作
        System.out.println("Sending registration notification: " + message);

        // 假設(shè)這里是發(fā)送短信的操作
        sendSms(message);

        // 也可以做其他通知(比如發(fā)郵件等)
        sendEmail(message);
    }
  }

2. 解耦

在微服務(wù)架構(gòu)中,各個(gè)服務(wù)通常需要進(jìn)行相互通信。使用 MQ 可以幫助解耦服務(wù),避免直接調(diào)用導(dǎo)致的強(qiáng)耦合。

圖片圖片


一個(gè)電商平臺(tái)的庫(kù)存服務(wù)和支付服務(wù)。支付服務(wù)在處理支付后,需要向庫(kù)存服務(wù)發(fā)送扣庫(kù)存的請(qǐng)求,但不直接調(diào)用 API,而是通過(guò) MQ 發(fā)送消息,讓庫(kù)存服務(wù)異步處理。

支付服務(wù)在支付成功后將消息發(fā)送到 RocketMQ:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class PaymentService {

    private DefaultMQProducer producer;

    public PaymentService() throws Exception {
        producer = new DefaultMQProducer("PaymentProducerGroup");
        producer.setNamesrvAddr("localhost:9876");  // RocketMQ NameServer 地址
        producer.start();
    }

    public void processPayment(String orderId, int quantity) throws Exception {
        // 1. 模擬調(diào)用支付接口(例如:支付寶、微信支付等)
        boolean paymentSuccessful = callPayment(orderId, quantity);

        if (paymentSuccessful) {
            // 2. 支付成功后,創(chuàng)建支付消息并發(fā)送到 RocketMQ
            String messageBody = "OrderId: " + orderId + ", Quantity: " + quantity;
            Message message = new Message("paymentTopic", "paymentTag", messageBody.getBytes());
            producer.send(message);    
        }
    }
}

庫(kù)存服務(wù)從 RocketMQ 中消費(fèi)支付消息,并處理扣庫(kù)存的邏輯:

public class InventoryService {

    private DefaultMQPushConsumer consumer;

    public InventoryService() throws Exception {
        consumer = new DefaultMQPushConsumer("InventoryConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("paymentTopic", "paymentTag");

        // 消息監(jiān)聽(tīng)器
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                String messageBody = new String(msg.getBody());
                // 執(zhí)行扣庫(kù)存操作
                reduceStock(messageBody);
            }
            return null; // 返回消費(fèi)成功
        });

        consumer.start();
        System.out.println("InventoryService started...");
    }
}

3.流量削鋒

在高并發(fā)的情況下,有些請(qǐng)求可能會(huì)產(chǎn)生瞬時(shí)流量峰值,直接處理可能會(huì)導(dǎo)致服務(wù)過(guò)載。比如:

  • 春運(yùn)快到了,12306的搶票就是這種案例。
  • 又或者雙12這種大促,訂單壓力會(huì)比較大。
  • 秒殺的時(shí)候,也需要避免流量暴漲,打垮應(yīng)用系統(tǒng)的風(fēng)險(xiǎn)

這些場(chǎng)景,我們都可以使用MQ來(lái)進(jìn)行流量的削峰填谷,確保系統(tǒng)平穩(wěn)運(yùn)行。

圖片圖片

假設(shè)秒殺系統(tǒng)每秒最多可以處理2k個(gè)請(qǐng)求,每秒?yún)s有5k的請(qǐng)求過(guò)來(lái),可以引入消息隊(duì)列,秒殺系統(tǒng)每秒從消息隊(duì)列拉2k請(qǐng)求處理得了。

4.延時(shí)任務(wù)

在電商平臺(tái)的訂單處理中,如果用戶下單后一定時(shí)間內(nèi)未支付,需要自動(dòng)取消訂單。通過(guò)MQ的延時(shí)隊(duì)列功能,可以設(shè)置消息延遲消費(fèi)的時(shí)間,當(dāng)消息到達(dá)延遲時(shí)間后,由消費(fèi)者處理取消訂單的邏輯。

當(dāng)用戶下單時(shí),生成一個(gè)訂單并發(fā)送一條延遲消息到RocketMQ。延遲時(shí)間可以根據(jù)訂單的超時(shí)時(shí)間設(shè)置:

@Service
public class OrderService {
 
 @Autowired
 private RocketMQTemplate rocketMQTemplate;
 
 public void createOrder(Order order) {
  // 保存訂單邏輯(省略)
 
  // 計(jì)算延遲時(shí)間(單位:毫秒)
  long delay = order.getTimeout();
 
  // 發(fā)送延遲消息
  rocketMQTemplate.syncSend("orderCancelTopic:delay" + delay,
    MessageBuilder.withPayload(order).build(),
    10000, // 消息發(fā)送超時(shí)時(shí)間(單位:毫秒)
    (int) (delay / 1000) // RocketMQ的延遲級(jí)別是以秒為單位的,因此需要轉(zhuǎn)換為秒
  );
 }
}

注意:RocketMQ的延遲級(jí)別是固定的,如1s、5s、10s等。如果訂單的延遲時(shí)間不是RocketMQ支持的延遲級(jí)別的整數(shù)倍,那么消息將不會(huì)精確地在預(yù)期的延遲時(shí)間后被消費(fèi)。為了解決這個(gè)問(wèn)題,你可以選擇最接近的延遲級(jí)別,或者根據(jù)業(yè)務(wù)需求進(jìn)行適當(dāng)?shù)恼{(diào)整。

創(chuàng)建一個(gè)用來(lái)消費(fèi)延遲消息的消費(fèi)者,處理取消訂單的邏輯。例如:

@Component
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "order-cancel-consumer-group")
public class OrderCancelListener implements RocketMQListener<Order> {
 
 @Override
 public void onMessage(Order order) {
  // 取消訂單邏輯
  // 檢查訂單狀態(tài),如果訂單仍處于未支付狀態(tài)則進(jìn)行取消
  System.out.println("Cancelling order: " + order.getOrderId());
  // (省略實(shí)際的取消訂單邏輯)
 }
}

5. 日志收集

消息隊(duì)列常用于日志系統(tǒng)中,將應(yīng)用生成的日志異步地發(fā)送到日志處理系統(tǒng),進(jìn)行統(tǒng)一存儲(chǔ)和分析。

假設(shè)你有一個(gè)微服務(wù)架構(gòu),每個(gè)微服務(wù)都會(huì)生成日志。你可以將這些日志通過(guò)消息隊(duì)列(如Kafka)發(fā)送到一個(gè)集中式的日志收集系統(tǒng)(如 ELK(Elasticsearch, Logstash, Kibana) 或 Fluentd),從而實(shí)現(xiàn)日志的統(tǒng)一管理。

生產(chǎn)者(發(fā)送日志到 Kafka)

// 配置和發(fā)送日志到 Kafka 主題 "app-logs"
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
String logMessage = "{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}";
producer.send(new ProducerRecord<>("app-logs", "log-key", logMessage));

消費(fèi)者(收集日志信息)

@Service
public class LogConsumer {
    // 使用 @KafkaListener 注解來(lái)消費(fèi) Kafka 中的日志
    @KafkaListener(topics = "app-logs", groupId = "log-consumer-group")
    public void consumeLog(String logMessage) {
        // 打印或處理收到的日志
        System.out.println("Received log: " + logMessage);
    }
}

6.分布式事務(wù)

業(yè)界經(jīng)常使用MQ來(lái)實(shí)現(xiàn)分布式事務(wù)。

我舉個(gè)下訂單的場(chǎng)景,使用MQ實(shí)現(xiàn)分布式事務(wù)的例子吧。

我們先來(lái)看,一條普通的MQ消息,從產(chǎn)生到被消費(fèi),大概流程如下:

圖片圖片

  • 生產(chǎn)者產(chǎn)生消息,發(fā)送帶MQ服務(wù)器
  • MQ收到消息后,將消息持久化到存儲(chǔ)系統(tǒng)。
  • MQ服務(wù)器返回ACk到生產(chǎn)者。
  • MQ服務(wù)器把消息push給消費(fèi)者
  • 消費(fèi)者消費(fèi)完消息,響應(yīng)ACK
  • MQ服務(wù)器收到ACK,認(rèn)為消息消費(fèi)成功,即在存儲(chǔ)中刪除消息。

回到下訂單這個(gè)例子,訂單系統(tǒng)創(chuàng)建完訂單后,再發(fā)送消息給下游系統(tǒng)。如果訂單創(chuàng)建成功,然后消息沒(méi)有成功發(fā)送出去,下游系統(tǒng)就無(wú)法感知這個(gè)事情,出導(dǎo)致數(shù)據(jù)不一致。

這時(shí)候就可以使用MQ實(shí)現(xiàn)分布式事務(wù)消息。大家看下這個(gè)流程:

圖片圖片

  • 生產(chǎn)者產(chǎn)生消息,發(fā)送一條半事務(wù)消息到MQ服務(wù)器
  • MQ收到消息后,將消息持久化到存儲(chǔ)系統(tǒng),這條消息的狀態(tài)是待發(fā)送狀態(tài)。
  • MQ服務(wù)器返回ACK確認(rèn)到生產(chǎn)者,此時(shí)MQ不會(huì)觸發(fā)消息推送事件
  • 生產(chǎn)者執(zhí)行本地事務(wù)
  • 如果本地事務(wù)執(zhí)行成功,即commit執(zhí)行結(jié)果到MQ服務(wù)器;如果執(zhí)行失敗,發(fā)送rollback。
  • 如果是正常的commit,MQ服務(wù)器更新消息狀態(tài)為可發(fā)送;如果是rollback,即刪除消息。
  • 如果消息狀態(tài)更新為可發(fā)送,則MQ服務(wù)器會(huì)push消息給消費(fèi)者。消費(fèi)者消費(fèi)完就回ACK。
  • 如果MQ服務(wù)器長(zhǎng)時(shí)間沒(méi)有收到生產(chǎn)者的commit或者rollback,它會(huì)反查生產(chǎn)者,然后根據(jù)查詢到的結(jié)果執(zhí)行最終狀態(tài)。

7. 遠(yuǎn)程調(diào)用

我以前公司(微眾)基于MQ(RocketMQ),自研了遠(yuǎn)程調(diào)用框架。

RocketMQ 作為遠(yuǎn)程調(diào)用框架,主要就是金融場(chǎng)景的適配性。

  • 消息查詢功能:RocketMQ提供了消息查詢功能,方便微眾銀行在需要時(shí)進(jìn)行消息對(duì)賬或問(wèn)題排查。
  • 金融級(jí)穩(wěn)定性:RocketMQ在金融領(lǐng)域的應(yīng)用非常廣泛,得到了眾多金融機(jī)構(gòu)的認(rèn)可。其穩(wěn)定性和可靠性能夠滿足微眾銀行對(duì)金融級(jí)消息服務(wù)的需求。

還有可以基于RocketMQ的定制開(kāi)發(fā):多中心多活、灰度發(fā)布、流量權(quán)重與消息去重、背壓模式(能夠根據(jù)后續(xù)服務(wù)的治理能力決定拉取的消息數(shù)量,確保系統(tǒng)的穩(wěn)定運(yùn)行。)

8. 廣播通知:事件驅(qū)動(dòng)的消息通知

消息隊(duì)列(MQ) 可以非常適合用于 廣播通知。在廣播通知場(chǎng)景中,消息隊(duì)列可以將消息推送到多個(gè)訂閱者,讓不同的服務(wù)或者應(yīng)用接收到通知。

  • 系統(tǒng)通知:向所有用戶廣播應(yīng)用更新、系統(tǒng)維護(hù)、公告通知等。
  • 事件驅(qū)動(dòng)的消息通知:如庫(kù)存更新、用戶狀態(tài)變化、訂單支付成功等事件通知,多個(gè)系統(tǒng)可以訂閱這個(gè)事件。

針對(duì)事件驅(qū)動(dòng)的消息通知,我們以 訂單支付成功 事件為例,假設(shè)多個(gè)系統(tǒng)(如庫(kù)存管理系統(tǒng)、用戶積分系統(tǒng)、財(cái)務(wù)系統(tǒng)等)都需要監(jiān)聽(tīng)這個(gè)事件來(lái)進(jìn)行相應(yīng)處理。

圖片

當(dāng)訂單支付成功 事件發(fā)生時(shí),系統(tǒng)會(huì)通過(guò)消息隊(duì)列廣播一個(gè)事件通知(比如消息內(nèi)容是訂單ID、支付金額等),其他系統(tǒng)可以根據(jù)這個(gè)事件來(lái)執(zhí)行相應(yīng)的操作,如:

  • 庫(kù)存系統(tǒng):根據(jù)訂單信息減少庫(kù)存。
  • 用戶積分系統(tǒng):增加用戶積分。
  • 財(cái)務(wù)系統(tǒng):記錄支付流水。

發(fā)送訂單支付成功事件:

// 創(chuàng)建訂單支付成功事件消息
String orderEventData = "{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}";
Message msg = new Message("order_event_topic", "order_payment_success", orderEventData.getBytes());

// 發(fā)送消息
producer.send(msg);

事件消費(fèi)者(接收并處理訂單支付成功事件):

  • 庫(kù)存系統(tǒng):
// 注冊(cè)消息監(jiān)聽(tīng)器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Inventory system received: " + eventData);
                
                // 處理庫(kù)存減少邏輯
                // 解析消息(假設(shè)是 JSON 格式)
                // updateInventory(eventData);  // 假設(shè)調(diào)用庫(kù)存更新方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
  • 積分系統(tǒng):
// 注冊(cè)消息監(jiān)聽(tīng)器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Points system received: " + eventData);

                // 處理用戶積分增加邏輯
                // updateUserPoints(eventData);  // 假設(shè)調(diào)用積分更新方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
  • 財(cái)務(wù)系統(tǒng):
// 注冊(cè)消息監(jiān)聽(tīng)器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Finance system received: " + eventData);

                // 處理財(cái)務(wù)記錄邏輯
                // recordPaymentTransaction(eventData);  // 假設(shè)調(diào)用財(cái)務(wù)記錄方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });


責(zé)任編輯:武曉燕 來(lái)源: 撿田螺的小男孩
相關(guān)推薦

2020-07-15 07:53:41

VSCode Task腳本命令

2023-11-20 13:52:00

Redis數(shù)據(jù)庫(kù)

2024-03-13 14:57:37

2009-03-27 10:25:24

OracleDBA職責(zé)

2022-05-31 08:21:07

MQ使用場(chǎng)景消費(fèi)消息

2023-02-02 09:37:59

消息隊(duì)列MQ

2024-11-27 08:15:50

2024-12-11 08:20:57

設(shè)計(jì)模式源碼

2011-07-30 13:01:23

2023-01-05 13:36:41

Script優(yōu)化任務(wù)

2022-07-29 07:48:15

HTTP常用狀態(tài)碼

2025-02-11 09:49:12

2023-05-16 07:47:18

RabbitMQ消息隊(duì)列系統(tǒng)

2019-12-23 08:48:24

Java技術(shù)全局變量

2020-02-14 13:50:32

JavaScript前端技術(shù)

2017-11-24 12:35:14

數(shù)據(jù)科學(xué)統(tǒng)計(jì)學(xué)習(xí)機(jī)器學(xué)習(xí)

2018-04-09 12:44:45

Docker使用場(chǎng)景開(kāi)發(fā)

2015-01-06 09:48:34

Docker多租戶docker應(yīng)用

2024-10-29 09:42:50

2018-12-03 12:20:52

Systemd定時(shí)器Linux
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)