講透Go中的并發(fā)接收控制結(jié)構(gòu)Select
select 與 switch
讓我們來復(fù)習(xí)一下switch語句,在switch語句中,會逐個匹配case語句(可以是值也可以是表達式),一個一個的判斷過去,直到有符合的語句存在,執(zhí)行匹配的語句內(nèi)容后跳出switch。
- func demo(number int){
- switch{
- case number >= 90:
- fmt.Println("優(yōu)秀")
- default:
- fmt.Println("太搓了")
- }
- }
而 select 用于處理通道,它的語法與 switch 非常類似。每個 case 語句里必須是一個 channel 操作。它既可以用于 channel 的數(shù)據(jù)接收,也可以用于 channel 的數(shù)據(jù)發(fā)送。
- func foo() {
- chanInt := make(chan int)
- defer close(chanInt)
- go func() {
- select {
- case data, ok := <-chanInt:
- if ok {
- fmt.Println(data)
- }
- default:
- fmt.Println("全部阻塞")
- }
- }()
- chanInt <- 1
- }
輸出1
- 這是一個簡單的接收發(fā)送模型
- 如果 select 的多個分支都滿足條件,則會隨機的選取其中一個滿足條件的分支。
- 第 6 行加上 ok 是因為上一節(jié)講過,如果不加會導(dǎo)致通道關(guān)閉時收到零值
- 回憶之前的知識,讓接收和發(fā)送在不同的goroutine里,否則會死鎖
這個程序存在什么問題?
假如發(fā)送太慢,所有case都處于阻塞狀態(tài),會直接執(zhí)行default的內(nèi)容。這里加一行sleep試試。
- func bar() {
- chanInt := make(chan int)
- defer close(chanInt)
- go func() {
- ....
- }()
- time.Sleep(time.Second)
- chanInt <- 1
- }
- 倒數(shù)第二行加了sleep 1 秒,導(dǎo)致select語句提前結(jié)束
- 猜測一下會輸出全部阻塞嗎?
- 全部阻塞
- fatal error: all goroutines are asleep - deadlock!
- goroutine 1 [chan send]:
- main.bar()
是會輸出全部阻塞的。
因為接收執(zhí)行完了,退出了goroutine,而發(fā)送才剛剛執(zhí)行到,沒有與其匹配的接收,故死鎖。
正確的做法是把接收套在循環(huán)里面。
- func baz() {
- chanInt := make(chan int)
- defer close(chanInt)
- go func() {
- for {
- select {
- ...
- }
- }
- }()
- chanInt <- 1
- }
- 不再死鎖了
- 假如程序不停止,會出現(xiàn)一個泄露的goroutine,永遠的在for循環(huán)中無法跳出,此時引入下一節(jié)的內(nèi)容
通知機制
Go 語言總是簡單和靈活的,雖然沒有針對提供專門的機制來處理退出,但我們可以自己組合
- func main() {
- chanInt, done := make(chan int), make(chan struct{})
- defer close(chanInt)
- defer close(done)
- go func() {
- for {
- select {
- case <-chanInt:
- case <-done:
- break
- }
- }
- }()
- done <- struct{}{}
- }
沒有給chanInt發(fā)送任何東西,按理說會阻塞,導(dǎo)致goroutine泄露
但可以使用額外的通道完成協(xié)程的退出控制
這種方式還可以做到周期性處理任務(wù),下一節(jié)我們再詳細講解
case 的并發(fā)性
case是有并發(fā)屬性的,比如兩次輸入,分別等待 1、2 秒,再進行兩次讀取,會花 3 秒時間嗎?
- func main() {
- c1,c2 := make(chan string), make(chan string)
- close(c1)
- close(c2)
- go func() {
- time.Sleep(time.Second * 1)
- c1 <- "one"
- }()
- go func() {
- time.Sleep(time.Second * 2)
- c2 <- "two"
- }()
- start := time.Now() // 獲取當(dāng)前時間
- for i := 0; i < 2; i++ {
- select {
- case <-c1:
- case <-c2:
- }
- }
- elapsed := time.Since(start)
- // 這里沒有用到3秒,為什么?
- fmt.Println("該函數(shù)執(zhí)行完成耗時:", elapsed)
- }
以上代碼先初始化兩個 channel c1 和 c2,然后開啟兩個 goroutine 分別往 c1 和 c2 寫入數(shù)據(jù),再通過 select 監(jiān)聽兩個 channel,從中讀取數(shù)據(jù)并輸出。
運行結(jié)果如下:
- $ go run channel.go
- received one
- received two
- 該函數(shù)執(zhí)行完成耗時:2.004695535s
這充分說明case是并發(fā)的,但要注意此處的并發(fā)是 case 對channel阻塞做出的特殊處理。
case并發(fā)的原理
假如case后左邊和右邊跟了函數(shù),會執(zhí)行函數(shù),我們來探索一下。
定義A、B函數(shù),作用相同
- func A() int {
- fmt.Println("start A")
- time.Sleep(1 * time.Second)
- fmt.Println("end A")
- return 1
- }
定義函數(shù)lee,請問該函數(shù)執(zhí)行完成耗時多少呢?
- func lee() {
- ch, done := make(chan int), make(chan struct{})
- defer close(ch)
- go func() {
- select {
- case ch <- A():
- case ch <- B():
- case <-done:
- }
- }()
- done <- struct{}{}
- }
答案是 2 秒
- start A
- end A
- start B
- end B
- main.leespend time: 2.003504395s
- select 掃描是從左到右從上到下的,按這個順序先求值,如果是函數(shù)會先執(zhí)行函數(shù)。
- 然后立馬判斷是否可以立即執(zhí)行(這里是指 case 是否會因為執(zhí)行而阻塞)。
- 所以兩個函數(shù)都會進入,而且是先進入 A 再進入 B,兩個函數(shù)都會執(zhí)行完,所以等待時間會累計。
如果都不會阻塞,此時就會使用一個偽隨機的算法,去選中一個 case,只要選中了其他就被放棄了。
超時控制
我們來模擬一個更真實點的例子,讓程序一段時間超時退出。
定義一個結(jié)構(gòu)體
- type Worker struct {
- stream <-chan int //處理
- timeout time.Duration //超時
- done chan struct{} //結(jié)束信號
- }
定義初始化函數(shù)
- func NewWorker(stream <-chan int, timeout int) *Worker {
- return &Worker{
- stream: stream,
- timeout: time.Duration(timeout) * time.Second,
- done: make(chan struct{}),
- }
- }
定義超時處理函數(shù)
- func (w *Worker) afterTimeStop() {
- go func() {
- time.Sleep(w.timeout)
- w.done <- struct{}{}
- }()
- }
- 超過時間發(fā)送結(jié)束信號
接收數(shù)據(jù)并處理函數(shù)
- func (w *Worker) Start() {
- w.afterTimeStop()
- for {
- select {
- case data, ok := <-w.stream:
- if !ok {
- return
- }
- fmt.Println(data)
- case <-w.done:
- close(w.done)
- return
- }
- }
- }
- 收到結(jié)束信號關(guān)閉函數(shù)
- 這樣的方法就可以讓程序在等待 1 秒后繼續(xù)執(zhí)行,而不會因為 ch 讀取等待而導(dǎo)致程序停滯。
- func main() {
- stream := make(chan int)
- defer close(stream)
- w := NewWorker(stream, 3)
- w.Start()
- }
實際 3 秒到程序運行結(jié)束。
這種方式巧妙地實現(xiàn)了超時處理機制,這種方法不僅簡單,在實際項目開發(fā)中也是非常實用的。
小結(jié)
本節(jié)介紹了select的用法以及包含的陷阱,我們學(xué)會了
- case是并發(fā)的
- case只針對通道傳輸阻塞做特殊處理,如果有計算將會先進行計算
- 掃描是從左到右從上到下的,按這個順序先求值,如果是函數(shù)會先執(zhí)行函數(shù)。如果函數(shù)運行時間長,時間會累計
- 在case全部阻塞時,會執(zhí)行default中的內(nèi)容
- 可使用結(jié)束信號,讓select退出
- 延時發(fā)送結(jié)束信號可以實現(xiàn)超時自動退出的功能
問題:為什么w.stream沒有程序向他發(fā)送數(shù)據(jù),卻沒有死鎖呢?
本節(jié)源碼位置 https://github.com/golang-minibear2333/golang/blob/master/4.concurrent/4.5-select”
本文轉(zhuǎn)載自微信公眾號「機智的程序員小熊」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系機智的程序員小熊公眾號。