Agent Security

Agent 安全纵深防御:认证、授权、审计与合规

构建企业级 AI Agent 安全体系,涵盖身份认证、权限控制、操作审计和合规治理的完整方案。

当 Agent 开始代表用户执行操作——读写数据库、调用 API、发送消息——安全就不再是”可选项”。一个没有完善安全机制的 Agent 系统,就像一个没有门禁的办公楼:任何人都能进,做什么都行。

安全架构总览

┌─────────────────────────────────────────────────┐
│              Agent Security Stack                │
│                                                  │
│  ┌──────────────────────────────────────────┐   │
│  │  Layer 1: 身份认证 (Authentication)      │   │
│  │  - Agent 身份    - 用户身份    - 服务身份  │   │
│  └──────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────┐   │
│  │  Layer 2: 权限控制 (Authorization)       │   │
│  │  - RBAC    - ABAC    - 工具级权限         │   │
│  └──────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────┐   │
│  │  Layer 3: 操作审计 (Audit)               │   │
│  │  - 全链路追踪  - 操作日志  - 异常检测     │   │
│  └──────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────┐   │
│  │  Layer 4: 合规治理 (Compliance)          │   │
│  │  - 数据脱敏  - 访问控制  - 策略执行       │   │
│  └──────────────────────────────────────────┘   │
└─────────────────────────────────────────────────┘

身份认证

Agent 身份

每个 Agent 都需要唯一的身份标识:

class AgentIdentity {
  private certificates: Map<string, Certificate> = new Map();

  async authenticate(agentId: string, credential: Credential): Promise<AuthResult> {
    const cert = this.certificates.get(agentId);
    if (!cert) {
      return { success: false, reason: 'Agent not registered' };
    }

    // 验证证书
    const valid = await this.verifyCertificate(cert, credential);
    if (!valid) {
      return { success: false, reason: 'Invalid credential' };
    }

    // 检查证书是否过期
    if (cert.expiresAt < Date.now()) {
      return { success: false, reason: 'Certificate expired' };
    }

    return {
      success: true,
      identity: {
        agentId,
        permissions: cert.permissions,
        capabilities: cert.capabilities,
      },
    };
  }

  async issueCertificate(agentId: string, config: CertificateConfig): Promise<Certificate> {
    const cert: Certificate = {
      agentId,
      publicKey: await this.generateKeyPair(),
      permissions: config.permissions,
      capabilities: config.capabilities,
      issuedAt: Date.now(),
      expiresAt: Date.now() + config.validityDays * 24 * 60 * 60 * 1000,
    };

    this.certificates.set(agentId, cert);
    return cert;
  }
}

用户身份传递

Agent 代表用户操作时,需要传递用户身份上下文:

class UserIdentityPropagation {
  async propagateUserContext(
    userId: string,
    agentId: string,
    action: string
  ): Promise<PropagatedContext> {
    // 获取用户权限
    const userPerms = await this.getUserPermissions(userId);

    // 获取 Agent 权限
    const agentPerms = await this.getAgentPermissions(agentId);

    // 取交集:Agent 只能使用用户和 Agent 都有的权限
    const effectivePerms = this.intersectPermissions(userPerms, agentPerms);

    return {
      userId,
      agentId,
      permissions: effectivePerms,
      constraints: {
        maxDuration: 3600,
        allowedResources: this.getAllowedResources(userId, agentId),
      },
    };
  }
}

权限控制

RBAC(基于角色的访问控制)

class RBACManager {
  private roles: Map<string, Role> = new Map();
  private assignments: Map<string, string[]> = new Map(); // agentId → roleIds

  assignRole(agentId: string, roleId: string): void {
    const roles = this.assignments.get(agentId) || [];
    if (!roles.includes(roleId)) {
      roles.push(roleId);
      this.assignments.set(agentId, roles);
    }
  }

  checkPermission(agentId: string, resource: string, action: string): boolean {
    const roleIds = this.assignments.get(agentId) || [];

    for (const roleId of roleIds) {
      const role = this.roles.get(roleId);
      if (!role) continue;

      const permission = `${resource}:${action}`;
      if (role.permissions.includes(permission) || role.permissions.includes(`${resource}:*`)) {
        return true;
      }
    }

    return false;
  }
}

ABAC(基于属性的访问控制)

class ABACManager {
  private policies: Policy[] = [];

  evaluate(request: AccessRequest): AccessDecision {
    for (const policy of this.policies) {
      if (this.matches(policy, request)) {
        return policy.effect; // 'allow' or 'deny'
      }
    }

    return 'deny'; // 默认拒绝
  }

  private matches(policy: Policy, request: AccessRequest): boolean {
    // 检查主体属性
    if (!this.matchAttributes(policy.subject, request.subject)) {
      return false;
    }

    // 检查资源属性
    if (!this.matchAttributes(policy.resource, request.resource)) {
      return false;
    }

    // 检查环境属性
    if (!this.matchAttributes(policy.environment, request.environment)) {
      return false;
    }

    return true;
  }
}

// 策略示例
const policies: Policy[] = [
  {
    name: 'allow-read-during-business-hours',
    subject: { role: 'data-analyst' },
    resource: { type: 'database', classification: 'public' },
    environment: { timeRange: '09:00-18:00' },
    effect: 'allow',
  },
  {
    name: 'deny-write-production',
    subject: { role: 'developer' },
    resource: { type: 'database', environment: 'production' },
    effect: 'deny',
  },
];

工具级权限

class ToolPermissionManager {
  private toolPolicies: Map<string, ToolPolicy> = new Map();

  async canExecute(
    agentId: string,
    toolName: string,
    args: any
  ): Promise<PermissionResult> {
    const policy = this.toolPolicies.get(toolName);
    if (!policy) {
      return { allowed: false, reason: 'No policy defined for tool' };
    }

    // 检查 Agent 是否有权限使用此工具
    if (!policy.allowedAgents.includes(agentId) && policy.allowedAgents.length > 0) {
      return { allowed: false, reason: 'Agent not authorized for this tool' };
    }

    // 检查参数是否合规
    const argsValid = await this.validateArgs(toolName, args, policy);
    if (!argsValid) {
      return { allowed: false, reason: 'Invalid arguments' };
    }

    // 检查速率限制
    const rateOk = await this.checkRateLimit(agentId, toolName);
    if (!rateOk) {
      return { allowed: false, reason: 'Rate limit exceeded' };
    }

    return { allowed: true };
  }

  private async validateArgs(toolName: string, args: any, policy: ToolPolicy): Promise<boolean> {
    // 验证参数模式
    if (policy.argsSchema) {
      return this.schemaValidate(args, policy.argsSchema);
    }

    // 检查敏感参数
    for (const sensitive of policy.sensitiveArgs || []) {
      if (args[sensitive] && !this.isEncrypted(args[sensitive])) {
        return false;
      }
    }

    return true;
  }
}

操作审计

class AuditSystem {
  private logStore: AuditLogStore;
  private anomalyDetector: AnomalyDetector;

  async log(entry: AuditEntry): Promise<void> {
    // 记录审计日志
    await this.logStore.store({
      ...entry,
      timestamp: Date.now(),
      hash: await this.computeHash(entry),
    });

    // 实时异常检测
    const anomaly = await this.anomalyDetector.check(entry);
    if (anomaly) {
      await this.alertSecurity(anomaly);
    }
  }

  async query(filter: AuditFilter): Promise<AuditEntry[]> {
    return await this.logStore.query(filter);
  }

  async generateReport(timeRange: TimeRange): Promise<SecurityReport> {
    const entries = await this.query({ timeRange });

    return {
      totalActions: entries.length,
      byAgent: this.groupBy(entries, 'agentId'),
      byAction: this.groupBy(entries, 'action'),
      anomalies: entries.filter(e => e.flagged),
      topRiskActions: this.getTopRiskActions(entries),
    };
  }
}

class AnomalyDetector {
  private baseline: BehaviorBaseline;

  async check(entry: AuditEntry): Promise<Anomaly | null> {
    // 检查异常访问模式
    if (this.isUnusualTime(entry)) {
      return { type: 'unusual_time', severity: 'medium', entry };
    }

    // 检查异常频率
    if (await this.isHighFrequency(entry)) {
      return { type: 'high_frequency', severity: 'high', entry };
    }

    // 检查异常资源访问
    if (await this.isUnusualResource(entry)) {
      return { type: 'unusual_resource', severity: 'medium', entry };
    }

    return null;
  }
}

数据安全

数据脱敏

class DataMasker {
  private rules: MaskingRule[] = [
    { pattern: /\b\d{3}-\d{2}-\d{4}\b/, replacement: '***-**-****', type: 'ssn' },
    { pattern: /\b\d{16}\b/, replacement: '****-****-****-****', type: 'credit_card' },
    { pattern: /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/, replacement: '[EMAIL]', type: 'email' },
    { pattern: /\b1[3-9]\d{9}\b/, replacement: '[PHONE]', type: 'phone' },
  ];

  mask(data: any, context: MaskingContext): any {
    if (typeof data === 'string') {
      return this.maskString(data, context);
    }

    if (Array.isArray(data)) {
      return data.map(item => this.mask(item, context));
    }

    if (typeof data === 'object' && data !== null) {
      const masked: any = {};
      for (const [key, value] of Object.entries(data)) {
        // 根据字段名判断是否需要脱敏
        if (this.isSensitiveField(key, context)) {
          masked[key] = this.maskValue(value, context);
        } else {
          masked[key] = this.mask(value, context);
        }
      }
      return masked;
    }

    return data;
  }

  private maskString(text: string, context: MaskingContext): string {
    let result = text;
    for (const rule of this.rules) {
      if (context.allowedTypes?.includes(rule.type)) continue;
      result = result.replace(rule.pattern, rule.replacement);
    }
    return result;
  }
}

最小权限原则

class LeastPrivilegeEnforcer {
  async enforce(agentId: string, action: Action): Promise<void> {
    // 获取 Agent 当前权限
    const currentPerms = await this.getAgentPermissions(agentId);

    // 计算完成任务所需的最小权限
    const requiredPerms = await this.calculateMinimumPermissions(action);

    // 如果当前权限超出最小权限,降级
    const excess = this.findExcessPermissions(currentPerms, requiredPerms);
    if (excess.length > 0) {
      await this.revokePermissions(agentId, excess);
      await this.audit.log({
        agentId,
        action: 'permission_downgrade',
        revoked: excess,
        reason: 'Least privilege enforcement',
      });
    }
  }
}

合规框架

class ComplianceEngine {
  private frameworks: ComplianceFramework[] = [];

  async checkCompliance(agentId: string): Promise<ComplianceReport> {
    const results: ComplianceCheck[] = [];

    for (const framework of this.frameworks) {
      const checks = await framework.evaluate(agentId);
      results.push(...checks);
    }

    return {
      agentId,
      timestamp: Date.now(),
      checks: results,
      passed: results.every(r => r.status === 'pass'),
      score: this.calculateScore(results),
    };
  }
}

// GDPR 合规检查
class GDPRCompliance implements ComplianceFramework {
  async evaluate(agentId: string): Promise<ComplianceCheck[]> {
    return [
      {
        check: 'data_minimization',
        description: 'Agent 只收集必要数据',
        status: await this.checkDataMinimization(agentId),
      },
      {
        check: 'purpose_limitation',
        description: '数据使用符合声明目的',
        status: await this.checkPurposeLimitation(agentId),
      },
      {
        check: 'right_to_erasure',
        description: '支持数据删除请求',
        status: await this.checkErasureCapability(agentId),
      },
    ];
  }
}

常见问题(FAQ)

Agent 安全和传统应用安全有什么区别?

Agent 安全需要额外考虑:Prompt Injection 防御、工具调用权限、LLM 输出验证、用户意图与 Agent 行为的一致性。

如何平衡安全性和易用性?

使用渐进式安全:低风险操作宽松,高风险操作严格。通过信任评分动态调整安全策略。

安全审计会影响性能吗?

异步审计可以将影响降到 1-2%。关键路径使用同步审计,非关键路径使用异步批量审计。

总结

Agent 安全需要纵深防御策略。从身份认证到权限控制,从操作审计到合规治理,每一层都提供独立的安全保障。安全不是一次性工程,而是需要持续演进的体系。