理解Python asyncio內(nèi)部實現(xiàn)機(jī)制
協(xié)程 (coroutine) 幾乎是 Python 里最為復(fù)雜的特性之一了,這篇文章我們來說一說 asyncio 的內(nèi)部實現(xiàn)機(jī)制,借此來理解一門語言要支持協(xié)程需要做的工作。
本文需要提前了解 Python 的 yeild from 語法,不了解的話,可以看看 之前關(guān)于 Generator 的文章 ;另外,***對 future/promise 的概念有一定了解。文中不會介紹如何使用 asyncio 及協(xié)程,并且文中給出的代碼不一定能實際運行(不然代碼量太大)。
多線程與協(xié)程
CPU 的執(zhí)行是順序的,線程是操作系統(tǒng)提供的一種機(jī)制,允許我們在操作系統(tǒng)的層面上實現(xiàn)“并行”。而協(xié)程則可以認(rèn)為是應(yīng)用程序提供的一種機(jī)制(用戶或庫來完成),允許我們在應(yīng)用程序的層面上實現(xiàn)“并行”。
由于本質(zhì)上程序是順序執(zhí)行的,要實現(xiàn)這種“并行”的假像,我們需要一種機(jī)制,來“暫停”當(dāng)前的執(zhí)行流,并在之后“恢復(fù)”之前的執(zhí)行流。這在操作系統(tǒng)及多線程/多進(jìn)程中稱為“上下文切換” (context switch)。其中“上下文”記錄了某個線程執(zhí)行的狀態(tài),包括線程里用到的各個變量,線程的調(diào)用棧等。而“切換”指的就是保存某個線程當(dāng)前的運行狀態(tài),之后再從之前的狀態(tài)中恢復(fù)。只不過線程相關(guān)的工作是由操作系統(tǒng)完成,而協(xié)程則是由應(yīng)用程序自己來完成。
與線程不同的時,協(xié)程完成的功能通常較小,所以會有需求將不同的協(xié)程串起來,我們暫時稱它為協(xié)程鏈 (coroutine chain)。
那么,與線程類似,要實現(xiàn)一個協(xié)程的庫,我們需要這幾樣?xùn)|西:
- 事件循環(huán) (event loop)。一方面,它類似于 CPU ,順序執(zhí)行協(xié)程的代碼;另一方面,它相當(dāng)于操作系統(tǒng),完成協(xié)程的調(diào)度,即一個協(xié)程“暫停”時,決定接下來執(zhí)行哪個協(xié)程。
- 上下文的表示。在 Python 中,我們使用 Python 本身支持的生成器 Generator 來代表基本的上下文,但協(xié)程鏈?zhǔn)侨绾喂ぷ鞯哪?
- 上下文的切換。最基礎(chǔ)的切換也是通過 Python 生成器的 yeild 加強(qiáng)版語法來完成的,但我們還要考慮協(xié)程鏈的情況。
Event Loop
首先,因為協(xié)程是一種能暫停的函數(shù),那么它暫停是為了什么?一般是等待某個事件,比如說某個連接建立了;某個 socket 接收到數(shù)據(jù)了;某個計時器歸零了等。而這些事件應(yīng)用程序只能通過輪詢的方式得知是否完成,但是操作系統(tǒng)(所有現(xiàn)代的操作系統(tǒng))可以提供一些中斷的方式通知應(yīng)用程序,如 select , epoll , kqueue 等等。
那么有了操作系統(tǒng)的支持,我們就可以手寫這樣的循環(huán)(偽代碼):
- while True
- happend = poll_events(events_to_listen, timeout)
- process_events(happend)
***個問題是:如何注冊我們想監(jiān)聽的事件?很簡單,把事件加到 events_to_listen 里就可以了。第二個問題,可以監(jiān)聽什么事件?由于 process_events 需要操作系統(tǒng)的支持,那么我們想監(jiān)聽的事件是需要操作系統(tǒng)支持才行的,一般操作系統(tǒng)支持網(wǎng)絡(luò) I/O 的文件描述符 (file descriptor)。
接下來,當(dāng)事件發(fā)生時,我們要指定做一些事,一般稱為回調(diào) (callback)。也就是說我們需要告訴 event loop 一個 事件:回調(diào) 的對應(yīng)關(guān)系?,F(xiàn)在我們把 event loop 用類表示:
- class EventLoop:
- def __init__(self):
- self.events_to_listen = []
- self.callbacks = {}
- self.timeout = None
- def register_event(self, event, callback):
- self.events_to_listen.append(event)
- self.callbacks[event] = callback
- def unregister_event(self, event):
- self.events_to_listen.remove(evenrt)
- del self.callbacks[event]
- def _process_events(self, events):
- for event in events:
- self.callbacks[event](event)
- def start_loop(self):
- while True:
- events_happend = poll_events(self.events_to_listen, timeout)
- self._process_events(events_happend)
- loop = EventLoop()
- loop.register_event(fd, callback)
- loop.start_loop()
register_event 用到注冊 事件: 回調(diào) 的關(guān)系, start_loop 用于開啟事件循環(huán)。
現(xiàn)在,你不是想說,之前提到過事件也包括“某個計時器歸零了”,但 poll_events 只支持網(wǎng)絡(luò) I/O 的文件描述符,計時器又要如何實現(xiàn)呢?一般 poll_events 函數(shù)是支持 timeout 參數(shù)表示等待的時間。因此,可以修改 start_loop :
- def call_later(self, delay, callback):
- self.call_at(now() + delay, callback)
- def call_at(self, when, callback):
- self.timeout_callbacks[when] = callback
- def start_loop(self):
- while True:
- timeout = min(self.timeout_callbacks.keys()) - now()
- events_happend = poll_events(self.events_to_listen, timeout)
- if not empty(events_happend):
- self._process_events(events_happend)
- self._process_timeout_events()
- def _process_timeout_events(self):
- time_now = now()
- for time, callback in self.timeout_callbacks.iteritems():
- if time < time_now:
- callback()
- del self.timeout_callbacks[time]
這里 poll_events 之前,會去計算所有計時器事件最少需要等待的時間,這個時間內(nèi)即使沒有事件發(fā)生, poll_events 也會退出,以便觸發(fā)計時器事件。 _process_timeout_events 函數(shù)的作用是對比當(dāng)前時間與計時器的目標(biāo)執(zhí)行時間,如果目標(biāo)執(zhí)行時間已經(jīng)到達(dá),則執(zhí)行相應(yīng)的回調(diào)函數(shù)。
于是一個簡單的 event loop 就完成了??梢钥吹剑钱惒讲僮鞯幕A(chǔ):允許等待某個事件的發(fā)生并執(zhí)行相應(yīng)的操作。同時,它還是個簡單的調(diào)度器,能順序地執(zhí)行發(fā)生事件的回調(diào)函數(shù)。
Callback vs Promise vs await
好了,現(xiàn)在我們有了 event loop ,它允許我們?yōu)槭录曰卣{(diào)函數(shù)。現(xiàn)在假設(shè)我們要順序調(diào)用幾個 API, 用阻塞式編程如下:
- result1 = api1()
- result2 = api2(result1)
- result3 = api3(result2)
- ...
如果這幾個 API 都是異步的,用 event loop + callback 怎么實現(xiàn)?
- # Implementation for api
- def api1(callback):
- def callback_for_api1():
- result1 = some_calculation_1()
- event_loop.unregister_event(event1)
- return callback(result1)
- event_loop.register_event(event1, callback_for_api1)
- def api2(result, callback):
- def callback_for_api2():
- result2 = some_calculation_2(result)
- event_loop.unregister_event(event2)
- return callback(result2)
- event_loop.register_event(event2, callback_for_api2)
- ...
- # Our code
- global result
- def api1_callback(result1):
- def api2_callback(result2):
- def api3_callback(result3):
- global result
- result = some_calculation(result3)
- return api3(result2, api3_callback)
- return api2(result1, api2_callback)
- api1(api1_callback)
這里 api1 api2 的實現(xiàn)由于需要用 event loop 來注冊注銷某些事件,所以顯得特別復(fù)雜,這里我們可以先忽略它們的實現(xiàn),但是看***一段“用戶代碼”是不是極其復(fù)雜?隨著操作的復(fù)雜性增加,回調(diào)函數(shù)的嵌套會越變越深。如果你熟悉Javascript,你應(yīng)該聽過“callback hell”的大名?;卣{(diào)函數(shù)的方式為什么不好?最重要的就是它違反了我們寫代碼的直覺,我們都習(xí)慣順序執(zhí)行的代碼。
例如上例中,我們期待的是 api1 先執(zhí)行,我們再用它的結(jié)果做點什么,但采用回調(diào)的方式,我們就需要在寫 api1 的回調(diào)時,就去思考我們想用它的結(jié)果做些什么操作。在這個例子里,我們需要調(diào)用 api2 及 api3 ,這些嵌套的思考又得一遍遍重復(fù)下去。最終代碼非常難以理解。
因此 Javascript 提出了 Promise ,所謂的 promise 像是一個占位符,它表示一個運算現(xiàn)在還未完成,但我保證它會做完的;你可以指定它完成的時候做些其它的事。下面我們嘗試用這個思路去做一些改進(jìn)(Python 沒有原生的 promise 支持):
- class Promise():
- def __init__(self):
- pass
- def then(self, callback_that_return_promise):
- self._then = callback_that_return_promise
- def set_result(self, result):
- return self._then(result)
- # Implementation for api
- def api1():
- promise = Promise()
- def callback_for_api1():
- promise.set_result(some_calculation_1())
- event_loop.unregister_event(event1)
- event_loop.register_event(event1, callback_for_api1)
- return promise
- def api2(result):
- promise = Promise()
- def callback_for_api2():
- promise.set_result((some_calculation_2(result))
- event_loop.unregister_event(event2)
- return callback(result2)
- return promise
- ...
- # Our code
- global result
- promise = api1().then(lambda result1: return api2(result1))
- .then(lambda result2: return api3(result3))
- .then(lambda result3: global result; result = result3)
- promise.wait_till_complete()
這里我們簡單實現(xiàn)了一個我們自己的 Promise 類,當(dāng)它的 set_result 方法被調(diào)用時,Promise 會去執(zhí)行之前用 .then 注冊的回調(diào)函數(shù),該回調(diào)函數(shù)將執(zhí)行另一些操作并返回一個新的 Promise。也因此,我們可以不斷地調(diào)用 then 將不同的 Promise 組合起來??梢钥吹?,現(xiàn)在我們的代碼就是線性的了!
然而故事還沒有結(jié)束,人們依舊不滿于 Promise 的寫法和用法,又提出了 async/await 的寫法。在 Python 中,上面的代碼用 async/await 重寫如下:
- result1 = await api1()
- result2 = await api2(result1)
- result3 = await api3(result2)
是不是簡單明了?它的效果和我們前幾個例子是等價的,但它的寫法與我們初開始的阻塞版本幾乎一致。這樣能把異步與同步的編碼在結(jié)構(gòu)上盡量統(tǒng)一起來。
這里我不禁想問,為什么大家沒有一開始就想到 async/await 的方式呢?我的一個假設(shè)是 async/await 是需要語言本身的支持的,而寫編譯器/解釋器的專家不一定有編寫應(yīng)用的豐富經(jīng)驗,是很可能從一開始就拒絕這樣的修改的。因此程序員們只能自己用庫的形式添加支持了。當(dāng)然這純粹是猜測,只想感嘆下不同領(lǐng)域的隔閡。
總而言之,有了 event loop 我們就能通過回調(diào)函數(shù)來完成異步編程,但這種方式非常不友好,因此人們又提出了類似 Promise 的思想,讓我們能順序編寫異步代碼,***通過語言對 async/await 的語法支持,異步與同步代碼的結(jié)構(gòu)就幾乎達(dá)到統(tǒng)一。這種統(tǒng)一有很重要的意義,它使我們能以同步的思維去理解異步的代碼而不受回調(diào)方式的代碼結(jié)構(gòu)的影響。
而這一切都是為了將不同的異步函數(shù)“鏈接”起來,只不過是 async/await 的方式最為方便。對比線程,操作系統(tǒng)是沒有提供方式將不同的線程鏈接起來的,因此這種將不同的協(xié)程鏈接起來的工具是協(xié)程比線程好的一個方面。
上下文切換(恢復(fù)控制流)
前面提到過,如果某個協(xié)程在等待某些資源,我們需要暫停它的執(zhí)行,在 event loop 中注冊這個事件,以便當(dāng)事件發(fā)生的時候,能再次喚醒該協(xié)程的執(zhí)行。
這里舉一個 Python 官方文檔 的例子:
- import asyncio
- async def compute(x, y):
- print("Compute %s + %s ..." % (x, y))
- await asyncio.sleep(1.0)
- return x + y
- async def print_sum(x, y):
- result = await compute(x, y)
- print("%s + %s = %s" % (x, y, result))
- loop = asyncio.get_event_loop()
- loop.run_until_complete(print_sum(1, 2))
- loop.close()
上面的代碼的執(zhí)行流程是:
這里有兩個問題:
- 誰向 event loop 注冊了事件(及回調(diào))?
- 程序從哪里恢復(fù)執(zhí)行?
程序從 print_sum 開始執(zhí)行,執(zhí)行到 asyncio.sleep 時需要暫停,那么肯定是在 sleep 中向 event loop 注冊了計時器事件。那們問題來了,當(dāng)程序恢復(fù)執(zhí)行時,它應(yīng)該從哪里恢復(fù)呢?
從上面的流程圖中,可以看見它是從 print_sum 開始恢復(fù),但這樣的話, sleep 注冊事件時就需要知道是誰(即 print_sum )調(diào)用了它,這樣才能在 callback 中指定從 print_sum 開始恢復(fù)執(zhí)行!
但如果不是從 print_sum 恢復(fù)執(zhí)行,那么一樣的,從 sleep 恢復(fù)執(zhí)行后, sleep 需要知道接下來返回到什么位置(即 compute 函數(shù)中的 await 位置), asyncio 又是如何做到這點的?
那么事實(代碼實現(xiàn))是怎樣的呢?
當(dāng)我們把一個協(xié)程用 loop.run_until_complete (或其它相似方法)執(zhí)行時, event loop 會把它包裹成一個 Task 。當(dāng)協(xié)程開始執(zhí)行或被喚醒時,Task 的 _step 方法會被調(diào)用, 這里 它會調(diào)用 coro.send(None) 來執(zhí)行/喚醒它包裹著的協(xié)程。
- if exc is None:
- # We use the `send` method directly, because coroutines
- # don't have `__iter__` and `__next__` methods.
- result = coro.send(None)
- else:
- result = coro.throw(exc)
注意到這里將 coro.send 的結(jié)果賦值給了 result ,那么它會返回什么呢?在我們這個例子中,協(xié)程鏈的最末尾是 asyncio.sleep ,我們看看 它的實現(xiàn) :
- @coroutine
- def sleep(delay, result=None, *, loop=None):
- """Coroutine that completes after a given time (in seconds)."""
- if delay == 0:
- yield
- return result
- if loop is None:
- loop = events.get_event_loop()
- future = loop.create_future()
- h = future._loop.call_later(delay,
- futures._set_result_unless_cancelled,
- future, result)
- try:
- return (yield from future)
- finally:
- h.cancel()
這里它創(chuàng)建了一個 future 并為它注冊了事件( call_later ),最終調(diào)用了 yield from future 返回。它代表什么呢?我們已經(jīng)假設(shè)你明白 yield from 的使用方法,這代表 Python 會首先調(diào)用 future.__iter__ 函數(shù),我們來看看 它長什么樣 :
- def __iter__(self):
- if not self.done():
- self._asyncio_future_blocking = True
- yield self # This tells Task to wait for completion.
- assert self.done(), "yield from wasn't used with future"
- return self.result() # May raise too.
- if compat.PY35:
- __await__ = __iter__ # make compatible with 'await' expression
注意這里的 yield self !也就是說 future 在***次執(zhí)行到這里時,會暫停執(zhí)行并返回它自己,由于 coroutine 中使用的都是 yield from/await (它們在接收的參數(shù)上有區(qū)別,但在本文的討論中沒有區(qū)別),因此這個值會一直向上傳遞,到 Task._step 函數(shù)的 result = coro.send(None) 這里,那我們來看看 Task 對 result 做了什么,重要的是 這一句 :
- result.add_done_callback(self._wakeup)
也就是說 task( print_sum ) 得到了最內(nèi)層暫停的 sleep 生成的 future 并為該 future 注冊了一個回調(diào),使得在 future.set_result 被調(diào)用時, task._wakeup 會被調(diào)用。這部分的邏輯可以看 這里 。
我們再回過頭來看看 future.set_result 會在什么時候被調(diào)用,在 asyncio.sleep 函數(shù)里,我們?yōu)?event loop 注冊了一個回調(diào)函數(shù):
- h = future._loop.call_later(delay,
- futures._set_result_unless_cancelled,
- future, result)
那么這個 _set_result_unless_cancelled 是這樣的:
- def _set_result_unless_cancelled(fut, result):
- """Helper setting the result only if the future was not cancelled."""
- if fut.cancelled():
- return
- fut.set_result(result)
因此,所有的流程應(yīng)該是這樣的:
小結(jié)
那么 asyncio 做為一個庫,做了什么,沒做什么?
- 控制流的暫停與恢復(fù),這是通過 Python 內(nèi)部的 Generator(生成器)相關(guān)的功能實現(xiàn)的。
- 協(xié)程鏈,即把不同協(xié)程鏈鏈接在一起的機(jī)制。依舊是通過 Python 的內(nèi)置支持,即 async/await,或者說是生成器的 yield from。
- Event Loop,這個是 asyncio 實現(xiàn)的。它決定了我們能對什么事件進(jìn)行異步操作,目前只支持定時器與網(wǎng)絡(luò) IO 的異步。
- 協(xié)程鏈的控制流恢復(fù),即內(nèi)部的協(xié)程暫停了,恢復(fù)時卻需要從最外層的協(xié)程開始恢復(fù)。這是 asyncio 實現(xiàn)的內(nèi)容。
- 其它的庫支持,這里指的是像 asyncio.sleep() 這種協(xié)程鏈的最內(nèi)層的協(xié)程,因此我們一般不希望自己去調(diào)用 event loop 注冊/注銷事件。
因此,如果沒有 asyncio,我們要實現(xiàn)相應(yīng)的功能,主要的內(nèi)容就是 Event Loop 及控制流的恢復(fù),***再加上一些好用的協(xié)程函數(shù)。