本文 AI 產出,尚未審核

FastAPI 請求生命週期:舊式 on_event("startup") vs 新式 lifespan 裝飾器


簡介

FastAPI 中,應用程式的啟動與關閉階段常常需要執行一些前置或清理工作,例如連線資料庫、初始化快取、設定日誌等。過去我們會藉由 app.on_event("startup")app.on_event("shutdown") 兩個事件來完成這些任務,寫法簡單卻有一些限制——例如 無法保證兩個事件的執行順序無法在同一個函式內同時處理啟動與關閉,而且在 非同步環境 中,錯誤處理與資源釋放也較為繁瑣。

FastAPI 2.0 之後引入了 lifespan 裝飾器(或 lifespan 參數),提供了一個 單一入口 來管理應用的整個生命週期。它以 async generator 的形式呈現,讓開發者能在同一個程式碼區塊內 同步寫出啟動與關閉的邏輯,同時支援 同步與非同步 兩種模式,提升可讀性與錯誤處理的可控性。

本篇文章將深入比較這兩種做法,說明它們的底層原理、使用方式、常見陷阱與最佳實踐,並提供 3~5 個實作範例,幫助你在日常開發中選擇最適合的方案。


核心概念

1. 為什麼需要生命週期管理?

  • 資源初始化:資料庫、Redis、外部 API 客戶端等,都需要在第一個請求到來前建立連線。
  • 資源釋放:應用關閉時必須正確關閉連線、寫入最後的日誌、釋放記憶體。
  • 全域設定:例如載入機器學習模型、讀取大型設定檔,這類工作只需執行一次。

重點:若這些工作寫在每個路由函式裡,會造成 效能瓶頸資源浪費

2. on_event("startup") / on_event("shutdown") 的運作方式

from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    # 這裡放置啟動時要執行的程式碼
    ...

@app.on_event("shutdown")
async def shutdown_event():
    # 這裡放置關閉時要執行的程式碼
    ...
  • 註冊方式:透過 app.on_event(event_name) 裝飾器分別註冊兩個獨立的函式。
  • 執行順序:FastAPI 會先執行所有 startup 事件,再開始接受請求;關閉時則相反。
  • 限制
    • 無法保證多個 startup 之間的執行順序(除非自行編寫依賴)。
    • 啟動與關閉的程式碼分散在兩個地方,不易維護。
    • 若在 startup 中拋出例外,整個應用會直接失敗,但錯誤資訊可能不完整。

3. lifespan 裝飾器的設計理念

from fastapi import FastAPI

app = FastAPI()

@app.lifespan
async def lifespan(app: FastAPI):
    # ── 啟動階段 ──
    # 可以在此執行同步或非同步的初始化工作
    await init_db()
    cache = await create_redis_pool()
    app.state.cache = cache   # 把資源掛到 app.state,供路由使用

    yield  # <-- 這裡交回控制權,讓應用開始處理請求

    # ── 關閉階段 ──
    # 在此釋放資源、寫入日誌等
    await cache.close()
    await close_db()
  • 單一 async generatoryield 前的程式碼視為 啟動yield 後的程式碼視為 關閉
  • 上下文管理:類似 with 語句,保證 無論發生什麼錯誤,關閉階段都會被執行
  • 支援同步:若使用 def lifespan(app):(非 async),仍可正常運作,只是只能執行同步程式碼。
  • 更好的錯誤捕捉:在 yield 前拋出的例外會被 FastAPI 捕獲,並在關閉階段仍有機會執行清理工作。

4. lifespanon_event 的差異圖

特性 on_event lifespan
程式結構 兩個獨立函式(startup / shutdown) 單一 async generator
執行順序保證 無法保證多個 startup 的順序 前後順序固定(啟動 → yield → 關閉)
錯誤處理 啟動階段例外會直接中斷,關閉不一定執行 yield 前例外仍會觸發關閉階段
資源共享 需要透過 app.state 或全域變數 同樣使用 app.state,但在同一函式內更直觀
支援同步/非同步 只能使用 async def(非同步) 同時支援同步與非同步
可讀性 分散、易遺漏 集中、易維護

程式碼範例

以下示範 3 個實務常見情境,分別用舊式 on_event 與新式 lifespan 實作,讓你能直接對照差異。

範例 1:資料庫連線池的建立與關閉

1. 使用 on_event

# file: app_on_event.py
from fastapi import FastAPI
import asyncpg

app = FastAPI()
DATABASE_URL = "postgresql://user:pwd@localhost/dbname"

@app.on_event("startup")
async def startup_db():
    """啟動時建立資料庫連線池"""
    app.state.pool = await asyncpg.create_pool(DATABASE_URL)
    print("✅ DB pool created")

@app.on_event("shutdown")
async def shutdown_db():
    """關閉時釋放連線池"""
    await app.state.pool.close()
    print("🔚 DB pool closed")

說明app.state.pool 讓所有路由都能共享同一個連線池。但若在 startup_db 發生例外,shutdown_db 可能不會被呼叫,導致資源遺漏。

2. 使用 lifespan

# file: app_lifespan.py
from fastapi import FastAPI
import asyncpg

app = FastAPI()
DATABASE_URL = "postgresql://user:pwd@localhost/dbname"

@app.lifespan
async def db_lifespan(app: FastAPI):
    # ── 啟動階段 ──
    app.state.pool = await asyncpg.create_pool(DATABASE_URL)
    print("✅ DB pool created")
    yield
    # ── 關閉階段 ──
    await app.state.pool.close()
    print("🔚 DB pool closed")

優點:即使 create_pool 拋出例外,yield 之後的清理程式仍會被呼叫,確保不會留下未關閉的連線。


範例 2:載入大型機器學習模型(同步 I/O)

1. on_event(同步寫法)

import joblib
from fastapi import FastAPI

app = FastAPI()
MODEL_PATH = "model.pkl"

@app.on_event("startup")
def load_model():
    """同步載入模型,會阻塞主執行緒"""
    app.state.model = joblib.load(MODEL_PATH)
    print("🤖 Model loaded")

@app.on_event("shutdown")
def unload_model():
    """釋放模型資源(若有)"""
    del app.state.model
    print("🗑 Model unloaded")

缺點load_model 為同步阻塞,若模型檔案很大會延遲服務啟動,且無法使用 await

2. lifespan(支援同步/非同步混合)

import joblib
from fastapi import FastAPI

app = FastAPI()
MODEL_PATH = "model.pkl"

@app.lifespan
def model_lifespan(app: FastAPI):
    # ── 啟動階段(同步) ──
    app.state.model = joblib.load(MODEL_PATH)
    print("🤖 Model loaded")
    yield
    # ── 關閉階段(同步) ──
    del app.state.model
    print("🗑 Model unloaded")

好處:即使使用同步 I/O,也能在同一個函式內清楚看到「載入 → yield → 卸載」的完整流程,避免忘記寫關閉程式。


範例 3:結合多個資源(DB + Redis + 設定檔)並使用依賴注入

# file: app_complex.py
from fastapi import FastAPI, Depends, Request
import asyncpg, aioredis, json

app = FastAPI()
DB_URL = "postgresql://user:pwd@localhost/db"
REDIS_URL = "redis://localhost"
CONFIG_PATH = "config.json"

# -------- Lifespan 內部設定 --------
@app.lifespan
async def complex_lifespan(app: FastAPI):
    # 啟動階段
    app.state.db_pool = await asyncpg.create_pool(DB_URL)
    app.state.redis = await aioredis.from_url(REDIS_URL)
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        app.state.config = json.load(f)
    print("🚀 All resources ready")
    yield
    # 關閉階段
    await app.state.db_pool.close()
    await app.state.redis.close()
    print("🔚 All resources released")

# -------- 依賴注入範例 --------
async def get_db(request: Request):
    """從 request.app.state 取得 DB pool"""
    return request.app.state.db_pool

async def get_redis(request: Request):
    return request.app.state.redis

def get_config(request: Request):
    return request.app.state.config

@app.get("/items/{item_id}")
async def read_item(
    item_id: int,
    db_pool = Depends(get_db),
    redis = Depends(get_redis),
    config = Depends(get_config),
):
    # 使用 db_pool、redis、config 做任何業務邏輯
    async with db_pool.acquire() as conn:
        item = await conn.fetchrow("SELECT * FROM items WHERE id=$1", item_id)
    cached = await redis.get(f"item:{item_id}")
    return {
        "item": dict(item) if item else None,
        "cached": cached,
        "feature_flag": config.get("feature_flag", False),
    }

重點說明

  1. 所有資源在 lifespan 中一次性建立,避免在每個路由內重複建立連線。
  2. 使用 request.app.state 作為全域共享容器,配合 FastAPI 的 依賴注入系統 (Depends) 讓路由保持乾淨、易測試。
  3. yield 前後的程式碼 完全對稱,保證即使在啟動階段發生例外,仍會嘗試關閉已成功建立的資源。

常見陷阱與最佳實踐

陷阱 可能的後果 解決方案
startup 中使用阻塞 I/O 服務啟動緩慢、容器健康檢查失敗 改用非同步庫或將阻塞工作搬到 ThreadPoolExecutor
忘記在 shutdown 釋放資源 記憶體洩漏、資料庫連線耗盡 使用 lifespan,確保 yield 後的程式碼一定會執行
多個 on_event("startup") 的執行順序不確定 某些資源依賴未先建立,導致錯誤 若必須依序執行,改用 lifespan 或在單一 startup 中手動排序
lifespan 中寫入大量同步程式 仍會阻塞事件迴圈 使用 run_in_threadpoolfrom fastapi.concurrency import run_in_threadpool)將同步工作移至執行緒池
直接在 app.state 放置大量資料 可能導致記憶體過度使用 僅放置必要的「連線、設定、快取」等輕量物件,其他大型資料可考慮放在外部服務(如 Redis)

最佳實踐

  1. 優先使用 lifespan:除非你必須兼容非常舊的 FastAPI 版本(< 0.78),否則 lifespan 提供更安全、可讀的生命週期管理。
  2. 資源放在 app.state:這是 FastAPI 官方推薦的全域共享方式,且在 lifespan 中直接掛載最直觀。
  3. 錯誤要明確捕捉:在 lifespan 中,建議使用 try / finally 包住 yield,確保即使 yield 前拋出例外,仍能執行關閉程式。
    @app.lifespan
    async def safe_lifespan(app: FastAPI):
        try:
            app.state.db = await asyncpg.create_pool(DB_URL)
            yield
        finally:
            await app.state.db.close()
    
  4. 測試生命週期:利用 TestClient 可以模擬應用的啟動與關閉,確保 lifespan 內的資源正確釋放。
    from fastapi.testclient import TestClient
    client = TestClient(app)
    # 在 client 產生期間,lifespan 已啟動
    response = client.get("/ping")
    # client 關閉時,lifespan 的關閉階段會自動執行
    

實際應用場景

場景 為何適合使用 lifespan
微服務容器化(Docker/Kubernetes) 容器啟動時需要一次性連線多個外部服務,關閉時保證釋放,避免 POD 重啟時留下殘留連線
機器學習服務(載入大型模型) 模型載入通常是同步且耗時,lifespan 能把「載入 → yield → 卸載」寫在同一檔案,維護更簡單
多租戶 SaaS(每個租戶有獨立 DB) lifespan 中建立租戶資料庫池,並在關閉時統一清理,避免在每個路由內重複建立
高可用 API Gateway 需要在啟動時預熱路由、載入快取策略,關閉時寫入統計資料,lifespan 讓這些前後置作業自然串接
測試環境自動化 測試框架可透過 lifespan 直接注入測試資料庫、Mock 服務,測試完成後自動回收資源

總結

  • on_event("startup") / on_event("shutdown") 是 FastAPI 早期提供的生命週期管理方式,使用簡單但容易產生 執行順序不確定錯誤處理不完整 的問題。
  • lifespan 裝飾器async generator 為核心概念,讓 啟動關閉的程式碼寫在同一個區塊,保證 前後對稱錯誤安全,同時支援同步與非同步兩種工作負載。
  • 在實務開發中,建議將所有全域資源(資料庫、快取、設定、模型)集中於 lifespan,再透過 app.state 配合 FastAPI 的 依賴注入,讓路由保持乾淨、易測試。
  • 請務必留意 阻塞 I/O資源釋放 以及 測試 三大要點,避免因生命週期管理不當造成服務不穩或資源洩漏。

透過本文的比較與範例,你應該已經能夠自信地在新專案或既有專案中 on_event 轉移到 lifespan,讓 FastAPI 應用的啟動與關閉更安全、更可維護。祝開發順利 🚀!