kogral/docs/architecture/adrs/007-nats-event-publishing.md
Jesús Pérez 1329eb509f
feat(core): add SurrealDB v3 engine abstraction, NATS event publishing, and storage factory
Key changes: new events.rs (NATS EventingStorage decorator), storage/factory.rs (backend selection), orchestration.rs, SurrealDB v3
  engine upgrade, expanded Nickel schemas, and two new ADRs (006, 007).
2026-02-22 21:51:53 +00:00


ADR-007: NATS JetStream Event Publishing

Status: Accepted

Date: 2026-02-21

Deciders: Architecture Team

Depends on: ADR-006: SurrealDB 3.0 Engine Abstraction


Context

As KOGRAL scales to organizational deployments, components beyond the core library need to react to knowledge graph mutations: embedding reindex pipelines, audit trails, cross-graph sync, AI agent notifications. Polling storage for changes does not scale and creates artificial coupling between consumer and storage implementation.

NATS JetStream provides durable, at-least-once message delivery with subject hierarchies that map naturally to KOGRAL's multi-graph structure:

kogral.<graph>.node.saved
kogral.<graph>.node.deleted
kogral.<graph>.graph.saved

The challenge: wrapping the storage layer with event publishing must not affect the Storage trait interface, must not require consumers to know which backend is in use, and must be completely opt-in (off by default, zero overhead when disabled).


Decision

Implement EventingStorage<S>, a decorator that wraps any S: Storage and issues NATS JetStream publishes after each mutation. The decorator is feature-gated behind nats-events.

Event Type

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum KogralEvent {
    NodeSaved   { graph: String, node_id: String, node_type: String },
    NodeDeleted { graph: String, node_id: String },
    GraphSaved  { name: String, node_count: usize },
}

impl KogralEvent {
    pub fn subject(&self) -> String {
        match self {
            Self::NodeSaved   { graph, .. } => format!("kogral.{graph}.node.saved"),
            Self::NodeDeleted { graph, .. } => format!("kogral.{graph}.node.deleted"),
            Self::GraphSaved  { name, .. }  => format!("kogral.{name}.graph.saved"),
        }
    }
}

Events are serialized as JSON and published to NATS subjects derived from the graph name. Consumers can subscribe to kogral.> (all events), kogral.<graph>.> (a single graph), or a specific event type (for example kogral.*.node.deleted).
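
The subscription patterns above rely on NATS wildcard semantics: * matches exactly one token, and > matches one or more trailing tokens. The following is a minimal sketch of those matching rules, useful for reasoning about which KOGRAL subjects a consumer will see. subject_matches() is a hypothetical helper written for illustration. It is not part of KOGRAL or of any NATS client, and the NATS server performs this matching itself.

```rust
/// Hypothetical helper: does `subject` match the subscription `pattern`
/// under NATS wildcard rules? `*` matches exactly one token; `>` matches
/// one or more trailing tokens and must be the last pattern token.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let sub: Vec<&str> = subject.split('.').collect();
    for (i, p) in pat.iter().enumerate() {
        match *p {
            // `>` only counts as the final token, and needs at least one token left.
            ">" => return i == pat.len() - 1 && sub.len() > i,
            // `*` needs some token at this position, whatever it is.
            "*" => {
                if i >= sub.len() {
                    return false;
                }
            }
            // Literal tokens must match exactly.
            literal => {
                if sub.get(i) != Some(&literal) {
                    return false;
                }
            }
        }
    }
    // Without a trailing `>`, the token counts must be equal.
    pat.len() == sub.len()
}
```

For example, kogral.> receives kogral.docs.node.saved but not a bare kogral. Similarly, kogral.*.node.deleted receives deletions from every graph and nothing else.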

Decorator

pub struct EventingStorage<S: Storage> {
    inner:      S,
    publisher:  EventPublisher,
    graph_name: String,
}

EventingStorage<S> implements Storage by delegating each call to inner and then publishing the corresponding event. A publishing failure does not roll back the storage mutation; publishing is best-effort.
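
Here is a synchronous sketch of the same decorator shape, showing the ordering (mutate first, then publish) and the best-effort error handling. It makes several assumptions. The real Storage trait is async and the real EventPublisher talks to NATS. To let the sketch simulate a NATS outage, the publisher here is a hypothetical Publisher trait with a recording test double, so EventingStorage is generic over it as well.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

// Simplified stand-ins for illustration only; not KOGRAL's actual types.
#[derive(Debug, Clone, PartialEq)]
enum KogralEvent {
    NodeSaved { graph: String, node_id: String },
    NodeDeleted { graph: String, node_id: String },
}

trait Storage {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String>;
    fn delete_node(&mut self, id: &str) -> Result<(), String>;
}

// Hypothetical publisher abstraction standing in for EventPublisher.
trait Publisher {
    fn publish(&self, event: &KogralEvent) -> Result<(), String>;
}

#[derive(Default)]
struct MemoryStorage {
    nodes: HashMap<String, String>,
}

impl Storage for MemoryStorage {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String> {
        self.nodes.insert(id.to_string(), body.to_string());
        Ok(())
    }
    fn delete_node(&mut self, id: &str) -> Result<(), String> {
        self.nodes.remove(id).map(|_| ()).ok_or_else(|| format!("no node {id}"))
    }
}

// Test double: records events, or fails every publish when `fail` is set.
struct RecordingPublisher {
    fail: bool,
    sent: RefCell<Vec<KogralEvent>>,
}

impl Publisher for RecordingPublisher {
    fn publish(&self, event: &KogralEvent) -> Result<(), String> {
        if self.fail {
            return Err("NATS unreachable".to_string());
        }
        self.sent.borrow_mut().push(event.clone());
        Ok(())
    }
}

struct EventingStorage<S: Storage, P: Publisher> {
    inner: S,
    publisher: P,
    graph_name: String,
}

impl<S: Storage, P: Publisher> EventingStorage<S, P> {
    // Best-effort: a publish error is reported and swallowed, never returned.
    fn emit(&self, event: KogralEvent) {
        if let Err(e) = self.publisher.publish(&event) {
            eprintln!("event publish failed, mutation kept: {e}");
        }
    }
}

impl<S: Storage, P: Publisher> Storage for EventingStorage<S, P> {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String> {
        self.inner.save_node(id, body)?; // failed mutation returns early: no event
        self.emit(KogralEvent::NodeSaved { graph: self.graph_name.clone(), node_id: id.to_string() });
        Ok(())
    }
    fn delete_node(&mut self, id: &str) -> Result<(), String> {
        self.inner.delete_node(id)?;
        self.emit(KogralEvent::NodeDeleted { graph: self.graph_name.clone(), node_id: id.to_string() });
        Ok(())
    }
}
```

Because the inner call uses ?, a failed mutation never produces an event. A failed publish, by contrast, never turns a successful mutation into an error.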

Type Erasure

The factory returns Box<dyn Storage>. impl Storage for Box<dyn Storage> (added in the same change) enables EventingStorage<Box<dyn Storage>> to satisfy S: Storage + Send without the factory knowing the concrete inner type:

pub async fn build_eventing(
    config: &StorageConfig,
    base_path: PathBuf,
    default_graph: impl Into<String>,
) -> Result<Box<dyn Storage>> {
    let base = build(config, base_path).await?;        // Box<dyn Storage>
    let Some(nats_cfg) = &config.nats else {
        return Ok(base);                               // no NATS config → no wrapping
    };
    let stream = EventStream::connect(nats_cfg).await?;   // nats_cfg: &NatsEventConfig
    let publisher = EventPublisher::new(stream);          // constructor name assumed
    Ok(Box::new(EventingStorage::new(base, publisher, default_graph.into())))
}

When config.nats is None, build_eventing is identical to build — callers always use build_eventing and get wrapping only when configured.
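
The type-erasure trick can be reduced to a few lines: a forwarding impl Storage for Box<dyn Storage> lets the boxed backend be used wherever S: Storage is required. Below is a synchronous sketch under simplifying assumptions. The trait has two toy methods and no Send bound, the factory is infallible, NatsEventConfig is reduced to a marker, and publishing is elided.

```rust
// Minimal stand-ins; the real Storage trait is async and requires Send.
trait Storage {
    fn save_node(&mut self, id: &str) -> Result<(), String>;
    fn describe(&self) -> String;
}

struct MemoryStorage;

impl Storage for MemoryStorage {
    fn save_node(&mut self, _id: &str) -> Result<(), String> {
        Ok(())
    }
    fn describe(&self) -> String {
        "memory".to_string()
    }
}

// Forwarding impl: Box<dyn Storage> now satisfies any `S: Storage` bound.
// `(**self)` reaches the trait object; `self.save_node(id)` would recurse.
impl Storage for Box<dyn Storage> {
    fn save_node(&mut self, id: &str) -> Result<(), String> {
        (**self).save_node(id)
    }
    fn describe(&self) -> String {
        (**self).describe()
    }
}

struct EventingStorage<S: Storage> {
    inner: S,
    graph_name: String,
}

impl<S: Storage> Storage for EventingStorage<S> {
    fn save_node(&mut self, id: &str) -> Result<(), String> {
        self.inner.save_node(id) // publishing elided in this sketch
    }
    fn describe(&self) -> String {
        format!("eventing[{}]({})", self.graph_name, self.inner.describe())
    }
}

// Hypothetical config subset: only the presence of `nats` matters here.
struct NatsEventConfig;
struct StorageConfig {
    nats: Option<NatsEventConfig>,
}

fn build(_config: &StorageConfig) -> Box<dyn Storage> {
    Box::new(MemoryStorage) // the real factory selects a backend from config
}

fn build_eventing(config: &StorageConfig, default_graph: &str) -> Box<dyn Storage> {
    let base = build(config);
    match &config.nats {
        None => base, // no NATS config: plain backend, no wrapper
        Some(_) => Box::new(EventingStorage { inner: base, graph_name: default_graph.to_string() }),
    }
}
```

Callers receive a Box<dyn Storage> in both branches, so nothing downstream needs to know whether eventing is active.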

Config Schema

nats = {
  enabled        = false,
  url            = "nats://localhost:4222",
  stream_name    = "KOGRAL",
  consumer_name  = "kogral-consumer",
  subjects       = ["kogral.>"],
  require_signed_messages = false,
  trusted_nkeys  = [],
}

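NatsEventConfig converts to platform_nats::NatsConfig via From. The conversion also forwards nkey_seed, which does not appear in the defaults above, and does not forward enabled: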

impl From<NatsEventConfig> for platform_nats::NatsConfig {
    fn from(c: NatsEventConfig) -> Self {
        Self {
            url: c.url,
            stream_name: c.stream_name,
            consumer_name: c.consumer_name,
            subjects: c.subjects,
            nkey_seed: c.nkey_seed,
            trusted_nkeys: c.trusted_nkeys,
            require_signed_messages: c.require_signed_messages,
        }
    }
}
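
On the Rust side, the Nickel defaults might be mirrored by a Default impl like the hypothetical one below. Field types are assumptions. nkey_seed is modelled as an Option because the schema defaults do not set it.

```rust
// Hypothetical Rust mirror of the `nats` Nickel record; field types are assumptions.
#[derive(Debug, Clone)]
struct NatsEventConfig {
    enabled: bool,
    url: String,
    stream_name: String,
    consumer_name: String,
    subjects: Vec<String>,
    nkey_seed: Option<String>, // absent from the schema defaults; assumed optional
    require_signed_messages: bool,
    trusted_nkeys: Vec<String>,
}

impl Default for NatsEventConfig {
    // Values copied from the Nickel `nats` record shown in Config Schema.
    fn default() -> Self {
        Self {
            enabled: false,
            url: "nats://localhost:4222".to_string(),
            stream_name: "KOGRAL".to_string(),
            consumer_name: "kogral-consumer".to_string(),
            subjects: vec!["kogral.>".to_string()],
            nkey_seed: None,
            require_signed_messages: false,
            trusted_nkeys: Vec::new(),
        }
    }
}
```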

Orchestration Bridge

The orchestration feature (depends on nats-events) provides pipeline_context_from_event() mapping KogralEvent to stratum_orchestrator::PipelineContext for triggering downstream automation pipelines:

pub fn pipeline_context_from_event(
    event: &KogralEvent,
    extra: serde_json::Value,
) -> PipelineContext {
    PipelineContext::new(Uuid::new_v4().to_string(), event.subject(), extra)
}

ncl-import-resolver

Nickel config files may import other .ncl files. When the Nickel CLI is invoked without a resolver, imports from outside the project root fail. resolve_nickel_imports() runs ncl-import-resolver against a resolver-manifest.json in the same directory as the config file before calling nickel export:

fn resolve_nickel_imports(ncl_file: &Path) -> Result<()> {
    let manifest = ncl_file.parent().unwrap_or(Path::new("."))
        .join("resolver-manifest.json");
    if !manifest.exists() {
        return Ok(());   // no manifest → no-op, not an error
    }
    let output = Command::new("ncl-import-resolver")
        .arg(&manifest)
        .output()
        .map_err(|e| KbError::NickelExport(format!("ncl-import-resolver unavailable: {e}")))?;
    if !output.status.success() {
        return Err(KbError::NickelExport(
            format!("ncl-import-resolver failed: {}", String::from_utf8_lossy(&output.stderr))
        ));
    }
    Ok(())
}

The resolver step is a no-op when resolver-manifest.json is absent, so existing single-file configs are unaffected.
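
One subtlety in how resolve_nickel_imports() locates the manifest: Path::parent() returns Some("") (not None) for a bare file name such as kogral.ncl. As a result, the "." fallback only applies to paths like "/" or "", and in all of these cases the manifest is looked up relative to the current working directory. The hypothetical manifest_path() helper below isolates that logic so it can be checked without running any external command.

```rust
use std::path::{Path, PathBuf};

// Hypothetical helper reproducing the manifest lookup in resolve_nickel_imports():
// the manifest is expected next to the .ncl file.
fn manifest_path(ncl_file: &Path) -> PathBuf {
    ncl_file
        .parent()
        .unwrap_or(Path::new(".")) // only reached for paths without a parent, e.g. "/"
        .join("resolver-manifest.json")
}
```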


Consequences

Positive

  • Storage mutations produce observable events with no code changes in callers
  • Subject hierarchy (kogral.<graph>.*) enables fine-grained consumer subscriptions
  • NATS disabled at compile time (no overhead when the nats-events feature is off)
  • NATS disabled at runtime (no overhead when config.nats is None)
  • Publishing failures are non-fatal: the storage mutation is already committed
  • ncl-import-resolver enables multi-file Nickel configs without shell wrapper scripts
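
The compile-time half of the opt-in can be pictured with cfg attributes. In this sketch, maybe_wrap() and the trimmed-down types are hypothetical, and publishing is again elided. When the nats-events feature is off, the eventing type is not compiled at all and the storage passes through untouched.

```rust
trait Storage {
    fn describe(&self) -> String;
}

struct MemoryStorage;

impl Storage for MemoryStorage {
    fn describe(&self) -> String {
        "memory".to_string()
    }
}

// Compiled only when the crate is built with `--features nats-events`.
#[cfg(feature = "nats-events")]
struct EventingStorage {
    inner: Box<dyn Storage>,
}

#[cfg(feature = "nats-events")]
impl Storage for EventingStorage {
    fn describe(&self) -> String {
        format!("eventing({})", self.inner.describe())
    }
}

// Feature on: wrap the base storage (publishing elided in this sketch).
#[cfg(feature = "nats-events")]
fn maybe_wrap(base: Box<dyn Storage>) -> Box<dyn Storage> {
    Box::new(EventingStorage { inner: base })
}

// Feature off: no eventing code exists in the binary; the base is returned as-is.
#[cfg(not(feature = "nats-events"))]
fn maybe_wrap(base: Box<dyn Storage>) -> Box<dyn Storage> {
    base
}
```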

Negative

  • At-least-once semantics: consumers must be idempotent on duplicate events
  • Publishing is fire-and-forget: save_node returns without waiting for delivery confirmation
  • orchestration feature adds stratum-orchestrator as a workspace dependency (compile time)
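
One way a consumer can tolerate at-least-once redelivery is to remember which messages it has already processed. The sketch below assumes the consumer can read each message's JetStream stream sequence number, which stays the same when the server redelivers that message. This covers redelivery only: a publisher that sends the same event twice produces two stream entries. Delivery and ReindexConsumer are hypothetical names.

```rust
use std::collections::HashSet;

// Hypothetical delivery record carrying the JetStream stream sequence.
struct Delivery {
    stream_seq: u64,
    node_id: String,
}

#[derive(Default)]
struct ReindexConsumer {
    seen: HashSet<u64>,     // unbounded here; a real consumer would prune or persist it
    reindexed: Vec<String>, // stands in for the side effect, e.g. re-embedding a node
}

impl ReindexConsumer {
    fn handle(&mut self, d: &Delivery) {
        // HashSet::insert returns false when this sequence was already processed.
        if !self.seen.insert(d.stream_seq) {
            return;
        }
        self.reindexed.push(d.node_id.clone());
    }
}
```

Where the side effect is itself an upsert keyed by node id, the sequence check only saves redundant work. Correctness does not depend on it.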

Neutral

  • EventPublisher holds an Arc<EventStream> so EventingStorage is Clone-friendly
  • NATS subjects use the graph name as the second token, so graphs named > or * would conflict with NATS wildcards, and a graph name containing . would split into extra subject tokens (acceptable constraint; graph names are user-defined identifiers)
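
Given that constraint, a graph name can be checked before it is embedded in a subject. The conservative check below is hypothetical; the ADR does not say KOGRAL performs one. It rejects names that are empty, that contain the token separator or a wildcard character, or that contain whitespace.

```rust
/// Hypothetical check: true when `name` can be used verbatim as the single
/// `<graph>` token in subjects such as `kogral.<graph>.node.saved`.
fn is_valid_graph_token(name: &str) -> bool {
    // '.' separates tokens, '*' and '>' are wildcards, whitespace is not allowed.
    !name.is_empty()
        && !name
            .chars()
            .any(|c| matches!(c, '.' | '*' | '>') || c.is_whitespace())
}
```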

Alternatives Considered

Callbacks / Observer Pattern in Storage Trait

Rejected: Adds optional complexity to the trait itself. Every Storage implementation would need to support callback registration, even when events are never used.

Database Triggers (SurrealDB DEFINE EVENT)

Rejected: Couples event logic to the SurrealDB backend. Filesystem and Memory backends would produce no events, breaking consistency.

tokio Broadcast Channel

Rejected: In-process only, with no persistence and no fan-out beyond the process boundary. Suitable for internal state notification, not for cross-service event streams.



Revision History

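| Date       | Author            | Change                                                                   |
|------------|-------------------|--------------------------------------------------------------------------|
| 2026-02-21 | Architecture Team | Initial decision: NATS JetStream + EventingStorage + ncl-import-resolver |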

Previous ADR: ADR-006: SurrealDB v3 Engine Abstraction