LangGraph 持久化完全指南:从零搭建永不丢失状态的 AI Agent 系统
前言在构建 AI Agent 应用时你是否遇到过这样的困扰用户刚说完自己的需求下一次提问时智能体就“失忆”了工作流执行到一半时服务器意外崩溃所有进度付之东流一个涉及多次人工审核的复杂流程被打断后就再也接续不上。这些问题本质上都指向同一个核心挑战——状态管理。LangGraph 通过其强大的持久化Persistence与持久化执行Durable Execution能力为开发者提供了一套系统性的解决方案。正如古希腊先哲埃斯库罗斯所说“记忆是所有智慧的母亲”一个优秀的 AI 智能体持久化的记忆能力恰恰是其“智慧”的真正基石。本文将从实战角度出发带你系统掌握 LangGraph 持久化的核心原理与开发全流程。1. 深入理解 LangGraph 持久化的核心原理1.1 什么是持久化在 LangGraph 的语境下持久化Persistence是指将图状态state在各个执行步骤中的快照保存到持久化存储中的机制。通俗来说类似视频游戏中的“存档点”——当你玩一款角色扮演游戏时可以在关键节点保存游戏进度即使第二天关机重启加载保存的存档后便能从之前的进度接着玩而无须重新从头玩一遍。LangGraph 的持久化机制通过检查点Checkpoint来实现。每当编译图时通过指定检查点保存器checkpointerLangGraph 会在图的每个“超级步骤”super-step执行后自动保存一次状态快照。这些检查点会被归类到不同的线程Thread中你可以把线程理解为某一轮完整对话或某一次特定任务执行的唯一标识容器。每个线程都有一个唯一的 thread_id通过这个 ID可以追溯、恢复甚至“穿越”时间旅行到该任务历史上的任一保存点。1.2 检查点的核心结构每个检查点实际上是一个 StateSnapshot 对象包含了几个关键部分values在当前检查点下图的状态变量里面具体存了什么数据。next下一步将要执行哪些节点以元组形式列出即图的“执行计划”。config包含 thread_id 和 checkpoint_id 的配置用于定位特定检查点。metadata存放元数据信息例如检查点的来源、写入时间戳等。tasks如果该步骤之前已经尝试过但失败了tasks 中会包含错误信息如果图从节点内部被动态中断还会带有与中断相关的额外数据。理解这些基础概念后我们开始动手写代码。1.3 准备开发环境在尝试运行任何代码之前需要先配置好 Python 运行环境。# 推荐使用 Python 3.10 或以上版本 python --version # 创建并激活虚拟环境 python -m venv langgraph_persistence_env # Windows: langgraph_persistence_env\Scripts\activate # Mac/Linux: source langgraph_persistence_env/bin/activate # 安装 LangGraph 核心库 # LangGraph 是核心框架库负责提供图结构、构建和执行能力 pip install langgraph # 类型提示扩展非必须但推荐安装 pip install typing-extensions2. 从零开始内存检查点实战内存检查点InMemorySaver是 LangGraph 中最简单直接的检查点实现方式。它直接将状态存储在程序运行的内存RAM中进程一旦关闭或程序崩溃所有保存的状态就会彻底消失。内存检查点适合什么场景它非常适合代码的快速原型验证、单元测试或纯粹的教学演示因为它无需配置数据库或任何外部依赖。下面的完整代码将演示一个包含三个步骤的线性工作流# persistence_demo_memory.py # 本代码演示如何使用 InMemorySaver 在内存中实现工作流的状态持久化 # 通过 thread_id 机制来保持对话或任务的状态延续并模拟了一个多步骤的工作流 # 当你携带相同的 thread_id 进行调用时LangGraph 会自动加载上次中断时的状态 from typing import Annotated, TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver import operator # ----- 第一步定义一个状态类 ----- # 这个状态类决定了我们的图中将存储什么样的数据结构 # LangGraph 以 状态字典 的形式在节点之间传递数据 # Annotated 配合 operator.add 是一种常用的列表累加模式可以自动将新的列表元素追加到现有列表后面 class PersistenceDemoState(TypedDict): # operator.add 是 Reducer 函数——告诉 LangGraph 当新数据进入时是覆盖旧的还是合并。 # 此处配置 operator.add每次 return 都会将新消息自动追加到 messages 列表中而不是覆盖 messages: Annotated[list, operator.add] step_count: Annotated[int, operator.add] # step_count 同样采用累加模式 # ----- 第二步定义工作流节点Node函数 ----- # 每个节点函数接收当前状态state然后返回需要更新的字典 # 返回的字典中key 必须与 State 中定义的字段匹配 def step_one(state: PersistenceDemoState) - dict: print(执行步骤 1正在处理初始数据...) return { messages: [执行了步骤 1], step_count: 1 } def step_two(state: PersistenceDemoState) - dict: print(执行步骤 2正在处理中间逻辑...) return { messages: [执行了步骤 2], step_count: 1 } def step_three(state: PersistenceDemoState) - dict: print(执行步骤 3正在生成最终结果...) return { messages: [执行了步骤 3], step_count: 1 } # ----- 第三步构建图结构StateGraph----- def create_graph(): # 初始化图构造器将上面定义的状态类传入 builder StateGraph(PersistenceDemoState) # 添加节点“step_one”是节点的名称step_one是对应的函数 builder.add_node(step_one, step_one) builder.add_node(step_two, step_two) builder.add_node(step_three, step_three) # 添加边定义流程顺序 # START 是 LangGraph 内置常量代表图的入口 builder.add_edge(START, step_one) builder.add_edge(step_one, step_two) builder.add_edge(step_two, step_three) builder.add_edge(step_three, END) return builder # ----- 第四步主函数入口 ----- def main(): print( LangGraph 内存持久化存储完整演示 \n) # 1. 创建内存存储器实例 # 这是持久化的“引擎”负责每个步骤结束后将状态存入内存 memory InMemorySaver() # 2. 编译图将内存存储器绑定到图上 builder create_graph() # 关键点在调用 compile() 时传入 checkpointer 参数将持久化能力注入到图中 app builder.compile(checkpointermemory) # 3. 配置线程标识符 thread_id # 可以把 thread_id 想象成聊天窗口中具体的“对话编号” # 无论调用多少次 invoke只要是同一个 thread_idLangGraph 就会自动读取该会话的历史状态 config {configurable: {thread_id: demo_thread_001}} # 4. 首次执行工作流从头开始 print(【1】首次执行工作流...) # 为了观察累加效果初始状态传入一个初始性的消息和步数 final_state app.invoke({ messages: [开始执行], step_count: 0 }, config) print(f执行结果最终状态: {final_state}\n) # 5. 查看当前保存的状态 print(【2】调用 get_state 查看当前保存的状态) # get_state 方法可以查看该 thread_id 对应的当前状态摘要包括下一个即将执行的节点 current_snapshot app.get_state(config) print(f当前状态数据: {current_snapshot.values}) print(f下一个待执行的节点: {current_snapshot.next}\n) # 6. 第二次调用尝试恢复已经执行完的线程 # 注意这里传入了 None 作为输入数据 # 当工作流已经完成时即到达 END再次调用 invoke 会直接返回最终状态 print(【3】尝试恢复已经完成的工作流) resumed_state app.invoke(None, config) print(f恢复执行得到的最终状态: {resumed_state}) # 展开 messages 列表可以看到所有步骤的执行记录都完整保留了 print(f累积的消息列表: {resumed_state[messages]}) print(f累积的步数统计: {resumed_state[step_count]}\n) # 7. 实战演示用不同的 thread_id 开启全新会话 print(【4】使用新的 thread_id 开启全新会话) new_config {configurable: {thread_id: demo_thread_002}} new_state app.invoke({ messages: [本次是新会话], step_count: 0 }, new_config) print(f新会话执行结果: {new_state}) print(注意新会话的状态完全独立于之前的 demo_thread_001互不干扰) print(\n 内存持久化演示结束 ) if __name__ __main__: main()2.1 代码详解与运行结果解析上述代码首次执行时invoke 方法会沿着构建好的执行路径走完全程步骤1→步骤2→步骤3→END并将每一步的状态自动保存到 InMemorySaver 中最终的消息列表会累加所有节点返回的段落工作流已完成对应保存。核心知识点thread_id 是持久化的关键钥匙。只要携带相同的 thread_id无论执行多少次 invokeLangGraph 都会自动尝试加载该会话的历史检查点。3. 深入持久化执行Durable Execution3.1 什么是持久化执行持久化执行是 LangGraph 持久化能力的高级延伸。如果说持久化侧重于“保存快照”那么持久化执行则侧重于“基于保存的进度实现可靠续跑”。传统的程序执行往往是“瞬间完成”或“灾难性崩溃”——一旦进程在某个中间步骤中断所有已执行的任务全部丢失。但在 AI Agent 流水线中一个工作流可能需要调用昂贵的 LLM、经过多轮人工审核、甚至持续数小时。这种“中断就是毁灭”的模式在生产环境中无疑是行不通的。持久化执行的核心思想是工作流在每个关键节点保存进度使得它不仅能暂停还能在未来某个时间点——无论是几秒钟后还是几天后——从精确暂停的位置接着执行而不需要从头跑完所有已完成的工作。3.2 三大持久化模式Durability ModeLangGraph 提供了三种持久性模式从低到高提供了不同粒度的状态保存策略开发者可以根据具体场景在性能和可靠性之间做权衡。1. “exit” 模式低持久化仅在整个工作流执行完成无论是成功还是报错时才将最终状态写入持久化存储。这种模式性能最佳、开销最小但缺点是在工作流执行的中途任何崩溃或中断都会导致进度全部丢失。适用场景一次性执行的、短暂的小任务例如批处理或快速查询如果中断从头处理也无伤大雅的时候。2. “async” 模式异步持久化在下一步执行的同时当前步的状态会异步存储。这意味着 LangGraph 不会“等待”存储完成才继续干活减少了执行过程中的卡顿。适用场景这也是默认模式。它平衡了性能和可靠性但风险在于如果程序在“刚刚准备存储但还没彻底写完”的那几微秒中突然崩溃检查点可能丢失后续恢复时可能回退到上一步或从零开始。3. “sync” 模式同步持久化在下一个步骤开始之前当前步骤的状态被同步写入存储介质。只有当存储操作彻底完成后图才会继续推进至下一环节。虽然这种模式性能开销最大但它提供了最强有力的可靠性保障——每个检查点必定被安全保存。适用场景金融支付验证、医疗审核流程等高风险场景任何一步的状态丢失都可能导致重大错误时必须选用 sync 模式。如何在代码中使用# 示例指定持久化模式推荐结合 sync 或 async 使用 result app.stream( {messages: [开始处理]}, config, durabilitysync # 可选的三种模式sync / async / exit )3.3 如何从故障或中断中恢复执行当工作流因为服务器异常重启、网络断连、API 调用超时等原因中断时只需做两件事1拥有相同的 thread_id2传入 None 作为输入值。LangGraph 会自动找到最近的成功检查点从中恢复执行。import time # 假设上次运行中断了。只需再次调用 invoke传入 None 作为 input # LangGraph 将自动从最后一个成功的检查点恢复 try: final_result app.invoke(None, config) print(工作流已恢复完成) except Exception as e: print(f工作流恢复遇到错误: {e})3.4 持久化执行最佳实践确保确定性与幂等性恢复工作流时LangGraph 并不是从代码中断的“该行”直接跳回运行。它会根据 checkpoint 中记录的版本确定一个合适的起始节点并从该节点开始重新执行后续节点。这意味着非确定性操作例如随机数生成 random.random()在重放时可能产生不同结果导致状态不一致。副作用操作例如写入文件、发送 HTTP 请求、扣减余额如果被重复执行可能导致严重的重复写入或重复扣费问题。为了确保恢复时状态一致建议将副作用或非确定性操作封装在任务节点中以便重放时直接从持久层读取结果而非真实执行。实现幂等性。以 API 调用为例在处理支付逻辑时可以传入一个唯一的幂等键idempotency key。服务端通过检查该键来决定重复请求是否忽略从而保证即使工作流因故恢复并重新发出请求也不会造成重复扣款。避免重复劳动如果一个节点包含多个副作用操作请分别封装到单独任务中。恢复时已完成的任务结果将从持久化层自动检索不会被重复执行。4. 生产级数据库持久化方案尽管 InMemorySaver 在本地测试或教学演示中很方便但它毕竟无法真正解决持久化问题。当 AI Agent 系统部署到真实生产中我们需要探讨的是真实数据库。LangGraph 官方以一种非常优雅的解耦方式设计基础检查点保存器BaseCheckpointSaver作为接口各种主流数据库的集成作为独立库进行维护。LangGraph 支持的主要数据库方案有SQLite轻量级文件数据库适合本地开发和单机小型项目。PostgreSQL功能强大的企业级关系型数据库支持高并发和复杂查询推荐用于生产环境。AWS DynamoDB / MongoDB云原生 NoSQL 方案具备极强的可扩展性适合大规模分布式部署。Redis / Mem0高性能内存缓存数据库适合对速度要求极高的低延迟场景。4.1 SQLite 检查点实战教程SQLite 是一个纯 C 语言编写的轻量级嵌入式数据库它将整个数据库存储为一个独立的文件无需额外的服务器进程或配置。这对于初期开发、本地测试或教育用途非常友好。下面演示如何在刚构建的持久化 LangGraph 中使用 SqliteSaver 将所有检查点真正存储在本地硬盘上。第一步安装必要的扩展库pip install langgraph-checkpoint-sqlite第二步运行完整代码示例以下是使用 SQLite 作为持久化后端存储 LangGraph 状态的工作示例。此脚本包含初始化数据库连接、创建表结构、运行多步骤图以及关闭资源等标准生产流程。# persistence_demo_sqlite.py # 本代码演示如何使用真正的 SQLite 文件数据库来存储 LangGraph 的工作流状态 # 与内存存储不同的是即使整个 Python 程序退出后重新启动使用同样的 thread_id # 仍然可以准确恢复之前的执行历史——这是实现生产级持久化的核心能力 import sqlite3 import operator from typing import TypedDict, Annotated from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.sqlite import SqliteSaver import os # 为确保数据库存储成功建立 sqlite_data 文件夹如果还未建立 DATA_DIR ./sqlite_data os.makedirs(DATA_DIR, exist_okTrue) class MyState(TypedDict): messages: Annotated[list, operator.add] def node_1(state: MyState) - dict: print(Node 1 正在运行生成测试数据) # 注意operator.add 模式下返回的这条 messages 会被自动追加到历史列表中去 return {messages: [abc, def]} def main(): # 定义 SQLite 数据库文件路径 db_path os.path.join(DATA_DIR, langgraph_sqlite.db) print(fSQLite 数据库文件将保存在: {db_path}) # 创建 SQLite 数据库连接 # check_same_threadFalse 允许在多线程环境中提交使用在这里确保非严格限制 conn sqlite3.connect( db_path, check_same_threadFalse ) # 绑定 SqliteSaver 必须要经过 conn 接管 # 初始化内存持久化类告知 LangGraph 把状态保存到 conn 连接指向的数据库中 memory SqliteSaver(connconn) # 构建图结构 builder StateGraph(MyState) builder.add_node(node_1, node_1) builder.add_edge(START, node_1) builder.add_edge(node_1, END) # 编译图并引入真正的数据库持久化能力 checkpointer graph builder.compile(checkpointermemory) # 配置线程标识符 config {configurable: {thread_id: sqlite_thread_001}} # 1. 查看初始状态第一次理应没有任何消息 initial_state graph.get_state(config) print(f首次读取初始状态: {initial_state.values}) # 2. 调用执行图状态都将被持久化写入 sqlite 文件 print(开始调用图执行 node_1...) result graph.invoke({messages: []}, config) print(f执行后的返回结果: {result}) # 3. 再次调用 get_state 可看到持久化后的状态 final_state graph.get_state(config) print(f持久化保存后的最终状态: {final_state.values}) # 4. 关键验证点关闭当前程序使用的数据库连接然后重启一个新的连接来模拟“程序重启” # 在实际生产环境程序退出了再次运行此脚本。我们要确保依然能通过 thread_id 恢复历史对话 conn.close() print(\n--- 模拟程序重启重新连接数据库 ---) # 重新建立连接并读取同一个 thread_id 的状态 new_conn sqlite3.connect(db_path, check_same_threadFalse) new_memory SqliteSaver(connnew_conn) new_graph builder.compile(checkpointernew_memory) recovered_state new_graph.get_state(config) print(f程序重启后通过同一个 thread_id {config[configurable][thread_id]} 恢复的状态: {recovered_state.values}) new_conn.close() if __name__ __main__: main()运行脚本前请务必确保 sqlite_data 目录存在或者让代码自动创建它。使用 SQLite 进行持久化最大的优势是可以在没有任何外部基础设施如 Docker、云数据库的情况下模拟生产级别的状态持久化这对于本地开发和功能验证非常有帮助。4.2 PostgreSQL 生产环境方案PostgreSQL 是企业级应用中最受欢迎的开源数据库之一具备完整的 ACID 特性、强大的并发控制能力和丰富的数据类型支持。当你将 LangGraph 工作流部署到云服务器或 Kubernetes 集群时PostgreSQL 应该是首选的生产级持久化方案。pip install langgraph-checkpoint-postgres psycopg[binary]# 同步版本使用示例 from langgraph.checkpoint.postgres import PostgresSaver from psycopg.rows import dict_row import psycopg # 确保连接保留了 autocommitTrue 以便事务管理 conn psycopg.connect( postgresql://user:passwordlocalhost:5432/langgraph_db, autocommitTrue, row_factorydict_row ) # 创建数据库表结构首次运行时执行 checkpointer PostgresSaver(conn) checkpointer.setup() # 这一步会在数据库中创建 LangGraph 所需的元数据表 graph builder.compile(checkpointercheckpointer)PostgreSQL 检查点与 SQLite 在 API 层面几乎一致但生产部署时建议使用异步版本 AsyncPostgresSaver 以提升并发处理能力。PostgreSQL 的实现将通道值分为两个部分存储原始类型字符串、整数等保存在主表复杂对象如大型 JSON存储在独立的 blob 表中这种分离设计显著提升了查询和写入效率。4.3 云数据库方案概览在微服务和大型分布式架构中云原生数据库提供了自动弹性扩展、托管运维和全球部署等显著优势DynamoDBAWSAWS 为 LangGraph 官方维护了 langgraph-checkpoint-amazon-dynamodb 包采用单表设计模式支持自动 TTL 过期清理和按需计费是无服务器架构的理想选择。MongoDBMongoDB 为 LangGraph 提供了 langgraph-store-mongodb 包原生支持 JSON 文档存储和向量检索尤其适合需要结合语义检索构建长期记忆的场景。Aerospike提供基于 Aerospike 数据库的高性能检查点方案适合对延迟极度敏感的超大规模部署。自定义方案所有方案均通过实现 BaseCheckpointSaver 抽象类即可接入。选择哪种数据库方案取决于你的具体需求——数据量、并发请求规模、是否需要跨区域部署、现有技术栈等。5. 高级特性状态查询与“时间旅行”LangGraph 的持久化层不仅用于容错恢复还提供了强大的调试和审计能力。通过以下两个关键方法可以深入了解图的执行情况graph.get_state(config)获取该 thread_id 的当前最新状态包括状态值和下一个将要执行的节点。graph.get_state_history(config)获取该 thread_id 完整的历史检查点列表按时间从新到旧排列。5.1 查看历史状态的完整代码示例# 在之前具备 SqliteSaver 或者 InMemorySaver 的基础上继续执行以下代码 # 假设 graph 已经被编译并绑定了 checkpointerconfig 中包含了 thread_id print(查看线程历史执行轨迹) # 设置 limit 参数可以限制返回历史检查点的最大数量避免一次性加载过多 history_list list(graph.get_state_history(config, limit10)) for idx, snapshot in enumerate(history_list): print(f--- 历史条目 {idx 1} ---) print(f快照值: {snapshot.values}) print(f下一个应执行的节点: {snapshot.next}) print(f检查点元数据: {snapshot.metadata}) print(f检查点 ID 配置: {snapshot.config})传统调试中如果 Agent 出错通常是盲目的猜测发生了什么。借助 get_state_history可以“回溯时间”观察每一个超步后的内部状态变化。这对于审查 AI 大模型中间的推理链条、验证路由逻辑或者排查变量传递错误都非常高效。5.2 “时间旅行”回退至任意历史状态“时间旅行”指的不只是一种科幻概念——你可以把它想象成 Git 的版本回退。通过指定一个历史 checkpoint_id让工作流从那个快照位置重新运行用来探索“假如我当时选择走另一条路径会发生什么”或者修复运行中的错误状态。# 获取历史列表 hist list(graph.get_state_history(config)) if len(hist) 1: older_checkpoint_config hist[1].config # 取上一步的 checkpoints 配置 print(f回退到历史检查点: {older_checkpoint_config}) # 从这个历史点恢复执行并且可以携带新的输入 recovered_result graph.invoke(None, older_checkpoint_config) print(f从历史点恢复后的结果: {recovered_result})6. 人机协同与长时间任务实践持久化执行最具代表性的应用场景之一就是“人机协同”Human-in-the-loop。想象一下典型复杂审批流程当 LLM 需要执行一个敏感操作如付款、发布内容、删除数据时系统可以“暂停”执行向人类发送一个确认审批的消息等待用户的反馈。而这一整个过程中所有前置计算的状态并不会丢失——这完全依赖于 LangGraph 的持久化能力。在这种模式中持久化执行允许我们在任意节点设置“中断点”。终端用户可以隔几分钟、几小时甚至几天返回系统通过 API 传入最终决定工作流会从精确的中断点继续执行。# 伪代码示例在人机协同中使用中断与恢复 from langgraph.checkpoint import interrupt def approval_node(state): approval_status interrupt( prompt请批准此项操作输入 approve 或 reject, # 用户返回的答案会被 LangGraph 存储为状态 ) if approval_status approve: return {approved: True} else: return {approved: False}用户可以在任意时刻通过继续 API 发送决定由于 checkpointer 持久化了所有这些状态系统能确保只执行一次敏感操作且能精准恢复至决策后的后续逻辑。6.1 长时间运行任务的挑战与对策在 AI Agent 实际生产环境中工作流可能因为 LLM API 超时、网络抖动、数据库连接断裂而导致部分失败。传统的简单 try-except 配合手动重试往往力不从心。但当你的图绑定了可靠的数据库检查点比如 PostgresSaver后事情就变得简单多了import time from tenacity import retry, stop_after_attempt retry(stopstop_after_attempt(3)) def risky_api_call(): # 此函数调用可能超时的第三方 LLM pass def my_node(state): try: result risky_api_call() return {result: result} except Exception as e: # 出现异常时LangGraph 框架已经自动保存了该节点前的检查点 # 因此抛出异常促使图停止随后通过 app.invoke(None, config) 重试即可恢复 raise e当整个工作流因外部服务中断而失败时运维人员或调度系统只需用同一个 thread_id 再次调用 app.invoke(None, config)之前成功的节点不会重做只会从中断点继续——这正是持久化执行在生产环境中的强大之处。7. 总结核心概念清晰LangGraph 的持久化通过检查点机制实现每个检查点是图状态在某“超级步骤”的完整快照。每个线程thread_id对应一个独立的执行会话。三个不同的层次InMemorySaver 侧重开发调试SqliteSaver 适合本地轻量级部署PostgresSaver 和其他云数据库方案适用于大并发、关键任务的生产环境。持久化执行确保业务连续使用 async 或 sync 模式可以确保工作流在故障中断后无缝恢复。对于高风险场景推荐 sync 实现最大化的容错能力。高效调试与监控通过 get_state 和 get_state_history 能够完整追踪 AI Agent 的内部运行轨迹甚至通过“时间旅行”回退至任意历史快照排查问题。适用场景广这些技术为人机协同审批、财务数据处理、复杂多智能体系统、容错服务器调度等场景提供了坚实的技术基石。对于刚接触 LangGraph 持久化的开发者建议首先从 InMemorySaver 和 SQLite 方案入手理解基础机制。在生产环境中则应当优先考虑 PostgreSQL 或云托管数据库方案以保障状态的可靠性和可扩展性。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2559554.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!