如何在Python中使用協(xié)程?相信很多沒有經(jīng)驗(yàn)的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
創(chuàng)新互聯(lián)公司一直通過網(wǎng)站建設(shè)和網(wǎng)站營銷幫助企業(yè)獲得更多客戶資源。 以"深度挖掘,量身打造,注重實(shí)效"的一站式服務(wù),以網(wǎng)站建設(shè)、成都網(wǎng)站建設(shè)、移動(dòng)互聯(lián)產(chǎn)品、全網(wǎng)營銷推廣服務(wù)為核心業(yè)務(wù)。10余年網(wǎng)站制作的經(jīng)驗(yàn),使用新網(wǎng)站建設(shè)技術(shù),全新開發(fā)出的標(biāo)準(zhǔn)網(wǎng)站,不但價(jià)格便宜而且實(shí)用、靈活,特別適合中小公司網(wǎng)站制作。網(wǎng)站管理系統(tǒng)簡單易用,維護(hù)方便,您可以完全操作網(wǎng)站資料,是中小公司快速網(wǎng)站建設(shè)的選擇。python主要應(yīng)用領(lǐng)域有哪些1、云計(jì)算,典型應(yīng)用OpenStack。2、WEB前端開發(fā),眾多大型網(wǎng)站均為Python開發(fā)。3.人工智能應(yīng)用,基于大數(shù)據(jù)分析和深度學(xué)習(xí)而發(fā)展出來的人工智能本質(zhì)上已經(jīng)無法離開python。4、系統(tǒng)運(yùn)維工程項(xiàng)目,自動(dòng)化運(yùn)維的標(biāo)配就是python+Django/flask。5、金融理財(cái)分析,量化交易,金融分析。6、大數(shù)據(jù)分析。
協(xié)程的生成器的基本行為
這里有一個(gè)最簡單的協(xié)程代碼:
def simple_coroutine(): print('-> start') x = yield print('-> recived', x) sc = simple_coroutine() next(sc) sc.send('zhexiao')
解釋:
1. 協(xié)程使用生成器函數(shù)定義:定義體中有 yield 關(guān)鍵字。
2. yield 在表達(dá)式中使用;如果協(xié)程只需從客戶那里接收數(shù)據(jù),那么產(chǎn)出的值是 None —— 這個(gè)值是隱式指定的,因?yàn)?yield 關(guān)鍵字右邊沒有表達(dá)式。
3. 首先要調(diào)用 next(…) 函數(shù),因?yàn)樯善鬟€沒啟動(dòng),沒在 yield 語句處暫停,所以一開始無法發(fā)送數(shù)據(jù)。
4. 調(diào)用send方法,把值傳給 yield 的變量,然后協(xié)程恢復(fù),繼續(xù)執(zhí)行下面的代碼,直到運(yùn)行到下一個(gè) yield 表達(dá)式,或者終止。
==注意:send方法只有當(dāng)協(xié)程處于 GEN_SUSPENDED 狀態(tài)下時(shí)才會(huì)運(yùn)作,所以我們使用 next() 方法激活協(xié)程到 yield 表達(dá)式處停止,或者我們也可以使用 sc.send(None),效果與 next(sc) 一樣==。
協(xié)程的四個(gè)狀態(tài):
協(xié)程可以身處四個(gè)狀態(tài)中的一個(gè)。當(dāng)前狀態(tài)可以使用inspect.getgeneratorstate(…) 函數(shù)確定,該函數(shù)會(huì)返回下述字符串中的一個(gè):
1. GEN_CREATED:等待開始執(zhí)行
2. GEN_RUNNING:解釋器正在執(zhí)行
3. GEN_SUSPENED:在yield表達(dá)式處暫停
4. GEN_CLOSED:執(zhí)行結(jié)束
==最先調(diào)用 next(sc) 函數(shù)這一步通常稱為“預(yù)激”(prime)協(xié)程==(即,讓協(xié)程向前執(zhí)行到第一個(gè) yield 表達(dá)式,準(zhǔn)備好作為活躍的協(xié)程使用)。
import inspect def simple_coroutine(a): print('-> start') b = yield a print('-> recived', a, b) c = yield a + b print('-> recived', a, b, c) # run sc = simple_coroutine(5) next(sc) sc.send(6) # 5, 6 sc.send(7) # 5, 6, 7
示例:使用協(xié)程計(jì)算移動(dòng)平均值
def averager(): total = 0.0 count = 0 avg = None while True: num = yield avg total += num count += 1 avg = total/count # run ag = averager() # 預(yù)激協(xié)程 print(next(ag)) # None print(ag.send(10)) # 10 print(ag.send(20)) # 15
解釋:
1. 調(diào)用 next(ag) 函數(shù)后,協(xié)程會(huì)向前執(zhí)行到 yield 表達(dá)式,產(chǎn)出 average 變量的初始值——None。
2. 此時(shí),協(xié)程在 yield 表達(dá)式處暫停。
3. 使用 send() 激活協(xié)程,把發(fā)送的值賦給 num,并計(jì)算出 avg 的值。
4. 使用 print 打印出 yield 返回的數(shù)據(jù)。
終止協(xié)程和異常處理
協(xié)程中未處理的異常會(huì)向上冒泡,傳給 next 函數(shù)或 send 方法的調(diào)用方(即觸發(fā)協(xié)程的對象)。
==終止協(xié)程的一種方式:發(fā)送某個(gè)哨符值,讓協(xié)程退出。內(nèi)置的 None 和Ellipsis 等常量經(jīng)常用作哨符值==。
顯式地把異常發(fā)給協(xié)程
從 Python 2.5 開始,客戶代碼可以在生成器對象上調(diào)用兩個(gè)方法,顯式地把異常發(fā)給協(xié)程。
generator.throw(exc_type[, exc_value[, traceback]])
致使生成器在暫停的 yield 表達(dá)式處拋出指定的異常。如果生成器處理了拋出的異常,代碼會(huì)向前執(zhí)行到下一個(gè) yield 表達(dá)式,而產(chǎn)出的值會(huì)成為調(diào)用 generator.throw方法得到的返回值。如果生成器沒有處理拋出的異常,異常會(huì)向上冒泡,傳到調(diào)用方的上下文中。
generator.close()
致使生成器在暫停的 yield 表達(dá)式處拋出 GeneratorExit 異常。如果生成器沒有處理這個(gè)異常,或者拋出了 StopIteration 異常(通常是指運(yùn)行到結(jié)尾),調(diào)用方不會(huì)報(bào)錯(cuò)。如果收到 GeneratorExit 異常,生成器一定不能產(chǎn)出值,否則解釋器會(huì)拋出RuntimeError 異常。生成器拋出的其他異常會(huì)向上冒泡,傳給調(diào)用方。
異常處理示例:
class DemoException(Exception): """ custom exception """ def handle_exception(): print('-> start') while True: try: x = yield except DemoException: print('-> run demo exception') else: print('-> recived x:', x) raise RuntimeError('this line should never run') he = handle_exception() next(he) he.send(10) # recived x: 10 he.send(20) # recived x: 20 he.throw(DemoException) # run demo exception he.send(40) # recived x: 40 he.close()
如果傳入無法處理的異常,則協(xié)程會(huì)終止:
he.throw(Exception) # run demo exception
yield from獲取協(xié)程的返回值
為了得到返回值,協(xié)程必須正常終止;然后生成器對象會(huì)拋出StopIteration 異常,異常對象的 value 屬性保存著返回的值。
==yield from 結(jié)構(gòu)會(huì)在內(nèi)部自動(dòng)捕獲 StopIteration 異常==。對 yield from 結(jié)構(gòu)來說,解釋器不僅會(huì)捕獲 StopIteration 異常,還會(huì)把value 屬性的值變成 yield from 表達(dá)式的值。
yield from基本用法
==在生成器 gen 中使用 yield from subgen() 時(shí), subgen 會(huì)獲得控制權(quán),把產(chǎn)出的值傳給 gen 的調(diào)用方,即調(diào)用方可以直接控制 subgen。與此同時(shí), gen 會(huì)阻塞,等待 subgen 終止==。
下面2個(gè)函數(shù)的作用一樣,只是使用了 yield from 的更加簡潔:
def gen(): for c in 'AB': yield c print(list(gen())) def gen_new(): yield from 'AB' print(list(gen_new()))
==yield from x 表達(dá)式對 x 對象所做的第一件事是,調(diào)用 iter(x),從中獲取迭代器,因此, x 可以是任何可迭代的對象,這只是 yield from 最基礎(chǔ)的用法==。
yield from高級用法
==yield from 的主要功能是打開雙向通道,把最外層的調(diào)用方與最內(nèi)層的子生成器連接起來,這樣二者可以直接發(fā)送和產(chǎn)出值,還可以直接傳入異常,而不用在位于中間的協(xié)程中添加大量處理異常的樣板代碼==。
yield from 專門的術(shù)語
委派生成器:包含 yield from 表達(dá)式的生成器函數(shù)。
子生成器:從 yield from 中 部分獲取的生成器。
圖示
解釋:
1. 委派生成器在 yield from 表達(dá)式處暫停時(shí),調(diào)用方可以直接把數(shù)據(jù)發(fā)給子生成器。
2. 子生成器再把產(chǎn)出的值發(fā)給調(diào)用方。
3. 子生成器返回之后,解釋器會(huì)拋出 StopIteration 異常,并把返回值附加到異常對象上,此時(shí)委派生成器會(huì)恢復(fù)。
高級示例
from collections import namedtuple ResClass = namedtuple('Res', 'count average') # 子生成器 def averager(): total = 0.0 count = 0 average = None while True: term = yield if term is None: break total += term count += 1 average = total / count return ResClass(count, average) # 委派生成器 def grouper(storages, key): while True: # 獲取averager()返回的值 storages[key] = yield from averager() # 客戶端代碼 def client(): process_data = { 'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3], 'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46] } storages = {} for k, v in process_data.items(): # 獲得協(xié)程 coroutine = grouper(storages, k) # 預(yù)激協(xié)程 next(coroutine) # 發(fā)送數(shù)據(jù)到協(xié)程 for dt in v: coroutine.send(dt) # 終止協(xié)程 coroutine.send(None) print(storages) # run client()
解釋:
1. 外層 for 循環(huán)每次迭代會(huì)新建一個(gè) grouper 實(shí)例,賦值給 coroutine 變量; grouper 是委派生成器。
2. 調(diào)用 next(coroutine),預(yù)激委派生成器 grouper,此時(shí)進(jìn)入 while True 循環(huán),調(diào)用子生成器 averager 后,在 yield from 表達(dá)式處暫停。
3. 內(nèi)層 for 循環(huán)調(diào)用 coroutine.send(value),直接把值傳給子生成器 averager。同時(shí),當(dāng)前的 grouper 實(shí)例(coroutine)在 yield from 表達(dá)式處暫停。
4. 內(nèi)層循環(huán)結(jié)束后, grouper 實(shí)例依舊在 yield from 表達(dá)式處暫停,因此, grouper函數(shù)定義體中為 results[key] 賦值的語句還沒有執(zhí)行。
5. coroutine.send(None) 終止 averager 子生成器,子生成器拋出 StopIteration 異常并將返回的數(shù)據(jù)包含在異常對象的value中,yield from 可以直接抓取 StopItration 異常并將異常對象的 value 賦值給 results[key]
yield from的意義
子生成器產(chǎn)出的值都直接傳給委派生成器的調(diào)用方(即客戶端代碼)。
使用 send() 方法發(fā)給委派生成器的值都直接傳給子生成器。如果發(fā)送的值是None,那么會(huì)調(diào)用子生成器的 next() 方法。如果發(fā)送的值不是 None,那么會(huì)調(diào)用子生成器的 send() 方法。如果調(diào)用的方法拋出 StopIteration 異常,那么委派生成器恢復(fù)運(yùn)行。任何其他異常都會(huì)向上冒泡,傳給委派生成器。
生成器退出時(shí),生成器(或子生成器)中的 return expr 表達(dá)式會(huì)觸發(fā) StopIteration(expr) 異常拋出。
yield from 表達(dá)式的值是子生成器終止時(shí)傳給 StopIteration 異常的第一個(gè)參數(shù)。
傳入委派生成器的異常,除了 GeneratorExit 之外都傳給子生成器的 throw() 方法。如果調(diào)用 throw() 方法時(shí)拋出 StopIteration 異常,委派生成器恢復(fù)運(yùn)行。 StopIteration 之外的異常會(huì)向上冒泡,傳給委派生成器。
如果把 GeneratorExit 異常傳入委派生成器,或者在委派生成器上調(diào)用 close() 方法,那么在子生成器上調(diào)用 close() 方法,如果它有的話。如果調(diào)用close()方法導(dǎo)致異常拋出,那么異常會(huì)向上冒泡,傳給委派生成器;否則,委派生成器拋出GeneratorExit 異常。
使用案例
協(xié)程能自然地表述很多算法,例如仿真、游戲、異步 I/O,以及其他事件驅(qū)動(dòng)型編程形式或協(xié)作式多任務(wù)。協(xié)程是 asyncio 包的基礎(chǔ)構(gòu)建。通過仿真系統(tǒng)能說明如何使用協(xié)程代替線程實(shí)現(xiàn)并發(fā)的活動(dòng)。
在仿真領(lǐng)域,進(jìn)程這個(gè)術(shù)語指代模型中某個(gè)實(shí)體的活動(dòng),與操作系統(tǒng)中的進(jìn)程無關(guān)。仿真系統(tǒng)中的一個(gè)進(jìn)程可以使用操作系統(tǒng)中的一個(gè)進(jìn)程實(shí)現(xiàn),但是通常會(huì)使用一個(gè)線程或一個(gè)協(xié)程實(shí)現(xiàn)。
出租車示例
import collections # time 字段是事件發(fā)生時(shí)的仿真時(shí)間, # proc 字段是出租車進(jìn)程實(shí)例的編號(hào), # action 字段是描述活動(dòng)的字符串。 Event = collections.namedtuple('Event', 'time proc action') def taxi_process(proc_num, trips_num, start_time=0): """ 每次改變狀態(tài)時(shí)創(chuàng)建事件,把控制權(quán)讓給仿真器 :param proc_num: :param trips_num: :param start_time: :return: """ time = yield Event(start_time, proc_num, 'leave garage') for i in range(trips_num): time = yield Event(time, proc_num, 'pick up people') time = yield Event(time, proc_num, 'drop off people') yield Event(time, proc_num, 'go home') # run t1 = taxi_process(1, 1) a = next(t1) print(a) # Event(time=0, proc=1, action='leave garage') b = t1.send(a.time + 6) print(b) # Event(time=6, proc=1, action='pick up people') c = t1.send(b.time + 12) print(c) # Event(time=18, proc=1, action='drop off people') d = t1.send(c.time + 1) print(d) # Event(time=19, proc=1, action='go home')
模擬控制臺(tái)控制3個(gè)出租車異步
import collections import queue import random # time 字段是事件發(fā)生時(shí)的仿真時(shí)間, # proc 字段是出租車進(jìn)程實(shí)例的編號(hào), # action 字段是描述活動(dòng)的字符串。 Event = collections.namedtuple('Event', 'time proc action') def taxi_process(proc_num, trips_num, start_time=0): """ 每次改變狀態(tài)時(shí)創(chuàng)建事件,把控制權(quán)讓給仿真器 :param proc_num: :param trips_num: :param start_time: :return: """ time = yield Event(start_time, proc_num, 'leave garage') for i in range(trips_num): time = yield Event(time, proc_num, 'pick up people') time = yield Event(time, proc_num, 'drop off people') yield Event(time, proc_num, 'go home') class SimulateTaxi(object): """ 模擬出租車控制臺(tái) """ def __init__(self, proc_map): # 保存排定事件的 PriorityQueue 對象, # 如果進(jìn)來的是tuple類型,則默認(rèn)使用tuple[0]做排序 self.events = queue.PriorityQueue() # procs_map 參數(shù)是一個(gè)字典,使用dict構(gòu)建本地副本 self.procs = dict(proc_map) def run(self, end_time): """ 排定并顯示事件,直到時(shí)間結(jié)束 :param end_time: :return: """ for _, taxi_gen in self.procs.items(): leave_evt = next(taxi_gen) self.events.put(leave_evt) # 仿真系統(tǒng)的主循環(huán) simulate_time = 0 while simulate_time < end_time: if self.events.empty(): print('*** end of events ***') break # 第一個(gè)事件的發(fā)生 current_evt = self.events.get() simulate_time, proc_num, action = current_evt print('taxi:', proc_num, ', at time:', simulate_time, ', ', action) # 準(zhǔn)備下個(gè)事件的發(fā)生 proc_gen = self.procs[proc_num] next_simulate_time = simulate_time + self.compute_duration() try: next_evt = proc_gen.send(next_simulate_time) except StopIteration: del self.procs[proc_num] else: self.events.put(next_evt) else: msg = '*** end of simulation time: {} events pending ***' print(msg.format(self.events.qsize())) @staticmethod def compute_duration(): """ 隨機(jī)產(chǎn)生下個(gè)事件發(fā)生的時(shí)間 :return: """ duration_time = random.randint(1, 20) return duration_time # 生成3個(gè)出租車,現(xiàn)在全部都沒有離開garage taxis = {i: taxi_process(i, (i + 1) * 2, i * 5) for i in range(3)} # 模擬運(yùn)行 st = SimulateTaxi(taxis) st.run(100)
看完上述內(nèi)容,你們掌握如何在Python中使用協(xié)程的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!