【Redis實(shí)戰(zhàn)五】Redisson鎖機(jī)制源碼分析
1、了解分布式鎖的特性
1、鎖的互斥性
也就是說,在任意時(shí)刻,只能有一個(gè)客戶端能獲取到鎖,不能同時(shí)有兩個(gè)或多個(gè)客戶端獲取到鎖。
簡(jiǎn)單來說,就比如上廁所,一個(gè)廁所只有一個(gè)坑位,只能一個(gè)人上,不能同時(shí)兩個(gè)人或多個(gè)人上。
2、鎖的同一性
也就是說,鎖只能被持有該鎖的客戶端進(jìn)行刪除(釋放鎖),不能由其他客戶端刪除。
簡(jiǎn)單倆說,就是誰加的鎖,就只能誰來解鎖。也就是解鈴還須系鈴人。
3、鎖的可重入性
也就是說,持有某個(gè)鎖的客戶端,可以繼續(xù)對(duì)該鎖進(jìn)行加鎖,實(shí)現(xiàn)鎖的續(xù)租。
簡(jiǎn)單來說,就是你上廁所的按時(shí)間收費(fèi)的,時(shí)間快到了會(huì)按照時(shí)間給你續(xù)租,而會(huì)給你價(jià)錢。
而Redisson則會(huì)增大的你的續(xù)租次數(shù),也就是可重入次數(shù)。但絕不收費(fèi),因?yàn)镽edis是開源的嘛。
4、鎖的容錯(cuò)性
鎖超過了最大續(xù)租時(shí)間后,會(huì)自動(dòng)釋放鎖,其他客戶端會(huì)繼續(xù)獲得該鎖,從而防止死鎖的發(fā)生。
簡(jiǎn)單來說,比如你上個(gè)廁所上了五小時(shí),廁管員覺得不對(duì)勁,就來測(cè)試,發(fā)現(xiàn)你悄悄逃票了,此時(shí)測(cè)試會(huì)自動(dòng)變成解鎖狀態(tài),其他人就可以去上了,只是廁管員血虧5塊大洋。
2、帶著幾個(gè)特性去看Redisson源碼
先回顧一下Redisson加解鎖代碼如何寫的
public TestEntity getById2(Long id){
RLock lock = redissonClient.getLock("demo2_lock");
lock.lock(20, TimeUnit.SECONDS);
index++;
log.info("current index is : {}", index);
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模擬查詢數(shù)據(jù)庫(kù):{}", testEntity);
lock.unlock();
return testEntity;
}
2.1、關(guān)注Redisson.getLock()方法
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name);
}
其實(shí)就是創(chuàng)建一個(gè)RedissonLock對(duì)象, 所以加鎖的邏輯就在RedissonLock.lock()中,解鎖的邏輯就在RedissonLock.unlock()。
2.2、關(guān)注RedissonLock.lock()方法
// RedissonLock.lock()的方法體
public void lock(long leaseTime, TimeUnit unit) {
try {
// 調(diào)用了lock的重載方法
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
關(guān)注lock的重載方法
// leaseTime表示最大續(xù)時(shí)間,unit表示續(xù)約時(shí)間單位,interruptibly表示是否可以中斷
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 獲取當(dāng)前線程的線程ID
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖,結(jié)果為null表示此時(shí)沒有客戶端占用鎖,絕不矯情,直接拿到鎖就返回。
// 結(jié)果ttl>0的話,表示此時(shí)已經(jīng)有了其他不識(shí)好歹的客戶端暫用了鎖,那么就只能絕望的等待了
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 等待時(shí)訂閱一個(gè)渠道,如果鎖被其他客戶端釋放了,會(huì)通過發(fā)布訂閱模式在publish上發(fā)一個(gè)消息,表示鎖已經(jīng)釋放了
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
// 我干等這不是辦法,我還是要不斷去嘗試看能不能獲取鎖
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 如果TTL為空了,表示獲取到了鎖,那還等什么,長(zhǎng)驅(qū)直入就是。
if (ttl == null) {
// 結(jié)束循環(huán)等待
break;
}
// 如果ttl還是大于0的,表示其他客戶端真的是過于不識(shí)好歹,還不肯釋放鎖。但好歹還是說了它還要持有錯(cuò)多久。
if (ttl >= 0) {
try {
// 既然如此,那么我就等待你的時(shí)間到達(dá)吧,除非我突然有啥事被中斷了,否則我就等到你過期
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// 如果傳入了中斷標(biāo)識(shí),直接拋出異常,中斷了,干別的事情去
if (interruptibly) {
throw e;
}
// 否則還是老老實(shí)實(shí)的繼續(xù)等待時(shí)間到來
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
}
// 鎖過期時(shí)間小于0, 表示那個(gè)殺千刀的客戶端居然沒有設(shè)置超時(shí)時(shí)間,它包場(chǎng)了,這可咋整。
else {
// 如果不被中斷,那么我也只有無期限的等待下去了,我不希望這個(gè)期限是一萬年
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 最后,不管如何,我無論如何都要去取消訂閱這個(gè)publish的消息,因?yàn)檫@會(huì)浪費(fèi)我的精力,這已經(jīng)是我最后的堅(jiān)持了。
// 其實(shí)是釋放資源
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
關(guān)注tryAcquire加鎖方法
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
該方法調(diào)用了tryAcquireAsync來實(shí)現(xiàn)的,所以我們關(guān)注tryAcquireAsync方法,繼續(xù)跟進(jìn)。
關(guān)注tryAcquireAsync加鎖方法
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
// 首先判斷租約時(shí)間是否大于0
if (leaseTime > 0) {
// 大于零,調(diào)用tryLockInnerAsync獲取鎖
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 否則,使用默認(rèn)的租約時(shí)間 追溯下去發(fā)現(xiàn)private long lockWatchdogTimeout = 30 * 1000; 也就是30s的租約時(shí)間
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
// 結(jié)果為空,如果leaseTime大于哦,更新internalLockLeaseTime為指定的超時(shí)時(shí)間,并且不會(huì)啟動(dòng)看門狗(watch dog)
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 使用定時(shí)任務(wù),自動(dòng)續(xù)約(使用看門狗(watch dog))
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
可以看到,加鎖最終會(huì)調(diào)用tryLockInnerAsync進(jìn)行加鎖,而續(xù)約會(huì)使用scheduleExpirationRenewal進(jìn)行續(xù)約。
關(guān)注tryLockInnerAsync實(shí)現(xiàn)真正的加鎖邏輯
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
這里執(zhí)行了一段lua腳本(整個(gè)lua腳本保障原子性),我們將腳本內(nèi)容復(fù)制出來,詳細(xì)解釋一下。
-- KEYS[1] 加鎖的對(duì)象(也就是我們傳入的的鎖名稱)
-- ARGV[1] 表示鎖的過期時(shí)間
-- ARGV[2]:UUID+當(dāng)前線程id
-- 如果鎖不存在。 == 0表示不存在 == 1表示存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 對(duì)我自己的鎖執(zhí)行一個(gè)incrby(自增,表示鎖的可重入次數(shù))操作
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 對(duì)key設(shè)置一個(gè)過期時(shí)間(過期時(shí)間就是保證鎖的容錯(cuò)性)
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 返回nil, 相當(dāng)于null, 表示獲取鎖成功
return nil;
end ;
-- 繼續(xù)判斷鎖名成+UUID+當(dāng)前線程id是否存在,其實(shí)就是判斷我自己有沒有已經(jīng)拿到鎖(保證鎖的可重入性)
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 自己已經(jīng)持有鎖,執(zhí)行一個(gè)incrby(自增,表示鎖的可重入次數(shù))操作
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 重新設(shè)置過期時(shí)間
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 返回nil, 相當(dāng)于null, 表示獲取鎖成功
return nil;
end ;
-- 都不是,表示已經(jīng)有其他客戶端獲取到了鎖,此時(shí)返回key的過期時(shí)間,也就是別人釋放鎖的時(shí)間(但其他客戶端可能出現(xiàn)續(xù)約,存在會(huì)等待更久的可能)
return redis.call('pttl', KEYS[1]);
整個(gè)lua腳本保障原子性,從而只會(huì)有一個(gè)客戶端能夠獲取到鎖,這樣就保證了鎖的互斥性。
打一個(gè)斷點(diǎn)看獲取到的鎖信息
hash表中的第一個(gè)值表示UUID+線程ID,這二個(gè)值表示鎖的重入次數(shù),如果鎖被多次獲取,那么這個(gè)值就是大于1。
關(guān)注scheduleExpirationRenewal實(shí)現(xiàn)自動(dòng)續(xù)約
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// 不為空表示已經(jīng)開啟了續(xù)約操作
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
// 如果沒有開啟續(xù)約操作
entry.addThreadId(threadId);
try {
// 自動(dòng)續(xù)約
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
關(guān)注renewExpiration()方法
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 創(chuàng)建一個(gè)定時(shí)任務(wù)去實(shí)現(xiàn)自動(dòng)續(xù)約
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 獲取當(dāng)前鎖的ExpirationEntry 對(duì)象。
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 獲取第一個(gè)線程ID
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 鎖續(xù)期
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 續(xù)約成功,遞歸自己無限續(xù)約下去
if (res) {
// reschedule itself
renewExpiration();
} else {
// 續(xù)約失敗,表示鎖已釋放,取消續(xù)約任務(wù)
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // internalLockLeaseTime / 3表示每隔鎖時(shí)間的三分之一,去續(xù)約一次
ee.setTimeout(task);
}
關(guān)注renewExpirationAsync方法
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
我們發(fā)現(xiàn),又是一段lua腳本,還是復(fù)制出來,格式化后詳細(xì)解釋下代碼。
-- KEYS[1] 加鎖的對(duì)象(也就是我們傳入的的鎖名稱)
-- ARGV[1] 表示鎖的過期時(shí)間
-- ARGV[2]:UUID+當(dāng)前線程id
-- 使用hexists判斷鎖是不是自己持有的, == 1表示是自己持有,== 0 表示被其他客戶端持有
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 重新設(shè)置過期時(shí)間
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 返回1 表示續(xù)約成功
return 1;
end ;
-- 返回0 表示續(xù)約失敗,也意味著鎖已經(jīng)被釋放或者被其他客戶端獲取了
return 0;
所以續(xù)約的邏輯就是,啟動(dòng)一個(gè)定時(shí)任務(wù),每隔續(xù)約時(shí)間的三分之一次就執(zhí)行一次。嘗試去續(xù)約,續(xù)約成功則會(huì)一直遞歸續(xù)約下去。續(xù)約失敗表示鎖已被釋放,則停止續(xù)約任務(wù)。
而續(xù)約的操作就是,判斷是否是自己持有鎖,是的話就重新設(shè)置過期時(shí)間,并且返回1表示續(xù)約成功,否則返回0表示續(xù)約失敗。
2.3、關(guān)注RedissonLock.unlock()方法
@Override
public void unlock() {
try {
// 其實(shí)就是調(diào)用了unlockAsync進(jìn)行解鎖
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
我們可以看到,會(huì)使用unlockAsync方法進(jìn)行解鎖,并且在這里傳入了當(dāng)前的線程ID。
關(guān)注unlockAsync方法
@Override
public RFuture<Void> unlockAsync(long threadId) {
// 調(diào)用unlockInnerAsync實(shí)現(xiàn)異步解鎖
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 釋放之后再處理一些事情
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 取消(停止)續(xù)約任務(wù),這里也會(huì)停止watch dog
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
關(guān)注解鎖的核心邏輯unlockInnerAsync方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
可以看到,其實(shí)又是一段lua腳本,繼續(xù)復(fù)制出來分析一下。
-- KEYS[1] 加鎖的對(duì)象(也就是我們傳入的的鎖名稱)
-- KEYS[2] 監(jiān)聽該鎖的頻道 也就是publish要發(fā)送鎖被釋放的頻道,用于在鎖釋放時(shí)通知其他客戶端可以重新獲取鎖了
-- ARGV[1]:解鎖消息
-- ARGV[2] 表示鎖的過期時(shí)間
-- ARGV[3]:UUID+當(dāng)前線程id
-- 先判斷自己的鎖是不是已經(jīng)釋放了 ==0 表示key不存在了,也就是鎖被釋放了
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
-- 返回nil,也就是null, 表示釋放鎖成功
return nil;
end ;
-- 對(duì)鎖的重入次數(shù)減一 因?yàn)橹厝胍淮蝐ounter會(huì)+1,所以釋放時(shí)每次也只能-1,跟重入次數(shù)匹配
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 如果重入次數(shù)仍然大于0,續(xù)約過期時(shí)間
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
-- 返回解說失敗
return 0;
else
-- 表示重入次數(shù)已經(jīng)為0了,刪除鎖的key
redis.call('del', KEYS[1]);
-- 使用publish發(fā)布一個(gè)消息,其他訂閱了的客戶端收到消息,就說明解鎖成功了餓、然后可以重新獲取鎖了
redis.call('publish', KEYS[2], ARGV[1]);
-- 返回1 表示解鎖成功
return 1;
end ;
return nil;
其實(shí)就是在解鎖的時(shí)候,已經(jīng)解鎖了直接返回成功,可重入次數(shù)沒有到0,將會(huì)解鎖失敗,直到可重入次數(shù)重新減到0后,開始刪除鎖的key.
并且此時(shí)會(huì)使用publish發(fā)送一個(gè)消息在渠道上,訂閱者們訂閱到了,就說明鎖已經(jīng)被釋放了,然后可以從重新獲取鎖了。
3、小結(jié)
Redisson實(shí)現(xiàn)分布式鎖,就是使用lua腳本保證原子性和互斥性的。每次都判斷是不是自己持有鎖,才進(jìn)行操作,這就保證了同一性。
在加鎖時(shí)使用incrby對(duì)key對(duì)應(yīng)的value值進(jìn)行自增,減鎖時(shí)自減實(shí)現(xiàn)鎖的可重入性。
使用redis的超時(shí)自動(dòng)過期來保證鎖的容錯(cuò)性,不會(huì)一直鎖死下去。所以鎖的最大續(xù)約時(shí)間是防止思索的一個(gè)有效的方法。