Python – 並行與非同步(Concurrency & Async)
主題:async / await 語法
簡介
在現代的 Web 服務、爬蟲、即時資料處理等應用中,同時處理多件事 已成為常態需求。傳統的多執行緒或多行程解決方案固然有效,但在 I/O 密集的情況下會產生大量的上下文切換與資源浪費。Python 於 3.5 版正式導入的 async / await 語法,提供了一種 協程(coroutine) 的非同步程式設計方式,讓開發者可以以同步程式碼的寫法,撰寫出高效能且易於維護的非同步程式。
本篇文章將從 概念、實作、常見陷阱 以及 實務應用 四個面向,全面說明 async / await 的使用方式,幫助讀者快速上手,並在真實專案中正確運用。
核心概念
1. 協程(Coroutine)是什麼?
協程是一種「可暫停」的函式,允許在執行過程中暫時交出控制權,讓其他協程得以執行。與傳統的 阻塞式 I/O 不同,協程在等待外部資源(如網路、磁碟)時不會佔用 CPU,從而提升整體效能。
關鍵點:協程本身仍在單一執行緒內執行,不會自動產生多執行緒。如果需要 CPU 密集型運算,仍須配合
concurrent.futures或多行程。
2. async / await 的語法結構
| 關鍵字 | 用途 | 範例 |
|---|---|---|
async def |
定義一個協程函式 | async def fetch(url): |
await |
暫停協程,等待另一個協程完成 | data = await response.text() |
async with |
非同步上下文管理器(如 aiofiles、aiohttp) |
async with aiofiles.open(...) as f: |
async for |
非同步迭代器(如 aiohttp.StreamReader) |
async for line in resp.content: |
注意:
await只能在async def定義的函式內使用,否則會拋出SyntaxError。
3. 事件迴圈(Event Loop)
事件迴圈是協程執行的核心。它負責 排程、執行 以及 切換 協程,確保在 I/O 完成時喚醒相應的協程。Python 內建的 asyncio 模組提供了 get_event_loop()、run() 等 API。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # 暫停 1 秒,讓出控制權
print("World")
# Python 3.7+ 的簡潔寫法
asyncio.run(hello())
4. 非同步函式 vs. 同步函式
| 項目 | 同步函式 | 非同步函式 (async def) |
|---|---|---|
| 呼叫方式 | 直接呼叫 | 必須 await 或交給事件迴圈 |
| 阻塞行為 | 會阻塞執行緒 | 只在 await 時暫停,其他時間可執行別的協程 |
| 適用情境 | CPU 密集、簡單腳本 | I/O 密集(網路請求、檔案 I/O) |
程式碼範例
以下提供 五個 常見且實用的範例,從最基礎的協程到結合外部套件的完整流程,逐步說明每段程式碼的意圖與注意事項。
範例 1️⃣:最簡單的協程與 await asyncio.sleep
import asyncio
async def count_down(name: str, seconds: int):
"""倒數計時示範,使用 await 讓出控制權"""
for i in range(seconds, 0, -1):
print(f"{name} -> {i}s")
await asyncio.sleep(1) # 真正的非同步等待
print(f"{name} 完成!")
async def main():
# 同時執行兩個倒數計時
await asyncio.gather(
count_down("Task A", 3),
count_down("Task B", 5)
)
asyncio.run(main())
說明
asyncio.gather會同時排程多個協程,等全部完成才回傳。await asyncio.sleep(1)不會 佔用 CPU,等同於「讓出」1 秒的時間給其他協程。
範例 2️⃣:非同步 HTTP 請求(使用 aiohttp)
import asyncio
import aiohttp # pip install aiohttp
async def fetch(url: str) -> str:
"""非同步取得單一網頁內容"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
# await 讀取完整文字內容
return await resp.text()
async def main():
urls = [
"https://www.python.org",
"https://httpbin.org/delay/2", # 模擬 2 秒延遲
"https://api.github.com"
]
# 同時發送三個請求
results = await asyncio.gather(*(fetch(u) for u in urls))
for i, content in enumerate(results):
print(f"URL {i+1} 取得長度: {len(content)}")
asyncio.run(main())
說明
aiohttp.ClientSession為非同步的 HTTP 客戶端,必須以async with包住。await resp.text()會在收到完整回應前暫停協程,讓其他協程繼續執行。
範例 3️⃣:非同步檔案 I/O(使用 aiofiles)
import asyncio
import aiofiles # pip install aiofiles
async def write_numbers(path: str, n: int):
"""將 1~n 寫入檔案,每次寫入前暫停 0.1 秒"""
async with aiofiles.open(path, mode='w') as f:
for i in range(1, n + 1):
await f.write(f"{i}\n")
await asyncio.sleep(0.1) # 模擬其他工作
async def read_numbers(path: str):
"""非同步讀取檔案內容"""
async with aiofiles.open(path, mode='r') as f:
content = await f.read()
return content
async def main():
file_path = "numbers.txt"
await write_numbers(file_path, 20)
data = await read_numbers(file_path)
print("檔案內容:")
print(data)
asyncio.run(main())
說明
aiofiles讓檔案的讀寫操作也能以非同步方式執行,避免在大量 I/O 時阻塞事件迴圈。await asyncio.sleep(0.1)僅為示範,可省略。
範例 4️⃣:併發控制(使用 asyncio.Semaphore)
在大量同時請求時,若不加限制可能會觸發服務端的 rate limit。以下示範使用信號量限制同時執行的協程數量。
import asyncio
import aiohttp
MAX_CONCURRENT = 3 # 同時最多 3 個請求
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async def limited_fetch(url: str):
async with semaphore: # 取得信號量
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = [f"https://httpbin.org/delay/{i%3}" for i in range(10)]
results = await asyncio.gather(*(limited_fetch(u) for u in urls))
print(f"取得 {len(results)} 個回應")
asyncio.run(main())
說明
asyncio.Semaphore確保 同時 只會有MAX_CONCURRENT個協程在執行session.get,其餘協程會被暫停,直到有空位釋放。
範例 5️⃣:結合 CPU 密集任務(使用 run_in_executor)
非同步適合 I/O,但若要在同一事件迴圈內處理 CPU 密集計算,可將其委派給執行緒池(或進程池)。
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def cpu_bound(n: int) -> int:
"""模擬 CPU 密集運算:計算 n 的階乘"""
result = 1
for i in range(2, n + 1):
result *= i
return result
async def async_factorial(n: int, executor):
loop = asyncio.get_running_loop()
# 將 CPU 任務交給執行緒池,await 其完成
return await loop.run_in_executor(executor, cpu_bound, n)
async def main():
executor = ThreadPoolExecutor(max_workers=4)
numbers = [20_000, 30_000, 40_000, 50_000]
tasks = [async_factorial(n, executor) for n in numbers]
start = time.time()
results = await asyncio.gather(*tasks)
print(f"總耗時 {time.time() - start:.2f} 秒")
# 只示範結果長度
print([len(str(r)) for r in results])
asyncio.run(main())
說明
loop.run_in_executor會把cpu_bound放到執行緒池中執行,協程在等待結果時不會阻塞事件迴圈。- 若任務更為 CPU 密集,建議改用
ProcessPoolExecutor,避免 GIL 限制。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解決方式 |
|---|---|---|
在同步函式中直接使用 await |
SyntaxError |
必須把包含 await 的程式碼放入 async def,或使用 asyncio.run 包裹 |
忘記 await 協程 |
協程不會執行,程式看似「卡住」或根本不會有結果 | 確認每個協程呼叫都有 await,或使用 asyncio.create_task 取得 Task 物件 |
阻塞 I/O(如 time.sleep)混入協程 |
事件迴圈被卡住,所有協程停頓 | 替換為 await asyncio.sleep,或使用 run_in_executor |
| 過度併發導致資源耗盡 | 網路服務被封鎖、記憶體暴增 | 使用 Semaphore、Queue 或限制 max_workers |
忘記關閉 ClientSession |
連線資源泄漏,程式結束時警告 | 使用 async with aiohttp.ClientSession() as session: 或手動 await session.close() |
在事件迴圈外部呼叫 asyncio.run 多次 |
RuntimeError: Event loop is closed |
僅在程式入口處呼叫一次 asyncio.run,其他地方使用 await 或 asyncio.create_task |
最佳實踐
- 最小化阻塞:所有可能等待的操作(網路、磁碟、睡眠)都應該使用非同步 API。
- 使用
async with/async for:確保資源在協程結束時正確釋放。 - 分層設計:底層 I/O 使用非同步,業務邏輯保持同步或以
await包裝,提升可測試性。 - 錯誤處理:在協程內使用
try/except,必要時捕捉asyncio.TimeoutError、aiohttp.ClientError等例外。 - 監控與日誌:非同步程式的執行順序較難追蹤,建議加入結構化日誌(如
loguru)與執行時間統計。
實際應用場景
| 場景 | 為何適合使用 async / await |
|---|---|
| Web Scraper(大量抓取網頁) | 每個 HTTP 請求都是 I/O,使用 aiohttp 能同時發起上百個請求,縮短總耗時。 |
| 即時聊天伺服器(WebSocket) | 需要同時處理多個連線的訊息收發,asyncio 的事件迴圈天然支援多路復用。 |
| 資料管線(ETL) | 從遠端 API 抓取資料、寫入雲端儲存或資料庫(如 aiopg、aiomysql),非同步可在等待 DB 回應時處理下一筆資料。 |
| 定時任務 & 背景工作 | 使用 asyncio.create_task 配合 asyncio.sleep 實作自動重試、輪詢等機制,避免額外的排程工具。 |
| 多媒體串流(音訊/影片) | 讀寫檔案與網路傳輸同時進行,非同步 I/O 能降低緩衝延遲,提升使用者體驗。 |
範例:一個簡易的即時股票價格推播服務
import asyncio, aiohttp, websockets, json async def fetch_price(symbol): url = f"https://api.example.com/price/{symbol}" async with aiohttp.ClientSession() as s: async with s.get(url) as r: data = await r.json() return data["price"] async def push_updates(ws, symbols): while True: prices = await asyncio.gather(*(fetch_price(sym) for sym in symbols)) await ws.send(json.dumps(dict(zip(symbols, prices)))) await asyncio.sleep(1) # 每秒推送一次 async def main(): symbols = ["AAPL", "GOOG", "TSLA"] async with websockets.serve(lambda ws, _: push_updates(ws, symbols), "0.0.0.0", 8765): await asyncio.Future() # 永久執行 asyncio.run(main())這段程式碼同時向三支股票查價,並透過 WebSocket 即時推送給前端,用不到任何額外的執行緒或進程。
總結
async/await為 協程 的語法糖,讓非同步程式碼寫起來像同步程式,降低學習曲線。- 核心在於 事件迴圈:協程在等待 I/O 時交出控制權,讓其他協程繼續執行,提升 CPU 利用率。
- 常見的非同步套件(
aiohttp、aiofiles、aiomysql等)都遵循同一套async/await規範,易於組合使用。 - 使用時務必避免阻塞呼叫、適度限制併發、妥善管理資源,才能發揮非同步的最大效益。
掌握了 async / await,你就能在 高併發、即時互動、大量 I/O 的場景中寫出既簡潔又高效的 Python 程式。祝你在未來的專案裡,玩得開心、寫得順手! 🚀