本文 AI 產出,尚未審核

Python 並行與非同步:深入探討 Threading 模組


簡介

在現代的應用程式中,效能使用者體驗 常常取決於程式能否同時處理多件事。舉例來說,網路爬蟲需要同時抓取多個網頁、桌面程式需要在背景執行 I/O 操作卻不阻塞 UI、伺服器端需要同時服務多個客戶端連線,這些情境都離不開 並行(Concurrency)的概念。

Python 內建的 threading 模組提供了 多執行緒(Thread)的抽象,讓開發者可以在同一個行程(process)內,同步或非同步地執行多段程式碼。儘管 Python 的全域解釋器鎖(GIL)限制了 CPU 密集型工作的平行效能,但在 I/O 密集型、等待外部資源的情況下,threading 仍是非常實用且易於上手的工具。

本篇文章將從基礎概念說起,透過 5 個實作範例 逐步展示如何使用 threading,並討論常見的陷阱、最佳實踐以及真實的應用場景,幫助讀者在實務上快速上手。


核心概念

1. 什麼是執行緒?

  • 執行緒(Thread):在同一個行程(process)內的最小執行單位,與同屬行程的其他執行緒共享記憶體空間、檔案描述符等資源。
  • 全域解釋器鎖(GIL):Python(CPython)為了保證記憶體安全,讓同時只有一個執行緒執行 Python bytecode。這意味著 CPU 密集型 工作不會因多執行緒而加速,但 I/O 密集型 工作仍能受惠於併發。

2. 建立與啟動執行緒

threading.Thread 類別是建立執行緒的核心。最常見的兩種方式:

  1. 傳入目標函式(target)
  2. 繼承 Thread 類別並覆寫 run() 方法

兩者在功能上等價,選擇依個人風格或程式結構決定。

3. 執行緒同步

多執行緒同時存取共享資源時,可能產生競爭條件(race condition)。threading 提供了多種同步原語:

同步原語 用途
Lock / RLock 互斥鎖,保護臨界區
Semaphore 控制同時執行的執行緒數量
Event 執行緒間的訊號傳遞
Condition 複雜的等待/通知機制
Barrier 多執行緒在同一點同步

4. 執行緒的生命週期

  1. 建立Thread 物件被實例化。
  2. 啟動:呼叫 start(),內部會呼叫 run()
  3. 執行:執行緒在作業系統排程下執行程式碼。
  4. 結束run() 完成或拋出未捕獲的例外。
  5. 回收:呼叫 join() 等待執行緒結束,釋放資源。

程式碼範例

以下示範 5 個常見且實用的 threading 用法,程式碼均以 Python 3 為例,並在關鍵位置加上註解說明。

範例 1:最簡單的執行緒 – 傳入 target 函式

import threading
import time

def worker(name, seconds):
    """執行緒要執行的工作"""
    print(f"[{name}] 開始執行,將睡眠 {seconds} 秒")
    time.sleep(seconds)
    print(f"[{name}] 工作完成")

# 建立兩個執行緒
t1 = threading.Thread(target=worker, args=("Thread-A", 2))
t2 = threading.Thread(target=worker, args=("Thread-B", 3))

# 啟動執行緒
t1.start()
t2.start()

# 主執行緒等待子執行緒結束
t1.join()
t2.join()
print("所有執行緒已完成")

說明

  • target 指定要執行的函式,args 為傳遞給函式的參數。
  • join() 讓主執行緒阻塞,直到子執行緒結束,避免程式提前結束。

範例 2:繼承 Thread 類別

import threading
import random

class CounterThread(threading.Thread):
    """繼承 Thread,計算 1~n 的總和"""
    def __init__(self, n, name=None):
        super().__init__(name=name)
        self.n = n
        self.result = 0

    def run(self):
        # 這裡的程式碼會在新執行緒中執行
        self.result = sum(range(1, self.n + 1))
        print(f"[{self.name}] 計算結果: {self.result}")

# 建立三個計算執行緒
threads = [CounterThread(random.randint(10, 100), name=f"Counter-{i}") for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()
    # 取得每個執行緒的計算結果
    print(f"{t.name} 的最終結果 = {t.result}")

說明

  • 透過繼承,我們可以在 run() 中保存執行結果 (self.result) 供外部存取。
  • self.nameThread 基底類別提供的屬性,方便除錯與日誌。

範例 3:使用 Lock 防止競爭條件

import threading

counter = 0          # 共享資源
lock = threading.Lock()   # 建立互斥鎖

def increment():
    global counter
    for _ in range(100_000):
        # 取得鎖定,進入臨界區
        with lock:
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最終計數結果 = {counter}")

說明

  • 若不使用 Lock,多執行緒同時修改 counter 會產生 race condition,導致結果不正確。
  • with lock: 為 Pythonic 的寫法,會自動在區塊結束時釋放鎖。

範例 4:Event 用於執行緒間的訊號傳遞

import threading
import time

event = threading.Event()

def waiter():
    print("等待者:開始等待事件被觸發...")
    event.wait()                # 阻塞,直到被 set()
    print("等待者:事件已觸發,繼續執行!")

def setter():
    print("觸發者:3 秒後觸發事件")
    time.sleep(3)
    event.set()                 # 觸發事件,所有 wait() 會返回

t_wait = threading.Thread(target=waiter)
t_set = threading.Thread(target=setter)

t_wait.start()
t_set.start()
t_wait.join()
t_set.join()

說明

  • Event 提供 一次性 的訊號機制,適合「某個條件達成後,全部執行緒同時繼續」的情境。
  • event.is_set() 可檢查目前狀態。

範例 5:ThreadPoolExecutorconcurrent.futures)結合 threading

雖然 ThreadPoolExecutor 屬於 concurrent.futures,但它底層仍使用 threading,且提供更高層次的抽象,適合 大量相同任務 的情況。

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request

URLS = [
    "https://www.python.org",
    "https://www.github.com",
    "https://www.stackoverflow.com",
    # 可以自行加入更多網址
]

def fetch(url):
    """下載網址內容並回傳長度"""
    with urllib.request.urlopen(url) as resp:
        data = resp.read()
        print(f"{url} 下載完成,大小 {len(data)} bytes")
        return len(data)

# 建立最多 5 個執行緒的池
with ThreadPoolExecutor(max_workers=5) as executor:
    # map 會回傳結果的 iterator
    futures = {executor.submit(fetch, url): url for url in URLS}
    for future in as_completed(futures):
        url = futures[future]
        try:
            size = future.result()
        except Exception as exc:
            print(f"{url} 產生例外: {exc}")
        else:
            print(f"{url} 的內容大小: {size} bytes")

說明

  • ThreadPoolExecutor 讓我們不必手動管理 Thread 物件與 join(),只需要提交任務即可。
  • as_completed 允許我們依照任務完成的先後順序取得結果,適合 I/O 密集型的 爬蟲批次下載 等情境。

常見陷阱與最佳實踐

陷阱 說明 解決方式
GIL 限制 CPU 計算 多執行緒無法提升 CPU 密集型程式的效能。 采用 multiprocessing、C 擴充或 numba 等方式,或改用 非同步 I/Oasyncio)。
忘記呼叫 join() 主執行緒提前結束,子執行緒被迫中斷。 在需要確保所有工作完成時,務必 join()
競爭條件 多執行緒同時寫入共享變數導致錯誤結果。 使用 LockRLockQueue 等同步結構。
死鎖 兩個以上執行緒互相等待對方釋放資源。 確保取得鎖的順序一致,或使用 RLocktimeout
過度產生執行緒 每次任務都建立新執行緒,會耗盡系統資源。 使用 執行緒池ThreadPoolExecutor 或自訂 queue.Queue + 工作者執行緒)。
例外未捕獲 執行緒內拋出的例外不會傳到主執行緒,導致 silent failure。 run() 或目標函式內使用 try/except,或在 ThreadPoolExecutor 中檢查 future.exception()

推薦的最佳實踐

  1. 盡量使用執行緒池:避免手動管理過多的 Thread 物件。
  2. 把共享資料封裝在 queue.Queue:Queue 本身已內建鎖,安全且易於使用。
  3. 保持執行緒短小:長時間佔用 CPU 的工作交給多行程或 C 擴充。
  4. 明確命名執行緒:使用 name= 參數或在 run() 中設定,方便除錯。
  5. 記得釋放資源:使用 with 句法包裝 LockEvent,確保例外時仍能釋放。

實際應用場景

場景 為何適合使用 threading
檔案 I/O:大量讀寫磁碟檔案(如日誌切割、批次轉檔) I/O 等待時間長,執行緒可在等待期間切換,提升整體吞吐量。
網路爬蟲:同時抓取多個網站頁面 每個 HTTP 請求會被阻塞在 socket 等待,執行緒讓多個請求同時進行。
GUI 程式:在背景執行長時間任務(下載、計算)而不凍結 UI 主執行緒負責 UI,工作執行緒負責耗時任務,兩者互不干擾。
資料庫連線池:同時處理多筆查詢或寫入 每筆查詢可放在獨立執行緒,利用 DB 的 I/O 並行性。
即時監控:持續讀取感測器或串口資料 每條感測通道使用獨立執行緒,確保即時性。

小技巧:在上述場景中,若同時需要大量執行緒且頻繁建立/銷毀,建議改用 ThreadPoolExecutor + queue.Queue,可大幅降低系統開銷。


總結

threading 模組是 Python 提供的 輕量級併發解決方案,特別適合 I/O 密集型等待外部資源 的情況。透過本篇的 概念說明5 個實作範例、以及 陷阱與最佳實踐,讀者應已能:

  1. 正確建立、啟動與回收執行緒。
  2. 使用 LockEventThreadPoolExecutor 等同步工具,避免競爭條件與死鎖。
  3. 判斷何時應該選擇 threading、何時改用 multiprocessingasyncio
  4. 在真實專案(爬蟲、檔案處理、GUI)中落實執行緒的應用。

掌握好 執行緒的生命週期同步機制,就能在 Python 程式中靈活運用併發,提升效能與使用者體驗。祝大家在未來的開發旅程中,玩得開心、寫得順手! 🚀