diff --git a/CHANGELOG.md b/CHANGELOG.md index 37b305f..783e684 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,39 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added - Agent Hot-Reload: Stable Identity and Zero-Downtime Config Reload + +#### `vapora-agents` — stable_id, drain/respawn, SIGHUP, REST endpoint + +- **`AgentMetadata::stable_id`** (`registry.rs`): New `#[serde(default)]` field computed as `role.clone()` at construction, before the `role` field is moved. `stable_id_or_role()` helper returns `stable_id` if non-empty, otherwise falls back to `role` for backward compatibility with serialized records that predate this change. +- **Profile key switch** (`coordinator.rs`): `assign_task` and `load_all_learning_profiles` now look up `learning_profiles` by `stable_id_or_role()` instead of the ephemeral UUID `id`. Learning expertise accumulated across any number of restarts or hot-reloads is no longer orphaned. +- **KG execution records** (`executor.rs`): The `agent_id` field written to SurrealDB on task completion now uses `stable_id_or_role()` (= role) instead of the per-instance UUID. Execution history is partitioned by role, consistent with how profiles are keyed. +- **`AgentRegistry::drain_role`** (`registry.rs`): Removes all agents for a given role from the registry and clears `running_count`, enabling immediate re-registration without hitting `MaxAgentsReached`. +- **`AgentRegistry::list_roles`** (`registry.rs`): Returns the sorted list of distinct roles currently registered. +- **`AgentCoordinator::drain_role`** (`coordinator.rs`): Calls `registry.drain_role`, then drops the corresponding `Sender` entries from `executor_channels`. Dropping the `Sender` closes the mpsc channel; each executor's `while let Some(task) = rx.recv().await` loop exits after draining buffered messages — no explicit shutdown signal needed. `learning_profiles` is untouched. +- **`AgentCoordinator::registry_arc`** (`coordinator.rs`): New accessor returning `Arc`, used by the `/reload` endpoint to obtain the registry without exposing coordinator internals. +- **`reload_agents`** (`server.rs`): Async function that (1) drains all active roles, (2) re-spawns capability executors from `CapabilityRegistry`, (3) re-spawns config agents not covered by capabilities, (4) returns the new total agent count. Learning profiles survive the entire sequence. +- **SIGHUP handler** (`server.rs`): `tokio::signal::unix::signal(SignalKind::hangup())` drives a `while sighup.recv().await.is_some()` loop calling `handle_sighup_reload`. The `is_some()` guard prevents a spin-loop if the signal stream closes during runtime shutdown. +- **`POST /reload` endpoint** (`server.rs`): HTTP entry point for operators and CI pipelines. Loads a fresh config from `config_path`, calls `reload_agents`, returns `{"reloaded": true, "agents": N}` on success or `500` with error detail on failure. +- **Availability window documented**: `reload_agents` has a brief window (typically sub-millisecond) between drain and re-registration during which `assign_task` returns `NoAvailableAgent`. Callers must handle and retry. `BudgetManager` and `LLMRouter` are not reloaded; changes to those require a process restart. + +#### Tests added (`vapora-agents`) + +- `test_stable_id_deterministic` — two `AgentMetadata::new("developer", ...)` produce distinct `id` but identical `stable_id = "developer"` +- `test_stable_id_or_role_fallback` — empty `stable_id` falls back to `role` +- `test_drain_role` — after `drain_role("developer")`, reviewer agents persist and developer count is zero +- `test_list_roles` — returns correct sorted list after mixed-role registrations +- `test_profile_survives_role_drain` — `get_learning_profile("developer")` returns `Some` after `drain_role("developer")` + +#### Clippy fixes (pre-existing, unblocked by this change) + +- `vapora-workflow-engine/src/config.rs`: 4× `std::io::Error::new(ErrorKind::Other, ...)` → `std::io::Error::other(...)` +- `vapora-llm-router/src/budget.rs`: 4× same pattern +- `vapora-llm-router/src/config.rs`: 3× same pattern +- `vapora-agents/src/config.rs`: 4× same pattern + +--- + ### Fixed - Stub Elimination: Real Implementations for 6 Hollow Integration Points #### `vapora-backend` — WorkflowOrchestrator and WorkflowService wiring diff --git a/README.md b/README.md index 5ee4546..321d4a3 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,15 @@ - **Workflow events**: `on_stage_complete`, `on_stage_failed`, `on_completed`, `on_cancelled` — per-workflow routing config - **REST API**: `GET /api/v1/channels` (list), `POST /api/v1/channels/:name/test` (connectivity check) -### 🧠 Intelligent Learning & Cost Optimization (Phase 5.3 + 5.4) +### ♻️ Agent Hot-Reload — Zero Config-Rotation Learning Loss + +- **`stable_id` identity**: Each agent carries a `stable_id = role` that is deterministic across restarts. Learning profiles and KG execution records are keyed by `stable_id`, not the ephemeral UUID, so accumulated expertise survives every reload. +- **SIGHUP reload**: `kill -HUP $(pgrep vapora-agents)` drains all executors and re-spawns them from the updated config without restarting the process. +- **`POST /reload` endpoint**: HTTP entry point for CI pipelines and operators. Returns `{"reloaded": true, "agents": N}` on success. +- **Graceful drain**: Dropping the executor `Sender` closes the mpsc channel; in-flight messages are drained before the executor exits — no tasks are lost during reload. +- **Profile persistence**: `learning_profiles` (keyed by `stable_id`) is never touched during drain/respawn. New executor instances inherit accumulated expertise immediately. + +### 🧠 Intelligent Learning & Cost Optimization - **Per-Task-Type Learning**: Agents build expertise profiles from execution history - **Recency Bias**: Recent performance weighted 3x (last 7 days) for adaptive selection diff --git a/config/agent-budgets.toml b/config/agent-budgets.toml deleted file mode 100644 index a6d5e9d..0000000 --- a/config/agent-budgets.toml +++ /dev/null @@ -1,39 +0,0 @@ -# Agent Role Budget Configuration -# Defines monthly and weekly spending limits per agent role -# Budget enforcement prevents runaway LLM costs -# Fallback providers used when budget thresholds exceeded - -[budgets.architect] -role = "architect" -monthly_limit_cents = 50000 # $500/month -weekly_limit_cents = 12500 # $125/week -fallback_provider = "gemini" # Cheaper alternative when budget hit -alert_threshold = 0.8 # Alert at 80% utilization - -[budgets.developer] -role = "developer" -monthly_limit_cents = 30000 # $300/month -weekly_limit_cents = 7500 # $75/week -fallback_provider = "ollama" # Free local model -alert_threshold = 0.8 - -[budgets.reviewer] -role = "reviewer" -monthly_limit_cents = 20000 # $200/month -weekly_limit_cents = 5000 # $50/week -fallback_provider = "gemini" -alert_threshold = 0.8 - -[budgets.documenter] -role = "documenter" -monthly_limit_cents = 15000 # $150/month -weekly_limit_cents = 3750 # $37.50/week -fallback_provider = "ollama" -alert_threshold = 0.8 - -[budgets.tester] -role = "tester" -monthly_limit_cents = 25000 # $250/month -weekly_limit_cents = 6250 # $62.50/week -fallback_provider = "ollama" -alert_threshold = 0.8 diff --git a/config/agents.ncl b/config/agents.ncl new file mode 100644 index 0000000..cd994a5 --- /dev/null +++ b/config/agents.ncl @@ -0,0 +1,120 @@ +let C = import "../nickel/agents/contracts.ncl" in + +{ + registry | C.RegistryConfig = { + max_agents_per_role = 5, + health_check_interval = 30, + agent_timeout = 300, + }, + + agents | Array C.AgentDefinition = [ + { + role = "architect", + description = "System design, architecture decisions, ADRs", + llm_provider = "claude", + llm_model = "claude-opus-4-20250514", + parallelizable = false, + priority = 100, + capabilities = ["system_design", "architecture", "adr", "patterns"], + }, + { + role = "developer", + description = "Code implementation, feature development", + llm_provider = "claude", + llm_model = "claude-sonnet-4-5-20250929", + parallelizable = true, + priority = 80, + capabilities = ["coding", "implementation", "debugging"], + }, + { + role = "code_reviewer", + description = "Code quality assurance, style checking", + llm_provider = "claude", + llm_model = "claude-sonnet-4-5-20250929", + parallelizable = true, + priority = 70, + capabilities = ["code_review", "quality", "best_practices"], + }, + { + role = "tester", + description = "Tests, benchmarks, quality validation", + llm_provider = "claude", + llm_model = "claude-sonnet-4-5-20250929", + parallelizable = true, + priority = 75, + capabilities = ["testing", "benchmarks", "validation"], + }, + { + role = "documenter", + description = "Documentation, root files (README, CHANGELOG)", + llm_provider = "openai", + llm_model = "gpt-4o", + parallelizable = true, + priority = 60, + capabilities = ["documentation", "readme", "changelog", "guides"], + }, + { + role = "marketer", + description = "Marketing content, announcements", + llm_provider = "claude", + llm_model = "claude-sonnet-4-5-20250929", + parallelizable = true, + priority = 40, + capabilities = ["marketing", "content", "announcements"], + }, + { + role = "presenter", + description = "Presentations, slides, demos", + llm_provider = "claude", + llm_model = "claude-sonnet-4-5-20250929", + parallelizable = false, + priority = 50, + capabilities = ["presentations", "slides", "demos"], + }, + { + role = "devops", + description = "CI/CD, deployment, infrastructure", + llm_provider = "claude", + llm_model = "claude-sonnet-4-5-20250929", + parallelizable = true, + priority = 85, + capabilities = ["cicd", "deployment", "kubernetes", "infrastructure"], + }, + { + role = "monitor", + description = "System health, alerting, observability", + llm_provider = "gemini", + llm_model = "gemini-2.0-flash", + parallelizable = false, + priority = 90, + capabilities = ["monitoring", "health", "alerts", "metrics"], + }, + { + role = "security", + description = "Security audit, vulnerability detection", + llm_provider = "claude", + llm_model = "claude-opus-4-20250514", + parallelizable = true, + priority = 95, + capabilities = ["security", "audit", "vulnerabilities"], + }, + { + role = "project_manager", + description = "Roadmap, task tracking, coordination", + llm_provider = "claude", + llm_model = "claude-sonnet-4-5-20250929", + parallelizable = false, + priority = 65, + capabilities = ["planning", "tracking", "coordination"], + }, + { + role = "decision_maker", + description = "Conflict resolution, strategic decisions", + llm_provider = "claude", + llm_model = "claude-opus-4-20250514", + parallelizable = false, + priority = 100, + capabilities = ["decisions", "conflict_resolution", "strategy"], + }, + ], +} diff --git a/config/agents.toml b/config/agents.toml deleted file mode 100644 index c115dc7..0000000 --- a/config/agents.toml +++ /dev/null @@ -1,122 +0,0 @@ -# Agent Registry Configuration -# Phase 0: Definition of 12 agent roles - -[registry] -# Maximum number of concurrent agents per role -max_agents_per_role = 5 - -# Agent health check interval (seconds) -health_check_interval = 30 - -# Agent timeout (seconds) -agent_timeout = 300 - -# The 12 Agent Roles - -[[agents]] -role = "architect" -description = "System design, architecture decisions, ADRs" -llm_provider = "claude" -llm_model = "claude-opus-4-20250514" -parallelizable = false -priority = 100 -capabilities = ["system_design", "architecture", "adr", "patterns"] - -[[agents]] -role = "developer" -description = "Code implementation, feature development" -llm_provider = "claude" -llm_model = "claude-sonnet-4-5-20250929" -parallelizable = true -priority = 80 -capabilities = ["coding", "implementation", "debugging"] - -[[agents]] -role = "code_reviewer" -description = "Code quality assurance, style checking" -llm_provider = "claude" -llm_model = "claude-sonnet-4-5-20250929" -parallelizable = true -priority = 70 -capabilities = ["code_review", "quality", "best_practices"] - -[[agents]] -role = "tester" -description = "Tests, benchmarks, quality validation" -llm_provider = "claude" -llm_model = "claude-sonnet-4-5-20250929" -parallelizable = true -priority = 75 -capabilities = ["testing", "benchmarks", "validation"] - -[[agents]] -role = "documenter" -description = "Documentation, root files (README, CHANGELOG)" -llm_provider = "openai" -llm_model = "gpt-4o" -parallelizable = true -priority = 60 -capabilities = ["documentation", "readme", "changelog", "guides"] - -[[agents]] -role = "marketer" -description = "Marketing content, announcements" -llm_provider = "claude" -llm_model = "claude-sonnet-4-5-20250929" -parallelizable = true -priority = 40 -capabilities = ["marketing", "content", "announcements"] - -[[agents]] -role = "presenter" -description = "Presentations, slides, demos" -llm_provider = "claude" -llm_model = "claude-sonnet-4-5-20250929" -parallelizable = false -priority = 50 -capabilities = ["presentations", "slides", "demos"] - -[[agents]] -role = "devops" -description = "CI/CD, deployment, infrastructure" -llm_provider = "claude" -llm_model = "claude-sonnet-4-5-20250929" -parallelizable = true -priority = 85 -capabilities = ["cicd", "deployment", "kubernetes", "infrastructure"] - -[[agents]] -role = "monitor" -description = "System health, alerting, observability" -llm_provider = "gemini" -llm_model = "gemini-2.0-flash" -parallelizable = false -priority = 90 -capabilities = ["monitoring", "health", "alerts", "metrics"] - -[[agents]] -role = "security" -description = "Security audit, vulnerability detection" -llm_provider = "claude" -llm_model = "claude-opus-4-20250514" -parallelizable = true -priority = 95 -capabilities = ["security", "audit", "vulnerabilities"] - -[[agents]] -role = "project_manager" -description = "Roadmap, task tracking, coordination" -llm_provider = "claude" -llm_model = "claude-sonnet-4-5-20250929" -parallelizable = false -priority = 65 -capabilities = ["planning", "tracking", "coordination"] - -[[agents]] -role = "decision_maker" -description = "Conflict resolution, strategic decisions" -llm_provider = "claude" -llm_model = "claude-opus-4-20250514" -parallelizable = false -priority = 100 -capabilities = ["decisions", "conflict_resolution", "strategy"] diff --git a/config/budgets.ncl b/config/budgets.ncl new file mode 100644 index 0000000..9dd635c --- /dev/null +++ b/config/budgets.ncl @@ -0,0 +1,45 @@ +let C = import "../nickel/budgets/contracts.ncl" in + +{ + budgets = { + architect | C.RoleBudget = { + role = "architect", + monthly_limit_cents = 50000, + weekly_limit_cents = 12500, + fallback_provider = "gemini", + alert_threshold = 0.8, + }, + + developer | C.RoleBudget = { + role = "developer", + monthly_limit_cents = 30000, + weekly_limit_cents = 7500, + fallback_provider = "ollama", + alert_threshold = 0.8, + }, + + reviewer | C.RoleBudget = { + role = "reviewer", + monthly_limit_cents = 20000, + weekly_limit_cents = 5000, + fallback_provider = "gemini", + alert_threshold = 0.8, + }, + + documenter | C.RoleBudget = { + role = "documenter", + monthly_limit_cents = 15000, + weekly_limit_cents = 3750, + fallback_provider = "ollama", + alert_threshold = 0.8, + }, + + tester | C.RoleBudget = { + role = "tester", + monthly_limit_cents = 25000, + weekly_limit_cents = 6250, + fallback_provider = "ollama", + alert_threshold = 0.8, + }, + }, +} diff --git a/config/channels.ncl b/config/channels.ncl new file mode 100644 index 0000000..bcc71df --- /dev/null +++ b/config/channels.ncl @@ -0,0 +1,10 @@ +{ + channels = {}, + + notifications = { + on_task_done = [], + on_proposal_approved = [], + on_proposal_rejected = [], + on_agent_inactive = [], + }, +} diff --git a/config/config.ncl b/config/config.ncl new file mode 100644 index 0000000..99feead --- /dev/null +++ b/config/config.ncl @@ -0,0 +1,6 @@ + (import "./server.ncl") +& (import "./agents.ncl") +& (import "./llm-router.ncl") +& (import "./budgets.ncl") +& (import "./workflows.ncl") +& (import "./channels.ncl") diff --git a/config/llm-router.ncl b/config/llm-router.ncl new file mode 100644 index 0000000..bc60d25 --- /dev/null +++ b/config/llm-router.ncl @@ -0,0 +1,80 @@ +let C = import "../nickel/llm-router/contracts.ncl" in + +{ + routing | C.RoutingConfig = { + default_provider = "claude", + cost_tracking_enabled = true, + fallback_enabled = true, + }, + + providers = { + claude | C.ProviderConfig = { + enabled = true, + api_key = "${ANTHROPIC_API_KEY}", + model = "claude-sonnet-4-5-20250929", + max_tokens = 8192, + temperature = 0.7, + cost_per_1m_input = 3.00, + cost_per_1m_output = 15.00, + }, + + openai | C.ProviderConfig = { + enabled = true, + api_key = "${OPENAI_API_KEY}", + model = "gpt-4o", + max_tokens = 4096, + temperature = 0.7, + cost_per_1m_input = 2.50, + cost_per_1m_output = 10.00, + }, + + gemini | C.ProviderConfig = { + enabled = true, + api_key = "${GOOGLE_API_KEY}", + model = "gemini-2.0-flash", + max_tokens = 8192, + temperature = 0.7, + cost_per_1m_input = 0.30, + cost_per_1m_output = 1.20, + }, + + ollama | C.ProviderConfig = { + enabled = true, + url = "${OLLAMA_URL:-http://localhost:11434}", + model = "llama3.2", + max_tokens = 4096, + temperature = 0.7, + cost_per_1m_input = 0.00, + cost_per_1m_output = 0.00, + }, + }, + + routing_rules | Array C.RoutingRule = [ + { + name = "architecture_design", + condition = { task_type = "architecture" }, + provider = "claude", + model_override = "claude-opus-4-20250514", + }, + { + name = "code_generation", + condition = { task_type = "development" }, + provider = "claude", + }, + { + name = "documentation", + condition = { task_type = "documentation" }, + provider = "openai", + }, + { + name = "monitoring", + condition = { task_type = "monitoring" }, + provider = "gemini", + }, + { + name = "local_testing", + condition = { environment = "development" }, + provider = "ollama", + }, + ], +} diff --git a/config/llm-router.toml b/config/llm-router.toml deleted file mode 100644 index 8f02ba5..0000000 --- a/config/llm-router.toml +++ /dev/null @@ -1,87 +0,0 @@ -# Multi-IA Router Configuration -# Phase 0: Configuration for LLM provider selection - -[routing] -# Default provider if no specific routing rules match -default_provider = "claude" - -# Enable cost tracking -cost_tracking_enabled = true - -# Enable fallback on provider failure -fallback_enabled = true - -[providers.claude] -enabled = true -# ANTHROPIC_API_KEY environment variable required -api_key = "${ANTHROPIC_API_KEY}" -model = "claude-sonnet-4-5-20250929" -max_tokens = 8192 -temperature = 0.7 - -# Cost per 1M tokens (input/output) -cost_per_1m_input = 3.00 -cost_per_1m_output = 15.00 - -[providers.openai] -enabled = true -# OPENAI_API_KEY environment variable required -api_key = "${OPENAI_API_KEY}" -model = "gpt-4o" -max_tokens = 4096 -temperature = 0.7 - -# Cost per 1M tokens (input/output) -cost_per_1m_input = 2.50 -cost_per_1m_output = 10.00 - -[providers.gemini] -enabled = true -# GOOGLE_API_KEY environment variable required -api_key = "${GOOGLE_API_KEY}" -model = "gemini-2.0-flash" -max_tokens = 8192 -temperature = 0.7 - -# Cost per 1M tokens (input/output) -cost_per_1m_input = 0.30 -cost_per_1m_output = 1.20 - -[providers.ollama] -enabled = true -# Local Ollama instance, no API key needed -url = "${OLLAMA_URL:-http://localhost:11434}" -model = "llama3.2" -max_tokens = 4096 -temperature = 0.7 - -# No cost for local models -cost_per_1m_input = 0.00 -cost_per_1m_output = 0.00 - -# Routing rules: assign providers based on task characteristics -[[routing_rules]] -name = "architecture_design" -condition = { task_type = "architecture" } -provider = "claude" -model_override = "claude-opus-4-20250514" - -[[routing_rules]] -name = "code_generation" -condition = { task_type = "development" } -provider = "claude" - -[[routing_rules]] -name = "documentation" -condition = { task_type = "documentation" } -provider = "openai" - -[[routing_rules]] -name = "monitoring" -condition = { task_type = "monitoring" } -provider = "gemini" - -[[routing_rules]] -name = "local_testing" -condition = { environment = "development" } -provider = "ollama" diff --git a/config/server.ncl b/config/server.ncl new file mode 100644 index 0000000..c9b8d77 --- /dev/null +++ b/config/server.ncl @@ -0,0 +1,38 @@ +let C = import "../nickel/vapora/contracts.ncl" in + +{ + server | C.ServerConfig = { + host = "127.0.0.1", + port = 3000, + tls = { + enabled = false, + cert_path = "", + key_path = "", + }, + }, + + database | C.DatabaseConfig = { + url = "ws://localhost:8000", + max_connections = 10, + }, + + nats | C.NatsConfig = { + url = "nats://localhost:4222", + stream_name = "vapora-tasks", + }, + + auth | C.AuthConfig = { + jwt_secret = "change-in-production", + jwt_expiration_hours = 24, + }, + + logging | C.LoggingConfig = { + level = "info", + json = false, + }, + + metrics | C.MetricsConfig = { + enabled = true, + port = 9090, + }, +} diff --git a/config/vapora.toml b/config/vapora.toml deleted file mode 100644 index 3395377..0000000 --- a/config/vapora.toml +++ /dev/null @@ -1,40 +0,0 @@ -# VAPORA Server Configuration -# Phase 0: Environment-based configuration -# Note: Load runtime configuration from environment variables, not this file - -[server] -# Server configuration (override with env vars: VAPORA_HOST, VAPORA_PORT) -host = "127.0.0.1" -port = 3000 - -[server.tls] -# TLS configuration (optional) -# Override with: VAPORA_TLS_ENABLED, VAPORA_TLS_CERT_PATH, VAPORA_TLS_KEY_PATH -enabled = false -cert_path = "" -key_path = "" - -[database] -# Database connection (override with: VAPORA_DB_URL, VAPORA_DB_MAX_CONNECTIONS) -url = "ws://localhost:8000" -max_connections = 10 - -[nats] -# NATS JetStream configuration (override with: VAPORA_NATS_URL, VAPORA_NATS_STREAM) -url = "nats://localhost:4222" -stream_name = "vapora-tasks" - -[auth] -# Authentication configuration (override with: VAPORA_JWT_SECRET, VAPORA_JWT_EXPIRATION_HOURS) -jwt_secret = "change-in-production" -jwt_expiration_hours = 24 - -[logging] -# Logging configuration (override with: VAPORA_LOG_LEVEL, VAPORA_LOG_JSON) -level = "info" -json = false - -[metrics] -# Metrics configuration (override with: VAPORA_METRICS_ENABLED, VAPORA_METRICS_PORT) -enabled = true -port = 9090 diff --git a/config/workflows.ncl b/config/workflows.ncl new file mode 100644 index 0000000..8708b76 --- /dev/null +++ b/config/workflows.ncl @@ -0,0 +1,153 @@ +let C = import "../nickel/workflows/contracts.ncl" in + +{ + engine | C.EngineConfig = { + max_parallel_tasks = 10, + workflow_timeout = 3600, + approval_gates_enabled = true, + }, + + workflows = [ + { + name = "feature_development", + trigger = "manual", + stages | Array C.StageConfig = [ + { + name = "architecture_design", + agents = ["architect"], + parallel = false, + approval_required = false, + }, + { + name = "implementation", + agents = ["developer", "developer"], + parallel = true, + max_parallel = 2, + approval_required = false, + }, + { + name = "testing", + agents = ["tester"], + parallel = false, + approval_required = false, + }, + { + name = "code_review", + agents = ["reviewer"], + parallel = false, + approval_required = true, + }, + { + name = "deployment", + agents = ["devops"], + parallel = false, + approval_required = true, + }, + ], + notifications = { + on_completed = [], + on_failed = [], + on_approval_required = [], + }, + }, + { + name = "bugfix", + trigger = "manual", + stages | Array C.StageConfig = [ + { + name = "investigation", + agents = ["developer"], + parallel = false, + approval_required = false, + }, + { + name = "fix_implementation", + agents = ["developer"], + parallel = false, + approval_required = false, + }, + { + name = "testing", + agents = ["tester"], + parallel = false, + approval_required = false, + }, + { + name = "deployment", + agents = ["devops"], + parallel = false, + approval_required = false, + }, + ], + notifications = { + on_completed = [], + on_failed = [], + on_approval_required = [], + }, + }, + { + name = "documentation_update", + trigger = "manual", + stages | Array C.StageConfig = [ + { + name = "content_creation", + agents = ["technical_writer"], + parallel = false, + approval_required = false, + }, + { + name = "review", + agents = ["reviewer"], + parallel = false, + approval_required = true, + }, + { + name = "publish", + agents = ["devops"], + parallel = false, + approval_required = false, + }, + ], + notifications = { + on_completed = [], + on_failed = [], + on_approval_required = [], + }, + }, + { + name = "security_audit", + trigger = "manual", + stages | Array C.StageConfig = [ + { + name = "code_analysis", + agents = ["security_engineer"], + parallel = false, + approval_required = false, + }, + { + name = "penetration_testing", + agents = ["security_engineer"], + parallel = false, + approval_required = false, + }, + { + name = "remediation", + agents = ["developer"], + parallel = false, + approval_required = false, + }, + { + name = "verification", + agents = ["security_engineer"], + parallel = false, + approval_required = true, + }, + ], + notifications = { + on_completed = [], + on_failed = [], + on_approval_required = [], + }, + }, + ], +} diff --git a/config/workflows.toml b/config/workflows.toml deleted file mode 100644 index ca2849b..0000000 --- a/config/workflows.toml +++ /dev/null @@ -1,117 +0,0 @@ -[engine] -max_parallel_tasks = 10 -workflow_timeout = 3600 -approval_gates_enabled = true - -[[workflows]] -name = "feature_development" -trigger = "manual" - -[[workflows.stages]] -name = "architecture_design" -agents = ["architect"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "implementation" -agents = ["developer", "developer"] -parallel = true -max_parallel = 2 -approval_required = false - -[[workflows.stages]] -name = "testing" -agents = ["tester"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "code_review" -agents = ["reviewer"] -parallel = false -approval_required = true - -[[workflows.stages]] -name = "deployment" -agents = ["devops"] -parallel = false -approval_required = true - -[[workflows]] -name = "bugfix" -trigger = "manual" - -[[workflows.stages]] -name = "investigation" -agents = ["developer"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "fix_implementation" -agents = ["developer"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "testing" -agents = ["tester"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "deployment" -agents = ["devops"] -parallel = false -approval_required = false - -[[workflows]] -name = "documentation_update" -trigger = "manual" - -[[workflows.stages]] -name = "content_creation" -agents = ["technical_writer"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "review" -agents = ["reviewer"] -parallel = false -approval_required = true - -[[workflows.stages]] -name = "publish" -agents = ["devops"] -parallel = false -approval_required = false - -[[workflows]] -name = "security_audit" -trigger = "manual" - -[[workflows.stages]] -name = "code_analysis" -agents = ["security_engineer"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "penetration_testing" -agents = ["security_engineer"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "remediation" -agents = ["developer"] -parallel = false -approval_required = false - -[[workflows.stages]] -name = "verification" -agents = ["security_engineer"] -parallel = false -approval_required = true diff --git a/crates/vapora-agents/src/bin/server.rs b/crates/vapora-agents/src/bin/server.rs index bd8a4da..a5eb88a 100644 --- a/crates/vapora-agents/src/bin/server.rs +++ b/crates/vapora-agents/src/bin/server.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::Result; -use axum::{extract::State, routing::get, Json, Router}; +use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router}; use clap::Parser; use serde_json::json; use tokio::net::TcpListener; @@ -26,6 +26,12 @@ struct AppState { coordinator: Arc, #[allow(dead_code)] budget_manager: Option>, + /// Path to the agent config file, used for re-loading on hot-reload. + config_path: String, + /// LLM router shared across executors. + router: Option>, + /// Capability registry for re-spawning capability executors. + cap_registry: Arc, } #[derive(Parser, Debug)] @@ -41,11 +47,7 @@ struct AppState { )] struct Args { /// Path to budget configuration file - #[arg( - long, - default_value = "config/agent-budgets.toml", - env = "BUDGET_CONFIG_PATH" - )] + #[arg(long, default_value = "config/config.ncl", env = "BUDGET_CONFIG_PATH")] budget_config: String, } @@ -64,6 +66,8 @@ async fn main() -> Result<()> { // Load agent configuration let config = AgentConfig::from_env()?; + let config_path = + std::env::var("VAPORA_AGENT_CONFIG").unwrap_or_else(|_| "config/config.ncl".to_string()); info!("Loaded configuration from environment"); // Load budget configuration @@ -96,7 +100,7 @@ async fn main() -> Result<()> { let router = router.map(Arc::new); // Initialize capability registry with built-in capability packages - let cap_registry = CapabilityRegistry::with_built_ins(); + let cap_registry = Arc::new(CapabilityRegistry::with_built_ins()); info!( "Capability registry initialized: {:?}", cap_registry.list_ids() @@ -118,8 +122,6 @@ async fn main() -> Result<()> { let coordinator = Arc::new(coordinator); // Spawn one executor per built-in capability, each wired to the LLM router. - // The executor's channel sender is registered with the coordinator so that - // assign_task() dispatches directly in-process. for cap_id in cap_registry.list_ids() { spawn_capability_executor( &cap_id, @@ -130,8 +132,7 @@ async fn main() -> Result<()> { ); } - // Spawn executors for any agents defined in agents.toml that are NOT - // already covered by a capability package (role not registered yet). + // Spawn executors for config agents not covered by a capability package. for agent_def in &config.agents { if registry.get_agents_by_role(&agent_def.role).is_empty() { spawn_single_config_executor(agent_def, ®istry, &coordinator, router.as_ref()); @@ -151,13 +152,38 @@ async fn main() -> Result<()> { }; let state = AppState { - coordinator, + coordinator: Arc::clone(&coordinator), budget_manager, + config_path: config_path.clone(), + router: router.clone(), + cap_registry: Arc::clone(&cap_registry), }; + // SIGHUP handler for config reload. Note: there is a brief unavailability + // window between drain and re-registration during which assign_task returns + // NoAvailableAgent. Learning profiles (keyed by stable_id) are preserved. + tokio::spawn({ + let state = state.clone(); + let registry = Arc::clone(®istry); + async move { + let mut sighup = + match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup()) { + Ok(s) => s, + Err(e) => { + warn!("Failed to register SIGHUP handler: {}", e); + return; + } + }; + while sighup.recv().await.is_some() { + handle_sighup_reload(&state, ®istry).await; + } + } + }); + let app = Router::new() .route("/health", get(health_handler)) .route("/ready", get(readiness_handler)) + .route("/reload", axum::routing::post(reload_handler)) .with_state(state); let addr = std::env::var("BIND_ADDR").unwrap_or_else(|_| "0.0.0.0:9000".to_string()); @@ -169,6 +195,76 @@ async fn main() -> Result<()> { Ok(()) } +/// Called from the SIGHUP loop. Loads a fresh config and delegates to +/// `reload_agents`. Errors are logged; the process keeps running. +async fn handle_sighup_reload(state: &AppState, registry: &Arc) { + info!("SIGHUP received: reloading agent configuration"); + let new_config = match AgentConfig::load(&state.config_path) { + Ok(c) => c, + Err(e) => { + error!("Config parse failed during reload: {}", e); + return; + } + }; + match reload_agents( + &state.coordinator, + registry, + &new_config, + &state.cap_registry, + state.router.as_ref(), + ) + .await + { + Ok(n) => info!("Reload complete: {} agents active", n), + Err(e) => error!("Reload failed: {}", e), + } +} + +/// Drain all roles and re-spawn executors from config + capability registry. +/// +/// Learning profiles (keyed by `stable_id`) are preserved in the coordinator; +/// new executor instances inherit the accumulated expertise immediately. +/// +/// # Availability window +/// +/// Between the drain phase and the first successful `register_agent`, callers +/// of `assign_task` for the affected roles receive `NoAvailableAgent`. This is +/// a brief window (microseconds to low milliseconds). Callers must handle this +/// error and retry. BudgetManager and LLMRouter are not reloaded; changes to +/// those require a process restart. +async fn reload_agents( + coordinator: &Arc, + registry: &Arc, + new_config: &AgentConfig, + cap_registry: &Arc, + router: Option<&Arc>, +) -> anyhow::Result { + // Drain all currently active roles. The dropped Sender causes each + // executor's recv loop to exit after draining in-flight messages. + for role in registry.list_roles() { + let drained = coordinator.drain_role(&role); + if !drained.is_empty() { + info!("Drained {} agent(s) for role '{}'", drained.len(), role); + } + } + + // Re-spawn capability executors + for cap_id in cap_registry.list_ids() { + spawn_capability_executor(cap_id.as_str(), cap_registry, registry, coordinator, router); + } + + // Re-spawn config agents not covered by capabilities + for agent_def in &new_config.agents { + if registry.get_agents_by_role(&agent_def.role).is_empty() { + spawn_single_config_executor(agent_def, registry, coordinator, router); + } + } + + let total = registry.total_count(); + info!("Reload complete: {} agents active", total); + Ok(total) +} + /// Activate a capability, register the resulting agent, and spawn its executor. fn spawn_capability_executor( cap_id: &str, @@ -381,3 +477,34 @@ async fn readiness_handler(State(state): State) -> Json) -> impl IntoResponse { + let new_config = match AgentConfig::load(&state.config_path) { + Ok(c) => c, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(); + } + }; + + let registry = state.coordinator.registry_arc(); + match reload_agents( + &state.coordinator, + ®istry, + &new_config, + &state.cap_registry, + state.router.as_ref(), + ) + .await + { + Ok(n) => (StatusCode::OK, Json(json!({"reloaded": true, "agents": n}))).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(), + } +} diff --git a/crates/vapora-agents/src/config.rs b/crates/vapora-agents/src/config.rs index 3dd4a5b..f534dfe 100644 --- a/crates/vapora-agents/src/config.rs +++ b/crates/vapora-agents/src/config.rs @@ -11,8 +11,8 @@ pub enum ConfigError { #[error("Failed to read config file: {0}")] ReadError(#[from] std::io::Error), - #[error("Failed to parse TOML: {0}")] - ParseError(#[from] toml::de::Error), + #[error("Failed to parse config: {0}")] + ParseJson(#[from] serde_json::Error), #[error("Invalid configuration: {0}")] ValidationError(String), @@ -52,10 +52,55 @@ fn default_agent_timeout() -> u64 { pub use vapora_shared::AgentDefinition; impl AgentConfig { - /// Load configuration from TOML file + /// Load configuration from a TOML or NCL file. When the path has a `.ncl` + /// extension, `nickel export --format json` is invoked and the resulting + /// JSON is parsed. Otherwise the file is read and parsed as TOML. pub fn load>(path: P) -> Result { - let content = std::fs::read_to_string(path)?; - let config: Self = toml::from_str(&content)?; + let path = path.as_ref(); + + let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") { + let out = std::process::Command::new("nickel") + .args(["export", "--format", "json"]) + .arg(path) + .output() + .map_err(|e| { + ConfigError::ReadError(std::io::Error::other(format!( + "Failed to invoke nickel for {:?}: {}", + path, e + ))) + })?; + if !out.status.success() { + let stderr = String::from_utf8_lossy(&out.stderr); + return Err(ConfigError::ReadError(std::io::Error::other(format!( + "nickel export failed for {:?}: {}", + path, stderr + )))); + } + let json = String::from_utf8(out.stdout).map_err(|e| { + ConfigError::ReadError(std::io::Error::other(format!( + "nickel output is not valid UTF-8: {}", + e + ))) + })?; + (json, true) + } else { + let content = std::fs::read_to_string(path)?; + (content, false) + }; + + let interpolated = interpolate_env_vars(&raw); + + let config: Self = if is_json { + serde_json::from_str(&interpolated)? + } else { + toml::from_str(&interpolated).map_err(|e| { + ConfigError::ReadError(std::io::Error::other(format!( + "Failed to parse TOML: {}", + e + ))) + })? + }; + config.validate()?; Ok(config) } @@ -63,12 +108,11 @@ impl AgentConfig { /// Load configuration from environment or default file pub fn from_env() -> Result { let config_path = std::env::var("VAPORA_AGENT_CONFIG") - .unwrap_or_else(|_| "/etc/vapora/agents.toml".to_string()); + .unwrap_or_else(|_| "config/config.ncl".to_string()); if Path::new(&config_path).exists() { Self::load(&config_path) } else { - // Return default config if file doesn't exist Ok(Self::default()) } } @@ -129,6 +173,34 @@ impl Default for AgentConfig { } } +/// Expand every `${VAR}` / `${VAR:-default}` reference in `content`. +/// Unresolved vars without a default are replaced with an empty string. +fn interpolate_env_vars(content: &str) -> String { + let mut result = String::with_capacity(content.len()); + let mut remaining = content; + while let Some(start) = remaining.find("${") { + result.push_str(&remaining[..start]); + let after_open = &remaining[start + 2..]; + if let Some(close) = after_open.find('}') { + let var_expr = &after_open[..close]; + let value = if let Some(sep) = var_expr.find(":-") { + let var_name = &var_expr[..sep]; + let default_val = &var_expr[sep + 2..]; + std::env::var(var_name).unwrap_or_else(|_| default_val.to_string()) + } else { + std::env::var(var_expr).unwrap_or_default() + }; + result.push_str(&value); + remaining = &after_open[close + 1..]; + } else { + result.push_str("${"); + remaining = after_open; + } + } + result.push_str(remaining); + result +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/vapora-agents/src/coordinator.rs b/crates/vapora-agents/src/coordinator.rs index a4c8f18..5b0014b 100644 --- a/crates/vapora-agents/src/coordinator.rs +++ b/crates/vapora-agents/src/coordinator.rs @@ -1,5 +1,4 @@ // vapora-agents: Agent coordinator - orchestrates agent workflows -// Phase 2: Complete implementation with NATS integration use std::collections::HashMap; use std::path::PathBuf; @@ -238,7 +237,7 @@ impl AgentCoordinator { // Simple heuristic: check if title/description contains known task types let task_type = extract_task_type(&title, &description, role); - // Get learning profiles for all candidates + // Get learning profiles for all candidates, keyed by stable_id. let learning_profiles = { let profiles = self .learning_profiles @@ -246,7 +245,10 @@ impl AgentCoordinator { .unwrap_or_else(|e| e.into_inner()); candidates .iter() - .map(|a| (a.id.clone(), profiles.get(&a.id).cloned())) + .map(|a| { + let key = a.stable_id_or_role(); + (a.id.clone(), profiles.get(key).cloned()) + }) .collect::>() }; @@ -426,6 +428,27 @@ impl AgentCoordinator { Arc::clone(&self.registry) } + /// Shared reference to the registry (for hot-reload coordination). + pub fn registry_arc(&self) -> Arc { + Arc::clone(&self.registry) + } + + /// Drain all agents for `role`: removes them from the registry and drops + /// their in-process executor channels. + /// + /// Learning profiles keyed by `stable_id` are intentionally preserved so + /// that re-spawned executors immediately benefit from past expertise. + /// + /// The dropped `Sender` causes the executor loop to exit once it drains + /// any in-flight messages. + pub fn drain_role(&self, role: &str) -> Vec { + let ids = self.registry.drain_role(role); + for id in &ids { + self.executor_channels.remove(id); + } + ids + } + /// Start coordinator (subscribe to NATS topics) pub async fn start(&self) -> Result<(), CoordinatorError> { if self.nats_client.is_none() { @@ -535,17 +558,18 @@ impl AgentCoordinator { ); for agent in agents { + let key = agent.stable_id_or_role().to_string(); match self - .load_learning_profile_from_kg(&agent.id, task_type, kg_persistence) + .load_learning_profile_from_kg(&key, task_type, kg_persistence) .await { Ok(profile) => { - self.update_learning_profile(&agent.id, profile)?; + self.update_learning_profile(&key, profile)?; } Err(e) => { warn!( - "Failed to load learning profile for agent {}: {}", - agent.id, e + "Failed to load learning profile for agent {} (stable_id: {}): {}", + agent.id, key, e ); // Continue with other agents on failure } @@ -743,6 +767,35 @@ mod tests { assert!(task_id.is_ok()); } + #[test] + fn test_profile_survives_role_drain() { + let registry = Arc::new(AgentRegistry::new(5)); + let agent = AgentMetadata::new( + "developer".to_string(), + "Dev 1".to_string(), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + registry.register_agent(agent).unwrap(); + + let coordinator = AgentCoordinator::with_registry(Arc::clone(®istry)); + + // Insert a profile under the stable_id key + let profile = crate::learning_profile::LearningProfile::new("developer".to_string()); + coordinator + .update_learning_profile("developer", profile) + .unwrap(); + + // Drain the role (removes agents + channels) + let drained = coordinator.drain_role("developer"); + assert_eq!(drained.len(), 1); + + // Profile must survive the drain + let retained = coordinator.get_learning_profile("developer"); + assert!(retained.is_some(), "profile must survive drain_role"); + } + #[tokio::test] async fn test_no_available_agent() { // Set schema directory for tests (relative to workspace root) diff --git a/crates/vapora-agents/src/profile_adapter.rs b/crates/vapora-agents/src/profile_adapter.rs index f901b95..2b23f7a 100644 --- a/crates/vapora-agents/src/profile_adapter.rs +++ b/crates/vapora-agents/src/profile_adapter.rs @@ -1,6 +1,4 @@ // Profile adapter: AgentMetadata + KG metrics → Swarm AgentProfile -// Phase 5.2: Bridges agent registry with swarm coordination -// Phase 5.3: Integrates per-task-type learning profiles from KG use vapora_swarm::messages::AgentProfile; @@ -40,9 +38,12 @@ impl ProfileAdapter { profile } - /// Create learning profile from agent with task-type expertise. - /// Integrates per-task-type learning data from KG for intelligent - /// assignment. + /// Create a learning profile with the given ID. + /// + /// Callers should pass `agent.stable_id_or_role()` (not `agent.id`) so + /// that the profile survives hot-reloads. The coordinator already enforces + /// this; use this function for constructing profiles that will be inserted + /// via `AgentCoordinator::update_learning_profile`. pub fn create_learning_profile(agent_id: String) -> LearningProfile { LearningProfile::new(agent_id) } @@ -79,6 +80,7 @@ mod tests { fn test_profile_creation_from_metadata() { let agent = AgentMetadata { id: "agent-1".to_string(), + stable_id: "developer".to_string(), role: "developer".to_string(), name: "Dev Agent 1".to_string(), version: "0.1.0".to_string(), @@ -109,6 +111,7 @@ mod tests { let agents = vec![ AgentMetadata { id: "agent-1".to_string(), + stable_id: "developer".to_string(), role: "developer".to_string(), name: "Dev 1".to_string(), version: "0.1.0".to_string(), @@ -126,6 +129,7 @@ mod tests { }, AgentMetadata { id: "agent-2".to_string(), + stable_id: "reviewer".to_string(), role: "reviewer".to_string(), name: "Reviewer 1".to_string(), version: "0.1.0".to_string(), diff --git a/crates/vapora-agents/src/registry.rs b/crates/vapora-agents/src/registry.rs index 76971fa..e04d707 100644 --- a/crates/vapora-agents/src/registry.rs +++ b/crates/vapora-agents/src/registry.rs @@ -1,5 +1,4 @@ // vapora-agents: Agent registry - manages agent lifecycle and availability -// Phase 2: Complete implementation with 12 agent roles use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -35,7 +34,12 @@ pub enum AgentStatus { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AgentMetadata { + /// Ephemeral UUID generated at runtime — changes every startup. pub id: String, + /// Deterministic identity keyed on role — survives hot-reloads. + /// Defaults to `role` for data produced before this field existed. + #[serde(default)] + pub stable_id: String, pub role: String, pub name: String, pub version: String, @@ -66,6 +70,7 @@ impl AgentMetadata { let now = Utc::now(); Self { id: Uuid::new_v4().to_string(), + stable_id: role.clone(), role, name, version: "0.1.0".to_string(), @@ -90,6 +95,18 @@ impl AgentMetadata { self } + /// Stable profile identity: `stable_id` when non-empty, `role` otherwise. + /// + /// The fallback handles data deserialized from records produced before + /// `stable_id` was introduced. + pub fn stable_id_or_role(&self) -> &str { + if self.stable_id.is_empty() { + &self.role + } else { + &self.stable_id + } + } + /// Check if agent can accept new tasks pub fn can_accept_task(&self) -> bool { self.status == AgentStatus::Active && self.current_tasks < self.max_concurrent_tasks @@ -283,6 +300,40 @@ impl AgentRegistry { let inner = self.inner.read().expect("Failed to acquire read lock"); inner.agents.len() } + + /// Remove all agents for `role` from the registry and return their + /// ephemeral IDs. The `running_count` entry for the role is also cleared. + /// + /// Called during hot-reload to drain a role before re-spawning executors. + /// Learning profiles (keyed by `stable_id`) are unaffected. + pub fn drain_role(&self, role: &str) -> Vec { + let mut inner = self.inner.write().expect("registry write lock"); + let ids: Vec = inner + .agents + .values() + .filter(|a| a.role == role) + .map(|a| a.id.clone()) + .collect(); + for id in &ids { + inner.agents.remove(id); + } + inner.running_count.remove(role); + ids + } + + /// Return the set of roles currently present in the registry. + pub fn list_roles(&self) -> Vec { + let inner = self.inner.read().expect("registry read lock"); + let mut roles: Vec = inner + .agents + .values() + .map(|a| a.role.clone()) + .collect::>() + .into_iter() + .collect(); + roles.sort(); + roles + } } impl Default for AgentRegistry { @@ -363,6 +414,98 @@ mod tests { assert_eq!(agent.total_tasks_completed, 1); } + #[test] + fn test_stable_id_deterministic() { + let a = AgentMetadata::new( + "developer".to_string(), + "Dev 1".to_string(), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + let b = AgentMetadata::new( + "developer".to_string(), + "Dev 2".to_string(), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + + assert_ne!(a.id, b.id, "ephemeral IDs must differ"); + assert_eq!(a.stable_id, "developer"); + assert_eq!(b.stable_id, "developer"); + assert_eq!(a.stable_id_or_role(), "developer"); + } + + #[test] + fn test_stable_id_or_role_fallback() { + let mut agent = AgentMetadata::new( + "reviewer".to_string(), + "Rev".to_string(), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + agent.stable_id = String::new(); + assert_eq!(agent.stable_id_or_role(), "reviewer"); + } + + #[test] + fn test_drain_role() { + let registry = AgentRegistry::new(5); + + for i in 0..2 { + let agent = AgentMetadata::new( + "developer".to_string(), + format!("Dev {}", i), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + registry.register_agent(agent).unwrap(); + } + let reviewer = AgentMetadata::new( + "reviewer".to_string(), + "Rev".to_string(), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + registry.register_agent(reviewer).unwrap(); + + let drained = registry.drain_role("developer"); + assert_eq!(drained.len(), 2); + assert_eq!(registry.count_by_role("developer"), 0); + assert_eq!(registry.count_by_role("reviewer"), 1); + assert_eq!(registry.total_count(), 1); + } + + #[test] + fn test_list_roles() { + let registry = AgentRegistry::new(5); + + let a = AgentMetadata::new( + "developer".to_string(), + "Dev".to_string(), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + let b = AgentMetadata::new( + "reviewer".to_string(), + "Rev".to_string(), + "claude".to_string(), + "claude-sonnet-4".to_string(), + vec![], + ); + registry.register_agent(a).unwrap(); + registry.register_agent(b).unwrap(); + + let mut roles = registry.list_roles(); + roles.sort(); + assert_eq!(roles, vec!["developer", "reviewer"]); + } + #[test] fn test_get_available_agent() { let registry = AgentRegistry::new(5); diff --git a/crates/vapora-agents/src/runtime/executor.rs b/crates/vapora-agents/src/runtime/executor.rs index b3d6600..709bfb9 100644 --- a/crates/vapora-agents/src/runtime/executor.rs +++ b/crates/vapora-agents/src/runtime/executor.rs @@ -64,10 +64,13 @@ impl AgentExecutor { /// Run the executor loop, processing tasks until the channel closes. pub async fn run(mut self) { info!( - "AgentExecutor started for agent: {}", - self.agent.metadata.id + "AgentExecutor started for agent: {} (stable_id: {})", + self.agent.metadata.id, + self.agent.metadata.stable_id_or_role() ); - let agent_id = self.agent.metadata.id.clone(); + // Use stable_id so KG records survive hot-reloads and are correlated + // across agent restarts for the same role. + let agent_id = self.agent.metadata.stable_id_or_role().to_string(); while let Some(task) = self.task_rx.recv().await { debug!("Received task: {}", task.id); @@ -275,6 +278,7 @@ mod tests { async fn test_executor_creation() { let metadata = AgentMetadata { id: "test-executor".to_string(), + stable_id: "developer".to_string(), role: "developer".to_string(), name: "Test Executor".to_string(), version: "0.1.0".to_string(), @@ -303,6 +307,7 @@ mod tests { fn test_executor_persistence_disabled_by_default() { let metadata = AgentMetadata { id: "test-no-persist".to_string(), + stable_id: "reviewer".to_string(), role: "reviewer".to_string(), name: "Test No Persist".to_string(), version: "0.1.0".to_string(), diff --git a/crates/vapora-agents/src/runtime/state_machine.rs b/crates/vapora-agents/src/runtime/state_machine.rs index 9bf192a..e94b023 100644 --- a/crates/vapora-agents/src/runtime/state_machine.rs +++ b/crates/vapora-agents/src/runtime/state_machine.rs @@ -150,6 +150,7 @@ mod tests { // Create metadata for testing let metadata = AgentMetadata { id: "test-agent".to_string(), + stable_id: "developer".to_string(), role: "developer".to_string(), name: "Test Developer".to_string(), version: "0.1.0".to_string(), @@ -201,6 +202,7 @@ mod tests { fn test_failed_state_transition() { let metadata = AgentMetadata { id: "test-agent".to_string(), + stable_id: "developer".to_string(), role: "developer".to_string(), name: "Test Developer".to_string(), version: "0.1.0".to_string(), diff --git a/crates/vapora-agents/tests/end_to_end_learning_budget_test.rs b/crates/vapora-agents/tests/end_to_end_learning_budget_test.rs index 93d6357..08bb6ab 100644 --- a/crates/vapora-agents/tests/end_to_end_learning_budget_test.rs +++ b/crates/vapora-agents/tests/end_to_end_learning_budget_test.rs @@ -37,9 +37,6 @@ async fn test_end_to_end_learning_with_budget_enforcement() { vec!["coding".to_string(), "documentation".to_string()], ); - let dev_a_id = developer_a.id.clone(); - let dev_b_id = developer_b.id.clone(); - registry.register_agent(developer_a).ok(); registry.register_agent(developer_b).ok(); @@ -62,9 +59,10 @@ async fn test_end_to_end_learning_with_budget_enforcement() { let budget_manager = Arc::new(BudgetManager::new(budgets)); let coordinator = coordinator.with_budget_manager(budget_manager.clone()); - // Simulate historical executions for developer_a (excellent at coding) + // Build a role-level learning profile keyed by stable_id ("developer"). + // All developer agents share one profile since stable_id = role. let now = Utc::now(); - let dev_a_executions: Vec = (0..30) + let dev_executions: Vec = (0..30) .map(|i| ExecutionData { timestamp: now - Duration::days(i), duration_ms: 200 + (i as u64 * 5), @@ -72,46 +70,22 @@ async fn test_end_to_end_learning_with_budget_enforcement() { }) .collect(); - // Simulate historical executions for developer_b (mediocre at coding) - let dev_b_executions: Vec = (0..30) - .map(|i| ExecutionData { - timestamp: now - Duration::days(i), - duration_ms: 300 + (i as u64 * 10), - success: i < 20, // 67% success rate - }) - .collect(); + let dev_expertise = TaskTypeExpertise::from_executions(dev_executions, "coding"); - // Calculate expertise from executions - let dev_a_expertise = TaskTypeExpertise::from_executions(dev_a_executions, "coding"); - let dev_b_expertise = TaskTypeExpertise::from_executions(dev_b_executions, "coding"); + assert!(dev_expertise.success_rate > 0.9); - // Verify expertise calculations - assert!(dev_a_expertise.success_rate > 0.9); - assert!(dev_b_expertise.success_rate > 0.6 && dev_b_expertise.success_rate < 0.7); - assert!(dev_a_expertise.success_rate > dev_b_expertise.success_rate); + let mut role_profile = ProfileAdapter::create_learning_profile("developer".to_string()); + role_profile = + ProfileAdapter::add_task_type_expertise(role_profile, "coding".to_string(), dev_expertise); - // Create learning profiles - let mut profile_a = ProfileAdapter::create_learning_profile(dev_a_id.clone()); - profile_a = - ProfileAdapter::add_task_type_expertise(profile_a, "coding".to_string(), dev_a_expertise); - - let mut profile_b = ProfileAdapter::create_learning_profile(dev_b_id.clone()); - profile_b = - ProfileAdapter::add_task_type_expertise(profile_b, "coding".to_string(), dev_b_expertise); - - // Update coordinator with learning profiles + // Insert under stable_id = "developer" so assign_task can find it. coordinator - .update_learning_profile(&dev_a_id, profile_a.clone()) - .ok(); - coordinator - .update_learning_profile(&dev_b_id, profile_b.clone()) + .update_learning_profile("developer", role_profile) .ok(); - // Verify profiles are stored - let stored_a = coordinator.get_learning_profile(&dev_a_id); - let stored_b = coordinator.get_learning_profile(&dev_b_id); - assert!(stored_a.is_some()); - assert!(stored_b.is_some()); + // Verify profile is stored under stable_id key + let stored = coordinator.get_learning_profile("developer"); + assert!(stored.is_some(), "Role-level profile must be stored"); // Check budget status before task assignment let budget_status = budget_manager.check_budget("developer").await.unwrap(); @@ -119,7 +93,7 @@ async fn test_end_to_end_learning_with_budget_enforcement() { assert!(!budget_status.near_threshold); assert_eq!(budget_status.monthly_remaining_cents, 100000); - // Assign a coding task (should go to developer_a based on learning) + // Assign a coding task — learning profile found → profile-based routing. let task_id = coordinator .assign_task( "developer", @@ -131,28 +105,15 @@ async fn test_end_to_end_learning_with_budget_enforcement() { .await .expect("Should assign task"); - // Verify task was assigned (we can check via registry) + // Verify one developer was assigned the task let all_agents = coordinator.registry().list_all(); - let dev_a_tasks = all_agents - .iter() - .find(|a| a.id == dev_a_id) - .map(|a| a.current_tasks) - .unwrap_or(0); + let total_assigned: u32 = all_agents.iter().map(|a| a.current_tasks).sum(); + assert_eq!(total_assigned, 1, "Exactly one agent must have the task"); - let _dev_b_tasks = all_agents - .iter() - .find(|a| a.id == dev_b_id) - .map(|a| a.current_tasks) - .unwrap_or(0); - - // Developer A (high expertise) should be selected - assert!( - dev_a_tasks > 0, - "Developer A (high expertise) should have been assigned the task" - ); - - // Simulate task completion - coordinator.complete_task(&task_id, &dev_a_id).await.ok(); + // Complete the task using the agent that received it + if let Some(assigned) = all_agents.iter().find(|a| a.current_tasks > 0) { + coordinator.complete_task(&task_id, &assigned.id).await.ok(); + } // Verify budget status is still within limits let budget_status = budget_manager.check_budget("developer").await.unwrap(); @@ -160,7 +121,7 @@ async fn test_end_to_end_learning_with_budget_enforcement() { // Simulate multiple tasks to test cumulative budget tracking for i in 0..5 { - let task = coordinator + if let Ok(tid) = coordinator .assign_task( "developer", format!("Task {}", i), @@ -168,15 +129,11 @@ async fn test_end_to_end_learning_with_budget_enforcement() { "Context".to_string(), 1, ) - .await; - - if task.is_ok() { + .await + { let agents = coordinator.registry().list_all(); - if let Some(dev_a) = agents.iter().find(|a| a.id == dev_a_id) { - coordinator - .complete_task(&format!("task-{}", i), &dev_a.id) - .await - .ok(); + if let Some(assigned) = agents.iter().find(|a| a.current_tasks > 0) { + coordinator.complete_task(&tid, &assigned.id).await.ok(); } } } @@ -188,9 +145,12 @@ async fn test_end_to_end_learning_with_budget_enforcement() { "Should not exceed monthly budget" ); - // Verify learning profiles are still intact + // Verify role-level profile is intact let all_profiles = coordinator.get_all_learning_profiles(); - assert_eq!(all_profiles.len(), 2, "Both profiles should be stored"); + assert!( + all_profiles.contains_key("developer"), + "Role-level profile must survive task processing" + ); } /// Test that budget enforcement doesn't break learning-based selection @@ -214,9 +174,6 @@ async fn test_learning_selection_with_budget_constraints() { vec!["coding".to_string()], ); - let expert_id = agent_expert.id.clone(); - let novice_id = agent_novice.id.clone(); - registry.register_agent(agent_expert).ok(); registry.register_agent(agent_novice).ok(); @@ -238,9 +195,9 @@ async fn test_learning_selection_with_budget_constraints() { let budget_manager = Arc::new(BudgetManager::new(budgets)); let coordinator = coordinator.with_budget_manager(budget_manager.clone()); - // Create expertise profiles + // Build a role-level learning profile for "developer" (stable_id = role). let now = Utc::now(); - let expert_execs: Vec = (0..20) + let role_execs: Vec = (0..20) .map(|i| ExecutionData { timestamp: now - Duration::days(i), duration_ms: 100, @@ -248,36 +205,14 @@ async fn test_learning_selection_with_budget_constraints() { }) .collect(); - let novice_execs: Vec = (0..20) - .map(|i| ExecutionData { - timestamp: now - Duration::days(i), - duration_ms: 100, - success: i < 12, // 60% success - }) - .collect(); + let role_expertise = TaskTypeExpertise::from_executions(role_execs, "coding"); - let expert_expertise = TaskTypeExpertise::from_executions(expert_execs, "coding"); - let novice_expertise = TaskTypeExpertise::from_executions(novice_execs, "coding"); - - let mut expert_profile = ProfileAdapter::create_learning_profile(expert_id.clone()); - expert_profile = ProfileAdapter::add_task_type_expertise( - expert_profile, - "coding".to_string(), - expert_expertise, - ); - - let mut novice_profile = ProfileAdapter::create_learning_profile(novice_id.clone()); - novice_profile = ProfileAdapter::add_task_type_expertise( - novice_profile, - "coding".to_string(), - novice_expertise, - ); + let mut role_profile = ProfileAdapter::create_learning_profile("developer".to_string()); + role_profile = + ProfileAdapter::add_task_type_expertise(role_profile, "coding".to_string(), role_expertise); coordinator - .update_learning_profile(&expert_id, expert_profile) - .ok(); - coordinator - .update_learning_profile(&novice_id, novice_profile) + .update_learning_profile("developer", role_profile) .ok(); // Verify budget status @@ -287,11 +222,10 @@ async fn test_learning_selection_with_budget_constraints() { "Initial budget should be healthy" ); - // Assign multiple tasks - expert should be consistently selected - let mut expert_count = 0; - #[allow(clippy::excessive_nesting)] + // Assign multiple tasks — profile-based scoring should pick some developer. + let mut assigned_count = 0; for i in 0..3 { - if let Ok(_task_id) = coordinator + if let Ok(tid) = coordinator .assign_task( "developer", format!("Coding Task {}", i), @@ -301,19 +235,17 @@ async fn test_learning_selection_with_budget_constraints() { ) .await { + assigned_count += 1; let agents = coordinator.registry().list_all(); - if let Some(expert) = agents.iter().find(|a| a.id == expert_id) { - if expert.current_tasks > 0 { - expert_count += 1; - } + if let Some(assigned) = agents.iter().find(|a| a.current_tasks > 0) { + coordinator.complete_task(&tid, &assigned.id).await.ok(); } } } - // Expert should have been selected more often assert!( - expert_count > 0, - "Expert should have been selected despite budget constraints" + assigned_count > 0, + "Tasks should be assigned despite budget constraints" ); } @@ -330,7 +262,6 @@ async fn test_learning_profile_improvement_with_budget_tracking() { vec!["coding".to_string()], ); - let agent_id = agent.id.clone(); registry.register_agent(agent).ok(); let coordinator = AgentCoordinator::with_registry(registry); @@ -351,7 +282,7 @@ async fn test_learning_profile_improvement_with_budget_tracking() { let budget_manager = Arc::new(BudgetManager::new(budgets)); let coordinator = coordinator.with_budget_manager(budget_manager.clone()); - // Initial profile: mediocre performance + // Initial profile: mediocre performance — keyed by stable_id = "developer". let now = Utc::now(); let initial_execs: Vec = (0..10) .map(|i| ExecutionData { @@ -364,7 +295,7 @@ async fn test_learning_profile_improvement_with_budget_tracking() { let mut initial_expertise = TaskTypeExpertise::from_executions(initial_execs, "coding"); assert!((initial_expertise.success_rate - 0.5).abs() < 0.01); - let mut profile = ProfileAdapter::create_learning_profile(agent_id.clone()); + let mut profile = ProfileAdapter::create_learning_profile("developer".to_string()); profile = ProfileAdapter::add_task_type_expertise( profile, "coding".to_string(), @@ -372,17 +303,17 @@ async fn test_learning_profile_improvement_with_budget_tracking() { ); coordinator - .update_learning_profile(&agent_id, profile.clone()) + .update_learning_profile("developer", profile) .ok(); // Check initial profile - let stored_profile = coordinator.get_learning_profile(&agent_id).unwrap(); + let stored_profile = coordinator.get_learning_profile("developer").unwrap(); assert_eq!( stored_profile.get_task_type_score("coding"), initial_expertise.success_rate ); - // Simulate improvement: add successful recent executions + // Simulate improvement: add successful recent execution let new_exec = ExecutionData { timestamp: now, duration_ms: 120, @@ -396,7 +327,7 @@ async fn test_learning_profile_improvement_with_budget_tracking() { ); // Update profile with improved expertise - let mut updated_profile = ProfileAdapter::create_learning_profile(agent_id.clone()); + let mut updated_profile = ProfileAdapter::create_learning_profile("developer".to_string()); updated_profile = ProfileAdapter::add_task_type_expertise( updated_profile, "coding".to_string(), @@ -404,11 +335,11 @@ async fn test_learning_profile_improvement_with_budget_tracking() { ); coordinator - .update_learning_profile(&agent_id, updated_profile) + .update_learning_profile("developer", updated_profile) .ok(); // Verify improvement is reflected - let final_profile = coordinator.get_learning_profile(&agent_id).unwrap(); + let final_profile = coordinator.get_learning_profile("developer").unwrap(); let final_score = final_profile.get_task_type_score("coding"); assert!(final_score > 0.5, "Final score should reflect improvement"); diff --git a/crates/vapora-agents/tests/learning_integration_test.rs b/crates/vapora-agents/tests/learning_integration_test.rs index 9d907d7..e8f6301 100644 --- a/crates/vapora-agents/tests/learning_integration_test.rs +++ b/crates/vapora-agents/tests/learning_integration_test.rs @@ -320,9 +320,10 @@ async fn test_coordinator_assignment_with_learning_scores() { // Create coordinator let coordinator = AgentCoordinator::with_registry(registry); - // Create learning profiles: Agent A excels at coding, Agent B is mediocre + // Build a role-level learning profile for "developer" (stable_id = role). + // Both agents share this profile since they share a role. let now = Utc::now(); - let agent_a_executions: Vec = (0..20) + let executions: Vec = (0..20) .map(|i| ExecutionData { timestamp: now - Duration::days(i), duration_ms: 100, @@ -330,40 +331,19 @@ async fn test_coordinator_assignment_with_learning_scores() { }) .collect(); - let agent_b_executions: Vec = (0..20) - .map(|i| ExecutionData { - timestamp: now - Duration::days(i), - duration_ms: 100, - success: i < 14, // 70% success rate - }) - .collect(); + let expertise = TaskTypeExpertise::from_executions(executions, "coding"); - let agent_a_expertise = TaskTypeExpertise::from_executions(agent_a_executions, "coding"); - let agent_b_expertise = TaskTypeExpertise::from_executions(agent_b_executions, "coding"); + // Profiles are keyed by stable_id ("developer") so they survive hot-reloads. + let mut role_profile = ProfileAdapter::create_learning_profile("developer".to_string()); + role_profile = + ProfileAdapter::add_task_type_expertise(role_profile, "coding".to_string(), expertise); - let mut agent_a_profile = ProfileAdapter::create_learning_profile(agent_a_id.clone()); - agent_a_profile = ProfileAdapter::add_task_type_expertise( - agent_a_profile, - "coding".to_string(), - agent_a_expertise, - ); - - let mut agent_b_profile = ProfileAdapter::create_learning_profile(agent_b_id.clone()); - agent_b_profile = ProfileAdapter::add_task_type_expertise( - agent_b_profile, - "coding".to_string(), - agent_b_expertise, - ); - - // Update coordinator with learning profiles coordinator - .update_learning_profile(&agent_a_id, agent_a_profile) - .ok(); - coordinator - .update_learning_profile(&agent_b_id, agent_b_profile) + .update_learning_profile("developer", role_profile) .ok(); - // Assign a coding task + // Assign a coding task — profile-based scoring will be used since a + // "developer" profile exists. let _task_id = coordinator .assign_task( "developer", @@ -375,37 +355,31 @@ async fn test_coordinator_assignment_with_learning_scores() { .await .expect("Should assign task"); - // Get the registry to verify which agent was selected + // Verify one developer was assigned the task let registry = coordinator.registry(); - let agent_a_tasks = registry + let total_tasks: u32 = registry .list_all() .iter() - .find(|a| a.id == agent_a_id) + .filter(|a| a.role == "developer") .map(|a| a.current_tasks) - .unwrap_or(0); + .sum(); - let agent_b_tasks = registry - .list_all() - .iter() - .find(|a| a.id == agent_b_id) - .map(|a| a.current_tasks) - .unwrap_or(0); + assert_eq!(total_tasks, 1, "Exactly one developer should have the task"); - // Agent A (higher expertise in coding) should have been selected - assert!( - agent_a_tasks > 0, - "Agent A (coding specialist) should have 1+ tasks" - ); - assert_eq!(agent_b_tasks, 0, "Agent B (generalist) should have 0 tasks"); - - // Verify learning profiles are stored + // Verify the profile is stored under the stable_id key let stored_profiles = coordinator.get_all_learning_profiles(); assert!( - stored_profiles.contains_key(&agent_a_id), - "Agent A profile should be stored" + stored_profiles.contains_key("developer"), + "Role-level 'developer' profile must be stored" + ); + + // The per-instance IDs should NOT be the profile keys after the refactor + assert!( + !stored_profiles.contains_key(&agent_a_id), + "Ephemeral agent IDs must not be profile keys" ); assert!( - stored_profiles.contains_key(&agent_b_id), - "Agent B profile should be stored" + !stored_profiles.contains_key(&agent_b_id), + "Ephemeral agent IDs must not be profile keys" ); } diff --git a/crates/vapora-backend/src/config.rs b/crates/vapora-backend/src/config.rs index d0c4ca9..77960cc 100644 --- a/crates/vapora-backend/src/config.rs +++ b/crates/vapora-backend/src/config.rs @@ -104,23 +104,52 @@ pub struct MetricsConfig { } impl Config { - /// Load configuration from a TOML file with environment variable - /// interpolation + /// Load configuration from a TOML or NCL file with environment variable + /// interpolation. When the path has a `.ncl` extension, `nickel export + /// --format json` is invoked and the resulting JSON is parsed. Otherwise + /// the file is read and parsed as TOML (legacy / test compatibility). pub fn load>(path: P) -> Result { let path = path.as_ref(); - // Read file content - let content = fs::read_to_string(path).map_err(|e| { - VaporaError::ConfigError(format!("Failed to read config file {:?}: {}", path, e)) - })?; + let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") { + let out = std::process::Command::new("nickel") + .args(["export", "--format", "json"]) + .arg(path) + .output() + .map_err(|e| { + VaporaError::ConfigError(format!( + "Failed to invoke nickel for {:?}: {}", + path, e + )) + })?; + if !out.status.success() { + let stderr = String::from_utf8_lossy(&out.stderr); + return Err(VaporaError::ConfigError(format!( + "nickel export failed for {:?}: {}", + path, stderr + ))); + } + let json = String::from_utf8(out.stdout).map_err(|e| { + VaporaError::ConfigError(format!("nickel output is not valid UTF-8: {}", e)) + })?; + (json, true) + } else { + let content = fs::read_to_string(path).map_err(|e| { + VaporaError::ConfigError(format!("Failed to read config file {:?}: {}", path, e)) + })?; + (content, false) + }; - // Interpolate environment variables - let interpolated = Self::interpolate_env_vars(&content)?; + let interpolated = Self::interpolate_env_vars(&raw)?; - // Parse TOML - let config: Config = toml::from_str(&interpolated)?; + let config: Config = if is_json { + serde_json::from_str(&interpolated).map_err(|e| { + VaporaError::ConfigError(format!("Failed to parse config JSON: {}", e)) + })? + } else { + toml::from_str(&interpolated)? + }; - // Validate configuration config.validate()?; Ok(config) diff --git a/crates/vapora-backend/src/main.rs b/crates/vapora-backend/src/main.rs index 65859b8..8cc5855 100644 --- a/crates/vapora-backend/src/main.rs +++ b/crates/vapora-backend/src/main.rs @@ -48,7 +48,7 @@ struct Args { #[arg( short, long, - default_value = "config/vapora.toml", + default_value = "config/config.ncl", env = "VAPORA_CONFIG" )] config: String, diff --git a/crates/vapora-llm-router/src/budget.rs b/crates/vapora-llm-router/src/budget.rs index 1cb0cc8..b8fdb08 100644 --- a/crates/vapora-llm-router/src/budget.rs +++ b/crates/vapora-llm-router/src/budget.rs @@ -7,14 +7,16 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::sync::RwLock; +use crate::config::interpolate_env_vars; + /// Budget configuration errors #[derive(Debug, Error)] pub enum BudgetConfigError { #[error("Failed to read budget config file: {0}")] ReadError(#[from] std::io::Error), - #[error("Failed to parse TOML: {0}")] - ParseError(#[from] toml::de::Error), + #[error("Failed to parse config: {0}")] + ParseJson(#[from] serde_json::Error), #[error("Invalid budget configuration: {0}")] ValidationError(String), @@ -84,24 +86,65 @@ pub struct BudgetConfig { } impl BudgetConfig { - /// Load budget configuration from TOML file + /// Load budget configuration from a TOML or NCL file. When the path has a + /// `.ncl` extension, `nickel export --format json` is invoked and the + /// resulting JSON is parsed. Otherwise the file is read and parsed as TOML. pub fn load>(path: P) -> Result { - let content = std::fs::read_to_string(path)?; - let config: Self = toml::from_str(&content)?; + let path = path.as_ref(); + + let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") { + let out = std::process::Command::new("nickel") + .args(["export", "--format", "json"]) + .arg(path) + .output() + .map_err(|e| { + BudgetConfigError::ReadError(std::io::Error::other(format!( + "Failed to invoke nickel for {:?}: {}", + path, e + ))) + })?; + if !out.status.success() { + let stderr = String::from_utf8_lossy(&out.stderr); + return Err(BudgetConfigError::ReadError(std::io::Error::other( + format!("nickel export failed for {:?}: {}", path, stderr), + ))); + } + let json = String::from_utf8(out.stdout).map_err(|e| { + BudgetConfigError::ReadError(std::io::Error::other(format!( + "nickel output is not valid UTF-8: {}", + e + ))) + })?; + (json, true) + } else { + let content = std::fs::read_to_string(path)?; + (content, false) + }; + + let interpolated = interpolate_env_vars(&raw); + + let config: Self = if is_json { + serde_json::from_str(&interpolated)? + } else { + toml::from_str(&interpolated).map_err(|e| { + BudgetConfigError::ReadError(std::io::Error::other(format!( + "Failed to parse TOML: {}", + e + ))) + })? + }; + config.validate()?; Ok(config) } - /// Load from TOML with default fallback if file doesn't exist + /// Load with default fallback if file doesn't exist pub fn load_or_default>(path: P) -> Result { match Self::load(&path) { Ok(config) => Ok(config), - Err(BudgetConfigError::ReadError(_)) => { - // File doesn't exist, use defaults - Ok(BudgetConfig { - budgets: HashMap::new(), - }) - } + Err(BudgetConfigError::ReadError(_)) => Ok(BudgetConfig { + budgets: HashMap::new(), + }), Err(e) => Err(e), } } diff --git a/crates/vapora-llm-router/src/config.rs b/crates/vapora-llm-router/src/config.rs index d96cdca..a7e3073 100644 --- a/crates/vapora-llm-router/src/config.rs +++ b/crates/vapora-llm-router/src/config.rs @@ -12,8 +12,8 @@ pub enum ConfigError { #[error("Failed to read config file: {0}")] ReadError(#[from] std::io::Error), - #[error("Failed to parse TOML: {0}")] - ParseError(#[from] toml::de::Error), + #[error("Failed to parse config: {0}")] + ParseJson(#[from] serde_json::Error), #[error("Invalid configuration: {0}")] ValidationError(String), @@ -74,21 +74,66 @@ pub struct RoutingRule { } impl LLMRouterConfig { - /// Load configuration from TOML file + /// Load configuration from a TOML or NCL file. When the path has a `.ncl` + /// extension, `nickel export --format json` is invoked and the resulting + /// JSON is parsed with full `${VAR}` interpolation applied pre-parse. + /// Otherwise the file is read and parsed as TOML (legacy compatibility). pub fn load>(path: P) -> Result { - let content = std::fs::read_to_string(path)?; - let mut config: Self = toml::from_str(&content)?; + let path = path.as_ref(); + + let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") { + let out = std::process::Command::new("nickel") + .args(["export", "--format", "json"]) + .arg(path) + .output() + .map_err(|e| { + ConfigError::ReadError(std::io::Error::other(format!( + "Failed to invoke nickel for {:?}: {}", + path, e + ))) + })?; + if !out.status.success() { + let stderr = String::from_utf8_lossy(&out.stderr); + return Err(ConfigError::ReadError(std::io::Error::other(format!( + "nickel export failed for {:?}: {}", + path, stderr + )))); + } + let json = String::from_utf8(out.stdout).map_err(|e| { + ConfigError::ReadError(std::io::Error::other(format!( + "nickel output is not valid UTF-8: {}", + e + ))) + })?; + (json, true) + } else { + let content = std::fs::read_to_string(path)?; + (content, false) + }; + + let interpolated = interpolate_env_vars(&raw); + + let config: Self = if is_json { + serde_json::from_str(&interpolated)? + } else { + let mut c: Self = toml::from_str(&interpolated).map_err(|e| { + ConfigError::ReadError(std::io::Error::other(format!( + "Failed to parse TOML: {}", + e + ))) + })?; + // Legacy TOML path: expand env vars in specific fields + c.expand_env_vars(); + c + }; - // Expand environment variables in API keys and URLs - config.expand_env_vars(); config.validate()?; - Ok(config) } - /// Expand environment variables in configuration + /// Expand environment variables in API key and URL fields (TOML path only). fn expand_env_vars(&mut self) { - for (_, provider) in self.providers.iter_mut() { + for provider in self.providers.values_mut() { if let Some(ref api_key) = provider.api_key { provider.api_key = Some(expand_env_var(api_key)); } @@ -136,7 +181,36 @@ impl LLMRouterConfig { } } -/// Expand environment variables in format ${VAR} or ${VAR:-default} +/// Expand every `${VAR}` / `${VAR:-default}` reference in `content`. +/// Unresolved vars without a default are replaced with an empty string. +pub(crate) fn interpolate_env_vars(content: &str) -> String { + let mut result = String::with_capacity(content.len()); + let mut remaining = content; + while let Some(start) = remaining.find("${") { + result.push_str(&remaining[..start]); + let after_open = &remaining[start + 2..]; + if let Some(close) = after_open.find('}') { + let var_expr = &after_open[..close]; + let value = if let Some(sep) = var_expr.find(":-") { + let var_name = &var_expr[..sep]; + let default_val = &var_expr[sep + 2..]; + std::env::var(var_name).unwrap_or_else(|_| default_val.to_string()) + } else { + std::env::var(var_expr).unwrap_or_default() + }; + result.push_str(&value); + remaining = &after_open[close + 1..]; + } else { + result.push_str("${"); + remaining = after_open; + } + } + result.push_str(remaining); + result +} + +/// Expand environment variables in format ${VAR} or ${VAR:-default} (single +/// token). fn expand_env_var(input: &str) -> String { if !input.starts_with("${") || !input.ends_with('}') { return input.to_string(); diff --git a/crates/vapora-workflow-engine/src/config.rs b/crates/vapora-workflow-engine/src/config.rs index dc40099..af00179 100644 --- a/crates/vapora-workflow-engine/src/config.rs +++ b/crates/vapora-workflow-engine/src/config.rs @@ -101,11 +101,53 @@ pub struct StageConfig { impl WorkflowsConfig { pub fn load>(path: P) -> Result { - let content = std::fs::read_to_string(path).map_err(ConfigError::IoError)?; - let config: WorkflowsConfig = toml::from_str(&content).map_err(ConfigError::Parse)?; + let path = path.as_ref(); + + let (raw, is_json) = if path.extension().and_then(|e| e.to_str()) == Some("ncl") { + let out = std::process::Command::new("nickel") + .args(["export", "--format", "json"]) + .arg(path) + .output() + .map_err(|e| { + ConfigError::IoError(std::io::Error::other(format!( + "Failed to invoke nickel for {:?}: {}", + path, e + ))) + })?; + if !out.status.success() { + let stderr = String::from_utf8_lossy(&out.stderr); + return Err(ConfigError::IoError(std::io::Error::other(format!( + "nickel export failed for {:?}: {}", + path, stderr + ))) + .into()); + } + let json = String::from_utf8(out.stdout).map_err(|e| { + ConfigError::IoError(std::io::Error::other(format!( + "nickel output is not valid UTF-8: {}", + e + ))) + })?; + (json, true) + } else { + let content = std::fs::read_to_string(path).map_err(ConfigError::IoError)?; + (content, false) + }; + + let interpolated = interpolate_env_vars(&raw); + + let config: WorkflowsConfig = if is_json { + serde_json::from_str(&interpolated).map_err(ConfigError::ParseJson)? + } else { + toml::from_str(&interpolated).map_err(|e| { + ConfigError::IoError(std::io::Error::other(format!( + "Failed to parse TOML: {}", + e + ))) + })? + }; config.validate()?; - Ok(config) } @@ -149,6 +191,34 @@ impl WorkflowsConfig { } } +/// Expand every `${VAR}` / `${VAR:-default}` reference in `content`. +/// Unresolved vars without a default are replaced with an empty string. +fn interpolate_env_vars(content: &str) -> String { + let mut result = String::with_capacity(content.len()); + let mut remaining = content; + while let Some(start) = remaining.find("${") { + result.push_str(&remaining[..start]); + let after_open = &remaining[start + 2..]; + if let Some(close) = after_open.find('}') { + let var_expr = &after_open[..close]; + let value = if let Some(sep) = var_expr.find(":-") { + let var_name = &var_expr[..sep]; + let default_val = &var_expr[sep + 2..]; + std::env::var(var_name).unwrap_or_else(|_| default_val.to_string()) + } else { + std::env::var(var_expr).unwrap_or_default() + }; + result.push_str(&value); + remaining = &after_open[close + 1..]; + } else { + result.push_str("${"); + remaining = after_open; + } + } + result.push_str(remaining); + result +} + fn validate_schedule_config( workflow_name: &str, schedule: &Option, diff --git a/crates/vapora-workflow-engine/src/error.rs b/crates/vapora-workflow-engine/src/error.rs index ce1b458..cfde6da 100644 --- a/crates/vapora-workflow-engine/src/error.rs +++ b/crates/vapora-workflow-engine/src/error.rs @@ -68,8 +68,8 @@ pub enum ConfigError { #[error("Failed to read config file: {0}")] IoError(#[from] std::io::Error), - #[error("Failed to parse TOML: {0}")] - Parse(#[from] toml::de::Error), + #[error("Failed to parse config: {0}")] + ParseJson(#[from] serde_json::Error), #[error("Invalid configuration: {0}")] Invalid(String), diff --git a/docs/adrs/0040-agent-hot-reload-stable-identity.md b/docs/adrs/0040-agent-hot-reload-stable-identity.md new file mode 100644 index 0000000..317e910 --- /dev/null +++ b/docs/adrs/0040-agent-hot-reload-stable-identity.md @@ -0,0 +1,222 @@ +# ADR-0040: Agent Hot-Reload — Stable Identity and Zero-Downtime Config Reload + +**Status**: Implemented +**Date**: 2026-03-02 +**Deciders**: VAPORA Team +**Technical Story**: `AgentMetadata::id` was a `Uuid::new_v4()` generated at startup. `learning_profiles` in `AgentCoordinator` and execution records in `KGPersistence` used this UUID as the key. Every process restart or SIGHUP reload rotated all UUIDs, orphaning accumulated expertise profiles and resetting the learning system to zero. + +--- + +## Decision + +Introduce `stable_id: String` on `AgentMetadata`, computed as `role.clone()` at construction time. Switch all learning profile keys and KG execution records from the ephemeral `id` (UUID) to `stable_id`. Add hot-reload mechanics — SIGHUP handler and `POST /reload` endpoint — that drain and re-spawn executors while leaving `learning_profiles` untouched. + +--- + +## Context + +### The Identity Problem + +Before this change, every agent had two implicit identities that were conflated into one field: + +| Identity | Purpose | Lifecycle | +|----------|---------|-----------| +| Instance ID (`id`) | Sender handle in `executor_channels`, registry key | Ephemeral — dies with the process or on reload | +| Profile ID | Key for `learning_profiles` and KG records | Must survive restarts to preserve learning | + +Using `Uuid::new_v4()` for both meant any reload (SIGHUP, restart, crash recovery) threw away all accumulated expertise. An agent that had processed 500 coding tasks and learned optimal patterns would start from zero on the next deploy. + +### Why `role` as stable_id + +VAPORA's architecture already partitions learning at the role level: `AgentScoringService::rank_agents` accepts `Vec<(agent_id, Option)>` where multiple agents of the same role compete for a task. The profile that matters for selection is role-level expertise (how well the "developer" role handles "coding" tasks), not per-instance expertise. Using `role` as the stable key: + +- Is deterministic across restarts +- Aggregates learning across all instances of the same role +- Requires no additional persistence (no UUID→role mapping table) +- Degrades gracefully: legacy-deserialized records with empty `stable_id` fall back to `role` via `stable_id_or_role()` + +--- + +## Implementation + +### `AgentMetadata` (registry.rs) + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentMetadata { + pub id: String, // Uuid::new_v4() — ephemeral, per-instance + #[serde(default)] + pub stable_id: String, // role.clone() — persistent across restarts + pub role: String, + // ... +} + +impl AgentMetadata { + pub fn new(role: String, ...) -> Self { + Self { + id: Uuid::new_v4().to_string(), + stable_id: role.clone(), // set before role is moved + role, + // ... + } + } + + pub fn stable_id_or_role(&self) -> &str { + if self.stable_id.is_empty() { &self.role } else { &self.stable_id } + } +} +``` + +### `AgentRegistry::drain_role` (registry.rs) + +Removes all agents for a role from the `agents` map and clears `running_count`. This allows immediate re-registration after drain without hitting `MaxAgentsReached`. + +```rust +pub fn drain_role(&self, role: &str) -> Vec { + let mut inner = self.inner.write().expect("registry write lock"); + let ids: Vec = inner.agents.values() + .filter(|a| a.role == role) + .map(|a| a.id.clone()) + .collect(); + for id in &ids { inner.agents.remove(id); } + inner.running_count.remove(role); + ids +} +``` + +### `AgentCoordinator::drain_role` (coordinator.rs) + +Delegates to `registry.drain_role`, then removes the corresponding `Sender` entries from `executor_channels`. Dropping the `Sender` closes the mpsc channel; the executor's `while let Some(task) = rx.recv().await` loop exits after draining any buffered messages — no explicit shutdown signal required. + +```rust +pub fn drain_role(&self, role: &str) -> Vec { + let ids = self.registry.drain_role(role); + for id in &ids { + self.executor_channels.remove(id); + } + ids +} +``` + +`learning_profiles` is keyed by `stable_id` (= role) and is **not** touched during drain. New executor instances spawned after reload inherit accumulated expertise immediately. + +### Profile lookup (coordinator.rs) + +```rust +// assign_task — before: +.map(|a| (a.id.clone(), profiles.get(&a.id).cloned())) + +// assign_task — after: +.map(|a| { + let key = a.stable_id_or_role(); + (a.id.clone(), profiles.get(key).cloned()) +}) +``` + +### Hot-reload entry points (server.rs) + +Two entry points invoke the same `reload_agents` function: + +```rust +// SIGHUP +while sighup.recv().await.is_some() { + handle_sighup_reload(&state, ®istry).await; +} + +// REST +.route("/reload", axum::routing::post(reload_handler)) +``` + +`reload_agents` sequence: + +1. `registry.list_roles()` → drain each role via `coordinator.drain_role` +2. Re-spawn capability executors from `CapabilityRegistry` +3. Re-spawn config agents not covered by capabilities +4. Return `registry.total_count()` + +--- + +## Availability Window + +`reload_agents` drains all roles before re-spawning. During the window between the last drain and the first successful `register_agent`, `assign_task` for those roles returns `CoordinatorError::NoAvailableAgent`. This window is typically sub-millisecond on the same thread, but callers must handle this error and retry. + +This is a deliberate trade-off: atomic swap-in of new executors would require a blue-green registry pattern, adding significant complexity for a latency window that is orders of magnitude shorter than any typical LLM call (which takes 500ms–30s). + +--- + +## Out of Scope + +- **BudgetManager reload**: budget limit changes require process restart. The `BudgetManager` is constructed once from config in `main()` and stored in `AppState`. Adding reload support requires either a `RwLock` wrapper or rebuilding the manager and swapping it in `AppState` under a lock. +- **LLMRouter reload**: provider API key changes require restart for the same reason. + +--- + +## Alternatives Considered + +### UUID + external persistence of UUID→role mapping + +Would preserve per-instance identity. Rejected: adds a SurrealDB table (UUID→role) that must be kept in sync across restarts, adds a lookup on every `assign_task`, and provides no additional value since role-level profiles already capture collective expertise. + +### Blue-green registry swap + +Two `AgentRegistry` instances: old one drains while new one accepts assignments. Rejected: requires `AgentCoordinator` to hold `Arc>>` and all call sites to acquire the inner lock on every call. Complexity disproportionate to the gain (sub-millisecond → zero gap). + +### Versioned stable_id (e.g., `developer-v2`) + +For breaking role renames. Rejected: out of scope; role renames already require explicit operator action. + +--- + +## Trade-offs + +**Pros**: + +- Learning profiles survive indefinitely across restarts and hot-reloads +- SIGHUP and `POST /reload` provide two operator-friendly reload paths +- `stable_id_or_role()` fallback ensures backward compatibility with persisted data that predates this change +- `drain_role` cleans up cleanly: no stale executor channels, no MaxAgentsReached on re-register + +**Cons**: + +- All agents of the same role share one learning profile. Per-instance specialization (e.g., "this specific GPU node is faster at inference") is not representable. Acceptable: VAPORA's role model deliberately treats same-role agents as interchangeable for task routing purposes. +- Brief `NoAvailableAgent` window during reload (see Availability Window above). +- BudgetManager and LLMRouter not reloadable without restart. + +--- + +## Verification + +```bash +cargo test -p vapora-agents test_stable_id_deterministic +cargo test -p vapora-agents test_drain_role +cargo test -p vapora-agents test_profile_survives_role_drain +cargo test -p vapora-agents test_list_roles + +# Hot-reload via signal +kill -HUP $(pgrep vapora-agents) + +# Hot-reload via REST +curl -s -X POST http://localhost:9000/reload | jq . +# Expected: {"reloaded": true, "agents": N} + +cargo clippy -p vapora-agents -- -D warnings +``` + +--- + +## Consequences + +- `AgentMetadata` gains a new field `stable_id` with `#[serde(default)]`. Existing serialized records deserialize cleanly; `stable_id_or_role()` falls back to `role`. +- KG execution records (the `agent_id` field in SurrealDB) now store `stable_id` (= role) instead of a UUID. Existing records with UUID keys remain in the database but are no longer updated; they can be cleaned up with a migration if needed. +- ADR-0014 (Learning Profiles) and ADR-0015 (Budget Enforcement) are unaffected at the API level; only the internal key used to look up profiles changes. + +--- + +## References + +- [ADR-0014 — Learning Profiles](./0014-learning-profiles.md) +- [ADR-0015 — Budget Enforcement](./0015-budget-enforcement.md) +- [ADR-0026 — Arc-Based Shared State](./0026-shared-state.md) +- `crates/vapora-agents/src/registry.rs` — `AgentMetadata`, `drain_role`, `list_roles` +- `crates/vapora-agents/src/coordinator.rs` — `drain_role`, `registry_arc`, profile lookup +- `crates/vapora-agents/src/bin/server.rs` — `reload_agents`, SIGHUP handler, `/reload` endpoint diff --git a/docs/adrs/README.md b/docs/adrs/README.md index a44109f..b979eea 100644 --- a/docs/adrs/README.md +++ b/docs/adrs/README.md @@ -2,8 +2,8 @@ Documentación de las decisiones arquitectónicas clave del proyecto VAPORA. -**Status**: Complete (39 ADRs documented) -**Last Updated**: 2026-02-26 +**Status**: Complete (40 ADRs documented) +**Last Updated**: 2026-03-02 **Format**: Custom VAPORA (Decision, Rationale, Alternatives, Trade-offs, Implementation, Verification, Consequences) --- @@ -37,7 +37,7 @@ Decisiones fundamentales sobre el stack tecnológico y estructura base del proye --- -## 🔄 Agent Coordination & Messaging (5 ADRs) +## 🔄 Agent Coordination & Messaging (6 ADRs) Decisiones sobre coordinación entre agentes y comunicación de mensajes. @@ -48,6 +48,7 @@ Decisiones sobre coordinación entre agentes y comunicación de mensajes. | [030](./0030-a2a-protocol-implementation.md) | A2A Protocol Implementation | Axum JSON-RPC 2.0 server + resilient client con exponential backoff | ✅ Implemented | | [031](./0031-kubernetes-deployment-kagent.md) | Kubernetes Deployment Strategy para kagent | Kustomize + StatefulSet con overlays dev/prod | ✅ Accepted | | [032](./0032-a2a-error-handling-json-rpc.md) | A2A Error Handling y JSON-RPC 2.0 Compliance | Two-layer: thiserror domain errors + JSON-RPC 2.0 protocol conversion | ✅ Implemented | +| [040](./0040-agent-hot-reload-stable-identity.md) | Agent Hot-Reload — Stable Identity and Zero-Downtime Config Reload | `stable_id = role` as persistent profile key; SIGHUP + `POST /reload` drain-and-respawn without learning loss | ✅ Implemented | --- @@ -126,6 +127,7 @@ Patrones de desarrollo y arquitectura utilizados en todo el codebase. - **A2A Protocol**: JSON-RPC 2.0 over HTTP enables interoperability with Google kagent and other A2A-compliant agents - **kagent Kubernetes Deployment**: Kustomize StatefulSet with stable pod identities for predictable A2A endpoint addressing - **A2A Error Handling**: Two-layer strategy (domain `thiserror` + JSON-RPC 2.0 protocol conversion) specializes ADR-0022 for A2A +- **Agent Hot-Reload**: `stable_id = role` decouples ephemeral instance identity from persistent profile key; SIGHUP and `POST /reload` drain executors while preserving all learning profiles ### ☁️ Infrastructure & Security @@ -270,7 +272,7 @@ Each ADR follows the Custom VAPORA format: ## Statistics -- **Total ADRs**: 38 +- **Total ADRs**: 40 - **Core Architecture**: 13 (41%) - **Agent Coordination**: 5 (16%) - **Infrastructure**: 4 (12%) @@ -291,4 +293,4 @@ Each ADR follows the Custom VAPORA format: **Generated**: January 12, 2026 **Status**: Production-Ready -**Last Reviewed**: 2026-02-17 +**Last Reviewed**: 2026-03-02 diff --git a/nickel/agents/contracts.ncl b/nickel/agents/contracts.ncl new file mode 100644 index 0000000..742ecb1 --- /dev/null +++ b/nickel/agents/contracts.ncl @@ -0,0 +1,18 @@ +{ + RegistryConfig = { + max_agents_per_role | Number, + health_check_interval | Number, + agent_timeout | Number, + }, + + AgentDefinition = { + role | String, + description | String, + llm_provider | String, + llm_model | String, + parallelizable | Bool, + priority | Number, + capabilities | Array String, + system_prompt | String | optional, + }, +} diff --git a/nickel/budgets/contracts.ncl b/nickel/budgets/contracts.ncl new file mode 100644 index 0000000..8aeaa47 --- /dev/null +++ b/nickel/budgets/contracts.ncl @@ -0,0 +1,20 @@ +let AlertThreshold = std.contract.custom ( + fun label => + fun value => + if value >= 0.0 && value <= 1.0 then + 'Ok value + else + 'Error { + message = "Invalid alert_threshold '%{std.to_string value}'.\nValid range: 0.0 - 1.0" + } +) in + +{ + RoleBudget = { + role | String, + monthly_limit_cents | Number, + weekly_limit_cents | Number, + fallback_provider | String, + alert_threshold | AlertThreshold, + }, +} diff --git a/nickel/channels/contracts.ncl b/nickel/channels/contracts.ncl new file mode 100644 index 0000000..e9d3a12 --- /dev/null +++ b/nickel/channels/contracts.ncl @@ -0,0 +1,27 @@ +{ + SlackConfig = { + type | String, + webhook_url | String, + channel | String | optional, + username | String | optional, + }, + + DiscordConfig = { + type | String, + webhook_url | String, + username | String | optional, + avatar_url | String | optional, + }, + + TelegramConfig = { + type | String, + bot_token | String, + chat_id | String, + api_base | String | optional, + }, + + ChannelEntry = { + type | String, + .. + }, +} diff --git a/nickel/llm-router/contracts.ncl b/nickel/llm-router/contracts.ncl new file mode 100644 index 0000000..a718224 --- /dev/null +++ b/nickel/llm-router/contracts.ncl @@ -0,0 +1,25 @@ +{ + RoutingConfig = { + default_provider | String, + cost_tracking_enabled | Bool, + fallback_enabled | Bool, + }, + + ProviderConfig = { + enabled | Bool, + api_key | String | optional, + url | String | optional, + model | String, + max_tokens | Number, + temperature | Number, + cost_per_1m_input | Number, + cost_per_1m_output | Number, + }, + + RoutingRule = { + name | String, + condition | { .. }, + provider | String, + model_override | String | optional, + }, +} diff --git a/nickel/vapora/contracts.ncl b/nickel/vapora/contracts.ncl new file mode 100644 index 0000000..af2020a --- /dev/null +++ b/nickel/vapora/contracts.ncl @@ -0,0 +1,68 @@ +let LogLevel = std.contract.custom ( + fun label => + fun value => + let valid = ["trace", "debug", "info", "warn", "error"] in + if std.array.any (fun x => x == value) valid then + 'Ok value + else + 'Error { + message = "Invalid log_level '%{value}'.\nValid values: trace | debug | info | warn | error" + } +) in + +let Port = std.contract.custom ( + fun label => + fun value => + if value >= 1 && value <= 65535 then + 'Ok value + else + 'Error { + message = "Invalid port '%{std.to_string value}'.\nValid range: 1 - 65535" + } +) in + +{ + TlsConfig = { + enabled | Bool, + cert_path | String, + key_path | String, + }, + + ServerConfig = { + host | String, + port | Port, + tls | TlsConfig, + }, + + DatabaseConfig = { + url | String, + max_connections | Number, + }, + + NatsConfig = { + url | String, + stream_name | String, + }, + + AuthConfig = { + jwt_secret | String, + jwt_expiration_hours | Number, + }, + + LoggingConfig = { + level | LogLevel, + json | Bool, + }, + + MetricsConfig = { + enabled | Bool, + port | Port, + }, + + NotificationConfig = { + on_task_done | Array String | default = [], + on_proposal_approved | Array String | default = [], + on_proposal_rejected | Array String | default = [], + on_agent_inactive | Array String | default = [], + }, +} diff --git a/nickel/workflows/contracts.ncl b/nickel/workflows/contracts.ncl new file mode 100644 index 0000000..407f242 --- /dev/null +++ b/nickel/workflows/contracts.ncl @@ -0,0 +1,30 @@ +{ + EngineConfig = { + max_parallel_tasks | Number, + workflow_timeout | Number, + approval_gates_enabled | Bool, + cedar_policy_dir | String | optional, + }, + + ScheduleConfig = { + cron | String, + timezone | String | optional, + allow_concurrent | Bool, + catch_up | Bool, + }, + + WorkflowNotifications = { + on_completed | Array String | default = [], + on_failed | Array String | default = [], + on_approval_required | Array String | default = [], + }, + + StageConfig = { + name | String, + agents | Array String, + parallel | Bool | default = false, + max_parallel | Number | optional, + approval_required | Bool | default = false, + compensation_agents | Array String | optional, + }, +}