自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于Consul的分布式鎖實(shí)現(xiàn)

開發(fā) 開發(fā)工具 分布式
本文將介紹一種基于Consul 的Key/Value存儲(chǔ)來實(shí)現(xiàn)分布式鎖以及信號(hào)量的方法。

[[188445]]

我們在構(gòu)建分布式系統(tǒng)的時(shí)候,經(jīng)常需要控制對(duì)共享資源的互斥訪問。這個(gè)時(shí)候我們就涉及到分布式鎖(也稱為全局鎖)的實(shí)現(xiàn),基于目前的各種工具,我們已經(jīng)有了大量的實(shí)現(xiàn)方式,比如:基于Redis的實(shí)現(xiàn)、基于Zookeeper的實(shí)現(xiàn)。本文將介紹一種基于Consul 的Key/Value存儲(chǔ)來實(shí)現(xiàn)分布式鎖以及信號(hào)量的方法。

分布式鎖實(shí)現(xiàn)

基于Consul的分布式鎖主要利用Key/Value存儲(chǔ)API中的acquire和release操作來實(shí)現(xiàn)。acquire和release操作是類似Check-And-Set的操作:

  • acquire操作只有當(dāng)鎖不存在持有者時(shí)才會(huì)返回true,并且set設(shè)置的Value值,同時(shí)執(zhí)行操作的session會(huì)持有對(duì)該Key的鎖,否則就返回false
  • release操作則是使用指定的session來釋放某個(gè)Key的鎖,如果指定的session無效,那么會(huì)返回false,否則就會(huì)set設(shè)置Value值,并返回true

具體實(shí)現(xiàn)中主要使用了這幾個(gè)Key/Value的API:

create session:https://www.consul.io/api/session.html#session_create

delete session:https://www.consul.io/api/session.html#delete-session

KV acquire/release:https://www.consul.io/api/kv.html#create-update-key

基本流程

 

 

具體實(shí)現(xiàn)

  1. public class Lock { 
  2.   
  3.     private static final String prefix = "lock/";  // 同步鎖參數(shù)前綴 
  4.   
  5.     private ConsulClient consulClient; 
  6.     private String sessionName; 
  7.     private String sessionId = null
  8.     private String lockKey; 
  9.   
  10.     /** 
  11.      * 
  12.      * @param consulClient 
  13.      * @param sessionName   同步鎖的session名稱 
  14.      * @param lockKey       同步鎖在consul的KV存儲(chǔ)中的Key路徑,會(huì)自動(dòng)增加prefix前綴,方便歸類查詢 
  15.      */ 
  16.     public Lock(ConsulClient consulClient, String sessionName, String lockKey) { 
  17.         this.consulClient = consulClient; 
  18.         this.sessionName = sessionName; 
  19.         this.lockKey = prefix + lockKey; 
  20.     } 
  21.   
  22.     /** 
  23.      * 獲取同步鎖 
  24.      * 
  25.      * @param block     是否阻塞,直到獲取到鎖為止 
  26.      * @return 
  27.      */ 
  28.     public Boolean lock(boolean block) { 
  29.         if (sessionId != null) { 
  30.             throw new RuntimeException(sessionId + " - Already locked!"); 
  31.         } 
  32.         sessionId = createSession(sessionName); 
  33.         while(true) { 
  34.             PutParams putParams = new PutParams(); 
  35.             putParams.setAcquireSession(sessionId); 
  36.             if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) { 
  37.                 return true
  38.             } else if(block) { 
  39.                 continue
  40.             } else { 
  41.                 return false
  42.             } 
  43.         } 
  44.     } 
  45.   
  46.     /** 
  47.      * 釋放同步鎖 
  48.      * 
  49.      * @return 
  50.      */ 
  51.     public Boolean unlock() { 
  52.         PutParams putParams = new PutParams(); 
  53.         putParams.setReleaseSession(sessionId); 
  54.         boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue(); 
  55.         consulClient.sessionDestroy(sessionId, null); 
  56.         return result; 
  57.     } 
  58.   
  59.     /** 
  60.      * 創(chuàng)建session 
  61.      * @param sessionName 
  62.      * @return 
  63.      */ 
  64.     private String createSession(String sessionName) { 
  65.         NewSession newSession = new NewSession(); 
  66.         newSession.setName(sessionName); 
  67.         return consulClient.sessionCreate(newSession, null).getValue(); 
  68.     } 
  69.   

單元測試

  1. public class TestLock { 
  2.   
  3.     private Logger logger = Logger.getLogger(getClass()); 
  4.   
  5.     @Test 
  6.     public void testLock() throws Exception  { 
  7.         new Thread(new LockRunner(1)).start(); 
  8.         new Thread(new LockRunner(2)).start(); 
  9.         new Thread(new LockRunner(3)).start(); 
  10.         new Thread(new LockRunner(4)).start(); 
  11.         new Thread(new LockRunner(5)).start(); 
  12.         Thread.sleep(200000L); 
  13.     } 
  14.    
  15.     class LockRunner implements Runnable { 
  16.   
  17.         private Logger logger = Logger.getLogger(getClass()); 
  18.         private int flag; 
  19.   
  20.         public LockRunner(int flag) { 
  21.             this.flag = flag; 
  22.         } 
  23.   
  24.         @Override 
  25.         public void run() { 
  26.             Lock lock = new Lock(new ConsulClient(), "lock-session""lock-key"); 
  27.             try { 
  28.                 if (lock.lock(true)) { 
  29.                     logger.info("Thread " + flag + " start!"); 
  30.                     Thread.sleep(new Random().nextInt(3000L)); 
  31.                     logger.info("Thread " + flag + " end!"); 
  32.                 } 
  33.             } catch (Exception e) { 
  34.                 e.printStackTrace(); 
  35.             } finally { 
  36.                 lock.unlock(); 
  37.             } 
  38.         } 
  39.     } 
  40.    

優(yōu)化建議

本文我們實(shí)現(xiàn)了基于Consul的簡單分布式鎖,但是在實(shí)際運(yùn)行時(shí),可能會(huì)因?yàn)楦鞣N各樣的意外情況導(dǎo)致unlock操作沒有得到正確地執(zhí)行,從而使得分布式鎖無法釋放。所以為了更完善的使用分布式鎖,我們還必須實(shí)現(xiàn)對(duì)鎖的超時(shí)清理等控制,保證即使出現(xiàn)了未正常解鎖的情況下也能自動(dòng)修復(fù),以提升系統(tǒng)的健壯性。那么如何實(shí)現(xiàn)呢?請(qǐng)持續(xù)關(guān)注我的后續(xù)分解!

【本文為51CTO專欄作者“翟永超”的原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)通過51CTO聯(lián)系作者獲取授權(quán)】

戳這里,看該作者更多好文

責(zé)任編輯:武曉燕 來源: 51CTO專欄
相關(guān)推薦

2017-05-11 14:05:25

Consul分布式信號(hào)量

2022-10-27 10:44:14

分布式Zookeeper

2017-10-24 11:28:23

Zookeeper分布式鎖架構(gòu)

2024-11-28 15:11:28

2022-11-06 19:28:02

分布式鎖etcd云原生

2019-06-19 15:40:06

分布式鎖RedisJava

2021-02-28 07:49:28

Zookeeper分布式

2023-08-21 19:10:34

Redis分布式

2021-10-25 10:21:59

ZK分布式鎖ZooKeeper

2022-01-06 10:58:07

Redis數(shù)據(jù)分布式鎖

2019-02-26 09:51:52

分布式鎖RedisZookeeper

2017-01-16 14:13:37

分布式數(shù)據(jù)庫

2018-04-03 16:24:34

分布式方式

2022-04-08 08:27:08

分布式鎖系統(tǒng)

2021-11-01 12:25:56

Redis分布式

2023-12-01 10:49:07

Redis分布式鎖

2021-06-03 00:02:43

RedisRedlock算法

2021-07-30 00:09:21

Redlock算法Redis

2022-03-08 07:22:48

Redis腳本分布式鎖

2024-10-09 17:12:34

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)