use std::collections::HashSet; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use dashmap::DashMap; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use tracing::debug; /// Broadcast capacity per `NotificationStore`. /// Old events are overwritten when the buffer is full — this is intentional: /// SSE clients that lag behind lose events but reconnect and resume via /// polling. const BROADCAST_CAPACITY: usize = 256; /// Notification events that require acknowledgment before git commit. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum NotificationEvent { OntologyChanged, AdrChanged, ReflectionChanged, /// User-emitted notification with a free-form kind and payload. Custom, } impl NotificationEvent { /// Map a relative file path to its notification event type. /// Returns `None` for paths outside watched directories. pub fn from_path(relative_path: &str) -> Option { if relative_path.starts_with(".ontology/") || relative_path.starts_with(".ontology\\") { Some(Self::OntologyChanged) } else if relative_path.starts_with("adrs/") || relative_path.starts_with("adrs\\") { Some(Self::AdrChanged) } else if relative_path.starts_with("reflection/") || relative_path.starts_with("reflection\\") { Some(Self::ReflectionChanged) } else { None } } /// NATS subject suffix for this event type. pub fn nats_suffix(&self) -> &'static str { match self { Self::OntologyChanged => "notification.ontology", Self::AdrChanged => "notification.adr", Self::ReflectionChanged => "notification.reflection", Self::Custom => "notification.custom", } } } #[derive(Debug, Clone, Serialize)] pub struct Notification { pub id: u64, pub project: String, pub event: NotificationEvent, pub files: Vec, pub timestamp: u64, pub source_actor: Option, pub acked_by: HashSet, /// Populated for `NotificationEvent::Custom` notifications only. pub custom_kind: Option, pub custom_title: Option, pub custom_payload: Option, /// Source project slug for cross-project notifications. pub source_project: Option, } /// Request body for user-emitted notifications (REST + UI form). #[derive(Debug, Deserialize)] pub struct EmitRequest { /// Target project slug. Required in multi-project mode. #[serde(default)] pub target_project: Option, /// Free-form kind label, e.g. "backlog_delegation", "alert", "cross_ref". pub kind: String, pub title: String, #[serde(default)] pub payload: Option, #[serde(default)] pub source_actor: Option, } /// Per-project notification ring buffer with SSE broadcast support. /// /// Stores the last `capacity` notifications. Thread-safe via DashMap /// for per-project isolation + AtomicU64 for the global sequence counter. /// `event_tx` broadcasts every new notification to SSE subscribers. pub struct NotificationStore { /// project → ordered notifications (newest last) projects: DashMap>, sequence: AtomicU64, capacity: usize, /// Directories that require acknowledgment (e.g., [".ontology", "adrs"]) ack_required: Vec, /// Broadcast channel — SSE handlers subscribe via `subscribe()`. event_tx: broadcast::Sender, } impl NotificationStore { pub fn new(capacity: usize, ack_required: Vec) -> Self { let (event_tx, _) = broadcast::channel(BROADCAST_CAPACITY); Self { projects: DashMap::new(), sequence: AtomicU64::new(1), capacity, ack_required, event_tx, } } /// Subscribe to live notification events for SSE delivery. /// Returns a `broadcast::Receiver` that receives every new /// `NotificationView` pushed via `push()` or `push_custom()`. pub fn subscribe(&self) -> broadcast::Receiver { self.event_tx.subscribe() } /// Push notifications for changed files in a project, grouped by event /// type. /// /// A single file batch may contain files from multiple watched directories /// (e.g., `.ontology/` and `adrs/`). This method creates one notification /// per distinct event type, so no changes are silently swallowed. /// /// Returns the IDs of created notifications (empty if none required ack). pub fn push( &self, project: &str, files: Vec, source_actor: Option, ) -> Vec { // Group files by event type — each group becomes a separate notification let mut by_event: std::collections::HashMap> = std::collections::HashMap::new(); for file in files { if let Some(event) = NotificationEvent::from_path(&file) { by_event.entry(event).or_default().push(file); } } let mut ids = Vec::new(); let now = epoch_secs(); for (event, event_files) in by_event { if !self.requires_ack(&event) { continue; } let id = self.sequence.fetch_add(1, Ordering::Relaxed); let notification = Notification { id, project: project.to_string(), event, files: event_files, timestamp: now, source_actor: source_actor.clone(), acked_by: HashSet::new(), custom_kind: None, custom_title: None, custom_payload: None, source_project: None, }; debug!( id, project, event = ?notification.event, file_count = notification.files.len(), "notification created" ); let mut ring = self.projects.entry(project.to_string()).or_default(); ring.push(notification); // Trim to capacity if ring.len() > self.capacity { let excess = ring.len() - self.capacity; ring.drain(..excess); } // Broadcast to SSE subscribers — lagging receivers get a `Lagged` error. if let Some(view) = ring.last().map(NotificationView::from) { let _ = self.event_tx.send(view); } ids.push(id); } ids } /// Emit a user-authored notification directly into this project's ring /// buffer. /// /// Unlike `push()`, this bypasses file-path classification and is always /// stored. Returns the new notification ID. pub fn push_custom( &self, project: &str, kind: impl Into, title: impl Into, payload: Option, source_actor: Option, source_project: Option, ) -> u64 { let id = self.sequence.fetch_add(1, Ordering::Relaxed); let notification = Notification { id, project: project.to_string(), event: NotificationEvent::Custom, files: vec![], timestamp: epoch_secs(), source_actor, acked_by: HashSet::new(), custom_kind: Some(kind.into()), custom_title: Some(title.into()), custom_payload: payload, source_project, }; let mut ring = self.projects.entry(project.to_string()).or_default(); ring.push(notification); if ring.len() > self.capacity { let excess = ring.len() - self.capacity; ring.drain(..excess); } if let Some(view) = ring.last().map(NotificationView::from) { let _ = self.event_tx.send(view); } id } /// Get pending (unacknowledged) notifications for a specific actor token. pub fn pending(&self, project: &str, token: &str) -> Vec { let ring = match self.projects.get(project) { Some(r) => r, None => return Vec::new(), }; ring.iter() .filter(|n| !n.acked_by.contains(token)) .map(NotificationView::from) .collect() } /// Count of pending notifications for a token. pub fn pending_count(&self, project: &str, token: &str) -> usize { let ring = match self.projects.get(project) { Some(r) => r, None => return 0, }; ring.iter().filter(|n| !n.acked_by.contains(token)).count() } /// Acknowledge all pending notifications for a token. pub fn ack_all(&self, project: &str, token: &str) -> usize { let mut ring = match self.projects.get_mut(project) { Some(r) => r, None => return 0, }; let mut count = 0; for n in ring.iter_mut() { if n.acked_by.insert(token.to_string()) { count += 1; } } debug!(token, project, acked = count, "notifications acknowledged"); count } /// Acknowledge a specific notification by ID. pub fn ack_one(&self, project: &str, token: &str, notification_id: u64) -> bool { let mut ring = match self.projects.get_mut(project) { Some(r) => r, None => return false, }; ring.iter_mut() .find(|n| n.id == notification_id) .map(|n| n.acked_by.insert(token.to_string())) .unwrap_or(false) } /// Check whether a notification event requires acknowledgment. fn requires_ack(&self, event: &NotificationEvent) -> bool { // Custom (user-emitted) notifications are always stored. if matches!(event, NotificationEvent::Custom) { return true; } if self.ack_required.is_empty() { // Default: ontology and ADR changes require ack; Custom already handled above. matches!( event, NotificationEvent::OntologyChanged | NotificationEvent::AdrChanged | NotificationEvent::Custom ) } else { let dir = match event { NotificationEvent::OntologyChanged => ".ontology", NotificationEvent::AdrChanged => "adrs", NotificationEvent::ReflectionChanged => "reflection", NotificationEvent::Custom => return true, }; self.ack_required.iter().any(|r| r == dir) } } /// Get a single notification by ID across all projects. pub fn get_one(&self, id: u64) -> Option { for entry in self.projects.iter() { if let Some(n) = entry.value().iter().find(|n| n.id == id) { return Some(NotificationView::from(n)); } } None } /// All projects with stored notifications. pub fn projects(&self) -> Vec { self.projects.iter().map(|e| e.key().clone()).collect() } /// All notifications across all projects, newest-last order per project. /// Used by the web UI to display a global feed without a per-actor token. pub fn all_recent(&self) -> Vec { self.projects .iter() .flat_map(|entry| { entry .value() .iter() .map(NotificationView::from) .collect::>() }) .collect() } } /// Serializable view without the mutable `acked_by` set. #[derive(Debug, Clone, Serialize)] pub struct NotificationView { pub id: u64, pub project: String, pub event: NotificationEvent, pub files: Vec, pub timestamp: u64, pub source_actor: Option, pub custom_kind: Option, pub custom_title: Option, pub custom_payload: Option, pub source_project: Option, } impl From<&Notification> for NotificationView { fn from(n: &Notification) -> Self { Self { id: n.id, project: n.project.clone(), event: n.event, files: n.files.clone(), timestamp: n.timestamp, source_actor: n.source_actor.clone(), custom_kind: n.custom_kind.clone(), custom_title: n.custom_title.clone(), custom_payload: n.custom_payload.clone(), source_project: n.source_project.clone(), } } } #[derive(Debug, Deserialize)] pub struct AckRequest { pub token: String, #[serde(default)] pub project: Option, #[serde(default)] pub all: bool, #[serde(default)] pub notification_id: Option, } fn epoch_secs() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs() } #[cfg(test)] mod tests { use super::*; fn store() -> NotificationStore { NotificationStore::new(100, vec![".ontology".into(), "adrs".into()]) } #[test] fn event_from_path() { assert_eq!( NotificationEvent::from_path(".ontology/core.ncl"), Some(NotificationEvent::OntologyChanged) ); assert_eq!( NotificationEvent::from_path("adrs/adr-001.ncl"), Some(NotificationEvent::AdrChanged) ); assert_eq!( NotificationEvent::from_path("reflection/modes/sync.ncl"), Some(NotificationEvent::ReflectionChanged) ); assert_eq!(NotificationEvent::from_path("src/main.rs"), None); } #[test] fn push_and_pending() { let store = store(); let files = vec![".ontology/core.ncl".into()]; let ids = store.push("proj", files, None); assert_eq!(ids.len(), 1); let pending = store.pending("proj", "dev:host:1"); assert_eq!(pending.len(), 1); assert_eq!(pending[0].event, NotificationEvent::OntologyChanged); } #[test] fn push_groups_by_event_type() { let store = store(); let files = vec![ ".ontology/core.ncl".into(), "adrs/adr-001.ncl".into(), ".ontology/state.ncl".into(), ]; let ids = store.push("proj", files, None); // Should create 2 notifications: OntologyChanged + AdrChanged assert_eq!(ids.len(), 2); let pending = store.pending("proj", "dev:host:1"); assert_eq!(pending.len(), 2); let events: HashSet = pending.iter().map(|n| n.event).collect(); assert!(events.contains(&NotificationEvent::OntologyChanged)); assert!(events.contains(&NotificationEvent::AdrChanged)); } #[test] fn ack_clears_pending() { let store = store(); store.push("proj", vec![".ontology/state.ncl".into()], None); store.push("proj", vec!["adrs/adr-002.ncl".into()], None); let token = "dev:host:1"; assert_eq!(store.pending_count("proj", token), 2); let acked = store.ack_all("proj", token); assert_eq!(acked, 2); assert_eq!(store.pending_count("proj", token), 0); } #[test] fn ack_one_specific() { let store = store(); let ids = store.push("proj", vec![".ontology/core.ncl".into()], None); let id1 = ids[0]; store.push("proj", vec!["adrs/adr-001.ncl".into()], None); let token = "dev:host:1"; assert!(store.ack_one("proj", token, id1)); assert_eq!(store.pending_count("proj", token), 1); } #[test] fn ring_buffer_eviction() { let store = NotificationStore::new(3, vec![".ontology".into()]); for i in 0..5 { store.push("proj", vec![format!(".ontology/file{i}.ncl")], None); } // Only last 3 retained let pending = store.pending("proj", "token"); assert_eq!(pending.len(), 3); } #[test] fn reflection_not_ack_required_by_default_config() { let store = store(); // ack_required = [".ontology", "adrs"] let ids = store.push("proj", vec!["reflection/modes/sync.ncl".into()], None); // reflection not in ack_required → push returns empty assert!(ids.is_empty()); } }