本文 AI 產出,尚未審核

FastAPI 教學:WebSocket + Redis Pub/Sub 實作


簡介

在即時互動的 Web 應用中,WebSocket 讓伺服器與用戶端可以保持雙向的持久連線,避免頻繁的 HTTP 請求所帶來的延遲與效能損耗。
然而,單純的 WebSocket 只適合點對點或同一服務實例內的訊息傳遞,當應用需要在多個 FastAPI 工作執行緒、容器,甚至跨機房同步即時資料時,Pub/Sub 機制就顯得必不可少。

Redis 作為輕量級的記憶體資料庫,內建的 publish/subscribe 功能不僅簡潔好用,還支援高併發的訊息分發。結合 FastAPI 的 WebSocket 與 Redis Pub/Sub,我們可以快速打造 即時聊天、即時通知、協同編輯 等多種場景的後端服務。

本篇文章將從概念說明、程式碼範例到最佳實踐,完整示範如何在 FastAPI 中實作 WebSocket + Redis Pub/Sub,讓你能立即在自己的專案中上手。


核心概念

1. WebSocket 與 FastAPI

FastAPI 透過 WebSocket 類別提供原生支援,只需要在路由中加入 websocket 參數,即可建立雙向通道。
關鍵點:

  • await websocket.accept() – 接受連線。
  • await websocket.send_text() / await websocket.receive_text() – 發送與接收文字訊息。
  • 連線關閉時會拋出 WebSocketDisconnect 例外,可用 try/except 捕捉。

2. Redis Pub/Sub 基礎

Redis 的 Pub/Sub 采用「頻道」的概念:

  • PUBLISH channel message – 發布訊息到指定頻道。
  • SUBSCRIBE channel – 訂閱頻道,收到的訊息會以迭代器形式回傳。

使用 aioredis(或 redis.asyncio)可在 asyncio 環境下非阻塞操作。

3. 為何要把兩者結合?

  • 水平擴展:多個 FastAPI 實例都可以訂閱同一 Redis 頻道,訊息自動同步。
  • 解耦:前端只與 WebSocket 互動,業務邏輯則透過 Redis 進行分發,降低耦合度。
  • 可靠性:Redis 本身支援持久化與叢集,讓即時訊息有更好的容錯能力。

程式碼範例

以下範例採用 Python 3.9+FastAPI 0.110redis-py 5.xredis.asyncio)與 uvicorn

為了說明完整流程,我們分成四個主要檔案:main.pyredis_client.pyws_router.pyfrontend.html

1️⃣ redis_client.py – 建立共享的 Redis 連線

# redis_client.py
import os
import redis.asyncio as redis

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")

# 單例模式,讓整個應用共享同一個連線池
redis_pool = redis.ConnectionPool.from_url(REDIS_URL)

async def get_redis():
    """取得 Redis 客戶端(使用同一個連線池)"""
    return redis.Redis(connection_pool=redis_pool)

說明

  • 使用 ConnectionPool 可避免每次請求都建立新連線,提升效能。
  • get_redis 為非同步函式,方便在 FastAPI 的依賴注入中使用。

2️⃣ ws_router.py – WebSocket 路由與 Redis 訂閱協程

# ws_router.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
from .redis_client import get_redis

router = APIRouter()
# 用來保存所有已連線的 WebSocket,方便廣播
connections: List[WebSocket] = []


async def redis_subscriber(channel: str):
    """背景任務:持續從 Redis 訂閱訊息,並廣播給所有 WebSocket"""
    redis = await get_redis()
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel)

    async for message in pubsub.listen():
        if message["type"] != "message":
            continue
        data = message["data"].decode()
        # 廣播給所有連線
        await broadcast(data)


async def broadcast(message: str):
    """將訊息送給每一個已連線的 client"""
    dead_conns = []
    for ws in connections:
        try:
            await ws.send_text(message)
        except Exception:
            # 可能已斷線,稍後清除
            dead_conns.append(ws)
    for ws in dead_conns:
        connections.remove(ws)


@router.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    connections.append(websocket)

    # 若是第一次進入此房間,啟動 Redis 訂閱任務
    if len([c for c in connections if c != websocket]) == 0:
        asyncio.create_task(redis_subscriber(f"room:{room}"))

    try:
        while True:
            # 接收來自前端的訊息,直接 publish 到 Redis
            data = await websocket.receive_text()
            redis = await get_redis()
            await redis.publish(f"room:{room}", data)
    except WebSocketDisconnect:
        connections.remove(websocket)

說明

  • redis_subscriber背景任務asyncio.create_task)持續監聽 Redis 訊息,收到後呼叫 broadcast
  • broadcast 會遍歷 connections,將訊息推送給每個 WebSocket,並在例外時清除斷線的連線。
  • 每個房間使用 room:{room} 作為 Redis 頻道名稱,確保訊息不會混雜。

3️⃣ main.py – 組合 FastAPI 應用

# main.py
from fastapi import FastAPI
from .ws_router import router as ws_router
from fastapi.staticfiles import StaticFiles

app = FastAPI(title="FastAPI WebSocket + Redis Pub/Sub Demo")

# 將 WebSocket 路由掛載
app.include_router(ws_router)

# 提供簡易的前端測試頁面
app.mount("/", StaticFiles(directory="static", html=True), name="static")

說明

  • StaticFiles 用來提供前端測試頁面,讓我們可以直接在瀏覽器驗證功能。

4️⃣ static/frontend.html – 前端 JavaScript 客戶端

<!DOCTYPE html>
<html lang="zh-TW">
<head>
    <meta charset="UTF-8">
    <title>WebSocket + Redis Demo</title>
    <style>
        body { font-family:Arial, sans-serif; margin:20px; }
        #log { height:300px; overflow:auto; border:1px solid #ccc; padding:10px; }
    </style>
</head>
<body>
    <h2>即時聊天室(Room: <span id="roomName"></span>)</h2>
    <div id="log"></div>
    <input id="msg" type="text" placeholder="輸入訊息" style="width:80%">
    <button id="sendBtn">送出</button>

    <script>
        const urlParams = new URLSearchParams(location.search);
        const room = urlParams.get('room') || 'default';
        document.getElementById('roomName').textContent = room;

        const ws = new WebSocket(`ws://${location.host}/ws/${room}`);

        ws.onmessage = (event) => {
            const log = document.getElementById('log');
            const p = document.createElement('p');
            p.textContent = event.data;
            log.appendChild(p);
            log.scrollTop = log.scrollHeight;
        };

        ws.onopen = () => console.log('WebSocket 已連線');
        ws.onclose = () => console.log('WebSocket 已關閉');

        document.getElementById('sendBtn').onclick = () => {
            const input = document.getElementById('msg');
            if (input.value) {
                ws.send(input.value);
                input.value = '';
            }
        };
    </script>
</body>
</html>

說明

  • 透過 new WebSocket(...) 直接連到 FastAPI 提供的 /ws/{room}
  • 收到訊息時即寫入畫面,送出訊息則透過 ws.send(),後端會自動把它 publish 到 Redis,其他連線再收到。

5️⃣ 執行與測試

# 1. 安裝套件
pip install fastapi uvicorn redis[asyncio]

# 2. 啟動 Redis(Docker 範例)
docker run -d -p 6379:6379 redis:7

# 3. 啟動 FastAPI 伺服器
uvicorn app.main:app --reload

打開兩個瀏覽器分別指向 http://localhost:8000/?room=room1,在任一視窗輸入訊息,另一個視窗會即時顯示,證明 WebSocket + Redis Pub/Sub 已成功協同運作。


常見陷阱與最佳實踐

陷阱 說明 最佳實踐
Redis 連線未重用 每次 publish/subscriber 都建立新連線,會導致大量 FD 被耗盡。 使用 ConnectionPool(如 redis_client.py)讓所有協程共享同一池。
忘記關閉 Pub/Sub 背景任務結束時沒有 await pubsub.unsubscribe(),會留下「幽靈」訂閱。 在應用關閉或房間無人時,呼叫 pubsub.unsubscribe()await pubsub.close()
WebSocket 例外未捕捉 客戶端斷線時 await ws.send_text() 會拋出例外,若未處理會導致整個協程崩潰。 broadcast 中捕獲例外,並從 connections 中移除失效連線。
頻道名稱衝突 使用過於簡單的頻道名稱(如 chat)在多個房間間會混雜訊息。 結合 命名空間(例如 room:{room_id})或 UUID 產生唯一頻道。
訊息序列化問題 直接傳送 Python 物件會因為 bytes 編碼錯誤而失敗。 使用 JSONjson.dumps / json.loads)或 MessagePack 進行序列化。

其他最佳實踐

  1. 心跳機制(Ping/Pong)

    • 設定 websocket.send_json({"type":"ping"}),若長時間未回應則關閉連線,避免「僵屍」客戶端佔用資源。
  2. 限速與防刷

    • websocket_endpoint 中加入簡易的 rate‑limit(如每秒最多 5 條訊息),防止單一客戶端濫發訊息造成 Redis 負載。
  3. 使用 Redis Cluster

    • 當流量大於單機可承載時,切換至 Redis Cluster,只需要把 REDIS_URL 改為叢集入口即可,程式碼不變。
  4. 結構化訊息

    • 建議傳遞 { "user": "...", "msg": "...", "ts": 1721234567 },方便前端顯示時間戳與使用者資訊。

實際應用場景

場景 為何適合使用 WebSocket + Redis Pub/Sub
即時聊天室 多個伺服器實例需要同步訊息,Redis 作為訊息中心,WebSocket 提供低延遲傳輸。
即時通知系統(例如訂單狀態、系統警報) 後端服務只需 publish 訊息,所有連線的前端即時收到,無需輪詢。
協同編輯(文字或圖形) 每次編輯變更 publish 到頻道,所有編輯者的瀏覽器即時收到更新。
遊戲即時狀態同步 遊戲伺服器透過 Redis 分發玩家位置、動作等,WebSocket 保證即時性。
股票/加密貨幣即時行情 後端行情來源 publish 價格變動,前端使用 WebSocket 即時渲染圖表。

總結

  • WebSocket 為前端提供持久、雙向的即時通道;
  • Redis Pub/Sub 則是跨實例、跨容器的訊息分發中心,兩者結合可打造 高效、可水平擴展 的即時應用。

本文示範了:

  1. 建立共享的 Redis 連線池。
  2. 使用 asyncio.create_task 在背景持續訂閱 Redis 頻道。
  3. 把收到的訊息廣播給所有已連線的 WebSocket。
  4. 前端透過簡潔的 JavaScript 完成即時聊天體驗。

在實務開發中,務必注意 連線管理、例外處理、頻道命名與訊息序列化,並根據需求加入 心跳、限速、叢集 等機制。掌握這些要點後,你就能在 FastAPI 上快速構建各式即時服務,為使用者帶來流暢的互動體驗。祝開發順利!