通俗易懂剖析Go Channel:理解并發(fā)通信的核心機(jī)制
我們?cè)趯W(xué)習(xí)與使用Go語(yǔ)言的過(guò)程中,對(duì)channel并不陌生,channel是Go語(yǔ)言與眾不同的特性之一,也是非常重要的一環(huán),深入理解Channel,相信能夠在使用的時(shí)候更加的得心應(yīng)手。
一、Channel基本用法
1、channel類(lèi)別
channel在類(lèi)型上,可以分為兩種:
- 雙向channel:既能接收又能發(fā)送的channel
- 單向channel:只能發(fā)送或只能接收的channel,即單向channel可以為分為:
只寫(xiě)channel
只讀channel
聲明并初始化如下如下:
func main() {
// 聲明并初始化
var ch chan string = make(chan string) // 雙向channel
var readCh <-chan string = make(<-chan string) // 只讀channel
var writeCh chan<- string = make(chan<- string) // 只寫(xiě)channel
}
上述定義中,<-表示單向的channel。如果箭頭指向chan,就表示只寫(xiě)channel,可以往chan里邊寫(xiě)入數(shù)據(jù);如果箭頭遠(yuǎn)離chan,則表示為只讀channel,可以從chan讀數(shù)據(jù)。
在定義channel時(shí),可以定義任意類(lèi)型的channel,因此也同樣可以定義chan類(lèi)型的channel。例如:
a := make(chan<- chan int) // 定義類(lèi)型為 chan int 的寫(xiě)channel
b := make(chan<- <-chan int) // 定義類(lèi)型為 <-chan int 的寫(xiě)channel
c := make(<-chan <-chan int) // 定義類(lèi)型為 <-chan int 的讀channel
d := make(chan (<-chan int)) // 定義類(lèi)型為 (<-chan int) 的讀channel
當(dāng)channel未初始化時(shí),其零值為nil。nil 是 chan 的零值,是一種特殊的 chan,對(duì)值是 nil 的 chan 的發(fā)送接收調(diào)用者總是會(huì)阻塞。
func main() {
var ch chan string
fmt.Println(ch) // <nil>
}
通過(guò)make我們可以初始化一個(gè)channel,并且可以設(shè)置其容量的大小,如下初始化了一個(gè)類(lèi)型為string,其容量大小為512的channel:
var ch chan string = make(chan string, 512)
當(dāng)初始化定義了channel的容量,則這樣的channel叫做buffered chan,即**有緩沖channel。如果沒(méi)有設(shè)置容量,channel的容量為0,這樣的channel叫做unbuffered chan,即無(wú)緩沖channel**。
有緩沖channel中,如果channel中還有數(shù)據(jù),則從這個(gè)channel接收數(shù)據(jù)時(shí)不會(huì)被阻塞。如果channel的容量還未滿(mǎn),那么向這個(gè)channel發(fā)送數(shù)據(jù)也不會(huì)被阻塞,反之則會(huì)被阻塞。
無(wú)緩沖channel則只有當(dāng)讀寫(xiě)操作都準(zhǔn)備好后,才不會(huì)阻塞,這也是unbuffered chan在使用過(guò)程中非常需要注意的一點(diǎn),否則可能會(huì)出現(xiàn)常見(jiàn)的bug。
channel的常見(jiàn)操作:
發(fā)送數(shù)據(jù)
往channel發(fā)送一個(gè)數(shù)據(jù)使用ch <-
func main() {
var ch chan int = make(chan int, 512)
ch <- 2000
}
上述的ch可以是chan int類(lèi)型,也可以是單向chan <-int。
接收數(shù)據(jù)
從channel接收一條數(shù)據(jù)可以使用<-ch
func main() {
var ch chan int = make(chan int, 512)
ch <- 2000 // 發(fā)送數(shù)據(jù)
data := <-ch // 接收數(shù)據(jù)
fmt.Println(data) // 2000
}
ch 類(lèi)型是 chan T,也可以是單向<-chan T
在接收數(shù)據(jù)時(shí),可以返回兩個(gè)返回值。第一個(gè)返回值返回channel中的元素,第二個(gè)返回值為bool類(lèi)型,表示是否成功地從channel中讀取到一個(gè)值。
如果第二個(gè)參數(shù)是false,則表示channel已經(jīng)被close而且channel中沒(méi)有緩存的數(shù)據(jù),這個(gè)時(shí)候第一個(gè)值返回的是零值。
func main() {
var ch chan int = make(chan int, 512)
ch <- 2000 // 發(fā)送數(shù)據(jù)
data1, ok1 := <-ch // 接收數(shù)據(jù)
fmt.Printf("data1 = %d, ok1 = %t\n", data1, ok1) // data1 = 2000, ok1 = true
close(ch) // 關(guān)閉channel
data2, ok2 := <-ch // 接收數(shù)據(jù)
fmt.Printf("data2 = %d, ok2 = %t", data2, ok2) // data2 = 0, ok2 = false
}
所以,如果從channel讀取到一個(gè)零值,可能是發(fā)送操作真正發(fā)送的零值,也可能是closed關(guān)閉channel并且channel沒(méi)有緩存元素產(chǎn)生的零值,這是需要注意判別的一個(gè)點(diǎn)。
其他操作
Go內(nèi)建的函數(shù)close、cap、len都可以對(duì)chan類(lèi)型進(jìn)行操作。
- close:關(guān)閉channel。
- cap:返回channel的容量。
- len:返回channel緩存中還未被取走的元素?cái)?shù)量。
func main() {
var ch chan int = make(chan int, 512)
ch <- 100
ch <- 200
fmt.Println("ch len:", len(ch)) // ch len: 2
fmt.Println("ch cap:", cap(ch)) // ch cap: 512
}
發(fā)送操作與接收操作可以作為select語(yǔ)句中的case clause,例如:
func main() {
var ch = make(chan int, 512)
for i := 0; i < 10; i++ {
select {
case ch <- i:
case v := <-ch:
fmt.Println(v)
}
}
}
for-range語(yǔ)句同樣可以在chan中使用,例如:
func main() {
var ch = make(chan int, 512)
ch <- 100
ch <- 200
ch <- 300
for v := range ch {
fmt.Println(v)
}
}
// 執(zhí)行結(jié)果
100
200
300
2、select介紹
在Go語(yǔ)言中,select語(yǔ)句用于監(jiān)控一組case語(yǔ)句,根據(jù)特定的條件執(zhí)行相對(duì)應(yīng)的case語(yǔ)句或default語(yǔ)句,與switch類(lèi)似,但不同之處在于select語(yǔ)句中所有case中的表達(dá)式都必須是channel的發(fā)送或接收操作。select使用示例代碼如下:
select {
case <-ch1:
fmt.Println("ch1")
case ch2 <- 1:
fmt.Println("ch2")
}
上述代碼中,select關(guān)鍵字讓當(dāng)前goroutine同時(shí)等待ch1 的可讀和ch2的可寫(xiě),在滿(mǎn)足任意一個(gè)case分支之前,select 會(huì)一直阻塞下去,直到其中的一個(gè) channel 轉(zhuǎn)為就緒狀態(tài)時(shí)執(zhí)行對(duì)應(yīng)case分支的代碼。如果多個(gè)channel同時(shí)就緒的話(huà)則隨機(jī)選擇一個(gè)case執(zhí)行。
當(dāng)使用空select時(shí),空的 select 語(yǔ)句會(huì)直接阻塞當(dāng)前的goroutine,使得該goroutine進(jìn)入無(wú)法被喚醒的永久休眠狀態(tài)??誷elect,即select內(nèi)不包含任何case。
select{
}
另外當(dāng)select語(yǔ)句內(nèi)只有一個(gè)case分支時(shí),如果該case分支不滿(mǎn)足,那么當(dāng)前select就變成了一個(gè)阻塞的channel讀/寫(xiě)操作。
select {
case <-ch1:
fmt.Println("ch1")
}
上述select中,當(dāng)ch1可讀時(shí),會(huì)執(zhí)行打印操作,反之則阻塞當(dāng)前goroutine。
當(dāng)select語(yǔ)句內(nèi)包含default分支時(shí),如果select內(nèi)的所有case都不滿(mǎn)足,則會(huì)執(zhí)行default分支的邏輯,用于當(dāng)其他case都不滿(mǎn)足時(shí)執(zhí)行一些默認(rèn)操作。
select {
case <-ch1:
fmt.Println("ch1")
case ch2 <- 1:
fmt.Println("ch2")
default:
fmt.Println("default")
}
上述代碼中,當(dāng)ch1可讀或ch2可寫(xiě)時(shí),會(huì)執(zhí)行相應(yīng)的打印操作,否則就執(zhí)行default語(yǔ)句中的代碼,相當(dāng)于一個(gè)非阻塞的channel讀取操作。
select的使用可以總結(jié)為:
- select不存在任何的case且沒(méi)有default分支:永久阻塞當(dāng)前 goroutine;
- select只存在一個(gè)case且沒(méi)有default分支:阻塞的發(fā)送/接收;
- select存在多個(gè)case:隨機(jī)選擇一個(gè)滿(mǎn)足條件的case執(zhí)行;
- select存在default,其他case都不滿(mǎn)足時(shí):執(zhí)行default語(yǔ)句中的代碼;
二、Channel實(shí)現(xiàn)原理
從代碼的角度剖析channel的實(shí)現(xiàn),能夠讓我們更好的去使用channel。
我們可以從chan類(lèi)型的數(shù)據(jù)結(jié)構(gòu)、初始化以及三個(gè)操作發(fā)送、接收和關(guān)閉這幾個(gè)方面來(lái)了解channel。
1、chan數(shù)據(jù)結(jié)構(gòu)
chan類(lèi)型的數(shù)據(jù)結(jié)構(gòu)定義位于runtime.hchan[1],其結(jié)構(gòu)體定義如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
解釋一下上述各個(gè)字段的意義:
- qcount:表示chan中已經(jīng)接收到的數(shù)據(jù)且還未被取走的元素個(gè)數(shù)。內(nèi)建函數(shù)len可以返回這個(gè)字段的值。
- datasiz:循環(huán)隊(duì)列的大小。chan在實(shí)現(xiàn)上使用一個(gè)循環(huán)隊(duì)列來(lái)存放元素的個(gè)數(shù),循環(huán)隊(duì)列適用于生產(chǎn)者-消費(fèi)者的場(chǎng)景。
- buf:存放元素的循環(huán)隊(duì)列buffer,buf 字段是一個(gè)指向隊(duì)列緩沖區(qū)的指針,即指向一個(gè)dataqsiz元素的數(shù)組。buf 字段是使用 unsafe.Pointer 類(lèi)型來(lái)表示隊(duì)列緩沖區(qū)的起始地址。unsafe.Pointer是一種特殊的指針類(lèi)型,它可以用于指向任何類(lèi)型的數(shù)據(jù)。由于隊(duì)列緩沖區(qū)的類(lèi)型是動(dòng)態(tài)分配的,所以不能直接使用某個(gè)具體類(lèi)型的指針來(lái)表示。
- elemtype、elemsize:elemtype表示chan中元素的數(shù)據(jù)類(lèi)型,elemsize表示其大小。當(dāng)chan定義后,它的元素類(lèi)型是固定的,即普通類(lèi)型或者指針類(lèi)型,因此元素大小也是固定的。
- sendx:處理發(fā)送數(shù)據(jù)操作的指針在buf隊(duì)列中的位置。當(dāng)channel接收到了新的數(shù)據(jù)時(shí),該指針就會(huì)加上elemsize,移動(dòng)到下一個(gè)位置。buf 的總大小是elemsize的整數(shù)倍且buf是一個(gè)循環(huán)列表。
- recvx:處理接收數(shù)據(jù)操作的指針在buf隊(duì)列中的位置。當(dāng)從buf中取出數(shù)據(jù),此指針會(huì)移動(dòng)到下一個(gè)位置。
- recvq:當(dāng)接收操作發(fā)現(xiàn)channel中沒(méi)有數(shù)據(jù)可讀時(shí),會(huì)被則色,此時(shí)會(huì)被加入到recvq隊(duì)列中。
- sendq:當(dāng)發(fā)送操作發(fā)現(xiàn)buf隊(duì)列已滿(mǎn)時(shí),會(huì)被進(jìn)行阻塞,此時(shí)會(huì)被加入到sendq隊(duì)列中。
圖片
2、chan初始化
channel在進(jìn)行初始化時(shí),Go編譯器會(huì)根據(jù)是否傳入容量的大小,來(lái)選擇調(diào)用makechan64,還是makechan。makechan64在實(shí)現(xiàn)上底層還是調(diào)用makechan來(lái)進(jìn)行初始化,makechan64只是對(duì)size做了檢查。
makechan函數(shù)根據(jù)chan的容量的大小和元素的類(lèi)型不同,初始化不同的存儲(chǔ)空間。省略一些檢查代碼,makechan函數(shù)的主要邏輯如下:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
...
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
...
var c *hchan
switch {
case mem == 0:
// 隊(duì)列或元素大小為零,不必創(chuàng)建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不包含指針,分配一塊連續(xù)的內(nèi)存給hchan數(shù)據(jù)結(jié)構(gòu)和buf
// hchan數(shù)據(jù)結(jié)構(gòu)后面緊接著就是buf,在一次調(diào)用中分配hchan和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指針,單獨(dú)分配buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 記錄元素大小、類(lèi)型、容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
...
return c
}
3、send發(fā)送操作
Go在編譯發(fā)送數(shù)據(jù)給channel時(shí),會(huì)把發(fā)送操作send轉(zhuǎn)換成chansend1函數(shù),而chansend1函數(shù)會(huì)調(diào)用chansend函數(shù)。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
我們可以來(lái)分段分析chansend函數(shù)的實(shí)現(xiàn)邏輯。
第一部分:
主要是對(duì)chan進(jìn)行判斷,判斷chan是否為nil,若為nil,則判斷是否需要將當(dāng)前goroutine進(jìn)行阻塞,阻塞通過(guò)gopark來(lái)對(duì)調(diào)用者goroutine park(阻塞休眠)。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 第一部分
if c == nil { // 判斷chan是否為nil
if !block { // 判斷是否需要阻塞當(dāng)前goroutine
return false
}
// 調(diào)用這goroutine park,進(jìn)行阻塞休眠
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
第二部分
第二部分的邏輯判斷是當(dāng)你往一個(gè)容量已滿(mǎn)的chan實(shí)例發(fā)送數(shù)據(jù),且不想當(dāng)前調(diào)用的goroutine被阻塞時(shí)(chan未被關(guān)閉),那么處理的邏輯是直接返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第二部分
if !block && c.closed == 0 && full(c) {
return false
}
...
}
第三部分
第三部分的邏輯判斷是首先進(jìn)行互斥鎖加鎖,然后判斷當(dāng)前chan是否關(guān)閉,如果chan已經(jīng)被close了,則釋放互斥鎖并panic,即對(duì)已關(guān)閉的chan發(fā)送數(shù)據(jù)會(huì)panic。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第三部分
lock(&c.lock) // 開(kāi)始加鎖
if c.closed != 0 { // 判斷channel是否關(guān)閉
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}
第四部分
第四部分的邏輯主要是判斷接收隊(duì)列中是否有正在等待的接收方receiver。如果存在正在等待的receiver(說(shuō)明此時(shí)buf中沒(méi)有緩存的數(shù)據(jù)),則將他從接收隊(duì)列中彈出,直接將需要發(fā)送到channel的數(shù)據(jù)交給這個(gè)receiver,而無(wú)需放入到buf中,讓發(fā)送操作速度更快一些。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第四部分
if sg := c.recvq.dequeue(); sg != nil {
// 找到了一個(gè)正在等待的接收者。我們傳遞我們想要發(fā)送的值
// 直接傳遞給receiver接收者,繞過(guò)channel buf緩存區(qū)(如果receiver有的話(huà))
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
第五部分
當(dāng)?shù)却?duì)列中并沒(méi)有正在等待的receiver,則說(shuō)明當(dāng)前buf還沒(méi)有滿(mǎn),此時(shí)將發(fā)送的數(shù)據(jù)放入到buf中。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第五部分
if c.qcount < c.dataqsiz { // 判斷buf是否滿(mǎn)了
// channel buf還有可用的空間. 將發(fā)送數(shù)據(jù)入buf循環(huán)隊(duì)列.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
第六部分
當(dāng)邏輯走到第六部分,說(shuō)明正在處理buf已滿(mǎn)的情況。如果buf已滿(mǎn),則發(fā)送操作的goroutine就會(huì)加入到發(fā)送者的等待隊(duì)列,直到被喚醒。當(dāng)goroutine被喚醒時(shí),數(shù)據(jù)或者被取走了,或者chan已經(jīng)被關(guān)閉了。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第六部分
// chansend1函數(shù)調(diào)用不會(huì)進(jìn)入if塊里,因?yàn)閏hansend1的block=true
if !block {
unlock(&c.lock)
return false
}
...
c.sendq.enqueue(mysg) // 加入發(fā)送隊(duì)列
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 阻塞
...
}
4、recv接收操作
從channel中接收數(shù)據(jù)時(shí),Go會(huì)將代碼轉(zhuǎn)換成chanrecv1函數(shù)。如果需要返回兩個(gè)返回值,則會(huì)轉(zhuǎn)換成chanrecv2,chanrecv1函數(shù)和chanrecv2都會(huì)調(diào)用chanrecv函數(shù)。chanrecv1和chanrecv2傳入的 block參數(shù)的值是true,兩種調(diào)用都是阻塞方式,因此在分析chanrecv函數(shù)的實(shí)現(xiàn)時(shí),可以不考慮 block=false的情況。
// 從已編譯代碼中進(jìn)入 <-c 的入口點(diǎn)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
同樣,省略一些檢查類(lèi)的代碼,我們也可以分段分析chanrecv函數(shù)的邏輯。
第一部分
第一部分主要判斷當(dāng)前進(jìn)行接收操作的chan實(shí)例是否為nil,若為nil,則從nil chan中接收數(shù)據(jù)的調(diào)用這goroutine會(huì)被阻塞。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 第一部分
if c == nil { // 判斷chan是否為nil
if !block { // 是否阻塞,默認(rèn)為block=true
return
}
// 進(jìn)行阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
第二部分
這一部分只要是考慮block=false且c為空的情況,block=false的情況我們可以不做考慮。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 檢查未獲得鎖的失敗非阻塞操作。
if !block && empty(c) {
...
}
...
}
第三部分
第三部分的邏輯為判斷當(dāng)前chan是否被關(guān)閉,若當(dāng)前chan已經(jīng)被close了,并且緩存隊(duì)列中沒(méi)有緩沖的元素時(shí),返回true、false。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock) // 加鎖,返回時(shí)釋放鎖
// 第三部分
if c.closed != 0 { // 當(dāng)chan已被關(guān)閉時(shí)
if c.qcount == 0 { // 且 buf區(qū) 沒(méi)有緩存的數(shù)據(jù)了
...
unlock(&c.lock) // 解鎖
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}
第四部分
第四部分是處理通道未關(guān)閉且buf緩存隊(duì)列已滿(mǎn)的情況。只有當(dāng)緩存隊(duì)列已滿(mǎn)時(shí),才能夠從發(fā)送等待隊(duì)列獲取到sender。若當(dāng)前的chan為unbuffer的chan,即無(wú)緩沖區(qū)channel時(shí),則直接將sender的發(fā)送數(shù)據(jù)傳遞給receiver。否則就從緩存隊(duì)列的頭部讀取一個(gè)元素值,并將獲取的sender攜帶的值加入到buf循環(huán)隊(duì)列的尾部。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 { // 當(dāng)chan已被關(guān)閉時(shí)
} else { // 第四部分,通道未關(guān)閉
// 如果sendq隊(duì)列中有等待發(fā)送的sender
if sg := c.sendq.dequeue(); sg != nil {
// 存在正在等待的sender,如果緩存區(qū)的容量為0則直接將發(fā)送方的值傳遞給接收方
// 反之,則從緩存隊(duì)列的頭部獲取數(shù)據(jù),并將獲取的sender的發(fā)送值加入到緩存隊(duì)列尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}
第五部分
第五部分的主要邏輯是處理發(fā)送隊(duì)列中沒(méi)有等待的sender且buf中有緩存的數(shù)據(jù)。該段邏輯與外出的互斥鎖共用一把鎖,因此不存在并發(fā)問(wèn)題。當(dāng)buf緩存區(qū)有緩存元素時(shí),則取出該元素傳遞給receiver,同時(shí)移動(dòng)接收指針。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 第五部分
if c.qcount > 0 { // 發(fā)送隊(duì)列中沒(méi)有等待的sender,且buf中有緩存數(shù)據(jù)
// 直接從緩存隊(duì)列中獲取數(shù)據(jù)
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++ // 移動(dòng)接收指針
if c.recvx == c.dataqsiz { // 指針若已到末尾則進(jìn)行重置(循環(huán)隊(duì)列)
c.recvx = 0
}
c.qcount-- // 獲取數(shù)據(jù)后,buf緩存區(qū)元素個(gè)數(shù)減一
unlock(&c.lock) // 解鎖
return true, true
}
if !block { // block=true
unlock(&c.lock)
return false, false
}
...
}
第六部分
第六部分的邏輯主要是處理buf緩存區(qū)中沒(méi)有緩存數(shù)據(jù)的情況。當(dāng)buf緩存區(qū)沒(méi)有緩存數(shù)據(jù)時(shí),那么當(dāng)前的receiver就會(huì)被阻塞,直到它從sender中接收了數(shù)據(jù),或者是chan被close,才會(huì)返回。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
c.recvq.enqueue(mysg) // 將當(dāng)前接收操作入接收隊(duì)列
...
// 進(jìn)行阻塞,等待喚醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
5、close關(guān)閉
close函數(shù)主要用于channel的關(guān)閉,Go編譯器會(huì)替換成closechan函數(shù)的調(diào)用。省略一些檢查下的代碼后,closechan函數(shù)的主要邏輯如下:
- 如果當(dāng)前chan為nil,則直接panic
- 如果當(dāng)前chan已關(guān)閉,再次close則直接panic
- 如果chan不為nil,chan也沒(méi)有closed,就把等待隊(duì)列中的 sender(writer)和 receiver(reader)從隊(duì)列中全部移除并喚醒。
func closechan(c *hchan) {
if c == nil { // 若當(dāng)前chan未nil,則直接panic
panic(plainError("close of nil channel"))
}
lock(&c.lock) // 加鎖
if c.closed != 0 { // 若當(dāng)前chan已經(jīng)關(guān)閉,則直接panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
c.closed = 1 // 設(shè)置當(dāng)前channel的狀態(tài)為已關(guān)閉
var glist gList
// 釋放接收隊(duì)列中所有的reader
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放發(fā)送隊(duì)列中所有的writer (它們會(huì)panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
三、總結(jié)
通過(guò)學(xué)習(xí)channel的基本使用,了解其操作背后的實(shí)現(xiàn)原理,可以幫助我們更好的使用channel,避免一些操作不當(dāng)而導(dǎo)致的panic或者說(shuō)是bug,讓我們?cè)谑褂胏hannel時(shí)能夠更加的得心應(yīng)手。
channel的值和狀態(tài)有多種情況,而不同的操作(send、recv、close)又可能得到不同的結(jié)果,這是使用 channel 類(lèi)型時(shí)需要經(jīng)常注意的點(diǎn),我們可以將不同channel值下的不同操作進(jìn)行一個(gè)總結(jié),特別注意操作channel時(shí)會(huì)產(chǎn)生panic的情況,已經(jīng)可能會(huì)導(dǎo)致線(xiàn)程阻塞的情況,都是有可能導(dǎo)致死鎖與goroutine泄漏的罪魁禍?zhǔn)住?/p>
channel執(zhí)行操作\channel狀態(tài) | channel為nil | channel buf為空 | channel buf已滿(mǎn) | channel buf未滿(mǎn)且不為空 | channel已關(guān)閉 |
| 阻塞 | 阻塞 | 讀取數(shù)據(jù) | 讀取數(shù)據(jù) | 返回buf中緩存的數(shù)據(jù) |
| 阻塞 | 寫(xiě)入數(shù)據(jù) | 阻塞 | 寫(xiě)入數(shù)據(jù) | panic |
| panic | 關(guān)閉channel,buf中沒(méi)有緩存數(shù)據(jù) | 關(guān)閉channel,保留已緩存的數(shù)據(jù) | 關(guān)閉channel,保留已緩存的數(shù)據(jù) | panic |
原文鏈接
小韜同學(xué) 的掘金賬號(hào):Serena[2]
深入解析Go Channel的神秘原理[3]
本文轉(zhuǎn)載自微信公眾號(hào)「 程序員升級(jí)打怪之旅」,作者「小韜&王中陽(yáng)」,可以通過(guò)以下二維碼關(guān)注。
轉(zhuǎn)載本文請(qǐng)聯(lián)系「 程序員升級(jí)打怪之旅」公眾號(hào)。






