Multi-Agent

Multi-Agent System 协同架构:任务分解与协作通信实战

深入探讨多 Agent 系统的任务分解策略、协作通信模式和状态同步机制,构建高效的 AI Agent 协同架构。

单个 Agent 的能力终究有限。当面对需要多领域知识、多步骤执行的复杂任务时,多个 Agent 协作是必然选择。但多 Agent 系统的难点不在于”让多个 Agent 同时运行”,而在于”让它们高效协作”。

任务分解策略

基于能力的分解

根据每个 Agent 的专业能力分配任务。这种方式需要预先定义每个 Agent 的能力边界。

interface AgentCapability {
  agentId: string;
  skills: string[];
  tools: string[];
  maxConcurrent: number;
}

class TaskDecomposer {
  private capabilities: AgentCapability[];

  async decompose(task: string): Promise<TaskPlan> {
    // 使用 LLM 分析任务并分解为子任务
    const subtasks = await this.llmDecompose(task);

    // 为每个子任务匹配最合适的 Agent
    const assignments = subtasks.map(subtask => ({
      subtask,
      agent: this.findBestAgent(subtask),
    }));

    // 分析依赖关系,确定执行顺序
    const dependencies = this.analyzeDependencies(assignments);

    return { assignments, dependencies };
  }

  private findBestAgent(subtask: SubTask): string {
    // 基于能力匹配度选择 Agent
    let bestAgent = '';
    let bestScore = -1;

    for (const cap of this.capabilities) {
      const score = this.calculateMatchScore(cap, subtask);
      if (score > bestScore) {
        bestScore = score;
        bestAgent = cap.agentId;
      }
    }

    return bestAgent;
  }
}

基于数据的分解

当任务涉及大量数据处理时,可以将数据分片分配给不同的 Agent 并行处理。

基于流程的分解

当任务有明确的步骤流程时,每个步骤分配给专门的 Agent。

协作通信模式

直接通信

Agent 之间直接发送消息。实现简单,但随着 Agent 数量增加,通信复杂度呈指数增长。

class DirectCommunication {
  private agents: Map<string, AgentInstance> = new Map();

  async sendMessage(from: string, to: string, message: any): Promise<void> {
    const target = this.agents.get(to);
    if (!target) throw new Error(`Agent ${to} not found`);
    await target.receiveMessage({ from, content: message });
  }
}

黑板模式

所有 Agent 通过共享的”黑板”(共享数据空间)进行通信。Agent 读取黑板上的信息,处理后将结果写回。

class Blackboard {
  private state: Map<string, any> = new Map();
  private subscribers: Map<string, Function[]> = new Map();

  write(key: string, value: any, author: string): void {
    this.state.set(key, { value, author, timestamp: Date.now() });
    this.notify(key, value);
  }

  read(key: string): any {
    return this.state.get(key)?.value;
  }

  subscribe(key: string, callback: Function): void {
    if (!this.subscribers.has(key)) {
      this.subscribers.set(key, []);
    }
    this.subscribers.get(key)!.push(callback);
  }

  private notify(key: string, value: any): void {
    const subs = this.subscribers.get(key) || [];
    subs.forEach(cb => cb(value));
  }
}

消息总线模式

通过中心化的消息总线进行通信,支持发布/订阅和点对点两种模式。

class MessageBus {
  private handlers: Map<string, Function[]> = new Map();

  publish(topic: string, message: any): void {
    const handlers = this.handlers.get(topic) || [];
    handlers.forEach(handler => handler(message));
  }

  subscribe(topic: string, handler: Function): void {
    if (!this.handlers.has(topic)) {
      this.handlers.set(topic, []);
    }
    this.handlers.get(topic)!.push(handler);
  }
}

状态同步机制

乐观锁

允许 Agent 并行修改状态,冲突时通过合并或回滚解决。

中心化状态管理

由编排器统一管理所有 Agent 的状态,避免冲突。

编排器实现

class Orchestrator {
  private agents: Map<string, AgentInstance> = new Map();
  private blackboard: Blackboard;
  private messageBus: MessageBus;

  async executeTask(task: string): Promise<any> {
    // 1. 分解任务
    const plan = await this.decomposeTask(task);

    // 2. 执行子任务
    const results: Map<string, any> = new Map();

    for (const step of plan.steps) {
      if (step.parallel) {
        // 并行执行
        const promises = step.tasks.map(t => this.executeSubTask(t));
        const stepResults = await Promise.allSettled(promises);
        step.tasks.forEach((t, i) => {
          if (stepResults[i].status === 'fulfilled') {
            results.set(t.id, stepResults[i].value);
          }
        });
      } else {
        // 串行执行
        for (const t of step.tasks) {
          const result = await this.executeSubTask(t);
          results.set(t.id, result);
        }
      }
    }

    // 3. 汇总结果
    return this.aggregateResults(results);
  }

  private async executeSubTask(subtask: SubTask): Promise<any> {
    const agent = this.agents.get(subtask.agentId);
    if (!agent) throw new Error(`Agent ${subtask.agentId} not found`);

    // 将之前的子任务结果注入上下文
    const context = this.buildContext(subtask);
    return await agent.execute(subtask.description, context);
  }
}

错误处理与容错

重试机制

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  backoffMs: number = 1000
): Promise<T> {
  for (let i = 0; i <= maxRetries; i++) {
    try {
      return await fn();
    } catch (error) {
      if (i === maxRetries) throw error;
      await new Promise(r => setTimeout(r, backoffMs * Math.pow(2, i)));
    }
  }
  throw new Error('Unreachable');
}

降级策略

当某个 Agent 不可用时,降级到备选 Agent 或跳过该子任务。

常见问题(FAQ)

如何确定 Agent 的最佳数量?

通常 3-5 个 Agent 是比较合理的范围。过多的 Agent 会增加编排复杂度和通信开销。

Agent 之间如何共享数据?

通过黑板模式或消息总线共享。避免直接传递大量数据,而是传递引用或摘要。

如何调试多 Agent 系统?

为每个 Agent 添加详细的日志和追踪,使用分布式追踪工具(如 Jaeger)可视化调用链。

总结

多 Agent 系统的核心挑战在于任务分解和协作通信。选择合适的分解策略(能力、数据、流程)和通信模式(直接、黑板、消息总线),是构建高效多 Agent 系统的关键。编排器作为系统的”大脑”,负责协调所有 Agent 的工作。