本文 AI 產出,尚未審核

FastAPI WebSocket:Broadcast 與協同應用

簡介

在即時互動的網路應用中,WebSocket 提供了 雙向、低延遲 的通訊管道,讓伺服器能即時推送訊息給多個客戶端。單純的點對點連線已能滿足聊天或即時通知的需求,但在 多人協同(如多人編輯、即時看板、遊戲大廳)情境下,我們常需要把同一筆訊息 廣播 給所有已連線的使用者,或是根據房間、主題分類推送。
FastAPI 內建對 WebSocket 的支援,加上 starlette.websocketsWebSocket 物件與 Broadcast 套件,我們可以輕鬆實作 高效能、可擴充 的協同系統。本文將從核心概念出發,示範多個實用範例,並說明常見陷阱與最佳實踐,讓你快速上手並在專案中落地。


核心概念

1. WebSocket 連線的生命週期

FastAPI 透過 @app.websocket("/path") 裝飾器定義 WebSocket 端點。連線建立後會得到一個 WebSocket 物件,必須先呼叫 await websocket.accept() 才能開始收發訊息。

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()          # 接受連線
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"回音: {data}")
    except WebSocketDisconnect:
        print("用戶斷線")
  • 接受 (accept):告訴瀏覽器連線已建立。
  • 接收 (receive_text / receive_bytes):從客戶端取得訊息。
  • 發送 (send_text / send_bytes):回傳訊息給單一客戶端。
  • 斷線 (WebSocketDisconnect):例外處理,用於清理資源或從群組中移除。

2. 為什麼需要 Broadcast

單一 WebSocket 只能跟 一個 客戶端通訊。若想把訊息同步給多個使用者,就必須在伺服器端維護一個 連線集合(list、set、dict),或使用更完整的 Broadcast 系統。
FastAPI 官方建議使用 fastapi_contrib.broadcast(基於 aioredis)或 starlette.endpoints.WebSocketEndpoint 內建的 Broadcast 物件,以支援:

  • 跨實例(多容器) 的訊息同步
  • 主題/房間 分類(Pub/Sub)
  • 持久化(Redis)與 自動重連

3. Broadcast 基本使用

以下示範如何在 FastAPI 中結合 Broadcast(以 aioredis 為後端):

# broadcast.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi_contrib.broadcast import Broadcast

app = FastAPI()
broadcaster = Broadcast("redis://localhost:6379")   # 使用 Redis 作為訊息中介

@app.on_event("startup")
async def startup():
    await broadcaster.connect()                     # 啟動連線

@app.on_event("shutdown")
async def shutdown():
    await broadcaster.disconnect()                  # 關閉連線

4. 房間(Topic)概念

Broadcast 允許 發布(publish)訂閱(subscribe) 任意字串主題。常見做法是以「房間 ID」或「類別」作為主題名稱,客戶端只會收到自己訂閱的訊息。

@app.websocket("/ws/{room_id}")
async def websocket_room(websocket: WebSocket, room_id: str):
    await websocket.accept()
    # 讓此連線訂閱指定房間的訊息
    async with broadcaster.subscribe(room_id) as subscriber:
        # 同時監聽客戶端訊息與廣播訊息
        async def receive_from_client():
            while True:
                data = await websocket.receive_text()
                # 把收到的訊息廣播給同房間的所有人
                await broadcaster.publish(room_id, data)

        async def forward_broadcast():
            async for message in subscriber:
                await websocket.send_text(message)

        # 同時執行兩個協程
        await asyncio.gather(receive_from_client(), forward_broadcast())

程式碼範例

以下提供 個實作範例,涵蓋從最簡單的廣播到進階的協同編輯機制。每段程式碼均附有說明註解,方便直接貼到專案中測試。

範例 1:單純的全域廣播(所有連線都收到)

# main.py
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi_contrib.broadcast import Broadcast

app = FastAPI()
broadcaster = Broadcast("redis://localhost:6379")

@app.on_event("startup")
async def startup():
    await broadcaster.connect()

@app.on_event("shutdown")
async def shutdown():
    await broadcaster.disconnect()

# 用一個全域集合保存所有 WebSocket 連線(備用方案)
connections: set[WebSocket] = set()

@app.websocket("/ws/broadcast")
async def ws_broadcast(ws: WebSocket):
    await ws.accept()
    connections.add(ws)
    try:
        while True:
            data = await ws.receive_text()
            # 收到訊息後,同步推送給 **所有** 連線
            await broadcaster.publish("global", data)
    except WebSocketDisconnect:
        connections.remove(ws)

# 另一個協程負責把全域訊息轉送給每個已連線的客戶端
@app.on_event("startup")
async def start_global_forwarder():
    async def forward():
        async with broadcaster.subscribe("global") as sub:
            async for msg in sub:
                # 同步發送給每個客戶端
                await asyncio.gather(*[c.send_text(msg) for c in connections])
    asyncio.create_task(forward())

重點:使用 Broadcastpublish/subscribe,即使有多個 FastAPI 實例(例如在 Kubernetes 中水平擴展),訊息仍會跨容器同步。


範例 2:房間式聊天(依房間分發)

@app.websocket("/ws/chat/{room}")
async def ws_chat(ws: WebSocket, room: str):
    await ws.accept()
    async with broadcaster.subscribe(room) as sub:
        async def recv():
            while True:
                txt = await ws.receive_text()
                await broadcaster.publish(room, txt)   # 發布到同房間

        async def send():
            async for broadcast_msg in sub:
                await ws.send_text(broadcast_msg)      # 轉發給此連線

        await asyncio.gather(recv(), send())

技巧room 直接作為 Redis 的 channel 名稱,避免自行管理字典映射,簡化程式碼。


範例 3:多人協同白板(座標同步)

# 前端會送出 JSON:{"x":120,"y":45,"color":"#ff0000"}
import json

@app.websocket("/ws/whiteboard/{room}")
async def ws_whiteboard(ws: WebSocket, room: str):
    await ws.accept()
    async with broadcaster.subscribe(f"board:{room}") as sub:
        async def recv():
            while True:
                payload = await ws.receive_text()
                # 直接將 JSON 轉發,不做額外處理
                await broadcaster.publish(f"board:{room}", payload)

        async def send():
            async for msg in sub:
                await ws.send_text(msg)   # 前端自行解析 JSON

        await asyncio.gather(recv(), send())

實務:白板資料量不大,直接以文字(JSON)傳遞即可;若需要大量筆畫,可考慮 二進位 (receive_bytes)。


範例 4:即時多人文字編輯(OT / CRDT 基礎)

此範例僅示範 訊息分發,實際的衝突解決(OT/CRDT)需在前端或額外的服務中完成。

@app.websocket("/ws/editor/{doc_id}")
async def ws_editor(ws: WebSocket, doc_id: str):
    await ws.accept()
    topic = f"doc:{doc_id}"
    async with broadcaster.subscribe(topic) as sub:
        async def recv():
            while True:
                op = await ws.receive_text()          # 例如: {"type":"ins","pos":5,"char":"a"}
                await broadcaster.publish(topic, op)   # 廣播操作

        async def send():
            async for op in sub:
                await ws.send_text(op)                # 前端套用操作

        await asyncio.gather(recv(), send())

說明:每一次編輯動作都被視為「操作 (operation)」發布,所有使用者都會收到並套用,確保文件內容保持同步。


範例 5:遊戲大廳與即時匹配

# 玩家連入後會自動加入 "lobby" 主題,伺服器根據人數自動建立房間
@app.websocket("/ws/lobby")
async def ws_lobby(ws: WebSocket):
    await ws.accept()
    async with broadcaster.subscribe("lobby") as sub:
        async def recv():
            while True:
                msg = await ws.receive_text()
                # 例如: {"action":"join","player":"Alice"}
                await broadcaster.publish("lobby", msg)

        async def send():
            async for broadcast_msg in sub:
                await ws.send_text(broadcast_msg)

        await asyncio.gather(recv(), send())

延伸:在 recv 中加入匹配演算法,當滿足人數條件時 publish("room:123", ...),玩家端會切換到新房間的 WebSocket。


常見陷阱與最佳實踐

陷阱 可能的症狀 解決方式
未正確關閉 Redis 連線 應用關閉後仍有背景任務卡住,部署失敗 shutdown 事件中必須 await broadcaster.disconnect()
大量客戶端同時訂閱同一主題 Redis 訊息佇列爆炸,延遲升高 考慮 分片(把房間數量拆成多個主題),或使用 Redis Cluster
WebSocket 例外未捕獲 伺服器意外崩潰,其他連線被踢掉 使用 try/except WebSocketDisconnect 包住 receive_* 迴圈,並在 finally 中清理資源
訊息大小過大 超過預設的 1 MB 限制,導致 MessageTooLarge 錯誤 若確實需要大檔案,改用 二進位 (receive_bytes) 並在前端切片上傳
同時使用 await ws.send_textawait ws.send_bytes 產生 RuntimeError: Cannot send data while another send is in progress 確保一次只執行一個 send,可以使用 asyncio.Lock 包住發送區段

最佳實踐

  1. 使用 asyncio.gather 同時處理接收與轉發,避免阻塞。
  2. 將房間名稱作為 Redis channel,不必自行維護字典,減少記憶體占用。
  3. startup 時一次性連接 Broadcast,避免每次請求都建立連線。
  4. 加入心跳機制await websocket.send_text('ping')),防止 NAT/防火牆斷線。
  5. 測試水平擴展:在本機跑兩個 uvicorn 實例,確保訊息仍能跨實例同步。

實際應用場景

場景 為何適合使用 Broadcast 範例實作
即時聊天室 多人同時收發訊息、需要房間隔離 範例 2
協同白板 / 圖形編輯 大量座標資料即時同步,且不需持久化 範例 3
多人文件編輯 操作序列(OT/CRDT)需要即時分發 範例 4
線上遊戲大廳 玩家匹配、房間切換、全局公告 範例 5
即時儀表板 / 數據看板 後端推送監控數據給前端,所有使用者同時更新 可改寫範例 1 的全域廣播為「metrics」主題

總結

FastAPI 結合 WebSocketBroadcast(Redis Pub/Sub),能在短時間內打造出 高效、可擴充、易維護 的即時協同系統。核心要點包括:

  • 先接受連線 (await websocket.accept()) 再開始收發。
  • 使用 Broadcast 讓訊息跨容器同步,避免自行管理全域連線集合。
  • 透過 主題(topic) 把不同房間、文件或遊戲大廳分離,保持訊息的有序與安全。
  • 捕獲例外、正確關閉資源,並加入 心跳 等保護措施,提升穩定性。
  • 針對不同應用(聊天、白板、編輯、遊戲)只需改變 topic 名稱訊息格式,即可快速復用相同的廣播架構。

掌握上述概念與範例後,你就能在 FastAPI 中自信地實作各式即時協同功能,為使用者帶來流暢、即時的互動體驗。祝開發順利,玩得開心!