新聞中心
本文主要介紹python中Enhanced generator即coroutine相關(guān)內(nèi)容,包括基本語法、使用場景、注意事項,以及與其他語言協(xié)程實現(xiàn)的異同。

enhanced generator
在上文《Python Yield Generator 詳解》中介紹了yield和generator的使用場景和主意事項,只用到了generator的next方法,事實上generator還有更強大的功能。PEP 342為generator增加了一系列方法來使得generator更像一個協(xié)程Coroutine。做主要的變化在于早期的yield只能返回值(作為數(shù)據(jù)的產(chǎn)生者), 而新增加的send方法能在generator恢復的時候消費一個數(shù)值,而去caller(generator的調(diào)用著)也可以通過throw在generator掛起的主動拋出異常。
- back_data = yield cur_ret
這段代碼的意思是:當執(zhí)行到這條語句時,返回cur_ret給調(diào)用者;并且當generator通過next()或者send(some_data)方法恢復的時候,將some_data賦值給back_data.例如:
- def gen(data):
- print 'before yield', data
- back_data = yield data
- print 'after resume', back_data
- if __name__ == '__main__':
- g = gen(1)
- print g.next()
- try:
- g.send(0)
- except StopIteration:
- pass
輸出:
- before yield 1
- 1
- after resume 0
兩點需要注意:
- next() 等價于 send(None)
- ***次調(diào)用時,需要使用next()語句或是send(None),不能使用send發(fā)送一個非None的值,否則會出錯的,因為沒有Python yield語句來接收這個值。
應用場景
當generator可以接受數(shù)據(jù)(在從掛起狀態(tài)恢復的時候) 而不僅僅是返回數(shù)據(jù)時, generator就有了消費數(shù)據(jù)(push)的能力。下面的例子來自這里:
- word_map = {}
- def consume_data_from_file(file_name, consumer):
- for line in file(file_name):
- consumer.send(line)
- def consume_words(consumer):
- while True:
- line = yield
- for word in (w for w in line.split() if w.strip()):
- consumer.send(word)
- def count_words_consumer():
- while True:
- word = yield
- if word not in word_map:
- word_map[word] = 0
- word_map[word] += 1
- print word_map
- if __name__ == '__main__':
- cons = count_words_consumer()
- cons.next()
- cons_inner = consume_words(cons)
- cons_inner.next()
- c = consume_data_from_file('test.txt', cons_inner)
- print word_map
上面的代碼中,真正的數(shù)據(jù)消費者是count_words_consumer,最原始的數(shù)據(jù)生產(chǎn)者是consume_data_from_file,數(shù)據(jù)的流向是主動從生產(chǎn)者推向消費者。不過上面第22、24行分別調(diào)用了兩次next,這個可以使用一個decorator封裝一下。
- def consumer(func):
- def wrapper(*args,**kw):
- gen = func(*args, **kw)
- gen.next()
- return gen
- wrapper.__name__ = func.__name__
- wrapper.__dict__ = func.__dict__
- wrapper.__doc__ = func.__doc__
- return wrapper
修改后的代碼:
- def consumer(func):
- def wrapper(*args,**kw):
- gen = func(*args, **kw)
- gen.next()
- return gen
- wrapper.__name__ = func.__name__
- wrapper.__dict__ = func.__dict__
- wrapper.__doc__ = func.__doc__
- return wrapper
- word_map = {}
- def consume_data_from_file(file_name, consumer):
- for line in file(file_name):
- consumer.send(line)
- @consumer
- def consume_words(consumer):
- while True:
- line = yield
- for word in (w for w in line.split() if w.strip()):
- consumer.send(word)
- @consumer
- def count_words_consumer():
- while True:
- word = yield
- if word not in word_map:
- word_map[word] = 0
- word_map[word] += 1
- print word_map
- if __name__ == '__main__':
- cons = count_words_consumer()
- cons_inner = consume_words(cons)
- c = consume_data_from_file('test.txt', cons_inner)
- print word_map
- example_with_deco
generator throw
除了next和send方法,generator還提供了兩個實用的方法,throw和close,這兩個方法加強了caller對generator的控制。send方法可以傳遞一個值給generator,throw方法在generator掛起的地方拋出異常,close方法讓generator正常結(jié)束(之后就不能再調(diào)用next send了)。下面詳細介紹一下throw方法。
- throw(type[, value[, traceback]])
在generator yield的地方拋出type類型的異常,并且返回下一個被yield的值。如果type類型的異常沒有被捕獲,那么會被傳給caller。另外,如果generator不能yield新的值,那么向caller拋出StopIteration異常:
- @consumer
- def gen_throw():
- value = yield
- try:
- yield value
- except Exception, e:
- yield str(e) # 如果注釋掉這行,那么會拋出StopIteration
- if __name__ == '__main__':
- g = gen_throw()
- assert g.send(5) == 5
- assert g.throw(Exception, 'throw Exception') == 'throw Exception'
***次調(diào)用send,代碼返回value(5)之后在第5行掛起, 然后generator throw之后會被第6行catch住。如果第7行沒有重新yield,那么會重新拋出StopIteration異常。
注意事項
如果一個生成器已經(jīng)通過send開始執(zhí)行,那么在其再次yield之前,是不能從其他生成器再次調(diào)度到該生成器
- @consumer
- def funcA():
- while True:
- data = yield
- print 'funcA recevie', data
- fb.send(data * 2)
- @consumer
- def funcB():
- while True:
- data = yield
- print 'funcB recevie', data
- fa.send(data * 2)
- fa = funcA()
- fb = funcB()
- if __name__ == '__main__':
- fa.send(10)
輸出:
- funcA recevie 10
- funcB recevie 20
- ValueError: generator already executing
Generator 與 Coroutine
回到Coroutine,可參見維基百科解釋,而我自己的理解比較簡單(或者片面):程序員可控制的并發(fā)流程,不管是進程還是線程,其切換都是操作系統(tǒng)在調(diào)度,而對于協(xié)程,程序員可以控制什么時候切換出去,什么時候切換回來。協(xié)程比進程 線程輕量級很多,較少了上下文切換的開銷。另外,由于是程序員控制調(diào)度,一定程度上也能避免一個任務被中途中斷.。協(xié)程可以用在哪些場景呢,我覺得可以歸納為非阻塞等待的場景,如游戲編程,異步IO,事件驅(qū)動。
Python中,generator的send和throw方法使得generator很像一個協(xié)程(coroutine), 但是generator只是一個半?yún)f(xié)程(semicoroutines),python doc是這樣描述的:
“All of this makes generator functions quite similar to coroutines; they yield multiple times, they have more than one entry point and their execution can be suspended. The only difference is that a generator function cannot control where should the execution continue after it yields; the control is always transferred to the generator’s caller.”
盡管如此,利用enhanced generator也能實現(xiàn)更強大的功能。比如上文中提到的yield_dec的例子,只能被動的等待時間到達之后繼續(xù)執(zhí)行。在某些情況下比如觸發(fā)了某個事件,我們希望立即恢復執(zhí)行流程,而且我們也關(guān)心具體是什么事件,這個時候就需要在generator send了。另外一種情形,我們需要終止這個執(zhí)行流程,那么刻意調(diào)用close,同時在代碼里面做一些處理,偽代碼如下:
- @yield_dec
- def do(a):
- print 'do', a
- try:
- event = yield 5
- print 'post_do', a, event
- finally:
- print 'do sth'
至于之前提到的另一個例子,服務(進程)之間的異步調(diào)用,也是非常適合實用協(xié)程的例子。callback的方式會割裂代碼,把一段邏輯分散到多個函數(shù),協(xié)程的方式會好很多,至少對于代碼閱讀而言。其他語言,比如C#、Go語言,協(xié)程都是標準實現(xiàn),特別對于go語言,協(xié)程是高并發(fā)的基石。在python3.x中,通過asyncio和async\await也增加了對協(xié)程的支持。在筆者所使用的2.7環(huán)境下,也可以使用greenlet,之后會有博文介紹。
參考
- https://www.python.org/dev/peps/pep-0342/
- http://www.dabeaz.com/coroutines/
- https://en.wikipedia.org/wiki/Coroutine#Implementations_for_Python
文章題目:Python增強的生成器:協(xié)程
文章鏈接:http://fisionsoft.com.cn/article/cdedeeg.html


咨詢
建站咨詢
