Redis分布式鎖抽絲剝繭
分布式鎖是"線程同步"的延續(xù)
最近首度應(yīng)用"分布式鎖",現(xiàn)在想想,分布式鎖不是孤立的技能點,這其實就是跨主機的線程同步。
進程內(nèi) | 跨進程 | 跨主機 |
Lock/Monitor、SemaphoreSlim | Metux、Semaphore | 分布式鎖 |
用戶態(tài)線程安全 | 內(nèi)核態(tài)線程安全 |
單機服務(wù)器可以通過共享某堆內(nèi)存來標(biāo)記上鎖/解鎖,線程同步說到底是建立在單機操作系統(tǒng)的用戶態(tài)/內(nèi)核態(tài)對共享內(nèi)存的訪問控制。
而分布式服務(wù)器不是在同一臺機器上:跨主機,因此需要將鎖標(biāo)記存儲在所有機器進程都能看到的地方。
在開發(fā)很多業(yè)務(wù)場景會使用到鎖,例如庫存控制,抽獎等。
例如庫存只剩1個商品,有三個用戶同時打算購買,誰先購買庫存立即清零,不能讓其他二人也購買成功。
解讀分布式鎖
我們常說的線程安全、線程同步方案,包括此次的分布式鎖都是基于
“多線程/多進程對特定資源同時有更新操作”。
基本考量
1.分布式系統(tǒng),一個鎖在同一時間只能被一個服務(wù)器獲取 (這是分布式鎖的基礎(chǔ))
2.具備鎖失效機制,防止死鎖 (防止某些意外,鎖沒有得到釋放,別人也無法得到鎖)
Redis SET resource-name anystring NX EX max-lock-time
是一種最簡單的分布式鎖實現(xiàn)方案。
SET 命令支持多個參數(shù):
- EX seconds-- 設(shè)置過期時間(s)
- NX -- 如果key不存在,則設(shè)置 ......
因為SET命令參數(shù)可以替代SETNX,SETEX,GETSET,這些命令在未來可能被廢棄。
上面的命令返回OK(或經(jīng)過重試),客戶端就獲取到這個鎖;
使用DEL命令解鎖;到達超時時間會自動釋放鎖。
在解鎖時,增加一些設(shè)計,讓系統(tǒng)更加健壯:
3.不要使用固定的String值作為鎖標(biāo)記值,而是使用一個不易被猜中的隨機值, 業(yè)內(nèi)稱為token
4.不使用DEL命令釋放鎖,而是發(fā)送script去移除key
第3、4點是為了解決 :“鎖提前過期,客戶端A還沒有執(zhí)行完,然后客戶端B獲取了鎖,這時客戶端A執(zhí)行完了,會不會在刪鎖的時候把B的鎖給刪掉” -- 4是3技術(shù)上的推薦實現(xiàn)。
腳本如下:
- if redis.call("get",KEYS1] ==ARGV[1])
- then
- return redis.call("DEL",KEYS[1])
- else
- return 0
- end
下面使用StackExchange.Redis 寫了基于以上考量的代碼示例:
- /// <summary>
- /// Acquires the lock.
- /// </summary>
- /// <param name="key"></param>
- /// <param name="token">隨機值</param>
- /// <param name="expireSecond"></param>
- /// <param name="waitLockSeconds">非阻塞鎖</param>
- static bool Lock(string key, string token,int expireSecond=10, double waitLockSeconds = 0)
- {
- var waitIntervalMs = 50;
- bool isLock;
- DateTime begin = DateTime.Now;
- do
- {
- isLock = Connection.GetDatabase().StringSet(key, token, TimeSpan.FromSeconds(expireSecond), When.NotExists);
- if (isLock)
- return true;
- //不等待鎖則返回
- if (waitLockSeconds == 0) break;
- //超過等待時間,則不再等待
- if ((DateTime.Now - begin).TotalSeconds >= waitLockSeconds) break;
- Thread.Sleep(waitIntervalMs);
- } while (!isLock);
- return false;
- }
- /// <summary>
- /// Releases the lock.
- /// </summary>
- /// <returns><c>true</c>, if lock was released, <c>false</c> otherwise.</returns>
- /// <param name="key">Key.</param>
- /// <param name="value">value</param>
- static bool UnLock(string key, string value)
- {
- string lua_script = @"
- if (redis.call('GET', KEYS[1]) == ARGV[1]) then
- redis.call('DEL', KEYS[1])
- return true
- else
- return false
- end
- ";
- try
- {
- var res = Connection.GetDatabase().ScriptEvaluate(lua_script,
- new RedisKey[] { key },
- new RedisValue[] { value });
- return (bool)res;
- }
- catch (Exception ex)
- {
- Console.WriteLine($"ReleaseLock lock fail...{ex.Message}");
- return false;
- }
- }
- private static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() =>
- {
- ConfigurationOptions configuration = new ConfigurationOptions
- {
- AbortOnConnectFail = false,
- ConnectTimeout = 5000,
- };
- configuration.EndPoints.Add("10.100.219.9", 6379);
- return ConnectionMultiplexer.Connect(configuration.ToString());
- });
- public static ConnectionMultiplexer Connection => lazyConnection.Value;
以上代碼新增了第五點考量:
5. 為避免無限制搶鎖,增加了非阻塞鎖:輪詢_s等待鎖,未等到則不再搶鎖
使用方式:
下面并行開啟三個任務(wù),同時減少庫存:
- static void Main(string[] args)
- {
- // 嘗試并行執(zhí)行3個任務(wù)
- Parallel.For(0, 3, x =>
- {
- string token = $"loki:{x}";
- bool isLocked = Lock("loki", token, 5, 10);
- if (isLocked)
- {
- Console.WriteLine($"{token} begin reduce stocks (with lock) at {DateTime.Now}.");
- Thread.Sleep(1000);
- Console.WriteLine($"{token} release lock {UnLock("loki", token)} at {DateTime.Now}. ");
- }
- else
- {
- Console.WriteLine($"{token} begin reduce stocks at {DateTime.Now}.");
- }
- });
- }
可以看到三個并行任務(wù)依次獲取/釋放鎖
輸出總結(jié)
本文從基礎(chǔ)的線程安全、線程同步,認識到分布式鎖是跨主機的資源線程/進程同步方案, 以步步為營的風(fēng)格 演示了RedisSET命令做分布式鎖的設(shè)計考量,好記性不如爛筆頭。