Agent Mesh 设计模式:构建可扩展的分布式 Agent 网络
深入解析 Agent Mesh 的设计原理和实现方式,包括服务发现、负载均衡、故障转移和跨节点通信的关键技术。
当你的 Agent 系统从单机走向分布式,传统的中心化编排模式就不再适用了。Agent Mesh 借鉴了 Service Mesh 的理念,为分布式 Agent 网络提供了统一的通信、发现和管理层。
什么是 Agent Mesh
Agent Mesh 是一个基础设施层,负责处理 Agent 之间的通信、发现、负载均衡和安全。它让 Agent 开发者可以专注于业务逻辑,而不用关心分布式系统的复杂性。
┌─────────────────────────────────────────┐
│ Agent Mesh │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Agent A │ │ Agent B │ │ Agent C │ │
│ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │
│ │ │Proxy│ │ │ │Proxy│ │ │ │Proxy│ │ │
│ │ └──┬──┘ │ │ └──┬──┘ │ │ └──┬──┘ │ │
│ └────┼────┘ └────┼────┘ └────┼────┘ │
│ └────────────┼────────────┘ │
│ ┌─────┴─────┐ │
│ │ Control │ │
│ │ Plane │ │
│ └───────────┘ │
└─────────────────────────────────────────┘
数据平面
每个 Agent 旁边部署一个 Sidecar Proxy,拦截所有进出消息并处理通信细节。
控制平面
管理所有 Agent 的注册信息、路由规则和安全策略。
服务发现
Agent Mesh 中的 Agent 需要能够动态发现彼此。
class AgentRegistry {
private agents: Map<string, AgentInfo> = new Map();
private healthChecks: Map<string, NodeJS.Timer> = new Map();
register(info: AgentInfo): void {
this.agents.set(info.id, info);
this.startHealthCheck(info.id);
}
deregister(id: string): void {
this.agents.delete(id);
const timer = this.healthChecks.get(id);
if (timer) clearInterval(timer);
}
discover(query: DiscoverQuery): AgentInfo[] {
return Array.from(this.agents.values()).filter(agent => {
if (query.type && agent.type !== query.type) return false;
if (query.capabilities) {
const hasAll = query.capabilities.every(c => agent.capabilities.includes(c));
if (!hasAll) return false;
}
return agent.status === 'healthy';
});
}
private startHealthCheck(id: string): void {
const timer = setInterval(async () => {
const agent = this.agents.get(id);
if (!agent) return;
try {
await this.pingAgent(agent);
agent.status = 'healthy';
} catch {
agent.status = 'unhealthy';
}
}, 10000);
this.healthChecks.set(id, timer);
}
}
负载均衡
当多个 Agent 实例提供相同能力时,需要负载均衡来分配请求。
class LoadBalancer {
private strategy: 'round-robin' | 'least-connections' | 'weighted';
private counters: Map<string, number> = new Map();
select(agents: AgentInfo[]): AgentInfo {
switch (this.strategy) {
case 'round-robin':
return this.roundRobin(agents);
case 'least-connections':
return this.leastConnections(agents);
case 'weighted':
return this.weighted(agents);
}
}
private roundRobin(agents: AgentInfo[]): AgentInfo {
const key = agents.map(a => a.id).sort().join(',');
const counter = (this.counters.get(key) || 0) % agents.length;
this.counters.set(key, counter + 1);
return agents[counter];
}
private leastConnections(agents: AgentInfo[]): AgentInfo {
return agents.reduce((min, agent) =>
agent.activeConnections < min.activeConnections ? agent : min
);
}
}
跨节点通信
请求-响应模式
class MeshProxy {
private registry: AgentRegistry;
private loadBalancer: LoadBalancer;
async sendRequest(targetType: string, request: any): Promise<any> {
const agents = this.registry.discover({ type: targetType });
if (agents.length === 0) {
throw new Error(`No available agent of type ${targetType}`);
}
const target = this.loadBalancer.select(agents);
try {
return await this.rpc(target, request);
} catch (error) {
// 故障转移:尝试下一个实例
const remaining = agents.filter(a => a.id !== target.id);
if (remaining.length > 0) {
const fallback = this.loadBalancer.select(remaining);
return await this.rpc(fallback, request);
}
throw error;
}
}
private async rpc(target: AgentInfo, request: any): Promise<any> {
const response = await fetch(`${target.endpoint}/rpc`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(request),
signal: AbortSignal.timeout(30000),
});
if (!response.ok) {
throw new Error(`RPC failed: ${response.status}`);
}
return response.json();
}
}
发布-订阅模式
class EventBus {
private subscribers: Map<string, Set<string>> = new Map(); // topic -> agentIds
private agents: Map<string, AgentInfo> = new Map();
subscribe(agentId: string, topic: string): void {
if (!this.subscribers.has(topic)) {
this.subscribers.set(topic, new Set());
}
this.subscribers.get(topic)!.add(agentId);
}
async publish(topic: string, event: any): Promise<void> {
const subscribers = this.subscribers.get(topic);
if (!subscribers) return;
const promises = Array.from(subscribers).map(async agentId => {
const agent = this.agents.get(agentId);
if (agent) {
try {
await fetch(`${agent.endpoint}/events`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ topic, event }),
});
} catch {
// 静默失败,不影响其他订阅者
}
}
});
await Promise.allSettled(promises);
}
}
安全通信
mTLS 双向认证
Agent 之间的通信使用双向 TLS 认证,确保双方身份可信。
消息加密
所有跨节点消息都经过加密传输,防止中间人攻击。
常见问题(FAQ)
Agent Mesh 和 Service Mesh 有什么区别?
核心理念相同,但 Agent Mesh 需要处理更多与 AI 相关的场景,如上下文传递、工具调用路由、LLM 请求调度等。
Agent Mesh 的性能开销有多大?
Sidecar Proxy 通常增加 1-5ms 延迟。对于 LLM 调用(通常需要数秒)来说,这个开销可以忽略。
如何监控 Agent Mesh?
使用分布式追踪(OpenTelemetry)和指标采集(Prometheus),在控制平面提供统一的监控面板。
总结
Agent Mesh 为分布式 Agent 系统提供了统一的通信和管理基础设施。通过服务发现、负载均衡、故障转移和安全通信等机制,让 Agent 开发者可以专注于业务逻辑,而不用关心分布式系统的复杂性。