基于Zookeeper的分布式鎖
這篇文章只需要你10分鐘的時間。
實現(xiàn)分布式鎖目前有三種流行方案,分別為基于數據庫、Redis、Zookeeper的方案,其中前兩種方案網絡上有很多資料可以參考,本文不做展開。我們來看下使用Zookeeper如何實現(xiàn)分布式鎖。
什么是Zookeeper?
Zookeeper(業(yè)界簡稱zk)是一種提供配置管理、分布式協(xié)同以及命名的中心化服務,這些提供的功能都是分布式系統(tǒng)中非常底層且必不可少的基本功能,但是如果自己實現(xiàn)這些功能而且要達到高吞吐、低延遲同時還要保持一致性和可用性,實際上非常困難。因此zookeeper提供了這些功能,開發(fā)者在zookeeper之上構建自己的各種分布式系統(tǒng)。
雖然zookeeper的實現(xiàn)比較復雜,但是它提供的模型抽象卻是非常簡單的。Zookeeper提供一個多層級的節(jié)點命名空間(節(jié)點稱為znode),每個節(jié)點都用一個以斜杠(/)分隔的路徑表示,而且每個節(jié)點都有父節(jié)點(根節(jié)點除外),非常類似于文件系統(tǒng)。例如,/foo/doo這個表示一個znode,它的父節(jié)點為/foo,父父節(jié)點為/,而/為根節(jié)點沒有父節(jié)點。與文件系統(tǒng)不同的是,這些節(jié)點都可以設置關聯(lián)的數據,而文件系統(tǒng)中只有文件節(jié)點可以存放數據而目錄節(jié)點不行。Zookeeper為了保證高吞吐和低延遲,在內存中維護了這個樹狀的目錄結構,這種特性使得Zookeeper不能用于存放大量的數據,每個節(jié)點的存放數據上限為1M。
而為了保證高可用,zookeeper需要以集群形態(tài)來部署,這樣只要集群中大部分機器是可用的(能夠容忍一定的機器故障),那么zookeeper本身仍然是可用的??蛻舳嗽谑褂脄ookeeper時,需要知道集群機器列表,通過與集群中的某一臺機器建立TCP連接來使用服務,客戶端使用這個TCP鏈接來發(fā)送請求、獲取結果、獲取監(jiān)聽事件以及發(fā)送心跳包。如果這個連接異常斷開了,客戶端可以連接到另外的機器上。
架構簡圖如下所示:
客戶端的讀請求可以被集群中的任意一臺機器處理,如果讀請求在節(jié)點上注冊了監(jiān)聽器,這個監(jiān)聽器也是由所連接的zookeeper機器來處理。對于寫請求,這些請求會同時發(fā)給其他zookeeper機器并且達成一致后,請求才會返回成功。因此,隨著zookeeper的集群機器增多,讀請求的吞吐會提高但是寫請求的吞吐會下降。
有序性是zookeeper中非常重要的一個特性,所有的更新都是全局有序的,每個更新都有一個唯一的時間戳,這個時間戳稱為zxid(Zookeeper Transaction Id)。而讀請求只會相對于更新有序,也就是讀請求的返回結果中會帶有這個zookeeper***的zxid。
如何使用zookeeper實現(xiàn)分布式鎖?
在描述算法流程之前,先看下zookeeper中幾個關于節(jié)點的有趣的性質:
- 有序節(jié)點:假如當前有一個父節(jié)點為/lock,我們可以在這個父節(jié)點下面創(chuàng)建子節(jié)點;zookeeper提供了一個可選的有序特性,例如我們可以創(chuàng)建子節(jié)點“/lock/node-”并且指明有序,那么zookeeper在生成子節(jié)點時會根據當前的子節(jié)點數量自動添加整數序號,也就是說如果是***個創(chuàng)建的子節(jié)點,那么生成的子節(jié)點為/lock/node-0000000000,下一個節(jié)點則為/lock/node-0000000001,依次類推。
- 臨時節(jié)點:客戶端可以建立一個臨時節(jié)點,在會話結束或者會話超時后,zookeeper會自動刪除該節(jié)點。
- 事件監(jiān)聽:在讀取數據時,我們可以同時對節(jié)點設置事件監(jiān)聽,當節(jié)點數據或結構變化時,zookeeper會通知客戶端。當前zookeeper有如下四種事件:1)節(jié)點創(chuàng)建;2)節(jié)點刪除;3)節(jié)點數據修改;4)子節(jié)點變更。
下面描述使用zookeeper實現(xiàn)分布式鎖的算法流程,假設鎖空間的根節(jié)點為/lock:
- 客戶端連接zookeeper,并在/lock下創(chuàng)建 臨時的 且 有序的 子節(jié)點,***個客戶端對應的子節(jié)點為/lock/lock-0000000000,第二個為/lock/lock-0000000001,以此類推。
- 客戶端獲取/lock下的子節(jié)點列表,判斷自己創(chuàng)建的子節(jié)點是否為當前子節(jié)點列表中 序號最小 的子節(jié)點,如果是則認為獲得鎖,否則監(jiān)聽/lock的子節(jié)點變更消息,獲得子節(jié)點變更通知后重復此步驟直至獲得鎖;
- 執(zhí)行業(yè)務代碼;
- 完成業(yè)務流程后,刪除對應的子節(jié)點釋放鎖。
步驟1中創(chuàng)建的臨時節(jié)點能夠保證在故障的情況下鎖也能被釋放,考慮這么個場景:假如客戶端a當前創(chuàng)建的子節(jié)點為序號最小的節(jié)點,獲得鎖之后客戶端所在機器宕機了,客戶端沒有主動刪除子節(jié)點;如果創(chuàng)建的是***的節(jié)點,那么這個鎖永遠不會釋放,導致死鎖;由于創(chuàng)建的是臨時節(jié)點,客戶端宕機后,過了一定時間zookeeper沒有收到客戶端的心跳包判斷會話失效,將臨時節(jié)點刪除從而釋放鎖。
另外細心的朋友可能會想到,在步驟2中獲取子節(jié)點列表與設置監(jiān)聽這兩步操作的原子性問題,考慮這么個場景:客戶端a對應子節(jié)點為/lock/lock-0000000000,客戶端b對應子節(jié)點為/lock/lock-0000000001,客戶端b獲取子節(jié)點列表時發(fā)現(xiàn)自己不是序號最小的,但是在設置監(jiān)聽器前客戶端a完成業(yè)務流程刪除了子節(jié)點/lock/lock-0000000000,客戶端b設置的監(jiān)聽器豈不是丟失了這個事件從而導致永遠等待了?這個問題不存在的。因為zookeeper提供的API中設置監(jiān)聽器的操作與讀操作是 原子執(zhí)行 的,也就是說在讀子節(jié)點列表時同時設置監(jiān)聽器,保證不會丟失事件。
***,對于這個算法有個極大的優(yōu)化點:假如當前有1000個節(jié)點在等待鎖,如果獲得鎖的客戶端釋放鎖時,這1000個客戶端都會被喚醒,這種情況稱為“羊群效應”;在這種羊群效應中,zookeeper需要通知1000個客戶端,這會阻塞其他的操作,***的情況應該只喚醒新的最小節(jié)點對應的客戶端。應該怎么做呢?在設置事件監(jiān)聽時,每個客戶端應該對剛好在它之前的子節(jié)點設置事件監(jiān)聽,例如子節(jié)點列表為/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序號為1的客戶端監(jiān)聽序號為0的子節(jié)點刪除消息,序號為2的監(jiān)聽序號為1的子節(jié)點刪除消息。
所以調整后的分布式鎖算法流程如下:
- 客戶端連接zookeeper,并在/lock下創(chuàng)建 臨時的 且 有序的 子節(jié)點,***個客戶端對應的子節(jié)點為/lock/lock-0000000000,第二個為/lock/lock-0000000001,以此類推。
- 客戶端獲取/lock下的子節(jié)點列表,判斷自己創(chuàng)建的子節(jié)點是否為當前子節(jié)點列表中 序號最小 的子節(jié)點,如果是則認為獲得鎖,否則 監(jiān)聽剛好在自己之前一位的子節(jié)點刪除消息 ,獲得子節(jié)點變更通知后重復此步驟直至獲得鎖;
- 執(zhí)行業(yè)務代碼;
- 完成業(yè)務流程后,刪除對應的子節(jié)點釋放鎖。
Curator的源碼分析
雖然zookeeper原生客戶端暴露的API已經非常簡潔了,但是實現(xiàn)一個分布式鎖還是比較麻煩的…我們可以直接使用 curator 這個開源項目提供的zookeeper分布式鎖實現(xiàn)。
我們只需要引入下面這個包(基于maven):
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.0.0</version>
- </dependency>
然后就可以用啦!代碼如下:
- public static void main(String[] args) throws Exception {
- //創(chuàng)建zookeeper的客戶端
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
- client.start();
- //創(chuàng)建分布式鎖, 鎖空間的根節(jié)點路徑為/curator/lock
- InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
- mutex.acquire();
- //獲得了鎖, 進行業(yè)務流程
- System.out.println("Enter mutex");
- //完成業(yè)務流程, 釋放鎖
- mutex.release();
- //關閉客戶端
- client.close();
- }
可以看到關鍵的核心操作就只有mutex.acquire()和mutex.release(),簡直太方便了!
下面來分析下獲取鎖的源碼實現(xiàn)。acquire的方法如下:
- /*
- * 獲取鎖,當鎖被占用時會阻塞等待,這個操作支持同線程的可重入(也就是重復獲取鎖),acquire的次數需要與release的次數相同。
- * @throws Exception ZK errors, connection interruptions
- */
- @Override
- public void acquire() throws Exception
- {
- if ( !internalLock(-1, null) )
- {
- throw new IOException("Lost connection while trying to acquire lock: " + basePath);
- }
- }
這里有個地方需要注意,當與zookeeper通信存在異常時,acquire會直接拋出異常,需要使用者自身做重試策略。代碼中調用了internalLock(-1, null),參數表明在鎖被占用時***阻塞等待。internalLock的代碼如下:
- private boolean internalLock(long time, TimeUnit unit) throws Exception
- {
- //這里處理同線程的可重入性,如果已經獲得鎖,那么只是在對應的數據結構中增加acquire的次數統(tǒng)計,直接返回成功
- Thread currentThread = Thread.currentThread();
- LockData lockData = threadData.get(currentThread);
- if ( lockData != null )
- {
- // re-entering
- lockData.lockCount.incrementAndGet();
- return true;
- }
- //這里才真正去zookeeper中獲取鎖
- String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
- if ( lockPath != null )
- {
- //獲得鎖之后,記錄當前的線程獲得鎖的信息,在重入時只需在LockData中增加次數統(tǒng)計即可
- LockData newLockData = new LockData(currentThread, lockPath);
- threadData.put(currentThread, newLockData);
- return true;
- }
- //在阻塞返回時仍然獲取不到鎖,這里上下文的處理隱含的意思為zookeeper通信異常
- return false;
- }
代碼中增加了具體注釋,不做展開。看下zookeeper獲取鎖的具體實現(xiàn):
- String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
- {
- //參數初始化,此處省略
- //...
- //自旋獲取鎖
- while ( !isDone )
- {
- isDone = true;
- try
- {
- //在鎖空間下創(chuàng)建臨時且有序的子節(jié)點
- ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
- //判斷是否獲得鎖(子節(jié)點序號最?。?,獲得鎖則直接返回,否則阻塞等待前一個子節(jié)點刪除通知
- hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
- }
- catch ( KeeperException.NoNodeException e )
- {
- //對于NoNodeException,代碼中確保了只有發(fā)生session過期才會在這里拋出NoNodeException,因此這里根據重試策略進行重試
- if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
- {
- isDone = false;
- }
- else
- {
- throw e;
- }
- }
- }
- //如果獲得鎖則返回該子節(jié)點的路徑
- if ( hasTheLock )
- {
- return ourPath;
- }
- return null;
- }
上面代碼中主要有兩步操作:
- driver.createsTheLock:創(chuàng)建臨時且有序的子節(jié)點,里面實現(xiàn)比較簡單不做展開,主要關注幾種節(jié)點的模式:1)PERSISTENT(***);2)PERSISTENT_SEQUENTIAL(***且有序);3)EPHEMERAL(臨時);4)EPHEMERAL_SEQUENTIAL(臨時且有序)。
- internalLockLoop:阻塞等待直到獲得鎖。
看下internalLockLoop是怎么判斷鎖以及阻塞等待的,這里刪除了一些無關代碼,只保留主流程:
- //自旋直至獲得鎖
- while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
- {
- //獲取所有的子節(jié)點列表,并且按序號從小到大排序
- List<String> children = getSortedChildren();
- //根據序號判斷當前子節(jié)點是否為最小子節(jié)點
- String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
- PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
- if ( predicateResults.getsTheLock() )
- {
- //如果為最小子節(jié)點則認為獲得鎖
- haveTheLock = true;
- }
- else
- {
- //否則獲取前一個子節(jié)點
- String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
- //這里使用對象監(jiān)視器做線程同步,當獲取不到鎖時監(jiān)聽前一個子節(jié)點刪除消息并且進行wait(),當前一個子節(jié)點刪除(也就是鎖釋放)時,回調會通過notifyAll喚醒此線程,此線程繼續(xù)自旋判斷是否獲得鎖
- synchronized(this)
- {
- try
- {
- //這里使用getData()接口而不是checkExists()是因為,如果前一個子節(jié)點已經被刪除了那么會拋出異常而且不會設置事件監(jiān)聽器,而checkExists雖然也可以獲取到節(jié)點是否存在的信息但是同時設置了監(jiān)聽器,這個監(jiān)聽器其實永遠不會觸發(fā),對于zookeeper來說屬于資源泄露
- client.getData().usingWatcher(watcher).forPath(previousSequencePath);
- //如果設置了阻塞等待的時間
- if ( millisToWait != null )
- {
- millisToWait -= (System.currentTimeMillis() - startMillis);
- startMillis = System.currentTimeMillis();
- if ( millisToWait <= 0 )
- {
- doDelete = true; // 等待時間到達,刪除對應的子節(jié)點
- break;
- }
- //等待相應的時間
- wait(millisToWait);
- }
- else
- {
- //永遠等待
- wait();
- }
- }
- catch ( KeeperException.NoNodeException e )
- {
- //上面使用getData來設置監(jiān)聽器時,如果前一個子節(jié)點已經被刪除那么會拋出NoNodeException,只需要自旋一次即可,無需額外處理
- }
- }
- }
- }
具體邏輯見注釋,不再贅述。代碼中設置的事件監(jiān)聽器,在事件發(fā)生回調時只是簡單的notifyAll喚醒當前線程以重新自旋判斷,比較簡單不再展開。
以上。