自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Redis 使用 List 實現(xiàn)消息隊列的利與弊

開發(fā) 前端
今天,碼哥結(jié)合消息隊列的特點一步步帶大家分析使用 Redis 的 List 作為消息隊列的實現(xiàn)原理,并分享如何把 SpringBoot 與 Redission 整合運用到項目中。

 分布式系統(tǒng)中必備的一個中間件就是消息隊列,通過消息隊列我們能對服務(wù)間進行異步解耦、流量消峰、實現(xiàn)最終一致性。

目前市面上已經(jīng)有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人會問:“Redis 適合做消息隊列么?”

在回答這個問題之前,我們先從本質(zhì)思考:

  • 消息隊列提供了什么特性?
  • Redis 如何實現(xiàn)消息隊列?是否滿足存取需求?

今天,碼哥結(jié)合消息隊列的特點一步步帶大家分析使用 Redis 的 List 作為消息隊列的實現(xiàn)原理,并分享如何把 SpringBoot 與 Redission 整合運用到項目中。

什么是消息隊列

消息隊列是一種異步的服務(wù)間通信方式,適用于分布式和微服務(wù)架構(gòu)。消息在被處理和刪除之前一直存儲在隊列上。

每條消息僅可被一位用戶處理一次。消息隊列可被用于分離重量級處理、緩沖或批處理工作以及緩解高峰期工作負(fù)載。

消息隊列

  • Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到 Broker;
  • Broker:消息處理中心。負(fù)責(zé)消息存儲、確認(rèn)、重試等,一般其中會包含多個 queue;
  • Consumer:消息消費者,負(fù)責(zé)從 Broker 中獲取消息,并進行相應(yīng)處理;

消息隊列的使用場景有哪些呢?

消息隊列在實際應(yīng)用中包括如下四個場景:

  • 應(yīng)用耦合:發(fā)送方、接收方系統(tǒng)之間不需要了解雙方,只需要認(rèn)識消息。多應(yīng)用間通過消息隊列對同一消息進行處理,避免調(diào)用接口失敗導(dǎo)致整個過程失敗;
  • 異步處理:多應(yīng)用對消息隊列中同一消息進行處理,應(yīng)用間并發(fā)處理消息,相比串行處理,減少處理時間;
  • 限流削峰:廣泛應(yīng)用于秒殺或搶購活動中,避免流量過大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況;
  • 消息驅(qū)動的系統(tǒng):系統(tǒng)分為消息隊列、消息生產(chǎn)者、消息消費者,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,消費者(可能有多個)負(fù)責(zé)對消息進行處理;

消息隊列滿足哪些特性

消息有序性

消息是異步處理的,但是消費者需要按照生產(chǎn)者發(fā)送消息的順序來消費,避免出現(xiàn)后發(fā)送的消息被先處理的情況。

重復(fù)消息處理

生產(chǎn)者可能因為網(wǎng)絡(luò)問題出現(xiàn)消息重傳導(dǎo)致消費者可能會收到多條重復(fù)消息。

同樣的消息重復(fù)多次的話可能會造成一業(yè)務(wù)邏輯多次執(zhí)行,需要確保如何避免重復(fù)消費問題。

可靠性

一次保證消息的傳遞。如果發(fā)送消息時接收者不可用,消息隊列會保留消息,直到成功地傳遞它。

當(dāng)消費者重啟后,可以繼續(xù)讀取消息進行處理,防止消息遺漏。

List 實現(xiàn)消息隊列

Redis 的列表(List)是一種線性的有序結(jié)構(gòu),可以按照元素被推入列表中的順序來存儲元素,能滿足「先進先出」的需求,這些元素既可以是文字?jǐn)?shù)據(jù),又可以是二進制數(shù)據(jù)。

LPUSH

生產(chǎn)者使用 LPUSH key element[element...] 將消息插入到隊列的頭部,如果 key 不存在則會創(chuàng)建一個空的隊列再插入消息。

如下,生產(chǎn)者向隊列 queue 先后插入了 「Java」「碼哥字節(jié)」「Go」,返回值表示消息插入隊列后的個數(shù)。

  1. > LPUSH queue Java 碼哥字節(jié) Go 
  2. (integer) 3 

RPOP

消費者使用 RPOP key 依次讀取隊列的消息,先進先出,所以 「Java」會先讀取消費:

  1. > RPOP queue 
  2. "Java" 
  3. > RPOP queue 
  4. "碼哥字節(jié)" 
  5. > RPOP queue 
  6. "Go" 

List隊列

實時消費問題

65 哥:這么簡單就實現(xiàn)了么?

別高興的太早,LPUSH、RPOP 存在一個性能風(fēng)險,生產(chǎn)者向隊列插入數(shù)據(jù)的時候,List 并不會主動通知消費者及時消費。

我們需要寫一個 while(true) 不停地調(diào)用 RPOP 指令,當(dāng)有新消息就會返回消息,否則返回空。

程序需要不斷輪詢并判斷是否為空再執(zhí)行消費邏輯,這就會導(dǎo)致即使沒有新消息寫入到隊列,消費者也要不停地調(diào)用 RPOP 命令占用 CPU 資源。

65 哥:要如何避免循環(huán)調(diào)用導(dǎo)致的 CPU 性能損耗呢?

Redis 提供了 BLPOP、BRPOP 阻塞讀取的命令,消費者在在讀取隊列沒有數(shù)據(jù)的時候自動阻塞,直到有新的消息寫入隊列,才會繼續(xù)讀取新消息執(zhí)行業(yè)務(wù)邏輯。

  1. BRPOP queue 0 

參數(shù) 0 表示阻塞等待時間無無限制

重復(fù)消費

  • 消息隊列為每一條消息生成一個「全局 ID」;
  • 生產(chǎn)者為每一條消息創(chuàng)建一條「全局 ID」,消費者把一件處理過的消息 ID 記錄下來判斷是否重復(fù)。

其實這就是冪等,對于同一條消息,消費者收到后處理一次的結(jié)果和多次的結(jié)果是一致的。

消息可靠性

65 哥:消費者從 List 中讀取一條在消息處理過程中宕機了就會導(dǎo)致消息沒有處理完成,可是數(shù)據(jù)已經(jīng)沒有保存在 List 中了咋辦?

本質(zhì)就是消費者在處理消息的時候崩潰了,就無法再還原消息,缺乏一個消息確認(rèn)機制。

Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)兩個指令,含義是從 List 從讀取消息的同時把這條消息復(fù)制到另一個 List 中(備份),并且是原子操作。

我們就可以在業(yè)務(wù)流程正確處理完成后再刪除隊列消息實現(xiàn)消息確認(rèn)機制。如果在處理消息的時候宕機了,重啟后再從備份 List 中讀取消息處理。

  1. LPUSH redisMQ 公眾號 碼哥字節(jié) 
  2. BRPOPLPUSH redisMQ redisMQBack 

生產(chǎn)者用 LPUSH 把消息插入到 redisMQ 隊列中,消費者使用 BRPOPLPUSH 讀取消息「公眾號」,同時該消息會被插入到 「redisMQBack」隊列中。

如果消費成功則把「redisMQBack」的消息刪除即可,異常的話可以繼續(xù)從 「redisMQBack」再次讀取消息處理。

redis消息確認(rèn)機制

需要注意的是,如果生產(chǎn)者消息發(fā)送的很快,而消費者處理速度慢就會導(dǎo)致消息堆積,給 Redis 的內(nèi)存帶來過大壓力。

Redission 實戰(zhàn)

在 Java 中,我們可以利用 Redission 封裝的 API 來快速實現(xiàn)隊列,接下來碼哥基于 SpringBoot 2.1.4 版本來交大家如何整合并實戰(zhàn)。

詳細(xì) API 文檔大家可查閱:https://github.com/redisson/redisson/wiki/7.-Distributed-collections

添加依賴

  1. <dependency> 
  2.   <groupId>org.redisson</groupId> 
  3.   <artifactId>redisson-spring-boot-starter</artifactId> 
  4.   <version>3.16.7</version> 
  5. </dependency> 

添加 Redis 配置,碼哥的 Redis 沒有配置密碼,大家根據(jù)實際情況配置即可。

  1. spring: 
  2.   application: 
  3.     name: redission 
  4.   redis: 
  5.     host: 127.0.0.1 
  6.     port: 6379 
  7.     ssl: false 

Java 代碼實戰(zhàn)

RBlockingDeque 繼承 java.util.concurrent.BlockingDeque ,在使用過程中我們完全可以根據(jù)接口文檔來選擇合適的 API 去實現(xiàn)業(yè)務(wù)邏輯。

主要方法如下

碼哥采用了雙端隊列來舉例

  1. @Slf4j 
  2. @Service 
  3. public class QueueService { 
  4.  
  5.     @Autowired 
  6.     private RedissonClient redissonClient; 
  7.  
  8.     private static final String REDIS_MQ = "redisMQ"
  9.  
  10.     /** 
  11.      * 發(fā)送消息到隊列頭部 
  12.      * 
  13.      * @param message 
  14.      */ 
  15.     public void sendMessage(String message) { 
  16.         RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ); 
  17.  
  18.         try { 
  19.             blockingDeque.putFirst(message); 
  20.             log.info("將消息: {} 插入到隊列。", message); 
  21.         } catch (InterruptedException e) { 
  22.             e.printStackTrace(); 
  23.         } 
  24.     } 
  25.  
  26.     /** 
  27.      * 從隊列尾部阻塞讀取消息,若沒有消息,線程就會阻塞等待新消息插入,防止 CPU 空轉(zhuǎn) 
  28.      */ 
  29.     public void onMessage() { 
  30.         RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ); 
  31.         while (true) { 
  32.             try { 
  33.                 String message = blockingDeque.takeLast(); 
  34.                 log.info("從隊列 {} 中讀取到消息:{}.", REDIS_MQ, message); 
  35.             } catch (InterruptedException e) { 
  36.                 e.printStackTrace(); 
  37.             } 
  38.  
  39.         } 
  40.     } 

單元測試

  1. @RunWith(SpringRunner.class) 
  2. @SpringBootTest(classes = RedissionApplication.class) 
  3. public class RedissionApplicationTests { 
  4.  
  5.     @Autowired 
  6.     private QueueService queueService; 
  7.  
  8.     @Test 
  9.     public void testQueue() throws InterruptedException { 
  10.         new Thread(() -> { 
  11.             for (int i = 0; i < 1000; i++) { 
  12.                 queueService.sendMessage("消息" + i); 
  13.             } 
  14.         }).start(); 
  15.  
  16.         new Thread(() -> queueService.onMessage()).start(); 
  17.  
  18.         Thread.currentThread().join(); 
  19.     } 
  20.  
  21.  

總結(jié)

可以使用 List 數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)消息隊列,滿足先進先出。為了實現(xiàn)消息可靠性,Redis 提供了 BRPOPLPUSH 命令是解決。

Redis 是一個非常輕量級的鍵值數(shù)據(jù)庫,部署一個 Redis 實例就是啟動一個進程,部署 Redis 集群,也就是部署多個 Redis 實例。

而 Kafka、RabbitMQ 部署時,涉及額外的組件,例如 Kafka 的運行就需要再部署 ZooKeeper。相比 Redis 來說,Kafka 和 RabbitMQ 一般被認(rèn)為是重量級的消息隊列。

需要注意的是,我們要避免生產(chǎn)者過快,消費者過慢導(dǎo)致的消息堆積占用 Redis 的內(nèi)存。

在消息量不大的情況下使用 Redis 作為消息隊列,他能給我們帶來高性能的消息讀寫,這似乎也是一個很好消息隊列解決方案。

本文轉(zhuǎn)載自微信公眾號「碼哥字節(jié)」

 

責(zé)任編輯:姜華 來源: 碼哥字節(jié)
相關(guān)推薦

2022-01-21 19:22:45

RedisList命令

2024-03-22 12:10:39

Redis消息隊列數(shù)據(jù)庫

2024-10-25 08:41:18

消息隊列RedisList

2022-02-28 08:42:49

RedisStream消息隊列

2023-12-30 13:47:48

Redis消息隊列機制

2024-09-11 14:57:00

Redis消費線程模型

2021-01-12 08:43:29

Redis ListStreams

2022-04-12 11:15:31

Redis消息隊列數(shù)據(jù)庫

2022-06-28 08:37:07

分布式服務(wù)器WebSocket

2021-03-11 06:01:41

Linux消息隊列

2023-09-12 14:58:00

Redis

2024-05-16 08:10:17

RabbitMQ軟件通信機制

2018-03-29 08:38:10

2023-07-26 07:28:55

WebSocket服務(wù)器方案

2024-04-19 08:32:07

Redis緩存數(shù)據(jù)庫

2021-11-09 12:11:55

C# Redis隊列

2017-10-11 15:08:28

消息隊列常見

2024-05-08 14:49:22

Redis延遲隊列業(yè)務(wù)

2025-03-12 07:55:46

2010-09-07 15:08:25

CSS框架
點贊
收藏

51CTO技術(shù)棧公眾號