Golang 實(shí)現(xiàn)熔斷機(jī)制
一些場(chǎng)景下,為了保障服務(wù)穩(wěn)定性會(huì)引入熔斷機(jī)制。本文介紹了用 Go 語(yǔ)言自己實(shí)現(xiàn)熔斷需要什么操作。
什么是熔斷?
熔斷是指在下游發(fā)生錯(cuò)誤時(shí)上游主動(dòng)關(guān)閉或限制對(duì)下游的請(qǐng)求。
原理
- 通常熔斷器分為三個(gè)時(shí)期:CLOSED,OPEN,HALFOPEN
- RPC 正常時(shí),為 CLOSED;
- 當(dāng) RPC 錯(cuò)誤增多時(shí),熔斷器會(huì)被觸發(fā),進(jìn)入 OPEN;
- OPEN 后經(jīng)過(guò)一定的冷卻時(shí)間,熔斷器變?yōu)?HALFOPEN;
- HALFOPEN 時(shí)會(huì)對(duì)下游進(jìn)行一些有策略的訪問(wèn),然后根據(jù)結(jié)果決定是變?yōu)?CLOSED,還是 OPEN;
總得來(lái)說(shuō)三個(gè)狀態(tài)的轉(zhuǎn)換大致如下圖:
Go 實(shí)現(xiàn)
https://github.com/rubyist/circuitbreaker
IsAllowed 是否允許請(qǐng)求,根據(jù)當(dāng)前狀態(tài)判斷
CLOSE 允許
OPEN
- 在 CoolingTimeout 冷卻時(shí)間內(nèi),不允許
- 過(guò)了冷卻時(shí)間,狀態(tài)變?yōu)?HALFOPEN,允許訪問(wèn)
HALFOPEN
- 在 DetectTimeout 檢測(cè)時(shí)間內(nèi),允許訪問(wèn)
- 否則不允許
atomic.StoreInt32((*int32)(&b.state), int32(HALFOPEN))
trip 判斷是否達(dá)到熔斷限額(可以自定義)
- type TripFunc func(Metricser) bool
- ThresholdTripFunc 錯(cuò)誤閾值
- ConsecutiveTripFunc 連續(xù)錯(cuò)誤超過(guò)閾值
- RateTripFunc 根據(jù)最少訪問(wèn)數(shù)和錯(cuò)誤率判斷
Metricser 訪問(wèn)統(tǒng)計(jì),包括成功數(shù)、失敗數(shù)、超時(shí)數(shù)、錯(cuò)誤率、采樣數(shù)、連續(xù)錯(cuò)誤數(shù)
- type Metricser interface {
- Fail() // records a failure
- Succeed() // records a success
- Timeout() // records a timeout
- Failures() int64 // return the number of failures
- Successes() int64 // return the number of successes
- Timeouts() int64 // return the number of timeouts
- ConseErrors() int64 // return the consecutive errors recently
- ErrorRate() float64 // rate = (timeouts + failures) / (timeouts + failures + successes)
- Samples() int64 // (timeouts + failures + successes)
- Counts() (successes, failures, timeouts int64)
- Reset()
- }
window 實(shí)現(xiàn)類
- type window struct {
- sync.RWMutex
- oldest int32 // oldest bucket index
- latest int32 // latest bucket index
- buckets []bucket // buckets this window holds
- bucketTime time.Duration // time each bucket holds
- bucketNums int32 // the numbe of buckets
- inWindow int32 // the number of buckets in the window
- allSuccess int64
- allFailure int64
- allTimeout int64
- conseErr int64
- }
- type bucket struct {
- failure int64
- success int64
- timeout int64
- }
用環(huán)形隊(duì)列實(shí)現(xiàn)動(dòng)態(tài)統(tǒng)計(jì)。把一個(gè)連續(xù)的時(shí)間切成多個(gè)小份,每一個(gè) bucket 保存 BucketTime 的統(tǒng)計(jì)數(shù)據(jù),BucketTime * BucketNums 是統(tǒng)計(jì)的時(shí)間區(qū)間。
每 BucketTime,會(huì)有一個(gè) bucket 過(guò)期
- if w.inWindow == w.bucketNums {
- // the lastest covered the oldest(latest == oldest)
- oldBucket := &w.buckets[w.oldest]
- atomic.AddInt64(&w.allSuccess, -oldBucket.Successes())
- atomic.AddInt64(&w.allFailure, -oldBucket.Failures())
- atomic.AddInt64(&w.allTimeout, -oldBucket.Timeouts())
- w.oldest++
- if w.oldest >= w.bucketNums {
- w.oldest = 0
- }
- } else {
- w.inWindow++
- }
- w.latest++
- if w.latest >= w.bucketNums {
- w.latest = 0
- }
- (&w.buckets[w.latest]).Reset()
Panel Metricser 的容器
PanelStateChangeHandler 熔斷事件
- type PanelStateChangeHandler func(key string, oldState, newState State, m Metricser)
缺陷
- 所有 breaker 公用同一個(gè) BucketTime,統(tǒng)計(jì)周期不支持更新
- 冷卻時(shí)間不支持動(dòng)態(tài)更新