Key changes: new events.rs (NATS EventingStorage decorator), storage/factory.rs (backend selection), orchestration.rs, SurrealDB v3 engine upgrade, expanded Nickel schemas, and two new ADRs (006, 007).
ADR-007: NATS JetStream Event Publishing
Status: Accepted
Date: 2026-02-21
Deciders: Architecture Team
Depends on: ADR-006: SurrealDB 3.0 Engine Abstraction
Context
As KOGRAL scales to organizational deployments, components beyond the core library need to react to knowledge graph mutations: embedding reindex pipelines, audit trails, cross-graph sync, AI agent notifications. Polling storage for changes does not scale and creates artificial coupling between consumer and storage implementation.
NATS JetStream provides durable, at-least-once message delivery with subject hierarchies that map naturally to KOGRAL's multi-graph structure:
kogral.<graph>.node.saved
kogral.<graph>.node.deleted
kogral.<graph>.graph.saved
The challenge: wrapping the storage layer with event publishing must not affect the Storage
trait interface, must not require consumers to know which backend is in use, and must be
completely opt-in (off by default, zero overhead when disabled).
Decision
Implement EventingStorage<S>, a decorator that wraps any S: Storage and publishes to
NATS JetStream after each successful mutation. The decorator is feature-gated behind nats-events.
Event Type
use serde::Serialize;

// Derive list assumed; Serialize is the minimum the serde attribute below requires.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum KogralEvent {
    NodeSaved { graph: String, node_id: String, node_type: String },
    NodeDeleted { graph: String, node_id: String },
    GraphSaved { name: String, node_count: usize },
}

impl KogralEvent {
    /// NATS subject for this event, keyed by graph name.
    pub fn subject(&self) -> String {
        match self {
            Self::NodeSaved { graph, .. } => format!("kogral.{graph}.node.saved"),
            Self::NodeDeleted { graph, .. } => format!("kogral.{graph}.node.deleted"),
            Self::GraphSaved { name, .. } => format!("kogral.{name}.graph.saved"),
        }
    }
}
Events are serialized as JSON and published to NATS subjects derived from graph name. Consumers
can subscribe to kogral.> (all events), kogral.<graph>.> (single graph), or specific event
types.
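The subscription patterns above rely on NATS wildcard rules: * matches exactly one token and > matches one or more trailing tokens. The following is a minimal sketch of that matching logic, useful for checking which subjects a pattern would receive. The function name subject_matches is hypothetical and is not part of KOGRAL or of any NATS client.

```rust
/// Returns true if `subject` matches `pattern` under NATS wildcard rules:
/// `*` matches exactly one token, `>` matches one or more trailing tokens.
/// Illustrative helper only; real matching happens inside the NATS server.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` must be the last pattern token and needs at least one subject token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one subject token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

Under these rules kogral.docs.> receives every event for the graph docs, while kogral.docs.* receives none of the events listed above, because each of them has two tokens after the graph name.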
Decorator
pub struct EventingStorage<S: Storage> {
    inner: S,
    publisher: EventPublisher,
    graph_name: String,
}
EventingStorage<S> implements Storage by delegating to inner, then publishing the event.
Failures in publishing do not roll back the storage mutation — publishing is best-effort.
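The delegate-then-publish order can be sketched with a simplified, synchronous trait. Everything below is an illustration under stated assumptions: the real Storage trait is async and has more methods, and Publisher here is a hypothetical stand-in for EventPublisher that can be switched offline to simulate a broker failure.

```rust
use std::collections::HashMap;

// Simplified, synchronous stand-in for the real Storage trait (assumption).
trait Storage {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String>;
}

#[derive(Default)]
struct MemoryStorage {
    nodes: HashMap<String, String>,
}

impl Storage for MemoryStorage {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String> {
        self.nodes.insert(id.to_string(), body.to_string());
        Ok(())
    }
}

// Hypothetical publisher: records subjects, or fails when `offline` is set.
#[derive(Default)]
struct Publisher {
    offline: bool,
    sent: Vec<String>,
}

impl Publisher {
    fn publish(&mut self, subject: String) -> Result<(), String> {
        if self.offline {
            return Err("broker unreachable".to_string());
        }
        self.sent.push(subject);
        Ok(())
    }
}

struct EventingStorage<S: Storage> {
    inner: S,
    publisher: Publisher,
    graph_name: String,
}

impl<S: Storage> Storage for EventingStorage<S> {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String> {
        // 1. Delegate first; a storage error returns before anything is published.
        self.inner.save_node(id, body)?;
        // 2. Publish afterwards; a publish error is logged and swallowed.
        let subject = format!("kogral.{}.node.saved", self.graph_name);
        if let Err(e) = self.publisher.publish(subject) {
            eprintln!("event publish failed (ignored): {e}");
        }
        Ok(())
    }
}
```

With the publisher offline, save_node still returns Ok and the node is present in the inner storage, which is the best-effort behavior described above.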
Type Erasure
The factory returns Box<dyn Storage>. impl Storage for Box<dyn Storage> (added in the same
change) enables EventingStorage<Box<dyn Storage>> to satisfy S: Storage + Send without the
factory knowing the concrete inner type:
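pub async fn build_eventing(
    config: &StorageConfig,
    base_path: PathBuf,
    default_graph: impl Into<String>,
) -> Result<Box<dyn Storage>> {
    let base = build(config, base_path).await?; // Box<dyn Storage>
    let Some(nats_cfg) = &config.nats else {
        return Ok(base); // no NATS config → no wrapping
    };
    // NatsEventConfig → platform_nats::NatsConfig via the From impl (assumes Clone)
    let nats_config = platform_nats::NatsConfig::from(nats_cfg.clone());
    let stream = EventStream::connect(&nats_config).await?;
    // Assumed constructor: the original snippet does not show how `publisher` is built
    let publisher = EventPublisher::new(Arc::new(stream));
    Ok(Box::new(EventingStorage::new(base, publisher, default_graph.into())))
}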
When config.nats is None, build_eventing is identical to build — callers always use
build_eventing and get wrapping only when configured.
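The type-erasure step can be illustrated in isolation. This is a sketch with a one-method synchronous trait, not the real Storage trait; the backend method and the nats_configured flag are hypothetical stand-ins chosen so the wrapping is observable.

```rust
// Simplified, synchronous stand-in for the real Storage trait (assumption).
trait Storage {
    fn backend(&self) -> String;
}

struct MemoryStorage;

impl Storage for MemoryStorage {
    fn backend(&self) -> String {
        "memory".to_string()
    }
}

// Forwarding impl: a boxed trait object is itself a Storage,
// so it can be used wherever `S: Storage` is required.
impl Storage for Box<dyn Storage> {
    fn backend(&self) -> String {
        (**self).backend()
    }
}

struct EventingStorage<S: Storage> {
    inner: S,
}

impl<S: Storage> Storage for EventingStorage<S> {
    fn backend(&self) -> String {
        format!("eventing({})", self.inner.backend())
    }
}

// Hypothetical factory: `nats_configured` stands in for `config.nats.is_some()`.
fn build_eventing(nats_configured: bool) -> Box<dyn Storage> {
    let base: Box<dyn Storage> = Box::new(MemoryStorage);
    if !nats_configured {
        return base; // same object that `build` would return
    }
    // EventingStorage<Box<dyn Storage>> compiles only because of the forwarding impl.
    Box::new(EventingStorage { inner: base })
}
```

Callers receive a Box<dyn Storage> in both branches, so nothing downstream needs to know whether the decorator is present.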
Config Schema
nats = {
  enabled = false,
  url = "nats://localhost:4222",
  stream_name = "KOGRAL",
  consumer_name = "kogral-consumer",
  subjects = ["kogral.>"],
  require_signed_messages = false,
  trusted_nkeys = [],
}
NatsEventConfig converts to platform_nats::NatsConfig via From:
impl From<NatsEventConfig> for platform_nats::NatsConfig {
    fn from(c: NatsEventConfig) -> Self {
        Self {
            url: c.url,
            stream_name: c.stream_name,
            consumer_name: c.consumer_name,
            subjects: c.subjects,
            nkey_seed: c.nkey_seed,
            trusted_nkeys: c.trusted_nkeys,
            require_signed_messages: c.require_signed_messages,
        }
    }
}
Orchestration Bridge
The orchestration feature (depends on nats-events) provides pipeline_context_from_event()
mapping KogralEvent to stratum_orchestrator::PipelineContext for triggering downstream
automation pipelines:
pub fn pipeline_context_from_event(
    event: &KogralEvent,
    extra: serde_json::Value,
) -> PipelineContext {
    PipelineContext::new(Uuid::new_v4().to_string(), event.subject(), extra)
}
ncl-import-resolver
Nickel config files may import other .ncl files. When the Nickel CLI is invoked without a
resolver, imports from outside the project root fail. resolve_nickel_imports() runs
ncl-import-resolver against a resolver-manifest.json in the same directory as the config
file before calling nickel export:
use std::path::Path;
use std::process::Command;

fn resolve_nickel_imports(ncl_file: &Path) -> Result<()> {
    // The manifest is expected next to the config file.
    let manifest = ncl_file.parent().unwrap_or(Path::new("."))
        .join("resolver-manifest.json");
    if !manifest.exists() {
        return Ok(()); // no manifest → no-op, not an error
    }
    let output = Command::new("ncl-import-resolver")
        .arg(&manifest)
        .output()
        .map_err(|e| KbError::NickelExport(format!("ncl-import-resolver unavailable: {e}")))?;
    if !output.status.success() {
        return Err(KbError::NickelExport(
            format!("ncl-import-resolver failed: {}", String::from_utf8_lossy(&output.stderr))
        ));
    }
    Ok(())
}
The resolver step is a no-op when resolver-manifest.json is absent, so existing single-file
configs are unaffected.
Consequences
Positive
- Storage mutations produce observable events with no code changes in callers
- Subject hierarchy (kogral.<graph>.>) enables fine-grained consumer subscriptions
- NATS disabled at compile time (no overhead when the nats-events feature is off)
- NATS disabled at runtime (no overhead when config.nats is None)
- Publishing failures are non-fatal: the storage mutation is already committed
- ncl-import-resolver enables multi-file Nickel configs without shell wrapper scripts
Negative
- At-least-once semantics: consumers must be idempotent on duplicate events
- Publishing is fire-and-forget: no delivery confirmation before save_node returns
- orchestration feature adds stratum-orchestrator as a workspace dependency (compile time)
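One way for a consumer to tolerate duplicate deliveries is to make each handler an upsert or a remove, so applying the same event twice leaves the same state as applying it once. The sketch below assumes a hypothetical consumer that maintains an in-memory index of node types; NodeIndex is not part of KOGRAL, and the enum is a trimmed copy of KogralEvent without its serde attributes.

```rust
use std::collections::HashMap;

// Trimmed copy of the event type, without serde attributes.
#[derive(Clone)]
enum KogralEvent {
    NodeSaved { graph: String, node_id: String, node_type: String },
    NodeDeleted { graph: String, node_id: String },
}

// Hypothetical consumer state: (graph, node_id) -> node_type.
#[derive(Default)]
struct NodeIndex {
    entries: HashMap<(String, String), String>,
}

impl NodeIndex {
    // Insert and remove are idempotent: replaying an event that was
    // already applied does not change the index.
    fn apply(&mut self, event: KogralEvent) {
        match event {
            KogralEvent::NodeSaved { graph, node_id, node_type } => {
                self.entries.insert((graph, node_id), node_type);
            }
            KogralEvent::NodeDeleted { graph, node_id } => {
                self.entries.remove(&(graph, node_id));
            }
        }
    }
}
```

Handlers with side effects that cannot be expressed as an upsert (for example sending a notification) would need explicit deduplication instead, which this sketch does not cover.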
Neutral
- EventPublisher holds an Arc<EventStream> so EventingStorage is Clone-friendly
- NATS subjects use graph name as the second token; graphs named > or * would conflict with NATS wildcards (acceptable constraint; graph names are user-defined identifiers)
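If the wildcard constraint needs to be enforced rather than documented, a check at graph creation time is enough. The following is a minimal sketch of such a check; is_valid_graph_token is a hypothetical name, and the rule is deliberately conservative: it rejects any name containing a dot, a wildcard character, or whitespace, since a dot would also split the name into several subject tokens.

```rust
/// Returns true if `name` can safely be used as a single NATS subject token.
/// Hypothetical helper; conservative by design (rejects `*` and `>` anywhere).
fn is_valid_graph_token(name: &str) -> bool {
    !name.is_empty()
        && !name
            .chars()
            .any(|c| c == '.' || c == '*' || c == '>' || c.is_whitespace())
}
```

A name such as team-docs passes, while >, *, a.b, and the empty string are rejected.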
Alternatives Considered
Callbacks / Observer Pattern in Storage Trait
Rejected: Adds optional complexity to the trait itself. Every Storage implementation
would need to support callback registration, even when events are never used.
Database Triggers (SurrealDB DEFINE EVENT)
Rejected: Couples event logic to the SurrealDB backend. Filesystem and Memory backends would produce no events, breaking consistency.
tokio Broadcast Channel
Rejected: In-process only, no persistence, no fan-out beyond process boundary. Suitable for internal state notification, not cross-service event streams.
Revision History
| Date | Author | Change |
|---|---|---|
| 2026-02-21 | Architecture Team | Initial decision — NATS JetStream + EventingStorage + ncl-import-resolver |
Previous ADR: ADR-006: SurrealDB v3 Engine Abstraction