本文 AI 產出,尚未審核

FastAPI WebSocket – 連線與斷線處理完整教學


簡介

WebSocket 是建立在 HTTP 之上的全雙工通訊協定,允許 伺服器與客戶端在單一連線上即時雙向傳輸資料。在即時聊天、即時通知、線上協作白板等需要低延遲的應用中,WebSocket 幾乎是唯一的選擇。

FastAPI 內建對 WebSocket 的支援,語法簡潔、與依賴注入、路由系統無縫結合,讓開發者能以最少的程式碼完成即時功能。

然而,WebSocket 並非「只要連上就永遠不會斷」的保證。連線斷線的處理是打造穩定服務的關鍵:

  • 客戶端可能因網路不穩、瀏覽器關閉或使用者主動離開而斷線。
  • 伺服器端若未正確釋放資源,會導致記憶體泄漏、連線池耗盡。

本篇文章將從概念、實作、常見陷阱到最佳實踐,完整說明在 FastAPI 中如何安全、優雅地管理 WebSocket 連線與斷線。


核心概念

1. WebSocket 的生命週期

階段 事件 FastAPI 中的對應處理
建立連線 客戶端發送升級請求 (Upgrade) → 伺服器回應 101 Switching Protocols @app.websocket("/ws")websocket.accept()
資料傳輸 雙向發送文字或二進位訊息 await websocket.receive_text()await websocket.send_text()
斷線 客戶端關閉、網路中斷、伺服器主動關閉 WebSocketDisconnect 例外、await websocket.close()

了解這三個階段後,我們才能在 連線建立 時加入驗證、在 資料傳輸 時維持心跳、在 斷線 時釋放資源。


2. 例外 WebSocketDisconnect

FastAPI 會在客戶端斷線時拋出 WebSocketDisconnect,這是捕捉斷線事件的唯一方式。

from fastapi import WebSocket, WebSocketDisconnect

async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # 這裡會在斷線時被觸發
        print("客戶端已斷線")

3. 心跳(Ping/Pong)機制

若長時間沒有訊息傳遞,某些防火牆或代理會自動關閉閒置連線。
FastAPI 內建 websocket.send_ping() / websocket.receive_pong(),或自行實作 定時發送訊息,保持連線活躍。


4. 連線管理(Connection Manager)

在多人聊天室、多人協作等情境下,我們常需要 追蹤所有活躍的 WebSocket,並能向特定或全部客戶端廣播訊息。
以下提供一個 連線管理器 範例,示範如何加入、移除、廣播。


程式碼範例

範例 1️⃣ 基本的 Echo Server(含斷線捕捉)

# file: main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/echo")
async def websocket_echo(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            # 接收客戶端文字訊息
            data = await ws.receive_text()
            # 回傳相同內容
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # 客戶端斷線時執行的清理工作
        print("Echo client disconnected")

說明

  • await ws.accept() 必須在任何接收/發送之前呼叫。
  • WebSocketDisconnect 捕捉斷線,可在此處記錄日誌或釋放資源。

範例 2️⃣ 心跳機制(Ping/Pong)

import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
PING_INTERVAL = 15  # 秒

@app.websocket("/ws/heartbeat")
async def websocket_heartbeat(ws: WebSocket):
    await ws.accept()
    async def send_ping():
        while True:
            await asyncio.sleep(PING_INTERVAL)
            try:
                await ws.send_ping()
                # 若對端未回應 pong,會拋出例外
                await ws.receive_pong()
            except Exception:
                # 斷線或無回應時直接關閉
                await ws.close()
                break

    ping_task = asyncio.create_task(send_ping())
    try:
        while True:
            msg = await ws.receive_text()
            await ws.send_text(f"收到: {msg}")
    except WebSocketDisconnect:
        print("Heartbeat client disconnected")
    finally:
        ping_task.cancel()

說明

  • send_pingasyncio.create_task 背景執行,每 PING_INTERVAL 秒發送一次 ping。
  • receive_pong 超時或失敗,立即關閉連線,避免資源浪費。

範例 3️⃣ 連線管理器(多人聊天室)

# file: manager.py
from typing import List
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active_connections.append(ws)

    def disconnect(self, ws: WebSocket):
        self.active_connections.remove(ws)

    async def send_personal_message(self, message: str, ws: WebSocket):
        await ws.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
# file: main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from manager import ConnectionManager

app = FastAPI()
manager = ConnectionManager()

@app.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            # 以 "username: message" 格式傳遞
            await manager.broadcast(f"[訊息] {data}")
    except WebSocketDisconnect:
        manager.disconnect(ws)
        await manager.broadcast("一位使用者已離開聊天室")

說明

  • ConnectionManager 把所有活躍的 WebSocket 放入列表,提供 個人訊息廣播 兩種常用功能。
  • 斷線時必須呼叫 manager.disconnect(ws),否則列表會保留失效的連線,最終導致 記憶體泄漏

範例 4️⃣ 認證與斷線清理(使用 JWT)

import jwt
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status

app = FastAPI()
SECRET_KEY = "your-secret-key"

def get_current_user(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload["sub"]
    except jwt.PyJWTError:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
                            detail="Invalid authentication credentials")

@app.websocket("/ws/auth")
async def websocket_auth(ws: WebSocket, token: str = None):
    # 1️⃣ 先驗證 token
    if token is None:
        await ws.close(code=1008)  # Policy Violation
        return
    try:
        user = get_current_user(token)
    except HTTPException:
        await ws.close(code=1008)
        return

    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"{user} 說: {data}")
    except WebSocketDisconnect:
        print(f"使用者 {user} 已斷線")

說明

  • 在 WebSocket 握手階段無法直接使用 Depends,必須手動驗證 token(可從 query string、header 或 cookie 取得)。
  • 若驗證失敗,用 await ws.close(code=1008) 立即關閉連線,避免未授權的客戶端佔用資源。

範例 5️⃣ 使用 async_generator 自動管理斷線(FastAPI 0.78+)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from contextlib import asynccontextmanager

app = FastAPI()

@asynccontextmanager
async def websocket_lifecycle(ws: WebSocket):
    await ws.accept()
    try:
        yield ws
    finally:
        # 這裡一定會在斷線或錯誤時執行
        await ws.close()
        print("WebSocket 已關閉")

@app.websocket("/ws/context")
async def websocket_context(ws: WebSocket):
    async with websocket_lifecycle(ws) as client:
        while True:
            try:
                data = await client.receive_text()
                await client.send_text(f"回覆: {data}")
            except WebSocketDisconnect:
                break

說明

  • asynccontextmanager資源清理 集中在 finally 區塊,減少每個 endpoint 重複寫 except/finally 的樣板程式碼。

常見陷阱與最佳實踐

陷阱 為什麼會發生 解決方案
忘記呼叫 await websocket.accept() 直接進入 receive_* 會拋 RuntimeError 必須在任何 I/O 前先 accept()
斷線後仍持續發送 背景任務未取消或 broadcast 包含已斷線的連線 WebSocketDisconnect移除 連線,並在廣播前檢查 connection.client_state
心跳間隔過長 防火牆或雲端負載平衡器在 30 秒內關閉閒置連線 建議 10~15 秒 發一次 ping,或根據部署環境調整
except 區塊裡直接拋例外 會導致 FastAPI 把錯誤傳回客戶端,產生不必要的 500 錯誤 只捕捉 WebSocketDisconnect,其他錯誤可記錄後自行關閉
broadcast 時使用同步迴圈 大量連線時會阻塞事件迴圈,導致所有客戶端卡住 使用 asyncio.gather(*tasks, return_exceptions=True)分批發送
認證資訊放在 query string 會被瀏覽器快取、日誌紀錄,安全性較低 優先使用 HeaderCookie,或在 TLS 下使用 Sec-WebSocket-Protocol 進行協商

最佳實踐清單

  1. 統一管理連線:建立 ConnectionManager 或類似抽象層,所有 endpoint 只透過它操作連線。
  2. 使用 asynccontextmanager:確保 await ws.close() 必定在斷線或異常時執行。
  3. 心跳 + 超時ping_interval + receive_pong + asyncio.wait_for,在超時後主動關閉。
  4. 例外分層:只捕捉 WebSocketDisconnect,其他例外交給全域例外處理器或記錄後關閉。
  5. 資源釋放:斷線時同時清除與之關聯的資料(例如使用者所在的聊天室、訂閱的訊息主題)。
  6. 測試斷線情境:利用 websocatwscat 或自訂腳本模擬突斷、網路卡頓,驗證伺服器不會泄漏。

實際應用場景

場景 為什麼需要斷線處理 典型實作方式
即時聊天系統 使用者可能隨時關閉分頁或切換網路,必須即時移除其在聊天室的成員列表 ConnectionManager + WebSocketDisconnect 觸發「使用者離開」廣播
線上多人協作白板 當使用者斷線時,需要撤回其未完成的繪圖指令,避免畫面卡住 disconnect 時刪除該使用者的臨時圖層或回滾操作
即時通知 (Push) 系統 行動裝置可能因省電模式斷線,後端需重新排程未送達的通知 斷線時把未送達訊息寫入 Redis Queue,待客戶端重新連線時補發
金融交易平台 斷線可能導致資金凍結或交易未完成,需要立即回滾或取消掛單 斷線時觸發資料庫交易回滾,並向監控系統發送告警
IoT 裝置監控 裝置連線不穩,斷線後需自動切換至備援節點 使用 ping/pong 檢測,斷線即時切換 MQTT/HTTP 備援通道

總結

  • WebSocket 斷線處理是服務穩定性的根本,不僅關係到資源釋放,也直接影響使用者體驗。
  • FastAPI 提供 WebSocketDisconnectawait ws.close()asynccontextmanager 等工具,讓開發者能以 清晰、可維護 的方式完成斷線偵測與清理。
  • 心跳機制連線管理器 以及 認證檢查 應該在每個 WebSocket endpoint 中被視為標準配置。
  • 透過 最佳實踐清單(統一管理、資源釋放、測試斷線)與 實務案例(聊天、白板、金融等),你可以快速把理論落實到專案中,打造出 高可用、低延遲 的即時應用。

祝你在 FastAPI 的 WebSocket 世界裡玩得開心,寫出既 安全好用 的即時服務! 🚀