asyncio 基礎 ── Python 並行與非同步入門
簡介
在現代的 Web 服務、爬蟲、即時聊天機器人等應用程式中,同時處理多件事 已經成為必備能力。傳統的多執行緒或多行程雖然能達成並行,但在 Python 中卻常因 GIL(全域解譯鎖)或資源開銷過高而受到限制。Python 3.4 之後加入的 asyncio,提供了一套基於事件迴圈(event loop)的協程(coroutine)機制,使得 單執行緒內的非同步 I/O 成為可能,既省資源又易於維護。
本篇文章將從 概念、實作範例、常見陷阱 以及 實務應用 四個面向,帶你快速掌握 asyncio 的核心要點,讓你能在日常開發中自信地使用非同步程式設計。
核心概念
1. 事件迴圈(Event Loop)
事件迴圈是 asyncio 的「心臟」。它持續監聽各種 I/O 事件(如 socket 可讀、檔案可寫),一旦事件就緒,就呼叫對應的協程繼續執行。程式的控制權在協程之間「讓渡」而不是「切換」——這也是協程比傳統 thread 輕量的原因。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # 交出控制權,讓事件迴圈去做其他事
print("World")
# 直接取得預設事件迴圈並執行
asyncio.run(hello())
重點:
await只能在async def定義的協程內使用,且必須等待一個 awaitable(如asyncio.sleep、Task、Future)。
2. 協程(Coroutine) vs 任務(Task)
- 協程:使用
async def定義的函式,回傳一個 coroutine object。它本身不會自動執行,必須交給事件迴圈。 - 任務(Task):將協程包裝成可排程的物件,交給事件迴圈管理。使用
asyncio.create_task()或loop.create_task()建立。
async def fetch_data(id: int):
await asyncio.sleep(0.5) # 模擬 I/O
return f"data-{id}"
# 建立多個任務,同時執行
async def main():
tasks = [asyncio.create_task(fetch_data(i)) for i in range(5)]
results = await asyncio.gather(*tasks) # 等待全部完成
print(results)
asyncio.run(main())
技巧:
asyncio.gather會一次回傳所有任務的結果,若其中有例外會立即拋出;若想保留失敗任務,可使用return_exceptions=True。
3. 非同步 I/O:await 與 async with
常見的非同步 I/O 包括網路請求、檔案讀寫、資料庫操作等。Python 標準庫的 asyncio 提供了 open_connection、start_server,而第三方套件(如 aiohttp、aiomysql)則擴充了更多功能。
import asyncio
import aiohttp
async def fetch(url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text() # 取得回傳內容
async def main():
urls = ["https://httpbin.org/get"] * 3
tasks = [asyncio.create_task(fetch(u)) for u in urls]
for content in await asyncio.gather(*tasks):
print(content[:60], "...") # 只印前 60 個字元
asyncio.run(main())
async with會在進入與離開區塊時自動呼叫協程的__aenter__/__aexit__,確保資源正確釋放。
4. 取消與逾時(Cancellation & Timeout)
在長時間等待的情況下,常需要提供 逾時機制 或 手動取消 任務,以避免資源卡住。
import asyncio
async def long_job():
try:
await asyncio.sleep(10) # 假設是長時間 I/O
except asyncio.CancelledError:
print("任務被取消")
raise
async def main():
task = asyncio.create_task(long_job())
await asyncio.sleep(2) # 讓任務跑兩秒
task.cancel() # 主動取消
try:
await task
except asyncio.CancelledError:
print("已捕獲取消例外")
asyncio.run(main())
若要設定逾時,可使用 asyncio.wait_for:
await asyncio.wait_for(fetch(url), timeout=3) # 超過 3 秒即拋出 asyncio.TimeoutError
程式碼範例
以下提供 五個實用範例,從最基礎到稍微進階,幫助你快速上手。
範例 1:基本的「Hello, World」協程
import asyncio
async def hello():
print("開始執行")
await asyncio.sleep(1) # 非同步暫停
print("結束執行")
asyncio.run(hello())
說明:
asyncio.sleep不會阻塞整個執行緒,只是交出控制權給事件迴圈。
範例 2:同時下載多個網頁(使用 aiohttp)
import asyncio
import aiohttp
async def download(url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.read()
print(f"{url} 下載完成,大小 {len(data)} bytes")
async def main():
urls = [
"https://httpbin.org/bytes/1024",
"https://httpbin.org/bytes/2048",
"https://httpbin.org/bytes/4096",
]
await asyncio.gather(*(download(u) for u in urls))
asyncio.run(main())
重點:
asyncio.gather讓三個下載任務同時進行,總耗時約等於最慢的那一個。
範例 3:簡易 TCP Echo Server(使用 asyncio Stream)
import asyncio
async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info('peername')
print(f"連線來自 {addr}")
while data := await reader.read(100):
message = data.decode()
print(f"收到: {message!r}")
writer.write(data) # 回傳相同資料
await writer.drain() # 確保資料已送出
print("關閉連線")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
說明:
asyncio.start_server會返回一個Server物件,使用async with確保關閉時釋放資源。
範例 4:使用 asyncio.Queue 實作生產者 / 消費者模式
import asyncio
import random
async def producer(q: asyncio.Queue, n: int):
for i in range(n):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"item-{i}"
await q.put(item)
print(f"產生 {item}")
async def consumer(q: asyncio.Queue, id: int):
while True:
item = await q.get()
print(f"消費者{id} 處理 {item}")
await asyncio.sleep(random.uniform(0.2, 0.6))
q.task_done()
async def main():
q = asyncio.Queue()
prod = asyncio.create_task(producer(q, 10))
cons1 = asyncio.create_task(consumer(q, 1))
cons2 = asyncio.create_task(consumer(q, 2))
await prod # 等待生產者結束
await q.join() # 等待隊列全部被消費
cons1.cancel()
cons2.cancel()
asyncio.run(main())
關鍵:
Queue自帶的task_done/join能確保所有項目都被處理完畢,避免程式提前結束。
範例 5:結合 Timeout 與例外處理的安全抓取
import asyncio
import aiohttp
async def safe_fetch(url: str, timeout: int = 3):
try:
async with aiohttp.ClientSession() as session:
async with asyncio.wait_for(session.get(url), timeout):
async with session.get(url) as resp:
return await resp.text()
except asyncio.TimeoutError:
return f"{url} 逾時"
except aiohttp.ClientError as e:
return f"{url} 錯誤: {e}"
async def main():
urls = ["https://httpbin.org/delay/5", "https://httpbin.org/get"]
results = await asyncio.gather(*(safe_fetch(u) for u in urls))
for r in results:
print(r[:80], "...")
asyncio.run(main())
技巧:先用
asyncio.wait_for包住session.get,再在內層處理 HTTP 錯誤,讓程式在網路不穩時仍能平穩回傳結果。
常見陷阱與最佳實踐
| 陷阱 | 可能的結果 | 解決方式 |
|---|---|---|
在同步函式中直接呼叫 asyncio.run |
會在已存在的事件迴圈中拋出 RuntimeError |
只在程式入口 (if __name__ == "__main__":) 使用 asyncio.run;在已經有迴圈的環境(如 Jupyter、Web 框架)使用 await 或 loop.create_task |
忘記 await |
協程不會被執行,回傳 <coroutine object ...>,程式行為不如預期 |
確保所有返回 awaitable 的呼叫都加上 await,或使用 asyncio.create_task 明確排程 |
阻塞 I/O 混入協程(例如 time.sleep、requests) |
會阻塞整個事件迴圈,失去非同步效益 | 改用 asyncio.sleep、aiohttp、httpx.AsyncClient 等非同步版本 |
| 過度建立大量 Task | 記憶體與 CPU 壓力升高,甚至觸發 Task was destroyed but it is pending! 警告 |
使用 semaphore 或 asyncio.BoundedSemaphore 限制同時執行的任務數量 |
未正確關閉資源(例如 ClientSession、StreamWriter) |
連線洩漏、檔案描述符耗盡 | 使用 async with 或在 finally 區塊中 await session.close() |
最佳實踐
- 入口統一:整個程式只使用一次
asyncio.run(main()),讓事件迴圈管理全局生命週期。 - 資源管理:盡量使用
async with包裝 I/O 物件,確保例外發生時仍能正確關閉。 - 限制同時度:對外部服務(API、資料庫)使用 semaphore 控制併發數,避免被封鎖或過載。
- 例外傳遞:在協程內捕獲特定例外並重新拋出,讓上層
gather能統一處理。 - 測試與除錯:使用
pytest-asyncio撰寫非同步測試;若需要追蹤事件迴圈,可在asyncio.run前設定PYTHONASYNCIODEBUG=1。
實際應用場景
| 場景 | 為何選擇 asyncio | 範例程式碼或說明 |
|---|---|---|
| Web 爬蟲 | 同時發送上千個 HTTP 請求,節省等待時間 | aiohttp + asyncio.gather |
| 即時聊天機器人 | 需要同時處理多個 WebSocket 連線,且每條訊息都要快速回應 | websockets 套件配合 asyncio.Queue |
| 資料庫批次寫入 | 多筆寫入可同時執行,降低 DB 連線等待 | aiomysql / asyncpg + 連線池 |
| 背景任務排程 | 定時抓取外部 API、清理暫存檔案,且不阻塞主服務 | asyncio.create_task + asyncio.sleep 形成簡易 scheduler |
| 硬體 I/O(串口、藍牙) | 需要非阻塞讀寫,避免 UI 卡住 | asyncio.open_serial_connection(第三方) |
實務小技巧:在 Django、FastAPI 等 Web 框架中,視圖函式本身可以直接寫成
async def,框架會自動把它交給事件迴圈,讓資料庫查詢、外部 API 呼叫都能保持非同步。
總結
- asyncio 為 Python 提供了 單執行緒內的高效非同步 能力,只要掌握事件迴圈、協程與任務的概念,就能在 I/O 密集的應用中大幅提升效能。
- 透過
await、async with、asyncio.create_task等關鍵語法,我們可以寫出 可讀性高、資源友善 的程式碼。 - 注意 避免阻塞 I/O、正確管理資源、控制併發度,即可避免常見的陷阱。
- 無論是爬蟲、即時聊天、資料庫批次作業,或是任何需要同時處理多個 I/O 的情境,asyncio 都是值得投入的技術。
掌握了上述基礎,你已經具備在實務專案中使用 asyncio 的能力。接下來可以探索更進階的主題,如 自訂事件迴圈、結合多執行緒或多行程、以及使用 asyncio.runners 的新功能,讓你的 Python 應用程式更上一層樓。祝開發順利! 🚀