FastAPI 背景任務(Background Tasks)與 Celery / RQ / Dramatiq 整合
簡介
在 Web API 中,即時回應 與 長時間執行的工作 常常是衝突的。使用者期待快速得到 HTTP 200,但後端可能需要發送郵件、產生報表或執行影像處理等耗時任務。FastAPI 內建的 BackgroundTasks 能在回應結束後繼續執行簡單任務,然而在分散式、需要重試或排程的情境下,我們需要更強大的背景工作系統。
本篇文章將說明如何把 Celery、Redis Queue (RQ)、Dramatiq 三大常見的任務佇列與 FastAPI 結合,讓你能在保持 API 高效回應的同時,安全、可靠地處理大量背景工作。
核心概念
1. 為什麼要使用佇列系統?
- 可擴展:工作可以分散到多台 worker 主機。
- 容錯:失敗的任務可自動重試或放入死信佇列。
- 排程:支援延遲執行、週期性任務。
- 資源隔離:CPU/IO 密集的工作不會阻塞 API 服務。
2. FastAPI 的 BackgroundTasks
BackgroundTasks 只適合 短小且不需要持久化 的工作,例如寫入日誌、簡單的快取更新。它在回應送出後於同一個工作進程執行,若該進程崩潰,任務會遺失。
3. Celery、RQ、Dramatiq 的共同特點
| 特性 | Celery | RQ | Dramatiq |
|---|---|---|---|
| Broker | RabbitMQ、Redis、SQS… | Redis | RabbitMQ、Redis |
| 後端 (結果儲存) | Redis、Database、Cache | Redis | Redis、MongoDB |
| 自動重試 | ✅ | ✅ (需自行實作) | ✅ |
| Schedule | Celery Beat | RQ Scheduler | Dramatiq‑Cron |
| 學習曲線 | 中等 | 低 | 中等 |
以下分別示範三種整合方式,並提供 完整可執行的範例。
程式碼範例
為了讓範例易於測試,我們統一使用 Redis 作為訊息代理(Broker)與結果後端。
1. FastAPI + Celery
# app/main.py
from fastapi import FastAPI, BackgroundTasks
from celery import Celery
# 建立 Celery 實例
celery_app = Celery(
"worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
app = FastAPI()
@celery_app.task
def send_welcome_email(user_id: int, email: str):
"""
假裝寄送驗證信,實務上會呼叫外部 SMTP 服務。
"""
import time, logging
time.sleep(5) # 模擬耗時
logging.info(f"Sent welcome email to {email} (user_id={user_id})")
return {"status": "sent", "user_id": user_id}
@app.post("/register")
async def register(username: str, email: str, background: BackgroundTasks):
"""
註冊 API:立即回應,背景任務交給 Celery 處理。
"""
# 這裡應該寫入 DB,省略
# 透過 Celery 非同步執行
send_welcome_email.delay(user_id=123, email=email)
# 若想同時使用 FastAPI 原生 BackgroundTasks,可寫 background.add_task(...)
return {"msg": "註冊成功,驗證信已寄出"}
# 啟動 Celery worker
celery -A app.main.celery_app worker --loglevel=info
# 啟動 FastAPI
uvicorn app.main:app --reload
2. FastAPI + RQ
# app/rq_worker.py
import os, time, logging
from redis import Redis
from rq import Queue, Worker, Connection
redis_conn = Redis(host="localhost", port=6379, db=0)
queue = Queue("default", connection=redis_conn)
def generate_report(user_id: int):
"""模擬產生 PDF 報表的耗時工作"""
logging.info(f"Start generating report for user {user_id}")
time.sleep(8)
logging.info(f"Report for user {user_id} completed")
return f"/tmp/report_{user_id}.pdf"
# 啟動 worker 時使用
if __name__ == "__main__":
with Connection(redis_conn):
worker = Worker([queue])
worker.work()
# app/main.py
from fastapi import FastAPI
from redis import Redis
from rq import Queue
from rq_worker import generate_report # 讓 RQ 能序列化
app = FastAPI()
redis_conn = Redis(host="localhost", port=6379, db=0)
queue = Queue("default", connection=redis_conn)
@app.post("/report/{user_id}")
def request_report(user_id: int):
"""
使用 RQ 將報表產生工作推入佇列,立刻回傳任務 ID。
"""
job = queue.enqueue(generate_report, user_id)
return {"job_id": job.get_id(), "status": "queued"}
# 啟動 RQ worker
python app/rq_worker.py
# 啟動 FastAPI
uvicorn app.main:app --reload
3. FastAPI + Dramatiq
# app/dramatiq_worker.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
redis_broker = RedisBroker(host="localhost", port=6379, db=0)
dramatiq.set_broker(redis_broker)
@dramatiq.actor
def resize_image(image_path: str, size: tuple):
"""
假裝執行圖像縮放,實務上會呼叫 Pillow 或 OpenCV。
"""
import time, logging
logging.info(f"Resizing {image_path} to {size}")
time.sleep(4)
logging.info(f"Resize completed for {image_path}")
# app/main.py
from fastapi import FastAPI, UploadFile, File
from app.dramatiq_worker import resize_image
app = FastAPI()
@app.post("/upload")
async def upload_image(file: UploadFile = File(...)):
"""
接收上傳圖片後,立刻回傳成功訊息,縮圖工作交給 Dramatiq 處理。
"""
# 先把檔案寫到暫存目錄
tmp_path = f"/tmp/{file.filename}"
with open(tmp_path, "wb") as f:
content = await file.read()
f.write(content)
# 非同步呼叫 Dramatiq actor
resize_image.send(tmp_path, (200, 200))
return {"msg": "檔案已上傳,縮圖任務已排程"}
# 啟動 Dramatiq worker
dramatiq app.dramatiq_worker
# 啟動 FastAPI
uvicorn app.main:app --reload
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方式 |
|---|---|---|
| 任務序列化失敗 | 若任務函式引用了不可序列化的物件(如 DB 連線、FastAPI Request)會拋錯。 |
只傳遞 簡單資料型別(int、str、dict),在 worker 內部自行建立 DB 連線或其他資源。 |
| 重試無限循環 | 設定不當會讓失敗任務不斷重試,耗盡資源。 | 為每個任務設定 max_retries 與 backoff(指數退避)。 |
| 結果遺失 | 使用 BackgroundTasks 而非佇列,進程崩潰會遺失任務。 |
只在需要持久化、可重試的情境使用 Celery/RQ/Dramatiq。 |
| 佇列阻塞 | 大量任務一次性推入,導致 Redis memory 爆炸。 | 使用 批次推送、流控(rate limiting)或 分區佇列。 |
| 環境不一致 | 開發與生產使用不同的 Broker/Backend,導致行為差異。 | 透過 Docker Compose 或 環境變數 統一配置,並在 CI 中測試佇列流程。 |
最佳實踐小結
明確劃分任務層級:
- 快速且不需持久化 →
BackgroundTasks - 需要重試、排程或大量併發 → Celery / RQ / Dramatiq
- 快速且不需持久化 →
使用 DTO(Data Transfer Object):將傳入的資料轉成純粹的
dict/pydantic模型,再傳給佇列,避免序列化問題。監控與可觀測性:結合 Prometheus、Grafana 或 Flower(Celery)監控任務排程、失敗率與執行時間。
安全性:切勿把機密資訊(API 金鑰、密碼)直接寫入任務參數,改用 環境變數 或 Vault,在 worker 端讀取。
測試:使用 pytest‑asyncio 搭配 celery‑test‑worker、rq‑test‑utils 或 dramatiq‑test,確保任務行為在單元測試中可驗證。
實際應用場景
| 場景 | 推薦使用的佇列 | 為何選擇 |
|---|---|---|
| 使用者註冊後寄送驗證信 | BackgroundTasks(小量)或 Celery(大量) |
需要保證送信成功,且可能有重試需求。 |
| 大量報表產生(每日千筆) | RQ 或 Dramatiq(簡易設定) | 只需要 Redis,且不需要複雜的工作流。 |
| 多步驟工作流(上傳 → 轉檔 → 推送通知) | Celery(支援 chord、chain) | 需要串接多個任務且支援結果回傳。 |
| 即時影像處理或縮圖 | Dramatiq(低延遲、適合 CPU 密集) | Actor 模式提供快速啟動與低記憶體占用。 |
| 定時清理過期資料 | Celery Beat 或 RQ Scheduler | 內建排程功能,無需額外 cron 設定。 |
總結
在 FastAPI 中,背景任務 並不是只有 BackgroundTasks 那麼簡單。當系統開始面臨 高併發、需要可靠重試或排程 的需求時,將 Celery、RQ、Dramatiq 這類成熟的任務佇列引入是最佳解法。
本文從概念說明、三種佇列的整合範例、常見陷阱與最佳實踐,最後列出實務應用場景,提供了從 開發、測試、部署到監控 的完整藍圖。只要依照上述步驟配置,便能讓你的 FastAPI 服務在保持 即時回應 的同時,安全、有效地處理各類耗時工作。祝開發順利,打造出高可用、彈性的 API 系統!