線程池的使用場景和代碼實(shí)現(xiàn)!
前言:
大家周末好,今天給大家?guī)硪黄夹g(shù)文章,是關(guān)于線程池的實(shí)現(xiàn)和使用場景;我相信大家在公司里面的代碼里面經(jīng)常看到這個(gè)線程池的用法,或者甚至大家可能會聽到內(nèi)存池、對象池、連接池等這些專業(yè)術(shù)語,反正就很多帶池的專業(yè)術(shù)語,不過你會發(fā)現(xiàn)他們都有一個(gè)共同的特點(diǎn)就是“屁股”末尾都帶一個(gè)“池”字,池字,簡單理解就是用來存東西的,舉個(gè)簡單例子來說,你比如游泳池里面可以用來存儲水!
好了簡單說了一下,后面的哪些什么內(nèi)存池、連接池,后期復(fù)習(xí)都再給大家分享吧,今天我們的主題是線程池。
一、線程池的實(shí)現(xiàn):
1、為啥要用到線程池?
多線程編程,大家這個(gè)應(yīng)該很熟悉了,上次有一位朋友問了一個(gè)問題,一個(gè)線程大概占用多大內(nèi)存大小,一般按照POSIX標(biāo)準(zhǔn)來算的話,一個(gè)線程大概在8M左右,但是我們一般內(nèi)存資源有限,在進(jìn)行高并發(fā)的時(shí)候,比如說,多個(gè)客戶端同時(shí)向服務(wù)器端發(fā)送請求:
這個(gè)時(shí)候,你想一下給這么多客戶端都分配開一個(gè)大概8M的內(nèi)存大小,這現(xiàn)實(shí)嘛,顯然行不通的嘛,我們來計(jì)算一下:
- 一個(gè)線程:8M
- 1024M可以開128個(gè)線程
- 16G內(nèi)存大小可以開16x128,計(jì)算下來大概在2048個(gè)線程
所以百萬級個(gè)客戶端都分配開一個(gè)線程的話,那內(nèi)存資源肯定是不夠的,所以這涉及到我們的線程池了,這也是為什么在這種場景下要使用線程池了!
為了幫助大家更好的理解線程池這個(gè)概念,我們還是舉一個(gè)生活當(dāng)中的實(shí)際場景吧;去銀行存錢或者辦理相關(guān)業(yè)務(wù),這個(gè)大家都不陌生吧,你到了銀行里面,一般來說的話,都要排隊(duì)在窗口等待前面的人把業(yè)務(wù)辦理完,才能夠輪到你來辦理你想要辦理的業(yè)務(wù),而窗口里面就是幫你辦理各種業(yè)務(wù)的銀行工作人員,同時(shí)一般窗口辦理業(yè)務(wù)上面有一個(gè)提示電子信息,如果輪到了你,就會通知你,你就知道了輪到自己辦理業(yè)務(wù)了。
這里換個(gè)專業(yè)的角度來說(也不專業(yè)哈,只是一個(gè)打比方),你來辦理的這個(gè)業(yè)務(wù)就是一個(gè)任務(wù)(也就是一個(gè)線程,可以說成任務(wù)隊(duì)列,因?yàn)橐抨?duì)嘛,不可能一下子執(zhí)行那么多任務(wù),任務(wù)隊(duì)列里面的任務(wù)必須一個(gè)一個(gè)執(zhí)行),而銀行工作人員相當(dāng)于從任務(wù)隊(duì)列里面拿一個(gè)任務(wù)來執(zhí)行,你可以把銀行工作人員看成是執(zhí)行任務(wù)隊(duì)列;而電子顯示通知信息,你可以把它看成防止多個(gè)業(yè)務(wù)同時(shí)在一個(gè)窗口讓一個(gè)銀行工作人員來辦理,兩個(gè)窗口也就是兩個(gè)銀行工作人員同時(shí)辦理一個(gè)業(yè)務(wù),也就是說這個(gè)電子顯示信息是一個(gè)管理組件,管理任務(wù)是否可以去辦理,管理著銀行工作人員是否開始辦理業(yè)務(wù)任務(wù),不讓他們亂套了,合理有效的執(zhí)行任務(wù)。
那么你從上面可以看到,使用線程池的優(yōu)點(diǎn)了:
- 避免線程太多,使得內(nèi)存耗盡
- 開始的時(shí)候,你可以把創(chuàng)建好的線程放入到線程池當(dāng)中去,當(dāng)我們要用的時(shí)候,就可以從線程池里面拿一個(gè)線程來用,用完這個(gè)線程的時(shí)候,再把這個(gè)線程放回到線程池里面;避免創(chuàng)建線程與銷毀的代價(jià)
2、線程池實(shí)現(xiàn)模板步驟:
其實(shí)這個(gè)線程池的實(shí)現(xiàn)大概流程步驟都差不多,如果大家平時(shí)仔細(xì)看公司代碼或者說自己去實(shí)現(xiàn)一個(gè)線程池的話,大概實(shí)現(xiàn)模板如下:
- 任務(wù)隊(duì)列(前來辦理業(yè)務(wù)的人)
- 執(zhí)行隊(duì)列(就是銀行工作人員執(zhí)行任務(wù)隊(duì)列里面的任務(wù))
- 管理組件(管理任務(wù)有序的執(zhí)行)
3、線程池實(shí)現(xiàn)結(jié)構(gòu)體定義:
任務(wù)隊(duì)列:
- struct nTask
- {
- //用函數(shù)指針來存放不同的任務(wù)
- void (*task_func)(struct nTask *task);
- //這個(gè)參數(shù)用來做任務(wù)執(zhí)行的參數(shù)
- void *user_data;
- //鏈表節(jié)點(diǎn)的定義,這里采用鏈表的方式實(shí)現(xiàn)
- struct nTask *prev;
- struct nTask *next;
- };
- 執(zhí)行隊(duì)列:
- struct nWorker
- {
- pthread_t threadid;//線程id
- int terminate;//表示是否終止任務(wù)
- //表示銀行工作人員要執(zhí)行任務(wù)還要向執(zhí)行組件通告一下
- struct nManager *manager;
- //還是通過鏈表的方式來實(shí)現(xiàn)執(zhí)行隊(duì)列
- struct nWorker *prev;
- struct nWorker *next;
- };
注意:這里如果沒有辦理業(yè)務(wù)的人來,銀行工作人員只能在哪里等待任務(wù)的到來,然后再執(zhí)行任務(wù)。
- 管理組件:
- typedef struct nManager
- {
- struct nTask *task;
- struct nWorker *workers;
- pthread_mutex_t mutex;//互斥鎖
- pthread_cond_t cond;//條件變量
- }ThreadPool;
- 鏈表的插入和刪除模板:
- //插入
- #define LIST_INSERT(item,list) do{\
- item->prev=NULL; \
- item->next=list; \
- if((list)!=NULL) list->prev=item;\
- list=item;
- }while(0)
- //刪除
- #define LIST_REMOVE(item,list) do{ \
- if(item->prev != NULL) item->prev->next = item->next; \
- if(item->next !=NULL) item->next->prev=item->prev; \
- if(list == item)list = item->netx; \
- item->prev=item->next=NULL;\
- }while(0)
- }
4、線程池接口定義如下:
1、線程池初始化接口:
- int nThreadPoolCreate(ThreadPool *pool,int numWorkers)
- {
- //參數(shù)pool表示線程池,numWorkers表示線程池里面有多少個(gè)任務(wù)
- }
2、線程池銷毀接口:
- int nThreadPoolDestory(ThreadPool *pool,int nWorker)
- {
- }
3、往線程池里面添加任務(wù)接口:
- int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task)
- {
- }
4、線程回調(diào)函數(shù):
- void *nThreadPoolCallback(void *arg)
- {
- }
二、線程池工程代碼:
- #include <stdio.h>
- #include <string.h>
- #include <stdlib.h>
- #include <pthread.h>
- //鏈表插入
- #define LIST_INSTER(item,list)do{ \
- item->prev=NULL; \
- item->next=next; \
- if(list!=NULL) list->prev=item; \
- list=item;
- }while(0)
- //刪除
- #define LIST_REMOVE(item,list)do { \
- if(item->prev!=NULL)item->prev->next=item->next; \
- if(item->next!=NULL)itme->next->prev=item->prev;\
- if(list==item)list=item->next;
- item->prev=item->next=NULL;
- }while(0)
- //任務(wù)隊(duì)列
- struct nTask
- {
- void(*task_funt)(struct nTask *task);
- void *uset_data;
- struct nTask *prev;
- struct nTask *next;
- };
- //執(zhí)行隊(duì)列
- struct nWorker
- {
- pthread_t threadid;
- int terminate;
- struct nManager *manager;
- struct nWorker *prev;
- struct nWorker *next;
- };
- //管理組件
- typedef struct nManager
- {
- struct nTask *tasks;
- struct nWoker *workers;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
- }ThreadPool;
- //線程回調(diào)函數(shù)
- void *nThreadPoolCallback(void *arg)
- {
- struct nWorker *worker=(struct nWorker*)arg;
- while(1)
- {
- //判斷是否有任務(wù)
- pthread_mutex_lock(&worker->manager-mutex);
- while(worker->manager->tasks==NULL)
- {
- if(worker-terminate)
- break;
- pthread_cond_wait(&worker->manager->cond,&worker->manager->mutex);//如果沒有任務(wù),一直等待任務(wù)的到來
- }
- if(worker->terminate)
- {
- pthread_mutex_unlock(&worker->manager->mutex);
- break;
- }
- struct nTask *task = worker->manager->tasks;
- LIST_REMOVE(task,worker->manager->tasks);
- pthread_mutex_unlock(&worker->manager->mutex);
- task->task_func(task);
- }
- free(worker);
- }
- //創(chuàng)建線程池
- int nThreadPoolCreate(ThreadPool *pool, int numWorkers)
- {
- if(pool == NULL) return -1;
- if(numWorkers < 1)numWorkers =1;
- memset(&pool,0,sizeof(ThreadPool));
- //開始初始化
- pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
- memcpy(&pool->cond,&blank_cond,sizeof(pthread_cond_t));
- pthread_mutex_t blank_mutex =PTHREAD_MUTEX_INITIALIZER;
- memcpy(&pool->mutex,&blank_mutex,sizeof(pthread_mutex_t));
- int i =0;//開線程的個(gè)數(shù),也就是執(zhí)行任務(wù)的個(gè)數(shù)
- for(i=0;i < numWorkers;i++)
- {
- struct nWorker *worker =(struct nWorker*)malloc(sizeof(struct nWorker));
- if(worker == NUll)
- {
- perror("malloc");
- return -2;
- }
- memset(worker,0,sizeof(struct nWorker));
- worker->manager=pool;
- //創(chuàng)建線程
- int ret=pthread_create(&worker->pthreadid,NULL,nThreadPoolCallback,worker);
- if(ret)
- {
- perror("pthread_create");
- free(worker);
- return -3;
- }
- LIST_INSERT(worker,pool->workers);
- }
- }
- //線程池銷毀
- int nThreadPoolDestory(ThreadPool *pool,int nWorker)
- {
- struct nWorker *worker = NULL;
- for(worker=pool->workers;worker!=NULL;worker=worker->next)
- {
- worker->terminate;
- }
- pthread_mutex_lock(&pool->mutex);
- pthread_cond_broadcast(&pool->cond);//做一個(gè)廣播通知
- pthread_mutex_unlock(&pool->mutex);
- pool->workers = NULL;
- pool->tasks = NULL;
- }
- //往線程池里面添加任務(wù)
- int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task)
- {
- pthread_mutex_lock(&pool->mutex);
- LIST_INSERTER(task,pool->tasks);
- pthread_cond_sigal(&pool->cond);// 發(fā)送一個(gè)信號,有人來辦理業(yè)務(wù)了
- pthread_mutex_unlock(&pool-mutex);
- }
- #if 1
- #define THREADPOOL_INIT_COUNT 20
- #define TASK_INIT_SIZE 1000
- void task_entry(struct nTask *task) { //type
- //struct nTask *task = (struct nTask*)task;
- int idx = *(int *)task->user_data;
- printf("idx: %d\n", idx);
- free(task->user_data);
- free(task);
- }
- int main(void) {
- ThreadPool pool = {0};
- nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
- // pool --> memset();
- int i = 0;
- for (i = 0;i < TASK_INIT_SIZE;i ++) {
- struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
- if (task == NULL) {
- perror("malloc");
- exit(1);
- }
- memset(task, 0, sizeof(struct nTask));
- task->task_func = task_entry;
- task->user_data = malloc(sizeof(int));
- *(int*)task->user_data = i;
- nThreadPoolPushTask(&pool, task);
- }
- getchar();
- }
代碼量稍微有點(diǎn)多,大家可以多多看看幾遍!
本文轉(zhuǎn)載自微信公眾號「txp玩Linux」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系txp玩Linux公眾號。