208 lines
8.2 KiB
Text
208 lines
8.2 KiB
Text
use std
|
|
# Selective imports replacing fat-path (ADR-025 Phase 4).
|
|
use lib_provisioning/config/accessor/core.nu [config-get]
|
|
use lib_provisioning/platform/target.nu [detect-platform-mode]
|
|
use lib_provisioning/utils/interface.nu [_print]
|
|
use ../workspace/state.nu *
|
|
|
|
# Taskserv workflow definitions
|
|
|
|
# Resolve the orchestrator base URL.
#
# Precedence:
#   1. the explicit --orchestrator value, when non-empty
#   2. $env.PROVISIONING_ORCHESTRATOR_URL, when set and non-empty
#   3. the "platform.orchestrator.url" config entry, with
#      "http://localhost:9011" passed to config-get as its second argument
def get-orchestrator-url [--orchestrator: string = ""] {
    let env_url = $env.PROVISIONING_ORCHESTRATOR_URL?

    if ($orchestrator | is-not-empty) {
        $orchestrator
    } else if ($env_url | is-not-empty) {
        $env_url
    } else {
        config-get "platform.orchestrator.url" "http://localhost:9011"
    }
}
|
|
|
|
# Tell whether the orchestrator URL is a local endpoint (for plugin usage).
#
# Returns true exactly when `detect-platform-mode` yields "local" for the
# given URL, false for any other mode.
def use-local-plugin [orchestrator_url: string] {
    let platform_mode = (detect-platform-mode $orchestrator_url)
    $platform_mode == "local"
}
|
|
# Submit a taskserv workflow to the orchestrator and optionally wait for it.
#
# Flow:
#   1. Resolve the orchestrator URL, workspace path and actor identity.
#   2. When --hostname is given (and neither --force nor --check is set), ask
#      `state-node-decision-with-deps` whether the node should be skipped,
#      re-run, or is blocked by a dependency.
#   3. Record the "running" state via `state-node-start` (skipped in check mode
#      or when no hostname is given).
#   4. POST the workflow to `<orchestrator>/workflows/taskserv/create`.
#   5. With --wait, poll until completion and record the final state via
#      `state-node-finish`.
#
# Returns a record whose `status` is one of:
#   "skipped"   - state gate decided the node is already completed
#   "blocked"   - a dependency is not completed (`blocker` names it)
#   "error"     - submission failed or the orchestrator reported failure
#   "submitted" - accepted, not waiting (`task_id` holds the response data)
#   otherwise the record produced by `wait_for_workflow_completion`.
export def taskserv_workflow [
    taskserv: string                    # Taskserv name
    operation: string                   # Operation: create, delete, generate, check-updates
    infra?: string                      # Infrastructure target
    settings?: string                   # Settings file path
    --check (-c)                        # Check mode only
    --wait (-w)                         # Wait for completion
    --hostname: string = ""             # Server hostname for state tracking
    --workspace: string = ""            # Workspace path for state file resolution
    --actor: string = ""                # Identity for audit log (defaults to $env.USER)
    --depends-on: list<string> = []     # DAG depends_on list for this node (taskserv names)
    --force (-f)                        # Force execution even if state is 'completed' or 'blocked'
    --orchestrator: string = ""         # Orchestrator URL (optional, uses platform config if not provided)
] {
    let orch_url = (get-orchestrator-url --orchestrator=$orchestrator)
    let workspace_path = if ($workspace | is-not-empty) { $workspace } else { $env.PWD }
    let actor_id = if ($actor | is-not-empty) { $actor } else { $env.USER? | default "system" }

    # State is only read/written when a hostname identifies the node and this
    # is not a check-only run.
    let track_state = ($hostname | is-not-empty) and not $check

    # State gate: check own state + dependency propagation, unless --force
    if $track_state and not $force {
        let decision = (state-node-decision-with-deps $workspace_path $hostname $taskserv $depends_on)
        match $decision {
            "skip" => {
                _print $"⊘ ($taskserv) on ($hostname) — completed, skipping"
                return { status: "skipped", taskserv: $taskserv, hostname: $hostname }
            },
            "rerun" => {
                _print $"↻ ($taskserv) on ($hostname) — failed, re-running"
            },
            $d if ($d | str starts-with "blocked:") => {
                let blocker = ($d | str replace "blocked:" "")
                _print $"⛔ ($taskserv) on ($hostname) — blocked by ($blocker) (state not completed)"
                return { status: "blocked", taskserv: $taskserv, hostname: $hostname, blocker: $blocker }
            },
            _ => {},
        }
    }

    # Write running state before submitting to orchestrator.
    # The call is wrapped in parentheses because a bare newline terminates a
    # Nushell command; without them the flags would not reach state-node-start.
    if $track_state {
        (state-node-start $workspace_path $hostname $taskserv
            --actor $actor_id
            --source "orchestrator"
            --operation $operation)
    }

    let workflow_data = {
        taskserv: $taskserv,
        operation: $operation,
        infra: ($infra | default ""),
        settings: ($settings | default ""),
        check_mode: $check,
        wait: $wait,
        hostname: $hostname,
    }

    # `catch` only pairs with `try` (not `do`), so the submission is wrapped in
    # try/catch and reduced to a plain record that is inspected afterwards.
    let submission = (try {
        let posted = (http post $"($orch_url)/workflows/taskserv/create" --content-type "application/json" ($workflow_data | to json))
        { ok: true, response: $posted, message: "" }
    } catch { |err|
        { ok: false, response: null, message: $err.msg }
    })

    if not $submission.ok {
        # Write failed state on submit error
        if $track_state {
            state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
        }
        return { status: "error", message: $submission.message }
    }

    let response = $submission.response

    if not ($response | get success) {
        # Orchestrator answered but rejected the workflow: record failure.
        if $track_state {
            state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
        }
        return { status: "error", message: ($response | get error) }
    }

    let task_id = ($response | get data)
    _print $"Taskserv ($operation) workflow submitted: ($task_id)"

    if $wait {
        let result = (wait_for_workflow_completion $orch_url $task_id)
        if $track_state {
            # Only a "completed" result is recorded as success; every other
            # status is recorded as a non-successful finish.
            if $result.status == "completed" {
                state-node-finish $workspace_path $hostname $taskserv --success --source "orchestrator"
            } else {
                state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
            }
        }
        $result
    } else {
        { status: "submitted", task_id: $task_id }
    }
}
|
|
|
|
# Specific taskserv operations
|
|
# Submit a "create" workflow for a taskserv.
#
# Thin wrapper: forwards its positionals plus --check, --wait and
# --orchestrator to `taskserv_workflow` with the operation fixed to "create",
# and returns whatever that call returns.
export def "taskserv create" [
    taskserv: string                # Taskserv name
    infra?: string                  # Infrastructure target
    settings?: string               # Settings file path
    --check (-c)                    # Check mode only
    --wait (-w)                     # Wait for completion
    --orchestrator: string = ""     # Orchestrator URL (optional, uses platform config if not provided)
] {
    (taskserv_workflow $taskserv "create" $infra $settings
        --orchestrator $orchestrator
        --wait=$wait
        --check=$check)
}
|
|
|
|
# Submit a "delete" workflow for a taskserv.
#
# Thin wrapper: forwards its positionals plus --check, --wait and
# --orchestrator to `taskserv_workflow` with the operation fixed to "delete",
# and returns whatever that call returns.
export def "taskserv delete" [
    taskserv: string                # Taskserv name
    infra?: string                  # Infrastructure target
    settings?: string               # Settings file path
    --check (-c)                    # Check mode only
    --wait (-w)                     # Wait for completion
    --orchestrator: string = ""     # Orchestrator URL (optional, uses platform config if not provided)
] {
    (taskserv_workflow $taskserv "delete" $infra $settings
        --orchestrator $orchestrator
        --wait=$wait
        --check=$check)
}
|
|
|
|
# Submit a "generate" workflow for a taskserv.
#
# Thin wrapper: forwards its positionals plus --check, --wait and
# --orchestrator to `taskserv_workflow` with the operation fixed to
# "generate", and returns whatever that call returns.
export def "taskserv generate" [
    taskserv: string                # Taskserv name
    infra?: string                  # Infrastructure target
    settings?: string               # Settings file path
    --check (-c)                    # Check mode only
    --wait (-w)                     # Wait for completion
    --orchestrator: string = ""     # Orchestrator URL (optional, uses platform config if not provided)
] {
    (taskserv_workflow $taskserv "generate" $infra $settings
        --orchestrator $orchestrator
        --wait=$wait
        --check=$check)
}
|
|
|
|
# Submit a "check-updates" workflow.
#
# The taskserv name is optional; when omitted an empty string is passed to
# `taskserv_workflow` in its place. Everything else is forwarded unchanged and
# the result of `taskserv_workflow` is returned.
export def "taskserv check-updates" [
    taskserv?: string               # Taskserv name (optional for all)
    infra?: string                  # Infrastructure target
    settings?: string               # Settings file path
    --check (-c)                    # Check mode only
    --wait (-w)                     # Wait for completion
    --orchestrator: string = ""     # Orchestrator URL (optional, uses platform config if not provided)
] {
    (taskserv_workflow ($taskserv | default "") "check-updates" $infra $settings
        --orchestrator $orchestrator
        --wait=$wait
        --check=$check)
}
|
|
|
|
# Poll `<orchestrator>/tasks/<task_id>` every 2 seconds until the task reports
# a terminal status, printing progress along the way.
#
# Parameters:
#   orchestrator - orchestrator base URL
#   task_id      - identifier returned when the workflow was submitted
#
# Returns one of:
#   { status: "completed", task: <task record> }  when status is "Completed"
#   { status: "failed",    task: <task record> }  when status is "Failed"
#   { status: "error", message: <string> }        when the status request
#       fails or the response reports `success` as false
#
# Any other task status ("Running" or anything else) keeps the loop polling;
# there is no upper bound on how long that can take.
def wait_for_workflow_completion [orchestrator: string, task_id: string] {
    _print "Waiting for workflow completion..."

    while true {
        # A transport-level failure is turned into an error record so the
        # caller still gets a result it can act on instead of an abort.
        let poll = (try {
            { ok: true, body: (http get $"($orchestrator)/tasks/($task_id)"), message: "" }
        } catch { |err|
            { ok: false, body: null, message: $err.msg }
        })

        if not $poll.ok {
            return { status: "error", message: $"Failed to get task status: ($poll.message)" }
        }

        let status_response = $poll.body

        if not ($status_response | get success) {
            return { status: "error", message: "Failed to get task status" }
        }

        let task = ($status_response | get data)
        let task_status = ($task | get status)

        match $task_status {
            "Completed" => {
                _print $"✅ Workflow completed successfully"
                # Optional access: a task record without an `output` field is
                # treated the same as one with empty output.
                if ($task.output? | is-not-empty) {
                    _print "Output:"
                    _print $task.output
                }
                return { status: "completed", task: $task }
            },
            "Failed" => {
                _print $"❌ Workflow failed"
                # Optional access: a task record without an `error` field is
                # treated the same as one with an empty error.
                if ($task.error? | is-not-empty) {
                    _print "Error:"
                    _print $task.error
                }
                return { status: "failed", task: $task }
            },
            "Running" => {
                _print $"🔄 Workflow is running..."
            },
            _ => {
                _print $"⏳ Workflow status: ($task_status)"
            }
        }

        sleep 2sec
    }
}
|