ZK(ZooKeeper)分布式鎖實(shí)現(xiàn)
準(zhǔn)備
本文會(huì)使用到 三臺(tái) 獨(dú)立服務(wù)器,可以自行提前搭建好。
不知道如何搭建的,可以看我之前 ZooKeeper集群 搭建:Zookeeper 集群部署的那些事兒
關(guān)于ZooKeeper 一些基礎(chǔ)命令可以看這篇:Zookeeper入門(mén)看這篇就夠了
前言
在平時(shí)我們對(duì)鎖的使用,在針對(duì)單個(gè)服務(wù),我們可以用 Java 自帶的一些鎖來(lái)實(shí)現(xiàn),資源的順序訪問(wèn),但是隨著業(yè)務(wù)的發(fā)展,現(xiàn)在基本上公司的服務(wù)都是多個(gè),單純的 Lock或者Synchronize 只能解決單個(gè)JVM線程的問(wèn)題,那么針對(duì)于單個(gè)服務(wù)的 Java 的鎖是無(wú)法滿(mǎn)足我們業(yè)務(wù)的需要的,為了解決多個(gè)服務(wù)跨服務(wù)訪問(wèn)共享資源,于是就有了分布鎖,分布式鎖產(chǎn)生的原因就是集群。
正文
實(shí)現(xiàn)分布式鎖的方式有哪些呢?
- 分布式鎖的實(shí)現(xiàn)方式主要以(ZooKeeper、Reids、Mysql)這三種為主
今天我們主要講解的是使用 ZooKeeper來(lái)實(shí)現(xiàn)分布式鎖,ZooKeeper的應(yīng)用場(chǎng)景主要包含這幾個(gè)方面:
- 服務(wù)注冊(cè)與訂閱(共用節(jié)點(diǎn))
- 分布式通知(監(jiān)聽(tīng)ZNode)
- 服務(wù)命令(ZNode特性)
- 數(shù)據(jù)訂閱、發(fā)布(Watcher)
- 分布式鎖(臨時(shí)節(jié)點(diǎn))
ZooKeeper實(shí)現(xiàn)分布式鎖,主要是得益于ZooKeeper 保證了數(shù)據(jù)的強(qiáng)一致性,鎖的服務(wù)可以分為兩大類(lèi):
保持獨(dú)占
所有試圖來(lái)獲取當(dāng)前鎖的客戶(hù)端,最終有且只有一個(gè)能夠成功得到當(dāng)前鎖的鑰匙,通常我們會(huì)把 ZooKeeper 上的節(jié)點(diǎn)(ZNode)看做一把鎖,通過(guò) create臨時(shí)節(jié)點(diǎn)的方式來(lái)實(shí)現(xiàn),當(dāng)多個(gè)客戶(hù)端都去創(chuàng)建一把鎖的時(shí)候,那么只有成功創(chuàng)建了那個(gè)客戶(hù)端才能擁有這把鎖
控制時(shí)序
所有試圖獲取鎖的客戶(hù)端,都是被順序執(zhí)行,只是會(huì)有一個(gè)序號(hào)(zxid),我們會(huì)有一個(gè)節(jié)點(diǎn),例如:/testLock,所有臨時(shí)節(jié)點(diǎn)都在這個(gè)下面去創(chuàng)建,ZK的父節(jié)點(diǎn)(/testLock) 維持了一個(gè)序號(hào),這個(gè)是ZK自帶的屬性,他保證了子節(jié)點(diǎn)創(chuàng)建的時(shí)序性,從而也形成了每個(gè)客戶(hù)端的一個(gè) 全局時(shí)序
ZK鎖機(jī)制
在實(shí)現(xiàn)ZooKeeper 分布式鎖之前我們有必要了解一下,關(guān)于ZooKeeper分布式鎖機(jī)制的實(shí)現(xiàn)流程和原理,不然各位寶貝,出去面試的時(shí)候怎么和面試官侃侃而談~
臨時(shí)順序節(jié)點(diǎn)
基于ZooKeeper的臨時(shí)順序節(jié)點(diǎn) ,ZooKeeper比較適合來(lái)實(shí)現(xiàn)分布式鎖:
- 順序發(fā)號(hào)器: ZooKeeper的每一個(gè)節(jié)點(diǎn),都是自帶順序生成器:在每個(gè)節(jié)點(diǎn)下面創(chuàng)建臨時(shí)節(jié)點(diǎn),新的子節(jié)點(diǎn)后面,會(huì)添加一個(gè)次序編號(hào),這個(gè)生成的編號(hào),會(huì)在上一次的編號(hào)進(jìn)行 +1 操作
- 有序遞增: ZooKeeper節(jié)點(diǎn)有序遞增,可以保證鎖的公平性,我們只需要在一個(gè)持久父節(jié)點(diǎn)下,創(chuàng)建對(duì)應(yīng)的臨時(shí)順序節(jié)點(diǎn),每個(gè)線程在嘗試占用鎖之前,會(huì)調(diào)用watch,判斷自己當(dāng)前的序號(hào)是不是在當(dāng)前父節(jié)點(diǎn)最小,如果是,那么獲取鎖
- Znode監(jiān)聽(tīng): 每個(gè)線程在搶占所之前,會(huì)創(chuàng)建屬于當(dāng)前線程的ZNode節(jié)點(diǎn),在釋放鎖的時(shí)候,會(huì)刪除創(chuàng)建的ZNode,當(dāng)我們創(chuàng)建的序號(hào)不是最小的時(shí)候,會(huì)等待watch通知,也就是上一個(gè)ZNode的狀態(tài)通知,當(dāng)前一個(gè)ZNode刪除的時(shí)候,會(huì)觸發(fā)回調(diào)機(jī)制,告訴下一個(gè)ZNode,你可以獲取鎖開(kāi)始工作了
- 臨時(shí)節(jié)點(diǎn)自動(dòng)刪除: ZooKeeper還有一個(gè)好處,當(dāng)我們客戶(hù)端斷開(kāi)連接之后,我們出創(chuàng)建的臨時(shí)節(jié)點(diǎn)會(huì)進(jìn)行自動(dòng)刪除操作,所以我們?cè)谑褂梅植际芥i的時(shí)候,一般都是會(huì)去創(chuàng)建臨時(shí)節(jié)點(diǎn),這樣可以避免因?yàn)榫W(wǎng)絡(luò)異常等原因,造成的死鎖。
- 羊群效應(yīng): ZooKeeper節(jié)點(diǎn)的順序訪問(wèn)性,后面監(jiān)聽(tīng)前面的方式,可以有效的避免 羊群效應(yīng),什么是羊群效應(yīng):當(dāng)某一個(gè)節(jié)點(diǎn)掛掉了,所有的節(jié)點(diǎn)都要去監(jiān)聽(tīng),然后做出回應(yīng),這樣會(huì)給服務(wù)器帶來(lái)比較大壓力,如果有了臨時(shí)順序節(jié)點(diǎn),當(dāng)一個(gè)節(jié)點(diǎn)掛掉了,只有它后面的那一個(gè)節(jié)點(diǎn)才做出反應(yīng)。
我們現(xiàn)在看一下下面一張圖:
在上圖中, ZooKeeper里面有一把鎖節(jié)點(diǎn) testLock,這個(gè)鎖就是 ZooKeeper的一個(gè)節(jié)點(diǎn),當(dāng)兩個(gè)客戶(hù)端來(lái)獲取這把鎖的時(shí)候,會(huì)對(duì) ZooKeeper進(jìn)行加鎖的請(qǐng)求,也就是我們所說(shuō)的 臨時(shí)順序節(jié)點(diǎn)。
當(dāng)我們?cè)?/testLock目錄下創(chuàng)建了一個(gè)順序臨時(shí)節(jié)點(diǎn)后,ZK會(huì)自動(dòng)對(duì)這個(gè)臨時(shí)節(jié)點(diǎn)維護(hù) 一個(gè)節(jié)點(diǎn)序號(hào),并且這個(gè)節(jié)點(diǎn)是遞增的,比如我們 clientA 創(chuàng)建了一個(gè)臨時(shí)順序節(jié)點(diǎn),ZK內(nèi)部會(huì)生成一個(gè)序號(hào):/lock0000000001,那么 clientB 也生成了一個(gè)臨時(shí)順序節(jié)點(diǎn),ZK會(huì)生成一個(gè)序號(hào)為 /lock0000000002,在這里數(shù)字都是依次遞增的,從1開(kāi)始遞增,ZK內(nèi)部會(huì)維護(hù)這個(gè)順序。
下圖所示:
這時(shí)候,ClientA會(huì)進(jìn)行監(jiān)聽(tīng)判斷,在父節(jié)點(diǎn)下,我是不是最小的,如果是的話(huà),那么俺就可以加鎖了,因?yàn)槲沂亲钚〉?,其他的都比我大。我自己可以進(jìn)行加鎖,你已經(jīng)是一個(gè)成熟的臨時(shí)節(jié)點(diǎn)了,要學(xué)會(huì)自己加鎖???,那么ZK是怎么進(jìn)行判斷的呢?寶貝,您往下看:
這個(gè)是 cleintA已經(jīng)加鎖完成了,這個(gè)時(shí)候 clientB也要過(guò)來(lái)加鎖,那么他也要在 /testLock,創(chuàng)建一個(gè)屬于自己的臨時(shí)節(jié)點(diǎn),那么這個(gè)時(shí)候他的序號(hào)就會(huì)變成 /lock0000000002,如下圖所示:
這個(gè)時(shí)候就會(huì)出現(xiàn)我們前面所講的,clientB 在加鎖的時(shí)候會(huì)判斷,自己是不是最小的,一看在當(dāng)前父節(jié)點(diǎn)下不是最小的,啊~我還挺大的,還有比我小的!!!
加鎖失敗呀,咳咳,這個(gè)時(shí)候呢,clientB 就會(huì)去偷窺clientA,氣氛逐漸曖昧起來(lái),啊不是,是按照順序去監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn)(clientA),是否完成工作了,如果完成了,clientB才可以進(jìn)行加鎖工作,寶貝,你往下看圖片:
clientA 加鎖成功后,會(huì)進(jìn)行自己的業(yè)務(wù)處理,當(dāng) clientA 處理完工作后,說(shuō)我完事了,下一個(gè),那么 clientA 是怎么完事的呢,他多長(zhǎng)時(shí)間?不是,具體流程是怎樣的?小農(nóng)你不對(duì)勁,說(shuō)什么呢!!!真羞澀
上面我們不是說(shuō)了,當(dāng) clientB 加鎖失敗后,會(huì)給前一個(gè)節(jié)點(diǎn)(clientA)加上一個(gè)監(jiān)聽(tīng),當(dāng)clientA被刪除以后,就表示有人釋放了鎖,這個(gè)時(shí)候就會(huì)通知 clientB重新去獲取鎖。
這個(gè)時(shí)候clientB重新獲取鎖的時(shí)候,發(fā)現(xiàn)自己就是當(dāng)前父節(jié)點(diǎn)下面最小的那個(gè),于是clientB就開(kāi)始加鎖,開(kāi)始工作等一系列操作,當(dāng)clientB 完事以后,釋放鎖,也說(shuō)了一句,下一個(gè)。
如下圖所示:
當(dāng)然除了 clientA、clientB還有C\D\E等,這字母看著好奇怪又好熟悉,原理都是一樣的,都是最小節(jié)點(diǎn)進(jìn)行解鎖,如果不是,監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn)是否釋放,如果釋放了,再次嘗試加鎖。如果前一節(jié)節(jié)點(diǎn)釋放了,自己就是最小了,就排到前面去了,有點(diǎn)類(lèi)似于 銀行取號(hào) 的操作。
代碼實(shí)現(xiàn)
使用ZooKeeper 創(chuàng)建臨時(shí)順序節(jié)點(diǎn)來(lái)實(shí)現(xiàn)分布式鎖,大體的流程就是 先創(chuàng)建一個(gè)持久父節(jié)點(diǎn),在當(dāng)前節(jié)點(diǎn)下,創(chuàng)建臨時(shí)順序節(jié)點(diǎn),找出最小的序列號(hào),獲取分布式鎖,程序業(yè)務(wù)完成之后釋放鎖,通知下一個(gè)節(jié)點(diǎn)進(jìn)行操作,使用的是watch來(lái)監(jiān)控節(jié)點(diǎn)的變化,然后依次下一個(gè)最小序列節(jié)點(diǎn)進(jìn)行操作。
首先我們需要?jiǎng)?chuàng)建一個(gè)持久父類(lèi)節(jié)點(diǎn):我這里是 /mxn
WatchCallBack
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- /**
- * @program: mxnzookeeper
- * @ClassName WatchCallBack
- * @description:
- * @author: 微信搜索:牧小農(nóng)
- * @create: 2021-10-23 10:48
- * @Version 1.0
- **/
- public class WatchCallBack implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {
- ZooKeeper zk ;
- String threadName;
- CountDownLatch cc = new CountDownLatch(1);
- String pathName;
- public String getPathName() {
- return pathName;
- }
- public void setPathName(String pathName) {
- this.pathName = pathName;
- }
- public String getThreadName() {
- return threadName;
- }
- public void setThreadName(String threadName) {
- this.threadName = threadName;
- }
- public ZooKeeper getZk() {
- return zk;
- }
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
- /** @Author 牧小農(nóng)
- * @Description //TODO 嘗試加鎖方法
- * @Date 16:14 2021/10/24
- * @Param
- * @return
- **/
- public void tryLock(){
- try {
- System.out.println(threadName + " 開(kāi)始創(chuàng)建。。。。");
- //創(chuàng)建一個(gè)順序臨時(shí)節(jié)點(diǎn)
- zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
- //阻塞當(dāng)前,監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn)是否釋放鎖
- cc.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /** @Author 牧小農(nóng)
- * @Description //TODO 解鎖方法
- * @Date 16:14 2021/10/24
- * @Param
- * @return
- **/
- public void unLock(){
- try {
- //釋放鎖,刪除臨時(shí)節(jié)點(diǎn)
- zk.delete(pathName,-1);
- //結(jié)束工作
- System.out.println(threadName + " 結(jié)束工作了....");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void process(WatchedEvent event) {
- //如果第一個(gè)節(jié)點(diǎn)釋放了鎖,那么第二個(gè)就會(huì)收到回調(diào)
- //告訴它前一個(gè)節(jié)點(diǎn)釋放了,你可以開(kāi)始嘗試獲取鎖
- switch (event.getType()) {
- case None:
- break;
- case NodeCreated:
- break;
- case NodeDeleted:
- //當(dāng)前節(jié)點(diǎn)重新獲取鎖
- zk.getChildren("/",false,this ,"sdf");
- break;
- case NodeDataChanged:
- break;
- case NodeChildrenChanged:
- break;
- }
- }
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if(name != null ){
- System.out.println(threadName +" 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : " + name );
- pathName = name ;
- //監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn)
- zk.getChildren("/",false,this ,"sdf");
- }
- }
- //getChildren call back
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- //節(jié)點(diǎn)按照編號(hào),升序排列
- Collections.sort(children);
- //對(duì)節(jié)點(diǎn)進(jìn)行截取例如 /lock0000000022 截取后就是 lock0000000022
- int i = children.indexOf(pathName.substring(1));
- //是不是第一個(gè),也就是說(shuō)是不是最小的
- if(i == 0){
- //是第一個(gè)
- System.out.println(threadName +" 現(xiàn)在我是最小的....");
- try {
- zk.setData("/",threadName.getBytes(),-1);
- cc.countDown();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else{
- //不是第一個(gè)
- //監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn) 看它是不是完成了工作進(jìn)行釋放鎖了
- zk.exists("/"+children.get(i-1),this,this,"sdf");
- }
- }
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- //判斷是否失敗exists
- }
- }
TestLock
- import com.mxn.zookeeper.config.ZKUtils;
- import org.apache.zookeeper.ZooKeeper;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- /**
- * @program: mxnzookeeper
- * @ClassName TestLock
- * @description:
- * @author: 微信搜索:牧小農(nóng)
- * @create: 2021-10-23 10:45
- * @Version 1.0
- **/
- public class TestLock {
- ZooKeeper zk ;
- @Before
- public void conn (){
- zk = ZKUtils.getZK();
- }
- @After
- public void close (){
- try {
- zk.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @Test
- public void lock(){
- //創(chuàng)建十個(gè)線程
- for (int i = 0; i < 10; i++) {
- new Thread(){
- @Override
- public void run() {
- WatchCallBack watchCallBack = new WatchCallBack();
- watchCallBack.setZk(zk);
- String threadName = Thread.currentThread().getName();
- watchCallBack.setThreadName(threadName);
- //線程進(jìn)行搶鎖操作
- watchCallBack.tryLock();
- try {
- //進(jìn)行業(yè)務(wù)邏輯處理
- System.out.println(threadName+" 開(kāi)始處理業(yè)務(wù)邏輯了...");
- Thread.sleep(200);
- }catch (Exception e){
- e.printStackTrace();
- }
- //釋放鎖
- watchCallBack.unLock();
- }
- }.start();
- }
- while(true){
- }
- }
- }
運(yùn)行結(jié)果:
- Thread-1 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000112
- Thread-5 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000113
- Thread-2 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000114
- Thread-6 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000115
- Thread-9 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000116
- Thread-4 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000117
- Thread-7 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000118
- Thread-3 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000119
- Thread-8 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000120
- Thread-0 線程創(chuàng)建了一個(gè)節(jié)點(diǎn)為 : /lock0000000121
- Thread-1 現(xiàn)在我是最小的....
- Thread-1 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-1 結(jié)束工作了....
- Thread-5 現(xiàn)在我是最小的....
- Thread-5 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-5 結(jié)束工作了....
- Thread-2 現(xiàn)在我是最小的....
- Thread-2 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-2 結(jié)束工作了....
- Thread-6 現(xiàn)在我是最小的....
- Thread-6 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-6 結(jié)束工作了....
- Thread-9 現(xiàn)在我是最小的....
- Thread-9 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-9 結(jié)束工作了....
- Thread-4 現(xiàn)在我是最小的....
- Thread-4 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-4 結(jié)束工作了....
- Thread-7 現(xiàn)在我是最小的....
- Thread-7 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-7 結(jié)束工作了....
- Thread-3 現(xiàn)在我是最小的....
- Thread-3 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-3 結(jié)束工作了....
- Thread-8 現(xiàn)在我是最小的....
- Thread-8 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-8 結(jié)束工作了....
- Thread-0 現(xiàn)在我是最小的....
- Thread-0 開(kāi)始處理業(yè)務(wù)邏輯了...
- Thread-0 結(jié)束工作了....
總結(jié)
ZK分布式鎖,能夠有效的解決分布式、不可重入的問(wèn)題,在上面的案例中我, 沒(méi)有實(shí)現(xiàn)可重入鎖,但是實(shí)現(xiàn)起來(lái)也不麻煩,只需要帶上線程信息等唯一標(biāo)識(shí),判斷一下就可以了
ZK實(shí)現(xiàn)分布式鎖具有天然的優(yōu)勢(shì),臨時(shí)順序節(jié)點(diǎn),可以有效的避免死鎖問(wèn)題,讓客戶(hù)端斷開(kāi),那么就會(huì)刪除當(dāng)前臨時(shí)節(jié)點(diǎn),讓下一個(gè)節(jié)點(diǎn)進(jìn)行工作。
如果文中有錯(cuò)誤或者不了解的地方,歡迎留言,小農(nóng)看見(jiàn)了會(huì)第一時(shí)間回復(fù)大家,大家加油
我是牧小農(nóng),一個(gè)卑微的打工人,如果覺(jué)得文中的內(nèi)容對(duì)你有幫助,記得一鍵三連啊,你們的三連是小農(nóng)最大的動(dòng)力。