精講Redis限流:多種方法與生產(chǎn)實(shí)踐
令牌桶算法實(shí)現(xiàn)限流
令牌桶算法是一種常見的限流算法,它通過維護(hù)一個(gè)固定容量的令牌桶來控制流量。每個(gè)請(qǐng)求需要獲取一個(gè)令牌,如果桶中沒有足夠的令牌,則請(qǐng)求會(huì)被限制。
首先,你需要在Redis中設(shè)置一個(gè)計(jì)數(shù)器和一個(gè)定時(shí)器來模擬令牌桶:
import redis
import time
# 連接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 設(shè)置令牌桶容量和每秒生成的令牌數(shù)
bucket_capacity = 10
tokens_per_second = 2
# 初始化令牌桶
r.set('tokens', bucket_capacity)
r.set('last_time', int(time.time()))
# 請(qǐng)求令牌的函數(shù)
def request_token():
current_time = int(time.time())
last_time = int(r.get('last_time'))
elapsed_time = current_time - last_time
# 計(jì)算新增的令牌數(shù)量
new_tokens = elapsed_time * tokens_per_second
current_tokens = int(r.get('tokens'))
# 更新令牌數(shù)量
if new_tokens + current_tokens > bucket_capacity:
r.set('tokens', bucket_capacity)
else:
r.set('tokens', new_tokens + current_tokens)
r.set('last_time', current_time)
# 使用令牌的代碼
def process_request():
if int(r.get('tokens')) > 0:
# 執(zhí)行你的請(qǐng)求處理邏輯
print('請(qǐng)求通過')
r.decr('tokens') # 消耗一個(gè)令牌
else:
print('請(qǐng)求被限制')
# 測(cè)試請(qǐng)求
for _ in range(15):
request_token()
process_request()
time.sleep(1)
這個(gè)示例中,我們通過Redis來維護(hù)令牌桶的狀態(tài),并在請(qǐng)求到來時(shí)檢查是否有足夠的令牌。如果有足夠的令牌,請(qǐng)求將被處理,否則請(qǐng)求將被限制。
漏桶算法實(shí)現(xiàn)限流
漏桶算法是另一種流量控制算法,它維護(hù)一個(gè)固定容量的漏桶,請(qǐng)求進(jìn)來后,會(huì)以固定速率從漏桶中排出。
以下是使用Redis實(shí)現(xiàn)漏桶算法的示例:
import redis
import time
# 連接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 設(shè)置漏桶容量和漏出速率(每秒排出的請(qǐng)求數(shù))
bucket_capacity = 10
leak_rate = 2
# 初始化漏桶
r.set('bucket_capacity', bucket_capacity)
r.set('last_leak_time', int(time.time()))
# 請(qǐng)求處理函數(shù)
def process_request():
current_time = int(time.time())
last_leak_time = int(r.get('last_leak_time'))
time_elapsed = current_time - last_leak_time
# 計(jì)算漏出的請(qǐng)求數(shù)
leaked_requests = min(int(r.get('bucket_capacity')) * (time_elapsed // 1), int(r.get('bucket_capacity')))
# 更新漏桶狀態(tài)
r.incrby('bucket_capacity', leaked_requests)
r.set('last_leak_time', current_time)
# 處理請(qǐng)求
if int(r.get('bucket_capacity')) >= 1:
print('請(qǐng)求通過')
r.decr('bucket_capacity')
else:
print('請(qǐng)求被限制')
# 測(cè)試請(qǐng)求
for _ in range(15):
process_request()
time.sleep(1)
在漏桶算法中,請(qǐng)求會(huì)被排入漏桶中,然后以固定速率漏出。如果漏桶中有請(qǐng)求,則請(qǐng)求會(huì)被處理,否則請(qǐng)求會(huì)被限制。
以上兩個(gè)案例雖然能夠?qū)崿F(xiàn)限流,但是存在一定的問題,無法滿足生產(chǎn)的要求,下面講一下其他思路。
有序集合zset實(shí)現(xiàn)限流
使用Redis的有序集合(ZSET)也可以實(shí)現(xiàn)限流功能。有序集合中的成員可以關(guān)聯(lián)一個(gè)分?jǐn)?shù),我們可以使用分?jǐn)?shù)來表示每個(gè)請(qǐng)求的權(quán)重或時(shí)間戳,并利用有序集合的排序特性來判斷請(qǐng)求是否被允許。
以下是使用有序集合實(shí)現(xiàn)基于時(shí)間窗口的限流示例:
import redis
import time
# 連接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 限流配置
max_requests = 10 # 在時(shí)間窗口內(nèi)允許的最大請(qǐng)求數(shù)
window_duration = 60 # 時(shí)間窗口的持續(xù)時(shí)間(秒)
# 請(qǐng)求處理函數(shù)
def process_request(user_id):
current_time = time.time()
zset_key = "requests:" + user_id
# 刪除時(shí)間窗口之外的請(qǐng)求記錄
r.zremrangebyscore(zset_key, '-inf', current_time - window_duration)
# 獲取當(dāng)前時(shí)間窗口內(nèi)的請(qǐng)求數(shù)
requests_in_window = r.zcard(zset_key)
if requests_in_window < max_requests:
# 如果請(qǐng)求數(shù)在限制范圍內(nèi),允許請(qǐng)求并記錄請(qǐng)求時(shí)間
r.zadd(zset_key, {str(current_time): current_time})
print('請(qǐng)求通過')
else:
print('請(qǐng)求被限制')
# 測(cè)試請(qǐng)求
user_id = "user123"
for _ in range(15):
process_request(user_id)
time.sleep(2)
在這個(gè)示例中,我們?yōu)槊總€(gè)用戶維護(hù)一個(gè)有序集合,其中成員是請(qǐng)求的時(shí)間戳,分?jǐn)?shù)也設(shè)置為時(shí)間戳。在處理請(qǐng)求時(shí),我們首先刪除時(shí)間窗口之外的請(qǐng)求記錄,然后檢查時(shí)間窗口內(nèi)的請(qǐng)求數(shù)是否超過了限制。如果沒有超過限制,允許請(qǐng)求并記錄請(qǐng)求時(shí)間戳。
這種方法可以實(shí)現(xiàn)基于時(shí)間窗口的限流,你可以根據(jù)需要調(diào)整max_requests和window_duration來配置限流策略。
但是這樣又引發(fā)了一個(gè)并發(fā)性問題
在分布式系統(tǒng)中,處理請(qǐng)求的并發(fā)性是一個(gè)重要考慮因素,特別是在多個(gè)客戶端同時(shí)發(fā)送請(qǐng)求的情況下。以下是一些常見的方法來確保process_request操作的并發(fā)安全性:
互斥鎖(Mutex Lock):使用互斥鎖可以確保在同一時(shí)刻只有一個(gè)線程或進(jìn)程可以執(zhí)行process_request操作。這可以通過在關(guān)鍵部分的代碼周圍放置鎖來實(shí)現(xiàn)。在Redis中,你可以使用Redis的SETNX(Set If Not Exists)命令來實(shí)現(xiàn)互斥鎖,確保只有一個(gè)客戶端可以獲取鎖并執(zhí)行請(qǐng)求處理操作。
def process_request(user_id):
lock_key = "lock:" + user_id
acquired_lock = r.setnx(lock_key, "1")
if acquired_lock:
try:
# 在獲取鎖后,執(zhí)行請(qǐng)求處理操作
current_time = time.time()
zset_key = "requests:" + user_id
# 刪除時(shí)間窗口之外的請(qǐng)求記錄
r.zremrangebyscore(zset_key, '-inf', current_time - window_duration)
# 獲取當(dāng)前時(shí)間窗口內(nèi)的請(qǐng)求數(shù)
requests_in_window = r.zcard(zset_key)
if requests_in_window < max_requests:
# 如果請(qǐng)求數(shù)在限制范圍內(nèi),允許請(qǐng)求并記錄請(qǐng)求時(shí)間
r.zadd(zset_key, {str(current_time): current_time})
print('請(qǐng)求通過')
else:
print('請(qǐng)求被限制')
finally:
# 釋放鎖
r.delete(lock_key)
else:
print('無法獲取鎖,請(qǐng)求被限制')
分布式鎖:如果你的系統(tǒng)是分布式的,你可以考慮使用分布式鎖來確保不同節(jié)點(diǎn)上的請(qǐng)求處理代碼不會(huì)同時(shí)執(zhí)行。一些常見的分布式鎖實(shí)現(xiàn)包括基于ZooKeeper或Redis的分布式鎖。這些鎖可以協(xié)調(diào)不同節(jié)點(diǎn)之間的并發(fā)執(zhí)行。
事務(wù):Redis支持事務(wù),你可以使用MULTI和EXEC命令將多個(gè)操作包裝在一個(gè)事務(wù)中。在這種情況下,Redis會(huì)確保整個(gè)事務(wù)要么全部成功執(zhí)行,要么全部失敗,從而保證一致性。
這種雖然能解決問題,但是并不是最優(yōu)解
EXEC + lua 實(shí)現(xiàn)
使用Redis的EXEC命令和Lua腳本可以確保多個(gè)Redis命令在一個(gè)事務(wù)中執(zhí)行,從而保證一致性。下面是一個(gè)使用EXEC和Lua腳本來實(shí)現(xiàn)請(qǐng)求處理的示例:
import redis
# 連接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 限流配置
max_requests = 10 # 在時(shí)間窗口內(nèi)允許的最大請(qǐng)求數(shù)
window_duration = 60 # 時(shí)間窗口的持續(xù)時(shí)間(秒)
# Lua腳本,用于限流處理
lua_script = """
local user_id = KEYS[1]
local max_requests = tonumber(ARGV[1])
local window_duration = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
-- 刪除時(shí)間窗口之外的請(qǐng)求記錄
redis.call('ZREMRANGEBYSCORE', 'requests:'..user_id, '-inf', current_time - window_duration)
-- 獲取當(dāng)前時(shí)間窗口內(nèi)的請(qǐng)求數(shù)
local requests_in_window = redis.call('ZCARD', 'requests:'..user_id)
if requests_in_window < max_requests then
-- 如果請(qǐng)求數(shù)在限制范圍內(nèi),允許請(qǐng)求并記錄請(qǐng)求時(shí)間
redis.call('ZADD', 'requests:'..user_id, current_time, current_time)
return 'ALLOWED'
else
return 'LIMITED'
end
"""
# 請(qǐng)求處理函數(shù)
def process_request(user_id):
current_time = int(time.time())
result = r.eval(lua_script, 1, user_id, max_requests, window_duration, current_time)
if result == b'ALLOWED':
print('請(qǐng)求通過')
else:
print('請(qǐng)求被限制')
# 測(cè)試請(qǐng)求
user_id = "user123"
for _ in range(15):
process_request(user_id)
time.sleep(2)
在上述示例中,我們使用Lua腳本編寫了一個(gè)與之前的請(qǐng)求處理邏輯相同的限流處理邏輯。然后,我們通過eval命令將Lua腳本傳遞給Redis,并在一個(gè)事務(wù)中執(zhí)行它。這樣可以確保在同一事務(wù)內(nèi)執(zhí)行多個(gè)Redis命令,從而保證了一致性。
請(qǐng)注意,在Lua腳本中,我們使用了Redis的命令來執(zhí)行限流邏輯,然后根據(jù)結(jié)果返回相應(yīng)的值,以便在Python中進(jìn)行處理。如果請(qǐng)求被限制,Lua腳本返回'LIMITED',否則返回'ALLOWED'。
通過這種方式,你可以使用Redis實(shí)現(xiàn)基于令牌桶算法的限流功能??梢愿鶕?jù)需要調(diào)整令牌桶容量和生成速率來滿足你的應(yīng)用需求。此外,需要注意在高并發(fā)情況下,需要謹(jǐn)慎處理并發(fā)問題。