FastAPI – WebSocket:接收與傳送訊息完整教學
簡介
在現代的 Web 應用程式中,即時雙向通訊已成為不可或缺的功能。聊天系統、即時通知、協同編輯、線上遊戲等,都需要伺服器與客戶端能夠隨時互相推送訊息。傳統的 HTTP 請求是一次性、單向的,無法滿足這類需求。
FastAPI 內建對 WebSocket 的支援,讓開發者只需少量程式碼就能建立高效、非阻塞的雙向通道。
本篇文章將從 概念說明、實作範例、常見陷阱、最佳實踐 以及 實務應用 四個面向,完整介紹如何在 FastAPI 中 接收 與 傳送 訊息。文章內容以繁體中文(台灣)撰寫,適合剛接觸 FastAPI 的初學者,也能為已有基礎的開發者提供實務參考。
核心概念
1. WebSocket 基礎
- 全雙工 (Full‑duplex):連線建立後,伺服器與客戶端可以同時發送與接收資料。
- 持久連線:不同於每次請求都要重新建立 TCP 連線,WebSocket 在握手完成後會保持開放,降低建立連線的開銷。
- 協議升級:瀏覽器先以 HTTP 發起
Upgrade: websocket請求,伺服器回應101 Switching Protocols後,通訊即改用 WebSocket 協議。
FastAPI 透過 Starlette(底層 ASGI 框架)提供 WebSocket 類別,讓我們可以直接在路由函式中使用 await websocket.accept()、await websocket.receive_text()、await websocket.send_text() 等非阻塞 API。
2. ASGI 與非同步程式設計
FastAPI 完全基於 ASGI(Asynchronous Server Gateway Interface),支援非同步函式 (async def)。在 WebSocket 連線中,每一次的 receive 與 send 都是 協程,必須使用 await,才能讓事件迴圈正確切換任務,避免阻塞其他連線。
小提醒:若在 WebSocket 處理程式內混用同步程式碼(例如直接呼叫阻塞的資料庫操作),會造成整個伺服器效能下降,建議使用支援 async 的套件或將阻塞操作包在
run_in_threadpool中。
3. 連線管理
在多使用者的即時應用中,我們常需要 追蹤所有連線(例如廣播訊息給全部使用者)。最簡單的做法是將 WebSocket 物件放入一個集合(如 set)中,並在連線關閉時移除。若要支援更複雜的需求(分房間、使用者認證),可以自行實作 ConnectionManager 類別,將連線與使用者資訊映射。
程式碼範例
以下示範 5 個實用範例,從最簡單的回音伺服器到支援多房間的聊天室。每段程式碼都附有說明註解,方便讀者快速理解。
範例 1:最簡單的回音 WebSocket
# main.py
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws/echo")
async def websocket_echo(websocket: WebSocket):
# 接受連線
await websocket.accept()
while True:
# 接收文字訊息
data = await websocket.receive_text()
# 將收到的訊息回傳給客戶端
await websocket.send_text(f"Echo: {data}")
說明
await websocket.accept()必須在任何收發動作之前呼叫。while True形成一個永遠循環,直到客戶端斷線或拋出例外。
範例 2:簡易的廣播系統(單一房間)
# broadcast.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
# 同時向所有連線發送訊息
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
# 收到訊息後廣播給全部使用者
await manager.broadcast(f"User says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
重點
- 使用
ConnectionManager把所有連線集中管理,斷線時必須手動移除,否則會出現「已斷線的 socket」導致await connection.send_text拋錯。WebSocketDisconnect是 Starlette 提供的例外,用於捕捉客戶端關閉連線的情況。
範例 3:分房間的聊天室(Room‑based)
# room_chat.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
from typing import List, Dict
app = FastAPI()
class RoomManager:
def __init__(self):
# 每個房間對應一個 WebSocket 列表
self.rooms: Dict[str, List[WebSocket]] = defaultdict(list)
async def connect(self, websocket: WebSocket, room: str):
await websocket.accept()
self.rooms[room].append(websocket)
def disconnect(self, websocket: WebSocket, room: str):
self.rooms[room].remove(websocket)
# 若房間內無人,移除該房間鍵值
if not self.rooms[room]:
del self.rooms[room]
async def send_to_room(self, room: str, message: str):
for ws in self.rooms.get(room, []):
await ws.send_text(message)
manager = RoomManager()
@app.websocket("/ws/room/{room_name}")
async def room_endpoint(websocket: WebSocket, room_name: str):
await manager.connect(websocket, room_name)
try:
while True:
data = await websocket.receive_text()
# 只向同一房間的使用者廣播
await manager.send_to_room(room_name, f"[{room_name}] {data}")
except WebSocketDisconnect:
manager.disconnect(websocket, room_name)
技巧
- 使用
defaultdict(list)可以避免手動檢查房間是否已存在。- 若需要 身份驗證,可以在
connect前先讀取 URL query string 或 Header,判斷使用者是否有權限加入該房間。
範例 4:傳送二進位檔案(如圖片、音訊)
# binary_transfer.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws/upload")
async def upload_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收二進位資料
data = await websocket.receive_bytes()
# 這裡僅示範回傳相同資料,實務上可寫入檔案系統或雲端儲存
await websocket.send_bytes(data)
except WebSocketDisconnect:
pass
注意
receive_bytes()與send_bytes()允許傳遞 原始二進位,適合檔案上傳或流式媒體。- 若檔案過大,建議在前端切割成多個小塊(chunk)傳送,並在伺服器端組合。
範例 5:結合 JWT 認證的安全 WebSocket
# auth_ws.py
import jwt
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
def decode_jwt(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid authentication credentials",
)
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
return decode_jwt(credentials.credentials)
@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket, user=Depends(get_current_user)):
# 連線成功即代表 token 有效
await websocket.accept()
await websocket.send_text(f"Welcome {user['sub']}!")
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo from secure channel: {data}")
except WebSocketDisconnect:
pass
重點說明
HTTPBearer會自動從 Authorization Header 抽取Bearer <token>。- 使用
Depends把驗證邏輯注入到 WebSocket 路由,確保只有合法使用者能建立連線。- 若驗證失敗,FastAPI 會直接回傳
403 Forbidden,客戶端無法進入 WebSocket 階段。
常見陷阱與最佳實踐
| 常見問題 | 可能原因 | 解決方案 |
|---|---|---|
| 連線斷掉後仍持續發送 | 未正確捕捉 WebSocketDisconnect,導致仍在 while True 中嘗試 send |
在 except WebSocketDisconnect 區塊內 移除連線,並 break 迴圈 |
| 訊息順序錯亂 | 多個協程同時 await manager.broadcast(),沒有排程保證順序 |
若順序重要,可使用 Queue(asyncio.Queue)或 Lock 來序列化發送 |
| 程式阻塞 | 在 WebSocket 處理函式裡直接呼叫同步的資料庫或檔案 I/O | 改用支援 async 的套件(如 databases、aioredis),或使用 run_in_threadpool 包裝阻塞函式 |
| 記憶體洩漏 | ConnectionManager 沒有在斷線時移除 WebSocket,導致集合持續增長 |
確保 每一次 WebSocketDisconnect 都呼叫 disconnect,或使用 WeakSet 讓 GC 自動回收 |
| 跨域 WebSocket 被阻擋 | 未設定 CORS 或 allowed_origins,瀏覽器安全策略拒絕連線 |
在 FastAPI 初始化時加入 CORSMiddleware,或在 WebSocket 路由使用 origin 檢查 |
推薦的最佳實踐
- 使用 ConnectionManager:集中管理所有連線,方便廣播、分房間與斷線清理。
- 將業務邏輯抽離:WebSocket 路由僅負責收發,真正的訊息處理(儲存、驗證、推播)應放在服務層或背景任務(
asyncio.create_task)中。 - 心跳機制:定期發送
ping/pong(await websocket.send_text('ping'))或使用websocket.receive()超時檢測,避免長時間閒置的死連線。 - 錯誤回傳格式:統一使用 JSON 結構回傳錯誤,前端可直接解析,例如
{"type":"error","code":4001,"msg":"Invalid payload"}。 - 日誌與監控:記錄每一次連線、斷線與訊息量,配合 Prometheus 或 Grafana 監控 WebSocket 連線數與訊息速率。
實際應用場景
| 場景 | 為何使用 WebSocket | FastAPI 實作要點 |
|---|---|---|
| 即時聊天室 | 使用者需即時看到其他人的訊息,且訊息量大 | 建立 ConnectionManager + 分房間;使用 JWT 驗證確保身分 |
| 線上遊戲 (即時位置同步) | 每秒鐘多次的座標更新,需要低延遲 | 使用二進位傳輸 (receive_bytes)、心跳檢測、防止封包堆疊 |
| 即時儀表板 | 後端資料變動時即時推送給前端圖表 | 透過 broadcast 把更新事件推給所有連線的瀏覽器 |
| 文件協同編輯 | 多人同時編輯同一文件,變更需要即時同步 | 分房間(每個文件一個房間),使用 json 包裝操作指令 |
| IoT 裝置監控 | 裝置向伺服器持續回報感測資料,伺服器即時下指令 | 采用二進位或 JSON,配合 asyncio.Queue 處理高頻訊息 |
總結
- WebSocket 為全雙工、持久的即時通訊協議,FastAPI 以非同步、易上手的方式提供完整支援。
- 透過 ConnectionManager 可以輕鬆管理多條連線、實作廣播或分房間功能;JWT 或 Header 能為 WebSocket 加上安全層。
- 開發時要特別留意 斷線清理、非阻塞 I/O、心跳機制,以及 CORS 設定,避免常見的效能與安全問題。
- 本文的 5 個實作範例涵蓋 回音、廣播、分房間、二進位傳輸、認證,足以作為大多數即時應用的起點。只要再根據實際需求加入資料庫、訊息佇列或分散式 Pub/Sub(如 Redis、Kafka),即可打造 高可用、可擴展 的即時系統。
祝你在 FastAPI 的 WebSocket 世界裡玩得開心,開發出快速、穩定且安全的即時服務! 🎉