實(shí)現(xiàn)一個(gè)Redis分布式鎖
前言
在我們?nèi)粘i_發(fā)中,難免會(huì)遇到要加鎖的情景。例如扣除產(chǎn)品庫(kù)存,首先要從數(shù)據(jù)庫(kù)中取出庫(kù)存,進(jìn)行庫(kù)存判斷,再減去庫(kù)存。這一波操作明顯不符合原子性,如果代碼塊不加鎖,很容易因?yàn)椴l(fā)導(dǎo)致超賣問題。咱們的系統(tǒng)如果是單體架構(gòu),那我們使用本地鎖就可以解決問題。如果是分布式架構(gòu),就需要使用分布式鎖。
方案
使用 SETNX 和 EXPIRE 命令
- SETNX key value
- EXPIRE key seconds
- DEL key
- if (setnx("item_1_lock", 1)) {
- expire("item_1_lock", 30);
- try {
- ... 邏輯
- } catch {
- ...
- } finally {
- del("item_1_lock");
- }
- }
這種方法看起來可以解決問題,但是有一定的風(fēng)險(xiǎn),因?yàn)?SETNX 和 EXPIRE 這波操作是非原子性的,如果 SETNX 成功之后,出現(xiàn)錯(cuò)誤,導(dǎo)致 EXPIRE 沒有執(zhí)行,導(dǎo)致鎖沒有設(shè)置超時(shí)時(shí)間形成死鎖。
針對(duì)這種情況,我們可以使用 lua 腳本來保持操作原子性,保證 SETNX 和 EXPIRE 兩個(gè)操作要么都成功,要么都不成功。
- if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
- then return 0;
- end;
- redis.call('expire', KEYS[1], tonumber(ARGV[2]));
- return 1;
通過這樣的方法,我們初步解決了競(jìng)爭(zhēng)鎖的原子性問題,雖然其他功能還未實(shí)現(xiàn),但是應(yīng)該不會(huì)造成死鎖🤪🤪🤪。
Redis 2.6.12 以上可靈活使用 SET 命令
- SET key value NX EX 30
- DEL key
- if (set("item_1_lock", 1, "NX", "EX", 30)) {
- try {
- ... 邏輯
- } catch {
- ...
- } finally {
- del("item_1_lock");
- }
- }
改進(jìn)后的方法不需要借助 lua 腳本就解決了 SETNX 和 EXPIRE 的原子性問題?,F(xiàn)在我們?cè)僮屑?xì)琢磨琢磨,如果 A 拿到了鎖順利進(jìn)入代碼塊執(zhí)行邏輯,但是由于各種原因?qū)е鲁瑫r(shí)自動(dòng)釋放鎖。在這之后 B 成功拿到了鎖進(jìn)入代碼塊執(zhí)行邏輯,但此時(shí)如果 A 執(zhí)行邏輯完畢再來釋放鎖,就會(huì)把 B 剛獲得的鎖釋放了。就好比用自己家的鑰匙開了別家的門,這是不可接受的。
為了解決這個(gè)問題我們可以嘗試在 SET 的時(shí)候設(shè)置一個(gè)鎖標(biāo)識(shí),然后在 DEL 的時(shí)候驗(yàn)證當(dāng)前鎖是否為自己的鎖。
- String value = UUID.randomUUID().toString().replaceAll("-", "");
- if (set("item_1_lock", value, "NX", "EX", 30)) {
- try {
- ... 邏輯
- } catch {
- ...
- } finally {
- ... lua 腳本保證原子性
- }
- }
- if (redis.call('get', KEYS[1]) == ARGV[1])
- then return redis.call('del', KEYS[1])
- else return 0
- end
到這里,我們終于解決了競(jìng)爭(zhēng)鎖的原子性問題和誤刪鎖問題。但是鎖一般還需要支持可重入、循環(huán)等待和超時(shí)自動(dòng)續(xù)約等功能點(diǎn)。下面我們學(xué)習(xí)使用一個(gè)非常好用的包來解決這些問題。
入門 Redisson
Redission 的鎖,實(shí)現(xiàn)了可重入和超時(shí)自動(dòng)續(xù)約功能,它都幫我們封裝好了,我們只要按照自己的需求調(diào)用它的 API 就可以輕松實(shí)現(xiàn)上面所提到的幾個(gè)功能點(diǎn)。詳細(xì)功能可以查看 Redisson 文檔
在項(xiàng)目中安裝 Redisson
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.13.2</version>
- </dependency>
- implementation 'org.redisson:redisson:3.13.2'
用 Maven 或者 Gradle 構(gòu)建,目前最新版本為 3.13.2,也可以在這里 Redisson 找到你需要的版本。
簡(jiǎn)單嘗試
- RedissonClient redissonClient = Redisson.create();
- RLock lock = redissonClient.getLock("lock");
- boolean res = lock.lock();
- if (res) {
- try {
- ... 邏輯
- } finally {
- lock.unlock();
- }
- }
Redisson 將底層邏輯全部做了一個(gè)封裝 📦,我們無需關(guān)心具體實(shí)現(xiàn),幾行代碼就能使用一把完美的鎖。下面我們簡(jiǎn)單折騰折騰源碼 🤔🤔🤔。
加鎖
- private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
- long threadId = Thread.currentThread().getId();
- Long ttl = tryAcquire(leaseTime, unit, threadId);
- if (ttl == null) {
- return;
- }
- RFuture<RedissonLockEntry> future = subscribe(threadId);
- if (interruptibly) {
- commandExecutor.syncSubscriptionInterrupted(future);
- } else {
- commandExecutor.syncSubscription(future);
- }
- try {
- while (true) {
- ttl = tryAcquire(leaseTime, unit, threadId);
- if (ttl == null) {
- break;
- }
- if (ttl >= 0) {
- try {
- future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- if (interruptibly) {
- throw e;
- }
- future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- }
- } else {
- if (interruptibly) {
- future.getNow().getLatch().acquire();
- } else {
- future.getNow().getLatch().acquireUninterruptibly();
- }
- }
- }
- } finally {
- unsubscribe(future, threadId);
- }
- }
獲取鎖
- private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
- if (leaseTime != -1) {
- return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
- }
- RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
- ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
- if (e != null) {
- return;
- }
- if (ttlRemaining == null) {
- scheduleExpirationRenewal(threadId);
- }
- });
- return ttlRemainingFuture;
- }
- <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- internalLockLeaseTime = unit.toMillis(leaseTime);
- return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
- }
刪除鎖
- public RFuture<Void> unlockAsync(long threadId) {
- RPromise<Void> result = new RedissonPromise<Void>();
- RFuture<Boolean> future = unlockInnerAsync(threadId);
- future.onComplete((opStatus, e) -> {
- cancelExpirationRenewal(threadId);
- if (e != null) {
- result.tryFailure(e);
- return;
- }
- if (opStatus == null) {
- IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
- + id + " thread-id: " + threadId);
- result.tryFailure(cause);
- return;
- }
- result.trySuccess(null);
- });
- return result;
- }
- protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
- "return nil;" +
- "end; " +
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
- "if (counter > 0) then " +
- "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- "return 0; " +
- "else " +
- "redis.call('del', KEYS[1]); " +
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "return 1; " +
- "end; " +
- "return nil;",
- Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
- }
總結(jié)
使用 Redis 做分布式鎖來解決并發(fā)問題仍存在一些困難,也有很多需要注意的點(diǎn),我們應(yīng)該正確評(píng)估系統(tǒng)的體量,不能為了使用某項(xiàng)技術(shù)而用。要完全解決并發(fā)問題,仍需要在數(shù)據(jù)庫(kù)層面做功夫。