feat: Phase 5.3 - Multi-Agent Learning Infrastructure
Implement intelligent agent learning from Knowledge Graph execution history
with per-task-type expertise tracking, recency bias, and learning curves.
## Phase 5.3 Implementation
### Learning Infrastructure (✅ Complete)
- LearningProfileService with per-task-type expertise metrics
- TaskTypeExpertise model tracking success_rate, confidence, learning curves
- Recency bias weighting: recent 7 days weighted 3x higher (exponential decay)
- Confidence scoring prevents overfitting: min(1.0, executions / 20)
- Learning curves computed from daily execution windows
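A minimal sketch of the three calculations above. It uses a hypothetical `Execution` record (a success flag plus an age in days) rather than the real `ExecutionData` type. The commit describes the recency weighting both as "3x for the last 7 days" and as a "7-day half-life". `recency_weight` assumes one way to combine the two: weight 3.0 inside the window, then halving every 7 days starting from 1.0. This may not match `learning.rs`. Only the confidence rule, min(1.0, executions / 20), comes directly from the text.

```rust
use std::collections::BTreeMap;

/// Hypothetical execution record; the real ExecutionData fields may differ.
#[derive(Debug, Clone, Copy)]
pub struct Execution {
    pub succeeded: bool,
    /// Age of the execution in days (0.0 = just now).
    pub age_days: f64,
}

/// Assumed weighting: 3.0 inside the 7-day window, then exponential
/// decay with a 7-day half-life starting from 1.0.
pub fn recency_weight(age_days: f64) -> f64 {
    if age_days < 7.0 {
        3.0
    } else {
        0.5_f64.powf((age_days - 7.0) / 7.0)
    }
}

/// Recency-weighted success rate in [0, 1]; 0.0 when there is no history.
pub fn weighted_success_rate(execs: &[Execution]) -> f64 {
    let mut hits = 0.0_f64;
    let mut total = 0.0_f64;
    for e in execs {
        let w = recency_weight(e.age_days);
        total += w;
        if e.succeeded {
            hits += w;
        }
    }
    if total == 0.0 {
        0.0
    } else {
        hits / total
    }
}

/// Confidence as stated in the commit: min(1.0, executions / 20).
pub fn confidence(total_executions: usize) -> f64 {
    (total_executions as f64 / 20.0).min(1.0)
}

/// Learning curve: success rate per daily window, keyed by days ago (0 = today).
pub fn learning_curve(execs: &[Execution]) -> BTreeMap<u64, f64> {
    // day -> (successes, total)
    let mut buckets: BTreeMap<u64, (u32, u32)> = BTreeMap::new();
    for e in execs {
        let day = e.age_days.max(0.0).floor() as u64;
        let entry = buckets.entry(day).or_insert((0, 0));
        entry.1 += 1;
        if e.succeeded {
            entry.0 += 1;
        }
    }
    buckets
        .into_iter()
        .map(|(day, (ok, n))| (day, f64::from(ok) / f64::from(n)))
        .collect()
}
```

With this weighting, one recent success and one week-old failure produce a success rate of 0.75 instead of 0.5. An agent also needs 20 executions of a task type before its confidence reaches 1.0.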
### Agent Scoring Service (✅ Complete)
- Unified AgentScore combining SwarmCoordinator + learning profiles
- Scoring formula: 0.3*base + 0.5*expertise + 0.2*confidence
- Rank agents by combined score for intelligent assignment
- Support for recency-biased scoring (recent_success_rate)
- Methods: rank_agents, select_best, rank_agents_with_recency
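The scoring formula can be sketched as a weighted sum followed by a sort. `Candidate` and its fields are hypothetical stand-ins for `AgentScore`. The sketch assumes all three inputs are already normalized to [0, 1], with higher meaning better. Under that assumption, `base` would represent something like spare capacity rather than raw load.

```rust
/// Hypothetical scoring input; the real AgentScore fields may differ.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub agent_id: String,
    /// SwarmCoordinator base score, assumed normalized to [0, 1].
    pub base: f64,
    /// Task-type expertise (e.g. success rate), in [0, 1].
    pub expertise: f64,
    /// Confidence, min(1.0, executions / 20).
    pub confidence: f64,
}

impl Candidate {
    /// Combined score: 0.3*base + 0.5*expertise + 0.2*confidence.
    pub fn combined(&self) -> f64 {
        0.3 * self.base + 0.5 * self.expertise + 0.2 * self.confidence
    }
}

/// Sort candidates by combined score, best first.
pub fn rank_agents(mut candidates: Vec<Candidate>) -> Vec<Candidate> {
    candidates.sort_by(|a, b| b.combined().total_cmp(&a.combined()));
    candidates
}

/// Pick the highest-scoring candidate, if any.
pub fn select_best(candidates: Vec<Candidate>) -> Option<Candidate> {
    rank_agents(candidates).into_iter().next()
}
```

Expertise carries half the weight. As a result, a busier agent with a strong track record (0.77) outranks an idle agent with little history (0.42) in the test values.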
### KG Integration (✅ Complete)
- KGPersistence::get_executions_for_task_type() - query by agent + task type
- KGPersistence::get_agent_executions() - all executions for agent
- Coordinator::load_learning_profile_from_kg() - core KG→Learning integration
- Coordinator::load_all_learning_profiles() - batch load for multiple agents
- Convert PersistedExecution → ExecutionData for learning calculations
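The KG side can be pictured as three steps (filter, sort newest first, cap) followed by the `PersistedExecution` → `ExecutionData` conversion. Everything below is an in-memory illustration. The field names, the "success"/"failure" outcome strings, and the Unix-second timestamps are assumptions. The real `get_executions_for_task_type()` queries the knowledge graph rather than a slice.

```rust
/// Hypothetical shape of a KG record; the real PersistedExecution may differ.
#[derive(Debug, Clone)]
pub struct PersistedExecution {
    pub agent_id: String,
    pub task_type: String,
    /// Assumed values: "success" or "failure".
    pub outcome: String,
    /// Assumed: Unix timestamp in seconds.
    pub executed_at_unix: i64,
}

/// Hypothetical input to the learning calculations.
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutionData {
    pub success: bool,
    pub age_days: f64,
}

impl ExecutionData {
    /// Convert a persisted record, measuring age relative to `now_unix`.
    pub fn from_persisted(p: &PersistedExecution, now_unix: i64) -> Self {
        ExecutionData {
            success: p.outcome == "success",
            // Clamp future timestamps to age 0.
            age_days: (now_unix - p.executed_at_unix).max(0) as f64 / 86_400.0,
        }
    }
}

/// In-memory stand-in for get_executions_for_task_type(agent_id, task_type, limit):
/// matching rows only, newest first, at most `limit` of them.
pub fn executions_for_task_type(
    all: &[PersistedExecution],
    agent_id: &str,
    task_type: &str,
    limit: usize,
) -> Vec<PersistedExecution> {
    let mut rows: Vec<PersistedExecution> = all
        .iter()
        .filter(|e| e.agent_id == agent_id && e.task_type == task_type)
        .cloned()
        .collect();
    rows.sort_by(|a, b| b.executed_at_unix.cmp(&a.executed_at_unix));
    rows.truncate(limit);
    rows
}
```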
### Agent Assignment Integration (✅ Complete)
- AgentCoordinator uses learning profiles for task assignment
- extract_task_type() infers task type from title/description
- assign_task() scores candidates using AgentScoringService
- Fallback to load-based selection if no learning data available
- Learning profiles stored in coordinator.learning_profiles RwLock
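One way `extract_task_type()` and the load-based fallback could look. The sketch assumes simple keyword matching on the lowercased title and description. The keyword table, the "general" default, and the `AgentState` and `choose_agent` names are hypothetical. The commit does not say which heuristics the coordinator actually uses.

```rust
/// Hypothetical keyword rules, checked in order; the real extract_task_type()
/// heuristics are not shown in this commit.
fn keyword_rules() -> Vec<(&'static str, Vec<&'static str>)> {
    vec![
        ("testing", vec!["test", "coverage"]),
        ("documentation", vec!["doc", "readme", "guide"]),
        ("coding", vec!["implement", "refactor", "fix", "bug"]),
    ]
}

/// Infer a task type from title + description; "general" when nothing matches.
pub fn extract_task_type(title: &str, description: &str) -> String {
    let text = format!("{} {}", title, description).to_lowercase();
    for (task_type, keywords) in keyword_rules() {
        if keywords.iter().any(|k| text.contains(*k)) {
            return task_type.to_string();
        }
    }
    "general".to_string()
}

/// Hypothetical per-candidate view used only for this sketch.
#[derive(Debug, Clone)]
pub struct AgentState {
    pub agent_id: String,
    pub load: f64,
    /// Learned score for the task type, or None if the KG had no history.
    pub learned_score: Option<f64>,
}

/// Prefer the best learned score; fall back to the least-loaded agent
/// when no candidate has learning data.
pub fn choose_agent(agents: &[AgentState]) -> Option<&AgentState> {
    agents
        .iter()
        .filter_map(|a| a.learned_score.map(|s| (a, s)))
        .max_by(|x, y| x.1.total_cmp(&y.1))
        .map(|(a, _)| a)
        .or_else(|| agents.iter().min_by(|a, b| a.load.total_cmp(&b.load)))
}
```

Rule order matters with substring matching. "Fix failing test" lands in `testing` because that rule is checked before `coding`.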
### Profile Adapter Enhancements (✅ Complete)
- create_learning_profile() - initialize empty profiles
- add_task_type_expertise() - set task-type expertise
- update_profile_with_learning() - update swarm profiles from learning
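A sketch of the profile data structure and the shared store. It assumes each profile holds a `HashMap` keyed by task type, and that a `std::sync::RwLock` wraps the per-agent map; the coordinator may well use an async lock instead. The fields of `TaskTypeExpertise` and the free-function signatures here are illustrative, not the adapter's actual API.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical per-task-type expertise record (field names assumed).
#[derive(Debug, Clone, PartialEq)]
pub struct TaskTypeExpertise {
    pub success_rate: f64,
    pub total_executions: u32,
    pub confidence: f64,
}

impl TaskTypeExpertise {
    pub fn new(success_rate: f64, total_executions: u32) -> Self {
        Self {
            success_rate,
            total_executions,
            // Confidence rule from the commit message: min(1.0, executions / 20).
            confidence: (f64::from(total_executions) / 20.0).min(1.0),
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct LearningProfile {
    pub agent_id: String,
    pub expertise: HashMap<String, TaskTypeExpertise>,
}

/// Start with an empty profile for an agent.
pub fn create_learning_profile(agent_id: &str) -> LearningProfile {
    LearningProfile { agent_id: agent_id.to_string(), expertise: HashMap::new() }
}

/// Insert or replace the expertise entry for one task type.
pub fn add_task_type_expertise(p: &mut LearningProfile, task_type: &str, e: TaskTypeExpertise) {
    p.expertise.insert(task_type.to_string(), e);
}

/// Stand-in for coordinator.learning_profiles: many readers, one writer at a time.
pub struct ProfileStore {
    profiles: RwLock<HashMap<String, LearningProfile>>,
}

impl ProfileStore {
    pub fn new() -> Self {
        Self { profiles: RwLock::new(HashMap::new()) }
    }

    /// Replace an agent's whole profile under the write lock.
    pub fn upsert(&self, profile: LearningProfile) {
        let mut guard = self.profiles.write().expect("lock poisoned");
        guard.insert(profile.agent_id.clone(), profile);
    }

    /// Read one agent's expertise for a task type under the read lock.
    pub fn expertise(&self, agent_id: &str, task_type: &str) -> Option<TaskTypeExpertise> {
        let guard = self.profiles.read().expect("lock poisoned");
        let profile = guard.get(agent_id)?;
        profile.expertise.get(task_type).cloned()
    }
}
```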
## Files Modified
### vapora-knowledge-graph/src/persistence.rs (+30 lines)
- get_executions_for_task_type(agent_id, task_type, limit)
- get_agent_executions(agent_id, limit)
### vapora-agents/src/coordinator.rs (+100 lines)
- load_learning_profile_from_kg() - core KG integration method
- load_all_learning_profiles() - batch loading for agents
- assign_task() already uses learning-based scoring via AgentScoringService
### Existing Complete Implementation
- vapora-knowledge-graph/src/learning.rs - calculation functions
- vapora-agents/src/learning_profile.rs - data structures and expertise
- vapora-agents/src/scoring.rs - unified scoring service
- vapora-agents/src/profile_adapter.rs - adapter methods
## Tests Passing
- learning_profile: 7 tests ✅
- scoring: 5 tests ✅
- profile_adapter: 6 tests ✅
- coordinator: learning-specific tests ✅
## Data Flow
1. Task arrives → AgentCoordinator::assign_task()
2. Infer task_type from title/description (extract_task_type)
3. Query KG for task-type executions (load_learning_profile_from_kg)
4. Calculate expertise with recency bias
5. Score candidates (SwarmCoordinator + learning)
6. Assign to top-scored agent
7. Execution result → KG → Update learning profiles
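Steps 3 to 7 form a feedback loop in which each recorded result changes the next assignment. The sketch below shows that loop with a hypothetical in-memory `ExecutionLog` standing in for the KG. It also uses a plain (unweighted) success rate in place of the recency-biased expertise. The 100-execution window mirrors the KG query limit listed under Key Design Decisions.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the KG execution history.
#[derive(Default)]
pub struct ExecutionLog {
    /// (agent_id, task_type) -> outcomes, oldest first.
    outcomes: HashMap<(String, String), Vec<bool>>,
}

impl ExecutionLog {
    /// Step 7: persist a finished execution.
    pub fn record(&mut self, agent_id: &str, task_type: &str, success: bool) {
        self.outcomes
            .entry((agent_id.to_string(), task_type.to_string()))
            .or_default()
            .push(success);
    }

    /// Steps 3-4 (unweighted): success rate over the newest `limit` outcomes.
    pub fn success_rate(&self, agent_id: &str, task_type: &str, limit: usize) -> Option<f64> {
        let history = self
            .outcomes
            .get(&(agent_id.to_string(), task_type.to_string()))?;
        let recent = &history[history.len().saturating_sub(limit)..];
        if recent.is_empty() {
            return None;
        }
        let ok = recent.iter().filter(|s| **s).count();
        Some(ok as f64 / recent.len() as f64)
    }

    /// Steps 5-6: pick the candidate with the best rate; unknown agents score 0.0.
    pub fn assign<'a>(&self, candidates: &[&'a str], task_type: &str) -> Option<&'a str> {
        candidates.iter().copied().max_by(|a, b| {
            let ra = self.success_rate(a, task_type, 100).unwrap_or(0.0);
            let rb = self.success_rate(b, task_type, 100).unwrap_or(0.0);
            ra.total_cmp(&rb)
        })
    }
}
```

After two more failures, agent `b` drops from 1.0 to 1/3, so the next task goes to `a` (0.5).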
## Key Design Decisions
✅ Recency bias: 7-day half-life with 3x weight for recent performance
✅ Confidence scoring: min(1.0, total_executions / 20) prevents overfitting
✅ Hierarchical scoring: 30% base load, 50% expertise, 20% confidence
✅ KG query limit: 100 recent executions per task-type for performance
✅ Async loading: load_learning_profile_from_kg supports concurrent loads
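Concurrent profile loading is a fan-out over agent ids. The commit describes `load_learning_profile_from_kg` as async. To stay dependency-free, this sketch swaps in `std::thread::scope` (Rust 1.63+) and spawns one scoped thread per agent instead of async tasks. `load_all` and its loader closure are hypothetical names. The loader is where the 100-execution cap would apply.

```rust
use std::collections::HashMap;
use std::thread;

/// Load one profile per agent concurrently and collect them by agent id.
/// `load` stands in for a per-agent KG query.
pub fn load_all<F, P>(agent_ids: &[&str], load: F) -> HashMap<String, P>
where
    F: Fn(&str) -> P + Sync,
    P: Send,
{
    thread::scope(|s| {
        // Share the loader by reference; &F is Send because F: Sync.
        let load = &load;
        let handles: Vec<_> = agent_ids
            .iter()
            .map(|&id| s.spawn(move || (id.to_string(), load(id))))
            .collect();
        // All scoped threads are joined before `scope` returns.
        handles
            .into_iter()
            .map(|h| h.join().expect("profile loader panicked"))
            .collect()
    })
}
```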
## Next: Phase 5.4 - Cost Optimization
Ready to implement budget enforcement and cost-aware provider selection.
2026-01-11 13:03:53 +00:00

// vapora-agents: NATS message protocol for inter-agent communication
// Phase 2: Message types for agent coordination

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Agent message envelope for NATS pub/sub.
///
/// Serialized as an internally tagged JSON object: the `type` field holds the
/// snake_case variant name (e.g. `"task_assigned"`) next to the payload's fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentMessage {
    TaskAssigned(TaskAssignment),
    TaskStarted(TaskStarted),
    TaskProgress(TaskProgress),
    TaskCompleted(TaskCompleted),
    TaskFailed(TaskFailed),
    Heartbeat(Heartbeat),
    AgentRegistered(AgentRegistered),
    AgentStopped(AgentStopped),
}

/// A task assigned to an agent (`AgentMessage::TaskAssigned`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskAssignment {
    pub id: String,
    pub agent_id: String,
    pub required_role: String,
    pub title: String,
    pub description: String,
    pub context: String,
    pub priority: u32,
    pub deadline: Option<DateTime<Utc>>,
    pub assigned_at: DateTime<Utc>,
}

/// An agent has started working on a task (`AgentMessage::TaskStarted`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskStarted {
    pub task_id: String,
    pub agent_id: String,
    pub started_at: DateTime<Utc>,
}

/// Intermediate progress report for a running task (`AgentMessage::TaskProgress`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskProgress {
    pub task_id: String,
    pub agent_id: String,
    pub progress_percent: u32,
    pub current_step: String,
    pub estimated_completion: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
}

/// Successful task completion with result and usage data (`AgentMessage::TaskCompleted`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskCompleted {
    pub task_id: String,
    pub agent_id: String,
    pub result: String,
    pub artifacts: Vec<String>,
    pub tokens_used: u64,
    pub duration_ms: u64,
    pub completed_at: DateTime<Utc>,
}

/// Task failure, including retry bookkeeping (`AgentMessage::TaskFailed`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskFailed {
    pub task_id: String,
    pub agent_id: String,
    pub error: String,
    pub retry_count: u32,
    pub can_retry: bool,
    pub failed_at: DateTime<Utc>,
}

/// Periodic agent liveness and load report (`AgentMessage::Heartbeat`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
    pub agent_id: String,
    pub status: String,
    pub load: f64,
    pub active_tasks: u32,
    pub total_tasks_completed: u64,
    pub uptime_seconds: u64,
    pub timestamp: DateTime<Utc>,
}

/// An agent announcing its role and capabilities (`AgentMessage::AgentRegistered`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentRegistered {
    pub agent_id: String,
    pub role: String,
    pub version: String,
    pub capabilities: Vec<String>,
    pub registered_at: DateTime<Utc>,
}

/// An agent shutting down, with the reason (`AgentMessage::AgentStopped`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStopped {
    pub agent_id: String,
    pub role: String,
    pub reason: String,
    pub stopped_at: DateTime<Utc>,
}

impl AgentMessage {
    /// Serialize message to JSON bytes for NATS
    pub fn to_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }

    /// Deserialize message from JSON bytes
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(bytes)
    }

    /// Get message type as string; matches the serde `type` tag of the variant
    pub fn message_type(&self) -> &'static str {
        match self {
            AgentMessage::TaskAssigned(_) => "task_assigned",
            AgentMessage::TaskStarted(_) => "task_started",
            AgentMessage::TaskProgress(_) => "task_progress",
            AgentMessage::TaskCompleted(_) => "task_completed",
            AgentMessage::TaskFailed(_) => "task_failed",
            AgentMessage::Heartbeat(_) => "heartbeat",
            AgentMessage::AgentRegistered(_) => "agent_registered",
            AgentMessage::AgentStopped(_) => "agent_stopped",
        }
    }
}

/// NATS subjects for agent communication
pub mod subjects {
    pub const TASKS_ASSIGNED: &str = "vapora.tasks.assigned";
    pub const TASKS_STARTED: &str = "vapora.tasks.started";
    pub const TASKS_PROGRESS: &str = "vapora.tasks.progress";
    pub const TASKS_COMPLETED: &str = "vapora.tasks.completed";
    pub const TASKS_FAILED: &str = "vapora.tasks.failed";
    pub const AGENT_HEARTBEAT: &str = "vapora.agent.heartbeat";
    pub const AGENT_REGISTERED: &str = "vapora.agent.registered";
    pub const AGENT_STOPPED: &str = "vapora.agent.stopped";

    /// Get subject for a specific agent role
    pub fn agent_role_subject(role: &str) -> String {
        format!("vapora.agent.role.{}", role)
    }

    /// Get subject for a specific task
    pub fn task_subject(task_id: &str) -> String {
        format!("vapora.task.{}", task_id)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_message_serialization() {
        let msg = AgentMessage::TaskAssigned(TaskAssignment {
            id: "task-123".to_string(),
            agent_id: "agent-001".to_string(),
            required_role: "developer".to_string(),
            title: "Test task".to_string(),
            description: "Test description".to_string(),
            context: "{}".to_string(),
            priority: 80,
            deadline: None,
            assigned_at: Utc::now(),
        });

        let bytes = msg.to_bytes().unwrap();
        let deserialized = AgentMessage::from_bytes(&bytes).unwrap();

        assert_eq!(msg.message_type(), deserialized.message_type());
    }

    #[test]
    fn test_heartbeat_message() {
        let heartbeat = Heartbeat {
            agent_id: "agent-001".to_string(),
            status: "active".to_string(),
            load: 0.5,
            active_tasks: 2,
            total_tasks_completed: 100,
            uptime_seconds: 3600,
            timestamp: Utc::now(),
        };

        let msg = AgentMessage::Heartbeat(heartbeat);
        assert_eq!(msg.message_type(), "heartbeat");
    }

    #[test]
    fn test_subject_generation() {
        assert_eq!(
            subjects::agent_role_subject("developer"),
            "vapora.agent.role.developer"
        );
        assert_eq!(subjects::task_subject("task-123"), "vapora.task.task-123");
    }
}
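Subscribers usually listen on wildcard subjects rather than on the exact constants above. The simplified matcher below follows NATS wildcard rules: `*` matches exactly one dot-separated token, and `>` (last position only) matches one or more trailing tokens. It is an illustration, not part of the NATS client API. It also makes one property of the `subjects` module visible. Event subjects live under `vapora.tasks.`, while `task_subject()` builds `vapora.task.{id}`, so a `vapora.tasks.>` subscription does not receive per-task subjects.

```rust
/// Simplified NATS-style subject matcher: `*` matches exactly one token,
/// `>` (only valid as the last token) matches one or more remaining tokens.
pub fn subject_matches(pattern: &str, subject: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let sub: Vec<&str> = subject.split('.').collect();
    for (i, p) in pat.iter().enumerate() {
        match *p {
            // Full wildcard: must be last and must cover at least one token.
            ">" => return i == pat.len() - 1 && sub.len() > i,
            // Single-token wildcard: any token, but one must exist.
            "*" => {
                if i >= sub.len() {
                    return false;
                }
            }
            // Literal token: must match exactly.
            literal => {
                if sub.get(i) != Some(&literal) {
                    return false;
                }
            }
        }
    }
    pat.len() == sub.len()
}
```

Tokens are split on `.`, so a task id or role name that contains a dot produces extra tokens and no longer matches a single `*`.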