本文 AI 產出,尚未審核

asyncio 基礎 ── Python 並行與非同步入門

簡介

在現代的 Web 服務、爬蟲、即時聊天機器人等應用程式中,同時處理多件事 已經成為必備能力。傳統的多執行緒或多行程雖然能達成並行,但在 Python 中卻常因 GIL(全域解譯鎖)或資源開銷過高而受到限制。Python 3.4 之後加入的 asyncio,提供了一套基於事件迴圈(event loop)的協程(coroutine)機制,使得 單執行緒內的非同步 I/O 成為可能,既省資源又易於維護。

本篇文章將從 概念實作範例常見陷阱 以及 實務應用 四個面向,帶你快速掌握 asyncio 的核心要點,讓你能在日常開發中自信地使用非同步程式設計。


核心概念

1. 事件迴圈(Event Loop)

事件迴圈是 asyncio 的「心臟」。它持續監聽各種 I/O 事件(如 socket 可讀、檔案可寫),一旦事件就緒,就呼叫對應的協程繼續執行。程式的控制權在協程之間「讓渡」而不是「切換」——這也是協程比傳統 thread 輕量的原因。

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)   # 交出控制權,讓事件迴圈去做其他事
    print("World")

# 直接取得預設事件迴圈並執行
asyncio.run(hello())

重點await 只能在 async def 定義的協程內使用,且必須等待一個 awaitable(如 asyncio.sleepTaskFuture)。


2. 協程(Coroutine) vs 任務(Task)

  • 協程:使用 async def 定義的函式,回傳一個 coroutine object。它本身不會自動執行,必須交給事件迴圈。
  • 任務(Task):將協程包裝成可排程的物件,交給事件迴圈管理。使用 asyncio.create_task()loop.create_task() 建立。
async def fetch_data(id: int):
    await asyncio.sleep(0.5)   # 模擬 I/O
    return f"data-{id}"

# 建立多個任務,同時執行
async def main():
    tasks = [asyncio.create_task(fetch_data(i)) for i in range(5)]
    results = await asyncio.gather(*tasks)   # 等待全部完成
    print(results)

asyncio.run(main())

技巧asyncio.gather 會一次回傳所有任務的結果,若其中有例外會立即拋出;若想保留失敗任務,可使用 return_exceptions=True


3. 非同步 I/O:awaitasync with

常見的非同步 I/O 包括網路請求、檔案讀寫、資料庫操作等。Python 標準庫的 asyncio 提供了 open_connectionstart_server,而第三方套件(如 aiohttpaiomysql)則擴充了更多功能。

import asyncio
import aiohttp

async def fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()   # 取得回傳內容

async def main():
    urls = ["https://httpbin.org/get"] * 3
    tasks = [asyncio.create_task(fetch(u)) for u in urls]
    for content in await asyncio.gather(*tasks):
        print(content[:60], "...")   # 只印前 60 個字元

asyncio.run(main())
  • async with 會在進入與離開區塊時自動呼叫協程的 __aenter__ / __aexit__,確保資源正確釋放。

4. 取消與逾時(Cancellation & Timeout)

在長時間等待的情況下,常需要提供 逾時機制手動取消 任務,以避免資源卡住。

import asyncio

async def long_job():
    try:
        await asyncio.sleep(10)   # 假設是長時間 I/O
    except asyncio.CancelledError:
        print("任務被取消")
        raise

async def main():
    task = asyncio.create_task(long_job())
    await asyncio.sleep(2)        # 讓任務跑兩秒
    task.cancel()                 # 主動取消
    try:
        await task
    except asyncio.CancelledError:
        print("已捕獲取消例外")

asyncio.run(main())

若要設定逾時,可使用 asyncio.wait_for

await asyncio.wait_for(fetch(url), timeout=3)   # 超過 3 秒即拋出 asyncio.TimeoutError

程式碼範例

以下提供 五個實用範例,從最基礎到稍微進階,幫助你快速上手。

範例 1:基本的「Hello, World」協程

import asyncio

async def hello():
    print("開始執行")
    await asyncio.sleep(1)   # 非同步暫停
    print("結束執行")

asyncio.run(hello())

說明asyncio.sleep 不會阻塞整個執行緒,只是交出控制權給事件迴圈。


範例 2:同時下載多個網頁(使用 aiohttp)

import asyncio
import aiohttp

async def download(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.read()
            print(f"{url} 下載完成,大小 {len(data)} bytes")

async def main():
    urls = [
        "https://httpbin.org/bytes/1024",
        "https://httpbin.org/bytes/2048",
        "https://httpbin.org/bytes/4096",
    ]
    await asyncio.gather(*(download(u) for u in urls))

asyncio.run(main())

重點asyncio.gather 讓三個下載任務同時進行,總耗時約等於最慢的那一個。


範例 3:簡易 TCP Echo Server(使用 asyncio Stream)

import asyncio

async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info('peername')
    print(f"連線來自 {addr}")

    while data := await reader.read(100):
        message = data.decode()
        print(f"收到: {message!r}")
        writer.write(data)            # 回傳相同資料
        await writer.drain()          # 確保資料已送出

    print("關閉連線")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())

說明asyncio.start_server 會返回一個 Server 物件,使用 async with 確保關閉時釋放資源。


範例 4:使用 asyncio.Queue 實作生產者 / 消費者模式

import asyncio
import random

async def producer(q: asyncio.Queue, n: int):
    for i in range(n):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"item-{i}"
        await q.put(item)
        print(f"產生 {item}")

async def consumer(q: asyncio.Queue, id: int):
    while True:
        item = await q.get()
        print(f"消費者{id} 處理 {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))
        q.task_done()

async def main():
    q = asyncio.Queue()
    prod = asyncio.create_task(producer(q, 10))
    cons1 = asyncio.create_task(consumer(q, 1))
    cons2 = asyncio.create_task(consumer(q, 2))

    await prod                     # 等待生產者結束
    await q.join()                 # 等待隊列全部被消費
    cons1.cancel()
    cons2.cancel()

asyncio.run(main())

關鍵Queue 自帶的 task_done / join 能確保所有項目都被處理完畢,避免程式提前結束。


範例 5:結合 Timeout 與例外處理的安全抓取

import asyncio
import aiohttp

async def safe_fetch(url: str, timeout: int = 3):
    try:
        async with aiohttp.ClientSession() as session:
            async with asyncio.wait_for(session.get(url), timeout):
                async with session.get(url) as resp:
                    return await resp.text()
    except asyncio.TimeoutError:
        return f"{url} 逾時"
    except aiohttp.ClientError as e:
        return f"{url} 錯誤: {e}"

async def main():
    urls = ["https://httpbin.org/delay/5", "https://httpbin.org/get"]
    results = await asyncio.gather(*(safe_fetch(u) for u in urls))
    for r in results:
        print(r[:80], "...")

asyncio.run(main())

技巧:先用 asyncio.wait_for 包住 session.get,再在內層處理 HTTP 錯誤,讓程式在網路不穩時仍能平穩回傳結果。


常見陷阱與最佳實踐

陷阱 可能的結果 解決方式
在同步函式中直接呼叫 asyncio.run 會在已存在的事件迴圈中拋出 RuntimeError 只在程式入口 (if __name__ == "__main__":) 使用 asyncio.run;在已經有迴圈的環境(如 Jupyter、Web 框架)使用 awaitloop.create_task
忘記 await 協程不會被執行,回傳 <coroutine object ...>,程式行為不如預期 確保所有返回 awaitable 的呼叫都加上 await,或使用 asyncio.create_task 明確排程
阻塞 I/O 混入協程(例如 time.sleeprequests 會阻塞整個事件迴圈,失去非同步效益 改用 asyncio.sleepaiohttphttpx.AsyncClient 等非同步版本
過度建立大量 Task 記憶體與 CPU 壓力升高,甚至觸發 Task was destroyed but it is pending! 警告 使用 semaphoreasyncio.BoundedSemaphore 限制同時執行的任務數量
未正確關閉資源(例如 ClientSessionStreamWriter 連線洩漏、檔案描述符耗盡 使用 async with 或在 finally 區塊中 await session.close()

最佳實踐

  1. 入口統一:整個程式只使用一次 asyncio.run(main()),讓事件迴圈管理全局生命週期。
  2. 資源管理:盡量使用 async with 包裝 I/O 物件,確保例外發生時仍能正確關閉。
  3. 限制同時度:對外部服務(API、資料庫)使用 semaphore 控制併發數,避免被封鎖或過載。
  4. 例外傳遞:在協程內捕獲特定例外並重新拋出,讓上層 gather 能統一處理。
  5. 測試與除錯:使用 pytest-asyncio 撰寫非同步測試;若需要追蹤事件迴圈,可在 asyncio.run 前設定 PYTHONASYNCIODEBUG=1

實際應用場景

場景 為何選擇 asyncio 範例程式碼或說明
Web 爬蟲 同時發送上千個 HTTP 請求,節省等待時間 aiohttp + asyncio.gather
即時聊天機器人 需要同時處理多個 WebSocket 連線,且每條訊息都要快速回應 websockets 套件配合 asyncio.Queue
資料庫批次寫入 多筆寫入可同時執行,降低 DB 連線等待 aiomysql / asyncpg + 連線池
背景任務排程 定時抓取外部 API、清理暫存檔案,且不阻塞主服務 asyncio.create_task + asyncio.sleep 形成簡易 scheduler
硬體 I/O(串口、藍牙) 需要非阻塞讀寫,避免 UI 卡住 asyncio.open_serial_connection(第三方)

實務小技巧:在 Django、FastAPI 等 Web 框架中,視圖函式本身可以直接寫成 async def,框架會自動把它交給事件迴圈,讓資料庫查詢、外部 API 呼叫都能保持非同步。


總結

  • asyncio 為 Python 提供了 單執行緒內的高效非同步 能力,只要掌握事件迴圈、協程與任務的概念,就能在 I/O 密集的應用中大幅提升效能。
  • 透過 awaitasync withasyncio.create_task 等關鍵語法,我們可以寫出 可讀性高、資源友善 的程式碼。
  • 注意 避免阻塞 I/O、正確管理資源、控制併發度,即可避免常見的陷阱。
  • 無論是爬蟲、即時聊天、資料庫批次作業,或是任何需要同時處理多個 I/O 的情境,asyncio 都是值得投入的技術。

掌握了上述基礎,你已經具備在實務專案中使用 asyncio 的能力。接下來可以探索更進階的主題,如 自訂事件迴圈、結合多執行緒或多行程、以及使用 asyncio.runners 的新功能,讓你的 Python 應用程式更上一層樓。祝開發順利! 🚀