被遺棄在角落里的 Sync.Cond
本文轉(zhuǎn)載自微信公眾號(hào)「Golang技術(shù)分享」,作者機(jī)器鈴砍菜刀。轉(zhuǎn)載本文請(qǐng)聯(lián)系Golang技術(shù)分享公眾號(hào)。
Go 語(yǔ)言通過(guò) go 關(guān)鍵字開(kāi)啟 goroutine 讓開(kāi)發(fā)者可以輕松地實(shí)現(xiàn)并發(fā)編程,而并發(fā)程序的有效運(yùn)行,往往離不開(kāi) sync 包的保駕護(hù)航。目前,sync 包的賦能列表包括:sync.atomic 下的原子操作、sync.Map 并發(fā)安全 map、sync.Mutex 與 sync.RWMutex 提供的互斥鎖與讀寫(xiě)鎖、sync.Pool 復(fù)用對(duì)象池、sync.Once 單例模式、 sync.Waitgroup 的多任務(wù)協(xié)作模式、sync.Cond 的監(jiān)視器模式。當(dāng)然,除了 sync 包,還有封裝層面更高的 channel 與 context。
要想寫(xiě)出合格的 Go 程序,以上的這些并發(fā)原語(yǔ)是必須要掌握的。對(duì)于大多數(shù) Gopher 而言,sync.Cond 應(yīng)該是最為陌生,本文將一探究竟。
初識(shí) sync.Cond
sync.Cond 字面意義就是同步條件變量,它實(shí)現(xiàn)的是一種監(jiān)視器(Monitor)模式。
In concurrent programming(also known as parallel programming), a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become false.
對(duì)于 Cond 而言,它實(shí)現(xiàn)一個(gè)條件變量,是 goroutine 間等待和通知的點(diǎn)。條件變量與共享的數(shù)據(jù)隔離,它可以同時(shí)阻塞多個(gè) goroutine,直到另外的 goroutine 更改了條件變量,并通知喚醒阻塞著的一個(gè)或多個(gè) goroutine。
初次接觸的讀者,可能會(huì)不太明白,那么下面我們看一下 GopherCon 2018 上《Rethinking Classical Concurrency Patterns》 中的演示代碼例子。
- 1type Item = int
- 2
- 3type Queue struct {
- 4 items []Item
- 5 itemAdded sync.Cond
- 6}
- 7
- 8func NewQueue() *Queue {
- 9 q := new(Queue)
- 10 q.itemAdded.L = &sync.Mutex{} // 為 Cond 綁定鎖
- 11 return q
- 12}
- 13
- 14func (q *Queue) Put(item Item) {
- 15 q.itemAdded.L.Lock()
- 16 defer q.itemAdded.L.Unlock()
- 17 q.items = append(q.items, item)
- 18 q.itemAdded.Signal() // 當(dāng) Queue 中加入數(shù)據(jù)成功,調(diào)用 Singal 發(fā)送通知
- 19}
- 20
- 21func (q *Queue) GetMany(n int) []Item {
- 22 q.itemAdded.L.Lock()
- 23 defer q.itemAdded.L.Unlock()
- 24 for len(q.items) < n { // 等待 Queue 中有 n 個(gè)數(shù)據(jù)
- 25 q.itemAdded.Wait() // 阻塞等待 Singal 發(fā)送通知
- 26 }
- 27 items := q.items[:n:n]
- 28 q.items = q.items[n:]
- 29 return items
- 30}
- 31
- 32func main() {
- 33 q := NewQueue()
- 34
- 35 var wg sync.WaitGroup
- 36 for n := 10; n > 0; n-- {
- 37 wg.Add(1)
- 38 go func(n int) {
- 39 items := q.GetMany(n)
- 40 fmt.Printf("%2d: %2d\n", n, items)
- 41 wg.Done()
- 42 }(n)
- 43 }
- 44
- 45 for i := 0; i < 100; i++ {
- 46 q.Put(i)
- 47 }
- 48
- 49 wg.Wait()
- 50}
在這個(gè)例子中,Queue 是存儲(chǔ)數(shù)據(jù) Item 的結(jié)構(gòu)體,它通過(guò) Cond 類(lèi)型的 itemAdded 來(lái)控制數(shù)據(jù)的輸入與輸出??梢宰⒁獾剑@里通過(guò) 10 個(gè) goroutine 來(lái)消費(fèi)數(shù)據(jù),但它們所需的數(shù)據(jù)量并不相等,我們可以稱(chēng)之為 batch,依次在 1-10 之間。之后,逐步添加 100 個(gè)數(shù)據(jù)至 Queue 中。最后,我們能夠看到 10 個(gè) gotoutine 都能被喚醒,得到它想要的數(shù)據(jù)。
程序運(yùn)行結(jié)果如下
- 1 6: [ 7 8 9 10 11 12]
- 2 5: [50 51 52 53 54]
- 3 9: [14 15 16 17 18 19 20 21 22]
- 4 1: [13]
- 5 2: [33 34]
- 6 4: [35 36 37 38]
- 7 3: [39 40 41]
- 8 7: [ 0 1 2 3 4 5 6]
- 9 8: [42 43 44 45 46 47 48 49]
- 010: [23 24 25 26 27 28 29 30 31 32]
當(dāng)然,程序每次運(yùn)行結(jié)果都不會(huì)相同,以上輸出只是某一種情況。
sync.Cond 實(shí)現(xiàn)
在 $GOPATH/src/sync/cond.go 中,Cond 的結(jié)構(gòu)體定義如下
- 1type Cond struct {
- 2 noCopy noCopy
- 3 L Locker
- 4 notify notifyList
- 5 checker copyChecker
- 6}
其中,noCopy 與 checker 字段均是為了避免 Cond 在使用過(guò)程中被復(fù)制,詳見(jiàn)小菜刀的 《no copy 機(jī)制》 一文。
L 是 Locker 接口,一般該字段的實(shí)際對(duì)象是 *RWmutex 或者 *Mutex。
- 1type Locker interface {
- 2 Lock()
- 3 Unlock()
- 4}
notifyList 記錄的是一個(gè)基于票號(hào)的通知列表,這里初次看注釋看不懂沒(méi)關(guān)系,和下文來(lái)回連貫著看。
- 1type notifyList struct {
- 2 wait uint32 // 用于記錄下一個(gè)等待者 waiter 的票號(hào)
- 3 notify uint32 // 用于記錄下一個(gè)應(yīng)該被通知的 waiter 的票號(hào)
- 4 lock uintptr // 內(nèi)部鎖
- 5 head unsafe.Pointer // 指向等待者 waiter 的隊(duì)列隊(duì)頭
- 6 tail unsafe.Pointer // 指向等待者 waiter 的隊(duì)列隊(duì)尾
- 7}
其中,head 與 tail 是指向 sudog 結(jié)構(gòu)體的指針,sudog 是代表的處于等待列表的 goroutine,它本身就是雙向鏈表。值得一提的是,在 sudog 中有一個(gè)字段 ticket 就是用于給當(dāng)前 goroutine 記錄票號(hào)使用的。
Cond 實(shí)現(xiàn)的核心模式為票務(wù)系統(tǒng)(ticket system),每一個(gè)想要來(lái)買(mǎi)票的 goroutine (調(diào)用Cond.Wait())我們稱(chēng)之為 waiter,票務(wù)系統(tǒng)會(huì)給每個(gè) waiter 分配一個(gè)取票碼,等供票方有該取票碼的號(hào)時(shí),就會(huì)喚醒 waiter。賣(mài)票的 goroutine 有兩種,第一種是調(diào)用 Cond.Signal() 的,它會(huì)按照票號(hào)喚醒一個(gè)買(mǎi)票的 waiter (如果有的話),第二種是調(diào)用 Cond.Broadcast() 的,它會(huì)通知喚醒所有的阻塞 waiter。為了方便讀者能夠比較輕松地理解票務(wù)系統(tǒng),下面我們給出圖解示例。
在 上文中,我們知道 Cond 字段中 notifyList 結(jié)構(gòu)體是一個(gè)記錄票號(hào)的通知列表。這里將 notifyList 比作排隊(duì)取票買(mǎi)電影票,當(dāng) G1 通過(guò) Wait 來(lái)買(mǎi)票時(shí),發(fā)現(xiàn)此時(shí)并沒(méi)有票可買(mǎi),因此他只能阻塞等待有票之后的通知,此時(shí)他手上已經(jīng)取得了專(zhuān)屬取票碼 0。同樣的,G2 和 G3 也同樣無(wú)票可買(mǎi),它們分別取到了自己的取票碼 1和 2。而 G4 是電影票提供商,它是賣(mài)票的,它通過(guò)兩次 Signal 先后帶來(lái)了兩張票,按照票號(hào)順序依次通知了 G1 和 G2 來(lái)取票,并把 notify 更新為了最新的 1。G5 也是買(mǎi)票的,它發(fā)現(xiàn)此時(shí)已經(jīng)無(wú)票可買(mǎi)了,拿了自己的取票碼 3 ,就阻塞等待了。G6 是個(gè)大票商,它通過(guò) Broadcast 可以滿(mǎn)足所有正在等待的買(mǎi)票者都買(mǎi)到票,此時(shí)等待的是 G3 和 G5,因此他直接喚醒了 G3 和 G5,并將 notify 更新到和 wait 值相等。
理解了上述取票系統(tǒng)的運(yùn)作原理后,我們下面來(lái)看 Cond 包下四個(gè)實(shí)際對(duì)外方法函數(shù)的實(shí)現(xiàn)。
NewCond 方法
- 1func NewCond(l Locker) *Cond {
- 2 return &Cond{L: l}
- 3}
用于初始化 Cond 對(duì)象,就是初始化控制鎖。
Cond.Wait 方法
- 1func (c *Cond) Wait() {
- 2 c.checker.check()
- 3 t := runtime_notifyListAdd(&c.notify)
- 4 c.L.Unlock()
- 5 runtime_notifyListWait(&c.notify, t)
- 6 c.L.Lock()
- 7}
runtime_notifyListAdd 的實(shí)現(xiàn)在 runtime/sema.go 的 notifyListAdd ,它用于原子性地增加等待者的 waiter 票號(hào),并返回當(dāng)前 goroutine 應(yīng)該取的票號(hào)值 t 。runtime_notifyListWait 的實(shí)現(xiàn)在runtime/sema.go 的 notifyListWait,它會(huì)嘗試去比較此時(shí) goroutine 的應(yīng)取票號(hào) t 與 notify 中記錄的當(dāng)前應(yīng)該被通知的票號(hào)。如果 t 小于當(dāng)前票號(hào),那么直接能得到返回,否則將會(huì)則塞等待,通知取號(hào)。
同時(shí),這里需要注意的是,由于在進(jìn)入 runtime_notifyListWait 時(shí),當(dāng)前 goroutine 通過(guò) c.L.Unlock() 將鎖解了,這就意味著有可能會(huì)有多個(gè) goroutine 來(lái)讓條件發(fā)生變化。那么,當(dāng)前 goroutine 是不能保證在 runtime_notifyListWait 返回后,條件就一定是真的,因此需要循環(huán)判斷條件。正確的 Wait 使用姿勢(shì)如下:
- 1// c.L.Lock()
- 2// for !condition() {
- 3// c.Wait()
- 4// }
- 5// ... make use of condition ...
- 6// c.L.Unlock()
Cond.Signal 方法
- 1func (c *Cond) Signal() {
- 2 c.checker.check()
- 3 runtime_notifyListNotifyOne(&c.notify)
- 4}
runtime_notifyListNotifyOne 的詳細(xì)實(shí)現(xiàn)在 runtime/sema.go 的 notifyListNotifyOne,它的目的就是通知 waiter 取票。具體操作是:如果在上一次通知取票之后沒(méi)有新的 waiter 取票者,那么該函數(shù)會(huì)直接返回。否則,它會(huì)將取票號(hào) +1,并通知喚醒等待取票的 waiter。
需要注意的是,調(diào)用 Signal 方法時(shí),并不需要持有 c.L 鎖。
Cond.Broadcast 方法
- 1func (c *Cond) Broadcast() {
- 2 c.checker.check()
- 3 runtime_notifyListNotifyAll(&c.notify)
- 4}
runtime_notifyListNotifyAll 的詳細(xì)實(shí)現(xiàn)在 runtime/sema.go 的 notifyListNotifyAll,它會(huì)通知喚醒所有的 waiter,并將 notify 值置為 和 wait 值相等。調(diào)用 Broadcast 方法時(shí),也不需要持有 c.L 鎖。
討論
在 $GOPATH/src/sync/cond.go 下,我們可以發(fā)現(xiàn)其代碼量非常之少,但它呈現(xiàn)的只是核心邏輯,其實(shí)現(xiàn)細(xì)節(jié)位于 runtime/sema.go 之中,依賴(lài)的是 runtime 層的調(diào)度原語(yǔ),對(duì)細(xì)節(jié)感興趣的讀者可以深入學(xué)習(xí)。
問(wèn)題來(lái)了,為什么在日常開(kāi)發(fā)中,我們很少會(huì)使用到 sync.Cond ?
無(wú)效喚醒
前文中我們提到,使用 Cond.Wait 正確姿勢(shì)如下
- 1 c.L.Lock()
- 2 for !condition() {
- 3 c.Wait()
- 4 }
- 5 ... make use of condition ...
- 6 c.L.Unlock()
以文章開(kāi)頭的例子而言,如果在每次調(diào)用 Put 方法時(shí),使用 Broadcast 方法喚醒所有的 waiter,那么很大概率上被喚醒的 waiter 醒來(lái)發(fā)現(xiàn)條件并不滿(mǎn)足,又會(huì)重新進(jìn)入等待。盡管是調(diào)用 Signal 方法喚醒指定的 waiter,但是它也不能保證喚醒的 waiter 條件一定滿(mǎn)足。因此,在實(shí)際的使用中,我們需要盡量保證喚醒操作是有效地,為了做到這點(diǎn),代碼的復(fù)雜度難免會(huì)增加。
- 饑餓問(wèn)題
還是以文章開(kāi)頭例子為例,如果同時(shí)有多個(gè) goroutine 執(zhí)行 GetMany(3) 和 GetMany(3000),執(zhí)行 GetMany(3) 與執(zhí)行 GetMany(3000) 的 goroutine 被喚醒的概率是一樣的,但是由于 GetMany(3) 只需要 3個(gè)數(shù)據(jù)就能滿(mǎn)足條件,那么如果一直存在 GetMany(3) 的 goroutine,執(zhí)行 GetMany(3000) 的 goroutine 將永遠(yuǎn)拿不到數(shù)據(jù),一直被無(wú)效喚醒。
- 不能響應(yīng)其他事件
條件變量的意義在于讓 goroutine 等待某種條件發(fā)生時(shí)進(jìn)入睡眠狀態(tài)。但是這會(huì)讓 goroutine 在等待條件時(shí),可能會(huì)錯(cuò)過(guò)一些需要注意的其他事件。例如,調(diào)用 Cond.Wait 的函數(shù)中包含了 context 上下文,當(dāng) context 傳來(lái)取消信號(hào)時(shí),它并不能像我們期望的一樣,獲取到取消信號(hào)并退出。Cond 的使用,讓我們不能同時(shí)選擇(select)條件和其他事件。
- 可替代性
通過(guò)對(duì) sync.Cond 幾個(gè)對(duì)外方法的分析,我們不難看到,它的使用場(chǎng)景是可以被 channel 所代替的,但是這也會(huì)增加代碼的復(fù)雜性。上文中的例子,可以使用 channel 改寫(xiě)如下。
- 1type Item = int
- 2
- 3type waiter struct {
- 4 n int
- 5 c chan []Item
- 6}
- 7
- 8type state struct {
- 9 items []Item
- 10 wait []waiter
- 11}
- 12
- 13type Queue struct {
- 14 s chan state
- 15}
- 16
- 17func NewQueue() *Queue {
- 18 s := make(chan state, 1)
- 19 s <- state{}
- 20 return &Queue{s}
- 21}
- 22
- 23func (q *Queue) Put(item Item) {
- 24 s := <-q.s
- 25 s.items = append(s.items, item)
- 26 for len(s.wait) > 0 {
- 27 w := s.wait[0]
- 28 if len(s.items) < w.n {
- 29 break
- 30 }
- 31 w.c <- s.items[:w.n:w.n]
- 32 s.items = s.items[w.n:]
- 33 s.wait = s.wait[1:]
- 34 }
- 35 q.s <- s
- 36}
- 37
- 38func (q *Queue) GetMany(n int) []Item {
- 39 s := <-q.s
- 40 if len(s.wait) == 0 && len(s.items) >= n {
- 41 items := s.items[:n:n]
- 42 s.items = s.items[n:]
- 43 q.s <- s
- 44 return items
- 45 }
- 46
- 47 c := make(chan []Item)
- 48 s.wait = append(s.wait, waiter{n, c})
- 49 q.s <- s
- 50
- 51 return <-c
- 52}
最后,雖然在上文的討論中都是列出的 sync.Cond 潛在問(wèn)題,但是如果開(kāi)發(fā)者能夠在使用中考慮到以上的幾點(diǎn)問(wèn)題,對(duì)于監(jiān)視器模型的實(shí)現(xiàn)而言,在代碼的語(yǔ)義邏輯上,sync.Cond 的使用會(huì)比 channel 的模式更易理解和維護(hù)。記住一點(diǎn),通俗易懂的代碼模型總是比深?yuàn)W的炫技要接地氣。