FastAPI WebSocket 多連線管理(WebSocketManager)
簡介
WebSocket 為即時雙向通訊提供了低延遲、持久連線的能力,常被用於即時聊天、即時通知、多人協作等場景。當應用需要同時與多個客戶端保持連線時,如何有效管理這些連線、廣播訊息、以及安全地關閉連線 成為關鍵。
本單元將聚焦在 WebSocketManager——一個負責管理多條 WebSocket 連線的工具。透過它,我們可以把分散在不同路由、不同使用者的連線集中在一起,實現簡潔且可維護的即時功能。
在接下來的章節,我們會先說明核心概念與設計原則,接著提供多個實作範例,最後討論常見陷阱、最佳實踐與實務應用情境,幫助你從 入門 走向 可在生產環境使用 的完整解決方案。
核心概念
1. 為什麼需要 WebSocketManager
- 集中管理:不管是哪個路由或是哪個使用者,只要連上 WebSocket,都會被加入同一個管理器。
- 廣播與分組:管理器可以一次向全部客戶端或特定子集合(如房間、頻道)發送訊息。
- 資源回收:當連線斷開或發生例外時,管理器負責將失效的連線自集合中移除,避免記憶體泄漏。
2. 基本資料結構
最常見的實作是使用 set 或 dict 來保存 WebSocket 物件:
active_connections: Set[WebSocket] = set()
# 或者以使用者 ID 為鍵
active_connections: Dict[int, WebSocket] = {}
set的好處是 去重、操作簡潔,適合「全部廣播」的情況。dict則允許 依使用者或房間分組,方便對特定目標發訊。
3. 生命週期管理
WebSocket 的生命週期包含三個階段:
- 建立連線 –
await websocket.accept()後,把websocket加入管理器。 - 訊息收發 –
await websocket.receive_text()/await websocket.send_text()。 - 關閉連線 – 例外或正常斷線時,必須 從管理器移除,並呼叫
await websocket.close()。
4. 非同步安全
FastAPI 基於 Starlette,所有 WebSocket 操作皆為 非同步。若同時有多個協程存取同一個集合,最好使用 asyncio.Lock 來避免競爭條件。
lock = asyncio.Lock()
async with lock:
active_connections.add(websocket)
程式碼範例
以下示範 5 個實用範例,從最簡單的管理器到支援房間、心跳檢測與錯誤處理,全部以 FastAPI + Python 3.11 為基礎。
1️⃣ 基礎 WebSocketManager(全局廣播)
# manager.py
from typing import Set
from fastapi import WebSocket
import asyncio
class WebSocketManager:
def __init__(self):
self.active_connections: Set[WebSocket] = set()
self._lock = asyncio.Lock()
async def connect(self, websocket: WebSocket):
await websocket.accept()
async with self._lock:
self.active_connections.add(websocket)
async def disconnect(self, websocket: WebSocket):
async with self._lock:
self.active_connections.discard(websocket)
async def broadcast(self, message: str):
async with self._lock:
for connection in self.active_connections:
await connection.send_text(message)
# main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from manager import WebSocketManager
app = FastAPI()
manager = WebSocketManager()
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await manager.connect(ws)
try:
while True:
data = await ws.receive_text()
# 直接廣播收到的訊息
await manager.broadcast(f"Client said: {data}")
except WebSocketDisconnect:
await manager.disconnect(ws)
說明:
connect內先accept再加入集合。broadcast會逐一send_text,若有客戶端斷線會在except中被移除。
2️⃣ 依使用者 ID 管理(點對點)
# manager_user.py
from typing import Dict
from fastapi import WebSocket
import asyncio
class UserWebSocketManager:
def __init__(self):
self.connections: Dict[int, WebSocket] = {}
self._lock = asyncio.Lock()
async def connect(self, user_id: int, ws: WebSocket):
await ws.accept()
async with self._lock:
self.connections[user_id] = ws
async def disconnect(self, user_id: int):
async with self._lock:
ws = self.connections.pop(user_id, None)
if ws:
await ws.close()
async def send_personal_message(self, user_id: int, message: str):
async with self._lock:
ws = self.connections.get(user_id)
if ws:
await ws.send_text(message)
# main_user.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from manager_user import UserWebSocketManager
app = FastAPI()
user_manager = UserWebSocketManager()
@app.websocket("/ws/user")
async def ws_user(ws: WebSocket, user_id: int = Query(...)):
await user_manager.connect(user_id, ws)
try:
while True:
data = await ws.receive_text()
# 只回傳給自己
await user_manager.send_personal_message(user_id, f"Echo: {data}")
except WebSocketDisconnect:
await user_manager.disconnect(user_id)
重點:使用
Query取得user_id,在實務上通常會改為 JWT 或 Session 驗證後取得。
3️⃣ 房間(Group)概念的管理
# manager_room.py
from typing import DefaultDict, Set
from fastapi import WebSocket
import asyncio
from collections import defaultdict
class RoomManager:
def __init__(self):
self.rooms: DefaultDict[str, Set[WebSocket]] = defaultdict(set)
self._lock = asyncio.Lock()
async def join(self, room_name: str, ws: WebSocket):
await ws.accept()
async with self._lock:
self.rooms[room_name].add(ws)
async def leave(self, room_name: str, ws: WebSocket):
async with self._lock:
self.rooms[room_name].discard(ws)
if not self.rooms[room_name]:
del self.rooms[room_name]
async def broadcast(self, room_name: str, message: str):
async with self._lock:
connections = self.rooms.get(room_name, set())
for ws in connections:
await ws.send_text(message)
# main_room.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from manager_room import RoomManager
app = FastAPI()
room_manager = RoomManager()
@app.websocket("/ws/room")
async def ws_room(ws: WebSocket, room: str = Query(...)):
await room_manager.join(room, ws)
try:
while True:
data = await ws.receive_text()
# 把訊息廣播給同房間的其他人
await room_manager.broadcast(room, f"[{room}] {data}")
except WebSocketDisconnect:
await room_manager.leave(room, ws)
應用:聊天室、協作白板、即時遊戲等都可以直接使用此模式。
4️⃣ 心跳機制(Ping/Pong)避免斷線
# heartbeat.py
import asyncio
from fastapi import WebSocket
PING_INTERVAL = 15 # 秒
async def keepalive(ws: WebSocket):
"""持續發送 ping,若 client 未回應則關閉連線"""
try:
while True:
await ws.send_json({"type": "ping"})
await asyncio.sleep(PING_INTERVAL)
except Exception:
# 任意例外代表連線已斷開
await ws.close()
在 WebSocketManager 中加入:
async def connect(self, ws: WebSocket):
await ws.accept()
async with self._lock:
self.active_connections.add(ws)
# 啟動背景心跳任務
asyncio.create_task(keepalive(ws))
說明:前端收到
ping後應回傳pong,若超過一定次數未收到回應即可視為斷線,讓服務端主動釋放資源。
5️⃣ 完整範例:結合驗證、分組與錯誤處理
# full_manager.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt, asyncio
from typing import Dict, Set, DefaultDict
from collections import defaultdict
app = FastAPI()
bearer = HTTPBearer()
SECRET = "super-secret-key"
class ChatManager:
def __init__(self):
self.user_ws: Dict[int, WebSocket] = {}
self.rooms: DefaultDict[str, Set[WebSocket]] = defaultdict(set)
self._lock = asyncio.Lock()
async def _get_user(self, token: str) -> int:
try:
payload = jwt.decode(token, SECRET, algorithms=["HS256"])
return int(payload["sub"])
except jwt.PyJWTError:
raise HTTPException(status_code=403, detail="Invalid token")
async def connect(self, token: str, room: str, ws: WebSocket):
user_id = await self._get_user(token)
await ws.accept()
async with self._lock:
self.user_ws[user_id] = ws
self.rooms[room].add(ws)
# 心跳
asyncio.create_task(self._heartbeat(ws))
async def disconnect(self, token: str, room: str, ws: WebSocket):
user_id = await self._get_user(token)
async with self._lock:
self.user_ws.pop(user_id, None)
self.rooms[room].discard(ws)
if not self.rooms[room]:
del self.rooms[room]
async def broadcast(self, room: str, message: str):
async with self._lock:
for ws in self.rooms.get(room, []):
await ws.send_text(message)
async def _heartbeat(self, ws: WebSocket):
try:
while True:
await ws.send_json({"type":"ping"})
await asyncio.sleep(20)
except Exception:
await ws.close()
manager = ChatManager()
@app.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket,
token: str = Depends(bearer),
room: str = "general"):
await manager.connect(token.credentials, room, ws)
try:
while True:
data = await ws.receive_text()
await manager.broadcast(room, f"[{room}] {data}")
except WebSocketDisconnect:
await manager.disconnect(token.credentials, room, ws)
關鍵點:
- 使用 JWT 驗證取得
user_id,保證每條連線都有身份。rooms仍使用defaultdict[set],支援多房間同時存在。- 心跳機制與斷線清理寫在同一個類別,保持程式碼一致性。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解決方式 |
|---|---|---|
未使用 await websocket.accept() |
客戶端永遠收到 101 握手失敗 | 必須在任何訊息收發前先 accept |
直接在迴圈內遍歷 set 且同時刪除 |
RuntimeError: Set changed size during iteration |
先複製 list(self.active_connections) 再逐一發送,或使用 asyncio.Lock 包住整段迭代 |
| 忘記在例外或斷線時移除連線 | 記憶體泄漏、廣播時產生 WebSocketException |
把 disconnect 包在 finally 或 except WebSocketDisconnect 中呼叫 |
| 大量連線時同步寫入 | 競爭條件導致部分連線遺失或重複加入 | 使用 asyncio.Lock 或 asyncio.Queue 做排程 |
| 前端未回傳 pong | 心跳機制失效,伺服器無法偵測斷線 | 前端實作 onmessage 檢測 type: ping,回傳 pong |
| 將大量業務邏輯放在 WebSocket 處理函式 | 影響單一協程的吞吐量,造成訊息延遲 | 把耗時任務交給背景工作者(如 Celery)或使用 asyncio.create_task 非阻塞執行 |
最佳實踐
- 封裝管理器:把所有連線操作集中在一個 class,避免在路由裡散布
add/remove代碼。 - 使用型別提示:
Dict[int, WebSocket]、DefaultDict[str, Set[WebSocket]]能讓 IDE 與靜態分析工具提前捕捉錯誤。 - 心跳 + 超時:建議每 15~30 秒發一次 ping,若超過兩次未收到 pong,就主動關閉。
- 分離驗證與連線:先完成 JWT / OAuth2 驗證,再交給管理器,保持職責單一。
- 測試:利用
httpx.AsyncClient與WebSocketTestClient撰寫單元測試,確保connect、broadcast、disconnect行為正確。
實際應用場景
| 場景 | 為什麼需要多連線管理 | 建議的管理器類型 |
|---|---|---|
| 即時聊天系統 | 使用者會同時加入多個聊天室,訊息必須只送到該聊天室的成員 | RoomManager(分組) |
| 即時通知平台(如金融報價) | 數千甚至數萬客戶端同時接收同樣的資料流 | 基於 set 的全局 WebSocketManager + 心跳 |
| 多人協作白板 | 每個文件(白板)是一個房間,使用者可同時編輯多個文件 | RoomManager + UserWebSocketManager(可同時追蹤個人) |
| 線上遊戲伺服器 | 玩家分在不同房間/隊伍,需要頻繁廣播位置、狀態 | RoomManager + 頻率限制(每秒只廣播一次) |
| IoT 裝置監控 | 每台裝置維持一條長連線,伺服器需要即時推送指令 | UserWebSocketManager(以裝置 ID 為鍵) |
總結
- WebSocketManager 是在 FastAPI 中管理大量 WebSocket 連線的核心工具,能幫助我們實現 全局廣播、點對點訊息、房間分組 等多種即時需求。
- 透過 集合或字典 以及 非同步鎖,我們可以安全且高效地新增、刪除與遍歷連線。
- 心跳機制、驗證 與 錯誤處理 必不可少,缺一不可才能在生產環境中保持穩定。
- 本文提供了 5 個實作範例,從最簡單的全局廣播到結合 JWT 驗證與分組的完整解決方案,讀者可依需求自行取捨或組合。
只要遵循上述 最佳實踐,在 FastAPI 中使用 WebSocketManager 就能快速構建出 高效、可擴展且安全 的即時應用。祝開發順利,期待看到你用這套管理器打造的精彩即時服務!