FastAPI 依賴注入系統(Dependency Injection)
DI 與快取、連線池整合
簡介
在 FastAPI 中,依賴注入(Dependency Injection, DI)不只是為了把函式參數抽離成可重用的模組,更是打造 高效能、可維護 API 的關鍵。當應用程式需要與資料庫、Redis、外部 API 等資源頻繁互動時,若每一次請求都重新建立連線,將會造成 資源浪費、延遲增加,甚至觸發服務端的連線上限。
透過 DI,我們可以把 快取(Cache)與 連線池(Connection Pool)等資源的建立與釋放交給框架管理,使得:
- 單例資源 只會在應用啟動時建立一次,減少重複開銷。
- 請求範圍(request‑scoped)資源在每個請求結束後自動關閉,避免資源泄漏。
- 測試 時可以輕鬆替換成 mock 物件,提高測試效率與可靠度。
以下將從概念說明、實作範例、常見陷阱與最佳實踐,帶你一步步把快取與連線池整合進 FastAPI 的 DI 系統。
核心概念
1. 依賴注入的三種生命週期
| 生命週期 | 說明 | 典型使用情境 |
|---|---|---|
singleton (全域) |
應用程式啟動時建立一次,整個服務期間共用。 | 資料庫引擎、Redis 連線池、設定檔 |
scoped (請求範圍) |
每一次 HTTP 請求建立一次,請求結束時自動清理。 | 事務 (transaction) 物件、Session、臨時快取 |
transient (每次呼叫) |
每次依賴被注入時都會重新建立。 | 輕量級工具類別、驗證器 |
FastAPI 內建的 Depends 會依照 函式的返回值 來決定生命週期:如果返回的是 單例(例如 engine = create_engine(...)),則會被視為全域;如果返回的是 async with 區塊或 yield 的生成器,則會被當作 請求範圍。
2. 為什麼要把快取與連線池放入 DI?
- 集中管理:所有路由只需要宣告
Depends(get_redis),而不必在每個端點手動呼叫redis = redis_client()。 - 自動關閉:使用
yield的依賴可以在請求結束時自動執行清理程式,避免忘記close()。 - 測試友善:測試時只要提供另一個
Depends(例如get_redis_mock),即可完整替換,不影響原始程式碼。
3. 快取(Cache)與連線池(Connection Pool)的常見選擇
| 類別 | 套件 | 特色 |
|---|---|---|
| Redis(分散式快取) | redis-py、aioredis |
支援非同步、Pub/Sub、TTL |
| Memcached | pymemcache、aiomcache |
輕量、純快取 |
| 資料庫連線池 | SQLAlchemy(create_engine 自帶 pool)databases(非同步) |
支援同步/非同步、池大小調整 |
| HTTP 連線池 | httpx.AsyncClient |
重用 TCP 連線、支援 HTTP/2 |
以下範例將以 SQLAlchemy + Redis 為例,示範如何把它們納入 DI 系統。
程式碼範例
註解:所有範例均使用 Python 3.11、FastAPI 0.110+,並以非同步模式為主。
範例 1️⃣:建立全域的資料庫引擎(singleton)
# db.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
# 建立引擎,同時啟用連線池(pool_size=10, max_overflow=20)
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # 防止 idle 連線被 DB server 關閉
)
# 產生 SessionFactory,使用 async session
AsyncSessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
)
範例 2️⃣:把 Session 包裝成 request‑scoped 依賴(scoped)
# dependencies.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from .db import AsyncSessionLocal
async def get_db() -> AsyncSession:
"""
每一次請求會產生一個獨立的 AsyncSession,
請求結束後自動關閉(release 回連線池)。
"""
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
範例 3️⃣:建立全域的 Redis 連線池(singleton)
# cache.py
import aioredis
from typing import AsyncGenerator
REDIS_URL = "redis://localhost:6379/0"
# 直接在模組層級建立連線池
redis_pool = aioredis.from_url(
REDIS_URL,
encoding="utf-8",
decode_responses=True,
max_connections=50, # 連線池大小
)
async def get_redis() -> aioredis.Redis:
"""
直接回傳已建立好的 Redis 物件。
因為 aioredis 本身已內建連線池,故此依賴為 singleton。
"""
return redis_pool
範例 4️⃣:結合 Cache 與 DB 的混合依賴(實作讀寫快取)
# services.py
from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from .dependencies import get_db
from .cache import get_redis
from .models import User # 假設已定義 ORM Model
CACHE_TTL = 300 # 秒
async def get_user_by_id(
user_id: int,
db: AsyncSession = Depends(get_db),
redis: aioredis.Redis = Depends(get_redis),
):
"""
1. 先嘗試從 Redis 讀取快取
2. 若快取不存在,則從資料庫查詢
3. 查詢成功後寫入 Redis,設定 TTL
"""
cache_key = f"user:{user_id}"
cached = await redis.get(cache_key)
if cached:
# 直接回傳快取的 JSON,省去 DB I/O
return User.parse_raw(cached)
# DB 查詢
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 寫入快取
await redis.set(cache_key, user.json(), ex=CACHE_TTL)
return user
範例 5️⃣:在路由中使用上述依賴(完整示例)
# main.py
from fastapi import FastAPI, Depends, Path
from .services import get_user_by_id
from .models import User
app = FastAPI(title="DI + Cache + Connection Pool Demo")
@app.get("/users/{user_id}", response_model=User)
async def read_user(
user_id: int = Path(..., gt=0),
user: User = Depends(get_user_by_id),
):
"""
只需要把 `Depends(get_user_by_id)` 注入,
內部已自動完成 DB 連線、Redis 快取與釋放流程。
"""
return user
說明:
engine與redis_pool在模組載入時即建立,屬於 singleton。AsyncSession使用async with包裝,屬於 request‑scoped,請求結束自動關閉。get_user_by_id同時依賴兩個資源,示範 多重 DI 的寫法。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解法 / 最佳實踐 |
|---|---|---|
| 把資料庫連線寫成全域變數(非 pool) | 每個請求都共用同一個連線,導致 競爭條件、資料錯亂 | 使用 SQLAlchemy 的 connection pool,或 databases 套件的 Database 物件 |
在依賴內直接 await redis.set(...),但未使用 yield |
產生 singleton 但無法在請求結束時釋放資源(如 close()) |
若需要在請求結束時清理,使用 async def dep() -> AsyncGenerator 並 yield |
| 快取 key 沒有規劃命名空間 | 不同模組的 key 可能衝突,難以除錯 | 統一使用 前綴(如 user:{id}、order:{id}) |
| TTL 設定過長或未設定 | 快取陳舊資料,造成 資料不一致 | 根據資料變動頻率設定合理的 TTL,或在資料變更時主動 invalidate |
| 在測試環境仍使用真實 Redis | 測試不獨立、速度慢、可能污染資料 | 使用 fakeredis 或自行實作 get_redis_mock,在 TestClient 中覆寫依賴 |
其他實用技巧
使用
ContextVar管理 request‑scoped 資源
FastAPI 已內建request.state,但在非路由函式(例如 background tasks)裡,可透過ContextVar傳遞同一個 session。分層封裝
db層只負責 連線池與 Session。repository層負責 CRUD,接受session作為參數。service層負責 快取邏輯,同時注入redis。
這樣的分層讓每個單元都容易測試與替換。
使用
lifespan事件
若需要在應用啟動時初始化資源(例如預熱 Redis 連線),可以在FastAPI的lifespan中完成。
# main.py (lifespan 範例)
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 啟動階段
await redis_pool.ping()
yield
# 關閉階段
await redis_pool.close()
app = FastAPI(lifespan=lifespan)
實際應用場景
| 場景 | 為何需要 DI + Cache + Pool | 典型實作 |
|---|---|---|
| 電商商品查詢 | 商品資訊不常變動,讀取量大,使用 Redis 快取減少 DB 壓力;同時需要 DB 連線池支撐大量同時查詢 | get_product_by_id → 先查 Redis → DB fallback → 寫入快取 |
| 使用者認證 | JWT 驗證後需要查詢 Redis 中的 session,且每個請求都需要 DB 讀寫使用者狀態 | get_current_user 依賴 get_redis + get_db |
| 批次報表產生 | 背景任務需要大量 DB 讀取,若每次都新建連線會耗盡資源;使用 connection pool + Depends 注入的 AsyncSession 可重用 |
@app.on_event("startup") 中預熱連線池;背景任務使用 Depends(get_db) |
| 第三方 API 呼叫 | 需要重用 HTTP 連線(httpx.AsyncClient)以降低 RTT,且需要快取外部 API 回傳結果 | 建立 get_http_client 為 singleton,get_external_data 內部使用 redis 快取結果 |
總結
- 依賴注入 是 FastAPI 的核心設計,讓我們可以把 快取 與 連線池 這類資源的生命週期明確化、集中管理。
- singleton(全域)適合放資料庫引擎、Redis 連線池等不需要每次請求重建的資源;
- request‑scoped(scoped)則適用於需要在每個請求結束時釋放的 Session、事務等。
- 正確的 TTL、命名空間、測試替換,以及 lifespan 初始化,都是避免資源泄漏與效能瓶頸的關鍵。
把上述概念落實在程式碼中,你的 FastAPI 專案將會在 效能、可維護性與可測試性 三方面同時受益。快把這些技巧寫進你的專案基礎設施裡,讓每一次 API 呼叫都能快速、穩定地完成吧!