一篇文章淺析Python自帶的線程池和進(jìn)程池
前言
大家好,我是星期八。
我們都知道,不管是Java,還是C++,還是Go,還是Python,都是有線程這個(gè)概念的。
但是我們知道,線程是不能隨便創(chuàng)建的,就像每招一個(gè)員工一樣,是有代價(jià)的,無限制招人肯定最后各種崩潰。
所以通常情況下,我們會(huì)引出線程池這個(gè)概念。
本質(zhì)就是我就招了幾個(gè)固定的員工,給他們派活,某一個(gè)人的活干完了再去任務(wù)中心領(lǐng)取新的活。
防止任務(wù)太多,一次性招太多工人,最后系統(tǒng)崩潰。
開心一刻
理想的多線程
實(shí)際的多線程
from concurrent.futures import ...
可能也是因?yàn)榫€程池這個(gè)東西用的越來越多了吧,從Python3.2+之后,就成了內(nèi)置模塊。
對(duì)的,直接就能使用,不需要pip進(jìn)行安裝什么的。
concurrent.futures下面主要有倆接口。
- ThreadPoolExecutor 線程池。
- ProcessPoolExecutor進(jìn)程池。
這里可沒有什么所謂的異步池。
個(gè)人看法:雖然異步的性能很高,但是目前除了Go以外,其他實(shí)現(xiàn)的都不是太好,用法上面有些怪異,當(dāng)然,你們可以說我菜,我承認(rèn)。
線程池
示例代碼
- import time
- from concurrent.futures import ThreadPoolExecutor
- import random
- # max_workers表示工人數(shù)量,也就是線程池里面的線程數(shù)量
- pool = ThreadPoolExecutor(max_workers=10)
- # 任務(wù)列表
- task_list = ["任務(wù)1", "任務(wù)2", "任務(wù)3", "任務(wù)4", ]
- def handler(task_name):
- # 隨機(jī)睡眠,模仿任務(wù)執(zhí)行時(shí)間不確定性
- n = random.randrange(5)
- time.sleep(n)
- print(f"任務(wù)內(nèi)容:{task_name}")
- if __name__ == '__main__':
- # 遍歷任務(wù),
- for task in task_list:
- """
- 交給函數(shù)處理,submit會(huì)將所有任務(wù)都提交到一個(gè)地方,不會(huì)阻塞
- 然后線程池里面的每個(gè)線程會(huì)來取任務(wù),
- 比如:線程池有3個(gè)線程,但是有5個(gè)任務(wù)
- 會(huì)先取走三個(gè)任務(wù),每個(gè)線程去處理
- 其中一個(gè)線程處理完自己的任務(wù)之后,會(huì)再來提交過的任務(wù)區(qū)再拿走一個(gè)任務(wù)
- """
- pool.submit(handler, task)
- print("main執(zhí)行完畢")
執(zhí)行結(jié)果
發(fā)現(xiàn)的問題
其實(shí)這個(gè)就是并發(fā)的,不要懷疑,但是你有沒有發(fā)現(xiàn)個(gè)問題,main先執(zhí)行,這說明啥?
這說明,我main跑完之后,是不管子線程的死活的。
那能不能設(shè)置一下,所有的子線程都執(zhí)行完之后,main函數(shù)在執(zhí)行完?
當(dāng)然可以,需要一個(gè)參數(shù)即可。
- pool.shutdown()
要完成上述的問題,我們需要一個(gè)參數(shù),加上這個(gè)參數(shù)之后。
就可以讓主線程等待所有子線程執(zhí)行完之后,主線程再執(zhí)行完。
示例代碼
- ...
- if __name__ == '__main__':
- # 遍歷任務(wù),
- for task in task_list:
- """
- 交給函數(shù)處理,submit會(huì)將所有任務(wù)都提交到一個(gè)地方
- 然后線程池里面的每個(gè)線程會(huì)來取任務(wù),
- 比如:線程池有3個(gè)線程,但是有5個(gè)任務(wù)
- 會(huì)先取走三個(gè)任務(wù),每個(gè)線程去處理
- 其中一個(gè)線程處理完自己的任務(wù)之后,會(huì)再來提交過的任務(wù)區(qū)再拿走一個(gè)任務(wù)
- """
- pool.submit(handler, task)
- pool.shutdown()
- print("main執(zhí)行完畢")
主要就是13行的pool.shutdown()。
執(zhí)行結(jié)果
這次結(jié)果就是我們想要的了,hhh!!!
- add_done_callback
add_done_callback可以理解為是回調(diào)函數(shù),線程執(zhí)行完之后,會(huì)自動(dòng)調(diào)用指定的回調(diào)函數(shù)。
并且能拿到線程執(zhí)行函數(shù)的返回值。
有什么用,我也沒用過,怪我才疏學(xué)淺叭。
示例代碼
- import time
- from concurrent.futures import ThreadPoolExecutor
- import random
- from concurrent.futures._base import Future
- # max_workers表示工人數(shù)量,也就是線程池里面的線程數(shù)量
- pool = ThreadPoolExecutor(max_workers=10)
- # 任務(wù)列表
- task_list = ["任務(wù)1", "任務(wù)2", "任務(wù)3", "任務(wù)4", ]
- def handler(task_name):
- # 隨機(jī)睡眠,模仿任務(wù)執(zhí)行時(shí)間不確定性
- n = random.randrange(5)
- time.sleep(n)
- print(f"任務(wù)內(nèi)容:{task_name}")
- return f"任務(wù)內(nèi)容:{task_name}"
- def done(res: Future):
- print("done拿到的返回值:", res.result())
- if __name__ == '__main__':
- # 遍歷任務(wù),
- for task in task_list:
- futrue = pool.submit(handler, task) # type:Future
- futrue.add_done_callback(done)
- pool.shutdown()
- print("main執(zhí)行完畢")
注意:第17,27,28行代碼!
執(zhí)行效果
我想,可能通常用在一些善后工作叭。
多進(jìn)程方式
其實(shí)通過上述幾個(gè)例子,我們基本是知道怎么使用上面這個(gè)線程池了。
但是都知道Python的線程,因?yàn)镚IL(全局解釋器鎖)的原因,是不能并發(fā)到多個(gè)物理核心上的。
所以是IO密集型的,像爬蟲,讀寫文件,使用線程池是ok的。
但是如果說我就是野,就是頭鐵,非要用Python做計(jì)算型應(yīng)用,像圖片壓縮、視頻流推送,那沒辦法,需要使用多進(jìn)程池方式。
其實(shí)通過concurrent這個(gè)接口,可以很方便的創(chuàng)建進(jìn)程池,只需要修改兩個(gè)地方。
- ...
- # 改成導(dǎo)入進(jìn)程池方式
- from concurrent.futures import ProcessPoolExecutor
- ...
- if __name__ == '__main__':
- ...
- # 進(jìn)程池方式
- pool = ProcessPoolExecutor(max_workers=10)
- ...
只需要修改這倆地方即可,其他和上述用法一摸一樣。
總結(jié)
本篇主要講的是Python自帶的線程池和進(jìn)程池。
比較有特色的是,ThreadPoolExecutor,ProcessPoolExecutor的接口是一樣的。
只需要修改導(dǎo)入的包就行。
concurrent的接口主要有pool.submit(),pool.shutdown(),futrue.add_done_callback()。
基本這幾個(gè)都?jí)蜃约河昧恕?/p>
本文轉(zhuǎn)載自微信公眾號(hào)「 Python爬蟲與數(shù)據(jù)挖掘」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系 Python爬蟲與數(shù)據(jù)挖掘公眾號(hào)。