自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBoot+Redis自定義注解實(shí)現(xiàn)發(fā)布訂閱

數(shù)據(jù)庫(kù) Redis
我們內(nèi)部組件抽象了兩個(gè)方法,生產(chǎn)和消費(fèi),但這兩個(gè)方法邏輯截然不同,生產(chǎn)方法是暴露給serverice層接口調(diào)用,調(diào)用方在調(diào)用生產(chǎn)方法后能直接知道生產(chǎn)了幾條數(shù)據(jù)和成功與否。而消費(fèi)方法是配合Spring生命周期函數(shù)服務(wù)啟動(dòng)時(shí)建立常駐消費(fèi)線程的。

前言

最近開發(fā)了一個(gè)內(nèi)部消息組件,邏輯大體是通過定義注解 @MessageHub,在啟動(dòng)時(shí)掃描全部bean中有使用了該注解的方法后臺(tái)創(chuàng)建一個(gè)常駐線程代理消費(fèi)數(shù)據(jù),當(dāng)線程消費(fèi)到數(shù)據(jù)就回寫到對(duì)應(yīng)加了注解的方法里。

@Slf4j
@Service
public class RedisConsumerDemo {
    @MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB")
    public void consumer(Object message) {
        log.info("pubsub info {} ", message);
    }   
}

實(shí)現(xiàn)redis的隊(duì)列、stream方式實(shí)現(xiàn)都很簡(jiǎn)單,唯獨(dú)發(fā)布訂閱方式,網(wǎng)上的demo全都是一個(gè)固定套路,通過redis容器注入監(jiān)聽器,而且回寫非常死板。那么如何將這塊的邏輯統(tǒng)一呢。

常規(guī)寫法

常規(guī)實(shí)現(xiàn)reids的發(fā)布訂閱模式寫法一共三步

1.創(chuàng)建消息監(jiān)聽器

@Bean 
public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
    return new MessageListenerAdapter(messageListener, "onMessage");
}

2.創(chuàng)建訂閱器

@Component
public class TestSubscriber implements MessageListener {
 
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("get data :{}", msg);
    }
 
}

3.向redis容器中添加消息監(jiān)聽器

@Configuration
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer container(
        RedisConnectionFactory redisConnectionFactory,
        MessageListenerAdapter smsExpirationListener) {
    
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
        return container;
    }
}

這樣定義非常簡(jiǎn)單明了,但是有個(gè)問題是太代碼僵硬了,創(chuàng)建監(jiān)聽者很不靈活,只能指定內(nèi)部的onMessage方法,那么怎么才能融入到我們的內(nèi)部消息流轉(zhuǎn)中間件里呢。

自定義注解實(shí)現(xiàn)

我們內(nèi)部組件抽象了兩個(gè)方法,生產(chǎn)和消費(fèi),但這兩個(gè)方法邏輯截然不同,生產(chǎn)方法是暴露給serverice層接口調(diào)用,調(diào)用方在調(diào)用生產(chǎn)方法后能直接知道生產(chǎn)了幾條數(shù)據(jù)和成功與否。而消費(fèi)方法是配合Spring生命周期函數(shù)服務(wù)啟動(dòng)時(shí)建立常駐消費(fèi)線程的。

/**
 * 生產(chǎn)消息
 */
Integer producer(MessageForm messageForm);

/**
 * 消費(fèi)消息
 */
void consumer(ConsumerAdapterForm adapterForm);

生產(chǎn)消息當(dāng)然很容易實(shí)現(xiàn),只需要調(diào)用已經(jīng)封裝好的convertAndSend方法。

stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());

消費(fèi)方法就有說法了,動(dòng)態(tài)生成監(jiān)聽者的場(chǎng)景下使用redis容器用代碼挨個(gè)注冊(cè)已經(jīng)滿足不了了,但仔細(xì)過一遍源代碼就會(huì)發(fā)現(xiàn),監(jiān)聽類的構(gòu)造方法的入?yún)⒅挥袃蓚€(gè),第一個(gè)需要回調(diào)的代理類,第二個(gè)消費(fèi)到數(shù)據(jù)后回調(diào)的方法。

/**
 * Create a new {@link MessageListenerAdapter} for the given delegate.
 *
 * @param delegate the delegate object
 * @param defaultListenerMethod method to call when a message comes
 * @see #getListenerMethodName
 */
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
   this(delegate);
   setDefaultListenerMethod(defaultListenerMethod);
}

方案有了,本質(zhì)上就是把RedisMessageListenerContainer注入進(jìn)來之后,掃描項(xiàng)目里所有加了 @MessageHub 的bean,包裝成監(jiān)聽類加載到容器里就完事了。

怎么掃描的代碼就不再贅述了,實(shí)現(xiàn)Spring的生命周期函數(shù)BeanPostProcessor#postProcessAfterInitialization,在這里用AnnotationUtils判斷是否標(biāo)注了注解。

MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
    continue;
}

標(biāo)注了后判斷如果是發(fā)布訂閱,進(jìn)入發(fā)布訂閱的實(shí)現(xiàn)類。

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {

    @Resource
    RedisMessageListenerContainer redisPubSubContainer;



    @Override
    public void produce(ProducerAdapterForm producerAdapterForm) {
        stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage());
    }

    @Override
    public void consume(ConsumerAdapterForm messageForm) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName());
        adapter.afterPropertiesSet();
        redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic()));
    }


    @Bean
    public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

先將RedisMessageListenerContainer注入到Spring容器里,produce方法只需要調(diào)用下現(xiàn)成的api。consume方法由于上一步我們獲取了bean和對(duì)應(yīng)的method,直接用MessageListenerAdapter的構(gòu)造器創(chuàng)建出監(jiān)聽器來,這里有坑,需要手動(dòng)調(diào)用adapter.afterPropertiesSet()設(shè)置一些必要的屬性,這個(gè)在常規(guī)寫法里框架幫忙做了。如果不調(diào)用的話會(huì)出一些空指針之類的bug。

隨后把監(jiān)聽器add到容器就實(shí)現(xiàn)了方法代理,背后的線程監(jiān)聽到數(shù)據(jù)會(huì)回調(diào)到標(biāo)注了 @MessageHub 的方法里

責(zé)任編輯:武曉燕 來源: 一安未來
相關(guān)推薦

2024-10-09 10:46:41

springboot緩存redis

2023-10-09 07:37:01

2023-10-11 07:57:23

springboot微服務(wù)

2023-10-24 13:48:50

自定義注解舉值驗(yàn)證

2021-02-20 11:40:35

SpringBoot占位符開發(fā)技術(shù)

2024-12-27 15:37:23

2024-11-15 10:39:11

2021-12-30 12:30:01

Java注解編譯器

2022-02-17 07:10:39

Nest自定義注解

2017-08-03 17:00:54

Springmvc任務(wù)執(zhí)行器

2022-12-02 07:28:58

Event訂閱模式Spring

2024-10-14 17:18:27

2022-12-13 09:19:06

高并發(fā)SpringBoot

2023-03-03 09:11:12

高并發(fā)SpringBoot

2023-09-04 08:12:16

分布式鎖Springboot

2025-03-05 10:49:32

2024-04-03 09:18:03

Redis數(shù)據(jù)結(jié)構(gòu)接口防刷

2025-02-25 09:29:34

2009-09-07 22:00:15

LINQ自定義

2022-05-18 07:44:13

自定義菜單前端
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)