feat(agents): stable identity + hot-reload for zero learning loss on config change

Introduce stable_id = role on AgentMetadata so learning profiles and KG
execution records survive process restarts and hot-reloads. Previously
every Uuid::new_v4() rotation orphaned accumulated expertise.

  - registry: add stable_id field (serde default, backward-compatible),
    stable_id_or_role() fallback helper, drain_role(), list_roles()
  - coordinator: profile lookup and KG writes use stable_id_or_role()
    instead of the ephemeral UUID; drain_role() drops Sender to close
    mpsc channels after in-flight messages drain; registry_arc() accessor
  - executor: agent_id written to KG now uses stable_id_or_role()
  - server: reload_agents() drain-and-respawn function; SIGHUP handler
    via while sighup.recv().await.is_some(); POST /reload endpoint;
    AppState extended with config_path, router, cap_registry
  - fix: SIGHUP recv() spin-loop guard (is_some())
  - fix: io_other_error clippy lint in vapora-agents, vapora-llm-router,
    vapora-workflow-engine (std::io::Error::other instead of Error::new)
  - docs: ADR-0040, CHANGELOG entry, README hot-reload section
Jesús Pérez 2026-03-02 22:54:28 +00:00
parent 847523e4d4
commit c5f4caa2ab
Signed by: jesus (GPG key ID: 9F243E355E0BC939)
37 changed files with 1689 additions and 662 deletions


@@ -7,6 +7,39 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added - Agent Hot-Reload: Stable Identity and Zero-Downtime Config Reload
#### `vapora-agents` — stable_id, drain/respawn, SIGHUP, REST endpoint
- **`AgentMetadata::stable_id`** (`registry.rs`): New `#[serde(default)]` field computed as `role.clone()` at construction, before the `role` field is moved. `stable_id_or_role()` helper returns `stable_id` if non-empty, otherwise falls back to `role` for backward compatibility with serialized records that predate this change.
- **Profile key switch** (`coordinator.rs`): `assign_task` and `load_all_learning_profiles` now look up `learning_profiles` by `stable_id_or_role()` instead of the ephemeral UUID `id`. Learning expertise accumulated across any number of restarts or hot-reloads is no longer orphaned.
- **KG execution records** (`executor.rs`): The `agent_id` field written to SurrealDB on task completion now uses `stable_id_or_role()` (= role) instead of the per-instance UUID. Execution history is partitioned by role, consistent with how profiles are keyed.
- **`AgentRegistry::drain_role`** (`registry.rs`): Removes all agents for a given role from the registry and clears `running_count`, enabling immediate re-registration without hitting `MaxAgentsReached`.
- **`AgentRegistry::list_roles`** (`registry.rs`): Returns the sorted list of distinct roles currently registered.
- **`AgentCoordinator::drain_role`** (`coordinator.rs`): Calls `registry.drain_role`, then drops the corresponding `Sender` entries from `executor_channels`. Dropping the `Sender` closes the mpsc channel; each executor's `while let Some(task) = rx.recv().await` loop exits after draining buffered messages — no explicit shutdown signal needed. `learning_profiles` is untouched.
- **`AgentCoordinator::registry_arc`** (`coordinator.rs`): New accessor returning `Arc<AgentRegistry>`, used by the `/reload` endpoint to obtain the registry without exposing coordinator internals.
- **`reload_agents`** (`server.rs`): Async function that (1) drains all active roles, (2) re-spawns capability executors from `CapabilityRegistry`, (3) re-spawns config agents not covered by capabilities, (4) returns the new total agent count. Learning profiles survive the entire sequence.
- **SIGHUP handler** (`server.rs`): `tokio::signal::unix::signal(SignalKind::hangup())` drives a `while sighup.recv().await.is_some()` loop calling `handle_sighup_reload`. The `is_some()` guard prevents a spin-loop if the signal stream closes during runtime shutdown.
- **`POST /reload` endpoint** (`server.rs`): HTTP entry point for operators and CI pipelines. Loads a fresh config from `config_path`, calls `reload_agents`, returns `{"reloaded": true, "agents": N}` on success or `500` with error detail on failure.
- **Availability window documented**: `reload_agents` has a brief window (typically sub-millisecond) between drain and re-registration during which `assign_task` returns `NoAvailableAgent`. Callers must handle and retry. `BudgetManager` and `LLMRouter` are not reloaded; changes to those require a process restart.
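The identity scheme above can be sketched with a std-only reduction. In the real `registry.rs` the `id` comes from `Uuid::new_v4()` and `stable_id` carries `#[serde(default)]`; the counter and the trimmed-down struct here are illustrative stand-ins:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static NEXT_ID: AtomicU64 = AtomicU64::new(0);

struct AgentMetadata {
    id: String,        // ephemeral: rotates on every spawn
    role: String,
    stable_id: String, // deterministic: equals the role
}

impl AgentMetadata {
    fn new(role: &str) -> Self {
        Self {
            // Stand-in for Uuid::new_v4(): a fresh value per instance.
            id: format!("agent-{}", NEXT_ID.fetch_add(1, Ordering::Relaxed)),
            role: role.to_string(),
            stable_id: role.to_string(), // computed before `role` would be moved
        }
    }

    /// Returns `stable_id`, falling back to `role` for records that
    /// predate the field (deserialized via serde(default) as "").
    fn stable_id_or_role(&self) -> &str {
        if self.stable_id.is_empty() {
            &self.role
        } else {
            &self.stable_id
        }
    }
}

fn main() {
    let a = AgentMetadata::new("developer");
    let b = AgentMetadata::new("developer");
    assert_ne!(a.id, b.id); // ephemeral ids always differ
    assert_eq!(a.stable_id_or_role(), "developer");
    assert_eq!(a.stable_id_or_role(), b.stable_id_or_role());
    println!("stable identity: {}", a.stable_id_or_role());
}
```

Keying profiles and KG records by `stable_id_or_role()` rather than `id` is what makes expertise survive any number of respawns.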
#### Tests added (`vapora-agents`)
- `test_stable_id_deterministic` — two `AgentMetadata::new("developer", ...)` produce distinct `id` but identical `stable_id = "developer"`
- `test_stable_id_or_role_fallback` — empty `stable_id` falls back to `role`
- `test_drain_role` — after `drain_role("developer")`, reviewer agents persist and developer count is zero
- `test_list_roles` — returns correct sorted list after mixed-role registrations
- `test_profile_survives_role_drain` — `get_learning_profile("developer")` returns `Some` after `drain_role("developer")`
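The drain behavior these tests exercise is plain channel semantics: once every `Sender` is dropped the channel closes, and the receiver still yields all buffered messages before ending. The crate uses tokio's mpsc; the std `mpsc` sketch below (with hypothetical task strings) demonstrates the same guarantee:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Executor loop analogue of `while let Some(task) = rx.recv().await`:
    // iteration ends only when every Sender is dropped AND the buffer is empty.
    let executor = thread::spawn(move || {
        let mut handled = 0;
        for task in rx {
            println!("handling {task}");
            handled += 1;
        }
        handled
    });

    // Two in-flight tasks buffered before the drain.
    tx.send("task-1".into()).unwrap();
    tx.send("task-2".into()).unwrap();

    // drain_role analogue: dropping the Sender closes the channel;
    // no explicit shutdown signal is needed.
    drop(tx);

    // Both buffered tasks are processed before the loop exits.
    assert_eq!(executor.join().unwrap(), 2);
    println!("executor drained cleanly");
}
```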
#### Clippy fixes (pre-existing, unblocked by this change)
- `vapora-workflow-engine/src/config.rs`: 4× `std::io::Error::new(ErrorKind::Other, ...)` → `std::io::Error::other(...)`
- `vapora-llm-router/src/budget.rs`: 4× same pattern
- `vapora-llm-router/src/config.rs`: 3× same pattern
- `vapora-agents/src/config.rs`: 4× same pattern
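For reference, the migration the lint asks for is mechanical; both constructors below yield an equivalent `ErrorKind::Other` error (the message string is illustrative):

```rust
use std::io::{Error, ErrorKind};

fn before(msg: String) -> Error {
    // Flagged by clippy's `io_other_error` lint.
    Error::new(ErrorKind::Other, msg)
}

fn after(msg: String) -> Error {
    // Preferred since `io::Error::other` was stabilized: same kind, less ceremony.
    Error::other(msg)
}

fn main() {
    let a = before("nickel export failed".to_string());
    let b = after("nickel export failed".to_string());
    assert_eq!(a.kind(), b.kind());           // both ErrorKind::Other
    assert_eq!(a.to_string(), b.to_string()); // same displayed message
}
```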
---
### Fixed - Stub Elimination: Real Implementations for 6 Hollow Integration Points
#### `vapora-backend` — WorkflowOrchestrator and WorkflowService wiring


@@ -99,7 +99,15 @@
- **Workflow events**: `on_stage_complete`, `on_stage_failed`, `on_completed`, `on_cancelled` — per-workflow routing config
- **REST API**: `GET /api/v1/channels` (list), `POST /api/v1/channels/:name/test` (connectivity check)
### 🧠 Intelligent Learning & Cost Optimization (Phase 5.3 + 5.4)
### ♻️ Agent Hot-Reload — Zero Config-Rotation Learning Loss
- **`stable_id` identity**: Each agent carries a `stable_id = role` that is deterministic across restarts. Learning profiles and KG execution records are keyed by `stable_id`, not the ephemeral UUID, so accumulated expertise survives every reload.
- **SIGHUP reload**: `kill -HUP $(pgrep vapora-agents)` drains all executors and re-spawns them from the updated config without restarting the process.
- **`POST /reload` endpoint**: HTTP entry point for CI pipelines and operators. Returns `{"reloaded": true, "agents": N}` on success.
- **Graceful drain**: Dropping the executor `Sender` closes the mpsc channel; in-flight messages are drained before the executor exits — no tasks are lost during reload.
- **Profile persistence**: `learning_profiles` (keyed by `stable_id`) is never touched during drain/respawn. New executor instances inherit accumulated expertise immediately.
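Given the brief drain-to-respawn window, callers of `assign_task` should treat a no-agent error during a reload as transient. A hypothetical retry wrapper (the error enum and function names are illustrative, not the crate's actual API):

```rust
use std::thread::sleep;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum AssignError {
    NoAvailableAgent, // returned inside the drain/respawn window
    Other(String),
}

/// Retry `assign` up to `attempts` times with a short linear backoff;
/// only `NoAvailableAgent` is considered transient.
fn assign_with_retry<T>(
    mut assign: impl FnMut() -> Result<T, AssignError>,
    attempts: u32,
) -> Result<T, AssignError> {
    for i in 0..attempts {
        match assign() {
            Ok(v) => return Ok(v),
            Err(AssignError::NoAvailableAgent) if i + 1 < attempts => {
                // Transient reload window: back off briefly and retry.
                sleep(Duration::from_millis(10 * u64::from(i + 1)));
            }
            Err(e) => return Err(e), // non-transient, or retries exhausted
        }
    }
    Err(AssignError::NoAvailableAgent)
}

fn main() {
    // Simulate: first two calls hit the reload window, third succeeds.
    let mut calls = 0;
    let result = assign_with_retry(
        || {
            calls += 1;
            if calls < 3 {
                Err(AssignError::NoAvailableAgent)
            } else {
                Ok("task-42")
            }
        },
        5,
    );
    assert_eq!(result.unwrap(), "task-42");
    assert_eq!(calls, 3);
}
```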
### 🧠 Intelligent Learning & Cost Optimization
- **Per-Task-Type Learning**: Agents build expertise profiles from execution history
- **Recency Bias**: Recent performance weighted 3x (last 7 days) for adaptive selection


@@ -1,39 +0,0 @@
# Agent Role Budget Configuration
# Defines monthly and weekly spending limits per agent role
# Budget enforcement prevents runaway LLM costs
# Fallback providers used when budget thresholds exceeded
[budgets.architect]
role = "architect"
monthly_limit_cents = 50000 # $500/month
weekly_limit_cents = 12500 # $125/week
fallback_provider = "gemini" # Cheaper alternative when budget hit
alert_threshold = 0.8 # Alert at 80% utilization
[budgets.developer]
role = "developer"
monthly_limit_cents = 30000 # $300/month
weekly_limit_cents = 7500 # $75/week
fallback_provider = "ollama" # Free local model
alert_threshold = 0.8
[budgets.reviewer]
role = "reviewer"
monthly_limit_cents = 20000 # $200/month
weekly_limit_cents = 5000 # $50/week
fallback_provider = "gemini"
alert_threshold = 0.8
[budgets.documenter]
role = "documenter"
monthly_limit_cents = 15000 # $150/month
weekly_limit_cents = 3750 # $37.50/week
fallback_provider = "ollama"
alert_threshold = 0.8
[budgets.tester]
role = "tester"
monthly_limit_cents = 25000 # $250/month
weekly_limit_cents = 6250 # $62.50/week
fallback_provider = "ollama"
alert_threshold = 0.8

config/agents.ncl (new file, 120 lines)

@@ -0,0 +1,120 @@
let C = import "../nickel/agents/contracts.ncl" in
{
registry | C.RegistryConfig = {
max_agents_per_role = 5,
health_check_interval = 30,
agent_timeout = 300,
},
agents | Array C.AgentDefinition = [
{
role = "architect",
description = "System design, architecture decisions, ADRs",
llm_provider = "claude",
llm_model = "claude-opus-4-20250514",
parallelizable = false,
priority = 100,
capabilities = ["system_design", "architecture", "adr", "patterns"],
},
{
role = "developer",
description = "Code implementation, feature development",
llm_provider = "claude",
llm_model = "claude-sonnet-4-5-20250929",
parallelizable = true,
priority = 80,
capabilities = ["coding", "implementation", "debugging"],
},
{
role = "code_reviewer",
description = "Code quality assurance, style checking",
llm_provider = "claude",
llm_model = "claude-sonnet-4-5-20250929",
parallelizable = true,
priority = 70,
capabilities = ["code_review", "quality", "best_practices"],
},
{
role = "tester",
description = "Tests, benchmarks, quality validation",
llm_provider = "claude",
llm_model = "claude-sonnet-4-5-20250929",
parallelizable = true,
priority = 75,
capabilities = ["testing", "benchmarks", "validation"],
},
{
role = "documenter",
description = "Documentation, root files (README, CHANGELOG)",
llm_provider = "openai",
llm_model = "gpt-4o",
parallelizable = true,
priority = 60,
capabilities = ["documentation", "readme", "changelog", "guides"],
},
{
role = "marketer",
description = "Marketing content, announcements",
llm_provider = "claude",
llm_model = "claude-sonnet-4-5-20250929",
parallelizable = true,
priority = 40,
capabilities = ["marketing", "content", "announcements"],
},
{
role = "presenter",
description = "Presentations, slides, demos",
llm_provider = "claude",
llm_model = "claude-sonnet-4-5-20250929",
parallelizable = false,
priority = 50,
capabilities = ["presentations", "slides", "demos"],
},
{
role = "devops",
description = "CI/CD, deployment, infrastructure",
llm_provider = "claude",
llm_model = "claude-sonnet-4-5-20250929",
parallelizable = true,
priority = 85,
capabilities = ["cicd", "deployment", "kubernetes", "infrastructure"],
},
{
role = "monitor",
description = "System health, alerting, observability",
llm_provider = "gemini",
llm_model = "gemini-2.0-flash",
parallelizable = false,
priority = 90,
capabilities = ["monitoring", "health", "alerts", "metrics"],
},
{
role = "security",
description = "Security audit, vulnerability detection",
llm_provider = "claude",
llm_model = "claude-opus-4-20250514",
parallelizable = true,
priority = 95,
capabilities = ["security", "audit", "vulnerabilities"],
},
{
role = "project_manager",
description = "Roadmap, task tracking, coordination",
llm_provider = "claude",
llm_model = "claude-sonnet-4-5-20250929",
parallelizable = false,
priority = 65,
capabilities = ["planning", "tracking", "coordination"],
},
{
role = "decision_maker",
description = "Conflict resolution, strategic decisions",
llm_provider = "claude",
llm_model = "claude-opus-4-20250514",
parallelizable = false,
priority = 100,
capabilities = ["decisions", "conflict_resolution", "strategy"],
},
],
}


@@ -1,122 +0,0 @@
# Agent Registry Configuration
# Phase 0: Definition of 12 agent roles
[registry]
# Maximum number of concurrent agents per role
max_agents_per_role = 5
# Agent health check interval (seconds)
health_check_interval = 30
# Agent timeout (seconds)
agent_timeout = 300
# The 12 Agent Roles
[[agents]]
role = "architect"
description = "System design, architecture decisions, ADRs"
llm_provider = "claude"
llm_model = "claude-opus-4-20250514"
parallelizable = false
priority = 100
capabilities = ["system_design", "architecture", "adr", "patterns"]
[[agents]]
role = "developer"
description = "Code implementation, feature development"
llm_provider = "claude"
llm_model = "claude-sonnet-4-5-20250929"
parallelizable = true
priority = 80
capabilities = ["coding", "implementation", "debugging"]
[[agents]]
role = "code_reviewer"
description = "Code quality assurance, style checking"
llm_provider = "claude"
llm_model = "claude-sonnet-4-5-20250929"
parallelizable = true
priority = 70
capabilities = ["code_review", "quality", "best_practices"]
[[agents]]
role = "tester"
description = "Tests, benchmarks, quality validation"
llm_provider = "claude"
llm_model = "claude-sonnet-4-5-20250929"
parallelizable = true
priority = 75
capabilities = ["testing", "benchmarks", "validation"]
[[agents]]
role = "documenter"
description = "Documentation, root files (README, CHANGELOG)"
llm_provider = "openai"
llm_model = "gpt-4o"
parallelizable = true
priority = 60
capabilities = ["documentation", "readme", "changelog", "guides"]
[[agents]]
role = "marketer"
description = "Marketing content, announcements"
llm_provider = "claude"
llm_model = "claude-sonnet-4-5-20250929"
parallelizable = true
priority = 40
capabilities = ["marketing", "content", "announcements"]
[[agents]]
role = "presenter"
description = "Presentations, slides, demos"
llm_provider = "claude"
llm_model = "claude-sonnet-4-5-20250929"
parallelizable = false
priority = 50
capabilities = ["presentations", "slides", "demos"]
[[agents]]
role = "devops"
description = "CI/CD, deployment, infrastructure"
llm_provider = "claude"
llm_model = "claude-sonnet-4-5-20250929"
parallelizable = true
priority = 85
capabilities = ["cicd", "deployment", "kubernetes", "infrastructure"]
[[agents]]
role = "monitor"
description = "System health, alerting, observability"
llm_provider = "gemini"
llm_model = "gemini-2.0-flash"
parallelizable = false
priority = 90
capabilities = ["monitoring", "health", "alerts", "metrics"]
[[agents]]
role = "security"
description = "Security audit, vulnerability detection"
llm_provider = "claude"
llm_model = "claude-opus-4-20250514"
parallelizable = true
priority = 95
capabilities = ["security", "audit", "vulnerabilities"]
[[agents]]
role = "project_manager"
description = "Roadmap, task tracking, coordination"
llm_provider = "claude"
llm_model = "claude-sonnet-4-5-20250929"
parallelizable = false
priority = 65
capabilities = ["planning", "tracking", "coordination"]
[[agents]]
role = "decision_maker"
description = "Conflict resolution, strategic decisions"
llm_provider = "claude"
llm_model = "claude-opus-4-20250514"
parallelizable = false
priority = 100
capabilities = ["decisions", "conflict_resolution", "strategy"]

config/budgets.ncl (new file, 45 lines)

@@ -0,0 +1,45 @@
let C = import "../nickel/budgets/contracts.ncl" in
{
budgets = {
architect | C.RoleBudget = {
role = "architect",
monthly_limit_cents = 50000,
weekly_limit_cents = 12500,
fallback_provider = "gemini",
alert_threshold = 0.8,
},
developer | C.RoleBudget = {
role = "developer",
monthly_limit_cents = 30000,
weekly_limit_cents = 7500,
fallback_provider = "ollama",
alert_threshold = 0.8,
},
reviewer | C.RoleBudget = {
role = "reviewer",
monthly_limit_cents = 20000,
weekly_limit_cents = 5000,
fallback_provider = "gemini",
alert_threshold = 0.8,
},
documenter | C.RoleBudget = {
role = "documenter",
monthly_limit_cents = 15000,
weekly_limit_cents = 3750,
fallback_provider = "ollama",
alert_threshold = 0.8,
},
tester | C.RoleBudget = {
role = "tester",
monthly_limit_cents = 25000,
weekly_limit_cents = 6250,
fallback_provider = "ollama",
alert_threshold = 0.8,
},
},
}

config/channels.ncl (new file, 10 lines)

@@ -0,0 +1,10 @@
{
channels = {},
notifications = {
on_task_done = [],
on_proposal_approved = [],
on_proposal_rejected = [],
on_agent_inactive = [],
},
}

config/config.ncl (new file, 6 lines)

@@ -0,0 +1,6 @@
(import "./server.ncl")
& (import "./agents.ncl")
& (import "./llm-router.ncl")
& (import "./budgets.ncl")
& (import "./workflows.ncl")
& (import "./channels.ncl")

config/llm-router.ncl (new file, 80 lines)

@@ -0,0 +1,80 @@
let C = import "../nickel/llm-router/contracts.ncl" in
{
routing | C.RoutingConfig = {
default_provider = "claude",
cost_tracking_enabled = true,
fallback_enabled = true,
},
providers = {
claude | C.ProviderConfig = {
enabled = true,
api_key = "${ANTHROPIC_API_KEY}",
model = "claude-sonnet-4-5-20250929",
max_tokens = 8192,
temperature = 0.7,
cost_per_1m_input = 3.00,
cost_per_1m_output = 15.00,
},
openai | C.ProviderConfig = {
enabled = true,
api_key = "${OPENAI_API_KEY}",
model = "gpt-4o",
max_tokens = 4096,
temperature = 0.7,
cost_per_1m_input = 2.50,
cost_per_1m_output = 10.00,
},
gemini | C.ProviderConfig = {
enabled = true,
api_key = "${GOOGLE_API_KEY}",
model = "gemini-2.0-flash",
max_tokens = 8192,
temperature = 0.7,
cost_per_1m_input = 0.30,
cost_per_1m_output = 1.20,
},
ollama | C.ProviderConfig = {
enabled = true,
url = "${OLLAMA_URL:-http://localhost:11434}",
model = "llama3.2",
max_tokens = 4096,
temperature = 0.7,
cost_per_1m_input = 0.00,
cost_per_1m_output = 0.00,
},
},
routing_rules | Array C.RoutingRule = [
{
name = "architecture_design",
condition = { task_type = "architecture" },
provider = "claude",
model_override = "claude-opus-4-20250514",
},
{
name = "code_generation",
condition = { task_type = "development" },
provider = "claude",
},
{
name = "documentation",
condition = { task_type = "documentation" },
provider = "openai",
},
{
name = "monitoring",
condition = { task_type = "monitoring" },
provider = "gemini",
},
{
name = "local_testing",
condition = { environment = "development" },
provider = "ollama",
},
],
}


@@ -1,87 +0,0 @@
# Multi-IA Router Configuration
# Phase 0: Configuration for LLM provider selection
[routing]
# Default provider if no specific routing rules match
default_provider = "claude"
# Enable cost tracking
cost_tracking_enabled = true
# Enable fallback on provider failure
fallback_enabled = true
[providers.claude]
enabled = true
# ANTHROPIC_API_KEY environment variable required
api_key = "${ANTHROPIC_API_KEY}"
model = "claude-sonnet-4-5-20250929"
max_tokens = 8192
temperature = 0.7
# Cost per 1M tokens (input/output)
cost_per_1m_input = 3.00
cost_per_1m_output = 15.00
[providers.openai]
enabled = true
# OPENAI_API_KEY environment variable required
api_key = "${OPENAI_API_KEY}"
model = "gpt-4o"
max_tokens = 4096
temperature = 0.7
# Cost per 1M tokens (input/output)
cost_per_1m_input = 2.50
cost_per_1m_output = 10.00
[providers.gemini]
enabled = true
# GOOGLE_API_KEY environment variable required
api_key = "${GOOGLE_API_KEY}"
model = "gemini-2.0-flash"
max_tokens = 8192
temperature = 0.7
# Cost per 1M tokens (input/output)
cost_per_1m_input = 0.30
cost_per_1m_output = 1.20
[providers.ollama]
enabled = true
# Local Ollama instance, no API key needed
url = "${OLLAMA_URL:-http://localhost:11434}"
model = "llama3.2"
max_tokens = 4096
temperature = 0.7
# No cost for local models
cost_per_1m_input = 0.00
cost_per_1m_output = 0.00
# Routing rules: assign providers based on task characteristics
[[routing_rules]]
name = "architecture_design"
condition = { task_type = "architecture" }
provider = "claude"
model_override = "claude-opus-4-20250514"
[[routing_rules]]
name = "code_generation"
condition = { task_type = "development" }
provider = "claude"
[[routing_rules]]
name = "documentation"
condition = { task_type = "documentation" }
provider = "openai"
[[routing_rules]]
name = "monitoring"
condition = { task_type = "monitoring" }
provider = "gemini"
[[routing_rules]]
name = "local_testing"
condition = { environment = "development" }
provider = "ollama"

config/server.ncl (new file, 38 lines)

@@ -0,0 +1,38 @@
let C = import "../nickel/vapora/contracts.ncl" in
{
server | C.ServerConfig = {
host = "127.0.0.1",
port = 3000,
tls = {
enabled = false,
cert_path = "",
key_path = "",
},
},
database | C.DatabaseConfig = {
url = "ws://localhost:8000",
max_connections = 10,
},
nats | C.NatsConfig = {
url = "nats://localhost:4222",
stream_name = "vapora-tasks",
},
auth | C.AuthConfig = {
jwt_secret = "change-in-production",
jwt_expiration_hours = 24,
},
logging | C.LoggingConfig = {
level = "info",
json = false,
},
metrics | C.MetricsConfig = {
enabled = true,
port = 9090,
},
}


@@ -1,40 +0,0 @@
# VAPORA Server Configuration
# Phase 0: Environment-based configuration
# Note: Load runtime configuration from environment variables, not this file
[server]
# Server configuration (override with env vars: VAPORA_HOST, VAPORA_PORT)
host = "127.0.0.1"
port = 3000
[server.tls]
# TLS configuration (optional)
# Override with: VAPORA_TLS_ENABLED, VAPORA_TLS_CERT_PATH, VAPORA_TLS_KEY_PATH
enabled = false
cert_path = ""
key_path = ""
[database]
# Database connection (override with: VAPORA_DB_URL, VAPORA_DB_MAX_CONNECTIONS)
url = "ws://localhost:8000"
max_connections = 10
[nats]
# NATS JetStream configuration (override with: VAPORA_NATS_URL, VAPORA_NATS_STREAM)
url = "nats://localhost:4222"
stream_name = "vapora-tasks"
[auth]
# Authentication configuration (override with: VAPORA_JWT_SECRET, VAPORA_JWT_EXPIRATION_HOURS)
jwt_secret = "change-in-production"
jwt_expiration_hours = 24
[logging]
# Logging configuration (override with: VAPORA_LOG_LEVEL, VAPORA_LOG_JSON)
level = "info"
json = false
[metrics]
# Metrics configuration (override with: VAPORA_METRICS_ENABLED, VAPORA_METRICS_PORT)
enabled = true
port = 9090

config/workflows.ncl (new file, 153 lines)

@@ -0,0 +1,153 @@
let C = import "../nickel/workflows/contracts.ncl" in
{
engine | C.EngineConfig = {
max_parallel_tasks = 10,
workflow_timeout = 3600,
approval_gates_enabled = true,
},
workflows = [
{
name = "feature_development",
trigger = "manual",
stages | Array C.StageConfig = [
{
name = "architecture_design",
agents = ["architect"],
parallel = false,
approval_required = false,
},
{
name = "implementation",
agents = ["developer", "developer"],
parallel = true,
max_parallel = 2,
approval_required = false,
},
{
name = "testing",
agents = ["tester"],
parallel = false,
approval_required = false,
},
{
name = "code_review",
agents = ["reviewer"],
parallel = false,
approval_required = true,
},
{
name = "deployment",
agents = ["devops"],
parallel = false,
approval_required = true,
},
],
notifications = {
on_completed = [],
on_failed = [],
on_approval_required = [],
},
},
{
name = "bugfix",
trigger = "manual",
stages | Array C.StageConfig = [
{
name = "investigation",
agents = ["developer"],
parallel = false,
approval_required = false,
},
{
name = "fix_implementation",
agents = ["developer"],
parallel = false,
approval_required = false,
},
{
name = "testing",
agents = ["tester"],
parallel = false,
approval_required = false,
},
{
name = "deployment",
agents = ["devops"],
parallel = false,
approval_required = false,
},
],
notifications = {
on_completed = [],
on_failed = [],
on_approval_required = [],
},
},
{
name = "documentation_update",
trigger = "manual",
stages | Array C.StageConfig = [
{
name = "content_creation",
agents = ["technical_writer"],
parallel = false,
approval_required = false,
},
{
name = "review",
agents = ["reviewer"],
parallel = false,
approval_required = true,
},
{
name = "publish",
agents = ["devops"],
parallel = false,
approval_required = false,
},
],
notifications = {
on_completed = [],
on_failed = [],
on_approval_required = [],
},
},
{
name = "security_audit",
trigger = "manual",
stages | Array C.StageConfig = [
{
name = "code_analysis",
agents = ["security_engineer"],
parallel = false,
approval_required = false,
},
{
name = "penetration_testing",
agents = ["security_engineer"],
parallel = false,
approval_required = false,
},
{
name = "remediation",
agents = ["developer"],
parallel = false,
approval_required = false,
},
{
name = "verification",
agents = ["security_engineer"],
parallel = false,
approval_required = true,
},
],
notifications = {
on_completed = [],
on_failed = [],
on_approval_required = [],
},
},
],
}


@@ -1,117 +0,0 @@
[engine]
max_parallel_tasks = 10
workflow_timeout = 3600
approval_gates_enabled = true
[[workflows]]
name = "feature_development"
trigger = "manual"
[[workflows.stages]]
name = "architecture_design"
agents = ["architect"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "implementation"
agents = ["developer", "developer"]
parallel = true
max_parallel = 2
approval_required = false
[[workflows.stages]]
name = "testing"
agents = ["tester"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "code_review"
agents = ["reviewer"]
parallel = false
approval_required = true
[[workflows.stages]]
name = "deployment"
agents = ["devops"]
parallel = false
approval_required = true
[[workflows]]
name = "bugfix"
trigger = "manual"
[[workflows.stages]]
name = "investigation"
agents = ["developer"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "fix_implementation"
agents = ["developer"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "testing"
agents = ["tester"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "deployment"
agents = ["devops"]
parallel = false
approval_required = false
[[workflows]]
name = "documentation_update"
trigger = "manual"
[[workflows.stages]]
name = "content_creation"
agents = ["technical_writer"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "review"
agents = ["reviewer"]
parallel = false
approval_required = true
[[workflows.stages]]
name = "publish"
agents = ["devops"]
parallel = false
approval_required = false
[[workflows]]
name = "security_audit"
trigger = "manual"
[[workflows.stages]]
name = "code_analysis"
agents = ["security_engineer"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "penetration_testing"
agents = ["security_engineer"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "remediation"
agents = ["developer"]
parallel = false
approval_required = false
[[workflows.stages]]
name = "verification"
agents = ["security_engineer"]
parallel = false
approval_required = true


@@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use axum::{extract::State, routing::get, Json, Router};
use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router};
use clap::Parser;
use serde_json::json;
use tokio::net::TcpListener;
@@ -26,6 +26,12 @@ struct AppState {
coordinator: Arc<AgentCoordinator>,
#[allow(dead_code)]
budget_manager: Option<Arc<BudgetManager>>,
/// Path to the agent config file, used for re-loading on hot-reload.
config_path: String,
/// LLM router shared across executors.
router: Option<Arc<LLMRouter>>,
/// Capability registry for re-spawning capability executors.
cap_registry: Arc<CapabilityRegistry>,
}
#[derive(Parser, Debug)]
@@ -41,11 +47,7 @@ struct AppState {
)]
struct Args {
/// Path to budget configuration file
#[arg(
long,
default_value = "config/agent-budgets.toml",
env = "BUDGET_CONFIG_PATH"
)]
#[arg(long, default_value = "config/config.ncl", env = "BUDGET_CONFIG_PATH")]
budget_config: String,
}
@@ -64,6 +66,8 @@ async fn main() -> Result<()> {
// Load agent configuration
let config = AgentConfig::from_env()?;
let config_path =
std::env::var("VAPORA_AGENT_CONFIG").unwrap_or_else(|_| "config/config.ncl".to_string());
info!("Loaded configuration from environment");
// Load budget configuration
@@ -96,7 +100,7 @@
let router = router.map(Arc::new);
// Initialize capability registry with built-in capability packages
let cap_registry = CapabilityRegistry::with_built_ins();
let cap_registry = Arc::new(CapabilityRegistry::with_built_ins());
info!(
"Capability registry initialized: {:?}",
cap_registry.list_ids()
@@ -118,8 +122,6 @@ async fn main() -> Result<()> {
let coordinator = Arc::new(coordinator);
// Spawn one executor per built-in capability, each wired to the LLM router.
// The executor's channel sender is registered with the coordinator so that
// assign_task() dispatches directly in-process.
for cap_id in cap_registry.list_ids() {
spawn_capability_executor(
&cap_id,
@@ -130,8 +132,7 @@
);
}
// Spawn executors for any agents defined in agents.toml that are NOT
// already covered by a capability package (role not registered yet).
// Spawn executors for config agents not covered by a capability package.
for agent_def in &config.agents {
if registry.get_agents_by_role(&agent_def.role).is_empty() {
spawn_single_config_executor(agent_def, &registry, &coordinator, router.as_ref());
@@ -151,13 +152,38 @@ async fn main() -> Result<()> {
};
let state = AppState {
coordinator,
coordinator: Arc::clone(&coordinator),
budget_manager,
config_path: config_path.clone(),
router: router.clone(),
cap_registry: Arc::clone(&cap_registry),
};
// SIGHUP handler for config reload. Note: there is a brief unavailability
// window between drain and re-registration during which assign_task returns
// NoAvailableAgent. Learning profiles (keyed by stable_id) are preserved.
tokio::spawn({
let state = state.clone();
let registry = Arc::clone(&registry);
async move {
let mut sighup =
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup()) {
Ok(s) => s,
Err(e) => {
warn!("Failed to register SIGHUP handler: {}", e);
return;
}
};
while sighup.recv().await.is_some() {
handle_sighup_reload(&state, &registry).await;
}
}
});
let app = Router::new()
.route("/health", get(health_handler))
.route("/ready", get(readiness_handler))
.route("/reload", axum::routing::post(reload_handler))
.with_state(state);
let addr = std::env::var("BIND_ADDR").unwrap_or_else(|_| "0.0.0.0:9000".to_string());
@@ -169,6 +195,76 @@ async fn main() -> Result<()> {
Ok(())
}
/// Called from the SIGHUP loop. Loads a fresh config and delegates to
/// `reload_agents`. Errors are logged; the process keeps running.
async fn handle_sighup_reload(state: &AppState, registry: &Arc<AgentRegistry>) {
info!("SIGHUP received: reloading agent configuration");
let new_config = match AgentConfig::load(&state.config_path) {
Ok(c) => c,
Err(e) => {
error!("Config parse failed during reload: {}", e);
return;
}
};
match reload_agents(
&state.coordinator,
registry,
&new_config,
&state.cap_registry,
state.router.as_ref(),
)
.await
{
Ok(n) => info!("Reload complete: {} agents active", n),
Err(e) => error!("Reload failed: {}", e),
}
}
/// Drain all roles and re-spawn executors from config + capability registry.
///
/// Learning profiles (keyed by `stable_id`) are preserved in the coordinator;
/// new executor instances inherit the accumulated expertise immediately.
///
/// # Availability window
///
/// Between the drain phase and the first successful `register_agent`, callers
/// of `assign_task` for the affected roles receive `NoAvailableAgent`. This is
/// a brief window (microseconds to low milliseconds). Callers must handle this
/// error and retry. BudgetManager and LLMRouter are not reloaded; changes to
/// those require a process restart.
async fn reload_agents(
coordinator: &Arc<AgentCoordinator>,
registry: &Arc<AgentRegistry>,
new_config: &AgentConfig,
cap_registry: &Arc<CapabilityRegistry>,
router: Option<&Arc<LLMRouter>>,
) -> anyhow::Result<usize> {
// Drain all currently active roles. The dropped Sender causes each
// executor's recv loop to exit after draining in-flight messages.
for role in registry.list_roles() {
let drained = coordinator.drain_role(&role);
if !drained.is_empty() {
info!("Drained {} agent(s) for role '{}'", drained.len(), role);
}
}
// Re-spawn capability executors
for cap_id in cap_registry.list_ids() {
spawn_capability_executor(cap_id.as_str(), cap_registry, registry, coordinator, router);
}
// Re-spawn config agents not covered by capabilities
for agent_def in &new_config.agents {
if registry.get_agents_by_role(&agent_def.role).is_empty() {
spawn_single_config_executor(agent_def, registry, coordinator, router);
}
}
let total = registry.total_count();
info!("Reload complete: {} agents active", total);
Ok(total)
}
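During the availability window documented on `reload_agents`, a caller-side retry with a short backoff is enough to ride out the drain/respawn gap. A minimal sketch — the error and function names here are illustrative stand-ins, not the crate's actual API:

```rust
use std::time::Duration;

// Illustrative stand-in for the coordinator's "no agent registered yet"
// error during the drain/respawn window.
#[derive(Debug, PartialEq)]
enum AssignError {
    NoAvailableAgent,
}

// Retry `attempt` with exponential backoff while the only failure is
// the transient NoAvailableAgent; any other outcome returns immediately.
fn assign_with_retry<F>(mut attempt: F, max_retries: u32) -> Result<String, AssignError>
where
    F: FnMut() -> Result<String, AssignError>,
{
    let mut delay = Duration::from_millis(1);
    for _ in 0..max_retries {
        match attempt() {
            Err(AssignError::NoAvailableAgent) => {
                std::thread::sleep(delay);
                delay = delay.saturating_mul(2); // 1ms, 2ms, 4ms, ...
            }
            done => return done,
        }
    }
    attempt() // final try; propagate whatever it returns
}
```

Since the window is typically microseconds to low milliseconds, a handful of retries starting at 1 ms comfortably covers it.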
/// Activate a capability, register the resulting agent, and spawn its executor.
fn spawn_capability_executor(
cap_id: &str,
@ -381,3 +477,34 @@ async fn readiness_handler(State(state): State<AppState>) -> Json<serde_json::Va
"agents": state.coordinator.get_agent_count().await
}))
}
async fn reload_handler(State(state): State<AppState>) -> impl IntoResponse {
let new_config = match AgentConfig::load(&state.config_path) {
Ok(c) => c,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
};
let registry = state.coordinator.registry_arc();
match reload_agents(
&state.coordinator,
&registry,
&new_config,
&state.cap_registry,
state.router.as_ref(),
)
.await
{
Ok(n) => (StatusCode::OK, Json(json!({"reloaded": true, "agents": n}))).into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": e.to_string()})),
)
.into_response(),
}
}


@ -11,8 +11,8 @@ pub enum ConfigError {
#[error("Failed to read config file: {0}")]
ReadError(#[from] std::io::Error),
#[error("Failed to parse TOML: {0}")]
ParseError(#[from] toml::de::Error),
#[error("Failed to parse config: {0}")]
ParseJson(#[from] serde_json::Error),
#[error("Invalid configuration: {0}")]
ValidationError(String),
@ -52,10 +52,55 @@ fn default_agent_timeout() -> u64 {
pub use vapora_shared::AgentDefinition;
impl AgentConfig {
/// Load configuration from TOML file
/// Load configuration from a TOML or NCL file. When the path has a `.ncl`
/// extension, `nickel export --format json` is invoked and the resulting
/// JSON is parsed. Otherwise the file is read and parsed as TOML.
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
let path = path.as_ref();
let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") {
let out = std::process::Command::new("nickel")
.args(["export", "--format", "json"])
.arg(path)
.output()
.map_err(|e| {
ConfigError::ReadError(std::io::Error::other(format!(
"Failed to invoke nickel for {:?}: {}",
path, e
)))
})?;
if !out.status.success() {
let stderr = String::from_utf8_lossy(&out.stderr);
return Err(ConfigError::ReadError(std::io::Error::other(format!(
"nickel export failed for {:?}: {}",
path, stderr
))));
}
let json = String::from_utf8(out.stdout).map_err(|e| {
ConfigError::ReadError(std::io::Error::other(format!(
"nickel output is not valid UTF-8: {}",
e
)))
})?;
(json, true)
} else {
            let content = std::fs::read_to_string(path)?;
            // Parsing is deferred until after env-var interpolation below;
            // parsing the raw content here would reject files whose values
            // only become valid TOML once `${VAR}` references are expanded.
            (content, false)
};
let interpolated = interpolate_env_vars(&raw);
let config: Self = if is_json {
serde_json::from_str(&interpolated)?
} else {
toml::from_str(&interpolated).map_err(|e| {
ConfigError::ReadError(std::io::Error::other(format!(
"Failed to parse TOML: {}",
e
)))
})?
};
config.validate()?;
Ok(config)
}
@ -63,12 +108,11 @@ impl AgentConfig {
/// Load configuration from environment or default file
pub fn from_env() -> Result<Self, ConfigError> {
let config_path = std::env::var("VAPORA_AGENT_CONFIG")
.unwrap_or_else(|_| "/etc/vapora/agents.toml".to_string());
.unwrap_or_else(|_| "config/config.ncl".to_string());
if Path::new(&config_path).exists() {
Self::load(&config_path)
} else {
// Return default config if file doesn't exist
Ok(Self::default())
}
}
@ -129,6 +173,34 @@ impl Default for AgentConfig {
}
}
/// Expand every `${VAR}` / `${VAR:-default}` reference in `content`.
/// Unresolved vars without a default are replaced with an empty string.
fn interpolate_env_vars(content: &str) -> String {
let mut result = String::with_capacity(content.len());
let mut remaining = content;
while let Some(start) = remaining.find("${") {
result.push_str(&remaining[..start]);
let after_open = &remaining[start + 2..];
if let Some(close) = after_open.find('}') {
let var_expr = &after_open[..close];
let value = if let Some(sep) = var_expr.find(":-") {
let var_name = &var_expr[..sep];
let default_val = &var_expr[sep + 2..];
std::env::var(var_name).unwrap_or_else(|_| default_val.to_string())
} else {
std::env::var(var_expr).unwrap_or_default()
};
result.push_str(&value);
remaining = &after_open[close + 1..];
} else {
result.push_str("${");
remaining = after_open;
}
}
result.push_str(remaining);
result
}
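The expansion rules can be exercised directly, since the function above is pure std. A self-contained copy (same body, comments added); the `VAPORA_DEMO_*` variable names are assumed to be unset in the environment:

```rust
// Same expansion logic as `interpolate_env_vars` above (pure std).
fn interpolate_env_vars(content: &str) -> String {
    let mut result = String::with_capacity(content.len());
    let mut remaining = content;
    while let Some(start) = remaining.find("${") {
        result.push_str(&remaining[..start]);
        let after_open = &remaining[start + 2..];
        if let Some(close) = after_open.find('}') {
            let var_expr = &after_open[..close];
            let value = if let Some(sep) = var_expr.find(":-") {
                // ${VAR:-default}: env value, else the literal default.
                let var_name = &var_expr[..sep];
                let default_val = &var_expr[sep + 2..];
                std::env::var(var_name).unwrap_or_else(|_| default_val.to_string())
            } else {
                // ${VAR}: env value, else the empty string.
                std::env::var(var_expr).unwrap_or_default()
            };
            result.push_str(&value);
            remaining = &after_open[close + 1..];
        } else {
            // Unterminated "${": emit it literally and continue.
            result.push_str("${");
            remaining = after_open;
        }
    }
    result.push_str(remaining);
    result
}
```

Note that substituted values are not re-scanned: a value containing `${` passes through verbatim, since scanning resumes after the closing brace of the original reference.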
#[cfg(test)]
mod tests {
use super::*;


@ -1,5 +1,4 @@
// vapora-agents: Agent coordinator - orchestrates agent workflows
// Phase 2: Complete implementation with NATS integration
use std::collections::HashMap;
use std::path::PathBuf;
@ -238,7 +237,7 @@ impl AgentCoordinator {
// Simple heuristic: check if title/description contains known task types
let task_type = extract_task_type(&title, &description, role);
// Get learning profiles for all candidates
// Get learning profiles for all candidates, keyed by stable_id.
let learning_profiles = {
let profiles = self
.learning_profiles
@ -246,7 +245,10 @@ impl AgentCoordinator {
.unwrap_or_else(|e| e.into_inner());
candidates
.iter()
.map(|a| (a.id.clone(), profiles.get(&a.id).cloned()))
.map(|a| {
let key = a.stable_id_or_role();
(a.id.clone(), profiles.get(key).cloned())
})
.collect::<Vec<_>>()
};
@ -426,6 +428,27 @@ impl AgentCoordinator {
Arc::clone(&self.registry)
}
/// Shared reference to the registry (for hot-reload coordination).
pub fn registry_arc(&self) -> Arc<AgentRegistry> {
Arc::clone(&self.registry)
}
/// Drain all agents for `role`: removes them from the registry and drops
/// their in-process executor channels.
///
/// Learning profiles keyed by `stable_id` are intentionally preserved so
/// that re-spawned executors immediately benefit from past expertise.
///
/// The dropped `Sender` causes the executor loop to exit once it drains
/// any in-flight messages.
pub fn drain_role(&self, role: &str) -> Vec<String> {
let ids = self.registry.drain_role(role);
for id in &ids {
self.executor_channels.remove(id);
}
ids
}
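The drain semantics rely on a property shared by std and tokio mpsc channels: once every `Sender` is dropped, the receiver still yields all buffered messages before reporting closure. A minimal std-mpsc sketch of the executor loop's shutdown behavior (the real executors use tokio's mpsc, but the drain behavior is the same):

```rust
use std::sync::mpsc;

// Simplified stand-in for the executor recv loop: process tasks until
// the channel reports closure, returning everything processed.
fn run_executor(rx: mpsc::Receiver<u32>) -> Vec<u32> {
    let mut processed = Vec::new();
    // recv() keeps succeeding for buffered (in-flight) messages even
    // after the Sender is dropped; it errors only once the buffer is
    // empty AND all senders are gone.
    while let Ok(task) = rx.recv() {
        processed.push(task);
    }
    processed
}
```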
/// Start coordinator (subscribe to NATS topics)
pub async fn start(&self) -> Result<(), CoordinatorError> {
if self.nats_client.is_none() {
@ -535,17 +558,18 @@ impl AgentCoordinator {
);
for agent in agents {
let key = agent.stable_id_or_role().to_string();
match self
.load_learning_profile_from_kg(&agent.id, task_type, kg_persistence)
.load_learning_profile_from_kg(&key, task_type, kg_persistence)
.await
{
Ok(profile) => {
self.update_learning_profile(&agent.id, profile)?;
self.update_learning_profile(&key, profile)?;
}
Err(e) => {
warn!(
"Failed to load learning profile for agent {}: {}",
agent.id, e
"Failed to load learning profile for agent {} (stable_id: {}): {}",
agent.id, key, e
);
// Continue with other agents on failure
}
@ -743,6 +767,35 @@ mod tests {
assert!(task_id.is_ok());
}
#[test]
fn test_profile_survives_role_drain() {
let registry = Arc::new(AgentRegistry::new(5));
let agent = AgentMetadata::new(
"developer".to_string(),
"Dev 1".to_string(),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
registry.register_agent(agent).unwrap();
let coordinator = AgentCoordinator::with_registry(Arc::clone(&registry));
// Insert a profile under the stable_id key
let profile = crate::learning_profile::LearningProfile::new("developer".to_string());
coordinator
.update_learning_profile("developer", profile)
.unwrap();
// Drain the role (removes agents + channels)
let drained = coordinator.drain_role("developer");
assert_eq!(drained.len(), 1);
// Profile must survive the drain
let retained = coordinator.get_learning_profile("developer");
assert!(retained.is_some(), "profile must survive drain_role");
}
#[tokio::test]
async fn test_no_available_agent() {
// Set schema directory for tests (relative to workspace root)


@ -1,6 +1,4 @@
// Profile adapter: AgentMetadata + KG metrics → Swarm AgentProfile
// Phase 5.2: Bridges agent registry with swarm coordination
// Phase 5.3: Integrates per-task-type learning profiles from KG
use vapora_swarm::messages::AgentProfile;
@ -40,9 +38,12 @@ impl ProfileAdapter {
profile
}
/// Create learning profile from agent with task-type expertise.
/// Integrates per-task-type learning data from KG for intelligent
/// assignment.
/// Create a learning profile with the given ID.
///
/// Callers should pass `agent.stable_id_or_role()` (not `agent.id`) so
/// that the profile survives hot-reloads. The coordinator already enforces
/// this; use this function for constructing profiles that will be inserted
/// via `AgentCoordinator::update_learning_profile`.
pub fn create_learning_profile(agent_id: String) -> LearningProfile {
LearningProfile::new(agent_id)
}
@ -79,6 +80,7 @@ mod tests {
fn test_profile_creation_from_metadata() {
let agent = AgentMetadata {
id: "agent-1".to_string(),
stable_id: "developer".to_string(),
role: "developer".to_string(),
name: "Dev Agent 1".to_string(),
version: "0.1.0".to_string(),
@ -109,6 +111,7 @@ mod tests {
let agents = vec![
AgentMetadata {
id: "agent-1".to_string(),
stable_id: "developer".to_string(),
role: "developer".to_string(),
name: "Dev 1".to_string(),
version: "0.1.0".to_string(),
@ -126,6 +129,7 @@ mod tests {
},
AgentMetadata {
id: "agent-2".to_string(),
stable_id: "reviewer".to_string(),
role: "reviewer".to_string(),
name: "Reviewer 1".to_string(),
version: "0.1.0".to_string(),


@ -1,5 +1,4 @@
// vapora-agents: Agent registry - manages agent lifecycle and availability
// Phase 2: Complete implementation with 12 agent roles
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
@ -35,7 +34,12 @@ pub enum AgentStatus {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMetadata {
/// Ephemeral UUID generated at runtime — changes every startup.
pub id: String,
/// Deterministic identity keyed on role — survives hot-reloads.
/// Defaults to `role` for data produced before this field existed.
#[serde(default)]
pub stable_id: String,
pub role: String,
pub name: String,
pub version: String,
@ -66,6 +70,7 @@ impl AgentMetadata {
let now = Utc::now();
Self {
id: Uuid::new_v4().to_string(),
stable_id: role.clone(),
role,
name,
version: "0.1.0".to_string(),
@ -90,6 +95,18 @@ impl AgentMetadata {
self
}
/// Stable profile identity: `stable_id` when non-empty, `role` otherwise.
///
/// The fallback handles data deserialized from records produced before
/// `stable_id` was introduced.
pub fn stable_id_or_role(&self) -> &str {
if self.stable_id.is_empty() {
&self.role
} else {
&self.stable_id
}
}
/// Check if agent can accept new tasks
pub fn can_accept_task(&self) -> bool {
self.status == AgentStatus::Active && self.current_tasks < self.max_concurrent_tasks
@ -283,6 +300,40 @@ impl AgentRegistry {
let inner = self.inner.read().expect("Failed to acquire read lock");
inner.agents.len()
}
/// Remove all agents for `role` from the registry and return their
/// ephemeral IDs. The `running_count` entry for the role is also cleared.
///
/// Called during hot-reload to drain a role before re-spawning executors.
/// Learning profiles (keyed by `stable_id`) are unaffected.
pub fn drain_role(&self, role: &str) -> Vec<String> {
let mut inner = self.inner.write().expect("registry write lock");
let ids: Vec<String> = inner
.agents
.values()
.filter(|a| a.role == role)
.map(|a| a.id.clone())
.collect();
for id in &ids {
inner.agents.remove(id);
}
inner.running_count.remove(role);
ids
}
/// Return the set of roles currently present in the registry.
pub fn list_roles(&self) -> Vec<String> {
let inner = self.inner.read().expect("registry read lock");
let mut roles: Vec<String> = inner
.agents
.values()
.map(|a| a.role.clone())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect();
roles.sort();
roles
}
}
impl Default for AgentRegistry {
@ -363,6 +414,98 @@ mod tests {
assert_eq!(agent.total_tasks_completed, 1);
}
#[test]
fn test_stable_id_deterministic() {
let a = AgentMetadata::new(
"developer".to_string(),
"Dev 1".to_string(),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
let b = AgentMetadata::new(
"developer".to_string(),
"Dev 2".to_string(),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
assert_ne!(a.id, b.id, "ephemeral IDs must differ");
assert_eq!(a.stable_id, "developer");
assert_eq!(b.stable_id, "developer");
assert_eq!(a.stable_id_or_role(), "developer");
}
#[test]
fn test_stable_id_or_role_fallback() {
let mut agent = AgentMetadata::new(
"reviewer".to_string(),
"Rev".to_string(),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
agent.stable_id = String::new();
assert_eq!(agent.stable_id_or_role(), "reviewer");
}
#[test]
fn test_drain_role() {
let registry = AgentRegistry::new(5);
for i in 0..2 {
let agent = AgentMetadata::new(
"developer".to_string(),
format!("Dev {}", i),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
registry.register_agent(agent).unwrap();
}
let reviewer = AgentMetadata::new(
"reviewer".to_string(),
"Rev".to_string(),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
registry.register_agent(reviewer).unwrap();
let drained = registry.drain_role("developer");
assert_eq!(drained.len(), 2);
assert_eq!(registry.count_by_role("developer"), 0);
assert_eq!(registry.count_by_role("reviewer"), 1);
assert_eq!(registry.total_count(), 1);
}
#[test]
fn test_list_roles() {
let registry = AgentRegistry::new(5);
let a = AgentMetadata::new(
"developer".to_string(),
"Dev".to_string(),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
let b = AgentMetadata::new(
"reviewer".to_string(),
"Rev".to_string(),
"claude".to_string(),
"claude-sonnet-4".to_string(),
vec![],
);
registry.register_agent(a).unwrap();
registry.register_agent(b).unwrap();
let mut roles = registry.list_roles();
roles.sort();
assert_eq!(roles, vec!["developer", "reviewer"]);
}
#[test]
fn test_get_available_agent() {
let registry = AgentRegistry::new(5);


@ -64,10 +64,13 @@ impl AgentExecutor {
/// Run the executor loop, processing tasks until the channel closes.
pub async fn run(mut self) {
info!(
"AgentExecutor started for agent: {}",
self.agent.metadata.id
"AgentExecutor started for agent: {} (stable_id: {})",
self.agent.metadata.id,
self.agent.metadata.stable_id_or_role()
);
let agent_id = self.agent.metadata.id.clone();
// Use stable_id so KG records survive hot-reloads and are correlated
// across agent restarts for the same role.
let agent_id = self.agent.metadata.stable_id_or_role().to_string();
while let Some(task) = self.task_rx.recv().await {
debug!("Received task: {}", task.id);
@ -275,6 +278,7 @@ mod tests {
async fn test_executor_creation() {
let metadata = AgentMetadata {
id: "test-executor".to_string(),
stable_id: "developer".to_string(),
role: "developer".to_string(),
name: "Test Executor".to_string(),
version: "0.1.0".to_string(),
@ -303,6 +307,7 @@ mod tests {
fn test_executor_persistence_disabled_by_default() {
let metadata = AgentMetadata {
id: "test-no-persist".to_string(),
stable_id: "reviewer".to_string(),
role: "reviewer".to_string(),
name: "Test No Persist".to_string(),
version: "0.1.0".to_string(),


@ -150,6 +150,7 @@ mod tests {
// Create metadata for testing
let metadata = AgentMetadata {
id: "test-agent".to_string(),
stable_id: "developer".to_string(),
role: "developer".to_string(),
name: "Test Developer".to_string(),
version: "0.1.0".to_string(),
@ -201,6 +202,7 @@ mod tests {
fn test_failed_state_transition() {
let metadata = AgentMetadata {
id: "test-agent".to_string(),
stable_id: "developer".to_string(),
role: "developer".to_string(),
name: "Test Developer".to_string(),
version: "0.1.0".to_string(),


@ -37,9 +37,6 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
vec!["coding".to_string(), "documentation".to_string()],
);
let dev_a_id = developer_a.id.clone();
let dev_b_id = developer_b.id.clone();
registry.register_agent(developer_a).ok();
registry.register_agent(developer_b).ok();
@ -62,9 +59,10 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
let budget_manager = Arc::new(BudgetManager::new(budgets));
let coordinator = coordinator.with_budget_manager(budget_manager.clone());
// Simulate historical executions for developer_a (excellent at coding)
// Build a role-level learning profile keyed by stable_id ("developer").
// All developer agents share one profile since stable_id = role.
let now = Utc::now();
let dev_a_executions: Vec<ExecutionData> = (0..30)
let dev_executions: Vec<ExecutionData> = (0..30)
.map(|i| ExecutionData {
timestamp: now - Duration::days(i),
duration_ms: 200 + (i as u64 * 5),
@ -72,46 +70,22 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
})
.collect();
// Simulate historical executions for developer_b (mediocre at coding)
let dev_b_executions: Vec<ExecutionData> = (0..30)
.map(|i| ExecutionData {
timestamp: now - Duration::days(i),
duration_ms: 300 + (i as u64 * 10),
success: i < 20, // 67% success rate
})
.collect();
let dev_expertise = TaskTypeExpertise::from_executions(dev_executions, "coding");
// Calculate expertise from executions
let dev_a_expertise = TaskTypeExpertise::from_executions(dev_a_executions, "coding");
let dev_b_expertise = TaskTypeExpertise::from_executions(dev_b_executions, "coding");
assert!(dev_expertise.success_rate > 0.9);
// Verify expertise calculations
assert!(dev_a_expertise.success_rate > 0.9);
assert!(dev_b_expertise.success_rate > 0.6 && dev_b_expertise.success_rate < 0.7);
assert!(dev_a_expertise.success_rate > dev_b_expertise.success_rate);
let mut role_profile = ProfileAdapter::create_learning_profile("developer".to_string());
role_profile =
ProfileAdapter::add_task_type_expertise(role_profile, "coding".to_string(), dev_expertise);
// Create learning profiles
let mut profile_a = ProfileAdapter::create_learning_profile(dev_a_id.clone());
profile_a =
ProfileAdapter::add_task_type_expertise(profile_a, "coding".to_string(), dev_a_expertise);
let mut profile_b = ProfileAdapter::create_learning_profile(dev_b_id.clone());
profile_b =
ProfileAdapter::add_task_type_expertise(profile_b, "coding".to_string(), dev_b_expertise);
// Update coordinator with learning profiles
// Insert under stable_id = "developer" so assign_task can find it.
coordinator
.update_learning_profile(&dev_a_id, profile_a.clone())
.ok();
coordinator
.update_learning_profile(&dev_b_id, profile_b.clone())
.update_learning_profile("developer", role_profile)
.ok();
// Verify profiles are stored
let stored_a = coordinator.get_learning_profile(&dev_a_id);
let stored_b = coordinator.get_learning_profile(&dev_b_id);
assert!(stored_a.is_some());
assert!(stored_b.is_some());
// Verify profile is stored under stable_id key
let stored = coordinator.get_learning_profile("developer");
assert!(stored.is_some(), "Role-level profile must be stored");
// Check budget status before task assignment
let budget_status = budget_manager.check_budget("developer").await.unwrap();
@ -119,7 +93,7 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
assert!(!budget_status.near_threshold);
assert_eq!(budget_status.monthly_remaining_cents, 100000);
// Assign a coding task (should go to developer_a based on learning)
// Assign a coding task — learning profile found → profile-based routing.
let task_id = coordinator
.assign_task(
"developer",
@ -131,28 +105,15 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
.await
.expect("Should assign task");
// Verify task was assigned (we can check via registry)
// Verify one developer was assigned the task
let all_agents = coordinator.registry().list_all();
let dev_a_tasks = all_agents
.iter()
.find(|a| a.id == dev_a_id)
.map(|a| a.current_tasks)
.unwrap_or(0);
let total_assigned: u32 = all_agents.iter().map(|a| a.current_tasks).sum();
assert_eq!(total_assigned, 1, "Exactly one agent must have the task");
let _dev_b_tasks = all_agents
.iter()
.find(|a| a.id == dev_b_id)
.map(|a| a.current_tasks)
.unwrap_or(0);
// Developer A (high expertise) should be selected
assert!(
dev_a_tasks > 0,
"Developer A (high expertise) should have been assigned the task"
);
// Simulate task completion
coordinator.complete_task(&task_id, &dev_a_id).await.ok();
// Complete the task using the agent that received it
if let Some(assigned) = all_agents.iter().find(|a| a.current_tasks > 0) {
coordinator.complete_task(&task_id, &assigned.id).await.ok();
}
// Verify budget status is still within limits
let budget_status = budget_manager.check_budget("developer").await.unwrap();
@ -160,7 +121,7 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
// Simulate multiple tasks to test cumulative budget tracking
for i in 0..5 {
let task = coordinator
if let Ok(tid) = coordinator
.assign_task(
"developer",
format!("Task {}", i),
@ -168,15 +129,11 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
"Context".to_string(),
1,
)
.await;
if task.is_ok() {
let agents = coordinator.registry().list_all();
if let Some(dev_a) = agents.iter().find(|a| a.id == dev_a_id) {
coordinator
.complete_task(&format!("task-{}", i), &dev_a.id)
.await
.ok();
{
let agents = coordinator.registry().list_all();
if let Some(assigned) = agents.iter().find(|a| a.current_tasks > 0) {
coordinator.complete_task(&tid, &assigned.id).await.ok();
}
}
}
@ -188,9 +145,12 @@ async fn test_end_to_end_learning_with_budget_enforcement() {
"Should not exceed monthly budget"
);
// Verify learning profiles are still intact
// Verify role-level profile is intact
let all_profiles = coordinator.get_all_learning_profiles();
assert_eq!(all_profiles.len(), 2, "Both profiles should be stored");
assert!(
all_profiles.contains_key("developer"),
"Role-level profile must survive task processing"
);
}
/// Test that budget enforcement doesn't break learning-based selection
@ -214,9 +174,6 @@ async fn test_learning_selection_with_budget_constraints() {
vec!["coding".to_string()],
);
let expert_id = agent_expert.id.clone();
let novice_id = agent_novice.id.clone();
registry.register_agent(agent_expert).ok();
registry.register_agent(agent_novice).ok();
@ -238,9 +195,9 @@ async fn test_learning_selection_with_budget_constraints() {
let budget_manager = Arc::new(BudgetManager::new(budgets));
let coordinator = coordinator.with_budget_manager(budget_manager.clone());
// Create expertise profiles
// Build a role-level learning profile for "developer" (stable_id = role).
let now = Utc::now();
let expert_execs: Vec<ExecutionData> = (0..20)
let role_execs: Vec<ExecutionData> = (0..20)
.map(|i| ExecutionData {
timestamp: now - Duration::days(i),
duration_ms: 100,
@ -248,36 +205,14 @@ async fn test_learning_selection_with_budget_constraints() {
})
.collect();
let novice_execs: Vec<ExecutionData> = (0..20)
.map(|i| ExecutionData {
timestamp: now - Duration::days(i),
duration_ms: 100,
success: i < 12, // 60% success
})
.collect();
let role_expertise = TaskTypeExpertise::from_executions(role_execs, "coding");
let expert_expertise = TaskTypeExpertise::from_executions(expert_execs, "coding");
let novice_expertise = TaskTypeExpertise::from_executions(novice_execs, "coding");
let mut expert_profile = ProfileAdapter::create_learning_profile(expert_id.clone());
expert_profile = ProfileAdapter::add_task_type_expertise(
expert_profile,
"coding".to_string(),
expert_expertise,
);
let mut novice_profile = ProfileAdapter::create_learning_profile(novice_id.clone());
novice_profile = ProfileAdapter::add_task_type_expertise(
novice_profile,
"coding".to_string(),
novice_expertise,
);
let mut role_profile = ProfileAdapter::create_learning_profile("developer".to_string());
role_profile =
ProfileAdapter::add_task_type_expertise(role_profile, "coding".to_string(), role_expertise);
coordinator
.update_learning_profile(&expert_id, expert_profile)
.ok();
coordinator
.update_learning_profile(&novice_id, novice_profile)
.update_learning_profile("developer", role_profile)
.ok();
// Verify budget status
@ -287,11 +222,10 @@ async fn test_learning_selection_with_budget_constraints() {
"Initial budget should be healthy"
);
// Assign multiple tasks - expert should be consistently selected
let mut expert_count = 0;
#[allow(clippy::excessive_nesting)]
// Assign multiple tasks — profile-based scoring should pick some developer.
let mut assigned_count = 0;
for i in 0..3 {
if let Ok(_task_id) = coordinator
if let Ok(tid) = coordinator
.assign_task(
"developer",
format!("Coding Task {}", i),
@ -301,19 +235,17 @@ async fn test_learning_selection_with_budget_constraints() {
)
.await
{
assigned_count += 1;
let agents = coordinator.registry().list_all();
if let Some(expert) = agents.iter().find(|a| a.id == expert_id) {
if expert.current_tasks > 0 {
expert_count += 1;
}
if let Some(assigned) = agents.iter().find(|a| a.current_tasks > 0) {
coordinator.complete_task(&tid, &assigned.id).await.ok();
}
}
}
// Expert should have been selected more often
assert!(
expert_count > 0,
"Expert should have been selected despite budget constraints"
assigned_count > 0,
"Tasks should be assigned despite budget constraints"
);
}
@ -330,7 +262,6 @@ async fn test_learning_profile_improvement_with_budget_tracking() {
vec!["coding".to_string()],
);
let agent_id = agent.id.clone();
registry.register_agent(agent).ok();
let coordinator = AgentCoordinator::with_registry(registry);
@ -351,7 +282,7 @@ async fn test_learning_profile_improvement_with_budget_tracking() {
let budget_manager = Arc::new(BudgetManager::new(budgets));
let coordinator = coordinator.with_budget_manager(budget_manager.clone());
// Initial profile: mediocre performance
// Initial profile: mediocre performance — keyed by stable_id = "developer".
let now = Utc::now();
let initial_execs: Vec<ExecutionData> = (0..10)
.map(|i| ExecutionData {
@ -364,7 +295,7 @@ async fn test_learning_profile_improvement_with_budget_tracking() {
let mut initial_expertise = TaskTypeExpertise::from_executions(initial_execs, "coding");
assert!((initial_expertise.success_rate - 0.5).abs() < 0.01);
let mut profile = ProfileAdapter::create_learning_profile(agent_id.clone());
let mut profile = ProfileAdapter::create_learning_profile("developer".to_string());
profile = ProfileAdapter::add_task_type_expertise(
profile,
"coding".to_string(),
@ -372,17 +303,17 @@ async fn test_learning_profile_improvement_with_budget_tracking() {
);
coordinator
.update_learning_profile(&agent_id, profile.clone())
.update_learning_profile("developer", profile)
.ok();
// Check initial profile
let stored_profile = coordinator.get_learning_profile(&agent_id).unwrap();
let stored_profile = coordinator.get_learning_profile("developer").unwrap();
assert_eq!(
stored_profile.get_task_type_score("coding"),
initial_expertise.success_rate
);
// Simulate improvement: add successful recent executions
// Simulate improvement: add successful recent execution
let new_exec = ExecutionData {
timestamp: now,
duration_ms: 120,
@ -396,7 +327,7 @@ async fn test_learning_profile_improvement_with_budget_tracking() {
);
// Update profile with improved expertise
let mut updated_profile = ProfileAdapter::create_learning_profile(agent_id.clone());
let mut updated_profile = ProfileAdapter::create_learning_profile("developer".to_string());
updated_profile = ProfileAdapter::add_task_type_expertise(
updated_profile,
"coding".to_string(),
@ -404,11 +335,11 @@ async fn test_learning_profile_improvement_with_budget_tracking() {
);
coordinator
.update_learning_profile(&agent_id, updated_profile)
.update_learning_profile("developer", updated_profile)
.ok();
// Verify improvement is reflected
let final_profile = coordinator.get_learning_profile(&agent_id).unwrap();
let final_profile = coordinator.get_learning_profile("developer").unwrap();
let final_score = final_profile.get_task_type_score("coding");
assert!(final_score > 0.5, "Final score should reflect improvement");


@ -320,9 +320,10 @@ async fn test_coordinator_assignment_with_learning_scores() {
// Create coordinator
let coordinator = AgentCoordinator::with_registry(registry);
// Create learning profiles: Agent A excels at coding, Agent B is mediocre
// Build a role-level learning profile for "developer" (stable_id = role).
// Both agents share this profile since they share a role.
let now = Utc::now();
let agent_a_executions: Vec<ExecutionData> = (0..20)
let executions: Vec<ExecutionData> = (0..20)
.map(|i| ExecutionData {
timestamp: now - Duration::days(i),
duration_ms: 100,
@ -330,40 +331,19 @@ async fn test_coordinator_assignment_with_learning_scores() {
})
.collect();
let agent_b_executions: Vec<ExecutionData> = (0..20)
.map(|i| ExecutionData {
timestamp: now - Duration::days(i),
duration_ms: 100,
success: i < 14, // 70% success rate
})
.collect();
let expertise = TaskTypeExpertise::from_executions(executions, "coding");
let agent_a_expertise = TaskTypeExpertise::from_executions(agent_a_executions, "coding");
let agent_b_expertise = TaskTypeExpertise::from_executions(agent_b_executions, "coding");
// Profiles are keyed by stable_id ("developer") so they survive hot-reloads.
let mut role_profile = ProfileAdapter::create_learning_profile("developer".to_string());
role_profile =
ProfileAdapter::add_task_type_expertise(role_profile, "coding".to_string(), expertise);
let mut agent_a_profile = ProfileAdapter::create_learning_profile(agent_a_id.clone());
agent_a_profile = ProfileAdapter::add_task_type_expertise(
agent_a_profile,
"coding".to_string(),
agent_a_expertise,
);
let mut agent_b_profile = ProfileAdapter::create_learning_profile(agent_b_id.clone());
agent_b_profile = ProfileAdapter::add_task_type_expertise(
agent_b_profile,
"coding".to_string(),
agent_b_expertise,
);
// Update coordinator with learning profiles
coordinator
.update_learning_profile(&agent_a_id, agent_a_profile)
.ok();
coordinator
.update_learning_profile(&agent_b_id, agent_b_profile)
.update_learning_profile("developer", role_profile)
.ok();
// Assign a coding task
// Assign a coding task — profile-based scoring will be used since a
// "developer" profile exists.
let _task_id = coordinator
.assign_task(
"developer",
@ -375,37 +355,31 @@ async fn test_coordinator_assignment_with_learning_scores() {
.await
.expect("Should assign task");
// Get the registry to verify which agent was selected
// Verify one developer was assigned the task
let registry = coordinator.registry();
let agent_a_tasks = registry
let total_tasks: u32 = registry
.list_all()
.iter()
.find(|a| a.id == agent_a_id)
.filter(|a| a.role == "developer")
.map(|a| a.current_tasks)
.unwrap_or(0);
.sum();
let agent_b_tasks = registry
.list_all()
.iter()
.find(|a| a.id == agent_b_id)
.map(|a| a.current_tasks)
.unwrap_or(0);
assert_eq!(total_tasks, 1, "Exactly one developer should have the task");
// Agent A (higher expertise in coding) should have been selected
assert!(
agent_a_tasks > 0,
"Agent A (coding specialist) should have 1+ tasks"
);
assert_eq!(agent_b_tasks, 0, "Agent B (generalist) should have 0 tasks");
// Verify learning profiles are stored
// Verify the profile is stored under the stable_id key
let stored_profiles = coordinator.get_all_learning_profiles();
assert!(
stored_profiles.contains_key(&agent_a_id),
"Agent A profile should be stored"
stored_profiles.contains_key("developer"),
"Role-level 'developer' profile must be stored"
);
// The per-instance IDs should NOT be the profile keys after the refactor
assert!(
!stored_profiles.contains_key(&agent_a_id),
"Ephemeral agent IDs must not be profile keys"
);
assert!(
stored_profiles.contains_key(&agent_b_id),
"Agent B profile should be stored"
!stored_profiles.contains_key(&agent_b_id),
"Ephemeral agent IDs must not be profile keys"
);
}


@ -104,23 +104,52 @@ pub struct MetricsConfig {
}
impl Config {
/// Load configuration from a TOML file with environment variable
/// interpolation
/// Load configuration from a TOML or NCL file with environment variable
/// interpolation. When the path has a `.ncl` extension, `nickel export
/// --format json` is invoked and the resulting JSON is parsed. Otherwise
/// the file is read and parsed as TOML (legacy / test compatibility).
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
// Read file content
let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") {
let out = std::process::Command::new("nickel")
.args(["export", "--format", "json"])
.arg(path)
.output()
.map_err(|e| {
VaporaError::ConfigError(format!(
"Failed to invoke nickel for {:?}: {}",
path, e
))
})?;
if !out.status.success() {
let stderr = String::from_utf8_lossy(&out.stderr);
return Err(VaporaError::ConfigError(format!(
"nickel export failed for {:?}: {}",
path, stderr
)));
}
let json = String::from_utf8(out.stdout).map_err(|e| {
VaporaError::ConfigError(format!("nickel output is not valid UTF-8: {}", e))
})?;
(json, true)
} else {
let content = fs::read_to_string(path).map_err(|e| {
VaporaError::ConfigError(format!("Failed to read config file {:?}: {}", path, e))
})?;
(content, false)
};
        // Interpolate environment variables
        let interpolated = Self::interpolate_env_vars(&raw)?;
        // Parse JSON (from nickel export) or legacy TOML
        let config: Config = if is_json {
            serde_json::from_str(&interpolated).map_err(|e| {
                VaporaError::ConfigError(format!("Failed to parse config JSON: {}", e))
            })?
        } else {
            toml::from_str(&interpolated)?
        };
// Validate configuration
config.validate()?;
Ok(config)


@ -48,7 +48,7 @@ struct Args {
#[arg(
short,
long,
default_value = "config/vapora.toml",
default_value = "config/config.ncl",
env = "VAPORA_CONFIG"
)]
config: String,


@ -7,14 +7,16 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::RwLock;
use crate::config::interpolate_env_vars;
/// Budget configuration errors
#[derive(Debug, Error)]
pub enum BudgetConfigError {
#[error("Failed to read budget config file: {0}")]
ReadError(#[from] std::io::Error),
    #[error("Failed to parse config: {0}")]
    ParseJson(#[from] serde_json::Error),
#[error("Invalid budget configuration: {0}")]
ValidationError(String),
@ -84,24 +86,65 @@ pub struct BudgetConfig {
}
impl BudgetConfig {
    /// Load budget configuration from a TOML or NCL file. When the path has a
    /// `.ncl` extension, `nickel export --format json` is invoked and the
    /// resulting JSON is parsed. Otherwise the file is read and parsed as TOML.
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, BudgetConfigError> {
let path = path.as_ref();
let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") {
let out = std::process::Command::new("nickel")
.args(["export", "--format", "json"])
.arg(path)
.output()
.map_err(|e| {
BudgetConfigError::ReadError(std::io::Error::other(format!(
"Failed to invoke nickel for {:?}: {}",
path, e
)))
})?;
if !out.status.success() {
let stderr = String::from_utf8_lossy(&out.stderr);
return Err(BudgetConfigError::ReadError(std::io::Error::other(
format!("nickel export failed for {:?}: {}", path, stderr),
)));
}
let json = String::from_utf8(out.stdout).map_err(|e| {
BudgetConfigError::ReadError(std::io::Error::other(format!(
"nickel output is not valid UTF-8: {}",
e
)))
})?;
(json, true)
        } else {
            let content = std::fs::read_to_string(path)?;
            (content, false)
        };
let interpolated = interpolate_env_vars(&raw);
let config: Self = if is_json {
serde_json::from_str(&interpolated)?
} else {
toml::from_str(&interpolated).map_err(|e| {
BudgetConfigError::ReadError(std::io::Error::other(format!(
"Failed to parse TOML: {}",
e
)))
})?
};
config.validate()?;
Ok(config)
}
    /// Load with default fallback if file doesn't exist
pub fn load_or_default<P: AsRef<Path>>(path: P) -> Result<Self, BudgetConfigError> {
match Self::load(&path) {
Ok(config) => Ok(config),
            // File doesn't exist (or nickel invocation failed): use defaults
            Err(BudgetConfigError::ReadError(_)) => Ok(BudgetConfig {
                budgets: HashMap::new(),
            }),
Err(e) => Err(e),
}
}


@ -12,8 +12,8 @@ pub enum ConfigError {
#[error("Failed to read config file: {0}")]
ReadError(#[from] std::io::Error),
    #[error("Failed to parse config: {0}")]
    ParseJson(#[from] serde_json::Error),
#[error("Invalid configuration: {0}")]
ValidationError(String),
@ -74,21 +74,66 @@ pub struct RoutingRule {
}
impl LLMRouterConfig {
    /// Load configuration from a TOML or NCL file. When the path has a `.ncl`
    /// extension, `nickel export --format json` is invoked and the resulting
    /// JSON is parsed with full `${VAR}` interpolation applied pre-parse.
    /// Otherwise the file is read and parsed as TOML (legacy compatibility).
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
let path = path.as_ref();
let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") {
let out = std::process::Command::new("nickel")
.args(["export", "--format", "json"])
.arg(path)
.output()
.map_err(|e| {
ConfigError::ReadError(std::io::Error::other(format!(
"Failed to invoke nickel for {:?}: {}",
path, e
)))
})?;
if !out.status.success() {
let stderr = String::from_utf8_lossy(&out.stderr);
return Err(ConfigError::ReadError(std::io::Error::other(format!(
"nickel export failed for {:?}: {}",
path, stderr
))));
}
let json = String::from_utf8(out.stdout).map_err(|e| {
ConfigError::ReadError(std::io::Error::other(format!(
"nickel output is not valid UTF-8: {}",
e
)))
})?;
(json, true)
        } else {
            let content = std::fs::read_to_string(path)?;
            (content, false)
        };
let interpolated = interpolate_env_vars(&raw);
let config: Self = if is_json {
serde_json::from_str(&interpolated)?
} else {
let mut c: Self = toml::from_str(&interpolated).map_err(|e| {
ConfigError::ReadError(std::io::Error::other(format!(
"Failed to parse TOML: {}",
e
)))
})?;
// Legacy TOML path: expand env vars in specific fields
c.expand_env_vars();
c
};
config.validate()?;
Ok(config)
}
    /// Expand environment variables in API key and URL fields (TOML path only).
fn expand_env_vars(&mut self) {
        for provider in self.providers.values_mut() {
if let Some(ref api_key) = provider.api_key {
provider.api_key = Some(expand_env_var(api_key));
}
@ -136,7 +181,36 @@ impl LLMRouterConfig {
}
}
/// Expand every `${VAR}` / `${VAR:-default}` reference in `content`.
/// Unresolved vars without a default are replaced with an empty string.
pub(crate) fn interpolate_env_vars(content: &str) -> String {
let mut result = String::with_capacity(content.len());
let mut remaining = content;
while let Some(start) = remaining.find("${") {
result.push_str(&remaining[..start]);
let after_open = &remaining[start + 2..];
if let Some(close) = after_open.find('}') {
let var_expr = &after_open[..close];
let value = if let Some(sep) = var_expr.find(":-") {
let var_name = &var_expr[..sep];
let default_val = &var_expr[sep + 2..];
std::env::var(var_name).unwrap_or_else(|_| default_val.to_string())
} else {
std::env::var(var_expr).unwrap_or_default()
};
result.push_str(&value);
remaining = &after_open[close + 1..];
} else {
result.push_str("${");
remaining = after_open;
}
}
result.push_str(remaining);
result
}
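The scanner above can be exercised in isolation. The sketch below repeats the function body verbatim so the snippet compiles on its own; the `DEMO_*` environment variable names are made up for the demonstration.

```rust
// Self-contained check of the ${VAR} / ${VAR:-default} scanner.
// The function body is copied verbatim from interpolate_env_vars above.
fn interpolate_env_vars(content: &str) -> String {
    let mut result = String::with_capacity(content.len());
    let mut remaining = content;
    while let Some(start) = remaining.find("${") {
        result.push_str(&remaining[..start]);
        let after_open = &remaining[start + 2..];
        if let Some(close) = after_open.find('}') {
            let var_expr = &after_open[..close];
            let value = if let Some(sep) = var_expr.find(":-") {
                let var_name = &var_expr[..sep];
                let default_val = &var_expr[sep + 2..];
                std::env::var(var_name).unwrap_or_else(|_| default_val.to_string())
            } else {
                std::env::var(var_expr).unwrap_or_default()
            };
            result.push_str(&value);
            remaining = &after_open[close + 1..];
        } else {
            result.push_str("${");
            remaining = after_open;
        }
    }
    result.push_str(remaining);
    result
}

fn main() {
    std::env::set_var("DEMO_HOST", "10.0.0.5");
    std::env::remove_var("DEMO_MISSING");
    // Plain ${VAR} expands from the environment.
    assert_eq!(interpolate_env_vars("host = ${DEMO_HOST}"), "host = 10.0.0.5");
    // ${VAR:-default} falls back when the variable is unset.
    assert_eq!(interpolate_env_vars("port = ${DEMO_MISSING:-8080}"), "port = 8080");
    // Unset without a default becomes an empty string.
    assert_eq!(interpolate_env_vars("key = '${DEMO_MISSING}'"), "key = ''");
    // An unterminated ${ is passed through literally.
    assert_eq!(interpolate_env_vars("broken ${UNCLOSED"), "broken ${UNCLOSED");
    println!("ok");
}
```

Note the unterminated-`${` branch: rather than erroring, the scanner emits the literal `${` and continues, which keeps malformed config files parseable.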
/// Expand environment variables in format ${VAR} or ${VAR:-default} (single
/// token).
fn expand_env_var(input: &str) -> String {
if !input.starts_with("${") || !input.ends_with('}') {
return input.to_string();


@ -101,11 +101,53 @@ pub struct StageConfig {
impl WorkflowsConfig {
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") {
let out = std::process::Command::new("nickel")
.args(["export", "--format", "json"])
.arg(path)
.output()
.map_err(|e| {
ConfigError::IoError(std::io::Error::other(format!(
"Failed to invoke nickel for {:?}: {}",
path, e
)))
})?;
if !out.status.success() {
let stderr = String::from_utf8_lossy(&out.stderr);
return Err(ConfigError::IoError(std::io::Error::other(format!(
"nickel export failed for {:?}: {}",
path, stderr
)))
.into());
}
let json = String::from_utf8(out.stdout).map_err(|e| {
ConfigError::IoError(std::io::Error::other(format!(
"nickel output is not valid UTF-8: {}",
e
)))
})?;
(json, true)
        } else {
            let content = std::fs::read_to_string(path).map_err(ConfigError::IoError)?;
            (content, false)
        };
let interpolated = interpolate_env_vars(&raw);
let config: WorkflowsConfig = if is_json {
serde_json::from_str(&interpolated).map_err(ConfigError::ParseJson)?
} else {
toml::from_str(&interpolated).map_err(|e| {
ConfigError::IoError(std::io::Error::other(format!(
"Failed to parse TOML: {}",
e
)))
})?
};
config.validate()?;
Ok(config)
}
@ -149,6 +191,34 @@ impl WorkflowsConfig {
}
}
/// Expand every `${VAR}` / `${VAR:-default}` reference in `content`.
/// Unresolved vars without a default are replaced with an empty string.
fn interpolate_env_vars(content: &str) -> String {
let mut result = String::with_capacity(content.len());
let mut remaining = content;
while let Some(start) = remaining.find("${") {
result.push_str(&remaining[..start]);
let after_open = &remaining[start + 2..];
if let Some(close) = after_open.find('}') {
let var_expr = &after_open[..close];
let value = if let Some(sep) = var_expr.find(":-") {
let var_name = &var_expr[..sep];
let default_val = &var_expr[sep + 2..];
std::env::var(var_name).unwrap_or_else(|_| default_val.to_string())
} else {
std::env::var(var_expr).unwrap_or_default()
};
result.push_str(&value);
remaining = &after_open[close + 1..];
} else {
result.push_str("${");
remaining = after_open;
}
}
result.push_str(remaining);
result
}
fn validate_schedule_config(
workflow_name: &str,
schedule: &Option<ScheduleConfig>,


@ -68,8 +68,8 @@ pub enum ConfigError {
#[error("Failed to read config file: {0}")]
IoError(#[from] std::io::Error),
    #[error("Failed to parse config: {0}")]
    ParseJson(#[from] serde_json::Error),
#[error("Invalid configuration: {0}")]
Invalid(String),


@ -0,0 +1,222 @@
# ADR-0040: Agent Hot-Reload — Stable Identity and Zero-Downtime Config Reload
**Status**: Implemented
**Date**: 2026-03-02
**Deciders**: VAPORA Team
**Technical Story**: `AgentMetadata::id` was a `Uuid::new_v4()` generated at startup. `learning_profiles` in `AgentCoordinator` and execution records in `KGPersistence` used this UUID as the key. Every process restart or SIGHUP reload rotated all UUIDs, orphaning accumulated expertise profiles and resetting the learning system to zero.
---
## Decision
Introduce `stable_id: String` on `AgentMetadata`, computed as `role.clone()` at construction time. Switch all learning profile keys and KG execution records from the ephemeral `id` (UUID) to `stable_id`. Add hot-reload mechanics — SIGHUP handler and `POST /reload` endpoint — that drain and re-spawn executors while leaving `learning_profiles` untouched.
---
## Context
### The Identity Problem
Before this change, every agent had two implicit identities that were conflated into one field:
| Identity | Purpose | Lifecycle |
|----------|---------|-----------|
| Instance ID (`id`) | Sender handle in `executor_channels`, registry key | Ephemeral — dies with the process or on reload |
| Profile ID | Key for `learning_profiles` and KG records | Must survive restarts to preserve learning |
Using `Uuid::new_v4()` for both meant any reload (SIGHUP, restart, crash recovery) threw away all accumulated expertise. An agent that had processed 500 coding tasks and learned optimal patterns would start from zero on the next deploy.
### Why `role` as stable_id
VAPORA's architecture already partitions learning at the role level: `AgentScoringService::rank_agents` accepts `Vec<(agent_id, Option<LearningProfile>)>` where multiple agents of the same role compete for a task. The profile that matters for selection is role-level expertise (how well the "developer" role handles "coding" tasks), not per-instance expertise. Using `role` as the stable key:
- Is deterministic across restarts
- Aggregates learning across all instances of the same role
- Requires no additional persistence (no UUID→role mapping table)
- Degrades gracefully: legacy-deserialized records with empty `stable_id` fall back to `role` via `stable_id_or_role()`
---
## Implementation
### `AgentMetadata` (registry.rs)
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMetadata {
pub id: String, // Uuid::new_v4() — ephemeral, per-instance
#[serde(default)]
pub stable_id: String, // role.clone() — persistent across restarts
pub role: String,
// ...
}
impl AgentMetadata {
pub fn new(role: String, ...) -> Self {
Self {
id: Uuid::new_v4().to_string(),
stable_id: role.clone(), // set before role is moved
role,
// ...
}
}
pub fn stable_id_or_role(&self) -> &str {
if self.stable_id.is_empty() { &self.role } else { &self.stable_id }
}
}
```
### `AgentRegistry::drain_role` (registry.rs)
Removes all agents for a role from the `agents` map and clears `running_count`. This allows immediate re-registration after drain without hitting `MaxAgentsReached`.
```rust
pub fn drain_role(&self, role: &str) -> Vec<String> {
let mut inner = self.inner.write().expect("registry write lock");
let ids: Vec<String> = inner.agents.values()
.filter(|a| a.role == role)
.map(|a| a.id.clone())
.collect();
for id in &ids { inner.agents.remove(id); }
inner.running_count.remove(role);
ids
}
```
### `AgentCoordinator::drain_role` (coordinator.rs)
Delegates to `registry.drain_role`, then removes the corresponding `Sender` entries from `executor_channels`. Dropping the `Sender` closes the mpsc channel; the executor's `while let Some(task) = rx.recv().await` loop exits after draining any buffered messages — no explicit shutdown signal required.
```rust
pub fn drain_role(&self, role: &str) -> Vec<String> {
let ids = self.registry.drain_role(role);
for id in &ids {
self.executor_channels.remove(id);
}
ids
}
```
`learning_profiles` is keyed by `stable_id` (= role) and is **not** touched during drain. New executor instances spawned after reload inherit accumulated expertise immediately.
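The invariant can be sketched with stub types (the real `AgentCoordinator` holds channels, locks, and richer profile data): draining a role removes instances, but the role-keyed profile map is never touched.

```rust
use std::collections::HashMap;

// Stub coordinator: ephemeral instance IDs on one axis, role-keyed
// learning profiles on the other. Field names are illustrative only.
struct Coordinator {
    agents: HashMap<String, String>,         // ephemeral id -> role
    learning_profiles: HashMap<String, u32>, // stable_id (= role) -> tasks learned
}

impl Coordinator {
    fn drain_role(&mut self, role: &str) -> Vec<String> {
        let ids: Vec<String> = self
            .agents
            .iter()
            .filter(|(_, r)| r.as_str() == role)
            .map(|(id, _)| id.clone())
            .collect();
        for id in &ids {
            // In the real code this also drops the executor's mpsc Sender.
            self.agents.remove(id);
        }
        ids
    }
}

fn main() {
    let mut c = Coordinator {
        agents: HashMap::from([("uuid-1".to_string(), "developer".to_string())]),
        learning_profiles: HashMap::from([("developer".to_string(), 500u32)]),
    };
    let drained = c.drain_role("developer");
    assert_eq!(drained, vec!["uuid-1".to_string()]);
    assert!(c.agents.is_empty());
    // Expertise survives the drain; respawned executors inherit it.
    assert_eq!(c.learning_profiles.get("developer"), Some(&500));
    println!("profiles preserved");
}
```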
### Profile lookup (coordinator.rs)
```rust
// assign_task — before:
.map(|a| (a.id.clone(), profiles.get(&a.id).cloned()))
// assign_task — after:
.map(|a| {
let key = a.stable_id_or_role();
(a.id.clone(), profiles.get(key).cloned())
})
```
### Hot-reload entry points (server.rs)
Two entry points invoke the same `reload_agents` function:
```rust
// SIGHUP
while sighup.recv().await.is_some() {
handle_sighup_reload(&state, &registry).await;
}
// REST
.route("/reload", axum::routing::post(reload_handler))
```
`reload_agents` sequence:
1. `registry.list_roles()` → drain each role via `coordinator.drain_role`
2. Re-spawn capability executors from `CapabilityRegistry`
3. Re-spawn config agents not covered by capabilities
4. Return `registry.total_count()`
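The four steps above can be compressed into a sketch with stub types; the real `reload_agents` also rebuilds capability executors from `CapabilityRegistry` and re-reads the config file, which is elided here.

```rust
use std::collections::HashMap;

// Minimal stand-in for AgentRegistry (illustrative names only).
#[derive(Default)]
struct Registry {
    agents: HashMap<String, String>, // ephemeral id -> role
}

impl Registry {
    fn list_roles(&self) -> Vec<String> {
        let mut roles: Vec<String> = self.agents.values().cloned().collect();
        roles.sort();
        roles.dedup();
        roles
    }
    fn drain_role(&mut self, role: &str) {
        self.agents.retain(|_, r| r.as_str() != role);
    }
    fn register_agent(&mut self, id: &str, role: &str) {
        self.agents.insert(id.to_string(), role.to_string());
    }
    fn total_count(&self) -> usize {
        self.agents.len()
    }
}

// Steps 1-4: drain every role, respawn from the new config, report the count.
fn reload_agents(reg: &mut Registry, new_config: &[(&str, &str)]) -> usize {
    for role in reg.list_roles() {
        reg.drain_role(&role);
    }
    for (id, role) in new_config {
        reg.register_agent(id, role);
    }
    reg.total_count()
}

fn main() {
    let mut reg = Registry::default();
    reg.register_agent("uuid-old", "developer");
    let n = reload_agents(&mut reg, &[("uuid-new", "developer"), ("uuid-rev", "reviewer")]);
    assert_eq!(n, 2);
    assert!(!reg.agents.contains_key("uuid-old"));
    println!("reloaded {n} agents");
}
```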
---
## Availability Window
`reload_agents` drains all roles before re-spawning. During the window between the last drain and the first successful `register_agent`, `assign_task` for those roles returns `CoordinatorError::NoAvailableAgent`. This window is typically sub-millisecond on the same thread, but callers must handle this error and retry.
This is a deliberate trade-off: atomic swap-in of new executors would require a blue-green registry pattern, adding significant complexity for a latency window that is orders of magnitude shorter than any typical LLM call (which takes 500ms–30s).
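A caller-side retry can be sketched as follows, with stub types (the real `assign_task` is async and takes a task struct; the error variant name matches the text above, the rest is hypothetical):

```rust
use std::{thread, time::Duration};

#[derive(Debug, PartialEq)]
enum CoordinatorError {
    NoAvailableAgent,
}

// Stub that fails twice, simulating a call landing inside the reload window.
fn assign_task(calls: &mut u32) -> Result<&'static str, CoordinatorError> {
    *calls += 1;
    if *calls < 3 {
        Err(CoordinatorError::NoAvailableAgent)
    } else {
        Ok("developer-instance")
    }
}

fn assign_with_retry(calls: &mut u32) -> Result<&'static str, CoordinatorError> {
    for attempt in 0..5u32 {
        match assign_task(calls) {
            Err(CoordinatorError::NoAvailableAgent) => {
                // The gap is typically sub-millisecond; a short backoff suffices.
                thread::sleep(Duration::from_millis(1u64 << attempt));
            }
            other => return other,
        }
    }
    Err(CoordinatorError::NoAvailableAgent)
}

fn main() {
    let mut calls = 0;
    assert_eq!(assign_with_retry(&mut calls), Ok("developer-instance"));
    assert_eq!(calls, 3);
    println!("assigned after {calls} attempts");
}
```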
---
## Out of Scope
- **BudgetManager reload**: budget limit changes require process restart. The `BudgetManager` is constructed once from config in `main()` and stored in `AppState`. Adding reload support requires either a `RwLock<BudgetConfig>` wrapper or rebuilding the manager and swapping it in `AppState` under a lock.
- **LLMRouter reload**: provider API key changes require restart for the same reason.
---
## Alternatives Considered
### UUID + external persistence of UUID→role mapping
Would preserve per-instance identity. Rejected: adds a SurrealDB table (UUID→role) that must be kept in sync across restarts, adds a lookup on every `assign_task`, and provides no additional value since role-level profiles already capture collective expertise.
### Blue-green registry swap
Two `AgentRegistry` instances: old one drains while new one accepts assignments. Rejected: requires `AgentCoordinator` to hold `Arc<RwLock<Arc<AgentRegistry>>>` and all call sites to acquire the inner lock on every call. Complexity disproportionate to the gain (sub-millisecond → zero gap).
### Versioned stable_id (e.g., `developer-v2`)
For breaking role renames. Rejected: out of scope; role renames already require explicit operator action.
---
## Trade-offs
**Pros**:
- Learning profiles survive indefinitely across restarts and hot-reloads
- SIGHUP and `POST /reload` provide two operator-friendly reload paths
- `stable_id_or_role()` fallback ensures backward compatibility with persisted data that predates this change
- `drain_role` cleans up cleanly: no stale executor channels, no MaxAgentsReached on re-register
**Cons**:
- All agents of the same role share one learning profile. Per-instance specialization (e.g., "this specific GPU node is faster at inference") is not representable. Acceptable: VAPORA's role model deliberately treats same-role agents as interchangeable for task routing purposes.
- Brief `NoAvailableAgent` window during reload (see Availability Window above).
- BudgetManager and LLMRouter not reloadable without restart.
---
## Verification
```bash
cargo test -p vapora-agents test_stable_id_deterministic
cargo test -p vapora-agents test_drain_role
cargo test -p vapora-agents test_profile_survives_role_drain
cargo test -p vapora-agents test_list_roles
# Hot-reload via signal
kill -HUP $(pgrep vapora-agents)
# Hot-reload via REST
curl -s -X POST http://localhost:9000/reload | jq .
# Expected: {"reloaded": true, "agents": N}
cargo clippy -p vapora-agents -- -D warnings
```
---
## Consequences
- `AgentMetadata` gains a new field `stable_id` with `#[serde(default)]`. Existing serialized records deserialize cleanly; `stable_id_or_role()` falls back to `role`.
- KG execution records (the `agent_id` field in SurrealDB) now store `stable_id` (= role) instead of a UUID. Existing records with UUID keys remain in the database but are no longer updated; they can be cleaned up with a migration if needed.
- ADR-0014 (Learning Profiles) and ADR-0015 (Budget Enforcement) are unaffected at the API level; only the internal key used to look up profiles changes.
---
## References
- [ADR-0014 — Learning Profiles](./0014-learning-profiles.md)
- [ADR-0015 — Budget Enforcement](./0015-budget-enforcement.md)
- [ADR-0026 — Arc-Based Shared State](./0026-shared-state.md)
- `crates/vapora-agents/src/registry.rs``AgentMetadata`, `drain_role`, `list_roles`
- `crates/vapora-agents/src/coordinator.rs``drain_role`, `registry_arc`, profile lookup
- `crates/vapora-agents/src/bin/server.rs``reload_agents`, SIGHUP handler, `/reload` endpoint


@ -2,8 +2,8 @@
Documentación de las decisiones arquitectónicas clave del proyecto VAPORA.
**Status**: Complete (40 ADRs documented)
**Last Updated**: 2026-03-02
**Format**: Custom VAPORA (Decision, Rationale, Alternatives, Trade-offs, Implementation, Verification, Consequences)
---
@ -37,7 +37,7 @@ Decisiones fundamentales sobre el stack tecnológico y estructura base del proye
---
## 🔄 Agent Coordination & Messaging (6 ADRs)
Decisiones sobre coordinación entre agentes y comunicación de mensajes.
@ -48,6 +48,7 @@ Decisiones sobre coordinación entre agentes y comunicación de mensajes.
| [030](./0030-a2a-protocol-implementation.md) | A2A Protocol Implementation | Axum JSON-RPC 2.0 server + resilient client con exponential backoff | ✅ Implemented |
| [031](./0031-kubernetes-deployment-kagent.md) | Kubernetes Deployment Strategy para kagent | Kustomize + StatefulSet con overlays dev/prod | ✅ Accepted |
| [032](./0032-a2a-error-handling-json-rpc.md) | A2A Error Handling y JSON-RPC 2.0 Compliance | Two-layer: thiserror domain errors + JSON-RPC 2.0 protocol conversion | ✅ Implemented |
| [040](./0040-agent-hot-reload-stable-identity.md) | Agent Hot-Reload — Stable Identity and Zero-Downtime Config Reload | `stable_id = role` as persistent profile key; SIGHUP + `POST /reload` drain-and-respawn without learning loss | ✅ Implemented |
---
@ -126,6 +127,7 @@ Patrones de desarrollo y arquitectura utilizados en todo el codebase.
- **A2A Protocol**: JSON-RPC 2.0 over HTTP enables interoperability with Google kagent and other A2A-compliant agents
- **kagent Kubernetes Deployment**: Kustomize StatefulSet with stable pod identities for predictable A2A endpoint addressing
- **A2A Error Handling**: Two-layer strategy (domain `thiserror` + JSON-RPC 2.0 protocol conversion) specializes ADR-0022 for A2A
- **Agent Hot-Reload**: `stable_id = role` decouples ephemeral instance identity from persistent profile key; SIGHUP and `POST /reload` drain executors while preserving all learning profiles
### ☁️ Infrastructure & Security
@ -270,7 +272,7 @@ Each ADR follows the Custom VAPORA format:
## Statistics
- **Total ADRs**: 40
- **Core Architecture**: 13 (41%)
- **Agent Coordination**: 6 (15%)
- **Infrastructure**: 4 (12%)
@ -291,4 +293,4 @@ Each ADR follows the Custom VAPORA format:
**Generated**: January 12, 2026
**Status**: Production-Ready
**Last Reviewed**: 2026-03-02


@ -0,0 +1,18 @@
{
RegistryConfig = {
max_agents_per_role | Number,
health_check_interval | Number,
agent_timeout | Number,
},
AgentDefinition = {
role | String,
description | String,
llm_provider | String,
llm_model | String,
parallelizable | Bool,
priority | Number,
capabilities | Array String,
system_prompt | String | optional,
},
}


@ -0,0 +1,20 @@
let AlertThreshold = std.contract.custom (
fun label =>
fun value =>
if value >= 0.0 && value <= 1.0 then
'Ok value
else
'Error {
message = "Invalid alert_threshold '%{std.to_string value}'.\nValid range: 0.0 - 1.0"
}
) in
{
RoleBudget = {
role | String,
monthly_limit_cents | Number,
weekly_limit_cents | Number,
fallback_provider | String,
alert_threshold | AlertThreshold,
},
}
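A hypothetical usage of the `RoleBudget` contract (the import path and field values are assumed; `0.8` passes the `AlertThreshold` range check, while a value like `1.2` would fail with the "Valid range: 0.0 - 1.0" message):

```nickel
# Assumes the contract file above is saved as budget_contracts.ncl.
let C = import "budget_contracts.ncl" in
{
  role = "developer",
  monthly_limit_cents = 50000,
  weekly_limit_cents = 15000,
  fallback_provider = "claude",
  alert_threshold = 0.8,
} | C.RoleBudget
```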


@ -0,0 +1,27 @@
{
SlackConfig = {
type | String,
webhook_url | String,
channel | String | optional,
username | String | optional,
},
DiscordConfig = {
type | String,
webhook_url | String,
username | String | optional,
avatar_url | String | optional,
},
TelegramConfig = {
type | String,
bot_token | String,
chat_id | String,
api_base | String | optional,
},
ChannelEntry = {
type | String,
..
},
}


@ -0,0 +1,25 @@
{
RoutingConfig = {
default_provider | String,
cost_tracking_enabled | Bool,
fallback_enabled | Bool,
},
ProviderConfig = {
enabled | Bool,
api_key | String | optional,
url | String | optional,
model | String,
max_tokens | Number,
temperature | Number,
cost_per_1m_input | Number,
cost_per_1m_output | Number,
},
RoutingRule = {
name | String,
condition | { .. },
provider | String,
model_override | String | optional,
},
}


@ -0,0 +1,68 @@
let LogLevel = std.contract.custom (
fun label =>
fun value =>
let valid = ["trace", "debug", "info", "warn", "error"] in
if std.array.any (fun x => x == value) valid then
'Ok value
else
'Error {
message = "Invalid log_level '%{value}'.\nValid values: trace | debug | info | warn | error"
}
) in
let Port = std.contract.custom (
fun label =>
fun value =>
if value >= 1 && value <= 65535 then
'Ok value
else
'Error {
message = "Invalid port '%{std.to_string value}'.\nValid range: 1 - 65535"
}
) in
{
TlsConfig = {
enabled | Bool,
cert_path | String,
key_path | String,
},
ServerConfig = {
host | String,
port | Port,
tls | TlsConfig,
},
DatabaseConfig = {
url | String,
max_connections | Number,
},
NatsConfig = {
url | String,
stream_name | String,
},
AuthConfig = {
jwt_secret | String,
jwt_expiration_hours | Number,
},
LoggingConfig = {
level | LogLevel,
json | Bool,
},
MetricsConfig = {
enabled | Bool,
port | Port,
},
NotificationConfig = {
on_task_done | Array String | default = [],
on_proposal_approved | Array String | default = [],
on_proposal_rejected | Array String | default = [],
on_agent_inactive | Array String | default = [],
},
}

View file

@ -0,0 +1,30 @@
{
EngineConfig = {
max_parallel_tasks | Number,
workflow_timeout | Number,
approval_gates_enabled | Bool,
cedar_policy_dir | String | optional,
},
ScheduleConfig = {
cron | String,
timezone | String | optional,
allow_concurrent | Bool,
catch_up | Bool,
},
WorkflowNotifications = {
on_completed | Array String | default = [],
on_failed | Array String | default = [],
on_approval_required | Array String | default = [],
},
StageConfig = {
name | String,
agents | Array String,
parallel | Bool | default = false,
max_parallel | Number | optional,
approval_required | Bool | default = false,
compensation_agents | Array String | optional,
},
}