自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Redis 消息隊(duì)列的三種方案(List、Streams、Pub/Sub)

存儲(chǔ) 存儲(chǔ)軟件 Redis
現(xiàn)如今的互聯(lián)網(wǎng)應(yīng)用大都是采用 分布式系統(tǒng)架構(gòu) 設(shè)計(jì)的,所以 消息隊(duì)列 已經(jīng)逐漸成為企業(yè)應(yīng)用系統(tǒng) 內(nèi)部通信 的核心手段,它具有 低耦合、可靠投遞、廣播、流量控制、最終一致性 等一系列功能。

[[375172]]

本文轉(zhuǎn)載自微信公眾號(hào)「JavaKeeper」,作者海星 。轉(zhuǎn)載本文請(qǐng)聯(lián)系JavaKeeper公眾號(hào)。 

 現(xiàn)如今的互聯(lián)網(wǎng)應(yīng)用大都是采用 分布式系統(tǒng)架構(gòu) 設(shè)計(jì)的,所以 消息隊(duì)列 已經(jīng)逐漸成為企業(yè)應(yīng)用系統(tǒng) 內(nèi)部通信 的核心手段,它具有 低耦合、可靠投遞、廣播、流量控制、最終一致性 等一系列功能。

當(dāng)前使用較多的 消息隊(duì)列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等,而部分?jǐn)?shù)據(jù)庫 如 Redis、MySQL 以及 phxsql ,如果硬搞的話,其實(shí)也可實(shí)現(xiàn)消息隊(duì)列的功能。

可能有人覺得,各種開源的 MQ 已經(jīng)足夠使用了,為什么需要用 Redis 實(shí)現(xiàn) MQ 呢?

有些簡(jiǎn)單的業(yè)務(wù)場(chǎng)景,可能不需要重量級(jí)的 MQ 組件(相比 Redis 來說,Kafka 和 RabbitMQ 都算是重量級(jí)的消息隊(duì)列)

那你有考慮過用 Redis 做消息隊(duì)列嗎?

這一章,我會(huì)結(jié)合消息隊(duì)列的特點(diǎn)和 Redis 做消息隊(duì)列的使用方式,以及實(shí)際項(xiàng)目中的使用,來和大家探討下 Redis 消息隊(duì)列的方案。

一、回顧消息隊(duì)列

消息隊(duì)列 是指利用 高效可靠 的 消息傳遞機(jī)制 進(jìn)行與平臺(tái)無關(guān)的 數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。

通過提供 消息傳遞 和 消息排隊(duì) 模型,它可以在 分布式環(huán)境 下提供 應(yīng)用解耦、彈性伸縮、冗余存儲(chǔ)、流量削峰、異步通信、數(shù)據(jù)同步 等等功能,其作為 分布式系統(tǒng)架構(gòu) 中的一個(gè)重要組件,有著舉足輕重的地位。

mq_overview

現(xiàn)在回顧下,我們使用的消息隊(duì)列,一般都有什么樣的特點(diǎn):

  • 三個(gè)角色:生產(chǎn)者、消費(fèi)者、消息處理中心
  • 異步處理模式:生產(chǎn)者 將消息發(fā)送到一條 虛擬的通道(消息隊(duì)列)上,而無須等待響應(yīng)。消費(fèi)者 則 訂閱 或是 監(jiān)聽 該通道,取出消息。兩者互不干擾,甚至都不需要同時(shí)在線,也就是我們說的 松耦合
  • 可靠性:消息要可以保證不丟失、不重復(fù)消費(fèi)、有時(shí)可能還需要順序性的保證

撇開我們常用的消息中間件不說,你覺得 Redis 的哪些數(shù)據(jù)類型可以滿足 MQ 的常規(guī)需求~~

二、Redis 實(shí)現(xiàn)消息隊(duì)列

思來想去,只有 List 和 Streams 兩種數(shù)據(jù)類型,可以實(shí)現(xiàn)消息隊(duì)列的這些需求,當(dāng)然,Redis 還提供了發(fā)布、訂閱(pub/sub) 模式。

我們逐一看下這 3 種方式的使用和場(chǎng)景。

2.1 List 實(shí)現(xiàn)消息隊(duì)列

Redis 列表是簡(jiǎn)單的字符串列表,按照插入順序排序。你可以添加一個(gè)元素到列表的頭部(左邊)或者尾部(右邊)。

所以常用來做異步隊(duì)列使用。將需要延后處理的任務(wù)結(jié)構(gòu)體序列化成字符串塞進(jìn) Redis 的列表,另一個(gè)線程從這個(gè)列表中輪詢數(shù)據(jù)進(jìn)行處理。

Redis 提供了好幾對(duì) List 指令,先大概看下這些命令,混個(gè)眼熟

List 常用命令

命令 用法 描述
LPUSH LPUSH key value [value ...] 將一個(gè)或多個(gè)值 value 插入到列表 key 的表頭如果有多個(gè) value 值,那么各個(gè) value 值按從左到右的順序依次插入到表頭
RPUSH RPUSH key value [value ...] 將一個(gè)或多個(gè)值 value 插入到列表 key 的表尾(最右邊)
LPOP LPOP key 移除并返回列表 key 的頭元素。
BLPOP BLPOP key [key ...] timeout 移出并獲取列表的第一個(gè)元素, 如果列表沒有元素會(huì)阻塞列表直到等待超時(shí)或發(fā)現(xiàn)可彈出元素為止
RPOP RPOP key 移除并返回列表 key 的尾元素。
BRPOP BRPOP key [key ...] timeout 移出并獲取列表的最后一個(gè)元素, 如果列表沒有元素會(huì)阻塞列表直到等待超時(shí)或發(fā)現(xiàn)可彈出元素為止。
BRPOPLPUSH BRPOPLPUSH source destination timeout 從列表中彈出一個(gè)值,將彈出的元素插入到另外一個(gè)列表中并返回它;如果列表沒有元素會(huì)阻塞列表直到等待超時(shí)或發(fā)現(xiàn)可彈出元素為止。
RPOPLPUSH RPOPLPUSH source destinationb 命令 RPOPLPUSH 在一個(gè)原子時(shí)間內(nèi),執(zhí)行以下兩個(gè)動(dòng)作:將列表 source 中的最后一個(gè)元素(尾元素)彈出,并返回給客戶端。將 source 彈出的元素插入到列表 destination ,作為 destination 列表的的頭元素
LLEN LLEN key 返回列表 key 的長(zhǎng)度。如果 key 不存在,則 key 被解釋為一個(gè)空列表,返回 0 .如果 key 不是列表類型,返回一個(gè)錯(cuò)誤
LRANGE LRANGE key start stop 返回列表 key 中指定區(qū)間內(nèi)的元素,區(qū)間以偏移量 start 和 stop 指定

挑幾個(gè)彈入、彈出的命令就可以組合出很多姿勢(shì)

  • LPUSH、RPOP 左進(jìn)右出
  • RPUSH、LPOP 右進(jìn)左出
  1. 127.0.0.1:6379> lpush mylist a a b c d e 
  2. (integer) 6 
  3. 127.0.0.1:6379> rpop mylist 
  4. "a" 
  5. 127.0.0.1:6379> rpop mylist 
  6. "a" 
  7. 127.0.0.1:6379> rpop mylist 
  8. "b" 
  9. 127.0.0.1:6379>  

redis-RPOP

即時(shí)消費(fèi)問題

通過 LPUSH,RPOP 這樣的方式,會(huì)存在一個(gè)性能風(fēng)險(xiǎn)點(diǎn),就是消費(fèi)者如果想要及時(shí)的處理數(shù)據(jù),就要在程序中寫個(gè)類似 while(true) 這樣的邏輯,不停的去調(diào)用 RPOP 或 LPOP 命令,這就會(huì)給消費(fèi)者程序帶來些不必要的性能損失。

所以,Redis 還提供了 BLPOP、BRPOP 這種阻塞式讀取的命令(帶 B-Bloking的都是阻塞式),客戶端在沒有讀到隊(duì)列數(shù)據(jù)時(shí),自動(dòng)阻塞,直到有新的數(shù)據(jù)寫入隊(duì)列,再開始讀取新數(shù)據(jù)。這種方式就節(jié)省了不必要的 CPU 開銷。

  • LPUSH、BRPOP 左進(jìn)右阻塞出
  • RPUSH、BLPOP 右進(jìn)左阻塞出
  1. 127.0.0.1:6379> lpush yourlist a b c d 
  2. (integer) 4 
  3. 127.0.0.1:6379> blpop yourlist 10 
  4. 1) "yourlist" 
  5. 2) "d" 
  6. 127.0.0.1:6379> blpop yourlist 10 
  7. 1) "yourlist" 
  8. 2) "c" 
  9. 127.0.0.1:6379> blpop yourlist 10 
  10. 1) "yourlist" 
  11. 2) "b" 
  12. 127.0.0.1:6379> blpop yourlist 10 
  13. 1) "yourlist" 
  14. 2) "a" 
  15. 127.0.0.1:6379> blpop yourlist 10 
  16. (nil) 
  17. (10.02s) 

如果將超時(shí)時(shí)間設(shè)置為 0 時(shí),即可無限等待,直到彈出消息

因?yàn)?Redis 單線程的特點(diǎn),所以在消費(fèi)數(shù)據(jù)時(shí),同一個(gè)消息會(huì)不會(huì)同時(shí)被多個(gè) consumer 消費(fèi)掉,但是需要我們考慮消費(fèi)不成功的情況。

可靠隊(duì)列模式 | ack 機(jī)制

以上方式中, List 隊(duì)列中的消息一經(jīng)發(fā)送出去,便從隊(duì)列里刪除。如果由于網(wǎng)絡(luò)原因消費(fèi)者沒有收到消息,或者消費(fèi)者在處理這條消息的過程中崩潰了,就再也無法還原出這條消息。究其原因,就是缺少消息確認(rèn)機(jī)制。

為了保證消息的可靠性,消息隊(duì)列都會(huì)有完善的消息確認(rèn)機(jī)制(Acknowledge),即消費(fèi)者向隊(duì)列報(bào)告消息已收到或已處理的機(jī)制。

Redis List 怎么搞一搞呢?

再看上邊的表格中,有兩個(gè)命令, RPOPLPUSH、BRPOPLPUSH (阻塞)從一個(gè) list 中獲取消息的同時(shí)把這條消息復(fù)制到另一個(gè) list 里(可以當(dāng)做備份),而且這個(gè)過程是原子的。

這樣我們就可以在業(yè)務(wù)流程安全結(jié)束后,再刪除隊(duì)列元素,實(shí)現(xiàn)消息確認(rèn)機(jī)制。

  1. 127.0.0.1:6379> rpush myqueue one 
  2. (integer) 1 
  3. 127.0.0.1:6379> rpush myqueue two 
  4. (integer) 2 
  5. 127.0.0.1:6379> rpush myqueue three 
  6. (integer) 3 
  7. 127.0.0.1:6379> rpoplpush myqueue queuebak 
  8. "three" 
  9. 127.0.0.1:6379> lrange myqueue 0 -1 
  10. 1) "one" 
  11. 2) "two" 
  12. 127.0.0.1:6379> lrange queuebak 0 -1 
  13. 1) "three" 

redis-rpoplpush

之前做過的項(xiàng)目中就有用到這樣的方式去處理數(shù)據(jù),數(shù)據(jù)標(biāo)識(shí)從一個(gè) List 取出后放入另一個(gè) List,業(yè)務(wù)操作安全執(zhí)行完成后,再去刪除 List 中的數(shù)據(jù),如果有問題的話,很好回滾。

當(dāng)然,還有更特殊的場(chǎng)景,可以通過 zset 來實(shí)現(xiàn)延時(shí)消息隊(duì)列,原理就是將消息加到 zset 結(jié)構(gòu)后,將要被消費(fèi)的時(shí)間戳設(shè)置為對(duì)應(yīng)的 score 即可,只要業(yè)務(wù)數(shù)據(jù)不會(huì)是重復(fù)數(shù)據(jù)就 OK。

2.2 訂閱與發(fā)布實(shí)現(xiàn)消息隊(duì)列

我們都知道消息模型有兩種

點(diǎn)對(duì)點(diǎn):Point-to-Point(P2P)

發(fā)布訂閱:Publish/Subscribe(Pub/Sub)

List 實(shí)現(xiàn)方式其實(shí)就是點(diǎn)對(duì)點(diǎn)的模式,下邊我們?cè)倏聪?Redis 的發(fā)布訂閱模式(消息多播),這才是“根正苗紅”的 Redis MQ

redis-pub_sub

"發(fā)布/訂閱"模式同樣可以實(shí)現(xiàn)進(jìn)程間的消息傳遞,其原理如下:

"發(fā)布/訂閱"模式包含兩種角色,分別是發(fā)布者和訂閱者。訂閱者可以訂閱一個(gè)或者多個(gè)頻道(channel),而發(fā)布者可以向指定的頻道(channel)發(fā)送消息,所有訂閱此頻道的訂閱者都會(huì)收到此消息。

Redis 通過 PUBLISH 、 SUBSCRIBE 等命令實(shí)現(xiàn)了訂閱與發(fā)布模式, 這個(gè)功能提供兩種信息機(jī)制, 分別是訂閱/發(fā)布到頻道和訂閱/發(fā)布到模式。

這個(gè) 頻道 和 模式 有什么區(qū)別呢?

頻道我們可以先理解為是個(gè) Redis 的 key 值,而模式,可以理解為是一個(gè)類似正則匹配的 Key,只是個(gè)可以匹配給定模式的頻道。這樣就不需要顯式的去訂閱多個(gè)名稱了,可以通過模式訂閱這種方式,一次性關(guān)注多個(gè)頻道。

我們啟動(dòng)三個(gè) Redis 客戶端看下效果:

redis-subscribe

先啟動(dòng)兩個(gè)客戶端訂閱(subscribe) 名字叫 framework 的頻道,然后第三個(gè)客戶端往 framework 發(fā)消息,可以看到前兩個(gè)客戶端都會(huì)接收到對(duì)應(yīng)的消息:

redis-publish

我們可以看到訂閱的客戶端每次可以收到一個(gè) 3 個(gè)參數(shù)的消息,分別為:

  • 消息的種類
  • 始發(fā)頻道的名稱
  • 實(shí)際的消息

再來看下訂閱符合給定模式的頻道,這回訂閱的命令是 PSUBSCRIBE

redis-psubscribe

我們往 java.framework 這個(gè)頻道發(fā)送了一條消息,不止訂閱了該頻道的 Consumer1 和 Consumer2 可以接收到消息,訂閱了模式 java.* 的 Consumer3 和 Consumer4 也可以接收到消息。

redis-psubscribe1

Pub/Sub 常用命令:

命令 用法 描述
PSUBSCRIBE PSUBSCRIBE pattern [pattern ...] 訂閱一個(gè)或多個(gè)符合給定模式的頻道
PUBSUB PUBSUB subcommand [argument [argument ...]] 查看訂閱與發(fā)布系統(tǒng)狀態(tài)
PUBLISH PUBLISH channel message 將信息發(fā)送到指定的頻道
PUNSUBSCRIBE PUNSUBSCRIBE [pattern [pattern ...]] 退訂所有給定模式的頻道
SUBSCRIBE SUBSCRIBE channel [channel ...] 訂閱給定的一個(gè)或多個(gè)頻道的信息
UNSUBSCRIBE UNSUBSCRIBE [channel [channel ...]] 指退訂給定的頻道

2.3 Streams 實(shí)現(xiàn)消息隊(duì)列

Redis 發(fā)布訂閱 (pub/sub) 有個(gè)缺點(diǎn)就是消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機(jī)等,消息就會(huì)被丟棄。而且也沒有 Ack 機(jī)制來保證數(shù)據(jù)的可靠性,假設(shè)一個(gè)消費(fèi)者都沒有,那消息就直接被丟棄了。

后來 Redis 的父親 Antirez,又單獨(dú)開啟了一個(gè)叫 Disque 的項(xiàng)目來完善這些問題,但是沒有做起來,github 的更新也定格在了 5 年前,所以我們就不討論了。

Redis 5.0 版本新增了一個(gè)更強(qiáng)大的數(shù)據(jù)結(jié)構(gòu)——Stream。它提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,還能保證消息不丟失。

它就像是個(gè)僅追加內(nèi)容的消息鏈表,把所有加入的消息都串起來,每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容。而且消息是持久化的。

 

redis-stream

每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。

Streams 是 Redis 專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型,所以提供了豐富的消息隊(duì)列操作命令。

Stream 常用命令

描述 用法
添加消息到末尾,保證有序,可以自動(dòng)生成唯一ID XADD key ID field value [field value ...]
對(duì)流進(jìn)行修剪,限制長(zhǎng)度 XTRIM key MAXLEN [~] count
刪除消息 XDEL key ID [ID ...]
獲取流包含的元素?cái)?shù)量,即消息長(zhǎng)度 XLEN key
獲取消息列表,會(huì)自動(dòng)過濾已經(jīng)刪除的消息 XRANGE key start end [COUNT count]
以阻塞或非阻塞方式獲取消息列表 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
創(chuàng)建消費(fèi)者組 XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
讀取消費(fèi)者組中的消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
將消息標(biāo)記為"已處理" XACK key group ID [ID ...]
為消費(fèi)者組設(shè)置新的最后遞送消息ID XGROUP SETID [CREATE key groupname id-or-] [DESTROY key groupname]
刪除消費(fèi)者 XGROUP DELCONSUMER [CREATE key groupname id-or-] [DESTROY key groupname]
刪除消費(fèi)者組 XGROUP DESTROY [CREATE key groupname id-or-] [DESTROY key groupname] [DEL
顯示待處理消息的相關(guān)信息 XPENDING key group [start end count] [consumer]
查看流和消費(fèi)者組的相關(guān)信息 XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
打印流信息 XINFO STREAM [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

CRUD 工程師上線

增刪改查來一波

  1. # * 號(hào)表示服務(wù)器自動(dòng)生成 ID,后面順序跟著一堆 key/value 
  2. 127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3 
  3. "1609404470049-0"  ## 生成的消息 ID,有兩部分組成,毫秒時(shí)間戳-該毫秒內(nèi)產(chǎn)生的第1條消息 
  4.  
  5. # 消息ID 必須要比上個(gè) ID 大 
  6. 127.0.0.1:6379> xadd mystream 123 f4 v4   
  7. (error) ERR The ID specified in XADD is equal or smaller than the target stream top item 
  8.  
  9. # 自定義ID 
  10. 127.0.0.1:6379> xadd mystream 1609404470049-1 f4 v4 
  11. "1609404470049-1" 
  12.  
  13. # -表示最小值 , + 表示最大值,也可以指定最大消息ID,或最小消息ID,配合 -、+ 使用 
  14. 127.0.0.1:6379> xrange mystream - + 
  15. 1) 1) "1609404470049-0" 
  16.    2) 1) "f1" 
  17.       2) "v1" 
  18.       3) "f2" 
  19.       4) "v2" 
  20.       5) "f3" 
  21.       6) "v3" 
  22. 2) 1) "1609404470049-1" 
  23.    2) 1) "f4" 
  24.       2) "v4" 
  25.  
  26. 127.0.0.1:6379> xdel mystream 1609404470049-1 
  27. (integer) 1 
  28. 127.0.0.1:6379> xlen mystream 
  29. (integer) 1 
  30. # 刪除整個(gè) stream 
  31. 127.0.0.1:6379> del mystream 
  32. (integer) 1 

獨(dú)立消費(fèi)

xread 以阻塞或非阻塞方式獲取消息列表,指定 BLOCK 選項(xiàng)即表示阻塞,超時(shí)時(shí)間 0 毫秒(意味著永不超時(shí))

  1. # 從ID是0-0的開始讀前2條 
  2. 127.0.0.1:6379> xread count 2 streams mystream 0 
  3. 1) 1) "mystream" 
  4.    2) 1) 1) "1609405178536-0" 
  5.          2) 1) "f5" 
  6.             2) "v5" 
  7.       2) 1) "1609405198676-0" 
  8.          2) 1) "f1" 
  9.             2) "v1" 
  10.             3) "f2" 
  11.             4) "v2" 
  12.  
  13. # 阻塞的從尾部讀取流,開啟新的客戶端xadd后發(fā)現(xiàn)這里就讀到了,block 0 表示永久阻塞 
  14. 127.0.0.1:6379> xread block 0 streams mystream $ 
  15. 1) 1) "mystream" 
  16.    2) 1) 1) "1609408791503-0" 
  17.          2) 1) "f6" 
  18.             2) "v6" 
  19. (42.37s) 

可以看到,我并沒有給流 mystream 傳入一個(gè)常規(guī)的 ID,而是傳入了一個(gè)特殊的 ID $這個(gè)特殊的 ID 意思是 XREAD 應(yīng)該使用流 mystream 已經(jīng)存儲(chǔ)的最大 ID 作為最后一個(gè) ID。以便我們僅接收從我們開始監(jiān)聽時(shí)間以后的新消息。這在某種程度上相似于 Unix 命令tail -f。

當(dāng)然,也可以指定任意有效的 ID。

而且, XREAD 的阻塞形式還可以同時(shí)監(jiān)聽多個(gè) Strema,只需要指定多個(gè)鍵名即可。

  1. 127.0.0.1:6379> xread block 0 streams mystream yourstream $ $ 

創(chuàng)建消費(fèi)者組

xread 雖然可以扇形分發(fā)到 N 個(gè)客戶端,然而,在某些問題中,我們想要做的不是向許多客戶端提供相同的消息流,而是從同一流向許多客戶端提供不同的消息子集。比如下圖這樣,三個(gè)消費(fèi)者按輪訓(xùn)的方式去消費(fèi)一個(gè) Stream。

redis-stream-cg

Redis Stream 借鑒了很多 Kafka 的設(shè)計(jì)。

  • Consumer Group:有了消費(fèi)組的概念,每個(gè)消費(fèi)組狀態(tài)獨(dú)立,互不影響,一個(gè)消費(fèi)組可以有多個(gè)消費(fèi)者
  • last_delivered_id :每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id 在數(shù)組之上往前移動(dòng),表示當(dāng)前消費(fèi)組已經(jīng)消費(fèi)到哪條消息了
  • pending_ids :消費(fèi)者的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。pending_ids 記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack。如果客戶端沒有 ack,這個(gè)變量里面的消息 ID 會(huì)越來越多,一旦某個(gè)消息被 ack,它就開始減少。這個(gè) pending_ids 變量在 Redis 官方被稱之為 PEL,也就是 Pending Entries List,這是一個(gè)很核心的數(shù)據(jù)結(jié)構(gòu),它用來確??蛻舳酥辽傧M(fèi)了消息一次,而不會(huì)在網(wǎng)絡(luò)傳輸?shù)闹型緛G失了沒處理。

redis-group-strucure

Stream 不像 Kafak 那樣有分區(qū)的概念,如果想實(shí)現(xiàn)類似分區(qū)的功能,就要在客戶端使用一定的策略將消息寫到不同的 Stream。

  • xgroup create:創(chuàng)建消費(fèi)者組
  • xgreadgroup:讀取消費(fèi)組中的消息
  • xack:ack 掉指定消息

  1. # 創(chuàng)建消費(fèi)者組的時(shí)候必須指定 ID, ID 為 0 表示從頭開始消費(fèi),為 $ 表示只消費(fèi)新的消息,也可以自己指定 
  2. 127.0.0.1:6379> xgroup create mystream mygroup $ 
  3. OK 
  4.  
  5. # 查看流和消費(fèi)者組的相關(guān)信息,可以查看流、也可以單獨(dú)查看流下的某個(gè)組的信息 
  6. 127.0.0.1:6379> xinfo stream mystream 
  7.  1) "length" 
  8.  2) (integer) 4  # 共 4 個(gè)消息 
  9.  3) "radix-tree-keys" 
  10.  4) (integer) 1 
  11.  5) "radix-tree-nodes" 
  12.  6) (integer) 2 
  13.  7) "last-generated-id" 
  14.  8) "1609408943089-0" 
  15.  9) "groups" 
  16. 10) (integer) 1  # 一個(gè)消費(fèi)組 
  17. 11) "first-entry" # 第一個(gè)消息 
  18. 12) 1) "1609405178536-0" 
  19.     2) 1) "f5" 
  20.        2) "v5" 
  21. 13) "last-entry"  # 最后一個(gè)消息 
  22. 14) 1) "1609408943089-0" 
  23.     2) 1) "f6" 
  24.        2) "v6" 
  25. 127.0.0.1:6379>  

按消費(fèi)組消費(fèi)

Stream 提供了 xreadgroup 指令可以進(jìn)行消費(fèi)組的組內(nèi)消費(fèi),需要提供消費(fèi)組名稱、消費(fèi)者名稱和起始消息 ID。它同 xread 一樣,也可以阻塞等待新消息。讀到新消息后,對(duì)應(yīng)的消息 ID 就會(huì)進(jìn)入消費(fèi)者的 PEL(正在處理的消息) 結(jié)構(gòu)里,客戶端處理完畢后使用 xack 指令通知服務(wù)器,本條消息已經(jīng)處理完畢,該消息 ID 就會(huì)從 PEL 中移除。

  1. #  消費(fèi)組 mygroup1 中的 消費(fèi)者 c1 從 mystream 中 消費(fèi)組數(shù)據(jù) 
  2. # > 號(hào)表示從當(dāng)前消費(fèi)組的 last_delivered_id 后面開始讀 
  3. # 每當(dāng)消費(fèi)者讀取一條消息,last_delivered_id 變量就會(huì)前進(jìn) 
  4. 127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream > 
  5. 1) 1) "mystream" 
  6.    2) 1) 1) "1609727806627-0" 
  7.          2) 1) "f1" 
  8.             2) "v1" 
  9.             3) "f2" 
  10.             4) "v2" 
  11.             5) "f3" 
  12.             6) "v3" 
  13. 127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream > 
  14. 1) 1) "mystream" 
  15.    2) 1) 1) "1609727818650-0" 
  16.          2) 1) "f4" 
  17.             2) "v4" 
  18. # 已經(jīng)沒有消息可讀了             
  19. 127.0.0.1:6379> xreadgroup group mygroup1 c1 count 2 streams mystream > 
  20. (nil) 
  21.  
  22. # 還可以阻塞式的消費(fèi) 
  23. 127.0.0.1:6379> xreadgroup group mygroup1 c2 block 0 streams mystream > 
  24. µ1) 1) "mystream" 
  25.    2) 1) 1) "1609728270632-0" 
  26.          2) 1) "f5" 
  27.             2) "v5" 
  28. (89.36s) 
  29.  
  30. # 觀察消費(fèi)組信息 
  31. 127.0.0.1:6379> xinfo groups mystream 
  32. 1) 1) "name" 
  33.    2) "mygroup1" 
  34.    3) "consumers" 
  35.    4) (integer) 2  # 2個(gè)消費(fèi)者 
  36.    5) "pending" 
  37.    6) (integer) 3   # 共 3 條正在處理的信息還沒有 ack 
  38.    7) "last-delivered-id" 
  39.    8) "1609728270632-0" 
  40.     
  41. 127.0.0.1:6379> xack mystream mygroup1 1609727806627-0  # ack掉指定消息 
  42. (integer) 1 

嘗鮮到此結(jié)束,就不繼續(xù)深入了。

個(gè)人感覺,就目前來說,Stream 還是不能當(dāng)做主流的 MQ 來使用的,而且使用案例也比較少,慎用。

寫在最后

當(dāng)然,還有需要注意的就是,業(yè)務(wù)上避免過度復(fù)用一個(gè) Redis。既用它做緩存、做計(jì)算,還拿它做任務(wù)隊(duì)列,這樣的話 Redis 會(huì)很累的。

沒有絕對(duì)好的技術(shù)、只有對(duì)業(yè)務(wù)最友好的技術(shù),共勉

以夢(mèng)為馬,越騎越傻。詩和遠(yuǎn)方,越走越慌。不忘初心是對(duì)的,但切記要出發(fā),加油吧,程序員。

在路上的你,可以微信搜「 JavaKeeper 」一起前行,無套路領(lǐng)取 500+ 本電子書和 30+ 視頻教學(xué)和源碼,本文 GitHub github.com/JavaKeeper)已經(jīng)收錄,服務(wù)端開發(fā)、面試必備技能兵器譜,有你想要的!

參考

《Redis 設(shè)計(jì)與實(shí)現(xiàn)》

Redis 官網(wǎng)

https://segmentfault.com/a/1190000012244418

https://www.cnblogs.com/williamjie/p/11201654.html

 

責(zé)任編輯:武曉燕 來源: JavaKeeper
相關(guān)推薦

2024-09-11 14:57:00

Redis消費(fèi)線程模型

2020-01-14 15:08:44

Redis5Streams數(shù)據(jù)庫

2020-11-24 10:13:02

Redis集群數(shù)據(jù)庫

2011-09-05 12:43:23

Sencha Touc事件

2022-01-21 19:22:45

RedisList命令

2022-01-15 07:20:18

Redis List 消息隊(duì)列

2021-11-05 21:33:28

Redis數(shù)據(jù)高并發(fā)

2023-10-13 00:00:00

Redis模塊空間對(duì)象

2023-03-06 08:40:43

RedisListJava

2024-10-25 08:41:18

消息隊(duì)列RedisList

2017-07-03 18:24:39

MySQL數(shù)據(jù)冗余

2022-07-22 20:00:01

高可用路由

2022-03-22 10:24:48

Linux開源Elasticsea

2009-12-21 13:37:43

WCF消息交換

2021-12-20 07:11:26

Java List排序 Java 基礎(chǔ)

2022-02-28 08:42:49

RedisStream消息隊(duì)列

2010-05-25 18:50:22

MySQL安裝

2020-11-03 19:52:54

Java數(shù)組編程語言

2018-07-10 08:42:45

Oracle高可用集群

2011-01-18 15:35:59

jQueryJavaScriptweb
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)