Python 并發(fā)編程實(shí)戰(zhàn):優(yōu)雅地使用 Concurrent.futures
在 Python 多線程編程中,concurrent.futures 模塊提供了一個高層的接口來異步執(zhí)行可調(diào)用對象。今天,我們將通過一個循序漸進(jìn)的案例,深入了解如何使用這個強(qiáng)大的工具。
從一個模擬場景開始
假設(shè)我們需要處理一批網(wǎng)絡(luò)請求。為了模擬這個場景,我們使用 sleep 來代表耗時操作:
import time
import random
def slow_operation(task_id):
"""模擬一個耗時的網(wǎng)絡(luò)請求"""
sleep_time = random.uniform(0.5, 2)
time.sleep(sleep_time)
return f"Task {task_id} completed in {sleep_time:.2f} seconds"
# 串行處理
def process_serial():
start = time.perf_counter()
results = []
for i in range(10):
result = slow_operation(i)
results.append(result)
end = time.perf_counter()
print(f"串行處理總耗時:{end - start:.2f} 秒")
return results
# 運(yùn)行示例
if __name__ == '__main__':
results = process_serial()
for r in results:
print(r)
串行處理總耗時:11.75 秒
Task 0 completed in 1.27 seconds
Task 1 completed in 1.10 seconds
Task 2 completed in 1.35 seconds
Task 3 completed in 1.36 seconds
Task 4 completed in 1.42 seconds
Task 5 completed in 1.55 seconds
Task 6 completed in 0.74 seconds
Task 7 completed in 0.55 seconds
Task 8 completed in 1.40 seconds
Task 9 completed in 0.97 seconds
運(yùn)行這段代碼,你會發(fā)現(xiàn)處理 10 個任務(wù)需要大約 10-15 秒。這顯然不夠高效。
使用傳統(tǒng)的 threading 模塊
讓我們先看看使用傳統(tǒng)的 threading 模塊如何改進(jìn):
import threading
from queue import Queue
def slow_operation(task_id):
"""模擬一個耗時的網(wǎng)絡(luò)請求"""
sleep_time = random.uniform(0.5, 2)
time.sleep(sleep_time)
return f"Task {task_id} completed in {sleep_time:.2f} seconds"
def process_threading():
start = time.perf_counter()
results = []
work_queue = Queue()
lock = threading.Lock()
# 填充工作隊(duì)列
for i in range(10):
work_queue.put(i)
def worker():
while True:
try:
task_id = work_queue.get_nowait()
result = slow_operation(task_id)
with lock:
results.append(result)
work_queue.task_done()
except Queue.Empty:
break
threads = []
for _ in range(4): # 使用4個線程
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for t in threads:
t.join()
end = time.perf_counter()
print(f"多線程處理總耗時:{end - start:.2f} 秒")
return results
多線程處理總耗時:3.24 秒
這個版本使用了多線程,性能確實(shí)提升了,但代碼比較復(fù)雜,需要手動管理線程、鎖和隊(duì)列。
concurrent.futures 的優(yōu)雅解決方案
現(xiàn)在,讓我們看看如何使用 concurrent.futures 來簡化代碼:
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
def slow_operation(task_id):
"""模擬一個耗時的網(wǎng)絡(luò)請求"""
sleep_time = random.uniform(0.5, 2)
time.sleep(sleep_time)
return f"Task {task_id} completed in {sleep_time:.2f} seconds"
def process_concurrent():
start = time.perf_counter()
results = []
# 創(chuàng)建線程池,設(shè)置最大線程數(shù)為4
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交任務(wù)到線程池
future_to_id = {executor.submit(slow_operation, i): i for i in range(10)}
# 獲取結(jié)果
for future in as_completed(future_to_id):
results.append(future.result())
end = time.perf_counter()
print(f"concurrent.futures 處理總耗時:{end - start:.2f} 秒")
return results
process_concurrent()
concurrent.futures 處理總耗時:3.54 秒
這里我們用到了幾個關(guān)鍵概念:
- ThreadPoolExecutor :線程池執(zhí)行器,用于管理一組工作線程。創(chuàng)建時可以指定最大線程數(shù)。
- executor.submit() :向線程池提交一個任務(wù)。返回 Future 對象,代表將來某個時刻會完成的操作。
- as_completed() :返回一個迭代器,在 Future 完成時產(chǎn)生對應(yīng)的 Future 對象。這意味著結(jié)果是按照完成順序而不是提交順序返回的。
Future 對象的高級用法
Future 對象提供了多個有用的方法,讓我們通過實(shí)例來了解:
import time
import random
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
def slow_operation(task_id):
"""模擬一個耗時的網(wǎng)絡(luò)請求"""
sleep_time = random.uniform(0.5, 2)
time.sleep(sleep_time)
return f"Task {task_id} completed in {sleep_time:.2f} seconds"
def demonstrate_future_features():
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交任務(wù)并獲取 Future 對象
futures = [executor.submit(slow_operation, i) for i in range(10)]
# 1. done() 檢查任務(wù)是否完成
print("檢查第一個任務(wù)是否完成:", futures[0].done())
# 2. 使用 wait() 等待部分任務(wù)完成
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print(f"完成的任務(wù)數(shù): {len(done)}, 未完成的任務(wù)數(shù): {len(not_done)}")
# 3. 獲取結(jié)果時設(shè)置超時
try:
result = futures[0].result(timeout=1.0)
print("獲取到結(jié)果:", result)
except TimeoutError:
print("獲取結(jié)果超時")
# 4. cancel() 取消未開始的任務(wù)
for f in not_done:
cancelled = f.cancel()
print(f"取消任務(wù): {'成功' if cancelled else '失敗'}")
demonstrate_future_features()
檢查第一個任務(wù)是否完成: False
完成的任務(wù)數(shù): 1, 未完成的任務(wù)數(shù): 9
獲取到結(jié)果: Task 0 completed in 1.07 seconds
取消任務(wù): 失敗
取消任務(wù): 成功
取消任務(wù): 成功
取消任務(wù): 失敗
取消任務(wù): 失敗
取消任務(wù): 失敗
取消任務(wù): 失敗
取消任務(wù): 成功
取消任務(wù): 失敗
線程/進(jìn)程池還是異步 IO?
IO 密集型任務(wù):優(yōu)先選擇 asyncio
為什么選擇 asyncio ?
- 更低的資源開銷: asyncio 使用協(xié)程,不需要創(chuàng)建額外的線程或進(jìn)程
- 更高的并發(fā)量:單線程可以輕松處理數(shù)千個并發(fā)任務(wù)
- 沒有 GIL 的限制:協(xié)程在單線程內(nèi)切換,完全規(guī)避了 GIL 的影響
讓我們通過一個網(wǎng)絡(luò)請求的例子來對比:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# 模擬網(wǎng)絡(luò)請求
def sync_request(url):
time.sleep(1) # 模擬網(wǎng)絡(luò)延遲
return f"Response from {url}"
async def async_request(url):
await asyncio.sleep(1) # 模擬網(wǎng)絡(luò)延遲
return f"Response from {url}"
# 使用線程池
def thread_pool_example():
urls = [f"http://example.com/{i}" for i in range(100)]
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=20) as executor:
results = list(executor.map(sync_request, urls))
end = time.perf_counter()
print(f"ThreadPoolExecutor 耗時: {end - start:.2f} 秒")
return results
# 使用 asyncio
async def asyncio_example():
urls = [f"http://example.com/{i}" for i in range(100)]
start = time.perf_counter()
tasks = [async_request(url) for url in urls]
results = await asyncio.gather(*tasks)
end = time.perf_counter()
print(f"asyncio 耗時: {end - start:.2f} 秒")
return results
if __name__ == '__main__':
# 運(yùn)行線程池版本
thread_results = thread_pool_example()
# 運(yùn)行 asyncio 版本
asyncio_results = asyncio.run(asyncio_example())
ThreadPoolExecutor 耗時: 5.03 秒
asyncio 耗時: 1.00 秒
在這個例子中, asyncio 版本通常會表現(xiàn)出更好的性能,尤其是在并發(fā)量大的情況下。
CPU 密集型任務(wù):使用 ProcessPoolExecutor
為什么選擇多進(jìn)程?
- 繞過 GIL:每個進(jìn)程都有自己的 Python 解釋器和 GIL
- 充分利用多核性能:可以真正實(shí)現(xiàn)并行計(jì)算
- 適合計(jì)算密集型任務(wù):如數(shù)據(jù)處理、圖像處理等
來看一個計(jì)算密集型任務(wù)的對比:
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def cpu_intensive_task(n):
"""計(jì)算密集型任務(wù):計(jì)算大量浮點(diǎn)數(shù)運(yùn)算"""
result = 0
for i in range(n):
result += i ** 2 / 3.14
return result
def compare_performance():
numbers = [10**6] * 20 # 20個大規(guī)模計(jì)算任務(wù)
# 使用線程池
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
thread_results = list(executor.map(cpu_intensive_task, numbers))
thread_time = time.perf_counter() - start
print(f"線程池耗時: {thread_time:.2f} 秒")
# 使用進(jìn)程池
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
process_results = list(executor.map(cpu_intensive_task, numbers))
process_time = time.perf_counter() - start
print(f"進(jìn)程池耗時: {process_time:.2f} 秒")
if __name__ == '__main__':
compare_performance()
線程池耗時: 4.61 秒
進(jìn)程池耗時: 1.34 秒
在這種場景下, ProcessPoolExecutor 的性能明顯優(yōu)于 ThreadPoolExecutor 。
混合型任務(wù):ThreadPoolExecutor 的優(yōu)勢
為什么有時候選擇線程池?
- 更容易與現(xiàn)有代碼集成:大多數(shù) Python 庫都是基于同步設(shè)計(jì)的
- 資源開銷比進(jìn)程池小:線程共享內(nèi)存空間
- 適合 IO 和 CPU 混合的場景:當(dāng)任務(wù)既有 IO 操作又有計(jì)算時
示例場景:
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def mixed_task(task_id):
"""混合型任務(wù):既有 IO 操作又有計(jì)算"""
# IO 操作
time.sleep(0.5)
# CPU 計(jì)算
result = sum(i * i for i in range(10**5))
# 再次 IO 操作
time.sleep(0.5)
return f"Task {task_id}: {result}"
def demonstrate_mixed_workload():
tasks = range(10)
# 使用線程池
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
thread_results = list(executor.map(mixed_task, tasks))
thread_time = time.perf_counter() - start
print(f"線程池處理混合任務(wù)耗時: {thread_time:.2f} 秒")
# 使用進(jìn)程池
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
process_results = list(executor.map(mixed_task, tasks))
process_time = time.perf_counter() - start
print(f"進(jìn)程池處理混合任務(wù)耗時: {process_time:.2f} 秒")
if __name__ == '__main__':
demonstrate_mixed_workload()
線程池處理混合任務(wù)耗時: 3.05 秒
進(jìn)程池處理混合任務(wù)耗時: 3.11 秒
選擇建議的決策樹
在選擇并發(fā)方案時,可以參考以下決策流程:
首先判斷任務(wù)類型:
- 如果是純 IO 密集型(網(wǎng)絡(luò)請求、文件操作),優(yōu)先選擇 asyncio。
- 如果是純 CPU 密集型(大量計(jì)算),優(yōu)先選擇 ProcessPoolExecutor。
- 如果是混合型任務(wù),考慮使用 ThreadPoolExecutor。
考慮其他因素:
- 現(xiàn)有代碼是否易于改造為異步?
- 是否需要與同步代碼交互?
- 并發(fā)量有多大?
- 是否需要跨進(jìn)程通信?
def choose_concurrency_model(task_type,
concurrent_count,
legacy_code=False,
need_shared_memory=False):
"""幫助選擇并發(fā)模型的示例函數(shù)"""
if task_type == "IO":
if legacy_code or need_shared_memory:
return "ThreadPoolExecutor"
else:
return "asyncio"
elif task_type == "CPU":
if need_shared_memory:
return "ThreadPoolExecutor"
else:
return "ProcessPoolExecutor"
else: # mixed
if concurrent_count > 1000:
return "asyncio"
else:
return "ThreadPoolExecutor"
性能對比總結(jié)
方案 | IO密集型 | CPU密集型 | 混合型 | 資源開銷 | 代碼復(fù)雜度 |
asyncio | 最佳 | 較差 | 好 | 最低 | 較高 |
ThreadPoolExecutor | 好 | 較差 | 較好 | 低 | 低 |
ProcessPoolExecutor | 一般 | 最佳 | 一般 | 高 | 低 |
總的來說,選擇合適的并發(fā)方案需要綜合考慮任務(wù)特性、性能需求、代碼復(fù)雜度等多個因素。在實(shí)際應(yīng)用中,有時候甚至可以混合使用多種方案,以達(dá)到最優(yōu)的性能表現(xiàn)。
實(shí)用技巧總結(jié)
控制線程池大小
def demonstrate_pool_sizing():
# CPU 核心數(shù)
cpu_count = os.cpu_count()
# IO 密集型任務(wù),線程數(shù)可以設(shè)置為核心數(shù)的 1-4 倍
io_bound_workers = cpu_count * 2
# CPU 密集型任務(wù),線程數(shù)不應(yīng)超過核心數(shù)
cpu_bound_workers = cpu_count
print(f"推薦的線程數(shù):")
print(f"IO 密集型任務(wù):{io_bound_workers}")
print(f"CPU 密集型任務(wù):{cpu_bound_workers}")
批量提交任務(wù)
def demonstrate_batch_submit():
with ThreadPoolExecutor(max_workers=4) as executor:
results_ordered = list(executor.map(slow_operation, range(5)))
futures = [executor.submit(slow_operation, i) for i in range(5)]
results_completion = [f.result() for f in as_completed(futures)]
return results_ordered, results_completion
錯誤處理
def demonstrate_error_handling():
def faulty_operation(task_id):
if task_id == 3:
raise ValueError(f"Task {task_id} failed")
return slow_operation(task_id)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(faulty_operation, i) for i in range(5)]
for future in as_completed(futures):
try:
result = future.result()
print(f"成功:{result}")
except Exception as e:
print(f"錯誤:{str(e)}")
總結(jié)
concurrent.futures 模塊為 Python 并發(fā)編程提供了一個優(yōu)雅的高級接口。相比傳統(tǒng)的 threading / multiprocessing 模塊,它具有以下優(yōu)勢:
- 使用線程池自動管理線程的生命周期
- 提供簡潔的接口提交任務(wù)和獲取結(jié)果
- 支持超時和錯誤處理
- 代碼更加 Pythonic 和易于維護(hù)