用Go語言&&Redis實現(xiàn)分布式鎖,我還是第一次
一、為什么需要分布式鎖
共享資源訪問控制: 當(dāng)多個節(jié)點需要同時訪問共享資源時,為了避免并發(fā)寫入導(dǎo)致數(shù)據(jù)不一致,需要使用分布式鎖確保同時只有一個節(jié)點可以寫入或修改共享資源。
避免重復(fù)執(zhí)行: 在分布式系統(tǒng)中,某些操作可能需要在整個系統(tǒng)中只執(zhí)行一次,比如定時任務(wù)、數(shù)據(jù)初始化等。為了避免多個節(jié)點同時執(zhí)行這些操作,需要使用分布式鎖來確保只有一個節(jié)點可以執(zhí)行。
任務(wù)協(xié)調(diào): 在分布式任務(wù)隊列中,多個節(jié)點競爭執(zhí)行任務(wù)時,可能需要對任務(wù)進(jìn)行加鎖,以確保每個任務(wù)只被一個節(jié)點執(zhí)行,避免重復(fù)執(zhí)行或者操作沖突。
防止死鎖: 在分布式系統(tǒng)中,由于網(wǎng)絡(luò)延遲、節(jié)點故障等原因,可能會導(dǎo)致死鎖情況的發(fā)生。分布式鎖可以用來避免死鎖的發(fā)生,通過設(shè)置合理的超時時間和重試機制,確保鎖在一定時間內(nèi)被釋放。
分布式系統(tǒng)中共享同一個資源時,就需要分布式鎖來確保變更資源的一致性。這就是為什么要用到分布式鎖的原因咯。
二、分布式鎖需要具備特性
1 互斥性(Mutual Exclusion): 在任何時刻,只能有一個客戶端持有鎖,其他客戶端不能同時持有該鎖。這是最基本的鎖特性,確保在同一時間只有一個客戶端能夠訪問共享資源。
2 安全性(Safety): 在鎖被釋放之前,任何其他客戶端都不能獲得該鎖。即使是在網(wǎng)絡(luò)分區(qū)、節(jié)點故障等異常情況下,也要確保鎖的安全性,避免數(shù)據(jù)不一致或者操作沖突。
3 活性(Liveness): 鎖應(yīng)該能夠在合理的時間內(nèi)被獲取,避免長時間的等待導(dǎo)致死鎖或者無法響應(yīng)其他客戶端請求。活性也包括在鎖被釋放后,其他客戶端能夠盡快地獲取到該鎖。
4 容錯性(Fault Tolerance): 分布式系統(tǒng)中可能會發(fā)生網(wǎng)絡(luò)分區(qū)、節(jié)點故障等異常情況,分布式鎖需要具備容錯性,能夠在這些異常情況下正確地工作。比如,鎖的實現(xiàn)應(yīng)該能夠處理網(wǎng)絡(luò)分區(qū)導(dǎo)致的消息丟失或者超時等情況。
5 性能(Performance): 鎖的實現(xiàn)應(yīng)該盡可能地減少鎖競爭和通信開銷,提高系統(tǒng)的性能。例如,可以使用高效的算法和數(shù)據(jù)結(jié)構(gòu)來減少鎖的持有時間和等待時間,或者采用緩存和批處理等技術(shù)來減少通信開銷。
6 可擴展性(Scalability): 鎖的實現(xiàn)應(yīng)該能夠隨著系統(tǒng)規(guī)模的增長而擴展,確保在高并發(fā)和大規(guī)模的分布式環(huán)境下仍然能夠保持良好的性能和可用性。
三、實現(xiàn) Redis 鎖應(yīng)先掌握的知識點
set 命令
SET key value [EX seconds] [PX milliseconds] [NX|XX]
- EX second :設(shè)置鍵的過期時間為 second 秒。SET key value EX second 效果等同于 SETEX key second value。
- PX millisecond :設(shè)置鍵的過期時間為 millisecond 毫秒。SET key value PX millisecond ,效果等同于 PSETEX key millisecond value 。
- NX :鍵不存在時,才對鍵進(jìn)行設(shè)置操作。SET key value NX 等同于 SETNX key value 。
- XX :鍵已經(jīng)存在時,才對鍵進(jìn)行設(shè)置操作。
Redis.lua 腳本
我們可以使用 redis lua 腳本,將一系列命令操作封裝成 pipline,實現(xiàn)整體操作的原子性。
加鎖的整個流程,詳細(xì)原理說明看注釋
-- Lua 腳本實現(xiàn) Redis 分布式鎖
-- 生成唯一標(biāo)識
local requestId = ARGV[1]
-- 嘗試獲取鎖
local lockKey = KEYS[1]
local lockValue = requestId
local lockExpireTime = tonumber(ARGV[2])
local result = redis.call('SET', lockKey, lockValue, 'NX', 'PX', lockExpireTime)
-- 判斷獲取鎖的結(jié)果
if result == 'OK' then
-- 獲取鎖成功,設(shè)置鎖的過期時間
return 'OK'
else
-- 獲取鎖失敗
return 'FAIL'
end
1 生成唯一標(biāo)識: 首先,在客戶端生成一個唯一的標(biāo)識,可以是 UUID、Snowflake 算法生成的分布式 ID 等。
2 嘗試獲取鎖: 客戶端將生成的唯一標(biāo)識作為參數(shù),調(diào)用 Redis 的 SET 命令嘗試獲取鎖??梢允褂?nbsp;NX(如果鍵不存在則設(shè)置)和 PX(設(shè)置鍵的過期時間)選項,確保只有一個客戶端能夠成功獲取到鎖。
3 判斷獲取鎖的結(jié)果: 如果獲取鎖成功,SET 命令會返回 OK,表示當(dāng)前客戶端成功獲取了鎖。如果獲取鎖失敗,說明已經(jīng)有其他客戶端持有了鎖,此時客戶端需要進(jìn)行等待或者返回失敗。
4 設(shè)置鎖的過期時間: 在成功獲取鎖之后,客戶端需要設(shè)置鎖的過期時間,以防止因為客戶端崩潰或者其他原因?qū)е骆i一直占用,造成死鎖。
5 返回獲取鎖的結(jié)果: 根據(jù) SET 命令的返回值,客戶端判斷是否成功獲取到了鎖,并將結(jié)果返回給調(diào)用方。
加鎖流程圖
圖片
解鎖流程
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
1 使用 KEYS[1] 獲取傳入的鎖鍵名。
2 使用 ARGV[1] 獲取傳入的鎖值(即加鎖時設(shè)置的唯一標(biāo)識)。
3 判斷當(dāng)前鎖是否存在且鎖值與傳入的鎖值相同,若是,則調(diào)用 DEL 命令刪除該鎖,并返回 1 表示解鎖成功。
4 若鎖不存在或鎖值不匹配,則返回 0 表示解鎖失敗。
解鎖的流程圖
圖片
源碼解析
package redis
import (
"math/rand"
"strconv"
"sync/atomic"
"time"
red "github.com/go-redis/redis"
"github.com/tal-tech/go-zero/core/logx"
)
const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
randomLen = 16
// 默認(rèn)超時時間,用來防止死鎖
tolerance = 300 // milliseconds
millisPerSecond = 800
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end`
)
type redisLock struct {
// redis客戶端
store *Redis
// 超時時間
seconds uint32
// 鎖key
keys string
// 鎖value,防止鎖被別人獲取到
value string
}
func init() {
rand.Seed(time.Now().UnixNano())
}
// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, keys string) *RedisLock {
return &RedisLock{
store: store,
keys: keys,
// 獲取鎖時,鎖的值通過隨機字符串生成
// 實際上go-zero提供更加高效的隨機字符串生成方式
// 見core/stringx/random.go:Randn
value: randomStr(randomLen),
}
}
// Acquire acquires the lock.
// 加鎖
func (rl *RedisLock) Acquire() (bool, error) {
// 獲取過期時間
seconds := atomic.LoadUint32(&rl.seconds)
// 默認(rèn)鎖過期時間為500ms,防止死鎖
resp, err := rl.store.Eval(lockCommand, []string{rl.keys}, []string{
rl.value, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
})
if err == red.Nil {
return false, nil
} else if err != nil {
logx.Errorf("Error on lock for %s, %s", rl.key, err.Error())
return false, err
} else if resp == nil {
return false, nil
}
reply, ok := resp.(string)
if ok && reply == "OK" {
return true, nil
}
logx.Errorf("Unknown reply lock for %s: %v", rl.keys, resp)
return false, nil
}
// Release releases the lock.
// 釋放鎖
func (rl *RedisLock) Release() (bool, error) {
resp, err := rl.store.Eval(delCommand, []string{rl.keys}, []string{rl.value})
if err != nil {
return false, err
}
reply, ok := resp.(int64)
if !ok {
return false, nil
}
return reply == 1, nil
}
func randomStr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
// SetExpire sets the expire.
// 需要注意的是需要在Acquire()之前調(diào)用
// 不然默認(rèn)為300ms自動釋放
func (rl *RedisLock) SetExpire(seconds int) {
atomic.StoreUint32(&rl.seconds, uint32(seconds))
}
這個詳細(xì)源碼根據(jù)自己的業(yè)務(wù)需要,可以利用。