一篇帶給你Go并發(fā)編程Singleflight
這一篇文章的內(nèi)容是在 Week05: 評論系統(tǒng)架構(gòu)設(shè)計 當(dāng)中的可用性設(shè)計當(dāng)中提到的,但是這個屬于 Go 官方擴(kuò)展同步包 (golang.org/x/sync/singleflight) 的一個庫,為了讓內(nèi)容統(tǒng)一就放到這里了。
SingleFlight
為什么我們需要 SingleFlight(使用場景)?
一般情況下我們在寫一寫對外的服務(wù)的時候都會有一層 cache 作為緩存,用來減少底層數(shù)據(jù)庫的壓力,但是在遇到例如 redis 抖動或者其他情況可能會導(dǎo)致大量的 cache miss 出現(xiàn)。
如下圖所示,可能存在來自桌面端和移動端的用戶有 1000 的并發(fā)請求,他們都訪問的獲取文章列表的接口,獲取前 20 條信息,如果這個時候我們服務(wù)直接去訪問 redis 出現(xiàn) cache miss 那么我們就會去請求 1000 次數(shù)據(jù)庫,這時可能會給數(shù)據(jù)庫帶來較大的壓力(這里的 1000 只是一個例子,實際上可能遠(yuǎn)大于這個值)導(dǎo)致我們的服務(wù)異?;蛘叱瑫r。
這時候就可以使用 singleflight 庫了,直譯過來就是單飛,這個庫的主要作用就是將一組相同的請求合并成一個請求,實際上只會去請求一次,然后對所有的請求返回相同的結(jié)果。如下圖所示,使用 singleflight 之后,我們在一個請求的時間周期內(nèi)實際上只會向底層的數(shù)據(jù)庫發(fā)起一次請求大大減少對數(shù)據(jù)庫的壓力。
SingleFlight 包怎么用(使用教程)?
函數(shù)簽名
主要是一個 Group 結(jié)構(gòu)體,三個方法,具體信息看下方注釋
- type Group
- // Do 執(zhí)行函數(shù), 對同一個 key 多次調(diào)用的時候,在第一次調(diào)用沒有執(zhí)行完的時候
- // 只會執(zhí)行一次 fn 其他的調(diào)用會阻塞住等待這次調(diào)用返回
- // v, err 是傳入的 fn 的返回值
- // shared 表示是否真正執(zhí)行了 fn 返回的結(jié)果,還是返回的共享的結(jié)果
- func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
- // DoChan 和 Do 類似,只是 DoChan 返回一個 channel,也就是同步與異步的區(qū)別
- func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
- // Forget 用于通知 Group 刪除某個 key 這樣后面繼續(xù)這個 key 的調(diào)用的時候就不會在阻塞等待了
- func (g *Group) Forget(key string)
使用示例
接下來我們看看實際上我們是怎么使用的,先使用一個普通的例子,這時一個獲取文章詳情的函數(shù),我們在函數(shù)里面使用一個 count 模擬不同并發(fā)下的耗時的不同,并發(fā)越多請求耗時越多
- func getArticle(id int) (article string, err error) {
- // 假設(shè)這里會對數(shù)據(jù)庫進(jìn)行調(diào)用, 模擬不同并發(fā)下耗時不同
- atomic.AddInt32(&count, 1)
- time.Sleep(time.Duration(count) * time.Millisecond)
- return fmt.Sprintf("article: %d", id), nil
- }
我們使用 singleflight 的時候就只需要 new(singleflight.Group) 然后調(diào)用一下相對應(yīng)的 Do 方法就可了,是不是很簡單
- func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
- v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
- return getArticle(id)
- })
- return v.(string), err
- }
效果測試
光說不練假把式,寫一個簡單的測試代碼,下面我們啟動 1000 個 Goroutine 去并發(fā)調(diào)用這兩個方法
- var count int32
- func main() {
- time.AfterFunc(1*time.Second, func() {
- atomic.AddInt32(&count, -count)
- })
- var (
- wg sync.WaitGroup
- now = time.Now()
- n = 1000
- sg = &singleflight.Group{}
- )
- for i := 0; i < n; i++ {
- wg.Add(1)
- go func() {
- // res, _ := singleflightGetArticle(sg, 1)
- res, _ := getArticle(1)
- if res != "article: 1" {
- panic("err")
- }
- wg.Done()
- }()
- }
- wg.Wait()
- fmt.Printf("同時發(fā)起 %d 次請求,耗時: %s", n, time.Since(now))
- }
可以看到這個是調(diào)用 getArticle 方法的耗時,花費(fèi)了 1s 多
- # 直接調(diào)用的請求耗時
- ❯ go run ./1.go
- 同時發(fā)起 1000 次請求,耗時: 1.0022831s
而使用 singleflight 的方法,花費(fèi)了不到 3ms
- # 使用 singleflight 的請求耗時
- ❯ go run ./1.go
- 同時發(fā)起 1000 次請求,耗時: 2.5119ms
當(dāng)然每個庫都有自己的使用場景,軟件領(lǐng)域里面沒有銀彈,如果我們用的不太好的話甚至可能會得到適得其反的效果,而多看源碼不僅能夠幫助我們進(jìn)行學(xué)習(xí),也可以盡量少踩坑
它是如何實現(xiàn)的(源碼分析)?
本文基于 [https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/singleflight](https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/singleflight) 進(jìn)行分析,這個庫的實現(xiàn)很簡單,但是功能很強(qiáng)大,還有一些小技巧,非常值得學(xué)習(xí)
Group
- type Group struct {
- mu sync.Mutex // protects m
- m map[string]*call // lazily initialized
- }
Group 結(jié)構(gòu)體由一個互斥鎖和一個 map 組成,可以看到注釋 map 是懶加載的,所以 Group 只要聲明就可以使用,不用進(jìn)行額外的初始化零值就可以直接使用。call 保存了當(dāng)前調(diào)用對應(yīng)的信息,map 的鍵就是我們調(diào)用 Do 方法傳入的 key
- type call struct {
- wg sync.WaitGroup
- // 函數(shù)的返回值,在 wg 返回前只會寫入一次
- val interface{}
- err error
- // 使用調(diào)用了 Forgot 方法
- forgotten bool
- // 統(tǒng)計調(diào)用次數(shù)以及返回的 channel
- dups int
- chans []chan<- Result
- }
Do
- func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
- g.mu.Lock()
- // 前面提到的懶加載
- if g.m == nil {
- g.m = make(map[string]*call)
- }
- // 會先去看 key 是否已經(jīng)存在
- if c, ok := g.m[key]; ok {
- // 如果存在就會解鎖
- c.dups++
- g.mu.Unlock()
- // 然后等待 WaitGroup 執(zhí)行完畢,只要一執(zhí)行完,所有的 wait 都會被喚醒
- c.wg.Wait()
- // 這里區(qū)分 panic 錯誤和 runtime 的錯誤,避免出現(xiàn)死鎖,后面可以看到為什么這么做
- if e, ok := c.err.(*panicError); ok {
- panic(e)
- } else if c.err == errGoexit {
- runtime.Goexit()
- }
- return c.val, c.err, true
- }
- // 如果我們沒有找到這個 key 就 new call
- c := new(call)
- // 然后調(diào)用 waitgroup 這里只有第一次調(diào)用會 add 1,其他的都會調(diào)用 wait 阻塞掉
- // 所以這要這次調(diào)用返回,所有阻塞的調(diào)用都會被喚醒
- c.wg.Add(1)
- g.m[key] = c
- g.mu.Unlock()
- // 然后我們調(diào)用 doCall 去執(zhí)行
- g.doCall(c, key, fn)
- return c.val, c.err, c.dups > 0
- }
doCall
這個方法的實現(xiàn)有點(diǎn)意思,使用了兩個 defer 巧妙的將 runtime 的錯誤和我們傳入 function 的 panic 區(qū)別開來避免了由于傳入的 function panic 導(dǎo)致的死鎖
- func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
- normalReturn := false
- recovered := false
- // 第一個 defer 檢查 runtime 錯誤
- defer func() {
- }()
- // 使用一個匿名函數(shù)來執(zhí)行
- func() {
- defer func() {
- if !normalReturn {
- // 如果 panic 了我們就 recover 掉,然后 new 一個 panic 的錯誤
- // 后面在上層重新 panic
- if r := recover(); r != nil {
- c.err = newPanicError(r)
- }
- }
- }()
- c.val, c.err = fn()
- // 如果 fn 沒有 panic 就會執(zhí)行到這一步,如果 panic 了就不會執(zhí)行到這一步
- // 所以可以通過這個變量來判斷是否 panic 了
- normalReturn = true
- }()
- // 如果 normalReturn 為 false 就表示,我們的 fn panic 了
- // 如果執(zhí)行到了這一步,也說明我們的 fn recover 住了,不是直接 runtime exit
- if !normalReturn {
- recovered = true
- }
- }
再來看看第一個 defer 中的代碼
- defer func() {
- // 如果既沒有正常執(zhí)行完畢,又沒有 recover 那就說明需要直接退出了
- if !normalReturn && !recovered {
- c.err = errGoexit
- }
- c.wg.Done()
- g.mu.Lock()
- defer g.mu.Unlock()
- // 如果已經(jīng) forgot 過了,就不要重復(fù)刪除這個 key 了
- if !c.forgotten {
- delete(g.m, key)
- }
- if e, ok := c.err.(*panicError); ok {
- // 如果返回的是 panic 錯誤,為了避免 channel 死鎖,我們需要確保這個 panic 無法被恢復(fù)
- if len(c.chans) > 0 {
- go panic(e)
- select {} // Keep this goroutine around so that it will appear in the crash dump.
- } else {
- panic(e)
- }
- } else if c.err == errGoexit {
- // 已經(jīng)準(zhǔn)備退出了,也就不用做其他操作了
- } else {
- // 正常情況下向 channel 寫入數(shù)據(jù)
- for _, ch := range c.chans {
- ch <- Result{c.val, c.err, c.dups > 0}
- }
- }
- }()
DoChan
Do chan 和 Do 類似,其實就是一個是同步等待,一個是異步返回,主要實現(xiàn)上就是,如果調(diào)用 DoChan 會給 call.chans 添加一個 channel 這樣等第一次調(diào)用執(zhí)行完畢之后就會循環(huán)向這些 channel 寫入數(shù)據(jù)
- func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
- ch := make(chan Result, 1)
- g.mu.Lock()
- if g.m == nil {
- g.m = make(map[string]*call)
- }
- if c, ok := g.m[key]; ok {
- c.dups++
- c.chans = append(c.chans, ch)
- g.mu.Unlock()
- return ch
- }
- c := &call{chans: []chan<- Result{ch}}
- c.wg.Add(1)
- g.m[key] = c
- g.mu.Unlock()
- go g.doCall(c, key, fn)
- return ch
- }
Forget
forget 用于手動釋放某個 key 下次調(diào)用就不會阻塞等待了
- func (g *Group) Forget(key string) {
- g.mu.Lock()
- if c, ok := g.m[key]; ok {
- c.forgotten = true
- }
- delete(g.m, key)
- g.mu.Unlock()
- }
有哪些注意事項(避坑指南)?
單飛雖好但也不要濫用哦,還是存在一些坑的
1. 一個阻塞,全員等待
使用 singleflight 我們比較常見的是直接使用 Do 方法,但是這個極端情況下會導(dǎo)致整個程序 hang 住,如果我們的代碼出點(diǎn)問題,有一個調(diào)用 hang 住了,那么會導(dǎo)致所有的請求都 hang 住
還是之前的例子,我們加一個 select 模擬阻塞
- func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
- v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
- // 模擬出現(xiàn)問題,hang 住
- select {}
- return getArticle(id)
- })
- return v.(string), err
- }
執(zhí)行就會發(fā)現(xiàn)死鎖了
- fatal error: all goroutines are asleep - deadlock!
- goroutine 1 [select (no cases)]:
這時候我們可以使用 DoChan 結(jié)合 select 做超時控制
- func singleflightGetArticle(ctx context.Context, sg *singleflight.Group, id int) (string, error) {
- result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) {
- // 模擬出現(xiàn)問題,hang 住
- select {}
- return getArticle(id)
- })
- select {
- case r := <-result:
- return r.Val.(string), r.Err
- case <-ctx.Done():
- return "", ctx.Err()
- }
- }
調(diào)用的時候傳入一個含 超時的 context 即可,執(zhí)行時就會返回超時錯誤
- ❯ go run ./1.go
- panic: context deadline exceeded
2. 一個出錯,全部出錯
這個本身不是什么問題,因為 singleflight 就是這么設(shè)計的,但是實際使用的時候 如果我們一次調(diào)用要 1s,我們的數(shù)據(jù)庫請求或者是 下游服務(wù)可以支撐 10rps 的請求的時候這會導(dǎo)致我們的錯誤閾提高,因為實際上我們可以一秒內(nèi)嘗試 10 次,但是用了 singleflight 之后只能嘗試一次,只要出錯這段時間內(nèi)的所有請求都會受影響
這種情況我們可以啟動一個 Goroutine 定時 forget 一下,相當(dāng)于將 rps 從 1rps 提高到了 10rps
- go func() {
- time.Sleep(100 * time.Millisecond)
- // logging
- g.Forget(key)
- }()
總結(jié)
這篇文章從使用場景,到使用方法,再到源碼分析和可能存在的坑給大家介紹了 singleflight,希望你能有所收獲,沒事看看官方的代碼還是很有收獲的,這次又學(xué)到了一個騷操作,用雙重 defer 來避免死鎖,你學(xué)廢了么?
我們下一篇會開啟一個新的系列,Go 可用性,敬請期待!
文章博客地址:https://lailin.xyz