use std
use ../lib_provisioning *
use ../lib_provisioning/platform *
use ../workspace/state.nu *

# Taskserv workflow definitions

# Resolve the orchestrator base URL.
#
# Precedence: the explicit --orchestrator flag, then
# $env.PROVISIONING_ORCHESTRATOR_URL, then the "platform.orchestrator.url"
# config key (falling back to "http://localhost:9011").
def get-orchestrator-url [
    --orchestrator: string = ""    # Explicit URL; wins over env and config when non-empty
] {
    if ($orchestrator | is-not-empty) {
        return $orchestrator
    }
    if ($env.PROVISIONING_ORCHESTRATOR_URL? | is-not-empty) {
        return $env.PROVISIONING_ORCHESTRATOR_URL
    }
    config-get "platform.orchestrator.url" "http://localhost:9011"
}

# Detect if orchestrator URL is local (for plugin usage)
#
# Returns true when detect-platform-mode reports "local" for the URL.
def use-local-plugin [orchestrator_url: string] {
    # Check if it's a local endpoint
    (detect-platform-mode $orchestrator_url) == "local"
}

# Submit a taskserv workflow to the orchestrator and, optionally, wait for it.
#
# When --hostname is given and --check is not set, node state is recorded
# around the submission via the state-node-* helpers: a start entry before the
# request, and a finish entry on submit failure or (with --wait) on completion.
#
# Returns one of:
#   { status: "skipped", taskserv, hostname }           state gate said "skip"
#   { status: "blocked", taskserv, hostname, blocker }  state gate said "blocked:<name>"
#   { status: "error", message }                        submit failed or orchestrator refused
#   { status: "submitted", task_id }                    submitted without --wait
#   the record from wait_for_workflow_completion        with --wait
export def taskserv_workflow [
    taskserv: string                 # Taskserv name
    operation: string                # Operation: create, delete, generate, check-updates
    infra?: string                   # Infrastructure target
    settings?: string                # Settings file path
    --check (-c)                     # Check mode only
    --wait (-w)                      # Wait for completion
    --hostname: string = ""          # Server hostname for state tracking
    --workspace: string = ""         # Workspace path for state file resolution
    --actor: string = ""             # Identity for audit log (defaults to $env.USER)
    --depends-on: list = []          # DAG depends_on list for this node (taskserv names)
    --force (-f)                     # Force execution even if state is 'completed' or 'blocked'
    --orchestrator: string = ""      # Orchestrator URL (optional, uses platform config if not provided)
] {
    let orch_url = (get-orchestrator-url --orchestrator=$orchestrator)
    let workspace_path = if ($workspace | is-not-empty) { $workspace } else { $env.PWD }
    let actor_id = if ($actor | is-not-empty) { $actor } else { $env.USER? | default "system" }

    # State is only tracked for a named host and never in check mode.
    let track_state = (($hostname | is-not-empty) and not $check)

    # State gate: check own state + dependency propagation, unless --force
    if $track_state and not $force {
        let decision = (state-node-decision-with-deps $workspace_path $hostname $taskserv $depends_on)
        match $decision {
            "skip" => {
                _print $"⊘ ($taskserv) on ($hostname) — completed, skipping"
                return { status: "skipped", taskserv: $taskserv, hostname: $hostname }
            },
            "rerun" => {
                _print $"↻ ($taskserv) on ($hostname) — failed, re-running"
            },
            $d if ($d | str starts-with "blocked:") => {
                let blocker = ($d | str replace "blocked:" "")
                _print $"⛔ ($taskserv) on ($hostname) — blocked by ($blocker) (state not completed)"
                return { status: "blocked", taskserv: $taskserv, hostname: $hostname, blocker: $blocker }
            },
            _ => {},
        }
    }

    # Write running state before submitting to orchestrator
    if $track_state {
        state-node-start $workspace_path $hostname $taskserv --actor $actor_id --source "orchestrator" --operation $operation
    }

    let workflow_data = {
        taskserv: $taskserv,
        operation: $operation,
        infra: ($infra | default ""),
        settings: ($settings | default ""),
        check_mode: $check,
        wait: $wait,
        hostname: $hostname,
    }

    # `catch` only pairs with `try` (not `do`). The outcome is captured as a
    # record so the failure path (state write + early return) runs in the
    # command body rather than inside the catch closure.
    let submit = (try {
        let posted = (http post $"($orch_url)/workflows/taskserv/create" --content-type "application/json" ($workflow_data | to json))
        { ok: true, response: $posted, message: "" }
    } catch { |e|
        { ok: false, response: null, message: $e.msg }
    })

    if not $submit.ok {
        # Write failed state on submit error
        if $track_state {
            state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
        }
        return { status: "error", message: $submit.message }
    }

    let response = $submit.response

    # The orchestrator answered but refused the request.
    if not ($response | get success) {
        if $track_state {
            state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
        }
        return { status: "error", message: ($response | get error) }
    }

    let task_id = ($response | get data)
    _print $"Taskserv ($operation) workflow submitted: ($task_id)"

    if $wait {
        let result = (wait_for_workflow_completion $orch_url $task_id)
        if $track_state {
            # Only a "completed" result is recorded with --success.
            if $result.status == "completed" {
                state-node-finish $workspace_path $hostname $taskserv --success --source "orchestrator"
            } else {
                state-node-finish $workspace_path $hostname $taskserv --source "orchestrator"
            }
        }
        $result
    } else {
        { status: "submitted", task_id: $task_id }
    }
}

# Specific taskserv operations

# Submit a "create" workflow for a taskserv (thin wrapper over taskserv_workflow).
export def "taskserv create" [
    taskserv: string                 # Taskserv name
    infra?: string                   # Infrastructure target
    settings?: string                # Settings file path
    --check (-c)                     # Check mode only
    --wait (-w)                      # Wait for completion
    --orchestrator: string = ""      # Orchestrator URL (optional, uses platform config if not provided)
] {
    taskserv_workflow $taskserv "create" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}

# Submit a "delete" workflow for a taskserv (thin wrapper over taskserv_workflow).
export def "taskserv delete" [
    taskserv: string                 # Taskserv name
    infra?: string                   # Infrastructure target
    settings?: string                # Settings file path
    --check (-c)                     # Check mode only
    --wait (-w)                      # Wait for completion
    --orchestrator: string = ""      # Orchestrator URL (optional, uses platform config if not provided)
] {
    taskserv_workflow $taskserv "delete" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}

# Submit a "generate" workflow for a taskserv (thin wrapper over taskserv_workflow).
export def "taskserv generate" [
    taskserv: string                 # Taskserv name
    infra?: string                   # Infrastructure target
    settings?: string                # Settings file path
    --check (-c)                     # Check mode only
    --wait (-w)                      # Wait for completion
    --orchestrator: string = ""      # Orchestrator URL (optional, uses platform config if not provided)
] {
    taskserv_workflow $taskserv "generate" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}

# Submit a "check-updates" workflow; an omitted taskserv name is sent as "".
export def "taskserv check-updates" [
    taskserv?: string                # Taskserv name (optional for all)
    infra?: string                   # Infrastructure target
    settings?: string                # Settings file path
    --check (-c)                     # Check mode only
    --wait (-w)                      # Wait for completion
    --orchestrator: string = ""      # Orchestrator URL (optional, uses platform config if not provided)
] {
    let taskserv_name = ($taskserv | default "")
    taskserv_workflow $taskserv_name "check-updates" $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
}

# Poll the orchestrator every 2 seconds until the task reports "Completed" or
# "Failed".
#
# Returns { status: "completed", task }, { status: "failed", task }, or
# { status: "error", message } when the status request fails or the
# orchestrator reports success = false. There is no overall timeout: any other
# task status keeps the loop polling.
def wait_for_workflow_completion [orchestrator: string, task_id: string] {
    _print "Waiting for workflow completion..."

    mut result = { status: "pending" }

    while true {
        # A transport failure is turned into an error record so the caller can
        # still record a finish state instead of being aborted mid-poll.
        let poll = (try {
            { ok: true, body: (http get $"($orchestrator)/tasks/($task_id)"), message: "" }
        } catch { |e|
            { ok: false, body: null, message: $e.msg }
        })

        if not $poll.ok {
            return { status: "error", message: $"Failed to get task status: ($poll.message)" }
        }

        let status_response = $poll.body

        if not ($status_response | get success) {
            return { status: "error", message: "Failed to get task status" }
        }

        let task = ($status_response | get data)
        let task_status = ($task | get status)

        match $task_status {
            "Completed" => {
                _print $"✅ Workflow completed successfully"
                if ($task | get output | is-not-empty) {
                    _print "Output:"
                    _print ($task | get output)
                }
                $result = { status: "completed", task: $task }
                break
            },
            "Failed" => {
                _print $"❌ Workflow failed"
                if ($task | get error | is-not-empty) {
                    _print "Error:"
                    _print ($task | get error)
                }
                $result = { status: "failed", task: $task }
                break
            },
            "Running" => {
                _print $"🔄 Workflow is running..."
            },
            _ => {
                _print $"⏳ Workflow status: ($task_status)"
            }
        }

        sleep 2sec
    }

    return $result
}