C++ 從零實現(xiàn)協(xié)程調(diào)度框架
0 前言
推進掌握一門知識的方法除了溫故知新之外,還可以是觸類旁通. 近期沉寂停更的時間里,我在經(jīng)歷自學 c++ 理論知識的入門階段,不過大家都知道“紙上得來終覺淺”的道理,因此我決定以復刻 golang gmp 協(xié)程調(diào)度設(shè)計思路為目標,基于c++11的風格實現(xiàn)一個低配乞丐版協(xié)程調(diào)度框架,以此作為我的首個 c++ 實踐開源項目,并希望以此為契機,在提高 c++ 編程熟練度的同時,也能提供一波旁支輸入,反補提升我對gmp概念的理解.
該項目我已于 github 開源,cbricks 是我基于 c++11 從零實現(xiàn)的基礎(chǔ)工具開源庫:https://github.com/xiaoxuxiansheng/cbricks.其中實現(xiàn)的內(nèi)容包括但不僅限于 協(xié)程調(diào)度框架 workerpool、協(xié)程 coroutine/線程 thread、并發(fā)通信隊列 channel、日志打印組件 logger等基本工具類,而 協(xié)程調(diào)度框架 workerpool 正是我今天我要向大家介紹的主題.這是我作為 c++ 初學者推進的首個開源項目,完全出于學習實踐目的,難免存在水平不足以及重復造輪的問題,如有考慮不到位、不完善之處請多多包涵,也歡迎批評指正~
在開始正文前,致敬環(huán)節(jié) 必不可少. 在實現(xiàn) cbricks 的編程過程中,在很大程度上學習借鑒了sylar-yin 老師的課程,在此特別感謝,也附上其開源項目傳送門供大家參考使用:https://github.com/sylar-yin/sylar . 正因為有前輩們慷慨無私的傾囊分享,我的學習之路才得以更加平坦順暢. 正是以上種種鼓勵著我能有動力把技術(shù)分享以及這種開源精神繼續(xù)傳播下去.
1 基本概念
首先,需要大家一起理清楚有關(guān)協(xié)程的基本概念.
1.1 線程與協(xié)程
我們通常所熟知的最小調(diào)度單元稱為線程(thread),亦指內(nèi)核級線程,其創(chuàng)建、銷毀、調(diào)度過程需要交由操作系統(tǒng)內(nèi)核負責. 線程與進程可以是多對一關(guān)系,可以充分利用 CPU 多核的能力,提高程序運行的并發(fā)度。
而協(xié)程(coroutine) 又稱為用戶級線程,是在用戶態(tài)視角下對線程概念的二次封裝. 一方面,協(xié)程與線程關(guān)系為多對一,因此在邏輯意義上屬于更細粒度的調(diào)度單元;另一方面,因為協(xié)程的創(chuàng)建、銷毀、調(diào)度行為都在用戶態(tài)中完成,而無需內(nèi)核參與,因此協(xié)程是一個更加輕量化的概念. (對于內(nèi)核來說,其最小調(diào)度單元始終為線程不變,至于用戶態(tài)下對線程又作了怎樣的邏輯拆分,對于內(nèi)核而言是完全透明無感知的,因此無需介入)
線程與協(xié)程
1.2 coroutine 與 goroutine
因為我畢竟有著較長的 golang 開發(fā)使用經(jīng)驗,需要在探討相關(guān)問題的時候是無法繞開對 golang 中對 goroutine 這一設(shè)計的對比與探討的.
我們把常規(guī)的協(xié)程稱為 coroutine. 而在 golang 語言層面天然支持一種優(yōu)化版協(xié)程模型,稱為 goroutine,并運轉(zhuǎn)調(diào)度于 go 語言中知名的 gmp(goroutine-machine-processor) 架構(gòu)之下.
gmp架構(gòu)
有關(guān) gmp 相關(guān)內(nèi)容更細致的講解,可以參見我之前分享的文章:golang gmp 原理
在經(jīng)歷了 cbricks workerpool 的開發(fā)實踐后,我也對 gmp 架構(gòu)下 groutine 相較于普通 coroutine 存在的優(yōu)勢有了一些更深刻的體會:
- ? 線程松耦合:經(jīng)由 P 的從中斡旋,goroutine 能夠跨 M(thread)調(diào)度運行,真正實現(xiàn) G 與 M 之間的松耦合. 而這一點我所實現(xiàn)的 c++ coroutine 中無法做到.(本文實現(xiàn)的 coroutine 底層依賴于 c 中的 ucontext 庫完成協(xié)程??臻g的分配,由于棧的線程私有性,一經(jīng)分配便不允許被其他線程染指,因此 coroutine 在初始化后就必然是某個 thread 所獨有的)
- ? 棧空間自適應(yīng)擴縮:goroutine ??臻g大小可以根據(jù)實際需要做到自適應(yīng)擴縮,并且針對使用方完全屏蔽這一細節(jié). 而我所實現(xiàn)的 c++ coroutine 需要在其初始化時就顯式設(shè)定好??臻g大小,并且在運行過程中不便于修改.
用戶視角下的gmp并發(fā)
? 阻塞粒度適配:這一點非常重要. golang 為使用方屏蔽了線程的概念,所有并發(fā)操作都基于 goroutine 粒度完成,這不僅限于調(diào)度,也包括與之相應(yīng)的一系列并發(fā)阻塞工具,例如 鎖 mutex,通道 channel 等,都在語言層面天然支持 goroutine 粒度的被動阻塞(go_park)操作,與 gmp 體系完美適配;而這一點在 c++ 中則有所不同,如 鎖 mutex、信號量 semaphore 等工具的最小阻塞粒度都是線程,這就會導致協(xié)程的優(yōu)勢遭到削弱,因為在一個 coroutine 中的阻塞行為最終會上升到 thread 粒度,并進而導致 thread 下其他 coroutine 也無法得到正常調(diào)度.
2 快速上手
做完基本概念鋪墊后,下面我們開始介紹有關(guān)協(xié)程調(diào)度框架 cbricks workerpool 的具體實現(xiàn)內(nèi)容.
2.1 使用方法
本章我們聚焦在如何快速上手使用 workerpool 這一問題. workerpool 類型聲明于 ./pool/workerpool.h 頭文件中,使用方通常只需關(guān)心其構(gòu)造函數(shù)和兩個公開方法:
- ? 構(gòu)造函數(shù)——WorkerPool:初始化 workerpool 實例,其中唯一入?yún)?nbsp;threads 為需要啟用的線程個數(shù),默認為 8 個
- ? 公開方法——submit:往 workerpool 中投遞一個任務(wù) task(以 void() 閉包函數(shù)的形式)
- ? 公開方法——sched:主動讓渡當前 task 的執(zhí)行權(quán),以實現(xiàn)同線程下協(xié)程間的切換
// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協(xié)程調(diào)度池
classWorkerPool: base::Noncopyable{
public:
// 構(gòu)造函數(shù) threads——使用的線程個數(shù). 默認為 8 個
WorkerPool(size_t threads =8);
// ...
public:
/**
公開方法
*/
/**
* submit: 向協(xié)程調(diào)度池中提交一個任務(wù) (仿 golang 協(xié)程池 ants 風格)
- task 會被隨機分配到線程手中,保證負載均衡
- task 被一個線程取到之后,會創(chuàng)建對應(yīng)協(xié)程實例,為其分配本地棧,此時該任務(wù)和協(xié)程就固定屬于一個線程了
* param:task——提交的任務(wù) nonblock——是否為阻塞模式
- 阻塞模式:線程本地任務(wù)隊列滿時阻塞等待
- 非阻塞模式:線程本地隊列滿時直接返回 false
* response:true——提交成功 false——提交失敗
*/
bool submit(task task, bool nonblock = false);
// 工作協(xié)程調(diào)度任務(wù)過程中,可以通過執(zhí)行次方法主動讓出線程的調(diào)度權(quán) (仿 golang runtime.Goched 風格)
void sched();
}}
2.2 使用示例
下面是關(guān)于 workerpool 的具體使用示例,其中演示了如何完成一個 workerpool 的初始化,并通過 submit 方法向其中批量投遞異步執(zhí)行的任務(wù),最后對執(zhí)行結(jié)果進行驗收:
#include <iostream>
#include "sync/sem.h"
#include "pool/workerpool.h"
void testWorkerPool();
int main(int argc, char** argv){
// 測試函數(shù)
testWorkerPool();
}
void testWorkerPool(){
// 協(xié)程調(diào)度框架類型別名定義
typedef cbricks::pool::WorkerPool workerPool;
// 信號量類型別名定義
typedef cbricks::sync::Semaphore semaphore;
// 初始化協(xié)程調(diào)度框架,設(shè)置并發(fā)的 threads 數(shù)量為 8
workerPool::ptr workerPoolPtr(new workerPool(8));
// 初始化一個原子計數(shù)器
std::atomic<int> cnt{0};
// 初始化一個信號量實例
semaphore sem;
// 投遞 10000 個異步任務(wù)到協(xié)程調(diào)度框架中,執(zhí)行邏輯就是對 cnt 加 1
for(int i =0; i <10000; i++){
// 執(zhí)行 submit 方法,將任務(wù)提交到協(xié)程調(diào)度框架中
workerPoolPtr->submit([&cnt,&sem](){
cnt++;
sem.notify();
});
}
// 通過信號量等待 10000 個異步任務(wù)執(zhí)行完成
for(int i =0; i <10000; i++){
sem.wait();
}
// 輸出 cnt 結(jié)果(預期結(jié)果為 10000)
std::cout << cnt << std::endl;
}
3 架構(gòu)設(shè)計
了解完使用方式后,隨后就來揭曉其底層實現(xiàn)原理. 本著由總到分的學習指導綱領(lǐng),本章我們從全局視角縱覽 workerpool 的設(shè)計實現(xiàn)架構(gòu).
3.1 整體架構(gòu)與核心概念
cbricks協(xié)程調(diào)度架構(gòu)
workerpool 自下而上,由粗到細可以分為如下層級概念:
? 線程池 threadPool:workerpool 初始化時就啟動指定數(shù)量的常駐線程 thread 實例. 這些 thread 數(shù)量固定不變,并且會持續(xù)運行,直到整個 workerpool 被析構(gòu)為止. 由這些 thread 組成的集合,我們稱為 線程池 threadPool.
? 線程 thread:持續(xù)運營的 thread 單元,不斷執(zhí)行著調(diào)度邏輯,依次嘗試從本地任務(wù)隊列 taskq、本地協(xié)程隊列 sched_q 中獲取任務(wù) task /協(xié)程 coroutine 進行調(diào)度. 如果前兩者都空閑,則 thread 會仿照 gmp 中的 workstealing 機制,從其他 thread 的 taskq 中竊取 task 過來執(zhí)行. 最后 steal 后仍缺少 task 供執(zhí)行調(diào)度,則會利用 channel 的機制,使 thread 陷入阻塞,從而讓出 cpu 執(zhí)行權(quán)
? 任務(wù) task:用戶提交的異步任務(wù)(對應(yīng)為 void() 閉包函數(shù)類型). task 會被均勻分配到特定 thread 的 taskq 中,但還存在被其他 thread 竊取的可能性,因此 task 本質(zhì)上還是能夠跨 thread 傳遞使用的
? 協(xié)程 coroutine:在 workerpool 中,thread 不會直接執(zhí)行 task,而是會為 task 一對一構(gòu)建出 coroutine 實例,并切換至 coroutine 中完成對 task 的執(zhí)行. coroutine 被創(chuàng)建出來后,會完成棧 stack 的初始化和分配,隨后 coroutine 就固定屬于一個 thread 了,終生不可再被其他 thread 染指
? 線程本地任務(wù)隊列 taskq:每個 thread 私有的緩存 task 的隊列,底層由并發(fā)安全的通信隊列 channel 實現(xiàn). 當一筆 task 被投遞到 workerpool 時,會基于負載均衡策略投遞到特定 thread 的 taskq 中,接下來會被該 thread 優(yōu)先調(diào)度執(zhí)行
? 線程本地協(xié)程隊列 schedq:每個 thread 私有的緩存 coroutine 的隊列,底層由普通隊列 queue 實現(xiàn),但屬于線程本地變量 thread_local,因此也是并發(fā)安全的. 當一個 coroutine 因主動讓渡 sched 操作而暫停執(zhí)行時,會將其暫存到 schedq 中,等待后續(xù)再找時機完成該 coroutine 的調(diào)度工作.
3.2 相比 gmp 的不足之處
我在實現(xiàn) workerpool 時,一定程度上仿照了 gmp 的風格,包括 thread 本地任務(wù)隊列 taskq 的實現(xiàn)以及 workstealing 機制的設(shè)計.
cbricks協(xié)程調(diào)度框架的不足之處
然而受限于我的個人水平以及語言層面的風格差異,相較于 gmp,workerpool 還存在幾個明顯的缺陷:
? coroutine 與 thread 強綁定:當一個 coroutine 被初始化時,我使用的是 c 語言中 ucontext.h 完成 stack 的分配,這樣 coroutine stack 就是 thread 私有的,因此 coroutine 不能做到跨 thread 調(diào)度.
? thread 級阻塞粒度:c++ 中,并發(fā)工具因此的阻塞行為都是以 thread 為單位. 以互斥鎖 lock 為例,哪怕觸發(fā)加鎖阻塞行為的對象是 coroutine,但最終還是會引起整個 thread 對象陷入阻塞,從而導致 thread 下的其他已分配好的 coroutine 也無法得到執(zhí)行.
要解決這一問題,就必須連帶著對 lock、cond、semaphore 等工具進行改造,使得其能夠支持 coroutine 粒度的阻塞操作,這樣的成本無疑很高,本項目未予以實踐.
4 頭文件源碼
從第 4 章開始,我們正式進入源碼解析環(huán)節(jié). 首先給出關(guān)于 workerpool 頭文件的完整代碼展示,包含其中的成員屬性以及公私方法定義. 下面的示意圖以及源碼中給出的注釋相對比較完備,在此不再贅述:
workerpool 類定義
代碼位于 ./pool/workerpool.h:
// 保證頭文件內(nèi)容不被重復編譯
#pragma once
/**
依賴的標準庫頭文件
*/
// 標準庫智能指針相關(guān)
#include <memory>
// 標準庫函數(shù)編程相關(guān)
#include <functional>
// 標準庫原子量相關(guān)
#include <atomic>
// 標準庫——動態(tài)數(shù)組,以此作為線程池的載體
#include <vector>
/**
依賴的項目內(nèi)部頭文件
*/
// 線程 thread 實現(xiàn)
#include "../sync/thread.h"
// 協(xié)程 coroutine 實現(xiàn)
#include "../sync/coroutine.h"
// 阻塞隊列 channel 實現(xiàn) (一定程度上仿 golang channel 風格)
#include "../sync/channel.h"
// 信號量 semaphore 實現(xiàn)
#include "../sync/sem.h"
// 拷貝禁用工具,用于保證類實例無法被值拷貝和值傳遞
#include "../base/nocopy.h"
// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協(xié)程調(diào)度池 繼承 Noncopyable 保證禁用值拷貝和值傳遞功能
classWorkerPool: base::Noncopyable{
public:
// 協(xié)程池共享指針類型別名
typedef std::shared_ptr<WorkerPool> ptr;
// 一筆需要執(zhí)行的任務(wù)
typedef std::function<void()> task;
// 一個線程持有的本地任務(wù)隊列
typedef sync::Channel<task> localq;
// 本地任務(wù)隊列指針別名
typedef localq::ptr localqPtr;
// 線程指針別名
typedef sync::Thread* threadPtr;
// 一個分配了運行任務(wù)的協(xié)程
typedef sync::Coroutine worker;
// 協(xié)程智能指針別名
typedef sync::Coroutine::ptr workerPtr;
// 讀寫鎖別名
typedef sync::RWLock rwlock;
// 信號量類型別名
typedef sync::Semaphore semaphore;
public:
/**
構(gòu)造/析構(gòu)函數(shù)
*/
// 構(gòu)造函數(shù) threads——使用的線程個數(shù). 默認為 8 個
WorkerPool(size_t threads =8);
// 析構(gòu)函數(shù)
~WorkerPool();
public:
/**
公開方法
*/
/**
* submit: 向協(xié)程調(diào)度池中提交一個任務(wù) (仿 golang 協(xié)程池 ants 風格)
- task 會被隨機分配到線程手中,保證負載均衡
- task 被一個線程取到之后,會創(chuàng)建對應(yīng)協(xié)程實例,為其分配本地棧,此時該任務(wù)和協(xié)程就固定屬于一個線程了
* param:task——提交的任務(wù) nonblock——是否為阻塞模式
- 阻塞模式:線程本地任務(wù)隊列滿時阻塞等待
- 非阻塞模式:線程本地隊列滿時直接返回 false
* response:true——提交成功 false——提交失敗
*/
bool submit(task task, bool nonblock = false);
// 工作協(xié)程調(diào)度任務(wù)過程中,可以通過執(zhí)行次方法主動讓出線程的調(diào)度權(quán) (仿 golang runtime.Goched 風格)
void sched();
private:
/**
* thread——workerPool 中封裝的線程類
* - index:線程在線程池中的 index
* - thr:真正的線程實例,類型為 sync/thread.h 中的 Thread
* - taskq:線程的本地任務(wù)隊列,其中數(shù)據(jù)類型為閉包函數(shù) void()
* - lock:一把線程實例粒度的讀寫鎖. 用于隔離 submit 操作和 workstealing 操作,避免因任務(wù)隊列阻塞導致死鎖
*/
structthread{
typedef std::shared_ptr<thread> ptr;
int index;
threadPtr thr;
localqPtr taskq;
rwlock lock;
/**
* 構(gòu)造函數(shù)
* param: index: 線程在線程池中的 index; thr: 底層真正的線程實例; taskq:線程持有的本地任務(wù)隊列
*/
thread(int index,threadPtr thr, localqPtr taskq):index(index),thr(thr),taskq(taskq){}
~thread()=default;
};
private:
/**
私有方法
*/
// work:線程運行主函數(shù),持續(xù)不斷地從本地任務(wù)隊列 taskq 或本地協(xié)程隊列 t_schedq 中獲取任務(wù)/協(xié)程進行調(diào)度. 倘若本地任務(wù)為空,會嘗試從其他線程本地任務(wù)隊列竊取任務(wù)執(zhí)行
void work();
/**
* readAndGo:從指定的任務(wù)隊列中獲取任務(wù)并執(zhí)行
* param:taskq——指定的任務(wù)隊列 nonblock——是否為阻塞模式
* reponse:true——成功 false——失敗
*/
bool readAndGo(localqPtr taskq, bool nonblock);
/**
* goTask: 為一筆任務(wù)創(chuàng)建一個協(xié)程實例,并調(diào)度該任務(wù)函數(shù)
* param: cb——待執(zhí)行任務(wù)
* tip:如果該任務(wù)未一次性執(zhí)行完成(途中使用了 sched 方法),則會在棧中封存好任務(wù)的執(zhí)行信息,然后將該協(xié)程實例追加到線程本地的協(xié)程隊列 t_schedq 中,等待后續(xù)再被線程調(diào)度
*/
void goTask(task cb);
/**
* goWorker:調(diào)度某個協(xié)程實例,其中已經(jīng)分配好執(zhí)行的任務(wù)函數(shù)
* param: worker——分配好執(zhí)行任務(wù)函數(shù)的協(xié)程實例
* tip:如果該任務(wù)未一次性執(zhí)行完成(途中使用了 sched 方法),則會在棧中封存好任務(wù)的執(zhí)行信息,然后將該協(xié)程實例追加到線程本地的協(xié)程隊列 t_schedq 中,等待后續(xù)再被線程調(diào)度
*/
void goWorker(workerPtr worker);
/**
* workStealing:當其他線程任務(wù)隊列 taskq 中竊取半數(shù)任務(wù)填充到本地隊列
*/
void workStealing();
/**
* workStealing 重載:從線程 stealFrom 的任務(wù)隊列中竊取半數(shù)任務(wù)填充到線程 stealTo 本地隊列
*/
void workStealing(thread::ptr stealTo, thread::ptr stealFrom);
/**
* getStealingTarget:隨機獲取一個線程作為竊取目標
*/
thread::ptr getStealingTarget();
/**
* getThreadByThreadName 通過線程名稱獲取對應(yīng)的線程實例
*/
thread::ptr getThreadByThreadName(std::string threadName);
/**
* getThread 獲取當前線程實例
*/
thread::ptr getThread();
private:
/**
* 靜態(tài)私有方法
*/
// getThreadNameByIndex:通過線程 index 映射得到線程名稱
static const std::string getThreadNameByIndex(int index);
// getThreadIndex:獲取當前線程的 index
static const int getThreadIndex();
// getThreadName:獲取當前線程的名稱
static const std::string getThreadName();
private:
/**
* 私有成員屬性
*/
// 基于 vector 實現(xiàn)的線程池,元素類型為 WorkerPool::thread 對應(yīng)共享指針
std::vector<thread::ptr> m_threadPool;
// 基于原子變量標識 workerPool 是否已關(guān)閉
std::atomic<bool> m_closed{false};
};
}}
5 核心實現(xiàn)源碼
接下來針對 workerpool 中的核心流程進行詳細的源碼走讀,有關(guān) workerpool 具體實現(xiàn)代碼位于 ./pool/workerpool.cpp 中.
5.1 依賴的頭文件與變量
圖片
依賴的外部變量
首先涉及到兩個核心變量的定義:
- ? 全局變量 s_taskId:全局單調(diào)遞增的原子計數(shù)器,為每個到來的 task 分配全局唯一 task id,并依據(jù)此 id 明確 task 應(yīng)該指派給哪個 thread
- ? 線程本地變量(thread_local) t_schedq:線程私有的協(xié)程隊列. 運行過程因主動讓渡而暫停的 coroutine,會被暫存到其中,等待后續(xù)被相同的 thread 繼續(xù)調(diào)度執(zhí)行.
// 標準庫隊列實現(xiàn). 依賴隊列作為線程本地協(xié)程隊列的存儲載體
#include <queue>
// workerpool 頭文件
#include "workerpool.h"
// 本項目定義的斷言頭文件
#include "../trace/assert.h"
// namespace cbricks::pool
namespace cbricks{namespace pool{
/**
* 全局變量 s_taskId:用于分配任務(wù) id 的全局遞增計數(shù)器,通過原子變量保證并發(fā)安全
* 每個任務(wù)函數(shù)會根據(jù)分配到的 id,被均勻地分發(fā)給各個線程,以此實現(xiàn)負載均衡
*/
static std::atomic<int> s_taskId{0};
/**
* 線程本地變量 t_schedq:線程私有的協(xié)程隊列
* 當線程下某個協(xié)程沒有一次性將任務(wù)執(zhí)行完成時(任務(wù)調(diào)用了 sched 讓渡函數(shù)),則該協(xié)程會被暫存于此隊列中,等待后續(xù)被相同的線程繼續(xù)調(diào)度
*/
staticthread_local std::queue<WorkerPool::workerPtr> t_schedq;
// ...
}}
5.2 構(gòu)造函數(shù)與析構(gòu)函數(shù)
workerpool 構(gòu)造函數(shù)
下面介紹workerpool 的構(gòu)造函數(shù),其任務(wù)很明確,就是初始化好指定數(shù)量的 thread,為其分配好對應(yīng)的 taskq,并將 thread 一一投遞進入到線程池 threadPool 中.
此處值得一提的是,thread 啟動后異步運行的方法是 WorkerPool::work,其中會涉及到從 threadPool 中取出當前 thread 實例的操作,因此這里需要通過信號量 semaphore 保證 thread 實例先被投遞進入 threadPool 后,對應(yīng) WorkerPool::work 方法才能被放行.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* workerpool 構(gòu)造函數(shù):
* - 初始化好各個線程實例 thread
* - 將各 thread 添加到線程池 m_threadPool 中
*/
WorkerPool::WorkerPool(size_t threads){
CBRICKS_ASSERT(threads >0,"worker pool init with nonpositive threads num");
// 為線程池預留好對應(yīng)的容量
this->m_threadPool.reserve(threads);
/**
* 構(gòu)造好對應(yīng)于每個 thread 的信號量
* 這是為了保證 thread 實例先被添加進入 m_threadPool,thread 中的調(diào)度函數(shù)才能往下執(zhí)行
* 這樣做是因為 thread 調(diào)度函數(shù)有依賴于從 m_threadPool 獲取自身實例的操作
*/
std::vector<semaphore> sems(threads);
// 另一個信號量,用于保證所有 thread 調(diào)度函數(shù)都正常啟動后,當前構(gòu)造函數(shù)才能退出,避免 sems 被提前析構(gòu)
semaphore waitGroup;
// 初始化好對應(yīng)數(shù)量的 thread 實例并添加進入 m_threadPool
for(int i =0; i < threads; i++){
// 根據(jù) index 映射得到 thread 名稱
std::string threadName =WorkerPool::getThreadNameByIndex(i);
// 將 thread 實例添加進入 m_threadPool
this->m_threadPool.push_back(thread::ptr(
// thread 實例初始化
newthread(
i,
//
new sync::Thread([this,&sems,&waitGroup](){
/**
* 此處 wait 操作是需要等待對應(yīng) thread 實例先被推送進入 m_threadPool
* 因為一旦后續(xù)的 work 函數(shù)運行,就會涉及從 m_threadPool 中獲取 thread 實例的操作
* 因此先后順序不能顛倒
*/
sems[getThreadIndex()].wait();
/**
* 此處 notify 操作是配合外層的 waitGroup.wait 操作
* 保證所有 thread 都正常啟動后,workerPool 構(gòu)造函數(shù)才能退出
* 這是為了防止 sems 被提前析構(gòu)
*/
waitGroup.notify();
// 異步啟動的 thread,最終運行的調(diào)度函數(shù)是 workerpool::work
this->work();
},
// 注入 thread 名稱,與 index 有映射關(guān)系
threadName),
// 分配給 thread 的本地任務(wù)隊列
localqPtr(new localq))));
/**
* 在 thread 實例被推送入 m_threadPool 后進行 notify
* 這樣 thread 調(diào)度函數(shù)才會被向下放行
*/
sems[i].notify();
}
/**
* 等待所有 thread 實例正常啟動后,構(gòu)造函數(shù)再退出
*/
for(int i =0; i < threads; i++){
waitGroup.wait();
}
}
在析構(gòu)函數(shù)中,要做的處理是將 workerpool 關(guān)閉標識 m_closed 置為 true,并且一一關(guān)閉所有 thread 下的 taskq ,這樣運行中的 thread 在感知到這一信息后都會主動退出.
// 析構(gòu)函數(shù)
WorkerPool::~WorkerPool(){
// 將 workpool 的關(guān)閉標識置為 true,后續(xù)運行中的線程感知到此標識后會主動退出
this->m_closed.store(true);
// 等待所有線程都退出后,再退出 workpool 的析構(gòu)函數(shù)
for(int i =0; i <this->m_threadPool.size(); i++){
// 關(guān)閉各 thread 的本地任務(wù)隊列
this->m_threadPool[i]->taskq->close();
// 等待各 thread 退出
this->m_threadPool[i]->thr->join();
}
}
// ...
}}
5.3 公有方法:提交任務(wù)
workerpool提交任務(wù)流程
用戶通過 submit 方法,能夠?qū)?nbsp;task 提交到 workerpool 中. 在 submit 流程中:
- ? 首先,為 task 分配全局唯一的 taskId.
- ? 然后,對 threadPool 長度取模后,找到 task 從屬的 thread.
- ? 接下來,將 task 投遞到該 thread 的 taskq 中即可.
這里需要注意的是,在投遞任務(wù)到 thread 的 taskq 前,需要先加上該 thread 的讀鎖 readlock. 這是為了和該 thread 下可能正在執(zhí)行的 workStealing 操作進行互斥,避免因 taskq 空間不足而導致死鎖問題. 這個點在竊取流程的講解中詳細展開.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* submit: 提交一個任務(wù)到協(xié)程調(diào)度池中,任務(wù)以閉包函數(shù) void() 的形式組裝
* - 為任務(wù)分配全局遞增且唯一的 taskId
* - 根據(jù) taskId 將任務(wù)均勻分發(fā)給指定 thread
* - 將任務(wù)寫入到指定 thread 的本地任務(wù)隊列中
*/
bool WorkerPool::submit(task task, bool nonblock){
// 若 workerpool 已關(guān)閉,則提交失敗
if(this->m_closed.load()){
returnfalse;
}
// 基于任務(wù) id 對 m_threadPool 長度取模,將任務(wù)映射到指定 thread
int targetThreadId =(s_taskId++)%(this->m_threadPool.size());
thread::ptr targetThr =this->m_threadPool[targetThreadId];
// 針對目標 thread 加讀鎖,這是為了防止和目標 thread 的 workstealing 操作并發(fā)最終因任務(wù)隊列 taskq 容量溢出而導致死鎖
rwlock::readLockGuard guard(targetThr->lock);
// 往對應(yīng) thread 的本地任務(wù)隊列中寫入任務(wù)
return targetThr->taskq->write(task, nonblock);
}
// ...
}}
5.4 公有方法:讓渡執(zhí)行權(quán)
workerpool協(xié)程讓渡流程
task 在運行過程中,可以通過調(diào)用 workerpool::sched 方法完成執(zhí)行權(quán)的主動讓渡. 此時 task 對應(yīng) coroutine 會暫停運行,并將執(zhí)行權(quán)切換回到 thread 主函數(shù)中,然后 thread 會將該 coroutine 暫存到本地協(xié)程隊列 schedq 中,等待后續(xù)再對其調(diào)度執(zhí)行.
// namespace cbricks::pool
namespace cbricks{ namespace pool{
// ...
// sched:讓渡函數(shù). 在任務(wù)執(zhí)行過程中,可以通過該方法主動讓出線程的執(zhí)行權(quán),則此時任務(wù)所屬的協(xié)程會被添加到 thread 的本地協(xié)程隊列 t_schedq 中,等待后續(xù)再被調(diào)度執(zhí)行
void WorkerPool::sched(){
worker::GetThis()->sched();
}
// ...
}}
5.5 線程調(diào)度任務(wù)主流程
workerpool線程調(diào)度主流程
workerpool::work 方法是各 thread 循環(huán)運行的主函數(shù),其中包含了 thread 調(diào)度 task 和 coroutine 的核心邏輯:
- ? 調(diào)度優(yōu)先級一:從 thread 的本地任務(wù)隊列 taskq 中獲取 task 并調(diào)度執(zhí)行
- ? 調(diào)度優(yōu)先級二:當 taskq 為空或者連續(xù)獲取 10 次 taskq 后(為避免 schedq 產(chǎn)生饑餓),會主動獲取一次本地協(xié)程隊列 schedq 中的 coroutine 進行調(diào)度
- ? 調(diào)度優(yōu)先級三:如果 taskq 和 schedq 都是空的,則進入 workstealing 流程,嘗試從其他 thread taskq 中竊取半數(shù) taskq 填充到當前 thread taskq 中
- ? 必要性阻塞:如果經(jīng)歷完上述流程,仍沒有合適的目標供 thread 調(diào)度,則 thread 會依賴 channel 的阻塞消費能力陷入阻塞,從而讓出 cpu 執(zhí)行權(quán),避免資源浪費
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* work: 線程運行的主函數(shù)
* 1) 獲取需要調(diào)度的協(xié)程(下述任意步驟執(zhí)行成功,則跳到步驟 2))
* - 從本地任務(wù)隊列 taskq 中取任務(wù),獲取成功則為之初始化協(xié)程實例
* - 從本地協(xié)程隊列 schedq 中取協(xié)程
* - 從其他線程的任務(wù)隊列 taskq 中偷取一半任務(wù)到本地任務(wù)隊列
* 2) 調(diào)度協(xié)程執(zhí)行任務(wù)
* 3) 針對主動讓渡而退出的協(xié)程,添加到本地協(xié)程隊列
* 4) 循環(huán)上述流程
*/
void WorkerPool::work(){
// 獲取到當前 thread 對應(yīng)的本地任務(wù)隊列 taskq
localqPtr taskq =this->getThread()->taskq;
// main loop
while(true){
// 如果 workerpool 已關(guān)閉 則主動退出
if(this->m_closed.load()){
return;
}
/**
* 執(zhí)行優(yōu)先級為 本地任務(wù)隊列 taskq -> 本地協(xié)程隊列 t_t_schedq -> 竊取其他線程任務(wù)隊列 other_taskq
* 為防止饑餓,至多調(diào)度 10 次的 taskq 后,必須嘗試處理一次 t_schedq
*/
// 標識本地任務(wù)隊列 taskq 是否為空
bool taskqEmpty =false;
// 至多調(diào)度 10 次本地任務(wù)隊列 taskq
for(int i =0; i <10; i++){
// 以【非阻塞模式】從 taskq 獲取任務(wù)并為之分配協(xié)程實例和調(diào)度執(zhí)行
if(!this->readAndGo(taskq,false)){
// 如果 taskq 為空,將 taskqEmpty 置為 true 并直接退出循環(huán)
taskqEmpty =true;
break;
}
}
// 嘗試從線程本地的協(xié)程隊列 t_schedq 中獲取協(xié)程并進行調(diào)度
if(!t_schedq.empty()){
// 從協(xié)程隊列中取出頭部的協(xié)程實例
workerPtr worker = t_schedq.front();
t_schedq.pop();
// 進行協(xié)程調(diào)度
this->goWorker(worker);
// 處理完成后直接進入下一輪循環(huán)
continue;
}
// 如果未發(fā)現(xiàn) taskq 為空,則無需 workstealing,直接進入下一輪循環(huán)
if(!taskqEmpty){
continue;
}
/**
* 走到這里意味著 taskq 和 schedq 都是空的,則要嘗試發(fā)起竊取操作
* 隨機選擇一個目標線程竊取半數(shù)任務(wù)添加到本地隊列中
*/
this->workStealing();
/**
* 以【阻塞模式】嘗試從本地任務(wù)獲取任務(wù)并調(diào)度執(zhí)行
* 若此時仍沒有可調(diào)度的任務(wù),則當前 thread 陷入阻塞,讓出 cpu 執(zhí)行權(quán)
* 直到有新任務(wù)分配給當前 thread 時,thread 才會被喚醒
*/
this->readAndGo(taskq,true);
}
}
// ...
}}
workerpool單個任務(wù)處理流程
以 readAndGo 方法為入口,thread 會嘗試從 taskq 中獲取一筆 task;獲取到后,會為 task 構(gòu)建一一對應(yīng)的 coroutine 實例(至此 task/coroutine 與 thread 完全綁定),然后通過 coroutine::go 方法,將 thread 執(zhí)行權(quán)切換至 coroutine 手中,由 coroutine 執(zhí)行其中的 task. 只有在 task 執(zhí)行結(jié)束或者主動讓渡時,執(zhí)行權(quán)才會返還到 thread 主函數(shù)中,此時 thread 會判斷 coroutine 是否是因為主動讓渡而暫停執(zhí)行,如果是的話,則會將該 coroutine 實例追加到 schedq 中,等待后續(xù)尋找合適時機再作調(diào)度執(zhí)行.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* readAndGo:
* - 從指定任務(wù)隊列中獲取一個任務(wù)
* - 為之分配協(xié)程實例并調(diào)度執(zhí)行
* - 若協(xié)程實例未一次性執(zhí)行完成(執(zhí)行了讓渡 sched),則將協(xié)程添加到線程本地的協(xié)程隊列 schedq 中
* param:taskq——任務(wù)隊列;nonblock——是否以非阻塞模式從任務(wù)隊列中獲取任務(wù)
* response:true——成功;false,失?。ㄈ蝿?wù)隊列 taskq 為空)
*/
// 將一個任務(wù)包裝成協(xié)程并進行調(diào)度. 如果沒有一次性調(diào)度完成,則將協(xié)程實例添加到線程本地的協(xié)程隊列 t_schedq
bool WorkerPool::readAndGo(cbricks::pool::WorkerPool::localqPtr taskq, bool nonblock){
// 任務(wù)容器
task cb;
// 從 taskq 中獲取任務(wù)
if(!taskq->read(cb,nonblock)){
returnfalse;
}
// 對任務(wù)進行調(diào)度
this->goTask(cb);
returntrue;
}
/**
* goTask
* - 為指定任務(wù)分配協(xié)程實例
* - 執(zhí)行協(xié)程
* - 若協(xié)程實例未一次性執(zhí)行完成(執(zhí)行了讓渡 sched),則將協(xié)程添加到線程本地的協(xié)程隊列 schedq 中
* param:cb——待執(zhí)行的任務(wù)
*/
void WorkerPool::goTask(task cb){
// 初始化協(xié)程實例
workerPtr _worker(newworker(cb));
// 調(diào)度協(xié)程
this->goWorker(_worker);
}
/**
* goWorker
* - 執(zhí)行協(xié)程
* - 若協(xié)程實例未一次性執(zhí)行完成(執(zhí)行了讓渡 sched),則將協(xié)程添加到線程本地的協(xié)程隊列 schedq 中
* param:worker——待運行的協(xié)程
*/
void WorkerPool::goWorker(workerPtr worker){
// 調(diào)度協(xié)程,此時線程的執(zhí)行權(quán)會切換進入到協(xié)程對應(yīng)的方法棧中
worker->go();
// 走到此處意味著線程執(zhí)行權(quán)已經(jīng)從協(xié)程切換回來
// 如果此時協(xié)程并非已完成的狀態(tài),則需要將其添加到線程本地的協(xié)程隊列 schedq 中,等待后續(xù)繼續(xù)調(diào)度
if(worker->getState()!= sync::Coroutine::Dead){
t_schedq.push(worker);
}
}
// ...
}}
5.6 任務(wù)竊取流程
workerpool跨線程任務(wù)竊取流程
當 thread 發(fā)現(xiàn) taskq 和 schedq 都空閑時,則會嘗試執(zhí)行竊取操作. 此時 thread 隨機選取另一個 thread 作為竊取目標,竊取其 taskq 中的半數(shù) task,追加到本地 taskq 中.
在執(zhí)行竊取操作的過程中,需要對當前 thread 加寫鎖,以避免發(fā)生死鎖問題:
比如在竊取前,當前 thread 判定自己的 taskq 還有足夠空間用于承載竊取來的 task;但是此期間若有新的任務(wù) submit 到來,則可能把 taskq 的空間占據(jù),最后導致沒有足夠容量承載竊取到的 task,最終導致 thread 調(diào)度流程 hang 死在 workstealing 流程無法退出.
上述問題的解法就是,在竊取前,先加 thread 寫鎖(這樣并發(fā)到來的 submit 操作就無法完成 task 投遞)然后再檢查一遍 taskq 并確認容量充足后,再發(fā)起實際的竊取操作.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
// 從某個 thread 中竊取一半任務(wù)給到本 thread 的 taskq
void WorkerPool::workStealing(){
// 選擇一個竊取的目標 thread
thread::ptr stealFrom =this->getStealingTarget();
if(!stealFrom){
return;
}
// 從目標 thread 中竊取半數(shù)任務(wù)添加到本 thread taskq 中
this->workStealing(this->getThread(),stealFrom);
}
// 從 thread:stealFrom 中竊取半數(shù)任務(wù)給到 thread:stealTo
void WorkerPool::workStealing(thread::ptr stealTo, thread::ptr stealFrom){
// 確定竊取任務(wù)數(shù)量:目標本地任務(wù)隊列 taskq 中任務(wù)總數(shù)的一半
int stealNum = stealFrom->taskq->size()/2;
if(stealNum <=0){
return;
}
// 針對 thread:stealTo 加寫鎖,防止因 workstealing 和 submit 行為并發(fā),導致線程因 taskq 容量溢出而發(fā)生死鎖
rwlock::lockGuard guard(stealTo->lock);
// 檢查此時 stealTo 中的 taskq 如果剩余容量已不足以承載擬竊取的任務(wù)量,則直接退出
if(stealTo->taskq->size()+ stealNum > stealTo->taskq->cap()){
return;
}
// 創(chuàng)建任務(wù)容器,以非阻塞模式從 stealFrom 的 taskq 中竊取指定數(shù)量的任務(wù)
std::vector<task> containers(stealNum);
if(!stealFrom->taskq->readN(containers,true)){
return;
}
// 將竊取到的任務(wù)添加到 stealTo 的 taskq 中
stealTo->taskq->writeN(containers,false);
}
// 隨機選擇本 thread 外的一個 thread 作為竊取的目標
WorkerPool::thread::ptr WorkerPool::getStealingTarget(){
// 如果線程池長度不足 2,直接返回
if(this->m_threadPool.size()<2){
returnnullptr;
}
// 通過隨機數(shù),獲取本 thread 之外的一個目標 thread index
int threadIndex =WorkerPool::getThreadIndex();
int targetIndex =rand()%this->m_threadPool.size();
while( targetIndex == threadIndex){
targetIndex =rand()%this->m_threadPool.size();
}
// 返回目標 thread
returnthis->m_threadPool[targetIndex];
}
// ...
}}
6 總結(jié)
祝賀,至此本文結(jié)束. 本篇和大家探討了,如何基于 c++ 從零到一實現(xiàn)一個協(xié)程調(diào)度框架,其核心功能包括:
- ? 創(chuàng)建指定數(shù)量線程持續(xù)復用,調(diào)度后續(xù)到來的任務(wù)
- ? 以閉包函數(shù)的風格提交任務(wù)到框架中,由異步協(xié)程完成執(zhí)行
- ? 任務(wù)運行過程中支持通過主動讓渡操作讓出調(diào)度執(zhí)行權(quán)
- ? 支持線程間的任務(wù)竊取操作,使得各調(diào)度線程間忙閑有度、負載均衡