Python協(xié)程知多少
從概念上來說,我們都知道多進(jìn)程和多線程,而協(xié)程其實(shí)是在單線程中實(shí)現(xiàn)多并發(fā)。從句法上看,協(xié)程與生成器類似,都是定義體中包含yield關(guān)鍵字的函數(shù)。區(qū)別在于協(xié)程的yield通常出現(xiàn)在表達(dá)式的右邊:datum = yield。這一下就讓初學(xué)者瞬間覺得yield關(guān)鍵字不香了,本來以為yield就是簡簡單單的暫停執(zhí)行順手返回個(gè)值,結(jié)果還能放右邊?
從生成器到協(xié)程
先看一個(gè)可能是協(xié)程最簡單的使用示例:
- >>> def simple_coroutine():
- ... print("-> coroutine started")
- ... x = yield
- ... print("-> coroutine received:", x)
- ...
- >>> my_coro = simple_coroutine()
- >>> my_coro
- <generator object simple_coroutine at 0x0000019A681F27B0>
- >>> next(my_coro)
- -> coroutine started
- >>> my_coro.send(42)
- -> coroutine received: 42
- Traceback (most recent call last):
- File "<input>", line 1, in <module>
- StopIteration
之所以yield可以放右邊,是因?yàn)閰f(xié)程可以接收調(diào)用方使用.send()推送的值。
yield放在右邊以后,它的右邊還能再放個(gè)表達(dá)式,請看下面這個(gè)例子:
- def simple_coro2(a):
- b = yield a
- c = yield a + b
- my_coro2 = simple_coro2(14)
- next(my_coro2)
- my_coro2.send(28)
- my_coro2.send(99)
執(zhí)行過程是:
- 調(diào)用next(my_coro2),執(zhí)行yield a,產(chǎn)出14。
- 調(diào)用my_coro2.send(28),把28賦值給b,然后執(zhí)行yield a + b,產(chǎn)出42。
- 調(diào)用my_coro2.send(99),把99賦值給c,協(xié)程終止。
由此得出結(jié)論,對于b = yield a這行代碼來說,= 右邊的代碼在賦值之前執(zhí)行。
在示例中,需要先調(diào)用next(my_coro)啟動(dòng)生成器,讓程序在yield語句處暫停,然后才可以發(fā)送數(shù)據(jù)。這是因?yàn)閰f(xié)程有四種狀態(tài):
- 'GEN_CREATED' 等待開始執(zhí)行
- 'GEN_RUNNING' 解釋器正在執(zhí)行
- 'GEN_SUSPENDED' 在yield表達(dá)式處暫停
- 'GEN_CLOSED' 執(zhí)行結(jié)束
只有在GEN_SUSPENDED狀態(tài)才能發(fā)送數(shù)據(jù),提前做的這一步叫做預(yù)激,既可以調(diào)用next(my_coro)預(yù)激,也可以調(diào)用my_coro.send(None)預(yù)激,效果一樣。
預(yù)激協(xié)程
協(xié)程必須預(yù)激才能使用,也就是send前,先調(diào)用next,讓協(xié)程處于GEN_SUSPENDED狀態(tài)。但是這件事經(jīng)常會(huì)忘記。為了避免忘記,可以定義一個(gè)預(yù)激裝飾器,比如:
- from functools import wraps
- def coroutine(func):
- @wraps(func)
- def primer(*args, **kwargs):
- gen = func(*args, **kwargs)
- next(gen)
- return gen
- return primer
但實(shí)際上Python給出了一個(gè)更優(yōu)雅的方式,叫做yield from,它會(huì)自動(dòng)預(yù)激協(xié)程。
自定義預(yù)激裝飾器和yield from是不兼容的。
yield from
yield from相當(dāng)于其他語言中的await關(guān)鍵字,作用是:在生成器gen中使用yield from subgen()時(shí),subgen會(huì)獲得控制權(quán),把產(chǎn)出的值傳給gen的調(diào)用方,即調(diào)用方可以直接控制subgen。與此同時(shí),gen會(huì)阻塞,等待subgen終止。
yield from可以用來簡化for循環(huán)中的yield:
- for c in "AB":
- yield c
- yield from "AB"
yield from x表達(dá)式對x做的第一件事就是,調(diào)用iter(x),從中獲取迭代器。
但yield from的作用遠(yuǎn)不止于此,它更重要的作用是打開雙向通道。如下圖所示:
這個(gè)圖信息量很大,很難理解。
首先要理解這3個(gè)概念:調(diào)用方、委派生成器、子生成器。
- 調(diào)用方
說白了就是main函數(shù),也就是眾所周知的程序入口main函數(shù)。
- # the client code, a.k.a. the caller
- def main(data): # <8>
- results = {}
- for key, values in data.items():
- group = grouper(results, key) # <9>
- next(group) # <10>
- for value in values:
- group.send(value) # <11>
- group.send(None) # important! <12>
- # print(results) # uncomment to debug
- report(results)
- 委派生成器
就是包含了yield from語句的函數(shù),也就是協(xié)程。
- # the delegating generator
- def grouper(results, key): # <5>
- while True: # <6>
- results[key] = yield from averager() # <7>
- 子生成器
就是yield from語句右邊跟著的子協(xié)程。
- # the subgenerator
- def averager(): # <1>
- total = 0.0
- count = 0
- average = None
- while True:
- term = yield # <2>
- if term is None: # <3>
- break
- total += term
- count += 1
- average = total/count
- return Result(count, average) # <4>
這比術(shù)語看著舒服多了。
然后是5條線:send、yield、throw、StopIteration、close。
- send
協(xié)程在yield from表達(dá)式處暫停時(shí),main函數(shù)可以通過yield from表達(dá)式把數(shù)據(jù)發(fā)給yield from語句右邊跟著的子協(xié)程。
- yield
yield from語句右邊跟著的子協(xié)程再把產(chǎn)出的值通過yield from表達(dá)式發(fā)給main函數(shù)。
- throw
main函數(shù)通過group.send(None),傳入一個(gè)None值,讓yield from語句右邊跟著的子協(xié)程的while循環(huán)終止,這樣控制權(quán)才會(huì)交回協(xié)程,才能繼續(xù)執(zhí)行,否則會(huì)一直暫在yield from語句暫停。
- StopIteration
yield from語句右邊跟著的生成器函數(shù)返回之后,解釋器會(huì)拋出StopIteration異常。并把返回值附加到異常對象上,此時(shí)協(xié)程會(huì)恢復(fù)。
- close
main函數(shù)執(zhí)行完以后,會(huì)調(diào)用close()方法退出協(xié)程。
大體流程搞清楚了,更多的技術(shù)細(xì)節(jié)就不繼續(xù)研究了,有時(shí)間的話,在以后的Python原理系列中再學(xué)習(xí)吧。
yield from經(jīng)常與Python3.4標(biāo)準(zhǔn)庫里的@asyncio.coroutine裝飾器結(jié)合使用。
協(xié)程用作累加器
這是協(xié)程的常見用途,代碼如下:
- def averager():
- total = 0.0
- count = 0
- average = None
- while True: # <1>
- term = yield average # <2>
- total += term
- count += 1
- average = total/count
協(xié)程實(shí)現(xiàn)并發(fā)
這里例子有點(diǎn)復(fù)雜,源碼地址是:
https://github.com/fluentpython/example-code/blob/master/16-coroutine/taxi_sim.py
核心代碼片段是:
- # BEGIN TAXI_PROCESS
- def taxi_process(ident, trips, start_time=0): # <1>
- """Yield to simulator issuing event at each state change"""
- time = yield Event(start_time, ident, 'leave garage') # <2>
- for i in range(trips): # <3>
- time = yield Event(time, ident, 'pick up passenger') # <4>
- time = yield Event(time, ident, 'drop off passenger') # <5>
- yield Event(time, ident, 'going home') # <6>
- # end of taxi process # <7>
- # END TAXI_PROCESS
- def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS,
- seed=None):
- """Initialize random generator, build procs and run simulation"""
- if seed is not None:
- random.seed(seed) # get reproducible results
- taxis = {i: taxi_process(i, (i+1)*2, i*DEPARTURE_INTERVAL)
- for i in range(num_taxis)}
- sim = Simulator(taxis)
- sim.run(end_time)
這個(gè)示例說明了如何在一個(gè)主循環(huán)中處理事件,以及如何通過發(fā)送數(shù)據(jù)驅(qū)動(dòng)協(xié)程。這是asyncio包底層的基本思想。使用協(xié)程代替線程和回調(diào),實(shí)現(xiàn)并發(fā)。
參考資料:
《流暢的Python》第16章 協(xié)程 https://zhuanlan.zhihu.com/p/104918655