自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

網(wǎng)工的Python之路:Concurrent.Futures

開發(fā) 后端
我在去年寫的兩篇專欄文章中已經(jīng)介紹過多線程(threading)和異步IO(asyncio),并向大家舉例講解了網(wǎng)工要如何將它們應(yīng)用在我們平常的網(wǎng)絡(luò)運維中來提升Python腳本的工作效率。這篇文章來介紹下另外一個可以實現(xiàn)并發(fā)編程的Python標(biāo)準(zhǔn)庫:concurrent.futures。

我在去年寫的兩篇專欄文章中已經(jīng)介紹過多線程(threading)和異步IO(asyncio),并向大家舉例講解了網(wǎng)工要如何將它們應(yīng)用在我們平常的網(wǎng)絡(luò)運維中來提升Python腳本的工作效率。這篇文章來介紹下另外一個可以實現(xiàn)并發(fā)編程的Python標(biāo)準(zhǔn)庫:concurrent.futures。

基本概念

網(wǎng)工在自學(xué)Python的時候肯定或多或少聽說過同步(Synchronous)、異步(Asynchronous)、單線程(Single Threaded)、多線程(Multi Threaded)、多進(jìn)程(Multiprocessing)、多任務(wù)(Multitasking) 、并發(fā)(Concurrent)、并行(Parallesim)、協(xié)程(Coroutine)、I/O密集型(I/O-bound)、CPU密集型(CPU-bound)等術(shù)語,如何區(qū)分它們對學(xué)習(xí)Python的網(wǎng)工來說是一個難點,開篇講concurrent.futures之前先把上述這些術(shù)語之間的關(guān)系和區(qū)別給大家大致捋一下:

1. 同步(Synchronous) VS 異步(Asynchronous)

所謂同步,可以理解為每當(dāng)系統(tǒng)執(zhí)行完一段代碼或者函數(shù)后,系統(tǒng)將一直等待該段代碼或函數(shù)返回的值或消息,直到系統(tǒng)接收到返回的值或消息后才繼續(xù)往下執(zhí)行下一段代碼或者函數(shù),在等待返回值或消息的期間,程序處于阻塞狀態(tài),系統(tǒng)將不做任何事情。而異步則恰恰相反,系統(tǒng)在執(zhí)行完一段代碼或者函數(shù)后,不用阻塞性地等待返回的值或消息,而是繼續(xù)執(zhí)行下一段代碼或函數(shù),在同一時間段里執(zhí)行多個任務(wù)(而不是傻傻地等著一件事情做完并且直到結(jié)果出來了以后才去做下件事情),將多個任務(wù)并發(fā)(注意不是并行),從而提高程序的執(zhí)行效率。如果你有讀過數(shù)學(xué)家華羅庚的《統(tǒng)籌方法》,一定不會對其中所舉的例子感到陌生:同樣是沏茶的步驟,因為燒水需要一段時間,你不用等水煮沸了過后才來洗茶杯、倒茶葉(類似“同步”),而是在等待燒水的過程中就把茶杯洗好,把茶葉倒好,等水燒開了就能直接泡茶喝了,這里燒水、洗茶杯、倒茶葉三個任務(wù)是在同一個時間段內(nèi)并發(fā)完成的,這就是一種典型的“異步”。對我們網(wǎng)工來說,paramiko, netmiko, telnetlib, pexpect, ciscolib等第三方模塊默認(rèn)都是基于同步的,基于異步的模塊有asyncio, asyncping, netdev等等(pexpect也支持異步,但是必須手動調(diào),默認(rèn)狀態(tài)下是同步)。

2. 線程(Thread) VS 進(jìn)程(Process)

所謂線程是指操作系統(tǒng)能夠進(jìn)行運算調(diào)度的最小單位。線程依托于進(jìn)程存在,是進(jìn)程中的實際運作單位,一個進(jìn)程可以有多個線程,每條線程可以并發(fā)執(zhí)行不同的任務(wù)。

3. 單線程(Single Threaded) VS 多線程 (Multi Threaded)

我們也可以引用同樣的例子來說明單線程和多線程的區(qū)別。在上面講到的華羅庚《統(tǒng)籌方法》里沏茶的這個例子中,如果只有一個人來完成燒水、洗茶杯、倒茶葉三項任務(wù)的話,因為此時只有一個勞動力,我們就可以把它看成是單線程(同步、異步IO都是基于單線程的)。假設(shè)我們能找來三個人分別負(fù)責(zé)燒水、洗茶杯、倒茶葉,那我們就可以把它看成是多線程,每一個勞動力代表一個線程,但是由于多線程的Global Interpreter Lock機(jī)制(俗稱的GIL全局鎖)的存在,實際上這三個勞動力并不是同時開工的,從并發(fā)的性能和效率的角度來看,多線程實際上是弱于基于單線程的異步IO的,這點我們已經(jīng)在之前的兩篇文章里通過實驗驗證了。

講到單線程和多線程,還需要講下異步IO和多線程之間的區(qū)別:

  • 異步IO是單線程,而多線程顧名思義就是多線程。
  • 異步IO和多線程的區(qū)別在于它們的機(jī)制不一樣,多線程使用的是搶占式多任務(wù)處理(Pre-emptive Multitasking) 。在這種搶占式環(huán)境下,操作系統(tǒng)本身具有掌控所有任務(wù)(也就是程序)的能力,能隨心所欲地剝奪每個任務(wù)的時間片來提供給其他任務(wù),也就是有一個幕后大boss掌控一切。而異步IO的機(jī)制為協(xié)作式多任務(wù)處理(Cooperative Multitasking), 這種機(jī)制沒有幕后大boss,在協(xié)作式環(huán)境下,每個任務(wù)被調(diào)度的前提是當(dāng)前任務(wù)主動放棄時間片。
  • 異步IO的核心是協(xié)程(Coroutine),這個是多線程不具備的。協(xié)程是一種輕量級線程,它是一種特殊的生成器函數(shù),它可以在return語句被執(zhí)行前停止該函數(shù)當(dāng)前正在執(zhí)行的任務(wù),并且能在一段時間內(nèi)間接地將執(zhí)行權(quán)交給另外一個協(xié)程函數(shù)。協(xié)程強(qiáng)調(diào)的是合作,而不是多線程強(qiáng)調(diào)的搶占,asyncio是Python中唯一支持協(xié)程的標(biāo)準(zhǔn)庫。

4. 并發(fā)(Concurrent) VS 并行 (Parallesim)

并發(fā)是一個籠統(tǒng)的概念,在Python里,在邏輯上同時發(fā)生的任務(wù)有多種稱謂:多線程,異步IO(多任務(wù)),多進(jìn)程,它們都是并發(fā)的一種。深入地說,只有調(diào)用多核CPU的多進(jìn)程(Multiprocessing)是用來處理在物理上同時發(fā)生的任務(wù)的,這個叫并行。基于單核CPU的多線程和異步IO(多任務(wù))同一時間內(nèi)只能處理一件事件(但是它們有自己獨特的機(jī)制來加快處理不同事件的能力),這個叫做并發(fā)。

借用某知乎網(wǎng)友舉的例子來說明同步、并發(fā)、并行三者之間的區(qū)別。

當(dāng)你吃飯的時候突然有人給你打電話,如果此時你:

  • 不接聽電話,繼續(xù)吃飯,等把飯吃完過后再來回電話,這個叫做同步。
  • 接聽電話后放下筷子停止進(jìn)食,等通話完畢后再接著吃,這個叫做并發(fā)。
  • 接聽電話的同時繼續(xù)進(jìn)食,這個叫做并行。

綜上,并行是并發(fā)的一種,但是并發(fā)并不等于并行。

5. I/O密集型(I/O bound) VS CPU密集型(CPU bound)

I/O密集型(I/O bound) 是指不會特別消耗 CPU 資源,但是I/O比較頻繁的任務(wù)和操作,比如文件的讀寫、網(wǎng)絡(luò)通信、數(shù)據(jù)庫訪問等等。

CPU密集型(CPU bound)是指需要大量耗費CPU資源的任務(wù)和操作,比如計算、解壓縮、加密解密等等。

異步和多線程適合I/O密集型場景, 多進(jìn)程適合CPU密集型場景。

上述內(nèi)容可以歸納總結(jié)成下表:

  • 并發(fā)類型切換機(jī)制CPU數(shù)量適用場景代表Python庫多線程(搶占式多任務(wù)處理)操作系統(tǒng)決定何時切換任務(wù)1個I/O密集型_thread(已淘汰), threading,
  • cocurrent.futures, nornir異步(協(xié)作式多任務(wù)處理)任務(wù)本身決定何時切換1個I/O密集型asyncio, netdev, aiohttp, aioping, gevent,
  • tornado, twisted多進(jìn)程 (并行)所有任務(wù)同時運行多個CPU密集型multiprocessing

好了,說了那么多下面進(jìn)入本篇正文:concurrent.futures。

什么是Concurrent.futures

Concurrent.futures是Python中的一個標(biāo)準(zhǔn)庫,顧名思義它是并發(fā)編程的一種,根據(jù)Python官方的定義,concurrent.futures是一種高級接口,它同時融合了多線程和多進(jìn)程的特點,并將兩者簡化。Concurrent.futures從Python3.2中被引入,它的誕生時間晚于threading和multiprocessing兩個標(biāo)準(zhǔn)庫,但是早于誕生于Python3.4的asyncio標(biāo)準(zhǔn)庫。

Future對象

在concurrent.futures中引入了future這個對象,關(guān)于future的中文翻譯目前為止我聽說過未來、期程等,但還沒有一個統(tǒng)一的說法(Python中文官方文檔上也沒有說明),所以這里我們還是用future來講。

主線程(或進(jìn)程)可以通過future對象獲取某一個線程(進(jìn)程)執(zhí)行的狀態(tài)或者某一個任務(wù)執(zhí)行的狀態(tài)及返回值。

執(zhí)行器對象

Concurrent.futures中還有一個重要的對象叫做執(zhí)行器(Executor),分為ThreadPoolExecutor和ProcessPoolExecutor兩種,你基本可以把它倆看成是multiprocessing庫中的線程池和進(jìn)程池(支持多進(jìn)程的multiprocessing標(biāo)準(zhǔn)庫以前沒講過,我準(zhǔn)備下篇文章中再講),前面提到了,concurrent.futures相較于multiprocessing以及threading兩個庫來說它的優(yōu)勢在于其語法更簡單,學(xué)習(xí)成本更低。

理論的東西先講到這里,接下來直接做實驗說明concurrent.futures怎么用,為了做對比,我會用單線程同步、threading、concurrent.futures分別舉三個例子。首先來看最原始的單線程同步:

1. 單線程同步實驗:

 

  1. import time 
  2.  
  3. def do_something(): 
  4.     print ('休眠1秒'
  5.     time.sleep(1) 
  6.  
  7. start_time = time.perf_counter() 
  8. do_something() 
  9. do_something() 
  10. end_time = time.perf_counter()-start_time 
  11.  
  12. print (f'總共耗時{round(end_time, 2)}秒'

這里我們自定義一個叫做do_something()的函數(shù),它的任務(wù)很簡單,就是打印出內(nèi)容“休眠1秒”,然后使用time.sleep(1)來讓程序休眠1秒。然后我們調(diào)用兩次do_something()函數(shù),打印出耗時,因為是單線程同步,所以兩次執(zhí)行do_something()的總耗時為2.01秒。

 

網(wǎng)工的Python之路:Concurrent.Futures

2. Threading實驗

 

  1. import threading 
  2. import time 
  3.  
  4. def do_something(): 
  5.     time.sleep(1) 
  6.  
  7. start_time = time.perf_counter() 
  8.  
  9. threads = [] 
  10. for i in range(1,11): 
  11.     t = threading.Thread(target=do_something, name=f'線程{str(i)}'
  12.     print (f'{t.name}開始運行'
  13.     print ('休眠1秒'
  14.     t.start() 
  15.     threads.append(t) 
  16. for thread in threads: 
  17.     thread.join() 
  18.  
  19. end_time = time.perf_counter()-start_time 
  20.  
  21. print (f'總共耗時{round(end_time, 2)}秒'

這里我們用threading來總共執(zhí)行10次do_something(),如果按單線程同步的方法的話,總計會耗費10秒+才能完成,而通過threading模塊我們使用多線程讓這10次do_something()并發(fā)執(zhí)行,所以僅僅只用到了1.05秒便宣告完成。

 

網(wǎng)工的Python之路:Concurrent.Futures

3. Concurrent.futures實驗(分為三種代碼)

因為涉及到不同的知識點,Concurrent.futures實驗的代碼我將分三種來寫,首先來看第一段代碼:

 

  1. from concurrent.futures import ThreadPoolExecutor 
  2. import time 
  3.  
  4. def do_something(seconds): 
  5.     print (f'休眠{seconds}秒'
  6.     time.sleep(seconds) 
  7.     return '休眠完畢' 
  8.  
  9. start_time = time.perf_counter() 
  10.  
  11. executor = ThreadPoolExecutor() 
  12. f1 = executor.submit(do_something, 1)  
  13. f2 = executor.submit(do_something, 1) 
  14. print (f1.result())  
  15. print (f2.result())  
  16. print (f'task1是否完成: {f1.done()}'
  17. print (f'task2是否完成: {f1.done()}'
  18.  
  19. end_time = time.perf_counter()-start_time 
  20.  
  21. print (f'總共耗時{round(end_time,2)}秒'

代碼講解(只講和concurrent.futures有關(guān)的知識點):

這里我們使用from concurrent.futures import ThreadPoolExecutor來調(diào)用concurrent.futures的線程池處理器對象

  1. from concurrent.futures import ThreadPoolExecutor 

這里注意我們在do_something()函數(shù)后面加了參數(shù)seconds,并在最后面加了一個return '休眠完畢',它們的作用等會兒會講到:

 

  1. def do_something(seconds): 
  2.     print (f'休眠{seconds}秒'
  3.     time.sleep(seconds) 
  4.     return '休眠完畢' 

在concurent.futures中,ThreadPoolExecutor是Executor下面的兩個子類之一(另一個是ProcessPoolExecutor),它使用線程池來執(zhí)行異步調(diào)用,這里我們將ThreadPoolExecutor()賦值給一個叫做executor的變量。

  1. executor = ThreadPoolExecutor() 

然后我們使用ThreadPoolExecutor下面的submit()函數(shù)來創(chuàng)建線程,submit()函數(shù)中包含了要調(diào)用的任務(wù),即do_something(),以及該函數(shù)要調(diào)用的參數(shù)(也就是dosmeting()里面的seconds),這里我們放1,表示休眠一秒鐘,所以寫成submit(do_something, 1),因為submit()函數(shù)返回的值為future類型的對象,所以這里我們把future簡寫為f, 分別賦值給f1和f2兩個變量,表示并發(fā)執(zhí)行兩次do_something()函數(shù)。

 

  1. f1 = executor.submit(do_something, 1) 
  2. f2 = executor.submit(do_something, 1) 

前面講到了,future對象的作用是幫助主線程(或進(jìn)程)獲取某一個線程(進(jìn)程)執(zhí)行的狀態(tài)或者某一個任務(wù)執(zhí)行的狀態(tài)及返回值,為了向大家演示,這里我對f1和f2兩個future對象分別調(diào)用了result()和done()兩個函數(shù)并將它們的結(jié)果打印出來。

 

  1. print (f1.result())  
  2. print (f2.result())  
  3. print (f'task1是否完成: {f1.done()}'
  4. print (f'task2是否完成: {f1.done()}'

在future中,result()的作用是告知你任務(wù)走到了哪一步,是否有異常,如果任務(wù)沒有異常正常完成的話,那么result()會返回自定義函數(shù)下面return的內(nèi)容(也就是我們do_someting()最下面的return'休眠完畢'),如果任務(wù)執(zhí)行過程中遇到異常 ,那么result()則會返回異常的具體內(nèi)容。 done()則返回一個布爾值,來告訴你任務(wù)是否完成,如果完成,則返回True,反之則返回False。

接下來看腳本運行效果:

 

網(wǎng)工的Python之路:Concurrent.Futures

可以看到同步需要2秒+完成的兩次任務(wù)通過concurrent.futures縮短為1.02秒完成(這個時間不定,如果你多跑腳本幾次,你會看到1.01秒,1.02秒,1.03秒,1.04秒等幾種,這個和當(dāng)前電腦的性能有關(guān)系)。注意這里的兩個“休眠完畢”是print (f1.result()) 和print (f2.result())打印出來的, “task1是否完成: True”和“task2是否完成: True”是 print (f'task1是否完成: {f1.done()}')和print (f'task2是否完成: {f1.done()}')打印出來的。

接下來我們再看concurrent.futures的第二段實驗代碼:

 

  1. from concurrent.futures import ThreadPoolExecutor, as_completed 
  2. import time 
  3.  
  4. def do_something(seconds): 
  5.     print (f'休眠{seconds}秒'
  6.     time.sleep(seconds) 
  7.     return '休眠完畢' 
  8.  
  9. start_time = time.perf_counter() 
  10.  
  11. executor=ThreadPoolExecutor() 
  12. results = [executor.submit(do_something, 1) for i in range(10)] 
  13. for f in as_completed(results): 
  14.     print (f.result()) 
  15.  
  16. end_time = time.perf_counter()-start_time 
  17.  
  18. print (f'總共耗時{round(end_time,2)}秒'

代碼講解(只講和concurrent.futures有關(guān)的知識點):

這里我們從concurrent.futures中新導(dǎo)入了一個函數(shù)叫做as_completed,它的作用后面會講到。

  1. from concurrent.futures import ThreadPoolExecutor, as_completed 

第一段代碼缺乏靈活性,因為我們是通過手動的方式創(chuàng)建了f1和f2兩個線程,如果我們要并發(fā)運行do_something()這個任務(wù)100次,顯然我們不可能去手動創(chuàng)建f1, f2, f3......f100這100個變量。這里我們可以用list comprehension的方式創(chuàng)建一個列表,讓do_something()這個函數(shù)并發(fā)運行10次。

  1. results = [executor.submit(do_something, 1) for i in range(10)] 

在concurrent.futures中,as_completed(fs)函數(shù)的作用是針對給定的 future 迭代器 fs,在其完成后,返回完成后的迭代器(類型仍然為future)。這里的fs即為我們創(chuàng)建的列表results。因為concurrent.futures.as_completed(results)返回的值是迭代器,因此我們可以使用for循環(huán)來遍歷它,然后對其中的元素(均為future類型)調(diào)用前面講到的result()函數(shù)并打印

 

  1. for f in as_completed(results): 
  2.     print (f.result()) 

執(zhí)行代碼看效果,可以看到10次do_something()任務(wù)1.06秒便完成了。

 

網(wǎng)工的Python之路:Concurrent.Futures

concurrent.futures的第三段實驗代碼:

 

  1. from concurrent.futures import ThreadPoolExecutor 
  2. import time 
  3.  
  4. def do_something(seconds): 
  5.     print (f'休眠{seconds}秒'
  6.     time.sleep(seconds) 
  7.     return '休眠完畢' 
  8.  
  9. start_time = time.perf_counter() 
  10.  
  11. executor=ThreadPoolExecutor() 
  12. sec = [5,4,3,2,1] 
  13. results = executor.map(do_something, sec) 
  14. for result in results: 
  15.  print (result) 
  16.  
  17. end_time = time.perf_counter()-start_time 
  18.  
  19. print (f'總共耗時{round(end_time,2)}秒'

代碼講解(只講和concurrent.futures有關(guān)的知識點):

  • 除了通過list comprehension來指定N次并發(fā)運行do_something(seconds)外,我們還可以通過concurrent.futures.ThreadPoolExecutor()下面的map()函數(shù)來達(dá)到目的,map()函數(shù)和submit()函數(shù)的用法類似,都可以用來創(chuàng)建線程,然后并發(fā)執(zhí)行任務(wù)并返回future對象,但是它比submit()函數(shù)更靈活。它們的區(qū)別是:map()函數(shù)傳入的第二個參數(shù)為一個可遍歷的對象,這個可遍歷的對象里的元素可以用來作為函數(shù)的參數(shù)。比如說這里我們定義了sec = [5,4,3,2,1]這個列表,該列表作為map()函數(shù)的第二個參數(shù)被傳入(executor.map(do_something, sec)),因為該列表總共有5個元素,因此我們這里創(chuàng)建并且并發(fā)了5個線程來分5次執(zhí)行do_something(seconds),第一次列表中的元素5作為參數(shù)被傳入do_something(seconds), 也就是第一個線程執(zhí)行后將休眠5秒,第二次列表中的元素4作為參數(shù)被傳入do_something(seconds), 也就是第二個線程執(zhí)行后將休眠4秒,以此類推。

 

  1. executor=ThreadPoolExecutor() 
  2. sec = [5,4,3,2,1] 
  3. results = executor.map(do_something, sec) 

接下來看腳本運行效果:因為5次任務(wù)是并發(fā)執(zhí)行的,所以程序消耗了5秒,4秒,3秒,2秒,1秒中的最大值,總共耗時5.03秒完成。

 

網(wǎng)工的Python之路:Concurrent.Futures

 

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2024-01-17 12:44:23

Python并發(fā)編程

2024-12-24 08:03:56

2011-03-15 09:10:48

Concurrent

2021-11-22 12:35:14

NATIPv4網(wǎng)絡(luò)

2023-10-30 23:25:48

FuturesGo語言

2010-02-22 10:32:00

技術(shù)人員求職

2017-08-17 11:36:10

SDN運營商數(shù)據(jù)中心

2009-05-22 09:57:47

2017-01-17 15:30:32

網(wǎng)絡(luò)工程師SD-WAN網(wǎng)絡(luò)

2020-03-30 17:09:12

網(wǎng)絡(luò)SDN云計算

2009-01-05 15:55:00

軟考網(wǎng)工負(fù)載均衡

2009-03-06 10:28:30

MVCASP.NET異步Action

2020-11-12 13:05:28

航天科工管理建設(shè)

2025-01-22 12:45:00

WindowsCMD網(wǎng)絡(luò)操作命令

2009-01-05 15:55:00

軟考網(wǎng)工負(fù)載均衡

2010-02-23 22:04:06

2023-08-15 09:39:15

2015-12-03 15:00:18

寬帶集群通信共網(wǎng)發(fā)展

2010-08-02 17:30:06

網(wǎng)管美信MXsoft

2016-05-19 11:16:01

點贊
收藏

51CTO技術(shù)棧公眾號