本文 AI 產出,尚未審核

FastAPI WebSocket 路由定義(@app.websocket

簡介

在現代 Web 應用程式中,即時雙向通訊已成為不可或缺的需求。聊天系統、即時通知、協同編輯、線上遊戲等,都需要伺服器與客戶端在同一條連線上即時交換資料。FastAPI 內建對 WebSocket 的支援,讓開發者能以簡潔的裝飾器 @app.websocket 直接定義 WebSocket 路由,無需額外的框架或繁雜的設定。

本文將從 概念到實作,一步步說明如何在 FastAPI 中使用 @app.websocket,並提供多個實用範例、常見陷阱與最佳實踐,幫助初學者快速上手,同時給予中級開發者在實務專案中可直接套用的技巧。


核心概念

1. WebSocket 基礎

WebSocket 是一種基於 TCP 的協議,允許 單一持久連線 在客戶端與伺服器之間雙向傳輸訊息。與傳統的 HTTP 請求-回應模式不同,WebSocket 只在建立連線時進行一次握手,之後的資料交換不再受限於請求/回應的週期。

2. FastAPI 中的 @app.websocket 裝飾器

在 FastAPI 中,@app.websocket(path)@app.get@app.post 類似,只是它會將對應的路徑註冊為 WebSocket 端點。被裝飾的函式必須接受一個 WebSocket 物件,透過該物件可以執行:

  • await websocket.accept():接受連線(必須先呼叫)。
  • await websocket.receive_text() / await websocket.receive_bytes():接收客戶端訊息。
  • await websocket.send_text(message) / await websocket.send_bytes(data):向客戶端傳送訊息。
  • await websocket.close():關閉連線。

注意:若未先呼叫 accept(),任何 send_*receive_* 操作都會拋出例外。

3. 路由參數與型別驗證

@app.websocket 同樣支援路由參數與 FastAPI 的依賴注入。例如:

@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(websocket: WebSocket, room_id: int):
    ...

room_id 會自動被轉換為 int,若轉換失敗則會返回 404。


程式碼範例

範例 1️⃣ 基本回聲(Echo)伺服器

最簡單的 WebSocket 範例,只要把收到的文字訊息原封不動回傳給客戶端。

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws/echo")
async def websocket_echo(websocket: WebSocket):
    await websocket.accept()                     # 接受連線
    try:
        while True:
            data = await websocket.receive_text()   # 接收文字訊息
            await websocket.send_text(f"Echo: {data}")  # 回傳
    except Exception as e:                         # 客戶端斷線或其他錯誤
        await websocket.close()

說明

  • while True 形成永續的訊息循環,直到客戶端斷線。
  • 使用 try/except 捕捉例外,確保在斷線時正確關閉連線。

範例 2️⃣ 文字與二進位混合傳輸

有時需要同時處理文字與檔案(例如圖片、音訊),可以分別使用 receive_textreceive_bytes

@app.websocket("/ws/mixed")
async def mixed_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        msg_type = await websocket.receive_json()   # 先接收一個 JSON,說明訊息類型
        if msg_type["type"] == "text":
            text = await websocket.receive_text()
            await websocket.send_text(f"收到文字: {text}")
        elif msg_type["type"] == "binary":
            data = await websocket.receive_bytes()
            # 假設只回傳資料長度給前端
            await websocket.send_text(f"收到 {len(data)} 位元組的檔案")
        else:
            await websocket.send_text("未知的訊息類型")

說明

  • 先以 JSON 格式告訴伺服器接下來的資料類型,避免混淆。
  • receive_bytes 直接取得二進位資料,適合傳輸檔案或音訊流。

範例 3️⃣ 多客戶端廣播(Chat Room)

實作一個簡易的聊天室,所有連入同一個房間的使用者都會收到其他人的訊息。

from typing import List, Dict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

# 用來保存每個房間的連線列表
rooms: Dict[str, List[WebSocket]] = {}

@app.websocket("/ws/chat/{room_name}")
async def chat_room(websocket: WebSocket, room_name: str):
    await websocket.accept()
    # 加入房間
    if room_name not in rooms:
        rooms[room_name] = []
    rooms[room_name].append(websocket)

    try:
        while True:
            data = await websocket.receive_text()
            # 廣播給同房間的所有連線
            for conn in rooms[room_name]:
                if conn != websocket:  # 不回傳給自己(視需求而定)
                    await conn.send_text(f"[{room_name}] {data}")
    except WebSocketDisconnect:
        # 客戶端斷線時,從房間列表中移除
        rooms[room_name].remove(websocket)
        if not rooms[room_name]:  # 若房間空了,刪除鍵值
            del rooms[room_name]

說明

  • 使用字典 rooms 以房間名稱為鍵,保存同一房間的所有 WebSocket 連線。
  • WebSocketDisconnect 例外在客戶端斷線時自動拋出,我們利用它清理資源。

範例 4️⃣ 結合依賴注入(認證)

在實務上,WebSocket 端點往往需要驗證使用者身份。下面示範如何使用 FastAPI 的依賴注入機制,取得 JWT token 並驗證。

from fastapi import Depends, HTTPException, status
import jwt

SECRET_KEY = "your_secret_key"

def get_current_user(token: str = Depends(...)):
    # 這裡的 Depends(...) 代表從 query、header 或 cookie 取得 token
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload["sub"]   # 返回使用者 ID
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid authentication credentials"
        )

@app.websocket("/ws/protected")
async def protected_ws(websocket: WebSocket, user: str = Depends(get_current_user)):
    await websocket.accept()
    await websocket.send_text(f"歡迎使用者 {user}")
    # 此後可根據 user 進行授權檢查
    while True:
        msg = await websocket.receive_text()
        await websocket.send_text(f"[{user}] {msg}")

說明

  • Depends(get_current_user) 會在建立 WebSocket 之前執行認證邏輯。
  • 若認證失敗,FastAPI 直接回傳 403,客戶端無法建立連線。

範例 5️⃣ 心跳機制(Ping/Pong)避免連線被閒置關閉

WebSocket 連線若長時間沒有訊息傳遞,可能會被防火牆或雲端平台自動斷開。加入 心跳 可以維持連線活躍。

import asyncio

@app.websocket("/ws/heartbeat")
async def heartbeat_endpoint(websocket: WebSocket):
    await websocket.accept()
    async def send_ping():
        while True:
            await asyncio.sleep(10)               # 每 10 秒一次
            try:
                await websocket.send_text("ping")
            except Exception:
                break

    ping_task = asyncio.create_task(send_ping())
    try:
        while True:
            data = await websocket.receive_text()
            if data == "pong":
                continue
            await websocket.send_text(f"回應: {data}")
    except Exception:
        pass
    finally:
        ping_task.cancel()
        await websocket.close()

說明

  • 透過 asyncio.create_task 在背景持續發送 "ping"
  • 客戶端收到後回傳 "pong",確保雙方仍保持活躍。

常見陷阱與最佳實踐

常見問題 原因 解決方式
忘記呼叫 await websocket.accept() WebSocket 必須先接受連線才能收發訊息 在函式一開始 必須 呼叫 await websocket.accept(),否則會拋出 WebSocketException
例外未捕捉導致伺服器崩潰 receive_* 在客戶端斷線時會拋出 WebSocketDisconnect 使用 try/except WebSocketDisconnect 包住訊息迴圈,並在 finally 中關閉連線。
同一房間的廣播造成訊息迴圈 廣播時未排除發送者本身 在廣播迴圈中加入 if conn != websocket:,或根據需求決定是否回傳給自己。
資源泄漏(未移除斷線的 WebSocket) 斷線後忘記從房間列表或全域集合中移除 except WebSocketDisconnectfinally 區塊中執行清理工作。
心跳未同步導致連線被意外關閉 客戶端或伺服器的 ping/pong 時間不一致 兩端協商心跳間隔,並在任何例外發生時立即取消心跳任務。
認證資訊放在 URL query,安全性不足 URL 可能被日誌或瀏覽器快取 儘可能使用 Header(如 Authorization: Bearer <token>)或 Cookie 傳遞敏感資訊。

最佳實踐

  1. 保持連線的生命週期管理:使用 try/except 捕捉 WebSocketDisconnect,在 finally 中關閉連線並清理資源。
  2. 分層設計:把業務邏輯(例如訊息儲存、授權檢查)抽離到服務層,而不是直接寫在路由函式內。
  3. 使用 asyncio.Queue 或 Pub/Sub:在多客戶端或多房間情境下,透過佇列或訊息中介(如 Redis)來分發訊息,避免在單一程式執行緒內直接遍歷大量連線。
  4. 限制訊息大小:為防止惡意攻擊,可在 receive_text 前檢查 websocket.client_state 或設定 max_message_size
  5. 紀錄與監控:使用 logMetrics(Prometheus)追蹤連線數、斷線率與訊息吞吐量,有助於問題排查與容量規劃。

實際應用場景

場景 為何選擇 WebSocket 範例實作要點
即時聊天 需要雙向即時傳遞訊息,且頻率高 使用房間概念、廣播、心跳、訊息持久化(寫入資料庫或 Redis)
線上協作編輯(如 Google Docs) 多人同時編輯同一文件,需要即時同步變更 每次編輯產生的變更以 JSON 傳送,使用 receive_json / send_json,搭配版本號避免衝突
即時儀表板(股票、IoT) 後端資料來源不斷推播更新,前端即時渲染 伺服器端透過背景任務(asyncio.create_task)抓取資料,使用 send_json 推送給所有訂閱的客戶端
線上遊戲 玩家操作需要毫秒級的回饋,且雙向互動頻繁 使用二進位傳輸 (receive_bytes / send_bytes),在伺服器端實作房間或 match‑making 機制
推播通知 手機或瀏覽器需要即時收到訊息,且不想使用輪詢 於使用者登入時建立 WebSocket,伺服器端在有新通知時直接 send_textsend_json 給對方

總結

  • @app.websocket 讓 FastAPI 能以最少的程式碼定義即時雙向通訊端點。
  • 必須先 accept() 連線,並透過 receive_* / send_* 完成訊息的收發。
  • 透過 路由參數、依賴注入,可輕鬆實作認證、房間、分層邏輯。
  • 常見問題多與 資源管理、例外處理、心跳機制 有關,遵循最佳實踐能大幅提升系統穩定性。
  • 在聊天、協作編輯、即時儀表板、線上遊戲等場景中,WebSocket 已成為事實上的標準解決方案。

掌握了上述概念與範例後,你就能在 FastAPI 中自信地建構各式即時應用,讓使用者體驗到 毫秒級回應流暢互動 的現代 Web 服務。祝開發順利! 🚀