自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

聊聊并發(fā)庫 Conc,你學(xué)會了嗎?

開發(fā) 項目管理
我們在寫通用庫和框架的時候,都有一個原則,并發(fā)控制與業(yè)務(wù)邏輯分離,背離這個原則肯定做不出通用庫。

上個月 sourcegraph 放出了 conc[1] 并發(fā)庫,目標是 better structured concurrency for go, 簡單的評價一下

每個公司都有類似的輪子,與以往的庫比起來,多了泛型,代碼寫起來更優(yōu)雅,不需要 interface, 不需要運行時 assert, 性能肯定更好

我們在寫通用庫和框架的時候,都有一個原則,并發(fā)控制與業(yè)務(wù)邏輯分離,背離這個原則肯定做不出通用庫

整體介紹

1. WaitGroup 與 Panic

標準庫自帶 sync.WaitGroup 用于等待 goroutine 運行結(jié)束,缺點是我們要處理控制部分

圖片

代碼里大量的 wg.Add 與 wg.Done 函數(shù),所以一般封裝成右側(cè)的庫

type WaitGroup struct {
wg sync.WaitGroup
pc panics.Catcher
}

// Go spawns a new goroutine in the WaitGroup.
func (h *WaitGroup) Go(f func()) {
h.wg.Add(1)
go func() {
defer h.wg.Done()
h.pc.Try(f)
}()
}

但是如何處理 panic 呢?簡單的可以在閉包 doSomething 運行時增加一個 safeGo 函數(shù),用于捕捉 recover

圖片

原生 Go 要生成大量無用代碼,我司 repo 運動式的清理過一波,也遇到過 goroutine 忘寫 recover 導(dǎo)致的事故。conc 同時提供 catcher 封裝 recover 邏輯,conc.WaitGroup 可以選擇 Wait 重新拋出 panic, 也可以 WaitAndRecover 返回捕獲到的 panic 堆棧信息

func (h *WaitGroup) Wait() {
h.wg.Wait()

// Propagate a panic if we caught one from a child goroutine.
h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
h.wg.Wait()

// Return a recovered panic if we caught one from a child goroutine.
return h.pc.Recovered()
}

2. ForEach 與 Map

高級語言很多的基操,在 go 里面很奢侈,只能寫很多繁瑣代碼。conc封裝了泛型版本的 iterator 和 mapper

func process(values []int) {
iter.ForEach(values, handle)
}

func concMap(input []int, f func(int) int) []int {
return iter.Map(input, f)
}

上面是使用例子,用戶只需要寫業(yè)務(wù)函數(shù) handle. 相比 go1.19 前的版本,泛型的引入,使得基礎(chǔ)庫的編寫更游刃有余

// Iterator is also safe for reuse and concurrent use.
type Iterator[T any] struct {
// MaxGoroutines controls the maximum number of goroutines
// to use on this Iterator's methods.
//
// If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0).
MaxGoroutines int
}

MaxGoroutines 默認 GOMAXPROCS 并發(fā)處理傳參 slice, 也可以自定義,個人認為不合理,默認為 1 最妥

// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }

ForEachIdx 在創(chuàng)建 Iterator[T]{} 可以自定義并發(fā)度,最終調(diào)用 iter.ForEachIdx

// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
......
var idx atomic.Int64
// Create the task outside the loop to avoid extra closure allocations.
task := func() {
i := int(idx.Add(1) - 1)
for ; i < numInput; i = int(idx.Add(1) - 1) {
f(i, &input[i])
}
}

var wg conc.WaitGroup
for i := 0; i < iter.MaxGoroutines; i++ {
wg.Go(task)
}
wg.Wait()
}

ForEachIdx 泛型函數(shù)寫得非常好,略去部分代碼。樸素的實現(xiàn)在 for 循環(huán)里創(chuàng)建閉包,傳入 idx 參數(shù),然后 wg.Go 去運行。但是這樣會產(chǎn)生大量閉包,我司遇到過大量閉包,造成 heap 內(nèi)存增長很快頻繁觸發(fā) GC 的性能問題,所以在外層只創(chuàng)建一個閉包,通過 atomic 控制 idx

func Map[T, R any](input []T, f func(*T) R) []R {
return Mapper[T, R]{}.Map(input, f)
}

func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error) {
return Mapper[T, R]{}.MapErr(input, f)
}

Map 與 MapErr 也只是對 ForEachIdx 的封裝,區(qū)別是處理 error

3. 各種 Pool 與 Stream

Pool 用于并發(fā)處理,同時 Wait 等待任務(wù)結(jié)束。相比我司現(xiàn)有 concurrency 庫

  • 增加了泛型實現(xiàn)
  • 增加了對 goroutine 的復(fù)用
  • 增加并發(fā)度設(shè)置(我司有,但 conc 實現(xiàn)方式更巧秒)
  • 支持的函數(shù)簽名更多

先看一下支持的接口

Go(f func())
Go(f func() error)
Go(f func(ctx context.Context) error)
Go(f func(context.Context) (T, error))
Go(f func() (T, error))
Go(f func() T)
Go(f func(context.Context) (T, error))

理論上這一個足夠用了,傳參 Context, 返回泛型類型與錯誤。

Wait() ([]T, error)

這是對應(yīng)的 Wait 回收函數(shù),返回泛型結(jié)果 []T 與錯誤。具體 Pool 實現(xiàn)由多種組合而來:Pool, ErrorPool, ContextPool, ResultContextPool, ResultPool

func (p *Pool) Go(f func()) {
p.init()

if p.limiter == nil {
// No limit on the number of goroutines.
select {
case p.tasks <- f:
// A goroutine was available to handle the task.
default:
// No goroutine was available to handle the task.
// Spawn a new one and send it the task.
p.handle.Go(p.worker)
p.tasks <- f
}
}
......
}

func (p *Pool) worker() {
// The only time this matters is if the task panics.
// This makes it possible to spin up new workers in that case.
defer p.limiter.release()

for f := range p.tasks {
f()
}
}

復(fù)用方式很巧妙,如果處理速度足夠快,沒必要過多創(chuàng)建 goroutine

Stream 用于并發(fā)處理 goroutine, 但是返回結(jié)果保持順序

type Stream struct {
pool pool.Pool
callbackerHandle conc.WaitGroup
queue chan callbackCh

initOnce sync.Once
}

實現(xiàn)很簡單,queue 是一個 channel, 類型 callbackCh 同樣也是 channel, 在真正派生 goroutine 前按序順生成 callbackCh 傳遞結(jié)果

Stream 命名很差,容易讓人混淆,感覺叫 OrderedResultsPool 更理想,整體非常雞肋

超時

超時永遠是最難處理的問題,目前 conc 庫 Wait 函數(shù)并沒有提供 timeout 傳參,這就要求閉包內(nèi)部必須考濾超時,如果添加 timeout 傳參,又涉及 conc 內(nèi)部庫并發(fā)問題題

Wait() ([]T, error)

比如這個返回值,內(nèi)部 append 到 slice 時是有鎖的,如果 Wait 提前結(jié)束了會發(fā)生什么?

[]T 拿到的部分結(jié)果只能丟棄,返回給上層 timeout error

Context 框架傳遞參數(shù)

通用庫很容易做的臃腫,我司并發(fā)庫會給閉包產(chǎn)生新的 context, 并繼承所需框架層的 metadata, 兩種實現(xiàn)無可厚非,這些細節(jié)總得要處理

小結(jié)

代碼量不大,感興趣的可以看看。沒有造輪子的必要,夠用就行,這種庫寫了也沒價值

參考資料

[1]conc: https://github.com/sourcegraph/conc,

責(zé)任編輯:武曉燕 來源: 董澤潤的技術(shù)筆記
相關(guān)推薦

2023-07-10 08:36:21

工具pptword

2024-06-12 08:36:25

2022-12-26 07:48:04

敏捷項目

2024-03-05 10:09:16

restfulHTTPAPI

2024-11-08 08:56:01

2023-03-07 07:50:15

Transactio事務(wù)代碼

2022-04-13 09:01:45

SASSCSS處理器

2022-09-26 08:49:11

Java架構(gòu)CPU

2022-12-08 10:49:43

2024-08-19 10:24:14

2022-07-11 09:00:37

依賴配置文件Mybati

2022-10-25 07:24:23

數(shù)據(jù)庫TiDBmysql

2023-06-05 08:36:04

SQL函數(shù)RANK()

2024-10-29 08:08:44

2022-03-05 23:29:18

LibuvwatchdogNode.js

2022-12-14 08:31:43

#error編譯命令

2023-02-15 08:41:56

多層維表性能寬表

2024-03-04 07:41:18

SpringAOPOOP?

2022-10-11 08:48:08

HTTP狀態(tài)碼瀏覽器

2022-12-27 08:45:00

繪制菜單符號
點贊
收藏

51CTO技術(shù)棧公眾號