feat: Phase 5.3 - Multi-Agent Learning Infrastructure
Implement intelligent agent learning from Knowledge Graph execution history
with per-task-type expertise tracking, recency bias, and learning curves.
## Phase 5.3 Implementation
### Learning Infrastructure (✅ Complete)
- LearningProfileService with per-task-type expertise metrics
- TaskTypeExpertise model tracking success_rate, confidence, learning curves
- Recency bias weighting: recent 7 days weighted 3x higher (exponential decay; see the sketch after this list)
- Confidence scoring prevents overfitting: min(1.0, executions / 20)
- Learning curves computed from daily execution windows
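
A minimal sketch of the two formulas above, with hypothetical field names (the real LearningProfileService types live in vapora-agents); the decay shape is one plausible reading that reconciles the 3x recent weighting here with the 7-day half-life mentioned under Key Design Decisions:

```rust
use chrono::{DateTime, Utc};

// Hypothetical execution record; field names are assumptions.
struct ExecutionData {
    success: bool,
    timestamp: DateTime<Utc>,
}

/// Recency-weighted success rate: executions from the last 7 days count 3x,
/// older ones decay exponentially with a 7-day half-life.
fn weighted_success_rate(executions: &[ExecutionData], now: DateTime<Utc>) -> f64 {
    let (mut num, mut den) = (0.0, 0.0);
    for e in executions {
        let age_days = (now - e.timestamp).num_days() as f64;
        let w = if age_days <= 7.0 {
            3.0
        } else {
            3.0 * 0.5_f64.powf((age_days - 7.0) / 7.0)
        };
        num += w * if e.success { 1.0 } else { 0.0 };
        den += w;
    }
    if den == 0.0 { 0.0 } else { num / den }
}

/// Confidence clamp from above: full confidence only after 20 executions.
fn confidence(total_executions: usize) -> f64 {
    (total_executions as f64 / 20.0).min(1.0)
}
```
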
### Agent Scoring Service (✅ Complete)
- Unified AgentScore combining SwarmCoordinator + learning profiles
- Scoring formula: 0.3*base + 0.5*expertise + 0.2*confidence (sketched after this list)
- Rank agents by combined score for intelligent assignment
- Support for recency-biased scoring (recent_success_rate)
- Methods: rank_agents, select_best, rank_agents_with_recency
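
With all inputs normalized to [0, 1], the blend itself is a one-liner; a sketch with assumed signatures (the real AgentScoringService API may differ):

```rust
/// 30% base (SwarmCoordinator load/availability), 50% task-type expertise,
/// 20% confidence, per the formula above.
fn combined_score(base: f64, expertise: f64, confidence: f64) -> f64 {
    0.3 * base + 0.5 * expertise + 0.2 * confidence
}

/// rank_agents / select_best then reduce to a descending sort or max over
/// this value (shape assumed).
fn rank(mut scored: Vec<(String, f64)>) -> Vec<(String, f64)> {
    scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
    scored
}
```
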
### KG Integration (✅ Complete)
- KGPersistence::get_executions_for_task_type() - query by agent + task type
- KGPersistence::get_agent_executions() - all executions for agent
- Coordinator::load_learning_profile_from_kg() - core KG→Learning integration
- Coordinator::load_all_learning_profiles() - batch load for multiple agents
- Convert PersistedExecution → ExecutionData for learning calculations (conversion sketched below)
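
The conversion in the last bullet might look roughly like this (stand-in structs; the real types carry more fields and a different success encoding may be used):

```rust
use chrono::{DateTime, Utc};

// Stand-ins for the KG and learning types; field names are assumptions.
struct PersistedExecution {
    status: String,
    completed_at: DateTime<Utc>,
}

struct ExecutionData {
    success: bool,
    timestamp: DateTime<Utc>,
}

impl From<PersistedExecution> for ExecutionData {
    fn from(p: PersistedExecution) -> Self {
        ExecutionData {
            success: p.status == "success",
            timestamp: p.completed_at,
        }
    }
}
```
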
### Agent Assignment Integration (✅ Complete)
- AgentCoordinator uses learning profiles for task assignment
- extract_task_type() infers task type from title/description (see the sketch after this list)
- assign_task() scores candidates using AgentScoringService
- Fallback to load-based selection if no learning data available
- Learning profiles stored in coordinator.learning_profiles RwLock
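
One way extract_task_type() could infer a type from free text; the keyword table and labels here are assumptions, not the shipped heuristics:

```rust
fn extract_task_type(title: &str, description: &str) -> String {
    let text = format!("{title} {description}").to_lowercase();
    // First matching keyword wins; fall back to a generic bucket.
    [
        ("refactor", "refactoring"),
        ("test", "testing"),
        ("doc", "documentation"),
        ("bug", "bugfix"),
    ]
    .iter()
    .find(|(kw, _)| text.contains(*kw))
    .map(|(_, ty)| ty.to_string())
    .unwrap_or_else(|| "general".to_string())
}
```
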
### Profile Adapter Enhancements (✅ Complete)
- create_learning_profile() - initialize empty profiles
- add_task_type_expertise() - set task-type expertise
- update_profile_with_learning() - update swarm profiles from learning (signatures sketched below)
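
Hedged signatures for the adapter methods above (stand-in types; the real profile_adapter.rs shapes may differ):

```rust
use std::collections::HashMap;

// Stand-in types for illustration only.
struct TaskTypeExpertise { success_rate: f64, confidence: f64 }
struct LearningProfile {
    agent_id: String,
    expertise: HashMap<String, TaskTypeExpertise>,
}

fn create_learning_profile(agent_id: &str) -> LearningProfile {
    LearningProfile { agent_id: agent_id.to_string(), expertise: HashMap::new() }
}

fn add_task_type_expertise(p: &mut LearningProfile, task_type: &str, e: TaskTypeExpertise) {
    p.expertise.insert(task_type.to_string(), e);
}
```
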
## Files Modified
### vapora-knowledge-graph/src/persistence.rs (+30 lines)
- get_executions_for_task_type(agent_id, task_type, limit)
- get_agent_executions(agent_id, limit)
### vapora-agents/src/coordinator.rs (+100 lines)
- load_learning_profile_from_kg() - core KG integration method
- load_all_learning_profiles() - batch loading for agents
- assign_task() already uses learning-based scoring via AgentScoringService
### Existing Complete Implementation
- vapora-knowledge-graph/src/learning.rs - calculation functions
- vapora-agents/src/learning_profile.rs - data structures and expertise
- vapora-agents/src/scoring.rs - unified scoring service
- vapora-agents/src/profile_adapter.rs - adapter methods
## Tests Passing
- learning_profile: 7 tests ✅
- scoring: 5 tests ✅
- profile_adapter: 6 tests ✅
- coordinator: learning-specific tests ✅
## Data Flow
1. Task arrives → AgentCoordinator::assign_task()
2. Extract task_type from description
3. Query KG for task-type executions (load_learning_profile_from_kg)
4. Calculate expertise with recency bias
5. Score candidates (SwarmCoordinator + learning)
6. Assign to top-scored agent
7. Execution result → KG → Update learning profiles (the selection steps are condensed in the sketch below)
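
A self-contained sketch of steps 5-6 plus the load-based fallback from the assignment section; every type here is a stand-in for the real coordinator and scoring structures, and treating 1.0 - load as the base score is an assumption:

```rust
use std::collections::HashMap;

struct Agent { id: String, load: f64 }
// (expertise, confidence) per agent for the extracted task type, as
// hydrated from the Knowledge Graph.
type Profiles = HashMap<String, (f64, f64)>;

fn assign(agents: &[Agent], profiles: &Profiles) -> Option<String> {
    if profiles.is_empty() {
        // Fallback: no learning data yet, pick the least-loaded agent.
        return agents
            .iter()
            .min_by(|a, b| a.load.partial_cmp(&b.load).unwrap())
            .map(|a| a.id.clone());
    }
    let score = |a: &Agent| {
        let (expertise, confidence) = profiles.get(&a.id).copied().unwrap_or((0.0, 0.0));
        // Steps 5-6: blend base availability with learned expertise.
        0.3 * (1.0 - a.load) + 0.5 * expertise + 0.2 * confidence
    };
    agents
        .iter()
        .max_by(|a, b| score(a).partial_cmp(&score(b)).unwrap())
        .map(|a| a.id.clone())
}
```
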
## Key Design Decisions
✅ Recency bias: 7-day half-life with 3x weight for recent performance
✅ Confidence scoring: min(1.0, total_executions / 20) prevents overfitting
✅ Hierarchical scoring: 30% base load, 50% expertise, 20% confidence
✅ KG query limit: 100 recent executions per task-type for performance
✅ Async loading: load_learning_profile_from_kg supports concurrent loads
## Next: Phase 5.4 - Cost Optimization
Ready to implement budget enforcement and cost-aware provider selection.

```rust
// vapora-backend: WebSocket handler for real-time workflow updates
// Phase 3: Stream workflow progress to connected clients

use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tracing::{debug, error};

#[derive(Clone, Debug, Serialize, Deserialize)]
#[allow(dead_code)]
pub struct WorkflowUpdate {
    pub workflow_id: String,
    pub status: String,
    pub progress: u32,
    pub message: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

impl WorkflowUpdate {
    pub fn new(workflow_id: String, status: String, progress: u32, message: String) -> Self {
        Self {
            workflow_id,
            status,
            progress,
            message,
            timestamp: chrono::Utc::now(),
        }
    }
}

/// Broadcaster for workflow updates
#[allow(dead_code)]
pub struct WorkflowBroadcaster {
    tx: broadcast::Sender<WorkflowUpdate>,
}

impl WorkflowBroadcaster {
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(100);
        Self { tx }
    }

    /// Send workflow update to all subscribers
    pub fn send_update(&self, update: WorkflowUpdate) {
        debug!(
            "Broadcasting update for workflow {}: {} ({}%)",
            update.workflow_id, update.message, update.progress
        );

        // broadcast::Sender::send only errors when no receivers are subscribed.
        if let Err(e) = self.tx.send(update) {
            error!("Failed to broadcast update: {}", e);
        }
    }

    /// Subscribe to workflow updates
    pub fn subscribe(&self) -> broadcast::Receiver<WorkflowUpdate> {
        self.tx.subscribe()
    }

    /// Get subscriber count
    pub fn subscriber_count(&self) -> usize {
        self.tx.receiver_count()
    }
}

impl Default for WorkflowBroadcaster {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for WorkflowBroadcaster {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

// Note: WebSocket support requires ws feature in axum
// For Phase 4, we focus on the broadcaster infrastructure
// WebSocket handlers would be added when the ws feature is enabled
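
// A hedged sketch of what such a handler could look like once the `ws`
// feature is enabled. Kept commented out because it does not compile
// without that feature; the route wiring, state extraction, and names
// below are assumptions, not the shipped API surface.
//
// use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
// use axum::extract::State;
// use axum::response::IntoResponse;
//
// async fn workflow_ws(
//     ws: WebSocketUpgrade,
//     State(broadcaster): State<WorkflowBroadcaster>,
// ) -> impl IntoResponse {
//     ws.on_upgrade(move |socket| handle_socket(socket, broadcaster))
// }
//
// async fn handle_socket(mut socket: WebSocket, broadcaster: WorkflowBroadcaster) {
//     let mut rx = broadcaster.subscribe();
//     // Forward every broadcast update to this client as a JSON text frame.
//     while let Ok(update) = rx.recv().await {
//         match serde_json::to_string(&update) {
//             Ok(json) => {
//                 if socket.send(Message::Text(json)).await.is_err() {
//                     break; // client went away
//                 }
//             }
//             Err(e) => error!("Failed to serialize update: {}", e),
//         }
//     }
// }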

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_broadcaster_creation() {
        let broadcaster = WorkflowBroadcaster::new();
        assert_eq!(broadcaster.subscriber_count(), 0);
    }

    #[test]
    fn test_subscribe() {
        let broadcaster = WorkflowBroadcaster::new();
        let _rx = broadcaster.subscribe();
        assert_eq!(broadcaster.subscriber_count(), 1);
    }

    #[tokio::test]
    async fn test_send_update() {
        let broadcaster = WorkflowBroadcaster::new();
        let mut rx = broadcaster.subscribe();

        let update = WorkflowUpdate::new(
            "wf-1".to_string(),
            "in_progress".to_string(),
            50,
            "Step 1 completed".to_string(),
        );

        broadcaster.send_update(update.clone());

        let received = rx.recv().await.unwrap();
        assert_eq!(received.workflow_id, "wf-1");
        assert_eq!(received.progress, 50);
    }

    #[tokio::test]
    async fn test_multiple_subscribers() {
        let broadcaster = WorkflowBroadcaster::new();
        let mut rx1 = broadcaster.subscribe();
        let mut rx2 = broadcaster.subscribe();

        let update = WorkflowUpdate::new(
            "wf-1".to_string(),
            "completed".to_string(),
            100,
            "All steps completed".to_string(),
        );

        broadcaster.send_update(update);

        let received1 = rx1.recv().await.unwrap();
        let received2 = rx2.recv().await.unwrap();

        assert_eq!(received1.workflow_id, received2.workflow_id);
        assert_eq!(received1.progress, 100);
        assert_eq!(received2.progress, 100);
    }

    #[test]
    fn test_update_serialization() {
        let update = WorkflowUpdate::new(
            "wf-1".to_string(),
            "running".to_string(),
            75,
            "Almost done".to_string(),
        );

        let json = serde_json::to_string(&update).unwrap();
        let deserialized: WorkflowUpdate = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.workflow_id, "wf-1");
        assert_eq!(deserialized.progress, 75);
    }
}
```