//! NATS `JetStream` event publishing for KOGRAL storage operations //! //! `EventPublisher` wraps a `platform_nats::EventStream` and publishes typed //! `KogralEvent` values as JSON payloads. //! //! `EventingStorage` decorates any `Storage` implementation with automatic //! event publishing after each mutating operation, without modifying the //! underlying storage type. use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; use platform_nats::EventStream; use serde::{Deserialize, Serialize}; use crate::error::{KbError, Result}; use crate::models::{Graph, Node}; use crate::storage::Storage; /// Typed events emitted by KOGRAL storage operations #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event_type", rename_all = "snake_case")] pub enum KogralEvent { /// A node was created or updated NodeSaved { /// Graph the node belongs to graph: String, /// Node identifier node_id: String, /// Lowercase node type string (e.g. `"note"`, `"decision"`) node_type: String, }, /// A node was removed NodeDeleted { /// Graph the node belonged to graph: String, /// Node identifier node_id: String, }, /// A complete graph snapshot was persisted GraphSaved { /// Graph name name: String, /// Number of nodes in the snapshot node_count: usize, }, } impl KogralEvent { /// Subject string used when publishing to NATS `JetStream` #[must_use] pub fn subject(&self) -> String { match self { Self::NodeSaved { graph, .. } => format!("kogral.{graph}.node.saved"), Self::NodeDeleted { graph, .. } => format!("kogral.{graph}.node.deleted"), Self::GraphSaved { name, .. } => format!("kogral.{name}.graph.saved"), } } } /// Publishes `KogralEvent` values to NATS `JetStream` pub struct EventPublisher { stream: Arc, } impl EventPublisher { /// Wrap an `EventStream` in a publisher. #[must_use] pub fn new(stream: EventStream) -> Self { Self { stream: Arc::new(stream), } } /// Serialize `event` to JSON and publish to its computed NATS subject. /// /// # Errors /// /// Returns `KbError::Serialization` if JSON encoding fails, or /// `KbError::Storage` if the NATS publish call fails. pub async fn publish(&self, event: KogralEvent) -> Result<()> { let subject = event.subject(); let payload = serde_json::to_vec(&event) .map_err(|e| KbError::Serialization(e.to_string()))?; self.stream .publish(&subject, Bytes::from(payload)) .await .map_err(|e| KbError::Storage(format!("NATS publish to '{subject}': {e}"))) } } /// Decorator that wraps any `Storage` and publishes events after mutations pub struct EventingStorage { inner: S, publisher: EventPublisher, /// Default graph name used in `save_node` events when `node.project` is `None` graph_name: String, } impl EventingStorage { /// Wrap `inner` storage with NATS event publishing. /// /// `graph_name` is used as the event graph field for `save_node` calls /// when the node has no `project` set. #[must_use] pub fn new(inner: S, publisher: EventPublisher, graph_name: String) -> Self { Self { inner, publisher, graph_name, } } } #[async_trait] impl Storage for EventingStorage { async fn save_graph(&mut self, graph: &Graph) -> Result<()> { self.inner.save_graph(graph).await?; self.publisher .publish(KogralEvent::GraphSaved { name: graph.name.clone(), node_count: graph.nodes.len(), }) .await } async fn save_node(&mut self, node: &Node) -> Result<()> { self.inner.save_node(node).await?; let graph = node .project .as_deref() .unwrap_or(&self.graph_name) .to_string(); self.publisher .publish(KogralEvent::NodeSaved { graph, node_id: node.id.clone(), node_type: node.node_type.to_string(), }) .await } async fn delete_node(&mut self, graph_name: &str, node_id: &str) -> Result<()> { self.inner.delete_node(graph_name, node_id).await?; self.publisher .publish(KogralEvent::NodeDeleted { graph: graph_name.to_string(), node_id: node_id.to_string(), }) .await } async fn load_graph(&self, name: &str) -> Result { self.inner.load_graph(name).await } async fn load_node(&self, graph_name: &str, node_id: &str) -> Result { self.inner.load_node(graph_name, node_id).await } async fn list_graphs(&self) -> Result> { self.inner.list_graphs().await } async fn list_nodes(&self, graph_name: &str, node_type: Option<&str>) -> Result> { self.inner.list_nodes(graph_name, node_type).await } }