基于Consul的分布式鎖實(shí)現(xiàn)
我們在構(gòu)建分布式系統(tǒng)的時(shí)候,經(jīng)常需要控制對(duì)共享資源的互斥訪問。這個(gè)時(shí)候我們就涉及到分布式鎖(也稱為全局鎖)的實(shí)現(xiàn),基于目前的各種工具,我們已經(jīng)有了大量的實(shí)現(xiàn)方式,比如:基于Redis的實(shí)現(xiàn)、基于Zookeeper的實(shí)現(xiàn)。本文將介紹一種基于Consul 的Key/Value存儲(chǔ)來實(shí)現(xiàn)分布式鎖以及信號(hào)量的方法。
分布式鎖實(shí)現(xiàn)
基于Consul的分布式鎖主要利用Key/Value存儲(chǔ)API中的acquire和release操作來實(shí)現(xiàn)。acquire和release操作是類似Check-And-Set的操作:
- acquire操作只有當(dāng)鎖不存在持有者時(shí)才會(huì)返回true,并且set設(shè)置的Value值,同時(shí)執(zhí)行操作的session會(huì)持有對(duì)該Key的鎖,否則就返回false
- release操作則是使用指定的session來釋放某個(gè)Key的鎖,如果指定的session無效,那么會(huì)返回false,否則就會(huì)set設(shè)置Value值,并返回true
具體實(shí)現(xiàn)中主要使用了這幾個(gè)Key/Value的API:
create session:https://www.consul.io/api/session.html#session_create
delete session:https://www.consul.io/api/session.html#delete-session
KV acquire/release:https://www.consul.io/api/kv.html#create-update-key
基本流程
具體實(shí)現(xiàn)
- public class Lock {
- private static final String prefix = "lock/"; // 同步鎖參數(shù)前綴
- private ConsulClient consulClient;
- private String sessionName;
- private String sessionId = null;
- private String lockKey;
- /**
- *
- * @param consulClient
- * @param sessionName 同步鎖的session名稱
- * @param lockKey 同步鎖在consul的KV存儲(chǔ)中的Key路徑,會(huì)自動(dòng)增加prefix前綴,方便歸類查詢
- */
- public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
- this.consulClient = consulClient;
- this.sessionName = sessionName;
- this.lockKey = prefix + lockKey;
- }
- /**
- * 獲取同步鎖
- *
- * @param block 是否阻塞,直到獲取到鎖為止
- * @return
- */
- public Boolean lock(boolean block) {
- if (sessionId != null) {
- throw new RuntimeException(sessionId + " - Already locked!");
- }
- sessionId = createSession(sessionName);
- while(true) {
- PutParams putParams = new PutParams();
- putParams.setAcquireSession(sessionId);
- if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
- return true;
- } else if(block) {
- continue;
- } else {
- return false;
- }
- }
- }
- /**
- * 釋放同步鎖
- *
- * @return
- */
- public Boolean unlock() {
- PutParams putParams = new PutParams();
- putParams.setReleaseSession(sessionId);
- boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
- consulClient.sessionDestroy(sessionId, null);
- return result;
- }
- /**
- * 創(chuàng)建session
- * @param sessionName
- * @return
- */
- private String createSession(String sessionName) {
- NewSession newSession = new NewSession();
- newSession.setName(sessionName);
- return consulClient.sessionCreate(newSession, null).getValue();
- }
- }
單元測試
- public class TestLock {
- private Logger logger = Logger.getLogger(getClass());
- @Test
- public void testLock() throws Exception {
- new Thread(new LockRunner(1)).start();
- new Thread(new LockRunner(2)).start();
- new Thread(new LockRunner(3)).start();
- new Thread(new LockRunner(4)).start();
- new Thread(new LockRunner(5)).start();
- Thread.sleep(200000L);
- }
- class LockRunner implements Runnable {
- private Logger logger = Logger.getLogger(getClass());
- private int flag;
- public LockRunner(int flag) {
- this.flag = flag;
- }
- @Override
- public void run() {
- Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
- try {
- if (lock.lock(true)) {
- logger.info("Thread " + flag + " start!");
- Thread.sleep(new Random().nextInt(3000L));
- logger.info("Thread " + flag + " end!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
- }
優(yōu)化建議
本文我們實(shí)現(xiàn)了基于Consul的簡單分布式鎖,但是在實(shí)際運(yùn)行時(shí),可能會(huì)因?yàn)楦鞣N各樣的意外情況導(dǎo)致unlock操作沒有得到正確地執(zhí)行,從而使得分布式鎖無法釋放。所以為了更完善的使用分布式鎖,我們還必須實(shí)現(xiàn)對(duì)鎖的超時(shí)清理等控制,保證即使出現(xiàn)了未正常解鎖的情況下也能自動(dòng)修復(fù),以提升系統(tǒng)的健壯性。那么如何實(shí)現(xiàn)呢?請(qǐng)持續(xù)關(guān)注我的后續(xù)分解!
【本文為51CTO專欄作者“翟永超”的原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)通過51CTO聯(lián)系作者獲取授權(quán)】