# Workflow Orchestrator

Multi-stage workflow execution with cost-efficient agent coordination and artifact passing.

## Overview

The Workflow Orchestrator (`vapora-workflow-engine`) enables cost-efficient multi-agent pipelines by executing workflows as discrete stages with short-lived agent contexts. Instead of accumulating context in long sessions, agents receive only what they need, produce artifacts, and terminate.

**Key Benefit**: ~95% reduction in LLM cache token costs compared to monolithic session patterns.

## Architecture

### Core Components

```text
┌─────────────────────────────────────────────────────────┐
│                  WorkflowOrchestrator                   │
│  ┌───────────────────────────────────────────────────┐  │
│  │  WorkflowInstance                                 │  │
│  │    ├─ workflow_id: UUID                           │  │
│  │    ├─ template: WorkflowConfig                    │  │
│  │    ├─ current_stage: usize                        │  │
│  │    ├─ stage_states: Vec                           │  │
│  │    └─ artifacts: HashMap                          │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
          │                  │                  │
          ▼                  ▼                  ▼
   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
   │    NATS     │    │    Swarm    │    │     KG      │
   │  Listener   │    │ Coordinator │    │ Persistence │
   └─────────────┘    └─────────────┘    └─────────────┘
```

### Workflow Lifecycle

1. **Template Loading**: Read workflow definition from `config/workflows.toml`
2. **Instance Creation**: Create `WorkflowInstance` with initial context
3. **Stage Execution**: Orchestrator assigns tasks to agents via SwarmCoordinator
4. **Event Listening**: NATS subscribers wait for `TaskCompleted`/`TaskFailed` events
5. **Stage Advancement**: When all tasks complete, advance to next stage
6. **Artifact Passing**: Accumulated artifacts passed to subsequent stages
7. **Completion**: Workflow marked complete, metrics recorded

## Workflow Templates

Pre-configured workflows in `config/workflows.toml`:

### feature_development (5 stages)

```toml
[[workflows]]
name = "feature_development"
trigger = "manual"

[[workflows.stages]]
name = "architecture_design"
agents = ["architect"]
parallel = false
approval_required = false

[[workflows.stages]]
name = "implementation"
agents = ["developer", "developer"]
parallel = true
max_parallel = 2
approval_required = false

[[workflows.stages]]
name = "testing"
agents = ["tester"]
parallel = false
approval_required = false

[[workflows.stages]]
name = "code_review"
agents = ["reviewer"]
parallel = false
approval_required = true

[[workflows.stages]]
name = "deployment"
agents = ["devops"]
parallel = false
approval_required = true
```

**Stages**: architecture → implementation (parallel) → testing → review (approval) → deployment (approval)

### bugfix (4 stages)

**Stages**: investigation → fix → testing → deployment

### documentation_update (3 stages)

**Stages**: content creation → review (approval) → publish

### security_audit (4 stages)

**Stages**: code analysis → penetration testing → remediation → verification (approval)

## Stage Types

### Sequential Stages

Single agent executes task, advances when complete.

```toml
[[workflows.stages]]
name = "architecture_design"
agents = ["architect"]
parallel = false
```

### Parallel Stages

Multiple agents execute tasks simultaneously.

```toml
[[workflows.stages]]
name = "implementation"
agents = ["developer", "developer"]
parallel = true
max_parallel = 2
```

### Approval Gates

Stage requires manual approval before advancing.

```toml
[[workflows.stages]]
name = "deployment"
agents = ["devops"]
approval_required = true
```

When `approval_required = true`:

1. Workflow pauses with status `waiting_approval:`
2. NATS event published to `vapora.workflow.approval_required`
3. Admin approves via API or CLI
4. Workflow resumes execution

## Artifacts

Data passed between stages:

### Artifact Types

```rust
pub enum ArtifactType {
    Adr,            // Architecture Decision Record
    Code,           // Source code files
    TestResults,    // Test execution output
    Review,         // Code review feedback
    Documentation,  // Generated docs
    Custom(String), // User-defined type
}
```

### Artifact Flow

```text
Stage 1: Architecture
  └─ Produces: Artifact(Adr, "design-spec", ...)
        │
        ▼
Stage 2: Implementation
  ├─ Consumes: design-spec
  └─ Produces: Artifact(Code, "feature-impl", ...)
        │
        ▼
Stage 3: Testing
  ├─ Consumes: feature-impl
  └─ Produces: Artifact(TestResults, "test-report", ...)
```

Artifacts stored in `WorkflowInstance.accumulated_artifacts` and passed to subsequent stages via context.

## Kogral Integration

Enrich workflow context with persistent knowledge from Kogral:

```rust
orchestrator.enrich_context_from_kogral(&mut context, "feature_development").await?;
```

Loads:

- **Guidelines**: `.kogral/guidelines/{workflow_name}.md`
- **Patterns**: `.kogral/patterns/*.md` (matching workflow name)
- **ADRs**: `.kogral/adrs/*.md` (5 most recent, containing workflow name)

Result injected into context:

```json
{
  "task": "Add authentication",
  "kogral_guidelines": {
    "source": ".kogral/guidelines/feature_development.md",
    "content": "..."
  },
  "kogral_patterns": [
    {
      "file": "auth-pattern.md",
      "content": "..."
    }
  ],
  "kogral_decisions": [
    {
      "file": "0005-oauth2-implementation.md",
      "content": "..."
    }
  ]
}
```

**Configuration**:

```bash
export KOGRAL_PATH="/path/to/kogral/.kogral"
```

Default: `../kogral/.kogral` (sibling directory)

## REST API

All endpoints under `/api/v1/workflow_orchestrator`:

### Start Workflow

```http
POST /api/v1/workflow_orchestrator
Content-Type: application/json

{
  "template": "feature_development",
  "context": {
    "task": "Implement authentication",
    "requirements": ["OAuth2", "JWT"]
  }
}
```

**Response**:

```json
{
  "workflow_id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d"
}
```

### List Active Workflows

```http
GET /api/v1/workflow_orchestrator
```

**Response**:

```json
{
  "workflows": [
    {
      "id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d",
      "template_name": "feature_development",
      "status": "running",
      "current_stage": 2,
      "total_stages": 5,
      "created_at": "2026-01-24T01:23:45.123Z",
      "updated_at": "2026-01-24T01:45:12.456Z"
    }
  ]
}
```

### Get Workflow Status

```http
GET /api/v1/workflow_orchestrator/:id
```

**Response**: Same as workflow object in list response

### Approve Stage

```http
POST /api/v1/workflow_orchestrator/:id/approve
Content-Type: application/json

{
  "approver": "Jane Doe"
}
```

**Response**:

```json
{
  "success": true,
  "message": "Workflow 3f9a2b1c stage approved"
}
```

### Cancel Workflow

```http
POST /api/v1/workflow_orchestrator/:id/cancel
Content-Type: application/json

{
  "reason": "Requirements changed"
}
```

**Response**:

```json
{
  "success": true,
  "message": "Workflow 3f9a2b1c cancelled"
}
```

### List Templates

```http
GET /api/v1/workflow_orchestrator/templates
```

**Response**:

```json
{
  "templates": [
    "feature_development",
    "bugfix",
    "documentation_update",
    "security_audit"
  ]
}
```

## NATS Events

Workflow orchestrator publishes/subscribes to NATS JetStream:

### Subscriptions

- `vapora.tasks.completed` - Agent task completion events
- `vapora.tasks.failed` - Agent task failure events

### Publications

- `vapora.workflow.approval_required` - Stage waiting for approval
- `vapora.workflow.completed` - Workflow finished successfully

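
The subjects listed above are dot-separated tokens, which is what lets a single wildcard subscription cover a whole family of events. The sketch below illustrates NATS wildcard semantics (`*` matches exactly one token, `>` matches one or more trailing tokens) against those subjects. The `subject_matches` helper is hypothetical and written only for illustration; it is not part of `vapora-workflow-engine` or of any NATS client library, and a real subscriber would leave matching to the NATS server.

```rust
/// Illustrative helper (an assumption, not an orchestrator or NATS client API).
/// Returns true when `subject` matches the NATS-style `pattern`:
/// `*` matches exactly one token, `>` matches one or more trailing tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` is only valid as the final pattern token and needs
            // at least one remaining subject token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one non-empty token.
            (Some("*"), Some(tok)) => {
                if tok.is_empty() {
                    return false;
                }
            }
            // Literal tokens must be identical.
            (Some(p), Some(s)) => {
                if p != s {
                    return false;
                }
            }
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // One side ran out before the other: no match.
            _ => return false,
        }
    }
}

fn main() {
    // Subjects taken from the lists above.
    let subjects = [
        "vapora.tasks.completed",
        "vapora.tasks.failed",
        "vapora.workflow.approval_required",
        "vapora.workflow.completed",
    ];
    for s in subjects {
        println!(
            "{s}: tasks.> = {}, workflow.* = {}",
            subject_matches("vapora.tasks.>", s),
            subject_matches("vapora.workflow.*", s),
        );
    }
}
```

Under these rules a pattern such as `vapora.tasks.>` covers both task subjects, while `vapora.workflow.*` covers both workflow subjects without picking up task events.
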
**Event Format**:

```json
{
  "type": "approval_required",
  "workflow_id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d",
  "stage": "code_review",
  "timestamp": "2026-01-24T01:45:12.456Z"
}
```

## Metrics

Prometheus metrics exposed at `/metrics`:

- `vapora_workflows_started_total` - Total workflows initiated
- `vapora_workflows_completed_total` - Successfully finished workflows
- `vapora_workflows_failed_total` - Failed workflows
- `vapora_stages_completed_total` - Individual stage completions
- `vapora_active_workflows` - Currently running workflows (gauge)
- `vapora_stage_duration_seconds` - Histogram of stage execution times
- `vapora_workflow_duration_seconds` - Histogram of total workflow times

## Cost Optimization

### Before: Monolithic Session

```text
Session with 50 messages:
├─ Message 1:   50K context → 50K cache reads
├─ Message 2:  100K context → 100K cache reads
├─ Message 3:  150K context → 150K cache reads
└─ Message 50: 800K context → 800K cache reads
                              ──────────────────
                              ~20M cache reads
```

**Cost**: ~$840/month for typical usage

### After: Multi-Stage Workflow

```text
Workflow with 3 stages:
├─ Architect: 40K context, 5 msgs  → 200K cache reads
├─ Developer: 25K context, 12 msgs → 300K cache reads
└─ Reviewer:  35K context, 4 msgs  → 140K cache reads
                                     ──────────────────
                                     ~640K cache reads
```

**Cost**: ~$110/month for equivalent work

**Savings**: ~$730/month (87% reduction)

## Usage Examples

See [CLI Commands Guide](../setup/cli-commands.md) for command-line usage.

### Programmatic Usage

```rust
use vapora_workflow_engine::WorkflowOrchestrator;
use std::sync::Arc;

// Initialize orchestrator
let orchestrator = Arc::new(
    WorkflowOrchestrator::new(
        "config/workflows.toml",
        swarm,
        kg,
        nats,
    ).await?
);

// Start event listener
orchestrator.clone().start_event_listener().await?;

// Start workflow
let workflow_id = orchestrator.start_workflow(
    "feature_development",
    serde_json::json!({
        "task": "Add authentication",
        "requirements": ["OAuth2", "JWT"]
    })
).await?;

// Get status
let workflow = orchestrator.get_workflow(&workflow_id)?;
println!("Status: {:?}", workflow.status);

// Approve stage (if waiting)
orchestrator.approve_stage(&workflow_id, "Jane Doe").await?;
```

## Configuration

### Workflow Templates

File: `config/workflows.toml`

```toml
[engine]
max_parallel_tasks = 10
workflow_timeout = 3600
approval_gates_enabled = true

[[workflows]]
name = "custom_workflow"
trigger = "manual"

[[workflows.stages]]
name = "stage_name"
agents = ["agent_role"]
parallel = false
max_parallel = 1
approval_required = false
```

### Environment Variables

```bash
# Kogral knowledge base path
export KOGRAL_PATH="/path/to/kogral/.kogral"

# NATS connection
export NATS_URL="nats://localhost:4222"

# Backend API (for CLI)
export VAPORA_API_URL="http://localhost:8001"
```

## Troubleshooting

### Workflow Stuck in "waiting_approval"

**Solution**: Use CLI or API to approve:

```bash
vapora workflow approve --approver "Your Name"
```

### Stage Fails Repeatedly

**Check**:

1. Agent availability: `vapora workflow list` (via backend)
2. NATS connection: Verify NATS URL and cluster status
3. Task requirements: Check if stage agents have required capabilities

### High Latency Between Stages

**Causes**:

- NATS messaging delay (check network)
- SwarmCoordinator queue depth (check agent load)
- Artifact serialization overhead (reduce artifact size)

**Mitigation**:

- Use parallel stages where possible
- Increase `max_parallel` in stage config
- Optimize artifact content (references instead of full content)

### Workflow Not Advancing

**Debug**:

```bash
# Check workflow status
vapora workflow status

# Check backend logs
docker logs vapora-backend

# Check NATS messages
nats sub "vapora.tasks.>"
```

## Related Documentation

- [CLI Commands Guide](../setup/cli-commands.md) - Command-line usage
- [Multi-Agent Workflows](../architecture/multi-agent-workflows.md) - Architecture overview
- [Agent Registry & Coordination](../architecture/agent-registry-coordination.md) - Agent management
- [ADR-0028: Workflow Orchestrator](../adrs/0028-workflow-orchestrator.md) - Decision rationale
- [ADR-0014: Learning-Based Agent Selection](../adrs/0014-learning-profiles.md) - Agent selection
- [ADR-0015: Budget Enforcement](../adrs/0015-budget-enforcement.md) - Cost control