本文 AI 產出,尚未審核
FastAPI WebSocket – 連線與斷線處理完整教學
簡介
WebSocket 是建立在 HTTP 之上的全雙工通訊協定,允許 伺服器與客戶端在單一連線上即時雙向傳輸資料。在即時聊天、即時通知、線上協作白板等需要低延遲的應用中,WebSocket 幾乎是唯一的選擇。
FastAPI 內建對 WebSocket 的支援,語法簡潔、與依賴注入、路由系統無縫結合,讓開發者能以最少的程式碼完成即時功能。
然而,WebSocket 並非「只要連上就永遠不會斷」的保證。連線斷線的處理是打造穩定服務的關鍵:
- 客戶端可能因網路不穩、瀏覽器關閉或使用者主動離開而斷線。
- 伺服器端若未正確釋放資源,會導致記憶體泄漏、連線池耗盡。
本篇文章將從概念、實作、常見陷阱到最佳實踐,完整說明在 FastAPI 中如何安全、優雅地管理 WebSocket 連線與斷線。
核心概念
1. WebSocket 的生命週期
| 階段 | 事件 | FastAPI 中的對應處理 |
|---|---|---|
| 建立連線 | 客戶端發送升級請求 (Upgrade) → 伺服器回應 101 Switching Protocols | @app.websocket("/ws") 的 websocket.accept() |
| 資料傳輸 | 雙向發送文字或二進位訊息 | await websocket.receive_text()、await websocket.send_text() |
| 斷線 | 客戶端關閉、網路中斷、伺服器主動關閉 | WebSocketDisconnect 例外、await websocket.close() |
了解這三個階段後,我們才能在 連線建立 時加入驗證、在 資料傳輸 時維持心跳、在 斷線 時釋放資源。
2. 例外 WebSocketDisconnect
FastAPI 會在客戶端斷線時拋出 WebSocketDisconnect,這是捕捉斷線事件的唯一方式。
from fastapi import WebSocket, WebSocketDisconnect
async def websocket_endpoint(ws: WebSocket):
await ws.accept()
try:
while True:
data = await ws.receive_text()
await ws.send_text(f"Echo: {data}")
except WebSocketDisconnect:
# 這裡會在斷線時被觸發
print("客戶端已斷線")
3. 心跳(Ping/Pong)機制
若長時間沒有訊息傳遞,某些防火牆或代理會自動關閉閒置連線。
FastAPI 內建 websocket.send_ping() / websocket.receive_pong(),或自行實作 定時發送訊息,保持連線活躍。
4. 連線管理(Connection Manager)
在多人聊天室、多人協作等情境下,我們常需要 追蹤所有活躍的 WebSocket,並能向特定或全部客戶端廣播訊息。
以下提供一個 連線管理器 範例,示範如何加入、移除、廣播。
程式碼範例
範例 1️⃣ 基本的 Echo Server(含斷線捕捉)
# file: main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws/echo")
async def websocket_echo(ws: WebSocket):
await ws.accept()
try:
while True:
# 接收客戶端文字訊息
data = await ws.receive_text()
# 回傳相同內容
await ws.send_text(f"Echo: {data}")
except WebSocketDisconnect:
# 客戶端斷線時執行的清理工作
print("Echo client disconnected")
說明
await ws.accept()必須在任何接收/發送之前呼叫。WebSocketDisconnect捕捉斷線,可在此處記錄日誌或釋放資源。
範例 2️⃣ 心跳機制(Ping/Pong)
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
PING_INTERVAL = 15 # 秒
@app.websocket("/ws/heartbeat")
async def websocket_heartbeat(ws: WebSocket):
await ws.accept()
async def send_ping():
while True:
await asyncio.sleep(PING_INTERVAL)
try:
await ws.send_ping()
# 若對端未回應 pong,會拋出例外
await ws.receive_pong()
except Exception:
# 斷線或無回應時直接關閉
await ws.close()
break
ping_task = asyncio.create_task(send_ping())
try:
while True:
msg = await ws.receive_text()
await ws.send_text(f"收到: {msg}")
except WebSocketDisconnect:
print("Heartbeat client disconnected")
finally:
ping_task.cancel()
說明
send_ping以asyncio.create_task背景執行,每PING_INTERVAL秒發送一次 ping。- 若
receive_pong超時或失敗,立即關閉連線,避免資源浪費。
範例 3️⃣ 連線管理器(多人聊天室)
# file: manager.py
from typing import List
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.active_connections.append(ws)
def disconnect(self, ws: WebSocket):
self.active_connections.remove(ws)
async def send_personal_message(self, message: str, ws: WebSocket):
await ws.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
# file: main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from manager import ConnectionManager
app = FastAPI()
manager = ConnectionManager()
@app.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket):
await manager.connect(ws)
try:
while True:
data = await ws.receive_text()
# 以 "username: message" 格式傳遞
await manager.broadcast(f"[訊息] {data}")
except WebSocketDisconnect:
manager.disconnect(ws)
await manager.broadcast("一位使用者已離開聊天室")
說明
ConnectionManager把所有活躍的 WebSocket 放入列表,提供 個人訊息 與 廣播 兩種常用功能。- 斷線時必須呼叫
manager.disconnect(ws),否則列表會保留失效的連線,最終導致 記憶體泄漏。
範例 4️⃣ 認證與斷線清理(使用 JWT)
import jwt
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
app = FastAPI()
SECRET_KEY = "your-secret-key"
def get_current_user(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload["sub"]
except jwt.PyJWTError:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid authentication credentials")
@app.websocket("/ws/auth")
async def websocket_auth(ws: WebSocket, token: str = None):
# 1️⃣ 先驗證 token
if token is None:
await ws.close(code=1008) # Policy Violation
return
try:
user = get_current_user(token)
except HTTPException:
await ws.close(code=1008)
return
await ws.accept()
try:
while True:
data = await ws.receive_text()
await ws.send_text(f"{user} 說: {data}")
except WebSocketDisconnect:
print(f"使用者 {user} 已斷線")
說明
- 在 WebSocket 握手階段無法直接使用
Depends,必須手動驗證 token(可從 query string、header 或 cookie 取得)。 - 若驗證失敗,用
await ws.close(code=1008)立即關閉連線,避免未授權的客戶端佔用資源。
範例 5️⃣ 使用 async_generator 自動管理斷線(FastAPI 0.78+)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from contextlib import asynccontextmanager
app = FastAPI()
@asynccontextmanager
async def websocket_lifecycle(ws: WebSocket):
await ws.accept()
try:
yield ws
finally:
# 這裡一定會在斷線或錯誤時執行
await ws.close()
print("WebSocket 已關閉")
@app.websocket("/ws/context")
async def websocket_context(ws: WebSocket):
async with websocket_lifecycle(ws) as client:
while True:
try:
data = await client.receive_text()
await client.send_text(f"回覆: {data}")
except WebSocketDisconnect:
break
說明
asynccontextmanager讓 資源清理 集中在finally區塊,減少每個 endpoint 重複寫except/finally的樣板程式碼。
常見陷阱與最佳實踐
| 陷阱 | 為什麼會發生 | 解決方案 |
|---|---|---|
忘記呼叫 await websocket.accept() |
直接進入 receive_* 會拋 RuntimeError |
必須在任何 I/O 前先 accept() |
| 斷線後仍持續發送 | 背景任務未取消或 broadcast 包含已斷線的連線 |
在 WebSocketDisconnect 中 移除 連線,並在廣播前檢查 connection.client_state |
| 心跳間隔過長 | 防火牆或雲端負載平衡器在 30 秒內關閉閒置連線 | 建議 10~15 秒 發一次 ping,或根據部署環境調整 |
在 except 區塊裡直接拋例外 |
會導致 FastAPI 把錯誤傳回客戶端,產生不必要的 500 錯誤 | 只捕捉 WebSocketDisconnect,其他錯誤可記錄後自行關閉 |
在 broadcast 時使用同步迴圈 |
大量連線時會阻塞事件迴圈,導致所有客戶端卡住 | 使用 asyncio.gather(*tasks, return_exceptions=True) 或 分批發送 |
| 認證資訊放在 query string | 會被瀏覽器快取、日誌紀錄,安全性較低 | 優先使用 Header 或 Cookie,或在 TLS 下使用 Sec-WebSocket-Protocol 進行協商 |
最佳實踐清單
- 統一管理連線:建立
ConnectionManager或類似抽象層,所有 endpoint 只透過它操作連線。 - 使用
asynccontextmanager:確保await ws.close()必定在斷線或異常時執行。 - 心跳 + 超時:
ping_interval+receive_pong+asyncio.wait_for,在超時後主動關閉。 - 例外分層:只捕捉
WebSocketDisconnect,其他例外交給全域例外處理器或記錄後關閉。 - 資源釋放:斷線時同時清除與之關聯的資料(例如使用者所在的聊天室、訂閱的訊息主題)。
- 測試斷線情境:利用
websocat、wscat或自訂腳本模擬突斷、網路卡頓,驗證伺服器不會泄漏。
實際應用場景
| 場景 | 為什麼需要斷線處理 | 典型實作方式 |
|---|---|---|
| 即時聊天系統 | 使用者可能隨時關閉分頁或切換網路,必須即時移除其在聊天室的成員列表 | ConnectionManager + WebSocketDisconnect 觸發「使用者離開」廣播 |
| 線上多人協作白板 | 當使用者斷線時,需要撤回其未完成的繪圖指令,避免畫面卡住 | 在 disconnect 時刪除該使用者的臨時圖層或回滾操作 |
| 即時通知 (Push) 系統 | 行動裝置可能因省電模式斷線,後端需重新排程未送達的通知 | 斷線時把未送達訊息寫入 Redis Queue,待客戶端重新連線時補發 |
| 金融交易平台 | 斷線可能導致資金凍結或交易未完成,需要立即回滾或取消掛單 | 斷線時觸發資料庫交易回滾,並向監控系統發送告警 |
| IoT 裝置監控 | 裝置連線不穩,斷線後需自動切換至備援節點 | 使用 ping/pong 檢測,斷線即時切換 MQTT/HTTP 備援通道 |
總結
- WebSocket 斷線處理是服務穩定性的根本,不僅關係到資源釋放,也直接影響使用者體驗。
- FastAPI 提供
WebSocketDisconnect、await ws.close()、asynccontextmanager等工具,讓開發者能以 清晰、可維護 的方式完成斷線偵測與清理。 - 心跳機制、連線管理器 以及 認證檢查 應該在每個 WebSocket endpoint 中被視為標準配置。
- 透過 最佳實踐清單(統一管理、資源釋放、測試斷線)與 實務案例(聊天、白板、金融等),你可以快速把理論落實到專案中,打造出 高可用、低延遲 的即時應用。
祝你在 FastAPI 的 WebSocket 世界裡玩得開心,寫出既 安全 又 好用 的即時服務! 🚀