FastAPI WebSocket:Broadcast 與協同應用
簡介
在即時互動的網路應用中,WebSocket 提供了 雙向、低延遲 的通訊管道,讓伺服器能即時推送訊息給多個客戶端。單純的點對點連線已能滿足聊天或即時通知的需求,但在 多人協同(如多人編輯、即時看板、遊戲大廳)情境下,我們常需要把同一筆訊息 廣播 給所有已連線的使用者,或是根據房間、主題分類推送。
FastAPI 內建對 WebSocket 的支援,加上 starlette.websockets 的 WebSocket 物件與 Broadcast 套件,我們可以輕鬆實作 高效能、可擴充 的協同系統。本文將從核心概念出發,示範多個實用範例,並說明常見陷阱與最佳實踐,讓你快速上手並在專案中落地。
核心概念
1. WebSocket 連線的生命週期
FastAPI 透過 @app.websocket("/path") 裝飾器定義 WebSocket 端點。連線建立後會得到一個 WebSocket 物件,必須先呼叫 await websocket.accept() 才能開始收發訊息。
@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
await websocket.accept() # 接受連線
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"回音: {data}")
except WebSocketDisconnect:
print("用戶斷線")
- 接受 (accept):告訴瀏覽器連線已建立。
- 接收 (receive_text / receive_bytes):從客戶端取得訊息。
- 發送 (send_text / send_bytes):回傳訊息給單一客戶端。
- 斷線 (WebSocketDisconnect):例外處理,用於清理資源或從群組中移除。
2. 為什麼需要 Broadcast
單一 WebSocket 只能跟 一個 客戶端通訊。若想把訊息同步給多個使用者,就必須在伺服器端維護一個 連線集合(list、set、dict),或使用更完整的 Broadcast 系統。
FastAPI 官方建議使用 fastapi_contrib.broadcast(基於 aioredis)或 starlette.endpoints.WebSocketEndpoint 內建的 Broadcast 物件,以支援:
- 跨實例(多容器) 的訊息同步
- 主題/房間 分類(Pub/Sub)
- 持久化(Redis)與 自動重連
3. Broadcast 基本使用
以下示範如何在 FastAPI 中結合 Broadcast(以 aioredis 為後端):
# broadcast.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi_contrib.broadcast import Broadcast
app = FastAPI()
broadcaster = Broadcast("redis://localhost:6379") # 使用 Redis 作為訊息中介
@app.on_event("startup")
async def startup():
await broadcaster.connect() # 啟動連線
@app.on_event("shutdown")
async def shutdown():
await broadcaster.disconnect() # 關閉連線
4. 房間(Topic)概念
Broadcast 允許 發布(publish) 與 訂閱(subscribe) 任意字串主題。常見做法是以「房間 ID」或「類別」作為主題名稱,客戶端只會收到自己訂閱的訊息。
@app.websocket("/ws/{room_id}")
async def websocket_room(websocket: WebSocket, room_id: str):
await websocket.accept()
# 讓此連線訂閱指定房間的訊息
async with broadcaster.subscribe(room_id) as subscriber:
# 同時監聽客戶端訊息與廣播訊息
async def receive_from_client():
while True:
data = await websocket.receive_text()
# 把收到的訊息廣播給同房間的所有人
await broadcaster.publish(room_id, data)
async def forward_broadcast():
async for message in subscriber:
await websocket.send_text(message)
# 同時執行兩個協程
await asyncio.gather(receive_from_client(), forward_broadcast())
程式碼範例
以下提供 五 個實作範例,涵蓋從最簡單的廣播到進階的協同編輯機制。每段程式碼均附有說明註解,方便直接貼到專案中測試。
範例 1:單純的全域廣播(所有連線都收到)
# main.py
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi_contrib.broadcast import Broadcast
app = FastAPI()
broadcaster = Broadcast("redis://localhost:6379")
@app.on_event("startup")
async def startup():
await broadcaster.connect()
@app.on_event("shutdown")
async def shutdown():
await broadcaster.disconnect()
# 用一個全域集合保存所有 WebSocket 連線(備用方案)
connections: set[WebSocket] = set()
@app.websocket("/ws/broadcast")
async def ws_broadcast(ws: WebSocket):
await ws.accept()
connections.add(ws)
try:
while True:
data = await ws.receive_text()
# 收到訊息後,同步推送給 **所有** 連線
await broadcaster.publish("global", data)
except WebSocketDisconnect:
connections.remove(ws)
# 另一個協程負責把全域訊息轉送給每個已連線的客戶端
@app.on_event("startup")
async def start_global_forwarder():
async def forward():
async with broadcaster.subscribe("global") as sub:
async for msg in sub:
# 同步發送給每個客戶端
await asyncio.gather(*[c.send_text(msg) for c in connections])
asyncio.create_task(forward())
重點:使用
Broadcast的publish/subscribe,即使有多個 FastAPI 實例(例如在 Kubernetes 中水平擴展),訊息仍會跨容器同步。
範例 2:房間式聊天(依房間分發)
@app.websocket("/ws/chat/{room}")
async def ws_chat(ws: WebSocket, room: str):
await ws.accept()
async with broadcaster.subscribe(room) as sub:
async def recv():
while True:
txt = await ws.receive_text()
await broadcaster.publish(room, txt) # 發布到同房間
async def send():
async for broadcast_msg in sub:
await ws.send_text(broadcast_msg) # 轉發給此連線
await asyncio.gather(recv(), send())
技巧:
room直接作為 Redis 的 channel 名稱,避免自行管理字典映射,簡化程式碼。
範例 3:多人協同白板(座標同步)
# 前端會送出 JSON:{"x":120,"y":45,"color":"#ff0000"}
import json
@app.websocket("/ws/whiteboard/{room}")
async def ws_whiteboard(ws: WebSocket, room: str):
await ws.accept()
async with broadcaster.subscribe(f"board:{room}") as sub:
async def recv():
while True:
payload = await ws.receive_text()
# 直接將 JSON 轉發,不做額外處理
await broadcaster.publish(f"board:{room}", payload)
async def send():
async for msg in sub:
await ws.send_text(msg) # 前端自行解析 JSON
await asyncio.gather(recv(), send())
實務:白板資料量不大,直接以文字(JSON)傳遞即可;若需要大量筆畫,可考慮 二進位 (
receive_bytes)。
範例 4:即時多人文字編輯(OT / CRDT 基礎)
此範例僅示範 訊息分發,實際的衝突解決(OT/CRDT)需在前端或額外的服務中完成。
@app.websocket("/ws/editor/{doc_id}")
async def ws_editor(ws: WebSocket, doc_id: str):
await ws.accept()
topic = f"doc:{doc_id}"
async with broadcaster.subscribe(topic) as sub:
async def recv():
while True:
op = await ws.receive_text() # 例如: {"type":"ins","pos":5,"char":"a"}
await broadcaster.publish(topic, op) # 廣播操作
async def send():
async for op in sub:
await ws.send_text(op) # 前端套用操作
await asyncio.gather(recv(), send())
說明:每一次編輯動作都被視為「操作 (operation)」發布,所有使用者都會收到並套用,確保文件內容保持同步。
範例 5:遊戲大廳與即時匹配
# 玩家連入後會自動加入 "lobby" 主題,伺服器根據人數自動建立房間
@app.websocket("/ws/lobby")
async def ws_lobby(ws: WebSocket):
await ws.accept()
async with broadcaster.subscribe("lobby") as sub:
async def recv():
while True:
msg = await ws.receive_text()
# 例如: {"action":"join","player":"Alice"}
await broadcaster.publish("lobby", msg)
async def send():
async for broadcast_msg in sub:
await ws.send_text(broadcast_msg)
await asyncio.gather(recv(), send())
延伸:在
recv中加入匹配演算法,當滿足人數條件時publish("room:123", ...),玩家端會切換到新房間的 WebSocket。
常見陷阱與最佳實踐
| 陷阱 | 可能的症狀 | 解決方式 |
|---|---|---|
| 未正確關閉 Redis 連線 | 應用關閉後仍有背景任務卡住,部署失敗 | 在 shutdown 事件中必須 await broadcaster.disconnect() |
| 大量客戶端同時訂閱同一主題 | Redis 訊息佇列爆炸,延遲升高 | 考慮 分片(把房間數量拆成多個主題),或使用 Redis Cluster |
| WebSocket 例外未捕獲 | 伺服器意外崩潰,其他連線被踢掉 | 使用 try/except WebSocketDisconnect 包住 receive_* 迴圈,並在 finally 中清理資源 |
| 訊息大小過大 | 超過預設的 1 MB 限制,導致 MessageTooLarge 錯誤 |
若確實需要大檔案,改用 二進位 (receive_bytes) 並在前端切片上傳 |
同時使用 await ws.send_text 與 await ws.send_bytes |
產生 RuntimeError: Cannot send data while another send is in progress |
確保一次只執行一個 send,可以使用 asyncio.Lock 包住發送區段 |
最佳實踐
- 使用
asyncio.gather同時處理接收與轉發,避免阻塞。 - 將房間名稱作為 Redis channel,不必自行維護字典,減少記憶體占用。
- 在
startup時一次性連接 Broadcast,避免每次請求都建立連線。 - 加入心跳機制(
await websocket.send_text('ping')),防止 NAT/防火牆斷線。 - 測試水平擴展:在本機跑兩個
uvicorn實例,確保訊息仍能跨實例同步。
實際應用場景
| 場景 | 為何適合使用 Broadcast | 範例實作 |
|---|---|---|
| 即時聊天室 | 多人同時收發訊息、需要房間隔離 | 範例 2 |
| 協同白板 / 圖形編輯 | 大量座標資料即時同步,且不需持久化 | 範例 3 |
| 多人文件編輯 | 操作序列(OT/CRDT)需要即時分發 | 範例 4 |
| 線上遊戲大廳 | 玩家匹配、房間切換、全局公告 | 範例 5 |
| 即時儀表板 / 數據看板 | 後端推送監控數據給前端,所有使用者同時更新 | 可改寫範例 1 的全域廣播為「metrics」主題 |
總結
FastAPI 結合 WebSocket 與 Broadcast(Redis Pub/Sub),能在短時間內打造出 高效、可擴充、易維護 的即時協同系統。核心要點包括:
- 先接受連線 (
await websocket.accept()) 再開始收發。 - 使用 Broadcast 讓訊息跨容器同步,避免自行管理全域連線集合。
- 透過 主題(topic) 把不同房間、文件或遊戲大廳分離,保持訊息的有序與安全。
- 捕獲例外、正確關閉資源,並加入 心跳、鎖 等保護措施,提升穩定性。
- 針對不同應用(聊天、白板、編輯、遊戲)只需改變 topic 名稱 與 訊息格式,即可快速復用相同的廣播架構。
掌握上述概念與範例後,你就能在 FastAPI 中自信地實作各式即時協同功能,為使用者帶來流暢、即時的互動體驗。祝開發順利,玩得開心!