Vapora/docs/architecture/agent-registry-coordination.md
Jesús Pérez d14150da75 feat: Phase 5.3 - Multi-Agent Learning Infrastructure
Implement intelligent agent learning from Knowledge Graph execution history
with per-task-type expertise tracking, recency bias, and learning curves.

## Phase 5.3 Implementation

### Learning Infrastructure ( Complete)
- LearningProfileService with per-task-type expertise metrics
- TaskTypeExpertise model tracking success_rate, confidence, learning curves
- Recency bias weighting: recent 7 days weighted 3x higher (exponential decay)
- Confidence scoring prevents overfitting: min(1.0, executions / 20)
- Learning curves computed from daily execution windows

### Agent Scoring Service ( Complete)
- Unified AgentScore combining SwarmCoordinator + learning profiles
- Scoring formula: 0.3*base + 0.5*expertise + 0.2*confidence
- Rank agents by combined score for intelligent assignment
- Support for recency-biased scoring (recent_success_rate)
- Methods: rank_agents, select_best, rank_agents_with_recency

### KG Integration ( Complete)
- KGPersistence::get_executions_for_task_type() - query by agent + task type
- KGPersistence::get_agent_executions() - all executions for agent
- Coordinator::load_learning_profile_from_kg() - core KG→Learning integration
- Coordinator::load_all_learning_profiles() - batch load for multiple agents
- Convert PersistedExecution → ExecutionData for learning calculations

### Agent Assignment Integration ( Complete)
- AgentCoordinator uses learning profiles for task assignment
- extract_task_type() infers task type from title/description
- assign_task() scores candidates using AgentScoringService
- Fallback to load-based selection if no learning data available
- Learning profiles stored in coordinator.learning_profiles RwLock

### Profile Adapter Enhancements ( Complete)
- create_learning_profile() - initialize empty profiles
- add_task_type_expertise() - set task-type expertise
- update_profile_with_learning() - update swarm profiles from learning

## Files Modified

### vapora-knowledge-graph/src/persistence.rs (+30 lines)
- get_executions_for_task_type(agent_id, task_type, limit)
- get_agent_executions(agent_id, limit)

### vapora-agents/src/coordinator.rs (+100 lines)
- load_learning_profile_from_kg() - core KG integration method
- load_all_learning_profiles() - batch loading for agents
- assign_task() already uses learning-based scoring via AgentScoringService

### Existing Complete Implementation
- vapora-knowledge-graph/src/learning.rs - calculation functions
- vapora-agents/src/learning_profile.rs - data structures and expertise
- vapora-agents/src/scoring.rs - unified scoring service
- vapora-agents/src/profile_adapter.rs - adapter methods

## Tests Passing
- learning_profile: 7 tests 
- scoring: 5 tests 
- profile_adapter: 6 tests 
- coordinator: learning-specific tests 

## Data Flow
1. Task arrives → AgentCoordinator::assign_task()
2. Extract task_type from description
3. Query KG for task-type executions (load_learning_profile_from_kg)
4. Calculate expertise with recency bias
5. Score candidates (SwarmCoordinator + learning)
6. Assign to top-scored agent
7. Execution result → KG → Update learning profiles

## Key Design Decisions
 Recency bias: 7-day half-life with 3x weight for recent performance
 Confidence scoring: min(1.0, total_executions / 20) prevents overfitting
 Hierarchical scoring: 30% base load, 50% expertise, 20% confidence
 KG query limit: 100 recent executions per task-type for performance
 Async loading: load_learning_profile_from_kg supports concurrent loads

## Next: Phase 5.4 - Cost Optimization
Ready to implement budget enforcement and cost-aware provider selection.
2026-01-11 13:03:53 +00:00

14 KiB

🤖 Agent Registry & Coordination

Multi-Agent Orchestration System

Version: 0.1.0 Status: Specification (VAPORA v1.0 - Multi-Agent) Purpose: Sistema de registro, descubrimiento y coordinación de agentes


🎯 Objetivo

Crear un marketplace de agentes donde:

  • 12 roles especializados trabajan en paralelo
  • Cada agente tiene capacidades, dependencias, versiones claras
  • Discovery & instalación automática
  • Health monitoring + auto-restart
  • Inter-agent communication via NATS JetStream
  • Shared context via MCP/RAG

📋 Los 12 Roles de Agentes

Tier 1: Technical Core (Código)

Architect (Role ID: architect)

  • Responsabilidad: Diseño de sistemas, decisiones arquitectónicas
  • Entrada: Task de feature compleja, contexto de proyecto
  • Salida: ADRs, design documents, architecture diagrams
  • LLM óptimo: Claude Opus (complejidad alta)
  • Trabajo: Individual o iniciador de workflows
  • Canales: Publica decisiones, consulta Decision-Maker

Developer (Role ID: developer)

  • Responsabilidad: Implementación de código
  • Entrada: Especificación, ADR, task asignada
  • Salida: Código, artifacts, PR
  • LLM óptimo: Claude Sonnet (velocidad + calidad)
  • Trabajo: Paralelo (múltiples developers por tarea)
  • Canales: Escucha de Architect, reporta a Reviewer

Reviewer (Role ID: code-reviewer)

  • Responsabilidad: Revisión de calidad, standards
  • Entrada: Pull requests, código propuesto
  • Salida: Comments, aprobación/rechazo, sugerencias
  • LLM óptimo: Claude Sonnet o Gemini (análisis rápido)
  • Trabajo: Paralelo (múltiples reviewers)
  • Canales: Escucha PRs de Developer, reporta a Decision-Maker si crítico

Tester (Role ID: tester)

  • Responsabilidad: Testing, benchmarks, QA
  • Entrada: Código implementado
  • Salida: Test code, benchmark reports, coverage metrics
  • LLM óptimo: Claude Sonnet (genera tests)
  • Trabajo: Paralelo
  • Canales: Escucha de Reviewer, reporta a DevOps

Tier 2: Documentation & Communication

Documenter (Role ID: documenter)

  • Responsabilidad: Documentación técnica, root files, ADRs
  • Entrada: Código, decisions, análisis
  • Salida: Docs en docs/, actualizaciones README/CHANGELOG
  • Usa: Root Files Keeper + doc-lifecycle-manager
  • LLM óptimo: GPT-4 (mejor formato)
  • Trabajo: Async, actualiza continuamente
  • Canales: Escucha cambios en repo, publica docs

Marketer (Role ID: marketer)

  • Responsabilidad: Marketing content, messaging
  • Entrada: Nuevas features, releases
  • Salida: Blog posts, social content, press releases
  • LLM óptimo: Claude Sonnet (creatividad)
  • Trabajo: Async
  • Canales: Escucha releases, publica content

Presenter (Role ID: presenter)

  • Responsabilidad: Presentaciones, slides, demos
  • Entrada: Features, arquitectura, roadmaps
  • Salida: Slidev presentations, demo scripts
  • LLM óptimo: Claude Sonnet (format + creativity)
  • Trabajo: On-demand, por eventos
  • Canales: Consulta Architect/Developer

Tier 3: Operations & Infrastructure

DevOps (Role ID: devops)

  • Responsabilidad: CI/CD, deploys, infrastructure
  • Entrada: Code approved, deployment requests
  • Salida: Manifests K8s, deployment logs, rollback
  • LLM óptimo: Claude Sonnet (IaC)
  • Trabajo: Paralelo deploys
  • Canales: Escucha de Reviewer (approved), publica deploy logs

Monitor (Role ID: monitor)

  • Responsabilidad: Health checks, alerting, observability
  • Entrada: Deployment events, metrics
  • Salida: Alerts, dashboards, incident reports
  • LLM óptimo: Gemini Flash (análisis rápido)
  • Trabajo: Real-time, continuous
  • Canales: Publica alerts, escucha todo

Security (Role ID: security)

  • Responsabilidad: Security analysis, compliance, audits
  • Entrada: Code changes, PRs, config
  • Salida: Security reports, CVE checks, audit logs
  • LLM óptimo: Claude Opus (análisis profundo)
  • Trabajo: Async, on PRs críticos
  • Canales: Escucha de Reviewer, puede bloquear PRs

Tier 4: Management & Coordination

ProjectManager (Role ID: project-manager)

  • Responsabilidad: Roadmaps, task tracking, coordination
  • Entrada: Completed tasks, metrics, blockers
  • Salida: Roadmap updates, task assignments, status reports
  • LLM óptimo: Claude Sonnet (análisis datos)
  • Trabajo: Async, agregador
  • Canales: Publica status, escucha completions

DecisionMaker (Role ID: decision-maker)

  • Responsabilidad: Decisiones en conflictos, aprobaciones críticas
  • Entrada: Reportes de agentes, decisiones pendientes
  • Salida: Aprobaciones, resolución de conflictos
  • LLM óptimo: Claude Opus (análisis nuanced)
  • Trabajo: On-demand, decisiones críticas
  • Canales: Escucha escalaciones, publica decisiones

Orchestrator (Role ID: orchestrator)

  • Responsabilidad: Coordinación de agentes, assignment de tareas
  • Entrada: Tasks a hacer, equipo disponible, constraints
  • Salida: Task assignments, workflow coordination
  • LLM óptimo: Claude Opus (planejamiento)
  • Trabajo: Continuous, meta-agent
  • Canales: Coordina todo, publica assignments

🏗️ Agent Registry Structure

Agent Metadata (SurrealDB)

pub struct AgentMetadata {
    pub id: String,                    // "architect", "developer-001"
    pub role: AgentRole,               // Architect, Developer, etc
    pub name: String,                  // "Senior Architect Agent"
    pub version: String,               // "0.1.0"
    pub status: AgentStatus,           // Active, Inactive, Updating, Error

    pub capabilities: Vec<Capability>, // [Design, ADR, Decisions]
    pub skills: Vec<String>,           // ["rust", "kubernetes", "distributed-systems"]
    pub llm_provider: LLMProvider,      // Claude, OpenAI, Gemini, Ollama
    pub llm_model: String,             // "opus-4"

    pub dependencies: Vec<String>,     // Agents this one depends on
    pub dependents: Vec<String>,       // Agents that depend on this one

    pub health_check: HealthCheckConfig,
    pub max_concurrent_tasks: u32,
    pub current_tasks: u32,
    pub queue_depth: u32,

    pub created_at: DateTime<Utc>,
    pub last_health_check: DateTime<Utc>,
    pub uptime_percentage: f64,
}

pub enum AgentRole {
    Architect, Developer, CodeReviewer, Tester,
    Documenter, Marketer, Presenter,
    DevOps, Monitor, Security,
    ProjectManager, DecisionMaker, Orchestrator,
}

pub enum AgentStatus {
    Active,
    Inactive,
    Updating,
    Error(String),
    Scaling,
}

pub struct Capability {
    pub id: String,        // "design-adr"
    pub name: String,      // "Architecture Decision Records"
    pub description: String,
    pub complexity: Complexity,  // Low, Medium, High, Critical
}

pub struct HealthCheckConfig {
    pub interval_secs: u32,
    pub timeout_secs: u32,
    pub consecutive_failures_threshold: u32,
    pub auto_restart_enabled: bool,
}

Agent Instance (Runtime)

pub struct AgentInstance {
    pub metadata: AgentMetadata,
    pub pod_id: String,              // K8s pod ID
    pub ip: String,
    pub port: u16,
    pub start_time: DateTime<Utc>,
    pub last_heartbeat: DateTime<Utc>,
    pub tasks_completed: u32,
    pub avg_task_duration_ms: u32,
    pub error_count: u32,
    pub tokens_used: u64,
    pub cost_incurred: f64,
}

📡 Inter-Agent Communication (NATS)

Message Protocol

pub enum AgentMessage {
    // Task assignment
    TaskAssigned {
        task_id: String,
        agent_id: String,
        context: TaskContext,
        deadline: DateTime<Utc>,
    },
    TaskStarted {
        task_id: String,
        agent_id: String,
        timestamp: DateTime<Utc>,
    },
    TaskProgress {
        task_id: String,
        agent_id: String,
        progress_percent: u32,
        current_step: String,
    },
    TaskCompleted {
        task_id: String,
        agent_id: String,
        result: TaskResult,
        tokens_used: u64,
        duration_ms: u32,
    },
    TaskFailed {
        task_id: String,
        agent_id: String,
        error: String,
        retry_count: u32,
    },

    // Communication
    RequestHelp {
        from_agent: String,
        to_roles: Vec<AgentRole>,
        context: String,
        deadline: DateTime<Utc>,
    },
    HelpOffered {
        from_agent: String,
        to_agent: String,
        capability: Capability,
    },
    ShareContext {
        from_agent: String,
        to_roles: Vec<AgentRole>,
        context_type: String,  // "decision", "analysis", "code"
        data: Value,
        ttl_minutes: u32,
    },

    // Coordination
    RequestDecision {
        from_agent: String,
        decision_type: String,
        context: String,
        options: Vec<String>,
    },
    DecisionMade {
        decision_id: String,
        decision: String,
        reasoning: String,
        made_by: String,
    },

    // Health
    Heartbeat {
        agent_id: String,
        status: AgentStatus,
        load: f64,  // 0.0-1.0
    },
}

// NATS Subjects (pub/sub pattern)
pub mod subjects {
    pub const TASK_ASSIGNED: &str = "vapora.tasks.assigned";        // Broadcast
    pub const TASK_PROGRESS: &str = "vapora.tasks.progress";        // Broadcast
    pub const TASK_COMPLETED: &str = "vapora.tasks.completed";      // Broadcast
    pub const AGENT_HELP: &str = "vapora.agent.help";              // Request/Reply
    pub const AGENT_DECISION: &str = "vapora.agent.decision";      // Request/Reply
    pub const AGENT_HEARTBEAT: &str = "vapora.agent.heartbeat";    // Broadcast
}

Pub/Sub Patterns

// 1. Broadcast: Task assigned to all interested agents
nats.publish("vapora.tasks.assigned", task_message).await?;

// 2. Request/Reply: Developer asks Help from Architect
let help_request = AgentMessage::RequestHelp { ... };
let response = nats.request("vapora.agent.help", help_request, Duration::from_secs(30)).await?;

// 3. Stream: Persist task completion for replay
nats.publish_to_stream("vapora_tasks", "vapora.tasks.completed", completion_message).await?;

// 4. Subscribe: Monitor listens all heartbeats
let mut subscription = nats.subscribe("vapora.agent.heartbeat").await?;

🏪 Agent Discovery & Installation

Marketplace API

pub struct AgentRegistry {
    pub agents: HashMap<String, AgentMetadata>,
    pub available_agents: HashMap<String, AgentManifest>,  // Registry
    pub running_agents: HashMap<String, AgentInstance>,    // Runtime
}

pub struct AgentManifest {
    pub id: String,
    pub name: String,
    pub version: String,
    pub role: AgentRole,
    pub docker_image: String,           // "vapora/agents:developer-0.1.0"
    pub resources: ResourceRequirements,
    pub dependencies: Vec<AgentDependency>,
    pub health_check_endpoint: String,
    pub capabilities: Vec<Capability>,
    pub documentation: String,
}

pub struct AgentDependency {
    pub agent_id: String,
    pub role: AgentRole,
    pub min_version: String,
    pub optional: bool,
}

impl AgentRegistry {
    // Discover available agents
    pub async fn list_available(&self) -> Vec<AgentManifest> {
        self.available_agents.values().cloned().collect()
    }

    // Install agent
    pub async fn install(
        &mut self,
        manifest: AgentManifest,
        count: u32,
    ) -> anyhow::Result<Vec<AgentInstance>> {
        // Check dependencies
        for dep in &manifest.dependencies {
            if !self.is_available(&dep.agent_id) && !dep.optional {
                return Err(anyhow::anyhow!("Dependency {} required", dep.agent_id));
            }
        }

        // Deploy to K8s (via Provisioning)
        let instances = self.deploy_to_k8s(&manifest, count).await?;

        // Register
        for instance in &instances {
            self.running_agents.insert(instance.metadata.id.clone(), instance.clone());
        }

        Ok(instances)
    }

    // Health monitoring
    pub async fn monitor_health(&mut self) -> anyhow::Result<()> {
        for (id, instance) in &mut self.running_agents {
            let health = self.check_agent_health(instance).await?;
            if !health.healthy {
                if health.consecutive_failures >= instance.metadata.health_check.consecutive_failures_threshold {
                    if instance.metadata.health_check.auto_restart_enabled {
                        self.restart_agent(id).await?;
                    }
                }
            }
        }
        Ok(())
    }
}

🔄 Shared State & Context

Context Management

pub struct SharedContext {
    pub project_id: String,
    pub active_tasks: HashMap<String, Task>,
    pub agent_states: HashMap<String, AgentState>,
    pub decisions: HashMap<String, Decision>,
    pub shared_knowledge: HashMap<String, Value>,  // RAG indexed
}

pub struct AgentState {
    pub agent_id: String,
    pub current_task: Option<String>,
    pub last_action: DateTime<Utc>,
    pub available_until: DateTime<Utc>,
    pub context_from_previous_tasks: Vec<String>,
}

// Access via MCP
impl SharedContext {
    pub async fn get_context(&self, agent_id: &str) -> anyhow::Result<AgentState> {
        self.agent_states.get(agent_id)
            .cloned()
            .ok_or(anyhow::anyhow!("Agent {} not found", agent_id))
    }

    pub async fn share_decision(&mut self, decision: Decision) -> anyhow::Result<()> {
        self.decisions.insert(decision.id.clone(), decision);
        // Notify interested agents via NATS
        Ok(())
    }

    pub async fn share_knowledge(&mut self, key: String, value: Value) -> anyhow::Result<()> {
        self.shared_knowledge.insert(key, value);
        // Index in RAG
        Ok(())
    }
}

🎯 Implementation Checklist

  • Define AgentMetadata + AgentInstance structs
  • NATS JetStream integration
  • Agent Registry CRUD operations
  • Health monitoring + auto-restart logic
  • Agent marketplace UI (Leptos)
  • Installation flow (manifest parsing, K8s deployment)
  • Pub/Sub message handlers
  • Request/Reply pattern implementation
  • Shared context via MCP
  • CLI: vapora agent list, vapora agent install, vapora agent scale
  • Logging + monitoring (Prometheus metrics)
  • Tests (mocking, integration)

📊 Success Metrics

Agents register and appear in registry Health checks run every N seconds Unhealthy agents restart automatically NATS messages route correctly Shared context accessible to all agents Agent scaling works (1 → N replicas) Task assignment < 100ms latency


Version: 0.1.0 Status: Specification Complete (VAPORA v1.0) Purpose: Multi-agent registry and coordination system