Python 並行與非同步(Concurrency & Async)
主題:Task 與 Future
簡介
在現代的 Web 服務、資料處理與 IoT 應用中,同時處理多件事務已成為基本需求。傳統的同步程式會因為 I/O(檔案、網路、資料庫)阻塞而浪費大量 CPU 時間,導致效能瓶頸。Python 從 3.4 版開始引入 asyncio,提供了 協程(coroutine)、Task、Future 等抽象,讓開發者能以單執行緒的方式寫出「看似同時」執行的程式碼,同時保留可讀性與可維護性。
本篇文章聚焦於 Task 與 Future 兩個核心概念,說明它們在 asyncio 生態系中的角色、如何正確建立與管理,並透過實作範例展示它們在真實專案中的應用。即使你是剛接觸非同步程式設計的初學者,也能在閱讀完本篇後,快速上手並避免常見的坑洞。
核心概念
1. 什麼是 Future?
Future 是一個 容器,用來表示「將來某個時間點」會完成的結果。它本身不會執行任何工作,只是 占位,等待其他程式碼(例如協程、執行緒或外部 I/O)把結果填入。Future 具備以下特性:
- pending / done:在結果尚未產生時為 pending,完成後變為 done。
- result / exception:成功時可取得
result(),失敗時會拋出exception()。 - awaitable:
Future實作了__await__,所以可以直接在await表達式中使用。
在 asyncio 中,最常見的 Future 由事件迴圈自動建立,供協程之間傳遞訊息或同步狀態。
import asyncio
async def demo_future():
loop = asyncio.get_running_loop()
# 建立一個尚未完成的 Future
fut = loop.create_future()
# 模擬其他非同步工作在 2 秒後設定結果
loop.call_later(2, fut.set_result, "完成!")
print("等待 Future 結果...")
# await 會在 Future 完成前暫停此協程
result = await fut
print(f"Future 結果: {result}")
asyncio.run(demo_future())
重點:
Future並不會自行執行任何程式碼,必須有人(或其他協程)在適當時機呼叫set_result或set_exception。
2. 什麼是 Task?
Task 是 Future 的子類別,同時具備 Future 的特性,且 會自動排入事件迴圈執行。簡單說,Task 把一個 協程(coroutine) 包裝起來,交給 asyncio 讓它在背景執行,並在完成時把結果放入自身(即 Future)中。這使得:
- Task 本身也是 awaitable:可以在其他協程裡使用
await task取得結果。 - 支援取消(cancellation):呼叫
task.cancel()會向協程拋出CancelledError。 - 可取得執行狀態:
task.done(),task.cancelled(),task.exception()等。
建立 Task 的常用方式:
# 方式一:直接使用 asyncio.create_task()
task = asyncio.create_task(coro())
# 方式二:使用 loop.create_task()
loop = asyncio.get_running_loop()
task = loop.create_task(coro())
以下範例示範同時啟動三個協程,並透過 Task 取得它們的結果:
import asyncio
import random
async def fetch_data(id: int) -> str:
# 模擬不固定的 I/O 延遲
delay = random.uniform(0.5, 2.0)
await asyncio.sleep(delay)
return f"資料{id}(延遲 {delay:.2f}s)"
async def main():
# 建立多個 Task
tasks = [asyncio.create_task(fetch_data(i)) for i in range(1, 4)]
# 等待全部完成 (等同於 await asyncio.gather(*tasks))
for task in tasks:
result = await task # 逐一取得結果
print("Task 完成:", result)
asyncio.run(main())
3. Task 與 Future 的關係
| 項目 | Future |
Task |
|---|---|---|
| 是否自行執行協程 | ❌ 必須外部設定結果 | ✅ 內部自動排程協程 |
可否 await |
✅ | ✅ |
| 是否可取消 | ✅(但需自行實作) | ✅(內建 cancel()) |
| 典型使用情境 | 跨協程通訊、手動控制結果 | 包裝協程、同時執行多任務 |
小技巧:在需要 手動控制 結果的情況下(例如外部事件驅動),仍可直接使用
Future;若只是想 把協程交給事件迴圈,則應使用Task。
4. 程式碼範例:實作自訂 Future + Task
下面示範一個 生產者 / 消費者 場景,生產者透過 Future 把資料「交給」消費者,消費者則以 Task 方式持續等待新資料。
import asyncio
import itertools
async def producer(fut: asyncio.Future):
"""每秒產生一筆資料,並設定 Future 結果"""
for i in itertools.count(1):
await asyncio.sleep(1) # 模擬 I/O
if not fut.done():
fut.set_result(f"訊息 {i}")
# 為下一輪建立新的 Future
fut = asyncio.get_running_loop().create_future()
async def consumer():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# 以 Task 方式持續執行 producer
prod_task = asyncio.create_task(producer(fut))
try:
while True:
# 等待下一筆資料
msg = await fut
print("收到:", msg)
# 重新建立等待的 Future
fut = loop.create_future()
except asyncio.CancelledError:
prod_task.cancel()
raise
async def main():
consumer_task = asyncio.create_task(consumer())
# 讓程式跑 5 秒後自行停止
await asyncio.sleep(5)
consumer_task.cancel()
try:
await consumer_task
except asyncio.CancelledError:
print("消費者已停止")
asyncio.run(main())
說明
producer每秒產生一次訊息,利用Future.set_result把資料傳遞給consumer。consumer透過await fut等待資料,完成後立即建立新的Future以繼續等待。- 整體流程是 單執行緒、非阻塞,且透過
Task與Future完成訊息的「推」與「拉」機制。
5. 使用 asyncio.gather 與 Task 的差異
asyncio.gather(*coros_or_tasks, return_exceptions=False) 會一次性收集多個協程或 Task,在所有工作完成後一次返回結果。它背後仍是使用 Task,但提供了 批次管理 的便利。以下示範兩者的等價寫法:
# 方式一:手動建立 Task,逐一 await
tasks = [asyncio.create_task(fetch_data(i)) for i in range(5)]
results = []
for t in tasks:
results.append(await t)
# 方式二:直接使用 gather(更簡潔)
results = await asyncio.gather(*(fetch_data(i) for i in range(5)))
若希望在 任務失敗時仍取得其他成功結果,可以使用 return_exceptions=True:
results = await asyncio.gather(
fetch_data(1),
fetch_data(2), # 假設會拋出例外
fetch_data(3),
return_exceptions=True,
)
for i, r in enumerate(results, 1):
if isinstance(r, Exception):
print(f"Task {i} 發生錯誤:", r)
else:
print(f"Task {i} 成功:", r)
常見陷阱與最佳實踐
| 常見問題 | 原因 | 解決方式 |
|---|---|---|
忘記 await Task |
直接呼叫 asyncio.create_task(coro) 而未 await,導致例外被忽略或程式提前結束。 |
必須在適當時機 await task,或使用 asyncio.gather 收集。 |
| Future 被多次設定結果 | 同一 Future 呼叫多次 set_result 或 set_exception,會拋 InvalidStateError。 |
在設定前先檢查 future.done(),或使用 try/except 捕捉例外。 |
| Task 取消後仍持續執行 | 協程內部未正確處理 CancelledError,導致取消無效。 |
在協程開頭或關鍵等待點使用 try: ... except asyncio.CancelledError: ... raise。 |
| 阻塞 I/O 造成事件迴圈停頓 | 在協程中直接呼叫同步的長時間函式(如 time.sleep、檔案讀寫)。 |
改用 await asyncio.sleep、run_in_executor 或相應的非同步庫(如 aiofiles)。 |
| 過度建立 Task,導致記憶體泄漏 | 無限制地產生大量 Task,卻未回收或等待完成。 | 使用 semaphore 控制同時數量,或在 Task 完成後立即 await/cancel。 |
推薦的最佳實踐
- 統一使用
asyncio.create_task:不論是簡單的協程還是複雜的 pipeline,都以此方式建立 Task,確保它們自動加入當前事件迴圈。 - 以
async with包裝資源:例如asyncio.timeout、aiohttp.ClientSession,確保在例外或取消時自動釋放。 - 限制同時執行數量:使用
asyncio.Semaphore或asyncio.BoundedSemaphore,避免過度佔用系統資源。 - 將長時間同步工作交給執行緒池:
loop.run_in_executor(None, sync_func, *args),讓 CPU‑bound 任務不阻塞事件迴圈。 - 在測試環境使用
pytest-asyncio:寫測試時以@pytest.mark.asyncio裝飾測試函式,確保 Task 與 Future 行為正確。
實際應用場景
| 場景 | 為何使用 Task / Future | 範例簡述 |
|---|---|---|
| Web API 並行請求 | 多個外部服務同時呼叫,等待最慢者回傳即可。 | asyncio.gather 包裝多個 aiohttp 請求 Task。 |
| 即時資料串流 | 生產者持續產生訊息,消費者即時處理。 | 前述 生產者 / 消費者 範例。 |
| 背景排程 (cron) | 定時執行長時間任務,同時允許即時取消。 | 使用 asyncio.create_task(background_job()),搭配 task.cancel()。 |
| 大量檔案 I/O | 讀寫大量檔案時,避免阻塞主迴圈。 | aiofiles 讀檔時返回 Future,配合 Task 同時處理多個檔案。 |
| CPU‑bound 計算 | 需要在非同步環境中跑 CPU 密集運算。 | loop.run_in_executor 產生 Future,再 await 結果。 |
總結
- Future 是「未來結果」的容器,必須由外部手動設定結果或例外。
- Task 是 Future 的子類別,會自動把協程排入事件迴圈執行,並支援取消與狀態查詢。
- 使用
asyncio.create_task建立 Task,搭配await或asyncio.gather取得結果,是最常見且安全的模式。 - 在實務開發中,善用 Future 進行跨協程通訊、Task 處理並行工作,並遵守「不要阻塞事件迴圈」的原則,可大幅提升系統的 相應性 與 可擴充性。
透過本文的概念說明與實作範例,你已掌握在 Python 中運用 Task 與 Future 的基礎技巧,接下來可以把這些工具帶入自己的專案,打造更高效、彈性的非同步應用程式。祝開發順利,玩得開心! 🚀