# Workspace provisioning state — read/write/transition for .provisioning-state.ncl
# Pattern: nickel export --format json for reads; atomic temp+rename for writes.
# Follows images/state.nu conventions.

use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval]
use ../lib_provisioning/config/cache/nickel.nu [request-ncl-sync]

# ─── Path helpers ────────────────────────────────────────────────────────────

# Path of the state file inside a workspace directory.
export def state-path [workspace_path: string]: nothing -> string {
    $workspace_path | path join ".provisioning-state.ncl"
}

# Path of the temporary file that state-write fills before renaming it over the state file.
def state-tmp-path [workspace_path: string]: nothing -> string {
    $workspace_path | path join ".provisioning-state.ncl.tmp"
}

# Maximum log entries retained per node. Older entries are dropped.
const LOG_MAX_ENTRIES = 50

# Trim log to last LOG_MAX_ENTRIES entries.
def log-trim [entries: list]: nothing -> list {
    let n = ($entries | length)
    if $n <= $LOG_MAX_ENTRIES { return $entries }
    $entries | last $LOG_MAX_ENTRIES
}

# ─── Serialization ───────────────────────────────────────────────────────────

# Escape a value for embedding between double quotes in a Nickel string literal.
# Backslashes are escaped first so the backslash added for quotes is not doubled.
# A null value becomes the empty string.
def ncl-str [value: any]: nothing -> string {
    $value
    | default ""
    | into string
    | str replace --all '\' '\\'
    | str replace --all '"' '\"'
}

# Serialize a log entry list to Nickel array literal.
# Each entry must carry ts, event and source; source is emitted as an enum tag.
def serialize-log [entries: list]: nothing -> string {
    if ($entries | is-empty) { return "[]" }
    let inner = ($entries | each {|e|
        $"            \{ ts = \"(ncl-str $e.ts)\", event = \"(ncl-str $e.event)\", source = '($e.source) \},"
    } | str join "\n")
    $"[\n($inner)\n          ]"
}

# Serialize a taskserv state record to Nickel literal.
# Missing fields fall back to the all-pending defaults; state, operation and
# actor.source are emitted as enum tags, every other field as an escaped string.
def serialize-taskserv [name: string, ts: record]: nothing -> string {
    let log_str = (serialize-log ($ts.log? | default []))
    $"        ($name) = \{
          state      = '($ts.state? | default "pending"),
          operation  = '($ts.operation? | default "create"),
          profile    = \"(ncl-str ($ts.profile? | default ""))\",
          started_at = \"(ncl-str ($ts.started_at? | default ""))\",
          ended_at   = \"(ncl-str ($ts.ended_at? | default ""))\",
          blocker    = \"(ncl-str ($ts.blocker? | default ""))\",
          actor = \{
            identity = \"(ncl-str ($ts.actor?.identity? | default ""))\",
            source   = '($ts.actor?.source? | default "orchestrator"),
          \},
          log = ($log_str),
        \},"
}

# Serialize a server state record to Nickel literal.
def serialize-server [hostname: string, srv: record]: nothing -> string {
    let taskservs_str = if ($srv.taskservs? | default {} | is-empty) {
        " \{\}"
    } else {
        let inner = ($srv.taskservs
            | transpose k v
            | each {|it| serialize-taskserv $it.k $it.v }
            | str join "\n")
        $" \{\n($inner)\n      \}"
    }
    $"    ($hostname) = \{
      provider_id    = \"(ncl-str ($srv.provider_id? | default ""))\",
      provider_state = '($srv.provider_state? | default "unknown"),
      last_sync      = \"(ncl-str ($srv.last_sync? | default ""))\",
      taskservs = ($taskservs_str),
    \},"
}

# Serialize the full workspace state record to a Nickel file literal.
# Requires state.workspace and state.cluster; schema_version defaults to "2.0".
def serialize-state [state: record]: nothing -> string {
    let servers_str = if ($state.servers? | default {} | is-empty) {
        "\{\}"
    } else {
        let inner = ($state.servers
            | transpose k v
            | each {|it| serialize-server $it.k $it.v }
            | str join "\n")
        $"\{\n($inner)\n  \}"
    }
    $"\{
  workspace      = \"(ncl-str $state.workspace)\",
  cluster        = \"(ncl-str $state.cluster)\",
  schema_version = \"(ncl-str ($state.schema_version? | default "2.0"))\",
  servers = ($servers_str),
\}
"
}

# ─── Read ─────────────────────────────────────────────────────────────────────

# Read workspace state. Returns a record with WorkspaceState fields.
# Missing file returns all-pending default — never errors on absence.
export def state-read [workspace_path: string]: nothing -> record {
    let path = (state-path $workspace_path)
    if not ($path | path exists) {
        return {
            workspace: ($workspace_path | path basename),
            cluster: "",
            schema_version: "2.0",
            servers: {},
        }
    }
    ncl-eval $path []
}

# Read state for a specific DAG node (server + taskserv).
# Never returns null: when the server or taskserv is not present, an
# all-pending default record (same shape state-node-set starts from) is returned.
export def state-node-get [
    workspace_path: string
    hostname: string
    taskserv: string
]: nothing -> record {
    let st = (state-read $workspace_path)
    let srv = ($st.servers | get -o $hostname | default {})
    $srv.taskservs?
    | default {}
    | get -o $taskserv
    | default {
        state: "pending",
        operation: "create",
        profile: "",
        started_at: "",
        ended_at: "",
        blocker: "",
        actor: { identity: "", source: "orchestrator" },
        log: [],
    }
}

# ─── Write ────────────────────────────────────────────────────────────────────

# Write the full state atomically (temp + rename).
# Signals ncl-sync to re-export eagerly (belt-and-suspenders over the file watcher).
export def state-write [workspace_path: string, state: record]: nothing -> nothing {
    let path = (state-path $workspace_path)
    let tmp_path = (state-tmp-path $workspace_path)
    # Fill the temp file completely, then move it over the real file in one step
    (serialize-state $state) | save --force $tmp_path
    ^mv $tmp_path $path
    # Import paths for the sync request: the workspace, plus $env.PROVISIONING when set
    let prov = ($env.PROVISIONING? | default "")
    let imports = if ($prov | is-not-empty) { [$workspace_path $prov] } else { [$workspace_path] }
    request-ncl-sync $path --import-paths $imports
}

# ─── Node transitions ─────────────────────────────────────────────────────────

# Update a single DAG node state. Merges into the existing state atomically.
# The merge is shallow: a nested field given in node_state (actor, log) replaces
# the stored one as a whole.
export def state-node-set [
    workspace_path: string
    hostname: string
    taskserv: string
    node_state: record  # Partial taskserv state fields to merge
]: nothing -> nothing {
    mut st = (state-read $workspace_path)

    # Read existing server — fall back to empty structure if not present
    let current_server = (
        $st.servers
        | transpose k v
        | where { |r| $r.k == $hostname }
        | get -o v.0
        | default { provider_id: "", provider_state: "unknown", last_sync: "", taskservs: {} }
    )

    # Read existing taskserv — merge node_state over it (preserves unset fields)
    let current_ts = ($current_server.taskservs? | default {})
    let existing_node = (
        $current_ts
        | transpose k v
        | where { |r| $r.k == $taskserv }
        | get -o v.0
        | default {
            state: "pending", operation: "create", profile: "",
            started_at: "", ended_at: "", blocker: "",
            actor: { identity: "", source: "orchestrator" }, log: []
        }
    )
    let merged = ($existing_node | merge $node_state)

    # Upsert the taskserv into the existing taskservs (preserves all other taskservs)
    let new_ts = ($current_ts | upsert $taskserv $merged)

    # Upsert the server back into servers (preserves all other servers)
    let new_server = ($current_server | upsert taskservs $new_ts)
    let new_servers = (
        $st.servers
        | transpose k v
        | each { |r| if $r.k == $hostname { { k: $r.k, v: $new_server } } else { $r } }
        | if ($in | where k == $hostname | is-empty) { append { k: $hostname, v: $new_server } } else { $in }
        | transpose -r -d
    )

    $st.servers = $new_servers
    state-write $workspace_path $st
}

# Transition: pending → running. Writes started_at + actor.
# Also clears ended_at and blocker, and appends a "started" entry to the node log.
export def state-node-start [
    workspace_path: string
    hostname: string
    taskserv: string
    --actor: string = "system"
    --source: string = "orchestrator"
    --operation: string = "create"
    --profile: string = ""
]: nothing -> nothing {
    let ts = ((date now) | format date "%Y-%m-%dT%H:%M:%SZ")
    let existing = (state-node-get $workspace_path $hostname $taskserv)
    let updated_log = (log-trim ($existing.log? | default [] | append {
        ts: $ts,
        event: "started",
        source: $source,
    }))
    state-node-set $workspace_path $hostname $taskserv {
        state: "running",
        operation: $operation,
        profile: $profile,
        started_at: $ts,
        ended_at: "",
        # A running node is no longer blocked; drop any blocker left by state-node-block
        blocker: "",
        actor: { identity: $actor, source: $source },
        log: $updated_log,
    }
}

# Transition: running → completed | failed. Writes ended_at + log entry.
export def state-node-finish [
    workspace_path: string
    hostname: string
    taskserv: string
    --success                           # Record "completed"; without it the node is "failed"
    --source: string = "orchestrator"   # Source tag written on the log entry
]: nothing -> nothing {
    let ts = ((date now) | format date "%Y-%m-%dT%H:%M:%SZ")
    let outcome = if $success { "completed" } else { "failed" }
    let existing = (state-node-get $workspace_path $hostname $taskserv)
    let updated_log = (log-trim ($existing.log? | default [] | append {
        ts: $ts,
        event: $outcome,
        source: $source,
    }))
    state-node-set $workspace_path $hostname $taskserv {
        state: $outcome,
        ended_at: $ts,
        log: $updated_log,
    }
}

# ─── Orchestrator decision ────────────────────────────────────────────────────

# Returns true if the orchestrator should skip this node (already completed).
export def state-node-skip? [
    workspace_path: string
    hostname: string
    taskserv: string
]: nothing -> bool {
    let node = (state-node-get $workspace_path $hostname $taskserv)
    $node.state == "completed"
}

# Returns the execution decision for a node WITHOUT dependency check.
# Use state-node-decision-with-deps when depends_on is available.
# Mapping: completed → skip, failed → rerun, blocked → blocked, anything else → run.
export def state-node-decision [
    workspace_path: string
    hostname: string
    taskserv: string
]: nothing -> string {
    let node = (state-node-get $workspace_path $hostname $taskserv)
    match $node.state {
        "completed" => "skip",
        "failed" => "rerun",
        "blocked" => "blocked",
        _ => "run",
    }
}

# Check all depends_on nodes for a given DAG node.
# Returns: { ready: bool, blocker: string } — blocker is "" when ready.
# A node is blocked if any dependency is failed, blocked, or not completed.
# When several dependencies are not completed, the first one in depends_on order is reported.
export def state-dag-check-deps [
    workspace_path: string
    hostname: string
    depends_on: list   # List of taskserv names this node depends on
]: nothing -> record {
    if ($depends_on | is-empty) {
        return { ready: true, blocker: "" }
    }
    # Keep only the dependencies whose recorded state is not "completed"
    let blockers = ($depends_on | where {|dep|
        (state-node-get $workspace_path $hostname $dep).state != "completed"
    })
    if ($blockers | is-empty) {
        { ready: true, blocker: "" }
    } else {
        # `first` is only reached with a non-empty list, so it cannot fail on empty input
        { ready: false, blocker: ($blockers | first) }
    }
}

# Full decision including dependency propagation.
# depends_on: list of taskserv names this node depends on (from DAG definition).
# Outputs: skip | run | rerun | blocked:<dependency>
# Side effect: when a dependency is not completed, the node is written as 'blocked.
export def state-node-decision-with-deps [
    workspace_path: string
    hostname: string
    taskserv: string
    depends_on: list
]: nothing -> string {
    # First check own state
    let own = (state-node-decision $workspace_path $hostname $taskserv)
    if $own == "skip" { return "skip" }

    # Then check dependencies — a non-completed dep blocks regardless of own state
    let dep_check = (state-dag-check-deps $workspace_path $hostname $depends_on)
    if not $dep_check.ready {
        # Write blocked state into the state file so it's visible in the audit log
        state-node-block $workspace_path $hostname $taskserv $dep_check.blocker
        return $"blocked:($dep_check.blocker)"
    }

    # Every dependency is completed here. A 'blocked state recorded by an earlier
    # pass is therefore stale: report the node as runnable instead of leaving it
    # blocked until someone resets it by hand.
    if $own == "blocked" { "run" } else { $own }
}

# Transition a node to 'blocked, recording which dependency is blocking it.
export def state-node-block [
    workspace_path: string
    hostname: string
    taskserv: string
    blocker: string
]: nothing -> nothing {
    let ts = ((date now) | format date "%Y-%m-%dT%H:%M:%SZ")
    let existing = (state-node-get $workspace_path $hostname $taskserv)
    let updated_log = (log-trim ($existing.log? | default [] | append {
        ts: $ts,
        event: $"blocked-by:($blocker)",
        source: "orchestrator",
    }))
    state-node-set $workspace_path $hostname $taskserv {
        state: "blocked",
        blocker: $blocker,
        log: $updated_log,
    }
}

# ─── Init ─────────────────────────────────────────────────────────────────────

# Bootstrap .provisioning-state.ncl from a settings record.
# Safe to call on an existing file — merges servers found in settings without
# overwriting existing node states.
# The cluster name is taken from settings.data.cluster_name, then settings.data.cluster, then "".
export def state-init [
    workspace_path: string
    settings: record   # Provisioning settings record (has .data.servers)
]: nothing -> nothing {
    let existing = (state-read $workspace_path)
    let cluster = ($settings.data.cluster_name? | default ($settings.data.cluster? | default ""))

    mut st = ($existing | merge {
        workspace: ($workspace_path | path basename),
        cluster: $cluster,
        schema_version: "2.0",
    })

    # Ensure every server in settings has an entry in state (all-pending if new)
    for srv in ($settings.data.servers? | default []) {
        let h = $srv.hostname
        if not ($st.servers | columns | any {|c| $c == $h}) {
            $st.servers = ($st.servers | insert $h {
                provider_id: "",
                provider_state: "unknown",
                last_sync: "",
                taskservs: {},
            })
        }
    }

    state-write $workspace_path $st
}

# ─── Migration ────────────────────────────────────────────────────────────────

# Migrate .provisioning-state.json → .provisioning-state.ncl.
# Reads known fields from the JSON format and writes a valid NCL state file.
# The JSON format has: cluster, timestamp, version, state.{ssh_keys,network,firewall,volumes,servers}
# All migrated servers get provider_state "unknown" and no taskservs — sync must confirm their real state.
# Errors when the JSON file is missing or when the NCL file already exists.
export def state-migrate-from-json [
    workspace_path: string
]: nothing -> nothing {
    let json_path = ($workspace_path | path join ".provisioning-state.json")
    let ncl_path = (state-path $workspace_path)

    if not ($json_path | path exists) {
        error make { msg: $"No .provisioning-state.json found at ($json_path)" }
    }
    if ($ncl_path | path exists) {
        error make { msg: $"($ncl_path) already exists — remove it first to migrate" }
    }

    # --raw keeps the content as text: a plain `open` already parses .json files,
    # and `from json` cannot take the resulting record as input.
    let json = (open --raw $json_path | from json)
    let ts = ((date now) | format date "%Y-%m-%dT%H:%M:%SZ")

    # Build server entries from json.state.servers (flat map of hostname → provider_id)
    mut servers = {}
    let json_servers = ($json.state?.servers? | default {})
    for entry in ($json_servers | transpose k v) {
        $servers = ($servers | insert $entry.k {
            provider_id: ($entry.v | default ""),
            provider_state: "unknown",
            last_sync: $ts,
            taskservs: {},
        })
    }

    let migrated = {
        workspace: ($workspace_path | path basename),
        cluster: ($json.cluster? | default ""),
        schema_version: "2.0",
        servers: $servers,
    }

    state-write $workspace_path $migrated
    # NOTE(review): _print is not imported in this file; presumably provided by the
    # surrounding provisioning environment — confirm it is in scope here.
    _print $"Migrated ($json_path) → ($ncl_path)"
    _print $"All servers set to provider_state=unknown. Run `provisioning sync` to reconcile."
}

# ─── Inspection ───────────────────────────────────────────────────────────────

# Display workspace state as a table.
# Columns: server | taskserv | state | blocker | operation | actor | started_at | ended_at
# A server without taskservs gets one placeholder row showing its provider_state and last_sync.
export def state-show [
    workspace_path: string
    --server: string = ""   # Filter by hostname
]: nothing -> nothing {
    let st = (state-read $workspace_path)
    let rows = ($st.servers
        | transpose hostname srv
        | each {|s|
            if ($server | is-not-empty) and $s.hostname != $server {
                # Filtered out: contributes no rows after flatten
                []
            } else {
                let taskservs = ($s.srv.taskservs? | default {})
                if ($taskservs | is-empty) {
                    [{
                        hostname: $s.hostname,
                        taskserv: "—",
                        state: ($s.srv.provider_state? | default "unknown"),
                        blocker: "",
                        operation: "",
                        actor: "",
                        started_at: ($s.srv.last_sync? | default ""),
                        ended_at: "",
                    }]
                } else {
                    $taskservs | transpose taskserv node | each {|t|
                        {
                            hostname: $s.hostname,
                            taskserv: $t.taskserv,
                            state: ($t.node.state? | default "pending"),
                            blocker: ($t.node.blocker? | default ""),
                            operation: ($t.node.operation? | default ""),
                            actor: ($t.node.actor?.identity? | default ""),
                            started_at: ($t.node.started_at? | default ""),
                            ended_at: ($t.node.ended_at? | default ""),
                        }
                    }
                }
            }
        }
        | flatten)

    if ($rows | is-empty) {
        print "(no state entries)"
        return
    }
    print ($rows | table)
}

# Reset a node back to 'pending — clears state, blocker, log, and timestamps.
# operation and profile are left as stored. The log is replaced by a single "reset" entry.
# The actor defaults to $env.USER, then "system", when --actor is not given.
export def state-node-reset [
    workspace_path: string
    hostname: string
    taskserv: string
    --source: string = "cli"
    --actor: string = ""
]: nothing -> nothing {
    let ts = ((date now) | format date "%Y-%m-%dT%H:%M:%SZ")
    let actor_id = if ($actor | is-not-empty) { $actor } else { $env.USER? | default "system" }
    state-node-set $workspace_path $hostname $taskserv {
        state: "pending",
        blocker: "",
        started_at: "",
        ended_at: "",
        actor: { identity: $actor_id, source: $source },
        log: [{ ts: $ts, event: "reset", source: $source }],
    }
}

# Remove a taskserv entry from a server's state entirely.
# Used after `delete` — the taskserv no longer exists on that server.
# Does nothing (and writes nothing) when the server or the taskserv is not in state.
export def state-node-delete [
    workspace_path: string
    hostname: string
    taskserv: string
]: nothing -> nothing {
    mut st = (state-read $workspace_path)
    if not ($st.servers | columns | any {|c| $c == $hostname}) { return }
    let current_ts = ($st.servers | get $hostname | get -o taskservs | default {})
    if not ($current_ts | columns | any {|c| $c == $taskserv}) { return }
    $st.servers = ($st.servers | update $hostname {|srv|
        $srv | upsert taskservs ($current_ts | reject $taskserv)
    })
    state-write $workspace_path $st
}

# ─── Drift detection ─────────────────────────────────────────────────────────

# Compare servers.ncl (desired) against .provisioning-state.ncl (tracked).
# Returns a table of drift entries: { server, taskserv, drift, state }.
#   drift = "orphaned" — in state but NOT in servers.ncl (was removed)
#   drift = "missing"  — in servers.ncl but NOT in state (needs create)
#   drift = "ok"       — present in both
# Rows are per taskserv, so a server that has no taskservs on either side yields no rows.
export def state-drift [
    workspace_path: string
    settings: record
    --server: string = ""   # Restrict the comparison to this hostname
]: nothing -> list {
    let st = (state-read $workspace_path)
    let desired_servers = ($settings.data.servers? | default [])

    mut rows = []

    for srv in $desired_servers {
        if ($server | is-not-empty) and $srv.hostname != $server { continue }
        # A desired server may declare no taskservs at all
        let desired_taskservs = ($srv.taskservs? | default [] | each {|t| $t.name })
        let state_taskservs = ($st.servers
            | get -o $srv.hostname
            | default {}
            | get -o taskservs
            | default {}
            | columns)

        # Check desired vs state
        for ts_name in $desired_taskservs {
            if $ts_name in $state_taskservs {
                let node = ($st.servers | get $srv.hostname | get taskservs | get $ts_name)
                $rows = ($rows | append {
                    server: $srv.hostname,
                    taskserv: $ts_name,
                    drift: "ok",
                    state: ($node.state? | default "pending"),
                })
            } else {
                $rows = ($rows | append {
                    server: $srv.hostname,
                    taskserv: $ts_name,
                    drift: "missing",
                    state: "—",
                })
            }
        }

        # Orphaned: in state but not in desired
        for ts_name in $state_taskservs {
            if $ts_name not-in $desired_taskservs {
                let node = ($st.servers | get $srv.hostname | get taskservs | get $ts_name)
                $rows = ($rows | append {
                    server: $srv.hostname,
                    taskserv: $ts_name,
                    drift: "orphaned",
                    state: ($node.state? | default "unknown"),
                })
            }
        }
    }

    # Orphaned servers: in state but not in settings at all
    let desired_hostnames = ($desired_servers | each {|s| $s.hostname })
    for srv_name in ($st.servers | columns) {
        if ($server | is-not-empty) and $srv_name != $server { continue }
        if $srv_name not-in $desired_hostnames {
            let state_taskservs = ($st.servers | get $srv_name | get -o taskservs | default {} | columns)
            for ts_name in $state_taskservs {
                let node = ($st.servers | get $srv_name | get taskservs | get $ts_name)
                $rows = ($rows | append {
                    server: $srv_name,
                    taskserv: $ts_name,
                    drift: "orphaned",
                    state: ($node.state? | default "unknown"),
                })
            }
        }
    }

    $rows
}

# Reconcile .provisioning-state.ncl to match servers.ncl.
# - Removes orphaned taskserv entries (in state but not in servers.ncl)
# - Adds pending entries for new taskservs (in servers.ncl but not in state)
# Returns { removed: list, added: list } for reporting.
export def state-reconcile [
    workspace_path: string
    settings: record
    --server: string = ""   # Restrict reconciliation to this hostname
    --dry-run               # Report what would change without writing anything
]: nothing -> record {
    let drift = (state-drift $workspace_path $settings --server $server)
    let orphaned = ($drift | where drift == "orphaned")
    let missing = ($drift | where drift == "missing")

    if $dry_run {
        return { removed: $orphaned, added: $missing }
    }

    let ts = ((date now) | format date "%Y-%m-%dT%H:%M:%SZ")

    # Remove orphaned entries
    for entry in $orphaned {
        state-node-delete $workspace_path $entry.server $entry.taskserv
    }

    # Add pending entries for missing taskservs
    for entry in $missing {
        state-node-set $workspace_path $entry.server $entry.taskserv {
            state: "pending",
            operation: "create",
            profile: "",
            started_at: "",
            ended_at: "",
            blocker: "",
            actor: { identity: "system", source: "reconcile" },
            log: [{ ts: $ts, event: "reconcile-added", source: "reconcile" }],
        }
    }

    { removed: $orphaned, added: $missing }
}

# ─── Sync helpers ─────────────────────────────────────────────────────────────

# Mark a server's provider state from an external API response.
# Writes whatever --provider-state is passed (default "unknown") and stamps last_sync.
# Only server-level fields are touched — never marks taskservs as completed.
# An unknown hostname is inserted with an empty taskservs map; for a known one,
# an empty --provider-id keeps the stored provider_id.
export def state-server-sync [
    workspace_path: string
    hostname: string
    --provider-id: string = ""
    --provider-state: string = "unknown"
]: nothing -> nothing {
    let ts = ((date now) | format date "%Y-%m-%dT%H:%M:%SZ")
    mut st = (state-read $workspace_path)

    if not ($st.servers | columns | any {|c| $c == $hostname}) {
        $st.servers = ($st.servers | insert $hostname {
            provider_id: $provider_id,
            provider_state: $provider_state,
            last_sync: $ts,
            taskservs: {},
        })
    } else {
        $st.servers = ($st.servers | update $hostname {|srv|
            $srv | merge {
                # Optional access: an existing entry may lack provider_id
                provider_id: (if ($provider_id | is-not-empty) {
                    $provider_id
                } else {
                    $srv.provider_id? | default ""
                }),
                provider_state: $provider_state,
                last_sync: $ts,
            }
        })
    }

    state-write $workspace_path $st
}