FastAPI – 效能與最佳化
主題:async 效能瓶頸分析
簡介
在 FastAPI 中,使用 async / await 可以讓 API 處理大量同時請求時保持高併發、低資源佔用,這也是 FastAPI 相較於傳統同步框架(如 Flask)最吸引人的特點之一。然而,async 本身並不是萬能的藥丸,若使用不當,反而會成為效能的瓶頸。
本篇文章將從概念、實作、常見陷阱與最佳實踐四個面向,深入探討 async 效能瓶頸 的根源,並提供可直接套用在實務專案中的程式碼範例,幫助初學者到中級開發者在開發 FastAPI 應用時,既能保留非同步的優勢,又能避免常見的效能踩雷。
核心概念
1. 為什麼需要 async?
IO‑bound 與 CPU‑bound:
- IO‑bound(例如資料庫、外部 API、檔案 I/O)會因等待外部資源而阻塞執行緒。使用
async可以在等待期間釋放執行緒,讓其他請求繼續執行。 - CPU‑bound(大量計算)則不適合交給
async,因為 Python 的協程仍在同一個執行緒中運行,計算密集的工作會阻塞整個事件迴圈。
- IO‑bound(例如資料庫、外部 API、檔案 I/O)會因等待外部資源而阻塞執行緒。使用
事件迴圈(Event Loop):
FastAPI 建立在 Starlette 之上,而 Starlette 使用 uvicorn(或 hypercorn)作為 ASGI 伺服器,底層是asyncio事件迴圈。所有async def的路由都會被事件迴圈調度。
重點:只有在 等待 I/O 時才能真正釋放 CPU,否則
await只會把同一個執行緒卡住。
2. await 的成本
每一次 await 都會觸發一次 協程切換(context switch),這在 C 語言層級的切換成本極低,但在 Python 中仍會產生:
- 堆疊保存與恢復:協程的局部變數、指令指標需要保存與恢復。
- 事件迴圈的排程:若大量小碎片的
await(如每 1msawait asyncio.sleep(0)),會造成事件迴圈頻繁喚醒,降低吞吐量。
實務建議:盡量把同類型的 I/O 合併,減少不必要的
await,尤其是同步阻塞的呼叫被包在await中時,會直接把執行緒卡死。
3. 同步函式(Blocking)與協程的混用
def sync_heavy():
# 這是一段阻塞的 CPU 密集運算
total = 0
for i in range(10_000_000):
total += i
return total
如果在 async 路由中直接呼叫 sync_heavy(),整個事件迴圈會被卡住,所有其他連線都會被阻塞。正確的做法是:
- 將阻塞工作交給執行緒池:
await run_in_threadpool(sync_heavy)(Starlette 提供的工具) - 或改寫為原生非同步:使用
numpy、numba等加速庫,或把計算外包給 microservice。
4. 資料庫與外部服務的非同步客戶端
| 同步客戶端 | 非同步客戶端 | 典型使用方式 |
|---|---|---|
psycopg2(PostgreSQL) |
asyncpg |
await conn.fetch(...) |
pymongo(MongoDB) |
motor |
await collection.find_one(...) |
requests(HTTP) |
httpx(async 模式) |
await client.get(url) |
關鍵:若使用同步客戶端,即使路由是
async def,仍會阻塞事件迴圈。因此在效能敏感的服務中,務必選擇非同步客戶端。
5. 併發控制:asyncio.gather vs asyncio.wait
asyncio.gather(*tasks, return_exceptions=True):一次收集多個協程,若任務失敗仍會返回例外,適合批次 I/O。asyncio.wait(tasks, return_when=FIRST_COMPLETED):在任務完成條件滿足時即返回,適合**競賽(race)**情境。
程式碼範例
以下示範 4 個常見的 async 效能優化案例,皆可直接貼到 FastAPI 專案中測試。
範例 1:使用非同步資料庫客戶端(asyncpg)
# app/database.py
import asyncpg
from typing import List
class PostgresClient:
def __init__(self, dsn: str):
self.dsn = dsn
self.pool = None
async def connect(self):
self.pool = await asyncpg.create_pool(dsn=self.dsn, min_size=5, max_size=20)
async def fetch_users(self) -> List[dict]:
async with self.pool.acquire() as conn:
rows = await conn.fetch("SELECT id, name, email FROM users LIMIT 100")
return [dict(row) for row in rows]
# 初始化(在 main.py 中呼叫)
# await PostgresClient(dsn).connect()
說明
- 使用連線池(
min_size、max_size)減少建立連線的開銷。 await conn.fetch為非同步 I/O,允許同時處理其他請求。
範例 2:將阻塞計算交給執行緒池
# app/compute.py
import asyncio
from starlette.concurrency import run_in_threadpool
def heavy_calculation(n: int) -> int:
total = 0
for i in range(n):
total += i * i
return total
async def async_heavy_calculation(n: int) -> int:
# 交給執行緒池,避免阻塞事件迴圈
result = await run_in_threadpool(heavy_calculation, n)
return result
說明
run_in_threadpool會把heavy_calculation放到 ThreadPoolExecutor,在背景執行。- 呼叫端只需
await async_heavy_calculation(10_000_000),不會卡住其他協程。
範例 3:批次呼叫外部 API(httpx)並行
# app/external.py
import httpx
import asyncio
async def fetch_one(url: str) -> dict:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url)
resp.raise_for_status()
return resp.json()
async def fetch_multiple(urls: list[str]) -> list[dict]:
# 使用 asyncio.gather 同時發起多個請求
tasks = [fetch_one(u) for u in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 例外處理:把失敗的結果記錄下來
return [r if not isinstance(r, Exception) else {"error": str(r)} for r in results]
說明
httpx.AsyncClient為非同步 HTTP 客戶端。asyncio.gather讓所有請求同時在事件迴圈中執行,極大提升吞吐量。
範例 4:限制併發數(Semaphore)避免資源耗盡
# app/limit.py
import asyncio
from typing import Any
# 假設外部服務只能同時接受 5 個請求
semaphore = asyncio.Semaphore(5)
async def limited_fetch(url: str) -> Any:
async with semaphore:
async with httpx.AsyncClient() as client:
resp = await client.get(url)
resp.raise_for_status()
return resp.json()
說明
asyncio.Semaphore控制同時執行的協程數,防止外部服務因過度併發被 throttling。- 這種模式在 爬蟲、批次資料同步 時非常常見。
常見陷阱與最佳實踐
| 陷阱 | 為什麼會發生 | 解決方案 |
|---|---|---|
| 同步 I/O 混入 async | 使用 requests、psycopg2 等阻塞套件 |
換成 httpx、asyncpg,或用 run_in_threadpool 包裝 |
| 過度細分 await | 每個小操作都 await(例如 await asyncio.sleep(0)) |
合併相鄰的 I/O,減少切換次數 |
| 未使用連線池 | 每次請求都新建 DB/HTTP 連線 | 建立 連線池(asyncpg.create_pool、httpx.AsyncClient) |
| CPU 密集任務直接執行 | 大量計算卡住事件迴圈 | 使用執行緒池、進程池或外部微服務 |
| 忘記關閉資源 | AsyncClient、資料庫連線未關閉導致資源泄漏 |
在 startup / shutdown 事件中管理生命週期 |
| 無限制的併發 | 同時發起過多請求導致外部服務 429 或 OOM | 使用 Semaphore、asyncio.BoundedSemaphore 限流 |
最佳實踐清單
- 全程使用非同步客戶端:資料庫、快取、HTTP、檔案 I/O。
- 建立與釋放資源:在
app.on_event("startup")建立連線池,在shutdown時關閉。 - 合理使用執行緒池:僅將 阻塞 工作交給
run_in_threadpool,避免過度使用。 - 批次化 I/O:使用
asyncio.gather、asyncio.wait同時發送多筆請求。 - 限制併發:根據外部服務的容忍度設定
Semaphore,或在反向代理(NGINX)層面加上速率限制。 - 監控與測試:使用
Locust、k6、wrk等工具測試併發量,配合prometheus_client收集延遲、CPU、記憶體指標。
實際應用場景
| 場景 | 為何需要 async 效能分析 | 典型解法 |
|---|---|---|
| 電商平台商品搜尋 | 高併發的搜尋請求會同時呼叫 ElasticSearch、Redis、MySQL | 使用 async-elasticsearch、aioredis,把多個 I/O 並行化 |
| 即時聊天系統 | 每條訊息需同時寫入 DB、推送 WebSocket、寫入 Kafka | WebSocket 端點使用 async,Kafka 生產者使用 aiokafka,DB 使用 asyncpg |
| 金融資料抓取 | 每秒鐘要向 10+ 外部 API 拉取行情,且 API 有速率限制 | 使用 asyncio.Semaphore 控制同時請求數,asyncio.gather 批次發送 |
| 影像處理服務 | 上傳後需要呼叫外部 AI 推論服務(CPU 密集) | 把推論工作放入 Celery + RabbitMQ,FastAPI 只負責排程(await) |
| 大規模報表產生 | 報表需要同時查詢多個資料來源,資料量龐大 | 把每個子查詢交給執行緒池或分散式 Spark,FastAPI 僅協調結果 |
總結
async 在 FastAPI 中提供了 高併發、低阻塞 的能力,但若忽視瓶頸、混用同步阻塞呼叫,效能不但不會提升,反而會倒退。本文重點如下:
- 辨識 I/O 與 CPU 密集工作,只把 I/O 放入協程。
- 使用非同步客戶端(
asyncpg、httpx、motor)以及 連線池,減少建立連線的開銷。 - 合理安排
await時機,避免過度細分的協程切換。 - 將阻塞運算交給執行緒池 或外部服務,保持事件迴圈暢通。
- 以 Semaphore、batch I/O 控制併發,防止外部系統被打垮。
- 監控與壓測 是驗證效能優化是否成功的唯一方法。
掌握以上概念與實作技巧,開發者即可在 FastAPI 專案中 發揮非同步的最大威力,同時避免常見的效能陷阱,打造出既快速又可靠的服務。祝開發順利,效能長虹! 🚀