413 lines
13 KiB
Plaintext
Raw Normal View History

2026-03-13 00:21:04 +00:00
#!/usr/bin/env nu
# nats.nu - Bidirectional NATS event system for reflection operations.
#
# Production-ready implementation with:
# - Payload validation against Nickel contracts
# - Robust event parsing from nats notify
# - Real handler loading + execution
# - Error handling + logging
# - Environment variable management
use ./store.nu *
# -- Configuration ────────────────────────────────────────────────────────────
# Load the project configuration from `<root>/.ontoref/config.ncl`.
#
# The root is $env.ONTOREF_PROJECT_ROOT when set, otherwise $env.ONTOREF_ROOT.
# The file is evaluated with `nickel export` and its JSON output is returned
# as a record.
#
# Returns a built-in fallback (events disabled, localhost URL, default
# handlers dir) when no root is configured, the file does not exist,
# `nickel` cannot be run or exits non-zero, or its output is not valid JSON.
def get-nats-config []: nothing -> record {
    let fallback = {
        nats_events: {
            enabled: false,
            url: "nats://localhost:4222",
            handlers_dir: "reflection/handlers"
        }
    }

    # Both lookups are optional: `default $env.ONTOREF_ROOT` would be evaluated
    # eagerly and fail when only ONTOREF_PROJECT_ROOT is defined.
    let root = ($env.ONTOREF_PROJECT_ROOT? | default $env.ONTOREF_ROOT?)
    if ($root | is-empty) {
        return $fallback
    }

    let config_path = $root + "/.ontoref/config.ncl"
    if not ($config_path | path exists) {
        return $fallback
    }

    # `try` covers the case where the nickel binary is not installed at all.
    let result = (try { do { ^nickel export $config_path } | complete } catch { null })
    if $result == null {
        return $fallback
    }
    if $result.exit_code != 0 {
        return $fallback
    }

    try { $result.stdout | from json } catch { $fallback }
}
# Load the subject and payload definitions from `<root>/nats/subjects.ncl`.
#
# The file is evaluated with `nickel export`, with NICKEL_IMPORT_PATH set to
# the project root plus a schemas directory so imports can be resolved.
#
# Returns `{ subjects: {}, payloads: {} }` when no root is configured, the
# file does not exist, `nickel` cannot be run or exits non-zero, or its
# output is not valid JSON.
def get-subjects []: nothing -> record {
    let fallback = { subjects: {}, payloads: {} }

    # Both lookups are optional: `default $env.ONTOREF_ROOT` would be evaluated
    # eagerly and fail when only ONTOREF_PROJECT_ROOT is defined.
    let root = ($env.ONTOREF_PROJECT_ROOT? | default $env.ONTOREF_ROOT?)
    if ($root | is-empty) {
        return $fallback
    }

    let subjects_path = $root + "/nats/subjects.ncl"
    # NOTE(review): the second entry is a machine-specific absolute path;
    # consider moving it into configuration.
    let nickel_path = $root + ":/Users/Akasha/Tools/dev-system/ci/schemas"
    if not ($subjects_path | path exists) {
        return $fallback
    }

    # `with-env` is required here: a bare `env NAME=value` line runs the
    # external `env` program and does not affect the following command.
    let result = (try {
        do {
            with-env { NICKEL_IMPORT_PATH: $nickel_path } {
                ^nickel export $subjects_path
            }
        } | complete
    } catch { null })
    if $result == null {
        return $fallback
    }
    if $result.exit_code != 0 {
        return $fallback
    }

    try { $result.stdout | from json } catch { $fallback }
}
# Translate an event name into the name of its payload contract.
#
# Returns an empty string for events that have no known contract.
# Note that "mode.failed" maps to the same contract as "mode.completed".
def event-to-contract-name [event: string]: nothing -> string {
    let contracts = [
        [event contract];
        ["mode.started" "NushellModeStarted"]
        ["mode.completed" "NushellModeCompleted"]
        ["mode.failed" "NushellModeCompleted"]
        ["sync.completed" "NushellSyncCompleted"]
        ["config.changed" "NushellConfigChanged"]
        ["reload" "NushellReload"]
    ]

    let hits = ($contracts | where event == $event)
    if ($hits | is-empty) {
        ""
    } else {
        $hits | first | get contract
    }
}
# -- Availability Check ───────────────────────────────────────────────────────
# Report whether NATS events can be used.
#
# Resolution order:
#   1. $env.NATS_AVAILABLE, when already set, is converted to a bool and returned.
#   2. `nats_events.enabled` from the project config must be true.
#   3. The `nats` binary must be on the PATH and `nats status` must exit 0.
#
# The outcome is stored in $env.NATS_AVAILABLE as "true"/"false". The command
# is declared with `--env` so that this assignment is visible to the caller;
# in a plain `def` the assignment is discarded when the command returns.
export def --env "nats-available" []: nothing -> bool {
    if ($env.NATS_AVAILABLE? | is-not-empty) {
        return ($env.NATS_AVAILABLE | into bool)
    }

    let config = (get-nats-config)
    let enabled = ($config.nats_events?.enabled? | default false)
    if not $enabled {
        $env.NATS_AVAILABLE = "false"
        return false
    }

    # Without this guard a missing binary raises "command not found" instead
    # of simply reporting NATS as unavailable.
    if (which nats | is-empty) {
        $env.NATS_AVAILABLE = "false"
        return false
    }

    # Probe the configured server. NATS_URL is the variable documented for the
    # NATS CLI; NATS_SERVER is exported as well because the rest of this
    # module uses that name.
    # NOTE(review): confirm the installed `nats` CLI provides a `status`
    # subcommand; otherwise this probe always fails.
    let nats_url = ($config.nats_events?.url? | default "nats://localhost:4222")
    let result = (do {
        with-env { NATS_SERVER: $nats_url, NATS_URL: $nats_url } {
            ^nats status
        }
    } | complete)
    let available = ($result.exit_code == 0)
    $env.NATS_AVAILABLE = ($available | into string)
    $available
}
# Resolve an event name to the NATS subject configured for it.
#
# Subjects are read from the `subjects` record returned by `get-subjects`.
# Returns an empty string when the event is unknown or its subject is not
# defined there.
def nats-subject [event: string]: nothing -> string {
    let table = (get-subjects).subjects

    match $event {
        "mode.started" => ($table.nushell_mode_started? | default "")
        "mode.completed" => ($table.nushell_mode_completed? | default "")
        "mode.failed" => ($table.nushell_mode_failed? | default "")
        "sync.completed" => ($table.nushell_sync_completed? | default "")
        "config.changed" => ($table.nushell_config_changed? | default "")
        "reload" => ($table.nushell_reload? | default "")
        "reflection.request" => ($table.reflection_request? | default "")
        # "ontology.changed" is looked up under the `ontology_validated` key.
        "ontology.changed" => ($table.ontology_validated? | default "")
        _ => ""
    }
}
# Validate that a payload carries the fields required for its event.
#
# Parameters:
#   event   - event name, e.g. "mode.started"
#   payload - record about to be published
#
# Returns the payload unchanged when every required field is present and
# non-empty. Raises `Invalid <Contract>: missing <field>` for the first
# required field that is absent or empty. Events without an entry in the
# table below are accepted as-is.
def validate-payload [event: string, payload: record]: nothing -> record {
    let contract_name = (event-to-contract-name $event)

    # Fields are checked in the listed order, so the first missing one is
    # the one reported.
    # NOTE(review): the earlier comments also listed nodes_ok, nodes_warn and
    # nodes_err for "sync.completed" and project for "reload" as required, but
    # they were never checked; confirm against the contracts before enforcing.
    let required = match $event {
        "mode.started" => [mode_id project actor timestamp]
        "mode.completed" => [mode_id project status steps_run timestamp]
        "sync.completed" => [project timestamp]
        "reload" => [reason]
        _ => []
    }

    let present = ($payload | columns)
    for field in $required {
        let value = if $field in $present { $payload | get $field } else { null }
        if ($value | is-empty) {
            error make { msg: $"Invalid ($contract_name): missing ($field)" }
        }
    }

    $payload
}
# -- Emit ─────────────────────────────────────────────────────────────────────
# Publish the piped-in record as a NATS event.
#
# Input:  the payload record (pipeline input)
# Param:  event - event name such as "mode.started"
#
# Does nothing when NATS is unavailable. Prints a message and returns without
# publishing when the event has no subject, is not listed in
# `nats_events.emit`, or fails payload validation. Otherwise the payload is
# serialised to JSON and piped to `nats pub <subject>`.
export def "nats-emit" [event: string]: record -> nothing {
    # Capture the pipeline input before anything else runs.
    let payload = $in

    if not (nats-available) { return }

    let subject = (nats-subject $event)
    if ($subject | is-empty) {
        print $" (ansi red)✗(ansi reset) Unknown event: ($event)"
        return
    }

    let config = (get-nats-config)
    let emit_list = ($config.nats_events.emit? | default [])
    if not ($event in $emit_list) {
        print $" (ansi dark_gray)[nats-emit] ($event) not in emit list(ansi reset)"
        return
    }

    # validate-payload is an internal command that signals failure with
    # `error make`; `complete` only works on external commands, so the error
    # is caught with try/catch instead.
    let validation_error = (try {
        validate-payload $event $payload | ignore
        null
    } catch { |err|
        $err.msg? | default "unknown error"
    })
    if $validation_error != null {
        print $" (ansi red)✗(ansi reset) Validation failed: ($validation_error)"
        return
    }

    # NATS_URL is the variable documented for the NATS CLI; NATS_SERVER is
    # kept so any wrapper relying on the previous name still sees it.
    let nats_url = ($config.nats_events.url? | default "nats://localhost:4222")
    let publish_result = (do {
        with-env { NATS_SERVER: $nats_url, NATS_URL: $nats_url } {
            $payload | to json | ^nats pub $subject
        }
    } | complete)

    if $publish_result.exit_code == 0 {
        print $" (ansi green)✓(ansi reset) Event published: ($event) → ($subject)"
    } else {
        # stderr is an empty string rather than null when nothing was written.
        let stderr_text = ($publish_result.stderr? | default "" | str trim)
        let err = if ($stderr_text | is-empty) { "unknown error" } else { $stderr_text }
        print $" (ansi red)✗(ansi reset) Publish failed: ($err)"
    }
}
# -- Listen ───────────────────────────────────────────────────────────────────
# Poll NATS for events and dispatch each one to its handler.
#
# Flag: --interval - seconds passed to `nats notify --timeout` and slept
#                    between polls (default 2)
#
# Returns immediately when NATS is unavailable. Otherwise loops forever:
# fetch a batch, parse it with `parse-nats-events`, and call `dispatch-event`
# per event. A failing handler or a failing poll is reported and the loop
# continues.
export def "nats-listen" [--interval: int = 2]: nothing -> nothing {
    if not (nats-available) {
        print " [nats-listen] NATS unavailable, skipping"
        return
    }

    let config = (get-nats-config)
    let nats_url = ($config.nats_events.url? | default "nats://localhost:4222")
    let subscribe_list = ($config.nats_events.subscribe? | default [] | str join ", ")

    # Literal parentheses must be escaped inside $"..." or they are evaluated
    # as an expression.
    print $" (ansi cyan)→(ansi reset) Listening for NATS events \(interval: ($interval)s\)"
    print $" (ansi dark_gray)Subscribe to: ($subscribe_list)(ansi reset)"
    print ""

    # Loop-invariant; built arithmetically because Nushell's seconds unit is
    # written `sec`, not `s`.
    let pause = ($interval * 1sec)

    loop {
        # NOTE(review): confirm the installed `nats` CLI provides `notify`
        # with these flags.
        let events_result = (do {
            with-env { NATS_SERVER: $nats_url, NATS_URL: $nats_url } {
                ^nats notify --count 50 --timeout $interval
            }
        } | complete)

        if $events_result.exit_code == 0 {
            if ($events_result.stdout | is-not-empty) {
                for event in (parse-nats-events $events_result.stdout) {
                    # dispatch-event is internal and raises on failure;
                    # try/catch keeps one bad handler from stopping the loop.
                    try {
                        dispatch-event $event.subject $event.payload
                    } catch { |err|
                        let reason = ($err.msg? | default "unknown error")
                        print $" (ansi red)[dispatch-error](ansi reset) ($event.subject): ($reason)"
                    }
                }
            }
        } else {
            # stderr is an empty string rather than null when nothing was written.
            let stderr_text = ($events_result.stderr? | default "" | str trim)
            let reason = if ($stderr_text | is-empty) {
                "timeout or connection error"
            } else {
                $stderr_text
            }
            print $" (ansi yellow)[listen-warn](ansi reset) ($reason) - retrying..."
        }

        sleep $pause
    }
}
# Parse the text output of `nats notify` into a list of event records.
#
# Expected line layout (whitespace separated):
#   <stream> <subject> <sequence> <timestamp> <payload...>
#
# A first line starting with "Stream" is treated as a header and skipped.
# Lines with fewer than three fields are dropped. `timestamp` is null when
# the line has only three fields. The payload is everything after the fourth
# field, decoded as JSON; it becomes `{}` when absent, not valid JSON, or not
# a JSON object.
#
# Returns a list of { stream, subject, sequence, timestamp, payload }.
def parse-nats-events [output: string]: nothing -> list {
    let rows = (
        $output
        | lines
        | each { |row| $row | str trim }
        | where { |row| $row | is-not-empty }
    )
    if ($rows | is-empty) { return [] }

    let data_rows = if ($rows | first | str starts-with "Stream") {
        $rows | skip 1
    } else {
        $rows
    }

    $data_rows | each { |row|
        # Split into at most five pieces so the payload keeps its own
        # internal whitespace instead of being split and re-joined.
        let parts = ($row | split row --regex '\s+' --number 5)

        if ($parts | length) < 3 {
            null
        } else {
            let payload_str = ($parts.4? | default "" | str trim)
            let payload = if ($payload_str | is-empty) {
                {}
            } else {
                let decoded = (try { $payload_str | from json } catch { null })
                # dispatch-event requires a record; scalars and lists are discarded.
                if ($decoded | describe | str starts-with "record") { $decoded } else { {} }
            }

            {
                stream: $parts.0,
                subject: $parts.1,
                sequence: $parts.2,
                timestamp: $parts.3?,
                payload: $payload,
            }
        }
    } | where { |e| $e != null }
}
# Run the handler script registered for a NATS subject.
#
# Parameters:
#   subject - NATS subject the event arrived on
#   payload - decoded event payload
#
# The handler is `<root>/<handlers_dir>/<file>.nu`, where the root is
# $env.ONTOREF_PROJECT_ROOT (or $env.ONTOREF_ROOT) and `handlers_dir` comes
# from the project config. It is started as a separate `nu` process and
# receives the payload as JSON in the NATS_PAYLOAD environment variable.
#
# Prints a notice and returns when the subject has no handler. Raises when no
# project root is configured, the handler file is missing, or the handler
# exits non-zero.
def dispatch-event [subject: string, payload: record]: nothing -> nothing {
    let handler_file = match $subject {
        "ecosystem.reflection.request" => "reflection-request.nu"
        "ecosystem.ontology.changed" => "ontology-changed.nu"
        "ecosystem.reflection.nushell.reload" => "reload.nu"
        _ => ""
    }

    if ($handler_file | is-empty) {
        print $" (ansi yellow)[dispatch](ansi reset) No handler for ($subject)"
        return
    }

    let config = (get-nats-config)
    let handlers_dir = ($config.nats_events.handlers_dir? | default "reflection/handlers")

    # Both lookups are optional: `default $env.ONTOREF_ROOT` would be evaluated
    # eagerly and fail when only ONTOREF_PROJECT_ROOT is defined.
    let root = ($env.ONTOREF_PROJECT_ROOT? | default $env.ONTOREF_ROOT?)
    if ($root | is-empty) {
        error make { msg: "Neither ONTOREF_PROJECT_ROOT nor ONTOREF_ROOT is set" }
    }

    let handler_path = $"($root)/($handlers_dir)/($handler_file)"
    if not ($handler_path | path exists) {
        error make { msg: $"Handler not found: ($handler_path)" }
    }

    # `with-env` is required here: a bare `env NAME=value` line runs the
    # external `env` program and does not pass the variable to the handler.
    let payload_json = ($payload | to json)
    let exec_result = (do {
        with-env { NATS_PAYLOAD: $payload_json } {
            ^nu $handler_path
        }
    } | complete)

    if $exec_result.exit_code == 0 {
        print $" (ansi green)✓(ansi reset) Handler executed: ($subject)"
    } else {
        # stderr is an empty string rather than null when nothing was written.
        let stderr_text = ($exec_result.stderr? | default "" | str trim)
        let err = if ($stderr_text | is-empty) { "unknown error" } else { $stderr_text }
        error make { msg: $"Handler failed for ($subject): ($err)" }
    }
}
# -- Status ───────────────────────────────────────────────────────────────────
# Print a summary of the NATS event configuration and server status.
#
# Prints " [NATS] unavailable" and returns when `nats-available` is false.
# Otherwise shows the configured server URL, the enabled flag, and the number
# of emit and subscribe entries, followed by the output of `nats status` or
# an error line when that command exits non-zero.
export def "nats-status" []: nothing -> nothing {
    if not (nats-available) {
        print " [NATS] unavailable"
        return
    }

    let config = (get-nats-config)
    let events_cfg = $config.nats_events
    let nats_server = ($events_cfg.url? | default "nats://localhost:4222")
    let enabled = ($events_cfg.enabled? | default false)
    let emit_count = ($events_cfg.emit? | default [] | length)
    let subscribe_count = ($events_cfg.subscribe? | default [] | length)

    # Values must be wrapped in (...) to be interpolated; a bare `$name`
    # inside $"..." is printed literally.
    print ""
    print $" (ansi cyan)NATS Event System(ansi reset)"
    print $" (ansi dark_gray)────────────────────────────────────────(ansi reset)"
    print $" Server: ($nats_server)"
    print $" Enabled: ($enabled)"
    print $" Emit events: ($emit_count)"
    print $" Subscriptions: ($subscribe_count)"

    # NATS_URL is the variable documented for the NATS CLI; NATS_SERVER is
    # kept so any wrapper relying on the previous name still sees it.
    # NOTE(review): confirm the installed `nats` CLI provides a `status`
    # subcommand.
    let status_result = (do {
        with-env { NATS_SERVER: $nats_server, NATS_URL: $nats_server } {
            ^nats status
        }
    } | complete)

    print ""
    if $status_result.exit_code == 0 {
        print $status_result.stdout
    } else {
        print $" (ansi red)✗(ansi reset) Could not reach NATS server"
    }
    print ""
}