本文 AI 產出,尚未審核

FastAPI – 請求生命週期(Request Lifecycle)

主題:全域資源建立與清理


簡介

在 Web 框架中,請求生命週期(Request Lifecycle)是每一次 HTTP 請求從進入伺服器、處理、產出回應,到最終關閉連線所經歷的一連串步驟。對於 FastAPI 這類以 ASGI 為基礎的非同步框架而言,正確管理全域資源(例如資料庫連線池、Redis client、外部 API 客戶端)尤為重要。

若資源的 建立清理 沒有遵循統一的生命週期,會導致:

  • 記憶體或連線泄漏 → 服務在高流量下崩潰
  • 初始化成本過高 → 每個請求都重複建立資源,效能急遽下降
  • 測試環境不一致 → 難以重現問題,影響 CI/CD 流程

本篇文章將深入說明 FastAPI 提供的 全域資源建立與清理 機制,從 @app.on_eventlifespan、依賴注入(Dependency Injection)到背景任務(BackgroundTask),一步步帶你在實務專案中安全、有效地管理資源。


核心概念

1. 什麼是「全域資源」?

全域資源指的是 在整個應用程式生命週期內只需要建立一次,且會被多個請求共享的物件。常見例子包括:

資源類型 典型使用情境
資料庫連線池 PostgreSQL、MySQL、MongoDB 等
Redis / Memcached 客戶端 快取、訊息佇列
外部 API 客戶端 第三方支付、OAuth2 Provider
機器學習模型 大型模型載入記憶體後供所有請求使用

這類資源的建立往往耗時且具備「一次就好」的特性,若在每次請求內部重複建立,會嚴重拖慢回應時間。

2. FastAPI 的兩種全域生命週期掛鉤

2.1 @app.on_event

FastAPI 允許在 應用程式啟動 (startup) 與 關閉 (shutdown) 時執行自訂函式:

from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def init_resources():
    # 建立全域資源
    ...

@app.on_event("shutdown")
async def close_resources():
    # 清理全域資源
    ...
  • 優點:語法簡潔、可直接使用非同步函式。
  • 限制:只能註冊一次 startupshutdown,若在大型專案中想把資源分散到不同模組,需要額外的匯入與呼叫順序管理。

2.2 lifespan(FastAPI 0.95+ 推薦)

lifespancontext manager 方式提供更彈性的生命週期控制,支援同步與非同步兩種寫法,且可以在同一個函式內同時處理啟動與關閉邏輯:

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ------ 啟動階段 ------
    print("🚀 應用程式啟動,建立全域資源")
    app.state.db = await create_db_pool()
    app.state.redis = await create_redis_client()
    yield
    # ------ 關閉階段 ------
    print("🛑 應用程式關閉,釋放全域資源")
    await app.state.db.close()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)
  • 優點:所有資源的建立與清理寫在同一個區塊,易於閱讀與維護。
  • 彈性:可以在 yield 前後插入任意程式碼,支援條件式建立、錯誤處理等。

小技巧:若專案同時使用 @app.on_eventlifespan,FastAPI 會先執行 lifespan,再依序呼叫 startup;關閉時則相反。建議在新專案中統一使用 lifespan,避免混用造成不易追蹤的執行順序。

3. 透過 app.state 存取全域資源

FastAPI 內建的 app.state 是一個 任意屬性可寫的容器,適合放置在生命週期中建立的資源:

# 在 startup 時
app.state.db = await asyncpg.create_pool(dsn=DATABASE_URL)

# 在路由或依賴中取得
async def get_db() -> asyncpg.Pool:
    return app.state.db

app.state 的好處是 型別提示友好(使用 IDE 時可自動補全)且不會與路由參數衝突。

4. 依賴注入(Dependency Injection)結合全域資源

FastAPI 的依賴系統允許我們把全域資源封裝成 可呼叫的依賴,讓每個路由只需要 Depends(get_db) 即可取得連線池:

from fastapi import Depends, APIRouter

router = APIRouter()

def get_db_pool() -> asyncpg.Pool:
    # 直接從 app.state 讀取,保持單例
    return app.state.db

@router.get("/users/{user_id}")
async def read_user(user_id: int, db: asyncpg.Pool = Depends(get_db_pool)):
    async with db.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
        return dict(row)

此方式的 好處

  • 測試友好:在單元測試時,只要替換 get_db_pool 為測試用的 mock,就能不啟動完整的資料庫。
  • 解耦:路由不需要知道資源的建立細節,只關心「取得」即可。

5. 背景任務(BackgroundTask)與資源清理

有時候我們需要在回應送出後,執行 非同步的清理工作(例如寫入日誌、關閉臨時檔案、發送通知)。FastAPI 提供 BackgroundTasks 讓你把這類工作排入事件迴圈:

from fastapi import BackgroundTasks

async def close_temp_file(file_path: str):
    # 假設這是一個耗時的 IO 操作
    await asyncio.sleep(0.1)
    os.remove(file_path)

@app.post("/upload")
async def upload_file(file: UploadFile, background: BackgroundTasks):
    temp_path = f"/tmp/{file.filename}"
    async with aiofiles.open(temp_path, "wb") as out_file:
        content = await file.read()
        await out_file.write(content)
    # 回傳成功後,交由背景任務刪除暫存檔
    background.add_task(close_temp_file, temp_path)
    return {"filename": file.filename}

透過背景任務,我們可以 延後資源釋放,避免在回應尚未送出前就關閉連線或檔案。


程式碼範例

以下提供 5 個實務常見的全域資源建立與清理範例,每個範例皆包含完整說明與註解。

範例 1:使用 lifespan 建立 PostgreSQL 連線池

# file: app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg

DATABASE_URL = "postgresql://user:pass@localhost:5432/mydb"

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --------- 啟動階段 ----------
    print("🔧 建立 PostgreSQL 連線池")
    app.state.db = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)
    yield
    # --------- 關閉階段 ----------
    print("🧹 關閉 PostgreSQL 連線池")
    await app.state.db.close()

app = FastAPI(lifespan=lifespan)

# 依賴函式
def get_db() -> asyncpg.Pool:
    return app.state.db

說明min_sizemax_size 控制連線池的上下限,讓程式在高併發時自動擴充。yield 前後分別是建立與釋放資源的時機點。


範例 2:在 startup / shutdown 中建立 Redis 客戶端

# file: app/redis.py
import aioredis
from fastapi import FastAPI

REDIS_URL = "redis://localhost:6379/0"

app = FastAPI()

@app.on_event("startup")
async def redis_startup():
    print("🚀 初始化 Redis 連線")
    app.state.redis = await aioredis.from_url(REDIS_URL, decode_responses=True)

@app.on_event("shutdown")
async def redis_shutdown():
    print("🔚 關閉 Redis 連線")
    await app.state.redis.close()

提示decode_responses=True 讓取得的值自動轉為 str,減少後續手動解碼的麻煩。


範例 3:結合依賴注入與背景任務,安全釋放檔案資源

# file: app/file_api.py
from fastapi import APIRouter, UploadFile, BackgroundTasks, HTTPException
import aiofiles, os, uuid

router = APIRouter(prefix="/files")

async def delete_file(path: str):
    """背景任務:刪除暫存檔"""
    try:
        os.remove(path)
        print(f"🗑️ 已刪除 {path}")
    except FileNotFoundError:
        pass

@router.post("/upload")
async def upload(
    file: UploadFile,
    background: BackgroundTasks,
):
    tmp_name = f"/tmp/{uuid.uuid4()}_{file.filename}"
    async with aiofiles.open(tmp_name, "wb") as out_file:
        content = await file.read()
        await out_file.write(content)

    # 把刪除工作交給背景任務,避免阻塞回應
    background.add_task(delete_file, tmp_name)
    return {"message": "上傳成功", "temp_path": tmp_name}

關鍵:使用 uuid 產生唯一檔名,避免同時上傳時衝突。背景任務保證即使使用者斷線,檔案仍會在稍後被清除。


範例 4:在 lifespan 中載入機器學習模型(大型檔案)

# file: app/ml.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import joblib   # 假設使用 sklearn

MODEL_PATH = "models/price_predictor.pkl"

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("🚀 載入機器學習模型")
    # joblib 本身是同步的,使用 threadpool 以免阻塞事件迴圈
    loop = asyncio.get_event_loop()
    app.state.model = await loop.run_in_executor(None, joblib.load, MODEL_PATH)
    yield
    # 若模型有需要釋放資源(例如 GPU 記憶體),可在此處處理
    print("🧹 釋放模型資源(若有)")

app = FastAPI(lifespan=lifespan)

def get_model():
    return app.state.model

實務建議:模型載入通常耗時較長,建議在容器啟動階段就完成,避免第一次請求被迫等待。


範例 5:使用 Depends 包裝第三方 OAuth2 客戶端,並在關閉時撤銷 token

# file: app/oauth.py
from fastapi import Depends, FastAPI
import httpx, asyncio

TOKEN_URL = "https://auth.example.com/token"
REVOKE_URL = "https://auth.example.com/revoke"
CLIENT_ID = "my-client-id"
CLIENT_SECRET = "my-secret"

app = FastAPI()
app.state.http_client = httpx.AsyncClient()

async def get_token() -> str:
    """取得一次性的 access token,存於 app.state 中"""
    if not hasattr(app.state, "access_token"):
        resp = await app.state.http_client.post(
            TOKEN_URL,
            data={"grant_type": "client_credentials"},
            auth=(CLIENT_ID, CLIENT_SECRET),
        )
        resp.raise_for_status()
        app.state.access_token = resp.json()["access_token"]
    return app.state.access_token

async def revoke_token():
    """在應用關閉時撤銷 token,避免浪費授權額度"""
    token = getattr(app.state, "access_token", None)
    if token:
        await app.state.http_client.post(REVOKE_URL, data={"token": token})

@app.on_event("shutdown")
async def on_shutdown():
    await revoke_token()
    await app.state.http_client.aclose()

要點httpx.AsyncClient 也是全域資源,必須在 shutdown 時呼叫 aclose() 釋放底層連線池。撤銷 token 可以降低安全風險與成本。


常見陷阱與最佳實踐

陷阱 說明 解決方案
資源建立放在路由函式內 每一次請求都會重新建立連線或模型,導致延遲與資源耗盡。 使用 lifespan / @app.on_event 只在應用啟動時建立一次。
忘記在 shutdown 呼叫 close() 連線池、HTTP client、Redis client 等不會自動釋放,容器關閉時會留下「僵屍」連線。 確保每個 startup 對應一個 shutdown,可在 IDE 中設定 lint 規則。
在非同步環境使用同步程式庫 psycopg2requests 會阻塞事件迴圈,影響整體效能。 改用 asyncpghttpx.AsyncClient,或使用 run_in_executor 包裝同步呼叫。
依賴函式直接返回資源的實體 測試時難以替換成 mock,導致測試不獨立。 使用 Depends 把資源包裝成可注入的函式,測試時僅替換這個函式。
背景任務中拋出未捕獲例外 例外不會回傳給使用者,且可能導致任務中斷,資源未清理。 在背景任務內部使用 try/except,或利用 logging.exception 記錄錯誤。

最佳實踐總結

  1. 統一使用 lifespan:新專案建議只使用 lifespan,保持建立與清理的對稱性。
  2. 把所有全域資源放入 app.state:讓程式碼在任何地方都能透過 app.state 取得,同時保持型別安全。
  3. 依賴注入是橋樑:透過 Dependsapp.state 包裝成可測試的函式。
  4. 非同步優先:盡量選擇支援 ASGI 的非同步套件;若必須使用同步套件,記得使用 run_in_executor
  5. 資源清理不可忽視:在 shutdown 階段務必關閉所有連線池、客戶端、檔案描述子,避免容器重啟時的「端口被占用」或「記憶體泄漏」。

實際應用場景

場景 1:大型電商平台的訂單服務

  • 需求:同時處理數千筆每秒的訂單寫入、庫存檢查與快取更新。
  • 解法
    • 使用 lifespan 建立 PostgreSQL 連線池max_size=50)與 Redis 客戶端
    • 透過依賴注入把 dbredis 注入到每個訂單路由。
    • 在訂單建立成功後,使用 BackgroundTasks發送確認郵件寫入審計日誌 放到背景任務,避免阻塞主要交易流程。

堆景 2:機器學習模型 API

  • 需求:提供即時預測服務,模型文件大小 500MB,且每次推論需 10ms 內返回。
  • 解法
    • lifespan載入模型(使用 joblibtorch.load),放入 app.state.model
    • 每個預測路由透過 Depends(get_model) 取得模型實例,直接呼叫 model.predict(...)
    • 若模型使用 GPU,必須在 shutdown 時呼叫 torch.cuda.empty_cache() 釋放顯示卡記憶體。

場景 3:多租戶 SaaS 平台的動態資料庫

  • 需求:每個租戶都有自己的資料庫,連線資訊在請求的 JWT 內。
  • 解法
    • startup建立全域的資料庫連線池工廠(例如 asyncpg.create_pool 的 factory),但不立即連接任何租戶。
    • 使用 依賴注入 讀取 JWT,根據租戶 ID 動態取得或建立對應的連線池(使用 lru_cache 快取 pool 實例)。
    • shutdown 時遍歷所有已建立的 pool,逐一關閉,避免遺漏。

總結

全域資源的 建立與清理 是 FastAPI 請求生命週期中不可或缺的環節。透過 lifespan@app.on_event,我們可以在應用程式啟動時一次性完成耗時的資源初始化,並在關閉時安全釋放,避免記憶體泄漏與連線枯竭。

結合 app.state依賴注入 以及 背景任務,不僅讓程式碼保持乾淨、可測試,也能在高併發環境中維持卓越效能。掌握這些概念,你就能在實務專案中:

  • 快速建立 資料庫、快取、第三方 API 等全域資源
  • 以最小的代價支援 大量同時請求
  • 在容器化部署或無伺服器環境中,確保資源能在 啟動 / 關閉 時正確對應

最後,別忘了在 測試 階段同樣模擬 lifespan 的行為,使用 TestClientAsyncClient 時手動呼叫 startup / shutdown,才能保證測試環境與正式環境行為一致。祝你在 FastAPI 的開發旅程中順利打造高效、可維護的服務! 🚀