本文 AI 產出,尚未審核

FastAPI 與外部服務整合 – Celery 任務佇列

簡介

在 Web 應用中,即時回應背景工作 常常需要同時存在。
FastAPI 以其高效能與自動產生 OpenAPI 文件的特性,已成為 Python 生態系最受歡迎的框架之一;然而,當面對大量的 I/O 密集或計算密集任務(如寄送 Email、產生報表、影像處理)時,直接在 API 路由內執行會造成請求延遲,甚至可能導致服務崩潰。

此時,Celery——一個成熟且功能完整的分散式任務佇列系統,就能幫助我們把繁重的工作交給背景工作者(worker)處理,讓 API 只負責快速回應與任務排程。本文將以 FastAPI + Celery 的典型整合方式為例,從概念說明、實作範例、常見陷阱到最佳實踐,完整呈現如何在實務專案中運用 Celery。


核心概念

1. Celery 基本架構

Celery 主要由三個角色組成:

角色 功能說明
Producer(產生者) 由 FastAPI 路由或其他程式碼呼叫 delay() / apply_async(),把任務訊息送到 Broker
Broker(訊息中介) 常見的有 RedisRabbitMQ,負責暫存任務訊息,讓 worker 能夠取出。
Worker(工作者) 監聽 broker,取出任務後執行實際的業務程式。

重點:Celery 本身不儲存任務結果,若需要追蹤結果,需要額外設定 Result Backend(如 Redis、Database)。

2. 為什麼要在 FastAPI 中使用 Celery?

  • 即時回應:API 只需要回傳任務 ID,使用者即可在之後查詢狀態。
  • 水平擴展:Worker 可以根據負載水平擴增或縮減,與 FastAPI 應用分離。
  • 容錯與重試:Celery 內建重試機制,失敗的任務可以自動重新排程。

3. 基本安裝與設定

pip install fastapi[all] celery redis uvicorn

這裡以 Redis 作為 broker 與 result backend,因為它安裝簡單且在開發環境中常見。

3.1 celery_config.py – Celery 設定檔

# celery_config.py
from celery import Celery

# 建立 Celery 實例,使用 Redis 作為 broker 與結果儲存
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",          # broker URL
    backend="redis://localhost:6379/1",         # result backend
)

# 讓 Celery 能自動載入任務模組
celery_app.autodiscover_tasks(["app.tasks"])

4. 定義背景任務

4.1 範例 1:寄送驗證 Email

# app/tasks/email_tasks.py
from celery import shared_task
import smtplib
from email.mime.text import MIMEText

@shared_task
def send_verification_email(to_address: str, token: str):
    """
    背景任務:寄送驗證信給使用者
    """
    subject = "註冊驗證信"
    body = f"請點擊以下連結完成驗證: https://example.com/verify/{token}"
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = subject
    msg["From"] = "no-reply@example.com"
    msg["To"] = to_address

    # 這裡僅示範,實務上建議使用專業的郵件服務 (SendGrid、Mailgun)
    with smtplib.SMTP("localhost") as server:
        server.send_message(msg)

4.2 範例 2:產生 PDF 報表

# app/tasks/report_tasks.py
from celery import shared_task
import io
from reportlab.pdfgen import canvas

@shared_task
def generate_pdf_report(user_id: int) -> bytes:
    """
    背景任務:根據使用者資料產生 PDF 報表,回傳二進位檔案
    """
    buffer = io.BytesIO()
    p = canvas.Canvas(buffer)
    p.drawString(100, 750, f"User Report for ID: {user_id}")
    # 這裡可以加入更多圖表、資料表等
    p.showPage()
    p.save()
    buffer.seek(0)
    return buffer.read()

4.3 範例 3:圖片縮圖(CPU 密集)

# app/tasks/image_tasks.py
from celery import shared_task
from PIL import Image
import pathlib

@shared_task
def create_thumbnail(image_path: str, size: tuple = (200, 200)):
    """
    背景任務:將原圖縮小為縮圖,存回同一目錄
    """
    img = Image.open(image_path)
    img.thumbnail(size)
    thumb_path = pathlib.Path(image_path).with_name(
        f"{pathlib.Path(image_path).stem}_thumb{pathlib.Path(image_path).suffix}"
    )
    img.save(thumb_path)
    return str(thumb_path)

4.4 範例 4:重試機制範例

# app/tasks/retry_tasks.py
from celery import shared_task
import random

@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def unreliable_task(self, payload: str):
    """
    示範任務失敗時自動重試
    """
    try:
        # 假設 50% 機率拋出例外
        if random.random() < 0.5:
            raise ValueError("模擬失敗")
        return f"成功處理: {payload}"
    except Exception as exc:
        # 重新排程,最多重試 3 次
        raise self.retry(exc=exc)

5. 在 FastAPI 中呼叫 Celery 任務

5.1 建立 FastAPI 應用

# main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
from app.tasks.email_tasks import send_verification_email
from app.tasks.report_tasks import generate_pdf_report
from app.tasks.image_tasks import create_thumbnail
from app.tasks.retry_tasks import unreliable_task
from celery.result import AsyncResult
from celery_config import celery_app

app = FastAPI(title="FastAPI + Celery Demo")

class RegisterForm(BaseModel):
    email: EmailStr

class ReportRequest(BaseModel):
    user_id: int

class ImageRequest(BaseModel):
    image_path: str

@app.post("/register")
async def register(form: RegisterForm):
    """
    使用者註冊 → 產生驗證 token → 背景寄送驗證信
    """
    token = "dummy-token-123"   # 實務上應使用 JWT 或隨機字串
    task = send_verification_email.delay(form.email, token)
    return {"msg": "驗證信已送出", "task_id": task.id}

@app.post("/report")
async def create_report(req: ReportRequest):
    """
    產生 PDF 報表 → 回傳任務 ID,使用者可稍後查詢結果
    """
    task = generate_pdf_report.delay(req.user_id)
    return {"task_id": task.id}

@app.get("/report/{task_id}")
async def get_report(task_id: str):
    """
    取得 PDF 報表的二進位結果(若已完成)
    """
    result = AsyncResult(task_id, app=celery_app)
    if not result.ready():
        raise HTTPException(status_code=202, detail="報表尚在產出中")
    # 直接回傳二進位檔案
    return {"pdf_bytes": result.get()}

@app.post("/thumbnail")
async def thumbnail(req: ImageRequest):
    """
    上傳圖片後,背景產生縮圖
    """
    task = create_thumbnail.delay(req.image_path)
    return {"task_id": task.id}

@app.get("/task/{task_id}/status")
async def task_status(task_id: str):
    """
    查詢任務狀態(PENDING, STARTED, SUCCESS, FAILURE)
    """
    result = AsyncResult(task_id, app=celery_app)
    return {"task_id": task_id, "status": result.status, "result": result.result}

說明

  • delay()apply_async() 的簡寫,會立即把任務送到 broker。
  • AsyncResult 可用來查詢任務的狀態結果
  • 若任務尚未完成,回傳 202 Accepted,讓前端知道仍在處理中。

6. 啟動與測試

# 1. 啟動 Redis(假設已安裝 Docker)
docker run -d -p 6379:6379 redis:7

# 2. 啟動 Celery worker
celery -A celery_config.celery_app worker --loglevel=info

# 3. 啟動 FastAPI
uvicorn main:app --reload

常見陷阱與最佳實踐

陷阱 可能的後果 解決方案 / Best Practice
任務序列化失敗(例如傳入不可序列化的物件) 任務卡在 queue,無法執行 只傳遞 JSON 可序列化的資料(字串、數字、列表、字典)。必要時自行實作 json.dumps / json.loads
長時間執行的任務未設定超時 Worker 被卡住,導致佇列阻塞 使用 task_time_limittask_soft_time_limit 參數限制執行時間。
結果儲存於 Redis 時容量不足 結果被截斷或丟失 定期清理過期結果,或改用 Database(PostgreSQL)作為 backend。
在開發環境直接使用 celery -A,忘記載入任務模組 Worker 啟動後找不到任務,報 NotRegistered 錯誤 確保 celery_app.autodiscover_tasks([...]) 包含所有任務所在的 Python package。
忽略重試次數與延遲設定 失敗任務持續重試導致資源耗盡 針對外部 API 設定合理的 max_retriesdefault_retry_delay,必要時加入指數退避。
使用同步的資料庫連線(如 SQLAlchemy)於 Celery 任務內未建立新 Session 產生「已關閉的連線」錯誤 在每個任務內部 建立關閉 資料庫 Session,或使用 @celery_app.task(bind=True) 取得 self.request 來管理連線。

其他最佳實踐

  1. 分離設定:將 Celery、FastAPI、資料庫的環境變數分別管理,避免在程式碼中硬編碼。
  2. 監控:使用 Flower(Celery 的 UI)或 Prometheus + Grafana 監控佇列長度、失敗率。
  3. 測試:利用 celery.app.task.Task.apply() 直接在單元測試中執行任務,避免依賴 broker。
  4. 安全:若任務內含敏感資訊(如 API 金鑰),請使用 環境變數Vault,不要直接放在訊息 payload 中。
  5. 版本相容:確保 FastAPI、Pydantic 與 Celery 的版本相容,尤其是 Python 3.11+ 時,有些舊版 Celery 仍未完全支援。

實際應用場景

場景 為何使用 Celery 範例實作
使用者註冊 → 發送驗證信 Email 送出可能需要數秒,若同步會拖慢 API 回應。 send_verification_email.delay(email, token)
大型報表產生(每日 10,000 筆交易) 計算與 PDF 產生屬 CPU 密集,適合分散到多個 worker。 generate_pdf_report.delay(user_id)
圖片/影片處理(上傳後需轉碼、產生縮圖) 需要大量 I/O 與 CPU,且可能失敗,需要重試。 create_thumbnail.delay(path)
第三方 API 整合(付款、物流) 第三方服務可能暫時不可用,需重試或排程。 unreliable_task.delay(payload)
資料清理與批次匯出(每夜執行) 這類工作不需要即時回應,適合排程(Celery Beat) celery -A ... beat 配合 @periodic_task

總結

  • Celery 為 FastAPI 補足背景工作需求的利器,讓 API 能保持 快速回應,同時把耗時或不穩定的任務交給專門的 worker 處理。
  • 透過 Redis(或 RabbitMQ)作為 broker,配合 Result Backend,我們可以輕鬆追蹤任務狀態、取得結果,甚至利用 重試與超時 機制提升系統韌性。
  • 在實作時,務必注意 資料序列化資源清理監控安全,避免常見的陷阱。
  • 只要遵循本文的 最佳實踐,從簡單的 Email 任務到複雜的批次報表、影像處理,都能在 FastAPI 專案中順利導入 Celery,提升開發效率與系統可擴充性。

祝你在 FastAPI + Celery 的旅程中玩得開心,打造出高效、可靠的 Web 服務!