[拆解LangChain执行引擎]一个实例理解LangChain的几种流模式
invoke/ainvoke方法看起来是采用简单的请求/回复消息交换模式客户端需等待整个流程执行完毕后才能得到结果其实方法背后还是会调用stream/astream方法以流的方式进行交互。如果我们直接调用调用这两个方法并采用相应的流模式我们就能有效解决客户端长时间无响应的问题实时地得到对方的反馈。class Pregel( PregelProtocol[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]): def stream( self, input: InputT | Command | None, config: RunnableConfig | None None, *, context: ContextT | None None, stream_mode: StreamMode | Sequence[StreamMode] | None None, print_mode: StreamMode | Sequence[StreamMode] (), output_keys: str | Sequence[str] | None None, interrupt_before: All | Sequence[str] | None None, interrupt_after: All | Sequence[str] | None None, durability: Durability | None None, subgraphs: bool False, debug: bool | None None, **kwargs: Unpack[DeprecatedKwargs], ) - Iterator[dict[str, Any] | Any] async def astream( self, input: InputT | Command | None, config: RunnableConfig | None None, *, context: ContextT | None None, stream_mode: StreamMode | Sequence[StreamMode] | None None, print_mode: StreamMode | Sequence[StreamMode] (), output_keys: str | Sequence[str] | None None, interrupt_before: All | Sequence[str] | None None, interrupt_after: All | Sequence[str] | None None, durability: Durability | None None, subgraphs: bool False, debug: bool | None None, **kwargs: Unpack[DeprecatedKwargs], ) - AsyncIterator[dict[str, Any] | Any]在stream/astream方法的众多参数中表示流模式的stream_mode参数最为重要其对应类型StreamMode以字符串字面量的形式定义了七个选项。流是Pregel引擎向调用者提供数据的基本工作方法它采用订阅发布的形式。Pregel对象发布的内容由对它订阅决定因为发布客户端不敢兴趣的内容不但毫无意义而且还会对影响造成极大的影响。StreamMode Literal[ values, updates, checkpoints, tasks, debug, messages, custom ]在调用stream/astream方法时我们可以根据需要指定一个或者多个流模式以StreamMode序列的形式。如果没有显式设置NonePregel对象自身的stream_mode字段会作为兜底该字典的默认值为“values。如果当前Pregel对象以子图的形式被调用会默认使用values模式subgraphs参数用于控制是否希望得到子图的输出。下面列出了七种流模式对应的输出内容values在每个Superstep结束后输出全部Channel的值updates针对每个Node输出由它更新的Channel值checkpoints在创建新的Checkpoint的时候输出与get_state方法返回值具有相同结构的内容tasks在Node任务开始和结束的时候输出任务ID、Node名称和其他相关信息debug可以简单认为是tasks checkpointsmessages输出语言模型产生的Token和相关元数据开发者在Node处理函数中利用StreamWriter自行输出的内容如果混合使用多种流模式stream/astream方法会返回一个字典自带的Key表示当前输出采用的流模式。对于单一模式的调用会直接返回输出的内容。如果采用custom模式Node处理方法可以利用SteamWriter向客户端实时输出自定义的内容。StreamWriter和静态上下文一样都属于当前Runtime的一部分后者可以利用注入Node处理函数的RunnableConfig提取。下面演示程序会使用所有的流模式我们在每个Node的处理函数中利用StreamWriter输出当前的Node名称。from langgraph.checkpoint.memory import InMemorySaver from langgraph.pregel import Pregel, NodeBuilder from langgraph.channels import LastValue, BinaryOperatorAggregate import operator from functools import partial from langchain_core.runnables import RunnableConfig from typing import Any,Sequence from langgraph.runtime import Runtime from langgraph.types import StreamWriter,StreamMode from collections import defaultdict def handle(node: str, inputs: dict[str, Any], config: RunnableConfig) - list[str]: runtime:Runtime config[configurable].get(__pregel_runtime) writer:StreamWriter runtime.stream_writer writer(fnode {node} is called.) return [node] foo (NodeBuilder() .subscribe_to(foo,read False) .do(partial(handle, foo)) .write_to(bartriggered by foo) ) bar1 (NodeBuilder() .subscribe_to(bar,read False) .do(partial(handle, bar1)) .write_to(output) ) bar2 (NodeBuilder() .subscribe_to(bar,read False) .do(partial(handle, bar2)) .write_to(output)) app Pregel( nodes{foo: foo, bar1: bar1, bar2: bar2}, channels{ foo: LastValue(str), bar: LastValue(str), output: BinaryOperatorAggregate(list, operator.add), }, input_channels[foo], output_channels[output], checkpointerInMemorySaver(), ) config{configurable: {thread_id: 123}} stream_mode: Sequence[StreamMode] [values, updates,checkpoints,tasks,debug,custom] result: defaultdict[str, list[str]] defaultdict(list) for (mode,chunk) in app.stream(input{foo: None}, stream_mode stream_mode, configconfig): result[mode].append(chunk) for mode,chunks in result.items(): index 1 for chunk in chunks: print(f{index}.[{mode}] {chunk}) index 1 print()创建的Pregel由节点foo、bar1和bar2构成。节点foo率先执行bar1和bar2随后并行执行。我们将调用stream方法收集到的内容根据流模式分组进行输出。我们来分析一下如下的输出结果由于三个Node都会涉及到针对Channel的更新所以会有三个updates模式的输出。整个流程涉及三个Supperstep两个values模式的输出的全量的状态对应于后两个Superstep。三个Node对应三个任务所以具有六个tasks模式的输出反映这三个任务的开始和结束。经历的三个Superstep对应三次Checkpoint的创建所以我们能看到三个checkpoints模式的输出。六个tasks加三个checkpoints所以有九个debug模式的输出。三个Node中针对StreamWriter的调用对应三个custom模式的输出。1.[checkpoints] {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, parent_config: None, values: {foo: None, output: []}, metadata: {source: input, step: -1, parents: {}}, next: [foo], tasks: [{id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, interrupts: (), state: None}]} 2.[checkpoints] {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, values: {foo: None, bar: triggered by foo, output: []}, metadata: {source: loop, step: 0, parents: {}}, next: [bar1, bar2], tasks: [{id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, interrupts: (), state: None}, {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, interrupts: (), state: None}]} 3.[checkpoints] {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9f3-6cbf-8001-e4219a2f4b58}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, values: {foo: None, bar: triggered by foo, output: [bar1, bar2]}, metadata: {source: loop, step: 1, parents: {}}, next: [], tasks: []} 1.[debug] {step: -1, timestamp: 2026-01-27T00:08:26.86591800:00, type: checkpoint, payload: {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, parent_config: None, values: {foo: None, output: []}, metadata: {source: input, step: -1, parents: {}}, next: [foo], tasks: [{id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, interrupts: (), state: None}]}} 2.[debug] {step: 0, timestamp: 2026-01-27T00:08:26.86593300:00, type: task, payload: {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, input: {}, triggers: (foo,)}} 3.[debug] {step: 0, timestamp: 2026-01-27T00:08:26.86651600:00, type: task_result, payload: {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, error: None, result: {bar: triggered by foo}, interrupts: []}} 4.[debug] {step: 0, timestamp: 2026-01-27T00:08:26.86732600:00, type: checkpoint, payload: {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, values: {foo: None, bar: triggered by foo, output: []}, metadata: {source: loop, step: 0, parents: {}}, next: [bar1, bar2], tasks: [{id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, interrupts: (), state: None}, {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, interrupts: (), state: None}]}} 5.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86733800:00, type: task, payload: {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, input: {}, triggers: (bar,)}} 6.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86734300:00, type: task, payload: {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, input: {}, triggers: (bar,)}} 7.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86804300:00, type: task_result, payload: {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, error: None, result: {output: [bar2]}, interrupts: []}} 8.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86811700:00, type: task_result, payload: {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, error: None, result: {output: [bar1]}, interrupts: []}} 9.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86853300:00, type: checkpoint, payload: {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9f3-6cbf-8001-e4219a2f4b58}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, values: {foo: None, bar: triggered by foo, output: [bar1, bar2]}, metadata: {source: loop, step: 1, parents: {}}, next: [], tasks: []}} 1.[tasks] {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, input: {}, triggers: (foo,)} 2.[tasks] {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, error: None, result: {bar: triggered by foo}, interrupts: []} 3.[tasks] {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, input: {}, triggers: (bar,)} 4.[tasks] {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, input: {}, triggers: (bar,)} 5.[tasks] {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, error: None, result: {output: [bar2]}, interrupts: []} 6.[tasks] {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, error: None, result: {output: [bar1]}, interrupts: []} 1.[custom] node foo is called. 2.[custom] node bar2 is called. 3.[custom] node bar1 is called. 1.[updates] {foo: {bar: triggered by foo}} 2.[updates] {bar2: {output: [bar2]}} 3.[updates] {bar1: {output: [bar1]}} 1.[values] {foo: None, bar: triggered by foo, output: []} 2.[values] {foo: None, bar: triggered by foo, output: [bar1, bar2]}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2484955.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!