Python 並行與非同步:深入探討 Threading 模組
簡介
在現代的應用程式中,效能 與 使用者體驗 常常取決於程式能否同時處理多件事。舉例來說,網路爬蟲需要同時抓取多個網頁、桌面程式需要在背景執行 I/O 操作卻不阻塞 UI、伺服器端需要同時服務多個客戶端連線,這些情境都離不開 並行(Concurrency)的概念。
Python 內建的 threading 模組提供了 多執行緒(Thread)的抽象,讓開發者可以在同一個行程(process)內,同步或非同步地執行多段程式碼。儘管 Python 的全域解釋器鎖(GIL)限制了 CPU 密集型工作的平行效能,但在 I/O 密集型、等待外部資源的情況下,threading 仍是非常實用且易於上手的工具。
本篇文章將從基礎概念說起,透過 5 個實作範例 逐步展示如何使用 threading,並討論常見的陷阱、最佳實踐以及真實的應用場景,幫助讀者在實務上快速上手。
核心概念
1. 什麼是執行緒?
- 執行緒(Thread):在同一個行程(process)內的最小執行單位,與同屬行程的其他執行緒共享記憶體空間、檔案描述符等資源。
- 全域解釋器鎖(GIL):Python(CPython)為了保證記憶體安全,讓同時只有一個執行緒執行 Python bytecode。這意味著 CPU 密集型 工作不會因多執行緒而加速,但 I/O 密集型 工作仍能受惠於併發。
2. 建立與啟動執行緒
threading.Thread 類別是建立執行緒的核心。最常見的兩種方式:
- 傳入目標函式(target)
- 繼承
Thread類別並覆寫run()方法
兩者在功能上等價,選擇依個人風格或程式結構決定。
3. 執行緒同步
多執行緒同時存取共享資源時,可能產生競爭條件(race condition)。threading 提供了多種同步原語:
| 同步原語 | 用途 |
|---|---|
Lock / RLock |
互斥鎖,保護臨界區 |
Semaphore |
控制同時執行的執行緒數量 |
Event |
執行緒間的訊號傳遞 |
Condition |
複雜的等待/通知機制 |
Barrier |
多執行緒在同一點同步 |
4. 執行緒的生命週期
- 建立:
Thread物件被實例化。 - 啟動:呼叫
start(),內部會呼叫run()。 - 執行:執行緒在作業系統排程下執行程式碼。
- 結束:
run()完成或拋出未捕獲的例外。 - 回收:呼叫
join()等待執行緒結束,釋放資源。
程式碼範例
以下示範 5 個常見且實用的 threading 用法,程式碼均以 Python 3 為例,並在關鍵位置加上註解說明。
範例 1:最簡單的執行緒 – 傳入 target 函式
import threading
import time
def worker(name, seconds):
"""執行緒要執行的工作"""
print(f"[{name}] 開始執行,將睡眠 {seconds} 秒")
time.sleep(seconds)
print(f"[{name}] 工作完成")
# 建立兩個執行緒
t1 = threading.Thread(target=worker, args=("Thread-A", 2))
t2 = threading.Thread(target=worker, args=("Thread-B", 3))
# 啟動執行緒
t1.start()
t2.start()
# 主執行緒等待子執行緒結束
t1.join()
t2.join()
print("所有執行緒已完成")
說明:
target指定要執行的函式,args為傳遞給函式的參數。join()讓主執行緒阻塞,直到子執行緒結束,避免程式提前結束。
範例 2:繼承 Thread 類別
import threading
import random
class CounterThread(threading.Thread):
"""繼承 Thread,計算 1~n 的總和"""
def __init__(self, n, name=None):
super().__init__(name=name)
self.n = n
self.result = 0
def run(self):
# 這裡的程式碼會在新執行緒中執行
self.result = sum(range(1, self.n + 1))
print(f"[{self.name}] 計算結果: {self.result}")
# 建立三個計算執行緒
threads = [CounterThread(random.randint(10, 100), name=f"Counter-{i}") for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
# 取得每個執行緒的計算結果
print(f"{t.name} 的最終結果 = {t.result}")
說明:
- 透過繼承,我們可以在
run()中保存執行結果 (self.result) 供外部存取。 self.name為Thread基底類別提供的屬性,方便除錯與日誌。
範例 3:使用 Lock 防止競爭條件
import threading
counter = 0 # 共享資源
lock = threading.Lock() # 建立互斥鎖
def increment():
global counter
for _ in range(100_000):
# 取得鎖定,進入臨界區
with lock:
counter += 1
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最終計數結果 = {counter}")
說明:
- 若不使用
Lock,多執行緒同時修改counter會產生 race condition,導致結果不正確。 with lock:為 Pythonic 的寫法,會自動在區塊結束時釋放鎖。
範例 4:Event 用於執行緒間的訊號傳遞
import threading
import time
event = threading.Event()
def waiter():
print("等待者:開始等待事件被觸發...")
event.wait() # 阻塞,直到被 set()
print("等待者:事件已觸發,繼續執行!")
def setter():
print("觸發者:3 秒後觸發事件")
time.sleep(3)
event.set() # 觸發事件,所有 wait() 會返回
t_wait = threading.Thread(target=waiter)
t_set = threading.Thread(target=setter)
t_wait.start()
t_set.start()
t_wait.join()
t_set.join()
說明:
Event提供 一次性 的訊號機制,適合「某個條件達成後,全部執行緒同時繼續」的情境。event.is_set()可檢查目前狀態。
範例 5:ThreadPoolExecutor(concurrent.futures)結合 threading
雖然 ThreadPoolExecutor 屬於 concurrent.futures,但它底層仍使用 threading,且提供更高層次的抽象,適合 大量相同任務 的情況。
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
URLS = [
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com",
# 可以自行加入更多網址
]
def fetch(url):
"""下載網址內容並回傳長度"""
with urllib.request.urlopen(url) as resp:
data = resp.read()
print(f"{url} 下載完成,大小 {len(data)} bytes")
return len(data)
# 建立最多 5 個執行緒的池
with ThreadPoolExecutor(max_workers=5) as executor:
# map 會回傳結果的 iterator
futures = {executor.submit(fetch, url): url for url in URLS}
for future in as_completed(futures):
url = futures[future]
try:
size = future.result()
except Exception as exc:
print(f"{url} 產生例外: {exc}")
else:
print(f"{url} 的內容大小: {size} bytes")
說明:
ThreadPoolExecutor讓我們不必手動管理Thread物件與join(),只需要提交任務即可。as_completed允許我們依照任務完成的先後順序取得結果,適合 I/O 密集型的 爬蟲、批次下載 等情境。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方式 |
|---|---|---|
| GIL 限制 CPU 計算 | 多執行緒無法提升 CPU 密集型程式的效能。 | 采用 multiprocessing、C 擴充或 numba 等方式,或改用 非同步 I/O(asyncio)。 |
忘記呼叫 join() |
主執行緒提前結束,子執行緒被迫中斷。 | 在需要確保所有工作完成時,務必 join()。 |
| 競爭條件 | 多執行緒同時寫入共享變數導致錯誤結果。 | 使用 Lock、RLock、Queue 等同步結構。 |
| 死鎖 | 兩個以上執行緒互相等待對方釋放資源。 | 確保取得鎖的順序一致,或使用 RLock、timeout。 |
| 過度產生執行緒 | 每次任務都建立新執行緒,會耗盡系統資源。 | 使用 執行緒池(ThreadPoolExecutor 或自訂 queue.Queue + 工作者執行緒)。 |
| 例外未捕獲 | 執行緒內拋出的例外不會傳到主執行緒,導致 silent failure。 | 在 run() 或目標函式內使用 try/except,或在 ThreadPoolExecutor 中檢查 future.exception()。 |
推薦的最佳實踐
- 盡量使用執行緒池:避免手動管理過多的
Thread物件。 - 把共享資料封裝在
queue.Queue:Queue 本身已內建鎖,安全且易於使用。 - 保持執行緒短小:長時間佔用 CPU 的工作交給多行程或 C 擴充。
- 明確命名執行緒:使用
name=參數或在run()中設定,方便除錯。 - 記得釋放資源:使用
with句法包裝Lock、Event,確保例外時仍能釋放。
實際應用場景
| 場景 | 為何適合使用 threading |
|---|---|
| 檔案 I/O:大量讀寫磁碟檔案(如日誌切割、批次轉檔) | I/O 等待時間長,執行緒可在等待期間切換,提升整體吞吐量。 |
| 網路爬蟲:同時抓取多個網站頁面 | 每個 HTTP 請求會被阻塞在 socket 等待,執行緒讓多個請求同時進行。 |
| GUI 程式:在背景執行長時間任務(下載、計算)而不凍結 UI | 主執行緒負責 UI,工作執行緒負責耗時任務,兩者互不干擾。 |
| 資料庫連線池:同時處理多筆查詢或寫入 | 每筆查詢可放在獨立執行緒,利用 DB 的 I/O 並行性。 |
| 即時監控:持續讀取感測器或串口資料 | 每條感測通道使用獨立執行緒,確保即時性。 |
小技巧:在上述場景中,若同時需要大量執行緒且頻繁建立/銷毀,建議改用
ThreadPoolExecutor+queue.Queue,可大幅降低系統開銷。
總結
threading 模組是 Python 提供的 輕量級併發解決方案,特別適合 I/O 密集型、等待外部資源 的情況。透過本篇的 概念說明、5 個實作範例、以及 陷阱與最佳實踐,讀者應已能:
- 正確建立、啟動與回收執行緒。
- 使用
Lock、Event、ThreadPoolExecutor等同步工具,避免競爭條件與死鎖。 - 判斷何時應該選擇
threading、何時改用multiprocessing或asyncio。 - 在真實專案(爬蟲、檔案處理、GUI)中落實執行緒的應用。
掌握好 執行緒的生命週期 與 同步機制,就能在 Python 程式中靈活運用併發,提升效能與使用者體驗。祝大家在未來的開發旅程中,玩得開心、寫得順手! 🚀