Python 異步協(xié)程:從 async/await 到 asyncio 再到 async with
在 Python 3.8 以后的版本中,異步編程變得越來越重要。本文將系統(tǒng)介紹 Python 標(biāo)準(zhǔn)庫中的異步編程工具,帶領(lǐng)大家掌握 async/await 語法和 asyncio 的使用。
從一個(gè)簡單的場景開始
假設(shè)我們在處理一些耗時(shí)的 I/O 操作,比如讀取多個(gè)文件或處理多個(gè)數(shù)據(jù)。為了模擬這種場景,我們先用 time.sleep() 來代表耗時(shí)操作:
import time
import random
def process_item(item):
# 模擬耗時(shí)操作
print(f"處理中:{item}")
process_time = random.uniform(0.5, 2.0)
time.sleep(process_time)
return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"
def process_all_items():
items = ["任務(wù)A", "任務(wù)B", "任務(wù)C", "任務(wù)D"]
results = []
for item in items:
result = process_item(item)
results.append(result)
return results
if __name__ == "__main__":
start = time.time()
results = process_all_items()
end = time.time()
print("\n".join(results))
print(f"總耗時(shí):{end - start:.2f} 秒")
處理中:任務(wù)A
處理中:任務(wù)B
處理中:任務(wù)C
處理中:任務(wù)D
處理完成:任務(wù)A,耗時(shí) 1.97 秒
處理完成:任務(wù)B,耗時(shí) 1.28 秒
處理完成:任務(wù)C,耗時(shí) 0.66 秒
處理完成:任務(wù)D,耗時(shí) 1.80 秒
總耗時(shí):5.72 秒
這段代碼的問題很明顯:每個(gè)任務(wù)都必須等待前一個(gè)任務(wù)完成才能開始。如果有4個(gè)任務(wù),每個(gè)任務(wù)平均耗時(shí)1秒,那么總耗時(shí)就接近4秒。
認(rèn)識(shí) async/await
Python 引入了 async/await 語法來支持異步編程。當(dāng)我們在函數(shù)定義前加上 async 關(guān)鍵字時(shí),這個(gè)函數(shù)就變成了一個(gè)"協(xié)程"(coroutine)。而 await 關(guān)鍵字則用于等待一個(gè)協(xié)程完成。讓我們改寫上面的代碼:
import asyncio
import random
import time
async def process_item(item):
print(f"處理中:{item}")
# async 定義的函數(shù)變成了協(xié)程
process_time = random.uniform(0.5, 2.0)
# time.sleep() 換成 asyncio.sleep()
await asyncio.sleep(process_time) # await 等待異步操作完成
return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"
async def process_all_items():
items = ["任務(wù)A", "任務(wù)B", "任務(wù)C", "任務(wù)D"]
# 創(chuàng)建任務(wù)列表
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
print("開始處理")
results = await asyncio.gather(*tasks)
return results
async def main():
start = time.time()
results = await process_all_items()
end = time.time()
print("\n".join(results))
print(f"總耗時(shí):{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
開始處理
處理中:任務(wù)A
處理中:任務(wù)B
處理中:任務(wù)C
處理中:任務(wù)D
處理完成:任務(wù)A,耗時(shí) 1.97 秒
處理完成:任務(wù)B,耗時(shí) 0.80 秒
處理完成:任務(wù)C,耗時(shí) 0.83 秒
處理完成:任務(wù)D,耗時(shí) 1.46 秒
總耗時(shí):1.97 秒
讓我們詳細(xì)解釋這段代碼的執(zhí)行過程:
- 當(dāng)函數(shù)被 async 關(guān)鍵字修飾后,調(diào)用該函數(shù)不會(huì)直接執(zhí)行函數(shù)體,而是返回一個(gè)協(xié)程對象
- await 關(guān)鍵字只能在 async 函數(shù)內(nèi)使用,它表示"等待這個(gè)操作完成后再繼續(xù)"
- asyncio.create_task() 將協(xié)程包裝成一個(gè)任務(wù),該任務(wù)會(huì)被事件循環(huán)調(diào)度執(zhí)行
- asyncio.gather() 并發(fā)運(yùn)行多個(gè)任務(wù),并等待它們?nèi)客瓿?/li>
- asyncio.run() 創(chuàng)建事件循環(huán),運(yùn)行 main() 協(xié)程,直到它完成
使用 asyncio.wait_for 添加超時(shí)控制
在實(shí)際應(yīng)用中,我們往往需要為異步操作設(shè)置超時(shí)時(shí)間:
import asyncio
import random
import time
async def process_item(item):
process_time = random.uniform(0.5, 2.0)
try:
# 設(shè)置1秒超時(shí)
await asyncio.wait_for(
asyncio.sleep(process_time),
timeout=1.0
)
return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"
except asyncio.TimeoutError:
return f"處理超時(shí):{item}"
async def main():
items = ["任務(wù)A", "任務(wù)B", "任務(wù)C", "任務(wù)D"]
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
start = time.time()
results = await asyncio.gather(*tasks, return_exceptions=True)
end = time.time()
print("\n".join(results))
print(f"總耗時(shí):{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
處理超時(shí):任務(wù)A
處理完成:任務(wù)B,耗時(shí) 0.94 秒
處理超時(shí):任務(wù)C
處理完成:任務(wù)D,耗時(shí) 0.78 秒
總耗時(shí):1.00 秒
使用異步上下文管理器
Python 中的 with 語句可以用于資源管理,類似地,異步編程中我們可以使用 async with 。一個(gè)類要支持異步上下文管理,需要實(shí)現(xiàn) __aenter__ 和 __aexit__ 方法:
import asyncio
import random
class AsyncResource:
async def __aenter__(self):
# 異步初始化資源
print("正在初始化資源...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 異步清理資源
print("正在清理資源...")
await asyncio.sleep(0.1)
async def process(self, item):
# 異步處理任務(wù)
print(f"正在處理任務(wù):{item}")
process_time = random.uniform(0.5, 2.0)
await asyncio.sleep(process_time)
return f"處理完成:{item},耗時(shí) {process_time:.2f} 秒"
async def main():
items = ["任務(wù)A", "任務(wù)B", "任務(wù)C"]
async with AsyncResource() as resource:
tasks = [
asyncio.create_task(resource.process(item))
for item in items
]
results = await asyncio.gather(*tasks)
print("\n".join(results))
if __name__ == "__main__":
asyncio.run(main())
正在初始化資源...
正在處理任務(wù):任務(wù)A
正在處理任務(wù):任務(wù)B
正在處理任務(wù):任務(wù)C
正在清理資源...
處理完成:任務(wù)A,耗時(shí) 1.31 秒
處理完成:任務(wù)B,耗時(shí) 0.77 秒
處理完成:任務(wù)C,耗時(shí) 0.84 秒
使用事件循環(huán)執(zhí)行阻塞操作 run_in_executor
在異步編程中,我們可能會(huì)遇到一些無法避免的阻塞操作(比如調(diào)用傳統(tǒng)的同步API)。這時(shí),asyncio.get_running_loop() 和 run_in_executor 就顯得特別重要:
import asyncio
import time
import requests # 一個(gè)同步的HTTP客戶端庫
async def blocking_operation():
# 獲取當(dāng)前事件循環(huán)
loop = asyncio.get_running_loop()
# 在線程池中執(zhí)行阻塞操作
result = await loop.run_in_executor(
None, # 使用默認(rèn)的線程池執(zhí)行器
requests.get, # 要執(zhí)行的阻塞函數(shù)
'http://httpbin.org/delay/1' # 函數(shù)參數(shù)
)
return result.status_code
async def non_blocking_operation():
await asyncio.sleep(1)
return "非阻塞操作完成"
async def main():
# 同時(shí)執(zhí)行阻塞和非阻塞操作
tasks = [
asyncio.create_task(blocking_operation()),
asyncio.create_task(non_blocking_operation())
]
start = time.time()
results = await asyncio.gather(*tasks)
end = time.time()
print(f"操作結(jié)果:{results}")
print(f"總耗時(shí):{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
輸出:
操作結(jié)果:[200, '非阻塞操作完成']
總耗時(shí):1.99 秒
這個(gè)例子展示了如何在異步程序中優(yōu)雅地處理同步操作。如果不使用 run_in_executor,阻塞操作會(huì)阻塞整個(gè)事件循環(huán),導(dǎo)致其他任務(wù)無法執(zhí)行:
- requests.get() 是同步操作,會(huì)阻塞當(dāng)前線程
- 事件循環(huán)運(yùn)行在主線程上
- 如果直接在協(xié)程中調(diào)用 requests.get() ,整個(gè)事件循環(huán)都會(huì)被阻塞
- 其他任務(wù)無法在這期間執(zhí)行
- run_in_executor 會(huì)將阻塞操作放到另一個(gè)線程中執(zhí)行
- 主線程的事件循環(huán)可以繼續(xù)處理其他任務(wù)
- 當(dāng)線程池中的操作完成時(shí),結(jié)果會(huì)被返回給事件循環(huán)
最佳實(shí)踐是:
- 盡量使用原生支持異步的庫(如 aiohttp)
- 如果必須使用同步庫,就用 run_in_executor
- 對于 CPU 密集型任務(wù)也可以用 run_in_executor 放到進(jìn)程池中執(zhí)行
任務(wù)取消:優(yōu)雅地終止異步操作
有時(shí)我們需要取消正在執(zhí)行的異步任務(wù),比如用戶中斷操作或超時(shí)處理:
import asyncio
import random
async def long_operation(name):
try:
print(f"{name} 開始執(zhí)行")
while True: # 模擬一個(gè)持續(xù)運(yùn)行的操作
await asyncio.sleep(0.5)
print(f"{name} 正在執(zhí)行...")
except asyncio.CancelledError:
print(f"{name} 被取消了")
raise # 重要:繼續(xù)傳播取消信號(hào)
async def main():
# 創(chuàng)建三個(gè)任務(wù)
task1 = asyncio.create_task(long_operation("任務(wù)1"))
task2 = asyncio.create_task(long_operation("任務(wù)2"))
task3 = asyncio.create_task(long_operation("任務(wù)3"))
# 等待1秒后取消task1
await asyncio.sleep(1)
task1.cancel()
# 等待2秒后取消其余任務(wù)
await asyncio.sleep(1)
task2.cancel()
task3.cancel()
try:
# 等待所有任務(wù)完成或被取消
await asyncio.gather(task1, task2, task3, return_exceptions=True)
except asyncio.CancelledError:
print("某個(gè)任務(wù)被取消了")
if __name__ == "__main__":
asyncio.run(main())
輸出:
任務(wù)1 開始執(zhí)行
任務(wù)2 開始執(zhí)行
任務(wù)3 開始執(zhí)行
任務(wù)1 正在執(zhí)行...
任務(wù)2 正在執(zhí)行...
任務(wù)3 正在執(zhí)行...
任務(wù)1 被取消了
任務(wù)2 正在執(zhí)行...
任務(wù)3 正在執(zhí)行...
任務(wù)2 正在執(zhí)行...
任務(wù)3 正在執(zhí)行...
任務(wù)2 被取消了
任務(wù)3 被取消了
這個(gè)例子展示了如何正確處理任務(wù)取消:
- 任務(wù)可以在執(zhí)行過程中被取消
- 被取消的任務(wù)會(huì)拋出 CancelledError
- 我們應(yīng)該適當(dāng)處理取消信號(hào),確保資源被正確清理
深入理解協(xié)程:為什么需要 async/await?
協(xié)程(Coroutine)是一種特殊的函數(shù),它可以在執(zhí)行過程中暫停,并在之后從暫停的地方繼續(xù)執(zhí)行。當(dāng)我們使用 async 定義一個(gè)函數(shù)時(shí),我們實(shí)際上是在定義一個(gè)協(xié)程:
import asyncio
# 這是一個(gè)普通函數(shù)
def normal_function():
return "Hello"
# 這是一個(gè)協(xié)程
async def coroutine_function():
await asyncio.sleep(1)
return "Hello"
# 讓我們看看它們的區(qū)別
print(normal_function) # <function normal_function at 0x1052cc040>
print(coroutine_function) # <function coroutine_function at 0x1054b9790>
# 調(diào)用它們的結(jié)果不同
print(normal_function()) # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>
await 如何與事件循環(huán)協(xié)作
協(xié)程(Coroutine)的核心在于它可以在執(zhí)行過程中主動(dòng)交出控制權(quán),讓其他代碼有機(jī)會(huì)執(zhí)行。讓我們通過一個(gè)詳細(xì)的例子來理解這個(gè)過程:
import asyncio
async def task1():
print("任務(wù)1:開始")
print("任務(wù)1:準(zhǔn)備休眠")
await asyncio.sleep(2) # 關(guān)鍵點(diǎn)1:交出控制權(quán)
print("任務(wù)1:休眠結(jié)束")
async def task2():
print("任務(wù)2:開始")
print("任務(wù)2:準(zhǔn)備休眠")
await asyncio.sleep(1) # 關(guān)鍵點(diǎn)2:交出控制權(quán)
print("任務(wù)2:休眠結(jié)束")
async def main():
# 同時(shí)執(zhí)行兩個(gè)任務(wù)
await asyncio.gather(task1(), task2())
asyncio.run(main())
這段代碼的輸出會(huì)是:
任務(wù)1:開始
任務(wù)1:準(zhǔn)備休眠
任務(wù)2:開始
任務(wù)2:準(zhǔn)備休眠
任務(wù)2:休眠結(jié)束 # 1秒后
任務(wù)1:休眠結(jié)束 # 2秒后
讓我們詳細(xì)解釋執(zhí)行過程:
- 當(dāng)程序遇到 await asyncio.sleep(2) 時(shí):
這個(gè) sleep 操作被注冊到事件循環(huán)中
Python 記錄當(dāng)前的執(zhí)行位置
task1 主動(dòng)交出控制權(quán)
重要:task1 并沒有停止運(yùn)行,而是被暫停了,等待之后恢復(fù)
- 事件循環(huán)接管控制權(quán)后:
尋找其他可以執(zhí)行的協(xié)程(這里是 task2)
開始執(zhí)行 task2,直到遇到 await asyncio.sleep(1)
task2 也交出控制權(quán),被暫停
- 事件循環(huán)繼續(xù)工作:
管理一個(gè)計(jì)時(shí)器,追蹤這兩個(gè) sleep 操作
1秒后,發(fā)現(xiàn) task2 的 sleep 時(shí)間到了
恢復(fù) task2 的執(zhí)行,打印"任務(wù)2:休眠結(jié)束"
2秒到時(shí),恢復(fù) task1 的執(zhí)行,打印"任務(wù)1:休眠結(jié)束"
這就像是一個(gè)指揮家(事件循環(huán))在指揮一個(gè)管弦樂隊(duì)(多個(gè)協(xié)程):
- 當(dāng)某個(gè)樂器(協(xié)程)需要休息時(shí),它舉手示意(await)
- 指揮家看到后,立即指揮其他樂器演奏
- 當(dāng)休息時(shí)間到了,指揮家會(huì)示意這個(gè)樂器繼續(xù)演奏
代碼驗(yàn)證:
import asyncio
import time
async def report_time(name, sleep_time):
print(f"{time.strftime('%H:%M:%S')} - {name}開始")
await asyncio.sleep(sleep_time)
print(f"{time.strftime('%H:%M:%S')} - {name}結(jié)束")
async def main():
# 同時(shí)執(zhí)行多個(gè)任務(wù)
await asyncio.gather(
report_time("任務(wù)A", 2),
report_time("任務(wù)B", 1),
report_time("任務(wù)C", 3)
)
asyncio.run(main())
輸出:
00:19:26 - 任務(wù)A開始
00:19:26 - 任務(wù)B開始
00:19:26 - 任務(wù)C開始
00:19:27 - 任務(wù)B結(jié)束
00:19:28 - 任務(wù)A結(jié)束
00:19:29 - 任務(wù)C結(jié)束
這種機(jī)制的優(yōu)勢在于:
- 單線程執(zhí)行,沒有線程切換開銷
- 協(xié)程主動(dòng)交出控制權(quán),而不是被操作系統(tǒng)強(qiáng)制切換
- 比起回調(diào)地獄,代碼更清晰易讀
- 錯(cuò)誤處理更直觀,可以使用普通的 try/except
理解了這個(gè)機(jī)制,我們就能更好地使用異步編程:
- 在 await 的時(shí)候,其他協(xié)程有機(jī)會(huì)執(zhí)行
- 耗時(shí)操作應(yīng)該是真正的異步操作(比如 asyncio.sleep )
- 不要在協(xié)程中使用阻塞操作,那樣會(huì)卡住整個(gè)事件循環(huán)
小結(jié)
Python 的異步編程主要依賴以下概念:
- async/await 語法:定義和等待協(xié)程
- asyncio 模塊:提供事件循環(huán)和任務(wù)調(diào)度
- Task 對象:表示待執(zhí)行的工作單元
- 異步上下文管理器:管理異步資源
使用異步編程的關(guān)鍵點(diǎn):
- I/O 密集型任務(wù)最適合使用異步編程
- 所有耗時(shí)操作都應(yīng)該是真正的異步操作
- 注意處理超時(shí)和異常情況
- 合理使用 asyncio.gather() 和 asyncio.wait_for()
異步編程不是萬能的,但在處理 I/O 密集型任務(wù)時(shí)確實(shí)能帶來顯著的性能提升。合理使用這些工具,能讓我們的程序更高效、更優(yōu)雅。