自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

面試官:Zookeeper了解嗎?說說都有哪些使用場景?

開發(fā)
本文主要來聊聊 Zookeeper 主要的幾個(gè)使用場景。

 前言

  • Zookeeper特性與節(jié)點(diǎn)說明
  • Zookeeper客戶端使用與集群原理

前兩篇講了Zookeeper的特性、客戶端使用和集群原理,因?yàn)?Zookeeper 是分布式系統(tǒng)中很常見的一個(gè)基礎(chǔ)系統(tǒng)。 而且問的話常問的就是說 zookeeper 的使用場景是什么? 看你知道不知道一些基本的使用場景。 但是其實(shí) Zookeeper 挖深了自然是可以問的很深很深的。本文主要來聊聊 Zookeeper 主要的幾個(gè)使用場景。

  1. 分布式集群管理
  2. 分布式注冊中心
  3. 分布式JOB
  4. 分布式鎖

分布式集群管理
分布式集群管理的需求

  1. 主動(dòng)查看線上服務(wù)節(jié)點(diǎn)
  2. 查看服務(wù)節(jié)點(diǎn)資源使用情況
  3. 服務(wù)離線通知
  4. 服務(wù)資源(CPU、內(nèi)存、硬盤)超出閥值通知

架構(gòu)設(shè)計(jì)

節(jié)點(diǎn)結(jié)構(gòu)

  1. niuh-manger // 根節(jié)點(diǎn)
  2. server00001 : //服務(wù)節(jié)點(diǎn) 1
  3. server00002 ://服務(wù)節(jié)點(diǎn) 2
  4. server........n ://服務(wù)節(jié)點(diǎn) n

服務(wù)狀態(tài)信息

  1. ip
  2. cpu
  3. memory
  4. disk

功能實(shí)現(xiàn)
數(shù)據(jù)生成與上報(bào)

  1. 創(chuàng)建臨時(shí)節(jié)點(diǎn):
  2. 定時(shí)變更節(jié)點(diǎn)狀態(tài)信息:

主動(dòng)查詢

  • 實(shí)時(shí)查詢 zookeeper 獲取集群節(jié)點(diǎn)的狀態(tài)信息。

被動(dòng)通知

  • 監(jiān)聽根節(jié)點(diǎn)下子節(jié)點(diǎn)的變化情況,如果CPU 等硬件資源低于警告位則發(fā)出警報(bào)。

關(guān)鍵示例代碼

  1. package com.niuh.os; 
  2. import com.fasterxml.jackson.core.JsonProcessingException; 
  3. import com.fasterxml.jackson.databind.ObjectMapper; 
  4. import org.I0Itec.zkclient.ZkClient; 
  5. import java.lang.instrument.Instrumentation; 
  6. import java.lang.management.ManagementFactory; 
  7. import java.lang.management.MemoryUsage; 
  8. import java.net.InetAddress; 
  9. import java.net.UnknownHostException; 
  10. public class Agent { 
  11.     private static Agent ourInstance = new Agent(); 
  12.     private String server = "127.0.0.1:2181"
  13.     private ZkClient zkClient; 
  14.     private static final String rootPath = "/niuh-manger"
  15.     private static final String servicePath = rootPath + "/service"
  16.     private String nodePath; ///niuh-manger/service0000001 當(dāng)前節(jié)點(diǎn)路徑 
  17.     private Thread stateThread; 
  18.     public static Agent getInstance() { 
  19.         return ourInstance; 
  20.     } 
  21.     private Agent() { 
  22.     } 
  23.     // javaagent 數(shù)據(jù)監(jiān)控 
  24.     public static void premain(String args, Instrumentation instrumentation) { 
  25.         Agent.getInstance().init(); 
  26.     } 
  27.     public void init() { 
  28.         zkClient = new ZkClient(server, 5000, 10000); 
  29.         System.out.println("zk連接成功" + server); 
  30.         // 創(chuàng)建根節(jié)點(diǎn) 
  31.         buildRoot(); 
  32.         // 創(chuàng)建臨時(shí)節(jié)點(diǎn) 
  33.         createServerNode(); 
  34.         // 啟動(dòng)更新的線程 
  35.         stateThread = new Thread(() -> { 
  36.             while (true) { 
  37.                 updateServerNode(); 
  38.                 try { 
  39.                     Thread.sleep(5000); 
  40.                 } catch (InterruptedException e) { 
  41.                     e.printStackTrace(); 
  42.                 } 
  43.             } 
  44.         }, "zk_stateThread"); 
  45.         stateThread.setDaemon(true); 
  46.         stateThread.start(); 
  47.     } 
  48.     // 數(shù)據(jù)寫到 當(dāng)前的臨時(shí)節(jié)點(diǎn)中去 
  49.     public void updateServerNode() { 
  50.         zkClient.writeData(nodePath, getOsInfo()); 
  51.     } 
  52.     // 生成服務(wù)節(jié)點(diǎn) 
  53.     public void createServerNode() { 
  54.         nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo()); 
  55.         System.out.println("創(chuàng)建節(jié)點(diǎn):" + nodePath); 
  56.     } 
  57.     // 更新服務(wù)節(jié)點(diǎn)狀態(tài) 
  58.     public String getOsInfo() { 
  59.         OsBean bean = new OsBean(); 
  60.         bean.lastUpdateTime = System.currentTimeMillis(); 
  61.         bean.ip = getLocalIp(); 
  62.         bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu(); 
  63.         MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); 
  64.         bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024; 
  65.         bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024; 
  66.         bean.pid = ManagementFactory.getRuntimeMXBean().getName(); 
  67.         ObjectMapper mapper = new ObjectMapper(); 
  68.         try { 
  69.             return mapper.writeValueAsString(bean); 
  70.         } catch (JsonProcessingException e) { 
  71.             throw new RuntimeException(e); 
  72.         } 
  73.     } 
  74.     public static String getLocalIp() { 
  75.         InetAddress addr = null
  76.         try { 
  77.             addr = InetAddress.getLocalHost(); 
  78.         } catch (UnknownHostException e) { 
  79.             throw new RuntimeException(e); 
  80.         } 
  81.         return addr.getHostAddress(); 
  82.     } 
  83.     public void buildRoot() { 
  84.         if (!zkClient.exists(rootPath)) { 
  85.             zkClient.createPersistent(rootPath); 
  86.         } 
  87.     } 

實(shí)現(xiàn)效果
啟動(dòng)參數(shù)設(shè)置

 

運(yùn)行測試用例:

  1. package com.niuh.test; 
  2. import com.niuh.os.Agent; 
  3. import org.junit.Ignore
  4. import org.junit.Test; 
  5. public class AgentTest { 
  6.     @Test 
  7.     @Ignore 
  8.     public void initTest() { 
  9.         Agent.premain(nullnull); 
  10.         runCPU(2); //20% 占用 
  11.         try { 
  12.             Thread.sleep(Long.MAX_VALUE); 
  13.         } catch (InterruptedException e) { 
  14.             e.printStackTrace(); 
  15.         } 
  16.     } 
  17.     // 
  18.     private void runCPU(int count) { 
  19.         for (int i = 0; i < count; i++) { 
  20.             new Thread(() -> { 
  21.                 while (true) { 
  22.                     long bac = 1000000; 
  23.                     bac = bac >> 1; 
  24.                 } 
  25.             }).start(); 
  26.             ; 
  27.         } 
  28.     } 

控制臺(tái)輸出:

  1. CPU 報(bào)警...22.55120088850181 
  2. CPU 報(bào)警...46.06592086097357CPU 報(bào)警...47.87206766163349CPU 報(bào)警...49.49176420213768CPU 報(bào)警...48.967942479969004CPU 報(bào)警...49.193921607021565CPU 報(bào)警...48.806604284784676CPU 報(bào)警...48.63229912951865CPU 報(bào)警...49.34509647972038CPU 報(bào)警...47.07551108884401CPU 報(bào)警...49.18489236134496CPU 報(bào)警...49.903007346777066CPU 報(bào)警...49.28868795953268// 關(guān)閉測試用例服務(wù)已下線:OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842} 

本Demo不適用在生產(chǎn)環(huán)境,示例Demo涉及組件zookeeper-agent、zookeeper-web。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。

分布式注冊中心
在單體式服務(wù)中,通常是由多個(gè)客戶端去調(diào)用一個(gè)服務(wù),只要在客戶端中配置唯一服務(wù)節(jié)點(diǎn)地址即可,當(dāng)升級(jí)到分布式后,服務(wù)節(jié)點(diǎn)變多,像一線大廠服務(wù)節(jié)點(diǎn)更是上萬之多,這么多節(jié)點(diǎn)不可能手動(dòng)配置在客戶端,這里就需要一個(gè)中間服務(wù),專門用于幫助客戶端發(fā)現(xiàn)服務(wù)節(jié)點(diǎn),即許多技術(shù)書籍經(jīng)常提到的服務(wù)發(fā)現(xiàn)。

一個(gè)完整的注冊中心涵蓋以下功能特性:

  • 服務(wù)注冊:提供者上線時(shí)將自提供的服務(wù)提交給注冊中心。
  • 服務(wù)注銷:通知注冊心提供者下線。
  • 服務(wù)訂閱:動(dòng)態(tài)實(shí)時(shí)接收服務(wù)變更消息。
  • 可靠:注冊服務(wù)本身是集群的,數(shù)據(jù)冗余存儲(chǔ)。避免單點(diǎn)故障,及數(shù)據(jù)丟失。
  • 容錯(cuò):當(dāng)服務(wù)提供者出現(xiàn)宕機(jī),斷電等極情況時(shí),注冊中心能夠動(dòng)態(tài)感知并通知客戶端服務(wù)提供者的狀態(tài)。

Dubbo 對 Zookeeper的使用
阿里著名的開源項(xiàng)目Dubbo 是一個(gè)基于JAVA的RCP框架,其中必不可少的注冊中心可基于多種第三方組件實(shí)現(xiàn),但其官方推薦的還是Zookeeper作為注冊中心服務(wù)。

Dubbo Zookeeper注冊中心存儲(chǔ)結(jié)構(gòu)

節(jié)點(diǎn)說明

流程說明

  1. 服務(wù)提供者啟動(dòng)時(shí): 向 /dubbo/com.foo.BarService/providers 目錄下寫入自己的 URL 地址
  2. 服務(wù)消費(fèi)者啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService/providers 目錄下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目錄下寫入自己的 URL 地址
  3. 監(jiān)控中心啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService 目錄下的所有提供者和消費(fèi)者 URL 地址。

示例Demo
服務(wù)端代碼

  1. package com.niuh.zk.dubbo; 
  2. import com.alibaba.dubbo.config.ApplicationConfig; 
  3. import com.alibaba.dubbo.config.ProtocolConfig; 
  4. import com.alibaba.dubbo.config.RegistryConfig; 
  5. import com.alibaba.dubbo.config.ServiceConfig; 
  6. import java.io.IOException; 
  7. public class Server { 
  8.     public void openServer(int port) { 
  9.         // 構(gòu)建應(yīng)用 
  10.         ApplicationConfig config = new ApplicationConfig(); 
  11.         config.setName("simple-app"); 
  12.         // 通信協(xié)議 
  13.         ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port); 
  14.         protocolConfig.setThreads(200); 
  15.         ServiceConfig<UserService> serviceConfig = new ServiceConfig(); 
  16.         serviceConfig.setApplication(config); 
  17.         serviceConfig.setProtocol(protocolConfig); 
  18.         serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); 
  19.         serviceConfig.setInterface(UserService.class); 
  20.         UserServiceImpl ref = new UserServiceImpl(); 
  21.         serviceConfig.setRef(ref); 
  22.         //開始提供服務(wù)  開張做生意 
  23.         serviceConfig.export(); 
  24.         System.out.println("服務(wù)已開啟!端口:"+serviceConfig.getExportedUrls().get(0).getPort()); 
  25.         ref.setPort(serviceConfig.getExportedUrls().get(0).getPort()); 
  26.     } 
  27.     public static void main(String[] args) throws IOException { 
  28.         new Server().openServer(-1); 
  29.         System.in.read(); 
  30.     } 

客戶端代碼

  1. package com.niuh.zk.dubbo; 
  2. import com.alibaba.dubbo.config.ApplicationConfig; 
  3. import com.alibaba.dubbo.config.ReferenceConfig; 
  4. import com.alibaba.dubbo.config.RegistryConfig; 
  5. import java.io.IOException; 
  6. public class Client {    UserService service;    // URL 遠(yuǎn)程服務(wù)的調(diào)用地址    public UserService buildService(String url) {        ApplicationConfig config = new ApplicationConfig("young-app"); 
  7.         // 構(gòu)建一個(gè)引用對象        ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<UserService>();        referenceConfig.setApplication(config); 
  8.         referenceConfig.setInterface(UserService.class);        // referenceConfig.setUrl(url);        referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); 
  9.         referenceConfig.setTimeout(5000); 
  10.         // 透明化        this.service = referenceConfig.get();        return service; 
  11.     }    static int i = 0; 
  12.     public static void main(String[] args) throws IOException {        Client client1 = new Client();        client1.buildService(""); 
  13.         String cmd;        while (!(cmd = read()).equals("exit")) { 
  14.             UserVo u = client1.service.getUser(Integer.parseInt(cmd));            System.out.println(u);        }    }    private static String read() throws IOException { 
  15.         byte[] b = new byte[1024]; 
  16.         int size = System.in.read(b); 
  17.         return new String(b, 0, size).trim(); 
  18.     }} 

查詢 zk 實(shí)際存儲(chǔ)內(nèi)容:

  1. /dubbo 
  2. /dubbo/com.niuh.zk.dubbo.UserService/dubbo/com.niuh.zk.dubbo.UserService/configurators/dubbo/com.niuh.zk.dubbo.UserService/routers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200×tamp=1602057895881/dubbo/com.niuh.zk.dubbo.UserService/consumers/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000×tamp=1602075359549 

示例Demo涉及組件zookeeper-dubbo。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。

分布式JOB
分布式JOB需求
多個(gè)服務(wù)節(jié)點(diǎn)只允許其中一個(gè)主節(jié)點(diǎn)運(yùn)行JOB任務(wù)。
當(dāng)主節(jié)點(diǎn)掛掉后能自動(dòng)切換主節(jié)點(diǎn),繼續(xù)執(zhí)行JOB任務(wù)。
架構(gòu)設(shè)計(jì)

node結(jié)構(gòu)

  1. niuh-master
  2. server0001:master
  3. server0002:slave
  4. server000n:slave

選舉流程
服務(wù)啟動(dòng):

  1. 在niuh-maste下創(chuàng)建server子節(jié)點(diǎn),值為slave
  2. 獲取所有niuh-master 下所有子節(jié)點(diǎn)
  3. 判斷是否存在master 節(jié)點(diǎn)
  4. 如果沒有設(shè)置自己為master節(jié)點(diǎn)

子節(jié)點(diǎn)刪除事件觸發(fā):

  1. 獲取所有niuh-master 下所有子節(jié)點(diǎn)
  2. 判斷是否存在master 節(jié)點(diǎn)
  3. 如果沒有設(shè)置最小值序號(hào)為master 節(jié)點(diǎn)

示例Demo

  1. package com.niuh.zookeeper.master; 
  2. import org.I0Itec.zkclient.ZkClient; 
  3. import java.util.Map; 
  4. import java.util.stream.Collectors; 
  5. public class MasterResolve { 
  6.     private String server = "127.0.0.1:2181"
  7.     private ZkClient zkClient; 
  8.     private static final String rootPath = "/niuh-master"
  9.     private static final String servicePath = rootPath + "/service"
  10.     private String nodePath; 
  11.     private volatile boolean master = false
  12.     private static MasterResolve resolve; 
  13.     private MasterResolve() { 
  14.         zkClient = new ZkClient(server, 2000, 5000); 
  15.         buildRoot();        createServerNode();    }    public static MasterResolve getInstance() { 
  16.         if (resolve == null) { 
  17.             resolve= new MasterResolve(); 
  18.         }        return resolve; 
  19.     }    // 構(gòu)建根節(jié)點(diǎn) 
  20.     public void buildRoot() { 
  21.         if (!zkClient.exists(rootPath)) { 
  22.             zkClient.createPersistent(rootPath); 
  23.         } 
  24.     } 
  25.     // 創(chuàng)建server節(jié)點(diǎn) 
  26.     public void createServerNode() { 
  27.         nodePath = zkClient.createEphemeralSequential(servicePath, "slave"); 
  28.         System.out.println("創(chuàng)建service節(jié)點(diǎn):" + nodePath); 
  29.         initMaster(); 
  30.         initListener(); 
  31.     } 
  32.     private void initMaster() { 
  33.         boolean existMaster = zkClient.getChildren(rootPath) 
  34.                 .stream() 
  35.                 .map(p -> rootPath + "/" + p) 
  36.                 .map(p -> zkClient.readData(p)) 
  37.                 .anyMatch(d -> "master".equals(d)); 
  38.         if (!existMaster) { 
  39.             doElection(); 
  40.             System.out.println("當(dāng)前當(dāng)選master"); 
  41.         } 
  42.     } 
  43.     private void initListener() { 
  44.         zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> { 
  45.             doElection();//  執(zhí)行選舉 
  46.         }); 
  47.     } 
  48.     // 執(zhí)行選舉 
  49.     public void doElection() { 
  50.         Map<String, Object> childData = zkClient.getChildren(rootPath) 
  51.                 .stream() 
  52.                 .map(p -> rootPath + "/" + p) 
  53.                 .collect(Collectors.toMap(p -> p, p -> zkClient.readData(p))); 
  54.         if (childData.containsValue("master")) { 
  55.             return
  56.         } 
  57.         childData.keySet().stream().sorted().findFirst().ifPresent(p -> { 
  58.             if (p.equals(nodePath)) { // 設(shè)置最小值序號(hào)為master 節(jié)點(diǎn) 
  59.                 zkClient.writeData(nodePath, "master"); 
  60.                 master = true
  61.                 System.out.println("當(dāng)前當(dāng)選master" + nodePath); 
  62.             } 
  63.         }); 
  64.     } 
  65.     public static boolean isMaster() { 
  66.         return getInstance().master; 
  67.     } 

示例Demo涉及組件zookeeper-master。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。

分布式鎖
鎖的的基本概念
開發(fā)中鎖的概念并不陌生,通過鎖可以實(shí)現(xiàn)在多個(gè)線程或多個(gè)進(jìn)程間在爭搶資源時(shí),能夠合理的分配置資源的所有權(quán)。在單體應(yīng)用中我們可以通過 synchronized 或 ReentrantLock 來實(shí)現(xiàn)鎖。但在分布式系統(tǒng)中,僅僅是加synchronized 是不夠的,需要借助第三組件來實(shí)現(xiàn)。比如一些簡單的做法是使用關(guān)系型數(shù)據(jù)行級(jí)鎖來實(shí)現(xiàn)不同進(jìn)程之間的互斥,但大型分布式系統(tǒng)的性能瓶頸往往集中在數(shù)據(jù)庫操作上。為了提高性能得采用如Redis、Zookeeper之內(nèi)的組件實(shí)現(xiàn)分布式鎖。

共享鎖:也稱作只讀鎖,當(dāng)一方獲得共享鎖之后,其它方也可以獲得共享鎖。但其只允許讀取。在共享鎖全部釋放之前,其它方不能獲得寫鎖。

排它鎖:也稱作讀寫鎖,獲得排它鎖后,可以進(jìn)行數(shù)據(jù)的讀寫。在其釋放之前,其它方不能獲得任何鎖。

鎖的獲取
某銀行賬戶,可以同時(shí)進(jìn)行帳戶信息的讀取,但讀取期間不能修改帳戶數(shù)據(jù)。其賬戶ID為:888

獲得讀鎖流程

  1. 基于資源ID創(chuàng)建臨時(shí)序號(hào)讀鎖節(jié)點(diǎn) /lock/888.R0000000002 Read
  2. 獲取 /lock 下所有子節(jié)點(diǎn),判斷其最小的節(jié)點(diǎn)是否為讀鎖,如果是則獲鎖成功
  3. 最小節(jié)點(diǎn)不是讀鎖,則阻塞等待。添加lock/ 子節(jié)點(diǎn)變更監(jiān)聽。
  4. 當(dāng)節(jié)點(diǎn)變更監(jiān)聽觸發(fā),執(zhí)行第2步

數(shù)據(jù)結(jié)構(gòu)

獲得寫鎖

  1. 基于資源ID創(chuàng)建臨時(shí)序號(hào)寫鎖節(jié)點(diǎn) /lock/888.R0000000002 Write
  2. 獲取 /lock 下所有子節(jié)點(diǎn),判斷其最小的節(jié)點(diǎn)是否為自己,如果是則獲鎖成功
  3. 最小節(jié)點(diǎn)不是自己,則阻塞等待。添加lock/ 子節(jié)點(diǎn)變更監(jiān)聽。
  4. 當(dāng)節(jié)點(diǎn)變更監(jiān)聽觸發(fā),執(zhí)行第2步

釋放鎖
讀取完畢后,手動(dòng)刪除臨時(shí)節(jié)點(diǎn),如果獲鎖期間宕機(jī),則會(huì)在會(huì)話失效后自動(dòng)刪除。

關(guān)于羊群效應(yīng)
在等待鎖獲得期間,所有等待節(jié)點(diǎn)都在監(jiān)聽 Lock節(jié)點(diǎn),一但lock 節(jié)點(diǎn)變更所有等待節(jié)點(diǎn)都會(huì)被觸發(fā),然后在同時(shí)反查Lock 子節(jié)點(diǎn)。如果等待對例過大會(huì)使用Zookeeper承受非常大的流量壓力。

為了改善這種情況,可以采用監(jiān)聽鏈表的方式,每個(gè)等待隊(duì)列只監(jiān)聽前一個(gè)節(jié)點(diǎn),如果前一個(gè)節(jié)點(diǎn)釋放鎖的時(shí)候,才會(huì)被觸發(fā)通知。這樣就形成了一個(gè)監(jiān)聽鏈表。

示例Demo

  1. package com.niuh.zookeeper.lock; 
  2. import org.I0Itec.zkclient.IZkDataListener; 
  3. import org.I0Itec.zkclient.ZkClient; 
  4. import java.util.List; 
  5. import java.util.stream.Collectors; 
  6. public class ZookeeperLock { 
  7.     private String server = "127.0.0.1:2181"
  8.     private ZkClient zkClient; 
  9.     private static final String rootPath = "/niuh-lock1"
  10.     public ZookeeperLock() { 
  11.         zkClient = new ZkClient(server, 5000, 20000); 
  12.         buildRoot();    }    // 構(gòu)建根節(jié)點(diǎn) 
  13.     public void buildRoot() { 
  14.         if (!zkClient.exists(rootPath)) { 
  15.             zkClient.createPersistent(rootPath); 
  16.         } 
  17.     } 
  18.     // 獲取鎖 
  19.     public Lock lock(String lockId, long timeout) { 
  20.         // 創(chuàng)建臨時(shí)節(jié)點(diǎn) 
  21.         Lock lockNode = createLockNode(lockId); 
  22.         lockNode = tryActiveLock(lockNode);// 嘗試激活鎖 
  23.         if (!lockNode.isActive()) { 
  24.             try { 
  25.                 synchronized (lockNode) { 
  26.                     lockNode.wait(timeout); // 線程鎖住 
  27.                 } 
  28.             } catch (InterruptedException e) { 
  29.                 throw new RuntimeException(e); 
  30.             } 
  31.         } 
  32.         if (!lockNode.isActive()) { 
  33.             throw new RuntimeException(" lock  timeout"); 
  34.         } 
  35.         return lockNode; 
  36.     } 
  37.     // 釋放鎖 
  38.     public void unlock(Lock lock) { 
  39.         if (lock.isActive()) { 
  40.             zkClient.delete(lock.getPath()); 
  41.         } 
  42.     } 
  43.     // 嘗試激活鎖 
  44.     private Lock tryActiveLock(Lock lockNode) { 
  45.         // 獲取根節(jié)點(diǎn)下面所有的子節(jié)點(diǎn) 
  46.         List<String> list = zkClient.getChildren(rootPath) 
  47.                 .stream() 
  48.                 .sorted() 
  49.                 .map(p -> rootPath + "/" + p) 
  50.                 .collect(Collectors.toList());      // 判斷當(dāng)前是否為最小節(jié)點(diǎn) 
  51.         String firstNodePath = list.get(0); 
  52.         // 最小節(jié)點(diǎn)是不是當(dāng)前節(jié)點(diǎn) 
  53.         if (firstNodePath.equals(lockNode.getPath())) { 
  54.             lockNode.setActive(true); 
  55.         } else { 
  56.             String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1); 
  57.             zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() { 
  58.                 @Override 
  59.                 public void handleDataChange(String dataPath, Object data) throws Exception { 
  60.                 } 
  61.                 @Override 
  62.                 public void handleDataDeleted(String dataPath) throws Exception { 
  63.                     // 事件處理 與心跳 在同一個(gè)線程,如果Debug時(shí)占用太多時(shí)間,將導(dǎo)致本節(jié)點(diǎn)被刪除,從而影響鎖邏輯。 
  64.                     System.out.println("節(jié)點(diǎn)刪除:" + dataPath); 
  65.                      Lock lock = tryActiveLock(lockNode); 
  66.                     synchronized (lockNode) { 
  67.                         if (lock.isActive()) { 
  68.                             lockNode.notify(); // 釋放了 
  69.                         } 
  70.                     } 
  71.                     zkClient.unsubscribeDataChanges(upNodePath, this); 
  72.                 } 
  73.             }); 
  74.         } 
  75.         return lockNode; 
  76.     } 
  77.     public Lock createLockNode(String lockId) { 
  78.         String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w"); 
  79.         return new Lock(lockId, nodePath); 
  80.     } 

示例Demo涉及組件zookeeper-lock。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。

 

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2021-05-31 10:35:34

TCPWebSocket協(xié)議

2021-09-16 07:52:18

算法應(yīng)用場景

2021-07-07 08:36:45

React應(yīng)用場景

2021-07-12 08:35:24

組件應(yīng)用場景

2022-06-10 13:56:42

Java

2021-11-10 07:47:49

組合模式場景

2021-11-04 06:58:32

策略模式面試

2021-11-03 14:10:28

工廠模式場景

2021-08-16 08:33:26

git

2021-11-09 08:51:13

模式命令面試

2021-11-05 07:47:56

代理模式對象

2024-05-29 14:34:07

2021-09-29 07:24:20

場景數(shù)據(jù)

2021-06-07 09:41:48

NodeBuffer 網(wǎng)絡(luò)協(xié)議

2021-06-08 08:33:23

NodeStream數(shù)據(jù)

2021-09-28 07:12:09

測試路徑

2021-09-06 10:51:27

TypeScriptJavaScript

2021-11-11 16:37:05

模板模式方法

2021-11-22 23:50:59

責(zé)任鏈模式場景

2021-09-08 07:49:34

TypeScript 泛型場景
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)