自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一個例子,給你講透典型的Go并發(fā)控制

開發(fā) 前端
本篇從基礎(chǔ)的sync.WaitGroup{}?庫出發(fā),涉及到了并發(fā)安全、sync.Once?等內(nèi)容。最后介紹了并發(fā)控制的利器:golang.org/x/sync/errgroup。

Go中可以使用一個go關(guān)鍵字讓程序異步執(zhí)行

一個比較常見的場景:逐個異步調(diào)用多個函數(shù),或者循環(huán)中異步調(diào)用

func main() {
 go do1()
 go do2()
 go do3()
}

// 或者

func main() {
 for i := range []int{1,2,3}{
  go do(i)
 }
}

如果了解Go并發(fā)機制,就知道m(xù)ain在其他goroutine運行完成之前就已經(jīng)結(jié)束了,所以上面代碼的運行結(jié)果是不符合預期的。我們需要使用一種叫做并發(fā)控制的手段,來保證程序正確運行

為了更容易理解,我們虛擬一個??

已知有一個現(xiàn)成的函數(shù)search,能夠按照關(guān)鍵詞執(zhí)行搜索

期望實現(xiàn)一個新的函數(shù)coSearch能夠進行批量查詢

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結(jié)果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}

func coSearch(ctx context.Context, words []string) (results []string, err error) {
 //tbd

 return
}

func main() {
 words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

可以先暫停想想該如何實現(xiàn)coSearch函數(shù)

并發(fā)控制基礎(chǔ)

sync.WaitGroup是Go標準庫中用來控制并發(fā)的結(jié)構(gòu),這里放一個使用WaitGroup實現(xiàn)coSearch的示例

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結(jié)果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 var (
  wg      = sync.WaitGroup{}
  once    = sync.Once{}
  results = make([]string, len(words))
  err     error
 )

 for i, word := range words {
  wg.Add(1)

  go func(word string, i int) {
   defer wg.Done()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

func main() {
 words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

上面的代碼中有非常多的細節(jié),來逐個聊一聊

?? sync.WaitGroup{}并發(fā)控制

sync.WaitGroup{}的用法非常簡潔

  • 當新運行一個goroutine時,我們需要調(diào)用wg.Add(1)
  • 當一個goroutine運行完成的時候,我們需要調(diào)用wg.Done()
  • wg.Wait()讓程序阻塞在此處,直到所有的goroutine運行完畢。

對于coSearch來說,等待所有g(shù)oroutine運行完成,也就完成了函數(shù)的任務(wù),返回最終的結(jié)果

var (
    wg      = sync.WaitGroup{}
    //...省略其他代碼
)

for i, word := range words {
    wg.Add(1)

    go func(word string, i int) {
        defer wg.Done()
  //...省略其他代碼
    }(word, i)
}

wg.Wait()

?? for循環(huán)中的goroutine!

這是一個Go經(jīng)典錯誤,如果goroutine中使用了for迭代的變量,所有g(shù)oroutine都會獲得最后一次循環(huán)的值。例如下面的示例,并不會輸出"a", "b", "c" 而是輸出 "c", "c", "c"

func main() {
    done := make(chan bool)

    values := []string{"a", "b", "c"}
    for _, v := range values {
        go func() {
            fmt.Println(v)
            done <- true
        }()
    }

    // wait for all goroutines to complete before exiting
    for _ = range values {
        <-done
    }
}

正確的做法就是像上文示例一樣,將迭代的變量賦值給函數(shù)參數(shù),或者賦值給新的變量

for i, word := range words {
 // ...
    go func(word string, i int) {
        // fmt.Println(word, i)
    }(word, i)
}

for i, word := range words {
    i, word := i, word
    go func() {
        // fmt.Println(word, i)
    }()
}

由于這個錯誤實在太常見,從Go 1.22開始Go已經(jīng)修正了這個經(jīng)典的錯誤:Fixing For Loops in Go 1.22。

不過Go 1.22默認不會開啟修正,需要設(shè)置環(huán)境變量GOEXPERIMENT=loopvar才會 開啟

??  并發(fā)安全

簡單理解:當多個goroutine對同一個內(nèi)存區(qū)域進行讀寫時,就會產(chǎn)生并發(fā)安全的問題,它會導致程序運行的結(jié)果不符合預期

上面的示例把最終的結(jié)果放入了results = make([]string, len(words))中。雖然我們在goroutine中并發(fā)的對于results變量進行寫入,但因為每一個goroutine都寫在了獨立的位置,且沒有任何讀取的操作,因此results[i] = result是并發(fā)安全的

results = [ xxxxxxxx,    xxxxxxxx,    xxxxxxxx,    .... ]
                ^            ^            ^       
                |            |            |       
           goroutine1   goroutine2    goroutine3

這也意味著如果使用results = append(results, result)的方式并發(fā)賦值,因為會涉及到slice的擴容等操作,所以并不是并發(fā)安全的,需要利用sync.Mutex{}進行加鎖

如果想盡可能的提高程序的并發(fā)性能,推薦使用 results[i] = result這種方式賦值

?? sync.Once{}單次賦值

示例coSearch中,會返回第一個出錯的search的error。err是一個全局變量,在并發(fā)goroutine中賦值是并發(fā)不安全的操作

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil && err == nil {
        err = e

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

對于全局變量的賦值比較常規(guī)做法就是利用sync.Mutex{}進行加鎖。但示例的邏輯為單次賦值,我們剛好可以利用同在sync庫的sync.Once{}來簡化代碼

sync.Once{}功能如其名,將我們要執(zhí)行的邏輯放到它的Do()方法中,無論多少并發(fā)都只會執(zhí)行一次

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil {
        once.Do(func() {
            err = e
        })

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

Further more

上面的示例coSearch已經(jīng)是一個比較完善的函數(shù)了,但我們還可以做得更多

?? goroutine數(shù)量控制

coSearch入?yún)⒌臄?shù)組可能非常大,如果不加以控制可能導致我們的服務(wù)器資源耗盡,我們需要控制并發(fā)的數(shù)量

利用帶緩沖channel可以實現(xiàn)

tokens := make(chan struct{}, 10)

for i, word := range words {
    tokens <- struct{}{} // 新增
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens  // 新增
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

如上,代碼中創(chuàng)建了10個緩沖區(qū)的channel,當channel被填滿時,繼續(xù)寫入會被阻塞;當goroutine運行完成之后,除了原有的wg.Done(),我們需要從channel讀取走一個數(shù)據(jù),來允許新的goroutine運行

通過這種方式,我們控制了coSearch最多只能運行10個goroutine,當超過10個時需要等待前面運行的goroutine結(jié)束

?? context.Context

并發(fā)執(zhí)行的goroutine只要有一個出錯,其他goroutine就可以停止,沒有必要繼續(xù)執(zhí)行下去了。如何把取消的事件傳導到其他goroutine呢?context.Context就是用來傳遞類似上下文信息的結(jié)構(gòu)

ctx, cancel := context.WithCancelCause(ctx) // 新增
defer cancel(nil) // 新增

for i, word := range words {
    tokens <- struct{}{}
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
                cancel(e) // 新增
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

完整的代碼

最終完成的效果如下

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 select {
 case <-ctx.Done():
  return "", ctx.Err()
 default:
  if word == "Go" || word == "Java" {
   return "", errors.New("Go or Java")
  }
  return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
 }
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 ctx, cancel := context.WithCancelCause(ctx)
 defer cancel(nil)

 var (
  wg   = sync.WaitGroup{}
  once = sync.Once{}

  results = make([]string, len(words))
  tokens  = make(chan struct{}, 2)

  err error
 )

 for i, word := range words {
  tokens <- struct{}{}
  wg.Add(1)

  go func(word string, i int) {
   defer func() {
    wg.Done()
    <-tokens
   }()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
     cancel(e)
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

并發(fā)控制庫errgroup

可以看到要實現(xiàn)一個較為完備的并發(fā)控制,需要做的工作非常多。不過Go官方團隊為大家準備了 golang.org/x/sync/errgroup

errgroup提供的能力和上文的示例類似,實現(xiàn)方式也類似,包含并發(fā)控制,錯誤傳遞,context.Context傳遞等

package main

import (
 "context"
 "fmt"
 "sync"

 "golang.org/x/sync/errgroup"
)

func coSearch(ctx context.Context, words []string) ([]string, error) {
 g, ctx := errgroup.WithContext(ctx)
 g.SetLimit(10)
 
 results := make([]string, len(words))

 for i, word := range words {
  i, word := i, word

  g.Go(func() error {
   result, err := search(ctx, word)
   if err != nil {
    return err
   }

   results[i] = result
   return nil
  })
 }

 err := g.Wait()

 return results, err
}

errgroup的用法也很簡單

  • 使用 g, ctx := errgroup.WithContext(ctx)來創(chuàng)建goroutine的管理器
  • g.SetLimit()可以設(shè)置允許的最大的goroutine數(shù)量
  • 類似于go關(guān)鍵詞, g.Go異步執(zhí)行函數(shù)
  • g.Wait()和sync.WaitGroup{}的wg.Wait()類似,會阻塞直到所有g(shù)oroutine都運行完成,并返回其中一個goroutine的錯誤

利用golang.org/x/sync/errgroup大幅簡化了進行并發(fā)控制的邏輯,真是一個并發(fā)控制的利器??!

總結(jié)

本篇從基礎(chǔ)的sync.WaitGroup{}庫出發(fā),涉及到了并發(fā)安全、sync.Once等內(nèi)容。最后介紹了并發(fā)控制的利器:golang.org/x/sync/errgroup。

雖然使用Go語言能夠非常簡單的編寫并發(fā)程序,但其中要注意的細節(jié)非常多,忽略這些細節(jié)不僅沒有提升程序運行的效率,還會產(chǎn)生錯誤的結(jié)果

責任編輯:武曉燕 來源: 涼涼的知識庫
相關(guān)推薦

2021-07-28 08:32:58

Go并發(fā)Select

2021-04-20 11:40:47

指針類型CPU

2024-09-06 12:52:59

2009-06-18 15:53:37

Hibernate B

2009-08-10 10:08:45

.NET調(diào)用PHP W

2018-07-03 15:20:36

Promise函數(shù)借錢

2025-03-28 08:30:00

PythonPandasaxis

2023-03-14 08:02:14

靜態(tài)路由動態(tài)路由設(shè)備

2009-06-11 14:48:48

jbpm工作流引擎jbpm例子

2021-06-24 06:35:00

Go語言進程

2024-01-25 11:41:00

Python開發(fā)前端

2021-03-24 06:06:13

Go并發(fā)編程Singlefligh

2023-11-06 13:55:59

聯(lián)合索引MySQ

2020-09-06 22:59:35

Linux文件命令

2021-07-09 06:11:37

Java泛型Object類型

2024-06-17 08:40:16

2020-03-26 09:18:54

高薪本質(zhì)因素

2022-10-08 13:29:19

Pandasgroupby

2023-05-25 08:02:09

構(gòu)建工具源碼JS

2023-01-30 16:21:24

Linux外觀
點贊
收藏

51CTO技術(shù)棧公眾號