- DAG architecture: `dag show/validate/export` (nulib/main_provisioning/dag.nu),
config loader (lib_provisioning/config/loader/dag.nu), taskserv dag-executor.
Backed by schemas/lib/dag/*.ncl; orchestrator emits NATS events via
WorkspaceComposition::into_workflow. See ADR-020, ADR-021.
- Unified Component Architecture: components/mod.nu, main_provisioning/
{components,workflow,extensions,ontoref-queries}.nu. Full workflow engine with
topological sort and NATS subject emission. Blocks A-H complete (libre-daoshi).
- Commands-registry: nulib/commands-registry.ncl (Nickel source, 314 lines) +
JSON cache at ~/.cache/provisioning/commands-registry.json rebuilt on source
change. cli/provisioning fast-path alias expansion avoids cold Nu startup.
ADDING_COMMANDS.md documents new-command workflow.
- Platform service manager: service-manager.nu (+573), startup.nu (+611),
service-check.nu (+255); autostart/bootstrap/health/target refactored.
- Nushell 0.112.2 migration: removed all try/catch and bash redirections;
external commands prefixed with ^; type signatures enforced. Driven by
scripts/refactor-try-catch{,-simplified}.nu.
- TTY stack: removed shlib/*-tty.sh; replaced by cli/tty-dispatch.sh,
tty-filter.sh, tty-commands.conf.
- New domain modules: images/ (golden image lifecycle), workspace/{state,sync}.nu,
main_provisioning/{bootstrap,cluster-deploy,fip,state}.nu, commands/{state,
build,integrations/auth,utilities/alias}.nu, platform.nu expanded (+874).
- Config loader overhaul: loader/core.nu slimmed (-759), cache/core.nu
refactored (-454), removed legacy loaders/file_loader.nu (-330).
- Thirteen new provisioning-<domain>.nu top-level modules for bash dispatcher.
- Tests: test_workspace_state.nu (+351); updates to test_oci_registry,
test_services.
- README + CHANGELOG updated.
315 lines
13 KiB
Text
315 lines
13 KiB
Text
# dag-executor.nu — DAG-aware taskserv execution
|
|
#
|
|
# Resolves cross-formula dependencies from dag.ncl before executing taskservs.
|
|
# When a user runs `provisioning taskserv create kubernetes`, this module:
|
|
# 1. Finds which formulas contain the requested taskserv
|
|
# 2. Walks the DAG backwards to collect all prerequisite formulas
|
|
# 3. Checks state to skip already-completed formulas
|
|
# 4. Executes pending formulas in topological order with health gates
|
|
#
|
|
# Falls back to direct execution when no dag.ncl exists.
|
|
|
|
use handlers.nu *
|
|
use ../workspace/state.nu *
|
|
use ../lib_provisioning/config/accessor.nu *
|
|
use ../lib_provisioning/utils/ssh.nu [ssh_cmd]
|
|
use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval]
|
|
|
|
# Load <infra_path>/dag.ncl and flatten it into the execution model used here.
#
# Two sections of the evaluated dag.ncl are combined:
#   dag.formulas             — formula definitions (id, server, nodes, max_parallel)
#   dag.composition.formulas — DAG metadata (depends_on, parallel, health_gate)
#
# Returns `{ has_dag: false }` when dag.ncl is missing, fails to evaluate, or
# declares no formulas. Otherwise returns `{ has_dag: true, formulas: [...] }`
# where each entry carries id, server, nodes, max_parallel, depends_on,
# parallel and health_gate. Formulas without a composition entry get
# depends_on = [], parallel = false, health_gate = null.
export def load-dag [settings: record]: nothing -> record {
    let dag_path = ($settings.infra_path | path join "dag.ncl")
    let prov_root = ($env.PROVISIONING? | default "/usr/local/provisioning")

    if not ($dag_path | path exists) { return { has_dag: false } }

    # An evaluation failure is treated the same as "no DAG" so callers can
    # fall back to direct execution.
    let dag = (try {
        ncl-eval $dag_path [$prov_root]
    } catch {
        return { has_dag: false }
    })

    let raw_formulas = ($dag | get -o formulas | default [])
    if ($raw_formulas | is-empty) { return { has_dag: false } }

    # The composition section is optional metadata: resolve it once, and
    # tolerate its absence instead of failing on a hard cell-path access.
    let composition = ($dag.composition?.formulas? | default [])

    let formula_map = ($raw_formulas | each {|f|
        let dag_entry = ($composition | where formula_id == $f.id | get 0?)
        let has_entry = ($dag_entry | is-not-empty)
        {
            id: $f.id,
            server: $f.server,
            nodes: $f.nodes,
            max_parallel: ($f.max_parallel? | default 4),
            depends_on: (if $has_entry { $dag_entry.depends_on? | default [] } else { [] }),
            parallel: (if $has_entry { $dag_entry.parallel? | default false } else { false }),
            health_gate: (if $has_entry { $dag_entry.health_gate? | default null } else { null }),
        }
    })

    { has_dag: true, formulas: $formula_map }
}
|
|
|
|
# Formula lookup helpers (used to find all formulas that contain a given taskserv name).
|
|
# Return the name carried by a formula node.
# A node may expose it as `taskserv.name` or as `component.name`; the taskserv
# field wins when both are present, and "" is returned when neither exists.
def node-name [n: record]: nothing -> string {
    let via_taskserv = $n.taskserv?.name?
    let via_component = ($n.component?.name? | default "")
    $via_taskserv | default $via_component
}
|
|
|
|
# Select the formulas of `dag` that contain a node named `taskserv_name`,
# optionally restricted to a single server.
#
# Both filters treat the empty string as "no restriction":
#   taskserv_name == ""  — every formula matches by name
#   server_filter == ""  — every server matches
# The empty-name wildcard keeps the "all taskservs" mode of dag-aware-create
# reachable; without it an empty name matched no formula at all.
def find-formulas-for-taskserv [dag: record, taskserv_name: string, server_filter: string]: nothing -> list {
    $dag.formulas | where {|f|
        let matches_taskserv = (
            $taskserv_name == ""
            or ($f.nodes | any {|n| (node-name $n) == $taskserv_name })
        )
        let matches_server = ($server_filter == "" or $f.server == $server_filter)
        $matches_taskserv and $matches_server
    }
}
|
|
|
|
# Walk the DAG backwards from the target formulas and return every formula id
# that must run, prerequisites first (topological order).
#
# Parameters:
#   dag        — record with a `formulas` list; each formula has `id` and
#                `depends_on` (list of records carrying `formula_id`)
#   target_ids — ids of the formulas the caller actually asked for
#
# Errors (via `error make`) instead of silently dropping formulas when:
#   - a reachable formula depends on an id that is not declared in dag.formulas
#   - the reachable formulas contain a dependency cycle
def resolve-prerequisites [dag: record, target_ids: list<string>]: nothing -> list<string> {
    # Phase 1 — collect the transitive closure of dependencies (breadth-first).
    mut visited = []
    mut queue = $target_ids

    while ($queue | is-not-empty) {
        let current = ($queue | first)
        $queue = ($queue | skip 1)
        if $current in $visited { continue }
        $visited = ($visited | append $current)

        let formula = ($dag.formulas | where id == $current | get 0?)
        if ($formula | is-not-empty) {
            for dep in $formula.depends_on {
                if $dep.formula_id not-in $visited {
                    $queue = ($queue | append $dep.formula_id)
                }
            }
        }
    }

    # Closures cannot capture `mut` variables, so freeze the result first.
    let reachable = $visited

    # Reduce every reachable formula to its id and a de-duplicated list of
    # dependency ids. De-duplication matters: the in-degree must count each
    # prerequisite once, because it is released once when that prerequisite
    # is emitted below.
    let subset = (
        $dag.formulas
        | where {|f| $f.id in $reachable }
        | each {|f| { id: $f.id, deps: ($f.depends_on | each {|d| $d.formula_id } | uniq) } }
    )
    let known_ids = ($subset | each {|f| $f.id })

    # A dependency on an undeclared formula can never be satisfied.
    for f in $subset {
        let missing = ($f.deps | where {|d| $d not-in $known_ids })
        if ($missing | is-not-empty) {
            error make {
                msg: $"formula ($f.id) depends on unknown formula\(s\): ($missing | str join ', ')"
            }
        }
    }

    # Phase 2 — Kahn's algorithm over the reachable subset.
    mut in_degree = ($subset | reduce -f {} {|f, acc| $acc | upsert $f.id ($f.deps | length) })
    mut sorted = []
    mut ready = ($subset | where {|f| ($f.deps | is-empty) } | each {|f| $f.id })

    while ($ready | is-not-empty) {
        let node = ($ready | first)
        $ready = ($ready | skip 1)
        $sorted = ($sorted | append $node)

        # Release every formula that was waiting on this node.
        for f in $subset {
            if $node in $f.deps {
                let left = (($in_degree | get $f.id) - 1)
                $in_degree = ($in_degree | upsert $f.id $left)
                if $left == 0 {
                    $ready = ($ready | append $f.id)
                }
            }
        }
    }

    # Anything not emitted is part of (or downstream of) a cycle.
    if ($sorted | length) != ($subset | length) {
        let emitted = $sorted
        let stuck = ($subset | where {|f| $f.id not-in $emitted } | each {|f| $f.id })
        error make {
            msg: $"dependency cycle detected among formulas: ($stuck | str join ', ')"
        }
    }

    $sorted
}
|
|
|
|
# Report whether every node of `formula` is recorded as "completed" in the
# workspace state.
#
# The state is read once via state-read; node states are looked up under
# servers.<formula.server>.taskservs.<node name>.state, and a node with no
# entry counts as "pending".
# NOTE(review): a comment in dag-aware-create mentions `get -o` misbehaving
# for hyphenated server names — confirm this lookup is not affected.
def formula-completed [workspace_path: string, formula: record]: nothing -> bool {
    let taskserv_states = (
        state-read $workspace_path
        | get servers
        | get -o $formula.server
        | default {}
        | get -o taskservs
        | default {}
    )

    # Completed means: no node is in any state other than "completed".
    let has_unfinished = ($formula.nodes | any {|n|
        let key = (node-name $n)
        let recorded = ($taskserv_states | get -o $key | default {} | get -o state | default "pending")
        $recorded != "completed"
    })

    not $has_unfinished
}
|
|
|
|
# Run a formula's health gate command on its server via ssh_cmd, retrying
# until it passes or the retry/timeout budget is exhausted.
#
# Gate fields read: check_cmd (required), retries (default 6, at least 1),
# timeout_ms (default 60000).
# The wait between attempts is timeout_ms / retries in seconds (minimum 10s);
# from the third retry on it is stretched to 1.5x. Only the time spent
# sleeping is counted against timeout_ms.
#
# Returns true when the formula has no gate or the check succeeds, false when
# the server cannot be found in settings.data.servers or every attempt fails.
def run-health-gate [settings: record, formula: record]: nothing -> bool {
    let gate = $formula.health_gate
    if ($gate | is-empty) { return true }

    _print $" health gate: ($formula.id) ..."
    let server = ($settings.data.servers | where hostname == $formula.server | get 0?)
    if ($server | is-empty) {
        _print $" ⚠ server ($formula.server) not found for health gate"
        return false
    }

    # Fall back to an empty IP when the lookup fails. `do { } catch { }` does
    # not catch anything (catch is just passed to `do` as an argument), so a
    # real try/catch is required for the fallback to take effect.
    let ip = (try { mw_get_ip $settings $server "public" false } catch { "" })

    # Clamp to at least one attempt: protects the division below from a
    # `retries: 0` gate and guarantees the check runs once.
    let max_retries = ([($gate.retries? | default 6) 1] | math max)
    let timeout_ms = ($gate.timeout_ms? | default 60000)

    # Base interval: distribute total timeout across retries, minimum 10s
    let base_wait_raw = ($timeout_ms / $max_retries / 1000)
    let base_wait = (if $base_wait_raw < 10 { 10 } else { $base_wait_raw })

    mut remaining = $max_retries
    mut elapsed_ms = 0

    while $remaining > 0 and $elapsed_ms < $timeout_ms {
        let ok = (ssh_cmd $settings $server false $gate.check_cmd $ip)
        if $ok {
            _print $" ✅ health gate ($formula.id) passed"
            return true
        }

        $remaining -= 1
        if $remaining > 0 {
            let attempt = ($max_retries - $remaining)
            # Backoff: first attempts wait base_wait, later ones wait 1.5x
            let wait_int = ((if $attempt <= 2 { $base_wait } else { $base_wait * 1.5 }) | into int)
            _print $" ⏳ gate ($attempt)/($max_retries) — retry in ($wait_int)s"
            sleep ($"($wait_int)sec" | into duration)
            $elapsed_ms = ($elapsed_ms + ($wait_int * 1000))
        }
    }

    _print $" 🛑 health gate ($formula.id) failed after ($max_retries) attempts \(($timeout_ms / 1000)s timeout)"
    false
}
|
|
|
|
# Main entry: DAG-aware taskserv execution.
#
# If load-dag reports a usable dag.ncl, resolves the full dependency chain of
# the formulas containing `match_taskserv` and walks them in topological
# order. Otherwise falls back to a single on_taskservs call.
#
# Parameters:
#   settings       — provisioning settings; `src_path` (or $env.PWD) is used as
#                    the workspace path for state reads and writes
#   match_taskserv — taskserv name to run; "" is treated as "all nodes" by the
#                    per-formula checks below
#   match_server   — server filter passed to the formula lookup ("" = any)
#   iptype, check, upload — forwarded unchanged to on_taskservs
#   reset, cmd     — forwarded to on_taskservs; either one being set also
#                    disables the "already completed" skip
#
# Execution stops (plain return) on the first formula that does not complete
# or whose health gate fails, so dependent formulas never run on top of a
# broken prerequisite.
export def dag-aware-create [
    settings: record
    match_taskserv: string
    match_server: string
    iptype: string
    check: bool
    upload: bool = false
    reset: bool = false
    cmd: string = ""
]: nothing -> nothing {
    let dag = (load-dag $settings)

    if not $dag.has_dag {
        # No DAG — fall back to direct execution
        on_taskservs $settings $match_taskserv "" $match_server $iptype $check $upload $reset $cmd
        return
    }

    let workspace_path = ($settings.src_path? | default $env.PWD)

    # Ensure all formula nodes exist in state — nodes installed before state
    # tracking was active have no entry and get silently skipped by the gate.
    # Only initialise nodes that have never been written (actor.identity empty = default
    # from state-node-get). This avoids resetting completed nodes when hyphenated
    # server names cause get -o to return {} instead of the real server record.
    for formula in $dag.formulas {
        for node in $formula.nodes {
            let node_nm = (node-name $node)
            let existing = (state-node-get $workspace_path $formula.server $node_nm)
            if ($existing.actor?.identity? | default "" | is-empty) {
                # The format string ends in a literal "Z", so the timestamp is
                # converted to UTC first; formatting local time would mislabel it.
                let init_ts = (date now | date to-timezone UTC | format date "%Y-%m-%dT%H:%M:%SZ")
                state-node-set $workspace_path $formula.server $node_nm {
                    state: "pending",
                    operation: "create",
                    profile: ($node | get -o taskserv | default {} | get -o profile | default "default"),
                    started_at: "",
                    ended_at: "",
                    blocker: "",
                    actor: { identity: "system", source: "dag-executor" },
                    log: [{ ts: $init_ts, event: "dag-init", source: "dag-executor" }],
                }
            }
        }
    }

    # Find target formulas containing the requested taskserv
    let targets = (find-formulas-for-taskserv $dag $match_taskserv $match_server)
    if ($targets | is-empty) {
        _print $"⚠ No formula contains taskserv ($match_taskserv) for server ($match_server)"
        return
    }

    let target_ids = ($targets | each {|f| $f.id })

    # Resolve full dependency chain in topological order
    let execution_order = (resolve-prerequisites $dag $target_ids)

    _print $"DAG execution plan: ($execution_order | length) formula\(s\)"
    for fid in $execution_order {
        let tag = if $fid in $target_ids { " [target]" } else { " [prerequisite]" }
        _print $" ($fid)($tag)"
    }
    _print ""

    # Execute formulas in order.
    # A formula failure or health gate failure stops the entire DAG —
    # dependent formulas never run if their prerequisite is broken.
    for formula_id in $execution_order {
        let formula = ($dag.formulas | where id == $formula_id | first)

        # Skip completed formulas (unless reset or an explicit cmd was given)
        if (not $reset) and $cmd == "" and (formula-completed $workspace_path $formula) {
            _print $"⊘ ($formula_id) — already completed"
            # Verify health gate still passes for completed prereqs
            if $formula.health_gate != null {
                if not (run-health-gate $settings $formula) {
                    _print $"🛑 ($formula_id) was completed but health gate now fails — stopping"
                    _print $" Run with --reset to re-execute this formula"
                    return
                }
            }
            continue
        }

        _print $"▶ ($formula_id) on ($formula.server)"

        # Execute each formula node in order — only the taskservs declared
        # in the formula, not every taskserv on the server.
        # When match_taskserv is set, only that specific node runs;
        # the state gate inside on_taskservs skips already-completed nodes.
        # NOTE(review): with match_taskserv set, a prerequisite formula that
        # does not contain that taskserv executes no node here and is not
        # verified below — confirm this is the intended partial-run behaviour.
        for node in $formula.nodes {
            let nm = (node-name $node)
            if $match_taskserv == "" or $nm == $match_taskserv {
                on_taskservs $settings $nm "" $formula.server $iptype $check $upload $reset $cmd
            }
        }

        # Check if formula completed successfully by reading state.
        # Skip when a specific taskserv was requested — partial runs are intentional.
        # If any node failed, stop — do not proceed to dependent formulas.
        if $match_taskserv == "" and not (formula-completed $workspace_path $formula) {
            let failed_nodes = ($formula.nodes | where {|n|
                let st = (state-node-get $workspace_path $formula.server (node-name $n))
                $st.state != "completed"
            } | each {|n| node-name $n })
            _print $"🛑 ($formula_id) failed — nodes not completed: ($failed_nodes | str join ', ')"
            _print $" Fix the issue and re-run. Dependent formulas will not execute."
            return
        }

        # Health gate: verify the formula's services are actually operational.
        # Retries with backoff — services like apiserver need time after install.
        # Skip for partial runs — health gate only makes sense on full formula completion.
        if $match_taskserv == "" and $formula.health_gate != null {
            if not (run-health-gate $settings $formula) {
                _print $"🛑 ($formula_id) health gate failed — stopping"
                _print $" The formula completed but services are not healthy."
                _print $" Check logs on ($formula.server) and re-run."
                return
            }
        }
    }

    _print $"✅ DAG execution complete"
}
|