prvng_core/nulib/main_provisioning/workflow.nu
Jesús Pérez 894046ef5a
feat(core): three-layer DAG, unified component arch, commands-registry cache, Nushell 0.112.2 migration
  - DAG architecture: `dag show/validate/export` (nulib/main_provisioning/dag.nu),
    config loader (lib_provisioning/config/loader/dag.nu), taskserv dag-executor.
    Backed by schemas/lib/dag/*.ncl; orchestrator emits NATS events via
    WorkspaceComposition::into_workflow. See ADR-020, ADR-021.
  - Unified Component Architecture: components/mod.nu, main_provisioning/
    {components,workflow,extensions,ontoref-queries}.nu. Full workflow engine with
    topological sort and NATS subject emission. Blocks A-H complete (libre-daoshi).
  - Commands-registry: nulib/commands-registry.ncl (Nickel source, 314 lines) +
    JSON cache at ~/.cache/provisioning/commands-registry.json rebuilt on source
    change. cli/provisioning fast-path alias expansion avoids cold Nu startup.
    ADDING_COMMANDS.md documents new-command workflow.
  - Platform service manager: service-manager.nu (+573), startup.nu (+611),
    service-check.nu (+255); autostart/bootstrap/health/target refactored.
  - Nushell 0.112.2 migration: removed all try/catch and bash redirections;
    external commands prefixed with ^; type signatures enforced. Driven by
    scripts/refactor-try-catch{,-simplified}.nu.
  - TTY stack: removed shlib/*-tty.sh; replaced by cli/tty-dispatch.sh,
    tty-filter.sh, tty-commands.conf.
  - New domain modules: images/ (golden image lifecycle), workspace/{state,sync}.nu,
    main_provisioning/{bootstrap,cluster-deploy,fip,state}.nu, commands/{state,
    build,integrations/auth,utilities/alias}.nu, platform.nu expanded (+874).
  - Config loader overhaul: loader/core.nu slimmed (-759), cache/core.nu
    refactored (-454), removed legacy loaders/file_loader.nu (-330).
  - Thirteen new provisioning-<domain>.nu top-level modules for bash dispatcher.
  - Tests: test_workspace_state.nu (+351); updates to test_oci_registry,
    test_services.
  - README + CHANGELOG updated.
2026-04-17 04:27:33 +01:00

588 lines
24 KiB
Text

use ../lib_provisioning/user/config.nu [get-workspace-path, get-active-workspace-details]
use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval, ncl-eval-soft, default-ncl-paths]
# Provisioning install root.
#
# Uses $env.PROVISIONING when it is set (non-null); otherwise falls back to the
# standard /usr/local/provisioning location.
def wf-prov-root []: nothing -> string {
    let from_env = $env.PROVISIONING?
    if $from_env == null {
        "/usr/local/provisioning"
    } else {
        $from_env
    }
}
# Evaluate a Nickel file and return its exported value as a record.
#
# The import search paths are built by default-ncl-paths from the workspace
# root, so schema imports that point outside the file's own directory can
# resolve.
def wf-ncl-export [ws_root: string, full_path: string]: nothing -> record {
    let import_paths = (default-ncl-paths $ws_root)
    ncl-eval $full_path $import_paths
}
# Collect every workflow *.ncl file under <ws_root>/infra/<infra>/workflows/.
#
# Uses native `ls`/`path` commands rather than `bash -c "ls …"`. Path strings
# are never re-parsed by a shell, so workspace paths containing spaces or shell
# metacharacters work, and bash is not required.
#
# Hidden entries are skipped, as a shell `*` glob would skip them. Results are
# sorted by infra directory, then by file name. Returns [] when <ws_root>/infra
# is missing or no infra directory has a workflows/ directory.
def wf-collect-workflow-files [ws_root: string]: nothing -> list {
    let infra_root = ($ws_root | path join "infra")
    if not ($infra_root | path exists) {
        return []
    }
    # `path expand` resolves symlinks, so a symlinked workflows/ dir still counts.
    ls $infra_root
    | sort-by name
    | each {|entry| $entry.name | path join "workflows" }
    | where {|wf_dir| ($wf_dir | path exists) and (($wf_dir | path expand | path type) == "dir") }
    | each {|wf_dir|
        ls $wf_dir
        | where {|f| $f.type != "dir" and ($f.name | str ends-with ".ncl") }
        | sort-by name
        | each {|f| $f.name }
    }
    | flatten
}
# Candidate install-script paths for a component+mode, in lookup priority order.
#
# Layout: <prov_root>/extensions/components/<dir>/<mode>/install-<suffix>.sh.
# <dir> and <suffix> each try the underscore and dash spellings of comp_name.
# The literal comp_name is tried last, so names mixing '_' and '-' still resolve
# without changing which file wins when a variant exists. Duplicates are removed.
def wf-script-candidates [prov_root: string, comp_name: string, mode: string]: nothing -> list<string> {
    let dash_name = ($comp_name | str replace --all "_" "-")
    let under_name = ($comp_name | str replace --all "-" "_")
    [
        [$under_name, $under_name],
        [$under_name, $dash_name],
        [$dash_name, $dash_name],
        [$dash_name, $under_name],
        [$comp_name, $comp_name],
    ]
    | each {|pair| $prov_root | path join "extensions/components" $pair.0 $mode $"install-($pair.1).sh" }
    | uniq
}

# Resolve the install script for a component+mode from extensions/components/.
#
# Returns the first existing candidate from wf-script-candidates; errors when
# none exist.
def wf-resolve-script [prov_root: string, comp_name: string, mode: string]: nothing -> string {
    let found = (wf-script-candidates $prov_root $comp_name $mode | where {|p| $p | path exists })
    if ($found | is-empty) {
        error make { msg: $"No install script for component '($comp_name)' mode '($mode)' in ($prov_root)/extensions/components/ (tried all _/- variants)" }
    }
    $found | first
}

# Non-erroring variant for dry-run display: returns "<not found>" instead of failing.
def wf-resolve-script-opt [prov_root: string, comp_name: string, mode: string]: nothing -> string {
    let found = (wf-script-candidates $prov_root $comp_name $mode | where {|p| $p | path exists })
    if ($found | is-empty) { "<not found>" } else { $found | first }
}
# Pick the workspace name to operate on.
#
# An explicit, non-empty argument wins. Otherwise the active workspace's name is
# used, and an error is raised when there is no active workspace.
def wf-resolve-ws-name [workspace: string]: nothing -> string {
    if ($workspace | is-empty) {
        let active = (get-active-workspace-details)
        if $active == null {
            error make { msg: "No active workspace. Pass --workspace or activate one first." }
        }
        return $active.name
    }
    $workspace
}
# Publish a workflow event on NATS. Best effort: never fails the caller.
#
# The payload is sent as raw JSON via `nats pub <subject> <json>`.
# - If the `nats` CLI is not on PATH, the function returns immediately. Calling
#   an unknown external command raises an error that `complete` does not
#   capture, so this check is what keeps emission fire-and-forget.
# - Publish failures (non-zero exit) are ignored.
# - A diagnostic line is printed when PROVISIONING_DEBUG is truthy: a bool true,
#   or a string such as "true"/"1"/"yes"/"on" in any case. Env vars usually
#   arrive as strings, so the value is normalised rather than used as a bool
#   directly.
def wf-emit-event [subject: string, payload: record]: nothing -> nothing {
    let debug = (($env.PROVISIONING_DEBUG? | default false | into string | str trim | str downcase) in ["true", "1", "yes", "on"])
    if (which nats | is-empty) {
        if $debug { print $" [wf] NATS emit skipped for ($subject): nats CLI not found" }
        return
    }
    let json_payload = ($payload | to json --raw)
    let result = (do { ^nats pub $subject $json_payload } | complete)
    if $result.exit_code != 0 and $debug {
        print $" [wf] NATS emit failed for ($subject): ($result.stderr)"
    }
}
# Order workflow steps so each step comes after every step it depends_on.
#
# Steps are emitted in waves (Kahn's algorithm). Each wave is every remaining
# step whose depends_on ids have all been emitted, kept in input order.
#
# Errors when:
# - step ids are duplicated (id-based bookkeeping would otherwise silently drop
#   the later duplicates);
# - a depends_on names an unknown step;
# - no remaining step can make progress (a dependency cycle).
def wf-topo-sort [steps: list]: nothing -> list {
    let ids = ($steps | each {|s| $s.id })
    let dupes = ($ids | uniq --repeated)
    if ($dupes | is-not-empty) {
        error make { msg: $"Duplicate workflow step ids: ($dupes | str join ', ')" }
    }
    for step in $steps {
        for dep in ($step | get -o depends_on | default []) {
            if $dep not-in $ids {
                error make { msg: $"Step '($step.id)' depends_on unknown step '($dep)'" }
            }
        }
    }
    mut sorted = []
    mut done_ids = []
    mut remaining = $steps
    while ($remaining | is-not-empty) {
        # Immutable snapshot: closures cannot capture `mut` variables.
        let done = $done_ids
        let ready = ($remaining | where {|step|
            ($step | get -o depends_on | default []) | all {|dep| $dep in $done }
        })
        if ($ready | is-empty) {
            let stuck = ($remaining | each {|s| $s.id } | str join ", ")
            error make { msg: $"No progress possible — possible cycle. Stuck on: ($stuck)" }
        }
        let ready_ids = ($ready | each {|s| $s.id })
        $sorted = ($sorted | append $ready)
        $done_ids = ($done_ids | append $ready_ids)
        $remaining = ($remaining | where {|step| $step.id not-in $ready_ids })
    }
    $sorted
}
# Build env vars for a component script from its config value.
#
# Mirrors the cd-ext-env protocol:
#   - scalar fields (string/int/float/bool) -> <PREFIX>_<FIELD> (stringified)
#   - any other field (record, list, null)  -> <PREFIX>_<FIELD>_JSON (raw JSON)
#   - the whole config                       -> <PREFIX>_CONFIG_JSON (raw JSON)
# PREFIX and FIELD are upper-cased, with '-' and '.' replaced by '_'. A
# non-record config yields only <PREFIX>_CONFIG_JSON.
#
# Keys are written with `upsert`, so:
# - fields that normalise to the same name keep the last value instead of
#   raising "column already exists";
# - <PREFIX>_CONFIG_JSON always carries the full config, even when the config
#   has its own non-scalar `config` field.
def wf-build-env [comp_name: string, cfg: any]: nothing -> record {
    let prefix = ($comp_name | str upcase | str replace --all "-" "_" | str replace --all "." "_")
    let per_field = if ($cfg | describe | str starts-with "record") {
        $cfg | transpose key val | reduce --fold {} {|entry, acc|
            let field = ($entry.key | str upcase | str replace --all "-" "_" | str replace --all "." "_")
            let kind = ($entry.val | describe)
            let is_scalar = ($kind in ["string", "int", "float", "bool"])
            let env_key = if $is_scalar { $"($prefix)_($field)" } else { $"($prefix)_($field)_JSON" }
            let env_val = if $kind == "string" {
                $entry.val
            } else if $is_scalar {
                $entry.val | into string
            } else {
                $entry.val | to json --raw
            }
            $acc | upsert $env_key $env_val
        }
    } else {
        {}
    }
    $per_field | upsert $"($prefix)_CONFIG_JSON" ($cfg | to json --raw)
}
# Current time as an ISO-8601 UTC timestamp, e.g. "2026-04-17T03:27:33Z".
#
# Converts to UTC before formatting, so the trailing "Z" is accurate
# (`date now` alone is local time).
def wf-utc-timestamp []: nothing -> string {
    date now | date to-timezone "UTC" | format date "%Y-%m-%dT%H:%M:%SZ"
}

# Run a workflow by id, executing its steps in topological order.
#
# Lookup: scans infra/*/workflows/*.ncl for the record with `steps` (WorkflowDef)
# and the record with `actors` (WorkflowMetadata) whose `id` equals workflow_id.
#
# Component configs come from infra/<infra>/settings.ncl. <infra> is --infra when
# given; otherwise it is the infra directory containing the matched workflow
# file.
#
# Each target runs its component install script under bash, with
# CMD_TSK=<operation> plus the env from wf-build-env. Per-target
# started/completed/failed events are published on NATS (best effort).
#
# A failing target raises an error when its step's on_error is "Stop" (the
# default); any other on_error value prints a warning and continues.
export def "main workflow run" [
    workflow_id: string # Workflow id to execute (matches workflow metadata.id)
    --workspace (-w): string # Workspace name (default: active)
    --infra (-i): string = "" # Infra subdirectory (default: the infra dir holding the matched workflow file)
    --dry-run (-n) # Print execution plan without running scripts
] : nothing -> nothing {
    let ws_name = (wf-resolve-ws-name $workspace)
    let ws_root = (get-workspace-path $ws_name)
    if ($ws_root | is-empty) or ($ws_root == null) {
        error make { msg: $"Workspace '($ws_name)' not found in registry." }
    }
    let prov_root = (wf-prov-root)
    let wf_files = (wf-collect-workflow-files $ws_root)
    if ($wf_files | is-empty) {
        error make { msg: $"No workflow files found under ($ws_root)/infra/*/workflows/" }
    }

    # Find the definition and metadata by shape, not key name; either may sit
    # under any key of any workflow file. Remember which file held the def.
    mut wf_def = null
    mut wf_meta = null
    mut wf_source = ""
    for wf_file in $wf_files {
        let exported = (wf-ncl-export $ws_root $wf_file)
        for key in ($exported | columns) {
            let entry = ($exported | get $key)
            if ($entry | get -o id | default "") != $workflow_id { continue }
            if ($entry | get -o steps | default null) != null {
                $wf_def = $entry
                $wf_source = $wf_file
            } else if ($entry | get -o actors | default null) != null {
                $wf_meta = $entry
            }
        }
        if $wf_def != null and $wf_meta != null { break }
    }
    if $wf_def == null {
        error make { msg: $"Workflow '($workflow_id)' not found in any infra/*/workflows/*.ncl under ($ws_root)" }
    }

    # Without --infra, use the infra holding the workflow file:
    # <ws_root>/infra/<infra>/workflows/<file>.ncl -> <infra>.
    let infra_name = if ($infra | is-not-empty) {
        $infra
    } else {
        $wf_source | path dirname | path dirname | path basename
    }
    let settings_path = ($ws_root | path join "infra" $infra_name "settings.ncl")
    let components = if ($settings_path | path exists) {
        wf-ncl-export $ws_root $settings_path | get -o components | default {}
    } else {
        {}
    }

    let steps = (wf-topo-sort ($wf_def | get -o steps | default []))
    let nats_prefix = if $wf_meta != null {
        $wf_meta | get -o notifications | default {} | get -o subject_prefix | default $"workflow.($workflow_id)"
    } else {
        $"workflow.($workflow_id)"
    }

    print $"Workflow: ($workflow_id)"
    if $dry_run { print "DRY RUN — scripts will not execute" }
    print $"Steps: ($steps | length)"
    print ""
    for step in $steps {
        let step_id = $step.id
        let on_error = ($step | get -o on_error | default "Stop")
        print $"[($step_id)]"
        for target in ($step | get -o targets | default []) {
            let comp_name = ($target | get -o component | default "")
            let operation = ($target | get -o operation | default "install")
            if ($comp_name | is-empty) {
                print $" skip: target missing component field"
                continue
            }
            let comp_cfg = ($components | get -o $comp_name | default {})
            # Drops a "'" from the mode value (presumably an enum tag such as 'taskserv — confirm).
            let comp_mode = ($comp_cfg | get -o mode | default "taskserv" | into string | str replace "'" "")
            let full_env = (wf-build-env $comp_name $comp_cfg | insert CMD_TSK $operation)
            if $dry_run {
                let script_display = (wf-resolve-script-opt $prov_root $comp_name $comp_mode)
                print $" component: ($comp_name) operation: ($operation)"
                print $" mode: ($comp_mode)"
                print $" script: ($script_display)"
                print $" env keys: ($full_env | columns | sort | str join ', ')"
                continue
            }
            let event_base = {
                workflow_id: $workflow_id,
                step_id: $step_id,
                component: $comp_name,
                operation: $operation,
            }
            wf-emit-event $"($nats_prefix).step.($step_id).started" ($event_base | merge { timestamp: (wf-utc-timestamp), status: "started" })
            let script = (wf-resolve-script $prov_root $comp_name $comp_mode)
            print $" component: ($comp_name) operation: ($operation)"
            print $" script: ($script)"
            # `complete` turns the exit status into data. A failing script then
            # reaches the on_error handling below (and emits `failed`) instead of
            # aborting here. Its output is captured and replayed once it finishes.
            let result = (with-env $full_env { ^bash $script | complete })
            print --no-newline $result.stdout
            print --no-newline --stderr $result.stderr
            let exit_code = $result.exit_code
            let done_at = (wf-utc-timestamp)
            if $exit_code == 0 {
                wf-emit-event $"($nats_prefix).step.($step_id).completed" ($event_base | merge { timestamp: $done_at, status: "completed" })
            } else {
                wf-emit-event $"($nats_prefix).step.($step_id).failed" ($event_base | merge { timestamp: $done_at, status: "failed", exit_code: ($exit_code | into string) })
                let on_error_str = ($on_error | into string)
                if $on_error_str == "Stop" {
                    error make { msg: $"Step '($step_id)' target ($comp_name)/($operation) exited ($exit_code) — on_error=Stop" }
                }
                print $" WARN: step exited ($exit_code) — on_error=($on_error_str), continuing"
            }
        }
        print ""
    }
    print $"Workflow ($workflow_id): done"
}
# List every workflow declared in infra/*/workflows/*.ncl for a workspace.
#
# Records are classified by shape, not by key name:
# - a record with `steps` is a WorkflowDef;
# - a record with `actors` (and no `steps`) is WorkflowMetadata.
# Within a file, the two are paired by `id`, falling back to the record's key.
#
# Each WorkflowDef yields one row: id, name, description, steps_count,
# fsm_dimension. name, description and fsm_dimension come from the metadata;
# description falls back to the def's own description.
export def "main workflow list" [
    --workspace (-w): string # Workspace name (default: active)
] : nothing -> table {
    let ws_name = (wf-resolve-ws-name $workspace)
    let ws_root = (get-workspace-path $ws_name)
    if ($ws_root | is-empty) or ($ws_root == null) {
        error make { msg: $"Workspace '($ws_name)' not found in registry." }
    }
    let wf_files = (wf-collect-workflow-files $ws_root)
    if ($wf_files | is-empty) {
        return []
    }
    mut listing = []
    for wf_file in $wf_files {
        let exported = (wf-ncl-export $ws_root $wf_file)
        mut defs = {}
        mut metas = {}
        for key in ($exported | columns) {
            let rec = ($exported | get $key)
            let rec_id = ($rec | get -o id | default $key)
            let has_steps = (($rec | get -o steps | default null) != null)
            let has_actors = (($rec | get -o actors | default null) != null)
            if $has_steps {
                $defs = ($defs | insert $rec_id $rec)
            } else if $has_actors {
                $metas = ($metas | insert $rec_id $rec)
            }
        }
        for def_id in ($defs | columns) {
            let def_rec = ($defs | get $def_id)
            let meta_rec = ($metas | get -o $def_id | default {})
            let fallback_desc = ($def_rec | get -o description | default "")
            $listing = ($listing | append {
                id: $def_id,
                name: ($meta_rec | get -o name | default $def_id),
                description: ($meta_rec | get -o description | default $fallback_desc),
                steps_count: ($def_rec | get -o steps | default [] | length),
                fsm_dimension: ($meta_rec | get -o fsm_dimension | default ""),
            })
        }
    }
    $listing
}
# Show the FSM state tracked for a workflow.
#
# Steps:
# 1. Find the workflow's metadata record. It matches by `id` and is either
#    shape-identified (has `actors`, no `steps`, as in run/list/validate) or
#    stored under a key ending in "metadata", the older convention.
# 2. Read its fsm_dimension.
# 3. Look up that dimension's current_state/desired_state in
#    <ws_root>/.ontology/state.ncl.
# Any missing piece yields "unknown" states rather than an error.
export def "main workflow status" [
    workflow_id: string # Workflow id
    --workspace (-w): string # Workspace name (default: active)
] : nothing -> record {
    let ws_name = (wf-resolve-ws-name $workspace)
    let ws_root = (get-workspace-path $ws_name)
    if ($ws_root | is-empty) or ($ws_root == null) {
        error make { msg: $"Workspace '($ws_name)' not found in registry." }
    }
    mut found_dim = ""
    for wf_file in (wf-collect-workflow-files $ws_root) {
        let exported = (wf-ncl-export $ws_root $wf_file)
        for key in ($exported | columns) {
            let entry = ($exported | get $key)
            let has_steps = (($entry | get -o steps | default null) != null)
            let has_actors = (($entry | get -o actors | default null) != null)
            let is_meta = (($has_actors and not $has_steps) or ($key | str ends-with "metadata"))
            if $is_meta and ($entry | get -o id | default "") == $workflow_id {
                $found_dim = ($entry | get -o fsm_dimension | default "")
                break
            }
        }
        if ($found_dim | is-not-empty) { break }
    }
    # Immutable copy: the `where` closure below cannot capture a `mut` variable.
    let fsm_dim = $found_dim
    let unknown = { workflow_id: $workflow_id, fsm_dimension: null, current_state: "unknown", desired_state: "unknown" }
    if ($fsm_dim | is-empty) {
        return $unknown
    }
    let state_path = ($ws_root | path join ".ontology" "state.ncl")
    if not ($state_path | path exists) {
        return ($unknown | update fsm_dimension $fsm_dim)
    }
    # `get -o 0` yields null when no dimension matches; `first` would raise instead.
    let dim = (wf-ncl-export $ws_root $state_path
        | get -o dimensions
        | default []
        | where {|d| ($d | get -o id) == $fsm_dim }
        | get -o 0)
    if ($dim | is-empty) {
        return ($unknown | update fsm_dimension $fsm_dim)
    }
    {
        workflow_id: $workflow_id,
        fsm_dimension: $fsm_dim,
        current_state: ($dim | get -o current_state | default "unknown"),
        desired_state: ($dim | get -o desired_state | default "unknown"),
    }
}
# Cross-validate all workflows in a workspace against settings.ncl and state.ncl.
#
# Returns one row per check: { workflow, step, check, status }. Checks:
# - depends_on references an existing step id (FAIL otherwise);
# - each target has a component (FAIL otherwise);
# - the component exists in the merged settings.ncl components (FAIL otherwise);
# - the component's operations.<op> is true (FAIL otherwise);
# - a metadata fsm_dimension exists in .ontology/state.ncl (WARN otherwise).
#
# Components are merged from every infra/<name>/settings.ncl in name order, so
# later infra dirs override earlier ones. Listing uses native commands rather
# than `bash -c "ls …"`, so paths with spaces or shell metacharacters work.
export def "main workflow validate" [
    --workspace (-w): string # Workspace name (default: active)
] : nothing -> table {
    let ws_name = (wf-resolve-ws-name $workspace)
    let ws_root = (get-workspace-path $ws_name)
    if ($ws_root | is-empty) or ($ws_root == null) {
        error make { msg: $"Workspace '($ws_name)' not found in registry." }
    }
    let infra_root = ($ws_root | path join "infra")
    let settings_files = if ($infra_root | path exists) {
        ls $infra_root
        | sort-by name
        | each {|entry| $entry.name | path join "settings.ncl" }
        | where {|p| $p | path exists }
    } else {
        []
    }
    let all_components = ($settings_files | reduce --fold {} {|settings_path, acc|
        let comps = (ncl-eval-soft $settings_path (default-ncl-paths $ws_root) {} | get -o components | default {})
        $acc | merge $comps
    })
    let component_names = ($all_components | columns)
    # Dimension ids known to state.ncl, for the fsm_dimension check.
    let state_path = ($ws_root | path join ".ontology" "state.ncl")
    let known_dimensions = if ($state_path | path exists) {
        ncl-eval-soft $state_path (default-ncl-paths $ws_root) {} | get -o dimensions | default [] | each {|d| $d.id}
    } else { [] }
    mut rows = []
    for wf_file in (wf-collect-workflow-files $ws_root) {
        let exported = (wf-ncl-export $ws_root $wf_file)
        # Classify by shape: `steps` => WorkflowDef, `actors` => WorkflowMetadata.
        mut def_by_id = {}
        mut meta_by_id = {}
        for key in ($exported | columns) {
            let entry = ($exported | get $key)
            let eid = ($entry | get -o id | default $key)
            if ($entry | get -o steps | default null) != null {
                $def_by_id = ($def_by_id | insert $eid $entry)
            } else if ($entry | get -o actors | default null) != null {
                $meta_by_id = ($meta_by_id | insert $eid $entry)
            }
        }
        for wf_id in ($def_by_id | columns) {
            let wf_def = ($def_by_id | get $wf_id)
            let meta = ($meta_by_id | get -o $wf_id | default {})
            let steps = ($wf_def | get -o steps | default [])
            let step_ids = ($steps | each {|s| $s.id})
            let fsm_dim = ($meta | get -o fsm_dimension | default "")
            if ($fsm_dim | is-not-empty) {
                $rows = ($rows | append {
                    workflow: $wf_id,
                    step: "(metadata)",
                    check: $"fsm_dimension '($fsm_dim)' exists in state.ncl",
                    status: (if $fsm_dim in $known_dimensions { "PASS" } else { "WARN" }),
                })
            }
            for step in $steps {
                let step_id = $step.id
                for dep in ($step | get -o depends_on | default []) {
                    $rows = ($rows | append {
                        workflow: $wf_id,
                        step: $step_id,
                        check: $"depends_on '($dep)' exists in workflow",
                        status: (if $dep in $step_ids { "PASS" } else { "FAIL" }),
                    })
                }
                for target in ($step | get -o targets | default []) {
                    let comp_name = ($target | get -o component | default "")
                    let operation = ($target | get -o operation | default "install")
                    if ($comp_name | is-empty) {
                        $rows = ($rows | append {
                            workflow: $wf_id,
                            step: $step_id,
                            check: "target has component field",
                            status: "FAIL",
                        })
                        continue
                    }
                    let comp_exists = ($comp_name in $component_names)
                    $rows = ($rows | append {
                        workflow: $wf_id,
                        step: $step_id,
                        check: $"component '($comp_name)' in settings.ncl",
                        status: (if $comp_exists { "PASS" } else { "FAIL" }),
                    })
                    if $comp_exists {
                        # Supported only when operations.<op> is literally true.
                        let op_val = ($all_components | get $comp_name | get -o operations | default {} | get -o $operation | default false)
                        $rows = ($rows | append {
                            workflow: $wf_id,
                            step: $step_id,
                            check: $"component '($comp_name)' supports operation '($operation)'",
                            status: (if $op_val == true { "PASS" } else { "FAIL" }),
                        })
                    }
                }
            }
        }
    }
    $rows
}