自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

通過(guò) Ribbon 查詢 Nacos 服務(wù)實(shí)例

開(kāi)發(fā) 前端
Nacos 提供了開(kāi)放 API 可通過(guò) /nacos/v1/ns/instance/list 獲取服務(wù)列表。如果我們采用 spring-cloud 方式去獲取服務(wù),最終會(huì)通過(guò) Nacos Client + loadbalancer 的方式進(jìn)行客戶端負(fù)載均衡。

本文轉(zhuǎn)載自微信公眾號(hào)「運(yùn)維開(kāi)發(fā)故事」,作者老鄭。轉(zhuǎn)載本文請(qǐng)聯(lián)系運(yùn)維開(kāi)發(fā)故事公眾號(hào)。

Nacos 服務(wù)列表管理

Nacos 提供了開(kāi)放 API 可通過(guò) /nacos/v1/ns/instance/list 獲取服務(wù)列表。如果我們采用 spring-cloud 方式去獲取服務(wù),最終會(huì)通過(guò) Nacos Client + loadbalancer 的方式進(jìn)行客戶端負(fù)載均衡。

Ribbon 源碼解析

Ribbon 簡(jiǎn)介

Spring Cloud Ribbon 是 Netflix Ribbon 實(shí)現(xiàn)的一套客戶端負(fù)載均衡工具 簡(jiǎn)單的說(shuō),Ribbon 是 Netflix 發(fā)布的開(kāi)源項(xiàng)目,主要功能是提供客戶端的復(fù)雜算法和服務(wù)調(diào)用。 Ribbon 客戶端組件提供一系列完善的配置項(xiàng)如超時(shí)、重試等。簡(jiǎn)單的說(shuō),就是配置文件中列出 load Balancer (簡(jiǎn)稱 LB)后面所有的機(jī)器,Ribbon 會(huì)自動(dòng)的幫助你基于某種規(guī)則(如簡(jiǎn)單輪詢,隨機(jī)鏈接等)去鏈接這些機(jī)器。我們很容易使用 Ribbon 自定義的負(fù)載均衡算法。

Ribbon 使用

首先需要定義 RestTemplate 使用 Ribbon 策略;

  1. @Configuration 
  2. public class RestTemplateConfig { 
  3.     @LoadBalanced 
  4.     @Bean 
  5.     public RestTemplate restTemplate() { 
  6.         return new RestTemplate(); 
  7.     } 

本地使用 RestTemplate 調(diào)用遠(yuǎn)程接口;

  1. @Autowired 
  2. private RestTemplate restTemplate; 
  3. @RequestMapping(value = "/echo/{id}", method = RequestMethod.GET) 
  4. public String echo(@PathVariable Long id) { 
  5.     return restTemplate.getForObject("http://member-service/member/get/" + id, String.class); 

Ribbon 源碼分析

RestTemplate 繼承 InterceptingHttpAccessor 通過(guò) interceptors 字段接受 HttpRequestInterceptor 請(qǐng)求攔截器。對(duì)于 Ribbion 初始化類是 RibbonAutoConfiguration 中的, 它在 spring-cloud-netflix-ribbon 中定義。但是它在初始化之前,又需要加載 RibbonAutoConfiguration 配置,它是在 spring-cloud-common 中。具體的代碼如下:

  1. @Configuration(proxyBeanMethods = false
  2. // 工程中一定存在 RestTemplate 類 
  3. @ConditionalOnClass(RestTemplate.class) 
  4. // 容器中一定存在 LoadBalancerClient 類 Bean 實(shí)例 
  5. @ConditionalOnBean(LoadBalancerClient.class) 
  6. @EnableConfigurationProperties(LoadBalancerRetryProperties.class) 
  7. public class LoadBalancerAutoConfiguration { 
  8.  
  9.    // 獲取 Spring 容器中所有的 RestTemplate 實(shí)例 
  10.    @LoadBalanced 
  11.    @Autowired(required = false
  12.    private List<RestTemplate> restTemplates = Collections.emptyList(); 
  13.  
  14.    // 獲取 Spring 容器中 LoadBalancerRequestTransformer 實(shí)例 
  15.    @Autowired(required = false
  16.    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); 
  17.  
  18.    // 在 Bean 初始化完成后會(huì)調(diào)用 afterSingletonsInstantiated 方法 
  19.    // 這里是一個(gè) lambda 表達(dá)式方式的實(shí)現(xiàn), 主要是為 restTemplate 實(shí)例設(shè)置 RestTemplateCustomizer 
  20.    @Bean 
  21.    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( 
  22.          final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { 
  23.       return () -> restTemplateCustomizers.ifAvailable(customizers -> { 
  24.          for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { 
  25.             for (RestTemplateCustomizer customizer : customizers) { 
  26.                customizer.customize(restTemplate); 
  27.             } 
  28.          } 
  29.       }); 
  30.    } 
  31.  
  32.    // LoadBalancerRequestFactory 工廠類 
  33.    // 主要是用來(lái)提供 LoadBalancerClient 實(shí)例和 LoadBalancerRequestTransformer 
  34.    @Bean 
  35.    @ConditionalOnMissingBean 
  36.    public LoadBalancerRequestFactory loadBalancerRequestFactory( 
  37.          LoadBalancerClient loadBalancerClient) { 
  38.       return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers); 
  39.    } 
  40.  
  41.    // LoadBalancerInterceptor 攔截器  
  42.    @Configuration(proxyBeanMethods = false
  43.    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate"
  44.    static class LoadBalancerInterceptorConfig { 
  45.  
  46.       // 創(chuàng)建默認(rèn)的攔截器 LoadBalancerInterceptor 的實(shí)例 
  47.       @Bean 
  48.       public LoadBalancerInterceptor ribbonInterceptor( 
  49.             LoadBalancerClient loadBalancerClient, 
  50.             LoadBalancerRequestFactory requestFactory) { 
  51.          return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); 
  52.       } 
  53.  
  54.       // 如果沒(méi)有 RestTemplateCustomizer 實(shí)例才會(huì)創(chuàng)建 
  55.       // 這里就就會(huì)為咱們所有的  restTemplate 實(shí)例添加 loadBalancerInterceptor 攔截器 
  56.       @Bean 
  57.       @ConditionalOnMissingBean 
  58.       public RestTemplateCustomizer restTemplateCustomizer( 
  59.             final LoadBalancerInterceptor loadBalancerInterceptor) { 
  60.          return restTemplate -> { 
  61.             List<ClientHttpRequestInterceptor> list = new ArrayList<>( 
  62.                   restTemplate.getInterceptors()); 
  63.             list.add(loadBalancerInterceptor); 
  64.             restTemplate.setInterceptors(list); 
  65.          }; 
  66.       } 
  67.  
  68.    } 
  69.   // ... 
  70.  

針對(duì)下面的代碼我們可以總結(jié)一下:

如果需要使用負(fù)載均衡,工程下面必須要有 RestTemplate 類, 然后Spring 容器中要有 LoadBalancerClient 的實(shí)例。

LoadBalancerClient 在 spring-cloud-netflix-ribbon 中只有一個(gè)實(shí)現(xiàn)類: RibbonLoadBalancerClient

利用 Spring 的 SmartInitializingSingleton 拓展點(diǎn),在 restTemplateCustomizer() 中為所有的 RestTemplate 添加 LoadBalancerInterceptor 攔截器

其實(shí) LoadBalancer 的本質(zhì)就是通過(guò)攔截器。利用 RestTemplate 的拓展點(diǎn)來(lái)實(shí)現(xiàn)請(qǐng)求服務(wù)的負(fù)載均衡。

LoadBalancerInterceptor

LoadBalancerInterceptor 攔截器會(huì)將請(qǐng)求交給 LoadBalancerClient 去處理,首先會(huì)選擇一個(gè) ILoadBalancer 的實(shí)現(xiàn)來(lái)處理獲取和選擇服務(wù),然后通過(guò) serviceName 和負(fù)載均衡算法去選擇 Server 對(duì)象。最后執(zhí)行請(qǐng)求。

  1. public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { 
  2.  
  3.    // 負(fù)載均衡 
  4.    private LoadBalancerClient loadBalancer; 
  5.  
  6.    // 構(gòu)建請(qǐng)求  
  7.    private LoadBalancerRequestFactory requestFactory; 
  8.  
  9.    // ... 
  10.  
  11.    @Override 
  12.    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, 
  13.          final ClientHttpRequestExecution execution) throws IOException { 
  14.       final URI originalUri = request.getURI(); 
  15.       String serviceName = originalUri.getHost();  
  16.       return this.loadBalancer.execute(serviceName, 
  17.             this.requestFactory.createRequest(request, body, execution)); 
  18.    } 
  19.  

RibbonLoadBalancerClient

我們通過(guò)跟蹤 this.loadBalancer.execute 代碼發(fā)現(xiàn)。最終所有的請(qǐng)求都交由 RibbonLoadBalancerClient 去處理。它實(shí)現(xiàn)了。LoadBalancerClient 接口, 代碼如下:

  1. public interface ServiceInstanceChooser { 
  2.  
  3.   // 通過(guò) serviceId 選擇具體的服務(wù)實(shí)例 
  4.   ServiceInstance choose(String serviceId); 
  5.  
  6.  
  7. public interface LoadBalancerClient extends ServiceInstanceChooser { 
  8.  
  9.   <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; 
  10.   <T> T execute(String serviceId, ServiceInstance serviceInstance, 
  11.          LoadBalancerRequest<T> request) throws IOException; 
  12.   // 將服務(wù)實(shí)例信息替換還具體的 IP 信息  
  13.   URI reconstructURI(ServiceInstance instance, URI original); 
  14.  

我們先來(lái)分析 RibbonLoadBalancerClient 的 choose 方法

  1. @Override 
  2. public ServiceInstance choose(String serviceId) { 
  3.    return choose(serviceId, null); 
  4.  
  5. // 通過(guò)服務(wù)名選擇具體的服務(wù)實(shí)例 
  6. public ServiceInstance choose(String serviceId, Object hint) { 
  7.    Server server = getServer(getLoadBalancer(serviceId), hint); 
  8.    if (server == null) { 
  9.       return null
  10.    } 
  11.    return new RibbonServer(serviceId, server, isSecure(server, serviceId), 
  12.          serverIntrospector(serviceId).getMetadata(server)); 
  13.  
  14. // 通過(guò)服務(wù)名選擇一個(gè)負(fù)載均衡器, 默認(rèn)是 `ZoneAwareLoadBalancer` 
  15. protected ILoadBalancer getLoadBalancer(String serviceId) { 
  16.    return this.clientFactory.getLoadBalancer(serviceId); 
  17.  
  18. // 獲取服務(wù) 
  19. protected Server getServer(ILoadBalancer loadBalancer) { 
  20.    return getServer(loadBalancer, null); 
  21. protected Server getServer(ILoadBalancer loadBalancer, Object hint) { 
  22.    if (loadBalancer == null) { 
  23.       return null
  24.    } 
  25.    // Use 'default' on a null hint, or just pass it on
  26.    return loadBalancer.chooseServer(hint != null ? hint : "default"); 

LoadBalancerInterceptor 執(zhí)行的時(shí)候是直接委托執(zhí)行的 loadBalancer.execute() 這個(gè)方法:

  1. // LoadBalancerRequest 是通過(guò) LoadBalancerRequestFactory.createRequest(request, body, execution) 創(chuàng)建 
  2. // 它實(shí)現(xiàn) LoadBalancerRequest 接口是用的一個(gè)匿名內(nèi)部類,泛型類型是ClientHttpResponse 
  3. // 因?yàn)樽罱K執(zhí)行的顯然還是執(zhí)行器:ClientHttpRequestExecution.execute() 
  4. @Override 
  5. public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { 
  6.   return execute(serviceId, request, null); 
  7.  
  8. public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { 
  9.   // 拿到負(fù)載均衡器,然后拿到一個(gè)serverInstance實(shí)例 
  10.   ILoadBalancer loadBalancer = getLoadBalancer(serviceId); 
  11.   Server server = getServer(loadBalancer, hint); 
  12.   if (server == null) { // 若沒(méi)找到就直接拋出異常。這里使用的是IllegalStateException這個(gè)異常 
  13.     throw new IllegalStateException("No instances available for " + serviceId); 
  14.   } 
  15.  
  16.   // 把Server適配為RibbonServer  isSecure:客戶端是否安全 
  17.   // serverIntrospector內(nèi)省  參考配置文件:ServerIntrospectorProperties 
  18.   RibbonServer ribbonServer = new RibbonServer(serviceId, server, 
  19.       isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); 
  20.  
  21.   //調(diào)用本類的重載接口方法 
  22.   return execute(serviceId, ribbonServer, request); 
  23.  
  24. // 它的參數(shù)是 ServiceInstance --> 已經(jīng)確定了唯一的Server實(shí)例 
  25. @Override 
  26. public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { 
  27.  
  28.   // 拿到 Server,RibbonServer 是 execute 時(shí)的唯一實(shí)現(xiàn) 
  29.   Server server = null
  30.   if (serviceInstance instanceof RibbonServer) { 
  31.     server = ((RibbonServer) serviceInstance).getServer(); 
  32.   } 
  33.   if (server == null) { 
  34.     throw new IllegalStateException("No instances available for " + serviceId); 
  35.   } 
  36.  
  37.   // 執(zhí)行的上下文是和serviceId綁定的 
  38.   RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId); 
  39.   ...  
  40.   // 真正的向server發(fā)送請(qǐng)求,得到返回值 
  41.   // 因?yàn)橛袛r截器,所以這里肯定說(shuō)執(zhí)行的是InterceptingRequestExecution.execute()方法 
  42.   // so會(huì)調(diào)用ServiceRequestWrapper.getURI(),從而就會(huì)調(diào)用reconstructURI()方法 
  43.     T returnVal = request.apply(serviceInstance); 
  44.     return returnVal; 
  45.   ... // 異常處理 

returnVal 是一個(gè) ClientHttpResponse,最后交給 handleResponse()方法來(lái)處理異常情況(若存在的話),若無(wú)異常就交給提取器提值:responseExtractor.extractData(response),這樣整個(gè)請(qǐng)求就算全部完成了。

ZoneAwareLoadBalancer

負(fù)載均衡器 ZoneAwareLoadBalancer 的類圖結(jié)構(gòu)如下圖所示。它 DynamicServerListLoadBalancer 它的父類, 核心方法 重置和初始化:restOfInit(clientConfig) 更新服務(wù)列表:updateListOfServers(); 這個(gè)方需要調(diào)用到 ServerList.getUpdatedListOfServers() 這里就會(huì)調(diào)用到具體的注冊(cè)中心實(shí)現(xiàn),以 Nacos 為例他的實(shí)現(xiàn)就是 NacosServerList#getUpdatedListOfServers();

  • 更新所有服務(wù)列表:updateAllServerList();
  • 設(shè)置所有服務(wù)列表 setServersList() ZoneAwareLoadBalancer 它的核心方法:
  • 選擇服務(wù)實(shí)例 chooseServer()
  • 選擇負(fù)載均衡器 getLoadBalancer
  • 選擇區(qū)域內(nèi)的服務(wù)實(shí)例:zoneLoadBalancer.chooseServer

Ribbon 總結(jié)

針對(duì) @LoadBalanced 下的 RestTemplate 的使用,我總結(jié)如下:

  • 傳入的String類型的url必須是絕對(duì)路徑(http://...),否則拋出異常:java.lang.IllegalArgumentException: URI is not absolute
  • serviceId 不區(qū)分大小寫(xiě)(http://order-service/...效果同http://OERDER-SERVICE/...)
  • serviceId 后請(qǐng)不要跟 port 端口號(hào)

最后,需要特別指出的是:標(biāo)注有@LoadBalanced 的 RestTemplate 只能填寫(xiě) serviceId 而不能再寫(xiě) IP地址/域名去發(fā)送請(qǐng)求了, 若你的項(xiàng)目中兩種 case 都有需要,需要定義多個(gè) RestTemplate 分別應(yīng)對(duì)不同的使用場(chǎng)景

Nacos 服務(wù)查詢

客戶端查詢

如果我們使用默認(rèn)的 Nacos 客戶端,那么走的就是 NacosServerList#getUpdatedListOfServers();接口來(lái)查詢服務(wù)列表。

  1. public class NacosServerList extends AbstractServerList<NacosServer> { 
  2.  
  3.   private NacosDiscoveryProperties discoveryProperties; 
  4.  
  5.   @Override 
  6.   public List<NacosServer> getUpdatedListOfServers() { 
  7.     return getServers(); 
  8.   } 
  9.      
  10.     private List<NacosServer> getServers() { 
  11.     try { 
  12.       String group = discoveryProperties.getGroup(); 
  13.       // discoveryProperties.namingServiceInstance()  
  14.             // 最終通過(guò)反射獲取 com.alibaba.nacos.client.naming.NacosNamingService 實(shí)例 
  15.             List<Instance> instances = discoveryProperties.namingServiceInstance() 
  16.           .selectInstances(serviceId, grouptrue); 
  17.       return instancesToServerList(instances); 
  18.     } 
  19.     catch (Exception e) { 
  20.       throw new IllegalStateException( 
  21.           "Can not get service instances from nacos, serviceId=" + serviceId, 
  22.           e); 
  23.     } 
  24.   } 

然后調(diào)用 selectInstances 方法

  1. @Override 
  2. public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, 
  3.                                       boolean subscribe) throws NacosException { 
  4.  
  5.     ServiceInfo serviceInfo; 
  6.     // subscribe 默認(rèn)傳的是 true 
  7.     if (subscribe) { 
  8.         serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), 
  9.                                                  StringUtils.join(clusters, ",")); 
  10.     } else { 
  11.         serviceInfo = hostReactor 
  12.             .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), 
  13.                                               StringUtils.join(clusters, ",")); 
  14.     } 
  15.     return selectInstances(serviceInfo, healthy); 

其實(shí)核心的邏輯在 hostReactor.getServiceInfo 在查詢服務(wù)信息里面會(huì)把當(dāng)前的 serviceName、 clusters 轉(zhuǎn)換為 key, 然后通過(guò) getServiceInfo0 方法查詢服務(wù)信息這里主要是查詢的是本地的數(shù)據(jù)。

如果 null == serviceObj 會(huì)在 updateServiceNow 里面去調(diào)用 /instance/list接口查詢服務(wù)信息

  1. public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { 
  2.          
  3.         NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); 
  4.         String key = ServiceInfo.getKey(serviceName, clusters); 
  5.         if (failoverReactor.isFailoverSwitch()) { 
  6.             return failoverReactor.getService(key); 
  7.         } 
  8.          
  9.         ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); 
  10.          
  11.         if (null == serviceObj) { 
  12.             serviceObj = new ServiceInfo(serviceName, clusters); 
  13.              
  14.             serviceInfoMap.put(serviceObj.getKey(), serviceObj); 
  15.              
  16.             updatingMap.put(serviceName, new Object()); 
  17.             updateServiceNow(serviceName, clusters); 
  18.             updatingMap.remove(serviceName); 
  19.              
  20.         } else if (updatingMap.containsKey(serviceName)) { 
  21.             // UPDATE_HOLD_INTERVAL 為常量默認(rèn)金輝進(jìn)去 
  22.             if (UPDATE_HOLD_INTERVAL > 0) { 
  23.                 // hold a moment waiting for update finish 
  24.                 synchronized (serviceObj) { 
  25.                     try { 
  26.                         // 最大等待時(shí)間 5s, 在更新 serviceObj 之后, 就會(huì)執(zhí)行 notifyAll() 
  27.                         // 方法入口 updateService(String serviceName, String clusters) 
  28.                         // 最大延遲 2s DEFAULT_DELAY = 1 
  29.                         serviceObj.wait(UPDATE_HOLD_INTERVAL); 
  30.                     } catch (InterruptedException e) { 
  31.                         NAMING_LOGGER 
  32.                                 .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); 
  33.                     } 
  34.                 } 
  35.             } 
  36.         } 
  37.          
  38.       // 通過(guò) Schedule 更新 服務(wù)信息  
  39.         scheduleUpdateIfAbsent(serviceName, clusters); 
  40.          
  41.       // 獲取最新的值 
  42.         return serviceInfoMap.get(serviceObj.getKey()); 
  43.     } 

代碼看到這里我們不難理解,為什么第一次 Ribbon 調(diào)用的時(shí)候都會(huì)比較慢,因?yàn)樗厝コ跏蓟?wù)列表,然后通過(guò) Nacos Client 去 Nacos 查詢服務(wù)實(shí)例信息。

服務(wù)端處理

服務(wù)端通過(guò) /instance/list 接口來(lái)處理服務(wù)實(shí)例信息查詢請(qǐng)求。首先服務(wù)實(shí)例信息都是被存儲(chǔ)在 ConcurrentHashMap 中

  1. /** 
  2.  * Map(namespace, Map(group::serviceName, Service)). 
  3.  */ 
  4. private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); 

在我們查詢的過(guò)程中主要是通過(guò) ServiceManager 來(lái)進(jìn)行管理, 核心的入口方法在 InstanceController#doSrvIpxt 中

  1. public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, 
  2.           int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { 
  3.        
  4.       ClientInfo clientInfo = new ClientInfo(agent); 
  5.       ObjectNode result = JacksonUtils.createEmptyJsonNode(); 
  6.       Service service = serviceManager.getService(namespaceId, serviceName); 
  7.       long cacheMillis = switchDomain.getDefaultCacheMillis(); 
  8.        
  9.       // now try to enable the push 
  10.       try { 
  11.           if (udpPort > 0 && pushService.canEnablePush(agent)) { 
  12.                
  13.               pushService 
  14.                       .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), 
  15.                               pushDataSource, tid, app); 
  16.               cacheMillis = switchDomain.getPushCacheMillis(serviceName); 
  17.           } 
  18.       } catch (Exception e) { 
  19.           Loggers.SRV_LOG 
  20.                   .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e); 
  21.           cacheMillis = switchDomain.getDefaultCacheMillis(); 
  22.       } 
  23.        
  24.       if (service == null) { 
  25.           if (Loggers.SRV_LOG.isDebugEnabled()) { 
  26.               Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); 
  27.           } 
  28.           result.put("name", serviceName); 
  29.           result.put("clusters", clusters); 
  30.           result.put("cacheMillis", cacheMillis); 
  31.           result.replace("hosts", JacksonUtils.createEmptyArrayNode()); 
  32.           return result; 
  33.       } 
  34.        
  35.       checkIfDisabled(service); 
  36.        
  37.       List<Instance> srvedIPs; 
  38.        
  39.       // 查詢所有的服務(wù) 
  40.       // 內(nèi)部會(huì)更新服務(wù)列表 
  41.       // allInstances.addAll(persistentInstances); 
  42.       // allInstances.addAll(ephemeralInstances); 
  43.       srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ","))); 
  44.        
  45.       // filter ips using selector: 
  46.       if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) { 
  47.           srvedIPs = service.getSelector().select(clientIP, srvedIPs); 
  48.       } 
  49.        
  50.       if (CollectionUtils.isEmpty(srvedIPs)) { 
  51.            
  52.           if (Loggers.SRV_LOG.isDebugEnabled()) { 
  53.               Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); 
  54.           } 
  55.            
  56.           if (clientInfo.type == ClientInfo.ClientType.JAVA 
  57.                   && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { 
  58.               result.put("dom", serviceName); 
  59.           } else { 
  60.               result.put("dom", NamingUtils.getServiceName(serviceName)); 
  61.           } 
  62.            
  63.           result.put("name", serviceName); 
  64.           result.put("cacheMillis", cacheMillis); 
  65.           result.put("lastRefTime", System.currentTimeMillis()); 
  66.           result.put("checksum", service.getChecksum()); 
  67.           result.put("useSpecifiedURL"false); 
  68.           result.put("clusters", clusters); 
  69.           result.put("env", env); 
  70.           result.set("hosts", JacksonUtils.createEmptyArrayNode()); 
  71.           result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); 
  72.           return result; 
  73.       } 
  74.        
  75.       Map<Boolean, List<Instance>> ipMap = new HashMap<>(2); 
  76.       ipMap.put(Boolean.TRUE, new ArrayList<>()); 
  77.       ipMap.put(Boolean.FALSE, new ArrayList<>()); 
  78.        
  79.       for (Instance ip : srvedIPs) { 
  80.           ipMap.get(ip.isHealthy()).add(ip); 
  81.       } 
  82.        
  83.       if (isCheck) { 
  84.           result.put("reachProtectThreshold"false); 
  85.       } 
  86.        
  87.       double threshold = service.getProtectThreshold(); 
  88.        
  89.       if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) { 
  90.            
  91.           Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName); 
  92.           if (isCheck) { 
  93.               result.put("reachProtectThreshold"true); 
  94.           } 
  95.            
  96.           ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE)); 
  97.           ipMap.get(Boolean.FALSE).clear(); 
  98.       } 
  99.        
  100.       if (isCheck) { 
  101.           result.put("protectThreshold", service.getProtectThreshold()); 
  102.           result.put("reachLocalSiteCallThreshold"false); 
  103.            
  104.           return JacksonUtils.createEmptyJsonNode(); 
  105.       } 
  106.        
  107.       ArrayNode hosts = JacksonUtils.createEmptyArrayNode(); 
  108.        
  109.       for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) { 
  110.           List<Instance> ips = entry.getValue(); 
  111.            
  112.           if (healthyOnly && !entry.getKey()) { 
  113.               continue
  114.           } 
  115.            
  116.           for (Instance instance : ips) { 
  117.                
  118.               // remove disabled instance: 
  119.               if (!instance.isEnabled()) { 
  120.                   continue
  121.               } 
  122.                
  123.               ObjectNode ipObj = JacksonUtils.createEmptyJsonNode(); 
  124.                
  125.               ipObj.put("ip", instance.getIp()); 
  126.               ipObj.put("port", instance.getPort()); 
  127.               // deprecated since nacos 1.0.0: 
  128.               ipObj.put("valid", entry.getKey()); 
  129.               ipObj.put("healthy", entry.getKey()); 
  130.               ipObj.put("marked", instance.isMarked()); 
  131.               ipObj.put("instanceId", instance.getInstanceId()); 
  132.               ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata())); 
  133.               ipObj.put("enabled", instance.isEnabled()); 
  134.               ipObj.put("weight", instance.getWeight()); 
  135.               ipObj.put("clusterName", instance.getClusterName()); 
  136.               if (clientInfo.type == ClientInfo.ClientType.JAVA 
  137.                       && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { 
  138.                   ipObj.put("serviceName", instance.getServiceName()); 
  139.               } else { 
  140.                   ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName())); 
  141.               } 
  142.                
  143.               ipObj.put("ephemeral", instance.isEphemeral()); 
  144.               hosts.add(ipObj); 
  145.                
  146.           } 
  147.       } 
  148.        
  149.       result.replace("hosts", hosts); 
  150.       if (clientInfo.type == ClientInfo.ClientType.JAVA 
  151.               && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { 
  152.           result.put("dom", serviceName); 
  153.       } else { 
  154.           result.put("dom", NamingUtils.getServiceName(serviceName)); 
  155.       } 
  156.       result.put("name", serviceName); 
  157.       result.put("cacheMillis", cacheMillis); 
  158.       result.put("lastRefTime", System.currentTimeMillis()); 
  159.       result.put("checksum", service.getChecksum()); 
  160.       result.put("useSpecifiedURL"false); 
  161.       result.put("clusters", clusters); 
  162.       result.put("env", env); 
  163.       result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); 
  164.       return result; 
  165.   } 

在上面的核心邏輯主要是:

  • 調(diào)用 service.srvIPs 方法查詢所有的服務(wù)實(shí)例信息
  • Cluster#allIPs會(huì)將所有的服務(wù)注冊(cè)信息寫(xiě)到服務(wù)注冊(cè)列表。

參考鏈接

https://nacos.io

https://zhuanlan.zhihu.com

https://blog.csdn.net/f641385712/article/details/100788040

 

責(zé)任編輯:武曉燕 來(lái)源: 運(yùn)維開(kāi)發(fā)故事
相關(guān)推薦

2022-03-10 07:41:36

調(diào)用服務(wù)Nacos

2009-10-09 17:18:13

RHEL配置NIS

2009-12-11 13:59:35

F#

2010-03-02 14:06:37

WCF服務(wù)實(shí)例管理模式

2009-08-14 17:04:19

Windows后臺(tái)服務(wù)

2025-03-07 08:17:36

2010-02-26 14:49:10

WCF服務(wù)實(shí)例單一性

2010-09-24 19:12:11

SQL隱性事務(wù)

2013-03-19 10:35:24

Oracle

2013-02-26 10:23:52

F5電信

2024-09-04 10:44:19

2022-07-08 08:37:23

Nacos服務(wù)注冊(cè)動(dòng)態(tài)配置

2024-07-02 10:58:53

2017-10-21 21:46:32

2023-04-10 23:05:54

NacosOpenFeignRibbon

2021-09-06 06:45:07

NacosUdp通信

2017-09-05 14:05:11

微服務(wù)spring clou路由

2020-10-09 18:41:55

AWS云服務(wù)云計(jì)算

2020-10-13 14:03:50

搭建ngrok服務(wù)

2024-04-29 08:05:34

NacosJava數(shù)據(jù)結(jié)構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)