軟件系統(tǒng)限流的底層原理解析
作者 | 騰訊云天御業(yè)務(wù)安全工程師 knightwwang
在軟件架構(gòu)中,限流是一種控制資源使用和保護(hù)系統(tǒng)安全的重要機(jī)制。它通過限制在一定時間內(nèi)可以處理的請求數(shù)量,來防止系統(tǒng)過載。
一、限流的目的
限流主要有兩個目的:
- 防止系統(tǒng)過載:確保系統(tǒng)在高負(fù)載情況下仍能保持穩(wěn)定運行。
- 保證服務(wù)質(zhì)量:為所有用戶提供公平的服務(wù),避免某些用戶占用過多資源。
二、限流算法的實現(xiàn)
1. 固定窗口計數(shù)器算法
固定窗口計數(shù)器算法是一種基本的限流方法,它通過在固定時間窗口內(nèi)跟蹤請求的數(shù)量來實現(xiàn)限流。
// 這是一個簡單的實現(xiàn)案例
package main
import (
"fmt"
"sync"
"time"
)
// FixedWindowCounter 結(jié)構(gòu)體實現(xiàn)固定窗口計數(shù)器限流算法。
// mu 用于同步訪問,保證并發(fā)安全。
// count 記錄當(dāng)前時間窗口內(nèi)的請求數(shù)量。
// limit 是時間窗口內(nèi)允許的最大請求數(shù)量。
// window 記錄當(dāng)前時間窗口的開始時間。
// duration 是時間窗口的持續(xù)時間。
type FixedWindowCounter struct {
mu sync.Mutex
count int
limit int
window time.Time
duration time.Duration
}
// NewFixedWindowCounter 構(gòu)造函數(shù)初始化 FixedWindowCounter 實例。
// limit 參數(shù)定義了每個時間窗口內(nèi)允許的請求數(shù)量。
// duration 參數(shù)定義了時間窗口的大小。
func NewFixedWindowCounter(limit int, duration time.Duration) *FixedWindowCounter {
return &FixedWindowCounter{
limit: limit,
window: time.Now(), // 設(shè)置當(dāng)前時間作為窗口的開始時間。
duration: duration, // 設(shè)置時間窗口的持續(xù)時間。
}
}
// Allow 方法用于判斷當(dāng)前請求是否被允許。
// 首先通過互斥鎖保證方法的原子性。
func (f *FixedWindowCounter) Allow() bool {
f.mu.Lock()
defer f.mu.Unlock()
now := time.Now() // 獲取當(dāng)前時間。
// 如果當(dāng)前時間超過了窗口的結(jié)束時間,重置計數(shù)器和窗口開始時間。
if now.After(f.window.Add(f.duration)) {
f.count = 0
f.window = now
}
// 如果當(dāng)前計數(shù)小于限制,則增加計數(shù)并允許請求。
if f.count < f.limit {
f.count++
return true
}
// 如果計數(shù)達(dá)到限制,則拒絕請求。
return false
}
// main 函數(shù)是程序的入口點。
func main() {
// 創(chuàng)建一個新的限流器,設(shè)置每分鐘(time.Minute)只允許10個請求。
limiter := NewFixedWindowCounter(10, time.Minute)
// 模擬15個請求,觀察限流效果。
for i := 0; i < 15; i++ {
if limiter.Allow() {
fmt.Println("Request", i+1, "allowed")
} else {
fmt.Println("Request", i+1, "rejected")
}
}
}
實現(xiàn)原理:固定窗口計數(shù)器算法通過設(shè)置一個固定的時間窗口(例如每分鐘)和一個在這個窗口內(nèi)允許的請求數(shù)量限制(例如10個請求)。在每個時間窗口開始時,計數(shù)器重置為零,隨著請求的到來,計數(shù)器遞增。當(dāng)計數(shù)器達(dá)到限制時,后續(xù)的請求將被拒絕,直到窗口重置。
優(yōu)點:
- 實現(xiàn)簡單直觀。
- 容易理解和實現(xiàn)。
- 可以保證在任何給定的固定時間窗口內(nèi),請求的數(shù)量不會超過設(shè)定的閾值。
缺點:
- 在窗口切換的瞬間可能會有請求高峰,因為計數(shù)器重置可能導(dǎo)致大量請求幾乎同時被處理。
- 無法平滑地處理突發(fā)流量,可能導(dǎo)致服務(wù)體驗不佳。
- 固定窗口計數(shù)器算法適用于請求分布相對均勻的場景,但在請求可能在短時間內(nèi)集中到達(dá)的場景下,可能需要考慮更復(fù)雜的限流算法,如滑動窗口或令牌桶算法。
2. 滑動窗口算法
滑動窗口算法是固定窗口計數(shù)器算法的一個改進(jìn),它通過覆蓋多個時間段來平滑請求流量,避免瞬時高峰。這種算法通常需要使用更高級的數(shù)據(jù)結(jié)構(gòu),如時間輪(Timing Wheel),來實現(xiàn)。
// 這是一個簡單的實現(xiàn)案例,這個代碼示例僅用于說明滑動窗口限流算法的邏輯,并非完整的工作代碼。
package main
import (
"fmt"
"sync"
"time"
)
// SlidingWindowLimiter 結(jié)構(gòu)體實現(xiàn)滑動窗口限流算法。
type SlidingWindowLimiter struct {
mutex sync.Mutex
counters []int
limit int
windowStart time.Time
windowDuration time.Duration
interval time.Duration
}
// NewSlidingWindowLimiter 構(gòu)造函數(shù)初始化 SlidingWindowLimiter 實例。
func NewSlidingWindowLimiter(limit int, windowDuration time.Duration, interval time.Duration) *SlidingWindowLimiter {
buckets := int(windowDuration / interval)
return &SlidingWindowLimiter{
counters: make([]int, buckets),
limit: limit,
windowStart: time.Now(),
windowDuration: windowDuration,
interval: interval,
}
}
// Allow 方法用于判斷當(dāng)前請求是否被允許,并實現(xiàn)滑動窗口的邏輯。
func (s *SlidingWindowLimiter) Allow() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
// 檢查是否需要滑動窗口
if time.Since(s.windowStart) > s.windowDuration {
s.slideWindow()
}
now := time.Now()
index := int((now.UnixNano() - s.windowStart.UnixNano()) / s.interval.Nanoseconds()) % len(s.counters)
if s.counters[index] < s.limit {
s.counters[index]++
return true
}
return false
}
// slideWindow 方法實現(xiàn)滑動窗口邏輯,移除最舊的時間段并重置計數(shù)器。
func (s *SlidingWindowLimiter) slideWindow() {
// 滑動窗口,忽略最舊的時間段
copy(s.counters, s.counters[1:])
// 重置最后一個時間段的計數(shù)器
s.counters[len(s.counters)-1] = 0
// 更新窗口開始時間
s.windowStart = time.Now()
}
// main 函數(shù)是程序的入口點。
func main() {
limiter := NewSlidingWindowLimiter(1, time.Second, 10*time.Millisecond)
for i := 0; i < 100; i++ {
if limiter.Allow() {
fmt.Println("Request", i+1, "allowed")
} else {
fmt.Println("Request", i+1, "rejected")
}
}
}
實現(xiàn)原理:滑動窗口算法通過將時間分為多個小的時間段,每個時間段內(nèi)維護(hù)一個獨立的計數(shù)器。當(dāng)一個請求到達(dá)時,它會被分配到當(dāng)前時間所在的小時間段,并檢查該時間段的計數(shù)器是否已達(dá)到限制。如果未達(dá)到,則允許請求并增加計數(shù);如果已達(dá)到,則拒絕請求。隨著時間的推移,舊的時間段會淡出窗口,新的時間段會加入。
優(yōu)點:
- 相比固定窗口算法,滑動窗口算法能夠更平滑地處理請求,避免瞬時高峰。
- 可以提供更細(xì)致的流量控制。
缺點:
- 實現(xiàn)相對復(fù)雜,需要維護(hù)多個計數(shù)器和時間索引。
- 對內(nèi)存和計算的要求更高。
滑動窗口算法適用于需要平滑流量控制的場景,尤其是在面對突發(fā)流量時,能夠提供比固定窗口計數(shù)器更優(yōu)的流量控制效果。
3. 漏桶算法
漏桶算法是一種經(jīng)典的流量控制方法,特別適合于平滑突發(fā)流量,確保數(shù)據(jù)以均勻的速率被處理。
// 這是一個簡單的實現(xiàn)案例,這個代碼示例僅用于說明漏桶算法的基本邏輯,并非完整的工作代碼。
package main
import (
"fmt"
"time"
)
// LeakyBucket 結(jié)構(gòu)體,包含請求隊列
type LeakyBucket struct {
queue chan struct{} // 請求隊列
}
// NewLeakyBucket 創(chuàng)建一個新的漏桶實例
func NewLeakyBucket(capacity int) *LeakyBucket {
return &LeakyBucket{
queue: make(chan struct{}, capacity),
}
}
// push 將請求放入隊列,如果隊列滿了,返回 false,表示請求被丟棄
func (lb *LeakyBucket) push() bool {
// 如果通道可以發(fā)送,請求被接受
select {
case lb.queue <- struct{}{}:
return true
default:
return false
}
}
// process 從隊列中取出請求并模擬處理過程
func (lb *LeakyBucket) process() {
for range lb.queue { // 使用 range 來持續(xù)接收隊列中的請求
fmt.Println("Request processed at", time.Now().Format("2006-01-02 15:04:05"))
time.Sleep(100 * time.Millisecond) // 模擬請求處理時間
}
}
func main() {
lb := NewLeakyBucket(5) // 創(chuàng)建一個容量為5的漏桶
// 啟動請求處理循環(huán)
go lb.process()
// 模擬請求
for i := 0; i < 10; i++ {
accepted := lb.push()
if accepted {
fmt.Printf("Request %d accepted at %v\n", i+1, time.Now().Format("2006-01-02 15:04:05"))
} else {
fmt.Printf("Request %d rejected at %v\n", i+1, time.Now().Format("2006-01-02 15:04:05"))
}
}
time.Sleep(2 * time.Second)
}
實現(xiàn)原理:通過一個固定容量的隊列來模擬桶,以恒定速率從桶中取出請求進(jìn)行處理,無論請求到達(dá)的頻率如何,都保證請求以均勻的速度被處理,從而平滑流量并防止流量突增。
優(yōu)點:
- 能夠強(qiáng)制實現(xiàn)固定的數(shù)據(jù)處理速率,平滑流量。
- 即使面對突發(fā)流量,也能保持穩(wěn)定的處理速率。
缺點:
- 對于突發(fā)流量的處理不夠靈活,可能會延遲處理。
- 實現(xiàn)相對簡單,但需要維護(hù)桶的狀態(tài)。
漏桶算法適用于需要強(qiáng)制執(zhí)行固定速率處理的場景,如網(wǎng)絡(luò)流量控制、API請求限制等。通過控制令牌的添加速率,漏桶算法能夠有效地避免系統(tǒng)因瞬時流量高峰而過載。
4. 令牌桶算法
令牌桶算法是一種流行的限流算法,它允許一定程度的突發(fā)流量,同時保持長期的平均速率。
// 這是一個簡單的實現(xiàn)案例,這個代碼示例僅用于說明令牌桶算法的基本邏輯,并非完整的工作代碼。
package main
import (
"fmt"
"sync"
"time"
)
// TokenBucket 結(jié)構(gòu)體實現(xiàn)令牌桶限流算法。
// - mu 用于同步訪問,保證并發(fā)安全。
// - capacity 定義桶的容量,即桶中最多可以存放的令牌數(shù)。
// - tokens 表示桶中當(dāng)前的令牌數(shù)。
// - refillRate 是令牌的填充速率,表示每秒向桶中添加的令牌數(shù)。
// - lastRefill 記錄上次填充令牌的時間。
type TokenBucket struct {
mu sync.Mutex
capacity int
tokens int
refillRate float64
lastRefill time.Time
}
// NewTokenBucket 構(gòu)造函數(shù)初始化 TokenBucket 實例。
// - capacity 參數(shù)定義了桶的容量。
// - refillRate 參數(shù)定義了每秒向桶中添加的令牌數(shù)。
func NewTokenBucket(capacity int, refillRate float64) *TokenBucket {
// 初始化時桶被填滿,tokens 和 capacity 相等。
// lastRefill 設(shè)置為當(dāng)前時間。
return &TokenBucket{
capacity: capacity,
tokens: capacity,
refillRate: refillRate,
lastRefill: time.Now(),
}
}
// Allow 方法用于判斷當(dāng)前請求是否被允許。
func (t *TokenBucket) Allow() bool {
t.mu.Lock() // 進(jìn)入臨界區(qū),確保操作的原子性。
defer t.mu.Unlock()
now := time.Now() // 獲取當(dāng)前時間。
// 計算自上次填充以來經(jīng)過的秒數(shù),并轉(zhuǎn)換為float64類型。
timeElapsed := float64(now.Unix() - t.lastRefill.Unix())
// 根據(jù) refillRate 計算應(yīng)該添加的令牌數(shù)。
tokensToAdd := t.refillRate * timeElapsed
// 更新令牌數(shù),但不超過桶的容量。
t.tokens += int(tokensToAdd)
if t.tokens > t.capacity {
t.tokens = t.capacity // 確保令牌數(shù)不超過桶的容量。
}
// 如果桶中有令牌,則移除一個令牌并允許請求通過。
if t.tokens > 0 {
t.tokens-- // 移除一個令牌。
t.lastRefill = now // 更新上次填充時間到當(dāng)前時間。
return true
}
// 如果桶中無令牌,則請求被拒絕。
return false
}
// main 函數(shù)是程序的入口點。
func main() {
// 創(chuàng)建一個新的令牌桶實例,桶的容量為10,每秒填充2個令牌。
limiter := NewTokenBucket(10, 2)
// 模擬請求,觀察限流效果。
// 循環(huán)15次,每次請求判斷是否被允許。
for i := 0; i < 15; i++ {
if limiter.Allow() {
fmt.Println("Request", i+1, "allowed")
} else {
fmt.Println("Request", i+1, "rejected")
}
}
}
實現(xiàn)原理:令牌桶算法使用一個令牌桶來調(diào)節(jié)數(shù)據(jù)流的速率,允許一定程度的流量突發(fā)。桶初始時為空,并以固定的速率填充令牌,直至達(dá)到預(yù)設(shè)的容量上限。與漏桶算法不同,令牌桶算法在桶未滿時,可以在每個時間間隔內(nèi)向桶中添加多個令牌,從而積累處理突發(fā)請求的能力。當(dāng)請求到達(dá)時,如果桶中存在令牌,算法會從桶中移除相應(yīng)數(shù)量的令牌來處理請求。如果桶中的令牌不足,請求將被延遲處理或根據(jù)策略拒絕服務(wù)。如果桶已滿,額外的令牌將不會被添加,確保了令牌數(shù)量不會超過桶的容量限制。
優(yōu)點:
- 允許一定程度的突發(fā)流量,更加靈活。
- 可以平滑流量,同時在桶未滿時快速處理請求。
缺點:
- 實現(xiàn)相對復(fù)雜,需要維護(hù)桶的狀態(tài)和時間。
- 對于計算和同步的要求更高。
令牌桶算法適用于需要處理突發(fā)流量的場景,如網(wǎng)絡(luò)通信、API調(diào)用等。通過控制令牌的填充速率和桶的容量,令牌桶算法能夠有效地平衡流量,防止系統(tǒng)過載,同時允許在短期內(nèi)處理更多的請求。
三、限流的實現(xiàn)方式
限流可以通過不同的組件和層次實現(xiàn)
1. 應(yīng)用層限流
應(yīng)用層限流是在應(yīng)用程序的代碼中直接實現(xiàn)限流邏輯,這通常是通過使用中間件來完成的。中間件可以在處理請求之前先進(jìn)行限流檢查,以決定是否繼續(xù)處理請求或者返回錯誤信息。
// 這是一個偽代碼案例,演示實現(xiàn)邏輯
package main
import (
"fmt"
"github.com/gin-gonic/gin" // 引入Gin框架,用于構(gòu)建Web服務(wù)器和處理HTTP請求
"net/http"
"sync" // 引入sync包,用于同步原語,如互斥鎖
"time" // 引入time包,用于時間相關(guān)操作
)
// TokenBucket 結(jié)構(gòu)體實現(xiàn)令牌桶限流算法。
// 它包含互斥鎖mu用于同步訪問,capacity代表桶的容量,
// tokens表示當(dāng)前桶中的令牌數(shù),refillRate是令牌的填充速率(每秒),
// lastRefill記錄上次填充的時間。
type TokenBucket struct {
mu sync.Mutex
capacity int
tokens int
refillRate float64
lastRefill time.Time
}
// NewTokenBucket 函數(shù)創(chuàng)建并初始化一個新的TokenBucket實例。
// 它設(shè)置桶的容量和填充速率,并將初始令牌數(shù)設(shè)為容量的值。
func NewTokenBucket(capacity int, refillRate float64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity, // 初始化時桶被填滿
refillRate: refillRate,
lastRefill: time.Now(), // 記錄創(chuàng)建時的時間作為上次填充時間
}
}
// Allow 方法用于檢查是否允許通過當(dāng)前請求。
// 它首先獲取鎖,然后計算自上次填充以來應(yīng)該添加的令牌數(shù),
// 更新桶中的令牌數(shù),但不超過桶的容量。
// 如果桶中至少有一個令牌,它將減少一個令牌并返回true,表示請求被允許。
// 如果桶為空,則返回false,表示請求被拒絕。
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock() // 獲取鎖,保證操作的原子性
defer tb.mu.Unlock()
now := time.Now() // 獲取當(dāng)前時間
// 計算自上次填充以來經(jīng)過的秒數(shù),然后乘以填充速率,得到應(yīng)添加的令牌數(shù)
tokensToAdd := int(tb.refillRate * (now.Sub(tb.lastRefill).Seconds()))
tb.tokens += tokensToAdd // 更新桶中的令牌數(shù)
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity // 確保不超過桶的容量
}
if tb.tokens > 0 {
tb.tokens-- // 處理請求,減少一個令牌
tb.lastRefill = now // 更新上次填充時間為當(dāng)前時間
return true
}
return false // 如果桶為空,返回false
}
// Middleware 函數(shù)返回一個Gin中間件,該中間件使用TokenBucket來限流。
// 如果TokenBucket的Allow方法返回false,中間件將中斷請求處理,
// 并返回HTTP狀態(tài)碼429(Too Many Requests)和錯誤信息。
// 如果請求被允許,中間件將調(diào)用c.Next()繼續(xù)執(zhí)行后續(xù)的處理鏈。
func Middleware(tb *TokenBucket) gin.HandlerFunc {
return func(c *gin.Context) {
// 在處理請求之前,調(diào)用TokenBucket的Allow方法檢查是否允許請求
if !tb.Allow() {
// 如果請求被限流,返回錯誤信息和狀態(tài)碼
c.JSON(http.StatusTooManyRequests, gin.H{"error": "too many requests"})
c.Abort() // 中斷請求處理
return
}
// 如果請求未被限流,繼續(xù)執(zhí)行后續(xù)的處理鏈
c.Next()
}
}
func main() {
// 創(chuàng)建一個Gin的默認(rèn)實例,用于Web服務(wù)
r := gin.Default()
// 創(chuàng)建TokenBucket實例,用于限流控制
tb := NewTokenBucket(10, 1.0) // 桶的容量為10,每秒填充1個令牌
// 使用上面定義的限流中間件
r.Use(Middleware(tb))
// 定義一個簡單的路由,當(dāng)訪問/hello路徑時,返回JSON格式的消息
r.GET("/hello", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "hello world"})
})
// 啟動Gin服務(wù)器,默認(rèn)監(jiān)聽在0.0.0.0:8080
r.Run()
}
實現(xiàn)原理:在Web應(yīng)用程序中,限流可以通過中間件實現(xiàn)。中間件在處理HTTP請求之前先執(zhí)行,可以用來進(jìn)行身份驗證、日志記錄、限流等操作。在上述代碼中,創(chuàng)建了一個TokenBucket類型的限流器,并實現(xiàn)了一個Middleware函數(shù),該函數(shù)接收一個TokenBucket實例作為參數(shù),并返回一個Gin中間件處理器。中間件在處理請求時首先調(diào)用Allow方法檢查是否允許請求通過。
優(yōu)點:
- 易于實現(xiàn)和集成,可以輕松地添加到現(xiàn)有的Web應(yīng)用程序中。
- 細(xì)粒度控制,可以針對不同的路由或用戶應(yīng)用不同的限流策略。
缺點:
- 可能會增加請求處理的延遲,因為中間件需要在每次請求時進(jìn)行同步操作。
- 如果不恰當(dāng)?shù)厥褂?,可能會降低?yīng)用程序的并發(fā)處理能力。
應(yīng)用層限流適用于需要細(xì)粒度控制的場景,允許開發(fā)者根據(jù)具體的業(yè)務(wù)需求定制限流策略。通過合理配置限流器的參數(shù),可以在保證服務(wù)質(zhì)量的同時,提高應(yīng)用程序的吞吐量和穩(wěn)定性。
2. 代理層限流
代理層限流是在網(wǎng)絡(luò)通信的代理服務(wù)器層面實現(xiàn)限流,例如使用Nginx或HAProxy等代理服務(wù)器。這種方法可以在請求到達(dá)后端服務(wù)之前對它們進(jìn)行限制,從而保護(hù)后端服務(wù)不受過多請求的沖擊。
Nginx配置示例:
http {
# 定義一個限流區(qū)域,使用共享內(nèi)存存儲狀態(tài)
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/s;
server {
# 監(jiān)聽80端口
listen 80;
# 定義一個location塊,用于匹配特定的請求路徑
location /api/ {
# 應(yīng)用限流規(guī)則
limit_req zone=mylimit burst=5 nodelay;
# 代理請求到后端服務(wù)
proxy_pass http://backend/;
}
}
}
實現(xiàn)原理:在Nginx中,通過定義limit_req_zone指令創(chuàng)建一個限流區(qū)域,并指定使用共享內(nèi)存來存儲客戶端IP地址和對應(yīng)的請求計數(shù)。rate參數(shù)定義了每個客戶端每秒鐘允許的請求數(shù)量。在server塊中,使用limit_req指令引用之前定義的限流區(qū)域,并設(shè)置burst參數(shù)允許一定數(shù)量的突發(fā)請求。
優(yōu)點:
- 在網(wǎng)絡(luò)層面進(jìn)行限流,可以保護(hù)所有后端服務(wù),而不需要在每個應(yīng)用程序中單獨實現(xiàn)限流邏輯。
- 減輕了后端服務(wù)的負(fù)擔(dān),因為多余的請求在到達(dá)后端之前就被拒絕了。
- 配置靈活,可以針對不同的請求路徑和客戶端設(shè)置不同的限流規(guī)則。
缺點:
- 需要代理服務(wù)器支持限流功能,可能需要額外的配置和調(diào)優(yōu)。
- 對于分布式系統(tǒng),可能需要額外的機(jī)制來同步狀態(tài),確保全局的限流效果。
代理層限流適用于需要在多個服務(wù)或整個應(yīng)用層面控制請求的場景。通過合理配置代理服務(wù)器的限流規(guī)則,可以在不同的層面上保護(hù)系統(tǒng),提高整體的穩(wěn)定性和可用性。
3. 硬件層限流
在硬件層(如負(fù)載均衡器)實現(xiàn)限流,可以在請求到達(dá)應(yīng)用服務(wù)器之前進(jìn)行控制。
四、限流策略
限流策略是確保應(yīng)用程序能夠處理預(yù)期負(fù)載并防止過載的一系列規(guī)則和措施。
1.閾值設(shè)置
閾值設(shè)置是限流策略的基礎(chǔ),它決定了系統(tǒng)在單位時間內(nèi)能夠處理的最大請求數(shù)量。
偽代碼示例:
http {
# 定義一個限流區(qū)域,使用共享內(nèi)存存儲狀態(tài)
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/s;
server {
# 監(jiān)聽80端口
listen 80;
# 定義一個location塊,用于匹配特定的請求路徑
location /api/ {
# 應(yīng)用限流規(guī)則
limit_req zone=mylimit burst=5 nodelay;
# 代理請求到后端服務(wù)
proxy_pass http://backend/;
}
}
}
2.請求分類
請求分類允許對不同類型的請求應(yīng)用不同的限流規(guī)則,例如,對API的不同端點設(shè)置不同的閾值。
偽代碼示例:
// RouteLimiterMap 是一個映射,存儲每個路由路徑對應(yīng)的限流器實例。
// 鍵是路由的字符串表示,值是指向RateLimiterV2類型實例的指針。
var RouteLimiterMap = map[string]*RateLimiterV2{}
// SetRateLimiterForRoute 函數(shù)為指定的路由設(shè)置一個新的限流器。
// 它接受路由的路徑、桶的容量、每秒填充的令牌數(shù)和請求處理的閾值作為參數(shù),
// 并創(chuàng)建一個新的RateLimiterV2實例,將其存儲在RouteLimiterMap中。
func SetRateLimiterForRoute(route string, capacity int, refillRate float64, limit int) {
// 在RouteLimiterMap中為給定的路由創(chuàng)建或更新限流器實例。
RouteLimiterMap[route] = NewRateLimiterV2(capacity, refillRate, limit)
}
// MiddlewareWithRoute 函數(shù)返回一個Gin中間件處理函數(shù)。
// 該中間件基于路由名稱來應(yīng)用限流邏輯。
func MiddlewareWithRoute(route string) gin.HandlerFunc {
// 返回一個Gin的處理函數(shù),該函數(shù)內(nèi)部封裝了限流邏輯。
return func(c *gin.Context) {
// 檢查RouteLimiterMap中是否存在對應(yīng)路由的限流器。
// 如果存在,調(diào)用其Allow方法來決定當(dāng)前請求是否應(yīng)該被允許。
if !RouteLimiterMap[route].Allow() {
// 如果請求被限流(不允許),返回HTTP 429狀態(tài)碼和錯誤信息。
c.JSON(http.StatusTooManyRequests, gin.H{"error": "too many requests"})
c.Abort() // 中斷請求的進(jìn)一步處理。
return // 退出中間件函數(shù)。
}
// 如果請求未被限流,調(diào)用c.Next繼續(xù)執(zhí)行Gin的處理鏈。
c.Next()
}
}
3.反饋機(jī)制
反饋機(jī)制在請求被限流時向用戶提供適當(dāng)?shù)姆答?,如錯誤消息或重試后的時間。
偽代碼示例:
// AllowWithFeedback 提供反饋的請求允許邏輯。
func (r *RateLimiterV2) AllowWithFeedback() (bool, string) {
r.mu.Lock()
defer r.mu.Unlock()
// 令牌桶邏輯...
if r.tokens >= r.limit {
return false, "Too many requests. Please try again later."
}
// 允許請求邏輯...
r.tokens-- // 移除令牌
return true, ""
}
// 使用反饋機(jī)制的中間件。
func MiddlewareWithFeedback() gin.HandlerFunc {
return func(c *gin.Context) {
allowed, message := RouteLimiterMap["/api/"].AllowWithFeedback()
if !allowed {
c.JSON(http.StatusTooManyRequests, gin.H{"error": message})
c.Abort()
return
}
c.Next()
}
}
五、限流的考慮因素
在設(shè)計和實施限流機(jī)制時,需要綜合考慮多個關(guān)鍵因素以確保限流系統(tǒng)的有效性和公平性。
1.公平性
公平性是限流設(shè)計中的首要原則,確保所有用戶和客戶端能夠平等地訪問服務(wù)。
偽代碼示例:
// FairLimiter 結(jié)構(gòu)體實現(xiàn)基于用戶ID或IP的公平限流。
type FairLimiter struct {
sync.Mutex
limits map[string]*RateLimiterV2 // 為每個用戶或IP維護(hù)一個獨立的限流器
}
// NewFairLimiter 創(chuàng)建一個新的FairLimiter實例。
func NewFairLimiter(capacity int, refillRate float64) *FairLimiter {
return &FairLimiter{
limits: make(map[string]*RateLimiterV2),
}
}
// Allow 根據(jù)用戶ID或IP決定是否允許請求。
func (f *FairLimiter) Allow(userID string) (bool, string) {
f.Lock()
defer f.Unlock()
if _, exists := f.limits[userID]; !exists {
// 如果用戶沒有限流器,則創(chuàng)建一個新的。
f.limits[userID] = NewRateLimiterV2(capacity, refillRate, limit)
}
// 使用用戶的限流器檢查請求。
return f.limits[userID].AllowWithFeedback()
}
2.靈活性
靈活性意味著限流策略能夠適應(yīng)不同的流量模式和業(yè)務(wù)需求,例如在高流量期間放寬限制。
偽代碼示例:
// FlexibleLimiter 結(jié)構(gòu)體是一個靈活的限流器,允許在運行時動態(tài)調(diào)整限流參數(shù)。
type FlexibleLimiter struct {
sync.Mutex // 使用sync.Mutex提供互斥鎖功能,確保線程安全。
capacity int // 桶的容量,表示最多可以存儲的令牌數(shù)。
refillRate float64 // 令牌的填充速率,表示每秒可以新增的令牌數(shù)。
limit int // 請求處理的閾值,用于確定是否限流。
}
// SetParams 方法允許動態(tài)設(shè)置FlexibleLimiter的限流參數(shù)。
// 這些參數(shù)包括桶的容量、填充速率和請求處理的閾值。
func (f *FlexibleLimiter) SetParams(capacity int, refillRate float64, limit int) {
f.Lock() // 使用互斥鎖進(jìn)入臨界區(qū),防止并發(fā)訪問導(dǎo)致的數(shù)據(jù)不一致。
defer f.Unlock() // 離開臨界區(qū)前自動釋放鎖。
// 更新FlexibleLimiter的參數(shù)。
f.capacity, f.refillRate, f.limit = capacity, refillRate, limit
}
// Allow 方法根據(jù)FlexibleLimiter當(dāng)前的參數(shù)決定是否允許新的請求。
// 它首先基于當(dāng)前參數(shù)創(chuàng)建一個新的RateLimiterV2實例,然后調(diào)用它的AllowWithFeedback方法。
func (f *FlexibleLimiter) Allow() (bool, string) {
// 根據(jù)FlexibleLimiter當(dāng)前的容量、填充速率和閾值創(chuàng)建一個新的RateLimiterV2實例。
rl := NewRateLimiterV2(f.capacity, f.refillRate, f.limit)
// 調(diào)用RateLimiterV2的AllowWithFeedback方法,獲取是否允許請求的反饋。
// 這個方法返回一個布爾值表示是否允許請求,和一個字符串消息提供反饋信息。
return rl.AllowWithFeedback()
}
3.透明性
透明性要求限流規(guī)則和當(dāng)前狀態(tài)對用戶可見,使用戶能夠了解他們被限流的原因和情況。
偽代碼示例:
// TransparentLimiter 結(jié)構(gòu)體嵌入了RateLimiterV2,提供了額外的狀態(tài)信息,
// 包括當(dāng)前剩余的令牌數(shù),以增強(qiáng)限流機(jī)制的透明性。
type TransparentLimiter struct {
*RateLimiterV2 // 嵌入RateLimiterV2,獲得其所有功能。
currentTokens int // 存儲當(dāng)前桶中剩余的令牌數(shù)。
}
// AllowWithStatus 方法允許請求并返回當(dāng)前限流狀態(tài)。
// 它調(diào)用內(nèi)嵌RateLimiterV2的AllowWithFeedback方法來決定是否允許請求,
// 并獲取反饋消息,同時返回當(dāng)前剩余的令牌數(shù)。
func (t *TransparentLimiter) AllowWithStatus() (bool, string, int) {
allowed, message := t.RateLimiterV2.AllowWithFeedback() // 調(diào)用內(nèi)嵌限流器的允許邏輯。
return allowed, message, t.currentTokens // 返回是否允許、消息和當(dāng)前令牌數(shù)。
}
// MiddlewareWithTransparency 函數(shù)創(chuàng)建一個中間件,用于在HTTP響應(yīng)中包含限流狀態(tài)。
// 這個中間件包裝了下一個http.Handler,并在處理請求之前檢查限流狀態(tài)。
func MiddlewareWithTransparency(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 創(chuàng)建或使用全局的transparentLimiter實例來檢查限流狀態(tài)。
allowed, message, tokens := transparentLimiter.AllowWithStatus()
// 如果請求被限流(不允許),則設(shè)置HTTP頭部信息和狀態(tài)碼,并返回錯誤消息。
if !allowed {
w.Header().Set("X-RateLimit-Remaining", fmt.Sprintf("%d", tokens)) // 設(shè)置剩余令牌數(shù)的頭部。
w.WriteHeader(http.StatusTooManyRequests) // 設(shè)置HTTP狀態(tài)碼為429。
fmt.Fprintln(w, message) // 寫入錯誤消息到響應(yīng)體。
return // 中斷請求處理。
}
// 如果請求未被限流,繼續(xù)執(zhí)行后續(xù)的處理鏈。
next.ServeHTTP(w, r)
})
}