//! Comprehensive storage integration tests
//!
//! This module provides storage-agnostic integration tests that run against
//! all available storage backends, ensuring consistent behavior across the
//! filesystem, SurrealDB embedded, and SurrealDB server implementations.

use chrono::{Duration, Utc};
use futures::StreamExt;

// Import test helpers and orchestrator types
mod helpers;
use helpers::{
    test_all_backends, test_with_backend, StorageAssertions, StorageTestRunner, TestDataGenerator,
};

use orchestrator::storage::{
    AuditEntry, Metric, StorageResult, TaskEventType, TaskStorage, TimeRange,
};
use orchestrator::{TaskStatus, WorkflowTask};

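// For orientation: `test_all_backends!` and `test_with_backend!` come from the
// `helpers` module. The sketch below shows the per-backend pattern this file
// assumes the macros generate (create storage from a config, initialize it,
// run the test body); the exact expansion is an assumption, not the macros'
// actual output, and it uses only APIs exercised elsewhere in this file.
#[allow(dead_code)]
async fn example_manual_backend_run() {
    let mut runner = StorageTestRunner::new();
    let gen = TestDataGenerator::new();
    for config in runner.env.all_storage_configs() {
        let storage = orchestrator::storage::create_storage(config.clone())
            .await
            .unwrap_or_else(|e| panic!("Failed to create {} storage: {e:?}", config.storage_type));
        storage.init().await.expect("Failed to initialize storage");

        // A test body would run here, e.g.:
        let task = gen.workflow_task();
        storage.enqueue(task, 1).await.expect("enqueue failed");
    }
}
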
/// Test basic storage initialization and health checks
test_all_backends!(test_storage_initialization, |storage, _gen| async move {
    // Storage is already initialized by the test runner
    StorageAssertions::assert_healthy(&storage).await?;
    Ok(())
});

/// Test basic CRUD operations
test_all_backends!(test_basic_crud_operations, |storage, gen| async move {
    let task = gen.workflow_task();
    let task_id = task.id.clone();

    // Create: tasks enter storage via enqueue
    storage.enqueue(task.clone(), 1).await?;
    StorageAssertions::assert_task_count(&storage, 1).await?;
    StorageAssertions::assert_queue_size(&storage, 1).await?;

    // Read
    StorageAssertions::assert_task_exists(&storage, &task_id, &task.name).await?;

    // Update: full task update
    let mut updated_task = task.clone();
    updated_task.status = TaskStatus::Running;
    updated_task.started_at = Some(Utc::now());
    storage.update_task(updated_task).await?;
    StorageAssertions::assert_task_status(&storage, &task_id, TaskStatus::Running).await?;

    // Update: status-only update
    storage.update_task_status(&task_id, TaskStatus::Completed).await?;
    StorageAssertions::assert_task_status(&storage, &task_id, TaskStatus::Completed).await?;

    Ok(())
});

/// Test queue operations and priority handling
test_all_backends!(test_queue_operations, |storage, gen| async move {
    // Enqueue tasks with different priorities
    let high_priority_task = gen.workflow_task();
    let medium_priority_task = gen.workflow_task();
    let low_priority_task = gen.workflow_task();

    storage.enqueue(medium_priority_task.clone(), 5).await?;
    storage.enqueue(low_priority_task.clone(), 1).await?;
    storage.enqueue(high_priority_task.clone(), 10).await?;

    StorageAssertions::assert_queue_size(&storage, 3).await?;

    // Peek returns the highest-priority task without removing it
    let peeked = storage.peek().await?;
    assert!(peeked.is_some());
    StorageAssertions::assert_queue_size(&storage, 3).await?;

    // Dequeue returns tasks in priority order, highest first
    let first = storage.dequeue().await?.unwrap();
    let second = storage.dequeue().await?.unwrap();
    let third = storage.dequeue().await?.unwrap();
    let fourth = storage.dequeue().await?;

    assert_eq!(first.id, high_priority_task.id);
    assert_eq!(second.id, medium_priority_task.id);
    assert_eq!(third.id, low_priority_task.id);
    assert!(fourth.is_none());

    StorageAssertions::assert_queue_size(&storage, 0).await?;

    Ok(())
});

/// Test task listing and filtering
test_all_backends!(test_task_listing_and_filtering, |storage, gen| async move {
    // Create tasks with different statuses
    let pending_task = gen.workflow_task_with_status(TaskStatus::Pending);
    let running_task = gen.workflow_task_with_status(TaskStatus::Running);
    let completed_task = gen.workflow_task_with_status(TaskStatus::Completed);
    let failed_task = gen.workflow_task_with_status(TaskStatus::Failed);

    storage.enqueue(pending_task, 1).await?;
    storage.enqueue(running_task, 1).await?;
    storage.enqueue(completed_task, 1).await?;
    storage.enqueue(failed_task, 1).await?;

    // List all tasks
    let all_tasks = storage.list_tasks(None).await?;
    assert_eq!(all_tasks.len(), 4);

    // Filter by status
    let pending_tasks = storage.list_tasks(Some(TaskStatus::Pending)).await?;
    assert_eq!(pending_tasks.len(), 1);
    assert_eq!(pending_tasks[0].status, TaskStatus::Pending);

    let completed_tasks = storage.list_tasks(Some(TaskStatus::Completed)).await?;
    assert_eq!(completed_tasks.len(), 1);
    assert_eq!(completed_tasks[0].status, TaskStatus::Completed);

    Ok(())
});

/// Test task requeuing functionality
test_all_backends!(test_task_requeuing, |storage, gen| async move {
    let mut task = gen.workflow_task();
    task.status = TaskStatus::Failed;
    let task_id = task.id.clone();

    // Enqueue a failed task; failed tasks are not part of the active queue
    storage.enqueue(task, 1).await?;
    let initial_queue_size = storage.queue_size().await?;

    // Requeue the failed task
    let requeued = storage.requeue_failed_task(&task_id).await?;
    assert!(requeued);

    // The queue should have grown
    let final_queue_size = storage.queue_size().await?;
    assert!(final_queue_size > initial_queue_size);

    // The task's status should have been reset to pending
    StorageAssertions::assert_task_status(&storage, &task_id, TaskStatus::Pending).await?;

    // Requeuing a non-failed task should return false
    let non_failed_task = gen.workflow_task_with_status(TaskStatus::Completed);
    storage.enqueue(non_failed_task.clone(), 1).await?;
    let not_requeued = storage.requeue_failed_task(&non_failed_task.id).await?;
    assert!(!not_requeued);

    Ok(())
});

/// Test batch operations
test_all_backends!(test_batch_operations, |storage, gen| async move {
    // Batch enqueue
    let batch_tasks: Vec<(WorkflowTask, u8)> = (0..5)
        .map(|i| (gen.workflow_task(), (i % 3 + 1) as u8))
        .collect();
    let task_ids: Vec<String> = batch_tasks.iter().map(|(task, _)| task.id.clone()).collect();

    storage.enqueue_batch(batch_tasks).await?;
    StorageAssertions::assert_task_count(&storage, 5).await?;

    // Batch get (returns the tasks keyed by task id)
    let retrieved_tasks = storage.get_tasks_batch(task_ids).await?;
    assert_eq!(retrieved_tasks.len(), 5);

    // Batch update
    let mut update_tasks: Vec<WorkflowTask> = retrieved_tasks.into_values().collect();
    for task in &mut update_tasks {
        task.status = TaskStatus::Running;
    }
    storage.update_tasks_batch(update_tasks).await?;

    // Verify all tasks were updated
    let running_tasks = storage.list_tasks(Some(TaskStatus::Running)).await?;
    assert_eq!(running_tasks.len(), 5);

    Ok(())
});

/// Test advanced search functionality
test_all_backends!(test_advanced_search, |storage, gen| async move {
    let now = Utc::now();
    let past = now - Duration::hours(1);

    // Create tasks at different times with different name patterns
    let mut old_task = gen.workflow_task();
    old_task.name = "old_test_task".to_string();
    old_task.created_at = past;
    old_task.status = TaskStatus::Completed;

    let mut recent_task = gen.workflow_task();
    recent_task.name = "recent_test_task".to_string();
    recent_task.created_at = now;
    recent_task.status = TaskStatus::Pending;

    let mut another_recent = gen.workflow_task();
    another_recent.name = "another_recent".to_string();
    another_recent.created_at = now;
    another_recent.status = TaskStatus::Failed;

    storage.enqueue(old_task, 1).await?;
    storage.enqueue(recent_task, 1).await?;
    storage.enqueue(another_recent, 1).await?;

    // `search_tasks` takes its filters positionally:
    // name pattern, status filter, created-after, created-before, limit, offset.

    // Search by name pattern
    let test_tasks = storage
        .search_tasks(Some("test".to_string()), None, None, None, None, None)
        .await?;
    assert_eq!(test_tasks.len(), 2); // old_test_task and recent_test_task

    // Search by status filter
    let failed_tasks = storage
        .search_tasks(None, Some(vec![TaskStatus::Failed]), None, None, None, None)
        .await?;
    assert_eq!(failed_tasks.len(), 1);

    // Search by time range (created within the last 30 minutes)
    let recent_tasks = storage
        .search_tasks(None, None, Some(now - Duration::minutes(30)), None, None, None)
        .await?;
    assert_eq!(recent_tasks.len(), 2); // recent_test_task and another_recent

    // Search with a result limit
    let limited_tasks = storage
        .search_tasks(None, None, None, None, Some(1), None)
        .await?;
    assert_eq!(limited_tasks.len(), 1);

    Ok(())
});

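// The six positional arguments to `search_tasks` are easy to misorder at call
// sites. The wrapper below is a hypothetical sketch (not part of the storage
// API) of how call sites could be made self-describing; it assumes
// `search_tasks` is a method on the `TaskStorage` trait with the positional
// order used in the test above.
#[allow(dead_code)]
async fn search_by_name_pattern<S: TaskStorage>(
    storage: &S,
    pattern: &str,
) -> StorageResult<Vec<WorkflowTask>> {
    // Assumed order: name pattern, statuses, created-after, created-before,
    // limit, offset.
    storage
        .search_tasks(Some(pattern.to_string()), None, None, None, None, None)
        .await
}
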
/// Test dependency management
test_all_backends!(test_dependency_management, |storage, gen| async move {
    let parent_task = gen.workflow_task();
    let child_task = gen.workflow_task_with_deps(vec![parent_task.id.clone()]);

    storage.enqueue(parent_task.clone(), 1).await?;
    storage.enqueue(child_task.clone(), 1).await?;

    // Forward lookup: the child's dependencies should name the parent
    let deps = storage.get_task_dependencies(&child_task.id).await?;
    assert_eq!(deps.len(), 1);
    assert_eq!(deps[0], parent_task.id);

    // Reverse lookup: some backends do not implement reverse dependency
    // lookup, so only assert on the contents when results are returned
    let dependents = storage.get_dependent_tasks(&parent_task.id).await?;
    if !dependents.is_empty() {
        assert!(dependents.contains(&child_task.id));
    }

    Ok(())
});

/// Test cleanup operations
test_all_backends!(test_cleanup_operations, |storage, gen| async move {
    let now = Utc::now();

    // An old completed task (eligible for cleanup)
    let mut old_completed = gen.workflow_task_with_status(TaskStatus::Completed);
    old_completed.completed_at = Some(now - Duration::days(2));

    // A recently completed task (too new to clean up)
    let mut recent_completed = gen.workflow_task_with_status(TaskStatus::Completed);
    recent_completed.completed_at = Some(now);

    // An old running task (never cleaned up, regardless of age)
    let mut old_running = gen.workflow_task_with_status(TaskStatus::Running);
    old_running.created_at = now - Duration::days(2);

    storage.enqueue(old_completed, 1).await?;
    storage.enqueue(recent_completed, 1).await?;
    storage.enqueue(old_running, 1).await?;
    StorageAssertions::assert_task_count(&storage, 3).await?;

    // Clean up completed tasks older than one day
    let cleaned_count = storage.cleanup_completed_tasks(Duration::days(1)).await?;
    assert_eq!(cleaned_count, 1); // only old_completed qualifies

    StorageAssertions::assert_task_count(&storage, 2).await?;

    Ok(())
});

/// Test audit log functionality
test_all_backends!(test_audit_log, |storage, gen| async move {
    let task = gen.workflow_task();
    let audit_entry = gen.audit_entry(task.id.clone());

    // Record an audit entry
    storage.record_audit_entry(audit_entry.clone()).await?;

    // Retrieve the audit log for the last hour
    let time_range = TimeRange::last_hours(1);
    let audit_log = storage.get_audit_log(time_range).await?;

    assert_eq!(audit_log.len(), 1);
    assert_eq!(audit_log[0].task_id, audit_entry.task_id);
    assert_eq!(audit_log[0].operation, audit_entry.operation);

    Ok(())
});

/// Test metrics functionality
test_all_backends!(test_metrics, |storage, gen| async move {
    let metric1 = gen.metric("test_counter", 42.0);
    let metric2 = gen.metric("test_gauge", 3.14);

    // Record metrics
    storage.record_metric(metric1.clone()).await?;
    storage.record_metric(metric2.clone()).await?;

    // Retrieve metrics recorded in the last hour
    let time_range = TimeRange::last_hours(1);
    let metrics = storage.get_metrics(time_range).await?;

    assert_eq!(metrics.len(), 2);
    let metric_names: Vec<&String> = metrics.iter().map(|m| &m.name).collect();
    assert!(metric_names.contains(&&metric1.name));
    assert!(metric_names.contains(&&metric2.name));

    Ok(())
});

/// Test authentication functionality
test_all_backends!(test_authentication, |storage, _gen| async move {
    // Successful authentication
    let token = storage.authenticate("test", "test").await?;
    assert!(!token.token.is_empty());
    assert_eq!(token.user_id, "test");
    assert!(!token.is_expired());

    // Validating a valid token returns the token's user
    let validated = storage.validate_token(&token.token).await?;
    assert!(validated.is_some());
    assert_eq!(validated.unwrap().user_id, "test");

    // Validating an invalid token returns None
    let invalid_validation = storage.validate_token("invalid_token").await?;
    assert!(invalid_validation.is_none());

    // Authentication with wrong credentials fails
    let auth_result = storage.authenticate("invalid", "wrong").await;
    assert!(auth_result.is_err());

    Ok(())
});

/// Test event subscription and publishing
test_all_backends!(test_event_system, |storage, gen| async move {
    // Create and publish events
    let task = gen.workflow_task();
    let event1 = gen.task_event(task.id.clone(), TaskEventType::Created);
    let event2 = gen.task_event(task.id.clone(), TaskEventType::StatusChanged);

    storage.publish_event(event1).await?;
    storage.publish_event(event2).await?;

    // Subscribe to events (backends are expected to deliver the events
    // published above to a new subscriber)
    let mut event_stream = storage.subscribe_to_events(None).await?;

    // Collect events from the stream, bounded by a timeout so the test fails
    // instead of hanging if a backend delivers fewer events than expected
    let mut events = Vec::new();
    let collect = async {
        while let Some(event) = event_stream.next().await {
            events.push(event);
            if events.len() >= 2 {
                break;
            }
        }
    };
    tokio::time::timeout(std::time::Duration::from_secs(5), collect)
        .await
        .expect("timed out waiting for task events");

    assert_eq!(events.len(), 2);
    assert_eq!(events[0].task_id, task.id);

    Ok(())
});

/// Test backup and restore functionality
test_all_backends!(test_backup_restore, |storage, gen| async move {
    // Create some test data
    let task = gen.workflow_task();
    storage.enqueue(task.clone(), 1).await?;

    // Attempt a backup; not every backend supports backup/restore
    let backup_path = "/tmp/test_backup.json";
    match storage.create_backup(backup_path).await {
        Ok(_) => {
            // Backup succeeded; try a restore, which may be unsupported independently
            match storage.restore_from_backup(backup_path).await {
                Ok(_) => {
                    // Verify the data still exists after the restore
                    StorageAssertions::assert_task_exists(&storage, &task.id, &task.name).await?;
                }
                Err(_) => {
                    println!("Backup supported but restore not implemented");
                }
            }
        }
        Err(_) => {
            println!("Backup/restore not supported by this backend");
        }
    }

    Ok(())
});

/// Test storage statistics
test_all_backends!(test_storage_statistics, |storage, gen| async move {
    // Add some test data
    let tasks = gen.workflow_tasks_batch(5);
    for (i, task) in tasks.into_iter().enumerate() {
        storage.enqueue(task, (i % 3 + 1) as u8).await?;
    }

    let stats = storage.get_statistics().await?;
    assert_eq!(stats.total_tasks, 5);
    assert!(stats.pending_tasks > 0);

    // Per-status counts should sum to the total
    let total =
        stats.pending_tasks + stats.running_tasks + stats.completed_tasks + stats.failed_tasks;
    assert_eq!(total, stats.total_tasks);

    Ok(())
});

/// Test concurrent operations
test_all_backends!(test_concurrent_operations, |storage, gen| async move {
    use tokio::task::JoinSet;

    let storage = std::sync::Arc::new(storage);
    let mut join_set = JoinSet::new();

    // Spawn multiple concurrent enqueue operations
    for i in 0..10 {
        let storage_clone = storage.clone();
        let task = gen.workflow_task();
        join_set.spawn(async move { storage_clone.enqueue(task, (i % 5 + 1) as u8).await });
    }

    // Wait for all operations to complete
    let mut success_count = 0;
    while let Some(result) = join_set.join_next().await {
        if result.unwrap().is_ok() {
            success_count += 1;
        }
    }

    // All enqueues should succeed
    assert_eq!(success_count, 10);
    // Deref the Arc so the assertion receives a plain storage reference
    StorageAssertions::assert_task_count(&*storage, 10).await?;

    // Test concurrent dequeue operations
    let mut join_set = JoinSet::new();
    for _ in 0..5 {
        let storage_clone = storage.clone();
        join_set.spawn(async move { storage_clone.dequeue().await });
    }

    let mut dequeued_count = 0;
    while let Some(result) = join_set.join_next().await {
        if let Ok(Some(_)) = result.unwrap() {
            dequeued_count += 1;
        }
    }

    // Each of the five dequeues should have returned a task
    assert_eq!(dequeued_count, 5);

    Ok(())
});

/// Test error handling and edge cases
test_all_backends!(test_error_handling, |storage, _gen| async move {
    // Getting a non-existent task returns None
    let result = storage.get_task("non_existent_id").await?;
    assert!(result.is_none());

    // Updating the status of a non-existent task is an error
    let result = storage.update_task_status("non_existent_id", TaskStatus::Completed).await;
    assert!(result.is_err());

    // Drain the queue so it is empty
    while storage.dequeue().await?.is_some() {}

    // Dequeue from an empty queue returns None
    let result = storage.dequeue().await?;
    assert!(result.is_none());

    // Peek on an empty queue returns None
    let result = storage.peek().await?;
    assert!(result.is_none());

    Ok(())
});

/// Performance test for bulk operations
test_all_backends!(test_bulk_performance, |storage, gen| async move {
    use std::time::Instant;

    let batch_size: usize = 100;

    // Generate a large batch of tasks
    let batch_tasks: Vec<(WorkflowTask, u8)> = (0..batch_size)
        .map(|i| (gen.workflow_task(), (i % 10 + 1) as u8))
        .collect();

    // Measure batch enqueue (timer starts after task generation)
    let start = Instant::now();
    storage.enqueue_batch(batch_tasks).await?;
    let enqueue_duration = start.elapsed();
    println!("Batch enqueue of {} tasks took: {:?}", batch_size, enqueue_duration);

    // Verify all tasks were stored
    StorageAssertions::assert_task_count(&storage, batch_size).await?;

    // Measure bulk retrieval
    let start = Instant::now();
    let all_tasks = storage.list_tasks(None).await?;
    let list_duration = start.elapsed();
    println!("List all {} tasks took: {:?}", batch_size, list_duration);
    assert_eq!(all_tasks.len(), batch_size);

    // Loose thresholds; adjust to the environment and backend requirements
    assert!(enqueue_duration.as_millis() < 5000, "Batch enqueue too slow");
    assert!(list_duration.as_millis() < 1000, "List tasks too slow");

    Ok(())
});

// Backend-specific tests that only run when the corresponding features are enabled

#[cfg(feature = "surrealdb")]
|
|
test_with_backend!(test_surrealdb_embedded_specific, "surrealdb-embedded", |storage, gen| async move {
|
|
// Test SurrealDB embedded specific functionality
|
|
let task = gen.workflow_task();
|
|
storage.enqueue(task.clone(), 1).await?;
|
|
|
|
// SurrealDB should handle complex queries efficiently
|
|
let complex_search = storage.search_tasks(
|
|
Some("Test".to_string()),
|
|
Some(vec![TaskStatus::Pending, TaskStatus::Running]),
|
|
Some(Utc::now() - Duration::hours(1)),
|
|
Some(Utc::now() + Duration::hours(1)),
|
|
Some(10),
|
|
Some(0),
|
|
).await?;
|
|
|
|
// Should find the task we just added
|
|
assert!(!complex_search.is_empty());
|
|
|
|
Ok(())
|
|
});
|
|
|
|
#[cfg(feature = "surrealdb")]
test_with_backend!(test_surrealdb_server_specific, "surrealdb-server", |storage, gen| async move {
    // Exercise SurrealDB-server-specific behavior
    let task = gen.workflow_task();
    storage.enqueue(task.clone(), 1).await?;

    // The server connection should be healthy
    StorageAssertions::assert_healthy(&storage).await?;

    // The server backend should support all advanced features
    let stats = storage.get_statistics().await?;
    assert_eq!(stats.total_tasks, 1);

    Ok(())
});

/// Integration test that verifies all storage backends behave consistently
#[tokio::test]
async fn test_cross_backend_consistency() {
    let mut runner = StorageTestRunner::new();
    let gen = TestDataGenerator::new();

    // Create the same test data for each backend
    let test_tasks: Vec<WorkflowTask> = (0..3).map(|_| gen.workflow_task()).collect();

    let mut results = Vec::new();

    // Run the same operations on every configured backend
    let configs = runner.env.all_storage_configs();
    for config in configs {
        let storage = orchestrator::storage::create_storage(config.clone())
            .await
            .unwrap_or_else(|e| panic!("Failed to create {} storage: {e:?}", config.storage_type));

        storage
            .init()
            .await
            .unwrap_or_else(|e| panic!("Failed to initialize {} storage: {e:?}", config.storage_type));

        // Perform identical operations
        for (i, task) in test_tasks.iter().enumerate() {
            storage.enqueue(task.clone(), (i + 1) as u8).await.unwrap();
        }

        let all_tasks = storage.list_tasks(None).await.unwrap();
        let queue_size = storage.queue_size().await.unwrap();
        let total_tasks = storage.total_tasks().await.unwrap();

        results.push((config.storage_type.clone(), all_tasks.len(), queue_size, total_tasks));
    }

    // Every backend should report the same counts for identical input
    if results.len() > 1 {
        let first = &results[0];
        for result in &results[1..] {
            assert_eq!(first.1, result.1, "Task count mismatch between {} and {}", first.0, result.0);
            assert_eq!(first.2, result.2, "Queue size mismatch between {} and {}", first.0, result.0);
            assert_eq!(first.3, result.3, "Total tasks mismatch between {} and {}", first.0, result.0);
        }
    }

    println!("Cross-backend consistency verified for {} backends", results.len());
}
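
// Running these tests (invocations are illustrative; the test target name is
// an assumption, and the `surrealdb` feature matches the `#[cfg]` gates above):
//
//     cargo test --test storage_integration
//     cargo test --test storage_integration --features surrealdb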