LangChain 教學:Runnable – 高度模組化流程中的 RunnableParallel
簡介
在建構大型語言模型(LLM)應用時,流程的可組合性與 效能 常常是兩大挑戰。
LangChain 透過 Runnable 介面提供了「把任務切成小塊、再自由拼湊」的抽象,而 RunnableParallel 則是讓多個 Runnable 同時執行、把結果彙整回傳的利器。
使用 RunnableParallel,我們可以在同一時間向多個模型、工具或資料來源發送請求,顯著縮短回應時間,同時保持程式碼的可讀性與可維護性。對於需要 同時比對多個答案、同時呼叫多個 API、或是平行處理大量資料 的情境,RunnableParallel 是不可或缺的基礎建構塊。
本文將從概念說明、實作範例、常見陷阱與最佳實踐,逐步帶你掌握在 LangChainJS 中使用 RunnableParallel 的技巧,讓你的 LLM 應用更快、更彈性。
核心概念
1. Runnable 是什麼?
在 LangChain 中,Runnable 是一個「可呼叫」的物件,符合以下介面:
interface Runnable<I, O> {
invoke(input: I, options?: InvokeOptions): Promise<O>;
}
I:輸入資料型別O:輸出資料型別
所有 LangChain 的 Chain、Tool、LLM Wrapper 都實作了這個介面,讓它們可以像函式一樣被 await 呼叫。
2. 為什麼需要 Parallel?
單一 Runnable 雖然簡潔,但在以下情況會產生瓶頸:
| 情境 | 可能的瓶頸 |
|---|---|
| 同時查詢多個外部 API(如天氣、股價) | 網路延遲 會累加 |
| 多模型比較(GPT‑4 vs Claude) | 模型回應時間 逐一等待 |
| 大量文件向量搜尋(上千筆) | IO/CPU 佔用過高 |
若能 同時發送 這些請求,總耗時會接近最慢那一筆,而不是所有請求的總和。這正是 RunnableParallel 的核心價值。
3. RunnableParallel 的工作原理
RunnableParallel 接收一個 Runnable 陣列(或映射),在 invoke 時會:
- 同時 呼叫每個子 Runnable 的
invoke(使用Promise.all)。 - 等待所有子任務完成。
- 彙總 結果,回傳一個 物件(或陣列)讓使用者使用。
class RunnableParallel<I, O extends Record<string, any>> implements Runnable<I, O> {
constructor(private runnables: Record<string, Runnable<I, any>>) {}
async invoke(input: I, options?: InvokeOptions): Promise<O> {
const entries = Object.entries(this.runnables);
const promises = entries.map(([key, runnable]) =>
runnable.invoke(input, options).then((result) => ({ [key]: result }))
);
const resolved = await Promise.all(promises);
return Object.assign({}, ...resolved) as O;
}
}
注意:
RunnableParallel本身不會改變子 Runnable 的行為,只是負責 平行化調度 與 結果合併。
程式碼範例
以下示範 5 個實用範例,從最基礎到進階應用,皆以 LangChainJS(JavaScript/TypeScript)撰寫。
範例 1:平行呼叫兩個 LLM(OpenAI GPT‑3.5 與 Anthropic Claude)
import { ChatOpenAI } from "langchain/chat_models/openai";
import { ChatAnthropic } from "langchain/chat_models/anthropic";
import { RunnableParallel } from "langchain/schema/runnable";
// 建立兩個 LLM Runnable
const gpt = new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0 });
const claude = new ChatAnthropic({ modelName: "claude-v1", temperature: 0 });
// 包裝成 RunnableParallel
const parallelLLM = new RunnableParallel({
gpt,
claude,
});
// 輸入訊息
const prompt = { input: "請用繁體中文說明什麼是區塊鏈。" };
parallelLLM.invoke(prompt).then((result) => {
console.log("GPT 回應:", result.gpt);
console.log("Claude 回應:", result.claude);
});
重點:只要把 LLM 包裝成 Runnable,
RunnableParallel即可同時取得兩個模型的答案,省下逐一呼叫的時間。
範例 2:同時呼叫兩個外部 API(天氣與匯率)
import { Runnable } from "langchain/schema/runnable";
import fetch from "node-fetch";
class WeatherAPI extends Runnable {
async invoke({ location }) {
const res = await fetch(`https://api.weather.com/v3/weather/${location}`);
return res.json();
}
}
class ExchangeRateAPI extends Runnable {
async invoke({ base, target }) {
const res = await fetch(`https://api.exchangerate.host/latest?base=${base}&symbols=${target}`);
return res.json();
}
}
// 建立平行 Runnable
const parallelAPI = new RunnableParallel({
weather: new WeatherAPI(),
exchange: new ExchangeRateAPI(),
});
parallelAPI.invoke({ location: "Taipei", base: "USD", target: "TWD" })
.then((result) => {
console.log("天氣資料:", result.weather);
console.log("匯率資料:", result.exchange);
});
技巧:只要保證每個子 Runnable 接受相同的輸入型別(或在
invoke前自行轉換),就能輕鬆平行化。
範例 3:平行向多個向量資料庫搜尋(FAISS、Pinecone)
import { FAISS } from "langchain/vectorstores/faiss";
import { PineconeStore } from "langchain/vectorstores/pinecone";
import { RunnableParallel } from "langchain/schema/runnable";
const faiss = await FAISS.fromTexts(
["文件 A", "文件 B"],
[{ source: "local" }],
new OpenAIEmbeddings()
);
const pinecone = await PineconeStore.fromTexts(
["文件 C", "文件 D"],
[{ source: "cloud" }],
new OpenAIEmbeddings(),
{ indexName: "my-index" }
);
const parallelSearch = new RunnableParallel({
local: faiss.asRetriever(),
cloud: pinecone.asRetriever(),
});
parallelSearch.invoke({ query: "什麼是機器學習?", k: 3 })
.then((result) => {
console.log("FAISS 結果:", result.local);
console.log("Pinecone 結果:", result.cloud);
});
說明:
asRetriever()會返回一個符合 Runnable 介面的檢索器,讓向量搜尋也能加入平行流程。
範例 4:結合工具(Tool)與 LLM,平行執行多個工具
import { StructuredTool } from "langchain/tools";
import { RunnableParallel } from "langchain/schema/runnable";
class CalculatorTool extends StructuredTool {
name = "calculator";
description = "計算簡單的數學表達式";
async _call({ expression }) {
// 使用 eval 只做示範,實務請使用安全的 parser
return eval(expression);
}
}
class WikipediaTool extends StructuredTool {
name = "wikipedia";
description = "搜尋維基百科摘要";
async _call({ query }) {
const res = await fetch(`https://en.wikipedia.org/api/rest_v1/page/summary/${encodeURIComponent(query)}`);
const data = await res.json();
return data.extract;
}
}
// 把工具包成 Runnable
const calculator = new CalculatorTool();
const wikipedia = new WikipediaTool();
const parallelTools = new RunnableParallel({
calc: calculator,
wiki: wikipedia,
});
parallelTools.invoke({ expression: "2+3*4", query: "Artificial intelligence" })
.then((result) => {
console.log("計算結果:", result.calc);
console.log("維基摘要:", result.wiki);
});
實務提醒:工具的
_call必須回傳 Promise,否則無法正確被RunnableParallel包裝。
範例 5:自訂輸入前處理與結果後處理的組合(Chain + Parallel)
import { RunnableSequence } from "langchain/schema/runnable";
import { ChatOpenAI } from "langchain/chat_models/openai";
import { RunnableParallel } from "langchain/schema/runnable";
// 前置:把使用者問題拆成兩個子問題
const splitter = RunnableSequence.from([
async ({ question }) => ({
q1: `${question} 請簡短說明`,
q2: `${question} 請提供三個相關例子`,
})
]);
// 兩個 LLM 分別負責不同子問題
const shortAnswer = new ChatOpenAI({ temperature: 0 });
const examples = new ChatOpenAI({ temperature: 0.7 });
const parallelLLM = new RunnableParallel({
summary: shortAnswer,
examples: examples,
});
// 後置:把兩段回應合併成最終輸出
const merger = RunnableSequence.from([
async ({ summary, examples }) => ({
final: `簡短說明:${summary}\n\n相關例子:\n${examples}`
})
]);
// 完整流程
const chain = splitter
.pipe(parallelLLM)
.pipe(merger);
chain.invoke({ question: "什麼是區塊鏈?" })
.then((out) => console.log(out.final));
關鍵:
RunnableSequence讓我們可以在 平行前後 加入 前處理 與 後處理,形成完整的端到端工作流。
常見陷阱與最佳實踐
| 陷阱 | 說明 | 解決方案 |
|---|---|---|
| 子 Runnable 互相依賴 | 若兩個 Runnable 必須先後執行,直接放入 RunnableParallel 會產生錯誤結果。 |
使用 RunnableSequence 先做依賴處理,或把依賴的部分抽成單獨的 Runnable。 |
| 錯誤傳播 | 當任一子任務拋出例外,Promise.all 會立即 reject,其他任務的結果會被捨棄。 |
使用 Promise.allSettled 包裝 RunnableParallel,或在子 Runnable 內捕獲錯誤並回傳預設值。 |
| 資源競爭 | 同時呼叫大量 LLM 可能觸發 API 配額上限或網路瓶頸。 | 控制平行度(如 p-limit)或在 invoke 前加入 速率限制(RateLimiter)。 |
| 輸入不一致 | 子 Runnable 接受的參數結構不同,導致執行時缺少欄位。 | 在 RunnableParallel 前加入 Adapter Runnable,統一輸入格式。 |
| 結果命名衝突 | 若子 Runnable 回傳的物件屬性名稱相同,最終合併時會被覆蓋。 | 為每個 Runnable 指定唯一的 key(如 summary, details),或在子 Runnable 中自行命名空間。 |
最佳實踐
- 保持 Stateless:讓每個 Runnable 只依賴輸入參數,避免全域狀態造成競爭條件。
- 使用型別(TypeScript):明確宣告
RunnableParallel的輸入/輸出型別,編譯階段即可捕捉不相容的錯誤。 - 錯誤容忍:在生產環境建議使用
Promise.allSettled包裝,並在結果中檢查status === "rejected"。 - 資源管理:對於需要大量計算或 I/O 的子任務,可使用 Worker Threads 或 Cluster 進一步水平擴展。
- 日誌與監控:將每個子 Runnable 的開始、結束與耗時寫入日誌,方便後續性能分析。
實際應用場景
| 場景 | 為何需要 RunnableParallel | 實作概念 |
|---|---|---|
| 多模型答案比對 | 同時取得不同 LLM 的回應,快速決策最佳答案 | 把多個 LLM 包成 Parallel,根據置信度或投票機制挑選 |
| 跨服務資料聚合(天氣 + 交通 + 事件) | 使用者查詢「今天天氣與交通狀況」,需要同時呼叫 3 個 API | 每個 API 封裝成 Runnable,Parallel 後合併成單一回覆 |
| 向量搜尋 + 文字搜尋 | 同時在本地向量庫與遠端全文檢索引擎查找相關文件 | 把兩個檢索器包成 Parallel,最後在 LLM 中做摘要合成 |
| 即時金融分析 | 同時抓取股價、匯率、新聞情緒,快速生成投資報告 | 各資訊源各自 Runnable,Parallel 提升報告產出速度 |
| 大型問答系統 | 使用子模型分工(專業領域、通用領域)同時產生答案,再由主模型進行整合 | 子模型作為 Parallel,主模型作為後置 Merger |
總結
- RunnableParallel 是 LangChain 中實現 高效平行化 的核心工具,讓多個 Runnable 同時執行、結果自動彙整。
- 透過 RunnableSequence、Adapter 與 錯誤容忍 的技巧,我們可以在不犧牲可讀性的前提下,構建 高度模組化、可擴充 的 LLM 工作流。
- 在實務上,從 多模型比對、跨服務聚合 到 向量搜尋與文字搜尋混合,RunnableParallel 都能大幅縮短回應時間並提升使用者體驗。
- 記得遵守 資源管理、錯誤處理 與 型別安全 的最佳實踐,讓你的應用在生產環境中更穩定、更易維護。
掌握了 RunnableParallel,未來在 LangChain 的開發旅程中,你將能更自由地組合工具、模型與資料來源,打造出 即時、彈性、可靠 的 AI 應用。祝開發順利,期待看到你用 Parallel 產出的精彩作品! 🚀