本文 AI 產出,尚未審核

FastAPI 課程 – 非同步程式設計

主題:asyncio.create_task


簡介

FastAPI 中,非同步(asynchronous)是提升 API 效能與可擴充性的關鍵技術。
傳統的同步函式會在 I/O(例如資料庫查詢、外部 API 呼叫)等待時阻塞整個事件迴圈,導致同時處理的請求數量受限。
使用 asyncio.create_task 可以把一段協程(coroutine)排入事件迴圈,讓它在背景執行,而不必等到結果返回才繼續後續邏輯,從而實現 「同時跑多件事」 的效果。

本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,直到實務應用案例,完整呈現 asyncio.create_task 在 FastAPI 專案中的使用方式,適合剛接觸非同步的初學者,也能提供中級開發者進一步優化的參考。


核心概念

1. 協程、事件迴圈與任務(Task)

  • 協程(coroutine):以 async def 定義的函式,返回一個 coroutine object,只有在事件迴圈中被 await排程 才會真正執行。
  • 事件迴圈(event loop):管理所有待執行的協程與 I/O 事件的核心機制,FastAPI 內部會在啟動時自動建立並運行。
  • 任務(Task)asyncio.create_task(coro) 會把協程包裝成 Task,將其加入事件迴圈,並立即排程執行。Task 會在完成時持有結果或例外。

重點create_task 不會阻塞 呼叫端,函式會直接返回一個 Task 物件,讓程式可以繼續往下走。

2. 為什麼不直接 await

await 會等到協程完成才返回結果,適合需要即時結果的情況。
但在以下情境下,我們更想「先把工作交給背景」:

  • 不需要立即回應(例如發送 Email、寫入日誌、觸發長時間的報表產生)。
  • 同時執行多個 I/O,不想讓其中一個阻塞其他請求。
  • 避免阻塞 FastAPI 的請求處理流程,提升整體吞吐量。

此時 asyncio.create_task 正是最佳選擇。

3. asyncio.create_task 的基本語法

import asyncio

async def my_coroutine(param):
    # 模擬 I/O
    await asyncio.sleep(1)
    return f"Result: {param}"

# 建立任務
task = asyncio.create_task(my_coroutine("hello"))

# 之後可以透過 await 取得結果
result = await task   # 這裡仍會等待 task 完成
  • create_task 必須在 已啟動的事件迴圈 中呼叫,否則會拋出 RuntimeError
  • 返回的 Task 物件支援 awaitcancel()add_done_callback() 等方法。

程式碼範例

以下示範 5 個實用情境,從最簡單的背景任務,到在 FastAPI 路由中結合 create_task

範例 1️⃣ 基礎背景任務

import asyncio

async def background_job():
    print("🔹 任務開始")
    await asyncio.sleep(2)          # 假裝是長時間 I/O
    print("🔹 任務完成")

async def main():
    # 把協程排入背景
    task = asyncio.create_task(background_job())
    print("⚡ 主流程繼續執行")
    await asyncio.sleep(1)          # 主流程仍可執行其他事
    await task                      # 最後等待任務結束(可省略)

asyncio.run(main())

說明create_taskbackground_job 在背景執行,main 不會被阻塞。


範例 2️⃣ 同時執行多個協程

import asyncio

async def fetch_data(idx: int):
    await asyncio.sleep(1 + idx * 0.2)   # 模擬不同延遲
    return f"data-{idx}"

async def main():
    tasks = [asyncio.create_task(fetch_data(i)) for i in range(5)]
    # 同時跑 5 個任務,立即返回
    results = await asyncio.gather(*tasks)   # gather 會等所有任務完成
    print("所有結果:", results)

asyncio.run(main())

技巧:使用 asyncio.gather 取得多個 Task 的結果,避免逐一 await 造成的序列化。


範例 3️⃣ 在 FastAPI 路由中觸發背景任務

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(to: str, subject: str, body: str):
    # 假設這裡呼叫外部 SMTP 服務
    await asyncio.sleep(3)
    print(f"📧 Email sent to {to}")

@app.post("/notify")
async def notify_user(email: str, background_tasks: BackgroundTasks):
    # 直接使用 FastAPI 提供的 BackgroundTasks
    background_tasks.add_task(asyncio.create_task, send_email(email, "Welcome", "Hello!"))
    return {"message": "通知已排程"}

重點:FastAPI 的 BackgroundTasks 允許在回應前排程任務,內部仍會使用 asyncio.create_task 來執行非同步工作。


範例 4️⃣ 取消(Cancel)背景任務

import asyncio

async def long_running():
    try:
        for i in range(10):
            await asyncio.sleep(1)
            print(f"🔄 第 {i+1} 秒")
    except asyncio.CancelledError:
        print("🛑 任務被取消")
        raise

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(3)      # 讓任務跑 3 秒
    task.cancel()               # 取消任務
    try:
        await task
    except asyncio.CancelledError:
        print("✅ 已捕獲取消例外")

asyncio.run(main())

說明:若任務不再需要,可呼叫 task.cancel(),並在協程內捕獲 CancelledError 完成清理。


範例 5️⃣ 任務完成回呼(Callback)

import asyncio

async def compute(x: int):
    await asyncio.sleep(2)
    return x * x

def on_done(task: asyncio.Task):
    try:
        result = task.result()
        print(f"✅ 任務完成,結果 = {result}")
    except Exception as e:
        print(f"❗ 任務失敗:{e}")

async def main():
    task = asyncio.create_task(compute(7))
    task.add_done_callback(on_done)   # 任務結束時自動呼叫
    await asyncio.sleep(3)            # 主流程繼續

asyncio.run(main())

技巧add_done_callback 可用於 記錄發送事件更新快取 等需求,避免在主流程中額外 await


常見陷阱與最佳實踐

陷阱 說明 解決方式
未在事件迴圈內呼叫 create_task 在同步程式碼或尚未啟動的迴圈中使用會拋 RuntimeError 使用 asyncio.get_running_loop() 或在 async def 裡呼叫。
忘記 await 任務結果 若任務拋例外,未 await 會導致「未處理的例外」警告。 在需要結果的地方 await task,或在 add_done_callback 中處理例外。
過度建立任務 大量短暫任務會造成 Task 內部排程成本,甚至記憶體飆升。 使用 semaphoreworker poolasyncio.Semaphoreasyncio.Queue)限制同時任務數。
阻塞型程式碼 在協程內呼叫同步的阻塞函式(如 time.sleep)會凍結整個事件迴圈。 改用非同步函式(await asyncio.sleep),或使用 ThreadPoolExecutor 包裝。
背景任務失去上下文 FastAPI 的依賴注入(DI)在背景任務中可能失效。 在背景任務中手動傳遞需要的資源(如 DB 連線、設定),或使用 ContextVar 保留上下文。

最佳實踐

  1. 盡量在同一層級建立 Task:不要在已完成的 Task 裡再呼叫 create_task,會增加不必要的層次。
  2. 使用 asyncio.create_task 取代 ensure_future:前者語意更清晰,且在 Python 3.7+ 為官方推薦。
  3. 對長時間任務加上 Timeoutasyncio.wait_for(task, timeout) 防止無限等待。
  4. 在 FastAPI 中搭配 BackgroundTasks:讓路由返回前即完成排程,保持 API 響應速度。
  5. 記錄任務狀態:使用 task.add_done_callback 或自訂監控,方便除錯與觀察。

實際應用場景

場景 為什麼適合使用 asyncio.create_task
發送驗證 Email 用戶註冊後立即回傳成功訊息,同時在背景發信,避免等待 SMTP 回應。
產生 PDF 報表 前端請求產生報表,API 回傳「已排程」訊息,報表完成後透過 WebSocket 推送或寫入資料庫。
批次資料同步 每次接收到 webhook 時,立即回應 200,並在背景同步外部系統資料。
快取預熱 當熱門資源被請求時,同步回傳結果,並在背景異步預先載入相關快取,以提升後續請求速度。
定時任務 (Scheduler) 替代 使用 asyncio.create_task 搭配 asyncio.sleepasyncio.wait 實作簡易的內部排程器。

範例:在 FastAPI 中,使用 create_task 實作「使用者上傳檔案後,立即回傳成功,並在背景壓縮與上傳至 S3」:

from fastapi import FastAPI, UploadFile, File
import aiofiles, asyncio, aioboto3

app = FastAPI()

async def compress_and_upload(file_path: str, bucket: str, key: str):
    # 假設使用 async 版的壓縮與 S3 客戶端
    await asyncio.sleep(1)          # 壓縮模擬
    session = aioboto3.Session()
    async with session.client('s3') as s3:
        await s3.upload_file(file_path, bucket, key)
    print("✅ 上傳完成")

@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    save_path = f"/tmp/{file.filename}"
    async with aiofiles.open(save_path, "wb") as out_file:
        content = await file.read()
        await out_file.write(content)
    # 背景壓縮上傳
    asyncio.create_task(compress_and_upload(save_path, "my-bucket", file.filename))
    return {"status": "file received, processing in background"}

此模式讓 API 即時回應,同時保證大檔案處理不會阻塞其他使用者。


總結

  • asyncio.create_task將協程排入事件迴圈、以 非阻塞 方式執行的核心工具,對提升 FastAPI 應用的併發能力至關重要。
  • 透過 背景任務同時執行多個 I/O取消與回呼 等技巧,我們可以在不影響主請求流程的前提下,完成 Email、報表、檔案處理等耗時工作。
  • 使用時務必注意 事件迴圈的存活、例外處理、任務數量控制,並結合 FastAPI 的 BackgroundTasksContextVar 等機制,使程式碼保持 可測試、易除錯

掌握 asyncio.create_task,不僅能讓你的 FastAPI 服務 更快、更穩,也為未來的微服務與雲端原生應用奠定堅實的非同步基礎。祝開發順利,快去試試看吧! 🚀