Vapora/crates/vapora-backend/src/services/workflow_service.rs

298 lines
9.6 KiB
Rust

feat: Phase 5.3 - Multi-Agent Learning Infrastructure

Implement intelligent agent learning from Knowledge Graph execution history with per-task-type expertise tracking, recency bias, and learning curves.

## Phase 5.3 Implementation

### Learning Infrastructure (✅ Complete)
- LearningProfileService with per-task-type expertise metrics
- TaskTypeExpertise model tracking success_rate, confidence, learning curves
- Recency bias weighting: recent 7 days weighted 3x higher (exponential decay)
- Confidence scoring prevents overfitting: min(1.0, executions / 20)
- Learning curves computed from daily execution windows

### Agent Scoring Service (✅ Complete)
- Unified AgentScore combining SwarmCoordinator + learning profiles
- Scoring formula: 0.3*base + 0.5*expertise + 0.2*confidence
- Rank agents by combined score for intelligent assignment
- Support for recency-biased scoring (recent_success_rate)
- Methods: rank_agents, select_best, rank_agents_with_recency

### KG Integration (✅ Complete)
- KGPersistence::get_executions_for_task_type() - query by agent + task type
- KGPersistence::get_agent_executions() - all executions for agent
- Coordinator::load_learning_profile_from_kg() - core KG→Learning integration
- Coordinator::load_all_learning_profiles() - batch load for multiple agents
- Convert PersistedExecution → ExecutionData for learning calculations

### Agent Assignment Integration (✅ Complete)
- AgentCoordinator uses learning profiles for task assignment
- extract_task_type() infers task type from title/description
- assign_task() scores candidates using AgentScoringService
- Fallback to load-based selection if no learning data available
- Learning profiles stored in coordinator.learning_profiles RwLock

### Profile Adapter Enhancements (✅ Complete)
- create_learning_profile() - initialize empty profiles
- add_task_type_expertise() - set task-type expertise
- update_profile_with_learning() - update swarm profiles from learning

## Files Modified

### vapora-knowledge-graph/src/persistence.rs (+30 lines)
- get_executions_for_task_type(agent_id, task_type, limit)
- get_agent_executions(agent_id, limit)

### vapora-agents/src/coordinator.rs (+100 lines)
- load_learning_profile_from_kg() - core KG integration method
- load_all_learning_profiles() - batch loading for agents
- assign_task() already uses learning-based scoring via AgentScoringService

### Existing Complete Implementation
- vapora-knowledge-graph/src/learning.rs - calculation functions
- vapora-agents/src/learning_profile.rs - data structures and expertise
- vapora-agents/src/scoring.rs - unified scoring service
- vapora-agents/src/profile_adapter.rs - adapter methods

## Tests Passing
- learning_profile: 7 tests ✅
- scoring: 5 tests ✅
- profile_adapter: 6 tests ✅
- coordinator: learning-specific tests ✅

## Data Flow
1. Task arrives → AgentCoordinator::assign_task()
2. Extract task_type from description
3. Query KG for task-type executions (load_learning_profile_from_kg)
4. Calculate expertise with recency bias
5. Score candidates (SwarmCoordinator + learning)
6. Assign to top-scored agent
7. Execution result → KG → Update learning profiles

## Key Design Decisions
✅ Recency bias: 7-day half-life with 3x weight for recent performance
✅ Confidence scoring: min(1.0, total_executions / 20) prevents overfitting
✅ Hierarchical scoring: 30% base load, 50% expertise, 20% confidence
✅ KG query limit: 100 recent executions per task-type for performance
✅ Async loading: load_learning_profile_from_kg supports concurrent loads

## Next: Phase 5.4 - Cost Optimization
Ready to implement budget enforcement and cost-aware provider selection.
2026-01-11 13:03:53 +00:00
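
The scoring formula and confidence cap quoted in the commit message can be illustrated with a small, std-only sketch. This is not the code in `vapora-agents/src/scoring.rs`; the `Candidate` struct, the function names, and the assumption that `base` and `expertise` are already normalized to the range 0.0..=1.0 are hypothetical and exist only for illustration.

```rust
/// Hypothetical candidate record; field names are assumptions for this sketch.
pub struct Candidate<'a> {
    pub agent_id: &'a str,
    /// Base score (assumed to be normalized to 0.0..=1.0).
    pub base: f64,
    /// Task-type expertise, e.g. a success rate (assumed 0.0..=1.0).
    pub expertise: f64,
    /// Number of recorded executions for this task type.
    pub total_executions: u32,
}

/// Confidence as described in the commit message: min(1.0, executions / 20).
pub fn confidence(total_executions: u32) -> f64 {
    (f64::from(total_executions) / 20.0).min(1.0)
}

/// Combined score as described: 0.3*base + 0.5*expertise + 0.2*confidence.
pub fn combined_score(base: f64, expertise: f64, confidence: f64) -> f64 {
    0.3 * base + 0.5 * expertise + 0.2 * confidence
}

/// Return the id of the highest-scoring candidate, or None for an empty slice.
pub fn best_agent<'a>(candidates: &[Candidate<'a>]) -> Option<&'a str> {
    candidates
        .iter()
        .map(|c| {
            let score = combined_score(c.base, c.expertise, confidence(c.total_executions));
            (c.agent_id, score)
        })
        .max_by(|a, b| a.1.total_cmp(&b.1))
        .map(|(agent_id, _)| agent_id)
}
```

With these weights, an agent with little history is held back by its low confidence term, so a strong success rate over only a handful of runs cannot dominate the ranking.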
// vapora-backend: Workflow service
// Phase 3: Service layer for workflow management
use crate::api::websocket::{WorkflowBroadcaster, WorkflowUpdate};
use crate::audit::{events, AuditEntry, AuditTrail};
use crate::workflow::{EngineError, Workflow, WorkflowEngine};
use std::sync::Arc;
use thiserror::Error;
use tracing::{error, info};

#[derive(Debug, Error)]
pub enum WorkflowServiceError {
    #[error("Engine error: {0}")]
    EngineError(#[from] EngineError),

    #[error("Workflow not found: {0}")]
    NotFound(String),

    #[error("Invalid operation: {0}")]
    InvalidOperation(String),
}

/// Workflow service provides high-level workflow operations
pub struct WorkflowService {
    engine: Arc<WorkflowEngine>,
    broadcaster: Arc<WorkflowBroadcaster>,
    audit: Arc<AuditTrail>,
}

impl WorkflowService {
    pub fn new(
        engine: Arc<WorkflowEngine>,
        broadcaster: Arc<WorkflowBroadcaster>,
        audit: Arc<AuditTrail>,
    ) -> Self {
        Self {
            engine,
            broadcaster,
            audit,
        }
    }

    /// Create and register a new workflow
    pub async fn create_workflow(
        &self,
        workflow: Workflow,
    ) -> Result<Workflow, WorkflowServiceError> {
        let workflow_id = workflow.id.clone();
        let title = workflow.title.clone();

        // Register with engine
        self.engine.register_workflow(workflow.clone()).await?;

        // Audit event
        self.audit
            .log_event(
                workflow_id.clone(),
                events::WORKFLOW_CREATED.to_string(),
                "system".to_string(),
                serde_json::json!({
                    "title": title,
                    "phases": workflow.phases.len(),
                }),
            )
            .await;

        // Broadcast update
        self.broadcaster.send_update(WorkflowUpdate::new(
            workflow_id.clone(),
            "created".to_string(),
            0,
            format!("Workflow '{}' created", title),
        ));

        info!("Created workflow: {} ({})", workflow_id, title);

        Ok(workflow)
    }

    /// Execute a workflow
    pub async fn execute_workflow(
        &self,
        workflow_id: &str,
    ) -> Result<Workflow, WorkflowServiceError> {
        info!("Executing workflow: {}", workflow_id);

        // Broadcast start
        self.broadcaster.send_update(WorkflowUpdate::new(
            workflow_id.to_string(),
            "starting".to_string(),
            0,
            "Workflow execution started".to_string(),
        ));

        // Audit event
        self.audit
            .log_event(
                workflow_id.to_string(),
                events::WORKFLOW_STARTED.to_string(),
                "system".to_string(),
                serde_json::json!({}),
            )
            .await;

        // Execute workflow
        let result = self.engine.execute_workflow(workflow_id).await;

        match result {
            Ok(workflow) => {
                let status = format!("{:?}", workflow.status);
                let progress = workflow.progress_percent();

                // Broadcast completion
                self.broadcaster.send_update(WorkflowUpdate::new(
                    workflow_id.to_string(),
                    status.clone(),
                    progress,
                    "Workflow execution completed".to_string(),
                ));

                // Audit event
                self.audit
                    .log_event(
                        workflow_id.to_string(),
                        events::WORKFLOW_COMPLETED.to_string(),
                        "system".to_string(),
                        serde_json::json!({
                            "status": status,
                            "progress": progress,
                        }),
                    )
                    .await;

                info!("Workflow {} completed with status: {}", workflow_id, status);

                Ok(workflow)
            }
            Err(e) => {
                let error_msg = format!("{}", e);

                // Broadcast failure
                self.broadcaster.send_update(WorkflowUpdate::new(
                    workflow_id.to_string(),
                    "failed".to_string(),
                    0,
                    format!("Workflow execution failed: {}", error_msg),
                ));

                // Audit event
                self.audit
                    .log_event(
                        workflow_id.to_string(),
                        events::WORKFLOW_FAILED.to_string(),
                        "system".to_string(),
                        serde_json::json!({
                            "error": error_msg,
                        }),
                    )
                    .await;

                error!("Workflow {} failed: {}", workflow_id, error_msg);

                Err(WorkflowServiceError::from(e))
            }
        }
    }

    /// Get workflow by ID
    pub async fn get_workflow(&self, workflow_id: &str) -> Result<Workflow, WorkflowServiceError> {
        self.engine
            .get_workflow(workflow_id)
            .await
            .ok_or_else(|| WorkflowServiceError::NotFound(workflow_id.to_string()))
    }

    /// List all workflows
    pub async fn list_workflows(&self) -> Vec<Workflow> {
        self.engine.list_workflows().await
    }

    /// Rollback a failed workflow
    pub async fn rollback_workflow(&self, workflow_id: &str) -> Result<(), WorkflowServiceError> {
        info!("Rolling back workflow: {}", workflow_id);

        self.engine.rollback_workflow(workflow_id).await?;

        // Broadcast rollback
        self.broadcaster.send_update(WorkflowUpdate::new(
            workflow_id.to_string(),
            "rolled_back".to_string(),
            0,
            "Workflow rolled back".to_string(),
        ));

        // Audit event
        self.audit
            .log_event(
                workflow_id.to_string(),
                events::WORKFLOW_ROLLED_BACK.to_string(),
                "system".to_string(),
                serde_json::json!({}),
            )
            .await;

        Ok(())
    }

    /// Get audit trail for workflow
    pub async fn get_audit_trail(&self, workflow_id: &str) -> Vec<AuditEntry> {
        self.audit.get_workflow_audit(workflow_id).await
    }

    /// Get broadcaster reference
    pub fn broadcaster(&self) -> Arc<WorkflowBroadcaster> {
        Arc::clone(&self.broadcaster)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::{
        executor::StepExecutor,
        state::{Phase, StepStatus, WorkflowStep},
    };
    use vapora_agents::{coordinator::AgentCoordinator, registry::AgentRegistry};

    fn create_test_workflow() -> Workflow {
        Workflow::new(
            "test-wf-1".to_string(),
            "Test Workflow".to_string(),
            vec![Phase {
                id: "phase1".to_string(),
                name: "Phase 1".to_string(),
                status: StepStatus::Pending,
                parallel: false,
                estimated_hours: 1.0,
                steps: vec![WorkflowStep {
                    id: "step1".to_string(),
                    name: "Step 1".to_string(),
                    agent_role: "developer".to_string(),
                    status: StepStatus::Pending,
                    depends_on: vec![],
                    can_parallelize: true,
                    started_at: None,
                    completed_at: None,
                    result: None,
                    error: None,
                }],
            }],
        )
    }

    #[tokio::test]
    async fn test_service_creation() {
        let registry = Arc::new(AgentRegistry::new(5));
        let coordinator = Arc::new(AgentCoordinator::new(registry));
        let executor = StepExecutor::new(coordinator);
        let engine = Arc::new(WorkflowEngine::new(executor));
        let broadcaster = Arc::new(WorkflowBroadcaster::new());
        let audit = Arc::new(AuditTrail::new());

        let service = WorkflowService::new(engine, broadcaster, audit);
        assert!(service.list_workflows().await.is_empty());
    }

    #[tokio::test]
    async fn test_create_workflow() {
        let registry = Arc::new(AgentRegistry::new(5));
        let coordinator = Arc::new(AgentCoordinator::new(registry));
        let executor = StepExecutor::new(coordinator);
        let engine = Arc::new(WorkflowEngine::new(executor));
        let broadcaster = Arc::new(WorkflowBroadcaster::new());
        let audit = Arc::new(AuditTrail::new());

        let service = WorkflowService::new(engine, broadcaster, audit);

        let workflow = create_test_workflow();
        let id = workflow.id.clone();

        let result = service.create_workflow(workflow).await;
        assert!(result.is_ok());

        let retrieved = service.get_workflow(&id).await;
        assert!(retrieved.is_ok());
        assert_eq!(retrieved.unwrap().id, id);
    }

    #[tokio::test]
    async fn test_audit_trail_logging() {
        let registry = Arc::new(AgentRegistry::new(5));
        let coordinator = Arc::new(AgentCoordinator::new(registry));
        let executor = StepExecutor::new(coordinator);
        let engine = Arc::new(WorkflowEngine::new(executor));
        let broadcaster = Arc::new(WorkflowBroadcaster::new());
        let audit = Arc::new(AuditTrail::new());

        let service = WorkflowService::new(engine, broadcaster, audit);

        let workflow = create_test_workflow();
        let id = workflow.id.clone();

        service.create_workflow(workflow).await.unwrap();

        let audit_entries = service.get_audit_trail(&id).await;
        assert!(!audit_entries.is_empty());
        assert_eq!(audit_entries[0].event_type, events::WORKFLOW_CREATED);
    }
}
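
The error mapping used by the service above (`?` converting an engine error through `#[from]`, and `ok_or_else` turning a missing workflow into `NotFound`) can be sketched without `thiserror` or `tokio`. This is a synchronous, std-only illustration rather than the project's code: `ToyEngine`, `ServiceError`, `create`, `lookup`, and the stand-in `EngineError` struct are all hypothetical names introduced for the example.

```rust
use std::collections::HashMap;
use std::fmt;

/// Hypothetical stand-in for the engine's error type.
#[derive(Debug)]
pub struct EngineError(pub String);

impl fmt::Display for EngineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for EngineError {}

/// Hand-written equivalent of a two-variant `thiserror` enum.
#[derive(Debug)]
pub enum ServiceError {
    Engine(EngineError),
    NotFound(String),
}

impl fmt::Display for ServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ServiceError::Engine(e) => write!(f, "Engine error: {}", e),
            ServiceError::NotFound(id) => write!(f, "Workflow not found: {}", id),
        }
    }
}

impl std::error::Error for ServiceError {}

/// This impl is what a `#[from]` attribute would generate; it lets `?` convert.
impl From<EngineError> for ServiceError {
    fn from(e: EngineError) -> Self {
        ServiceError::Engine(e)
    }
}

/// Hypothetical in-memory engine mapping workflow id to title.
pub struct ToyEngine {
    workflows: HashMap<String, String>,
}

impl ToyEngine {
    pub fn new() -> Self {
        Self { workflows: HashMap::new() }
    }

    /// Fails with an EngineError when the id is already registered.
    pub fn register(&mut self, id: &str, title: &str) -> Result<(), EngineError> {
        if self.workflows.contains_key(id) {
            return Err(EngineError(format!("duplicate workflow id: {}", id)));
        }
        self.workflows.insert(id.to_string(), title.to_string());
        Ok(())
    }

    pub fn get(&self, id: &str) -> Option<String> {
        self.workflows.get(id).cloned()
    }
}

/// `?` converts EngineError into ServiceError via the From impl above.
pub fn create(engine: &mut ToyEngine, id: &str, title: &str) -> Result<(), ServiceError> {
    engine.register(id, title)?;
    Ok(())
}

/// A missing entry becomes a NotFound error carrying the requested id.
pub fn lookup(engine: &ToyEngine, id: &str) -> Result<String, ServiceError> {
    engine
        .get(id)
        .ok_or_else(|| ServiceError::NotFound(id.to_string()))
}
```

Keeping the conversion in a `From` impl means callers can match on the service-level variant while the underlying engine error stays attached for logging.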