并發(fā)場景下的冪等問題-分布式鎖詳解
寫在前面:本文討論的冪等問題,均為并發(fā)場景下的冪等問題。即系統(tǒng)本存在冪等設計,但是在并發(fā)場景下失效了。
一 摘要
本文從釘釘實人認證場景的一例數據重復問題出發(fā),分析了其原因是因為并發(fā)導致冪等失效,引出冪等的概念。
針對并發(fā)場景下的冪等問題,提出了一種實現冪等可行的方法論,結合通訊錄加人業(yè)務場景對數據庫冪等問題進行了簡單分析,就分布式鎖實現冪等方法展開了詳細討論。
分析了鎖在分布式場景下存在的問題,包括單點故障、網絡超時、錯誤釋放他人鎖、提前釋放鎖以及分布式鎖單點故障等,提出了對應的解決方案,介紹了對應方案的具體實現。
二 問題
釘釘實人認證業(yè)務存在數據重復的問題。
1 問題現象
正常情況下,數據庫中應該只有一條實人認證成功記錄,但是實際上某用戶有多條。
2 問題原因
并發(fā)導致了不冪等。
我們先來回顧一下冪等的概念:
冪等(idempotent、idempotence)是一個數學與計算機學概念,常見于抽象代數中。
在編程中一個冪等操作的特點是其任意多次執(zhí)行所產生的影響均與一次執(zhí)行的影響相同。
--來自百度百科
實人認證在業(yè)務上有冪等設計,其一般流程為:
1)用戶選擇實人認證后會在服務端初始化一條記錄;
2)用戶在釘釘移動端按照指示完成人臉比對;
3)比對完成后訪問服務端修改數據庫狀態(tài)。
在第3步中,在修改數據庫狀態(tài)之前,會判斷「是否已經初始化」、「是否已經實人認證」以及「智科是否返回認證成功」以保證冪等。僅當請求首次訪問服務端嘗試修改數據庫狀態(tài)時,才能滿足冪等的判斷條件并修改數據庫狀態(tài)。其余任意次請求將直接返回,對數據庫狀態(tài)無影響。請求多次訪問服務端所產生的結果,和請求首次訪問服務端一致。因此,在實人認證成功的前提下,數據庫應當有且僅有一條認證成功的記錄。
但是在實際過程中我們發(fā)現,同一個請求會多次修改數據庫狀態(tài),系統(tǒng)并未按照我們預期的那樣實現冪等。究其原因,是因為請求并發(fā)訪問,在首次請求完成修改服務端狀態(tài)前,并發(fā)的其他請求和首次請求都通過了冪等判斷,對數據庫狀態(tài)進行了多次修改。
并發(fā)導致了原冪等設計失效。
并發(fā)導致了不冪等。
三 解決方案
解決并發(fā)場景下冪等問題的關鍵,是找到唯一性約束,執(zhí)行唯一性檢查,相同的數據保存一次,相同的請求操作一次。
一次訪問服務端的請求,可能產生以下幾種交互:
- 與數據源交互,例如數據庫狀態(tài)變更等;
- 與其他業(yè)務系統(tǒng)交互,例如調用下游服務或發(fā)送消息等;
一次請求可以只包含一次交互,也可以包含多次交互。例如一次請求可以僅僅修改一次數據庫狀態(tài),也可以在修改數據庫狀態(tài)后再發(fā)送一條數據庫狀態(tài)修改成功的消息。
于是我們可以得出一個結論:并發(fā)場景下,如果一個系統(tǒng)依賴的組件冪等,那么該系統(tǒng)在天然冪等。
以數據庫為例,如果一個請求對數據造成的影響是新增一條數據,那么唯一索引可以是冪等問題的解法。數據庫會幫助我們執(zhí)行唯一性檢查,相同數據不會重復落庫。
釘釘通訊錄加人就是通過數據庫的唯一索引解決了冪等問題。以釘釘通訊錄加人為例,在向數據庫寫數據之前,會先判斷數據是否已經存在于數據庫之中,如果不存在,加人請求最終會向數據庫的員工表插入一條數據。大量相同的并發(fā)的通訊錄加人請求讓系統(tǒng)的冪等設計失效成為可能。在一次加人請求中,(組織ID,工號)可以唯一標記一個請求,在數據庫中,也存在(組織ID,工號)的唯一索引。因此我們可以保證,多次相同的加人請求,只會修改一次數據庫狀態(tài),即添加一條記錄。
如果所依賴的組件天然冪等,那么問題就簡單了,但是實際情況往往更加復雜。并發(fā)場景下,如果系統(tǒng)依賴的組件無法冪等,我們就需要使用額外的手段實現冪等。
一個常用的手段就是使用分布式鎖。分布式鎖的實現方式有很多,比較常用的是緩存式分布式鎖。
四 分布式鎖
在What is a Java distributed lock?中有這樣幾段話:
In computer science, locks are mechanisms in a multithreaded environment to prevent different threads from operating on the same resource. When using locking, a resource is "locked" for access by a specific thread, and can only be accessed by a different thread once the resource has been released. Locks have several benefits: they stop two threads from doing the same work, and they prevent errors and data corruption when two threads try to use the same resource simultaneously.
Distributed locks in Java are locks that can work with not only multiple threads running on the same machine, but also threads running on clients on different machines in a distributed system. The threads on these separate machines must communicate and coordinate to make sure that none of them try to access a resource that has been locked up by another.
這幾段話告訴我們,鎖的本質是共享資源的互斥訪問,分布式鎖解決了分布式系統(tǒng)中共享資源的互斥訪問的問題。
java.util.concurrent.locks包提供了豐富的鎖實現,包括公平鎖/非公平鎖,阻塞鎖/非阻塞鎖,讀寫鎖以及可重入鎖等。
我們要如何實現一個分布式鎖呢?
方案一
分布式系統(tǒng)中常見有兩個問題:
1)單點故障問題,即當持有鎖的應用發(fā)生單點故障時,鎖將被長期無效占有;
2)網絡超時問題,即當客戶端發(fā)生網絡超時但實際上鎖成功時,我們無法再次正確的
獲取鎖。
要解決問題1,一個簡單的方案是引入過期時間(lease time),對鎖的持有將是有時效的,當應用發(fā)生單點故障時,被其持有的鎖可以自動釋放。
要解決問題2,一個簡單的方案是支持可重入,我們?yōu)槊總€獲取鎖的客戶端都配置一個不會重復的身份標識(通常是UUID),上鎖成功后鎖將帶有該客戶端的身份標識。當實際上鎖成功而客戶端超時重試時,我們可以判斷鎖已被該客戶端持有而返回成功。
綜上我們給出了一個lease-based distribute lock方案。出于性能考量,使用緩存作為鎖的存儲介質,利用MVCC(Multiversion concurrency control)機制解決共享資源互斥訪問問題,具體實現可見附錄代碼。
分布式鎖的一般使用方式如下
- 初始化分布式鎖的工廠
- 利用工廠生成一個分布式鎖實例
- 使用該分布式實例上鎖和解鎖操作
- @Test
- public void testTryLock() {
- //初始化工廠
- MdbDistributeLockFactory mdbDistributeLockFactory = new MdbDistributeLockFactory();
- mdbDistributeLockFactory.setNamespace(603);
- mdbDistributeLockFactory.setMtairManager(new MultiClusterTairManager());
- //獲得鎖
- DistributeLock lock = mdbDistributeLockFactory.getLock("TestLock");
- //上鎖解鎖操作
- boolean locked = lock.tryLock();
- if (!locked) {
- return;
- }
- try {
- //do something
- } finally {
- lock.unlock();
- }
- }
該方案簡單易用,但是問題也很明顯。例如,釋放鎖的時候只是簡單的將緩存中的key失效,所以存在錯誤釋放他人已持有鎖問題。所幸只要鎖的租期設置的足夠長,該問題出現幾率就足夠小。
我們借用Martin Kleppmann在文章How to do distributed locking中的一張圖說明該問題。
設想一種情況,當占有鎖的Client 1在釋放鎖之前,鎖就已經到期了,Client 2將獲取鎖,此時鎖被Client 2持有,但是Client 1可能會錯誤的將其釋放。一個更優(yōu)秀的方案,我們給每個鎖都設置一個身份標識,在釋放鎖的時候,1)首先查詢鎖是否是自己的,2)如果是自己的則釋放鎖。受限于實現方式,步驟1和步驟2不是原子操作,在步驟1和步驟2之間,如果鎖到期被其他客戶端獲取,此時也會錯誤的釋放他人的鎖。
方案二
借助Redis的Lua腳本,可以完美的解決存在錯誤釋放他人已持有鎖問題的。在Distributed locks with Redis這篇文章的 Correct implementation with a single instance 這一節(jié)中,我們可以得到我們想要的答案——如何實現一個分布式鎖。
當我們想要獲取鎖時,我們可以執(zhí)行如下方法
- SET resource_name my_random_value NX PX 30000
當我們想要釋放鎖時,我們可以執(zhí)行如下的Lua腳本
- if redis.call("get",KEYS[1]) == ARGV[1] then
- return redis.call("del",KEYS[1])
- else
- return 0
- end
方案三
在方案一和方案二的討論過程中,有一個問題被我們反復提及:鎖的自動釋放。
這是一把雙刃劍:
1)一方面它很好的解決了持有鎖的客戶端單點故障的問題
2)另一方面,如果鎖提前釋放,就會出現鎖的錯誤持有狀態(tài)
這個時候,我們可以引入Watch Dog自動續(xù)租機制,我們可以參考以下Redisson是如何實現的。
在上鎖成功后,Redisson會調用renewExpiration()方法開啟一個Watch Dog線程,為鎖自動續(xù)期。每過1/3時間續(xù)一次,成功則繼續(xù)下一次續(xù)期,失敗取消續(xù)期操作。
我們可以再看看Redisson是如何續(xù)期的。renewExpiration()方法的第17行renewExpirationAsync()方法是執(zhí)行鎖續(xù)期的關鍵操作,我們進入到方法內部,可以看到Redisson也是使用Lua腳本進行鎖續(xù)租的:1)判斷鎖是否存在;2)如果存在則重置過期時間。
- private void renewExpiration() {
- ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
- if (ee == null) {
- return;
- }
- Timeout task = commandExecutor.getConnectionManager().newTimeout(timeout -> {
- ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
- if (ent == null) {
- return;
- }
- Long threadId = ent.getFirstThreadId();
- if (threadId == null) {
- return;
- }
- RFuture<Boolean> future = renewExpirationAsync(threadId);
- future.onComplete((res, e) -> {
- if (e != null) {
- log.error("Can't update lock " + getRawName() + " expiration", e);
- EXPIRATION_RENEWAL_MAP.remove(getEntryName());
- return;
- }
- if (res) {
- // reschedule itself
- renewExpiration();
- } else {
- cancelExpirationRenewal(null);
- }
- });
- }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
- ee.setTimeout(task);
- }
- protected RFuture<Boolean> renewExpirationAsync(long threadId) {
- return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return 1; " +
- "end; " +
- "return 0;",
- Collections.singletonList(getRawName()),
- internalLockLeaseTime, getLockName(threadId));
- }
方案四
借助Redisson的自動續(xù)期機制,我們無需再擔心鎖的自動釋放。但是討論到這里,我還是不得不面對一個問題:分布式鎖本身不是一個分布式應用。當Redis服務器故障無法正常工作時,整個分布式鎖也就無法提供服務。
更進一步,我們可以看看Distributed locks with Redis這篇文章中提到的Redlock算法及其實現。
Redlock算法不是銀彈,關于它的好與壞,也有很多爭論:
How to do distributed locking:
https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
Is Redlock safe?:
http://antirez.com/news/101
Martin Kleppmann和Antirez關于Redlock的爭辯:
https://news.ycombinator.com/item
參考資料
What is a Java distributed lock?
https://redisson.org/glossary/java-distributed-lock.html
Distributed locks and synchronizers:
https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
Distributed locks with Redis:
https://redis.io/topics/distlock?spm=ata.21736010.0.0.31f77e3aFs96rz
附錄
分布式鎖
- public class MdbDistributeLock implements DistributeLock {
- /**
- * 鎖的命名空間
- */
- private final int namespace;
- /**
- * 鎖對應的緩存key
- */
- private final String lockName;
- /**
- * 鎖的唯一標識,保證可重入,以應對put成功,但是返回超時的情況
- */
- private final String lockId;
- /**
- * 是否持有鎖。true:是
- */
- private boolean locked;
- /**
- * 緩存實例
- */
- private final TairManager tairManager;
- public MdbDistributeLock(TairManager tairManager, int namespace, String lockCacheKey) {
- this.tairManager = tairManager;
- this.namespace = namespace;
- this.lockName = lockCacheKey;
- this.lockId = UUID.randomUUID().toString();
- }
- @Override
- public boolean tryLock() {
- try {
- //獲取鎖狀態(tài)
- Result<DataEntry> getResult = null;
- ResultCode getResultCode = null;
- for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
- getResult = tairManager.get(namespace, lockName);
- getResultCode = getResult == null ? null : getResult.getRc();
- if (noNeedRetry(getResultCode)) {
- break;
- }
- }
- //重入,已持有鎖,返回成功
- if (ResultCode.SUCCESS.equals(getResultCode)
- && getResult.getValue() != null && lockId.equals(getResult.getValue().getValue())) {
- locked = true;
- return true;
- }
- //不可獲取鎖,返回失敗
- if (!ResultCode.DATANOTEXSITS.equals(getResultCode)) {
- log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
- return false;
- }
- //嘗試獲取鎖
- ResultCode putResultCode = null;
- for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
- putResultCode = tairManager.put(namespace, lockName, lockId, MDB_CACHE_VERSION,
- DEFAULT_EXPIRE_TIME_SEC);
- if (noNeedRetry(putResultCode)) {
- break;
- }
- }
- if (!ResultCode.SUCCESS.equals(putResultCode)) {
- log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
- return false;
- }
- locked = true;
- return true;
- } catch (Exception e) {
- log.error("DistributedLock.tryLock fail lock={}", this, e);
- }
- return false;
- }
- @Override
- public void unlock() {
- if (!locked) {
- return;
- }
- ResultCode resultCode = tairManager.invalid(namespace, lockName);
- if (!resultCode.isSuccess()) {
- log.error("DistributedLock.unlock fail lock={} resultCode={} traceId={}", this, resultCode,
- EagleEye.getTraceId());
- }
- locked = false;
- }
- /**
- * 判斷是否需要重試
- *
- * @param resultCode 緩存的返回碼
- * @return true:不用重試
- */
- private boolean noNeedRetry(ResultCode resultCode) {
- return resultCode != null && !ResultCode.CONNERROR.equals(resultCode) && !ResultCode.TIMEOUT.equals(
- resultCode) && !ResultCode.UNKNOW.equals(resultCode);
- }
- }
分布式鎖工廠
- public class MdbDistributeLockFactory implements DistributeLockFactory {
- /**
- * 緩存的命名空間
- */
- @Setter
- private int namespace;
- @Setter
- private MultiClusterTairManager mtairManager;
- @Override
- public DistributeLock getLock(String lockName) {
- return new MdbDistributeLock(mtairManager, namespace, lockName);
- }
- }