NacosSync雙向復(fù)制源碼分析
引言
通過開源同步工具NacosSync的分析,對(duì)我們實(shí)現(xiàn)自定義的同步工具提供參考。文本就同步任務(wù)分發(fā)與Nacos集群之間、從zk到Nacos的同步源碼做個(gè)分析。
一、內(nèi)容提要
任務(wù)和配置入庫(kù)
- 集群配置入庫(kù)
- 同步任務(wù)入庫(kù)
同步任務(wù)分發(fā)
- 每三秒調(diào)度一次任務(wù)列表
- 新增任務(wù)發(fā)布同步任務(wù)事件SyncTaskEvent并由listenerSyncTaskEvent處理
- 刪除任務(wù)發(fā)布刪除任務(wù)事件DeleteTaskEvent并由listenerDeleteTaskEvent處理
- 任務(wù)的發(fā)布和訂閱使用Guava的EventBus
Nacos集群之間同步邏輯
- 兩個(gè)Nacos集群之間進(jìn)行同步,同步任務(wù)在Service維度(AppId)建立
- 對(duì)源集群注冊(cè)監(jiān)聽獲取注冊(cè)節(jié)點(diǎn)列表,通過剔除無效節(jié)點(diǎn)后,將新的節(jié)點(diǎn)注冊(cè)到目標(biāo)集群
從zk集群同步到Nacos集群
- NacosSync從zk集群同步到Nacos只支持dubbo路徑
- 第一次先同步所有節(jié)點(diǎn)過去,再監(jiān)聽源集群路徑變化,同步到目標(biāo)集群
二、任務(wù)和配置入庫(kù)
入庫(kù)部分比較簡(jiǎn)單,只列出入口和處理類。
集群配置入庫(kù)
請(qǐng)求入口:ClusterApi#clusterAdd
入庫(kù)處理:ClusterAddProcessor#process
- clusterAccessService.insert(clusterDO);
同步任務(wù)入庫(kù)
請(qǐng)求入口:TaskApi#taskAdd
入庫(kù)處理:TaskAccessService#addTask
- taskAccessService.addTask(taskDO);
三、同步任務(wù)分發(fā)
同步任務(wù)入庫(kù)了,緊著需要任務(wù)進(jìn)行分發(fā)。代碼翻到QuerySyncTaskTimer實(shí)現(xiàn)了springboot的CommandLineRunner接口。
定時(shí)任務(wù)調(diào)度
- public void run(String... args) {
- scheduledExecutorService.scheduleWithFixedDelay(new CheckRunningStatusThread(), 0, 3000,
- TimeUnit.MILLISECONDS);
- }
備注: 定時(shí)任務(wù)每3秒鐘調(diào)度一次。
調(diào)度任務(wù)執(zhí)行
- private class CheckRunningStatusThread implements Runnable {
- @Override
- public void run() {
- Long start = System.currentTimeMillis();
- try {
- // 注解@1
- Iterable<TaskDO> taskDOS = taskAccessService.findAll();
- taskDOS.forEach(taskDO -> {
- // 注解@2
- if ((null != skyWalkerCacheServices.getFinishedTask(taskDO))) {
- return;
- }
- // 注解@3
- if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) {
- eventBus.post(new SyncTaskEvent(taskDO));
- log.info("從數(shù)據(jù)庫(kù)中查詢到一個(gè)同步任務(wù),發(fā)出一個(gè)同步事件:" + taskDO);
- }
- // 注解@4
- if (TaskStatusEnum.DELETE.getCode().equals(taskDO.getTaskStatus())) {
- eventBus.post(new DeleteTaskEvent(taskDO));
- log.info("從數(shù)據(jù)庫(kù)中查詢到一個(gè)刪除任務(wù),發(fā)出一個(gè)同步事件:" + taskDO);
- }
- });
- } catch (Exception e) {
- log.warn("CheckRunningStatusThread Exception", e);
- }
- // 注解@5
- metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK, System.currentTimeMillis() - start);
- }
- }
注解@1 查詢所有同步任務(wù)
注解@2 過濾已完成的任務(wù)
注解@3 發(fā)布一個(gè)同步任務(wù)事件SyncTaskEvent
注解@4 發(fā)布一個(gè)刪除任務(wù)事件DeleteTaskEvent
注解@5 通過metric統(tǒng)計(jì)本次調(diào)度任務(wù)執(zhí)行的耗時(shí)情況
小結(jié): 當(dāng)有新增任務(wù)或者刪除任務(wù)時(shí)通過Guava的EventBus發(fā)布一個(gè)同步事件或刪除事件,該檢測(cè)3秒執(zhí)行一次。
四、同步事件處理
代碼EventListener#listenerSyncTaskEvent訂閱了同步事件SyncTaskEvent。
- @Subscribe
- public void listenerSyncTaskEvent(SyncTaskEvent syncTaskEvent) {
- try {
- long start = System.currentTimeMillis();
- // 注解@6
- if (syncManagerService.sync(syncTaskEvent.getTaskDO())) {
- // 注解@7
- skyWalkerCacheServices.addFinishedTask(syncTaskEvent.getTaskDO());
- // 注解@8
- metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - start);
- } else {
- log.warn("listenerSyncTaskEvent sync failure");
- }
- } catch (Exception e) {
- log.warn("listenerSyncTaskEvent process error", e);
- }
- }
注解@6 執(zhí)行同步任務(wù)
注解@7 標(biāo)記該同步任務(wù)完成
注解@8 記錄任務(wù)執(zhí)行時(shí)間
代碼EventListener#listenerDeleteTaskEvent訂閱了刪除任務(wù)事件DeleteTaskEvent。
- @Subscribe
- public void listenerDeleteTaskEvent(DeleteTaskEvent deleteTaskEvent) {
- try {
- long start = System.currentTimeMillis();
- if (syncManagerService.delete(deleteTaskEvent.getTaskDO())) {
- skyWalkerCacheServices.addFinishedTask(deleteTaskEvent.getTaskDO());
- metricsManager.record(MetricsStatisticsType.DELETE_TASK_RT, System.currentTimeMillis() - start);
- } else {
- log.warn("listenerDeleteTaskEvent delete failure");
- }
- } catch (Exception e) {
- log.warn("listenerDeleteTaskEvent process error", e);
- }
- }
小結(jié): listenerSyncTaskEvent和listenerDeleteTaskEvent代碼結(jié)構(gòu)一致,執(zhí)行任務(wù)邏輯,執(zhí)行完緩存已完成任務(wù),最后記錄耗時(shí)情況。
五、Nacos集群之間同步邏輯
先看下Nacos集群之間的同步,代碼在NacosSyncToNacosServiceImpl#sync部分。
執(zhí)行同步邏輯
- @Override
- public boolean sync(TaskDO taskDO) {
- String taskId = taskDO.getTaskId();
- try {
- // 注解@7
- NamingService sourceNamingService =
- nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
- // 注解@8
- NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
- this.listenerMap.putIfAbsent(taskId, event -> {
- if (event instanceof NamingEvent) {
- try {
- // 注解@9
- List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(),
- getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true);
- // 注解@10
- this.removeInvalidInstance(taskDO, destNamingService, sourceInstances);
- // 注解@11
- if (sourceInstances.isEmpty()) {
- sourceInstanceSnapshot.remove(taskId);
- return;
- }
- // 注解@12
- this.syncNewInstance(taskDO, destNamingService, sourceInstances);
- } catch (Exception e) {
- log.error("event process fail, taskId:{}", taskId, e);
- metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
- }
- }
- });
- sourceNamingService.subscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
- listenerMap.get(taskId));
- } catch (Exception e) {
- log.error("sync task from nacos to nacos was failed, taskId:{}", taskId, e);
- metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
- return false;
- }
- return true;
- }
注解@7 創(chuàng)建源集群的NameService
注解@8 創(chuàng)建目標(biāo)集群的NameService
注解@9 獲取服務(wù)注冊(cè)的實(shí)例
注解@10 先刪除已失效的節(jié)點(diǎn)
- private void removeInvalidInstance(TaskDO taskDO, NamingService destNamingService,
- List<Instance> sourceInstances) throws NacosException {
- String taskId = taskDO.getTaskId();
- if (this.sourceInstanceSnapshot.containsKey(taskId)) {
- // 注解@10.1
- Set<String> oldInstanceKeys = this.sourceInstanceSnapshot.get(taskId);
- List<String> newInstanceKeys = sourceInstances.stream().map(this::composeInstanceKey)
- .collect(Collectors.toList());
- // 注解@10.2
- Collection<String> instanceKeys = Collections.subtract(oldInstanceKeys, newInstanceKeys);
- for (String instanceKey : instanceKeys) {
- log.info("任務(wù)Id:{},移除無效同步實(shí)例:{}", taskId, instanceKey);
- String[] split = instanceKey.split(":", -1);
- // 注解@10.3
- destNamingService
- .deregisterInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()), split[0],
- Integer.parseInt(split[1]));
- }
- }
- }
注解@10.1 緩存的舊節(jié)點(diǎn)信息
注解@10.2 從舊節(jié)點(diǎn)中剝離出廢棄無效的節(jié)點(diǎn)
注解@10.3 將廢棄無效節(jié)點(diǎn)注銷
注解@11 如果同步實(shí)例已經(jīng)為空代表該服務(wù)所有實(shí)例已經(jīng)下線,清除本地持有快照
注解@12 同步新節(jié)實(shí)例到目標(biāo)集群并更新緩存
- private void syncNewInstance(TaskDO taskDO, NamingService destNamingService,
- List<Instance> sourceInstances) throws NacosException {
- Set<String> latestSyncInstance = new TreeSet<>();
- // 再次添加新實(shí)例
- String taskId = taskDO.getTaskId();
- // 注解@12.1
- Set<String> instanceKeys = sourceInstanceSnapshot.get(taskId);
- // 注解@12.2
- for (Instance instance : sourceInstances) {
- if (needSync(instance.getMetadata())) {
- String instanceKey = composeInstanceKey(instance);
- // 注解@12.3
- if (CollectionUtils.isEmpty(instanceKeys) || !instanceKeys.contains(instanceKey)) {
- destNamingService.registerInstance(taskDO.getServiceName(),
- getGroupNameOrDefault(taskDO.getGroupName()),
- buildSyncInstance(instance, taskDO));
- }
- // 注解@12.4
- latestSyncInstance.add(instanceKey);
- }
- }
- if (CollectionUtils.isNotEmpty(latestSyncInstance)) {
- log.info("任務(wù)Id:{},已同步實(shí)例個(gè)數(shù):{}", taskId, latestSyncInstance.size());
- // 注解@12.5
- sourceInstanceSnapshot.put(taskId, latestSyncInstance);
- }
- }
注解@12.1 緩存的舊節(jié)點(diǎn)信息
注解@12.2 遍歷新節(jié)點(diǎn)信息
注解@12.3 當(dāng)新節(jié)點(diǎn)信息不為空并且舊節(jié)點(diǎn)不存在,則注冊(cè)到目標(biāo)集群
注解@12.4 收集新節(jié)點(diǎn)
注解@12.5 更新緩存節(jié)點(diǎn)信息
小結(jié): 在兩個(gè)Nacos集群之間進(jìn)行同步,同步任務(wù)在Service維度(AppId)建立。通過對(duì)源集群注冊(cè)監(jiān)聽獲取注冊(cè)節(jié)點(diǎn)列表,通過剔除無效節(jié)點(diǎn)后,將新的節(jié)點(diǎn)注冊(cè)到目標(biāo)集群的過程。
執(zhí)行刪除任務(wù)邏輯
代碼翻到NacosSyncToNacosServiceImpl#delete部分
- public boolean delete(TaskDO taskDO) {
- try {
- NamingService sourceNamingService =
- nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
- NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
- // 注解@13
- sourceNamingService
- .unsubscribe(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
- listenerMap.remove(taskDO.getTaskId()));
- sourceInstanceSnapshot.remove(taskDO.getTaskId());
- // 注解@14
- List<Instance> sourceInstances = sourceNamingService
- .getAllInstances(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
- new ArrayList<>(), false);
- for (Instance instance : sourceInstances) {
- if (needSync(instance.getMetadata())) {
- // 注銷操作
- destNamingService
- .deregisterInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
- instance.getIp(),
- instance.getPort());
- }
- }
- } catch (Exception e) {
- log.error("delete task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
- metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
- return false;
- }
- return true;
- }
注解@13 移除該任務(wù)(service)源集群訂閱
注解@14 刪除目標(biāo)集群中同步的實(shí)例列表
小結(jié): 刪除邏輯比較簡(jiǎn)單,取消源集群訂閱,將目標(biāo)集群的注冊(cè)節(jié)點(diǎn)移除。
六、從zk集群同步到Nacos集群
再看從zk集群同步到Nacos集群,代碼翻到ZookeeperSyncToNacosServiceImpl#sync()
- @Override
- public boolean sync(TaskDO taskDO) {
- try {
- if (treeCacheMap.containsKey(taskDO.getTaskId())) {
- return true;
- }
- // 注解@1
- TreeCache treeCache = getTreeCache(taskDO);
- // 注解@2
- NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
- // 注解@3
- registerAllInstances(taskDO, destNamingService);
- // 注解@4
- Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> {
- try {
- String path = event.getData().getPath();
- Map<String, String> queryParam = parseQueryString(path);
- if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
- processEvent(taskDO, destNamingService, event, path, queryParam);
- }
- } catch (Exception e) {
- // ...
- }
- });
- } catch (Exception e) {
- // ...
- metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
- return false;
- }
- return true;
- }
注解@1 監(jiān)聽zk源集群 路徑為「/dubbo」
注解@2 目標(biāo)Nacos集群構(gòu)建
注解@3 初次執(zhí)行任務(wù)統(tǒng)一注冊(cè)所有實(shí)例
- private void registerAllInstances(TaskDO taskDO, NamingService destNamingService) throws Exception {
- CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId(), "");
- // 注解@3.1
- if(!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
- registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName());
- } else {
- // 注解@3.2
- List<String> serviceList = zk.getChildren().forPath(DUBBO_ROOT_PATH);
- for(String serviceName : serviceList) {
- registerALLInstances0(taskDO, destNamingService, zk, serviceName);
- }
- }
- }
注解@3.1 同步特定服務(wù)注冊(cè)節(jié)點(diǎn)(Dubbo)
注解@3.2 同步全部所有的zk節(jié)點(diǎn)到Nacos
注解@4 注冊(cè)zk監(jiān)聽監(jiān)聽新增和更新的同步
- private void processEvent(TaskDO taskDO, NamingService destNamingService, TreeCacheEvent event, String path,
- Map<String, String> queryParam) throws NacosException {
- if(!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
- return;
- }
- Map<String, String> ipAndPortParam = parseIpAndPortString(path);
- Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
- String serviceName = queryParam.get(INTERFACE_KEY);
- switch (event.getType()) {
- case NODE_ADDED:
- case NODE_UPDATED:
- // 注解@4.1
- destNamingService.registerInstance(
- getServiceNameFromCache(serviceName, queryParam), instance);
- break;
- case NODE_REMOVED:
- // 注解@4.2
- destNamingService.deregisterInstance(
- getServiceNameFromCache(serviceName, queryParam),
- ipAndPortParam.get(INSTANCE_IP_KEY),
- Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
- nacosServiceNameMap.remove(serviceName);
- break;
- default:
- break;
- }
- }
注解@4.1 同步節(jié)點(diǎn)新增更新到目標(biāo)集群
注解@4.2 源集群節(jié)點(diǎn)被刪除同步注銷目標(biāo)集群
小結(jié): NacosSync從zk集群同步到Nacos只支持dubbo路徑,可參考基于二次改造。第一次先同步所有節(jié)點(diǎn)過去,再監(jiān)聽源集群路徑變化,同步到目標(biāo)集群。
本文轉(zhuǎn)載自微信公眾號(hào)「瓜農(nóng)老梁」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系瓜農(nóng)老梁公眾號(hào)。