一個例子,給你講透典型的Go并發(fā)控制
Go中可以使用一個go關(guān)鍵字讓程序異步執(zhí)行
一個比較常見的場景:逐個異步調(diào)用多個函數(shù),或者循環(huán)中異步調(diào)用
func main() {
go do1()
go do2()
go do3()
}
// 或者
func main() {
for i := range []int{1,2,3}{
go do(i)
}
}
如果了解Go并發(fā)機制,就知道m(xù)ain在其他goroutine運行完成之前就已經(jīng)結(jié)束了,所以上面代碼的運行結(jié)果是不符合預期的。我們需要使用一種叫做并發(fā)控制的手段,來保證程序正確運行
為了更容易理解,我們虛擬一個??
已知有一個現(xiàn)成的函數(shù)search,能夠按照關(guān)鍵詞執(zhí)行搜索
期望實現(xiàn)一個新的函數(shù)coSearch能夠進行批量查詢
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
if word == "Go" {
return "", errors.New("error: Go") // 模擬結(jié)果
}
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}
func coSearch(ctx context.Context, words []string) (results []string, err error) {
//tbd
return
}
func main() {
words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
可以先暫停想想該如何實現(xiàn)coSearch函數(shù)
并發(fā)控制基礎(chǔ)
sync.WaitGroup是Go標準庫中用來控制并發(fā)的結(jié)構(gòu),這里放一個使用WaitGroup實現(xiàn)coSearch的示例
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
if word == "Go" {
return "", errors.New("error: Go") // 模擬結(jié)果
}
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}
func coSearch(ctx context.Context, words []string) ([]string, error) {
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
err error
)
for i, word := range words {
wg.Add(1)
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
func main() {
words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
上面的代碼中有非常多的細節(jié),來逐個聊一聊
?? sync.WaitGroup{}并發(fā)控制
sync.WaitGroup{}的用法非常簡潔
- 當新運行一個goroutine時,我們需要調(diào)用wg.Add(1)
- 當一個goroutine運行完成的時候,我們需要調(diào)用wg.Done()
- wg.Wait()讓程序阻塞在此處,直到所有的goroutine運行完畢。
對于coSearch來說,等待所有g(shù)oroutine運行完成,也就完成了函數(shù)的任務(wù),返回最終的結(jié)果
var (
wg = sync.WaitGroup{}
//...省略其他代碼
)
for i, word := range words {
wg.Add(1)
go func(word string, i int) {
defer wg.Done()
//...省略其他代碼
}(word, i)
}
wg.Wait()
?? for循環(huán)中的goroutine!
這是一個Go經(jīng)典錯誤,如果goroutine中使用了for迭代的變量,所有g(shù)oroutine都會獲得最后一次循環(huán)的值。例如下面的示例,并不會輸出"a", "b", "c" 而是輸出 "c", "c", "c"
func main() {
done := make(chan bool)
values := []string{"a", "b", "c"}
for _, v := range values {
go func() {
fmt.Println(v)
done <- true
}()
}
// wait for all goroutines to complete before exiting
for _ = range values {
<-done
}
}
正確的做法就是像上文示例一樣,將迭代的變量賦值給函數(shù)參數(shù),或者賦值給新的變量
for i, word := range words {
// ...
go func(word string, i int) {
// fmt.Println(word, i)
}(word, i)
}
for i, word := range words {
i, word := i, word
go func() {
// fmt.Println(word, i)
}()
}
由于這個錯誤實在太常見,從Go 1.22開始Go已經(jīng)修正了這個經(jīng)典的錯誤:Fixing For Loops in Go 1.22。
不過Go 1.22默認不會開啟修正,需要設(shè)置環(huán)境變量GOEXPERIMENT=loopvar才會 開啟
?? 并發(fā)安全
簡單理解:當多個goroutine對同一個內(nèi)存區(qū)域進行讀寫時,就會產(chǎn)生并發(fā)安全的問題,它會導致程序運行的結(jié)果不符合預期
上面的示例把最終的結(jié)果放入了results = make([]string, len(words))中。雖然我們在goroutine中并發(fā)的對于results變量進行寫入,但因為每一個goroutine都寫在了獨立的位置,且沒有任何讀取的操作,因此results[i] = result是并發(fā)安全的
results = [ xxxxxxxx, xxxxxxxx, xxxxxxxx, .... ]
^ ^ ^
| | |
goroutine1 goroutine2 goroutine3
這也意味著如果使用results = append(results, result)的方式并發(fā)賦值,因為會涉及到slice的擴容等操作,所以并不是并發(fā)安全的,需要利用sync.Mutex{}進行加鎖
如果想盡可能的提高程序的并發(fā)性能,推薦使用 results[i] = result這種方式賦值
?? sync.Once{}單次賦值
示例coSearch中,會返回第一個出錯的search的error。err是一個全局變量,在并發(fā)goroutine中賦值是并發(fā)不安全的操作
//...省略其他代碼
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil && err == nil {
err = e
return
}
results[i] = result
}(word, i)
//...省略其他代碼
對于全局變量的賦值比較常規(guī)做法就是利用sync.Mutex{}進行加鎖。但示例的邏輯為單次賦值,我們剛好可以利用同在sync庫的sync.Once{}來簡化代碼
sync.Once{}功能如其名,將我們要執(zhí)行的邏輯放到它的Do()方法中,無論多少并發(fā)都只會執(zhí)行一次
//...省略其他代碼
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
//...省略其他代碼
Further more
上面的示例coSearch已經(jīng)是一個比較完善的函數(shù)了,但我們還可以做得更多
?? goroutine數(shù)量控制
coSearch入?yún)⒌臄?shù)組可能非常大,如果不加以控制可能導致我們的服務(wù)器資源耗盡,我們需要控制并發(fā)的數(shù)量
利用帶緩沖channel可以實現(xiàn)
tokens := make(chan struct{}, 10)
for i, word := range words {
tokens <- struct{}{} // 新增
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens // 新增
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
如上,代碼中創(chuàng)建了10個緩沖區(qū)的channel,當channel被填滿時,繼續(xù)寫入會被阻塞;當goroutine運行完成之后,除了原有的wg.Done(),我們需要從channel讀取走一個數(shù)據(jù),來允許新的goroutine運行
通過這種方式,我們控制了coSearch最多只能運行10個goroutine,當超過10個時需要等待前面運行的goroutine結(jié)束
?? context.Context
并發(fā)執(zhí)行的goroutine只要有一個出錯,其他goroutine就可以停止,沒有必要繼續(xù)執(zhí)行下去了。如何把取消的事件傳導到其他goroutine呢?context.Context就是用來傳遞類似上下文信息的結(jié)構(gòu)
ctx, cancel := context.WithCancelCause(ctx) // 新增
defer cancel(nil) // 新增
for i, word := range words {
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
cancel(e) // 新增
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
完整的代碼
最終完成的效果如下
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
if word == "Go" || word == "Java" {
return "", errors.New("Go or Java")
}
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}
}
func coSearch(ctx context.Context, words []string) ([]string, error) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
tokens = make(chan struct{}, 2)
err error
)
for i, word := range words {
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
cancel(e)
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
并發(fā)控制庫errgroup
可以看到要實現(xiàn)一個較為完備的并發(fā)控制,需要做的工作非常多。不過Go官方團隊為大家準備了 golang.org/x/sync/errgroup
errgroup提供的能力和上文的示例類似,實現(xiàn)方式也類似,包含并發(fā)控制,錯誤傳遞,context.Context傳遞等
package main
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
)
func coSearch(ctx context.Context, words []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
results := make([]string, len(words))
for i, word := range words {
i, word := i, word
g.Go(func() error {
result, err := search(ctx, word)
if err != nil {
return err
}
results[i] = result
return nil
})
}
err := g.Wait()
return results, err
}
errgroup的用法也很簡單
- 使用 g, ctx := errgroup.WithContext(ctx)來創(chuàng)建goroutine的管理器
- g.SetLimit()可以設(shè)置允許的最大的goroutine數(shù)量
- 類似于go關(guān)鍵詞, g.Go異步執(zhí)行函數(shù)
- g.Wait()和sync.WaitGroup{}的wg.Wait()類似,會阻塞直到所有g(shù)oroutine都運行完成,并返回其中一個goroutine的錯誤
利用golang.org/x/sync/errgroup大幅簡化了進行并發(fā)控制的邏輯,真是一個并發(fā)控制的利器??!
總結(jié)
本篇從基礎(chǔ)的sync.WaitGroup{}庫出發(fā),涉及到了并發(fā)安全、sync.Once等內(nèi)容。最后介紹了并發(fā)控制的利器:golang.org/x/sync/errgroup。
雖然使用Go語言能夠非常簡單的編寫并發(fā)程序,但其中要注意的細節(jié)非常多,忽略這些細節(jié)不僅沒有提升程序運行的效率,還會產(chǎn)生錯誤的結(jié)果