面試被問到Redis實現發(fā)布與訂閱,手摸手教
簡介
Redis發(fā)布與發(fā)布功能(Pub/Sub)是基于事件座位基本的通信機制,是目前應用比較普遍的通信模型,它的目的主要是解除消息的發(fā)布者與訂閱者之間的耦合關系。
Redis作為消息發(fā)布和訂閱之間的服務器,起到橋梁的作用,在Redis里面有一個channel的概念,也就是頻道,發(fā)布者通過指定發(fā)布到某個頻道,然后只要有訂閱者訂閱了該頻道,該消息就會發(fā)送給訂閱者,原理圖如下所示:
Redis同時也可以使用list類型實現消息隊列(消息隊列的實現以及應用場景會在下一篇文章繼續(xù)講解)。
Redis的發(fā)布與訂閱的功能應用還是比較廣泛的,它的應用場景有很多。比如:最常見的就是實現實時聊天的功能,還是有就是博客的粉絲文章的推送,當博主推送原創(chuàng)文章的時候,就會將文章實時推送給博主的粉絲。
簡介完Redis的發(fā)布于訂閱功能,下面就要來實操一下,包括linux命令的實操和java代碼的實現。
命令實操
這里就假設各位讀者都已經安裝好自己的虛擬機環(huán)境和Redis了,若是沒有安裝好的,可以參考這一篇博文:https://www.cnblogs.com/zuidongfeng/p/8032505.html
我這里是已經安裝好了Redis了,直接啟動我們的Redis,我已經設置好了開機啟動,上面的那篇博文有講解怎么設置開機啟動。
發(fā)布消息
Redis中發(fā)布消息的命令是publish,具體使用如下所示:
PUBLISH test "haha":test表示頻道的名稱,haha表示發(fā)布的內容,這樣就完成了一個一個消息的發(fā)布,后面的返回(integer)0表示0人訂閱。
訂閱頻道
于此同時再啟動一個窗口,這個窗口作為訂閱者,訂閱者的命令subscribe,使用SUBSCRIBE test就表示訂閱了test這個頻道
訂閱后返回的結果中由三條信息,第一個表示類型、第二個表示訂閱的頻道,第三個表示訂閱的數量。接著在第一個窗口進行發(fā)布消息:
可以看到發(fā)布者發(fā)布的消息,訂閱者都會實時的接收到,并發(fā)訂閱者收到的信息中也會出現三條信息,分別表示:返回值的類型、頻道名稱、消息內容。
取消訂閱
若是想取消之前的訂閱可以使用unsubscribe命令,格式為:
- unsubscribe 頻道名稱
- // 取消之前訂閱的test頻道
- unsubscribe test
輸入命令后,返回以下結果:
- [root@pinyoyougou-docker src]# ./redis-cli
- 127.0.0.1:6379> UNSUBSCRIBE test
- 1) "unsubscribe"
- 2) "test"
- 3) (integer) 0
它分別表示:返回值的類型、頻道的名稱、該頻道訂閱的數量。
按模式訂閱
除了直接以特定的名城進行訂閱,還可以按照模式進行訂閱,模式的方式進行訂閱可以一次訂閱多個頻道,按照模式進行訂閱的命令為psubscribe,具體格式如下:
- psubscribe 模式
- // 表示訂閱名稱以ldc開頭的頻道
- psubscribe ldc*
輸入上面的命令后,返回如下結果:
- 127.0.0.1:6379> PSUBSCRIBE ldc*
- Reading messages... (press Ctrl-C to quit)
- 1) "psubscribe"
- 2) "ldc*"
- 3) (integer) 1
這個也是非常簡單,分別表示:返回的類型(表示按模式訂閱類型)、訂閱的模式、訂閱數。
取消按模式訂閱
假如你想取消之前的按模式訂閱,可以使用punsubscribe來取消,具體格式:
- punsubscribe 模式
- // 取消頻道名稱按照ldc開頭的頻道
- punsubscribe ldc*
他的返回值,如下所示:
- 127.0.0.1:6379> PUNSUBSCRIBE ldc*
- 1) "punsubscribe"
- 2) "ldc*"
- 3) (integer) 0
這個就不多說了,表示的意思和上面的一樣,可以看到上面的命令都是有規(guī)律的訂閱SUBSCRIBE,取消就是UNSUBSCRIBE,前面加前綴UN,按模式訂閱也是。
查看訂閱消息
(1)你想查看某一個模式下訂閱數是大于零的頻道,可以使用如下格式的命令進行操作:
- pubsub channels 模式
- // 查看頻道名稱以ldc模式開頭的訂閱數大于零的頻道
- pubsub channels ldc*
(2)假如你想查看某一個頻道的訂閱數,可以使用如下命令:
- pubsub numsub 頻道名稱
(3)查看按照模式的訂閱數,可以使用如下命令進行操作:
- pubsub numpat
到這里以上的命令操作就基本結束了,下面就來代碼實戰(zhàn)。
代碼實練
(1)首先第一步想要操作Redis,再SpringBoot項目中引入jedis的依賴,畢竟jedis是官方推薦使用操作Redis的工具。
- <dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <version>2.9.0</version>
- </dependency>
(2)然后創(chuàng)建發(fā)布者Publisher,用于消息的發(fā)布,具體代碼如下:
- package com.ldc.org.myproject.demo.redis;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- /**
- * 發(fā)布者
- * @author liduchang
- *
- */
- public class Publisher extends Thread{
- // 連接池
- private final JedisPool jedisPool;
- // 發(fā)布頻道名稱
- private String name;
- public Publisher(JedisPool jedisPool, String name) {
- super();
- this.jedisPool = jedisPool;
- this.name = name;
- }
- @Override
- public void run() {
- // 獲取要發(fā)布的消息
- BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
- // 獲取連接
- Jedis resource = jedisPool.getResource();
- while (true) {
- String message = null;
- try {
- message = reader.readLine();
- if (!"exit".equals(message)) {
- // 發(fā)布消息
- resource.publish(name, "發(fā)布者:"+Thread.currentThread().getName()+"發(fā)布消息:"+message);
- } else {
- break;
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
(3)接著創(chuàng)建訂閱類Subscriber,并且繼承JedisPubSub 類,重寫onMessage、onSubscribe、onUnsubscribe三個方法,這三個方法的調用時機在注釋上都有說明,具體的實現代碼如下:
- package com.ldc.org.myproject.demo.redis;
- import com.fasterxml.jackson.core.sym.Name;
- import redis.clients.jedis.JedisPubSub;
- /**
- * 訂閱者
- * @author liduchang
- */
- public class Subscriber extends JedisPubSub {
- //訂閱頻道名稱
- private String name;
- public Subscriber(String name) {
- this.name = name;
- }
- /**
- * 訂閱者收到消息時會調用
- */
- @Override
- public void onMessage(String channel, String message) {
- // TODO Auto-generated method stub
- super.onMessage(channel, message);
- System.out.println("頻道:"+channel+" 接受的消息為:"+message);
- }
- /**
- * 訂閱了頻道會被調用
- */
- @Override
- public void onSubscribe(String channel, int subscribedChannels) {
- System.out.println("訂閱了頻道:"+channel+" 訂閱數為:"+subscribedChannels);
- }
- /**
- * 取消訂閱頻道會被調用
- */
- @Override
- public void onUnsubscribe(String channel, int subscribedChannels) {
- System.out.println("取消訂閱的頻道:"+channel+" 訂閱的頻道數量為:"+subscribedChannels);
- }
- }
(4)這次創(chuàng)建的才是真正的訂閱者SubThread,上面的Subscriber是指為了測試實訂閱的時候或者發(fā)布消息,能夠有信息輸出:
- package com.ldc.org.myproject.demo.redis;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- /**
- * 訂閱者線程
- * @author liduchang
- *
- */
- public class SubThread extends Thread {
- private final JedisPool jedisPool;
- private final Subscriber subscriber;
- private String name;
- public SubThread(JedisPool jedisPool,Subscriber subscriber,String name) {
- super();
- this.jedisPool = jedisPool;
- this.subscriber = subscriber;
- this.name = name;
- }
- @Override
- public void run() {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- // 訂閱頻道為name
- jedis.subscribe(subscriber, name);
- } catch (Exception e) {
- System.err.println("訂閱失敗");
- e.printStackTrace();
- } finally {
- if (jedis!=null) {
- // jedis.close();
- //歸還連接到redis池中
- jedisPool.returnResource(jedis);
- }
- }
- }
- }
(5)后面就是測試了,分別測試發(fā)布與訂閱的測試,發(fā)布者為TestPublisher,訂閱者為TestSubscriber:
- package com.ldc.org.myproject.demo.redis;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import redis.clients.jedis.JedisPool;
- public class TestPublisher {
- public static void main(String[] args) throws InterruptedException {
- JedisPool jedisPool = new JedisPool("192.168.163.155");
- // 向ldc頻道發(fā)布消息
- Publisher publisher = new Publisher(jedisPool, "ldc");
- publisher.start();
- }
- }
訂閱者
- package com.ldc.org.myproject.demo.redis;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import redis.clients.jedis.JedisPool;
- public class TestSubscriber1 {
- public static void main(String[] args) throws InterruptedException {
- JedisPool jedisPool = new JedisPool("192.168.163.155",6379);
- Subscriber subscriber = new Subscriber("黎杜");
- // 訂閱ldc頻道
- SubThread thread= new SubThread(jedisPool, subscriber, "ldc");
- thread.start();
- Thread.sleep(600000);
- // 取消訂閱
- subscriber.unsubscribe("ldc");
- }
- }
這里為了測試方便就直接創(chuàng)建線程的方式,更好的話可以使用線程池的方式通過線程池的submit方法來執(zhí)行線程,若是不用了可以使用shutdown方式關閉。