自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

手寫線程池 - C 語言版

開發(fā) 后端
在各個編程語言的語種中都有線程池的概念,并且很多語言中直接提供了線程池,作為程序猿直接使用就可以了,下面給大家介紹一下線程池的實現(xiàn)原理。

[[442559]]

1. 線程池原理

我們使用線程的時候就去創(chuàng)建一個線程,這樣實現(xiàn)起來非常簡便,但是就會有一個問題:如果并發(fā)的線程數(shù)量很多,并且每個線程都是執(zhí)行一個時間很短的任務就結(jié)束了,這樣頻繁創(chuàng)建線程就會大大降低系統(tǒng)的效率,因為頻繁創(chuàng)建線程和銷毀線程需要時間。

那么有沒有一種辦法使得線程可以復用,就是執(zhí)行完一個任務,并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務呢?

線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務。線程池線程都是后臺線程。每個線程都使用默認的堆棧大小,以默認的優(yōu)先級運行,并處于多線程單元中。

如果某個線程在托管代碼中空閑(如正在等待某個事件), 則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創(chuàng)建另一個輔助線程但線程的數(shù)目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。

在各個編程語言的語種中都有線程池的概念,并且很多語言中直接提供了線程池,作為程序猿直接使用就可以了,下面給大家介紹一下線程池的實現(xiàn)原理:

線程池的組成主要分為 3 個部分,這三部分配合工作就可以得到一個完整的線程池:

1).任務隊列,存儲需要處理的任務,由工作的線程來處理這些任務

  • 通過線程池提供的 API 函數(shù),將一個待處理的任務添加到任務隊列,或者從任務隊列中刪除
  • 已處理的任務會被從任務隊列中刪除
  • 線程池的使用者,也就是調(diào)用線程池函數(shù)往任務隊列中添加任務的線程就是生產(chǎn)者線程

2).工作的線程(任務隊列任務的消費者) ,N個

  • 線程池中維護了一定數(shù)量的工作線程,他們的作用是是不停的讀任務隊列,從里邊取出任務并處理
  • 工作的線程相當于是任務隊列的消費者角色,
  • 如果任務隊列為空,工作的線程將會被阻塞 (使用條件變量 / 信號量阻塞)
  • 如果阻塞之后有了新的任務,由生產(chǎn)者將阻塞解除,工作線程開始工作

3).管理者線程(不處理任務隊列中的任務),1個

  • 它的任務是周期性的對任務隊列中的任務數(shù)量以及處于忙狀態(tài)的工作線程個數(shù)進行檢測
  • 當任務過多的時候,可以適當?shù)膭?chuàng)建一些新的工作線程
  • 當任務過少的時候,可以適當?shù)匿N毀一些工作的線程

 

 

 

 

2. 任務隊列

 

  1. // 任務結(jié)構(gòu)體 
  2. typedef struct Task 
  3.     void (*function)(void* arg); 
  4.     void* arg; 
  5. }Task; 

3. 線程池定義

 

  1. // 線程池結(jié)構(gòu)體 
  2. struct ThreadPool 
  3.     // 任務隊列 
  4.     Task* taskQ; 
  5.     int queueCapacity;  // 容量 
  6.     int queueSize;      // 當前任務個數(shù) 
  7.     int queueFront;     // 隊頭 -> 取數(shù)據(jù) 
  8.     int queueRear;      // 隊尾 -> 放數(shù)據(jù) 
  9.  
  10.     pthread_t managerID;    // 管理者線程ID 
  11.     pthread_t *threadIDs;   // 工作的線程ID 
  12.     int minNum;             // 最小線程數(shù)量 
  13.     int maxNum;             // 最大線程數(shù)量 
  14.     int busyNum;            // 忙的線程的個數(shù) 
  15.     int liveNum;            // 存活的線程的個數(shù) 
  16.     int exitNum;            // 要銷毀的線程個數(shù) 
  17.     pthread_mutex_t mutexPool;  // 鎖整個的線程池 
  18.     pthread_mutex_t mutexBusy;  // 鎖busyNum變量 
  19.     pthread_cond_t notFull;     // 任務隊列是不是滿了 
  20.     pthread_cond_t notEmpty;    // 任務隊列是不是空了 
  21.  
  22.     int shutdown;           // 是不是要銷毀線程池, 銷毀為1, 不銷毀為0 
  23. }; 

4. 頭文件聲明

 

  1. #ifndef _THREADPOOL_H 
  2. #define _THREADPOOL_H 
  3.  
  4. typedef struct ThreadPool ThreadPool; 
  5. // 創(chuàng)建線程池并初始化 
  6. ThreadPool *threadPoolCreate(int minint maxint queueSize); 
  7.  
  8. // 銷毀線程池 
  9. int threadPoolDestroy(ThreadPool* pool); 
  10.  
  11. // 給線程池添加任務 
  12. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg); 
  13.  
  14. // 獲取線程池中工作的線程的個數(shù) 
  15. int threadPoolBusyNum(ThreadPool* pool); 
  16.  
  17. // 獲取線程池中活著的線程的個數(shù) 
  18. int threadPoolAliveNum(ThreadPool* pool); 
  19.  
  20. ////////////////////// 
  21. // 工作的線程(消費者線程)任務函數(shù) 
  22. void* worker(void* arg); 
  23. // 管理者線程任務函數(shù) 
  24. void* manager(void* arg); 
  25. // 單個線程退出 
  26. void threadExit(ThreadPool* pool); 
  27. #endif  // _THREADPOOL_H 

5. 源文件定義

 

  1. ThreadPool* threadPoolCreate(int minint maxint queueSize) 
  2.     ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool)); 
  3.     do  
  4.     { 
  5.         if (pool == NULL
  6.         { 
  7.             printf("malloc threadpool fail...\n"); 
  8.             break; 
  9.         } 
  10.  
  11.         pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max); 
  12.         if (pool->threadIDs == NULL
  13.         { 
  14.             printf("malloc threadIDs fail...\n"); 
  15.             break; 
  16.         } 
  17.         memset(pool->threadIDs, 0, sizeof(pthread_t) * max); 
  18.         pool->minNum = min
  19.         pool->maxNum = max
  20.         pool->busyNum = 0; 
  21.         pool->liveNum = min;    // 和最小個數(shù)相等 
  22.         pool->exitNum = 0; 
  23.  
  24.         if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 || 
  25.             pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || 
  26.             pthread_cond_init(&pool->notEmpty, NULL) != 0 || 
  27.             pthread_cond_init(&pool->notFull, NULL) != 0) 
  28.         { 
  29.             printf("mutex or condition init fail...\n"); 
  30.             break; 
  31.         } 
  32.  
  33.         // 任務隊列 
  34.         pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize); 
  35.         pool->queueCapacity = queueSize; 
  36.         pool->queueSize = 0; 
  37.         pool->queueFront = 0; 
  38.         pool->queueRear = 0; 
  39.  
  40.         pool->shutdown = 0; 
  41.  
  42.         // 創(chuàng)建線程 
  43.         pthread_create(&pool->managerID, NULL, manager, pool); 
  44.         for (int i = 0; i < min; ++i) 
  45.         { 
  46.             pthread_create(&pool->threadIDs[i], NULL, worker, pool); 
  47.         } 
  48.         return pool; 
  49.     } while (0); 
  50.  
  51.     // 釋放資源 
  52.     if (pool && pool->threadIDs) free(pool->threadIDs); 
  53.     if (pool && pool->taskQ) free(pool->taskQ); 
  54.     if (pool) free(pool); 
  55.  
  56.     return NULL
  57.  
  58. int threadPoolDestroy(ThreadPool* pool) 
  59.     if (pool == NULL
  60.     { 
  61.         return -1; 
  62.     } 
  63.  
  64.     // 關(guān)閉線程池 
  65.     pool->shutdown = 1; 
  66.     // 阻塞回收管理者線程 
  67.     pthread_join(pool->managerID, NULL); 
  68.     // 喚醒阻塞的消費者線程 
  69.     for (int i = 0; i < pool->liveNum; ++i) 
  70.     { 
  71.         pthread_cond_signal(&pool->notEmpty); 
  72.     } 
  73.     // 釋放堆內(nèi)存 
  74.     if (pool->taskQ) 
  75.     { 
  76.         free(pool->taskQ); 
  77.     } 
  78.     if (pool->threadIDs) 
  79.     { 
  80.         free(pool->threadIDs); 
  81.     } 
  82.  
  83.     pthread_mutex_destroy(&pool->mutexPool); 
  84.     pthread_mutex_destroy(&pool->mutexBusy); 
  85.     pthread_cond_destroy(&pool->notEmpty); 
  86.     pthread_cond_destroy(&pool->notFull); 
  87.  
  88.     free(pool); 
  89.     pool = NULL
  90.  
  91.     return 0; 
  92.  
  93.  
  94. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) 
  95.     pthread_mutex_lock(&pool->mutexPool); 
  96.     while (pool->queueSize == pool->queueCapacity && !pool->shutdown) 
  97.     { 
  98.         // 阻塞生產(chǎn)者線程 
  99.         pthread_cond_wait(&pool->notFull, &pool->mutexPool); 
  100.     } 
  101.     if (pool->shutdown) 
  102.     { 
  103.         pthread_mutex_unlock(&pool->mutexPool); 
  104.         return
  105.     } 
  106.     // 添加任務 
  107.     pool->taskQ[pool->queueRear].function = func; 
  108.     pool->taskQ[pool->queueRear].arg = arg; 
  109.     pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity; 
  110.     pool->queueSize++; 
  111.  
  112.     pthread_cond_signal(&pool->notEmpty); 
  113.     pthread_mutex_unlock(&pool->mutexPool); 
  114.  
  115. int threadPoolBusyNum(ThreadPool* pool) 
  116.     pthread_mutex_lock(&pool->mutexBusy); 
  117.     int busyNum = pool->busyNum; 
  118.     pthread_mutex_unlock(&pool->mutexBusy); 
  119.     return busyNum; 
  120.  
  121. int threadPoolAliveNum(ThreadPool* pool) 
  122.     pthread_mutex_lock(&pool->mutexPool); 
  123.     int aliveNum = pool->liveNum; 
  124.     pthread_mutex_unlock(&pool->mutexPool); 
  125.     return aliveNum; 
  126.  
  127. void* worker(void* arg) 
  128.     ThreadPool* pool = (ThreadPool*)arg; 
  129.  
  130.     while (1) 
  131.     { 
  132.         pthread_mutex_lock(&pool->mutexPool); 
  133.         // 當前任務隊列是否為空 
  134.         while (pool->queueSize == 0 && !pool->shutdown) 
  135.         { 
  136.             // 阻塞工作線程 
  137.             pthread_cond_wait(&pool->notEmpty, &pool->mutexPool); 
  138.  
  139.             // 判斷是不是要銷毀線程 
  140.             if (pool->exitNum > 0) 
  141.             { 
  142.                 pool->exitNum--; 
  143.                 if (pool->liveNum > pool->minNum) 
  144.                 { 
  145.                     pool->liveNum--; 
  146.                     pthread_mutex_unlock(&pool->mutexPool); 
  147.                     threadExit(pool); 
  148.                 } 
  149.             } 
  150.         } 
  151.  
  152.         // 判斷線程池是否被關(guān)閉了 
  153.         if (pool->shutdown) 
  154.         { 
  155.             pthread_mutex_unlock(&pool->mutexPool); 
  156.             threadExit(pool); 
  157.         } 
  158.  
  159.         // 從任務隊列中取出一個任務 
  160.         Task task; 
  161.         task.function = pool->taskQ[pool->queueFront].function
  162.         task.arg = pool->taskQ[pool->queueFront].arg; 
  163.         // 移動頭結(jié)點 
  164.         pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; 
  165.         pool->queueSize--; 
  166.         // 解鎖 
  167.         pthread_cond_signal(&pool->notFull); 
  168.         pthread_mutex_unlock(&pool->mutexPool); 
  169.  
  170.         printf("thread %ld start working...\n", pthread_self()); 
  171.         pthread_mutex_lock(&pool->mutexBusy); 
  172.         pool->busyNum++; 
  173.         pthread_mutex_unlock(&pool->mutexBusy); 
  174.         task.function(task.arg); 
  175.         free(task.arg); 
  176.         task.arg = NULL
  177.  
  178.         printf("thread %ld end working...\n", pthread_self()); 
  179.         pthread_mutex_lock(&pool->mutexBusy); 
  180.         pool->busyNum--; 
  181.         pthread_mutex_unlock(&pool->mutexBusy); 
  182.     } 
  183.     return NULL
  184.  
  185. void* manager(void* arg) 
  186.     ThreadPool* pool = (ThreadPool*)arg; 
  187.     while (!pool->shutdown) 
  188.     { 
  189.         // 每隔3s檢測一次 
  190.         sleep(3); 
  191.  
  192.         // 取出線程池中任務的數(shù)量和當前線程的數(shù)量 
  193.         pthread_mutex_lock(&pool->mutexPool); 
  194.         int queueSize = pool->queueSize; 
  195.         int liveNum = pool->liveNum; 
  196.         pthread_mutex_unlock(&pool->mutexPool); 
  197.  
  198.         // 取出忙的線程的數(shù)量 
  199.         pthread_mutex_lock(&pool->mutexBusy); 
  200.         int busyNum = pool->busyNum; 
  201.         pthread_mutex_unlock(&pool->mutexBusy); 
  202.  
  203.         // 添加線程 
  204.         // 任務的個數(shù)>存活的線程個數(shù) && 存活的線程數(shù)<最大線程數(shù) 
  205.         if (queueSize > liveNum && liveNum < pool->maxNum) 
  206.         { 
  207.             pthread_mutex_lock(&pool->mutexPool); 
  208.             int counter = 0; 
  209.             for (int i = 0; i < pool->maxNum && counter < NUMBER 
  210.                 && pool->liveNum < pool->maxNum; ++i) 
  211.             { 
  212.                 if (pool->threadIDs[i] == 0) 
  213.                 { 
  214.                     pthread_create(&pool->threadIDs[i], NULL, worker, pool); 
  215.                     counter++; 
  216.                     pool->liveNum++; 
  217.                 } 
  218.             } 
  219.             pthread_mutex_unlock(&pool->mutexPool); 
  220.         } 
  221.         // 銷毀線程 
  222.         // 忙的線程*2 < 存活的線程數(shù) && 存活的線程>最小線程數(shù) 
  223.         if (busyNum * 2 < liveNum && liveNum > pool->minNum) 
  224.         { 
  225.             pthread_mutex_lock(&pool->mutexPool); 
  226.             pool->exitNum = NUMBER; 
  227.             pthread_mutex_unlock(&pool->mutexPool); 
  228.             // 讓工作的線程自殺 
  229.             for (int i = 0; i < NUMBER; ++i) 
  230.             { 
  231.                 pthread_cond_signal(&pool->notEmpty); 
  232.             } 
  233.         } 
  234.     } 
  235.     return NULL
  236.  
  237. void threadExit(ThreadPool* pool) 
  238.     pthread_t tid = pthread_self(); 
  239.     for (int i = 0; i < pool->maxNum; ++i) 
  240.     { 
  241.         if (pool->threadIDs[i] == tid) 
  242.         { 
  243.             pool->threadIDs[i] = 0; 
  244.             printf("threadExit() called, %ld exiting...\n", tid); 
  245.             break; 
  246.         } 
  247.     } 
  248.     pthread_exit(NULL); 

6. 測試代碼

 

  1. void taskFunc(void* arg) 
  2.     int num = *(int*)arg; 
  3.     printf("thread %ld is working, number = %d\n"
  4.         pthread_self(), num); 
  5.     sleep(1); 
  6.  
  7. int main() 
  8.     // 創(chuàng)建線程池 
  9.     ThreadPool* pool = threadPoolCreate(3, 10, 100); 
  10.     for (int i = 0; i < 100; ++i) 
  11.     { 
  12.         int* num = (int*)malloc(sizeof(int)); 
  13.         *num = i + 100; 
  14.         threadPoolAdd(pool, taskFunc, num); 
  15.     } 
  16.  
  17.     sleep(30); 
  18.  
  19.     threadPoolDestroy(pool); 
  20.     return 0; 

 

責任編輯:龐桂玉 來源: C語言與C++編程
相關(guān)推薦

2013-06-03 09:34:14

崩潰程序程序算法

2020-12-10 08:24:40

線程池線程方法

2020-03-05 15:34:16

線程池C語言局域網(wǎng)

2022-03-09 09:43:01

工具類線程項目

2009-05-29 09:48:05

Sandboxie瀏覽器

2011-10-21 15:33:45

Dart

2024-05-21 11:09:17

2021-02-01 08:28:24

Linux線程池Linux系統(tǒng)

2011-01-05 11:12:34

C++

2009-08-04 17:18:02

C#線程

2021-11-11 15:12:21

C語言線程代碼

2013-06-24 15:58:19

Windows 8.1Windows 8.1

2024-12-27 09:08:25

2023-05-19 08:01:24

Key消費場景

2018-09-25 14:35:06

zl

2013-04-09 12:18:45

socket.ioC服務器

2021-04-08 11:10:07

C語言版本Cmake

2022-09-06 08:31:09

線程池工具系統(tǒng)

2024-07-15 08:20:24

2024-05-06 00:00:00

ThreadPool線程調(diào)度
點贊
收藏

51CTO技術(shù)棧公眾號