本文 AI 產出,尚未審核

LangChain 教學:Runnable – 高度模組化流程中的 RunnableParallel


簡介

在建構大型語言模型(LLM)應用時,流程的可組合性效能 常常是兩大挑戰。
LangChain 透過 Runnable 介面提供了「把任務切成小塊、再自由拼湊」的抽象,而 RunnableParallel 則是讓多個 Runnable 同時執行、把結果彙整回傳的利器。

使用 RunnableParallel,我們可以在同一時間向多個模型、工具或資料來源發送請求,顯著縮短回應時間,同時保持程式碼的可讀性與可維護性。對於需要 同時比對多個答案、同時呼叫多個 API、或是平行處理大量資料 的情境,RunnableParallel 是不可或缺的基礎建構塊。

本文將從概念說明、實作範例、常見陷阱與最佳實踐,逐步帶你掌握在 LangChainJS 中使用 RunnableParallel 的技巧,讓你的 LLM 應用更快、更彈性。


核心概念

1. Runnable 是什麼?

在 LangChain 中,Runnable 是一個「可呼叫」的物件,符合以下介面:

interface Runnable<I, O> {
  invoke(input: I, options?: InvokeOptions): Promise<O>;
}
  • I:輸入資料型別
  • O:輸出資料型別

所有 LangChain 的 Chain、Tool、LLM Wrapper 都實作了這個介面,讓它們可以像函式一樣被 await 呼叫。

2. 為什麼需要 Parallel?

單一 Runnable 雖然簡潔,但在以下情況會產生瓶頸:

情境 可能的瓶頸
同時查詢多個外部 API(如天氣、股價) 網路延遲 會累加
多模型比較(GPT‑4 vs Claude) 模型回應時間 逐一等待
大量文件向量搜尋(上千筆) IO/CPU 佔用過高

若能 同時發送 這些請求,總耗時會接近最慢那一筆,而不是所有請求的總和。這正是 RunnableParallel 的核心價值。

3. RunnableParallel 的工作原理

RunnableParallel 接收一個 Runnable 陣列(或映射),在 invoke 時會:

  1. 同時 呼叫每個子 Runnable 的 invoke(使用 Promise.all)。
  2. 等待所有子任務完成。
  3. 彙總 結果,回傳一個 物件(或陣列)讓使用者使用。
class RunnableParallel<I, O extends Record<string, any>> implements Runnable<I, O> {
  constructor(private runnables: Record<string, Runnable<I, any>>) {}
  async invoke(input: I, options?: InvokeOptions): Promise<O> {
    const entries = Object.entries(this.runnables);
    const promises = entries.map(([key, runnable]) =>
      runnable.invoke(input, options).then((result) => ({ [key]: result }))
    );
    const resolved = await Promise.all(promises);
    return Object.assign({}, ...resolved) as O;
  }
}

注意RunnableParallel 本身不會改變子 Runnable 的行為,只是負責 平行化調度結果合併


程式碼範例

以下示範 5 個實用範例,從最基礎到進階應用,皆以 LangChainJS(JavaScript/TypeScript)撰寫。

範例 1:平行呼叫兩個 LLM(OpenAI GPT‑3.5 與 Anthropic Claude)

import { ChatOpenAI } from "langchain/chat_models/openai";
import { ChatAnthropic } from "langchain/chat_models/anthropic";
import { RunnableParallel } from "langchain/schema/runnable";

// 建立兩個 LLM Runnable
const gpt = new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0 });
const claude = new ChatAnthropic({ modelName: "claude-v1", temperature: 0 });

// 包裝成 RunnableParallel
const parallelLLM = new RunnableParallel({
  gpt,
  claude,
});

// 輸入訊息
const prompt = { input: "請用繁體中文說明什麼是區塊鏈。" };

parallelLLM.invoke(prompt).then((result) => {
  console.log("GPT 回應:", result.gpt);
  console.log("Claude 回應:", result.claude);
});

重點:只要把 LLM 包裝成 Runnable,RunnableParallel 即可同時取得兩個模型的答案,省下逐一呼叫的時間。


範例 2:同時呼叫兩個外部 API(天氣與匯率)

import { Runnable } from "langchain/schema/runnable";
import fetch from "node-fetch";

class WeatherAPI extends Runnable {
  async invoke({ location }) {
    const res = await fetch(`https://api.weather.com/v3/weather/${location}`);
    return res.json();
  }
}

class ExchangeRateAPI extends Runnable {
  async invoke({ base, target }) {
    const res = await fetch(`https://api.exchangerate.host/latest?base=${base}&symbols=${target}`);
    return res.json();
  }
}

// 建立平行 Runnable
const parallelAPI = new RunnableParallel({
  weather: new WeatherAPI(),
  exchange: new ExchangeRateAPI(),
});

parallelAPI.invoke({ location: "Taipei", base: "USD", target: "TWD" })
  .then((result) => {
    console.log("天氣資料:", result.weather);
    console.log("匯率資料:", result.exchange);
  });

技巧:只要保證每個子 Runnable 接受相同的輸入型別(或在 invoke 前自行轉換),就能輕鬆平行化。


範例 3:平行向多個向量資料庫搜尋(FAISS、Pinecone)

import { FAISS } from "langchain/vectorstores/faiss";
import { PineconeStore } from "langchain/vectorstores/pinecone";
import { RunnableParallel } from "langchain/schema/runnable";

const faiss = await FAISS.fromTexts(
  ["文件 A", "文件 B"],
  [{ source: "local" }],
  new OpenAIEmbeddings()
);

const pinecone = await PineconeStore.fromTexts(
  ["文件 C", "文件 D"],
  [{ source: "cloud" }],
  new OpenAIEmbeddings(),
  { indexName: "my-index" }
);

const parallelSearch = new RunnableParallel({
  local: faiss.asRetriever(),
  cloud: pinecone.asRetriever(),
});

parallelSearch.invoke({ query: "什麼是機器學習?", k: 3 })
  .then((result) => {
    console.log("FAISS 結果:", result.local);
    console.log("Pinecone 結果:", result.cloud);
  });

說明asRetriever() 會返回一個符合 Runnable 介面的檢索器,讓向量搜尋也能加入平行流程。


範例 4:結合工具(Tool)與 LLM,平行執行多個工具

import { StructuredTool } from "langchain/tools";
import { RunnableParallel } from "langchain/schema/runnable";

class CalculatorTool extends StructuredTool {
  name = "calculator";
  description = "計算簡單的數學表達式";

  async _call({ expression }) {
    // 使用 eval 只做示範,實務請使用安全的 parser
    return eval(expression);
  }
}

class WikipediaTool extends StructuredTool {
  name = "wikipedia";
  description = "搜尋維基百科摘要";

  async _call({ query }) {
    const res = await fetch(`https://en.wikipedia.org/api/rest_v1/page/summary/${encodeURIComponent(query)}`);
    const data = await res.json();
    return data.extract;
  }
}

// 把工具包成 Runnable
const calculator = new CalculatorTool();
const wikipedia = new WikipediaTool();

const parallelTools = new RunnableParallel({
  calc: calculator,
  wiki: wikipedia,
});

parallelTools.invoke({ expression: "2+3*4", query: "Artificial intelligence" })
  .then((result) => {
    console.log("計算結果:", result.calc);
    console.log("維基摘要:", result.wiki);
  });

實務提醒:工具的 _call 必須回傳 Promise,否則無法正確被 RunnableParallel 包裝。


範例 5:自訂輸入前處理與結果後處理的組合(Chain + Parallel)

import { RunnableSequence } from "langchain/schema/runnable";
import { ChatOpenAI } from "langchain/chat_models/openai";
import { RunnableParallel } from "langchain/schema/runnable";

// 前置:把使用者問題拆成兩個子問題
const splitter = RunnableSequence.from([
  async ({ question }) => ({
    q1: `${question} 請簡短說明`,
    q2: `${question} 請提供三個相關例子`,
  })
]);

// 兩個 LLM 分別負責不同子問題
const shortAnswer = new ChatOpenAI({ temperature: 0 });
const examples = new ChatOpenAI({ temperature: 0.7 });

const parallelLLM = new RunnableParallel({
  summary: shortAnswer,
  examples: examples,
});

// 後置:把兩段回應合併成最終輸出
const merger = RunnableSequence.from([
  async ({ summary, examples }) => ({
    final: `簡短說明:${summary}\n\n相關例子:\n${examples}`
  })
]);

// 完整流程
const chain = splitter
  .pipe(parallelLLM)
  .pipe(merger);

chain.invoke({ question: "什麼是區塊鏈?" })
  .then((out) => console.log(out.final));

關鍵RunnableSequence 讓我們可以在 平行前後 加入 前處理後處理,形成完整的端到端工作流。


常見陷阱與最佳實踐

陷阱 說明 解決方案
子 Runnable 互相依賴 若兩個 Runnable 必須先後執行,直接放入 RunnableParallel 會產生錯誤結果。 使用 RunnableSequence 先做依賴處理,或把依賴的部分抽成單獨的 Runnable。
錯誤傳播 當任一子任務拋出例外,Promise.all 會立即 reject,其他任務的結果會被捨棄。 使用 Promise.allSettled 包裝 RunnableParallel,或在子 Runnable 內捕獲錯誤並回傳預設值。
資源競爭 同時呼叫大量 LLM 可能觸發 API 配額上限或網路瓶頸。 控制平行度(如 p-limit)或在 invoke 前加入 速率限制(RateLimiter)。
輸入不一致 子 Runnable 接受的參數結構不同,導致執行時缺少欄位。 RunnableParallel 前加入 Adapter Runnable,統一輸入格式。
結果命名衝突 若子 Runnable 回傳的物件屬性名稱相同,最終合併時會被覆蓋。 為每個 Runnable 指定唯一的 key(如 summary, details),或在子 Runnable 中自行命名空間。

最佳實踐

  1. 保持 Stateless:讓每個 Runnable 只依賴輸入參數,避免全域狀態造成競爭條件。
  2. 使用型別(TypeScript):明確宣告 RunnableParallel 的輸入/輸出型別,編譯階段即可捕捉不相容的錯誤。
  3. 錯誤容忍:在生產環境建議使用 Promise.allSettled 包裝,並在結果中檢查 status === "rejected"
  4. 資源管理:對於需要大量計算或 I/O 的子任務,可使用 Worker ThreadsCluster 進一步水平擴展。
  5. 日誌與監控:將每個子 Runnable 的開始、結束與耗時寫入日誌,方便後續性能分析。

實際應用場景

場景 為何需要 RunnableParallel 實作概念
多模型答案比對 同時取得不同 LLM 的回應,快速決策最佳答案 把多個 LLM 包成 Parallel,根據置信度或投票機制挑選
跨服務資料聚合(天氣 + 交通 + 事件) 使用者查詢「今天天氣與交通狀況」,需要同時呼叫 3 個 API 每個 API 封裝成 Runnable,Parallel 後合併成單一回覆
向量搜尋 + 文字搜尋 同時在本地向量庫與遠端全文檢索引擎查找相關文件 把兩個檢索器包成 Parallel,最後在 LLM 中做摘要合成
即時金融分析 同時抓取股價、匯率、新聞情緒,快速生成投資報告 各資訊源各自 Runnable,Parallel 提升報告產出速度
大型問答系統 使用子模型分工(專業領域、通用領域)同時產生答案,再由主模型進行整合 子模型作為 Parallel,主模型作為後置 Merger

總結

  • RunnableParallel 是 LangChain 中實現 高效平行化 的核心工具,讓多個 Runnable 同時執行、結果自動彙整。
  • 透過 RunnableSequenceAdapter錯誤容忍 的技巧,我們可以在不犧牲可讀性的前提下,構建 高度模組化可擴充 的 LLM 工作流。
  • 在實務上,從 多模型比對跨服務聚合向量搜尋與文字搜尋混合,RunnableParallel 都能大幅縮短回應時間並提升使用者體驗。
  • 記得遵守 資源管理錯誤處理型別安全 的最佳實踐,讓你的應用在生產環境中更穩定、更易維護。

掌握了 RunnableParallel,未來在 LangChain 的開發旅程中,你將能更自由地組合工具、模型與資料來源,打造出 即時、彈性、可靠 的 AI 應用。祝開發順利,期待看到你用 Parallel 產出的精彩作品! 🚀