FastAPI 教學:WebSocket + Redis Pub/Sub 實作
簡介
在即時互動的 Web 應用中,WebSocket 讓伺服器與用戶端可以保持雙向的持久連線,避免頻繁的 HTTP 請求所帶來的延遲與效能損耗。
然而,單純的 WebSocket 只適合點對點或同一服務實例內的訊息傳遞,當應用需要在多個 FastAPI 工作執行緒、容器,甚至跨機房同步即時資料時,Pub/Sub 機制就顯得必不可少。
Redis 作為輕量級的記憶體資料庫,內建的 publish/subscribe 功能不僅簡潔好用,還支援高併發的訊息分發。結合 FastAPI 的 WebSocket 與 Redis Pub/Sub,我們可以快速打造 即時聊天、即時通知、協同編輯 等多種場景的後端服務。
本篇文章將從概念說明、程式碼範例到最佳實踐,完整示範如何在 FastAPI 中實作 WebSocket + Redis Pub/Sub,讓你能立即在自己的專案中上手。
核心概念
1. WebSocket 與 FastAPI
FastAPI 透過 WebSocket 類別提供原生支援,只需要在路由中加入 websocket 參數,即可建立雙向通道。
關鍵點:
await websocket.accept()– 接受連線。await websocket.send_text()/await websocket.receive_text()– 發送與接收文字訊息。- 連線關閉時會拋出
WebSocketDisconnect例外,可用try/except捕捉。
2. Redis Pub/Sub 基礎
Redis 的 Pub/Sub 采用「頻道」的概念:
- PUBLISH channel message – 發布訊息到指定頻道。
- SUBSCRIBE channel – 訂閱頻道,收到的訊息會以迭代器形式回傳。
使用 aioredis(或 redis.asyncio)可在 asyncio 環境下非阻塞操作。
3. 為何要把兩者結合?
- 水平擴展:多個 FastAPI 實例都可以訂閱同一 Redis 頻道,訊息自動同步。
- 解耦:前端只與 WebSocket 互動,業務邏輯則透過 Redis 進行分發,降低耦合度。
- 可靠性:Redis 本身支援持久化與叢集,讓即時訊息有更好的容錯能力。
程式碼範例
以下範例採用 Python 3.9+、FastAPI 0.110、redis-py 5.x(redis.asyncio)與 uvicorn。
為了說明完整流程,我們分成四個主要檔案:
main.py、redis_client.py、ws_router.py、frontend.html。
1️⃣ redis_client.py – 建立共享的 Redis 連線
# redis_client.py
import os
import redis.asyncio as redis
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
# 單例模式,讓整個應用共享同一個連線池
redis_pool = redis.ConnectionPool.from_url(REDIS_URL)
async def get_redis():
"""取得 Redis 客戶端(使用同一個連線池)"""
return redis.Redis(connection_pool=redis_pool)
說明:
- 使用 ConnectionPool 可避免每次請求都建立新連線,提升效能。
get_redis為非同步函式,方便在 FastAPI 的依賴注入中使用。
2️⃣ ws_router.py – WebSocket 路由與 Redis 訂閱協程
# ws_router.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
from .redis_client import get_redis
router = APIRouter()
# 用來保存所有已連線的 WebSocket,方便廣播
connections: List[WebSocket] = []
async def redis_subscriber(channel: str):
"""背景任務:持續從 Redis 訂閱訊息,並廣播給所有 WebSocket"""
redis = await get_redis()
pubsub = redis.pubsub()
await pubsub.subscribe(channel)
async for message in pubsub.listen():
if message["type"] != "message":
continue
data = message["data"].decode()
# 廣播給所有連線
await broadcast(data)
async def broadcast(message: str):
"""將訊息送給每一個已連線的 client"""
dead_conns = []
for ws in connections:
try:
await ws.send_text(message)
except Exception:
# 可能已斷線,稍後清除
dead_conns.append(ws)
for ws in dead_conns:
connections.remove(ws)
@router.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
await websocket.accept()
connections.append(websocket)
# 若是第一次進入此房間,啟動 Redis 訂閱任務
if len([c for c in connections if c != websocket]) == 0:
asyncio.create_task(redis_subscriber(f"room:{room}"))
try:
while True:
# 接收來自前端的訊息,直接 publish 到 Redis
data = await websocket.receive_text()
redis = await get_redis()
await redis.publish(f"room:{room}", data)
except WebSocketDisconnect:
connections.remove(websocket)
說明:
redis_subscriber以 背景任務(asyncio.create_task)持續監聽 Redis 訊息,收到後呼叫broadcast。broadcast會遍歷connections,將訊息推送給每個 WebSocket,並在例外時清除斷線的連線。- 每個房間使用
room:{room}作為 Redis 頻道名稱,確保訊息不會混雜。
3️⃣ main.py – 組合 FastAPI 應用
# main.py
from fastapi import FastAPI
from .ws_router import router as ws_router
from fastapi.staticfiles import StaticFiles
app = FastAPI(title="FastAPI WebSocket + Redis Pub/Sub Demo")
# 將 WebSocket 路由掛載
app.include_router(ws_router)
# 提供簡易的前端測試頁面
app.mount("/", StaticFiles(directory="static", html=True), name="static")
說明:
StaticFiles用來提供前端測試頁面,讓我們可以直接在瀏覽器驗證功能。
4️⃣ static/frontend.html – 前端 JavaScript 客戶端
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<title>WebSocket + Redis Demo</title>
<style>
body { font-family:Arial, sans-serif; margin:20px; }
#log { height:300px; overflow:auto; border:1px solid #ccc; padding:10px; }
</style>
</head>
<body>
<h2>即時聊天室(Room: <span id="roomName"></span>)</h2>
<div id="log"></div>
<input id="msg" type="text" placeholder="輸入訊息" style="width:80%">
<button id="sendBtn">送出</button>
<script>
const urlParams = new URLSearchParams(location.search);
const room = urlParams.get('room') || 'default';
document.getElementById('roomName').textContent = room;
const ws = new WebSocket(`ws://${location.host}/ws/${room}`);
ws.onmessage = (event) => {
const log = document.getElementById('log');
const p = document.createElement('p');
p.textContent = event.data;
log.appendChild(p);
log.scrollTop = log.scrollHeight;
};
ws.onopen = () => console.log('WebSocket 已連線');
ws.onclose = () => console.log('WebSocket 已關閉');
document.getElementById('sendBtn').onclick = () => {
const input = document.getElementById('msg');
if (input.value) {
ws.send(input.value);
input.value = '';
}
};
</script>
</body>
</html>
說明:
- 透過
new WebSocket(...)直接連到 FastAPI 提供的/ws/{room}。 - 收到訊息時即寫入畫面,送出訊息則透過
ws.send(),後端會自動把它 publish 到 Redis,其他連線再收到。
5️⃣ 執行與測試
# 1. 安裝套件
pip install fastapi uvicorn redis[asyncio]
# 2. 啟動 Redis(Docker 範例)
docker run -d -p 6379:6379 redis:7
# 3. 啟動 FastAPI 伺服器
uvicorn app.main:app --reload
打開兩個瀏覽器分別指向 http://localhost:8000/?room=room1,在任一視窗輸入訊息,另一個視窗會即時顯示,證明 WebSocket + Redis Pub/Sub 已成功協同運作。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 最佳實踐 |
|---|---|---|
| Redis 連線未重用 | 每次 publish/subscriber 都建立新連線,會導致大量 FD 被耗盡。 | 使用 ConnectionPool(如 redis_client.py)讓所有協程共享同一池。 |
| 忘記關閉 Pub/Sub | 背景任務結束時沒有 await pubsub.unsubscribe(),會留下「幽靈」訂閱。 |
在應用關閉或房間無人時,呼叫 pubsub.unsubscribe() 並 await pubsub.close()。 |
| WebSocket 例外未捕捉 | 客戶端斷線時 await ws.send_text() 會拋出例外,若未處理會導致整個協程崩潰。 |
在 broadcast 中捕獲例外,並從 connections 中移除失效連線。 |
| 頻道名稱衝突 | 使用過於簡單的頻道名稱(如 chat)在多個房間間會混雜訊息。 |
結合 命名空間(例如 room:{room_id})或 UUID 產生唯一頻道。 |
| 訊息序列化問題 | 直接傳送 Python 物件會因為 bytes 編碼錯誤而失敗。 |
使用 JSON(json.dumps / json.loads)或 MessagePack 進行序列化。 |
其他最佳實踐
心跳機制(Ping/Pong)
- 設定
websocket.send_json({"type":"ping"}),若長時間未回應則關閉連線,避免「僵屍」客戶端佔用資源。
- 設定
限速與防刷
- 在
websocket_endpoint中加入簡易的 rate‑limit(如每秒最多 5 條訊息),防止單一客戶端濫發訊息造成 Redis 負載。
- 在
使用 Redis Cluster
- 當流量大於單機可承載時,切換至 Redis Cluster,只需要把
REDIS_URL改為叢集入口即可,程式碼不變。
- 當流量大於單機可承載時,切換至 Redis Cluster,只需要把
結構化訊息
- 建議傳遞
{ "user": "...", "msg": "...", "ts": 1721234567 },方便前端顯示時間戳與使用者資訊。
- 建議傳遞
實際應用場景
| 場景 | 為何適合使用 WebSocket + Redis Pub/Sub |
|---|---|
| 即時聊天室 | 多個伺服器實例需要同步訊息,Redis 作為訊息中心,WebSocket 提供低延遲傳輸。 |
| 即時通知系統(例如訂單狀態、系統警報) | 後端服務只需 publish 訊息,所有連線的前端即時收到,無需輪詢。 |
| 協同編輯(文字或圖形) | 每次編輯變更 publish 到頻道,所有編輯者的瀏覽器即時收到更新。 |
| 遊戲即時狀態同步 | 遊戲伺服器透過 Redis 分發玩家位置、動作等,WebSocket 保證即時性。 |
| 股票/加密貨幣即時行情 | 後端行情來源 publish 價格變動,前端使用 WebSocket 即時渲染圖表。 |
總結
- WebSocket 為前端提供持久、雙向的即時通道;
- Redis Pub/Sub 則是跨實例、跨容器的訊息分發中心,兩者結合可打造 高效、可水平擴展 的即時應用。
本文示範了:
- 建立共享的 Redis 連線池。
- 使用
asyncio.create_task在背景持續訂閱 Redis 頻道。 - 把收到的訊息廣播給所有已連線的 WebSocket。
- 前端透過簡潔的 JavaScript 完成即時聊天體驗。
在實務開發中,務必注意 連線管理、例外處理、頻道命名與訊息序列化,並根據需求加入 心跳、限速、叢集 等機制。掌握這些要點後,你就能在 FastAPI 上快速構建各式即時服務,為使用者帶來流暢的互動體驗。祝開發順利!