本文 AI 產出,尚未審核
FastAPI 與外部服務整合 – Celery 任務佇列
簡介
在 Web 應用中,即時回應 與 背景工作 常常需要同時存在。
FastAPI 以其高效能與自動產生 OpenAPI 文件的特性,已成為 Python 生態系最受歡迎的框架之一;然而,當面對大量的 I/O 密集或計算密集任務(如寄送 Email、產生報表、影像處理)時,直接在 API 路由內執行會造成請求延遲,甚至可能導致服務崩潰。
此時,Celery——一個成熟且功能完整的分散式任務佇列系統,就能幫助我們把繁重的工作交給背景工作者(worker)處理,讓 API 只負責快速回應與任務排程。本文將以 FastAPI + Celery 的典型整合方式為例,從概念說明、實作範例、常見陷阱到最佳實踐,完整呈現如何在實務專案中運用 Celery。
核心概念
1. Celery 基本架構
Celery 主要由三個角色組成:
| 角色 | 功能說明 |
|---|---|
| Producer(產生者) | 由 FastAPI 路由或其他程式碼呼叫 delay() / apply_async(),把任務訊息送到 Broker。 |
| Broker(訊息中介) | 常見的有 Redis、RabbitMQ,負責暫存任務訊息,讓 worker 能夠取出。 |
| Worker(工作者) | 監聽 broker,取出任務後執行實際的業務程式。 |
重點:Celery 本身不儲存任務結果,若需要追蹤結果,需要額外設定 Result Backend(如 Redis、Database)。
2. 為什麼要在 FastAPI 中使用 Celery?
- 即時回應:API 只需要回傳任務 ID,使用者即可在之後查詢狀態。
- 水平擴展:Worker 可以根據負載水平擴增或縮減,與 FastAPI 應用分離。
- 容錯與重試:Celery 內建重試機制,失敗的任務可以自動重新排程。
3. 基本安裝與設定
pip install fastapi[all] celery redis uvicorn
這裡以 Redis 作為 broker 與 result backend,因為它安裝簡單且在開發環境中常見。
3.1 celery_config.py – Celery 設定檔
# celery_config.py
from celery import Celery
# 建立 Celery 實例,使用 Redis 作為 broker 與結果儲存
celery_app = Celery(
"worker",
broker="redis://localhost:6379/0", # broker URL
backend="redis://localhost:6379/1", # result backend
)
# 讓 Celery 能自動載入任務模組
celery_app.autodiscover_tasks(["app.tasks"])
4. 定義背景任務
4.1 範例 1:寄送驗證 Email
# app/tasks/email_tasks.py
from celery import shared_task
import smtplib
from email.mime.text import MIMEText
@shared_task
def send_verification_email(to_address: str, token: str):
"""
背景任務:寄送驗證信給使用者
"""
subject = "註冊驗證信"
body = f"請點擊以下連結完成驗證: https://example.com/verify/{token}"
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = "no-reply@example.com"
msg["To"] = to_address
# 這裡僅示範,實務上建議使用專業的郵件服務 (SendGrid、Mailgun)
with smtplib.SMTP("localhost") as server:
server.send_message(msg)
4.2 範例 2:產生 PDF 報表
# app/tasks/report_tasks.py
from celery import shared_task
import io
from reportlab.pdfgen import canvas
@shared_task
def generate_pdf_report(user_id: int) -> bytes:
"""
背景任務:根據使用者資料產生 PDF 報表,回傳二進位檔案
"""
buffer = io.BytesIO()
p = canvas.Canvas(buffer)
p.drawString(100, 750, f"User Report for ID: {user_id}")
# 這裡可以加入更多圖表、資料表等
p.showPage()
p.save()
buffer.seek(0)
return buffer.read()
4.3 範例 3:圖片縮圖(CPU 密集)
# app/tasks/image_tasks.py
from celery import shared_task
from PIL import Image
import pathlib
@shared_task
def create_thumbnail(image_path: str, size: tuple = (200, 200)):
"""
背景任務:將原圖縮小為縮圖,存回同一目錄
"""
img = Image.open(image_path)
img.thumbnail(size)
thumb_path = pathlib.Path(image_path).with_name(
f"{pathlib.Path(image_path).stem}_thumb{pathlib.Path(image_path).suffix}"
)
img.save(thumb_path)
return str(thumb_path)
4.4 範例 4:重試機制範例
# app/tasks/retry_tasks.py
from celery import shared_task
import random
@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def unreliable_task(self, payload: str):
"""
示範任務失敗時自動重試
"""
try:
# 假設 50% 機率拋出例外
if random.random() < 0.5:
raise ValueError("模擬失敗")
return f"成功處理: {payload}"
except Exception as exc:
# 重新排程,最多重試 3 次
raise self.retry(exc=exc)
5. 在 FastAPI 中呼叫 Celery 任務
5.1 建立 FastAPI 應用
# main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
from app.tasks.email_tasks import send_verification_email
from app.tasks.report_tasks import generate_pdf_report
from app.tasks.image_tasks import create_thumbnail
from app.tasks.retry_tasks import unreliable_task
from celery.result import AsyncResult
from celery_config import celery_app
app = FastAPI(title="FastAPI + Celery Demo")
class RegisterForm(BaseModel):
email: EmailStr
class ReportRequest(BaseModel):
user_id: int
class ImageRequest(BaseModel):
image_path: str
@app.post("/register")
async def register(form: RegisterForm):
"""
使用者註冊 → 產生驗證 token → 背景寄送驗證信
"""
token = "dummy-token-123" # 實務上應使用 JWT 或隨機字串
task = send_verification_email.delay(form.email, token)
return {"msg": "驗證信已送出", "task_id": task.id}
@app.post("/report")
async def create_report(req: ReportRequest):
"""
產生 PDF 報表 → 回傳任務 ID,使用者可稍後查詢結果
"""
task = generate_pdf_report.delay(req.user_id)
return {"task_id": task.id}
@app.get("/report/{task_id}")
async def get_report(task_id: str):
"""
取得 PDF 報表的二進位結果(若已完成)
"""
result = AsyncResult(task_id, app=celery_app)
if not result.ready():
raise HTTPException(status_code=202, detail="報表尚在產出中")
# 直接回傳二進位檔案
return {"pdf_bytes": result.get()}
@app.post("/thumbnail")
async def thumbnail(req: ImageRequest):
"""
上傳圖片後,背景產生縮圖
"""
task = create_thumbnail.delay(req.image_path)
return {"task_id": task.id}
@app.get("/task/{task_id}/status")
async def task_status(task_id: str):
"""
查詢任務狀態(PENDING, STARTED, SUCCESS, FAILURE)
"""
result = AsyncResult(task_id, app=celery_app)
return {"task_id": task_id, "status": result.status, "result": result.result}
說明:
delay()為apply_async()的簡寫,會立即把任務送到 broker。AsyncResult可用來查詢任務的狀態與結果。- 若任務尚未完成,回傳 202 Accepted,讓前端知道仍在處理中。
6. 啟動與測試
# 1. 啟動 Redis(假設已安裝 Docker)
docker run -d -p 6379:6379 redis:7
# 2. 啟動 Celery worker
celery -A celery_config.celery_app worker --loglevel=info
# 3. 啟動 FastAPI
uvicorn main:app --reload
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解決方案 / Best Practice |
|---|---|---|
| 任務序列化失敗(例如傳入不可序列化的物件) | 任務卡在 queue,無法執行 | 只傳遞 JSON 可序列化的資料(字串、數字、列表、字典)。必要時自行實作 json.dumps / json.loads。 |
| 長時間執行的任務未設定超時 | Worker 被卡住,導致佇列阻塞 | 使用 task_time_limit 與 task_soft_time_limit 參數限制執行時間。 |
| 結果儲存於 Redis 時容量不足 | 結果被截斷或丟失 | 定期清理過期結果,或改用 Database(PostgreSQL)作為 backend。 |
在開發環境直接使用 celery -A,忘記載入任務模組 |
Worker 啟動後找不到任務,報 NotRegistered 錯誤 |
確保 celery_app.autodiscover_tasks([...]) 包含所有任務所在的 Python package。 |
| 忽略重試次數與延遲設定 | 失敗任務持續重試導致資源耗盡 | 針對外部 API 設定合理的 max_retries 與 default_retry_delay,必要時加入指數退避。 |
| 使用同步的資料庫連線(如 SQLAlchemy)於 Celery 任務內未建立新 Session | 產生「已關閉的連線」錯誤 | 在每個任務內部 建立 與 關閉 資料庫 Session,或使用 @celery_app.task(bind=True) 取得 self.request 來管理連線。 |
其他最佳實踐
- 分離設定:將 Celery、FastAPI、資料庫的環境變數分別管理,避免在程式碼中硬編碼。
- 監控:使用 Flower(Celery 的 UI)或 Prometheus + Grafana 監控佇列長度、失敗率。
- 測試:利用
celery.app.task.Task.apply()直接在單元測試中執行任務,避免依賴 broker。 - 安全:若任務內含敏感資訊(如 API 金鑰),請使用 環境變數 或 Vault,不要直接放在訊息 payload 中。
- 版本相容:確保 FastAPI、Pydantic 與 Celery 的版本相容,尤其是 Python 3.11+ 時,有些舊版 Celery 仍未完全支援。
實際應用場景
| 場景 | 為何使用 Celery | 範例實作 |
|---|---|---|
| 使用者註冊 → 發送驗證信 | Email 送出可能需要數秒,若同步會拖慢 API 回應。 | send_verification_email.delay(email, token) |
| 大型報表產生(每日 10,000 筆交易) | 計算與 PDF 產生屬 CPU 密集,適合分散到多個 worker。 | generate_pdf_report.delay(user_id) |
| 圖片/影片處理(上傳後需轉碼、產生縮圖) | 需要大量 I/O 與 CPU,且可能失敗,需要重試。 | create_thumbnail.delay(path) |
| 第三方 API 整合(付款、物流) | 第三方服務可能暫時不可用,需重試或排程。 | unreliable_task.delay(payload) |
| 資料清理與批次匯出(每夜執行) | 這類工作不需要即時回應,適合排程(Celery Beat) | celery -A ... beat 配合 @periodic_task |
總結
- Celery 為 FastAPI 補足背景工作需求的利器,讓 API 能保持 快速回應,同時把耗時或不穩定的任務交給專門的 worker 處理。
- 透過 Redis(或 RabbitMQ)作為 broker,配合 Result Backend,我們可以輕鬆追蹤任務狀態、取得結果,甚至利用 重試與超時 機制提升系統韌性。
- 在實作時,務必注意 資料序列化、資源清理、監控 與 安全,避免常見的陷阱。
- 只要遵循本文的 最佳實踐,從簡單的 Email 任務到複雜的批次報表、影像處理,都能在 FastAPI 專案中順利導入 Celery,提升開發效率與系統可擴充性。
祝你在 FastAPI + Celery 的旅程中玩得開心,打造出高效、可靠的 Web 服務!