大家好呀,我是樓仔。
現(xiàn)如今市面上注冊(cè)中心的輪子很多,我實(shí)際使用過(guò)的就有三款:Eureka、Gsched、Nacos,由于當(dāng)前參與 Nacos 集群的維護(hù)和開(kāi)發(fā)工作,期間也參與了 Nacos 社區(qū)的一些開(kāi)發(fā)和 Bug Fix 工作,過(guò)程中對(duì) Nacos 原理有了一定的積累,今天給大家分享一下 Nacos 動(dòng)態(tài)服務(wù)發(fā)現(xiàn)的原理。
不 BB,上文章目錄:

1、什么是動(dòng)態(tài)服務(wù)發(fā)現(xiàn)?
服務(wù)發(fā)現(xiàn)是指使用一個(gè)注冊(cè)中心來(lái)記錄分布式系統(tǒng)中的全部服務(wù)的信息,以便其他服務(wù)能夠快速的找到這些已注冊(cè)的服務(wù)。
在單體應(yīng)用中,DNS+Nginx 可以滿足服務(wù)發(fā)現(xiàn)的要求,此時(shí)服務(wù)的IP列表配置在 nginx 上。在微服務(wù)架構(gòu)中,由于服務(wù)粒度變的更細(xì),服務(wù)的上下線更加頻繁,我們需要一款注冊(cè)中心來(lái)動(dòng)態(tài)感知服務(wù)的上下線,并且推送IP列表變化給服務(wù)消費(fèi)者,架構(gòu)如下圖。

2、Nacos 實(shí)現(xiàn)動(dòng)態(tài)服務(wù)發(fā)現(xiàn)的原理
Nacos實(shí)現(xiàn)動(dòng)態(tài)服務(wù)發(fā)現(xiàn)的核心原理如下圖,我們接下來(lái)的內(nèi)容將圍繞這個(gè)圖來(lái)進(jìn)行。

2.1 通訊協(xié)議
整個(gè)服務(wù)注冊(cè)與發(fā)現(xiàn)過(guò)程,都離不開(kāi)通訊協(xié)議,在1.x的 Nacos 版本中服務(wù)端只支持 http 協(xié)議,后來(lái)為了提升性能在2.x版本引入了谷歌的 grpc,grpc 是一款長(zhǎng)連接協(xié)議,極大的減少了 http 請(qǐng)求頻繁的連接創(chuàng)建和銷(xiāo)毀過(guò)程,能大幅度提升性能,節(jié)約資源。
據(jù)官方測(cè)試,Nacos服務(wù)端 grpc 版本,相比 http 版本的性能提升了9倍以上。
2.2 Nacos 服務(wù)注冊(cè)
簡(jiǎn)單來(lái)講,服務(wù)注冊(cè)的目的就是客戶端將自己的ip端口等信息上報(bào)給 Nacos 服務(wù)端,過(guò)程如下:
- 創(chuàng)建長(zhǎng)連接:Nacos SDK 通過(guò)Nacos服務(wù)端域名解析出服務(wù)端ip列表,選擇其中一個(gè)ip創(chuàng)建 grpc 連接,并定時(shí)檢查連接狀態(tài),當(dāng)連接斷開(kāi),則自動(dòng)選擇服務(wù)端ip列表中的下一個(gè)ip進(jìn)行重連。
- 健康檢查請(qǐng)求:在正式發(fā)起注冊(cè)之前,Nacos SDK 向服務(wù)端發(fā)送一個(gè)空請(qǐng)求,服務(wù)端回應(yīng)一個(gè)空請(qǐng)求,若Nacos SDK 未收到服務(wù)端回應(yīng),則認(rèn)為服務(wù)端不健康,并進(jìn)行一定次數(shù)重試,如果都未收到回應(yīng),則注冊(cè)失敗。
- 發(fā)起注冊(cè):當(dāng)你查看Nacos java SDK的注冊(cè)方法時(shí),你會(huì)發(fā)現(xiàn)沒(méi)有返回值,這是因?yàn)镹acos SDK做了補(bǔ)償機(jī)制,在真實(shí)給服務(wù)端上報(bào)數(shù)據(jù)之前,會(huì)先往緩存中插入一條記錄表示開(kāi)始注冊(cè),注冊(cè)成功之后再?gòu)木彺嬷袠?biāo)記這條記錄為注冊(cè)成功,當(dāng)注冊(cè)失敗時(shí),緩存中這條記錄是未注冊(cè)成功的狀態(tài),Nacos SDK開(kāi)啟了一個(gè)定時(shí)任務(wù),定時(shí)查詢異常的緩存數(shù)據(jù),重新發(fā)起注冊(cè)。
Nacos SDK注冊(cè)失敗時(shí)的自動(dòng)補(bǔ)償機(jī)制時(shí)序圖。

相關(guān)源碼如下:
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
//添加redo日志
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
doRegisterService(serviceName, groupName, instance);
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
//向服務(wù)端發(fā)起注冊(cè)
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
//標(biāo)記注冊(cè)成功
redoService.instanceRegistered(serviceName, groupName);
}
執(zhí)行補(bǔ)償定時(shí)任務(wù)RedoScheduledTask。
@Override
public void run() {
if (!redoService.isConnected()) {
LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
return;
}
try {
redoForInstances();
redoForSubscribes();
} catch (Exception e) {
LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
}
}
private void redoForInstances() {
for (InstanceRedoData each : redoService.findInstanceRedoData()) {
try {
redoForInstance(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), e);
}
}
}
- 服務(wù)端數(shù)據(jù)同步(Distro協(xié)議):Nacos SDK只會(huì)與服務(wù)端某個(gè)節(jié)點(diǎn)建立長(zhǎng)連接,當(dāng)服務(wù)端接受到客戶端注冊(cè)的實(shí)例數(shù)據(jù)后,還需要將實(shí)例數(shù)據(jù)同步給其他節(jié)點(diǎn)。Nacos自己實(shí)現(xiàn)了一個(gè)一致性協(xié)議名為Distro,服務(wù)注冊(cè)的時(shí)候會(huì)觸發(fā)Distro一次同步,每個(gè)Nacos節(jié)點(diǎn)之間會(huì)定時(shí)互相發(fā)送Distro數(shù)據(jù),以此保證數(shù)據(jù)最終一致。
- 服務(wù)實(shí)例上線推送:Nacos服務(wù)端收到服務(wù)實(shí)例數(shù)據(jù)后會(huì)將服務(wù)的最新實(shí)例列表通過(guò)grpc推送給該服務(wù)的所有訂閱者。
- 服務(wù)注冊(cè)過(guò)程源碼時(shí)序圖:整理了一下服務(wù)注冊(cè)過(guò)程整體時(shí)序圖,對(duì)源碼實(shí)現(xiàn)感興趣的可以按照根據(jù)這個(gè)時(shí)序圖view一下源碼。

2.3 Nacos 心跳機(jī)制
目前主流的注冊(cè)中心,比如Consul、Eureka、Zk包括我們公司自研的Gsched,都是通過(guò)心跳機(jī)制來(lái)感知服務(wù)的下線。Nacos也是通過(guò)心跳機(jī)制來(lái)實(shí)現(xiàn)的。
Nacos目前SDK維護(hù)了兩個(gè)分支的版本(1.x、2.x),這兩個(gè)版本心跳機(jī)制的實(shí)現(xiàn)不一樣。其中1.x版本的SDK通過(guò)http協(xié)議來(lái)定時(shí)向服務(wù)端發(fā)送心跳維持自己的健康狀態(tài),2.x版本的SDK則通過(guò)grpc自身的心跳機(jī)制來(lái)?;睿?dāng)Nacos服務(wù)端接受不到服務(wù)實(shí)例的心跳,會(huì)認(rèn)為實(shí)例下線。如下圖:

grpc監(jiān)測(cè)到連接斷開(kāi)事件,發(fā)送ClientDisconnectEvent。
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
//連接斷開(kāi),發(fā)送連接斷開(kāi)事件
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
}
移除客戶端注冊(cè)的服務(wù)實(shí)例
public class ClientServiceIndexesManager extends SmartSubscriber {
@Override
public void onEvent(Event event) {
//接收失去連接事件
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
Client client = event.getClient();
for (Service each : client.getAllSubscribeService()) {
removeSubscriberIndexes(each, client.getClientId());
}
//移除客戶端注冊(cè)的服務(wù)實(shí)例
for (Service each : client.getAllPublishedService()) {
removePublisherIndexes(each, client.getClientId());
}
}
//移除客戶端注冊(cè)的服務(wù)實(shí)例
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
}
2.4 Nacos 服務(wù)訂閱
當(dāng)一個(gè)服務(wù)發(fā)生上下線,Nacos如何知道要推送給哪些客戶端?
Nacos SDK 提供了訂閱和取消訂閱方法,當(dāng)客戶端向服務(wù)端發(fā)起訂閱請(qǐng)求,服務(wù)端會(huì)記錄發(fā)起調(diào)用的客戶端為該服務(wù)的訂閱者,同時(shí)將服務(wù)的最新實(shí)例列表返回。當(dāng)客戶端發(fā)起了取消訂閱,服務(wù)端就會(huì)從該服務(wù)的訂閱者列表中把當(dāng)前客戶端移除。
當(dāng)客戶端發(fā)起訂閱時(shí),服務(wù)端除了會(huì)同步返回最新的服務(wù)實(shí)例列表,還會(huì)異步的通過(guò)grpc推送給該訂閱者最新的服務(wù)實(shí)例列表,這樣做的目的是為了異步更新客戶端本地緩存的服務(wù)數(shù)據(jù)。
當(dāng)客戶端訂閱的服務(wù)上下線,該服務(wù)所有的訂閱者會(huì)立刻收到最新的服務(wù)列表并且將服務(wù)最新的實(shí)例數(shù)據(jù)更新到內(nèi)存。

我們也看一下相關(guān)源碼,服務(wù)端接收到訂閱數(shù)據(jù),首先保存到內(nèi)存中。
@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
//校驗(yàn)長(zhǎng)連接是否正常
if (!clientIsLegal(client, clientId)) {
return;
}
//保存訂閱數(shù)據(jù)
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
//發(fā)送訂閱事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//處理訂閱操作
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}
然后發(fā)布訂閱事件。
private void addSubscriberIndexes(Service service, String clientId) {
//保存訂閱數(shù)據(jù)
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
//發(fā)布訂閱事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
服務(wù)端自己消費(fèi)訂閱事件,并且推送給訂閱的客戶端最新的服務(wù)實(shí)例數(shù)據(jù)。
@Override
public void onEvent(Event event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
2.5 Nacos 推送
推送方式
前面說(shuō)了服務(wù)的注冊(cè)和訂閱都會(huì)發(fā)生推送(服務(wù)端->客戶端),那推送到底是如何實(shí)現(xiàn)的呢?
在早期的Nacos版本,當(dāng)服務(wù)實(shí)例變化,服務(wù)端會(huì)通過(guò)udp協(xié)議將最新的數(shù)據(jù)發(fā)送給客戶端,后來(lái)發(fā)現(xiàn)udp推送有一定的丟包率,于是新版本的Nacos支持了grpc推送。Nacos服務(wù)端會(huì)自動(dòng)判斷客戶端的版本來(lái)選擇哪種方式來(lái)進(jìn)行推送,如果你使用1.4.2以前的SDK進(jìn)行注冊(cè),那Nacos服務(wù)端會(huì)使用udp協(xié)議來(lái)進(jìn)行推送,反之則使用grpc。
推送失敗重試
當(dāng)發(fā)送推送時(shí),客戶端可能正在重啟,或者連接不穩(wěn)定導(dǎo)致推送失敗,這個(gè)時(shí)候Nacos會(huì)進(jìn)行重試。Nacos將每個(gè)推送都封裝成一個(gè)任務(wù)對(duì)象,放入到隊(duì)列中,再開(kāi)啟一個(gè)線程不停的從隊(duì)列取出任務(wù)執(zhí)行,執(zhí)行之前會(huì)先刪除該任務(wù),如果執(zhí)行失敗則將任務(wù)重新添加到隊(duì)列,該線程會(huì)記錄任務(wù)執(zhí)行的時(shí)間,如果超過(guò)1秒,則會(huì)記錄到日志。
推送部分源碼
添加推送任務(wù)到執(zhí)行隊(duì)列中。
private static class PushDelayTaskProcessor implements NacosTaskProcessor {
private final PushDelayTaskExecuteEngine executeEngine;
public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
this.executeEngine = executeEngine;
}
@Override
public boolean process(NacosTask task) {
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
}
推送任務(wù)PushExecuteTask 的執(zhí)行。
public class PushExecuteTask extends AbstractExecuteTask {
//..省略
@Override
public void run() {
try {
//封裝要推送的服務(wù)實(shí)例數(shù)據(jù)
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
//如果是服務(wù)上下線導(dǎo)致的推送,獲取所有訂閱者
//如果是訂閱導(dǎo)致的推送,獲取訂閱者
for (String each : getTargetClientIds()) {
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
//推送給訂閱者
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
//當(dāng)推送發(fā)生異常,重新將推送任務(wù)放入執(zhí)行隊(duì)列
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
//如果是服務(wù)上下線導(dǎo)致的推送,獲取所有訂閱者
//如果是訂閱導(dǎo)致的推送,獲取訂閱者
private Collection<String> getTargetClientIds() {
return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
: delayTask.getTargetClients();
}
執(zhí)行推送任務(wù)線程InnerWorker 的執(zhí)行。
/**
* Inner execute worker.
*/
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (!closed.get()) {
try {
//從隊(duì)列中取出任務(wù)PushExecuteTask
Runnable task = queue.take();
long begin = System.currentTimeMillis();
//執(zhí)行PushExecuteTask
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}
}
2.6 Nacos SDK 查詢服務(wù)實(shí)例
服務(wù)消費(fèi)者首先需要調(diào)用Nacos SDK的接口來(lái)獲取最新的服務(wù)實(shí)例,然后才能從獲取到的實(shí)例列表中以加權(quán)輪詢的方式選擇出一個(gè)實(shí)例(包含ip,port等信息),最后再發(fā)起調(diào)用。
前面已經(jīng)提到Nacos服務(wù)發(fā)生上下線、訂閱的時(shí)候都會(huì)推送最新的服務(wù)實(shí)例列表到當(dāng)客戶端,客戶端再更新本地內(nèi)存中的緩沖數(shù)據(jù),所以調(diào)用Nacos SDK提供的查詢實(shí)例列表的接口時(shí),不會(huì)直接請(qǐng)求服務(wù)端獲取數(shù)據(jù),而是會(huì)優(yōu)先使用內(nèi)存中的服務(wù)數(shù)據(jù),只有內(nèi)存中查不到的情況下才會(huì)發(fā)起訂閱請(qǐng)求服務(wù)端數(shù)據(jù)。
Nacos SDK內(nèi)存中的數(shù)據(jù)除了接受來(lái)自服務(wù)端的推送更新之外,自己本地也會(huì)有一個(gè)定時(shí)任務(wù)定時(shí)去獲取服務(wù)端數(shù)據(jù)來(lái)進(jìn)行兜底。Nacos SDK在查詢的時(shí)候也了容災(zāi)機(jī)制,即從磁盤(pán)獲取服務(wù)數(shù)據(jù),而這個(gè)磁盤(pán)的數(shù)據(jù)其實(shí)也是來(lái)自于內(nèi)存,有一個(gè)定時(shí)任務(wù)定時(shí)從內(nèi)存緩存中獲取然后加載到磁盤(pán)。Nacos SDK的容災(zāi)機(jī)制默認(rèn)關(guān)閉,可通過(guò)設(shè)置環(huán)境變量failover-mode=true來(lái)開(kāi)啟。
架構(gòu)圖
用戶查詢流程
查詢服務(wù)實(shí)例部分源碼
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
//這里默認(rèn)傳過(guò)來(lái)是true
if (subscribe) {
//從本地內(nèi)存獲取服務(wù)數(shù)據(jù),如果獲取不到則從磁盤(pán)獲取
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
//如果從本地獲取不到數(shù)據(jù),則調(diào)用訂閱方法
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
//適用于不走訂閱,直接從服務(wù)端獲取數(shù)據(jù)的情況
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
}
//從本地內(nèi)存獲取服務(wù)數(shù)據(jù),如果開(kāi)啟了故障轉(zhuǎn)移則直接從磁盤(pán)獲取,因?yàn)楫?dāng)服務(wù)端掛了,本地啟動(dòng)時(shí)內(nèi)存中也沒(méi)有數(shù)據(jù)
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
//故障轉(zhuǎn)移則直接從磁盤(pán)獲取
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//返回內(nèi)存中數(shù)據(jù)
return serviceInfoMap.get(key);
}
3. 結(jié)語(yǔ)
本篇文章向大家介紹 Nacos 服務(wù)發(fā)現(xiàn)的基本概念和核心能力以及實(shí)現(xiàn)的原理,旨在讓大家對(duì) Nacos 的服務(wù)注冊(cè)與發(fā)現(xiàn)功能有更多的了解,做到心中有數(shù)。