// prvng_platform/orchestrator/tests/storage_integration.rs

//! Comprehensive storage integration tests
//!
//! This module provides storage-agnostic integration tests that run against
//! all available storage backends, ensuring consistent behavior across
//! filesystem, SurrealDB embedded, and SurrealDB server implementations.
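//!
//! Backend-specific tests at the bottom of this file are gated behind cargo
//! features; assuming the feature wiring used below, they can be run with
//! e.g. `cargo test --features surrealdb --test storage_integration`.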

use chrono::{Duration, Utc};
use futures::StreamExt;

// Import test helpers and orchestrator types
mod helpers;
use helpers::{
    test_all_backends, test_with_backend, StorageAssertions, StorageTestRunner, TestDataGenerator,
};
use orchestrator::{TaskStatus, WorkflowTask};
use orchestrator::storage::{
    AuditEntry, Metric, StorageResult, TaskEventType, TaskStorage, TimeRange,
};
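
// NOTE: `test_all_backends!` and `test_with_backend!` come from the local
// `helpers` module (not shown here). `test_all_backends!` is assumed to expand
// its closure into one `#[tokio::test]` function per configured backend,
// roughly along these lines (names and setup are illustrative, not the actual
// helper API):
//
//     #[tokio::test]
//     async fn test_storage_initialization_filesystem() {
//         let (storage, gen) = /* set up the "filesystem" backend */;
//         // ...closure body runs here, with `?` propagating failures...
//     }
//
// so each test below runs identically against the filesystem, SurrealDB
// embedded, and SurrealDB server backends.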

/// Test basic storage initialization and health checks
test_all_backends!(test_storage_initialization, |storage, _gen| async move {
    // Storage should already be initialized by the test runner
    StorageAssertions::assert_healthy(&storage).await?;
    Ok(())
});

/// Test basic CRUD operations
test_all_backends!(test_basic_crud_operations, |storage, gen| async move {
    let task = gen.workflow_task();
    let task_id = task.id.clone();

    // Test task creation via enqueue
    storage.enqueue(task.clone(), 1).await?;
    StorageAssertions::assert_task_count(&storage, 1).await?;
    StorageAssertions::assert_queue_size(&storage, 1).await?;

    // Test task retrieval
    StorageAssertions::assert_task_exists(&storage, &task_id, &task.name).await?;

    // Test task update
    let mut updated_task = task.clone();
    updated_task.status = TaskStatus::Running;
    updated_task.started_at = Some(Utc::now());
    storage.update_task(updated_task).await?;
    StorageAssertions::assert_task_status(&storage, &task_id, TaskStatus::Running).await?;

    // Test status-only update
    storage.update_task_status(&task_id, TaskStatus::Completed).await?;
    StorageAssertions::assert_task_status(&storage, &task_id, TaskStatus::Completed).await?;
    Ok(())
});

/// Test queue operations and priority handling
test_all_backends!(test_queue_operations, |storage, gen| async move {
    // Enqueue tasks with different priorities
    let high_priority_task = gen.workflow_task();
    let medium_priority_task = gen.workflow_task();
    let low_priority_task = gen.workflow_task();
    storage.enqueue(medium_priority_task.clone(), 5).await?;
    storage.enqueue(low_priority_task.clone(), 1).await?;
    storage.enqueue(high_priority_task.clone(), 10).await?;
    StorageAssertions::assert_queue_size(&storage, 3).await?;

    // Test peek (should return the highest-priority task without removing it)
    let peeked = storage.peek().await?;
    assert!(peeked.is_some());
    StorageAssertions::assert_queue_size(&storage, 3).await?;

    // Test dequeue (should return tasks in priority order)
    let first = storage.dequeue().await?.unwrap();
    let second = storage.dequeue().await?.unwrap();
    let third = storage.dequeue().await?.unwrap();
    let fourth = storage.dequeue().await?;

    // Verify priority ordering (highest first)
    assert_eq!(first.id, high_priority_task.id);
    assert_eq!(second.id, medium_priority_task.id);
    assert_eq!(third.id, low_priority_task.id);
    assert!(fourth.is_none());
    StorageAssertions::assert_queue_size(&storage, 0).await?;
    Ok(())
});

/// Test task listing and filtering
test_all_backends!(test_task_listing_and_filtering, |storage, gen| async move {
    // Create tasks with different statuses
    let pending_task = gen.workflow_task_with_status(TaskStatus::Pending);
    let running_task = gen.workflow_task_with_status(TaskStatus::Running);
    let completed_task = gen.workflow_task_with_status(TaskStatus::Completed);
    let failed_task = gen.workflow_task_with_status(TaskStatus::Failed);

    // Enqueue all tasks
    storage.enqueue(pending_task, 1).await?;
    storage.enqueue(running_task, 1).await?;
    storage.enqueue(completed_task, 1).await?;
    storage.enqueue(failed_task, 1).await?;

    // Test listing all tasks
    let all_tasks = storage.list_tasks(None).await?;
    assert_eq!(all_tasks.len(), 4);

    // Test filtering by status
    let pending_tasks = storage.list_tasks(Some(TaskStatus::Pending)).await?;
    assert_eq!(pending_tasks.len(), 1);
    assert_eq!(pending_tasks[0].status, TaskStatus::Pending);

    let completed_tasks = storage.list_tasks(Some(TaskStatus::Completed)).await?;
    assert_eq!(completed_tasks.len(), 1);
    assert_eq!(completed_tasks[0].status, TaskStatus::Completed);
    Ok(())
});

/// Test task requeuing functionality
test_all_backends!(test_task_requeuing, |storage, gen| async move {
    let mut task = gen.workflow_task();
    task.status = TaskStatus::Failed;
    let task_id = task.id.clone();

    // Enqueue a failed task and capture the initial queue size (failed tasks
    // may not be counted in the active queue, depending on the backend)
    storage.enqueue(task, 1).await?;
    let initial_queue_size = storage.queue_size().await?;

    // Requeue the failed task
    let requeued = storage.requeue_failed_task(&task_id).await?;
    assert!(requeued);

    // Verify the task was requeued
    let final_queue_size = storage.queue_size().await?;
    assert!(final_queue_size > initial_queue_size);

    // Verify the task status was reset to pending
    StorageAssertions::assert_task_status(&storage, &task_id, TaskStatus::Pending).await?;

    // Requeuing a non-failed task should return false
    let non_failed_task = gen.workflow_task_with_status(TaskStatus::Completed);
    storage.enqueue(non_failed_task.clone(), 1).await?;
    let not_requeued = storage.requeue_failed_task(&non_failed_task.id).await?;
    assert!(!not_requeued);
    Ok(())
});

/// Test batch operations
test_all_backends!(test_batch_operations, |storage, gen| async move {
    // Test batch enqueue
    let batch_tasks: Vec<(WorkflowTask, u8)> = (0..5)
        .map(|i| (gen.workflow_task(), (i % 3 + 1) as u8))
        .collect();
    let task_ids: Vec<String> = batch_tasks.iter().map(|(task, _)| task.id.clone()).collect();
    storage.enqueue_batch(batch_tasks).await?;
    StorageAssertions::assert_task_count(&storage, 5).await?;

    // Test batch get
    let retrieved_tasks = storage.get_tasks_batch(task_ids).await?;
    assert_eq!(retrieved_tasks.len(), 5);

    // Test batch update
    let mut update_tasks: Vec<WorkflowTask> = retrieved_tasks.into_values().collect();
    for task in &mut update_tasks {
        task.status = TaskStatus::Running;
    }
    storage.update_tasks_batch(update_tasks).await?;

    // Verify all tasks were updated
    let all_tasks = storage.list_tasks(Some(TaskStatus::Running)).await?;
    assert_eq!(all_tasks.len(), 5);
    Ok(())
});

/// Test advanced search functionality
test_all_backends!(test_advanced_search, |storage, gen| async move {
    let now = Utc::now();
    let past = now - Duration::hours(1);

    // Create tasks at different times with different name patterns
    let mut old_task = gen.workflow_task();
    old_task.name = "old_test_task".to_string();
    old_task.created_at = past;
    old_task.status = TaskStatus::Completed;

    let mut recent_task = gen.workflow_task();
    recent_task.name = "recent_test_task".to_string();
    recent_task.created_at = now;
    recent_task.status = TaskStatus::Pending;

    let mut another_recent = gen.workflow_task();
    another_recent.name = "another_recent".to_string();
    another_recent.created_at = now;
    another_recent.status = TaskStatus::Failed;

    storage.enqueue(old_task, 1).await?;
    storage.enqueue(recent_task, 1).await?;
    storage.enqueue(another_recent, 1).await?;

    // Test search by name pattern
    let test_tasks = storage
        .search_tasks(Some("test".to_string()), None, None, None, None, None)
        .await?;
    assert_eq!(test_tasks.len(), 2); // old_test_task and recent_test_task

    // Test search by status filter
    let failed_tasks = storage
        .search_tasks(None, Some(vec![TaskStatus::Failed]), None, None, None, None)
        .await?;
    assert_eq!(failed_tasks.len(), 1);

    // Test search by time range
    let recent_tasks = storage
        .search_tasks(None, None, Some(now - Duration::minutes(30)), None, None, None)
        .await?;
    assert_eq!(recent_tasks.len(), 2); // recent_test_task and another_recent

    // Test search with a result limit
    let limited_tasks = storage
        .search_tasks(None, None, None, None, Some(1), None)
        .await?;
    assert_eq!(limited_tasks.len(), 1);
    Ok(())
});

/// Test dependency management
test_all_backends!(test_dependency_management, |storage, gen| async move {
    let parent_task = gen.workflow_task();
    let child_task = gen.workflow_task_with_deps(vec![parent_task.id.clone()]);
    storage.enqueue(parent_task.clone(), 1).await?;
    storage.enqueue(child_task.clone(), 1).await?;

    // Test getting task dependencies
    let deps = storage.get_task_dependencies(&child_task.id).await?;
    assert_eq!(deps.len(), 1);
    assert_eq!(deps[0], parent_task.id);

    // Smoke-test reverse dependency lookup without asserting on the result:
    // some backends might not implement it
    let _dependents = storage.get_dependent_tasks(&parent_task.id).await?;
    Ok(())
});

/// Test cleanup operations
test_all_backends!(test_cleanup_operations, |storage, gen| async move {
    let now = Utc::now();

    // Create an old completed task
    let mut old_completed = gen.workflow_task_with_status(TaskStatus::Completed);
    old_completed.completed_at = Some(now - Duration::days(2));

    // Create a recent completed task
    let mut recent_completed = gen.workflow_task_with_status(TaskStatus::Completed);
    recent_completed.completed_at = Some(now);

    // Create an old running task (should not be cleaned up)
    let mut old_running = gen.workflow_task_with_status(TaskStatus::Running);
    old_running.created_at = now - Duration::days(2);

    storage.enqueue(old_completed, 1).await?;
    storage.enqueue(recent_completed, 1).await?;
    storage.enqueue(old_running, 1).await?;
    StorageAssertions::assert_task_count(&storage, 3).await?;

    // Clean up completed tasks older than 1 day
    let cleaned_count = storage.cleanup_completed_tasks(Duration::days(1)).await?;
    assert_eq!(cleaned_count, 1); // Only old_completed should be cleaned
    StorageAssertions::assert_task_count(&storage, 2).await?;
    Ok(())
});

/// Test audit log functionality
test_all_backends!(test_audit_log, |storage, gen| async move {
    let task = gen.workflow_task();
    let audit_entry = gen.audit_entry(task.id.clone());

    // Record an audit entry
    storage.record_audit_entry(audit_entry.clone()).await?;

    // Retrieve the audit log for the last hour
    let time_range = TimeRange::last_hours(1);
    let audit_log = storage.get_audit_log(time_range).await?;
    assert_eq!(audit_log.len(), 1);
    assert_eq!(audit_log[0].task_id, audit_entry.task_id);
    assert_eq!(audit_log[0].operation, audit_entry.operation);
    Ok(())
});

/// Test metrics functionality
test_all_backends!(test_metrics, |storage, gen| async move {
    let metric1 = gen.metric("test_counter", 42.0);
    let metric2 = gen.metric("test_gauge", 3.14);

    // Record metrics
    storage.record_metric(metric1.clone()).await?;
    storage.record_metric(metric2.clone()).await?;

    // Retrieve metrics for the last hour
    let time_range = TimeRange::last_hours(1);
    let metrics = storage.get_metrics(time_range).await?;
    assert_eq!(metrics.len(), 2);
    let metric_names: Vec<&String> = metrics.iter().map(|m| &m.name).collect();
    assert!(metric_names.contains(&&metric1.name));
    assert!(metric_names.contains(&&metric2.name));
    Ok(())
});

/// Test authentication functionality
test_all_backends!(test_authentication, |storage, _gen| async move {
    // Test successful authentication
    let token = storage.authenticate("test", "test").await?;
    assert!(!token.token.is_empty());
    assert_eq!(token.user_id, "test");
    assert!(!token.is_expired());

    // Test token validation
    let validated = storage.validate_token(&token.token).await?;
    assert!(validated.is_some());
    assert_eq!(validated.unwrap().user_id, "test");

    // Test validation of an invalid token
    let invalid_validation = storage.validate_token("invalid_token").await?;
    assert!(invalid_validation.is_none());

    // Test failed authentication
    let auth_result = storage.authenticate("invalid", "wrong").await;
    assert!(auth_result.is_err());
    Ok(())
});

/// Test event subscription and publishing
test_all_backends!(test_event_system, |storage, gen| async move {
    // Create and publish events
    let task = gen.workflow_task();
    let event1 = gen.task_event(task.id.clone(), TaskEventType::Created);
    let event2 = gen.task_event(task.id.clone(), TaskEventType::StatusChanged);
    storage.publish_event(event1).await?;
    storage.publish_event(event2).await?;

    // Subscribe after publishing; this assumes the backend replays previously
    // published events to new subscribers
    let mut event_stream = storage.subscribe_to_events(None).await?;

    // Collect events from the stream, bounded by a timeout so the test fails
    // fast instead of hanging if fewer events than expected arrive
    let mut events = Vec::new();
    while events.len() < 2 {
        match tokio::time::timeout(std::time::Duration::from_secs(5), event_stream.next()).await {
            Ok(Some(event)) => events.push(event),
            Ok(None) => break, // stream ended early
            Err(_) => break,   // timed out waiting for more events
        }
    }
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].task_id, task.id);
    Ok(())
});

/// Test backup and restore functionality
test_all_backends!(test_backup_restore, |storage, gen| async move {
    // Create some test data
    let task = gen.workflow_task();
    storage.enqueue(task.clone(), 1).await?;

    // Create a backup; some backends might not support backup/restore
    let backup_path = "/tmp/test_backup.json";
    match storage.create_backup(backup_path).await {
        Ok(_) => {
            // If backup succeeds, test restore (which might also be unsupported)
            match storage.restore_from_backup(backup_path).await {
                Ok(_) => {
                    // Verify the data still exists after restore
                    StorageAssertions::assert_task_exists(&storage, &task.id, &task.name).await?;
                }
                Err(_) => {
                    println!("Backup supported but restore not implemented");
                }
            }
        }
        Err(_) => {
            println!("Backup/restore not supported by this backend");
        }
    }
    Ok(())
});

/// Test storage statistics
test_all_backends!(test_storage_statistics, |storage, gen| async move {
    // Add some test data
    let tasks = gen.workflow_tasks_batch(5);
    for (i, task) in tasks.into_iter().enumerate() {
        storage.enqueue(task, (i % 3 + 1) as u8).await?;
    }

    // Get statistics
    let stats = storage.get_statistics().await?;
    assert_eq!(stats.total_tasks, 5);
    assert!(stats.pending_tasks > 0);

    // Statistics should be consistent with the actual data
    let total = stats.pending_tasks + stats.running_tasks + stats.completed_tasks + stats.failed_tasks;
    assert_eq!(total, stats.total_tasks);
    Ok(())
});

/// Test concurrent operations
test_all_backends!(test_concurrent_operations, |storage, gen| async move {
    use tokio::task::JoinSet;

    let mut join_set = JoinSet::new();
    // Wrap the storage in an Arc so it can be shared across spawned tasks
    let storage = std::sync::Arc::new(storage);

    // Spawn multiple concurrent enqueue operations
    for i in 0..10 {
        let storage_clone = storage.clone();
        let task = gen.workflow_task();
        join_set.spawn(async move {
            storage_clone.enqueue(task, (i % 5 + 1) as u8).await
        });
    }

    // Wait for all operations to complete
    let mut success_count = 0;
    while let Some(result) = join_set.join_next().await {
        if result.unwrap().is_ok() {
            success_count += 1;
        }
    }

    // All operations should succeed
    assert_eq!(success_count, 10);
    StorageAssertions::assert_task_count(&storage, 10).await?;

    // Test concurrent dequeue operations
    let mut join_set = JoinSet::new();
    for _ in 0..5 {
        let storage_clone = storage.clone();
        join_set.spawn(async move {
            storage_clone.dequeue().await
        });
    }

    let mut dequeued_count = 0;
    while let Some(result) = join_set.join_next().await {
        if let Ok(Some(_)) = result.unwrap() {
            dequeued_count += 1;
        }
    }

    // Should have dequeued exactly 5 tasks
    assert_eq!(dequeued_count, 5);
    Ok(())
});

/// Test error handling and edge cases
test_all_backends!(test_error_handling, |storage, _gen| async move {
    // Test getting a non-existent task
    let result = storage.get_task("non_existent_id").await?;
    assert!(result.is_none());

    // Test updating a non-existent task's status
    let result = storage.update_task_status("non_existent_id", TaskStatus::Completed).await;
    assert!(result.is_err());

    // Test dequeue from an empty queue: drain it first, then dequeue once more
    while storage.dequeue().await?.is_some() {}
    let result = storage.dequeue().await?;
    assert!(result.is_none());

    // Test peek on an empty queue
    let result = storage.peek().await?;
    assert!(result.is_none());
    Ok(())
});

/// Performance test for bulk operations
test_all_backends!(test_bulk_performance, |storage, gen| async move {
    use std::time::Instant;

    let batch_size = 100;

    // Generate a large batch of tasks
    let batch_tasks: Vec<(WorkflowTask, u8)> = (0..batch_size)
        .map(|i| (gen.workflow_task(), (i % 10 + 1) as u8))
        .collect();

    // Test batch enqueue performance (start timing after task generation so
    // only the storage call is measured)
    let start = Instant::now();
    storage.enqueue_batch(batch_tasks).await?;
    let enqueue_duration = start.elapsed();
    println!("Batch enqueue of {} tasks took: {:?}", batch_size, enqueue_duration);

    // Verify all tasks were stored
    StorageAssertions::assert_task_count(&storage, batch_size).await?;

    // Test bulk retrieval performance
    let start = Instant::now();
    let all_tasks = storage.list_tasks(None).await?;
    let list_duration = start.elapsed();
    println!("List all {} tasks took: {:?}", batch_size, list_duration);
    assert_eq!(all_tasks.len(), batch_size);

    // Performance should be reasonable (adjust thresholds based on requirements)
    assert!(enqueue_duration.as_millis() < 5000, "Batch enqueue too slow");
    assert!(list_duration.as_millis() < 1000, "List tasks too slow");
    Ok(())
});

// Backend-specific tests that only run when the corresponding features are enabled
#[cfg(feature = "surrealdb")]
test_with_backend!(test_surrealdb_embedded_specific, "surrealdb-embedded", |storage, gen| async move {
    // Test SurrealDB embedded-specific functionality
    let task = gen.workflow_task();
    storage.enqueue(task.clone(), 1).await?;

    // SurrealDB should handle complex queries efficiently
    let complex_search = storage
        .search_tasks(
            Some("Test".to_string()),
            Some(vec![TaskStatus::Pending, TaskStatus::Running]),
            Some(Utc::now() - Duration::hours(1)),
            Some(Utc::now() + Duration::hours(1)),
            Some(10),
            Some(0),
        )
        .await?;

    // Should find the task we just added
    assert!(!complex_search.is_empty());
    Ok(())
});

#[cfg(feature = "surrealdb")]
test_with_backend!(test_surrealdb_server_specific, "surrealdb-server", |storage, gen| async move {
    // Test SurrealDB server-specific functionality
    let task = gen.workflow_task();
    storage.enqueue(task.clone(), 1).await?;

    // Verify the server connection is working
    StorageAssertions::assert_healthy(&storage).await?;

    // The server backend should support all advanced features
    let stats = storage.get_statistics().await?;
    assert_eq!(stats.total_tasks, 1);
    Ok(())
});

/// Integration test that verifies all storage backends have consistent behavior
#[tokio::test]
async fn test_cross_backend_consistency() {
    let mut runner = StorageTestRunner::new();
    let gen = TestDataGenerator::new();

    // Create the same test data for each backend
    let test_tasks: Vec<WorkflowTask> = (0..3)
        .map(|_| gen.workflow_task())
        .collect();

    let mut results = Vec::new();

    // Run the same operations on all backends
    let configs = runner.env.all_storage_configs();
    for config in configs {
        let storage = orchestrator::storage::create_storage(config.clone()).await
            .expect(&format!("Failed to create {} storage", config.storage_type));
        storage.init().await
            .expect(&format!("Failed to initialize {} storage", config.storage_type));

        // Perform identical operations
        for (i, task) in test_tasks.iter().enumerate() {
            storage.enqueue(task.clone(), (i + 1) as u8).await.unwrap();
        }

        let all_tasks = storage.list_tasks(None).await.unwrap();
        let queue_size = storage.queue_size().await.unwrap();
        let total_tasks = storage.total_tasks().await.unwrap();
        results.push((config.storage_type.clone(), all_tasks.len(), queue_size, total_tasks));
    }

    // Verify all backends produced consistent results
    if results.len() > 1 {
        let first = &results[0];
        for result in &results[1..] {
            assert_eq!(first.1, result.1, "Task count mismatch between {} and {}", first.0, result.0);
            assert_eq!(first.2, result.2, "Queue size mismatch between {} and {}", first.0, result.0);
            assert_eq!(first.3, result.3, "Total tasks mismatch between {} and {}", first.0, result.0);
        }
    }
    println!("Cross-backend consistency verified for {} backends", results.len());
}