如何優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)
本文轉(zhuǎn)載自微信公眾號(hào)「吳親強(qiáng)的深夜食堂」,作者吳親庫(kù)里。轉(zhuǎn)載本文請(qǐng)聯(lián)系吳親強(qiáng)的深夜食堂公眾號(hào)。
業(yè)務(wù)場(chǎng)景
在做任務(wù)開發(fā)的時(shí)候,你們一定會(huì)碰到以下場(chǎng)景:
場(chǎng)景1:調(diào)用第三方接口的時(shí)候, 一個(gè)需求你需要調(diào)用不同的接口,做數(shù)據(jù)組裝。
場(chǎng)景2:一個(gè)應(yīng)用首頁(yè)可能依托于很多服務(wù)。那就涉及到在加載頁(yè)面時(shí)需要同時(shí)請(qǐng)求多個(gè)服務(wù)的接口。這一步往往是由后端統(tǒng)一調(diào)用組裝數(shù)據(jù)再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。
針對(duì)以上兩種場(chǎng)景,假設(shè)在沒有強(qiáng)依賴關(guān)系下,選擇串行調(diào)用,那么總耗時(shí)即:
- time=s1+s2+....sn
按照當(dāng)代秒入百萬(wàn)的有為青年,這么長(zhǎng)時(shí)間早就把你祖宗十八代問候了一遍。
為了偉大的KPI,我們往往會(huì)選擇并發(fā)地調(diào)用這些依賴接口。那么總耗時(shí)就是:
- time=max(s1,s2,s3.....,sn)
當(dāng)然開始堆業(yè)務(wù)的時(shí)候可以先串行化,等到上面的人著急的時(shí)候,亮出絕招。
這樣,年底 PPT 就可以加上濃重的一筆流水賬:為業(yè)務(wù)某個(gè)接口提高百分之XXX性能,間接產(chǎn)生XXX價(jià)值。
當(dāng)然這一切的前提是,做老板不懂技術(shù),做技術(shù)”懂”你。
言歸正傳,如果修改成并發(fā)調(diào)用,你可能會(huì)這么寫,
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- func main() {
- var wg sync.WaitGroup
- wg.Add(2)
- var userInfo *User
- var productList []Product
- go func() {
- defer wg.Done()
- userInfo, _ = getUser()
- }()
- go func() {
- defer wg.Done()
- productList, _ = getProductList()
- }()
- wg.Wait()
- fmt.Printf("用戶信息:%+v\n", userInfo)
- fmt.Printf("商品信息:%+v\n", productList)
- }
- /********用戶服務(wù)**********/
- type User struct {
- Name string
- Age uint8
- }
- func getUser() (*User, error) {
- time.Sleep(500 * time.Millisecond)
- var u User
- u.Name = "wuqinqiang"
- u.Age = 18
- return &u, nil
- }
- /********商品服務(wù)**********/
- type Product struct {
- Title string
- Price uint32
- }
- func getProductList() ([]Product, error) {
- time.Sleep(400 * time.Millisecond)
- var list []Product
- list = append(list, Product{
- Title: "SHib",
- Price: 10,
- })
- return list, nil
- }
從實(shí)現(xiàn)上來(lái)說(shuō),需要多少服務(wù),會(huì)開多少個(gè) G,利用 sync.WaitGroup 的特性,
實(shí)現(xiàn)并發(fā)編排任務(wù)的效果。
好像,問題不大。
但是隨著代號(hào) 996 業(yè)務(wù)場(chǎng)景的增加,你會(huì)發(fā)現(xiàn),好多模塊都有相似的功能,只是對(duì)應(yīng)的業(yè)務(wù)場(chǎng)景不同而已。
那么我們能不能抽像出一套針對(duì)此業(yè)務(wù)場(chǎng)景的工具,而把具體業(yè)務(wù)實(shí)現(xiàn)交給業(yè)務(wù)方。
使用
本著不重復(fù)造輪子的原則,去搜了下開源項(xiàng)目,最終看上了 go-zero 里面的一個(gè)工具 mapreduce。
可以自行 Google 這個(gè)名詞。
使用很簡(jiǎn)單。我們通過它改造一下上面的代碼:
- package main
- import (
- "fmt"
- "github.com/tal-tech/go-zero/core/mr"
- "time"
- )
- func main() {
- var userInfo *User
- var productList []Product
- _ = mr.Finish(func() (err error) {
- userInfo, err = getUser()
- return err
- }, func() (err error) {
- productList, err = getProductList()
- return err
- })
- fmt.Printf("用戶信息:%+v\n", userInfo)
- fmt.Printf("商品信息:%+v\n", productList)
- }
- //打印
- 用戶信息:&{Name:wuqinqiang Age:18}
- 商品信息:[{Title:SHib Price:10}]
是不是舒服多了。
但是這里還需要注意一點(diǎn),假設(shè)你調(diào)用的其中一個(gè)服務(wù)錯(cuò)誤,并且你 return err 對(duì)應(yīng)的錯(cuò)誤,那么其他調(diào)用的服務(wù)會(huì)被取消。
比如我們修改 getProductList 直接響應(yīng)錯(cuò)誤。
- func getProductList() ([]Product, error) {
- return nil, errors.New("test error")
- }
- //打印
- // 用戶信息:<nil>
- // 商品信息:[]
那么最終打印的時(shí)候連用戶信息都會(huì)為空,因?yàn)槌霈F(xiàn)一個(gè)服務(wù)錯(cuò)誤,用戶服務(wù)請(qǐng)求被取消了。
一般情況下,在請(qǐng)求服務(wù)錯(cuò)誤的時(shí)候我們會(huì)有保底操作,一個(gè)服務(wù)錯(cuò)誤不能影響其他請(qǐng)求的結(jié)果。
所以在使用的時(shí)候具體處理取決于業(yè)務(wù)場(chǎng)景。
源碼
既然用了,那么就追下源碼吧。
- func Finish(fns ...func() error) error {
- if len(fns) == 0 {
- return nil
- }
- return MapReduceVoid(func(source chan<- interface{}) {
- for _, fn := range fns {
- source <- fn
- }
- }, func(item interface{}, writer Writer, cancel func(error)) {
- fn := item.(func() error)
- if err := fn(); err != nil {
- cancel(err)
- }
- }, func(pipe <-chan interface{}, cancel func(error)) {
- drain(pipe)
- }, WithWorkers(len(fns)))
- }
- func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
- _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
- reducer(input, cancel)
- drain(input)
- // We need to write a placeholder to let MapReduce to continue on reducer done,
- // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
- writer.Write(lang.Placeholder)
- }, opts...)
- return err
- }
對(duì)于 MapReduceVoid函數(shù),主要查看三個(gè)閉包參數(shù)。
- 第一個(gè) GenerateFunc 用于生產(chǎn)數(shù)據(jù)。
- MapperFunc 讀取生產(chǎn)出的數(shù)據(jù),進(jìn)行處理。
- VoidReducerFunc 這里表示不對(duì) mapper 后的數(shù)據(jù)做聚合返回。所以這個(gè)閉包在此操作幾乎0作用。
- func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
- source := buildSource(generate)
- return MapReduceWithSource(source, mapper, reducer, opts...)
- }
- func buildSource(generate GenerateFunc) chan interface{} {
- source := make(chan interface{})// 創(chuàng)建無(wú)緩沖通道
- threading.GoSafe(func() {
- defer close(source)
- generate(source) //開始生產(chǎn)數(shù)據(jù)
- })
- return source //返回?zé)o緩沖通道
- }
buildSource函數(shù)中,返回一個(gè)無(wú)緩沖的通道。并開啟一個(gè) G 運(yùn)行 generate(source),往無(wú)緩沖通道塞數(shù)據(jù)。這個(gè)generate(source) 不就是一開始 Finish 傳遞的第一個(gè)閉包參數(shù)。
- return MapReduceVoid(func(source chan<- interface{}) {
- // 就這個(gè)
- for _, fn := range fns {
- source <- fn
- }
- })
然后查看 MapReduceWithSource 函數(shù),
- func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
- opts ...Option) (interface{}, error) {
- options := buildOptions(opts...)
- //任務(wù)執(zhí)行結(jié)束通知信號(hào)
- output := make(chan interface{})
- //將mapper處理完的數(shù)據(jù)寫入collector
- collector := make(chan interface{}, options.workers)
- // 取消操作信號(hào)
- done := syncx.NewDoneChan()
- writer := newGuardedWriter(output, done.Done())
- var closeOnce sync.Once
- var retErr errorx.AtomicError
- finish := func() {
- closeOnce.Do(func() {
- done.Close()
- close(output)
- })
- }
- cancel := once(func(err error) {
- if err != nil {
- retErr.Set(err)
- } else {
- retErr.Set(ErrCancelWithNil)
- }
- drain(source)
- finish()
- })
- go func() {
- defer func() {
- if r := recover(); r != nil {
- cancel(fmt.Errorf("%v", r))
- } else {
- finish()
- }
- }()
- reducer(collector, writer, cancel)
- drain(collector)
- }()
- // 真正從生成器通道取數(shù)據(jù)執(zhí)行Mapper
- go executeMappers(func(item interface{}, w Writer) {
- mapper(item, w, cancel)
- }, source, collector, done.Done(), options.workers)
- value, ok := <-output
- if err := retErr.Load(); err != nil {
- return nil, err
- } else if ok {
- return value, nil
- } else {
- return nil, ErrReduceNoOutput
- }
- }
這段代碼挺長(zhǎng)的,我們說(shuō)下核心的點(diǎn)。這里使用一個(gè)G 調(diào)用 executeMappers 方法。
- go executeMappers(func(item interface{}, w Writer) {
- mapper(item, w, cancel)
- }, source, collector, done.Done(), options.workers)
- func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
- done <-chan lang.PlaceholderType, workers int) {
- var wg sync.WaitGroup
- defer func() {
- // 等待所有任務(wù)全部執(zhí)行完畢
- wg.Wait()
- // 關(guān)閉通道
- close(collector)
- }()
- //根據(jù)指定數(shù)量創(chuàng)建 worker池
- pool := make(chan lang.PlaceholderType, workers)
- writer := newGuardedWriter(collector, done)
- for {
- select {
- case <-done:
- return
- case pool <- lang.Placeholder:
- // 從buildSource() 返回的無(wú)緩沖通道取數(shù)據(jù)
- item, ok := <-input
- // 當(dāng)通道關(guān)閉,結(jié)束
- if !ok {
- <-pool
- return
- }
- wg.Add(1)
- // better to safely run caller defined method
- threading.GoSafe(func() {
- defer func() {
- wg.Done()
- <-pool
- }()
- //真正運(yùn)行閉包函數(shù)的地方
- // func(item interface{}, w Writer) {
- // mapper(item, w, cancel)
- // }
- mapper(item, writer)
- })
- }
- }
- }
具體的邏輯已備注,代碼很容易懂。
一旦 executeMappers 函數(shù)返回,關(guān)閉 collector 通道,那么執(zhí)行 reducer 不再阻塞。
- go func() {
- defer func() {
- if r := recover(); r != nil {
- cancel(fmt.Errorf("%v", r))
- } else {
- finish()
- }
- }()
- reducer(collector, writer, cancel)
- //這里
- drain(collector)
- }()
這里的 reducer(collector, writer, cancel) 其實(shí)就是從 MapReduceVoid 傳遞的第三個(gè)閉包函數(shù)。
- func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
- _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
- reducer(input, cancel)
- //這里
- drain(input)
- // We need to write a placeholder to let MapReduce to continue on reducer done,
- // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
- writer.Write(lang.Placeholder)
- }, opts...)
- return err
- }
然后這個(gè)閉包函數(shù)又執(zhí)行了 reducer(input, cancel),這里的 reducer 就是我們一開始解釋過的 VoidReducerFunc,從 Finish() 而來(lái)。
等等,看到上面三個(gè)地方的 drain(input)了嗎?
- // drain drains the channel.
- func drain(channel <-chan interface{}) {
- // drain the channel
- for range channel {
- }
- }
其實(shí)就是一個(gè)排空 channel 的操作,但是三個(gè)地方都對(duì)同一個(gè) channel做同樣的操作,也是讓我費(fèi)解。
還有更重要的一點(diǎn)。
- go func() {
- defer func() {
- if r := recover(); r != nil {
- cancel(fmt.Errorf("%v", r))
- } else {
- finish()
- }
- }()
- reducer(collector, writer, cancel)
- drain(collector)
- }()
上面的代碼,假如執(zhí)行 reducer,writer 寫入引發(fā) panic,那么drain(collector) 將沒有機(jī)會(huì)執(zhí)行。
不過作者已經(jīng)修復(fù)了這個(gè)問題,直接把 drain(collector) 放入到 defer。
具體 issues[1]。
到這里,關(guān)于 Finish 的源碼也就結(jié)束了。感興趣的可以看看其他源碼。
很喜歡 go-zero 里的一些工具,但是工具往往并不獨(dú)立,依賴于其他文件包,導(dǎo)致明明只想使用其中一個(gè)工具卻需要安裝整個(gè)包。
所以最終的結(jié)果就是扒源碼,創(chuàng)建無(wú)依賴庫(kù)工具集,遵循 MIT 即可。
附錄[1]https://github.com/tal-tech/go-zero/issues/676