FastAPI – 非同步程式設計:背景任務與非同步執行
簡介
在 Web API 開發中,效能與使用者體驗往往取決於伺服器如何處理 I/O 密集的工作(例如寄送 Email、產生報表、呼叫外部服務)。傳統的同步處理會讓請求在等待 I/O 完成的期間被阻塞,導致併發量受限、回應時間變長。
FastAPI 內建支援 async/await 語法,讓開發者可以輕鬆寫出非同步端點;同時提供 BackgroundTasks 物件,讓長時間工作可以在回應已送出後於背景執行,避免阻塞前端。
本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,帶你完整掌握 FastAPI 中的背景任務與非同步執行,讓你的 API 更加 高效、可擴充。
核心概念
1. 為什麼需要非同步?
- I/O 密集:資料庫查詢、檔案上傳、第三方 API 呼叫等,都需要等待外部資源回應。
- CPU 空閒:在等待期間,CPU 可以切換去處理其他請求,只要程式以
await釋放控制權。 - 提升併發:同時處理更多請求,而不必額外增加執行緒或進程。
重點:非同步不是讓程式跑得更快,而是讓 等待時間 不再佔用執行緒資源。
2. FastAPI 的 async 支援
FastAPI 會根據端點函式的宣告方式自動決定是否以 ASGI 方式執行:
from fastapi import FastAPI
app = FastAPI()
@app.get("/sync")
def sync_endpoint():
# 同步函式,會在執行期間阻塞
...
@app.get("/async")
async def async_endpoint():
# 非同步函式,可使用 await 呼叫 I/O
...
只要在函式前加上 async,並在需要的地方使用 await,就能把 I/O 交給事件迴圈(如 uvicorn 使用的 asyncio)。
3. BackgroundTasks:回應後執行
有些工作 不需要即時回傳結果(例如寄送驗證信、寫入審計日誌),此時可使用 BackgroundTasks:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("audit.log", "a") as f:
f.write(message + "\n")
@app.post("/register")
async def register_user(username: str, background_tasks: BackgroundTasks):
# 立即回應成功訊息
background_tasks.add_task(write_log, f"User {username} registered")
return {"msg": "註冊成功,驗證信已送出"}
add_task 會在回應送出後,於同一個事件迴圈中執行 write_log,不會阻塞請求。
4. 非同步背景任務
若背景工作本身需要 I/O(例如呼叫外部 API),應使用 非同步函式:
import httpx
async def send_welcome_email(email: str):
async with httpx.AsyncClient() as client:
await client.post(
"https://email.service/send",
json={"to": email, "template": "welcome"},
)
在端點中加入:
@app.post("/invite")
async def invite_user(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(send_welcome_email, email) # 直接加入 async 函式
return {"msg": "邀請已發送"}
FastAPI 會自動偵測 send_welcome_email 為 async,並在背景執行時使用 await。
5. 使用 asyncio.create_task 直接建立任務
有時你想在 同一個請求內 同時觸發多個非同步工作,且不需要等它們完成,可使用 asyncio.create_task:
import asyncio
@app.post("/bulk")
async def bulk_process(items: list[int]):
async def process_item(item: int):
await asyncio.sleep(1) # 模擬 I/O
return item * 2
tasks = [asyncio.create_task(process_item(i)) for i in items]
# 不等 tasks 完成,直接回傳
return {"msg": f"已接受 {len(items)} 筆任務"}
若需要等全部完成,則改用 await asyncio.gather(*tasks)。
程式碼範例
以下提供 5 個實用範例,涵蓋同步、非同步、背景任務與錯誤處理。
範例 1:簡易同步端點
@app.get("/ping")
def ping():
"""最基本的同步回應,用於健康檢查。"""
return {"msg": "pong"}
範例 2:非同步資料庫查詢(使用 asyncpg)
import asyncpg
async def fetch_user(user_id: int):
conn = await asyncpg.connect(dsn="postgresql://user:pwd@db/postgres")
row = await conn.fetchrow("SELECT * FROM users WHERE id=$1", user_id)
await conn.close()
return dict(row)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await fetch_user(user_id)
return {"user": user}
說明:
await讓事件迴圈在等待 DB 回應時可以處理其他請求。
範例 3:背景寫入審計日誌(同步函式)
def audit(action: str, payload: dict):
with open("audit.log", "a") as f:
f.write(f"{action}: {payload}\n")
@app.post("/order")
async def place_order(order: dict, background_tasks: BackgroundTasks):
# 假設此處已完成下單邏輯
background_tasks.add_task(audit, "order_created", order)
return {"msg": "訂單已建立"}
範例 4:非同步背景任務 – 發送 Slack 訊息
import httpx
async def post_to_slack(text: str):
async with httpx.AsyncClient() as client:
await client.post(
"https://hooks.slack.com/services/XXXXX/XXXXX/XXXXX",
json={"text": text},
)
@app.post("/alert")
async def trigger_alert(event: str, background_tasks: BackgroundTasks):
background_tasks.add_task(post_to_slack, f"⚠️ 事件發生: {event}")
return {"msg": "警告已送出"}
範例 5:同時執行多個非同步任務並收集結果
@app.get("/stats")
async def get_stats():
async def fetch_cpu():
await asyncio.sleep(0.5) # 模擬 I/O
return {"cpu": 42}
async def fetch_memory():
await asyncio.sleep(0.7)
return {"memory": "8GB"}
cpu_task = asyncio.create_task(fetch_cpu())
mem_task = asyncio.create_task(fetch_memory())
cpu, mem = await asyncio.gather(cpu_task, mem_task)
return {"cpu": cpu["cpu"], "memory": mem["memory"]}
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方式 |
|---|---|---|
| 混用同步與非同步 | 在 async 端點內直接呼叫阻塞的函式(如 time.sleep、同步 DB 驅動)會阻塞整個事件迴圈。 |
使用 await asyncio.to_thread(sync_func, *args) 或改用 async 版本的庫。 |
忘記 await |
呼叫 async 函式卻未加 await,會得到 <coroutine object ...>,且不會執行。 |
確認所有需要結果的 async 呼叫都有 await。 |
| 背景任務例外未捕獲 | BackgroundTasks 中的例外不會傳回給客戶端,若未處理會造成隱藏錯誤。 | 在背景函式內部使用 try/except 並記錄日誌。 |
| 過度使用背景任務 | 把所有耗時工作都塞進 BackgroundTasks,可能導致同時大量任務堆積,佔用記憶體。 | 評估是否需要佇列系統(如 Celery、RQ)或限制同時任務數量。 |
| 共享全域變數 | 多個 async 任務同時寫入同一全域變數會產生競爭條件。 | 使用 asyncio.Lock 或避免共享可變狀態。 |
最佳實踐
- 優先使用 async I/O:選擇支援
await的套件(httpx.AsyncClient、databases、asyncpg等)。 - 背景任務只做「fire‑and‑forget」:不需要即時回傳結果的作業才放入
BackgroundTasks。 - 日誌與錯誤捕獲:在背景函式內部加入完整的例外處理與結構化日誌。
- 限制同時任務:可利用
semaphore = asyncio.Semaphore(10)控制同時執行的背景任務數量。 - 測試非同步行為:使用
pytest-asyncio撰寫測試,確保await、背景任務的行為符合預期。
實際應用場景
| 場景 | 為何使用非同步/背景任務 | 範例實作 |
|---|---|---|
| 使用者註冊 | 註冊後需寄送驗證信、寫入審計日誌,這些不必同步完成。 | BackgroundTasks.add_task(send_email, ...) |
| 大量資料匯入 | 前端上傳 CSV,後端需分批寫入資料庫,避免阻塞 API。 | asyncio.create_task(import_chunk(chunk)) |
| 即時通知 | 觸發事件時需向 Slack、Line、Discord 同時發訊息。 | 多個 await httpx.AsyncClient().post(...) |
| 報表產生 | 報表生成耗時,使用者只需要收到「已開始產生」的回應。 | 背景任務產生 PDF,完成後上傳至雲端儲存。 |
| 第三方 API 串接 | 多個外部服務需同時呼叫(如付款、物流),使用 asyncio.gather 同步等待。 |
await asyncio.gather(pay(), ship()) |
總結
FastAPI 讓 非同步程式設計 變得簡潔且具備高效能。透過 async def、await 以及內建的 BackgroundTasks,開發者可以:
- 釋放 I/O 等待時間,提升服務併發量。
- 將不需要即時回傳的工作 交給背景執行,避免阻塞使用者請求。
- 保持程式碼可讀性,因為非同步邏輯與同步寫法在語法上幾乎相同。
在實務開發中,掌握何時使用 async、何時使用 BackgroundTasks,以及如何正確處理例外與資源競爭,將直接影響系統的可靠性與可維護性。希望本篇文章的概念說明與範例能幫助你在 FastAPI 專案中寫出更快、更穩的 API!祝開發順利 🚀