prvng_core/nulib/taskservs/handlers.nu
Jesús Pérez 894046ef5a
feat(core): three-layer DAG, unified component arch, commands-registry cache, Nushell 0.112.2 migration
  - DAG architecture: `dag show/validate/export` (nulib/main_provisioning/dag.nu),
    config loader (lib_provisioning/config/loader/dag.nu), taskserv dag-executor.
    Backed by schemas/lib/dag/*.ncl; orchestrator emits NATS events via
    WorkspaceComposition::into_workflow. See ADR-020, ADR-021.
  - Unified Component Architecture: components/mod.nu, main_provisioning/
    {components,workflow,extensions,ontoref-queries}.nu. Full workflow engine with
    topological sort and NATS subject emission. Blocks A-H complete (libre-daoshi).
  - Commands-registry: nulib/commands-registry.ncl (Nickel source, 314 lines) +
    JSON cache at ~/.cache/provisioning/commands-registry.json rebuilt on source
    change. cli/provisioning fast-path alias expansion avoids cold Nu startup.
    ADDING_COMMANDS.md documents new-command workflow.
  - Platform service manager: service-manager.nu (+573), startup.nu (+611),
    service-check.nu (+255); autostart/bootstrap/health/target refactored.
  - Nushell 0.112.2 migration: removed all try/catch and bash redirections;
    external commands prefixed with ^; type signatures enforced. Driven by
    scripts/refactor-try-catch{,-simplified}.nu.
  - TTY stack: removed shlib/*-tty.sh; replaced by cli/tty-dispatch.sh,
    tty-filter.sh, tty-commands.conf.
  - New domain modules: images/ (golden image lifecycle), workspace/{state,sync}.nu,
    main_provisioning/{bootstrap,cluster-deploy,fip,state}.nu, commands/{state,
    build,integrations/auth,utilities/alias}.nu, platform.nu expanded (+874).
  - Config loader overhaul: loader/core.nu slimmed (-759), cache/core.nu
    refactored (-454), removed legacy loaders/file_loader.nu (-330).
  - Thirteen new provisioning-<domain>.nu top-level modules for bash dispatcher.
  - Tests: test_workspace_state.nu (+351); updates to test_oci_registry,
    test_services.
  - README + CHANGELOG updated.
2026-04-17 04:27:33 +01:00

354 lines
18 KiB
Text

use utils.nu *
# REMOVED: use lib_provisioning * - causes circular import
use run.nu *
use check_mode.nu *
use ../lib_provisioning/config/accessor.nu *
use ../lib_provisioning/utils/logging.nu [is-debug-enabled, is-debug-check-enabled]
use ../servers/utils.nu [servers_selector, wait_for_server]
use ../lib_provisioning/utils/hints.nu *
use ../workspace/state.nu *
use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval]
# Locate a taskserv directory under `taskservs_path`.
# Lookup order:
#   1. flat layout:         <taskservs_path>/<name>
#   2. hierarchical layout: <taskservs_path>/<category>/<name> for every category dir
# Each step tries the name as given first, then its underscore spelling when that
# differs (vol-prepare → vol_prepare).
# Returns the first existing path, or "" when nothing matches.
def find-taskserv-path [taskservs_path: string, name: string]: nothing -> string {
    # `uniq` keeps first-occurrence order, so the original spelling is always tried first.
    let spellings = ([$name ($name | str replace --all "-" "_")] | uniq)

    let flat_hit = ($spellings
        | each {|s| $taskservs_path | path join $s }
        | where {|p| $p | path exists }
        | get 0?)
    if $flat_hit != null { return $flat_hit }

    if not ($taskservs_path | path exists) { return "" }

    # List category directories once; only entries whose ls type is "dir" are considered.
    let categories = (do -i { ls $taskservs_path } | where type == "dir" | each {|c| $c.name })
    for spelling in $spellings {
        for category in $categories {
            let candidate = ($category | path join $spelling)
            if ($candidate | path exists) { return $candidate }
        }
    }
    ""
}
#use ../extensions/taskservs/run.nu run_taskserv
# Install a taskserv using the server-specific run tree.
# Prints a banner, then calls run_taskserv with:
#   source: <get-run-taskservs-path>/<taskserv name>/<server_taskserv_path>
#   target: <wk_server>/<taskserv name>
# Returns the bool produced by run_taskserv.
def install_from_server [
    defs: record
    server_taskserv_path: string
    wk_server: string
]: nothing -> bool {
    let ts_name = $defs.taskserv.name
    let banner = ([
        $"(_ansi yellow_bold)($ts_name)(_ansi reset) (_ansi default_dimmed)on(_ansi reset) "
        $"($defs.server.hostname) (_ansi default_dimmed)install(_ansi reset) "
        $"(_ansi purple_bold)from ($defs.taskserv_install_mode)(_ansi reset)"
    ] | str join)
    _print $banner
    let source_dir = (get-run-taskservs-path | path join $ts_name | path join $server_taskserv_path)
    let target_dir = ($wk_server | path join $ts_name)
    run_taskserv $defs $source_dir $target_dir
}
# Install a taskserv from the taskserv library.
# Resolves the taskserv directory (flat or category layout) under
# get-taskservs-path, then runs its `<taskserv_profile>` subdirectory into
# <wk_server>/<taskserv name>.
# `server_taskserv_path` is accepted for signature parity with
# install_from_server but is not used here.
# Returns the bool produced by run_taskserv.
def install_from_library [
    defs: record
    server_taskserv_path: string
    wk_server: string
]: nothing -> bool {
    let ts_name = $defs.taskserv.name
    _print ([
        $"(_ansi yellow_bold)($ts_name)(_ansi reset) (_ansi default_dimmed)on(_ansi reset) "
        $"($defs.server.hostname) (_ansi default_dimmed)install(_ansi reset) "
        $"(_ansi purple_bold)from library(_ansi reset)"
    ] | str join)
    let library_dir = (find-taskserv-path (get-taskservs-path) $ts_name)
    let profile_dir = ($library_dir | path join $defs.taskserv_profile)
    run_taskserv $defs $profile_dir ($wk_server | path join $ts_name)
}
# Build a map of taskserv_name → [depends_on taskserv_names] from a formula DAG.
# Evaluates <settings.infra_path>/dag.ncl with ncl-eval (include path taken from
# $env.PROVISIONING, default /usr/local/provisioning) and picks the formula whose
# formula_id is "<hostname>-formula".
# Returns {} when dag.ncl does not exist, evaluation fails, or no formula matches.
def load-dag-deps [settings: record, hostname: string]: nothing -> record {
    let dag_path = ($settings.infra_path | path join "dag.ncl")
    if not ($dag_path | path exists) { return {} }
    let prov_path = ($env.PROVISIONING? | default "/usr/local/provisioning")
    let dag = (try {
        ncl-eval $dag_path [$prov_path]
    } catch {
        return {}
    })
    let formula_id = $"($hostname)-formula"
    let formula = ($dag.composition?.formulas? | default []
        | where {|f| $f.formula_id? == $formula_id} | get 0?)
    if ($formula | is-empty) { return {} }
    # Formula nodes have: { id, taskserv: {name}, depends_on: [{node_id}] }
    # depends_on refers to node ids, so resolve each node_id to its taskserv name
    # via the node list; an unknown node_id is kept as-is.
    let nodes = ($formula.nodes? | default [])
    let id_to_name = ($nodes | each {|n|
        { id: $n.id, name: ($n.taskserv?.name? | default $n.id) }
    })
    $nodes
    | each {|n|
        let ts_name = ($n.taskserv?.name? | default $n.id)
        let dep_names = ($n.depends_on? | default [] | each {|d|
            # Optional cell path: yields null (instead of erroring) when no node matches.
            # (`first?` is not a Nushell command and would be run as an external program.)
            let resolved = ($id_to_name | where id == $d.node_id | get 0?)
            if ($resolved | is-not-empty) { $resolved.name } else { $d.node_id }
        })
        { $ts_name: $dep_names }
    }
    | reduce -f {} {|it, acc| $acc | merge $it }
}
# Dispatch one taskserv install according to its (already defaulted) install mode.
# Returns the combined bool reported by install_from_server / install_from_library.
# An unrecognised mode is reported and treated as a failure. Without this arm the
# match would yield `nothing`, which the caller's `not` check cannot consume.
def install-by-mode [
    install_mode: string
    defs: record
    server_taskserv_path: string
    wk_server: string
]: nothing -> bool {
    match $install_mode {
        "server" | "getfile" => {
            install_from_server $defs $server_taskserv_path $wk_server
        },
        "library-server" => {
            let a = (install_from_library $defs $server_taskserv_path $wk_server)
            let b = (install_from_server $defs $server_taskserv_path $wk_server)
            $a and $b
        },
        "server-library" => {
            let a = (install_from_server $defs $server_taskserv_path $wk_server)
            let b = (install_from_library $defs $server_taskserv_path $wk_server)
            $a and $b
        },
        "library" | "local" => {
            # "local" is meant for install scripts run on the provisioning machine rather
            # than via SSH (e.g. k0sctl, which manages its own remote connections);
            # here it is dispatched exactly like "library".
            install_from_library $defs $server_taskserv_path $wk_server
        },
        _ => {
            _print $"🛑 unknown install_mode '($install_mode)' for ($defs.taskserv.name)"
            false
        },
    }
}

# Run the selected taskservs (install / reinstall / maintenance command / delete)
# on every server in `settings.data.servers` that matches `match_server`.
#
# Parameters:
#   settings               – infra settings; reads infra_path, src_path and data.*
#   match_taskserv         – restrict to this taskserv name ("" = all)
#   match_taskserv_profile – restrict to this profile ("" = all)
#   match_server           – restrict to this hostname ("" = all)
#   iptype                 – IP type passed to mw_get_ip ("" → "public")
#   check                  – validation-only mode: runs run-check-mode, no installs,
#                            no workspace-state writes
#   upload                 – in check mode, resolve the real IP and run upload inspection
#   reset                  – run as cmd_task "reinstall" (ignored when `cmd` is set)
#   cmd                    – explicit cmd_task override (e.g. "delete")
#   force_delete           – with cmd == "delete": build the taskserv list from the
#                            workspace state file (plus `match_taskserv`) instead of
#                            the server's configured taskservs
#
# Per-taskserv failures are printed and passed to state-node-finish (without
# --success); they do not abort the run unless the taskserv's on_error is "Stop",
# which stops the remaining taskservs on that server. Returns true at the end.
export def on_taskservs [
    settings: record
    match_taskserv: string
    match_taskserv_profile: string
    match_server: string
    iptype: string
    check: bool
    upload: bool = false
    reset: bool = false
    cmd: string = ""
    force_delete: bool = false
] {
    _print $"Running (_ansi yellow_bold)taskservs(_ansi reset) ..."
    let provisioning_sops = ($env.PROVISIONING_SOPS? | default "")
    if $provisioning_sops == "" {
        # A SOPS load env
        $env.CURRENT_INFRA_PATH = $settings.infra_path
        use ../sops_env.nu
    }
    let ip_type = if $iptype == "" { "public" } else { $iptype }
    # The "NOW" token is only substituted when $env.NOW is set. Reading $env.NOW
    # unconditionally aborted the run whenever it was unset, even for paths
    # without the token.
    let now_value = ($env.NOW? | default "NOW")
    let str_created_taskservs_dirpath = ($settings.data | get -o created_taskservs_dirpath | default "/tmp"
        | str replace "./" $"($settings.src_path)/" | str replace "~" $env.HOME | str replace "NOW" $now_value
    )
    let created_taskservs_dirpath = if ($str_created_taskservs_dirpath | str starts-with "/") {
        $str_created_taskservs_dirpath
    } else {
        $settings.src_path | path join $str_created_taskservs_dirpath
    }
    let root_wk_server = ($created_taskservs_dirpath | path join "on-server")
    if not ($root_wk_server | path exists) { ^mkdir "-p" $root_wk_server }
    let dflt_clean_created_taskservs = ($settings.data.clean_created_taskservs? | default $created_taskservs_dirpath
        | str replace "./" $"($settings.src_path)/" | str replace "~" $env.HOME
    )
    $settings.data.servers
    | enumerate
    | where {|it|
        $match_server == "" or $it.item.hostname == $match_server
    }
    | each {|it|
        let server_pos = $it.index
        let srvr = $it.item
        _print $"on (_ansi green_bold)($srvr.hostname)(_ansi reset) pos ($server_pos) ..."
        let clean_created_taskservs = ($srvr | get -o clean_created_taskservs | default $dflt_clean_created_taskservs)
        # Determine IP address — loopback in check mode, unless upload inspection
        # needs the real host; otherwise the provider IP once the server is running.
        let ip = if (is-debug-check-enabled) or ($check and not $upload) {
            "127.0.0.1"
        } else {
            let curr_ip = (mw_get_ip $settings $srvr $ip_type false | default "")
            if $curr_ip == "" {
                _print $"🛑 No IP ($ip_type) found for (_ansi green_bold)($srvr.hostname)(_ansi reset) ($server_pos) "
                null
            } else if not (wait_for_server $server_pos $srvr $settings $curr_ip) {
                _print $"🛑 server ($srvr.hostname) ($curr_ip) (_ansi red_bold)not in running state(_ansi reset)"
                null
            } else {
                $curr_ip
            }
        }
        # Process server only if we have valid IP
        if ($ip != null) {
            let server = ($srvr | merge { ip_addresses: { pub: $ip, priv: ($srvr | get -o network_private_ip | default ($srvr | get -o networking.private_ip | default "")) }})
            let wk_server = ($root_wk_server | path join $server.hostname)
            let workspace_path = ($settings.src_path? | default $env.PWD)
            let dag_deps = (load-dag-deps $settings $server.hostname)
            if ($wk_server | path exists) { rm -rf $wk_server }
            ^mkdir "-p" $wk_server
            let taskserv_list = if $force_delete and $cmd == "delete" {
                # --force: build taskserv list from state file, servers.ncl, or explicit name.
                # Covers: removed from servers.ncl, never tracked in state, or both.
                let st_taskservs = (state-read $workspace_path
                    | get -o servers | default {}
                    | get -o $server.hostname | default {}
                    | get -o taskservs | default {})
                let from_state = ($st_taskservs | transpose name state_data | where {|it|
                    let matches_taskserv = ($match_taskserv == "" or $match_taskserv == $it.name)
                    let matches_profile = ($match_taskserv_profile == "" or $match_taskserv_profile == ($it.state_data.profile? | default "default"))
                    $matches_taskserv and $matches_profile
                } | each {|it| $it.name })
                # If explicit taskserv requested but not found in state, force-create a synthetic entry
                let names = if ($match_taskserv | is-not-empty) and $match_taskserv not-in $from_state {
                    $from_state | append $match_taskserv
                } else {
                    $from_state
                }
                $names | enumerate | each {|it| {
                    index: $it.index,
                    item: {
                        name: $it.item,
                        install_mode: "library",
                        profile: ($st_taskservs | get -o $it.item | default {} | get -o profile | default "default"),
                        target_save_path: "",
                        depends_on: [],
                        on_error: "Continue",
                        max_retries: 0,
                        params: {},
                    },
                }}
            } else {
                $server.taskservs | enumerate | where {|it|
                    let taskserv = $it.item
                    let matches_taskserv = ($match_taskserv == "" or $match_taskserv == $taskserv.name)
                    let matches_profile = ($match_taskserv_profile == "" or $match_taskserv_profile == ($taskserv.profile? | default ""))
                    $matches_taskserv and $matches_profile
                }
            }
            mut stop_on_error = false
            for entry in $taskserv_list {
                if $stop_on_error { break }
                let taskserv = $entry.item
                let taskserv_pos = $entry.index
                let taskservs_path = (get-taskservs-path)
                let taskserv_dir = (find-taskserv-path $taskservs_path $taskserv.name)
                # Check if taskserv path exists - skip if not found
                if ($taskserv_dir | is-empty) {
                    _print $"taskserv path: ($taskservs_path)/($taskserv.name) (_ansi red_bold)not found(_ansi reset)"
                } else {
                    # ── Resolve effective taskserv (cmd_task override) ────────────
                    let effective_taskserv = if ($cmd | is-not-empty) {
                        $taskserv | merge { cmd_task: $cmd }
                    } else if $reset {
                        $taskserv | merge { cmd_task: "reinstall" }
                    } else {
                        $taskserv
                    }
                    # Derive operation label and whether this is a deploy (install/reinstall)
                    # vs a maintenance op (update, scripts, restart, config, remove).
                    let effective_cmd = ($effective_taskserv.cmd_task? | default "install")
                    let effective_operation = match $effective_cmd {
                        "install" | "reinstall" => "create",
                        $op => $op,
                    }
                    # Only fresh installs go through the state-gate.
                    # reinstall/reset always runs regardless of current state.
                    let is_deploy = $effective_cmd == "install"
                    # ── State gate (fresh install only) ──────────────────────────
                    # reinstall, update, scripts, restart, config bypass the gate.
                    if not $check {
                        if $is_deploy {
                            let depends_on = ($dag_deps | get -o $taskserv.name | default [])
                            let decision = (state-node-decision-with-deps $workspace_path $server.hostname $taskserv.name $depends_on)
                            match $decision {
                                "skip" => {
                                    let node = (state-node-get $workspace_path $server.hostname $taskserv.name)
                                    _print $"⊘ ($taskserv.name) on ($server.hostname) — state=completed \(ended ($node.ended_at? | default '?')). Run reset first."
                                    continue
                                },
                                $d if ($d | str starts-with "blocked:") => {
                                    let blocker = ($d | str replace "blocked:" "")
                                    _print $"⛔ ($taskserv.name) on ($server.hostname) — blocked by ($blocker) \(not completed)"
                                    continue
                                },
                                "rerun" => {
                                    _print $"↻ ($taskserv.name) on ($server.hostname) — failed, re-running"
                                },
                                _ => {},
                            }
                        } else {
                            _print $"↺ ($taskserv.name) on ($server.hostname) — ($effective_cmd)"
                        }
                        let actor = ($env.USER? | default "system")
                        let profile = ($taskserv.profile? | default "")
                        state-node-start $workspace_path $server.hostname $taskserv.name --actor $actor --source "orchestrator" --operation $effective_operation --profile $profile
                    }
                    # ─────────────────────────────────────────────────────────────
                    # Taskserv path exists, proceed with processing
                    if not ($wk_server | path join $taskserv.name | path exists) { ^mkdir "-p" ($wk_server | path join $taskserv.name) }
                    # Empty/missing profile and install mode fall back to "default" / "library".
                    let raw_profile = ($taskserv.profile? | default "")
                    let taskserv_profile = if $raw_profile == "" { "default" } else { $raw_profile }
                    let raw_install_mode = ($taskserv.install_mode? | default "")
                    let taskserv_install_mode = if $raw_install_mode == "" { "library" } else { $raw_install_mode }
                    let server_taskserv_path = ($server.hostname | path join $taskserv_profile)
                    let defs = {
                        settings: $settings, server: $server, taskserv: $effective_taskserv,
                        taskserv_install_mode: $taskserv_install_mode, taskserv_profile: $taskserv_profile,
                        taskserv_dir: $taskserv_dir,
                        pos: { server: $"($server_pos)", taskserv: $taskserv_pos}, ip: $ip, check: $check }
                    # Enhanced check mode
                    if $check {
                        let check_result = (run-check-mode $taskserv.name $taskserv_profile $settings $server --verbose=(is-debug-enabled))
                        if not $check_result.overall_valid {
                            _print $"(_ansi red)⊘ Skipping deployment due to validation errors(_ansi reset)"
                        }
                        if $upload {
                            run-upload-inspection $defs --verbose=(is-debug-enabled)
                        }
                    } else {
                        # Normal installation mode — dispatch on the *defaulted* install mode;
                        # false = failure (including an unknown mode).
                        let install_ok = (install-by-mode $taskserv_install_mode $defs $server_taskserv_path $wk_server)
                        if not $install_ok {
                            _print $"🛑 ($taskserv.name) on ($server.hostname) failed"
                            state-node-finish $workspace_path $server.hostname $taskserv.name --source "orchestrator"
                            if ($taskserv.on_error? | default "Continue") == "Stop" {
                                $stop_on_error = true
                            }
                            continue
                        }
                    }
                    # Write state after successful execution:
                    #   delete    → remove the node from state
                    #   reinstall → reset-only: back to pending so the next install goes through the gate
                    #   otherwise → mark completed
                    if not $check {
                        if $effective_cmd == "delete" {
                            state-node-delete $workspace_path $server.hostname $taskserv.name
                        } else if $effective_cmd == "reinstall" {
                            state-node-reset $workspace_path $server.hostname $taskserv.name --source "orchestrator" --actor ($env.USER? | default "system")
                        } else {
                            state-node-finish $workspace_path $server.hostname $taskserv.name --success --source "orchestrator"
                        }
                    }
                    if $clean_created_taskservs == "yes" { rm -rf ($wk_server | path join $taskserv.name) }
                }
            }
            if $clean_created_taskservs == "yes" { rm -rf $wk_server }
            _print $"Tasks completed on ($server.hostname)"
        }
    }
    if ("/tmp/k8s_join.sh" | path exists) { cp "/tmp/k8s_join.sh" $root_wk_server ; rm -r /tmp/k8s_join.sh }
    if $dflt_clean_created_taskservs == "yes" { rm -rf $root_wk_server }
    _print $"✅ Tasks (_ansi green_bold)completed(_ansi reset) ($match_server) ($match_taskserv) ($match_taskserv_profile) ....."
    # Show next-step hints after successful taskserv installation
    if not $check and ($match_taskserv | is-not-empty) {
        show-next-step "taskserv_create" {name: $match_taskserv}
    }
    true
}