如何正確使用Redis Streams?
譯文【51CTO.com快譯】 Redis是一種內(nèi)存中的多模式數(shù)據(jù)庫,適用于諸多使用場合,包括內(nèi)容緩存、會話存儲、實時分析、消息代理和數(shù)據(jù)流。去年,我撰文介紹了如何使用Redis Pub/Sub、Lists和Sorted Sets用于實時數(shù)據(jù)流處理,詳見https://www.infoworld.com/article/3212768/database/how-to-use-redis-for-real-time-stream-processing.html?,F(xiàn)在Redis 5.0已發(fā)布,Redis有了一種全新的數(shù)據(jù)結(jié)構(gòu)來管理數(shù)據(jù)流。
有了Redis Streams數(shù)據(jù)結(jié)構(gòu),可以執(zhí)行比Pub/Sub、Lists和Sorted Sets豐富得多的操作。Redis Streams有眾多優(yōu)點,它讓你能夠執(zhí)行下列操作:
- 收集大量高速傳來的數(shù)據(jù)(唯一的瓶頸是你的網(wǎng)絡(luò)I/O);
- 在許多生產(chǎn)者和許多消費者之間建立數(shù)據(jù)通道;
- 即使生產(chǎn)者和消費者運作的速度不一樣,也能高效地管理數(shù)據(jù)的消費;
- 你的消費者離線或斷開連接時確保數(shù)據(jù)持久化;
- 在生產(chǎn)者和消費者之間異步通信;
- 擴大消費者的數(shù)量;
- 消費者在消費數(shù)據(jù)過程中失效時,實現(xiàn)類似事務(wù)的數(shù)據(jù)安全性;
- 有效地使用主內(nèi)存。
Redis Streams的最大優(yōu)點是它內(nèi)置于Redis中,因此部署或管理Redis Streams無需額外的步驟。我在本文中將逐步介紹使用Redis Streams的基本方面,包括如何將數(shù)據(jù)添加到數(shù)據(jù)流、如何讀取數(shù)據(jù)(一次性讀取、異步讀取和到達時讀取等),以滿足消費者的不同使用場合。
一、了解Redis Streams中的數(shù)據(jù)流
Redis Streams提供了一種“只允許追加”(append only)的數(shù)據(jù)結(jié)構(gòu),與日志類似。它提供了可以將數(shù)據(jù)源添加到數(shù)據(jù)流、使用數(shù)據(jù)流以及監(jiān)控和管理如何消費數(shù)據(jù)的命令。 Streams數(shù)據(jù)結(jié)構(gòu)很靈活,讓你可以以幾種方式來連接生產(chǎn)者和消費者。
圖1. Redis Streams的簡單應(yīng)用,只有一個生產(chǎn)者和一個消費者
圖1表明了Redis Streams的基本用法。單單一個生產(chǎn)者充當數(shù)據(jù)源,消費者是將數(shù)據(jù)發(fā)送給相關(guān)接收者的消息傳遞應(yīng)用程序。
圖2.多個消費者從Redis Streams讀取數(shù)據(jù)的應(yīng)用
圖2中,一個公共數(shù)據(jù)流被多個消費者使用。使用Redis Streams,消費者可以按照自己的節(jié)奏來讀取和分析數(shù)據(jù)。
在下一個應(yīng)用中,如圖3所示,情況變得復(fù)雜一點。該服務(wù)從多個生產(chǎn)者接收數(shù)據(jù),并將所有數(shù)據(jù)存儲在Redis Streams數(shù)據(jù)結(jié)構(gòu)中。該應(yīng)用有多個消費者從Redis Streams讀取數(shù)據(jù),讀取數(shù)據(jù)的還有消費者組(consumer group),消費者組支持無法與生產(chǎn)者保持同樣速度的消費者。
圖3. Redis Streams支持多個生產(chǎn)者和消費者
二、用Redis Streams將數(shù)據(jù)添加到數(shù)據(jù)流
圖3中的圖表只顯示了向Redis Stream添加數(shù)據(jù)的一種方法。雖然一個或多個生產(chǎn)者可以向數(shù)據(jù)結(jié)構(gòu)添加數(shù)據(jù),但任何新數(shù)據(jù)始終追加到數(shù)據(jù)流的末尾。
1.添加數(shù)據(jù)的默認方法
這是向Redis Streams添加數(shù)據(jù)的最簡單方法:
- XADD mystream * name Anna
- XADD mystream * name Bert
- XADD mystream * name Cathy
在上述命令中,XADD是Redis命令,mystream是數(shù)據(jù)流的名稱,Anna、Bert和Cathy是每一行添加的名稱,而*操作符告訴Redis為每一行自動生成識別符。這個命令得出三個mystream條目:
- 1518951481323-0 name Cathy
- 1518951480723-0 name Bert
- 1518951480106-0 name Anna
2.針對每個條目,為數(shù)據(jù)添加用戶管理的ID
Redis讓你可以為每個條目維護你自己的識別符(見下面)。雖然這在一些情況下很有用,但依賴自動生成的ID通常來得更簡單:
- XADD mystream 10000000 name Anna
- XADD mystream 10000001 name Bert
- XADD mystream 10000002 name Cathy
這得出下列的mystream條目:
- 10000002-0 name Cathy
- 10000001-0 name Bert
- 10000000-0 name Anna
3.為數(shù)據(jù)添加最大限制
你可以為數(shù)據(jù)流設(shè)置條目最大數(shù):
- XADD mystream MAXLEN 1000000 * name Anna
- XADD mystream MAXLEN 1000000 * name Bert
- XADD mystream MAXLEN 1000000 * name Cathy
數(shù)據(jù)流達到1000000個左右條目的長度時,該命令驅(qū)逐舊條目。
一個小貼士:Redis Streams將數(shù)據(jù)存儲在基樹(radix tree)的宏節(jié)點中。每個宏節(jié)點有幾個數(shù)據(jù)項(通常在幾十個左右)。如果添加一個近似的MAXLEN值(如下所示),就沒必要為每次插入處理宏節(jié)點。如果幾十個數(shù)(比如1000000或1000050)對你來說關(guān)系不大,可以用近似字符(~)來調(diào)用命令,從而優(yōu)化性能。
- XADD mystream MAXLEN ~ 1000000 * name Anna
- XADD mystream MAXLEN ~ 1000000 * name Bert
- XADD mystream MAXLEN ~ 1000000 * name Cathy
三、用Redis Streams消費來自數(shù)據(jù)流的數(shù)據(jù)
Redis Streams結(jié)構(gòu)提供了一套豐富的命令和功能,以便以多種方式消費數(shù)據(jù)。
1.從數(shù)據(jù)流的開頭讀取所有內(nèi)容
場景:數(shù)據(jù)流已含有你需要處理的數(shù)據(jù),而且你想從開頭開始處理數(shù)據(jù)。
為此你要使用的命令是XREAD,它讓你可以從數(shù)據(jù)的開頭讀取所有或前N個條目。一條最佳實踐是,逐頁讀取數(shù)據(jù)始終是好主意。想從數(shù)據(jù)流的開頭讀取多達100個條目,命令是:
- XREAD COUNT 100 STREAMS mystream 0
假設(shè)1518951481323-0是你在上一個命令中收到的數(shù)據(jù)項的最后一個ID,你可以運行該命令,檢索下100個條目:
- XREAD COUNT 100 STREAMS mystream 1518951481323-1
2.異步消費數(shù)據(jù)(通過阻塞調(diào)用)
場景:你的消費者消費和處理數(shù)據(jù)的速度比數(shù)據(jù)添加到數(shù)據(jù)流的速度還快。
在許多使用場合下,消費者讀取的速度比生產(chǎn)者向數(shù)據(jù)流添加數(shù)據(jù)的速度還快。這種情況下,你希望消費者等待、新數(shù)據(jù)到達時接到通知。BLOCK選項讓你可以指定等待新數(shù)據(jù)的時長:
- XREAD BLOCK 60000 STREAMS mystream 1518951123456-1
在這里,XREAD返回1518951123456-1之后的所有數(shù)據(jù)。如果之后沒有數(shù)據(jù),查詢將等待N= 60秒,直至新數(shù)據(jù)到達,然后超時中斷。如果你想要無限期地阻止該命令,按如下方式調(diào)用XREAD:
- XREAD BLOCK 0 STREAMS mystream 1518951123456-1
注意:在該示例中,你還可以使用XRANGE命令來逐頁檢索數(shù)據(jù)。
3.只讀取剛到達的新數(shù)據(jù)
場景:你只對處理從當前時間點開始的新數(shù)據(jù)集有興趣。
你反復(fù)讀取數(shù)據(jù)時,從上次停下來的地方重新開始始終是個好主意。比如在前一個示例中,你進行了阻塞調(diào)用以讀取大于1518951123456-1的數(shù)據(jù)。然而,你可能不知道最新的ID。在這種情況下,可以用$符號開始讀取數(shù)據(jù)流,該符號告訴XREAD命令只檢索新數(shù)據(jù)。由于該調(diào)用使用的BLOCK選項是60秒,它將等到數(shù)據(jù)流中有一些數(shù)據(jù)。
- XREAD BLOCK 60000 STREAMS mystream $
這種情況下,你將開始用$選項讀取新數(shù)據(jù)。然而,不該用$選項進行后續(xù)調(diào)用。比如說,如果1518951123456-0是之前調(diào)用中檢索的數(shù)據(jù)的ID,你的下一個調(diào)用應(yīng)該是:
- XREAD BLOCK 60000 STREAMS mystream 1518951123456-1
4.迭代數(shù)據(jù)流以讀取過去的數(shù)據(jù)
場景:你的數(shù)據(jù)流已有足夠的數(shù)據(jù),你想查詢它已分析到目前為止收集的數(shù)據(jù)。
可以分別使用XRANGE和XREVRANGE,以向前或向后的方向讀取兩個條目之間的數(shù)據(jù)。在該示例中,命令讀取1518951123450-0和1518951123460-0之間的數(shù)據(jù):
- XRANGE mystream 1518951123450-0 1518951123460-0
XRANGE還讓你可以借助COUNT選項,限制返回的數(shù)據(jù)項數(shù)量。比如說,下列查詢返回兩個間隔之間的前10個數(shù)據(jù)項。使用該選項,你可以像使用SCAN命令一樣迭代數(shù)據(jù)流:
- XRANGE mystream 1518951123450-0 1518951123460-0 COUNT 10
如果你不知道查詢的上限或下限,可以將下限換成-、將上限換成+。比如說,下列查詢返回從數(shù)據(jù)開頭的前10個數(shù)據(jù)項:
- XRANGE mystream - + COUNT 10
XREVRANGE的語法類似XRANGE,只是下限和上限的順序倒過來。比如說,下列查詢以相反的順序返回數(shù)據(jù)流末尾的前10個數(shù)據(jù)項:
- XREVRANGE mystream + - COUNT 10
5.在多個消費者之間劃分數(shù)據(jù)
場景:消費者消費數(shù)據(jù)的速度遠低于生產(chǎn)者生成數(shù)據(jù)的速度。
在某些情況下,包括圖像處理、深度學習和情感分析,消費者與生產(chǎn)者相比可能很慢。這種情況下,可以通過分散消費者并劃分每個消費者消耗的數(shù)據(jù)的做法,來匹配到達數(shù)據(jù)的速度與消耗數(shù)據(jù)的速度。
使用Redis Streams,你可以利用消費者組來完成此任務(wù)。多個消費者是消費者組的一部分時,Redis Streams將確保每個消費者都收到一組獨有的數(shù)據(jù)。
- XREADGROUP GROUP mygroup consumer1 COUNT 2 STREAMS mystream >
當然,關(guān)于消費者組如何運作還有更多的東西要了解。Redis Streams消費者組旨在劃分數(shù)據(jù)、實現(xiàn)災(zāi)難恢復(fù)并提供事務(wù)數(shù)據(jù)安全性。
如你所見,Redis Streams很容易上手。只需下載并安裝Redis 5.0,然后鉆研該項目網(wǎng)站上的Redis Streams教程。
原文標題:How to use Redis Streams,作者:Roshan Kumar
作者簡介:Roshan Kumar是Redis Labs的資深產(chǎn)品經(jīng)理。他在軟件開發(fā)和技術(shù)營銷方面有著豐富的從業(yè)經(jīng)驗。Roshan曾供職于惠普和多家成功的硅谷初創(chuàng)公司,包括ZillionTV、 Salorix、Alopa和ActiveVideo。
【51CTO譯稿,合作站點轉(zhuǎn)載請注明原文譯者和出處為51CTO.com】