本文 AI 產出,尚未審核

Python 並行與非同步:深入 concurrent.futures 模組


簡介

在現代的 Python 應用程式中,效能資源利用率往往是決定使用者體驗的關鍵。當程式需要同時處理多筆 I/O 任務(如網路請求、檔案讀寫)或大量 CPU 密集計算時,單純的同步執行會造成大量的等待時間,導致 CPU 閒置或系統回應遲緩。

concurrent.futures 是 Python 標準庫在 3.2 版加入的高階抽象,提供 **執行緒池(ThreadPoolExecutor)行程池(ProcessPoolExecutor)**兩種簡潔的介面,讓開發者可以不必直接操作 threadingmultiprocessingasyncio,就能快速實現 並行(parallel)非同步(asynchronous) 的工作流程。

本篇文章將從核心概念切入,搭配實作範例,說明如何使用 concurrent.futures 解決常見的併發需求,並提供最佳實踐與常見陷阱,幫助讀者在實務專案中自信地運用這個模組。


核心概念

1. Executor 與 Future

  • Executor:負責管理工作執行的容器。ThreadPoolExecutor 使用多執行緒,適合 I/O 密集任務;ProcessPoolExecutor 使用多行程,適合 CPU 密集任務。
  • Future:代表一個尚未完成的計算結果。透過 Future,我們可以 取得結果檢查狀態、或 設定回呼函式
from concurrent.futures import ThreadPoolExecutor, Future

def task(x: int) -> int:
    return x * x

executor = ThreadPoolExecutor(max_workers=4)
future: Future = executor.submit(task, 10)   # 提交任務,立即返回 Future 物件
print(future.done())   # False,任務尚未完成
result = future.result()   # 阻塞等待,取得回傳值
print(result)   # 100
executor.shutdown()

重點Future.result()阻塞直到任務完成,若想避免阻塞,可使用 as_completedadd_done_callback


2. 取得多筆任務結果:as_completedmap

  • as_completed:返回一個 iterator,依照任務完成的順序產生 Future
  • executor.map:類似內建的 map,會依提交順序返回結果,適合「所有任務都必須完成」的情況。
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random

def fetch_url(url: str) -> str:
    time.sleep(random.uniform(0.1, 0.5))   # 模擬網路延遲
    return f"content of {url}"

urls = [f"https://example.com/page{i}" for i in range(5)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # 方法一:as_completed
    futures = [executor.submit(fetch_url, u) for u in urls]
    for fut in as_completed(futures):
        print(fut.result())

    # 方法二:map(保持提交順序)
    for content in executor.map(fetch_url, urls):
        print(content)

技巧as_completed 讓你可以即時處理最先完成的任務,減少等待時間;map 則在需要保持順序一次性取得所有結果時更直觀。


3. 執行緒與行程的選擇

任務類型 推薦 Executor 為何選擇
I/O 密集(網路、磁碟) ThreadPoolExecutor GIL(全域解釋器鎖)在 I/O 等待時會釋放,執行緒切換成本低
CPU 密集(數值運算、影像處理) ProcessPoolExecutor 每個行程擁有獨立的 Python 直譯器與 GIL,可真正平行執行

實務建議:在開發初期先以 ThreadPoolExecutor 測試,若發現 CPU 使用率仍接近 100% 且瓶頸在計算上,再切換為 ProcessPoolExecutor


4. 例外處理與取消任務

Future 會把任務執行過程中拋出的例外捕獲,並在呼叫 result() 時重新拋出。若需要提前取消任務,可呼叫 future.cancel()(僅在任務尚未開始執行時有效)。

def divide(a, b):
    return a / b   # 可能拋出 ZeroDivisionError

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(divide, 10, 2)
    f2 = executor.submit(divide, 5, 0)   # 故意觸發錯誤

    try:
        print(f1.result())   # 5.0
        print(f2.result())   # 這裡會拋出 ZeroDivisionError
    except ZeroDivisionError as e:
        print("除以零錯誤:", e)

    # 取消尚未執行的任務
    f3 = executor.submit(time.sleep, 5)
    cancelled = f3.cancel()
    print("任務是否成功取消?", cancelled)

5. 與 asyncio 的結合

雖然 concurrent.futures 已提供非同步的感覺,但在純 asyncio 程式中,我們仍可透過 loop.run_in_executor 把阻塞函式交給執行緒或行程池,以免阻塞事件迴圈。

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io():
    time.sleep(2)
    return "IO 完成"

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print(result)

asyncio.run(main())

要點:使用 run_in_executor 時,不要在同一個執行緒內同時呼叫阻塞 I/O,否則會失去非同步的好處。


常見陷阱與最佳實踐

陷阱 說明 解決方案
過度建立執行緒/行程 max_workers 設太大會導致系統資源耗盡,甚至產生「Too many open files」錯誤。 根據硬體(CPU 核心數、記憶體)與任務特性調整 max_workers;對 I/O 密集可稍高,CPU 密集則建議不超過核心數。
忘記關閉 Executor 程式結束前未呼叫 shutdown() 可能導致子執行緒仍在執行,造成資源泄漏。 使用 with 陳述式自動關閉,或手動呼叫 executor.shutdown(wait=True)
在子執行緒裡使用全域變數 子執行緒/行程不共享主執行緒的全域狀態,修改後不會回傳。 透過 queue.Queuemultiprocessing.Manager 或回傳值的方式傳遞資料。
例外被吞掉 若只檢查 future.done() 而不呼叫 result(),例外不會被暴露。 必須在取得結果時呼叫 future.result(),或在 as_completed 迴圈中處理例外。
ProcessPoolExecutor 中使用不可序列化的物件 行程間只能傳遞可 picklable 的物件,像是 lambda、局部函式會失敗。 定義在模組層級的函式或使用 cloudpickle(需自行安裝)。

最佳實踐

  1. 使用 with 句法:確保資源自動釋放。
  2. 根據任務性質選擇 Executor:I/O 用執行緒,CPU 用行程。
  3. 限制 max_workers:一般建議 max_workers = (os.cpu_count() or 1) * 5(I/O)或 max_workers = os.cpu_count()(CPU)。
  4. 適度使用回呼(callback)future.add_done_callback(fn) 可在任務完成後立即觸發後續處理,避免阻塞主執行緒。
  5. 監控與日誌:在大型服務中加入 logging,記錄每個 Future 的開始、結束與例外,方便排錯。
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

def task(i):
    logging.info(f"Task {i} start")
    time.sleep(1)
    logging.info(f"Task {i} done")
    return i * i

with ThreadPoolExecutor(max_workers=3) as exe:
    futures = [exe.submit(task, n) for n in range(5)]
    for f in futures:
        f.add_done_callback(lambda fut: logging.info(f"Result: {fut.result()}"))

實際應用場景

場景 為何使用 concurrent.futures 範例簡述
批次爬蟲 同時發送多個 HTTP 請求,縮短總抓取時間 使用 ThreadPoolExecutor 下載多頁面,as_completed 即時處理回傳結果。
圖像處理管線 大量照片需要縮放、加水印等 CPU 密集運算 使用 ProcessPoolExecutor 把每張圖交給獨立行程處理,避免 GIL 限制。
資料庫批次寫入 多筆寫入操作可以平行化,提升寫入吞吐量 ThreadPoolExecutor 包裝資料庫客戶端的 execute_many,減少等待時間。
機器學習模型預測服務 多個請求同時呼叫模型推論,需快速回應 透過 ThreadPoolExecutor 把模型的 predict 包裝成非阻塞任務,配合 asyncio 提供 Web API。
定時任務調度 系統需要同時執行多個背景任務(備份、報表產生) 使用 ProcessPoolExecutor 在背景執行長時間任務,主程式仍保持回應。

實務小提醒:在雲端或容器環境部署時,請務必根據容器的 CPU 限額設定 max_workers,避免因過度併發導致容器被 OOM(Out‑Of‑Memory)或 CPU 限制。


總結

concurrent.futures 為 Python 開發者提供了一套 簡潔、統一且安全 的併發介面。透過 ThreadPoolExecutorProcessPoolExecutor,我們可以輕鬆:

  • 將 I/O 密集或 CPU 密集的工作平行化,顯著縮短執行時間。
  • 以 Future 物件追蹤任務狀態,在需要時取得結果、捕捉例外或取消任務。
  • 結合 asyncio,在事件迴圈中安全地執行阻塞函式,保持程式的非同步特性。

在實務開發中,掌握 適當的執行緒/行程選擇、資源釋放、例外處理與日誌紀錄,即可讓程式在效能與可維護性之間取得最佳平衡。未來若需求更複雜(如分散式佈署),concurrent.futures 仍是 進階併發框架(如 Ray、Dask)的概念基礎,值得投入時間深入了解。

祝你在 Python 的併發世界中玩得開心,寫出既快又穩的程式! 🚀