Health Checks Between Cluster Nodes
Introduction
When a new node joins the cluster or an existing node goes offline, the other nodes find out through health checks. How often do the health checks run? How does a node's state change? What does a state change trigger? This article works through these questions.
1. Summary
Health check
- Each Nacos node sends health-check heartbeats to the other nodes in the cluster, one report every 2 seconds
- When a health check fails, the target node is marked as suspicious (SUSPICIOUS); once the failure count exceeds the maximum of 3, it is marked as offline (DOWN)
- When a health check succeeds, the target node is marked as reachable (UP)
- Whether the check succeeds or fails, a MembersChangeEvent is published whenever a node's state changes
Member change event
When the cluster membership changes, each MemberChangeListener receives the event
For example, the ClusterRpcClientProxy#onEvent callback triggers refresh
This refreshes the RPC state between the current node and the other nodes in the cluster, closing invalid RPC connections and adding new ones
2. Health check
Turn to ServerMemberManager#onApplicationEvent in the code. When Nacos starts, it starts a scheduled task whose first run is delayed by 5 seconds; this task is responsible for the heartbeats between nodes.
- @Override
- public void onApplicationEvent(WebServerInitializedEvent event) {
- getSelf().setState(NodeState.UP);
- if (!EnvUtil.getStandaloneMode()) { // Note @1
- GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
- }
- EnvUtil.setPort(event.getWebServer().getPort());
- EnvUtil.setLocalAddress(this.localAddress);
- Loggers.CLUSTER.info("This node is ready to provide external services");
- }
Note @1: In non-standalone mode the task runs after a 5-second delay. The infoReportTask that runs is a MemberInfoReportTask.
- public abstract class Task implements Runnable {
- protected volatile boolean shutdown = false;
- @Override
- public void run() { // Note @2
- if (shutdown) {
- return;
- }
- try {
- executeBody();
- } catch (Throwable t) {
- Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
- } finally {
- if (!shutdown) {
- after();
- }
- }
- }
- }
Note @2: Look at how this Task runs: it calls executeBody() first, and calls after() once executeBody() has finished.
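To make the ordering concrete, here is a minimal sketch of the same run/after pattern. SketchTask and TaskOrderDemo are hypothetical names rather than Nacos classes, and the error is printed to stderr instead of going through Loggers. The sketch suggests that after() still runs when executeBody() throws, which is what lets a task that reschedules itself in after() keep going.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the Task class above: run the body, then the follow-up hook. */
abstract class SketchTask implements Runnable {
    protected volatile boolean shutdown = false;

    @Override
    public void run() {
        if (shutdown) {
            return;
        }
        try {
            executeBody();
        } catch (Throwable t) {
            // A failing body is reported but does not prevent after() from running.
            System.err.println("task body failed: " + t);
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }

    protected abstract void executeBody();

    protected void after() {
    }
}

class TaskOrderDemo {
    /** Runs one task and returns the names of the hooks in the order they were called. */
    static List<String> runOnce(boolean bodyThrows) {
        List<String> calls = new ArrayList<>();
        SketchTask task = new SketchTask() {
            @Override
            protected void executeBody() {
                calls.add("executeBody");
                if (bodyThrows) {
                    throw new IllegalStateException("simulated failure");
                }
            }

            @Override
            protected void after() {
                calls.add("after");
            }
        };
        task.run();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(false));
        System.out.println(runOnce(true));
    }
}
```

In both calls the recorded order is executeBody followed by after, because the exception is caught before the finally block runs.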
- class MemberInfoReportTask extends Task {
- private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
- };
- private int cursor = 0;
- @Override
- protected void executeBody() {
- // ---------- Note @1 start ---------------
- // Get the list of cluster nodes other than the current one
- List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
- if (members.isEmpty()) {
- return;
- }
- // Advance the cursor
- this.cursor = (this.cursor + 1) % members.size();
- // Get the node the cursor points to
- Member target = members.get(cursor);
- // ----------- Note @1 end -----------------
- Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
- // Note @2
- final String url = HttpUtils
- .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
- "/cluster/report");
- try {
- // Note @3
- asyncRestTemplate
- .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
- Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
- @Override
- public void onReceive(RestResult<String> result) { // Note @4
- // Note @5: the response indicates a version mismatch
- if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
- || result.getCode() == HttpStatus.NOT_FOUND.value()) {
- // ...
- Member memberNew = target.copy();
- if (memberNew.getAbilities() != null
- && memberNew.getAbilities().getRemoteAbility() != null && memberNew
- .getAbilities().getRemoteAbility().isSupportRemoteConnection()) {
- memberNew.getAbilities().getRemoteAbility()
- .setSupportRemoteConnection(false);
- update(memberNew); // update the node's attributes
- }
- return;
- }
- // Note @6
- if (result.ok()) {
- MemberUtil.onSuccess(ServerMemberManager.this, target);
- } else {
- // Note @7: handle a failed report
- MemberUtil.onFail(ServerMemberManager.this, target);
- }
- }
- @Override
- public void onError(Throwable throwable) {
- // Note @8: handle a failed report
- MemberUtil.onFail(ServerMemberManager.this, target, throwable);
- }
- @Override
- public void onCancel() {
- }
- });
- } catch (Throwable ex) {
- // ...
- }
- }
- @Override
- protected void after() {
- GlobalExecutor.scheduleByCommon(this, 2_000L); // Note @9
- }
- }
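Note @1: Get the list of cluster nodes other than the current one and step through them with a cursor, one node per run.
Note @2: Build the report URL for the target node; the request path is "/cluster/report".
Note @3: Send the health-check POST request; the request body is the current node's own Member information.
Note @4: Handle the health-check response, which falls into the following three cases.
Note @5: Version-too-low error, which can occur when the nodes in the cluster run different versions.
Note @6: Handle a successful report: set the member's state to UP (reachable), reset its failure count to 0, and publish a member change event if the state changed.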
- public static void onSuccess(final ServerMemberManager manager, final Member member) {
- final NodeState old = member.getState();
- manager.getMemberAddressInfos().add(member.getAddress());
- member.setState(NodeState.UP); // state UP: the node is reachable
- member.setFailAccessCnt(0); // reset the failure count to 0
- if (!Objects.equals(old, member.getState())) {
- manager.notifyMemberChange(); // publish a member change event
- }
- }
Notes @7 and @8 both handle a failed report, for example after a node in the cluster has been killed with kill -9. The following lines are then printed to the nacos-cluster.log file, and a member change event is published.
- 2021-07-0x 16:30:24,994 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;
- 2021-07-0x 16:30:30,995 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;
- public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
- manager.getMemberAddressInfos().remove(member.getAddress());
- final NodeState old = member.getState();
- // Mark the node as suspicious
- member.setState(NodeState.SUSPICIOUS);
- // Increment the failure count by 1
- member.setFailAccessCnt(member.getFailAccessCnt() + 1);
- // The maximum failure count defaults to 3
- int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);
- // If the number of consecutive failures to access the target node reaches
- // a maximum, or the link request is rejected, the state is directly down
- // Once the maximum is exceeded, set the node's state to DOWN (offline)
- if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
- .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
- member.setState(NodeState.DOWN);
- }
- if (!Objects.equals(old, member.getState())) {
- manager.notifyMemberChange(); // publish a member change event
- }
- }
The node killed with kill -9 is shown with the state DOWN (offline).
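The transitions in onSuccess and onFail can be read as a small state machine. The following is a simplified sketch of that reading, not the Nacos implementation: MemberStateSketch is a hypothetical class, the limit of 3 is hard-coded instead of being read from nacos.core.member.fail-access-cnt, and the text "Connection refused" is assumed to be what TARGET_MEMBER_CONNECT_REFUSE_ERRMSG matches (it is the text that appears in the log lines above).

```java
/** Hypothetical, simplified model of the state transitions in onSuccess/onFail. */
class MemberStateSketch {
    enum NodeState { UP, SUSPICIOUS, DOWN }

    // Assumption: same value as the default of nacos.core.member.fail-access-cnt.
    static final int MAX_FAIL_ACCESS_CNT = 3;
    // Assumption: the message fragment that marks a refused connection.
    static final String CONNECT_REFUSED = "connection refused";

    NodeState state = NodeState.UP;
    int failAccessCnt = 0;

    /** Returns true when the state changed, i.e. when an event would be published. */
    boolean onSuccess() {
        NodeState old = state;
        state = NodeState.UP;
        failAccessCnt = 0;
        return old != state;
    }

    /** Returns true when the state changed, i.e. when an event would be published. */
    boolean onFail(String errorMessage) {
        NodeState old = state;
        state = NodeState.SUSPICIOUS;
        failAccessCnt++;
        boolean refused = errorMessage != null
                && errorMessage.toLowerCase().contains(CONNECT_REFUSED);
        // Strictly greater than the limit, or a refused connection, means DOWN.
        if (failAccessCnt > MAX_FAIL_ACCESS_CNT || refused) {
            state = NodeState.DOWN;
        }
        return old != state;
    }

    public static void main(String[] args) {
        MemberStateSketch slowPeer = new MemberStateSketch();
        for (int i = 1; i <= 4; i++) {
            boolean changed = slowPeer.onFail("Read timed out");
            System.out.println("failure " + i + " -> " + slowPeer.state + ", event=" + changed);
        }
        MemberStateSketch killedPeer = new MemberStateSketch();
        killedPeer.onFail("caused: Connection refused;");
        System.out.println("refused -> " + killedPeer.state);
    }
}
```

Under these assumptions, a peer that fails with a generic error stays SUSPICIOUS for three failures and turns DOWN on the fourth, because the comparison is strictly greater than the limit. A refused connection, as with the kill -9 case, goes to DOWN on the first failure.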
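Note @9: After executeBody finishes, the task is scheduled again with a 2-second delay, so the health-check heartbeat interval is 2 seconds. Each run reports to the single node selected by the cursor, so a full pass over the other nodes takes about 2 seconds per node.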
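The cursor arithmetic from executeBody can be tried in isolation. This is a minimal sketch with a hypothetical class name and made-up addresses; it only reproduces the expression (cursor + 1) % members.size() and records which peer each run would pick.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that reproduces only the cursor arithmetic. */
class CursorRoundRobinSketch {
    private int cursor = 0;

    /** Picks the next target with the same expression as executeBody(). */
    String next(List<String> members) {
        cursor = (cursor + 1) % members.size();
        return members.get(cursor);
    }

    /** Returns the targets chosen over the given number of runs. */
    static List<String> simulate(List<String> members, int runs) {
        CursorRoundRobinSketch picker = new CursorRoundRobinSketch();
        List<String> picked = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            picked.add(picker.next(members));
        }
        return picked;
    }

    public static void main(String[] args) {
        // Made-up peer addresses, used only for illustration.
        List<String> peers = Arrays.asList("10.0.0.2:8848", "10.0.0.3:8848", "10.0.0.4:8848");
        System.out.println(simulate(peers, 6));
    }
}
```

Because the cursor is advanced before it is used, the first run picks the second entry of the list, and the picks then cycle through all peers. With three peers and a 2-second delay between runs, the same peer would be reported to roughly every 6 seconds.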
Whether the check succeeds or fails, a member change event is published whenever the node's state changes.
- if (!Objects.equals(old, member.getState())) {
- manager.notifyMemberChange();
- }
- void notifyMemberChange() {
- NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
- }
Summary: Each Nacos node sends health-check heartbeats to the other nodes in the cluster, one report every 2 seconds. When a health check fails, the target node is marked as suspicious (SUSPICIOUS); once the failure count exceeds the maximum of 3, it is marked as offline (DOWN). When a health check succeeds, the target node is marked as reachable (UP). Whether the check succeeds or fails, a MembersChangeEvent is published whenever a node's state changes.
3. Member change event
When a node goes offline or a new node comes online, the heartbeat health check detects it and updates the node's state. Every state change triggers the member change event MembersChangeEvent. So what does a subscriber do with this event?
ClusterRpcClientProxy extends MemberChangeListener, so its onEvent method is called back whenever a MembersChangeEvent is published.
- @Override
- public void onEvent(MembersChangeEvent event) {
- try {
- List<Member> members = serverMemberManager.allMembersWithoutSelf();
- refresh(members);
- } catch (NacosException e) {
- // ...
- }
- }
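The publish/subscribe shape behind this callback can be sketched as follows. MemberEventBusSketch is a hypothetical class and does not model NotifyCenter; in particular, delivery here is a plain synchronous loop, which is an assumption made for brevity. The listener drops its own address before refreshing, similar in spirit to the allMembersWithoutSelf() call in onEvent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical synchronous event bus; it only illustrates the publish/subscribe shape. */
class MemberEventBusSketch {
    private final List<Consumer<List<String>>> listeners = new ArrayList<>();

    void subscribe(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    /** Hands the full member list to every subscriber. */
    void publish(List<String> allMembers) {
        for (Consumer<List<String>> listener : listeners) {
            listener.accept(allMembers);
        }
    }

    /** Publishes one event and returns the peers a listener on `self` would refresh. */
    static List<String> peersSeenBy(String self, List<String> allMembers) {
        MemberEventBusSketch bus = new MemberEventBusSketch();
        List<String> refreshed = new ArrayList<>();
        // The listener skips its own address and keeps every other member.
        bus.subscribe(members -> {
            for (String member : members) {
                if (!member.equals(self)) {
                    refreshed.add(member);
                }
            }
        });
        bus.publish(allMembers);
        return refreshed;
    }

    public static void main(String[] args) {
        // Made-up addresses, used only for illustration.
        System.out.println(peersSeenBy("10.0.0.1:8848",
                Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848")));
    }
}
```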
Next, look at the refresh method.
- private void refresh(List<Member> members) throws NacosException {
- for (Member member : members) {
- if (MemberUtil.isSupportedLongCon(member)) {
- // Note @10
- createRpcClientAndStart(member, ConnectionType.GRPC);
- }
- }
- Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
- Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
- List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
- .map(a -> memberClientKey(a)).collect(Collectors.toList());
- // Close stale gRPC connections
- while (iterator.hasNext()) {
- Map.Entry<String, RpcClient> next1 = iterator.next();
- if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
- Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
- RpcClientFactory.getClient(next1.getKey()).shutdown();
- iterator.remove();
- }
- }
- }
Note @10: Create an RPC client for each member of the cluster. When the client starts, it first sends a HealthCheckRequest to the target node; an unhealthy node is removed. See the following excerpt from the RpcClient class.
- boolean isHealthy = healthCheck();
- // Unhealthy node
- if (!isHealthy) {
- if (currentConnection == null) {
- continue;
- }
- LoggerUtils.printIfInfoEnabled(LOGGER,
- "[{}]Server healthy check fail,currentConnection={}", name,
- currentConnection.getConnectionId());
- // Mark the client status as unhealthy
- rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
- // Reset the ReconnectContext, removing the serverInfo
- reconnectContext = new ReconnectContext(null, false);
This means that if a node in the cluster goes offline, the RPC connection to that node becomes invalid; likewise, if a new node joins the cluster, a new RPC channel is established.
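The reconcile step in refresh (create what is missing, shut down what is stale) can be sketched without any RPC machinery. Everything here is hypothetical: FakeClient stands in for RpcClient, the client map stands in for RpcClientFactory, and the key format "Cluster-" plus address is an assumption based on the prefix check in refresh. The isSupportedLongCon filter is left out for brevity.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of reconciling RPC clients against the current member list. */
class RpcClientRefreshSketch {
    /** Stand-in for an RPC client; it only tracks whether it was shut down. */
    static class FakeClient {
        boolean running = true;

        void shutdown() {
            running = false;
        }
    }

    final Map<String, FakeClient> clients = new LinkedHashMap<>();

    // Assumption: cluster client keys are the prefix "Cluster-" plus the member address.
    static String clientKey(String address) {
        return "Cluster-" + address;
    }

    void refresh(List<String> memberAddresses) {
        Set<String> wanted = new HashSet<>();
        for (String address : memberAddresses) {
            String key = clientKey(address);
            wanted.add(key);
            // Create a client only for members that do not have one yet.
            clients.computeIfAbsent(key, k -> new FakeClient());
        }
        // Shut down and drop cluster clients whose member is no longer listed.
        Iterator<Map.Entry<String, FakeClient>> iterator = clients.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, FakeClient> entry = iterator.next();
            if (entry.getKey().startsWith("Cluster-") && !wanted.contains(entry.getKey())) {
                entry.getValue().shutdown();
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        RpcClientRefreshSketch proxy = new RpcClientRefreshSketch();
        // Made-up addresses: the first node leaves and a new node joins.
        proxy.refresh(Arrays.asList("10.0.0.2:8848", "10.0.0.3:8848"));
        proxy.refresh(Arrays.asList("10.0.0.3:8848", "10.0.0.4:8848"));
        System.out.println(proxy.clients.keySet());
    }
}
```

After the second refresh in main, the client for the departed node has been shut down and removed, the client for the node that stayed is reused, and a new client exists for the node that joined.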
Summary: When the cluster membership changes, each MemberChangeListener receives the event. For example, the ClusterRpcClientProxy#onEvent callback triggers refresh, which refreshes the RPC state between the current node and the other nodes in the cluster, closing invalid RPC connections and adding new ones.
This article is reposted from the WeChat official account 「瓜农老梁」. To repost it, please contact the 瓜农老梁 official account.