prvng_core/nulib/workflows/taskserv.nu
Jesús Pérez 894046ef5a
feat(core): three-layer DAG, unified component arch, commands-registry cache, Nushell 0.112.2 migration
- DAG architecture: `dag show/validate/export` (nulib/main_provisioning/dag.nu),
    config loader (lib_provisioning/config/loader/dag.nu), taskserv dag-executor.
    Backed by schemas/lib/dag/*.ncl; orchestrator emits NATS events via
    WorkspaceComposition::into_workflow. See ADR-020, ADR-021.
  - Unified Component Architecture: components/mod.nu, main_provisioning/
    {components,workflow,extensions,ontoref-queries}.nu. Full workflow engine with
    topological sort and NATS subject emission. Blocks A-H complete (libre-daoshi).
  - Commands-registry: nulib/commands-registry.ncl (Nickel source, 314 lines) +
    JSON cache at ~/.cache/provisioning/commands-registry.json rebuilt on source
    change. cli/provisioning fast-path alias expansion avoids cold Nu startup.
    ADDING_COMMANDS.md documents new-command workflow.
  - Platform service manager: service-manager.nu (+573), startup.nu (+611),
    service-check.nu (+255); autostart/bootstrap/health/target refactored.
  - Nushell 0.112.2 migration: removed all try/catch and bash redirections;
    external commands prefixed with ^; type signatures enforced. Driven by
    scripts/refactor-try-catch{,-simplified}.nu.
  - TTY stack: removed shlib/*-tty.sh; replaced by cli/tty-dispatch.sh,
    tty-filter.sh, tty-commands.conf.
  - New domain modules: images/ (golden image lifecycle), workspace/{state,sync}.nu,
    main_provisioning/{bootstrap,cluster-deploy,fip,state}.nu, commands/{state,
    build,integrations/auth,utilities/alias}.nu, platform.nu expanded (+874).
  - Config loader overhaul: loader/core.nu slimmed (-759), cache/core.nu
    refactored (-454), removed legacy loaders/file_loader.nu (-330).
  - Thirteen new provisioning-<domain>.nu top-level modules for bash dispatcher.
  - Tests: test_workspace_state.nu (+351); updates to test_oci_registry,
    test_services.
  - README + CHANGELOG updated.
2026-04-17 04:27:33 +01:00

206 lines
8 KiB
Text

use std
use ../lib_provisioning *
use ../lib_provisioning/platform *
use ../workspace/state.nu *
# Taskserv workflow definitions
def get-orchestrator-url [--orchestrator: string = ""] {
if ($orchestrator | is-not-empty) {
return $orchestrator
}
if ($env.PROVISIONING_ORCHESTRATOR_URL? | is-not-empty) {
return $env.PROVISIONING_ORCHESTRATOR_URL
}
config-get "platform.orchestrator.url" "http://localhost:9011"
}
# Detect if orchestrator URL is local (for plugin usage)
def use-local-plugin [orchestrator_url: string] {
# Check if it's a local endpoint
(detect-platform-mode $orchestrator_url) == "local"
}
export def taskserv_workflow [
taskserv: string # Taskserv name
operation: string # Operation: create, delete, generate, check-updates
infra?: string # Infrastructure target
settings?: string # Settings file path
--check (-c) # Check mode only
--wait (-w) # Wait for completion
--hostname: string = "" # Server hostname for state tracking
--workspace: string = "" # Workspace path for state file resolution
--actor: string = "" # Identity for audit log (defaults to $env.USER)
--depends-on: list<string> = [] # DAG depends_on list for this node (taskserv names)
--force (-f) # Force execution even if state is 'completed or 'blocked
--orchestrator: string = "" # Orchestrator URL (optional, uses platform config if not provided)
] {
let orch_url = (get-orchestrator-url --orchestrator=$orchestrator)
let workspace_path = if ($workspace | is-not-empty) { $workspace } else { $env.PWD }
let actor_id = if ($actor | is-not-empty) { $actor } else { $env.USER? | default "system" }
# State gate: check own state + dependency propagation, unless --force
if ($hostname | is-not-empty) and not $force and not $check {
let decision = (state-node-decision-with-deps $workspace_path $hostname $taskserv $depends_on)
match $decision {
"skip" => {
_print $"⊘ ($taskserv) on ($hostname) — completed, skipping"
return { status: "skipped", taskserv: $taskserv, hostname: $hostname }
},
"rerun" => {
_print $"↻ ($taskserv) on ($hostname) — failed, re-running"
},
$d if ($d | str starts-with "blocked:") => {
let blocker = ($d | str replace "blocked:" "")
_print $"⛔ ($taskserv) on ($hostname) — blocked by ($blocker) (state not completed)"
return { status: "blocked", taskserv: $taskserv, hostname: $hostname, blocker: $blocker }
},
_ => {},
}
}
# Write running state before submitting to orchestrator
if ($hostname | is-not-empty) and not $check {
state-node-start $workspace_path $hostname $taskserv
--actor $actor_id
--source "orchestrator"
--operation $operation
}
let workflow_data = {
taskserv: $taskserv,
operation: $operation,
infra: ($infra | default ""),
settings: ($settings | default ""),
check_mode: $check,
wait: $wait,
hostname: $hostname,
}
let response = (do {
http post $"($orch_url)/workflows/taskserv/create" --content-type "application/json" ($workflow_data | to json)
} catch { |e|
# Write failed state on submit error
if ($hostname | is-not-empty) and not $check {
state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
}
return { status: "error", message: $e.msg }
})
if not ($response | get success) {
if ($hostname | is-not-empty) and not $check {
state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
}
return { status: "error", message: ($response | get error) }
}
let task_id = ($response | get data)
_print $"Taskserv ($operation) workflow submitted: ($task_id)"
if $wait {
let result = (wait_for_workflow_completion $orch_url $task_id)
if ($hostname | is-not-empty) and not $check {
if $result.status == "completed" {
state-node-finish $workspace_path $hostname $taskserv --success --source "orchestrator"
} else {
state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
}
}
$result
} else {
{ status: "submitted", task_id: $task_id }
}
}
# Specific taskserv operations
export def "taskserv create" [
taskserv: string # Taskserv name
infra?: string # Infrastructure target
settings?: string # Settings file path
--check (-c) # Check mode only
--wait (-w) # Wait for completion
--orchestrator: string = "" # Orchestrator URL (optional, uses platform config if not provided)
] {
taskserv_workflow $taskserv "create" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}
export def "taskserv delete" [
taskserv: string # Taskserv name
infra?: string # Infrastructure target
settings?: string # Settings file path
--check (-c) # Check mode only
--wait (-w) # Wait for completion
--orchestrator: string = "" # Orchestrator URL (optional, uses platform config if not provided)
] {
taskserv_workflow $taskserv "delete" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}
export def "taskserv generate" [
taskserv: string # Taskserv name
infra?: string # Infrastructure target
settings?: string # Settings file path
--check (-c) # Check mode only
--wait (-w) # Wait for completion
--orchestrator: string = "" # Orchestrator URL (optional, uses platform config if not provided)
] {
taskserv_workflow $taskserv "generate" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}
export def "taskserv check-updates" [
taskserv?: string # Taskserv name (optional for all)
infra?: string # Infrastructure target
settings?: string # Settings file path
--check (-c) # Check mode only
--wait (-w) # Wait for completion
--orchestrator: string = "" # Orchestrator URL (optional, uses platform config if not provided)
] {
let taskserv_name = ($taskserv | default "")
taskserv_workflow $taskserv_name "check-updates" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}
def wait_for_workflow_completion [orchestrator: string, task_id: string] {
_print "Waiting for workflow completion..."
mut result = { status: "pending" }
while true {
let status_response = (http get $"($orchestrator)/tasks/($task_id)")
if not ($status_response | get success) {
return { status: "error", message: "Failed to get task status" }
}
let task = ($status_response | get data)
let task_status = ($task | get status)
match $task_status {
"Completed" => {
_print $"✅ Workflow completed successfully"
if ($task | get output | is-not-empty) {
_print "Output:"
_print ($task | get output)
}
$result = { status: "completed", task: $task }
break
},
"Failed" => {
_print $"❌ Workflow failed"
if ($task | get error | is-not-empty) {
_print "Error:"
_print ($task | get error)
}
$result = { status: "failed", task: $task }
break
},
"Running" => {
_print $"🔄 Workflow is running..."
},
_ => {
_print $"⏳ Workflow status: ($task_status)"
}
}
sleep 2sec
}
return $result
}