🤖 Agent Registry & Coordination
Multi-Agent Orchestration System
Version: 0.1.0
Status: Specification (VAPORA v1.0 - Multi-Agent)
Purpose: Agent registration, discovery, and coordination system
🎯 Objective
Build an agent marketplace where:
- ✅ 12 specialized roles work in parallel
- ✅ Each agent has clearly defined capabilities, dependencies, and versions
- ✅ Automatic discovery & installation
- ✅ Health monitoring + auto-restart
- ✅ Inter-agent communication via NATS JetStream
- ✅ Shared context via MCP/RAG
📋 The 12 Agent Roles
Tier 1: Technical Core (Code)
Architect (Role ID: architect)
- Responsibility: System design, architectural decisions
- Input: Complex feature task, project context
- Output: ADRs, design documents, architecture diagrams
- Optimal LLM: Claude Opus (high complexity)
- Work mode: Individual, or initiator of workflows
- Channels: Publishes decisions, consults Decision-Maker
Developer (Role ID: developer)
- Responsibility: Code implementation
- Input: Specification, ADR, assigned task
- Output: Code, artifacts, PR
- Optimal LLM: Claude Sonnet (speed + quality)
- Work mode: Parallel (multiple developers per task)
- Channels: Listens to Architect, reports to Reviewer
Reviewer (Role ID: code-reviewer)
- Responsibility: Quality review, standards
- Input: Pull requests, proposed code
- Output: Comments, approval/rejection, suggestions
- Optimal LLM: Claude Sonnet or Gemini (fast analysis)
- Work mode: Parallel (multiple reviewers)
- Channels: Listens for PRs from Developer, reports to Decision-Maker if critical
Tester (Role ID: tester)
- Responsibility: Testing, benchmarks, QA
- Input: Implemented code
- Output: Test code, benchmark reports, coverage metrics
- Optimal LLM: Claude Sonnet (generates tests)
- Work mode: Parallel
- Channels: Listens to Reviewer, reports to DevOps
Tier 2: Documentation & Communication
Documenter (Role ID: documenter)
- Responsibility: Technical documentation, root files, ADRs
- Input: Code, decisions, analysis
- Output: Docs in docs/, README/CHANGELOG updates
- Uses: Root Files Keeper + doc-lifecycle-manager
- Optimal LLM: GPT-4 (best formatting)
- Work mode: Async, updates continuously
- Channels: Listens for repo changes, publishes docs
Marketer (Role ID: marketer)
- Responsibility: Marketing content, messaging
- Input: New features, releases
- Output: Blog posts, social content, press releases
- Optimal LLM: Claude Sonnet (creativity)
- Work mode: Async
- Channels: Listens for releases, publishes content
Presenter (Role ID: presenter)
- Responsibility: Presentations, slides, demos
- Input: Features, architecture, roadmaps
- Output: Slidev presentations, demo scripts
- Optimal LLM: Claude Sonnet (format + creativity)
- Work mode: On-demand, per event
- Channels: Consults Architect/Developer
Tier 3: Operations & Infrastructure
DevOps (Role ID: devops)
- Responsibility: CI/CD, deploys, infrastructure
- Input: Approved code, deployment requests
- Output: K8s manifests, deployment logs, rollbacks
- Optimal LLM: Claude Sonnet (IaC)
- Work mode: Parallel deploys
- Channels: Listens to Reviewer (approved), publishes deploy logs
Monitor (Role ID: monitor)
- Responsibility: Health checks, alerting, observability
- Input: Deployment events, metrics
- Output: Alerts, dashboards, incident reports
- Optimal LLM: Gemini Flash (fast analysis)
- Work mode: Real-time, continuous
- Channels: Publishes alerts, listens to everything
Security (Role ID: security)
- Responsibility: Security analysis, compliance, audits
- Input: Code changes, PRs, config
- Output: Security reports, CVE checks, audit logs
- Optimal LLM: Claude Opus (deep analysis)
- Work mode: Async, on critical PRs
- Channels: Listens to Reviewer, can block PRs
Tier 4: Management & Coordination
ProjectManager (Role ID: project-manager)
- Responsibility: Roadmaps, task tracking, coordination
- Input: Completed tasks, metrics, blockers
- Output: Roadmap updates, task assignments, status reports
- Optimal LLM: Claude Sonnet (data analysis)
- Work mode: Async, aggregator
- Channels: Publishes status, listens for completions
DecisionMaker (Role ID: decision-maker)
- Responsibility: Conflict resolution, critical approvals
- Input: Agent reports, pending decisions
- Output: Approvals, conflict resolutions
- Optimal LLM: Claude Opus (nuanced analysis)
- Work mode: On-demand, critical decisions
- Channels: Listens for escalations, publishes decisions
Orchestrator (Role ID: orchestrator)
- Responsibility: Agent coordination, task assignment
- Input: Pending tasks, available team, constraints
- Output: Task assignments, workflow coordination
- Optimal LLM: Claude Opus (planning)
- Work mode: Continuous, meta-agent
- Channels: Coordinates everything, publishes assignments
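The role IDs and tiers listed above can be captured in a small lookup. The following is a minimal sketch using only the standard library; the `role_id`, `tier`, and `ALL` names are illustrative assumptions, not part of the specification, while the IDs and tier numbers are copied from the role descriptions.

```rust
use std::str::FromStr;

/// Roles as listed in the tier descriptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AgentRole {
    Architect, Developer, CodeReviewer, Tester,
    Documenter, Marketer, Presenter,
    DevOps, Monitor, Security,
    ProjectManager, DecisionMaker, Orchestrator,
}

impl AgentRole {
    /// Every role, in the order the document lists them (hypothetical helper).
    pub const ALL: [AgentRole; 13] = [
        AgentRole::Architect, AgentRole::Developer, AgentRole::CodeReviewer,
        AgentRole::Tester, AgentRole::Documenter, AgentRole::Marketer,
        AgentRole::Presenter, AgentRole::DevOps, AgentRole::Monitor,
        AgentRole::Security, AgentRole::ProjectManager,
        AgentRole::DecisionMaker, AgentRole::Orchestrator,
    ];

    /// Kebab-case role ID exactly as written in the role descriptions.
    pub fn role_id(self) -> &'static str {
        match self {
            AgentRole::Architect => "architect",
            AgentRole::Developer => "developer",
            AgentRole::CodeReviewer => "code-reviewer",
            AgentRole::Tester => "tester",
            AgentRole::Documenter => "documenter",
            AgentRole::Marketer => "marketer",
            AgentRole::Presenter => "presenter",
            AgentRole::DevOps => "devops",
            AgentRole::Monitor => "monitor",
            AgentRole::Security => "security",
            AgentRole::ProjectManager => "project-manager",
            AgentRole::DecisionMaker => "decision-maker",
            AgentRole::Orchestrator => "orchestrator",
        }
    }

    /// Tier (1 to 4) under which the role is listed.
    pub fn tier(self) -> u8 {
        match self {
            AgentRole::Architect | AgentRole::Developer
            | AgentRole::CodeReviewer | AgentRole::Tester => 1,
            AgentRole::Documenter | AgentRole::Marketer | AgentRole::Presenter => 2,
            AgentRole::DevOps | AgentRole::Monitor | AgentRole::Security => 3,
            AgentRole::ProjectManager | AgentRole::DecisionMaker
            | AgentRole::Orchestrator => 4,
        }
    }
}

impl FromStr for AgentRole {
    type Err = String;

    /// Parses a role ID such as "code-reviewer" back into its role.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        AgentRole::ALL
            .iter()
            .copied()
            .find(|role| role.role_id() == s)
            .ok_or_else(|| format!("unknown role ID: {s}"))
    }
}
```

With this in place, `"decision-maker".parse::<AgentRole>()` yields `Ok(AgentRole::DecisionMaker)`, and an unrecognized ID yields an error string rather than a panic.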
🏗️ Agent Registry Structure
Agent Metadata (SurrealDB)
use chrono::{DateTime, Utc};

#[derive(Debug, Clone)]
pub struct AgentMetadata {
    pub id: String,                     // "architect", "developer-001"
    pub role: AgentRole,                // Architect, Developer, etc.
    pub name: String,                   // "Senior Architect Agent"
    pub version: String,                // "0.1.0"
    pub status: AgentStatus,            // Active, Inactive, Updating, Error
    pub capabilities: Vec<Capability>,  // [Design, ADR, Decisions]
    pub skills: Vec<String>,            // ["rust", "kubernetes", "distributed-systems"]
    pub llm_provider: LLMProvider,      // Claude, OpenAI, Gemini, Ollama
    pub llm_model: String,              // "opus-4"
    pub dependencies: Vec<String>,      // Agents this one depends on
    pub dependents: Vec<String>,        // Agents that depend on this one
    pub health_check: HealthCheckConfig,
    pub max_concurrent_tasks: u32,
    pub current_tasks: u32,
    pub queue_depth: u32,
    pub created_at: DateTime<Utc>,
    pub last_health_check: DateTime<Utc>,
    pub uptime_percentage: f64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AgentRole {
    Architect, Developer, CodeReviewer, Tester,
    Documenter, Marketer, Presenter,
    DevOps, Monitor, Security,
    ProjectManager, DecisionMaker, Orchestrator,
}
#[derive(Debug, Clone, PartialEq)]
pub enum AgentStatus {
    Active,
    Inactive,
    Updating,
    Error(String),
    Scaling,
}
#[derive(Debug, Clone)]
pub struct Capability {
    pub id: String,             // "design-adr"
    pub name: String,           // "Architecture Decision Records"
    pub description: String,
    pub complexity: Complexity, // Low, Medium, High, Critical
}
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
    pub interval_secs: u32,
    pub timeout_secs: u32,
    pub consecutive_failures_threshold: u32,
    pub auto_restart_enabled: bool,
}
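To illustrate how `consecutive_failures_threshold` and `auto_restart_enabled` might interact, here is a minimal sketch of a per-agent failure counter. `FailureTracker` and `HealthAction` are hypothetical names introduced for this example; the specification does not say what happens when the threshold is reached with auto-restart disabled, so the `Alert` outcome is an assumption.

```rust
/// Same fields as the `HealthCheckConfig` defined above.
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
    pub interval_secs: u32,
    pub timeout_secs: u32,
    pub consecutive_failures_threshold: u32,
    pub auto_restart_enabled: bool,
}

/// What the registry should do after a health probe (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum HealthAction {
    None,
    Alert,   // assumption: threshold reached but auto-restart is disabled
    Restart, // threshold reached and auto-restart is enabled
}

/// Counts consecutive failed probes for a single agent (hypothetical type).
#[derive(Debug, Default)]
pub struct FailureTracker {
    consecutive_failures: u32,
}

impl FailureTracker {
    /// Records one probe result and returns the action to take.
    pub fn record(&mut self, healthy: bool, cfg: &HealthCheckConfig) -> HealthAction {
        if healthy {
            // A single healthy probe clears the streak.
            self.consecutive_failures = 0;
            return HealthAction::None;
        }
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        if self.consecutive_failures < cfg.consecutive_failures_threshold {
            HealthAction::None
        } else if cfg.auto_restart_enabled {
            // The restarted agent begins with a clean streak.
            self.consecutive_failures = 0;
            HealthAction::Restart
        } else {
            HealthAction::Alert
        }
    }
}
```

With a threshold of 3, the first two failed probes return `HealthAction::None` and the third returns `HealthAction::Restart`; any healthy probe in between resets the streak.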
Agent Instance (Runtime)
#[derive(Debug, Clone)]
pub struct AgentInstance {
    pub metadata: AgentMetadata,
    pub pod_id: String, // K8s pod ID
    pub ip: String,
    pub port: u16,
    pub start_time: DateTime<Utc>,
    pub last_heartbeat: DateTime<Utc>,
    pub tasks_completed: u32,
    pub avg_task_duration_ms: u32,
    pub error_count: u32,
    pub tokens_used: u64,
    pub cost_incurred: f64,
}
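The `current_tasks` and `max_concurrent_tasks` counters suggest a simple way to choose which running instance receives the next task. The sketch below assumes that load is the ratio of the two (matching the 0.0-1.0 `load` range used by heartbeats) and that the least-loaded instance with spare capacity wins; `AgentLoad` and `pick_least_loaded` are hypothetical names, and the specification does not prescribe a scheduling policy.

```rust
/// The subset of agent metadata needed for a scheduling decision
/// (hypothetical projection of `AgentMetadata`).
#[derive(Debug, Clone, PartialEq)]
pub struct AgentLoad {
    pub agent_id: String,
    pub current_tasks: u32,
    pub max_concurrent_tasks: u32,
}

impl AgentLoad {
    /// Load in the range 0.0..=1.0; an agent with no task slots counts as full.
    pub fn load(&self) -> f64 {
        if self.max_concurrent_tasks == 0 {
            return 1.0;
        }
        (self.current_tasks as f64 / self.max_concurrent_tasks as f64).min(1.0)
    }

    /// True when at least one task slot is free.
    pub fn has_capacity(&self) -> bool {
        self.current_tasks < self.max_concurrent_tasks
    }
}

/// Returns the agent with spare capacity and the lowest load, if any.
/// Ties resolve to the earliest entry in the slice.
pub fn pick_least_loaded(agents: &[AgentLoad]) -> Option<&AgentLoad> {
    agents
        .iter()
        .filter(|a| a.has_capacity())
        .min_by(|a, b| a.load().total_cmp(&b.load()))
}
```

Given three developers at 3/4, 1/4, and 2/2 tasks, the function picks the one at 1/4; if every instance is full it returns `None`, which a caller could treat as a signal to queue the task or scale out.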
📡 Inter-Agent Communication (NATS)
Message Protocol
pub enum AgentMessage {
    // Task assignment
    TaskAssigned {
        task_id: String,
        agent_id: String,
        context: TaskContext,
        deadline: DateTime<Utc>,
    },
    TaskStarted {
        task_id: String,
        agent_id: String,
        timestamp: DateTime<Utc>,
    },
    TaskProgress {
        task_id: String,
        agent_id: String,
        progress_percent: u32,
        current_step: String,
    },
    TaskCompleted {
        task_id: String,
        agent_id: String,
        result: TaskResult,
        tokens_used: u64,
        duration_ms: u32,
    },
    TaskFailed {
        task_id: String,
        agent_id: String,
        error: String,
        retry_count: u32,
    },
    // Communication
    RequestHelp {
        from_agent: String,
        to_roles: Vec<AgentRole>,
        context: String,
        deadline: DateTime<Utc>,
    },
    HelpOffered {
        from_agent: String,
        to_agent: String,
        capability: Capability,
    },
    ShareContext {
        from_agent: String,
        to_roles: Vec<AgentRole>,
        context_type: String, // "decision", "analysis", "code"
        data: Value,
        ttl_minutes: u32,
    },
    // Coordination
    RequestDecision {
        from_agent: String,
        decision_type: String,
        context: String,
        options: Vec<String>,
    },
    DecisionMade {
        decision_id: String,
        decision: String,
        reasoning: String,
        made_by: String,
    },
    // Health
    Heartbeat {
        agent_id: String,
        status: AgentStatus,
        load: f64, // 0.0-1.0
    },
}
// NATS Subjects (pub/sub pattern)
pub mod subjects {
    pub const TASK_ASSIGNED: &str = "vapora.tasks.assigned";     // Broadcast
    pub const TASK_PROGRESS: &str = "vapora.tasks.progress";     // Broadcast
    pub const TASK_COMPLETED: &str = "vapora.tasks.completed";   // Broadcast
    pub const AGENT_HELP: &str = "vapora.agent.help";            // Request/Reply
    pub const AGENT_DECISION: &str = "vapora.agent.decision";    // Request/Reply
    pub const AGENT_HEARTBEAT: &str = "vapora.agent.heartbeat";  // Broadcast
}
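One way to connect the message variants to the subjects is a routing function. This is a sketch under stated assumptions: `MessageKind` is a hypothetical payload-free mirror of the `AgentMessage` variants, `subject_for` and `is_request_reply` are illustrative names, and routing `HelpOffered` and `DecisionMade` to the help and decision subjects is a guess. The specification lists no subject for `TaskStarted`, `TaskFailed`, or `ShareContext`, so those return `None` here rather than an invented subject.

```rust
/// Subject names copied from the specification.
pub mod subjects {
    pub const TASK_ASSIGNED: &str = "vapora.tasks.assigned";
    pub const TASK_PROGRESS: &str = "vapora.tasks.progress";
    pub const TASK_COMPLETED: &str = "vapora.tasks.completed";
    pub const AGENT_HELP: &str = "vapora.agent.help";
    pub const AGENT_DECISION: &str = "vapora.agent.decision";
    pub const AGENT_HEARTBEAT: &str = "vapora.agent.heartbeat";
}

/// Payload-free mirror of the `AgentMessage` variants (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageKind {
    TaskAssigned, TaskStarted, TaskProgress, TaskCompleted, TaskFailed,
    RequestHelp, HelpOffered, ShareContext,
    RequestDecision, DecisionMade, Heartbeat,
}

/// Maps a message kind to its subject, or `None` when the
/// specification does not define one.
pub fn subject_for(kind: MessageKind) -> Option<&'static str> {
    match kind {
        MessageKind::TaskAssigned => Some(subjects::TASK_ASSIGNED),
        MessageKind::TaskProgress => Some(subjects::TASK_PROGRESS),
        MessageKind::TaskCompleted => Some(subjects::TASK_COMPLETED),
        // Assumption: offers travel on the same subject as requests.
        MessageKind::RequestHelp | MessageKind::HelpOffered => Some(subjects::AGENT_HELP),
        // Assumption: decisions travel on the same subject as requests.
        MessageKind::RequestDecision | MessageKind::DecisionMade => {
            Some(subjects::AGENT_DECISION)
        }
        MessageKind::Heartbeat => Some(subjects::AGENT_HEARTBEAT),
        // No subject is listed for these variants.
        MessageKind::TaskStarted | MessageKind::TaskFailed | MessageKind::ShareContext => None,
    }
}

/// True for the subjects the specification marks as Request/Reply.
pub fn is_request_reply(subject: &str) -> bool {
    subject == subjects::AGENT_HELP || subject == subjects::AGENT_DECISION
}
```

Returning `Option` makes the gaps explicit: a caller has to decide what to do with a `TaskFailed` message instead of silently publishing it to the wrong subject.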
Pub/Sub Patterns
// 1. Broadcast: publish a task assignment to all interested agents
nats.publish("vapora.tasks.assigned", task_message).await?;
// 2. Request/Reply: Developer asks Architect for help (30-second timeout)
let help_request = AgentMessage::RequestHelp { ... }; // fields elided
let response = nats.request("vapora.agent.help", help_request, Duration::from_secs(30)).await?;
// 3. Stream: persist task completions for replay
nats.publish_to_stream("vapora_tasks", "vapora.tasks.completed", completion_message).await?;
// 4. Subscribe: Monitor listens to all heartbeats
let mut subscription = nats.subscribe("vapora.agent.heartbeat").await?;
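NATS subscriptions may also use wildcards: `*` matches exactly one token and `>` matches one or more trailing tokens. An agent such as Monitor, which listens to everything, could therefore subscribe to `vapora.>`. The matcher below is a standalone sketch of those matching rules for illustration; `subject_matches` is a hypothetical helper, not a NATS client call, and in practice the server performs this matching.

```rust
/// Returns true when `subject` matches `pattern` under NATS wildcard rules:
/// `*` matches exactly one token, `>` matches one or more trailing tokens.
pub fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pattern_tokens = pattern.split('.');
    let mut subject_tokens = subject.split('.');
    loop {
        match (pattern_tokens.next(), subject_tokens.next()) {
            // `>` must be the final pattern token and needs at least one subject token.
            (Some(">"), Some(_)) => return pattern_tokens.next().is_none(),
            // `*` consumes exactly one non-empty token.
            (Some("*"), Some(token)) => {
                if token.is_empty() {
                    return false;
                }
            }
            // Literal tokens must be identical.
            (Some(expected), Some(actual)) => {
                if expected != actual {
                    return false;
                }
            }
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // One side ran out before the other.
            _ => return false,
        }
    }
}
```

Under these rules `vapora.tasks.*` covers the three task subjects but not `vapora.agent.heartbeat`, while `vapora.>` covers every subject in the `subjects` module.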
🏪 Agent Discovery & Installation
Marketplace API
pub struct AgentRegistry {
    pub agents: HashMap<String, AgentMetadata>,
    pub available_agents: HashMap<String, AgentManifest>, // Registry
    pub running_agents: HashMap<String, AgentInstance>,   // Runtime
}
#[derive(Debug, Clone)] // Clone is needed by `list_available`
pub struct AgentManifest {
    pub id: String,
    pub name: String,
    pub version: String,
    pub role: AgentRole,
    pub docker_image: String, // "vapora/agents:developer-0.1.0"
    pub resources: ResourceRequirements,
    pub dependencies: Vec<AgentDependency>,
    pub health_check_endpoint: String,
    pub capabilities: Vec<Capability>,
    pub documentation: String,
}
#[derive(Debug, Clone)]
pub struct AgentDependency {
    pub agent_id: String,
    pub role: AgentRole,
    pub min_version: String,
    pub optional: bool,
}
impl AgentRegistry {
    // Discover available agents
    pub async fn list_available(&self) -> Vec<AgentManifest> {
        self.available_agents.values().cloned().collect()
    }
    // Install agent
    pub async fn install(
        &mut self,
        manifest: AgentManifest,
        count: u32,
    ) -> anyhow::Result<Vec<AgentInstance>> {
        // Check dependencies: every non-optional dependency must be available
        for dep in &manifest.dependencies {
            if !dep.optional && !self.is_available(&dep.agent_id) {
                anyhow::bail!("Dependency {} required", dep.agent_id);
            }
        }
        // Deploy to K8s (via Provisioning)
        let instances = self.deploy_to_k8s(&manifest, count).await?;
        // Register
        for instance in &instances {
            self.running_agents.insert(instance.metadata.id.clone(), instance.clone());
        }
        Ok(instances)
    }
    // Health monitoring
    pub async fn monitor_health(&mut self) -> anyhow::Result<()> {
        // Collect the IDs first: iterating `&mut self.running_agents` while
        // calling other `self` methods would not pass the borrow checker.
        let ids: Vec<String> = self.running_agents.keys().cloned().collect();
        for id in ids {
            let Some(instance) = self.running_agents.get(&id) else { continue };
            let threshold = instance.metadata.health_check.consecutive_failures_threshold;
            let auto_restart = instance.metadata.health_check.auto_restart_enabled;
            let health = self.check_agent_health(instance).await?;
            if !health.healthy && health.consecutive_failures >= threshold && auto_restart {
                self.restart_agent(&id).await?;
            }
        }
        Ok(())
    }
}
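The `install` method above checks only whether a dependency is available; `min_version` is declared but never consulted. As a sketch of how a version-aware check could look, the function below compares plain `major.minor.patch` strings. The names `parse_version` and `unmet_dependencies`, the `installed` map of agent ID to version, and the simple three-part version format are all assumptions made for this example; pre-release tags and other SemVer features are not handled.

```rust
use std::collections::HashMap;

/// The fields of `AgentDependency` relevant to version checking.
#[derive(Debug, Clone)]
pub struct AgentDependency {
    pub agent_id: String,
    pub min_version: String,
    pub optional: bool,
}

/// Parses "major.minor.patch" into a tuple; returns None for any other shape.
fn parse_version(v: &str) -> Option<(u32, u32, u32)> {
    let mut parts = v.split('.').map(|p| p.parse::<u32>());
    let major = parts.next()?.ok()?;
    let minor = parts.next()?.ok()?;
    let patch = parts.next()?.ok()?;
    if parts.next().is_some() {
        return None; // more than three components
    }
    Some((major, minor, patch))
}

/// Returns the IDs of required dependencies that are missing, too old,
/// or carry a version string that cannot be parsed.
pub fn unmet_dependencies<'a>(
    deps: &'a [AgentDependency],
    installed: &HashMap<String, String>,
) -> Vec<&'a str> {
    deps.iter()
        .filter(|d| !d.optional)
        .filter(|d| {
            let required = parse_version(&d.min_version);
            let found = installed.get(&d.agent_id).and_then(|v| parse_version(v));
            match (required, found) {
                // Tuples compare lexicographically: major, then minor, then patch.
                (Some(req), Some(have)) => have < req,
                _ => true,
            }
        })
        .map(|d| d.agent_id.as_str())
        .collect()
}
```

An empty result means installation can proceed. Treating unparsable versions as unmet is a deliberately conservative choice; a real implementation would more likely surface a distinct error.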
🔄 Shared State & Context
Context Management
pub struct SharedContext {
    pub project_id: String,
    pub active_tasks: HashMap<String, Task>,
    pub agent_states: HashMap<String, AgentState>,
    pub decisions: HashMap<String, Decision>,
    pub shared_knowledge: HashMap<String, Value>, // RAG indexed
}
#[derive(Debug, Clone)] // Clone is needed by `get_context`
pub struct AgentState {
    pub agent_id: String,
    pub current_task: Option<String>,
    pub last_action: DateTime<Utc>,
    pub available_until: DateTime<Utc>,
    pub context_from_previous_tasks: Vec<String>,
}
// Access via MCP
impl SharedContext {
    pub async fn get_context(&self, agent_id: &str) -> anyhow::Result<AgentState> {
        self.agent_states
            .get(agent_id)
            .cloned()
            // `ok_or_else` builds the error only when the lookup fails
            .ok_or_else(|| anyhow::anyhow!("Agent {} not found", agent_id))
    }
    pub async fn share_decision(&mut self, decision: Decision) -> anyhow::Result<()> {
        self.decisions.insert(decision.id.clone(), decision);
        // Notify interested agents via NATS
        Ok(())
    }
    pub async fn share_knowledge(&mut self, key: String, value: Value) -> anyhow::Result<()> {
        self.shared_knowledge.insert(key, value);
        // Index in RAG
        Ok(())
    }
}
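The `ShareContext` message carries a `ttl_minutes` field, but the `SharedContext` struct above does not show how expiry would be enforced. The following is a minimal sketch of one possible approach: a store that records an expiry instant per entry. `TtlStore` and its methods are hypothetical, values are plain strings for brevity, and the current time is passed in explicitly so the behavior is deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Key-value store whose entries expire after a TTL (hypothetical type).
#[derive(Debug, Default)]
pub struct TtlStore {
    entries: HashMap<String, (String, Instant)>, // value plus expiry instant
}

impl TtlStore {
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `data` under `key`, expiring `ttl_minutes` after `now`.
    pub fn share(&mut self, key: &str, data: &str, ttl_minutes: u32, now: Instant) {
        let expires_at = now + Duration::from_secs(u64::from(ttl_minutes) * 60);
        self.entries.insert(key.to_string(), (data.to_string(), expires_at));
    }

    /// Returns the value if it exists and has not expired at `now`.
    pub fn get(&self, key: &str, now: Instant) -> Option<&str> {
        self.entries
            .get(key)
            .filter(|(_, expires_at)| now < *expires_at)
            .map(|(data, _)| data.as_str())
    }

    /// Drops every expired entry and returns how many were removed.
    pub fn purge_expired(&mut self, now: Instant) -> usize {
        let before = self.entries.len();
        self.entries.retain(|_, (_, expires_at)| now < *expires_at);
        before - self.entries.len()
    }
}
```

An entry shared with a 10-minute TTL is readable five minutes later and gone at the ten-minute mark. Expired entries stay in memory until `purge_expired` runs, so a periodic sweep would be needed in a long-lived process.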
🎯 Implementation Checklist
- Define AgentMetadata + AgentInstance structs
- NATS JetStream integration
- Agent Registry CRUD operations
- Health monitoring + auto-restart logic
- Agent marketplace UI (Leptos)
- Installation flow (manifest parsing, K8s deployment)
- Pub/Sub message handlers
- Request/Reply pattern implementation
- Shared context via MCP
- CLI: vapora agent list, vapora agent install, vapora agent scale
- Logging + monitoring (Prometheus metrics)
- Tests (mocking, integration)
📊 Success Metrics
- ✅ Agents register and appear in registry
- ✅ Health checks run every N seconds
- ✅ Unhealthy agents restart automatically
- ✅ NATS messages route correctly
- ✅ Shared context accessible to all agents
- ✅ Agent scaling works (1 → N replicas)
- ✅ Task assignment < 100ms latency
Version: 0.1.0
Status: ✅ Specification Complete (VAPORA v1.0)
Purpose: Multi-agent registry and coordination system