SpringBoot+Redis自定義注解實(shí)現(xiàn)發(fā)布訂閱
前言
最近開發(fā)了一個(gè)內(nèi)部消息組件,邏輯大體是通過定義注解 @MessageHub,在啟動(dòng)時(shí)掃描全部bean中有使用了該注解的方法后臺(tái)創(chuàng)建一個(gè)常駐線程代理消費(fèi)數(shù)據(jù),當(dāng)線程消費(fèi)到數(shù)據(jù)就回寫到對(duì)應(yīng)加了注解的方法里。
@Slf4j
@Service
public class RedisConsumerDemo {
@MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB")
public void consumer(Object message) {
log.info("pubsub info {} ", message);
}
}
實(shí)現(xiàn)redis的隊(duì)列、stream方式實(shí)現(xiàn)都很簡(jiǎn)單,唯獨(dú)發(fā)布訂閱方式,網(wǎng)上的demo全都是一個(gè)固定套路,通過redis容器注入監(jiān)聽器,而且回寫非常死板。那么如何將這塊的邏輯統(tǒng)一呢。
常規(guī)寫法
常規(guī)實(shí)現(xiàn)reids的發(fā)布訂閱模式寫法一共三步
1.創(chuàng)建消息監(jiān)聽器
@Bean
public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
return new MessageListenerAdapter(messageListener, "onMessage");
}
2.創(chuàng)建訂閱器
@Component
public class TestSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("get data :{}", msg);
}
}
3.向redis容器中添加消息監(jiān)聽器
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer container(
RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter smsExpirationListener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
return container;
}
}
這樣定義非常簡(jiǎn)單明了,但是有個(gè)問題是太代碼僵硬了,創(chuàng)建監(jiān)聽者很不靈活,只能指定內(nèi)部的onMessage方法,那么怎么才能融入到我們的內(nèi)部消息流轉(zhuǎn)中間件里呢。
自定義注解實(shí)現(xiàn)
我們內(nèi)部組件抽象了兩個(gè)方法,生產(chǎn)和消費(fèi),但這兩個(gè)方法邏輯截然不同,生產(chǎn)方法是暴露給serverice層接口調(diào)用,調(diào)用方在調(diào)用生產(chǎn)方法后能直接知道生產(chǎn)了幾條數(shù)據(jù)和成功與否。而消費(fèi)方法是配合Spring生命周期函數(shù)服務(wù)啟動(dòng)時(shí)建立常駐消費(fèi)線程的。
/**
* 生產(chǎn)消息
*/
Integer producer(MessageForm messageForm);
/**
* 消費(fèi)消息
*/
void consumer(ConsumerAdapterForm adapterForm);
生產(chǎn)消息當(dāng)然很容易實(shí)現(xiàn),只需要調(diào)用已經(jīng)封裝好的convertAndSend方法。
stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());
消費(fèi)方法就有說法了,動(dòng)態(tài)生成監(jiān)聽者的場(chǎng)景下使用redis容器用代碼挨個(gè)注冊(cè)已經(jīng)滿足不了了,但仔細(xì)過一遍源代碼就會(huì)發(fā)現(xiàn),監(jiān)聽類的構(gòu)造方法的入?yún)⒅挥袃蓚€(gè),第一個(gè)需要回調(diào)的代理類,第二個(gè)消費(fèi)到數(shù)據(jù)后回調(diào)的方法。
/**
* Create a new {@link MessageListenerAdapter} for the given delegate.
*
* @param delegate the delegate object
* @param defaultListenerMethod method to call when a message comes
* @see #getListenerMethodName
*/
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
this(delegate);
setDefaultListenerMethod(defaultListenerMethod);
}
方案有了,本質(zhì)上就是把RedisMessageListenerContainer注入進(jìn)來之后,掃描項(xiàng)目里所有加了 @MessageHub 的bean,包裝成監(jiān)聽類加載到容器里就完事了。
怎么掃描的代碼就不再贅述了,實(shí)現(xiàn)Spring的生命周期函數(shù)BeanPostProcessor#postProcessAfterInitialization,在這里用AnnotationUtils判斷是否標(biāo)注了注解。
MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
continue;
}
標(biāo)注了后判斷如果是發(fā)布訂閱,進(jìn)入發(fā)布訂閱的實(shí)現(xiàn)類。
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
@Resource
RedisMessageListenerContainer redisPubSubContainer;
@Override
public void produce(ProducerAdapterForm producerAdapterForm) {
stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage());
}
@Override
public void consume(ConsumerAdapterForm messageForm) {
MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName());
adapter.afterPropertiesSet();
redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic()));
}
@Bean
public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
先將RedisMessageListenerContainer注入到Spring容器里,produce方法只需要調(diào)用下現(xiàn)成的api。consume方法由于上一步我們獲取了bean和對(duì)應(yīng)的method,直接用MessageListenerAdapter的構(gòu)造器創(chuàng)建出監(jiān)聽器來,這里有坑,需要手動(dòng)調(diào)用adapter.afterPropertiesSet()設(shè)置一些必要的屬性,這個(gè)在常規(guī)寫法里框架幫忙做了。如果不調(diào)用的話會(huì)出一些空指針之類的bug。
隨后把監(jiān)聽器add到容器就實(shí)現(xiàn)了方法代理,背后的線程監(jiān)聽到數(shù)據(jù)會(huì)回調(diào)到標(biāo)注了 @MessageHub 的方法里