prvng_platform/orchestrator/benches/migration_benchmarks.rs

//! Migration performance benchmarks
//!
//! This module benchmarks migration performance between different storage backends,
//! measuring data-transfer throughput and how it scales with dataset size,
//! batch size, and migration options (verification, progress tracking, dry runs).
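//!
//! These benches are typically run through Cargo's bench harness, e.g.
//! `cargo bench --bench migration_benchmarks` (bench target name assumed to
//! match this file), optionally filtered to a single group with a trailing
//! argument such as `-- basic_migration`.
//!
//! Each benchmark drives [`StorageMigrator`] with a configuration shaped
//! like the following sketch (mirroring the options used throughout):
//!
//! ```rust,ignore
//! let config = MigrationConfig {
//!     source_config,
//!     target_config,
//!     options: MigrationOptions { dry_run: false, ..Default::default() },
//! };
//! StorageMigrator::new(config).execute().await?;
//! ```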
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::runtime::Runtime;
// Import orchestrator types
use orchestrator::{TaskStatus, WorkflowTask};
use orchestrator::storage::{StorageConfig, create_storage};
use orchestrator::migration::{StorageMigrator, MigrationConfig, MigrationOptions};
/// Migration benchmark data generator
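///
/// An atomic counter gives each generated task a unique id; every other
/// field is derived deterministically from that id, so repeated runs
/// produce the same data shape.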
struct MigrationBenchGenerator {
counter: std::sync::atomic::AtomicU64,
}
impl MigrationBenchGenerator {
fn new() -> Self {
Self {
counter: std::sync::atomic::AtomicU64::new(0),
}
}
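/// Builds one task whose fields cycle deterministically with its id:
/// status rotates Pending/Running/Completed/Failed (`id % 4`), every fifth
/// task depends on its predecessor, and timestamps, output, and error are
/// filled in to match the status.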
fn generate_task(&self) -> WorkflowTask {
let id = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
WorkflowTask {
id: format!("migration_bench_task_{}", id),
name: format!("Migration Benchmark Task {}", id),
command: "echo".to_string(),
args: vec!["migration".to_string(), "benchmark".to_string()],
dependencies: if id % 5 == 0 && id > 0 {
    // Every fifth task depends on its predecessor; task 0 has no
    // predecessor (the old saturating_sub made it depend on itself).
    vec![format!("migration_bench_task_{}", id - 1)]
} else {
    vec![]
},
status: match id % 4 {
0 => TaskStatus::Pending,
1 => TaskStatus::Running,
2 => TaskStatus::Completed,
_ => TaskStatus::Failed,
},
created_at: chrono::Utc::now() - chrono::Duration::minutes(id as i64 % 60),
started_at: if id % 4 >= 1 {
Some(chrono::Utc::now() - chrono::Duration::minutes(id as i64 % 30))
} else {
None
},
completed_at: if id % 4 >= 2 {
Some(chrono::Utc::now() - chrono::Duration::minutes(id as i64 % 10))
} else {
None
},
output: if id % 4 == 2 {
Some(format!("Output for task {}", id))
} else {
None
},
error: if id % 4 == 3 {
Some(format!("Error for task {}", id))
} else {
None
},
}
}
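/// Enqueues `count` generated tasks in batches of at most 100, pairing each
/// task with a priority in 1..=10 derived from its index.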
async fn populate_storage(&self, storage: &dyn orchestrator::storage::TaskStorage, count: usize) {
let batch_size = 100.min(count);
for chunk_start in (0..count).step_by(batch_size) {
let chunk_end = (chunk_start + batch_size).min(count);
let batch: Vec<(WorkflowTask, u8)> = (chunk_start..chunk_end)
.map(|i| (self.generate_task(), (i % 10 + 1) as u8))
.collect();
storage.enqueue_batch(batch).await.unwrap();
}
}
}
/// Create migration configuration pairs for benchmarking
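///
/// Always includes filesystem → filesystem; with the `surrealdb` feature
/// enabled, filesystem → embedded and embedded → in-memory-server pairs are
/// added as well. The backing `TempDir`s are deliberately leaked so the
/// directories survive for the whole benchmark run.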
async fn create_migration_configs() -> Vec<(String, StorageConfig, StorageConfig)> {
let mut configs = Vec::new();
// Filesystem to Filesystem (different directories)
let fs_source_temp = TempDir::new().unwrap();
let fs_target_temp = TempDir::new().unwrap();
let fs_source_config = StorageConfig {
storage_type: "filesystem".to_string(),
data_dir: fs_source_temp.path().to_string_lossy().to_string(),
..Default::default()
};
let fs_target_config = StorageConfig {
storage_type: "filesystem".to_string(),
data_dir: fs_target_temp.path().to_string_lossy().to_string(),
..Default::default()
};
configs.push((
"filesystem_to_filesystem".to_string(),
fs_source_config,
fs_target_config,
));
// Keep temp directories alive
std::mem::forget(fs_source_temp);
std::mem::forget(fs_target_temp);
#[cfg(feature = "surrealdb")]
{
// Filesystem to SurrealDB Embedded
let fs_to_embedded_source = TempDir::new().unwrap();
let fs_to_embedded_target = TempDir::new().unwrap();
let fs_source = StorageConfig {
storage_type: "filesystem".to_string(),
data_dir: fs_to_embedded_source.path().to_string_lossy().to_string(),
..Default::default()
};
let embedded_target = StorageConfig {
storage_type: "surrealdb-embedded".to_string(),
data_dir: fs_to_embedded_target.path().to_string_lossy().to_string(),
surrealdb_namespace: Some("bench".to_string()),
surrealdb_database: Some("tasks".to_string()),
..Default::default()
};
configs.push((
"filesystem_to_embedded".to_string(),
fs_source,
embedded_target,
));
std::mem::forget(fs_to_embedded_source);
std::mem::forget(fs_to_embedded_target);
// SurrealDB Embedded to Server
let embedded_to_server_source = TempDir::new().unwrap();
let embedded_source = StorageConfig {
storage_type: "surrealdb-embedded".to_string(),
data_dir: embedded_to_server_source.path().to_string_lossy().to_string(),
surrealdb_namespace: Some("bench".to_string()),
surrealdb_database: Some("tasks".to_string()),
..Default::default()
};
let server_target = StorageConfig {
storage_type: "surrealdb-server".to_string(),
data_dir: "".to_string(),
surrealdb_url: Some("memory://migration_bench".to_string()),
surrealdb_namespace: Some("bench".to_string()),
surrealdb_database: Some("tasks".to_string()),
surrealdb_username: Some("bench".to_string()),
surrealdb_password: Some("bench".to_string()),
};
configs.push((
"embedded_to_server".to_string(),
embedded_source,
server_target,
));
std::mem::forget(embedded_to_server_source);
}
configs
}
/// Benchmark basic migration performance
fn bench_basic_migration(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let migration_configs = rt.block_on(create_migration_configs());
let mut group = c.benchmark_group("basic_migration");
group.sample_size(10);
let dataset_sizes = [100, 500, 1000];
for (migration_name, source_config, target_config) in migration_configs {
for dataset_size in dataset_sizes.iter() {
group.throughput(Throughput::Elements(*dataset_size as u64));
group.bench_with_input(
BenchmarkId::new(migration_name.as_str(), dataset_size),
&(source_config.clone(), target_config.clone(), *dataset_size),
|b, (source_config, target_config, dataset_size)| {
b.to_async(&rt).iter_batched(
|| {
// Setup: create and populate the source storage. This closure is
// synchronous but runs inside the benchmark runtime, so re-enter the
// runtime with block_in_place; a plain rt.block_on here would panic
// ("cannot block the current thread from within a runtime").
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let source_storage = create_storage(source_config.clone()).await.unwrap();
        source_storage.init().await.unwrap();
        let gen = MigrationBenchGenerator::new();
        gen.populate_storage(&source_storage, *dataset_size).await;
        (source_config.clone(), target_config.clone())
    })
})
},
|(source_config, target_config)| async move {
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
verify_integrity: false,
create_backup: false,
batch_size: 50,
..Default::default()
},
};
let migrator = StorageMigrator::new(migration_config);
black_box(migrator.execute().await.unwrap())
},
criterion::BatchSize::SmallInput,
);
},
);
}
}
group.finish();
}
/// Benchmark migration with different batch sizes
fn bench_migration_batch_sizes(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let migration_configs = rt.block_on(create_migration_configs());
let mut group = c.benchmark_group("migration_batch_sizes");
group.sample_size(10);
let batch_sizes = [10, 50, 100, 200];
let dataset_size = 1000;
for (migration_name, source_config, target_config) in migration_configs {
for batch_size in batch_sizes.iter() {
group.throughput(Throughput::Elements(dataset_size as u64));
group.bench_with_input(
BenchmarkId::new(migration_name.as_str(), batch_size),
&(source_config.clone(), target_config.clone(), *batch_size),
|b, (source_config, target_config, batch_size)| {
b.to_async(&rt).iter_batched(
|| {
// Setup: populate the source storage (block_in_place re-enters the
// runtime; see the note in bench_basic_migration).
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let source_storage = create_storage(source_config.clone()).await.unwrap();
        source_storage.init().await.unwrap();
        let gen = MigrationBenchGenerator::new();
        gen.populate_storage(&source_storage, dataset_size).await;
        (source_config.clone(), target_config.clone())
    })
})
},
|(source_config, target_config)| async move {
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
verify_integrity: false,
create_backup: false,
batch_size: *batch_size,
..Default::default()
},
};
let migrator = StorageMigrator::new(migration_config);
black_box(migrator.execute().await.unwrap())
},
criterion::BatchSize::SmallInput,
);
},
);
}
}
group.finish();
}
/// Benchmark migration with integrity verification
fn bench_migration_with_verification(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let migration_configs = rt.block_on(create_migration_configs());
let mut group = c.benchmark_group("migration_verification");
group.sample_size(10);
let dataset_size = 500;
for (migration_name, source_config, target_config) in migration_configs {
// Benchmark without verification
group.bench_with_input(
BenchmarkId::new(migration_name.as_str(), "no_verify"),
&(source_config.clone(), target_config.clone()),
|b, (source_config, target_config)| {
b.to_async(&rt).iter_batched(
|| {
// Populate the source storage (runtime re-entry; see bench_basic_migration).
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let source_storage = create_storage(source_config.clone()).await.unwrap();
        source_storage.init().await.unwrap();
        let gen = MigrationBenchGenerator::new();
        gen.populate_storage(&source_storage, dataset_size).await;
        (source_config.clone(), target_config.clone())
    })
})
},
|(source_config, target_config)| async move {
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
verify_integrity: false,
create_backup: false,
..Default::default()
},
};
let migrator = StorageMigrator::new(migration_config);
black_box(migrator.execute().await.unwrap())
},
criterion::BatchSize::SmallInput,
);
},
);
// Benchmark with verification
group.bench_with_input(
BenchmarkId::new(migration_name.as_str(), "with_verify"),
&(source_config.clone(), target_config.clone()),
|b, (source_config, target_config)| {
b.to_async(&rt).iter_batched(
|| {
// Populate the source storage (runtime re-entry; see bench_basic_migration).
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let source_storage = create_storage(source_config.clone()).await.unwrap();
        source_storage.init().await.unwrap();
        let gen = MigrationBenchGenerator::new();
        gen.populate_storage(&source_storage, dataset_size).await;
        (source_config.clone(), target_config.clone())
    })
})
},
|(source_config, target_config)| async move {
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
verify_integrity: true,
create_backup: false,
..Default::default()
},
};
let migrator = StorageMigrator::new(migration_config);
black_box(migrator.execute().await.unwrap())
},
criterion::BatchSize::SmallInput,
);
},
);
}
group.finish();
}
/// Benchmark migration with progress tracking overhead
fn bench_migration_progress_tracking(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let migration_configs = rt.block_on(create_migration_configs());
let mut group = c.benchmark_group("migration_progress_tracking");
group.sample_size(10);
let dataset_size = 500;
for (migration_name, source_config, target_config) in migration_configs.into_iter().take(1) {
// Benchmark without progress tracking
group.bench_with_input(
BenchmarkId::new(migration_name.as_str(), "no_progress"),
&(source_config.clone(), target_config.clone()),
|b, (source_config, target_config)| {
b.to_async(&rt).iter_batched(
|| {
// Populate the source storage (runtime re-entry; see bench_basic_migration).
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let source_storage = create_storage(source_config.clone()).await.unwrap();
        source_storage.init().await.unwrap();
        let gen = MigrationBenchGenerator::new();
        gen.populate_storage(&source_storage, dataset_size).await;
        (source_config.clone(), target_config.clone())
    })
})
},
|(source_config, target_config)| async move {
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
verify_integrity: false,
create_backup: false,
..Default::default()
},
};
let migrator = StorageMigrator::new(migration_config);
black_box(migrator.execute().await.unwrap())
},
criterion::BatchSize::SmallInput,
);
},
);
// Benchmark with progress tracking
group.bench_with_input(
BenchmarkId::new(migration_name.as_str(), "with_progress"),
&(source_config.clone(), target_config.clone()),
|b, (source_config, target_config)| {
b.to_async(&rt).iter_batched(
|| {
// Populate the source storage (runtime re-entry; see bench_basic_migration).
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let source_storage = create_storage(source_config.clone()).await.unwrap();
        source_storage.init().await.unwrap();
        let gen = MigrationBenchGenerator::new();
        gen.populate_storage(&source_storage, dataset_size).await;
        (source_config.clone(), target_config.clone())
    })
})
},
|(source_config, target_config)| async move {
let progress_callback = Arc::new(|_: &orchestrator::migration::MigrationProgress| {
// Simulate some progress handling overhead
std::hint::black_box(());
});
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: false,
verify_integrity: false,
create_backup: false,
..Default::default()
},
};
let migrator = StorageMigrator::new(migration_config)
.with_progress_callback(progress_callback);
black_box(migrator.execute().await.unwrap())
},
criterion::BatchSize::SmallInput,
);
},
);
}
group.finish();
}
/// Benchmark dry run migration performance
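///
/// A dry run is expected to read and plan without writing to the target,
/// which is why this group can afford larger datasets (up to 10k tasks).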
fn bench_dry_run_migration(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let migration_configs = rt.block_on(create_migration_configs());
let mut group = c.benchmark_group("dry_run_migration");
group.sample_size(10);
let dataset_sizes = [1000, 5000, 10000];
for (migration_name, source_config, target_config) in migration_configs.into_iter().take(1) {
for dataset_size in dataset_sizes.iter() {
group.throughput(Throughput::Elements(*dataset_size as u64));
group.bench_with_input(
BenchmarkId::new(migration_name.as_str(), dataset_size),
&(source_config.clone(), target_config.clone(), *dataset_size),
|b, (source_config, target_config, dataset_size)| {
b.to_async(&rt).iter_batched(
|| {
// Populate the source storage (runtime re-entry; see bench_basic_migration).
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let source_storage = create_storage(source_config.clone()).await.unwrap();
        source_storage.init().await.unwrap();
        let gen = MigrationBenchGenerator::new();
        gen.populate_storage(&source_storage, *dataset_size).await;
        (source_config.clone(), target_config.clone())
    })
})
},
|(source_config, target_config)| async move {
let migration_config = MigrationConfig {
source_config,
target_config,
options: MigrationOptions {
dry_run: true,
verify_integrity: false,
create_backup: false,
..Default::default()
},
};
let migrator = StorageMigrator::new(migration_config);
black_box(migrator.execute().await.unwrap())
},
criterion::BatchSize::SmallInput,
);
},
);
}
}
group.finish();
}
criterion_group!(
migration_benches,
bench_basic_migration,
bench_migration_batch_sizes,
bench_migration_with_verification,
bench_migration_progress_tracking,
bench_dry_run_migration
);
criterion_main!(migration_benches);