本文 AI 產出,尚未審核

FastAPI 背景任務(Background Tasks)── 多任務同時執行


簡介

在 Web 應用程式中,長時間執行的工作(例如寄送電子郵件、產生報表、圖片壓縮)如果直接在 API 的請求路徑裡執行,會讓使用者必須等待數秒甚至更久才得到回應,嚴重影響使用體驗。FastAPI 提供的 BackgroundTask 機制讓開發者可以把這類工作「脫離」主請求流程,於回傳結果後於同一個工作執行緒(或協程)中繼續執行。

本篇文章聚焦在 同時執行多個背景任務 的技巧與實務應用。你將學會:

  1. 如何在單一次 API 呼叫中排入多個 BackgroundTask
  2. 何時應該使用 BackgroundTasksBackgroundTask、或是更高階的佇列(如 Celery)。
  3. 常見的陷阱、效能考量與最佳實踐。

即使你是剛接觸 FastAPI 的新手,只要跟著範例一步一步走,也能快速在自己的專案中導入安全、可維護的多任務背景處理。


核心概念

1. BackgroundTasks 與 BackgroundTask 的差別

類別 位置 用途
BackgroundTask 單一任務的封裝(fastapi.BackgroundTask 只接受 一個 callable,常用於簡單情境。
BackgroundTasks 請求層級的容器(fastapi.BackgroundTasks 可以 累積多個 BackgroundTask,在回應送出後一次性執行全部。

重點:若你只需要一次排入單一工作,直接使用 BackgroundTask 即可;若同一個請求要觸發 多個 後續工作,就必須使用 BackgroundTasks

2. 為什麼要同時排入多個任務?

  • 降低回應延遲:一次請求可以同時觸發寄送驗證信、寫入審計日誌、以及產生縮圖,所有工作會在回應之後平行執行,使用者不必等到最後一個工作完成才得到結果。
  • 提升資源利用率:FastAPI 基於 Starlette,底層使用 asyncio,背景任務會在同一個事件迴圈內執行,與其他非阻塞 I/O 任務共享執行緒,避免額外的執行緒或進程開銷。
  • 維護性:把不同職責的工作分離成獨立函式,程式碼更易測試與重構。

3. 基本使用方式

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    # 假設這是一個會寫入檔案或資料庫的阻塞函式
    with open("log.txt", "a") as f:
        f.write(message + "\n")

@app.post("/items/")
async def create_item(name: str, background_tasks: BackgroundTasks):
    # 先回傳成功訊息
    background_tasks.add_task(write_log, f"Item created: {name}")
    return {"message": f"Item {name} is being processed"}

上述程式碼只排入 一個 任務;接下來的範例會示範如何一次排入 多個


程式碼範例

範例 1️⃣ 同時寄送兩封不同內容的 Email

from fastapi import FastAPI, BackgroundTasks
import smtplib
from email.mime.text import MIMEText

app = FastAPI()

def send_email(to_addr: str, subject: str, body: str):
    """簡易的 SMTP 寄信函式(同步阻塞)"""
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = subject
    msg["From"] = "no-reply@example.com"
    msg["To"] = to_addr

    # 這裡使用本機的測試 SMTP 伺服器
    with smtplib.SMTP("localhost", 1025) as server:
        server.send_message(msg)

@app.post("/register/")
async def register_user(email: str, background_tasks: BackgroundTasks):
    # 1. 寄送驗證信
    background_tasks.add_task(
        send_email,
        to_addr=email,
        subject="驗證您的帳號",
        body="點擊以下連結完成驗證..."
    )
    # 2. 寄送歡迎信
    background_tasks.add_task(
        send_email,
        to_addr=email,
        subject="歡迎加入我們!",
        body="感謝註冊,我們期待與您一起成長。"
    )
    return {"msg": "註冊成功,驗證信與歡迎信已送出"}

說明:兩個 add_task 呼叫分別排入不同的 send_email 任務,FastAPI 會在回應結束後依序執行它們。若 send_email 本身是同步阻塞,仍會在同一個執行緒內完成;若要更高效,可改寫成 async 版本或交給 Celery。


範例 2️⃣ 同時生成縮圖與上傳至雲端儲存

from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from PIL import Image
import io, boto3

app = FastAPI()
s3_client = boto3.client("s3", region_name="ap-northeast-1")

def generate_thumbnail(image_bytes: bytes, size: tuple = (128, 128)):
    img = Image.open(io.BytesIO(image_bytes))
    img.thumbnail(size)
    buf = io.BytesIO()
    img.save(buf, format="JPEG")
    return buf.getvalue()

def upload_to_s3(key: str, data: bytes):
    s3_client.put_object(Bucket="my-bucket", Key=key, Body=data)

@app.post("/upload/")
async def upload_image(file: UploadFile = File(...), background_tasks: BackgroundTasks):
    raw_bytes = await file.read()

    # 1. 產生縮圖
    background_tasks.add_task(
        lambda: upload_to_s3(
            key=f"thumbnails/{file.filename}",
            data=generate_thumbnail(raw_bytes)
        )
    )
    # 2. 原始檔上傳
    background_tasks.add_task(
        upload_to_s3,
        key=f"original/{file.filename}",
        data=raw_bytes,
    )
    return {"msg": "檔案已收到,正同步處理中"}

說明

  • generate_thumbnail 為同步 CPU 密集型函式,使用 lambda 包裝是為了先把 raw_bytes 捕獲,再在背景任務中呼叫。
  • 兩個 add_task 分別上傳縮圖與原始檔,兩者會同時進行(實際上仍是順序執行,但因為 I/O 操作多為非阻塞,效能差異不大)。若需要真正的平行處理,可改用 ThreadPoolExecutor 或外部佇列。

範例 3️⃣ 同時寫入審計日誌、觸發 Webhook、清除快取

from fastapi import FastAPI, BackgroundTasks
import httpx, redis, json, datetime

app = FastAPI()
r = redis.Redis(host="localhost", port=6379, db=0)

def audit_log(user_id: int, action: str):
    entry = {
        "user_id": user_id,
        "action": action,
        "timestamp": datetime.datetime.utcnow().isoformat()
    }
    r.lpush("audit_log", json.dumps(entry))

def trigger_webhook(url: str, payload: dict):
    with httpx.Client() as client:
        client.post(url, json=payload, timeout=5)

def clear_cache(key_pattern: str):
    for key in r.scan_iter(match=key_pattern):
        r.delete(key)

@app.post("/order/{order_id}")
async def place_order(order_id: int, user_id: int, background_tasks: BackgroundTasks):
    # (此處省略實際下單邏輯)

    # 多任務排入
    background_tasks.add_task(audit_log, user_id, f"order_created:{order_id}")
    background_tasks.add_task(
        trigger_webhook,
        url="https://hooks.example.com/order",
        payload={"order_id": order_id, "user_id": user_id}
    )
    background_tasks.add_task(clear_cache, "order:list:*")
    return {"msg": f"Order {order_id} 已建立"}

說明

  • audit_logtrigger_webhookclear_cache 三個任務分屬不同領域(資料庫、外部服務、快取),同時排入可確保「主流程」不被任何其中一個阻塞。
  • 若其中一個任務拋出例外,FastAPI 仍會嘗試執行剩餘任務;但例外訊息只會在日誌中顯示,不會回傳給使用者

範例 4️⃣ 使用 async 背景任務(FastAPI 0.78+ 支援)

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def async_task(name: str, seconds: int):
    await asyncio.sleep(seconds)
    print(f"[{name}] 完成於 {seconds}s 後")

@app.get("/parallel")
async def run_parallel(background_tasks: BackgroundTasks):
    # 同時排入兩個 async 任務
    background_tasks.add_task(async_task, "Task A", 3)
    background_tasks.add_task(async_task, "Task B", 5)
    return {"msg": "兩個非同步任務已排入,請稍後觀察日誌"}

說明:當背景任務本身是 async def 時,FastAPI 會直接以 await 方式執行它們。雖然仍是順序執行(先 A 後 B),但因為兩者都是非阻塞的 await asyncio.sleep,實際上會在同一個事件迴圈內同時進行,等效於「平行」執行。


範例 5️⃣ 結合 ThreadPoolExecutor 處理 CPU 密集型工作

from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ThreadPoolExecutor
import hashlib, time

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)

def heavy_hash(data: bytes):
    # 模擬 CPU 密集型運算(如大量加密雜湊)
    time.sleep(2)   # 假裝計算時間
    return hashlib.sha256(data).hexdigest()

def schedule_heavy_hash(data: bytes, background_tasks: BackgroundTasks):
    # 使用 executor 讓任務跑在獨立執行緒中
    future = executor.submit(heavy_hash, data)
    # 若需要在背景任務完成後做後續處理,可在此處加入回呼
    # future.add_done_callback(lambda f: print(f.result()))

@app.post("/hash/")
async def upload_and_hash(file: bytes, background_tasks: BackgroundTasks):
    schedule_heavy_hash(file, background_tasks)
    return {"msg": "檔案已接收,雜湊計算中"}

說明

  • BackgroundTasks 本身是在 同一個 事件迴圈內執行,對於 CPU 密集型工作會阻塞其他 I/O 任務。此時可透過 ThreadPoolExecutor 把工作交給工作執行緒池,避免卡住事件迴圈。
  • max_workers 應根據實際 CPU 核心數與工作負載調整。

常見陷阱與最佳實踐

陷阱 可能的後果 解決方案
在背景任務中直接拋出例外 任務中斷,其他任務仍執行,但錯誤只會寫入日誌,難以追蹤。 使用 try/except 包裹每個任務,並將錯誤寫入結構化日誌(如 Sentry)。
使用阻塞 I/O(例如 requests 會阻塞整個事件迴圈,導致其他請求卡住。 改用非阻塞庫(httpx.AsyncClient)或將阻塞呼叫包在 ThreadPoolExecutor
背景任務依賴於請求範圍的資源(如 DB 連線) 請求結束後資源可能被關閉,導致任務失敗。 在背景任務內自行建立/關閉 DB 連線,或使用全域的連線池。
過度使用 BackgroundTasks 而忽略佇列系統 伺服器重啟時未完成的任務會遺失,且無法保證任務執行次序/重試。 當任務重要且需要可靠性時,改用 Celery、RQ、或 Kafka 等外部佇列。
忘記限制同時執行的任務數量 大量同時排入的任務會耗盡 CPU/記憶體,導致服務崩潰。 使用 SemaphoreThreadPoolExecutor 或佇列系統的併發限制。

最佳實踐

  1. 任務拆分:每個背景任務只做一件事,保持函式短小、易測。
  2. 非阻塞優先:盡可能使用 async 函式與非阻塞 I/O,減少事件迴圈阻塞。
  3. 錯誤捕捉與上報:在每個任務內部捕捉例外,並將錯誤寫入結構化日誌或外部監控系統。
  4. 資源管理:不要在背景任務中直接使用請求範圍的依賴(如 Depends 產生的 DB session),應自行取得或使用全域池。
  5. 測試:使用 TestClient 並搭配 pytestcapsyscaplog 來驗證背景任務是否被正確排入與執行。
  6. 監控:在生產環境部署時,觀察事件迴圈的延遲(如 uvicorn --log-level debug)以及背景任務的執行時間,必要時加入統計指標(Prometheus)。

實際應用場景

場景 為什麼適合使用多任務背景執行
使用者註冊 同時寄送驗證信、寫入審計日誌、建立預設設定檔。
上傳圖片 產生多尺寸縮圖、上傳原圖至 CDN、更新資料庫的圖片 URL。
電商下單 寫入訂單資料、觸發第三方支付 Webhook、清除商品快取、發送訂單確認信。
報表產生 產生 CSV、PDF兩種格式、上傳至雲端儲存、通知使用者完成。
批次同步 從外部 API 取得資料、寫入本地資料庫、發送同步完成的 Slack 訊息。

範例:在電商平台的「下單」API 中,我們可以同時排入四個背景任務:audit_logtrigger_webhookclear_cachesend_email。主流程只負責寫入訂單資料庫,回傳成功訊息後,四個任務會在背景中依序或平行完成,確保使用者感受到即時回應,同時系統仍能完成所有後續處理。


總結

  • FastAPI 的 BackgroundTasks 讓我們可以在同一次請求中排入 多個 任務,避免長時間阻塞使用者的回應。
  • 使用 同步非阻塞 async、或 ThreadPoolExecutor 三種方式,依工作性質(I/O、CPU)選擇最適合的實作。
  • 陷阱 包括阻塞 I/O、資源管理不當、以及缺乏錯誤上報;對策是採用非阻塞庫、在任務內自行管理資源、以及完善的日誌與監控。
  • 當任務 重要性可靠性併發量 超過單機背景任務能承受的範圍時,應考慮外部佇列(Celery、RQ 等)作為補強。

透過本篇的概念與範例,你已具備在 FastAPI 專案中 安全、有效率地同時執行多個背景任務 的能力。從現在開始,將繁重的後續工作交給背景任務處理,讓 API 回應更快、使用者體驗更佳,同時保持程式碼的可讀性與可維護性。祝開發順利! 🚀