基于Consul的分布式信號量實現(xiàn)
本文將繼續(xù)討論基于Consul的分布式鎖實現(xiàn)。信號量是我們在實現(xiàn)并發(fā)控制時會經(jīng)常使用的手段,主要用來限制同時并發(fā)線程或進程的數(shù)量,比如:Zuul默認情況下就使用信號量來限制每個路由的并發(fā)數(shù),以實現(xiàn)不同路由間的資源隔離。
信號量(Semaphore),有時被稱為信號燈,是在多線程環(huán)境下使用的一種設(shè)施,是可以用來保證兩個或多個關(guān)鍵代碼段不被并發(fā)調(diào)用。在進入一個關(guān)鍵代碼段之前,線程必須獲取一個信號量;一旦該關(guān)鍵代碼段完成了,那么該線程必須釋放信號量。其他想進入該關(guān)鍵代碼段的線程必須等待直到***個線程釋放信號量。為了完成這個過程,需要創(chuàng)建一個信號量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個關(guān)鍵代碼段的首末端,確認這些信號量VI引用的是初始創(chuàng)建的信號量。如在這個停車場系統(tǒng)中,車位是公共資源,每輛車好比一個線程,看門人起的就是信號量的作用。
實現(xiàn)思路
- 信號量存儲:semaphore/key
- acquired操作:
- 創(chuàng)建session
- 鎖定key競爭者:semaphore/key/session
- 查詢信號量:semaphore/key/.lock,可以獲得如下內(nèi)容(如果是***次創(chuàng)建信號量,將獲取不到,這個時候就直接創(chuàng)建)
- {
- "limit": 3,
- "holders": [
- "90c0772a-4bd3-3a3c-8215-3b8937e36027",
- "93e5611d-5365-a374-8190-f80c4a7280ab"
- ]
- }
- 如果持有者已達上限,返回false,如果阻塞模式,就繼續(xù)嘗試acquired操作
- 如果持有者未達上限,更新semaphore/key/.lock的內(nèi)容,將當前線程的sessionId加入到holders中。注意:更新的時候需要設(shè)置cas,它的值是“查詢信號量”步驟獲得的“ModifyIndex”值,該值用于保證更新操作的基礎(chǔ)沒有被其他競爭者更新。如果更新成功,就開始執(zhí)行具體邏輯。如果沒有更新成功,說明有其他競爭者搶占了資源,返回false,阻塞模式下繼續(xù)嘗試acquired操作
- release操作:
- 從semaphore/key/.lock的holders中移除當前sessionId
- 刪除semaphore/key/session
- 刪除當前的session
流程圖
代碼實現(xiàn)
- public class Semaphore {
- private Logger logger = Logger.getLogger(getClass());
- private static final String prefix = "semaphore/"; // 信號量參數(shù)前綴
- private ConsulClient consulClient;
- private int limit;
- private String keyPath;
- private String sessionId = null;
- private boolean acquired = false;
- /**
- *
- * @param consulClient consul客戶端實例
- * @param limit 信號量上限值
- * @param keyPath 信號量在consul中存儲的參數(shù)路徑
- */
- public Semaphore(ConsulClient consulClient, int limit, String keyPath) {
- this.consulClient = consulClient;
- this.limit = limit;
- this.keyPath = prefix + keyPath;
- }
- /**
- * acquired信號量
- *
- * @param block 是否阻塞。如果為true,那么一直嘗試,直到獲取到該資源為止。
- * @return
- * @throws IOException
- */
- public Boolean acquired(boolean block) throws IOException {
- if(acquired) {
- logger.error(sessionId + " - Already acquired");
- throw new RuntimeException(sessionId + " - Already acquired");
- }
- // create session
- clearSession();
- this.sessionId = createSessionId("semaphore");
- logger.debug("Create session : " + sessionId);
- // add contender entry
- String contenderKey = keyPath + "/" + sessionId;
- logger.debug("contenderKey : " + contenderKey);
- PutParams putParams = new PutParams();
- putParams.setAcquireSession(sessionId);
- Boolean b = consulClient.setKVValue(contenderKey, "", putParams).getValue();
- if(!b) {
- logger.error("Failed to add contender entry : " + contenderKey + ", " + sessionId);
- throw new RuntimeException("Failed to add contender entry : " + contenderKey + ", " + sessionId);
- }
- while(true) {
- // try to take the semaphore
- String lockKey = keyPath + "/.lock";
- String lockKeyValue;
- GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
- if (lockKeyContent != null) {
- // lock值轉(zhuǎn)換
- lockKeyValue = lockKeyContent.getValue();
- BASE64Decoder decoder = new BASE64Decoder();
- byte[] v = decoder.decodeBuffer(lockKeyValue);
- String lockKeyValueDecode = new String(v);
- logger.debug("lockKey=" + lockKey + ", lockKeyValueDecode=" + lockKeyValueDecode);
- Gson gson = new Gson();
- ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
- // 當前信號量已滿
- if(contenderValue.getLimit() == contenderValue.getHolders().size()) {
- logger.debug("Semaphore limited " + contenderValue.getLimit() + ", waiting...");
- if(block) {
- // 如果是阻塞模式,再嘗試
- try {
- Thread.sleep(100L);
- } catch (InterruptedException e) {
- }
- continue;
- }
- // 非阻塞模式,直接返回沒有獲取到信號量
- return false;
- }
- // 信號量增加
- contenderValue.getHolders().add(sessionId);
- putParams = new PutParams();
- putParams.setCas(lockKeyContent.getModifyIndex());
- boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
- if(c) {
- acquired = true;
- return true;
- }
- else
- continue;
- } else {
- // 當前信號量還沒有,所以創(chuàng)建一個,并馬上搶占一個資源
- ContenderValue contenderValue = new ContenderValue();
- contenderValue.setLimit(limit);
- contenderValue.getHolders().add(sessionId);
- putParams = new PutParams();
- putParams.setCas(0L);
- boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
- if (c) {
- acquired = true;
- return true;
- }
- continue;
- }
- }
- }
- /**
- * 創(chuàng)建sessionId
- * @param sessionName
- * @return
- */
- public String createSessionId(String sessionName) {
- NewSession newnewSession = new NewSession();
- newSession.setName(sessionName);
- return consulClient.sessionCreate(newSession, null).getValue();
- }
- /**
- * 釋放session、并從lock中移除當前的sessionId
- * @throws IOException
- */
- public void release() throws IOException {
- if(this.acquired) {
- // remove session from lock
- while(true) {
- String contenderKey = keyPath + "/" + sessionId;
- String lockKey = keyPath + "/.lock";
- String lockKeyValue;
- GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
- if (lockKeyContent != null) {
- // lock值轉(zhuǎn)換
- lockKeyValue = lockKeyContent.getValue();
- BASE64Decoder decoder = new BASE64Decoder();
- byte[] v = decoder.decodeBuffer(lockKeyValue);
- String lockKeyValueDecode = new String(v);
- Gson gson = new Gson();
- ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
- contenderValue.getHolders().remove(sessionId);
- PutParams putParams = new PutParams();
- putParams.setCas(lockKeyContent.getModifyIndex());
- consulClient.deleteKVValue(contenderKey);
- boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
- if(c) {
- break;
- }
- }
- }
- // remove session key
- }
- this.acquired = false;
- clearSession();
- }
- public void clearSession() {
- if(sessionId != null) {
- consulClient.sessionDestroy(sessionId, null);
- sessionId = null;
- }
- }
- class ContenderValue implements Serializable {
- private Integer limit;
- private List<String> holders = new ArrayList<>();
- public Integer getLimit() {
- return limit;
- }
- public void setLimit(Integer limit) {
- this.limit = limit;
- }
- public List<String> getHolders() {
- return holders;
- }
- public void setHolders(List<String> holders) {
- this.holders = holders;
- }
- @Override
- public String toString() {
- return new Gson().toJson(this);
- }
- }
- }
單元測試
下面單元測試的邏輯:通過線程的方式來模擬不同的分布式服務(wù)來獲取信號量執(zhí)行業(yè)務(wù)邏輯。由于信號量與簡單的分布式互斥鎖有所不同,它不是只限定一個線程可以操作,而是可以控制多個線程的并發(fā),所以通過下面的單元測試,我們設(shè)置信號量為3,然后同時啟動15個線程來競爭的情況,來觀察分布式信號量實現(xiàn)的結(jié)果如何。
- INFO [Thread-6] SemaphoreRunner - Thread 7 start!
- INFO [Thread-2] SemaphoreRunner - Thread 3 start!
- INFO [Thread-7] SemaphoreRunner - Thread 8 start!
- INFO [Thread-2] SemaphoreRunner - Thread 3 end!
- INFO [Thread-5] SemaphoreRunner - Thread 6 start!
- INFO [Thread-6] SemaphoreRunner - Thread 7 end!
- INFO [Thread-9] SemaphoreRunner - Thread 10 start!
- INFO [Thread-5] SemaphoreRunner - Thread 6 end!
- INFO [Thread-1] SemaphoreRunner - Thread 2 start!
- INFO [Thread-7] SemaphoreRunner - Thread 8 end!
- INFO [Thread-10] SemaphoreRunner - Thread 11 start!
- INFO [Thread-10] SemaphoreRunner - Thread 11 end!
- INFO [Thread-12] SemaphoreRunner - Thread 13 start!
- INFO [Thread-1] SemaphoreRunner - Thread 2 end!
- INFO [Thread-3] SemaphoreRunner - Thread 4 start!
- INFO [Thread-9] SemaphoreRunner - Thread 10 end!
- INFO [Thread-0] SemaphoreRunner - Thread 1 start!
- INFO [Thread-3] SemaphoreRunner - Thread 4 end!
- INFO [Thread-14] SemaphoreRunner - Thread 15 start!
- INFO [Thread-12] SemaphoreRunner - Thread 13 end!
- INFO [Thread-0] SemaphoreRunner - Thread 1 end!
- INFO [Thread-13] SemaphoreRunner - Thread 14 start!
- INFO [Thread-11] SemaphoreRunner - Thread 12 start!
- INFO [Thread-13] SemaphoreRunner - Thread 14 end!
- INFO [Thread-4] SemaphoreRunner - Thread 5 start!
- INFO [Thread-4] SemaphoreRunner - Thread 5 end!
- INFO [Thread-8] SemaphoreRunner - Thread 9 start!
- INFO [Thread-11] SemaphoreRunner - Thread 12 end!
- INFO [Thread-14] SemaphoreRunner - Thread 15 end!
- INFO [Thread-8] SemaphoreRunner - Thread 9 end!
- public class TestLock {
- private Logger logger = Logger.getLogger(getClass());
- @Test
- public void testSemaphore() throws Exception {
- new Thread(new SemaphoreRunner(1)).start();
- new Thread(new SemaphoreRunner(2)).start();
- new Thread(new SemaphoreRunner(3)).start();
- new Thread(new SemaphoreRunner(4)).start();
- new Thread(new SemaphoreRunner(5)).start();
- new Thread(new SemaphoreRunner(6)).start();
- new Thread(new SemaphoreRunner(7)).start();
- new Thread(new SemaphoreRunner(8)).start();
- new Thread(new SemaphoreRunner(9)).start();
- new Thread(new SemaphoreRunner(10)).start();
- Thread.sleep(1000000L);
- }
- }
- public class SemaphoreRunner implements Runnable {
- private Logger logger = Logger.getLogger(getClass());
- private int flag;
- public SemaphoreRunner(int flag) {
- this.flag = flag;
- }
- @Override
- public void run() {
- Semaphore semaphore = new Semaphore(new ConsulClient(), 3, "mg-init");
- try {
- if (semaphore.acquired(true)) {
- // 獲取到信號量,執(zhí)行業(yè)務(wù)邏輯
- logger.info("Thread " + flag + " start!");
- Thread.sleep(new Random().nextInt(10000));
- logger.info("Thread " + flag + " end!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- // 信號量釋放、Session鎖釋放、Session刪除
- semaphore.release();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
從測試結(jié)果,我們可以發(fā)現(xiàn)當信號量持有者數(shù)量達到信號量上限3的時候,其他競爭者就開始進行等待了,只有當某個持有者釋放信號量之后,才會有新的線程變成持有者,從而開始執(zhí)行自己的業(yè)務(wù)邏輯。所以,分布式信號量可以幫助我們有效的控制同時操作某個共享資源的并發(fā)數(shù)。
優(yōu)化建議
同前文一樣,這里只是做了簡單的實現(xiàn)。線上應(yīng)用還必須加入TTL的session清理以及對.lock資源中的無效holder進行清理的機制。
參考文檔:https://www.consul.io/docs/guides/semaphore.html
實現(xiàn)代碼
- GitHub:https://github.com/dyc87112/consul-distributed-lock
- 開源中國:http://git.oschina.net/didispace/consul-distributed-lock
【本文為51CTO專欄作者“翟永超”的原創(chuàng)稿件,轉(zhuǎn)載請通過51CTO聯(lián)系作者獲取授權(quán)】