// vapora-backend: Workflow service // Phase 3: Service layer for workflow management use std::sync::Arc; use thiserror::Error; use tracing::{error, info}; use crate::api::websocket::{WorkflowBroadcaster, WorkflowUpdate}; use crate::audit::{events, AuditEntry, AuditTrail}; use crate::workflow::{EngineError, Workflow, WorkflowEngine}; #[derive(Debug, Error)] pub enum WorkflowServiceError { #[error("Engine error: {0}")] EngineError(#[from] EngineError), #[error("Workflow not found: {0}")] NotFound(String), #[error("Invalid operation: {0}")] InvalidOperation(String), } /// Workflow service provides high-level workflow operations pub struct WorkflowService { engine: Arc, broadcaster: Arc, audit: Arc, } impl WorkflowService { pub fn new( engine: Arc, broadcaster: Arc, audit: Arc, ) -> Self { Self { engine, broadcaster, audit, } } /// Create and register a new workflow pub async fn create_workflow( &self, workflow: Workflow, ) -> Result { let workflow_id = workflow.id.clone(); let title = workflow.title.clone(); // Register with engine self.engine.register_workflow(workflow.clone()).await?; // Audit event self.audit .log_event( workflow_id.clone(), events::WORKFLOW_CREATED.to_string(), "system".to_string(), serde_json::json!({ "title": title, "phases": workflow.phases.len(), }), ) .await; // Broadcast update self.broadcaster.send_update(WorkflowUpdate::new( workflow_id.clone(), "created".to_string(), 0, format!("Workflow '{}' created", title), )); info!("Created workflow: {} ({})", workflow_id, title); Ok(workflow) } /// Execute a workflow pub async fn execute_workflow( &self, workflow_id: &str, ) -> Result { info!("Executing workflow: {}", workflow_id); // Broadcast start self.broadcaster.send_update(WorkflowUpdate::new( workflow_id.to_string(), "starting".to_string(), 0, "Workflow execution started".to_string(), )); // Audit event self.audit .log_event( workflow_id.to_string(), events::WORKFLOW_STARTED.to_string(), "system".to_string(), serde_json::json!({}), ) .await; // Execute workflow let result = self.engine.execute_workflow(workflow_id).await; match result { Ok(workflow) => { let status = format!("{:?}", workflow.status); let progress = workflow.progress_percent(); // Broadcast completion self.broadcaster.send_update(WorkflowUpdate::new( workflow_id.to_string(), status.clone(), progress, "Workflow execution completed".to_string(), )); // Audit event self.audit .log_event( workflow_id.to_string(), events::WORKFLOW_COMPLETED.to_string(), "system".to_string(), serde_json::json!({ "status": status, "progress": progress, }), ) .await; info!("Workflow {} completed with status: {}", workflow_id, status); Ok(workflow) } Err(e) => { let error_msg = format!("{}", e); // Broadcast failure self.broadcaster.send_update(WorkflowUpdate::new( workflow_id.to_string(), "failed".to_string(), 0, format!("Workflow execution failed: {}", error_msg), )); // Audit event self.audit .log_event( workflow_id.to_string(), events::WORKFLOW_FAILED.to_string(), "system".to_string(), serde_json::json!({ "error": error_msg, }), ) .await; error!("Workflow {} failed: {}", workflow_id, error_msg); Err(WorkflowServiceError::from(e)) } } } /// Get workflow by ID pub async fn get_workflow(&self, workflow_id: &str) -> Result { self.engine .get_workflow(workflow_id) .await .ok_or_else(|| WorkflowServiceError::NotFound(workflow_id.to_string())) } /// List all workflows pub async fn list_workflows(&self) -> Vec { self.engine.list_workflows().await } /// Rollback a failed workflow pub async fn rollback_workflow(&self, workflow_id: &str) -> Result<(), WorkflowServiceError> { info!("Rolling back workflow: {}", workflow_id); self.engine.rollback_workflow(workflow_id).await?; // Broadcast rollback self.broadcaster.send_update(WorkflowUpdate::new( workflow_id.to_string(), "rolled_back".to_string(), 0, "Workflow rolled back".to_string(), )); // Audit event self.audit .log_event( workflow_id.to_string(), events::WORKFLOW_ROLLED_BACK.to_string(), "system".to_string(), serde_json::json!({}), ) .await; Ok(()) } /// Get audit trail for workflow pub async fn get_audit_trail(&self, workflow_id: &str) -> Vec { self.audit.get_workflow_audit(workflow_id).await } /// Get broadcaster reference pub fn broadcaster(&self) -> Arc { Arc::clone(&self.broadcaster) } } #[cfg(test)] mod tests { use vapora_agents::{ config::{AgentConfig, RegistryConfig}, coordinator::AgentCoordinator, registry::AgentRegistry, }; use super::*; use crate::workflow::{ executor::StepExecutor, state::{Phase, StepStatus, WorkflowStep}, }; fn create_test_workflow() -> Workflow { Workflow::new( "test-wf-1".to_string(), "Test Workflow".to_string(), vec![Phase { id: "phase1".to_string(), name: "Phase 1".to_string(), status: StepStatus::Pending, parallel: false, estimated_hours: 1.0, steps: vec![WorkflowStep { id: "step1".to_string(), name: "Step 1".to_string(), agent_role: "developer".to_string(), status: StepStatus::Pending, depends_on: vec![], can_parallelize: true, started_at: None, completed_at: None, result: None, error: None, }], }], ) } #[tokio::test] async fn test_service_creation() { let registry = Arc::new(AgentRegistry::new(5)); let config = AgentConfig { registry: RegistryConfig { max_agents_per_role: 5, health_check_interval: 30, agent_timeout: 300, }, agents: vec![], }; let coordinator = Arc::new( AgentCoordinator::new(config, registry) .await .expect("coordinator creation failed"), ); let executor = StepExecutor::new(coordinator); let engine = Arc::new(WorkflowEngine::new(executor)); let broadcaster = Arc::new(WorkflowBroadcaster::new()); let audit = Arc::new(AuditTrail::new()); let service = WorkflowService::new(engine, broadcaster, audit); assert!(service.list_workflows().await.is_empty()); } #[tokio::test] async fn test_create_workflow() { let registry = Arc::new(AgentRegistry::new(5)); let config = AgentConfig { registry: RegistryConfig { max_agents_per_role: 5, health_check_interval: 30, agent_timeout: 300, }, agents: vec![], }; let coordinator = Arc::new( AgentCoordinator::new(config, registry) .await .expect("coordinator creation failed"), ); let executor = StepExecutor::new(coordinator); let engine = Arc::new(WorkflowEngine::new(executor)); let broadcaster = Arc::new(WorkflowBroadcaster::new()); let audit = Arc::new(AuditTrail::new()); let service = WorkflowService::new(engine, broadcaster, audit); let workflow = create_test_workflow(); let id = workflow.id.clone(); let result: Result = service.create_workflow(workflow).await; assert!(result.is_ok()); let retrieved: Result = service.get_workflow(&id).await; assert!(retrieved.is_ok()); assert_eq!(retrieved.unwrap().id, id); } #[tokio::test] async fn test_audit_trail_logging() { let registry = Arc::new(AgentRegistry::new(5)); let config = AgentConfig { registry: RegistryConfig { max_agents_per_role: 5, health_check_interval: 30, agent_timeout: 300, }, agents: vec![], }; let coordinator = Arc::new( AgentCoordinator::new(config, registry) .await .expect("coordinator creation failed"), ); let executor = StepExecutor::new(coordinator); let engine = Arc::new(WorkflowEngine::new(executor)); let broadcaster = Arc::new(WorkflowBroadcaster::new()); let audit = Arc::new(AuditTrail::new()); let service = WorkflowService::new(engine, broadcaster, audit); let workflow = create_test_workflow(); let id = workflow.id.clone(); let _: Result = service.create_workflow(workflow).await; let audit_entries: Vec<_> = service.get_audit_trail(&id).await; assert!(!audit_entries.is_empty()); assert_eq!(audit_entries[0].event_type, events::WORKFLOW_CREATED); } }