nushell-plugins/nu_plugin_nats
Jesús Pérez b6eeaee4da
Some checks failed
Build and Test / Validate Setup (push) Has been cancelled
Build and Test / Build (darwin-amd64) (push) Has been cancelled
Build and Test / Build (darwin-arm64) (push) Has been cancelled
Build and Test / Build (linux-amd64) (push) Has been cancelled
Build and Test / Build (windows-amd64) (push) Has been cancelled
Build and Test / Build (linux-arm64) (push) Has been cancelled
Build and Test / Security Audit (push) Has been cancelled
Build and Test / Package Results (push) Has been cancelled
Build and Test / Quality Gate (push) Has been cancelled
feat: config-driven topology — replace hardcoded provisioning streams with JSON config
All commands now read stream/consumer definitions from a topology JSON file
  (--config flag or NATS_STREAMS_CONFIG env). nats pub publishes to exact
  subjects without auto-prefixing. Category changed from provisioning to nats
2026-03-11 03:17:08 +00:00
..

nu_plugin_nats

Nushell plugin for NATS JetStream operations on the provisioning platform event bus.

Bundles async-nats directly — no external nats CLI required.

Commands

Command Input Output Description
nats stream setup nothing nothing Create the 6 platform streams (idempotent)
nats consumer setup nothing nothing Create cli-notifications consumers on WORKSPACE + AUDIT (idempotent)
nats notify nothing list<record> Drain pending notifications from WORKSPACE and AUDIT
nats pub <subject> any record Publish a JSON event to a platform subject
nats status nothing list<record> Live state of all 6 streams

Installation

cd plugins/nushell-plugins
just install-plugin nu_plugin_nats

Configuration

Variable Default Description
NATS_SERVER nats://127.0.0.1:4222 NATS server URL

Quick Start

Bootstrap streams and consumers on first use:

nats stream setup
nats consumer setup
nats status

Publish an event from pipeline:

{workspace_id: "ws-1", action: "deploy", status: "started"}
    | nats pub "workspace.deploy.started"
# => {subject: "provisioning.workspace.deploy.started", stream: "WORKSPACE", sequence: 42}

Publish with raw JSON payload:

nats pub "audit.login" --payload '{"user":"admin","ip":"10.0.0.1"}'

Drain pending notifications:

nats notify
nats notify --count 10 --timeout 2

Filter by stream:

nats notify | where stream == "AUDIT"

Extract payload fields:

nats notify | get payload | where workspace_id == "ws-1"

Monitor stream health:

nats status | select stream messages consumers
nats status | where messages > 0

Subjects

The plugin prefixes provisioning. automatically. Pass only the suffix:

"workspace.deploy.done"  →  provisioning.workspace.deploy.done  →  WORKSPACE stream
"audit.login.failed"     →  provisioning.audit.login.failed     →  AUDIT stream
"tasks.job.created"      →  provisioning.tasks.job.created      →  TASKS stream

Passing a subject that already starts with provisioning. skips the prefix.

Streams

Stream Subjects Retention Max Age
TASKS provisioning.tasks.> WorkQueue
VAULT provisioning.vault.> Interest
AUTH provisioning.auth.> Interest
WORKSPACE provisioning.workspace.> Limits 7 days
AUDIT provisioning.audit.> Limits 90 days
HEALTH provisioning.health.> Interest

cli-notifications consumers are created on WORKSPACE and AUDIT only. These are the streams with Limits retention — they retain messages independently of active subscribers.

Testing

Requires NATS running on $NATS_SERVER:

nu tests/integration.nu

Source

src/
├── main.rs      # Plugin struct + 5 command impls + JSON↔Value converters
├── client.rs    # NatsError + nats_connect()
└── streams.rs   # ensure_streams, ensure_consumers, fetch_notifications,
                 # get_stream_status, publish_message