从0到1掌握 LangGraph:并行、嵌套、状态合并一网打尽


📌 导读

你是否遇到过这些问题:

  • 看到 LangGraph 代码就头大,不知道从哪开始学?
  • 知道要用,但看不懂嵌套子图和状态合并?
  • 想写个工作流,却被各种概念绕晕?

别担心! 今天我用 3 个递进式示例,带你从零理解 LangGraph 的核心机制。

学完你将掌握:
✅ 节点、边、状态的基本用法
✅ 并行执行的实现方式
✅ 子图嵌套和状态合并技巧

无需基础,直接开干! 👇


🎯 核心概念一图看懂

在写代码之前,先记住这个公式:

1
LangGraph = 状态字典 + 节点函数 + 执行顺序

打个比方:

  • 状态(State) = 一个共享的记事本,所有节点都能读写
  • 节点(Node) = 一个个工人,每人负责修改记事本的某些内容
  • 边(Edge) = 工人之间的交接顺序

是不是简单多了?让我们开始实战!


🔰 Level 1:最简单的图(5分钟上手)

🎯 学习目标: 理解”节点如何修改状态”

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from langgraph.graph import StateGraph, START, END

def simple_workflow():
# 1️⃣ 创建图(用普通 dict 当状态)
builder = StateGraph(dict)

# 2️⃣ 定义节点函数
def add_greeting(state):
"""节点1:添加问候语"""
print(f"👋 收到输入: {state.get('input')}")
return {"output": f"Hello, {state['input']}!"}

def add_exclamation(state):
"""节点2:添加感叹号"""
print(f"❗ 收到: {state['output']}")
return {"output": state['output'] + " 🎉"}

# 3️⃣ 添加节点
builder.add_node("greet", add_greeting)
builder.add_node("excite", add_exclamation)

# 4️⃣ 定义执行顺序
builder.add_edge(START, "greet") # 开始 → greet
builder.add_edge("greet", "excite") # greet → excite
builder.add_edge("excite", END) # excite → 结束

# 5️⃣ 编译并运行
graph = builder.compile()
result = graph.invoke({"input": "World"})

print(f"✅ 最终结果: {result['output']}")
return graph

# 运行!
graph1 = simple_workflow()

📊 执行流程图

flowchart LR
    Start([▶ START]) --> Greet["greet\n加问候语"]
    Greet --> Excite["excite\n加感叹号"]
    Excite --> End([⏹ END])
    
    style Start fill:#3b82f6,stroke:#1e40af,color:#fff
    style End fill:#ef4444,stroke:#b91c1c,color:#fff
    style Greet fill:#10b981,stroke:#059669,color:#fff
    style Excite fill:#f59e0b,stroke:#d97706,color:#fff

🖨️ 运行结果

1
2
3
👋 收到输入: World
❗ 收到: Hello, World!
✅ 最终结果: Hello, World! 🎉

💡 核心要点

1. 节点函数怎么写?

1
2
3
4
5
6
7
8
9
def 节点名(state):
# 读取状态
data = state["某个key"]

# 处理逻辑
result = 处理(data)

# 返回要更新的字段(注意:只返回变化的部分!)
return {"新的key": result}

2. 状态如何流动?

1
2
3
4
5
初始状态: {"input": "World"}
↓ greet 节点
中间状态: {"input": "World", "output": "Hello, World!"}
↓ excite 节点
最终状态: {"input": "World", "output": "Hello, World! 🎉"}

📝 小结: 每个节点只负责修改部分字段,LangGraph 会自动合并所有更新。


🔀 Level 2:并行分支(同时做两件事)

🎯 学习目标: 理解”如何让多个节点同时执行”

场景引入

假设你要分析一段文本:

  • 既要判断情感(积极/消极)
  • 又要统计词数

串行做法: 先情感分析 → 再词数统计(慢!)
并行做法: 两个任务同时开始(快!)

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from langgraph.graph import StateGraph, START, END

def parallel_workflow():
builder = StateGraph(dict)

# 节点1:情感分析
def analyze_sentiment(state):
text = state["input"]
sentiment = "😊 积极" if "good" in text.lower() else "😐 中性"
print(f"📊 情感分析结果: {sentiment}")
return {"sentiment": sentiment}

# 节点2:词数统计
def count_words(state):
word_count = len(state["input"].split())
print(f"🔢 词数统计结果: {word_count}")
return {"word_count": word_count}

# 节点3:合并结果
def merge_results(state):
# 这里可以访问前面两个节点的输出
report = f"分析: {state['sentiment']}, 词数: {state['word_count']}"
print(f"📝 生成报告: {report}")
return {"report": report}

# 添加节点
builder.add_node("sentiment", analyze_sentiment)
builder.add_node("counter", count_words)
builder.add_node("merge", merge_results)

# 🔥 关键:从 START 连出两条边(并行)
builder.add_edge(START, "sentiment")
builder.add_edge(START, "counter") # 注意:也是从 START 出发!

# 两个并行节点都完成后,进入 merge
builder.add_edge("sentiment", "merge")
builder.add_edge("counter", "merge")
builder.add_edge("merge", END)

graph = builder.compile()
result = graph.invoke({"input": "This is good news"})

print(f"✅ 最终报告: {result['report']}")
return graph

graph2 = parallel_workflow()

📊 执行流程图

flowchart TB
    Start([▶ START]) --> Sentiment["sentiment\n情感分析"]
    Start --> Counter["counter\n词数统计"]
    
    Sentiment --> Merge["merge\n合并结果"]
    Counter --> Merge
    
    Merge --> End([⏹ END])
    
    style Start fill:#3b82f6,stroke:#1e40af,color:#fff
    style End fill:#ef4444,stroke:#b91c1c,color:#fff
    style Sentiment fill:#8b5cf6,stroke:#7c3aed,color:#fff
    style Counter fill:#ec4899,stroke:#db2777,color:#fff
    style Merge fill:#10b981,stroke:#059669,color:#fff

🖨️ 运行结果

1
2
3
4
📊 情感分析结果: 😊 积极
🔢 词数统计结果: 4
📝 生成报告: 分析: 😊 积极, 词数: 4
✅ 最终报告: 分析: 😊 积极, 词数: 4

💡 核心要点

1. 如何创建并行分支?

1
2
3
# 从同一个节点连出多条边
builder.add_edge(START, "节点A")
builder.add_edge(START, "节点B") # 也是从 START 出发

2. 并行节点如何同步?

  • LangGraph 会自动等待所有指向 merge 的边都完成后,才执行 merge 节点
  • 无需手动加锁或等待

3. merge 节点如何获取数据?

1
2
3
4
5
def merge_results(state):
# state 已经包含了 sentiment 和 counter 的输出
sentiment = state["sentiment"] # 来自节点A
word_count = state["word_count"] # 来自节点B
return {"report": ...}

📝 小结: 并行 = 从同一节点连出多条边,LangGraph 自动处理同步。


🎪 Level 3:嵌套子图(模块化复用)

🎯 学习目标: 理解”如何把复杂逻辑拆分成子图”

场景引入

想象你要开发一个日志分析系统:

  • 子任务1: 失败分析(筛选错误日志 + 生成总结)
  • 子任务2: 问题汇总(统计所有问题 + 发送报告)

这两个任务可以独立开发、测试,最后在主图中组合使用。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from langgraph.graph import StateGraph, START, END
from operator import add
from typing import Annotated

def nested_workflow():
# ========== 子图1:失败分析 ==========
def build_failure_subgraph():
builder = StateGraph(dict)

def find_failures(state):
"""筛选包含 error 字段的日志"""
logs = state["logs"]
failures = [log for log in logs if "error" in log]

print(f"🔍 找到 {len(failures)} 条失败日志")

return {
"failures": failures,
"fa_done": [f"FA:{log['id']}" for log in failures]
}

builder.add_node("find", find_failures)
builder.add_edge(START, "find")
builder.add_edge("find", END)

return builder.compile()

# ========== 子图2:问题汇总 ==========
def build_summary_subgraph():
builder = StateGraph(dict)

def summarize(state):
"""统计日志总数"""
logs = state["logs"]
summary = f"共 {len(logs)} 条日志"

print(f"📊 汇总结果: {summary}")

return {
"summary": summary,
"qs_done": [f"QS:{log['id']}" for log in logs]
}

builder.add_node("sum", summarize)
builder.add_edge(START, "sum")
builder.add_edge("sum", END)

return builder.compile()

# ========== 主图:组合子图 ==========
builder = StateGraph(dict)

def prepare_logs(state):
"""数据清洗(模拟)"""
print("🧹 数据清洗完成")
return {"logs": state["raw_logs"]}

# 🔥 关键:子图直接作为节点添加!
builder.add_node("clean", prepare_logs)
builder.add_node("failure_analysis", build_failure_subgraph()) # 子图1
builder.add_node("question_summary", build_summary_subgraph()) # 子图2

# 定义执行顺序
builder.add_edge(START, "clean")
builder.add_edge("clean", "failure_analysis") # 并行分支1
builder.add_edge("clean", "question_summary") # 并行分支2
builder.add_edge("failure_analysis", END)
builder.add_edge("question_summary", END)

graph = builder.compile()

# 测试数据
test_logs = [
{"id": "1", "text": "How to use ChatOllama?"},
{"id": "2", "text": "Chroma error: connection failed", "error": True}
]

print("\n🚀 开始执行嵌套工作流...\n")
result = graph.invoke({"raw_logs": test_logs})

print("\n" + "="*50)
print(" 最终结果:")
print(f" • 失败日志: {len(result.get('failures', []))} 条")
print(f" • 汇总报告: {result.get('summary')}")
print(f" • 处理标记: {result.get('fa_done', []) + result.get('qs_done', [])}")
print("="*50)

return graph

graph3 = nested_workflow()

📊 执行流程图

flowchart TB
    subgraph Main["📦 主图"]
        Start([▶ START]) --> Clean["clean\n数据清洗"]
        Clean --> FA["failure_analysis\n🔧 子图"]
        Clean --> QS["question_summary\n🔧 子图"]
        FA --> End([⏹ END])
        QS --> End
    end
    
    subgraph FA_Sub["失败分析子图"]
        FA_Start([▶]) --> Find["find\n筛选 error"]
        Find --> FA_End([⏹])
    end
    
    subgraph QS_Sub["问题汇总子图"]
        QS_Start([▶]) --> Sum["sum\n统计汇总"]
        Sum --> QS_End([⏹])
    end
    
    FA -.- Find
    QS -.- Sum
    
    style Main fill:#f3f4f6,stroke:#6b7280,stroke-width:2px
    style FA_Sub fill:#fef3c7,stroke:#f59e0b,stroke-width:2px
    style QS_Sub fill:#dbeafe,stroke:#3b82f6,stroke-width:2px

🖨️ 运行结果

1
2
3
4
5
6
7
8
9
10
11
12
🚀 开始执行嵌套工作流...

🧹 数据清洗完成
🔍 找到 1 条失败日志
📊 汇总结果: 共 2 条日志

==================================================
📋 最终结果:
• 失败日志: 1 条
• 汇总报告: 共 2 条日志
• 处理标记: ['FA:2', 'QS:1', 'QS:2']
==================================================

💡 核心要点

1. 如何定义子图?

1
2
3
4
5
6
def build_subgraph():
builder = StateGraph(dict)

# ...添加节点和边...

return builder.compile() # ⚠️ 记得编译!

2. 如何在主图中使用子图?

1
2
# 子图编译后,直接作为节点添加
builder.add_node("子图节点名", build_subgraph())

3. 状态如何传递?

1
2
3
4
5
6
7
8
9
主图状态: {"raw_logs": [...]}
↓ clean 节点
主图状态: {"logs": [...]}
↓ 传递给子图
子图状态: {"logs": [...]} # 自动接收父图的 logs
↓ 子图处理
子图输出: {"failures": [...], "fa_done": [...]}
↓ 合并回主图
主图状态: {..., "failures": [...], "fa_done": [...]}

4. 如何处理多个子图写入同一字段?(Reducer)

如果两个子图都要写 processed_logs 字段,需要声明如何合并:

1
2
3
4
5
6
from operator import add
from typing import Annotated

class State(TypedDict):
# 使用 Annotated 声明 reducer
processed_logs: Annotated[List[str], add] # 用 add 函数合并列表

📝 小结: 子图 = 独立模块,编译后可作为节点复用,状态自动传递和合并。


📚 核心概念速查表

把这张表保存下来,随时查阅!

概念 通俗理解 代码示例 注意事项
状态(State) 共享记事本 dictTypedDict 只返回要更新的字段
节点(Node) 工人函数 def func(state): return {"key": value} 纯函数,无副作用
边(Edge) 执行顺序 add_edge("A", "B") A 完成后才执行 B
并行 同时开工 从同一节点连出多条边 自动同步,无需手动等待
子图 模块复用 add_node("name", subgraph.compile()) 子图必须先编译
Reducer 合并规则 Annotated[List, add] 多节点写同一字段时使用

🛠️ 调试技巧(超实用!)

技巧1:打印图结构

1
2
3
4
5
# 查看所有节点
print("节点:", list(graph.get_graph().nodes.keys()))

# 查看所有边
print("边:", graph.get_graph().edges)

技巧2:单步执行观察

1
2
3
4
5
6
# 在节点函数里加 print
def debug_node(state):
print(f"🔍 进入节点,当前状态: {state}")
result = 处理逻辑(state)
print(f"✅ 离开节点,返回: {result}")
return result

技巧3:查看每一步的更新

1
2
3
# 使用 stream 模式
for step in graph.stream({"input": "test"}, stream_mode="updates"):
print(f"📍 步骤更新: {step}")

输出示例:

1
2
📍 步骤更新: {'greet': {'output': 'Hello, test!'}}
📍 步骤更新: {'excite': {'output': 'Hello, test! 🎉'}}

🎁 Bonus:常见错误及解决方案

❌ 错误1:节点返回了完整状态

1
2
3
4
5
6
7
8
9
# ❌ 错误写法
def wrong_node(state):
new_state = state.copy()
new_state["output"] = "xxx"
return new_state # 返回了整个状态!

# ✅ 正确写法
def correct_node(state):
return {"output": "xxx"} # 只返回变化的字段

❌ 错误2:忘记编译子图

1
2
3
4
5
# ❌ 错误
builder.add_node("subgraph", build_subgraph()) # 没加 ()

# ✅ 正确
builder.add_node("subgraph", build_subgraph().compile())

❌ 错误3:并行节点写同一字段未声明 reducer

1
2
3
4
5
6
7
# ❌ 错误(两个节点都写 processed_logs)
class State(TypedDict):
processed_logs: List[str] # 没声明如何合并

# ✅ 正确
class State(TypedDict):
processed_logs: Annotated[List[str], add] # 用 add 合并

📝 总结

恭喜你读完这篇文章!让我们回顾一下重点:

✅ 你学到了什么?

  1. 基础图: 节点 + 边 + 状态,三要素搞定简单工作流
  2. 并行执行: 从同一节点连出多条边,自动同步
  3. 嵌套子图: 模块化开发,编译后作为节点复用
  4. 状态合并: 使用 Annotated + reducer 处理冲突

🚀 下一步行动

立即实践:

  1. 复制 Level 1 代码,运行看效果
  2. 修改节点函数,实现你自己的逻辑
  3. 尝试添加第三个并行节点

进阶学习:

  • 学习条件边(add_conditional_edges
  • 探索循环和重试机制
  • 集成 LangSmith 进行追踪调试

参考资料