LangChain 部署與應用整合 – WebSocket 即時推流整合
簡介
在聊天機器人、文件問答或程式碼輔助等應用中,即時回應往往是使用者體驗的關鍵。傳統的 HTTP 請求需要等到後端完整產出結果才回傳,會產生明顯的延遲感;而 WebSocket 則提供了雙向、持久的連線,使伺服器能在生成答案的同時逐段 推流 給前端,使用者即可看到「打字」的效果,感受到「即時」的互動。
LangChain 作為 LLM(大型語言模型)應用的組件化框架,已支援 Streaming(串流)回傳。將 LangChain 的串流功能與 WebSocket 整合,不僅能提升回應速度,還能在多使用者環境下保持高併發、低資源消耗。本篇文章將從概念說明、實作範例、常見陷阱與最佳實踐,帶你一步一步完成 WebSocket 即時推流整合,讓你的 LangChain 應用即刻變得更具互動性。
核心概念
1. 為什麼使用 WebSocket?
- 雙向即時通訊:伺服器可隨時向客戶端推送訊息,客戶端亦能即時發送指令。
- 持久連線:一次握手後維持連線,避免每次請求都要重新建立 TCP 連線的開銷。
- 低延遲:訊息以幀(frame)形式傳遞,適合 LLM 逐字/逐句產生的場景。
2. LangChain 的 Streaming 機制
在 LangChain(以 JavaScript/TypeScript 版為例)中,ChatOpenAI、LLMChain 等模型支援 streaming 參數,回傳一個 AsyncIterable,開發者可以逐段讀取文字。典型的使用方式如下:
const { ChatOpenAI } = require("langchain/chat_models/openai");
// 啟用 streaming
const llm = new ChatOpenAI({
temperature: 0,
streaming: true, // <─ 重要
});
當 streaming 為 true 時,llm.call(messages) 會返回一個 AsyncIterable<string>,每次 yield 的字串即為模型產出的新片段。
3. WebSocket Server 與 LangChain 串流的橋接
整合的核心流程:
- 客戶端 透過 WebSocket 送出對話訊息(例如
{"type":"prompt","text":"…"}")。 - 伺服器 接收到訊息後,呼叫 LangChain 串流模型,取得
AsyncIterable。 - 伺服器 逐段 讀取串流,並立即使用
ws.send()推送給客戶端。 - 客戶端收到每段文字後即時顯示,完成「打字」效果。
下面的章節會提供完整的程式碼範例。
程式碼範例
範例 1️⃣:建立最簡單的 WebSocket 伺服器
使用
ws套件(Node.js 官方推薦的 WebSocket 實作)。
// server.js
const WebSocket = require("ws");
// 監聽 8080 埠口
const wss = new WebSocket.Server({ port: 8080 });
wss.on("connection", (ws) => {
console.log("✅ 客戶端已連線");
ws.on("message", (data) => {
// 直接回 echo,稍後會改為呼叫 LangChain
ws.send(`Echo: ${data}`);
});
ws.on("close", () => console.log("🔌 連線關閉"));
});
console.log("WebSocket Server listening on ws://localhost:8080");
說明:此段程式碼只負責建立 WebSocket 連線,後續會在
message事件中加入 LangChain 串流邏輯。
範例 2️⃣:整合 LangChain 的 Streaming 功能
// llmStream.js
const { ChatOpenAI } = require("langchain/chat_models/openai");
// 建立支援 streaming 的 LLM 實例
const llm = new ChatOpenAI({
temperature: 0,
streaming: true, // 必須開啟
openAIApiKey: process.env.OPENAI_API_KEY,
});
/**
* 取得模型的 AsyncIterable 串流
* @param {string} userPrompt 使用者輸入的問題
* @returns {AsyncIterable<string>}
*/
async function* getStreamingResponse(userPrompt) {
const messages = [{ role: "user", content: userPrompt }];
// LangChain 會回傳 AsyncIterable,每次 yield 一段文字
for await (const chunk of llm.call(messages)) {
// chunk 可能是完整句子,也可能是單字,視模型回傳頻率而定
yield chunk;
}
}
module.exports = { getStreamingResponse };
重點:
getStreamingResponse以 async generator 的形式包裝,讓呼叫端可以使用for await...of迭代每個文字片段。
範例 3️⃣:在 WebSocket 伺服器中推送串流結果
// serverWithStream.js
const WebSocket = require("ws");
const { getStreamingResponse } = require("./llmStream");
const wss = new WebSocket.Server({ port: 8080 });
wss.on("connection", (ws) => {
console.log("✅ 客戶端已連線");
ws.on("message", async (data) => {
const msg = JSON.parse(data);
if (msg.type !== "prompt") return;
const userPrompt = msg.text;
console.log(`📥 收到提示: ${userPrompt}`);
// 立即回傳一個開始訊號,讓前端顯示 loading
ws.send(JSON.stringify({ type: "start" }));
// 逐段讀取 LLM 串流,並推送給前端
for await (const chunk of getStreamingResponse(userPrompt)) {
ws.send(JSON.stringify({ type: "delta", text: chunk }));
}
// 串流結束,通知前端
ws.send(JSON.stringify({ type: "end" }));
});
ws.on("close", () => console.log("🔌 連線關閉"));
});
console.log("WebSocket + LangChain streaming listening on ws://localhost:8080");
說明
- 前端須根據
type欄位判斷訊息種類:start、delta、end。- 使用
JSON.stringify包裝訊息,可避免文字中斷行或特殊字元影響協議。for await...of讓我們能在 非阻塞 的情況下逐段發送,伺服器仍可同時處理其他連線。
範例 4️⃣:簡易的前端 WebSocket 客戶端(HTML + JavaScript)
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<title>LangChain WebSocket Demo</title>
<style>
#output { white-space: pre-wrap; font-family: monospace; }
#loading { color: #888; }
</style>
</head>
<body>
<h2>Chat with LLM (WebSocket Streaming)</h2>
<textarea id="prompt" rows="3" cols="60" placeholder="輸入問題…"></textarea><br>
<button id="sendBtn">送出</button>
<div id="loading" hidden>⏳ 回答產生中…</div>
<div id="output"></div>
<script>
const ws = new WebSocket("ws://localhost:8080");
const output = document.getElementById("output");
const loading = document.getElementById("loading");
const sendBtn = document.getElementById("sendBtn");
const prompt = document.getElementById("prompt");
ws.onopen = () => console.log("✅ WebSocket 已連線");
ws.onclose = () => console.log("🔌 WebSocket 已關閉");
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
switch (msg.type) {
case "start":
output.textContent = ""; // 清空舊內容
loading.hidden = false;
break;
case "delta":
output.textContent += msg.text; // 逐段累加
break;
case "end":
loading.hidden = true;
break;
}
};
sendBtn.onclick = () => {
const text = prompt.value.trim();
if (!text) return;
ws.send(JSON.stringify({ type: "prompt", text }));
};
</script>
</body>
</html>
重點:前端僅需根據
type來控制 UI,不必等待完整回傳,即可呈現「打字」動畫,提升使用者黏著度。
範例 5️⃣:在多使用者環境下的資源管理(簡易示範)
在高併發情境下,每條連線都會觸發一次 LLM 呼叫,若不加限制可能耗盡 OpenAI 配額或造成服務不穩。以下示範 Semaphore(信號量)來限制同時執行的模型數量:
// semaphore.js
class Semaphore {
constructor(max) {
this.max = max;
this.current = 0;
this.queue = [];
}
async acquire() {
if (this.current < this.max) {
this.current++;
return;
}
return new Promise((resolve) => this.queue.push(resolve));
}
release() {
this.current--;
if (this.queue.length > 0) {
this.current++;
const resolve = this.queue.shift();
resolve();
}
}
}
// 在 serverWithStream.js 中使用
const semaphore = new Semaphore(3); // 同時最多三個請求
ws.on("message", async (data) => {
const msg = JSON.parse(data);
if (msg.type !== "prompt") return;
await semaphore.acquire(); // 取得執行權
try {
// 這裡放前面的串流邏輯
// ...
} finally {
semaphore.release(); // 必定釋放
}
});
說明:此機制能防止突發流量一次性呼叫過多 LLM,避免 Rate‑limit 或 計費過高 的問題。
常見陷阱與最佳實踐
| 陷阱 | 可能的後果 | 解決方案 / Best Practice |
|---|---|---|
未設定 streaming: true |
LLM 仍以一次性回傳,前端無法即時顯示 | 確認 ChatOpenAI(或其他模型)建構子中 streaming 為 true |
直接 ws.send(string) 送出未編碼的文字 |
若文字中含有換行或控制字元,WebSocket 可能斷帧 | JSON 包裝訊息,或使用 TextEncoder |
過度頻繁的 ws.send |
網路封包過多,導致延遲或瀏覽器卡頓 | 批次合併:若模型每 20ms 產出一次,可自行緩衝 2~3 個片段再一次發送 |
忘記在 end 時關閉或清理資源 |
記憶體泄漏、舊的 generator 持續佔用 CPU | 在 for await...of 完成後,明確呼叫 ws.send({type:'end'}),必要時 ws.terminate() |
| 同時過多 LLM 呼叫 | 超過服務商的速率限制(Rate‑limit)或產生成本爆炸 | 使用 Semaphore、Queue 或 Rate Limiter,並在 UI 上顯示「排隊中」 |
| 錯誤未捕捉 | 例外拋出導致 WebSocket 斷線,使用者無法得到回覆 | 在 for await...of 包裹 try/catch,錯誤時發送 {type:'error', message: err.message} 給前端 |
其他實務建議
- 心跳機制(ping/pong)
- 定時
ws.ping()保持連線,避免因 NAT/防火牆閒置斷線。
- 定時
- TLS 加密
- 生產環境務必使用
wss://(HTTPS + WebSocket),保護 API 金鑰與使用者資料。
- 生產環境務必使用
- 訊息大小限制
- OpenAI 回傳的單個 token 可能很短,累積後再發送可減少封包開銷。
- 日誌與監控
- 記錄
prompt、response length、stream duration,搭配 Grafana/Prometheus 監控延遲。
- 記錄
- 前端 UI 優化
- 使用
requestAnimationFrame或setTimeout讓文字顯示更平滑,避免一次性大量 DOM 更新。
- 使用
實際應用場景
| 場景 | 為何需要即時推流 | 整合方式概述 |
|---|---|---|
| 客服聊天機器人 | 使用者期待即時回應,避免「長時間沉默」的焦慮感 | 前端顯示「打字中」動畫,後端使用 LangChain 產生答案並逐段推送 |
| 即時文件問答(如 PDF、PPT) | 大型文件解析需較長時間,逐段回傳可讓使用者先看到關鍵摘要 | 先用 Retriever 抓取相關段落,再用 LLM 串流生成答案 |
| 程式碼補全 / IDE 插件 | 開發者需要即時看到建議,才能快速決策 | 在 VSCode 插件中透過 WebSocket 與本機 LangChain 服務通訊,實現「打字」式補全 |
| 多語言翻譯即時字幕 | 直播或會議需要即時顯示翻譯結果 | 把語音文字先送至 LLM,使用 streaming 產生翻譯,WebSocket 推送至前端字幕層 |
| 遊戲 NPC 對話 | 交互式劇情需要自然流暢的對話體驗 | 遊戲客戶端維持 WebSocket,LLM 產生對話並即時推送,提升沉浸感 |
總結
透過 WebSocket 與 LangChain Streaming 的結合,我們可以將大型語言模型的強大能力以 即時、雙向 的方式呈現在使用者面前。本文從概念說明、完整程式碼範例、常見陷阱與最佳實踐,最後列出多種實務應用情境,提供了一套 從零到上線 的完整藍圖。
- 核心要點:
- 開啟 streaming (
streaming: true) 才能取得AsyncIterable。 - 使用 WebSocket 的
send推送 JSON 包裝 的逐段文字。 - 加入 Semaphore 或 Rate Limiter 防止過度呼叫,保護資源與成本。
- 前端以
type判斷訊息類型,配合 loading / error UI,打造流暢的打字體驗。
- 開啟 streaming (
只要遵循上述步驟,你的 LangChain 應用就能在 即時性、可擴展性 與 使用者體驗 上取得顯著提升,為未來的 AI 服務奠定堅實基礎。祝開發順利,期待看到你在實務中發揮創意! 🚀