本文 AI 產出,尚未審核

FastAPI 背景任務(BackgroundTasks)基礎教學


簡介

在 Web API 開發中,我們常常會遇到「這個請求必須先回傳結果,之後才要執行一些較耗時的工作」的情境。
如果直接在路由函式內部執行長時間的任務,使用者必須等待整個工作完成才會收到回應,會嚴重影響服務的效能與使用者體驗。

FastAPI 為了在 非阻塞 的前提下提供簡易的背景工作機制,內建了 BackgroundTasks 類別。
只要把要延後執行的函式註冊到 BackgroundTasks,FastAPI 會在回傳 HTTP 回應之後,於同一個工作執行緒(或事件迴圈)中自動呼叫它。
這讓我們可以輕鬆地把 寄送 Email、寫入日誌、產生報表、清理暫存檔 等操作,從主要請求流程中抽離出來,保持 API 的即時回應速度。

本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,帶你完整掌握 FastAPI 背景任務的 基本用法,適合剛接觸 FastAPI 的新手以及想在既有專案中加入背景工作流程的中級開發者。


核心概念

1. 為什麼使用 BackgroundTasks?

情境 若不使用背景任務 使用 BackgroundTasks 後
寄送驗證信 請求必須等到 Email 服務回應,可能花 2~5 秒 立即回傳 202,Email 在背景完成
產生大型 PDF 報表 使用者必須等待報表產生完畢才能得到回應 先回傳任務 ID,報表完成後可自行下載
清除過期快取 直接在每個請求裡跑,會降低每次請求的效能 只在特定時機或排程觸發,與主要請求無關

重點:BackgroundTasks 不是完整的任務排程系統(如 Celery、RQ),它適合「簡單、立即」的背景工作;若需要分散式、重試機制或長時間排程,仍建議使用外部任務佇列。

2. BackgroundTasks 的工作原理

  1. 依賴注入:在路由函式的參數中加入 background_tasks: BackgroundTasks,FastAPI 會自動建立並注入物件。
  2. 註冊任務:呼叫 background_tasks.add_task(func, *args, **kwargs),把要延遲執行的函式與參數加進佇列。
  3. 回傳 Response:FastAPI 先把 HTTP 回應送給客戶端,之後 會在同一個事件迴圈(若使用 uvicorn 的 async workers)或工作執行緒中依序呼叫已註冊的函式。

注意:背景任務的執行環境與原始請求相同(同一個 Python 進程),因此 不應該在背景任務裡做會阻塞整個事件迴圈的同步 I/O,除非你已將 FastAPI 設定為使用多執行緒/多進程的 worker。

3. 基本使用步驟

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    """將訊息寫入本機檔案,模擬 I/O 操作"""
    with open("log.txt", "a", encoding="utf-8") as f:
        f.write(message + "\n")

@app.post("/items/")
async def create_item(name: str, background_tasks: BackgroundTasks):
    # 主要業務邏輯:建立資料庫紀錄(此處省略)
    # ...

    # 註冊背景任務
    background_tasks.add_task(write_log, f"Item created: {name}")

    return {"msg": "Item created, log will be written in background"}
  • write_log 為同步函式;FastAPI 會在回傳結果後於 執行緒池 中執行它。
  • write_log非同步async def),則會直接在事件迴圈內執行,無需額外執行緒。

程式碼範例

以下提供 五個實用範例,示範不同情境下的 BackgroundTasks 用法,並附上說明註解。

範例 1️⃣:寄送驗證 Email(非同步 I/O)

# file: main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
import httpx  # 非同步 HTTP 客戶端

app = FastAPI()

async def send_verification_email(email: str, token: str):
    """
    呼叫外部 Email 服務(假設提供 /send API),
    使用 httpx 的非同步請求以免阻塞事件迴圈。
    """
    async with httpx.AsyncClient() as client:
        payload = {"to": email, "subject": "驗證信", "body": f"Your token: {token}"}
        resp = await client.post("https://email-service.local/send", json=payload, timeout=10.0)
        resp.raise_for_status()  # 若失敗會拋出例外

@app.post("/register/")
async def register_user(email: str, background_tasks: BackgroundTasks):
    # 產生驗證 token(簡化示意)
    token = "123456"
    # 直接回傳註冊成功訊息
    background_tasks.add_task(send_verification_email, email, token)
    return {"msg": "註冊成功,驗證信已發送(背景)"}

說明

  • 使用 httpx.AsyncClient 讓 Email 請求保持非同步。
  • 若 Email 服務回傳錯誤,例外會在背景任務內被拋出,預設不會影響已回傳的 HTTP 回應。

範例 2️⃣:產生 CSV 報表(同步 CPU 密集)

# file: report.py
import csv
import time
from pathlib import Path

def generate_csv_report(user_id: int):
    """
    模擬耗時的報表產生(CPU 密集),寫入 tmp 目錄。
    """
    filename = Path("/tmp") / f"report_user_{user_id}.csv"
    # 假設需要 3 秒才能完成
    time.sleep(3)
    with open(filename, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["項目", "數量"])
        writer.writerow(["A", 10])
        writer.writerow(["B", 20])
    return str(filename)

# main.py
from fastapi import FastAPI, BackgroundTasks
from report import generate_csv_report

app = FastAPI()

@app.post("/report/{user_id}")
async def request_report(user_id: int, background_tasks: BackgroundTasks):
    # 立刻回傳任務已接受,實際報表稍後產生
    background_tasks.add_task(generate_csv_report, user_id)
    return {"msg": f"報表產生中,稍後可於 /download/{user_id} 取得"}

說明

  • generate_csv_report 為同步且 CPU 密集,FastAPI 會把它交給 執行緒池,避免阻塞主事件迴圈。
  • 若報表產生失敗,可在函式內自行寫入失敗日誌或使用 try/except 捕捉例外。

範例 3️⃣:清除過期快取(批次 I/O)

# file: cache_cleanup.py
import os
import time

CACHE_DIR = "/tmp/cache"

def clean_expired_files(max_age_seconds: int = 86400):
    """
    刪除快取目錄中超過 max_age_seconds 的檔案。
    """
    now = time.time()
    for filename in os.listdir(CACHE_DIR):
        path = os.path.join(CACHE_DIR, filename)
        if os.path.isfile(path):
            age = now - os.path.getmtime(path)
            if age > max_age_seconds:
                os.remove(path)
                print(f"Removed expired cache: {path}")

# main.py
from fastapi import FastAPI, BackgroundTasks
from cache_cleanup import clean_expired_files

app = FastAPI()

@app.post("/cache/cleanup")
async def trigger_cleanup(background_tasks: BackgroundTasks):
    # 手動觸發一次快取清理
    background_tasks.add_task(clean_expired_files, max_age_seconds=3600)
    return {"msg": "快取清理已排入背景任務"}

說明

  • 此任務通常在 低流量時間管理介面 觸發。
  • 若快取目錄非常大,建議改用 asyncio.to_thread(Python 3.9+)或外部排程工具。

範例 4️⃣:結合 asyncio.create_task 進階使用

import asyncio
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def async_heavy_job(data: str):
    await asyncio.sleep(2)   # 模擬非阻塞 I/O
    print(f"Job done with: {data}")

@app.post("/async-job")
async def launch_job(data: str, background_tasks: BackgroundTasks):
    # 直接使用 asyncio.create_task 讓任務在同一事件迴圈內平行執行
    background_tasks.add_task(asyncio.create_task, async_heavy_job(data))
    return {"msg": "非同步工作已在背景執行"}

說明

  • BackgroundTasks.add_task 接受任何 callable;我們把 asyncio.create_task 當作外層函式,將真正的 async 任務排入事件迴圈。
  • 這種寫法在需要 同時啟動多個 async 任務 時特別方便。

範例 5️⃣:使用 Depends 注入自訂服務(DI)

from fastapi import FastAPI, BackgroundTasks, Depends

class NotificationService:
    async def push(self, user_id: int, message: str):
        # 假設呼叫第三方推播平台
        await asyncio.sleep(0.5)
        print(f"Pushed to {user_id}: {message}")

def get_notifier() -> NotificationService:
    # 這裡可以放置連線池、設定檔等
    return NotificationService()

app = FastAPI()

@app.post("/notify/{user_id}")
async def send_notification(
    user_id: int,
    message: str,
    background_tasks: BackgroundTasks,
    notifier: NotificationService = Depends(get_notifier),
):
    background_tasks.add_task(notifier.push, user_id, message)
    return {"msg": "通知已排入背景任務"}

說明

  • 透過 FastAPI 的 依賴注入(Depends)取得自訂服務實例,讓背景任務能直接使用已建立好的物件(例如已連線的 Redis、資料庫或外部 API 客戶端)。
  • 這樣的寫法保持程式碼的可測試性與可維護性。

常見陷阱與最佳實踐

陷阱 可能的後果 解決方案 / 最佳實踐
在背景任務裡使用阻塞的同步 I/O(如 requeststime.sleep 會阻塞執行緒池,若同時有大量請求會耗盡執行緒,導致服務斷流 改用非同步客戶端(httpx.AsyncClient)或使用 asyncio.to_thread 把同步 I/O 包裝成執行緒任務
背景任務拋出未捕捉的例外 例外會被記錄在伺服器日誌,但不會回傳給客戶端,容易忽略失敗 在背景函式內使用 try/except,並自行寫入失敗日誌、或把錯誤資訊寫入資料庫以供後續查詢
任務需長時間執行(超過幾分鐘) 背景任務仍會在同一個進程中執行,若進程因為其他原因重啟會遺失任務 對於長時間、需要重試或分散式的工作,考慮使用 Celery、RQ、Dramatiq 等外部任務佇列
共享全域變數 多個背景任務同時寫入/讀取同一變數會產生競爭條件 使用 thread‑safe 結構(如 queue.Queue)或將狀態持久化到資料庫/Redis
忘記在 uvicorn 啟動指令加上 --workers 單一 worker 內部的背景任務若卡住,會影響所有請求 若應用需要更高併發,使用多 worker(uvicorn main:app --workers 4)或在 Docker/K8s 中水平擴展

其他最佳實踐

  1. 保持背景任務的執行時間盡可能短:若任務需要較長時間,考慮拆成多個小步驟或改用排程系統。
  2. 統一日誌格式:在背景任務內部使用與主程式相同的 logger,方便追蹤與排錯。
  3. 避免在背景任務裡直接存取 request 物件request 只在主請求的生命週期內有效,背景任務執行時已經脫離該上下文。若需要資訊,請在加入任務時把所需資料作為參數傳入。
  4. 測試時使用 TestClient:FastAPI 的測試客戶端會同步等待背景任務完成,這有助於在單元測試中驗證任務行為。

實際應用場景

場景 為什麼適合使用 BackgroundTasks 範例程式碼概念
使用者註冊後寄送驗證郵件 需要即時回應,郵件服務可能較慢 background_tasks.add_task(send_email, ...)
上傳檔案後產生縮圖 圖片處理耗時,且不需要立即回傳縮圖 URL background_tasks.add_task(create_thumbnail, file_path)
每日資料匯出 (CSV / Excel) 使用者點擊「匯出」後即回傳任務 ID,稍後下載 background_tasks.add_task(export_report, user_id)
API 呼叫外部付款平台 付款成功後再發送「付款成功」通知,避免阻塞付款流程 background_tasks.add_task(notify_payment_success, order_id)
定期清理暫存或過期資料 透過管理介面或自動觸發,保持資源乾淨 background_tasks.add_task(clean_expired_files)

小技巧:若想讓使用者能查詢背景任務的執行狀態,可在任務開始前在資料庫寫入 pending,完成後更新為 finishedfailed,前端透過輪詢或 WebSocket 取得最新狀態。


總結

FastAPI 的 BackgroundTasks簡易且高效 的背景工作機制,讓開發者只需要幾行程式碼就能把 耗時或非即時 的操作從主要請求流程中抽離。
透過本篇的概念說明與五個實作範例,你應該已掌握:

  • 如何在路由函式中注入 BackgroundTasks
  • 何時使用 同步非同步、或 asyncio.create_task 的寫法
  • 常見的陷阱(阻塞 I/O、未捕捉例外、長時間任務)以及對應的最佳實踐
  • 真實的業務場景(Email、報表、快取清理、推播等)如何落地

在實務開發中,先把背景任務做得簡單、可觀測,再根據需求演進到更完整的任務佇列系統,這樣既能快速交付功能,又不會在未來的擴充上卡住。祝你在 FastAPI 專案中玩得開心,寫出高效、可維護的 API!