FastAPI 背景任務(BackgroundTasks)基礎教學
簡介
在 Web API 開發中,我們常常會遇到「這個請求必須先回傳結果,之後才要執行一些較耗時的工作」的情境。
如果直接在路由函式內部執行長時間的任務,使用者必須等待整個工作完成才會收到回應,會嚴重影響服務的效能與使用者體驗。
FastAPI 為了在 非阻塞 的前提下提供簡易的背景工作機制,內建了 BackgroundTasks 類別。
只要把要延後執行的函式註冊到 BackgroundTasks,FastAPI 會在回傳 HTTP 回應之後,於同一個工作執行緒(或事件迴圈)中自動呼叫它。
這讓我們可以輕鬆地把 寄送 Email、寫入日誌、產生報表、清理暫存檔 等操作,從主要請求流程中抽離出來,保持 API 的即時回應速度。
本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,帶你完整掌握 FastAPI 背景任務的 基本用法,適合剛接觸 FastAPI 的新手以及想在既有專案中加入背景工作流程的中級開發者。
核心概念
1. 為什麼使用 BackgroundTasks?
| 情境 | 若不使用背景任務 | 使用 BackgroundTasks 後 |
|---|---|---|
| 寄送驗證信 | 請求必須等到 Email 服務回應,可能花 2~5 秒 | 立即回傳 202,Email 在背景完成 |
| 產生大型 PDF 報表 | 使用者必須等待報表產生完畢才能得到回應 | 先回傳任務 ID,報表完成後可自行下載 |
| 清除過期快取 | 直接在每個請求裡跑,會降低每次請求的效能 | 只在特定時機或排程觸發,與主要請求無關 |
重點:BackgroundTasks 不是完整的任務排程系統(如 Celery、RQ),它適合「簡單、立即」的背景工作;若需要分散式、重試機制或長時間排程,仍建議使用外部任務佇列。
2. BackgroundTasks 的工作原理
- 依賴注入:在路由函式的參數中加入
background_tasks: BackgroundTasks,FastAPI 會自動建立並注入物件。 - 註冊任務:呼叫
background_tasks.add_task(func, *args, **kwargs),把要延遲執行的函式與參數加進佇列。 - 回傳 Response:FastAPI 先把 HTTP 回應送給客戶端,之後 會在同一個事件迴圈(若使用
uvicorn的 async workers)或工作執行緒中依序呼叫已註冊的函式。
注意:背景任務的執行環境與原始請求相同(同一個 Python 進程),因此 不應該在背景任務裡做會阻塞整個事件迴圈的同步 I/O,除非你已將 FastAPI 設定為使用多執行緒/多進程的 worker。
3. 基本使用步驟
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
"""將訊息寫入本機檔案,模擬 I/O 操作"""
with open("log.txt", "a", encoding="utf-8") as f:
f.write(message + "\n")
@app.post("/items/")
async def create_item(name: str, background_tasks: BackgroundTasks):
# 主要業務邏輯:建立資料庫紀錄(此處省略)
# ...
# 註冊背景任務
background_tasks.add_task(write_log, f"Item created: {name}")
return {"msg": "Item created, log will be written in background"}
write_log為同步函式;FastAPI 會在回傳結果後於 執行緒池 中執行它。- 若
write_log為 非同步(async def),則會直接在事件迴圈內執行,無需額外執行緒。
程式碼範例
以下提供 五個實用範例,示範不同情境下的 BackgroundTasks 用法,並附上說明註解。
範例 1️⃣:寄送驗證 Email(非同步 I/O)
# file: main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
import httpx # 非同步 HTTP 客戶端
app = FastAPI()
async def send_verification_email(email: str, token: str):
"""
呼叫外部 Email 服務(假設提供 /send API),
使用 httpx 的非同步請求以免阻塞事件迴圈。
"""
async with httpx.AsyncClient() as client:
payload = {"to": email, "subject": "驗證信", "body": f"Your token: {token}"}
resp = await client.post("https://email-service.local/send", json=payload, timeout=10.0)
resp.raise_for_status() # 若失敗會拋出例外
@app.post("/register/")
async def register_user(email: str, background_tasks: BackgroundTasks):
# 產生驗證 token(簡化示意)
token = "123456"
# 直接回傳註冊成功訊息
background_tasks.add_task(send_verification_email, email, token)
return {"msg": "註冊成功,驗證信已發送(背景)"}
說明:
- 使用
httpx.AsyncClient讓 Email 請求保持非同步。- 若 Email 服務回傳錯誤,例外會在背景任務內被拋出,預設不會影響已回傳的 HTTP 回應。
範例 2️⃣:產生 CSV 報表(同步 CPU 密集)
# file: report.py
import csv
import time
from pathlib import Path
def generate_csv_report(user_id: int):
"""
模擬耗時的報表產生(CPU 密集),寫入 tmp 目錄。
"""
filename = Path("/tmp") / f"report_user_{user_id}.csv"
# 假設需要 3 秒才能完成
time.sleep(3)
with open(filename, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["項目", "數量"])
writer.writerow(["A", 10])
writer.writerow(["B", 20])
return str(filename)
# main.py
from fastapi import FastAPI, BackgroundTasks
from report import generate_csv_report
app = FastAPI()
@app.post("/report/{user_id}")
async def request_report(user_id: int, background_tasks: BackgroundTasks):
# 立刻回傳任務已接受,實際報表稍後產生
background_tasks.add_task(generate_csv_report, user_id)
return {"msg": f"報表產生中,稍後可於 /download/{user_id} 取得"}
說明:
generate_csv_report為同步且 CPU 密集,FastAPI 會把它交給 執行緒池,避免阻塞主事件迴圈。- 若報表產生失敗,可在函式內自行寫入失敗日誌或使用
try/except捕捉例外。
範例 3️⃣:清除過期快取(批次 I/O)
# file: cache_cleanup.py
import os
import time
CACHE_DIR = "/tmp/cache"
def clean_expired_files(max_age_seconds: int = 86400):
"""
刪除快取目錄中超過 max_age_seconds 的檔案。
"""
now = time.time()
for filename in os.listdir(CACHE_DIR):
path = os.path.join(CACHE_DIR, filename)
if os.path.isfile(path):
age = now - os.path.getmtime(path)
if age > max_age_seconds:
os.remove(path)
print(f"Removed expired cache: {path}")
# main.py
from fastapi import FastAPI, BackgroundTasks
from cache_cleanup import clean_expired_files
app = FastAPI()
@app.post("/cache/cleanup")
async def trigger_cleanup(background_tasks: BackgroundTasks):
# 手動觸發一次快取清理
background_tasks.add_task(clean_expired_files, max_age_seconds=3600)
return {"msg": "快取清理已排入背景任務"}
說明:
- 此任務通常在 低流量時間 或 管理介面 觸發。
- 若快取目錄非常大,建議改用
asyncio.to_thread(Python 3.9+)或外部排程工具。
範例 4️⃣:結合 asyncio.create_task 進階使用
import asyncio
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
async def async_heavy_job(data: str):
await asyncio.sleep(2) # 模擬非阻塞 I/O
print(f"Job done with: {data}")
@app.post("/async-job")
async def launch_job(data: str, background_tasks: BackgroundTasks):
# 直接使用 asyncio.create_task 讓任務在同一事件迴圈內平行執行
background_tasks.add_task(asyncio.create_task, async_heavy_job(data))
return {"msg": "非同步工作已在背景執行"}
說明:
BackgroundTasks.add_task接受任何 callable;我們把asyncio.create_task當作外層函式,將真正的 async 任務排入事件迴圈。- 這種寫法在需要 同時啟動多個 async 任務 時特別方便。
範例 5️⃣:使用 Depends 注入自訂服務(DI)
from fastapi import FastAPI, BackgroundTasks, Depends
class NotificationService:
async def push(self, user_id: int, message: str):
# 假設呼叫第三方推播平台
await asyncio.sleep(0.5)
print(f"Pushed to {user_id}: {message}")
def get_notifier() -> NotificationService:
# 這裡可以放置連線池、設定檔等
return NotificationService()
app = FastAPI()
@app.post("/notify/{user_id}")
async def send_notification(
user_id: int,
message: str,
background_tasks: BackgroundTasks,
notifier: NotificationService = Depends(get_notifier),
):
background_tasks.add_task(notifier.push, user_id, message)
return {"msg": "通知已排入背景任務"}
說明:
- 透過 FastAPI 的 依賴注入(Depends)取得自訂服務實例,讓背景任務能直接使用已建立好的物件(例如已連線的 Redis、資料庫或外部 API 客戶端)。
- 這樣的寫法保持程式碼的可測試性與可維護性。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解決方案 / 最佳實踐 |
|---|---|---|
在背景任務裡使用阻塞的同步 I/O(如 requests、time.sleep) |
會阻塞執行緒池,若同時有大量請求會耗盡執行緒,導致服務斷流 | 改用非同步客戶端(httpx.AsyncClient)或使用 asyncio.to_thread 把同步 I/O 包裝成執行緒任務 |
| 背景任務拋出未捕捉的例外 | 例外會被記錄在伺服器日誌,但不會回傳給客戶端,容易忽略失敗 | 在背景函式內使用 try/except,並自行寫入失敗日誌、或把錯誤資訊寫入資料庫以供後續查詢 |
| 任務需長時間執行(超過幾分鐘) | 背景任務仍會在同一個進程中執行,若進程因為其他原因重啟會遺失任務 | 對於長時間、需要重試或分散式的工作,考慮使用 Celery、RQ、Dramatiq 等外部任務佇列 |
| 共享全域變數 | 多個背景任務同時寫入/讀取同一變數會產生競爭條件 | 使用 thread‑safe 結構(如 queue.Queue)或將狀態持久化到資料庫/Redis |
忘記在 uvicorn 啟動指令加上 --workers |
單一 worker 內部的背景任務若卡住,會影響所有請求 | 若應用需要更高併發,使用多 worker(uvicorn main:app --workers 4)或在 Docker/K8s 中水平擴展 |
其他最佳實踐
- 保持背景任務的執行時間盡可能短:若任務需要較長時間,考慮拆成多個小步驟或改用排程系統。
- 統一日誌格式:在背景任務內部使用與主程式相同的 logger,方便追蹤與排錯。
- 避免在背景任務裡直接存取
request物件:request只在主請求的生命週期內有效,背景任務執行時已經脫離該上下文。若需要資訊,請在加入任務時把所需資料作為參數傳入。 - 測試時使用
TestClient:FastAPI 的測試客戶端會同步等待背景任務完成,這有助於在單元測試中驗證任務行為。
實際應用場景
| 場景 | 為什麼適合使用 BackgroundTasks | 範例程式碼概念 |
|---|---|---|
| 使用者註冊後寄送驗證郵件 | 需要即時回應,郵件服務可能較慢 | background_tasks.add_task(send_email, ...) |
| 上傳檔案後產生縮圖 | 圖片處理耗時,且不需要立即回傳縮圖 URL | background_tasks.add_task(create_thumbnail, file_path) |
| 每日資料匯出 (CSV / Excel) | 使用者點擊「匯出」後即回傳任務 ID,稍後下載 | background_tasks.add_task(export_report, user_id) |
| API 呼叫外部付款平台 | 付款成功後再發送「付款成功」通知,避免阻塞付款流程 | background_tasks.add_task(notify_payment_success, order_id) |
| 定期清理暫存或過期資料 | 透過管理介面或自動觸發,保持資源乾淨 | background_tasks.add_task(clean_expired_files) |
小技巧:若想讓使用者能查詢背景任務的執行狀態,可在任務開始前在資料庫寫入 pending,完成後更新為 finished 或 failed,前端透過輪詢或 WebSocket 取得最新狀態。
總結
FastAPI 的 BackgroundTasks 為 簡易且高效 的背景工作機制,讓開發者只需要幾行程式碼就能把 耗時或非即時 的操作從主要請求流程中抽離。
透過本篇的概念說明與五個實作範例,你應該已掌握:
- 如何在路由函式中注入
BackgroundTasks - 何時使用 同步、非同步、或
asyncio.create_task的寫法 - 常見的陷阱(阻塞 I/O、未捕捉例外、長時間任務)以及對應的最佳實踐
- 真實的業務場景(Email、報表、快取清理、推播等)如何落地
在實務開發中,先把背景任務做得簡單、可觀測,再根據需求演進到更完整的任務佇列系統,這樣既能快速交付功能,又不會在未來的擴充上卡住。祝你在 FastAPI 專案中玩得開心,寫出高效、可維護的 API!