Agent 安全纵深防御:认证、授权、审计与合规
构建企业级 AI Agent 安全体系,涵盖身份认证、权限控制、操作审计和合规治理的完整方案。
当 Agent 开始代表用户执行操作——读写数据库、调用 API、发送消息——安全就不再是”可选项”。一个没有完善安全机制的 Agent 系统,就像一个没有门禁的办公楼:任何人都能进,做什么都行。
安全架构总览
┌─────────────────────────────────────────────────┐
│ Agent Security Stack │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Layer 1: 身份认证 (Authentication) │ │
│ │ - Agent 身份 - 用户身份 - 服务身份 │ │
│ └──────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────┐ │
│ │ Layer 2: 权限控制 (Authorization) │ │
│ │ - RBAC - ABAC - 工具级权限 │ │
│ └──────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────┐ │
│ │ Layer 3: 操作审计 (Audit) │ │
│ │ - 全链路追踪 - 操作日志 - 异常检测 │ │
│ └──────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────┐ │
│ │ Layer 4: 合规治理 (Compliance) │ │
│ │ - 数据脱敏 - 访问控制 - 策略执行 │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
身份认证
Agent 身份
每个 Agent 都需要唯一的身份标识:
class AgentIdentity {
private certificates: Map<string, Certificate> = new Map();
async authenticate(agentId: string, credential: Credential): Promise<AuthResult> {
const cert = this.certificates.get(agentId);
if (!cert) {
return { success: false, reason: 'Agent not registered' };
}
// 验证证书
const valid = await this.verifyCertificate(cert, credential);
if (!valid) {
return { success: false, reason: 'Invalid credential' };
}
// 检查证书是否过期
if (cert.expiresAt < Date.now()) {
return { success: false, reason: 'Certificate expired' };
}
return {
success: true,
identity: {
agentId,
permissions: cert.permissions,
capabilities: cert.capabilities,
},
};
}
async issueCertificate(agentId: string, config: CertificateConfig): Promise<Certificate> {
const cert: Certificate = {
agentId,
publicKey: await this.generateKeyPair(),
permissions: config.permissions,
capabilities: config.capabilities,
issuedAt: Date.now(),
expiresAt: Date.now() + config.validityDays * 24 * 60 * 60 * 1000,
};
this.certificates.set(agentId, cert);
return cert;
}
}
用户身份传递
Agent 代表用户操作时,需要传递用户身份上下文:
class UserIdentityPropagation {
async propagateUserContext(
userId: string,
agentId: string,
action: string
): Promise<PropagatedContext> {
// 获取用户权限
const userPerms = await this.getUserPermissions(userId);
// 获取 Agent 权限
const agentPerms = await this.getAgentPermissions(agentId);
// 取交集:Agent 只能使用用户和 Agent 都有的权限
const effectivePerms = this.intersectPermissions(userPerms, agentPerms);
return {
userId,
agentId,
permissions: effectivePerms,
constraints: {
maxDuration: 3600,
allowedResources: this.getAllowedResources(userId, agentId),
},
};
}
}
权限控制
RBAC(基于角色的访问控制)
class RBACManager {
private roles: Map<string, Role> = new Map();
private assignments: Map<string, string[]> = new Map(); // agentId → roleIds
assignRole(agentId: string, roleId: string): void {
const roles = this.assignments.get(agentId) || [];
if (!roles.includes(roleId)) {
roles.push(roleId);
this.assignments.set(agentId, roles);
}
}
checkPermission(agentId: string, resource: string, action: string): boolean {
const roleIds = this.assignments.get(agentId) || [];
for (const roleId of roleIds) {
const role = this.roles.get(roleId);
if (!role) continue;
const permission = `${resource}:${action}`;
if (role.permissions.includes(permission) || role.permissions.includes(`${resource}:*`)) {
return true;
}
}
return false;
}
}
ABAC(基于属性的访问控制)
class ABACManager {
private policies: Policy[] = [];
evaluate(request: AccessRequest): AccessDecision {
for (const policy of this.policies) {
if (this.matches(policy, request)) {
return policy.effect; // 'allow' or 'deny'
}
}
return 'deny'; // 默认拒绝
}
private matches(policy: Policy, request: AccessRequest): boolean {
// 检查主体属性
if (!this.matchAttributes(policy.subject, request.subject)) {
return false;
}
// 检查资源属性
if (!this.matchAttributes(policy.resource, request.resource)) {
return false;
}
// 检查环境属性
if (!this.matchAttributes(policy.environment, request.environment)) {
return false;
}
return true;
}
}
// 策略示例
const policies: Policy[] = [
{
name: 'allow-read-during-business-hours',
subject: { role: 'data-analyst' },
resource: { type: 'database', classification: 'public' },
environment: { timeRange: '09:00-18:00' },
effect: 'allow',
},
{
name: 'deny-write-production',
subject: { role: 'developer' },
resource: { type: 'database', environment: 'production' },
effect: 'deny',
},
];
工具级权限
class ToolPermissionManager {
private toolPolicies: Map<string, ToolPolicy> = new Map();
async canExecute(
agentId: string,
toolName: string,
args: any
): Promise<PermissionResult> {
const policy = this.toolPolicies.get(toolName);
if (!policy) {
return { allowed: false, reason: 'No policy defined for tool' };
}
// 检查 Agent 是否有权限使用此工具
if (!policy.allowedAgents.includes(agentId) && policy.allowedAgents.length > 0) {
return { allowed: false, reason: 'Agent not authorized for this tool' };
}
// 检查参数是否合规
const argsValid = await this.validateArgs(toolName, args, policy);
if (!argsValid) {
return { allowed: false, reason: 'Invalid arguments' };
}
// 检查速率限制
const rateOk = await this.checkRateLimit(agentId, toolName);
if (!rateOk) {
return { allowed: false, reason: 'Rate limit exceeded' };
}
return { allowed: true };
}
private async validateArgs(toolName: string, args: any, policy: ToolPolicy): Promise<boolean> {
// 验证参数模式
if (policy.argsSchema) {
return this.schemaValidate(args, policy.argsSchema);
}
// 检查敏感参数
for (const sensitive of policy.sensitiveArgs || []) {
if (args[sensitive] && !this.isEncrypted(args[sensitive])) {
return false;
}
}
return true;
}
}
操作审计
class AuditSystem {
private logStore: AuditLogStore;
private anomalyDetector: AnomalyDetector;
async log(entry: AuditEntry): Promise<void> {
// 记录审计日志
await this.logStore.store({
...entry,
timestamp: Date.now(),
hash: await this.computeHash(entry),
});
// 实时异常检测
const anomaly = await this.anomalyDetector.check(entry);
if (anomaly) {
await this.alertSecurity(anomaly);
}
}
async query(filter: AuditFilter): Promise<AuditEntry[]> {
return await this.logStore.query(filter);
}
async generateReport(timeRange: TimeRange): Promise<SecurityReport> {
const entries = await this.query({ timeRange });
return {
totalActions: entries.length,
byAgent: this.groupBy(entries, 'agentId'),
byAction: this.groupBy(entries, 'action'),
anomalies: entries.filter(e => e.flagged),
topRiskActions: this.getTopRiskActions(entries),
};
}
}
class AnomalyDetector {
private baseline: BehaviorBaseline;
async check(entry: AuditEntry): Promise<Anomaly | null> {
// 检查异常访问模式
if (this.isUnusualTime(entry)) {
return { type: 'unusual_time', severity: 'medium', entry };
}
// 检查异常频率
if (await this.isHighFrequency(entry)) {
return { type: 'high_frequency', severity: 'high', entry };
}
// 检查异常资源访问
if (await this.isUnusualResource(entry)) {
return { type: 'unusual_resource', severity: 'medium', entry };
}
return null;
}
}
数据安全
数据脱敏
class DataMasker {
private rules: MaskingRule[] = [
{ pattern: /\b\d{3}-\d{2}-\d{4}\b/, replacement: '***-**-****', type: 'ssn' },
{ pattern: /\b\d{16}\b/, replacement: '****-****-****-****', type: 'credit_card' },
{ pattern: /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/, replacement: '[EMAIL]', type: 'email' },
{ pattern: /\b1[3-9]\d{9}\b/, replacement: '[PHONE]', type: 'phone' },
];
mask(data: any, context: MaskingContext): any {
if (typeof data === 'string') {
return this.maskString(data, context);
}
if (Array.isArray(data)) {
return data.map(item => this.mask(item, context));
}
if (typeof data === 'object' && data !== null) {
const masked: any = {};
for (const [key, value] of Object.entries(data)) {
// 根据字段名判断是否需要脱敏
if (this.isSensitiveField(key, context)) {
masked[key] = this.maskValue(value, context);
} else {
masked[key] = this.mask(value, context);
}
}
return masked;
}
return data;
}
private maskString(text: string, context: MaskingContext): string {
let result = text;
for (const rule of this.rules) {
if (context.allowedTypes?.includes(rule.type)) continue;
result = result.replace(rule.pattern, rule.replacement);
}
return result;
}
}
最小权限原则
class LeastPrivilegeEnforcer {
async enforce(agentId: string, action: Action): Promise<void> {
// 获取 Agent 当前权限
const currentPerms = await this.getAgentPermissions(agentId);
// 计算完成任务所需的最小权限
const requiredPerms = await this.calculateMinimumPermissions(action);
// 如果当前权限超出最小权限,降级
const excess = this.findExcessPermissions(currentPerms, requiredPerms);
if (excess.length > 0) {
await this.revokePermissions(agentId, excess);
await this.audit.log({
agentId,
action: 'permission_downgrade',
revoked: excess,
reason: 'Least privilege enforcement',
});
}
}
}
合规框架
class ComplianceEngine {
private frameworks: ComplianceFramework[] = [];
async checkCompliance(agentId: string): Promise<ComplianceReport> {
const results: ComplianceCheck[] = [];
for (const framework of this.frameworks) {
const checks = await framework.evaluate(agentId);
results.push(...checks);
}
return {
agentId,
timestamp: Date.now(),
checks: results,
passed: results.every(r => r.status === 'pass'),
score: this.calculateScore(results),
};
}
}
// GDPR 合规检查
class GDPRCompliance implements ComplianceFramework {
async evaluate(agentId: string): Promise<ComplianceCheck[]> {
return [
{
check: 'data_minimization',
description: 'Agent 只收集必要数据',
status: await this.checkDataMinimization(agentId),
},
{
check: 'purpose_limitation',
description: '数据使用符合声明目的',
status: await this.checkPurposeLimitation(agentId),
},
{
check: 'right_to_erasure',
description: '支持数据删除请求',
status: await this.checkErasureCapability(agentId),
},
];
}
}
常见问题(FAQ)
Agent 安全和传统应用安全有什么区别?
Agent 安全需要额外考虑:Prompt Injection 防御、工具调用权限、LLM 输出验证、用户意图与 Agent 行为的一致性。
如何平衡安全性和易用性?
使用渐进式安全:低风险操作宽松,高风险操作严格。通过信任评分动态调整安全策略。
安全审计会影响性能吗?
异步审计可以将影响降到 1-2%。关键路径使用同步审计,非关键路径使用异步批量审计。
总结
Agent 安全需要纵深防御策略。从身份认证到权限控制,从操作审计到合规治理,每一层都提供独立的安全保障。安全不是一次性工程,而是需要持续演进的体系。