基于Redis的分布式鎖和Redlock算法
本文轉載自微信公眾號「UP技術控」,作者conan5566。轉載本文請聯(lián)系UP技術控公眾號。
在單進程的系統(tǒng)中,當存在多個線程可以同時改變某個變量(可變共享變量)時,就需要對變量或代碼塊做同步,使其在修改這種變量時能夠線性執(zhí)行消除并發(fā)修改變量。
而同步的本質是通過鎖來實現(xiàn)的。為了實現(xiàn)多個線程在一個時刻同一個代碼塊只能有一個線程可執(zhí)行,那么需要在某個地方做個標記,這個標記必須每個線程都能看到,當標記不存在時可以設置該標記,其余后續(xù)線程發(fā)現(xiàn)已經(jīng)有標記了則等待擁有標記的線程結束同步代碼塊取消標記后再去嘗試設置標記。這個標記可以理解為鎖。
不同地方實現(xiàn)鎖的方式也不一樣,只要能滿足所有線程都能看得到標記即可。如 Java 中 synchronize 是在對象頭設置標記,Lock 接口的實現(xiàn)類基本上都只是某一個 volitile 修飾的 int 型變量其保證每個線程都能擁有對該 int 的可見性和原子修改,linux 內核中也是利用互斥量或信號量等內存數(shù)據(jù)做標記。
除了利用內存數(shù)據(jù)做鎖其實任何互斥的都能做鎖(只考慮互斥情況),如流水表中流水號與時間結合做冪等校驗可以看作是一個不會釋放的鎖,或者使用某個文件是否存在作為鎖等。只需要滿足在對標記進行修改能保證原子性和內存可見性即可。
1 什么是分布式?
分布式的 CAP 理論告訴我們:
任何一個分布式系統(tǒng)都無法同時滿足一致性(Consistency)、可用性(Availability)和分區(qū)容錯性(Partition tolerance),最多只能同時滿足兩項。
目前很多大型網(wǎng)站及應用都是分布式部署的,分布式場景中的數(shù)據(jù)一致性問題一直是一個比較重要的話題?;? CAP理論,很多系統(tǒng)在設計之初就要對這三者做出取舍。在互聯(lián)網(wǎng)領域的絕大多數(shù)的場景中,都需要犧牲強一致性來換取系統(tǒng)的高可用性,系統(tǒng)往往只需要保證最終一致性。
分布式場景
此處主要指集群模式下,多個相同服務同時開啟.
在許多的場景中,我們?yōu)榱吮WC數(shù)據(jù)的最終一致性,需要很多的技術方案來支持,比如分布式事務、分布式鎖等。很多時候我們需要保證一個方法在同一時間內只能被同一個線程執(zhí)行。在單機環(huán)境中,通過 Java 提供的并發(fā) API 我們可以解決,但是在分布式環(huán)境下,就沒有那么簡單啦。
- 分布式與單機情況下最大的不同在于其不是多線程而是多進程。
- 多線程由于可以共享堆內存,因此可以簡單的采取內存作為標記存儲位置。而進程之間甚至可能都不在同一臺物理機上,因此需要將標記存儲在一個所有進程都能看到的地方。
什么是分布式鎖?
- 當在分布式模型下,數(shù)據(jù)只有一份(或有限制),此時需要利用鎖的技術控制某一時刻修改數(shù)據(jù)的進程數(shù)。
- 與單機模式下的鎖不僅需要保證進程可見,還需要考慮進程與鎖之間的網(wǎng)絡問題。(我覺得分布式情況下之所以問題變得復雜,主要就是需要考慮到網(wǎng)絡的延時和不可靠。。。一個大坑)
- 分布式鎖還是可以將標記存在內存,只是該內存不是某個進程分配的內存而是公共內存如 Redis、Memcache。至于利用數(shù)據(jù)庫、文件等做鎖與單機的實現(xiàn)是一樣的,只要保證標記能互斥就行。
2 我們需要怎樣的分布式鎖?
可以保證在分布式部署的應用集群中,同一個方法在同一時間只能被一臺機器上的一個線程執(zhí)行。
- 這把鎖要是一把可重入鎖(避免死鎖)
- 這把鎖最好是一把阻塞鎖(根據(jù)業(yè)務需求考慮要不要這條)
- 這把鎖最好是一把公平鎖(根據(jù)業(yè)務需求考慮要不要這條)
有高可用的獲取鎖和釋放鎖功能
獲取鎖和釋放鎖的性能要好
代碼實現(xiàn)
- public interface IDistributedLock
- {
- ILockResult Lock(string resourceKey);
- ILockResult Lock(string resourceKey, TimeSpan expiryTime);
- ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime);
- ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken);
- Task<ILockResult> LockAsync(string resourceKey);
- Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime);
- Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime);
- Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken);
- }
- public interface ILockResult : IDisposable
- {
- string LockId { get; }
- bool IsAcquired { get; }
- int ExtendCount { get; }
- }
- class EndPoint:RedLock.RedisLockEndPoint
- {
- private readonly string _connectionString;
- public EndPoint(string connectionString)
- {
- _connectionString = connectionString;
- //139.196.40.252,password=xstudio,defaultDatabase=9
- var connection = connectionString.Split(',');
- var dict = new Dictionary<string, string>();
- foreach (var item in connection)
- {
- var keypar = item.Split('=');
- if (keypar.Length>1)
- {
- dict[keypar[0]] = keypar[1];
- }
- }
- this.EndPoint = new System.Net.DnsEndPoint(connection[0], 6379);
- if (dict.TryGetValue("password", out string password))
- {
- this.Password = password;
- }
- if (dict.TryGetValue("defaultDatabase", out string defaultDatabase) && int.TryParse(defaultDatabase,out int database))
- {
- RedisDatabase = database;
- }
- }
- }
- [Export(typeof(IDistributedLock))]
- class InnerLock : IDistributedLock
- {
- private static Lazy<RedLock.RedisLockFactory> _factory;
- static InnerLock()
- {
- _factory = new Lazy<RedisLockFactory>(() => new RedisLockFactory(new EndPoint(ConfigurationManager.AppSettings["Redis"])), System.Threading.LazyThreadSafetyMode.ExecutionAndPublication);
- }
- public ILockResult Lock(string resourceKey)
- {
- return new LockResult(_factory.Value.Create(resourceKey, TimeSpan.FromDays(1)));
- }
- public ILockResult Lock(string resourceKey, TimeSpan expiryTime)
- {
- return new LockResult(_factory.Value.Create(resourceKey, expiryTime));
- }
- public ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime)
- {
- return new LockResult(_factory.Value.Create(resourceKey, expiryTime, waitTime, retryTime));
- }
- public ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken)
- {
- return new LockResult(_factory.Value.Create(resourceKey, expiryTime, waitTime, retryTime, cancellationToken));
- }
- public async Task<ILockResult> LockAsync(string resourceKey)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, TimeSpan.FromDays(1));
- return new LockResult(result);
- }
- public async Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, expiryTime);
- return new LockResult(result);
- }
- public async Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, expiryTime, waitTime, retryTime);
- return new LockResult(result);
- }
- public async Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, expiryTime, waitTime, retryTime, cancellationToken);
- return new LockResult(result);
- }
- }
- class LockResult : ILockResult
- {
- private IRedisLock _lock;
- public LockResult(IRedisLock redisLock)
- {
- _lock = redisLock;
- }
- public string LockId => _lock.LockId;
- public bool IsAcquired => _lock.IsAcquired;
- public int ExtendCount => _lock.ExtendCount;
- public void Dispose()
- {
- _lock.Dispose();
- }
- }