大模型长期记忆同步:多 Agent 间的消息路由机制设计

# 大模型长期记忆同步:多 Agent 间的消息路由机制设计

## 前言

多 Agent 系统里最头疼的问题就是 Agent 之间信息不同步。Agent A 记住了用户的需求，Agent B 一无所知。这个问题本质是消息路由和状态一致性问题。本文设计了一套方案，把长期记忆变成可路由的消息，在 Agent 之间同步。今天聊聊。

## 一、底层原理

### 1.1 长期记忆的消息路由

长期记忆在多个 Agent 之间同步，本质是一个消息传递问题：

```mermaid
graph TD
    A[Agent A 记忆更新] --> B[生成记忆消息]
    B --> C[消息路由]
    C --> D[Agent B]
    C --> E[Agent C]
    C --> F[Agent D]
    D --> G[更新本地记忆]
    E --> H[更新本地记忆]
    F --> I[更新本地记忆]
    G --> J{确认}
    J -->|成功| K[ACK]
    J -->|失败| L[重试]
```

核心挑战：

- 记忆更新要及时同步
- 冲突要能解决
- 记忆不能丢失
- 性能不能太差

### 1.2 同步方案对比

| 方案 | 一致性 | 延迟 | 复杂度 |
| --- | --- | --- | --- |
| 广播 | 强 | 高 | 低 |
| 订阅 | 最终 | 中 | 中 |
| 中心化 | 强 | 低 | 中 |
| 去中心 | 最终 | 高 | 高 |

## 二、快速上手

### 2.1 基础消息路由

```python
from typing import Dict, List, Any, Callable
from dataclasses import dataclass
import uuid
import time


@dataclass
class MemoryMessage:
    id: str
    key: str
    value: Any
    source_agent: str
    timestamp: float
    version: int


class MemoryRouter:
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.message_log: List[MemoryMessage] = []

    def subscribe(self, agent_name: str, callback: Callable):
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(callback)

    def publish(self, key: str, value: Any, source: str):
        msg = MemoryMessage(
            id=str(uuid.uuid4()),
            key=key,
            value=value,
            source_agent=source,
            timestamp=time.time(),
            version=1
        )
        self.message_log.append(msg)
        for agent, callbacks in self.subscribers.items():
            if agent != source:
                for cb in callbacks:
                    cb(msg)

    def get_history(self, key: str) -> List[MemoryMessage]:
        return [m for m in self.message_log if m.key == key]
```

## 三、核心 API / 深水区

### 3.1 记忆同步机制速查

| 机制 | 作用 | 实现 |
| --- | --- | --- |
| 事件广播 | 通知更新 | 发布订阅 |
| 版本向量 | 冲突检测 | 版本号 |
| 最后写入 | 冲突解决 | 时间戳 |
| 定期同步 | 保证最终一致 | 后台任务 |

### 3.2 版本控制解决冲突

```python
class VersionVector:
    def __init__(self):
        self.versions: Dict[str, int] = {}

    def increment(self, agent: str):
        self.versions[agent] = self.versions.get(agent, 0) + 1

    def get(self, agent: str) -> int:
        return self.versions.get(agent, 0)

    def merge(self, other: "VersionVector"):
        for agent, ver in other.versions.items():
            self.versions[agent] = max(
                self.versions.get(agent, 0), ver
            )


class ConflictResolver:
    def resolve(self, local: Dict, remote: Dict) -> Dict:
        if remote["version"] > local["version"]:
            return remote
        elif local["version"] > remote["version"]:
            return local
        else:
            # 版本相同，以时间戳为准
            if remote["timestamp"] > local["timestamp"]:
                return remote
            return local
```

### 3.3 带重试的路由

```python
import time


class ReliableRouter:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.pending: List[MemoryMessage] = []

    def send(self, msg: MemoryMessage, destination: str) -> bool:
        for attempt in range(self.max_retries):
            try:
                self._deliver(msg, destination)
                return True
            except Exception:
                if attempt == self.max_retries - 1:
                    self.pending.append(msg)
                    return False
                time.sleep(0.1 * (attempt + 1))
        return False

    def _deliver(self, msg, dest):
        # 实际投递逻辑
        pass

    def retry_pending(self):
        for msg in self.pending[:]:
            for dest in self._get_destinations(msg):
                if self.send(msg, dest):
                    self.pending.remove(msg)
```

注：示例中的 `_get_destinations` 未给出实现，需根据实际的订阅关系或路由表自行补充。

## 四、实战演练

完整的记忆同步系统：

```python
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import uuid
import time
import json


@dataclass
class SyncEvent:
    event_id: str
    agent_name: str
    memory_key: str
    memory_value: Any
    timestamp: float
    version: int


class MemorySyncAgent:
    def __init__(self, name: str):
        self.name = name
        self.local_memory: Dict[str, Any] = {}
        self.version_vector: Dict[str, int] = {}
        self.event_log: List[SyncEvent] = []
        self.peers: List["MemorySyncAgent"] = []

    def add_peer(self, agent: "MemorySyncAgent"):
        self.peers.append(agent)

    def store(self, key: str, value: Any):
        self.version_vector[key] = self.version_vector.get(key, 0) + 1
        self.local_memory[key] = value
        event = SyncEvent(
            event_id=str(uuid.uuid4()),
            agent_name=self.name,
            memory_key=key,
            memory_value=value,
            timestamp=time.time(),
            version=self.version_vector[key]
        )
        self.event_log.append(event)
        self._broadcast(event)

    def _broadcast(self, event: SyncEvent):
        for peer in self.peers:
            peer.receive_sync(event)

    def receive_sync(self, event: SyncEvent):
        current_version = self.version_vector.get(event.memory_key, 0)
        if event.version > current_version:
            self.local_memory[event.memory_key] = event.memory_value
            self.version_vector[event.memory_key] = event.version
            self.event_log.append(event)
        elif event.version == current_version and event.timestamp > self._get_local_ts(event.memory_key):
            self.local_memory[event.memory_key] = event.memory_value

    def _get_local_ts(self, key: str) -> float:
        for event in reversed(self.event_log):
            if event.memory_key == key:
                return event.timestamp
        return 0

    def recall(self, key: str) -> Any:
        return self.local_memory.get(key)

    def get_state(self) -> Dict:
        return {
            "agent": self.name,
            "memory": self.local_memory,
            "versions": self.version_vector
        }


class SyncOrchestrator:
    def __init__(self):
        self.agents: Dict[str, MemorySyncAgent] = {}

    def create_agent(self, name: str) -> MemorySyncAgent:
        agent = MemorySyncAgent(name)
        self.agents[name] = agent
        return agent

    def connect_all(self):
        names = list(self.agents.keys())
        for i, name in enumerate(names):
            for j in range(i + 1, len(names)):
                self.agents[name].add_peer(self.agents[names[j]])
                self.agents[names[j]].add_peer(self.agents[name])

    def sync_all(self):
        for agent in self.agents.values():
            for event in agent.event_log[-5:]:
                agent._broadcast(event)


orchestrator = SyncOrchestrator()
agent_a = orchestrator.create_agent("order_agent")
agent_b = orchestrator.create_agent("logistics_agent")
orchestrator.connect_all()

agent_a.store("user_id", 12345)
agent_a.store("order_id", "ORDER-001")

time.sleep(0.1)
print(f"Agent B 记忆: {json.dumps(agent_b.get_state(), ensure_ascii=False)}")
```

## 五、避坑指南与最佳实践

**技巧**：用版本号解决冲突，谁版本高谁说了算，简单有效。

⚠️ **警告**：广播太多会爆炸，每次更新都广播，Agent 多了扛不住。

✅ **推荐**：增量同步，只同步变更的内容，不要全量。

## 六、综合实战演示

生产级记忆同步方案：

```python
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
import json
import time


@dataclass
class MemoryDelta:
    key: str
    value: Any
    version: int
    timestamp: float


class DeltaSyncAgent:
    def __init__(self, name: str):
        self.name = name
        self.memory: Dict[str, Any] = {}
        self.versions: Dict[str, int] = {}
        self.deltas: List[MemoryDelta] = []
        self.peers: Dict[str, "DeltaSyncAgent"] = {}
        self.last_sync: Dict[str, float] = {}

    def connect(self, name: str, peer: "DeltaSyncAgent"):
        self.peers[name] = peer

    def update(self, key: str, value: Any):
        ver = self.versions.get(key, 0) + 1
        self.memory[key] = value
        self.versions[key] = ver
        delta = MemoryDelta(
            key=key,
            value=value,
            version=ver,
            timestamp=time.time()
        )
        self.deltas.append(delta)
        self._sync_delta(delta)

    def _sync_delta(self, delta: MemoryDelta):
        for name, peer in self.peers.items():
            peer.receive_delta(self.name, delta)

    def receive_delta(self, source: str, delta: MemoryDelta):
        current = self.versions.get(delta.key, 0)
        if delta.version > current:
            self.memory[delta.key] = delta.value
            self.versions[delta.key] = delta.version
            self.deltas.append(delta)

    def get(self, key: str) -> Optional[Any]:
        return self.memory.get(key)

    def get_all_deltas(self) -> List[Dict]:
        return [{
            "key": d.key,
            "value": d.value,
            "version": d.version
        } for d in self.deltas[-20:]]


class MemoryHub:
    def __init__(self):
        self.agents: Dict[str, DeltaSyncAgent] = {}

    def register(self, name: str) -> DeltaSyncAgent:
        agent = DeltaSyncAgent(name)
        self.agents[name] = agent
        self._connect_new(agent, name)
        return agent

    def _connect_new(self, agent: DeltaSyncAgent, name: str):
        for existing_name, existing in self.agents.items():
            if existing_name != name:
                agent.connect(existing_name, existing)
                existing.connect(name, agent)

    def broadcast(self, key: str, value: Any, source: str):
        agent = self.agents.get(source)
        if agent:
            agent.update(key, value)

    def get_consensus(self, key: str) -> Optional[Any]:
        values = {}
        for name, agent in self.agents.items():
            v = agent.get(key)
            if v is not None:
                values[name] = v
        if not values:
            return None
        # 多数一致
        from collections import Counter
        counter = Counter(str(v) for v in values.values())
        most_common = counter.most_common(1)
        return eval(most_common[0][0]) if most_common else None


hub = MemoryHub()
agent_a = hub.register("agent_a")
agent_b = hub.register("agent_b")
agent_c = hub.register("agent_c")

hub.broadcast("user_preference", "喜欢红色", "agent_a")

time.sleep(0.1)
print(f"Agent B 知道: {agent_b.get('user_preference')}")
print(f"Agent C 知道: {agent_c.get('user_preference')}")
print(f"共识: {hub.get_consensus('user_preference')}")
```

注：`get_consensus` 中的 `eval(str(v))` 无法还原普通字符串（例如 `喜欢红色` 会触发 `NameError`），并且对不可信数据使用 `eval` 存在安全风险；实际使用时应直接返回得票最多的原始值。

## 七、总结

多 Agent 长期记忆的消息路由与同步：

- 增量同步代替全量
- 版本号解决冲突
- 广播 + 订阅
- 多数一致的保证

做好这些，Agent 之间的记忆就不再各自为政了。