FastAPI 背景任務(Background Tasks)── 多任務同時執行
簡介
在 Web 應用程式中,長時間執行的工作(例如寄送電子郵件、產生報表、圖片壓縮)如果直接在 API 的請求路徑裡執行,會讓使用者必須等待數秒甚至更久才得到回應,嚴重影響使用體驗。FastAPI 提供的 BackgroundTask 機制讓開發者可以把這類工作「脫離」主請求流程,於回傳結果後於同一個工作執行緒(或協程)中繼續執行。
本篇文章聚焦在 同時執行多個背景任務 的技巧與實務應用。你將學會:
- 如何在單一次 API 呼叫中排入多個
BackgroundTask。 - 何時應該使用
BackgroundTasks、BackgroundTask、或是更高階的佇列(如 Celery)。 - 常見的陷阱、效能考量與最佳實踐。
即使你是剛接觸 FastAPI 的新手,只要跟著範例一步一步走,也能快速在自己的專案中導入安全、可維護的多任務背景處理。
核心概念
1. BackgroundTasks 與 BackgroundTask 的差別
| 類別 | 位置 | 用途 |
|---|---|---|
BackgroundTask |
單一任務的封裝(fastapi.BackgroundTask) |
只接受 一個 callable,常用於簡單情境。 |
BackgroundTasks |
請求層級的容器(fastapi.BackgroundTasks) |
可以 累積多個 BackgroundTask,在回應送出後一次性執行全部。 |
重點:若你只需要一次排入單一工作,直接使用
BackgroundTask即可;若同一個請求要觸發 多個 後續工作,就必須使用BackgroundTasks。
2. 為什麼要同時排入多個任務?
- 降低回應延遲:一次請求可以同時觸發寄送驗證信、寫入審計日誌、以及產生縮圖,所有工作會在回應之後平行執行,使用者不必等到最後一個工作完成才得到結果。
- 提升資源利用率:FastAPI 基於 Starlette,底層使用
asyncio,背景任務會在同一個事件迴圈內執行,與其他非阻塞 I/O 任務共享執行緒,避免額外的執行緒或進程開銷。 - 維護性:把不同職責的工作分離成獨立函式,程式碼更易測試與重構。
3. 基本使用方式
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
# 假設這是一個會寫入檔案或資料庫的阻塞函式
with open("log.txt", "a") as f:
f.write(message + "\n")
@app.post("/items/")
async def create_item(name: str, background_tasks: BackgroundTasks):
# 先回傳成功訊息
background_tasks.add_task(write_log, f"Item created: {name}")
return {"message": f"Item {name} is being processed"}
上述程式碼只排入 一個 任務;接下來的範例會示範如何一次排入 多個。
程式碼範例
範例 1️⃣ 同時寄送兩封不同內容的 Email
from fastapi import FastAPI, BackgroundTasks
import smtplib
from email.mime.text import MIMEText
app = FastAPI()
def send_email(to_addr: str, subject: str, body: str):
"""簡易的 SMTP 寄信函式(同步阻塞)"""
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = "no-reply@example.com"
msg["To"] = to_addr
# 這裡使用本機的測試 SMTP 伺服器
with smtplib.SMTP("localhost", 1025) as server:
server.send_message(msg)
@app.post("/register/")
async def register_user(email: str, background_tasks: BackgroundTasks):
# 1. 寄送驗證信
background_tasks.add_task(
send_email,
to_addr=email,
subject="驗證您的帳號",
body="點擊以下連結完成驗證..."
)
# 2. 寄送歡迎信
background_tasks.add_task(
send_email,
to_addr=email,
subject="歡迎加入我們!",
body="感謝註冊,我們期待與您一起成長。"
)
return {"msg": "註冊成功,驗證信與歡迎信已送出"}
說明:兩個
add_task呼叫分別排入不同的send_email任務,FastAPI 會在回應結束後依序執行它們。若send_email本身是同步阻塞,仍會在同一個執行緒內完成;若要更高效,可改寫成async版本或交給 Celery。
範例 2️⃣ 同時生成縮圖與上傳至雲端儲存
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from PIL import Image
import io, boto3
app = FastAPI()
s3_client = boto3.client("s3", region_name="ap-northeast-1")
def generate_thumbnail(image_bytes: bytes, size: tuple = (128, 128)):
img = Image.open(io.BytesIO(image_bytes))
img.thumbnail(size)
buf = io.BytesIO()
img.save(buf, format="JPEG")
return buf.getvalue()
def upload_to_s3(key: str, data: bytes):
s3_client.put_object(Bucket="my-bucket", Key=key, Body=data)
@app.post("/upload/")
async def upload_image(file: UploadFile = File(...), background_tasks: BackgroundTasks):
raw_bytes = await file.read()
# 1. 產生縮圖
background_tasks.add_task(
lambda: upload_to_s3(
key=f"thumbnails/{file.filename}",
data=generate_thumbnail(raw_bytes)
)
)
# 2. 原始檔上傳
background_tasks.add_task(
upload_to_s3,
key=f"original/{file.filename}",
data=raw_bytes,
)
return {"msg": "檔案已收到,正同步處理中"}
說明:
generate_thumbnail為同步 CPU 密集型函式,使用lambda包裝是為了先把raw_bytes捕獲,再在背景任務中呼叫。- 兩個
add_task分別上傳縮圖與原始檔,兩者會同時進行(實際上仍是順序執行,但因為 I/O 操作多為非阻塞,效能差異不大)。若需要真正的平行處理,可改用ThreadPoolExecutor或外部佇列。
範例 3️⃣ 同時寫入審計日誌、觸發 Webhook、清除快取
from fastapi import FastAPI, BackgroundTasks
import httpx, redis, json, datetime
app = FastAPI()
r = redis.Redis(host="localhost", port=6379, db=0)
def audit_log(user_id: int, action: str):
entry = {
"user_id": user_id,
"action": action,
"timestamp": datetime.datetime.utcnow().isoformat()
}
r.lpush("audit_log", json.dumps(entry))
def trigger_webhook(url: str, payload: dict):
with httpx.Client() as client:
client.post(url, json=payload, timeout=5)
def clear_cache(key_pattern: str):
for key in r.scan_iter(match=key_pattern):
r.delete(key)
@app.post("/order/{order_id}")
async def place_order(order_id: int, user_id: int, background_tasks: BackgroundTasks):
# (此處省略實際下單邏輯)
# 多任務排入
background_tasks.add_task(audit_log, user_id, f"order_created:{order_id}")
background_tasks.add_task(
trigger_webhook,
url="https://hooks.example.com/order",
payload={"order_id": order_id, "user_id": user_id}
)
background_tasks.add_task(clear_cache, "order:list:*")
return {"msg": f"Order {order_id} 已建立"}
說明:
audit_log、trigger_webhook、clear_cache三個任務分屬不同領域(資料庫、外部服務、快取),同時排入可確保「主流程」不被任何其中一個阻塞。- 若其中一個任務拋出例外,FastAPI 仍會嘗試執行剩餘任務;但例外訊息只會在日誌中顯示,不會回傳給使用者。
範例 4️⃣ 使用 async 背景任務(FastAPI 0.78+ 支援)
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
async def async_task(name: str, seconds: int):
await asyncio.sleep(seconds)
print(f"[{name}] 完成於 {seconds}s 後")
@app.get("/parallel")
async def run_parallel(background_tasks: BackgroundTasks):
# 同時排入兩個 async 任務
background_tasks.add_task(async_task, "Task A", 3)
background_tasks.add_task(async_task, "Task B", 5)
return {"msg": "兩個非同步任務已排入,請稍後觀察日誌"}
說明:當背景任務本身是
async def時,FastAPI 會直接以await方式執行它們。雖然仍是順序執行(先 A 後 B),但因為兩者都是非阻塞的await asyncio.sleep,實際上會在同一個事件迴圈內同時進行,等效於「平行」執行。
範例 5️⃣ 結合 ThreadPoolExecutor 處理 CPU 密集型工作
from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ThreadPoolExecutor
import hashlib, time
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)
def heavy_hash(data: bytes):
# 模擬 CPU 密集型運算(如大量加密雜湊)
time.sleep(2) # 假裝計算時間
return hashlib.sha256(data).hexdigest()
def schedule_heavy_hash(data: bytes, background_tasks: BackgroundTasks):
# 使用 executor 讓任務跑在獨立執行緒中
future = executor.submit(heavy_hash, data)
# 若需要在背景任務完成後做後續處理,可在此處加入回呼
# future.add_done_callback(lambda f: print(f.result()))
@app.post("/hash/")
async def upload_and_hash(file: bytes, background_tasks: BackgroundTasks):
schedule_heavy_hash(file, background_tasks)
return {"msg": "檔案已接收,雜湊計算中"}
說明:
BackgroundTasks本身是在 同一個 事件迴圈內執行,對於 CPU 密集型工作會阻塞其他 I/O 任務。此時可透過ThreadPoolExecutor把工作交給工作執行緒池,避免卡住事件迴圈。max_workers應根據實際 CPU 核心數與工作負載調整。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解決方案 |
|---|---|---|
| 在背景任務中直接拋出例外 | 任務中斷,其他任務仍執行,但錯誤只會寫入日誌,難以追蹤。 | 使用 try/except 包裹每個任務,並將錯誤寫入結構化日誌(如 Sentry)。 |
使用阻塞 I/O(例如 requests) |
會阻塞整個事件迴圈,導致其他請求卡住。 | 改用非阻塞庫(httpx.AsyncClient)或將阻塞呼叫包在 ThreadPoolExecutor。 |
| 背景任務依賴於請求範圍的資源(如 DB 連線) | 請求結束後資源可能被關閉,導致任務失敗。 | 在背景任務內自行建立/關閉 DB 連線,或使用全域的連線池。 |
| 過度使用 BackgroundTasks 而忽略佇列系統 | 伺服器重啟時未完成的任務會遺失,且無法保證任務執行次序/重試。 | 當任務重要且需要可靠性時,改用 Celery、RQ、或 Kafka 等外部佇列。 |
| 忘記限制同時執行的任務數量 | 大量同時排入的任務會耗盡 CPU/記憶體,導致服務崩潰。 | 使用 Semaphore、ThreadPoolExecutor 或佇列系統的併發限制。 |
最佳實踐
- 任務拆分:每個背景任務只做一件事,保持函式短小、易測。
- 非阻塞優先:盡可能使用
async函式與非阻塞 I/O,減少事件迴圈阻塞。 - 錯誤捕捉與上報:在每個任務內部捕捉例外,並將錯誤寫入結構化日誌或外部監控系統。
- 資源管理:不要在背景任務中直接使用請求範圍的依賴(如
Depends產生的 DB session),應自行取得或使用全域池。 - 測試:使用
TestClient並搭配pytest的capsys或caplog來驗證背景任務是否被正確排入與執行。 - 監控:在生產環境部署時,觀察事件迴圈的延遲(如
uvicorn --log-level debug)以及背景任務的執行時間,必要時加入統計指標(Prometheus)。
實際應用場景
| 場景 | 為什麼適合使用多任務背景執行 |
|---|---|
| 使用者註冊 | 同時寄送驗證信、寫入審計日誌、建立預設設定檔。 |
| 上傳圖片 | 產生多尺寸縮圖、上傳原圖至 CDN、更新資料庫的圖片 URL。 |
| 電商下單 | 寫入訂單資料、觸發第三方支付 Webhook、清除商品快取、發送訂單確認信。 |
| 報表產生 | 產生 CSV、PDF兩種格式、上傳至雲端儲存、通知使用者完成。 |
| 批次同步 | 從外部 API 取得資料、寫入本地資料庫、發送同步完成的 Slack 訊息。 |
範例:在電商平台的「下單」API 中,我們可以同時排入四個背景任務:
audit_log、trigger_webhook、clear_cache、send_email。主流程只負責寫入訂單資料庫,回傳成功訊息後,四個任務會在背景中依序或平行完成,確保使用者感受到即時回應,同時系統仍能完成所有後續處理。
總結
- FastAPI 的
BackgroundTasks讓我們可以在同一次請求中排入 多個 任務,避免長時間阻塞使用者的回應。 - 使用 同步、非阻塞 async、或 ThreadPoolExecutor 三種方式,依工作性質(I/O、CPU)選擇最適合的實作。
- 陷阱 包括阻塞 I/O、資源管理不當、以及缺乏錯誤上報;對策是採用非阻塞庫、在任務內自行管理資源、以及完善的日誌與監控。
- 當任務 重要性、可靠性 或 併發量 超過單機背景任務能承受的範圍時,應考慮外部佇列(Celery、RQ 等)作為補強。
透過本篇的概念與範例,你已具備在 FastAPI 專案中 安全、有效率地同時執行多個背景任務 的能力。從現在開始,將繁重的後續工作交給背景任務處理,讓 API 回應更快、使用者體驗更佳,同時保持程式碼的可讀性與可維護性。祝開發順利! 🚀