FastAPI – 非同步程式設計(Async Programming)
I/O bound vs CPU bound
簡介
在現代的 Web 框架中,非同步(asynchronous) 已成為提升效能的關鍵技術。FastAPI 天生支援 async / await,讓開發者可以輕鬆寫出 非阻塞 的 API。要真正善用這項特性,首先必須了解 I/O bound 與 CPU bound 兩種工作類型的差異,因為它們決定了我們應該使用非同步還是同步(或是多執行緒/多行程)的方式來處理。
- I/O bound:程式大部分時間在等待外部資源(磁碟、資料庫、網路、檔案上傳等)回應。
- CPU bound:程式主要耗費 CPU 時間在計算、加密、影像處理等繁重運算上。
正確辨識任務屬性,才能避免「把 CPU 密集的工作塞進非同步事件迴圈」而造成效能倒退,或是「把簡單的 I/O 任務硬塞進多執行緒」而浪費資源。本篇文章將從概念出發,結合 FastAPI 的實作範例,說明如何在不同情境下選擇最適合的處理方式。
核心概念
1️⃣ 什麼是 I/O bound?
I/O bound 任務在執行時會大量「等待」外部設備完成資料傳輸。舉例來說:
- 從資料庫查詢資料
- 呼叫第三方 API
- 讀寫本機或雲端檔案
- 等待使用者上傳檔案
在等待期間,CPU 實際上是空閒的。如果我們使用 同步 的寫法(例如 def),整個執行緒會被「阻塞」住,導致同時只能處理少量請求。相反地,使用 非同步(async def)可以把執行緒釋放回事件迴圈,讓其他請求得以繼續執行。
範例 1:同步的資料庫查詢(阻塞)
from fastapi import FastAPI
import time
import sqlite3
app = FastAPI()
def query_user_sync(user_id: int):
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
time.sleep(2) # 模擬慢查詢
cursor.execute("SELECT * FROM users WHERE id=?", (user_id,))
return cursor.fetchone()
@app.get("/sync/user/{user_id}")
def get_user_sync(user_id: int):
user = query_user_sync(user_id) # 這裡會阻塞整個執行緒
return {"user": user}
⚠️ 注意:
time.sleep會讓整個執行緒停下來,其他請求必須等待。
範例 2:非同步的資料庫查詢(不阻塞)
from fastapi import FastAPI
import asyncio
import aiosqlite
app = FastAPI()
async def query_user_async(user_id: int):
async with aiosqlite.connect("example.db") as db:
await asyncio.sleep(2) # 模擬慢查詢,但不阻塞
async with db.execute("SELECT * FROM users WHERE id=?", (user_id,)) as cursor:
return await cursor.fetchone()
@app.get("/async/user/{user_id}")
async def get_user_async(user_id: int):
user = await query_user_async(user_id) # 釋放執行緒,允許其他請求繼續
return {"user": user}
重點:
await會暫停當前協程,讓事件迴圈去執行其他協程,從而提升併發量。
2️⃣ 什麼是 CPU bound?
CPU bound 任務的瓶頸在於 計算資源,例如:
- 大量數據的統計、排序
- 圖片/影片的編碼或壓縮
- 密碼雜湊、加解密
- 機器學習模型推論
即使使用 await,CPU 密集的程式碼仍會佔用事件迴圈的執行緒,導致所有協程都被卡住。此時,我們應該把工作交給 執行緒池(ThreadPoolExecutor) 或 多行程(ProcessPoolExecutor),讓 CPU 密集的工作在其他執行緒/行程中平行執行。
範例 3:非同步入口卻執行 CPU 密集工作(效能倒退)
import math
from fastapi import FastAPI
app = FastAPI()
def heavy_computation(n: int) -> int:
# 計算 n 的階乘,會佔用大量 CPU
return math.factorial(n)
@app.get("/async/factorial/{n}")
async def factorial_async(n: int):
# 直接在協程內呼叫會阻塞事件迴圈
result = heavy_computation(n)
return {"result": result}
結果:即使路由是
async def,仍會把事件迴圈卡住,其他請求無法同時處理。
範例 4:使用執行緒池處理 CPU 密集工作
import math
import asyncio
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4) # 依據 CPU 核心數調整
def heavy_computation(n: int) -> int:
return math.factorial(n)
@app.get("/async/factorial_thread/{n}")
async def factorial_thread(n: int):
loop = asyncio.get_running_loop()
# 把 CPU 密集工作交給執行緒池
result = await loop.run_in_executor(executor, heavy_computation, n)
return {"result": result}
說明:
run_in_executor會把heavy_computation移交給執行緒池,事件迴圈得以繼續處理其他協程。
範例 5:使用 ProcessPoolExecutor 處理更重的計算
import math
import asyncio
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI
app = FastAPI()
process_pool = ProcessPoolExecutor(max_workers=2) # 適合 CPU 密集且不共享全域變數
def heavy_computation(n: int) -> int:
return math.factorial(n)
@app.get("/async/factorial_process/{n}")
async def factorial_process(n: int):
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(process_pool, heavy_computation, n)
return {"result": result}
為什麼使用 ProcessPool:在 CPU 密集且需要大量運算的情況下,子行程可避免 GIL(全域解釋器鎖)的限制,真正達到多核心平行。
3️⃣ I/O 與 CPU 何時混合?
實務上,許多 API 同時涉及 I/O 與 CPU。例如:
- 接收到上傳的圖片(I/O) → 需要做圖像壓縮或辨識(CPU) → 再把結果寫回資料庫(I/O)。
這種 混合工作流 建議:
- I/O:使用
async/await搭配非同步驅動(如aiohttp、aioredis)。 - CPU:在 I/O 完成後,將 CPU 密集的部份交給執行緒或行程池。
範例 6:圖片上傳 + 縮圖(混合)
import asyncio
from fastapi import FastAPI, File, UploadFile
from PIL import Image
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)
def resize_image(data: bytes, size: tuple[int, int]) -> bytes:
img = Image.open(BytesIO(data))
img.thumbnail(size)
out = BytesIO()
img.save(out, format="JPEG")
return out.getvalue()
@app.post("/upload/thumbnail")
async def upload_thumbnail(file: UploadFile = File(...)):
# 1. 非同步讀取檔案 (I/O bound)
content = await file.read()
# 2. CPU 密集的縮圖工作交給執行緒池
loop = asyncio.get_running_loop()
thumb_bytes = await loop.run_in_executor(executor, resize_image, content, (200, 200))
# 3. 回傳縮圖 (I/O bound)
return {"filename": file.filename, "thumbnail_size": len(thumb_bytes)}
關鍵:
await file.read()為非同步 I/O,run_in_executor處理 CPU 密集的圖像縮放,兩者配合讓 API 在高併發情境下依然保持流暢。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方式 |
|---|---|---|
把阻塞 I/O 直接放在 async def |
使用同步的資料庫或檔案 API(如 sqlite3、requests)會阻塞事件迴圈。 |
改用非同步套件(aiosqlite、httpx.AsyncClient)或 run_in_executor。 |
| 在非同步端點呼叫大量 CPU 密集函式 | 即使路由是 async,CPU 密集函式仍會卡住整個迴圈。 |
使用 run_in_executor,依需求選擇 ThreadPoolExecutor 或 ProcessPoolExecutor。 |
| 過度使用執行緒池 | 每個請求都開新執行緒會造成上下文切換成本,反而降低效能。 | 設定適當的 max_workers,並盡量重複使用同一個執行緒池。 |
| 忘記關閉資源 | 非同步資料庫連線或執行緒池未關閉會造成資源泄漏。 | 使用 async with、app.on_event("shutdown") 釋放資源。 |
| 混用同步與非同步程式碼 | 同步函式內部呼叫 await 會拋出語法錯誤。 |
確保所有 await 只出現在 async def 中,必要時將同步程式碼封裝成協程。 |
最佳實踐清單
- 先辨識工作類型:是 I/O 還是 CPU,或兩者混合?
- 選擇正確的工具:
- I/O → 非同步套件 (
httpx,aioredis,aiomysql…) - CPU →
run_in_executor+ 適當的執行緒/行程池
- I/O → 非同步套件 (
- 限制併發數:使用
asyncio.Semaphore或 FastAPI 的Depends來控制同時執行的 I/O 請求數量,避免 DB 連線耗盡。 - 監控與測試:使用
uvicorn --workers搭配locust、hey等壓測工具,比對同步 vs 非同步的 RPS、延遲差異。 - 保持程式碼可讀性:將 I/O 與 CPU 密集的邏輯分層,讓協程只負責協調,實際工作交給底層函式。
實際應用場景
| 場景 | 典型任務 | 推薦寫法 |
|---|---|---|
| 即時聊天服務 | WebSocket 接收訊息 → 寫入 Redis → 廣播給其他使用者 | 完全非同步 (async def + aioredis) |
| 批次報表產生 | 從 PostgreSQL 讀取大量資料 → 計算統計 → 產生 PDF | I/O 使用 asyncpg,CPU 使用 run_in_executor(或 Celery) |
| 圖像辨識 API | 上傳圖片 → 前置處理 (CPU) → 呼叫 TensorFlow 服務 (I/O) | 前置處理交給 ProcessPoolExecutor,遠端模型呼叫使用 httpx.AsyncClient |
| 第三方支付整合 | 呼叫支付平台 API → 寫入交易紀錄 → 回傳結果 | 完全非同步,使用 httpx.AsyncClient + aiosqlite/asyncpg |
| 資料匯入/匯出 | 從 S3 下載 CSV → 解析 → 寫入資料庫 | 下載使用 aioboto3,解析交給 ThreadPoolExecutor,寫入使用 asyncpg |
總結
- I/O bound 任務適合 非同步,能讓事件迴圈在等待外部資源時處理其他請求,提升併發度。
- CPU bound 任務則應交給 執行緒池或行程池,避免阻塞事件迴圈,同時利用多核心效能。
- 在 FastAPI 中,正確的工具選擇(
async/await、run_in_executor、非同步資料庫驅動)與 清晰的程式碼結構 是提升服務效能與可維護性的關鍵。 - 透過上述範例與最佳實踐,你可以在開發過程中快速判斷任務類型,選擇最適合的處理方式,讓你的 API 在高併發環境下仍然保持 快速、穩定。
祝你在 FastAPI 的非同步旅程中,寫出更快、更可靠的服務! 🚀