A2A

A2A 协议深度解析:多 Agent 协作的通信标准

深入剖析 Google 推出的 Agent-to-Agent (A2A) 协议,对比 MCP 的定位差异,展示多 Agent 系统如何通过标准化通信协议实现高效协作。

MCP 解决了 Agent 与工具之间的连接问题,但 Agent 与 Agent 之间呢?2025 年底 Google 发布了 A2A(Agent-to-Agent)协议,专门解决多 Agent 系统中的通信与协作问题。到 2026 年中,A2A 已经从实验性规范成长为多 Agent 架构的关键基础设施。

MCP vs A2A:不同的问题域

很多人把 A2A 和 MCP 混为一谈,其实它们解决的是完全不同的问题:

┌─────────────────────────────────────────────────┐
│           Agent 通信模型                         │
│                                                  │
│  Agent A ──── MCP ────→ Tool (数据库/API/文件)  │
│    │                                             │
│    │                                             │
│    └──── A2A ────→ Agent B                      │
│                     │                            │
│                     └──── MCP ────→ Tool         │
│                                                  │
│  MCP: Agent ↔ Tool(人与工具的接口)             │
│  A2A: Agent ↔ Agent(Agent 之间的接口)          │
└─────────────────────────────────────────────────┘
维度MCPA2A
通信对象Agent ↔ ToolAgent ↔ Agent
核心能力工具调用、资源读取任务委托、状态同步、能力发现
交互模式请求-响应异步任务、流式更新
典型场景读数据库、调 API、操作文件委托子任务、协作分析、链式处理

A2A 核心概念

Agent Card

每个 A2A Agent 通过 Agent Card 声明自己的能力:

// Agent Card(类似 MCP 的 Server Info)
interface AgentCard {
  // 基本信息
  name: string;
  description: string;
  version: string;

  // 能力声明
  capabilities: {
    // 是否支持流式输出
    streaming: boolean;

    // 是否支持推送通知
    pushNotifications: boolean;

    // 状态转移历史
    stateTransitionHistory: boolean;
  };

  // 技能列表(类似 MCP 的 Tools)
  skills: AgentSkill[];

  // 认证方式
  authentication: {
    schemes: string[];
    credentials?: string;
  };
}

interface AgentSkill {
  id: string;
  name: string;
  description: string;

  // 支持的输入/输出类型
  inputModes: string[];   // 如 ['text', 'file', 'structured']
  outputModes: string[];

  // 示例(帮助调用方理解如何使用)
  examples?: string[];

  // 标签(用于发现和路由)
  tags: string[];
}

任务(Task)

A2A 的核心抽象是任务——Agent A 委托 Agent B 完成某件事:

interface A2ATask {
  // 任务唯一标识
  id: string;

  // 会话标识(多个任务可以属于同一会话)
  sessionId: string;

  // 任务状态
  status: TaskStatus;

  // 任务消息
  message: Message;

  // 工件(Artifacts)—— 任务产出
  artifacts?: Artifact[];
}

type TaskStatus =
  | 'submitted'    // 已提交
  | 'working'      // 处理中
  | 'input-required' // 需要额外输入
  | 'completed'    // 已完成
  | 'canceled'     // 已取消
  | 'failed';      // 失败

interface Message {
  role: 'user' | 'agent';
  parts: Part[];
}

// Part 支持多种内容类型
type Part =
  | { type: 'text'; text: string }
  | { type: 'file'; file: { name: string; mimeType: string; data: string } }
  | { type: 'data'; data: Record<string, unknown>; mimeType: string };

A2A 通信流程

基本任务流程

Agent A (调用方)              Agent B (执行方)
    │                             │
    │  1. 发现 Agent Card         │
    │ ─────────────────────────→ │
    │ ←───────────────────────── │
    │                             │
    │  2. 创建任务                │
    │ ─────────────────────────→ │
    │     (tasks/send)            │
    │                             │
    │  3. 任务状态更新            │
    │ ←───────────────────────── │
    │     (working)               │
    │                             │
    │  4. 流式进度                │
    │ ←───────────────────────── │
    │     (progress update)       │
    │                             │
    │  5. 任务完成                │
    │ ←───────────────────────── │
    │     (completed + artifacts) │

TypeScript 实现

// A2A 客户端
class A2AClient {
  constructor(private baseUrl: string) {}

  // 发现 Agent 能力
  async discover(): Promise<AgentCard> {
    const response = await fetch(`${this.baseUrl}/.well-known/agent.json`);
    return response.json();
  }

  // 发送任务(同步等待结果)
  async sendTask(task: {
    message: Message;
    sessionId?: string;
  }): Promise<A2ATask> {
    const response = await fetch(`${this.baseUrl}/tasks/send`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        id: crypto.randomUUID(),
        sessionId: task.sessionId || crypto.randomUUID(),
        message: task.message,
      }),
    });
    return response.json();
  }

  // 发送任务(流式)
  async *sendTaskStream(task: {
    message: Message;
    sessionId?: string;
  }): AsyncGenerator<TaskEvent> {
    const response = await fetch(`${this.baseUrl}/tasks/sendSubscribe`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        id: crypto.randomUUID(),
        sessionId: task.sessionId || crypto.randomUUID(),
        message: task.message,
      }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const text = decoder.decode(value);
      const events = text.split('\n').filter(Boolean);
      for (const event of events) {
        yield JSON.parse(event);
      }
    }
  }
}

A2A 服务端

// A2A 服务端实现
class A2AServer {
  private handlers: Map<string, TaskHandler> = new Map();

  // 注册技能处理器
  registerSkill(skillId: string, handler: TaskHandler) {
    this.handlers.set(skillId, handler);
  }

  // 处理任务请求
  async handleTask(task: A2ATask): Promise<A2ATask> {
    const handler = this.selectHandler(task);
    if (!handler) {
      return { ...task, status: 'failed', error: 'No handler found' };
    }

    try {
      // 更新状态为处理中
      this.emitStatus(task.id, 'working');

      // 执行任务
      const result = await handler.execute(task);

      // 返回完成状态和产物
      return {
        ...task,
        status: 'completed',
        artifacts: result.artifacts,
      };
    } catch (error) {
      return { ...task, status: 'failed', error: error.message };
    }
  }

  private selectHandler(task: A2ATask): TaskHandler | null {
    // 根据消息内容和标签匹配处理器
    for (const [skillId, handler] of this.handlers) {
      if (handler.canHandle(task)) {
        return handler;
      }
    }
    return null;
  }
}

多 Agent 协作模式

模式一:链式委托

用户 → Agent A (规划) → Agent B (编码) → Agent C (测试) → 结果
class ChainOrchestrator {
  private agents: A2AClient[];

  async execute(task: string): Promise<Artifact[]> {
    let currentMessage: Message = {
      role: 'user',
      parts: [{ type: 'text', text: task }],
    };

    let sessionId = crypto.randomUUID();

    for (const agent of this.agents) {
      const result = await agent.sendTask({
        message: currentMessage,
        sessionId,
      });

      if (result.status === 'failed') {
        throw new Error(`Chain failed at ${agent}: ${result.error}`);
      }

      // 将当前 Agent 的输出作为下一个 Agent 的输入
      currentMessage = {
        role: 'user',
        parts: result.artifacts!.map(a => ({
          type: 'text' as const,
          text: JSON.stringify(a),
        })),
      };
    }

    return currentMessage.parts as Artifact[];
  }
}

模式二:并行分发

         ┌→ Agent B (分析安全性) ─┐
用户 → A │→ Agent C (分析性能)   ├→ Agent F (汇总)
         └→ Agent D (分析代码质量)┘
class ParallelOrchestrator {
  async execute(task: string): Promise<Artifact[]> {
    const sessionId = crypto.randomUUID();
    const baseMessage: Message = {
      role: 'user',
      parts: [{ type: 'text', text: task }],
    };

    // 并行分发给多个 Agent
    const results = await Promise.allSettled(
      this.agents.map(agent =>
        agent.sendTask({ message: baseMessage, sessionId })
      )
    );

    // 收集成功的结果
    const artifacts = results
      .filter((r): r is PromiseFulfilledResult<A2ATask> =>
        r.status === 'fulfilled' && r.value.status === 'completed'
      )
      .flatMap(r => r.value.artifacts || []);

    // 汇总 Agent 处理
    const summary = await this.summaryAgent.sendTask({
      message: {
        role: 'user',
        parts: [{ type: 'text', text: JSON.stringify(artifacts) }],
      },
      sessionId,
    });

    return summary.artifacts || [];
  }
}

模式三:动态路由

class DynamicRouter {
  private agentRegistry: Map<string, AgentCard> = new Map();

  async route(task: string): Promise<A2AClient> {
    // 根据任务内容动态选择最合适的 Agent
    const taskEmbedding = await this.embed(task);

    let bestAgent: A2AClient | null = null;
    let bestScore = -1;

    for (const [id, card] of this.agentRegistry) {
      for (const skill of card.skills) {
        const skillEmbedding = await this.embed(skill.description);
        const score = this.cosineSimilarity(taskEmbedding, skillEmbedding);

        if (score > bestScore) {
          bestScore = score;
          bestAgent = this.getClient(id);
        }
      }
    }

    if (!bestAgent || bestScore < 0.5) {
      throw new Error('No suitable agent found for this task');
    }

    return bestAgent;
  }
}

A2A + MCP:完整的 Agent 技术栈

在实际系统中,A2A 和 MCP 是互补的:

// 一个完整的 Agent 既暴露 A2A 接口,又使用 MCP 工具
class FullStackAgent {
  private a2aServer: A2AServer;
  private mcpClients: McpClient[];

  constructor() {
    // 注册 A2A 技能
    this.a2aServer = new A2AServer();
    this.a2aServer.registerSkill('code-review', new CodeReviewHandler(this));
    this.a2aServer.registerSkill('bug-fix', new BugFixHandler(this));
  }

  // 通过 MCP 调用外部工具
  async useTool(toolName: string, params: Record<string, unknown>) {
    const client = this.mcpClients.find(c => c.hasTool(toolName));
    if (!client) throw new Error(`Tool ${toolName} not found`);
    return client.callTool(toolName, params);
  }

  // 处理 A2A 任务时可以使用 MCP 工具
  async handleCodeReview(task: A2ATask): Promise<Artifact[]> {
    // 1. 通过 MCP 读取代码
    const code = await this.useTool('read-file', { path: task.message });

    // 2. 通过 MCP 运行静态分析
    const analysis = await this.useTool('lint', { code });

    // 3. LLM 推理生成审查意见
    const review = await this.llm.analyze(code, analysis);

    // 4. 通过 A2A 返回结果
    return [{ type: 'text', text: review }];
  }
}
┌─────────────────────────────────────────────────┐
│         完整 Agent 技术栈                        │
│                                                  │
│  ┌─────────────────────────────────────────┐   │
│  │            A2A 层(Agent 间通信)         │   │
│  │  Agent A ←──── A2A ────→ Agent B        │   │
│  └─────────────────┬───────────────────────┘   │
│                    │                            │
│  ┌─────────────────┴───────────────────────┐   │
│  │            MCP 层(工具调用)             │   │
│  │  Agent ──── MCP ────→ DB / API / FS     │   │
│  └─────────────────────────────────────────┘   │
│                                                  │
│  ┌─────────────────────────────────────────┐   │
│  │            LLM 层(推理引擎)             │   │
│  │  Claude / GPT / Gemini                   │   │
│  └─────────────────────────────────────────┘   │
└─────────────────────────────────────────────────┘

安全考量

身份验证

class A2AAuth {
  // Agent 间认证
  async authenticateAgent(request: A2ARequest): Promise<boolean> {
    // 1. 验证 JWT token
    const token = request.headers['authorization']?.replace('Bearer ', '');
    if (!token) return false;

    const payload = await this.verifyJWT(token);

    // 2. 检查 Agent 是否在可信注册表中
    const trusted = await this.registry.isTrusted(payload.agentId);
    if (!trusted) return false;

    // 3. 检查权限
    const hasPermission = await this.checkPermission(
      payload.agentId,
      request.skillId
    );

    return hasPermission;
  }

  // 任务级别的权限控制
  async authorizeTask(task: A2ATask, callerId: string): Promise<boolean> {
    // 检查调用方是否有权限使用该技能
    // 检查任务参数是否在允许范围内
    // 检查资源配额
  }
}

任务沙箱

class TaskSandbox {
  // 每个任务在隔离环境中执行
  async execute(task: A2ATask, handler: TaskHandler): Promise<A2ATask> {
    const sandbox = await this.createSandbox({
      // 资源限制
      maxMemory: '512MB',
      maxCpu: '1 core',
      maxDuration: 300_000,  // 5 分钟

      // 网络限制
      allowedHosts: handler.requiredHosts || [],
      denyAllOutbound: !handler.requiresNetwork,

      // 文件系统限制
      readOnlyFs: true,
      tmpDir: '/tmp/sandbox',
    });

    return sandbox.run(() => handler.execute(task));
  }
}

常见问题(FAQ)

A2A 会取代 MCP 吗?

不会。它们是互补关系,不是竞争关系。MCP 解决 Agent 与工具的连接,A2A 解决 Agent 之间的通信。一个完整的 Agent 系统通常同时需要两者。

什么时候需要用 A2A?

当你的系统中有多个 Agent 需要协作时。单 Agent + MCP 工具能解决的问题,不需要引入 A2A。A2A 的价值在于:任务分解、专业化分工、并行处理、跨组织协作。

A2A 的性能开销大吗?

Agent 间通信有额外的网络和序列化开销,但相比 LLM 推理时间(秒级)通常可以忽略。真正的性能收益来自并行处理——一个复杂任务拆分给 3 个 Agent 并行执行,总时间可能只有串行的 1/3。

总结

A2A 协议填补了多 Agent 系统中”Agent 间通信”的空白。它不是 MCP 的替代品,而是互补层。MCP 让 Agent 能使用工具,A2A 让 Agent 能相互协作。

2026 年的 Agent 技术栈正在形成清晰的分层:LLM 作为推理引擎,MCP 作为工具层,A2A 作为协作层。理解这三层的关系和各自的适用场景,是构建高效 Agent 系统的关键。