// prvng_platform/orchestrator/tests/simple_batch_test.rs

use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use uuid::Uuid;
use chrono::Utc;
use serde_json;
// Simplified test structures for batch workflow testing
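/// The kind of infrastructure work a batch task performs.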
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskType {
    Server,
    Taskserv,
    Cluster,
}
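/// Lifecycle state of a single batch task.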
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskState {
    Pending,
    Running,
    Completed,
    Failed,
}
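/// A dependency edge pointing at another task that must run first.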
#[derive(Debug, Clone)]
pub struct TaskDependency {
    pub task_id: Uuid,
    pub dependency_type: String,
}
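/// A single unit of work (server, taskserv, or cluster deployment) in a batch workflow.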
#[derive(Debug, Clone)]
pub struct BatchTask {
    pub id: Uuid,
    pub name: String,
    pub task_type: TaskType,
    pub provider: String,
    pub config: serde_json::Value,
    pub dependencies: Vec<TaskDependency>,
    pub state: TaskState,
    pub created_at: chrono::DateTime<Utc>,
    pub updated_at: chrono::DateTime<Utc>,
}
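/// A named collection of tasks whose execution order is derived from their dependencies.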
#[derive(Debug)]
pub struct BatchWorkflow {
    pub id: Uuid,
    pub name: String,
    pub tasks: Vec<BatchTask>,
}
impl BatchWorkflow {
    pub fn new(id: Uuid, name: String) -> Self {
        Self {
            id,
            name,
            tasks: Vec::new(),
        }
    }
    pub fn add_task(&mut self, task: BatchTask) {
        self.tasks.push(task);
    }
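    /// Returns the task IDs in a dependency-respecting order using a depth-first
    /// topological sort; fails with an error if the dependency graph contains a cycle.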
    pub fn get_execution_order(&self) -> Result<Vec<Uuid>, String> {
        let mut visited = std::collections::HashSet::new();
        let mut temp_visited = std::collections::HashSet::new();
        let mut result = Vec::new();
        // Build dependency map
        let mut task_map: HashMap<Uuid, &BatchTask> = HashMap::new();
        for task in &self.tasks {
            task_map.insert(task.id, task);
        }
        fn visit(
            task_id: Uuid,
            visited: &mut std::collections::HashSet<Uuid>,
            temp_visited: &mut std::collections::HashSet<Uuid>,
            result: &mut Vec<Uuid>,
            task_map: &HashMap<Uuid, &BatchTask>,
        ) -> Result<(), String> {
            if temp_visited.contains(&task_id) {
                return Err("Circular dependency detected".to_string());
            }
            if visited.contains(&task_id) {
                return Ok(());
            }
            temp_visited.insert(task_id);
            if let Some(task) = task_map.get(&task_id) {
                for dep in &task.dependencies {
                    visit(dep.task_id, visited, temp_visited, result, task_map)?;
                }
            }
            temp_visited.remove(&task_id);
            visited.insert(task_id);
            result.push(task_id);
            Ok(())
        }
        for task in &self.tasks {
            if !visited.contains(&task.id) {
                visit(task.id, &mut visited, &mut temp_visited, &mut result, &task_map)?;
            }
        }
        Ok(result)
    }
}
// Simple in-memory storage for testing
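/// Snapshot of a workflow's progress, including checkpoints used for recovery.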
#[derive(Debug, Clone)]
pub struct WorkflowState {
    pub workflow_id: Uuid,
    pub current_tasks: Vec<Uuid>,
    pub completed_tasks: Vec<Uuid>,
    pub failed_tasks: Vec<Uuid>,
    pub checkpoints: Vec<serde_json::Value>,
    pub metadata: serde_json::Value,
    pub created_at: chrono::DateTime<Utc>,
    pub updated_at: chrono::DateTime<Utc>,
}
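/// Minimal in-memory state store keyed by workflow ID; stands in for a real persistence backend in these tests.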
pub struct SimpleStorage {
    states: Arc<std::sync::Mutex<HashMap<Uuid, WorkflowState>>>,
}
impl SimpleStorage {
    pub fn new() -> Self {
        Self {
            states: Arc::new(std::sync::Mutex::new(HashMap::new())),
        }
    }
    pub async fn save_workflow_state(&self, state: &WorkflowState) -> Result<(), String> {
        let mut states = self.states.lock().unwrap();
        states.insert(state.workflow_id, state.clone());
        Ok(())
    }
    pub async fn load_workflow_state(&self, workflow_id: Uuid) -> Result<WorkflowState, String> {
        let states = self.states.lock().unwrap();
        states.get(&workflow_id)
            .cloned()
            .ok_or_else(|| "Workflow state not found".to_string())
    }
    pub async fn list_workflow_states(&self) -> Result<Vec<Uuid>, String> {
        let states = self.states.lock().unwrap();
        Ok(states.keys().cloned().collect())
    }
}
// Simple metrics collector
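/// A timestamped event recorded while a workflow or one of its tasks runs.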
#[derive(Debug, Clone)]
pub struct WorkflowEvent {
    pub workflow_id: Uuid,
    pub task_id: Option<Uuid>,
    pub event_type: String,
    pub metadata: serde_json::Value,
    pub timestamp: chrono::DateTime<Utc>,
}
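/// Aggregate counters and timing derived from a workflow's recorded events.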
#[derive(Debug, Clone)]
pub struct WorkflowMetrics {
    pub total_tasks: u32,
    pub completed_tasks: u32,
    pub failed_tasks: u32,
    pub total_duration_ms: u64,
}
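/// In-memory event log that computes per-workflow metrics on demand.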
pub struct SimpleMetricsCollector {
    events: Arc<std::sync::Mutex<Vec<WorkflowEvent>>>,
}
impl SimpleMetricsCollector {
    pub fn new() -> Self {
        Self {
            events: Arc::new(std::sync::Mutex::new(Vec::new())),
        }
    }
    pub async fn record_event(&self, event: WorkflowEvent) {
        let mut events = self.events.lock().unwrap();
        events.push(event);
    }
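    /// Aggregates metrics for one workflow: unique task IDs seen, completed/failed
    /// counts, and duration from the earliest event to `workflow_completed`
    /// (or to now if the workflow has not completed yet).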
    pub async fn get_workflow_metrics(&self, workflow_id: Uuid) -> Result<WorkflowMetrics, String> {
        let events = self.events.lock().unwrap();
        let workflow_events: Vec<_> = events
            .iter()
            .filter(|e| e.workflow_id == workflow_id)
            .collect();
        let task_events: Vec<_> = workflow_events
            .iter()
            .filter(|e| e.task_id.is_some())
            .collect();
        let mut unique_tasks = std::collections::HashSet::new();
        let mut completed = 0;
        let mut failed = 0;
        for event in &task_events {
            if let Some(task_id) = event.task_id {
                unique_tasks.insert(task_id);
                match event.event_type.as_str() {
                    "task_completed" => completed += 1,
                    "task_failed" => failed += 1,
                    _ => {}
                }
            }
        }
        let start_time = workflow_events
            .iter()
            .map(|e| e.timestamp)
            .min()
            .unwrap_or(Utc::now());
        let end_time = workflow_events
            .iter()
            .filter(|e| e.event_type == "workflow_completed")
            .map(|e| e.timestamp)
            .max();
        let duration = if let Some(end) = end_time {
            (end - start_time).num_milliseconds().max(0) as u64
        } else {
            (Utc::now() - start_time).num_milliseconds().max(0) as u64
        };
        Ok(WorkflowMetrics {
            total_tasks: unique_tasks.len() as u32,
            completed_tasks: completed,
            failed_tasks: failed,
            total_duration_ms: duration,
        })
    }
    pub async fn get_workflow_events(&self, workflow_id: Uuid) -> Result<Vec<WorkflowEvent>, String> {
        let events = self.events.lock().unwrap();
        Ok(events
            .iter()
            .filter(|e| e.workflow_id == workflow_id)
            .cloned()
            .collect())
    }
}
// Integration tests
#[tokio::test]
async fn test_complete_batch_workflow() {
    let storage = Arc::new(SimpleStorage::new());
    let metrics = Arc::new(SimpleMetricsCollector::new());
    // Create a complete infrastructure deployment workflow
    let workflow_id = Uuid::new_v4();
    let mut workflow = BatchWorkflow::new(workflow_id, "test-infrastructure".to_string());
    // Add server provisioning task
    let server_task_id = Uuid::new_v4();
    workflow.add_task(BatchTask {
        id: server_task_id,
        name: "upcloud-server-1".to_string(),
        task_type: TaskType::Server,
        provider: "upcloud".to_string(),
        config: serde_json::json!({
            "zone": "fi-hel1",
            "plan": "1xCPU-1GB",
            "image": "Ubuntu 24.04"
        }),
        dependencies: vec![],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    // Add taskserv installation
    let k8s_task_id = Uuid::new_v4();
    workflow.add_task(BatchTask {
        id: k8s_task_id,
        name: "kubernetes-installation".to_string(),
        task_type: TaskType::Taskserv,
        provider: "kubernetes".to_string(),
        config: serde_json::json!({
            "version": "1.31.0",
            "cni": "cilium"
        }),
        dependencies: vec![TaskDependency {
            task_id: server_task_id,
            dependency_type: "requires".to_string(),
        }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    // Add cluster deployment
    let cluster_task_id = Uuid::new_v4();
    workflow.add_task(BatchTask {
        id: cluster_task_id,
        name: "web-cluster".to_string(),
        task_type: TaskType::Cluster,
        provider: "kubernetes".to_string(),
        config: serde_json::json!({
            "namespace": "web",
            "replicas": 3
        }),
        dependencies: vec![TaskDependency {
            task_id: k8s_task_id,
            dependency_type: "requires".to_string(),
        }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    // Save initial workflow state
    let state = WorkflowState {
        workflow_id,
        current_tasks: workflow.tasks.iter().map(|t| t.id).collect(),
        completed_tasks: vec![],
        failed_tasks: vec![],
        checkpoints: vec![],
        metadata: serde_json::json!({"test": "integration"}),
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    storage.save_workflow_state(&state).await.unwrap();
    // Test dependency resolution
    let execution_order = workflow.get_execution_order().unwrap();
    assert_eq!(execution_order.len(), 3);
    // Verify dependency order
    let server_pos = execution_order.iter().position(|&id| id == server_task_id).unwrap();
    let k8s_pos = execution_order.iter().position(|&id| id == k8s_task_id).unwrap();
    let cluster_pos = execution_order.iter().position(|&id| id == cluster_task_id).unwrap();
    assert!(server_pos < k8s_pos);
    assert!(k8s_pos < cluster_pos);
    // Record workflow start
    metrics.record_event(WorkflowEvent {
        workflow_id,
        task_id: None,
        event_type: "workflow_started".to_string(),
        metadata: serde_json::json!({"name": "test-infrastructure"}),
        timestamp: Utc::now(),
    }).await;
    // Simulate task execution in correct order
    for task_id in execution_order {
        let task = workflow.tasks.iter_mut().find(|t| t.id == task_id).unwrap();
        // Mark task as running
        task.state = TaskState::Running;
        metrics.record_event(WorkflowEvent {
            workflow_id,
            task_id: Some(task_id),
            event_type: "task_started".to_string(),
            metadata: serde_json::json!({"task_name": task.name}),
            timestamp: Utc::now(),
        }).await;
        // Simulate execution time
        sleep(Duration::from_millis(50)).await;
        // Mark task as completed
        task.state = TaskState::Completed;
        task.updated_at = Utc::now();
        metrics.record_event(WorkflowEvent {
            workflow_id,
            task_id: Some(task_id),
            event_type: "task_completed".to_string(),
            metadata: serde_json::json!({"task_name": task.name}),
            timestamp: Utc::now(),
        }).await;
    }
    // Record workflow completion
    metrics.record_event(WorkflowEvent {
        workflow_id,
        task_id: None,
        event_type: "workflow_completed".to_string(),
        metadata: serde_json::json!({"total_tasks": 3}),
        timestamp: Utc::now(),
    }).await;
    // Verify all tasks completed successfully
    for task in &workflow.tasks {
        assert_eq!(task.state, TaskState::Completed);
    }
    // Verify metrics were collected
    let workflow_metrics = metrics.get_workflow_metrics(workflow_id).await.unwrap();
    assert_eq!(workflow_metrics.total_tasks, 3);
    assert_eq!(workflow_metrics.completed_tasks, 3);
    assert_eq!(workflow_metrics.failed_tasks, 0);
    assert!(workflow_metrics.total_duration_ms > 100); // Three simulated tasks sleep 50 ms each, so at least ~150 ms
}
#[tokio::test]
async fn test_mixed_provider_deployment() {
    let workflow_id = Uuid::new_v4();
    let mut workflow = BatchWorkflow::new(workflow_id, "mixed-provider".to_string());
    let upcloud_task_id = Uuid::new_v4();
    let aws_task_id = Uuid::new_v4();
    // UpCloud server
    workflow.add_task(BatchTask {
        id: upcloud_task_id,
        name: "upcloud-web-server".to_string(),
        task_type: TaskType::Server,
        provider: "upcloud".to_string(),
        config: serde_json::json!({
            "zone": "fi-hel1",
            "plan": "2xCPU-4GB"
        }),
        dependencies: vec![],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    // AWS resources
    workflow.add_task(BatchTask {
        id: aws_task_id,
        name: "aws-s3-storage".to_string(),
        task_type: TaskType::Server,
        provider: "aws".to_string(),
        config: serde_json::json!({
            "service": "s3",
            "bucket": "test-deployment-assets",
            "region": "eu-north-1"
        }),
        dependencies: vec![],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    let execution_order = workflow.get_execution_order().unwrap();
    assert_eq!(execution_order.len(), 2);
    // Both tasks could run in parallel since neither has dependencies
    for task_id in execution_order {
        let task = workflow.tasks.iter_mut().find(|t| t.id == task_id).unwrap();
        task.state = TaskState::Completed;
    }
    // Verify mixed-provider execution
    let upcloud_task = workflow.tasks.iter().find(|t| t.provider == "upcloud").unwrap();
    let aws_task = workflow.tasks.iter().find(|t| t.provider == "aws").unwrap();
    assert_eq!(upcloud_task.state, TaskState::Completed);
    assert_eq!(aws_task.state, TaskState::Completed);
}
#[tokio::test]
async fn test_complex_dependency_resolution() {
    let workflow_id = Uuid::new_v4();
    let mut workflow = BatchWorkflow::new(workflow_id, "complex-deps".to_string());
    // Create complex dependency chain:
    // Server1 -> K8s -> Storage -> App
    // Server2 -> K8s -> Networking -> App
    let server1_id = Uuid::new_v4();
    let server2_id = Uuid::new_v4();
    let k8s_id = Uuid::new_v4();
    let storage_id = Uuid::new_v4();
    let network_id = Uuid::new_v4();
    let app_id = Uuid::new_v4();
    // Add tasks in random order to test dependency resolution
    workflow.add_task(BatchTask {
        id: app_id,
        name: "application".to_string(),
        task_type: TaskType::Cluster,
        provider: "kubernetes".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![
            TaskDependency { task_id: storage_id, dependency_type: "requires".to_string() },
            TaskDependency { task_id: network_id, dependency_type: "requires".to_string() },
        ],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: storage_id,
        name: "storage".to_string(),
        task_type: TaskType::Taskserv,
        provider: "rook-ceph".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![TaskDependency { task_id: k8s_id, dependency_type: "requires".to_string() }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: network_id,
        name: "networking".to_string(),
        task_type: TaskType::Taskserv,
        provider: "cilium".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![TaskDependency { task_id: k8s_id, dependency_type: "requires".to_string() }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: k8s_id,
        name: "kubernetes".to_string(),
        task_type: TaskType::Taskserv,
        provider: "kubernetes".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![
            TaskDependency { task_id: server1_id, dependency_type: "requires".to_string() },
            TaskDependency { task_id: server2_id, dependency_type: "requires".to_string() },
        ],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: server1_id,
        name: "server1".to_string(),
        task_type: TaskType::Server,
        provider: "upcloud".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: server2_id,
        name: "server2".to_string(),
        task_type: TaskType::Server,
        provider: "upcloud".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    let execution_order = workflow.get_execution_order().unwrap();
    assert_eq!(execution_order.len(), 6);
    // Verify dependency order
    let server1_pos = execution_order.iter().position(|&id| id == server1_id).unwrap();
    let server2_pos = execution_order.iter().position(|&id| id == server2_id).unwrap();
    let k8s_pos = execution_order.iter().position(|&id| id == k8s_id).unwrap();
    let storage_pos = execution_order.iter().position(|&id| id == storage_id).unwrap();
    let network_pos = execution_order.iter().position(|&id| id == network_id).unwrap();
    let app_pos = execution_order.iter().position(|&id| id == app_id).unwrap();
    // Servers must come before K8s
    assert!(server1_pos < k8s_pos);
    assert!(server2_pos < k8s_pos);
    // K8s must come before storage and networking
    assert!(k8s_pos < storage_pos);
    assert!(k8s_pos < network_pos);
    // Storage and networking must come before app
    assert!(storage_pos < app_pos);
    assert!(network_pos < app_pos);
}
#[tokio::test]
async fn test_rollback_and_recovery() {
    let storage = Arc::new(SimpleStorage::new());
    let workflow_id = Uuid::new_v4();
    let mut workflow = BatchWorkflow::new(workflow_id, "rollback-test".to_string());
    let task1_id = Uuid::new_v4();
    let task2_id = Uuid::new_v4();
    let task3_id = Uuid::new_v4();
    workflow.add_task(BatchTask {
        id: task1_id,
        name: "server-creation".to_string(),
        task_type: TaskType::Server,
        provider: "upcloud".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: task2_id,
        name: "kubernetes-install".to_string(),
        task_type: TaskType::Taskserv,
        provider: "kubernetes".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![TaskDependency { task_id: task1_id, dependency_type: "requires".to_string() }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: task3_id,
        name: "app-deploy".to_string(),
        task_type: TaskType::Cluster,
        provider: "kubernetes".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![TaskDependency { task_id: task2_id, dependency_type: "requires".to_string() }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    // Execute first task successfully
    workflow.tasks.iter_mut().find(|t| t.id == task1_id).unwrap().state = TaskState::Completed;
    // Create checkpoint after first task
    let checkpoint_state = WorkflowState {
        workflow_id,
        current_tasks: vec![task2_id, task3_id],
        completed_tasks: vec![task1_id],
        failed_tasks: vec![],
        checkpoints: vec![serde_json::json!({
            "checkpoint_id": "after-server-creation",
            "completed_tasks": [task1_id],
            "timestamp": Utc::now()
        })],
        metadata: serde_json::json!({"checkpoint": true}),
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    storage.save_workflow_state(&checkpoint_state).await.unwrap();
    // Simulate failure on second task
    workflow.tasks.iter_mut().find(|t| t.id == task2_id).unwrap().state = TaskState::Failed;
    // Verify rollback capability
    let recovered_state = storage.load_workflow_state(workflow_id).await.unwrap();
    assert_eq!(recovered_state.completed_tasks.len(), 1);
    assert_eq!(recovered_state.completed_tasks[0], task1_id);
    assert_eq!(recovered_state.checkpoints.len(), 1);
    // Test that rollback preserves completed work
    assert!(recovered_state.completed_tasks.contains(&task1_id));
    assert!(recovered_state.current_tasks.contains(&task2_id));
}
#[tokio::test]
async fn test_state_persistence() {
    let storage = Arc::new(SimpleStorage::new());
    let workflow_id = Uuid::new_v4();
    let state = WorkflowState {
        workflow_id,
        current_tasks: vec![Uuid::new_v4(), Uuid::new_v4()],
        completed_tasks: vec![Uuid::new_v4()],
        failed_tasks: vec![],
        checkpoints: vec![serde_json::json!({"test": "checkpoint"})],
        metadata: serde_json::json!({"backend": "filesystem"}),
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    // Save and load state
    storage.save_workflow_state(&state).await.unwrap();
    let loaded_state = storage.load_workflow_state(workflow_id).await.unwrap();
    assert_eq!(loaded_state.workflow_id, workflow_id);
    assert_eq!(loaded_state.current_tasks.len(), 2);
    assert_eq!(loaded_state.completed_tasks.len(), 1);
    assert_eq!(loaded_state.checkpoints.len(), 1);
    // Verify state persistence
    let all_states = storage.list_workflow_states().await.unwrap();
    assert!(all_states.contains(&workflow_id));
}
#[tokio::test]
async fn test_metrics_collection() {
    let metrics = Arc::new(SimpleMetricsCollector::new());
    let workflow_id = Uuid::new_v4();
    let task_id = Uuid::new_v4();
    // Record various events
    metrics.record_event(WorkflowEvent {
        workflow_id,
        task_id: Some(task_id),
        event_type: "workflow_started".to_string(),
        metadata: serde_json::json!({"name": "test-workflow"}),
        timestamp: Utc::now(),
    }).await;
    sleep(Duration::from_millis(10)).await;
    metrics.record_event(WorkflowEvent {
        workflow_id,
        task_id: Some(task_id),
        event_type: "task_started".to_string(),
        metadata: serde_json::json!({"task_name": "test-task"}),
        timestamp: Utc::now(),
    }).await;
    sleep(Duration::from_millis(10)).await;
    metrics.record_event(WorkflowEvent {
        workflow_id,
        task_id: Some(task_id),
        event_type: "task_completed".to_string(),
        metadata: serde_json::json!({"task_name": "test-task", "duration_ms": 100}),
        timestamp: Utc::now(),
    }).await;
    metrics.record_event(WorkflowEvent {
        workflow_id,
        task_id: None,
        event_type: "workflow_completed".to_string(),
        metadata: serde_json::json!({"total_duration_ms": 200}),
        timestamp: Utc::now(),
    }).await;
    // Verify metrics collection
    let workflow_metrics = metrics.get_workflow_metrics(workflow_id).await.unwrap();
    assert_eq!(workflow_metrics.total_tasks, 1);
    assert_eq!(workflow_metrics.completed_tasks, 1);
    assert_eq!(workflow_metrics.failed_tasks, 0);
    assert!(workflow_metrics.total_duration_ms >= 20);
    let events = metrics.get_workflow_events(workflow_id).await.unwrap();
    assert_eq!(events.len(), 4);
    assert!(events.iter().any(|e| e.event_type == "workflow_started"));
    assert!(events.iter().any(|e| e.event_type == "workflow_completed"));
}
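// A minimal sketch of a negative test for the cycle detection inside
// `get_execution_order`: two hypothetical tasks ("task-a" and "task-b", not part
// of the scenarios above) that require each other should make the topological
// sort return an error rather than an ordering.
#[test]
fn test_circular_dependency_detection() {
    let workflow_id = Uuid::new_v4();
    let mut workflow = BatchWorkflow::new(workflow_id, "cycle-test".to_string());
    let task_a_id = Uuid::new_v4();
    let task_b_id = Uuid::new_v4();
    workflow.add_task(BatchTask {
        id: task_a_id,
        name: "task-a".to_string(),
        task_type: TaskType::Server,
        provider: "upcloud".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![TaskDependency {
            task_id: task_b_id,
            dependency_type: "requires".to_string(),
        }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    workflow.add_task(BatchTask {
        id: task_b_id,
        name: "task-b".to_string(),
        task_type: TaskType::Taskserv,
        provider: "kubernetes".to_string(),
        config: serde_json::json!({}),
        dependencies: vec![TaskDependency {
            task_id: task_a_id,
            dependency_type: "requires".to_string(),
        }],
        state: TaskState::Pending,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    });
    // The DFS marks task A as temporarily visited, recurses into B, and finds A
    // again before it is finalized, so the sort reports the cycle.
    let result = workflow.get_execution_order();
    assert!(result.is_err());
    assert_eq!(result.unwrap_err(), "Circular dependency detected");
}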