FastAPI 教學 – 非同步程式設計(Async Programming)
主題:非同步 ORM 支援
簡介
在 FastAPI 中,非同步(async / await)已成為打造高效能 API 的核心概念。傳統的同步 ORM(如 SQLAlchemy Core)在處理大量 I/O(資料庫查詢、網路請求)時會阻塞事件迴圈,導致效能瓶頸。為了充分發揮 FastAPI 的非同步特性,我們需要 非同步 ORM 來在同一個執行緒內同時處理多個請求,而不會被單一資料庫操作卡住。
本篇文章將說明什麼是非同步 ORM、如何在 FastAPI 中導入與使用它,以及在實務開發中常見的陷阱與最佳實踐。適合 初學者到中階開發者 快速上手,讓你的 API 從「能跑」變成「跑得快、跑得穩」。
核心概念
1️⃣ 為什麼需要非同步 ORM
| 同步 ORM | 非同步 ORM |
|---|---|
| 每一次 DB 呼叫都會 阻塞 事件迴圈 | 透過 await 讓事件迴圈在 I/O 時切換任務 |
| 需要額外的執行緒或工作池來提升併發 | 直接在單一執行緒內支援高併發 |
| 讀寫大量資料時效能較差 | 在高流量場景(如即時聊天、金融交易)表現更佳 |
重點:FastAPI 本身是非同步框架,若僅在路由使用
async def,卻在內部呼叫同步的 ORM,仍會造成「假非同步」的效能問題。
2️⃣ 常見的非同步 ORM
| ORM | 官方支援 | 主要特點 |
|---|---|---|
| Tortoise‑ORM | ✅(官方文件) | Django‑like ORM、支援 SQLite、PostgreSQL、MySQL |
| SQLModel (async) | ✅(SQLAlchemy 2.0) | 結合 Pydantic 與 SQLAlchemy,簡潔且可選擇 async engine |
| Gino | ✅(社群維護) | 針對 PostgreSQL 設計,使用 asyncpg,語法類似 SQLAlchemy |
| Encode/Databases | ✅(官方範例) | 不是完整 ORM,但提供 async DB 連線與簡易 CRUD |
以下將以 Tortoise‑ORM 與 SQLModel (async) 為例,示範如何在 FastAPI 中完成 非同步 CRUD。
3️⃣ 設定非同步資料庫連線
3.1 Tortoise‑ORM 基本設定
# db.py
from tortoise import Tortoise, fields, models
class User(models.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
class Meta:
table = "users"
async def init_db():
await Tortoise.init(
db_url="postgres://user:password@localhost:5432/mydb",
modules={"models": ["db"]},
)
# 只在開發環境自動產生表格,正式環境請使用 migration
await Tortoise.generate_schemas()
說明
await Tortoise.init(...)會在 事件迴圈 中非同步建立連線池。generate_schemas()只適合開發階段,正式環境建議使用 Aerich 進行 migration。
3.2 SQLModel (async) 設定
# db_async.py
from sqlmodel import SQLModel, Field, create_engine
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
engine: AsyncEngine = create_async_engine(DATABASE_URL, echo=True)
class User(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
name: str = Field(max_length=50)
email: str = Field(max_length=100, unique=True)
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
說明
create_async_engine來自 SQLAlchemy 2.0,配合asyncpg完成非同步連線。run_sync內部會把同步的create_all包裝成非同步執行。
4️⃣ 非同步 CRUD 範例
4️⃣1 建立 FastAPI 應用與啟動 DB
# main.py
from fastapi import FastAPI
from db import init_db as init_tortoise, User as TortoiseUser
# or from db_async import init_db as init_sqlmodel, User as SQLModelUser
app = FastAPI()
@app.on_event("startup")
async def on_startup():
# 依需求切換下面一行
await init_tortoise() # Tortoise‑ORM
# await init_sqlmodel() # SQLModel (async)
@app.on_event("shutdown")
async def on_shutdown():
# 若使用 Tortoise,需要關閉連線池
from tortoise import Tortoise
await Tortoise.close_connections()
小技巧:把 DB 初始化寫在
startup事件,可確保在第一個請求前完成連線池建立。
4️⃣2 建立 (Create) 使用者
# routers/user.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
router = APIRouter(prefix="/users")
class UserCreate(BaseModel):
name: str
email: str
@router.post("/", response_model=UserCreate)
async def create_user(payload: UserCreate):
# Tortoise 範例
user = await TortoiseUser.create(**payload.dict())
return user
# SQLModel (async) 範例
# async with engine.begin() as conn:
# async with conn.begin():
# new_user = User.from_orm(payload)
# conn.add(new_user)
# await conn.flush()
# return new_user
重點:
await Model.create(...)直接返回 ORM 物件,無需額外的await session.commit()(Tortoise 內部已處理)。
4️⃣3 取得 (Read) 單筆使用者
@router.get("/{user_id}", response_model=UserCreate)
async def get_user(user_id: int):
# Tortoise
user = await TortoiseUser.filter(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# SQLModel (async)
# async with engine.connect() as conn:
# result = await conn.execute(select(User).where(User.id == user_id))
# user = result.scalar_one_or_none()
# if not user:
# raise HTTPException(status_code=404, detail="User not found")
# return user
技巧:使用
filter(...).first()可避免拋出DoesNotExist例外,讓錯誤處理更簡潔。
4️⃣4 更新 (Update) 使用者
class UserUpdate(BaseModel):
name: str | None = None
email: str | None = None
@router.put("/{user_id}", response_model=UserCreate)
async def update_user(user_id: int, payload: UserUpdate):
# Tortoise
user = await TortoiseUser.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
update_data = payload.dict(exclude_unset=True)
await user.update_from_dict(update_data)
await user.save()
return user
# SQLModel (async)
# async with engine.begin() as conn:
# result = await conn.execute(select(User).where(User.id == user_id))
# user = result.scalar_one_or_none()
# if not user:
# raise HTTPException(status_code=404, detail="User not found")
# for key, value in payload.dict(exclude_unset=True).items():
# setattr(user, key, value)
# await conn.commit()
# return user
說明:
exclude_unset=True只會更新客戶端實際傳送的欄位,避免把未提供的欄位寫成None。
4️⃣5 刪除 (Delete) 使用者
@router.delete("/{user_id}", status_code=204)
async def delete_user(user_id: int):
# Tortoise
deleted = await TortoiseUser.filter(id=user_id).delete()
if not deleted:
raise HTTPException(status_code=404, detail="User not found")
return
# SQLModel (async)
# async with engine.begin() as conn:
# result = await conn.execute(delete(User).where(User.id == user_id))
# if result.rowcount == 0:
# raise HTTPException(status_code=404, detail="User not found")
# await conn.commit()
# return
提示:
delete()直接回傳受影響的列數,利用它可以快速判斷是否真的刪除成功。
5️⃣ 交易(Transaction)與批次操作
非同步環境下,交易 必須在同一個連線(或 session)內完成,否則會失去原子性。以下示範在 SQLModel 中使用 async with engine.begin() as conn: 包裝多筆寫入:
from sqlmodel import select
async def bulk_create_users(users: list[UserCreate]):
async with engine.begin() as conn:
for payload in users:
user = User.from_orm(payload)
conn.add(user)
# transaction 於此自動 commit,若發生例外則自動 rollback
Tortoise 亦提供 in_transaction:
from tortoise.transactions import in_transaction
async def bulk_create_tortoise(users: list[UserCreate]):
async with in_transaction() as conn:
for payload in users:
await TortoiseUser.create(**payload.dict())
# 若拋出例外,transaction 會自動 rollback
常見陷阱與最佳實踐
| 陷阱 | 可能的結果 | 解決方案 |
|---|---|---|
在 async 路由中呼叫同步 ORM |
事件迴圈被阻塞,導致請求排隊 | 確認使用的 ORM 支援 async,或將同步呼叫包在 run_in_threadpool(starlette.concurrency) |
忘記在 startup 事件初始化 DB |
首次請求出現連線錯誤或延遲 | 把 init_db() 放在 @app.on_event("startup"),並在 shutdown 時關閉連線 |
| 在同一請求中混用多個連線池 | 連線資源浪費、交易不一致 | 統一 使用同一個 ORM/engine;若必須混用,確保交易範圍相同 |
使用 await 時忘記加 async 定義函式 |
SyntaxError 或 RuntimeWarning |
每個使用 await 的函式必須宣告為 async def |
| 在測試環境未切換為測試資料庫 | 測試資料被寫入正式 DB | 用環境變數或 pytest fixture 動態切換 DATABASE_URL,並在測試結束時清除資料 |
最佳實踐
模型與 Pydantic 分離
- 使用 SQLModel 時,模型同時擔任 ORM 與 Pydantic,對於簡單 CRUD 足夠。
- 若需求較複雜,建議分離
Base(SQLAlchemy)與Schema(Pydantic)以降低耦合。
使用 Migration 工具
- Tortoise → Aerich
- SQLModel → Alembic(配合
sqlalchemy) - 永遠不要在生產環境直接呼叫
generate_schemas()。
連線池大小
- 預設
pool_size=5可能不足以支撐高併發,根據實際流量調整maxsize(如create_async_engine(..., pool_size=20))。
- 預設
適度使用
select_related/prefetch_related- 在 Tortoise 中,
await User.filter(...).prefetch_related("posts")可一次抓取關聯資料,減少 N+1 查詢。
- 在 Tortoise 中,
錯誤與例外統一處理
- 使用 FastAPI 的
exception_handler捕捉IntegrityError、DoesNotExist,回傳統一的 JSON 錯誤格式。
- 使用 FastAPI 的
實際應用場景
| 場景 | 為何需要非同步 ORM | 範例實作 |
|---|---|---|
| 即時聊天系統 | 每秒產生大量訊息寫入 DB,若使用同步 ORM 會導致訊息延遲 | 使用 Tortoise 的 await Message.create(...),搭配 WebSocket 的 await websocket.send_text() |
| 金融交易平台 | 必須在毫秒級別內完成交易寫入與查詢,且交易必須原子性 | 以 SQLModel 的 async with engine.begin() 包住交易,確保同時檢查餘額與扣款 |
| 大型報表匯出 | 報表需要同時讀取多張關聯表,若使用同步會造成大量阻塞 | 使用 prefetch_related 把所有關聯一次載入,配合 await 並行執行多筆查詢 |
| 多租戶 SaaS | 每個租戶都有自己的資料庫連線,需動態切換 | 在 startup 讀取租戶設定,使用 create_async_engine 動態產生 engine,並在每個請求中注入對應的 engine |
總結
- 非同步 ORM 是讓 FastAPI 真正發揮「非同步」威力的關鍵。
- 常見選擇有 Tortoise‑ORM、SQLModel (async)、Gino,依專案需求與資料庫類型挑選最合適的套件。
- 正確的 初始化、連線池管理、交易處理 能避免大多數效能與穩定性問題。
- 在實務開發中,避免混用同步與非同步、使用 migration、統一錯誤處理 是提升可維護性與可靠性的最佳實踐。
掌握了這些概念與技巧,你就能在 FastAPI 中建立 高併發、低延遲 的資料驅動 API,為任何需要即時回應的應用奠定堅實基礎。祝開發順利,期待看到你將非同步 ORM 運用在更多創新專案中! 🚀