深入解析 LangGraph Checkpoint
一、LangGraph Checkpoint 的核心设计目标LangGraph Checkpoint 解决的并不是简单的“存储状态”问题而是复杂工作流系统中的可恢复执行问题。从架构角度看它承担了四个关键职责1️⃣持久化状态管理保存 Graph 的完整状态使系统可以跨请求恢复执行。2️⃣工作流恢复能力Fault Tolerance当任务执行失败或系统重启时可以从最近状态继续执行。3️⃣时间旅行调试Time Travel Debugging开发者可以回滚到历史状态重新执行。4️⃣Human-in-the-loop 支持允许工作流暂停等待人工输入然后恢复执行。这四个能力构成了 LangGraph Agent 能够进入生产环境的重要基础。二、Checkpoint 在 LangGraph 架构中的位置LangGraph 的运行时架构可以抽象为四层Application Layer │ ▼ Graph Runtime │ ▼ Checkpoint System │ ▼ Storage Backend其中Graph Runtime负责节点执行、调度、并行控制。Checkpoint System负责保存和恢复 Graph State。Storage Backend负责物理存储数据库 / Redis / SQLite 等。Checkpoint 系统是连接运行时与存储层的关键组件。三、Pregel 执行模型与 Super-step理解 Checkpoint 的关键是理解 LangGraph 的执行模型。LangGraph 的执行模型借鉴了Pregel 分布式图计算模型。Pregel 模型核心思想图计算被分为一系列Super-step超级步骤superstep_1 superstep_2 superstep_3 ...每个 super-step 包含节点执行消息传递状态更新在 LangGraph 中Graph State │ ▼ Node Execution │ ▼ Channel Updates │ ▼ Next Node SelectionCheckpoint 就是在 super-step 边界生成的。Super-step 与 Checkpoint 的关系假设一个简单 GraphSTART → A → B → C → END执行过程superstep0 initial state superstep1 node A superstep2 node B superstep3 node CCheckpoint 会自动生成checkpoint0 checkpoint1 checkpoint2 checkpoint3每个 checkpoint 都保存Graph state snapshot因此Checkpoint Super-step 边界的状态快照四、Checkpoint 的核心数据结构LangGraph 使用两个核心对象描述状态快照Checkpoint StateSnapshot4.1 StateSnapshotStateSnapshot 是真正的状态载体。其核心字段包括StateSnapshot ├─ values ├─ next ├─ tasks ├─ metadata ├─ config ├─ created_at └─ parent_config下面逐一解释。valuesvalues: Dict[str, Any]保存Graph channel 的当前值。例如{ messages: [...], tools_result: ..., analysis: ... }这是 Graph 的核心业务数据。nextnext: tuple[str]表示下一步要执行的节点例如(tool_executor,)如果为空()说明Graph execution finishedtaskstasks: List[Task]保存待执行任务信息。可能包含未完成任务中断信息错误信息例如interrupt waiting for human inputmetadatametadata 包含执行信息step source parents例如{ step: 4, source: loop }用于调试tracingobservabilityparent_configCheckpoint 形成一个链式结构checkpoint1 │ ▼ checkpoint2 │ ▼ checkpoint3通过parent_config建立父子关系。这使得系统可以time travel五、ThreadLangGraph 的会话隔离机制Checkpoint 体系的核心概念之一是Thread。Thread 是checkpoint sequence结构如下thread_id ├─ checkpoint1 ├─ checkpoint2 ├─ checkpoint3也就是说Thread 一个工作流会话典型映射场景thread_id聊天user_session_idAgenttask_idWorkflowrun_id因此 thread_id 是LangGraph 中最重要的配置参数之一。示例config{configurable:{thread_id:user_123}}六、Checkpointer持久化接口Checkpoint 的存储由Checkpointer完成。核心抽象类BaseCheckpointSaver定义了统一接口。主要方法包括put() putWrites() getTuple() list()put()put(checkpoint)保存 checkpoint。getTuple()getTuple(thread_id)读取 checkpoint。list()list(thread_id)列出历史 checkpoint。putWrites()LangGraph 的一个高级特性。用于保存pending writes七、Pending Writes 与部分恢复LangGraph 的 checkpoint 系统支持部分恢复Partial Recovery。假设superstep ├─ NodeA success └─ NodeB fail系统会保存NodeA result恢复时NodeA 不重新执行 NodeB 重新执行这样可以避免重复计算这是 LangGraph 相比传统 workflow 引擎的重要优势。八、序列化机制SerdeCheckpoint 数据需要在内存和数据库之间传输。LangGraph 使用SerializerProtocol。默认实现JsonPlusSerializer特点支持 datetime支持 tuple支持 Python object因此 Graph state 不需要强制 JSON 化。九、存储后端架构LangGraph 的 checkpoint 系统支持多个存储后端。常见实现BackendPackageSourceIn-memorylanggraph-checkpointlangchain-ai/langgraphSQLitelanggraph-checkpoint-sqlitelangchain-ai/langgraphPostgreSQLlanggraph-checkpoint-postgreslangchain-ai/langgraphAWS (DynamoDB, Bedrock, Valkey)langgraph-checkpoint-awslangchain-ai/langchain-awsMongoDBlanggraph-checkpoint-mongodblangchain-ai/langchain-mongodbRedislanggraph-checkpoint-redisredis-developer/langgraph-redisCockroach DBlangchain-cockroachdbcockroachdb/langchain-cockroachdb生产环境推荐PostgreSQL原因事务支持稳定性并发能力十、Checkpoint 支持的高级能力Checkpoint 不只是持久化。它直接支持四个高级能力。1 会话记忆Graph state 会跨请求保存。例如聊天 AgentUser → Agent → Tool → Agent所有消息会自动持久化。2 Human-in-the-loopLangGraph 支持interrupt() resume()流程Graph pause human input Graph resumeCheckpoint 保存暂停状态。3 Fault Tolerance系统崩溃时load latest checkpoint resume execution避免任务从头开始。4 Time Travel开发者可以rollback checkpoint重新执行 Graph。适用于debuggingexperiment十一、Checkpoint 生命周期完整生命周期Graph invoke │ ▼ create superstep │ ▼ generate checkpoint │ ▼ serialize snapshot │ ▼ persist storage │ ▼ load on next invocation十二、工程最佳实践1 生产环境使用数据库推荐PostgresSaver2 thread_id 必须稳定不要使用random uuid否则无法恢复会话。3 控制状态大小不要在 state 中存大文件embedding 向量建议store reference4 定期清理 checkpoint长期系统需要TTL archiving十三、总结LangGraph 的 Checkpoint 系统本质上是一套基于 Pregel 执行模型的状态版本化系统。其核心架构Pregel Runtime │ ▼ Super-step │ ▼ StateSnapshot │ ▼ Checkpoint │ ▼ Thread │ ▼ Checkpointer │ ▼ Storage Backend通过这一体系LangGraph 实现了持久化状态管理可恢复工作流执行Human-in-the-loopTime travel debugging这也是为什么 LangGraph 能够构建真正生产级 Agent 系统的关键原因。from dotenv import load_dotenv from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import interrupt, Command load_dotenv() ############################################################ # 1 定义 Graph State ############################################################ class AgentState(TypedDict): user_input: str analysis: str decision: str result: str ############################################################ # 2 定义节点 ############################################################ def analyze_node(state: AgentState): print(\n--- analyze_node ---) analysis fanalysis of {state[user_input]} return { analysis: analysis } def human_review_node(state: AgentState): print(\n--- human_review_node ---) # interrupt 会创建 checkpoint 并暂停 feedback interrupt( { question: Human approval required, analysis: state[analysis] } ) return { decision: feedback[decision] } def execute_node(state: AgentState): print(\n--- execute_node ---) if state[decision] approve: result fEXECUTED: {state[analysis]} else: result REJECTED BY HUMAN return { result: result } ############################################################ # 3 构建 Graph ############################################################ builder StateGraph(AgentState) builder.add_node(analyze, analyze_node) builder.add_node(review, human_review_node) builder.add_node(execute, execute_node) builder.add_edge(START, analyze) builder.add_edge(analyze, review) builder.add_edge(review, execute) builder.add_edge(execute, END) ############################################################ # 4 创建 Checkpointer ############################################################ checkpointer InMemorySaver() app builder.compile(checkpointercheckpointer) ############################################################ # 5 配置 thread_id ############################################################ config { configurable: { thread_id: demo-session } } ############################################################ # 6 第一次执行会在 review 节点 interrupt ############################################################ print(\n RUN GRAPH ) result app.invoke( { user_input: analyze this data }, configconfig ) print(\nExecution paused.) print(Current state:) state app.get_state(config) print(state.values) ############################################################ # 7 查看 checkpoint 历史 ############################################################ print(\n CHECKPOINT HISTORY ) history list(app.get_state_history(config)) for i, snapshot in enumerate(history): print(f\nCheckpoint {i}) print(step:, snapshot.metadata.get(step)) print(next:, snapshot.next) print(values:, snapshot.values) ############################################################ # 8 Human Resume继续执行 ############################################################ print(\n RESUME GRAPH ) result app.invoke( Command(resume{decision: approve}), configconfig ) print(\nFinal result:) print(result) ############################################################ # 9 Time Travel 示例回滚到旧 checkpoint ############################################################ print(\n TIME TRAVEL ) # 获取历史 checkpoint history list(app.get_state_history(config)) # 选择 checkpoint 2初始状态完全重新执行 checkpoint_id history[2].config[configurable][checkpoint_id] rollback_config { configurable: { thread_id: demo-session, checkpoint_id: checkpoint_id } } state app.get_state(rollback_config) print(\nRollback state:) print(state.values) print(\nRe-run from this checkpoint (without resume - will hit interrupt again)) # Time travel: 回滚后不带 resume 重新执行会重新触发 interrupt result app.invoke( None, configrollback_config ) print(\nNew result after time travel:) print(result)
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2425943.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!