// Listing metadata: 751 lines, 24 KiB, Rust
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use tokio::time::{sleep, Duration};
|
|
use uuid::Uuid;
|
|
use chrono::Utc;
|
|
use serde_json;
|
|
|
|
// Simplified test structures for batch workflow testing
|
|
/// Category of work a [`BatchTask`] represents.
///
/// The enum is fieldless, so it is cheap to copy and can be used as a
/// hash-map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskType {
    /// Server-level task.
    Server,
    /// Task-service ("taskserv") task.
    Taskserv,
    /// Cluster-level task.
    Cluster,
}
|
|
|
|
/// Lifecycle state of a [`BatchTask`].
///
/// The enum is fieldless, so it is cheap to copy and can be used as a
/// hash-map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskState {
    /// The task has not been started yet.
    Pending,
    /// The task is currently executing.
    Running,
    /// The task finished successfully.
    Completed,
    /// The task finished with an error.
    Failed,
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct TaskDependency {
|
|
pub task_id: Uuid,
|
|
pub dependency_type: String,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct BatchTask {
|
|
pub id: Uuid,
|
|
pub name: String,
|
|
pub task_type: TaskType,
|
|
pub provider: String,
|
|
pub config: serde_json::Value,
|
|
pub dependencies: Vec<TaskDependency>,
|
|
pub state: TaskState,
|
|
pub created_at: chrono::DateTime<Utc>,
|
|
pub updated_at: chrono::DateTime<Utc>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct BatchWorkflow {
|
|
pub id: Uuid,
|
|
pub name: String,
|
|
pub tasks: Vec<BatchTask>,
|
|
}
|
|
|
|
impl BatchWorkflow {
|
|
pub fn new(id: Uuid, name: String) -> Self {
|
|
Self {
|
|
id,
|
|
name,
|
|
tasks: Vec::new(),
|
|
}
|
|
}
|
|
|
|
pub fn add_task(&mut self, task: BatchTask) {
|
|
self.tasks.push(task);
|
|
}
|
|
|
|
pub fn get_execution_order(&self) -> Result<Vec<Uuid>, String> {
|
|
let mut visited = std::collections::HashSet::new();
|
|
let mut temp_visited = std::collections::HashSet::new();
|
|
let mut result = Vec::new();
|
|
|
|
// Build dependency map
|
|
let mut task_map: HashMap<Uuid, &BatchTask> = HashMap::new();
|
|
for task in &self.tasks {
|
|
task_map.insert(task.id, task);
|
|
}
|
|
|
|
fn visit(
|
|
task_id: Uuid,
|
|
visited: &mut std::collections::HashSet<Uuid>,
|
|
temp_visited: &mut std::collections::HashSet<Uuid>,
|
|
result: &mut Vec<Uuid>,
|
|
task_map: &HashMap<Uuid, &BatchTask>,
|
|
) -> Result<(), String> {
|
|
if temp_visited.contains(&task_id) {
|
|
return Err("Circular dependency detected".to_string());
|
|
}
|
|
if visited.contains(&task_id) {
|
|
return Ok(());
|
|
}
|
|
|
|
temp_visited.insert(task_id);
|
|
|
|
if let Some(task) = task_map.get(&task_id) {
|
|
for dep in &task.dependencies {
|
|
visit(dep.task_id, visited, temp_visited, result, task_map)?;
|
|
}
|
|
}
|
|
|
|
temp_visited.remove(&task_id);
|
|
visited.insert(task_id);
|
|
result.push(task_id);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
for task in &self.tasks {
|
|
if !visited.contains(&task.id) {
|
|
visit(task.id, &mut visited, &mut temp_visited, &mut result, &task_map)?;
|
|
}
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
}
|
|
|
|
// Simple in-memory storage for testing
|
|
#[derive(Debug, Clone)]
|
|
pub struct WorkflowState {
|
|
pub workflow_id: Uuid,
|
|
pub current_tasks: Vec<Uuid>,
|
|
pub completed_tasks: Vec<Uuid>,
|
|
pub failed_tasks: Vec<Uuid>,
|
|
pub checkpoints: Vec<serde_json::Value>,
|
|
pub metadata: serde_json::Value,
|
|
pub created_at: chrono::DateTime<Utc>,
|
|
pub updated_at: chrono::DateTime<Utc>,
|
|
}
|
|
|
|
pub struct SimpleStorage {
|
|
states: Arc<std::sync::Mutex<HashMap<Uuid, WorkflowState>>>,
|
|
}
|
|
|
|
impl SimpleStorage {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
states: Arc::new(std::sync::Mutex::new(HashMap::new())),
|
|
}
|
|
}
|
|
|
|
pub async fn save_workflow_state(&self, state: &WorkflowState) -> Result<(), String> {
|
|
let mut states = self.states.lock().unwrap();
|
|
states.insert(state.workflow_id, state.clone());
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn load_workflow_state(&self, workflow_id: Uuid) -> Result<WorkflowState, String> {
|
|
let states = self.states.lock().unwrap();
|
|
states.get(&workflow_id)
|
|
.cloned()
|
|
.ok_or_else(|| "Workflow state not found".to_string())
|
|
}
|
|
|
|
pub async fn list_workflow_states(&self) -> Result<Vec<Uuid>, String> {
|
|
let states = self.states.lock().unwrap();
|
|
Ok(states.keys().cloned().collect())
|
|
}
|
|
}
|
|
|
|
// Simple metrics collector
|
|
#[derive(Debug, Clone)]
|
|
pub struct WorkflowEvent {
|
|
pub workflow_id: Uuid,
|
|
pub task_id: Option<Uuid>,
|
|
pub event_type: String,
|
|
pub metadata: serde_json::Value,
|
|
pub timestamp: chrono::DateTime<Utc>,
|
|
}
|
|
|
|
/// Aggregate counters for a workflow.
///
/// All fields are plain integers, so the type is `Copy`, comparable, and has
/// an all-zero `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct WorkflowMetrics {
    /// Number of tasks counted for the workflow.
    pub total_tasks: u32,
    /// Number of completed tasks.
    pub completed_tasks: u32,
    /// Number of failed tasks.
    pub failed_tasks: u32,
    /// Total duration in milliseconds.
    pub total_duration_ms: u64,
}
|
|
|
|
pub struct SimpleMetricsCollector {
|
|
events: Arc<std::sync::Mutex<Vec<WorkflowEvent>>>,
|
|
}
|
|
|
|
impl SimpleMetricsCollector {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
events: Arc::new(std::sync::Mutex::new(Vec::new())),
|
|
}
|
|
}
|
|
|
|
pub async fn record_event(&self, event: WorkflowEvent) {
|
|
let mut events = self.events.lock().unwrap();
|
|
events.push(event);
|
|
}
|
|
|
|
pub async fn get_workflow_metrics(&self, workflow_id: Uuid) -> Result<WorkflowMetrics, String> {
|
|
let events = self.events.lock().unwrap();
|
|
let workflow_events: Vec<_> = events
|
|
.iter()
|
|
.filter(|e| e.workflow_id == workflow_id)
|
|
.collect();
|
|
|
|
let task_events: Vec<_> = workflow_events
|
|
.iter()
|
|
.filter(|e| e.task_id.is_some())
|
|
.collect();
|
|
|
|
let mut unique_tasks = std::collections::HashSet::new();
|
|
let mut completed = 0;
|
|
let mut failed = 0;
|
|
|
|
for event in &task_events {
|
|
if let Some(task_id) = event.task_id {
|
|
unique_tasks.insert(task_id);
|
|
match event.event_type.as_str() {
|
|
"task_completed" => completed += 1,
|
|
"task_failed" => failed += 1,
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
let start_time = workflow_events
|
|
.iter()
|
|
.map(|e| e.timestamp)
|
|
.min()
|
|
.unwrap_or(Utc::now());
|
|
|
|
let end_time = workflow_events
|
|
.iter()
|
|
.filter(|e| e.event_type == "workflow_completed")
|
|
.map(|e| e.timestamp)
|
|
.max();
|
|
|
|
let duration = if let Some(end) = end_time {
|
|
(end - start_time).num_milliseconds().max(0) as u64
|
|
} else {
|
|
(Utc::now() - start_time).num_milliseconds().max(0) as u64
|
|
};
|
|
|
|
Ok(WorkflowMetrics {
|
|
total_tasks: unique_tasks.len() as u32,
|
|
completed_tasks: completed,
|
|
failed_tasks: failed,
|
|
total_duration_ms: duration,
|
|
})
|
|
}
|
|
|
|
pub async fn get_workflow_events(&self, workflow_id: Uuid) -> Result<Vec<WorkflowEvent>, String> {
|
|
let events = self.events.lock().unwrap();
|
|
Ok(events
|
|
.iter()
|
|
.filter(|e| e.workflow_id == workflow_id)
|
|
.cloned()
|
|
.collect())
|
|
}
|
|
}
|
|
|
|
// Integration tests
|
|
#[tokio::test]
|
|
async fn test_complete_batch_workflow() {
|
|
let storage = Arc::new(SimpleStorage::new());
|
|
let metrics = Arc::new(SimpleMetricsCollector::new());
|
|
|
|
// Create a complete infrastructure deployment workflow
|
|
let workflow_id = Uuid::new_v4();
|
|
let mut workflow = BatchWorkflow::new(workflow_id, "test-infrastructure".to_string());
|
|
|
|
// Add server provisioning task
|
|
let server_task_id = Uuid::new_v4();
|
|
workflow.add_task(BatchTask {
|
|
id: server_task_id,
|
|
name: "upcloud-server-1".to_string(),
|
|
task_type: TaskType::Server,
|
|
provider: "upcloud".to_string(),
|
|
config: serde_json::json!({
|
|
"zone": "fi-hel1",
|
|
"plan": "1xCPU-1GB",
|
|
"image": "Ubuntu 24.04"
|
|
}),
|
|
dependencies: vec![],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
// Add taskserv installation
|
|
let k8s_task_id = Uuid::new_v4();
|
|
workflow.add_task(BatchTask {
|
|
id: k8s_task_id,
|
|
name: "kubernetes-installation".to_string(),
|
|
task_type: TaskType::Taskserv,
|
|
provider: "kubernetes".to_string(),
|
|
config: serde_json::json!({
|
|
"version": "1.31.0",
|
|
"cni": "cilium"
|
|
}),
|
|
dependencies: vec![TaskDependency {
|
|
task_id: server_task_id,
|
|
dependency_type: "requires".to_string(),
|
|
}],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
// Add cluster deployment
|
|
let cluster_task_id = Uuid::new_v4();
|
|
workflow.add_task(BatchTask {
|
|
id: cluster_task_id,
|
|
name: "web-cluster".to_string(),
|
|
task_type: TaskType::Cluster,
|
|
provider: "kubernetes".to_string(),
|
|
config: serde_json::json!({
|
|
"namespace": "web",
|
|
"replicas": 3
|
|
}),
|
|
dependencies: vec![TaskDependency {
|
|
task_id: k8s_task_id,
|
|
dependency_type: "requires".to_string(),
|
|
}],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
// Save initial workflow state
|
|
let state = WorkflowState {
|
|
workflow_id,
|
|
current_tasks: workflow.tasks.iter().map(|t| t.id).collect(),
|
|
completed_tasks: vec![],
|
|
failed_tasks: vec![],
|
|
checkpoints: vec![],
|
|
metadata: serde_json::json!({"test": "integration"}),
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
};
|
|
|
|
storage.save_workflow_state(&state).await.unwrap();
|
|
|
|
// Test dependency resolution
|
|
let execution_order = workflow.get_execution_order().unwrap();
|
|
assert_eq!(execution_order.len(), 3);
|
|
|
|
// Verify dependency order
|
|
let server_pos = execution_order.iter().position(|&id| id == server_task_id).unwrap();
|
|
let k8s_pos = execution_order.iter().position(|&id| id == k8s_task_id).unwrap();
|
|
let cluster_pos = execution_order.iter().position(|&id| id == cluster_task_id).unwrap();
|
|
|
|
assert!(server_pos < k8s_pos);
|
|
assert!(k8s_pos < cluster_pos);
|
|
|
|
// Record workflow start
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: None,
|
|
event_type: "workflow_started".to_string(),
|
|
metadata: serde_json::json!({"name": "test-infrastructure"}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
|
|
// Simulate task execution in correct order
|
|
for task_id in execution_order {
|
|
let task = workflow.tasks.iter_mut().find(|t| t.id == task_id).unwrap();
|
|
|
|
// Mark task as running
|
|
task.state = TaskState::Running;
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: Some(task_id),
|
|
event_type: "task_started".to_string(),
|
|
metadata: serde_json::json!({"task_name": task.name}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
|
|
// Simulate execution time
|
|
sleep(Duration::from_millis(50)).await;
|
|
|
|
// Mark task as completed
|
|
task.state = TaskState::Completed;
|
|
task.updated_at = Utc::now();
|
|
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: Some(task_id),
|
|
event_type: "task_completed".to_string(),
|
|
metadata: serde_json::json!({"task_name": task.name}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
}
|
|
|
|
// Record workflow completion
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: None,
|
|
event_type: "workflow_completed".to_string(),
|
|
metadata: serde_json::json!({"total_tasks": 3}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
|
|
// Verify all tasks completed successfully
|
|
for task in &workflow.tasks {
|
|
assert_eq!(task.state, TaskState::Completed);
|
|
}
|
|
|
|
// Verify metrics were collected
|
|
let workflow_metrics = metrics.get_workflow_metrics(workflow_id).await.unwrap();
|
|
assert_eq!(workflow_metrics.total_tasks, 3);
|
|
assert_eq!(workflow_metrics.completed_tasks, 3);
|
|
assert_eq!(workflow_metrics.failed_tasks, 0);
|
|
assert!(workflow_metrics.total_duration_ms > 100); // At least our sleep time
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_mixed_provider_deployment() {
|
|
let workflow_id = Uuid::new_v4();
|
|
let mut workflow = BatchWorkflow::new(workflow_id, "mixed-provider".to_string());
|
|
|
|
let upcloud_task_id = Uuid::new_v4();
|
|
let aws_task_id = Uuid::new_v4();
|
|
|
|
// UpCloud server
|
|
workflow.add_task(BatchTask {
|
|
id: upcloud_task_id,
|
|
name: "upcloud-web-server".to_string(),
|
|
task_type: TaskType::Server,
|
|
provider: "upcloud".to_string(),
|
|
config: serde_json::json!({
|
|
"zone": "fi-hel1",
|
|
"plan": "2xCPU-4GB"
|
|
}),
|
|
dependencies: vec![],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
// AWS resources
|
|
workflow.add_task(BatchTask {
|
|
id: aws_task_id,
|
|
name: "aws-s3-storage".to_string(),
|
|
task_type: TaskType::Server,
|
|
provider: "aws".to_string(),
|
|
config: serde_json::json!({
|
|
"service": "s3",
|
|
"bucket": "test-deployment-assets",
|
|
"region": "eu-north-1"
|
|
}),
|
|
dependencies: vec![],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
let execution_order = workflow.get_execution_order().unwrap();
|
|
assert_eq!(execution_order.len(), 2);
|
|
|
|
// Both tasks should be executable in parallel since no dependencies
|
|
for task_id in execution_order {
|
|
let task = workflow.tasks.iter_mut().find(|t| t.id == task_id).unwrap();
|
|
task.state = TaskState::Completed;
|
|
}
|
|
|
|
// Verify mixed provider execution
|
|
let upcloud_task = workflow.tasks.iter().find(|t| t.provider == "upcloud").unwrap();
|
|
let aws_task = workflow.tasks.iter().find(|t| t.provider == "aws").unwrap();
|
|
|
|
assert_eq!(upcloud_task.state, TaskState::Completed);
|
|
assert_eq!(aws_task.state, TaskState::Completed);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_complex_dependency_resolution() {
|
|
let workflow_id = Uuid::new_v4();
|
|
let mut workflow = BatchWorkflow::new(workflow_id, "complex-deps".to_string());
|
|
|
|
// Create complex dependency chain:
|
|
// Server1 -> K8s -> Storage -> App
|
|
// Server2 -> K8s -> Networking -> App
|
|
|
|
let server1_id = Uuid::new_v4();
|
|
let server2_id = Uuid::new_v4();
|
|
let k8s_id = Uuid::new_v4();
|
|
let storage_id = Uuid::new_v4();
|
|
let network_id = Uuid::new_v4();
|
|
let app_id = Uuid::new_v4();
|
|
|
|
// Add tasks in random order to test dependency resolution
|
|
workflow.add_task(BatchTask {
|
|
id: app_id,
|
|
name: "application".to_string(),
|
|
task_type: TaskType::Cluster,
|
|
provider: "kubernetes".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![
|
|
TaskDependency { task_id: storage_id, dependency_type: "requires".to_string() },
|
|
TaskDependency { task_id: network_id, dependency_type: "requires".to_string() },
|
|
],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: storage_id,
|
|
name: "storage".to_string(),
|
|
task_type: TaskType::Taskserv,
|
|
provider: "rook-ceph".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![TaskDependency { task_id: k8s_id, dependency_type: "requires".to_string() }],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: network_id,
|
|
name: "networking".to_string(),
|
|
task_type: TaskType::Taskserv,
|
|
provider: "cilium".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![TaskDependency { task_id: k8s_id, dependency_type: "requires".to_string() }],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: k8s_id,
|
|
name: "kubernetes".to_string(),
|
|
task_type: TaskType::Taskserv,
|
|
provider: "kubernetes".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![
|
|
TaskDependency { task_id: server1_id, dependency_type: "requires".to_string() },
|
|
TaskDependency { task_id: server2_id, dependency_type: "requires".to_string() },
|
|
],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: server1_id,
|
|
name: "server1".to_string(),
|
|
task_type: TaskType::Server,
|
|
provider: "upcloud".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: server2_id,
|
|
name: "server2".to_string(),
|
|
task_type: TaskType::Server,
|
|
provider: "upcloud".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
let execution_order = workflow.get_execution_order().unwrap();
|
|
assert_eq!(execution_order.len(), 6);
|
|
|
|
// Verify dependency order
|
|
let server1_pos = execution_order.iter().position(|&id| id == server1_id).unwrap();
|
|
let server2_pos = execution_order.iter().position(|&id| id == server2_id).unwrap();
|
|
let k8s_pos = execution_order.iter().position(|&id| id == k8s_id).unwrap();
|
|
let storage_pos = execution_order.iter().position(|&id| id == storage_id).unwrap();
|
|
let network_pos = execution_order.iter().position(|&id| id == network_id).unwrap();
|
|
let app_pos = execution_order.iter().position(|&id| id == app_id).unwrap();
|
|
|
|
// Servers must come before K8s
|
|
assert!(server1_pos < k8s_pos);
|
|
assert!(server2_pos < k8s_pos);
|
|
|
|
// K8s must come before storage and networking
|
|
assert!(k8s_pos < storage_pos);
|
|
assert!(k8s_pos < network_pos);
|
|
|
|
// Storage and networking must come before app
|
|
assert!(storage_pos < app_pos);
|
|
assert!(network_pos < app_pos);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_rollback_and_recovery() {
|
|
let storage = Arc::new(SimpleStorage::new());
|
|
|
|
let workflow_id = Uuid::new_v4();
|
|
let mut workflow = BatchWorkflow::new(workflow_id, "rollback-test".to_string());
|
|
|
|
let task1_id = Uuid::new_v4();
|
|
let task2_id = Uuid::new_v4();
|
|
let task3_id = Uuid::new_v4();
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: task1_id,
|
|
name: "server-creation".to_string(),
|
|
task_type: TaskType::Server,
|
|
provider: "upcloud".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: task2_id,
|
|
name: "kubernetes-install".to_string(),
|
|
task_type: TaskType::Taskserv,
|
|
provider: "kubernetes".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![TaskDependency { task_id: task1_id, dependency_type: "requires".to_string() }],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
workflow.add_task(BatchTask {
|
|
id: task3_id,
|
|
name: "app-deploy".to_string(),
|
|
task_type: TaskType::Cluster,
|
|
provider: "kubernetes".to_string(),
|
|
config: serde_json::json!({}),
|
|
dependencies: vec![TaskDependency { task_id: task2_id, dependency_type: "requires".to_string() }],
|
|
state: TaskState::Pending,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
});
|
|
|
|
// Execute first task successfully
|
|
workflow.tasks.iter_mut().find(|t| t.id == task1_id).unwrap().state = TaskState::Completed;
|
|
|
|
// Create checkpoint after first task
|
|
let checkpoint_state = WorkflowState {
|
|
workflow_id,
|
|
current_tasks: vec![task2_id, task3_id],
|
|
completed_tasks: vec![task1_id],
|
|
failed_tasks: vec![],
|
|
checkpoints: vec![serde_json::json!({
|
|
"checkpoint_id": "after-server-creation",
|
|
"completed_tasks": [task1_id],
|
|
"timestamp": Utc::now()
|
|
})],
|
|
metadata: serde_json::json!({"checkpoint": true}),
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
};
|
|
|
|
storage.save_workflow_state(&checkpoint_state).await.unwrap();
|
|
|
|
// Simulate failure on second task
|
|
workflow.tasks.iter_mut().find(|t| t.id == task2_id).unwrap().state = TaskState::Failed;
|
|
|
|
// Verify rollback capability
|
|
let recovered_state = storage.load_workflow_state(workflow_id).await.unwrap();
|
|
assert_eq!(recovered_state.completed_tasks.len(), 1);
|
|
assert_eq!(recovered_state.completed_tasks[0], task1_id);
|
|
assert_eq!(recovered_state.checkpoints.len(), 1);
|
|
|
|
// Test that rollback preserves completed work
|
|
assert!(recovered_state.completed_tasks.contains(&task1_id));
|
|
assert!(recovered_state.current_tasks.contains(&task2_id));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_state_persistence() {
|
|
let storage = Arc::new(SimpleStorage::new());
|
|
|
|
let workflow_id = Uuid::new_v4();
|
|
let state = WorkflowState {
|
|
workflow_id,
|
|
current_tasks: vec![Uuid::new_v4(), Uuid::new_v4()],
|
|
completed_tasks: vec![Uuid::new_v4()],
|
|
failed_tasks: vec![],
|
|
checkpoints: vec![serde_json::json!({"test": "checkpoint"})],
|
|
metadata: serde_json::json!({"backend": "filesystem"}),
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
};
|
|
|
|
// Save and load state
|
|
storage.save_workflow_state(&state).await.unwrap();
|
|
let loaded_state = storage.load_workflow_state(workflow_id).await.unwrap();
|
|
|
|
assert_eq!(loaded_state.workflow_id, workflow_id);
|
|
assert_eq!(loaded_state.current_tasks.len(), 2);
|
|
assert_eq!(loaded_state.completed_tasks.len(), 1);
|
|
assert_eq!(loaded_state.checkpoints.len(), 1);
|
|
|
|
// Verify state persistence
|
|
let all_states = storage.list_workflow_states().await.unwrap();
|
|
assert!(all_states.contains(&workflow_id));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_metrics_collection() {
|
|
let metrics = Arc::new(SimpleMetricsCollector::new());
|
|
let workflow_id = Uuid::new_v4();
|
|
let task_id = Uuid::new_v4();
|
|
|
|
// Record various events
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: Some(task_id),
|
|
event_type: "workflow_started".to_string(),
|
|
metadata: serde_json::json!({"name": "test-workflow"}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
|
|
sleep(Duration::from_millis(10)).await;
|
|
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: Some(task_id),
|
|
event_type: "task_started".to_string(),
|
|
metadata: serde_json::json!({"task_name": "test-task"}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
|
|
sleep(Duration::from_millis(10)).await;
|
|
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: Some(task_id),
|
|
event_type: "task_completed".to_string(),
|
|
metadata: serde_json::json!({"task_name": "test-task", "duration_ms": 100}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
|
|
metrics.record_event(WorkflowEvent {
|
|
workflow_id,
|
|
task_id: None,
|
|
event_type: "workflow_completed".to_string(),
|
|
metadata: serde_json::json!({"total_duration_ms": 200}),
|
|
timestamp: Utc::now(),
|
|
}).await;
|
|
|
|
// Verify metrics collection
|
|
let workflow_metrics = metrics.get_workflow_metrics(workflow_id).await.unwrap();
|
|
assert_eq!(workflow_metrics.total_tasks, 1);
|
|
assert_eq!(workflow_metrics.completed_tasks, 1);
|
|
assert_eq!(workflow_metrics.failed_tasks, 0);
|
|
assert!(workflow_metrics.total_duration_ms >= 20);
|
|
|
|
let events = metrics.get_workflow_events(workflow_id).await.unwrap();
|
|
assert_eq!(events.len(), 4);
|
|
assert!(events.iter().any(|e| e.event_type == "workflow_started"));
|
|
assert!(events.iter().any(|e| e.event_type == "workflow_completed"));
|
|
} |