FastAPI 課程 – 非同步程式設計
主題:asyncio.create_task
簡介
在 FastAPI 中,非同步(asynchronous)是提升 API 效能與可擴充性的關鍵技術。
傳統的同步函式會在 I/O(例如資料庫查詢、外部 API 呼叫)等待時阻塞整個事件迴圈,導致同時處理的請求數量受限。
使用 asyncio.create_task 可以把一段協程(coroutine)排入事件迴圈,讓它在背景執行,而不必等到結果返回才繼續後續邏輯,從而實現 「同時跑多件事」 的效果。
本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,直到實務應用案例,完整呈現 asyncio.create_task 在 FastAPI 專案中的使用方式,適合剛接觸非同步的初學者,也能提供中級開發者進一步優化的參考。
核心概念
1. 協程、事件迴圈與任務(Task)
- 協程(coroutine):以
async def定義的函式,返回一個 coroutine object,只有在事件迴圈中被 await 或 排程 才會真正執行。 - 事件迴圈(event loop):管理所有待執行的協程與 I/O 事件的核心機制,FastAPI 內部會在啟動時自動建立並運行。
- 任務(Task):
asyncio.create_task(coro)會把協程包裝成 Task,將其加入事件迴圈,並立即排程執行。Task 會在完成時持有結果或例外。
重點:
create_task不會阻塞 呼叫端,函式會直接返回一個Task物件,讓程式可以繼續往下走。
2. 為什麼不直接 await?
await 會等到協程完成才返回結果,適合需要即時結果的情況。
但在以下情境下,我們更想「先把工作交給背景」:
- 不需要立即回應(例如發送 Email、寫入日誌、觸發長時間的報表產生)。
- 同時執行多個 I/O,不想讓其中一個阻塞其他請求。
- 避免阻塞 FastAPI 的請求處理流程,提升整體吞吐量。
此時 asyncio.create_task 正是最佳選擇。
3. asyncio.create_task 的基本語法
import asyncio
async def my_coroutine(param):
# 模擬 I/O
await asyncio.sleep(1)
return f"Result: {param}"
# 建立任務
task = asyncio.create_task(my_coroutine("hello"))
# 之後可以透過 await 取得結果
result = await task # 這裡仍會等待 task 完成
create_task必須在 已啟動的事件迴圈 中呼叫,否則會拋出RuntimeError。- 返回的
Task物件支援await、cancel()、add_done_callback()等方法。
程式碼範例
以下示範 5 個實用情境,從最簡單的背景任務,到在 FastAPI 路由中結合 create_task。
範例 1️⃣ 基礎背景任務
import asyncio
async def background_job():
print("🔹 任務開始")
await asyncio.sleep(2) # 假裝是長時間 I/O
print("🔹 任務完成")
async def main():
# 把協程排入背景
task = asyncio.create_task(background_job())
print("⚡ 主流程繼續執行")
await asyncio.sleep(1) # 主流程仍可執行其他事
await task # 最後等待任務結束(可省略)
asyncio.run(main())
說明:
create_task讓background_job在背景執行,main不會被阻塞。
範例 2️⃣ 同時執行多個協程
import asyncio
async def fetch_data(idx: int):
await asyncio.sleep(1 + idx * 0.2) # 模擬不同延遲
return f"data-{idx}"
async def main():
tasks = [asyncio.create_task(fetch_data(i)) for i in range(5)]
# 同時跑 5 個任務,立即返回
results = await asyncio.gather(*tasks) # gather 會等所有任務完成
print("所有結果:", results)
asyncio.run(main())
技巧:使用
asyncio.gather取得多個 Task 的結果,避免逐一await造成的序列化。
範例 3️⃣ 在 FastAPI 路由中觸發背景任務
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
async def send_email(to: str, subject: str, body: str):
# 假設這裡呼叫外部 SMTP 服務
await asyncio.sleep(3)
print(f"📧 Email sent to {to}")
@app.post("/notify")
async def notify_user(email: str, background_tasks: BackgroundTasks):
# 直接使用 FastAPI 提供的 BackgroundTasks
background_tasks.add_task(asyncio.create_task, send_email(email, "Welcome", "Hello!"))
return {"message": "通知已排程"}
重點:FastAPI 的
BackgroundTasks允許在回應前排程任務,內部仍會使用asyncio.create_task來執行非同步工作。
範例 4️⃣ 取消(Cancel)背景任務
import asyncio
async def long_running():
try:
for i in range(10):
await asyncio.sleep(1)
print(f"🔄 第 {i+1} 秒")
except asyncio.CancelledError:
print("🛑 任務被取消")
raise
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(3) # 讓任務跑 3 秒
task.cancel() # 取消任務
try:
await task
except asyncio.CancelledError:
print("✅ 已捕獲取消例外")
asyncio.run(main())
說明:若任務不再需要,可呼叫
task.cancel(),並在協程內捕獲CancelledError完成清理。
範例 5️⃣ 任務完成回呼(Callback)
import asyncio
async def compute(x: int):
await asyncio.sleep(2)
return x * x
def on_done(task: asyncio.Task):
try:
result = task.result()
print(f"✅ 任務完成,結果 = {result}")
except Exception as e:
print(f"❗ 任務失敗:{e}")
async def main():
task = asyncio.create_task(compute(7))
task.add_done_callback(on_done) # 任務結束時自動呼叫
await asyncio.sleep(3) # 主流程繼續
asyncio.run(main())
技巧:
add_done_callback可用於 記錄、發送事件 或 更新快取 等需求,避免在主流程中額外await。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方式 |
|---|---|---|
未在事件迴圈內呼叫 create_task |
在同步程式碼或尚未啟動的迴圈中使用會拋 RuntimeError。 |
使用 asyncio.get_running_loop() 或在 async def 裡呼叫。 |
忘記 await 任務結果 |
若任務拋例外,未 await 會導致「未處理的例外」警告。 |
在需要結果的地方 await task,或在 add_done_callback 中處理例外。 |
| 過度建立任務 | 大量短暫任務會造成 Task 內部排程成本,甚至記憶體飆升。 | 使用 semaphore 或 worker pool(asyncio.Semaphore、asyncio.Queue)限制同時任務數。 |
| 阻塞型程式碼 | 在協程內呼叫同步的阻塞函式(如 time.sleep)會凍結整個事件迴圈。 |
改用非同步函式(await asyncio.sleep),或使用 ThreadPoolExecutor 包裝。 |
| 背景任務失去上下文 | FastAPI 的依賴注入(DI)在背景任務中可能失效。 | 在背景任務中手動傳遞需要的資源(如 DB 連線、設定),或使用 ContextVar 保留上下文。 |
最佳實踐
- 盡量在同一層級建立 Task:不要在已完成的 Task 裡再呼叫
create_task,會增加不必要的層次。 - 使用
asyncio.create_task取代ensure_future:前者語意更清晰,且在 Python 3.7+ 為官方推薦。 - 對長時間任務加上 Timeout:
asyncio.wait_for(task, timeout)防止無限等待。 - 在 FastAPI 中搭配
BackgroundTasks:讓路由返回前即完成排程,保持 API 響應速度。 - 記錄任務狀態:使用
task.add_done_callback或自訂監控,方便除錯與觀察。
實際應用場景
| 場景 | 為什麼適合使用 asyncio.create_task |
|---|---|
| 發送驗證 Email | 用戶註冊後立即回傳成功訊息,同時在背景發信,避免等待 SMTP 回應。 |
| 產生 PDF 報表 | 前端請求產生報表,API 回傳「已排程」訊息,報表完成後透過 WebSocket 推送或寫入資料庫。 |
| 批次資料同步 | 每次接收到 webhook 時,立即回應 200,並在背景同步外部系統資料。 |
| 快取預熱 | 當熱門資源被請求時,同步回傳結果,並在背景異步預先載入相關快取,以提升後續請求速度。 |
| 定時任務 (Scheduler) 替代 | 使用 asyncio.create_task 搭配 asyncio.sleep 或 asyncio.wait 實作簡易的內部排程器。 |
範例:在 FastAPI 中,使用
create_task實作「使用者上傳檔案後,立即回傳成功,並在背景壓縮與上傳至 S3」:
from fastapi import FastAPI, UploadFile, File
import aiofiles, asyncio, aioboto3
app = FastAPI()
async def compress_and_upload(file_path: str, bucket: str, key: str):
# 假設使用 async 版的壓縮與 S3 客戶端
await asyncio.sleep(1) # 壓縮模擬
session = aioboto3.Session()
async with session.client('s3') as s3:
await s3.upload_file(file_path, bucket, key)
print("✅ 上傳完成")
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
save_path = f"/tmp/{file.filename}"
async with aiofiles.open(save_path, "wb") as out_file:
content = await file.read()
await out_file.write(content)
# 背景壓縮上傳
asyncio.create_task(compress_and_upload(save_path, "my-bucket", file.filename))
return {"status": "file received, processing in background"}
此模式讓 API 即時回應,同時保證大檔案處理不會阻塞其他使用者。
總結
asyncio.create_task是 將協程排入事件迴圈、以 非阻塞 方式執行的核心工具,對提升 FastAPI 應用的併發能力至關重要。- 透過 背景任務、同時執行多個 I/O、取消與回呼 等技巧,我們可以在不影響主請求流程的前提下,完成 Email、報表、檔案處理等耗時工作。
- 使用時務必注意 事件迴圈的存活、例外處理、任務數量控制,並結合 FastAPI 的
BackgroundTasks、ContextVar等機制,使程式碼保持 可測試、易除錯。
掌握 asyncio.create_task,不僅能讓你的 FastAPI 服務 更快、更穩,也為未來的微服務與雲端原生應用奠定堅實的非同步基礎。祝開發順利,快去試試看吧! 🚀