use ../lib_provisioning/user/config.nu [get-workspace-path, get-active-workspace-details] use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval, ncl-eval-soft, default-ncl-paths] # Resolve provisioning root from env with default fallback. def wf-prov-root []: nothing -> string { $env.PROVISIONING? | default "/usr/local/provisioning" } # Export a Nickel file as parsed JSON. # # Provides workspace root and provisioning root as import paths so cross-workspace # schema references resolve correctly. def wf-ncl-export [ws_root: string, full_path: string]: nothing -> record { ncl-eval $full_path (default-ncl-paths $ws_root) } # Collect all workflow *.ncl files under infra/{infra}/workflows/ for a workspace. def wf-collect-workflow-files [ws_root: string]: nothing -> list { let infra_root = ($ws_root | path join "infra") if not ($infra_root | path exists) { return [] } let infra_dirs = (do { ^bash -c $"ls -1d ($infra_root)/*/workflows 2>/dev/null" } | complete) if $infra_dirs.exit_code != 0 or ($infra_dirs.stdout | str trim | is-empty) { return [] } $infra_dirs.stdout | lines | where { $in | str ends-with "workflows" } | each {|wf_dir| let ncl_files = (do { ^bash -c $"ls ($wf_dir)/*.ncl 2>/dev/null" } | complete) if $ncl_files.exit_code != 0 or ($ncl_files.stdout | str trim | is-empty) { [] } else { $ncl_files.stdout | lines | where { ($in | str trim | is-not-empty) } } } | flatten } # Resolve the install script for a component+mode from extensions/components/. # # Tries underscore/dash variants: component dir name and script suffix. Returns the # first existing path. Errors if none match. def wf-resolve-script [prov_root: string, comp_name: string, mode: string]: nothing -> string { let dash_name = ($comp_name | str replace --all "_" "-") let under_name = ($comp_name | str replace --all "-" "_") let combos = [ [$under_name, $under_name], [$under_name, $dash_name], [$dash_name, $dash_name], [$dash_name, $under_name], ] let found = ($combos | each {|pair| let p = ($prov_root | path join "extensions/components" $pair.0 $mode $"install-($pair.1).sh") if ($p | path exists) { $p } else { null } } | where { $in != null }) if ($found | is-empty) { error make { msg: $"No install script for component '($comp_name)' mode '($mode)' in ($prov_root)/extensions/components/ (tried all _/- variants)" } } $found | first } # Non-erroring variant for dry-run display. def wf-resolve-script-opt [prov_root: string, comp_name: string, mode: string]: nothing -> string { let dash_name = ($comp_name | str replace --all "_" "-") let under_name = ($comp_name | str replace --all "-" "_") let combos = [ [$under_name, $under_name], [$under_name, $dash_name], [$dash_name, $dash_name], [$dash_name, $under_name], ] let found = ($combos | each {|pair| let p = ($prov_root | path join "extensions/components" $pair.0 $mode $"install-($pair.1).sh") if ($p | path exists) { $p } else { null } } | where { $in != null }) if ($found | is-empty) { "" } else { $found | first } } # Resolve workspace name from optional arg or active workspace. def wf-resolve-ws-name [workspace: string]: nothing -> string { if ($workspace | is-not-empty) { $workspace } else { let details = (get-active-workspace-details) if ($details == null) { error make { msg: "No active workspace. Pass --workspace or activate one first." } } $details.name } } # Emit a NATS event for a workflow step — fire-and-forget, swallows errors when NATS unavailable. def wf-emit-event [subject: string, payload: record]: nothing -> nothing { let json_payload = ($payload | to json --raw) let result = (do { ^nats pub $subject $json_payload } | complete) if $result.exit_code != 0 { # NATS not available or misconfigured — log at debug level and continue. if ($env.PROVISIONING_DEBUG? | default false) { print $" [wf] NATS emit failed for ($subject): ($result.stderr)" } } } # Topological sort of workflow steps respecting depends_on edges. # # Returns steps in execution order. Errors on cycles or dangling references. def wf-topo-sort [steps: list]: nothing -> list { let ids = ($steps | each {|s| $s.id}) # Verify all depends_on targets exist. for step in $steps { let deps = ($step | get -o depends_on | default []) for dep in $deps { if not ($ids | any {|id| $id == $dep}) { error make { msg: $"Step '($step.id)' depends_on unknown step '($dep)'" } } } } # Kahn's algorithm: iteratively emit steps whose dependencies are satisfied. # $sorted_ids tracks completed ids as an immutable snapshot for closure capture. mut sorted = [] mut sorted_ids = [] mut remaining = $steps mut iterations = 0 let max_iter = ($steps | length) + 1 loop { if ($remaining | is-empty) { break } if $iterations >= $max_iter { let stuck = ($remaining | each {|s| $s.id} | str join ", ") error make { msg: $"Cycle detected in workflow step depends_on. Stuck on: ($stuck)" } } # Snapshot mutable state as immutable so closures can capture safely. let done_ids = $sorted_ids let ready = ($remaining | where {|step| let deps = ($step | get -o depends_on | default []) $deps | all {|dep| $done_ids | any {|done_id| $done_id == $dep}} }) if ($ready | is-empty) { let stuck = ($remaining | each {|s| $s.id} | str join ", ") error make { msg: $"No progress possible — possible cycle. Stuck on: ($stuck)" } } let ready_ids = ($ready | each {|s| $s.id}) $sorted = ($sorted | append $ready) $sorted_ids = ($sorted_ids | append $ready_ids) $remaining = ($remaining | where {|step| not ($ready_ids | any {|rid| $rid == $step.id})}) $iterations += 1 } $sorted } # Build env vars for a component script from its config record. # # Mirrors the cd-ext-env protocol: scalar fields as _, # complex fields as __JSON, full config as _CONFIG_JSON. def wf-build-env [comp_name: string, cfg: any]: nothing -> record { let prefix = ($comp_name | str upcase | str replace --all "-" "_" | str replace --all "." "_") let flat = if ($cfg | describe | str starts-with "record") { $cfg | transpose key val | reduce --fold {} {|entry, acc| let raw_key = ($entry.key | str upcase | str replace --all "-" "_" | str replace --all "." "_") let type_desc = ($entry.val | describe) let is_scalar = ($type_desc in ["string", "int", "float", "bool"]) let env_key = if $is_scalar { $"($prefix)_($raw_key)" } else { $"($prefix)_($raw_key)_JSON" } let env_val = if $type_desc == "string" { $entry.val } else if $is_scalar { $entry.val | into string } else { $entry.val | to json --raw } $acc | insert $env_key $env_val } } else { {} } $flat | insert $"($prefix)_CONFIG_JSON" ($cfg | to json --raw) } # Run a workflow by id, executing steps in topological order. # # Reads workflows/*.ncl from infra/{infra}/workflows/, exports each to find the matching # workflow id. Dispatches CMD_TSK={operation} to extension install scripts per target. # NATS events are emitted per step if NATS is available. export def "main workflow run" [ workflow_id: string # Workflow id to execute (matches workflow metadata.id) --workspace (-w): string # Workspace name (default: active) --infra (-i): string = "" # Infra subdirectory (default: auto-detected from workspace name) --dry-run (-n) # Print execution plan without running scripts ] : nothing -> nothing { let ws_name = (wf-resolve-ws-name $workspace) let ws_root = (get-workspace-path $ws_name) if ($ws_root | is-empty) or ($ws_root == null) { error make { msg: $"Workspace '($ws_name)' not found in registry." } } let prov_root = (wf-prov-root) let wf_files = (wf-collect-workflow-files $ws_root) if ($wf_files | is-empty) { error make { msg: $"No workflow files found under ($ws_root)/infra/*/workflows/" } } # Find the workflow definition matching the requested id. mut wf_def = null mut wf_meta = null for wf_file in $wf_files { let exported = (wf-ncl-export $ws_root $wf_file) # Each workflow NCL exports a record whose values are either WorkflowDef (has `id` + # `steps`) or WorkflowMetadata (has `id` + `name` + `actors`). # We scan every key in the file — metadata may appear before or after the def. let keys = ($exported | columns) for key in $keys { let entry = ($exported | get $key) let entry_id = ($entry | get -o id | default "") if $entry_id != $workflow_id { continue } # WorkflowDef has `steps`; WorkflowMetadata has `actors`. if ($entry | get -o steps | default null) != null { $wf_def = $entry } else if ($entry | get -o actors | default null) != null { $wf_meta = $entry } } if $wf_def != null and $wf_meta != null { break } } if $wf_def == null { error make { msg: $"Workflow '($workflow_id)' not found in any infra/*/workflows/*.ncl under ($ws_root)" } } # Load settings.ncl to resolve component configs. let infra_name = if ($infra | is-not-empty) { $infra } else { # Auto-detect: pick the first infra dir that has a workflows/ subdir. let infra_root = ($ws_root | path join "infra") let candidates = (do { ls $infra_root } | complete) if $candidates.exit_code != 0 { error make { msg: $"Cannot list infra dir ($infra_root) — pass --infra explicitly." } } let found = ($candidates.stdout | where type == "dir" | each {|d| $d.name | path basename } | where {|name| ($infra_root | path join $name "workflows") | path exists } | first ) if ($found | is-empty) { error make { msg: "Cannot auto-detect infra name — no infra/*/workflows/ found. Pass --infra explicitly." } } $found } let settings_path = ($ws_root | path join "infra" $infra_name "settings.ncl") let settings = if ($settings_path | path exists) { (wf-ncl-export $ws_root $settings_path) } else { { components: {} } } let components = ($settings | get -o components | default {}) let steps_raw = ($wf_def | get -o steps | default []) let steps = (wf-topo-sort $steps_raw) let nats_prefix = if $wf_meta != null { $wf_meta | get -o notifications | default {} | get -o subject_prefix | default $"workflow.($workflow_id)" } else { $"workflow.($workflow_id)" } print $"Workflow: ($workflow_id)" if $dry_run { print "DRY RUN — scripts will not execute" } print $"Steps: ($steps | length)" print "" mut completed = [] for step in $steps { let step_id = $step.id let targets = ($step | get -o targets | default []) let on_error = ($step | get -o on_error | default "Stop") print $"[($step_id)]" for target in $targets { let comp_name = ($target | get -o component | default "") let operation = ($target | get -o operation | default "install") if ($comp_name | is-empty) { print $" skip: target missing component field" continue } let comp_cfg = ($components | get -o $comp_name | default {}) let comp_mode = ($comp_cfg | get -o mode | default "taskserv" | into string | str replace "'" "") let env_vars = (wf-build-env $comp_name $comp_cfg) let full_env = ($env_vars | insert CMD_TSK $operation) if $dry_run { let script_display = (wf-resolve-script-opt $prov_root $comp_name $comp_mode) print $" component: ($comp_name) operation: ($operation)" print $" mode: ($comp_mode)" print $" script: ($script_display)" print $" env keys: ($full_env | columns | sort | str join ', ')" } else { let ts_start = (date now | format date "%Y-%m-%dT%H:%M:%SZ") (wf-emit-event $"($nats_prefix).step.($step_id).started" { workflow_id: $workflow_id, step_id: $step_id, component: $comp_name, operation: $operation, timestamp: $ts_start, status: "started", }) let script = (wf-resolve-script $prov_root $comp_name $comp_mode) print $" component: ($comp_name) operation: ($operation)" print $" script: ($script)" with-env $full_env { ^bash $script } let exit_code = $env.LAST_EXIT_CODE let ts_done = (date now | format date "%Y-%m-%dT%H:%M:%SZ") if $exit_code == 0 { (wf-emit-event $"($nats_prefix).step.($step_id).completed" { workflow_id: $workflow_id, step_id: $step_id, component: $comp_name, operation: $operation, timestamp: $ts_done, status: "completed", }) } else { (wf-emit-event $"($nats_prefix).step.($step_id).failed" { workflow_id: $workflow_id, step_id: $step_id, component: $comp_name, operation: $operation, timestamp: $ts_done, status: "failed", exit_code: ($exit_code | into string), }) let on_error_str = ($on_error | into string) if $on_error_str == "Stop" { error make { msg: $"Step '($step_id)' target ($comp_name)/($operation) exited ($exit_code) — on_error=Stop" } } else { print $" WARN: step exited ($exit_code) — on_error=($on_error_str), continuing" } } } } $completed = ($completed | append $step_id) print "" } print $"Workflow ($workflow_id): done" } # List all workflows declared in infra/{infra}/workflows/*.ncl for a workspace. export def "main workflow list" [ --workspace (-w): string # Workspace name (default: active) ] : nothing -> table { let ws_name = (wf-resolve-ws-name $workspace) let ws_root = (get-workspace-path $ws_name) if ($ws_root | is-empty) or ($ws_root == null) { error make { msg: $"Workspace '($ws_name)' not found in registry." } } let wf_files = (wf-collect-workflow-files $ws_root) if ($wf_files | is-empty) { return [] } mut rows = [] for wf_file in $wf_files { let exported = (wf-ncl-export $ws_root $wf_file) let keys = ($exported | columns) # WorkflowDef has `steps`; WorkflowMetadata has `actors`. Distinguish by struct shape, # not key name — avoids fragility when authors name keys freely. mut meta_map = {} mut def_map = {} for key in $keys { let entry = ($exported | get $key) let eid = ($entry | get -o id | default $key) if ($entry | get -o steps | default null) != null { $def_map = ($def_map | insert $eid $entry) } else if ($entry | get -o actors | default null) != null { $meta_map = ($meta_map | insert $eid $entry) } } for wf_id in ($def_map | columns) { let def = ($def_map | get $wf_id) let meta = ($meta_map | get -o $wf_id | default {}) let row = { id: $wf_id, name: ($meta | get -o name | default $wf_id), description: ($meta | get -o description | default ($def | get -o description | default "")), steps_count: ($def | get -o steps | default [] | length), fsm_dimension: ($meta | get -o fsm_dimension | default ""), } $rows = ($rows | append $row) } } $rows } # Show FSM dimension state for a workflow's tracked dimension. export def "main workflow status" [ workflow_id: string # Workflow id --workspace (-w): string # Workspace name (default: active) ] : nothing -> record { let ws_name = (wf-resolve-ws-name $workspace) let ws_root = (get-workspace-path $ws_name) if ($ws_root | is-empty) or ($ws_root == null) { error make { msg: $"Workspace '($ws_name)' not found in registry." } } # Find the metadata block to get fsm_dimension. let wf_files = (wf-collect-workflow-files $ws_root) mut fsm_dim = "" for wf_file in $wf_files { let exported = (wf-ncl-export $ws_root $wf_file) for key in ($exported | columns) { let entry = ($exported | get $key) if ($key | str ends-with "metadata") and ($entry | get -o id | default "") == $workflow_id { $fsm_dim = ($entry | get -o fsm_dimension | default "") break } } if ($fsm_dim | is-not-empty) { break } } if ($fsm_dim | is-empty) { return { workflow_id: $workflow_id, fsm_dimension: null, current_state: "unknown", desired_state: "unknown" } } # Read state.ncl — look for the dimension matching fsm_dim. let state_path = ($ws_root | path join ".ontology" "state.ncl") if not ($state_path | path exists) { return { workflow_id: $workflow_id, fsm_dimension: $fsm_dim, current_state: "unknown", desired_state: "unknown" } } let state = (wf-ncl-export $ws_root $state_path) let dim = ($state | get -o dimensions | default [] | where {|d| $d.id == $fsm_dim} | first) if ($dim | is-empty) or ($dim == null) { return { workflow_id: $workflow_id, fsm_dimension: $fsm_dim, current_state: "unknown", desired_state: "unknown" } } { workflow_id: $workflow_id, fsm_dimension: $fsm_dim, current_state: ($dim | get -o current_state | default "unknown"), desired_state: ($dim | get -o desired_state | default "unknown"), } } # Cross-validate all workflows in a workspace against settings.ncl and each other. # # Checks: component exists in settings, operation supported by component, depends_on # references valid step ids, fsm_dimension referenced in metadata exists in state.ncl. export def "main workflow validate" [ --workspace (-w): string # Workspace name (default: active) ] : nothing -> table { let ws_name = (wf-resolve-ws-name $workspace) let ws_root = (get-workspace-path $ws_name) if ($ws_root | is-empty) or ($ws_root == null) { error make { msg: $"Workspace '($ws_name)' not found in registry." } } # Load all infra settings.ncl files (may be multiple infra dirs). let infra_root = ($ws_root | path join "infra") let infra_dirs = (do { ^bash -c $"ls -1d ($infra_root)/*/settings.ncl 2>/dev/null" } | complete) mut all_components = {} if $infra_dirs.exit_code == 0 and ($infra_dirs.stdout | str trim | is-not-empty) { for settings_path in ($infra_dirs.stdout | lines | where { $in | str trim | is-not-empty }) { let comps = ncl-eval-soft $settings_path (default-ncl-paths $ws_root) {} | get -o components | default {} $all_components = ($all_components | merge $comps) } } # Load state.ncl dimension ids for fsm_dimension check. let state_path = ($ws_root | path join ".ontology" "state.ncl") let known_dimensions = if ($state_path | path exists) { ncl-eval-soft $state_path (default-ncl-paths $ws_root) {} | get -o dimensions | default [] | each {|d| $d.id} } else { [] } let wf_files = (wf-collect-workflow-files $ws_root) mut rows = [] for wf_file in $wf_files { let exported = (wf-ncl-export $ws_root $wf_file) let keys = ($exported | columns) mut def_by_id = {} mut meta_by_id = {} for key in $keys { let entry = ($exported | get $key) let eid = ($entry | get -o id | default $key) if ($entry | get -o steps | default null) != null { $def_by_id = ($def_by_id | insert $eid $entry) } else if ($entry | get -o actors | default null) != null { $meta_by_id = ($meta_by_id | insert $eid $entry) } } for wf_id in ($def_by_id | columns) { let def = ($def_by_id | get $wf_id) let meta = ($meta_by_id | get -o $wf_id | default {}) let steps = ($def | get -o steps | default []) let step_ids = ($steps | each {|s| $s.id}) # FSM dimension check. let fsm_dim = ($meta | get -o fsm_dimension | default "") if ($fsm_dim | is-not-empty) { let dim_ok = ($known_dimensions | any {|d| $d == $fsm_dim}) let row = { workflow: $wf_id, step: "(metadata)", check: $"fsm_dimension '($fsm_dim)' exists in state.ncl", status: (if $dim_ok { "PASS" } else { "WARN" }), } $rows = ($rows | append $row) } for step in $steps { let step_id = $step.id let targets = ($step | get -o targets | default []) # depends_on references valid step ids. let deps = ($step | get -o depends_on | default []) for dep in $deps { let dep_ok = ($step_ids | any {|id| $id == $dep}) $rows = ($rows | append { workflow: $wf_id, step: $step_id, check: $"depends_on '($dep)' exists in workflow", status: (if $dep_ok { "PASS" } else { "FAIL" }), }) } for target in $targets { let comp_name = ($target | get -o component | default "") let operation = ($target | get -o operation | default "install") if ($comp_name | is-empty) { $rows = ($rows | append { workflow: $wf_id, step: $step_id, check: "target has component field", status: "FAIL", }) continue } # Component exists in settings. let comp_exists = ($all_components | columns | any {|c| $c == $comp_name}) $rows = ($rows | append { workflow: $wf_id, step: $step_id, check: $"component '($comp_name)' in settings.ncl", status: (if $comp_exists { "PASS" } else { "FAIL" }), }) # Operation supported by component. if $comp_exists { let comp_cfg = ($all_components | get $comp_name) let ops = ($comp_cfg | get -o operations | default {}) let op_val = ($ops | get -o $operation | default false) let op_ok = ($op_val == true) $rows = ($rows | append { workflow: $wf_id, step: $step_id, check: $"component '($comp_name)' supports operation '($operation)'", status: (if $op_ok { "PASS" } else { "FAIL" }), }) } } } } } $rows }