本文 AI 產出,尚未審核

Rust 並行與多執行緒:執行緒間通訊(Channels)

簡介

在現代的系統程式設計中,多執行緒已成為提升效能與資源利用率的關鍵手段。Rust 以「安全」為核心,提供了零成本抽象的同步機制,其中 Channel(通道)是最常被使用的執行緒間通訊方式。透過 Channel,資料可以在不同執行緒之間安全地傳遞,而不必直接操作共享記憶體,從而避免資料競爭與死結等常見問題。

本單元將帶你了解 Rust 標準函式庫 std::sync::mpsc(multiple‑producer, single‑consumer)與 crossbeam::channel 兩大通道實作,說明其使用方式、注意事項與實務應用,讓你在寫多執行緒程式時能夠 快速、正確 地完成訊息傳遞。


核心概念

1. 為什麼使用 Channel?

  • 所有權模型:Channel 透過所有權的搬移(move)傳遞資料,編譯器在編譯期就能保證不會有同時的可變引用,避免資料競爭。
  • 阻塞與非阻塞recv(接收)會在沒有訊息時阻塞執行緒,try_recv 則提供非阻塞的查詢方式,讓開發者依需求選擇行為。
  • 多生產者mpsc 允許多個 Sender 複製(clone)後同時發送訊息,適合「工作者池」模式。

2. std::sync::mpsc 基本使用

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // 建立一個 channel,返回 (sender, receiver)
    let (tx, rx) = mpsc::channel();

    // 產生子執行緒,搬移 tx 進去
    thread::spawn(move || {
        let data = "Hello from child".to_string();
        // 送出資料,若接收端已關閉會回傳 Err
        tx.send(data).unwrap();
    });

    // 主執行緒阻塞等待訊息
    let received = rx.recv().unwrap();
    println!("收到訊息: {}", received);
}
  • mpsc::channel() 產生 單向 通道,txSenderrxReceiver
  • tx.send(value) 會把 value 的所有權搬移給接收端。
  • rx.recv() 會阻塞直到有訊息或所有 Sender 被丟棄。

3. 多生產者(Clone Sender)

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 複製 Sender,變成兩個生產者
    let tx1 = tx.clone();
    let tx2 = tx.clone();

    // 第一個子執行緒
    thread::spawn(move || {
        tx1.send(1).unwrap();
    });

    // 第二個子執行緒
    thread::spawn(move || {
        tx2.send(2).unwrap();
    });

    // 主執行緒收集兩個訊息
    for _ in 0..2 {
        println!("收到: {}", rx.recv().unwrap());
    }
}
  • clone() 只複製 傳送端的句柄,底層通道仍是同一條。
  • 接收端不需要特別處理,只要呼叫 recv 即可取得任意生產者的訊息。

4. 非阻塞接收與超時

use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx) = mpsc::channel();

    // 子執行緒延遲 500ms 後才發送
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        tx.send("delayed").unwrap();
    });

    // 使用 try_recv 進行輪詢
    let start = Instant::now();
    loop {
        match rx.try_recv() {
            Ok(msg) => {
                println!("收到: {}", msg);
                break;
            }
            Err(mpsc::TryRecvError::Empty) => {
                // 尚未有訊息,繼續輪詢
                if start.elapsed() > Duration::from_secs(1) {
                    println!("超時未收到訊息");
                    break;
                }
            }
            Err(e) => panic!("通道錯誤: {:?}", e),
        }
        thread::sleep(Duration::from_millis(50));
    }
}
  • try_recv 不會阻塞,適合需要即時回應或 UI 事件循環的情境。
  • 若想要「阻塞但有上限」,可自行使用 select!(在 crossbeam)或 recv_timeoutstd)實作。

5. 使用 crossbeam::channel(進階功能)

crossbeam 提供 多消費者(multiple‑consumer)有界(bounded)無界(unbounded) 兩種通道,效能也比 std::sync::mpsc 更佳。

use crossbeam::channel;
use std::thread;

fn main() {
    // 建立一條有界通道,容量為 2
    let (tx, rx) = channel::bounded(2);

    // 多個消費者
    for i in 0..3 {
        let rx_clone = rx.clone();
        thread::spawn(move || {
            while let Ok(v) = rx_clone.recv() {
                println!("消費者 {} 收到 {}", i, v);
            }
        });
    }

    // 生產者持續發送
    for n in 0..5 {
        tx.send(n).unwrap(); // 若緩衝已滿會阻塞
        println!("發送 {}", n);
    }
    // 所有 Sender 被 drop,接收端會自動結束迴圈
}
  • bounded(cap) 產生 有界 通道,當緩衝區滿時 send 會阻塞,適合流量控制。
  • unbounded() 則是無界通道,send 永遠不會阻塞,但可能會因記憶體使用過大而造成 OOM。
  • rx.clone() 產生 多個接收端,每條訊息只會被其中一個接收端消費,實現 work‑stealing

常見陷阱與最佳實踐

陷阱 說明 建議的解決方式
忘記 Sender 被 drop 所有 Sender 被釋放後,Receiver::recv 會返回 Err,若未處理會導致程式意外結束。 在需要持續接收的地方,保留至少一個 Sender,或使用 while let Ok(v) = rx.recv() 迴圈來安全結束。
傳遞非 Send 型別 Channel 只能傳遞實作了 Send 的類型。 確保資料結構實作 Send(大多數標準型別皆支援),若自訂型別需要 unsafe impl Send,必須非常小心。
有界通道死結 生產者阻塞在 send,而所有消費者已經結束或被阻塞,導致系統無法前進。 使用 select!try_send 先檢查緩衝區狀態,或設計「關閉訊號」讓消費者在適當時機退出。
過度使用 clone 每一次 clone 都會產生一個新的句柄,過多的句柄會增加記憶體與同步開銷。 僅在需要多個生產者/消費者時才 clone,若可以共用同一個句柄則避免不必要的複製。
阻塞在 UI 執行緒 在 GUI 應用程式中直接呼叫 recv 會卡住 UI,造成使用者體驗不佳。 使用 try_recv 搭配事件迴圈,或將阻塞的接收工作交給背景執行緒,再透過回呼或訊號更新 UI。

最佳實踐

  1. 明確定義訊息類型:使用 enum 包裝所有可能的訊息,讓接收端可以一次 match 完成所有分支。
  2. 使用有界通道做流量控制:避免生產者過快產生訊息導致記憶體飆升。
  3. 在結束時發送「關閉訊號」:例如 Message::Terminate,讓消費者能優雅地離開迴圈。
  4. 盡量使用 crossbeam:在需要多消費者或高併發的情境下,crossbeam::channel 的效能與功能更完整。

實際應用場景

1. 工作者池(Thread Pool)

use crossbeam::channel;
use std::thread;

enum Job {
    Compute(u64),
    Quit,
}

fn main() {
    let (job_tx, job_rx) = channel::unbounded::<Job>();
    let mut handles = Vec::new();

    // 建立 4 個工作者執行緒
    for id in 0..4 {
        let rx = job_rx.clone();
        handles.push(thread::spawn(move || {
            while let Ok(job) = rx.recv() {
                match job {
                    Job::Compute(n) => {
                        println!("Worker {} 計算 {}", id, n);
                        // 假裝做一些計算
                        let _ = (0..n).fold(0, |a, b| a + b);
                    }
                    Job::Quit => break,
                }
            }
        }));
    }

    // 主執行緒發送工作
    for i in 1..=10 {
        job_tx.send(Job::Compute(i * 1000)).unwrap();
    }

    // 發送結束訊號
    for _ in 0..4 {
        job_tx.send(Job::Quit).unwrap();
    }

    // 等待所有工作者結束
    for h in handles {
        h.join().unwrap();
    }
}
  • 情境:大量計算任務需要分配給固定數量的執行緒,避免頻繁建立/銷毀執行緒的開銷。
  • 重點:使用 unbounded 讓主執行緒不會被緩衝區限制;Job::Quit 作為結束訊號。

2. 事件驅動的 GUI 後端

在桌面 GUI(例如使用 gtk-rs)時,常把 I/O 或計算工作放在背景執行緒,完成後透過 Channel 把結果送回主執行緒更新 UI。

use gtk::prelude::*;
use gtk::{Application, ApplicationWindow, Button, Label};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let app = Application::builder()
        .application_id("com.example.channel_demo")
        .build();

    app.connect_activate(|app| {
        let (tx, rx) = mpsc::channel::<String>();
        let window = ApplicationWindow::builder()
            .application(app)
            .title("Channel Demo")
            .default_width(300)
            .default_height(100)
            .build();

        let label = Label::new(Some("等待結果..."));
        let button = Button::with_label("開始工作");
        let vbox = gtk::Box::new(gtk::Orientation::Vertical, 5);
        vbox.append(&button);
        vbox.append(&label);
        window.set_child(Some(&vbox));
        window.show();

        // 按下按鈕後啟動背景工作
        let tx_clone = tx.clone();
        button.connect_clicked(move |_| {
            let tx_bg = tx_clone.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_secs(2));
                tx_bg.send("工作完成".to_string()).unwrap();
            });
        });

        // 主執行緒輪詢接收結果,使用 idle_add 讓 UI 不會卡住
        glib::source::idle_add_local(move || {
            if let Ok(msg) = rx.try_recv() {
                label.set_text(&msg);
            }
            glib::Continue(true)
        });
    });

    app.run();
}
  • 關鍵:背景執行緒使用 tx.send,主執行緒在 UI 事件迴圈中以 try_recv 非阻塞方式取得訊息,避免 UI 卡頓。

3. 多機制協調(Producer‑Consumer + Timer)

use crossbeam::channel::{self, select, tick, Receiver};
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = channel::unbounded::<u32>();
    let ticker = tick(Duration::from_secs(1));

    // 生產者:每 200ms 產生一個數字
    thread::spawn(move || {
        for i in 0.. {
            thread::sleep(Duration::from_millis(200));
            tx.send(i).unwrap();
        }
    });

    // 消費者:同時監聽資料與計時器
    loop {
        select! {
            recv(rx) -> msg => {
                println!("收到資料: {}", msg.unwrap());
            }
            recv(ticker) -> _ => {
                println!("--- 每秒一次的心跳 ---");
            }
        }
    }
}
  • select! 讓執行緒能同時等待多個通道事件,常用於 事件驅動時間驅動 的混合場景。

總結

  • Channel 是 Rust 提供的「所有權安全」的執行緒間通訊機制,讓資料搬移不會產生競爭條件。
  • std::sync::mpsc 適合簡單的 單消費者 場景;crossbeam::channel 則支援 多消費者有界select 等進階功能,效能也更佳。
  • 使用時要注意 Sender 的生命週期訊息類型必須實作 Send,以及 有界通道可能造成的死結
  • 透過 enum 包裝訊息、關閉訊號有界緩衝,可以寫出 可預測、易維護 的多執行緒程式。
  • 在實務上,Channel 常被用於 工作者池、GUI 後端、事件驅動系統 等多種情境,配合 select!try_recvrecv_timeout 等技巧,能滿足從簡單到高併發的需求。

掌握了 Channel 後,你就能在 Rust 中安全且高效地構建並行程式,讓程式碼既 不會因競爭條件而崩潰。祝你在 Rust 的並行世界裡玩得開心!