Implement intelligent agent learning from Knowledge Graph execution history with per-task-type expertise tracking, recency bias, and learning curves.

## Phase 5.3 Implementation

### Learning Infrastructure (✅ Complete)

- LearningProfileService with per-task-type expertise metrics
- TaskTypeExpertise model tracking success_rate, confidence, learning curves
- Recency bias weighting: recent 7 days weighted 3x higher (exponential decay)
- Confidence scoring prevents overfitting: min(1.0, executions / 20)
- Learning curves computed from daily execution windows

### Agent Scoring Service (✅ Complete)

- Unified AgentScore combining SwarmCoordinator + learning profiles
- Scoring formula: 0.3*base + 0.5*expertise + 0.2*confidence
- Rank agents by combined score for intelligent assignment
- Support for recency-biased scoring (recent_success_rate)
- Methods: rank_agents, select_best, rank_agents_with_recency

### KG Integration (✅ Complete)

- KGPersistence::get_executions_for_task_type() - query by agent + task type
- KGPersistence::get_agent_executions() - all executions for an agent
- Coordinator::load_learning_profile_from_kg() - core KG→Learning integration
- Coordinator::load_all_learning_profiles() - batch load for multiple agents
- Convert PersistedExecution → ExecutionData for learning calculations

### Agent Assignment Integration (✅ Complete)

- AgentCoordinator uses learning profiles for task assignment
- extract_task_type() infers task type from title/description
- assign_task() scores candidates using AgentScoringService
- Fallback to load-based selection if no learning data is available
- Learning profiles stored in coordinator.learning_profiles RwLock

### Profile Adapter Enhancements (✅ Complete)

- create_learning_profile() - initialize empty profiles
- add_task_type_expertise() - set task-type expertise
- update_profile_with_learning() - update swarm profiles from learning

## Files Modified

### vapora-knowledge-graph/src/persistence.rs (+30 lines)

- get_executions_for_task_type(agent_id, task_type, limit)
- get_agent_executions(agent_id, limit)

### vapora-agents/src/coordinator.rs (+100 lines)

- load_learning_profile_from_kg() - core KG integration method
- load_all_learning_profiles() - batch loading for agents
- assign_task() already uses learning-based scoring via AgentScoringService

### Existing Complete Implementation

- vapora-knowledge-graph/src/learning.rs - calculation functions
- vapora-agents/src/learning_profile.rs - data structures and expertise
- vapora-agents/src/scoring.rs - unified scoring service
- vapora-agents/src/profile_adapter.rs - adapter methods

## Tests Passing

- learning_profile: 7 tests ✅
- scoring: 5 tests ✅
- profile_adapter: 6 tests ✅
- coordinator: learning-specific tests ✅

## Data Flow

1. Task arrives → AgentCoordinator::assign_task()
2. Extract task_type from the description
3. Query KG for task-type executions (load_learning_profile_from_kg)
4. Calculate expertise with recency bias
5. Score candidates (SwarmCoordinator + learning)
6. Assign to the top-scored agent
7. Execution result → KG → Update learning profiles

## Key Design Decisions

- ✅ Recency bias: 7-day half-life with 3x weight for recent performance
- ✅ Confidence scoring: min(1.0, total_executions / 20) prevents overfitting
- ✅ Hierarchical scoring: 30% base load, 50% expertise, 20% confidence
- ✅ KG query limit: 100 recent executions per task type for performance
- ✅ Async loading: load_learning_profile_from_kg supports concurrent loads

## Next: Phase 5.4 - Cost Optimization

Ready to implement budget enforcement and cost-aware provider selection.
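The confidence cap, recency bias, and combined-score formula above can be sketched as standalone functions. This is a simplified, hypothetical form for illustration only: the real implementations live in vapora-knowledge-graph/src/learning.rs and vapora-agents/src/scoring.rs, and the exact shape of the decay curve there (here modeled as a 7-day half-life scaled 3x) is an assumption.

```rust
// Hypothetical standalone sketch of the Phase 5.3 scoring math.

/// Confidence grows with sample size and saturates at 1.0 after 20 executions.
fn confidence(total_executions: u32) -> f64 {
    (total_executions as f64 / 20.0).min(1.0)
}

/// Exponential-decay recency weight: assumes the weight halves every 7 days,
/// scaled so fresh executions count roughly 3x more than old ones.
fn recency_weight(age_days: f64) -> f64 {
    3.0 * 0.5f64.powf(age_days / 7.0)
}

/// Recency-biased success rate over (age_in_days, succeeded) samples.
fn recent_success_rate(executions: &[(f64, bool)]) -> f64 {
    let (mut num, mut den) = (0.0, 0.0);
    for &(age, ok) in executions {
        let w = recency_weight(age);
        den += w;
        if ok {
            num += w;
        }
    }
    if den == 0.0 { 0.0 } else { num / den }
}

/// Combined agent score: 30% base load, 50% expertise, 20% confidence.
fn combined_score(base: f64, expertise: f64, confidence: f64) -> f64 {
    0.3 * base + 0.5 * expertise + 0.2 * confidence
}

fn main() {
    // 10 executions -> confidence 0.5 (half-way to the 20-execution cap).
    let c = confidence(10);
    println!("confidence(10) = {c}");

    // A recent success outweighs a two-week-old failure: 3.0 vs 0.75.
    let rate = recent_success_rate(&[(0.0, true), (14.0, false)]);
    println!("recent_success_rate = {rate}");

    // 0.3*0.8 + 0.5*0.9 + 0.2*0.5 = 0.79
    let score = combined_score(0.8, 0.9, c);
    println!("combined = {score:.2}");
}
```

Under this weighting, expertise dominates (50%), so an agent with a strong task-type track record can win assignment over a less-loaded but unproven one, while the confidence term keeps a lucky 2-for-2 newcomer from outranking a seasoned agent.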
```rust
// vapora-backend: Audit trail system
// Phase 3: Track all workflow events and actions

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEntry {
    pub id: String,
    pub timestamp: DateTime<Utc>,
    pub workflow_id: String,
    pub event_type: String,
    pub actor: String,
    pub details: serde_json::Value,
}

impl AuditEntry {
    pub fn new(
        workflow_id: String,
        event_type: String,
        actor: String,
        details: serde_json::Value,
    ) -> Self {
        Self {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            workflow_id,
            event_type,
            actor,
            details,
        }
    }
}

/// Audit trail maintains history of workflow events
pub struct AuditTrail {
    entries: Arc<RwLock<Vec<AuditEntry>>>,
}

impl AuditTrail {
    pub fn new() -> Self {
        Self {
            entries: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Log a workflow event
    pub async fn log_event(
        &self,
        workflow_id: String,
        event_type: String,
        actor: String,
        details: serde_json::Value,
    ) {
        let entry = AuditEntry::new(workflow_id, event_type, actor, details);
        let mut entries = self.entries.write().await;
        entries.push(entry);
    }

    /// Get audit entries for a workflow
    pub async fn get_workflow_audit(&self, workflow_id: &str) -> Vec<AuditEntry> {
        let entries = self.entries.read().await;
        entries
            .iter()
            .filter(|e| e.workflow_id == workflow_id)
            .cloned()
            .collect()
    }

    /// Get all audit entries
    pub async fn get_all_entries(&self) -> Vec<AuditEntry> {
        let entries = self.entries.read().await;
        entries.clone()
    }

    /// Get entries by event type
    pub async fn get_by_event_type(&self, event_type: &str) -> Vec<AuditEntry> {
        let entries = self.entries.read().await;
        entries
            .iter()
            .filter(|e| e.event_type == event_type)
            .cloned()
            .collect()
    }

    /// Get entries by actor
    pub async fn get_by_actor(&self, actor: &str) -> Vec<AuditEntry> {
        let entries = self.entries.read().await;
        entries
            .iter()
            .filter(|e| e.actor == actor)
            .cloned()
            .collect()
    }

    /// Clear all entries (for testing)
    pub async fn clear(&self) {
        let mut entries = self.entries.write().await;
        entries.clear();
    }
}

impl Default for AuditTrail {
    fn default() -> Self {
        Self::new()
    }
}

/// Event types for audit trail
pub mod events {
    pub const WORKFLOW_CREATED: &str = "workflow_created";
    pub const WORKFLOW_STARTED: &str = "workflow_started";
    pub const WORKFLOW_COMPLETED: &str = "workflow_completed";
    pub const WORKFLOW_FAILED: &str = "workflow_failed";
    pub const WORKFLOW_ROLLED_BACK: &str = "workflow_rolled_back";
    pub const PHASE_STARTED: &str = "phase_started";
    pub const PHASE_COMPLETED: &str = "phase_completed";
    pub const STEP_STARTED: &str = "step_started";
    pub const STEP_COMPLETED: &str = "step_completed";
    pub const STEP_FAILED: &str = "step_failed";
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_audit_trail_creation() {
        let audit = AuditTrail::new();
        assert!(audit.get_all_entries().await.is_empty());
    }

    #[tokio::test]
    async fn test_log_event() {
        let audit = AuditTrail::new();

        audit
            .log_event(
                "wf-1".to_string(),
                events::WORKFLOW_STARTED.to_string(),
                "system".to_string(),
                serde_json::json!({"test": "data"}),
            )
            .await;

        let entries = audit.get_all_entries().await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].workflow_id, "wf-1");
        assert_eq!(entries[0].event_type, events::WORKFLOW_STARTED);
    }

    #[tokio::test]
    async fn test_get_workflow_audit() {
        let audit = AuditTrail::new();

        audit
            .log_event(
                "wf-1".to_string(),
                events::WORKFLOW_STARTED.to_string(),
                "system".to_string(),
                serde_json::json!({}),
            )
            .await;

        audit
            .log_event(
                "wf-2".to_string(),
                events::WORKFLOW_STARTED.to_string(),
                "system".to_string(),
                serde_json::json!({}),
            )
            .await;

        let entries = audit.get_workflow_audit("wf-1").await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].workflow_id, "wf-1");
    }

    #[tokio::test]
    async fn test_filter_by_event_type() {
        let audit = AuditTrail::new();

        audit
            .log_event(
                "wf-1".to_string(),
                events::WORKFLOW_STARTED.to_string(),
                "system".to_string(),
                serde_json::json!({}),
            )
            .await;

        audit
            .log_event(
                "wf-1".to_string(),
                events::WORKFLOW_COMPLETED.to_string(),
                "system".to_string(),
                serde_json::json!({}),
            )
            .await;

        let entries = audit.get_by_event_type(events::WORKFLOW_STARTED).await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_type, events::WORKFLOW_STARTED);
    }

    #[tokio::test]
    async fn test_filter_by_actor() {
        let audit = AuditTrail::new();

        audit
            .log_event(
                "wf-1".to_string(),
                events::WORKFLOW_STARTED.to_string(),
                "user-1".to_string(),
                serde_json::json!({}),
            )
            .await;

        audit
            .log_event(
                "wf-2".to_string(),
                events::WORKFLOW_STARTED.to_string(),
                "user-2".to_string(),
                serde_json::json!({}),
            )
            .await;

        let entries = audit.get_by_actor("user-1").await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].actor, "user-1");
    }
}
```
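The log-then-filter pattern used by AuditTrail above can be shown with a minimal synchronous sketch. To stay dependency-free it swaps tokio's async RwLock for std::sync::RwLock and reduces an entry to three strings; the workflow and actor IDs ("wf-1", "agent-7", etc.) are illustrative, not taken from the real system.

```rust
// Minimal synchronous sketch of the AuditTrail log/query pattern.
use std::sync::RwLock;

struct MiniTrail {
    // (workflow_id, event_type, actor) in insertion order
    entries: RwLock<Vec<(String, String, String)>>,
}

impl MiniTrail {
    fn new() -> Self {
        Self { entries: RwLock::new(Vec::new()) }
    }

    /// Append one event under a write lock.
    fn log(&self, workflow_id: &str, event_type: &str, actor: &str) {
        self.entries.write().unwrap().push((
            workflow_id.to_string(),
            event_type.to_string(),
            actor.to_string(),
        ));
    }

    /// Filter-and-clone under a read lock, mirroring get_workflow_audit().
    fn for_workflow(&self, workflow_id: &str) -> Vec<(String, String, String)> {
        self.entries
            .read()
            .unwrap()
            .iter()
            .filter(|e| e.0 == workflow_id)
            .cloned()
            .collect()
    }
}

fn main() {
    let trail = MiniTrail::new();
    // Event names mirror the constants in the `events` module.
    trail.log("wf-1", "workflow_started", "system");
    trail.log("wf-1", "step_completed", "agent-7");
    trail.log("wf-2", "workflow_started", "system");

    let wf1 = trail.for_workflow("wf-1");
    println!("wf-1 has {} entries", wf1.len()); // 2
}
```

Because queries clone matching entries out of the lock rather than returning references, callers never hold the lock while inspecting results; the async version pays the same copy cost for the same reason.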