在“Agent状态是如何被写入通道的”中我们探讨了组成的Agent的节点在完成了自身操作后如何将执行结果通过写入相应的通道进而更新Agent的状态。本篇文章则讨论这个问题的另一面Agent的节点如何通过读取通道得到Agent的状态的。1. 利用ChannelRead读取状态通过“Agent状态是如何被写入通道的”我们知道作为Agent节点的PregelNode对象利用ChannelWrite提交它对通道的写入意图进而增量更新Agent的状态。ChannelRead与之相对它被PregelNode用于读取相应的通道进而得到所需的状态成员这个操作可以通过如下所示的静态方法do_read来完成。classChannelRead(RunnableCallable):staticmethoddefdo_read(config:RunnableConfig,*,select:str|list[str],fresh:boolFalse,mapper:Callable[[Any],Any]|NoneNone,)-Any静态方法do_read具有如下的参数config当前的运行时配置select 选择需要读取的通道如果没有指定将不会读取任何通道fresh如果设为 True它会强制获取当前状态下最新鲜的数据而不是可能被缓存的旧值。默认值为False以提高性能mapper 对读取的内容字典形式作进一步转换以生成最终返回的对象下面的程序演示了针对do_read方法的使用。我们创建了一个单节点的Pregel对象并为它定义了5个通道其中foo、bar、baz和select为输入通道前三个通道用于提供常规的输入select通道用于提供需要被节点读取的通道列表。节点订阅select通道并将该通道的值作为参数调用do_read方法作为运行时配置的config参数直接注入到节点函数。do_read方法返回的结果被写入result通道该通道也是Pregel的唯一输出通道。fromlanggraph.pregelimportPregel,NodeBuilderfromlanggraph.pregel._readimportChannelReadfromlanggraph.channelsimportLastValuefromtypingimportAnyfromlangchain_core.runnablesimportRunnableConfigdefread(channels:list[str],config:RunnableConfig)-Any:returnChannelRead.do_read(config,selectchannels)node(NodeBuilder().subscribe_only(select).write_to(result).do(read))appPregel(nodes{node:node},channels{foo:LastValue(str),bar:LastValue(str),baz:LastValue(str),select:LastValue(list),result:LastValue(Any)},input_channels[foo,bar,baz,select],output_channels[result],)input{foo:123,bar:456,baz:789}resultapp.invoke(input{**input,select:[foo]})assertresult[result]{foo:123}resultapp.invoke(input{**input,select:[foo,bar]})assertresult[result]{foo:123,bar:456}resultapp.invoke(input{**input,select:[foo,bar,baz]})assertresult[result]{foo:123,bar:456,baz:789}resultapp.invoke(input{**input,select:[]})assertresult[result]{}Pregel对象创建之后我们利用作为输入的字典为foo、bar和baz指定相同的值但设置不同的select列表。我们将输入字典作为参数对Pregel发起多次调用断言验证了返回值是否与select列表的一致性。2. ChannelRead静态方法do_read在ChannelRead中的定义非常简单针对选择通道的读取并未真正实现在该方法中一切都隐藏在它通过RunnableConfig中提取出来的这个可执行对象。classChannelRead(RunnableCallable):staticmethoddefdo_read(config:RunnableConfig,*,select:str|list[str],fresh:boolFalse,mapper:Callable[[Any],Any]|NoneNone,)-Any:try:read:READ_TYPEconfig[CONF][CONFIG_KEY_READ]exceptKeyError:raiseRuntimeError(Not configured with a read functionMake sure to call in the context of a Pregel process)ifmapper:returnmapper(read(select,fresh))else:returnread(select,fresh)RunnableConfig在LangChain执行引擎中被当成“数据总线”在用如果你对其底层实现稍有了解你会发现它将很多东西往里面扔。我不想评论这种设计的优劣但我个人是不太喜欢的。在“Agent状态是如何被写入通道的”中我们知道它在配置中为ChannelWrite的do_write方法存放了一个用于收集channel_name, channel_value二元组的函数对应的Key为“__pregel_send”通过常量CONFIG_KEY_SEND表示。这里也一样它在配置存放了一个同于读取通道的函数对应的Key为“__pregel_read”通过常量CONFIG_KEY_READ表示。通过给出的实现可以推断这个函数的签名为Callable[[list[str],bool],dict[str,Any]|Any]。3. local_read在“Agent状态是如何被写入通道的”提到过用于收集通道写入的函数是在创建节点对应执行任务的PregelExecutableTask对象时创建的携带此函数的RunnableConfig也被绑定在这个PregelExecutableTask对象上。这个用于通道读取的函数的创建和应用方式与之类似。这个被置于RunnableConfig中的Callable[[list[str],bool],dict[str,Any]|Any]是针对如下这个local_read函数常见的偏函数partial function。deflocal_read(scratchpad:PregelScratchpad,channels:Mapping[str,BaseChannel],managed:ManagedValueMapping,task:WritesProtocol,select:list[str]|str,fresh:boolFalse,)-dict[str,Any]|Any ManagedValueMappingdict[str,ManagedValueSpec]local_read函数具有如下的参数scratchpad在推进Pregel执行的每一步Superstep对会创建的PregelScratchpad,用于存放当前超步数以及服务于中断恢复执行和SubAgent执行的计数器。所有ManagedValue的值都是根据它计算出来的。“拆解LangChain执行引擎”系列中的很多文章都介绍过它channels所有通道与名称的映射managed所有ManagedValue的描述对象ManagedValueSpec与名称的映射task节点任务的协议主要偏重于描述节点产生的状态更新select选择读取的通道列表fresh如果设为 True它会强制获取当前状态下最新鲜的数据而不是可能被缓存的旧值。默认值为False以提高性能WritesProtocol协议定义如下上面提到过的用于表示节点执行任务的PregelExecutableTask就实现了这个协议。它提供的四个属性分别表示任务的路径SubAgent导致、名称、通道写入channel_name, channel_value二元组序列和订阅通道。classWritesProtocol(Protocol):propertydefpath(self)-tuple[str|int|tuple,...]:...propertydefname(self)-str:...propertydefwrites(self)-Sequence[tuple[str,Any]]:...propertydeftriggers(self)-Sequence[str]:...如下所示的是local_read函数的完整实现它利用select参数从channels和managed筛选出通道和ManagedValueSpec然后调用read_channels将选择的通道和ManagedValue的值读出来。如果fresh参数被设置成True它会对选择出来的通道先实施写操作来清除缓存。deflocal_read(scratchpad:PregelScratchpad,channels:Mapping[str,BaseChannel],managed:ManagedValueMapping,task:WritesProtocol,select:list[str]|str,fresh:boolFalse,)-dict[str,Any]|Any:updated:dict[str,list[Any]]defaultdict(list)ifisinstance(select,str):managed_keys[]forc,vintask.writes:ifcselect:updated[c].append(v)else:managed_keys[kforkinselectifkinmanaged]select[kforkinselectifknotinmanaged]forc,vintask.writes:ifcinselect:updated[c].append(v)iffresh:# apply writeslocal_channels:dict[str,BaseChannel]{}forkinchannels:ccchannels[k].copy()cc.update(updated[k])local_channels[k]cc# read fresh valuesvaluesread_channels(local_channels,select)else:valuesread_channels(channels,select)ifmanaged_keys:values.update({k:managed[k].get(scratchpad)forkinmanaged_keys})returnvalues在为节点创建任务的时候它会提取节点的名称、订阅通道和当前的PregelScratchpad、所有通道和描述所有ManagedValue的ManagedValueSpec与其名称的映射字典按照如下的方式调用创建local_read的偏函数。这个偏函数被置于绑定到节点任务的RunnableConfig配置中供ChannelRead的静态方法do_read使用。代码中的那个writes变量就是那个用于收集通道写入意图的双端队列deque。name...writes...triggers...scratchpad...channels...managed...partial(local_read,scratchpad,channels,managed,PregelTaskWrites(task_path[:3],name,writes,triggers,),),classPregelTaskWrites(NamedTuple):path:tuple[str|int|tuple,...]name:strwrites:Sequence[tuple[str,Any]]triggers:Sequence[str]顺便说说为什么叫 “local”在LangGraph的并行执行模型中当一个节点执行完毕并返回结果时这些结果属于“局部更新”尚未合并到全局状态中。4. read_channelslocal_read函数用于读取通道的read_channels函数定义如下。没什么特别之处最终就是调用BaseChannel的get方法并在此基础上做了一些必要的异常处理而已。defread_channels(channels:Mapping[str,BaseChannel],select:Sequence[str]|str,*,skip_empty:boolTrue,)-dict[str,Any]|Any:ifisinstance(select,str):returnread_channel(channels,select)else:values:dict[str,Any]{}forkinselect:try:values[k]read_channel(channels,k,catchnotskip_empty)exceptEmptyChannelError:passreturnvaluesdefread_channel(channels:Mapping[str,BaseChannel],chan:str,*,catch:boolTrue,)-Any:try:returnchannels[chan].get()exceptEmptyChannelError:ifcatch:returnNoneelse:raise5. 作为Runnable的ChannelRead对于ChannelRead我们仅仅介绍了它的静态方法do_read而它自身是一个继承自RunnableCallable的可执行对象从如下的定义可以看出最终用来读取通道的_read和_aread方法最终还是调用的静态方法do_read。classChannelRead(RunnableCallable):channel:str|list[str]fresh:boolFalsemapper:Callable[[Any],Any]|NoneNonedef__init__(self,channel:str|list[str],*,fresh:boolFalse,mapper:Callable[[Any],Any]|NoneNone,tags:list[str]|NoneNone,)-None:super().__init__(funcself._read,afuncself._aread,tagstags,nameNone,traceFalse,)self.freshfresh self.mappermapper self.channelchanneldefget_name(self,suffix:str|NoneNone,*,name:str|NoneNone)-str:ifname:passelifisinstance(self.channel,str):namefChannelRead{self.channel}else:namefChannelRead{,.join(self.channel)}returnsuper().get_name(suffix,namename)def_read(self,_:Any,config:RunnableConfig)-Any:returnself.do_read(config,selectself.channel,freshself.fresh,mapperself.mapper)asyncdef_aread(self,_:Any,config:RunnableConfig)-Any:returnself.do_read(config,selectself.channel,freshself.fresh,mapperself.mapper)
[拆解LangChain执行引擎]Agent状态是如何从通道读取的?
在“Agent状态是如何被写入通道的”中我们探讨了组成的Agent的节点在完成了自身操作后如何将执行结果通过写入相应的通道进而更新Agent的状态。本篇文章则讨论这个问题的另一面Agent的节点如何通过读取通道得到Agent的状态的。1. 利用ChannelRead读取状态通过“Agent状态是如何被写入通道的”我们知道作为Agent节点的PregelNode对象利用ChannelWrite提交它对通道的写入意图进而增量更新Agent的状态。ChannelRead与之相对它被PregelNode用于读取相应的通道进而得到所需的状态成员这个操作可以通过如下所示的静态方法do_read来完成。classChannelRead(RunnableCallable):staticmethoddefdo_read(config:RunnableConfig,*,select:str|list[str],fresh:boolFalse,mapper:Callable[[Any],Any]|NoneNone,)-Any静态方法do_read具有如下的参数config当前的运行时配置select 选择需要读取的通道如果没有指定将不会读取任何通道fresh如果设为 True它会强制获取当前状态下最新鲜的数据而不是可能被缓存的旧值。默认值为False以提高性能mapper 对读取的内容字典形式作进一步转换以生成最终返回的对象下面的程序演示了针对do_read方法的使用。我们创建了一个单节点的Pregel对象并为它定义了5个通道其中foo、bar、baz和select为输入通道前三个通道用于提供常规的输入select通道用于提供需要被节点读取的通道列表。节点订阅select通道并将该通道的值作为参数调用do_read方法作为运行时配置的config参数直接注入到节点函数。do_read方法返回的结果被写入result通道该通道也是Pregel的唯一输出通道。fromlanggraph.pregelimportPregel,NodeBuilderfromlanggraph.pregel._readimportChannelReadfromlanggraph.channelsimportLastValuefromtypingimportAnyfromlangchain_core.runnablesimportRunnableConfigdefread(channels:list[str],config:RunnableConfig)-Any:returnChannelRead.do_read(config,selectchannels)node(NodeBuilder().subscribe_only(select).write_to(result).do(read))appPregel(nodes{node:node},channels{foo:LastValue(str),bar:LastValue(str),baz:LastValue(str),select:LastValue(list),result:LastValue(Any)},input_channels[foo,bar,baz,select],output_channels[result],)input{foo:123,bar:456,baz:789}resultapp.invoke(input{**input,select:[foo]})assertresult[result]{foo:123}resultapp.invoke(input{**input,select:[foo,bar]})assertresult[result]{foo:123,bar:456}resultapp.invoke(input{**input,select:[foo,bar,baz]})assertresult[result]{foo:123,bar:456,baz:789}resultapp.invoke(input{**input,select:[]})assertresult[result]{}Pregel对象创建之后我们利用作为输入的字典为foo、bar和baz指定相同的值但设置不同的select列表。我们将输入字典作为参数对Pregel发起多次调用断言验证了返回值是否与select列表的一致性。2. ChannelRead静态方法do_read在ChannelRead中的定义非常简单针对选择通道的读取并未真正实现在该方法中一切都隐藏在它通过RunnableConfig中提取出来的这个可执行对象。classChannelRead(RunnableCallable):staticmethoddefdo_read(config:RunnableConfig,*,select:str|list[str],fresh:boolFalse,mapper:Callable[[Any],Any]|NoneNone,)-Any:try:read:READ_TYPEconfig[CONF][CONFIG_KEY_READ]exceptKeyError:raiseRuntimeError(Not configured with a read functionMake sure to call in the context of a Pregel process)ifmapper:returnmapper(read(select,fresh))else:returnread(select,fresh)RunnableConfig在LangChain执行引擎中被当成“数据总线”在用如果你对其底层实现稍有了解你会发现它将很多东西往里面扔。我不想评论这种设计的优劣但我个人是不太喜欢的。在“Agent状态是如何被写入通道的”中我们知道它在配置中为ChannelWrite的do_write方法存放了一个用于收集channel_name, channel_value二元组的函数对应的Key为“__pregel_send”通过常量CONFIG_KEY_SEND表示。这里也一样它在配置存放了一个同于读取通道的函数对应的Key为“__pregel_read”通过常量CONFIG_KEY_READ表示。通过给出的实现可以推断这个函数的签名为Callable[[list[str],bool],dict[str,Any]|Any]。3. local_read在“Agent状态是如何被写入通道的”提到过用于收集通道写入的函数是在创建节点对应执行任务的PregelExecutableTask对象时创建的携带此函数的RunnableConfig也被绑定在这个PregelExecutableTask对象上。这个用于通道读取的函数的创建和应用方式与之类似。这个被置于RunnableConfig中的Callable[[list[str],bool],dict[str,Any]|Any]是针对如下这个local_read函数常见的偏函数partial function。deflocal_read(scratchpad:PregelScratchpad,channels:Mapping[str,BaseChannel],managed:ManagedValueMapping,task:WritesProtocol,select:list[str]|str,fresh:boolFalse,)-dict[str,Any]|Any ManagedValueMappingdict[str,ManagedValueSpec]local_read函数具有如下的参数scratchpad在推进Pregel执行的每一步Superstep对会创建的PregelScratchpad,用于存放当前超步数以及服务于中断恢复执行和SubAgent执行的计数器。所有ManagedValue的值都是根据它计算出来的。“拆解LangChain执行引擎”系列中的很多文章都介绍过它channels所有通道与名称的映射managed所有ManagedValue的描述对象ManagedValueSpec与名称的映射task节点任务的协议主要偏重于描述节点产生的状态更新select选择读取的通道列表fresh如果设为 True它会强制获取当前状态下最新鲜的数据而不是可能被缓存的旧值。默认值为False以提高性能WritesProtocol协议定义如下上面提到过的用于表示节点执行任务的PregelExecutableTask就实现了这个协议。它提供的四个属性分别表示任务的路径SubAgent导致、名称、通道写入channel_name, channel_value二元组序列和订阅通道。classWritesProtocol(Protocol):propertydefpath(self)-tuple[str|int|tuple,...]:...propertydefname(self)-str:...propertydefwrites(self)-Sequence[tuple[str,Any]]:...propertydeftriggers(self)-Sequence[str]:...如下所示的是local_read函数的完整实现它利用select参数从channels和managed筛选出通道和ManagedValueSpec然后调用read_channels将选择的通道和ManagedValue的值读出来。如果fresh参数被设置成True它会对选择出来的通道先实施写操作来清除缓存。deflocal_read(scratchpad:PregelScratchpad,channels:Mapping[str,BaseChannel],managed:ManagedValueMapping,task:WritesProtocol,select:list[str]|str,fresh:boolFalse,)-dict[str,Any]|Any:updated:dict[str,list[Any]]defaultdict(list)ifisinstance(select,str):managed_keys[]forc,vintask.writes:ifcselect:updated[c].append(v)else:managed_keys[kforkinselectifkinmanaged]select[kforkinselectifknotinmanaged]forc,vintask.writes:ifcinselect:updated[c].append(v)iffresh:# apply writeslocal_channels:dict[str,BaseChannel]{}forkinchannels:ccchannels[k].copy()cc.update(updated[k])local_channels[k]cc# read fresh valuesvaluesread_channels(local_channels,select)else:valuesread_channels(channels,select)ifmanaged_keys:values.update({k:managed[k].get(scratchpad)forkinmanaged_keys})returnvalues在为节点创建任务的时候它会提取节点的名称、订阅通道和当前的PregelScratchpad、所有通道和描述所有ManagedValue的ManagedValueSpec与其名称的映射字典按照如下的方式调用创建local_read的偏函数。这个偏函数被置于绑定到节点任务的RunnableConfig配置中供ChannelRead的静态方法do_read使用。代码中的那个writes变量就是那个用于收集通道写入意图的双端队列deque。name...writes...triggers...scratchpad...channels...managed...partial(local_read,scratchpad,channels,managed,PregelTaskWrites(task_path[:3],name,writes,triggers,),),classPregelTaskWrites(NamedTuple):path:tuple[str|int|tuple,...]name:strwrites:Sequence[tuple[str,Any]]triggers:Sequence[str]顺便说说为什么叫 “local”在LangGraph的并行执行模型中当一个节点执行完毕并返回结果时这些结果属于“局部更新”尚未合并到全局状态中。4. read_channelslocal_read函数用于读取通道的read_channels函数定义如下。没什么特别之处最终就是调用BaseChannel的get方法并在此基础上做了一些必要的异常处理而已。defread_channels(channels:Mapping[str,BaseChannel],select:Sequence[str]|str,*,skip_empty:boolTrue,)-dict[str,Any]|Any:ifisinstance(select,str):returnread_channel(channels,select)else:values:dict[str,Any]{}forkinselect:try:values[k]read_channel(channels,k,catchnotskip_empty)exceptEmptyChannelError:passreturnvaluesdefread_channel(channels:Mapping[str,BaseChannel],chan:str,*,catch:boolTrue,)-Any:try:returnchannels[chan].get()exceptEmptyChannelError:ifcatch:returnNoneelse:raise5. 作为Runnable的ChannelRead对于ChannelRead我们仅仅介绍了它的静态方法do_read而它自身是一个继承自RunnableCallable的可执行对象从如下的定义可以看出最终用来读取通道的_read和_aread方法最终还是调用的静态方法do_read。classChannelRead(RunnableCallable):channel:str|list[str]fresh:boolFalsemapper:Callable[[Any],Any]|NoneNonedef__init__(self,channel:str|list[str],*,fresh:boolFalse,mapper:Callable[[Any],Any]|NoneNone,tags:list[str]|NoneNone,)-None:super().__init__(funcself._read,afuncself._aread,tagstags,nameNone,traceFalse,)self.freshfresh self.mappermapper self.channelchanneldefget_name(self,suffix:str|NoneNone,*,name:str|NoneNone)-str:ifname:passelifisinstance(self.channel,str):namefChannelRead{self.channel}else:namefChannelRead{,.join(self.channel)}returnsuper().get_name(suffix,namename)def_read(self,_:Any,config:RunnableConfig)-Any:returnself.do_read(config,selectself.channel,freshself.fresh,mapperself.mapper)asyncdef_aread(self,_:Any,config:RunnableConfig)-Any:returnself.do_read(config,selectself.channel,freshself.fresh,mapperself.mapper)