本文 AI 產出,尚未審核

Python 並行與非同步(Concurrency & Async)

主題:非同步 IO 操作(aiohttp)


簡介

在現代的 Web 應用與資料爬蟲、微服務間的互動中,IO(輸入/輸出) 常常是程式執行的瓶頸。傳統的同步請求會在等待遠端伺服器回應的期間,完全阻塞執行緒,導致 CPU 資源閒置,效能無法發揮。
非同步 IO 透過事件迴圈(event loop)讓單一執行緒可以同時管理多個待處理的網路請求,極大提升併發效能。Python 官方提供的 asyncio 框架已成為寫非同步程式的標準,而 aiohttp 則是基於 asyncio非同步 HTTP 客戶端/伺服器 套件,讓我們可以用簡潔的語法撰寫高效能的網路程式。

本篇文章將從核心概念出發,示範 aiohttp 的基本與進階使用方式,並說明常見的陷阱與最佳實踐,最後提供實務應用場景,幫助讀者快速上手並在真實專案中落地。


核心概念

1. asyncio 事件迴圈與協程

  • 協程 (coroutine):使用 async def 定義的函式,回傳一個 協程物件,只有在事件迴圈 (event loop) 中被 await 時才會執行。
  • 事件迴圈:管理所有待執行的協程與 IO 事件,負責在 IO 完成時喚醒對應的協程。

重點await 只會讓出 CPU 給其他協程,不會產生新執行緒,因此在 CPU 密集任務上仍需配合 ThreadPoolExecutorProcessPoolExecutor

import asyncio

async def hello():
    await asyncio.sleep(1)   # 模擬非同步 IO
    print("Hello, async world!")

asyncio.run(hello())

2. 為什麼選擇 aiohttp

  • 純 Python 實作,不依賴外部 C 擴充,安裝簡單 (pip install aiohttp)。
  • 支援 HTTP/1.1WebSocketClientSession 連線池等功能。
  • 完全相容 asyncio,可與其他非同步套件(如 aioredisasyncpg)無縫整合。

3. 基本的 aiohttp 客戶端使用

ClientSessionaiohttp 的核心物件,負責管理連線池與共用設定。以下範例示範如何使用 async with 來自動關閉資源。

import aiohttp
import asyncio

async def fetch(url: str) -> str:
    async with aiohttp.ClientSession() as session:          # 建立 Session
        async with session.get(url) as response:           # 非同步 GET
            response.raise_for_status()                    # 錯誤檢查
            return await response.text()                   # 取得文字內容

async def main():
    html = await fetch('https://www.example.com')
    print(html[:200])  # 顯示前 200 個字元

asyncio.run(main())

4. 同時發送多筆請求

使用 asyncio.gather 可以同時等待多個協程完成,達到 高併發 效果。

import aiohttp
import asyncio

URLS = [
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/1",
]

async def fetch_one(session, url):
    async with session.get(url) as resp:
        return await resp.text()

async def fetch_all():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, u) for u in URLS]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, content in enumerate(results):
            if isinstance(content, Exception):
                print(f"URL {URLS[i]} 錯誤: {content}")
            else:
                print(f"URL {URLS[i]} 回傳長度: {len(content)}")

asyncio.run(fetch_all())

5. 下載大檔案的串流 (Streaming)

對於大檔案,直接一次讀取會佔用大量記憶體。aiohttp 提供 content.iter_chunked 讓我們以 塊 (chunk) 方式逐段寫入磁碟。

import aiohttp
import asyncio
import pathlib

async def download(url: str, path: pathlib.Path):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            with path.open('wb') as f:
                async for chunk in resp.content.iter_chunked(1024 * 64):  # 64KB
                    f.write(chunk)
    print(f"已下載至 {path}")

asyncio.run(download(
    "https://speed.hetzner.de/100MB.bin",
    pathlib.Path("100MB.bin")
))

6. 設定 Timeout、重試與連線池

  • Timeoutaiohttp.ClientTimeout 可針對連線、讀取、寫入設定上限。
  • 重試:雖然 aiohttp 本身不提供自動重試機制,但可自行包裝協程。
  • 連線池ClientSession 內建連線池,預設同時最多 100 個連線,必要時可調整 connector
import aiohttp
import asyncio

timeout = aiohttp.ClientTimeout(total=10)  # 總逾時 10 秒
connector = aiohttp.TCPConnector(limit=50)  # 最多同時 50 條連線

async def fetch_with_options(url):
    async with aiohttp.ClientSession(timeout=timeout,
                                     connector=connector) as session:
        async with session.get(url) as resp:
            return await resp.json()

asyncio.run(fetch_with_options('https://api.github.com'))

常見陷阱與最佳實踐

陷阱 可能的後果 解決方案
忘記 await 協程不會執行,回傳 <coroutine object>,程式看似靜默失敗。 確認所有非同步呼叫皆使用 await,或在 asyncio.run 內包裝。
在同步函式中直接呼叫 asyncio.run 多次 會產生「事件迴圈已關閉」的例外,且效能低下。 將所有非同步工作集中於同一個事件迴圈,或使用 asyncio.get_event_loop().run_until_complete
未關閉 ClientSession 連線資源泄漏,最終導致「Too many open files」錯誤。 使用 async with 讓 Session 自動關閉,或在結束前呼叫 await session.close()
過度併發導致目標伺服器被封鎖 HTTP 429(Too Many Requests)或 IP 被封。 節流 (throttling):使用 asyncio.Semaphore 控制同時請求數量;加入隨機延遲。
忽略 Timeout 請求卡住導致整個程式無法前進。 為每個請求設定合理的 ClientTimeout,並捕獲 asyncio.TimeoutError 進行重試或回退。

最佳實踐

  1. 統一使用 ClientSession:在整個應用程式的生命週期內只建立一次 Session,減少 TCP 握手成本。
  2. 避免阻塞呼叫:若必須執行 CPU 密集工作,使用 run_in_executor 把它搬到執行緒池。
  3. 使用型別提示aiohttp 支援 typing,加入 -> str-> aiohttp.ClientResponse 等提示,可提升 IDE 輔助與程式可讀性。
  4. 加入日誌 (logging):在 ClientSession 中設定 trace_config,可追蹤每筆請求的開始/結束時間,方便除錯與效能分析。

實際應用場景

  1. 爬蟲與資料擷取:大量抓取公開 API 或網頁時,使用 aiohttp 的併發下載能在數秒內完成原本需數分鐘的工作。
  2. 微服務間的同步呼叫:在 FastAPI、Sanic 等非同步 Web 框架中,向其他服務發送 HTTP 請求時直接使用 aiohttp,保持全程非同步,避免阻塞事件迴圈。
  3. 即時資料推送:結合 aiohttp 的 WebSocket 支援,可實作聊天室、即時儀表板或推播服務。
  4. 大檔案傳輸:如備份系統或 CDN 上傳,利用 iter_chunked 串流寫入,減少記憶體占用,同時支援斷點續傳的自訂實作。

總結

  • 非同步 IO 是提升網路密集型 Python 程式效能的關鍵技術,aiohttp 為最常用且功能完整的套件。
  • 透過 協程、事件迴圈與連線池,我們可以在單一執行緒內同時管理數十甚至數百個 HTTP 請求,顯著縮短等待時間。
  • 正確的資源管理(async with、Timeout、連線池)與 節流、錯誤處理 是避免常見陷阱的要點。
  • 在爬蟲、微服務、即時推送與大檔案傳輸等真實案例中,aiohttp 已被廣泛驗證為可靠且高效的解決方案。

掌握了上述概念與實作技巧後,你就可以在自己的 Python 專案中,輕鬆寫出 高併發、低延遲 的非同步 HTTP 程式,為系統效能與使用者體驗加分。祝你開發順利!