本文 AI 產出,尚未審核

FastAPI 教學 – 非同步程式設計(Async Programming)

主題:非同步 ORM 支援


簡介

FastAPI 中,非同步(async / await)已成為打造高效能 API 的核心概念。傳統的同步 ORM(如 SQLAlchemy Core)在處理大量 I/O(資料庫查詢、網路請求)時會阻塞事件迴圈,導致效能瓶頸。為了充分發揮 FastAPI 的非同步特性,我們需要 非同步 ORM 來在同一個執行緒內同時處理多個請求,而不會被單一資料庫操作卡住。

本篇文章將說明什麼是非同步 ORM、如何在 FastAPI 中導入與使用它,以及在實務開發中常見的陷阱與最佳實踐。適合 初學者到中階開發者 快速上手,讓你的 API 從「能跑」變成「跑得快、跑得穩」。


核心概念

1️⃣ 為什麼需要非同步 ORM

同步 ORM 非同步 ORM
每一次 DB 呼叫都會 阻塞 事件迴圈 透過 await 讓事件迴圈在 I/O 時切換任務
需要額外的執行緒或工作池來提升併發 直接在單一執行緒內支援高併發
讀寫大量資料時效能較差 在高流量場景(如即時聊天、金融交易)表現更佳

重點:FastAPI 本身是非同步框架,若僅在路由使用 async def,卻在內部呼叫同步的 ORM,仍會造成「假非同步」的效能問題。


2️⃣ 常見的非同步 ORM

ORM 官方支援 主要特點
Tortoise‑ORM ✅(官方文件) Django‑like ORM、支援 SQLite、PostgreSQL、MySQL
SQLModel (async) ✅(SQLAlchemy 2.0) 結合 Pydantic 與 SQLAlchemy,簡潔且可選擇 async engine
Gino ✅(社群維護) 針對 PostgreSQL 設計,使用 asyncpg,語法類似 SQLAlchemy
Encode/Databases ✅(官方範例) 不是完整 ORM,但提供 async DB 連線與簡易 CRUD

以下將以 Tortoise‑ORMSQLModel (async) 為例,示範如何在 FastAPI 中完成 非同步 CRUD


3️⃣ 設定非同步資料庫連線

3.1 Tortoise‑ORM 基本設定

# db.py
from tortoise import Tortoise, fields, models

class User(models.Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=50)
    email = fields.CharField(max_length=100, unique=True)

    class Meta:
        table = "users"

async def init_db():
    await Tortoise.init(
        db_url="postgres://user:password@localhost:5432/mydb",
        modules={"models": ["db"]},
    )
    # 只在開發環境自動產生表格,正式環境請使用 migration
    await Tortoise.generate_schemas()

說明

  • await Tortoise.init(...) 會在 事件迴圈 中非同步建立連線池。
  • generate_schemas() 只適合開發階段,正式環境建議使用 Aerich 進行 migration。

3.2 SQLModel (async) 設定

# db_async.py
from sqlmodel import SQLModel, Field, create_engine
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

engine: AsyncEngine = create_async_engine(DATABASE_URL, echo=True)

class User(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    name: str = Field(max_length=50)
    email: str = Field(max_length=100, unique=True)

async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

說明

  • create_async_engine 來自 SQLAlchemy 2.0,配合 asyncpg 完成非同步連線。
  • run_sync 內部會把同步的 create_all 包裝成非同步執行。

4️⃣ 非同步 CRUD 範例

4️⃣1 建立 FastAPI 應用與啟動 DB

# main.py
from fastapi import FastAPI
from db import init_db as init_tortoise, User as TortoiseUser
# or from db_async import init_db as init_sqlmodel, User as SQLModelUser

app = FastAPI()

@app.on_event("startup")
async def on_startup():
    # 依需求切換下面一行
    await init_tortoise()      # Tortoise‑ORM
    # await init_sqlmodel()    # SQLModel (async)

@app.on_event("shutdown")
async def on_shutdown():
    # 若使用 Tortoise,需要關閉連線池
    from tortoise import Tortoise
    await Tortoise.close_connections()

小技巧:把 DB 初始化寫在 startup 事件,可確保在第一個請求前完成連線池建立。


4️⃣2 建立 (Create) 使用者

# routers/user.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

router = APIRouter(prefix="/users")

class UserCreate(BaseModel):
    name: str
    email: str

@router.post("/", response_model=UserCreate)
async def create_user(payload: UserCreate):
    # Tortoise 範例
    user = await TortoiseUser.create(**payload.dict())
    return user

    # SQLModel (async) 範例
    # async with engine.begin() as conn:
    #     async with conn.begin():
    #         new_user = User.from_orm(payload)
    #         conn.add(new_user)
    #         await conn.flush()
    #         return new_user

重點await Model.create(...) 直接返回 ORM 物件,無需額外的 await session.commit()(Tortoise 內部已處理)。


4️⃣3 取得 (Read) 單筆使用者

@router.get("/{user_id}", response_model=UserCreate)
async def get_user(user_id: int):
    # Tortoise
    user = await TortoiseUser.filter(id=user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

    # SQLModel (async)
    # async with engine.connect() as conn:
    #     result = await conn.execute(select(User).where(User.id == user_id))
    #     user = result.scalar_one_or_none()
    #     if not user:
    #         raise HTTPException(status_code=404, detail="User not found")
    #     return user

技巧:使用 filter(...).first() 可避免拋出 DoesNotExist 例外,讓錯誤處理更簡潔。


4️⃣4 更新 (Update) 使用者

class UserUpdate(BaseModel):
    name: str | None = None
    email: str | None = None

@router.put("/{user_id}", response_model=UserCreate)
async def update_user(user_id: int, payload: UserUpdate):
    # Tortoise
    user = await TortoiseUser.get_or_none(id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    update_data = payload.dict(exclude_unset=True)
    await user.update_from_dict(update_data)
    await user.save()
    return user

    # SQLModel (async)
    # async with engine.begin() as conn:
    #     result = await conn.execute(select(User).where(User.id == user_id))
    #     user = result.scalar_one_or_none()
    #     if not user:
    #         raise HTTPException(status_code=404, detail="User not found")
    #     for key, value in payload.dict(exclude_unset=True).items():
    #         setattr(user, key, value)
    #     await conn.commit()
    #     return user

說明exclude_unset=True 只會更新客戶端實際傳送的欄位,避免把未提供的欄位寫成 None


4️⃣5 刪除 (Delete) 使用者

@router.delete("/{user_id}", status_code=204)
async def delete_user(user_id: int):
    # Tortoise
    deleted = await TortoiseUser.filter(id=user_id).delete()
    if not deleted:
        raise HTTPException(status_code=404, detail="User not found")
    return

    # SQLModel (async)
    # async with engine.begin() as conn:
    #     result = await conn.execute(delete(User).where(User.id == user_id))
    #     if result.rowcount == 0:
    #         raise HTTPException(status_code=404, detail="User not found")
    #     await conn.commit()
    #     return

提示delete() 直接回傳受影響的列數,利用它可以快速判斷是否真的刪除成功。


5️⃣ 交易(Transaction)與批次操作

非同步環境下,交易 必須在同一個連線(或 session)內完成,否則會失去原子性。以下示範在 SQLModel 中使用 async with engine.begin() as conn: 包裝多筆寫入:

from sqlmodel import select

async def bulk_create_users(users: list[UserCreate]):
    async with engine.begin() as conn:
        for payload in users:
            user = User.from_orm(payload)
            conn.add(user)
        # transaction 於此自動 commit,若發生例外則自動 rollback

Tortoise 亦提供 in_transaction

from tortoise.transactions import in_transaction

async def bulk_create_tortoise(users: list[UserCreate]):
    async with in_transaction() as conn:
        for payload in users:
            await TortoiseUser.create(**payload.dict())
        # 若拋出例外,transaction 會自動 rollback

常見陷阱與最佳實踐

陷阱 可能的結果 解決方案
async 路由中呼叫同步 ORM 事件迴圈被阻塞,導致請求排隊 確認使用的 ORM 支援 async,或將同步呼叫包在 run_in_threadpoolstarlette.concurrency
忘記在 startup 事件初始化 DB 首次請求出現連線錯誤或延遲 init_db() 放在 @app.on_event("startup"),並在 shutdown 時關閉連線
在同一請求中混用多個連線池 連線資源浪費、交易不一致 統一 使用同一個 ORM/engine;若必須混用,確保交易範圍相同
使用 await 時忘記加 async 定義函式 SyntaxErrorRuntimeWarning 每個使用 await 的函式必須宣告為 async def
在測試環境未切換為測試資料庫 測試資料被寫入正式 DB 用環境變數或 pytest fixture 動態切換 DATABASE_URL,並在測試結束時清除資料

最佳實踐

  1. 模型與 Pydantic 分離

    • 使用 SQLModel 時,模型同時擔任 ORM 與 Pydantic,對於簡單 CRUD 足夠。
    • 若需求較複雜,建議分離 Base(SQLAlchemy)與 Schema(Pydantic)以降低耦合。
  2. 使用 Migration 工具

    • Tortoise → Aerich
    • SQLModel → Alembic(配合 sqlalchemy
    • 永遠不要在生產環境直接呼叫 generate_schemas()
  3. 連線池大小

    • 預設 pool_size=5 可能不足以支撐高併發,根據實際流量調整 maxsize(如 create_async_engine(..., pool_size=20))。
  4. 適度使用 select_related / prefetch_related

    • 在 Tortoise 中,await User.filter(...).prefetch_related("posts") 可一次抓取關聯資料,減少 N+1 查詢。
  5. 錯誤與例外統一處理

    • 使用 FastAPI 的 exception_handler 捕捉 IntegrityErrorDoesNotExist,回傳統一的 JSON 錯誤格式。

實際應用場景

場景 為何需要非同步 ORM 範例實作
即時聊天系統 每秒產生大量訊息寫入 DB,若使用同步 ORM 會導致訊息延遲 使用 Tortoise 的 await Message.create(...),搭配 WebSocket 的 await websocket.send_text()
金融交易平台 必須在毫秒級別內完成交易寫入與查詢,且交易必須原子性 以 SQLModel 的 async with engine.begin() 包住交易,確保同時檢查餘額與扣款
大型報表匯出 報表需要同時讀取多張關聯表,若使用同步會造成大量阻塞 使用 prefetch_related 把所有關聯一次載入,配合 await 並行執行多筆查詢
多租戶 SaaS 每個租戶都有自己的資料庫連線,需動態切換 startup 讀取租戶設定,使用 create_async_engine 動態產生 engine,並在每個請求中注入對應的 engine

總結

  • 非同步 ORM 是讓 FastAPI 真正發揮「非同步」威力的關鍵。
  • 常見選擇有 Tortoise‑ORMSQLModel (async)Gino,依專案需求與資料庫類型挑選最合適的套件。
  • 正確的 初始化、連線池管理、交易處理 能避免大多數效能與穩定性問題。
  • 在實務開發中,避免混用同步與非同步使用 migration統一錯誤處理 是提升可維護性與可靠性的最佳實踐。

掌握了這些概念與技巧,你就能在 FastAPI 中建立 高併發、低延遲 的資料驅動 API,為任何需要即時回應的應用奠定堅實基礎。祝開發順利,期待看到你將非同步 ORM 運用在更多創新專案中! 🚀