# Workflow Orchestrator

Multi-stage workflow execution with cost-efficient agent coordination and artifact passing.

## Overview

The Workflow Orchestrator (`vapora-workflow-engine`) enables cost-efficient multi-agent pipelines by executing workflows as discrete stages with short-lived agent contexts. Instead of accumulating context in long sessions, agents receive only what they need, produce artifacts, and terminate.

**Key Benefit**: ~95% reduction in LLM cache token costs compared to monolithic session patterns.

## Architecture

### Core Components

```text
┌─────────────────────────────────────────────────────────┐
│                  WorkflowOrchestrator                   │
│  ┌─────────────────────────────────────────────────┐    │
│  │  WorkflowInstance                               │    │
│  │  ├─ workflow_id: UUID                           │    │
│  │  ├─ template: WorkflowConfig                    │    │
│  │  ├─ current_stage: usize                        │    │
│  │  ├─ stage_states: Vec                           │    │
│  │  └─ artifacts: HashMap                          │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘
              │              │              │
              ▼              ▼              ▼
        ┌───────────┐  ┌───────────┐  ┌───────────┐
        │   NATS    │  │   Swarm   │  │    KG     │
        │ Listener  │  │Coordinator│  │Persistence│
        └───────────┘  └───────────┘  └───────────┘
```

### Workflow Lifecycle

1. **Template Loading**: Read workflow definition from `config/workflows.toml`
2. **Instance Creation**: Create `WorkflowInstance` with initial context
3. **Stage Execution**: Orchestrator assigns tasks to agents via SwarmCoordinator
4. **Event Listening**: NATS subscribers wait for `TaskCompleted`/`TaskFailed` events
5. **Stage Advancement**: When all tasks complete, advance to next stage
6. **Artifact Passing**: Accumulated artifacts passed to subsequent stages
7. **Completion**: Workflow marked complete, metrics recorded

## Workflow Templates

Pre-configured workflows in `config/workflows.toml`:

### feature_development (5 stages)

```toml
[[workflows]]
name = "feature_development"
trigger = "manual"

[[workflows.stages]]
name = "architecture_design"
agents = ["architect"]
parallel = false
approval_required = false

[[workflows.stages]]
name = "implementation"
agents = ["developer", "developer"]
parallel = true
max_parallel = 2
approval_required = false

[[workflows.stages]]
name = "testing"
agents = ["tester"]
parallel = false
approval_required = false

[[workflows.stages]]
name = "code_review"
agents = ["reviewer"]
parallel = false
approval_required = true

[[workflows.stages]]
name = "deployment"
agents = ["devops"]
parallel = false
approval_required = true
```

**Stages**: architecture → implementation (parallel) → testing → review (approval) → deployment (approval)

### bugfix (4 stages)

**Stages**: investigation → fix → testing → deployment

### documentation_update (3 stages)

**Stages**: content creation → review (approval) → publish

### security_audit (4 stages)

**Stages**: code analysis → penetration testing → remediation → verification (approval)

## Stage Types

### Sequential Stages

A single agent executes the task; the workflow advances when the task completes.

```toml
[[workflows.stages]]
name = "architecture_design"
agents = ["architect"]
parallel = false
```

### Parallel Stages

Multiple agents execute tasks simultaneously.

```toml
[[workflows.stages]]
name = "implementation"
agents = ["developer", "developer"]
parallel = true
max_parallel = 2
```

### Approval Gates

The stage requires manual approval before the workflow advances.

```toml
[[workflows.stages]]
name = "deployment"
agents = ["devops"]
approval_required = true
```

When `approval_required = true`:

1. Workflow pauses with status `waiting_approval:`
2. NATS event published to `vapora.workflow.approval_required`
3. Admin approves via API or CLI
4. Workflow resumes execution

## Artifacts

Data passed between stages:

### Artifact Types

```rust
pub enum ArtifactType {
    Adr,            // Architecture Decision Record
    Code,           // Source code files
    TestResults,    // Test execution output
    Review,         // Code review feedback
    Documentation,  // Generated docs
    Custom(String), // User-defined type
}
```

### Artifact Flow

```text
Stage 1: Architecture
  └─ Produces: Artifact(Adr, "design-spec", ...)
          │
          ▼
Stage 2: Implementation
  ├─ Consumes: design-spec
  └─ Produces: Artifact(Code, "feature-impl", ...)
          │
          ▼
Stage 3: Testing
  ├─ Consumes: feature-impl
  └─ Produces: Artifact(TestResults, "test-report", ...)
```

Artifacts are stored in `WorkflowInstance.accumulated_artifacts` and passed to subsequent stages via context.

## Kogral Integration

Enrich workflow context with persistent knowledge from Kogral:

```rust
orchestrator.enrich_context_from_kogral(&mut context, "feature_development").await?;
```

Loads:

- **Guidelines**: `.kogral/guidelines/{workflow_name}.md`
- **Patterns**: `.kogral/patterns/*.md` (matching workflow name)
- **ADRs**: `.kogral/adrs/*.md` (5 most recent, containing workflow name)

Result injected into context:

```json
{
  "task": "Add authentication",
  "kogral_guidelines": {
    "source": ".kogral/guidelines/feature_development.md",
    "content": "..."
  },
  "kogral_patterns": [
    { "file": "auth-pattern.md", "content": "..." }
  ],
  "kogral_decisions": [
    { "file": "0005-oauth2-implementation.md", "content": "..." }
  ]
}
```

**Configuration**:

```bash
export KOGRAL_PATH="/path/to/kogral/.kogral"
```

Default: `../kogral/.kogral` (sibling directory)

## REST API

Two distinct API surfaces exist for workflows:

- **`/api/v1/workflow_orchestrator`**: live orchestration (start, approve, cancel, status)
- **`/api/v1/workflows`**: workflow CRUD with execution history and Merkle audit trail

### Workflow CRUD (`/api/v1/workflows`)

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/api/v1/workflows` | List all registered workflows |
| `POST` | `/api/v1/workflows` | Register workflow from YAML |
| `GET` | `/api/v1/workflows/:id` | Get workflow by ID |
| `POST` | `/api/v1/workflows/:id/execute` | Execute a registered workflow |
| `POST` | `/api/v1/workflows/:id/rollback` | Rollback a failed workflow |
| `GET` | `/api/v1/workflows/:id/audit` | Get tamper-evident audit trail |

**Create from YAML**:

```http
POST /api/v1/workflows
Content-Type: application/json

{
  "yaml": "workflow:\n id: my-workflow\n steps: ..."
}
```

**Audit trail entry** (each entry is hash-chained):

```json
{
  "seq": 3,
  "entry_id": "uuid",
  "timestamp": "2026-02-26T10:00:00Z",
  "workflow_id": "my-workflow",
  "event_type": "stage_completed",
  "actor": "developer-agent",
  "details": {},
  "prev_hash": "abc123...",
  "block_hash": "def456..."
}
```

The `block_hash` covers `prev_hash|seq|entry_id|timestamp|workflow_id|event_type|actor|details_json`, so modifying any field breaks the chain. Call `GET /api/v1/workflows/:id/audit` to retrieve the full chain; chain integrity is verified server-side via `AuditTrail::verify_integrity`.

> **Note**: `WorkflowService` is initialized non-fatally at startup. If `AgentCoordinator` init fails (usually a missing `agents.toml`), all `/api/v1/workflows/*` endpoints return `503 Service Unavailable` rather than crashing the backend.
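
The hash-chaining described above can be sketched as follows. This is a minimal illustration, not the actual `AuditTrail` implementation: the `AuditEntry` struct, the helper names, and the empty-string `prev_hash` for the first entry are assumptions, and the standard library's `DefaultHasher` stands in for whatever cryptographic hash the backend really uses (it is not tamper-resistant and only keeps the sketch dependency-free).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical mirror of the audit entry JSON shown above.
#[derive(Clone, Debug)]
pub struct AuditEntry {
    pub seq: u64,
    pub entry_id: String,
    pub timestamp: String,
    pub workflow_id: String,
    pub event_type: String,
    pub actor: String,
    pub details_json: String,
    pub prev_hash: String,
    pub block_hash: String,
}

/// Hash the pipe-joined fields in the documented order.
/// DefaultHasher is a NON-cryptographic stand-in (assumption for this sketch).
pub fn compute_block_hash(e: &AuditEntry) -> String {
    let payload = format!(
        "{}|{}|{}|{}|{}|{}|{}|{}",
        e.prev_hash, e.seq, e.entry_id, e.timestamp,
        e.workflow_id, e.event_type, e.actor, e.details_json
    );
    let mut hasher = DefaultHasher::new();
    payload.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}

/// Append an entry whose `prev_hash` is the previous entry's `block_hash`.
/// The first entry uses an empty `prev_hash` (assumption).
pub fn append_entry(chain: &mut Vec<AuditEntry>, event_type: &str, actor: &str) {
    let prev_hash = chain.last().map(|e| e.block_hash.clone()).unwrap_or_default();
    let seq = chain.len() as u64;
    let mut entry = AuditEntry {
        seq,
        entry_id: format!("entry-{seq}"),
        timestamp: "2026-02-26T10:00:00Z".to_string(),
        workflow_id: "my-workflow".to_string(),
        event_type: event_type.to_string(),
        actor: actor.to_string(),
        details_json: "{}".to_string(),
        prev_hash,
        block_hash: String::new(),
    };
    entry.block_hash = compute_block_hash(&entry);
    chain.push(entry);
}

/// True when every entry's hash is correct and links to its predecessor.
pub fn verify_chain(chain: &[AuditEntry]) -> bool {
    let mut expected_prev = String::new();
    for entry in chain {
        if entry.prev_hash != expected_prev || entry.block_hash != compute_block_hash(entry) {
            return false;
        }
        expected_prev = entry.block_hash.clone();
    }
    true
}
```

Because each hash includes the previous one, editing any field of an earlier entry invalidates that entry and, once its hash is recomputed, every entry after it.
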
### Orchestration endpoints (`/api/v1/workflow_orchestrator`)

#### Start Workflow

```http
POST /api/v1/workflow_orchestrator
Content-Type: application/json

{
  "template": "feature_development",
  "context": {
    "task": "Implement authentication",
    "requirements": ["OAuth2", "JWT"]
  }
}
```

**Response**:

```json
{
  "workflow_id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d"
}
```

#### List Active Workflows

```http
GET /api/v1/workflow_orchestrator
```

**Response**:

```json
{
  "workflows": [
    {
      "id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d",
      "template_name": "feature_development",
      "status": "running",
      "current_stage": 2,
      "total_stages": 5,
      "created_at": "2026-01-24T01:23:45.123Z",
      "updated_at": "2026-01-24T01:45:12.456Z"
    }
  ]
}
```

#### Get Workflow Status

```http
GET /api/v1/workflow_orchestrator/:id
```

**Response**: Same as the workflow object in the list response

#### Approve Stage

```http
POST /api/v1/workflow_orchestrator/:id/approve
Content-Type: application/json

{
  "approver": "Jane Doe"
}
```

**Response**:

```json
{
  "success": true,
  "message": "Workflow 3f9a2b1c stage approved"
}
```

#### Cancel Workflow

```http
POST /api/v1/workflow_orchestrator/:id/cancel
Content-Type: application/json

{
  "reason": "Requirements changed"
}
```

**Response**:

```json
{
  "success": true,
  "message": "Workflow 3f9a2b1c cancelled"
}
```

#### List Templates

```http
GET /api/v1/workflow_orchestrator/templates
```

**Response**:

```json
{
  "templates": [
    "feature_development",
    "bugfix",
    "documentation_update",
    "security_audit"
  ]
}
```

## NATS Events

The workflow orchestrator publishes and subscribes on NATS JetStream:

### Subscriptions

- `vapora.tasks.completed` - Agent task completion events
- `vapora.tasks.failed` - Agent task failure events

### Publications

- `vapora.workflow.approval_required` - Stage waiting for approval
- `vapora.workflow.completed` - Workflow finished successfully

**Event Format**:

```json
{
  "type": "approval_required",
  "workflow_id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d",
  "stage": "code_review",
  "timestamp": "2026-01-24T01:45:12.456Z"
}
```

## Metrics

Prometheus metrics exposed at `/metrics`:

- `vapora_workflows_started_total` - Total workflows initiated
- `vapora_workflows_completed_total` - Successfully finished workflows
- `vapora_workflows_failed_total` - Failed workflows
- `vapora_stages_completed_total` - Individual stage completions
- `vapora_active_workflows` - Currently running workflows (gauge)
- `vapora_stage_duration_seconds` - Histogram of stage execution times
- `vapora_workflow_duration_seconds` - Histogram of total workflow times

## Cost Optimization

### Before: Monolithic Session

```text
Session with 50 messages:
├─ Message 1:   50K context →  50K cache reads
├─ Message 2:  100K context → 100K cache reads
├─ Message 3:  150K context → 150K cache reads
└─ Message 50: 800K context → 800K cache reads
                              ──────────────────
                              ~20M cache reads
```

**Cost**: ~$840/month for typical usage

### After: Multi-Stage Workflow

```text
Workflow with 3 stages:
├─ Architect: 40K context,  5 msgs → 200K cache reads
├─ Developer: 25K context, 12 msgs → 300K cache reads
└─ Reviewer:  35K context,  4 msgs → 140K cache reads
                                     ──────────────────
                                     ~640K cache reads
```

**Cost**: ~$110/month for equivalent work

**Savings**: ~$730/month (87% reduction)

## Usage Examples

See [CLI Commands Guide](../setup/cli-commands.md) for command-line usage.

### Programmatic Usage

```rust
use vapora_workflow_engine::WorkflowOrchestrator;
use std::sync::Arc;

// `swarm`, `kg`, and `nats` are handles constructed elsewhere (not shown).
// Initialize orchestrator
let orchestrator = Arc::new(
    WorkflowOrchestrator::new(
        "config/workflows.toml",
        swarm,
        kg,
        nats,
    ).await?
);

// Start event listener
orchestrator.clone().start_event_listener().await?;

// Start workflow
let workflow_id = orchestrator.start_workflow(
    "feature_development",
    serde_json::json!({
        "task": "Add authentication",
        "requirements": ["OAuth2", "JWT"]
    })
).await?;

// Get status
let workflow = orchestrator.get_workflow(&workflow_id)?;
println!("Status: {:?}", workflow.status);

// Approve stage (if waiting)
orchestrator.approve_stage(&workflow_id, "Jane Doe").await?;
```

## Configuration

### Workflow Templates

File: `config/workflows.toml`

```toml
[engine]
max_parallel_tasks = 10
workflow_timeout = 3600
approval_gates_enabled = true

[[workflows]]
name = "custom_workflow"
trigger = "manual"

[[workflows.stages]]
name = "stage_name"
agents = ["agent_role"]
parallel = false
max_parallel = 1
approval_required = false
```

### Environment Variables

```bash
# Kogral knowledge base path
export KOGRAL_PATH="/path/to/kogral/.kogral"

# NATS connection
export NATS_URL="nats://localhost:4222"

# Backend API (for CLI)
export VAPORA_API_URL="http://localhost:8001"
```

## Troubleshooting

### Workflow Stuck in "waiting_approval"

**Solution**: Use CLI or API to approve:

```bash
vapora workflow approve --approver "Your Name"
```

### Stage Fails Repeatedly

**Check**:

1. Agent availability: `vapora workflow list` (via backend)
2. NATS connection: Verify NATS URL and cluster status
3. Task requirements: Check if stage agents have required capabilities

### High Latency Between Stages

**Causes**:

- NATS messaging delay (check network)
- SwarmCoordinator queue depth (check agent load)
- Artifact serialization overhead (reduce artifact size)

**Mitigation**:

- Use parallel stages where possible
- Increase `max_parallel` in stage config
- Optimize artifact content (references instead of full content)

### Workflow Not Advancing

**Debug**:

```bash
# Check workflow status
vapora workflow status

# Check backend logs
docker logs vapora-backend

# Check NATS messages
nats sub "vapora.tasks.>"
```

## Autonomous Scheduling

Workflows with `trigger = "schedule"` fire automatically on a cron expression without any REST trigger.

### TOML Configuration

```toml
[[workflows]]
name = "nightly_analysis"
trigger = "schedule"

[workflows.schedule]
cron = "0 2 * * *"             # 5-field: min hour dom month dow
timezone = "America/New_York"  # IANA identifier; omit for UTC
allow_concurrent = false       # skip if previous run is still active
catch_up = false               # fire missed slots on restart (capped at 10)

[[workflows.stages]]
name = "analyze"
agents = ["analyst"]
```

Cron accepts 5-field (standard shell), 6-field (with seconds), or 7-field (with seconds + year) expressions. The expression is validated at config-load time: startup fails on an invalid cron expression or an unknown timezone.
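
As a rough illustration of the field-count rule above, the sketch below classifies an expression by its number of whitespace-separated fields. The `CronFormat` enum and `classify_cron` function are hypothetical names, and this is only the first step of validation: it does not check field ranges or timezone names, which the real config loader is described as doing.

```rust
/// Cron formats named in the text above (hypothetical enum for illustration).
#[derive(Debug, PartialEq, Eq)]
pub enum CronFormat {
    /// 5 fields: min hour dom month dow
    Standard,
    /// 6 fields: leading seconds field
    WithSeconds,
    /// 7 fields: leading seconds field and trailing year field
    WithSecondsAndYear,
}

/// Classify a cron expression by field count only.
/// Field contents are NOT validated in this sketch.
pub fn classify_cron(expr: &str) -> Result<CronFormat, String> {
    match expr.split_whitespace().count() {
        5 => Ok(CronFormat::Standard),
        6 => Ok(CronFormat::WithSeconds),
        7 => Ok(CronFormat::WithSecondsAndYear),
        n => Err(format!(
            "invalid cron expression {expr:?}: expected 5, 6, or 7 fields, found {n}"
        )),
    }
}
```

Returning an error instead of silently defaulting matches the fail-at-startup behaviour described above: a loader could call this for every schedule and abort on the first `Err`.
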
### Schedule REST API

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/api/v1/schedules` | List all schedules |
| `GET` | `/api/v1/schedules/:id` | Get one schedule |
| `PUT` | `/api/v1/schedules/:id` | Create or fully replace |
| `PATCH` | `/api/v1/schedules/:id` | Partial update |
| `DELETE` | `/api/v1/schedules/:id` | Remove |
| `GET` | `/api/v1/schedules/:id/runs` | Execution history (last 100) |
| `POST` | `/api/v1/schedules/:id/fire` | Manual trigger bypassing cron (requires NATS) |

**PUT body** (all fields):

```json
{
  "template_name": "nightly_analysis",
  "cron_expression": "0 2 * * *",
  "timezone": "America/New_York",
  "enabled": true,
  "allow_concurrent": false,
  "catch_up": false,
  "initial_context": {}
}
```

> **`POST /fire` availability**: Requires a live NATS connection and a valid `config/workflows.toml`. If NATS is unavailable at startup, `WorkflowOrchestrator` is not initialized and `POST /fire` returns `503`. All other schedule endpoints (`GET`, `PUT`, `PATCH`, `DELETE`) remain available regardless of NATS status.

**PATCH body** (only changed fields):

```json
{
  "enabled": false
}
```

### Timezone Support

`timezone` is an IANA timezone identifier (e.g. `"America/New_York"`, `"Europe/Berlin"`, `"Asia/Tokyo"`). When absent, UTC is used. DST transitions are handled automatically. The REST API validates the timezone at the boundary; an unknown identifier returns `400 InvalidInput`.

### Distributed Fire-Lock

When multiple VAPORA backend instances run against the same SurrealDB, the scheduler uses a conditional `UPDATE ... WHERE locked_by IS NONE OR locked_at < (now - 120s)` to ensure only one instance fires each schedule per tick. The lock holder is identified by a per-process UUID stored in `locked_by`. The lock expires automatically after 120 seconds, so a crashed instance cannot hold it indefinitely.
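
The lock condition above can be sketched in memory as follows. This is an illustration of the predicate only, not the scheduler's code: `ScheduleLock` and `try_acquire` are hypothetical names, timestamps are plain Unix seconds, and the `&mut` borrow stands in for the atomicity that the single conditional `UPDATE` provides in the database.

```rust
/// Lock lifetime from the text above, in seconds.
const LOCK_TTL_SECS: u64 = 120;

/// Hypothetical in-memory stand-in for the lock columns on a schedule row.
#[derive(Debug, Default)]
pub struct ScheduleLock {
    /// Per-process UUID of the current holder, if any.
    pub locked_by: Option<String>,
    /// Unix seconds when the lock was taken; meaningful only when `locked_by` is `Some`.
    pub locked_at: u64,
}

/// Mirrors `UPDATE ... WHERE locked_by IS NONE OR locked_at < (now - 120s)`.
/// Returns true when this instance took the lock and may fire the schedule.
pub fn try_acquire(lock: &mut ScheduleLock, instance_id: &str, now: u64) -> bool {
    let free = lock.locked_by.is_none();
    // Equivalent to `locked_at < now - 120` without risking unsigned underflow.
    let expired = now.saturating_sub(lock.locked_at) > LOCK_TTL_SECS;
    if free || expired {
        lock.locked_by = Some(instance_id.to_string());
        lock.locked_at = now;
        true
    } else {
        false
    }
}
```

An instance that loses the race simply skips the tick, which is the case counted as lock contention in the skipped-schedules metric.
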
### Schedule Metrics (Prometheus)

- `vapora_schedules_fired_total` - successful fires
- `vapora_schedules_skipped_total` - skipped (concurrent guard or distributed lock contention)
- `vapora_schedules_failed_total` - workflow start failures
- `vapora_active_schedules` - current count (gauge)

## Related Documentation

- [CLI Commands Guide](../setup/cli-commands.md) - Command-line usage
- [Multi-Agent Workflows](../architecture/multi-agent-workflows.md) - Architecture overview
- [Agent Registry & Coordination](../architecture/agent-registry-coordination.md) - Agent management
- [ADR-0028: Workflow Orchestrator](../adrs/0028-workflow-orchestrator.md) - Decision rationale
- [ADR-0034: Autonomous Scheduling](../adrs/0034-autonomous-scheduling.md) - Scheduling design decisions
- [ADR-0014: Learning-Based Agent Selection](../adrs/0014-learning-profiles.md) - Agent selection
- [ADR-0015: Budget Enforcement](../adrs/0015-budget-enforcement.md) - Cost control