Python 並行與非同步:深入 concurrent.futures 模組
簡介
在現代的 Python 應用程式中,效能與資源利用率往往是決定使用者體驗的關鍵。當程式需要同時處理多筆 I/O 任務(如網路請求、檔案讀寫)或大量 CPU 密集計算時,單純的同步執行會造成大量的等待時間,導致 CPU 閒置或系統回應遲緩。
concurrent.futures 是 Python 標準庫在 3.2 版加入的高階抽象,提供 **執行緒池(ThreadPoolExecutor)與行程池(ProcessPoolExecutor)**兩種簡潔的介面,讓開發者可以不必直接操作 threading、multiprocessing 或 asyncio,就能快速實現 並行(parallel) 或 非同步(asynchronous) 的工作流程。
本篇文章將從核心概念切入,搭配實作範例,說明如何使用 concurrent.futures 解決常見的併發需求,並提供最佳實踐與常見陷阱,幫助讀者在實務專案中自信地運用這個模組。
核心概念
1. Executor 與 Future
- Executor:負責管理工作執行的容器。
ThreadPoolExecutor使用多執行緒,適合 I/O 密集任務;ProcessPoolExecutor使用多行程,適合 CPU 密集任務。 - Future:代表一個尚未完成的計算結果。透過
Future,我們可以 取得結果、檢查狀態、或 設定回呼函式。
from concurrent.futures import ThreadPoolExecutor, Future
def task(x: int) -> int:
return x * x
executor = ThreadPoolExecutor(max_workers=4)
future: Future = executor.submit(task, 10) # 提交任務,立即返回 Future 物件
print(future.done()) # False,任務尚未完成
result = future.result() # 阻塞等待,取得回傳值
print(result) # 100
executor.shutdown()
重點:
Future.result()會阻塞直到任務完成,若想避免阻塞,可使用as_completed或add_done_callback。
2. 取得多筆任務結果:as_completed 與 map
as_completed:返回一個 iterator,依照任務完成的順序產生Future。executor.map:類似內建的map,會依提交順序返回結果,適合「所有任務都必須完成」的情況。
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random
def fetch_url(url: str) -> str:
time.sleep(random.uniform(0.1, 0.5)) # 模擬網路延遲
return f"content of {url}"
urls = [f"https://example.com/page{i}" for i in range(5)]
with ThreadPoolExecutor(max_workers=3) as executor:
# 方法一:as_completed
futures = [executor.submit(fetch_url, u) for u in urls]
for fut in as_completed(futures):
print(fut.result())
# 方法二:map(保持提交順序)
for content in executor.map(fetch_url, urls):
print(content)
技巧:
as_completed讓你可以即時處理最先完成的任務,減少等待時間;map則在需要保持順序或一次性取得所有結果時更直觀。
3. 執行緒與行程的選擇
| 任務類型 | 推薦 Executor | 為何選擇 |
|---|---|---|
| I/O 密集(網路、磁碟) | ThreadPoolExecutor |
GIL(全域解釋器鎖)在 I/O 等待時會釋放,執行緒切換成本低 |
| CPU 密集(數值運算、影像處理) | ProcessPoolExecutor |
每個行程擁有獨立的 Python 直譯器與 GIL,可真正平行執行 |
實務建議:在開發初期先以
ThreadPoolExecutor測試,若發現 CPU 使用率仍接近 100% 且瓶頸在計算上,再切換為ProcessPoolExecutor。
4. 例外處理與取消任務
Future 會把任務執行過程中拋出的例外捕獲,並在呼叫 result() 時重新拋出。若需要提前取消任務,可呼叫 future.cancel()(僅在任務尚未開始執行時有效)。
def divide(a, b):
return a / b # 可能拋出 ZeroDivisionError
with ThreadPoolExecutor() as executor:
f1 = executor.submit(divide, 10, 2)
f2 = executor.submit(divide, 5, 0) # 故意觸發錯誤
try:
print(f1.result()) # 5.0
print(f2.result()) # 這裡會拋出 ZeroDivisionError
except ZeroDivisionError as e:
print("除以零錯誤:", e)
# 取消尚未執行的任務
f3 = executor.submit(time.sleep, 5)
cancelled = f3.cancel()
print("任務是否成功取消?", cancelled)
5. 與 asyncio 的結合
雖然 concurrent.futures 已提供非同步的感覺,但在純 asyncio 程式中,我們仍可透過 loop.run_in_executor 把阻塞函式交給執行緒或行程池,以免阻塞事件迴圈。
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def blocking_io():
time.sleep(2)
return "IO 完成"
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print(result)
asyncio.run(main())
要點:使用
run_in_executor時,不要在同一個執行緒內同時呼叫阻塞 I/O,否則會失去非同步的好處。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方案 |
|---|---|---|
| 過度建立執行緒/行程 | max_workers 設太大會導致系統資源耗盡,甚至產生「Too many open files」錯誤。 |
根據硬體(CPU 核心數、記憶體)與任務特性調整 max_workers;對 I/O 密集可稍高,CPU 密集則建議不超過核心數。 |
| 忘記關閉 Executor | 程式結束前未呼叫 shutdown() 可能導致子執行緒仍在執行,造成資源泄漏。 |
使用 with 陳述式自動關閉,或手動呼叫 executor.shutdown(wait=True)。 |
| 在子執行緒裡使用全域變數 | 子執行緒/行程不共享主執行緒的全域狀態,修改後不會回傳。 | 透過 queue.Queue、multiprocessing.Manager 或回傳值的方式傳遞資料。 |
| 例外被吞掉 | 若只檢查 future.done() 而不呼叫 result(),例外不會被暴露。 |
必須在取得結果時呼叫 future.result(),或在 as_completed 迴圈中處理例外。 |
在 ProcessPoolExecutor 中使用不可序列化的物件 |
行程間只能傳遞可 picklable 的物件,像是 lambda、局部函式會失敗。 | 定義在模組層級的函式或使用 cloudpickle(需自行安裝)。 |
最佳實踐
- 使用
with句法:確保資源自動釋放。 - 根據任務性質選擇 Executor:I/O 用執行緒,CPU 用行程。
- 限制
max_workers:一般建議max_workers = (os.cpu_count() or 1) * 5(I/O)或max_workers = os.cpu_count()(CPU)。 - 適度使用回呼(callback):
future.add_done_callback(fn)可在任務完成後立即觸發後續處理,避免阻塞主執行緒。 - 監控與日誌:在大型服務中加入
logging,記錄每個Future的開始、結束與例外,方便排錯。
import logging
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
def task(i):
logging.info(f"Task {i} start")
time.sleep(1)
logging.info(f"Task {i} done")
return i * i
with ThreadPoolExecutor(max_workers=3) as exe:
futures = [exe.submit(task, n) for n in range(5)]
for f in futures:
f.add_done_callback(lambda fut: logging.info(f"Result: {fut.result()}"))
實際應用場景
| 場景 | 為何使用 concurrent.futures |
範例簡述 |
|---|---|---|
| 批次爬蟲 | 同時發送多個 HTTP 請求,縮短總抓取時間 | 使用 ThreadPoolExecutor 下載多頁面,as_completed 即時處理回傳結果。 |
| 圖像處理管線 | 大量照片需要縮放、加水印等 CPU 密集運算 | 使用 ProcessPoolExecutor 把每張圖交給獨立行程處理,避免 GIL 限制。 |
| 資料庫批次寫入 | 多筆寫入操作可以平行化,提升寫入吞吐量 | 以 ThreadPoolExecutor 包裝資料庫客戶端的 execute_many,減少等待時間。 |
| 機器學習模型預測服務 | 多個請求同時呼叫模型推論,需快速回應 | 透過 ThreadPoolExecutor 把模型的 predict 包裝成非阻塞任務,配合 asyncio 提供 Web API。 |
| 定時任務調度 | 系統需要同時執行多個背景任務(備份、報表產生) | 使用 ProcessPoolExecutor 在背景執行長時間任務,主程式仍保持回應。 |
實務小提醒:在雲端或容器環境部署時,請務必根據容器的 CPU 限額設定
max_workers,避免因過度併發導致容器被 OOM(Out‑Of‑Memory)或 CPU 限制。
總結
concurrent.futures 為 Python 開發者提供了一套 簡潔、統一且安全 的併發介面。透過 ThreadPoolExecutor 與 ProcessPoolExecutor,我們可以輕鬆:
- 將 I/O 密集或 CPU 密集的工作平行化,顯著縮短執行時間。
- 以 Future 物件追蹤任務狀態,在需要時取得結果、捕捉例外或取消任務。
- 結合
asyncio,在事件迴圈中安全地執行阻塞函式,保持程式的非同步特性。
在實務開發中,掌握 適當的執行緒/行程選擇、資源釋放、例外處理與日誌紀錄,即可讓程式在效能與可維護性之間取得最佳平衡。未來若需求更複雜(如分散式佈署),concurrent.futures 仍是 進階併發框架(如 Ray、Dask)的概念基礎,值得投入時間深入了解。
祝你在 Python 的併發世界中玩得開心,寫出既快又穩的程式! 🚀