feat: config-driven topology — replace hardcoded provisioning streams with JSON config
Some checks failed
Build and Test / Validate Setup (push) Has been cancelled
Build and Test / Build (darwin-amd64) (push) Has been cancelled
Build and Test / Build (darwin-arm64) (push) Has been cancelled
Build and Test / Build (linux-amd64) (push) Has been cancelled
Build and Test / Build (windows-amd64) (push) Has been cancelled
Build and Test / Build (linux-arm64) (push) Has been cancelled
Build and Test / Security Audit (push) Has been cancelled
Build and Test / Package Results (push) Has been cancelled
Build and Test / Quality Gate (push) Has been cancelled

All commands now read stream/consumer definitions from a topology JSON file
  (--config flag or NATS_STREAMS_CONFIG env). nats pub publishes to exact
  subjects without auto-prefixing. Category changed from provisioning to nats
This commit is contained in:
Jesús Pérez 2026-03-11 03:17:08 +00:00
parent 5229e76cfb
commit b6eeaee4da
7 changed files with 4757 additions and 0 deletions

3507
nu_plugin_nats/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

32
nu_plugin_nats/Cargo.toml Normal file
View File

@ -0,0 +1,32 @@
[package]
name = "nu_plugin_nats"
version = "0.111.0"
edition = "2021"
authors = ["Jesus Perez <jesus@librecloud.online>"]
description = "Nushell plugin for NATS JetStream operations with config-driven topology"
license = "MIT"
[dependencies]
nu-plugin = "0.111.0"
nu-protocol = "0.111.0"
async-nats = "0.46"
bytes = "1"
futures = "0.3"
serde_json = "1"
thiserror = "2"
chrono = "0.4"
interprocess = "^2.3.1"
[dependencies.serde]
version = "1"
features = ["derive"]
[dependencies.tokio]
version = "1"
features = [
"rt",
"time",
]
[dev-dependencies]
nu-plugin-test-support = "0.111.0"

122
nu_plugin_nats/README.md Normal file
View File

@ -0,0 +1,122 @@
# nu_plugin_nats
Nushell plugin for NATS JetStream operations on the provisioning platform event bus.
Bundles `async-nats` directly — no external `nats` CLI required.
## Commands
| Command | Input | Output | Description |
|---------|-------|--------|-------------|
| `nats stream setup` | nothing | nothing | Create the 6 platform streams (idempotent) |
| `nats consumer setup` | nothing | nothing | Create `cli-notifications` consumers on WORKSPACE + AUDIT (idempotent) |
| `nats notify` | nothing | `list<record>` | Drain pending notifications from WORKSPACE and AUDIT |
| `nats pub <subject>` | `any` | `record` | Publish a JSON event to a platform subject |
| `nats status` | nothing | `list<record>` | Live state of all 6 streams |
## Installation
```bash
cd plugins/nushell-plugins
just install-plugin nu_plugin_nats
```
## Configuration
| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_SERVER` | `nats://127.0.0.1:4222` | NATS server URL |
## Quick Start
Bootstrap streams and consumers on first use:
```nushell
nats stream setup
nats consumer setup
nats status
```
Publish an event from pipeline:
```nushell
{workspace_id: "ws-1", action: "deploy", status: "started"}
| nats pub "workspace.deploy.started"
# => {subject: "provisioning.workspace.deploy.started", stream: "WORKSPACE", sequence: 42}
```
Publish with raw JSON payload:
```nushell
nats pub "audit.login" --payload '{"user":"admin","ip":"10.0.0.1"}'
```
Drain pending notifications:
```nushell
nats notify
nats notify --count 10 --timeout 2
```
Filter by stream:
```nushell
nats notify | where stream == "AUDIT"
```
Extract payload fields:
```nushell
nats notify | get payload | where workspace_id == "ws-1"
```
Monitor stream health:
```nushell
nats status | select stream messages consumers
nats status | where messages > 0
```
## Subjects
The plugin prefixes `provisioning.` automatically. Pass only the suffix:
```text
"workspace.deploy.done" → provisioning.workspace.deploy.done → WORKSPACE stream
"audit.login.failed" → provisioning.audit.login.failed → AUDIT stream
"tasks.job.created" → provisioning.tasks.job.created → TASKS stream
```
Passing a subject that already starts with `provisioning.` skips the prefix.
## Streams
| Stream | Subjects | Retention | Max Age |
|--------|----------|-----------|---------|
| `TASKS` | `provisioning.tasks.>` | WorkQueue | — |
| `VAULT` | `provisioning.vault.>` | Interest | — |
| `AUTH` | `provisioning.auth.>` | Interest | — |
| `WORKSPACE` | `provisioning.workspace.>` | Limits | 7 days |
| `AUDIT` | `provisioning.audit.>` | Limits | 90 days |
| `HEALTH` | `provisioning.health.>` | Interest | — |
`cli-notifications` consumers are created on `WORKSPACE` and `AUDIT` only.
These are the streams with `Limits` retention — they retain messages independently of active subscribers.
## Testing
Requires NATS running on `$NATS_SERVER`:
```bash
nu tests/integration.nu
```
## Source
```text
src/
├── main.rs # Plugin struct + 5 command impls + JSON↔Value converters
├── client.rs # NatsError + nats_connect()
└── streams.rs # ensure_streams, ensure_consumers, fetch_notifications,
# get_stream_status, publish_message
```

View File

@ -0,0 +1,28 @@
use async_nats::{jetstream, Client};
#[derive(Debug, thiserror::Error)]
pub enum NatsError {
#[error("connection failed: {0}")]
Connect(String),
#[error("stream error: {0}")]
Stream(String),
#[error("consumer error: {0}")]
Consumer(String),
#[error("publish error: {0}")]
Publish(String),
#[error("fetch error: {0}")]
Fetch(String),
}
/// Connect to NATS JetStream. Reads `NATS_SERVER` env var, defaults to `nats://127.0.0.1:4222`.
pub async fn nats_connect() -> Result<(Client, jetstream::Context), NatsError> {
let url = std::env::var("NATS_SERVER").unwrap_or_else(|_| "nats://127.0.0.1:4222".to_string());
let client = async_nats::connect(&url)
.await
.map_err(|e| NatsError::Connect(format!("{url}: {e}")))?;
let js = jetstream::new(client.clone());
Ok((client, js))
}

577
nu_plugin_nats/src/main.rs Normal file
View File

@ -0,0 +1,577 @@
//! Nushell plugin for NATS JetStream operations.
//!
//! All commands read stream/consumer topology from a JSON config file.
//! Resolution: `--config <path>` flag → `NATS_STREAMS_CONFIG` env var.
//!
//! Commands:
//! - `nats stream setup` — create/ensure streams declared in topology config
//! - `nats consumer setup` — create consumers declared in topology config
//! - `nats notify` — drain pending notifications from topology consumers
//! - `nats pub` — publish a JSON event to a JetStream subject
//! - `nats status` — live state of streams declared in topology config
use nu_plugin::{
serve_plugin, EngineInterface, EvaluatedCall, MsgPackSerializer, Plugin, PluginCommand,
SimplePluginCommand,
};
use nu_protocol::{
record, Category, Example, LabeledError, Record, Signature, SyntaxShape, Type, Value,
};
use bytes::Bytes;
pub mod client;
pub mod streams;
use client::nats_connect;
use streams::{
ensure_consumers, ensure_streams, fetch_notifications, get_stream_status, load_topology,
publish_message, TopologyConfig,
};
// =============================================================================
// Plugin
// =============================================================================
#[derive(Debug)]
pub struct NatsPlugin;
impl Plugin for NatsPlugin {
fn version(&self) -> String {
env!("CARGO_PKG_VERSION").into()
}
fn commands(&self) -> Vec<Box<dyn PluginCommand<Plugin = Self>>> {
vec![
Box::new(NatsStreamSetup),
Box::new(NatsConsumerSetup),
Box::new(NatsNotify),
Box::new(NatsPub),
Box::new(NatsStatus),
]
}
}
// =============================================================================
// Helpers
// =============================================================================
fn json_value_to_nu_value(json_value: &serde_json::Value, span: nu_protocol::Span) -> Value {
match json_value {
serde_json::Value::Null => Value::nothing(span),
serde_json::Value::Bool(b) => Value::bool(*b, span),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::int(i, span)
} else if let Some(f) = n.as_f64() {
Value::float(f, span)
} else {
Value::string(n.to_string(), span)
}
}
serde_json::Value::String(s) => Value::string(s.clone(), span),
serde_json::Value::Array(arr) => {
let values = arr
.iter()
.map(|v| json_value_to_nu_value(v, span))
.collect();
Value::list(values, span)
}
serde_json::Value::Object(obj) => {
let mut rec = Record::new();
for (key, value) in obj {
rec.push(key.clone(), json_value_to_nu_value(value, span));
}
Value::record(rec, span)
}
}
}
fn nu_value_to_json(value: &Value) -> Result<serde_json::Value, LabeledError> {
match value {
Value::Nothing { .. } => Ok(serde_json::Value::Null),
Value::Bool { val, .. } => Ok(serde_json::Value::Bool(*val)),
Value::Int { val, .. } => Ok(serde_json::Value::from(*val)),
Value::Float { val, .. } => serde_json::Number::from_f64(*val)
.map(serde_json::Value::Number)
.ok_or_else(|| LabeledError::new("float value is not representable as JSON number")),
Value::String { val, .. } => Ok(serde_json::Value::String(val.clone())),
Value::List { vals, .. } => {
let items: Result<Vec<_>, _> = vals.iter().map(nu_value_to_json).collect();
Ok(serde_json::Value::Array(items?))
}
Value::Record { val, .. } => {
let mut map = serde_json::Map::new();
for (k, v) in val.iter() {
map.insert(k.to_string(), nu_value_to_json(v)?);
}
Ok(serde_json::Value::Object(map))
}
other => Ok(serde_json::Value::String(format!("{other:?}"))),
}
}
fn build_rt() -> Result<tokio::runtime::Runtime, LabeledError> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| LabeledError::new(e.to_string()))
}
/// Extract topology config from `--config` flag. Falls back to `NATS_STREAMS_CONFIG` env var.
fn resolve_topology(call: &EvaluatedCall) -> Result<TopologyConfig, LabeledError> {
let config_path: Option<String> = call.get_flag("config")?;
load_topology(config_path.as_deref())
.map_err(|e| LabeledError::new(e.to_string()))?
.ok_or_else(|| {
LabeledError::new(
"no topology config found: pass --config <path> or set NATS_STREAMS_CONFIG",
)
})
}
// =============================================================================
// nats stream setup
// =============================================================================
#[derive(Debug)]
pub struct NatsStreamSetup;
impl SimplePluginCommand for NatsStreamSetup {
type Plugin = NatsPlugin;
fn name(&self) -> &str {
"nats stream setup"
}
fn signature(&self) -> Signature {
Signature::build(PluginCommand::name(self))
.input_output_type(Type::Nothing, Type::Record(vec![].into()))
.named(
"config",
SyntaxShape::Filepath,
"Path to topology JSON config (fallback: NATS_STREAMS_CONFIG env)",
Some('c'),
)
.category(Category::Custom("nats".into()))
}
fn description(&self) -> &str {
"Create or ensure JetStream streams declared in topology config (idempotent)"
}
fn examples(&self) -> Vec<Example<'_>> {
vec![
Example {
example: r#"nats stream setup --config nats/streams.json"#,
description: "Ensure all streams from config exist",
result: None,
},
Example {
example: "nats stream setup",
description: "Use NATS_STREAMS_CONFIG env var for topology",
result: None,
},
]
}
fn run(
&self,
_plugin: &NatsPlugin,
_engine: &EngineInterface,
call: &EvaluatedCall,
_input: &Value,
) -> Result<Value, LabeledError> {
let topology = resolve_topology(call)?;
let span = call.head;
let rt = build_rt()?;
let count = rt.block_on(async {
let (_, js) = nats_connect()
.await
.map_err(|e| LabeledError::new(e.to_string()))?;
ensure_streams(&js, &topology)
.await
.map_err(|e| LabeledError::new(e.to_string()))
})?;
Ok(Value::record(
record! {
"streams_created" => Value::int(count as i64, span),
},
span,
))
}
}
// =============================================================================
// nats consumer setup
// =============================================================================
#[derive(Debug)]
pub struct NatsConsumerSetup;
impl SimplePluginCommand for NatsConsumerSetup {
type Plugin = NatsPlugin;
fn name(&self) -> &str {
"nats consumer setup"
}
fn signature(&self) -> Signature {
Signature::build(PluginCommand::name(self))
.input_output_type(Type::Nothing, Type::Record(vec![].into()))
.named(
"config",
SyntaxShape::Filepath,
"Path to topology JSON config (fallback: NATS_STREAMS_CONFIG env)",
Some('c'),
)
.category(Category::Custom("nats".into()))
}
fn description(&self) -> &str {
"Create durable pull consumers declared in topology config (idempotent)"
}
fn examples(&self) -> Vec<Example<'_>> {
vec![
Example {
example: r#"nats consumer setup --config nats/streams.json"#,
description: "Ensure all consumers from config exist",
result: None,
},
Example {
example: "nats consumer setup",
description: "Use NATS_STREAMS_CONFIG env var for topology",
result: None,
},
]
}
fn run(
&self,
_plugin: &NatsPlugin,
_engine: &EngineInterface,
call: &EvaluatedCall,
_input: &Value,
) -> Result<Value, LabeledError> {
let topology = resolve_topology(call)?;
let span = call.head;
let rt = build_rt()?;
let count = rt.block_on(async {
let (_, js) = nats_connect()
.await
.map_err(|e| LabeledError::new(e.to_string()))?;
ensure_consumers(&js, &topology)
.await
.map_err(|e| LabeledError::new(e.to_string()))
})?;
Ok(Value::record(
record! {
"consumers_created" => Value::int(count as i64, span),
},
span,
))
}
}
// =============================================================================
// nats notify
// =============================================================================
#[derive(Debug)]
pub struct NatsNotify;
impl SimplePluginCommand for NatsNotify {
type Plugin = NatsPlugin;
fn name(&self) -> &str {
"nats notify"
}
fn signature(&self) -> Signature {
Signature::build(PluginCommand::name(self))
.input_output_type(
Type::Nothing,
Type::List(Box::new(Type::Record(vec![].into()))),
)
.named(
"config",
SyntaxShape::Filepath,
"Path to topology JSON config (fallback: NATS_STREAMS_CONFIG env)",
Some('c'),
)
.named(
"count",
SyntaxShape::Int,
"Maximum messages to fetch per consumer (default: 100)",
Some('n'),
)
.named(
"timeout",
SyntaxShape::Int,
"Fetch timeout in seconds (default: 5)",
Some('t'),
)
.category(Category::Custom("nats".into()))
}
fn description(&self) -> &str {
"Drain pending notifications from JetStream consumers declared in topology config"
}
fn examples(&self) -> Vec<Example<'_>> {
vec![
Example {
example: r#"nats notify --config nats/streams.json"#,
description: "Fetch pending notifications from all configured consumers",
result: None,
},
Example {
example: "nats notify --count 10 --timeout 2",
description: "Fetch up to 10 notifications per consumer, 2s timeout",
result: None,
},
]
}
fn run(
&self,
_plugin: &NatsPlugin,
_engine: &EngineInterface,
call: &EvaluatedCall,
_input: &Value,
) -> Result<Value, LabeledError> {
let topology = resolve_topology(call)?;
let count = call.get_flag::<i64>("count")?.unwrap_or(100).max(1) as usize;
let timeout_secs = call.get_flag::<i64>("timeout")?.unwrap_or(5).max(1) as u64;
let span = call.head;
let rt = build_rt()?;
let msgs = rt.block_on(async {
let (_, js) = nats_connect()
.await
.map_err(|e| LabeledError::new(e.to_string()))?;
fetch_notifications(&js, &topology, count, timeout_secs)
.await
.map_err(|e| LabeledError::new(e.to_string()))
})?;
let values: Vec<Value> = msgs
.iter()
.map(|msg| {
Value::record(
record! {
"stream" => Value::string(msg.stream.clone(), span),
"subject" => Value::string(msg.subject.clone(), span),
"sequence" => Value::int(msg.sequence as i64, span),
"payload" => json_value_to_nu_value(&msg.payload, span),
"timestamp" => Value::string(msg.timestamp.clone(), span),
},
span,
)
})
.collect();
Ok(Value::list(values, span))
}
}
// =============================================================================
// nats pub
// =============================================================================
#[derive(Debug)]
pub struct NatsPub;
impl SimplePluginCommand for NatsPub {
type Plugin = NatsPlugin;
fn name(&self) -> &str {
"nats pub"
}
fn signature(&self) -> Signature {
Signature::build(PluginCommand::name(self))
.input_output_type(Type::Any, Type::Record(vec![].into()))
.required(
"subject",
SyntaxShape::String,
"Full NATS subject to publish to",
)
.named(
"payload",
SyntaxShape::String,
"JSON payload string (alternative to pipeline input)",
Some('p'),
)
.category(Category::Custom("nats".into()))
}
fn description(&self) -> &str {
"Publish a JSON event to a NATS JetStream subject"
}
fn examples(&self) -> Vec<Example<'_>> {
vec![
Example {
example: r#"{mode_id: "validate", project: "typedialog"} | nats pub "ecosystem.reflection.request""#,
description: "Publish a record to a JetStream subject",
result: None,
},
Example {
example: r#"nats pub "provisioning.tasks.status" --payload '{"task_id":"t-1","status":"done"}'"#,
description: "Publish raw JSON via --payload flag",
result: None,
},
]
}
fn run(
&self,
_plugin: &NatsPlugin,
_engine: &EngineInterface,
call: &EvaluatedCall,
input: &Value,
) -> Result<Value, LabeledError> {
let subject: String = call.req(0)?;
let span = call.head;
let payload_bytes = if matches!(input, Value::Nothing { .. }) {
let json_str: String = call
.get_flag("payload")?
.ok_or_else(|| LabeledError::new("pipe a record or provide --payload <json>"))?;
Bytes::from(json_str.into_bytes())
} else {
let json_val = nu_value_to_json(input)?;
let json_bytes =
serde_json::to_vec(&json_val).map_err(|e| LabeledError::new(e.to_string()))?;
Bytes::from(json_bytes)
};
let rt = build_rt()?;
let (stream, sequence) = rt.block_on(async {
let (_, js) = nats_connect()
.await
.map_err(|e| LabeledError::new(e.to_string()))?;
publish_message(&js, &subject, payload_bytes)
.await
.map_err(|e| LabeledError::new(e.to_string()))
})?;
Ok(Value::record(
record! {
"subject" => Value::string(subject, span),
"stream" => Value::string(stream, span),
"sequence" => Value::int(sequence as i64, span),
},
span,
))
}
}
// =============================================================================
// nats status
// =============================================================================
#[derive(Debug)]
pub struct NatsStatus;
impl SimplePluginCommand for NatsStatus {
type Plugin = NatsPlugin;
fn name(&self) -> &str {
"nats status"
}
fn signature(&self) -> Signature {
Signature::build(PluginCommand::name(self))
.input_output_type(
Type::Nothing,
Type::List(Box::new(Type::Record(vec![].into()))),
)
.named(
"config",
SyntaxShape::Filepath,
"Path to topology JSON config (fallback: NATS_STREAMS_CONFIG env)",
Some('c'),
)
.category(Category::Custom("nats".into()))
}
fn description(&self) -> &str {
"Show live state of JetStream streams declared in topology config"
}
fn examples(&self) -> Vec<Example<'_>> {
vec![
Example {
example: r#"nats status --config nats/streams.json"#,
description: "Show stream state for all configured streams",
result: None,
},
Example {
example: "nats status | where messages > 0",
description: "Show only streams with pending messages",
result: None,
},
]
}
fn run(
&self,
_plugin: &NatsPlugin,
_engine: &EngineInterface,
call: &EvaluatedCall,
_input: &Value,
) -> Result<Value, LabeledError> {
let topology = resolve_topology(call)?;
let span = call.head;
let rt = build_rt()?;
let statuses = rt.block_on(async {
let (_, js) = nats_connect()
.await
.map_err(|e| LabeledError::new(e.to_string()))?;
get_stream_status(&js, &topology)
.await
.map_err(|e| LabeledError::new(e.to_string()))
})?;
let values: Vec<Value> = statuses
.iter()
.map(|s| {
let subjects: Vec<Value> = s
.subjects
.iter()
.map(|subj| Value::string(subj.clone(), span))
.collect();
Value::record(
record! {
"stream" => Value::string(s.name.clone(), span),
"subjects" => Value::list(subjects, span),
"retention" => Value::string(s.retention.clone(), span),
"messages" => Value::int(s.messages as i64, span),
"bytes" => Value::int(s.bytes as i64, span),
"consumers" => Value::int(s.consumers as i64, span),
},
span,
)
})
.collect();
Ok(Value::list(values, span))
}
}
// =============================================================================
// Entry point
// =============================================================================
fn main() {
serve_plugin(&NatsPlugin, MsgPackSerializer);
}

View File

@ -0,0 +1,381 @@
use async_nats::jetstream::{
self,
consumer::{pull, AckPolicy},
stream::{Config as StreamConfig, RetentionPolicy, StorageType},
};
use bytes::Bytes;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::time::Duration;
use crate::client::NatsError;
// ── Topology config (same JSON contract as platform-nats) ──────────────
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyConfig {
pub streams: Vec<StreamDef>,
#[serde(default)]
pub consumers: Vec<ConsumerDef>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamDef {
pub name: String,
pub subjects: Vec<String>,
#[serde(default = "default_retention")]
pub retention: Retention,
#[serde(default)]
pub max_age_days: u64,
#[serde(default = "default_storage")]
pub storage: Storage,
#[serde(default)]
pub max_messages: i64,
#[serde(default)]
pub max_bytes: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerDef {
pub name: String,
pub stream: String,
#[serde(default = "default_ack_policy")]
pub ack_policy: ConsumerAckPolicy,
#[serde(default)]
pub filter_subjects: Vec<String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
pub enum Retention {
#[default]
Limits,
Interest,
WorkQueue,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
pub enum Storage {
#[default]
File,
Memory,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
pub enum ConsumerAckPolicy {
#[default]
Explicit,
None,
All,
}
fn default_retention() -> Retention {
Retention::Limits
}
fn default_storage() -> Storage {
Storage::File
}
fn default_ack_policy() -> ConsumerAckPolicy {
ConsumerAckPolicy::Explicit
}
impl From<Retention> for RetentionPolicy {
fn from(r: Retention) -> Self {
match r {
Retention::Limits => RetentionPolicy::Limits,
Retention::Interest => RetentionPolicy::Interest,
Retention::WorkQueue => RetentionPolicy::WorkQueue,
}
}
}
impl From<Storage> for StorageType {
fn from(s: Storage) -> Self {
match s {
Storage::File => StorageType::File,
Storage::Memory => StorageType::Memory,
}
}
}
impl From<ConsumerAckPolicy> for AckPolicy {
fn from(a: ConsumerAckPolicy) -> Self {
match a {
ConsumerAckPolicy::Explicit => AckPolicy::Explicit,
ConsumerAckPolicy::None => AckPolicy::None,
ConsumerAckPolicy::All => AckPolicy::All,
}
}
}
/// Load topology from explicit path, then `NATS_STREAMS_CONFIG` env fallback.
/// Returns `None` if neither is available.
pub fn load_topology(explicit_path: Option<&str>) -> Result<Option<TopologyConfig>, NatsError> {
let path = explicit_path
.map(String::from)
.or_else(|| std::env::var("NATS_STREAMS_CONFIG").ok());
let path = match path {
Some(p) => p,
None => return Ok(None),
};
let content = std::fs::read_to_string(Path::new(&path))
.map_err(|e| NatsError::Stream(format!("reading topology config {path}: {e}")))?;
let config: TopologyConfig = serde_json::from_str(&content)
.map_err(|e| NatsError::Stream(format!("parsing topology config {path}: {e}")))?;
Ok(Some(config))
}
// ── Stream/consumer setup (topology-driven) ────────────────────────────
/// Ensures all streams declared in topology exist. Idempotent via `get_or_create_stream`.
pub async fn ensure_streams(
js: &jetstream::Context,
topology: &TopologyConfig,
) -> Result<usize, NatsError> {
let mut count = 0;
for stream_def in &topology.streams {
let max_age = if stream_def.max_age_days > 0 {
Duration::from_secs(stream_def.max_age_days * 86400)
} else {
Duration::ZERO
};
let cfg = StreamConfig {
name: stream_def.name.clone(),
subjects: stream_def.subjects.clone(),
retention: stream_def.retention.into(),
storage: stream_def.storage.into(),
max_age,
max_messages: stream_def.max_messages,
max_bytes: stream_def.max_bytes,
..Default::default()
};
js.get_or_create_stream(cfg).await.map_err(|e| {
NatsError::Stream(format!("failed to ensure stream '{}': {e}", stream_def.name))
})?;
count += 1;
}
Ok(count)
}
/// Creates all consumers declared in topology. Idempotent via `get_or_create_consumer`.
pub async fn ensure_consumers(
js: &jetstream::Context,
topology: &TopologyConfig,
) -> Result<usize, NatsError> {
let mut count = 0;
for consumer_def in &topology.consumers {
let stream = js
.get_stream(&consumer_def.stream)
.await
.map_err(|e| NatsError::Consumer(format!("stream '{}': {e}", consumer_def.stream)))?;
let mut pull_cfg = pull::Config {
durable_name: Some(consumer_def.name.clone()),
ack_policy: consumer_def.ack_policy.into(),
..Default::default()
};
if consumer_def.filter_subjects.len() == 1 {
pull_cfg.filter_subject = consumer_def.filter_subjects[0].clone();
} else if consumer_def.filter_subjects.len() > 1 {
pull_cfg.filter_subjects = consumer_def.filter_subjects.clone();
}
stream
.get_or_create_consumer(&consumer_def.name, pull_cfg)
.await
.map_err(|e| {
NatsError::Consumer(format!(
"consumer '{}' on stream '{}': {e}",
consumer_def.name, consumer_def.stream
))
})?;
count += 1;
}
Ok(count)
}
// ── Notifications (topology-driven) ────────────────────────────────────
pub struct NotificationMsg {
pub stream: String,
pub subject: String,
pub sequence: u64,
pub payload: serde_json::Value,
pub timestamp: String,
}
/// Drains up to `count` pending messages from each consumer declared in topology.
pub async fn fetch_notifications(
js: &jetstream::Context,
topology: &TopologyConfig,
count: usize,
timeout_secs: u64,
) -> Result<Vec<NotificationMsg>, NatsError> {
let mut results = Vec::new();
for consumer_def in &topology.consumers {
let stream = js.get_stream(&consumer_def.stream).await.map_err(|e| {
NatsError::Consumer(format!("stream '{}': {e}", consumer_def.stream))
})?;
let consumer = stream
.get_or_create_consumer(
&consumer_def.name,
pull::Config {
durable_name: Some(consumer_def.name.clone()),
ack_policy: consumer_def.ack_policy.into(),
..Default::default()
},
)
.await
.map_err(|e| {
NatsError::Consumer(format!(
"consumer '{}' on '{}': {e}",
consumer_def.name, consumer_def.stream
))
})?;
let mut batch = consumer
.fetch()
.max_messages(count)
.expires(Duration::from_secs(timeout_secs))
.messages()
.await
.map_err(|e| {
NatsError::Fetch(format!(
"fetch from '{}' consumer '{}': {e}",
consumer_def.stream, consumer_def.name
))
})?;
while let Some(result) = batch.next().await {
let msg = result.map_err(|e| {
NatsError::Fetch(format!(
"message from '{}' consumer '{}': {e}",
consumer_def.stream, consumer_def.name
))
})?;
let (msg_stream, sequence, timestamp, subject, payload) = {
let info = msg
.info()
.map_err(|e| NatsError::Fetch(format!("message info: {e}")))?;
let ts_secs = info.published.unix_timestamp();
let ts_nanos = info.published.nanosecond();
let timestamp = chrono::DateTime::<chrono::Utc>::from_timestamp(ts_secs, ts_nanos)
.map(|dt| dt.to_rfc3339())
.unwrap_or_else(|| "unknown".to_string());
let payload = serde_json::from_slice::<serde_json::Value>(&msg.payload)
.unwrap_or_else(|_| {
serde_json::Value::String(
String::from_utf8_lossy(&msg.payload).into_owned(),
)
});
(
info.stream.to_string(),
info.stream_sequence,
timestamp,
msg.subject.to_string(),
payload,
)
};
msg.ack()
.await
.map_err(|e| NatsError::Fetch(format!("ack: {e}")))?;
results.push(NotificationMsg {
stream: msg_stream,
subject,
sequence,
payload,
timestamp,
});
}
}
Ok(results)
}
// ── Stream status (topology-driven) ────────────────────────────────────
pub struct StreamStatus {
pub name: String,
pub subjects: Vec<String>,
pub retention: String,
pub messages: u64,
pub bytes: u64,
pub consumers: usize,
}
/// Returns live state for all streams declared in topology.
pub async fn get_stream_status(
js: &jetstream::Context,
topology: &TopologyConfig,
) -> Result<Vec<StreamStatus>, NatsError> {
let mut statuses = Vec::new();
for stream_def in &topology.streams {
let mut stream = js.get_stream(&stream_def.name).await.map_err(|e| {
NatsError::Stream(format!("stream '{}': {e}", stream_def.name))
})?;
let info = stream.info().await.map_err(|e| {
NatsError::Stream(format!("info for '{}': {e}", stream_def.name))
})?;
let retention = match info.config.retention {
RetentionPolicy::Limits => "Limits",
RetentionPolicy::Interest => "Interest",
RetentionPolicy::WorkQueue => "WorkQueue",
};
statuses.push(StreamStatus {
name: stream_def.name.clone(),
subjects: info.config.subjects.clone(),
retention: retention.to_string(),
messages: info.state.messages,
bytes: info.state.bytes,
consumers: info.state.consumer_count,
});
}
Ok(statuses)
}
/// Publishes a message to the exact subject provided. Returns (stream, sequence).
pub async fn publish_message(
js: &jetstream::Context,
subject: &str,
payload: Bytes,
) -> Result<(String, u64), NatsError> {
let ack = js
.publish(subject.to_owned(), payload)
.await
.map_err(|e| NatsError::Publish(e.to_string()))?
.await
.map_err(|e| NatsError::Publish(format!("ack: {e}")))?;
Ok((ack.stream, ack.sequence))
}

View File

@ -0,0 +1,110 @@
#!/usr/bin/env nu
# Integration tests for nu_plugin_nats.
# Requires: NATS server running on $NATS_SERVER (default nats://127.0.0.1:4222).
# Run: nu tests/integration.nu
def pass [label: string] {
print $"✓ ($label)"
}
def fail [label: string, detail: string] {
print $"✗ ($label): ($detail)"
exit 1
}
def assert_eq [label: string, got: any, expected: any] {
if $got == $expected {
pass $label
} else {
fail $label $"expected ($expected | to nuon), got ($got | to nuon)"
}
}
def assert_gte [label: string, got: int, min: int] {
if $got >= $min {
pass $label
} else {
fail $label $"expected >= ($min), got ($got)"
}
}
def assert_not_empty [label: string, val: list] {
if ($val | length) > 0 {
pass $label
} else {
fail $label "expected non-empty list, got empty"
}
}
print "── nats stream setup ──────────────────────────────"
# Plugin commands propagate errors as Nushell errors — if this fails the test aborts.
nats stream setup
pass "nats stream setup completed without error"
print "── nats consumer setup ────────────────────────────"
nats consumer setup
pass "nats consumer setup completed without error"
print "── nats status ────────────────────────────────────"
let status = nats status
assert_eq "status returns 6 streams" ($status | length) 6
let stream_names = $status | get stream
for name in ["TASKS", "VAULT", "AUTH", "WORKSPACE", "AUDIT", "HEALTH"] {
if not ($stream_names | any { |s| $s == $name }) {
fail $"status contains ($name)" "stream missing from output"
}
pass $"stream ($name) present"
}
let workspace_row = $status | where stream == "WORKSPACE" | first
assert_gte "WORKSPACE has cli-notifications consumer" $workspace_row.consumers 1
let audit_row = $status | where stream == "AUDIT" | first
assert_gte "AUDIT has cli-notifications consumer" $audit_row.consumers 1
print "── nats pub ───────────────────────────────────────"
let pub_ws = (
{test_id: "integration-1", status: "ok", ts: (date now | format date "%+")}
| nats pub "workspace.test.event"
)
assert_eq "pub WORKSPACE stream name" $pub_ws.stream "WORKSPACE"
assert_gte "pub WORKSPACE sequence > 0" $pub_ws.sequence 1
let pub_audit = (
{test_id: "integration-1", action: "test_run"}
| nats pub "audit.test.event"
)
assert_eq "pub AUDIT stream name" $pub_audit.stream "AUDIT"
assert_gte "pub AUDIT sequence > 0" $pub_audit.sequence 1
# provisioning. prefix applied automatically
assert_eq "subject has provisioning. prefix" ($pub_ws.subject | str starts-with "provisioning.") true
# Full subject passed in does not get double-prefixed
let pub_full = ({v: 1} | nats pub "provisioning.workspace.test.explicit")
assert_eq "full subject not double-prefixed" $pub_full.subject "provisioning.workspace.test.explicit"
print "── nats notify ────────────────────────────────────"
let msgs = nats notify --timeout 3
assert_not_empty "notify drains published messages" $msgs
let ws_msg = $msgs | where stream == "WORKSPACE" | first
assert_eq "WORKSPACE msg subject contains workspace.test" ($ws_msg.subject | str contains "workspace.test") true
assert_eq "WORKSPACE payload is record" ($ws_msg.payload | describe | str starts-with "record") true
assert_eq "WORKSPACE payload.test_id" $ws_msg.payload.test_id "integration-1"
let audit_msg = $msgs | where stream == "AUDIT" | first
assert_eq "AUDIT msg stream field" $audit_msg.stream "AUDIT"
assert_eq "sequence type is int" ($ws_msg.sequence | describe) "int"
assert_eq "timestamp is a string" ($ws_msg.timestamp | describe) "string"
assert_eq "timestamp is not unknown" ($ws_msg.timestamp != "unknown") true
print ""
print "── all tests passed ───────────────────────────────"