# ADR-007: NATS JetStream Event Publishing
**Status**: Accepted
**Date**: 2026-02-21
**Deciders**: Architecture Team
**Depends on**: [ADR-006: SurrealDB 3.0 Engine Abstraction](006-surrealdb-v3-engine-abstraction.md)
---
## Context
As KOGRAL scales to organizational deployments, components beyond the core library need to react
to knowledge graph mutations: embedding reindex pipelines, audit trails, cross-graph sync, AI
agent notifications. Polling storage for changes does not scale and creates artificial coupling
between consumer and storage implementation.
NATS JetStream provides durable, at-least-once message delivery with subject hierarchies that
map naturally to KOGRAL's multi-graph structure:
```text
kogral.<graph>.node.saved
kogral.<graph>.node.deleted
kogral.<graph>.graph.saved
```
The challenge: wrapping the storage layer with event publishing must not affect the `Storage`
trait interface, must not require consumers to know which backend is in use, and must be
completely opt-in (off by default, zero overhead when disabled).
---
## Decision
**Implement the `EventingStorage<S>` decorator pattern: wraps any `S: Storage` with
post-mutation NATS JetStream publishes, feature-gated behind `nats-events`.**
### Event Type
```rust
use serde::{Deserialize, Serialize};

// Derive list assumed: the `serde` attribute below only compiles with a serde derive.
#[derive(Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum KogralEvent {
    NodeSaved { graph: String, node_id: String, node_type: String },
    NodeDeleted { graph: String, node_id: String },
    GraphSaved { name: String, node_count: usize },
}

impl KogralEvent {
    /// NATS subject this event is published to, derived from the graph name.
    pub fn subject(&self) -> String {
        match self {
            Self::NodeSaved { graph, .. } => format!("kogral.{graph}.node.saved"),
            Self::NodeDeleted { graph, .. } => format!("kogral.{graph}.node.deleted"),
            Self::GraphSaved { name, .. } => format!("kogral.{name}.graph.saved"),
        }
    }
}
```
Events are serialized as JSON and published to NATS subjects derived from graph name. Consumers
can subscribe to `kogral.>` (all events), `kogral.<graph>.>` (single graph), or specific event
types.
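To make the subscription patterns concrete, the following is a minimal local sketch of NATS wildcard matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. `subject_matches` is a hypothetical helper written for illustration only; it is not part of KOGRAL or of any NATS client, and real matching is performed by the NATS server.

```rust
/// Returns true when `subject` matches `pattern` under NATS wildcard rules:
/// `*` matches exactly one token, `>` matches one or more trailing tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` must be the last pattern token and needs at least one subject token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one subject token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal token.
            _ => return false,
        }
    }
}
```

Under these rules, `kogral.>` selects every event, `kogral.docs.>` selects only events for a graph named `docs`, and `kogral.*.node.deleted` selects deletions across all graphs.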
### Decorator
```rust
pub struct EventingStorage<S: Storage> {
    inner: S,
    publisher: EventPublisher,
    graph_name: String,
}
```
`EventingStorage<S>` implements `Storage` by delegating each call to `inner` and, once a mutation
succeeds, publishing the corresponding event. Failures in publishing do **not** roll back the
storage mutation; publishing is best-effort.
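The delegate-then-publish ordering can be sketched with the standard library alone. This is a simplified model under stated assumptions, not the actual implementation: the `Storage` trait here is synchronous and has a single method, and `Publish`, `MemoryStorage`, and `TestPublisher` are hypothetical stand-ins for the real async trait, backends, and `EventPublisher`.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Simplified, synchronous stand-in for the real `Storage` trait (hypothetical).
trait Storage {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String>;
}

/// Hypothetical in-memory backend.
#[derive(Default)]
struct MemoryStorage {
    nodes: HashMap<String, String>,
}

impl Storage for MemoryStorage {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String> {
        self.nodes.insert(id.to_string(), body.to_string());
        Ok(())
    }
}

/// Hypothetical publisher abstraction; the real publisher talks to NATS JetStream.
trait Publish {
    fn publish(&self, subject: &str) -> Result<(), String>;
}

/// Test double: records subjects, or fails every publish when `fail` is set.
struct TestPublisher {
    fail: bool,
    sent: RefCell<Vec<String>>,
}

impl Publish for TestPublisher {
    fn publish(&self, subject: &str) -> Result<(), String> {
        if self.fail {
            return Err("nats unavailable".to_string());
        }
        self.sent.borrow_mut().push(subject.to_string());
        Ok(())
    }
}

/// Decorator: same `Storage` interface, plus a post-mutation publish.
struct EventingStorage<S: Storage, P: Publish> {
    inner: S,
    publisher: P,
    graph_name: String,
}

impl<S: Storage, P: Publish> Storage for EventingStorage<S, P> {
    fn save_node(&mut self, id: &str, body: &str) -> Result<(), String> {
        // 1. Delegate first; a storage error returns early and nothing is published.
        self.inner.save_node(id, body)?;
        // 2. Publish only after the mutation succeeded. The publish error is
        //    discarded here; a real implementation would likely log it.
        let subject = format!("kogral.{}.node.saved", self.graph_name);
        let _ = self.publisher.publish(&subject);
        Ok(())
    }
}
```

With a `TestPublisher` whose `fail` flag is set, `save_node` still returns `Ok(())` and the node is present in `inner`, which is the best-effort behavior described above.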
### Type Erasure
The factory returns `Box<dyn Storage>`. `impl Storage for Box<dyn Storage>` (added in the same
change) enables `EventingStorage<Box<dyn Storage>>` to satisfy `S: Storage + Send` without the
factory knowing the concrete inner type:
```rust
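use std::{path::PathBuf, sync::Arc};

pub async fn build_eventing(
    config: &StorageConfig,
    base_path: PathBuf,
    default_graph: impl Into<String>,
) -> Result<Box<dyn Storage>> {
    let base = build(config, base_path).await?; // Box<dyn Storage>
    let Some(nats_cfg) = &config.nats else {
        return Ok(base); // no NATS config → no wrapping
    };
    // Convert via the `From` impl shown under "Config Schema" (assumes `NatsEventConfig: Clone`).
    let nats_config = platform_nats::NatsConfig::from(nats_cfg.clone());
    let stream = EventStream::connect(&nats_config).await?;
    // Assumed constructor: the original snippet does not show how `publisher` is built.
    let publisher = EventPublisher::new(Arc::new(stream));
    Ok(Box::new(EventingStorage::new(base, publisher, default_graph.into())))
}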
```
When `config.nats` is `None`, `build_eventing` behaves exactly like `build`. Callers can therefore
always use `build_eventing` and get wrapping only when NATS is configured.
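The forwarding impl is what lets a decorator wrap an already-boxed backend. Below is a minimal sketch of that idea, assuming a simplified synchronous `Storage` trait with one method; `Memory`, `Eventing`, and the `nats_enabled` flag (standing in for `config.nats.is_some()`) are hypothetical names used only for illustration.

```rust
/// Simplified, synchronous stand-in for the real `Storage` trait (hypothetical).
trait Storage {
    fn name(&self) -> String;
}

/// Hypothetical concrete backend.
struct Memory;

impl Storage for Memory {
    fn name(&self) -> String {
        "memory".to_string()
    }
}

// Forwarding impl: a boxed trait object is itself a `Storage`.
// `(**self)` reaches the inner `dyn Storage`, so this does not recurse.
impl Storage for Box<dyn Storage> {
    fn name(&self) -> String {
        (**self).name()
    }
}

/// Hypothetical decorator, generic over any `S: Storage`.
struct Eventing<S: Storage> {
    inner: S,
}

impl<S: Storage> Storage for Eventing<S> {
    fn name(&self) -> String {
        format!("eventing({})", self.inner.name())
    }
}

/// Hypothetical factory: wraps only when NATS is configured.
fn build_eventing(nats_enabled: bool) -> Box<dyn Storage> {
    let base: Box<dyn Storage> = Box::new(Memory);
    if !nats_enabled {
        return base; // no wrapping, caller gets the plain backend
    }
    // `Eventing<Box<dyn Storage>>` type-checks only because of the forwarding impl.
    Box::new(Eventing { inner: base })
}
```

Without the `impl Storage for Box<dyn Storage>` block, `Eventing { inner: base }` would fail the `S: Storage` bound, because `Box<dyn Storage>` does not implement the trait automatically.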
### Config Schema
```nickel
nats = {
  enabled = false,
  url = "nats://localhost:4222",
  stream_name = "KOGRAL",
  consumer_name = "kogral-consumer",
  subjects = ["kogral.>"],
  require_signed_messages = false,
  trusted_nkeys = [],
}
```
`NatsEventConfig` converts to `platform_nats::NatsConfig` via `From`:
```rust
impl From<NatsEventConfig> for platform_nats::NatsConfig {
    fn from(c: NatsEventConfig) -> Self {
        Self {
            url: c.url,
            stream_name: c.stream_name,
            consumer_name: c.consumer_name,
            subjects: c.subjects,
            nkey_seed: c.nkey_seed,
            trusted_nkeys: c.trusted_nkeys,
            require_signed_messages: c.require_signed_messages,
        }
    }
}
```
### Orchestration Bridge
The `orchestration` feature (depends on `nats-events`) provides `pipeline_context_from_event()`,
which maps a `KogralEvent` to a `stratum_orchestrator::PipelineContext` for triggering downstream
automation pipelines:
```rust
pub fn pipeline_context_from_event(
    event: &KogralEvent,
    extra: serde_json::Value,
) -> PipelineContext {
    PipelineContext::new(Uuid::new_v4().to_string(), event.subject(), extra)
}
```
### ncl-import-resolver
Nickel config files may `import` other `.ncl` files. When the Nickel CLI is invoked without a
resolver, imports from outside the project root fail. `resolve_nickel_imports()` runs
`ncl-import-resolver` against a `resolver-manifest.json` in the same directory as the config
file before calling `nickel export`:
```rust
use std::path::Path;
use std::process::Command;

fn resolve_nickel_imports(ncl_file: &Path) -> Result<()> {
    // The manifest is expected next to the config file.
    let manifest = ncl_file
        .parent()
        .unwrap_or(Path::new("."))
        .join("resolver-manifest.json");
    if !manifest.exists() {
        return Ok(()); // no manifest → no-op, not an error
    }
    let output = Command::new("ncl-import-resolver")
        .arg(&manifest)
        .output()
        .map_err(|e| KbError::NickelExport(format!("ncl-import-resolver unavailable: {e}")))?;
    if !output.status.success() {
        return Err(KbError::NickelExport(format!(
            "ncl-import-resolver failed: {}",
            String::from_utf8_lossy(&output.stderr)
        )));
    }
    Ok(())
}
```
The resolver step is a no-op when `resolver-manifest.json` is absent, so existing single-file
configs are unaffected.
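To show where the manifest is looked up, the path derivation can be isolated into a small function. `manifest_path` is a hypothetical helper that only mirrors the first statement of `resolve_nickel_imports`; it is a sketch for illustration and is not part of the codebase.

```rust
use std::path::{Path, PathBuf};

/// Mirrors the manifest lookup in `resolve_nickel_imports` (hypothetical helper).
fn manifest_path(ncl_file: &Path) -> PathBuf {
    ncl_file
        .parent()
        // `parent()` is `None` only for a root or empty path.
        .unwrap_or(Path::new("."))
        .join("resolver-manifest.json")
}
```

For a bare file name such as `config.ncl`, `parent()` returns an empty path rather than `None`, so the manifest resolves to `resolver-manifest.json` relative to the current working directory.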
---
## Consequences
### Positive
- Storage mutations produce observable events with no code changes in callers
- Subject hierarchy (`kogral.<graph>.*`) enables fine-grained consumer subscriptions
- NATS can be disabled at compile time (no code is compiled in when the `nats-events` feature is off)
- NATS can be disabled at runtime (no wrapping and no overhead when `config.nats` is `None`)
- Publishing failures are non-fatal — storage mutation already committed
- `ncl-import-resolver` enables multi-file Nickel configs without shell wrapper scripts
### Negative
- At-least-once semantics: consumers must be idempotent on duplicate events
- Publishing is fire-and-forget — no delivery confirmation before `save_node` returns
- `orchestration` feature adds `stratum-orchestrator` as a workspace dependency (compile time)
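Because events may arrive more than once, a consumer is safest when applying an event twice leaves its state unchanged. The sketch below illustrates one such idempotent handler using set semantics. `Event` is a trimmed-down, hypothetical mirror of two `KogralEvent` variants, and `IndexState` is an invented consumer; neither exists in the codebase.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down mirror of two `KogralEvent` variants.
enum Event {
    NodeSaved { graph: String, node_id: String },
    NodeDeleted { graph: String, node_id: String },
}

/// Hypothetical consumer state: the set of (graph, node) pairs currently indexed.
#[derive(Default)]
struct IndexState {
    live: HashSet<(String, String)>,
}

impl IndexState {
    /// Set insert and remove are idempotent, so a redelivered event is harmless.
    fn apply(&mut self, event: &Event) {
        match event {
            Event::NodeSaved { graph, node_id } => {
                self.live.insert((graph.clone(), node_id.clone()));
            }
            Event::NodeDeleted { graph, node_id } => {
                self.live.remove(&(graph.clone(), node_id.clone()));
            }
        }
    }
}
```

This handles duplicates only. It does not address reordering, for example a stale `NodeSaved` redelivered after a `NodeDeleted`, which would need additional information such as a version or timestamp.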
### Neutral
- `EventPublisher` holds an `Arc<EventStream>` so `EventingStorage` is `Clone`-friendly
- NATS subjects use the graph name as the second token, so a graph named `>` or `*`, or one whose
  name contains `.`, would conflict with NATS wildcards and token separators (acceptable
  constraint; graph names are user-defined identifiers)
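One way to enforce this constraint is to validate graph names before they are used in a subject. The following is a minimal sketch: `is_valid_graph_token` is a hypothetical helper, and its rule set (no empty names, no `.`, `*`, `>`, or whitespace) is an assumption based on NATS subject naming conventions rather than something the codebase is known to enforce.

```rust
/// Returns true when `name` can be used as a single NATS subject token.
/// Rejects empty names, the `.` separator, the `*` and `>` wildcards, and whitespace.
fn is_valid_graph_token(name: &str) -> bool {
    !name.is_empty()
        && !name
            .chars()
            .any(|c| c == '.' || c == '*' || c == '>' || c.is_whitespace())
}
```

A check like this would reject `a.b`, which would otherwise publish to a subject with an extra token, such as `kogral.a.b.node.saved`, and silently fall outside a `kogral.*.node.saved` subscription.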
---
## Alternatives Considered
### Callbacks / Observer Pattern in `Storage` Trait
**Rejected**: Adds optional complexity to the trait itself. Every `Storage` implementation
would need to support callback registration, even when events are never used.
### Database Triggers (SurrealDB `DEFINE EVENT`)
**Rejected**: Couples event logic to the SurrealDB backend. Filesystem and Memory backends
would produce no events, breaking consistency.
### tokio Broadcast Channel
**Rejected**: In-process only, no persistence, no fan-out beyond process boundary. Suitable
for internal state notification, not cross-service event streams.
---
## References
- [events.rs](../../../crates/kogral-core/src/events.rs)
- [orchestration.rs](../../../crates/kogral-core/src/orchestration.rs)
- [factory.rs](../../../crates/kogral-core/src/storage/factory.rs)
- [platform-nats](https://github.com/stratumiops/platform-nats)
- [NATS JetStream docs](https://docs.nats.io/nats-concepts/jetstream)
---
## Revision History
| Date | Author | Change |
|---|---|---|
| 2026-02-21 | Architecture Team | Initial decision — NATS JetStream + EventingStorage + ncl-import-resolver |
---
**Previous ADR**: [ADR-006: SurrealDB v3 Engine Abstraction](006-surrealdb-v3-engine-abstraction.md)