Rust 並行與多執行緒
主題:非同步程式設計(Async)簡介
簡介
在現代的網路服務、IoT 裝置與高頻交易系統中,同時處理多個 I/O 任務已成為必備能力。傳統的多執行緒模型雖然能夠達成並行,但每個執行緒都會佔用系統的堆疊空間,建立與切換的成本相對較高。Rust 的 非同步(async) 機制則提供了一種「輕量級的協程」(coroutine) 方式,讓程式在等待 I/O 時可以「讓出」執行權,讓其他任務繼續跑,而不必真的阻塞 OS 執行緒。
對於 初學者,async 可能看起來像是另一套全新的語法;對 中級開發者,它則是提升效能、降低資源消耗的關鍵工具。本篇文章將從概念、語法到實務範例,完整介紹 Rust 的非同步程式設計,幫助讀者快速上手並避免常見陷阱。
核心概念
1. async 與 await 的基本原理
async fn:將普通函式標記為非同步,編譯器會把它轉換成返回Future的函式。Future:一個懸而未決的計算,只有在被 poll 時才會前進一步。await:在Future上呼叫.await,表示「暫停」當前任務,等候結果完成,之後再繼續執行。
重點:
await並不會阻塞執行緒,它只是把控制權交還給執行器 (executor)。
2. 執行器(Executor)
Future 本身不會自行跑起來,需要一個 executor 來驅動它。Rust 生態常見的執行器有:
| 執行器 | 特色 |
|---|---|
tokio |
功能最完整,支援 I/O、計時器、工作池等,適合大型服務 |
async-std |
API 與標準函式庫相似,輕量且易上手 |
smol |
超小型執行器,適合嵌入式或 CLI 工具 |
本文以 tokio 為例,因為它是業界最常見的選擇。
3. 非同步 I/O 與阻塞 I/O 的差異
| 阻塞 I/O | 非同步 I/O | |
|---|---|---|
| 執行緒使用 | 每個 I/O 需要佔用執行緒直到完成 | 同一執行緒可同時管理多個 I/O |
| 資源開銷 | 記憶體堆疊、上下文切換成本高 | 只在需要時切換,成本低 |
| 程式可讀性 | 直線式程式碼 | 需要 await,仍保持直線式邏輯 |
4. Send 與 Sync 在 async 中的角色
Send:表示Future可以安全地在不同執行緒之間傳遞。大部分tokio任務都要求Future: Send。Sync:表示同時多個執行緒可以共享同一個值。非同步程式中,若要在多個任務共享資料,通常會使用Arc<Mutex<T>>或RwLock。
程式碼範例
以下示範 5 個常見且實用的 async 範例,每段程式碼皆附有說明註解。
範例 1:最簡單的 async 函式
// main.rs
use tokio::time::{sleep, Duration};
/// 一個會等待 1 秒的非同步函式
async fn delayed_hello() -> &'static str {
// `sleep` 本身是非同步的,使用 `.await` 暫停當前任務
sleep(Duration::from_secs(1)).await;
"Hello, async Rust!"
}
#[tokio::main] // tokio 提供的入口巨集,會自動建立執行器
async fn main() {
// 呼叫 async 函式並立即 `.await` 取得結果
let msg = delayed_hello().await;
println!("{}", msg);
}
說明:#[tokio::main] 產生一個單執行緒的 runtime,讓 main 可以是 async fn。sleep 不會佔用執行緒,期間 runtime 可以排程其他任務。
範例 2:同時執行多個任務 (join!)
use tokio::time::{sleep, Duration};
use tokio::join;
/// 模擬兩個獨立的 I/O 任務
async fn task_a() -> u32 {
sleep(Duration::from_millis(500)).await;
10
}
async fn task_b() -> u32 {
sleep(Duration::from_millis(800)).await;
20
}
#[tokio::main]
async fn main() {
// `join!` 會同時 poll 兩個 Future,等全部完成才返回
let (a, b) = join!(task_a(), task_b());
println!("Result: {}", a + b); // 30
}
說明:join! 讓兩個任務在同一執行緒上「併發」執行,總耗時約為較長者的 800ms,而非 1.3s。
範例 3:使用 tokio::spawn 建立背景任務
use tokio::time::{sleep, Duration};
async fn background_job(id: usize) {
println!("Job {} started", id);
sleep(Duration::from_secs(2)).await;
println!("Job {} finished", id);
}
#[tokio::main]
async fn main() {
// `spawn` 會把 Future 放入執行器的工作隊列,立即返回 JoinHandle
let handle1 = tokio::spawn(background_job(1));
let handle2 = tokio::spawn(background_job(2));
// 主任務仍然可以做別的事
println!("Main thread is free!");
// 等待背景任務結束
let _ = handle1.await;
let _ = handle2.await;
}
說明:spawn 產生的任務是 Send 的,會在執行器的工作池中執行。即使主任務已結束,背景任務仍會持續跑完。
範例 4:共享可變資料 (Arc<Mutex<T>>)
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
async fn increment(counter: Arc<Mutex<u32>>) {
// 取得鎖定,這裡會暫停當前任務直到鎖可用
let mut guard = counter.lock().await;
*guard += 1;
println!("counter = {}", *guard);
// 鎖會在 guard 被 drop 時自動釋放
sleep(Duration::from_millis(200)).await;
}
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
// 同時啟動 5 個任務,每個任務都要遞增計數器
let mut handles = Vec::new();
for _ in 0..5 {
let c = Arc::clone(&counter);
handles.push(tokio::spawn(increment(c)));
}
for h in handles {
let _ = h.await;
}
}
說明:Arc 讓多個任務共享所有權,Mutex(非阻塞版)確保同時間只有一個任務可以修改資料。注意:若在阻塞環境中使用 std::sync::Mutex,會導致執行緒被卡住,應改用 tokio::sync::Mutex。
範例 5:結合 async 與 reqwest 進行 HTTP 請求
use reqwest::Error;
use tokio::time::Instant;
async fn fetch_url(url: &str) -> Result<String, Error> {
// reqwest 的 `get` 會返回一個 Future,使用 `.await` 取得回應
let resp = reqwest::get(url).await?;
let body = resp.text().await?;
Ok(body)
}
#[tokio::main]
async fn main() {
let start = Instant::now();
// 同時抓取三個網站
let urls = [
"https://www.rust-lang.org",
"https://docs.rs",
"https://crates.io",
];
let mut tasks = Vec::new();
for &u in &urls {
tasks.push(tokio::spawn(fetch_url(u)));
}
for t in tasks {
match t.await {
Ok(Ok(content)) => println!("Fetched {} bytes", content.len()),
Ok(Err(e)) => eprintln!("Request error: {}", e),
Err(e) => eprintln!("Task panicked: {}", e),
}
}
println!("All done in {:.2?}", start.elapsed());
}
說明:reqwest 已經內建 async 支援,搭配 tokio 可以在同一執行緒上同時發送多個 HTTP 請求,大幅縮短總等待時間。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 建議的做法 |
|---|---|---|
忘記 .await |
直接呼叫 async fn 只會得到 Future,不會執行。 |
確保每個 Future 在需要結果時都有 .await,或使用 join!、spawn。 |
| 阻塞執行緒 | 在 async 環境裡使用 std::thread::sleep、std::fs::read_to_string 等阻塞 API。 |
改用 tokio::time::sleep、tokio::fs::read_to_string 等非阻塞版本。 |
| 共享資料未加鎖 | 多個任務同時讀寫同一變數會產生資料競爭。 | 使用 Arc<Mutex<T>>、Arc<RwLock<T>> 或訊息通道 (tokio::sync::mpsc)。 |
| 過度 spawn | 每次小任務都 spawn,會產生大量 JoinHandle,浪費資源。 |
小任務直接 .await,只有需要背景執行或跨執行緒時才 spawn。 |
!Send Future |
在多執行緒 runtime (如 tokio::multi_thread) 中,!Send 的 Future 只能在單執行緒上跑。 |
若要在多執行緒環境使用,確保所有 Future 都是 Send,或使用 #[tokio::main(flavor = "current_thread")]。 |
| 忘記錯誤處理 | .await 失敗會回傳 Result,未處理會導致 panic。 |
使用 match、? 或 anyhow 進行錯誤傳播與記錄。 |
最佳實踐
- 保持 async/await 的直線式邏輯:盡量讓程式碼看起來像同步流程,避免過度嵌套
match。 - 使用
tokio::select!處理多個競爭事件(如超時、取消)。 - 將阻塞程式碼包在
tokio::task::spawn_blocking,讓 runtime 自動把它搬到專門的阻塞執行緒池。 - 在公共函式庫中提供同步與非同步兩套 API,方便使用者根據需求選擇。
- 測試 async 程式:使用
#[tokio::test]或async-std::test,確保測試本身也是非同步的。
實際應用場景
| 場景 | 為什麼適合 async | 範例程式碼或概念 |
|---|---|---|
| Web 伺服器(REST API) | 同時處理成千上萬的 HTTP 請求,I/O 為主 | tokio::net::TcpListener + hyper |
| 資料庫連線池 | 多個查詢同時等待 DB 回應,避免建立過多執行緒 | sqlx、deadpool 提供 async pool |
| 即時聊天或推播服務 | 大量長連線 (WebSocket) 需要保持低延遲 | tokio-tungstenite + select! 處理訊息與心跳 |
| 檔案同步/備份工具 | 同時讀寫多個檔案、上傳至雲端 | tokio::fs + reqwest::Client::post |
| 嵌入式或 CLI 工具 | 限制資源的環境下仍需同時處理 I/O | smol + async-io,使用 no_std 時可考慮 embassy |
總結
非同步程式設計 是 Rust 在 安全、效能與可組合性 三方面的核心優勢之一。透過 async fn、await、以及功能完整的執行器(如 tokio),開發者可以在 單執行緒 上同時管理大量 I/O 任務,顯著降低記憶體與上下文切換成本。
本文從概念、語法、實作範例到常見陷阱與最佳實踐,提供了一條 從入門到實務 的完整學習路徑。只要遵守以下要點,就能在自己的專案中安全、有效地使用 async:
- 永遠使用非阻塞 API(
tokio::time::sleep、tokio::fs等)。 - 正確管理共享狀態(
Arc<Mutex<T>>、訊息通道)。 - 適時使用
spawn、join!、select!來達成併發與競爭。 - 確保 Future 為
Send,除非你明確選擇單執行緒 runtime。 - 寫測試、加錯誤處理,讓非同步程式同樣可靠。
掌握了這些基礎,你就能在 Web 服務、資料處理、即時系統 等各種場景中,發揮 Rust 非同步的最大威力。祝你寫程式快樂、寫出高效且安全的 Rust 應用!