FastAPI 背景任務(Background Tasks) – 任務錯誤處理
簡介
在 Web API 中,常會遇到需要 非同步 執行的工作,例如寄送通知信件、產生報表、寫入大量日誌等。FastAPI 提供的 BackgroundTask 讓開發者可以在回應已送出給用戶端後,於同一個工作執行緒中繼續執行這些「背景」工作,既避免了阻塞請求,又不必額外架設 Celery、RQ 等分散式佇列系統。
然而,背景任務本身 不會直接回傳錯誤給呼叫端,如果發生例外,若未妥善處理,可能會導致任務失敗、資源遺漏,甚至讓整個服務的穩定性受影響。本文將深入說明 如何在 FastAPI 背景任務中捕捉與處理錯誤,並提供實作範例、常見陷阱與最佳實踐,協助你在正式環境中安全、可靠地使用 BackgroundTask。
核心概念
1. BackgroundTask 的執行模型
BackgroundTask會在 同一個 ASGI 事件迴圈 中、回應已送出後執行。- 它 不是多執行緒或多進程,因此不會自動提供隔離機制。
- 任何未捕捉的例外會被 FastAPI 捕捉,並 寫入日誌,但不會影響已回傳的 HTTP 回應。
重點:背景任務的錯誤必須在任務內部自行捕捉與處理,否則只能靠日誌觀測,無法回饋給使用者。
2. 為什麼要在背景任務裡自行處理錯誤
- 資源釋放:例如資料庫連線、檔案句柄等若在例外發生時未正確關閉,會造成資源泄漏。
- 重試機制:某些任務(寄送郵件)失敗後可能需要 重試 或 備援。
- 錯誤通知:開發者或維運人員需要即時得知失敗原因,才能快速排除問題。
3. 常見的錯誤類型
| 類型 | 典型例子 | 處理方式 |
|---|---|---|
| 外部服務失效 | 呼叫第三方 API、SMTP 伺服器逾時 | 重試、退回至佇列、或發送警報 |
| 資料驗證錯誤 | 傳入不合法的資料造成例外 | 事先驗證、或捕捉後寫入錯誤日誌 |
| 系統資源問題 | 磁碟寫入失敗、記憶體不足 | 使用 try/finally 確保釋放資源 |
| 程式碼錯誤 | 未處理的 KeyError、AttributeError |
加入全域例外捕捉,或使用裝飾器統一處理 |
程式碼範例
以下示範 5 個實用範例,涵蓋 基本錯誤捕捉、重試機制、日誌與通知、資源釋放、以及全域裝飾器。每段程式碼均附上說明,請依實際需求挑選或組合使用。
範例 1️⃣ 基本的 try/except 捕捉
from fastapi import FastAPI, BackgroundTasks
import logging
app = FastAPI()
logger = logging.getLogger("uvicorn.error")
def send_welcome_email(email: str):
try:
# 假設這裡是寄送郵件的程式
# 若發生連線錯誤會拋出 Exception
smtp_send(email, subject="Welcome", body="Thanks for joining!")
except Exception as exc:
# 捕捉例外並寫入日誌
logger.error(f"寄送歡迎信件失敗 (to={email}): {exc}")
# 依需求可加入重試或其他補救措施
@app.post("/register")
async def register_user(email: str, background_tasks: BackgroundTasks):
# 這裡執行註冊邏輯(寫入 DB 等)
# ...
background_tasks.add_task(send_welcome_email, email)
return {"msg": "註冊成功,歡迎信件稍後寄出"}
說明:最簡單的做法是把可能拋出的例外包在
try/except,確保不會讓背景任務直接崩潰,並把錯誤寫入日誌供日後查詢。
範例 2️⃣ 具備 重試機制 的背景任務
import time
from typing import Callable
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def retry_on_exception(
func: Callable,
max_retries: int = 3,
delay: float = 2.0,
*args,
**kwargs,
):
"""簡易的重試裝飾器,適用於背景任務"""
for attempt in range(1, max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as exc:
if attempt == max_retries:
raise # 最後一次仍失敗,拋出例外讓外層捕捉
time.sleep(delay) # 暫停後再試
# 可在此加入日誌或告警
print(f"第 {attempt} 次嘗試失敗: {exc},稍後重試...")
def generate_report(user_id: int):
# 假設此函式會呼叫外部報表服務
retry_on_exception(_call_report_service, user_id=user_id)
def _call_report_service(user_id: int):
# 這裡故意拋出例外模擬服務不穩
raise ConnectionError("報表服務暫時無法連線")
@app.post("/report/{user_id}")
async def request_report(user_id: int, background_tasks: BackgroundTasks):
background_tasks.add_task(generate_report, user_id)
return {"msg": "報表產生已排入背景任務"}
說明:利用自訂的
retry_on_exception函式,將可能失敗的呼叫包裝起來。若所有重試皆失敗,最外層仍會捕捉例外,讓你可以在generate_report中加入最後的錯誤處理(例如寫入失敗日誌或發送 Slack 通知)。
範例 3️⃣ 使用 Context Manager 確保資源釋放
from fastapi import FastAPI, BackgroundTasks
import sqlite3
import logging
app = FastAPI()
logger = logging.getLogger("uvicorn.error")
class DBConnection:
"""簡易的 SQLite 連線管理器"""
def __init__(self, db_path: str = "data.db"):
self.db_path = db_path
self.conn = None
def __enter__(self):
self.conn = sqlite3.connect(self.db_path)
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
self.conn.close()
if exc_type:
logger.error(f"DB 操作失敗: {exc_val}")
def export_user_data(user_id: int):
try:
with DBConnection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id=?", (user_id,))
data = cursor.fetchone()
# 假設接下來把 data 寫入 CSV
# ...
except Exception as exc:
logger.error(f"匯出使用者資料失敗 (id={user_id}): {exc}")
@app.post("/export/{user_id}")
async def export(user_id: int, background_tasks: BackgroundTasks):
background_tasks.add_task(export_user_data, user_id)
return {"msg": "資料匯出已排入背景任務"}
說明:使用
with語法自動關閉資料庫連線,即使發生例外也能確保 資源不會遺漏。在__exit__中寫入錯誤日誌,讓維運人員能即時掌握問題。
範例 4️⃣ 統一的 錯誤裝飾器(全域)
import traceback
from functools import wraps
from fastapi import FastAPI, BackgroundTasks
import logging
app = FastAPI()
logger = logging.getLogger("uvicorn.error")
def background_error_handler(func):
"""裝飾器:捕捉背景任務所有例外,統一記錄與通知"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as exc:
# 完整的 traceback 方便除錯
tb = traceback.format_exc()
logger.error(f"背景任務 {func.__name__} 發生錯誤: {exc}\n{tb}")
# 這裡可以加入 Slack / Email 通知
# notify_admin(f"Task {func.__name__} failed: {exc}")
return wrapper
@background_error_handler
def clean_temp_folder(path: str):
# 假設刪除檔案時可能因權限問題拋出例外
import shutil
shutil.rmtree(path)
@app.post("/cleanup")
async def cleanup(path: str, background_tasks: BackgroundTasks):
background_tasks.add_task(clean_temp_folder, path)
return {"msg": "清理工作已排入背景任務"}
說明:把錯誤捕捉與日誌寫入抽離成裝飾器,讓每個背景任務的程式碼保持 乾淨,同時確保所有任務都有一致的錯誤處理流程。若要加入 通知機制(如 Slack、Email),只需要在裝飾器內實作一次即可。
範例 5️⃣ 結合 FastAPI 的事件系統(啟動/關閉)以收集失敗任務
from fastapi import FastAPI, BackgroundTasks
from collections import defaultdict
import logging
app = FastAPI()
logger = logging.getLogger("uvicorn.error")
# 用於暫存失敗任務的資料結構(簡易示範)
failed_tasks = defaultdict(list)
def record_failure(task_name: str, exc: Exception):
failed_tasks[task_name].append(str(exc))
logger.error(f"[FAIL] {task_name}: {exc}")
def task_with_record(task_name: str):
"""產生帶有失敗紀錄的背景任務函式"""
def decorator(func):
async def wrapper(*args, **kwargs):
try:
await func(*args, **kwargs)
except Exception as exc:
record_failure(task_name, exc)
return wrapper
return decorator
@task_with_record("send_sms")
async def send_sms(phone: str, message: str):
# 假設此處呼叫外部 SMS 服務
raise RuntimeError("SMS 服務回應逾時")
@app.post("/sms")
async def sms(phone: str, message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(send_sms, phone, message)
return {"msg": "簡訊已排入背景任務"}
@app.on_event("shutdown")
def show_failed_tasks():
if failed_tasks:
logger.info("服務關閉前,統計失敗的背景任務:")
for name, msgs in failed_tasks.items():
logger.info(f"- {name}: {len(msgs)} 次失敗")
for m in msgs:
logger.info(f" • {m}")
說明:利用 FastAPI 的
on_event("shutdown")事件,在服務關閉前彙總所有失敗任務。這對 批次檢查、監控 或 自動重試 都很有幫助。實務上可把failed_tasks換成 Redis、資料庫或外部監控系統。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 推薦的做法 |
|---|---|---|
| 未捕捉例外 | 任務直接崩潰,錯誤只出現在日誌,無法自動重試 | 使用 try/except、裝飾器或全域錯誤處理器 |
| 在背景任務中直接回傳 HTTPException | BackgroundTask 不會把例外傳回客戶端,且會產生不必要的日誌噪音 |
只在請求階段拋出例外,背景任務內僅記錄或通知 |
使用阻塞 I/O(如同步的 requests) |
會阻塞整個事件迴圈,影響其他請求 | 改用 httpx.AsyncClient 或把阻塞程式包在 run_in_threadpool |
| 資源未正確釋放(DB、檔案、網路連線) | 可能導致連線耗盡、檔案鎖死 | 使用 with、try/finally 或依賴注入的 Depends 釋放資源 |
| 背景任務過多導致記憶體泄漏 | 服務長時間運行後變慢甚至崩潰 | 設計上限(如每秒最多 X 個背景任務),或改用專門的任務佇列(Celery、RQ) |
| 重試機制缺乏退避(Backoff) | 短時間內大量重試會加重外部服務壓力 | 使用指數退避(exponential backoff)或 jitter,避免「驟雨」效應 |
最佳實踐清單
- 統一錯誤處理:使用裝飾器或基礎函式把例外捕捉、日誌、通知流程集中管理。
- 非阻塞 I/O:盡量使用
async函式與await,若必須使用同步庫,將其包在run_in_threadpool。 - 資源管理:使用 context manager 或依賴注入 (
Depends) 確保 DB、檔案、網路連線在任務結束時被關閉。 - 重試與退避:對外部服務的呼叫加入 重試次數限制、退避時間,必要時記錄失敗次數以供後續分析。
- 日誌與監控:將背景任務的錯誤寫入結構化日誌,並結合 Prometheus、Grafana 或 ELK 監控服務的失敗率。
- 測試:利用
TestClient與anyio的sleep模擬背景任務,確保錯誤處理邏輯在單元測試中被覆蓋。
實際應用場景
| 場景 | 為何需要背景任務的錯誤處理 | 可能的實作方式 |
|---|---|---|
| 使用者註冊後寄送驗證信 | 郵件服務可能暫時不可用,若失敗需重新嘗試或通知管理員 | try/except + 重試裝飾器,失敗時寫入 DB 失敗表,定時任務重送 |
| 大量影像處理(縮圖、加水印) | 圖片處理可能因磁碟空間不足拋出例外,需要清理或暫停排程 | with 管理檔案資源、捕捉 OSError,失敗時將任務移至「待處理」佇列 |
| 匯出報表給企業客戶 | 報表產生涉及多個外部 API,任何一個失敗都會導致整份報表不完整 | 使用 retry_on_exception 包裝每個 API 呼叫,最終失敗則寫入失敗通知郵件 |
| 推送即時通知(WebSocket、FCM) | 推送服務不穩定,若失敗需保留訊息以便稍後重送 | 把訊息寫入 Redis 队列,背景任務嘗試推送,失敗則保留在隊列中,定時重試 |
| 資料備份至雲端儲存 | 雲端 API 限流或網路斷線,備份失敗會導致資料遺失風險 | 捕捉 ClientError,使用指數退避 + 失敗記錄,必要時觸發警報系統(PagerDuty) |
總結
- 背景任務 為 FastAPI 提供了輕量級的非同步執行方式,但 錯誤不會自動回傳,開發者必須自行捕捉、記錄與處理。
- 透過
try/except、重試機制、資源管理、統一裝飾器,可以將錯誤處理寫得既安全又可維護。 - 在實務上,將失敗資訊 寫入日誌、資料庫或監控系統,並結合 通知渠道(Slack、Email)能讓團隊在問題發生時快速反應。
- 為避免 資源泄漏與服務阻塞,務必使用非阻塞 I/O、適當的 上下文管理,以及 限制背景任務的併發量。
掌握以上技巧,你就能在 FastAPI 專案中安全地使用 BackgroundTask,即使在面對外部服務不穩、資源限制或程式錯誤時,也能保持服務的高可用與可觀測性。祝開發順利,快把這些實務技巧帶到你的下一個 FastAPI 專案吧!