聊聊Linux C下線程池的使用
線程池也是多線程的處理方式。是將“生產(chǎn)者”線程提出任務(wù)添加到“任務(wù)隊(duì)列”,然后一些線程自動(dòng)完成“任務(wù)隊(duì)列”上的任務(wù)。
多線程編程,創(chuàng)建一個(gè)線程,指定去完成某一個(gè)任務(wù),等待線程的退出。雖然能夠滿足編程需求,但是當(dāng)我們需要?jiǎng)?chuàng)建大量的線程的時(shí)候,在創(chuàng)建過(guò)程以及銷毀線程的過(guò)程中可能會(huì)消耗大量的CPU.增加很大開銷。如:文件夾的copy、WEB服務(wù)器的響應(yīng)。
線程池就是用來(lái)解決類似于這樣的一個(gè)問(wèn)題的,可以降低頻繁地創(chuàng)建和銷毀線程所帶來(lái)地開銷。
線程池技術(shù)思路:一般采用預(yù)創(chuàng)建線程技術(shù),也就是提前把需要用線程先創(chuàng)建一定數(shù)目。這些線程提前創(chuàng)建好了之后,“任務(wù)隊(duì)列”里面假設(shè)沒有任務(wù),那么就讓這些線程休眠,一旦有任務(wù),就喚醒線程去執(zhí)行任務(wù),任務(wù)執(zhí)行完了,也不需要去銷毀線程,直到當(dāng)你想退出或者是關(guān)機(jī)時(shí),這個(gè)時(shí)候,那么你調(diào)用銷毀線程池地函數(shù)去銷毀線程。
線程完成任務(wù)之后不會(huì)銷毀,而是自動(dòng)地執(zhí)行下一個(gè)任務(wù)。而且,當(dāng)任務(wù)有很多,你可以有函數(shù)接口去增加線程數(shù)量,當(dāng)任務(wù)較少時(shí),你可以有函數(shù)接口去銷毀部分線程。
如果,創(chuàng)建和銷毀線程的時(shí)間對(duì)比執(zhí)行任務(wù)的時(shí)間可以忽略不計(jì),那么我們?cè)谶@種情況下面也就沒有必要用線程池。
“任務(wù)隊(duì)列”是一個(gè)共享資源“互斥訪問(wèn)”

線程池本質(zhì)上也是一個(gè)數(shù)據(jù)結(jié)構(gòu),需要一個(gè)結(jié)構(gòu)體去描述它:
- struct pthread_pool //線程池的實(shí)現(xiàn)
- {
- //一般會(huì)有如下成員
- //互斥鎖,用來(lái)保護(hù)這個(gè)“任務(wù)隊(duì)列”
- pthread_mutex_t lock; //互斥鎖
- //線程條件變量 表示“任務(wù)隊(duì)列”是否有任務(wù)
- pthread_cond_t cond; //條件變量
- bool shutdown; //表示是否退出程序 bool:類型 false / true
- //任務(wù)隊(duì)列(鏈表),指向第一個(gè)需要指向的任務(wù)
- //所有的線程都從任務(wù)鏈表中獲取任務(wù) "共享資源"
- struct task * task_list;
- //線程池中有多個(gè)線程,每一個(gè)線程都有tid, 需要一個(gè)數(shù)組去保存tid
- pthread_t * tids; //malloc()
- //線程池中正在服役的線程數(shù),當(dāng)前線程個(gè)數(shù)
- unsigned int active_threads;
- //線程池任務(wù)隊(duì)列最大的任務(wù)數(shù)量
- unsigned int max_waiting_tasks;
- //線程池任務(wù)隊(duì)列上當(dāng)前有多少個(gè)任務(wù)
- unsigned int cur_waiting_tasks;
- //......
- };
- //任務(wù)隊(duì)列(鏈表)上面的任務(wù)結(jié)點(diǎn),只要能夠描述好一個(gè)任務(wù)就可以了,
- //線程會(huì)不斷地任務(wù)隊(duì)列取任務(wù)
- struct task //任務(wù)結(jié)點(diǎn)
- {
- // 1. 任務(wù)結(jié)點(diǎn)表示的任務(wù),“函數(shù)指針”指向任務(wù)要執(zhí)行的函數(shù)(cp_file)
- void*(* do_task)(void * arg);
- //2. 指針,指向任務(wù)指向函數(shù)的參數(shù)(文件描述符)
- void * arg;
- //3. 任務(wù)結(jié)點(diǎn)類型的指針,指向下一個(gè)任務(wù)
- struct task * next;
- };
線程池框架代碼如下,功能自填:
操作線程池所需要的函數(shù)接口:pthread_pool.c 、pthread_pool.h
把“線程池”想象成一個(gè)外包公司,你需要去完成的就是操作線程池所提供的函數(shù)接口。
pthread_pool.c
- #include "pthread_pool.h"
- /*
- init_pool: 線程池初始化函數(shù),初始化指定的線程池中有thread_num個(gè)初始線程
- @pool:指針,指向您要初始化的那個(gè)線程池
- @threa_num: 您要初始化的線程池中開始的線程數(shù)量
- 返回值:
- 成功 0
- 失敗 -1
- */
- int init_pool(pthread_pool * pool , unsigned int threa_num)
- {
- //初始化線程池的結(jié)構(gòu)體
- //初始化線程互斥鎖
- pthread_mutex_init(&pool->lock, NULL);
- //初始化線程條件變量
- pthread_cond_init(&pool->cond, NULL);
- pool->shutdown = false ;// 不退出
- pool->task_list = (struct task*)malloc(sizeof(struct task));
- pool->tids = (pthread_t *)malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
- if(pool->task_list == NULL || pool->tids == NULL)
- {
- perror("malloc memery error");
- return -1;
- }
- pool->task_list->next = NULL;
- //線程池中一開始初始化多少個(gè)線程來(lái)服役
- pool->active_threads = threa_num;
- //表示線程池中最多有多少個(gè)任務(wù)
- pool->max_waiting_tasks = MAX_WAITING_TASKS;
- //線程池中任務(wù)隊(duì)列當(dāng)前的任務(wù)數(shù)量
- pool->cur_waiting_tasks = 0;
- //創(chuàng)建thread_num個(gè)線程,并且讓線程去執(zhí)行任務(wù)調(diào)配函數(shù),
- //記錄所有線程的tid
- int i = 0;
- for(i = 0; i < threa_num; i++)
- {
- int ret = pthread_create(&(pool->tids)[i], NULL, routine, (void*)pool);
- if(ret != 0)
- {
- perror("create thread error");
- return -1;
- }
- printf("[%lu]:[%s] ===> tids[%d]:[%lu]",pthread_self(),
- __FUNCTION__, i , pool->tids[i]);
- }
- return 0;
- }
- /*
- routine: 任務(wù)調(diào)配函數(shù)。
- 所有線程開始都執(zhí)行此函數(shù),此函數(shù)會(huì)不斷的從線程池的任務(wù)隊(duì)列
- 中取下任務(wù)結(jié)點(diǎn),去執(zhí)行。
- 任務(wù)結(jié)點(diǎn)中包含“函數(shù)指針” h "函數(shù)參數(shù)"
- */
- void * routine(void * arg)
- {
- //arg表示你的線程池的指針
- while()
- {
- //獲取線程互斥鎖,lock
- //當(dāng)線程池沒有結(jié)束的時(shí)候,不斷地從線程池的任務(wù)隊(duì)列取下結(jié)點(diǎn)
- //去執(zhí)行。
- //釋放線程互斥鎖,unlock
- //釋放任務(wù)結(jié)點(diǎn)
- }
- }
- /*
- destroy_pool: 銷毀線程池,銷毀前要保證所有的任務(wù)已經(jīng)完成
- */
- int destroy_pool(pthread_pool * pool)
- {
- //釋放所有空間 等待任務(wù)執(zhí)行完畢(join)。
- //喚醒所有線程
- //利用join函數(shù)回收每一個(gè)線程資源。
- }
- /*
- add_task:給任務(wù)隊(duì)列增加任務(wù), 把do_task指向的任務(wù)(函數(shù)指針)和
- arg指向的參數(shù)保存到一個(gè)任務(wù)結(jié)點(diǎn),添加到pool任務(wù)隊(duì)列中。
- @pool : 您要添加任務(wù)的線程池
- @do_task : 您需要添加的任務(wù)(cp_file)
- @arg: 您要執(zhí)行的任務(wù)的參數(shù)(文件描述符)
- */
- int add_task(pthread_pool *pool,void*(* do_task)(void * arg), void*arg)
- {
- //把第二個(gè)參數(shù)和第三個(gè)參數(shù)封裝成struct task
- //再把它添加到 pool->task 任務(wù)隊(duì)列中去
- //注意任務(wù)隊(duì)列是一個(gè)共享資源
- //假如任務(wù)后要喚醒等待的線程。
- }
- //如果任務(wù)多的時(shí)候,往線程池中添加線程 pthread_create
- int add_threads(pthread_pool * pool, unsigned int num);
- {
- //新創(chuàng)建num個(gè)線程,讓每一個(gè)線程去執(zhí)行線程調(diào)配函數(shù)
- //將每一個(gè)新創(chuàng)建的線程tid,添加到pool-> tids
- }
- //如果任務(wù)少的時(shí)候,減少線程池中線程的數(shù)量 pthread_cancel join
- int remove_threads(pthread_pool * pool, unsigned int num)
- {
- //用pthread_cancel取消num個(gè)線程
- //利用pthread_join函數(shù)去回收資源。
- }
pthread_pool.h
- #ifndef __PTHREAD_POOL_H__
- #define __PTHREAD_POOL_H__
- //表示線程池中最多有多少個(gè)線程
- #define MAX_ACTIVE_THREADS 20
- //表示線程池中最多有多少個(gè)任務(wù)
- #define MAX_WAITING_TASKS 1024
- //任務(wù)隊(duì)列(鏈表)上面的任務(wù)結(jié)點(diǎn),只要能夠描述好一個(gè)任務(wù)就可以了,
- //線程會(huì)不斷地任務(wù)隊(duì)列取任務(wù)
- struct task //任務(wù)結(jié)點(diǎn)
- {
- // 1. 任務(wù)結(jié)點(diǎn)表示的任務(wù),“函數(shù)指針”指向任務(wù)要執(zhí)行的函數(shù)(cp_file)
- void*(* do_task)(void * arg);
- //2. 指針,指向任務(wù)指向函數(shù)的參數(shù)(文件描述符)
- void * arg;
- //3. 任務(wù)結(jié)點(diǎn)類型的指針,指向下一個(gè)任務(wù)
- struct task * next;
- };
- struct pthread_pool //線程池的實(shí)現(xiàn)
- {
- //一般會(huì)有如下成員
- //互斥鎖,用來(lái)保護(hù)這個(gè)“任務(wù)隊(duì)列”
- pthread_mutex_t lock; //互斥鎖
- //線程條件變量 表示“任務(wù)隊(duì)列”是否有任務(wù)
- pthread_cond_t cond; //條件變量
- bool shutdown; //表示是否退出程序 bool:類型 false / true
- //任務(wù)隊(duì)列(鏈表),指向第一個(gè)需要指向的任務(wù)
- //所有的線程都從任務(wù)鏈表中獲取任務(wù) "共享資源"
- struct task * task_list;
- //線程池中有多個(gè)線程,每一個(gè)線程都有tid, 需要一個(gè)數(shù)組去保存tid
- pthread_t * tids; //malloc()
- //線程池中正在服役的線程數(shù),當(dāng)前線程個(gè)數(shù)
- unsigned int active_threads;
- //線程池任務(wù)隊(duì)列最大的任務(wù)數(shù)量
- unsigned int max_waiting_tasks;
- //線程池任務(wù)隊(duì)列上當(dāng)前有多少個(gè)任務(wù)
- unsigned int cur_waiting_tasks;
- //......
- };
- /*
- init_pool: 線程池初始化函數(shù),初始化指定的線程池中有thread_num
- 個(gè)初始線程
- @pool:指針,指向您要初始化的那個(gè)線程池
- @threa_num: 您要初始化的線程池中開始的線程數(shù)量
- 返回值:
- 成功 0
- 失敗 -1
- */
- int init_pool(pthread_pool * pool , unsigned int threa_num);
- /*
- routine: 任務(wù)調(diào)配函數(shù)。
- 所有線程開始都執(zhí)行此函數(shù),此函數(shù)會(huì)不斷的從線程池的任務(wù)隊(duì)列
- 中取下任務(wù)結(jié)點(diǎn),去執(zhí)行。
- 任務(wù)結(jié)點(diǎn)中包含“函數(shù)指針” h "函數(shù)參數(shù)"
- */
- void * routine(void * arg);
- /*
- destroy_pool: 銷毀線程池,銷毀前要保證所有的任務(wù)已經(jīng)完成
- */
- int destroy_pool(pthread_pool * pool);
- /*
- add_task:給任務(wù)隊(duì)列增加任務(wù), 把do_task指向的任務(wù)(函數(shù)指針)和
- arg指向的參數(shù)保存到一個(gè)任務(wù)結(jié)點(diǎn),添加到pool任務(wù)隊(duì)列中。
- @pool : 您要添加任務(wù)的線程池
- @do_task : 您需要添加的任務(wù)(cp_file)
- @arg: 您要執(zhí)行的任務(wù)的參數(shù)(文件描述符)
- */
- int add_task(pthread_pool *pool,void*(* do_task)(void * arg), void*arg);
- //如果任務(wù)多的時(shí)候,往線程池中添加線程 pthread_create
- int add_threads(pthread_pool * pool, unsigned int num);
- //如果任務(wù)少的時(shí)候,減少線程池中線程的數(shù)量 pthread_cancel join
- int remove_threads(pthread_pool * pool, unsigned int num);
- #endif