Agent Mesh

Agent Mesh 设计模式:构建可扩展的分布式 Agent 网络

深入解析 Agent Mesh 的设计原理和实现方式,包括服务发现、负载均衡、故障转移和跨节点通信的关键技术。

当你的 Agent 系统从单机走向分布式,传统的中心化编排模式就不再适用了。Agent Mesh 借鉴了 Service Mesh 的理念,为分布式 Agent 网络提供了统一的通信、发现和管理层。

什么是 Agent Mesh

Agent Mesh 是一个基础设施层,负责处理 Agent 之间的通信、发现、负载均衡和安全。它让 Agent 开发者可以专注于业务逻辑,而不用关心分布式系统的复杂性。

┌─────────────────────────────────────────┐
│              Agent Mesh                  │
│                                         │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐ │
│  │ Agent A │  │ Agent B │  │ Agent C │ │
│  │ ┌─────┐ │  │ ┌─────┐ │  │ ┌─────┐ │ │
│  │ │Proxy│ │  │ │Proxy│ │  │ │Proxy│ │ │
│  │ └──┬──┘ │  │ └──┬──┘ │  │ └──┬──┘ │ │
│  └────┼────┘  └────┼────┘  └────┼────┘ │
│       └────────────┼────────────┘      │
│              ┌─────┴─────┐              │
│              │  Control   │              │
│              │   Plane    │              │
│              └───────────┘              │
└─────────────────────────────────────────┘

数据平面

每个 Agent 旁边部署一个 Sidecar Proxy,拦截所有进出消息并处理通信细节。

控制平面

管理所有 Agent 的注册信息、路由规则和安全策略。

服务发现

Agent Mesh 中的 Agent 需要能够动态发现彼此。

class AgentRegistry {
  private agents: Map<string, AgentInfo> = new Map();
  private healthChecks: Map<string, NodeJS.Timer> = new Map();

  register(info: AgentInfo): void {
    this.agents.set(info.id, info);
    this.startHealthCheck(info.id);
  }

  deregister(id: string): void {
    this.agents.delete(id);
    const timer = this.healthChecks.get(id);
    if (timer) clearInterval(timer);
  }

  discover(query: DiscoverQuery): AgentInfo[] {
    return Array.from(this.agents.values()).filter(agent => {
      if (query.type && agent.type !== query.type) return false;
      if (query.capabilities) {
        const hasAll = query.capabilities.every(c => agent.capabilities.includes(c));
        if (!hasAll) return false;
      }
      return agent.status === 'healthy';
    });
  }

  private startHealthCheck(id: string): void {
    const timer = setInterval(async () => {
      const agent = this.agents.get(id);
      if (!agent) return;

      try {
        await this.pingAgent(agent);
        agent.status = 'healthy';
      } catch {
        agent.status = 'unhealthy';
      }
    }, 10000);

    this.healthChecks.set(id, timer);
  }
}

负载均衡

当多个 Agent 实例提供相同能力时,需要负载均衡来分配请求。

class LoadBalancer {
  private strategy: 'round-robin' | 'least-connections' | 'weighted';
  private counters: Map<string, number> = new Map();

  select(agents: AgentInfo[]): AgentInfo {
    switch (this.strategy) {
      case 'round-robin':
        return this.roundRobin(agents);
      case 'least-connections':
        return this.leastConnections(agents);
      case 'weighted':
        return this.weighted(agents);
    }
  }

  private roundRobin(agents: AgentInfo[]): AgentInfo {
    const key = agents.map(a => a.id).sort().join(',');
    const counter = (this.counters.get(key) || 0) % agents.length;
    this.counters.set(key, counter + 1);
    return agents[counter];
  }

  private leastConnections(agents: AgentInfo[]): AgentInfo {
    return agents.reduce((min, agent) =>
      agent.activeConnections < min.activeConnections ? agent : min
    );
  }
}

跨节点通信

请求-响应模式

class MeshProxy {
  private registry: AgentRegistry;
  private loadBalancer: LoadBalancer;

  async sendRequest(targetType: string, request: any): Promise<any> {
    const agents = this.registry.discover({ type: targetType });
    if (agents.length === 0) {
      throw new Error(`No available agent of type ${targetType}`);
    }

    const target = this.loadBalancer.select(agents);

    try {
      return await this.rpc(target, request);
    } catch (error) {
      // 故障转移:尝试下一个实例
      const remaining = agents.filter(a => a.id !== target.id);
      if (remaining.length > 0) {
        const fallback = this.loadBalancer.select(remaining);
        return await this.rpc(fallback, request);
      }
      throw error;
    }
  }

  private async rpc(target: AgentInfo, request: any): Promise<any> {
    const response = await fetch(`${target.endpoint}/rpc`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(request),
      signal: AbortSignal.timeout(30000),
    });

    if (!response.ok) {
      throw new Error(`RPC failed: ${response.status}`);
    }

    return response.json();
  }
}

发布-订阅模式

class EventBus {
  private subscribers: Map<string, Set<string>> = new Map(); // topic -> agentIds
  private agents: Map<string, AgentInfo> = new Map();

  subscribe(agentId: string, topic: string): void {
    if (!this.subscribers.has(topic)) {
      this.subscribers.set(topic, new Set());
    }
    this.subscribers.get(topic)!.add(agentId);
  }

  async publish(topic: string, event: any): Promise<void> {
    const subscribers = this.subscribers.get(topic);
    if (!subscribers) return;

    const promises = Array.from(subscribers).map(async agentId => {
      const agent = this.agents.get(agentId);
      if (agent) {
        try {
          await fetch(`${agent.endpoint}/events`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ topic, event }),
          });
        } catch {
          // 静默失败,不影响其他订阅者
        }
      }
    });

    await Promise.allSettled(promises);
  }
}

安全通信

mTLS 双向认证

Agent 之间的通信使用双向 TLS 认证,确保双方身份可信。

消息加密

所有跨节点消息都经过加密传输,防止中间人攻击。

常见问题(FAQ)

Agent Mesh 和 Service Mesh 有什么区别?

核心理念相同,但 Agent Mesh 需要处理更多与 AI 相关的场景,如上下文传递、工具调用路由、LLM 请求调度等。

Agent Mesh 的性能开销有多大?

Sidecar Proxy 通常增加 1-5ms 延迟。对于 LLM 调用(通常需要数秒)来说,这个开销可以忽略。

如何监控 Agent Mesh?

使用分布式追踪(OpenTelemetry)和指标采集(Prometheus),在控制平面提供统一的监控面板。

总结

Agent Mesh 为分布式 Agent 系统提供了统一的通信和管理基础设施。通过服务发现、负载均衡、故障转移和安全通信等机制,让 Agent 开发者可以专注于业务逻辑,而不用关心分布式系统的复杂性。