本文 AI 產出,尚未審核

Python 並行與非同步(Concurrency & Async)

主題:Task 與 Future


簡介

在現代的 Web 服務、資料處理與 IoT 應用中,同時處理多件事務已成為基本需求。傳統的同步程式會因為 I/O(檔案、網路、資料庫)阻塞而浪費大量 CPU 時間,導致效能瓶頸。Python 從 3.4 版開始引入 asyncio,提供了 協程(coroutine)TaskFuture 等抽象,讓開發者能以單執行緒的方式寫出「看似同時」執行的程式碼,同時保留可讀性與可維護性。

本篇文章聚焦於 TaskFuture 兩個核心概念,說明它們在 asyncio 生態系中的角色、如何正確建立與管理,並透過實作範例展示它們在真實專案中的應用。即使你是剛接觸非同步程式設計的初學者,也能在閱讀完本篇後,快速上手並避免常見的坑洞。


核心概念

1. 什麼是 Future?

Future 是一個 容器,用來表示「將來某個時間點」會完成的結果。它本身不會執行任何工作,只是 占位,等待其他程式碼(例如協程、執行緒或外部 I/O)把結果填入。Future 具備以下特性:

  • pending / done:在結果尚未產生時為 pending,完成後變為 done
  • result / exception:成功時可取得 result(),失敗時會拋出 exception()
  • awaitableFuture 實作了 __await__,所以可以直接在 await 表達式中使用。

asyncio 中,最常見的 Future 由事件迴圈自動建立,供協程之間傳遞訊息或同步狀態。

import asyncio

async def demo_future():
    loop = asyncio.get_running_loop()
    # 建立一個尚未完成的 Future
    fut = loop.create_future()

    # 模擬其他非同步工作在 2 秒後設定結果
    loop.call_later(2, fut.set_result, "完成!")
    print("等待 Future 結果...")

    # await 會在 Future 完成前暫停此協程
    result = await fut
    print(f"Future 結果: {result}")

asyncio.run(demo_future())

重點Future 並不會自行執行任何程式碼,必須有人(或其他協程)在適當時機呼叫 set_resultset_exception


2. 什麼是 Task?

TaskFuture 的子類別,同時具備 Future 的特性,且 會自動排入事件迴圈執行。簡單說,Task 把一個 協程(coroutine) 包裝起來,交給 asyncio 讓它在背景執行,並在完成時把結果放入自身(即 Future)中。這使得:

  • Task 本身也是 awaitable:可以在其他協程裡使用 await task 取得結果。
  • 支援取消(cancellation):呼叫 task.cancel() 會向協程拋出 CancelledError
  • 可取得執行狀態task.done(), task.cancelled(), task.exception() 等。

建立 Task 的常用方式:

# 方式一:直接使用 asyncio.create_task()
task = asyncio.create_task(coro())

# 方式二:使用 loop.create_task()
loop = asyncio.get_running_loop()
task = loop.create_task(coro())

以下範例示範同時啟動三個協程,並透過 Task 取得它們的結果:

import asyncio
import random

async def fetch_data(id: int) -> str:
    # 模擬不固定的 I/O 延遲
    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)
    return f"資料{id}(延遲 {delay:.2f}s)"

async def main():
    # 建立多個 Task
    tasks = [asyncio.create_task(fetch_data(i)) for i in range(1, 4)]

    # 等待全部完成 (等同於 await asyncio.gather(*tasks))
    for task in tasks:
        result = await task          # 逐一取得結果
        print("Task 完成:", result)

asyncio.run(main())

3. Task 與 Future 的關係

項目 Future Task
是否自行執行協程 ❌ 必須外部設定結果 ✅ 內部自動排程協程
可否 await
是否可取消 ✅(但需自行實作) ✅(內建 cancel()
典型使用情境 跨協程通訊、手動控制結果 包裝協程、同時執行多任務

小技巧:在需要 手動控制 結果的情況下(例如外部事件驅動),仍可直接使用 Future;若只是想 把協程交給事件迴圈,則應使用 Task


4. 程式碼範例:實作自訂 Future + Task

下面示範一個 生產者 / 消費者 場景,生產者透過 Future 把資料「交給」消費者,消費者則以 Task 方式持續等待新資料。

import asyncio
import itertools

async def producer(fut: asyncio.Future):
    """每秒產生一筆資料,並設定 Future 結果"""
    for i in itertools.count(1):
        await asyncio.sleep(1)                # 模擬 I/O
        if not fut.done():
            fut.set_result(f"訊息 {i}")
            # 為下一輪建立新的 Future
            fut = asyncio.get_running_loop().create_future()

async def consumer():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # 以 Task 方式持續執行 producer
    prod_task = asyncio.create_task(producer(fut))

    try:
        while True:
            # 等待下一筆資料
            msg = await fut
            print("收到:", msg)
            # 重新建立等待的 Future
            fut = loop.create_future()
    except asyncio.CancelledError:
        prod_task.cancel()
        raise

async def main():
    consumer_task = asyncio.create_task(consumer())
    # 讓程式跑 5 秒後自行停止
    await asyncio.sleep(5)
    consumer_task.cancel()
    try:
        await consumer_task
    except asyncio.CancelledError:
        print("消費者已停止")

asyncio.run(main())

說明

  1. producer 每秒產生一次訊息,利用 Future.set_result 把資料傳遞給 consumer
  2. consumer 透過 await fut 等待資料,完成後立即建立新的 Future 以繼續等待。
  3. 整體流程是 單執行緒非阻塞,且透過 TaskFuture 完成訊息的「推」與「拉」機制。

5. 使用 asyncio.gatherTask 的差異

asyncio.gather(*coros_or_tasks, return_exceptions=False) 會一次性收集多個協程或 Task,在所有工作完成後一次返回結果。它背後仍是使用 Task,但提供了 批次管理 的便利。以下示範兩者的等價寫法:

# 方式一:手動建立 Task,逐一 await
tasks = [asyncio.create_task(fetch_data(i)) for i in range(5)]
results = []
for t in tasks:
    results.append(await t)

# 方式二:直接使用 gather(更簡潔)
results = await asyncio.gather(*(fetch_data(i) for i in range(5)))

若希望在 任務失敗時仍取得其他成功結果,可以使用 return_exceptions=True

results = await asyncio.gather(
    fetch_data(1),
    fetch_data(2),       # 假設會拋出例外
    fetch_data(3),
    return_exceptions=True,
)

for i, r in enumerate(results, 1):
    if isinstance(r, Exception):
        print(f"Task {i} 發生錯誤:", r)
    else:
        print(f"Task {i} 成功:", r)

常見陷阱與最佳實踐

常見問題 原因 解決方式
忘記 await Task 直接呼叫 asyncio.create_task(coro) 而未 await,導致例外被忽略或程式提前結束。 必須在適當時機 await task,或使用 asyncio.gather 收集。
Future 被多次設定結果 同一 Future 呼叫多次 set_resultset_exception,會拋 InvalidStateError 在設定前先檢查 future.done(),或使用 try/except 捕捉例外。
Task 取消後仍持續執行 協程內部未正確處理 CancelledError,導致取消無效。 在協程開頭或關鍵等待點使用 try: ... except asyncio.CancelledError: ... raise
阻塞 I/O 造成事件迴圈停頓 在協程中直接呼叫同步的長時間函式(如 time.sleep、檔案讀寫)。 改用 await asyncio.sleeprun_in_executor 或相應的非同步庫(如 aiofiles)。
過度建立 Task,導致記憶體泄漏 無限制地產生大量 Task,卻未回收或等待完成。 使用 semaphore 控制同時數量,或在 Task 完成後立即 await/cancel

推薦的最佳實踐

  1. 統一使用 asyncio.create_task:不論是簡單的協程還是複雜的 pipeline,都以此方式建立 Task,確保它們自動加入當前事件迴圈。
  2. async with 包裝資源:例如 asyncio.timeoutaiohttp.ClientSession,確保在例外或取消時自動釋放。
  3. 限制同時執行數量:使用 asyncio.Semaphoreasyncio.BoundedSemaphore,避免過度佔用系統資源。
  4. 將長時間同步工作交給執行緒池loop.run_in_executor(None, sync_func, *args),讓 CPU‑bound 任務不阻塞事件迴圈。
  5. 在測試環境使用 pytest-asyncio:寫測試時以 @pytest.mark.asyncio 裝飾測試函式,確保 Task 與 Future 行為正確。

實際應用場景

場景 為何使用 Task / Future 範例簡述
Web API 並行請求 多個外部服務同時呼叫,等待最慢者回傳即可。 asyncio.gather 包裝多個 aiohttp 請求 Task。
即時資料串流 生產者持續產生訊息,消費者即時處理。 前述 生產者 / 消費者 範例。
背景排程 (cron) 定時執行長時間任務,同時允許即時取消。 使用 asyncio.create_task(background_job()),搭配 task.cancel()
大量檔案 I/O 讀寫大量檔案時,避免阻塞主迴圈。 aiofiles 讀檔時返回 Future,配合 Task 同時處理多個檔案。
CPU‑bound 計算 需要在非同步環境中跑 CPU 密集運算。 loop.run_in_executor 產生 Future,再 await 結果。

總結

  • Future 是「未來結果」的容器,必須由外部手動設定結果或例外。
  • TaskFuture 的子類別,會自動把協程排入事件迴圈執行,並支援取消與狀態查詢。
  • 使用 asyncio.create_task 建立 Task,搭配 awaitasyncio.gather 取得結果,是最常見且安全的模式。
  • 在實務開發中,善用 Future 進行跨協程通訊、Task 處理並行工作,並遵守「不要阻塞事件迴圈」的原則,可大幅提升系統的 相應性可擴充性

透過本文的概念說明與實作範例,你已掌握在 Python 中運用 Task 與 Future 的基礎技巧,接下來可以把這些工具帶入自己的專案,打造更高效、彈性的非同步應用程式。祝開發順利,玩得開心! 🚀