本文 AI 產出,尚未審核

FastAPI 背景任務(Background Tasks)與 Celery / RQ / Dramatiq 整合

簡介

在 Web API 中,即時回應長時間執行的工作 常常是衝突的。使用者期待快速得到 HTTP 200,但後端可能需要發送郵件、產生報表或執行影像處理等耗時任務。FastAPI 內建的 BackgroundTasks 能在回應結束後繼續執行簡單任務,然而在分散式、需要重試或排程的情境下,我們需要更強大的背景工作系統。

本篇文章將說明如何把 CeleryRedis Queue (RQ)Dramatiq 三大常見的任務佇列與 FastAPI 結合,讓你能在保持 API 高效回應的同時,安全、可靠地處理大量背景工作。

核心概念

1. 為什麼要使用佇列系統?

  • 可擴展:工作可以分散到多台 worker 主機。
  • 容錯:失敗的任務可自動重試或放入死信佇列。
  • 排程:支援延遲執行、週期性任務。
  • 資源隔離:CPU/IO 密集的工作不會阻塞 API 服務。

2. FastAPI 的 BackgroundTasks

BackgroundTasks 只適合 短小且不需要持久化 的工作,例如寫入日誌、簡單的快取更新。它在回應送出後於同一個工作進程執行,若該進程崩潰,任務會遺失。

3. Celery、RQ、Dramatiq 的共同特點

特性 Celery RQ Dramatiq
Broker RabbitMQ、Redis、SQS… Redis RabbitMQ、Redis
後端 (結果儲存) Redis、Database、Cache Redis Redis、MongoDB
自動重試 ✅ (需自行實作)
Schedule Celery Beat RQ Scheduler Dramatiq‑Cron
學習曲線 中等 中等

以下分別示範三種整合方式,並提供 完整可執行的範例

程式碼範例

為了讓範例易於測試,我們統一使用 Redis 作為訊息代理(Broker)與結果後端。

1. FastAPI + Celery

# app/main.py
from fastapi import FastAPI, BackgroundTasks
from celery import Celery

# 建立 Celery 實例
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app = FastAPI()


@celery_app.task
def send_welcome_email(user_id: int, email: str):
    """
    假裝寄送驗證信,實務上會呼叫外部 SMTP 服務。
    """
    import time, logging
    time.sleep(5)  # 模擬耗時
    logging.info(f"Sent welcome email to {email} (user_id={user_id})")
    return {"status": "sent", "user_id": user_id}


@app.post("/register")
async def register(username: str, email: str, background: BackgroundTasks):
    """
    註冊 API:立即回應,背景任務交給 Celery 處理。
    """
    # 這裡應該寫入 DB,省略
    # 透過 Celery 非同步執行
    send_welcome_email.delay(user_id=123, email=email)
    # 若想同時使用 FastAPI 原生 BackgroundTasks,可寫 background.add_task(...)
    return {"msg": "註冊成功,驗證信已寄出"}
# 啟動 Celery worker
celery -A app.main.celery_app worker --loglevel=info
# 啟動 FastAPI
uvicorn app.main:app --reload

2. FastAPI + RQ

# app/rq_worker.py
import os, time, logging
from redis import Redis
from rq import Queue, Worker, Connection

redis_conn = Redis(host="localhost", port=6379, db=0)
queue = Queue("default", connection=redis_conn)


def generate_report(user_id: int):
    """模擬產生 PDF 報表的耗時工作"""
    logging.info(f"Start generating report for user {user_id}")
    time.sleep(8)
    logging.info(f"Report for user {user_id} completed")
    return f"/tmp/report_{user_id}.pdf"


# 啟動 worker 時使用
if __name__ == "__main__":
    with Connection(redis_conn):
        worker = Worker([queue])
        worker.work()
# app/main.py
from fastapi import FastAPI
from redis import Redis
from rq import Queue
from rq_worker import generate_report  # 讓 RQ 能序列化

app = FastAPI()
redis_conn = Redis(host="localhost", port=6379, db=0)
queue = Queue("default", connection=redis_conn)


@app.post("/report/{user_id}")
def request_report(user_id: int):
    """
    使用 RQ 將報表產生工作推入佇列,立刻回傳任務 ID。
    """
    job = queue.enqueue(generate_report, user_id)
    return {"job_id": job.get_id(), "status": "queued"}
# 啟動 RQ worker
python app/rq_worker.py
# 啟動 FastAPI
uvicorn app.main:app --reload

3. FastAPI + Dramatiq

# app/dramatiq_worker.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker

redis_broker = RedisBroker(host="localhost", port=6379, db=0)
dramatiq.set_broker(redis_broker)


@dramatiq.actor
def resize_image(image_path: str, size: tuple):
    """
    假裝執行圖像縮放,實務上會呼叫 Pillow 或 OpenCV。
    """
    import time, logging
    logging.info(f"Resizing {image_path} to {size}")
    time.sleep(4)
    logging.info(f"Resize completed for {image_path}")
# app/main.py
from fastapi import FastAPI, UploadFile, File
from app.dramatiq_worker import resize_image

app = FastAPI()


@app.post("/upload")
async def upload_image(file: UploadFile = File(...)):
    """
    接收上傳圖片後,立刻回傳成功訊息,縮圖工作交給 Dramatiq 處理。
    """
    # 先把檔案寫到暫存目錄
    tmp_path = f"/tmp/{file.filename}"
    with open(tmp_path, "wb") as f:
        content = await file.read()
        f.write(content)

    # 非同步呼叫 Dramatiq actor
    resize_image.send(tmp_path, (200, 200))
    return {"msg": "檔案已上傳,縮圖任務已排程"}
# 啟動 Dramatiq worker
dramatiq app.dramatiq_worker
# 啟動 FastAPI
uvicorn app.main:app --reload

常見陷阱與最佳實踐

陷阱 說明 解決方式
任務序列化失敗 若任務函式引用了不可序列化的物件(如 DB 連線、FastAPI Request)會拋錯。 只傳遞 簡單資料型別(int、str、dict),在 worker 內部自行建立 DB 連線或其他資源。
重試無限循環 設定不當會讓失敗任務不斷重試,耗盡資源。 為每個任務設定 max_retriesbackoff(指數退避)。
結果遺失 使用 BackgroundTasks 而非佇列,進程崩潰會遺失任務。 只在需要持久化、可重試的情境使用 Celery/RQ/Dramatiq。
佇列阻塞 大量任務一次性推入,導致 Redis memory 爆炸。 使用 批次推送流控(rate limiting)或 分區佇列
環境不一致 開發與生產使用不同的 Broker/Backend,導致行為差異。 透過 Docker Compose環境變數 統一配置,並在 CI 中測試佇列流程。

最佳實踐小結

  1. 明確劃分任務層級

    • 快速且不需持久化BackgroundTasks
    • 需要重試、排程或大量併發 → Celery / RQ / Dramatiq
  2. 使用 DTO(Data Transfer Object):將傳入的資料轉成純粹的 dict/pydantic 模型,再傳給佇列,避免序列化問題。

  3. 監控與可觀測性:結合 PrometheusGrafanaFlower(Celery)監控任務排程、失敗率與執行時間。

  4. 安全性:切勿把機密資訊(API 金鑰、密碼)直接寫入任務參數,改用 環境變數Vault,在 worker 端讀取。

  5. 測試:使用 pytest‑asyncio 搭配 celery‑test‑workerrq‑test‑utilsdramatiq‑test,確保任務行為在單元測試中可驗證。

實際應用場景

場景 推薦使用的佇列 為何選擇
使用者註冊後寄送驗證信 BackgroundTasks(小量)或 Celery(大量) 需要保證送信成功,且可能有重試需求。
大量報表產生(每日千筆) RQDramatiq(簡易設定) 只需要 Redis,且不需要複雜的工作流。
多步驟工作流(上傳 → 轉檔 → 推送通知) Celery(支援 chord、chain) 需要串接多個任務且支援結果回傳。
即時影像處理或縮圖 Dramatiq(低延遲、適合 CPU 密集) Actor 模式提供快速啟動與低記憶體占用。
定時清理過期資料 Celery BeatRQ Scheduler 內建排程功能,無需額外 cron 設定。

總結

在 FastAPI 中,背景任務 並不是只有 BackgroundTasks 那麼簡單。當系統開始面臨 高併發、需要可靠重試或排程 的需求時,將 Celery、RQ、Dramatiq 這類成熟的任務佇列引入是最佳解法。

本文從概念說明、三種佇列的整合範例、常見陷阱與最佳實踐,最後列出實務應用場景,提供了從 開發、測試、部署到監控 的完整藍圖。只要依照上述步驟配置,便能讓你的 FastAPI 服務在保持 即時回應 的同時,安全、有效地處理各類耗時工作。祝開發順利,打造出高可用、彈性的 API 系統!