LangChain 進階主題:與 Workflow 引擎整合(Airflow、Temporal)
簡介
在實務專案中,LLM(大型語言模型) 常常不只是單一的問答或文字生成,而是需要被串接進更大型的資料管線或業務流程中。單純在 Jupyter Notebook 或簡易腳本裡呼叫 LangChain 雖然方便,但面對 排程、重試、依賴管理、分散式執行 等需求時,會迅速遇到瓶頸。這時,將 LangChain 的 chain、agent 與成熟的 Workflow 引擎(如 Apache Airflow、Temporal)結合,就能讓 AI 工作流具備 可觀測、可重試、可擴展 的特性,真正落地到企業級應用。
本文將說明如何把 LangChain 嵌入 Airflow DAG 與 Temporal 工作流,提供完整的概念說明、實作範例、常見陷阱與最佳實踐,讓你能快速在自己的專案中建立可靠的 AI pipeline。
核心概念
1. 為什麼要結合 Workflow 引擎?
| 項目 | 只使用 LangChain | 結合 Workflow 引擎 |
|---|---|---|
| 排程 | 手動或自訂腳本 | 依賴 Cron / Scheduler |
| 容錯 | 需要自行撰寫 try/except | 自動重試、失敗告警 |
| 依賴管理 | 難以追蹤多個 chain 間的前後關係 | DAG / 有向圖自動管理 |
| 可觀測 | 少量日誌 | 完整 UI、Metrics、Tracing |
| 彈性伸縮 | 受限於單機資源 | 分散式 worker、併發控制 |
結論:若你的 LangChain 應用需要 定時執行、跨服務協調或高可用,Workflow 引擎是不可或缺的基礎建設。
2. Airflow 與 Temporal 的差異
| 特性 | Apache Airflow | Temporal |
|---|---|---|
| 執行模型 | DAG(有向無環圖)+ Scheduler | 活動(Activity)+ 工作流(Workflow) |
| 語言支援 | Python 為主 | 多語言(Go、Java、Python、Node.js) |
| 容錯機制 | 任務失敗即重試,可設定依賴 | 工作流狀態持久化,支援長時間執行 |
| 可見性 | Web UI 顯示 DAG 走向與執行紀錄 | Web UI(Temporal UI)提供時間線與事件追蹤 |
| 適用情境 | 批次 ETL、定時報表 | 複雜業務流程、長期協調(如訂單處理) |
選擇指引:若你已在使用 Airflow 作為 ETL 平台,直接在 DAG 中加入 LangChain 任務最省事;若需要 長時間執行、跨服務事務,Temporal 的工作流模型更適合。
3. LangChain 與 Workflow 的整合點
- Chain / Agent 作為單一任務
- 把完整的
Chain包裝成 AirflowPythonOperator或 TemporalActivity。
- 把完整的
- 多階段工作流
- 先用
LLMChain產生 prompt,再用Tool呼叫外部 API,最後把結果寫入資料庫。每一步都可以是獨立的 workflow 任務。
- 先用
- 狀態持久化
- Temporal 自動把工作流狀態寫入資料庫,讓長時間的 LLM 呼叫(如 30 分鐘的文件摘要)不會因為服務重啟而遺失。
- 重試與超時
- Airflow 可設定
retries、retry_delay;Temporal 可以在ActivityOptions中設定retry_policy、schedule_to_close_timeout。
- Airflow 可設定
程式碼範例
以下示範 Python(LangChain、Airflow、Temporal) 的實作,並以 註解 說明每一步的目的。若你偏好 Node.js,只要把 python 片段換成 javascript,概念相同。
範例 1:在 Airflow DAG 中執行一個簡單的 LLMChain
# file: dags/llm_chain_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# LangChain 必要套件
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
default_args = {
"owner": "ai-team",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
def run_llm_chain(**context):
"""Airflow 任務:呼叫 OpenAI 完成文字摘要"""
# 1. 建立 Prompt
template = "請將以下文章摘要成 100 個字以內:\n{content}"
prompt = PromptTemplate.from_template(template)
# 2. 建立 LLM 與 Chain
llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.2)
chain = LLMChain(llm=llm, prompt=prompt)
# 3. 讀取來自前置任務的輸入(此處直接寫死示範)
article = """(此處放入長篇文章)"""
result = chain.run(content=article)
# 4. 把結果寫入 XCom,供後續任務使用
context["ti"].xcom_push(key="summary", value=result)
with DAG(
dag_id="llm_summary_workflow",
default_args=default_args,
schedule_interval="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
summarize = PythonOperator(
task_id="summarize_article",
python_callable=run_llm_chain,
provide_context=True,
)
重點:
PythonOperator只是一個 包裝器,把 LangChain 的LLMChain當成普通函式呼叫,讓 Airflow 仍負責排程、重試與日誌。
範例 2:使用 Airflow + LangChain + 外部工具(搜尋 API)
# file: dags/search_and_answer.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
def search_api(query: str) -> str:
"""簡易搜尋 API,回傳前 3 筆結果的摘要"""
resp = requests.get(
"https://api.example.com/search",
params={"q": query, "limit": 3},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
# 假設每筆都有 'title' 與 'snippet'
return "\n".join([f"{d['title']}: {d['snippet']}" for d in data["results"]])
def answer_with_context(**context):
# 1. 取得題目(此例硬編碼)
question = "什麼是量子電腦的主要挑戰?"
# 2. 先呼叫搜尋 API,取得相關資訊
knowledge = search_api(question)
# 3. 組合 Prompt
template = """
你是一位 AI 助手,請根據以下搜尋結果,針對問題給出簡潔且正確的答案。
----
{knowledge}
----
問題:{question}
"""
prompt = PromptTemplate.from_template(template)
# 4. 呼叫 LLM
llm = OpenAI(model_name="gpt-4", temperature=0.0)
chain = LLMChain(llm=llm, prompt=prompt)
answer = chain.run(question=question, knowledge=knowledge)
# 5. 輸出結果至 XCom
context["ti"].xcom_push(key="answer", value=answer)
with DAG(
dag_id="search_and_answer",
default_args={"owner": "ai-team", "retries": 3, "retry_delay": timedelta(minutes=2)},
schedule_interval=None,
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
answer_task = PythonOperator(
task_id="generate_answer",
python_callable=answer_with_context,
provide_context=True,
)
技巧:將外部 API 包裝成 純函式,讓它可以被 Airflow 的
PythonOperator重用;同時把搜尋結果作為 Prompt 的一部份,提升答案正確性。
範例 3:在 Temporal 中建立 LLM 工作流(Python SDK)
# file: temporal_llm_workflow.py
import asyncio
from temporalio import workflow, activity
from temporalio.client import Client
# ---------- Activity ----------
@activity.defn
async def summarize_text(content: str) -> str:
"""使用 LangChain 完成文字摘要的 Activity"""
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
template = "請把以下內容濃縮成 150 個字以內:\n{content}"
prompt = PromptTemplate.from_template(template)
llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.1)
chain = LLMChain(llm=llm, prompt=prompt)
return await asyncio.to_thread(chain.run, content=content)
# ---------- Workflow ----------
@workflow.defn
class SummarizeWorkflow:
@workflow.run
async def run(self, article: str) -> str:
# 設定 Activity 選項(重試、逾時)
options = activity.ActivityOptions(
start_to_close_timeout=timedelta(seconds=30),
retry_policy=activity.RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=5),
backoff_coefficient=2.0,
),
)
# 呼叫 Activity
summary = await workflow.execute_activity(
summarize_text,
article,
schedule_to_close_timeout=timedelta(seconds=60),
retry_policy=options.retry_policy,
)
return summary
# ---------- 客戶端啟動 ----------
async def main():
client = await Client.connect("localhost:7233")
handle = await client.start_workflow(
SummarizeWorkflow.run,
"(此處放長篇文章)",
id="summarize-workflow-001",
task_queue="llm-task-queue",
)
result = await handle.result()
print("摘要結果:", result)
if __name__ == "__main__":
asyncio.run(main())
亮點
- Activity 內部使用
asyncio.to_thread把同步的 LangChain 呼叫搬到執行緒池,避免阻塞 Temporal Worker。- RetryPolicy 讓 LLM 呼叫失敗時自動重試,避免因網路抖動導致工作流失敗。
- 狀態持久化:即使
summarize_text需要 2 分鐘完成,Temporal 仍能可靠追蹤。
範例 4:結合 Temporal 與多階段 Agent(工具呼叫 + LLM)
# file: temporal_agent_workflow.py
import asyncio
from temporalio import workflow, activity
from datetime import timedelta
# ---------- Activity:呼叫外部工具 ----------
@activity.defn
async def fetch_stock_price(symbol: str) -> float:
# 假設有一個即時股價 API
import httpx
resp = await httpx.get(f"https://api.example.com/price/{symbol}")
resp.raise_for_status()
return resp.json()["price"]
# ---------- Activity:LLM 分析 ----------
@activity.defn
async def analyze_stock(price: float) -> str:
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
tmpl = "目前 {symbol} 的股價為 {price} 美元,請提供投資建議(最多 2 句)"
prompt = PromptTemplate.from_template(tmpl)
llm = OpenAI(model_name="gpt-4", temperature=0.3)
chain = LLMChain(llm=llm, prompt=prompt)
return await asyncio.to_thread(chain.run, symbol="AAPL", price=price)
# ---------- Workflow ----------
@workflow.defn
class StockAdviceWorkflow:
@workflow.run
async def run(self, symbol: str) -> str:
price = await workflow.execute_activity(
fetch_stock_price,
symbol,
start_to_close_timeout=timedelta(seconds=10),
)
advice = await workflow.execute_activity(
analyze_stock,
price,
start_to_close_timeout=timedelta(seconds=30),
)
return advice
實務意義:把 工具呼叫(如金融 API)與 LLM 分析 分成兩個 Activity,讓每個環節都能獨立重試、監控,符合企業級 可觀測性 與 容錯 要求。
範例 5:Airflow 中的「動態 DAG」結合多個 LangChain 任務
# file: dags/dynamic_llm_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from typing import List
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
def create_llm_task(task_id: str, prompt_template: str):
"""工廠函式,回傳 PythonOperator"""
def _run(**context):
prompt = PromptTemplate.from_template(prompt_template)
llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.0)
chain = LLMChain(llm=llm, prompt=prompt)
# 取得前一個任務的輸出(若無則使用預設值)
prev_output = context["ti"].xcom_pull(key="output", task_ids=context["ti"].upstream_task_ids)
input_text = prev_output or "初始輸入文字"
result = chain.run(text=input_text)
context["ti"].xcom_push(key="output", value=result)
return PythonOperator(
task_id=task_id,
python_callable=_run,
provide_context=True,
)
default_args = {"owner": "ai-team", "retries": 1, "retry_delay": timedelta(minutes=1)}
with DAG(
dag_id="dynamic_llm_pipeline",
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
# 定義多個 Prompt(可從資料庫或 Config 讀入)
prompts: List[tuple[str, str]] = [
("step_1_clean", "請把以下文字去除所有 HTML 標籤:\n{text}"),
("step_2_summarize", "請將上述文字摘要成 100 個字以內。"),
("step_3_translate", "請把摘要翻譯成繁體中文。"),
]
# 動態產生任務
previous_task = None
for task_id, tmpl in prompts:
task = create_llm_task(task_id, tmpl)
if previous_task:
previous_task >> task # 設定前後依賴
previous_task = task
說明:此範例展示 動態產生 DAG 的技巧,讓你可以把 任務清單(Prompt 列表) 放在外部設定檔,僅透過程式碼自動產生相依 DAG,極大提升 可維護性。
常見陷阱與最佳實踐
| 陷阱 | 可能的影響 | 解決方案或最佳實踐 |
|---|---|---|
| LLM 呼叫超時 | 工作流卡住、資源浪費 | 在 Airflow 設 execution_timeout、Temporal 設 start_to_close_timeout,並使用 非同步 包裝 (asyncio.to_thread) |
| Prompt 內容過大 | 超過模型 token 限制,API 錯誤 | 先用 text_splitter 把長文本切片,或在 Workflow 中加入 前置分段 任務 |
| 重試導致重複副作用(例如寫入 DB) | 重複資料、資金重複扣款 | 使用 幂等設計:在 Activity 中檢查唯一鍵、或在 Airflow 中加入 trigger_rule="all_success" |
| 工作流狀態不透明 | 難以除錯、無法快速定位失敗點 | 利用 Airflow UI 的 Task Log、Temporal UI 的 Event History,同時在程式碼內加入 logging(建議使用 structlog) |
| 依賴過多的外部服務 | 單點失效、整體延遲 | 把外部服務呼叫抽離成獨立 Activity,並在 Workflow 中設定 fallback(如備援 API) |
| 模型金鑰硬編碼 | 安全風險、部署困難 | 使用 環境變數、Secret Manager(AWS Secrets Manager、GCP Secret Manager)或 Airflow 的 Connection 機制 |
其他實務建議
- 版本管理:將 LangChain、Airflow、Temporal 的依賴鎖定在
requirements.txt,避免升級破壞工作流。 - CI/CD:在 CI pipeline 中加入 單元測試(mock LLM 回傳)與 整合測試(測試 DAG 能否正確排程)。
- 觀測:結合 OpenTelemetry,把 LLM 呼叫的 latency、token 數量寫入 Metrics,方便後續成本分析。
- 資源分離:在 Kubernetes 上部署 Airflow worker 與 Temporal worker,分別設定 GPU / CPU 配置,避免互相競爭資源。
實際應用場景
| 場景 | 為什麼需要 Workflow + LangChain | 典型流程 |
|---|---|---|
| 金融報表自動化 | 每日抓取市場資料、生成文字分析、發送 Email,必須保證 準時且不遺漏。 | Airflow DAG: 1. 抓取行情 API → 2. LLM 產生報表文字 → 3. PDF 產出 → 4. 寄送給主管 |
| 客服聊天記錄總結 | 客服通話結束後,需要把對話摘要寫入 CRM,且必須 容錯(通話斷線、API 異常)。 | Temporal 工作流: 1. 收集聊天 JSON → 2. LLM 產生摘要 → 3. 呼叫 CRM API → 4. 若失敗則重試 3 次 |
| 內容審核與翻譯 | 多語系平台每天上傳上千篇文章,需要先 審核 再 翻譯,且每一步都有獨立的審核人員介入。 | Airflow + BranchPythonOperator: 1. LLM 判斷是否違規 → 2. 若合格則觸發翻譯 DAG → 3. 翻譯完成後發佈 |
| 科學文獻自動摘要 | 研究機構每天產出大量 PDF,需把關鍵結論抽取出來供內部搜尋引擎使用。 | Temporal: 1. PDF 解析 → 2. LLM 摘要(長時間) → 3. 寫入 ElasticSearch → 4. 成功通知 Slack |
| IoT 故障診斷 | 大量感測器資料進來後,需要即時判斷是否異常並產生維修建議。 | Airflow + Sensor: 1. Sensor 偵測新資料 → 2. LLM 產生故障說明 → 3. 發送工單至 ServiceNow |
關鍵觀點:所有需要 排程、容錯、可觀測 的 AI 任務,都可以透過上述方式把 LangChain 包裝成 Workflow 中的「原子」或「子流程」,從而提升系統的 可靠性 與 維運效率。
總結
- LangChain 提供了建構 LLM 應用的抽象層,讓我們可以專注於 Prompt、Tool、Chain 的設計。
- Workflow 引擎(Airflow、Temporal)則負責 排程、重試、依賴管理、觀測,兩者結合即能產出 企業級 AI 服務。
- 透過 PythonOperator 或 Activity 的方式,把每一個
Chain、Agent包裝成獨立任務,可讓整個 pipeline 具備 彈性伸縮與容錯。 - 實作時要注意 超時、幂等、Prompt 大小、金鑰管理,同時加入 日誌與 Metrics,才能在生產環境中長期穩定運作。
把本文提供的範例直接搬進你的專案,從 每日報表、客服摘要 到 科學文獻搜尋,都能快速上線,並在未來隨著模型升級或業務變更,只需要調整 Prompt 或工作流的依賴,即可保持系統的 可維護性 與 可擴展性。祝開發順利,讓 AI 成為你工作流的核心推手!