自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

面試官問用Zookeeper怎么實現(xiàn)分布式鎖,你知道嗎???

開發(fā) 前端
因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建、銷毀瞬時節(jié)點來實現(xiàn)鎖功能。大家知道,ZK中創(chuàng)建和刪除節(jié)點只能通過Leader?服務(wù)器來執(zhí)行,然后Leader服務(wù)器還需要將數(shù)據(jù)同不到所有的Follower機器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。

?概述

提到鎖,想必大家可能最先想到的是Java JUC中的synchronized?關(guān)鍵字或者可重入鎖ReentrantLock。它能夠保證我們的代碼在同一個時刻只有一個線程執(zhí)行,保證數(shù)據(jù)的一致性和完整性。但是它僅限于單體項目,也就是說它們只能保證單個JVM應(yīng)用內(nèi)線程的順序執(zhí)行。

如果你部署了多個節(jié)點,也就是分布式場景下如何保證不同節(jié)點在同一時刻只有一個線程執(zhí)行呢?場景的業(yè)務(wù)場景比如秒殺、搶優(yōu)惠券等,這就引入了我們的分布式鎖,本文我們主要講解利用Zookeeper的特性如何來實現(xiàn)我們的分布式鎖。

Zookeeper分布式鎖實現(xiàn)原理

利用Zookeeper的臨時順序節(jié)點和監(jiān)聽機制兩大特性,可以幫助我們實現(xiàn)分布式鎖。

圖片

  1. 首先得有一個持久節(jié)點/locks, 路徑服務(wù)于某個使用場景,如果有多個使用場景建議路徑不同。
  2. 請求進來時首先在/locks創(chuàng)建臨時有序節(jié)點,所有會看到在/locks下面有seq-000000000, seq-00000001 等等節(jié)點。
  3. 然后判斷當(dāng)前創(chuàng)建得節(jié)點是不是/locks路徑下面最小的節(jié)點,如果是,獲取鎖,不是,阻塞線程,同時設(shè)置監(jiān)聽器,監(jiān)聽前一個節(jié)點。
  4. 獲取到鎖以后,開始處理業(yè)務(wù)邏輯,最后delete當(dāng)前節(jié)點,表示釋放鎖。
  5. 后一個節(jié)點就會收到通知,喚起線程,重復(fù)上面的判斷。

大家有沒有想過為什么要設(shè)置對前一個節(jié)點的監(jiān)聽?

主要為了避免羊群效應(yīng)。所謂羊群效應(yīng)就是一個節(jié)點掛掉,所有節(jié)點都去監(jiān)聽,然后做出反應(yīng),這樣會給服務(wù)器帶來巨大壓力,所以有了臨時順序節(jié)點,當(dāng)一個節(jié)點掛掉,只有它后面的那一個節(jié)點才做出反應(yīng)。

原生Zookeeper客戶端實現(xiàn)分布式鎖

通過原生zookeeper api方式的實現(xiàn),可以加強我們對zk實現(xiàn)分布式鎖原理的理解。

public class DistributedLock {

private String connectString = "10.100.1.176:2281";

private int sessionTimeout = 2000;

private ZooKeeper zk;

private String rootNode = "lock";

private String subNode = "seq-";

private String waitPath;

// 當(dāng)前client創(chuàng)建的子節(jié)點
private String currentNode;

private CountDownLatch countDownLatch = new CountDownLatch(1);

private CountDownLatch waitDownLatch = new CountDownLatch(1);

public DistributedLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 如果連接建立時,喚醒 wait 在該 latch 上的線程
if(event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}

// 發(fā)生了 waitPath 的刪除事件
if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitDownLatch.countDown();
}
}
});

// 等待連接建立,因為連接建立時異步過程
countDownLatch.await();
// 獲取根節(jié)點
Stat stat = zk.exists("/" + rootNode, false);
// 如果根節(jié)點不存在,則創(chuàng)建根節(jié)點
if(stat == null) {
System.out.println("創(chuàng)建根節(jié)點");
zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}

public void zkLock() {
try {
// 在根節(jié)點創(chuàng)建臨時順序節(jié)點
currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 獲取子節(jié)點
List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
// 如果只有一個子節(jié)點,說明是當(dāng)前節(jié)點,直接獲得鎖
if(childrenNodes.size() == 1) {
return;
} else {
//對根節(jié)點下的所有臨時順序節(jié)點進行從小到大排序
Collections.sort(childrenNodes);
//當(dāng)前節(jié)點名稱
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
//獲取當(dāng)前節(jié)點的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("數(shù)據(jù)異常");
} else if (index == 0) {
// index == 0, 說明 thisNode 在列表中最小, 當(dāng)前client 獲得鎖
return;
} else {
// 獲得排名比 currentNode 前 1 位的節(jié)點
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
// 在 waitPath節(jié)點上注冊監(jiān)聽器, 當(dāng) waitPath 被刪除時,zookeeper 會回調(diào)監(jiān)聽器的 process 方法
zk.getData(waitPath, true, new Stat());
//進入等待鎖狀態(tài)
waitDownLatch.await();
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}

測試代碼如下:

public class DistributedLockTest {

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributedLock lock1 = new DistributedLock();
DistributedLock lock2 = new DistributedLock();

new Thread(() -> {
// 獲取鎖對象
try {
lock1.zkLock();
System.out.println("線程 1 獲取鎖");
Thread.sleep(5 * 1000);
System.out.println("線程 1 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock1.zkUnlock();
}
}).start();

new Thread(() -> {
// 獲取鎖對象
try {
lock2.zkLock();
System.out.println("線程 2 獲取鎖");
Thread.sleep(5 * 1000);

System.out.println("線程 2 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock2.zkUnlock();
}
}).start();
}
}

測試結(jié)果:

線程 2 獲取鎖
線程 2 釋放鎖
線程 1 獲取鎖
線程 1 釋放鎖

獲取鎖和釋放鎖成對出現(xiàn),說明分布式鎖生效了。

Curator框架實現(xiàn)分布式鎖

在實際的開發(fā)鐘,我們會直接使用成熟的框架Curator客戶端,它里面封裝了分布式鎖的實現(xiàn),避免我們?nèi)ブ貜?fù)造輪子。

pom.xml添加如下依賴

 <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.1</version>
</dependency>

通過InterProcessLock實現(xiàn)分布式鎖

public class CuratorLockTest {

private String connectString = "10.100.1.14:2181";

private String rootNode = "/locks";

public static void main(String[] args) {

new CuratorLockTest().testLock();

}

public void testLock() {
// 分布式鎖1
InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 分布式鎖2
InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 第一個線程
new Thread(() -> {
// 獲取鎖對象
try {
lock1.acquire();
System.out.println("線程 1 獲取鎖");
// 測試鎖重入
lock1.acquire();
System.out.println("線程 1 再次獲取鎖");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("線程 1 釋放鎖");
lock1.release();
System.out.println("線程 1 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();

// 第二個線程
new Thread(() -> {
// 獲取鎖對象
try {
lock2.acquire();
System.out.println("線程 2 獲取鎖");
// 測試鎖重入
lock2.acquire();
System.out.println("線程 2 再次獲取鎖");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("線程 2 釋放鎖");
lock2.release();
System.out.println("線程 2 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}

public CuratorFramework getCuratorFramework() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString).connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(new ExponentialBackoffRetry(3000, 3)).build();
// 連接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}

結(jié)果展示

線程 1 釋放鎖
線程 1 再次釋放鎖
線程 2 獲取鎖
線程 2 再次獲取鎖
線程 2 釋放鎖
線程 2 再次釋放鎖

有興趣的看下源碼,它是通過wait、notify來實現(xiàn)阻塞。

代碼: https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo/zookeeper-lock

總結(jié)

ZooKeeper?分布式鎖(如InterProcessMutex),能有效的解決分布式鎖問題,但是性能并不高。

因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建、銷毀瞬時節(jié)點來實現(xiàn)鎖功能。大家知道,ZK中創(chuàng)建和刪除節(jié)點只能通過Leader?服務(wù)器來執(zhí)行,然后Leader服務(wù)器還需要將數(shù)據(jù)同不到所有的Follower機器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。

在高性能,高并發(fā)的場景下,不建議使用ZooKeeper?的分布式鎖,可以使用Redis?的分布式鎖。而由于ZooKeeper?的高可用特性,所以在并發(fā)量不是太高的場景,推薦使用ZooKeeper的分布式鎖。

責(zé)任編輯:武曉燕 來源: JAVA旭陽
相關(guān)推薦

2023-11-10 08:44:13

分布式鎖分布式系統(tǒng)

2024-09-24 16:30:46

分布式鎖Redis數(shù)據(jù)中間件

2022-08-11 18:27:50

面試Redis分布式鎖

2024-02-22 17:02:09

IDUUID雪花算法

2020-12-15 10:20:24

分布式鎖RedisZookeeper

2021-10-09 11:34:59

MySQL分布式鎖庫存

2025-03-05 00:00:00

RTKRedux開發(fā)

2021-10-25 10:21:59

ZK分布式鎖ZooKeeper

2019-07-16 09:22:10

RedisZookeeper分布式鎖

2020-11-16 12:55:41

Redis分布式鎖Zookeeper

2021-02-28 07:49:28

Zookeeper分布式

2022-11-28 17:01:49

接口分布式鎖

2024-09-29 09:12:47

分布式系統(tǒng)性能

2018-02-06 09:06:03

主流分布式存儲系統(tǒng)

2022-07-06 08:01:05

數(shù)據(jù)庫分布式

2021-06-03 08:55:54

分布式事務(wù)ACID

2022-10-27 10:44:14

分布式Zookeeper

2020-09-27 06:52:22

分布式存儲服務(wù)器

2022-01-06 10:58:07

Redis數(shù)據(jù)分布式鎖

2024-07-19 08:14:21

點贊
收藏

51CTO技術(shù)棧公眾號