如何在Python 的線程中運(yùn)行協(xié)程
在一篇文章理解Python異步編程的基本原理這篇文章中,我們講到,如果在異步代碼里面又包含了一段非常耗時(shí)的同步代碼,異步代碼就會(huì)被卡住。
那么有沒有辦法讓同步代碼與異步代碼看起來也是同時(shí)運(yùn)行的呢?方法就是使用事件循環(huán)的.run_in_executor()方法。
我們來看一下 Python 官方文檔[1]中的說法:
那么怎么使用呢?還是以非常耗時(shí)的遞歸方式計(jì)算斐波那契數(shù)列的這個(gè)函數(shù)為例:
- def sync_calc_fib(n):
- if n in [1, 2]:
- return1
- return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
- async def calc_fib(n):
- result = sync_calc_fib(n)
- print(f'第 {n} 項(xiàng)計(jì)算完成,結(jié)果是:{result}')
- return result
我們現(xiàn)在需要用 aiohttp 訪問一個(gè)延遲5秒的網(wǎng)頁(yè),同時(shí)計(jì)算斐波那契數(shù)列第36項(xiàng)。
首先我們看看單獨(dú)計(jì)算第36項(xiàng)需要5秒鐘:
我們?cè)賮砜纯慈绻苯影堰@計(jì)算斐波那契數(shù)列和請(qǐng)求網(wǎng)站的兩個(gè)異步任務(wù)放在一起“并行”,實(shí)際時(shí)間是兩個(gè)任務(wù)的時(shí)間疊加:
具體原因我在上一篇文章里面已經(jīng)做了說明。
現(xiàn)在,我想讓兩個(gè)任務(wù)“同時(shí)運(yùn)行”,于是就可以這樣修改代碼:
- import aiohttp
- import asyncio
- import time
- from concurrent.futures import ThreadPoolExecutor
- async def request(sleep_time):
- async with aiohttp.ClientSession() as client:
- resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
- resp_json = await resp.json()
- print(resp_json)
- def sync_calc_fib(n):
- if n in [1, 2]:
- return 1
- return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
- def calc_fib(n):
- result = sync_calc_fib(n)
- print(f'第 {n} 項(xiàng)計(jì)算完成,結(jié)果是:{result}')
- return result
- async def main():
- start = time.perf_counter()
- loop = asyncio.get_event_loop()
- with ThreadPoolExecutor(max_workers=4) as executor:
- tasks_list = [
- loop.run_in_executor(executor, calc_fib, 36),
- asyncio.create_task(request(5))
- ]
- await asyncio.gather(*tasks_list)
- end = time.perf_counter()
- print(f'總計(jì)耗時(shí):{end - start}')
- asyncio.run(main())
運(yùn)行效果如下圖所示:
在5秒鐘的時(shí)間,就把計(jì)算斐波那契數(shù)列和請(qǐng)求5秒延遲的網(wǎng)站都做完了。
實(shí)現(xiàn)這樣的轉(zhuǎn)變,關(guān)鍵的代碼就是:loop.run_in_executor(executor, calc_fib, 36)
其中的 loop就是主線程的事件循環(huán)(event loop),它是用來調(diào)度同一個(gè)線程里面的多個(gè)協(xié)程。
executor是我們使用ThreadPoolExecutor(max_workers=4)創(chuàng)建的一個(gè)有4個(gè)線程的線程池,calc_fib是一個(gè)耗時(shí)的同步函數(shù),36是傳入calc_fib的參數(shù)。loop.run_in_executor(executor, calc_fib, 36)的意思是說:
- 把calc_fib函數(shù)放到線程池里面去運(yùn)行
- 給線程池增加一個(gè)回調(diào)函數(shù),這個(gè)回調(diào)函數(shù)會(huì)在運(yùn)行結(jié)束后的下一次事件循環(huán)把結(jié)果保存下來。
請(qǐng)注意上圖中紅色箭頭對(duì)應(yīng)的calc_fib這是一個(gè)同步函數(shù),請(qǐng)與上一篇文章中的異步函數(shù)區(qū)分開。run_in_executor的第二個(gè)參數(shù)需要是一個(gè)同步函數(shù)的函數(shù)名。
在上面的例子中,我們創(chuàng)建的是有4個(gè)線程的線程池。所以這個(gè)線程池最多允許4個(gè)阻塞式的同步函數(shù)“并行”。