use crate::error::{AnalyticsError, Result}; use crate::events::*; use chrono::{Duration, Utc}; use dashmap::DashMap; use std::collections::VecDeque; use std::sync::Arc; use tokio::sync::mpsc; use tracing::debug; /// Streaming pipeline for event processing #[derive(Clone)] pub struct EventPipeline { event_tx: mpsc::UnboundedSender, event_rx: Arc>>, alerts_tx: mpsc::UnboundedSender, time_windows: Arc>>, } impl EventPipeline { /// Create new event pipeline pub fn new(external_alert_tx: mpsc::UnboundedSender) -> (Self, mpsc::UnboundedSender) { let (event_tx, event_rx) = mpsc::unbounded_channel(); let pipeline = Self { event_tx, event_rx: Arc::new(tokio::sync::Mutex::new(event_rx)), alerts_tx: external_alert_tx.clone(), time_windows: Arc::new(DashMap::new()), }; (pipeline, external_alert_tx) } /// Emit an event into the pipeline pub async fn emit_event(&self, event: AgentEvent) -> Result<()> { self.event_tx.send(event).map_err(|e| { AnalyticsError::ChannelError(format!("Failed to emit event: {}", e)) })?; Ok(()) } /// Start processing events from the pipeline pub async fn run(&self, window_duration_secs: u64) -> Result<()> { let mut rx = self.event_rx.lock().await; let time_windows = self.time_windows.clone(); let alerts_tx = self.alerts_tx.clone(); while let Some(event) = rx.recv().await { debug!("Processing event: {:?}", event.event_type); // Store in time window let window_key = format!( "{}_{}", event.event_type.as_str(), event.timestamp.timestamp() / (window_duration_secs as i64) ); time_windows .entry(window_key.clone()) .or_insert_with(VecDeque::new) .push_back(event.clone()); // Check for alerts if event.event_type.is_error() { let alert = Alert { id: uuid::Uuid::new_v4().to_string(), level: AlertLevel::Warning, message: format!( "Error in agent {}: {}", event.agent_id, event.error.clone().unwrap_or_default() ), affected_agents: vec![event.agent_id.clone()], affected_tasks: event.task_id.clone().into_iter().collect(), triggered_at: Utc::now(), resolution: None, }; alerts_tx.send(alert).ok(); } // Check for performance degradation if let Some(duration) = event.duration_ms { if duration > 30_000 { let alert = Alert { id: uuid::Uuid::new_v4().to_string(), level: AlertLevel::Warning, message: format!( "Slow task execution: {} took {}ms", event.agent_id, duration ), affected_agents: vec![event.agent_id.clone()], affected_tasks: event.task_id.clone().into_iter().collect(), triggered_at: Utc::now(), resolution: Some("Consider scaling or optimization".to_string()), }; alerts_tx.send(alert).ok(); } } } Ok(()) } /// Get aggregated statistics for a time window pub async fn get_window_stats( &self, event_type: EventType, window_secs: u64, ) -> Result { let now = Utc::now(); let window_start = now - Duration::seconds(window_secs as i64); let mut total_events = 0u64; let mut agents = std::collections::HashSet::new(); let mut durations = Vec::new(); let mut error_count = 0u64; let mut success_count = 0u64; for entry in self.time_windows.iter() { for event in entry.value().iter() { if event.event_type == event_type && event.timestamp > window_start { total_events += 1; agents.insert(event.agent_id.clone()); if let Some(duration) = event.duration_ms { durations.push(duration); } if event.event_type.is_error() { error_count += 1; } else if event.event_type.is_success() { success_count += 1; } } } } let avg_duration = if !durations.is_empty() { durations.iter().sum::() as f64 / durations.len() as f64 } else { 0.0 }; Ok(EventAggregation { window_start, window_end: now, event_type, total_events, distinct_agents: agents.len() as u32, avg_duration_ms: avg_duration, error_count, success_count, }) } /// Filter events by criteria pub fn filter_events(&self, predicate: F) -> Vec where F: Fn(&AgentEvent) -> bool, { self.time_windows .iter() .flat_map(|entry| { entry .value() .iter() .filter(|event| predicate(event)) .cloned() .collect::>() }) .collect() } /// Get error rate in last N seconds pub async fn get_error_rate(&self, window_secs: u64) -> Result { let now = Utc::now(); let window_start = now - Duration::seconds(window_secs as i64); let mut total = 0u64; let mut errors = 0u64; for entry in self.time_windows.iter() { for event in entry.value().iter() { if event.timestamp > window_start { total += 1; if event.event_type.is_error() { errors += 1; } } } } if total == 0 { Ok(0.0) } else { Ok(errors as f64 / total as f64) } } /// Get throughput (events per second) pub async fn get_throughput(&self, window_secs: u64) -> Result { let now = Utc::now(); let window_start = now - Duration::seconds(window_secs as i64); let mut count = 0u64; for entry in self.time_windows.iter() { for event in entry.value().iter() { if event.timestamp > window_start { count += 1; } } } Ok(count as f64 / window_secs as f64) } /// Get top N agents by task completion pub async fn get_top_agents(&self, limit: usize) -> Result> { let mut agent_counts: std::collections::HashMap = std::collections::HashMap::new(); for entry in self.time_windows.iter() { for event in entry.value().iter() { if event.event_type.is_success() { *agent_counts.entry(event.agent_id.clone()).or_insert(0) += 1; } } } let mut agents: Vec<_> = agent_counts.into_iter().collect(); agents.sort_by(|a, b| b.1.cmp(&a.1)); Ok(agents.into_iter().take(limit).collect()) } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_pipeline_creation() { let (_alert_tx, alert_rx) = mpsc::unbounded_channel(); let (_pipeline, _alerts) = EventPipeline::new(_alert_tx); assert!(alert_rx.is_empty()); } #[tokio::test] async fn test_emit_event() { let (alert_tx, _alert_rx) = mpsc::unbounded_channel(); let (pipeline, _alerts) = EventPipeline::new(alert_tx); let event = AgentEvent::new_task_completed( "agent-1".to_string(), "task-1".to_string(), 1000, 100, 50, ); assert!(pipeline.emit_event(event).await.is_ok()); } #[tokio::test] async fn test_filter_events() { let (alert_tx, _alert_rx) = mpsc::unbounded_channel(); let (pipeline, _alerts) = EventPipeline::new(alert_tx); // Spawn pipeline processor in background let pipeline_clone = pipeline.clone(); tokio::spawn(async move { pipeline_clone.run(60).await.ok(); }); let event1 = AgentEvent::new_task_completed( "agent-1".to_string(), "task-1".to_string(), 1000, 100, 50, ); let event2 = AgentEvent::new_task_failed( "agent-2".to_string(), "task-2".to_string(), "error".to_string(), ); pipeline.emit_event(event1).await.ok(); pipeline.emit_event(event2).await.ok(); // Give pipeline time to process events tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; let filtered = pipeline.filter_events(|e| e.event_type.is_error()); assert_eq!(filtered.len(), 1); } }