一篇帶給你Go語(yǔ)言的并發(fā)
并發(fā)
前言
在學(xué)習(xí) Go 的并發(fā)之前,先復(fù)習(xí)一下操作系統(tǒng)的基礎(chǔ)知識(shí)。
并發(fā)與并行
先來(lái)理一理并發(fā)與并行的區(qū)別。
- 并行:指的是在同一時(shí)間,多個(gè)程序在不同的 CPU 上共同運(yùn)行,互相之間并沒(méi)有對(duì) CPU 資源進(jìn)行競(jìng)爭(zhēng)。比如,我在看書(shū)的時(shí)候,左手用來(lái)翻書(shū),右手做筆記,兩者可以同時(shí)進(jìn)行。
- 并發(fā):如果系統(tǒng)只有一個(gè) CPU,有多個(gè)程序要運(yùn)行,系統(tǒng)只能將 CPU 的時(shí)間劃分為多個(gè)時(shí)間片,然后分配給不同的程序。比如,我看書(shū)的時(shí)候,只能用右手翻完書(shū)之后,才能騰出手來(lái)做筆記。
可是明確的是并發(fā)≠并行,但是只要 CPU 運(yùn)行足夠快,每個(gè)時(shí)間片劃分足夠小,就會(huì)給人們?cè)斐梢环N假象,認(rèn)為計(jì)算機(jī)在同一時(shí)刻做了多個(gè)事情。
進(jìn)程、線(xiàn)程、協(xié)程
進(jìn)程是一個(gè)程序執(zhí)行的過(guò)程,也是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位。簡(jiǎn)單來(lái)說(shuō),一個(gè)進(jìn)程就是我們電腦上某個(gè)獨(dú)立運(yùn)行的程序。
而線(xiàn)程是系統(tǒng)能夠調(diào)度的最小單位,它被包含在進(jìn)程里面,是進(jìn)程中的實(shí)際運(yùn)作單位,一個(gè)進(jìn)程可以包含多個(gè)線(xiàn)程??梢詫⑦M(jìn)程理解為一個(gè)工廠(chǎng),而工廠(chǎng)里面的工人就是線(xiàn)程。就像工廠(chǎng)里面必須要有一個(gè)工人才能工作一樣,每個(gè)進(jìn)程里面也必須有一個(gè)線(xiàn)程才能工作。比如,JavaScript 就被成為單線(xiàn)程的語(yǔ)言,說(shuō)明 JavaScript 工廠(chǎng)里面只有一個(gè)打工人,這個(gè)打工人就是工頭,稱(chēng)為主線(xiàn)程。多線(xiàn)程的進(jìn)程中也會(huì)有一個(gè)主線(xiàn)程,主線(xiàn)程一般隨著進(jìn)程一起創(chuàng)建和銷(xiāo)毀。
進(jìn)程與線(xiàn)程都是操作系統(tǒng)上的概念,程序中如果要進(jìn)行進(jìn)程或者線(xiàn)程的切換,在切換的過(guò)程中,需要先保存當(dāng)線(xiàn)程的狀態(tài),然后恢復(fù)另一個(gè)線(xiàn)程的狀態(tài),這是需要耗費(fèi)時(shí)間的,如果是進(jìn)程的切換還可能跨 CPU,無(wú)法利用 CPU 緩存,導(dǎo)致進(jìn)程比線(xiàn)程的切換成本更加高昂。
所以,除了系統(tǒng)級(jí)別的內(nèi)核線(xiàn)程外,一些程序中創(chuàng)建了用戶(hù)線(xiàn)程這一說(shuō),這么做可以減少與操作系統(tǒng)交互,將線(xiàn)程的切換控制在程序內(nèi),這種用戶(hù)態(tài)的線(xiàn)程被稱(chēng)為協(xié)程。用戶(hù)線(xiàn)程的切換完全由程序控制,實(shí)際上使用的內(nèi)核線(xiàn)程就只存在一個(gè),內(nèi)核線(xiàn)程與用戶(hù)線(xiàn)程之間的關(guān)系為一對(duì)多。雖然這樣做可以減少線(xiàn)程上下文切換帶來(lái)的開(kāi)銷(xiāo),但是,無(wú)法避免阻塞的問(wèn)題。一旦某個(gè)用戶(hù)線(xiàn)程被阻塞會(huì)導(dǎo)致內(nèi)核線(xiàn)程的阻塞,無(wú)法進(jìn)行用戶(hù)線(xiàn)程進(jìn)行切換,從而整個(gè)進(jìn)程都被掛起,
協(xié)程
Go 語(yǔ)言中的線(xiàn)程模型既不是使用內(nèi)核線(xiàn)程,也不是完全的用戶(hù)線(xiàn)程,而是一種混合型的線(xiàn)程模型。用戶(hù)線(xiàn)程與內(nèi)核線(xiàn)程的對(duì)應(yīng)關(guān)系為多對(duì)多,用戶(hù)線(xiàn)程與內(nèi)核線(xiàn)程動(dòng)態(tài)關(guān)聯(lián),當(dāng)某個(gè)線(xiàn)程出現(xiàn)阻塞的時(shí)候,可以動(dòng)態(tài)切換到另外的內(nèi)核線(xiàn)程上。
G-P-M模型
上面只是 Go 語(yǔ)言中抽象層面的線(xiàn)程模型,具體是如何進(jìn)行線(xiàn)程調(diào)度的,還是看看 Go 語(yǔ)言的代碼。
- func log(msg string) {
- fmt.Println(msg)
- }
- func main() {
- log("hello")
- go log("world")
- }
之前的文章介紹過(guò),Go 程序在運(yùn)行時(shí),默認(rèn)以 main 函數(shù)為入口,main 函數(shù)中運(yùn)行的代碼會(huì)到一個(gè) goroutine 中運(yùn)行。如果我們?cè)谡{(diào)用的函數(shù)前,加上一個(gè) go 關(guān)鍵詞,那么這個(gè)函數(shù)就放到另外一個(gè) goroutine 中運(yùn)行。
這里說(shuō)的 goroutine 就是 Go 語(yǔ)言中的用戶(hù)線(xiàn)程,也就是協(xié)程。Go 語(yǔ)言在運(yùn)行時(shí),會(huì)建立一個(gè) G-P-M 模型,這個(gè)模型專(zhuān)門(mén)負(fù)責(zé) goroutine 的調(diào)度。
- G:gotoutine(用戶(hù)線(xiàn)程);
- P:processor(邏輯處理器);
- M:machine(機(jī)器資源);
每個(gè) goroutine 都會(huì)放到一個(gè) goroutine 隊(duì)列中,由于是用戶(hù)自主創(chuàng)建,上下文的切換成本極低。P(processor)的主要作用是管理用戶(hù)線(xiàn)程,將 goroutine 合理的安排到內(nèi)核線(xiàn)程上,也就是這個(gè)模型的 M。通常情況下,G 的數(shù)量遠(yuǎn)遠(yuǎn)多于 M。
Goroutine
如果你有運(yùn)行過(guò)上面的代碼,你會(huì)發(fā)現(xiàn),go 關(guān)鍵詞后的函數(shù)并沒(méi)有真正執(zhí)行。
- func log(msg string) {
- fmt.Println(msg)
- }
- func main() {
- log("hello")
- go log("world")
- }
運(yùn)行后,終端只輸出了 hello,并沒(méi)有輸出 world。
這是因?yàn)?main 函數(shù)會(huì)在主 goroutine 中運(yùn)行,類(lèi)似于主線(xiàn)程,而每個(gè) go 語(yǔ)句會(huì)啟動(dòng)一個(gè)新的 goroutine,啟動(dòng)后的 goroutine 并不會(huì)直接執(zhí)行,而是會(huì)放入一個(gè) G 隊(duì)列中,等待 P 的分配。但是主 goroutine 結(jié)束后,就意味著程序結(jié)束了,G 隊(duì)列中的 goroutine 還沒(méi)有等到執(zhí)行時(shí)間。所以,go 語(yǔ)句后的函數(shù)是一個(gè)異步的函數(shù),go 語(yǔ)句調(diào)用后,會(huì)立即去執(zhí)行后面的語(yǔ)句,而不會(huì)等待 go 語(yǔ)句后的函數(shù)執(zhí)行。
如果要 world 輸出,我們可以在 main 函數(shù)后面加一個(gè)休眠,延長(zhǎng)主 goroutine 的執(zhí)行時(shí)間。
- import (
- "fmt"
- "time"
- )
- func log(msg string) {
- fmt.Println(msg)
- }
- func main() {
- fmt.Println()
- log("hello")
- go log("world")
- time.Sleep(time.Millisecond * 500)
- }
通道
多線(xiàn)程編程中,由于各個(gè)線(xiàn)程之間需要共享數(shù)據(jù),一般采用的是共享內(nèi)存的方案。但是這么做,勢(shì)必會(huì)出現(xiàn)多個(gè)線(xiàn)程同時(shí)修改同一份數(shù)據(jù)情況,為了保證數(shù)據(jù)的安全性,需要為數(shù)據(jù)加鎖,處理起來(lái)就比較麻煩。
所以在 Go 語(yǔ)言社區(qū)有一句名言:
不要通過(guò)共享內(nèi)存來(lái)通信,而應(yīng)該通過(guò)通信來(lái)共享內(nèi)存。
創(chuàng)建通道
這里說(shuō)的通信的方式,就是 Go 語(yǔ)言中的通道(channel)。通道是 Go 語(yǔ)言中的一種特殊類(lèi)型,需要通過(guò) make 方法創(chuàng)建一個(gè)通道。
- ch := make(chan int) // 創(chuàng)建一個(gè) int 類(lèi)型的通道
創(chuàng)建通道的時(shí)候,需要加上一個(gè)類(lèi)型,表示該通道傳輸數(shù)據(jù)的類(lèi)型。也可以通過(guò)指定一個(gè)空接口的方式,創(chuàng)建一個(gè)可以傳送任意數(shù)據(jù)的通道。
- ch := make(chan interface{})
創(chuàng)建的通道分為無(wú)緩存通道和有緩存通道,make 方法的第二個(gè)參數(shù)表示可緩存的數(shù)量(如果傳入 0,效果和不傳一樣)。
- ch := make(chan string, 0) // 無(wú)緩存通道,傳入
- ch := make(chan string, 1)
發(fā)送和接收數(shù)據(jù)
通道創(chuàng)建后,通過(guò) <- 符號(hào)來(lái)接收和發(fā)送數(shù)據(jù)。
- ch := make(chan string)
- ch <- "hello world" // 發(fā)送一個(gè)字符串
- msg := <- ch // 接收之前發(fā)送的字符串
實(shí)際在這個(gè)代碼運(yùn)行的時(shí)候,會(huì)提示一個(gè)錯(cuò)誤。
- fatal error: all goroutines are asleep - deadlock!
表明當(dāng)前的 goroutine 處于掛起狀態(tài),并且后續(xù)不會(huì)有響應(yīng),只能直接中斷程序。因?yàn)檫@里創(chuàng)建的是無(wú)緩存通道,發(fā)送數(shù)據(jù)后通道不會(huì)將數(shù)據(jù)緩存在通道中,導(dǎo)致后面要找通道要數(shù)據(jù)的時(shí)候無(wú)法正常從通道中獲取數(shù)據(jù)。我們可以將通道的緩存設(shè)置為 1,讓通道可以緩存一個(gè)數(shù)據(jù)在里面。
- ch := make(chan string, 1)
- ch <- "hello world" // 發(fā)送一個(gè)字符串
- msg := <- ch // 接收之前發(fā)送的字符串
- fmt.Println(msg)
但是如果發(fā)送的數(shù)據(jù)超出了緩存數(shù)量,或者接受數(shù)據(jù)時(shí),緩存里面已經(jīng)沒(méi)有數(shù)據(jù)了,依然會(huì)報(bào)錯(cuò)。
- ch := make(chan string, 1)
- ch <- "hello world"
- ch <- "hello world"
- // fatal error: all goroutines are asleep - deadlock!
- ch := make(chan string, 1)
- ch <- "hello world"
- <- ch
- <- ch
- // fatal error: all goroutines are asleep - deadlock!
協(xié)程中使用通道
那么無(wú)緩存的通道中,應(yīng)該怎么發(fā)送和接收數(shù)據(jù)呢?這就需要將通道與協(xié)程進(jìn)行結(jié)合,也就是 Go 語(yǔ)言中常用的并發(fā)的開(kāi)發(fā)模式。
無(wú)緩存的通道在收發(fā)數(shù)據(jù)時(shí),由于一次只能同步的發(fā)送一個(gè)數(shù)據(jù),會(huì)在兩個(gè) goroutine 間反復(fù)橫跳,通道在接受數(shù)據(jù)時(shí),會(huì)阻塞當(dāng)前 goroutine,直到通道在另一個(gè) goroutine 發(fā)送了數(shù)據(jù)。
- ch := make(chan string) // 創(chuàng)建一個(gè)無(wú)緩存通道
- temp := "我在地球"
- go func () {
- // 接收一個(gè)字符串
- ch <- "hello world"
- temp = "進(jìn)入了異次元"
- }()
- // 運(yùn)行到這里會(huì)被阻塞
- // 直到通道在另一個(gè) goroutine 發(fā)送了數(shù)據(jù)
- msg := <- ch
- fmt.Println(msg)
- fmt.Println("temp =>", temp)
為了證明通道在接收數(shù)據(jù)時(shí)會(huì)被阻塞,我們可以在前面加上一個(gè) temp 變量,然后在另外的 goroutine 中修改這個(gè)變量,看最后輸出的值是否被修改,以此證明通道在接受數(shù)據(jù)時(shí)是否發(fā)生了阻塞。
運(yùn)行結(jié)果已經(jīng)證明,當(dāng)通道接收數(shù)據(jù)時(shí),阻塞了主 goroutine 的執(zhí)行。除了主動(dòng)的從通道里面一條條的獲取數(shù)據(jù),還可以通過(guò) range 的方式循環(huán)獲取數(shù)據(jù)。
- ch := make(chan string)
- go func() {
- for i := 0; i < 5; i++ {
- ch <- fmt.Sprintf("數(shù)據(jù) %d", i)
- }
- close(ch)
- }()
- for data := range ch {
- fmt.Println("接收 =>", data)
- }
如果使用 range 循環(huán)讀取通道中的數(shù)據(jù)時(shí),在數(shù)據(jù)發(fā)送完畢時(shí),需要調(diào)用 close(ch) ,將通道關(guān)閉。
實(shí)戰(zhàn)
在了解了前面的基礎(chǔ)知識(shí)之后,我們可以通過(guò)協(xié)程 + 通道的寫(xiě)一段爬蟲(chóng),來(lái)實(shí)戰(zhàn)一下 Go 語(yǔ)言的并發(fā)能力。
首先確定爬蟲(chóng)需要爬取的網(wǎng)站,由于個(gè)人比較喜歡看電影,所以決定爬一爬豆瓣的電影 TOP 榜單。
其域名為 https://movie.douban.com/top250,翻到第二頁(yè)后,域名為 https://movie.douban.com/top250?start=25 ,第三頁(yè)的域名為 https://movie.douban.com/top250?start=50,說(shuō)明每次這個(gè) TOP 榜單每頁(yè)會(huì)有 25 部電影,每次翻頁(yè)就給 start 參數(shù)加上 25。
- const limit = 25 // 每頁(yè)的數(shù)量為 25
- const total = 100 // 爬取榜單的前 100 部電影
- const page = total / limit // 需要爬取的頁(yè)數(shù)
- func main() {
- var start int
- var url string
- for i :=0; i < page; i++ {
- start := i * limit
- // 計(jì)算得到所有的域名
- url := "https://movie.douban.com/top250?start=" + strconv.Itoa(start)
- }
- }
然后,我們可以構(gòu)造一個(gè) fetch 函數(shù),用于請(qǐng)求對(duì)應(yīng)的頁(yè)面。
- func fetch(url string) {
- // 構(gòu)造請(qǐng)求體
- req, _ := http.NewRequest("GET", url, nil)
- // 由于豆瓣會(huì)校驗(yàn)請(qǐng)求的 Header
- // 如果沒(méi)有 User-Agent,http code 會(huì)返回 418
- req.Header.Add("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36")
- // 發(fā)送請(qǐng)求
- client := &http.Client{}
- rsp, _ := client.Do(req)
- // 斷開(kāi)連接
- defer rsp.Body.Close()
- }
- func main() {
- for i :=0; i < page; i++ {
- url := ……
- go fetch(url, ch)
- }
- }
然后使用 goquery 來(lái)解析 HTML,提取電影的排名以及電影名。
- // 第二個(gè)參數(shù)為與主goroutine 溝通的通道
- func fetch(url string, ch chan string) {
- // 省略部分代碼 ……
- rsp, _ := client.Do(req)
- // 斷開(kāi)連接
- defer rsp.Body.Close()
- // 解析 HTML
- doc, _ := goquery.NewDocumentFromReader(rsp.Body)
- // 提取 HTML 中的電影排行與電影名稱(chēng)
- doc.Find(".item").Each(func(_ int, s *goquery.Selection) {
- num := s.Find(".pic em").Text()
- title := s.Find(".title::first-child").Text()
- // 將電影排行與名稱(chēng)寫(xiě)入管道中
- ch <- fmt.Sprintf("top %s: %s\n", num, title)
- })
- }
最后,在主 goroutine 中創(chuàng)建通道,以及接收通道中的數(shù)據(jù)。
- func main() {
- ch := make(chan string)
- for i :=0; i < page; i++ {
- url := ……
- go fetch(url, ch)
- }
- for i :=0; i < total; i++ {
- top := <- ch // 接收數(shù)據(jù)
- fmt.Println(top)
- }
- }
最后的執(zhí)行結(jié)果如下:
可以看到由于是并發(fā)執(zhí)行,輸出的順序是亂序。
完整代碼
- package main
- import (
- "fmt"
- "github.com/PuerkitoBio/goquery"
- "net/http"
- "strconv"
- )
- const limit = 25
- const total = 100
- const page = total / limit
- func fetch(url string, ch chan string) {
- req, _ := http.NewRequest("GET", url, nil)
- req.Header.Add("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36")
- client := &http.Client{}
- rsp, _ := client.Do(req)
- defer rsp.Body.Close()
- doc, _ := goquery.NewDocumentFromReader(rsp.Body)
- doc.Find(".item").Each(func(_ int, s *goquery.Selection) {
- num := s.Find(".pic em").Text()
- title := s.Find("span.title::first-child").Text()
- ch <- fmt.Sprintf("top %s: %s\n", num, title)
- })
- }
- func main() {
- ch := make(chan string)
- for i :=0; i < page; i++ {
- start := i * limit
- url := "https://movie.douban.com/top250?start=" + strconv.Itoa(start)
- go fetch(url, ch)
- }
- for i :=0; i < total; i++ {
- top := <- ch
- fmt.Println(top)
- }
- }