Redis Stream 用做消息隊(duì)列完美嗎
這篇文章,分享筆者學(xué)習(xí) Redis Stream 的心得,希望對(duì)大家有所啟發(fā)。
圖片
1 基礎(chǔ)知識(shí)
Redis Stream 的結(jié)構(gòu)如下圖所示,它是一個(gè)消息鏈表,將所有加入的消息都串起來(lái),每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容。
圖片
每個(gè) Redis Stream 都有唯一的名稱(chēng) ,對(duì)應(yīng)唯一的 Redis Key 。
同一個(gè) Stream 可以掛載多個(gè)消費(fèi)者組 ConsumerGroup , 消費(fèi)組不能自動(dòng)創(chuàng)建,需要使用 XGROUP CREATE 命令創(chuàng)建。
每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng) ,標(biāo)識(shí)當(dāng)前消費(fèi)組消費(fèi)到哪條消息了。
消費(fèi)組 ConsumerGroup 同樣可以掛載多個(gè)消費(fèi)者 Consumer , 每個(gè) Consumer 并行的讀取消息,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。
消費(fèi)者內(nèi)部有一個(gè)屬性 pending_ids , 記錄了當(dāng)前消費(fèi)者讀取但沒(méi)有回復(fù) ACK 的消息 ID 列表 。
細(xì)品 Redis stream 的設(shè)計(jì),我們發(fā)現(xiàn)它和 Kafka 非常相似,比如說(shuō)消費(fèi)者組,消費(fèi)進(jìn)度偏移量等。
但內(nèi)部實(shí)現(xiàn)差異還是很明顯,Kafka 主要是利用了文件系統(tǒng)的特性,每一個(gè)主題下存在多個(gè)分區(qū) partition,而消費(fèi)者處理對(duì)應(yīng)分區(qū)消息成功之后,消費(fèi)者會(huì) commit 消費(fèi)進(jìn)度到 Broker ,Broker 修改消費(fèi)進(jìn)度偏移量并持久化。
2 核心命令
01 XADD 向 Stream 末尾添加消息
使用 XADD 向隊(duì)列添加消息,如果指定的隊(duì)列不存在,則創(chuàng)建一個(gè)隊(duì)列。基礎(chǔ)語(yǔ)法格式:
XADD key ID field value [field value ...]
- key :隊(duì)列名稱(chēng),如果不存在就創(chuàng)建
- ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
- field value :記錄。
127.0.0.1:6379> XADD mystream * name1 value1 name2 value2
"1712473185388-0"
127.0.0.1:6379> XLEN mystream
(integer) 1
127.0.0.1:6379> XADD mystream * name2 value2 name3 value3
"1712473231761-0"
消息 ID 使用 * 表示由 redis 生成,同時(shí)也可以自定義,但是自定義時(shí)要保證遞增性。
消息 ID 的格式:毫秒級(jí)時(shí)間戳 + 序號(hào) , 例如:1712473185388-5 , 它表示當(dāng)前消息在毫秒時(shí)間戳 1712473185388 產(chǎn)生 ,并且該毫秒內(nèi)產(chǎn)生到了第5條消息。
在添加隊(duì)列消息時(shí),也可以指定隊(duì)列的長(zhǎng)度。
127.0.0.1:6379> XADD mystream MAXLEN 100 * name value1 age 30
"1713082205042-0"
使用 XADD 命令向 mystream 的 stream 中添加了一條消息,并且指定了最大長(zhǎng)度為 100。消息的 ID 由 Redis 自動(dòng)生成,消息包含兩個(gè)字段 name 和 age,分別對(duì)應(yīng)的值是 value1 和 30。
02 XRANGE 獲取消息列表
使用 XRANGE 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息。語(yǔ)法格式:
XRANGE key start end [COUNT count]
- key :隊(duì)列名
- start :開(kāi)始值, - 表示最小值
- end :結(jié)束值, + 表示最大值
- count :數(shù)量
127.0.0.1:6379> XRANGE mystream - + COUNT 2
1) 1) "1712473185388-0"
2) 1) "name1"
2) "value1"
3) "name2"
4) "value2"
2) 1) "1712473231761-0"
2) 1) "name2"
2) "value2"
3) "name3"
4) "value3"
我們得到兩條消息,第一層是消息 ID ,第二層是消息內(nèi)容 ,消息內(nèi)容是 Hash 數(shù)據(jù)結(jié)構(gòu) 。
03 XREAD 以阻塞/非阻塞方式獲取消息列表
使用 XREAD 以阻塞或非阻塞方式獲取消息列表 ,語(yǔ)法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count :數(shù)量
- milliseconds :可選,阻塞毫秒數(shù),沒(méi)有設(shè)置就是非阻塞模式
- key :隊(duì)列名
- id :消息 ID
127.0.0.1:6379> XREAD streams mystream 0-0
1) 1) "mystream"
2) 1) 1) "1712473185388-0"
2) 1) "name1"
2) "value1"
3) "name2"
4) "value2"
2) 1) "1712473231761-0"
2) 1) "name2"
2) "value2"
3) "name3"
4) "value3"
XRED 讀消息時(shí)分為阻塞和非阻塞模式,使用 BLOCK 選項(xiàng)可以表示阻塞模式,需要設(shè)置阻塞時(shí)長(zhǎng)。非阻塞模式下,讀取完畢(即使沒(méi)有任何消息)立即返回,而在阻塞模式下,若讀取不到內(nèi)容,則阻塞等待。
127.0.0.1:6379> XREAD block 1000 streams mystream $
(nil)
(1.07s)
使用 Block 模式,配合 $ 作為 ID ,表示讀取最新的消息,若沒(méi)有消息,命令阻塞!等待過(guò)程中,其他客戶(hù)端向隊(duì)列追加消息,則會(huì)立即讀取到。
因此,典型的隊(duì)列就是 XADD 配合 XREAD Block 完成。XADD 負(fù)責(zé)生成消息,XREAD 負(fù)責(zé)消費(fèi)消息。
04 XGROUP CREATE 創(chuàng)建消費(fèi)者組
使用 XGROUP CREATE 創(chuàng)建消費(fèi)者組,分兩種情況:
- 從頭開(kāi)始消費(fèi):
XGROUP CREATE mystream consumer-group-name 0-0
- 從尾部開(kāi)始消費(fèi):
XGROUP CREATE mystream consumer-group-name $
執(zhí)行效果如下:
127.0.0.1:6379> XGROUP CREATE mystream mygroup 0-0
OK
05 XREADGROUP GROUP 讀取消費(fèi)組中的消息
使用 XREADGROUP GROUP 讀取消費(fèi)組中的消息,語(yǔ)法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group :消費(fèi)組名
- consumer :消費(fèi)者名。
- count :讀取數(shù)量。
- milliseconds :阻塞毫秒數(shù)。
- key :隊(duì)列名。
- ID :消息 ID。
示例:
127.0.0.1:6379> XREADGROUP group mygroup consumerA count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1712473185388-0"
2) 1) "name1"
2) "value1"
3) "name2"
4) "value2"
消費(fèi)者組 mygroup 中的消費(fèi)者 consumerA ,從 名為 mystream 的 Stream 中讀取消息。
- COUNT 1 表示一次最多讀取一條消息
- > 表示消息的起始位置是當(dāng)前可用消息的 ID,即從當(dāng)前未讀取的最早消息開(kāi)始讀取。
06 XACK 消息消費(fèi)確認(rèn)
接收到消息之后,我們要手動(dòng)確認(rèn)一下(ack),語(yǔ)法格式:
xack key group-key ID [ID ...]
示例:
127.0.0.1:6379> XACK mystream mygroup 1713089061658-0
(integer) 1
消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個(gè)流程的執(zhí)行如下圖所示:
圖片
我們可以使用 xpending 命令查看消費(fèi)者未確認(rèn)的消息ID:
127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1713091227595-0"
3) "1713091227595-0"
4) 1) 1) "consumerA"
2) "1"
07 XTRIM 限制 Stream 長(zhǎng)度
我們使用 XTRIM 對(duì)流進(jìn)行修剪,限制長(zhǎng)度, 語(yǔ)法格式:
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1712535017402-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 4
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1712498239430-0"
2) 1) "name"
2) "zhangyogn"
2) 1) "1712535017402-0"
2) 1) "field1"
2) "A"
3) "field2"
4) "B"
5) "field3"
6) "C"
7) "field4"
8) "D"
通過(guò)學(xué)習(xí)這七個(gè)核心命令,筆者發(fā)現(xiàn) Redis Stream 既支持簡(jiǎn)單的生產(chǎn)消費(fèi)模型,也支持發(fā)布訂閱模型。
3 SpringBoot Redis Stream 實(shí)戰(zhàn)
演示代碼地址:
https://github.com/makemyownlife/courage-cache-demo
1、添加 SpringBoot Redis 依賴(lài)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、yaml 文件配置
圖片
3、RedisTemplate 配置
圖片
4、定義stream監(jiān)聽(tīng)器
圖片
5、定義 StreamContainer 并啟動(dòng)
圖片
6、發(fā)送消息
圖片
執(zhí)行完成之后,消費(fèi)者就可以打印如下日志:
圖片
通過(guò)實(shí)戰(zhàn)流程,我們發(fā)現(xiàn) SpringBoot 整合 Redis Stream 并不復(fù)雜,需要注意:定義注冊(cè)信息時(shí),需要確保 stream key 和消費(fèi)者組已經(jīng)創(chuàng)建好了。
4 Redis stream 用做消息隊(duì)列完美嗎
筆者認(rèn)為 Redis stream 用于消息隊(duì)列最大的進(jìn)步在于:實(shí)現(xiàn)了發(fā)布訂閱模型。
發(fā)布訂閱模型具有如下特點(diǎn):
- 消費(fèi)獨(dú)立相比隊(duì)列模型的匿名消費(fèi)方式,發(fā)布訂閱模型中消費(fèi)方都會(huì)具備的身份,一般叫做訂閱組(訂閱關(guān)系),不同訂閱組之間相互獨(dú)立不會(huì)相互影響。
- 一對(duì)多通信基于獨(dú)立身份的設(shè)計(jì),同一個(gè)主題內(nèi)的消息可以被多個(gè)訂閱組處理,每個(gè)訂閱組都可以拿到全量消息。因此發(fā)布訂閱模型可以實(shí)現(xiàn)一對(duì)多通信。
我們?cè)?jīng)詬病 Redis List 數(shù)據(jù)結(jié)構(gòu)用做隊(duì)列時(shí),因?yàn)橄M(fèi)時(shí)沒(méi)有 Ack 機(jī)制,應(yīng)用異常掛掉導(dǎo)致消息偶發(fā)丟失的情況,Redis Stream 從設(shè)計(jì)角度來(lái)講已經(jīng)完美的解決了。
因?yàn)橄M(fèi)者內(nèi)部有一個(gè)屬性 pending_ids , 記錄了當(dāng)前消費(fèi)者讀取但沒(méi)有回復(fù) ACK 的消息 ID 列表 。當(dāng)消費(fèi)者重新上線,這些消息可以重新被消費(fèi)。
但 Redis stream 用做消息隊(duì)列完美嗎 ?
這個(gè)還真沒(méi)有,原因在于 Redis 先天不足。
1、Redis 本身定位是內(nèi)存數(shù)據(jù)庫(kù),它的設(shè)計(jì)之初都是為緩存準(zhǔn)備的,并不具備消息堆積的能力。而專(zhuān)業(yè)消息隊(duì)列一個(gè)非常重要的功能是數(shù)據(jù)中轉(zhuǎn)樞紐,Redis 的定位很難滿足,使用起來(lái)要非常小心。
2、Redis 的高可用方案可能丟失消息(AOF 持久化和主從復(fù)制都是異步 ),而專(zhuān)業(yè)消息隊(duì)列可以針對(duì)不同的場(chǎng)景選擇不同的高可用策略。也就說(shuō),專(zhuān)業(yè)消息隊(duì)列的高可用方案依然有可能會(huì)丟失消息,但是它有完善的方案支持不丟失消息。
所以,筆者認(rèn)為 Redis 非常適合輕量級(jí)消息隊(duì)列解決方案,即:數(shù)據(jù)量可控 + 業(yè)務(wù)模型簡(jiǎn)單 + 消息堆積概率低 + 報(bào)警監(jiān)控 。