協(xié)程函數(shù):async def?函數(shù)名。3.5+
站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到博羅網(wǎng)站設(shè)計(jì)與博羅網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:成都做網(wǎng)站、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名與空間、虛擬空間、企業(yè)郵箱。業(yè)務(wù)覆蓋博羅地區(qū)。
協(xié)程對(duì)象:執(zhí)行協(xié)程函數(shù)()得到的協(xié)程對(duì)象。
3.5之后的寫法:
3.7之后的寫法:更簡(jiǎn)便
await后面?跟?可等待的對(duì)象。(協(xié)程對(duì)象,F(xiàn)uture,Task對(duì)象?約等于IO等待)
await實(shí)例2:串行執(zhí)行。 一個(gè)協(xié)程函數(shù)里面可以支持多個(gè)await ,雖然會(huì)串行,但是如果有其他協(xié)程函數(shù),任務(wù)列表也在執(zhí)行,依然會(huì)切換。只是案例中的main對(duì)應(yīng)執(zhí)行的others1和others2串行 。 await會(huì)等待對(duì)象的值得到之后才繼續(xù)往下走。
如果你厭倦了多線程,不妨試試python的異步編程,再引入async, await關(guān)鍵字之后語(yǔ)法變得更加簡(jiǎn)潔和直觀,又經(jīng)過(guò)幾年的生態(tài)發(fā)展,現(xiàn)在是一個(gè)很不錯(cuò)的并發(fā)模型。
下面介紹一下python異步編程的方方面面。
因?yàn)镚IL的存在,所以Python的多線程在CPU密集的任務(wù)下顯得無(wú)力,但是對(duì)于IO密集的任務(wù),多線程還是足以發(fā)揮多線程的優(yōu)勢(shì)的,而異步也是為了應(yīng)對(duì)IO密集的任務(wù),所以兩者是一個(gè)可以相互替代的方案,因?yàn)樵O(shè)計(jì)的不同,理論上異步要比多線程快,因?yàn)楫惒降幕ㄤN更少, 因?yàn)椴恍枰~外系統(tǒng)申請(qǐng)額外的內(nèi)存,而線程的創(chuàng)建跟系統(tǒng)有關(guān),需要分配一定量的內(nèi)存,一般是幾兆,比如linux默認(rèn)是8MB。
雖然異步很好,比如可以使用更少的內(nèi)存,比如更好地控制并發(fā)(也許你并不這么認(rèn)為:))。但是由于async/await 語(yǔ)法的存在導(dǎo)致與之前的語(yǔ)法有些割裂,所以需要適配,需要付出額外的努力,再者就是生態(tài)遠(yuǎn)遠(yuǎn)沒(méi)有同步編程強(qiáng)大,比如很多庫(kù)還不支持異步,所以你需要一些額外的適配。
為了不給其他網(wǎng)站帶來(lái)困擾,這里首先在自己電腦啟動(dòng)web服務(wù)用于測(cè)試,代碼很簡(jiǎn)單。
本文所有依賴如下:
所有依賴可通過(guò)代碼倉(cāng)庫(kù)的requirements.txt一次性安裝。
首先看一個(gè)錯(cuò)誤的例子
輸出如下:
發(fā)現(xiàn)花費(fèi)了3秒,不符合預(yù)期呀。。。。這是因?yàn)殡m然用了協(xié)程,但是每個(gè)協(xié)程是串行的運(yùn)行,也就是說(shuō)后一個(gè)等前一個(gè)完成之后才開(kāi)始,那么這樣的異步代碼并沒(méi)有并發(fā),所以我們需要讓這些協(xié)程并行起來(lái)
為了讓代碼變動(dòng)的不是太多,所以這里用了一個(gè)笨辦法來(lái)等待所有任務(wù)完成, 之所以在main函數(shù)中等待是為了不讓ClientSession關(guān)閉, 如果你移除了main函數(shù)中的等待代碼會(huì)發(fā)現(xiàn)報(bào)告異常 RuntimeError: Session is closed ,而代碼里的解決方案非常的不優(yōu)雅,需要手動(dòng)的等待,為了解決這個(gè)問(wèn)題,我們?cè)俅胃倪M(jìn)代碼。
這里解決的方式是通過(guò) asyncio.wait 方法等待一個(gè)協(xié)程列表,默認(rèn)是等待所有協(xié)程結(jié)束后返回,會(huì)返回一個(gè)完成(done)列表,以及一個(gè)待辦(pending)列表。
如果我們不想要協(xié)程對(duì)象而是結(jié)果,那么我們可以使用 asyncio.gather
結(jié)果輸出如下:
通過(guò) asyncio.ensure_future 我們就能創(chuàng)建一個(gè)協(xié)程,跟調(diào)用一個(gè)函數(shù)差別不大,為了等待所有任務(wù)完成之后退出,我們需要使用 asyncio.wait 等方法來(lái)等待,如果只想要協(xié)程輸出的結(jié)果,我們可以使用 asyncio.gather 來(lái)獲取結(jié)果。
雖然前面能夠隨心所欲的創(chuàng)建協(xié)程,但是就像多線程一樣,我們也需要處理協(xié)程之間的同步問(wèn)題,為了保持語(yǔ)法及使用情況的一致,多線程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用異步的關(guān)鍵字,比如 async with/ await 等
通過(guò)鎖讓并發(fā)慢下來(lái),讓協(xié)程一個(gè)一個(gè)的運(yùn)行。
輸出如下:
通過(guò)觀察很容易發(fā)現(xiàn),并發(fā)的速度因?yàn)殒i而慢下來(lái)了,因?yàn)槊看沃挥幸粋€(gè)協(xié)程能獲得鎖,所以并發(fā)變成了串行。
通過(guò)事件來(lái)通知特定的協(xié)程開(kāi)始工作,假設(shè)有一個(gè)任務(wù)是根據(jù)http響應(yīng)結(jié)果選擇是否激活。
輸出如下:
可以看到事件(Event)等待者都是在得到響應(yīng)內(nèi)容之后輸出,并且事件(Event)可以是多個(gè)協(xié)程同時(shí)等待。
上面的事件雖然很棒,能夠在不同的協(xié)程之間同步狀態(tài),并且也能夠一次性同步所有的等待協(xié)程,但是還不夠精細(xì)化,比如想通知指定數(shù)量的等待協(xié)程,這個(gè)時(shí)候Event就無(wú)能為力了,所以同步原語(yǔ)中出現(xiàn)了Condition。
輸出如下:
可以看到,前面兩個(gè)等待的協(xié)程是在同一時(shí)刻完成,而不是全部等待完成。
通過(guò)創(chuàng)建協(xié)程的數(shù)量來(lái)控制并發(fā)并不是非常優(yōu)雅的方式,所以可以通過(guò)信號(hào)量的方式來(lái)控制并發(fā)。
輸出如下:
可以發(fā)現(xiàn),雖然同時(shí)創(chuàng)建了三個(gè)協(xié)程,但是同一時(shí)刻只有兩個(gè)協(xié)程工作,而另外一個(gè)協(xié)程需要等待一個(gè)協(xié)程讓出信號(hào)量才能運(yùn)行。
無(wú)論是協(xié)程還是線程,任務(wù)之間的狀態(tài)同步還是很重要的,所以有了應(yīng)對(duì)各種同步機(jī)制的同步原語(yǔ),因?yàn)橐WC一個(gè)資源同一個(gè)時(shí)刻只能一個(gè)任務(wù)訪問(wèn),所以引入了鎖,又因?yàn)樾枰粋€(gè)任務(wù)等待另一個(gè)任務(wù),或者多個(gè)任務(wù)等待某個(gè)任務(wù),因此引入了事件(Event),但是為了更精細(xì)的控制通知的程度,所以又引入了條件(Condition), 通過(guò)條件可以控制一次通知多少的任務(wù)。
有時(shí)候的并發(fā)需求是通過(guò)一個(gè)變量控制并發(fā)任務(wù)的并發(fā)數(shù)而不是通過(guò)創(chuàng)建協(xié)程的數(shù)量來(lái)控制并發(fā),所以引入了信號(hào)量(Semaphore),這樣就可以在創(chuàng)建的協(xié)程數(shù)遠(yuǎn)遠(yuǎn)大于并發(fā)數(shù)的情況下讓協(xié)程在指定的并發(fā)量情況下并發(fā)。
不得不承認(rèn)異步編程相比起同步編程的生態(tài)要小的很多,所以不可能完全異步編程,因此需要一種方式兼容。
多線程是為了兼容同步得代碼。
多進(jìn)程是為了利用CPU多核的能力。
輸出如下:
可以看到總耗時(shí)1秒,說(shuō)明所有的線程跟進(jìn)程是同時(shí)運(yùn)行的。
下面是本人使用過(guò)的一些異步庫(kù),僅供參考
web框架
http客戶端
數(shù)據(jù)庫(kù)
ORM
雖然異步庫(kù)發(fā)展得還算不錯(cuò),但是中肯的說(shuō)并沒(méi)有覆蓋方方面面。
雖然我鼓勵(lì)大家嘗試異步編程,但是本文的最后卻是讓大家謹(jǐn)慎的選擇開(kāi)發(fā)環(huán)境,如果你覺(jué)得本文的并發(fā),同步,兼容多線程,多進(jìn)程不值得一提,那么我十分推薦你嘗試以異步編程的方式開(kāi)始一個(gè)新的項(xiàng)目,如果你對(duì)其中一些還有疑問(wèn)或者你確定了要使用的依賴庫(kù)并且大多數(shù)是沒(méi)有異步庫(kù)替代的,那么我還是建議你直接按照自己擅長(zhǎng)的同步編程開(kāi)始。
異步編程雖然很不錯(cuò),不過(guò),也許你并不需要。
最重要的是生成器函數(shù)碰到y(tǒng)ield停止執(zhí)行,收到next或send才會(huì)繼續(xù)執(zhí)行的機(jī)制。
而且send方法令我們可以傳遞值到生成器暫停的地方。
生成器執(zhí)行結(jié)束拋出 StopIteration 異常。
yield from用于把其他生成器當(dāng)做子例程調(diào)用。
然后它可以被其他用 async def 定義的的協(xié)程函數(shù)B和C await ,只有當(dāng) await 返回時(shí),B和C才繼續(xù)執(zhí)行。
這樣我們就可以有效地控制B和C的執(zhí)行順序。
然后我們創(chuàng)建了一個(gè)調(diào)度器,它對(duì)列表進(jìn)行了兩次深拷貝以避免問(wèn)題。它循環(huán)協(xié)程隊(duì)列,使用 send 方法對(duì)每個(gè)協(xié)程依次遞進(jìn),如果有協(xié)程已經(jīng)完成則將其移出隊(duì)列,當(dāng)列表中的協(xié)程全部完成時(shí)結(jié)束。
然后通過(guò) args=coro.send(None) 與該函數(shù)碰撞,得到含有 delay 參數(shù)的字典作為 send 的返回值。便可以判斷出是否調(diào)用調(diào)度器的睡眠機(jī)制。
最后在調(diào)度器中實(shí)現(xiàn)每一次協(xié)程列表循環(huán)結(jié)束后判斷在睡眠列表中的協(xié)程是否有到時(shí)間的,到時(shí)間或時(shí)間超出則添加到運(yùn)行協(xié)程列表中進(jìn)入循環(huán)執(zhí)行。如果運(yùn)行列表中的協(xié)程都執(zhí)行完了,則查看睡眠列表中的協(xié)程中還需睡眠的最少時(shí)間,線程睡眠,睡眠完成再將其添加到運(yùn)行隊(duì)列。
該裝飾器能將一個(gè)比較耗時(shí)的計(jì)算函數(shù)封裝為一個(gè)協(xié)程,使其可以被其他協(xié)程 await 。在調(diào)度器中利用 send 函數(shù)的返回值可以獲取它的類型為 background 、函數(shù)入口地址以及函數(shù)的傳參,然后在調(diào)度器中按相應(yīng)機(jī)制執(zhí)行。
第二部分 是在調(diào)度器中的修改:我們讓調(diào)度器類擁有了一個(gè)私有的 concurrent.futures.ThreadPoolExecutor() 對(duì)象。并在運(yùn)行協(xié)程隊(duì)列的循環(huán)判斷中將 background 類型的操作提交給線程池對(duì)象,并將當(dāng)前的協(xié)程移出運(yùn)行隊(duì)列,添加到 futures 隊(duì)列中。然后在每次運(yùn)行隊(duì)列循環(huán)后判斷 futures 中的任務(wù)是否有完成的(使用的參數(shù)為一旦有任一任務(wù)完成或被取消都返回),如果主線程此時(shí)處于將要睡眠的狀態(tài),就等待相應(yīng)的時(shí)間,沒(méi)有的話則立刻返回,下次再查詢,完成的任務(wù)將其所在協(xié)程帶入運(yùn)行隊(duì)列,任務(wù)結(jié)果通過(guò)調(diào)度器 send 傳回該協(xié)程。
yield相當(dāng)于return,他將相應(yīng)的值返回給調(diào)用next()或者send()的調(diào)用者,從而交出了CPU使用權(quán),而當(dāng)調(diào)用者再次調(diào)用next()或者send()的時(shí)候,又會(huì)返回到y(tǒng)ield中斷的地方,如果send有參數(shù),還會(huì)將參數(shù)返回給yield賦值的變量,如果沒(méi)有就和next()一樣賦值為None。但是這里會(huì)遇到一個(gè)問(wèn)題,就是嵌套使用generator時(shí)外層的generator需要寫大量代碼,看如下示例:?
注意以下代碼均在Python3.6上運(yùn)行調(diào)試
#!/usr/bin/env python# encoding:utf-8def inner_generator():
i = 0
while True:
i = yield i ? ? ? ?if i 10: ? ? ? ? ? ?raise StopIterationdef outer_generator():
print("do something before yield")
from_inner = 0
from_outer = 1
g = inner_generator()
g.send(None) ? ?while 1: ? ? ? ?try:
from_inner = g.send(from_outer)
from_outer = yield from_inner ? ? ? ?except StopIteration: ? ? ? ? ? ?breakdef main():
g = outer_generator()
g.send(None)
i = 0
while 1: ? ? ? ?try:
i = g.send(i + 1)
print(i) ? ? ? ?except StopIteration: ? ? ? ? ? ?breakif __name__ == '__main__':
main()1234567891011121314151617181920212223242526272829303132333435363738394041
為了簡(jiǎn)化,在Python3.3中引入了yield from
yield from
使用yield from有兩個(gè)好處,
1、可以將main中send的參數(shù)一直返回給最里層的generator,?
2、同時(shí)我們也不需要再使用while循環(huán)和send (), next()來(lái)進(jìn)行迭代。
我們可以將上邊的代碼修改如下:
def inner_generator():
i = 0
while True:
i = yield i ? ? ? ?if i 10: ? ? ? ? ? ?raise StopIterationdef outer_generator():
print("do something before coroutine start") ? ?yield from inner_generator()def main():
g = outer_generator()
g.send(None)
i = 0
while 1: ? ? ? ?try:
i = g.send(i + 1)
print(i) ? ? ? ?except StopIteration: ? ? ? ? ? ?breakif __name__ == '__main__':
main()1234567891011121314151617181920212223242526
執(zhí)行結(jié)果如下:
do something before coroutine start123456789101234567891011
這里inner_generator()中執(zhí)行的代碼片段我們實(shí)際就可以認(rèn)為是協(xié)程,所以總的來(lái)說(shuō)邏輯圖如下:?
接下來(lái)我們就看下究竟協(xié)程是啥樣子
協(xié)程coroutine
協(xié)程的概念應(yīng)該是從進(jìn)程和線程演變而來(lái)的,他們都是獨(dú)立的執(zhí)行一段代碼,但是不同是線程比進(jìn)程要輕量級(jí),協(xié)程比線程還要輕量級(jí)。多線程在同一個(gè)進(jìn)程中執(zhí)行,而協(xié)程通常也是在一個(gè)線程當(dāng)中執(zhí)行。它們的關(guān)系圖如下:
我們都知道Python由于GIL(Global Interpreter Lock)原因,其線程效率并不高,并且在*nix系統(tǒng)中,創(chuàng)建線程的開(kāi)銷并不比進(jìn)程小,因此在并發(fā)操作時(shí),多線程的效率還是受到了很大制約的。所以后來(lái)人們發(fā)現(xiàn)通過(guò)yield來(lái)中斷代碼片段的執(zhí)行,同時(shí)交出了cpu的使用權(quán),于是協(xié)程的概念產(chǎn)生了。在Python3.4正式引入了協(xié)程的概念,代碼示例如下:
import asyncio# Borrowed from countdown(number, n):
while n 0:
print('T-minus', n, '({})'.format(number)) ? ? ? ?yield from asyncio.sleep(1)
n -= 1loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()12345678910111213141516
示例顯示了在Python3.4引入兩個(gè)重要概念協(xié)程和事件循環(huán),?
通過(guò)修飾符@asyncio.coroutine定義了一個(gè)協(xié)程,而通過(guò)event loop來(lái)執(zhí)行tasks中所有的協(xié)程任務(wù)。之后在Python3.5引入了新的async await語(yǔ)法,從而有了原生協(xié)程的概念。
async await
在Python3.5中,引入了ayncawait 語(yǔ)法結(jié)構(gòu),通過(guò)”aync def”可以定義一個(gè)協(xié)程代碼片段,作用類似于Python3.4中的@asyncio.coroutine修飾符,而await則相當(dāng)于”yield from”。
先來(lái)看一段代碼,這個(gè)是我剛開(kāi)始使用asyncawait語(yǔ)法時(shí),寫的一段小程序。
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def wait_download(url):
response = await requets.get(url)
print("get {} response complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download(""),
wait_download(""),
wait_download("")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())12345678910111213141516171819202122232425
這里會(huì)收到這樣的報(bào)錯(cuò):
Task exception was never retrieved
future: Task finished coro=wait_download() done, defined at asynctest.py:9 exception=TypeError("object Response can't be used in 'await' expression",)
Traceback (most recent call last):
File "asynctest.py", line 10, in wait_download
data = await requests.get(url)
TypeError: object Response can't be used in 'await' expression123456
這是由于requests.get()函數(shù)返回的Response對(duì)象不能用于await表達(dá)式,可是如果不能用于await,還怎么樣來(lái)實(shí)現(xiàn)異步呢??
原來(lái)Python的await表達(dá)式是類似于”yield from”的東西,但是await會(huì)去做參數(shù)檢查,它要求await表達(dá)式中的對(duì)象必須是awaitable的,那啥是awaitable呢? awaitable對(duì)象必須滿足如下條件中其中之一:
1、A native coroutine object returned from a native coroutine function .
原生協(xié)程對(duì)象
2、A generator-based coroutine object returned from a function decorated with types.coroutine() .
types.coroutine()修飾的基于生成器的協(xié)程對(duì)象,注意不是Python3.4中asyncio.coroutine
3、An object with an await method returning an iterator.
實(shí)現(xiàn)了await method,并在其中返回了iterator的對(duì)象
根據(jù)這些條件定義,我們可以修改代碼如下:
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def download(url): # 通過(guò)async def定義的函數(shù)是原生的協(xié)程對(duì)象
response = requests.get(url)
print(response.text)
async def wait_download(url):
await download(url) # 這里download(url)就是一個(gè)原生的協(xié)程對(duì)象
print("get {} data complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download(""),
wait_download(""),
wait_download("")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())123456789101112131415161718192021222324252627282930
好了現(xiàn)在一個(gè)真正的實(shí)現(xiàn)了異步編程的小程序終于誕生了。?
而目前更牛逼的異步是使用uvloop或者pyuv,這兩個(gè)最新的Python庫(kù)都是libuv實(shí)現(xiàn)的,可以提供更加高效的event loop。
uvloop和pyuv
pyuv實(shí)現(xiàn)了Python2.x和3.x,但是該項(xiàng)目在github上已經(jīng)許久沒(méi)有更新了,不知道是否還有人在維護(hù)。?
uvloop只實(shí)現(xiàn)了3.x, 但是該項(xiàng)目在github上始終活躍。
它們的使用也非常簡(jiǎn)單,以u(píng)vloop為例,只需要添加以下代碼就可以了
import asyncioimport uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())123