自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Python 并發(fā)編程實(shí)戰(zhàn):優(yōu)雅地使用 Concurrent.futures

開發(fā) 后端
Concurrent.futures? 模塊為 Python 并發(fā)編程提供了一個優(yōu)雅的高級接口。相比傳統(tǒng)的 threading? / multiprocessing 模塊。

在 Python 多線程編程中,concurrent.futures 模塊提供了一個高層的接口來異步執(zhí)行可調(diào)用對象。今天,我們將通過一個循序漸進(jìn)的案例,深入了解如何使用這個強(qiáng)大的工具。

從一個模擬場景開始

假設(shè)我們需要處理一批網(wǎng)絡(luò)請求。為了模擬這個場景,我們使用 sleep 來代表耗時操作:

import time
import random

def slow_operation(task_id):
    """模擬一個耗時的網(wǎng)絡(luò)請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

# 串行處理
def process_serial():
    start = time.perf_counter()
    results = []
    for i in range(10):
        result = slow_operation(i)
        results.append(result)
    end = time.perf_counter()
    print(f"串行處理總耗時:{end - start:.2f} 秒")
    return results

# 運(yùn)行示例
if __name__ == '__main__':
    results = process_serial()
    for r in results:
        print(r)
串行處理總耗時:11.75 秒
Task 0 completed in 1.27 seconds
Task 1 completed in 1.10 seconds
Task 2 completed in 1.35 seconds
Task 3 completed in 1.36 seconds
Task 4 completed in 1.42 seconds
Task 5 completed in 1.55 seconds
Task 6 completed in 0.74 seconds
Task 7 completed in 0.55 seconds
Task 8 completed in 1.40 seconds
Task 9 completed in 0.97 seconds

運(yùn)行這段代碼,你會發(fā)現(xiàn)處理 10 個任務(wù)需要大約 10-15 秒。這顯然不夠高效。

使用傳統(tǒng)的 threading 模塊

讓我們先看看使用傳統(tǒng)的 threading 模塊如何改進(jìn):

import threading
from queue import Queue

def slow_operation(task_id):
    """模擬一個耗時的網(wǎng)絡(luò)請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

def process_threading():
    start = time.perf_counter()
    results = []
    work_queue = Queue()
    lock = threading.Lock()
    
    # 填充工作隊(duì)列
    for i in range(10):
        work_queue.put(i)
    
    def worker():
        while True:
            try:
                task_id = work_queue.get_nowait()
                result = slow_operation(task_id)
                with lock:
                    results.append(result)
                work_queue.task_done()
            except Queue.Empty:
                break
    
    threads = []
    for _ in range(4):  # 使用4個線程
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    
    for t in threads:
        t.join()
    
    end = time.perf_counter()
    print(f"多線程處理總耗時:{end - start:.2f} 秒")
    return results
多線程處理總耗時:3.24 秒

這個版本使用了多線程,性能確實(shí)提升了,但代碼比較復(fù)雜,需要手動管理線程、鎖和隊(duì)列。

concurrent.futures 的優(yōu)雅解決方案

現(xiàn)在,讓我們看看如何使用 concurrent.futures 來簡化代碼:

import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_operation(task_id):
    """模擬一個耗時的網(wǎng)絡(luò)請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

def process_concurrent():
    start = time.perf_counter()
    results = []
    
    # 創(chuàng)建線程池,設(shè)置最大線程數(shù)為4
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任務(wù)到線程池
        future_to_id = {executor.submit(slow_operation, i): i for i in range(10)}
        
        # 獲取結(jié)果
        for future in as_completed(future_to_id):
            results.append(future.result())
    
    end = time.perf_counter()
    print(f"concurrent.futures 處理總耗時:{end - start:.2f} 秒")
    return results

process_concurrent()
concurrent.futures 處理總耗時:3.54 秒

這里我們用到了幾個關(guān)鍵概念:

  • ThreadPoolExecutor :線程池執(zhí)行器,用于管理一組工作線程。創(chuàng)建時可以指定最大線程數(shù)。
  • executor.submit() :向線程池提交一個任務(wù)。返回 Future 對象,代表將來某個時刻會完成的操作。
  • as_completed() :返回一個迭代器,在 Future 完成時產(chǎn)生對應(yīng)的 Future 對象。這意味著結(jié)果是按照完成順序而不是提交順序返回的。

Future 對象的高級用法

Future 對象提供了多個有用的方法,讓我們通過實(shí)例來了解:

import time
import random
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def slow_operation(task_id):
    """模擬一個耗時的網(wǎng)絡(luò)請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

def demonstrate_future_features():
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任務(wù)并獲取 Future 對象
        futures = [executor.submit(slow_operation, i) for i in range(10)]
        
        # 1. done() 檢查任務(wù)是否完成
        print("檢查第一個任務(wù)是否完成:", futures[0].done())
        
        # 2. 使用 wait() 等待部分任務(wù)完成
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print(f"完成的任務(wù)數(shù): {len(done)}, 未完成的任務(wù)數(shù): {len(not_done)}")
        
        # 3. 獲取結(jié)果時設(shè)置超時
        try:
            result = futures[0].result(timeout=1.0)
            print("獲取到結(jié)果:", result)
        except TimeoutError:
            print("獲取結(jié)果超時")
        
        # 4. cancel() 取消未開始的任務(wù)
        for f in not_done:
            cancelled = f.cancel()
            print(f"取消任務(wù): {'成功' if cancelled else '失敗'}")

demonstrate_future_features()
檢查第一個任務(wù)是否完成: False
完成的任務(wù)數(shù): 1, 未完成的任務(wù)數(shù): 9
獲取到結(jié)果: Task 0 completed in 1.07 seconds
取消任務(wù): 失敗
取消任務(wù): 成功
取消任務(wù): 成功
取消任務(wù): 失敗
取消任務(wù): 失敗
取消任務(wù): 失敗
取消任務(wù): 失敗
取消任務(wù): 成功
取消任務(wù): 失敗

線程/進(jìn)程池還是異步 IO?

IO 密集型任務(wù):優(yōu)先選擇 asyncio

為什么選擇 asyncio ?

  • 更低的資源開銷 asyncio 使用協(xié)程,不需要創(chuàng)建額外的線程或進(jìn)程
  • 更高的并發(fā)量:單線程可以輕松處理數(shù)千個并發(fā)任務(wù)
  • 沒有 GIL 的限制:協(xié)程在單線程內(nèi)切換,完全規(guī)避了 GIL 的影響

讓我們通過一個網(wǎng)絡(luò)請求的例子來對比:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# 模擬網(wǎng)絡(luò)請求
def sync_request(url):
    time.sleep(1)  # 模擬網(wǎng)絡(luò)延遲
    return f"Response from {url}"

async def async_request(url):
    await asyncio.sleep(1)  # 模擬網(wǎng)絡(luò)延遲
    return f"Response from {url}"

# 使用線程池
def thread_pool_example():
    urls = [f"http://example.com/{i}" for i in range(100)]
    start = time.perf_counter()
    
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(executor.map(sync_request, urls))
    
    end = time.perf_counter()
    print(f"ThreadPoolExecutor 耗時: {end - start:.2f} 秒")
    return results

# 使用 asyncio
async def asyncio_example():
    urls = [f"http://example.com/{i}" for i in range(100)]
    start = time.perf_counter()
    
    tasks = [async_request(url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    end = time.perf_counter()
    print(f"asyncio 耗時: {end - start:.2f} 秒")
    return results

if __name__ == '__main__':
    # 運(yùn)行線程池版本
    thread_results = thread_pool_example()
    
    # 運(yùn)行 asyncio 版本
    asyncio_results = asyncio.run(asyncio_example())
ThreadPoolExecutor 耗時: 5.03 秒
asyncio 耗時: 1.00 秒

在這個例子中, asyncio 版本通常會表現(xiàn)出更好的性能,尤其是在并發(fā)量大的情況下。

CPU 密集型任務(wù):使用 ProcessPoolExecutor

為什么選擇多進(jìn)程?

  • 繞過 GIL:每個進(jìn)程都有自己的 Python 解釋器和 GIL
  • 充分利用多核性能:可以真正實(shí)現(xiàn)并行計(jì)算
  • 適合計(jì)算密集型任務(wù):如數(shù)據(jù)處理、圖像處理等

來看一個計(jì)算密集型任務(wù)的對比:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_intensive_task(n):
    """計(jì)算密集型任務(wù):計(jì)算大量浮點(diǎn)數(shù)運(yùn)算"""
    result = 0
    for i in range(n):
        result += i ** 2 / 3.14
    return result

def compare_performance():
    numbers = [10**6] * 20  # 20個大規(guī)模計(jì)算任務(wù)
    
    # 使用線程池
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(cpu_intensive_task, numbers))
    thread_time = time.perf_counter() - start
    print(f"線程池耗時: {thread_time:.2f} 秒")
    
    # 使用進(jìn)程池
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(cpu_intensive_task, numbers))
    process_time = time.perf_counter() - start
    print(f"進(jìn)程池耗時: {process_time:.2f} 秒")

if __name__ == '__main__':
    compare_performance()
線程池耗時: 4.61 秒
進(jìn)程池耗時: 1.34 秒

在這種場景下, ProcessPoolExecutor 的性能明顯優(yōu)于 ThreadPoolExecutor 。

混合型任務(wù):ThreadPoolExecutor 的優(yōu)勢

為什么有時候選擇線程池?

  • 更容易與現(xiàn)有代碼集成:大多數(shù) Python 庫都是基于同步設(shè)計(jì)的
  • 資源開銷比進(jìn)程池小:線程共享內(nèi)存空間
  1. 適合 IO 和 CPU 混合的場景:當(dāng)任務(wù)既有 IO 操作又有計(jì)算時

示例場景:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def mixed_task(task_id):
    """混合型任務(wù):既有 IO 操作又有計(jì)算"""
    # IO 操作
    time.sleep(0.5)
    
    # CPU 計(jì)算
    result = sum(i * i for i in range(10**5))
    
    # 再次 IO 操作
    time.sleep(0.5)
    
    return f"Task {task_id}: {result}"

def demonstrate_mixed_workload():
    tasks = range(10)
    
    # 使用線程池
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(mixed_task, tasks))
    thread_time = time.perf_counter() - start
    print(f"線程池處理混合任務(wù)耗時: {thread_time:.2f} 秒")
    
    # 使用進(jìn)程池
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(mixed_task, tasks))
    process_time = time.perf_counter() - start
    print(f"進(jìn)程池處理混合任務(wù)耗時: {process_time:.2f} 秒")

if __name__ == '__main__':
    demonstrate_mixed_workload()
線程池處理混合任務(wù)耗時: 3.05 秒
進(jìn)程池處理混合任務(wù)耗時: 3.11 秒

選擇建議的決策樹

在選擇并發(fā)方案時,可以參考以下決策流程:

首先判斷任務(wù)類型:

  • 如果是純 IO 密集型(網(wǎng)絡(luò)請求、文件操作),優(yōu)先選擇 asyncio。
  • 如果是純 CPU 密集型(大量計(jì)算),優(yōu)先選擇 ProcessPoolExecutor。
  • 如果是混合型任務(wù),考慮使用 ThreadPoolExecutor。

考慮其他因素

  • 現(xiàn)有代碼是否易于改造為異步?
  • 是否需要與同步代碼交互?
  • 并發(fā)量有多大?
  • 是否需要跨進(jìn)程通信?
def choose_concurrency_model(task_type, 
                           concurrent_count,
                           legacy_code=False,
                           need_shared_memory=False):
    """幫助選擇并發(fā)模型的示例函數(shù)"""
    if task_type == "IO":
        if legacy_code or need_shared_memory:
            return "ThreadPoolExecutor"
        else:
            return "asyncio"
    elif task_type == "CPU":
        if need_shared_memory:
            return "ThreadPoolExecutor"
        else:
            return "ProcessPoolExecutor"
    else:  # mixed
        if concurrent_count > 1000:
            return "asyncio"
        else:
            return "ThreadPoolExecutor"

性能對比總結(jié)

方案

IO密集型

CPU密集型

混合型

資源開銷

代碼復(fù)雜度

asyncio

最佳

較差

最低

較高

ThreadPoolExecutor

較差

較好

ProcessPoolExecutor

一般

最佳

一般

總的來說,選擇合適的并發(fā)方案需要綜合考慮任務(wù)特性、性能需求、代碼復(fù)雜度等多個因素。在實(shí)際應(yīng)用中,有時候甚至可以混合使用多種方案,以達(dá)到最優(yōu)的性能表現(xiàn)。

實(shí)用技巧總結(jié)

控制線程池大小

def demonstrate_pool_sizing():
    # CPU 核心數(shù)
    cpu_count = os.cpu_count()
    # IO 密集型任務(wù),線程數(shù)可以設(shè)置為核心數(shù)的 1-4 倍
    io_bound_workers = cpu_count * 2
    # CPU 密集型任務(wù),線程數(shù)不應(yīng)超過核心數(shù)
    cpu_bound_workers = cpu_count

    print(f"推薦的線程數(shù):")
    print(f"IO 密集型任務(wù):{io_bound_workers}")
    print(f"CPU 密集型任務(wù):{cpu_bound_workers}")

批量提交任務(wù)

def demonstrate_batch_submit():
    with ThreadPoolExecutor(max_workers=4) as executor:
        results_ordered = list(executor.map(slow_operation, range(5)))

        futures = [executor.submit(slow_operation, i) for i in range(5)]
        results_completion = [f.result() for f in as_completed(futures)]

        return results_ordered, results_completion

錯誤處理

def demonstrate_error_handling():
    def faulty_operation(task_id):
        if task_id == 3:
            raise ValueError(f"Task {task_id} failed")
        return slow_operation(task_id)
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(faulty_operation, i) for i in range(5)]
        
        for future in as_completed(futures):
            try:
                result = future.result()
                print(f"成功:{result}")
            except Exception as e:
                print(f"錯誤:{str(e)}")

總結(jié)

concurrent.futures 模塊為 Python 并發(fā)編程提供了一個優(yōu)雅的高級接口。相比傳統(tǒng)的 threading / multiprocessing 模塊,它具有以下優(yōu)勢:

  • 使用線程池自動管理線程的生命周期
  • 提供簡潔的接口提交任務(wù)和獲取結(jié)果
  • 支持超時和錯誤處理
  • 代碼更加 Pythonic 和易于維護(hù)
責(zé)任編輯:姜華 來源: Piper蛋窩
相關(guān)推薦

2024-01-17 12:44:23

Python并發(fā)編程

2021-01-13 11:29:43

Python多線程異步

2021-05-12 22:07:43

并發(fā)編排任務(wù)

2024-04-24 12:34:08

Spring事務(wù)編程

2023-10-30 23:25:48

FuturesGo語言

2021-01-28 14:53:19

PHP編碼開發(fā)

2023-11-22 13:05:12

Pytest測試

2017-04-12 11:16:08

Python終端編程

2023-05-12 14:14:00

Java線程中斷

2018-08-20 10:40:09

Redis位圖操作

2025-02-12 00:21:44

Java并發(fā)編程

2025-01-16 08:08:29

2017-12-14 14:17:08

Windows使用技巧手冊

2021-03-24 10:20:50

Fonts前端代碼

2018-07-23 08:19:26

編程語言Python工具

2021-02-25 22:17:19

開發(fā)技術(shù)編程

2023-03-28 08:07:12

2024-11-13 16:37:00

Java線程池

2024-06-05 09:17:31

Python數(shù)據(jù)清洗開發(fā)

2024-11-21 09:00:00

Python字典代碼
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號