Linux高性能編程-線程池
今天我們來(lái)學(xué)習(xí)Linux線程池,線程池是高并發(fā)場(chǎng)景必須具備的軟件組件,很多開(kāi)源項(xiàng)目都會(huì)使用線程池,話不多說(shuō),直接開(kāi)始。
1.線程池簡(jiǎn)介
1.1 什么是線程池?
線程池就是提前創(chuàng)建好一批線程,通過(guò)一個(gè)池子來(lái)管理所有的線程。當(dāng)有任務(wù)時(shí),從池子中取出一個(gè)線程去執(zhí)行該任務(wù),執(zhí)行結(jié)束后,線程并不會(huì)死亡,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個(gè)任務(wù)。
我們通過(guò)幾個(gè)圖來(lái)講解一下線程池的作用。
1)常規(guī)多線程
圖片
常規(guī)多線程方式,每個(gè)任務(wù)都會(huì)創(chuàng)建一個(gè)新的線程來(lái)處理,任務(wù)處理完后線程會(huì)銷(xiāo)毀。
優(yōu)點(diǎn):在任務(wù)量比較少的情況下,任務(wù)執(zhí)行效率比較高。
缺點(diǎn):1.隨著任務(wù)量增多,線程數(shù)量會(huì)越來(lái)越多,線程開(kāi)銷(xiāo)(CPU,內(nèi)存開(kāi)銷(xiāo))很大,如果不控制線程數(shù)量,系統(tǒng)會(huì)出現(xiàn)異常。2.缺乏任務(wù)管理機(jī)制。3.缺乏多線程管理機(jī)制。
2)簡(jiǎn)單的線程池
圖片
簡(jiǎn)單線程池通過(guò)任務(wù)隊(duì)列和線程池完成。
新任務(wù)產(chǎn)生后存儲(chǔ)在任務(wù)隊(duì)列,線程池中的空閑線程從任務(wù)隊(duì)列獲取任務(wù)并執(zhí)行。
優(yōu)點(diǎn):1.線程數(shù)量可控,能夠保證系統(tǒng)安全。2.任務(wù)和線程統(tǒng)一管理,方便程序設(shè)計(jì)。
缺點(diǎn):1.沒(méi)有任務(wù)拒絕機(jī)制,任務(wù)會(huì)堆積在任務(wù)隊(duì)列。2.線程池線程數(shù)量固定,無(wú)法動(dòng)態(tài)調(diào)節(jié)線程數(shù)量,導(dǎo)致任務(wù)處理效率不高。
3)完善的線程池
圖片
完善的線程池需要具備一下幾個(gè)優(yōu)點(diǎn):
- 多線程動(dòng)態(tài)管理。
- 任務(wù)實(shí)時(shí)響應(yīng)。
- 高并發(fā)安全保護(hù)機(jī)制。
線程池被分為核心線程和動(dòng)態(tài)線程,核心線程會(huì)一直運(yùn)行保證基礎(chǔ)業(yè)務(wù),當(dāng)任務(wù)越來(lái)越多的時(shí)候,核心線程無(wú)法保證任務(wù)快速響應(yīng),此時(shí)需要通過(guò)創(chuàng)建動(dòng)態(tài)線程提高線程響應(yīng)速度。
當(dāng)動(dòng)態(tài)線程到達(dá)極限時(shí),系統(tǒng)的處理能力到達(dá)瓶頸,此時(shí)需要啟動(dòng)安全保護(hù)機(jī)制,通過(guò)任務(wù)拒絕,保證系統(tǒng)安全。
1.2 線程池優(yōu)點(diǎn)
通過(guò)上述分析,我們了解到線程池有以下優(yōu)點(diǎn):
- 降低開(kāi)銷(xiāo):線程池通過(guò)重用已存在的線程,降低了線程的創(chuàng)建和銷(xiāo)毀所帶來(lái)的開(kāi)銷(xiāo),提高了性能。
- 控制并發(fā)數(shù):線程池能夠有效控制同時(shí)執(zhí)行的線程數(shù)量,防止因線程過(guò)多導(dǎo)致系統(tǒng)資源耗盡或性能下降,提高系統(tǒng)穩(wěn)定性。
- 提高響應(yīng)速度:由于線程池中的線程可以復(fù)用,減少了線程創(chuàng)建的時(shí)間,因此提高了任務(wù)的響應(yīng)速度。
- 提供豐富功能:線程池不僅可以執(zhí)行普通任務(wù),還提供定時(shí)執(zhí)行、定期執(zhí)行、單線程執(zhí)行以及并發(fā)數(shù)控制等高級(jí)功能,使得線程的管理和使用更加靈活和高效。
2.線程池設(shè)計(jì)
2.1 整體設(shè)計(jì)
設(shè)計(jì)一個(gè)完善的線程池,我們需要設(shè)計(jì)幾個(gè)核心模塊:狀態(tài)管理、線程管理,任務(wù)管理。
- 狀態(tài)管理:管理線程池的生命周期。
- 線程管理:管理核心線程和動(dòng)態(tài)線程。
- 任務(wù)管理:管理任務(wù)申請(qǐng),任務(wù)拒絕以及任務(wù)高效調(diào)度。
2.2 詳細(xì)設(shè)計(jì)
線程池定義(struct thread_pool):
typedef struct thread_pool {
struct list_head head; //任務(wù)隊(duì)列
int task_max; //任務(wù)隊(duì)列最大長(zhǎng)度
int task_num; //任務(wù)隊(duì)列實(shí)際長(zhǎng)度
int core_num; //核心線程數(shù)量
int dyn_max; //動(dòng)態(tài)線程最大數(shù)量
atomic_int dyn_num; //動(dòng)態(tài)線程實(shí)際數(shù)量
pthread_t *core_th; //核心線程pthread_t數(shù)組
pthread_t *dyn_th; //動(dòng)態(tài)線程pthread_t數(shù)組
int status; //線程池狀態(tài)
pthread_mutex_t mutex; //互斥鎖
pthread_cond_t cond; //條件變量
} thread_pool_t;
線程池通過(guò)struct thread_pool結(jié)構(gòu)體定義,每個(gè)成員的作用已在代碼中注釋。
創(chuàng)建線程池:
thread_pool_t* thread_pool_create(int core_num, int dyn_max) {
thread_pool_t *tp = (thread_pool_t *)malloc(sizeof(thread_pool_t));
if (!tp) return NULL;
pthread_mutex_init(&tp->mutex, NULL);
pthread_cond_init(&tp->cond, NULL);
list_head_init(&tp->head);
tp->status = INIT_STATUS;
tp->task_max = TASK_MAX;
tp->task_num = 0;
tp->dyn_max = dyn_max;
atomic_init(&tp->dyn_num, 0);
tp->core_num = core_num;
tp->core_th = malloc(core_num * sizeof(pthread_t));
for (int i = 0; i < tp->core_num; i++) { //創(chuàng)建核心線程
thread_arg *th_arg = (thread_arg *)malloc(sizeof(thread_arg));
*th_arg = (thread_arg) {.tp = tp, .type = CORE_THREAD, .task = NULL};
pthread_create(&tp->core_th[i], NULL, thread_proc, (void *)th_arg);
}
tp->status = RUNNING_STATUS; //設(shè)置線程池為running狀態(tài)
return tp;
}
線程池創(chuàng)建時(shí)會(huì)指定核心線程數(shù)量以及最大動(dòng)態(tài)線程數(shù)量,核心線程跟著線程池一起創(chuàng)建。
動(dòng)態(tài)線程根據(jù)實(shí)際任務(wù)量動(dòng)態(tài)創(chuàng)建,為了防止創(chuàng)建過(guò)多的動(dòng)態(tài)線程,需限制動(dòng)態(tài)線程最大數(shù)量。
銷(xiāo)毀線程池:
void thread_pool_destroy_p(thread_pool_t **ptp) {
if (!ptp) return;
thread_pool_t *tp = *ptp;
while(atomic_load(&tp->dyn_num)) { //等待動(dòng)態(tài)線程全部退出
usleep(10 * 1000);
}
for (int i = 0; i < tp->core_num; i++) { //等待核心線程全部退出
pthread_join(tp->core_th[i], NULL);
}
free(tp->core_th);
pthread_mutex_destroy(&tp->mutex);
pthread_cond_destroy(&tp->cond);
tp->status = STOP_STATUS; //設(shè)置線程池為stop狀態(tài)
free(tp);
ptp = NULL;
return;
}
線程池銷(xiāo)毀需要回收核心線程和動(dòng)態(tài)線程,核心線程采用pthread_join方式回收。
動(dòng)態(tài)線程設(shè)置分離屬性,線程池成員dyn_num(原子變量)用于記錄仍在工作的動(dòng)態(tài)線程數(shù)量,dyn_num等于0時(shí)表示當(dāng)前所有的動(dòng)態(tài)線程都已經(jīng)退出。
2.2.1 狀態(tài)管理
線程池整個(gè)生命周期可分為4個(gè)狀態(tài):
#define INIT_STATUS (1<<0) //init狀態(tài)
#define RUNNING_STATUS (1<<1) //running狀態(tài)
#define SHUTDOWN_STATUS (1<<2) //shutdown狀態(tài)
#define STOP_STATUS (1<<3) //stop狀態(tài)
- init狀態(tài):線程池創(chuàng)建時(shí)的狀態(tài),init狀態(tài)線程不能處理任務(wù)。
- running狀態(tài):線程池初始化完畢后設(shè)置為running狀態(tài),此時(shí)線程池能夠提取任務(wù)并執(zhí)行。
- shutdown狀態(tài):線程池正?;蛘弋惓M顺?,設(shè)置為shutdown狀態(tài),此時(shí)線程池能繼續(xù)處理工作隊(duì)列中剩余任務(wù),但無(wú)法再接收新的任務(wù)。
- stop狀態(tài):線程池所有線程全部釋放,線程池被銷(xiāo)毀。
2.2.2 線程管理
核心線程和動(dòng)態(tài)線程處理函數(shù):
void *thread_proc(void *arg) {
thread_arg *th_arg = (thread_arg *)arg;
thread_pool_t *tp = th_arg->tp;
pid_t tid = gettid();
//動(dòng)態(tài)線程設(shè)置成分離模式,方便管理
if (th_arg->type == DYN_THREAD) pthread_detach(pthread_self());
int count = 0;
task_t *task = NULL;
while(1) {
if (th_arg->task) { //創(chuàng)建線程并執(zhí)行首次任務(wù)
printf("tid:%d first process task\n", tid);
task = th_arg->task;
th_arg->task = NULL;
} else {
task = thread_get_task(tp); //從任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行
if (!task) {
if (tp->status != RUNNING_STATUS) break; //線程池被關(guān)閉,線程退出
count++;
if ((count >= 10) && (th_arg->type == DYN_THREAD)) {
printf("dyn thread 10s break\n");
break; //動(dòng)態(tài)線程空閑10秒自動(dòng)退出
}
continue;
}
count = 0;
}
task->cb(task->arg);
task_freep(&task);
}
free(th_arg);
if (th_arg->type == DYN_THREAD) {
atomic_fetch_sub(&tp->dyn_num, 1);
}
return NULL;
}
1)核心線程
核心線程數(shù)量固定,在線程池創(chuàng)建時(shí)會(huì)創(chuàng)建所有的核心線程,線程池退出時(shí)會(huì)銷(xiāo)毀所有的核心線程。
2)動(dòng)態(tài)線程
動(dòng)態(tài)線程的管理比較復(fù)雜,需要根據(jù)任務(wù)數(shù)量做動(dòng)態(tài)調(diào)整,任務(wù)量大時(shí),動(dòng)態(tài)線程會(huì)被創(chuàng)建,提高線程池任務(wù)響應(yīng)速率,任務(wù)量小時(shí),空閑的動(dòng)態(tài)線程會(huì)被回收,從而減少線程開(kāi)銷(xiāo)。
2.2.3 任務(wù)管理
任務(wù)定義(struct task):
typedef enum TASK_TYPE {
FREE_TYPE,
NOFREE_TYPE,
}TASK_TYPE;
typedef void (*func)(void *arg);
typedef struct task {
struct list_head list; //隊(duì)列節(jié)點(diǎn)
func cb; //回調(diào)函數(shù)
void *arg; //回調(diào)函數(shù)參數(shù)
TASK_TYPE type; //任務(wù)參數(shù)是否需要釋放
}task_t;
任務(wù)通過(guò)struct task定義,任務(wù)主要成員:
- list:隊(duì)列節(jié)點(diǎn),用于插入和移除任務(wù)隊(duì)列。
- cb:回調(diào)函數(shù),任務(wù)處理函數(shù)。
- arg:回調(diào)函數(shù)參數(shù)。
- type:回調(diào)函數(shù)參數(shù)是否需要釋放標(biāo)志。
任務(wù)申請(qǐng)流程如下圖,創(chuàng)建一個(gè)新的任務(wù)后,需要做一些檢測(cè)才能將任務(wù)加入線程池,檢測(cè)不通過(guò)則執(zhí)行任務(wù)拒絕,從而保證線程池始終處于安全高效運(yùn)行狀態(tài)。
任務(wù)執(zhí)行完畢后,需釋放任務(wù),回收資源。
圖片
線程池一定要做任務(wù)管理,任務(wù)管理的目的有兩個(gè):
- 保證線程池安全,不會(huì)堆積過(guò)多任務(wù),消耗CPU和內(nèi)存資源。
- 提高任務(wù)處理效率。
3.線程池測(cè)試
測(cè)試環(huán)境:樹(shù)莓派4B,4核,4GB。
分別采用多線程和線程池方式測(cè)試CPU密集型和IO密集型任務(wù),對(duì)比兩種方式性能和效率的差異。每毫秒產(chǎn)生1個(gè)任務(wù),總共測(cè)試10000個(gè)任務(wù)。
通過(guò)time命令執(zhí)行測(cè)試程序,記錄測(cè)試程序執(zhí)行情況。
測(cè)試代碼如下:
(完整代碼請(qǐng)聯(lián)系博主獲取)
#define CORE_THREAD_NUM (4) //核心線程數(shù)量
#define DYN_THREAD_MAX (32) //動(dòng)態(tài)線程最大數(shù)量
#define TASK_MAX (128) //任務(wù)隊(duì)列任務(wù)最大數(shù)量
#define ENABLE_THREAD_POOL (0) //是否開(kāi)啟線程池,0:關(guān)閉 1:開(kāi)啟
#define TEST_TASK_NUM (10000) //測(cè)試任務(wù)數(shù)量
#define TASK_INTERVAL (1000) //任務(wù)產(chǎn)生間隔時(shí)間,單位:毫秒
#define TEST_TASK_TYPE (0) //任務(wù)類(lèi)型,0:CPU密集型 1:IO密集型
#define NOP_TIMES (10000000) //CPU密集型任務(wù)執(zhí)行空指令次數(shù)
#define RAND_RANGE (1 << 18) //IO密集型任務(wù)休眠時(shí)間隨機(jī)范圍,單位:毫秒
void cpu_stress() {
for (int i = 0; i < NOP_TIMES; i++) {
;
}
}
void rand_sleep() {
srand(time(0));
int ms = rand() & (RAND_RANGE - 1);
usleep(ms);
}
//任務(wù)處理函數(shù)
void task_cb(void *arg) {
#if !TEST_TASK_TYPE
cpu_stress();
#else
rand_sleep();
#endif
return;
}
atomic_int running_threads;
void *thread_proc1(void *arg) {
atomic_fetch_add(&running_threads, 1);
pthread_detach(pthread_self());
task_t *task = (task_t *)arg;
task->cb(task->arg);
free(task);
atomic_fetch_sub(&running_threads, 1);
return NULL;
}
int main(int argc, char *argv[]) {
#if ENABLE_THREAD_POOL
thread_pool_t *tp = thread_pool_create(CORE_THREAD_NUM, DYN_THREAD_MAX);
if (!tp) {
printf("thread_pool_create error");
return -1;
}
int seq = 0;
while(1) {
usleep(TASK_INTERVAL);
task_t *task = task_create(task_cb, NULL, FREE_TYPE);
if (!task) {
printf("task create error\n");
usleep(10 * 1000);
continue;
}
int ret = thread_add_task(tp, task);
if (ret == -1) { //任務(wù)拒絕
task_freep(&task);
usleep(10 * 1000);
continue;
}
if (seq++ >= TEST_TASK_NUM) {
thread_pool_exit(tp);
thread_pool_destroy_p(&tp);
break;
}
}
printf("thread pool test done------\n");
#else
atomic_init(&running_threads, 0);
int seq = 0;
int old_num = 0;
while(1) {
usleep(TASK_INTERVAL);
task_t *task = task_create(task_cb, NULL, FREE_TYPE);
if (!task) {
printf("task create error\n");
usleep(10 * 1000);
continue;
}
pthread_t th;
int ret = pthread_create(&th, NULL, thread_proc1, (void *)task);
if (ret != 0) {
free(task);
usleep(10 * 1000);
continue;
}
int num = atomic_load(&running_threads);
if (old_num < num) {
old_num = num;
printf("running_threads:%d\n", num);
}
if (seq++ >= TEST_TASK_NUM) break;
}
printf("threads test done------\n");
#endif
return 0;
}
1)CPU密集型場(chǎng)景測(cè)試
測(cè)試參數(shù)如下:
圖片
多線程測(cè)試結(jié)果-->:
圖片
CPU使用率397%(已使用完),最高同時(shí)創(chuàng)建913個(gè)線程,完成測(cè)試時(shí)間2分鐘,用戶(hù)時(shí)間7分51秒,系統(tǒng)時(shí)間0.5秒。
線程池測(cè)試結(jié)果-->:
圖片
CPU使用率398%(已使用完),核心線程4個(gè),動(dòng)態(tài)線程32個(gè),共36各個(gè)線程,完成測(cè)試時(shí)間1分56秒,用戶(hù)時(shí)間7分41秒,系統(tǒng)時(shí)間0.04秒。
小節(jié):CPU密集場(chǎng)景多線程和線程池方式處理效率相差不大,多線程方式最多同時(shí)創(chuàng)建900多個(gè)線程,會(huì)消耗大量系統(tǒng)資源。線程池方式線程始終控制在36個(gè),比較安全。
2)IO密集型場(chǎng)景測(cè)試
測(cè)試參數(shù)如下:
圖片
多線程測(cè)試結(jié)果-->:
圖片
CPU使用率10.2%(已使用完),最高同時(shí)創(chuàng)建212個(gè)線程,完成測(cè)試時(shí)間11秒,用戶(hù)時(shí)間0.15秒,系統(tǒng)時(shí)間為1秒。
線程池測(cè)試結(jié)果-->:
圖片
圖片
CPU使用率3.6%(已使用完),核心線程4個(gè),動(dòng)態(tài)線程32個(gè),共36各個(gè)線程,完成測(cè)試時(shí)間20秒,用戶(hù)時(shí)間0.25秒,系統(tǒng)時(shí)間為0.32秒。
小節(jié):IO密集型場(chǎng)景線程處于IO阻塞狀態(tài),CPU使用率并不高,此時(shí)可以適當(dāng)增加線程數(shù)量來(lái)提高CPU利用率。
總結(jié):
- 線程池能夠有效控制線程數(shù)量,防止線程過(guò)多對(duì)系統(tǒng)造成危害。
- 線程池能夠高效管理任務(wù),使軟件開(kāi)發(fā)更方便、高效。
- 從測(cè)試結(jié)果來(lái)看,多線程和線程池方式處理任務(wù)效率相差并不大,即使頻繁的創(chuàng)建和銷(xiāo)毀線程,也未對(duì)效率產(chǎn)生很大影響。