Agent Observability

The Complete Agent Observability Stack: Tracing, Metrics, Logs, and Alerting

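Engineering practices for building a complete observability stack for AI agents, covering distributed tracing, performance metrics collection, structured logging, and intelligent alerting.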

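When your agent system moves from prototype to production, the first real question is not "does it work" but "what is it doing". An agent's decision chain spans LLM calls, tool execution, and external API interactions, and a failure at any step can push the final result away from what you expected. Observability is the ability to see through that chain.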

The Three Pillars of Observability

┌─────────────────────────────────────────────────┐
│               Agent Observability               │
│                                                 │
│  ┌─────────────┐  ┌──────────┐  ┌────────────┐  │
│  │   Traces    │  │ Metrics  │  │    Logs    │  │
│  └──────┬──────┘  └────┬─────┘  └─────┬──────┘  │
│         │              │              │         │
│         └──────────────┼──────────────┘         │
│                        │                        │
│                ┌───────┴───────┐                │
│                │   Alerting    │                │
│                └───────────────┘                │
└─────────────────────────────────────────────────┘

Distributed Tracing

Trace Structure Design

Every agent interaction should produce a complete trace that records the full path from user input to final response:

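import { randomUUID } from 'node:crypto';

// ID helper used by Tracer below (UUIDs are one simple choice)
const generateId = (): string => randomUUID();

interface AgentTrace {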
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  operationName: string;
  startTime: number;
  endTime?: number;
  status: 'ok' | 'error' | 'timeout';
  attributes: Record<string, any>;
  events: TraceEvent[];
}

interface TraceEvent {
  name: string;
  timestamp: number;
  attributes: Record<string, any>;
}

class Tracer {
  private spans: Map<string, AgentTrace> = new Map();

  startSpan(operationName: string, parentSpanId?: string): AgentTrace {
    const spanId = generateId();
    const traceId = parentSpanId
      ? this.spans.get(parentSpanId)?.traceId || spanId
      : spanId;

    const span: AgentTrace = {
      traceId,
      spanId,
      parentSpanId,
      operationName,
      startTime: Date.now(),
      status: 'ok',
      attributes: {},
      events: [],
    };

    this.spans.set(spanId, span);
    return span;
  }

  endSpan(spanId: string, status: 'ok' | 'error' | 'timeout' = 'ok'): void {
    const span = this.spans.get(spanId);
    if (span) {
      span.endTime = Date.now();
      span.status = status;
      this.export(span);
      // Drop finished spans so the map does not grow without bound
      this.spans.delete(spanId);
    }
  }

  addEvent(spanId: string, name: string, attributes: Record<string, any>): void {
    const span = this.spans.get(spanId);
    if (span) {
      span.events.push({ name, timestamp: Date.now(), attributes });
    }
  }

  private export(span: AgentTrace): void {
    // Export to a Jaeger, Zipkin, or OTLP backend (stdout stands in here)
    console.log(JSON.stringify(span));
  }
}
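
To make the parent/child relationship concrete, the sketch below rebuilds a call tree from flat exported spans by following parentSpanId. The names `FlatSpan` and `renderTrace` and the sample data are assumptions made for illustration, not part of the Tracer above. Spans whose parent is missing from the input are not rendered in this simplified version.

```typescript
// Minimal span shape for this sketch; it mirrors the AgentTrace fields used here.
interface FlatSpan {
  spanId: string;
  parentSpanId?: string;
  operationName: string;
  startTime: number;
  endTime?: number;
}

// Rebuild the call tree from flat spans and render one indented line per span.
function renderTrace(spans: FlatSpan[]): string[] {
  // Group spans by their parent ID (root spans are grouped under undefined).
  const children = new Map<string | undefined, FlatSpan[]>();
  for (const span of spans) {
    const siblings = children.get(span.parentSpanId) ?? [];
    siblings.push(span);
    children.set(span.parentSpanId, siblings);
  }

  const lines: string[] = [];
  const visit = (parentId: string | undefined, depth: number): void => {
    const kids = [...(children.get(parentId) ?? [])].sort((a, b) => a.startTime - b.startTime);
    for (const span of kids) {
      const duration = span.endTime === undefined ? 'open' : `${span.endTime - span.startTime}ms`;
      lines.push(`${'  '.repeat(depth)}${span.operationName} (${duration})`);
      visit(span.spanId, depth + 1);
    }
  };
  visit(undefined, 0);
  return lines;
}

// Hypothetical spans, deliberately out of order to show the sort by start time.
const exampleSpans: FlatSpan[] = [
  { spanId: 'a', operationName: 'agent.support', startTime: 0, endTime: 900 },
  { spanId: 'c', parentSpanId: 'a', operationName: 'tool.search', startTime: 420, endTime: 700 },
  { spanId: 'b', parentSpanId: 'a', operationName: 'llm.call', startTime: 10, endTime: 400 },
];
console.log(renderTrace(exampleSpans).join('\n'));
```

The root span comes first, followed by its children indented one level and ordered by start time, which is roughly what a trace viewer's waterfall shows.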

Tracing the Agent Call Chain

A complete agent call can contain multiple nested spans:

class AgentTracer {
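  // Inject the tracer; the original declared this field without ever assigning it
  constructor(private readonly tracer: Tracer) {}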

  async traceAgentCall<T>(
    agentName: string,
    input: string,
    fn: () => Promise<T>
  ): Promise<T> {
    const span = this.tracer.startSpan(`agent.${agentName}`);
    span.attributes['agent.input'] = input;

    try {
      const result = await fn();
      span.attributes['agent.output'] = JSON.stringify(result);
      this.tracer.endSpan(span.spanId, 'ok');
      return result;
    } catch (error) {
      span.attributes['error.message'] = (error as Error).message;
      this.tracer.endSpan(span.spanId, 'error');
      throw error;
    }
  }

  async traceLLMCall<T>(
    model: string,
    messages: any[],
    fn: () => Promise<T>
  ): Promise<T> {
    const span = this.tracer.startSpan('llm.call');
    span.attributes['llm.model'] = model;
    span.attributes['llm.input_messages'] = messages.length;

    const startTime = Date.now();
    try {
      const result = await fn();
      const duration = Date.now() - startTime;
      span.attributes['llm.duration_ms'] = duration;
      span.attributes['llm.tokens_used'] = (result as any).usage?.total_tokens;
      this.tracer.endSpan(span.spanId, 'ok');
      return result;
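    } catch (error) {
      // Record the failure reason, as the agent and tool wrappers do
      span.attributes['error.message'] = (error as Error).message;
      this.tracer.endSpan(span.spanId, 'error');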
      throw error;
    }
  }

  async traceToolCall<T>(
    toolName: string,
    args: any,
    fn: () => Promise<T>
  ): Promise<T> {
    const span = this.tracer.startSpan(`tool.${toolName}`);
    span.attributes['tool.name'] = toolName;
    span.attributes['tool.args'] = JSON.stringify(args);

    try {
      const result = await fn();
      span.attributes['tool.result_size'] = JSON.stringify(result).length;
      this.tracer.endSpan(span.spanId, 'ok');
      return result;
    } catch (error) {
      span.attributes['error.message'] = (error as Error).message;
      this.tracer.endSpan(span.spanId, 'error');
      throw error;
    }
  }
}
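
The wrappers above start every span without a parentSpanId, so each one becomes the root of its own trace. One way to get real nesting is to track the currently active span and use it as the parent. The sketch below does this with Node.js `AsyncLocalStorage`. The helper names `createSpanContext` and `withSpan` are hypothetical, and the sketch only records parent links, not timing or status.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

interface SpanRecord {
  spanId: string;
  parentSpanId?: string;
  operationName: string;
}

// Hypothetical helper: tracks the "current" span so children can find their parent.
function createSpanContext() {
  const currentSpan = new AsyncLocalStorage<string>();
  const recorded: SpanRecord[] = [];
  let nextId = 0;

  // Runs fn inside a new span whose parent is whatever span is active right now.
  function withSpan<T>(operationName: string, fn: () => T): T {
    const spanId = `span-${++nextId}`;
    recorded.push({ spanId, parentSpanId: currentSpan.getStore(), operationName });
    return currentSpan.run(spanId, fn);
  }

  return { withSpan, recorded };
}

const ctx = createSpanContext();
ctx.withSpan('agent.support', () => {
  ctx.withSpan('llm.call', () => 'plan');
  ctx.withSpan('tool.search', () => ['result']);
});
console.log(ctx.recorded);
```

Both inner spans report `span-1` as their parent. The store also propagates across `await` boundaries inside `fn`, so the same helper works when the callbacks are async functions. Wiring it into AgentTracer would mean passing `currentSpan.getStore()` as the parentSpanId argument of startSpan.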

Performance Metrics

Core Metric Definitions

class AgentMetrics {
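  // Counters and histograms are read directly by the alert rules and dashboard,
  // so they are exposed read-only rather than private.
  readonly counters: Map<string, number> = new Map();
  readonly histograms: Map<string, number[]> = new Map();
  private gauges: Map<string, number> = new Map();

  // Counter: monotonically increasing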
  increment(name: string, value: number = 1): void {
    this.counters.set(name, (this.counters.get(name) || 0) + value);
  }

  // Histogram: records the distribution of observed values
  record(name: string, value: number): void {
    if (!this.histograms.has(name)) {
      this.histograms.set(name, []);
    }
    this.histograms.get(name)!.push(value);
  }

  // Gauge: the current value
  gauge(name: string, value: number): void {
    this.gauges.set(name, value);
  }

  // Core agent metrics
  recordAgentCall(duration: number, status: 'success' | 'error'): void {
    this.increment('agent.calls.total');
    this.record('agent.calls.duration_ms', duration);
    if (status === 'error') {
      this.increment('agent.calls.errors');
    }
  }

  recordLLMCall(model: string, inputTokens: number, outputTokens: number, duration: number): void {
    this.increment('llm.calls.total');
    this.increment(`llm.calls.total.${model}`); // per-model call count
    this.increment('llm.tokens.input', inputTokens);
    this.increment('llm.tokens.output', outputTokens);
    this.record('llm.calls.duration_ms', duration);
    this.record('llm.tokens.total', inputTokens + outputTokens);
  }

  recordToolCall(toolName: string, duration: number, success: boolean): void {
    this.increment('tool.calls.total');
    this.increment(`tool.calls.total.${toolName}`); // per-tool call count
    this.record('tool.calls.duration_ms', duration);
    if (!success) {
      this.increment('tool.calls.errors');
      this.increment(`tool.calls.errors.${toolName}`); // per-tool error count
    }
  }

  // Compute a percentile (nearest-rank method)
  percentile(name: string, p: number): number {
    const values = this.histograms.get(name) || [];
    if (values.length === 0) return 0;

    const sorted = [...values].sort((a, b) => a - b);
    const index = Math.ceil((p / 100) * sorted.length) - 1;
    return sorted[Math.max(0, index)];
  }

  // Export in the Prometheus text exposition format
  toPrometheus(): string {
    const lines: string[] = [];
    // Prometheus metric names cannot contain dots, so map invalid characters to "_"
    const sanitize = (name: string): string => name.replace(/[^a-zA-Z0-9_:]/g, '_');

    for (const [rawName, value] of this.counters) {
      const name = sanitize(rawName);
      lines.push(`# TYPE ${name} counter`);
      lines.push(`${name} ${value}`);
    }

    for (const [rawName, values] of this.histograms) {
      const name = sanitize(rawName);
      lines.push(`# TYPE ${name} histogram`);
      for (const le of [100, 500, 1000, 5000]) {
        lines.push(`${name}_bucket{le="${le}"} ${values.filter(v => v <= le).length}`);
      }
      // A histogram must end with a +Inf bucket equal to the total count
      lines.push(`${name}_bucket{le="+Inf"} ${values.length}`);
      lines.push(`${name}_sum ${values.reduce((a, b) => a + b, 0)}`);
      lines.push(`${name}_count ${values.length}`);
    }

    for (const [rawName, value] of this.gauges) {
      const name = sanitize(rawName);
      lines.push(`# TYPE ${name} gauge`);
      lines.push(`${name} ${value}`);
    }

    return lines.join('\n');
  }
}
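
The percentile method above uses the nearest-rank rule. As a worked example, the sketch below applies the same rule to five hypothetical latency samples; `nearestRank` is a standalone copy of that logic written for illustration.

```typescript
// Nearest-rank percentile: the smallest sample with at least p% of the data at or below it.
function nearestRank(values: number[], p: number): number {
  if (values.length === 0) return 0;
  const sorted = [...values].sort((a, b) => a - b);
  const index = Math.ceil((p / 100) * sorted.length) - 1;
  return sorted[Math.max(0, index)];
}

// Hypothetical latency samples in milliseconds
const latencies = [120, 80, 300, 95, 2000];

const p50 = nearestRank(latencies, 50); // rank ceil(0.50 * 5) = 3, the third smallest sample
const p99 = nearestRank(latencies, 99); // rank ceil(0.99 * 5) = 5, the largest sample
console.log({ p50, p99 });
```

With only five samples, P99 is simply the maximum, so a single slow call determines it. Tail percentiles only become meaningful once a window holds enough samples.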

Cost Tracking

class CostMetrics {
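  // USD per token, written as the price per million tokens divided by 1,000,000.
  // Illustrative values: check each provider's current price list before relying on them.
  private pricing: Record<string, { input: number; output: number }> = {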
    'claude-sonnet-4-20250514': { input: 3.0 / 1000000, output: 15.0 / 1000000 },
    'gpt-4o': { input: 2.5 / 1000000, output: 10.0 / 1000000 },
    'claude-haiku-4-5-20251001': { input: 0.8 / 1000000, output: 4.0 / 1000000 },
  };

  private costs: Map<string, number[]> = new Map();

  record(model: string, inputTokens: number, outputTokens: number): number {
    const price = this.pricing[model] || { input: 0, output: 0 };
    const cost = inputTokens * price.input + outputTokens * price.output;

    if (!this.costs.has(model)) {
      this.costs.set(model, []);
    }
    this.costs.get(model)!.push(cost);

    return cost;
  }

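  // Despite the name, this sums every cost recorded since the instance was created;
  // it equals a daily total only if the instance is reset once per day.
  getDailyCost(model?: string): number {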
    if (model) {
      return (this.costs.get(model) || []).reduce((a, b) => a + b, 0);
    }
    return Array.from(this.costs.values())
      .flat()
      .reduce((a, b) => a + b, 0);
  }

  getCostBreakdown(): Record<string, { total: number; count: number; avg: number }> {
    const breakdown: Record<string, { total: number; count: number; avg: number }> = {};

    for (const [model, costs] of this.costs) {
      const total = costs.reduce((a, b) => a + b, 0);
      breakdown[model] = {
        total,
        count: costs.length,
        avg: total / costs.length,
      };
    }

    return breakdown;
  }
}
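
As a worked example of the cost arithmetic, and of one way to get a true per-day figure, the sketch below stores a timestamp with each cost and sums only the entries inside a 24-hour window. The price, the names `CostEntry`, `estimateCost`, and `dailyTotal`, and the sample ledger are all assumptions made for illustration.

```typescript
interface CostEntry {
  timestamp: number; // epoch milliseconds
  cost: number; // USD
}

// Illustrative price in USD per million tokens (an assumption for this example)
const PRICE_PER_MTOK = { input: 3.0, output: 15.0 };

function estimateCost(inputTokens: number, outputTokens: number): number {
  return (inputTokens * PRICE_PER_MTOK.input + outputTokens * PRICE_PER_MTOK.output) / 1e6;
}

// Sum only the entries recorded within [dayStart, dayStart + 24h)
function dailyTotal(entries: CostEntry[], dayStart: number): number {
  const dayEnd = dayStart + 24 * 60 * 60 * 1000;
  return entries
    .filter((e) => e.timestamp >= dayStart && e.timestamp < dayEnd)
    .reduce((sum, e) => sum + e.cost, 0);
}

const dayStart = Date.UTC(2025, 0, 2); // 2025-01-02T00:00:00Z
const ledger: CostEntry[] = [
  // Recorded one millisecond before the day starts, so it is excluded
  { timestamp: dayStart - 1, cost: estimateCost(50000, 2000) },
  // 12,000 input tokens cost 0.036 USD and 800 output tokens cost 0.012 USD
  { timestamp: dayStart + 3600000, cost: estimateCost(12000, 800) },
];
console.log(dailyTotal(ledger, dayStart).toFixed(4));
```

Only the second entry falls inside the window, so the daily total is about 0.048 USD.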

Structured Logging

enum LogLevel {
  DEBUG = 0,
  INFO = 1,
  WARN = 2,
  ERROR = 3,
}

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  traceId?: string;
  spanId?: string;
  agentName?: string;
  userId?: string;
  metadata?: Record<string, any>;
  error?: {
    name: string;
    message: string;
    stack?: string;
  };
}

class AgentLogger {
  private minLevel: LogLevel;
  private buffer: LogEntry[] = [];
  private flushInterval: NodeJS.Timeout;

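  constructor(minLevel: LogLevel = LogLevel.INFO) {
    this.minLevel = minLevel;
    this.flushInterval = setInterval(() => this.flush(), 5000);
    // Do not let the flush timer keep the Node.js process alive on its own
    this.flushInterval.unref();
  }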

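  debug(message: string, metadata?: Record<string, any>): void {
    this.log(LogLevel.DEBUG, message, metadata);
  }

  info(message: string, metadata?: Record<string, any>): void {
    this.log(LogLevel.INFO, message, metadata);
  }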

  warn(message: string, metadata?: Record<string, any>): void {
    this.log(LogLevel.WARN, message, metadata);
  }

  error(message: string, error?: Error, metadata?: Record<string, any>): void {
    this.log(LogLevel.ERROR, message, {
      ...metadata,
      error: error ? {
        name: error.name,
        message: error.message,
        stack: error.stack,
      } : undefined,
    });
  }

  private log(level: LogLevel, message: string, metadata?: Record<string, any>): void {
    if (level < this.minLevel) return;

    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level: LogLevel[level],
      message,
      ...metadata,
    };

    this.buffer.push(entry);

    // Flush immediately at ERROR level and above
    if (level >= LogLevel.ERROR) {
      this.flush();
    }
  }

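  private flush(): void {
    if (this.buffer.length === 0) return;

    const entries = [...this.buffer];
    this.buffer = [];

    // Batch-write to the log collector (stdout stands in here)
    for (const entry of entries) {
      console.log(JSON.stringify(entry));
    }
  }

  // Stop the flush timer and write out anything still buffered (call on shutdown)
  close(): void {
    clearInterval(this.flushInterval);
    this.flush();
  }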
}
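
The traceId and spanId fields in LogEntry are what let you jump from a log line to its trace. A minimal sketch of binding them once per request follows. `createTraceLogger` and its injectable clock are hypothetical helpers, separate from the AgentLogger above.

```typescript
interface TraceContext {
  traceId: string;
  spanId: string;
}

type LogFn = (
  level: 'INFO' | 'WARN' | 'ERROR',
  message: string,
  metadata?: Record<string, unknown>,
) => string;

// Hypothetical helper: returns a log function that stamps every entry with the trace context.
function createTraceLogger(context: TraceContext, now: () => Date = () => new Date()): LogFn {
  return (level, message, metadata) =>
    JSON.stringify({
      timestamp: now().toISOString(),
      level,
      message,
      traceId: context.traceId,
      spanId: context.spanId,
      ...(metadata ? { metadata } : {}),
    });
}

const log = createTraceLogger(
  { traceId: 'trace-123', spanId: 'span-456' },
  () => new Date('2025-01-02T03:04:05.000Z'), // fixed clock so the example is reproducible
);
const line = log('WARN', 'tool call retried', { tool: 'search', attempt: 2 });
console.log(line);
```

Every line produced by `log` carries the same traceId, so filtering the log store by that ID returns the full story of one agent run.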

Intelligent Alerting

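// Payload delivered to alert handlers (used by AlertEngine but not defined in the original)
interface Alert {
  rule: string;
  severity: AlertRule['severity'];
  timestamp: number;
  metrics: Record<string, number>;
}

interface AlertRule {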
  name: string;
  condition: (metrics: AgentMetrics) => boolean;
  severity: 'low' | 'medium' | 'high' | 'critical';
  cooldownMs: number;
  lastTriggered?: number;
}

class AlertEngine {
  private rules: AlertRule[] = [];
  private handlers: Array<(alert: Alert) => void> = [];

  addRule(rule: AlertRule): void {
    this.rules.push(rule);
  }

  onAlert(handler: (alert: Alert) => void): void {
    this.handlers.push(handler);
  }

  evaluate(metrics: AgentMetrics): void {
    const now = Date.now();

    for (const rule of this.rules) {
      // Cooldown check
      if (rule.lastTriggered && now - rule.lastTriggered < rule.cooldownMs) {
        continue;
      }

      if (rule.condition(metrics)) {
        rule.lastTriggered = now;
        const alert: Alert = {
          rule: rule.name,
          severity: rule.severity,
          timestamp: now,
          metrics: this.snapshotMetrics(metrics),
        };

        for (const handler of this.handlers) {
          handler(alert);
        }
      }
    }
  }

  private snapshotMetrics(metrics: AgentMetrics): Record<string, number> {
    return {
      p50_latency: metrics.percentile('agent.calls.duration_ms', 50),
      p99_latency: metrics.percentile('agent.calls.duration_ms', 99),
      error_rate: this.calculateErrorRate(metrics),
    };
  }

  private calculateErrorRate(metrics: AgentMetrics): number {
    // 'agent.calls.total' is a counter (see recordAgentCall), not a histogram
    const total = metrics.counters.get('agent.calls.total') || 0;
    const errors = metrics.counters.get('agent.calls.errors') || 0;
    return total > 0 ? errors / total : 0;
  }
}

// Built-in alert rules
const defaultAlertRules: AlertRule[] = [
  {
    name: 'high_error_rate',
    condition: (m) => {
      // 'agent.calls.total' is a counter, so read it from counters
      const total = m.counters.get('agent.calls.total') || 0;
      const errors = m.counters.get('agent.calls.errors') || 0;
      return total > 10 && errors / total > 0.1;
    },
    severity: 'high',
    cooldownMs: 300000,
  },
  {
    name: 'high_latency_p99',
    condition: (m) => m.percentile('agent.calls.duration_ms', 99) > 30000,
    severity: 'medium',
    cooldownMs: 300000,
  },
  {
    name: 'llm_cost_spike',
    condition: (m) => {
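      // 'llm.tokens.total' is recorded as a histogram, so sum the two token counters instead
      const totalTokens =
        (m.counters.get('llm.tokens.input') || 0) + (m.counters.get('llm.tokens.output') || 0);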
      return totalTokens > 1000000;
    },
    severity: 'medium',
    cooldownMs: 3600000,
  },
];
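
To see what the cooldown buys you, the sketch below evaluates a rule once per minute for ten minutes while its condition stays true. `CooldownRule` and `tryFire` are simplified stand-ins written for this example, with the clock passed in so the run is deterministic.

```typescript
interface CooldownRule {
  name: string;
  cooldownMs: number;
  lastTriggered?: number;
}

// Returns true when the rule should fire at time `now`, and records the trigger time.
function tryFire(rule: CooldownRule, conditionMet: boolean, now: number): boolean {
  const lastTriggered = rule.lastTriggered;
  if (lastTriggered !== undefined && now - lastTriggered < rule.cooldownMs) return false;
  if (!conditionMet) return false;
  rule.lastTriggered = now;
  return true;
}

const rule: CooldownRule = { name: 'high_error_rate', cooldownMs: 300000 };

// Evaluate once per minute for ten minutes while the condition stays true.
const firedAtMinutes: number[] = [];
for (let minute = 0; minute < 10; minute++) {
  if (tryFire(rule, true, minute * 60000)) firedAtMinutes.push(minute);
}
console.log(firedAtMinutes);
```

With a five-minute cooldown the rule fires at minute 0 and again at minute 5, so ten evaluations produce two notifications instead of ten.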

Visualization Dashboard

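interface DashboardPanel {
  title: string;
  type: 'counter' | 'gauge';
  value: number;
  unit?: string;
}

interface DashboardData {
  panels: DashboardPanel[];
}

class DashboardBuilder {
  buildAgentOverview(metrics: AgentMetrics): DashboardData {
    // 'llm.tokens.total' is a histogram, so total usage is the sum of the two counters
    const inputTokens = metrics.counters.get('llm.tokens.input') || 0;
    const outputTokens = metrics.counters.get('llm.tokens.output') || 0;

    return {
      panels: [
        {
          title: 'Agent calls',
          type: 'counter',
          // 'agent.calls.total' is a counter, so read it from counters
          value: metrics.counters.get('agent.calls.total') || 0,
        },
        {
          title: 'P50 latency',
          type: 'gauge',
          value: metrics.percentile('agent.calls.duration_ms', 50),
          unit: 'ms',
        },
        {
          title: 'P99 latency',
          type: 'gauge',
          value: metrics.percentile('agent.calls.duration_ms', 99),
          unit: 'ms',
        },
        {
          title: 'Error rate',
          type: 'gauge',
          value: this.calculateErrorRate(metrics) * 100, // ratio converted to percent
          unit: '%',
        },
        {
          title: 'Token usage',
          type: 'counter',
          value: inputTokens + outputTokens,
        },
      ],
    };
  }

  // Fraction of agent calls that failed, in the range 0 to 1
  private calculateErrorRate(metrics: AgentMetrics): number {
    const total = metrics.counters.get('agent.calls.total') || 0;
    const errors = metrics.counters.get('agent.calls.errors') || 0;
    return total > 0 ? errors / total : 0;
  }
}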

Frequently Asked Questions (FAQ)

How much performance overhead does observability add?

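Typically around 2-5%, though the actual figure depends on sampling rate, payload size, and exporter design. The key is to export asynchronously in batches so that synchronous writes never block the main path.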
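
A minimal sketch of the batching half of that advice follows. `BatchExporter` is a hypothetical class, and its sink is synchronous here only to keep the example deterministic. A real exporter would hand the batch to a network call without awaiting it on the request path, and would also flush on a timer and at shutdown.

```typescript
// Hypothetical batching exporter: callers enqueue cheaply, the sink sees whole batches.
class BatchExporter<T> {
  private queue: T[] = [];

  constructor(
    private readonly sink: (batch: T[]) => void,
    private readonly maxBatchSize: number,
  ) {}

  // Cheap on the hot path; only triggers a flush when the batch is full.
  add(item: T): void {
    this.queue.push(item);
    if (this.queue.length >= this.maxBatchSize) this.flush();
  }

  // Hands the queued items to the sink in one call and resets the queue.
  flush(): void {
    if (this.queue.length === 0) return;
    const batch = this.queue;
    this.queue = [];
    this.sink(batch);
  }
}

const batches: string[][] = [];
const exporter = new BatchExporter<string>((batch) => {
  batches.push(batch);
}, 2);

for (const spanName of ['agent.support', 'llm.call', 'tool.search', 'llm.call', 'agent.done']) {
  exporter.add(spanName);
}
exporter.flush(); // drain the remainder, as a shutdown hook would
console.log(batches.map((b) => b.length));
```

Five spans reach the sink as three calls (two full batches and one remainder) rather than five separate writes.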

How do I choose a tracing backend?

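Use Jaeger at small scale and Tempo with Grafana at large scale. In cloud-native environments, route telemetry through an OpenTelemetry Collector, which is a vendor-neutral pipeline rather than a storage backend, so the backend can be swapped later.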

How do I keep agent log volume under control?

Filter by level, sample DEBUG logs on non-critical paths, and use structured logging to cut redundancy.
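
One way to sample DEBUG logs without leaving half-logged traces is to decide per trace ID rather than per line. The sketch below hashes the trace ID to a stable number and keeps DEBUG output only for a fixed fraction of traces. `shouldKeep`, the sample rate, and the choice of FNV-1a as the hash are assumptions made for illustration.

```typescript
type Level = 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';

// FNV-1a 32-bit hash over UTF-16 code units, used only to map a trace ID to a stable number.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0;
}

// Keep everything above DEBUG; keep DEBUG only for a stable fraction of traces.
function shouldKeep(level: Level, traceId: string, debugSampleRate: number): boolean {
  if (level !== 'DEBUG') return true;
  return fnv1a(traceId) / 0x100000000 < debugSampleRate;
}

// Hypothetical trace IDs; the rate of 0.1 aims to keep DEBUG logs for about 10% of traces.
const traceIds = Array.from({ length: 1000 }, (_, i) => `trace-${i}`);
const kept = traceIds.filter((id) => shouldKeep('DEBUG', id, 0.1)).length;
console.log(`kept DEBUG logs for ${kept} of ${traceIds.length} traces`);
```

Because the decision depends only on the trace ID, every DEBUG line of a sampled trace is kept and every line of an unsampled trace is dropped, while INFO and above always pass through.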

Summary

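Observability is a must-have capability for production-grade agent systems. Distributed tracing lets you see through the call chain, performance metrics quantify how the system behaves, structured logs pinpoint specific problems, and intelligent alerting surfaces anomalies promptly. Together, the three pillars and the alerting layer built on them serve as the "eyes" and "ears" of an agent system.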