本文 AI 產出,尚未審核

FastAPI 課程 – 資料庫整合

主題:非同步 ORM(SQLModel / async SQLAlchemy)


簡介

在現代的 Web 應用程式中,效能可擴充性往往是成功的關鍵。
FastAPI 天生支援非同步(async)程式設計,若資料庫操作也能保持非同步,就能避免 I/O 阻塞,讓每個請求都能被更快地處理。

傳統的 SQLAlchemy(同步模式)在 FastAPI 中仍然可用,但若想發揮 FastAPI 的全部潛能,建議改用 SQLModel(由 FastAPI 團隊打造的簡化版)或 async SQLAlchemy。本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,帶你一步步完成非同步資料庫整合,適合 初學者 也能滿足 中階開發者 在實務上快速上手的需求。


核心概念

1. 為什麼要使用非同步 ORM?

  • I/O 阻塞最小化:資料庫查詢屬於 I/O 密集型工作,使用 await 可讓事件迴圈在等待回應時切換到其他任務。
  • 更高的併發量:在同一個 FastAPI 執行個體中,同時處理上百甚至上千個請求,而不需要額外的執行緒或進程。
  • 與 FastAPI 完美配合:FastAPI 的路由函式支援 async def,若資料庫操作仍是同步的,會造成「阻塞」的反效果。

2. async SQLAlchemy 基本構成

元件 角色 說明
create_async_engine 建立非同步資料庫引擎 會產生一個支援 asyncpg(PostgreSQL)或 aiomysql(MySQL)等的引擎。
AsyncSession 非同步會話 替代傳統的 Session,必須在 async with 內使用。
async_scoped_session(可選) 作用域會話 在多執行緒/多協程環境中保證會話唯一性。
select / insert / update / delete SQL 表達式 與同步版相同,只是執行時需要 await

3. SQLModel 簡化版

SQLModel 結合了 PydanticSQLAlchemy,讓模型同時具備資料驗證與 ORM 功能。
在 FastAPI 中使用時,只需要:

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    email: str = Field(index=True, unique=True)

SQLModel 內部已支援非同步,只要在建立引擎時使用 create_engine(..., echo=True, future=True, connect_args={"check_same_thread": False}) 並搭配 AsyncSession 即可。


程式碼範例

以下示範 完整的非同步資料庫整合流程,包括模型、引擎、依賴、以及 CRUD API。

範例 1:建立非同步引擎與 Session

# db.py
from sqlmodel import SQLModel, create_engine
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

# 建立非同步引擎
engine: AsyncEngine = create_async_engine(DATABASE_URL, echo=True)

# 建立非同步 Session 工廠
AsyncSessionLocal = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,   # 讓取出的資料在 commit 後仍可存取
)

# 依賴注入:在 FastAPI 中取得 session
async def get_session() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session

重點expire_on_commit=False 可以避免在 commit 後資料被自動過期,常見於 API 回傳剛寫入的資料時。

範例 2:定義 SQLModel 模型

# models.py
from typing import Optional
from sqlmodel import SQLModel, Field

class User(SQLModel, table=True):
    """使用者資料表"""
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(index=True, max_length=50)
    email: str = Field(index=True, unique=True, max_length=120)

SQLModel 同時是 Pydantic 模型,直接可作為 FastAPI 請求或回應的 schema。

範例 3:建立資料表(一次性執行)

# create_tables.py
import asyncio
from sqlmodel import SQLModel
from db import engine
from models import User

async def init_db():
    async with engine.begin() as conn:
        # 若資料表已存在,可改為 `await conn.run_sync(SQLModel.metadata.drop_all)`
        await conn.run_sync(SQLModel.metadata.create_all)

if __name__ == "__main__":
    asyncio.run(init_db())

技巧:使用 engine.begin() 取得一個事務(transaction),在非同步環境下建立/刪除表格最安全。

範例 4:CRUD API(FastAPI)

# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlmodel import select
from typing import List

from db import get_session
from models import User

app = FastAPI(title="FastAPI + async SQLModel Demo")

# ----------------- Create -----------------
@app.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: User, session=Depends(get_session)):
    session.add(user)
    await session.commit()
    await session.refresh(user)   # 取得自動產生的 ID
    return user

# ----------------- Read (list) -----------------
@app.get("/users/", response_model=List[User])
async def read_users(skip: int = 0, limit: int = 10, session=Depends(get_session)):
    stmt = select(User).offset(skip).limit(limit)
    result = await session.execute(stmt)
    users = result.scalars().all()
    return users

# ----------------- Read (by id) -----------------
@app.get("/users/{user_id}", response_model=User)
async def read_user(user_id: int, session=Depends(get_session)):
    user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# ----------------- Update -----------------
@app.put("/users/{user_id}", response_model=User)
async def update_user(user_id: int, new_data: User, session=Depends(get_session)):
    user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    user.name = new_data.name
    user.email = new_data.email
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return user

# ----------------- Delete -----------------
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, session=Depends(get_session)):
    user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    await session.delete(user)
    await session.commit()
    return None

說明

  • 所有資料庫操作皆使用 await,確保不阻塞事件迴圈。
  • session.refresh() 用來把 DB 端產生的欄位(例如自增 PK)同步回 Python 物件。
  • Depends(get_session) 讓 FastAPI 自動為每個請求建立/釋放 AsyncSession

範例 5:使用 async with 直接執行原始 SQL

# raw_sql.py
from db import engine

async def raw_query_example():
    async with engine.connect() as conn:
        result = await conn.execute(
            "SELECT COUNT(*) FROM users WHERE email LIKE :pattern",
            {"pattern": "%@example.com"},
        )
        count = result.scalar_one()
        print(f"符合條件的使用者有 {count} 位")

這種方式適合需要高度客製化或效能極限的查詢,但仍建議盡量使用 SQLAlchemy 表達式以保持可讀性。


常見陷阱與最佳實踐

陷阱 可能的問題 解決方式
混用同步與非同步 Session 在同一條路由中同時使用 Session(同步)與 AsyncSession,會導致事件迴圈阻塞或 RuntimeError: This event loop is already running 統一使用 AsyncSession,或在需要同步操作時使用 run_in_threadpool 包裝。
忘記 await 忘記在 session.commit()session.execute()session.refresh() 前加 await,程式會回傳 coroutine 而非結果,且不會真正執行。 養成寫程式時先搜尋 await,或使用 IDE 內建的 lint 檢查。
連線池設定不當 預設的 pool size 可能過小,導致高併發時出現 TimeoutError: QueuePool limit of size X overflow Y create_async_engine 時設定 pool_sizemax_overflow,例如 create_async_engine(..., pool_size=20, max_overflow=10)
資料遺失或重複提交 未使用 expire_on_commit=False,或在 commit 後直接存取已過期的物件。 設定 expire_on_commit=False,或在 commit 後立即 await session.refresh(obj)
遷移工具不支援 async Alembic 預設支援同步 SQLAlchemy,直接搬移 async engine 會失敗。 使用 alembic + sqlmodel(同步引擎)產生 migration,然後在正式執行時切換成 async engine;或使用 alembicrun_async 功能。

推薦的最佳實踐

  1. 全程使用 async:從路由、依賴、服務層到測試,都保持 async def,避免「同步炸彈」。
  2. 依賴注入 Session:使用 Depends(get_session),讓每個請求自動取得獨立的連線,避免共享導致競爭條件。
  3. 明確的例外處理:將資料庫例外(如 IntegrityError)轉成 HTTP 例外,讓前端能得到友善的錯誤訊息。
  4. 分離測試資料庫:在 pytest 中使用 sqlite+aiosqlite:///:memory: 或臨時 PostgreSQL Docker 容器,確保測試不干擾正式 DB。
  5. 使用 Pydantic 進行資料驗證:即使模型已是 SQLModel,仍可自行定義 Request/ Response schema,避免直接暴露 ORM 物件。

實際應用場景

場景 為何適合使用 async ORM
即時聊天系統 每秒產生大量寫入/讀取訊息,非同步資料庫能降低延遲,提升使用者體驗。
IoT 數據收集平台 裝置同時上報數千筆感測資料,使用 async 可以在單一服務實例上處理更多連線。
多租戶 SaaS 後端 每個租戶都有獨立的資料表或 schema,併發查詢量大,async 能有效利用資源。
資料分析 API 複雜的聚合查詢常與大量資料交互,使用 await conn.execute() 搭配原始 SQL 可取得最佳效能。
微服務間的資料同步 服務 A 需要在收到訊息後立即寫入 DB,若阻塞會拖慢整條訊息管線,async 可解耦。

總結

非同步 ORMFastAPI 帶來了 高併發、低延遲 的資料存取能力。透過 SQLModelasync SQLAlchemy,開發者可以:

  1. 保持全程 async,避免不必要的阻塞。
  2. 利用依賴注入,讓每個請求自動取得獨立且安全的資料庫連線。
  3. 結合 Pydantic,在同一模型中完成驗證、序列化與 ORM 功能,減少樣板程式碼。
  4. 遵循最佳實踐(正確管理 Session、設定連線池、妥善處理例外),即使在高流量環境下也能穩定運行。

只要掌握上述概念與範例,你就能在 FastAPI 專案中自信地使用非同步資料庫,打造出效能卓越且易於維護的現代 Web API。祝開發順利,玩得開心! 🚀