Introduction
Hello everyone, I'm asong. A few days ago, while browsing GitHub, I came across an interesting concurrency library, conc, whose stated goals are:
- make goroutine leaks harder
- handle panics more gracefully
- keep concurrent code readable
Judging from the README, its main features are:
- wraps WaitGroup, eliminating a lot of repetitive boilerplate, and also wraps recover for extra safety
- provides panics.Catcher, which encapsulates the recover logic so panics are caught uniformly and call-stack information is printed
- provides a worker pool for running tasks concurrently: concurrency can be capped, goroutines are reused, typed function signatures are supported, and a stream API keeps results in order
- provides ForEach and Map methods for handling slices elegantly
Let's go through the library module by module.
Repository: https://github.com/sourcegraph/conc
Wrapping WaitGroup
The Go standard library provides sync.WaitGroup for waiting on goroutines, and we usually end up writing code like this:
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover from a panic in this goroutine
                if err := recover(); err != nil {
                    fmt.Println(err)
                }
            }()
            // do something
            handle()
        }()
    }
    wg.Wait()
}
The code above makes us write a pile of boilerplate, and the recover logic has to be handled separately inside every func, so conc wraps all of this up, simplifying the code to:
func main() {
    wg := conc.NewWaitGroup()
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}

func doSomething() {
    fmt.Println("test")
}
conc's wrapper is fairly simple; its structure is:
type WaitGroup struct {
    wg sync.WaitGroup
    pc panics.Catcher
}
It implements its own Catcher type to encapsulate the recover logic, along these lines:
type Catcher struct {
    recovered atomic.Pointer[RecoveredPanic]
}
recovered is an atomic pointer; RecoveredPanic wraps the recovered panic together with the stack and related information:
type RecoveredPanic struct {
    // The original value of the panic.
    Value any

    // The caller list as returned by runtime.Callers when the panic was
    // recovered. Can be used to produce a more detailed stack information with
    // runtime.CallersFrames.
    Callers []uintptr

    // The formatted stacktrace from the goroutine where the panic was recovered.
    // Easier to use than Callers.
    Stack []byte
}
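To make the struct above concrete, here is a standard-library-only sketch of how such a value can be captured; the `run` helper, the `recoveredPanic` type, and the 32-frame cap are hypothetical stand-ins for this illustration, not conc's API:

```go
package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

// recoveredPanic is a hypothetical stand-in for conc's RecoveredPanic: the
// panic value plus two views of the stack captured at recovery time.
type recoveredPanic struct {
	Value   any
	Callers []uintptr
	Stack   []byte
}

// run executes f; if f panics, the panic is recovered (recover must be called
// directly inside the deferred function) and returned with stack info.
func run(f func()) (rp *recoveredPanic) {
	defer func() {
		if val := recover(); val != nil {
			var pcs [32]uintptr // 32 frames is an arbitrary cap for this sketch
			n := runtime.Callers(2, pcs[:])
			rp = &recoveredPanic{
				Value:   val,
				Callers: pcs[:n],       // raw PCs, usable with runtime.CallersFrames
				Stack:   debug.Stack(), // formatted stacktrace, easier to print
			}
		}
	}()
	f()
	return nil
}

func main() {
	rp := run(func() { panic("boom") })
	fmt.Println(rp.Value, len(rp.Callers) > 0, len(rp.Stack) > 0) // prints: boom true true
}
```

Keeping both the raw PCs and the formatted stack mirrors the trade-off noted in the struct's comments: Callers is precise, Stack is convenient.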
A Try method is provided to run a function; only the first panicking goroutine's information is recorded:
func (p *Catcher) Try(f func()) {
    defer p.tryRecover()
    f()
}

func (p *Catcher) tryRecover() {
    if val := recover(); val != nil {
        rp := NewRecoveredPanic(1, val)
        // only the first panicking goroutine's info is recorded
        p.recovered.CompareAndSwap(nil, &rp)
    }
}
A Repanic() method is provided to re-throw the captured panic:
func (p *Catcher) Repanic() {
    if val := p.Recovered(); val != nil {
        panic(val)
    }
}

func (p *Catcher) Recovered() *RecoveredPanic {
    return p.recovered.Load()
}
WaitGroup accordingly provides both a Wait() and a WaitAndRecover() method:
func (h *WaitGroup) Wait() {
    h.wg.Wait()

    // Propagate a panic if we caught one from a child goroutine.
    h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
    h.wg.Wait()

    // Return a recovered panic if we caught one from a child goroutine.
    return h.pc.Recovered()
}
The Wait method re-panics as soon as any goroutine panics, which is simple and blunt; the WaitAndRecover method instead returns the recovered information of the first goroutine that panicked, without re-panicking.
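The first-panic-wins behavior behind both methods can be sketched with the standard library alone; the `firstPanic` helper below is a hypothetical name for this illustration, not part of conc:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// firstPanic runs the given functions concurrently and returns the value of
// the first recovered panic (or nil); it mirrors Catcher's
// recovered.CompareAndSwap(nil, &rp) first-writer-wins semantics.
func firstPanic(fns ...func()) any {
	var recovered atomic.Pointer[any]
	var wg sync.WaitGroup
	for _, f := range fns {
		wg.Add(1)
		go func(f func()) {
			defer wg.Done()
			defer func() {
				if val := recover(); val != nil {
					// only the first panic wins the swap; later ones are dropped
					recovered.CompareAndSwap(nil, &val)
				}
			}()
			f()
		}(f)
	}
	wg.Wait()
	if p := recovered.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	fmt.Println(firstPanic(
		func() {},
		func() { panic("boom") },
	)) // prints: boom
}
```

A caller can then choose between WaitAndRecover-style behavior (inspect the returned value) and Wait-style behavior (re-panic with it).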
To sum up: conc's WaitGroup wrapper is solid overall and cuts down on repetitive code.
Worker pools
conc provides several kinds of worker pool:
- ContextPool: a pool whose tasks receive a context; when one goroutine fails, the others can be cancelled
- ErrorPool: configurable to collect either only the first error or all errors
- ResultContextPool: cancels the other goroutines when one fails, and collects the errors
- ResultPool: collects the result of every task in the pool; order is not guaranteed, so use stream or iter.Map when ordering matters
Let's look at a simple example:
import "github.com/sourcegraph/conc/pool"

func ExampleContextPool_WithCancelOnError() {
    p := pool.New().
        WithMaxGoroutines(4).
        WithContext(context.Background()).
        WithCancelOnError()
    for i := 0; i < 3; i++ {
        i := i
        p.Go(func(ctx context.Context) error {
            if i == 2 {
                return errors.New("I will cancel all other tasks!")
            }
            <-ctx.Done()
            return nil
        })
    }
    err := p.Wait()
    fmt.Println(err)
    // Output:
    // I will cancel all other tasks!
}
When creating a pool, the following methods can be chained:
- p.WithMaxGoroutines(): caps the number of goroutines in the pool
- p.WithErrors(): makes the pool's tasks return an error
- p.WithContext(ctx): runs the pool's tasks with the given context
- p.WithCancelOnError(): cancels the pool's context on the first task error
- p.WithFirstError(): makes the pool return only the first error instead of all of them
- p.WithCollectErrored(): makes the pool also collect the results of tasks that returned an error
The basic structure of Pool:
type Pool struct {
    handle   conc.WaitGroup
    limiter  limiter
    tasks    chan func()
    initOnce sync.Once
}
limiter is the throttle, a channel that caps the number of goroutines:
type limiter chan struct{}

func (l limiter) limit() int {
    return cap(l)
}

func (l limiter) release() {
    if l != nil {
        <-l
    }
}
Pool's core logic is straightforward. With no limiter set: if an idle worker is available, hand it the task; otherwise spawn a new worker and send the task to it. With a limiter set: once the worker count has hit the limit, the task is handed to an idle worker, blocking until one becomes free.
func (p *Pool) Go(f func()) {
    p.init()

    if p.limiter == nil {
        // No limit on the number of goroutines.
        select {
        case p.tasks <- f:
            // A goroutine was available to handle the task.
        default:
            // No goroutine was available to handle the task.
            // Spawn a new one and send it the task.
            p.handle.Go(p.worker)
            p.tasks <- f
        }
    } else {
        select {
        case p.limiter <- struct{}{}:
            // If we are below our limit, spawn a new worker rather
            // than waiting for one to become available.
            p.handle.Go(p.worker)

            // We know there is at least one worker running, so wait
            // for it to become available. This ensures we never spawn
            // more workers than the number of tasks.
            p.tasks <- f
        case p.tasks <- f:
            // A worker is available and has accepted the task.
            return
        }
    }
}
Note that the workers share a single unbuffered channel. This reuse trick is neat: when goroutines finish quickly, it avoids spawning more of them than necessary.
A pool does not guarantee task ordering, so conc also provides a Stream API whose results preserve submission order.
Stream
Stream is also built on pool, with an extra layer on top that guarantees result order. First, an example:
import stream2 "github.com/sourcegraph/conc/stream"

func ExampleStream() {
    times := []int{20, 52, 16, 45, 4, 80}
    stream := stream2.New()
    for _, millis := range times {
        dur := time.Duration(millis) * time.Millisecond
        stream.Go(func() stream2.Callback {
            time.Sleep(dur)
            // This will print in the order the tasks were submitted
            return func() { fmt.Println(dur) }
        })
    }
    stream.Wait()
    // Output:
    // 20ms
    // 52ms
    // 16ms
    // 45ms
    // 4ms
    // 80ms
}
Stream's structure is:
type Stream struct {
    pool             pool.Pool
    callbackerHandle conc.WaitGroup
    queue            chan callbackCh
    initOnce         sync.Once
}
queue is a channel, and callbackCh is itself a channel of func():
type callbackCh chan func()
When a goroutine is submitted, a callbackCh is obtained and queued in submission order to carry back the result:
func (s *Stream) Go(f Task) {
    s.init()

    // Get a channel from the cache.
    ch := getCh()

    // Queue the channel for the callbacker.
    s.queue <- ch

    // Submit the task for execution.
    s.pool.Go(func() {
        defer func() {
            // In the case of a panic from f, we don't want the callbacker to
            // starve waiting for a callback from this channel, so give it an
            // empty callback.
            if r := recover(); r != nil {
                ch <- func() {}
                panic(r)
            }
        }()

        // Run the task, sending its callback down this task's channel.
        callback := f()
        ch <- callback
    })
}

var callbackChPool = sync.Pool{
    New: func() any {
        return make(callbackCh, 1)
    },
}

func getCh() callbackCh {
    return callbackChPool.Get().(callbackCh)
}

func putCh(ch callbackCh) {
    callbackChPool.Put(ch)
}
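The ordering mechanism above (one channel per task, queued FIFO at submission time, drained by a single reader) can be sketched with the standard library; `orderedRun` is a hypothetical helper for this illustration, not conc's API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// orderedRun runs every task concurrently but returns results in submission
// order: each task writes to its own one-slot channel, the channels are
// queued FIFO at submission time, and a single reader drains them in order.
func orderedRun(tasks []func() string) []string {
	queue := make(chan chan string, len(tasks))
	var wg sync.WaitGroup
	for _, task := range tasks {
		ch := make(chan string, 1) // one-slot channel, like conc's callbackCh
		queue <- ch                // enqueued in submission order
		wg.Add(1)
		go func(task func() string, ch chan string) {
			defer wg.Done()
			ch <- task()
		}(task, ch)
	}
	close(queue)
	wg.Wait()

	var out []string
	for ch := range queue { // FIFO drain: completion order doesn't matter
		out = append(out, <-ch)
	}
	return out
}

func main() {
	out := orderedRun([]func() string{
		func() string { time.Sleep(30 * time.Millisecond); return "a" },
		func() string { time.Sleep(20 * time.Millisecond); return "b" },
		func() string { time.Sleep(10 * time.Millisecond); return "c" },
	})
	fmt.Println(out) // prints: [a b c], even though "c" finishes first
}
```

The one-slot buffer is what lets a finished task park its result without waiting for the drainer to reach its channel.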
ForEach and Map
ForEach
conc provides a ForEach method for processing a slice concurrently and elegantly. It is wrapped with generics, so we only need to focus on the handler code, with no boilerplate. Let's write an example of our own:
func main() {
    input := []int{1, 2, 3, 4}
    iterator := iter.Iterator[int]{
        MaxGoroutines: len(input) / 2,
    }
    iterator.ForEach(input, func(v *int) {
        if *v%2 != 0 {
            *v = -1
        }
    })
    fmt.Println(input)
}
ForEach is implemented by the Iterator struct; the core logic is:
type Iterator[T any] struct {
    MaxGoroutines int
}

func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
    if iter.MaxGoroutines == 0 {
        // iter is a value receiver and is hence safe to mutate
        iter.MaxGoroutines = defaultMaxGoroutines()
    }

    numInput := len(input)
    if iter.MaxGoroutines > numInput {
        // No more concurrent tasks than the number of input items.
        iter.MaxGoroutines = numInput
    }

    var idx atomic.Int64
    // the atomic index lets all workers share this single closure
    task := func() {
        i := int(idx.Add(1) - 1)
        for ; i < numInput; i = int(idx.Add(1) - 1) {
            f(i, &input[i])
        }
    }

    var wg conc.WaitGroup
    for i := 0; i < iter.MaxGoroutines; i++ {
        wg.Go(task)
    }
    wg.Wait()
}
The number of concurrent goroutines is configurable; it defaults to GOMAXPROCS, or you can pass your own value. The concurrent part is cleverly designed: only a single closure is created, and the next index is claimed via atomic, avoiding the GC pressure of allocating one closure per element.
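That single-closure, atomic-index pattern can be sketched with the standard library alone; `forEachIdx` below is a hypothetical, non-generic stand-in for conc's implementation:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// forEachIdx spawns `workers` goroutines that all run the same task closure;
// each worker claims the next index with an atomic counter, so no per-element
// closures are allocated.
func forEachIdx(input []int, workers int, f func(i int, v *int)) {
	var idx atomic.Int64
	task := func() {
		for {
			i := int(idx.Add(1) - 1) // claim the next unprocessed index
			if i >= len(input) {
				return
			}
			f(i, &input[i])
		}
	}

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			task()
		}()
	}
	wg.Wait()
}

func main() {
	nums := []int{1, 2, 3, 4, 5}
	forEachIdx(nums, 2, func(i int, v *int) { *v *= 10 })
	fmt.Println(nums) // prints: [10 20 30 40 50]
}
```

Because each index is claimed exactly once, workers never touch the same element, and slower elements naturally get fewer indices routed to their worker.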
Map
conc's Map method produces a result for every element of a slice. Map improves readability and trims boilerplate; here is an example of our own:
func main() {
    input := []int{1, 2, 3, 4}
    mapper := iter.Mapper[int, bool]{
        MaxGoroutines: len(input) / 2,
    }

    results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
    fmt.Println(results)
    // Output:
    // [false true false true]
}
Map is also built on Iterator and likewise calls ForEachIdx; the difference from ForEach is that it records each element's result.
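That difference can be sketched as follows; `mapSlice` is a hypothetical generic helper that reuses the same atomic-index loop but writes each result into a pre-allocated output slice:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// mapSlice reuses the atomic-index loop from the ForEach sketch, but each
// worker writes its result into a pre-allocated output slice; since every
// index is claimed by exactly one worker, no extra locking is needed on out.
func mapSlice[T, R any](input []T, workers int, f func(*T) R) []R {
	out := make([]R, len(input))
	var idx atomic.Int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				i := int(idx.Add(1) - 1)
				if i >= len(input) {
					return
				}
				out[i] = f(&input[i]) // slot i is owned by this worker alone
			}
		}()
	}
	wg.Wait()
	return out
}

func main() {
	results := mapSlice([]int{1, 2, 3, 4}, 2, func(v *int) bool { return *v%2 == 0 })
	fmt.Println(results) // prints: [false true false true]
}
```

Writing into distinct slots of a pre-sized slice is race-free, which is why the result order matches the input order with no extra coordination.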
Summary
I spent half a day reading this library, and many of its design points are worth learning from. Here is what I took away:
- conc.WaitGroup wraps sync.WaitGroup, taking care of Add, Done, and recover for you, which improves readability and removes boilerplate
- ForEach and Map let you process slices concurrently with clean, readable code; internally, Iterator drives the concurrency with an atomic index so only one closure is created, avoiding GC overhead
- pool is a concurrent worker queue with a configurable goroutine cap; its implementation cleverly uses an unbuffered channel as the hand-off, so fast-finishing tasks avoid spawning extra goroutines
- stream is an order-preserving concurrent worker queue; it cleverly uses sync.Pool-cached channels, enqueued at submission time, to keep results ordered, which is well worth learning from
If you have some time, take a look at this concurrency library and learn from its strengths; improvement comes bit by bit!