Jesús Pérez 09a97ac8f5
chore: update platform submodule to monorepo crates structure
Platform restructured into crates/, added AI service and detector,
       migrated control-center-ui to Leptos 0.8
2026-01-08 21:32:59 +00:00

668 lines
22 KiB
Rust

//! Comprehensive migration tests for storage backend transitions
//!
//! This module tests data migration between all supported storage backends,
//! ensuring data integrity, rollback functionality, and performance
//! characteristics.
//!
//! NOTE: These tests are disabled due to import issues with crate name
//! (orchestrator crate name vs provisioning-orchestrator)
// Disabled: import issues with crate name
#![allow(unexpected_cfgs)]
#![cfg(feature = "migration_tests")]
#![allow(unused_imports, clippy::single_component_path_imports)]
use std::sync::Arc;
use tempfile::TempDir;
use tokio_test;
// Import test helpers and orchestrator types
mod helpers;
use helpers::{StorageAssertions, TestDataGenerator, TestEnvironment};
use orchestrator::migration::{
MigrationConfig, MigrationOptions, MigrationStatus, RollbackConfig, RollbackManager,
RollbackStatus, RollbackStrategy, StorageMigrator,
};
use orchestrator::storage::{create_storage, StorageConfig};
use orchestrator::{TaskStatus, WorkflowTask};
/// Test basic migration between filesystem and SurrealDB embedded
#[cfg(feature = "surrealdb")]
#[tokio::test]
async fn test_filesystem_to_embedded_migration() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
let target_config = env.surrealdb_embedded_config().unwrap();
// Setup source data
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let test_tasks = gen.workflow_tasks_batch(10);
for task in &test_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
// Add some audit entries and metrics
let audit_entry = gen.audit_entry(test_tasks[0].id.clone());
source_storage
.record_audit_entry(audit_entry)
.await
.unwrap();
let metric = gen.metric("test_metric", 42.0);
source_storage.record_metric(metric).await.unwrap();
// Configure migration
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
verify_integrity: true,
create_backup: true,
..Default::default()
},
};
// Execute migration
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
// Verify migration success
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, 10);
assert_eq!(report.statistics.tasks_failed, 0);
assert_eq!(report.errors.len(), 0);
// Verify data in target storage
let target_storage = create_storage(report.target_backend.parse().unwrap())
.await
.unwrap();
StorageAssertions::assert_task_count(&target_storage, 10)
.await
.unwrap();
}
/// Test migration with progress callback
#[tokio::test]
async fn test_migration_with_progress_tracking() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
let target_config = env.filesystem_config().unwrap(); // Different directory
// Setup source data
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let test_tasks = gen.workflow_tasks_batch(5);
for task in &test_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
// Track progress updates
let progress_updates = Arc::new(std::sync::Mutex::new(Vec::new()));
let progress_updates_clone = progress_updates.clone();
let progress_callback = Arc::new(
move |progress: &orchestrator::migration::MigrationProgress| {
progress_updates_clone.lock().unwrap().push((
progress.phase.clone(),
progress.percentage,
progress.message.clone(),
));
},
);
// Configure migration
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
batch_size: 2,
..Default::default()
},
};
// Execute migration with progress tracking
let migrator = StorageMigrator::new(migration_config).with_progress_callback(progress_callback);
let report = migrator.execute().await.unwrap();
// Verify migration success
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, 5);
// Verify progress was tracked
let updates = progress_updates.lock().unwrap();
assert!(!updates.is_empty());
// Check that we received updates for all phases
let phases: Vec<String> = updates.iter().map(|(phase, _, _)| phase.clone()).collect();
assert!(phases.contains(&"validation".to_string()));
assert!(phases.contains(&"connection".to_string()));
assert!(phases.contains(&"discovery".to_string()));
assert!(phases.contains(&"migration".to_string()));
}
/// Test migration with filtering options
#[tokio::test]
async fn test_migration_with_filtering() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
let target_config = env.filesystem_config().unwrap();
// Setup source data with different statuses
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let pending_tasks = (0..3)
.map(|_| gen.workflow_task_with_status(TaskStatus::Pending))
.collect::<Vec<_>>();
let completed_tasks = (0..2)
.map(|_| gen.workflow_task_with_status(TaskStatus::Completed))
.collect::<Vec<_>>();
let failed_tasks = (0..1)
.map(|_| gen.workflow_task_with_status(TaskStatus::Failed))
.collect::<Vec<_>>();
for task in &pending_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
for task in &completed_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
for task in &failed_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
// Configure migration to only migrate pending and completed tasks
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
status_filter: Some(vec![TaskStatus::Pending, TaskStatus::Completed]),
verify_integrity: false, // Skip integrity check for filtered migration
..Default::default()
},
};
// Execute migration
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
// Verify migration success
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, 5); // 3 pending + 2 completed
assert_eq!(report.statistics.tasks_failed, 0);
// Verify target storage contains only filtered tasks
let target_storage = create_storage(report.target_backend.parse().unwrap())
.await
.unwrap();
StorageAssertions::assert_task_count(&target_storage, 5)
.await
.unwrap();
let failed_tasks_in_target = target_storage
.list_tasks(Some(TaskStatus::Failed))
.await
.unwrap();
assert_eq!(failed_tasks_in_target.len(), 0);
}
/// Test dry run migration
#[tokio::test]
async fn test_dry_run_migration() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
let target_config = env.filesystem_config().unwrap();
// Setup source data
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let test_tasks = gen.workflow_tasks_batch(5);
for task in &test_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
// Configure dry run migration
let migration_config = MigrationConfig {
source_config,
target_config: target_config.clone(),
options: MigrationOptions {
dry_run: true,
..Default::default()
},
};
// Execute dry run
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
// Verify dry run completed successfully
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, 5);
// Verify target storage is empty (no actual migration occurred)
let target_storage = create_storage(target_config).await.unwrap();
target_storage.init().await.unwrap();
StorageAssertions::assert_task_count(&target_storage, 0)
.await
.unwrap();
}
/// Test migration failure and rollback
#[tokio::test]
async fn test_migration_failure_and_rollback() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
// Create an invalid target config to force migration failure
let mut target_config = env.filesystem_config().unwrap();
target_config.data_dir = "/invalid/path/that/does/not/exist".to_string();
// Setup source data
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let test_tasks = gen.workflow_tasks_batch(3);
for task in &test_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
// Configure migration with backup for rollback
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
create_backup: true,
..Default::default()
},
};
// Execute migration (should fail)
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
// Verify migration failed
assert_eq!(report.status, MigrationStatus::Failed);
assert!(!report.errors.is_empty());
// Check if automatic rollback was attempted
assert!(!report.warnings.is_empty());
let rollback_warning = report
.warnings
.iter()
.find(|w| w.contains("Automatic rollback executed"));
assert!(rollback_warning.is_some());
}
/// Test manual rollback process
#[tokio::test]
async fn test_manual_rollback_process() {
let rollback_config = RollbackConfig {
backup_path: "/tmp/test_backup.json".to_string(),
migration_id: "test_migration_123".to_string(),
rollback_strategy: RollbackStrategy::Manual,
};
let rollback_manager = RollbackManager::new(rollback_config);
// Create a mock target storage for rollback
let mut env = TestEnvironment::new();
let target_config = env.filesystem_config().unwrap();
let target_storage = create_storage(target_config).await.unwrap();
target_storage.init().await.unwrap();
// Execute manual rollback
let rollback_report = rollback_manager
.execute_rollback(&target_storage)
.await
.unwrap();
// Verify rollback completed (manual strategy just generates instructions)
assert_eq!(rollback_report.status, RollbackStatus::Completed);
assert!(!rollback_report.actions_performed.is_empty());
// Check that instructions were generated
let instruction_action = rollback_report
.actions_performed
.iter()
.find(|a| a.action.contains("manual_instructions"));
assert!(instruction_action.is_some());
}
/// Test migration between different backend combinations
#[cfg(feature = "surrealdb")]
#[tokio::test]
async fn test_all_backend_combinations() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let configs = env.all_storage_configs();
// Test migration between each pair of backends
for (i, source_config) in configs.iter().enumerate() {
for (j, target_config) in configs.iter().enumerate() {
if i == j {
continue; // Skip same backend migrations
}
println!(
"Testing migration from {} to {}",
source_config.storage_type, target_config.storage_type
);
// Setup source data
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let test_tasks = gen.workflow_tasks_batch(3);
for task in &test_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
// Configure migration
let migration_config = MigrationConfig {
source_config: source_config.clone(),
target_config: target_config.clone(),
options: MigrationOptions {
dry_run: false,
verify_integrity: true,
create_backup: false, // Skip backup for test speed
..Default::default()
},
};
// Execute migration
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
// Verify migration success
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, 3);
assert_eq!(report.statistics.tasks_failed, 0);
println!(
"✓ Migration from {} to {} successful",
source_config.storage_type, target_config.storage_type
);
}
}
}
/// Test large dataset migration performance
#[tokio::test]
async fn test_large_dataset_migration_performance() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
let target_config = env.filesystem_config().unwrap();
// Setup large dataset
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let task_count = 1000;
println!("Setting up {} tasks for migration...", task_count);
let batch_tasks: Vec<(WorkflowTask, u8)> = (0..task_count)
.map(|i| (gen.workflow_task(), (i % 10 + 1) as u8))
.collect();
source_storage.enqueue_batch(batch_tasks).await.unwrap();
// Configure migration with optimized batch size
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
batch_size: 100,
verify_integrity: false, // Skip for performance
create_backup: false,
..Default::default()
},
};
// Execute migration with timing
let start = std::time::Instant::now();
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
let duration = start.elapsed();
// Verify migration success
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, task_count);
println!(
"Migration of {} tasks completed in {:?}",
task_count, duration
);
println!(
"Throughput: {:.2} tasks/second",
report.statistics.throughput_tasks_per_second
);
// Performance should be reasonable (adjust based on requirements)
assert!(
duration.as_secs() < 30,
"Migration took too long: {:?}",
duration
);
assert!(
report.statistics.throughput_tasks_per_second > 10.0,
"Throughput too low"
);
}
/// Test migration with data integrity verification
#[tokio::test]
async fn test_migration_data_integrity() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
let target_config = env.filesystem_config().unwrap();
// Setup source data with complex tasks
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let mut complex_task = gen.workflow_task();
complex_task.args = vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()];
complex_task.dependencies = vec!["dep1".to_string(), "dep2".to_string()];
complex_task.status = TaskStatus::Running;
complex_task.started_at = Some(chrono::Utc::now());
complex_task.output = Some("Complex output data".to_string());
source_storage
.enqueue(complex_task.clone(), 5)
.await
.unwrap();
// Configure migration with integrity verification
let migration_config = MigrationConfig {
source_config,
target_config: target_config.clone(),
options: MigrationOptions {
dry_run: false,
verify_integrity: true,
..Default::default()
},
};
// Execute migration
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
// Verify migration success with integrity check
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, 1);
assert_eq!(report.errors.len(), 0);
// Verify complex data was preserved
let target_storage = create_storage(target_config).await.unwrap();
target_storage.init().await.unwrap();
let migrated_task = target_storage
.get_task(&complex_task.id)
.await
.unwrap()
.unwrap();
assert_eq!(migrated_task.args, complex_task.args);
assert_eq!(migrated_task.dependencies, complex_task.dependencies);
assert_eq!(migrated_task.status, complex_task.status);
assert_eq!(migrated_task.output, complex_task.output);
}
/// Test migration rollback with target storage cleanup
#[tokio::test]
async fn test_rollback_with_target_cleanup() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
// Create a valid target config
let target_config = env.filesystem_config().unwrap();
let target_storage = create_storage(target_config.clone()).await.unwrap();
target_storage.init().await.unwrap();
// Add some test data to target storage (simulating partial migration)
let test_tasks = gen.workflow_tasks_batch(3);
for task in &test_tasks {
target_storage.enqueue(task.clone(), 1).await.unwrap();
}
StorageAssertions::assert_task_count(&target_storage, 3)
.await
.unwrap();
// Configure rollback to clear target storage
let rollback_config = RollbackConfig {
backup_path: "/tmp/nonexistent_backup.json".to_string(),
migration_id: "test_migration_456".to_string(),
rollback_strategy: RollbackStrategy::ClearTargetRestoreSource,
};
let rollback_manager = RollbackManager::new(rollback_config);
// Execute rollback
let rollback_report = rollback_manager
.execute_rollback(&target_storage)
.await
.unwrap();
// Note: The current implementation doesn't support task deletion
// This test verifies the rollback process recognizes the limitation
assert_eq!(rollback_report.status, RollbackStatus::PartiallyCompleted);
assert!(!rollback_report.errors.is_empty());
let limitation_error = rollback_report
.errors
.iter()
.find(|e| e.contains("TaskStorage trait lacks delete_task method"));
assert!(limitation_error.is_some());
}
/// Test migration continuation after non-critical errors
#[tokio::test]
async fn test_migration_continue_on_error() {
let mut env = TestEnvironment::new();
let gen = TestDataGenerator::new();
let source_config = env.filesystem_config().unwrap();
let target_config = env.filesystem_config().unwrap();
// Setup source data
let source_storage = create_storage(source_config.clone()).await.unwrap();
source_storage.init().await.unwrap();
let test_tasks = gen.workflow_tasks_batch(5);
for task in &test_tasks {
source_storage.enqueue(task.clone(), 1).await.unwrap();
}
// Configure migration to continue on error
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
continue_on_error: true,
max_retries: 1,
verify_integrity: false,
..Default::default()
},
};
// Execute migration
let migrator = StorageMigrator::new(migration_config);
let report = migrator.execute().await.unwrap();
// Migration should complete despite potential errors
assert_eq!(report.status, MigrationStatus::Completed);
assert_eq!(report.statistics.tasks_migrated, 5);
}
/// Test migration utility functions for common scenarios
#[cfg(feature = "surrealdb")]
#[tokio::test]
async fn test_migration_utility_functions() {
let mut env = TestEnvironment::new();
let source_dir = env.create_temp_dir().unwrap();
let target_dir = env.create_temp_dir().unwrap();
// Test filesystem to embedded utility
let migration_options = MigrationOptions {
dry_run: true,
..Default::default()
};
let migrator = StorageMigrator::filesystem_to_embedded(
&source_dir.to_string_lossy(),
&target_dir.to_string_lossy(),
Some(migration_options),
);
// Verify configuration was set correctly
let report = migrator.execute().await.unwrap();
assert_eq!(report.source_backend, "filesystem");
assert_eq!(report.target_backend, "surrealdb-embedded");
assert_eq!(report.status, MigrationStatus::Completed);
// Test embedded to server utility
let migrator = StorageMigrator::embedded_to_server(
&target_dir.to_string_lossy(),
"memory://test",
"test",
"test",
Some(MigrationOptions {
dry_run: true,
..Default::default()
}),
);
let report = migrator.execute().await.unwrap();
assert_eq!(report.source_backend, "surrealdb-embedded");
assert_eq!(report.target_backend, "surrealdb-server");
assert_eq!(report.status, MigrationStatus::Completed);
}