本文 AI 產出,尚未審核

FastAPI – WebSocket:接收與傳送訊息完整教學


簡介

在現代的 Web 應用程式中,即時雙向通訊已成為不可或缺的功能。聊天系統、即時通知、協同編輯、線上遊戲等,都需要伺服器與客戶端能夠隨時互相推送訊息。傳統的 HTTP 請求是一次性、單向的,無法滿足這類需求。
FastAPI 內建對 WebSocket 的支援,讓開發者只需少量程式碼就能建立高效、非阻塞的雙向通道。

本篇文章將從 概念說明實作範例常見陷阱最佳實踐 以及 實務應用 四個面向,完整介紹如何在 FastAPI 中 接收傳送 訊息。文章內容以繁體中文(台灣)撰寫,適合剛接觸 FastAPI 的初學者,也能為已有基礎的開發者提供實務參考。


核心概念

1. WebSocket 基礎

  • 全雙工 (Full‑duplex):連線建立後,伺服器與客戶端可以同時發送與接收資料。
  • 持久連線:不同於每次請求都要重新建立 TCP 連線,WebSocket 在握手完成後會保持開放,降低建立連線的開銷。
  • 協議升級:瀏覽器先以 HTTP 發起 Upgrade: websocket 請求,伺服器回應 101 Switching Protocols 後,通訊即改用 WebSocket 協議。

FastAPI 透過 Starlette(底層 ASGI 框架)提供 WebSocket 類別,讓我們可以直接在路由函式中使用 await websocket.accept()await websocket.receive_text()await websocket.send_text() 等非阻塞 API。

2. ASGI 與非同步程式設計

FastAPI 完全基於 ASGI(Asynchronous Server Gateway Interface),支援非同步函式 (async def)。在 WebSocket 連線中,每一次的 receivesend 都是 協程,必須使用 await,才能讓事件迴圈正確切換任務,避免阻塞其他連線。

小提醒:若在 WebSocket 處理程式內混用同步程式碼(例如直接呼叫阻塞的資料庫操作),會造成整個伺服器效能下降,建議使用支援 async 的套件或將阻塞操作包在 run_in_threadpool 中。

3. 連線管理

在多使用者的即時應用中,我們常需要 追蹤所有連線(例如廣播訊息給全部使用者)。最簡單的做法是將 WebSocket 物件放入一個集合(如 set)中,並在連線關閉時移除。若要支援更複雜的需求(分房間、使用者認證),可以自行實作 ConnectionManager 類別,將連線與使用者資訊映射。


程式碼範例

以下示範 5 個實用範例,從最簡單的回音伺服器到支援多房間的聊天室。每段程式碼都附有說明註解,方便讀者快速理解。

範例 1:最簡單的回音 WebSocket

# main.py
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws/echo")
async def websocket_echo(websocket: WebSocket):
    # 接受連線
    await websocket.accept()
    while True:
        # 接收文字訊息
        data = await websocket.receive_text()
        # 將收到的訊息回傳給客戶端
        await websocket.send_text(f"Echo: {data}")

說明

  • await websocket.accept() 必須在任何收發動作之前呼叫。
  • while True 形成一個永遠循環,直到客戶端斷線或拋出例外。

範例 2:簡易的廣播系統(單一房間)

# broadcast.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        # 同時向所有連線發送訊息
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # 收到訊息後廣播給全部使用者
            await manager.broadcast(f"User says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)

重點

  • 使用 ConnectionManager 把所有連線集中管理,斷線時必須手動移除,否則會出現「已斷線的 socket」導致 await connection.send_text 拋錯。
  • WebSocketDisconnect 是 Starlette 提供的例外,用於捕捉客戶端關閉連線的情況。

範例 3:分房間的聊天室(Room‑based)

# room_chat.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
from typing import List, Dict

app = FastAPI()

class RoomManager:
    def __init__(self):
        # 每個房間對應一個 WebSocket 列表
        self.rooms: Dict[str, List[WebSocket]] = defaultdict(list)

    async def connect(self, websocket: WebSocket, room: str):
        await websocket.accept()
        self.rooms[room].append(websocket)

    def disconnect(self, websocket: WebSocket, room: str):
        self.rooms[room].remove(websocket)
        # 若房間內無人,移除該房間鍵值
        if not self.rooms[room]:
            del self.rooms[room]

    async def send_to_room(self, room: str, message: str):
        for ws in self.rooms.get(room, []):
            await ws.send_text(message)

manager = RoomManager()

@app.websocket("/ws/room/{room_name}")
async def room_endpoint(websocket: WebSocket, room_name: str):
    await manager.connect(websocket, room_name)
    try:
        while True:
            data = await websocket.receive_text()
            # 只向同一房間的使用者廣播
            await manager.send_to_room(room_name, f"[{room_name}] {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket, room_name)

技巧

  • 使用 defaultdict(list) 可以避免手動檢查房間是否已存在。
  • 若需要 身份驗證,可以在 connect 前先讀取 URL query string 或 Header,判斷使用者是否有權限加入該房間。

範例 4:傳送二進位檔案(如圖片、音訊)

# binary_transfer.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/upload")
async def upload_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # 接收二進位資料
            data = await websocket.receive_bytes()
            # 這裡僅示範回傳相同資料,實務上可寫入檔案系統或雲端儲存
            await websocket.send_bytes(data)
    except WebSocketDisconnect:
        pass

注意

  • receive_bytes()send_bytes() 允許傳遞 原始二進位,適合檔案上傳或流式媒體。
  • 若檔案過大,建議在前端切割成多個小塊(chunk)傳送,並在伺服器端組合。

範例 5:結合 JWT 認證的安全 WebSocket

# auth_ws.py
import jwt
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"

def decode_jwt(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid authentication credentials",
        )

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    return decode_jwt(credentials.credentials)

@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket, user=Depends(get_current_user)):
    # 連線成功即代表 token 有效
    await websocket.accept()
    await websocket.send_text(f"Welcome {user['sub']}!")
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo from secure channel: {data}")
    except WebSocketDisconnect:
        pass

重點說明

  • HTTPBearer 會自動從 Authorization Header 抽取 Bearer <token>
  • 使用 Depends 把驗證邏輯注入到 WebSocket 路由,確保只有合法使用者能建立連線。
  • 若驗證失敗,FastAPI 會直接回傳 403 Forbidden,客戶端無法進入 WebSocket 階段。

常見陷阱與最佳實踐

常見問題 可能原因 解決方案
連線斷掉後仍持續發送 未正確捕捉 WebSocketDisconnect,導致仍在 while True 中嘗試 send except WebSocketDisconnect 區塊內 移除連線,並 break 迴圈
訊息順序錯亂 多個協程同時 await manager.broadcast(),沒有排程保證順序 若順序重要,可使用 Queueasyncio.Queue)或 Lock 來序列化發送
程式阻塞 在 WebSocket 處理函式裡直接呼叫同步的資料庫或檔案 I/O 改用支援 async 的套件(如 databasesaioredis),或使用 run_in_threadpool 包裝阻塞函式
記憶體洩漏 ConnectionManager 沒有在斷線時移除 WebSocket,導致集合持續增長 確保 每一次 WebSocketDisconnect 都呼叫 disconnect,或使用 WeakSet 讓 GC 自動回收
跨域 WebSocket 被阻擋 未設定 CORS 或 allowed_origins,瀏覽器安全策略拒絕連線 FastAPI 初始化時加入 CORSMiddleware,或在 WebSocket 路由使用 origin 檢查

推薦的最佳實踐

  1. 使用 ConnectionManager:集中管理所有連線,方便廣播、分房間與斷線清理。
  2. 將業務邏輯抽離:WebSocket 路由僅負責收發,真正的訊息處理(儲存、驗證、推播)應放在服務層或背景任務(asyncio.create_task)中。
  3. 心跳機制:定期發送 ping/pongawait websocket.send_text('ping'))或使用 websocket.receive() 超時檢測,避免長時間閒置的死連線。
  4. 錯誤回傳格式:統一使用 JSON 結構回傳錯誤,前端可直接解析,例如 {"type":"error","code":4001,"msg":"Invalid payload"}
  5. 日誌與監控:記錄每一次連線、斷線與訊息量,配合 Prometheus 或 Grafana 監控 WebSocket 連線數與訊息速率。

實際應用場景

場景 為何使用 WebSocket FastAPI 實作要點
即時聊天室 使用者需即時看到其他人的訊息,且訊息量大 建立 ConnectionManager + 分房間;使用 JWT 驗證確保身分
線上遊戲 (即時位置同步) 每秒鐘多次的座標更新,需要低延遲 使用二進位傳輸 (receive_bytes)、心跳檢測、防止封包堆疊
即時儀表板 後端資料變動時即時推送給前端圖表 透過 broadcast 把更新事件推給所有連線的瀏覽器
文件協同編輯 多人同時編輯同一文件,變更需要即時同步 分房間(每個文件一個房間),使用 json 包裝操作指令
IoT 裝置監控 裝置向伺服器持續回報感測資料,伺服器即時下指令 采用二進位或 JSON,配合 asyncio.Queue 處理高頻訊息

總結

  • WebSocket 為全雙工、持久的即時通訊協議,FastAPI 以非同步、易上手的方式提供完整支援。
  • 透過 ConnectionManager 可以輕鬆管理多條連線、實作廣播或分房間功能;JWTHeader 能為 WebSocket 加上安全層。
  • 開發時要特別留意 斷線清理、非阻塞 I/O、心跳機制,以及 CORS 設定,避免常見的效能與安全問題。
  • 本文的 5 個實作範例涵蓋 回音、廣播、分房間、二進位傳輸、認證,足以作為大多數即時應用的起點。只要再根據實際需求加入資料庫、訊息佇列或分散式 Pub/Sub(如 Redis、Kafka),即可打造 高可用、可擴展 的即時系統。

祝你在 FastAPI 的 WebSocket 世界裡玩得開心,開發出快速、穩定且安全的即時服務! 🎉