//! Comprehensive migration tests for storage backend transitions //! //! This module tests data migration between all supported storage backends, //! ensuring data integrity, rollback functionality, and performance characteristics. use std::sync::Arc; use tempfile::TempDir; use tokio_test; // Import test helpers and orchestrator types mod helpers; use helpers::{TestEnvironment, TestDataGenerator, StorageAssertions}; use orchestrator::{TaskStatus, WorkflowTask}; use orchestrator::storage::{StorageConfig, create_storage}; use orchestrator::migration::{ StorageMigrator, MigrationConfig, MigrationOptions, MigrationStatus, RollbackManager, RollbackConfig, RollbackStrategy, RollbackStatus }; /// Test basic migration between filesystem and SurrealDB embedded #[cfg(feature = "surrealdb")] #[tokio::test] async fn test_filesystem_to_embedded_migration() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); let target_config = env.surrealdb_embedded_config().unwrap(); // Setup source data let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let test_tasks = gen.workflow_tasks_batch(10); for task in &test_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } // Add some audit entries and metrics let audit_entry = gen.audit_entry(test_tasks[0].id.clone()); source_storage.record_audit_entry(audit_entry).await.unwrap(); let metric = gen.metric("test_metric", 42.0); source_storage.record_metric(metric).await.unwrap(); // Configure migration let migration_config = MigrationConfig { source_config, target_config, options: MigrationOptions { dry_run: false, verify_integrity: true, create_backup: true, ..Default::default() }, }; // Execute migration let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); // Verify migration success assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, 10); assert_eq!(report.statistics.tasks_failed, 0); assert_eq!(report.errors.len(), 0); // Verify data in target storage let target_storage = create_storage(report.target_backend.parse().unwrap()).await.unwrap(); StorageAssertions::assert_task_count(&target_storage, 10).await.unwrap(); } /// Test migration with progress callback #[tokio::test] async fn test_migration_with_progress_tracking() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); let target_config = env.filesystem_config().unwrap(); // Different directory // Setup source data let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let test_tasks = gen.workflow_tasks_batch(5); for task in &test_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } // Track progress updates let progress_updates = Arc::new(std::sync::Mutex::new(Vec::new())); let progress_updates_clone = progress_updates.clone(); let progress_callback = Arc::new(move |progress: &orchestrator::migration::MigrationProgress| { progress_updates_clone.lock().unwrap().push(( progress.phase.clone(), progress.percentage, progress.message.clone(), )); }); // Configure migration let migration_config = MigrationConfig { source_config, target_config, options: MigrationOptions { dry_run: false, batch_size: 2, ..Default::default() }, }; // Execute migration with progress tracking let migrator = StorageMigrator::new(migration_config) .with_progress_callback(progress_callback); let report = migrator.execute().await.unwrap(); // Verify migration success assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, 5); // Verify progress was tracked let updates = progress_updates.lock().unwrap(); assert!(!updates.is_empty()); // Check that we received updates for all phases let phases: Vec = updates.iter().map(|(phase, _, _)| phase.clone()).collect(); assert!(phases.contains(&"validation".to_string())); assert!(phases.contains(&"connection".to_string())); assert!(phases.contains(&"discovery".to_string())); assert!(phases.contains(&"migration".to_string())); } /// Test migration with filtering options #[tokio::test] async fn test_migration_with_filtering() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); let target_config = env.filesystem_config().unwrap(); // Setup source data with different statuses let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let pending_tasks = (0..3).map(|_| gen.workflow_task_with_status(TaskStatus::Pending)).collect::>(); let completed_tasks = (0..2).map(|_| gen.workflow_task_with_status(TaskStatus::Completed)).collect::>(); let failed_tasks = (0..1).map(|_| gen.workflow_task_with_status(TaskStatus::Failed)).collect::>(); for task in &pending_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } for task in &completed_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } for task in &failed_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } // Configure migration to only migrate pending and completed tasks let migration_config = MigrationConfig { source_config, target_config, options: MigrationOptions { dry_run: false, status_filter: Some(vec![TaskStatus::Pending, TaskStatus::Completed]), verify_integrity: false, // Skip integrity check for filtered migration ..Default::default() }, }; // Execute migration let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); // Verify migration success assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, 5); // 3 pending + 2 completed assert_eq!(report.statistics.tasks_failed, 0); // Verify target storage contains only filtered tasks let target_storage = create_storage(report.target_backend.parse().unwrap()).await.unwrap(); StorageAssertions::assert_task_count(&target_storage, 5).await.unwrap(); let failed_tasks_in_target = target_storage.list_tasks(Some(TaskStatus::Failed)).await.unwrap(); assert_eq!(failed_tasks_in_target.len(), 0); } /// Test dry run migration #[tokio::test] async fn test_dry_run_migration() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); let target_config = env.filesystem_config().unwrap(); // Setup source data let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let test_tasks = gen.workflow_tasks_batch(5); for task in &test_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } // Configure dry run migration let migration_config = MigrationConfig { source_config, target_config: target_config.clone(), options: MigrationOptions { dry_run: true, ..Default::default() }, }; // Execute dry run let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); // Verify dry run completed successfully assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, 5); // Verify target storage is empty (no actual migration occurred) let target_storage = create_storage(target_config).await.unwrap(); target_storage.init().await.unwrap(); StorageAssertions::assert_task_count(&target_storage, 0).await.unwrap(); } /// Test migration failure and rollback #[tokio::test] async fn test_migration_failure_and_rollback() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); // Create an invalid target config to force migration failure let mut target_config = env.filesystem_config().unwrap(); target_config.data_dir = "/invalid/path/that/does/not/exist".to_string(); // Setup source data let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let test_tasks = gen.workflow_tasks_batch(3); for task in &test_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } // Configure migration with backup for rollback let migration_config = MigrationConfig { source_config, target_config, options: MigrationOptions { dry_run: false, create_backup: true, ..Default::default() }, }; // Execute migration (should fail) let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); // Verify migration failed assert_eq!(report.status, MigrationStatus::Failed); assert!(!report.errors.is_empty()); // Check if automatic rollback was attempted assert!(!report.warnings.is_empty()); let rollback_warning = report.warnings.iter() .find(|w| w.contains("Automatic rollback executed")); assert!(rollback_warning.is_some()); } /// Test manual rollback process #[tokio::test] async fn test_manual_rollback_process() { let rollback_config = RollbackConfig { backup_path: "/tmp/test_backup.json".to_string(), migration_id: "test_migration_123".to_string(), rollback_strategy: RollbackStrategy::Manual, }; let rollback_manager = RollbackManager::new(rollback_config); // Create a mock target storage for rollback let mut env = TestEnvironment::new(); let target_config = env.filesystem_config().unwrap(); let target_storage = create_storage(target_config).await.unwrap(); target_storage.init().await.unwrap(); // Execute manual rollback let rollback_report = rollback_manager.execute_rollback(&target_storage).await.unwrap(); // Verify rollback completed (manual strategy just generates instructions) assert_eq!(rollback_report.status, RollbackStatus::Completed); assert!(!rollback_report.actions_performed.is_empty()); // Check that instructions were generated let instruction_action = rollback_report.actions_performed.iter() .find(|a| a.action.contains("manual_instructions")); assert!(instruction_action.is_some()); } /// Test migration between different backend combinations #[cfg(feature = "surrealdb")] #[tokio::test] async fn test_all_backend_combinations() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let configs = env.all_storage_configs(); // Test migration between each pair of backends for (i, source_config) in configs.iter().enumerate() { for (j, target_config) in configs.iter().enumerate() { if i == j { continue; // Skip same backend migrations } println!("Testing migration from {} to {}", source_config.storage_type, target_config.storage_type); // Setup source data let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let test_tasks = gen.workflow_tasks_batch(3); for task in &test_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } // Configure migration let migration_config = MigrationConfig { source_config: source_config.clone(), target_config: target_config.clone(), options: MigrationOptions { dry_run: false, verify_integrity: true, create_backup: false, // Skip backup for test speed ..Default::default() }, }; // Execute migration let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); // Verify migration success assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, 3); assert_eq!(report.statistics.tasks_failed, 0); println!("✓ Migration from {} to {} successful", source_config.storage_type, target_config.storage_type); } } } /// Test large dataset migration performance #[tokio::test] async fn test_large_dataset_migration_performance() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); let target_config = env.filesystem_config().unwrap(); // Setup large dataset let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let task_count = 1000; println!("Setting up {} tasks for migration...", task_count); let batch_tasks: Vec<(WorkflowTask, u8)> = (0..task_count) .map(|i| (gen.workflow_task(), (i % 10 + 1) as u8)) .collect(); source_storage.enqueue_batch(batch_tasks).await.unwrap(); // Configure migration with optimized batch size let migration_config = MigrationConfig { source_config, target_config, options: MigrationOptions { dry_run: false, batch_size: 100, verify_integrity: false, // Skip for performance create_backup: false, ..Default::default() }, }; // Execute migration with timing let start = std::time::Instant::now(); let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); let duration = start.elapsed(); // Verify migration success assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, task_count); println!("Migration of {} tasks completed in {:?}", task_count, duration); println!("Throughput: {:.2} tasks/second", report.statistics.throughput_tasks_per_second); // Performance should be reasonable (adjust based on requirements) assert!(duration.as_secs() < 30, "Migration took too long: {:?}", duration); assert!(report.statistics.throughput_tasks_per_second > 10.0, "Throughput too low"); } /// Test migration with data integrity verification #[tokio::test] async fn test_migration_data_integrity() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); let target_config = env.filesystem_config().unwrap(); // Setup source data with complex tasks let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let mut complex_task = gen.workflow_task(); complex_task.args = vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()]; complex_task.dependencies = vec!["dep1".to_string(), "dep2".to_string()]; complex_task.status = TaskStatus::Running; complex_task.started_at = Some(chrono::Utc::now()); complex_task.output = Some("Complex output data".to_string()); source_storage.enqueue(complex_task.clone(), 5).await.unwrap(); // Configure migration with integrity verification let migration_config = MigrationConfig { source_config, target_config: target_config.clone(), options: MigrationOptions { dry_run: false, verify_integrity: true, ..Default::default() }, }; // Execute migration let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); // Verify migration success with integrity check assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, 1); assert_eq!(report.errors.len(), 0); // Verify complex data was preserved let target_storage = create_storage(target_config).await.unwrap(); target_storage.init().await.unwrap(); let migrated_task = target_storage.get_task(&complex_task.id).await.unwrap().unwrap(); assert_eq!(migrated_task.args, complex_task.args); assert_eq!(migrated_task.dependencies, complex_task.dependencies); assert_eq!(migrated_task.status, complex_task.status); assert_eq!(migrated_task.output, complex_task.output); } /// Test migration rollback with target storage cleanup #[tokio::test] async fn test_rollback_with_target_cleanup() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); // Create a valid target config let target_config = env.filesystem_config().unwrap(); let target_storage = create_storage(target_config.clone()).await.unwrap(); target_storage.init().await.unwrap(); // Add some test data to target storage (simulating partial migration) let test_tasks = gen.workflow_tasks_batch(3); for task in &test_tasks { target_storage.enqueue(task.clone(), 1).await.unwrap(); } StorageAssertions::assert_task_count(&target_storage, 3).await.unwrap(); // Configure rollback to clear target storage let rollback_config = RollbackConfig { backup_path: "/tmp/nonexistent_backup.json".to_string(), migration_id: "test_migration_456".to_string(), rollback_strategy: RollbackStrategy::ClearTargetRestoreSource, }; let rollback_manager = RollbackManager::new(rollback_config); // Execute rollback let rollback_report = rollback_manager.execute_rollback(&target_storage).await.unwrap(); // Note: The current implementation doesn't support task deletion // This test verifies the rollback process recognizes the limitation assert_eq!(rollback_report.status, RollbackStatus::PartiallyCompleted); assert!(!rollback_report.errors.is_empty()); let limitation_error = rollback_report.errors.iter() .find(|e| e.contains("TaskStorage trait lacks delete_task method")); assert!(limitation_error.is_some()); } /// Test migration continuation after non-critical errors #[tokio::test] async fn test_migration_continue_on_error() { let mut env = TestEnvironment::new(); let gen = TestDataGenerator::new(); let source_config = env.filesystem_config().unwrap(); let target_config = env.filesystem_config().unwrap(); // Setup source data let source_storage = create_storage(source_config.clone()).await.unwrap(); source_storage.init().await.unwrap(); let test_tasks = gen.workflow_tasks_batch(5); for task in &test_tasks { source_storage.enqueue(task.clone(), 1).await.unwrap(); } // Configure migration to continue on error let migration_config = MigrationConfig { source_config, target_config, options: MigrationOptions { dry_run: false, continue_on_error: true, max_retries: 1, verify_integrity: false, ..Default::default() }, }; // Execute migration let migrator = StorageMigrator::new(migration_config); let report = migrator.execute().await.unwrap(); // Migration should complete despite potential errors assert_eq!(report.status, MigrationStatus::Completed); assert_eq!(report.statistics.tasks_migrated, 5); } /// Test migration utility functions for common scenarios #[cfg(feature = "surrealdb")] #[tokio::test] async fn test_migration_utility_functions() { let mut env = TestEnvironment::new(); let source_dir = env.create_temp_dir().unwrap(); let target_dir = env.create_temp_dir().unwrap(); // Test filesystem to embedded utility let migration_options = MigrationOptions { dry_run: true, ..Default::default() }; let migrator = StorageMigrator::filesystem_to_embedded( &source_dir.to_string_lossy(), &target_dir.to_string_lossy(), Some(migration_options), ); // Verify configuration was set correctly let report = migrator.execute().await.unwrap(); assert_eq!(report.source_backend, "filesystem"); assert_eq!(report.target_backend, "surrealdb-embedded"); assert_eq!(report.status, MigrationStatus::Completed); // Test embedded to server utility let migrator = StorageMigrator::embedded_to_server( &target_dir.to_string_lossy(), "memory://test", "test", "test", Some(MigrationOptions { dry_run: true, ..Default::default() }), ); let report = migrator.execute().await.unwrap(); assert_eq!(report.source_backend, "surrealdb-embedded"); assert_eq!(report.target_backend, "surrealdb-server"); assert_eq!(report.status, MigrationStatus::Completed); }