從Curator實(shí)現(xiàn)分布式鎖的源碼再到羊群效應(yīng)
一、前言
Curator是一款由Java編寫的,操作Zookeeper的客戶端工具,在其內(nèi)部封裝了分布式鎖、選舉等高級(jí)功能。
今天主要是分析其實(shí)現(xiàn)分布式鎖的主要原理,有關(guān)分布式鎖的一些介紹或其他實(shí)現(xiàn),有興趣的同學(xué)可以翻閱以下文章:
我用了上萬(wàn)字,走了一遍Redis實(shí)現(xiàn)分布式鎖的坎坷之路,從單機(jī)到主從再到多實(shí)例,原來(lái)會(huì)發(fā)生這么多的問(wèn)題_陽(yáng)陽(yáng)的博客-CSDN博客
Redisson可重入與鎖續(xù)期源碼分析_陽(yáng)陽(yáng)的博客-CSDN博客
在使用Curator獲取分布式鎖時(shí),Curator會(huì)在指定的path下創(chuàng)建一個(gè)有序的臨時(shí)節(jié)點(diǎn),如果該節(jié)點(diǎn)是最小的,則代表獲取鎖成功。
接下來(lái),在準(zhǔn)備工作中,我們可以觀察是否會(huì)創(chuàng)建出一個(gè)臨時(shí)節(jié)點(diǎn)出來(lái)。
二、準(zhǔn)備工作
首先我們需要搭建一個(gè)zookeeper集群,當(dāng)然你使用單機(jī)也行。
在這篇文章面試官:能給我畫個(gè)Zookeeper選舉的圖嗎?,介紹了一種使用docker-compose方式快速搭建zk集群的方式。
在pom中引入依賴:
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.12.0</version>
- </dependency>
Curator客戶端的配置項(xiàng):
- /**
- * @author qcy
- * @create 2022/01/01 22:59:34
- */
- @Configuration
- public class CuratorFrameworkConfig {
- //zk各節(jié)點(diǎn)地址
- private static final String CONNECT_STRING = "localhost:2181,localhost:2182,localhost:2183";
- //連接超時(shí)時(shí)間(單位:毫秒)
- private static final int CONNECTION_TIME_OUT_MS = 10 * 1000;
- //會(huì)話超時(shí)時(shí)間(單位:毫秒)
- private static final int SESSION_TIME_OUT_MS = 30 * 1000;
- //重試的初始等待時(shí)間(單位:毫秒)
- private static final int BASE_SLEEP_TIME_MS = 2 * 1000;
- //最大重試次數(shù)
- private static final int MAX_RETRIES = 3;
- @Bean
- public CuratorFramework getCuratorFramework() {
- CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
- .connectString(CONNECT_STRING)
- .connectionTimeoutMs(CONNECTION_TIME_OUT_MS)
- .sessionTimeoutMs(SESSION_TIME_OUT_MS)
- .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
- .build();
- curatorFramework.start();
- return curatorFramework;
- }
- }
SESSION_TIME_OUT_MS參數(shù)則會(huì)保證,在某個(gè)客戶端獲取到鎖之后突然宕機(jī),zk能在該時(shí)間內(nèi)刪除當(dāng)前客戶端創(chuàng)建的臨時(shí)有序節(jié)點(diǎn)。
測(cè)試代碼如下:
- //臨時(shí)節(jié)點(diǎn)路徑,qcy是博主名字縮寫哈
- private static final String LOCK_PATH = "/lockqcy";
- @Resource
- CuratorFramework curatorFramework;
- public void testCurator() throws Exception {
- InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, LOCK_PATH);
- interProcessMutex.acquire();
- try {
- //模擬業(yè)務(wù)耗時(shí)
- Thread.sleep(30 * 1000);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- interProcessMutex.release();
- }
- }
當(dāng)使用接口調(diào)用該方法時(shí),在Thread.sleep處打上斷點(diǎn),進(jìn)入到zk容器中觀察創(chuàng)建出來(lái)的節(jié)點(diǎn)。
使用 docker exec -it zk容器名 /bin/bash 以交互模式進(jìn)入容器,接著使用 ./bin/zkCli.sh 連接到zk的server端。
然后使用 ls path 查看節(jié)點(diǎn)
這三個(gè)節(jié)點(diǎn)都是持久節(jié)點(diǎn),可以使用 get path 查看節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)信息
若一個(gè)節(jié)點(diǎn)的ephemeralOwner值為0,即該節(jié)點(diǎn)的臨時(shí)擁有者的會(huì)話id為0,則代表該節(jié)點(diǎn)為持久節(jié)點(diǎn)。
當(dāng)走到斷點(diǎn)Thread.sleep時(shí),確實(shí)發(fā)現(xiàn)在lockqcy下創(chuàng)建出來(lái)一個(gè)臨時(shí)節(jié)點(diǎn)
到這里嗎,準(zhǔn)備工作已經(jīng)做完了,接下來(lái)分析interProcessMutex.acquire與release的流程
三、源碼分析
Curator支持多種類型的鎖,例如
- InterProcessMutex,可重入鎖排它鎖
- InterProcessReadWriteLock,讀寫鎖
- InterProcessSemaphoreMutex,不可重入排它鎖
今天主要是分析InterProcessMutex的加解鎖過(guò)程,先看加鎖過(guò)程
加鎖
- public void acquire() throws Exception {
- if (!internalLock(-1, null)) {
- throw new IOException("Lost connection while trying to acquire lock: " + basePath);
- }
- }
這里是阻塞式獲取鎖,獲取不到鎖,就一直進(jìn)行阻塞。所以對(duì)于internalLock方法,超時(shí)時(shí)間設(shè)置為-1,時(shí)間單位設(shè)置成null。
- private boolean internalLock(long time, TimeUnit unit) throws Exception {
- Thread currentThread = Thread.currentThread();
- //通過(guò)能否在map中取到該線程的LockData信息,來(lái)判斷該線程是否已經(jīng)持有鎖
- LockData lockData = threadData.get(currentThread);
- if (lockData != null) {
- //進(jìn)行可重入,直接返回加鎖成功
- lockData.lockCount.incrementAndGet();
- return true;
- }
- //進(jìn)行加鎖
- String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
- if (lockPath != null) {
- //加鎖成功,保存到map中
- LockData newLockData = new LockData(currentThread, lockPath);
- threadData.put(currentThread, newLockData);
- return true;
- }
- return false;
- }
其中threadData是一個(gè)map,key線程對(duì)象,value為該線程綁定的鎖數(shù)據(jù)。
LockData中保存了加鎖線程owningThread,重入計(jì)數(shù)lockCount與加鎖路徑lockPath,例如
- /lockqcy/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005
- private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
- private static class LockData {
- final Thread owningThread;
- final String lockPath;
- final AtomicInteger lockCount = new AtomicInteger(1);
- private LockData(Thread owningThread, String lockPath) {
- this.owningThread = owningThread;
- this.lockPath = lockPath;
- }
- }
進(jìn)入到internals.attemptLock方法中
- String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
- //開(kāi)始時(shí)間
- final long startMillis = System.currentTimeMillis();
- //將超時(shí)時(shí)間統(tǒng)一轉(zhuǎn)化為毫秒單位
- final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
- //節(jié)點(diǎn)數(shù)據(jù),這里為null
- final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
- //重試次數(shù)
- int retryCount = 0;
- //鎖路徑
- String ourPath = null;
- //是否獲取到鎖
- boolean hasTheLock = false;
- //是否完成
- boolean isDone = false;
- while (!isDone) {
- isDone = true;
- try {
- //創(chuàng)建一個(gè)臨時(shí)有序節(jié)點(diǎn),并返回節(jié)點(diǎn)路徑
- //內(nèi)部調(diào)用client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
- ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
- //依據(jù)返回的節(jié)點(diǎn)路徑,判斷是否搶到了鎖
- hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
- } catch (KeeperException.NoNodeException e) {
- //在會(huì)話過(guò)期時(shí),可能導(dǎo)致driver找不到臨時(shí)有序節(jié)點(diǎn),從而拋出NoNodeException
- //這里就進(jìn)行重試
- if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
- isDone = false;
- } else {
- throw e;
- }
- }
- }
- //獲取到鎖,則返回節(jié)點(diǎn)路徑,供調(diào)用方記錄到map中
- if (hasTheLock) {
- return ourPath;
- }
- return null;
- }
接下來(lái),將會(huì)在internalLockLoop中利用剛才創(chuàng)建出來(lái)的臨時(shí)有序節(jié)點(diǎn),判斷是否獲取到了鎖。
- private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
- //是否獲取到鎖
- boolean haveTheLock = false;
- boolean doDelete = false;
- try {
- if (revocable.get() != null) {
- //當(dāng)前不會(huì)進(jìn)入這里
- client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
- }
- //一直嘗試獲取鎖
- while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
- //返回basePath(這里是lockqcy)下所有的臨時(shí)有序節(jié)點(diǎn),并且按照后綴從小到大排列
- List<String> children = getSortedChildren();
- //取出當(dāng)前線程創(chuàng)建出來(lái)的臨時(shí)有序節(jié)點(diǎn)的名稱,這里就是/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005
- String sequenceNodeName = ourPath.substring(basePath.length() + 1);
- //判斷當(dāng)前節(jié)點(diǎn)是否處于排序后的首位,如果處于首位,則代表獲取到了鎖
- PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
- if (predicateResults.getsTheLock()) {
- //獲取到鎖之后,則終止循環(huán)
- haveTheLock = true;
- } else {
- //這里代表沒(méi)有獲取到鎖
- //獲取比當(dāng)前節(jié)點(diǎn)索引小的前一個(gè)節(jié)點(diǎn)
- String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
- synchronized (this) {
- try {
- //如果前一個(gè)節(jié)點(diǎn)不存在,則直接拋出NoNodeException,catch中不進(jìn)行處理,在下一輪中繼續(xù)獲取鎖
- //如果前一個(gè)節(jié)點(diǎn)存在,則給它設(shè)置一個(gè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)它的釋放事件
- client.getData().usingWatcher(watcher).forPath(previousSequencePath);
- if (millisToWait != null) {
- millisToWait -= (System.currentTimeMillis() - startMillis);
- startMillis = System.currentTimeMillis();
- //判斷是否超時(shí)
- if (millisToWait <= 0) {
- //獲取鎖超時(shí),刪除剛才創(chuàng)建的臨時(shí)有序節(jié)點(diǎn)
- doDelete = true;
- break;
- }
- //沒(méi)超時(shí)的話,在millisToWait內(nèi)進(jìn)行等待
- wait(millisToWait);
- } else {
- //無(wú)限期阻塞等待,監(jiān)聽(tīng)到前一個(gè)節(jié)點(diǎn)被刪除時(shí),才會(huì)觸發(fā)喚醒操作
- wait();
- }
- } catch (KeeperException.NoNodeException e) {
- //如果前一個(gè)節(jié)點(diǎn)不存在,則直接拋出NoNodeException,catch中不進(jìn)行處理,在下一輪中繼續(xù)獲取鎖
- }
- }
- }
- }
- } catch (Exception e) {
- ThreadUtils.checkInterrupted(e);
- doDelete = true;
- throw e;
- } finally {
- if (doDelete) {
- //刪除剛才創(chuàng)建出來(lái)的臨時(shí)有序節(jié)點(diǎn)
- deleteOurPath(ourPath);
- }
- }
- return haveTheLock;
- }
判斷是否獲取到鎖的核心邏輯位于getsTheLock中
- public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
- //獲取當(dāng)前節(jié)點(diǎn)在所有子節(jié)點(diǎn)排序后的索引位置
- int ourIndex = children.indexOf(sequenceNodeName);
- //判斷當(dāng)前節(jié)點(diǎn)是否處于子節(jié)點(diǎn)中
- validateOurIndex(sequenceNodeName, ourIndex);
- //InterProcessMutex的構(gòu)造方法,會(huì)將maxLeases初始化為1
- //ourIndex必須為0,才能使得getsTheLock為true,也就是說(shuō),當(dāng)前節(jié)點(diǎn)必須是basePath下的最小節(jié)點(diǎn),才能代表獲取到了鎖
- boolean getsTheLock = ourIndex < maxLeases;
- //如果獲取不到鎖,則返回上一個(gè)節(jié)點(diǎn)的名稱,用作對(duì)其設(shè)置監(jiān)聽(tīng)
- String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
- return new PredicateResults(pathToWatch, getsTheLock);
- }
- static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
- if (ourIndex < 0) {
- //可能會(huì)由于連接丟失導(dǎo)致臨時(shí)節(jié)點(diǎn)被刪除,因此這里屬于保險(xiǎn)措施
- throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
- }
- }
那什么時(shí)候,在internalLockLoop處于wait的線程能被喚醒呢?
在internalLockLoop方法中,已經(jīng)使用
- client.getData().usingWatcher(watcher).forPath(previousSequencePath);
給前一個(gè)節(jié)點(diǎn)設(shè)置了監(jiān)聽(tīng)器,當(dāng)該節(jié)點(diǎn)被刪除時(shí),將會(huì)觸發(fā)watcher中的回調(diào)
- private final Watcher watcher = new Watcher() {
- //回調(diào)方法
- @Override
- public void process(WatchedEvent event) {
- notifyFromWatcher();
- }
- };
- private synchronized void notifyFromWatcher() {
- //喚醒所以在LockInternals實(shí)例上等待的線程
- notifyAll();
- }
到這里,基本上已經(jīng)分析完加鎖的過(guò)程了,在這里總結(jié)下:
首先創(chuàng)建一個(gè)臨時(shí)有序節(jié)點(diǎn)
如果該節(jié)點(diǎn)是basePath下最小節(jié)點(diǎn),則代表獲取到了鎖,存入map中,下次直接進(jìn)行重入。
如果該節(jié)點(diǎn)不是最小節(jié)點(diǎn),則對(duì)前一個(gè)節(jié)點(diǎn)設(shè)置監(jiān)聽(tīng),接著進(jìn)行wait等待。當(dāng)前一個(gè)節(jié)點(diǎn)被刪除時(shí),將會(huì)通知notify該線程。
解鎖
解鎖的邏輯,就比較簡(jiǎn)單了,直接進(jìn)入release方法中
- public void release() throws Exception {
- Thread currentThread = Thread.currentThread();
- LockData lockData = threadData.get(currentThread);
- if (lockData == null) {
- throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
- }
- int newLockCount = lockData.lockCount.decrementAndGet();
- //直接減少一次重入次數(shù)
- if (newLockCount > 0) {
- return;
- }
- if (newLockCount < 0) {
- throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
- }
- //到這里代表重入次數(shù)為0
- try {
- //釋放鎖
- internals.releaseLock(lockData.lockPath);
- } finally {
- //從map中移除
- threadData.remove(currentThread);
- }
- }
- void releaseLock(String lockPath) throws Exception {
- revocable.set(null);
- //內(nèi)部使用guaranteed,會(huì)在后臺(tái)不斷嘗試刪除節(jié)點(diǎn)
- deleteOurPath(lockPath);
- }
重入次數(shù)大于0,就減少重入次數(shù)。當(dāng)減為0時(shí),調(diào)用zk去刪除節(jié)點(diǎn),這一點(diǎn)和Redisson可重入鎖釋放時(shí)一致。
四、羊群效應(yīng)
在這里談?wù)勈褂肸ookeeper實(shí)現(xiàn)分布式鎖場(chǎng)景中的羊群效應(yīng)
什么是羊群效應(yīng)
首先,羊群是一種很散亂的組織,漫無(wú)目的,缺少管理,一般需要牧羊犬來(lái)幫助主人控制羊群。
某個(gè)時(shí)候,當(dāng)其中一只羊發(fā)現(xiàn)前面有更加美味的草而動(dòng)起來(lái),就會(huì)導(dǎo)致其余的羊一哄而上,根本不管周圍的情況。
所以羊群效應(yīng),指的是一個(gè)人在進(jìn)行理性的行為后,導(dǎo)致其余人直接盲從,產(chǎn)生非理性的從眾行為。
而Zookeeper中的羊群效應(yīng),則是指一個(gè)znode被改變后,觸發(fā)了大量本可以被避免的watch通知,造成集群資源的浪費(fèi)。
獲取不到鎖時(shí)的等待演化
sleep一段時(shí)間
如果某個(gè)線程在獲取鎖失敗后,完全可以sleep一段時(shí)間,再嘗試獲取鎖。
但這樣的方式,效率極低。
sleep時(shí)間短的話,會(huì)頻繁地進(jìn)行輪詢,浪費(fèi)資源。
sleep時(shí)間長(zhǎng)的話,會(huì)出現(xiàn)鎖被釋放但仍然獲取不到鎖的尷尬情況。
所以,這里的優(yōu)化點(diǎn),在于如何變主動(dòng)輪詢?yōu)楫惒酵ㄖ?/p>
watch被鎖住的節(jié)點(diǎn)
所有的客戶端要獲取鎖時(shí),只去創(chuàng)建一個(gè)同名的node。
當(dāng)znode存在時(shí),這些客戶端對(duì)其設(shè)置監(jiān)聽(tīng)。當(dāng)znode被刪除后,通知所有等待鎖的客戶端,接著這些客戶端再次嘗試獲取鎖。
雖然這里使用watch機(jī)制來(lái)異步通知,可是當(dāng)客戶端的數(shù)量特別多時(shí),會(huì)存在性能低點(diǎn)。
當(dāng)znode被刪除后,在這一瞬間,需要給大量的客戶端發(fā)送通知。在此期間,其余提交給zk的正常請(qǐng)求可能會(huì)被延遲或者阻塞。
這就產(chǎn)生了羊群效應(yīng),一個(gè)點(diǎn)的變化(znode被刪除),造成了全面的影響(通知大量的客戶端)。
所以,這里的優(yōu)化點(diǎn),在于如何減少對(duì)一個(gè)znode的監(jiān)聽(tīng)數(shù)量,最好的情況是只有一個(gè)。
watch前一個(gè)有序節(jié)點(diǎn)
如果先指定一個(gè)basePath,想要獲取鎖的客戶端,直接在該路徑下創(chuàng)建臨時(shí)有序節(jié)點(diǎn)。
當(dāng)創(chuàng)建的節(jié)點(diǎn)是最小節(jié)點(diǎn)時(shí),代表獲取到了鎖。如果不是最小的節(jié)點(diǎn),則只對(duì)前一個(gè)節(jié)點(diǎn)設(shè)置監(jiān)聽(tīng)器,只監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn)的刪除行為。
這樣前一個(gè)節(jié)點(diǎn)被刪除時(shí),只會(huì)給下一個(gè)節(jié)點(diǎn)代表的客戶端發(fā)送通知,不會(huì)給所有客戶端發(fā)送通知,從而避免了羊群效應(yīng)。
在避免羊群效應(yīng)的同時(shí),使得當(dāng)前鎖成為公平鎖。即按照申請(qǐng)鎖的先后順序獲得鎖,避免存在饑餓過(guò)度的線程。
五、后語(yǔ)
本文從源碼角度講解了使用Curator獲取分布式鎖的流程,接著從等待鎖的演化過(guò)程角度出發(fā),分析了Zookeeper在分布式鎖場(chǎng)景下避免羊群效應(yīng)的解決方案。
這是Zookeeper系列的第二篇,關(guān)于其watch原理分析、zab協(xié)議等文章也在安排的路上了。