聊聊對Nacos Client服務(wù)發(fā)現(xiàn)源碼分析
本文轉(zhuǎn)載自微信公眾號「程序新視界」,作者二師兄。轉(zhuǎn)載本文請聯(lián)系程序新視界公眾號。
學(xué)習(xí)不用那么功利,二師兄帶你從更高維度輕松閱讀源碼~
本篇帶大家通過源碼層面分析一下Nacos Client的服務(wù)發(fā)現(xiàn)的路程,事實可能并不像你想象的那樣簡單。
Nacos服務(wù)發(fā)現(xiàn)
直觀的看,Nacos客戶端的服務(wù)發(fā)現(xiàn),就是封裝參數(shù)、調(diào)用服務(wù)端接口、獲得返回實例列表。
naocos
但細(xì)化這個流程,會發(fā)現(xiàn)不僅包括了通過NamingService獲取服務(wù)列表,在獲取服務(wù)列表的過程中還涉及到通信協(xié)議(Http or gRPC)、訂閱流程、故障轉(zhuǎn)移邏輯等。下面我們根據(jù)服務(wù)發(fā)現(xiàn)來捋一下相關(guān)的流程。
先說入口程序,依舊是在NamingTest中可以看到:
- NamingService namingService = NacosFactory.createNamingService(properties);
- namingService.registerInstance("nacos.test.1", instance);
- ThreadUtils.sleep(5000L);
- // 獲取實例列表
- List<Instance> list = namingService.getAllInstances("nacos.test.1");
關(guān)于NamingService的實例化和基本功能,在服務(wù)注冊時已經(jīng)講過,這里直接看獲取實例列表方法getAllInstances。該方法的參數(shù)就是服務(wù)的名稱。
經(jīng)過一些列的重載方法調(diào)用,真正處理核心邏輯的方法如下:
- @Override
- public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
- boolean subscribe) throws NacosException {
- ServiceInfo serviceInfo;
- String clusterString = StringUtils.join(clusters, ",");
- // 是否訂閱模式
- if (subscribe) {
- // 先從客戶端緩存獲取服務(wù)信息
- serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
- if (null == serviceInfo) {
- // 如果本地緩存不存在服務(wù)信息,則進(jìn)行訂閱
- serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
- }
- } else {
- // 如果未訂閱服務(wù)信息,則直接從服務(wù)器進(jìn)行查詢
- serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
- }
- // 從服務(wù)信息中獲去實例列表
- List<Instance> list;
- if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
- return new ArrayList<Instance>();
- }
- return list;
- }
首先看重載的getAllInstances方法,比入口方法多了幾個參數(shù),這里不僅有服務(wù)名稱,還有分組名稱(groupName)、集群列表(clusters)、是否訂閱(subscribe)。
重載方法中的其他參數(shù)已經(jīng)設(shè)置了默認(rèn)值。比如,分組名稱默認(rèn)為“DEFAULT_GROUP”、集群列表默認(rèn)為空數(shù)組、是否訂閱默認(rèn)為“訂閱”。
上述方法整理成流程圖如下:
naocos
上述流程的基本邏輯為:
如果是訂閱模式,則直接從本地緩存獲取服務(wù)信息(ServiceInfo),然后從中獲取實例列表,這是因為訂閱機制會自動同步服務(wù)器實例的變化到本地。如果本地緩存中沒有,那說明是首次調(diào)用,則進(jìn)行訂閱,在訂閱完成后會獲得到服務(wù)信息。
如果是非訂閱模式,那就直接請求服務(wù)器端,獲得服務(wù)信息。
訂閱處理流程
在上述流程中,涉及到了訂閱邏輯,入口代碼為獲取實例列表中的如下方法:
- serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
下面就來看看該方法內(nèi)部是如何進(jìn)行處理的。首先,這里的clientProxy是NamingClientProxy類的對象。對應(yīng)的subscribe實現(xiàn)如下:
- @Override
- public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
- String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
- String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
- // 獲取緩存中的ServiceInfo
- ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
- if (null == result) {
- // 如果為null,則進(jìn)行訂閱邏輯處理,基于gRPC協(xié)議
- result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
- }
- // 定時調(diào)度UpdateTask
- serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
- // ServiceInfo本地緩存處理
- serviceInfoHolder.processServiceInfo(result);
- return result;
- }
在上述代碼中,可以看到在獲取服務(wù)實例列表時(特別是首次),也進(jìn)行了訂閱邏輯的拓展,基本流程圖如下:
naocos
上圖流程中可以看出,訂閱方法先通過代理類進(jìn)行了本地緩存的判斷,如果本地緩存存在ServiceInfo信息,則直接返回。如果不存在,則默認(rèn)采用gRPC協(xié)議進(jìn)行訂閱,并返回ServiceInfo。
grpcClientProxy的subscribe訂閱方法就是直接向服務(wù)器發(fā)送了一個訂閱請求,并返回結(jié)果,就沒有做過多處理了。
訂閱完成之后,會通過ServiceInfoUpdateService開啟一個定時任務(wù),這個定時任務(wù)主要的作用就是來定時同步服務(wù)器端的實例列表信息,并進(jìn)行本地緩存更新等操作。
最后一步,ServiceInfo本地緩存處理。這里會將獲得的最新ServiceInfo與本地內(nèi)存中的ServiceInfo進(jìn)行比較,更新,發(fā)布變更時間,磁盤文件存儲等操作。其實,這一步的操作,在訂閱定時任務(wù)中也進(jìn)行了處理。
關(guān)于訂閱細(xì)節(jié)和本地緩存處理,涉及內(nèi)容較多,我們后面單獨拓展開講解。這里知道整體流程即可。
小結(jié)
本文主要梳理了Nacos客戶端服務(wù)發(fā)現(xiàn)的核心流程,包括:
第一,如果沒有開啟訂閱模式,則直接通過/instance/list接口(默認(rèn)通過gRPC協(xié)議)獲取服務(wù)實例列表信息;
第二,如果開啟訂閱模式(默認(rèn)開啟),則先會從本地緩存中獲取實例信息,如果不存在,則進(jìn)行訂閱獲并獲取實例信息;
第三,在開啟訂閱時,會開啟定時任務(wù),定時執(zhí)行UpdateTask(獲取服務(wù)器實例信息、更新本地緩存、發(fā)布事件);
第四,在第二步獲得最新的實例信息之后,也會執(zhí)行processServiceInfo方法來更新內(nèi)存和本地實例緩存,并發(fā)布變更時間。
第五,至此,與第二步形成循環(huán),每次獲取本地緩存,不存在則更新……
關(guān)于用來處理訂閱相關(guān)的UpdateTask和用來處理本地緩存的ServiceInfoHolder#processServiceInfo方法,我們后面文章繼續(xù)講解。