Spring AI Alibaba 1.x 系列【55】Interrupts 中断机制:静态中断源码分析
文章目录1. interruptBefore 模式1.1 中断判断逻辑1.2 构建中断元数据1.3 返回中断响应1.4 初始化【中断执行】上下文1.5 合并状态BUG1.6 执行结束2. interruptsAfter 模式2.1 设置 INTERRUPT_AFTER 标记2.2 动态计算下一个节点3. 中断时机对比1. interruptBefore 模式核心特性执行前中断节点业务逻辑执行前触发静态配置节点ID配置在interruptsBefore列表无动态路由恢复后直接执行原节点1.1 中断判断逻辑MainGraphExecutor#execute关于中断处理的核心逻辑中断判断调用上下文方法校验是否满足流程中断条件元数据构建记录当前节点ID 克隆的状态数据防止原状态被篡改响应返回返回流程中断完成的响应携带中断信息// 3. 判断是否需要中断 → 触发中断返回中断元数据if(context.shouldInterrupt()){InterruptionMetadatametadataInterruptionMetadata.builder(context.getCurrentNodeId(),context.cloneState(context.getCurrentStateData())).build();returnFlux.just(GraphResponse.done(metadata));}GraphRunnerContext#shouldInterrupt()是中断判断的入口只要执行前中断或执行后中断满足一个就会返回true:/** * 判断当前流程是否需要中断 * return true-需要中断false-不需要中断 */publicbooleanshouldInterrupt(){// 满足【节点执行前中断】 或 【节点执行后中断】 任一条件即触发中断returnshouldInterruptBefore(nextNodeId,currentNodeId)||shouldInterruptAfter(currentNodeId,nextNodeId);}执行前中断shouldInterruptBefore()判断/** * 判断是否需要在【目标节点执行前】进行中断 * param nodeId 即将执行的下一个节点ID * param previousNodeId 当前执行完成的节点ID * return true-执行前中断false-不中断 */privatebooleanshouldInterruptBefore(StringnodeId,StringpreviousNodeId){// 无上一个节点流程初始状态不执行前置中断if(previousNodeIdnull)returnfalse;// 判断节点ID是否配置在【执行前中断点】集合中returncompiledGraph.compileConfig.interruptsBefore().contains(nodeId);}执行后中断shouldInterruptAfter()判断/** * 判断是否需要在【当前节点执行后】进行中断 * param nodeId 即将执行的下一个节点ID * param previousNodeId 当前执行完成的节点ID * return true-执行后中断false-不中断 */privatebooleanshouldInterruptAfter(StringnodeId,StringpreviousNodeId){// 无下一个节点 或 下一个节点与当前节点是同一个不执行后置中断if(nodeIdnull||Objects.equals(nodeId,previousNodeId))returnfalse;// 满足任一条件则后置中断// 1. 开启了条件边执行前中断 节点标记为固定后置中断节点// 2. 节点ID配置在【执行后中断点】集合中return(compiledGraph.compileConfig.interruptBeforeEdge()Objects.equals(nodeId,INTERRUPT_AFTER))||compiledGraph.compileConfig.interruptsAfter().contains(nodeId);}1.2 构建中断元数据需要中断时会构建InterruptionMetadata// 创建中断元数据包含当前节点ID和状态InterruptionMetadatametadataInterruptionMetadata.builder(context.getCurrentNodeId(),context.cloneState(context.getCurrentStateData())).build();构建参数currentNodeId当前执行中断的节点名称currentStateData调用overallState.data()获取的状态data数据build()构建逻辑// 1. 私有构造方法只能通过内部 Builder 类创建对象privateInterruptionMetadata(Builderbuilder){// 2. 调用父类构造方法传入 builder 中的 nodeId 和 statesuper(builder.nodeId,builder.state);// 3. 给当前类的 metadata 字段赋值直接引用 builder 中的值this.metadatabuilder.metadata();// 4. 给 toolFeedbacks 赋值创建新 ArrayList拷贝 builder 中的集合this.toolFeedbacksnewArrayList(builder.toolFeedbacks);// 5. 安全赋值 toolsAutomaticallyApproved空值防护if(builder.toolsAutomaticallyApproved!null){this.toolsAutomaticallyApprovedbuilder.toolsAutomaticallyApproved;}else{// 6. 如果 builder 中为 null赋值为空集合避免后续 NPEthis.toolsAutomaticallyApprovednewArrayList();}}构建完成后的对象1.3 返回中断响应MainGraphExecutor#execute将中断数据包装为GraphResponse返回returnFlux.just(GraphResponse.done(metadata));调用方需要判断输出类型为InterruptionMetadata时说明流程中断了需要向用户显示可处理的操作用户执行操作后进入到流程恢复阶段。1.4 初始化【中断执行】上下文用户操作后进入到流程恢复阶段首先进入到GraphRunner#run方法初始化【执行】上下文再调用执行器执行和上篇动态中断一致。1.5 合并状态BUG和上篇动态中断一致。1.6 执行结束合并状态之后说明这个暂停节点已经被正式恢复了按照正常流程继续执行直到结束。2. interruptsAfter 模式核心特性执行后中断节点业务逻辑执行完成后触发动态路由开启interruptBeforeEdge恢复时重新计算下一个节点interruptsAfter和interruptBefore的处理流程一致主要区别是interruptsAfter支持配置interruptBeforeEdge参数支持在恢复时动态计算下一个节点所以下面只介绍下不一样的地方。2.1 设置 INTERRUPT_AFTER 标记在NodeExecutor节点执行完成处理时当前节点是否配置了【执行后中断】并开启了interruptBeforeEdge配置 会将下一个节点设置为固定的INTERRUPT_AFTER// 判断是否开启【边中断】机制 当前节点是否配置了【执行后中断】if(context.getCompiledGraph().compileConfig.interruptBeforeEdge()context.getCompiledGraph().compileConfig.interruptsAfter().contains(context.getCurrentNodeId())){// // 场景节点执行后走【中断】不自动走向下一个节点// 下一个节点设置为固定的 INTERRUPT_AFTER// context.setNextNodeId(INTERRUPT_AFTER);}else{// // 正常场景根据当前状态 节点路由规则计算下一个真实节点// CommandnextCommandcontext.nextNodeId(context.getCurrentNodeId(),context.getCurrentStateData());context.setNextNodeId(nextCommand.gotoNode());}2.2 动态计算下一个节点创建好上下文后调用MainGraphExecutor执行中进入到处理中断恢复处理逻辑如果配置了interruptBeforeEdge true且下一个节点是INTERRUPT_AFTER说明要重新计算下一个节点// 从哪里中断的由 创建上下文时设置finalvarresumeFromcontext.getResumeFromAndReset();if(resumeFrom.isPresent()){// 开启延迟路由 当前是中断标记 → 重新计算下一个节点if(context.getCompiledGraph().compileConfig.interruptBeforeEdge()java.util.Objects.equals(context.getNextNodeId(),INTERRUPT_AFTER)){varnextNodecontext.nextNodeId(resumeFrom.get(),context.getCurrentStateData());context.setNextNodeId(nextNode.gotoNode());context.setCurrentNodeId(null);}}3. 中断时机对比中断类型触发位置状态已更新nextNodeId 已计算checkpoint 已创建适用场景interruptsBeforeMainGraphExecutor❌❌❌节点执行前审批interruptsAfterMainGraphExecutor✅✅✅节点执行后审批interruptBeforeEdgeNodeExecutor✅❌ (INTERRUPT_AFTER)✅分支决策前审批InterruptableAction.interrupt()NodeExecutor❌❌❌自定义执行前中断InterruptableAction.interruptAfter()NodeExecutor✅ (先合并)✅✅自定义执行后中断
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2633546.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!