使用 Python 配合 Redis 超越緩存
如果你是一位 Python 開(kāi)發(fā)者, 那么你肯定使用過(guò) Redis , 并且認(rèn)為它是一個(gè)很棒的緩存。 雖然你的印象沒(méi)有錯(cuò), Redis 的確是一個(gè)很棒的緩存, 但使用 Redis 能夠解決的問(wèn)題并不僅限于緩存。
我們將探索 Redis 和 Redis Enterprise 的一些其他用途。 為了找點(diǎn)樂(lè)子, 我將使用之前《 使用 Redis 儲(chǔ)存地理位置數(shù)據(jù) 》一文中的大腳怪(Bigfoot)數(shù)據(jù)。 此外, 由于這篇文章的讀者都是 Python 開(kāi)發(fā)者, 所以我將使用 Python 來(lái)編寫(xiě)本文的所有代碼!
我在接下來(lái)展示的代碼中使用了 aioredis 客戶端庫(kù), 因?yàn)樗鼘?duì) async/await
提供了非常棒的支持。 如果你對(duì) async/await
不熟悉的話, 那么可以去看看 這篇文章 , 里面提到了 async/await
對(duì)提升性能的幫助。
使用 Redis 構(gòu)建隊(duì)列
Redis 提供了字符串、哈希、集合和列表等多種數(shù)據(jù)結(jié)構(gòu)可供使用。 這些數(shù)據(jù)結(jié)構(gòu)都是儲(chǔ)存數(shù)據(jù)的好幫手, 其中列表就可以用作一個(gè)非常棒的隊(duì)列(queue)。
為了將列表用作隊(duì)列, 我們需要使用 RPUSH
將新項(xiàng)目推送至列表末尾, 然后使用 LPOP
或者 BLPOP
將它們從列表的前面彈出。 由于 Redis 對(duì)數(shù)據(jù)庫(kù)的所有修改都是在單個(gè)線程里面完成的, 所以這些操作都是原子的。
作為例子, 下面這段在隊(duì)列里面添加了一些大腳怪的蹤跡。
- import asyncio
- import aioredis
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- add_to_queue(redis, 'Possible vocalizations east of Makanda'),
- add_to_queue(redis, 'Sighting near the Columbia River'),
- add_to_queue(redis, 'Chased by a tall hairy creature')
- )
- redis.close()
- await redis.wait_closed()
- def add_to_queue(redis, message):
- return redis.rpush('bigfoot:sightings:received', message)
- asyncio.run(main())
import asyncio 這個(gè)程序非常直接。 我們只需要在第 18 行調(diào)用 redis.rpush
, 就能夠?qū)⒅付ǖ脑赝迫氲疥?duì)列。 接下來(lái)是從隊(duì)列另一端讀取元素的代碼, 同樣非常簡(jiǎn)單。
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- while True:
- sighting = await redis.blpop('bigfoot:sightings:received')
- pp(sighting)
- asyncio.run(main())
Redis 還有 一些同樣很酷的命令 , 它們不僅可以將列表用作隊(duì)列甚至堆棧。 我最喜歡的是 BRPOPLPUSH
, 它可以從列表的右側(cè)阻塞并彈出一些元素, 然后將被彈出的元素推入到另一個(gè)列表。 你可以使用這個(gè)命令來(lái)將一個(gè)隊(duì)列中的元素傳遞至另一個(gè)隊(duì)列, 這是非常棒的一個(gè)命令。第 11 行和第 12 行的無(wú)限循環(huán)將等待并且打印被推入至隊(duì)列中的大腳怪蹤跡。 這里使用了 redis.blpop
而不是 redis.lpop
, 因?yàn)榍罢呖梢宰枞蛻舳瞬⒌却斜碇械脑胤祷亍?比起讓 Redis 和 Python 代碼之間的網(wǎng)絡(luò)無(wú)休止地輪詢并做無(wú)用功, 讓客戶端阻塞并等待元素出現(xiàn)的做法會(huì)高效得多。
使用 Redis 訂閱和發(fā)送事件
Redis 提供的東西中有些并不是數(shù)據(jù)結(jié)構(gòu), 比如訂閱與發(fā)布(Pub/Sub)特性就是其中之一。 這個(gè)特性就像它的名字一樣, 是一個(gè)內(nèi)置于 Redis 中的發(fā)布與訂閱機(jī)制。 得益于這個(gè)特性, 我們只需要 使用一些命令 就可以在自己的 Python 應(yīng)用里面添加強(qiáng)大的訂閱與發(fā)布機(jī)制。
通過(guò)執(zhí)行訂閱操作可以讓我們發(fā)現(xiàn)事件, 以下是代碼:
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- [channel] = await redis.psubscribe('bigfoot:broadcast:channel:*')
- while True:
- message = await channel.get()
- pp(message)
- asyncio.run(main())
用于匹配模式的 redis.psubscribe
函數(shù)和非模式匹配的 redis.subscribe
函數(shù)都返回 Python 列表, 以便包含不定數(shù)量的元素。 程序?qū)⒔鈽?gòu)這個(gè)列表(Python 的術(shù)語(yǔ)是解包)以獲得我想要的通道, 并在之后使用 .get
進(jìn)行阻塞調(diào)用以等待下一條消息。因?yàn)槲蚁胍邮账懈竽_獸有關(guān)的消息, 所以我在這段代碼的第 10 行使用 redis.psubscribe
訂閱了一個(gè) Glob 風(fēng)格的模式, 通過(guò)使用 bigfoot:broadcast:channel:*
作為模式, 客戶端將接收到所有以 bigfoot:broadcast:channel:
開(kāi)頭的事件。
發(fā)布事件非常簡(jiǎn)單, 下面是代碼:
- import asyncio
- import aioredis
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- publish(redis, 1, 'Possible vocalizations east of Makanda'),
- publish(redis, 2, 'Sighting near the Columbia River'),
- publish(redis, 2, 'Chased by a tall hairy creature')
- )
- redis.close()
- await redis.wait_closed()
- def publish(redis, channel, message):
- return redis.publish(f'bigfoot:broadcast:channel:{channel}', message)
- asyncio.run(main())
值得注意的是, 發(fā)布與訂閱是一個(gè)發(fā)送即遺忘機(jī)制(fire-and-forget)。 如果代碼發(fā)布了一個(gè)事件但是卻沒(méi)有人監(jiān)聽(tīng), 那么該事件就會(huì)消失。 如果你想讓自己的事件持續(xù)存在, 那么可以考慮使用前面提到的隊(duì)列, 又或者接下來(lái)將要介紹的 Redis 流。這段代碼的重點(diǎn)是第 18 行, 它使用了名字非常直接的 redis.publish
來(lái)講消息發(fā)布至所需的通道。
使用 Redis 儲(chǔ)存數(shù)據(jù)流
除了發(fā)布與訂閱之外, Redis 還可以使用流來(lái)發(fā)布和訂閱事件。 Redis 流 是一個(gè)非常大的話題, 但使用它只需要 掌握少量命令 。 從 Python 來(lái)看, 這些命令的用法都是非常簡(jiǎn)單的, 我將一一向你說(shuō)明。
下面的代碼將把三次大腳獸的目擊事件添加到流里面。
- import asyncio
- import aioredis
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
- add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
- add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
- redis.close()
- await redis.wait_closed()
- def add_to_stream(redis, id, title, classification):
- return redis.xadd('bigfoot:sightings:stream', {
- 'id': id, 'title': title, 'classification': classification })
- asyncio.run(main())
每個(gè)新添加的流事件都有一個(gè)唯一標(biāo)識(shí)符, 其中包含自 1970 年開(kāi)始的時(shí)間戳(毫秒)和一個(gè)用破折號(hào)連接的序列號(hào)。 例如, 當(dāng)我寫(xiě)這篇文章的時(shí)候, 1970 年 1 月 1 日(Unix紀(jì)元)午夜已經(jīng)過(guò)去了 1,593,120,357,193 毫秒(1.59千兆秒)。 因此當(dāng)我運(yùn)行上面這段代碼的時(shí)候, 命令將創(chuàng)建出 ID 為 1593120357193-0
的事件。這段代碼中最重要的就是第 17 行和第 18 行, 它使用了 redis.xadd
函數(shù)將一次目擊事件的字段添加到流里面。
我們?cè)谔砑邮录臅r(shí)候可以使用 *
來(lái)代替具體的 ID , 這樣 Redis 就會(huì)根據(jù)當(dāng)前時(shí)間來(lái)自動(dòng)生成事件的 ID , 這也是 redis.xadd
函數(shù)的默認(rèn)行為。
正如接下來(lái)的代碼所示, 在讀取流元素的時(shí)候, 我們需要設(shè)置一個(gè)起始 ID 。 你可以看到, 在第 10 行, 程序?qū)⒆兞?nbsp;last_id
設(shè)置成了 0-0
, 這個(gè) ID 代表流的起始位置。
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8')
- last_id = '0-0'
- while True:
- events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id])
- for key, id, fields in events:
- pp(fields)
- last_id = id
- asyncio.run(main())
程序的第 12 行使用 redis.xread
函數(shù)從流中請(qǐng)求最多 5 個(gè) 0-0
之后的事件。 該調(diào)用將返回一個(gè)列表, 然后程序?qū)?duì)其進(jìn)行循環(huán)和解構(gòu), 以獲得事件的字段和標(biāo)識(shí)符。 事件的標(biāo)識(shí)符會(huì)被儲(chǔ)存起來(lái), 以便將來(lái)調(diào)用 redis.xread
時(shí)可以獲得新的事件并在有需要時(shí)重新讀取之前讀取過(guò)的舊事件
。
將 Redis 用作搜索引擎
Redis 可以通過(guò)模塊(Module)擴(kuò)展來(lái)增加新的命令和功能。 有 大量的模塊 可以用于 AI 模型服務(wù)、圖形數(shù)據(jù)庫(kù)、時(shí)間序列數(shù)據(jù)庫(kù)以及本例中的搜索引擎。
RedisSearch 是一個(gè)強(qiáng)大的搜索引擎, 它攝取數(shù)據(jù)的速度快得驚人。 有些人喜歡用它來(lái)進(jìn)行 瞬時(shí)搜索 , 但除此之外它也可以用來(lái)進(jìn)行其他搜索。 下面是使用該模塊的一個(gè)例子:
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await redis.execute('FT.DROP', 'bigfoot:sightings:search')
- await redis.execute('FT.CREATE', 'bigfoot:sightings:search',
- 'SCHEMA', 'title', 'TEXT', 'classification', 'TEXT')
- await asyncio.gather(
- add_document(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
- add_document(redis, 2, 'Sighting near the Columbia River', 'Class A'),
- add_document(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
- results = await search(redis, 'chase|east')
- pp(results)
- redis.close()
- await redis.wait_closed()
- def add_document(redis, id, title, classification):
- return redis.execute('FT.ADD', 'bigfoot:sightings:search', id, '1.0',
- 'FIELDS', 'title', title, 'classification', classification)
- def search(redis, query):
- return redis.execute('FT.SEARCH', 'bigfoot:sightings:search', query)
- asyncio.run(main())
在擁有了索引之后, 程序就可以向里面添加文檔了, 這一操作發(fā)生在程序的第 27 行和第 28 行, 通過(guò) FT.ADD
命令來(lái)完成。 每個(gè)文檔偶讀需要一個(gè)唯一 ID 、一個(gè)介于 0.0
和 1.0
之間的權(quán)重(rank)以及相應(yīng)的字段。在第 12 和第 13 行, 程序使用 FT.CREATE
創(chuàng)建了一個(gè)索引。 索引需要描述程序?qū)⒁砑拥拿總€(gè)文檔中的字段的模式。 在這個(gè)例子中, 程序需要添加大腳獸的目擊事件, 該文檔包含一個(gè)標(biāo)題和一個(gè)分類, 并且它們都是文本字段。
正如程序的第 31 行所示, 在索引加載文檔之后, 程序就可以使用 FT.SEARCH
命令和具體的查詢語(yǔ)句來(lái)執(zhí)行查詢操作。 第 20 行的特定查詢指示 RedisSearch 在索引中查找包含這些術(shù)語(yǔ)之一的文檔。 在這個(gè)例子中, 該查詢將返回兩個(gè)文檔。
使用 Redis 作為主數(shù)據(jù)庫(kù)
Redis 可以作為一個(gè)速度奇快的內(nèi)存存儲(chǔ)數(shù)據(jù)庫(kù)來(lái)使用。 下面的代碼使用了哈希來(lái)演示這種用法。 哈希是一種非常棒的數(shù)據(jù)結(jié)構(gòu), 它可以建模你想要儲(chǔ)存的記錄類型, 并且能夠?qū)?shù)據(jù)的主鍵用作鍵名的其中一部分。
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- add_sighting(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
- add_sighting(redis, 2, 'Sighting near the Columbia River', 'Class A'),
- add_sighting(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
- sightings = await asyncio.gather(
- read_sighting(redis, 1),
- read_sighting(redis, 2),
- read_sighting(redis, 3))
- pp(sightings)
- redis.close()
- await redis.wait_closed()
- def add_sighting(redis, id, title, classification):
- return redis.hmset(f'bigfoot:sighting:{id}',
- 'id', id, 'title', title, 'classification', classification)
- def read_sighting(redis, id):
- return redis.hgetall(f'bigfoot:sighting:{id}')
- asyncio.run(main())
你可能會(huì)這樣想”如果我把服務(wù)器關(guān)掉了怎么辦?如果它崩潰了怎么辦?那我就什么數(shù)據(jù)都沒(méi)有了!“ No,不會(huì)的! 你可以修改你的 redis.conf
文件, 用幾種不同的方式來(lái)持久化內(nèi)存中的數(shù)據(jù) 。 此外, 如果你使用的是 Redis Enterprise , 我們也有為你提供 相應(yīng)的解決方案 , 使得你可以直接使用 Redis 而不必?fù)?dān)心持久化的問(wèn)題。
為了方便你親手嘗試這些例子, 我把文中涉及的 所有代碼都放到了 GitHub 上面 , 你可以克隆并開(kāi)始使用它們。 如果你是 Docker 用戶, 項(xiàng)目里面也有一個(gè)名為 start-redis.sh
的 shell 腳本, 它可以拉取一個(gè)鏡像, 然后啟動(dòng)一個(gè)能夠運(yùn)行這些例子的 Redis 版本。
如果你在玩耍完畢之后想要認(rèn)真地構(gòu)建一些軟件, 那么可以注冊(cè)并嘗試 Redis Cloud Essentials 。 它和你所熟悉和喜歡的 Redis 一樣, 唯一的區(qū)別就是這種 Redis 由云端進(jìn)行管理, 所以你只需要專注于構(gòu)建你的軟件即可。