// prvng_platform/orchestrator/benches/storage_benchmarks.rs
//! Storage performance benchmarks
//!
//! This module provides comprehensive performance benchmarks for all storage backends,
//! measuring throughput, latency, and scalability characteristics.
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::time::Duration;
use tempfile::TempDir;
use tokio::runtime::Runtime;
// Import orchestrator types
use orchestrator::{TaskStatus, WorkflowTask};
use orchestrator::storage::{StorageConfig, create_storage, TaskStorage};
/// Test data generator for benchmarks.
///
/// Hands out unique, sequential task ids from an atomic counter, so tasks
/// generated concurrently (e.g. inside spawned tokio tasks) never collide.
struct BenchDataGenerator {
    // Monotonic id source; atomic because `generate_task` takes `&self`
    // and may be called from several tasks at once.
    counter: std::sync::atomic::AtomicU64,
}
impl BenchDataGenerator {
    /// Create a generator whose id counter starts at zero.
    fn new() -> Self {
        let counter = std::sync::atomic::AtomicU64::new(0);
        Self { counter }
    }

    /// Build one fresh `Pending` task with a unique sequential id.
    fn generate_task(&self) -> WorkflowTask {
        let seq = self
            .counter
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        WorkflowTask {
            id: format!("bench_task_{}", seq),
            name: format!("Benchmark Task {}", seq),
            command: "echo".to_string(),
            args: vec!["benchmark".to_string()],
            dependencies: vec![],
            status: TaskStatus::Pending,
            created_at: chrono::Utc::now(),
            started_at: None,
            completed_at: None,
            output: None,
            error: None,
        }
    }

    /// Build `size` tasks, each paired with a priority cycling through 1..=10.
    fn generate_batch(&self, size: usize) -> Vec<(WorkflowTask, u8)> {
        let mut batch = Vec::with_capacity(size);
        for i in 0..size {
            batch.push((self.generate_task(), (i % 10 + 1) as u8));
        }
        batch
    }
}
/// Create storage configurations for benchmarking.
///
/// Returns `(backend_name, config)` pairs for every backend compiled into
/// this build. The on-disk backends need their directories to survive for
/// the whole benchmark run (criterion has no teardown hook here), so the
/// temp directories are persisted via `TempDir::into_path()` instead of
/// leaking the guard struct with `std::mem::forget`.
async fn create_benchmark_configs() -> Vec<(String, StorageConfig)> {
    let mut configs = Vec::new();

    // Filesystem storage: persist the temp dir so it is not deleted on drop.
    let fs_dir = TempDir::new().unwrap().into_path();
    let fs_config = StorageConfig {
        storage_type: "filesystem".to_string(),
        data_dir: fs_dir.to_string_lossy().to_string(),
        ..Default::default()
    };
    configs.push(("filesystem".to_string(), fs_config));

    #[cfg(feature = "surrealdb")]
    {
        // SurrealDB embedded storage (on-disk, persisted like the above).
        let embedded_dir = TempDir::new().unwrap().into_path();
        let embedded_config = StorageConfig {
            storage_type: "surrealdb-embedded".to_string(),
            data_dir: embedded_dir.to_string_lossy().to_string(),
            surrealdb_namespace: Some("bench".to_string()),
            surrealdb_database: Some("tasks".to_string()),
            ..Default::default()
        };
        configs.push(("surrealdb-embedded".to_string(), embedded_config));

        // SurrealDB server storage (in-memory for benchmarking).
        let server_config = StorageConfig {
            storage_type: "surrealdb-server".to_string(),
            data_dir: "".to_string(),
            surrealdb_url: Some("memory://benchmark".to_string()),
            surrealdb_namespace: Some("bench".to_string()),
            surrealdb_database: Some("tasks".to_string()),
            surrealdb_username: Some("bench".to_string()),
            surrealdb_password: Some("bench".to_string()),
        };
        configs.push(("surrealdb-server".to_string(), server_config));
    }
    configs
}
/// Benchmark single task enqueue operations.
fn bench_single_enqueue(c: &mut Criterion) {
    let setup_rt = Runtime::new().unwrap();
    let backends = setup_rt.block_on(create_benchmark_configs());
    let mut group = c.benchmark_group("single_enqueue");
    for (backend_name, config) in backends {
        group.bench_with_input(
            BenchmarkId::new("enqueue", &backend_name),
            &config,
            |b, cfg| {
                // Fresh runtime and freshly initialized storage per backend.
                let rt = Runtime::new().unwrap();
                let storage = rt.block_on(async {
                    let s = create_storage(cfg.clone()).await.unwrap();
                    s.init().await.unwrap();
                    s
                });
                let generator = BenchDataGenerator::new();
                // Each iteration enqueues one unique task at priority 1.
                b.to_async(&rt).iter(|| async {
                    black_box(storage.enqueue(generator.generate_task(), 1).await.unwrap())
                });
            },
        );
    }
    group.finish();
}
/// Benchmark single task dequeue operations.
///
/// NOTE(review): the queue is seeded with 1000 tasks once per backend, but
/// criterion usually runs far more iterations than that, so later iterations
/// likely measure dequeue-on-empty rather than a real dequeue — confirm the
/// backend's empty-queue behavior and consider refilling between batches.
fn bench_single_dequeue(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let configs = rt.block_on(create_benchmark_configs());
    let mut group = c.benchmark_group("single_dequeue");
    for (backend_name, config) in configs {
        group.bench_with_input(
            BenchmarkId::new("dequeue", &backend_name),
            &config,
            |b, config| {
                // Fresh runtime and freshly initialized storage per backend.
                let rt = Runtime::new().unwrap();
                let storage = rt.block_on(async {
                    let storage = create_storage(config.clone()).await.unwrap();
                    storage.init().await.unwrap();
                    storage
                });
                let gen = BenchDataGenerator::new();
                // Pre-populate with tasks for dequeue benchmark
                rt.block_on(async {
                    let tasks = gen.generate_batch(1000);
                    storage.enqueue_batch(tasks).await.unwrap();
                });
                // Each iteration pops (at most) one task from the queue.
                b.to_async(&rt).iter(|| async {
                    black_box(storage.dequeue().await.unwrap())
                });
            },
        );
    }
    group.finish();
}
/// Benchmark batch enqueue operations
fn bench_batch_enqueue(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let configs = rt.block_on(create_benchmark_configs());
let mut group = c.benchmark_group("batch_enqueue");
group.sample_size(10); // Reduce sample size for batch operations
let batch_sizes = [10, 50, 100, 500, 1000];
for (backend_name, config) in configs {
for batch_size in batch_sizes.iter() {
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(
BenchmarkId::new(format!("{}_{}", backend_name, batch_size), batch_size),
&(config.clone(), *batch_size),
|b, (config, batch_size)| {
let rt = Runtime::new().unwrap();
let storage = rt.block_on(async {
let storage = create_storage(config.clone()).await.unwrap();
storage.init().await.unwrap();
storage
});
b.to_async(&rt).iter(|| async {
let gen = BenchDataGenerator::new();
let batch = gen.generate_batch(*batch_size);
black_box(storage.enqueue_batch(batch).await.unwrap())
});
},
);
}
}
group.finish();
}
/// Benchmark task retrieval operations.
///
/// Fix: the original kept `let mut id_index` in the bench closure and
/// mutated it from inside the async block; a future returned from an
/// `FnMut` closure cannot hold a mutable borrow of the closure's captured
/// state, so this did not compile. An `AtomicUsize` cursor provides the
/// same round-robin id selection through a shared borrow.
fn bench_task_get(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let configs = rt.block_on(create_benchmark_configs());
    let mut group = c.benchmark_group("task_get");
    for (backend_name, config) in configs {
        group.bench_with_input(
            BenchmarkId::new("get_task", &backend_name),
            &config,
            |b, config| {
                let rt = Runtime::new().unwrap();
                let (storage, task_ids) = rt.block_on(async {
                    let storage = create_storage(config.clone()).await.unwrap();
                    storage.init().await.unwrap();
                    // Pre-populate so every lookup targets an existing id.
                    let gen = BenchDataGenerator::new();
                    let tasks = gen.generate_batch(100);
                    let task_ids: Vec<String> =
                        tasks.iter().map(|(task, _)| task.id.clone()).collect();
                    storage.enqueue_batch(tasks).await.unwrap();
                    (storage, task_ids)
                });
                // Round-robin cursor over the pre-populated ids.
                let id_index = std::sync::atomic::AtomicUsize::new(0);
                b.to_async(&rt).iter(|| async {
                    let idx = id_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    let id = &task_ids[idx % task_ids.len()];
                    black_box(storage.get_task(id).await.unwrap())
                });
            },
        );
    }
    group.finish();
}
/// Benchmark task listing operations.
fn bench_task_list(c: &mut Criterion) {
    let setup_rt = Runtime::new().unwrap();
    let backends = setup_rt.block_on(create_benchmark_configs());
    let mut group = c.benchmark_group("task_list");
    group.sample_size(10);
    let dataset_sizes = [100, 500, 1000];
    for (backend_name, config) in backends {
        for &dataset_size in dataset_sizes.iter() {
            group.throughput(Throughput::Elements(dataset_size as u64));
            group.bench_with_input(
                BenchmarkId::new(format!("{}_{}", backend_name, dataset_size), &dataset_size),
                &(config.clone(), dataset_size),
                |b, (cfg, n)| {
                    let rt = Runtime::new().unwrap();
                    let storage = rt.block_on(async {
                        let s = create_storage(cfg.clone()).await.unwrap();
                        s.init().await.unwrap();
                        // Seed the backend so list_tasks has `n` rows to return.
                        let generator = BenchDataGenerator::new();
                        s.enqueue_batch(generator.generate_batch(*n)).await.unwrap();
                        s
                    });
                    // Each iteration lists the full, unfiltered task set.
                    b.to_async(&rt).iter(|| async {
                        black_box(storage.list_tasks(None).await.unwrap())
                    });
                },
            );
        }
    }
    group.finish();
}
/// Benchmark task update operations.
///
/// Fix: like `bench_task_get`, the original mutated `let mut id_index` from
/// inside the async block returned by an `FnMut` closure, which cannot
/// compile (the future would hold a mutable borrow of the closure's
/// capture). An `AtomicUsize` cursor preserves the original sequence: the
/// id uses the pre-increment index, the status the post-increment one.
fn bench_task_update(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let configs = rt.block_on(create_benchmark_configs());
    let mut group = c.benchmark_group("task_update");
    for (backend_name, config) in configs {
        group.bench_with_input(
            BenchmarkId::new("update_task_status", &backend_name),
            &config,
            |b, config| {
                let rt = Runtime::new().unwrap();
                let (storage, task_ids) = rt.block_on(async {
                    let storage = create_storage(config.clone()).await.unwrap();
                    storage.init().await.unwrap();
                    // Pre-populate so every update targets an existing id.
                    let gen = BenchDataGenerator::new();
                    let tasks = gen.generate_batch(100);
                    let task_ids: Vec<String> =
                        tasks.iter().map(|(task, _)| task.id.clone()).collect();
                    storage.enqueue_batch(tasks).await.unwrap();
                    (storage, task_ids)
                });
                // Round-robin cursor over the pre-populated ids.
                let id_index = std::sync::atomic::AtomicUsize::new(0);
                b.to_async(&rt).iter(|| async {
                    let prev = id_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    let id = &task_ids[prev % task_ids.len()];
                    // Cycle through all four statuses (post-increment index,
                    // matching the original code's ordering).
                    let status = match (prev + 1) % 4 {
                        0 => TaskStatus::Pending,
                        1 => TaskStatus::Running,
                        2 => TaskStatus::Completed,
                        _ => TaskStatus::Failed,
                    };
                    black_box(storage.update_task_status(id, status).await.unwrap())
                });
            },
        );
    }
    group.finish();
}
/// Benchmark concurrent operations.
fn bench_concurrent_operations(c: &mut Criterion) {
    let setup_rt = Runtime::new().unwrap();
    let backends = setup_rt.block_on(create_benchmark_configs());
    let mut group = c.benchmark_group("concurrent_operations");
    group.sample_size(10);
    group.measurement_time(Duration::from_secs(10));
    let concurrency_levels = [1, 4, 8, 16];
    for (backend_name, config) in backends {
        for &concurrency in concurrency_levels.iter() {
            group.throughput(Throughput::Elements(concurrency as u64));
            group.bench_with_input(
                BenchmarkId::new(format!("{}_{}", backend_name, concurrency), &concurrency),
                &(config.clone(), concurrency),
                |b, (cfg, workers)| {
                    let rt = Runtime::new().unwrap();
                    let storage = rt.block_on(async {
                        let s = create_storage(cfg.clone()).await.unwrap();
                        s.init().await.unwrap();
                        // Arc so every spawned task holds its own handle.
                        std::sync::Arc::new(s)
                    });
                    b.to_async(&rt).iter(|| async {
                        let generator = BenchDataGenerator::new();
                        // One enqueue+dequeue round-trip per worker, spawned
                        // so they run concurrently on the runtime.
                        let handles: Vec<_> = (0..*workers)
                            .map(|_| {
                                let st = storage.clone();
                                let task = generator.generate_task();
                                tokio::spawn(async move {
                                    st.enqueue(task, 1).await.unwrap();
                                    st.dequeue().await.unwrap()
                                })
                            })
                            .collect();
                        // Await every round-trip before the iteration ends.
                        for handle in handles {
                            black_box(handle.await.unwrap());
                        }
                    });
                },
            );
        }
    }
    group.finish();
}
/// Benchmark search operations
fn bench_search_operations(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let configs = rt.block_on(create_benchmark_configs());
let mut group = c.benchmark_group("search_operations");
group.sample_size(10);
for (backend_name, config) in configs {
group.bench_with_input(
BenchmarkId::new("search_tasks", &backend_name),
&config,
|b, config| {
let rt = Runtime::new().unwrap();
let storage = rt.block_on(async {
let storage = create_storage(config.clone()).await.unwrap();
storage.init().await.unwrap();
// Pre-populate with diverse tasks
let gen = BenchDataGenerator::new();
let mut tasks = Vec::new();
for i in 0..500 {
let mut task = gen.generate_task();
task.name = format!("SearchableTask_{}", i % 10);
task.status = match i % 4 {
0 => TaskStatus::Pending,
1 => TaskStatus::Running,
2 => TaskStatus::Completed,
_ => TaskStatus::Failed,
};
tasks.push((task, 1u8));
}
storage.enqueue_batch(tasks).await.unwrap();
storage
});
b.to_async(&rt).iter(|| async {
black_box(
storage.search_tasks(
Some("SearchableTask".to_string()),
Some(vec![TaskStatus::Pending, TaskStatus::Running]),
None,
None,
Some(50),
None,
).await.unwrap()
)
});
},
);
}
group.finish();
}
/// Benchmark memory usage and cleanup.
///
/// Fix: the original called `rt.block_on` inside the setup closure of
/// `b.to_async(&rt).iter_batched(..)`. That setup closure executes within
/// the runtime driving the async bencher, and nesting `block_on` inside a
/// tokio runtime panics at run time. Using the synchronous `iter_batched`
/// and entering the runtime explicitly in both setup and routine keeps each
/// `block_on` at the top level.
fn bench_cleanup_operations(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let configs = rt.block_on(create_benchmark_configs());
    let mut group = c.benchmark_group("cleanup_operations");
    group.sample_size(10);
    for (backend_name, config) in configs {
        group.bench_with_input(
            BenchmarkId::new("cleanup_completed", &backend_name),
            &config,
            |b, config| {
                let rt = Runtime::new().unwrap();
                b.iter_batched(
                    || {
                        // Setup (unmeasured): fresh storage seeded with 100
                        // tasks, half of them marked Completed so cleanup has
                        // work to do.
                        rt.block_on(async {
                            let storage = create_storage(config.clone()).await.unwrap();
                            storage.init().await.unwrap();
                            let gen = BenchDataGenerator::new();
                            let tasks = gen.generate_batch(100);
                            storage.enqueue_batch(tasks).await.unwrap();
                            let all_tasks = storage.list_tasks(None).await.unwrap();
                            for (i, task) in all_tasks.iter().enumerate() {
                                if i % 2 == 0 {
                                    storage
                                        .update_task_status(&task.id, TaskStatus::Completed)
                                        .await
                                        .unwrap();
                                }
                            }
                            storage
                        })
                    },
                    |storage| {
                        // Measured: purge tasks completed at least 0s ago,
                        // i.e. everything marked Completed above.
                        rt.block_on(async move {
                            black_box(
                                storage
                                    .cleanup_completed_tasks(chrono::Duration::seconds(0))
                                    .await
                                    .unwrap(),
                            )
                        })
                    },
                    criterion::BatchSize::SmallInput,
                );
            },
        );
    }
    group.finish();
}
// Register every storage benchmark in a single criterion group and generate
// the benchmark binary's entry point.
criterion_group!(
    storage_benches,
    bench_single_enqueue,
    bench_single_dequeue,
    bench_batch_enqueue,
    bench_task_get,
    bench_task_list,
    bench_task_update,
    bench_concurrent_operations,
    bench_search_operations,
    bench_cleanup_operations
);
criterion_main!(storage_benches);