自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

使用Context、WaitGroup優(yōu)雅處理Goroutine

開發(fā) 后端
最近,我正在編寫一個“滴答器”的應(yīng)用程序,每次“滴答”時可能會產(chǎn)生數(shù)千的 goroutine。我想確保當(dāng)應(yīng)用終止時,即使有一些特定的 goroutine 處理比較緩慢,它也能快速而優(yōu)雅地退出。

本文轉(zhuǎn)載自微信公眾號「Golang來啦」,作者Seekload。轉(zhuǎn)載本文請聯(lián)系Golang來啦公眾號。

你好,我是 Seekload。

今天給大家分享一篇 如何使用 context、waitGroup 實現(xiàn)程序快速且優(yōu)雅退出 的文章!

原文如下:

最近,我正在編寫一個“滴答器”的應(yīng)用程序,每次“滴答”時可能會產(chǎn)生數(shù)千的 goroutine。我想確保當(dāng)應(yīng)用終止時,即使有一些特定的 goroutine 處理比較緩慢,它也能快速而優(yōu)雅地退出。

剛開始的時候,圍繞如何輸出日志,我使用 sync.WaitGroup 實現(xiàn)流程控制,但我很快意識到如果我創(chuàng)建了很多 goroutine,即使其中很小一部分沒有立即返回,我的程序會在終止時 hang 住。這讓我重新考慮 context.WithCancel,并理解該如何重新調(diào)整我的程序,使其能快速且優(yōu)雅地退出!

我們可以通過構(gòu)建示例程序一步步來驗證下,最初的示例程序并不會使用前面提到的技術(shù)點。

  1. package main 
  2.  
  3. import ( 
  4.  "fmt" 
  5.  "log" 
  6.  "math/rand" 
  7.  "os" 
  8.  "os/signal" 
  9.  "syscall" 
  10.  "time" 
  11.  
  12. func doSomething(ch chan int) { 
  13.  fmt.Printf("Received job %d\n", <-ch) 
  14.  
  15. func init() { 
  16.  rand.Seed(time.Now().Unix()) 
  17.  
  18. func main() { 
  19.  var ( 
  20.   closing   = make(chan struct{}) 
  21.   ticker    = time.NewTicker(1 * time.Second
  22.   logger    = log.New(os.Stderr, "", log.LstdFlags) 
  23.   batchSize = 6 
  24.   jobs      = make(chan int, batchSize) 
  25.  ) 
  26.  
  27.  go func() { 
  28.   signals := make(chan os.Signal, 1) 
  29.   signal.Notify(signals, syscall.SIGTERM, os.Interrupt) 
  30.   <-signals 
  31.   close(closing) 
  32.  }() 
  33. loop: 
  34.  for { 
  35.   select { 
  36.   case <-closing: 
  37.    break loop 
  38.   case <-ticker.C: 
  39.    for n := 0; n < batchSize; n++ { 
  40.     jobs <- n 
  41.     go doSomething(jobs) 
  42.    } 
  43.    logger.Printf("Completed doing %d things.", batchSize) 
  44.   } 
  45.  } 

執(zhí)行程序,我們會發(fā)現(xiàn) Received job ... 和 Completed doing ... 會交替輸出,輸出可能類似下面這樣:

  1. Received job 0 
  2. Received job 1 
  3. Received job 2 
  4. 2021/02/08 21:30:59 Completed doing 6 things. 
  5. Received job 3 
  6. Received job 4 
  7. Received job 5 
  8. 2021/02/08 21:31:00 Completed doing 6 things. 

多次打印的結(jié)果并不一致!這是合理的,我們都知道 goroutines 并不會阻塞,所以除非我們對它做些什么,否則協(xié)程里的代碼會立即執(zhí)行。

我們添加 WaitGroup 來完善下流程,先在 var 代碼塊中定義變量:

  1. var ( 
  2.     .. 
  3.     wg sync.WaitGroup 

調(diào)整下 loop 循環(huán):

  1. for n := 0; n < batchSize; n++ { 
  2.     wg.Add(1) 
  3.     jobs <- n 
  4.     go doSomething(&wg, jobs) 
  5. wg.Wait() 
  6. logger.Printf("Completed doing %d things.", batchSize) 

最后,修改協(xié)程函數(shù):

  1. func doSomething(wg *sync.WaitGroup, ch chan int) { 
  2.     defer wg.Done() 
  3.     fmt.Printf("Received job %d\n", <-ch) 

WaitGroups 會等待一組 goroutines 執(zhí)行完成,仔細(xì)閱讀代碼我們發(fā)現(xiàn):

  1. 每次循環(huán)時 WaitGroup 的計數(shù)器會加 1,加 1 原因是因為在 goroutine 里每次調(diào)用 wg.Done() 計數(shù)器會減一,這樣 goroutine 執(zhí)行完成返回之后計數(shù)器能維持平衡;
  2. 在調(diào)用 logger 之前,我們添加了 wg.Wait(),這樣當(dāng)程序執(zhí)行到這里的時候便會阻塞直到 WaitGroups 的計數(shù)器減為 0。當(dāng)所有 goroutines 調(diào)用 wg.Done() 之后,計數(shù)器便會恢復(fù)成 0。

很簡單,是不是?我們再次執(zhí)行程序,可以看到結(jié)果比之前的更一致:

  1. 2021/02/08 21:46:47 Completed doing 6 things. 
  2. Received job 0 
  3. Received job 1 
  4. Received job 2 
  5. Received job 4 
  6. Received job 5 
  7. Received job 3 
  8. 2021/02/08 21:46:48 Completed doing 6 things. 
  9. Received job 0 
  10. Received job 2 
  11. Received job 3 
  12. Received job 4 
  13. Received job 5 
  14. Received job 1 

順便說一句,與預(yù)期的一樣,jobs 并不會按順序執(zhí)行,因為我們并沒有采取任何措施來確保這一點。

在我們繼續(xù)之前,按照目前的狀態(tài)執(zhí)行程序并嘗試使用 Control+D 來終止程序,程序退出不會出現(xiàn)任何問題。

為了證明程序需要進一步完善,讓我們添加一些代碼模擬真實業(yè)務(wù)場景。我們新建一個函數(shù),函數(shù)里面調(diào)用外部 API 并等待請求響應(yīng)。請求過程中,我們將會調(diào)用 context.WithCancel 取消請求。

首先,創(chuàng)建一個未使用 context 的函數(shù)。下面的代碼更復(fù)雜,有必要的話請看注釋:

  1. func doAPICall(wg *sync.WaitGroup) error { 
  2.  defer wg.Done() 
  3.  
  4.  req, err := http.NewRequest("GET""https://httpstat.us/200", nil) 
  5.  if err != nil { 
  6.   return err 
  7.  } 
  8.  
  9.  // The httpstat.us API accepts a sleep parameter which sleeps the request for the 
  10.  // passed time in ms 
  11.  q := req.URL.Query() 
  12.  sleepMin := 1000 
  13.  sleepMax := 4000 
  14.  q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin)) 
  15.  req.URL.RawQuery = q.Encode() 
  16.  
  17.  // Make the request to the API in an anonymous function, using a channel to 
  18.  // communicate the results 
  19.  c := make(chan error, 1) 
  20.  go func() { 
  21.   // For the purposes of this example, we're not doing anything with the response. 
  22.   _, err := http.DefaultClient.Do(req) 
  23.   c <- err 
  24.  }() 
  25.  
  26.  // Block until the channel is populated 
  27.  return <-c 

修改定時器“滴答”,刪除調(diào)用 doSomething() 的代碼、刪除 jobs channel(不會再使用到它)并且調(diào)用 doAPICall()。

  1. for n := 0; n < batchSize; n++ { 
  2.     wg.Add(1) 
  3.     go doAPICall(&wg) 

執(zhí)行程序并再次嘗試退出程序:

  • WaitGroup 會等待所有的 goroutines 完成;
  • doAPICall() 調(diào)用會發(fā)生阻塞直到 httpstat.us() 接口返回,調(diào)用耗時大概 1000ms ~ 4000ms;
  • 取決于你終止程序的時間,退出會變得很困難(耗時比較長),試一次可能發(fā)現(xiàn)不了問題,在不同的時刻多嘗試幾次;

現(xiàn)在來演示 context.WithCancel 如何進一步控制程序取消。當(dāng) context.WithCancel 初始化之后,會返回一個 context 和取消函數(shù) CancelFunc()。這個取消函數(shù)會取消 context,第一次聽到這個會困惑。閱讀 Go 官方博客的文章 Go Concurrency Patterns: Context[1] 對于進一步理解 context.WithCancel 會有所幫助,推薦閱讀完本篇文章之后再看!

ok,我們回到正文。為了實現(xiàn)取消流程控制,需要修改下代碼。首先,使用 context 創(chuàng)建一個取消函數(shù):

  1. var ( 
  2.     ctx, cancel = context.WithCancel(context.Background()) 
  3.     ... 

接著,在匿名函數(shù)里監(jiān)聽程序終止的信號,signals 被通知之后調(diào)用 CancelFunc,這意味著上下文將被視為已取消:

  1. go func() { 
  2.     signals := make(chan os.Signal, 1) 
  3.     signal.Notify(signals, syscall.SIGTERM, os.Interrupt) 
  4.     <-signals 
  5.     logger.Println("Initiating shutdown of producer."
  6.     cancel() 
  7.     close(closing) 
  8. }() 

接著,調(diào)整 doAPICall() 函數(shù),多接收一個 context 參數(shù);使用 select-case 修改函數(shù)返回,等待 ctx.Done 或等待請求響應(yīng)。為了簡介,只展示了函數(shù)部分代碼:

  1. func doAPICall(ctx context.Context, ....) { 
  2.     // Cancel the request if ctx.Done is closed or await the response 
  3.     select { 
  4.     case <-ctx.Done(): 
  5.            return ctx.Err() 
  6.     case err := <-c: 
  7.         return err 
  8.     } 

最后,確保調(diào)用 doAPICall() 函數(shù)時傳遞了 context 參數(shù)。現(xiàn)在,運行程序并多次在不同的時間點終止程序。

現(xiàn)在會發(fā)生什么?程序會立即退出。select-case 代碼會監(jiān)聽 ctx.Done 是否關(guān)閉或者接口請求是否響應(yīng),哪個 case 的 channel 信號先到就先執(zhí)行誰。當(dāng)應(yīng)用程序終止時,ctx.Done() 優(yōu)先執(zhí)行并且函數(shù)提前返回,不再關(guān)心請求是否響應(yīng)。WaitGroup 的作用沒變 - 等待一組 goroutines 完成。現(xiàn)在,程序的終止流程得到很大改善。

Go 的基本哲學(xué)之一就是:

Don't communicate by sharing memory; share memory by communicating.

這里,我們使用 channel 在 goroutines 之間傳遞引用,這使得我們能夠改進應(yīng)用程序的流程。

有很多種辦法可以用來改善流程,例如,我們不跨 goroutine 接收 API 的響應(yīng)或者錯誤。值得慶幸的是,Go 很容易就可以實現(xiàn)這點,因此可以將它視為一個起點,如果你還想完善,可以嘗試下這些想法。

下面是完整的示例,僅供參考:

  1. package main 
  2.  
  3. import ( 
  4.  "context" 
  5.  "fmt" 
  6.  "log" 
  7.  "math/rand" 
  8.  "net/http" 
  9.  "os" 
  10.  "os/signal" 
  11.  "sync" 
  12.  "syscall" 
  13.  "time" 
  14.  
  15. func doAPICall(ctx context.Context, wg *sync.WaitGroup) error { 
  16.  defer wg.Done() 
  17.  
  18.  req, err := http.NewRequest("GET""https://httpstat.us/200", nil) 
  19.  if err != nil { 
  20.   return err 
  21.  } 
  22.  
  23.  // The httpstat.us API accepts a sleep parameter which sleeps the request for the 
  24.  // passed time in ms 
  25.  q := req.URL.Query() 
  26.  sleepMin := 1000 
  27.  sleepMax := 4000 
  28.  q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin)) 
  29.  req.URL.RawQuery = q.Encode() 
  30.  
  31.  c := make(chan error, 1) 
  32.  go func() { 
  33.   // For the purposes of this example, we're not doing anything with the response. 
  34.   _, err := http.DefaultClient.Do(req) 
  35.   c <- err 
  36.  }() 
  37.  
  38.  // Block until either channel is populated or closed 
  39.  select { 
  40.  case <-ctx.Done(): 
  41.   return ctx.Err() 
  42.  case err := <-c: 
  43.   return err 
  44.  } 
  45.  
  46. func init() { 
  47.  rand.Seed(time.Now().Unix()) 
  48.  
  49. func main() { 
  50.  var ( 
  51.   closing     = make(chan struct{}) 
  52.   ticker      = time.NewTicker(1 * time.Second
  53.   logger      = log.New(os.Stderr, "", log.LstdFlags) 
  54.   batchSize   = 6 
  55.   wg          sync.WaitGroup 
  56.   ctx, cancel = context.WithCancel(context.Background()) 
  57.  ) 
  58.  
  59.  go func() { 
  60.   signals := make(chan os.Signal, 1) 
  61.   signal.Notify(signals, syscall.SIGTERM, os.Interrupt) 
  62.   <-signals 
  63.   cancel() 
  64.   close(closing) 
  65.  }() 
  66. loop: 
  67.  for { 
  68.   select { 
  69.   case <-closing: 
  70.    break loop 
  71.   case <-ticker.C: 
  72.    for n := 0; n < batchSize; n++ { 
  73.     wg.Add(1) 
  74.     go doAPICall(ctx, &wg) 
  75.    } 
  76.    wg.Wait() 
  77.    logger.Printf("Completed doing %d things.", batchSize) 
  78.   } 
  79.  } 

最后一點,本文部分代碼受到博文 Go Concurrency Patterns: Context[2] 的啟發(fā),再次推薦這篇文章。這篇文章還介紹了其他控制函數(shù),比如:context.WithTimeout 等。Go 官方博客是每個人都應(yīng)該閱讀的寶庫!

參考資料

[1]Go Concurrency Patterns: Context: https://blog.golang.org/context

[2]Go Concurrency Patterns: Context: https://blog.golang.org/context

via:https://justbartek.ca/p/golang-context-wg-go-routines/

作者:Bartek

 

責(zé)任編輯:武曉燕 來源: Golang來啦
相關(guān)推薦

2021-07-05 07:55:11

Goroutine錯誤語言

2023-06-16 09:08:39

ReactContextRFC

2023-06-05 09:23:00

Golang同步工具

2022-10-27 11:23:26

GoFrame共享變量

2025-02-14 08:56:09

GoroutineContextChannel

2021-04-28 09:02:48

Golang語言Context

2022-08-26 08:17:14

微服務(wù)Guava開發(fā)

2024-01-15 08:09:44

Fluent錯誤代碼

2024-06-05 11:06:22

Go語言工具

2023-11-28 08:22:05

goroutine語言

2025-04-03 09:12:26

GolangWaitGroup工具

2023-10-10 13:23:18

空指針異常Java

2022-08-08 08:31:55

Go 語言閉包匿名函數(shù)

2022-08-08 06:50:06

Go語言閉包

2023-03-16 08:02:05

WaitGroup任務(wù)數(shù)邏輯

2025-02-07 09:11:04

JSON對象策略

2021-06-17 09:32:39

重復(fù)請求并發(fā)請求Java

2014-07-22 09:01:53

SwiftJSON

2015-11-26 10:53:45

LinuxWindowsMac OS

2017-07-26 11:32:50

NETRabbitMQ系統(tǒng)集成
點贊
收藏

51CTO技術(shù)棧公眾號