自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

實(shí)現(xiàn)一個(gè)Redis分布式鎖

運(yùn)維 數(shù)據(jù)庫(kù)運(yùn)維 分布式 Redis
在我們?nèi)粘i_發(fā)中,難免會(huì)遇到要加鎖的情景。例如扣除產(chǎn)品庫(kù)存,首先要從數(shù)據(jù)庫(kù)中取出庫(kù)存,進(jìn)行庫(kù)存判斷,再減去庫(kù)存。這一波操作明顯不符合原子性,如果代碼塊不加鎖,很容易因?yàn)椴l(fā)導(dǎo)致超賣問題。

[[335642]]

前言

在我們?nèi)粘i_發(fā)中,難免會(huì)遇到要加鎖的情景。例如扣除產(chǎn)品庫(kù)存,首先要從數(shù)據(jù)庫(kù)中取出庫(kù)存,進(jìn)行庫(kù)存判斷,再減去庫(kù)存。這一波操作明顯不符合原子性,如果代碼塊不加鎖,很容易因?yàn)椴l(fā)導(dǎo)致超賣問題。咱們的系統(tǒng)如果是單體架構(gòu),那我們使用本地鎖就可以解決問題。如果是分布式架構(gòu),就需要使用分布式鎖。

方案

使用 SETNX 和 EXPIRE 命令 

  1. SETNX key value  
  2. EXPIRE key seconds  
  3. DEL key  
  4. if (setnx("item_1_lock", 1)) {  
  5.     expire("item_1_lock", 30);  
  6.     try {  
  7.         ... 邏輯  
  8.     } catch {  
  9.         ...  
  10.     } finally {  
  11.         del("item_1_lock");  
  12.     }  

這種方法看起來可以解決問題,但是有一定的風(fēng)險(xiǎn),因?yàn)?SETNX 和 EXPIRE 這波操作是非原子性的,如果 SETNX 成功之后,出現(xiàn)錯(cuò)誤,導(dǎo)致 EXPIRE 沒有執(zhí)行,導(dǎo)致鎖沒有設(shè)置超時(shí)時(shí)間形成死鎖。

針對(duì)這種情況,我們可以使用 lua 腳本來保持操作原子性,保證 SETNX 和 EXPIRE 兩個(gè)操作要么都成功,要么都不成功。 

  1. if (redis.call('setnx', KEYS[1], ARGV[1]) < 1 
  2. then return 0;  
  3. end;  
  4. redis.call('expire', KEYS[1], tonumber(ARGV[2]));  
  5. return 1; 

通過這樣的方法,我們初步解決了競(jìng)爭(zhēng)鎖的原子性問題,雖然其他功能還未實(shí)現(xiàn),但是應(yīng)該不會(huì)造成死鎖🤪🤪🤪。

Redis 2.6.12 以上可靈活使用 SET 命令 

  1. SET key value NX EX 30  
  2. DEL key  
  3. if (set("item_1_lock", 1, "NX", "EX", 30)) {  
  4.     try {  
  5.         ... 邏輯  
  6.     } catch {  
  7.         ...  
  8.     } finally {  
  9.         del("item_1_lock");  
  10.     }  

改進(jìn)后的方法不需要借助 lua 腳本就解決了 SETNX 和 EXPIRE 的原子性問題?,F(xiàn)在我們?cè)僮屑?xì)琢磨琢磨,如果 A 拿到了鎖順利進(jìn)入代碼塊執(zhí)行邏輯,但是由于各種原因?qū)е鲁瑫r(shí)自動(dòng)釋放鎖。在這之后 B 成功拿到了鎖進(jìn)入代碼塊執(zhí)行邏輯,但此時(shí)如果 A 執(zhí)行邏輯完畢再來釋放鎖,就會(huì)把 B 剛獲得的鎖釋放了。就好比用自己家的鑰匙開了別家的門,這是不可接受的。

為了解決這個(gè)問題我們可以嘗試在 SET 的時(shí)候設(shè)置一個(gè)鎖標(biāo)識(shí),然后在 DEL 的時(shí)候驗(yàn)證當(dāng)前鎖是否為自己的鎖。 

  1. String value = UUID.randomUUID().toString().replaceAll("-", "");  
  2. if (set("item_1_lock", value, "NX", "EX", 30)) {  
  3.     try {  
  4.         ... 邏輯  
  5.     } catch {  
  6.         ...  
  7.     } finally {  
  8.         ... lua 腳本保證原子性  
  9.     }  
  10.  
  11. if (redis.call('get', KEYS[1]) == ARGV[1])  
  12. then return redis.call('del', KEYS[1])  
  13. else return 0  
  14. end 

到這里,我們終于解決了競(jìng)爭(zhēng)鎖的原子性問題和誤刪鎖問題。但是鎖一般還需要支持可重入、循環(huán)等待和超時(shí)自動(dòng)續(xù)約等功能點(diǎn)。下面我們學(xué)習(xí)使用一個(gè)非常好用的包來解決這些問題。

入門 Redisson

Redission 的鎖,實(shí)現(xiàn)了可重入和超時(shí)自動(dòng)續(xù)約功能,它都幫我們封裝好了,我們只要按照自己的需求調(diào)用它的 API 就可以輕松實(shí)現(xiàn)上面所提到的幾個(gè)功能點(diǎn)。詳細(xì)功能可以查看 Redisson 文檔

在項(xiàng)目中安裝 Redisson 

  1. <dependency>  
  2.     <groupId>org.redisson</groupId>  
  3.     <artifactId>redisson</artifactId>  
  4.     <version>3.13.2</version>  
  5. </dependency>  
  1. implementation 'org.redisson:redisson:3.13.2' 

用 Maven 或者 Gradle 構(gòu)建,目前最新版本為 3.13.2,也可以在這里 Redisson 找到你需要的版本。

簡(jiǎn)單嘗試 

  1. RedissonClient redissonClient = Redisson.create();  
  2. RLock lock = redissonClient.getLock("lock");  
  3. boolean res = lock.lock();  
  4. if (res) {  
  5.    try {  
  6.      ... 邏輯  
  7.    } finally {  
  8.        lock.unlock();  
  9.    }  

Redisson 將底層邏輯全部做了一個(gè)封裝 📦,我們無需關(guān)心具體實(shí)現(xiàn),幾行代碼就能使用一把完美的鎖。下面我們簡(jiǎn)單折騰折騰源碼 🤔🤔🤔。

加鎖 

  1. private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {  
  2.     long threadId = Thread.currentThread().getId();  
  3.     Long ttl = tryAcquire(leaseTime, unit, threadId);  
  4.     if (ttl == null) { 
  5.          return;  
  6.     }  
  7.     RFuture<RedissonLockEntry> future = subscribe(threadId);  
  8.     if (interruptibly) {  
  9.         commandExecutor.syncSubscriptionInterrupted(future);  
  10.     } else {  
  11.         commandExecutor.syncSubscription(future);  
  12.     }  
  13.     try {  
  14.         while (true) {  
  15.             ttl = tryAcquire(leaseTime, unit, threadId);  
  16.             if (ttl == null) {  
  17.                 break;  
  18.             }  
  19.             if (ttl >= 0) {  
  20.                 try {  
  21.                     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
  22.                 } catch (InterruptedException e) {  
  23.                     if (interruptibly) {  
  24.                         throw e;  
  25.                     }  
  26.                     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
  27.                 }  
  28.             } else {  
  29.                 if (interruptibly) {  
  30.                     future.getNow().getLatch().acquire();  
  31.                 } else {  
  32.                     future.getNow().getLatch().acquireUninterruptibly();  
  33.                 }  
  34.             } 
  35.         }  
  36.     } finally {  
  37.         unsubscribe(future, threadId);  
  38.     }  

獲取鎖 

  1. private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {  
  2.     if (leaseTime != -1) {  
  3.         return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);  
  4.     }  
  5.     RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); 
  6.      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {  
  7.         if (e != null) {  
  8.             return;  
  9.         }  
  10.         if (ttlRemaining == null) {  
  11.             scheduleExpirationRenewal(threadId);  
  12.         }  
  13.     });  
  14.     return ttlRemainingFuture;  
  15.  
  16. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {  
  17.     internalLockLeaseTime = unit.toMillis(leaseTime);  
  18.     return evalWriteAsync(getName(), LongCodec.INSTANCE, command,  
  19.             "if (redis.call('exists', KEYS[1]) == 0) then " +  
  20.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  
  21.                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
  22.                     "return nil; " +  
  23.                     "end; " +  
  24.                     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  
  25.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  
  26.                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
  27.                     "return nil; " +  
  28.                     "end; " +  
  29.                     "return redis.call('pttl', KEYS[1]);",  
  30.             Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));  

刪除鎖 

  1. public RFuture<Void> unlockAsync(long threadId) {  
  2.     RPromise<Void> result = new RedissonPromise<Void>();  
  3.     RFuture<Boolean> future = unlockInnerAsync(threadId);  
  4.     future.onComplete((opStatus, e) -> {  
  5.         cancelExpirationRenewal(threadId);  
  6.         if (e != null) { 
  7.              result.tryFailure(e);  
  8.             return;  
  9.         }  
  10.         if (opStatus == null) { 
  11.              IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "  
  12.                     + id + " thread-id: " + threadId);  
  13.             result.tryFailure(cause);  
  14.             return; 
  15.          }  
  16.         result.trySuccess(null);  
  17.     });  
  18.     return result;  
  19.  
  20. protected RFuture<Boolean> unlockInnerAsync(long threadId) {  
  21.     return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
  22.             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  
  23.                     "return nil;" +  
  24.                     "end; " +  
  25.                     "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  
  26.                     "if (counter > 0) then " +  
  27.                     "redis.call('pexpire', KEYS[1], ARGV[2]); " +  
  28.                     "return 0; " +  
  29.                     "else " +  
  30.                     "redis.call('del', KEYS[1]); " +  
  31.                     "redis.call('publish', KEYS[2], ARGV[1]); " +  
  32.                     "return 1; " +  
  33.                     "end; " +  
  34.                     "return nil;",  
  35.             Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));  

總結(jié)

使用 Redis 做分布式鎖來解決并發(fā)問題仍存在一些困難,也有很多需要注意的點(diǎn),我們應(yīng)該正確評(píng)估系統(tǒng)的體量,不能為了使用某項(xiàng)技術(shù)而用。要完全解決并發(fā)問題,仍需要在數(shù)據(jù)庫(kù)層面做功夫。 

 

責(zé)任編輯:龐桂玉 來源: 馬哥Linux運(yùn)維
相關(guān)推薦

2021-11-01 12:25:56

Redis分布式

2024-02-19 00:00:00

Redis分布式

2024-05-08 10:20:00

Redis分布式

2024-07-15 08:25:07

2022-04-14 07:56:30

公平鎖Java線程

2022-09-29 08:28:57

SpringRedis分布式

2022-09-22 13:28:34

Redis分布式鎖

2023-08-21 19:10:34

Redis分布式

2022-01-06 10:58:07

Redis數(shù)據(jù)分布式鎖

2019-02-26 09:51:52

分布式鎖RedisZookeeper

2024-04-01 05:10:00

Redis數(shù)據(jù)庫(kù)分布式鎖

2024-10-07 10:07:31

2022-11-11 08:19:03

redis分布式

2019-06-19 15:40:06

分布式鎖RedisJava

2023-03-06 08:14:48

MySQLRedis場(chǎng)景

2023-03-01 08:07:51

2020-07-15 16:50:57

Spring BootRedisJava

2023-01-13 07:39:07

2023-09-04 08:45:07

分布式配置中心Zookeeper

2023-09-21 22:22:51

開發(fā)分布式鎖
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)