本文 AI 產出,尚未審核

LangChain 進階主題:與 Workflow 引擎整合(Airflow、Temporal)


簡介

在實務專案中,LLM(大型語言模型) 常常不只是單一的問答或文字生成,而是需要被串接進更大型的資料管線或業務流程中。單純在 Jupyter Notebook 或簡易腳本裡呼叫 LangChain 雖然方便,但面對 排程、重試、依賴管理、分散式執行 等需求時,會迅速遇到瓶頸。這時,將 LangChain 的 chain、agent 與成熟的 Workflow 引擎(如 Apache Airflow、Temporal)結合,就能讓 AI 工作流具備 可觀測、可重試、可擴展 的特性,真正落地到企業級應用。

本文將說明如何把 LangChain 嵌入 Airflow DAG 與 Temporal 工作流,提供完整的概念說明、實作範例、常見陷阱與最佳實踐,讓你能快速在自己的專案中建立可靠的 AI pipeline。


核心概念

1. 為什麼要結合 Workflow 引擎?

項目 只使用 LangChain 結合 Workflow 引擎
排程 手動或自訂腳本 依賴 Cron / Scheduler
容錯 需要自行撰寫 try/except 自動重試、失敗告警
依賴管理 難以追蹤多個 chain 間的前後關係 DAG / 有向圖自動管理
可觀測 少量日誌 完整 UI、Metrics、Tracing
彈性伸縮 受限於單機資源 分散式 worker、併發控制

結論:若你的 LangChain 應用需要 定時執行、跨服務協調或高可用,Workflow 引擎是不可或缺的基礎建設。

2. Airflow 與 Temporal 的差異

特性 Apache Airflow Temporal
執行模型 DAG(有向無環圖)+ Scheduler 活動(Activity)+ 工作流(Workflow)
語言支援 Python 為主 多語言(Go、Java、Python、Node.js)
容錯機制 任務失敗即重試,可設定依賴 工作流狀態持久化,支援長時間執行
可見性 Web UI 顯示 DAG 走向與執行紀錄 Web UI(Temporal UI)提供時間線與事件追蹤
適用情境 批次 ETL、定時報表 複雜業務流程、長期協調(如訂單處理)

選擇指引:若你已在使用 Airflow 作為 ETL 平台,直接在 DAG 中加入 LangChain 任務最省事;若需要 長時間執行、跨服務事務,Temporal 的工作流模型更適合。

3. LangChain 與 Workflow 的整合點

  1. Chain / Agent 作為單一任務
    • 把完整的 Chain 包裝成 Airflow PythonOperator 或 Temporal Activity
  2. 多階段工作流
    • 先用 LLMChain 產生 prompt,再用 Tool 呼叫外部 API,最後把結果寫入資料庫。每一步都可以是獨立的 workflow 任務。
  3. 狀態持久化
    • Temporal 自動把工作流狀態寫入資料庫,讓長時間的 LLM 呼叫(如 30 分鐘的文件摘要)不會因為服務重啟而遺失。
  4. 重試與超時
    • Airflow 可設定 retriesretry_delay;Temporal 可以在 ActivityOptions 中設定 retry_policyschedule_to_close_timeout

程式碼範例

以下示範 Python(LangChain、Airflow、Temporal) 的實作,並以 註解 說明每一步的目的。若你偏好 Node.js,只要把 python 片段換成 javascript,概念相同。

範例 1:在 Airflow DAG 中執行一個簡單的 LLMChain

# file: dags/llm_chain_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# LangChain 必要套件
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

default_args = {
    "owner": "ai-team",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

def run_llm_chain(**context):
    """Airflow 任務:呼叫 OpenAI 完成文字摘要"""
    # 1. 建立 Prompt
    template = "請將以下文章摘要成 100 個字以內:\n{content}"
    prompt = PromptTemplate.from_template(template)

    # 2. 建立 LLM 與 Chain
    llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.2)
    chain = LLMChain(llm=llm, prompt=prompt)

    # 3. 讀取來自前置任務的輸入(此處直接寫死示範)
    article = """(此處放入長篇文章)"""
    result = chain.run(content=article)

    # 4. 把結果寫入 XCom,供後續任務使用
    context["ti"].xcom_push(key="summary", value=result)

with DAG(
    dag_id="llm_summary_workflow",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    summarize = PythonOperator(
        task_id="summarize_article",
        python_callable=run_llm_chain,
        provide_context=True,
    )

重點PythonOperator 只是一個 包裝器,把 LangChain 的 LLMChain 當成普通函式呼叫,讓 Airflow 仍負責排程、重試與日誌。


範例 2:使用 Airflow + LangChain + 外部工具(搜尋 API)

# file: dags/search_and_answer.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests

from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

def search_api(query: str) -> str:
    """簡易搜尋 API,回傳前 3 筆結果的摘要"""
    resp = requests.get(
        "https://api.example.com/search",
        params={"q": query, "limit": 3},
        timeout=10,
    )
    resp.raise_for_status()
    data = resp.json()
    # 假設每筆都有 'title' 與 'snippet'
    return "\n".join([f"{d['title']}: {d['snippet']}" for d in data["results"]])

def answer_with_context(**context):
    # 1. 取得題目(此例硬編碼)
    question = "什麼是量子電腦的主要挑戰?"
    # 2. 先呼叫搜尋 API,取得相關資訊
    knowledge = search_api(question)

    # 3. 組合 Prompt
    template = """
    你是一位 AI 助手,請根據以下搜尋結果,針對問題給出簡潔且正確的答案。
    ----
    {knowledge}
    ----
    問題:{question}
    """
    prompt = PromptTemplate.from_template(template)

    # 4. 呼叫 LLM
    llm = OpenAI(model_name="gpt-4", temperature=0.0)
    chain = LLMChain(llm=llm, prompt=prompt)
    answer = chain.run(question=question, knowledge=knowledge)

    # 5. 輸出結果至 XCom
    context["ti"].xcom_push(key="answer", value=answer)

with DAG(
    dag_id="search_and_answer",
    default_args={"owner": "ai-team", "retries": 3, "retry_delay": timedelta(minutes=2)},
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    answer_task = PythonOperator(
        task_id="generate_answer",
        python_callable=answer_with_context,
        provide_context=True,
    )

技巧:將外部 API 包裝成 純函式,讓它可以被 Airflow 的 PythonOperator 重用;同時把搜尋結果作為 Prompt 的一部份,提升答案正確性。


範例 3:在 Temporal 中建立 LLM 工作流(Python SDK)

# file: temporal_llm_workflow.py
import asyncio
from temporalio import workflow, activity
from temporalio.client import Client

# ---------- Activity ----------
@activity.defn
async def summarize_text(content: str) -> str:
    """使用 LangChain 完成文字摘要的 Activity"""
    from langchain.llms import OpenAI
    from langchain.prompts import PromptTemplate
    from langchain.chains import LLMChain

    template = "請把以下內容濃縮成 150 個字以內:\n{content}"
    prompt = PromptTemplate.from_template(template)

    llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.1)
    chain = LLMChain(llm=llm, prompt=prompt)

    return await asyncio.to_thread(chain.run, content=content)

# ---------- Workflow ----------
@workflow.defn
class SummarizeWorkflow:
    @workflow.run
    async def run(self, article: str) -> str:
        # 設定 Activity 選項(重試、逾時)
        options = activity.ActivityOptions(
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=activity.RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=5),
                backoff_coefficient=2.0,
            ),
        )
        # 呼叫 Activity
        summary = await workflow.execute_activity(
            summarize_text,
            article,
            schedule_to_close_timeout=timedelta(seconds=60),
            retry_policy=options.retry_policy,
        )
        return summary

# ---------- 客戶端啟動 ----------
async def main():
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        SummarizeWorkflow.run,
        "(此處放長篇文章)",
        id="summarize-workflow-001",
        task_queue="llm-task-queue",
    )
    result = await handle.result()
    print("摘要結果:", result)

if __name__ == "__main__":
    asyncio.run(main())

亮點

  1. Activity 內部使用 asyncio.to_thread 把同步的 LangChain 呼叫搬到執行緒池,避免阻塞 Temporal Worker。
  2. RetryPolicy 讓 LLM 呼叫失敗時自動重試,避免因網路抖動導致工作流失敗。
  3. 狀態持久化:即使 summarize_text 需要 2 分鐘完成,Temporal 仍能可靠追蹤。

範例 4:結合 Temporal 與多階段 Agent(工具呼叫 + LLM)

# file: temporal_agent_workflow.py
import asyncio
from temporalio import workflow, activity
from datetime import timedelta

# ---------- Activity:呼叫外部工具 ----------
@activity.defn
async def fetch_stock_price(symbol: str) -> float:
    # 假設有一個即時股價 API
    import httpx
    resp = await httpx.get(f"https://api.example.com/price/{symbol}")
    resp.raise_for_status()
    return resp.json()["price"]

# ---------- Activity:LLM 分析 ----------
@activity.defn
async def analyze_stock(price: float) -> str:
    from langchain.llms import OpenAI
    from langchain.prompts import PromptTemplate
    from langchain.chains import LLMChain

    tmpl = "目前 {symbol} 的股價為 {price} 美元,請提供投資建議(最多 2 句)"
    prompt = PromptTemplate.from_template(tmpl)

    llm = OpenAI(model_name="gpt-4", temperature=0.3)
    chain = LLMChain(llm=llm, prompt=prompt)
    return await asyncio.to_thread(chain.run, symbol="AAPL", price=price)

# ---------- Workflow ----------
@workflow.defn
class StockAdviceWorkflow:
    @workflow.run
    async def run(self, symbol: str) -> str:
        price = await workflow.execute_activity(
            fetch_stock_price,
            symbol,
            start_to_close_timeout=timedelta(seconds=10),
        )
        advice = await workflow.execute_activity(
            analyze_stock,
            price,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return advice

實務意義:把 工具呼叫(如金融 API)與 LLM 分析 分成兩個 Activity,讓每個環節都能獨立重試、監控,符合企業級 可觀測性容錯 要求。


範例 5:Airflow 中的「動態 DAG」結合多個 LangChain 任務

# file: dags/dynamic_llm_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from typing import List

from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

def create_llm_task(task_id: str, prompt_template: str):
    """工廠函式,回傳 PythonOperator"""
    def _run(**context):
        prompt = PromptTemplate.from_template(prompt_template)
        llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.0)
        chain = LLMChain(llm=llm, prompt=prompt)

        # 取得前一個任務的輸出(若無則使用預設值)
        prev_output = context["ti"].xcom_pull(key="output", task_ids=context["ti"].upstream_task_ids)
        input_text = prev_output or "初始輸入文字"
        result = chain.run(text=input_text)
        context["ti"].xcom_push(key="output", value=result)

    return PythonOperator(
        task_id=task_id,
        python_callable=_run,
        provide_context=True,
    )

default_args = {"owner": "ai-team", "retries": 1, "retry_delay": timedelta(minutes=1)}

with DAG(
    dag_id="dynamic_llm_pipeline",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    # 定義多個 Prompt(可從資料庫或 Config 讀入)
    prompts: List[tuple[str, str]] = [
        ("step_1_clean", "請把以下文字去除所有 HTML 標籤:\n{text}"),
        ("step_2_summarize", "請將上述文字摘要成 100 個字以內。"),
        ("step_3_translate", "請把摘要翻譯成繁體中文。"),
    ]

    # 動態產生任務
    previous_task = None
    for task_id, tmpl in prompts:
        task = create_llm_task(task_id, tmpl)
        if previous_task:
            previous_task >> task  # 設定前後依賴
        previous_task = task

說明:此範例展示 動態產生 DAG 的技巧,讓你可以把 任務清單(Prompt 列表) 放在外部設定檔,僅透過程式碼自動產生相依 DAG,極大提升 可維護性


常見陷阱與最佳實踐

陷阱 可能的影響 解決方案或最佳實踐
LLM 呼叫超時 工作流卡住、資源浪費 在 Airflow 設 execution_timeout、Temporal 設 start_to_close_timeout,並使用 非同步 包裝 (asyncio.to_thread)
Prompt 內容過大 超過模型 token 限制,API 錯誤 先用 text_splitter 把長文本切片,或在 Workflow 中加入 前置分段 任務
重試導致重複副作用(例如寫入 DB) 重複資料、資金重複扣款 使用 幂等設計:在 Activity 中檢查唯一鍵、或在 Airflow 中加入 trigger_rule="all_success"
工作流狀態不透明 難以除錯、無法快速定位失敗點 利用 Airflow UI 的 Task Log、Temporal UI 的 Event History,同時在程式碼內加入 logging(建議使用 structlog
依賴過多的外部服務 單點失效、整體延遲 把外部服務呼叫抽離成獨立 Activity,並在 Workflow 中設定 fallback(如備援 API)
模型金鑰硬編碼 安全風險、部署困難 使用 環境變數、Secret Manager(AWS Secrets Manager、GCP Secret Manager)或 Airflow 的 Connection 機制

其他實務建議

  1. 版本管理:將 LangChain、Airflow、Temporal 的依賴鎖定在 requirements.txt,避免升級破壞工作流。
  2. CI/CD:在 CI pipeline 中加入 單元測試(mock LLM 回傳)與 整合測試(測試 DAG 能否正確排程)。
  3. 觀測:結合 OpenTelemetry,把 LLM 呼叫的 latency、token 數量寫入 Metrics,方便後續成本分析。
  4. 資源分離:在 Kubernetes 上部署 Airflow worker 與 Temporal worker,分別設定 GPU / CPU 配置,避免互相競爭資源。

實際應用場景

場景 為什麼需要 Workflow + LangChain 典型流程
金融報表自動化 每日抓取市場資料、生成文字分析、發送 Email,必須保證 準時且不遺漏 Airflow DAG:
1. 抓取行情 API → 2. LLM 產生報表文字 → 3. PDF 產出 → 4. 寄送給主管
客服聊天記錄總結 客服通話結束後,需要把對話摘要寫入 CRM,且必須 容錯(通話斷線、API 異常)。 Temporal 工作流:
1. 收集聊天 JSON → 2. LLM 產生摘要 → 3. 呼叫 CRM API → 4. 若失敗則重試 3 次
內容審核與翻譯 多語系平台每天上傳上千篇文章,需要先 審核翻譯,且每一步都有獨立的審核人員介入。 Airflow + BranchPythonOperator:
1. LLM 判斷是否違規 → 2. 若合格則觸發翻譯 DAG → 3. 翻譯完成後發佈
科學文獻自動摘要 研究機構每天產出大量 PDF,需把關鍵結論抽取出來供內部搜尋引擎使用。 Temporal:
1. PDF 解析 → 2. LLM 摘要(長時間) → 3. 寫入 ElasticSearch → 4. 成功通知 Slack
IoT 故障診斷 大量感測器資料進來後,需要即時判斷是否異常並產生維修建議。 Airflow + Sensor:
1. Sensor 偵測新資料 → 2. LLM 產生故障說明 → 3. 發送工單至 ServiceNow

關鍵觀點:所有需要 排程、容錯、可觀測 的 AI 任務,都可以透過上述方式把 LangChain 包裝成 Workflow 中的「原子」或「子流程」,從而提升系統的 可靠性維運效率


總結

  • LangChain 提供了建構 LLM 應用的抽象層,讓我們可以專注於 Prompt、Tool、Chain 的設計。
  • Workflow 引擎(Airflow、Temporal)則負責 排程、重試、依賴管理、觀測,兩者結合即能產出 企業級 AI 服務
  • 透過 PythonOperatorActivity 的方式,把每一個 ChainAgent 包裝成獨立任務,可讓整個 pipeline 具備 彈性伸縮與容錯
  • 實作時要注意 超時、幂等、Prompt 大小、金鑰管理,同時加入 日誌與 Metrics,才能在生產環境中長期穩定運作。

把本文提供的範例直接搬進你的專案,從 每日報表客服摘要科學文獻搜尋,都能快速上線,並在未來隨著模型升級或業務變更,只需要調整 Prompt 或工作流的依賴,即可保持系統的 可維護性可擴展性。祝開發順利,讓 AI 成為你工作流的核心推手!