自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

通俗易懂剖析Go Channel:理解并發(fā)通信的核心機(jī)制

開(kāi)發(fā) 前端
channel?的值和狀態(tài)有多種情況,而不同的操作(send、recv、close)?又可能得到不同的結(jié)果,這是使用 channel? 類(lèi)型時(shí)需要經(jīng)常注意的點(diǎn),我們可以將不同channel?值下的不同操作進(jìn)行一個(gè)總結(jié),特別注意操作?channel?時(shí)會(huì)產(chǎn)生?panic?的情況,已經(jīng)可能會(huì)導(dǎo)致線(xiàn)程阻塞的情況,都是有可能導(dǎo)致死鎖與goroutine泄漏的罪魁禍?zhǔn)住?/div>

我們?cè)趯W(xué)習(xí)與使用Go語(yǔ)言的過(guò)程中,對(duì)channel并不陌生,channel是Go語(yǔ)言與眾不同的特性之一,也是非常重要的一環(huán),深入理解Channel,相信能夠在使用的時(shí)候更加的得心應(yīng)手。

一、Channel基本用法

1、channel類(lèi)別

channel在類(lèi)型上,可以分為兩種:

  • 雙向channel:既能接收又能發(fā)送的channel
  • 單向channel:只能發(fā)送或只能接收的channel,即單向channel可以為分為:

只寫(xiě)channel

只讀channel

聲明并初始化如下如下:

func main() {
    // 聲明并初始化
    var ch chan string = make(chan string) // 雙向channel
    var readCh <-chan string = make(<-chan string) // 只讀channel
    var writeCh chan<- string = make(chan<- string) // 只寫(xiě)channel
}

上述定義中,<-表示單向的channel。如果箭頭指向chan,就表示只寫(xiě)channel,可以往chan里邊寫(xiě)入數(shù)據(jù);如果箭頭遠(yuǎn)離chan,則表示為只讀channel,可以從chan讀數(shù)據(jù)。

在定義channel時(shí),可以定義任意類(lèi)型的channel,因此也同樣可以定義chan類(lèi)型的channel。例如:

a := make(chan<- chan int)   // 定義類(lèi)型為 chan int 的寫(xiě)channel
b := make(chan<- <-chan int) // 定義類(lèi)型為 <-chan int 的寫(xiě)channel
c := make(<-chan <-chan int) // 定義類(lèi)型為 <-chan int 的讀channel
d := make(chan (<-chan int)) // 定義類(lèi)型為 (<-chan int) 的讀channel

當(dāng)channel未初始化時(shí),其零值為nil。nil 是 chan 的零值,是一種特殊的 chan,對(duì)值是 nil 的 chan 的發(fā)送接收調(diào)用者總是會(huì)阻塞。

func main() {
    var ch chan string
    fmt.Println(ch) // <nil>
}

通過(guò)make我們可以初始化一個(gè)channel,并且可以設(shè)置其容量的大小,如下初始化了一個(gè)類(lèi)型為string,其容量大小為512的channel:

var ch chan string = make(chan string, 512)

當(dāng)初始化定義了channel的容量,則這樣的channel叫做buffered chan,即**有緩沖channel。如果沒(méi)有設(shè)置容量,channel的容量為0,這樣的channel叫做unbuffered chan,即無(wú)緩沖channel**。

有緩沖channel中,如果channel中還有數(shù)據(jù),則從這個(gè)channel接收數(shù)據(jù)時(shí)不會(huì)被阻塞。如果channel的容量還未滿(mǎn),那么向這個(gè)channel發(fā)送數(shù)據(jù)也不會(huì)被阻塞,反之則會(huì)被阻塞。

無(wú)緩沖channel則只有當(dāng)讀寫(xiě)操作都準(zhǔn)備好后,才不會(huì)阻塞,這也是unbuffered chan在使用過(guò)程中非常需要注意的一點(diǎn),否則可能會(huì)出現(xiàn)常見(jiàn)的bug。

channel的常見(jiàn)操作:

發(fā)送數(shù)據(jù)

往channel發(fā)送一個(gè)數(shù)據(jù)使用ch <-

func main() {
    var ch chan int = make(chan int, 512)
    ch <- 2000
}

上述的ch可以是chan int類(lèi)型,也可以是單向chan <-int。

接收數(shù)據(jù)

從channel接收一條數(shù)據(jù)可以使用<-ch

func main() {
    var ch chan int = make(chan int, 512)
    ch <- 2000 // 發(fā)送數(shù)據(jù)

    data := <-ch // 接收數(shù)據(jù)
    fmt.Println(data) // 2000
}

ch 類(lèi)型是 chan T,也可以是單向<-chan T

在接收數(shù)據(jù)時(shí),可以返回兩個(gè)返回值。第一個(gè)返回值返回channel中的元素,第二個(gè)返回值為bool類(lèi)型,表示是否成功地從channel中讀取到一個(gè)值。

如果第二個(gè)參數(shù)是false,則表示channel已經(jīng)被close而且channel中沒(méi)有緩存的數(shù)據(jù),這個(gè)時(shí)候第一個(gè)值返回的是零值。

func main() {
    var ch chan int = make(chan int, 512)
    ch <- 2000 // 發(fā)送數(shù)據(jù)

    data1, ok1 := <-ch // 接收數(shù)據(jù)
    fmt.Printf("data1 = %d, ok1 = %t\n", data1, ok1) // data1 = 2000, ok1 = true
    close(ch)  // 關(guān)閉channel
    data2, ok2 := <-ch  // 接收數(shù)據(jù)
    fmt.Printf("data2 = %d, ok2 = %t", data2, ok2) // data2 = 0, ok2 = false
}

所以,如果從channel讀取到一個(gè)零值,可能是發(fā)送操作真正發(fā)送的零值,也可能是closed關(guān)閉channel并且channel沒(méi)有緩存元素產(chǎn)生的零值,這是需要注意判別的一個(gè)點(diǎn)。

其他操作

Go內(nèi)建的函數(shù)close、cap、len都可以對(duì)chan類(lèi)型進(jìn)行操作。

  • close:關(guān)閉channel。
  • cap:返回channel的容量。
  • len:返回channel緩存中還未被取走的元素?cái)?shù)量。
func main() {
    var ch chan int = make(chan int, 512)
    ch <- 100
    ch <- 200
    fmt.Println("ch len:", len(ch)) // ch len: 2
    fmt.Println("ch cap:", cap(ch)) // ch cap: 512
}

發(fā)送操作與接收操作可以作為select語(yǔ)句中的case clause,例如:

func main() {
    var ch = make(chan int, 512)
    for i := 0; i < 10; i++ {
       select {
       case ch <- i:
       case v := <-ch:
          fmt.Println(v)
       }
    }
}

for-range語(yǔ)句同樣可以在chan中使用,例如:

func main() {
    var ch = make(chan int, 512)
    ch <- 100
    ch <- 200
    ch <- 300
    for v := range ch {
       fmt.Println(v)
    }
}

// 執(zhí)行結(jié)果
100
200
300

2、select介紹

在Go語(yǔ)言中,select語(yǔ)句用于監(jiān)控一組case語(yǔ)句,根據(jù)特定的條件執(zhí)行相對(duì)應(yīng)的case語(yǔ)句或default語(yǔ)句,與switch類(lèi)似,但不同之處在于select語(yǔ)句中所有case中的表達(dá)式都必須是channel的發(fā)送或接收操作。select使用示例代碼如下:

select {
case <-ch1:
    fmt.Println("ch1")
case ch2 <- 1:
    fmt.Println("ch2")
}

上述代碼中,select關(guān)鍵字讓當(dāng)前goroutine同時(shí)等待ch1 的可讀和ch2的可寫(xiě),在滿(mǎn)足任意一個(gè)case分支之前,select 會(huì)一直阻塞下去,直到其中的一個(gè) channel 轉(zhuǎn)為就緒狀態(tài)時(shí)執(zhí)行對(duì)應(yīng)case分支的代碼。如果多個(gè)channel同時(shí)就緒的話(huà)則隨機(jī)選擇一個(gè)case執(zhí)行。

當(dāng)使用空select時(shí),空的 select 語(yǔ)句會(huì)直接阻塞當(dāng)前的goroutine,使得該goroutine進(jìn)入無(wú)法被喚醒的永久休眠狀態(tài)??誷elect,即select內(nèi)不包含任何case。

select{
  
}

另外當(dāng)select語(yǔ)句內(nèi)只有一個(gè)case分支時(shí),如果該case分支不滿(mǎn)足,那么當(dāng)前select就變成了一個(gè)阻塞的channel讀/寫(xiě)操作。

select {
case <-ch1:
 fmt.Println("ch1")
}

上述select中,當(dāng)ch1可讀時(shí),會(huì)執(zhí)行打印操作,反之則阻塞當(dāng)前goroutine。

當(dāng)select語(yǔ)句內(nèi)包含default分支時(shí),如果select內(nèi)的所有case都不滿(mǎn)足,則會(huì)執(zhí)行default分支的邏輯,用于當(dāng)其他case都不滿(mǎn)足時(shí)執(zhí)行一些默認(rèn)操作。

select {
case <-ch1:
    fmt.Println("ch1")
case ch2 <- 1:
    fmt.Println("ch2")
default:
    fmt.Println("default")
}

上述代碼中,當(dāng)ch1可讀或ch2可寫(xiě)時(shí),會(huì)執(zhí)行相應(yīng)的打印操作,否則就執(zhí)行default語(yǔ)句中的代碼,相當(dāng)于一個(gè)非阻塞的channel讀取操作。

select的使用可以總結(jié)為:

  • select不存在任何的case且沒(méi)有default分支:永久阻塞當(dāng)前 goroutine;
  • select只存在一個(gè)case且沒(méi)有default分支:阻塞的發(fā)送/接收;
  • select存在多個(gè)case:隨機(jī)選擇一個(gè)滿(mǎn)足條件的case執(zhí)行;
  • select存在default,其他case都不滿(mǎn)足時(shí):執(zhí)行default語(yǔ)句中的代碼;

二、Channel實(shí)現(xiàn)原理

從代碼的角度剖析channel的實(shí)現(xiàn),能夠讓我們更好的去使用channel。

我們可以從chan類(lèi)型的數(shù)據(jù)結(jié)構(gòu)、初始化以及三個(gè)操作發(fā)送、接收和關(guān)閉這幾個(gè)方面來(lái)了解channel。

1、chan數(shù)據(jù)結(jié)構(gòu)

chan類(lèi)型的數(shù)據(jù)結(jié)構(gòu)定義位于runtime.hchan[1],其結(jié)構(gòu)體定義如下:

type hchan struct {
 qcount   uint           // total data in the queue
 dataqsiz uint           // size of the circular queue
 buf      unsafe.Pointer // points to an array of dataqsiz elements
 elemsize uint16
 closed   uint32
 elemtype *_type // element type
 sendx    uint   // send index
 recvx    uint   // receive index
 recvq    waitq  // list of recv waiters
 sendq    waitq  // list of send waiters

 // lock protects all fields in hchan, as well as several
 // fields in sudogs blocked on this channel.
 //
 // Do not change another G's status while holding this lock
 // (in particular, do not ready a G), as this can deadlock
 // with stack shrinking.
 lock mutex
}

解釋一下上述各個(gè)字段的意義:

  • qcount:表示chan中已經(jīng)接收到的數(shù)據(jù)且還未被取走的元素個(gè)數(shù)。內(nèi)建函數(shù)len可以返回這個(gè)字段的值。
  • datasiz:循環(huán)隊(duì)列的大小。chan在實(shí)現(xiàn)上使用一個(gè)循環(huán)隊(duì)列來(lái)存放元素的個(gè)數(shù),循環(huán)隊(duì)列適用于生產(chǎn)者-消費(fèi)者的場(chǎng)景。
  • buf:存放元素的循環(huán)隊(duì)列buffer,buf 字段是一個(gè)指向隊(duì)列緩沖區(qū)的指針,即指向一個(gè)dataqsiz元素的數(shù)組。buf 字段是使用 unsafe.Pointer 類(lèi)型來(lái)表示隊(duì)列緩沖區(qū)的起始地址。unsafe.Pointer是一種特殊的指針類(lèi)型,它可以用于指向任何類(lèi)型的數(shù)據(jù)。由于隊(duì)列緩沖區(qū)的類(lèi)型是動(dòng)態(tài)分配的,所以不能直接使用某個(gè)具體類(lèi)型的指針來(lái)表示。
  • elemtype、elemsize:elemtype表示chan中元素的數(shù)據(jù)類(lèi)型,elemsize表示其大小。當(dāng)chan定義后,它的元素類(lèi)型是固定的,即普通類(lèi)型或者指針類(lèi)型,因此元素大小也是固定的。
  • sendx:處理發(fā)送數(shù)據(jù)操作的指針在buf隊(duì)列中的位置。當(dāng)channel接收到了新的數(shù)據(jù)時(shí),該指針就會(huì)加上elemsize,移動(dòng)到下一個(gè)位置。buf 的總大小是elemsize的整數(shù)倍且buf是一個(gè)循環(huán)列表。
  • recvx:處理接收數(shù)據(jù)操作的指針在buf隊(duì)列中的位置。當(dāng)從buf中取出數(shù)據(jù),此指針會(huì)移動(dòng)到下一個(gè)位置。
  • recvq:當(dāng)接收操作發(fā)現(xiàn)channel中沒(méi)有數(shù)據(jù)可讀時(shí),會(huì)被則色,此時(shí)會(huì)被加入到recvq隊(duì)列中。
  • sendq:當(dāng)發(fā)送操作發(fā)現(xiàn)buf隊(duì)列已滿(mǎn)時(shí),會(huì)被進(jìn)行阻塞,此時(shí)會(huì)被加入到sendq隊(duì)列中。

圖片圖片

2、chan初始化

channel在進(jìn)行初始化時(shí),Go編譯器會(huì)根據(jù)是否傳入容量的大小,來(lái)選擇調(diào)用makechan64,還是makechan。makechan64在實(shí)現(xiàn)上底層還是調(diào)用makechan來(lái)進(jìn)行初始化,makechan64只是對(duì)size做了檢查。

makechan函數(shù)根據(jù)chan的容量的大小和元素的類(lèi)型不同,初始化不同的存儲(chǔ)空間。省略一些檢查代碼,makechan函數(shù)的主要邏輯如下:

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    
    ...

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    
    ...
    
    var c *hchan
    switch {
    case mem == 0:
       // 隊(duì)列或元素大小為零,不必創(chuàng)建buf
       c = (*hchan)(mallocgc(hchanSize, nil, true))
       c.buf = c.raceaddr()
    case elem.ptrdata == 0:
       // 元素不包含指針,分配一塊連續(xù)的內(nèi)存給hchan數(shù)據(jù)結(jié)構(gòu)和buf
       // hchan數(shù)據(jù)結(jié)構(gòu)后面緊接著就是buf,在一次調(diào)用中分配hchan和buf
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
       // 元素包含指針,單獨(dú)分配buf
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
    }

    // 記錄元素大小、類(lèi)型、容量
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
    
    ...
    
    return c
}

3、send發(fā)送操作

Go在編譯發(fā)送數(shù)據(jù)給channel時(shí),會(huì)把發(fā)送操作send轉(zhuǎn)換成chansend1函數(shù),而chansend1函數(shù)會(huì)調(diào)用chansend函數(shù)。

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

我們可以來(lái)分段分析chansend函數(shù)的實(shí)現(xiàn)邏輯。

第一部分:

主要是對(duì)chan進(jìn)行判斷,判斷chan是否為nil,若為nil,則判斷是否需要將當(dāng)前goroutine進(jìn)行阻塞,阻塞通過(guò)gopark來(lái)對(duì)調(diào)用者goroutine park(阻塞休眠)。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 第一部分
    if c == nil { // 判斷chan是否為nil
       if !block { // 判斷是否需要阻塞當(dāng)前goroutine
          return false
       }
       // 調(diào)用這goroutine park,進(jìn)行阻塞休眠
       gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
       throw("unreachable")
    }
    
    ...
}

第二部分

第二部分的邏輯判斷是當(dāng)你往一個(gè)容量已滿(mǎn)的chan實(shí)例發(fā)送數(shù)據(jù),且不想當(dāng)前調(diào)用的goroutine被阻塞時(shí)(chan未被關(guān)閉),那么處理的邏輯是直接返回。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // 第二部分
    if !block && c.closed == 0 && full(c) {
        return false
    }
    ...
}

第三部分

第三部分的邏輯判斷是首先進(jìn)行互斥鎖加鎖,然后判斷當(dāng)前chan是否關(guān)閉,如果chan已經(jīng)被close了,則釋放互斥鎖并panic,即對(duì)已關(guān)閉的chan發(fā)送數(shù)據(jù)會(huì)panic。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // 第三部分
    lock(&c.lock) // 開(kāi)始加鎖

    if c.closed != 0 { // 判斷channel是否關(guān)閉
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

第四部分

第四部分的邏輯主要是判斷接收隊(duì)列中是否有正在等待的接收方receiver。如果存在正在等待的receiver(說(shuō)明此時(shí)buf中沒(méi)有緩存的數(shù)據(jù)),則將他從接收隊(duì)列中彈出,直接將需要發(fā)送到channel的數(shù)據(jù)交給這個(gè)receiver,而無(wú)需放入到buf中,讓發(fā)送操作速度更快一些。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    
    // 第四部分
    if sg := c.recvq.dequeue(); sg != nil {
       // 找到了一個(gè)正在等待的接收者。我們傳遞我們想要發(fā)送的值
       // 直接傳遞給receiver接收者,繞過(guò)channel buf緩存區(qū)(如果receiver有的話(huà))
       send(c, sg, ep, func() { unlock(&c.lock) }, 3)
       return true
    }

    ...
}

第五部分

當(dāng)?shù)却?duì)列中并沒(méi)有正在等待的receiver,則說(shuō)明當(dāng)前buf還沒(méi)有滿(mǎn),此時(shí)將發(fā)送的數(shù)據(jù)放入到buf中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    
    // 第五部分
    if c.qcount < c.dataqsiz { // 判斷buf是否滿(mǎn)了
       // channel buf還有可用的空間. 將發(fā)送數(shù)據(jù)入buf循環(huán)隊(duì)列.
       qp := chanbuf(c, c.sendx)
       if raceenabled {
          racenotify(c, c.sendx, nil)
       }
       typedmemmove(c.elemtype, qp, ep)
       c.sendx++
       if c.sendx == c.dataqsiz {
          c.sendx = 0
       }
       c.qcount++
       unlock(&c.lock)
       return true
    }
    
    ...
}

第六部分

當(dāng)邏輯走到第六部分,說(shuō)明正在處理buf已滿(mǎn)的情況。如果buf已滿(mǎn),則發(fā)送操作的goroutine就會(huì)加入到發(fā)送者的等待隊(duì)列,直到被喚醒。當(dāng)goroutine被喚醒時(shí),數(shù)據(jù)或者被取走了,或者chan已經(jīng)被關(guān)閉了。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // 第六部分
    
    // chansend1函數(shù)調(diào)用不會(huì)進(jìn)入if塊里,因?yàn)閏hansend1的block=true
    if !block {
       unlock(&c.lock)
       return false
    }
    
    ...
    
    c.sendq.enqueue(mysg) // 加入發(fā)送隊(duì)列
    
    ...
    
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 阻塞
    
    ...
}

4、recv接收操作

從channel中接收數(shù)據(jù)時(shí),Go會(huì)將代碼轉(zhuǎn)換成chanrecv1函數(shù)。如果需要返回兩個(gè)返回值,則會(huì)轉(zhuǎn)換成chanrecv2,chanrecv1函數(shù)和chanrecv2都會(huì)調(diào)用chanrecv函數(shù)。chanrecv1和chanrecv2傳入的 block參數(shù)的值是true,兩種調(diào)用都是阻塞方式,因此在分析chanrecv函數(shù)的實(shí)現(xiàn)時(shí),可以不考慮 block=false的情況。

// 從已編譯代碼中進(jìn)入 <-c 的入口點(diǎn)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

同樣,省略一些檢查類(lèi)的代碼,我們也可以分段分析chanrecv函數(shù)的邏輯。

第一部分

第一部分主要判斷當(dāng)前進(jìn)行接收操作的chan實(shí)例是否為nil,若為nil,則從nil chan中接收數(shù)據(jù)的調(diào)用這goroutine會(huì)被阻塞。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    // 第一部分
    if c == nil { // 判斷chan是否為nil
       if !block { // 是否阻塞,默認(rèn)為block=true
          return
       }
       // 進(jìn)行阻塞
       gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
       throw("unreachable")
    }
    ...
}

第二部分 

這一部分只要是考慮block=false且c為空的情況,block=false的情況我們可以不做考慮。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    // 檢查未獲得鎖的失敗非阻塞操作。
    if !block && empty(c) {
        ...
    }
    ...
}

第三部分

第三部分的邏輯為判斷當(dāng)前chan是否被關(guān)閉,若當(dāng)前chan已經(jīng)被close了,并且緩存隊(duì)列中沒(méi)有緩沖的元素時(shí),返回true、false。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    ...
   
    lock(&c.lock) // 加鎖,返回時(shí)釋放鎖
    
    // 第三部分
    if c.closed != 0 { // 當(dāng)chan已被關(guān)閉時(shí)
        if c.qcount == 0 { // 且 buf區(qū) 沒(méi)有緩存的數(shù)據(jù)了
            
            ...
            
            unlock(&c.lock) // 解鎖
            if ep != nil {
               typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } 
    ...
}

第四部分

第四部分是處理通道未關(guān)閉且buf緩存隊(duì)列已滿(mǎn)的情況。只有當(dāng)緩存隊(duì)列已滿(mǎn)時(shí),才能夠從發(fā)送等待隊(duì)列獲取到sender。若當(dāng)前的chan為unbuffer的chan,即無(wú)緩沖區(qū)channel時(shí),則直接將sender的發(fā)送數(shù)據(jù)傳遞給receiver。否則就從緩存隊(duì)列的頭部讀取一個(gè)元素值,并將獲取的sender攜帶的值加入到buf循環(huán)隊(duì)列的尾部。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if c.closed != 0 { // 當(dāng)chan已被關(guān)閉時(shí)
    
    } else { // 第四部分,通道未關(guān)閉
       // 如果sendq隊(duì)列中有等待發(fā)送的sender
       if sg := c.sendq.dequeue(); sg != nil {
          // 存在正在等待的sender,如果緩存區(qū)的容量為0則直接將發(fā)送方的值傳遞給接收方
          // 反之,則從緩存隊(duì)列的頭部獲取數(shù)據(jù),并將獲取的sender的發(fā)送值加入到緩存隊(duì)列尾部
          recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true, true
       }
    }
    
    ...
}

第五部分

第五部分的主要邏輯是處理發(fā)送隊(duì)列中沒(méi)有等待的sender且buf中有緩存的數(shù)據(jù)。該段邏輯與外出的互斥鎖共用一把鎖,因此不存在并發(fā)問(wèn)題。當(dāng)buf緩存區(qū)有緩存元素時(shí),則取出該元素傳遞給receiver,同時(shí)移動(dòng)接收指針。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    
    // 第五部分
    if c.qcount > 0 { // 發(fā)送隊(duì)列中沒(méi)有等待的sender,且buf中有緩存數(shù)據(jù)
        // 直接從緩存隊(duì)列中獲取數(shù)據(jù)
        qp := chanbuf(c, c.recvx)
        if raceenabled {
           racenotify(c, c.recvx, nil)
        }
        if ep != nil {
           typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++ // 移動(dòng)接收指針
        if c.recvx == c.dataqsiz { // 指針若已到末尾則進(jìn)行重置(循環(huán)隊(duì)列)
           c.recvx = 0
        }
        c.qcount-- // 獲取數(shù)據(jù)后,buf緩存區(qū)元素個(gè)數(shù)減一
        unlock(&c.lock) // 解鎖
        return true, true
    }

    if !block { // block=true
        unlock(&c.lock)
        return false, false
    }
    ...
}

第六部分

第六部分的邏輯主要是處理buf緩存區(qū)中沒(méi)有緩存數(shù)據(jù)的情況。當(dāng)buf緩存區(qū)沒(méi)有緩存數(shù)據(jù)時(shí),那么當(dāng)前的receiver就會(huì)被阻塞,直到它從sender中接收了數(shù)據(jù),或者是chan被close,才會(huì)返回。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    c.recvq.enqueue(mysg) // 將當(dāng)前接收操作入接收隊(duì)列
    
    ...
    
    // 進(jìn)行阻塞,等待喚醒
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    ...
}

5、close關(guān)閉

close函數(shù)主要用于channel的關(guān)閉,Go編譯器會(huì)替換成closechan函數(shù)的調(diào)用。省略一些檢查下的代碼后,closechan函數(shù)的主要邏輯如下:

  • 如果當(dāng)前chan為nil,則直接panic
  • 如果當(dāng)前chan已關(guān)閉,再次close則直接panic
  • 如果chan不為nil,chan也沒(méi)有closed,就把等待隊(duì)列中的 sender(writer)和 receiver(reader)從隊(duì)列中全部移除并喚醒。
func closechan(c *hchan) {
    if c == nil { // 若當(dāng)前chan未nil,則直接panic
       panic(plainError("close of nil channel"))
    }

    lock(&c.lock) // 加鎖
    
    if c.closed != 0 { // 若當(dāng)前chan已經(jīng)關(guān)閉,則直接panic
       unlock(&c.lock)
       panic(plainError("close of closed channel"))
    }
    
    ...

    c.closed = 1 // 設(shè)置當(dāng)前channel的狀態(tài)為已關(guān)閉

    var glist gList

    // 釋放接收隊(duì)列中所有的reader
    for {
       sg := c.recvq.dequeue()
       if sg == nil {
          break
       }
       if sg.elem != nil {
          typedmemclr(c.elemtype, sg.elem)
          sg.elem = nil
       }
       if sg.releasetime != 0 {
          sg.releasetime = cputicks()
       }
       gp := sg.g
       gp.param = unsafe.Pointer(sg)
       sg.success = false
       if raceenabled {
          raceacquireg(gp, c.raceaddr())
       }
       glist.push(gp)
    }

    // 釋放發(fā)送隊(duì)列中所有的writer (它們會(huì)panic)
    for {
       sg := c.sendq.dequeue()
       if sg == nil {
          break
       }
       sg.elem = nil
       if sg.releasetime != 0 {
          sg.releasetime = cputicks()
       }
       gp := sg.g
       gp.param = unsafe.Pointer(sg)
       sg.success = false
       if raceenabled {
          raceacquireg(gp, c.raceaddr())
       }
       glist.push(gp)
    }
    unlock(&c.lock)

    for !glist.empty() {
       gp := glist.pop()
       gp.schedlink = 0
       goready(gp, 3)
    }
}

三、總結(jié)

通過(guò)學(xué)習(xí)channel的基本使用,了解其操作背后的實(shí)現(xiàn)原理,可以幫助我們更好的使用channel,避免一些操作不當(dāng)而導(dǎo)致的panic或者說(shuō)是bug,讓我們?cè)谑褂胏hannel時(shí)能夠更加的得心應(yīng)手。

channel的值和狀態(tài)有多種情況,而不同的操作(send、recv、close)又可能得到不同的結(jié)果,這是使用 channel 類(lèi)型時(shí)需要經(jīng)常注意的點(diǎn),我們可以將不同channel值下的不同操作進(jìn)行一個(gè)總結(jié),特別注意操作channel時(shí)會(huì)產(chǎn)生panic的情況,已經(jīng)可能會(huì)導(dǎo)致線(xiàn)程阻塞的情況,都是有可能導(dǎo)致死鎖與goroutine泄漏的罪魁禍?zhǔn)住?/p>

channel執(zhí)行操作\channel狀態(tài)

channel為nil

channel buf為空

channel buf已滿(mǎn)

channel buf未滿(mǎn)且不為空

channel已關(guān)閉

receive接收操作

阻塞

阻塞

讀取數(shù)據(jù)

讀取數(shù)據(jù)

返回buf中緩存的數(shù)據(jù)

send發(fā)送操作

阻塞

寫(xiě)入數(shù)據(jù)

阻塞

寫(xiě)入數(shù)據(jù)

panic

close關(guān)閉

panic

關(guān)閉channel,buf中沒(méi)有緩存數(shù)據(jù)

關(guān)閉channel,保留已緩存的數(shù)據(jù)

關(guān)閉channel,保留已緩存的數(shù)據(jù)

panic

原文鏈接

小韜同學(xué) 的掘金賬號(hào):Serena[2]

深入解析Go Channel的神秘原理[3]

本文轉(zhuǎn)載自微信公眾號(hào)「 程序員升級(jí)打怪之旅」,作者「小韜&王中陽(yáng)」,可以通過(guò)以下二維碼關(guān)注。

轉(zhuǎn)載本文請(qǐng)聯(lián)系「 程序員升級(jí)打怪之旅」公眾號(hào)。

責(zé)任編輯:武曉燕 來(lái)源: 王中陽(yáng)Go
相關(guān)推薦

2021-05-26 16:12:20

區(qū)塊鏈加密貨幣比特幣

2011-10-26 19:57:33

2018-12-03 09:19:28

移動(dòng)網(wǎng)絡(luò)NAT

2024-07-30 12:24:23

2019-06-19 08:30:47

網(wǎng)絡(luò)協(xié)議IPTCP

2023-01-06 09:40:20

項(xiàng)目性能

2020-06-08 10:50:58

前端TypeScript代碼

2022-06-28 07:31:11

哨兵模式redis

2022-09-23 08:32:53

微服務(wù)架構(gòu)服務(wù)

2022-10-11 11:35:29

自動(dòng)駕駛

2024-06-19 10:08:34

GoChannel工具

2021-11-04 08:16:50

MySQL SQL 語(yǔ)句數(shù)據(jù)庫(kù)

2022-07-06 08:17:50

C 語(yǔ)言函數(shù)選型

2024-09-02 09:00:59

2019-05-20 07:37:00

TCPIP網(wǎng)絡(luò)協(xié)議

2018-01-17 22:36:46

區(qū)塊鏈數(shù)字貨幣比特幣

2018-03-05 08:56:10

物聯(lián)網(wǎng)無(wú)線(xiàn)通信終端設(shè)備

2018-03-11 15:11:38

物聯(lián)網(wǎng)數(shù)據(jù)物聯(lián)網(wǎng)數(shù)據(jù)

2021-05-25 09:50:01

GitLinux命令

2021-05-30 19:02:59

變量對(duì)象上下文
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)