自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Redisson雜談,你學(xué)到了什么?

開發(fā) 前端
本文主要簡單講述了在 Redisson 中分布式鎖的實(shí)現(xiàn)。其實(shí)在 Redisson 中還有很多值得深挖的點(diǎn)。比如:Redisson 中使用了大量 Netty 的特性。大家有興趣的話,可以仔細(xì)研究一下。

一.Redisson 簡介

Redisson 是一個基于 Netty 通信框架的高性能 Redis 客戶端, 實(shí)現(xiàn)了分布式和可擴(kuò)展的 Java 數(shù)據(jù)結(jié)構(gòu),提供很多分布式相關(guān)操作服務(wù)以及大量便利的工具方法,讓開發(fā)者可以把精力放在開發(fā)業(yè)務(wù),避免重復(fù)造輪子。

二.Redisson 優(yōu)點(diǎn)

1.通信框架基于 Netty,使用多路復(fù)用。吞吐量高。

2.兼容支持 Redis 集群模式,Reids 哨兵模式等,天然適配分布式服務(wù)。

3.提供多種分布式對象的封裝,如:Bloom Filter,Object Bucket,Bitset,AtomicLong, 和 HyperLogLog 等。

4.提供分布式鎖實(shí)現(xiàn)包括:

RedissonFairLock 公平鎖,

RedissonLock 非公平鎖,

RedissonRedLock 紅鎖(基于紅鎖算法, 當(dāng)集群中大多數(shù)( N/2 + 1 )加鎖成功了,則認(rèn)為加鎖成功,

目前已被棄用,Redisson 官方不再建議使用)。

三.RedissonLock 分布式鎖相關(guān)部分源碼解析

RedissonLock 作為分布式鎖,實(shí)現(xiàn)了可重入鎖。阻塞鎖,非阻塞鎖。并且 Redisson 存在看門狗機(jī)制,可以對未手動設(shè)置超時時間的鎖實(shí)現(xiàn)自動續(xù)期。

1.Trylock 加鎖

加鎖代碼邏輯

/**
*
* @param waitTime 獲取鎖的最大等待時間,默認(rèn) -1,
* @param leaseTime 鎖的過期時間,默認(rèn) -1
* @param unit
* @param threadId
* @return
*/
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  RFuture<Boolean> acquiredFuture;
  if (leaseTime > 0) {
    //若手動設(shè)置了鎖的過期時間,則加鎖時以當(dāng)前傳入過期時間為準(zhǔn)
    //執(zhí)行Lua腳本,加鎖
    acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, 
    threadId,RedisCommands.EVAL_NULL_BOOLEAN);                                                 
  } else {
    //若未手動設(shè)置,則默認(rèn)過期時間等于配置的lockWatchdogTimeout,lockWatchdogTimeout默認(rèn)為30s。
    //然后執(zhí)行Lua腳本,加鎖
    acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  }

  CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
  //lock acquired
  //若鎖成功獲取到
  if (acquired) {
    if (leaseTime > 0) {
      internalLockLeaseTime = unit.toMillis(leaseTime);
      } else {
      //若未手動設(shè)置過期時間,則執(zhí)行看門狗任務(wù),自動續(xù)期
      scheduleExpirationRenewal(threadId);
    }
  }
  return acquired;
  });
  return new CompletableFutureWrapper<>(f);
}

加鎖 Lua 腳本如下:

if (redis.call('exists', KEYS[1]) == 0) then " +
  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
  "end; " +
  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
  "end; " +
  "return redis.call('pttl', KEYS[1]);

其中 KEYS[1] 是鎖邏輯名稱,ARGV[1] 是 key 的過期時間,ARGV[2]是鎖的線程級別名稱( uuid + 線程id ,uuid 是每個 Redisson 客戶端創(chuàng)建時唯一生成的)。

由此可看出,鎖利用 Hash 結(jié)構(gòu)實(shí)現(xiàn),其中 Hash 的 key 是鎖的邏輯名稱,field 是鎖的線程級別名稱,value 是鎖的重入次數(shù)。

加鎖 Lua 腳本的含義:

先判斷當(dāng)前邏輯鎖名稱的 key 是否存在,

若不存在,在 Hash 結(jié)構(gòu)中設(shè)置這個鎖,鎖重入次數(shù)加 1,然后給 key 設(shè)置一個過期時間,最后返回 null。

若存在,并且已經(jīng)被當(dāng)前線程持有,就鎖可重入次數(shù)加 1,并且重新設(shè)置 key 的過期時間,最后返回 null,

若當(dāng)前鎖被其他線程持有,返回 key 剩余過期時間。

2.Lock 阻塞鎖

Lock 阻塞鎖與 Trylock 底層調(diào)用代碼基本一致。多了一個等待鎖被其他線程釋放后,重新嘗試加鎖的過程。

代碼如下:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
    return;
  }
  //訂閱釋放鎖消息
  CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
  pubSub.timeout(future);
  RedissonLockEntry entry;
  if (interruptibly) {
    entry = commandExecutor.getInterrupted(future);
  } else {
    entry = commandExecutor.get(future);
  }

  try {
    while (true) {
      //重新嘗試取鎖
      ttl = tryAcquire(-1, leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        break;
      }
      // waiting for message,
      if (ttl >= 0) {
        try {
          //當(dāng)鎖仍然被其他線程占有時,調(diào)用
          //java.util.concurrent.Semaphore#tryAcquire方法進(jìn)行信號量阻塞,
          //當(dāng)線程阻塞等待時間超過最大超時時間(ttl即鎖的key的剩余存活時間)
          //或者 監(jiān)聽到鎖釋放消息后,信號量被釋放后,線程不再阻塞
          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          if (interruptibly) {
            throw e;
          }
          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
      } else {
        if (interruptibly) {
          //嘗試從信號量獲取一個許可
          entry.getLatch().acquire();
        } else {
          entry.getLatch().acquireUninterruptibly();
        }
      }
    }
  } finally {
  //取消訂閱鎖釋放消息
  unsubscribe(entry, threadId);
}

大致流程如下:

1.先獲取鎖,若獲取鎖成功,直接返回。

2.若獲取失敗,訂閱釋放鎖消息。

3.進(jìn)入 while 循環(huán),重新嘗試獲取鎖。若獲取鎖成功,則跳出循環(huán),并不再訂閱釋放鎖消息。

4.若重新獲取鎖失敗,進(jìn)行信號量阻塞,直到鎖被其他占有線程釋放(監(jiān)聽鎖釋放消息的監(jiān)聽器中,有喚醒信號量的邏輯)或者到達(dá)阻塞超時時間,然后繼續(xù)這個 while 循環(huán)。

3.Unlock 解鎖

代碼如下

public RFuture<Void> unlockAsync(long threadId) {
  //執(zhí)行解鎖lua腳本
  RFuture<Boolean> future = unlockInnerAsync(threadId);

  CompletionStage<Void> f = future.handle((opStatus, e) -> {
    //取消看門狗任務(wù)
    cancelExpirationRenewal(threadId);

    if (e != null) {
      throw new CompletionException(e);
    }
    if (opStatus == null) {
      IllegalMonitorStateException cause = new IllegalMonitorStateException
      ("attempt to unlock lock, not locked by current thread by node id: "
      + id + " thread-id: " + threadId);
      throw new CompletionException(cause);
    }
    return null;
  });

  return new CompletableFutureWrapper<>(f);
}

1.其中解鎖 Lua 腳本如下:

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  "return nil;" +
  "end; " +
  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  "if (counter > 0) then " +
  "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  "return 0; " +
  "else " +
  "redis.call('del', KEYS[1]); " +
  "redis.call('publish', KEYS[2], ARGV[1]); " +
  "return 1; " +
  "end; " +
  "return nil;

其中 KEYS[1] 為鎖的邏輯名稱,KEYS[2] 為通道名稱,ARGV[1] 為 0, ARGV[2] 為鎖的過期時間,默認(rèn) 30s,ARGV[3] 為鎖的線程級別名稱。

解鎖 Lua 腳本含義:

解鎖時,先判斷當(dāng)前鎖是否被當(dāng)前線程持有,

若不是,則返回 null。

若是,鎖的可重入次數(shù) 減1。

然后繼續(xù)判斷鎖的可重入次數(shù)是否大于 0,若大于 0,繼續(xù)給這個鎖 key 續(xù)期 30s,并且最后返回 0。

若不大于 0,刪除這個鎖的 key,并向指定通道發(fā)布這個解鎖消息,并且返回 1。

2.如果這個鎖有看門狗任務(wù)在定時續(xù)期,當(dāng)解鎖成功時會取消這個定時續(xù)期任務(wù)。

4.看門狗機(jī)制

當(dāng)某個鎖內(nèi)的任務(wù)的執(zhí)行時間不可預(yù)估時,可能執(zhí)行時間很長,也可能很短。此時若直接設(shè)置一個固定的鎖過期時間,可能會導(dǎo)致任務(wù)執(zhí)行時間遠(yuǎn)遠(yuǎn)大于鎖的過期時間,導(dǎo)致任務(wù)還未執(zhí)行完成,但是鎖已經(jīng)過期了。那其他線程又可以獲取到鎖,然后執(zhí)行該任務(wù)了,最終導(dǎo)致線程安全問題。

為應(yīng)對這種情況,定期給鎖續(xù)期的看門狗機(jī)制出現(xiàn)了。

代碼:

//真正看門狗續(xù)期任務(wù)
private void renewExpiration() {
  ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  if (ee == null) {
    return;
  }
  //創(chuàng)建一個延時任務(wù),底層實(shí)現(xiàn)是netty時間輪。當(dāng)每過了lockWatchdogTimeout/3的時間,執(zhí)行該任務(wù)
  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
      ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ent == null) {
          return;
        }
        Long threadId = ent.getFirstThreadId();
        //若當(dāng)前鎖已經(jīng)被當(dāng)前線程釋放,則鎖不再續(xù)期
        if (threadId == null) {
          return;
        }
        //調(diào)用Lua腳本,判斷當(dāng)前鎖是否被當(dāng)前線程占有,若是則返回true,
        //并且重新設(shè)置key的過期時間,默認(rèn)30s
        CompletionStage<Boolean> future = renewExpirationAsync(threadId);
        future.whenComplete((res, e) -> {
          if (e != null) {
            log.error("Can't update lock " + getRawName() + " expiration", e);
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
            return;
            }
            //當(dāng)鎖仍然被當(dāng)前線程占有,說明業(yè)務(wù)代碼還在執(zhí)行,則遞歸調(diào)用續(xù)期任務(wù)
            if (res) {
              // reschedule itself
              log.info("續(xù)期任務(wù)執(zhí)行"+ "threadId:" +threadId);
              renewExpiration();
            } else {
              //否則移除該續(xù)期任務(wù),直接在EXPIRATION_RENEWAL_MAP移除ExpirationEntry
              cancelExpirationRenewal(null);
            }
        });
    }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  
  ee.setTimeout(task);
}

當(dāng)沒有顯式指定鎖過期時間時候,就默認(rèn) key 過期時間 30s,然后定時任務(wù)每 10 秒( lockWatchdogTimeout/3 )進(jìn)行一次調(diào)用,執(zhí)行鎖續(xù)期動作,若這個線程還持有這個鎖,就對這個線程持有的鎖進(jìn)行續(xù)期操作(通過 pexpire 續(xù)期 key 30s),若途中持有鎖的線程 手動被 unlock 或者機(jī)器宕機(jī)才會取消這個任務(wù)。否則會一直續(xù)期。

四.總結(jié)

Redisson 作為一個 Redis 客戶端,基于 Redis、Lua 和 Netty 建立起了一套完善的分布式解決方案,比如分布式鎖的實(shí)現(xiàn),分布式對象的操作等。本文主要簡單講述了在 Redisson 中分布式鎖的實(shí)現(xiàn)。其實(shí)在 Redisson 中還有很多值得深挖的點(diǎn)。比如:Redisson 中使用了大量 Netty 的特性。大家有興趣的話,可以仔細(xì)研究一下。

五.參考文章

https://github.com/redisson/redisson/wiki

https://cloud.tencent.com/developer/article/1500854

責(zé)任編輯:武曉燕 來源: 政采云技術(shù)
相關(guān)推薦

2023-04-10 07:40:36

GraphQLRest通信模式

2022-07-19 08:04:04

HTTP應(yīng)用層協(xié)議

2024-11-13 09:22:40

2023-06-03 00:05:18

TypeScriptJSDoc掃描器

2024-04-12 08:54:13

從庫數(shù)據(jù)庫應(yīng)用

2024-07-31 09:28:56

2024-10-18 11:48:00

2024-08-12 15:44:06

2025-02-28 00:03:00

2023-06-06 08:14:18

核心Docker應(yīng)用程序

2023-04-26 22:52:19

視覺人臉檢測人臉對齊

2021-03-09 09:55:02

Vuejs前端代碼

2021-09-03 06:46:34

MyBatis緩存后端

2023-04-26 01:25:05

案例故障模型

2021-12-26 18:30:56

嵌入式ARM鏈接

2023-06-30 07:30:38

2023-05-19 07:31:48

2021-07-29 18:46:52

可視化類型圖形化

2021-07-28 07:01:09

薅羊毛架構(gòu)Vue+SSR

2015-09-06 16:03:57

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號