一文帶你了解 Redis 的發(fā)布與訂閱的底層原理
01、前言
發(fā)布訂閱系統(tǒng)在我們?nèi)粘5墓ぷ髦薪?jīng)常會(huì)使用到,這種場(chǎng)景大部分情況我們都是使用消息隊(duì)列的,常用的消息隊(duì)列有 Kafka,RocketMQ,RabbitMQ,每一種消息隊(duì)列都有其特性。這篇文章主要是給大家介紹 Redis 的發(fā)布訂閱系統(tǒng),很多時(shí)候我們可能不需要獨(dú)立部署相應(yīng)的消息隊(duì)列,只是簡(jiǎn)單的使用,而且數(shù)據(jù)量也不會(huì)太大,這種情況下,我們就可以使用 Redis 的 Pub/Sub 模型。
02、使用方式
2.1 發(fā)布與訂閱
Redis 的發(fā)布訂閱功能主要由 PUBLISH,SUBSCRIBE,PSUBSCRIBE 命令組成,一個(gè)或者多個(gè)客戶端訂閱某個(gè)或者多個(gè)頻道,當(dāng)其他客戶端向該頻道發(fā)送消息的時(shí)候,訂閱了該頻道的客戶端都會(huì)收到對(duì)應(yīng)的消息。
上圖中有四個(gè)客戶端,Client 02,Client 03,Client 04 訂閱了同一個(gè) Sport 頻道(Channel),這時(shí)當(dāng) Client 01 向 Sport Channel 發(fā)送消息 “basketball” 的時(shí)候,02-04 這三個(gè)客戶端都同時(shí)收到了這條消息。
整個(gè)過程的執(zhí)行命令如下:
首先開四個(gè) Redis 的客戶端,然后在 Client 02,Client 03,Client 04 中輸入subscribe sport 命令,表示訂閱 sport 這個(gè)頻道
然后在 Client 01 的客戶端中輸入publish sport basketball 表示向 sport 頻道發(fā)送消息 "basketball"
這個(gè)時(shí)候我們?cè)谌タ聪?Client 02-04 的客戶端,可以看到已經(jīng)收到了消息了,每個(gè)訂閱了這個(gè)頻道的客戶端都是一樣的。
這里 Client 02-Client 04 三個(gè)客戶端訂閱了 Sport 頻道,我們叫做訂閱者(subscriber),Client 01 發(fā)布消息,我們叫做發(fā)布者(publisher),發(fā)送的消息就是 message。
2.2、模式訂閱
前面我們看到的是一個(gè)客戶端訂閱了一個(gè) Channel,事實(shí)上單個(gè)客戶端也可以同時(shí)訂閱多個(gè) Channel,采用模式匹配的方式,一個(gè)客戶端可以同時(shí)訂閱多個(gè) Channel。
如上圖 Client 05 通過命令subscribe run 訂閱了 run 頻道,Client 06 通過命令psubscribe run* 訂閱了 run* 匹配的頻道。當(dāng) Client 07 向 run 頻道發(fā)送消息 666 的時(shí)候,05 和 06 兩個(gè)客戶端都收到消息了;接下來 Client 07 向 run1 和 run_sport 兩個(gè)頻道發(fā)送消息的時(shí)候,Client 06 依舊可以收到消息,而 Client 05 就收不到了消息了。
Client 05 訂閱run 頻道和接收到消息:
Client 06 訂閱run* 頻道和接收到消息:
Client 07 向多個(gè)頻道發(fā)送消息:
通過上面的案例,我們學(xué)會(huì)了一個(gè)客戶端可以訂閱單個(gè)或者多個(gè)頻道,分別通過subscribe,psubscribe 命令,客戶端可以通過 publish 發(fā)送相應(yīng)的消息。
在命令行中我們可以用 Ctrl + C 來取消相關(guān)訂閱,對(duì)應(yīng)的命令時(shí) unsubscribe channelName。
03、Pub/Sub 底層存儲(chǔ)結(jié)構(gòu)
3.1、訂閱 Channel
在 Redis 的底層結(jié)構(gòu)中,客戶端和頻道的訂閱關(guān)系是通過一個(gè)字典加鏈表的結(jié)構(gòu)保存的,形式如下:
在 Redis 的底層結(jié)構(gòu)中,Redis 服務(wù)器結(jié)構(gòu)體中定義了一個(gè) pubsub_channels 字典
struct redisServer { //用于保存所有頻道的訂閱關(guān)系 dict *pubsub_channels;}
在這個(gè)字典中,key 代表的是頻道名稱,value 是一個(gè)鏈表,這個(gè)鏈表里面存放的是所有訂閱這個(gè)頻道的客戶端。
所以當(dāng)有客戶端執(zhí)行訂閱頻道的動(dòng)作的時(shí)候,服務(wù)器就會(huì)將客戶端與被訂閱的頻道在 pubsub_channels 字典中進(jìn)行關(guān)聯(lián)。
這個(gè)時(shí)候有兩種情況:
該渠道是首次被訂閱:首次被訂閱說明在字典中并不存在該渠道的信息,那么程序首先要?jiǎng)?chuàng)建一個(gè)對(duì)應(yīng)的 key,并且要賦值一個(gè)空鏈表,然后將對(duì)應(yīng)的客戶端加入到鏈表中。此時(shí)鏈表只有一個(gè)元素。
該渠道已經(jīng)被其他客戶端訂閱過:這個(gè)時(shí)候就直接將對(duì)應(yīng)的客戶端信息添加到鏈表的末尾就好了。
比如,如果有一個(gè)新的客戶端 Client 08 要訂閱 run 渠道,那么上圖就會(huì)變成
如果 Client 08 要訂閱一個(gè)新的渠道 new_sport ,那么就會(huì)變成
整個(gè)訂閱的過程可以采用下面?zhèn)未a來實(shí)現(xiàn)
- Map<String, List<Object>> pubsub_channels = new HashMap<>();
- public void subscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel,檢查是否在 pubsub_channels 中,不在則創(chuàng)建新的 key 和空鏈表
- for (int i = 0; i < subscribeList.length; i++) {
- if (!pubsub_channels.containsKey(subscribeList[i])) {
- pubsub_channels.put(subscribeList[i], new ArrayList<>());
- }
- pubsub_channels.get(subscribeList[i]).add(client);
- }
- }
3.2 取消訂閱
上面介紹的是單個(gè) Channel 的訂閱,相反的如果一個(gè)客戶端要取消訂閱相關(guān) Channel,則無非是找到對(duì)應(yīng)的 Channel 的鏈表,從中刪除對(duì)應(yīng)的客戶端,如果該客戶端已經(jīng)是最后一個(gè)了,則將對(duì)應(yīng) Channel 也刪除。
- public void unSubscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel,依次刪除
- for (int i = 0; i < subscribeList.length; i++) {
- pubsub_channels.get(subscribeList[i]).remove(client);
- //如果長(zhǎng)度為 0 則清楚 channel
- if (pubsub_channels.get(subscribeList[i]).size() == 0) {
- remove(subscribeList[i]);
- }
- }
- }
04、模式訂閱結(jié)構(gòu)
模式渠道的訂閱與單個(gè)渠道的訂閱類似,不過服務(wù)器是將所有模式的訂閱關(guān)系都保存在服務(wù)器狀態(tài)的pubsub_patterns 屬性里面。
- struct redisServer{
- //保存所有模式訂閱關(guān)系
- list *pubsub_patterns;
- }
與訂閱單個(gè) Channel 不同的是,pubsub_patterns 屬性是一個(gè)鏈表,不是字典。節(jié)點(diǎn)的結(jié)構(gòu)如下:
- struct pubsubPattern{
- //訂閱模式的客戶端
- redisClient *client;
- //被訂閱的模式
- robj *pattern;
- } pubsubPattern;
其實(shí) client 屬性是用來存放對(duì)應(yīng)客戶端信息,pattern 是用來存放客戶端對(duì)應(yīng)的匹配模式。
所以對(duì)應(yīng)上面的 Client-06 模式匹配的結(jié)構(gòu)存儲(chǔ)如下
在pubsub_patterns鏈表中有一個(gè)節(jié)點(diǎn),對(duì)應(yīng)的客戶端是 Client-06,對(duì)應(yīng)的匹配模式是run*。
4.1、訂閱模式
當(dāng)某個(gè)客戶端通過命令psubscribe 訂閱對(duì)應(yīng)模式的 Channel 時(shí)候,服務(wù)器會(huì)創(chuàng)建一個(gè)節(jié)點(diǎn),并將 Client 屬性設(shè)置為對(duì)應(yīng)的客戶端,pattern 屬性設(shè)置成對(duì)應(yīng)的模式規(guī)則,然后添加到鏈表尾部。
對(duì)應(yīng)的偽代碼如下:
- List<PubSubPattern> pubsub_patterns = new ArrayList<>();
- public void psubscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel,創(chuàng)建節(jié)點(diǎn)
- for (int i = 0; i < subscribeList.length; i++) {
- PubSubPattern pubSubPattern = new PubSubPattern();
- pubSubPattern.client = client;
- pubSubPattern.pattern = subscribeList[i];
- pubsub_patterns.add(pubSubPattern);
- }
- }
- 創(chuàng)建新節(jié)點(diǎn);
- 給節(jié)點(diǎn)的屬性賦值;
- 將節(jié)點(diǎn)添加到鏈表的尾部;
4.2、退訂模式
退訂模式的命令是punsubscribe,客戶端使用這個(gè)命令來退訂一個(gè)或者多個(gè)模式 Channel。服務(wù)器接收到該命令后,會(huì)遍歷pubsub_patterns鏈表,將匹配到的 client 和 pattern 屬性的節(jié)點(diǎn)給刪掉。這里需要判斷 client 屬性和 pattern 屬性都合法的時(shí)候再進(jìn)行刪除。
偽代碼如下:
- public void punsubscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel 相同 client 和 pattern 屬性的節(jié)點(diǎn)會(huì)刪除
- for (int i = 0; i < subscribeList.length; i++) {
- for (int j = 0; j < pubsub_patterns.size(); j++) {
- if (pubsub_patterns.get(j).client == client
- && pubsub_patterns.get(j).pattern == subscribeList[i]) {
- remove(pubsub_patterns);
- }
- }
- }
- }
遍歷所有的節(jié)點(diǎn),當(dāng)匹配到相同 client 屬性和 pattern 屬性的時(shí)候就進(jìn)行節(jié)點(diǎn)刪除。
05、發(fā)布消息
發(fā)布消息比較好容易理解,當(dāng)一個(gè)客戶端執(zhí)行了publish channelName message 命令的時(shí)候,服務(wù)器會(huì)從pubsub_channels和pubsub_patterns 兩個(gè)結(jié)構(gòu)中找到符合channelName 的所有 Channel,進(jìn)行消息的發(fā)送。在 pubsub_channels 中只要找到對(duì)應(yīng)的 Channel 的 key 然后向?qū)?yīng)的 value 鏈表中的客戶端發(fā)送消息就好。
06、總結(jié)
這篇文章主要給大家介紹了一下 Redis 的發(fā)布/訂閱的使用方式和底層的存儲(chǔ)結(jié)構(gòu)以及部分偽代碼的實(shí)現(xiàn),希望對(duì)大家有幫助。