Python 並行與非同步:深入了解 multiprocessing 模組
簡介
在現代的資料處理與網路服務中,單純依賴單執行緒的程式往往無法滿足效能需求。CPU 密集型(例如數值計算、圖像處理)或 IO 密集型(例如大量檔案讀寫、網路請求)的工作,都可以透過並行(Concurrency)或平行(Parallelism)來加速。
Python 內建的 multiprocessing 模組提供了 跨平台、類似 threading 的 API,卻能真正利用多核心 CPU,讓每個子行程(process)擁有獨立的 Python 解釋器與記憶體空間,避免了 GIL(Global Interpreter Lock)的限制。掌握 multiprocessing,不只能提升程式效能,更能在資料科學、機器學習、網路服務等領域打造更具彈性的系統。
核心概念
1. 為什麼要使用 multiprocessing?
- 突破 GIL:在 CPython 中,單一執行緒同時只能執行一段 Python bytecode,導致 CPU 密集型任務無法真正平行。
multiprocessing透過多個子行程,各自擁有獨立的 GIL,實現真正的平行運算。 - 跨平台支援:Windows、macOS、Linux 都能使用相同的 API,唯一需要留意的是 Windows 必須使用
if __name__ == '__main__':防止子行程遞迴產生。 - 類似
threading的介面:如果你已熟悉threading,轉而使用multiprocessing幾乎不需要重新學習大量概念。
2. 基本結構:Process 類別
multiprocessing.Process 類別相當於 threading.Thread,但每個 Process 會在獨立的記憶體空間執行。
import multiprocessing as mp
import time
def worker(name, seconds):
"""子行程要執行的函式"""
print(f"[{name}] 開始執行,預計 {seconds} 秒")
time.sleep(seconds)
print(f"[{name}] 執行結束")
if __name__ == '__main__':
# 建立兩個子行程
p1 = mp.Process(target=worker, args=('A', 2))
p2 = mp.Process(target=worker, args=('B', 3))
# 啟動子行程
p1.start()
p2.start()
# 等待子行程結束
p1.join()
p2.join()
print("主程式結束")
重點:
start()會在子行程中執行target函式,join()則是阻塞主執行緒,直到子行程結束。
3. 使用 Pool 管理工作池
當需要大量相同類型的任務時,手動建立 Process 會變得笨重。multiprocessing.Pool 可以自動管理固定數量的工作者行程,提供 map、apply_async 等高階介面。
import multiprocessing as mp
def square(x):
"""計算平方的簡單函式"""
return x * x
if __name__ == '__main__':
numbers = list(range(1, 11))
# 建立一個包含 4 個工作者的 Pool
with mp.Pool(processes=4) as pool:
# 使用 map 讓每個元素平行計算
results = pool.map(square, numbers)
print("平方結果:", results)
map會保證結果的順序與輸入一致。- 若需要 非同步 的回傳,可改用
apply_async或starmap_async。
4. 進程間通訊(IPC)——Queue、Pipe、Manager
多個子行程需要共享資料時,直接使用全域變數會因為記憶體獨立而失效。multiprocessing 提供了安全的 IPC 機制:
4.1 Queue(先進先出)
import multiprocessing as mp
import random, time
def producer(q, n):
for i in range(n):
item = random.randint(1, 100)
q.put(item)
print(f"[Producer] 產生 {item}")
time.sleep(0.1)
def consumer(q):
while True:
item = q.get()
if item is None: # 終止訊號
break
print(f"[Consumer] 處理 {item}")
if __name__ == '__main__':
q = mp.Queue()
p = mp.Process(target=producer, args=(q, 10))
c = mp.Process(target=consumer, args=(q,))
p.start()
c.start()
p.join()
# 傳送終止訊號給 consumer
q.put(None)
c.join()
4.2 Manager(共享資料結構)
Manager 允許在多個行程間共享 list、dict、Namespace 等 Python 內建資料型別。
import multiprocessing as mp
def worker(shared_dict, key, value):
shared_dict[key] = value
print(f"設定 {key} = {value}")
if __name__ == '__main__':
with mp.Manager() as manager:
shared_dict = manager.dict()
processes = []
for i in range(5):
p = mp.Process(target=worker, args=(shared_dict, f'key{i}', i*i))
processes.append(p)
p.start()
for p in processes:
p.join()
print("最終共享字典:", dict(shared_dict))
5. 共享記憶體:Value、Array
對於需要 高效能、低延遲 的共享變數,可使用 multiprocessing.Value(單一值)或 multiprocessing.Array(陣列),底層基於 C 語言的共享記憶體。
import multiprocessing as mp
import time
def increment(counter, steps):
for _ in range(steps):
with counter.get_lock(): # 確保原子操作
counter.value += 1
if __name__ == '__main__':
counter = mp.Value('i', 0) # 'i' 表示 C int
processes = [mp.Process(target=increment, args=(counter, 100000)) for _ in range(4)]
for p in processes:
p.start()
for p in processes:
p.join()
print("最終計數值:", counter.value) # 預期 400000
常見陷阱與最佳實踐
| 陷阱 | 說明 | 建議的解決方案 |
|---|---|---|
忘記 if __name__ == '__main__': |
在 Windows 上會導致子行程遞迴產生無窮子行程。 | 必須將建立與啟動 Process 的程式碼包在 if __name__ == '__main__': 區塊。 |
| 共享可變物件時的競爭條件 | 直接傳遞 list、dict 給子行程會得到各自的副本。 | 使用 Queue、Manager 或 Value/Array,並在需要時加鎖(Lock、RLock)。 |
| 過度產生子行程 | 每次任務都建立新 Process,成本高、記憶體浪費。 |
使用 Pool 或 ProcessPoolExecutor(concurrent.futures)重複利用工作者。 |
忘記呼叫 join() |
主程式提前結束,子行程被強制終止。 | 務必在適當時機呼叫 join(),或使用 with Pool() 的 context 管理。 |
| 資料序列化失敗 | multiprocessing 需要把參數序列化(pickle),某些物件(如 lambda、開啟的檔案)無法序列化。 |
只傳遞可 pickle 的資料,必要時改寫成可序列化的函式或使用 dill 替代。 |
| 記憶體泄漏 | 子行程未正確關閉或大量 Queue 未清空。 |
使用 with 句法管理 Queue、Pool,或在結束前呼叫 close()、join()。 |
最佳實踐:
- 盡量使用
Pool而非手動Process,除非任務高度不對稱或需要自訂行程生命週期。 - 資料傳遞盡量保持不可變(immutable),減少鎖的需求。
- 在 CPU 密集型任務使用
multiprocessing,IO 密集型任務則考慮asyncio或threading,以避免不必要的記憶體開銷。 - 測試與除錯:使用
multiprocessing.set_start_method('spawn')(或'fork')可以在不同平台上統一行為,並在開發階段加入log,方便追蹤子行程狀態。
實際應用場景
| 場景 | 為何適合使用 multiprocessing |
範例 |
|---|---|---|
| 影像批次處理(如縮圖、濾鏡) | 每張圖片的處理是 CPU 密集且相互獨立,透過多核心可線性加速。 | 使用 Pool.map 把圖片路徑列表傳給圖像處理函式。 |
| 資料科學中的特徵工程 | 大量資料切割、統計計算可平行化。 | Pool.starmap 同時計算多個特徵。 |
| 網路爬蟲的批次下載(IO 密集) | 雖然是 IO 密集,但若每個下載需要大量解壓或解析,可結合 multiprocessing 與 asyncio。 |
子行程負責下載並解壓,內部使用 aiohttp 非同步下載。 |
| 模擬與 Monte‑Carlo 計算 | 每次模擬相互獨立,適合大量平行運算。 | Pool.apply_async 產生多條模擬線程,最後收集結果。 |
| 服務端的工作隊列 | 例如背景任務(發送郵件、產生報表)需要與主服務分離,避免阻塞。 | 使用 multiprocessing.Queue 作為工作隊列,由多個子行程消費。 |
總結
multiprocessing 為 Python 提供了一套 簡潔、跨平台 的平行運算工具,讓開發者能在 CPU 密集型 任務上突破 GIL 的限制。掌握以下要點,即可在實務專案中發揮最大效益:
- 以 Process 為單位思考:每個子行程都有獨立的記憶體與 GIL,適合 CPU 密集型工作。
- 使用
Pool管理工作者,減少建立/銷毀行程的開銷。 - 透過
Queue、Manager、Value/Array進行安全的進程間通訊,避免競爭條件。 - 遵守平台差異(特別是 Windows 的
__main__保護)與 資源釋放(join()、close())。 - 根據任務性質選擇工具:CPU 密集型 →
multiprocessing,IO 密集型 →asyncio或threading。
只要善用上述概念與範例,你就能在 資料分析、機器學習前處理、網路服務背景任務 等領域,寫出更快、更可靠的 Python 程式。祝你玩得開心,寫出高效能的並行應用!