A2A 协议深度解析:多 Agent 协作的通信标准
深入剖析 Google 推出的 Agent-to-Agent (A2A) 协议,对比 MCP 的定位差异,展示多 Agent 系统如何通过标准化通信协议实现高效协作。
MCP 解决了 Agent 与工具之间的连接问题,但 Agent 与 Agent 之间呢?2025 年底 Google 发布了 A2A(Agent-to-Agent)协议,专门解决多 Agent 系统中的通信与协作问题。到 2026 年中,A2A 已经从实验性规范成长为多 Agent 架构的关键基础设施。
MCP vs A2A:不同的问题域
很多人把 A2A 和 MCP 混为一谈,其实它们解决的是完全不同的问题:
┌─────────────────────────────────────────────────┐
│ Agent 通信模型 │
│ │
│ Agent A ──── MCP ────→ Tool (数据库/API/文件) │
│ │ │
│ │ │
│ └──── A2A ────→ Agent B │
│ │ │
│ └──── MCP ────→ Tool │
│ │
│ MCP: Agent ↔ Tool(人与工具的接口) │
│ A2A: Agent ↔ Agent(Agent 之间的接口) │
└─────────────────────────────────────────────────┘
| 维度 | MCP | A2A |
|---|---|---|
| 通信对象 | Agent ↔ Tool | Agent ↔ Agent |
| 核心能力 | 工具调用、资源读取 | 任务委托、状态同步、能力发现 |
| 交互模式 | 请求-响应 | 异步任务、流式更新 |
| 典型场景 | 读数据库、调 API、操作文件 | 委托子任务、协作分析、链式处理 |
A2A 核心概念
Agent Card
每个 A2A Agent 通过 Agent Card 声明自己的能力:
// Agent Card(类似 MCP 的 Server Info)
interface AgentCard {
// 基本信息
name: string;
description: string;
version: string;
// 能力声明
capabilities: {
// 是否支持流式输出
streaming: boolean;
// 是否支持推送通知
pushNotifications: boolean;
// 状态转移历史
stateTransitionHistory: boolean;
};
// 技能列表(类似 MCP 的 Tools)
skills: AgentSkill[];
// 认证方式
authentication: {
schemes: string[];
credentials?: string;
};
}
interface AgentSkill {
id: string;
name: string;
description: string;
// 支持的输入/输出类型
inputModes: string[]; // 如 ['text', 'file', 'structured']
outputModes: string[];
// 示例(帮助调用方理解如何使用)
examples?: string[];
// 标签(用于发现和路由)
tags: string[];
}
任务(Task)
A2A 的核心抽象是任务——Agent A 委托 Agent B 完成某件事:
interface A2ATask {
// 任务唯一标识
id: string;
// 会话标识(多个任务可以属于同一会话)
sessionId: string;
// 任务状态
status: TaskStatus;
// 任务消息
message: Message;
// 工件(Artifacts)—— 任务产出
artifacts?: Artifact[];
}
type TaskStatus =
| 'submitted' // 已提交
| 'working' // 处理中
| 'input-required' // 需要额外输入
| 'completed' // 已完成
| 'canceled' // 已取消
| 'failed'; // 失败
interface Message {
role: 'user' | 'agent';
parts: Part[];
}
// Part 支持多种内容类型
type Part =
| { type: 'text'; text: string }
| { type: 'file'; file: { name: string; mimeType: string; data: string } }
| { type: 'data'; data: Record<string, unknown>; mimeType: string };
A2A 通信流程
基本任务流程
Agent A (调用方) Agent B (执行方)
│ │
│ 1. 发现 Agent Card │
│ ─────────────────────────→ │
│ ←───────────────────────── │
│ │
│ 2. 创建任务 │
│ ─────────────────────────→ │
│ (tasks/send) │
│ │
│ 3. 任务状态更新 │
│ ←───────────────────────── │
│ (working) │
│ │
│ 4. 流式进度 │
│ ←───────────────────────── │
│ (progress update) │
│ │
│ 5. 任务完成 │
│ ←───────────────────────── │
│ (completed + artifacts) │
TypeScript 实现
// A2A 客户端
class A2AClient {
constructor(private baseUrl: string) {}
// 发现 Agent 能力
async discover(): Promise<AgentCard> {
const response = await fetch(`${this.baseUrl}/.well-known/agent.json`);
return response.json();
}
// 发送任务(同步等待结果)
async sendTask(task: {
message: Message;
sessionId?: string;
}): Promise<A2ATask> {
const response = await fetch(`${this.baseUrl}/tasks/send`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
id: crypto.randomUUID(),
sessionId: task.sessionId || crypto.randomUUID(),
message: task.message,
}),
});
return response.json();
}
// 发送任务(流式)
async *sendTaskStream(task: {
message: Message;
sessionId?: string;
}): AsyncGenerator<TaskEvent> {
const response = await fetch(`${this.baseUrl}/tasks/sendSubscribe`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
id: crypto.randomUUID(),
sessionId: task.sessionId || crypto.randomUUID(),
message: task.message,
}),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const events = text.split('\n').filter(Boolean);
for (const event of events) {
yield JSON.parse(event);
}
}
}
}
A2A 服务端
// A2A 服务端实现
class A2AServer {
private handlers: Map<string, TaskHandler> = new Map();
// 注册技能处理器
registerSkill(skillId: string, handler: TaskHandler) {
this.handlers.set(skillId, handler);
}
// 处理任务请求
async handleTask(task: A2ATask): Promise<A2ATask> {
const handler = this.selectHandler(task);
if (!handler) {
return { ...task, status: 'failed', error: 'No handler found' };
}
try {
// 更新状态为处理中
this.emitStatus(task.id, 'working');
// 执行任务
const result = await handler.execute(task);
// 返回完成状态和产物
return {
...task,
status: 'completed',
artifacts: result.artifacts,
};
} catch (error) {
return { ...task, status: 'failed', error: error.message };
}
}
private selectHandler(task: A2ATask): TaskHandler | null {
// 根据消息内容和标签匹配处理器
for (const [skillId, handler] of this.handlers) {
if (handler.canHandle(task)) {
return handler;
}
}
return null;
}
}
多 Agent 协作模式
模式一:链式委托
用户 → Agent A (规划) → Agent B (编码) → Agent C (测试) → 结果
class ChainOrchestrator {
private agents: A2AClient[];
async execute(task: string): Promise<Artifact[]> {
let currentMessage: Message = {
role: 'user',
parts: [{ type: 'text', text: task }],
};
let sessionId = crypto.randomUUID();
for (const agent of this.agents) {
const result = await agent.sendTask({
message: currentMessage,
sessionId,
});
if (result.status === 'failed') {
throw new Error(`Chain failed at ${agent}: ${result.error}`);
}
// 将当前 Agent 的输出作为下一个 Agent 的输入
currentMessage = {
role: 'user',
parts: result.artifacts!.map(a => ({
type: 'text' as const,
text: JSON.stringify(a),
})),
};
}
return currentMessage.parts as Artifact[];
}
}
模式二:并行分发
┌→ Agent B (分析安全性) ─┐
用户 → A │→ Agent C (分析性能) ├→ Agent F (汇总)
└→ Agent D (分析代码质量)┘
class ParallelOrchestrator {
async execute(task: string): Promise<Artifact[]> {
const sessionId = crypto.randomUUID();
const baseMessage: Message = {
role: 'user',
parts: [{ type: 'text', text: task }],
};
// 并行分发给多个 Agent
const results = await Promise.allSettled(
this.agents.map(agent =>
agent.sendTask({ message: baseMessage, sessionId })
)
);
// 收集成功的结果
const artifacts = results
.filter((r): r is PromiseFulfilledResult<A2ATask> =>
r.status === 'fulfilled' && r.value.status === 'completed'
)
.flatMap(r => r.value.artifacts || []);
// 汇总 Agent 处理
const summary = await this.summaryAgent.sendTask({
message: {
role: 'user',
parts: [{ type: 'text', text: JSON.stringify(artifacts) }],
},
sessionId,
});
return summary.artifacts || [];
}
}
模式三:动态路由
class DynamicRouter {
private agentRegistry: Map<string, AgentCard> = new Map();
async route(task: string): Promise<A2AClient> {
// 根据任务内容动态选择最合适的 Agent
const taskEmbedding = await this.embed(task);
let bestAgent: A2AClient | null = null;
let bestScore = -1;
for (const [id, card] of this.agentRegistry) {
for (const skill of card.skills) {
const skillEmbedding = await this.embed(skill.description);
const score = this.cosineSimilarity(taskEmbedding, skillEmbedding);
if (score > bestScore) {
bestScore = score;
bestAgent = this.getClient(id);
}
}
}
if (!bestAgent || bestScore < 0.5) {
throw new Error('No suitable agent found for this task');
}
return bestAgent;
}
}
A2A + MCP:完整的 Agent 技术栈
在实际系统中,A2A 和 MCP 是互补的:
// 一个完整的 Agent 既暴露 A2A 接口,又使用 MCP 工具
class FullStackAgent {
private a2aServer: A2AServer;
private mcpClients: McpClient[];
constructor() {
// 注册 A2A 技能
this.a2aServer = new A2AServer();
this.a2aServer.registerSkill('code-review', new CodeReviewHandler(this));
this.a2aServer.registerSkill('bug-fix', new BugFixHandler(this));
}
// 通过 MCP 调用外部工具
async useTool(toolName: string, params: Record<string, unknown>) {
const client = this.mcpClients.find(c => c.hasTool(toolName));
if (!client) throw new Error(`Tool ${toolName} not found`);
return client.callTool(toolName, params);
}
// 处理 A2A 任务时可以使用 MCP 工具
async handleCodeReview(task: A2ATask): Promise<Artifact[]> {
// 1. 通过 MCP 读取代码
const code = await this.useTool('read-file', { path: task.message });
// 2. 通过 MCP 运行静态分析
const analysis = await this.useTool('lint', { code });
// 3. LLM 推理生成审查意见
const review = await this.llm.analyze(code, analysis);
// 4. 通过 A2A 返回结果
return [{ type: 'text', text: review }];
}
}
┌─────────────────────────────────────────────────┐
│ 完整 Agent 技术栈 │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ A2A 层(Agent 间通信) │ │
│ │ Agent A ←──── A2A ────→ Agent B │ │
│ └─────────────────┬───────────────────────┘ │
│ │ │
│ ┌─────────────────┴───────────────────────┐ │
│ │ MCP 层(工具调用) │ │
│ │ Agent ──── MCP ────→ DB / API / FS │ │
│ └─────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ LLM 层(推理引擎) │ │
│ │ Claude / GPT / Gemini │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
安全考量
身份验证
class A2AAuth {
// Agent 间认证
async authenticateAgent(request: A2ARequest): Promise<boolean> {
// 1. 验证 JWT token
const token = request.headers['authorization']?.replace('Bearer ', '');
if (!token) return false;
const payload = await this.verifyJWT(token);
// 2. 检查 Agent 是否在可信注册表中
const trusted = await this.registry.isTrusted(payload.agentId);
if (!trusted) return false;
// 3. 检查权限
const hasPermission = await this.checkPermission(
payload.agentId,
request.skillId
);
return hasPermission;
}
// 任务级别的权限控制
async authorizeTask(task: A2ATask, callerId: string): Promise<boolean> {
// 检查调用方是否有权限使用该技能
// 检查任务参数是否在允许范围内
// 检查资源配额
}
}
任务沙箱
class TaskSandbox {
// 每个任务在隔离环境中执行
async execute(task: A2ATask, handler: TaskHandler): Promise<A2ATask> {
const sandbox = await this.createSandbox({
// 资源限制
maxMemory: '512MB',
maxCpu: '1 core',
maxDuration: 300_000, // 5 分钟
// 网络限制
allowedHosts: handler.requiredHosts || [],
denyAllOutbound: !handler.requiresNetwork,
// 文件系统限制
readOnlyFs: true,
tmpDir: '/tmp/sandbox',
});
return sandbox.run(() => handler.execute(task));
}
}
常见问题(FAQ)
A2A 会取代 MCP 吗?
不会。它们是互补关系,不是竞争关系。MCP 解决 Agent 与工具的连接,A2A 解决 Agent 之间的通信。一个完整的 Agent 系统通常同时需要两者。
什么时候需要用 A2A?
当你的系统中有多个 Agent 需要协作时。单 Agent + MCP 工具能解决的问题,不需要引入 A2A。A2A 的价值在于:任务分解、专业化分工、并行处理、跨组织协作。
A2A 的性能开销大吗?
Agent 间通信有额外的网络和序列化开销,但相比 LLM 推理时间(秒级)通常可以忽略。真正的性能收益来自并行处理——一个复杂任务拆分给 3 个 Agent 并行执行,总时间可能只有串行的 1/3。
总结
A2A 协议填补了多 Agent 系统中”Agent 间通信”的空白。它不是 MCP 的替代品,而是互补层。MCP 让 Agent 能使用工具,A2A 让 Agent 能相互协作。
2026 年的 Agent 技术栈正在形成清晰的分层:LLM 作为推理引擎,MCP 作为工具层,A2A 作为协作层。理解这三层的关系和各自的适用场景,是构建高效 Agent 系统的关键。