自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Nacos Client服務(wù)訂閱機(jī)制之核心流程

開(kāi)發(fā) 架構(gòu)
說(shuō)起Nacos的服務(wù)訂閱機(jī)制,對(duì)此不了解的朋友,可能感覺(jué)非常神秘,這篇文章就大家深入淺出的了解一下Nacos 2.0客戶端的訂閱實(shí)現(xiàn)。

[[417043]]

本文轉(zhuǎn)載自微信公眾號(hào)「程序新視界」,作者二師兄。轉(zhuǎn)載本文請(qǐng)聯(lián)系程序新視界公眾號(hào)。

說(shuō)起Nacos的服務(wù)訂閱機(jī)制,對(duì)此不了解的朋友,可能感覺(jué)非常神秘,這篇文章就大家深入淺出的了解一下Nacos 2.0客戶端的訂閱實(shí)現(xiàn)。由于涉及到的內(nèi)容比較多,就分幾篇來(lái)講,本篇為第一篇。

Nacos訂閱概述

Nacos的訂閱機(jī)制,如果用一句話來(lái)描述就是:Nacos客戶端通過(guò)一個(gè)定時(shí)任務(wù),每6秒從注冊(cè)中心獲取實(shí)例列表,當(dāng)發(fā)現(xiàn)實(shí)例發(fā)生變化時(shí),發(fā)布變更事件,訂閱者進(jìn)行業(yè)務(wù)處理。該更新實(shí)例的更新實(shí)例,該更新本地緩存的更新本地緩存。

nacos

上圖畫(huà)出了訂閱方法的主線流程,涉及的內(nèi)容較多,處理細(xì)節(jié)復(fù)雜。這里只用把握住核心部分即可。下面就通過(guò)代碼和流程圖來(lái)逐步分析上述過(guò)程。

從訂閱到定時(shí)任務(wù)開(kāi)啟

我們這里聊的訂閱機(jī)制,其實(shí)本質(zhì)上就是服務(wù)發(fā)現(xiàn)的準(zhǔn)實(shí)時(shí)感知。上面已經(jīng)看到了當(dāng)執(zhí)行訂閱方法時(shí),會(huì)觸發(fā)定時(shí)任務(wù),定時(shí)去拉服務(wù)器端的數(shù)據(jù)。所以,本質(zhì)上,訂閱機(jī)制就是實(shí)現(xiàn)服務(wù)發(fā)現(xiàn)的一種方式,對(duì)照的方式就是直接查詢(xún)接口了。

NacosNamingService中暴露的許多重載的subscribe,重載的目的就是讓大家少寫(xiě)一些參數(shù),這些參數(shù)呢,Nacos給默認(rèn)處理了。最終這些重載方法都會(huì)調(diào)用到下面這個(gè)方法:

  1. // NacosNamingService 
  2. public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) 
  3.         throws NacosException { 
  4.     if (null == listener) { 
  5.         return
  6.     } 
  7.     String clusterString = StringUtils.join(clusters, ","); 
  8.     changeNotifier.registerListener(groupName, serviceName, clusterString, listener); 
  9.     clientProxy.subscribe(serviceName, groupName, clusterString); 

方法中的事件監(jiān)聽(tīng)我們暫時(shí)不聊,直接看subscribe方法,這里clientProxy類(lèi)型為NamingClientProxyDelegate。實(shí)例化NacosNamingService時(shí)該類(lèi)被實(shí)例化,前面章節(jié)中已經(jīng)講到,不再贅述。

而clientProxy.subscribe方法在NamingClientProxyDelegate中實(shí)現(xiàn):

  1. // NamingClientProxyDelegate 
  2. @Override 
  3. public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { 
  4.     String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); 
  5.     String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); 
  6.     // 獲取緩存中的ServiceInfo 
  7.     ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey); 
  8.     if (null == result) { 
  9.         // 如果為null,則進(jìn)行訂閱邏輯處理,基于gRPC協(xié)議 
  10.         result = grpcClientProxy.subscribe(serviceName, groupName, clusters); 
  11.     } 
  12.     // 定時(shí)調(diào)度UpdateTask 
  13.     serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); 
  14.     // ServiceInfo本地緩存處理 
  15.     serviceInfoHolder.processServiceInfo(result); 
  16.     return result; 

這段方法是不是眼熟啊?對(duì)的,在前面分析《Nacos Client服務(wù)發(fā)現(xiàn)》時(shí)我們已經(jīng)講過(guò)了??磥?lái)殊途同歸,查詢(xún)服務(wù)列表和訂閱最終都調(diào)用了同一個(gè)方法。

上篇講了其他流程,我們這里重點(diǎn)看任務(wù)調(diào)度:

  1. // ServiceInfoUpdateService 
  2. public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) { 
  3.     String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); 
  4.     if (futureMap.get(serviceKey) != null) { 
  5.         return
  6.     } 
  7.     synchronized (futureMap) { 
  8.         if (futureMap.get(serviceKey) != null) { 
  9.             return
  10.         } 
  11.         // 構(gòu)建UpdateTask 
  12.         ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters)); 
  13.         futureMap.put(serviceKey, future); 
  14.     } 

該方法包含了構(gòu)建serviceKey、通過(guò)serviceKey判重,最后添加UpdateTask。

而其中的addTask的實(shí)現(xiàn)就是發(fā)起了一個(gè)定時(shí)任務(wù):

  1. // ServiceInfoUpdateService 
  2. private synchronized ScheduledFuture<?> addTask(UpdateTask task) { 
  3.     return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); 

定時(shí)任務(wù)延時(shí)1秒執(zhí)行。

跟蹤到這里就告一階段了。核心功能只有兩個(gè):調(diào)用訂閱方法和發(fā)起定時(shí)任務(wù)。

定時(shí)任務(wù)都干了啥

UpdateTask封裝了訂閱機(jī)制的核心業(yè)務(wù)邏輯,先來(lái)通過(guò)一張流程圖看一下都做了啥。

nacos

有了上述流程圖,基本就很清晰的了解UpdateTask所做的事情了。直接貼出run方法的所有代碼:

  1. public void run() { 
  2.     long delayTime = DEFAULT_DELAY; 
  3.  
  4.     try { 
  5.         // 判斷該注冊(cè)的Service是否被訂閱,如果沒(méi)有訂閱則不再執(zhí)行 
  6.         if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) { 
  7.             NAMING_LOGGER 
  8.                     .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters); 
  9.             return
  10.         } 
  11.  
  12.         // 獲取緩存的service信息 
  13.         ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); 
  14.         if (serviceObj == null) { 
  15.             // 根據(jù)serviceName從注冊(cè)中心服務(wù)端獲取Service信息 
  16.             serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); 
  17.             serviceInfoHolder.processServiceInfo(serviceObj); 
  18.             lastRefTime = serviceObj.getLastRefTime(); 
  19.             return
  20.         } 
  21.  
  22.         // 過(guò)期服務(wù)(服務(wù)的最新更新時(shí)間小于等于緩存刷新時(shí)間),從注冊(cè)中心重新查詢(xún) 
  23.         if (serviceObj.getLastRefTime() <= lastRefTime) { 
  24.             serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); 
  25.             // 處理Service消息 
  26.             serviceInfoHolder.processServiceInfo(serviceObj); 
  27.         } 
  28.         // 刷新更新時(shí)間 
  29.         lastRefTime = serviceObj.getLastRefTime(); 
  30.         if (CollectionUtils.isEmpty(serviceObj.getHosts())) { 
  31.             incFailCount(); 
  32.             return
  33.         } 
  34.         // 下次更新緩存時(shí)間設(shè)置,默認(rèn)為6秒 
  35.         // TODO multiple time can be configured. 
  36.         delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; 
  37.         // 重置失敗數(shù)量為0 
  38.         resetFailCount(); 
  39.     } catch (Throwable e) { 
  40.         incFailCount(); 
  41.         NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e); 
  42.     } finally { 
  43.         // 下次調(diào)度刷新時(shí)間,下次執(zhí)行的時(shí)間與failCount有關(guān) 
  44.         // failCount=0,則下次調(diào)度時(shí)間為6秒,最長(zhǎng)為1分鐘 
  45.         // 即當(dāng)無(wú)異常情況下緩存實(shí)例的刷新時(shí)間是6秒 
  46.         executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); 
  47.     } 

首先在判斷服務(wù)是否是被訂閱過(guò),實(shí)現(xiàn)方法是ChangeNotifier#isSubscribed:

  1. public boolean isSubscribed(String groupName, String serviceName, String clusters) { 
  2.     String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); 
  3.     ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); 
  4.     return CollectionUtils.isNotEmpty(eventListeners); 

查看該方法的源碼會(huì)發(fā)現(xiàn),這里的listenerMap正是最開(kāi)始的subscribe方法中registerListener注冊(cè)的EventListener。

run方法后面的業(yè)務(wù)處理基本上都雷同了,先判斷緩存是否有ServiceInfo信息,如果沒(méi)有則查詢(xún)注冊(cè)中心、處理ServiceInfo、更新上次處理時(shí)間。

而下面判斷ServiceInfo是否失效,正是通過(guò)“上次更新時(shí)間”與當(dāng)前ServiceInfo中的“上次更新時(shí)間”做比較來(lái)判斷。如果失效,也會(huì)查詢(xún)注冊(cè)中心、處理ServiceInfo、更新上次處理時(shí)間等一系列操作。

業(yè)務(wù)邏輯最后會(huì)計(jì)算下一次定時(shí)任務(wù)的執(zhí)行時(shí)間,通過(guò)delayTime來(lái)延遲執(zhí)行。delayTime默認(rèn)為 1000L * 6,也就是6秒。而在finally里面真的發(fā)起下一次定時(shí)任務(wù)。當(dāng)出現(xiàn)異常時(shí),下次執(zhí)行的時(shí)間與失敗次數(shù)有關(guān),但最長(zhǎng)不超過(guò)1分鐘。

小結(jié)

這一篇我們講了Nacos客戶端服務(wù)訂閱機(jī)制的源碼,主要有以下步驟:

第一步:訂閱方法的調(diào)用,并進(jìn)行EventListener的注冊(cè),后面UpdateTask要用來(lái)進(jìn)行判斷;

第二步:通過(guò)委托代理類(lèi)來(lái)處理訂閱邏輯,此處與獲取實(shí)例列表方法使用了同一個(gè)方法;

第三步:通過(guò)定時(shí)任務(wù)執(zhí)行UpdateTask方法,默認(rèn)執(zhí)行間隔為6秒,當(dāng)發(fā)生異常時(shí)會(huì)延長(zhǎng),但不超過(guò)1分鐘;

第四步:UpdateTask方法中會(huì)比較本地是否存在緩存,緩存是否過(guò)期。當(dāng)不存在或過(guò)期時(shí),查詢(xún)注冊(cè)中心,獲取最新實(shí)例,更新最后獲取時(shí)間,處理ServiceInfo。

第五步:重新計(jì)算定時(shí)任務(wù)時(shí)間,循環(huán)執(zhí)行上述流程。

 

責(zé)任編輯:武曉燕 來(lái)源: 程序新視界
相關(guān)推薦

2021-08-16 07:26:42

服務(wù)訂閱機(jī)制

2022-05-14 22:27:40

Nacos訂閱機(jī)制定時(shí)器

2022-05-19 07:39:43

Nacos訂閱機(jī)制線程類(lèi)

2021-08-10 07:00:00

Nacos Clien服務(wù)分析

2022-06-08 10:58:00

服務(wù)配置Nacos

2022-05-02 22:01:49

訂閱模式Eureka推送模式

2018-08-19 11:00:05

2021-05-27 22:46:00

Nacos Clien版本Nacos

2023-03-01 08:15:10

NginxNacos

2020-06-28 13:51:03

哈希map結(jié)構(gòu)

2016-09-22 16:40:48

微服務(wù)架構(gòu)RPC-client序

2021-12-20 00:03:38

Webpack運(yùn)行機(jī)制

2012-07-03 10:57:54

Hadoop核心機(jī)制

2021-09-16 06:44:04

Android進(jìn)階流程

2023-01-11 08:22:22

RabbitMQ通信模型

2021-09-01 09:40:44

Docker開(kāi)發(fā)人員擴(kuò)展

2023-08-14 08:17:13

Kafka服務(wù)端

2021-07-12 08:00:21

Nacos 服務(wù)注冊(cè)源碼分析

2021-09-30 07:36:51

AndroidViewDraw

2021-08-04 11:54:25

Nacos注冊(cè)中心設(shè)計(jì)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)