use utils.nu *
# REMOVED: use lib_provisioning * - causes circular import
use run.nu *
use check_mode.nu *
use ../lib_provisioning/config/accessor.nu *
use ../lib_provisioning/utils/logging.nu [is-debug-enabled, is-debug-check-enabled]
use ../servers/utils.nu [servers_selector, wait_for_server]
use ../lib_provisioning/utils/hints.nu *
use ../workspace/state.nu *
use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval]

# Resolve a taskserv directory under `taskservs_path`.
#
# Lookup order:
#   1. direct (flat) child: {taskservs_path}/{name}
#   2. one level of category subdirectories: {taskservs_path}/{category}/{name}
# Both steps also try the underscore variant of a hyphenated name
# (vol-prepare → vol_prepare), after the name as given.
#
# Returns the first existing path, or "" when nothing matches or
# `taskservs_path` itself does not exist.
def find-taskserv-path [taskservs_path: string, name: string]: nothing -> string {
    let alt = ($name | str replace --all "-" "_")
    let names = if $alt != $name { [$name, $alt] } else { [$name] }

    # Flat layout: the taskserv sits directly under the root.
    for n in $names {
        let direct = ($taskservs_path | path join $n)
        if ($direct | path exists) { return $direct }
    }

    if not ($taskservs_path | path exists) { return "" }

    # Hierarchical layout: look inside each directory found under the root.
    for n in $names {
        let found = (
            do -i { ls $taskservs_path } |
            where type == "dir" |
            each {|cat|
                let candidate = ($cat.name | path join $n)
                if ($candidate | path exists) { $candidate } else { null }
            } |
            compact
        )
        if ($found | is-not-empty) { return ($found | first) }
    }
    ""
}

#use ../extensions/taskservs/run.nu run_taskserv

# Print an install banner and run the taskserv from the run-taskservs tree.
#
# The source path is (get-run-taskservs-path)/{taskserv name}/{server_taskserv_path};
# the working path is {wk_server}/{taskserv name}.
# Returns the result of `run_taskserv` (declared as bool).
def install_from_server [
    defs: record
    server_taskserv_path: string
    wk_server: string
]: nothing -> bool {
    _print (
        $"(_ansi yellow_bold)($defs.taskserv.name)(_ansi reset) (_ansi default_dimmed)on(_ansi reset) " +
        $"($defs.server.hostname) (_ansi default_dimmed)install(_ansi reset) " +
        $"(_ansi purple_bold)from ($defs.taskserv_install_mode)(_ansi reset)"
    )
    let run_taskservs_path = (get-run-taskservs-path)
    let source_path = ($run_taskservs_path | path join $defs.taskserv.name | path join $server_taskserv_path)
    let work_path = ($wk_server | path join $defs.taskserv.name)
    run_taskserv $defs $source_path $work_path
}

# Print an install banner and run the taskserv from the taskservs library.
#
# The source path is {resolved taskserv dir}/{defs.taskserv_profile}, where the
# directory is resolved with `find-taskserv-path`; the working path is
# {wk_server}/{taskserv name}. `server_taskserv_path` is accepted for signature
# parity with `install_from_server` but is not used here.
# Returns the result of `run_taskserv` (declared as bool).
def install_from_library [
    defs: record
    server_taskserv_path: string
    wk_server: string
]: nothing -> bool {
    _print (
        $"(_ansi yellow_bold)($defs.taskserv.name)(_ansi reset) (_ansi default_dimmed)on(_ansi reset) " +
        $"($defs.server.hostname) (_ansi default_dimmed)install(_ansi reset) " +
        $"(_ansi purple_bold)from library(_ansi reset)"
    )
    let taskservs_path = (get-taskservs-path)
    let taskserv_dir = (find-taskserv-path $taskservs_path $defs.taskserv.name)
    let source_path = ($taskserv_dir | path join $defs.taskserv_profile)
    let work_path = ($wk_server | path join $defs.taskserv.name)
    run_taskserv $defs $source_path $work_path
}

# Build a map of taskserv_name → [depends_on taskserv_names] from a formula DAG.
# Reads {settings.infra_path}/dag.ncl and picks the formula whose formula_id
# equals "{hostname}-formula".
# Returns {} if the DAG file does not exist, cannot be evaluated, or the
# formula is not found.
def load-dag-deps [settings: record, hostname: string]: nothing -> record {
    let dag_path = ($settings.infra_path | path join "dag.ncl")
    if not ($dag_path | path exists) { return {} }

    let prov_path = ($env.PROVISIONING? | default "/usr/local/provisioning")
    let dag = (try { ncl-eval $dag_path [$prov_path] } catch { return {} })

    let formula_id = $"($hostname)-formula"
    let formula = (
        $dag.composition?.formulas? |
        default [] |
        where {|f| $f.formula_id? == $formula_id } |
        get 0?
    )
    if ($formula | is-empty) { return {} }

    # Build map: taskserv_name → [dep_taskserv_names]
    # Formula nodes have: { id, taskserv: {name}, depends_on: [{node_id}] }
    # We need to resolve node_id → taskserv.name via the nodes list.
    let nodes = ($formula.nodes? | default [])
    let id_to_name = ($nodes | each {|n|
        { id: $n.id, name: ($n.taskserv?.name? | default $n.id) }
    })

    $nodes | each {|n|
        let ts_name = ($n.taskserv?.name? | default $n.id)
        let dep_names = ($n.depends_on? | default [] | each {|d|
            # Optional index access: yields null (not an error) when no node
            # carries this id, in which case the raw node_id is kept.
            let resolved = ($id_to_name | where id == $d.node_id | get 0?)
            if ($resolved | is-not-empty) { $resolved.name } else { $d.node_id }
        })
        { $ts_name: $dep_names }
    } | reduce -f {} {|it, acc| $acc | merge $it }
}

# Run taskservs on the servers declared in `settings.data.servers`.
#
# Parameters:
#   settings               - settings record; this function reads infra_path, src_path,
#                            data.servers, data.created_taskservs_dirpath and
#                            data.clean_created_taskservs from it.
#   match_taskserv         - taskserv name filter; "" matches every taskserv.
#   match_taskserv_profile - profile filter; "" matches every profile.
#   match_server           - hostname filter; "" matches every server.
#   iptype                 - IP type passed to mw_get_ip; "" falls back to "public".
#   check                  - check mode: state tracking and installs are skipped,
#                            run-check-mode is called instead.
#   upload                 - in check mode, resolve the real server IP and call
#                            run-upload-inspection.
#   reset                  - when `cmd` is empty, run each taskserv with cmd_task "reinstall".
#   cmd                    - explicit cmd_task override (takes precedence over `reset`).
#   force_delete           - together with cmd == "delete", build the taskserv list from
#                            the workspace state file (plus the explicit name) instead of
#                            the server definition.
#
# Always returns true; per-taskserv failures are reported via _print and the
# workspace state, and only stop the remaining taskservs of a server when the
# failing taskserv has on_error == "Stop".
export def on_taskservs [
    settings: record
    match_taskserv: string
    match_taskserv_profile: string
    match_server: string
    iptype: string
    check: bool
    upload: bool = false
    reset: bool = false
    cmd: string = ""
    force_delete: bool = false
] {
    _print $"Running (_ansi yellow_bold)taskservs(_ansi reset) ..."
    let provisioning_sops = ($env.PROVISIONING_SOPS? | default "")
    if $provisioning_sops == "" {
        # A SOPS load env
        $env.CURRENT_INFRA_PATH = $settings.infra_path
        use ../sops_env.nu
    }
    let ip_type = if $iptype == "" { "public" } else { $iptype }

    # Working directory root: expand "./", "~" and the NOW placeholder, then
    # anchor relative paths at settings.src_path.
    let str_created_taskservs_dirpath = (
        $settings.data | get -o created_taskservs_dirpath | default (["/tmp"] | path join) |
        str replace "./" $"($settings.src_path)/" | str replace "~" $env.HOME | str replace "NOW" $env.NOW
    )
    let created_taskservs_dirpath = if ($str_created_taskservs_dirpath | str starts-with "/") {
        $str_created_taskservs_dirpath
    } else {
        $settings.src_path | path join $str_created_taskservs_dirpath
    }
    let root_wk_server = ($created_taskservs_dirpath | path join "on-server")
    if not ($root_wk_server | path exists) { ^mkdir "-p" $root_wk_server }
    let dflt_clean_created_taskservs = (
        $settings.data.clean_created_taskservs? | default $created_taskservs_dirpath |
        str replace "./" $"($settings.src_path)/" | str replace "~" $env.HOME
    )
    let run_ops = if (is-debug-enabled) { "bash -x" } else { "" }

    $settings.data.servers | enumerate | where {|it| $match_server == "" or $it.item.hostname == $match_server } | each {|it|
        let server_pos = $it.index
        let srvr = $it.item
        _print $"on (_ansi green_bold)($srvr.hostname)(_ansi reset) pos ($server_pos) ..."
        let clean_created_taskservs = (
            $settings.data.servers | get $server_pos | get -o clean_created_taskservs |
            default $dflt_clean_created_taskservs
        )

        # Determine IP address — resolve real IP when upload inspection is requested
        let ip = if (is-debug-check-enabled) or ($check and not $upload) {
            "127.0.0.1"
        } else {
            let curr_ip = (mw_get_ip $settings $srvr $ip_type false | default "")
            if $curr_ip == "" {
                _print $"🛑 No IP ($ip_type) found for (_ansi green_bold)($srvr.hostname)(_ansi reset) ($server_pos) "
                null
            } else {
                # Check if server is in running state
                if not (wait_for_server $server_pos $srvr $settings $curr_ip) {
                    _print $"🛑 server ($srvr.hostname) ($curr_ip) (_ansi red_bold)not in running state(_ansi reset)"
                    null
                } else {
                    $curr_ip
                }
            }
        }

        # Process server only if we have valid IP
        if ($ip != null) {
            let server = ($srvr | merge {
                ip_addresses: {
                    pub: $ip,
                    priv: ($srvr | get -o network_private_ip | default ($srvr | get -o networking.private_ip | default ""))
                }
            })
            let wk_server = ($root_wk_server | path join $server.hostname)
            let workspace_path = ($settings.src_path? | default $env.PWD)
            let dag_deps = (load-dag-deps $settings $server.hostname)

            # Start from an empty per-server working directory.
            if ($wk_server | path exists) { rm -rf $wk_server }
            ^mkdir "-p" $wk_server

            let taskserv_list = if $force_delete and $cmd == "delete" {
                # --force: build taskserv list from state file, servers.ncl, or explicit name.
                # Covers: removed from servers.ncl, never tracked in state, or both.
                let st_taskservs = (
                    state-read $workspace_path | get -o servers | default {} |
                    get -o $server.hostname | default {} |
                    get -o taskservs | default {}
                )
                let from_state = ($st_taskservs | transpose name state_data | where {|row|
                    let matches_taskserv = ($match_taskserv == "" or $match_taskserv == $row.name)
                    let matches_profile = ($match_taskserv_profile == "" or $match_taskserv_profile == ($row.state_data.profile? | default "default"))
                    $matches_taskserv and $matches_profile
                } | each {|row| $row.name })
                # If explicit taskserv requested but not found in state, force-create a synthetic entry
                let names = if ($match_taskserv | is-not-empty) and $match_taskserv not-in $from_state {
                    $from_state | append $match_taskserv
                } else {
                    $from_state
                }
                $names | enumerate | each {|entry| {
                    index: $entry.index,
                    item: {
                        name: $entry.item,
                        install_mode: "library",
                        profile: ($st_taskservs | get -o $entry.item | default {} | get -o profile | default "default"),
                        target_save_path: "",
                        depends_on: [],
                        on_error: "Continue",
                        max_retries: 0,
                        params: {},
                    },
                }}
            } else {
                $server.taskservs | enumerate | where {|entry|
                    let taskserv = $entry.item
                    let matches_taskserv = ($match_taskserv == "" or $match_taskserv == $taskserv.name)
                    let matches_profile = ($match_taskserv_profile == "" or $match_taskserv_profile == $taskserv.profile)
                    $matches_taskserv and $matches_profile
                }
            }

            mut stop_on_error = false
            for entry in $taskserv_list {
                if $stop_on_error { break }
                let taskserv = $entry.item
                let taskserv_pos = $entry.index
                let taskservs_path = (get-taskservs-path)
                let taskserv_dir = (find-taskserv-path $taskservs_path $taskserv.name)

                # Check if taskserv path exists - skip if not found
                if ($taskserv_dir | is-empty) {
                    _print $"taskserv path: ($taskservs_path)/($taskserv.name) (_ansi red_bold)not found(_ansi reset)"
                } else {
                    # ── Resolve effective taskserv (cmd_task override) ────────────
                    let effective_taskserv = if ($cmd | is-not-empty) {
                        $taskserv | merge { cmd_task: $cmd }
                    } else if $reset {
                        $taskserv | merge { cmd_task: "reinstall" }
                    } else {
                        $taskserv
                    }

                    # Derive operation label and whether this is a deploy (install/reinstall)
                    # vs a maintenance op (update, scripts, restart, config, remove).
                    let effective_cmd = ($effective_taskserv.cmd_task? | default "install")
                    let effective_operation = match $effective_cmd {
                        "install" | "reinstall" => "create",
                        $op => $op,
                    }

                    # Only fresh installs go through the state-gate.
                    # reinstall/reset always runs regardless of current state.
                    let is_deploy = $effective_cmd == "install"

                    # ── State gate (fresh install only) ──────────────────────────
                    # reinstall, update, scripts, restart, config bypass the gate.
                    if not $check {
                        if $is_deploy {
                            let depends_on = ($dag_deps | get -o $taskserv.name | default [])
                            let decision = (state-node-decision-with-deps $workspace_path $server.hostname $taskserv.name $depends_on)
                            match $decision {
                                "skip" => {
                                    let node = (state-node-get $workspace_path $server.hostname $taskserv.name)
                                    _print $"⊘ ($taskserv.name) on ($server.hostname) — state=completed \(ended ($node.ended_at? | default '?')). Run reset first."
                                    continue
                                },
                                $d if ($d | str starts-with "blocked:") => {
                                    let blocker = ($d | str replace "blocked:" "")
                                    _print $"⛔ ($taskserv.name) on ($server.hostname) — blocked by ($blocker) \(not completed)"
                                    continue
                                },
                                "rerun" => {
                                    _print $"↻ ($taskserv.name) on ($server.hostname) — failed, re-running"
                                },
                                _ => {},
                            }
                        } else {
                            _print $"↺ ($taskserv.name) on ($server.hostname) — ($effective_cmd)"
                        }
                        let actor = ($env.USER? | default "system")
                        let profile = ($taskserv.profile? | default "")
                        state-node-start $workspace_path $server.hostname $taskserv.name --actor $actor --source "orchestrator" --operation $effective_operation --profile $profile
                    }
                    # ─────────────────────────────────────────────────────────────

                    # Taskserv path exists, proceed with processing
                    let wk_taskserv = ($wk_server | path join $taskserv.name)
                    if not ($wk_taskserv | path exists) { ^mkdir "-p" $wk_taskserv }

                    # Normalise profile / install mode: a missing or empty value
                    # falls back to "default" / "library".
                    let raw_profile = ($taskserv.profile? | default "")
                    let taskserv_profile = if $raw_profile == "" { "default" } else { $raw_profile }
                    let raw_install_mode = ($taskserv.install_mode? | default "")
                    let taskserv_install_mode = if $raw_install_mode == "" { "library" } else { $raw_install_mode }

                    let server_taskserv_path = ($server.hostname | path join $taskserv_profile)
                    let defs = {
                        settings: $settings, server: $server, taskserv: $effective_taskserv,
                        taskserv_install_mode: $taskserv_install_mode, taskserv_profile: $taskserv_profile,
                        taskserv_dir: $taskserv_dir,
                        pos: { server: $"($server_pos)", taskserv: $taskserv_pos }, ip: $ip, check: $check
                    }

                    # Enhanced check mode
                    if $check {
                        let check_result = (run-check-mode $taskserv.name $taskserv_profile $settings $server --verbose=(is-debug-enabled))
                        # A passing check needs no action; only report failures.
                        if not $check_result.overall_valid {
                            _print $"(_ansi red)⊘ Skipping deployment due to validation errors(_ansi reset)"
                        }
                        if $upload {
                            run-upload-inspection $defs --verbose=(is-debug-enabled)
                        }
                    } else {
                        # Normal installation mode — functions return bool; false = failure.
                        # Dispatch on the normalised mode so an empty install_mode runs as
                        # "library" instead of matching no arm.
                        let install_ok = match $taskserv_install_mode {
                            "server" | "getfile" => {
                                (install_from_server $defs $server_taskserv_path $wk_server)
                            },
                            "library-server" => {
                                let a = (install_from_library $defs $server_taskserv_path $wk_server)
                                let b = (install_from_server $defs $server_taskserv_path $wk_server)
                                $a and $b
                            },
                            "server-library" => {
                                let a = (install_from_server $defs $server_taskserv_path $wk_server)
                                let b = (install_from_library $defs $server_taskserv_path $wk_server)
                                $a and $b
                            },
                            # "local": runs install script on the provisioning machine (not via SSH).
                            # Used for tools like k0sctl that manage their own remote connections.
                            "library" | "local" => {
                                (install_from_library $defs $server_taskserv_path $wk_server)
                            },
                            _ => {
                                # Unknown mode: treat as a failed install rather than
                                # leaving install_ok null and erroring on `not`.
                                _print $"🛑 ($taskserv.name) on ($server.hostname) unknown install mode '($taskserv_install_mode)'"
                                false
                            },
                        }
                        if not $install_ok {
                            _print $"🛑 ($taskserv.name) on ($server.hostname) failed"
                            state-node-finish $workspace_path $server.hostname $taskserv.name --source "orchestrator"
                            if ($taskserv.on_error? | default "Continue") == "Stop" { $stop_on_error = true }
                            continue
                        }
                    }

                    # Write completed state after successful execution.
                    # reinstall = reset-only: transition back to pending so the next
                    # install create goes through the gate normally.
                    if not $check {
                        if $effective_cmd == "delete" {
                            state-node-delete $workspace_path $server.hostname $taskserv.name
                        } else if $effective_cmd == "reinstall" {
                            state-node-reset $workspace_path $server.hostname $taskserv.name --source "orchestrator" --actor ($env.USER? | default "system")
                        } else {
                            state-node-finish $workspace_path $server.hostname $taskserv.name --success --source "orchestrator"
                        }
                    }
                    if $clean_created_taskservs == "yes" { rm -rf ($wk_server | path join $taskserv.name) }
                }
            }
            if $clean_created_taskservs == "yes" { rm -rf $wk_server }
            _print $"Tasks completed on ($server.hostname)"
        }
    }

    # Keep a copy of the join script, if one exists at this fixed path, in the working root.
    if ("/tmp/k8s_join.sh" | path exists) { cp "/tmp/k8s_join.sh" $root_wk_server ; rm -r /tmp/k8s_join.sh }
    if $dflt_clean_created_taskservs == "yes" { rm -rf $root_wk_server }
    _print $"✅ Tasks (_ansi green_bold)completed(_ansi reset) ($match_server) ($match_taskserv) ($match_taskserv_profile) ....."

    # Show next-step hints after successful taskserv installation
    if not $check and ($match_taskserv | is-not-empty) {
        show-next-step "taskserv_create" {name: $match_taskserv}
    }
    true
}