MCP

MCP 性能优化实战:降低延迟、提升吞吐的关键策略

深入分析 Model Context Protocol 的性能瓶颈和优化方法,包括连接池、缓存、批处理和异步执行等关键策略。

MCP 为 AI Agent 提供了强大的工具调用能力,但每次调用都涉及序列化、网络传输和反序列化开销。在高频调用场景下,这些开销会显著影响系统性能。这篇文章将介绍实用的 MCP 性能优化策略。

性能瓶颈分析

MCP 调用的延迟来源:

LLM 推理 → JSON 序列化 → 传输 → Server 处理 → JSON 序列化 → 传输 → 上下文更新
         ~1ms         ~0.1ms    变化大     ~1ms         ~0.1ms    ~1ms

其中 Server 处理时间因工具而异,可能从毫秒级到秒级不等。传输延迟取决于传输方式:stdio 约 0.1ms,HTTP 约 10-100ms。

策略一:连接复用

避免每次调用都建立新的连接。

stdio 连接池

class StdioPool {
  private pool: Client[] = [];
  private maxSize: number;

  constructor(maxSize: number = 5) {
    this.maxSize = maxSize;
  }

  async acquire(command: string, args: string[]): Promise<Client> {
    // 复用现有连接
    const existing = this.pool.find(c => !c.busy);
    if (existing) {
      existing.busy = true;
      return existing;
    }

    // 创建新连接
    if (this.pool.length < this.maxSize) {
      const client = new Client({ name: 'pooled', version: '1.0.0' });
      await client.connect(new StdioClientTransport({ command, args }));
      client.busy = true;
      this.pool.push(client);
      return client;
    }

    // 等待连接释放
    return new Promise((resolve) => {
      const check = setInterval(() => {
        const free = this.pool.find(c => !c.busy);
        if (free) {
          free.busy = true;
          clearInterval(check);
          resolve(free);
        }
      }, 10);
    });
  }

  release(client: Client) {
    client.busy = false;
  }
}

HTTP 持久连接

import http from 'http';

const agent = new http.Agent({
  keepAlive: true,
  maxSockets: 10,
  keepAliveMsecs: 60000,
});

const transport = new StreamableHTTPClientTransport(
  new URL('http://localhost:3000/mcp'),
  { agent }
);

策略二:结果缓存

对频繁调用且结果变化不大的工具实施缓存。

class ToolCache {
  private cache = new Map<string, { data: any; expiry: number }>();
  private ttl: number;

  constructor(ttlMs: number = 60000) {
    this.ttl = ttlMs;
  }

  get(key: string): any | null {
    const entry = this.cache.get(key);
    if (!entry || Date.now() > entry.expiry) {
      this.cache.delete(key);
      return null;
    }
    return entry.data;
  }

  set(key: string, data: any) {
    this.cache.set(key, {
      data,
      expiry: Date.now() + this.ttl,
    });
  }
}

const cache = new ToolCache(30000);

server.tool('get_config', '获取应用配置', {}, async () => {
  const cached = cache.get('config');
  if (cached) return { content: [{ type: 'text', text: cached }] };

  const config = await loadConfig();
  const text = JSON.stringify(config);
  cache.set('config', text);

  return { content: [{ type: 'text', text }] };
});

策略三:批处理

将多个操作合并为一次调用,减少往返次数。

server.tool('batch_read', '批量读取多个文件', {
  paths: z.array(z.string()),
}, async ({ paths }) => {
  const results = await Promise.allSettled(
    paths.map(p => fs.readFile(p, 'utf-8'))
  );

  const output = results.map((r, i) => ({
    path: paths[i],
    content: r.status === 'fulfilled' ? r.value : null,
    error: r.status === 'rejected' ? r.reason.message : null,
  }));

  return { content: [{ type: 'text', text: JSON.stringify(output) }] };
});

策略四:异步执行

对于耗时操作,使用异步模式避免阻塞。

const pendingJobs = new Map<string, Promise<any>>();

server.tool('start_analysis', '启动数据分析任务', {
  dataset: z.string(),
}, async ({ dataset }) => {
  const jobId = randomUUID();

  // 异步启动任务
  const job = runAnalysis(dataset);
  pendingJobs.set(jobId, job);

  // 立即返回任务 ID
  return {
    content: [{
      type: 'text',
      text: JSON.stringify({ jobId, status: 'started' }),
    }],
  };
});

server.tool('check_analysis', '检查分析任务状态', {
  jobId: z.string(),
}, async ({ jobId }) => {
  const job = pendingJobs.get(jobId);
  if (!job) {
    return {
      content: [{ type: 'text', text: '任务不存在' }],
      isError: true,
    };
  }

  try {
    const result = await Promise.race([
      job,
      new Promise(resolve => setTimeout(() => resolve('pending'), 100)),
    ]);

    if (result === 'pending') {
      return { content: [{ type: 'text', text: '{"status":"running"}' }] };
    }

    pendingJobs.delete(jobId);
    return { content: [{ type: 'text', text: JSON.stringify(result) }] };
  } catch (error) {
    pendingJobs.delete(jobId);
    return {
      content: [{ type: 'text', text: `错误:${error.message}` }],
      isError: true,
    };
  }
});

策略五:流式响应

对于大量数据,使用流式返回减少首次响应时间。

server.tool('stream_search', '流式搜索结果', {
  query: z.string(),
}, async ({ query }) => {
  const stream = searchAsStream(query);
  const chunks: string[] = [];

  for await (const chunk of stream) {
    chunks.push(chunk);
    // 可以在这里通过 SSE 推送中间结果
  }

  return { content: [{ type: 'text', text: chunks.join('') }] };
});

传输层优化

选择合适的传输方式

  • 本地工具:stdio(最低延迟)
  • 远程服务:Streamable HTTP(最佳扩展性)
  • 需要推送:SSE

压缩传输数据

import zlib from 'zlib';

// Server 端压缩
function compressResponse(data: string): Buffer {
  return zlib.gzipSync(Buffer.from(data));
}

// Client 端解压
function decompressResponse(data: Buffer): string {
  return zlib.gunzipSync(data).toString();
}

监控和调优

性能指标收集

class PerformanceMonitor {
  private metrics: Map<string, number[]> = new Map();

  record(tool: string, duration: number) {
    if (!this.metrics.has(tool)) {
      this.metrics.set(tool, []);
    }
    this.metrics.get(tool)!.push(duration);
  }

  getStats(tool: string) {
    const durations = this.metrics.get(tool) || [];
    if (durations.length === 0) return null;

    const sorted = [...durations].sort((a, b) => a - b);
    return {
      count: sorted.length,
      p50: sorted[Math.floor(sorted.length * 0.5)],
      p95: sorted[Math.floor(sorted.length * 0.95)],
      p99: sorted[Math.floor(sorted.length * 0.99)],
      avg: sorted.reduce((a, b) => a + b, 0) / sorted.length,
    };
  }
}

常见问题(FAQ)

缓存会导致数据不一致吗?

会。需要根据数据的变化频率设置合理的 TTL。对于实时性要求高的数据,可以使用事件驱动的缓存失效机制。

批处理有大小限制吗?

建议单次批处理不超过 100 个操作。过大的批处理可能导致内存压力和超时。

异步任务需要清理吗?

需要。为异步任务设置超时,定期清理过期的任务记录,避免内存泄漏。

总结

MCP 性能优化的核心思路是减少不必要的调用(缓存)、合并多个操作(批处理)和避免阻塞(异步)。在实际项目中,先通过监控找到瓶颈,再有针对性地优化,避免过早优化。