自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

NacosSync雙向復(fù)制源碼分析

開發(fā) 前端
通過開源同步工具NacosSync的分析,對(duì)我們實(shí)現(xiàn)自定義的同步工具提供參考。文本就同步任務(wù)分發(fā)與Nacos集群之間、從zk到Nacos的同步源碼做個(gè)分析。

[[421895]]

引言

通過開源同步工具NacosSync的分析,對(duì)我們實(shí)現(xiàn)自定義的同步工具提供參考。文本就同步任務(wù)分發(fā)與Nacos集群之間、從zk到Nacos的同步源碼做個(gè)分析。

一、內(nèi)容提要

任務(wù)和配置入庫(kù)

  • 集群配置入庫(kù)
  • 同步任務(wù)入庫(kù)

同步任務(wù)分發(fā)

  • 每三秒調(diào)度一次任務(wù)列表
  • 新增任務(wù)發(fā)布同步任務(wù)事件SyncTaskEvent并由listenerSyncTaskEvent處理
  • 刪除任務(wù)發(fā)布刪除任務(wù)事件DeleteTaskEvent并由listenerDeleteTaskEvent處理
  • 任務(wù)的發(fā)布和訂閱使用Guava的EventBus

Nacos集群之間同步邏輯

  • 兩個(gè)Nacos集群之間進(jìn)行同步,同步任務(wù)在Service維度(AppId)建立
  • 對(duì)源集群注冊(cè)監(jiān)聽獲取注冊(cè)節(jié)點(diǎn)列表,通過剔除無效節(jié)點(diǎn)后,將新的節(jié)點(diǎn)注冊(cè)到目標(biāo)集群

從zk集群同步到Nacos集群

  • NacosSync從zk集群同步到Nacos只支持dubbo路徑
  • 第一次先同步所有節(jié)點(diǎn)過去,再監(jiān)聽源集群路徑變化,同步到目標(biāo)集群

二、任務(wù)和配置入庫(kù)

入庫(kù)部分比較簡(jiǎn)單,只列出入口和處理類。

集群配置入庫(kù)

請(qǐng)求入口:ClusterApi#clusterAdd

入庫(kù)處理:ClusterAddProcessor#process

  1. clusterAccessService.insert(clusterDO); 

同步任務(wù)入庫(kù)

請(qǐng)求入口:TaskApi#taskAdd

入庫(kù)處理:TaskAccessService#addTask

  1. taskAccessService.addTask(taskDO); 

三、同步任務(wù)分發(fā)

同步任務(wù)入庫(kù)了,緊著需要任務(wù)進(jìn)行分發(fā)。代碼翻到QuerySyncTaskTimer實(shí)現(xiàn)了springboot的CommandLineRunner接口。

定時(shí)任務(wù)調(diào)度

  1. public void run(String... args) { 
  2.  scheduledExecutorService.scheduleWithFixedDelay(new CheckRunningStatusThread(), 0, 3000, 
  3.  TimeUnit.MILLISECONDS); 

備注: 定時(shí)任務(wù)每3秒鐘調(diào)度一次。

調(diào)度任務(wù)執(zhí)行

  1. private class CheckRunningStatusThread implements Runnable { 
  2.  
  3.     @Override 
  4.     public void run() { 
  5.  
  6.         Long start = System.currentTimeMillis(); 
  7.         try { 
  8.            // 注解@1 
  9.             Iterable<TaskDO> taskDOS = taskAccessService.findAll(); 
  10.             taskDOS.forEach(taskDO -> { 
  11.                // 注解@2  
  12.                 if ((null != skyWalkerCacheServices.getFinishedTask(taskDO))) { 
  13.                     return
  14.                 } 
  15.                // 注解@3 
  16.                 if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) { 
  17.                     eventBus.post(new SyncTaskEvent(taskDO)); 
  18.                     log.info("從數(shù)據(jù)庫(kù)中查詢到一個(gè)同步任務(wù),發(fā)出一個(gè)同步事件:" + taskDO); 
  19.                 } 
  20.         // 注解@4 
  21.                 if (TaskStatusEnum.DELETE.getCode().equals(taskDO.getTaskStatus())) { 
  22.                     eventBus.post(new DeleteTaskEvent(taskDO)); 
  23.                     log.info("從數(shù)據(jù)庫(kù)中查詢到一個(gè)刪除任務(wù),發(fā)出一個(gè)同步事件:" + taskDO); 
  24.                 } 
  25.             }); 
  26.  
  27.         } catch (Exception e) { 
  28.             log.warn("CheckRunningStatusThread Exception", e); 
  29.         } 
  30.     // 注解@5 
  31.         metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK, System.currentTimeMillis() - start); 
  32.     } 

注解@1 查詢所有同步任務(wù)

注解@2 過濾已完成的任務(wù)

注解@3 發(fā)布一個(gè)同步任務(wù)事件SyncTaskEvent

注解@4 發(fā)布一個(gè)刪除任務(wù)事件DeleteTaskEvent

注解@5 通過metric統(tǒng)計(jì)本次調(diào)度任務(wù)執(zhí)行的耗時(shí)情況

小結(jié): 當(dāng)有新增任務(wù)或者刪除任務(wù)時(shí)通過Guava的EventBus發(fā)布一個(gè)同步事件或刪除事件,該檢測(cè)3秒執(zhí)行一次。

四、同步事件處理

代碼EventListener#listenerSyncTaskEvent訂閱了同步事件SyncTaskEvent。

  1. @Subscribe 
  2. public void listenerSyncTaskEvent(SyncTaskEvent syncTaskEvent) { 
  3.  
  4.     try { 
  5.         long start = System.currentTimeMillis(); 
  6.        // 注解@6 
  7.         if (syncManagerService.sync(syncTaskEvent.getTaskDO())) {     
  8.            // 注解@7 
  9.             skyWalkerCacheServices.addFinishedTask(syncTaskEvent.getTaskDO()); 
  10.            // 注解@8 
  11.             metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - start); 
  12.         } else { 
  13.             log.warn("listenerSyncTaskEvent sync failure"); 
  14.         }                 
  15.     } catch (Exception e) { 
  16.         log.warn("listenerSyncTaskEvent process error", e); 
  17.     } 
  18.  

注解@6 執(zhí)行同步任務(wù)

注解@7 標(biāo)記該同步任務(wù)完成

注解@8 記錄任務(wù)執(zhí)行時(shí)間

代碼EventListener#listenerDeleteTaskEvent訂閱了刪除任務(wù)事件DeleteTaskEvent。

  1. @Subscribe 
  2. public void listenerDeleteTaskEvent(DeleteTaskEvent deleteTaskEvent) { 
  3.  
  4.     try { 
  5.         long start = System.currentTimeMillis(); 
  6.         if (syncManagerService.delete(deleteTaskEvent.getTaskDO())) { 
  7.             skyWalkerCacheServices.addFinishedTask(deleteTaskEvent.getTaskDO()); 
  8.             metricsManager.record(MetricsStatisticsType.DELETE_TASK_RT, System.currentTimeMillis() - start); 
  9.         } else { 
  10.             log.warn("listenerDeleteTaskEvent delete failure"); 
  11.         }                 
  12.     } catch (Exception e) { 
  13.         log.warn("listenerDeleteTaskEvent process error", e); 
  14.     } 
  15.  

小結(jié): listenerSyncTaskEvent和listenerDeleteTaskEvent代碼結(jié)構(gòu)一致,執(zhí)行任務(wù)邏輯,執(zhí)行完緩存已完成任務(wù),最后記錄耗時(shí)情況。

五、Nacos集群之間同步邏輯

先看下Nacos集群之間的同步,代碼在NacosSyncToNacosServiceImpl#sync部分。

執(zhí)行同步邏輯

  1. @Override 
  2. public boolean sync(TaskDO taskDO) { 
  3.   String taskId = taskDO.getTaskId(); 
  4.   try { 
  5.     // 注解@7 
  6.     NamingService sourceNamingService = 
  7.       nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace()); 
  8.  
  9.     // 注解@8 
  10.     NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace()); 
  11.  
  12.  
  13.     this.listenerMap.putIfAbsent(taskId, event -> { 
  14.       if (event instanceof NamingEvent) { 
  15.         try { 
  16.           // 注解@9 
  17.           List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), 
  18.                                                                                getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true); 
  19.  
  20.           // 注解@10 
  21.           this.removeInvalidInstance(taskDO, destNamingService, sourceInstances); 
  22.  
  23.           // 注解@11 
  24.           if (sourceInstances.isEmpty()) { 
  25.             sourceInstanceSnapshot.remove(taskId); 
  26.             return
  27.           } 
  28.  
  29.           // 注解@12 
  30.           this.syncNewInstance(taskDO, destNamingService, sourceInstances); 
  31.         } catch (Exception e) { 
  32.           log.error("event process fail, taskId:{}", taskId, e); 
  33.           metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); 
  34.         } 
  35.       } 
  36.     }); 
  37.  
  38.     sourceNamingService.subscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), 
  39.                                   listenerMap.get(taskId)); 
  40.   } catch (Exception e) { 
  41.     log.error("sync task from nacos to nacos was failed, taskId:{}", taskId, e); 
  42.     metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); 
  43.     return false
  44.   } 
  45.   return true

注解@7 創(chuàng)建源集群的NameService

注解@8 創(chuàng)建目標(biāo)集群的NameService

注解@9 獲取服務(wù)注冊(cè)的實(shí)例

注解@10 先刪除已失效的節(jié)點(diǎn)

  1. private void removeInvalidInstance(TaskDO taskDO, NamingService destNamingService, 
  2.     List<Instance> sourceInstances) throws NacosException { 
  3.  
  4.     String taskId = taskDO.getTaskId(); 
  5.     if (this.sourceInstanceSnapshot.containsKey(taskId)) { 
  6.         // 注解@10.1 
  7.         Set<String> oldInstanceKeys = this.sourceInstanceSnapshot.get(taskId); 
  8.         List<String> newInstanceKeys = sourceInstances.stream().map(this::composeInstanceKey) 
  9.             .collect(Collectors.toList()); 
  10.         // 注解@10.2 
  11.         Collection<String> instanceKeys = Collections.subtract(oldInstanceKeys, newInstanceKeys); 
  12.         for (String instanceKey : instanceKeys) { 
  13.             log.info("任務(wù)Id:{},移除無效同步實(shí)例:{}", taskId, instanceKey); 
  14.             String[] split = instanceKey.split(":", -1); 
  15.             // 注解@10.3 
  16.             destNamingService 
  17.                 .deregisterInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), split[0], 
  18.                     Integer.parseInt(split[1])); 
  19.  
  20.         } 
  21.     } 

注解@10.1 緩存的舊節(jié)點(diǎn)信息

注解@10.2 從舊節(jié)點(diǎn)中剝離出廢棄無效的節(jié)點(diǎn)

注解@10.3 將廢棄無效節(jié)點(diǎn)注銷

注解@11 如果同步實(shí)例已經(jīng)為空代表該服務(wù)所有實(shí)例已經(jīng)下線,清除本地持有快照

注解@12 同步新節(jié)實(shí)例到目標(biāo)集群并更新緩存

  1. private void syncNewInstance(TaskDO taskDO, NamingService destNamingService, 
  2.     List<Instance> sourceInstances) throws NacosException { 
  3.     Set<String> latestSyncInstance = new TreeSet<>(); 
  4.     // 再次添加新實(shí)例 
  5.     String taskId = taskDO.getTaskId(); 
  6.     // 注解@12.1 
  7.     Set<String> instanceKeys = sourceInstanceSnapshot.get(taskId); 
  8.     // 注解@12.2 
  9.     for (Instance instance : sourceInstances) { 
  10.         if (needSync(instance.getMetadata())) { 
  11.             String instanceKey = composeInstanceKey(instance); 
  12.             // 注解@12.3 
  13.             if (CollectionUtils.isEmpty(instanceKeys) || !instanceKeys.contains(instanceKey)) { 
  14.                 destNamingService.registerInstance(taskDO.getServiceName(), 
  15.                     getGroupNameOrDefault(taskDO.getGroupName()), 
  16.                     buildSyncInstance(instance, taskDO)); 
  17.             } 
  18.             // 注解@12.4 
  19.             latestSyncInstance.add(instanceKey); 
  20.         } 
  21.     } 
  22.     if (CollectionUtils.isNotEmpty(latestSyncInstance)) { 
  23.  
  24.         log.info("任務(wù)Id:{},已同步實(shí)例個(gè)數(shù):{}", taskId, latestSyncInstance.size()); 
  25.         // 注解@12.5 
  26.         sourceInstanceSnapshot.put(taskId, latestSyncInstance); 
  27.     } 

注解@12.1 緩存的舊節(jié)點(diǎn)信息

注解@12.2 遍歷新節(jié)點(diǎn)信息

注解@12.3 當(dāng)新節(jié)點(diǎn)信息不為空并且舊節(jié)點(diǎn)不存在,則注冊(cè)到目標(biāo)集群

注解@12.4 收集新節(jié)點(diǎn)

注解@12.5 更新緩存節(jié)點(diǎn)信息

小結(jié): 在兩個(gè)Nacos集群之間進(jìn)行同步,同步任務(wù)在Service維度(AppId)建立。通過對(duì)源集群注冊(cè)監(jiān)聽獲取注冊(cè)節(jié)點(diǎn)列表,通過剔除無效節(jié)點(diǎn)后,將新的節(jié)點(diǎn)注冊(cè)到目標(biāo)集群的過程。

執(zhí)行刪除任務(wù)邏輯

代碼翻到NacosSyncToNacosServiceImpl#delete部分

  1. public boolean delete(TaskDO taskDO) { 
  2.     try { 
  3.         NamingService sourceNamingService = 
  4.             nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace()); 
  5.         NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace()); 
  6.         // 注解@13 
  7.         sourceNamingService 
  8.             .unsubscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), 
  9.                 listenerMap.remove(taskDO.getTaskId())); 
  10.         sourceInstanceSnapshot.remove(taskDO.getTaskId()); 
  11.  
  12.         // 注解@14 
  13.         List<Instance> sourceInstances = sourceNamingService 
  14.             .getAllInstances(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), 
  15.                 new ArrayList<>(), false); 
  16.         for (Instance instance : sourceInstances) { 
  17.             if (needSync(instance.getMetadata())) { 
  18.                // 注銷操作 
  19.                 destNamingService 
  20.                     .deregisterInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), 
  21.                         instance.getIp(), 
  22.                         instance.getPort()); 
  23.             } 
  24.         } 
  25.     } catch (Exception e) { 
  26.         log.error("delete task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e); 
  27.         metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR); 
  28.         return false
  29.     } 
  30.     return true

注解@13 移除該任務(wù)(service)源集群訂閱

注解@14 刪除目標(biāo)集群中同步的實(shí)例列表

小結(jié): 刪除邏輯比較簡(jiǎn)單,取消源集群訂閱,將目標(biāo)集群的注冊(cè)節(jié)點(diǎn)移除。

六、從zk集群同步到Nacos集群

再看從zk集群同步到Nacos集群,代碼翻到ZookeeperSyncToNacosServiceImpl#sync()

  1. @Override 
  2. public boolean sync(TaskDO taskDO) { 
  3.     try { 
  4.         if (treeCacheMap.containsKey(taskDO.getTaskId())) { 
  5.             return true
  6.         } 
  7.         // 注解@1 
  8.         TreeCache treeCache = getTreeCache(taskDO); 
  9.         // 注解@2 
  10.         NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null); 
  11.         // 注解@3 
  12.         registerAllInstances(taskDO, destNamingService); 
  13.         // 注解@4 
  14.         Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> { 
  15.             try { 
  16.                 String path = event.getData().getPath(); 
  17.                 Map<String, String> queryParam = parseQueryString(path); 
  18.                 if (isMatch(taskDO, queryParam) && needSync(queryParam)) { 
  19.                     processEvent(taskDO, destNamingService, event, path, queryParam); 
  20.                 } 
  21.             } catch (Exception e) { 
  22.                 // ... 
  23.             } 
  24.         }); 
  25.     } catch (Exception e) { 
  26.         // ... 
  27.         metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); 
  28.         return false
  29.     } 
  30.     return true

注解@1 監(jiān)聽zk源集群 路徑為「/dubbo」

注解@2 目標(biāo)Nacos集群構(gòu)建

注解@3 初次執(zhí)行任務(wù)統(tǒng)一注冊(cè)所有實(shí)例

  1. private void registerAllInstances(TaskDO taskDO, NamingService destNamingService) throws Exception { 
  2.     CuratorFramework zk =  zookeeperServerHolder.get(taskDO.getSourceClusterId(), ""); 
  3.     // 注解@3.1 
  4.     if(!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) { 
  5.         registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName()); 
  6.     } else { 
  7.         // 注解@3.2 
  8.         List<String> serviceList = zk.getChildren().forPath(DUBBO_ROOT_PATH); 
  9.         for(String serviceName : serviceList) { 
  10.             registerALLInstances0(taskDO, destNamingService, zk, serviceName); 
  11.         } 
  12.     } 

注解@3.1 同步特定服務(wù)注冊(cè)節(jié)點(diǎn)(Dubbo)

注解@3.2 同步全部所有的zk節(jié)點(diǎn)到Nacos

注解@4 注冊(cè)zk監(jiān)聽監(jiān)聽新增和更新的同步

  1. private void processEvent(TaskDO taskDO, NamingService destNamingService, TreeCacheEvent event, String path, 
  2.                           Map<String, String> queryParam) throws NacosException { 
  3.     if(!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) { 
  4.         return
  5.     } 
  6.  
  7.     Map<String, String> ipAndPortParam = parseIpAndPortString(path); 
  8.     Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO); 
  9.     String serviceName = queryParam.get(INTERFACE_KEY); 
  10.     switch (event.getType()) { 
  11.         case NODE_ADDED: 
  12.         case NODE_UPDATED: 
  13.             // 注解@4.1 
  14.             destNamingService.registerInstance( 
  15.                 getServiceNameFromCache(serviceName, queryParam), instance); 
  16.             break; 
  17.         case NODE_REMOVED: 
  18.             // 注解@4.2 
  19.             destNamingService.deregisterInstance( 
  20.                 getServiceNameFromCache(serviceName, queryParam), 
  21.                 ipAndPortParam.get(INSTANCE_IP_KEY), 
  22.                 Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY))); 
  23.             nacosServiceNameMap.remove(serviceName); 
  24.             break; 
  25.         default
  26.             break; 
  27.     } 

注解@4.1 同步節(jié)點(diǎn)新增更新到目標(biāo)集群

注解@4.2 源集群節(jié)點(diǎn)被刪除同步注銷目標(biāo)集群

小結(jié): NacosSync從zk集群同步到Nacos只支持dubbo路徑,可參考基于二次改造。第一次先同步所有節(jié)點(diǎn)過去,再監(jiān)聽源集群路徑變化,同步到目標(biāo)集群。

本文轉(zhuǎn)載自微信公眾號(hào)「瓜農(nóng)老梁」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系瓜農(nóng)老梁公眾號(hào)。

 

責(zé)任編輯:武曉燕 來源: 瓜農(nóng)老梁
相關(guān)推薦

2021-08-30 07:49:32

NacosSync雙向復(fù)制

2010-05-24 15:21:37

MySQL雙向

2021-01-22 09:47:22

鴻蒙HarmonyOS應(yīng)用開發(fā)

2009-03-16 13:44:29

雙向復(fù)制實(shí)例MySQL

2021-04-30 15:06:34

鴻蒙HarmonyOS應(yīng)用

2023-10-17 17:13:14

內(nèi)存程序源碼

2011-03-15 11:33:18

iptables

2014-08-26 11:11:57

AsyncHttpCl源碼分析

2024-11-04 06:00:00

redis雙向鏈表

2011-05-26 10:05:48

MongoDB

2021-11-11 17:40:08

WatchdogAndroid源碼分析

2011-05-26 16:18:51

Mongodb

2020-11-24 09:04:55

PriorityBlo

2021-11-26 17:17:43

Android廣播運(yùn)行原理源碼分析

2010-02-06 13:28:31

Android源碼

2023-02-26 08:42:10

源碼demouseEffect

2020-11-20 06:22:02

LinkedBlock

2021-07-12 08:00:21

Nacos 服務(wù)注冊(cè)源碼分析

2015-07-27 14:57:32

OpenFlow協(xié)議Ryu

2021-09-08 06:51:53

CountDownLa閉鎖源碼
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)