手把手教你Golang的協(xié)程池設(shè)計
本文轉(zhuǎn)載自微信公眾號「程序員小飯」,作者飯米粒。轉(zhuǎn)載本文請聯(lián)系程序員小飯公眾號。
前言
現(xiàn)在很多公司都在陸續(xù)的搭建golang的語言棧,大家有沒有想過為什么會出現(xiàn)這種情況?一是因為go比較適合做中間件,還有一個原因就是go的并發(fā)支持比較好,也就是咱們平時所謂的高并發(fā),并發(fā)支持離不開協(xié)程,當然協(xié)程也不是亂用的,需要管理起來,管理協(xié)程的方式就是協(xié)程池,所以協(xié)程池也并沒有那么神秘,今天咱們就來一步一步的揭開協(xié)程池的面紗,如果你沒有接觸過go的協(xié)程這塊的話也沒有關(guān)系,我會盡量寫的詳細。
goroutine(協(xié)程)
先來看一個簡單的例子
- func go_worker(name string) {
- for i := 0; i < 5; i++ {
- fmt.Println("我的名字是", name)
- time.Sleep(1 * time.Second)
- }
- fmt.Println(name, "執(zhí)行完畢")
- }
- func main() {
- go_worker("123")
- go_worker("456")
- for i := 0; i < 5; i++ {
- fmt.Println("我是main")
- time.Sleep(1 * time.Second)
- }
- }
咱們在執(zhí)行這段代碼的時候,當然是按照順序執(zhí)行
go_worker("123")->go_worker("456")->我是main執(zhí)行
輸出結(jié)果如下
- 我的名字是 123
- 我的名字是 123
- 我的名字是 123
- 我的名字是 123
- 我的名字是 123
- 123 執(zhí)行完畢
- 我的名字是 456
- 我的名字是 456
- 我的名字是 456
- 我的名字是 456
- 我的名字是 456
- 456 執(zhí)行完畢
- 我是main
- 我是main
- 我是main
- 我是main
- 我是main
這樣的執(zhí)行是并行的,也就是說必須得等一個任務(wù)執(zhí)行結(jié)束,下一個任務(wù)才會開始,如果某個任務(wù)比較慢的話,整個程序的效率是可想而知的,但是在go語言中,支持協(xié)程,所以我們可以把上面的代碼改造一下
- func go_worker(name string) {
- for i := 0; i < 5; i++ {
- fmt.Println("我的名字是", name)
- time.Sleep(1 * time.Second)
- }
- fmt.Println(name, "執(zhí)行完畢")
- }
- func main() {
- go go_worker("123") //協(xié)程
- go go_worker("456") //協(xié)程
- for i := 0; i < 5; i++ {
- fmt.Println("我是main")
- time.Sleep(1 * time.Second)
- }
- }
我們在不同的go_worker前面加上了一個go,這樣所有任務(wù)就異步的串行了起來,輸出結(jié)果如下
- 我是main
- 我的名字是 456
- 我的名字是 123
- 我的名字是 123
- 我是main
- 我的名字是 456
- 我是main
- 我的名字是 456
- 我的名字是 123
- 我是main
- 我的名字是 456
- 我的名字是 123
- 我的名字是 456
- 我的名字是 123
- 我是main
大家可以看到這樣的話就是各自任務(wù)執(zhí)行各自的事情,互相不影響,效率也得到了很大的提升,這就是goroutine
channel(管道)
有了協(xié)程之后就會帶來一個新的問題,協(xié)程之間是如何通信的?于是就引出了管道這個概念,管道其實很簡單,無非就是往里放數(shù)據(jù),往外取數(shù)據(jù)而已
- func worker(c chan int) {
- num := <-c //讀取管道中的數(shù)據(jù),并輸出
- fmt.Println("接收到參數(shù)c:", num)
- }
- func main() {
- //channel的創(chuàng)建,需要執(zhí)行管道數(shù)據(jù)的類型,我們這里是int
- c := make(chan int)
- //開辟一個協(xié)程 去執(zhí)行worker函數(shù)
- go worker(c)
- c <- 2 //往管道中寫入2
- fmt.Println("main")
- }
我們可以看到上述例子,在main函數(shù)中,我們定義了一個管道,為int類型,而且往里面寫入了一個2,然后在worker中讀取管道c,就能獲取到2
協(xié)程會引發(fā)的問題
既然golang中開啟協(xié)程這么方便,那么會不會存在什么坑呢?
我們可以看上圖,實際業(yè)務(wù)中,不同的業(yè)務(wù)都開啟不同的goroutine來執(zhí)行,但是在cpu微觀層面上來講,是串行的一個指令一個指令去執(zhí)行的,只是執(zhí)行的非常快而已,如果指令來的太多,cpu的切換也會變多,在切換的過程中就需要消耗性能,所以協(xié)程池的主要作用就是管理goroutine,限定goroutine的個數(shù)
協(xié)程池的實現(xiàn)
- 首先不同的任務(wù),請求過來,直接往entryChannel中寫入,entryChannel再和jobsChannel建立通信
- 然后我們固定開啟三個協(xié)程(不一定是三個,只是用三個舉例子),固定的從jobsChannel中讀取數(shù)據(jù),來進行任務(wù)處理。
- 其實本質(zhì)上,channel就是一道橋梁,做一個中轉(zhuǎn)的作用,之所以要設(shè)計一個jobsChannel和entryChannel,是為了解耦,entryChannel可以完全用做入口,jobsChannel可以做更深入的比如任務(wù)優(yōu)先級,或者加鎖,解鎖等處理
代碼實現(xiàn)
原理清楚了,接下來我們來具體看代碼實現(xiàn)
首先我們來處理任務(wù) task,task無非就是業(yè)務(wù)中的各種任務(wù),需要能實力化,并且執(zhí)行,代碼如下
- //定義任務(wù)Task類型,每一個任務(wù)Task都可以抽象成一個函數(shù)
- type Task struct{
- f func() error //一個task中必須包含一個具體的業(yè)務(wù)
- }
- //通過NewTask來創(chuàng)建一個Task
- func NewTask(arg_f func() error) *Task{
- t := Task{
- f:arg_f,
- }
- return &t
- }
- //Task也需要一個執(zhí)行業(yè)務(wù)的方法
- func (t *Task) Execute(){
- t.f()//調(diào)用任務(wù)中已經(jīng)綁定好的業(yè)務(wù)方法
- }
接下來我們來定義協(xié)程池
- //定義池類型
- type Pool struct{
- EntryChannel chan *Task
- WorkerNum int
- JobsChanel chan *Task
- }
- //創(chuàng)建一個協(xié)程池
- func NewPool(cap int) *Pool{
- p := Pool{
- EntryChannel: make(chan *Task),
- JobsChanel: make(chan *Task),
- WorkerNum: cap,
- }
- return &p
- }
協(xié)程池需要創(chuàng)建worker,然后不斷的從JobsChannel內(nèi)部任務(wù)隊列中拿任務(wù)開始工作
- //協(xié)程池創(chuàng)建worker并開始工作
- func (p *Pool) worker(workerId int){
- //worker不斷的從JobsChannel內(nèi)部任務(wù)隊列中拿任務(wù)
- for task := range p.JobsChanel{
- task.Execute()
- fmt.Println("workerId",workerId,"執(zhí)行任務(wù)成功")
- }
- }
- EntryChannel獲取Task任務(wù)
- func (p *Pool) ReceiveTask(t *Task){
- p.EntryChannel <- t
- }
- //讓協(xié)程池開始工作
- func (p *Pool) Run(){
- //1:首先根據(jù)協(xié)程池的worker數(shù)量限定,開啟固定數(shù)量的worker
- for i:=0; i<p.WorkerNum; i++{
- go p.worker(i)
- }
- //2:從EntryChannel協(xié)程出入口取外部傳遞過來的任務(wù)
- //并將任務(wù)送進JobsChannel中
- for task := range p.EntryChannel{
- p.JobsChanel <- task
- }
- //3:執(zhí)行完畢需要關(guān)閉JobsChannel和EntryChannel
- close(p.JobsChanel)
- close(p.EntryChannel)
- }
然后我們看在main函數(shù)中
- //創(chuàng)建一個task
- t:= NewTask(func() error{
- fmt.Println(time.Now())
- return nil
- })
- //創(chuàng)建一個協(xié)程池,最大開啟5個協(xié)程worker
- p:= NewPool(3)
- //開啟一個協(xié)程,不斷的向Pool輸送打印一條時間的task任務(wù)
- go func(){
- for {
- p.ReceiveTask(t)//把任務(wù)推向EntryChannel
- }
- }()
- //啟動協(xié)程池p
- p.Run()
基于上述方法,咱們一個簡單的協(xié)程池設(shè)計就完成了,當然在實際生產(chǎn)環(huán)境中這樣做還是不夠的,不過這些方法能手寫出來,那對golang是相當熟悉了,






