?前提
公司在做一些金融相關(guān)業(yè)務,某些時候由于數(shù)據(jù)提供商定期維護或者特殊原因需要暫停某些服務的消費者。之前選用的消息隊列技術(shù)棧是RabbitMQ?,用于微服務之間的消息投遞,對于這類需要暫停消費者的場景是選用注釋掉消費者Bean?中的相應Spring(Boot)?注解重新發(fā)布來實現(xiàn),后面需要重新啟動消費就是解開對應的注釋再發(fā)布一次。這樣的處理流程既繁瑣,也顯得沒有技術(shù)含量,所以筆者就這個問題結(jié)合已有的配置中心Nacos?集群做了一個方案,使用Nacos?的配置準實時刷新功能去控制某個微服務實例的所有RabbitMQ消費者(容器)的停止和啟動。

spring-boot-rabbit-nacos-control-1
方案原理
下面探討一下方案的原理和可行性,主要包括:
- RabbitMQ消費者生命周期管理
- Nacos長輪詢與配置刷新
因為工作中的主要技術(shù)棧是SpringBoot? + RabbitMQ?,下文是探討場景針對spring-boot-starter-amqp?(下面簡稱amqp)展開。
使用SpringBoot版本為2.3.0.RELEASE,spring-cloud-alibaba-nacos-config的版本為2.2.0.RELEASE
RabbitMQ消費者生命周期管理
查看RabbitAnnotationDrivenConfiguration的源碼:
spring-boot-rabbit-nacos-control-2
amqp?中默認啟用spring.rabbitmq.listener.type=simple?,使用的RabbitListenerContainerFactory?(消息監(jiān)聽器容器工廠)實現(xiàn)為SimpleRabbitListenerContainerFactory?,使用的MessageListenerContainer?(消息監(jiān)聽器容器)實現(xiàn)為SimpleMessageListenerContainer?。在amqp?中,無論注解聲明式或者編程式注冊的消費者最終都會封裝為MessageListenerContainer?實例,因此消費者生命周期可以直接通過MessageListenerContainer?進行管理,MessageListenerContainer?的生命周期管理API?會直接作用于最底層的真實消費者實現(xiàn)BlockingQueueConsumer。幾者的關(guān)系如下:

spring-boot-rabbit-nacos-control-3
一般聲明式消費者注冊方式如下:
@Slf4j
@RabbitListener(id = "SingleAnnoMethodDemoConsumer", queues = "srd->srd.demo")
@Component
public class SingleAnnoMethodDemoConsumer {
@RabbitHandler
public void onMessage(Message message) {
log.info("SingleAnnoMethodDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
@RabbitListener(id = "MultiAnnoMethodDemoConsumer", queues = "srd->srd.demo")
@Component
@Slf4j
public class MultiAnnoMethodDemoConsumer {
@RabbitHandler
public void firstOnMessage(Message message) {
log.info("MultiAnnoMethodDemoConsumer.firstOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitHandler
public void secondOnMessage(Message message) {
log.info("MultiAnnoMethodDemoConsumer.secondOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
@Component
@Slf4j
public class MultiAnnoInstanceDemoConsumer {
@RabbitListener(id = "MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage", queues = "srd->srd.demo")
public void firstOnInstanceMessage(Message message) {
log.info("MultiAnnoInstanceDemoConsumer.firstOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(id = "MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage", queues = "srd->srd.sec")
public void secondOnInstanceMessage(Message message) {
log.info("MultiAnnoInstanceDemoConsumer.secondOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
對于基于@RabbitListener?進行聲明式注冊的消費者,每個被@RabbitListener?修飾的Bean?或者方法最終都會單獨生成一個SimpleMessageListenerContainer?實例,這些SimpleMessageListenerContainer?實例的唯一標識由@RabbitListener的id?屬性指定,缺省值為org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N?,建議在使用時候通過規(guī)范約束必須定義此id?屬性。分析源碼可以得知這類型的消費者通過RabbitListenerAnnotationBeanPostProcessor?進行發(fā)現(xiàn)和自動注冊,并且在RabbitListenerEndpointRegistry?緩存了注冊信息,因此可以通過RabbitListenerEndpointRegistry直接獲取這些聲明式的消費者容器實例:
RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
for (String containerId : listenerContainerIds) {
MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
// do something with messageListenerContainer
}
一般編程式消費者注冊方式如下:
// MessageListenerDemoConsumer
@Component
@Slf4j
public class MessageListenerDemoConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
log.info("MessageListenerDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
// CustomMethodDemoConsumer
@Component
@Slf4j
public class CustomMethodDemoConsumer {
public void customOnMessage(Message message) {
log.info("CustomMethodDemoConsumer.customOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
// configuration class
// 通過現(xiàn)存的MessageListener實例進行消費
@Bean
public SimpleMessageListenerContainer messageListenerDemoConsumerContainer(
ConnectionFactory connectionFactory,
@Qualifier("messageListenerDemoConsumer") MessageListener messageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setListenerId("MessageListenerDemoConsumer");
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setQueueNames("srd->srd.demo");
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setPrefetchCount(10);
container.setAutoStartup(true);
container.setMessageListener(messageListener);
return container;
}
// 通過IOC容器中某個Bean的具體方法進行消費
@Bean
public SimpleMessageListenerContainer customMethodDemoConsumerContainer(
ConnectionFactory connectionFactory,
CustomMethodDemoConsumer customMethodDemoConsumer) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setListenerId("CustomMethodDemoConsumer");
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setQueueNames("srd->srd.demo");
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setPrefetchCount(10);
container.setAutoStartup(true);
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(customMethodDemoConsumer);
messageListenerAdapter.setDefaultListenerMethod("customOnMessage");
container.setMessageListener(messageListenerAdapter);
return container;
}
編程式注冊的SimpleMessageListenerContainer?可以直接從IOC容器中獲?。?/p>
Map<String, MessageListenerContainer> messageListenerContainerBeans
= configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
messageListenerContainerBeans.forEach((beanId, messageListenerContainer) -> {
// do something with messageListenerContainer
});
}
至此,我們知道可以比較輕松地拿到服務中所有的MessageListenerContainer的實例,從而可以管理服務內(nèi)所有消費者的生命周期。
Nacos長輪詢與配置刷新
Nacos?的客戶端通過LongPolling?(長輪詢)的方式監(jiān)聽Nacos?服務端集群對應dataId和group?的配置數(shù)據(jù)變更,具體可以參考ClientWorker的源碼實現(xiàn),實現(xiàn)的過程大致如下:

spring-boot-rabbit-nacos-control-4
在非Spring(Boot)?體系中,可以通過ConfigService#addListener()進行配置變更監(jiān)聽,示例代碼如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
properties.put(PropertyKeyConst.NAMESPACE, "LOCAL");
ConfigService configService = NacosFactory.createConfigService(properties);
Executor executor = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("NacosConfigSyncWorker");
return thread;
});
configService.addListener("application-aplha.properties", "customer-service", new Listener() {
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void receiveConfigInfo(String configInfo) {
// do something with 'configInfo'
}
});
這種LongPolling?的方式目前來看可靠性是比較高,因為Nacos?服務端集群一般在生產(chǎn)部署是大于3?的奇數(shù)個實例節(jié)點,并且底層基于raft共識算法實現(xiàn)集群通訊,只要不是同一時間超過半數(shù)節(jié)點宕機集群還是能正常提供服務。但是從實現(xiàn)上來看會有一些局限性:
- 如果注冊過多的配置變更監(jiān)聽器有可能會對Nacos服務端造成比較大的壓力,畢竟是多個客戶端進行輪詢
- 配置變更是由Nacos客戶端向Nacos服務端發(fā)起請求,因此監(jiān)聽器回調(diào)有可能不是實時的(有可能延遲到客戶端下一輪的LongPolling提交)
- Nacos?客戶端會緩存每次從Nacos服務端拉取的配置內(nèi)容,如果要變更配置文件過大有可能導致緩存的數(shù)據(jù)占用大量內(nèi)存,影響客戶端所在服務的性能
關(guān)于配置變更監(jiān)聽其實有其他候選的方案,例如Redis的發(fā)布訂閱,Zookeeper的節(jié)點路徑變更監(jiān)聽甚至是使用消息隊列進行通知,本文使用Nacos配置變更監(jiān)聽的原因是更好的劃分不同應用配置文件的編輯查看權(quán)限方便進行管理,其他候選方案要實現(xiàn)分權(quán)限管理需要二次開發(fā)
使用SpringCloudAlibaba?提供的spring-cloud-alibaba-nacos-config?可以更加簡便地使用Nacos?配置刷新監(jiān)聽,并且會把變更的PropertySource?重新綁定到對應的配置屬性Bean。引入依賴:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
</dependency>
具體的配置類是NacosConfigProperties:

spring-boot-rabbit-nacos-control-5
紅圈中是需要關(guān)注的配置項,refreshEnabled?是配置刷新的開關(guān),默認是開啟的。sharedConfigs和extensionConfigs?雖然命名不同,但是兩者實現(xiàn)和功能沒有差異,都是類似于共享或者說擴展配置,每個共享(擴展)配置支持單獨配置刷新開關(guān)。舉個例子,在Nacos服務端的某個配置如下圖:

spring-boot-rabbit-nacos-control-6
為了支持配置變更和對應的實體類成員變量更新,對應客戶端的配置文件是這樣的:
spring.cloud.nacos.config.refresh-enabled=true
spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
spring.cloud.nacos.config.shared-configs[0].group=shared-conf
spring.cloud.nacos.config.shared-configs[0].refresh=true
對應的配置屬性Bean如下:
@Data
@ConfigurationProperties(prefix = "shared")
public class SharedProperties {
private String foo;
}
只要客戶端所在SpringBoot?服務啟動完成后,修改Nacos?服務端對應dataId為shared.properties的shared.foo?屬性值,那邊SharedProperties的foo?屬性就會準實時刷新??梢栽赟haredProperties?添加一個@PostConstruct來觀察這個屬性更新的過程:
@Slf4j
@Data
@ConfigurationProperties(prefix = "shared")
public class SharedProperties {
private final AtomicBoolean firstInit = new AtomicBoolean();
private String foo;
@PostConstruct
public void postConstruct() {
if (!firstInit.compareAndSet(false, true)) {
log.info("SharedProperties refresh...");
} else {
log.info("SharedProperties first init...");
}
}
}
方案實施
整個方案實施包括下面幾步:
- 配置變更通知與配置類刷新
- 發(fā)現(xiàn)所有消費者容器
- 管理消費者容器生命周期
初始化一個Maven項目,引入下面的依賴:
- org.projectlombok:lombok:1.18.12
- org.springframework.boot:spring-boot-starter-web:2.3.0.RELEASE
- org.springframework.boot:spring-boot-starter-amqp:2.3.0.RELEASE
- com.alibaba.cloud:spring-cloud-alibaba-nacos-config:2.2.0.RELEASE
- com.alibaba.nacos:nacos-client:1.4.4
下載Nacos?服務并且啟動一個單機實例(當前2023-02?的最新穩(wěn)定版為2.2.0?),新建命名空間LOCAL并且添加四份配置文件:

spring-boot-rabbit-nacos-control-7
可以使用1.x的Nacos客戶端去連接2.x的Nacos服務端,這個是Nacos做的向下兼容,反過來不行
前文提到的Nacos?客戶端中,ConfigService?是通過dataId和group?定位到具體的配置文件,一般dataId?按照配置文件的內(nèi)容命名,對于SpringBoot?的應用配置文件一般命名為application-{profile}.[properties,yml],group?是配置文件的分組,對于SpringBoot?的應用配置文件一般命名為{??spring.application.name???}?。筆者在在這份SpringBoot?的應用配置文件中只添加了RabbitMQ的配置:

spring-boot-rabbit-nacos-control-8
確保本地或者遠程有一個可用的RabbitMQ服務,接下來往下開始實施方案。
配置變更通知與配置類刷新
前面已經(jīng)提到過SpringBoot?結(jié)合Nacos?進行配置屬性Bean?的成員變量刷新,在項目的Classpath(resources?文件夾)添加bootstrap.properties文件,內(nèi)容如下:
spring.application.name=rabbitmq-rocketmq-demo
spring.profiles.active=default
# nacos配置
spring.cloud.nacos.config.enabled=true
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.cloud.nacos.config.namespace=LOCAL
spring.cloud.nacos.config.group=rabbitmq-rocketmq-demo
spring.cloud.nacos.config.prefix=application
spring.cloud.nacos.config.file-extensinotallow=properties
spring.cloud.nacos.config.refresh-enabled=true
spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
spring.cloud.nacos.config.shared-configs[0].group=shared-conf
spring.cloud.nacos.config.shared-configs[0].refresh=true
spring.cloud.nacos.config.extension-configs[0].data-id=extension.properties
spring.cloud.nacos.config.extension-configs[0].group=extension-conf
spring.cloud.nacos.config.extension-configs[0].refresh=true
spring.cloud.nacos.config.extension-configs[1].data-id=rabbitmq-toggle.properties
spring.cloud.nacos.config.extension-configs[1].group=rabbitmq-rocketmq-demo
spring.cloud.nacos.config.extension-configs[1].refresh=true
這里profile?定義為default?也就是會關(guān)聯(lián)到Nacos中dataId = 'application.properties', group = 'rabbitmq-rocketmq-demo'?那份配置文件,主要是用于定義amqp?需要的配置屬性。對于RabbitMQ?消費者的開關(guān),定義在dataId = 'rabbitmq-toggle.properties', group = 'rabbitmq-rocketmq-demo'?的文件中。添加RabbitmqToggleProperties:
// RabbitmqToggleProperties
@Slf4j
@Data
@ConfigurationProperties(prefix = "rabbitmq.toggle")
public class RabbitmqToggleProperties {
private final AtomicBoolean firstInit = new AtomicBoolean();
private List<RabbitmqConsumer> consumers;
@PostConstruct
public void postConstruct() {
if (!firstInit.compareAndSet(false, true)) {
StaticEventPublisher.publishEvent(new RabbitmqToggleRefreshEvent(this));
log.info("RabbitmqToggleProperties refresh, publish RabbitmqToggleRefreshEvent...");
} else {
log.info("RabbitmqToggleProperties first init...");
}
}
@Data
public static class RabbitmqConsumer {
private String listenerId;
private Integer concurrentConsumers;
private Integer maxConcurrentConsumers;
private Boolean enable;
}
}
// RabbitmqToggleRefreshEvent
@Getter
public class RabbitmqToggleRefreshEvent extends ApplicationEvent {
private final RabbitmqToggleProperties rabbitmqToggleProperties;
public RabbitmqToggleRefreshEvent(RabbitmqToggleProperties rabbitmqToggleProperties) {
super("RabbitmqToggleRefreshEvent");
this.rabbitmqToggleProperties = rabbitmqToggleProperties;
}
}
// StaticEventPublisher
public class StaticEventPublisher {
private static ApplicationEventPublisher PUBLISHER = null;
public static void publishEvent(ApplicationEvent applicationEvent) {
if (Objects.nonNull(PUBLISHER)) {
PUBLISHER.publishEvent(applicationEvent);
}
}
public static void attachApplicationEventPublisher(ApplicationEventPublisher publisher) {
PUBLISHER = publisher;
}
}
這里prefix?定義為rabbitmq.toggle?,為了和rabbitmq-toggle.properties?的屬性一一綁定,該文件中的配置Key?必須以rabbitmq.toggle?為前綴。RabbitmqToggleProperties?首次回調(diào)@PostConstruct?方法只打印初始化日志,再次回調(diào)@PostConstruct?方法則發(fā)布RabbitmqToggleRefreshEvent?事件,用于后面通知對應的消費者容器Bean進行啟停。
發(fā)現(xiàn)所有消費者容器
為了統(tǒng)一管理服務中所有消費者容器Bean?,需要定義一個類似于消費者容器注冊或者緩存中心類,緩存Key?可以考慮使用listenerId,Value?就直接使用MessageListenerContainer實例即可:
private final ConcurrentMap<String, MessageListenerContainer> containerCache = Maps.newConcurrentMap();
這里既然選定了listenerId作為緩存的Key,那么必須定義好規(guī)范,要求無論注解聲明式定義的消費者還是編程式定義的消費者,必須明確指定具體意義的listenerId,否則到時候存在Key的格式為org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N會比較混亂
接下來發(fā)現(xiàn)和緩存所有消費者容器:
private ConfigurableListableBeanFactory configurableListableBeanFactory;
private ApplicationEventPublisher applicationEventPublisher;
// ----------------------------------------------------------------------
// 獲取聲明式消費者容器
RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
for (String containerId : listenerContainerIds) {
MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
containerCache.putIfAbsent(containerId, messageListenerContainer);
}
// 獲取編程式消費者容器
Map<String, MessageListenerContainer> messageListenerContainerBeans
= configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
messageListenerContainerBeans.forEach((beanId, bean) -> {
if (bean instanceof AbstractMessageListenerContainer) {
AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) bean;
String listenerId = abstractMessageListenerContainer.getListenerId();
if (StringUtils.hasLength(listenerId)) {
containerCache.putIfAbsent(listenerId, abstractMessageListenerContainer);
} else {
containerCache.putIfAbsent(beanId, bean);
}
} else {
containerCache.putIfAbsent(beanId, bean);
}
});
}
Set<String> listenerIds = containerCache.keySet();
listenerIds.forEach(listenerId -> log.info("Cache message listener container => {}", listenerId));
// 所有消費者容器Bean發(fā)現(xiàn)完成后才接收刷新事件
StaticEventPublisher.attachApplicationEventPublisher(this.applicationEventPublisher);
StaticEventPublisher?中的ApplicationEventPublisher屬性延遲到所有消費者容器緩存完成后賦值,防止過早的屬性變更通知導致部分消費者容器的啟停操作被忽略。
管理消費者容器生命周期
接收到RabbitmqToggleRefreshEvent?事件后,然后遍歷傳遞過來的RabbitmqToggleProperties?里面的consumers,再基于已經(jīng)發(fā)現(xiàn)的消費者容器進行處理,代碼大概如下:
@EventListener(classes = RabbitmqToggleRefreshEvent.class)
public void onRabbitmqToggleRefreshEvent(RabbitmqToggleRefreshEvent event) {
RabbitmqToggleProperties rabbitmqToggleProperties = event.getRabbitmqToggleProperties();
List<RabbitmqToggleProperties.RabbitmqConsumer> consumers = rabbitmqToggleProperties.getConsumers();
if (!CollectionUtils.isEmpty(consumers)) {
consumers.forEach(consumerConf -> {
String listenerId = consumerConf.getListenerId();
if (StringUtils.hasLength(listenerId)) {
MessageListenerContainer messageListenerContainer = containerCache.get(listenerId);
if (Objects.nonNull(messageListenerContainer)) {
// running -> stop
if (messageListenerContainer.isRunning() && Objects.equals(Boolean.FALSE, consumerConf.getEnable())) {
messageListenerContainer.stop();
log.info("Message listener container => {} stop successfully", listenerId);
}
// modify concurrency
if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
SimpleMessageListenerContainer simpleMessageListenerContainer
= (SimpleMessageListenerContainer) messageListenerContainer;
if (Objects.nonNull(consumerConf.getConcurrentConsumers())) {
simpleMessageListenerContainer.setConcurrentConsumers(consumerConf.getConcurrentConsumers());
}
if (Objects.nonNull(consumerConf.getMaxConcurrentConsumers())) {
simpleMessageListenerContainer.setMaxConcurrentConsumers(consumerConf.getMaxConcurrentConsumers());
}
}
// stop -> running
if (!messageListenerContainer.isRunning() && Objects.equals(Boolean.TRUE, consumerConf.getEnable())) {
messageListenerContainer.start();
log.info("Message listener container => {} start successfully", listenerId);
}
}
}
});
}
}
修改Nacos?服務里面的rabbitmq-toggle.properties文件,輸入內(nèi)容如下:
rabbitmq.toggle.consumers[0].listenerId=MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage
rabbitmq.toggle.consumers[0].enable=true
rabbitmq.toggle.consumers[1].listenerId=MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage
rabbitmq.toggle.consumers[1].enable=true
rabbitmq.toggle.consumers[2].listenerId=MultiAnnoMethodDemoConsumer
rabbitmq.toggle.consumers[2].enable=true
rabbitmq.toggle.consumers[3].listenerId=SingleAnnoMethodDemoConsumer
rabbitmq.toggle.consumers[3].enable=true
rabbitmq.toggle.consumers[4].listenerId=CustomMethodDemoConsumer
rabbitmq.toggle.consumers[4].enable=true
rabbitmq.toggle.consumers[5].listenerId=MessageListenerDemoConsumer
rabbitmq.toggle.consumers[5].enable=true
啟動項目,觀察RabbitMQ WebUI對應的隊列消費者數(shù)量:

spring-boot-rabbit-nacos-control-9
然后隨機修改rabbitmq-toggle.properties?文件某個消費者容器設(shè)置為enable = 'fasle'?,觀察服務日志和觀察RabbitMQ WebUI的變化:

spring-boot-rabbit-nacos-control-10
可見RabbitMQ WebUI?中隊列消費者數(shù)量減少,服務日志也提示listenerId = 'MessageListenerDemoConsumer'的消費者容器被停止了。
一些思考
為了更精確控制有消費者容器的啟停,可以考慮在配置文件中定義關(guān)閉消費者容器的自動啟動開關(guān):
spring.rabbitmq.listener.simple.auto-startup=false
可以考慮在RabbitmqToggleProperties?首次回調(diào)@PostConstruct?方法時候發(fā)布RabbitmqToggleInitEvent?事件,然后監(jiān)聽此事件啟動所有已經(jīng)發(fā)現(xiàn)的消費者容器。這樣就能做到應用內(nèi)部的消費者的啟停行為總是以Nacos的開關(guān)配置文件為準,并且可以實現(xiàn)「在線」啟停和動態(tài)調(diào)整最小最大消費者數(shù)量。
另外,如果細心的話能夠觀察到服務日志中,每當監(jiān)聽到Nacos?配置變動會打印Started application in N seconds (JVM running for M)?的日志,這個并不是服務重啟了,而是啟動了一個Spring?子容器用于構(gòu)建一個全新的StandardEnvironment?(見文末Demo?項目中的EnvironmentCaptureApplicationRunner?)用來承載刷新后的配置文件內(nèi)容,然后再拷貝或者覆蓋到當前的Spring?容器中的PropertySources,這個過程的代碼實現(xiàn)類似這樣:

spring-boot-rabbit-nacos-control-11
小結(jié)
本文探討了一種通過Nacos?配置刷新方式管理SpringBoot?服務中RabbitMQ?消費者生命周期管理的方案,目前只是提供了完整的思路和一些Demo級別代碼,后續(xù)應該會完善方案和具體的工程級別編碼實現(xiàn)。
本文Demo項目倉庫:
- framework-mesh/rabbitmq-rocketmq-demo