本文 AI 產出,尚未審核

Python 並行與非同步:深入了解 multiprocessing 模組


簡介

在現代的資料處理與網路服務中,單純依賴單執行緒的程式往往無法滿足效能需求。CPU 密集型(例如數值計算、圖像處理)或 IO 密集型(例如大量檔案讀寫、網路請求)的工作,都可以透過並行(Concurrency)或平行(Parallelism)來加速。

Python 內建的 multiprocessing 模組提供了 跨平台類似 threading 的 API,卻能真正利用多核心 CPU,讓每個子行程(process)擁有獨立的 Python 解釋器與記憶體空間,避免了 GIL(Global Interpreter Lock)的限制。掌握 multiprocessing,不只能提升程式效能,更能在資料科學、機器學習、網路服務等領域打造更具彈性的系統。


核心概念

1. 為什麼要使用 multiprocessing

  • 突破 GIL:在 CPython 中,單一執行緒同時只能執行一段 Python bytecode,導致 CPU 密集型任務無法真正平行。multiprocessing 透過多個子行程,各自擁有獨立的 GIL,實現真正的平行運算。
  • 跨平台支援:Windows、macOS、Linux 都能使用相同的 API,唯一需要留意的是 Windows 必須使用 if __name__ == '__main__': 防止子行程遞迴產生。
  • 類似 threading 的介面:如果你已熟悉 threading,轉而使用 multiprocessing 幾乎不需要重新學習大量概念。

2. 基本結構:Process 類別

multiprocessing.Process 類別相當於 threading.Thread,但每個 Process 會在獨立的記憶體空間執行。

import multiprocessing as mp
import time

def worker(name, seconds):
    """子行程要執行的函式"""
    print(f"[{name}] 開始執行,預計 {seconds} 秒")
    time.sleep(seconds)
    print(f"[{name}] 執行結束")

if __name__ == '__main__':
    # 建立兩個子行程
    p1 = mp.Process(target=worker, args=('A', 2))
    p2 = mp.Process(target=worker, args=('B', 3))

    # 啟動子行程
    p1.start()
    p2.start()

    # 等待子行程結束
    p1.join()
    p2.join()
    print("主程式結束")

重點start() 會在子行程中執行 target 函式,join() 則是阻塞主執行緒,直到子行程結束。

3. 使用 Pool 管理工作池

當需要大量相同類型的任務時,手動建立 Process 會變得笨重。multiprocessing.Pool 可以自動管理固定數量的工作者行程,提供 mapapply_async 等高階介面。

import multiprocessing as mp

def square(x):
    """計算平方的簡單函式"""
    return x * x

if __name__ == '__main__':
    numbers = list(range(1, 11))

    # 建立一個包含 4 個工作者的 Pool
    with mp.Pool(processes=4) as pool:
        # 使用 map 讓每個元素平行計算
        results = pool.map(square, numbers)

    print("平方結果:", results)
  • map 會保證結果的順序與輸入一致。
  • 若需要 非同步 的回傳,可改用 apply_asyncstarmap_async

4. 進程間通訊(IPC)——QueuePipeManager

多個子行程需要共享資料時,直接使用全域變數會因為記憶體獨立而失效。multiprocessing 提供了安全的 IPC 機制:

4.1 Queue(先進先出)

import multiprocessing as mp
import random, time

def producer(q, n):
    for i in range(n):
        item = random.randint(1, 100)
        q.put(item)
        print(f"[Producer] 產生 {item}")
        time.sleep(0.1)

def consumer(q):
    while True:
        item = q.get()
        if item is None:          # 終止訊號
            break
        print(f"[Consumer] 處理 {item}")

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q, 10))
    c = mp.Process(target=consumer, args=(q,))

    p.start()
    c.start()
    p.join()

    # 傳送終止訊號給 consumer
    q.put(None)
    c.join()

4.2 Manager(共享資料結構)

Manager 允許在多個行程間共享 listdictNamespace 等 Python 內建資料型別。

import multiprocessing as mp

def worker(shared_dict, key, value):
    shared_dict[key] = value
    print(f"設定 {key} = {value}")

if __name__ == '__main__':
    with mp.Manager() as manager:
        shared_dict = manager.dict()
        processes = []
        for i in range(5):
            p = mp.Process(target=worker, args=(shared_dict, f'key{i}', i*i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print("最終共享字典:", dict(shared_dict))

5. 共享記憶體:ValueArray

對於需要 高效能、低延遲 的共享變數,可使用 multiprocessing.Value(單一值)或 multiprocessing.Array(陣列),底層基於 C 語言的共享記憶體。

import multiprocessing as mp
import time

def increment(counter, steps):
    for _ in range(steps):
        with counter.get_lock():          # 確保原子操作
            counter.value += 1

if __name__ == '__main__':
    counter = mp.Value('i', 0)            # 'i' 表示 C int
    processes = [mp.Process(target=increment, args=(counter, 100000)) for _ in range(4)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print("最終計數值:", counter.value)   # 預期 400000

常見陷阱與最佳實踐

陷阱 說明 建議的解決方案
忘記 if __name__ == '__main__': 在 Windows 上會導致子行程遞迴產生無窮子行程。 必須將建立與啟動 Process 的程式碼包在 if __name__ == '__main__': 區塊。
共享可變物件時的競爭條件 直接傳遞 list、dict 給子行程會得到各自的副本。 使用 QueueManagerValue/Array,並在需要時加鎖(LockRLock)。
過度產生子行程 每次任務都建立新 Process,成本高、記憶體浪費。 使用 PoolProcessPoolExecutorconcurrent.futures)重複利用工作者。
忘記呼叫 join() 主程式提前結束,子行程被強制終止。 務必在適當時機呼叫 join(),或使用 with Pool() 的 context 管理。
資料序列化失敗 multiprocessing 需要把參數序列化(pickle),某些物件(如 lambda、開啟的檔案)無法序列化。 只傳遞可 pickle 的資料,必要時改寫成可序列化的函式或使用 dill 替代。
記憶體泄漏 子行程未正確關閉或大量 Queue 未清空。 使用 with 句法管理 QueuePool,或在結束前呼叫 close()join()

最佳實踐

  1. 盡量使用 Pool 而非手動 Process,除非任務高度不對稱或需要自訂行程生命週期。
  2. 資料傳遞盡量保持不可變(immutable),減少鎖的需求。
  3. 在 CPU 密集型任務使用 multiprocessing,IO 密集型任務則考慮 asynciothreading,以避免不必要的記憶體開銷。
  4. 測試與除錯:使用 multiprocessing.set_start_method('spawn')(或 'fork')可以在不同平台上統一行為,並在開發階段加入 log,方便追蹤子行程狀態。

實際應用場景

場景 為何適合使用 multiprocessing 範例
影像批次處理(如縮圖、濾鏡) 每張圖片的處理是 CPU 密集且相互獨立,透過多核心可線性加速。 使用 Pool.map 把圖片路徑列表傳給圖像處理函式。
資料科學中的特徵工程 大量資料切割、統計計算可平行化。 Pool.starmap 同時計算多個特徵。
網路爬蟲的批次下載(IO 密集) 雖然是 IO 密集,但若每個下載需要大量解壓或解析,可結合 multiprocessingasyncio 子行程負責下載並解壓,內部使用 aiohttp 非同步下載。
模擬與 Monte‑Carlo 計算 每次模擬相互獨立,適合大量平行運算。 Pool.apply_async 產生多條模擬線程,最後收集結果。
服務端的工作隊列 例如背景任務(發送郵件、產生報表)需要與主服務分離,避免阻塞。 使用 multiprocessing.Queue 作為工作隊列,由多個子行程消費。

總結

multiprocessing 為 Python 提供了一套 簡潔、跨平台 的平行運算工具,讓開發者能在 CPU 密集型 任務上突破 GIL 的限制。掌握以下要點,即可在實務專案中發揮最大效益:

  1. 以 Process 為單位思考:每個子行程都有獨立的記憶體與 GIL,適合 CPU 密集型工作。
  2. 使用 Pool 管理工作者,減少建立/銷毀行程的開銷。
  3. 透過 QueueManagerValue/Array 進行安全的進程間通訊,避免競爭條件。
  4. 遵守平台差異(特別是 Windows 的 __main__ 保護)與 資源釋放join()close())。
  5. 根據任務性質選擇工具:CPU 密集型 → multiprocessing,IO 密集型 → asynciothreading

只要善用上述概念與範例,你就能在 資料分析、機器學習前處理、網路服務背景任務 等領域,寫出更快、更可靠的 Python 程式。祝你玩得開心,寫出高效能的並行應用!