Python增強(qiáng)的生成器:協(xié)程
本文主要介紹python中Enhanced generator即coroutine相關(guān)內(nèi)容,包括基本語法、使用場景、注意事項(xiàng),以及與其他語言協(xié)程實(shí)現(xiàn)的異同。
enhanced generator
在上文《Python Yield Generator 詳解》中介紹了yield和generator的使用場景和主意事項(xiàng),只用到了generator的next方法,事實(shí)上generator還有更強(qiáng)大的功能。PEP 342為generator增加了一系列方法來使得generator更像一個(gè)協(xié)程Coroutine。做主要的變化在于早期的yield只能返回值(作為數(shù)據(jù)的產(chǎn)生者), 而新增加的send方法能在generator恢復(fù)的時(shí)候消費(fèi)一個(gè)數(shù)值,而去caller(generator的調(diào)用著)也可以通過throw在generator掛起的主動(dòng)拋出異常。
- back_data = yield cur_ret
這段代碼的意思是:當(dāng)執(zhí)行到這條語句時(shí),返回cur_ret給調(diào)用者;并且當(dāng)generator通過next()或者send(some_data)方法恢復(fù)的時(shí)候,將some_data賦值給back_data.例如:
- def gen(data):
- print 'before yield', data
- back_data = yield data
- print 'after resume', back_data
- if __name__ == '__main__':
- g = gen(1)
- print g.next()
- try:
- g.send(0)
- except StopIteration:
- pass
輸出:
- before yield 1
- 1
- after resume 0
兩點(diǎn)需要注意:
- next() 等價(jià)于 send(None)
- ***次調(diào)用時(shí),需要使用next()語句或是send(None),不能使用send發(fā)送一個(gè)非None的值,否則會(huì)出錯(cuò)的,因?yàn)闆]有Python yield語句來接收這個(gè)值。
應(yīng)用場景
當(dāng)generator可以接受數(shù)據(jù)(在從掛起狀態(tài)恢復(fù)的時(shí)候) 而不僅僅是返回?cái)?shù)據(jù)時(shí), generator就有了消費(fèi)數(shù)據(jù)(push)的能力。下面的例子來自這里:
- word_map = {}
- def consume_data_from_file(file_name, consumer):
- for line in file(file_name):
- consumer.send(line)
- def consume_words(consumer):
- while True:
- line = yield
- for word in (w for w in line.split() if w.strip()):
- consumer.send(word)
- def count_words_consumer():
- while True:
- word = yield
- if word not in word_map:
- word_map[word] = 0
- word_map[word] += 1
- print word_map
- if __name__ == '__main__':
- cons = count_words_consumer()
- cons.next()
- cons_inner = consume_words(cons)
- cons_inner.next()
- c = consume_data_from_file('test.txt', cons_inner)
- print word_map
上面的代碼中,真正的數(shù)據(jù)消費(fèi)者是count_words_consumer,最原始的數(shù)據(jù)生產(chǎn)者是consume_data_from_file,數(shù)據(jù)的流向是主動(dòng)從生產(chǎn)者推向消費(fèi)者。不過上面第22、24行分別調(diào)用了兩次next,這個(gè)可以使用一個(gè)decorator封裝一下。
- def consumer(func):
- def wrapper(*args,**kw):
- gen = func(*args, **kw)
- gen.next()
- return gen
- wrapper.__name__ = func.__name__
- wrapper.__dict__ = func.__dict__
- wrapper.__doc__ = func.__doc__
- return wrapper
修改后的代碼:
- def consumer(func):
- def wrapper(*args,**kw):
- gen = func(*args, **kw)
- gen.next()
- return gen
- wrapper.__name__ = func.__name__
- wrapper.__dict__ = func.__dict__
- wrapper.__doc__ = func.__doc__
- return wrapper
- word_map = {}
- def consume_data_from_file(file_name, consumer):
- for line in file(file_name):
- consumer.send(line)
- @consumer
- def consume_words(consumer):
- while True:
- line = yield
- for word in (w for w in line.split() if w.strip()):
- consumer.send(word)
- @consumer
- def count_words_consumer():
- while True:
- word = yield
- if word not in word_map:
- word_map[word] = 0
- word_map[word] += 1
- print word_map
- if __name__ == '__main__':
- cons = count_words_consumer()
- cons_inner = consume_words(cons)
- c = consume_data_from_file('test.txt', cons_inner)
- print word_map
- example_with_deco
generator throw
除了next和send方法,generator還提供了兩個(gè)實(shí)用的方法,throw和close,這兩個(gè)方法加強(qiáng)了caller對(duì)generator的控制。send方法可以傳遞一個(gè)值給generator,throw方法在generator掛起的地方拋出異常,close方法讓generator正常結(jié)束(之后就不能再調(diào)用next send了)。下面詳細(xì)介紹一下throw方法。
- throw(type[, value[, traceback]])
在generator yield的地方拋出type類型的異常,并且返回下一個(gè)被yield的值。如果type類型的異常沒有被捕獲,那么會(huì)被傳給caller。另外,如果generator不能yield新的值,那么向caller拋出StopIteration異常:
- @consumer
- def gen_throw():
- value = yield
- try:
- yield value
- except Exception, e:
- yield str(e) # 如果注釋掉這行,那么會(huì)拋出StopIteration
- if __name__ == '__main__':
- g = gen_throw()
- assert g.send(5) == 5
- assert g.throw(Exception, 'throw Exception') == 'throw Exception'
***次調(diào)用send,代碼返回value(5)之后在第5行掛起, 然后generator throw之后會(huì)被第6行catch住。如果第7行沒有重新yield,那么會(huì)重新拋出StopIteration異常。
注意事項(xiàng)
如果一個(gè)生成器已經(jīng)通過send開始執(zhí)行,那么在其再次yield之前,是不能從其他生成器再次調(diào)度到該生成器
- @consumer
- def funcA():
- while True:
- data = yield
- print 'funcA recevie', data
- fb.send(data * 2)
- @consumer
- def funcB():
- while True:
- data = yield
- print 'funcB recevie', data
- fa.send(data * 2)
- fa = funcA()
- fb = funcB()
- if __name__ == '__main__':
- fa.send(10)
輸出:
- funcA recevie 10
- funcB recevie 20
- ValueError: generator already executing
Generator 與 Coroutine
回到Coroutine,可參見維基百科解釋,而我自己的理解比較簡單(或者片面):程序員可控制的并發(fā)流程,不管是進(jìn)程還是線程,其切換都是操作系統(tǒng)在調(diào)度,而對(duì)于協(xié)程,程序員可以控制什么時(shí)候切換出去,什么時(shí)候切換回來。協(xié)程比進(jìn)程 線程輕量級(jí)很多,較少了上下文切換的開銷。另外,由于是程序員控制調(diào)度,一定程度上也能避免一個(gè)任務(wù)被中途中斷.。協(xié)程可以用在哪些場景呢,我覺得可以歸納為非阻塞等待的場景,如游戲編程,異步IO,事件驅(qū)動(dòng)。
Python中,generator的send和throw方法使得generator很像一個(gè)協(xié)程(coroutine), 但是generator只是一個(gè)半?yún)f(xié)程(semicoroutines),python doc是這樣描述的:
“All of this makes generator functions quite similar to coroutines; they yield multiple times, they have more than one entry point and their execution can be suspended. The only difference is that a generator function cannot control where should the execution continue after it yields; the control is always transferred to the generator’s caller.”
盡管如此,利用enhanced generator也能實(shí)現(xiàn)更強(qiáng)大的功能。比如上文中提到的yield_dec的例子,只能被動(dòng)的等待時(shí)間到達(dá)之后繼續(xù)執(zhí)行。在某些情況下比如觸發(fā)了某個(gè)事件,我們希望立即恢復(fù)執(zhí)行流程,而且我們也關(guān)心具體是什么事件,這個(gè)時(shí)候就需要在generator send了。另外一種情形,我們需要終止這個(gè)執(zhí)行流程,那么刻意調(diào)用close,同時(shí)在代碼里面做一些處理,偽代碼如下:
- @yield_dec
- def do(a):
- print 'do', a
- try:
- event = yield 5
- print 'post_do', a, event
- finally:
- print 'do sth'
至于之前提到的另一個(gè)例子,服務(wù)(進(jìn)程)之間的異步調(diào)用,也是非常適合實(shí)用協(xié)程的例子。callback的方式會(huì)割裂代碼,把一段邏輯分散到多個(gè)函數(shù),協(xié)程的方式會(huì)好很多,至少對(duì)于代碼閱讀而言。其他語言,比如C#、Go語言,協(xié)程都是標(biāo)準(zhǔn)實(shí)現(xiàn),特別對(duì)于go語言,協(xié)程是高并發(fā)的基石。在python3.x中,通過asyncio和async\await也增加了對(duì)協(xié)程的支持。在筆者所使用的2.7環(huán)境下,也可以使用greenlet,之后會(huì)有博文介紹。
參考
- https://www.python.org/dev/peps/pep-0342/
- http://www.dabeaz.com/coroutines/
- https://en.wikipedia.org/wiki/Coroutine#Implementations_for_Python