#!/usr/bin/env nu
# nats.nu - Bidirectional NATS event system for reflection operations.
#
# Production-ready implementation with:
# - Payload validation against Nickel contracts
# - Robust event parsing from nats notify
# - Real handler loading + execution
# - Error handling + logging
# - Environment variable management

use ./store.nu *

# -- Configuration ────────────────────────────────────────────────────────────

# Fallback `nats_events` settings, used whenever the project configuration is
# missing, cannot be exported, or does not define the section.
def nats-default-events []: nothing -> record {
    {
        enabled: false,
        url: "nats://localhost:4222",
        handlers_dir: "reflection/handlers",
    }
}

# Resolve the project root directory.
#
# ONTOREF_PROJECT_ROOT takes precedence; ONTOREF_ROOT is the fallback. Both are
# read optionally so that a missing ONTOREF_ROOT does not fail when
# ONTOREF_PROJECT_ROOT is set. Raises an error when neither is defined.
def nats-project-root []: nothing -> string {
    let root = ($env.ONTOREF_PROJECT_ROOT? | default $env.ONTOREF_ROOT?)
    if $root == null {
        error make { msg: "Neither ONTOREF_PROJECT_ROOT nor ONTOREF_ROOT is set" }
    }
    $root
}

# Load the project configuration from `<root>/.ontoref/config.ncl`.
#
# Returns the exported config as a record. The result always carries a
# `nats_events` key: when the file is absent, `nickel export` fails, or the
# output is not a JSON object, the defaults from `nats-default-events` are used.
def get-nats-config []: nothing -> record {
    let fallback = { nats_events: (nats-default-events) }
    let config_path = ([(nats-project-root) ".ontoref" "config.ncl"] | path join)
    if not ($config_path | path exists) {
        return $fallback
    }

    # `try` covers the case where the nickel binary itself cannot be started.
    let result = (try {
        do { ^nickel export $config_path } | complete
    } catch {
        { exit_code: 1, stdout: "" }
    })
    if $result.exit_code != 0 {
        return $fallback
    }

    let parsed = (try { $result.stdout | from json } catch { null })
    if ($parsed | describe | str starts-with "record") {
        # Shallow merge: a `nats_events` section in the file replaces the default one.
        $fallback | merge $parsed
    } else {
        $fallback
    }
}

# Load the subject/payload definitions from `<root>/nats/subjects.ncl`.
#
# Returns the exported record, or `{ subjects: {}, payloads: {} }` when the
# file is absent or cannot be exported.
def get-subjects []: nothing -> record {
    let empty = { subjects: {}, payloads: {} }
    let root = (nats-project-root)
    let subjects_path = ([$root "nats" "subjects.ncl"] | path join)
    # NOTE(review): the second import path entry is machine-specific - confirm
    # whether it should come from configuration instead.
    let nickel_path = $root + ":/Users/Akasha/Tools/dev-system/ci/schemas"
    if not ($subjects_path | path exists) {
        return $empty
    }

    # `with-env` is required here: a bare `env VAR=$value cmd` does not
    # interpolate `$value` in Nushell.
    let result = (try {
        do {
            with-env { NICKEL_IMPORT_PATH: $nickel_path } {
                ^nickel export $subjects_path
            }
        } | complete
    } catch {
        { exit_code: 1, stdout: "" }
    })
    if $result.exit_code != 0 {
        return $empty
    }

    let parsed = (try { $result.stdout | from json } catch { null })
    if ($parsed | describe | str starts-with "record") {
        $parsed
    } else {
        $empty
    }
}

# Map an event name to the name of its payload contract.
# Returns an empty string for events without a contract.
def event-to-contract-name [event: string]: nothing -> string {
    match $event {
        "mode.started" => "NushellModeStarted",
        "mode.completed" => "NushellModeCompleted",
        "mode.failed" => "NushellModeCompleted",
        "sync.completed" => "NushellSyncCompleted",
        "config.changed" => "NushellConfigChanged",
        "reload" => "NushellReload",
        _ => ""
    }
}

# -- Availability Check ───────────────────────────────────────────────────────

# Report whether NATS events are enabled and the server answers.
#
# The answer is cached in `$env.NATS_AVAILABLE`; the command is declared with
# `--env` so that the cached value is visible to the caller's scope.
export def --env "nats-available" []: nothing -> bool {
    if ($env.NATS_AVAILABLE? | is-not-empty) {
        return ($env.NATS_AVAILABLE | into bool)
    }

    let config = (get-nats-config)
    let enabled = ($config.nats_events.enabled? | default false)
    let available = if not $enabled {
        false
    } else {
        let nats_url = ($config.nats_events.url? | default "nats://localhost:4222")
        # A CLI that cannot be started counts as "unavailable", not as a failure.
        let exit_code = (try {
            do {
                with-env { NATS_SERVER: $nats_url } { ^nats status }
            } | complete | get exit_code
        } catch {
            1
        })
        $exit_code == 0
    }

    $env.NATS_AVAILABLE = ($available | into string)
    $available
}

# Resolve an event name to its NATS subject using the subjects definition.
# Returns an empty string for unknown events or unmapped subjects.
def nats-subject [event: string]: nothing -> string {
    let subjects = (get-subjects)
    let mapping = ($subjects.subjects? | default {})
    let key = match $event {
        "mode.started" => "nushell_mode_started",
        "mode.completed" => "nushell_mode_completed",
        "mode.failed" => "nushell_mode_failed",
        "sync.completed" => "nushell_sync_completed",
        "config.changed" => "nushell_config_changed",
        "reload" => "nushell_reload",
        "reflection.request" => "reflection_request",
        # NOTE(review): "ontology.changed" resolves through `ontology_validated` - confirm intended.
        "ontology.changed" => "ontology_validated",
        _ => ""
    }

    if ($key | is-not-empty) and ($key in ($mapping | columns)) {
        $mapping | get $key | default ""
    } else {
        ""
    }
}

# Validate payload against schema field requirements.
#
# Raises `Invalid <Contract>: missing <field>` for the first required field
# that is absent or empty; returns the payload unchanged otherwise. Events
# without a requirement list are accepted as-is.
def validate-payload [event: string, payload: record]: nothing -> record {
    let contract_name = (event-to-contract-name $event)
    let required = match $event {
        "mode.started" => ["mode_id" "project" "actor" "timestamp"],
        "mode.completed" => ["mode_id" "project" "status" "steps_run" "timestamp"],
        "sync.completed" => ["project" "nodes_ok" "nodes_warn" "nodes_err" "timestamp"],
        "reload" => ["reason" "project"],
        _ => []
    }

    let present = ($payload | columns)
    for field in $required {
        let missing = if $field in $present {
            $payload | get $field | is-empty
        } else {
            true
        }
        if $missing {
            error make { msg: $"Invalid ($contract_name): missing ($field)" }
        }
    }

    $payload
}

# -- Emit ─────────────────────────────────────────────────────────────────────

# Publish the piped-in payload record as `event`.
#
# Nothing is published when NATS is unavailable, the event has no subject, the
# event is not in the configured emit list, or the payload fails validation;
# each of the last three cases prints a diagnostic line.
export def "nats-emit" [event: string]: record -> nothing {
    # Pipeline input must be captured before any other statement runs.
    let payload = $in

    if not (nats-available) {
        return
    }

    let subject = (nats-subject $event)
    if ($subject | is-empty) {
        let cross = (ansi red) + "✗" + (ansi reset)
        print $" ($cross) Unknown event: ($event)"
        return
    }

    let config = (get-nats-config)
    let emit_list = ($config.nats_events.emit? | default [])
    if not ($event in $emit_list) {
        print $" (ansi dark_gray)[nats-emit] ($event) not in emit list(ansi reset)"
        return
    }

    # Validate payload. `try` (not `do | complete`) is what catches errors
    # raised by internal commands; the result is null when validation passes.
    let validation_error = (try {
        validate-payload $event $payload | ignore
    } catch { |err|
        $err.msg? | default "unknown error"
    })
    if $validation_error != null {
        print $" (ansi red)✗(ansi reset) Validation failed: ($validation_error)"
        return
    }

    # Publish to NATS
    let nats_url = ($config.nats_events.url? | default "nats://localhost:4222")
    let publish_result = (do {
        with-env { NATS_SERVER: $nats_url } {
            $payload | to json | ^nats pub $subject
        }
    } | complete)

    if $publish_result.exit_code == 0 {
        let check = (ansi green) + "✓" + (ansi reset)
        print $" ($check) Event published: ($event) → ($subject)"
    } else {
        let err = ($publish_result.stderr? | default "unknown error")
        let cross = (ansi red) + "✗" + (ansi reset)
        print $" ($cross) Publish failed: ($err)"
    }
}

# -- Listen ───────────────────────────────────────────────────────────────────

# Poll for NATS events forever and dispatch each one to its handler.
#
# --interval: seconds passed to `nats notify --timeout` and slept between polls.
# Handler failures are reported and do not stop the loop.
export def "nats-listen" [--interval: int = 2]: nothing -> nothing {
    if not (nats-available) {
        print " [nats-listen] NATS unavailable, skipping"
        return
    }

    let config = (get-nats-config)
    let nats_url = ($config.nats_events.url? | default "nats://localhost:4222")
    let subscribe_list = ($config.nats_events.subscribe? | default [] | str join ", ")
    let cyan = (ansi cyan)
    let gray = (ansi dark_gray)
    let yellow = (ansi yellow)
    let red = (ansi red)
    let reset = (ansi reset)

    print $" ($cyan)→($reset) Listening for NATS events \(interval: ($interval)s\)"
    print $" ($gray)Subscribe to: ($subscribe_list)($reset)"
    print ""

    # Main loop
    loop {
        let events_result = (do {
            with-env { NATS_SERVER: $nats_url } {
                ^nats notify --count 50 --timeout $interval
            }
        } | complete)

        if $events_result.exit_code == 0 and ($events_result.stdout | is-not-empty) {
            # Parse nats notify output
            # Format: [stream] [subject] [sequence] [timestamp] [payload]
            let parsed = (parse-nats-events $events_result.stdout)
            for event in $parsed {
                try {
                    dispatch-event $event.subject $event.payload
                } catch { |err|
                    let detail = ($err.msg? | default "unknown error")
                    print $" ($red)[dispatch-error]($reset) ($event.subject): ($detail)"
                }
            }
        } else if $events_result.exit_code != 0 {
            let err = ($events_result.stderr? | default "timeout or connection error")
            print $" ($yellow)[listen-warn]($reset) ($err) - retrying..."
        }

        # Nushell's unit for seconds is `sec`; build the duration arithmetically.
        sleep ($interval * 1sec)
    }
}

# Parse `nats notify` output into a list of event records.
#
# Each data line is read as `stream subject sequence [timestamp] [payload]`.
# A leading line starting with "Stream" is treated as a header and skipped;
# lines with fewer than three fields are dropped. The payload is decoded as
# JSON, falling back to an empty record when absent or undecodable.
def parse-nats-events [output: string]: nothing -> list {
    let rows = ($output | lines | str trim | where { |line| $line | is-not-empty })
    if ($rows | is-empty) {
        return []
    }

    # Skip header line if present
    let data_rows = if ($rows | first | str starts-with "Stream") {
        $rows | skip 1
    } else {
        $rows
    }

    $data_rows | each { |line|
        # Split into at most five fields so whitespace inside the payload is preserved.
        let parts = ($line | split row --regex '\s+' --number 5)
        if ($parts | length) < 3 {
            null
        } else {
            let payload_str = ($parts.4? | default "" | str trim)
            let payload = if ($payload_str | is-empty) {
                {}
            } else {
                try { $payload_str | from json } catch { {} }
            }

            {
                stream: $parts.0,
                subject: $parts.1,
                sequence: $parts.2,
                timestamp: $parts.3?,
                payload: $payload,
            }
        }
    } | compact
}

# Run the handler script registered for `subject`.
#
# The payload is handed to the handler as JSON in the NATS_PAYLOAD environment
# variable. Subjects without a handler mapping are reported and ignored.
# Raises when the handler file does not exist or exits non-zero.
def dispatch-event [subject: string, payload: record]: nothing -> nothing {
    let config = (get-nats-config)
    let handlers_dir = ($config.nats_events.handlers_dir? | default "reflection/handlers")
    let root = (nats-project-root)

    # Map subject to handler file
    let handler_file = match $subject {
        "ecosystem.reflection.request" => "reflection-request.nu",
        "ecosystem.ontology.changed" => "ontology-changed.nu",
        "ecosystem.reflection.nushell.reload" => "reload.nu",
        _ => ""
    }

    if ($handler_file | is-empty) {
        print $" (ansi yellow)[dispatch](ansi reset) No handler for ($subject)"
        return
    }

    let handler_path = ([$root $handlers_dir $handler_file] | path join)
    if not ($handler_path | path exists) {
        error make { msg: $"Handler not found: ($handler_path)" }
    }

    # Execute the handler script with the payload JSON passed via environment
    let payload_json = ($payload | to json)
    let exec_result = (do {
        with-env { NATS_PAYLOAD: $payload_json } {
            ^nu $handler_path
        }
    } | complete)

    if $exec_result.exit_code == 0 {
        let check = (ansi green) + "✓" + (ansi reset)
        print $" ($check) Handler executed: ($subject)"
    } else {
        let err = ($exec_result.stderr? | default "unknown error")
        error make { msg: $"Handler failed for ($subject): ($err)" }
    }
}

# -- Status ───────────────────────────────────────────────────────────────────

# Print the NATS configuration summary followed by the `nats status` output,
# or a failure line when the status command exits non-zero.
export def "nats-status" []: nothing -> nothing {
    if not (nats-available) {
        print " [NATS] unavailable"
        return
    }

    let config = (get-nats-config)
    let nats_server = ($config.nats_events.url? | default "nats://localhost:4222")
    let enabled = ($config.nats_events.enabled? | default false)
    let emit_count = ($config.nats_events.emit? | default [] | length)
    let subscribe_count = ($config.nats_events.subscribe? | default [] | length)
    let cyan = (ansi cyan)
    let gray = (ansi dark_gray)
    let reset = (ansi reset)

    print ""
    print $" ($cyan)NATS Event System($reset)"
    print $" ($gray)────────────────────────────────────────($reset)"
    print $" Server: ($nats_server)"
    print $" Enabled: ($enabled)"
    print $" Emit events: ($emit_count)"
    print $" Subscriptions: ($subscribe_count)"

    let status_result = (do {
        with-env { NATS_SERVER: $nats_server } { ^nats status }
    } | complete)

    print ""
    if $status_result.exit_code == 0 {
        print $status_result.stdout
    } else {
        let cross = (ansi red) + "✗" + $reset
        print $" ($cross) Could not reach NATS server"
    }
    print ""
}