Java 從零實現(xiàn)屬于你的 Redis 分布式鎖
redis分布式鎖
為什么需要分布式鎖
在 jdk 中為我們提供了加鎖的方式:
(1)synchronized 關(guān)鍵字
(2)volatile + CAS 實現(xiàn)的樂觀鎖
(3)ReadWriteLock 讀寫鎖
(4)ReenTrantLock 可重入鎖
等等,這些鎖為我們變成提供極大的便利性,保證在多線程的情況下,保證線程安全。
但是在分布式系統(tǒng)中,上面的鎖就統(tǒng)統(tǒng)沒用了。
我們想要解決分布式系統(tǒng)中的并發(fā)問題,就需要引入分布式鎖的概念。
java 代碼實現(xiàn)
創(chuàng)作動機
首先是對鎖實現(xiàn)原理的一個實現(xiàn),理論指導(dǎo)實踐,實踐完善理論。
晚上關(guān)于 redis 分布式鎖的文章一大堆,但是也都稂莠不齊。
redis 分布式鎖工具有時候中間件團隊不見得會提供,提供了也不見得經(jīng)常維護,不如自己實現(xiàn)一個,知道原理,也方便修改。
接口定義
為了便于和 JDK 復(fù)用,我們讓接口繼承自 jdk 的 Lock 接口。
- package com.github.houbb.lock.api.core;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Lock;
- /**
- * 鎖定義
- * @author binbin.hou
- * @since 0.0.1
- */
- public interface ILock extends Lock {
- /**
- * 嘗試加鎖
- * @param time 時間
- * @param unit 當(dāng)為
- * @param key key
- * @return 返回
- * @throws InterruptedException 異常
- * @since 0.0.1
- */
- boolean tryLock(long time, TimeUnit unit,
- String key) throws InterruptedException;
- /**
- * 嘗試加鎖
- * @param key key
- * @return 返回
- * @since 0.0.1
- */
- boolean tryLock(String key);
- /**
- * 解鎖
- * @param key key
- * @since 0.0.1
- */
- void unlock(String key);
- }
方法我們只添加了三個比較常用的核心方法,作為第一個版本,簡單點。
后續(xù)陸續(xù)添加即可。
抽象實現(xiàn)
為了便于后期添加更多的所實現(xiàn),這里首先實現(xiàn)了一個公用的抽象父類。
- package com.github.houbb.lock.redis.core;
- import com.github.houbb.lock.api.core.ILock;
- import com.github.houbb.lock.redis.constant.LockRedisConst;
- import com.github.houbb.wait.api.IWait;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- /**
- * 抽象實現(xiàn)
- * @author binbin.hou
- * @since 0.0.1
- */
- public abstract class AbstractLockRedis implements ILock {
- /**
- * 鎖等待
- * @since 0.0.1
- */
- private final IWait wait;
- protected AbstractLockRedis(IWait wait) {
- this.wait = wait;
- }
- @Override
- public void lock() {
- throw new UnsupportedOperationException();
- }
- @Override
- public void lockInterruptibly() throws InterruptedException {
- throw new UnsupportedOperationException();
- }
- @Override
- public boolean tryLock() {
- return tryLock(LockRedisConst.DEFAULT_KEY);
- }
- @Override
- public void unlock() {
- unlock(LockRedisConst.DEFAULT_KEY);
- }
- @Override
- public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException {
- long startTimeMills = System.currentTimeMillis();
- // 一次獲取,直接成功
- boolean result = this.tryLock(key);
- if(result) {
- return true;
- }
- // 時間判斷
- if(time <= 0) {
- return false;
- }
- long durationMills = unit.toMillis(time);
- long endMills = startTimeMills + durationMills;
- // 循環(huán)等待
- while (System.currentTimeMillis() < endMills) {
- result = tryLock(key);
- if(result) {
- return true;
- }
- // 等待 10ms
- wait.wait(TimeUnit.MILLISECONDS, 10);
- }
- return false;
- }
- @Override
- public synchronized boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return tryLock(time, unit, LockRedisConst.DEFAULT_KEY);
- }
- @Override
- public Condition newCondition() {
- throw new UnsupportedOperationException();
- }
- }
最核心的實際上是 public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException 方法。
這個方法會調(diào)用 this.tryLock(key) 獲取鎖,如果成功,直接返回;如果不成功,則循環(huán)等待。
這里設(shè)置了超時時間,如果超時,則直接返回 true。
redis 鎖實現(xiàn)
我們實現(xiàn)的 redis 分布鎖,繼承自上面的抽象類。
- package com.github.houbb.lock.redis.core;
- import com.github.houbb.heaven.util.lang.StringUtil;
- import com.github.houbb.id.api.Id;
- import com.github.houbb.id.core.util.IdThreadLocalHelper;
- import com.github.houbb.lock.redis.constant.LockRedisConst;
- import com.github.houbb.lock.redis.exception.LockRedisException;
- import com.github.houbb.lock.redis.support.operator.IOperator;
- import com.github.houbb.wait.api.IWait;
- /**
- * 這里是基于 redis 實現(xiàn)
- *
- * 實際上也可以基于 zk/數(shù)據(jù)庫等實現(xiàn)。
- *
- * @author binbin.hou
- * @since 0.0.1
- */
- public class LockRedis extends AbstractLockRedis {
- /**
- * redis 操作實現(xiàn)
- * @since 0.0.1
- */
- private final IOperator redisOperator;
- /**
- * 主鍵標(biāo)識
- * @since 0.0.1
- */
- private final Id id;
- public LockRedis(IWait wait, IOperator redisOperator, Id id) {
- super(wait);
- this.redisOperator = redisOperator;
- this.id = id;
- }
- @Override
- public boolean tryLock(String key) {
- final String requestId = id.id();
- IdThreadLocalHelper.put(requestId);
- return redisOperator.lock(key, requestId, LockRedisConst.DEFAULT_EXPIRE_MILLS);
- }
- @Override
- public void unlock(String key) {
- final String requestId = IdThreadLocalHelper.get();
- if(StringUtil.isEmpty(requestId)) {
- String threadName = Thread.currentThread().getName();
- throw new LockRedisException("Thread " + threadName +" not contains requestId");
- }
- boolean unlock = redisOperator.unlock(key, requestId);
- if(!unlock) {
- throw new LockRedisException("Unlock key " + key + " result is failed!");
- }
- }
- }
這里就是 redis 鎖的核心實現(xiàn)了,如果不太理解,建議回顧一下原理篇:
redis 分布式鎖原理詳解
加鎖
加鎖部分,這里會生成一個 id 標(biāo)識,用于區(qū)分當(dāng)前操作者。
為了安全也設(shè)置了默認(rèn)的超時時間。
當(dāng)然這里是為了簡化調(diào)用者的使用成本,開發(fā)在使用的時候只需要關(guān)心自己要加鎖的 key 即可。
當(dāng)然,甚至連加鎖的 key 都可以進一步抽象掉,比如封裝 @DistributedLock 放在方法上,即可實現(xiàn)分布式鎖。這個后續(xù)有時間可以拓展,原理也不難。
解鎖
解鎖的時候,就會獲取當(dāng)前進程的持有標(biāo)識。
憑借當(dāng)前線程持有的 id 標(biāo)識,去解鎖。
IOperator
我們對 redis 的操作進行了抽象,為什么抽象呢?
因為 redis 服務(wù)種類實際很多,可以是 redis 單點,集群,主從,哨兵。
連接的客戶端也可以很多,jedis,spring redisTemplate, codis, redisson 等等。
這里為了后期拓展方便,就對操作進行了抽象。
接口
定義接口如下:
- package com.github.houbb.lock.redis.support.operator;
- /**
- * Redis 客戶端
- * @author binbin.hou
- * @since 0.0.1
- */
- public interface IOperator {
- /**
- * 嘗試獲取分布式鎖
- *
- * @param lockKey 鎖
- * @param requestId 請求標(biāo)識
- * @param expireTimeMills 超期時間
- * @return 是否獲取成功
- * @since 0.0.1
- */
- boolean lock(String lockKey, String requestId, int expireTimeMills);
- /**
- * 解鎖
- * @param lockKey 鎖 key
- * @param requestId 請求標(biāo)識
- * @return 結(jié)果
- * @since 0.0.1
- */
- boolean unlock(String lockKey, String requestId);
- }
jedis 實現(xiàn)
我們實現(xiàn)一個 jedis 單點版本的:
- package com.github.houbb.lock.redis.support.operator.impl;
- import com.github.houbb.lock.redis.constant.LockRedisConst;
- import com.github.houbb.lock.redis.support.operator.IOperator;
- import redis.clients.jedis.Jedis;
- import java.util.Collections;
- /**
- * Redis 客戶端
- * @author binbin.hou
- * @since 0.0.1
- */
- public class JedisOperator implements IOperator {
- /**
- * jedis 客戶端
- * @since 0.0.1
- */
- private final Jedis jedis;
- public JedisOperator(Jedis jedis) {
- this.jedis = jedis;
- }
- /**
- * 嘗試獲取分布式鎖
- *
- * expireTimeMills 保證當(dāng)前進程掛掉,也能釋放鎖
- *
- * requestId 保證解鎖的是當(dāng)前進程(鎖的持有者)
- *
- * @param lockKey 鎖
- * @param requestId 請求標(biāo)識
- * @param expireTimeMills 超期時間
- * @return 是否獲取成功
- * @since 0.0.1
- */
- @Override
- public boolean lock(String lockKey, String requestId, int expireTimeMills) {
- String result = jedis.set(lockKey, requestId, LockRedisConst.SET_IF_NOT_EXIST, LockRedisConst.SET_WITH_EXPIRE_TIME, expireTimeMills);
- return LockRedisConst.LOCK_SUCCESS.equals(result);
- }
- /**
- * 解鎖
- *
- * (1)使用 requestId,保證為當(dāng)前鎖的持有者
- * (2)使用 lua 腳本,保證執(zhí)行的原子性。
- *
- * @param lockKey 鎖 key
- * @param requestId 請求標(biāo)識
- * @return 結(jié)果
- * @since 0.0.1
- */
- @Override
- public boolean unlock(String lockKey, String requestId) {
- String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
- return LockRedisConst.RELEASE_SUCCESS.equals(result);
- }
- }
這里時最核心的部分。
別看簡單幾行代碼,需要注意的點還是很多的。
加鎖
加鎖時附帶 requestId,用來標(biāo)識自己為鎖的持有者。
SETNX 當(dāng) key 不存在時才進行加鎖。
設(shè)置加鎖的過期時間,避免因異常等原因未釋放鎖,導(dǎo)致鎖的長時間占用。
解鎖
使用 lua 腳本,保證操作的原子性。
為了證明為鎖的持有者,傳入 requestId。
測試驗證
maven 引入
- <dependency>
- <groupId>com.github.houbb</groupId>
- <artifactId>lock-core</artifactId>
- <version>0.0.1</version>
- </dependency>
測試代碼
- Jedis jedis = new Jedis("127.0.0.1", 6379);
- IOperator operator = new JedisOperator(jedis);
- // 獲取鎖
- ILock lock = LockRedisBs.newInstance().operator(operator).lock();
- try {
- boolean lockResult = lock.tryLock();
- System.out.println(lockResult);
- // 業(yè)務(wù)處理
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
小結(jié)
到這里,一個簡單版本的 redis 分布式鎖就實現(xiàn)完成了。
當(dāng)然還有很多可以改進的地方:
(1)比如引入遞增的 sequence,避免分布式鎖中的 GC 導(dǎo)致的問題
(2)對于更多 redis 服務(wù)端+客戶端的支持
(3)對于注解式 redis 分布式鎖的支持