//! Storage performance benchmarks //! //! This module provides comprehensive performance benchmarks for all storage backends, //! measuring throughput, latency, and scalability characteristics. use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use std::time::Duration; use tempfile::TempDir; use tokio::runtime::Runtime; // Import orchestrator types use orchestrator::{TaskStatus, WorkflowTask}; use orchestrator::storage::{StorageConfig, create_storage, TaskStorage}; /// Test data generator for benchmarks struct BenchDataGenerator { counter: std::sync::atomic::AtomicU64, } impl BenchDataGenerator { fn new() -> Self { Self { counter: std::sync::atomic::AtomicU64::new(0), } } fn generate_task(&self) -> WorkflowTask { let id = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); WorkflowTask { id: format!("bench_task_{}", id), name: format!("Benchmark Task {}", id), command: "echo".to_string(), args: vec!["benchmark".to_string()], dependencies: vec![], status: TaskStatus::Pending, created_at: chrono::Utc::now(), started_at: None, completed_at: None, output: None, error: None, } } fn generate_batch(&self, size: usize) -> Vec<(WorkflowTask, u8)> { (0..size) .map(|i| (self.generate_task(), (i % 10 + 1) as u8)) .collect() } } /// Create storage configurations for benchmarking async fn create_benchmark_configs() -> Vec<(String, StorageConfig)> { let mut configs = Vec::new(); // Filesystem storage let fs_temp = TempDir::new().unwrap(); let fs_config = StorageConfig { storage_type: "filesystem".to_string(), data_dir: fs_temp.path().to_string_lossy().to_string(), ..Default::default() }; configs.push(("filesystem".to_string(), fs_config)); // Keep temp directory alive std::mem::forget(fs_temp); #[cfg(feature = "surrealdb")] { // SurrealDB embedded storage let embedded_temp = TempDir::new().unwrap(); let embedded_config = StorageConfig { storage_type: "surrealdb-embedded".to_string(), data_dir: embedded_temp.path().to_string_lossy().to_string(), surrealdb_namespace: Some("bench".to_string()), surrealdb_database: Some("tasks".to_string()), ..Default::default() }; configs.push(("surrealdb-embedded".to_string(), embedded_config)); std::mem::forget(embedded_temp); // SurrealDB server storage (in-memory for benchmarking) let server_config = StorageConfig { storage_type: "surrealdb-server".to_string(), data_dir: "".to_string(), surrealdb_url: Some("memory://benchmark".to_string()), surrealdb_namespace: Some("bench".to_string()), surrealdb_database: Some("tasks".to_string()), surrealdb_username: Some("bench".to_string()), surrealdb_password: Some("bench".to_string()), }; configs.push(("surrealdb-server".to_string(), server_config)); } configs } /// Benchmark single task enqueue operations fn bench_single_enqueue(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("single_enqueue"); for (backend_name, config) in configs { group.bench_with_input( BenchmarkId::new("enqueue", &backend_name), &config, |b, config| { let rt = Runtime::new().unwrap(); let storage = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); storage }); let gen = BenchDataGenerator::new(); b.to_async(&rt).iter(|| async { let task = gen.generate_task(); black_box(storage.enqueue(task, 1).await.unwrap()) }); }, ); } group.finish(); } /// Benchmark single task dequeue operations fn bench_single_dequeue(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("single_dequeue"); for (backend_name, config) in configs { group.bench_with_input( BenchmarkId::new("dequeue", &backend_name), &config, |b, config| { let rt = Runtime::new().unwrap(); let storage = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); storage }); let gen = BenchDataGenerator::new(); // Pre-populate with tasks for dequeue benchmark rt.block_on(async { let tasks = gen.generate_batch(1000); storage.enqueue_batch(tasks).await.unwrap(); }); b.to_async(&rt).iter(|| async { black_box(storage.dequeue().await.unwrap()) }); }, ); } group.finish(); } /// Benchmark batch enqueue operations fn bench_batch_enqueue(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("batch_enqueue"); group.sample_size(10); // Reduce sample size for batch operations let batch_sizes = [10, 50, 100, 500, 1000]; for (backend_name, config) in configs { for batch_size in batch_sizes.iter() { group.throughput(Throughput::Elements(*batch_size as u64)); group.bench_with_input( BenchmarkId::new(format!("{}_{}", backend_name, batch_size), batch_size), &(config.clone(), *batch_size), |b, (config, batch_size)| { let rt = Runtime::new().unwrap(); let storage = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); storage }); b.to_async(&rt).iter(|| async { let gen = BenchDataGenerator::new(); let batch = gen.generate_batch(*batch_size); black_box(storage.enqueue_batch(batch).await.unwrap()) }); }, ); } } group.finish(); } /// Benchmark task retrieval operations fn bench_task_get(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("task_get"); for (backend_name, config) in configs { group.bench_with_input( BenchmarkId::new("get_task", &backend_name), &config, |b, config| { let rt = Runtime::new().unwrap(); let (storage, task_ids) = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); // Pre-populate with tasks let gen = BenchDataGenerator::new(); let tasks = gen.generate_batch(100); let task_ids: Vec = tasks.iter().map(|(task, _)| task.id.clone()).collect(); storage.enqueue_batch(tasks).await.unwrap(); (storage, task_ids) }); let mut id_index = 0; b.to_async(&rt).iter(|| async { let id = &task_ids[id_index % task_ids.len()]; id_index += 1; black_box(storage.get_task(id).await.unwrap()) }); }, ); } group.finish(); } /// Benchmark task listing operations fn bench_task_list(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("task_list"); group.sample_size(10); let dataset_sizes = [100, 500, 1000]; for (backend_name, config) in configs { for dataset_size in dataset_sizes.iter() { group.throughput(Throughput::Elements(*dataset_size as u64)); group.bench_with_input( BenchmarkId::new(format!("{}_{}", backend_name, dataset_size), dataset_size), &(config.clone(), *dataset_size), |b, (config, dataset_size)| { let rt = Runtime::new().unwrap(); let storage = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); // Pre-populate with tasks let gen = BenchDataGenerator::new(); let tasks = gen.generate_batch(*dataset_size); storage.enqueue_batch(tasks).await.unwrap(); storage }); b.to_async(&rt).iter(|| async { black_box(storage.list_tasks(None).await.unwrap()) }); }, ); } } group.finish(); } /// Benchmark task update operations fn bench_task_update(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("task_update"); for (backend_name, config) in configs { group.bench_with_input( BenchmarkId::new("update_task_status", &backend_name), &config, |b, config| { let rt = Runtime::new().unwrap(); let (storage, task_ids) = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); // Pre-populate with tasks let gen = BenchDataGenerator::new(); let tasks = gen.generate_batch(100); let task_ids: Vec = tasks.iter().map(|(task, _)| task.id.clone()).collect(); storage.enqueue_batch(tasks).await.unwrap(); (storage, task_ids) }); let mut id_index = 0; b.to_async(&rt).iter(|| async { let id = &task_ids[id_index % task_ids.len()]; id_index += 1; let status = match id_index % 4 { 0 => TaskStatus::Pending, 1 => TaskStatus::Running, 2 => TaskStatus::Completed, _ => TaskStatus::Failed, }; black_box(storage.update_task_status(id, status).await.unwrap()) }); }, ); } group.finish(); } /// Benchmark concurrent operations fn bench_concurrent_operations(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("concurrent_operations"); group.sample_size(10); group.measurement_time(Duration::from_secs(10)); let concurrency_levels = [1, 4, 8, 16]; for (backend_name, config) in configs { for concurrency in concurrency_levels.iter() { group.throughput(Throughput::Elements(*concurrency as u64)); group.bench_with_input( BenchmarkId::new(format!("{}_{}", backend_name, concurrency), concurrency), &(config.clone(), *concurrency), |b, (config, concurrency)| { let rt = Runtime::new().unwrap(); let storage = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); std::sync::Arc::new(storage) }); b.to_async(&rt).iter(|| async { let mut handles = Vec::new(); let gen = BenchDataGenerator::new(); for _ in 0..*concurrency { let storage_clone = storage.clone(); let task = gen.generate_task(); handles.push(tokio::spawn(async move { storage_clone.enqueue(task, 1).await.unwrap(); storage_clone.dequeue().await.unwrap() })); } for handle in handles { black_box(handle.await.unwrap()); } }); }, ); } } group.finish(); } /// Benchmark search operations fn bench_search_operations(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("search_operations"); group.sample_size(10); for (backend_name, config) in configs { group.bench_with_input( BenchmarkId::new("search_tasks", &backend_name), &config, |b, config| { let rt = Runtime::new().unwrap(); let storage = rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); // Pre-populate with diverse tasks let gen = BenchDataGenerator::new(); let mut tasks = Vec::new(); for i in 0..500 { let mut task = gen.generate_task(); task.name = format!("SearchableTask_{}", i % 10); task.status = match i % 4 { 0 => TaskStatus::Pending, 1 => TaskStatus::Running, 2 => TaskStatus::Completed, _ => TaskStatus::Failed, }; tasks.push((task, 1u8)); } storage.enqueue_batch(tasks).await.unwrap(); storage }); b.to_async(&rt).iter(|| async { black_box( storage.search_tasks( Some("SearchableTask".to_string()), Some(vec![TaskStatus::Pending, TaskStatus::Running]), None, None, Some(50), None, ).await.unwrap() ) }); }, ); } group.finish(); } /// Benchmark memory usage and cleanup fn bench_cleanup_operations(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let configs = rt.block_on(create_benchmark_configs()); let mut group = c.benchmark_group("cleanup_operations"); group.sample_size(10); for (backend_name, config) in configs { group.bench_with_input( BenchmarkId::new("cleanup_completed", &backend_name), &config, |b, config| { let rt = Runtime::new().unwrap(); b.to_async(&rt).iter_batched( || { // Setup: Create storage with completed tasks rt.block_on(async { let storage = create_storage(config.clone()).await.unwrap(); storage.init().await.unwrap(); let gen = BenchDataGenerator::new(); let tasks = gen.generate_batch(100); storage.enqueue_batch(tasks).await.unwrap(); // Mark half as completed with old timestamps let all_tasks = storage.list_tasks(None).await.unwrap(); for (i, task) in all_tasks.iter().enumerate() { if i % 2 == 0 { storage.update_task_status(&task.id, TaskStatus::Completed).await.unwrap(); } } storage }) }, |storage| async move { black_box( storage.cleanup_completed_tasks(chrono::Duration::seconds(0)).await.unwrap() ) }, criterion::BatchSize::SmallInput, ); }, ); } group.finish(); } criterion_group!( storage_benches, bench_single_enqueue, bench_single_dequeue, bench_batch_enqueue, bench_task_get, bench_task_list, bench_task_update, bench_concurrent_operations, bench_search_operations, bench_cleanup_operations ); criterion_main!(storage_benches);