要給Nacos的Udp通信功能點(diǎn)個(gè)贊
本文轉(zhuǎn)載自微信公眾號「程序新視界」,作者二師兄。轉(zhuǎn)載本文請聯(lián)系程序新視界公眾號。
學(xué)習(xí)不用那么功利,二師兄帶你從更高維度輕松閱讀源碼~
Nacos在服務(wù)注冊功能中使用到了UDP的通信方式,主要功能就是用來輔助服務(wù)實(shí)例變化時(shí)對客戶端進(jìn)行通知。然而,對于大多數(shù)使用Nacos的程序員來說,可能還不知道這個(gè)功能,更別說靈活運(yùn)用了。
看完整個(gè)源碼的實(shí)現(xiàn),還是要為這一功能點(diǎn)個(gè)贊的,可以說非常巧妙和實(shí)用。但在實(shí)現(xiàn)上有一些不足,文末會進(jìn)行指出。
本篇文章就帶大家從源碼層面來分析一下Nacos 2.0中是如何基于UDP協(xié)議來實(shí)現(xiàn)服務(wù)實(shí)例變更的通知。
UDP通知基本原理
在分析源碼之前,先來從整體上看一下Nacos中UDP的實(shí)現(xiàn)原理。
Nacos UDP基本原理
我們知道,UDP協(xié)議通信是雙向的,沒有所謂的客戶端和服務(wù)端,因此在客戶端和服務(wù)器端都會開啟UDP的監(jiān)聽。客戶端是單獨(dú)開啟一個(gè)線程來處理UDP消息的。當(dāng)采用HTTP協(xié)議與注冊中心通信時(shí),,在客戶端調(diào)用服務(wù)訂閱接口時(shí),會將客戶端的UPD信息(IP和端口)上送到注冊中心,注冊中心以PushClient對象來進(jìn)行封裝和存儲。
當(dāng)注冊中心有實(shí)例變化時(shí),會發(fā)布一個(gè)ServiceChangeEvent事件,注冊中心監(jiān)聽到這個(gè)事件之后,會遍歷存儲的PushClient,基于UDP協(xié)議對客戶端進(jìn)行通知??蛻舳私邮盏経DP通知,即可更新本地緩存的實(shí)例列表。
前面我們已經(jīng)知道,基于HTTP協(xié)議進(jìn)行服務(wù)注冊時(shí),會有一個(gè)實(shí)例更新的時(shí)間差,因?yàn)槭峭ㄟ^客戶端定時(shí)拉取服務(wù)器中的實(shí)例列表。如果拉取太頻繁,注冊中心壓力比較大,如果拉取的周期比較長,實(shí)例的變化又沒辦法快速感知到。而UDP協(xié)議的通知,恰恰彌補(bǔ)了這一缺點(diǎn),所以說,要為基于UDP通知這個(gè)功能點(diǎn)個(gè)贊。
下面就來看看源碼層面是如何實(shí)現(xiàn)的。
客戶端UDP通知監(jiān)聽與處理
客戶端在實(shí)例化NamingHttpClientProxy時(shí),在其構(gòu)造方法中會初始化PushReceiver。
- public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
- Properties properties, ServiceInfoHolder serviceInfoHolder) {
- // ...
- // 構(gòu)建BeatReactor
- this.beatReactor = new BeatReactor(this, properties);
- // 構(gòu)建UDP端口監(jiān)聽
- this.pushReceiver = new PushReceiver(serviceInfoHolder);
- // ...
- }
PushReceiver的構(gòu)造方法,如下:
- public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
- try {
- // 持有ServiceInfoHolder引用
- this.serviceInfoHolder = serviceInfoHolder;
- // 獲取UDP端口
- String udpPort = getPushReceiverUdpPort();
- // 根據(jù)端口情況,構(gòu)建DatagramSocket,如果未設(shè)置端口,則采用隨機(jī)端口
- if (StringUtils.isEmpty(udpPort)) {
- this.udpSocket = new DatagramSocket();
- } else {
- this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
- }
- // 創(chuàng)建只有一個(gè)線程的ScheduledExecutorService
- this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setDaemon(true);
- thread.setName("com.alibaba.nacos.naming.push.receiver");
- return thread;
- }
- });
- // 執(zhí)行線程,PushReceiver實(shí)現(xiàn)了Runnable接口
- this.executorService.execute(this);
- } catch (Exception e) {
- NAMING_LOGGER.error("[NA] init udp socket failed", e);
- }
- }
PushReceiver的構(gòu)造方法做了以下操作:
- 第一、持有ServiceInfoHolder對象引用;
- 第二、獲取UDP端口;
- 第三、實(shí)例化DatagramSocket對象,用于發(fā)送和接收Socket數(shù)據(jù);
- 第四,創(chuàng)建線程池,并執(zhí)行PushReceiver(實(shí)現(xiàn)了Runnable接口);
既然PushReceiver實(shí)現(xiàn)了Runnable接口,run方法肯定是需要重新實(shí)現(xiàn)的:
- @Override
- public void run() {
- while (!closed) {
- try {
- // byte[] is initialized with 0 full filled by default
- byte[] buffer = new byte[UDP_MSS];
- // 創(chuàng)建DatagramPacket用于存儲接收到的報(bào)文
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- // 接收報(bào)文,在未接收到報(bào)文時(shí)會進(jìn)行線程阻塞
- udpSocket.receive(packet);
- // 將報(bào)文轉(zhuǎn)換為json格式
- String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
- NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
- // 將json格式的報(bào)文轉(zhuǎn)換為PushPacket對象
- PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
- String ack;
- // 如果符合條件,則調(diào)用ServiceInfoHolder進(jìn)行接收報(bào)文處理,并返回應(yīng)答報(bào)文
- if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
- serviceInfoHolder.processServiceInfo(pushPacket.data);
- // send ack to server
- ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
- + "\"\"}";
- } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
- // dump data to server
- ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
- + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
- + "\"}";
- } else {
- // do nothing send ack only
- ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
- + "\", \"data\":" + "\"\"}";
- }
- // 發(fā)送應(yīng)答報(bào)文
- udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
- packet.getSocketAddress()));
- } catch (Exception e) {
- if (closed) {
- return;
- }
- NAMING_LOGGER.error("[NA] error while receiving push data", e);
- }
- }
- }
PushReceiver#run方法主要處理了以下操作:
- 第一、構(gòu)建DatagramPacket用于接收報(bào)文數(shù)據(jù);
- 第二、通過DatagramSocket#receive方法阻塞等待報(bào)文的到來;
- 第三、DatagramSocket#receive接收到報(bào)文之后,方法繼續(xù)執(zhí)行;
- 第四、解析JSON格式的報(bào)文為PushPacket對象;
- 第五、判斷報(bào)文類型,調(diào)用ServiceInfoHolder#processServiceInfo處理接收到的報(bào)文信息,在該方法中會將PushPacket轉(zhuǎn)化為ServiceInfo對象;
- 第六、封裝ACK信息(即應(yīng)答報(bào)文信息);
- 第七、通過DatagramSocket發(fā)送應(yīng)答報(bào)文;
上面我們看到了Nacos客戶端是如何基于UDP進(jìn)行報(bào)文的監(jiān)聽和處理的,但并未找到客戶端是如何將UDP信息上送給注冊中心的。下面我們就來梳理一下,上送UDP信息的邏輯。
客戶端上送UDP信息
在NamingHttpClientProxy中存儲了UDP_PORT_PARAM,即UDP的端口參數(shù)信息。
UDP端口信息通過實(shí)例查詢類接口進(jìn)行傳遞,比如:查詢實(shí)例列表、查詢單個(gè)健康實(shí)例、查詢所有實(shí)例、訂閱接口、訂閱的更新任務(wù)UpdateTask等接口。在這些方法中都調(diào)用了NamingClientProxy#queryInstancesOfService方法。
NamingHttpClientProxy中的queryInstancesOfService方法實(shí)現(xiàn):
- @Override
- public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
- boolean healthyOnly) throws NacosException {
- final Map<String, String> params = new HashMap<String, String>(8);
- params.put(CommonParams.NAMESPACE_ID, namespaceId);
- params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName));
- params.put(CLUSTERS_PARAM, clusters);
- // 獲取UDP端口
- params.put(UDP_PORT_PARAM, String.valueOf(udpPort));
- params.put(CLIENT_IP_PARAM, NetUtils.localIP());
- params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly));
- String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
- if (StringUtils.isNotEmpty(result)) {
- return JacksonUtils.toObj(result, ServiceInfo.class);
- }
- return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
- }
但查看源碼會發(fā)現(xiàn),查詢實(shí)例列表、查詢單個(gè)健康實(shí)例、查詢所有實(shí)例、訂閱的更新任務(wù)UpdateTask中,UDP端口傳遞的參數(shù)值均為0。只有HTTP協(xié)議的訂閱接口取值為PushReceiver中的UDP端口號。
- @Override
- public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
- return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
- }
在上面的代碼中我們已經(jīng)知道PushReceiver中有一個(gè)getPushReceiverUdpPort的方法:
- public static String getPushReceiverUdpPort() {
- return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT);
- }
很明顯,UDP的端口是通過環(huán)境變量設(shè)置的,對應(yīng)的key為“push.receiver.udp.port”。
而在1.4.2版本中,HostReactor中的NamingProxy成員變量的queryList方法也會傳遞UDP端口:
- public void updateService(String serviceName, String clusters) throws NacosException {
- ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
- try {
- String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
- if (StringUtils.isNotEmpty(result)) {
- processServiceJson(result);
- }
- } finally {
- // ...
- }
- }
關(guān)于1.4.2版本中的實(shí)現(xiàn),大家自行看源碼即可,這里不再展開。
完成了客戶端UDP基本信息的傳遞,再來看看服務(wù)器端是如何接收和存儲這些信息的。
UDP服務(wù)存儲
服務(wù)器端在獲取實(shí)例列表的接口中,對UDP端口進(jìn)行了處理。
- @GetMapping("/list")
- @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
- public Object list(HttpServletRequest request) throws Exception {
- // ...
- // 如果沒有獲得UDP端口信息,則默認(rèn)端口為0
- int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
- // ...
- // 客戶端的IP、UDP端口封裝到Subscriber對象中
- Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
- udpPort, clusters);
- return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
- }
在getInstanceOperator()方法中會獲得當(dāng)前采用的哪個(gè)協(xié)議,然后選擇對應(yīng)的處理類:
- /**
- * 判斷并返回采用V1版本或V2版本的操作服務(wù)
- * @return V1:Jraft協(xié)議(服務(wù)器端);V2:gRpc協(xié)議(客戶端)
- */
- private InstanceOperator getInstanceOperator() {
- return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
- }
這里具體的實(shí)現(xiàn)類為InstanceOperatorServiceImpl:
- @Override
- public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
- boolean healthOnly) throws Exception {
- ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());
- String clientIP = subscriber.getIp();
- ServiceInfo result = new ServiceInfo(serviceName, cluster);
- Service service = serviceManager.getService(namespaceId, serviceName);
- long cacheMillis = switchDomain.getDefaultCacheMillis();
- // now try to enable the push
- try {
- // 處理支持UDP協(xié)議的客戶端信息
- if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
- subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),
- new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,
- StringUtils.EMPTY);
- cacheMillis = switchDomain.getPushCacheMillis(serviceName);
- }
- } catch (Exception e) {
- // ...
- }
- // ...
- }
當(dāng)UDP端口大于0,且agent參數(shù)定義的客戶端支持UDP,則將對應(yīng)的客戶端信息封裝到InetSocketAddress對象中,然后放入NamingSubscriberServiceV1Impl中(該類已經(jīng)被廢棄,看后續(xù)如何調(diào)整該方法實(shí)現(xiàn))。
在NamingSubscriberServiceV1Impl中,會將對應(yīng)的參數(shù)封裝為PushClient,存放在Map當(dāng)中。
- public void addClient(String namespaceId, String serviceName, String clusters, String agent,
- InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
- PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,
- app);
- addClient(client);
- }
addClient方法會將PushClient信息存放到ConcurrentMap
- private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();
- public void addClient(PushClient client) {
- // client is stored by key 'serviceName' because notify event is driven by serviceName change
- String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
- ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
- if (clients == null) {
- clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
- clients = clientMap.get(serviceKey);
- }
- PushClient oldClient = clients.get(client.toString());
- if (oldClient != null) {
- oldClient.refresh();
- } else {
- PushClient res = clients.putIfAbsent(client.toString(), client);
- // ...
- }
- }
此時(shí),UDP的IP、端口信息已經(jīng)封裝到PushClient當(dāng)中,并存儲在NamingSubscriberServiceV1Impl的成員變量當(dāng)中。
注冊中心的UDP通知
當(dāng)服務(wù)端發(fā)現(xiàn)某個(gè)實(shí)例發(fā)生了變化,比如主動注銷了,會發(fā)布一個(gè)ServiceChangeEvent事件,UdpPushService會監(jiān)聽到該事件,并進(jìn)行業(yè)務(wù)處理。
在UdpPushService的onApplicationEvent方法中,會根據(jù)PushClient的具體情況進(jìn)行移除或發(fā)送UDP通知。onApplicationEvent中核心邏輯代碼如下:
- ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()
- .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
- if (MapUtils.isEmpty(clients)) {
- return;
- }
- Map<String, Object> cache = new HashMap<>(16);
- long lastRefTime = System.nanoTime();
- for (PushClient client : clients.values()) {
- // 移除僵尸客戶端
- if (client.zombie()) {
- Loggers.PUSH.debug("client is zombie: " + client);
- clients.remove(client.toString());
- Loggers.PUSH.debug("client is zombie: " + client);
- continue;
- }
- AckEntry ackEntry;
- String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
- byte[] compressData = null;
- Map<String, Object> data = null;
- if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
- org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
- compressData = (byte[]) (pair.getValue0());
- data = (Map<String, Object>) pair.getValue1();
- }
- // 封裝AckEntry對象
- if (compressData != null) {
- ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
- } else {
- ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
- if (ackEntry != null) {
- cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));
- }
- }
- // 通過UDP通知其他客戶端
- udpPush(ackEntry);
- }
事件處理的核心邏輯是就是先判斷PushClient的狀態(tài)信息,如果已經(jīng)是僵尸客戶端,則移除。然后將發(fā)送UDP的報(bào)文信息和接收客戶端的信息封裝為AckEntry對象,然后調(diào)用udpPush方法,進(jìn)行UDP消息的發(fā)送。
注冊中心的UDP接收
在看客戶端源碼的時(shí)候,我們看到客戶端不僅會接收UDP請求,而且還會進(jìn)行應(yīng)答。那么注冊中心怎么接收應(yīng)答呢?也在UdpPushService類中,該類內(nèi)部的靜態(tài)代碼塊初始化一個(gè)UDP的DatagramSocket,用來接收消息:
- static {
- try {
- udpSocket = new DatagramSocket();
- Receiver receiver = new Receiver();
- Thread inThread = new Thread(receiver);
- inThread.setDaemon(true);
- inThread.setName("com.alibaba.nacos.naming.push.receiver");
- inThread.start();
- } catch (SocketException e) {
- Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
- }
- }
Receiver是一個(gè)內(nèi)部類,實(shí)現(xiàn)了Runnable接口,在其run方法中主要就是接收報(bào)文信息,然后進(jìn)行報(bào)文消息的判斷,根據(jù)判斷結(jié)果,操作本地Map中數(shù)據(jù)。
UDP設(shè)計(jì)不足
文章最開始就寫到,UDP的設(shè)計(jì)非常棒,即彌補(bǔ)了HTTP定時(shí)拉取的不足,又不至于太影響性能。但目前Nacos在UDP方面有一些不足,也可能是個(gè)人的吹毛求疵吧。
第一,文檔中沒有明確說明UDP的功能如何使用,這導(dǎo)致很多使用者在使用時(shí)并不知道UDP功能的存在,以及使用的限制條件。
第二,對云服務(wù)不友好??蛻舳说腢DP端口可以自定義,但服務(wù)器端的UDP端口是隨機(jī)獲取到。在云服務(wù)中,即便是內(nèi)網(wǎng)服務(wù),UDP端口也是被防火墻限制的。如果服務(wù)端的UDP端口是隨機(jī)獲取(客戶端默認(rèn)也是),那么UDP的通信將直接被防火墻攔截掉,而用戶根本看不到任何異常(UDP協(xié)議不關(guān)注客戶端是否收到消息)。
至于這兩點(diǎn),說起來算是瑕不掩瑜,讀完源碼或讀過我這篇文章的朋友大概已經(jīng)知道怎么用了。后續(xù)可以給官方提一個(gè)Issue,看看是否可以改進(jìn)。
小結(jié)
本文重點(diǎn)從三個(gè)方面講解的Nacos基于UDP的服務(wù)實(shí)例變更通知:
第一,客戶端監(jiān)聽UDP端口,當(dāng)接收注冊中心發(fā)來的服務(wù)實(shí)例變化,可以及時(shí)的更新本地的實(shí)例緩存;
第二,客戶端通過訂閱接口,將自身的UDP信息發(fā)送給注冊中心,注冊中心進(jìn)行存儲;
第三,注冊中心中實(shí)例發(fā)生了變化,通過事件機(jī)制,將變更信息通過UDP協(xié)議發(fā)送給客戶端。
經(jīng)過本篇文章,想必你不僅了解了Nacos中UDP協(xié)議的通知機(jī)制。同時(shí),也開拓了一個(gè)新的思路,即如何使用UDP,在什么場景下使用UDP,以及在云服務(wù)中使用UDP可能會存在的問題。如果這篇文章對你有幫助,關(guān)注或點(diǎn)贊都可以。