# dag-executor.nu — DAG-aware taskserv execution
#
# Resolves cross-formula dependencies from dag.ncl before executing taskservs.
# When a user runs `provisioning taskserv create kubernetes`, this module:
#   1. Finds which formulas contain the requested taskserv
#   2. Walks the DAG backwards to collect all prerequisite formulas
#   3. Checks state to skip already-completed formulas
#   4. Executes pending formulas in topological order with health gates
#
# Falls back to direct execution when no dag.ncl exists.

use handlers.nu *
use ../workspace/state.nu *
use ../lib_provisioning/config/accessor.nu *
use ../lib_provisioning/utils/ssh.nu [ssh_cmd]
use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval]

# Parse the formulas declared in dag.ncl into a unified execution model.
#
# Formulas live in dag.ncl (moved from servers.ncl in the unified component
# architecture):
#   dag.formulas             — formula definitions (id, server, nodes, max_parallel)
#   dag.composition.formulas — DAG metadata matched by formula_id
#                              (depends_on, parallel, health_gate)
#
# Returns { has_dag: false } when dag.ncl is missing, fails to evaluate, or
# declares no formulas. Otherwise returns { has_dag: true, formulas: [...] },
# where each formula is
#   { id, server, nodes, max_parallel, depends_on, parallel, health_gate }
# with defaults max_parallel=4, depends_on=[], parallel=false, health_gate=null.
# A missing `composition` section (or a formula with no entry in it) simply
# means "no DAG metadata" for that formula.
export def load-dag [settings: record]: nothing -> record {
    let dag_path = ($settings.infra_path | path join "dag.ncl")
    let prov_root = ($env.PROVISIONING? | default "/usr/local/provisioning")

    if not ($dag_path | path exists) {
        return { has_dag: false }
    }

    # Evaluation failures are treated as "no DAG" so callers fall back to
    # direct execution. A null sentinel is used instead of `return` inside
    # `catch`, whose closure semantics differ across Nushell versions.
    let dag = (try { ncl-eval $dag_path [$prov_root] } catch { null })
    if ($dag | is-empty) {
        return { has_dag: false }
    }

    let raw_formulas = ($dag | get -o formulas | default [])
    if ($raw_formulas | is-empty) {
        return { has_dag: false }
    }

    # composition.formulas is optional; without it every formula has no deps.
    let dag_meta = ($dag.composition?.formulas? | default [])

    # Build formula map: formula_id → { server, nodes, depends_on, parallel, health_gate }
    let formula_map = ($raw_formulas | each {|f|
        let dag_entry = ($dag_meta | where formula_id == $f.id | get 0? | default {})
        {
            id: $f.id,
            server: $f.server,
            nodes: $f.nodes,
            max_parallel: ($f.max_parallel? | default 4),
            depends_on: ($dag_entry.depends_on? | default []),
            parallel: ($dag_entry.parallel? | default false),
            health_gate: ($dag_entry.health_gate? | default null),
        }
    })

    { has_dag: true, formulas: $formula_map }
}

# Extract the component/taskserv name from a formula node.
# Handles both node shapes, { taskserv: { name } } and { component: { name } };
# the taskserv name wins when both are set. Returns "" when neither is set.
def node-name [n: record]: nothing -> string {
    $n.taskserv?.name? | default ($n.component?.name? | default "")
}

# Find all formulas that contain a given taskserv name, optionally limited to
# one server.
#   taskserv_name — node name to look for; "" matches every formula
#                   (the "all taskservs" run handled by dag-aware-create)
#   server_filter — server to restrict to; "" matches every server
def find-formulas-for-taskserv [dag: record, taskserv_name: string, server_filter: string]: nothing -> list {
    $dag.formulas | where {|f|
        let has_taskserv = ($taskserv_name == "" or ($f.nodes | any {|n| (node-name $n) == $taskserv_name }))
        let matches_server = ($server_filter == "" or $f.server == $server_filter)
        $has_taskserv and $matches_server
    }
}

# Walk the DAG backwards from the target formulas to collect every transitive
# prerequisite, then order them with Kahn's algorithm.
#
# Returns formula ids in topological order (prerequisites first).
# Raises an error when a depends_on entry names a formula that is not defined,
# or when the reachable subgraph contains a cycle. Without these checks such
# formulas (and everything depending on them) would silently vanish from the plan.
def resolve-prerequisites [dag: record, target_ids: list]: nothing -> list {
    let known_ids = ($dag.formulas | each {|f| $f.id })

    # Breadth-first walk collecting all transitive dependencies.
    mut visited = []
    mut queue = $target_ids
    while ($queue | is-not-empty) {
        let current = ($queue | first)
        $queue = ($queue | skip 1)
        if $current in $visited { continue }
        if $current not-in $known_ids {
            error make { msg: $"DAG: formula '($current)' is listed in depends_on but not defined in dag.ncl" }
        }
        $visited = ($visited | append $current)

        let formula = ($dag.formulas | where id == $current | first)
        for dep in $formula.depends_on {
            if $dep.formula_id not-in $visited {
                $queue = ($queue | append $dep.formula_id)
            }
        }
    }

    # Closures below may not capture a `mut` variable, so freeze the result.
    let reachable = $visited
    let subset = ($dag.formulas | where {|f| $f.id in $reachable })

    # Edges inside the reachable subgraph. Deps are de-duplicated so a repeated
    # depends_on entry cannot add in-degree that is only ever decremented once.
    let edges = ($subset | each {|f|
        {
            id: $f.id,
            deps: ($f.depends_on | each {|d| $d.formula_id } | uniq | where {|d| $d in $reachable }),
        }
    })

    # Kahn's algorithm, seeded in dag.ncl declaration order.
    mut in_degree = ($edges | reduce -f {} {|e, acc| $acc | upsert $e.id ($e.deps | length) })
    mut ready = ($edges | where {|e| $e.deps | is-empty } | each {|e| $e.id })
    mut sorted = []
    while ($ready | is-not-empty) {
        let node = ($ready | first)
        $ready = ($ready | skip 1)
        $sorted = ($sorted | append $node)

        # Release every formula that depends on this node.
        for e in $edges {
            if $node in $e.deps {
                let left = (($in_degree | get $e.id) - 1)
                $in_degree = ($in_degree | upsert $e.id $left)
                if $left == 0 {
                    $ready = ($ready | append $e.id)
                }
            }
        }
    }

    # Anything never released is part of (or downstream of) a cycle.
    if ($sorted | length) < ($subset | length) {
        let done = $sorted
        let stuck = ($subset | each {|f| $f.id } | where {|x| $x not-in $done })
        error make { msg: $"DAG: dependency cycle among formulas: ($stuck | str join ', ')" }
    }

    $sorted
}

# Return true when every node of `formula` is recorded as "completed" in the
# workspace state; a node with no recorded state counts as "pending".
# Uses state-node-get, the same lookup dag-aware-create uses to report which
# nodes failed, so "completed" and the failed-node list never disagree.
# (An empty node list counts as completed.)
def formula-completed [workspace_path: string, formula: record]: nothing -> bool {
    $formula.nodes | all {|n|
        let node_state = (state-node-get $workspace_path $formula.server (node-name $n))
        ($node_state.state? | default "pending") == "completed"
    }
}

# Execute a formula's health gate command on its server via SSH.
# The gate's timeout_ms is the total wait budget, spread across its retries
# with a simple backoff. For a CP health gate (180s timeout, 10 retries) this
# gives ~18s between the first checks and ~27s between later ones — enough for
# apiserver + cilium to stabilize.
def run-health-gate [settings: record, formula: record]: nothing -> bool {
    # Returns true when there is no gate or its check_cmd succeeds within the
    # retry/timeout budget; false when the server is unknown or the gate fails.
    let gate = $formula.health_gate
    # `is-empty` is also true for null, so this covers "no gate configured".
    if ($gate | is-empty) {
        return true
    }

    _print $" health gate: ($formula.id) ..."

    let server = ($settings.data.servers | where hostname == $formula.server | get 0?)
    if ($server | is-empty) {
        _print $" ⚠ server ($formula.server) not found for health gate"
        return false
    }

    # Best-effort IP lookup: on error an empty IP is handed to ssh_cmd.
    # This must be `try`; `do { } catch { }` does not catch anything.
    let ip = (try { mw_get_ip $settings $server "public" false } catch { "" })

    # Clamp retries to at least 1 so the interval division below cannot hit zero.
    let max_retries = ([($gate.retries? | default 6) 1] | math max)
    let timeout_ms = ($gate.timeout_ms? | default 60000)

    # Base interval: distribute total timeout across retries, minimum 10s.
    let base_wait = ([($timeout_ms / $max_retries / 1000) 10] | math max)

    mut attempts = 0
    mut elapsed_ms = 0
    while $attempts < $max_retries and $elapsed_ms < $timeout_ms {
        $attempts += 1
        if (ssh_cmd $settings $server false $gate.check_cmd $ip) {
            _print $" ✅ health gate ($formula.id) passed"
            return true
        }

        if $attempts < $max_retries {
            # Backoff: the first two retries wait base_wait, later ones 1.5x.
            let wait = if $attempts <= 2 { $base_wait } else { $base_wait * 1.5 }
            let wait_int = ($wait | into int)
            _print $" ⏳ gate ($attempts)/($max_retries) — retry in ($wait_int)s"
            sleep ($"($wait_int)sec" | into duration)
            $elapsed_ms += ($wait_int * 1000)
        }
    }

    # Report the attempts actually made: the timeout budget can end the loop
    # before max_retries checks have run.
    _print $" 🛑 health gate ($formula.id) failed after ($attempts) attempts \(($timeout_ms / 1000)s timeout)"
    false
}

# Main entry: DAG-aware taskserv execution.
#
# If dag.ncl exists, resolves the full dependency chain and executes
# formulas in topological order. Otherwise falls back to on_taskservs.
export def dag-aware-create [
    settings: record
    match_taskserv: string
    match_server: string
    iptype: string
    check: bool
    upload: bool = false
    reset: bool = false
    cmd: string = ""
]: nothing -> nothing {
    let dag = (load-dag $settings)

    if not $dag.has_dag {
        # No DAG — fall back to direct execution
        on_taskservs $settings $match_taskserv "" $match_server $iptype $check $upload $reset $cmd
        return
    }

    let workspace_path = ($settings.src_path? | default $env.PWD)

    # Ensure all formula nodes exist in state — nodes installed before state
    # tracking was active have no entry and get silently skipped by the gate.
    # Only initialise nodes that have never been written (actor.identity empty = default
    # from state-node-get). This avoids resetting completed nodes when hyphenated
    # server names cause get -o to return {} instead of the real server record.
    for formula in $dag.formulas {
        for node in $formula.nodes {
            let node_nm = (node-name $node)
            let existing = (state-node-get $workspace_path $formula.server $node_nm)
            if ($existing.actor?.identity? | default "" | is-empty) {
                state-node-set $workspace_path $formula.server $node_nm {
                    state: "pending",
                    operation: "create",
                    profile: ($node | get -o taskserv | default {} | get -o profile | default "default"),
                    started_at: "",
                    ended_at: "",
                    blocker: "",
                    actor: { identity: "system", source: "dag-executor" },
                    # The format ends in a literal "Z", so convert to UTC first.
                    log: [{ ts: ((date now) | date to-timezone UTC | format date "%Y-%m-%dT%H:%M:%SZ"), event: "dag-init", source: "dag-executor" }],
                }
            }
        }
    }

    # Find target formulas containing the requested taskserv
    let targets = (find-formulas-for-taskserv $dag $match_taskserv $match_server)
    if ($targets | is-empty) {
        _print $"⚠ No formula contains taskserv ($match_taskserv) for server ($match_server)"
        return
    }
    let target_ids = ($targets | each {|f| $f.id })

    # Resolve full dependency chain in topological order
    let execution_order = (resolve-prerequisites $dag $target_ids)

    _print $"DAG execution plan: ($execution_order | length) formula\(s\)"
    for fid in $execution_order {
        let tag = if ($fid in $target_ids) { " [target]" } else { " [prerequisite]" }
        _print $" ($fid)($tag)"
    }
    _print ""

    # Execute formulas in order.
    # A formula failure or health gate failure stops the entire DAG —
    # dependent formulas never run if their prerequisite is broken.
    for formula_id in $execution_order {
        let formula = ($dag.formulas | where id == $formula_id | first)
        let is_target = ($formula_id in $target_ids)

        # --reset and a custom cmd only apply to the requested targets;
        # prerequisite formulas are just brought up to "completed".
        let run_reset = (if $is_target { $reset } else { false })
        let run_cmd = (if $is_target { $cmd } else { "" })

        # Partial run: a single taskserv was requested from this target formula.
        # Prerequisite formulas always run in full. Previously their nodes were
        # filtered by the target's taskserv name, so they never executed.
        let partial = ($is_target and $match_taskserv != "")

        # Completion/health verification only makes sense after a full run.
        # NOTE(review): check mode is assumed to be a dry run that does not
        # record completion in state — confirm against on_taskservs.
        let should_verify = ((not $partial) and (not $check))

        # Skip completed formulas (unless reset / custom cmd for a target)
        if (not $run_reset) and $run_cmd == "" and (formula-completed $workspace_path $formula) {
            _print $"⊘ ($formula_id) — already completed"
            # Verify health gate still passes for completed prereqs
            if $formula.health_gate != null {
                if not (run-health-gate $settings $formula) {
                    _print $"🛑 ($formula_id) was completed but health gate now fails — stopping"
                    _print $" Run with --reset to re-execute this formula"
                    return
                }
            }
            continue
        }

        _print $"▶ ($formula_id) on ($formula.server)"

        # Execute each formula node in order — only the taskservs declared
        # in the formula, not every taskserv on the server. On a partial run
        # only the requested node runs; the state gate inside on_taskservs
        # skips already-completed nodes.
        for node in $formula.nodes {
            let nm = (node-name $node)
            if (not $partial) or $nm == $match_taskserv {
                on_taskservs $settings $nm "" $formula.server $iptype $check $upload $run_reset $run_cmd
            }
        }

        # Check if formula completed successfully by reading state.
        # If any node failed, stop — do not proceed to dependent formulas.
        if $should_verify and (not (formula-completed $workspace_path $formula)) {
            let failed_nodes = ($formula.nodes | where {|n|
                let st = (state-node-get $workspace_path $formula.server (node-name $n))
                $st.state != "completed"
            } | each {|n| node-name $n })
            _print $"🛑 ($formula_id) failed — nodes not completed: ($failed_nodes | str join ', ')"
            _print $" Fix the issue and re-run. Dependent formulas will not execute."
            return
        }

        # Health gate: verify the formula's services are actually operational.
        # Retries with backoff — services like apiserver need time after install.
        if $should_verify and $formula.health_gate != null {
            if not (run-health-gate $settings $formula) {
                _print $"🛑 ($formula_id) health gate failed — stopping"
                _print $" The formula completed but services are not healthy."
                _print $" Check logs on ($formula.server) and re-run."
                return
            }
        }
    }

    _print $"✅ DAG execution complete"
}