本文 AI 產出,尚未審核

FastAPI 資料庫整合 – Transaction 處理


簡介

在 Web API 開發中,資料一致性錯誤恢復是不可或缺的需求。
即使是最簡單的 CRUD 操作,只要涉及多筆資料或跨表寫入,就必須確保「要麼全成功,要麼全失敗」的原子性,這正是 transaction(交易)所提供的保證。

FastAPI 本身是一個非同步(async)框架,搭配 SQLAlchemyTortoise‑ORMGino 等 ORM 時,如何正確使用 transaction 成為開發者必須掌握的技巧。
本篇文章將從概念說明、實作範例、常見陷阱到最佳實踐,帶你一步步在 FastAPI 中安全、有效地使用 transaction。


核心概念

1. Transaction 是什麼?

Transaction 是一組 原子操作(Atomic Operations),具備四大特性(ACID):

特性 說明
Atomicity(原子性) 所有操作要麼全部成功,要麼全部失敗,不能只完成部份。
Consistency(一致性) 完成交易後,資料庫必須處於一致的狀態。
Isolation(隔離性) 同時執行的交易彼此不會互相干擾。
Durability(永久性) 成功提交的交易即使系統崩潰也不會遺失。

在 FastAPI 中,當我們在同一個請求內同時寫入多張表、更新庫存、產生帳單等,若其中任一步驟失敗,就必須回滾(rollback)已經完成的變更,避免產生「半完成」的資料。

2. 為什麼在 FastAPI 要特別注意?

  • 非同步執行
    FastAPI 支援 async def,但大多數 ORM(如 SQLAlchemy)在非同步模式下仍使用 同步的資料庫連線池。若在同一個 request 中混用同步與非同步的 DB 呼叫,容易造成 資料庫鎖死(deadlock)或 連線洩漏

  • 依賴注入(Dependency Injection)
    FastAPI 推薦使用依賴注入(DI)來提供 Session(或 Connection)給路由函式。若 DI 的生命週期管理不當,transaction 可能在請求結束前就被自動關閉,導致 "Session is closed" 錯誤。

  • 例外處理
    FastAPI 會自動將未捕獲的例外轉成 HTTP 500 回應。若在 transaction 內拋出例外而未正確 rollback(),資料庫會保持「未提交」狀態,造成資源浪費。

3. 常見的 Transaction 實作方式

ORM / Driver 同步/非同步 Transaction 實作方式
SQLAlchemy (Core/ORM) 同步
async 需使用 AsyncSession
session.begin(), session.commit(), session.rollback()
Tortoise‑ORM 完全非同步 async with in_transaction() as conn:
Gino 非同步(基於 asyncpg) async with db.acquire() as conn: + await conn.transaction()
Databases (純 async driver) 非同步 async with database.transaction():

以下將以 SQLAlchemy(同步)與 Tortoise‑ORM(非同步)兩種最常見的組合為例,示範如何在 FastAPI 中正確使用 transaction。


程式碼範例

:範例皆以 Python 為主,使用 ````python` 標記語法。

範例 1️⃣:同步 SQLAlchemy + FastAPI 的基本 Transaction

# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
# models.py
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from .db import Base

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    orders = relationship("Order", back_populates="owner")

class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True, index=True)
    item = Column(String, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="orders")
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from . import db, models

app = FastAPI()

# 建立資料表(僅示範用,正式環境請使用 Alembic)
models.Base.metadata.create_all(bind=db.engine)

def get_db():
    """FastAPI DI:提供一次請求的 Session 實例"""
    db_session = db.SessionLocal()
    try:
        yield db_session
    finally:
        db_session.close()

@app.post("/users/{user_id}/order")
def create_order(user_id: int, item: str, db: Session = Depends(get_db)):
    """
    為指定使用者新增訂單。若使用者不存在,或寫入失敗,全部回滾。
    """
    try:
        # 1️⃣ 開啟 transaction
        with db.begin():
            user = db.query(models.User).filter(models.User.id == user_id).first()
            if not user:
                raise HTTPException(status_code=404, detail="User not found")
            # 2️⃣ 建立 Order 物件
            new_order = models.Order(item=item, owner=user)
            db.add(new_order)
            # 3️⃣ commit 由 with db.begin() 自動完成
        return {"order_id": new_order.id, "item": new_order.item}
    except Exception as e:
        # 例外會自動觸發 rollback
        raise HTTPException(status_code=500, detail=str(e))

重點說明

  • 使用 with db.begin(): 可確保 自動 commit自動 rollback(發生例外時)。
  • get_db 依賴注入保證每個請求都有獨立的 Session,避免跨請求共享。

範例 2️⃣:非同步 Tortoise‑ORM + FastAPI 的 Transaction

# tortoise_config.py
TORTOISE_ORM = {
    "connections": {"default": "sqlite://db.sqlite3"},
    "apps": {
        "models": {
            "models": ["__main__"],  # 直接在同一檔案定義模型
            "default_connection": "default",
        }
    },
}
# models.py
from tortoise import fields, models

class Customer(models.Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=100)

class Invoice(models.Model):
    id = fields.IntField(pk=True)
    amount = fields.DecimalField(max_digits=10, decimal_places=2)
    customer = fields.ForeignKeyField("models.Customer", related_name="invoices")
# main.py
import uvicorn
from fastapi import FastAPI, HTTPException
from tortoise.contrib.fastapi import register_tortoise
from tortoise.transactions import in_transaction
from .models import Customer, Invoice
from .tortoise_config import TORTOISE_ORM

app = FastAPI()

register_tortoise(app, config=TORTOISE_ORM, generate_schemas=True)

@app.post("/customers/{cust_id}/invoice")
async def create_invoice(cust_id: int, amount: float):
    """
    為指定客戶新增發票。若客戶不存在或寫入失敗,所有變更會回滾。
    """
    async with in_transaction() as conn:          # ① 開啟 transaction
        customer = await Customer.filter(id=cust_id).using(conn).first()
        if not customer:
            raise HTTPException(status_code=404, detail="Customer not found")
        invoice = await Invoice.create(
            amount=amount,
            customer=customer,
            using=conn                               # ② 確保使用同一個 transaction
        )
        # ③ 若此區塊內拋出例外,conn 會自動 rollback
    return {"invoice_id": invoice.id, "amount": str(invoice.amount)}

重點說明

  • in_transaction()非同步上下文管理器,支援自動 rollback。
  • 必須使用 using=conn 讓所有查詢都走同一個 transaction 連線,否則會產生 跨 transaction 的問題。

範例 3️⃣:多層 Service 與 Repository,Transaction 只在最外層控制

在實務專案中,我們常把資料庫操作封裝成 Repository,業務邏輯放在 Service,而 API 層只負責請求/回應。
下面示範如何在 Service 層統一管理 transaction,讓 Repository 完全不感知 transaction 的細節。

# repository.py
from sqlalchemy.orm import Session
from . import models

class UserRepo:
    @staticmethod
    def get_by_id(db: Session, user_id: int):
        return db.query(models.User).filter(models.User.id == user_id).first()

    @staticmethod
    def add_order(db: Session, user: models.User, item: str):
        order = models.Order(item=item, owner=user)
        db.add(order)
        return order
# service.py
from sqlalchemy.orm import Session
from fastapi import HTTPException
from . import repository

class OrderService:
    @staticmethod
    def create_order(db: Session, user_id: int, item: str):
        """
        只在此方法裡開啟 transaction,確保所有 repo 操作同時成功或失敗。
        """
        try:
            with db.begin():
                user = repository.UserRepo.get_by_id(db, user_id)
                if not user:
                    raise HTTPException(status_code=404, detail="User not found")
                order = repository.UserRepo.add_order(db, user, item)
            # transaction 已自動 commit
            return order
        except Exception as exc:
            # 例外會觸發 rollback,並向上拋出
            raise HTTPException(status_code=500, detail=str(exc))
# main.py
from fastapi import FastAPI, Depends
from .db import get_db
from .service import OrderService

app = FastAPI()

@app.post("/order")
def api_create_order(user_id: int, item: str, db: Session = Depends(get_db)):
    order = OrderService.create_order(db, user_id, item)
    return {"order_id": order.id, "item": order.item}

重點說明

  • Transaction 只在 Service 層,讓 Repository 保持「純粹」的 CRUD 操作。
  • 這樣的設計提升 可測試性:在單元測試時可以直接呼叫 Repository 而不必擔心 transaction。

範例 4️⃣:使用 databases 套件的純 async Transaction(適合 FastAPI + PostgreSQL)

# db_async.py
from databases import Database

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/testdb"
database = Database(DATABASE_URL)

async def get_database():
    """FastAPI DI – 每次請求取得同一個 Database 物件"""
    if not database.is_connected:
        await database.connect()
    return database
# main.py
from fastapi import FastAPI, Depends, HTTPException
from .db_async import get_database, database
from pydantic import BaseModel

app = FastAPI()

class Transfer(BaseModel):
    from_account: int
    to_account: int
    amount: float

@app.post("/transfer")
async def transfer_funds(payload: Transfer, db: Database = Depends(get_database)):
    """
    銀行轉帳範例:兩筆扣/加帳務必在同一 transaction 中完成。
    """
    async with db.transaction() as conn:            # ① 開啟 transaction
        # ② 讀取來源帳戶餘額
        query = "SELECT balance FROM accounts WHERE id = :id FOR UPDATE"
        from_balance = await conn.fetch_one(query, values={"id": payload.from_account})
        if not from_balance or from_balance["balance"] < payload.amount:
            raise HTTPException(status_code=400, detail="Insufficient funds")

        # ③ 扣除來源帳戶
        await conn.execute(
            "UPDATE accounts SET balance = balance - :amt WHERE id = :id",
            values={"amt": payload.amount, "id": payload.from_account},
        )
        # ④ 加到目標帳戶
        await conn.execute(
            "UPDATE accounts SET balance = balance + :amt WHERE id = :id",
            values={"amt": payload.amount, "id": payload.to_account},
        )
        # 若此區塊內任意一步拋出例外,conn 會自動 rollback
    return {"status": "success"}

重點說明

  • FOR UPDATE 鎖住被讀取的列,避免同時多個請求競爭導致 超賣(oversell)。
  • async with db.transaction() 同樣提供 自動 commit / rollback 機制。

範例 5️⃣:跨資料庫(PostgreSQL + Redis)結合 Transaction(兩段式提交)

雖然大多數應用只需要單一資料庫的 transaction,但有時會同時寫入 關係型資料庫快取系統(如 Redis)。以下示範如何使用 兩段式提交(Two‑Phase Commit, 2PC),確保兩者一致性。

# redis_client.py
import aioredis

redis = aioredis.from_url("redis://localhost", decode_responses=True)

async def set_cache(key: str, value: str):
    await redis.set(key, value)
# main.py
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from .db import get_db
from .redis_client import set_cache, redis
from . import models

app = FastAPI()

@app.post("/users/{user_id}/profile")
async def update_profile(user_id: int, name: str, db: Session = Depends(get_db)):
    """
    同時更新 PostgreSQL 的使用者名稱與 Redis 快取。
    使用兩段式提交確保兩者要麼同時成功,要麼同時失敗。
    """
    try:
        # 1️⃣ 開啟 DB transaction
        with db.begin() as txn:
            user = db.query(models.User).filter(models.User.id == user_id).first()
            if not user:
                raise HTTPException(status_code=404, detail="User not found")
            user.name = name
            db.add(user)                     # DB 端的變更

            # 2️⃣ 準備 Redis 操作(先寫入暫存 key)
            temp_key = f"temp:user:{user_id}"
            await set_cache(temp_key, name)

            # 3️⃣ 提交 DB transaction 前,檢查 Redis 是否成功
            if not await redis.exists(temp_key):
                raise RuntimeError("Redis write failed")

            # 4️⃣ DB commit 成功後,將暫存 key 換成正式 key
            await redis.rename(temp_key, f"user:{user_id}")

        # 若 DB commit 失敗,with 區塊會自動 rollback,Redis 暫存 key 仍會被刪除
        return {"msg": "Profile updated"}
    except Exception as exc:
        # 若任何一步失敗,清理 Redis 暫存 key
        await redis.delete(temp_key)
        raise HTTPException(status_code=500, detail=str(exc))

重點說明

  • 先在 Redis 寫入暫存 key,確保寫入成功後才正式提交 DB transaction。
  • 若 DB commit 失敗,暫存 key 會在 except 區塊中被刪除,避免「快取與資料庫不一致」的情況。

常見陷阱與最佳實踐

陷阱 說明 解決方案
忘記 await(非同步) 在 async function 中直接呼叫 in_transaction() 而未加 await,會得到 coroutine object 而非實際執行結果。 務必 使用 async withawait,例如 async with in_transaction() as conn:
with db.begin() 之外使用 db.add() 若在 with 區塊外加入資料,會在 transaction 已 commit 後才寫入,導致 資料不一致 所有寫入操作必須 在同一個 transaction 區塊內 完成。
混用同步與非同步 Session 同一請求內同時使用 Session(同步)與 AsyncSession,會產生 死鎖連線衝突 統一 使用同步或非同步的 ORM;若需混用,確保兩者在不同執行緒/協程中。
忽略例外導致未 rollback 捕捉例外後忘記呼叫 session.rollback(),transaction 仍保持開啟狀態。 使用 上下文管理器 (with db.begin():) 或在 except 中手動 rollback()
在多執行緒環境中共用 Session FastAPI 內部使用 非同步,但若自行建立全域 Session 物件,會在多請求間共享,導致 資料污染 每個請求 都透過依賴注入產生 獨立 Session。
忘記 FOR UPDATE(行鎖) 高併發寫入同一筆資料時,未加行鎖會產生 競爭條件(race condition)。 在 SELECT 時加上 FOR UPDATE(或 ORM 等價寫法)以保證排他性。

推薦的最佳實踐

  1. 使用上下文管理器with db.begin():async with in_transaction(): 為最安全的寫法,能自動處理 commit / rollback。
  2. 將 Transaction 放在最外層:在 Service 或 API 層統一控制,避免在 Repository 中散落多個 transaction。
  3. 明確設定 DI 生命週期yield 的依賴函式要在 finally 中關閉連線,防止連線洩漏。
  4. 寫測試:使用 pytest + TestClient,模擬失敗情境(如 raise Exception)驗證資料是否真的回滾。
  5. 監控與日誌:在 transaction 開始、提交、回滾時記錄日誌,便於事後追蹤問題。
import logging
logger = logging.getLogger("app.db")

with db.begin():
    logger.info("Transaction start")
    # ... DB 操作
    logger.info("Transaction commit")
# 若例外
except Exception as e:
    logger.error(f"Transaction rollback: {e}")
    raise

實際應用場景

場景 為何需要 Transaction 範例
訂單系統:新增訂單 + 減少庫存 + 記錄交易日志 必須保證「庫存扣除」與「訂單建立」同步成功,否則會出現「已下單但庫存未扣」的問題。 範例 1、2 中的 order 實作。
金融轉帳:從 A 扣款、向 B 加款 任何一步失敗都會導致資金不平衡。 範例 4 中的 transfer_funds
批次匯入:一次性寫入上千筆資料 若途中發生錯誤,整批資料必須全部回滾,避免半寫入造成資料污染。 使用 session.bulk_save_objects() 包在 with db.begin(): 中。
快取同步:寫入 DB 同時更新 Redis 若只更新 DB 而忘記刷新快取,讀取時會得到舊資料。 範例 5 中的 兩段式提交
多服務協調(微服務) 需要跨服務的最終一致性(例如使用 Saga Pattern),每個服務內部仍需 transaction 保證本地一致性。 在每個微服務內部使用上述 transaction 方法,外層再以訊息佇列協調。

總結

  • Transaction 是保證資料完整性與一致性的核心機制,在 FastAPI 中使用時必須考慮 非同步依賴注入例外處理 三大要點。
  • 透過 上下文管理器with db.begin() / async with in_transaction())可以讓 commit / rollback 自動化,減少程式碼錯誤。
  • 服務層(Service) 統一管理 transaction,讓資料存取層(Repository)保持純粹,提升測試性與可維護性。
  • 常見陷阱包括忘記 await、混用同步/非同步 Session、未加行鎖等,解決方式皆圍繞「明確、統一、可觀測」的原則。
  • 在實務上,從 訂單系統金融轉帳批次匯入快取同步,transaction 都是不可或缺的基礎建設。

掌握了上述概念與實作技巧,你就能在 FastAPI 專案中寫出 安全、可靠且易於維護 的資料庫操作程式碼。祝開發順利,寫出高品質的 API!