prvng_core/nulib/taskservs/dag-executor.nu
Jesús Pérez 894046ef5a
feat(core): three-layer DAG, unified component arch, commands-registry cache, Nushell 0.112.2 migration
  - DAG architecture: `dag show/validate/export` (nulib/main_provisioning/dag.nu),
    config loader (lib_provisioning/config/loader/dag.nu), taskserv dag-executor.
    Backed by schemas/lib/dag/*.ncl; orchestrator emits NATS events via
    WorkspaceComposition::into_workflow. See ADR-020, ADR-021.
  - Unified Component Architecture: components/mod.nu, main_provisioning/
    {components,workflow,extensions,ontoref-queries}.nu. Full workflow engine with
    topological sort and NATS subject emission. Blocks A-H complete (libre-daoshi).
  - Commands-registry: nulib/commands-registry.ncl (Nickel source, 314 lines) +
    JSON cache at ~/.cache/provisioning/commands-registry.json rebuilt on source
    change. cli/provisioning fast-path alias expansion avoids cold Nu startup.
    ADDING_COMMANDS.md documents new-command workflow.
  - Platform service manager: service-manager.nu (+573), startup.nu (+611),
    service-check.nu (+255); autostart/bootstrap/health/target refactored.
  - Nushell 0.112.2 migration: removed all try/catch and bash redirections;
    external commands prefixed with ^; type signatures enforced. Driven by
    scripts/refactor-try-catch{,-simplified}.nu.
  - TTY stack: removed shlib/*-tty.sh; replaced by cli/tty-dispatch.sh,
    tty-filter.sh, tty-commands.conf.
  - New domain modules: images/ (golden image lifecycle), workspace/{state,sync}.nu,
    main_provisioning/{bootstrap,cluster-deploy,fip,state}.nu, commands/{state,
    build,integrations/auth,utilities/alias}.nu, platform.nu expanded (+874).
  - Config loader overhaul: loader/core.nu slimmed (-759), cache/core.nu
    refactored (-454), removed legacy loaders/file_loader.nu (-330).
  - Thirteen new provisioning-<domain>.nu top-level modules for bash dispatcher.
  - Tests: test_workspace_state.nu (+351); updates to test_oci_registry,
    test_services.
  - README + CHANGELOG updated.
2026-04-17 04:27:33 +01:00

315 lines
13 KiB
Text

# dag-executor.nu — DAG-aware taskserv execution
#
# Resolves cross-formula dependencies from dag.ncl before executing taskservs.
# When a user runs `provisioning taskserv create kubernetes`, this module:
# 1. Finds which formulas contain the requested taskserv
# 2. Walks the DAG backwards to collect all prerequisite formulas
# 3. Checks state to skip already-completed formulas
# 4. Executes pending formulas in topological order with health gates
#
# Falls back to direct execution when no dag.ncl exists.
use handlers.nu *
use ../workspace/state.nu *
use ../lib_provisioning/config/accessor.nu *
use ../lib_provisioning/utils/ssh.nu [ssh_cmd]
use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval]
# Load dag.ncl from the infra directory and build a unified execution model.
#
# Returns `{ has_dag: false }` when dag.ncl is missing, fails to evaluate, or
# declares no formulas. Otherwise returns `{ has_dag: true, formulas: [...] }`
# where each formula record carries:
#   id, server, nodes, max_parallel (default 4)        — from dag.formulas
#   depends_on ([]), parallel (false), health_gate (null)
#                                                      — from the matching
#     dag.composition.formulas entry (keyed by formula_id), when one exists.
# Formulas live in dag.ncl (moved from servers.ncl in the unified component
# architecture), so servers.ncl is not read here.
export def load-dag [settings: record]: nothing -> record {
    let dag_path = ($settings.infra_path | path join "dag.ncl")
    let prov_root = ($env.PROVISIONING? | default "/usr/local/provisioning")
    if not ($dag_path | path exists) { return { has_dag: false } }

    # Evaluation failure is treated as "no DAG" so callers fall back to
    # direct execution instead of aborting.
    let dag = (try { ncl-eval $dag_path [$prov_root] } catch { null })
    if $dag == null { return { has_dag: false } }

    let raw_formulas = ($dag | get -o formulas | default [])
    if ($raw_formulas | is-empty) { return { has_dag: false } }

    # DAG metadata is optional: a dag.ncl without `composition` (or an entry
    # without some field) yields the defaults instead of an error.
    let composition = ($dag | get -o composition.formulas | default [])

    let formula_map = ($raw_formulas | each {|f|
        let meta = ($composition | where formula_id == $f.id | get 0? | default {})
        {
            id: $f.id,
            server: $f.server,
            nodes: $f.nodes,
            max_parallel: ($f.max_parallel? | default 4),
            depends_on: ($meta.depends_on? | default []),
            parallel: ($meta.parallel? | default false),
            health_gate: ($meta.health_gate? | default null),
        }
    })
    { has_dag: true, formulas: $formula_map }
}
# Formula lookup helpers: node-name and find-formulas-for-taskserv.
# Return the taskserv/component name of a formula node.
# Nodes come in two shapes: `{ taskserv: { name } }` or `{ component: { name } }`.
# The taskserv shape wins when both are present; "" when neither carries a name.
def node-name [n: record]: nothing -> string {
    let via_taskserv = ($n | get -o taskserv | get -o name)
    if $via_taskserv != null {
        $via_taskserv
    } else {
        $n | get -o component | get -o name | default ""
    }
}
# Find the formulas that contain a given taskserv, optionally restricted to
# one server. An empty `taskserv_name` or `server_filter` is a wildcard for
# that criterion, so `""` for both selects every formula in the DAG. This lets
# a whole-server `taskserv create` (no taskserv name) target all its formulas.
def find-formulas-for-taskserv [dag: record, taskserv_name: string, server_filter: string]: nothing -> list {
    $dag.formulas | where {|f|
        let matches_taskserv = ($taskserv_name == "" or ($f.nodes | any {|n| (node-name $n) == $taskserv_name }))
        let matches_server = ($server_filter == "" or $f.server == $server_filter)
        $matches_taskserv and $matches_server
    }
}
# Walk the DAG backwards from target formulas to collect all prerequisites.
# Returns formula ids in topological order (prerequisites first). Among
# formulas whose prerequisites are satisfied, dag.formulas order is kept.
#
# Errors (via `error make`) when a dependency names a formula that is not in
# dag.formulas, or when the collected formulas contain a dependency cycle,
# instead of silently leaving those formulas out of the execution plan.
def resolve-prerequisites [dag: record, target_ids: list<string>]: nothing -> list<string> {
    # Breadth-first walk: collect the transitive closure of dependencies.
    mut visited = []
    mut queue = $target_ids
    while ($queue | is-not-empty) {
        let current = ($queue | first)
        $queue = ($queue | skip 1)
        if $current in $visited { continue }
        $visited = ($visited | append $current)
        let formula = ($dag.formulas | where id == $current | get 0?)
        if ($formula | is-not-empty) {
            for dep in $formula.depends_on {
                if $dep.formula_id not-in $visited {
                    $queue = ($queue | append $dep.formula_id)
                }
            }
        }
    }

    # Nushell closures cannot capture `mut` variables; freeze the result first.
    let reached = $visited
    let subset = ($dag.formulas | where {|f| $f.id in $reached })
    let known = ($subset | each {|f| $f.id } | uniq)
    let unknown = ($reached | where {|fid| $fid not-in $known })
    if ($unknown | is-not-empty) {
        error make { msg: $"DAG references unknown formulas: ($unknown | str join ', ')" }
    }

    # Kahn's algorithm. In-degree counts *distinct* prerequisites so a
    # duplicated depends_on entry is balanced by the single decrement below.
    mut in_degree = ($subset | reduce -f {} {|f, acc|
        $acc | upsert $f.id ($f.depends_on | each {|d| $d.formula_id } | uniq | length)
    })
    mut sorted = []
    mut ready = ($in_degree | transpose k v | where v == 0 | each {|it| $it.k })
    while ($ready | is-not-empty) {
        let node = ($ready | first)
        $ready = ($ready | skip 1)
        $sorted = ($sorted | append $node)
        # Release every formula that depends on the node just emitted.
        for f in $subset {
            if ($f.depends_on | any {|d| $d.formula_id == $node }) {
                let remaining = (($in_degree | get $f.id) - 1)
                $in_degree = ($in_degree | upsert $f.id $remaining)
                if $remaining == 0 {
                    $ready = ($ready | append $f.id)
                }
            }
        }
    }

    # A formula that never reached in-degree 0 sits on (or behind) a cycle.
    let ordered = $sorted
    if ($ordered | length) < ($known | length) {
        let stuck = ($subset | where {|f| $f.id not-in $ordered } | each {|f| $f.id } | uniq)
        error make { msg: $"DAG dependency cycle among formulas: ($stuck | str join ', ')" }
    }
    $ordered
}
# Report whether every node of `formula` is recorded as "completed" in the
# workspace state read via state-read. A node with no state entry counts as
# "pending"; a formula with no nodes is trivially complete.
def formula-completed [workspace_path: string, formula: record]: nothing -> bool {
    let snapshot = (state-read $workspace_path)
    let server_taskservs = ($snapshot.servers | get -o $formula.server | default {} | get -o taskservs | default {})
    # De Morgan: "all completed" == "none unfinished".
    let unfinished = ($formula.nodes | any {|node|
        let name = (node-name $node)
        let status = ($server_taskservs | get -o $name | default {} | get -o state | default "pending")
        $status != "completed"
    })
    not $unfinished
}
# Run a formula's health gate command on its server via SSH.
#
# Returns true when the formula has no gate, or when the gate's `check_cmd`
# succeeds (per ssh_cmd) within the retry budget. Returns false when the
# server is not in settings.data.servers or every attempt fails.
#
# `retries` (default 6, clamped to >= 1) caps the number of checks.
# `timeout_ms` (default 60000) caps the accumulated wait between checks; SSH
# time itself is not counted. The first check always runs.
# The base interval is timeout / retries, with a 10s minimum. Waits after the
# second attempt are 1.5x that. For a CP health gate (180s timeout, 10
# retries) this gives ~18s between early checks, with longer waits later.
def run-health-gate [settings: record, formula: record]: nothing -> bool {
    let gate = $formula.health_gate
    # `is-empty` is also true for null, i.e. "no gate configured".
    if ($gate | is-empty) { return true }
    _print $" health gate: ($formula.id) ..."
    let server = ($settings.data.servers | where hostname == $formula.server | get 0?)
    if ($server | is-empty) {
        _print $" ⚠ server ($formula.server) not found for health gate"
        return false
    }
    # IP lookup is best-effort; on failure pass "" and let ssh_cmd cope.
    let ip = (try { mw_get_ip $settings $server "public" false } catch { "" })
    # Clamp so the interval computation below can never divide by zero.
    let max_retries = ([($gate.retries? | default 6) 1] | math max)
    let timeout_ms = ($gate.timeout_ms? | default 60000)
    # Base interval: distribute total timeout across retries, minimum 10s
    let base_wait_raw = ($timeout_ms / $max_retries / 1000)
    let base_wait = (if $base_wait_raw < 10 { 10 } else { $base_wait_raw })
    mut attempts = 0
    mut elapsed_ms = 0
    while $attempts == 0 or ($attempts < $max_retries and $elapsed_ms < $timeout_ms) {
        $attempts += 1
        if (ssh_cmd $settings $server false $gate.check_cmd $ip) {
            _print $" ✅ health gate ($formula.id) passed"
            return true
        }
        if $attempts < $max_retries {
            # Backoff: the first two attempts wait base_wait, later ones 1.5x
            let wait_raw = if $attempts <= 2 { $base_wait } else { $base_wait * 1.5 }
            let wait_s = ($wait_raw | into int)
            _print $" ⏳ gate ($attempts)/($max_retries) — retry in ($wait_s)s"
            sleep ($wait_s * 1sec)
            $elapsed_ms = ($elapsed_ms + ($wait_s * 1000))
        }
    }
    # Report the attempts actually made: the time budget may end the loop
    # before max_retries checks have run.
    _print $" 🛑 health gate ($formula.id) failed after ($attempts) attempts \(($timeout_ms / 1000)s timeout)"
    false
}
# Main entry: DAG-aware taskserv execution.
#
# Without a dag.ncl (load-dag reports has_dag: false), this delegates straight
# to on_taskservs. With one, it:
#   1. seeds a "pending" state entry for every formula node never written before,
#   2. selects the target formulas containing `match_taskserv` on `match_server`,
#   3. orders targets plus all transitive prerequisites topologically, and
#   4. runs each formula in order, stopping at the first formula that does not
#      complete or whose health gate fails, so dependents never run on a
#      broken base.
#
# `match_taskserv` narrows only the *target* formulas to that single node, and
# skips whole-formula verification for them. Prerequisite formulas always run,
# and are verified, in full.
export def dag-aware-create [
    settings: record
    match_taskserv: string
    match_server: string
    iptype: string
    check: bool
    upload: bool = false
    reset: bool = false
    cmd: string = ""
]: nothing -> nothing {
    let dag = (load-dag $settings)
    if not $dag.has_dag {
        # No DAG — fall back to direct execution
        on_taskservs $settings $match_taskserv "" $match_server $iptype $check $upload $reset $cmd
        return
    }
    let workspace_path = ($settings.src_path? | default $env.PWD)

    # Ensure all formula nodes exist in state. Nodes installed before state
    # tracking was active have no entry and would get silently skipped by the
    # gate. Only nodes never written are initialised (an empty actor.identity
    # is the default returned by state-node-get). This avoids resetting
    # completed nodes when hyphenated server names cause `get -o` to return {}
    # instead of the real server record.
    for formula in $dag.formulas {
        for node in $formula.nodes {
            let node_nm = (node-name $node)
            let existing = (state-node-get $workspace_path $formula.server $node_nm)
            if ($existing.actor?.identity? | default "" | is-empty) {
                state-node-set $workspace_path $formula.server $node_nm {
                    state: "pending",
                    operation: "create",
                    profile: ($node | get -o taskserv | default {} | get -o profile | default "default"),
                    started_at: "",
                    ended_at: "",
                    blocker: "",
                    actor: { identity: "system", source: "dag-executor" },
                    log: [{ ts: ((date now) | format date "%Y-%m-%dT%H:%M:%SZ"), event: "dag-init", source: "dag-executor" }],
                }
            }
        }
    }

    # Find target formulas containing the requested taskserv
    let targets = (find-formulas-for-taskserv $dag $match_taskserv $match_server)
    if ($targets | is-empty) {
        _print $"⚠ No formula contains taskserv ($match_taskserv) for server ($match_server)"
        return
    }
    let target_ids = ($targets | each {|f| $f.id })

    # Resolve full dependency chain in topological order
    let execution_order = (resolve-prerequisites $dag $target_ids)
    _print $"DAG execution plan: ($execution_order | length) formula\(s\)"
    for fid in $execution_order {
        let is_target = $fid in $target_ids
        let tag = if $is_target { " [target]" } else { " [prerequisite]" }
        _print $" ($fid)($tag)"
    }
    _print ""

    # Execute formulas in order. A formula failure or health gate failure stops
    # the entire DAG — dependent formulas never run if their prerequisite is broken.
    for formula_id in $execution_order {
        let formula = ($dag.formulas | where id == $formula_id | first)
        # A target formula with a specific taskserv requested is a partial run:
        # only that node executes and whole-formula verification is skipped.
        # Prerequisites never contain the requested node, so they must run in full.
        let partial = ($match_taskserv != "" and $formula_id in $target_ids)

        # Skip completed formulas (unless reset or an explicit cmd is given)
        if not $reset and $cmd == "" and (formula-completed $workspace_path $formula) {
            _print $"⊘ ($formula_id) — already completed"
            # Verify health gate still passes for completed prereqs
            if $formula.health_gate != null {
                if not (run-health-gate $settings $formula) {
                    _print $"🛑 ($formula_id) was completed but health gate now fails — stopping"
                    _print $" Run with --reset to re-execute this formula"
                    return
                }
            }
            continue
        }

        _print $"▶ ($formula_id) on ($formula.server)"
        # Execute only the nodes declared in the formula, not every taskserv on
        # the server; the state gate inside on_taskservs skips completed nodes.
        for node in $formula.nodes {
            let nm = (node-name $node)
            if (not $partial) or $nm == $match_taskserv {
                on_taskservs $settings $nm "" $formula.server $iptype $check $upload $reset $cmd
            }
        }

        # NOTE(review): if on_taskservs does not record completion when
        # `check` is true, this verification stops the DAG after the first
        # formula in check mode — confirm against on_taskservs.
        # If any node failed, stop — do not proceed to dependent formulas.
        if (not $partial) and not (formula-completed $workspace_path $formula) {
            let failed_nodes = ($formula.nodes | where {|n|
                let st = (state-node-get $workspace_path $formula.server (node-name $n))
                $st.state != "completed"
            } | each {|n| node-name $n })
            _print $"🛑 ($formula_id) failed — nodes not completed: ($failed_nodes | str join ', ')"
            _print $" Fix the issue and re-run. Dependent formulas will not execute."
            return
        }

        # Health gate: verify the formula's services are actually operational.
        # Retries with backoff — services like apiserver need time after install.
        # Only meaningful when the whole formula ran.
        if (not $partial) and $formula.health_gate != null {
            if not (run-health-gate $settings $formula) {
                _print $"🛑 ($formula_id) health gate failed — stopping"
                _print $" The formula completed but services are not healthy."
                _print $" Check logs on ($formula.server) and re-run."
                return
            }
        }
    }
    _print $"✅ DAG execution complete"
}