本文 AI 產出,尚未審核

Python – 並行與非同步(Concurrency & Async)

主題:async / await 語法


簡介

在現代的 Web 服務、爬蟲、即時資料處理等應用中,同時處理多件事 已成為常態需求。傳統的多執行緒或多行程解決方案固然有效,但在 I/O 密集的情況下會產生大量的上下文切換與資源浪費。Python 於 3.5 版正式導入的 async / await 語法,提供了一種 協程(coroutine) 的非同步程式設計方式,讓開發者可以以同步程式碼的寫法,撰寫出高效能且易於維護的非同步程式。

本篇文章將從 概念實作常見陷阱 以及 實務應用 四個面向,全面說明 async / await 的使用方式,幫助讀者快速上手,並在真實專案中正確運用。


核心概念

1. 協程(Coroutine)是什麼?

協程是一種「可暫停」的函式,允許在執行過程中暫時交出控制權,讓其他協程得以執行。與傳統的 阻塞式 I/O 不同,協程在等待外部資源(如網路、磁碟)時不會佔用 CPU,從而提升整體效能。

關鍵點:協程本身仍在單一執行緒內執行,不會自動產生多執行緒。如果需要 CPU 密集型運算,仍須配合 concurrent.futures 或多行程。

2. async / await 的語法結構

關鍵字 用途 範例
async def 定義一個協程函式 async def fetch(url):
await 暫停協程,等待另一個協程完成 data = await response.text()
async with 非同步上下文管理器(如 aiofilesaiohttp async with aiofiles.open(...) as f:
async for 非同步迭代器(如 aiohttp.StreamReader async for line in resp.content:

注意await 只能在 async def 定義的函式內使用,否則會拋出 SyntaxError

3. 事件迴圈(Event Loop)

事件迴圈是協程執行的核心。它負責 排程執行 以及 切換 協程,確保在 I/O 完成時喚醒相應的協程。Python 內建的 asyncio 模組提供了 get_event_loop()run() 等 API。

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)   # 暫停 1 秒,讓出控制權
    print("World")

# Python 3.7+ 的簡潔寫法
asyncio.run(hello())

4. 非同步函式 vs. 同步函式

項目 同步函式 非同步函式 (async def)
呼叫方式 直接呼叫 必須 await 或交給事件迴圈
阻塞行為 會阻塞執行緒 只在 await 時暫停,其他時間可執行別的協程
適用情境 CPU 密集、簡單腳本 I/O 密集(網路請求、檔案 I/O)

程式碼範例

以下提供 五個 常見且實用的範例,從最基礎的協程到結合外部套件的完整流程,逐步說明每段程式碼的意圖與注意事項。

範例 1️⃣:最簡單的協程與 await asyncio.sleep

import asyncio

async def count_down(name: str, seconds: int):
    """倒數計時示範,使用 await 讓出控制權"""
    for i in range(seconds, 0, -1):
        print(f"{name} -> {i}s")
        await asyncio.sleep(1)          # 真正的非同步等待
    print(f"{name} 完成!")

async def main():
    # 同時執行兩個倒數計時
    await asyncio.gather(
        count_down("Task A", 3),
        count_down("Task B", 5)
    )

asyncio.run(main())

說明

  • asyncio.gather 會同時排程多個協程,等全部完成才回傳。
  • await asyncio.sleep(1) 不會 佔用 CPU,等同於「讓出」1 秒的時間給其他協程。

範例 2️⃣:非同步 HTTP 請求(使用 aiohttp

import asyncio
import aiohttp   # pip install aiohttp

async def fetch(url: str) -> str:
    """非同步取得單一網頁內容"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # await 讀取完整文字內容
            return await resp.text()

async def main():
    urls = [
        "https://www.python.org",
        "https://httpbin.org/delay/2",   # 模擬 2 秒延遲
        "https://api.github.com"
    ]
    # 同時發送三個請求
    results = await asyncio.gather(*(fetch(u) for u in urls))
    for i, content in enumerate(results):
        print(f"URL {i+1} 取得長度: {len(content)}")

asyncio.run(main())

說明

  • aiohttp.ClientSession 為非同步的 HTTP 客戶端,必須以 async with 包住。
  • await resp.text() 會在收到完整回應前暫停協程,讓其他協程繼續執行。

範例 3️⃣:非同步檔案 I/O(使用 aiofiles

import asyncio
import aiofiles   # pip install aiofiles

async def write_numbers(path: str, n: int):
    """將 1~n 寫入檔案,每次寫入前暫停 0.1 秒"""
    async with aiofiles.open(path, mode='w') as f:
        for i in range(1, n + 1):
            await f.write(f"{i}\n")
            await asyncio.sleep(0.1)   # 模擬其他工作

async def read_numbers(path: str):
    """非同步讀取檔案內容"""
    async with aiofiles.open(path, mode='r') as f:
        content = await f.read()
    return content

async def main():
    file_path = "numbers.txt"
    await write_numbers(file_path, 20)
    data = await read_numbers(file_path)
    print("檔案內容:")
    print(data)

asyncio.run(main())

說明

  • aiofiles 讓檔案的讀寫操作也能以非同步方式執行,避免在大量 I/O 時阻塞事件迴圈。
  • await asyncio.sleep(0.1) 僅為示範,可省略。

範例 4️⃣:併發控制(使用 asyncio.Semaphore

在大量同時請求時,若不加限制可能會觸發服務端的 rate limit。以下示範使用信號量限制同時執行的協程數量。

import asyncio
import aiohttp

MAX_CONCURRENT = 3          # 同時最多 3 個請求

semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def limited_fetch(url: str):
    async with semaphore:   # 取得信號量
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()

async def main():
    urls = [f"https://httpbin.org/delay/{i%3}" for i in range(10)]
    results = await asyncio.gather(*(limited_fetch(u) for u in urls))
    print(f"取得 {len(results)} 個回應")

asyncio.run(main())

說明

  • asyncio.Semaphore 確保 同時 只會有 MAX_CONCURRENT 個協程在執行 session.get,其餘協程會被暫停,直到有空位釋放。

範例 5️⃣:結合 CPU 密集任務(使用 run_in_executor

非同步適合 I/O,但若要在同一事件迴圈內處理 CPU 密集計算,可將其委派給執行緒池(或進程池)。

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound(n: int) -> int:
    """模擬 CPU 密集運算:計算 n 的階乘"""
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

async def async_factorial(n: int, executor):
    loop = asyncio.get_running_loop()
    # 將 CPU 任務交給執行緒池,await 其完成
    return await loop.run_in_executor(executor, cpu_bound, n)

async def main():
    executor = ThreadPoolExecutor(max_workers=4)
    numbers = [20_000, 30_000, 40_000, 50_000]
    tasks = [async_factorial(n, executor) for n in numbers]
    start = time.time()
    results = await asyncio.gather(*tasks)
    print(f"總耗時 {time.time() - start:.2f} 秒")
    # 只示範結果長度
    print([len(str(r)) for r in results])

asyncio.run(main())

說明

  • loop.run_in_executor 會把 cpu_bound 放到執行緒池中執行,協程在等待結果時不會阻塞事件迴圈。
  • 若任務更為 CPU 密集,建議改用 ProcessPoolExecutor,避免 GIL 限制。

常見陷阱與最佳實踐

陷阱 可能的後果 解決方式
在同步函式中直接使用 await SyntaxError 必須把包含 await 的程式碼放入 async def,或使用 asyncio.run 包裹
忘記 await 協程 協程不會執行,程式看似「卡住」或根本不會有結果 確認每個協程呼叫都有 await,或使用 asyncio.create_task 取得 Task 物件
阻塞 I/O(如 time.sleep)混入協程 事件迴圈被卡住,所有協程停頓 替換為 await asyncio.sleep,或使用 run_in_executor
過度併發導致資源耗盡 網路服務被封鎖、記憶體暴增 使用 SemaphoreQueue 或限制 max_workers
忘記關閉 ClientSession 連線資源泄漏,程式結束時警告 使用 async with aiohttp.ClientSession() as session: 或手動 await session.close()
在事件迴圈外部呼叫 asyncio.run 多次 RuntimeError: Event loop is closed 僅在程式入口處呼叫一次 asyncio.run,其他地方使用 awaitasyncio.create_task

最佳實踐

  1. 最小化阻塞:所有可能等待的操作(網路、磁碟、睡眠)都應該使用非同步 API。
  2. 使用 async with/async for:確保資源在協程結束時正確釋放。
  3. 分層設計:底層 I/O 使用非同步,業務邏輯保持同步或以 await 包裝,提升可測試性。
  4. 錯誤處理:在協程內使用 try/except,必要時捕捉 asyncio.TimeoutErroraiohttp.ClientError 等例外。
  5. 監控與日誌:非同步程式的執行順序較難追蹤,建議加入結構化日誌(如 loguru)與執行時間統計。

實際應用場景

場景 為何適合使用 async / await
Web Scraper(大量抓取網頁) 每個 HTTP 請求都是 I/O,使用 aiohttp 能同時發起上百個請求,縮短總耗時。
即時聊天伺服器(WebSocket) 需要同時處理多個連線的訊息收發,asyncio 的事件迴圈天然支援多路復用。
資料管線(ETL) 從遠端 API 抓取資料、寫入雲端儲存或資料庫(如 aiopgaiomysql),非同步可在等待 DB 回應時處理下一筆資料。
定時任務 & 背景工作 使用 asyncio.create_task 配合 asyncio.sleep 實作自動重試、輪詢等機制,避免額外的排程工具。
多媒體串流(音訊/影片) 讀寫檔案與網路傳輸同時進行,非同步 I/O 能降低緩衝延遲,提升使用者體驗。

範例:一個簡易的即時股票價格推播服務

import asyncio, aiohttp, websockets, json

async def fetch_price(symbol):
    url = f"https://api.example.com/price/{symbol}"
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            data = await r.json()
            return data["price"]

async def push_updates(ws, symbols):
    while True:
        prices = await asyncio.gather(*(fetch_price(sym) for sym in symbols))
        await ws.send(json.dumps(dict(zip(symbols, prices))))
        await asyncio.sleep(1)   # 每秒推送一次

async def main():
    symbols = ["AAPL", "GOOG", "TSLA"]
    async with websockets.serve(lambda ws, _: push_updates(ws, symbols), "0.0.0.0", 8765):
        await asyncio.Future()   # 永久執行

asyncio.run(main())

這段程式碼同時向三支股票查價,並透過 WebSocket 即時推送給前端,用不到任何額外的執行緒或進程。


總結

  • async / await協程 的語法糖,讓非同步程式碼寫起來像同步程式,降低學習曲線。
  • 核心在於 事件迴圈:協程在等待 I/O 時交出控制權,讓其他協程繼續執行,提升 CPU 利用率。
  • 常見的非同步套件(aiohttpaiofilesaiomysql 等)都遵循同一套 async / await 規範,易於組合使用。
  • 使用時務必避免阻塞呼叫、適度限制併發、妥善管理資源,才能發揮非同步的最大效益。

掌握了 async / await,你就能在 高併發即時互動大量 I/O 的場景中寫出既簡潔又高效的 Python 程式。祝你在未來的專案裡,玩得開心、寫得順手! 🚀