自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

把Redis當(dāng)作隊(duì)列來(lái)用,真的合適嗎?

數(shù)據(jù)庫(kù) 其他數(shù)據(jù)庫(kù) Redis
這篇文章,我就和你聊一聊把 Redis 當(dāng)作隊(duì)列,究竟是否合適這個(gè)問(wèn)題。

 我經(jīng)常聽(tīng)到很多人討論,關(guān)于「把 Redis 當(dāng)作隊(duì)列來(lái)用是否合適」的問(wèn)題。

有些人表示贊成,他們認(rèn)為 Redis 很輕量,用作隊(duì)列很方便。

也些人則反對(duì),認(rèn)為 Redis 會(huì)「丟」數(shù)據(jù),最好還是用「專業(yè)」的隊(duì)列中間件更穩(wěn)妥。

究竟哪種方案更好呢?

這篇文章,我就和你聊一聊把 Redis 當(dāng)作隊(duì)列,究竟是否合適這個(gè)問(wèn)題。

我會(huì)從簡(jiǎn)單到復(fù)雜,一步步帶你梳理其中的細(xì)節(jié),把這個(gè)問(wèn)題真正的講清楚。

看完這篇文章后,我希望你對(duì)這個(gè)問(wèn)題你會(huì)有全新的認(rèn)識(shí)。

在文章的最后,我還會(huì)告訴你關(guān)于「技術(shù)選型」的思路,文章有點(diǎn)長(zhǎng),希望你可以耐心讀完。

從最簡(jiǎn)單的開(kāi)始:List 隊(duì)列

首先,我們先從最簡(jiǎn)單的場(chǎng)景開(kāi)始講起。

如果你的業(yè)務(wù)需求足夠簡(jiǎn)單,想把 Redis 當(dāng)作隊(duì)列來(lái)使用,肯定最先想到的就是使用 List 這個(gè)數(shù)據(jù)類型。

因?yàn)?List 底層的實(shí)現(xiàn)就是一個(gè)「鏈表」,在頭部和尾部操作元素,時(shí)間復(fù)雜度都是 O(1),這意味著它非常符合消息隊(duì)列的模型。

如果把 List 當(dāng)作隊(duì)列,你可以這么來(lái)用。

生產(chǎn)者使用 LPUSH 發(fā)布消息: 

  1. 127.0.0.1:6379> LPUSH queue msg1  
  2. (integer) 1  
  3. 127.0.0.1:6379> LPUSH queue msg2  
  4. (integer) 2 

消費(fèi)者這一側(cè),使用 RPOP 拉取消息: 

  1. 127.0.0.1:6379> RPOP queue  
  2. "msg1"  
  3. 127.0.0.1:6379> RPOP queue  
  4. "msg2" 

這個(gè)模型非常簡(jiǎn)單,也很容易理解。

但這里有個(gè)小問(wèn)題,當(dāng)隊(duì)列中已經(jīng)沒(méi)有消息了,消費(fèi)者在執(zhí)行 RPOP 時(shí),會(huì)返回 NULL。 

  1. 127.0.0.1:6379> RPOP queue  
  2. (nil)   // 沒(méi)消息了 

而我們?cè)诰帉懴M(fèi)者邏輯時(shí),一般是一個(gè)「死循環(huán)」,這個(gè)邏輯需要不斷地從隊(duì)列中拉取消息進(jìn)行處理,偽代碼一般會(huì)這么寫: 

  1. while true:  
  2.     msg = redis.rpop("queue")  
  3.     // 沒(méi)有消息,繼續(xù)循環(huán)  
  4.     if msg == null:  
  5.         continue  
  6.     // 處理消息 
  7.      handle(msg) 

如果此時(shí)隊(duì)列為空,那消費(fèi)者依舊會(huì)頻繁拉取消息,這會(huì)造成「CPU 空轉(zhuǎn)」,不僅浪費(fèi) CPU 資源,還會(huì)對(duì) Redis 造成壓力。

怎么解決這個(gè)問(wèn)題呢?

也很簡(jiǎn)單,當(dāng)隊(duì)列為空時(shí),我們可以「休眠」一會(huì),再去嘗試?yán)∠ⅰ4a可以修改成這樣: 

  1. while true:  
  2.     msg = redis.rpop("queue")  
  3.     // 沒(méi)有消息,休眠2s  
  4.     if msg == null:  
  5.         sleep(2)  
  6.         continue  
  7.     // 處理消息       
  8.      handle(msg) 

這就解決了 CPU 空轉(zhuǎn)問(wèn)題。

這個(gè)問(wèn)題雖然解決了,但又帶來(lái)另外一個(gè)問(wèn)題:當(dāng)消費(fèi)者在休眠等待時(shí),有新消息來(lái)了,那消費(fèi)者處理新消息就會(huì)存在「延遲」。

假設(shè)設(shè)置的休眠時(shí)間是 2s,那新消息最多存在 2s 的延遲。

要想縮短這個(gè)延遲,只能減小休眠的時(shí)間。但休眠時(shí)間越小,又有可能引發(fā) CPU 空轉(zhuǎn)問(wèn)題。

魚(yú)和熊掌不可兼得。

那如何做,既能及時(shí)處理新消息,還能避免 CPU 空轉(zhuǎn)呢?

Redis 是否存在這樣一種機(jī)制:如果隊(duì)列為空,消費(fèi)者在拉取消息時(shí)就「阻塞等待」,一旦有新消息過(guò)來(lái),就通知我的消費(fèi)者立即處理新消息呢?

幸運(yùn)的是,Redis 確實(shí)提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,這里的 B 指的是阻塞(Block)。

現(xiàn)在,你可以這樣來(lái)拉取消息了: 

  1. while true:  
  2.     // 沒(méi)消息阻塞等待,0表示不設(shè)置超時(shí)時(shí)間  
  3.     msg = redis.brpop("queue", 0)  
  4.     if msg == null:  
  5.         continue  
  6.     // 處理消息  
  7.     handle(msg) 

使用 BRPOP 這種阻塞式方式拉取消息時(shí),還支持傳入一個(gè)「超時(shí)時(shí)間」,如果設(shè)置為 0,則表示不設(shè)置超時(shí),直到有新消息才返回,否則會(huì)在指定的超時(shí)時(shí)間后返回 NULL。

這個(gè)方案不錯(cuò),既兼顧了效率,還避免了 CPU 空轉(zhuǎn)問(wèn)題,一舉兩得。

 注意:如果設(shè)置的超時(shí)時(shí)間太長(zhǎng),這個(gè)連接太久沒(méi)有活躍過(guò),可能會(huì)被 Redis Server 判定為無(wú)效連接,之后 Redis Server 會(huì)強(qiáng)制把這個(gè)客戶端踢下線。所以,采用這種方案,客戶端要有重連機(jī)制。

解決了消息處理不及時(shí)的問(wèn)題,你可以再思考一下,這種隊(duì)列模型,有什么缺點(diǎn)?

我們一起來(lái)分析一下:

  1.  不支持重復(fù)消費(fèi):消費(fèi)者拉取消息后,這條消息就從 List 中刪除了,無(wú)法被其它消費(fèi)者再次消費(fèi),即不支持多個(gè)消費(fèi)者消費(fèi)同一批數(shù)據(jù)
  2.  消息丟失:消費(fèi)者拉取到消息后,如果發(fā)生異常宕機(jī),那這條消息就丟失了

第一個(gè)問(wèn)題是功能上的,使用 List 做消息隊(duì)列,它僅僅支持最簡(jiǎn)單的,一組生產(chǎn)者對(duì)應(yīng)一組消費(fèi)者,不能滿足多組生產(chǎn)者和消費(fèi)者的業(yè)務(wù)場(chǎng)景。

第二個(gè)問(wèn)題就比較棘手了,因?yàn)閺?List 中 POP 一條消息出來(lái)后,這條消息就會(huì)立即從鏈表中刪除了。也就是說(shuō),無(wú)論消費(fèi)者是否處理成功,這條消息都沒(méi)辦法再次消費(fèi)了。

這也意味著,如果消費(fèi)者在處理消息時(shí)異常宕機(jī),那這條消息就相當(dāng)于丟失了。

針對(duì)這 2 個(gè)問(wèn)題怎么解決呢?我們一個(gè)個(gè)來(lái)看。

發(fā)布/訂閱模型:Pub/Sub

從名字就能看出來(lái),這個(gè)模塊是 Redis 專門是針對(duì)「發(fā)布/訂閱」這種隊(duì)列模型設(shè)計(jì)的。

它正好可以解決前面提到的第一個(gè)問(wèn)題:重復(fù)消費(fèi)。

即多組生產(chǎn)者、消費(fèi)者的場(chǎng)景,我們來(lái)看它是如何做的。

Redis 提供了 PUBLISH / SUBSCRIBE 命令,來(lái)完成發(fā)布、訂閱的操作。

假設(shè)你想開(kāi)啟 2 個(gè)消費(fèi)者,同時(shí)消費(fèi)同一批數(shù)據(jù),就可以按照以下方式來(lái)實(shí)現(xiàn)。

首先,使用 SUBSCRIBE 命令,啟動(dòng) 2 個(gè)消費(fèi)者,并「訂閱」同一個(gè)隊(duì)列。 

  1. // 2個(gè)消費(fèi)者 都訂閱一個(gè)隊(duì)列  
  2. 127.0.0.1:6379> SUBSCRIBE queue  
  3. Reading messages... (press Ctrl-C to quit)  
  4. 1) "subscribe"  
  5. 2) "queue"  
  6. 3) (integer) 1 

此時(shí),2 個(gè)消費(fèi)者都會(huì)被阻塞住,等待新消息的到來(lái)。

之后,再啟動(dòng)一個(gè)生產(chǎn)者,發(fā)布一條消息。 

  1. 127.0.0.1:6379> PUBLISH queue msg1  
  2. (integer) 1 

這時(shí),2 個(gè)消費(fèi)者就會(huì)解除阻塞,收到生產(chǎn)者發(fā)來(lái)的新消息。 

  1. 127.0.0.1:6379> SUBSCRIBE queue  
  2. // 收到新消息  
  3. 1) "message"  
  4. 2) "queue"  
  5. 3) "msg1" 

看到了么,使用 Pub/Sub 這種方案,既支持阻塞式拉取消息,還很好地滿足了多組消費(fèi)者,消費(fèi)同一批數(shù)據(jù)的業(yè)務(wù)需求。

除此之外,Pub/Sub 還提供了「匹配訂閱」模式,允許消費(fèi)者根據(jù)一定規(guī)則,訂閱「多個(gè)」自己感興趣的隊(duì)列。 

  1. // 訂閱符合規(guī)則的隊(duì)列  
  2. 127.0.0.1:6379> PSUBSCRIBE queue.*  
  3. Reading messages... (press Ctrl-C to quit)  
  4. 1) "psubscribe"  
  5. 2) "queue.*"  
  6. 3) (integer) 1 

這里的消費(fèi)者,訂閱了 queue.* 相關(guān)的隊(duì)列消息。

之后,生產(chǎn)者分別向 queue.p1 和 queue.p2 發(fā)布消息。 

  1. 127.0.0.1:6379> PUBLISH queue.p1 msg1  
  2. (integer) 1  
  3. 127.0.0.1:6379> PUBLISH queue.p2 msg2  
  4. (integer) 1 

這時(shí)再看消費(fèi)者,它就可以接收到這 2 個(gè)生產(chǎn)者的消息了。 

  1. 127.0.0.1:6379> PSUBSCRIBE queue.*  
  2. Reading messages... (press Ctrl-C to quit)  
  3. ...  
  4. // 來(lái)自queue.p1的消息  
  5. 1) "pmessage" 
  6. 2) "queue.*"  
  7. 3) "queue.p1"  
  8. 4) "msg1" 
  9. // 來(lái)自queue.p2的消息  
  10. 1) "pmessage"  
  11. 2) "queue.*"  
  12. 3) "queue.p2"  
  13. 4) "msg2" 

我們可以看到,Pub/Sub 最大的優(yōu)勢(shì)就是,支持多組生產(chǎn)者、消費(fèi)者處理消息。

講完了它的優(yōu)點(diǎn),那它有什么缺點(diǎn)呢?

其實(shí),Pub/Sub 最大問(wèn)題是:丟數(shù)據(jù)。

如果發(fā)生以下場(chǎng)景,就有可能導(dǎo)致數(shù)據(jù)丟失:

  1.  消費(fèi)者下線
  2.  Redis 宕機(jī)
  3.  消息堆積

究竟是怎么回事?

這其實(shí)與 Pub/Sub 的實(shí)現(xiàn)方式有很大關(guān)系。

Pub/Sub 在實(shí)現(xiàn)時(shí)非常簡(jiǎn)單,它沒(méi)有基于任何數(shù)據(jù)類型,也沒(méi)有做任何的數(shù)據(jù)存儲(chǔ),它只是單純地為生產(chǎn)者、消費(fèi)者建立「數(shù)據(jù)轉(zhuǎn)發(fā)通道」,把符合規(guī)則的數(shù)據(jù),從一端轉(zhuǎn)發(fā)到另一端。

一個(gè)完整的發(fā)布、訂閱消息處理流程是這樣的:

  1.  消費(fèi)者訂閱指定隊(duì)列,Redis 就會(huì)記錄一個(gè)映射關(guān)系:隊(duì)列->消費(fèi)者
  2.  生產(chǎn)者向這個(gè)隊(duì)列發(fā)布消息,那 Redis 就從映射關(guān)系中找出對(duì)應(yīng)的消費(fèi)者,把消息轉(zhuǎn)發(fā)給它

看到了么,整個(gè)過(guò)程中,沒(méi)有任何的數(shù)據(jù)存儲(chǔ),一切都是實(shí)時(shí)轉(zhuǎn)發(fā)的。

這種設(shè)計(jì)方案,就導(dǎo)致了上面提到的那些問(wèn)題。

例如,如果一個(gè)消費(fèi)者異常掛掉了,它再重新上線后,只能接收新的消息,在下線期間生產(chǎn)者發(fā)布的消息,因?yàn)檎也坏较M(fèi)者,都會(huì)被丟棄掉。

如果所有消費(fèi)者都下線了,那生產(chǎn)者發(fā)布的消息,因?yàn)檎也坏饺魏我粋€(gè)消費(fèi)者,也會(huì)全部「丟棄」。

所以,當(dāng)你在使用 Pub/Sub 時(shí),一定要注意:消費(fèi)者必須先訂閱隊(duì)列,生產(chǎn)者才能發(fā)布消息,否則消息會(huì)丟失。

這也是前面講例子時(shí),我們讓消費(fèi)者先訂閱隊(duì)列,之后才讓生產(chǎn)者發(fā)布消息的原因。

另外,因?yàn)?Pub/Sub 沒(méi)有基于任何數(shù)據(jù)類型實(shí)現(xiàn),所以它也不具備「數(shù)據(jù)持久化」的能力。

也就是說(shuō),Pub/Sub 的相關(guān)操作,不會(huì)寫入到 RDB 和 AOF 中,當(dāng) Redis 宕機(jī)重啟,Pub/Sub 的數(shù)據(jù)也會(huì)全部丟失。

最后,我們來(lái)看 Pub/Sub 在處理「消息積壓」時(shí),為什么也會(huì)丟數(shù)據(jù)?

當(dāng)消費(fèi)者的速度,跟不上生產(chǎn)者時(shí),就會(huì)導(dǎo)致數(shù)據(jù)積壓的情況發(fā)生。

如果采用 List 當(dāng)作隊(duì)列,消息積壓時(shí),會(huì)導(dǎo)致這個(gè)鏈表很長(zhǎng),最直接的影響就是,Redis 內(nèi)存會(huì)持續(xù)增長(zhǎng),直到消費(fèi)者把所有數(shù)據(jù)都從鏈表中取出。

但 Pub/Sub 的處理方式卻不一樣,當(dāng)消息積壓時(shí),有可能會(huì)導(dǎo)致消費(fèi)失敗和消息丟失!

這是怎么回事?

還是回到 Pub/Sub 的實(shí)現(xiàn)細(xì)節(jié)上來(lái)說(shuō)。

每個(gè)消費(fèi)者訂閱一個(gè)隊(duì)列時(shí),Redis 都會(huì)在 Server 上給這個(gè)消費(fèi)者在分配一個(gè)「緩沖區(qū)」,這個(gè)緩沖區(qū)其實(shí)就是一塊內(nèi)存。

當(dāng)生產(chǎn)者發(fā)布消息時(shí),Redis 先把消息寫到對(duì)應(yīng)消費(fèi)者的緩沖區(qū)中。

之后,消費(fèi)者不斷地從緩沖區(qū)讀取消息,處理消息。

但是,問(wèn)題就出在這個(gè)緩沖區(qū)上。

因?yàn)檫@個(gè)緩沖區(qū)其實(shí)是有「上限」的(可配置),如果消費(fèi)者拉取消息很慢,就會(huì)造成生產(chǎn)者發(fā)布到緩沖區(qū)的消息開(kāi)始積壓,緩沖區(qū)內(nèi)存持續(xù)增長(zhǎng)。

如果超過(guò)了緩沖區(qū)配置的上限,此時(shí),Redis 就會(huì)「強(qiáng)制」把這個(gè)消費(fèi)者踢下線。

這時(shí)消費(fèi)者就會(huì)消費(fèi)失敗,也會(huì)丟失數(shù)據(jù)。

如果你有看過(guò) Redis 的配置文件,可以看到這個(gè)緩沖區(qū)的默認(rèn)配置:client-output-buffer-limit pubsub 32mb 8mb 60。

它的參數(shù)含義如下:

  •  32mb:緩沖區(qū)一旦超過(guò) 32MB,Redis 直接強(qiáng)制把消費(fèi)者踢下線
  •  8mb + 60:緩沖區(qū)超過(guò) 8MB,并且持續(xù) 60 秒,Redis 也會(huì)把消費(fèi)者踢下線

Pub/Sub 的這一點(diǎn)特點(diǎn),是與 List 作隊(duì)列差異比較大的。

從這里你應(yīng)該可以看出,List 其實(shí)是屬于「拉」模型,而 Pub/Sub 其實(shí)屬于「推」模型。

List 中的數(shù)據(jù)可以一直積壓在內(nèi)存中,消費(fèi)者什么時(shí)候來(lái)「拉」都可以。

但 Pub/Sub 是把消息先「推」到消費(fèi)者在 Redis Server 上的緩沖區(qū)中,然后等消費(fèi)者再來(lái)取。

當(dāng)生產(chǎn)、消費(fèi)速度不匹配時(shí),就會(huì)導(dǎo)致緩沖區(qū)的內(nèi)存開(kāi)始膨脹,Redis 為了控制緩沖區(qū)的上限,所以就有了上面講到的,強(qiáng)制把消費(fèi)者踢下線的機(jī)制。

好了,現(xiàn)在我們總結(jié)一下 Pub/Sub 的優(yōu)缺點(diǎn):

  1.  支持發(fā)布 / 訂閱,支持多組生產(chǎn)者、消費(fèi)者處理消息
  2.  消費(fèi)者下線,數(shù)據(jù)會(huì)丟失
  3.  不支持?jǐn)?shù)據(jù)持久化,Redis 宕機(jī),數(shù)據(jù)也會(huì)丟失
  4.  消息堆積,緩沖區(qū)溢出,消費(fèi)者會(huì)被強(qiáng)制踢下線,數(shù)據(jù)也會(huì)丟失

有沒(méi)有發(fā)現(xiàn),除了第一個(gè)是優(yōu)點(diǎn)之外,剩下的都是缺點(diǎn)。

所以,很多人看到 Pub/Sub 的特點(diǎn)后,覺(jué)得這個(gè)功能很「雞肋」。

也正是以上原因,Pub/Sub 在實(shí)際的應(yīng)用場(chǎng)景中用得并不多。

目前只有哨兵集群和 Redis 實(shí)例通信時(shí),采用了 Pub/Sub 的方案,因?yàn)樯诒梅霞磿r(shí)通訊的業(yè)務(wù)場(chǎng)景。

我們?cè)賮?lái)看一下,Pub/Sub 有沒(méi)有解決,消息處理時(shí)異常宕機(jī),無(wú)法再次消費(fèi)的問(wèn)題呢?

其實(shí)也不行,Pub/Sub 從緩沖區(qū)取走數(shù)據(jù)之后,數(shù)據(jù)就從 Redis 緩沖區(qū)刪除了,消費(fèi)者發(fā)生異常,自然也無(wú)法再次重新消費(fèi)。

好,現(xiàn)在我們重新梳理一下,我們?cè)谑褂孟㈥?duì)列時(shí)的需求。

當(dāng)我們?cè)谑褂靡粋€(gè)消息隊(duì)列時(shí),希望它的功能如下:

  •  支持阻塞等待拉取消息
  •  支持發(fā)布 / 訂閱模式
  •  消費(fèi)失敗,可重新消費(fèi),消息不丟失
  •  實(shí)例宕機(jī),消息不丟失,數(shù)據(jù)可持久化
  •  消息可堆積

Redis 除了 List 和 Pub/Sub 之外,還有符合這些要求的數(shù)據(jù)類型嗎?

其實(shí),Redis 的作者也看到了以上這些問(wèn)題,也一直在朝著這些方向努力著。

Redis 作者在開(kāi)發(fā) Redis 期間,還另外開(kāi)發(fā)了一個(gè)開(kāi)源項(xiàng)目 disque。

這個(gè)項(xiàng)目的定位,就是一個(gè)基于內(nèi)存的分布式消息隊(duì)列中間件。

但由于種種原因,這個(gè)項(xiàng)目一直不溫不火。

終于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并給它定義了一個(gè)新的數(shù)據(jù)類型:Stream。

下面我們就來(lái)看看,它能符合上面提到的這些要求嗎?

趨于成熟的隊(duì)列:Stream

我們來(lái)看 Stream 是如何解決上面這些問(wèn)題的。

我們依舊從簡(jiǎn)單到復(fù)雜,依次來(lái)看 Stream 在做消息隊(duì)列時(shí),是如何處理的?

首先,Stream 通過(guò) XADD 和 XREAD 完成最簡(jiǎn)單的生產(chǎn)、消費(fèi)模型:

  • XADD:發(fā)布消息
  •  XREAD:讀取消息

生產(chǎn)者發(fā)布 2 條消息: 

  1. // *表示讓Redis自動(dòng)生成消息ID  
  2. 127.0.0.1:6379> XADD queue * name zhangsan  
  3. "1618469123380-0"  
  4. 127.0.0.1:6379> XADD queue * name lisi  
  5. "1618469127777-0" 

使用 XADD 命令發(fā)布消息,其中的「*」表示讓 Redis 自動(dòng)生成唯一的消息 ID。

這個(gè)消息 ID 的格式是「時(shí)間戳-自增序號(hào)」。

消費(fèi)者拉取消息: 

  1. // 從開(kāi)頭讀取5條消息,0-0表示從開(kāi)頭讀取  
  2. 127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0  
  3. 1) 1) "queue"  
  4.    2) 1) 1) "1618469123380-0"  
  5.          2) 1) "name"  
  6.             2) "zhangsan"  
  7.       2) 1) "1618469127777-0"  
  8.          2) 1) "name"  
  9.             2) "lisi" 

如果想繼續(xù)拉取消息,需要傳入上一條消息的 ID:

127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0

(nil)

沒(méi)有消息,Redis 會(huì)返回 NULL。

以上就是 Stream 最簡(jiǎn)單的生產(chǎn)、消費(fèi)。

這里不再重點(diǎn)介紹 Stream 命令的各種參數(shù),我在例子中演示時(shí),凡是大寫的單詞都是「固定」參數(shù),凡是小寫的單詞,都是可以自己定義的,例如隊(duì)列名、消息長(zhǎng)度等等,下面的例子規(guī)則也是一樣,為了方便你理解,這里有必要提醒一下。

下面我們來(lái)看,針對(duì)前面提到的消息隊(duì)列要求,Stream 都是如何解決的?

1) Stream 是否支持「阻塞式」拉取消息?

可以的,在讀取消息時(shí),只需要增加 BLOCK 參數(shù)即可。 

  1. // BLOCK 0 表示阻塞等待,不設(shè)置超時(shí)時(shí)間  
  2. 127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 

這時(shí),消費(fèi)者就會(huì)阻塞等待,直到生產(chǎn)者發(fā)布新的消息才會(huì)返回。

2) Stream 是否支持發(fā)布 / 訂閱模式?

也沒(méi)問(wèn)題,Stream 通過(guò)以下命令完成發(fā)布訂閱:

  •  XGROUP:創(chuàng)建消費(fèi)者組
  •  XREADGROUP:在指定消費(fèi)組下,開(kāi)啟消費(fèi)者拉取消息

下面我們來(lái)看具體如何做?

首先,生產(chǎn)者依舊發(fā)布 2 條消息: 

  1. 127.0.0.1:6379> XADD queue * name zhangsan  
  2. "1618470740565-0"  
  3. 127.0.0.1:6379> XADD queue * name lisi 
  4. "1618470743793-0" 

之后,我們想要開(kāi)啟 2 組消費(fèi)者處理同一批數(shù)據(jù),就需要?jiǎng)?chuàng)建 2 個(gè)消費(fèi)者組: 

  1. // 創(chuàng)建消費(fèi)者組1,0-0表示從頭拉取消息  
  2. 127.0.0.1:6379> XGROUP CREATE queue group1 0-0  
  3. OK  
  4. // 創(chuàng)建消費(fèi)者組2,0-0表示從頭拉取消息  
  5. 127.0.0.1:6379> XGROUP CREATE queue group2 0-0  
  6. OK 

消費(fèi)者組創(chuàng)建好之后,我們可以給每個(gè)「消費(fèi)者組」下面掛一個(gè)「消費(fèi)者」,讓它們分別處理同一批數(shù)據(jù)。

第一個(gè)消費(fèi)組開(kāi)始消費(fèi): 

  1. // group1的consumer開(kāi)始消費(fèi),>表示拉取最新數(shù)據(jù)  
  2. 127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >  
  3. 1) 1) "queue"  
  4.    2) 1) 1) "1618470740565-0"  
  5.          2) 1) "name"  
  6.             2) "zhangsan"  
  7.       2) 1) "1618470743793-0"  
  8.          2) 1) "name"  
  9.             2) "lisi" 

同樣地,第二個(gè)消費(fèi)組開(kāi)始消費(fèi): 

  1. // group2的consumer開(kāi)始消費(fèi),>表示拉取最新數(shù)據(jù)  
  2. 127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >  
  3. 1) 1) "queue"  
  4.    2) 1) 1) "1618470740565-0"  
  5.          2) 1) "name"  
  6.             2) "zhangsan"  
  7.       2) 1) "1618470743793-0"  
  8.          2) 1) "name"  
  9.             2) "lisi" 

我們可以看到,這 2 組消費(fèi)者,都可以獲取同一批數(shù)據(jù)進(jìn)行處理了。

這樣一來(lái),就達(dá)到了多組消費(fèi)者「訂閱」消費(fèi)的目的。

3) 消息處理時(shí)異常,Stream 能否保證消息不丟失,重新消費(fèi)?

除了上面拉取消息時(shí)用到了消息 ID,這里為了保證重新消費(fèi),也要用到這個(gè)消息 ID。

當(dāng)一組消費(fèi)者處理完消息后,需要執(zhí)行 XACK 命令告知 Redis,這時(shí) Redis 就會(huì)把這條消息標(biāo)記為「處理完成」。 

  1. // group1下的 1618472043089-0 消息已處理完成  
  2. 127.0.0.1:6379> XACK queue group1 1618472043089-0 

如果消費(fèi)者異常宕機(jī),肯定不會(huì)發(fā)送 XACK,那么 Redis 就會(huì)依舊保留這條消息。

待這組消費(fèi)者重新上線后,Redis 就會(huì)把之前沒(méi)有處理成功的數(shù)據(jù),重新發(fā)給這個(gè)消費(fèi)者。這樣一來(lái),即使消費(fèi)者異常,也不會(huì)丟失數(shù)據(jù)了。 

  1. // 消費(fèi)者重新上線,0-0表示重新拉取未ACK的消息  
  2. 127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0  
  3. // 之前沒(méi)消費(fèi)成功的數(shù)據(jù),依舊可以重新消費(fèi)  
  4. 1) 1) "queue"  
  5.    2) 1) 1) "1618472043089-0"  
  6.          2) 1) "name"  
  7.             2) "zhangsan"  
  8.       2) 1) "1618472045158-0"  
  9.          2) 1) "name"  
  10.             2) "lisi" 

4) Stream 數(shù)據(jù)會(huì)寫入到 RDB 和 AOF 做持久化嗎?

Stream 是新增加的數(shù)據(jù)類型,它與其它數(shù)據(jù)類型一樣,每個(gè)寫操作,也都會(huì)寫入到 RDB 和 AOF 中。

我們只需要配置好持久化策略,這樣的話,就算 Redis 宕機(jī)重啟,Stream 中的數(shù)據(jù)也可以從 RDB 或 AOF 中恢復(fù)回來(lái)。

5) 消息堆積時(shí),Stream 是怎么處理的?

其實(shí),當(dāng)消息隊(duì)列發(fā)生消息堆積時(shí),一般只有 2 個(gè)解決方案:

  1.  生產(chǎn)者限流:避免消費(fèi)者處理不及時(shí),導(dǎo)致持續(xù)積壓
  2.  丟棄消息:中間件丟棄舊消息,只保留固定長(zhǎng)度的新消息

而 Redis 在實(shí)現(xiàn) Stream 時(shí),采用了第 2 個(gè)方案。

在發(fā)布消息時(shí),你可以指定隊(duì)列的最大長(zhǎng)度,防止隊(duì)列積壓導(dǎo)致內(nèi)存爆炸。 

  1. // 隊(duì)列長(zhǎng)度最大10000  
  2. 127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan  
  3. "1618473015018-0" 

當(dāng)隊(duì)列長(zhǎng)度超過(guò)上限后,舊消息會(huì)被刪除,只保留固定長(zhǎng)度的新消息。

這么來(lái)看,Stream 在消息積壓時(shí),如果指定了最大長(zhǎng)度,還是有可能丟失消息的。

除了以上介紹到的命令,Stream 還支持查看消息長(zhǎng)度(XLEN)、查看消費(fèi)者狀態(tài)(XINFO)等命令,使用也比較簡(jiǎn)單,你可以查詢官方文檔了解一下,這里就不過(guò)多介紹了。

好了,通過(guò)以上介紹,我們可以看到,Redis 的 Stream 幾乎覆蓋到了消息隊(duì)列的各種場(chǎng)景,是不是覺(jué)得很完美?

既然它的功能這么強(qiáng)大,這是不是意味著,Redis 真的可以作為專業(yè)的消息隊(duì)列中間件來(lái)使用呢?

但是還「差一點(diǎn)」,就算 Redis 能做到以上這些,也只是「趨近于」專業(yè)的消息隊(duì)列。

原因在于 Redis 本身的一些問(wèn)題,如果把其定位成消息隊(duì)列,還是有些欠缺的。

到這里,就不得不把 Redis 與專業(yè)的隊(duì)列中間件做對(duì)比了。

下面我們就來(lái)看一下,Redis 在作隊(duì)列時(shí),到底還有哪些欠缺?

與專業(yè)的消息隊(duì)列對(duì)比

其實(shí),一個(gè)專業(yè)的消息隊(duì)列,必須要做到兩大塊:

  1.  消息不丟
  2.  消息可堆積

前面我們討論的重點(diǎn),很大篇幅圍繞的是第一點(diǎn)展開(kāi)的。

這里我們換個(gè)角度,從一個(gè)消息隊(duì)列的「使用模型」來(lái)分析一下,怎么做,才能保證數(shù)據(jù)不丟?

使用一個(gè)消息隊(duì)列,其實(shí)就分為三大塊:生產(chǎn)者、隊(duì)列中間件、消費(fèi)者。

消息是否會(huì)發(fā)生丟失,其重點(diǎn)也就在于以下 3 個(gè)環(huán)節(jié):

  1.  生產(chǎn)者會(huì)不會(huì)丟消息?
  2.  消費(fèi)者會(huì)不會(huì)丟消息?
  3.  隊(duì)列中間件會(huì)不會(huì)丟消息?

1) 生產(chǎn)者會(huì)不會(huì)丟消息?

當(dāng)生產(chǎn)者在發(fā)布消息時(shí),可能發(fā)生以下異常情況:

  1.  消息沒(méi)發(fā)出去:網(wǎng)絡(luò)故障或其它問(wèn)題導(dǎo)致發(fā)布失敗,中間件直接返回失敗
  2.  不確定是否發(fā)布成功:網(wǎng)絡(luò)問(wèn)題導(dǎo)致發(fā)布超時(shí),可能數(shù)據(jù)已發(fā)送成功,但讀取響應(yīng)結(jié)果超時(shí)了

如果是情況 1,消息根本沒(méi)發(fā)出去,那么重新發(fā)一次就好了。

如果是情況 2,生產(chǎn)者沒(méi)辦法知道消息到底有沒(méi)有發(fā)成功?所以,為了避免消息丟失,它也只能繼續(xù)重試,直到發(fā)布成功為止。

生產(chǎn)者一般會(huì)設(shè)定一個(gè)最大重試次數(shù),超過(guò)上限依舊失敗,需要記錄日志報(bào)警處理。

也就是說(shuō),生產(chǎn)者為了避免消息丟失,只能采用失敗重試的方式來(lái)處理。

但發(fā)現(xiàn)沒(méi)有?這也意味著消息可能會(huì)重復(fù)發(fā)送。

是的,在使用消息隊(duì)列時(shí),要保證消息不丟,寧可重發(fā),也不能丟棄。

那消費(fèi)者這邊,就需要多做一些邏輯了。

對(duì)于敏感業(yè)務(wù),當(dāng)消費(fèi)者收到重復(fù)數(shù)據(jù)數(shù)據(jù)時(shí),要設(shè)計(jì)冪等邏輯,保證業(yè)務(wù)的正確性。

從這個(gè)角度來(lái)看,生產(chǎn)者會(huì)不會(huì)丟消息,取決于生產(chǎn)者對(duì)于異常情況的處理是否合理。

所以,無(wú)論是 Redis 還是專業(yè)的隊(duì)列中間件,生產(chǎn)者在這一點(diǎn)上都是可以保證消息不丟的。

2) 消費(fèi)者會(huì)不會(huì)丟消息?

這種情況就是我們前面提到的,消費(fèi)者拿到消息后,還沒(méi)處理完成,就異常宕機(jī)了,那消費(fèi)者還能否重新消費(fèi)失敗的消息?

要解決這個(gè)問(wèn)題,消費(fèi)者在處理完消息后,必須「告知」隊(duì)列中間件,隊(duì)列中間件才會(huì)把標(biāo)記已處理,否則仍舊把這些數(shù)據(jù)發(fā)給消費(fèi)者。

這種方案需要消費(fèi)者和中間件互相配合,才能保證消費(fèi)者這一側(cè)的消息不丟。

無(wú)論是 Redis 的 Stream,還是專業(yè)的隊(duì)列中間件,例如 RabbitMQ、Kafka,其實(shí)都是這么做的。

所以,從這個(gè)角度來(lái)看,Redis 也是合格的。

3) 隊(duì)列中間件會(huì)不會(huì)丟消息?

前面 2 個(gè)問(wèn)題都比較好處理,只要客戶端和服務(wù)端配合好,就能保證生產(chǎn)端、消費(fèi)端都不丟消息。

但是,如果隊(duì)列中間件本身就不可靠呢?

畢竟生產(chǎn)者和消費(fèi)這都依賴它,如果它不可靠,那么生產(chǎn)者和消費(fèi)者無(wú)論怎么做,都無(wú)法保證數(shù)據(jù)不丟。

在這個(gè)方面,Redis 其實(shí)沒(méi)有達(dá)到要求。

Redis 在以下 2 個(gè)場(chǎng)景下,都會(huì)導(dǎo)致數(shù)據(jù)丟失。

  1.  AOF 持久化配置為每秒寫盤,但這個(gè)寫盤過(guò)程是異步的,Redis 宕機(jī)時(shí)會(huì)存在數(shù)據(jù)丟失的可能
  2.  主從復(fù)制也是異步的,主從切換時(shí),也存在丟失數(shù)據(jù)的可能(從庫(kù)還未同步完成主庫(kù)發(fā)來(lái)的數(shù)據(jù),就被提成主庫(kù))

基于以上原因我們可以看到,Redis 本身的無(wú)法保證嚴(yán)格的數(shù)據(jù)完整性。

所以,如果把 Redis 當(dāng)做消息隊(duì)列,在這方面是有可能導(dǎo)致數(shù)據(jù)丟失的。

再來(lái)看那些專業(yè)的消息隊(duì)列中間件是如何解決這個(gè)問(wèn)題的?

像 RabbitMQ 或 Kafka 這類專業(yè)的隊(duì)列中間件,在使用時(shí),一般是部署一個(gè)集群,生產(chǎn)者在發(fā)布消息時(shí),隊(duì)列中間件通常會(huì)寫「多個(gè)節(jié)點(diǎn)」,以此保證消息的完整性。這樣一來(lái),即便其中一個(gè)節(jié)點(diǎn)掛了,也能保證集群的數(shù)據(jù)不丟失。

也正因?yàn)槿绱耍琑abbitMQ、Kafka在設(shè)計(jì)時(shí)也更復(fù)雜。畢竟,它們是專門針對(duì)隊(duì)列場(chǎng)景設(shè)計(jì)的。

但 Redis 的定位則不同,它的定位更多是當(dāng)作緩存來(lái)用,它們兩者在這個(gè)方面肯定是存在差異的。

最后,我們來(lái)看消息積壓怎么辦?

4) 消息積壓怎么辦?

因?yàn)?Redis 的數(shù)據(jù)都存儲(chǔ)在內(nèi)存中,這就意味著一旦發(fā)生消息積壓,則會(huì)導(dǎo)致 Redis 的內(nèi)存持續(xù)增長(zhǎng),如果超過(guò)機(jī)器內(nèi)存上限,就會(huì)面臨被 OOM 的風(fēng)險(xiǎn)。

所以,Redis 的 Stream 提供了可以指定隊(duì)列最大長(zhǎng)度的功能,就是為了避免這種情況發(fā)生。

但 Kafka、RabbitMQ 這類消息隊(duì)列就不一樣了,它們的數(shù)據(jù)都會(huì)存儲(chǔ)在磁盤上,磁盤的成本要比內(nèi)存小得多,當(dāng)消息積壓時(shí),無(wú)非就是多占用一些磁盤空間,相比于內(nèi)存,在面對(duì)積壓時(shí)也會(huì)更加「坦然」。

綜上,我們可以看到,把 Redis 當(dāng)作隊(duì)列來(lái)使用時(shí),始終面臨的 2 個(gè)問(wèn)題:

  1.  Redis 本身可能會(huì)丟數(shù)據(jù)
  2.  面對(duì)消息積壓,Redis 內(nèi)存資源緊張

到這里,Redis 是否可以用作隊(duì)列,我想這個(gè)答案你應(yīng)該會(huì)比較清晰了。

如果你的業(yè)務(wù)場(chǎng)景足夠簡(jiǎn)單,對(duì)于數(shù)據(jù)丟失不敏感,而且消息積壓概率比較小的情況下,把 Redis 當(dāng)作隊(duì)列是完全可以的。

而且,Redis 相比于 Kafka、RabbitMQ,部署和運(yùn)維也更加輕量。

如果你的業(yè)務(wù)場(chǎng)景對(duì)于數(shù)據(jù)丟失非常敏感,而且寫入量非常大,消息積壓時(shí)會(huì)占用很多的機(jī)器資源,那么我建議你使用專業(yè)的消息隊(duì)列中間件。

總結(jié)

好了,總結(jié)一下。這篇文章我們從「Redis 能否用作隊(duì)列」這個(gè)角度出發(fā),介紹了 List、Pub/Sub、Stream 在做隊(duì)列的使用方式,以及它們各自的優(yōu)劣。

之后又把 Redis 和專業(yè)的消息隊(duì)列中間件做對(duì)比,發(fā)現(xiàn) Redis 的不足之處。

最后,我們得出 Redis 做隊(duì)列的合適場(chǎng)景。

這里我也列了一個(gè)表格,總結(jié)了它們各自的優(yōu)缺點(diǎn)。

后記

最后,我想和你再聊一聊關(guān)于「技術(shù)方案選型」的問(wèn)題。

你應(yīng)該也看到了,這篇文章雖然始于 Redis,但并不止于 Redis。

我們?cè)诜治?Redis 細(xì)節(jié)時(shí),一直在提出問(wèn)題,然后尋找更好的解決方案,在文章最后,又聊到一個(gè)專業(yè)的消息隊(duì)列應(yīng)該怎么做。

其實(shí),我們?cè)谟懻摷夹g(shù)選型時(shí),就是一個(gè)關(guān)于如何取舍的問(wèn)題。

而這里我想傳達(dá)給你的信息是,在面對(duì)技術(shù)選型時(shí),不要不經(jīng)過(guò)思考就覺(jué)得哪個(gè)方案好,哪個(gè)方案不好。

你需要根據(jù)具體場(chǎng)景具體分析,這里我把這個(gè)分析過(guò)程分為 2 個(gè)層面:

  1.  業(yè)務(wù)功能角度
  2.  技術(shù)資源角度

這篇文章所講到的內(nèi)容,都是以業(yè)務(wù)功能角度出發(fā)做決策的。

但這里的第二點(diǎn),從技術(shù)資源角度出發(fā),其實(shí)也很重要。

技術(shù)資源的角度是說(shuō),你所處的公司環(huán)境、技術(shù)資源能否匹配這些技術(shù)方案。

這個(gè)怎么解釋呢?

簡(jiǎn)單來(lái)講,就是你所在的公司、團(tuán)隊(duì),是否有匹配的資源能 hold 住這些技術(shù)方案。

我們都知道 Kafka、RabbitMQ 是非常專業(yè)的消息中間件,但它們的部署和運(yùn)維,相比于 Redis 來(lái)說(shuō),也會(huì)更復(fù)雜一些。

如果你在一個(gè)大公司,公司本身就有優(yōu)秀的運(yùn)維團(tuán)隊(duì),那么使用這些中間件肯定沒(méi)問(wèn)題,因?yàn)橛凶銐騼?yōu)秀的人能 hold 住這些中間件,公司也會(huì)投入人力和時(shí)間在這個(gè)方向上。

但如果你是在一個(gè)初創(chuàng)公司,業(yè)務(wù)正處在快速發(fā)展期,暫時(shí)沒(méi)有能 hold 住這些中間件的團(tuán)隊(duì)和人,如果貿(mào)然使用這些組件,當(dāng)發(fā)生故障時(shí),排查問(wèn)題也會(huì)變得很困難,甚至?xí)璧K業(yè)務(wù)的發(fā)展。

而這種情形下,如果公司的技術(shù)人員對(duì)于 Redis 都很熟,綜合評(píng)估來(lái)看,Redis 也基本可以滿足業(yè)務(wù) 90% 的需求,那當(dāng)下選擇 Redis 未必不是一個(gè)好的決策。

所以,做技術(shù)選型不只是技術(shù)問(wèn)題,還與人、團(tuán)隊(duì)、管理、組織結(jié)構(gòu)有關(guān)。

也正是因?yàn)檫@些原因,當(dāng)你在和別人討論技術(shù)選型問(wèn)題時(shí),你會(huì)發(fā)現(xiàn)每個(gè)公司的做法都不相同。

畢竟每個(gè)公司所處的環(huán)境和文化不一樣,做出的決策當(dāng)然就會(huì)各有差異。

如果你不了解這其中的邏輯,那在做技術(shù)選型時(shí),只會(huì)趨于表面現(xiàn)象,無(wú)法深入到問(wèn)題根源。

而一旦你理解了這個(gè)邏輯,那么你在看待這個(gè)問(wèn)題時(shí),不僅對(duì)于技術(shù)會(huì)有更加深刻認(rèn)識(shí),對(duì)技術(shù)資源和人的把握,也會(huì)更加清晰。

希望你以后在做技術(shù)選型時(shí),能夠把這些因素也考慮在內(nèi),這對(duì)你的技術(shù)成長(zhǎng)之路也是非常有幫助的。 

 

責(zé)任編輯:龐桂玉 來(lái)源: Java知音
相關(guān)推薦

2018-01-09 20:22:49

容器Docker桌面系統(tǒng)

2021-11-09 12:11:55

C# Redis隊(duì)列

2009-04-17 16:16:53

程序人生職場(chǎng)

2013-09-04 13:51:02

2019-12-22 13:21:04

人臉識(shí)別支付寶微信

2024-04-19 08:32:07

Redis緩存數(shù)據(jù)庫(kù)

2020-10-16 15:06:59

開(kāi)發(fā)技術(shù)方案

2009-11-23 10:08:45

BlackBerryLinux調(diào)制解調(diào)器

2021-03-14 22:00:17

MongoDB循環(huán)隊(duì)列

2024-09-30 10:27:22

2020-10-23 09:31:20

Redis-Cli REPL模式

2021-11-29 09:45:57

枚舉Go代碼

2020-12-24 06:00:27

Python編程語(yǔ)言開(kāi)發(fā)

2020-10-21 10:53:33

Google壟斷法瀏覽器

2016-06-01 15:42:58

Hadoop數(shù)據(jù)管理分布式

2019-06-14 14:48:41

多云云計(jì)算云平臺(tái)

2020-03-04 10:13:55

Kubernetes容器開(kāi)發(fā)

2014-04-17 16:42:03

DevOps

2020-04-17 14:25:22

Kubernetes應(yīng)用程序軟件開(kāi)發(fā)

2022-07-26 00:00:22

HTAP系統(tǒng)數(shù)據(jù)庫(kù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)