本文 AI 產出,尚未審核

FastAPI WebSocket 多連線管理(WebSocketManager)

簡介

WebSocket 為即時雙向通訊提供了低延遲、持久連線的能力,常被用於即時聊天、即時通知、多人協作等場景。當應用需要同時與多個客戶端保持連線時,如何有效管理這些連線、廣播訊息、以及安全地關閉連線 成為關鍵。
本單元將聚焦在 WebSocketManager——一個負責管理多條 WebSocket 連線的工具。透過它,我們可以把分散在不同路由、不同使用者的連線集中在一起,實現簡潔且可維護的即時功能。

在接下來的章節,我們會先說明核心概念與設計原則,接著提供多個實作範例,最後討論常見陷阱、最佳實踐與實務應用情境,幫助你從 入門 走向 可在生產環境使用 的完整解決方案。


核心概念

1. 為什麼需要 WebSocketManager

  • 集中管理:不管是哪個路由或是哪個使用者,只要連上 WebSocket,都會被加入同一個管理器。
  • 廣播與分組:管理器可以一次向全部客戶端或特定子集合(如房間、頻道)發送訊息。
  • 資源回收:當連線斷開或發生例外時,管理器負責將失效的連線自集合中移除,避免記憶體泄漏。

2. 基本資料結構

最常見的實作是使用 setdict 來保存 WebSocket 物件:

active_connections: Set[WebSocket] = set()
# 或者以使用者 ID 為鍵
active_connections: Dict[int, WebSocket] = {}
  • set 的好處是 去重、操作簡潔,適合「全部廣播」的情況。
  • dict 則允許 依使用者或房間分組,方便對特定目標發訊。

3. 生命週期管理

WebSocket 的生命週期包含三個階段:

  1. 建立連線await websocket.accept() 後,把 websocket 加入管理器。
  2. 訊息收發await websocket.receive_text() / await websocket.send_text()
  3. 關閉連線 – 例外或正常斷線時,必須 從管理器移除,並呼叫 await websocket.close()

4. 非同步安全

FastAPI 基於 Starlette,所有 WebSocket 操作皆為 非同步。若同時有多個協程存取同一個集合,最好使用 asyncio.Lock 來避免競爭條件。

lock = asyncio.Lock()
async with lock:
    active_connections.add(websocket)

程式碼範例

以下示範 5 個實用範例,從最簡單的管理器到支援房間、心跳檢測與錯誤處理,全部以 FastAPI + Python 3.11 為基礎。

1️⃣ 基礎 WebSocketManager(全局廣播)

# manager.py
from typing import Set
from fastapi import WebSocket
import asyncio

class WebSocketManager:
    def __init__(self):
        self.active_connections: Set[WebSocket] = set()
        self._lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        async with self._lock:
            self.active_connections.add(websocket)

    async def disconnect(self, websocket: WebSocket):
        async with self._lock:
            self.active_connections.discard(websocket)

    async def broadcast(self, message: str):
        async with self._lock:
            for connection in self.active_connections:
                await connection.send_text(message)
# main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from manager import WebSocketManager

app = FastAPI()
manager = WebSocketManager()

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            # 直接廣播收到的訊息
            await manager.broadcast(f"Client said: {data}")
    except WebSocketDisconnect:
        await manager.disconnect(ws)

說明

  • connect 內先 accept 再加入集合。
  • broadcast 會逐一 send_text,若有客戶端斷線會在 except 中被移除。

2️⃣ 依使用者 ID 管理(點對點)

# manager_user.py
from typing import Dict
from fastapi import WebSocket
import asyncio

class UserWebSocketManager:
    def __init__(self):
        self.connections: Dict[int, WebSocket] = {}
        self._lock = asyncio.Lock()

    async def connect(self, user_id: int, ws: WebSocket):
        await ws.accept()
        async with self._lock:
            self.connections[user_id] = ws

    async def disconnect(self, user_id: int):
        async with self._lock:
            ws = self.connections.pop(user_id, None)
            if ws:
                await ws.close()

    async def send_personal_message(self, user_id: int, message: str):
        async with self._lock:
            ws = self.connections.get(user_id)
            if ws:
                await ws.send_text(message)
# main_user.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from manager_user import UserWebSocketManager

app = FastAPI()
user_manager = UserWebSocketManager()

@app.websocket("/ws/user")
async def ws_user(ws: WebSocket, user_id: int = Query(...)):
    await user_manager.connect(user_id, ws)
    try:
        while True:
            data = await ws.receive_text()
            # 只回傳給自己
            await user_manager.send_personal_message(user_id, f"Echo: {data}")
    except WebSocketDisconnect:
        await user_manager.disconnect(user_id)

重點:使用 Query 取得 user_id,在實務上通常會改為 JWTSession 驗證後取得。

3️⃣ 房間(Group)概念的管理

# manager_room.py
from typing import DefaultDict, Set
from fastapi import WebSocket
import asyncio
from collections import defaultdict

class RoomManager:
    def __init__(self):
        self.rooms: DefaultDict[str, Set[WebSocket]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def join(self, room_name: str, ws: WebSocket):
        await ws.accept()
        async with self._lock:
            self.rooms[room_name].add(ws)

    async def leave(self, room_name: str, ws: WebSocket):
        async with self._lock:
            self.rooms[room_name].discard(ws)
            if not self.rooms[room_name]:
                del self.rooms[room_name]

    async def broadcast(self, room_name: str, message: str):
        async with self._lock:
            connections = self.rooms.get(room_name, set())
            for ws in connections:
                await ws.send_text(message)
# main_room.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from manager_room import RoomManager

app = FastAPI()
room_manager = RoomManager()

@app.websocket("/ws/room")
async def ws_room(ws: WebSocket, room: str = Query(...)):
    await room_manager.join(room, ws)
    try:
        while True:
            data = await ws.receive_text()
            # 把訊息廣播給同房間的其他人
            await room_manager.broadcast(room, f"[{room}] {data}")
    except WebSocketDisconnect:
        await room_manager.leave(room, ws)

應用:聊天室、協作白板、即時遊戲等都可以直接使用此模式。

4️⃣ 心跳機制(Ping/Pong)避免斷線

# heartbeat.py
import asyncio
from fastapi import WebSocket

PING_INTERVAL = 15  # 秒

async def keepalive(ws: WebSocket):
    """持續發送 ping,若 client 未回應則關閉連線"""
    try:
        while True:
            await ws.send_json({"type": "ping"})
            await asyncio.sleep(PING_INTERVAL)
    except Exception:
        # 任意例外代表連線已斷開
        await ws.close()

WebSocketManager 中加入:

async def connect(self, ws: WebSocket):
    await ws.accept()
    async with self._lock:
        self.active_connections.add(ws)
    # 啟動背景心跳任務
    asyncio.create_task(keepalive(ws))

說明:前端收到 ping 後應回傳 pong,若超過一定次數未收到回應即可視為斷線,讓服務端主動釋放資源。

5️⃣ 完整範例:結合驗證、分組與錯誤處理

# full_manager.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt, asyncio
from typing import Dict, Set, DefaultDict
from collections import defaultdict

app = FastAPI()
bearer = HTTPBearer()
SECRET = "super-secret-key"

class ChatManager:
    def __init__(self):
        self.user_ws: Dict[int, WebSocket] = {}
        self.rooms: DefaultDict[str, Set[WebSocket]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def _get_user(self, token: str) -> int:
        try:
            payload = jwt.decode(token, SECRET, algorithms=["HS256"])
            return int(payload["sub"])
        except jwt.PyJWTError:
            raise HTTPException(status_code=403, detail="Invalid token")

    async def connect(self, token: str, room: str, ws: WebSocket):
        user_id = await self._get_user(token)
        await ws.accept()
        async with self._lock:
            self.user_ws[user_id] = ws
            self.rooms[room].add(ws)
        # 心跳
        asyncio.create_task(self._heartbeat(ws))

    async def disconnect(self, token: str, room: str, ws: WebSocket):
        user_id = await self._get_user(token)
        async with self._lock:
            self.user_ws.pop(user_id, None)
            self.rooms[room].discard(ws)
            if not self.rooms[room]:
                del self.rooms[room]

    async def broadcast(self, room: str, message: str):
        async with self._lock:
            for ws in self.rooms.get(room, []):
                await ws.send_text(message)

    async def _heartbeat(self, ws: WebSocket):
        try:
            while True:
                await ws.send_json({"type":"ping"})
                await asyncio.sleep(20)
        except Exception:
            await ws.close()

manager = ChatManager()

@app.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket,
                        token: str = Depends(bearer),
                        room: str = "general"):
    await manager.connect(token.credentials, room, ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(room, f"[{room}] {data}")
    except WebSocketDisconnect:
        await manager.disconnect(token.credentials, room, ws)

關鍵點

  • 使用 JWT 驗證取得 user_id,保證每條連線都有身份。
  • rooms 仍使用 defaultdict[set],支援多房間同時存在。
  • 心跳機制與斷線清理寫在同一個類別,保持程式碼一致性。

常見陷阱與最佳實踐

陷阱 可能的後果 解決方式
未使用 await websocket.accept() 客戶端永遠收到 101 握手失敗 必須在任何訊息收發前先 accept
直接在迴圈內遍歷 set 且同時刪除 RuntimeError: Set changed size during iteration 先複製 list(self.active_connections) 再逐一發送,或使用 asyncio.Lock 包住整段迭代
忘記在例外或斷線時移除連線 記憶體泄漏、廣播時產生 WebSocketException disconnect 包在 finallyexcept WebSocketDisconnect 中呼叫
大量連線時同步寫入 競爭條件導致部分連線遺失或重複加入 使用 asyncio.Lockasyncio.Queue 做排程
前端未回傳 pong 心跳機制失效,伺服器無法偵測斷線 前端實作 onmessage 檢測 type: ping,回傳 pong
將大量業務邏輯放在 WebSocket 處理函式 影響單一協程的吞吐量,造成訊息延遲 把耗時任務交給背景工作者(如 Celery)或使用 asyncio.create_task 非阻塞執行

最佳實踐

  1. 封裝管理器:把所有連線操作集中在一個 class,避免在路由裡散布 add/remove 代碼。
  2. 使用型別提示Dict[int, WebSocket]DefaultDict[str, Set[WebSocket]] 能讓 IDE 與靜態分析工具提前捕捉錯誤。
  3. 心跳 + 超時:建議每 15~30 秒發一次 ping,若超過兩次未收到 pong,就主動關閉。
  4. 分離驗證與連線:先完成 JWT / OAuth2 驗證,再交給管理器,保持職責單一。
  5. 測試:利用 httpx.AsyncClientWebSocketTestClient 撰寫單元測試,確保 connectbroadcastdisconnect 行為正確。

實際應用場景

場景 為什麼需要多連線管理 建議的管理器類型
即時聊天系統 使用者會同時加入多個聊天室,訊息必須只送到該聊天室的成員 RoomManager(分組)
即時通知平台(如金融報價) 數千甚至數萬客戶端同時接收同樣的資料流 基於 set 的全局 WebSocketManager + 心跳
多人協作白板 每個文件(白板)是一個房間,使用者可同時編輯多個文件 RoomManager + UserWebSocketManager(可同時追蹤個人)
線上遊戲伺服器 玩家分在不同房間/隊伍,需要頻繁廣播位置、狀態 RoomManager + 頻率限制(每秒只廣播一次)
IoT 裝置監控 每台裝置維持一條長連線,伺服器需要即時推送指令 UserWebSocketManager(以裝置 ID 為鍵)

總結

  • WebSocketManager 是在 FastAPI 中管理大量 WebSocket 連線的核心工具,能幫助我們實現 全局廣播、點對點訊息、房間分組 等多種即時需求。
  • 透過 集合或字典 以及 非同步鎖,我們可以安全且高效地新增、刪除與遍歷連線。
  • 心跳機制驗證錯誤處理 必不可少,缺一不可才能在生產環境中保持穩定。
  • 本文提供了 5 個實作範例,從最簡單的全局廣播到結合 JWT 驗證與分組的完整解決方案,讀者可依需求自行取捨或組合。

只要遵循上述 最佳實踐,在 FastAPI 中使用 WebSocketManager 就能快速構建出 高效、可擴展且安全 的即時應用。祝開發順利,期待看到你用這套管理器打造的精彩即時服務!