Go 語(yǔ)言中 sync 包的近距離觀察
讓我們來(lái)看看負(fù)責(zé)提供同步原語(yǔ)的 Go 包:sync。
sync.Mutex
sync.Mutex 可能是 sync 包中被廣泛使用的原語(yǔ)。它允許對(duì)共享資源進(jìn)行互斥操作(即不允許同時(shí)訪問(wèn)):
mutex := &sync.Mutex{}
mutex.Lock()
// Update shared variable (e.g. slice, pointer on a structure, etc.)
mutex.Unlock()
必須指出的是 sync.Mutex 無(wú)法被復(fù)制(就像 sync 包中的所有其他原語(yǔ)一樣)。如果一個(gè)結(jié)構(gòu)體有一個(gè) sync 字段,必須通過(guò)指針進(jìn)行傳遞。
sync.RWMutex
sync.RWMutex 是一個(gè)讀寫(xiě)鎖。它提供了與我們剛剛看到的 Lock() 和 Unlock() 相同的方法(因?yàn)檫@兩個(gè)結(jié)構(gòu)都實(shí)現(xiàn)了 sync.Locker 接口)。然而,它還允許使用 RLock() 和 RUnlock() 方法進(jìn)行并發(fā)讀?。?/p>
mutex := &sync.RWMutex{}
mutex.Lock()
// Update shared variable
mutex.Unlock()
mutex.RLock()
// Read shared variable
mutex.RUnlock()
一個(gè) sync.RWMutex 允許至少一個(gè)讀取者或正好一個(gè)寫(xiě)入者,而一個(gè) sync.Mutex 則允許正好一個(gè)讀取者或?qū)懭胝摺?/p>
讓我們運(yùn)行一個(gè)快速的基準(zhǔn)測(cè)試來(lái)比較這些方法:
func BenchmarkMutexLock(b *testing.B) {
m := sync.Mutex{}
for i := 0; i < b.N; i++ {
m.Lock()
m.Unlock()
}
}
func BenchmarkRWMutexLock(b *testing.B) {
m := sync.RWMutex{}
for i := 0; i < b.N; i++ {
m.Lock()
m.Unlock()
}
}
func BenchmarkRWMutexRLock(b *testing.B) {
m := sync.RWMutex{}
for i := 0; i < b.N; i++ {
m.RLock()
m.RUnlock()
}
}
BenchmarkMutexLock-4 83497579 17.7 ns/op
BenchmarkRWMutexLock-4 35286374 44.3 ns/op
BenchmarkRWMutexRLock-4 89403342 15.3 ns/op
正如我們注意到的那樣,讀取鎖定/解鎖 sync.RWMutex 比鎖定/解鎖 sync.Mutex 更快。另一方面,調(diào)用 Lock()/Unlock() 在 sync.RWMutex 上是最慢的操作。
總的來(lái)說(shuō),當(dāng)我們有頻繁的讀取和不經(jīng)常的寫(xiě)入時(shí),應(yīng)該使用 sync.RWMutex。
sync.WaitGroup
sync.WaitGroup 也經(jīng)常被使用。它是一個(gè) goroutine 等待一組 goroutine 完成的慣用方式。
sync.WaitGroup 擁有一個(gè)內(nèi)部計(jì)數(shù)器。如果這個(gè)計(jì)數(shù)器等于 0,Wait() 方法會(huì)立即返回。否則,它會(huì)被阻塞,直到計(jì)數(shù)器變?yōu)?0。
要增加計(jì)數(shù)器,我們可以使用 Add(int) 方法。要減少計(jì)數(shù)器,可以使用 Done()(將計(jì)數(shù)器減 1)或者使用帶有負(fù)值的相同的 Add(int) 方法。
在以下示例中,我們將啟動(dòng)八個(gè) goroutine 并等待它們完成:
wg := &sync.WaitGroup{}
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
// Do something
wg.Done()
}()
}
wg.Wait()
// Continue execution
每次我們創(chuàng)建一個(gè) goroutine 時(shí),都會(huì)使用 wg.Add(1) 來(lái)增加 wg 的內(nèi)部計(jì)數(shù)器。我們也可以在 for 循環(huán)外部調(diào)用 wg.Add(8)。
與此同時(shí),每當(dāng)一個(gè) goroutine 完成時(shí),它會(huì)使用 wg.Done() 來(lái)減少 wg 的內(nèi)部計(jì)數(shù)器。
一旦執(zhí)行了八個(gè) wg.Done() 語(yǔ)句,主 goroutine 就會(huì)繼續(xù)執(zhí)行。
sync.Map
sync.Map 是 Go 中的一個(gè)并發(fā)版本的 map,我們可以:
- 使用 Store(interface{}, interface{}) 添加元素
- 使用 Load(interface) interface{} 檢索元素
- 使用 Delete(interface{}) 刪除元素
- 使用 LoadOrStore(interface{}, interface{}) (interface, bool) 檢索或添加元素(如果之前不存在)。返回的 bool 值為 true 表示在操作前鍵存在于 map 中。
- 使用 Range 在元素上進(jìn)行迭代
m := &sync.Map{}
// Put elements
m.Store(1, "one")
m.Store(2, "two")
// Get element 1
value, contains := m.Load(1)
if contains {
fmt.Printf("%s\n", value.(string))
}
// Returns the existing value if present, otherwise stores it
value, loaded := m.LoadOrStore(3, "three")
if !loaded {
fmt.Printf("%s\n", value.(string))
}
// Delete element 3
m.Delete(3)
// Iterate over all the elements
m.Range(func(key, value interface{}) bool {
fmt.Printf("%d: %s\n", key.(int), value.(string))
return true
})
Go 在線測(cè)試: https://play.golang.org/p/BO8IDVIDwsr
one
three
1: one
2: two
正如你所看到的,Range 方法接受一個(gè) func(key, value interface{}) bool 函數(shù)作為參數(shù)。如果我們返回 false,則迭代會(huì)停止。有趣的是,即使我們?cè)诤愣〞r(shí)間之后返回 false(更多信息),最壞情況下的時(shí)間復(fù)雜度仍然保持為 O(n)。
何時(shí)應(yīng)該使用 sync.Map 而不是在經(jīng)典的 map 上加 sync.Mutex 呢?
- 當(dāng)我們有頻繁讀取和不經(jīng)常寫(xiě)入時(shí)(與 sync.RWMutex 類似)
- 當(dāng)多個(gè) goroutine 為不相交的鍵集合讀取、寫(xiě)入和覆蓋條目。這具體意味著什么?例如,如果我們有一個(gè)分片實(shí)現(xiàn),有 4 個(gè) goroutine 每個(gè)負(fù)責(zé) 25% 的鍵(沒(méi)有沖突)。在這種情況下,sync.Map 也是首選。
sync.Pool
sync.Pool 是一個(gè)并發(fā)池,負(fù)責(zé)安全地保存一組對(duì)象。
其公共方法包括:
- Get() interface{} 用于檢索一個(gè)元素
- Put(interface{}) 用于添加一個(gè)元素
pool := &sync.Pool{}
pool.Put(NewConnection(1))
pool.Put(NewConnection(2))
pool.Put(NewConnection(3))
connection := pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
1
3
2
值得注意的是,就順序而言是沒(méi)有保證的。Get 方法指定它從池中獲取一個(gè)任意的項(xiàng)目。
也可以指定一個(gè)創(chuàng)建方法:
pool := &sync.Pool{
New: func() interface{} {
return NewConnection()
},
}
connection := pool.Get().(*Connection)
每次調(diào)用 Get() 時(shí),它將返回由傳遞給 pool.New 的函數(shù)創(chuàng)建的對(duì)象(在本例中是一個(gè)指針)。
何時(shí)應(yīng)該使用 sync.Pool 呢?有兩種情況:
- 第一種情況是當(dāng)我們需要重用共享且長(zhǎng)期存在的對(duì)象時(shí),比如一個(gè)數(shù)據(jù)庫(kù)連接。
- 第二種情況是優(yōu)化內(nèi)存分配。
讓我們考慮一個(gè)函數(shù)的示例,該函數(shù)將數(shù)據(jù)寫(xiě)入緩沖區(qū)并將結(jié)果持久化到文件中。使用 sync.Pool,我們可以重復(fù)使用分配給緩沖區(qū)的空間,跨不同的函數(shù)調(diào)用重復(fù)使用同一個(gè)對(duì)象。
第一步是檢索先前分配的緩沖區(qū)(或者如果是第一次調(diào)用,則創(chuàng)建一個(gè),但這已經(jīng)被抽象化了)。然后,延遲操作是將緩沖區(qū)放回池中。
func writeFile(pool *sync.Pool, filename string) error {
// Gets a buffer object
buf := pool.Get().(*bytes.Buffer)
// Returns the buffer into the pool
defer pool.Put(buf)
// Reset buffer otherwise it will contain "foo" during the first call
// Then "foofoo" etc.
buf.Reset()
buf.WriteString("foo")
return ioutil.WriteFile(filename, buf.Bytes(), 0644)
}
sync.Pool 還有一個(gè)要提到的重要點(diǎn)。由于指針可以被放入 Get() 返回的接口值中,無(wú)需進(jìn)行任何分配,因此最好將指針?lè)湃氤刂卸皇墙Y(jié)構(gòu)體。
這樣,我們既可以有效地重用已分配的內(nèi)存,又可以減輕垃圾收集器的壓力,因?yàn)槿绻兞刻右莸蕉焉?,它就不需要再次分配?nèi)存。
sync.Once
sync.Once 是一個(gè)簡(jiǎn)單而強(qiáng)大的原語(yǔ),用于確保一個(gè)函數(shù)只被執(zhí)行一次。
在這個(gè)例子中,將只有一個(gè) goroutine 顯示輸出消息:
once := &sync.Once{}
for i := 0; i < 4; i++ {
i := i
go func() {
once.Do(func() {
fmt.Printf("first %d\n", i)
})
}()
}
我們使用了 Do(func()) 方法來(lái)指定只有這部分代碼必須被執(zhí)行一次。
sync.Cond
讓我們以最可能最少使用的原語(yǔ) sync.Cond 結(jié)束。
它用于向 goroutine 發(fā)出信號(hào)(一對(duì)一)或向 goroutine(s) 廣播信號(hào)(一對(duì)多)。
假設(shè)我們有一個(gè)場(chǎng)景,需要通知一個(gè) goroutine 共享切片的第一個(gè)元素已被更新。
創(chuàng)建一個(gè) sync.Cond 需要一個(gè) sync.Locker 對(duì)象(可以是 sync.Mutex 或 sync.RWMutex):
cond := sync.NewCond(&sync.RWMutex{})
接下來(lái),讓我們編寫(xiě)一個(gè)函數(shù)來(lái)顯示切片的第一個(gè)元素:
func printFirstElement(s []int, cond *sync.Cond) {
cond.L.Lock()
cond.Wait()
fmt.Printf("%d\n", s[0])
cond.L.Unlock()
}
正如你所看到的,我們可以使用 cond.L 來(lái)訪問(wèn)內(nèi)部互斥鎖。一旦鎖被獲取,我們調(diào)用 cond.Wait(),它會(huì)阻塞直到收到信號(hào)。
現(xiàn)在回到主 goroutine。我們將通過(guò)傳遞一個(gè)共享切片和之前創(chuàng)建的 sync.Cond 來(lái)創(chuàng)建一個(gè) printFirstElement 池。然后,我們調(diào)用一個(gè) get() 函數(shù),將結(jié)果存儲(chǔ)在 s[0] 中并發(fā)出一個(gè)信號(hào):
s := make([]int, 1)
for i := 0; i < runtime.NumCPU(); i++ {
go printFirstElement(s, cond)
}
i := get()
cond.L.Lock()
s[0] = i
cond.Signal()
cond.L.Unlock()
這個(gè)信號(hào)將解除一個(gè)創(chuàng)建的 goroutine 的阻塞狀態(tài),它將顯示 s[0]。
然而,如果我們退一步來(lái)看,我們可能會(huì)認(rèn)為我們的代碼可能違反了 Go 最基本的原則之一:
不要通過(guò)共享內(nèi)存來(lái)通信;相反,通過(guò)通信來(lái)共享內(nèi)存。
事實(shí)上,在這個(gè)例子中,最好使用一個(gè)通道來(lái)傳遞 get() 返回的值。
然而,我們也提到了 sync.Cond 還可以用于廣播信號(hào)。
讓我們修改上一個(gè)示例的結(jié)尾,將 Signal() 改為 Broadcast():
i := get()
cond.L.Lock()
s[0] = i
cond.Broadcast()
cond.L.Unlock()
在這種情況下,所有的 goroutine 都會(huì)被觸發(fā)。
眾所周知,通道元素只會(huì)被一個(gè) goroutine 捕獲。唯一模擬廣播的方式是關(guān)閉一個(gè)通道,但這不能重復(fù)使用。因此,盡管 頗具爭(zhēng)議,這無(wú)疑是一個(gè)有趣的特性。
還有一個(gè)值得提及的 sync.Cond 使用場(chǎng)景,也許是最重要的一個(gè):
示例的 Go Playground 地址:https://play.golang.org/p/ap5qXF5DAg5