自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

如何優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)

開發(fā) 前端
一個(gè)應(yīng)用首頁(yè)可能依托于很多服務(wù)。那就涉及到在加載頁(yè)面時(shí)需要同時(shí)請(qǐng)求多個(gè)服務(wù)的接口。這一步往往是由后端統(tǒng)一調(diào)用組裝數(shù)據(jù)再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。

 [[399159]]

本文轉(zhuǎn)載自微信公眾號(hào)「吳親強(qiáng)的深夜食堂」,作者吳親庫(kù)里。轉(zhuǎn)載本文請(qǐng)聯(lián)系吳親強(qiáng)的深夜食堂公眾號(hào)。

業(yè)務(wù)場(chǎng)景

在做任務(wù)開發(fā)的時(shí)候,你們一定會(huì)碰到以下場(chǎng)景:

場(chǎng)景1:調(diào)用第三方接口的時(shí)候, 一個(gè)需求你需要調(diào)用不同的接口,做數(shù)據(jù)組裝。

場(chǎng)景2:一個(gè)應(yīng)用首頁(yè)可能依托于很多服務(wù)。那就涉及到在加載頁(yè)面時(shí)需要同時(shí)請(qǐng)求多個(gè)服務(wù)的接口。這一步往往是由后端統(tǒng)一調(diào)用組裝數(shù)據(jù)再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。

針對(duì)以上兩種場(chǎng)景,假設(shè)在沒有強(qiáng)依賴關(guān)系下,選擇串行調(diào)用,那么總耗時(shí)即:

  1. time=s1+s2+....sn 

按照當(dāng)代秒入百萬(wàn)的有為青年,這么長(zhǎng)時(shí)間早就把你祖宗十八代問候了一遍。

為了偉大的KPI,我們往往會(huì)選擇并發(fā)地調(diào)用這些依賴接口。那么總耗時(shí)就是:

  1. time=max(s1,s2,s3.....,sn) 

當(dāng)然開始堆業(yè)務(wù)的時(shí)候可以先串行化,等到上面的人著急的時(shí)候,亮出絕招。

這樣,年底 PPT 就可以加上濃重的一筆流水賬:為業(yè)務(wù)某個(gè)接口提高百分之XXX性能,間接產(chǎn)生XXX價(jià)值。

當(dāng)然這一切的前提是,做老板不懂技術(shù),做技術(shù)”懂”你。

言歸正傳,如果修改成并發(fā)調(diào)用,你可能會(huì)這么寫,

  1. package main 
  2.  
  3. import ( 
  4.     "fmt" 
  5.     "sync" 
  6.     "time" 
  7.  
  8. func main() { 
  9.     var wg sync.WaitGroup 
  10.     wg.Add(2) 
  11.  
  12.     var userInfo *User 
  13.     var productList []Product 
  14.  
  15.     go func() { 
  16.         defer wg.Done() 
  17.         userInfo, _ = getUser() 
  18.     }() 
  19.  
  20.     go func() { 
  21.         defer wg.Done() 
  22.         productList, _ = getProductList() 
  23.     }() 
  24.     wg.Wait() 
  25.     fmt.Printf("用戶信息:%+v\n", userInfo) 
  26.     fmt.Printf("商品信息:%+v\n", productList) 
  27.  
  28.  
  29. /********用戶服務(wù)**********/ 
  30.  
  31. type User struct { 
  32.     Name string 
  33.     Age uint8 
  34.  
  35. func getUser() (*User, error) { 
  36.     time.Sleep(500 * time.Millisecond) 
  37.     var u User 
  38.     u.Name = "wuqinqiang" 
  39.     u.Age = 18 
  40.     return &u, nil 
  41.  
  42. /********商品服務(wù)**********/ 
  43.  
  44. type Product struct { 
  45.     Title string 
  46.     Price uint32 
  47.  
  48. func getProductList() ([]Product, error) { 
  49.     time.Sleep(400 * time.Millisecond) 
  50.     var list []Product 
  51.     list = append(list, Product{ 
  52.         Title: "SHib"
  53.         Price: 10, 
  54.     }) 
  55.     return list, nil 

從實(shí)現(xiàn)上來(lái)說(shuō),需要多少服務(wù),會(huì)開多少個(gè) G,利用 sync.WaitGroup 的特性,

實(shí)現(xiàn)并發(fā)編排任務(wù)的效果。

好像,問題不大。

但是隨著代號(hào) 996 業(yè)務(wù)場(chǎng)景的增加,你會(huì)發(fā)現(xiàn),好多模塊都有相似的功能,只是對(duì)應(yīng)的業(yè)務(wù)場(chǎng)景不同而已。

那么我們能不能抽像出一套針對(duì)此業(yè)務(wù)場(chǎng)景的工具,而把具體業(yè)務(wù)實(shí)現(xiàn)交給業(yè)務(wù)方。

使用

本著不重復(fù)造輪子的原則,去搜了下開源項(xiàng)目,最終看上了 go-zero 里面的一個(gè)工具 mapreduce。

可以自行 Google 這個(gè)名詞。

使用很簡(jiǎn)單。我們通過它改造一下上面的代碼:

  1. package main 
  2.  
  3. import ( 
  4.     "fmt" 
  5.     "github.com/tal-tech/go-zero/core/mr" 
  6.     "time" 
  7.  
  8. func main() { 
  9.     var userInfo *User 
  10.     var productList []Product 
  11.     _ = mr.Finish(func() (err error) { 
  12.         userInfo, err = getUser() 
  13.         return err 
  14.     }, func() (err error) { 
  15.         productList, err = getProductList() 
  16.         return err 
  17.     }) 
  18.     fmt.Printf("用戶信息:%+v\n", userInfo) 
  19.     fmt.Printf("商品信息:%+v\n", productList) 
  20. //打印 
  21. 用戶信息:&{Name:wuqinqiang Age:18} 
  22. 商品信息:[{Title:SHib Price:10}] 

是不是舒服多了。

但是這里還需要注意一點(diǎn),假設(shè)你調(diào)用的其中一個(gè)服務(wù)錯(cuò)誤,并且你 return err 對(duì)應(yīng)的錯(cuò)誤,那么其他調(diào)用的服務(wù)會(huì)被取消。

比如我們修改 getProductList 直接響應(yīng)錯(cuò)誤。

  1. func getProductList() ([]Product, error) { 
  2.     return nil, errors.New("test error"
  3. //打印 
  4. // 用戶信息:<nil> 
  5. // 商品信息:[] 

那么最終打印的時(shí)候連用戶信息都會(huì)為空,因?yàn)槌霈F(xiàn)一個(gè)服務(wù)錯(cuò)誤,用戶服務(wù)請(qǐng)求被取消了。

一般情況下,在請(qǐng)求服務(wù)錯(cuò)誤的時(shí)候我們會(huì)有保底操作,一個(gè)服務(wù)錯(cuò)誤不能影響其他請(qǐng)求的結(jié)果。

所以在使用的時(shí)候具體處理取決于業(yè)務(wù)場(chǎng)景。

源碼

既然用了,那么就追下源碼吧。

  1. func Finish(fns ...func() error) error { 
  2.     if len(fns) == 0 { 
  3.         return nil 
  4.     } 
  5.  
  6.     return MapReduceVoid(func(source chan<- interface{}) { 
  7.         for _, fn := range fns { 
  8.             source <- fn 
  9.         } 
  10.     }, func(item interface{}, writer Writer, cancel func(error)) { 
  11.         fn := item.(func() error) 
  12.         if err := fn(); err != nil { 
  13.             cancel(err) 
  14.         } 
  15.     }, func(pipe <-chan interface{}, cancel func(error)) { 
  16.         drain(pipe) 
  17.     }, WithWorkers(len(fns))) 
  18. }  
  1. func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { 
  2.     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { 
  3.         reducer(input, cancel) 
  4.         drain(input) 
  5.         // We need to write a placeholder to let MapReduce to continue on reducer done, 
  6.         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. 
  7.         writer.Write(lang.Placeholder) 
  8.     }, opts...) 
  9.     return err 

對(duì)于 MapReduceVoid函數(shù),主要查看三個(gè)閉包參數(shù)。

  • 第一個(gè) GenerateFunc 用于生產(chǎn)數(shù)據(jù)。
  • MapperFunc 讀取生產(chǎn)出的數(shù)據(jù),進(jìn)行處理。
  • VoidReducerFunc 這里表示不對(duì) mapper 后的數(shù)據(jù)做聚合返回。所以這個(gè)閉包在此操作幾乎0作用。
  1. func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { 
  2.     source := buildSource(generate)  
  3.     return MapReduceWithSource(source, mapper, reducer, opts...) 
  4.  
  5. func buildSource(generate GenerateFunc) chan interface{} { 
  6.     source := make(chan interface{})// 創(chuàng)建無(wú)緩沖通道 
  7.     threading.GoSafe(func() { 
  8.         defer close(source) 
  9.         generate(source) //開始生產(chǎn)數(shù)據(jù) 
  10.     }) 
  11.  
  12.     return source //返回?zé)o緩沖通道 

buildSource函數(shù)中,返回一個(gè)無(wú)緩沖的通道。并開啟一個(gè) G 運(yùn)行 generate(source),往無(wú)緩沖通道塞數(shù)據(jù)。這個(gè)generate(source) 不就是一開始 Finish 傳遞的第一個(gè)閉包參數(shù)。

  1. return MapReduceVoid(func(source chan<- interface{}) { 
  2.     // 就這個(gè) 
  3.         for _, fn := range fns { 
  4.             source <- fn 
  5.         } 
  6.     }) 

然后查看 MapReduceWithSource 函數(shù),

  1. func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, 
  2.     opts ...Option) (interface{}, error) { 
  3.     options := buildOptions(opts...) 
  4.     //任務(wù)執(zhí)行結(jié)束通知信號(hào) 
  5.     output := make(chan interface{}) 
  6.     //將mapper處理完的數(shù)據(jù)寫入collector 
  7.     collector := make(chan interface{}, options.workers) 
  8.     // 取消操作信號(hào) 
  9.     done := syncx.NewDoneChan() 
  10.     writer := newGuardedWriter(output, done.Done()) 
  11.     var closeOnce sync.Once 
  12.     var retErr errorx.AtomicError 
  13.     finish := func() { 
  14.         closeOnce.Do(func() { 
  15.             done.Close() 
  16.             close(output
  17.         }) 
  18.     } 
  19.     cancel := once(func(err error) { 
  20.         if err != nil { 
  21.             retErr.Set(err) 
  22.         } else { 
  23.             retErr.Set(ErrCancelWithNil) 
  24.         } 
  25.  
  26.         drain(source) 
  27.         finish() 
  28.     }) 
  29.  
  30.     go func() { 
  31.         defer func() { 
  32.             if r := recover(); r != nil { 
  33.                 cancel(fmt.Errorf("%v", r)) 
  34.             } else { 
  35.                 finish() 
  36.             } 
  37.         }() 
  38.         reducer(collector, writer, cancel) 
  39.         drain(collector) 
  40.     }() 
  41.     // 真正從生成器通道取數(shù)據(jù)執(zhí)行Mapper 
  42.     go executeMappers(func(item interface{}, w Writer) { 
  43.         mapper(item, w, cancel) 
  44.     }, source, collector, done.Done(), options.workers) 
  45.  
  46.     value, ok := <-output 
  47.     if err := retErr.Load(); err != nil { 
  48.         return nil, err 
  49.     } else if ok { 
  50.         return value, nil 
  51.     } else { 
  52.         return nil, ErrReduceNoOutput 
  53.     } 

這段代碼挺長(zhǎng)的,我們說(shuō)下核心的點(diǎn)。這里使用一個(gè)G 調(diào)用 executeMappers 方法。

  1. go executeMappers(func(item interface{}, w Writer) { 
  2.         mapper(item, w, cancel) 
  3.     }, source, collector, done.Done(), options.workers) 
  1. func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{}, 
  2.     done <-chan lang.PlaceholderType, workers int) { 
  3.     var wg sync.WaitGroup 
  4.     defer func() { 
  5.         // 等待所有任務(wù)全部執(zhí)行完畢 
  6.         wg.Wait() 
  7.         // 關(guān)閉通道 
  8.         close(collector) 
  9.     }() 
  10.    //根據(jù)指定數(shù)量創(chuàng)建 worker池 
  11.     pool := make(chan lang.PlaceholderType, workers)  
  12.     writer := newGuardedWriter(collector, done) 
  13.     for { 
  14.         select { 
  15.         case <-done: 
  16.             return 
  17.         case pool <- lang.Placeholder: 
  18.             // 從buildSource() 返回的無(wú)緩沖通道取數(shù)據(jù) 
  19.             item, ok := <-input  
  20.             // 當(dāng)通道關(guān)閉,結(jié)束 
  21.             if !ok { 
  22.                 <-pool 
  23.                 return 
  24.             } 
  25.  
  26.             wg.Add(1) 
  27.             // better to safely run caller defined method 
  28.             threading.GoSafe(func() { 
  29.                 defer func() { 
  30.                     wg.Done() 
  31.                     <-pool 
  32.                 }() 
  33.                 //真正運(yùn)行閉包函數(shù)的地方 
  34.                // func(item interface{}, w Writer) { 
  35.                // mapper(item, w, cancel) 
  36.                // } 
  37.                 mapper(item, writer) 
  38.             }) 
  39.         } 
  40.     } 

具體的邏輯已備注,代碼很容易懂。

一旦 executeMappers 函數(shù)返回,關(guān)閉 collector 通道,那么執(zhí)行 reducer 不再阻塞。

  1. go func() { 
  2.         defer func() { 
  3.             if r := recover(); r != nil { 
  4.                 cancel(fmt.Errorf("%v", r)) 
  5.             } else { 
  6.                 finish() 
  7.             } 
  8.         }() 
  9.         reducer(collector, writer, cancel) 
  10.         //這里 
  11.         drain(collector) 
  12.     }() 

這里的 reducer(collector, writer, cancel) 其實(shí)就是從 MapReduceVoid 傳遞的第三個(gè)閉包函數(shù)。

  1. func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { 
  2.     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { 
  3.         reducer(input, cancel) 
  4.         //這里 
  5.         drain(input) 
  6.         // We need to write a placeholder to let MapReduce to continue on reducer done, 
  7.         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. 
  8.         writer.Write(lang.Placeholder) 
  9.     }, opts...) 
  10.     return err 

然后這個(gè)閉包函數(shù)又執(zhí)行了 reducer(input, cancel),這里的 reducer 就是我們一開始解釋過的 VoidReducerFunc,從 Finish() 而來(lái)。

等等,看到上面三個(gè)地方的 drain(input)了嗎?

  1. // drain drains the channel. 
  2. func drain(channel <-chan interface{}) { 
  3.     // drain the channel 
  4.     for range channel { 
  5.     } 

其實(shí)就是一個(gè)排空 channel 的操作,但是三個(gè)地方都對(duì)同一個(gè) channel做同樣的操作,也是讓我費(fèi)解。

還有更重要的一點(diǎn)。

  1. go func() { 
  2.         defer func() { 
  3.             if r := recover(); r != nil { 
  4.                 cancel(fmt.Errorf("%v", r)) 
  5.             } else { 
  6.                 finish() 
  7.             } 
  8.         }() 
  9.         reducer(collector, writer, cancel) 
  10.         drain(collector) 
  11.     }() 

上面的代碼,假如執(zhí)行 reducer,writer 寫入引發(fā) panic,那么drain(collector) 將沒有機(jī)會(huì)執(zhí)行。

不過作者已經(jīng)修復(fù)了這個(gè)問題,直接把 drain(collector) 放入到 defer。

具體 issues[1]。

到這里,關(guān)于 Finish 的源碼也就結(jié)束了。感興趣的可以看看其他源碼。

很喜歡 go-zero 里的一些工具,但是工具往往并不獨(dú)立,依賴于其他文件包,導(dǎo)致明明只想使用其中一個(gè)工具卻需要安裝整個(gè)包。

所以最終的結(jié)果就是扒源碼,創(chuàng)建無(wú)依賴庫(kù)工具集,遵循 MIT 即可。

附錄[1]https://github.com/tal-tech/go-zero/issues/676

 

責(zé)任編輯:武曉燕 來(lái)源: 吳親強(qiáng)的深夜食堂
相關(guān)推薦

2020-12-08 08:08:51

Java接口數(shù)據(jù)

2024-05-16 17:58:30

線程任務(wù)線程通訊線程池

2021-03-24 10:20:50

Fonts前端代碼

2024-09-09 15:09:30

2023-06-06 08:51:06

2024-11-13 16:37:00

Java線程池

2024-12-24 08:03:56

2020-07-07 07:33:12

Java單元集成

2020-03-26 11:04:00

Linux命令光標(biāo)

2021-01-18 13:17:04

鴻蒙HarmonyOSAPP

2022-05-13 21:20:23

組件庫(kù)樣式選擇器

2021-01-28 14:53:19

PHP編碼開發(fā)

2022-05-24 06:07:48

JShack用戶代碼

2024-04-24 12:34:08

Spring事務(wù)編程

2021-07-07 07:47:10

瀏覽器CSS兼容

2020-03-27 15:10:23

SpringJava框架

2024-01-30 12:08:31

Go框架停止服務(wù)

2021-09-08 08:34:37

Go 文檔Goland

2020-10-22 10:15:33

優(yōu)化Windows電腦

2018-08-20 10:40:09

Redis位圖操作
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)