feat(core): add SurrealDB v3 engine abstraction, NATS event publishing, and storage factory
Some checks failed
Nickel Type Check / Nickel Type Checking (push) Has been cancelled
Rust CI / Security Audit (push) Has been cancelled
Rust CI / Check + Test + Lint (nightly) (push) Has been cancelled
Rust CI / Check + Test + Lint (stable) (push) Has been cancelled

Key changes: new events.rs (NATS EventingStorage decorator), storage/factory.rs (backend selection), orchestration.rs, SurrealDB v3
  engine upgrade, expanded Nickel schemas, and two new ADRs (006, 007).
This commit is contained in:
Jesús Pérez 2026-02-22 21:51:53 +00:00
parent 6335aba33b
commit 1329eb509f
Signed by: jesus
GPG Key ID: 9F243E355E0BC939
22 changed files with 3778 additions and 949 deletions

View File

@ -7,6 +7,69 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
#### SurrealDB 3.0 — Config-Driven Engine Abstraction (`surrealdb-backend` feature)
- **`SurrealEngineConfig`** tagged enum (`mem`, `surreal_kv`, `rocks_db`, `ws`) selects the
engine at runtime via `surrealdb::engine::any::connect(url)` URL dispatch — no recompilation
required to change backends.
- **`SurrealDbBackendConfig`** dual hot/cold layout: `graph` engine (default: SurrealKV B-tree)
for nodes/edges/metadata, `hot` engine (default: RocksDB LSM) for embeddings and session logs.
- **`SurrealDbStorage::from_config()`** constructor replaces ad-hoc field initialization;
`Surreal<Any>` unified type — no generics, no `Arc<RwLock<>>`.
- All SurrealDB CRUD routes through `serde_json::Value` (`to_json` / `from_json` helpers) to
satisfy the `SurrealValue` trait bound without coupling domain models to the database library.
- `SurrealDbStorage::save_embedding()` and `log_session()` for hot-engine direct access.
- Removed `Database(#[from] surrealdb::Error)` variant from `KbError` — all SurrealDB errors
propagate as `KbError::Storage(e.to_string())`.
- `surrealdb-backend` Cargo feature activates `kv-surrealkv`, `kv-rocksdb`, `protocol-ws`,
`rustls` engine features on top of the workspace `kv-mem` base.
#### Storage Factory (`storage::factory`)
- **`storage::factory::build(config, base_path)`** — config-driven engine dispatch: SurrealDB
when `secondary.enabled`, `MemoryStorage` when `primary == memory`, `FilesystemStorage`
otherwise. Zero recompilation to switch backends.
- **`storage::factory::build_eventing(config, base_path, default_graph)`** — builds the base
backend then wraps it with `EventingStorage` when `config.nats` is `Some`; falls through to
`build` when NATS is not configured (`nats-events` feature required).
- **`impl Storage for Box<dyn Storage>`** — blanket delegation impl enabling
`EventingStorage<Box<dyn Storage>>` without knowing the concrete backend type at compile time.
#### NATS JetStream Event Publishing (`nats-events` feature)
- **`KogralEvent`** typed enum (`NodeSaved`, `NodeDeleted`, `GraphSaved`) with computed NATS
subject (`kogral.{graph}.node.saved`, etc.) and `serde` JSON serialization.
- **`EventPublisher`** wraps `platform_nats::EventStream` and publishes `KogralEvent` as JSON
to JetStream after each mutation.
- **`EventingStorage<S>`** decorator wraps any `Storage` implementation with automatic event
publishing after `save_graph`, `save_node`, and `delete_node` — inner backend unchanged.
- **`NatsEventConfig`** schema type with `From` conversion to `platform_nats::NatsConfig`.
- `nats-events` feature adds `platform-nats` and `bytes` workspace dependencies.
#### Orchestration Integration (`orchestration` feature)
- **`orchestration::pipeline_context_from_event(event, payload, state, schema_dir)`** bridges
`KogralEvent` to `stratum-orchestrator`'s `PipelineContext::new` for downstream pipeline
triggering.
- `orchestration` feature activates `stratum-orchestrator` and `stratum-state` workspace deps;
implies `nats-events`.
#### Nickel Config Schema Updates
- `secondary.surrealdb` sub-record replaces flat `url`/`namespace`/`database` fields:
`graph` and `hot` engine selectors with `engine`, `path`/`url` fields per engine type.
- `nats` top-level block in `StorageConfig` for optional JetStream configuration.
- `SurrealEngineValidator` contract and `SurrealEngineConfig` / `SurrealDbBackendConfig` /
`NatsEventConfig` type contracts added to `schemas/kogral/contracts.ncl`.
- `schemas/kogral/defaults.ncl` updated with dual-engine defaults and `nats` disabled block.
#### Nickel Import Resolver
- `config::nickel::resolve_nickel_imports(path)` runs `ncl-import-resolver` on
`resolver-manifest.json` (when present) before `nickel export`. No-op when manifest absent.
### Changed
#### BREAKING: MCP Protocol Rebranding

2869
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -41,9 +41,15 @@ tera = "1.20"
rig-core = "0.30"
# Storage
surrealdb = "2.6"
surrealdb = { version = "3", features = ["kv-mem"] }
dashmap = "6.1"
# NATS eventing & orchestration
platform-nats = { path = "/Users/Akasha/Development/stratumiops/crates/platform-nats" }
stratum-orchestrator = { path = "/Users/Akasha/Development/stratumiops/crates/stratum-orchestrator" }
stratum-state = { path = "/Users/Akasha/Development/stratumiops/crates/stratum-state" }
bytes = "1.9"
# File watching
notify = "8.2"

View File

@ -254,7 +254,11 @@ Knowledge base behavior is fully configurable via Nickel schemas:
secondary = {
enabled = true,
type = 'surrealdb,
url = "ws://localhost:8000",
surrealdb = {
graph = { engine = "surreal_kv", path = ".kogral/db/graph" },
hot = { engine = "rocks_db", path = ".kogral/db/hot" },
namespace = "kogral",
},
},
},

View File

@ -21,7 +21,11 @@ anyhow = { workspace = true }
pulldown-cmark = { workspace = true }
tera = { workspace = true }
rig-core = { workspace = true }
surrealdb = { workspace = true, optional = true }
surrealdb = { workspace = true, optional = true, features = ["kv-surrealkv", "kv-rocksdb", "protocol-ws", "rustls"] }
platform-nats = { workspace = true, optional = true }
stratum-orchestrator = { workspace = true, optional = true }
stratum-state = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
dashmap = { workspace = true }
notify = { workspace = true }
chrono = { workspace = true }
@ -46,5 +50,7 @@ tempfile = { workspace = true }
[features]
default = ["filesystem"]
filesystem = []
surrealdb-backend = ["surrealdb"]
full = ["surrealdb-backend"]
surrealdb-backend = ["dep:surrealdb"]
nats-events = ["dep:platform-nats", "dep:bytes"]
orchestration = ["nats-events", "dep:stratum-orchestrator", "dep:stratum-state"]
full = ["surrealdb-backend", "nats-events", "orchestration"]

View File

@ -15,6 +15,45 @@ use tracing::{debug, info, warn};
use crate::error::{KbError, Result};
/// Run `ncl-import-resolver` against the manifest that sits next to `ncl_file`.
///
/// The manifest is expected at `resolver-manifest.json` in the directory
/// containing `ncl_file`. When no manifest exists this function is a no-op,
/// so environments without the resolver tool keep working unchanged.
///
/// # Errors
///
/// Returns `KbError::NickelExport` when:
/// - the manifest exists but `ncl-import-resolver` cannot be spawned (not in PATH)
/// - the resolver process exits with a non-zero status
fn resolve_nickel_imports(ncl_file: &Path) -> Result<()> {
    let dir = ncl_file.parent().unwrap_or(Path::new("."));
    let manifest = dir.join("resolver-manifest.json");
    if !manifest.exists() {
        return Ok(());
    }
    let run = Command::new("ncl-import-resolver").arg(&manifest).output();
    let output = run.map_err(|e| {
        KbError::NickelExport(format!("ncl-import-resolver unavailable: {e}"))
    })?;
    if output.status.success() {
        Ok(())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr);
        Err(KbError::NickelExport(format!(
            "ncl-import-resolver failed for {}: {stderr}",
            manifest.display()
        )))
    }
}
/// Export a Nickel file to JSON using the Nickel CLI
///
/// # Arguments
@ -42,6 +81,8 @@ pub fn export_nickel_to_json<P: AsRef<Path>>(nickel_file: P) -> Result<String> {
)));
}
resolve_nickel_imports(path)?;
info!("Exporting Nickel file to JSON: {}", path.display());
let output = Command::new("nickel")
@ -129,8 +170,7 @@ pub fn is_nickel_available() -> bool {
Command::new("nickel")
.arg("--version")
.output()
.map(|output| output.status.success())
.unwrap_or(false)
.is_ok_and(|output| output.status.success())
}
/// Get Nickel CLI version

View File

@ -161,8 +161,100 @@ pub enum SecondaryStorageType {
Sqlite,
}
/// Selects which `SurrealDB` engine to open via `engine::any::connect(url)`.
///
/// The internal `engine` serde tag lets Nickel records deserialize directly:
/// `{ engine = "surreal_kv", path = ".kogral/db/graph" }` → `SurrealKv`.
#[cfg(feature = "surrealdb-backend")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "engine", rename_all = "snake_case")]
pub enum SurrealEngineConfig {
    /// In-memory engine — intended for tests and ephemeral data only
    Mem,
    /// Embedded `SurrealKV` (B-tree) — the default for relational/graph data
    SurrealKv {
        /// Directory on disk holding the `SurrealKV` database
        path: String,
    },
    /// Embedded `RocksDB` (LSM) — the default for append-heavy hot data
    /// (embeddings, sessions)
    RocksDb {
        /// Directory on disk holding the `RocksDB` database
        path: String,
    },
    /// Remote WebSocket endpoint — team and shared deployments
    Ws {
        /// Complete WebSocket URL, e.g. `ws://surrealdb.internal:8000`
        url: String,
    },
}

#[cfg(feature = "surrealdb-backend")]
impl SurrealEngineConfig {
    /// Render the connection URL understood by `surrealdb::engine::any::connect`.
    #[must_use]
    pub fn to_url(&self) -> String {
        match self {
            Self::Ws { url } => url.clone(),
            Self::Mem => String::from("mem://"),
            Self::SurrealKv { path } => format!("surrealkv://{path}"),
            Self::RocksDb { path } => format!("rocksdb://{path}"),
        }
    }
}
/// Configuration for the dual-engine `SurrealDB` backend layout.
///
/// The `graph` engine (default: `SurrealKV`, a B-tree) holds nodes, edges,
/// and metadata — workloads dominated by random access. The `hot` engine
/// (default: `RocksDB`, an LSM tree) holds embeddings and session logs —
/// workloads dominated by appends.
#[cfg(feature = "surrealdb-backend")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SurrealDbBackendConfig {
    /// Engine for relational/graph data (nodes, edges, metadata)
    #[serde(default = "default_graph_engine")]
    pub graph: SurrealEngineConfig,
    /// Engine for hot data (embeddings, session logs)
    #[serde(default = "default_hot_engine")]
    pub hot: SurrealEngineConfig,
    /// `SurrealDB` namespace shared by both engines
    #[serde(default = "default_surreal_namespace")]
    pub namespace: String,
}

#[cfg(feature = "surrealdb-backend")]
impl Default for SurrealDbBackendConfig {
    fn default() -> Self {
        Self {
            graph: default_graph_engine(),
            hot: default_hot_engine(),
            namespace: default_surreal_namespace(),
        }
    }
}

// Serde default helpers — these mirror the defaults in the Nickel schema
// (`schemas/kogral/defaults.ncl`) and are referenced by name in the
// `#[serde(default = "...")]` attributes above.

#[cfg(feature = "surrealdb-backend")]
fn default_graph_engine() -> SurrealEngineConfig {
    SurrealEngineConfig::SurrealKv {
        path: String::from(".kogral/db/graph"),
    }
}

#[cfg(feature = "surrealdb-backend")]
fn default_hot_engine() -> SurrealEngineConfig {
    SurrealEngineConfig::RocksDb {
        path: String::from(".kogral/db/hot"),
    }
}

#[cfg(feature = "surrealdb-backend")]
fn default_surreal_namespace() -> String {
    String::from("kogral")
}
/// Secondary storage configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Default)]
pub struct SecondaryStorageConfig {
/// Whether secondary storage is enabled
#[serde(default)]
@ -170,41 +262,52 @@ pub struct SecondaryStorageConfig {
/// Secondary storage type
#[serde(rename = "type", default)]
pub storage_type: SecondaryStorageType,
/// Connection URL
#[serde(default = "default_surrealdb_url")]
/// `SurrealDB` engine configuration (dual hot/cold layout)
#[cfg(feature = "surrealdb-backend")]
#[serde(default)]
pub surrealdb: SurrealDbBackendConfig,
}
/// NATS `JetStream` event publishing configuration
#[cfg(feature = "nats-events")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsEventConfig {
/// NATS server URL (e.g. `nats://localhost:4222`)
pub url: String,
/// Database namespace
#[serde(default = "default_namespace")]
pub namespace: String,
/// Database name
#[serde(default = "default_database")]
pub database: String,
/// `JetStream` stream name
pub stream_name: String,
/// Durable consumer name
pub consumer_name: String,
/// Subjects this stream captures (e.g. `["kogral.>"]`)
#[serde(default)]
pub subjects: Vec<String>,
/// `NKey` seed for signing published messages (optional)
#[serde(default)]
pub nkey_seed: Option<String>,
/// Public `NKeys` whose signatures are trusted
#[serde(default)]
pub trusted_nkeys: Vec<String>,
/// Reject messages that lack valid `NKey` signatures
#[serde(default)]
pub require_signed_messages: bool,
}
impl Default for SecondaryStorageConfig {
fn default() -> Self {
#[cfg(feature = "nats-events")]
impl From<NatsEventConfig> for platform_nats::NatsConfig {
fn from(c: NatsEventConfig) -> Self {
Self {
enabled: false,
storage_type: SecondaryStorageType::default(),
url: default_surrealdb_url(),
namespace: default_namespace(),
database: default_database(),
url: c.url,
stream_name: c.stream_name,
consumer_name: c.consumer_name,
subjects: c.subjects,
nkey_seed: c.nkey_seed,
trusted_nkeys: c.trusted_nkeys,
require_signed_messages: c.require_signed_messages,
}
}
}
fn default_surrealdb_url() -> String {
"ws://localhost:8000".to_string()
}
fn default_namespace() -> String {
"kb".to_string()
}
fn default_database() -> String {
"default".to_string()
}
/// Storage backend configuration
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StorageConfig {
@ -214,6 +317,10 @@ pub struct StorageConfig {
/// Secondary storage (optional, for scaling/search)
#[serde(default)]
pub secondary: SecondaryStorageConfig,
/// NATS event publishing (optional)
#[cfg(feature = "nats-events")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nats: Option<NatsEventConfig>,
}
/// Embedding provider type

View File

@ -87,11 +87,6 @@ pub enum KbError {
#[error("Invalid edge type: {0}")]
InvalidEdgeType(String),
/// Database operation error
#[cfg(feature = "surrealdb-backend")]
#[error("Database error: {0}")]
Database(#[from] surrealdb::Error),
/// Other errors
#[error("{0}")]
Other(String),

View File

@ -0,0 +1,169 @@
//! NATS `JetStream` event publishing for KOGRAL storage operations
//!
//! `EventPublisher` wraps a `platform_nats::EventStream` and publishes typed
//! `KogralEvent` values as JSON payloads.
//!
//! `EventingStorage<S>` decorates any `Storage` implementation with automatic
//! event publishing after each mutating operation, without modifying the
//! underlying storage type.
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use platform_nats::EventStream;
use serde::{Deserialize, Serialize};
use crate::error::{KbError, Result};
use crate::models::{Graph, Node};
use crate::storage::Storage;
/// Typed events emitted by KOGRAL storage operations
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum KogralEvent {
    /// A node was created or updated
    NodeSaved {
        /// Graph the node belongs to
        graph: String,
        /// Node identifier
        node_id: String,
        /// Lowercase node type string (e.g. `"note"`, `"decision"`)
        node_type: String,
    },
    /// A node was removed
    NodeDeleted {
        /// Graph the node belonged to
        graph: String,
        /// Node identifier
        node_id: String,
    },
    /// A complete graph snapshot was persisted
    GraphSaved {
        /// Graph name
        name: String,
        /// Number of nodes in the snapshot
        node_count: usize,
    },
}

impl KogralEvent {
    /// Subject string used when publishing to NATS `JetStream`.
    ///
    /// Shape: `kogral.{graph}.{entity}.{action}`, matching stream
    /// subscriptions such as `kogral.>`.
    #[must_use]
    pub fn subject(&self) -> String {
        let (graph, suffix) = match self {
            Self::NodeSaved { graph, .. } => (graph, "node.saved"),
            Self::NodeDeleted { graph, .. } => (graph, "node.deleted"),
            Self::GraphSaved { name, .. } => (name, "graph.saved"),
        };
        format!("kogral.{graph}.{suffix}")
    }
}
/// Publishes `KogralEvent` values to NATS `JetStream`
pub struct EventPublisher {
    // Arc so the publisher stays cheap to share across decorated storages.
    stream: Arc<EventStream>,
}

impl EventPublisher {
    /// Wrap an `EventStream` in a publisher.
    #[must_use]
    pub fn new(stream: EventStream) -> Self {
        let stream = Arc::new(stream);
        Self { stream }
    }

    /// Serialize `event` to JSON and publish to its computed NATS subject.
    ///
    /// # Errors
    ///
    /// Returns `KbError::Serialization` if JSON encoding fails, or
    /// `KbError::Storage` if the NATS publish call fails.
    pub async fn publish(&self, event: KogralEvent) -> Result<()> {
        let subject = event.subject();
        let encoded =
            serde_json::to_vec(&event).map_err(|e| KbError::Serialization(e.to_string()))?;
        if let Err(e) = self.stream.publish(&subject, Bytes::from(encoded)).await {
            return Err(KbError::Storage(format!("NATS publish to '{subject}': {e}")));
        }
        Ok(())
    }
}
/// Decorator that wraps any `Storage` and publishes events after mutations
pub struct EventingStorage<S: Storage> {
    inner: S,
    publisher: EventPublisher,
    /// Default graph name used in `save_node` events when `node.project` is `None`
    graph_name: String,
}

impl<S: Storage> EventingStorage<S> {
    /// Wrap `inner` storage with NATS event publishing.
    ///
    /// `graph_name` fills the event's graph field for `save_node` calls
    /// on nodes that carry no `project`.
    #[must_use]
    pub fn new(inner: S, publisher: EventPublisher, graph_name: String) -> Self {
        Self {
            inner,
            publisher,
            graph_name,
        }
    }
}

#[async_trait]
impl<S: Storage + Send> Storage for EventingStorage<S> {
    // Mutations: delegate first, then publish. A failed publish surfaces as
    // an error even though the inner write already succeeded.
    async fn save_graph(&mut self, graph: &Graph) -> Result<()> {
        self.inner.save_graph(graph).await?;
        let event = KogralEvent::GraphSaved {
            name: graph.name.clone(),
            node_count: graph.nodes.len(),
        };
        self.publisher.publish(event).await
    }

    async fn save_node(&mut self, node: &Node) -> Result<()> {
        self.inner.save_node(node).await?;
        let graph = match node.project.as_deref() {
            Some(project) => project.to_string(),
            None => self.graph_name.clone(),
        };
        let event = KogralEvent::NodeSaved {
            graph,
            node_id: node.id.clone(),
            node_type: node.node_type.to_string(),
        };
        self.publisher.publish(event).await
    }

    async fn delete_node(&mut self, graph_name: &str, node_id: &str) -> Result<()> {
        self.inner.delete_node(graph_name, node_id).await?;
        let event = KogralEvent::NodeDeleted {
            graph: graph_name.to_string(),
            node_id: node_id.to_string(),
        };
        self.publisher.publish(event).await
    }

    // Read paths: pure delegation, no events emitted.
    async fn load_graph(&self, name: &str) -> Result<Graph> {
        self.inner.load_graph(name).await
    }

    async fn load_node(&self, graph_name: &str, node_id: &str) -> Result<Node> {
        self.inner.load_node(graph_name, node_id).await
    }

    async fn list_graphs(&self) -> Result<Vec<String>> {
        self.inner.list_graphs().await
    }

    async fn list_nodes(&self, graph_name: &str, node_type: Option<&str>) -> Result<Vec<Node>> {
        self.inner.list_nodes(graph_name, node_type).await
    }
}

View File

@ -68,6 +68,12 @@ pub mod query;
pub mod storage;
pub mod sync;
#[cfg(feature = "nats-events")]
pub mod events;
#[cfg(feature = "orchestration")]
pub mod orchestration;
mod regex_patterns;
// Re-exports for convenience

View File

@ -0,0 +1,29 @@
//! Bridge between KOGRAL events and the stratum-orchestrator pipeline runtime
//!
//! Maps `KogralEvent` subjects and payloads into `PipelineContext` instances
//! that the stratum-orchestrator `StageRunner` can execute.
use std::{path::PathBuf, sync::Arc};
use stratum_orchestrator::context::PipelineContext;
use stratum_state::StateTracker;
use crate::events::KogralEvent;
/// Build a `PipelineContext` from a `KogralEvent`.
///
/// The event's NATS subject becomes the pipeline trigger subject and
/// `payload` is forwarded as the trigger payload. `PipelineContext::new`
/// records a fresh `PipelineRun` in `state` before this function returns.
///
/// # Errors
///
/// Propagates any error from `PipelineContext::new` (DB write failure).
pub async fn pipeline_context_from_event(
    event: &KogralEvent,
    payload: serde_json::Value,
    state: Arc<dyn StateTracker>,
    schema_dir: PathBuf,
) -> anyhow::Result<PipelineContext> {
    let subject = event.subject();
    PipelineContext::new(subject, payload, state, schema_dir).await
}

View File

@ -77,7 +77,7 @@ impl QueryEngine {
}
// Sort by score (descending)
results.sort_by(|a, b| b.0.cmp(&a.0));
results.sort_by_key(|b| std::cmp::Reverse(b.0));
Ok(results.into_iter().map(|(_, node)| node).collect())
}

View File

@ -0,0 +1,168 @@
//! Config-driven storage backend factory
//!
//! Selects and constructs the appropriate [`Storage`] implementation based on
//! [`StorageConfig`](crate::config::schema::StorageConfig) at runtime —
//! no recompilation required to switch backends.
//!
//! ## Engine selection priority
//!
//! 1. `surrealdb-backend` feature ON **and** `secondary.enabled` **and**
//! `secondary.type == surrealdb` → [`SurrealDbStorage`](super::surrealdb::SurrealDbStorage)
//! using the dual hot/cold engine layout from `secondary.surrealdb`
//! 2. `primary == memory` → [`MemoryStorage`](super::memory::MemoryStorage)
//! 3. default → [`FilesystemStorage`](super::filesystem::FilesystemStorage)
//! rooted at `base_path`
//!
//! ## NATS event wrapping
//!
//! When the `nats-events` feature is enabled, [`build_eventing`] wraps any
//! backend with [`EventingStorage`](crate::events::EventingStorage), publishing
//! a [`KogralEvent`](crate::events::KogralEvent) after each mutation.
use std::path::PathBuf;
use crate::config::schema::{StorageConfig, StorageType};
use crate::error::Result;
use crate::storage::{filesystem::FilesystemStorage, memory::MemoryStorage, Storage};
/// Build the storage backend driven by `config`.
///
/// Applies the engine selection priority described in the module docs:
/// SurrealDB (when enabled and selected as secondary), then memory, then
/// filesystem. For NATS event publishing after mutations, use
/// [`build_eventing`] instead.
///
/// # Errors
///
/// Returns [`KbError`](crate::error::KbError) if the `SurrealDB` engine
/// connection fails.
// `unused_async`: this function has no await when surrealdb-backend is off,
// but callers treat it as async uniformly across all feature combinations.
#[allow(clippy::unused_async)]
pub async fn build(config: &StorageConfig, base_path: PathBuf) -> Result<Box<dyn Storage>> {
    // Highest priority: a SurrealDB secondary backend explicitly enabled.
    #[cfg(feature = "surrealdb-backend")]
    {
        use crate::config::schema::SecondaryStorageType;
        let secondary = &config.secondary;
        if secondary.enabled && secondary.storage_type == SecondaryStorageType::Surrealdb {
            let db =
                crate::storage::surrealdb::SurrealDbStorage::from_config(&secondary.surrealdb)
                    .await?;
            return Ok(Box::new(db));
        }
    }
    // Otherwise the primary setting decides between the two local backends.
    let backend: Box<dyn Storage> = match config.primary {
        StorageType::Memory => Box::new(MemoryStorage::new()),
        StorageType::Filesystem => Box::new(FilesystemStorage::new(base_path)),
    };
    Ok(backend)
}
/// Build storage wrapped with NATS `JetStream` event publishing.
///
/// Constructs the base backend via [`build`], then — when `config.nats` is
/// `Some` — wraps it with [`EventingStorage`](crate::events::EventingStorage)
/// so a [`KogralEvent`](crate::events::KogralEvent) is published after every
/// mutation (`save_graph`, `save_node`, `delete_node`).
///
/// When `config.nats` is `None` the unwrapped base backend is returned —
/// identical to calling [`build`] directly.
///
/// `default_graph` is used in `NodeSaved` events when `node.project` is
/// `None`.
///
/// # Errors
///
/// Returns [`KbError`](crate::error::KbError) if the base backend or the
/// NATS `JetStream` connection fails.
#[cfg(feature = "nats-events")]
pub async fn build_eventing(
    config: &StorageConfig,
    base_path: PathBuf,
    default_graph: impl Into<String>,
) -> Result<Box<dyn Storage>> {
    let inner = build(config, base_path).await?;
    match &config.nats {
        // No NATS configured: behave exactly like `build`.
        None => Ok(inner),
        Some(nats_cfg) => {
            let nats_config = platform_nats::NatsConfig::from(nats_cfg.clone());
            let stream = platform_nats::EventStream::connect(&nats_config)
                .await
                .map_err(|e| crate::error::KbError::Storage(format!("NATS connect: {e}")))?;
            let publisher = crate::events::EventPublisher::new(stream);
            let wrapped =
                crate::events::EventingStorage::new(inner, publisher, default_graph.into());
            Ok(Box::new(wrapped))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::schema::{StorageConfig, StorageType};
    use tempfile::tempdir;

    // Factory selection: `primary = memory` must construct without touching disk.
    #[tokio::test]
    async fn build_memory_backend() {
        let config = StorageConfig {
            primary: StorageType::Memory,
            ..StorageConfig::default()
        };
        let storage = build(&config, PathBuf::from("/tmp")).await;
        assert!(storage.is_ok(), "memory backend must construct without I/O");
    }

    // Factory selection: the default config falls through to the filesystem
    // backend rooted at the provided base path.
    #[tokio::test]
    async fn build_filesystem_backend() {
        let dir = tempdir().unwrap();
        let config = StorageConfig::default(); // primary defaults to Filesystem
        let storage = build(&config, dir.path().to_path_buf()).await;
        assert!(
            storage.is_ok(),
            "filesystem backend must construct for any valid path"
        );
    }

    // End-to-end sanity: a graph saved through the boxed backend can be
    // loaded back with its name intact (exercises the `Box<dyn Storage>`
    // blanket impl as well as `MemoryStorage`).
    #[tokio::test]
    async fn memory_backend_save_load_roundtrip() {
        use crate::models::Graph;
        let config = StorageConfig {
            primary: StorageType::Memory,
            ..StorageConfig::default()
        };
        let mut storage = build(&config, PathBuf::from("/tmp")).await.unwrap();
        let graph = Graph::new("test-graph".to_string());
        storage.save_graph(&graph).await.unwrap();
        let loaded = storage.load_graph("test-graph").await.unwrap();
        assert_eq!(loaded.name, "test-graph");
    }

    // Factory selection: an enabled SurrealDB secondary with mem:// engines
    // must take priority and connect without external services.
    #[cfg(feature = "surrealdb-backend")]
    #[tokio::test]
    async fn build_surrealdb_mem_backend() {
        use crate::config::schema::{
            SecondaryStorageConfig, SecondaryStorageType, SurrealDbBackendConfig,
            SurrealEngineConfig,
        };
        let config = StorageConfig {
            secondary: SecondaryStorageConfig {
                enabled: true,
                storage_type: SecondaryStorageType::Surrealdb,
                surrealdb: SurrealDbBackendConfig {
                    graph: SurrealEngineConfig::Mem,
                    hot: SurrealEngineConfig::Mem,
                    namespace: "factory-test".to_string(),
                },
            },
            ..StorageConfig::default()
        };
        let storage = build(&config, PathBuf::from("/tmp")).await;
        assert!(storage.is_ok(), "surrealdb mem:// backend must connect");
    }
}

View File

@ -4,6 +4,8 @@
//! - Filesystem: Git-friendly markdown files
//! - Memory: In-memory graph for dev/cache
//! - `SurrealDB`: Scalable database backend (optional)
//!
//! Use [`factory::build`] to construct the appropriate backend from config.
use async_trait::async_trait;
@ -53,6 +55,43 @@ pub trait Storage: Send + Sync {
async fn list_nodes(&self, graph_name: &str, node_type: Option<&str>) -> Result<Vec<Node>>;
}
/// Blanket `Storage` impl for `Box<dyn Storage>`.
///
/// Enables type-erased composition: `EventingStorage<Box<dyn Storage>>`
/// satisfies `S: Storage + Send` so the factory can wrap any backend
/// with NATS events without knowing the concrete type at compile time.
#[async_trait]
impl Storage for Box<dyn Storage> {
    // Every method derefs the box (`**self`) and forwards to the inner
    // trait object unchanged — no behavior is added or altered here.
    async fn save_graph(&mut self, graph: &Graph) -> Result<()> {
        (**self).save_graph(graph).await
    }

    async fn load_graph(&self, name: &str) -> Result<Graph> {
        (**self).load_graph(name).await
    }

    async fn save_node(&mut self, node: &Node) -> Result<()> {
        (**self).save_node(node).await
    }

    async fn load_node(&self, graph_name: &str, node_id: &str) -> Result<Node> {
        (**self).load_node(graph_name, node_id).await
    }

    async fn delete_node(&mut self, graph_name: &str, node_id: &str) -> Result<()> {
        (**self).delete_node(graph_name, node_id).await
    }

    async fn list_graphs(&self) -> Result<Vec<String>> {
        (**self).list_graphs().await
    }

    async fn list_nodes(&self, graph_name: &str, node_type: Option<&str>) -> Result<Vec<Node>> {
        (**self).list_nodes(graph_name, node_type).await
    }
}
pub mod factory;
pub mod filesystem;
pub mod memory;

View File

@ -1,144 +1,184 @@
//! SurrealDB storage backend
//! `SurrealDB` storage backend — `engine::any` dispatcher
//!
//! Provides scalable, graph-native storage using SurrealDB.
//! Supports distributed deployments and complex queries.
use std::sync::Arc;
//! A single `Surreal<Any>` type unifies all engine variants selected at
//! runtime via URL scheme. The dual hot/cold layout uses:
//! - `graph_db`: `SurrealKV` (default) — B-tree, suited for random-access graph data
//! - `hot_db`: `RocksDB` (default) — LSM, suited for embeddings and session logs
//!
//! ## `SurrealDB` 3.0 and `SurrealValue`
//!
//! `SurrealDB` 3.0 requires `SurrealValue` on all types passed to or returned
//! from CRUD methods. `Graph` and `Node` intentionally do not implement this
//! trait to avoid coupling the domain model to the database library.
//!
//! All I/O routes through `serde_json::Value`, which implements `SurrealValue`,
//! with explicit `serde_json::to_value` / `serde_json::from_value` conversion.
//! This is the same pattern used by `stratum-state`'s `SurrealStateTracker`.
use async_trait::async_trait;
use surrealdb::engine::any::Any;
use surrealdb::Surreal;
use tokio::sync::RwLock;
use surrealdb::{engine::any, engine::any::Any, Surreal};
use crate::config::schema::SurrealDbBackendConfig;
use crate::error::{KbError, Result};
use crate::models::{Graph, Node};
use crate::storage::Storage;
/// SurrealDB storage backend
// ─── serialization helpers ────────────────────────────────────────────────────
fn to_json<T: serde::Serialize>(v: &T) -> Result<serde_json::Value> {
serde_json::to_value(v).map_err(|e| KbError::Serialization(e.to_string()))
}
fn from_json<T: serde::de::DeserializeOwned>(v: serde_json::Value) -> Result<T> {
serde_json::from_value(v).map_err(|e| KbError::Serialization(e.to_string()))
}
// ─── engine factory ───────────────────────────────────────────────────────────
/// Open a `SurrealDB` connection using `engine::any` URL dispatch.
///
/// Stores graphs and nodes in SurrealDB with full ACID transactions.
/// Connection is wrapped in Arc<RwLock<>> for thread-safe concurrent access.
/// Calls `use_ns` / `use_db` at connection time so callers never need to
/// repeat the namespace/database selection before individual queries.
async fn open_engine(url: &str, namespace: &str, database: &str) -> Result<Surreal<Any>> {
let conn = any::connect(url)
.await
.map_err(|e| KbError::Storage(format!("connect '{url}' failed: {e}")))?;
conn.use_ns(namespace)
.use_db(database)
.await
.map_err(|e| KbError::Storage(format!("use_ns/use_db failed: {e}")))?;
Ok(conn)
}
// ─── storage type ─────────────────────────────────────────────────────────────
/// `SurrealDB` storage backend — no generics, no Arc<`RwLock`<>>
///
/// `Surreal<Any>` is already `Clone + Send + Sync`. The dual-connection
/// layout separates graph semantics from hot-data semantics at the engine
/// level rather than at the table level.
pub struct SurrealDbStorage {
db: Arc<RwLock<Surreal<Any>>>,
/// `SurrealKV` (default) or any config-selected engine — graph/node data
graph_db: Surreal<Any>,
/// `RocksDB` (default) or any config-selected engine — embeddings/sessions
hot_db: Surreal<Any>,
namespace: String,
database: String,
}
impl SurrealDbStorage {
/// Create a new SurrealDB storage instance
/// Construct from a `SurrealDbBackendConfig`, opening both engine connections.
///
/// # Arguments
/// * `db` - Connected SurrealDB instance
/// * `namespace` - SurrealDB namespace (default: "kogral")
/// * `database` - SurrealDB database (default: "kb")
pub fn new(db: Surreal<Any>, namespace: String, database: String) -> Self {
Self {
db: Arc::new(RwLock::new(db)),
namespace,
database,
}
/// # Errors
///
/// Returns `KbError::Storage` if either engine connection or namespace/database
/// selection fails.
pub async fn from_config(cfg: &SurrealDbBackendConfig) -> Result<Self> {
let graph_url = cfg.graph.to_url();
let hot_url = cfg.hot.to_url();
let graph_db = open_engine(&graph_url, &cfg.namespace, "graph").await?;
let hot_db = open_engine(&hot_url, &cfg.namespace, "hot").await?;
Ok(Self {
graph_db,
hot_db,
namespace: cfg.namespace.clone(),
})
}
/// Create a new storage with default namespace and database names
pub fn with_defaults(db: Surreal<Any>) -> Self {
Self::new(db, "kogral".to_string(), "kb".to_string())
/// Save a node embedding vector to the hot engine.
///
/// # Errors
///
/// Returns `KbError::Storage` on `SurrealDB` I/O failure.
pub async fn save_embedding(&self, node_id: &str, vector: &[f32]) -> Result<()> {
let payload = serde_json::json!({ "node_id": node_id, "vector": vector });
let _: Option<serde_json::Value> = self
.hot_db
.upsert(("embeddings", node_id))
.content(payload)
.await
.map_err(|e| KbError::Storage(e.to_string()))?;
Ok(())
}
/// Append a session log entry to the hot engine.
///
/// # Errors
///
/// Returns `KbError::Storage` on `SurrealDB` I/O failure.
pub async fn log_session(&self, entry: &serde_json::Value) -> Result<()> {
let _: Option<serde_json::Value> = self
.hot_db
.create("sessions")
.content(entry.clone())
.await
.map_err(|e| KbError::Storage(e.to_string()))?;
Ok(())
}
fn node_key(graph_name: &str, node_id: &str) -> String {
format!("{graph_name}__{node_id}")
}
}
impl Clone for SurrealDbStorage {
fn clone(&self) -> Self {
Self {
db: Arc::clone(&self.db),
graph_db: self.graph_db.clone(),
hot_db: self.hot_db.clone(),
namespace: self.namespace.clone(),
database: self.database.clone(),
}
}
}
// ─── Storage trait impl ───────────────────────────────────────────────────────
#[async_trait]
impl Storage for SurrealDbStorage {
async fn save_graph(&mut self, graph: &Graph) -> Result<()> {
let db = self.db.write().await;
let _ = db
.use_ns(&self.namespace)
.use_db(&self.database)
let _: Option<serde_json::Value> = self
.graph_db
.upsert(("graphs", graph.name.clone()))
.content(to_json(graph)?)
.await
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
// Serialize graph and all nodes
let graph_json = serde_json::to_value(graph)
.map_err(|e| KbError::Serialization(format!("Graph serialization error: {}", e)))?;
// Use raw SurrealQL query for upserting
let query = "UPSERT graphs:$graph_id SET * = $content;";
let graph_id = graph.name.clone();
let _: Vec<surrealdb::Value> = db
.query(query)
.bind(("graph_id", graph_id))
.bind(("content", graph_json))
.await
.map_err(|e| KbError::Database(e))?
.take(0)
.map_err(|e| KbError::Database(e))?;
// Upsert all nodes
for node in graph.nodes.values() {
let node_json = serde_json::to_value(node)
.map_err(|e| KbError::Serialization(format!("Node serialization error: {}", e)))?;
let node_key = format!("{}_{}", graph.name, node.id);
let query = "UPSERT nodes:$node_id SET * = $content;";
let _: Vec<surrealdb::Value> = db
.query(query)
.bind(("node_id", node_key))
.bind(("content", node_json))
let key = Self::node_key(&graph.name, &node.id);
let _: Option<serde_json::Value> = self
.graph_db
.upsert(("nodes", key))
.content(to_json(node)?)
.await
.map_err(|e| KbError::Database(e))?
.take(0)
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
}
Ok(())
}
async fn load_graph(&self, name: &str) -> Result<Graph> {
let db = self.db.read().await;
let _ = db
.use_ns(&self.namespace)
.use_db(&self.database)
let raw: Option<serde_json::Value> = self
.graph_db
.select(("graphs", name))
.await
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
// Load graph metadata using raw query
let query = "SELECT * FROM graphs:$graph_id;";
let graph_id = name.to_string();
let result: Vec<Option<Graph>> = db
.query(query)
.bind(("graph_id", graph_id))
let mut graph: Graph = raw
.map(from_json)
.transpose()?
.ok_or_else(|| KbError::Graph(format!("Graph not found: {name}")))?;
// Overlay with individually-saved nodes so save_node writes are visible
let raw_nodes: Vec<serde_json::Value> = self
.graph_db
.query("SELECT * FROM nodes WHERE project = $g")
.bind(("g", name.to_string()))
.await
.map_err(|e| KbError::Database(e))?
.map_err(|e| KbError::Storage(e.to_string()))?
.take(0)
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
let mut graph = result
.into_iter()
.next()
.flatten()
.ok_or_else(|| KbError::Graph(format!("Graph not found: {}", name)))?;
// Load all nodes for this graph
let query = "SELECT * FROM nodes WHERE id LIKE $pattern;";
let pattern = format!("{}_%", name);
let nodes: Vec<Node> = db
.query(query)
.bind(("pattern", pattern))
.await
.map_err(|e| KbError::Database(e))?
.take(0)
.map_err(|e| KbError::Database(e))?;
for node in nodes {
for raw_node in raw_nodes {
let node: Node = from_json(raw_node)?;
graph.nodes.insert(node.id.clone(), node);
}
@ -146,126 +186,81 @@ impl Storage for SurrealDbStorage {
}
async fn save_node(&mut self, node: &Node) -> Result<()> {
let db = self.db.write().await;
let _ = db
.use_ns(&self.namespace)
.use_db(&self.database)
let graph_name = node.project.as_deref().unwrap_or("default");
let key = Self::node_key(graph_name, &node.id);
let _: Option<serde_json::Value> = self
.graph_db
.upsert(("nodes", key))
.content(to_json(node)?)
.await
.map_err(|e| KbError::Database(e))?;
let graph_name = node
.project
.clone()
.unwrap_or_else(|| "default".to_string());
let node_id = format!("{}_{}", graph_name, node.id);
let node_json = serde_json::to_value(node)
.map_err(|e| KbError::Serialization(format!("Node serialization error: {}", e)))?;
let query = "UPSERT nodes:$node_id SET * = $content;";
let _: Vec<surrealdb::Value> = db
.query(query)
.bind(("node_id", node_id))
.bind(("content", node_json))
.await
.map_err(|e| KbError::Database(e))?
.take(0)
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
Ok(())
}
async fn load_node(&self, graph_name: &str, node_id: &str) -> Result<Node> {
let db = self.db.read().await;
let _ = db
.use_ns(&self.namespace)
.use_db(&self.database)
let key = Self::node_key(graph_name, node_id);
let raw: Option<serde_json::Value> = self
.graph_db
.select(("nodes", key))
.await
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
let combined_id = format!("{}_{}", graph_name, node_id);
let query = "SELECT * FROM nodes:$node_id;";
let result: Vec<Option<Node>> = db
.query(query)
.bind(("node_id", combined_id))
.await
.map_err(|e| KbError::Database(e))?
.take(0)
.map_err(|e| KbError::Database(e))?;
result
.into_iter()
.next()
.flatten()
.ok_or_else(|| KbError::NodeNotFound(format!("{}/{}", graph_name, node_id)))
raw.map(from_json)
.transpose()?
.ok_or_else(|| KbError::NodeNotFound(format!("{graph_name}/{node_id}")))
}
async fn delete_node(&mut self, graph_name: &str, node_id: &str) -> Result<()> {
let db = self.db.write().await;
let _ = db
.use_ns(&self.namespace)
.use_db(&self.database)
let key = Self::node_key(graph_name, node_id);
let deleted: Option<serde_json::Value> = self
.graph_db
.delete(("nodes", key))
.await
.map_err(|e| KbError::Database(e))?;
let combined_id = format!("{}_{}", graph_name, node_id);
let query = "DELETE nodes:$node_id RETURN BEFORE;";
let deleted: Vec<Option<Node>> = db
.query(query)
.bind(("node_id", combined_id))
.await
.map_err(|e| KbError::Database(e))?
.take(0)
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
deleted
.into_iter()
.next()
.flatten()
.ok_or_else(|| KbError::NodeNotFound(format!("{}/{}", graph_name, node_id)))
.ok_or_else(|| KbError::NodeNotFound(format!("{graph_name}/{node_id}")))
.map(|_| ())
}
async fn list_graphs(&self) -> Result<Vec<String>> {
let db = self.db.read().await;
let _ = db
.use_ns(&self.namespace)
.use_db(&self.database)
.await
.map_err(|e| KbError::Database(e))?;
let graphs: Vec<Graph> = db
let raw_graphs: Vec<serde_json::Value> = self
.graph_db
.select("graphs")
.await
.map_err(|e| KbError::Database(e))?;
.map_err(|e| KbError::Storage(e.to_string()))?;
Ok(graphs.into_iter().map(|g| g.name).collect())
raw_graphs
.into_iter()
.map(|v| {
v.get("name")
.and_then(|n| n.as_str())
.map(str::to_string)
.ok_or_else(|| KbError::Storage("graph record missing 'name' field".into()))
})
.collect()
}
async fn list_nodes(&self, graph_name: &str, node_type: Option<&str>) -> Result<Vec<Node>> {
let db = self.db.read().await;
let _ = db
.use_ns(&self.namespace)
.use_db(&self.database)
let raw_nodes: Vec<serde_json::Value> = if let Some(t) = node_type {
self.graph_db
.query("SELECT * FROM nodes WHERE project = $g AND node_type = $t")
.bind(("g", graph_name.to_string()))
.bind(("t", t.to_string()))
.await
.map_err(|e| KbError::Database(e))?;
let mut query_builder = db.query(if let Some(_) = node_type {
"SELECT * FROM nodes WHERE id LIKE $id_pattern AND type = $type_filter"
} else {
"SELECT * FROM nodes WHERE id LIKE $id_pattern"
});
let id_pattern = format!("{}_%", graph_name);
query_builder = query_builder.bind(("id_pattern", id_pattern));
if let Some(filter_type) = node_type {
query_builder = query_builder.bind(("type_filter", filter_type));
}
query_builder
.await
.map_err(|e| KbError::Database(e))?
.map_err(|e| KbError::Storage(e.to_string()))?
.take(0)
.map_err(|e| KbError::Database(e))
.map_err(|e| KbError::Storage(e.to_string()))?
} else {
self.graph_db
.query("SELECT * FROM nodes WHERE project = $g")
.bind(("g", graph_name.to_string()))
.await
.map_err(|e| KbError::Storage(e.to_string()))?
.take(0)
.map_err(|e| KbError::Storage(e.to_string()))?
};
raw_nodes.into_iter().map(from_json).collect()
}
}

View File

@ -63,8 +63,6 @@ fn test_load_production_config() {
assert_eq!(config.graph.name, "tools-ecosystem-kb");
assert!(config.storage.secondary.enabled);
assert_eq!(config.storage.secondary.namespace, "tools_kb");
assert_eq!(config.storage.secondary.database, "production");
assert_eq!(config.embeddings.provider, EmbeddingProvider::Openai);
assert_eq!(config.embeddings.model, "text-embedding-3-small");
assert_eq!(config.query.similarity_threshold, 0.5);

View File

@ -32,6 +32,8 @@
- [ADR-003: Hybrid Storage Strategy](architecture/adrs/003-hybrid-storage.md)
- [ADR-004: Logseq Blocks Support](architecture/adrs/004-logseq-blocks-support.md)
- [ADR-005: MCP Protocol for AI Integration](architecture/adrs/005-mcp-protocol.md)
- [ADR-006: SurrealDB 3.0 Engine Abstraction](architecture/adrs/006-surrealdb-v3-engine-abstraction.md)
- [ADR-007: NATS Event Publishing](architecture/adrs/007-nats-event-publishing.md)
# Setup

View File

@ -0,0 +1,241 @@
# ADR-006: SurrealDB 3.0 Engine Abstraction
**Status**: Accepted
**Date**: 2026-02-21
**Deciders**: Architecture Team
**Supersedes**: [ADR-003: Hybrid Storage Strategy](003-hybrid-storage.md) (SurrealDB connection model)
---
## Context
SurrealDB 2.x required a concrete connection type at compile time (`Surreal<Ws>`, `Surreal<Mem>`,
etc.), forcing a single engine choice per binary. This created three problems:
1. **No runtime engine selection**: switching from embedded to remote required recompilation
2. **Tight test coupling**: tests depended on whichever engine was compiled in
3. **No dual-layout support**: using SurrealKV for graph data and RocksDB for hot data required
separate crates or unsafe casting
SurrealDB 3.0 introduces `surrealdb::engine::any::connect(url)` returning `Surreal<Any>` — a
type-erased connection dispatched at runtime by URL scheme:
| URL scheme | Engine | Characteristics |
|---|---|---|
| `mem://` | In-memory | Ephemeral, test isolation |
| `surrealkv://path` | SurrealKV (B-tree) | Embedded, relational/graph data |
| `rocksdb://path` | RocksDB (LSM) | Embedded, append-heavy hot data |
| `ws://host:port` | WebSocket | Remote, team/shared deployment |
This makes the engine a pure config concern.
---
## Decision
**Use `Surreal<Any>` throughout `SurrealDbStorage`, selecting the engine from `SurrealEngineConfig`
at runtime via URL dispatch.**
### Config Schema
`SurrealEngineConfig` is a serde-tagged enum serialized with `tag = "engine"`:
```rust
#[serde(tag = "engine", rename_all = "snake_case")]
pub enum SurrealEngineConfig {
Mem,
SurrealKv { path: String },
RocksDb { path: String },
Ws { url: String },
}
```
`to_url()` maps variants to the URL scheme `engine::any::connect` expects:
```rust
impl SurrealEngineConfig {
pub fn to_url(&self) -> String {
match self {
Self::Mem => "mem://".to_string(),
Self::SurrealKv { path } => format!("surrealkv://{path}"),
Self::RocksDb { path } => format!("rocksdb://{path}"),
Self::Ws { url } => url.clone(),
}
}
}
```
### Dual Hot/Cold Layout
`SurrealDbStorage` holds two independent `Surreal<Any>` connections:
```rust
pub struct SurrealDbStorage {
graph_db: Surreal<Any>, // SurrealKV default — nodes, edges, metadata
hot_db: Surreal<Any>, // RocksDB default — embeddings, session logs
namespace: String,
}
```
Default production layout:
```nickel
secondary.surrealdb = {
graph = { engine = "surreal_kv", path = ".kogral/db/graph" },
hot = { engine = "rocks_db", path = ".kogral/db/hot" },
namespace = "kogral",
}
```
Test layout (in-memory, no filesystem side effects):
```nickel
secondary.surrealdb = {
graph = { engine = "mem" },
hot = { engine = "mem" },
namespace = "test",
}
```
Remote team deployment (single SurrealDB instance, two databases):
```nickel
secondary.surrealdb = {
graph = { engine = "ws", url = "ws://kb.company.com:8000" },
hot = { engine = "ws", url = "ws://kb.company.com:8000" },
namespace = "engineering",
}
```
### Storage Factory
`storage::factory::build()` performs the runtime dispatch, keeping SurrealDB-specific logic
behind the `surrealdb-backend` feature gate:
```rust
#[allow(clippy::unused_async)]
pub async fn build(config: &StorageConfig, base_path: PathBuf) -> Result<Box<dyn Storage>> {
#[cfg(feature = "surrealdb-backend")]
if config.secondary.enabled && config.secondary.storage_type == SecondaryStorageType::Surrealdb {
let db = SurrealDbStorage::from_config(&config.secondary.surrealdb).await?;
return Ok(Box::new(db));
}
Ok(match config.primary {
StorageType::Memory => Box::new(MemoryStorage::new()),
StorageType::Filesystem => Box::new(FilesystemStorage::new(base_path)),
})
}
```
### Type Erasure Composition
`impl Storage for Box<dyn Storage>` enables `EventingStorage<Box<dyn Storage>>` without
knowing the concrete backend at compile time:
```rust
#[async_trait]
impl Storage for Box<dyn Storage> {
async fn save_graph(&mut self, graph: &Graph) -> Result<()> {
(**self).save_graph(graph).await
}
// ... full delegation for all methods
}
```
### CRUD via `serde_json::Value`
SurrealDB 3.0 removed the `IntoSurrealValue`/`SurrealValue` traits. All CRUD goes through
`serde_json::Value` as the intermediary:
```rust
// save_node
let row = serde_json::to_value(node)
.map_err(|e| KbError::Storage(e.to_string()))?;
let _: Option<serde_json::Value> = self.graph_db
.upsert(("nodes", format!("{graph_name}__{}", node.id)))
.content(row)
.await
.map_err(|e| KbError::Storage(e.to_string()))?;
```
`.bind()` parameters require `'static` values — `&str` arguments must be `.to_string()`:
```rust
let nodes: Vec<Node> = self.graph_db
.query("SELECT * FROM nodes WHERE project = $g")
.bind(("g", graph_name.to_string())) // .to_string() required
.await...
```
### Error Handling
`KbError::Database(#[from] surrealdb::Error)` was removed. All SurrealDB errors convert via
`map_err(|e| KbError::Storage(e.to_string()))`, avoiding `#[from]` coupling to a feature-gated
type that would break compilation on default features.
---
## Consequences
### Positive
- Engine selection is a config value, not a compile-time decision
- Tests run fully in-memory (`engine = "mem"`) with zero filesystem side effects
- Dual layout (SurrealKV + RocksDB) is the embedded production default
- Remote deployment (WebSocket) requires only a config change
- `Storage` trait consumers never see SurrealDB types — `Box<dyn Storage>` is the boundary
### Negative
- `Surreal<Any>` has slightly higher dispatch overhead than concrete types (negligible vs. I/O)
- `serde_json::Value` intermediary adds one extra allocation per CRUD call
- `engine::any` requires all four engine feature flags compiled in when `surrealdb-backend` is
enabled (larger binary)
---
## Feature Matrix
```toml
[features]
default = ["filesystem"]
filesystem = []
surrealdb-backend = ["dep:surrealdb"]
nats-events = ["dep:platform-nats", "dep:bytes"]
orchestration = ["nats-events", "dep:stratum-orchestrator"]
full = ["surrealdb-backend", "nats-events", "orchestration"]
```
SurrealDB dependency activates all four engines:
```toml
surrealdb = { workspace = true, optional = true,
    features = ["kv-mem", "kv-surrealkv", "kv-rocksdb", "protocol-ws", "rustls"] }
```
---
## References
- [SurrealDB `engine::any` docs](https://surrealdb.com/docs/sdk/rust/setup)
- [SurrealDbStorage](../../../crates/kogral-core/src/storage/surrealdb.rs)
- [Storage Factory](../../../crates/kogral-core/src/storage/factory.rs)
- [Config Schema](../../../crates/kogral-core/src/config/schema.rs)
- [Nickel Defaults](../../../schemas/kogral/defaults.ncl)
---
## Revision History
| Date | Author | Change |
|---|---|---|
| 2026-02-21 | Architecture Team | Initial decision — SurrealDB 3.0 + engine::any |
---
**Previous ADR**: [ADR-005: MCP Protocol](005-mcp-protocol.md)
**Next ADR**: [ADR-007: NATS Event Publishing](007-nats-event-publishing.md)

View File

@ -0,0 +1,243 @@
# ADR-007: NATS JetStream Event Publishing
**Status**: Accepted
**Date**: 2026-02-21
**Deciders**: Architecture Team
**Depends on**: [ADR-006: SurrealDB 3.0 Engine Abstraction](006-surrealdb-v3-engine-abstraction.md)
---
## Context
As KOGRAL scales to organizational deployments, components beyond the core library need to react
to knowledge graph mutations: embedding reindex pipelines, audit trails, cross-graph sync, AI
agent notifications. Polling storage for changes does not scale and creates artificial coupling
between consumer and storage implementation.
NATS JetStream provides durable, at-least-once message delivery with subject hierarchies that
map naturally to KOGRAL's multi-graph structure:
```text
kogral.<graph>.node.saved
kogral.<graph>.node.deleted
kogral.<graph>.graph.saved
```
The challenge: wrapping the storage layer with event publishing must not affect the `Storage`
trait interface, must not require consumers to know which backend is in use, and must be
completely opt-in (off by default, zero overhead when disabled).
---
## Decision
**Implement the `EventingStorage<S>` decorator pattern: wraps any `S: Storage` with
post-mutation NATS JetStream publishes, feature-gated behind `nats-events`.**
### Event Type
```rust
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum KogralEvent {
NodeSaved { graph: String, node_id: String, node_type: String },
NodeDeleted { graph: String, node_id: String },
GraphSaved { name: String, node_count: usize },
}
impl KogralEvent {
pub fn subject(&self) -> String {
match self {
Self::NodeSaved { graph, .. } => format!("kogral.{graph}.node.saved"),
Self::NodeDeleted { graph, .. } => format!("kogral.{graph}.node.deleted"),
Self::GraphSaved { name, .. } => format!("kogral.{name}.graph.saved"),
}
}
}
```
Events are serialized as JSON and published to NATS subjects derived from graph name. Consumers
can subscribe to `kogral.>` (all events), `kogral.<graph>.>` (single graph), or specific event
types.
### Decorator
```rust
pub struct EventingStorage<S: Storage> {
inner: S,
publisher: EventPublisher,
graph_name: String,
}
```
`EventingStorage<S>` implements `Storage` by delegating to `inner`, then publishing the event.
Failures in publishing do **not** roll back the storage mutation — publishing is best-effort.
### Type Erasure
The factory returns `Box<dyn Storage>`. `impl Storage for Box<dyn Storage>` (added in the same
change) enables `EventingStorage<Box<dyn Storage>>` to satisfy `S: Storage + Send` without the
factory knowing the concrete inner type:
```rust
pub async fn build_eventing(
config: &StorageConfig,
base_path: PathBuf,
default_graph: impl Into<String>,
) -> Result<Box<dyn Storage>> {
let base = build(config, base_path).await?; // Box<dyn Storage>
let Some(nats_cfg) = &config.nats else {
return Ok(base); // no NATS config → no wrapping
};
    let stream = EventStream::connect(nats_cfg).await?;
    let publisher = EventPublisher::new(stream);
    Ok(Box::new(EventingStorage::new(base, publisher, default_graph.into())))
}
```
When `config.nats` is `None`, `build_eventing` is identical to `build` — callers always use
`build_eventing` and get wrapping only when configured.
### Config Schema
```nickel
nats = {
enabled = false,
url = "nats://localhost:4222",
stream_name = "KOGRAL",
consumer_name = "kogral-consumer",
subjects = ["kogral.>"],
require_signed_messages = false,
trusted_nkeys = [],
}
```
`NatsEventConfig` converts to `platform_nats::NatsConfig` via `From`:
```rust
impl From<NatsEventConfig> for platform_nats::NatsConfig {
fn from(c: NatsEventConfig) -> Self {
Self {
url: c.url,
stream_name: c.stream_name,
consumer_name: c.consumer_name,
subjects: c.subjects,
nkey_seed: c.nkey_seed,
trusted_nkeys: c.trusted_nkeys,
require_signed_messages: c.require_signed_messages,
}
}
}
```
### Orchestration Bridge
The `orchestration` feature (depends on `nats-events`) provides `pipeline_context_from_event()`
mapping `KogralEvent` to `stratum_orchestrator::PipelineContext` for triggering downstream
automation pipelines:
```rust
pub fn pipeline_context_from_event(
event: &KogralEvent,
extra: serde_json::Value,
) -> PipelineContext {
PipelineContext::new(Uuid::new_v4().to_string(), event.subject(), extra)
}
```
### ncl-import-resolver
Nickel config files may `import` other `.ncl` files. When the Nickel CLI is invoked without a
resolver, imports from outside the project root fail. `resolve_nickel_imports()` runs
`ncl-import-resolver` against a `resolver-manifest.json` in the same directory as the config
file before calling `nickel export`:
```rust
fn resolve_nickel_imports(ncl_file: &Path) -> Result<()> {
let manifest = ncl_file.parent().unwrap_or(Path::new("."))
.join("resolver-manifest.json");
if !manifest.exists() {
return Ok(()); // no manifest → no-op, not an error
}
let output = Command::new("ncl-import-resolver")
.arg(&manifest)
.output()
.map_err(|e| KbError::NickelExport(format!("ncl-import-resolver unavailable: {e}")))?;
if !output.status.success() {
return Err(KbError::NickelExport(
format!("ncl-import-resolver failed: {}", String::from_utf8_lossy(&output.stderr))
));
}
Ok(())
}
```
The resolver step is a no-op when `resolver-manifest.json` is absent, so existing single-file
configs are unaffected.
---
## Consequences
### Positive
- Storage mutations produce observable events with no code changes in callers
- Subject hierarchy (`kogral.<graph>.*`) enables fine-grained consumer subscriptions
- NATS disabled at compile time (no overhead when `nats-events` feature is off)
- NATS disabled at runtime (no overhead when `config.nats` is `None`)
- Publishing failures are non-fatal — storage mutation already committed
- `ncl-import-resolver` enables multi-file Nickel configs without shell wrapper scripts
### Negative
- At-least-once semantics: consumers must be idempotent on duplicate events
- Publishing is fire-and-forget — no delivery confirmation before `save_node` returns
- `orchestration` feature adds `stratum-orchestrator` as a workspace dependency (compile time)
### Neutral
- `EventPublisher` holds an `Arc<EventStream>` so `EventingStorage` is `Clone`-friendly
- NATS subjects use graph name as the second token — graphs named `>` or `*` would conflict
with NATS wildcards (acceptable constraint; graph names are user-defined identifiers)
---
## Alternatives Considered
### Callbacks / Observer Pattern in `Storage` Trait
**Rejected**: Adds optional complexity to the trait itself. Every `Storage` implementation
would need to support callback registration, even when events are never used.
### Database Triggers (SurrealDB `DEFINE EVENT`)
**Rejected**: Couples event logic to the SurrealDB backend. Filesystem and Memory backends
would produce no events, breaking consistency.
### tokio Broadcast Channel
**Rejected**: In-process only, no persistence, no fan-out beyond process boundary. Suitable
for internal state notification, not cross-service event streams.
---
## References
- [events.rs](../../../crates/kogral-core/src/events.rs)
- [orchestration.rs](../../../crates/kogral-core/src/orchestration.rs)
- [factory.rs](../../../crates/kogral-core/src/storage/factory.rs)
- [platform-nats](https://github.com/stratumiops/platform-nats)
- [NATS JetStream docs](https://docs.nats.io/nats-concepts/jetstream)
---
## Revision History
| Date | Author | Change |
|---|---|---|
| 2026-02-21 | Architecture Team | Initial decision — NATS JetStream + EventingStorage + ncl-import-resolver |
---
**Previous ADR**: [ADR-006: SurrealDB v3 Engine Abstraction](006-surrealdb-v3-engine-abstraction.md)

View File

@ -1 +1,176 @@
# SurrealDB Storage
KOGRAL uses SurrealDB 3.0 as its scalable backend, enabled via the `surrealdb-backend` Cargo feature.
The integration is built on `surrealdb::engine::any::connect(url)`, which selects the engine at
runtime from a URL scheme — no recompilation required when switching between embedded, in-memory,
or remote deployments.
## Dual Hot/Cold Layout
`SurrealDbStorage` maintains two independent database connections:
| Connection | Default engine | URL | Purpose |
|---|---|---|---|
| `graph_db` | SurrealKV (B-tree) | `surrealkv://.kogral/db/graph` | Nodes, edges, graph metadata |
| `hot_db` | RocksDB (LSM) | `rocksdb://.kogral/db/hot` | Embeddings, session logs, append data |
SurrealKV's B-tree layout favours point lookups and range scans (node/graph queries). RocksDB's
LSM tree favours sequential writes (embedding vectors, event logs). Separating them avoids
write-amplification cross-contamination.
## Supported Engines
All four engines are compiled in when the `surrealdb-backend` feature is active:
| Nickel `engine` | URL scheme | Cargo feature | Use case |
|---|---|---|---|
| `mem` | `mem://` | `kv-mem` | Tests, ephemeral dev sessions |
| `surreal_kv` | `surrealkv://path` | `kv-surrealkv` | Embedded production (default graph) |
| `rocks_db` | `rocksdb://path` | `kv-rocksdb` | Embedded production (default hot) |
| `ws` | `ws://host:port` | `protocol-ws` | Remote team / shared deployments |
## Configuration
### Embedded (default production)
```nickel
storage = {
primary = 'filesystem,
secondary = {
enabled = true,
type = 'surrealdb,
surrealdb = {
graph = { engine = "surreal_kv", path = ".kogral/db/graph" },
hot = { engine = "rocks_db", path = ".kogral/db/hot" },
namespace = "kogral",
},
},
}
```
### In-Memory (tests, CI)
```nickel
storage = {
primary = 'memory,
secondary = {
enabled = true,
type = 'surrealdb,
surrealdb = {
graph = { engine = "mem" },
hot = { engine = "mem" },
namespace = "test",
},
},
}
```
### Remote WebSocket (team/shared deployment)
```nickel
storage = {
primary = 'filesystem,
secondary = {
enabled = true,
type = 'surrealdb,
surrealdb = {
graph = { engine = "ws", url = "ws://kb.company.com:8000" },
hot = { engine = "ws", url = "ws://kb.company.com:8000" },
namespace = "engineering",
},
},
}
```
## Building with SurrealDB Support
```bash
# Debug build
cargo build -p kogral-core --features surrealdb-backend
# All features (SurrealDB + NATS + orchestration)
cargo build -p kogral-core --all-features
# Justfile shortcut
just build::core-db
```
## CRUD Pattern
All CRUD operations route through `serde_json::Value` as the intermediary type (SurrealDB 3.0
removed `IntoSurrealValue`/`SurrealValue`). The key format for nodes is
`("{graph_name}__{node_id}")` on the `nodes` table:
```rust
// upsert
let row = serde_json::to_value(node)?;
let _: Option<serde_json::Value> = graph_db
.upsert(("nodes", format!("{graph_name}__{}", node.id)))
.content(row)
.await?;
// select
let raw: Option<serde_json::Value> = graph_db
.select(("nodes", format!("{graph_name}__{node_id}")))
.await?;
// delete
let _: Option<serde_json::Value> = graph_db
.delete(("nodes", format!("{graph_name}__{node_id}")))
.await?;
// list by graph (query API)
let nodes: Vec<Node> = graph_db
.query("SELECT * FROM nodes WHERE project = $g")
.bind(("g", graph_name.to_string()))
.await?
.take(0)?;
```
`.bind()` parameters require owned `String` values — `&str` slices do not satisfy the `'static`
bound in SurrealDB 3.0's bind API.
## Hot Data Methods
`SurrealDbStorage` exposes direct methods on `hot_db` that are outside the `Storage` trait:
```rust
// Store embedding vector for a node
pub async fn save_embedding(&self, node_id: &str, vector: &[f32]) -> Result<()>
// Append a session event to the log
pub async fn log_session(&self, entry: &serde_json::Value) -> Result<()>
```
These operate on the `embeddings` and `sessions` tables in `hot_db`.
## NATS Event Integration
When the `nats-events` feature is enabled and `config.nats` is present, the storage factory
wraps `SurrealDbStorage` (or any other backend) with `EventingStorage`. Every mutation emits
a NATS JetStream event:
```text
kogral.<graph>.node.saved → NodeSaved { graph, node_id, node_type }
kogral.<graph>.node.deleted → NodeDeleted { graph, node_id }
kogral.<graph>.graph.saved → GraphSaved { name, node_count }
```
See [ADR-007: NATS Event Publishing](../architecture/adrs/007-nats-event-publishing.md) for design rationale.
## Feature Matrix
| Feature | Includes |
|---|---|
| `filesystem` (default) | `FilesystemStorage` only |
| `surrealdb-backend` | `SurrealDbStorage` + all four engines |
| `nats-events` | `EventingStorage`, `KogralEvent`, NATS JetStream client |
| `orchestration` | `nats-events` + `stratum-orchestrator` bridge |
| `full` | All of the above |
## Related
- [ADR-003: Hybrid Storage Strategy](../architecture/adrs/003-hybrid-storage.md)
- [ADR-006: SurrealDB 3.0 Engine Abstraction](../architecture/adrs/006-surrealdb-v3-engine-abstraction.md)
- [storage/factory.rs](../../crates/kogral-core/src/storage/factory.rs)
- [storage/surrealdb.rs](../../crates/kogral-core/src/storage/surrealdb.rs)

View File

@ -37,6 +37,43 @@
StorageType = [| 'filesystem, 'memory, 'surrealdb |],
# SurrealEngineConfig — tagged union via `engine` field
# Matches Rust's #[serde(tag = "engine", rename_all = "snake_case")] enum.
SurrealEngineValidator = fun label value =>
let engines = ["mem", "surreal_kv", "rocks_db", "ws"] in
let engine = value.engine in
if std.array.any (fun e => e == engine) engines then
value
else
std.contract.blame_with_message
"engine must be one of: \"mem\", \"surreal_kv\", \"rocks_db\", \"ws\""
label,
SurrealEngineConfig = {
engine | String
| doc "Engine type: \"mem\", \"surreal_kv\", \"rocks_db\", or \"ws\"",
path | String
| doc "Filesystem path for embedded engines (surreal_kv, rocks_db)"
| optional,
url | String
| doc "Remote URL for ws engine"
| optional,
} | SurrealEngineValidator,
SurrealDbBackendConfig = {
graph | SurrealEngineConfig
| doc "Engine for graph/relational data (default: SurrealKV)"
| default = { engine = "surreal_kv", path = ".kogral/db/graph" },
hot | SurrealEngineConfig
| doc "Engine for hot/append data (default: RocksDB)"
| default = { engine = "rocks_db", path = ".kogral/db/hot" },
namespace | String
| doc "SurrealDB namespace shared by both engines"
| default = "kogral",
},
SecondaryStorageConfig = {
enabled | Bool
| doc "Enable secondary storage backend"
@ -46,25 +83,39 @@
| doc "Secondary storage type"
| default = 'surrealdb,
surrealdb | SurrealDbBackendConfig
| doc "SurrealDB engine layout (dual hot/cold)"
| default = {},
},
NatsEventConfig = {
enabled | Bool
| doc "Whether to activate NATS event publishing"
| default = false,
url | String
| doc "Connection URL"
| default = "ws://localhost:8000",
| doc "NATS server URL"
| default = "nats://localhost:4222",
namespace | String
| doc "SurrealDB namespace"
| default = "kb",
stream_name | String
| doc "JetStream stream name"
| default = "KOGRAL",
database | String
| doc "SurrealDB database name"
| default = "default",
consumer_name | String
| doc "Durable consumer name"
| default = "kogral-consumer",
username | String
| doc "Database username"
| optional,
subjects | Array String
| doc "Subjects captured by this stream"
| default = ["kogral.>"],
password | String
| doc "Database password"
| optional,
require_signed_messages | Bool
| doc "Reject messages without valid NKey signatures"
| default = false,
trusted_nkeys | Array String
| doc "Public NKeys whose signatures are trusted"
| default = [],
},
StorageConfig = {
@ -75,6 +126,10 @@
secondary | SecondaryStorageConfig
| doc "Optional secondary storage"
| default = { enabled = false },
nats | NatsEventConfig
| doc "NATS JetStream event publishing"
| default = {},
},
# === EMBEDDINGS ===

View File

@ -28,9 +28,20 @@ let contracts = import "contracts.ncl" in
secondary = {
enabled = false,
type = 'surrealdb,
url = "ws://localhost:8000",
surrealdb = {
graph = { engine = "surreal_kv", path = ".kogral/db/graph" },
hot = { engine = "rocks_db", path = ".kogral/db/hot" },
namespace = "kogral",
database = "default",
},
},
nats = {
enabled = false,
url = "nats://localhost:4222",
stream_name = "KOGRAL",
consumer_name = "kogral-consumer",
subjects = ["kogral.>"],
require_signed_messages = false,
trusted_nkeys = [],
},
},