自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

理解Python asyncio內(nèi)部實現(xiàn)機(jī)制

開發(fā) 后端
本文需要提前了解 Python 的 yeild from 語法,不了解的話,可以看看 之前關(guān)于 Generator 的文章 ;另外,最好對 future/promise 的概念有一定了解。文中不會介紹如何使用 asyncio 及協(xié)程,并且文中給出的代碼不一定能實際運行(不然代碼量太大)。

協(xié)程 (coroutine) 幾乎是 Python 里最為復(fù)雜的特性之一了,這篇文章我們來說一說 asyncio 的內(nèi)部實現(xiàn)機(jī)制,借此來理解一門語言要支持協(xié)程需要做的工作。

本文需要提前了解 Python 的 yeild from 語法,不了解的話,可以看看 之前關(guān)于 Generator 的文章 ;另外,***對 future/promise 的概念有一定了解。文中不會介紹如何使用 asyncio 及協(xié)程,并且文中給出的代碼不一定能實際運行(不然代碼量太大)。

多線程與協(xié)程

CPU 的執(zhí)行是順序的,線程是操作系統(tǒng)提供的一種機(jī)制,允許我們在操作系統(tǒng)的層面上實現(xiàn)“并行”。而協(xié)程則可以認(rèn)為是應(yīng)用程序提供的一種機(jī)制(用戶或庫來完成),允許我們在應(yīng)用程序的層面上實現(xiàn)“并行”。

由于本質(zhì)上程序是順序執(zhí)行的,要實現(xiàn)這種“并行”的假像,我們需要一種機(jī)制,來“暫停”當(dāng)前的執(zhí)行流,并在之后“恢復(fù)”之前的執(zhí)行流。這在操作系統(tǒng)及多線程/多進(jìn)程中稱為“上下文切換” (context switch)。其中“上下文”記錄了某個線程執(zhí)行的狀態(tài),包括線程里用到的各個變量,線程的調(diào)用棧等。而“切換”指的就是保存某個線程當(dāng)前的運行狀態(tài),之后再從之前的狀態(tài)中恢復(fù)。只不過線程相關(guān)的工作是由操作系統(tǒng)完成,而協(xié)程則是由應(yīng)用程序自己來完成。

與線程不同的時,協(xié)程完成的功能通常較小,所以會有需求將不同的協(xié)程串起來,我們暫時稱它為協(xié)程鏈 (coroutine chain)。

那么,與線程類似,要實現(xiàn)一個協(xié)程的庫,我們需要這幾樣?xùn)|西:

  1. 事件循環(huán) (event loop)。一方面,它類似于 CPU ,順序執(zhí)行協(xié)程的代碼;另一方面,它相當(dāng)于操作系統(tǒng),完成協(xié)程的調(diào)度,即一個協(xié)程“暫停”時,決定接下來執(zhí)行哪個協(xié)程。
  2. 上下文的表示。在 Python 中,我們使用 Python 本身支持的生成器 Generator 來代表基本的上下文,但協(xié)程鏈?zhǔn)侨绾喂ぷ鞯哪?
  3. 上下文的切換。最基礎(chǔ)的切換也是通過 Python 生成器的 yeild 加強(qiáng)版語法來完成的,但我們還要考慮協(xié)程鏈的情況。

Event Loop

首先,因為協(xié)程是一種能暫停的函數(shù),那么它暫停是為了什么?一般是等待某個事件,比如說某個連接建立了;某個 socket 接收到數(shù)據(jù)了;某個計時器歸零了等。而這些事件應(yīng)用程序只能通過輪詢的方式得知是否完成,但是操作系統(tǒng)(所有現(xiàn)代的操作系統(tǒng))可以提供一些中斷的方式通知應(yīng)用程序,如 select , epoll , kqueue 等等。

那么有了操作系統(tǒng)的支持,我們就可以手寫這樣的循環(huán)(偽代碼):

 

  1. while True 
  2.  happend = poll_events(events_to_listen, timeout) 
  3.  process_events(happend) 

***個問題是:如何注冊我們想監(jiān)聽的事件?很簡單,把事件加到 events_to_listen 里就可以了。第二個問題,可以監(jiān)聽什么事件?由于 process_events 需要操作系統(tǒng)的支持,那么我們想監(jiān)聽的事件是需要操作系統(tǒng)支持才行的,一般操作系統(tǒng)支持網(wǎng)絡(luò) I/O 的文件描述符 (file descriptor)。

接下來,當(dāng)事件發(fā)生時,我們要指定做一些事,一般稱為回調(diào) (callback)。也就是說我們需要告訴 event loop 一個 事件:回調(diào) 的對應(yīng)關(guān)系?,F(xiàn)在我們把 event loop 用類表示:

 

  1. class EventLoop: 
  2.  def __init__(self): 
  3.  self.events_to_listen = [] 
  4.  self.callbacks = {} 
  5.  self.timeout = None 
  6.  
  7.  def register_event(self, event, callback): 
  8.  self.events_to_listen.append(event) 
  9.  self.callbacks[event] = callback 
  10.  
  11.  def unregister_event(self, event): 
  12.  self.events_to_listen.remove(evenrt) 
  13.  del self.callbacks[event] 
  14.  
  15.  def _process_events(self, events): 
  16.  for event in events: 
  17.  self.callbacks[event](event) 
  18.  
  19.  def start_loop(self): 
  20.  while True
  21.  events_happend = poll_events(self.events_to_listen, timeout) 
  22.  self._process_events(events_happend) 
  23.  
  24. loop = EventLoop() 
  25. loop.register_event(fd, callback) 
  26. loop.start_loop() 

register_event 用到注冊 事件: 回調(diào) 的關(guān)系, start_loop 用于開啟事件循環(huán)。

現(xiàn)在,你不是想說,之前提到過事件也包括“某個計時器歸零了”,但 poll_events 只支持網(wǎng)絡(luò) I/O 的文件描述符,計時器又要如何實現(xiàn)呢?一般 poll_events 函數(shù)是支持 timeout 參數(shù)表示等待的時間。因此,可以修改 start_loop :

 

  1. def call_later(self, delay, callback): 
  2.  self.call_at(now() + delay, callback) 
  3.  
  4. def call_at(self, when, callback): 
  5.  self.timeout_callbacks[when] = callback 
  6.  
  7. def start_loop(self): 
  8.  while True
  9.  timeout = min(self.timeout_callbacks.keys()) - now() 
  10.  events_happend = poll_events(self.events_to_listen, timeout) 
  11.  if not empty(events_happend): 
  12.  self._process_events(events_happend) 
  13.  self._process_timeout_events() 
  14.  
  15. def _process_timeout_events(self): 
  16.  time_now = now() 
  17.  for time, callback in self.timeout_callbacks.iteritems(): 
  18.  if time < time_now: 
  19.  callback() 
  20.  del self.timeout_callbacks[time

這里 poll_events 之前,會去計算所有計時器事件最少需要等待的時間,這個時間內(nèi)即使沒有事件發(fā)生, poll_events 也會退出,以便觸發(fā)計時器事件。 _process_timeout_events 函數(shù)的作用是對比當(dāng)前時間與計時器的目標(biāo)執(zhí)行時間,如果目標(biāo)執(zhí)行時間已經(jīng)到達(dá),則執(zhí)行相應(yīng)的回調(diào)函數(shù)。

于是一個簡單的 event loop 就完成了??梢钥吹剑钱惒讲僮鞯幕A(chǔ):允許等待某個事件的發(fā)生并執(zhí)行相應(yīng)的操作。同時,它還是個簡單的調(diào)度器,能順序地執(zhí)行發(fā)生事件的回調(diào)函數(shù)。

Callback vs Promise vs await

好了,現(xiàn)在我們有了 event loop ,它允許我們?yōu)槭录曰卣{(diào)函數(shù)。現(xiàn)在假設(shè)我們要順序調(diào)用幾個 API, 用阻塞式編程如下:

 

  1. result1 = api1() 
  2. result2 = api2(result1) 
  3. result3 = api3(result2) 
  4. ... 

如果這幾個 API 都是異步的,用 event loop + callback 怎么實現(xiàn)?

 

  1. # Implementation for api 
  2. def api1(callback): 
  3.  def callback_for_api1(): 
  4.  result1 = some_calculation_1() 
  5.  event_loop.unregister_event(event1) 
  6.  return callback(result1) 
  7.  event_loop.register_event(event1, callback_for_api1) 
  8.  
  9. def api2(result, callback): 
  10.  def callback_for_api2(): 
  11.  result2 = some_calculation_2(result) 
  12.  event_loop.unregister_event(event2) 
  13.  return callback(result2) 
  14.  event_loop.register_event(event2, callback_for_api2) 
  15. ... 
  16.  
  17. # Our code 
  18. global result 
  19. def api1_callback(result1): 
  20.  def api2_callback(result2): 
  21.  def api3_callback(result3): 
  22.  global result 
  23.  result = some_calculation(result3) 
  24.  return api3(result2, api3_callback) 
  25.  return api2(result1, api2_callback) 
  26. api1(api1_callback) 

這里 api1 api2 的實現(xiàn)由于需要用 event loop 來注冊注銷某些事件,所以顯得特別復(fù)雜,這里我們可以先忽略它們的實現(xiàn),但是看***一段“用戶代碼”是不是極其復(fù)雜?隨著操作的復(fù)雜性增加,回調(diào)函數(shù)的嵌套會越變越深。如果你熟悉Javascript,你應(yīng)該聽過“callback hell”的大名?;卣{(diào)函數(shù)的方式為什么不好?最重要的就是它違反了我們寫代碼的直覺,我們都習(xí)慣順序執(zhí)行的代碼。

例如上例中,我們期待的是 api1 先執(zhí)行,我們再用它的結(jié)果做點什么,但采用回調(diào)的方式,我們就需要在寫 api1 的回調(diào)時,就去思考我們想用它的結(jié)果做些什么操作。在這個例子里,我們需要調(diào)用 api2 及 api3 ,這些嵌套的思考又得一遍遍重復(fù)下去。最終代碼非常難以理解。

因此 Javascript 提出了 Promise ,所謂的 promise 像是一個占位符,它表示一個運算現(xiàn)在還未完成,但我保證它會做完的;你可以指定它完成的時候做些其它的事。下面我們嘗試用這個思路去做一些改進(jìn)(Python 沒有原生的 promise 支持):

 

  1. class Promise(): 
  2.  def __init__(self): 
  3.  pass 
  4.  def then(self, callback_that_return_promise): 
  5.  self._then = callback_that_return_promise 
  6.  def set_result(self, result): 
  7.  return self._then(result) 
  8.  
  9. # Implementation for api 
  10. def api1(): 
  11.  promise = Promise() 
  12.  def callback_for_api1(): 
  13.  promise.set_result(some_calculation_1()) 
  14.  event_loop.unregister_event(event1) 
  15.  event_loop.register_event(event1, callback_for_api1) 
  16.  return promise 
  17.  
  18. def api2(result): 
  19.  promise = Promise() 
  20.  def callback_for_api2(): 
  21.  promise.set_result((some_calculation_2(result)) 
  22.  event_loop.unregister_event(event2) 
  23.  return callback(result2) 
  24.  return promise 
  25. ... 
  26.  
  27. # Our code 
  28. global result 
  29. promise = api1().then(lambda result1: return api2(result1)) 
  30.  .then(lambda result2: return api3(result3)) 
  31.  .then(lambda result3: global result; result = result3) 
  32.  
  33. promise.wait_till_complete() 

這里我們簡單實現(xiàn)了一個我們自己的 Promise 類,當(dāng)它的 set_result 方法被調(diào)用時,Promise 會去執(zhí)行之前用 .then 注冊的回調(diào)函數(shù),該回調(diào)函數(shù)將執(zhí)行另一些操作并返回一個新的 Promise。也因此,我們可以不斷地調(diào)用 then 將不同的 Promise 組合起來??梢钥吹?,現(xiàn)在我們的代碼就是線性的了!

然而故事還沒有結(jié)束,人們依舊不滿于 Promise 的寫法和用法,又提出了 async/await 的寫法。在 Python 中,上面的代碼用 async/await 重寫如下:

 

  1. result1 = await api1() 
  2. result2 = await api2(result1) 
  3. result3 = await api3(result2) 

 

是不是簡單明了?它的效果和我們前幾個例子是等價的,但它的寫法與我們初開始的阻塞版本幾乎一致。這樣能把異步與同步的編碼在結(jié)構(gòu)上盡量統(tǒng)一起來。

這里我不禁想問,為什么大家沒有一開始就想到 async/await 的方式呢?我的一個假設(shè)是 async/await 是需要語言本身的支持的,而寫編譯器/解釋器的專家不一定有編寫應(yīng)用的豐富經(jīng)驗,是很可能從一開始就拒絕這樣的修改的。因此程序員們只能自己用庫的形式添加支持了。當(dāng)然這純粹是猜測,只想感嘆下不同領(lǐng)域的隔閡。

總而言之,有了 event loop 我們就能通過回調(diào)函數(shù)來完成異步編程,但這種方式非常不友好,因此人們又提出了類似 Promise 的思想,讓我們能順序編寫異步代碼,***通過語言對 async/await 的語法支持,異步與同步代碼的結(jié)構(gòu)就幾乎達(dá)到統(tǒng)一。這種統(tǒng)一有很重要的意義,它使我們能以同步的思維去理解異步的代碼而不受回調(diào)方式的代碼結(jié)構(gòu)的影響。

而這一切都是為了將不同的異步函數(shù)“鏈接”起來,只不過是 async/await 的方式最為方便。對比線程,操作系統(tǒng)是沒有提供方式將不同的線程鏈接起來的,因此這種將不同的協(xié)程鏈接起來的工具是協(xié)程比線程好的一個方面。

上下文切換(恢復(fù)控制流)

前面提到過,如果某個協(xié)程在等待某些資源,我們需要暫停它的執(zhí)行,在 event loop 中注冊這個事件,以便當(dāng)事件發(fā)生的時候,能再次喚醒該協(xié)程的執(zhí)行。

這里舉一個 Python 官方文檔 的例子:

 

  1. import asyncio  
  2. async def compute(x, y): 
  3.  print("Compute %s + %s ..." % (x, y)) 
  4.  await asyncio.sleep(1.0) 
  5.  return x + y  
  6. async def print_sum(x, y): 
  7.  result = await compute(x, y) 
  8.  print("%s + %s = %s" % (x, y, result))  
  9. loop = asyncio.get_event_loop() 
  10. loop.run_until_complete(print_sum(1, 2)) 
  11. loop.close() 

上面的代碼的執(zhí)行流程是:

理解Python asyncio內(nèi)部實現(xiàn)機(jī)制

這里有兩個問題:

  1. 誰向 event loop 注冊了事件(及回調(diào))?
  2. 程序從哪里恢復(fù)執(zhí)行?

程序從 print_sum 開始執(zhí)行,執(zhí)行到 asyncio.sleep 時需要暫停,那么肯定是在 sleep 中向 event loop 注冊了計時器事件。那們問題來了,當(dāng)程序恢復(fù)執(zhí)行時,它應(yīng)該從哪里恢復(fù)呢?

從上面的流程圖中,可以看見它是從 print_sum 開始恢復(fù),但這樣的話, sleep 注冊事件時就需要知道是誰(即 print_sum )調(diào)用了它,這樣才能在 callback 中指定從 print_sum 開始恢復(fù)執(zhí)行!

但如果不是從 print_sum 恢復(fù)執(zhí)行,那么一樣的,從 sleep 恢復(fù)執(zhí)行后, sleep 需要知道接下來返回到什么位置(即 compute 函數(shù)中的 await 位置), asyncio 又是如何做到這點的?

那么事實(代碼實現(xiàn))是怎樣的呢?

當(dāng)我們把一個協(xié)程用 loop.run_until_complete (或其它相似方法)執(zhí)行時, event loop 會把它包裹成一個 Task 。當(dāng)協(xié)程開始執(zhí)行或被喚醒時,Task 的 _step 方法會被調(diào)用, 這里 它會調(diào)用 coro.send(None) 來執(zhí)行/喚醒它包裹著的協(xié)程。

 

  1. if exc is None: 
  2.  # We use the `send` method directly, because coroutines 
  3.  # don't have `__iter__` and `__next__` methods. 
  4.  result = coro.send(None) 
  5. else
  6.  result = coro.throw(exc) 

注意到這里將 coro.send 的結(jié)果賦值給了 result ,那么它會返回什么呢?在我們這個例子中,協(xié)程鏈的最末尾是 asyncio.sleep ,我們看看 它的實現(xiàn) :

 

  1. @coroutine 
  2. def sleep(delay, result=None, *, loop=None): 
  3.  """Coroutine that completes after a given time (in seconds).""" 
  4.  if delay == 0: 
  5.  yield 
  6.  return result 
  7.  
  8.  if loop is None: 
  9.  loop = events.get_event_loop() 
  10.  future = loop.create_future() 
  11.  h = future._loop.call_later(delay, 
  12.  futures._set_result_unless_cancelled, 
  13.  future, result) 
  14.  try: 
  15.  return (yield from future) 
  16.  finally: 
  17.  h.cancel() 

這里它創(chuàng)建了一個 future 并為它注冊了事件( call_later ),最終調(diào)用了 yield from future 返回。它代表什么呢?我們已經(jīng)假設(shè)你明白 yield from 的使用方法,這代表 Python 會首先調(diào)用 future.__iter__ 函數(shù),我們來看看 它長什么樣 :

 

  1. def __iter__(self): 
  2.  if not self.done(): 
  3.  self._asyncio_future_blocking = True 
  4.  yield self # This tells Task to wait for completion. 
  5.  assert self.done(), "yield from wasn't used with future" 
  6.  return self.result() # May raise too. 
  7.  
  8. if compat.PY35: 
  9.  __await__ = __iter__ # make compatible with 'await' expression 

注意這里的 yield self !也就是說 future 在***次執(zhí)行到這里時,會暫停執(zhí)行并返回它自己,由于 coroutine 中使用的都是 yield from/await (它們在接收的參數(shù)上有區(qū)別,但在本文的討論中沒有區(qū)別),因此這個值會一直向上傳遞,到 Task._step 函數(shù)的 result = coro.send(None) 這里,那我們來看看 Task 對 result 做了什么,重要的是 這一句 :

  1. result.add_done_callback(self._wakeup) 

也就是說 task( print_sum ) 得到了最內(nèi)層暫停的 sleep 生成的 future 并為該 future 注冊了一個回調(diào),使得在 future.set_result 被調(diào)用時, task._wakeup 會被調(diào)用。這部分的邏輯可以看 這里 。

我們再回過頭來看看 future.set_result 會在什么時候被調(diào)用,在 asyncio.sleep 函數(shù)里,我們?yōu)?event loop 注冊了一個回調(diào)函數(shù):

 

  1. h = future._loop.call_later(delay, 
  2.  futures._set_result_unless_cancelled, 
  3.  future, result) 

那么這個 _set_result_unless_cancelled 是這樣的:

 

  1. def _set_result_unless_cancelled(fut, result): 
  2.  """Helper setting the result only if the future was not cancelled.""" 
  3.  if fut.cancelled(): 
  4.  return 
  5.  fut.set_result(result) 

因此,所有的流程應(yīng)該是這樣的:

理解Python asyncio內(nèi)部實現(xiàn)機(jī)制

小結(jié)

那么 asyncio 做為一個庫,做了什么,沒做什么?

  1. 控制流的暫停與恢復(fù),這是通過 Python 內(nèi)部的 Generator(生成器)相關(guān)的功能實現(xiàn)的。
  2. 協(xié)程鏈,即把不同協(xié)程鏈鏈接在一起的機(jī)制。依舊是通過 Python 的內(nèi)置支持,即 async/await,或者說是生成器的 yield from。
  3. Event Loop,這個是 asyncio 實現(xiàn)的。它決定了我們能對什么事件進(jìn)行異步操作,目前只支持定時器與網(wǎng)絡(luò) IO 的異步。
  4. 協(xié)程鏈的控制流恢復(fù),即內(nèi)部的協(xié)程暫停了,恢復(fù)時卻需要從最外層的協(xié)程開始恢復(fù)。這是 asyncio 實現(xiàn)的內(nèi)容。
  5. 其它的庫支持,這里指的是像 asyncio.sleep() 這種協(xié)程鏈的最內(nèi)層的協(xié)程,因此我們一般不希望自己去調(diào)用 event loop 注冊/注銷事件。

因此,如果沒有 asyncio,我們要實現(xiàn)相應(yīng)的功能,主要的內(nèi)容就是 Event Loop 及控制流的恢復(fù),***再加上一些好用的協(xié)程函數(shù)。

責(zé)任編輯:未麗燕 來源: 三點水
相關(guān)推薦

2010-09-26 16:14:22

JVM實現(xiàn)機(jī)制JVM

2017-09-05 10:20:30

PyTorchTensorPython

2020-10-14 09:11:44

IO 多路復(fù)用實現(xiàn)機(jī)

2023-12-14 10:35:22

虛擬機(jī)程序

2017-02-14 13:08:45

2017-05-05 08:44:24

PythonAsyncio異步編程

2014-03-31 10:51:40

pythonasyncio

2020-02-21 08:00:00

Pythonasyncio編程語言

2017-08-02 15:00:12

PythonAsyncio異步編程

2017-05-24 15:50:08

PythonCPython

2017-05-22 15:42:39

Python字典哈希表

2014-06-13 11:08:52

Redis主鍵失效

2014-06-17 10:27:39

Redis緩存

2013-08-28 10:11:37

RedisRedis主鍵失效NoSQL

2022-06-27 11:04:24

RocketMQ順序消息

2022-07-18 21:53:46

RocketMQ廣播消息

2025-04-07 11:10:00

Python列表開發(fā)

2010-06-02 13:13:40

Cassandra

2010-06-01 16:43:07

Cassandra內(nèi)部

2023-06-07 15:25:19

Kafka版本日志
點贊
收藏

51CTO技術(shù)棧公眾號