本文 AI 產出,尚未審核

FastAPI – 效能與最佳化

主題:async 效能瓶頸分析


簡介

FastAPI 中,使用 async / await 可以讓 API 處理大量同時請求時保持高併發、低資源佔用,這也是 FastAPI 相較於傳統同步框架(如 Flask)最吸引人的特點之一。然而,async 本身並不是萬能的藥丸,若使用不當,反而會成為效能的瓶頸

本篇文章將從概念、實作、常見陷阱與最佳實踐四個面向,深入探討 async 效能瓶頸 的根源,並提供可直接套用在實務專案中的程式碼範例,幫助初學者到中級開發者在開發 FastAPI 應用時,既能保留非同步的優勢,又能避免常見的效能踩雷。


核心概念

1. 為什麼需要 async

  • IO‑bound 與 CPU‑bound

    • IO‑bound(例如資料庫、外部 API、檔案 I/O)會因等待外部資源而阻塞執行緒。使用 async 可以在等待期間釋放執行緒,讓其他請求繼續執行。
    • CPU‑bound(大量計算)則不適合交給 async,因為 Python 的協程仍在同一個執行緒中運行,計算密集的工作會阻塞整個事件迴圈。
  • 事件迴圈(Event Loop)
    FastAPI 建立在 Starlette 之上,而 Starlette 使用 uvicorn(或 hypercorn)作為 ASGI 伺服器,底層是 asyncio 事件迴圈。所有 async def 的路由都會被事件迴圈調度。

重點:只有在 等待 I/O 時才能真正釋放 CPU,否則 await 只會把同一個執行緒卡住。

2. await 的成本

每一次 await 都會觸發一次 協程切換(context switch),這在 C 語言層級的切換成本極低,但在 Python 中仍會產生:

  • 堆疊保存與恢復:協程的局部變數、指令指標需要保存與恢復。
  • 事件迴圈的排程:若大量小碎片的 await(如每 1ms await asyncio.sleep(0)),會造成事件迴圈頻繁喚醒,降低吞吐量。

實務建議:盡量把同類型的 I/O 合併,減少不必要的 await,尤其是同步阻塞的呼叫被包在 await 中時,會直接把執行緒卡死。

3. 同步函式(Blocking)與協程的混用

def sync_heavy():
    # 這是一段阻塞的 CPU 密集運算
    total = 0
    for i in range(10_000_000):
        total += i
    return total

如果在 async 路由中直接呼叫 sync_heavy(),整個事件迴圈會被卡住,所有其他連線都會被阻塞。正確的做法是:

  • 將阻塞工作交給執行緒池await run_in_threadpool(sync_heavy)(Starlette 提供的工具)
  • 或改寫為原生非同步:使用 numpynumba 等加速庫,或把計算外包給 microservice。

4. 資料庫與外部服務的非同步客戶端

同步客戶端 非同步客戶端 典型使用方式
psycopg2(PostgreSQL) asyncpg await conn.fetch(...)
pymongo(MongoDB) motor await collection.find_one(...)
requests(HTTP) httpx(async 模式) await client.get(url)

關鍵:若使用同步客戶端,即使路由是 async def,仍會阻塞事件迴圈。因此在效能敏感的服務中,務必選擇非同步客戶端

5. 併發控制:asyncio.gather vs asyncio.wait

  • asyncio.gather(*tasks, return_exceptions=True):一次收集多個協程,若任務失敗仍會返回例外,適合批次 I/O
  • asyncio.wait(tasks, return_when=FIRST_COMPLETED):在任務完成條件滿足時即返回,適合**競賽(race)**情境。

程式碼範例

以下示範 4 個常見的 async 效能優化案例,皆可直接貼到 FastAPI 專案中測試。

範例 1:使用非同步資料庫客戶端(asyncpg)

# app/database.py
import asyncpg
from typing import List

class PostgresClient:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(dsn=self.dsn, min_size=5, max_size=20)

    async def fetch_users(self) -> List[dict]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("SELECT id, name, email FROM users LIMIT 100")
            return [dict(row) for row in rows]

# 初始化(在 main.py 中呼叫)
# await PostgresClient(dsn).connect()

說明

  • 使用連線池(min_sizemax_size)減少建立連線的開銷。
  • await conn.fetch 為非同步 I/O,允許同時處理其他請求。

範例 2:將阻塞計算交給執行緒池

# app/compute.py
import asyncio
from starlette.concurrency import run_in_threadpool

def heavy_calculation(n: int) -> int:
    total = 0
    for i in range(n):
        total += i * i
    return total

async def async_heavy_calculation(n: int) -> int:
    # 交給執行緒池,避免阻塞事件迴圈
    result = await run_in_threadpool(heavy_calculation, n)
    return result

說明

  • run_in_threadpool 會把 heavy_calculation 放到 ThreadPoolExecutor,在背景執行。
  • 呼叫端只需 await async_heavy_calculation(10_000_000),不會卡住其他協程。

範例 3:批次呼叫外部 API(httpx)並行

# app/external.py
import httpx
import asyncio

async def fetch_one(url: str) -> dict:
    async with httpx.AsyncClient(timeout=5.0) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        return resp.json()

async def fetch_multiple(urls: list[str]) -> list[dict]:
    # 使用 asyncio.gather 同時發起多個請求
    tasks = [fetch_one(u) for u in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # 例外處理:把失敗的結果記錄下來
    return [r if not isinstance(r, Exception) else {"error": str(r)} for r in results]

說明

  • httpx.AsyncClient 為非同步 HTTP 客戶端。
  • asyncio.gather 讓所有請求同時在事件迴圈中執行,極大提升吞吐量。

範例 4:限制併發數(Semaphore)避免資源耗盡

# app/limit.py
import asyncio
from typing import Any

# 假設外部服務只能同時接受 5 個請求
semaphore = asyncio.Semaphore(5)

async def limited_fetch(url: str) -> Any:
    async with semaphore:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.json()

說明

  • asyncio.Semaphore 控制同時執行的協程數,防止外部服務因過度併發被 throttling。
  • 這種模式在 爬蟲批次資料同步 時非常常見。

常見陷阱與最佳實踐

陷阱 為什麼會發生 解決方案
同步 I/O 混入 async 使用 requestspsycopg2 等阻塞套件 換成 httpxasyncpg,或用 run_in_threadpool 包裝
過度細分 await 每個小操作都 await(例如 await asyncio.sleep(0) 合併相鄰的 I/O,減少切換次數
未使用連線池 每次請求都新建 DB/HTTP 連線 建立 連線池asyncpg.create_poolhttpx.AsyncClient
CPU 密集任務直接執行 大量計算卡住事件迴圈 使用執行緒池、進程池或外部微服務
忘記關閉資源 AsyncClient、資料庫連線未關閉導致資源泄漏 startup / shutdown 事件中管理生命週期
無限制的併發 同時發起過多請求導致外部服務 429 或 OOM 使用 Semaphoreasyncio.BoundedSemaphore 限流

最佳實踐清單

  1. 全程使用非同步客戶端:資料庫、快取、HTTP、檔案 I/O。
  2. 建立與釋放資源:在 app.on_event("startup") 建立連線池,在 shutdown 時關閉。
  3. 合理使用執行緒池:僅將 阻塞 工作交給 run_in_threadpool,避免過度使用。
  4. 批次化 I/O:使用 asyncio.gatherasyncio.wait 同時發送多筆請求。
  5. 限制併發:根據外部服務的容忍度設定 Semaphore,或在反向代理(NGINX)層面加上速率限制。
  6. 監控與測試:使用 Locustk6wrk 等工具測試併發量,配合 prometheus_client 收集延遲、CPU、記憶體指標。

實際應用場景

場景 為何需要 async 效能分析 典型解法
電商平台商品搜尋 高併發的搜尋請求會同時呼叫 ElasticSearch、Redis、MySQL 使用 async-elasticsearchaioredis,把多個 I/O 並行化
即時聊天系統 每條訊息需同時寫入 DB、推送 WebSocket、寫入 Kafka WebSocket 端點使用 async,Kafka 生產者使用 aiokafka,DB 使用 asyncpg
金融資料抓取 每秒鐘要向 10+ 外部 API 拉取行情,且 API 有速率限制 使用 asyncio.Semaphore 控制同時請求數,asyncio.gather 批次發送
影像處理服務 上傳後需要呼叫外部 AI 推論服務(CPU 密集) 把推論工作放入 Celery + RabbitMQ,FastAPI 只負責排程(await
大規模報表產生 報表需要同時查詢多個資料來源,資料量龐大 把每個子查詢交給執行緒池或分散式 Spark,FastAPI 僅協調結果

總結

async 在 FastAPI 中提供了 高併發、低阻塞 的能力,但若忽視瓶頸、混用同步阻塞呼叫,效能不但不會提升,反而會倒退。本文重點如下:

  1. 辨識 I/O 與 CPU 密集工作,只把 I/O 放入協程。
  2. 使用非同步客戶端asyncpghttpxmotor)以及 連線池,減少建立連線的開銷。
  3. 合理安排 await 時機,避免過度細分的協程切換。
  4. 將阻塞運算交給執行緒池 或外部服務,保持事件迴圈暢通。
  5. 以 Semaphore、batch I/O 控制併發,防止外部系統被打垮。
  6. 監控與壓測 是驗證效能優化是否成功的唯一方法。

掌握以上概念與實作技巧,開發者即可在 FastAPI 專案中 發揮非同步的最大威力,同時避免常見的效能陷阱,打造出既快速又可靠的服務。祝開發順利,效能長虹! 🚀