2026-01-24 02:07:45 +00:00
# Workflow Orchestrator
Multi-stage workflow execution with cost-efficient agent coordination and artifact passing.
## Overview
The Workflow Orchestrator (`vapora-workflow-engine`) enables cost-efficient multi-agent pipelines by executing workflows as discrete stages with short-lived agent contexts. Instead of accumulating context in long sessions, agents receive only what they need, produce artifacts, and terminate.
**Key Benefit**: ~95% reduction in LLM cache token costs compared to monolithic session patterns.
## Architecture
### Core Components
```text
┌─────────────────────────────────────────────────────────┐
│                  WorkflowOrchestrator                   │
│   ┌─────────────────────────────────────────────────┐   │
│   │  WorkflowInstance                               │   │
│   │  ├─ workflow_id: UUID                           │   │
│   │  ├─ template: WorkflowConfig                    │   │
│   │  ├─ current_stage: usize                        │   │
│   │  ├─ stage_states: Vec<StageState>               │   │
│   │  └─ artifacts: HashMap<String, Artifact>        │   │
│   └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
        │                    │                    │
        ▼                    ▼                    ▼
 ┌─────────────┐      ┌─────────────┐      ┌─────────────┐
 │    NATS     │      │    Swarm    │      │     KG      │
 │  Listener   │      │ Coordinator │      │ Persistence │
 └─────────────┘      └─────────────┘      └─────────────┘
```
### Workflow Lifecycle
1. **Template Loading**: Read workflow definition from `config/workflows.toml`
2. **Instance Creation**: Create `WorkflowInstance` with initial context
3. **Stage Execution**: Orchestrator assigns tasks to agents via SwarmCoordinator
4. **Event Listening**: NATS subscribers wait for `TaskCompleted`/`TaskFailed` events
5. **Stage Advancement**: When all tasks complete, advance to next stage
6. **Artifact Passing**: Accumulated artifacts passed to subsequent stages
7. **Completion**: Workflow marked complete, metrics recorded
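
The advancement logic in steps 4, 5, and 7 can be sketched as a small state machine. This is a minimal illustration only: the `Instance`, `Stage`, and `WorkflowStatus` types and their methods are hypothetical stand-ins, not the engine's actual API, and the sketch assumes every workflow has at least one stage and that a single failed task fails the whole workflow.

```rust
// Hypothetical, simplified model of stage advancement (not the engine's real types).
#[derive(Debug, Clone, PartialEq)]
enum WorkflowStatus {
    Running,
    Completed,
    Failed(String),
}

#[derive(Debug)]
struct Stage {
    name: String,
    pending_tasks: usize, // tasks still awaiting a TaskCompleted event
}

#[derive(Debug)]
struct Instance {
    stages: Vec<Stage>,
    current_stage: usize,
    status: WorkflowStatus,
}

impl Instance {
    /// Build an instance from (stage name, task count) pairs; assumes a non-empty list.
    fn new(stages: &[(&str, usize)]) -> Self {
        Instance {
            stages: stages
                .iter()
                .map(|(name, tasks)| Stage { name: name.to_string(), pending_tasks: *tasks })
                .collect(),
            current_stage: 0,
            status: WorkflowStatus::Running,
        }
    }

    fn current_stage_name(&self) -> &str {
        &self.stages[self.current_stage].name
    }

    /// Handle a TaskCompleted event: advance once the current stage has no pending tasks.
    fn on_task_completed(&mut self) {
        if self.status != WorkflowStatus::Running {
            return;
        }
        let stage = &mut self.stages[self.current_stage];
        stage.pending_tasks = stage.pending_tasks.saturating_sub(1);
        if stage.pending_tasks == 0 {
            if self.current_stage + 1 < self.stages.len() {
                self.current_stage += 1;
            } else {
                self.status = WorkflowStatus::Completed;
            }
        }
    }

    /// Handle a TaskFailed event: in this sketch the whole workflow fails.
    fn on_task_failed(&mut self, reason: &str) {
        if self.status == WorkflowStatus::Running {
            self.status = WorkflowStatus::Failed(reason.to_string());
        }
    }
}
```

With a one-task stage followed by a two-task parallel stage, the first completion event moves the instance to stage index 1, and the workflow is marked completed only after both parallel tasks report back.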
## Workflow Templates
Pre-configured workflows in `config/workflows.toml`:
### feature_development (5 stages)
```toml
[[workflows]]
name = "feature_development"
trigger = "manual"
[[workflows.stages]]
name = "architecture_design"
agents = ["architect"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "implementation"
agents = ["developer", "developer"]
parallel = true
max_parallel = 2
approval_required = false
[[workflows.stages]]
name = "testing"
agents = ["tester"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "code_review"
agents = ["reviewer"]
parallel = false
approval_required = true
[[workflows.stages]]
name = "deployment"
agents = ["devops"]
parallel = false
approval_required = true
```
**Stages**: architecture → implementation (parallel) → testing → review (approval) → deployment (approval)
### bugfix (4 stages)
**Stages**: investigation → fix → testing → deployment
### documentation_update (3 stages)
**Stages**: content creation → review (approval) → publish
### security_audit (4 stages)
**Stages**: code analysis → penetration testing → remediation → verification (approval)
## Stage Types
### Sequential Stages
A single agent executes the task; the workflow advances when it completes.
```toml
[[workflows.stages]]
name = "architecture_design"
agents = ["architect"]
parallel = false
```
### Parallel Stages
Multiple agents execute tasks simultaneously.
```toml
[[workflows.stages]]
name = "implementation"
agents = ["developer", "developer"]
parallel = true
max_parallel = 2
```
### Approval Gates
Stage requires manual approval before advancing.
```toml
[[workflows.stages]]
name = "deployment"
agents = ["devops"]
approval_required = true
```
When `approval_required = true`:
1. Workflow pauses with status `waiting_approval:<stage_idx>`
2. NATS event published to `vapora.workflow.approval_required`
3. Admin approves via API or CLI
4. Workflow resumes execution
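
The `waiting_approval:<stage_idx>` status string can be modelled as an enum with a round-trip to its text form. The sketch below is an assumption-laden illustration: the `Status` enum, `parse_status`, and `approve` are hypothetical names, and the `completed` status value is assumed (only `running` and `waiting_approval:<stage_idx>` appear in this document).

```rust
use std::fmt;

// Hypothetical status model; only "running" and "waiting_approval:<idx>" are documented.
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Running,
    WaitingApproval(usize),
    Completed, // assumed status name
}

impl fmt::Display for Status {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Status::Running => write!(f, "running"),
            Status::WaitingApproval(idx) => write!(f, "waiting_approval:{}", idx),
            Status::Completed => write!(f, "completed"),
        }
    }
}

/// Parse a status string back into the enum; returns None for unknown input.
fn parse_status(s: &str) -> Option<Status> {
    match s {
        "running" => Some(Status::Running),
        "completed" => Some(Status::Completed),
        _ => {
            let idx = s.strip_prefix("waiting_approval:")?;
            idx.parse().ok().map(Status::WaitingApproval)
        }
    }
}

/// Approval only has an effect while the workflow is paused at a gate.
fn approve(status: &Status) -> Result<Status, String> {
    match status {
        Status::WaitingApproval(_) => Ok(Status::Running),
        other => Err(format!("cannot approve workflow in state '{}'", other)),
    }
}
```

Rejecting approval outside the waiting state keeps a stray or repeated approve call from silently changing a running workflow.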
## Artifacts
Data passed between stages:
### Artifact Types
```rust
pub enum ArtifactType {
    Adr,            // Architecture Decision Record
    Code,           // Source code files
    TestResults,    // Test execution output
    Review,         // Code review feedback
    Documentation,  // Generated docs
    Custom(String), // User-defined type
}
```
### Artifact Flow
```text
Stage 1: Architecture
└─ Produces: Artifact(Adr, "design-spec", ...)
│
▼
Stage 2: Implementation
├─ Consumes: design-spec
└─ Produces: Artifact(Code, "feature-impl", ...)
│
▼
Stage 3: Testing
├─ Consumes: feature-impl
└─ Produces: Artifact(TestResults, "test-report", ...)
```
Artifacts are stored in `WorkflowInstance.accumulated_artifacts` and passed to subsequent stages via context.
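
As a rough illustration of the flow above, the following sketch accumulates artifacts by name so that later stages can look up what earlier stages produced. The `ArtifactStore` type, the `Artifact` fields, and the method names are hypothetical; the real `Artifact` struct may carry different fields.

```rust
use std::collections::HashMap;

// Subset of the artifact types listed above, enough for the three-stage flow.
#[derive(Debug, Clone, PartialEq)]
enum ArtifactType {
    Adr,
    Code,
    TestResults,
}

// Hypothetical artifact shape: the field names are assumptions.
#[derive(Debug, Clone, PartialEq)]
struct Artifact {
    kind: ArtifactType,
    name: String,
    content: String,
}

#[derive(Debug, Default)]
struct ArtifactStore {
    accumulated: HashMap<String, Artifact>,
}

impl ArtifactStore {
    /// Record an artifact produced by a stage, keyed by its name.
    fn produce(&mut self, kind: ArtifactType, name: &str, content: &str) {
        let artifact = Artifact { kind, name: name.to_string(), content: content.to_string() };
        self.accumulated.insert(name.to_string(), artifact);
    }

    /// Look up an artifact a later stage wants to consume.
    fn get(&self, name: &str) -> Option<&Artifact> {
        self.accumulated.get(name)
    }

    /// Names of everything accumulated so far, sorted for stable output.
    fn names(&self) -> Vec<&str> {
        let mut names: Vec<&str> = self.accumulated.keys().map(String::as_str).collect();
        names.sort();
        names
    }
}
```

Keying by name means a stage that re-produces an artifact with the same name overwrites the earlier version, which is one possible policy among several.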
## Kogral Integration
Enrich workflow context with persistent knowledge from Kogral:
```rust
orchestrator.enrich_context_from_kogral(&mut context, "feature_development").await?;
```
Loads:
- **Guidelines**: `.kogral/guidelines/{workflow_name}.md`
- **Patterns**: `.kogral/patterns/*.md` (matching workflow name)
- **ADRs**: `.kogral/adrs/*.md` (5 most recent, containing workflow name)
Result injected into context:
```json
{
  "task": "Add authentication",
  "kogral_guidelines": {
    "source": ".kogral/guidelines/feature_development.md",
    "content": "..."
  },
  "kogral_patterns": [
    { "file": "auth-pattern.md", "content": "..." }
  ],
  "kogral_decisions": [
    { "file": "0005-oauth2-implementation.md", "content": "..." }
  ]
}
```
**Configuration**:
```bash
export KOGRAL_PATH="/path/to/kogral/.kogral"
```
Default: `../kogral/.kogral` (sibling directory)
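
The ADR selection rule ("5 most recent, containing workflow name") could look roughly like the sketch below. It rests on two loudly stated assumptions: that "containing" refers to the file content, and that recency can be approximated by the numeric file-name prefix (as in `0005-oauth2-implementation.md`). The `select_adrs` function is hypothetical and works on in-memory `(file name, content)` pairs rather than reading `.kogral/adrs/`.

```rust
/// Pick up to `limit` ADR file names whose content mentions `workflow`, newest first.
/// Assumption: "newest" is approximated by the numeric prefix of the file name.
fn select_adrs<'a>(files: &[(&'a str, &'a str)], workflow: &str, limit: usize) -> Vec<&'a str> {
    let mut hits: Vec<(u32, &str)> = files
        .iter()
        .filter(|(_, content)| content.contains(workflow))
        .filter_map(|(name, _)| {
            // Files without a numeric prefix are skipped in this sketch.
            let prefix = name.split('-').next()?;
            prefix.parse::<u32>().ok().map(|n| (n, *name))
        })
        .collect();
    // Highest ADR number first.
    hits.sort_by(|a, b| b.0.cmp(&a.0));
    hits.into_iter().take(limit).map(|(_, name)| name).collect()
}
```

Calling `select_adrs(&files, "feature_development", 5)` would then return at most five matching file names, ordered from the highest ADR number down.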
## REST API
All endpoints under `/api/v1/workflow_orchestrator`:
### Start Workflow
```http
POST /api/v1/workflow_orchestrator
Content-Type: application/json

{
  "template": "feature_development",
  "context": {
    "task": "Implement authentication",
    "requirements": ["OAuth2", "JWT"]
  }
}
```
**Response**:
```json
{
  "workflow_id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d"
}
```
### List Active Workflows
```http
GET /api/v1/workflow_orchestrator
```
**Response**:
```json
{
  "workflows": [
    {
      "id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d",
      "template_name": "feature_development",
      "status": "running",
      "current_stage": 2,
      "total_stages": 5,
      "created_at": "2026-01-24T01:23:45.123Z",
      "updated_at": "2026-01-24T01:45:12.456Z"
    }
  ]
}
```
### Get Workflow Status
```http
GET /api/v1/workflow_orchestrator/:id
```
**Response**: Same as a single workflow object in the list response.
### Approve Stage
```http
POST /api/v1/workflow_orchestrator/:id/approve
Content-Type: application/json

{
  "approver": "Jane Doe"
}
```
**Response**:
```json
{
  "success": true,
  "message": "Workflow 3f9a2b1c stage approved"
}
```
### Cancel Workflow
```http
POST /api/v1/workflow_orchestrator/:id/cancel
Content-Type: application/json

{
  "reason": "Requirements changed"
}
```
**Response**:
```json
{
  "success": true,
  "message": "Workflow 3f9a2b1c cancelled"
}
```
### List Templates
```http
GET /api/v1/workflow_orchestrator/templates
```
**Response**:
```json
{
  "templates": [
    "feature_development",
    "bugfix",
    "documentation_update",
    "security_audit"
  ]
}
```
## NATS Events
The Workflow Orchestrator publishes and subscribes to the following NATS JetStream subjects:
### Subscriptions
- `vapora.tasks.completed` - Agent task completion events
- `vapora.tasks.failed` - Agent task failure events
### Publications
- `vapora.workflow.approval_required` - Stage waiting for approval
- `vapora.workflow.completed` - Workflow finished successfully
**Event Format**:
```json
{
  "type": "approval_required",
  "workflow_id": "3f9a2b1c-5e7f-4a9b-8c2d-1e3f5a7b9c1d",
  "stage": "code_review",
  "timestamp": "2026-01-24T01:45:12.456Z"
}
```
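
When subscribing to several of these subjects at once, NATS wildcards are handy: `*` matches exactly one token and `>` matches one or more trailing tokens. The sketch below is a simplified, standalone matcher for illustration only (the `subject_matches` function is hypothetical and does not use a NATS client library).

```rust
/// Simplified NATS-style subject matching on dot-separated tokens.
/// `*` matches exactly one token; `>` matches one or more tokens and must be last.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut p = pattern.split('.');
    let mut s = subject.split('.');
    loop {
        match (p.next(), s.next()) {
            // `>` swallows the rest of the subject, but only as the final pattern token.
            (Some(">"), Some(_)) => return p.next().is_none(),
            (Some("*"), Some(_)) => continue,
            (Some(a), Some(b)) if a == b => continue,
            // Both exhausted at the same time: every token matched.
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

Under these rules the pattern `vapora.tasks.>` covers both `vapora.tasks.completed` and `vapora.tasks.failed`, but not the bare subject `vapora.tasks`.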
## Metrics
Prometheus metrics exposed at `/metrics`:
- `vapora_workflows_started_total` - Total workflows initiated
- `vapora_workflows_completed_total` - Successfully finished workflows
- `vapora_workflows_failed_total` - Failed workflows
- `vapora_stages_completed_total` - Individual stage completions
- `vapora_active_workflows` - Currently running workflows (gauge)
- `vapora_stage_duration_seconds` - Histogram of stage execution times
- `vapora_workflow_duration_seconds` - Histogram of total workflow times
## Cost Optimization
### Before: Monolithic Session
```text
Session with 50 messages:
├─ Message 1: 50K context → 50K cache reads
├─ Message 2: 100K context → 100K cache reads
├─ Message 3: 150K context → 150K cache reads
└─ Message 50: 800K context → 800K cache reads
──────────────────
~20M cache reads
```
**Cost**: ~$840/month for typical usage
### After: Multi-Stage Workflow
```text
Workflow with 3 stages:
├─ Architect: 40K context, 5 msgs → 200K cache reads
├─ Developer: 25K context, 12 msgs → 300K cache reads
└─ Reviewer: 35K context, 4 msgs → 140K cache reads
──────────────────
~640K cache reads
```
**Cost**: ~$110/month for equivalent work
**Savings**: ~$730/month (87% reduction)
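
The arithmetic behind these figures can be checked with a few lines of code. The sketch uses the simple model implied by the stage table (each message re-reads the stage's fixed context); the function names are illustrative only.

```rust
/// Cache reads for one stage, assuming every message re-reads the same fixed context.
fn stage_cache_reads(context_tokens: u64, messages: u64) -> u64 {
    context_tokens * messages
}

/// Percentage reduction going from `before` to `after`.
fn percent_reduction(before: f64, after: f64) -> f64 {
    (before - after) / before * 100.0
}

/// Total cache reads for the three-stage example above.
fn example_workflow_reads() -> u64 {
    stage_cache_reads(40_000, 5)      // Architect: 200K
        + stage_cache_reads(25_000, 12) // Developer: 300K
        + stage_cache_reads(35_000, 4)  // Reviewer: 140K
}
```

Here `example_workflow_reads()` gives 640,000 tokens and `percent_reduction(840.0, 110.0)` gives about 86.9, which rounds to the 87% quoted above. Measured in cache reads rather than dollars, `percent_reduction(20_000_000.0, 640_000.0)` comes to about 96.8.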
## Usage Examples
See [CLI Commands Guide](../setup/cli-commands.md) for command-line usage.
### Programmatic Usage
```rust
use vapora_workflow_engine::WorkflowOrchestrator;
use std::sync::Arc;

// Initialize orchestrator (`swarm`, `kg`, and `nats` are constructed elsewhere)
let orchestrator = Arc::new(
    WorkflowOrchestrator::new(
        "config/workflows.toml",
        swarm,
        kg,
        nats,
    ).await?
);

// Start event listener
orchestrator.clone().start_event_listener().await?;

// Start workflow
let workflow_id = orchestrator.start_workflow(
    "feature_development",
    serde_json::json!({
        "task": "Add authentication",
        "requirements": ["OAuth2", "JWT"]
    })
).await?;

// Get status
let workflow = orchestrator.get_workflow(&workflow_id)?;
println!("Status: {:?}", workflow.status);

// Approve stage (if waiting)
orchestrator.approve_stage(&workflow_id, "Jane Doe").await?;
```
## Configuration
### Workflow Templates
File: `config/workflows.toml`
```toml
[engine]
max_parallel_tasks = 10
workflow_timeout = 3600
approval_gates_enabled = true
[[workflows]]
name = "custom_workflow"
trigger = "manual"
[[workflows.stages]]
name = "stage_name"
agents = ["agent_role"]
parallel = false
max_parallel = 1
approval_required = false
```
### Environment Variables
```bash
# Kogral knowledge base path
export KOGRAL_PATH="/path/to/kogral/.kogral"
# NATS connection
export NATS_URL="nats://localhost:4222"
# Backend API (for CLI)
export VAPORA_API_URL="http://localhost:8001"
```
## Troubleshooting
### Workflow Stuck in "waiting_approval"
**Solution**: Use CLI or API to approve:
```bash
vapora workflow approve <workflow_id> --approver "Your Name"
```
### Stage Fails Repeatedly
**Check**:
1. Agent availability: `vapora workflow list` (via backend)
2. NATS connection: Verify NATS URL and cluster status
3. Task requirements: Check if stage agents have required capabilities
### High Latency Between Stages
**Causes**:
- NATS messaging delay (check network)
- SwarmCoordinator queue depth (check agent load)
- Artifact serialization overhead (reduce artifact size)
**Mitigation**:
- Use parallel stages where possible
- Increase `max_parallel` in stage config
- Optimize artifact content (references instead of full content)
### Workflow Not Advancing
**Debug**:
```bash
# Check workflow status
vapora workflow status <workflow_id>
# Check backend logs
docker logs vapora-backend
# Check NATS messages
nats sub "vapora.tasks.>"
```
feat(workflow-engine): autonomous scheduling with timezone and distributed lock
Add cron-based autonomous workflow firing with two hardening layers:
- Timezone-aware scheduling via chrono-tz: ScheduledWorkflow.timezone
(IANA identifier), compute_next_fire_at/after_tz, validate_timezone;
DST-safe, UTC fallback when absent; validated at config load and REST API
- Distributed fire-lock via SurrealDB conditional UPDATE (locked_by/locked_at
fields, 120 s TTL); WorkflowScheduler gains instance_id (UUID) as lock owner;
prevents double-fires across multi-instance deployments without extra infra
- ScheduleStore: try_acquire_fire_lock, release_fire_lock (own-instance guard),
full CRUD (load_one/all, full_upsert, patch, delete, load_runs)
- REST: 7 endpoints (GET/PUT/PATCH/DELETE schedules, runs history, manual fire)
with timezone field in all request/response types
- Migrations 010 (schedule tables) + 011 (timezone + lock columns)
- Tests: 48 passing (was 26); ADR-0034; changelog; feature docs updated
2026-02-26 11:34:44 +00:00
## Autonomous Scheduling
Workflows with `trigger = "schedule"` fire automatically on a cron expression without any REST trigger.
### TOML Configuration
```toml
[[workflows]]
name = "nightly_analysis"
trigger = "schedule"
[workflows.schedule]
cron = "0 2 * * *" # 5-field: min hour dom month dow
timezone = "America/New_York" # IANA identifier; omit for UTC
allow_concurrent = false # skip if previous run is still active
catch_up = false # fire missed slots on restart (capped 10)
[[workflows.stages]]
name = "analyze"
agents = ["analyst"]
```
Cron accepts 5-field (standard shell), 6-field (with seconds), or 7-field (with seconds + year) expressions. The expression is validated at config-load time: startup fails on an invalid cron expression or an unknown timezone.
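
As a small illustration of the three accepted shapes, the sketch below classifies an expression by its field count. It is deliberately shallow: it does not validate the field values themselves, and `CronFormat` and `classify_cron` are hypothetical names rather than part of the engine.

```rust
#[derive(Debug, PartialEq)]
enum CronFormat {
    Standard,           // 5 fields: min hour dom month dow
    WithSeconds,        // 6 fields: leading seconds field
    WithSecondsAndYear, // 7 fields: seconds plus trailing year
}

/// Classify a cron expression by field count only (field values are not checked).
fn classify_cron(expr: &str) -> Result<CronFormat, String> {
    match expr.split_whitespace().count() {
        5 => Ok(CronFormat::Standard),
        6 => Ok(CronFormat::WithSeconds),
        7 => Ok(CronFormat::WithSecondsAndYear),
        n => Err(format!("expected 5, 6, or 7 cron fields, found {}", n)),
    }
}
```

A check like this is enough to reject obviously malformed input early, before handing the expression to a full cron parser.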
### Schedule REST API
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/api/v1/schedules` | List all schedules |
| `GET` | `/api/v1/schedules/:id` | Get one schedule |
| `PUT` | `/api/v1/schedules/:id` | Create or fully replace |
| `PATCH` | `/api/v1/schedules/:id` | Partial update |
| `DELETE` | `/api/v1/schedules/:id` | Remove |
| `GET` | `/api/v1/schedules/:id/runs` | Execution history (last 100) |
| `POST` | `/api/v1/schedules/:id/fire` | Manual trigger bypassing cron |
**PUT body** (all fields):
```json
{
  "template_name": "nightly_analysis",
  "cron_expression": "0 2 * * *",
  "timezone": "America/New_York",
  "enabled": true,
  "allow_concurrent": false,
  "catch_up": false,
  "initial_context": {}
}
```
**PATCH body** (only changed fields):
```json
{ "enabled": false }
```
### Timezone Support
`timezone` is an IANA timezone identifier (e.g. `"America/New_York"`, `"Europe/Berlin"`, `"Asia/Tokyo"`). When absent, UTC is used. DST transitions are handled automatically.
The REST API validates the timezone at the boundary: an unknown identifier returns `400 InvalidInput`.
### Distributed Fire-Lock
When multiple VAPORA backend instances run against the same SurrealDB, the scheduler uses a conditional `UPDATE ... WHERE locked_by IS NONE OR locked_at < (now - 120s)` to ensure only one instance fires each schedule per tick. The lock holder is identified by a per-process UUID stored in `locked_by`; it expires automatically after 120 seconds, handling crashed instances.
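
The acquire and release conditions can be illustrated with an in-memory sketch. It mirrors the `WHERE` clause above on a plain struct; the real atomicity comes from the single conditional `UPDATE` in SurrealDB, which this code does not reproduce. `ScheduleRow`, `try_acquire`, and `release` are hypothetical names, and timestamps are plain seconds for simplicity.

```rust
const LOCK_TTL_SECS: u64 = 120;

// Hypothetical in-memory stand-in for the lock columns of a schedule row.
#[derive(Debug, Default)]
struct ScheduleRow {
    locked_by: Option<String>,
    locked_at: u64, // seconds since some epoch
}

/// Take the lock if it is free or its holder's TTL has expired.
/// Mirrors: locked_by IS NONE OR locked_at < (now - 120s).
fn try_acquire(row: &mut ScheduleRow, instance_id: &str, now: u64) -> bool {
    let free = row.locked_by.is_none();
    let expired = row.locked_at + LOCK_TTL_SECS < now;
    if free || expired {
        row.locked_by = Some(instance_id.to_string());
        row.locked_at = now;
        true
    } else {
        false
    }
}

/// Release the lock only if this instance owns it (own-instance guard).
fn release(row: &mut ScheduleRow, instance_id: &str) -> bool {
    if row.locked_by.as_deref() == Some(instance_id) {
        row.locked_by = None;
        true
    } else {
        false
    }
}
```

The own-instance guard on release matters after a TTL takeover: a slow instance that wakes up late must not clear a lock that another instance has since acquired.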
### Schedule Metrics (Prometheus)
- `vapora_schedules_fired_total` - Successful fires
- `vapora_schedules_skipped_total` - Skipped fires (concurrent guard or distributed lock contention)
- `vapora_schedules_failed_total` - Workflow start failures
- `vapora_active_schedules` - Current schedule count (gauge)
## Related Documentation
- [CLI Commands Guide](../setup/cli-commands.md) - Command-line usage
- [Multi-Agent Workflows](../architecture/multi-agent-workflows.md) - Architecture overview
- [Agent Registry & Coordination](../architecture/agent-registry-coordination.md) - Agent management
- [ADR-0028: Workflow Orchestrator](../adrs/0028-workflow-orchestrator.md) - Decision rationale
- [ADR-0034: Autonomous Scheduling](../adrs/0034-autonomous-scheduling.md) - Scheduling design decisions
- [ADR-0014: Learning-Based Agent Selection](../adrs/0014-learning-profiles.md) - Agent selection
- [ADR-0015: Budget Enforcement](../adrs/0015-budget-enforcement.md) - Cost control