本文 AI 產出,尚未審核

FastAPI – 非同步程式設計:背景任務與非同步執行


簡介

在 Web API 開發中,效能使用者體驗往往取決於伺服器如何處理 I/O 密集的工作(例如寄送 Email、產生報表、呼叫外部服務)。傳統的同步處理會讓請求在等待 I/O 完成的期間被阻塞,導致併發量受限、回應時間變長。
FastAPI 內建支援 async/await 語法,讓開發者可以輕鬆寫出非同步端點;同時提供 BackgroundTasks 物件,讓長時間工作可以在回應已送出後於背景執行,避免阻塞前端。

本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,帶你完整掌握 FastAPI 中的背景任務與非同步執行,讓你的 API 更加 高效可擴充


核心概念

1. 為什麼需要非同步?

  • I/O 密集:資料庫查詢、檔案上傳、第三方 API 呼叫等,都需要等待外部資源回應。
  • CPU 空閒:在等待期間,CPU 可以切換去處理其他請求,只要程式以 await 釋放控制權。
  • 提升併發:同時處理更多請求,而不必額外增加執行緒或進程。

重點:非同步不是讓程式跑得更快,而是讓 等待時間 不再佔用執行緒資源。

2. FastAPI 的 async 支援

FastAPI 會根據端點函式的宣告方式自動決定是否以 ASGI 方式執行:

from fastapi import FastAPI

app = FastAPI()

@app.get("/sync")
def sync_endpoint():
    # 同步函式,會在執行期間阻塞
    ...

@app.get("/async")
async def async_endpoint():
    # 非同步函式,可使用 await 呼叫 I/O
    ...

只要在函式前加上 async,並在需要的地方使用 await,就能把 I/O 交給事件迴圈(如 uvicorn 使用的 asyncio)。

3. BackgroundTasks:回應後執行

有些工作 不需要即時回傳結果(例如寄送驗證信、寫入審計日誌),此時可使用 BackgroundTasks

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("audit.log", "a") as f:
        f.write(message + "\n")

@app.post("/register")
async def register_user(username: str, background_tasks: BackgroundTasks):
    # 立即回應成功訊息
    background_tasks.add_task(write_log, f"User {username} registered")
    return {"msg": "註冊成功,驗證信已送出"}

add_task 會在回應送出後,於同一個事件迴圈中執行 write_log,不會阻塞請求。

4. 非同步背景任務

若背景工作本身需要 I/O(例如呼叫外部 API),應使用 非同步函式

import httpx

async def send_welcome_email(email: str):
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://email.service/send",
            json={"to": email, "template": "welcome"},
        )

在端點中加入:

@app.post("/invite")
async def invite_user(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_welcome_email, email)   # 直接加入 async 函式
    return {"msg": "邀請已發送"}

FastAPI 會自動偵測 send_welcome_email 為 async,並在背景執行時使用 await

5. 使用 asyncio.create_task 直接建立任務

有時你想在 同一個請求內 同時觸發多個非同步工作,且不需要等它們完成,可使用 asyncio.create_task

import asyncio

@app.post("/bulk")
async def bulk_process(items: list[int]):
    async def process_item(item: int):
        await asyncio.sleep(1)   # 模擬 I/O
        return item * 2

    tasks = [asyncio.create_task(process_item(i)) for i in items]
    # 不等 tasks 完成,直接回傳
    return {"msg": f"已接受 {len(items)} 筆任務"}

若需要等全部完成,則改用 await asyncio.gather(*tasks)


程式碼範例

以下提供 5 個實用範例,涵蓋同步、非同步、背景任務與錯誤處理。

範例 1:簡易同步端點

@app.get("/ping")
def ping():
    """最基本的同步回應,用於健康檢查。"""
    return {"msg": "pong"}

範例 2:非同步資料庫查詢(使用 asyncpg)

import asyncpg

async def fetch_user(user_id: int):
    conn = await asyncpg.connect(dsn="postgresql://user:pwd@db/postgres")
    row = await conn.fetchrow("SELECT * FROM users WHERE id=$1", user_id)
    await conn.close()
    return dict(row)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await fetch_user(user_id)
    return {"user": user}

說明await 讓事件迴圈在等待 DB 回應時可以處理其他請求。


範例 3:背景寫入審計日誌(同步函式)

def audit(action: str, payload: dict):
    with open("audit.log", "a") as f:
        f.write(f"{action}: {payload}\n")

@app.post("/order")
async def place_order(order: dict, background_tasks: BackgroundTasks):
    # 假設此處已完成下單邏輯
    background_tasks.add_task(audit, "order_created", order)
    return {"msg": "訂單已建立"}

範例 4:非同步背景任務 – 發送 Slack 訊息

import httpx

async def post_to_slack(text: str):
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://hooks.slack.com/services/XXXXX/XXXXX/XXXXX",
            json={"text": text},
        )

@app.post("/alert")
async def trigger_alert(event: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(post_to_slack, f"⚠️ 事件發生: {event}")
    return {"msg": "警告已送出"}

範例 5:同時執行多個非同步任務並收集結果

@app.get("/stats")
async def get_stats():
    async def fetch_cpu():
        await asyncio.sleep(0.5)   # 模擬 I/O
        return {"cpu": 42}

    async def fetch_memory():
        await asyncio.sleep(0.7)
        return {"memory": "8GB"}

    cpu_task = asyncio.create_task(fetch_cpu())
    mem_task = asyncio.create_task(fetch_memory())

    cpu, mem = await asyncio.gather(cpu_task, mem_task)
    return {"cpu": cpu["cpu"], "memory": mem["memory"]}

常見陷阱與最佳實踐

陷阱 說明 解決方式
混用同步與非同步 在 async 端點內直接呼叫阻塞的函式(如 time.sleep、同步 DB 驅動)會阻塞整個事件迴圈。 使用 await asyncio.to_thread(sync_func, *args) 或改用 async 版本的庫。
忘記 await 呼叫 async 函式卻未加 await,會得到 <coroutine object ...>,且不會執行。 確認所有需要結果的 async 呼叫都有 await
背景任務例外未捕獲 BackgroundTasks 中的例外不會傳回給客戶端,若未處理會造成隱藏錯誤。 在背景函式內部使用 try/except 並記錄日誌。
過度使用背景任務 把所有耗時工作都塞進 BackgroundTasks,可能導致同時大量任務堆積,佔用記憶體。 評估是否需要佇列系統(如 Celery、RQ)或限制同時任務數量。
共享全域變數 多個 async 任務同時寫入同一全域變數會產生競爭條件。 使用 asyncio.Lock 或避免共享可變狀態。

最佳實踐

  1. 優先使用 async I/O:選擇支援 await 的套件(httpx.AsyncClientdatabasesasyncpg 等)。
  2. 背景任務只做「fire‑and‑forget」:不需要即時回傳結果的作業才放入 BackgroundTasks
  3. 日誌與錯誤捕獲:在背景函式內部加入完整的例外處理與結構化日誌。
  4. 限制同時任務:可利用 semaphore = asyncio.Semaphore(10) 控制同時執行的背景任務數量。
  5. 測試非同步行為:使用 pytest-asyncio 撰寫測試,確保 await、背景任務的行為符合預期。

實際應用場景

場景 為何使用非同步/背景任務 範例實作
使用者註冊 註冊後需寄送驗證信、寫入審計日誌,這些不必同步完成。 BackgroundTasks.add_task(send_email, ...)
大量資料匯入 前端上傳 CSV,後端需分批寫入資料庫,避免阻塞 API。 asyncio.create_task(import_chunk(chunk))
即時通知 觸發事件時需向 Slack、Line、Discord 同時發訊息。 多個 await httpx.AsyncClient().post(...)
報表產生 報表生成耗時,使用者只需要收到「已開始產生」的回應。 背景任務產生 PDF,完成後上傳至雲端儲存。
第三方 API 串接 多個外部服務需同時呼叫(如付款、物流),使用 asyncio.gather 同步等待。 await asyncio.gather(pay(), ship())

總結

FastAPI 讓 非同步程式設計 變得簡潔且具備高效能。透過 async defawait 以及內建的 BackgroundTasks,開發者可以:

  • 釋放 I/O 等待時間,提升服務併發量。
  • 將不需要即時回傳的工作 交給背景執行,避免阻塞使用者請求。
  • 保持程式碼可讀性,因為非同步邏輯與同步寫法在語法上幾乎相同。

在實務開發中,掌握何時使用 async、何時使用 BackgroundTasks,以及如何正確處理例外與資源競爭,將直接影響系統的可靠性與可維護性。希望本篇文章的概念說明與範例能幫助你在 FastAPI 專案中寫出更快、更穩的 API!祝開發順利 🚀