自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RabbitMQ如何實(shí)現(xiàn)延遲任務(wù)?

開(kāi)發(fā) 前端
實(shí)現(xiàn) RabbitMQ 延遲隊(duì)列目前主流的實(shí)現(xiàn)方式,是采用官方提供的延遲插件來(lái)實(shí)現(xiàn)。而延遲插件需要先下載插件、然后配置并重啟 RabbitMQ 服務(wù),之后就可以通過(guò)編寫代碼的方式實(shí)現(xiàn)延遲隊(duì)列了。

延遲任務(wù)是指當(dāng)消息被發(fā)送以后,并不是立即執(zhí)行,而是等待特定的時(shí)間后,消費(fèi)者才會(huì)執(zhí)行該消息。延遲隊(duì)列的使用場(chǎng)景有以下幾種:

  • 未按時(shí)支付的訂單,30 分鐘過(guò)期之后取消訂單。
  • 給活躍度比較低的用戶間隔 N 天之后推送消息,提高活躍度。
  • 新注冊(cè)會(huì)員的用戶,等待幾分鐘之后發(fā)送歡迎郵件等。

1.如何實(shí)現(xiàn)延遲隊(duì)列?

延遲隊(duì)列有以下兩種實(shí)現(xiàn)方式:

  • 通過(guò)消息過(guò)期后進(jìn)入死信交換器,再由交換器轉(zhuǎn)發(fā)到延遲消費(fèi)隊(duì)列,實(shí)現(xiàn)延遲功能;
  • 使用官方提供的延遲插件實(shí)現(xiàn)延遲功能。

早期,大部分公司都會(huì)采用第一種方式,但因?yàn)榈谝环N方式存在“隊(duì)頭阻塞”問(wèn)題,所以在生產(chǎn)環(huán)境我們通常會(huì)使用 RabbitMQ 3.5.7(2015 年底發(fā)布)的延遲插件來(lái)實(shí)現(xiàn),它的使用更簡(jiǎn)單、更方便,所以本文主要講第二種實(shí)習(xí)方式。

2.實(shí)現(xiàn)延遲隊(duì)列

2.1 安裝并啟動(dòng)延遲隊(duì)列

2.1.1 下載延遲插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

注意:需要根據(jù)你自己的 RabbitMQ 服務(wù)器端版本選擇相同版本的延遲插件,可以在 RabbitMQ 控制臺(tái)查看:

圖片圖片

圖片圖片

2.1.2 將插件放到插件目錄

接下來(lái),將上一步下載的插件放到 RabbitMQ 服務(wù)器安裝目錄,如果是 docker,使用一下命令復(fù)制:

docker cp 宿主機(jī)文件 容器名稱或ID:容器目錄

如下圖所示:

圖片圖片

之后,進(jìn)入 docker 容器,查看插件中是否包含延遲隊(duì)列:

docker exec -it 容器名稱或ID /bin/bash rabbitmq-plugins list

如下圖所示:

圖片圖片

2.1.3 啟動(dòng)插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下圖所示:

圖片圖片

2.1.4 重啟RabbitMQ服務(wù)

安裝完 RabbitMQ 插件之后,需要重啟 RabbitMQ 服務(wù)才能生效。如果使用的是 Docker,只需要重啟 Docker 容器即可:

docker restart 容器名稱或ID

如下圖所示:

圖片圖片

2.1.5 驗(yàn)收結(jié)果

在 RabbitMQ 控制臺(tái)查看,新建交換機(jī)時(shí)是否有延遲消息選項(xiàng),如果有就說(shuō)明延遲消息插件已經(jīng)正常運(yùn)行了,如下圖所示:

圖片圖片

2.1.6 手動(dòng)創(chuàng)建延遲交換器(可選)

此步驟可選(非必須),因?yàn)槟承┌姹鞠峦ㄟ^(guò)程序創(chuàng)建延遲交換器可能會(huì)出錯(cuò),如果出錯(cuò)了,手動(dòng)創(chuàng)建延遲隊(duì)列即可,如下圖所示:

圖片圖片

2.2 編寫延遲消息實(shí)現(xiàn)代碼

2.2.1 配置交換器和隊(duì)列

import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * 延遲交換器和隊(duì)列
 */
@Configuration
public class DelayedExchangeConfig {
    public static final String EXCHANGE_NAME = "myDelayedExchange";
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String ROUTING_KEY = "delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        return new CustomExchange(EXCHANGE_NAME,
                "x-delayed-message", // 消息類型
                true, // 是否持久化
                false); // 是否自動(dòng)刪除
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }
}

2.1.2 定義消息發(fā)送方法

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 5000)
    public void sendDelayedMessage(String message) {
        rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
                DelayedExchangeConfig.ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(10000); // 設(shè)置延遲時(shí)間,單位毫秒
                    return messagePostProcessor;
                });
    }
}

2.1.3 發(fā)送延遲消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/send")
    public String sendDirectMessage(@RequestParam String message) {
        delayedMessageProducer.sendDelayedMessage(message);
        return "Delayed message sent to Exchange: " + message;
    }
}

2.1.4 接收延遲消息

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class DelayedMessageConsumer {

    @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}

PS:防止失聯(lián),請(qǐng)加我:vipStone

小結(jié)

實(shí)現(xiàn) RabbitMQ 延遲隊(duì)列目前主流的實(shí)現(xiàn)方式,是采用官方提供的延遲插件來(lái)實(shí)現(xiàn)。而延遲插件需要先下載插件、然后配置并重啟 RabbitMQ 服務(wù),之后就可以通過(guò)編寫代碼的方式實(shí)現(xiàn)延遲隊(duì)列了。


責(zé)任編輯:武曉燕 來(lái)源: 磊哥和Java
相關(guān)推薦

2023-09-05 15:48:14

RabbitMQ延遲隊(duì)列

2024-12-17 15:39:33

2024-04-28 08:52:33

RabbitMQ延遲隊(duì)列延遲插件

2019-02-25 15:44:16

開(kāi)源RabbitMQSpring Clou

2024-01-26 13:16:00

RabbitMQ延遲隊(duì)列docker

2024-10-22 16:39:07

2024-04-09 10:40:04

2024-04-19 00:47:07

RabbitMQ消息機(jī)制

2021-12-08 10:47:35

RabbitMQ 實(shí)現(xiàn)延遲

2023-10-23 10:02:58

RabbitMQ延遲隊(duì)列

2024-10-16 09:29:30

RabbitMQ延遲隊(duì)列

2023-10-10 13:39:53

Spring隊(duì)列優(yōu)化

2024-11-05 16:58:21

RabbitMQ訂單超時(shí)取消延遲隊(duì)列

2021-10-15 10:39:43

RabbitMQ隊(duì)列延遲

2022-05-31 09:36:18

JDKDelayQueueRedis

2023-02-27 22:03:06

數(shù)據(jù)庫(kù)內(nèi)存RocketMQ

2022-08-02 11:27:25

RabbitMQ消息路由

2022-06-27 23:49:21

數(shù)據(jù)倉(cāng)庫(kù)資源不足集群

2023-08-08 08:28:03

消息消費(fèi)端Spring

2024-11-04 16:01:01

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)