FastAPI 資料庫整合 – Transaction 處理
簡介
在 Web API 開發中,資料一致性與錯誤恢復是不可或缺的需求。
即使是最簡單的 CRUD 操作,只要涉及多筆資料或跨表寫入,就必須確保「要麼全成功,要麼全失敗」的原子性,這正是 transaction(交易)所提供的保證。
FastAPI 本身是一個非同步(async)框架,搭配 SQLAlchemy、Tortoise‑ORM、Gino 等 ORM 時,如何正確使用 transaction 成為開發者必須掌握的技巧。
本篇文章將從概念說明、實作範例、常見陷阱到最佳實踐,帶你一步步在 FastAPI 中安全、有效地使用 transaction。
核心概念
1. Transaction 是什麼?
Transaction 是一組 原子操作(Atomic Operations),具備四大特性(ACID):
| 特性 | 說明 |
|---|---|
| Atomicity(原子性) | 所有操作要麼全部成功,要麼全部失敗,不能只完成部份。 |
| Consistency(一致性) | 完成交易後,資料庫必須處於一致的狀態。 |
| Isolation(隔離性) | 同時執行的交易彼此不會互相干擾。 |
| Durability(永久性) | 成功提交的交易即使系統崩潰也不會遺失。 |
在 FastAPI 中,當我們在同一個請求內同時寫入多張表、更新庫存、產生帳單等,若其中任一步驟失敗,就必須回滾(rollback)已經完成的變更,避免產生「半完成」的資料。
2. 為什麼在 FastAPI 要特別注意?
非同步執行
FastAPI 支援async def,但大多數 ORM(如 SQLAlchemy)在非同步模式下仍使用 同步的資料庫連線池。若在同一個 request 中混用同步與非同步的 DB 呼叫,容易造成 資料庫鎖死(deadlock)或 連線洩漏。依賴注入(Dependency Injection)
FastAPI 推薦使用依賴注入(DI)來提供Session(或Connection)給路由函式。若 DI 的生命週期管理不當,transaction 可能在請求結束前就被自動關閉,導致 "Session is closed" 錯誤。例外處理
FastAPI 會自動將未捕獲的例外轉成 HTTP 500 回應。若在 transaction 內拋出例外而未正確rollback(),資料庫會保持「未提交」狀態,造成資源浪費。
3. 常見的 Transaction 實作方式
| ORM / Driver | 同步/非同步 | Transaction 實作方式 |
|---|---|---|
| SQLAlchemy (Core/ORM) | 同步 ( async 需使用 AsyncSession) |
session.begin(), session.commit(), session.rollback() |
| Tortoise‑ORM | 完全非同步 | async with in_transaction() as conn: |
| Gino | 非同步(基於 asyncpg) | async with db.acquire() as conn: + await conn.transaction() |
| Databases (純 async driver) | 非同步 | async with database.transaction(): |
以下將以 SQLAlchemy(同步)與 Tortoise‑ORM(非同步)兩種最常見的組合為例,示範如何在 FastAPI 中正確使用 transaction。
程式碼範例
註:範例皆以 Python 為主,使用 ````python` 標記語法。
範例 1️⃣:同步 SQLAlchemy + FastAPI 的基本 Transaction
# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# models.py
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from .db import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
orders = relationship("Order", back_populates="owner")
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
item = Column(String, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="orders")
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from . import db, models
app = FastAPI()
# 建立資料表(僅示範用,正式環境請使用 Alembic)
models.Base.metadata.create_all(bind=db.engine)
def get_db():
"""FastAPI DI:提供一次請求的 Session 實例"""
db_session = db.SessionLocal()
try:
yield db_session
finally:
db_session.close()
@app.post("/users/{user_id}/order")
def create_order(user_id: int, item: str, db: Session = Depends(get_db)):
"""
為指定使用者新增訂單。若使用者不存在,或寫入失敗,全部回滾。
"""
try:
# 1️⃣ 開啟 transaction
with db.begin():
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 2️⃣ 建立 Order 物件
new_order = models.Order(item=item, owner=user)
db.add(new_order)
# 3️⃣ commit 由 with db.begin() 自動完成
return {"order_id": new_order.id, "item": new_order.item}
except Exception as e:
# 例外會自動觸發 rollback
raise HTTPException(status_code=500, detail=str(e))
重點說明
- 使用
with db.begin():可確保 自動 commit 或 自動 rollback(發生例外時)。 get_db依賴注入保證每個請求都有獨立的 Session,避免跨請求共享。
範例 2️⃣:非同步 Tortoise‑ORM + FastAPI 的 Transaction
# tortoise_config.py
TORTOISE_ORM = {
"connections": {"default": "sqlite://db.sqlite3"},
"apps": {
"models": {
"models": ["__main__"], # 直接在同一檔案定義模型
"default_connection": "default",
}
},
}
# models.py
from tortoise import fields, models
class Customer(models.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
class Invoice(models.Model):
id = fields.IntField(pk=True)
amount = fields.DecimalField(max_digits=10, decimal_places=2)
customer = fields.ForeignKeyField("models.Customer", related_name="invoices")
# main.py
import uvicorn
from fastapi import FastAPI, HTTPException
from tortoise.contrib.fastapi import register_tortoise
from tortoise.transactions import in_transaction
from .models import Customer, Invoice
from .tortoise_config import TORTOISE_ORM
app = FastAPI()
register_tortoise(app, config=TORTOISE_ORM, generate_schemas=True)
@app.post("/customers/{cust_id}/invoice")
async def create_invoice(cust_id: int, amount: float):
"""
為指定客戶新增發票。若客戶不存在或寫入失敗,所有變更會回滾。
"""
async with in_transaction() as conn: # ① 開啟 transaction
customer = await Customer.filter(id=cust_id).using(conn).first()
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
invoice = await Invoice.create(
amount=amount,
customer=customer,
using=conn # ② 確保使用同一個 transaction
)
# ③ 若此區塊內拋出例外,conn 會自動 rollback
return {"invoice_id": invoice.id, "amount": str(invoice.amount)}
重點說明
in_transaction()為 非同步上下文管理器,支援自動 rollback。- 必須使用
using=conn讓所有查詢都走同一個 transaction 連線,否則會產生 跨 transaction 的問題。
範例 3️⃣:多層 Service 與 Repository,Transaction 只在最外層控制
在實務專案中,我們常把資料庫操作封裝成 Repository,業務邏輯放在 Service,而 API 層只負責請求/回應。
下面示範如何在 Service 層統一管理 transaction,讓 Repository 完全不感知 transaction 的細節。
# repository.py
from sqlalchemy.orm import Session
from . import models
class UserRepo:
@staticmethod
def get_by_id(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
@staticmethod
def add_order(db: Session, user: models.User, item: str):
order = models.Order(item=item, owner=user)
db.add(order)
return order
# service.py
from sqlalchemy.orm import Session
from fastapi import HTTPException
from . import repository
class OrderService:
@staticmethod
def create_order(db: Session, user_id: int, item: str):
"""
只在此方法裡開啟 transaction,確保所有 repo 操作同時成功或失敗。
"""
try:
with db.begin():
user = repository.UserRepo.get_by_id(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
order = repository.UserRepo.add_order(db, user, item)
# transaction 已自動 commit
return order
except Exception as exc:
# 例外會觸發 rollback,並向上拋出
raise HTTPException(status_code=500, detail=str(exc))
# main.py
from fastapi import FastAPI, Depends
from .db import get_db
from .service import OrderService
app = FastAPI()
@app.post("/order")
def api_create_order(user_id: int, item: str, db: Session = Depends(get_db)):
order = OrderService.create_order(db, user_id, item)
return {"order_id": order.id, "item": order.item}
重點說明
- Transaction 只在 Service 層,讓 Repository 保持「純粹」的 CRUD 操作。
- 這樣的設計提升 可測試性:在單元測試時可以直接呼叫 Repository 而不必擔心 transaction。
範例 4️⃣:使用 databases 套件的純 async Transaction(適合 FastAPI + PostgreSQL)
# db_async.py
from databases import Database
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/testdb"
database = Database(DATABASE_URL)
async def get_database():
"""FastAPI DI – 每次請求取得同一個 Database 物件"""
if not database.is_connected:
await database.connect()
return database
# main.py
from fastapi import FastAPI, Depends, HTTPException
from .db_async import get_database, database
from pydantic import BaseModel
app = FastAPI()
class Transfer(BaseModel):
from_account: int
to_account: int
amount: float
@app.post("/transfer")
async def transfer_funds(payload: Transfer, db: Database = Depends(get_database)):
"""
銀行轉帳範例:兩筆扣/加帳務必在同一 transaction 中完成。
"""
async with db.transaction() as conn: # ① 開啟 transaction
# ② 讀取來源帳戶餘額
query = "SELECT balance FROM accounts WHERE id = :id FOR UPDATE"
from_balance = await conn.fetch_one(query, values={"id": payload.from_account})
if not from_balance or from_balance["balance"] < payload.amount:
raise HTTPException(status_code=400, detail="Insufficient funds")
# ③ 扣除來源帳戶
await conn.execute(
"UPDATE accounts SET balance = balance - :amt WHERE id = :id",
values={"amt": payload.amount, "id": payload.from_account},
)
# ④ 加到目標帳戶
await conn.execute(
"UPDATE accounts SET balance = balance + :amt WHERE id = :id",
values={"amt": payload.amount, "id": payload.to_account},
)
# 若此區塊內任意一步拋出例外,conn 會自動 rollback
return {"status": "success"}
重點說明
FOR UPDATE鎖住被讀取的列,避免同時多個請求競爭導致 超賣(oversell)。async with db.transaction()同樣提供 自動 commit / rollback 機制。
範例 5️⃣:跨資料庫(PostgreSQL + Redis)結合 Transaction(兩段式提交)
雖然大多數應用只需要單一資料庫的 transaction,但有時會同時寫入 關係型資料庫 與 快取系統(如 Redis)。以下示範如何使用 兩段式提交(Two‑Phase Commit, 2PC),確保兩者一致性。
# redis_client.py
import aioredis
redis = aioredis.from_url("redis://localhost", decode_responses=True)
async def set_cache(key: str, value: str):
await redis.set(key, value)
# main.py
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from .db import get_db
from .redis_client import set_cache, redis
from . import models
app = FastAPI()
@app.post("/users/{user_id}/profile")
async def update_profile(user_id: int, name: str, db: Session = Depends(get_db)):
"""
同時更新 PostgreSQL 的使用者名稱與 Redis 快取。
使用兩段式提交確保兩者要麼同時成功,要麼同時失敗。
"""
try:
# 1️⃣ 開啟 DB transaction
with db.begin() as txn:
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.name = name
db.add(user) # DB 端的變更
# 2️⃣ 準備 Redis 操作(先寫入暫存 key)
temp_key = f"temp:user:{user_id}"
await set_cache(temp_key, name)
# 3️⃣ 提交 DB transaction 前,檢查 Redis 是否成功
if not await redis.exists(temp_key):
raise RuntimeError("Redis write failed")
# 4️⃣ DB commit 成功後,將暫存 key 換成正式 key
await redis.rename(temp_key, f"user:{user_id}")
# 若 DB commit 失敗,with 區塊會自動 rollback,Redis 暫存 key 仍會被刪除
return {"msg": "Profile updated"}
except Exception as exc:
# 若任何一步失敗,清理 Redis 暫存 key
await redis.delete(temp_key)
raise HTTPException(status_code=500, detail=str(exc))
重點說明
- 先在 Redis 寫入暫存 key,確保寫入成功後才正式提交 DB transaction。
- 若 DB commit 失敗,暫存 key 會在
except區塊中被刪除,避免「快取與資料庫不一致」的情況。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方案 |
|---|---|---|
忘記 await(非同步) |
在 async function 中直接呼叫 in_transaction() 而未加 await,會得到 coroutine object 而非實際執行結果。 |
務必 使用 async with 或 await,例如 async with in_transaction() as conn: |
在 with db.begin() 之外使用 db.add() |
若在 with 區塊外加入資料,會在 transaction 已 commit 後才寫入,導致 資料不一致。 |
所有寫入操作必須 在同一個 transaction 區塊內 完成。 |
| 混用同步與非同步 Session | 同一請求內同時使用 Session(同步)與 AsyncSession,會產生 死鎖 或 連線衝突。 |
統一 使用同步或非同步的 ORM;若需混用,確保兩者在不同執行緒/協程中。 |
| 忽略例外導致未 rollback | 捕捉例外後忘記呼叫 session.rollback(),transaction 仍保持開啟狀態。 |
使用 上下文管理器 (with db.begin():) 或在 except 中手動 rollback()。 |
| 在多執行緒環境中共用 Session | FastAPI 內部使用 非同步,但若自行建立全域 Session 物件,會在多請求間共享,導致 資料污染。 | 每個請求 都透過依賴注入產生 獨立 Session。 |
忘記 FOR UPDATE(行鎖) |
高併發寫入同一筆資料時,未加行鎖會產生 競爭條件(race condition)。 | 在 SELECT 時加上 FOR UPDATE(或 ORM 等價寫法)以保證排他性。 |
推薦的最佳實踐
- 使用上下文管理器:
with db.begin():、async with in_transaction():為最安全的寫法,能自動處理 commit / rollback。 - 將 Transaction 放在最外層:在 Service 或 API 層統一控制,避免在 Repository 中散落多個 transaction。
- 明確設定 DI 生命週期:
yield的依賴函式要在finally中關閉連線,防止連線洩漏。 - 寫測試:使用 pytest + TestClient,模擬失敗情境(如 raise Exception)驗證資料是否真的回滾。
- 監控與日誌:在 transaction 開始、提交、回滾時記錄日誌,便於事後追蹤問題。
import logging
logger = logging.getLogger("app.db")
with db.begin():
logger.info("Transaction start")
# ... DB 操作
logger.info("Transaction commit")
# 若例外
except Exception as e:
logger.error(f"Transaction rollback: {e}")
raise
實際應用場景
| 場景 | 為何需要 Transaction | 範例 |
|---|---|---|
| 訂單系統:新增訂單 + 減少庫存 + 記錄交易日志 | 必須保證「庫存扣除」與「訂單建立」同步成功,否則會出現「已下單但庫存未扣」的問題。 | 範例 1、2 中的 order 實作。 |
| 金融轉帳:從 A 扣款、向 B 加款 | 任何一步失敗都會導致資金不平衡。 | 範例 4 中的 transfer_funds。 |
| 批次匯入:一次性寫入上千筆資料 | 若途中發生錯誤,整批資料必須全部回滾,避免半寫入造成資料污染。 | 使用 session.bulk_save_objects() 包在 with db.begin(): 中。 |
| 快取同步:寫入 DB 同時更新 Redis | 若只更新 DB 而忘記刷新快取,讀取時會得到舊資料。 | 範例 5 中的 兩段式提交。 |
| 多服務協調(微服務) | 需要跨服務的最終一致性(例如使用 Saga Pattern),每個服務內部仍需 transaction 保證本地一致性。 | 在每個微服務內部使用上述 transaction 方法,外層再以訊息佇列協調。 |
總結
- Transaction 是保證資料完整性與一致性的核心機制,在 FastAPI 中使用時必須考慮 非同步、依賴注入 與 例外處理 三大要點。
- 透過 上下文管理器(
with db.begin()/async with in_transaction())可以讓 commit / rollback 自動化,減少程式碼錯誤。 - 服務層(Service) 統一管理 transaction,讓資料存取層(Repository)保持純粹,提升測試性與可維護性。
- 常見陷阱包括忘記
await、混用同步/非同步 Session、未加行鎖等,解決方式皆圍繞「明確、統一、可觀測」的原則。 - 在實務上,從 訂單系統、金融轉帳、批次匯入 到 快取同步,transaction 都是不可或缺的基礎建設。
掌握了上述概念與實作技巧,你就能在 FastAPI 專案中寫出 安全、可靠且易於維護 的資料庫操作程式碼。祝開發順利,寫出高品質的 API!