prvng_core/nulib/workspace/state.nu
Jesús Pérez a6ecf5b7fb
fix(core): resolve undefined symbols hidden by lib_provisioning star-imports
Six symbols were referenced across the codebase but had no definition anywhere.
Star-imports from lib_provisioning/mod.nu silenced the missing-def errors at
parse time; at runtime the call sites either threw or took dead code paths.
ADR-025 Phase 2 (AST audit) surfaced them as blockers for Phase 3 because
selective imports would expose them as "variable not found" errors.

Resolution: add stub getters in lib_provisioning/config/accessor/functions.nu
following the existing pattern (env -> config -> PROVISIONING-derived -> ""):

  - get-providers-path          (14 call sites)
  - get-prov-lib-path           (2 call sites)
  - get-core-nulib-path         (7 call sites)
  - get-provisioning-generate-dirpath  (5 call sites)
  - get-provisioning-generate-defsfile (1 call site)
  - get-provisioning-req-versions (4 call sites)

All existing callers already guard results with is-empty / path exists checks,
so empty-string returns fall back to safe no-op paths.

show_tools_info (main_provisioning/tools.nu) was missing a guard around its
open call; added is-empty / path-exists check matching sibling fns.

The only non-path symbol (on_clusters in clusters/create.nu) had no recoverable
implementation; its closure is replaced with a user-facing message directing
to 'prvng cluster deploy' (the supported workflow).

Refs: ADR-025, .coder/benchmarks/phase2-findings.md blockers section
2026-04-17 07:43:34 +01:00

641 lines
25 KiB
Text

# Workspace provisioning state — read/write/transition for .provisioning-state.ncl
# Pattern: nickel export --format json for reads; atomic temp+rename for writes.
# Follows images/state.nu conventions.
use ../lib_provisioning/utils/nickel_processor.nu [ncl-eval]
use ../lib_provisioning/config/cache/nickel.nu [request-ncl-sync]
# ─── Path helpers ────────────────────────────────────────────────────────────
# Location of the Nickel state file inside the given workspace directory.
export def state-path [workspace_path: string]: nothing -> string {
    [$workspace_path ".provisioning-state.ncl"] | path join
}
# Location of the temporary file that state-write fills before renaming it
# over the real state file.
def state-tmp-path [workspace_path: string]: nothing -> string {
    [$workspace_path ".provisioning-state.ncl.tmp"] | path join
}
# Upper bound on the number of log entries kept per node. log-trim keeps only
# the newest LOG_MAX_ENTRIES entries once a node's log grows past this size.
const LOG_MAX_ENTRIES = 50
# Cap a node log at LOG_MAX_ENTRIES entries, keeping the most recent ones.
# Lists at or below the limit are returned unchanged.
export def log-trim [entries: list]: nothing -> list {
    if ($entries | length) > $LOG_MAX_ENTRIES {
        $entries | last $LOG_MAX_ENTRIES
    } else {
        $entries
    }
}
# ─── Serialization ───────────────────────────────────────────────────────────
# Render a list of log entries as a Nickel array literal.
# An empty list renders as "[]". Every entry must provide ts, event and source;
# ts and event are written as quoted strings, source as a Nickel enum tag ('tag).
# Values are interpolated verbatim (no escaping is applied).
def serialize-log [entries: list]: nothing -> string {
    if ($entries | is-empty) {
        "[]"
    } else {
        let rendered = ($entries | each {|entry|
            $" \{ ts = \"($entry.ts)\", event = \"($entry.event)\", source = '($entry.source) \},"
        })
        let body = ($rendered | str join "\n")
        $"[\n($body)\n ]"
    }
}
# Serialize one taskserv state record to a Nickel field literal `name = { ... },`.
#
# name: used verbatim (unquoted) as the Nickel field name.
#       NOTE(review): presumably must be a valid bare Nickel identifier — confirm.
# ts:   taskserv state record. Every field is optional; missing fields fall back to
#       state "pending", operation "create", actor.source "orchestrator", an empty
#       log, and "" for profile, started_at, ended_at, blocker and actor.identity.
#
# state, operation and actor.source are written as enum tags ('tag); the other
# scalar fields are written as quoted strings. Values are interpolated verbatim
# (no escaping is applied here).
def serialize-taskserv [name: string, ts: record]: nothing -> string {
# The log is rendered first so it can be embedded below as a ready-made array literal.
let log_str = (serialize-log ($ts.log? | default []))
$" ($name) = \{
state = '($ts.state? | default "pending"),
operation = '($ts.operation? | default "create"),
profile = \"($ts.profile? | default "")\",
started_at = \"($ts.started_at? | default "")\",
ended_at = \"($ts.ended_at? | default "")\",
blocker = \"($ts.blocker? | default "")\",
actor = \{
identity = \"($ts.actor?.identity? | default "")\",
source = '($ts.actor?.source? | default "orchestrator"),
\},
log = ($log_str),
\},"
}
# Serialize one server state record to a Nickel field literal `hostname = { ... },`.
#
# hostname: used verbatim (unquoted) as the Nickel field name.
#           NOTE(review): presumably must be a valid bare Nickel identifier — confirm.
# srv:      server state record. Missing fields fall back to provider_state
#           "unknown" and "" for provider_id and last_sync.
#
# provider_state is written as an enum tag ('tag); provider_id and last_sync as
# quoted strings. Each taskserv is rendered through serialize-taskserv.
def serialize-server [hostname: string, srv: record]: nothing -> string {
# A missing or empty taskservs record is rendered as an empty Nickel record.
let taskservs_str = if ($srv.taskservs? | default {} | is-empty) {
" \{\}"
} else {
# transpose turns the taskservs record into rows of {k: name, v: state record}.
let inner = ($srv.taskservs | transpose k v | each {|it|
serialize-taskserv $it.k $it.v
} | str join "\n")
$" \{\n($inner)\n \}"
}
$" ($hostname) = \{
provider_id = \"($srv.provider_id? | default "")\",
provider_state = '($srv.provider_state? | default "unknown"),
last_sync = \"($srv.last_sync? | default "")\",
taskservs = ($taskservs_str),
\},"
}
# Serialize the full workspace state record to the text of a Nickel file.
#
# state: record with workspace and cluster (both required — they are read without
#        a fallback), plus optional schema_version (defaults to "2.0") and optional
#        servers (a missing or empty record renders as an empty Nickel record).
#
# Each server is rendered through serialize-server, keyed by its hostname.
def serialize-state [state: record]: nothing -> string {
let servers_str = if ($state.servers? | default {} | is-empty) {
"\{\}"
} else {
# transpose turns the servers record into rows of {k: hostname, v: server record}.
let inner = ($state.servers | transpose k v | each {|it|
serialize-server $it.k $it.v
} | str join "\n")
$"\{\n($inner)\n\}"
}
$"\{
workspace = \"($state.workspace)\",
cluster = \"($state.cluster)\",
schema_version = \"($state.schema_version? | default "2.0")\",
servers = ($servers_str),
\}"
}
# ─── Read ─────────────────────────────────────────────────────────────────────
# Load the workspace state record.
# When the state file does not exist yet, a default record is returned instead of
# failing: workspace named after the directory, empty cluster, schema "2.0" and no
# servers. An existing file is evaluated through ncl-eval.
export def state-read [workspace_path: string]: nothing -> record {
    let state_file = (state-path $workspace_path)
    if ($state_file | path exists) {
        ncl-eval $state_file []
    } else {
        {
            workspace: ($workspace_path | path basename),
            cluster: "",
            schema_version: "2.0",
            servers: {},
        }
    }
}
# Read the tracked state of one DAG node (server + taskserv).
# Never returns null: when the server or the taskserv is not tracked yet, an
# all-pending default record is returned. The default carries the same fields
# that state-node-set seeds for a new node (including an empty blocker), so
# callers can read any field without guarding for absence.
export def state-node-get [
    workspace_path: string   # Workspace directory holding the state file
    hostname: string         # Server key under `servers`
    taskserv: string         # Taskserv key under that server's `taskservs`
]: nothing -> record {
    let pending_default = {
        state: "pending",
        operation: "create",
        profile: "",
        started_at: "",
        ended_at: "",
        blocker: "",
        actor: { identity: "", source: "orchestrator" },
        log: [],
    }
    let st = (state-read $workspace_path)
    # Every level is optional: unknown server or taskserv falls through to the default.
    let srv = ($st.servers? | default {} | get -o $hostname | default {})
    $srv.taskservs? | default {} | get -o $taskserv | default $pending_default
}
# ─── Write ────────────────────────────────────────────────────────────────────
# Persist the full state record: serialize it to the temporary file, then move
# that file over the real state file with the external `mv`.
# Afterwards ncl-sync is asked to re-export the file; the import paths are the
# workspace itself plus $env.PROVISIONING when that variable is set and non-empty.
export def state-write [workspace_path: string, state: record]: nothing -> nothing {
    let target = (state-path $workspace_path)
    let staging = (state-tmp-path $workspace_path)
    serialize-state $state | save --force $staging
    ^mv $staging $target
    let provisioning_root = ($env.PROVISIONING? | default "")
    let import_paths = if ($provisioning_root | is-empty) {
        [$workspace_path]
    } else {
        [$workspace_path $provisioning_root]
    }
    request-ncl-sync $target --import-paths $import_paths
}
# ─── Node transitions ─────────────────────────────────────────────────────────
# Update a single DAG node (server + taskserv) and persist the whole state.
#
# Read-modify-write: the current state is loaded with state-read, node_state is
# merged over the existing node (fields absent from node_state keep their current
# value), and the result is written back with state-write.
# A server or taskserv that is not tracked yet is created from defaults.
# NOTE(review): no locking is visible here — concurrent writers to the same
# workspace could overwrite each other's changes; confirm callers serialize access.
export def state-node-set [
workspace_path: string
hostname: string
taskserv: string
node_state: record # Partial taskserv state fields to merge
]: nothing -> nothing {
mut st = (state-read $workspace_path)
# Read existing server — fall back to empty structure if not present.
# transpose gives rows of {k: hostname, v: server}; `get -o v.0` yields the first
# matching server record, or null (replaced by the default) when there is none.
let current_server = (
$st.servers
| transpose k v
| where { |r| $r.k == $hostname }
| get -o v.0
| default { provider_id: "", provider_state: "unknown", last_sync: "", taskservs: {} }
)
# Read existing taskserv — merge node_state over it (preserves unset fields)
let current_ts = ($current_server.taskservs? | default {})
let existing_node = (
$current_ts
| transpose k v
| where { |r| $r.k == $taskserv }
| get -o v.0
| default { state: "pending", operation: "create", profile: "", started_at: "", ended_at: "", blocker: "", actor: { identity: "", source: "orchestrator" }, log: [] }
)
# Fields in node_state win; everything else is carried over from the existing node.
let merged = ($existing_node | merge $node_state)
# Upsert the taskserv into the existing taskservs (preserves all other taskservs)
let new_ts = ($current_ts | upsert $taskserv $merged)
# Upsert the server back into servers (preserves all other servers)
let new_server = ($current_server | upsert taskservs $new_ts)
# Rebuild the servers record row by row: replace the matching row in place, append
# a new row when the hostname was not present, then fold the {k, v} rows back into
# a record with `transpose -r -d`.
let new_servers = (
$st.servers
| transpose k v
| each { |r| if $r.k == $hostname { { k: $r.k, v: $new_server } } else { $r } }
| if ($in | where k == $hostname | is-empty) { append { k: $hostname, v: $new_server } } else { $in }
| transpose -r -d
)
$st.servers = $new_servers
state-write $workspace_path $st
}
# Mark a node as running.
# Sets state "running", stores operation, profile and actor, stamps started_at,
# clears ended_at, and appends a "started" entry to the (trimmed) node log.
# NOTE(review): `date now` is formatted with a literal trailing "Z" without an
# explicit conversion to UTC — confirm the timestamps are meant to be UTC.
export def state-node-start [
    workspace_path: string
    hostname: string
    taskserv: string
    --actor: string = "system"
    --source: string = "orchestrator"
    --operation: string = "create"
    --profile: string = ""
]: nothing -> nothing {
    let now = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
    let previous = (state-node-get $workspace_path $hostname $taskserv)
    let history = ($previous.log? | default [])
    let start_entry = { ts: $now, event: "started", source: $source }
    let changes = {
        state: "running",
        operation: $operation,
        profile: $profile,
        started_at: $now,
        ended_at: "",
        actor: { identity: $actor, source: $source },
        log: (log-trim ($history | append $start_entry)),
    }
    state-node-set $workspace_path $hostname $taskserv $changes
}
# Close a node run.
# With --success the node becomes "completed", otherwise "failed". ended_at is
# stamped and an entry named after the outcome is appended to the (trimmed) log.
export def state-node-finish [
    workspace_path: string
    hostname: string
    taskserv: string
    --success
    --source: string = "orchestrator"
]: nothing -> nothing {
    let now = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
    let result = if $success { "completed" } else { "failed" }
    let previous = (state-node-get $workspace_path $hostname $taskserv)
    let history = ($previous.log? | default [])
    let finish_entry = { ts: $now, event: $result, source: $source }
    let changes = {
        state: $result,
        ended_at: $now,
        log: (log-trim ($history | append $finish_entry)),
    }
    state-node-set $workspace_path $hostname $taskserv $changes
}
# ─── Orchestrator decision ────────────────────────────────────────────────────
# True when the node's tracked state is "completed", i.e. the orchestrator has
# nothing left to do for it.
export def state-node-skip? [
    workspace_path: string
    hostname: string
    taskserv: string
]: nothing -> bool {
    (state-node-get $workspace_path $hostname $taskserv | get state) == "completed"
}
# Execution decision for a node based on its own state only (dependencies are
# not consulted — use state-node-decision-with-deps when depends_on is known).
# completed -> "skip", failed -> "rerun", blocked -> "blocked", anything else -> "run".
export def state-node-decision [
    workspace_path: string
    hostname: string
    taskserv: string
]: nothing -> string {
    let current = (state-node-get $workspace_path $hostname $taskserv | get state)
    if $current == "completed" {
        "skip"
    } else if $current == "failed" {
        "rerun"
    } else if $current == "blocked" {
        "blocked"
    } else {
        "run"
    }
}
# Check every dependency of a DAG node on the same server.
# Returns { ready: bool, blocker: string }. A dependency counts as satisfied only
# when its tracked state is "completed"; failed, blocked, pending or running
# dependencies all block. blocker is the first unsatisfied name in depends_on
# order, or "" when the node is ready (including when depends_on is empty).
export def state-dag-check-deps [
    workspace_path: string
    hostname: string
    depends_on: list<string>   # Taskserv names this node depends on
]: nothing -> record {
    let unmet = ($depends_on | where {|dep|
        (state-node-get $workspace_path $hostname $dep | get state) != "completed"
    })
    # Branch on emptiness before calling `first`, so an all-satisfied (or empty)
    # dependency list never reaches it.
    if ($unmet | is-empty) {
        { ready: true, blocker: "" }
    } else {
        { ready: false, blocker: ($unmet | first) }
    }
}
# Execution decision for a node, taking its dependencies into account.
# depends_on lists the taskserv names this node depends on (from the DAG definition).
# Result is one of: skip | run | rerun | blocked | blocked:<blocker_name>.
# Side effect: when a dependency is not completed, the node is written to the
# state file as blocked (via state-node-block) before the result is returned.
export def state-node-decision-with-deps [
    workspace_path: string
    hostname: string
    taskserv: string
    depends_on: list<string>
]: nothing -> string {
    # A completed node is skipped without looking at its dependencies.
    let own_decision = (state-node-decision $workspace_path $hostname $taskserv)
    if $own_decision == "skip" { return "skip" }

    # Any dependency that is not completed blocks, whatever the node's own state.
    let deps = (state-dag-check-deps $workspace_path $hostname $depends_on)
    if $deps.ready { return $own_decision }

    state-node-block $workspace_path $hostname $taskserv $deps.blocker
    $"blocked:($deps.blocker)"
}
# Mark a node as blocked and remember which dependency blocks it.
# Appends a "blocked-by:<blocker>" entry (source "orchestrator") to the trimmed log.
export def state-node-block [
    workspace_path: string
    hostname: string
    taskserv: string
    blocker: string
]: nothing -> nothing {
    let now = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
    let previous = (state-node-get $workspace_path $hostname $taskserv)
    let history = ($previous.log? | default [])
    let block_entry = { ts: $now, event: $"blocked-by:($blocker)", source: "orchestrator" }
    let changes = {
        state: "blocked",
        blocker: $blocker,
        log: (log-trim ($history | append $block_entry)),
    }
    state-node-set $workspace_path $hostname $taskserv $changes
}
# ─── Init ─────────────────────────────────────────────────────────────────────
# Create or refresh .provisioning-state.ncl from a settings record.
# workspace, cluster and schema_version are always rewritten from the inputs
# (cluster comes from settings.data.cluster_name, else settings.data.cluster, else "").
# Servers already present in the state are left untouched; servers that appear
# only in settings get an empty entry with provider_state "unknown".
export def state-init [
    workspace_path: string
    settings: record   # Provisioning settings record (has .data.servers)
]: nothing -> nothing {
    let on_disk = (state-read $workspace_path)
    let cluster_name = ($settings.data.cluster_name? | default ($settings.data.cluster? | default ""))
    let header = {
        workspace: ($workspace_path | path basename),
        cluster: $cluster_name,
        schema_version: "2.0",
    }
    mut next = ($on_disk | merge $header)
    for declared in ($settings.data.servers? | default []) {
        let host = $declared.hostname
        # Existing entries keep their node states; only unknown hosts are added.
        if $host in ($next.servers | columns) { continue }
        $next.servers = ($next.servers | insert $host {
            provider_id: "",
            provider_state: "unknown",
            last_sync: "",
            taskservs: {},
        })
    }
    state-write $workspace_path $next
}
# ─── Migration ────────────────────────────────────────────────────────────────
# Migrate .provisioning-state.json → .provisioning-state.ncl.
# Reads the known fields of the JSON format (cluster and state.servers, a flat map
# of hostname → provider_id) and writes a valid NCL state file through state-write.
# Every migrated server gets provider_state "unknown" and no taskservs — a sync
# must confirm the real state afterwards.
# Errors when the JSON file is missing or when the NCL file already exists.
export def state-migrate-from-json [
    workspace_path: string
]: nothing -> nothing {
    let json_path = ($workspace_path | path join ".provisioning-state.json")
    let ncl_path = (state-path $workspace_path)
    if not ($json_path | path exists) {
        error make { msg: $"No .provisioning-state.json found at ($json_path)" }
    }
    if ($ncl_path | path exists) {
        error make { msg: $"($ncl_path) already exists — remove it first to migrate" }
    }
    # `open` parses files with a .json extension on its own, and `from json` only
    # accepts text — read the file raw so the explicit parse receives a string.
    let json = (open --raw $json_path | from json)
    let ts = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
    # Build server entries from json.state.servers (flat map of hostname → provider_id)
    mut servers = {}
    let json_servers = ($json.state?.servers? | default {})
    for entry in ($json_servers | transpose k v) {
        $servers = ($servers | insert $entry.k {
            provider_id: ($entry.v | default ""),
            provider_state: "unknown",
            last_sync: $ts,
            taskservs: {},
        })
    }
    let migrated = {
        workspace: ($workspace_path | path basename),
        cluster: ($json.cluster? | default ""),
        schema_version: "2.0",
        servers: $servers,
    }
    state-write $workspace_path $migrated
    # Built-in print, as in state-show: this module neither defines nor imports `_print`.
    print $"Migrated ($json_path) → ($ncl_path)"
    print $"All servers set to provider_state=unknown. Run `provisioning sync` to reconcile."
}
# ─── Inspection ───────────────────────────────────────────────────────────────
# Print the workspace state as a table.
# Columns: hostname | taskserv | state | blocker | operation | actor | started_at | ended_at
# A server without taskservs is shown as a single row with taskserv "—", its
# provider_state in the state column and its last_sync in started_at.
# Prints "(no state entries)" when nothing matches.
export def state-show [
    workspace_path: string
    --server: string = ""   # Filter by hostname
]: nothing -> nothing {
    let tracked = (state-read $workspace_path)
    let rows = ($tracked.servers | transpose hostname srv | each {|s|
        if ($server | is-not-empty) and $s.hostname != $server {
            # Filtered out: contributes no rows.
            []
        } else {
            let nodes = ($s.srv.taskservs? | default {})
            if ($nodes | is-empty) {
                [{
                    hostname: $s.hostname,
                    taskserv: "—",
                    state: $s.srv.provider_state,
                    blocker: "",
                    operation: "",
                    actor: "",
                    started_at: $s.srv.last_sync,
                    ended_at: "",
                }]
            } else {
                $nodes | transpose taskserv node | each {|t|
                    {
                        hostname: $s.hostname,
                        taskserv: $t.taskserv,
                        state: $t.node.state,
                        blocker: ($t.node.blocker? | default ""),
                        operation: ($t.node.operation? | default ""),
                        actor: ($t.node.actor?.identity? | default ""),
                        started_at: ($t.node.started_at? | default ""),
                        ended_at: ($t.node.ended_at? | default ""),
                    }
                }
            }
        }
    } | flatten)
    if ($rows | is-empty) {
        print "(no state entries)"
    } else {
        print ($rows | table)
    }
}
# Put a node back to "pending": blocker and both timestamps are cleared and the
# log is replaced by a single "reset" entry.
# The recorded actor is --actor when given, else $env.USER, else "system".
export def state-node-reset [
    workspace_path: string
    hostname: string
    taskserv: string
    --source: string = "cli"
    --actor: string = ""
]: nothing -> nothing {
    let now = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
    let who = if ($actor | is-empty) { $env.USER? | default "system" } else { $actor }
    let reset_fields = {
        state: "pending",
        blocker: "",
        started_at: "",
        ended_at: "",
        actor: { identity: $who, source: $source },
        log: [{ ts: $now, event: "reset", source: $source }],
    }
    state-node-set $workspace_path $hostname $taskserv $reset_fields
}
# Remove a taskserv entry from a server's state entirely.
# Used after `delete` — the taskserv no longer exists on that server.
# Does nothing (and writes nothing) when the server or the taskserv is not tracked.
# All other fields of the server entry and all other taskservs are preserved.
export def state-node-delete [
    workspace_path: string
    hostname: string
    taskserv: string
]: nothing -> nothing {
    mut st = (state-read $workspace_path)
    if $hostname not-in ($st.servers | columns) { return }
    let current_server = ($st.servers | get $hostname)
    let current_ts = ($current_server | get -o taskservs | default {})
    if $taskserv not-in ($current_ts | columns) { return }
    # Build the new server entry explicitly. A closure passed to `update` on a
    # record receives the whole input record (here: the servers map) as its
    # parameter, not the value at the key, so it must not be used to edit the entry.
    let new_server = ($current_server | upsert taskservs ($current_ts | reject $taskserv))
    $st.servers = ($st.servers | upsert $hostname $new_server)
    state-write $workspace_path $st
}
# ─── Drift detection ─────────────────────────────────────────────────────────
# Compare the desired servers (settings.data.servers) with the tracked state.
# Returns a list of rows { server, taskserv, drift, state } where drift is:
#   "ok"       — taskserv present in both (state = tracked state, "pending" if unset)
#   "missing"  — desired but not tracked (needs create; state = "—")
#   "orphaned" — tracked but no longer desired, either because the taskserv was
#                removed from its server or because the whole server is gone
# --server restricts the comparison to a single hostname.
# A desired server without a taskservs field is treated as desiring none.
export def state-drift [
    workspace_path: string
    settings: record
    --server: string = ""
]: nothing -> list {
    let st = (state-read $workspace_path)
    let desired_servers = ($settings.data.servers? | default [])
    mut rows = []
    for srv in $desired_servers {
        if ($server | is-not-empty) and $srv.hostname != $server { continue }
        let desired_taskservs = ($srv.taskservs? | default [] | each {|t| $t.name })
        # Tracked taskservs of this server; empty when the server is not tracked yet.
        let tracked = ($st.servers
            | get -o $srv.hostname | default {}
            | get -o taskservs | default {})
        let state_taskservs = ($tracked | columns)
        # Desired vs state
        for ts_name in $desired_taskservs {
            if $ts_name in $state_taskservs {
                let node = ($tracked | get $ts_name)
                $rows = ($rows | append {
                    server: $srv.hostname,
                    taskserv: $ts_name,
                    drift: "ok",
                    state: ($node.state? | default "pending"),
                })
            } else {
                $rows = ($rows | append {
                    server: $srv.hostname,
                    taskserv: $ts_name,
                    drift: "missing",
                    state: "—",
                })
            }
        }
        # Orphaned: in state but not in desired
        for ts_name in $state_taskservs {
            if $ts_name not-in $desired_taskservs {
                let node = ($tracked | get $ts_name)
                $rows = ($rows | append {
                    server: $srv.hostname,
                    taskserv: $ts_name,
                    drift: "orphaned",
                    state: ($node.state? | default "unknown"),
                })
            }
        }
    }
    # Orphaned servers: in state but not in settings at all
    let desired_hostnames = ($desired_servers | each {|s| $s.hostname })
    for srv_name in ($st.servers | columns) {
        if ($server | is-not-empty) and $srv_name != $server { continue }
        if $srv_name in $desired_hostnames { continue }
        let tracked = ($st.servers | get $srv_name | get -o taskservs | default {})
        for ts_name in ($tracked | columns) {
            let node = ($tracked | get $ts_name)
            $rows = ($rows | append {
                server: $srv_name,
                taskserv: $ts_name,
                drift: "orphaned",
                state: ($node.state? | default "unknown"),
            })
        }
    }
    $rows
}
# Bring the tracked state in line with the desired servers reported by state-drift.
# - orphaned taskserv entries are deleted from the state
# - missing taskservs are added as fresh pending nodes (actor "system", source "reconcile")
# Returns { removed: list, added: list } holding the drift rows that were (or,
# with --dry-run, would be) acted upon. --dry-run changes nothing on disk.
export def state-reconcile [
    workspace_path: string
    settings: record
    --server: string = ""
    --dry-run
]: nothing -> record {
    let report = (state-drift $workspace_path $settings --server $server)
    let to_remove = ($report | where drift == "orphaned")
    let to_add = ($report | where drift == "missing")
    let summary = { removed: $to_remove, added: $to_add }
    if $dry_run { return $summary }

    let now = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
    for row in $to_remove {
        state-node-delete $workspace_path $row.server $row.taskserv
    }
    for row in $to_add {
        let fresh_node = {
            state: "pending",
            operation: "create",
            profile: "",
            started_at: "",
            ended_at: "",
            blocker: "",
            actor: { identity: "system", source: "reconcile" },
            log: [{ ts: $now, event: "reconcile-added", source: "reconcile" }],
        }
        state-node-set $workspace_path $row.server $row.taskserv $fresh_node
    }
    $summary
}
# ─── Sync helpers ─────────────────────────────────────────────────────────────
# Record a server's provider state as reported by an external API response.
# Writes --provider-state as given (default "unknown") and stamps last_sync.
# Taskserv states are never touched: an existing server keeps its taskservs and
# any other fields, and a new server starts with none.
# An empty --provider-id keeps the provider_id already tracked for the server.
export def state-server-sync [
    workspace_path: string
    hostname: string
    --provider-id: string = ""
    --provider-state: string = "unknown"
]: nothing -> nothing {
    let ts = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
    mut st = (state-read $workspace_path)
    # The entry is built explicitly and then upserted. A closure passed to `update`
    # on a record receives the whole input record (the servers map) as its
    # parameter, not the server entry, so it must not be used to merge the entry.
    let entry = if $hostname in ($st.servers | columns) {
        let current = ($st.servers | get $hostname)
        let effective_id = if ($provider_id | is-not-empty) {
            $provider_id
        } else {
            $current.provider_id? | default ""
        }
        $current | merge {
            provider_id: $effective_id,
            provider_state: $provider_state,
            last_sync: $ts,
        }
    } else {
        {
            provider_id: $provider_id,
            provider_state: $provider_state,
            last_sync: $ts,
            taskservs: {},
        }
    }
    $st.servers = ($st.servers | upsert $hostname $entry)
    state-write $workspace_path $st
}