从0到1掌握 LangGraph:并行、嵌套、状态合并一网打尽
📌 导读
你是否遇到过这些问题:
- 看到 LangGraph 代码就头大,不知道从哪开始学?
- 知道要用,但看不懂嵌套子图和状态合并?
- 想写个工作流,却被各种概念绕晕?
别担心! 今天我用 3 个递进式示例,带你从零理解 LangGraph 的核心机制。
学完你将掌握:
✅ 节点、边、状态的基本用法
✅ 并行执行的实现方式
✅ 子图嵌套和状态合并技巧
无需基础,直接开干! 👇
🎯 核心概念一图看懂
在写代码之前,先记住这个公式:
1
| LangGraph = 状态字典 + 节点函数 + 执行顺序
|
打个比方:
- 状态(State) = 一个共享的记事本,所有节点都能读写
- 节点(Node) = 一个个工人,每人负责修改记事本的某些内容
- 边(Edge) = 工人之间的交接顺序
是不是简单多了?让我们开始实战!
🔰 Level 1:最简单的图(5分钟上手)
🎯 学习目标: 理解”节点如何修改状态”
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| from langgraph.graph import StateGraph, START, END
def simple_workflow(): builder = StateGraph(dict) def add_greeting(state): """节点1:添加问候语""" print(f"👋 收到输入: {state.get('input')}") return {"output": f"Hello, {state['input']}!"} def add_exclamation(state): """节点2:添加感叹号""" print(f"❗ 收到: {state['output']}") return {"output": state['output'] + " 🎉"} builder.add_node("greet", add_greeting) builder.add_node("excite", add_exclamation) builder.add_edge(START, "greet") builder.add_edge("greet", "excite") builder.add_edge("excite", END) graph = builder.compile() result = graph.invoke({"input": "World"}) print(f"✅ 最终结果: {result['output']}") return graph
graph1 = simple_workflow()
|
📊 执行流程图
flowchart LR
Start([▶ START]) --> Greet["greet\n加问候语"]
Greet --> Excite["excite\n加感叹号"]
Excite --> End([⏹ END])
style Start fill:#3b82f6,stroke:#1e40af,color:#fff
style End fill:#ef4444,stroke:#b91c1c,color:#fff
style Greet fill:#10b981,stroke:#059669,color:#fff
style Excite fill:#f59e0b,stroke:#d97706,color:#fff
🖨️ 运行结果
1 2 3
| 👋 收到输入: World ❗ 收到: Hello, World! ✅ 最终结果: Hello, World! 🎉
|
💡 核心要点
1. 节点函数怎么写?
1 2 3 4 5 6 7 8 9
| def 节点名(state): data = state["某个key"] result = 处理(data) return {"新的key": result}
|
2. 状态如何流动?
1 2 3 4 5
| 初始状态: {"input": "World"} ↓ greet 节点 中间状态: {"input": "World", "output": "Hello, World!"} ↓ excite 节点 最终状态: {"input": "World", "output": "Hello, World! 🎉"}
|
📝 小结: 每个节点只负责修改部分字段,LangGraph 会自动合并所有更新。
🔀 Level 2:并行分支(同时做两件事)
🎯 学习目标: 理解”如何让多个节点同时执行”
场景引入
假设你要分析一段文本:
串行做法: 先情感分析 → 再词数统计(慢!)
并行做法: 两个任务同时开始(快!)
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| from langgraph.graph import StateGraph, START, END
def parallel_workflow(): builder = StateGraph(dict) def analyze_sentiment(state): text = state["input"] sentiment = "😊 积极" if "good" in text.lower() else "😐 中性" print(f"📊 情感分析结果: {sentiment}") return {"sentiment": sentiment} def count_words(state): word_count = len(state["input"].split()) print(f"🔢 词数统计结果: {word_count}") return {"word_count": word_count} def merge_results(state): report = f"分析: {state['sentiment']}, 词数: {state['word_count']}" print(f"📝 生成报告: {report}") return {"report": report} builder.add_node("sentiment", analyze_sentiment) builder.add_node("counter", count_words) builder.add_node("merge", merge_results) builder.add_edge(START, "sentiment") builder.add_edge(START, "counter") builder.add_edge("sentiment", "merge") builder.add_edge("counter", "merge") builder.add_edge("merge", END) graph = builder.compile() result = graph.invoke({"input": "This is good news"}) print(f"✅ 最终报告: {result['report']}") return graph
graph2 = parallel_workflow()
|
📊 执行流程图
flowchart TB
Start([▶ START]) --> Sentiment["sentiment\n情感分析"]
Start --> Counter["counter\n词数统计"]
Sentiment --> Merge["merge\n合并结果"]
Counter --> Merge
Merge --> End([⏹ END])
style Start fill:#3b82f6,stroke:#1e40af,color:#fff
style End fill:#ef4444,stroke:#b91c1c,color:#fff
style Sentiment fill:#8b5cf6,stroke:#7c3aed,color:#fff
style Counter fill:#ec4899,stroke:#db2777,color:#fff
style Merge fill:#10b981,stroke:#059669,color:#fff
🖨️ 运行结果
1 2 3 4
| 📊 情感分析结果: 😊 积极 🔢 词数统计结果: 4 📝 生成报告: 分析: 😊 积极, 词数: 4 ✅ 最终报告: 分析: 😊 积极, 词数: 4
|
💡 核心要点
1. 如何创建并行分支?
1 2 3
| builder.add_edge(START, "节点A") builder.add_edge(START, "节点B")
|
2. 并行节点如何同步?
- LangGraph 会自动等待所有指向 merge 的边都完成后,才执行 merge 节点
- 无需手动加锁或等待
3. merge 节点如何获取数据?
1 2 3 4 5
| def merge_results(state): sentiment = state["sentiment"] word_count = state["word_count"] return {"report": ...}
|
📝 小结: 并行 = 从同一节点连出多条边,LangGraph 自动处理同步。
🎪 Level 3:嵌套子图(模块化复用)
🎯 学习目标: 理解”如何把复杂逻辑拆分成子图”
场景引入
想象你要开发一个日志分析系统:
- 子任务1: 失败分析(筛选错误日志 + 生成总结)
- 子任务2: 问题汇总(统计所有问题 + 发送报告)
这两个任务可以独立开发、测试,最后在主图中组合使用。
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| from langgraph.graph import StateGraph, START, END from operator import add from typing import Annotated
def nested_workflow(): def build_failure_subgraph(): builder = StateGraph(dict) def find_failures(state): """筛选包含 error 字段的日志""" logs = state["logs"] failures = [log for log in logs if "error" in log] print(f"🔍 找到 {len(failures)} 条失败日志") return { "failures": failures, "fa_done": [f"FA:{log['id']}" for log in failures] } builder.add_node("find", find_failures) builder.add_edge(START, "find") builder.add_edge("find", END) return builder.compile() def build_summary_subgraph(): builder = StateGraph(dict) def summarize(state): """统计日志总数""" logs = state["logs"] summary = f"共 {len(logs)} 条日志" print(f"📊 汇总结果: {summary}") return { "summary": summary, "qs_done": [f"QS:{log['id']}" for log in logs] } builder.add_node("sum", summarize) builder.add_edge(START, "sum") builder.add_edge("sum", END) return builder.compile() builder = StateGraph(dict) def prepare_logs(state): """数据清洗(模拟)""" print("🧹 数据清洗完成") return {"logs": state["raw_logs"]} builder.add_node("clean", prepare_logs) builder.add_node("failure_analysis", build_failure_subgraph()) builder.add_node("question_summary", build_summary_subgraph()) builder.add_edge(START, "clean") builder.add_edge("clean", "failure_analysis") builder.add_edge("clean", "question_summary") builder.add_edge("failure_analysis", END) builder.add_edge("question_summary", END) graph = builder.compile() test_logs = [ {"id": "1", "text": "How to use ChatOllama?"}, {"id": "2", "text": "Chroma error: connection failed", "error": True} ] print("\n🚀 开始执行嵌套工作流...\n") result = graph.invoke({"raw_logs": test_logs}) print("\n" + "="*50) print(" 最终结果:") print(f" • 失败日志: {len(result.get('failures', []))} 条") print(f" • 汇总报告: {result.get('summary')}") print(f" • 处理标记: {result.get('fa_done', []) + result.get('qs_done', [])}") print("="*50) return graph
graph3 = nested_workflow()
|
📊 执行流程图
flowchart TB
subgraph Main["📦 主图"]
Start([▶ START]) --> Clean["clean\n数据清洗"]
Clean --> FA["failure_analysis\n🔧 子图"]
Clean --> QS["question_summary\n🔧 子图"]
FA --> End([⏹ END])
QS --> End
end
subgraph FA_Sub["失败分析子图"]
FA_Start([▶]) --> Find["find\n筛选 error"]
Find --> FA_End([⏹])
end
subgraph QS_Sub["问题汇总子图"]
QS_Start([▶]) --> Sum["sum\n统计汇总"]
Sum --> QS_End([⏹])
end
FA -.- Find
QS -.- Sum
style Main fill:#f3f4f6,stroke:#6b7280,stroke-width:2px
style FA_Sub fill:#fef3c7,stroke:#f59e0b,stroke-width:2px
style QS_Sub fill:#dbeafe,stroke:#3b82f6,stroke-width:2px
🖨️ 运行结果
1 2 3 4 5 6 7 8 9 10 11 12
| 🚀 开始执行嵌套工作流...
🧹 数据清洗完成 🔍 找到 1 条失败日志 📊 汇总结果: 共 2 条日志
================================================== 📋 最终结果: • 失败日志: 1 条 • 汇总报告: 共 2 条日志 • 处理标记: ['FA:2', 'QS:1', 'QS:2'] ==================================================
|
💡 核心要点
1. 如何定义子图?
1 2 3 4 5 6
| def build_subgraph(): builder = StateGraph(dict) return builder.compile()
|
2. 如何在主图中使用子图?
1 2
| builder.add_node("子图节点名", build_subgraph())
|
3. 状态如何传递?
1 2 3 4 5 6 7 8 9
| 主图状态: {"raw_logs": [...]} ↓ clean 节点 主图状态: {"logs": [...]} ↓ 传递给子图 子图状态: {"logs": [...]} # 自动接收父图的 logs ↓ 子图处理 子图输出: {"failures": [...], "fa_done": [...]} ↓ 合并回主图 主图状态: {..., "failures": [...], "fa_done": [...]}
|
4. 如何处理多个子图写入同一字段?(Reducer)
如果两个子图都要写 processed_logs 字段,需要声明如何合并:
1 2 3 4 5 6
| from operator import add from typing import Annotated
class State(TypedDict): processed_logs: Annotated[List[str], add]
|
📝 小结: 子图 = 独立模块,编译后可作为节点复用,状态自动传递和合并。
📚 核心概念速查表
把这张表保存下来,随时查阅!
| 概念 |
通俗理解 |
代码示例 |
注意事项 |
| 状态(State) |
共享记事本 |
dict 或 TypedDict |
只返回要更新的字段 |
| 节点(Node) |
工人函数 |
def func(state): return {"key": value} |
纯函数,无副作用 |
| 边(Edge) |
执行顺序 |
add_edge("A", "B") |
A 完成后才执行 B |
| 并行 |
同时开工 |
从同一节点连出多条边 |
自动同步,无需手动等待 |
| 子图 |
模块复用 |
add_node("name", subgraph.compile()) |
子图必须先编译 |
| Reducer |
合并规则 |
Annotated[List, add] |
多节点写同一字段时使用 |
🛠️ 调试技巧(超实用!)
技巧1:打印图结构
1 2 3 4 5
| print("节点:", list(graph.get_graph().nodes.keys()))
print("边:", graph.get_graph().edges)
|
技巧2:单步执行观察
1 2 3 4 5 6
| def debug_node(state): print(f"🔍 进入节点,当前状态: {state}") result = 处理逻辑(state) print(f"✅ 离开节点,返回: {result}") return result
|
技巧3:查看每一步的更新
1 2 3
| for step in graph.stream({"input": "test"}, stream_mode="updates"): print(f"📍 步骤更新: {step}")
|
输出示例:
1 2
| 📍 步骤更新: {'greet': {'output': 'Hello, test!'}} 📍 步骤更新: {'excite': {'output': 'Hello, test! 🎉'}}
|
🎁 Bonus:常见错误及解决方案
❌ 错误1:节点返回了完整状态
1 2 3 4 5 6 7 8 9
| def wrong_node(state): new_state = state.copy() new_state["output"] = "xxx" return new_state
def correct_node(state): return {"output": "xxx"}
|
❌ 错误2:忘记编译子图
1 2 3 4 5
| builder.add_node("subgraph", build_subgraph())
builder.add_node("subgraph", build_subgraph().compile())
|
❌ 错误3:并行节点写同一字段未声明 reducer
1 2 3 4 5 6 7
| class State(TypedDict): processed_logs: List[str]
class State(TypedDict): processed_logs: Annotated[List[str], add]
|
📝 总结
恭喜你读完这篇文章!让我们回顾一下重点:
✅ 你学到了什么?
- 基础图: 节点 + 边 + 状态,三要素搞定简单工作流
- 并行执行: 从同一节点连出多条边,自动同步
- 嵌套子图: 模块化开发,编译后作为节点复用
- 状态合并: 使用
Annotated + reducer 处理冲突
🚀 下一步行动
立即实践:
- 复制 Level 1 代码,运行看效果
- 修改节点函数,实现你自己的逻辑
- 尝试添加第三个并行节点
进阶学习:
- 学习条件边(
add_conditional_edges)
- 探索循环和重试机制
- 集成 LangSmith 进行追踪调试
参考资料