自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Python 異步協(xié)程:從 async/await 到 asyncio 再到 async with

開發(fā) 前端
異步編程不是萬能的,但在處理 I/O 密集型任務(wù)時(shí)確實(shí)能帶來顯著的性能提升。合理使用這些工具,能讓我們的程序更高效、更優(yōu)雅。

在 Python 3.8 以后的版本中,異步編程變得越來越重要。本文將系統(tǒng)介紹 Python 標(biāo)準(zhǔn)庫中的異步編程工具,帶領(lǐng)大家掌握 async/await 語法和 asyncio 的使用。

從一個(gè)簡單的場景開始

假設(shè)我們在處理一些耗時(shí)的 I/O 操作,比如讀取多個(gè)文件或處理多個(gè)數(shù)據(jù)。為了模擬這種場景,我們先用 time.sleep() 來代表耗時(shí)操作:

import time
import random

def process_item(item):
    # 模擬耗時(shí)操作
    print(f"處理中:{item}")
    process_time = random.uniform(0.5, 2.0)
    time.sleep(process_time)
    return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"

def process_all_items():
    items = ["任務(wù)A", "任務(wù)B", "任務(wù)C", "任務(wù)D"]
    results = []
    for item in items:
        result = process_item(item)
        results.append(result)
    return results

if __name__ == "__main__":
    start = time.time()
    results = process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"總耗時(shí):{end - start:.2f} 秒")
處理中:任務(wù)A
處理中:任務(wù)B
處理中:任務(wù)C
處理中:任務(wù)D
處理完成:任務(wù)A,耗時(shí) 1.97 秒
處理完成:任務(wù)B,耗時(shí) 1.28 秒
處理完成:任務(wù)C,耗時(shí) 0.66 秒
處理完成:任務(wù)D,耗時(shí) 1.80 秒
總耗時(shí):5.72 秒

這段代碼的問題很明顯:每個(gè)任務(wù)都必須等待前一個(gè)任務(wù)完成才能開始。如果有4個(gè)任務(wù),每個(gè)任務(wù)平均耗時(shí)1秒,那么總耗時(shí)就接近4秒。

認(rèn)識(shí) async/await

Python 引入了 async/await 語法來支持異步編程。當(dāng)我們在函數(shù)定義前加上 async 關(guān)鍵字時(shí),這個(gè)函數(shù)就變成了一個(gè)"協(xié)程"(coroutine)。而 await 關(guān)鍵字則用于等待一個(gè)協(xié)程完成。讓我們改寫上面的代碼:

import asyncio
import random
import time

async def process_item(item):
    print(f"處理中:{item}")
    # async 定義的函數(shù)變成了協(xié)程
    process_time = random.uniform(0.5, 2.0)
    # time.sleep() 換成 asyncio.sleep()
    await asyncio.sleep(process_time)  # await 等待異步操作完成
    return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"

async def process_all_items():
    items = ["任務(wù)A", "任務(wù)B", "任務(wù)C", "任務(wù)D"]
    # 創(chuàng)建任務(wù)列表
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    print("開始處理")
    results = await asyncio.gather(*tasks)
    return results

async def main():
    start = time.time()
    results = await process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"總耗時(shí):{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())
開始處理
處理中:任務(wù)A
處理中:任務(wù)B
處理中:任務(wù)C
處理中:任務(wù)D
處理完成:任務(wù)A,耗時(shí) 1.97 秒
處理完成:任務(wù)B,耗時(shí) 0.80 秒
處理完成:任務(wù)C,耗時(shí) 0.83 秒
處理完成:任務(wù)D,耗時(shí) 1.46 秒
總耗時(shí):1.97 秒

讓我們詳細(xì)解釋這段代碼的執(zhí)行過程:

  1. 當(dāng)函數(shù)被 async 關(guān)鍵字修飾后,調(diào)用該函數(shù)不會(huì)直接執(zhí)行函數(shù)體,而是返回一個(gè)協(xié)程對象
  2. await 關(guān)鍵字只能在 async 函數(shù)內(nèi)使用,它表示"等待這個(gè)操作完成后再繼續(xù)"
  3. asyncio.create_task() 將協(xié)程包裝成一個(gè)任務(wù),該任務(wù)會(huì)被事件循環(huán)調(diào)度執(zhí)行
  4. asyncio.gather() 并發(fā)運(yùn)行多個(gè)任務(wù),并等待它們?nèi)客瓿?/li>
  5. asyncio.run() 創(chuàng)建事件循環(huán),運(yùn)行 main() 協(xié)程,直到它完成

使用 asyncio.wait_for 添加超時(shí)控制

在實(shí)際應(yīng)用中,我們往往需要為異步操作設(shè)置超時(shí)時(shí)間:

import asyncio
import random
import time

async def process_item(item):
    process_time = random.uniform(0.5, 2.0)
    try:
        # 設(shè)置1秒超時(shí)
        await asyncio.wait_for(
            asyncio.sleep(process_time),
            timeout=1.0
        )
        return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"
    except asyncio.TimeoutError:
        return f"處理超時(shí):{item}"

async def main():
    items = ["任務(wù)A", "任務(wù)B", "任務(wù)C", "任務(wù)D"]
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    end = time.time()
    
    print("\n".join(results))
    print(f"總耗時(shí):{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())
處理超時(shí):任務(wù)A
處理完成:任務(wù)B,耗時(shí) 0.94 秒
處理超時(shí):任務(wù)C
處理完成:任務(wù)D,耗時(shí) 0.78 秒
總耗時(shí):1.00 秒

使用異步上下文管理器

Python 中的 with 語句可以用于資源管理,類似地,異步編程中我們可以使用 async with 。一個(gè)類要支持異步上下文管理,需要實(shí)現(xiàn) __aenter__ 和 __aexit__ 方法:

import asyncio
import random

class AsyncResource:
    async def __aenter__(self):
        # 異步初始化資源
        print("正在初始化資源...")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 異步清理資源
        print("正在清理資源...")
        await asyncio.sleep(0.1)
    
    async def process(self, item):
        # 異步處理任務(wù)
        print(f"正在處理任務(wù):{item}")
        process_time = random.uniform(0.5, 2.0)
        await asyncio.sleep(process_time)
        return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"

async def main():
    items = ["任務(wù)A", "任務(wù)B", "任務(wù)C"]
    
    async with AsyncResource() as resource:
        tasks = [
            asyncio.create_task(resource.process(item))
            for item in items
        ]
        results = await asyncio.gather(*tasks)
    
    print("\n".join(results))

if __name__ == "__main__":
    asyncio.run(main())
正在初始化資源...
正在處理任務(wù):任務(wù)A
正在處理任務(wù):任務(wù)B
正在處理任務(wù):任務(wù)C
正在清理資源...
處理完成:任務(wù)A,耗時(shí) 1.31 秒
處理完成:任務(wù)B,耗時(shí) 0.77 秒
處理完成:任務(wù)C,耗時(shí) 0.84 秒

使用事件循環(huán)執(zhí)行阻塞操作 run_in_executor

在異步編程中,我們可能會(huì)遇到一些無法避免的阻塞操作(比如調(diào)用傳統(tǒng)的同步API)。這時(shí),asyncio.get_running_loop() 和 run_in_executor 就顯得特別重要:

import asyncio
import time
import requests  # 一個(gè)同步的HTTP客戶端庫

async def blocking_operation():
    # 獲取當(dāng)前事件循環(huán)
    loop = asyncio.get_running_loop()

    # 在線程池中執(zhí)行阻塞操作
    result = await loop.run_in_executor(
        None,  # 使用默認(rèn)的線程池執(zhí)行器
        requests.get,  # 要執(zhí)行的阻塞函數(shù)
        'http://httpbin.org/delay/1'  # 函數(shù)參數(shù)
    )
    return result.status_code

async def non_blocking_operation():
    await asyncio.sleep(1)
    return "非阻塞操作完成"

async def main():
    # 同時(shí)執(zhí)行阻塞和非阻塞操作
    tasks = [
        asyncio.create_task(blocking_operation()),
        asyncio.create_task(non_blocking_operation())
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks)
    end = time.time()
    
    print(f"操作結(jié)果:{results}")
    print(f"總耗時(shí):{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())

輸出:

操作結(jié)果:[200, '非阻塞操作完成']
總耗時(shí):1.99 秒

這個(gè)例子展示了如何在異步程序中優(yōu)雅地處理同步操作。如果不使用 run_in_executor,阻塞操作會(huì)阻塞整個(gè)事件循環(huán),導(dǎo)致其他任務(wù)無法執(zhí)行:

  • requests.get() 是同步操作,會(huì)阻塞當(dāng)前線程
  • 事件循環(huán)運(yùn)行在主線程上
  • 如果直接在協(xié)程中調(diào)用 requests.get() ,整個(gè)事件循環(huán)都會(huì)被阻塞
  • 其他任務(wù)無法在這期間執(zhí)行
  • run_in_executor 會(huì)將阻塞操作放到另一個(gè)線程中執(zhí)行
  • 主線程的事件循環(huán)可以繼續(xù)處理其他任務(wù)
  • 當(dāng)線程池中的操作完成時(shí),結(jié)果會(huì)被返回給事件循環(huán)

最佳實(shí)踐是:

  • 盡量使用原生支持異步的庫(如 aiohttp)
  • 如果必須使用同步庫,就用 run_in_executor
  • 對于 CPU 密集型任務(wù)也可以用 run_in_executor 放到進(jìn)程池中執(zhí)行

任務(wù)取消:優(yōu)雅地終止異步操作

有時(shí)我們需要取消正在執(zhí)行的異步任務(wù),比如用戶中斷操作或超時(shí)處理:

import asyncio
import random

async def long_operation(name):
    try:
        print(f"{name} 開始執(zhí)行")
        while True:  # 模擬一個(gè)持續(xù)運(yùn)行的操作
            await asyncio.sleep(0.5)
            print(f"{name} 正在執(zhí)行...")
    except asyncio.CancelledError:
        print(f"{name} 被取消了")
        raise  # 重要:繼續(xù)傳播取消信號(hào)

async def main():
    # 創(chuàng)建三個(gè)任務(wù)
    task1 = asyncio.create_task(long_operation("任務(wù)1"))
    task2 = asyncio.create_task(long_operation("任務(wù)2"))
    task3 = asyncio.create_task(long_operation("任務(wù)3"))
    
    # 等待1秒后取消task1
    await asyncio.sleep(1)
    task1.cancel()
    
    # 等待2秒后取消其余任務(wù)
    await asyncio.sleep(1)
    task2.cancel()
    task3.cancel()
    
    try:
        # 等待所有任務(wù)完成或被取消
        await asyncio.gather(task1, task2, task3, return_exceptions=True)
    except asyncio.CancelledError:
        print("某個(gè)任務(wù)被取消了")

if __name__ == "__main__":
    asyncio.run(main())

輸出:

任務(wù)1 開始執(zhí)行
任務(wù)2 開始執(zhí)行
任務(wù)3 開始執(zhí)行
任務(wù)1 正在執(zhí)行...
任務(wù)2 正在執(zhí)行...
任務(wù)3 正在執(zhí)行...
任務(wù)1 被取消了
任務(wù)2 正在執(zhí)行...
任務(wù)3 正在執(zhí)行...
任務(wù)2 正在執(zhí)行...
任務(wù)3 正在執(zhí)行...
任務(wù)2 被取消了
任務(wù)3 被取消了

這個(gè)例子展示了如何正確處理任務(wù)取消:

  1. 任務(wù)可以在執(zhí)行過程中被取消
  2. 被取消的任務(wù)會(huì)拋出 CancelledError
  3. 我們應(yīng)該適當(dāng)處理取消信號(hào),確保資源被正確清理

深入理解協(xié)程:為什么需要 async/await?

協(xié)程(Coroutine)是一種特殊的函數(shù),它可以在執(zhí)行過程中暫停,并在之后從暫停的地方繼續(xù)執(zhí)行。當(dāng)我們使用 async 定義一個(gè)函數(shù)時(shí),我們實(shí)際上是在定義一個(gè)協(xié)程:

import asyncio

# 這是一個(gè)普通函數(shù)
def normal_function():
    return "Hello"

# 這是一個(gè)協(xié)程
async def coroutine_function():
    await asyncio.sleep(1)
    return "Hello"

# 讓我們看看它們的區(qū)別
print(normal_function)      # <function normal_function at 0x1052cc040>
print(coroutine_function)   # <function coroutine_function at 0x1054b9790>

# 調(diào)用它們的結(jié)果不同
print(normal_function())    # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>

await 如何與事件循環(huán)協(xié)作

協(xié)程(Coroutine)的核心在于它可以在執(zhí)行過程中主動(dòng)交出控制權(quán),讓其他代碼有機(jī)會(huì)執(zhí)行。讓我們通過一個(gè)詳細(xì)的例子來理解這個(gè)過程:

import asyncio

async def task1():
    print("任務(wù)1:開始")
    print("任務(wù)1:準(zhǔn)備休眠")
    await asyncio.sleep(2)  # 關(guān)鍵點(diǎn)1:交出控制權(quán)
    print("任務(wù)1:休眠結(jié)束")

async def task2():
    print("任務(wù)2:開始")
    print("任務(wù)2:準(zhǔn)備休眠")
    await asyncio.sleep(1)  # 關(guān)鍵點(diǎn)2:交出控制權(quán)
    print("任務(wù)2:休眠結(jié)束")

async def main():
    # 同時(shí)執(zhí)行兩個(gè)任務(wù)
    await asyncio.gather(task1(), task2())

asyncio.run(main())

這段代碼的輸出會(huì)是:

任務(wù)1:開始
任務(wù)1:準(zhǔn)備休眠
任務(wù)2:開始
任務(wù)2:準(zhǔn)備休眠
任務(wù)2:休眠結(jié)束    # 1秒后
任務(wù)1:休眠結(jié)束    # 2秒后

讓我們詳細(xì)解釋執(zhí)行過程:

  • 當(dāng)程序遇到 await asyncio.sleep(2) 時(shí):

這個(gè) sleep 操作被注冊到事件循環(huán)中

Python 記錄當(dāng)前的執(zhí)行位置

task1 主動(dòng)交出控制權(quán)

重要:task1 并沒有停止運(yùn)行,而是被暫停了,等待之后恢復(fù)

  • 事件循環(huán)接管控制權(quán)后:

尋找其他可以執(zhí)行的協(xié)程(這里是 task2)

開始執(zhí)行 task2,直到遇到 await asyncio.sleep(1)

task2 也交出控制權(quán),被暫停

  • 事件循環(huán)繼續(xù)工作:

管理一個(gè)計(jì)時(shí)器,追蹤這兩個(gè) sleep 操作

1秒后,發(fā)現(xiàn) task2 的 sleep 時(shí)間到了

恢復(fù) task2 的執(zhí)行,打印"任務(wù)2:休眠結(jié)束"

2秒到時(shí),恢復(fù) task1 的執(zhí)行,打印"任務(wù)1:休眠結(jié)束"

這就像是一個(gè)指揮家(事件循環(huán))在指揮一個(gè)管弦樂隊(duì)(多個(gè)協(xié)程):

  • 當(dāng)某個(gè)樂器(協(xié)程)需要休息時(shí),它舉手示意(await)
  • 指揮家看到后,立即指揮其他樂器演奏
  • 當(dāng)休息時(shí)間到了,指揮家會(huì)示意這個(gè)樂器繼續(xù)演奏

代碼驗(yàn)證:

import asyncio
import time

async def report_time(name, sleep_time):
    print(f"{time.strftime('%H:%M:%S')} - {name}開始")
    await asyncio.sleep(sleep_time)
    print(f"{time.strftime('%H:%M:%S')} - {name}結(jié)束")

async def main():
    # 同時(shí)執(zhí)行多個(gè)任務(wù)
    await asyncio.gather(
        report_time("任務(wù)A", 2),
        report_time("任務(wù)B", 1),
        report_time("任務(wù)C", 3)
    )

asyncio.run(main())

輸出:

00:19:26 - 任務(wù)A開始
00:19:26 - 任務(wù)B開始
00:19:26 - 任務(wù)C開始
00:19:27 - 任務(wù)B結(jié)束
00:19:28 - 任務(wù)A結(jié)束
00:19:29 - 任務(wù)C結(jié)束

這種機(jī)制的優(yōu)勢在于:

  1. 單線程執(zhí)行,沒有線程切換開銷
  2. 協(xié)程主動(dòng)交出控制權(quán),而不是被操作系統(tǒng)強(qiáng)制切換
  3. 比起回調(diào)地獄,代碼更清晰易讀
  4. 錯(cuò)誤處理更直觀,可以使用普通的 try/except

理解了這個(gè)機(jī)制,我們就能更好地使用異步編程:

  • 在 await 的時(shí)候,其他協(xié)程有機(jī)會(huì)執(zhí)行
  • 耗時(shí)操作應(yīng)該是真正的異步操作(比如 asyncio.sleep )
  • 不要在協(xié)程中使用阻塞操作,那樣會(huì)卡住整個(gè)事件循環(huán)

小結(jié)

Python 的異步編程主要依賴以下概念:

  1. async/await 語法:定義和等待協(xié)程
  2. asyncio 模塊:提供事件循環(huán)和任務(wù)調(diào)度
  3. Task 對象:表示待執(zhí)行的工作單元
  4. 異步上下文管理器:管理異步資源

使用異步編程的關(guān)鍵點(diǎn):

  1. I/O 密集型任務(wù)最適合使用異步編程
  2. 所有耗時(shí)操作都應(yīng)該是真正的異步操作
  3. 注意處理超時(shí)和異常情況
  4. 合理使用 asyncio.gather() 和 asyncio.wait_for()

異步編程不是萬能的,但在處理 I/O 密集型任務(wù)時(shí)確實(shí)能帶來顯著的性能提升。合理使用這些工具,能讓我們的程序更高效、更優(yōu)雅。

責(zé)任編輯:武曉燕 來源: Piper蛋窩
相關(guān)推薦

2017-08-02 14:17:08

前端asyncawait

2014-07-15 10:08:42

異步編程In .NET

2021-06-28 08:10:59

JavaScript異步編程

2014-07-15 10:31:07

asyncawait

2016-11-22 11:08:34

asyncjavascript

2012-07-22 15:59:42

Silverlight

2021-07-20 10:26:12

JavaScriptasyncawait

2023-10-08 10:21:11

JavaScriptAsync

2022-08-27 13:49:36

ES7promiseresolve

2023-07-28 07:31:52

JavaScriptasyncawait

2024-12-30 08:22:35

2021-06-28 07:27:43

AwaitAsync語法

2023-04-14 08:10:59

asyncawait

2021-02-09 09:53:11

C#多線程異步

2024-06-25 08:33:48

2022-06-13 07:36:47

useEffectHooks

2021-06-15 05:36:45

Gulpawaitasync

2022-06-16 10:37:09

asyncawait

2017-04-10 15:57:10

AsyncAwaitPromise

2017-04-19 08:47:42

AsyncJavascript異步代碼
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)