//! Unified Analytics Data Collector
//!
//! Orchestrates collection from all log sources:
//! - Navigation tracking (JSONL logs)
//! - Route cache performance
//! - Server logs
//! - Browser logs
//!
//! Provides real-time streaming and batch processing capabilities.

use super::{
    AnalyticsConfig, AnalyticsEvent, AnalyticsMetrics, BrowserMetrics, CacheMetrics,
    EventLevel, NavigationMetrics, ServerMetrics,
};
use anyhow::{Context, Result};
use chrono::{Duration, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;

/// Log source types
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum LogSource {
    Navigation,
    RouteCache,
    Server,
    Browser,
    System,
}

impl LogSource {
    pub fn as_str(&self) -> &'static str {
        match self {
            LogSource::Navigation => "navigation",
            LogSource::RouteCache => "route_cache",
            LogSource::Server => "server",
            LogSource::Browser => "browser",
            LogSource::System => "system",
        }
    }
}

/// Analytics collector configuration
#[derive(Debug, Clone)]
pub struct CollectorConfig {
    pub batch_size: usize,
    pub flush_interval_seconds: u64,
    pub max_memory_events: usize,
    pub data_directory: PathBuf,
}

impl Default for CollectorConfig {
    fn default() -> Self {
        Self {
            batch_size: 100,
            flush_interval_seconds: 30,
            max_memory_events: 10_000,
            data_directory: PathBuf::from("logs/analytics"),
        }
    }
}

/// Main analytics collector
pub struct AnalyticsCollector {
    config: CollectorConfig,
    event_buffer: Arc<Mutex<VecDeque<AnalyticsEvent>>>,
    sender: Option<mpsc::UnboundedSender<AnalyticsEvent>>,
    receiver: Option<mpsc::UnboundedReceiver<AnalyticsEvent>>,
    // Source-specific collectors
    navigation_collector: Option<super::navigation::NavigationCollector>,
    server_collector: Option<super::rustelo_server::ServerCollector>,
    browser_collector: Option<super::browser::BrowserCollector>,
    // Metrics aggregation
    metrics_cache: Arc<Mutex<HashMap<String, AnalyticsMetrics>>>,
}

impl AnalyticsCollector {
    /// Create a new analytics collector
    pub fn new(config: &AnalyticsConfig) -> Result<Self> {
        let collector_config = CollectorConfig {
            data_directory: config.data_directory.clone(),
            ..CollectorConfig::default()
        };

        // Create the data directory if it doesn't exist
        std::fs::create_dir_all(&collector_config.data_directory)
            .context("Failed to create analytics data directory")?;

        let (sender, receiver) = mpsc::unbounded_channel();

        Ok(Self {
            config: collector_config,
            event_buffer: Arc::new(Mutex::new(VecDeque::new())),
            sender: Some(sender),
            receiver: Some(receiver),
            navigation_collector: None,
            server_collector: None,
            browser_collector: None,
            metrics_cache: Arc::new(Mutex::new(HashMap::new())),
        })
    }

    /// Start collection from all sources
    pub async fn start(&mut self) -> Result<()> {
        tracing::info!("Starting analytics collection...");

        // Initialize source-specific collectors
        self.init_collectors().await?;

        // Start the event processing task
        if let Some(receiver) = self.receiver.take() {
            let buffer = Arc::clone(&self.event_buffer);
            let config = self.config.clone();
            let metrics_cache = Arc::clone(&self.metrics_cache);

            tokio::spawn(async move {
                Self::process_events(receiver, buffer, config, metrics_cache).await;
            });
        }

        tracing::info!("Analytics collection started");
        Ok(())
    }

    /// Stop collection
    pub async fn stop(&mut self) -> Result<()> {
        tracing::info!("Stopping analytics collection...");

        // Flush any remaining events
        self.flush_events().await?;

        // Signal shutdown to the processing task; the event itself is
        // flushed on the task's next batch or periodic tick.
        if let Some(sender) = &self.sender {
            sender.send(AnalyticsEvent {
                id: super::generate_event_id(),
                timestamp: Utc::now(),
                source: LogSource::System,
                event_type: "shutdown".to_string(),
                session_id: None,
                path: None,
                level: EventLevel::Info,
                message: "Analytics collector shutting down".to_string(),
                metadata: HashMap::new(),
                duration_ms: None,
                errors: Vec::new(),
            })?;
        }

        tracing::info!("Analytics collection stopped");
        Ok(())
    }
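    // Lifecycle sketch (illustrative only, not part of the API surface; it
    // assumes a Tokio runtime and a populated `AnalyticsConfig` bound to
    // `config`):
    //
    //     let mut collector = AnalyticsCollector::new(&config)?;
    //     collector.start().await?;          // spawns the processing task
    //     collector.send_event(event)?;      // stream events in from anywhere
    //     collector.stop().await?;           // flushes and signals shutdown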
    /// Initialize source-specific collectors
    async fn init_collectors(&mut self) -> Result<()> {
        let sender = self
            .sender
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Sender not available"))?
            .clone();

        // Initialize the navigation collector
        if let Ok(nav_collector) =
            super::navigation::NavigationCollector::new(sender.clone()).await
        {
            tracing::debug!("Navigation collector initialized");
            self.navigation_collector = Some(nav_collector);
        } else {
            tracing::warn!("Failed to initialize navigation collector");
        }

        // Initialize the server collector
        if let Ok(server_collector) =
            super::rustelo_server::ServerCollector::new(sender.clone()).await
        {
            tracing::debug!("Server collector initialized");
            self.server_collector = Some(server_collector);
        } else {
            tracing::warn!("Failed to initialize server collector");
        }

        // Initialize the browser collector
        if let Ok(browser_collector) =
            super::browser::BrowserCollector::new(sender.clone()).await
        {
            tracing::debug!("Browser collector initialized");
            self.browser_collector = Some(browser_collector);
        } else {
            tracing::warn!("Failed to initialize browser collector");
        }

        Ok(())
    }

    /// Process events from the channel
    async fn process_events(
        mut receiver: mpsc::UnboundedReceiver<AnalyticsEvent>,
        buffer: Arc<Mutex<VecDeque<AnalyticsEvent>>>,
        config: CollectorConfig,
        metrics_cache: Arc<Mutex<HashMap<String, AnalyticsMetrics>>>,
    ) {
        let mut flush_interval = tokio::time::interval(tokio::time::Duration::from_secs(
            config.flush_interval_seconds,
        ));

        loop {
            tokio::select! {
                // Process incoming events
                event = receiver.recv() => {
                    match event {
                        Some(event) => {
                            // Add to the buffer
                            {
                                let mut buffer_guard = buffer.lock().unwrap();
                                buffer_guard.push_back(event.clone());

                                // Prevent memory overflow
                                while buffer_guard.len() > config.max_memory_events {
                                    buffer_guard.pop_front();
                                }
                            }

                            // Update the metrics cache
                            Self::update_metrics_cache(&event, &metrics_cache);

                            // Check whether we need to flush
                            let buffer_len = buffer.lock().unwrap().len();
                            if buffer_len >= config.batch_size {
                                if let Err(e) = Self::flush_buffer(&buffer, &config).await {
                                    tracing::error!("Failed to flush event buffer: {}", e);
                                }
                            }
                        }
                        None => {
                            tracing::info!("Analytics event channel closed");
                            break;
                        }
                    }
                }
                // Periodic flush
                _ = flush_interval.tick() => {
                    if let Err(e) = Self::flush_buffer(&buffer, &config).await {
                        tracing::error!("Failed to flush event buffer: {}", e);
                    }
                }
            }
        }
    }
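    // Bucketing sketch: events are grouped into hourly windows keyed by
    // "hour_%Y%m%d_%H". For example, an event stamped 2024-06-01T14:23:05Z
    // (the date is illustrative) is keyed as:
    //
    //     let key = format!("hour_{}", event.timestamp.format("%Y%m%d_%H"));
    //     // => "hour_20240601_14", covering 14:00:00-14:59:59 UTC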
    /// Update the metrics cache with a new event
    fn update_metrics_cache(
        event: &AnalyticsEvent,
        metrics_cache: &Arc<Mutex<HashMap<String, AnalyticsMetrics>>>,
    ) {
        let period_key = format!("hour_{}", event.timestamp.format("%Y%m%d_%H"));

        let mut cache = metrics_cache.lock().unwrap();
        let metrics = cache.entry(period_key).or_insert_with(|| AnalyticsMetrics {
            period_start: event.timestamp.with_minute(0).unwrap().with_second(0).unwrap(),
            period_end: event.timestamp.with_minute(59).unwrap().with_second(59).unwrap(),
            navigation: NavigationMetrics {
                total_requests: 0,
                route_resolutions: 0,
                language_switches: 0,
                avg_resolution_time_ms: 0.0,
                slow_routes_count: 0,
                error_count: 0,
            },
            cache: CacheMetrics {
                total_requests: 0,
                hit_count: 0,
                miss_count: 0,
                hit_rate: 0.0,
                evictions: 0,
                expired_entries: 0,
            },
            server: ServerMetrics {
                total_requests: 0,
                error_count: 0,
                panic_count: 0,
                avg_response_time_ms: 0.0,
                memory_usage_mb: None,
                cpu_usage_percent: None,
            },
            browser: BrowserMetrics {
                console_errors: 0,
                console_warnings: 0,
                hydration_mismatches: 0,
                javascript_errors: 0,
                performance_issues: 0,
            },
        });

        // Update metrics based on the event's source and type
        match event.source {
            LogSource::Navigation => {
                metrics.navigation.total_requests += 1;
                if event.event_type == "RouteResolution" {
                    metrics.navigation.route_resolutions += 1;
                }
                if event.event_type == "LanguageSwitch" {
                    metrics.navigation.language_switches += 1;
                }
                if !event.errors.is_empty() {
                    metrics.navigation.error_count += 1;
                }
                if let Some(duration) = event.duration_ms {
                    if duration > 10 {
                        metrics.navigation.slow_routes_count += 1;
                    }
                    // Simplified running estimate, not a true mean: each new
                    // sample is averaged against the previous estimate.
                    metrics.navigation.avg_resolution_time_ms =
                        (metrics.navigation.avg_resolution_time_ms + duration as f64) / 2.0;
                }
            }
            LogSource::RouteCache => {
                metrics.cache.total_requests += 1;
                if let Some(hit_val) = event.metadata.get("cache_hit") {
                    if hit_val.as_bool().unwrap_or(false) {
                        metrics.cache.hit_count += 1;
                    } else {
                        metrics.cache.miss_count += 1;
                    }
                    metrics.cache.hit_rate =
                        metrics.cache.hit_count as f64 / metrics.cache.total_requests as f64;
                }
            }
            LogSource::Server => {
                metrics.server.total_requests += 1;
                if event.level >= EventLevel::Error {
                    metrics.server.error_count += 1;
                }
                if event.message.contains("panic") {
                    metrics.server.panic_count += 1;
                }
                if let Some(duration) = event.duration_ms {
                    // Simplified running estimate (see note above)
                    metrics.server.avg_response_time_ms =
                        (metrics.server.avg_response_time_ms + duration as f64) / 2.0;
                }
            }
            LogSource::Browser => {
                match event.level {
                    EventLevel::Error => metrics.browser.console_errors += 1,
                    EventLevel::Warn => metrics.browser.console_warnings += 1,
                    _ => {}
                }
                if event.event_type == "hydration_mismatch" {
                    metrics.browser.hydration_mismatches += 1;
                }
                if event.event_type == "javascript_error" {
                    metrics.browser.javascript_errors += 1;
                }
            }
            LogSource::System => {
                // System events don't affect business metrics
            }
        }
    }

    /// Flush the event buffer to disk
    async fn flush_buffer(
        buffer: &Arc<Mutex<VecDeque<AnalyticsEvent>>>,
        config: &CollectorConfig,
    ) -> Result<()> {
        let events: Vec<AnalyticsEvent> = {
            let mut buffer_guard = buffer.lock().unwrap();
            buffer_guard.drain(..).collect()
        };

        if events.is_empty() {
            return Ok(());
        }

        tracing::debug!("Flushing {} events to disk", events.len());

        // Append events to the daily log file. Using `tokio::fs::write` here
        // would truncate the file and drop previously flushed events, so the
        // file is opened in append mode instead.
        let date_str = Utc::now().format("%Y%m%d");
        let log_file = config
            .data_directory
            .join(format!("analytics_{}.jsonl", date_str));

        let mut log_content = String::new();
        for event in events {
            if let Ok(json) = serde_json::to_string(&event) {
                log_content.push_str(&json);
                log_content.push('\n');
            }
        }

        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_file)
            .await
            .with_context(|| format!("Failed to open analytics log {:?}", log_file))?;
        file.write_all(log_content.as_bytes())
            .await
            .with_context(|| format!("Failed to write analytics log to {:?}", log_file))?;

        Ok(())
    }

    /// Manually flush events
    pub async fn flush_events(&self) -> Result<()> {
        Self::flush_buffer(&self.event_buffer, &self.config).await
    }

    /// Send an event to the collector
    pub fn send_event(&self, event: AnalyticsEvent) -> Result<()> {
        if let Some(sender) = &self.sender {
            sender.send(event)?;
        }
        Ok(())
    }
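    // Query sketch (illustrative; assumes a started collector bound to
    // `collector`): aggregate the last 24 hours and read a derived metric.
    //
    //     let last_day = collector.get_aggregated_metrics(24).await?;
    //     println!("cache hit rate: {:.1}%", last_day.cache.hit_rate * 100.0);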
    /// Get aggregated metrics for a time period
    pub async fn get_aggregated_metrics(&self, period_hours: u32) -> Result<AnalyticsMetrics> {
        let now = Utc::now();
        let start_time = now - Duration::hours(period_hours as i64);

        let mut aggregated = AnalyticsMetrics {
            period_start: start_time,
            period_end: now,
            navigation: NavigationMetrics {
                total_requests: 0,
                route_resolutions: 0,
                language_switches: 0,
                avg_resolution_time_ms: 0.0,
                slow_routes_count: 0,
                error_count: 0,
            },
            cache: CacheMetrics {
                total_requests: 0,
                hit_count: 0,
                miss_count: 0,
                hit_rate: 0.0,
                evictions: 0,
                expired_entries: 0,
            },
            server: ServerMetrics {
                total_requests: 0,
                error_count: 0,
                panic_count: 0,
                avg_response_time_ms: 0.0,
                memory_usage_mb: None,
                cpu_usage_percent: None,
            },
            browser: BrowserMetrics {
                console_errors: 0,
                console_warnings: 0,
                hydration_mismatches: 0,
                javascript_errors: 0,
                performance_issues: 0,
            },
        };

        // Sum counters from hourly buckets that fall entirely inside the
        // window. Note: the averaged fields (avg_resolution_time_ms,
        // avg_response_time_ms) are not combined across buckets and remain 0.
        let cache = self.metrics_cache.lock().unwrap();
        for metrics in cache.values() {
            if metrics.period_start >= start_time && metrics.period_end <= now {
                aggregated.navigation.total_requests += metrics.navigation.total_requests;
                aggregated.navigation.route_resolutions += metrics.navigation.route_resolutions;
                aggregated.navigation.language_switches += metrics.navigation.language_switches;
                aggregated.navigation.slow_routes_count += metrics.navigation.slow_routes_count;
                aggregated.navigation.error_count += metrics.navigation.error_count;

                aggregated.cache.total_requests += metrics.cache.total_requests;
                aggregated.cache.hit_count += metrics.cache.hit_count;
                aggregated.cache.miss_count += metrics.cache.miss_count;

                aggregated.server.total_requests += metrics.server.total_requests;
                aggregated.server.error_count += metrics.server.error_count;
                aggregated.server.panic_count += metrics.server.panic_count;

                aggregated.browser.console_errors += metrics.browser.console_errors;
                aggregated.browser.console_warnings += metrics.browser.console_warnings;
                aggregated.browser.hydration_mismatches += metrics.browser.hydration_mismatches;
                aggregated.browser.javascript_errors += metrics.browser.javascript_errors;
            }
        }

        // Calculate derived metrics
        if aggregated.cache.total_requests > 0 {
            aggregated.cache.hit_rate =
                aggregated.cache.hit_count as f64 / aggregated.cache.total_requests as f64;
        }

        Ok(aggregated)
    }

    /// Get recent events from the buffer (for real-time monitoring)
    pub fn get_recent_events(&self, limit: usize) -> Vec<AnalyticsEvent> {
        let buffer = self.event_buffer.lock().unwrap();
        buffer.iter().rev().take(limit).cloned().collect()
    }
}
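#[cfg(test)]
mod tests {
    use super::*;

    // A minimal smoke-test sketch. It assumes the `AnalyticsEvent` fields
    // shown in this module are exhaustive and that `generate_event_id` in the
    // parent module returns the event's `id` type; adjust if they differ.
    fn sample_event(source: LogSource) -> AnalyticsEvent {
        AnalyticsEvent {
            id: super::super::generate_event_id(),
            timestamp: Utc::now(),
            source,
            event_type: "RouteResolution".to_string(),
            session_id: None,
            path: None,
            level: EventLevel::Info,
            message: "test event".to_string(),
            metadata: HashMap::new(),
            duration_ms: Some(5),
            errors: Vec::new(),
        }
    }

    #[test]
    fn navigation_event_updates_hourly_bucket() {
        let cache = Arc::new(Mutex::new(HashMap::new()));
        let event = sample_event(LogSource::Navigation);

        AnalyticsCollector::update_metrics_cache(&event, &cache);

        let cache = cache.lock().unwrap();
        let metrics = cache.values().next().expect("one hourly bucket");
        assert_eq!(metrics.navigation.total_requests, 1);
        assert_eq!(metrics.navigation.route_resolutions, 1);
    }
}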