FastAPI 與外部服務整合:RabbitMQ / Kafka 完整指南
簡介
在現代的微服務架構中,非同步訊息傳遞 已成為系統解耦、提升彈性與擴充性的關鍵技術。
RabbitMQ 與 Kafka 兩大訊息佇列(Message Queue)解決方案,各有不同的設計哲學與使用情境,卻都能與 FastAPI 無縫結合,讓 API 端點只負責接收請求、驗證參數,真正的業務處理則交給後端的訊息消費者(Consumer)去完成。
本篇文章將從 核心概念、實作範例、常見陷阱與最佳實踐,一路帶你完成 FastAPI 與 RabbitMQ / Kafka 的整合,並提供 實務應用場景,幫助初學者到中階開發者快速上手、打造可靠的非同步服務。
核心概念
1. 為什麼要在 FastAPI 中使用訊息佇列?
| 優點 | 說明 |
|---|---|
| 解耦 | API 只負責接收與回應,繁重的背景工作交給消費者處理,降低相互依賴。 |
| 彈性伸縮 | 消費者可以根據負載水平水平擴展,避免 API 端點因長時間處理而被阻塞。 |
| 容錯 | 訊息在佇列中持久化,即使服務暫時不可用,訊息也不會遺失。 |
| 流量削峰 | 高峰期間的請求可先寫入佇列,後端慢慢消費,避免突發流量衝擊資料庫或外部系統。 |
2. RabbitMQ vs Kafka:選擇依據
| 特性 | RabbitMQ | Kafka |
|---|---|---|
| 模型 | 典型的 AMQP(Advanced Message Queuing Protocol)模型,支援 Queue、Exchange、Routing Key 等概念。 | 基於 分散式日誌(Distributed Log),訊息以 Topic 為單位,保留時間可設定。 |
| 訊息保留 | 預設 即時消費,訊息在被 ACK 後即刪除;可設定持久化。 | 持久化 為預設行為,訊息依照保留期限或大小自動刪除。 |
| 吞吐量 | 中等,適合 可靠性 高於 速度 的場景。 | 高吞吐、低延遲,適合 大資料流(log、事件)處理。 |
| 順序保證 | 單一 Queue 可保證順序,但跨 Queue 需要自行處理。 | 同一 Partition 內保證順序,跨 Partition 需自行協調。 |
| 使用情境 | 任務佇列、工作流、RPC、即時通知。 | 日誌收集、事件驅動架構、資料流分析。 |
選擇提示:若你的系統需要 可靠的訊息投遞、複雜的路由規則,RabbitMQ 是較佳選擇;若你要處理 海量事件、持久化分析,則 Kafka 更適合。
3. FastAPI 與訊息佇列的整合方式
- 同步發佈(Publish):在 API 處理函式內直接呼叫佇列的
publish/send方法,等待回傳(通常是訊息 ID)。 - 背景任務(Background Tasks):利用 FastAPI 的
BackgroundTasks或asyncio.create_task,把發佈動作交給事件迴圈,讓 API 立即回應。 - 依賴注入(Dependency Injection):在
app層面建立佇列連線(Connection / Producer),並透過Depends注入到路由函式,確保資源可重用且易於測試。
下面的程式碼範例會逐步展示這三種方式的實作。
程式碼範例
本範例使用 Python 3.10+、FastAPI 0.109、aio-pika(RabbitMQ)以及 aiokafka(Kafka)套件。
若尚未安裝,可執行:
pip install fastapi uvicorn aio-pika aiokafka
1️⃣ 建立共用的佇列連線(Dependency)
# app/dependencies.py
import asyncio
from aio_pika import connect_robust, Message, ExchangeType
from aiokafka import AIOKafkaProducer
from fastapi import Depends
# -------------------- RabbitMQ --------------------
async def get_rabbitmq_connection():
"""
建立 *robust* 連線,支援自動重連與恢復。
"""
connection = await connect_robust("amqp://guest:guest@localhost/")
return connection
async def get_rabbitmq_channel(connection=Depends(get_rabbitmq_connection)):
"""
取得 channel,並宣告一個 direct exchange。
"""
channel = await connection.channel()
await channel.declare_exchange("fastapi_exchange", ExchangeType.DIRECT)
return channel
# -------------------- Kafka --------------------
async def get_kafka_producer():
"""
建立 Kafka Producer,使用 default broker.
"""
producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
await producer.start()
return producer
# 注意:在應用關閉時,需要手動關閉連線
async def close_kafka_producer(producer: AIOKafkaProducer = Depends(get_kafka_producer)):
await producer.stop()
重點:將連線建立為 非同步依賴,FastAPI 會在每次請求時注入同一個物件,減少資源浪費。
2️⃣ 同步發佈訊息(RabbitMQ)
# app/main.py
from fastapi import FastAPI, Depends, HTTPException
from aio_pika import Message
from .dependencies import get_rabbitmq_channel
app = FastAPI(title="FastAPI + RabbitMQ Demo")
@app.post("/tasks/rabbitmq")
async def create_task(payload: dict, channel=Depends(get_rabbitmq_channel)):
"""
接收到任務後,直接同步發佈到 RabbitMQ。
"""
try:
exchange = await channel.get_exchange("fastapi_exchange")
message = Message(body=str(payload).encode(), delivery_mode=2) # 持久化
await exchange.publish(message, routing_key="task_queue")
return {"status": "queued", "payload": payload}
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
此範例示範 即時回傳 訊息已寫入佇列,適合需要 確認投遞成功 的情境(例如金融交易)。
3️⃣ 背景任務發佈(RabbitMQ)
# app/main.py (續)
from fastapi import BackgroundTasks
@app.post("/tasks/rabbitmq-bg")
async def create_task_bg(payload: dict,
background_tasks: BackgroundTasks,
channel=Depends(get_rabbitmq_channel)):
"""
使用 FastAPI BackgroundTasks,讓 API 立即回應。
"""
async def publish():
exchange = await channel.get_exchange("fastapi_exchange")
msg = Message(body=str(payload).encode(), delivery_mode=2)
await exchange.publish(msg, routing_key="task_queue")
background_tasks.add_task(publish) # 加入背景任務
return {"status": "queued (bg)", "payload": payload}
此方式 不阻塞 請求流程,適合 大量寫入 或 不需要即時確認 的場景。
4️⃣ Kafka 同步發佈
# app/main.py (續)
from .dependencies import get_kafka_producer
from aiokafka.helpers import create_ssl_context
@app.post("/events/kafka")
async def publish_event(event: dict,
producer=Depends(get_kafka_producer)):
"""
把事件直接送到 Kafka topic。
"""
try:
await producer.send_and_wait(
topic="fastapi_events",
value=str(event).encode()
)
return {"status": "sent", "event": event}
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
說明:
send_and_wait會等待 broker 回覆 offset,確保訊息已成功寫入。
5️⃣ Kafka 背景任務 + 批次發送
# app/main.py (續)
from typing import List
@app.post("/events/kafka/batch")
async def publish_batch(events: List[dict],
background_tasks: BackgroundTasks,
producer=Depends(get_kafka_producer)):
"""
批次寫入 Kafka,使用背景任務提升效能。
"""
async def send_batch():
for ev in events:
await producer.send_and_wait(
topic="fastapi_events",
value=str(ev).encode()
)
background_tasks.add_task(send_batch)
return {"status": "batch queued", "count": len(events)}
此範例 示範 批次處理,在高頻寫入環境中可減少網路往返次數。
6️⃣ 消費者範例(RabbitMQ)
# worker/rabbitmq_worker.py
import asyncio
from aio_pika import connect_robust, IncomingMessage
async def on_message(message: IncomingMessage):
async with message.process():
print("🔔 Received:", message.body.decode())
# 這裡可以放入實際的業務處理,例如寫入 DB、呼叫外部 API
async def main():
connection = await connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
queue = await channel.declare_queue("task_queue", durable=True)
await queue.consume(on_message)
print("✅ RabbitMQ worker started, waiting for messages...")
await asyncio.Future() # keep running
if __name__ == "__main__":
asyncio.run(main())
7️⃣ 消費者範例(Kafka)
# worker/kafka_worker.py
import asyncio
from aiokafka import AIOKafkaConsumer
async def consume():
consumer = AIOKafkaConsumer(
"fastapi_events",
bootstrap_servers="localhost:9092",
group_id="fastapi_group",
enable_auto_commit=True,
auto_offset_reset="earliest"
)
await consumer.start()
try:
async for msg in consumer:
print(f"🔔 Kafka event [{msg.partition}] {msg.offset}: {msg.value.decode()}")
# 實際業務邏輯寫在這裡
finally:
await consumer.stop()
if __name__ == "__main__":
asyncio.run(consume())
提醒:Kafka 消費者必須設定 group_id,才能在多個實例間實現 負載平衡。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解決方案 / 最佳實踐 |
|---|---|---|
| 連線未關閉 | 記憶體泄漏、資源枯竭 | 使用 FastAPI 的 lifespan 事件或依賴的 yield 方式,確保在關閉時 await connection.close()、await producer.stop() |
| 同步阻塞 | API 請求被卡住,導致 timeout | 盡量使用 非同步 client(aio-pika、aiokafka)並搭配 BackgroundTasks |
| 訊息遺失 | 消費失敗後訊息直接丟棄 | 設定 delivery_mode=2(RabbitMQ 持久化)或 Kafka 的 replication,並在消費端捕捉例外、使用 dead-letter queue |
| 重複投遞 | 消費端因未 ACK 重複處理,造成資料重複 | RabbitMQ 使用 manual ACK;Kafka 透過 idempotent producer(enable_idempotence=True)或在消費端實作 去重機制 |
| 過度使用單一佇列 | 單點瓶頸、無法水平擴展 | 為不同類型的任務 建立多個 queue / topic,或利用 routing key / partition 做負載分散 |
| 不恰當的序列化 | 文字化的字典不易解析、跨語言不兼容 | 建議使用 JSON、MessagePack 或 Protobuf,在 FastAPI 中可直接 jsonable_encoder,在 consumer 端再 json.loads |
其他最佳實踐
- 環境變數管理:將 RabbitMQ、Kafka 的連線資訊(host、port、user、password)放在
.env,使用pydantic.BaseSettings讀取。 - 健康檢查(Health Check):在 FastAPI 加入
/health端點,檢查佇列連線是否正常。 - 日誌與監控:使用 structlog 或 loguru 記錄訊息投遞與消費結果,並結合 Prometheus、Grafana 監控佇列深度與延遲。
- 測試策略:利用 pytest-asyncio + testcontainers(Docker)在 CI 中啟動臨時 RabbitMQ/Kafka,驗證 API 的發佈行為。
實際應用場景
| 場景 | 為何使用訊息佇列 | FastAPI + RabbitMQ | FastAPI + Kafka |
|---|---|---|---|
| 使用者上傳檔案 | 檔案上傳後需要做影像壓縮、病毒掃描等耗時工作 | 把檔案路徑與 meta 資訊發佈到 file_process_queue,消費者負責處理 |
若上傳量極大,可將事件寫入 Kafka,後端即時分析上傳趨勢 |
| 訂單交易 | 需要保證交易訊息不遺失,同時要即時通知其他服務(庫存、物流) | 使用 RabbitMQ 的 transactional 方式,確保每筆訂單只被消費一次 | 若需要 事件溯源(Event Sourcing),把每筆訂單事件寫入 Kafka,供後續分析 |
| 即時通知 | 多種渠道(Email、SMS、Push)需要同時推送 | RabbitMQ 的 fanout exchange 可一次投遞給多個消費者 | Kafka 的 multiple consumer groups 讓不同服務各自訂閱,同時保留歷史通知紀錄 |
| 日誌與監控 | 大量日誌需要即時聚合、分析 | 不太適合,RabbitMQ 會成為瓶頸 | Kafka 天然適合做 log pipeline,FastAPI 可把應用層面的事件寫入 Kafka,後端使用 Kafka Streams 或 Spark 處理 |
總結
- FastAPI 的非同步特性與 依賴注入 機制,使得與 RabbitMQ、Kafka 的整合變得簡潔且高效。
- 透過 同步發佈、背景任務、批次寫入 等不同策略,我們可以依需求在 即時性 與 吞吐量 之間取得平衡。
- 在實作過程中,連線管理、訊息持久化、錯誤重試 與 去重 是避免常見陷阱的關鍵。
- 最後,依照業務需求選擇合適的訊息平台:RabbitMQ 偏向可靠投遞與複雜路由,Kafka 則適合大規模事件流與長期保留。
掌握以上概念與實作範例,你就能在 FastAPI 專案中輕鬆引入訊息佇列,構建 高可用、可伸縮、易維護 的微服務系統。祝開發順利,打造出強韌的非同步應用! 🚀