//! Workflow execution engine for batch operations //! //! This module provides a configuration-driven batch workflow engine that //! integrates with the existing storage abstraction. It supports dependency //! resolution, parallel execution with limits, and mixed provider operations. use std::{ collections::{HashMap, HashSet, VecDeque}, sync::Arc, time::{Duration, Instant}, }; use anyhow::{Context, Result}; use futures::{stream, StreamExt}; use serde::{Deserialize, Serialize}; use tokio::sync::{RwLock, Semaphore}; use tracing::{debug, error, info}; use uuid::Uuid; use crate::{storage::TaskStorage, TaskStatus, WorkflowTask}; #[cfg(feature = "nats")] use platform_nats::NatsBridge; /// Configuration for workflow execution #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowConfig { /// Maximum number of parallel tasks pub max_parallel_tasks: usize, /// Timeout for individual tasks in seconds pub task_timeout_seconds: u64, /// Maximum retry attempts for failed tasks pub max_retries: u8, /// Delay between retries in seconds pub retry_delay_seconds: u64, /// Whether to fail fast on first error pub fail_fast: bool, /// Checkpoint interval in seconds pub checkpoint_interval_seconds: u64, } impl Default for WorkflowConfig { fn default() -> Self { Self { max_parallel_tasks: 4, task_timeout_seconds: 3600, // 1 hour max_retries: 3, retry_delay_seconds: 30, fail_fast: false, checkpoint_interval_seconds: 300, // 5 minutes } } } /// KCL workflow definition parsed from configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowDefinition { pub name: String, pub description: Option, pub tasks: Vec, pub config: Option, } /// Individual task definition in workflow #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowTaskDefinition { pub name: String, pub command: String, pub args: Vec, pub dependencies: Vec, pub provider: Option, pub timeout_seconds: Option, pub max_retries: Option, pub environment: Option>, pub metadata: Option>, } /// Execution state for a workflow #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowExecutionState { pub workflow_id: String, pub status: WorkflowStatus, pub started_at: chrono::DateTime, pub completed_at: Option>, pub task_states: HashMap, pub execution_graph: DependencyGraph, pub checkpoints: Vec, pub statistics: WorkflowStatistics, } /// Overall workflow status #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum WorkflowStatus { Pending, Running, Completed, Failed, Cancelled, Paused, } impl std::fmt::Display for WorkflowStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { WorkflowStatus::Pending => write!(f, "pending"), WorkflowStatus::Running => write!(f, "running"), WorkflowStatus::Completed => write!(f, "completed"), WorkflowStatus::Failed => write!(f, "failed"), WorkflowStatus::Cancelled => write!(f, "cancelled"), WorkflowStatus::Paused => write!(f, "paused"), } } } /// Execution state for individual task #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskExecutionState { pub task_id: String, pub status: TaskStatus, pub retry_count: u8, pub started_at: Option>, pub completed_at: Option>, pub duration_ms: Option, pub error: Option, } /// Dependency graph for task ordering #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct DependencyGraph { pub nodes: HashSet, pub edges: HashMap>, pub reverse_edges: HashMap>, } impl DependencyGraph { /// Create new empty dependency graph pub fn new() -> Self { Self::default() } /// Add a task node to the graph pub fn add_node(&mut self, task_name: String) { self.nodes.insert(task_name.clone()); self.edges.entry(task_name.clone()).or_default(); self.reverse_edges.entry(task_name).or_default(); } /// Add a dependency edge (from depends on to) pub fn add_dependency(&mut self, from: String, depends_on: String) { self.edges .entry(from.clone()) .or_default() .push(depends_on.clone()); self.reverse_edges.entry(depends_on).or_default().push(from); } /// Get tasks that have no pending dependencies pub fn get_ready_tasks(&self, completed_tasks: &HashSet) -> Vec { self.nodes .iter() .filter(|task| { !completed_tasks.contains(*task) && self .edges .get(*task) .map(|deps| deps.iter().all(|dep| completed_tasks.contains(dep))) .unwrap_or(true) }) .cloned() .collect() } /// Topologically sort tasks pub fn topological_sort(&self) -> Result> { let mut in_degree: HashMap = HashMap::new(); let mut result = Vec::new(); let mut queue = VecDeque::new(); // Calculate in-degrees for node in &self.nodes { in_degree.insert(node.clone(), 0); } for deps in self.edges.values() { for dep in deps { *in_degree.entry(dep.clone()).or_default() += 1; } } // Find nodes with no incoming edges for (node, degree) in &in_degree { if *degree == 0 { queue.push_back(node.clone()); } } // Process queue while let Some(node) = queue.pop_front() { result.push(node.clone()); let Some(dependents) = self.reverse_edges.get(&node) else { continue; }; for dependent in dependents { let Some(degree) = in_degree.get_mut(dependent) else { continue; }; *degree -= 1; if *degree == 0 { queue.push_back(dependent.clone()); } } } if result.len() != self.nodes.len() { return Err(anyhow::anyhow!("Circular dependency detected in workflow")); } Ok(result) } } /// Workflow execution checkpoint for recovery #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowCheckpoint { pub timestamp: chrono::DateTime, pub completed_tasks: HashSet, pub failed_tasks: HashSet, pub state_snapshot: String, } /// Workflow execution statistics #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowStatistics { pub total_tasks: usize, pub completed_tasks: usize, pub failed_tasks: usize, pub running_tasks: usize, pub pending_tasks: usize, pub total_duration_ms: u64, pub average_task_duration_ms: Option, pub throughput_tasks_per_minute: f64, } impl WorkflowStatistics { pub fn new(total_tasks: usize) -> Self { Self { total_tasks, completed_tasks: 0, failed_tasks: 0, running_tasks: 0, pending_tasks: total_tasks, total_duration_ms: 0, average_task_duration_ms: None, throughput_tasks_per_minute: 0.0, } } } /// Main workflow execution engine pub struct BatchWorkflowEngine { storage: Arc, config: WorkflowConfig, execution_states: Arc>>, #[cfg(feature = "nats")] nats: Option>, workspace: Option, } impl BatchWorkflowEngine { /// Create new workflow engine pub fn new(storage: Arc, config: WorkflowConfig) -> Self { Self { storage, config, execution_states: Arc::new(RwLock::new(HashMap::new())), #[cfg(feature = "nats")] nats: None, workspace: None, } } /// Attach a NATS bridge for DAG event emission. #[cfg(feature = "nats")] pub fn with_nats(mut self, nats: Arc) -> Self { self.nats = Some(nats); self } /// Set the workspace name included in DAG NATS event payloads. pub fn with_workspace(mut self, workspace: impl Into) -> Self { self.workspace = Some(workspace.into()); self } /// Emit a DAG lifecycle event to NATS. No-op if NATS is not configured. /// Failures are logged as warnings — event emission must not block execution. #[cfg(feature = "nats")] async fn emit_dag_event(&self, subject: &str, payload: serde_json::Value) { if let Some(nats) = &self.nats { if let Err(e) = nats.publish_json(subject, &payload).await { tracing::warn!(subject, "DAG NATS event emission failed: {}", e); } } } /// Emit a workspace NCL change event — signals ncl-sync to invalidate/re-export. /// Subject contract: `provisioning.workspace.ncl.changed` /// See ADR-024. #[cfg(feature = "nats")] async fn emit_ncl_changed(&self, workspace: Option<&str>, state_path: &str) { let Some(ws) = workspace else { return }; if let Some(nats) = &self.nats { let payload = serde_json::json!({ "workspace": ws, "path": state_path, "import_paths": [], "source": "orchestrator", }); if let Err(e) = nats .publish_json("provisioning.workspace.ncl.changed", &payload) .await { tracing::warn!("NCL changed event emission failed: {}", e); } } } /// Execute a workflow definition. /// /// `workspace` overrides the value set via `with_workspace()` for this /// execution. Pass `None` to fall back to the struct-level workspace. pub async fn execute_workflow( &self, definition: WorkflowDefinition, workspace: Option<&str>, ) -> Result { let workspace: Option = workspace .map(|s| s.to_owned()) .or_else(|| self.workspace.clone()); let workflow_id = Uuid::new_v4().to_string(); info!( "Starting workflow execution: {} ({})", definition.name, workflow_id ); // Build dependency graph let mut graph = DependencyGraph::new(); for task_def in &definition.tasks { graph.add_node(task_def.name.clone()); } for task_def in &definition.tasks { for dep in &task_def.dependencies { graph.add_dependency(task_def.name.clone(), dep.clone()); } } // Validate dependency graph let _sorted_tasks = graph .topological_sort() .context("Failed to resolve task dependencies")?; // Create initial execution state let mut execution_state = WorkflowExecutionState { workflow_id: workflow_id.clone(), status: WorkflowStatus::Running, started_at: chrono::Utc::now(), completed_at: None, task_states: HashMap::new(), execution_graph: graph, checkpoints: Vec::new(), statistics: WorkflowStatistics::new(definition.tasks.len()), }; // Initialize task states for task_def in &definition.tasks { execution_state.task_states.insert( task_def.name.clone(), TaskExecutionState { task_id: Uuid::new_v4().to_string(), status: TaskStatus::Pending, retry_count: 0, started_at: None, completed_at: None, duration_ms: None, error: None, }, ); } // Store initial state { let mut states = self.execution_states.write().await; states.insert(workflow_id.clone(), execution_state.clone()); } // Execute workflow let final_state = self .execute_workflow_internal(workflow_id.clone(), definition, execution_state, workspace) .await?; info!( "Workflow execution completed: {} ({})", final_state.workflow_id, final_state.status ); Ok(final_state) } /// Internal workflow execution logic async fn execute_workflow_internal( &self, workflow_id: String, definition: WorkflowDefinition, mut state: WorkflowExecutionState, #[cfg(feature = "nats")] workspace: Option, #[cfg(not(feature = "nats"))] _workspace: Option, ) -> Result { let semaphore = Arc::new(Semaphore::new(self.config.max_parallel_tasks)); let mut completed_tasks = HashSet::new(); let mut failed_tasks = HashSet::new(); let start_time = Instant::now(); let mut last_checkpoint = Instant::now(); loop { // Check for checkpoint if last_checkpoint.elapsed().as_secs() >= self.config.checkpoint_interval_seconds { self.create_checkpoint(&mut state, &completed_tasks, &failed_tasks) .await?; last_checkpoint = Instant::now(); } // Get ready tasks let ready_tasks = state.execution_graph.get_ready_tasks(&completed_tasks); let pending_ready_tasks: Vec<_> = ready_tasks .into_iter() .filter(|task| { !completed_tasks.contains(task) && !failed_tasks.contains(task) && state .task_states .get(task) .map(|ts| ts.status == TaskStatus::Pending) .unwrap_or(false) }) .collect(); if pending_ready_tasks.is_empty() { // Check if all tasks are done let total_done = completed_tasks.len() + failed_tasks.len(); if total_done >= definition.tasks.len() { break; } // Wait for running tasks tokio::time::sleep(Duration::from_secs(1)).await; continue; } // Execute ready tasks in parallel let tasks_futures = pending_ready_tasks.into_iter().map(|task_name| { let task_def = definition .tasks .iter() .find(|t| t.name == task_name) .unwrap() .clone(); let semaphore = semaphore.clone(); let storage = self.storage.clone(); let workflow_id = workflow_id.clone(); let config = self.config.clone(); // Capture emit data for the async block — the future must be awaited // inside the async context, not dropped in this sync iterator closure. #[cfg(feature = "nats")] let workspace_for_event = workspace.clone(); #[cfg(feature = "nats")] let (started_subject, started_payload) = { let is_hg = task_def .metadata .as_ref() .and_then(|m| m.get("type")) .map(|v| v == "health-gate") .unwrap_or(false); let fid = task_name .split_once("::") .map(|(f, _)| f.to_string()) .unwrap_or_else(|| task_name.clone()); let subject = if is_hg { "provisioning.dag.healthgate.checking" } else { "provisioning.dag.formula.started" }; let payload = serde_json::json!({ "workflow_id": workflow_id, "formula_id": fid, "task": task_name, "workspace": workspace_for_event, }); (subject.to_string(), payload) }; async move { let _permit = semaphore.acquire().await.unwrap(); #[cfg(feature = "nats")] self.emit_dag_event(&started_subject, started_payload).await; self.execute_single_task( workflow_id, task_name.clone(), task_def, storage, config, ) .await } }); // Wait for batch completion let results: Vec<_> = stream::iter(tasks_futures) .buffer_unordered(self.config.max_parallel_tasks) .collect() .await; // Process results for (task_name, result) in results { // Derive formula_id from namespaced task name: "formula-id::task-name" → "formula-id". // For non-namespaced tasks the full name is used as the formula_id. let formula_id = task_name .split_once("::") .map(|(fid, _)| fid) .unwrap_or(&task_name); // Determine if this is a health gate synthetic task. let is_health_gate = definition .tasks .iter() .find(|t| t.name == task_name) .and_then(|t| t.metadata.as_ref()) .and_then(|m| m.get("type")) .map(|v| v == "health-gate") .unwrap_or(false); match result { Ok(_) => { completed_tasks.insert(task_name.clone()); state.statistics.completed_tasks += 1; state.statistics.pending_tasks -= 1; if let Some(task_state) = state.task_states.get_mut(&task_name) { task_state.status = TaskStatus::Completed; task_state.completed_at = Some(chrono::Utc::now()); } #[cfg(feature = "nats")] if is_health_gate { self.emit_dag_event( "provisioning.dag.healthgate.passed", serde_json::json!({ "workflow_id": workflow_id, "gate_id": task_name, }), ) .await; } else { self.emit_dag_event( "provisioning.dag.formula.completed", serde_json::json!({ "workflow_id": workflow_id, "formula_id": formula_id, "task": task_name, "workspace": workspace, }), ) .await; // Nu tasks write .provisioning-state.ncl on transition — signal // ncl-sync so the cache is refreshed before the next read. // Path is conventional (/.provisioning-state.ncl); daemon // also watches filesystem so exact path is best-effort. if let Some(ws) = &workspace { let state_path = format!("{}/.provisioning-state.ncl", ws); self.emit_ncl_changed(Some(ws), &state_path).await; } } } Err(e) => { error!("Task {} failed: {}", task_name, e); failed_tasks.insert(task_name.clone()); state.statistics.failed_tasks += 1; state.statistics.pending_tasks -= 1; if let Some(task_state) = state.task_states.get_mut(&task_name) { task_state.status = TaskStatus::Failed; task_state.completed_at = Some(chrono::Utc::now()); task_state.error = Some(e.to_string()); } #[cfg(feature = "nats")] if is_health_gate { self.emit_dag_event( "provisioning.dag.healthgate.failed", serde_json::json!({ "workflow_id": workflow_id, "gate_id": task_name, "error": e.to_string(), }), ) .await; } else { self.emit_dag_event( "provisioning.dag.formula.failed", serde_json::json!({ "workflow_id": workflow_id, "formula_id": formula_id, "task": task_name, "error": e.to_string(), "workspace": workspace, }), ) .await; } if self.config.fail_fast { state.status = WorkflowStatus::Failed; return Ok(state); } } } } // Update execution state { let mut states = self.execution_states.write().await; states.insert(workflow_id.clone(), state.clone()); } } // Finalize workflow state.completed_at = Some(chrono::Utc::now()); state.statistics.total_duration_ms = start_time.elapsed().as_millis() as u64; if state.statistics.failed_tasks > 0 { state.status = WorkflowStatus::Failed; } else { state.status = WorkflowStatus::Completed; } // Calculate final statistics. // duration_ms is written by execute_single_task into execution_states (the shared map), // not into the local `state` copy. Read from execution_states to get accurate durations. if state.statistics.completed_tasks > 0 { let total_task_duration: u64 = { let states = self.execution_states.read().await; states .get(&workflow_id) .map(|s| s.task_states.values().filter_map(|ts| ts.duration_ms).sum()) .unwrap_or(0) }; state.statistics.average_task_duration_ms = Some(total_task_duration / state.statistics.completed_tasks as u64); } let duration_minutes = state.statistics.total_duration_ms as f64 / 60000.0; if duration_minutes > 0.0 { state.statistics.throughput_tasks_per_minute = state.statistics.completed_tasks as f64 / duration_minutes; } Ok(state) } /// Execute a single task async fn execute_single_task( &self, workflow_id: String, task_name: String, task_def: WorkflowTaskDefinition, storage: Arc, config: WorkflowConfig, ) -> (String, Result<()>) { let task_start = Instant::now(); // Update task state to running { let mut states = self.execution_states.write().await; if let Some(state) = states.get_mut(&workflow_id) { if let Some(task_state) = state.task_states.get_mut(&task_name) { task_state.status = TaskStatus::Running; task_state.started_at = Some(chrono::Utc::now()); } state.statistics.running_tasks += 1; state.statistics.pending_tasks -= 1; } } let result = self .execute_task_with_retry(task_name.clone(), task_def, storage, config) .await; // Update task duration let duration_ms = task_start.elapsed().as_millis() as u64; { let mut states = self.execution_states.write().await; if let Some(state) = states.get_mut(&workflow_id) { if let Some(task_state) = state.task_states.get_mut(&task_name) { task_state.duration_ms = Some(duration_ms); } state.statistics.running_tasks -= 1; } } (task_name, result) } /// Execute task with retry logic async fn execute_task_with_retry( &self, task_name: String, task_def: WorkflowTaskDefinition, storage: Arc, config: WorkflowConfig, ) -> Result<()> { let max_retries = task_def.max_retries.unwrap_or(config.max_retries); let mut last_error = None; for retry in 0..=max_retries { if retry > 0 { info!( "Retrying task {} (attempt {}/{})", task_name, retry + 1, max_retries + 1 ); tokio::time::sleep(Duration::from_secs(config.retry_delay_seconds)).await; } // Create workflow task let workflow_task = WorkflowTask { id: Uuid::new_v4().to_string(), name: task_name.clone(), command: task_def.command.clone(), args: task_def.args.clone(), dependencies: task_def.dependencies.clone(), status: TaskStatus::Pending, created_at: chrono::Utc::now(), started_at: None, completed_at: None, output: None, error: None, tags: Default::default(), }; // Execute via storage backend match storage.enqueue(workflow_task.clone(), 1).await { Ok(_) => { // Wait for completion let timeout = Duration::from_secs( task_def .timeout_seconds .unwrap_or(config.task_timeout_seconds), ); match self .wait_for_task_completion(&workflow_task.id, timeout, storage.clone()) .await { Ok(_) => return Ok(()), Err(e) => { last_error = Some(e); continue; } } } Err(e) => { last_error = Some(e.into()); } } } Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Task execution failed after retries"))) } /// Wait for task completion with timeout async fn wait_for_task_completion( &self, task_id: &str, timeout: Duration, storage: Arc, ) -> Result { let start = Instant::now(); let poll_interval = Duration::from_secs(1); loop { if start.elapsed() > timeout { return Err(anyhow::anyhow!("Task execution timeout")); } match storage.get_task(task_id).await { Ok(Some(task)) => match task.status { TaskStatus::Completed => return Ok(task), TaskStatus::Failed | TaskStatus::Cancelled => { return Err(anyhow::anyhow!( "Task failed: {}", task.error.unwrap_or_else(|| "Unknown error".to_string()) )); } _ => { tokio::time::sleep(poll_interval).await; } }, Ok(None) => { return Err(anyhow::anyhow!("Task not found")); } Err(e) => { return Err(anyhow::anyhow!("Storage error: {}", e)); } } } } /// Create workflow checkpoint async fn create_checkpoint( &self, state: &mut WorkflowExecutionState, completed_tasks: &HashSet, failed_tasks: &HashSet, ) -> Result<()> { let checkpoint = WorkflowCheckpoint { timestamp: chrono::Utc::now(), completed_tasks: completed_tasks.clone(), failed_tasks: failed_tasks.clone(), state_snapshot: serde_json::to_string(state)?, }; state.checkpoints.push(checkpoint); // Keep only last 10 checkpoints if state.checkpoints.len() > 10 { state.checkpoints.drain(0..state.checkpoints.len() - 10); } debug!("Created workflow checkpoint for {}", state.workflow_id); Ok(()) } /// Get workflow execution state pub async fn get_workflow_state(&self, workflow_id: &str) -> Option { let states = self.execution_states.read().await; states.get(workflow_id).cloned() } /// List all workflow states pub async fn list_workflows(&self) -> Vec { let states = self.execution_states.read().await; states.values().cloned().collect() } /// Cancel a running workflow pub async fn cancel_workflow(&self, workflow_id: &str) -> Result<()> { let mut states = self.execution_states.write().await; if let Some(state) = states.get_mut(workflow_id) { if state.status == WorkflowStatus::Running { state.status = WorkflowStatus::Cancelled; state.completed_at = Some(chrono::Utc::now()); info!("Cancelled workflow: {}", workflow_id); } } Ok(()) } } /// Parse KCL workflow definition from JSON pub fn parse_kcl_workflow(json_content: &str) -> Result { serde_json::from_str(json_content).context("Failed to parse KCL workflow definition") } /// Validate workflow definition pub fn validate_workflow_definition(definition: &WorkflowDefinition) -> Result<()> { // Check for duplicate task names let mut task_names = HashSet::new(); for task in &definition.tasks { if !task_names.insert(&task.name) { return Err(anyhow::anyhow!("Duplicate task name: {}", task.name)); } } // Check dependency references for task in &definition.tasks { for dep in &task.dependencies { if !task_names.contains(dep) { return Err(anyhow::anyhow!( "Task '{}' has invalid dependency: '{}'", task.name, dep )); } } } Ok(()) }