FastAPI 課程 – 資料庫整合
主題:非同步 ORM(SQLModel / async SQLAlchemy)
簡介
在現代的 Web 應用程式中,效能與可擴充性往往是成功的關鍵。
FastAPI 天生支援非同步(async)程式設計,若資料庫操作也能保持非同步,就能避免 I/O 阻塞,讓每個請求都能被更快地處理。
傳統的 SQLAlchemy(同步模式)在 FastAPI 中仍然可用,但若想發揮 FastAPI 的全部潛能,建議改用 SQLModel(由 FastAPI 團隊打造的簡化版)或 async SQLAlchemy。本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,帶你一步步完成非同步資料庫整合,適合 初學者 也能滿足 中階開發者 在實務上快速上手的需求。
核心概念
1. 為什麼要使用非同步 ORM?
- I/O 阻塞最小化:資料庫查詢屬於 I/O 密集型工作,使用
await可讓事件迴圈在等待回應時切換到其他任務。 - 更高的併發量:在同一個 FastAPI 執行個體中,同時處理上百甚至上千個請求,而不需要額外的執行緒或進程。
- 與 FastAPI 完美配合:FastAPI 的路由函式支援
async def,若資料庫操作仍是同步的,會造成「阻塞」的反效果。
2. async SQLAlchemy 基本構成
| 元件 | 角色 | 說明 |
|---|---|---|
create_async_engine |
建立非同步資料庫引擎 | 會產生一個支援 asyncpg(PostgreSQL)或 aiomysql(MySQL)等的引擎。 |
AsyncSession |
非同步會話 | 替代傳統的 Session,必須在 async with 內使用。 |
async_scoped_session(可選) |
作用域會話 | 在多執行緒/多協程環境中保證會話唯一性。 |
select / insert / update / delete |
SQL 表達式 | 與同步版相同,只是執行時需要 await。 |
3. SQLModel 簡化版
SQLModel 結合了 Pydantic 與 SQLAlchemy,讓模型同時具備資料驗證與 ORM 功能。
在 FastAPI 中使用時,只需要:
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
email: str = Field(index=True, unique=True)
SQLModel 內部已支援非同步,只要在建立引擎時使用 create_engine(..., echo=True, future=True, connect_args={"check_same_thread": False}) 並搭配 AsyncSession 即可。
程式碼範例
以下示範 完整的非同步資料庫整合流程,包括模型、引擎、依賴、以及 CRUD API。
範例 1:建立非同步引擎與 Session
# db.py
from sqlmodel import SQLModel, create_engine
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
# 建立非同步引擎
engine: AsyncEngine = create_async_engine(DATABASE_URL, echo=True)
# 建立非同步 Session 工廠
AsyncSessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False, # 讓取出的資料在 commit 後仍可存取
)
# 依賴注入:在 FastAPI 中取得 session
async def get_session() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session
重點:
expire_on_commit=False可以避免在commit後資料被自動過期,常見於 API 回傳剛寫入的資料時。
範例 2:定義 SQLModel 模型
# models.py
from typing import Optional
from sqlmodel import SQLModel, Field
class User(SQLModel, table=True):
"""使用者資料表"""
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True, max_length=50)
email: str = Field(index=True, unique=True, max_length=120)
SQLModel 同時是 Pydantic 模型,直接可作為 FastAPI 請求或回應的 schema。
範例 3:建立資料表(一次性執行)
# create_tables.py
import asyncio
from sqlmodel import SQLModel
from db import engine
from models import User
async def init_db():
async with engine.begin() as conn:
# 若資料表已存在,可改為 `await conn.run_sync(SQLModel.metadata.drop_all)`
await conn.run_sync(SQLModel.metadata.create_all)
if __name__ == "__main__":
asyncio.run(init_db())
技巧:使用
engine.begin()取得一個事務(transaction),在非同步環境下建立/刪除表格最安全。
範例 4:CRUD API(FastAPI)
# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlmodel import select
from typing import List
from db import get_session
from models import User
app = FastAPI(title="FastAPI + async SQLModel Demo")
# ----------------- Create -----------------
@app.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: User, session=Depends(get_session)):
session.add(user)
await session.commit()
await session.refresh(user) # 取得自動產生的 ID
return user
# ----------------- Read (list) -----------------
@app.get("/users/", response_model=List[User])
async def read_users(skip: int = 0, limit: int = 10, session=Depends(get_session)):
stmt = select(User).offset(skip).limit(limit)
result = await session.execute(stmt)
users = result.scalars().all()
return users
# ----------------- Read (by id) -----------------
@app.get("/users/{user_id}", response_model=User)
async def read_user(user_id: int, session=Depends(get_session)):
user = await session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# ----------------- Update -----------------
@app.put("/users/{user_id}", response_model=User)
async def update_user(user_id: int, new_data: User, session=Depends(get_session)):
user = await session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.name = new_data.name
user.email = new_data.email
session.add(user)
await session.commit()
await session.refresh(user)
return user
# ----------------- Delete -----------------
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, session=Depends(get_session)):
user = await session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
await session.delete(user)
await session.commit()
return None
說明
- 所有資料庫操作皆使用
await,確保不阻塞事件迴圈。session.refresh()用來把 DB 端產生的欄位(例如自增 PK)同步回 Python 物件。Depends(get_session)讓 FastAPI 自動為每個請求建立/釋放AsyncSession。
範例 5:使用 async with 直接執行原始 SQL
# raw_sql.py
from db import engine
async def raw_query_example():
async with engine.connect() as conn:
result = await conn.execute(
"SELECT COUNT(*) FROM users WHERE email LIKE :pattern",
{"pattern": "%@example.com"},
)
count = result.scalar_one()
print(f"符合條件的使用者有 {count} 位")
這種方式適合需要高度客製化或效能極限的查詢,但仍建議盡量使用 SQLAlchemy 表達式以保持可讀性。
常見陷阱與最佳實踐
| 陷阱 | 可能的問題 | 解決方式 |
|---|---|---|
| 混用同步與非同步 Session | 在同一條路由中同時使用 Session(同步)與 AsyncSession,會導致事件迴圈阻塞或 RuntimeError: This event loop is already running。 |
統一使用 AsyncSession,或在需要同步操作時使用 run_in_threadpool 包裝。 |
忘記 await |
忘記在 session.commit()、session.execute()、session.refresh() 前加 await,程式會回傳 coroutine 而非結果,且不會真正執行。 |
養成寫程式時先搜尋 await,或使用 IDE 內建的 lint 檢查。 |
| 連線池設定不當 | 預設的 pool size 可能過小,導致高併發時出現 TimeoutError: QueuePool limit of size X overflow Y。 |
在 create_async_engine 時設定 pool_size、max_overflow,例如 create_async_engine(..., pool_size=20, max_overflow=10)。 |
| 資料遺失或重複提交 | 未使用 expire_on_commit=False,或在 commit 後直接存取已過期的物件。 |
設定 expire_on_commit=False,或在 commit 後立即 await session.refresh(obj)。 |
| 遷移工具不支援 async | Alembic 預設支援同步 SQLAlchemy,直接搬移 async engine 會失敗。 | 使用 alembic + sqlmodel(同步引擎)產生 migration,然後在正式執行時切換成 async engine;或使用 alembic 的 run_async 功能。 |
推薦的最佳實踐
- 全程使用 async:從路由、依賴、服務層到測試,都保持
async def,避免「同步炸彈」。 - 依賴注入 Session:使用
Depends(get_session),讓每個請求自動取得獨立的連線,避免共享導致競爭條件。 - 明確的例外處理:將資料庫例外(如
IntegrityError)轉成 HTTP 例外,讓前端能得到友善的錯誤訊息。 - 分離測試資料庫:在
pytest中使用sqlite+aiosqlite:///:memory:或臨時 PostgreSQL Docker 容器,確保測試不干擾正式 DB。 - 使用 Pydantic 進行資料驗證:即使模型已是
SQLModel,仍可自行定義 Request/ Response schema,避免直接暴露 ORM 物件。
實際應用場景
| 場景 | 為何適合使用 async ORM |
|---|---|
| 即時聊天系統 | 每秒產生大量寫入/讀取訊息,非同步資料庫能降低延遲,提升使用者體驗。 |
| IoT 數據收集平台 | 裝置同時上報數千筆感測資料,使用 async 可以在單一服務實例上處理更多連線。 |
| 多租戶 SaaS 後端 | 每個租戶都有獨立的資料表或 schema,併發查詢量大,async 能有效利用資源。 |
| 資料分析 API | 複雜的聚合查詢常與大量資料交互,使用 await conn.execute() 搭配原始 SQL 可取得最佳效能。 |
| 微服務間的資料同步 | 服務 A 需要在收到訊息後立即寫入 DB,若阻塞會拖慢整條訊息管線,async 可解耦。 |
總結
非同步 ORM 為 FastAPI 帶來了 高併發、低延遲 的資料存取能力。透過 SQLModel 或 async SQLAlchemy,開發者可以:
- 保持全程 async,避免不必要的阻塞。
- 利用依賴注入,讓每個請求自動取得獨立且安全的資料庫連線。
- 結合 Pydantic,在同一模型中完成驗證、序列化與 ORM 功能,減少樣板程式碼。
- 遵循最佳實踐(正確管理 Session、設定連線池、妥善處理例外),即使在高流量環境下也能穩定運行。
只要掌握上述概念與範例,你就能在 FastAPI 專案中自信地使用非同步資料庫,打造出效能卓越且易於維護的現代 Web API。祝開發順利,玩得開心! 🚀