SpringCloud整合MQ實(shí)現(xiàn)消息總線服務(wù),實(shí)戰(zhàn)講解!
一、背景介紹
在之前的文章中,我們介紹了服務(wù)配置中心高可用的玩法。了解到,每當(dāng)修改配置文件內(nèi)容,如果需要客戶端也同步更新,就需要手動(dòng)調(diào)用/refresh接口,以便客戶端能獲取到最新的配置內(nèi)容。
當(dāng)客戶端越來(lái)越多的時(shí)候,通過(guò)人工進(jìn)行處理顯然非常雞肋。有沒(méi)有一種更加高效的辦法,當(dāng)手動(dòng)調(diào)用其中一個(gè)客戶端的/refresh接口,其它的客戶端也自動(dòng)更新?
答案是肯定的!
在 Spring Cloud 體系里,有一個(gè)叫做 Spring Cloud Bus 模塊,也被業(yè)界稱為消息總線。它可以將分布式系統(tǒng)內(nèi)的節(jié)點(diǎn)以消息代理方式連接起來(lái),開(kāi)發(fā)者可以通過(guò)消息代理服務(wù)向其它節(jié)點(diǎn)傳輸數(shù)據(jù)的變更,例如配置文件的更改,也可以用于收集節(jié)點(diǎn)監(jiān)控?cái)?shù)據(jù)。其中常用的消息代理服務(wù)有 RabbitMQ 和 Kafka。
換言之,我們可以借助 Spring Cloud Bus 模塊來(lái)實(shí)現(xiàn)上文介紹的訴求,引入 Spring Cloud Bus 模塊后,客戶端獲取遠(yuǎn)程配置文件的方式,可以用如下流程圖來(lái)描述。
圖片
熟悉 MQ 服務(wù)的同學(xué)可能一眼就看出來(lái)了,其原理就是借助 MQ 服務(wù)的發(fā)布與訂閱功能向其它節(jié)點(diǎn)進(jìn)行廣播數(shù)據(jù),從而實(shí)現(xiàn)客戶端自動(dòng)刷新配置功能。
其交互流程可以用如下內(nèi)容來(lái)概括。
- 1.當(dāng)外部請(qǐng)求調(diào)用客戶端 A 的/refresh接口后,除了主動(dòng)刷新配置意外,還會(huì)通過(guò) Spring Cloud Bus 模塊,將刷新配置接口的指令數(shù)據(jù)發(fā)送到 MQ 服務(wù)器
- 2.MQ 服務(wù)器會(huì)將指令數(shù)據(jù)通過(guò) Spring Cloud Bus 模塊推送給其它客戶端
- 3.客戶端 B、C 接收到最新的消息指令后,主動(dòng)調(diào)用刷新配置服務(wù),獲取最新的配置內(nèi)容
下面我們通過(guò)具體的例子,結(jié)合之前介紹的知識(shí),看看如何利用 Spring Cloud Bus 實(shí)現(xiàn)客戶端配置文件自動(dòng)刷新的效果。
三、方案實(shí)踐
在此,我們采用 RabbitMQ 服務(wù)器來(lái)搭建消息總線,因此需要事先準(zhǔn)備一臺(tái)可用的 RabbitMQ 服務(wù)器,具體的安裝教程之前有所介紹,大家也可以百度搜索一下,具體的安裝過(guò)程就不再重復(fù)撰述。
3.1、添加依賴
根據(jù)eureka-config-client復(fù)制一個(gè)服務(wù)消費(fèi)者工程,命名為eureka-config-client-bus,并在pom.xml中引入spring-cloud-starter-bus-amqp依賴包,示例如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
3.2、添加配置文件
接著,在bootstrap.properties配置文件中添加消息代理相關(guān)的屬性信息,示例如下:
# 配置rabbitmq服務(wù)器地址
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3.3、服務(wù)測(cè)試
然后,依次將eureka-server、eureka-config-server、eureka-config-client-bus (分別在不同的端口上,比如9022、9023) 服務(wù)啟動(dòng)起來(lái)。訪問(wèn) eureka 可視化頁(yè)面,如果服務(wù)都正常,可以看到類似于如下的界面。
圖片
將遠(yuǎn)程倉(cāng)庫(kù)配置文件中的blog.name=hahaha修改成blog.name=hahaha123456,以便測(cè)試客戶端配置文件是否能自動(dòng)更新。
接著,向其中一個(gè)客戶端發(fā)送一個(gè)/bus/refresh的 POST 請(qǐng)求。
**需要注意的是,這里的路徑是/bus/refresh,而不是/refresh**!
圖片
最后,在瀏覽器中重新訪問(wèn)另一個(gè)客戶端讀取配置文件的接口,不意外的話,客戶端獲取的是最新的配置信息。
圖片
說(shuō)明客戶端已經(jīng)成功讀取到最新的配置內(nèi)容。
查看客戶端的日志,也會(huì)看到類似于如下的信息。
Received remote refresh request. Keys refreshed [config.client.version, blog.name]
3.4、WebHook
可能有的同學(xué)會(huì)發(fā)出一個(gè)疑問(wèn),不可能每次修改倉(cāng)庫(kù)的配置文件,自己都需要手動(dòng)調(diào)用/bus/refresh接口吧。
實(shí)際上,GIT 里面有個(gè) WebHooks 功能,每次 push 代碼后,我們可以利用它給遠(yuǎn)程 HTTP URL 發(fā)送一個(gè) POST 請(qǐng)求,以此省去手動(dòng)調(diào)用的工作。
圖片
操作非常簡(jiǎn)單,只需要將其中一個(gè)客戶端/bus/refresh接口地址添加進(jìn)去即可。需要注意的是,這里的 HTTP URL 必須是一個(gè)能請(qǐng)求通過(guò)的公網(wǎng)地址哈!
四、升級(jí)版
在以上的方案中,雖然我們利用消息總線實(shí)現(xiàn)了手動(dòng)刷新一個(gè)客戶端的配置文件更新,其它客戶端也同步跟著一起刷新的目的,但在實(shí)際的實(shí)踐過(guò)程中,發(fā)現(xiàn)還是有一些不便的地方。
例如客戶端因?yàn)闃I(yè)務(wù)的快速迭代會(huì)頻繁的發(fā)布服務(wù),同時(shí)也會(huì)根據(jù)服務(wù)的并發(fā)量適度的增減服務(wù)實(shí)例數(shù)量,這種情況下,客戶端的 IP 和端口會(huì)經(jīng)常發(fā)生變動(dòng),每次人工運(yùn)維起來(lái)會(huì)很繁瑣。
因此,我們可以將上面的交互流程改變一下,由服務(wù)配置中心通過(guò) Spring Cloud Bus 模塊向客戶端發(fā)送重刷配置文件的指令。
整個(gè)流程,可以用如下圖來(lái)描述。
圖片
因?yàn)榉?wù)配置中心基本上很少會(huì)去迭代,客戶端的 IP 和端口發(fā)生變動(dòng)的可能性較小,由它向客戶端推送消息,運(yùn)維的工作量可以顯著的下降。
改造的流程也很簡(jiǎn)單,只需要兩步即可!
3.1、添加依賴
與上文類似,根據(jù)eureka-config-server復(fù)制一個(gè)服務(wù)消費(fèi)者工程,命名為eureka-config-server-bus,并在pom.xml中引入spring-cloud-starter-bus-amqp依賴包,示例如下:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
</dependencies>
4.2、添加配置文件
接著,在application.properties中添加消息代理服務(wù)相關(guān)的屬性,示例如下:
spring.application.name=eureka-config-server
server.port=9020
# 配置git倉(cāng)庫(kù)地址
spring.cloud.config.server.git.uri=https://gitee.com/pzblogs/config-demo
spring.cloud.config.server.git.username=
spring.cloud.config.server.git.password=
# 設(shè)置與Eureka Server交互的地址,多個(gè)地址可使用【,】分隔
eureka.client.serviceUrl.defaultZnotallow=http://localhost:8001/eureka/
# 關(guān)閉安全認(rèn)證
management.security.enabled=false
# 配置rabbitmq服務(wù)器地址
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
4.3、服務(wù)測(cè)試
然后,依次將eureka-server、eureka-config-server-bus、eureka-config-client-bus服務(wù)啟動(dòng)起來(lái)。
接著,修改倉(cāng)庫(kù)中的配置文件內(nèi)容,完成之后,以 POST 方式調(diào)用服務(wù)配置中心的刷新配置接口,例如http://localhost:9020/bus/refresh。
最后,在瀏覽器訪問(wèn)客戶端http://localhost:9023/hello,不意外的話,能看到最新的信息。
圖片
五、小結(jié)
最后總結(jié)一下,當(dāng)我們手動(dòng)更新某個(gè)倉(cāng)庫(kù)配置文件的時(shí)候,想要實(shí)現(xiàn)所有客戶端同時(shí)也自動(dòng)更新配置,可以利用消息總線來(lái)實(shí)現(xiàn)節(jié)點(diǎn)之間數(shù)據(jù)的同步變更操作。
如果想要用 Kafka 來(lái)做消息代理服務(wù),實(shí)現(xiàn)思路也類似,將bus-amqp換成bus-kafka,示例子如下
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
最后在配置文件中添加 Kafka 相關(guān)服務(wù)地址配置參數(shù)即可。