真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Python混合如何使用同步和異步函數(shù)

這篇文章主要介紹“Python混合如何使用同步和異步函數(shù)”,在日常操作中,相信很多人在Python混合如何使用同步和異步函數(shù)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Python混合如何使用同步和異步函數(shù)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

在滁州等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、網(wǎng)站建設(shè) 網(wǎng)站設(shè)計制作按需定制開發(fā),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,全網(wǎng)營銷推廣,成都外貿(mào)網(wǎng)站制作,滁州網(wǎng)站建設(shè)費用合理。

    在協(xié)程函數(shù)中調(diào)用同步函數(shù)

    在協(xié)程函數(shù)中直接調(diào)用同步函數(shù)會阻塞事件循環(huán),從而影響整個程序的性能。我們先來看一個例子:

    以下是使用異步 Web 框架 FastAPI 寫的一個例子,F(xiàn)astAPI 是比較快,但不正確的操作將會變得很慢。

    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        time.sleep(10)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    上面我們寫了兩個接口,假設(shè) root 接口函數(shù)耗時 10 秒,在這 10 秒內(nèi)訪問 health 接口,想一想會發(fā)生什么?

    Python混合如何使用同步和異步函數(shù)

    訪問 root 接口(左),立即訪問 health 接口(右),health 接口被阻塞,直至 root 接口返回后,health 接口才成功響應(yīng)。

    time.sleep 就是一個「同步」函數(shù),它會阻塞整個事件循環(huán)。

    如何解決呢?想一想以前的處理方法,如果一個函數(shù)會阻塞主線程,那么就再開一個線程讓這個阻塞函數(shù)單獨運行。所以,這里也是同理,開一個線程單獨去運行那些阻塞式操作,比如讀取文件等。

    loop.run_in_executor 方法將同步函數(shù)轉(zhuǎn)換為異步非阻塞方式進行處理。具體來說,loop.run_in_executor() 可以將同步函數(shù)創(chuàng)建為一個線程進程,并在其中執(zhí)行該函數(shù),從而避免阻塞事件循環(huán)。

    官方例子:在線程或者進程池中執(zhí)行代碼。

    那么,我們使用 loop.run_in_executor 改寫上面例子,如下:

    import asyncio
    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        loop = asyncio.get_event_loop()
    
        def do_blocking_work():
            time.sleep(10)
            print("Done blocking work!!")
    
        await loop.run_in_executor(None, do_blocking_work)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    效果如下:

    Python混合如何使用同步和異步函數(shù)

    root 接口被阻塞期間,health 依然正常訪問互不影響。

    注意:這里都是為了演示,實際在使用 FastAPI 開發(fā)時,你可以直接將 async def root 更換成 def root ,也就是將其換成同步接口函數(shù),F(xiàn)astAPI 內(nèi)部會自動創(chuàng)建線程處理這個同步接口函數(shù)??偟膩碚f,F(xiàn)astAPI 內(nèi)部也是依靠線程去處理同步函數(shù)從而避免阻塞主線程(或主線程中的事件循環(huán))。

    在同步函數(shù)中調(diào)用異步函數(shù)

    協(xié)程只能在「事件循環(huán)」內(nèi)被執(zhí)行,且同一時刻只能有一個協(xié)程被執(zhí)行。

    所以,在同步函數(shù)中調(diào)用異步函數(shù),其本質(zhì)就是將協(xié)程「扔進」事件循環(huán)中,等待該協(xié)程執(zhí)行完獲取結(jié)果即可。

    以下這些函數(shù),都可以實現(xiàn)這個效果:

    • asyncio.run

    • asyncio.run_coroutine_threadsafe

    • loop.run_until_complete

    • create_task

    接下來,我們將一一講解這些方法并舉例說明。

    asyncio.run

    這個方法使用起來最簡單,先看下如何使用,然后緊跟著講一下哪些場景不能直接使用 asyncio.run

    import asyncio
    
    async def do_work():
        return 1
    
    def main():
        result = asyncio.run(do_work())
        print(result)  # 1
    
    if __name__ == "__main__":
        main()

    直接 run 就完事了,然后接受返回值即可。

    但是需要,注意的是 asyncio.run 每次調(diào)用都會新開一個事件循環(huán),當(dāng)結(jié)束時自動關(guān)閉該事件循環(huán)。

    一個線程內(nèi)只存在一個事件循環(huán),所以如果當(dāng)前線程已經(jīng)有存在的事件循環(huán)了,就不應(yīng)該使用 asyncio.run 了,否則就會拋出如下異常:

    RuntimeError: asyncio.run() cannot be called from a running event loop

    因此,asyncio.run 用作新開一個事件循環(huán)時使用。

    asyncio.run_coroutine_threadsafe

    向指定事件循環(huán)提交一個協(xié)程。(線程安全)
    返回一個 concurrent.futures.Future 以等待來自其他 OS 線程的結(jié)果。

    換句話說,就是將協(xié)程丟給其他線程中的事件循環(huán)去運行。

    值得注意的是這里的「事件循環(huán)」應(yīng)該是其他線程中的事件循環(huán),非當(dāng)前線程的事件循環(huán)。

    其返回的結(jié)果是一個 future 對象,如果你需要獲取協(xié)程的執(zhí)行結(jié)果可以使用 future.result() 獲取

    下方給了一個例子,一共有兩個線程:thread_with_loopanother_thread,分別用于啟動事件循環(huán)和調(diào)用 run_coroutine_threadsafe

    import asyncio
    import threading
    import time
    
    loop = None
    
    
    def get_loop():
        global loop
        if loop is None:
            loop = asyncio.new_event_loop()
        return loop
    
    
    def another_thread():
        async def coro_func():
            return 1
    
        loop = get_loop()
        # 將協(xié)程提交到另一個線程的事件循環(huán)中執(zhí)行
        future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
        # 等待協(xié)程執(zhí)行結(jié)果
        print(future.result())
        # 停止事件循環(huán)
        loop.call_soon_threadsafe(loop.stop)
    
    
    def thread_with_loop():
        loop = get_loop()
        # 啟動事件循環(huán),確保事件循環(huán)不會退出,直到 loop.stop() 被調(diào)用
        loop.run_forever()
        loop.close()
    
    
    # 啟動一個線程,線程內(nèi)部啟動了一個事件循環(huán)
    threading.Thread(target=thread_with_loop).start()
    time.sleep(1)
    # 在主線程中啟動一個協(xié)程, 并將協(xié)程提交到另一個線程的事件循環(huán)中執(zhí)行
    t = threading.Thread(target=another_thread)
    t.start()
    t.join()
    loop.run_until_complete

    運行直到 future ( Future 的實例 ) 被完成。

    這個方法和 asyncio.run 類似。

    具體就是傳入一個協(xié)程對象或者任務(wù),然后可以直接拿到協(xié)程的返回值。

    run_until_complete 屬于 loop 對象的方法,所以這個方法的使用前提是有一個事件循環(huán),注意這個事件循環(huán)必須是非運行狀態(tài),如果是運行中就會拋出如下異常:

    RuntimeError: This event loop is already running

    例子:

    loop = asyncio.new_event_loop()
    loop.run_until_complete(do_async_work())
    create_task

    再次準確一點:要運行一個協(xié)程函數(shù)的本質(zhì)是將攜帶協(xié)程函數(shù)的任務(wù)提交至事件循環(huán)中,由事件循環(huán)發(fā)現(xiàn)、調(diào)度并執(zhí)行。

    其實一共就是滿足兩個條件:

    • 任務(wù);

    • 事件循環(huán)。

    我們使用 async def func 定義的函數(shù)叫做協(xié)程函數(shù),func() 這樣調(diào)用之后返回的結(jié)果是協(xié)程對象,到這一步協(xié)程函數(shù)內(nèi)的代碼都沒有被執(zhí)行,直到協(xié)程對象被包裝成了任務(wù),事件循環(huán)才會“正眼看它們”。

    所以事件循環(huán)調(diào)度運行的基本單元就是任務(wù),那為什么我們在使用 async/await 這些語句時沒有涉及到任務(wù)這個概念呢?

    這是因為 await 語法糖在內(nèi)部將協(xié)程對象封裝成了任務(wù),再次強調(diào)事件循環(huán)只認識任務(wù)。

    所以,想要運行一個協(xié)程對象,其實就是將協(xié)程對象封裝成一個任務(wù),至于事件循環(huán)是如何發(fā)現(xiàn)、調(diào)度和執(zhí)行的,這個我們不用關(guān)心。

    那將協(xié)程封裝成的任務(wù)的方法有哪些呢?

    • asyncio.create_task

    • asyncio.ensure_future

    • loop.create_task

    看著有好幾個的,沒關(guān)系,我們只關(guān)心 loop.create_task,因為其他方法最終都是調(diào)用 loop.create_task。

    使用起來也是很簡單的,將協(xié)程對象傳入,返回值是一個任務(wù)對象。

    async def do_work():
        return 222
    
    task = loop.create_task(do_work())

    do_work 會被異步執(zhí)行,那么 do_work 的結(jié)果怎么獲取呢,task.result() 可以嗎?

    分情況:

    • 如果是在一個協(xié)程函數(shù)內(nèi)使用 await task.result(),這是可以的;

    • 如果是在普通函數(shù)內(nèi)則不行。你不可能立即獲得協(xié)程函數(shù)的返回值,因為協(xié)程函數(shù)還沒有被執(zhí)行呢。

    asyncio.Task 運行使用 add_done_callback 添加完成時的回調(diào)函數(shù),所以我們可以「曲線救國」,使用回調(diào)函數(shù)將結(jié)果添加到隊列、Future 等等。

    我這里給個基于 concurrent.futures.Future 獲取結(jié)果的例子,如下:

    import asyncio
    from asyncio import Task
    from concurrent.futures import Future
    
    from fastapi import FastAPI
    
    app = FastAPI()
    loop = asyncio.get_event_loop()
    
    
    async def do_work1():
        return 222
    
    
    @app.get("/")
    def root():
        # 新建一個 future 對象,用于接受結(jié)果值
        future = Future()
    
        # 提交任務(wù)至事件循環(huán)
        task = loop.create_task(do_work1())
    
        # 回調(diào)函數(shù)
        def done_callback(task: Task):
            # 設(shè)置結(jié)果
            future.set_result(task.result())
    
        # 為這個任務(wù)添加回調(diào)函數(shù)
        task.add_done_callback(done_callback)
    
        # future.result 會被阻塞,直到有結(jié)果返回為止
        return future.result()  # 222

    到此,關(guān)于“Python混合如何使用同步和異步函數(shù)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
    當(dāng)前題目:Python混合如何使用同步和異步函數(shù)
    分享URL:http://weahome.cn/article/ggeheg.html

    其他資訊

    在線咨詢

    微信咨詢

    電話咨詢

    028-86922220(工作日)

    18980820575(7×24)

    提交需求

    返回頂部