Redis發(fā)布訂閱,右手就行!
哈嘍,大家好,我是了不起。
Redis平常作為緩存使用較多,但是也可以作為發(fā)布訂閱的消息隊列來使用,本篇給大家介紹一下如何簡單使用!右手就能操作
前言
本篇我們會使用Spring Data Redis中集成的發(fā)布訂閱功能來展示這個示例,
先看我們需要的依賴, 其實只需要引入spring-boot-starter-data-redis 就夠了,另外再寫一個接口來觸發(fā)消息發(fā)布。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Spring Data 為 Redis 提供了專用的消息傳遞集成,其功能和命名與 Spring Framework 中的 JMS 集成類似。
Redis 消息傳遞大致可分為兩個功能領域:
- 消息的發(fā)布或制作
- 消息的訂閱或消費
其中主要的類都在這兩個包下面,感興趣的小伙伴可以去看看,原理就先不講了,下期再安排吧。
org.springframework.data.redis.connection
org.springframework.data.redis.listener
發(fā)布消息
發(fā)布消息我們可以直接使用RedisTemplate的 convertAndSend , 這個方法有兩個參數(shù),分別是channel, 還有消息內(nèi)容。
public Long convertAndSend(String channel, Object message) {
Assert.hasText(channel, "a non-empty channel is required");
byte[] rawChannel = this.rawString(channel);
byte[] rawMessage = this.rawValue(message);
return (Long)this.execute((connection) -> {
return connection.publish(rawChannel, rawMessage);
}, true);
}
本次我們使用如下類來發(fā)布消息。作為示例就要簡單粗暴。
public interface MessagePublisher {
void publish(String message);
}
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
public class RedisMessagePublisher implements MessagePublisher {
private RedisTemplate<String, Object> redisTemplate;
private ChannelTopic topic;
public RedisMessagePublisher() {
}
public RedisMessagePublisher(
RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
this.redisTemplate = redisTemplate;
this.topic = topic;
}
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
訂閱消息
訂閱消息需要實現(xiàn)MessageListener的接口 ,onMessage的方法是收到消息后的消費方法。
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class RedisMessageSubscriber implements MessageListener {
public void onMessage(Message message, byte[] pattern) {
System.*out*.println("Message received: " + message.toString());
}
}
// 消息訂閱2
@Service("redisMessageSubscriber2")
public class RedisMessageSubscriber2 implements MessageListener {
public void onMessage(Message message, byte[] pattern) {
System.out.println("Message received2: " + message.toString());
}
}
消息監(jiān)聽容器和適配器
另外就是訂閱方訂閱發(fā)布者,SpringDataRedis這里使用了一個消息監(jiān)聽容器和適配器來處理。我們直接貼出代碼:
import com.north.redis.message.MessagePublisher;
import com.north.redis.message.RedisMessagePublisher;
import com.north.redis.message.RedisMessageSubscriber;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Resource
MessageListener redisMessageSubscriber2;
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// 使用StringRedisSerializer來序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 使用GenericJackson2JsonRedisSerializer來序列化和反序列化redis的value值
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Bean
RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer container
= new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(messageListener(), topic());
container.addMessageListener(redisMessageSubscriber2, topic());
return container;
}
@Bean
MessagePublisher redisPublisher() {
return new RedisMessagePublisher(redisTemplate(), topic());
}
@Bean
ChannelTopic topic() {
return new ChannelTopic("northQueue");
}
}
以上代碼中有幾個點:
- 創(chuàng)建適配器時,這里面我們使用了MessageListener的實現(xiàn)類,簡單容易理解。
- 使用消息容器來訂閱消息隊列,其中addMessageListener中可以訂閱多個隊列,其中第二個參數(shù)可以傳入隊列名數(shù)組。而且可以添加多個訂閱方。
RedisMessageListenerContainer 是處理消費者和發(fā)布者的關系的類 ,使用起來也比較簡單。
測試
下面我們做一個小測試:
寫一個接口來出發(fā)消息發(fā)布,使用多個訂閱者
@RestController
public class TestController {
@Resource
private MessagePublisher redisMessagePublisher;
@GetMapping("/hello")
public Flux<String> hello(@RequestParam String message) {
redisMessagePublisher.publish(message);
return Flux.*just*("Hello", "Webflux");
}
}
啟動SpringBoot項目后我們發(fā)送消息測試:
圖片
兩個消費者都接到了消息:
圖片