Jesús Pérez 1329eb509f
Some checks failed
Nickel Type Check / Nickel Type Checking (push) Has been cancelled
Rust CI / Security Audit (push) Has been cancelled
Rust CI / Check + Test + Lint (nightly) (push) Has been cancelled
Rust CI / Check + Test + Lint (stable) (push) Has been cancelled
feat(core): add SurrealDB v3 engine abstraction, NATS event publishing, and storage factory
Key changes: new events.rs (NATS EventingStorage decorator), storage/factory.rs (backend selection), orchestration.rs, SurrealDB v3
  engine upgrade, expanded Nickel schemas, and two new ADRs (006, 007).
2026-02-22 21:51:53 +00:00

170 lines
5.1 KiB
Rust

//! NATS `JetStream` event publishing for KOGRAL storage operations
//!
//! `EventPublisher` wraps a `platform_nats::EventStream` and publishes typed
//! `KogralEvent` values as JSON payloads.
//!
//! `EventingStorage<S>` decorates any `Storage` implementation with automatic
//! event publishing after each mutating operation, without modifying the
//! underlying storage type.
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use platform_nats::EventStream;
use serde::{Deserialize, Serialize};
use crate::error::{KbError, Result};
use crate::models::{Graph, Node};
use crate::storage::Storage;
/// Typed events emitted by KOGRAL storage operations.
///
/// Serialized by serde as an internally tagged enum: the JSON object carries
/// an `event_type` field holding the variant name in snake_case
/// (`node_saved`, `node_deleted`, `graph_saved`) next to the variant's own
/// fields. The `graph` / `name` field of each variant is also what
/// [`KogralEvent::subject`] puts into the NATS subject.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum KogralEvent {
/// A node was created or updated.
///
/// Published by `EventingStorage::save_node` after the wrapped storage
/// accepted the node.
NodeSaved {
/// Graph the node belongs to: the node's `project` when set, otherwise
/// the decorator's default graph name.
graph: String,
/// Node identifier (copied from `node.id`).
node_id: String,
/// Lowercase node type string (e.g. `"note"`, `"decision"`), taken from
/// `node.node_type.to_string()`.
node_type: String,
},
/// A node was removed.
///
/// Published by `EventingStorage::delete_node` after the wrapped storage
/// deleted the node.
NodeDeleted {
/// Graph the node belonged to (the `graph_name` passed to `delete_node`).
graph: String,
/// Node identifier (the `node_id` passed to `delete_node`).
node_id: String,
},
/// A complete graph snapshot was persisted.
///
/// Published by `EventingStorage::save_graph` after the wrapped storage
/// saved the graph.
GraphSaved {
/// Graph name (copied from `graph.name`).
name: String,
/// Number of nodes in the snapshot (`graph.nodes.len()` at save time).
node_count: usize,
},
}
impl KogralEvent {
    /// Subject string used when publishing to NATS `JetStream`.
    ///
    /// The result always has exactly four dot-separated tokens:
    ///
    /// - `kogral.<graph>.node.saved` for [`KogralEvent::NodeSaved`]
    /// - `kogral.<graph>.node.deleted` for [`KogralEvent::NodeDeleted`]
    /// - `kogral.<graph>.graph.saved` for [`KogralEvent::GraphSaved`]
    ///
    /// `<graph>` is the event's graph name reduced to a single subject token
    /// by `subject_token`, so a name such as `my.project` or `team a` cannot
    /// add tokens, inject wildcards or produce an invalid subject. Names made
    /// of ordinary characters are used verbatim. The event itself is not
    /// modified, so its payload still carries the original graph name.
    #[must_use]
    pub fn subject(&self) -> String {
        match self {
            Self::NodeSaved { graph, .. } => {
                format!("kogral.{}.node.saved", Self::subject_token(graph))
            }
            Self::NodeDeleted { graph, .. } => {
                format!("kogral.{}.node.deleted", Self::subject_token(graph))
            }
            Self::GraphSaved { name, .. } => {
                format!("kogral.{}.graph.saved", Self::subject_token(name))
            }
        }
    }

    /// Reduce an arbitrary graph name to exactly one NATS subject token.
    ///
    /// Characters with a special meaning in subjects (`.` separates tokens,
    /// `*` and `>` are wildcards) as well as whitespace and control characters
    /// are replaced by `_`. An empty name becomes `_`, because a subject must
    /// not contain an empty token.
    fn subject_token(raw: &str) -> String {
        if raw.is_empty() {
            return "_".to_string();
        }
        raw.chars()
            .map(|c| {
                let unsafe_in_token =
                    matches!(c, '.' | '*' | '>') || c.is_whitespace() || c.is_control();
                if unsafe_in_token {
                    '_'
                } else {
                    c
                }
            })
            .collect()
    }
}
/// Publishes `KogralEvent` values to NATS `JetStream`.
///
/// Built from a `platform_nats::EventStream` via `EventPublisher::new`; each
/// call to `publish` sends one JSON-encoded event through that stream.
pub struct EventPublisher {
// Stream every event is published through; wrapped in an `Arc` by `new`.
stream: Arc<EventStream>,
}
impl EventPublisher {
    /// Build a publisher that sends its events through `stream`.
    ///
    /// The stream is moved behind an `Arc` owned by the publisher.
    #[must_use]
    pub fn new(stream: EventStream) -> Self {
        let stream = Arc::new(stream);
        Self { stream }
    }

    /// Encode `event` as JSON and publish it on the subject returned by
    /// [`KogralEvent::subject`].
    ///
    /// # Errors
    ///
    /// - `KbError::Serialization` when the event cannot be encoded as JSON.
    /// - `KbError::Storage` when the NATS publish call fails; the message
    ///   names the subject that was being published to.
    pub async fn publish(&self, event: KogralEvent) -> Result<()> {
        let encoded = match serde_json::to_vec(&event) {
            Ok(encoded) => encoded,
            Err(e) => return Err(KbError::Serialization(e.to_string())),
        };
        let subject = event.subject();

        let outcome = self.stream.publish(&subject, Bytes::from(encoded)).await;
        match outcome {
            Ok(done) => Ok(done),
            Err(e) => Err(KbError::Storage(format!(
                "NATS publish to '{subject}': {e}"
            ))),
        }
    }
}
/// Decorator that wraps any `Storage` and publishes events after mutations.
///
/// `save_graph`, `save_node` and `delete_node` are forwarded to `inner`
/// first; the matching `KogralEvent` is published only when that call
/// succeeded. The read methods are forwarded to `inner` unchanged.
pub struct EventingStorage<S: Storage> {
// Wrapped storage that performs the actual reads and writes.
inner: S,
// Publisher used to emit one event per successful mutation.
publisher: EventPublisher,
/// Default graph name used in `save_node` events when `node.project` is `None`
graph_name: String,
}
impl<S> EventingStorage<S>
where
    S: Storage,
{
    /// Decorate `inner` so that its mutations are announced through
    /// `publisher`.
    ///
    /// `graph_name` is the fallback reported as the graph of a `save_node`
    /// event when the saved node has no `project` set.
    #[must_use]
    pub fn new(inner: S, publisher: EventPublisher, graph_name: String) -> Self {
        Self { inner, publisher, graph_name }
    }
}
#[async_trait]
impl<S> Storage for EventingStorage<S>
where
    S: Storage + Send,
{
    /// Save `graph` through the wrapped storage, then publish `GraphSaved`.
    ///
    /// If the wrapped storage fails, its error is returned and nothing is
    /// published. If publishing fails, that error is returned even though the
    /// graph has already been saved.
    async fn save_graph(&mut self, graph: &Graph) -> Result<()> {
        self.inner.save_graph(graph).await?;

        let event = KogralEvent::GraphSaved {
            name: graph.name.clone(),
            node_count: graph.nodes.len(),
        };
        self.publisher.publish(event).await
    }

    /// Save `node` through the wrapped storage, then publish `NodeSaved`.
    ///
    /// The event's graph is the node's `project`, or the decorator's default
    /// graph name when the node has none. Errors behave as in `save_graph`:
    /// a storage failure publishes nothing, a publish failure is returned
    /// after the node has already been saved.
    async fn save_node(&mut self, node: &Node) -> Result<()> {
        self.inner.save_node(node).await?;

        let graph = node
            .project
            .as_deref()
            .map_or_else(|| self.graph_name.clone(), |project| project.to_string());
        let event = KogralEvent::NodeSaved {
            graph,
            node_id: node.id.clone(),
            node_type: node.node_type.to_string(),
        };
        self.publisher.publish(event).await
    }

    /// Delete the node through the wrapped storage, then publish
    /// `NodeDeleted` carrying the same `graph_name` and `node_id`.
    ///
    /// A storage failure publishes nothing; a publish failure is returned
    /// after the node has already been deleted.
    async fn delete_node(&mut self, graph_name: &str, node_id: &str) -> Result<()> {
        self.inner.delete_node(graph_name, node_id).await?;

        let event = KogralEvent::NodeDeleted {
            graph: graph_name.to_string(),
            node_id: node_id.to_string(),
        };
        self.publisher.publish(event).await
    }

    /// Read-only pass-through to the wrapped storage; publishes nothing.
    async fn load_graph(&self, name: &str) -> Result<Graph> {
        let loaded = self.inner.load_graph(name).await;
        loaded
    }

    /// Read-only pass-through to the wrapped storage; publishes nothing.
    async fn load_node(&self, graph_name: &str, node_id: &str) -> Result<Node> {
        let loaded = self.inner.load_node(graph_name, node_id).await;
        loaded
    }

    /// Read-only pass-through to the wrapped storage; publishes nothing.
    async fn list_graphs(&self) -> Result<Vec<String>> {
        let names = self.inner.list_graphs().await;
        names
    }

    /// Read-only pass-through to the wrapped storage; publishes nothing.
    async fn list_nodes(&self, graph_name: &str, node_type: Option<&str>) -> Result<Vec<Node>> {
        let nodes = self.inner.list_nodes(graph_name, node_type).await;
        nodes
    }
}