自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

集群中節(jié)點(diǎn)之間健康檢查

開(kāi)發(fā) 前端
當(dāng)新的節(jié)點(diǎn)加入集群或者集群中有節(jié)點(diǎn)下線了,集群之間可以通過(guò)健康檢查發(fā)現(xiàn)。健康檢查的頻率是怎么樣的?節(jié)點(diǎn)的狀態(tài)又是如何變動(dòng)的?狀態(tài)的變動(dòng)又會(huì)觸發(fā)什么動(dòng)作。帶著這些問(wèn)題本文捋一捋。

[[411449]]

引言

當(dāng)新的節(jié)點(diǎn)加入集群或者集群中有節(jié)點(diǎn)下線了,集群之間可以通過(guò)健康檢查發(fā)現(xiàn)。健康檢查的頻率是怎么樣的?節(jié)點(diǎn)的狀態(tài)又是如何變動(dòng)的?狀態(tài)的變動(dòng)又會(huì)觸發(fā)什么動(dòng)作。帶著這些問(wèn)題本文捋一捋。

一、內(nèi)容提要

內(nèi)容提要

健康檢查

  • Nacos節(jié)點(diǎn)會(huì)向集群其他節(jié)點(diǎn)發(fā)送健康檢查心跳,每一輪頻率為2秒
  • 當(dāng)健康檢查異常時(shí)設(shè)置為不信任「SUSPICIOUS」?fàn)顟B(tài),超過(guò)失敗最大次數(shù)3次設(shè)置為下線「DOWN」?fàn)顟B(tài)
  • 健康檢查成功設(shè)置該節(jié)點(diǎn)為科通信「UP」?fàn)顟B(tài)
  • 無(wú)論成功還是失敗當(dāng)節(jié)點(diǎn)狀態(tài)變更時(shí)均發(fā)布MembersChangeEvent事件

成員變更事件

當(dāng)集群節(jié)點(diǎn)成員變更時(shí),MemberChangeListener會(huì)收到該事件

例如回調(diào)ClusterRpcClientProxy#onEvent觸發(fā)refresh

刷新本節(jié)點(diǎn)與集群中其他節(jié)點(diǎn)的RPC狀態(tài),關(guān)閉無(wú)效的或者增加新的RPC連接

二、健康檢查

代碼翻到ServerMemberManager#onApplicationEvent,在Nacos啟動(dòng)的時(shí)候會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),第一次延遲5秒執(zhí)行,該定時(shí)任務(wù)即負(fù)責(zé)節(jié)點(diǎn)之間的心跳。

  1. @Override 
  2. public void onApplicationEvent(WebServerInitializedEvent event) { 
  3.     getSelf().setState(NodeState.UP); 
  4.     if (!EnvUtil.getStandaloneMode()) { // 注解@1 
  5.         GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L); 
  6.     } 
  7.     EnvUtil.setPort(event.getWebServer().getPort()); 
  8.     EnvUtil.setLocalAddress(this.localAddress); 
  9.     Loggers.CLUSTER.info("This node is ready to provide external services"); 

注解@1 非單機(jī)模式延遲5秒執(zhí)行,執(zhí)行的infoReportTask為MemberInfoReportTask。

  1. public abstract class Task implements Runnable { 
  2.      
  3.     protected volatile boolean shutdown = false
  4.      
  5.     @Override 
  6.     public void run() { // 注解@2 
  7.         if (shutdown) { 
  8.             return
  9.         } 
  10.         try { 
  11.             executeBody(); 
  12.         } catch (Throwable t) { 
  13.             Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t)); 
  14.         } finally { 
  15.             if (!shutdown) { 
  16.                 after(); 
  17.             } 
  18.         } 
  19.     } 
  20.    

注解@2 看下這個(gè)Task執(zhí)行邏輯,先執(zhí)行 executeBody(),執(zhí)行結(jié)束后執(zhí)行after()。

  1. class MemberInfoReportTask extends Task { 
  2.  
  3.     private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() { 
  4.     }; 
  5.  
  6.     private int cursor = 0; 
  7.  
  8.     @Override 
  9.     protected void executeBody() { 
  10.         // ----------注解@1 start--------------- 
  11.        // 獲取集群中除了自身以外的其他節(jié)點(diǎn)列表 
  12.         List<Member> members = ServerMemberManager.this.allMembersWithoutSelf(); 
  13.         if (members.isEmpty()) { 
  14.             return
  15.         } 
  16.         // 定義一個(gè)游標(biāo) 
  17.         this.cursor = (this.cursor + 1) % members.size(); 
  18.         // 獲取每個(gè)節(jié)信息 
  19.         Member target = members.get(cursor); 
  20.     //-----------注解@1 end----------------- 
  21.         Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress()); 
  22.         // 注解@2 
  23.         final String url = HttpUtils 
  24.                 .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT, 
  25.                         "/cluster/report"); 
  26.         try { 
  27.            // 注解@3 
  28.             asyncRestTemplate 
  29.                     .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version), 
  30.                             Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {  
  31.                                 @Override 
  32.                                 public void onReceive(RestResult<String> result) { // 注解@4 
  33.                                     // 注解@5 返回版本不一致 
  34.                                     if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value() 
  35.                                             || result.getCode() == HttpStatus.NOT_FOUND.value()) { 
  36.                                         // ... 
  37.                                         Member memberNew = target.copy(); 
  38.                                         if (memberNew.getAbilities() != null 
  39.                                                 && memberNew.getAbilities().getRemoteAbility() != null && memberNew 
  40.                                                 .getAbilities().getRemoteAbility().isSupportRemoteConnection()) { 
  41.                                             memberNew.getAbilities().getRemoteAbility() 
  42.                                                     .setSupportRemoteConnection(false); 
  43.                                             update(memberNew); // 更新節(jié)點(diǎn)屬性 
  44.                                         } 
  45.                                         return
  46.                                     } 
  47.                                     // 注解@6 
  48.                                     if (result.ok()) { 
  49.                                         MemberUtil.onSuccess(ServerMemberManager.this, target); 
  50.                                     } else { 
  51.                                      // 注解@7 處理失敗上報(bào) 
  52.                                         MemberUtil.onFail(ServerMemberManager.this, target); 
  53.                                     } 
  54.                                 } 
  55.  
  56.                                 @Override 
  57.                                 public void onError(Throwable throwable) { 
  58.                                    // 注解@8 處理失敗上報(bào) 
  59.                                      MemberUtil.onFail(ServerMemberManager.this, target, throwable); 
  60.                                 } 
  61.  
  62.                                 @Override 
  63.                                 public void onCancel() { 
  64.  
  65.                                 } 
  66.                             }); 
  67.         } catch (Throwable ex) { 
  68.             // ... 
  69.         } 
  70.     } 
  71.  
  72.     @Override 
  73.     protected void after() { 
  74.         GlobalExecutor.scheduleByCommon(this, 2_000L); // 注解@9 
  75.     } 

注解@1 獲取集群中除了自身以外的其他節(jié)點(diǎn)列表,通過(guò)游標(biāo)循環(huán)每個(gè)節(jié)點(diǎn)。

注解@2 構(gòu)造每個(gè)節(jié)點(diǎn)的上報(bào)url請(qǐng)求路徑為「/cluster/report」

注解@3 發(fā)起Post健康檢查請(qǐng)求,請(qǐng)求內(nèi)容為自身信息Member

注解@4 處理健康檢查返回結(jié)果,有以下三種類型

注解@5 版本過(guò)低錯(cuò)誤,這個(gè)可能在集群中版本不一致出現(xiàn)

注解@6 處理成功上報(bào),更新該節(jié)點(diǎn)member的狀態(tài)為UP表示科通信,設(shè)置失敗次數(shù)為0,并發(fā)布成員變更事件

  1. public static void onSuccess(final ServerMemberManager manager, final Member member) { 
  2.     final NodeState old = member.getState(); 
  3.     manager.getMemberAddressInfos().add(member.getAddress()); 
  4.     member.setState(NodeState.UP); // 狀態(tài)為UP可通信狀態(tài) 
  5.     member.setFailAccessCnt(0); // 失敗次數(shù)為0 
  6.     if (!Objects.equals(old, member.getState())) { 
  7.         manager.notifyMemberChange(); // 發(fā)布成員變更事件 
  8.     } 

注解@7&注解@8 均為處理失敗的上報(bào),例如:集群中一個(gè)節(jié)點(diǎn)被kill -9 殺掉后。在nacos-cluster.log日志文件中會(huì)打印如下日志,并發(fā)布成員變更事件

  1. 2021-07-0x 16:30:24,994 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused; 
  2.  
  3. :2021-07-0x 16:30:30,995 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused; 
  1. public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) { 
  2.     manager.getMemberAddressInfos().remove(member.getAddress()); 
  3.     final NodeState old = member.getState(); 
  4.  
  5.     // 設(shè)置該節(jié)點(diǎn)為「不信任」 
  6.     member.setState(NodeState.SUSPICIOUS); 
  7.     // 失敗次數(shù)遞增+1 
  8.     member.setFailAccessCnt(member.getFailAccessCnt() + 1); 
  9.     // 默認(rèn)最大失敗重試次數(shù)為3 
  10.     int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt"Integer.class, 3); 
  11.  
  12.     // If the number of consecutive failures to access the target node reaches 
  13.     // a maximum, or the link request is rejected, the state is directly down 
  14.     // 超過(guò)重試次數(shù)設(shè)置節(jié)點(diǎn)狀態(tài)為「下線」 
  15.     if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils 
  16.             .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) { 
  17.         member.setState(NodeState.DOWN); 
  18.     } 
  19.  
  20.     if (!Objects.equals(old, member.getState())) { 
  21.         manager.notifyMemberChange(); // 發(fā)布成員變更事件 
  22.     } 

被kill -9 殺掉的節(jié)點(diǎn)顯示狀態(tài)為下線DOWN

注解@9 執(zhí)行完executeBody后延遲2秒繼續(xù)執(zhí)行executeBody,也就是檢查健康檢查的心跳頻率為2秒,一輪全部節(jié)點(diǎn)檢查結(jié)束后延遲2秒接著下一輪

無(wú)論檢查成功還是失敗,當(dāng)節(jié)點(diǎn)狀態(tài)變更時(shí),發(fā)布成員變更事件。

  1. if (!Objects.equals(old, member.getState())) { 
  2.     manager.notifyMemberChange(); 
  3.  
  4. void notifyMemberChange() { 
  5.     NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build()); 

小結(jié): Nacos節(jié)點(diǎn)會(huì)向集群其他節(jié)點(diǎn)發(fā)送健康檢查心跳,每一輪頻率為2秒;當(dāng)健康檢查異常時(shí)設(shè)置為不信任「SUSPICIOUS」?fàn)顟B(tài),超過(guò)失敗最大次數(shù)3次設(shè)置為下線「DOWN」?fàn)顟B(tài);健康檢查成功設(shè)置該節(jié)點(diǎn)為科通信「UP」?fàn)顟B(tài);無(wú)論成功還是失敗當(dāng)節(jié)點(diǎn)狀態(tài)變更時(shí)均發(fā)布MembersChangeEvent事件。

三、成員變更事件

當(dāng)集群中有節(jié)點(diǎn)下線或者新節(jié)點(diǎn)上線都會(huì)通過(guò)心跳健康檢查探測(cè)對(duì)節(jié)點(diǎn)狀態(tài)進(jìn)行改變。而狀態(tài)的變更均會(huì)觸發(fā)成員變更事件MembersChangeEvent。那訂閱到這個(gè)事件干啥呢?

ClusterRpcClientProxy繼承了MemberChangeListener,當(dāng)有MembersChangeEvent事件時(shí)會(huì)回調(diào)其onEvent方法。

  1. @Override 
  2. public void onEvent(MembersChangeEvent event) { 
  3.     try { 
  4.         List<Member> members = serverMemberManager.allMembersWithoutSelf(); 
  5.         refresh(members); 
  6.     } catch (NacosException e) { 
  7.         // ... 
  8.     } 

那接著看refresh方法。

  1. private void refresh(List<Member> members) throws NacosException { 
  2.  
  3.     for (Member member : members) { 
  4.         if (MemberUtil.isSupportedLongCon(member)) { 
  5.             // 注解@10 
  6.             createRpcClientAndStart(member, ConnectionType.GRPC); 
  7.         } 
  8.     } 
  9.     Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries(); 
  10.     Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator(); 
  11.     List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a)) 
  12.             .map(a -> memberClientKey(a)).collect(Collectors.toList()); 
  13.     // 關(guān)閉舊的grpc連接 
  14.     while (iterator.hasNext()) { 
  15.         Map.Entry<String, RpcClient> next1 = iterator.next(); 
  16.         if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) { 
  17.             Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey()); 
  18.             RpcClientFactory.getClient(next1.getKey()).shutdown(); 
  19.             iterator.remove(); 
  20.         } 
  21.     } 
  22.  

注解@10 為集群中每個(gè)節(jié)點(diǎn)member創(chuàng)建rcp client,在client啟動(dòng)時(shí)會(huì)先目標(biāo)節(jié)點(diǎn)發(fā)送HealthCheckRequest,如果非健康節(jié)點(diǎn)將會(huì)被移除。見(jiàn)RpcClient類部分代碼。

  1. boolean isHealthy = healthCheck(); 
  2. // 非健康節(jié)點(diǎn) 
  3. if (!isHealthy) { 
  4.     if (currentConnection == null) { 
  5.         continue
  6.     } 
  7.     LoggerUtils.printIfInfoEnabled(LOGGER, 
  8.             "[{}]Server healthy check fail,currentConnection={}"name
  9.             currentConnection.getConnectionId()); 
  10.     // 標(biāo)記客戶端狀態(tài)為unhealthy 
  11.     rpcClientStatus.set(RpcClientStatus.UNHEALTHY); 
  12.     // 重置ReconnectContext移除serverInfo 
  13.     reconnectContext = new ReconnectContext(nullfalse); 

這個(gè)意味著如果集群中有節(jié)點(diǎn)下線,與下線節(jié)點(diǎn)的rpc將會(huì)失效;同樣如果集群中有新節(jié)點(diǎn)加入將會(huì)建立新的rpc通道。

小結(jié): 當(dāng)集群節(jié)點(diǎn)成員變更時(shí),MemberChangeListener會(huì)收到該事件。例如回調(diào)ClusterRpcClientProxy#onEvent觸發(fā)refresh。刷新本節(jié)點(diǎn)與集群中其他節(jié)點(diǎn)的RPC狀態(tài),關(guān)閉無(wú)效的或者增加新的RPC連接。

本文轉(zhuǎn)載自微信公眾號(hào)「瓜農(nóng)老梁」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系瓜農(nóng)老梁公眾號(hào)。

 

 

責(zé)任編輯:武曉燕 來(lái)源: 瓜農(nóng)老梁
相關(guān)推薦

2023-03-02 07:20:10

GRPC服務(wù)健康檢查協(xié)議

2023-03-03 08:19:35

KubernetesgRPC

2023-03-01 08:33:37

gRPC健康檢查代碼

2017-08-25 10:20:46

Docker容器機(jī)制

2023-10-14 15:36:14

PodKubernetes

2024-02-27 17:30:11

2020-12-07 06:29:13

SpringBoot

2023-05-09 07:34:25

Docker健康檢查方式

2021-01-15 05:38:28

ASPHttp端口

2022-09-07 09:19:49

Docker健康檢查

2021-04-18 10:34:28

Spring Clou郵件釘釘

2021-09-18 16:10:48

Spring BootJava微服務(wù)

2022-02-28 07:40:23

Nacos注冊(cè)中心客戶端

2021-08-02 07:57:03

注冊(cè)Nacos源碼

2021-09-21 16:31:56

Windows 11微軟PC健康檢查工具

2021-07-05 06:51:41

Nacos微服務(wù)源碼

2015-07-17 10:25:43

kubernetesDocker集群系統(tǒng)

2024-10-31 15:16:35

2012-08-03 11:21:50

應(yīng)用交付深信服

2023-03-07 07:49:06

Kubernetes容器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)