自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

ZooKeeper分布式配置,看這篇就夠了

開(kāi)發(fā) 開(kāi)發(fā)工具 分布式
對(duì)于剛開(kāi)始的時(shí)候,很多公司的服務(wù)器可能是由單個(gè)組成,但是隨著業(yè)務(wù)的發(fā)展,單一節(jié)點(diǎn)的服務(wù)無(wú)法滿足業(yè)務(wù)的飛速發(fā)展,后面就出現(xiàn)了分布式、集群的概念,到了現(xiàn)在形成的微服務(wù),技術(shù)的改進(jìn)能夠更好的滿足業(yè)務(wù)的需要。

[[430218]]

本文轉(zhuǎn)載自微信公眾號(hào)「牧小農(nóng)」,作者牧小農(nóng)。轉(zhuǎn)載本文請(qǐng)聯(lián)系牧小農(nóng)公眾號(hào)。

ZooKeeper 的由來(lái)

PS:這個(gè)不重要,不感興趣的,可以直接看下面

來(lái)源:《從 Paxos 到 ZooKeeper 》

ZooKeeper 最早起源于雅虎研究院的一個(gè)研究小組,在當(dāng)時(shí),研究人員發(fā)現(xiàn),在雅虎內(nèi)部有很多的大型系統(tǒng)基本上都需要依賴一個(gè)類似的系統(tǒng)來(lái)進(jìn)行分布式協(xié)調(diào),但是這些系統(tǒng)往往都存在分布式單點(diǎn)的問(wèn)題,所有雅虎的開(kāi)發(fā)人員就嘗試開(kāi)發(fā)了一個(gè)通用的無(wú)單點(diǎn)問(wèn)題的分布式協(xié)調(diào)框架,以便讓開(kāi)發(fā)人員將精力集中在處理業(yè)務(wù)邏輯上。關(guān)于"ZooKeeper"這個(gè)項(xiàng)目的名字。也有一個(gè)故事,在項(xiàng)目開(kāi)始初期,因?yàn)榭紤]到內(nèi)部的很多項(xiàng)目都是用動(dòng)物的名字來(lái)命名的(例如:Pig項(xiàng)目),所以雅虎的工程師也希望給這個(gè)項(xiàng)目也取一個(gè)動(dòng)物的名字,這個(gè)時(shí)候擔(dān)任研究院的首席科學(xué)家 Raghu Ramakrishnan 開(kāi)玩笑地說(shuō):“在這樣下去,我們這兒就變成動(dòng)物園”,此話一出,大家紛紛表示就叫動(dòng)物園管理員吧,因?yàn)楦鱾€(gè)以動(dòng)物命名的分布式組件放在一起,這個(gè)分布式系統(tǒng)看上去就像一個(gè)大型的動(dòng)物園了,而ZooKeeper 正好要用來(lái)進(jìn)行分布式環(huán)境的協(xié)調(diào),于是ZooKeeper 的名字也就由此誕生了。

分布式配置中心

在上一期中我們講解了 ZooKeeper集群的配置和安裝,ZooKeeper集群 主要是幫我們做分布式協(xié)調(diào)的,今天我們用ZK實(shí)現(xiàn)分布式配置。關(guān)于ZooKeeper 集群的配置大家可以參考上一篇文章《Zookeeper 集群部署的那些事兒》。

為什么需要分布式配置中心

對(duì)于剛開(kāi)始的時(shí)候,很多公司的服務(wù)器可能是由單個(gè)組成,但是隨著業(yè)務(wù)的發(fā)展,單一節(jié)點(diǎn)的服務(wù)無(wú)法滿足業(yè)務(wù)的飛速發(fā)展,后面就出現(xiàn)了分布式、集群的概念,到了現(xiàn)在形成的微服務(wù),技術(shù)的改進(jìn)能夠更好的滿足業(yè)務(wù)的需要。

假設(shè)我們線上有很多個(gè)微服務(wù)分布在不同的服務(wù)器上,其中一個(gè)微服務(wù),我們就叫它 goods-service,當(dāng) goods-service的IP地址需要變更的時(shí)候,但是 goods-service又對(duì)很多其他的程序提供了服務(wù),這個(gè)時(shí)候如果沒(méi)有一個(gè)統(tǒng)一配置的東西,每一個(gè)應(yīng)用到 goods-service的應(yīng)用程序都要做相應(yīng)的IP地址修改,這是一個(gè)很麻煩的事情!

如果使用ZooKeeper來(lái)做分布式配置的話,是可以解決這個(gè)問(wèn)題的。

注冊(cè)中心對(duì)比

如果我們只考慮服務(wù)治理的話,Eureka是比較合適的,Eureka是比較純粹的注冊(cè)中心了,和Eureka不同Apache ZooKeeper 在設(shè)計(jì)的時(shí)候就遵循 CP原則,任何時(shí)候?qū)?ZooKeeper 的訪問(wèn)請(qǐng)求都能得到一致的數(shù)據(jù)結(jié)果,同事系統(tǒng)對(duì)網(wǎng)絡(luò)分割具有容錯(cuò)性,今天我們講解的就是關(guān)于ZooKeeper 的注冊(cè)發(fā)現(xiàn)。

配置中心的核心

低延遲: 配置改變( create/update/delete)后能夠最快的把最新的配置同步到其他節(jié)點(diǎn)中

高可用: 配置中心可以穩(wěn)定的對(duì)外提供服務(wù)

其中 低延遲 我們可以通過(guò) ZooKeeper 的 Watcher 機(jī)制來(lái)實(shí)現(xiàn)(等下會(huì)講到Watcher機(jī)制)。約定一個(gè)節(jié)點(diǎn)用來(lái)存放配置信息,每個(gè)客戶端都來(lái)監(jiān)聽(tīng)這個(gè)節(jié)點(diǎn)的NodeDataChanged事件,當(dāng)配置發(fā)生改變時(shí)將最新的配置更新到這個(gè)節(jié)點(diǎn)上,誰(shuí)更新無(wú)所謂,任何節(jié)點(diǎn)都可以更新,當(dāng)這個(gè)節(jié)點(diǎn)觸發(fā) NodeDataChanged 事件后,在去通知所有監(jiān)聽(tīng)這個(gè)節(jié)點(diǎn)的客戶端去獲取這個(gè)節(jié)點(diǎn)的最新信息,因?yàn)閣atcher 是一次性的,所以當(dāng)我們?cè)讷@取最新信息的時(shí)候需要設(shè)置監(jiān)聽(tīng)事件,大部分查詢信息都是具有原子性的,所以ZooKeeper中的 getData 也是具有原子性操作,能夠保證我們?nèi)〉玫男畔⑹亲钚碌摹?/p>

對(duì)于 高可用 我們首先需要保證的多集群操作來(lái)進(jìn)行ZooKeeper進(jìn)行部署,在代碼層不太需要做過(guò)多的工作。

Watch 機(jī)制

Watch 是 ZooKeeper 針對(duì)節(jié)點(diǎn)的一次性觀察者機(jī)制,就如同我們上面 * 低延遲* 中講到的,一次觸發(fā)后就失效,需要手工重新創(chuàng)建Watch。

當(dāng)Watch監(jiān)視的數(shù)據(jù)發(fā)生變化的時(shí)候,就會(huì)通知設(shè)置了 Watch 的客戶端,就是我們API中的Watcher,Watcher機(jī)制就是為了監(jiān)聽(tīng)Znode節(jié)點(diǎn)發(fā)生了哪些變化,所以會(huì)有對(duì)應(yīng)的事件類型和狀態(tài)類型,用過(guò)代碼中switch進(jìn)行監(jiān)聽(tīng),一個(gè)客戶端可以鏈接多個(gè)節(jié)點(diǎn),只要Znode節(jié)點(diǎn)發(fā)生變化就會(huì)執(zhí)行 process(WatchedEventevent)。

如下圖所示:

從上圖中我們可以看到,在ZooKeeper中,Watch采用的是推送機(jī)制,而不是客戶端輪詢,有些中間件采用的是拉取的模式,例如:KafKa。

Watch有兩種監(jiān)聽(tīng)模式,分別為 事件類型和狀態(tài)類型 :

事件類型:Znode 節(jié)點(diǎn)關(guān)聯(lián),主要是針對(duì)節(jié)點(diǎn)的操作

  • 創(chuàng)建節(jié)點(diǎn):EventType.NodeCreated
  • 節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化:EventType.NodeDataChanged
  • 當(dāng)前節(jié)點(diǎn)的子節(jié)點(diǎn)發(fā)生變化:EventType.NodeChildrenChanged
  • 刪除節(jié)點(diǎn):EventType.NodeDeleted

狀態(tài)類型:客戶端關(guān)聯(lián),主要是針對(duì)于ZooKeeper集群和應(yīng)用服務(wù)之間的狀態(tài)的變更

  • 未連接:KeeperState.Disconnected
  • 已連接:KeeperState.SyncConnected
  • 認(rèn)證失?。篕eeperState.AuthFailed
  • 過(guò)期:KeeperState.Expired
  • 客戶端連接到只讀服務(wù)器:KeeperState.ConnectedReadOnly

watch的特性

一次性觸發(fā): 對(duì)于ZooKeeper的Watcher事件,是一次性觸發(fā)的,當(dāng) Watch 監(jiān)視的數(shù)據(jù)發(fā)生變化的時(shí)候,通知設(shè)置當(dāng)前Watch 的 Client,就是我們對(duì)應(yīng)的 Watcher,因?yàn)閆ooKeeper 的監(jiān)控都是一次性的,所以我們需要在每次觸發(fā)后設(shè)置監(jiān)控。

客戶端串行執(zhí)行: 客戶端Watcher回調(diào)的過(guò)程是一個(gè)串行同步的過(guò)程,可以為我們保證順序的執(zhí)行。

輕量級(jí): WatchedEvent是ZooKeeper整個(gè)Watcher通知機(jī)制的最小通知單元,總共包含三個(gè)部分(通知狀態(tài)、事件類型和節(jié)點(diǎn)路徑),Watcher通知,只會(huì)告訴客戶端發(fā)生事件而不會(huì)告知具體內(nèi)容,需要客戶端主動(dòng)去進(jìn)行獲取,比如 當(dāng)監(jiān)聽(tīng)到 WatchedEvent.NodeDataChanged 信息變化的時(shí)候,只會(huì)告訴我們這個(gè)節(jié)點(diǎn)的數(shù)據(jù)發(fā)生了變更,你快來(lái)獲取最新的值吧。

客戶端設(shè)置的每個(gè)監(jiān)視點(diǎn)與會(huì)話關(guān)聯(lián),如果會(huì)話過(guò)期,等待中的監(jiān)視點(diǎn)將會(huì)被刪除。不過(guò)監(jiān)視點(diǎn)可以跨越不同服務(wù)端的連接而保持,例如,當(dāng)一個(gè)ZooKeeper客戶端與一個(gè)ZooKeeper服務(wù)端的連接斷開(kāi)后連接到集合中的另一個(gè)服務(wù)端,客戶端會(huì)發(fā)送未觸發(fā)的監(jiān)視點(diǎn)列表,在注冊(cè)監(jiān)視點(diǎn)時(shí),服務(wù)端將要檢查已監(jiān)視的znode節(jié)點(diǎn)在之前注冊(cè)監(jiān)視點(diǎn)之后是否已經(jīng)變化,如果znode節(jié)點(diǎn)已經(jīng)發(fā)生變化,一個(gè)監(jiān)視點(diǎn)的事件就會(huì)被發(fā)送給客戶端,否則在新的服務(wù)端上注冊(cè)監(jiān)視點(diǎn)。這一機(jī)制使得我們可以關(guān)心邏輯層會(huì)話,而非底層連接本身。

客戶端注冊(cè)

ZooKeeper 注冊(cè)的時(shí)候會(huì)向ZooKeeper 服務(wù)端請(qǐng)求注冊(cè),服務(wù)端會(huì)返回請(qǐng)求響應(yīng),不管成功失敗,都會(huì)返回響應(yīng)結(jié)果,當(dāng)響應(yīng)成功的時(shí)候,ZooKeeper服務(wù)端會(huì)把Watcher對(duì)象放到客戶端的WatchManager管理并返回響應(yīng)給客戶端

服務(wù)端注冊(cè)

FinalRequestProcessor.ProcessRequest()會(huì)判斷當(dāng)前請(qǐng)求是否需要注冊(cè)Watcher

如果ZooKeeper判斷當(dāng)前客戶端需要進(jìn)行Watcher注冊(cè),會(huì)將當(dāng)前的ServerCnxn 對(duì)象和數(shù)據(jù)路徑傳入 getData 方法中去。ServerCnxn 是ZooKeeper 客戶端和服務(wù)端之間的連接接口,代表了一個(gè)客戶端和服務(wù)端的連接,可以將 ServerCnxn 當(dāng)做一個(gè) Watcher 對(duì)象,因?yàn)樗鼘?shí)現(xiàn)了 Watcher 的 process 接口。

WatcherManager

WatcherManager是 ZK服務(wù)端 Watcher 的管理器,分為 WatchTable 和 Watch2Paths 兩個(gè)存儲(chǔ)結(jié)構(gòu),這兩個(gè)是不同的存儲(chǔ)結(jié)構(gòu) 1)WatchTable:從數(shù)據(jù)節(jié)點(diǎn)路徑的顆粒度管理 Watcher 2)Watch2Paths:從Watcher的顆粒度來(lái)控制時(shí)間出發(fā)的數(shù)據(jù)節(jié)點(diǎn)

在服務(wù)端,DataTree 中會(huì)托管兩個(gè) WatchManager, 分別是 dataWatches (數(shù)據(jù)變更Watch) 和 childWatches(子節(jié)點(diǎn)變更Watch)。

Watcher 觸發(fā)邏輯

1)封裝WatchedEven:將(KeeperState(通知狀態(tài)),EventType(事件類型),Path(節(jié)點(diǎn)路徑))封裝成一個(gè) WatchedEvent 對(duì)象 2)查詢Watcher:根據(jù)路徑取出對(duì)應(yīng)的Watcher,如果存在,取出數(shù)據(jù)同時(shí)從 WatcherManager(WatchTable/Watch2Paths) 中刪除 3)調(diào)用Process方法觸發(fā)Watcher

4.客戶端回調(diào) Watcher

1)反序列化:字節(jié)流轉(zhuǎn)換成 WatcherEvent 對(duì)象 2)處理 chrootPath:如果客戶端設(shè)置了 chrootPath 屬性,那么需要對(duì)服務(wù)器傳過(guò)來(lái)的完整節(jié)點(diǎn)路徑進(jìn)行 chrootPath 處理,生成客戶端的一個(gè)相對(duì)節(jié)點(diǎn)路徑。比如(/mxn/app/love,經(jīng)過(guò)chrootPath處理,會(huì)變成 /love) 3)還原 WatchedEvent:WatcherEvent 轉(zhuǎn)換成 WatchedEvent 4)回調(diào) Watcher:將 WatcherEvent 對(duì)象交給 EventThread 線程,在下一個(gè)輪詢周期中進(jìn)行 Watcher 回調(diào)

EventThread 處理時(shí)間通知

1) SendThread 接收到服務(wù)端的通知事件后,會(huì)通過(guò)調(diào)用 EventThread.queueEvent 方法將事件傳給 EventThread 線程 2)queueEvent 方法首先會(huì)根據(jù)該通知事件,從 ZKWatchManager 中取出所有相關(guān)的 Watcher 客戶端識(shí)別出 事件類型 EventType 后,會(huì)從相應(yīng)的 Watcher 存儲(chǔ) (即3個(gè)注冊(cè)方法( dataWatches、existWatcher 或 childWatcher)中去除對(duì)應(yīng)的 Watcher 3) 獲取到相關(guān)的所有 Watcher 后,會(huì)將其放入 waitingEvents 這個(gè)隊(duì)列去

代碼實(shí)現(xiàn)

下面我們就來(lái)演示如何使用代碼來(lái)實(shí)現(xiàn)ZooKeeper的配置

首先我們需要引入ZK的jar

  1. <dependency> 
  2.       <groupId>org.apache.zookeeper</groupId> 
  3.       <artifactId>zookeeper</artifactId> 
  4.       <version>3.6.3</version> 
  5.     </dependency> 

配置類

既然我們要做的是分布式配置,首先我們需要模擬一個(gè)配置,這個(gè)配置用來(lái)同步服務(wù)的地址

  1. /** 
  2.  * @program: mxnzookeeper 
  3.  * @ClassName MyConf 
  4.  * @description: 配置類 
  5.  * @author: muxiaonong 
  6.  * @create: 2021-10-19 22:18 
  7.  * @Version 1.0 
  8.  **/ 
  9. public class MyConfig { 
  10.  
  11.     private String conf ; 
  12.  
  13.     public String getConf() { 
  14.         return conf; 
  15.     } 
  16.  
  17.     public void setConf(String conf) { 
  18.         this.conf = conf; 
  19.     } 
  20.  

Watcher

創(chuàng)建ZooKeeper的時(shí)候,我們需要一個(gè)Watcher進(jìn)行監(jiān)聽(tīng),后續(xù)對(duì)Znode節(jié)點(diǎn)操作的時(shí)候,我們也需要使用到Watcher,但是這兩類的功能不一樣,所以我們需要定義一個(gè)自己的watcher類,如下所示:

  1. import org.apache.zookeeper.WatchedEvent; 
  2. import org.apache.zookeeper.Watcher; 
  3.  
  4. import java.util.concurrent.CountDownLatch; 
  5.  
  6. /** 
  7.  * @program: mxnzookeeper 
  8.  * @ClassName DefaultWatch 
  9.  * @description: 
  10.  * @author: muxiaonong 
  11.  * @create: 2021-10-19 22:02 
  12.  * @Version 1.0 
  13.  **/ 
  14. public class DefaultWatch implements Watcher { 
  15.  
  16.     CountDownLatch cc; 
  17.      
  18.     public void setCc(CountDownLatch cc) { 
  19.         this.cc = cc; 
  20.     } 
  21.  
  22.     @Override 
  23.     public void process(WatchedEvent event) { 
  24.         System.out.println(event.toString()); 
  25.  
  26.         switch (event.getState()) { 
  27.             case Unknown: 
  28.                 break; 
  29.             case Disconnected: 
  30.                 break; 
  31.             case NoSyncConnected: 
  32.                 break; 
  33.             case SyncConnected: 
  34.                 System.out.println("連接成功。。。。。"); 
  35.                 //連接成功后,執(zhí)行countDown,此時(shí)便可以拿zk對(duì)象使用了 
  36.                 cc.countDown(); 
  37.                 break; 
  38.             case AuthFailed: 
  39.                 break; 
  40.             case ConnectedReadOnly: 
  41.                 break; 
  42.             case SaslAuthenticated: 
  43.                 break; 
  44.             case Expired: 
  45.                 break; 
  46.             case Closed: 
  47.                 break; 
  48.         } 
  49.  
  50.     } 

由于是異步進(jìn)行操作的,我們創(chuàng)建一個(gè)ZooKeeper對(duì)象之后,如果不進(jìn)行阻塞操作的話,有可能還沒(méi)有連接完成就執(zhí)行后續(xù)的操作,所以這里我們用 CountDownLatch進(jìn)行阻塞操作,當(dāng)監(jiān)測(cè)連接成功后,進(jìn)行 countDown放行,執(zhí)行后續(xù)的ZK的動(dòng)作。

當(dāng)我們連接成功 ZooKeeper 之后,我們需要通過(guò) exists判斷是否存在節(jié)點(diǎn),存在就進(jìn)行 getData操作。這里我們創(chuàng)建一個(gè) WatchCallBack因?yàn)?exists和getData都需要一個(gè)callback,所以除了實(shí)現(xiàn)Watcher以外還需要實(shí)現(xiàn) 節(jié)點(diǎn)狀態(tài):AsyncCallback.StatCallback數(shù)據(jù)監(jiān)聽(tīng):AsyncCallback.DataCallback

  1. import org.apache.zookeeper.AsyncCallback; 
  2. import org.apache.zookeeper.WatchedEvent; 
  3. import org.apache.zookeeper.Watcher; 
  4. import org.apache.zookeeper.ZooKeeper; 
  5. import org.apache.zookeeper.data.Stat; 
  6.  
  7. import java.util.concurrent.CountDownLatch; 
  8.  
  9. /** 
  10.  * @program: mxnzookeeper 
  11.  * @ClassName WatchCallBack 
  12.  * @description: 
  13.  * @author: muxiaonong 
  14.  * @create: 2021-10-19 22:13 
  15.  * @Version 1.0 
  16.  **/ 
  17. public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { 
  18.  
  19.     ZooKeeper zk ; 
  20.     MyConfig conf ; 
  21.     CountDownLatch cc = new CountDownLatch(1); 
  22.  
  23.     public MyConfig getConf() { 
  24.         return conf; 
  25.     } 
  26.  
  27.     public void setConf(MyConfig conf) { 
  28.         this.conf = conf; 
  29.     } 
  30.  
  31.     public ZooKeeper getZk() { 
  32.         return zk; 
  33.     } 
  34.  
  35.     public void setZk(ZooKeeper zk) { 
  36.         this.zk = zk; 
  37.     } 
  38.  
  39.  
  40.     public void aWait(){ 
  41.         //exists的異步實(shí)現(xiàn)版本 
  42.         zk.exists(ZKConstants.ZK_NODE,this,this ,"exists watch"); 
  43.         try { 
  44.             cc.await(); 
  45.         } catch (InterruptedException e) { 
  46.             e.printStackTrace(); 
  47.         } 
  48.     } 
  49.  
  50.     /** @Author mxn 
  51.      * @Description //TODO 此回調(diào)用于檢索節(jié)點(diǎn)的stat 
  52.      * @Date 21:24 2021/10/20 
  53.      * @param rc 調(diào)用返回的code或結(jié)果 
  54.      * @param path 傳遞給異步調(diào)用的路徑 
  55.      * @param ctx 傳遞給異步調(diào)用的上下文對(duì)象 
  56.      * @param stat 指定路徑上節(jié)點(diǎn)的Stat對(duì)象 
  57.      * @return  
  58.      **/ 
  59.     @Override 
  60.     public void processResult(int rc, String path, Object ctx, Stat stat) { 
  61.         if(stat != null){ 
  62.             //getData的異步實(shí)現(xiàn)版本 
  63.             zk.getData(ZKConstants.ZK_NODE,this,this,"status"); 
  64.         } 
  65.     } 
  66.  
  67.  
  68.     /** @Author mxn 
  69.      * @Description //TODO  此回調(diào)用于檢索節(jié)點(diǎn)的數(shù)據(jù)和stat 
  70.      * @Date 21:23 2021/10/20 
  71.      * @param rc 調(diào)用返回的code或結(jié)果 
  72.      * @param path 傳遞給異步調(diào)用的路徑 
  73.      * @param ctx 傳遞給異步調(diào)用的上下文對(duì)象 
  74.      * @param data 節(jié)點(diǎn)的數(shù)據(jù) 
  75.      * @param stat 指定節(jié)點(diǎn)的Stat對(duì)象 
  76.      * @return 
  77.      **/ 
  78.     @Override 
  79.     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 
  80.         if(data != null ){ 
  81.             String s = new String(data); 
  82.             conf.setConf(s); 
  83.             cc.countDown(); 
  84.         } 
  85.     } 
  86.  
  87.     /** @Author mxn 
  88.      * @Description //TODO Watcher接口的實(shí)現(xiàn)。 
  89.      *                      Watcher接口指定事件處理程序類必須實(shí)現(xiàn)的公共接口。 
  90.      *                      ZooKeeper客戶機(jī)將從它連接到的ZooKeeper服務(wù)器獲取各種事件。 
  91.      *                      使用這種客戶機(jī)的應(yīng)用程序通過(guò)向客戶機(jī)注冊(cè)回調(diào)對(duì)象來(lái)處理這些事件。 
  92.      *                      回調(diào)對(duì)象應(yīng)該是實(shí)現(xiàn)監(jiān)視器接口的類的實(shí)例。 
  93.      * @Date 21:24 2021/10/20 
  94.      * @Param  watchedEvent WatchedEvent表示監(jiān)視者能夠響應(yīng)的ZooKeeper上的更改。 
  95.      *          WatchedEvent包含發(fā)生了什么, 
  96.      *          ZooKeeper的當(dāng)前狀態(tài),以及事件中涉及的znode的路徑。 
  97.      * @return  
  98.      **/ 
  99.     @Override 
  100.     public void process(WatchedEvent event) { 
  101.         switch (event.getType()) { 
  102.             case None: 
  103.                 break; 
  104.             case NodeCreated: 
  105.                 //當(dāng)一個(gè)node被創(chuàng)建后,獲取node 
  106.                 //getData中又會(huì)觸發(fā)StatCallback的回調(diào)processResult 
  107.                 zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs"); 
  108.                 break; 
  109.             case NodeDeleted: 
  110.                 //節(jié)點(diǎn)刪除 
  111.                 conf.setConf(""); 
  112.                 //重新開(kāi)啟CountDownLatch 
  113.                 cc = new CountDownLatch(1); 
  114.                 break; 
  115.             case NodeDataChanged: 
  116.                 //節(jié)點(diǎn)數(shù)據(jù)被改變了 
  117.                 //觸發(fā)DataCallback的回調(diào) 
  118.                 zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs"); 
  119.                 break; 
  120.                 //子節(jié)點(diǎn)發(fā)生變化的時(shí)候 
  121.             case NodeChildrenChanged: 
  122.                 break; 
  123.         } 
  124.  
  125.  
  126.     } 

當(dāng)前面準(zhǔn)備好了之后,我們可以編寫測(cè)試用例了:

ZKUtils 工具類

  1. import org.apache.zookeeper.ZooKeeper; 
  2.  
  3. import java.util.concurrent.CountDownLatch; 
  4.  
  5. /** 
  6.  * @program: mxnzookeeper 
  7.  * @ClassName ZKUtils 
  8.  * @description: 
  9.  * @author: muxiaonong 
  10.  * @create: 2021-10-19 21:59 
  11.  * @Version 1.0 
  12.  **/ 
  13. public class ZKUtils { 
  14.  
  15.     private static ZooKeeper zk; 
  16.  
  17.     //192.168.5.130:2181/mxn 這個(gè)后面/mxn,表示客戶端如果成功建立了到zk集群的連接, 
  18.     // 那么默認(rèn)該客戶端工作的根path就是/mxn,如果不帶/mxn,默認(rèn)根path是/ 
  19.     //當(dāng)然我們要保證/mxn這個(gè)節(jié)點(diǎn)在ZK上是存在的 
  20.     private static String address ="192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn"
  21.  
  22.     private static DefaultWatch watch = new DefaultWatch(); 
  23.  
  24.     private static CountDownLatch init = new CountDownLatch(1); 
  25.  
  26.     public static ZooKeeper getZK(){ 
  27.  
  28.         try { 
  29.             //因?yàn)槭钱惒降?,所以要await,等到連接上zk集群之后再進(jìn)行后續(xù)操作 
  30.             zk = new ZooKeeper(address,1000,watch); 
  31.             watch.setCc(init); 
  32.             init.await(); 
  33.  
  34.         } catch (Exception e) { 
  35.             e.printStackTrace(); 
  36.         } 
  37.  
  38.         return zk; 
  39.     } 
  40.  

測(cè)試類:

  1. import org.apache.zookeeper.ZooKeeper; 
  2. import org.junit.Before; 
  3. import org.junit.Test; 
  4.  
  5. /** 
  6.  * @program: mxnzookeeper 
  7.  * @ClassName TestConfig 
  8.  * @description: 
  9.  * @author: muxiaonong 
  10.  * @create: 2021-10-19 22:04 
  11.  * @Version 1.0 
  12.  **/ 
  13. public class TestConfig { 
  14.  
  15.     ZooKeeper zk; 
  16.  
  17.     @Before 
  18.     public void conn(){ 
  19.         zk = ZKUtils.getZK(); 
  20.     } 
  21.  
  22.     /** @Author mxn 
  23.      * @Description //TODO 關(guān)閉ZK 
  24.      * @Date 21:16 2021/10/20 
  25.      * @Param 
  26.      * @return 
  27.      **/ 
  28.     public void close(){ 
  29.         try { 
  30.             zk.close(); 
  31.         }catch (Exception e){ 
  32.             e.printStackTrace(); 
  33.         } 
  34.     } 
  35.  
  36.     @Test 
  37.     public void getConf(){ 
  38.         WatchCallBack watchCallBack = new WatchCallBack(); 
  39.         watchCallBack.setZk(zk); 
  40.         MyConfig myConfig = new MyConfig(); 
  41.         watchCallBack.setConf(myConfig); 
  42.  
  43.         //阻塞等待 
  44.         watchCallBack.aWait(); 
  45.  
  46.  
  47.         while(true){ 
  48.  
  49.             if(myConfig.getConf().equals("")){ 
  50.                 System.out.println("zk node 節(jié)點(diǎn)丟失了 ......"); 
  51.                 watchCallBack.aWait(); 
  52.             }else
  53.                 System.out.println(myConfig.getConf()); 
  54.  
  55.             } 
  56. // 
  57.             try { 
  58.                 //每隔500毫秒打印一次 
  59.                 Thread.sleep(500); 
  60.             } catch (InterruptedException e) { 
  61.                 e.printStackTrace(); 
  62.             } 
  63.         } 
  64.  
  65.  
  66.     } 

運(yùn)行測(cè)試

首先我們要知道,因?yàn)槲覀冞B接IP的時(shí)候加上了 /mxn這個(gè)目錄結(jié)構(gòu),所以我們?cè)诜?wù)器初始狀態(tài)就必須要有這個(gè)節(jié)點(diǎn):

集群初始狀態(tài):

  1. [zk: localhost:2181(CONNECTED) 7] ls / 
  2. [mxn, zookeeper] 

我們啟動(dòng)程序看看

連接成功

ZooKeeper 下 /mxn 現(xiàn)在也是空

  1. [zk: localhost:2181(CONNECTED) 9] ls /mxn 
  2. [] 
  3. [zk: localhost:2181(CONNECTED) 10] 

現(xiàn)在我們來(lái)創(chuàng)建一個(gè) /mxn/myZNode節(jié)點(diǎn)數(shù)據(jù)

  1. [zk: localhost:2181(CONNECTED) 10] create /mxn/myZNode "muxiaonong666" 
  2. Created /mxn/myZNode 

可以看到,創(chuàng)建完成之后,程序馬上給出響應(yīng),打印出了我配置的值,muxiaonong666

此時(shí),再設(shè)置 /mxn/myZNode的值為 muxiaonong6969

啪,很快啊!!!我們就可以看到值瞬間改變了

這個(gè)時(shí)候我們?nèi)绻麆h除 /mxn/myZNode節(jié)點(diǎn),會(huì)發(fā)生什么呢,前面我們已經(jīng)寫了watch,如果Znode被刪除了,,watch and callback執(zhí)行

  1. case NodeDeleted: 
  2.                 //節(jié)點(diǎn)刪除 
  3.                 conf.setConf(""); 
  4.                 //重新開(kāi)啟CountDownLatch 
  5.                 cc = new CountDownLatch(1); 
  6.                 break; 
  7.  
  8.  
  9.  
  10.  if(myConfig.getConf().equals("")){ 
  11.               System.out.println("zk node 節(jié)點(diǎn)丟失了 ......"); 
  12.                 ////此時(shí)應(yīng)該阻塞住,等待著node重新創(chuàng)建 
  13.                 watchCallBack.aWait(); 
  14.   } 

刪除 /mxn/myZNode 節(jié)點(diǎn)

  1. delete /mxn/myZNode 

我們可以看到前面還在打印數(shù)據(jù),后面就提示丟失。

但是這個(gè)時(shí)候我們客戶端沒(méi)有關(guān)閉,而是還在等待數(shù)據(jù)的更新,如果這個(gè)時(shí)候當(dāng)重新進(jìn)行創(chuàng)建 /mxn/myZNode節(jié)點(diǎn)的時(shí)候,程序又會(huì)繼續(xù)瘋狂輸出。

  1. create/mxn/myZNode"muxiaonong666" 

程序正常運(yùn)行,并且成功獲取到了zk配置的最新數(shù)據(jù),到這里基本上就實(shí)現(xiàn)了,ZooKeeper的分布式配置中心功能了

在這里我測(cè)試用的是 getData,但是在項(xiàng)目實(shí)戰(zhàn)用可能用的更多的是 子節(jié)點(diǎn)的操作 getChildren

總結(jié)

到這里我們這篇 ZooKeeper分布式配置注冊(cè)發(fā)現(xiàn) 就講完了,如果有疑問(wèn)的地方歡迎進(jìn)行討論,ZooKeeper 可以作為分布式配置中心,也可以用來(lái)當(dāng)然微服務(wù)的注冊(cè),不過(guò)現(xiàn)在微服務(wù)都有自己的一套服務(wù)發(fā)現(xiàn),對(duì)于了解ZooKeeper可以我們方便我們?cè)谶M(jìn)行技術(shù)選型的時(shí)候更好的去抉擇, ZooKeeper 的高可用和最終一致性也是比較穩(wěn)定。

本文代碼地址:https://github.com/muxiaonong/ZooKeeper/tree/master/mxnzookeeper

 

責(zé)任編輯:武曉燕 來(lái)源: 牧小農(nóng)
相關(guān)推薦

2021-09-30 07:59:06

zookeeper一致性算法CAP

2023-11-07 07:46:02

GatewayKubernetes

2018-10-12 09:42:00

分布式鎖 Java多線

2019-08-16 09:41:56

UDP協(xié)議TCP

2021-05-07 07:52:51

Java并發(fā)編程

2022-03-29 08:23:56

項(xiàng)目數(shù)據(jù)SIEM

2017-03-30 22:41:55

虛擬化操作系統(tǒng)軟件

2024-08-27 11:00:56

單例池緩存bean

2021-09-10 13:06:45

HDFS底層Hadoop

2023-09-25 08:32:03

Redis數(shù)據(jù)結(jié)構(gòu)

2023-10-04 00:32:01

數(shù)據(jù)結(jié)構(gòu)Redis

2021-07-28 13:29:57

大數(shù)據(jù)PandasCSV

2020-10-23 07:43:37

Log配置性能

2021-04-11 08:30:40

VRAR虛擬現(xiàn)實(shí)技術(shù)

2018-09-26 11:02:46

微服務(wù)架構(gòu)組件

2022-08-18 20:45:30

HTTP協(xié)議數(shù)據(jù)

2023-12-07 09:07:58

2021-11-10 07:47:48

Traefik邊緣網(wǎng)關(guān)

2023-11-22 07:54:33

Xargs命令Linux

2021-12-13 10:43:45

HashMapJava集合容器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)