自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Redis List 是否適合做消息隊(duì)列?Spring Boot 與 Redission 實(shí)現(xiàn) Redis 消息隊(duì)列!

數(shù)據(jù)庫(kù) Redis
我將結(jié)合消息隊(duì)列的特點(diǎn),分析使用 Redis 的 List 作為消息隊(duì)列的實(shí)現(xiàn)原理,并分享如何把 SpringBoot 與 Redission 整合來(lái)操作 Redis 運(yùn)用到項(xiàng)目中。

分布式系統(tǒng)中必備的一個(gè)中間件就是消息隊(duì)列,通過消息隊(duì)列你能對(duì)服務(wù)間進(jìn)行異步解耦、流量消峰、實(shí)現(xiàn)最終一致性。

目前市面上已經(jīng)有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人會(huì)問:“Redis 適合做消息隊(duì)列么?”

在回答這個(gè)問題之前,你先從本質(zhì)思考。

  • 消息隊(duì)列提供了什么特性?
  • Redis 如何實(shí)現(xiàn)消息隊(duì)列?是否滿足存取需求?

我將結(jié)合消息隊(duì)列的特點(diǎn),分析使用 Redis 的 List 作為消息隊(duì)列的實(shí)現(xiàn)原理,并分享如何把 SpringBoot 與 Redission 整合來(lái)操作 Redis 運(yùn)用到項(xiàng)目中。

什么是消息隊(duì)列

消息隊(duì)列是一種異步的服務(wù)間通信方式,適用于分布式和微服務(wù)架構(gòu)。消息在被處理和刪除之前一直存儲(chǔ)在隊(duì)列上。

每條消息僅可被一位用戶處理一次。消息隊(duì)列可被用于分離重量級(jí)處理、緩沖或批處理工作以及緩解高峰期工作負(fù)載。

  • Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到 Broker;
  • Broker:消息處理中心。負(fù)責(zé)消息存儲(chǔ)、確認(rèn)、重試等,一般其中會(huì)包含多個(gè) queue;
  • Consumer:消息消費(fèi)者,負(fù)責(zé)從 Broker 中獲取消息,并進(jìn)行相應(yīng)處理;

MySQL:“消息隊(duì)列的使用場(chǎng)景有哪些呢?“

消息隊(duì)列在實(shí)際應(yīng)用中包括如下四個(gè)場(chǎng)景。

  • 應(yīng)用耦合:發(fā)送方、接收方系統(tǒng)之間不需要了解雙方,只需要認(rèn)識(shí)消息。多應(yīng)用間通過消息隊(duì)列對(duì)同一消息進(jìn)行處理,避免調(diào)用接口失敗導(dǎo)致整個(gè)過程失敗。
  • 異步處理:多應(yīng)用對(duì)消息隊(duì)列中同一消息進(jìn)行處理,應(yīng)用間并發(fā)處理消息,相比串行處理,減少處理時(shí)間。
  • 限流削峰:廣泛應(yīng)用于秒殺或搶購(gòu)活動(dòng)中,避免流量過大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況。
  • 消息驅(qū)動(dòng)的系統(tǒng):系統(tǒng)分為消息隊(duì)列、消息生產(chǎn)者、消息消費(fèi)者,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,消費(fèi)者(可能有多個(gè))負(fù)責(zé)對(duì)消息進(jìn)行處理。

消息隊(duì)列滿足哪些特性

消息有序性

消息是異步處理的,但是消費(fèi)者需要按照生產(chǎn)者發(fā)送消息的順序來(lái)消費(fèi),避免出現(xiàn)后發(fā)送的消息被先處理的情況。

重復(fù)消息處理

生產(chǎn)者可能因?yàn)榫W(wǎng)絡(luò)問題出現(xiàn)消息重傳導(dǎo)致消費(fèi)者可能會(huì)收到多條重復(fù)消息。

同樣的消息重復(fù)多次的話可能會(huì)造成一業(yè)務(wù)邏輯多次執(zhí)行,需要確保如何避免重復(fù)消費(fèi)問題。

可靠性

一次保證消息的傳遞。如果發(fā)送消息時(shí)接收者不可用,消息隊(duì)列會(huì)保留消息,直到成功地傳遞它。

當(dāng)消費(fèi)者重啟后,可以繼續(xù)讀取消息進(jìn)行處理,防止消息遺漏。

LPUSH

生產(chǎn)者使用 LPUSH key element[element...] 將消息插入到隊(duì)列的頭部,如果 key 不存在則會(huì)創(chuàng)建一個(gè)空的隊(duì)列再插入消息。

如下,生產(chǎn)者向隊(duì)列 queue 先后插入了 “Java”、“碼哥字節(jié)”、“Go”,返回值表示消息插入隊(duì)列后的個(gè)數(shù)。

> LPUSH queue Java 碼哥字節(jié) Go
(integer) 3

MySQL:“如果生產(chǎn)者消息發(fā)送很快,消費(fèi)者處理不過來(lái),會(huì)導(dǎo)致消息積壓,占用過多的 Redis 內(nèi)存?!?/p>

確實(shí),List 并沒有提供類似于 Kafka 的 ConsumeGroup ,會(huì)使用多個(gè)消費(fèi)者策劃給你續(xù)組成一個(gè)消費(fèi)組來(lái)分擔(dān)處理隊(duì)列消息。不過在 Redis 5.0 之后,提供了 Streams 數(shù)據(jù)類型,后面我會(huì)介紹到。

RPOP

消費(fèi)者使用 RPOP key 依次讀取隊(duì)列的消息,先進(jìn)先出,所以 “Java”會(huì)先讀取消費(fèi):

> RPOP queue
"Java"
> RPOP queue
"碼哥字節(jié)"
> RPOP queue
"Go"

圖2-13

實(shí)時(shí)消費(fèi)問題

謝霸戈:“這么簡(jiǎn)單就實(shí)現(xiàn)了?”

別高興的太早,LPUSH、RPOP 存在一個(gè)性能風(fēng)險(xiǎn),生產(chǎn)者向隊(duì)列插入數(shù)據(jù)的時(shí)候,List 并不會(huì)主動(dòng)通知消費(fèi)者及時(shí)消費(fèi)。

謝霸戈:“那我寫一個(gè) while(true) 不停地調(diào)用 RPOP 指令,當(dāng)有新消息就消費(fèi)“

程序需要不斷輪詢并判斷是否為空再執(zhí)行消費(fèi)邏輯,這就會(huì)導(dǎo)致即使沒有新消息寫入隊(duì)列,消費(fèi)者也在不停地調(diào)用 RPOP 命令占用 CPU 資源。

謝霸戈:“如何避免循環(huán)調(diào)用導(dǎo)致的 CPU 性能損耗呢?”

請(qǐng)叫我貼心哥 Redis,我提供了 BLPOP、BRPOP 阻塞讀取的命令,消費(fèi)者在讀取隊(duì)列沒有數(shù)據(jù)的時(shí)候自動(dòng)阻塞,直到有新的消息寫入隊(duì)列,才會(huì)繼續(xù)讀取新消息執(zhí)行業(yè)務(wù)邏輯。

BRPOP queue 0

參數(shù) 0 表示阻塞等待時(shí)間無(wú)止期,哪怕是煙花易冷人事易分,雨紛紛舊故里草木深,斑駁的城門盤踞著老樹根,石板上回蕩的是再等,一直等到“心上人”來(lái)。

重復(fù)消費(fèi)解決方案

  • 消息隊(duì)列為自動(dòng)每一條消息生成一個(gè)全局 ID;
  • 生產(chǎn)者為每一條消息創(chuàng)建一個(gè)全局 ID,消費(fèi)者把處理過的消息 ID 記錄下來(lái)判斷是否重復(fù)。

其實(shí)這就是冪等,對(duì)于同一條消息,消費(fèi)者收到后處理一次的結(jié)果和多次的結(jié)果是一致的。

消息可靠性解決方案

謝霸戈:“消費(fèi)者讀取消息,處理過程中宕機(jī)了就會(huì)導(dǎo)致消息沒有處理完成,可是數(shù)據(jù)已經(jīng)不在隊(duì)列中了咋辦?”

本質(zhì)就是消費(fèi)者在處理消息的時(shí)候崩潰了,無(wú)法再讀取消息,缺乏一個(gè)消息確認(rèn)可靠機(jī)制。

我提供了 BRPOPLPUSH source destination timeout指令,含義是阻塞的方式從 source 隊(duì)列讀取消息的同時(shí)把這條消息復(fù)制到另一個(gè) destination 隊(duì)列中(備份),并且是原子操作。

不過這個(gè)指令在 6.2 版本被 BLMOVE 取代。接下來(lái),上才藝!生產(chǎn)者使用 LPUSH 把消息依次從存入 order:pay 隊(duì)列隊(duì)頭(左端)。

LPUSH order:pay "謝霸戈"
LPUSH order:pay "肖材吉"

消費(fèi)者消費(fèi)消息的時(shí)候在 while循環(huán)使用BLMOVE 以阻塞的方式從隊(duì)列 order:pay 隊(duì)尾(右端)彈出消息“謝霸戈”,同時(shí)把該消息復(fù)制到隊(duì)列 order:pay:back 隊(duì)頭(左端),該操作是原子性的,最后一個(gè)參數(shù) timeout = 0 表示持續(xù)等待。

BLMOVE order:pay order:pay:back RIGHT LEFT 0

如果消費(fèi)消息“謝霸戈”成功,那就使用 LREM 把隊(duì)列 order:pay:back 的“謝霸戈”消息刪除,從而實(shí)現(xiàn) ACK 確認(rèn)機(jī)制。

LREM order:pay:back 0 "謝霸戈"

倒數(shù)第二個(gè)參數(shù) count 的含義如下。

  • count > 0,從表頭(左端)向表尾(右端),依次刪除 count 個(gè) value。
  • count < 0,從表尾(右端)向表頭(左端),依次刪除 count 絕對(duì)值個(gè) value。
  • count = 0,刪除所有的 value。

消費(fèi)異常的話,應(yīng)用程序使用 BRPOP order:pay:back 從備份隊(duì)列再次讀取消息處理即可。

Redisson 實(shí)戰(zhàn)

在 Java 中,你可以利用 Redission 封裝的 API 來(lái)快速實(shí)現(xiàn)隊(duì)列,接下來(lái)我將基于 SpringBoot 2.1.4 版本來(lái)教你如何整合 Redisson。

添加依賴

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.23.3</version>
</dependency>

application.yaml引入 Redisson 配置文件。

spring:
  application:
    name: redission
  redis:
    redisson:
      file: classpath:redisson-config.yaml

創(chuàng)建 redisson-config.yaml 配置。

singleServerConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  password: magebyte
  subscriptionsPerConnection: 5
  clientName: redissonClient
  address: "redis://127.0.0.1:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 24
  connectionPoolSize: 64
  database: 0
  dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.Kryo5Codec> {}
transportMode: "NIO"

在代碼中,我使用的是阻塞雙端隊(duì)列,消費(fèi)者開啟死循環(huán),執(zhí)行 BLMOVE 指令。

@Slf4j
@Service
public class QueueService {

    @Autowired
    private RedissonClient redissonClient;

    private static final String ORDER_PAY_QUEUE = "order:pay";
    private static final String ORDER_PAY_BACK_QUEUE = "order:pay:back";

    /**
     * 生產(chǎn)者發(fā)送消息到隊(duì)列頭部
     *
     * @param message
     */
    public void sendMessage(String message) {
        RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);

        try {
            orderPayQueue.putFirst(message);
            log.info("將消息: {} 插入到隊(duì)列 {}。", message, ORDER_PAY_QUEUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 消費(fèi)者消費(fèi)消息
     */
    public void onMessage() {

        RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);
        while (true) {
            // BLMOVE order:pay order:pay:back RIGHT LEFT 0
            String message = orderPayQueue.move(Duration.ofSeconds(0), DequeMoveArgs.pollLast()
                    .addFirstTo(ORDER_PAY_BACK_QUEUE));
            log.info("從隊(duì)列 {} 中讀取到消息:{},并把消息復(fù)制到 {} 隊(duì)列.", ORDER_PAY_QUEUE, message, ORDER_PAY_BACK_QUEUE);

            // 消費(fèi)正常,從 ORDER_PAY_BACK_QUEUE 刪除這條消息,LREM order:pay:back 0 message
            removeBackQueueMessage(message, ORDER_PAY_BACK_QUEUE);
        }
    }

    /**
     * 從隊(duì)列中刪除消息
     * @param message
     * @param queueName
     */
    private void removeBackQueueMessage(String message, String queueName) {
        RBlockingDeque<String> orderPayBackDeque = redissonClient.getBlockingDeque(queueName);
        boolean remove = orderPayBackDeque.remove(message);
        log.info("消費(fèi)正常,刪除隊(duì)列 {} 的消息 {}。", queueName, message);
    }
}

單元測(cè)試

RunWith(SpringRunner.class)
@SpringBootTest(classes = RedissionApplication.class)
public class RedissionApplicationTests {

    @Autowired
    private QueueService queueService;

    @Test
    public void testQueue() throws InterruptedException {
        new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                queueService.sendMessage("消息" + i);
            }
        }).start();

        new Thread(() -> queueService.onMessage()).start();

        Thread.currentThread().join();
    }

}

總結(jié)

可以使用 List 數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn)消息隊(duì)列,滿足先進(jìn)先出。

Redis 是一個(gè)非常輕量級(jí)的鍵值數(shù)據(jù)庫(kù),部署一個(gè) Redis 實(shí)例就是啟動(dòng)一個(gè)進(jìn)程,部署 Redis 集群,也就是部署多個(gè) Redis 實(shí)例。

而 Kafka、RabbitMQ 部署時(shí),涉及額外的組件,例如 Kafka 的運(yùn)行就需要再部署 ZooKeeper。相比 Redis 來(lái)說,Kafka 和 RabbitMQ 一般被認(rèn)為是重量級(jí)的消息隊(duì)列。

需要注意的是,我們要避免生產(chǎn)者過快,消費(fèi)者過慢導(dǎo)致的消息堆積占用 Redis 的內(nèi)存。

在消息量不大的情況下使用 Redis 作為消息隊(duì)列,他能給我們帶來(lái)高性能的消息讀寫,這似乎也是一個(gè)很好消息隊(duì)列解決方案。

責(zé)任編輯:姜華 來(lái)源: 碼哥跳動(dòng)
相關(guān)推薦

2022-01-21 19:22:45

RedisList命令

2022-01-15 07:20:18

Redis List 消息隊(duì)列

2024-03-22 12:10:39

Redis消息隊(duì)列數(shù)據(jù)庫(kù)

2022-02-28 08:42:49

RedisStream消息隊(duì)列

2023-12-30 13:47:48

Redis消息隊(duì)列機(jī)制

2022-04-12 11:15:31

Redis消息隊(duì)列數(shù)據(jù)庫(kù)

2024-04-19 08:32:07

Redis緩存數(shù)據(jù)庫(kù)

2017-10-11 15:08:28

消息隊(duì)列常見

2023-09-12 14:58:00

Redis

2021-01-12 08:43:29

Redis ListStreams

2024-09-11 14:57:00

Redis消費(fèi)線程模型

2022-06-09 08:36:56

高性能Disruptor模式

2023-07-10 09:18:39

Redis訂閱模型

2020-01-14 15:08:44

Redis5Streams數(shù)據(jù)庫(kù)

2018-03-29 08:38:10

2021-09-16 10:29:05

開發(fā)技能代碼

2023-11-13 08:37:33

消息中間件分布式架構(gòu)

2017-04-27 10:07:52

框架設(shè)計(jì)實(shí)現(xiàn)

2010-04-21 12:39:48

Unix 消息隊(duì)列

2017-02-27 14:25:50

Java隊(duì)列Web
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)