> victor
← Back to Blog
#LangGraph#AI#Python

LangGraph 入门之旅

2026-05-06·20 min read·--- views

简介

LangGraph 是由 LangChain 团队开发的低层级 Agent 编排框架,专为构建有状态(Stateful)、长时运行的 AI 工作流而设计。与传统线性 LLM Chain 不同,LangGraph 将工作流建模为有向图(Directed Graph),支持循环、条件分支、人机协作等复杂流程控制。

想象你正在指挥一场交响乐演出:传统 LLM Chain 就像演奏一首从头到尾的曲子,只能顺序播放;而 LangGraph 则像一位指挥家,可以根据现场观众的反应随时调整演奏顺序,让某个乐章重复,或者跳转到特定段落。

适用场景

与传统的线性 LLM Chain 相比,LangGraph 具有以下核心优势:

特性传统 LLM ChainLangGraph
工作流结构线性,单向执行图结构,支持循环
状态管理需手动管理内置状态持久化
条件路由实现复杂原生支持
人机协作需要额外开发内置支持 interrupt
多 Agent 协调实现困难一流支持
调试工具有限LangGraph Studio

典型应用场景:

  • 对话机器人:需要记忆多轮对话上下文
  • 自主 Agent:能够规划、使用工具、迭代思考
  • 多 Agent 系统:多个 AI 协同完成复杂任务
  • 审批工作流:需要人工审核的自动化流程
  • 研究助手:需要多步骤推理和信息检索

核心概念

Graph、State、Nodes、Edges 是 LangGraph 的四个核心组成部分:

Graph(图)

Graph 是整个工作流的蓝图,定义了 Agent 的完整逻辑结构:

StateGraph
   |-- Nodes(节点)
   |     |-- node_a
   |     |-- node_b
   |     +-- node_c
   +-- Edges(边)
         |-- START -> node_a
         |-- node_a -> node_b(条件边)
         |-- node_a -> node_c(条件边)
         +-- node_b -> END

State(状态)

State 是贯穿整个图的共享数据结构。每个节点可以读取和更新 State,更新后的 State 会传递给下一个节点。

from typing import TypedDict, Annotated
from langgraph.graph import add_messages

class MyState(TypedDict):
    messages: Annotated[list, add_messages]  # 消息列表(自动追加)
    user_name: str                            # 用户名称
    step_count: int                           # 步骤计数

重要Annotated[list, add_messages] 表示该字段使用 add_messages 作为 reducer——新消息会追加到列表而不是覆盖。

Nodes(节点)

节点是普通的 Python 函数,接收当前 State,返回更新后的 State(部分字段):

def my_node(state: MyState) -> dict:
    # 读取状态
    messages = state["messages"]

    # 执行操作...
    result = "处理结果"

    # 返回更新的字段(不需要返回所有字段)
    return {"messages": [{"role": "ai", "content": result}]}

Edges(边)

边定义节点之间的流转方式:

  • 普通边:固定路径,node_a -> node_b
  • 条件边:根据 State 动态路由,node_a -> node_b 或 node_c
  • 起始边START -> 第一个节点
  • 结束边某节点 -> END

环境搭建

安装依赖

# 创建虚拟环境(推荐)
python -m venv venv
source venv/bin/activate      # macOS/Linux

# 安装 LangGraph 和 LangChain
pip install langgraph langchain langchain-openai python-dotenv -i https://mirrors.aliyun.com/pypi/simple/

配置 API Key

在项目根目录创建 .env 文件:

# .env 文件内容
OPENAI_API_KEY=sk-xxx
OPENAI_BASE_URL=https://api.openai.com/v1

# DeepSeek 配置(国内用户推荐)
DEEPSEEK_API_KEY=sk-xxx
DEEPSEEK_BASE_URL=https://api.deepseek.com
DEEPSEEK_MODEL=deepseek-v4-pro

LangGraph 初探

一个最简单的例子——两个节点的线性工作流:

from langgraph.graph import StateGraph, START, END
from typing import TypedDict

# Step 1: 定义 State
class SimpleState(TypedDict):
    message: str
    processed: bool

# Step 2: 定义节点函数
def greet_node(state: SimpleState) -> dict:
    """欢迎节点:生成问候语"""
    return {"message": f"你好!{state['message']}"}

def process_node(state: SimpleState) -> dict:
    """处理节点:标记为已处理"""
    return {"processed": True}

# Step 3: 构建图
builder = StateGraph(SimpleState)

# 添加节点
builder.add_node("greet", greet_node)
builder.add_node("process", process_node)

# 添加边
builder.add_edge(START, "greet")
builder.add_edge("greet", "process")
builder.add_edge("process", END)

# Step 4: 编译图
graph = builder.compile()

# Step 5: 运行
result = graph.invoke({
    "message": "世界",
    "processed": False
})

print(f"最终结果: {result}")
# {'message': '你好!世界', 'processed': True}

可视化图结构

# 在 Jupyter Notebook 中可视化
from IPython.display import Image
Image(graph.get_graph().draw_mermaid_png())

# 或者打印 Mermaid 格式
print(graph.get_graph().draw_mermaid())

State 状态管理

使用 TypedDict 定义状态

from typing import TypedDict, Annotated, Optional
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    # 消息历史(add_messages reducer 自动追加而非覆盖)
    messages: Annotated[list, add_messages]

    # 普通字段(直接覆盖)
    user_id: str
    session_id: str

    # 可选字段
    error: Optional[str]

    # 计数器(使用 operator.add 作为 reducer)
    retry_count: Annotated[int, lambda x, y: x + y]

使用 Pydantic 定义状态(推荐用于生产)

from pydantic import BaseModel, Field
from typing import Annotated
from langgraph.graph.message import add_messages

class ProductionState(BaseModel):
    messages: Annotated[list, add_messages] = Field(default_factory=list)
    user_id: str = ""
    confidence_score: float = 0.0

    class Config:
        arbitrary_types_allowed = True

MessagesState(内置快捷状态)

from langgraph.graph import MessagesState

# MessagesState 等价于:
# class MessagesState(TypedDict):
#     messages: Annotated[list[AnyMessage], add_messages]

# 直接使用,无需自定义
builder = StateGraph(MessagesState)

Nodes 节点

普通函数节点

def simple_node(state: AgentState) -> dict:
    # 读取状态
    last_message = state["messages"][-1]

    # 执行操作
    response = f"收到: {last_message.content}"

    # 返回部分状态更新
    return {
        "messages": [{"role": "assistant", "content": response}]
    }

LLM 调用节点

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage

load_dotenv()

# 使用 DeepSeek 模型
llm = ChatOpenAI(
    model=os.getenv('DEEPSEEK_MODEL', 'deepseek-v4-pro'),
    openai_api_key=os.getenv('DEEPSEEK_API_KEY'),
    openai_api_base=os.getenv('DEEPSEEK_BASE_URL', 'https://api.deepseek.com'),
    temperature=0
)

def llm_node(state: dict) -> dict:
    """调用 LLM 的节点"""
    system_prompt = SystemMessage(content="你是一个有帮助的助手。")

    # 将系统提示与对话历史合并
    messages = [system_prompt] + state["messages"]

    # 调用 LLM
    response = llm.invoke(messages)

    return {"messages": [response]}

异步节点

import asyncio

async def async_node(state: AgentState) -> dict:
    """异步节点,适合 I/O 密集型操作"""
    await asyncio.sleep(0.1)

    result = await some_async_api_call(state["messages"][-1].content)
    return {"messages": [{"role": "assistant", "content": result}]}

Edges 边与条件路由

普通边

# 固定路径:node_a 完成后始终执行 node_b
builder.add_edge("node_a", "node_b")

# 结束:node_a 完成后图结束
builder.add_edge("node_a", END)

条件边

条件边是 LangGraph 的核心功能,根据当前 State 动态决定下一步:

def route_after_llm(state: AgentState) -> str:
    """
    路由函数:根据 LLM 的最新输出决定走哪条路径
    返回值必须是已注册节点名称或 END
    """
    last_message = state["messages"][-1]

    # 如果 LLM 请求使用工具
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"

    # 否则结束
    return END

# 添加条件边
builder.add_conditional_edges(
    "llm",              # 源节点
    route_after_llm,    # 路由函数
    {
        "tools": "tool_executor",   # 返回 "tools" 时 -> tool_executor 节点
        END: END                    # 返回 END 时 -> 结束
    }
)

并行执行(Fan-out/Fan-in)

# 从一个节点并行分叉到多个节点
builder.add_edge("start_node", "branch_a")
builder.add_edge("start_node", "branch_b")
builder.add_edge("start_node", "branch_c")

# 多个节点汇聚到一个节点(Fan-in)
builder.add_edge("branch_a", "merge_node")
builder.add_edge("branch_b", "merge_node")
builder.add_edge("branch_c", "merge_node")

构建对话机器人

支持多轮对话的机器人,能够记住对话上下文:

from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

llm = ChatOpenAI(
    model=os.getenv('DEEPSEEK_MODEL', 'deepseek-v4-pro'),
    openai_api_key=os.getenv('DEEPSEEK_API_KEY'),
    openai_api_base=os.getenv('DEEPSEEK_BASE_URL', 'https://api.deepseek.com'),
    temperature=0.7
)

SYSTEM_PROMPT = """你是一个友善、专业的 AI 助手。
你的回答应该:
- 简洁清晰
- 使用中文回复
- 在不确定时主动询问用户
"""

def chatbot_node(state: MessagesState) -> dict:
    """核心对话节点"""
    system = SystemMessage(content=SYSTEM_PROMPT)
    messages = [system] + state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

# 构建图
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot_node)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

graph = builder.compile()

# 多轮对话函数
def chat(conversation_history: list, user_input: str) -> tuple[str, list]:
    """处理单轮对话,返回 AI 响应和更新后的历史"""
    conversation_history.append(HumanMessage(content=user_input))
    result = graph.invoke({"messages": conversation_history})
    conversation_history = result["messages"]
    ai_response = conversation_history[-1].content
    return ai_response, conversation_history

ReAct Agent

让 Agent 具备使用外部工具的能力。ReAct(Reason + Act)是最常见的 Agent 模式:LLM 思考 -> 选择工具 -> 执行工具 -> 观察结果 -> 继续思考。

定义工具

from langchain_core.tools import tool

@tool
def search_web(query: str) -> str:
    """搜索网络获取最新信息。"""
    return f"关于 '{query}' 的搜索结果:这是模拟的搜索结果..."

@tool
def calculate(expression: str) -> str:
    """计算数学表达式。"""
    import ast
    import operator

    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
    }

    def safe_eval(node):
        if isinstance(node, ast.Expression):
            return safe_eval(node.body)
        elif isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            left = safe_eval(node.left)
            right = safe_eval(node.right)
            return ops[type(node.op)](left, right)
        elif isinstance(node, ast.UnaryOp):
            operand = safe_eval(node.operand)
            return ops[type(node.op)](operand)
        else:
            raise ValueError(f"不支持的表达式类型: {type(node)}")

    try:
        tree = ast.parse(expression, mode='eval')
        result = safe_eval(tree)
        return f"计算结果: {expression} = {result}"
    except Exception as e:
        return f"计算错误: {str(e)}"

@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气信息。"""
    return f"{city} 今日天气:晴,温度 22C,湿度 60%"

tools = [search_web, calculate, get_weather]

构建 ReAct Agent

from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

# 初始化 LLM 并绑定工具
llm = ChatOpenAI(
    model=os.getenv('DEEPSEEK_MODEL', 'deepseek-v4-pro'),
    openai_api_key=os.getenv('DEEPSEEK_API_KEY'),
    openai_api_base=os.getenv('DEEPSEEK_BASE_URL', 'https://api.deepseek.com'),
    temperature=0
)
llm_with_tools = llm.bind_tools(tools)

def agent_node(state: MessagesState) -> dict:
    """Agent 推理节点:调用 LLM 决定下一步行动"""
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

# 构建 ReAct 图
builder = StateGraph(MessagesState)

# 添加节点
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode(tools))  # 内置 ToolNode 自动处理工具调用

# 添加边
builder.add_edge(START, "agent")

# 条件路由:如果 LLM 请求工具则执行工具,否则结束
builder.add_conditional_edges(
    "agent",
    tools_condition,  # 内置路由函数
    {
        "tools": "tools",
        END: END
    }
)

# 工具执行完后返回 agent 继续推理
builder.add_edge("tools", "agent")

graph = builder.compile()

# 测试
result = graph.invoke({
    "messages": [HumanMessage(content="北京今天天气如何?另外帮我计算 1234 * 5678")]
})

流式输出工具调用过程

# 流式观察 Agent 的每一步
for chunk in graph.stream(
    {"messages": [HumanMessage(content="搜索 LangGraph 的最新特性")]},
    stream_mode="updates"
):
    for node_name, updates in chunk.items():
        print(f"\n=== 节点: {node_name} ===")
        for msg in updates.get("messages", []):
            if hasattr(msg, "tool_calls") and msg.tool_calls:
                for tc in msg.tool_calls:
                    print(f"  -> 调用工具: {tc['name']}({tc['args']})")
            else:
                print(f"  -> 输出: {msg.content[:200] if msg.content else '(无内容)'}")

提示ToolNodetools_condition 是 LangGraph 的内置组件。ToolNode 自动解析 LLM 返回的工具调用并执行对应的工具函数;tools_condition 是一个路由函数,检查 LLM 输出是否包含工具调用请求。

Human-in-the-Loop

LangGraph 原生支持在工作流执行过程中暂停,等待人工审核或输入。

使用 interrupt 暂停执行

from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver

def sensitive_action_node(state: MessagesState) -> dict:
    """执行敏感操作前请求人工审批"""
    last_msg = state["messages"][-1].content

    # 暂停图的执行,等待人工决策
    human_decision = interrupt({
        "question": "是否批准执行以下操作?",
        "action": last_msg,
        "risk_level": "中等"
    })

    if human_decision == "approve":
        return {"messages": [{"role": "assistant", "content": "操作已批准并执行完毕。"}]}
    else:
        return {"messages": [{"role": "assistant", "content": "操作已取消。"}]}

# 必须使用 checkpointer 才能支持 interrupt
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

在边上设置断点

# 另一种方式:在编译时指定断点
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["sensitive_node"],   # 执行该节点前暂停
    # interrupt_after=["review_node"],     # 执行该节点后暂停
)

重要checkpointer 是实现 interrupt 功能的前提条件。

持久化内存

内存存储(适合开发测试)

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# 使用 thread_id 区分不同会话
config_user_a = {"configurable": {"thread_id": "user-alice"}}

# Alice 的对话
graph.invoke({"messages": [HumanMessage(content="我叫 Alice")]}, config=config_user_a)
graph.invoke({"messages": [HumanMessage(content="我叫什么名字?")]}, config=config_user_a)
# Agent 能记住:你叫 Alice

SQLite 持久化存储(适合本地项目)

# pip install langgraph-checkpoint-sqlite

from langgraph.checkpoint.sqlite import SqliteSaver

# 数据持久化到文件,程序重启后对话历史仍存在
with SqliteSaver.from_conn_string("./chat_memory.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "persistent-chat"}}

    # 第一次运行
    graph.invoke({"messages": [HumanMessage(content="我叫张三")]}, config=config)

    # 程序重启后再次运行,记忆仍然存在
    result = graph.invoke(
        {"messages": [HumanMessage(content="你还记得我叫什么吗?")]},
        config=config
    )

查看对话历史

# 获取某个 thread 的完整状态历史
history = list(graph.get_state_history(config))

for snapshot in history:
    print(f"时间: {snapshot.created_at}")
    print(f"消息数: {len(snapshot.values['messages'])}")
    print("---")

# 获取当前状态
current_state = graph.get_state(config)
print(f"当前消息数: {len(current_state.values['messages'])}")

多 Agent 系统

LangGraph 擅长协调多个专门化的 Agent 协同工作。

主从架构(Supervisor Pattern)

from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

# 定义专家 Agent
def research_agent(state: MessagesState) -> dict:
    """研究 Agent:负责信息收集"""
    system = SystemMessage(content="你是一个专业的研究员,负责收集和整理信息。请简洁地总结关键信息。")
    response = llm.invoke([system] + state["messages"])
    return {"messages": [response]}

def writing_agent(state: MessagesState) -> dict:
    """写作 Agent:负责内容创作"""
    system = SystemMessage(content="你是一个专业的写作者,负责根据已有信息撰写内容。请保持内容清晰流畅。")
    response = llm.invoke([system] + state["messages"])
    return {"messages": [response]}

def review_agent(state: MessagesState) -> dict:
    """审校 Agent:负责质量控制"""
    system = SystemMessage(content="你是一个专业的编辑,负责审核和改进内容质量。请指出问题并给出改进建议。")
    response = llm.invoke([system] + state["messages"])
    return {"messages": [response]}

# 主管 Agent 决定流程
def supervisor_node(state: MessagesState) -> dict:
    """主管:协调各专家 Agent 的工作"""
    system = SystemMessage(content="""你是一个工作流主管。
根据任务进度决定下一步应该由哪个 Agent 处理。
分析对话历史,只返回以下之一:RESEARCH、WRITING、REVIEW、FINISH
- RESEARCH:需要收集更多信息
- WRITING:信息充足,可以开始写作
- REVIEW:写作完成,需要审核
- FINISH:任务已完成
""")
    response = llm.invoke([system] + state["messages"])
    return {"messages": [response]}

def route_by_supervisor(state: MessagesState) -> str:
    """根据主管决策路由"""
    last_msg = state["messages"][-1].content.strip().upper()

    if "RESEARCH" in last_msg:
        return "research"
    elif "WRITING" in last_msg:
        return "writing"
    elif "REVIEW" in last_msg:
        return "review"
    else:
        return END

# 构建多 Agent 图
builder = StateGraph(MessagesState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("research", research_agent)
builder.add_node("writing", writing_agent)
builder.add_node("review", review_agent)

builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route_by_supervisor)

# 每个专家完成后返回主管
for agent in ["research", "writing", "review"]:
    builder.add_edge(agent, "supervisor")

graph = builder.compile()

子图(Subgraph)

将复杂子流程封装为子图,在主图中复用:

# 将复杂子流程封装为子图,在主图中复用
sub_builder = StateGraph(MessagesState)
sub_builder.add_node("step1", step1_node)
sub_builder.add_node("step2", step2_node)
sub_builder.add_edge(START, "step1")
sub_builder.add_edge("step1", "step2")
sub_builder.add_edge("step2", END)
sub_graph = sub_builder.compile()

# 在主图中使用子图
main_builder = StateGraph(MessagesState)
main_builder.add_node("preprocessing", preprocess_node)
main_builder.add_node("sub_workflow", sub_graph)  # 直接使用编译好的子图
main_builder.add_node("postprocessing", postprocess_node)

main_builder.add_edge(START, "preprocessing")
main_builder.add_edge("preprocessing", "sub_workflow")
main_builder.add_edge("sub_workflow", "postprocessing")
main_builder.add_edge("postprocessing", END)

main_graph = main_builder.compile()

可视化调试

LangGraph Studio 是官方提供的可视化开发环境。

安装和启动

pip install langgraph-cli -i https://mirrors.aliyun.com/pypi/simple/

# 创建 langgraph.json 配置文件
# {
#   "dependencies": ["."],
#   "graphs": {
#     "my_agent": "./my_agent.py:graph"
#   },
#   "env": ".env"
# }

langgraph dev
# 启动后访问 http://localhost:8123

Studio 主要功能

  • 实时可视化:图形化展示节点执行过程
  • 状态检查:在任意节点暂停查看当前 State
  • 时间旅行:回放历史执行步骤
  • 热重载:修改代码后自动更新图结构

环境要求:LangGraph Studio 需要 Docker 环境支持。

最佳实践

状态设计要点

  • 保持 State 精简,只包含必要字段
  • 为复杂字段定义明确的 reducer(如 add_messages
  • 使用 Pydantic 模型在生产环境中验证状态类型
  • 避免在 State 中存储过大的对象

节点设计要点

  • 每个节点职责单一,便于测试和复用
  • 节点函数应该是幂等的(相同输入产生相同输出)
  • 避免在节点中直接修改传入的 state,而是返回新值
  • 合理使用异步节点处理 I/O 密集型操作

错误处理

def robust_node(state: AgentState) -> dict:
    try:
        result = risky_operation(state)
        return {"messages": [result], "error": None}
    except Exception as e:
        return {
            "error": str(e),
            "messages": [{"role": "assistant", "content": f"操作失败: {e}"}]
        }

避免无限循环

def route_with_limit(state: AgentState) -> str:
    # 设置最大重试次数,防止无限循环
    if state.get("retry_count", 0) >= 3:
        return END

    if needs_retry(state):
        return "retry_node"
    return END

常见问题 FAQ

Q1:节点返回值格式不对怎么办?

# 错误:直接修改 state 对象
def bad_node(state):
    state["messages"].append(...)
    return state

# 正确:返回需要更新的字段
def good_node(state):
    return {"messages": [new_message]}

Q2:如何在节点之间传递临时数据?

将临时数据加入 State 定义,或使用下划线前缀约定为内部字段:

class PublicState(TypedDict):
    messages: list  # 对外暴露

class PrivateState(TypedDict):
    _internal_cache: dict  # 以下划线开头约定为内部使用

Q3:如何调试节点执行过程?

# 使用 stream 模式观察每个节点的输出
for event in graph.stream(initial_state, stream_mode="updates"):
    for node_name, state_update in event.items():
        print(f"[节点: {node_name}]")
        print(f"更新: {state_update}")

总结

概念说明关键代码
StateGraph有向图工作流引擎,LangGraph 的核心StateGraph(MyState)
State节点间共享的状态数据结构TypedDict + Annotated
Nodes执行具体操作的函数节点builder.add_node()
Edges节点间的流转路径,支持条件分支add_edge() / add_conditional_edges()
ReAct Agent推理+行动的循环模式ToolNode + tools_condition
Human-in-Loop人工审批与介入机制interrupt() + Command(resume=)
持久化会话记忆与状态保存MemorySaver / SqliteSaver
多 Agent多专家协作系统Supervisor Pattern

开源地址:https://github.com/langchain-ai/langgraph

AI Assistant

Comments (0)

Sign in to leave a comment

No comments yet. Be the first to comment!