自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一篇學會配置管理客戶端流程

開發(fā) 架構(gòu)
Nacos注冊中心的主要流程基本上擼完了,下面開始擼配置中心。本文從示例入手走查了客戶端的初始化流程,Listener的注冊邏輯和執(zhí)行邏輯。

[[416108]]

引言

Nacos注冊中心的主要流程基本上擼完了,下面開始擼配置中心。本文從示例入手走查了客戶端的初始化流程,Listener的注冊邏輯和執(zhí)行邏輯。

一、內(nèi)容提要

示例

  • 通過示例構(gòu)建ConfigService、注冊了Listener分析其流程

Client初始化概覽

  • 支持多種獲取server地址方式(本地、endpoint)
  • 支持多種namespace設(shè)置(本地、阿里云)
  • 支持超時時間、重試時間等參數(shù)設(shè)置
  • 支持用戶名和密碼驗證
  • 長輪詢會從BlockingQueue中獲取元素,隊列有元素立即執(zhí)行executeConfigListen,隊列無元素阻塞5秒鐘執(zhí)行executeConfigListen()

Listener注冊邏輯

  • client添加Listener后會在cacheMap中緩存CacheData
  • cacheMap中key由「dataId+group+tenant」拼接而成
  • 每個CacheData會綁定注冊的Listener列表
  • 每個CacheData會綁定taskId,3000個不同的CacheData對應一個taskId
  • 設(shè)置isSyncWithServer=false表示 cache md5 data不是來自server同步
  • BlockingQueue中添加new Object() 供長輪詢判斷立即執(zhí)行使用

配置變更執(zhí)行邏輯

  • 執(zhí)行邏輯由executeConfigListen方法實現(xiàn)
  • 當CacheData從Server同步后,會校驗md5是否變更了,變更則回調(diào)注冊的Listener完成通知
  • 注冊Listener后會構(gòu)建與server的RPC通道rpcClient
  • 向server發(fā)起變更查詢請求configChangeListenRequest
  • Server端通過比較緩存的md5值,返回client變更的key列表
  • Client通過變更的key列表向server發(fā)起配置查詢請求ConfigQueryRequest
  • 獲取變更內(nèi)容,并回調(diào)注冊的Listener完成通知
  • 回調(diào)注冊的Listener是通過線程池異步執(zhí)行Runnble Job實現(xiàn)的

二、示例

  1. @Test 
  2. public void test01() throws Exception { 
  3.   String serverAddr = "localhost:8848"
  4.   String dataId = "test"
  5.   String group = "DEFAULT_GROUP"
  6.   Properties properties = new Properties(); 
  7.   properties.put("serverAddr", serverAddr); 
  8.   // 構(gòu)建ConfigService 
  9.   ConfigService configService = NacosFactory.createConfigService(properties); 
  10.   configService.addListener(dataId, group, new Listener() { 
  11.     @Override 
  12.     public void receiveConfigInfo(String configInfo) { 
  13.       System.out.println("receive:" + configInfo); 
  14.     } 
  15.  
  16.     @Override 
  17.     public Executor getExecutor() { 
  18.       return null
  19.     } 
  20.   }); 
  21.   System.in.read(); 

備注: 示例中構(gòu)建了ConfigService,注入Listener接受server配置變更通知。

二、Client初始化概覽

NacosConfigService構(gòu)造方法

  1. public NacosConfigService(Properties properties) throws NacosException { 
  2.   ValidatorUtils.checkInitParam(properties); 
  3.   // 注解@1 
  4.   initNamespace(properties); 
  5.   // 注解@2 
  6.   ServerListManager serverListManager = new ServerListManager(properties); 
  7.   // 注解@3 
  8.   serverListManager.start(); 
  9.   // 注解@4 
  10.   this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties); 
  11.   // 將被廢棄HttpAgent,先忽略 
  12.   // will be deleted in 2.0 later versions 
  13.   agent = new ServerHttpAgent(serverListManager); 

注解@1 設(shè)置namespace可以通過properties.setProperty(PropertyKeyConst.NAMESPACE),代碼中會兼容阿里云環(huán)境,在此忽略,默認為空。

注解@2 初始化namespace、server地址等信息

注解@3 啟動主要用于endpoint方式定時獲取server地址,當本地傳入isFixed=true

注解@4 clientWorker初始化

ClientWorker初始化

  1. public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager, 
  2.             final Properties properties) throws NacosException { 
  3.  
  4.   this.configFilterChainManager = configFilterChainManager; 
  5.  
  6.   // 注解@5 
  7.   init(properties); 
  8.  
  9.   // 注解@6 
  10.   agent = new ConfigRpcTransportClient(properties, serverListManager); 
  11.  
  12.   // 調(diào)度線程池,「處理器核數(shù)」 
  13.   ScheduledExecutorService executorService = Executors 
  14.     .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { 
  15.       @Override 
  16.       public Thread newThread(Runnable r) { 
  17.         Thread t = new Thread(r); 
  18.         t.setName("com.alibaba.nacos.client.Worker"); 
  19.         t.setDaemon(true); 
  20.         return t; 
  21.       } 
  22.     }); 
  23.   agent.setExecutor(executorService); 
  24.  
  25.   // 注解@7 
  26.   agent.start(); 
  27.  

注解@5 初始化超時時間、重試時間等

  1. private void init(Properties properties) { 
  2.  
  3.     // 超時時間,默認30秒 
  4.     timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), 
  5.             Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); 
  6.  
  7.  
  8.     // 重試時間,默認2秒 
  9.     taskPenaltyTime = ConvertUtils 
  10.             .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); 
  11.  
  12.  
  13.     // 開啟配置刪除同步,默認false 
  14.     this.enableRemoteSyncConfig = Boolean 
  15.             .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG)); 

注解@6 gRPC config agent初始化

  1. public ConfigTransportClient(Properties properties, ServerListManager serverListManager) { 
  2.  
  3.     // 默認編碼UTF-8 
  4.     String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); 
  5.     if (StringUtils.isBlank(encodeTmp)) { 
  6.         this.encode = Constants.ENCODE; 
  7.     } else { 
  8.         this.encode = encodeTmp.trim(); 
  9.     } 
  10.     // namespace租戶,默認空 
  11.     this.tenant = properties.getProperty(PropertyKeyConst.NAMESPACE); 
  12.  
  13.     this.serverListManager = serverListManager; 
  14.  
  15.     // 用戶名和密碼驗證 
  16.     this.securityProxy = new SecurityProxy(properties, 
  17.             ConfigHttpClientManager.getInstance().getNacosRestTemplate()); 
  18.  

注解@7 gRPC agent啟動

  1. public void start() throws NacosException { 
  2.     // 簡單用戶名和密碼驗證 
  3.     if (securityProxy.isEnabled()) { 
  4.         securityProxy.login(serverListManager.getServerUrls()); 
  5.  
  6.         this.executor.scheduleWithFixedDelay(new Runnable() { 
  7.             @Override 
  8.             public void run() { 
  9.                 securityProxy.login(serverListManager.getServerUrls()); 
  10.             } 
  11.         }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS); 
  12.  
  13.     } 
  14.  
  15.     startInternal(); 
  1. @Override 
  2. public void startInternal() throws NacosException { 
  3.     executor.schedule(new Runnable() { 
  4.         @Override 
  5.         public void run() { 
  6.             while (true) { // 一直運行 
  7.                 try { 
  8.                     // 最長等待5秒 
  9.                     listenExecutebell.poll(5L, TimeUnit.SECONDS); 
  10.                     executeConfigListen(); 
  11.                 } catch (Exception e) { 
  12.                     LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); 
  13.                 } 
  14.             } 
  15.         } 
  16.     }, 0L, TimeUnit.MILLISECONDS); 
  17.  

小結(jié): 線程會一直運行,從BlockingQueue中獲取元素。隊里不為空,獲取后立即執(zhí)行executeConfigListen();隊列為空等待5秒后執(zhí)行

executeConfigListen()。

三、Listener注冊邏輯

  1. configService.addListener(dataId, group, new Listener() { 
  2.     @Override 
  3.     public void receiveConfigInfo(String configInfo) { 
  4.         System.out.println("receive:" + configInfo); 
  5.     } 
  6.  
  7.     @Override 
  8.     public Executor getExecutor() { 
  9.         return null
  10.     } 
  11. }); 
  12. public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) 
  13.         throws NacosException { 
  14.  
  15.     // 默認DEFAULT_GROUP 
  16.     group = null2defaultGroup(group); 
  17.  
  18.     // 租戶,默認空 
  19.     String tenant = agent.getTenant(); 
  20.  
  21.     // 注解@8 
  22.     CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); 
  23.  
  24.     synchronized (cache) { 
  25.         for (Listener listener : listeners) { 
  26.             cache.addListener(listener); 
  27.         } 
  28.         // cache md5 data是否來自server同步 
  29.         cache.setSyncWithServer(false); 
  30.        // BlockingQueue中添加new Object() 
  31.         agent.notifyListenConfig(); 
  32.     } 
  33.   

注解@8 構(gòu)建緩存數(shù)據(jù)CacheData并放入cacheMap中,緩存的key為 「dataId+group+tenant」例如:test+DEFAULT_GROUP。

每個CacheData會綁定對應的taskId,每3000個CacheData對應一個taskId。其實從后面的代碼中可以看出,每個taskId會對應一個gRPC Client。

  1. public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException { 
  2.  
  3.     // 從緩存中獲取 
  4.     CacheData cache = getCache(dataId, group, tenant); 
  5.     if (null != cache) { 
  6.         return cache; 
  7.     } 
  8.     // 構(gòu)造緩存key以+連接,test+DEFAULT_GROUP 
  9.     String key = GroupKey.getKeyTenant(dataId, group, tenant); 
  10.     synchronized (cacheMap) { 
  11.         CacheData cacheFromMap = getCache(dataId, group, tenant); 
  12.         // multiple listeners on the same dataid+group and race condition,so 
  13.         // double check again 
  14.         // other listener thread beat me to set to cacheMap 
  15.         if (null != cacheFromMap) { // 再檢查一遍 
  16.             cache = cacheFromMap; 
  17.             // reset so that server not hang this check 
  18.             cache.setInitializing(true); // 緩存正在初始化 
  19.         } else { 
  20.             // 構(gòu)造緩存數(shù)據(jù)對象 
  21.             cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant); 
  22.             // 初始值taskId=0,注意此處每3000個CacheData共用一個taskId 
  23.             int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize(); 
  24.             cache.setTaskId(taskId); 
  25.             // fix issue # 1317 
  26.             if (enableRemoteSyncConfig) { // 默認false 
  27.                 String[] ct = getServerConfig(dataId, group, tenant, 3000L, false); 
  28.                 cache.setContent(ct[0]); 
  29.             } 
  30.         } 
  31.         Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get()); 
  32.         // key = test+DEFAULT_GROUP 
  33.         copy.put(key, cache); 
  34.         // cacheMap = {test+DEFAULT_GROUP=CacheData [test, DEFAULT_GROUP]} 
  35.         cacheMap.set(copy); 
  36.  
  37.     } 
  38.     LOGGER.info("[{}] [subscribe] {}", agent.getName(), key); 
  39.  
  40.     MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size()); 
  41.  
  42.     return cache; 
屬性 含義
name ConfigTransportClient名稱,config_rpc_client
configFilterChainManager filter攔截鏈條,可以執(zhí)行一些列攔截器
dataId dataId
group group名稱,默認為DEFAULT_GROUP
tenant 租戶名稱
listeners 添加的Listener列表,線程安全CopyOnWriteArrayList
content 啟動時會從本地文件讀入,默認為null
md5 content的md5字符串

小結(jié):添加監(jiān)聽器邏輯如下:構(gòu)建CacheData,并緩存在cacheMap中,key是由「dataId+group+tenant」組成;每個CacheData會綁定了Listener列表,也綁定了taskId,3000個不同的CacheData對應一個taskId,對應一個gRPC通道實例;設(shè)置isSyncWithServer=false表示 cache md5 data不是來自server同步,BlockingQueue中添加new Object() 供前面提到的長輪詢判斷使用。

四、配置變更執(zhí)行邏輯

上文中提到一個線程一直在輪詢,輪詢執(zhí)行executeConfigListen方法,這個方法比較關(guān)鍵。

  1. public void executeConfigListen() { 
  2.  
  3.     Map<String/*taskId*/, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16); 
  4.     Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16); 
  5.     long now = System.currentTimeMillis(); 
  6.     // 超過5分鐘 
  7.     boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL; 
  8.     for (CacheData cache : cacheMap.get().values()) { 
  9.         synchronized (cache) { 
  10.             // --------注解@9開始-------- 
  11.             if (cache.isSyncWithServer()) {  
  12.                 cache.checkListenerMd5(); // 內(nèi)容有變更通知Listener執(zhí)行 
  13.                 if (!needAllSync) { // 不超過5分鐘則不再全局校驗 
  14.                     continue
  15.                 } 
  16.             } 
  17.       // --------注解@9結(jié)束-------- 
  18.             if (!CollectionUtils.isEmpty(cache.getListeners())) { // 有添加Listeners 
  19.                 // get listen config 默認 false 
  20.                 if (!cache.isUseLocalConfigInfo()) { 
  21.                     List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); 
  22.                     if (cacheDatas == null) { 
  23.                         cacheDatas = new LinkedList<CacheData>(); 
  24.                         listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); 
  25.                     } 
  26.                     // CacheData [test, DEFAULT_GROUP] 
  27.                     cacheDatas.add(cache); 
  28.  
  29.                 } 
  30.             } else if (CollectionUtils.isEmpty(cache.getListeners())) { // 沒有添加Listeners 
  31.  
  32.                 if (!cache.isUseLocalConfigInfo()) { 
  33.                     List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); 
  34.                     if (cacheDatas == null) { 
  35.                         cacheDatas = new LinkedList<CacheData>(); 
  36.                         removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); 
  37.                     } 
  38.                     cacheDatas.add(cache); 
  39.  
  40.                 } 
  41.             } 
  42.         } 
  43.  
  44.     } 
  45.  
  46.     boolean hasChangedKeys = false
  47.    
  48.   //-------------------注解@10開始--------------------------------- 
  49.    
  50.     if (!listenCachesMap.isEmpty()) { // 有Listeners 
  51.         for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) { 
  52.             String taskId = entry.getKey(); 
  53.             List<CacheData> listenCaches = entry.getValue(); 
  54.  
  55.             ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches); 
  56.             configChangeListenRequest.setListen(true); 
  57.             try { 
  58.                 // 注解@10.1 每個taskId構(gòu)建rpcClient 
  59.                 RpcClient rpcClient = ensureRpcClient(taskId); 
  60.                 // 注解@10.2 
  61.                 ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy( 
  62.                         rpcClient, configChangeListenRequest); 
  63.                 if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) { 
  64.  
  65.                     Set<String> changeKeys = new HashSet<String>(); 
  66.                     // handle changed keys,notify listener 
  67.                     // 有變化的configContext 
  68.                     if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) { 
  69.                         hasChangedKeys = true
  70.                         for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) { 
  71.                             String changeKey = GroupKey 
  72.                                     .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), 
  73.                                             changeConfig.getTenant()); 
  74.                             changeKeys.add(changeKey); 
  75.                             boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); 
  76.                             // 注解@10.3 回調(diào)Listener 
  77.                             refreshContentAndCheck(changeKey, !isInitializing); 
  78.                         } 
  79.  
  80.                     } 
  81.  
  82.                     //handler content configs 
  83.                     for (CacheData cacheData : listenCaches) { 
  84.                         String groupKey = GroupKey 
  85.                                 .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant()); 
  86.                         if (!changeKeys.contains(groupKey)) { // 注解@10.4 
  87.                             //sync:cache data md5 = server md5 && cache data md5 = all listeners md5. 
  88.                             synchronized (cacheData) { 
  89.                                 if (!cacheData.getListeners().isEmpty()) { 
  90.                                     cacheData.setSyncWithServer(true); 
  91.                                     continue
  92.                                 } 
  93.                             } 
  94.                         } 
  95.  
  96.                         cacheData.setInitializing(false); 
  97.                     } 
  98.  
  99.                 } 
  100.             } catch (Exception e) { 
  101.  
  102.                 LOGGER.error("Async listen config change error ", e); 
  103.                 try { 
  104.                     Thread.sleep(50L); 
  105.                 } catch (InterruptedException interruptedException) { 
  106.                     //ignore 
  107.                 } 
  108.             } 
  109.         } 
  110.     } 
  111.  //-------------------注解@10結(jié)束--------------------------------- 
  112.     if (!removeListenCachesMap.isEmpty()) { 
  113.         for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) { 
  114.             String taskId = entry.getKey(); 
  115.             List<CacheData> removeListenCaches = entry.getValue(); 
  116.             ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches); 
  117.             configChangeListenRequest.setListen(false); 
  118.             try { 
  119.                 // 向server發(fā)送Listener取消訂閱請求ConfigBatchListenRequest#listen為false 
  120.                 RpcClient rpcClient = ensureRpcClient(taskId); 
  121.                 boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest); 
  122.                 if (removeSuccess) { 
  123.                     for (CacheData cacheData : removeListenCaches) { 
  124.                         synchronized (cacheData) { 
  125.                             if (cacheData.getListeners().isEmpty()) { 
  126.                                 // 移除本地緩存 
  127.                                 ClientWorker.this 
  128.                                         .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); 
  129.                             } 
  130.                         } 
  131.                     } 
  132.                 } 
  133.  
  134.             } catch (Exception e) { 
  135.                 LOGGER.error("async remove listen config change error ", e); 
  136.             } 
  137.             try { 
  138.                 Thread.sleep(50L); 
  139.             } catch (InterruptedException interruptedException) { 
  140.                 //ignore 
  141.             } 
  142.         } 
  143.     } 
  144.     if (needAllSync) { 
  145.         lastAllSyncTime = now; 
  146.     } 
  147.     //If has changed keys,notify re sync md5. 
  148.     if (hasChangedKeys) { // key有變化觸發(fā)下一輪輪詢 
  149.         notifyListenConfig(); 
  150.     } 

注解@9 isSyncWithServer初始為false,在下文代碼中校驗結(jié)束后會設(shè)置為true,表示md5 cache data同步來自server。如果為true會校驗Md5.

  1. void checkListenerMd5() { 
  2.     for (ManagerListenerWrap wrap : listeners) { 
  3.         if (!md5.equals(wrap.lastCallMd5)) { // 注解@9.1 
  4.             safeNotifyListener(dataId, group, content, type, md5, wrap); 
  5.         } 
  6.     } 

注解@9.1 配置內(nèi)容有變更時,回調(diào)到我們示例中注冊的Listener中。

  1. private void safeNotifyListener(final String dataId, final String group, final String content, final String type, 
  2.         final String md5, final ManagerListenerWrap listenerWrap) { 
  3.     final Listener listener = listenerWrap.listener; 
  4.     if (listenerWrap.inNotifying) { 
  5.         // ... 
  6.         return
  7.     } 
  8.     Runnable job = new Runnable() { 
  9.         @Override 
  10.         public void run() { 
  11.             long start = System.currentTimeMillis(); 
  12.             ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); 
  13.             ClassLoader appClassLoader = listener.getClass().getClassLoader(); 
  14.             try { 
  15.                 if (listener instanceof AbstractSharedListener) { 
  16.                     AbstractSharedListener adapter = (AbstractSharedListener) listener; 
  17.                     adapter.fillContext(dataId, group); 
  18.                    // ... 
  19.                 } 
  20.                 Thread.currentThread().setContextClassLoader(appClassLoader); 
  21.  
  22.                 ConfigResponse cr = new ConfigResponse(); 
  23.                 cr.setDataId(dataId); 
  24.                 cr.setGroup(group); 
  25.                 cr.setContent(content); 
  26.                 // filter攔截繼續(xù)過濾 
  27.                 configFilterChainManager.doFilter(null, cr); 
  28.  
  29.                 String contentTmp = cr.getContent(); 
  30.                 listenerWrap.inNotifying = true
  31.  
  32.                 // 注解@9.2 
  33.                 listener.receiveConfigInfo(contentTmp); 
  34.                 // compare lastContent and content 
  35.                 if (listener instanceof AbstractConfigChangeListener) { 
  36.                     Map data = ConfigChangeHandler.getInstance() 
  37.                             .parseChangeData(listenerWrap.lastContent, content, type); 
  38.                     ConfigChangeEvent event = new ConfigChangeEvent(data); 
  39.                     // 回調(diào)變更事件方法 
  40.                     ((AbstractConfigChangeListener) listener).receiveConfigChange(event); 
  41.                     listenerWrap.lastContent = content; 
  42.                 } 
  43.  
  44.                 listenerWrap.lastCallMd5 = md5; 
  45.                 // .. 
  46.             } catch (NacosException ex) { 
  47.                // ... 
  48.             } catch (Throwable t) { 
  49.                // ... 
  50.             } finally { 
  51.                 listenerWrap.inNotifying = false
  52.                 Thread.currentThread().setContextClassLoader(myClassLoader); 
  53.             } 
  54.         } 
  55.     }; 
  56.  
  57.     final long startNotify = System.currentTimeMillis(); 
  58.     try { 
  59.        // 注解@9.3 
  60.         if (null != listener.getExecutor()) { 
  61.             listener.getExecutor().execute(job); 
  62.         } else { 
  63.             try { 
  64.                 INTERNAL_NOTIFIER.submit(job); // 默認線程池執(zhí)行,為5個線程 
  65.             } catch (RejectedExecutionException rejectedExecutionException) { 
  66.                 // ... 
  67.                 job.run(); 
  68.             } catch (Throwable throwable) { 
  69.                 // ... 
  70.                 job.run(); 
  71.             } 
  72.         } 
  73.     } catch (Throwable t) { 
  74.        // ... 
  75.     } 
  76.     final long finishNotify = System.currentTimeMillis(); 
  77.     // ... 

注解@9.2 回調(diào)注冊Listener的receiveConfigInfo方法或者receiveConfigChange邏輯

注解@9.3 優(yōu)先使用我們示例中注冊提供的線程池執(zhí)行job,如果沒有設(shè)置使用默認線程池「INTERNAL_NOTIFIER」,默認5個線程

備注: 當CacheData從server同步后,會校驗md5是否變更了,當變更時會回調(diào)到我們注冊的Listener完成通知。通知任務被封裝成Runnable任務,執(zhí)行線程池可以自定義,默認為5個線程。

注解@10.1 每個taskId構(gòu)建rpcClient,例如:taskId= config-0-c70e0314-4770-43f5-add4-f258a4083fd7;結(jié)合上下文每3000個CacheData對應一個rpcClient。

注解@10.2 向server發(fā)起configChangeListenRequest,server端由ConfigChangeBatchListenRequestHandler處理,還是比較md5

是否變更了,變更后server端返回變更的key列表。

注解@10.3 當server返回變更key列表時執(zhí)行refreshContentAndCheck方法。

  1. private void refreshContentAndCheck(CacheData cacheData, boolean notify) { 
  2.     try { 
  3.         // 注解@10.3.1 
  4.         String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify); 
  5.         cacheData.setContent(ct[0]); 
  6.         if (null != ct[1]) { 
  7.             cacheData.setType(ct[1]); 
  8.         } 
  9.         if (notify) { // 記錄日志 
  10.            // ... 
  11.         } 
  12.         // 注解@10.3.2 
  13.         cacheData.checkListenerMd5(); 
  14.     } catch (Exception e) { 
  15.         //... 
  16.     } 

注解@10.3.1 向server發(fā)起ConfigQueryRequest,查詢配置內(nèi)容

注解@10.3.2 回調(diào)注冊的Listener邏輯見 注解@9

注解@10.4 key沒有變化的,內(nèi)容由server同步,設(shè)置SyncWithServer=true,下一輪邏輯會由 注解@9 部分執(zhí)行

備注: 從整個注解@10 注冊Listener后,會構(gòu)建與server的RPC通道rpcClient;向server發(fā)起變更查詢請求configChangeListenRequest,server端通過比較緩存的md5值,返回client變更的key列表;client通過變更的key列表向server發(fā)起配置查詢請求ConfigQueryRequest,獲取變更內(nèi)容,并回調(diào)我們注冊的Listener。

本文轉(zhuǎn)載自微信公眾號「瓜農(nóng)老梁」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系瓜農(nóng)老梁公眾號。

 

責任編輯:武曉燕 來源: 瓜農(nóng)老梁
相關(guān)推薦

2022-10-20 07:39:26

2022-08-26 09:29:01

Kubernetes策略Master

2021-06-09 19:23:52

MySQLROLE管理

2022-02-21 08:48:00

Pulsar部署配置

2011-03-24 13:00:31

配置nagios客戶端

2022-01-02 08:43:46

Python

2022-02-07 11:01:23

ZooKeeper

2023-01-03 08:31:54

Spring讀取器配置

2021-05-11 08:54:59

建造者模式設(shè)計

2021-07-05 22:11:38

MySQL體系架構(gòu)

2021-07-06 08:59:18

抽象工廠模式

2023-11-28 08:29:31

Rust內(nèi)存布局

2021-07-02 09:45:29

MySQL InnoDB數(shù)據(jù)

2022-08-23 08:00:59

磁盤性能網(wǎng)絡

2021-07-02 08:51:29

源碼參數(shù)Thread

2021-07-16 22:43:10

Go并發(fā)Golang

2021-09-28 08:59:30

復原IP地址

2022-04-12 08:30:52

回調(diào)函數(shù)代碼調(diào)試

2021-10-27 09:59:35

存儲

2022-03-11 10:21:30

IO系統(tǒng)日志
點贊
收藏

51CTO技術(shù)棧公眾號