
AI Workflow Engine Architecture: Building a Reliable Agent Task Orchestration System

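An in-depth look at the architecture of an AI Workflow Engine, including a complete implementation approach for task orchestration, state management, error handling, and visualization.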

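When an Agent has to carry out a complex multi-step task, a simple ReAct loop is no longer enough. You need a Workflow Engine to define, execute, and manage complex workflows, including conditional branching, parallel execution, error handling, and human approval.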

Core Concepts of a Workflow Engine

Workflow Definition

Use a DAG (directed acyclic graph) to describe the execution order of tasks and the dependencies between them:

interface WorkflowDefinition {
  id: string;
  name: string;
  nodes: WorkflowNode[];
  edges: WorkflowEdge[];
}

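interface WorkflowNode {
  id: string;
  type: 'agent' | 'tool' | 'condition' | 'human-review';
  config: Record<string, any>; // type-specific settings; config.name is used as a display label
}

interface WorkflowEdge {
  from: string;       // id of the upstream node
  to: string;         // id of the downstream node
  condition?: string; // optional expression; the edge is followed only when it evaluates to true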
}
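To make the data model concrete, here is a hypothetical definition for a small research-report workflow (the node ids, names, and config fields are illustrative only), plus an assumed `validateWorkflow` helper that catches structural mistakes, such as an edge pointing at a nonexistent node, before anything runs. The interfaces are repeated so the snippet compiles on its own.

```typescript
interface WorkflowNode {
  id: string;
  type: 'agent' | 'tool' | 'condition' | 'human-review';
  config: Record<string, any>;
}

interface WorkflowEdge {
  from: string;
  to: string;
  condition?: string;
}

interface WorkflowDefinition {
  id: string;
  name: string;
  nodes: WorkflowNode[];
  edges: WorkflowEdge[];
}

// Hypothetical example: search once, analyze the results along two
// independent branches, merge them into a report, then ask a human to approve.
const reportWorkflow: WorkflowDefinition = {
  id: 'research-report',
  name: 'Research report',
  nodes: [
    { id: 'search', type: 'tool', config: { name: 'Web search' } },
    { id: 'summarize', type: 'agent', config: { name: 'Summarize sources' } },
    { id: 'extract', type: 'agent', config: { name: 'Extract key figures' } },
    { id: 'write', type: 'agent', config: { name: 'Write report' } },
    { id: 'approve', type: 'human-review', config: { name: 'Editor approval' } },
  ],
  edges: [
    { from: 'search', to: 'summarize' },
    { from: 'search', to: 'extract' },
    { from: 'summarize', to: 'write' },
    { from: 'extract', to: 'write' },
    { from: 'write', to: 'approve' },
  ],
};

// Structural checks only; returns a list of problems (empty means valid).
// Cycle detection is left to the topological sort step.
function validateWorkflow(workflow: WorkflowDefinition): string[] {
  const errors: string[] = [];
  const ids = new Set<string>();
  for (const node of workflow.nodes) {
    if (ids.has(node.id)) errors.push(`Duplicate node id: ${node.id}`);
    ids.add(node.id);
  }
  for (const edge of workflow.edges) {
    if (!ids.has(edge.from)) errors.push(`Edge source not found: ${edge.from}`);
    if (!ids.has(edge.to)) errors.push(`Edge target not found: ${edge.to}`);
    if (edge.from === edge.to) errors.push(`Self-loop on node: ${edge.from}`);
  }
  return errors;
}
```

Running the validator when a definition is registered, rather than when it executes, surfaces mistakes before any Agent or tool call is made.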

Execution Engine

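class WorkflowEngine {
  private workflows: Map<string, WorkflowDefinition> = new Map();
  private executions: Map<string, ExecutionContext> = new Map();

  // Register a workflow definition so that execute() can find it by id
  register(workflow: WorkflowDefinition): void {
    this.workflows.set(workflow.id, workflow);
  }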

  async execute(workflowId: string, input: any): Promise<ExecutionResult> {
    const workflow = this.workflows.get(workflowId);
    if (!workflow) throw new Error(`Workflow ${workflowId} not found`);

    const executionId = generateId();
    const context: ExecutionContext = {
      id: executionId,
      workflowId,
      status: 'running',
      nodeStates: new Map(),
      input,
      output: null,
      startedAt: Date.now(),
    };

    this.executions.set(executionId, context);

    try {
      const result = await this.runWorkflow(workflow, context);
      context.status = 'completed';
      context.output = result;
      return result;
    } catch (error) {
      context.status = 'failed';
      // catch variables are `unknown` under strict TypeScript
      context.error = error instanceof Error ? error.message : String(error);
      throw error;
    }
  }

  private async runWorkflow(
    workflow: WorkflowDefinition,
    context: ExecutionContext
  ): Promise<any> {
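    // Topological sort determines the execution order
    const order = this.topologicalSort(workflow);

    let lastOutput: any = context.input;

    for (const nodeId of order) {
      const node = workflow.nodes.find(n => n.id === nodeId)!;
      const incomingEdges = workflow.edges.filter(e => e.to === nodeId);

      // Only edges whose source node actually completed count; edges coming
      // from skipped branches are ignored. Entry nodes always run.
      const liveEdges = incomingEdges.filter(
        edge => context.nodeStates.get(edge.from)?.status === 'completed'
      );

      // Check the conditions on the live edges
      const shouldExecute =
        incomingEdges.length === 0 ||
        (liveEdges.length > 0 &&
          liveEdges.every(edge =>
            !edge.condition || this.evaluateCondition(edge.condition, context)
          ));

      if (!shouldExecute) {
        // Record the skip so that nodes downstream of this branch are skipped too
        context.nodeStates.set(nodeId, { status: 'skipped', output: null });
        continue;
      }

      // Feed each node its predecessors' outputs rather than the output of
      // whichever node happened to run last (wrong for parallel branches)
      const input =
        liveEdges.length === 0
          ? context.input
          : liveEdges.length === 1
            ? context.nodeStates.get(liveEdges[0].from)!.output
            : Object.fromEntries(
                liveEdges.map(e => [e.from, context.nodeStates.get(e.from)!.output])
              );

      // Execute the node and record its output
      lastOutput = await this.executeNode(node, input, context);
      context.nodeStates.set(nodeId, { status: 'completed', output: lastOutput });
    }

    // The output of the last executed node is the workflow result
    return lastOutput;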
  }

  private async executeNode(
    node: WorkflowNode,
    input: any,
    context: ExecutionContext
  ): Promise<any> {
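    // Dispatch on node type. The per-type handlers (executeAgentNode,
    // executeToolNode, evaluateConditionNode, waitForHumanReview) are
    // application-specific and not shown here.
    switch (node.type) {
      case 'agent':
        return await this.executeAgentNode(node, input);
      case 'tool':
        return await this.executeToolNode(node, input);
      case 'condition':
        return await this.evaluateConditionNode(node, input);
      case 'human-review':
        return await this.waitForHumanReview(node, input, context);
      default:
        // Fail loudly rather than silently returning undefined
        throw new Error(`Unknown node type: ${node.type}`);
    }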
  }
}
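The engine calls `this.topologicalSort(workflow)` without showing it. Below is a minimal standalone sketch using Kahn's algorithm, which also rejects cycles, so a graph that is not actually a DAG fails before any node runs. The node and edge types are trimmed to the fields the sort needs; inside the engine, the method could delegate with `topologicalSort(workflow.nodes, workflow.edges)`.

```typescript
interface GraphNode {
  id: string;
}

interface GraphEdge {
  from: string;
  to: string;
}

// Kahn's algorithm: repeatedly emit nodes whose dependencies have all been
// emitted. If some nodes are never emitted, the graph contains a cycle.
function topologicalSort(nodes: GraphNode[], edges: GraphEdge[]): string[] {
  const inDegree = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const node of nodes) {
    inDegree.set(node.id, 0);
    successors.set(node.id, []);
  }
  for (const edge of edges) {
    if (!inDegree.has(edge.from) || !inDegree.has(edge.to)) {
      throw new Error(`Edge ${edge.from} -> ${edge.to} references an unknown node`);
    }
    successors.get(edge.from)!.push(edge.to);
    inDegree.set(edge.to, inDegree.get(edge.to)! + 1);
  }

  // Start from entry nodes, keeping definition order for determinism
  const queue = nodes.filter(n => inDegree.get(n.id) === 0).map(n => n.id);
  const order: string[] = [];
  while (queue.length > 0) {
    const id = queue.shift()!;
    order.push(id);
    for (const next of successors.get(id)!) {
      const remaining = inDegree.get(next)! - 1;
      inDegree.set(next, remaining);
      if (remaining === 0) queue.push(next);
    }
  }

  if (order.length !== nodes.length) {
    throw new Error('Workflow graph contains a cycle');
  }
  return order;
}
```

For a diamond `a -> b, a -> c, b -> d, c -> d`, this yields `a, b, c, d`: `d` is emitted only after both of its predecessors.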

Parallel Execution

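// Runs a single node; injected so ParallelExecutor stays independent of the engine
type NodeRunner = (node: WorkflowNode, input: any) => Promise<any>;

class ParallelExecutor {
  constructor(private readonly runNode: NodeRunner) {}

  // Runs independent nodes concurrently. allSettled waits for every node even
  // if some fail, and each entry records either the node's value or its error,
  // so failures are not silently dropped from the result map.
  async executeParallel(
    nodes: WorkflowNode[],
    input: any
  ): Promise<Map<string, PromiseSettledResult<any>>> {
    const settled = await Promise.allSettled(
      nodes.map(node => this.runNode(node, input))
    );

    const results = new Map<string, PromiseSettledResult<any>>();
    nodes.forEach((node, i) => results.set(node.id, settled[i]));
    return results;
  }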
}
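`executeParallel` needs a set of nodes that are safe to run at the same time. One way to derive them, sketched here under the assumption that the graph is a DAG, is to group nodes into "waves": each wave contains the nodes whose dependencies all finished in earlier waves. `parallelWaves` is a hypothetical helper, not part of the engine above.

```typescript
interface Edge {
  from: string;
  to: string;
}

// Groups DAG nodes into "waves": every node in a wave depends only on nodes
// from earlier waves, so all nodes within one wave may run concurrently.
function parallelWaves(nodeIds: string[], edges: Edge[]): string[][] {
  // For each node, the set of nodes it depends on
  const deps = new Map<string, Set<string>>();
  for (const id of nodeIds) deps.set(id, new Set<string>());
  for (const edge of edges) {
    const targetDeps = deps.get(edge.to);
    if (!targetDeps || !deps.has(edge.from)) {
      throw new Error(`Edge ${edge.from} -> ${edge.to} references an unknown node`);
    }
    targetDeps.add(edge.from);
  }

  const done = new Set<string>();
  const waves: string[][] = [];
  while (done.size < nodeIds.length) {
    // Runnable now: not yet scheduled and every dependency already done
    const wave = nodeIds.filter(
      id => !done.has(id) && Array.from(deps.get(id)!).every(dep => done.has(dep))
    );
    if (wave.length === 0) throw new Error('Cycle detected: no runnable nodes left');
    wave.forEach(id => done.add(id));
    waves.push(wave);
  }
  return waves;
}
```

Each wave could then be passed to `executeParallel`, waiting for the whole wave to settle before starting the next. This is simpler than fully event-driven scheduling, but it can leave some concurrency unused when branches have very different durations.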

Conditional Branching

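class ConditionEvaluator {
  // Simple expression evaluation. new Function runs arbitrary JavaScript,
  // so conditions must come only from trusted workflow definitions.
  evaluate(condition: string, context: ExecutionContext): boolean {
    try {
      // Build the function inside try so that syntax errors in the
      // condition string also evaluate to false instead of throwing
      const func = new Function('context', `return (${condition});`);
      return Boolean(func(context));
    } catch {
      return false;
    }
  }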
}
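Because `new Function` runs arbitrary JavaScript, conditions stored in user-editable definitions become a code-injection risk. A common alternative is a small declarative condition format that is interpreted without evaluating code. The sketch below assumes a hypothetical `{ path, op, value }` shape and a context whose fields are reached by dotted paths; it is one possible design, not part of the engine above.

```typescript
// Hypothetical declarative condition: compare the value at a dotted path
// inside the context against a constant. No code is executed.
type Operator = 'eq' | 'neq' | 'gt' | 'gte' | 'lt' | 'lte' | 'exists';

interface DeclarativeCondition {
  path: string; // e.g. "outputs.draft.confidence" (illustrative shape)
  op: Operator;
  value?: unknown;
}

// Walk a dotted path such as "a.b.c"; returns undefined if any step is missing
function getPath(obj: unknown, path: string): unknown {
  let current: unknown = obj;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Ordering comparisons only apply when both sides are numbers
function compareNumbers(a: unknown, b: unknown, cmp: (x: number, y: number) => boolean): boolean {
  return typeof a === 'number' && typeof b === 'number' && cmp(a, b);
}

function evaluateDeclarative(cond: DeclarativeCondition, context: unknown): boolean {
  const actual = getPath(context, cond.path);
  switch (cond.op) {
    case 'exists': return actual !== undefined;
    case 'eq': return actual === cond.value;
    case 'neq': return actual !== cond.value;
    case 'gt': return compareNumbers(actual, cond.value, (x, y) => x > y);
    case 'gte': return compareNumbers(actual, cond.value, (x, y) => x >= y);
    case 'lt': return compareNumbers(actual, cond.value, (x, y) => x < y);
    case 'lte': return compareNumbers(actual, cond.value, (x, y) => x <= y);
  }
}
```

The trade-off is expressiveness: anything beyond these operators (boolean combinations, arithmetic) has to be added to the format explicitly, which is also what keeps it safe.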

Error Handling

Retry Strategy

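// Retry settings
interface RetryConfig {
  maxRetries: number; // retries after the first attempt
  backoffMs: number;  // base delay; doubled after each failed attempt
}

class RetryStrategy {
  async executeWithRetry<T>(
    fn: () => Promise<T>,
    config: RetryConfig
  ): Promise<T> {
    let lastError: Error | null = null;

    // Up to maxRetries + 1 attempts in total
    for (let attempt = 0; attempt <= config.maxRetries; attempt++) {
      try {
        return await fn();
      } catch (error) {
        lastError = error as Error;

        if (attempt < config.maxRetries) {
          // Exponential backoff: backoffMs, 2 * backoffMs, 4 * backoffMs, ...
          const delay = config.backoffMs * Math.pow(2, attempt);
          await new Promise(r => setTimeout(r, delay));
        }
      }
    }

    throw lastError;
  }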
}
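Retrying every error can waste time and tokens on failures that will never succeed, such as invalid input. The sketch below keeps the same exponential backoff but adds an assumed `shouldRetry` predicate and an injectable `sleep`, so the behavior can be tested without real delays. The `retryable` flag on errors is a hypothetical convention.

```typescript
interface RetryOptions {
  maxRetries: number;                        // retries after the first attempt
  backoffMs: number;                         // base delay, doubled on each retry
  shouldRetry?: (error: unknown) => boolean; // hypothetical: is this error transient?
  sleep?: (ms: number) => Promise<void>;     // injectable so tests need not wait
}

async function retryWithBackoff<T>(fn: () => Promise<T>, opts: RetryOptions): Promise<T> {
  const sleep = opts.sleep ?? ((ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms)));
  const shouldRetry = opts.shouldRetry ?? (() => true);

  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // Give up after the last allowed attempt or on a permanent error
      if (attempt >= opts.maxRetries || !shouldRetry(error)) throw error;
      await sleep(opts.backoffMs * 2 ** attempt);
    }
  }
}

// Hypothetical convention: transient failures carry `retryable: true`
function isRetryable(error: unknown): boolean {
  return typeof error === 'object' && error !== null &&
    (error as { retryable?: unknown }).retryable === true;
}
```

With `maxRetries: 3` and `backoffMs: 100`, an operation that fails twice with a retryable error waits 100 ms and then 200 ms before succeeding on the third attempt; an error without the flag is rethrown immediately.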

Compensation Mechanism

When a workflow fails, run compensating actions for the steps that have already completed:

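// Undo action for a completed node
type Compensation = (context: ExecutionContext) => Promise<void>;

class CompensationEngine {
  private compensations: Map<string, Compensation> = new Map();

  // Register the compensating action for a node
  register(nodeId: string, compensation: Compensation): void {
    this.compensations.set(nodeId, compensation);
  }

  async compensate(context: ExecutionContext): Promise<void> {
    // Map iteration follows insertion order, i.e. the order in which nodes completed
    const completedNodes = Array.from(context.nodeStates.entries())
      .filter(([_, state]) => state.status === 'completed')
      .reverse();  // Compensate in reverse order

    for (const [nodeId] of completedNodes) {
      const compensation = this.compensations.get(nodeId);
      if (compensation) {
        try {
          await compensation(context);
        } catch (error) {
          // Log and continue so that one failed compensation does not block the rest
          console.error(`Compensation failed: ${nodeId}`, error);
        }
      }
    }
  }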
}
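This compensation approach follows the saga pattern: each forward step is paired with an undo action, and on failure the completed steps are undone newest-first before the original error is rethrown. The self-contained sketch below shows the whole cycle with a hypothetical `SagaStep` type.

```typescript
// Hypothetical step shape: each forward action is paired with an undo action
interface SagaStep {
  name: string;
  action: () => Promise<void>;
  compensate: () => Promise<void>;
}

// Runs steps in order; if one fails, undoes the already-completed steps in
// reverse order, then rethrows the original error.
async function runSaga(steps: SagaStep[]): Promise<void> {
  const completed: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.action();
      completed.push(step);
    } catch (error) {
      for (const done of completed.reverse()) {
        try {
          await done.compensate();
        } catch (compError) {
          // Keep compensating the remaining steps; this one needs manual follow-up
          console.error(`Compensation failed: ${done.name}`, compError);
        }
      }
      throw error;
    }
  }
}
```

Compensations run only for steps that finished; the failing step itself is assumed to have left no side effects, so each action should be written to fail cleanly.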

Visualization

Visualizing workflows is essential for debugging and monitoring:

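class WorkflowVisualizer {
  toMermaid(workflow: WorkflowDefinition): string {
    const lines = ['graph TD'];

    for (const node of workflow.nodes) {
      const label = node.config.name || node.id;
      // Escape double quotes so a label cannot break the ["..."] syntax
      lines.push(`  ${node.id}["${this.escape(label)}"]`);
    }

    for (const edge of workflow.edges) {
      if (edge.condition) {
        // Conditions such as `a || b` contain '|', which would end the edge label early
        lines.push(`  ${edge.from} -->|${this.escape(edge.condition)}| ${edge.to}`);
      } else {
        lines.push(`  ${edge.from} --> ${edge.to}`);
      }
    }

    return lines.join('\n');
  }

  // Replace characters that clash with Mermaid syntax by Mermaid entity codes
  private escape(text: string): string {
    return String(text).replace(/"/g, '#quot;').replace(/\|/g, '#124;');
  }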
}
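For monitoring, the same Mermaid output can also show execution progress. The sketch below assumes a map from node id to status (for example, built from `context.nodeStates`) and colors nodes with Mermaid `classDef` and `class` statements; the status names and colors are illustrative choices, and edge condition labels are omitted to keep it short.

```typescript
interface WorkflowNode { id: string; config: Record<string, any> }
interface WorkflowEdge { from: string; to: string }
interface WorkflowDefinition { nodes: WorkflowNode[]; edges: WorkflowEdge[] }

type NodeStatus = 'running' | 'completed' | 'failed' | 'skipped';

// Escape double quotes with Mermaid's #quot; entity code
const escapeLabel = (text: string): string => String(text).replace(/"/g, '#quot;');

function toMermaidWithStatus(
  workflow: WorkflowDefinition,
  statuses: Map<string, NodeStatus>
): string {
  const lines = ['graph TD'];
  for (const node of workflow.nodes) {
    lines.push(`  ${node.id}["${escapeLabel(node.config.name || node.id)}"]`);
  }
  for (const edge of workflow.edges) {
    lines.push(`  ${edge.from} --> ${edge.to}`);
  }

  // One style per status; the colors are arbitrary
  lines.push('  classDef running fill:#fff3c4,stroke:#f9a825');
  lines.push('  classDef completed fill:#c8f7c5,stroke:#2e7d32');
  lines.push('  classDef failed fill:#f8c4c4,stroke:#c62828');
  lines.push('  classDef skipped fill:#eeeeee,stroke:#9e9e9e');

  // Attach each node with a known status to the matching class
  statuses.forEach((status, nodeId) => {
    lines.push(`  class ${nodeId} ${status}`);
  });

  return lines.join('\n');
}
```

Regenerating this diagram after each node finishes gives a simple live view of where an execution is and which branch failed.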

Frequently Asked Questions (FAQ)

What is the difference between a Workflow Engine and an Agent's ReAct loop?

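A ReAct loop is a single Agent's reason-act cycle and suits simple tasks. A Workflow Engine manages complex multi-step, multi-Agent workflows, with support for conditions, parallel execution, and human approval.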

How do you handle long-running workflows?

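Persist workflow state to durable storage so that executions can be paused and resumed. Use webhooks or a message queue to notify the workflow of external events.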
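A minimal checkpointing sketch, assuming state is saved after each node completes. One easy-to-miss detail: `JSON.stringify` turns a `Map` into `{}`, so `nodeStates` is converted to an array of entries before saving and rebuilt on load. `CheckpointStore`, `InMemoryStore`, and the snapshot shape are hypothetical stand-ins for a real database or key-value store.

```typescript
interface NodeState {
  status: 'completed' | 'failed' | 'skipped';
  output: unknown;
}

interface ExecutionSnapshot {
  id: string;
  workflowId: string;
  status: 'running' | 'paused' | 'completed' | 'failed';
  input: unknown;
  nodeStates: Map<string, NodeState>;
}

// Any durable key-value store (database row, Redis, file) could sit behind this
interface CheckpointStore {
  save(id: string, data: string): Promise<void>;
  load(id: string): Promise<string | undefined>;
}

class InMemoryStore implements CheckpointStore {
  private entries = new Map<string, string>();
  async save(id: string, data: string): Promise<void> { this.entries.set(id, data); }
  async load(id: string): Promise<string | undefined> { return this.entries.get(id); }
}

async function checkpoint(store: CheckpointStore, exec: ExecutionSnapshot): Promise<void> {
  // Maps serialize as {}, so store node states as [nodeId, state] pairs
  const serializable = { ...exec, nodeStates: Array.from(exec.nodeStates.entries()) };
  await store.save(exec.id, JSON.stringify(serializable));
}

async function restore(store: CheckpointStore, id: string): Promise<ExecutionSnapshot | undefined> {
  const raw = await store.load(id);
  if (raw === undefined) return undefined;
  const parsed = JSON.parse(raw);
  return { ...parsed, nodeStates: new Map(parsed.nodeStates) };
}
```

On resume, the engine could load the snapshot and skip every node whose state is already `completed`, so finished Agent or tool calls are not repeated.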

How should workflow versions be managed?

Manage workflow definitions with version numbers, which enables canary releases and rollback.
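One way to implement this, sketched with a hypothetical `WorkflowRegistry`: published versions are immutable, a stable pointer decides which version new executions use, and a canary routes a fraction of new executions to a candidate version. Rolling back means promoting the previous version again; executions already in flight would keep the version they resolved when they started.

```typescript
interface WorkflowDefinition {
  id: string;
  name: string;
  nodes: unknown[];
  edges: unknown[];
}

class WorkflowRegistry {
  private versions = new Map<string, Map<number, WorkflowDefinition>>();
  private stable = new Map<string, number>();
  private canary = new Map<string, { version: number; fraction: number }>();

  // Store a new immutable version and return its number (1, 2, 3, ...)
  publish(def: WorkflowDefinition): number {
    const byVersion = this.versions.get(def.id) ?? new Map<number, WorkflowDefinition>();
    const version = byVersion.size + 1;
    byVersion.set(version, def);
    this.versions.set(def.id, byVersion);
    return version;
  }

  // Make a version the default for new executions and end any canary.
  // Rolling back is simply promoting an older version again.
  promote(id: string, version: number): void {
    this.assertExists(id, version);
    this.stable.set(id, version);
    this.canary.delete(id);
  }

  // Route roughly `fraction` (0..1) of new executions to a candidate version
  startCanary(id: string, version: number, fraction: number): void {
    this.assertExists(id, version);
    this.canary.set(id, { version, fraction });
  }

  // Pick the version a new execution pins to; `roll` is injectable for tests
  resolve(id: string, roll: number = Math.random()): number {
    const canary = this.canary.get(id);
    if (canary && roll < canary.fraction) return canary.version;
    const version = this.stable.get(id);
    if (version === undefined) throw new Error(`No stable version for ${id}`);
    return version;
  }

  get(id: string, version: number): WorkflowDefinition {
    this.assertExists(id, version);
    return this.versions.get(id)!.get(version)!;
  }

  private assertExists(id: string, version: number): void {
    if (!this.versions.get(id)?.has(version)) {
      throw new Error(`Unknown workflow version ${id}@${version}`);
    }
  }
}
```

Storing the resolved version on the execution context, rather than looking it up again later, is what keeps a rollback from changing the definition under a running execution.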

Summary

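An AI Workflow Engine provides reliable orchestration for complex Agent tasks. By defining task flows as a DAG and supporting conditional branching, parallel execution, and error compensation, it lets Agent systems handle complex real-world business scenarios.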