prvng_core/nulib/workflows/management.nu

298 lines
10 KiB
Plaintext
Raw Normal View History

2025-10-07 10:32:04 +01:00
use std
use ../lib_provisioning *
use ../lib_provisioning/platform *
2025-10-07 10:32:04 +01:00
# Comprehensive workflow management commands
# Resolve the orchestrator endpoint.
#
# An explicitly supplied --orchestrator value always wins. Otherwise the
# endpoint is looked up through `service-endpoint "orchestrator"`, and if that
# lookup raises, the hard-coded local default is returned instead.
def get-orchestrator-url [--orchestrator: string = ""]: nothing -> string {
    if ($orchestrator | is-empty) {
        # No explicit URL: ask platform discovery, fall back to the default
        try {
            service-endpoint "orchestrator"
        } catch {
            "http://localhost:9090"
        }
    } else {
        $orchestrator
    }
}
# Decide whether the orchestrator plugin should be used instead of HTTP.
#
# Returns true when `detect-platform-mode` classifies the URL as "local".
def use-local-plugin [orchestrator_url: string]: nothing -> bool {
    let mode = (detect-platform-mode $orchestrator_url)
    $mode == "local"
}
2025-10-07 10:32:04 +01:00
# List workflows known to the orchestrator, optionally filtered by status.
#
# Local orchestrators are queried through the `orch` plugin, remote ones over
# HTTP. Note that the two paths project different column sets.
export def "workflow list" [
    --orchestrator: string = ""  # Orchestrator URL (optional, uses platform config if not provided)
    --status: string             # Filter by status: Pending, Running, Completed, Failed, Cancelled
]: nothing -> table {
    let endpoint = (get-orchestrator-url --orchestrator=$orchestrator)
    let has_filter = ($status | is-not-empty)

    # Plugin path for a local orchestrator (10-50x faster)
    if (use-local-plugin $endpoint) {
        let plugin_tasks = (orch tasks)
        let selected = if $has_filter {
            $plugin_tasks | where status == $status
        } else {
            $plugin_tasks
        }
        return ($selected | select id status priority created_at workflow_id)
    }

    # HTTP path for remote orchestrators
    let reply = (http get $"($endpoint)/tasks")
    if not $reply.success {
        _print $"Error: ($reply.error)"
        return []
    }

    let remote_tasks = $reply.data
    let selected = if $has_filter {
        $remote_tasks | where status == $status
    } else {
        $remote_tasks
    }
    $selected | select id name status created_at started_at completed_at
}
# Get detailed workflow status.
#
# Returns the task record on success. Every failure (task not found, HTTP
# request error, or an unsuccessful orchestrator response) is reported as a
# record holding a single `error` field, so callers can test for a missing
# `status` field instead of catching exceptions.
export def "workflow status" [
    task_id: string              # Task ID to check
    --orchestrator: string = ""  # Orchestrator URL (optional, uses platform config if not provided)
]: nothing -> record {
    let orch_url = (get-orchestrator-url --orchestrator=$orchestrator)

    # Use plugin for local orchestrator (~5ms vs ~50ms with HTTP)
    if (use-local-plugin $orch_url) {
        let matches = (orch tasks | where id == $task_id)
        # Test for emptiness BEFORE taking the first row: calling `first` on an
        # empty list raises instead of yielding an empty value, which would
        # skip the "not found" record below.
        if ($matches | is-empty) {
            return { error: $"Task ($task_id) not found" }
        }
        return ($matches | first)
    }

    # Fall back to HTTP for remote orchestrators. A transport failure is
    # converted into the same envelope shape the success check expects, so it
    # is reported through the `error` record rather than aborting the caller.
    let response = try {
        http get $"($orch_url)/tasks/($task_id)"
    } catch {|err|
        { success: false, error: $"Request to orchestrator failed: ($err.msg)" }
    }

    if not ($response | get success) {
        return { error: ($response | get error) }
    }

    ($response | get data)
}
# Monitor workflow progress in real-time.
#
# Polls `workflow status` every 3 seconds, redraws a summary, and stops when
# the task reaches Completed, Failed or Cancelled, or when the status lookup
# itself fails.
export def "workflow monitor" [
    task_id: string              # Task ID to monitor
    --orchestrator: string = ""  # Orchestrator URL (optional, uses platform config if not provided)
]: nothing -> nothing {
    let orch_url = (get-orchestrator-url --orchestrator=$orchestrator)

    _print $"Monitoring workflow: ($task_id)"
    _print "Press Ctrl+C to stop monitoring"
    _print ""

    while true {
        let task = (workflow status $task_id --orchestrator $orch_url)
        let status = ($task.status? | default "")

        # A failed lookup comes back as a record with `error` but no `status`.
        # Keying on the missing `status` (rather than on the presence of
        # `error`) lets a Failed task that carries its own `error` field reach
        # the "Failed" arm below instead of being reported as a lookup error.
        if ($status | is-empty) {
            let reason = ($task.error? | default "unknown error")
            _print $"❌ Error getting task status: ($reason)"
            break
        }

        # Optional access: these fields may be absent or null depending on the
        # task state and on which backend produced the record.
        let name = ($task.name? | default "N/A")
        let created = ($task.created_at? | default "Unknown")
        let started = ($task.started_at? | default "Not started")
        let completed = ($task.completed_at? | default "Not completed")

        clear
        _print $"📊 Workflow Status: ($task_id)"
        _print $"═══════════════════════════════════════════════════════════════"
        _print $"Name: ($name)"
        _print $"Status: ($status)"
        _print $"Created: ($created)"
        _print $"Started: ($started)"
        _print $"Completed: ($completed)"
        _print ""

        match $status {
            "Completed" => {
                _print "✅ Workflow completed successfully!"
                if ($task.output? | is-not-empty) {
                    _print ""
                    _print "Output:"
                    _print "───────"
                    _print ($task | get output)
                }
                break
            },
            "Failed" => {
                _print "❌ Workflow failed!"
                if ($task.error? | is-not-empty) {
                    _print ""
                    _print "Error:"
                    _print "──────"
                    _print ($task | get error)
                }
                break
            },
            "Running" => {
                _print "🔄 Workflow is running..."
            },
            "Cancelled" => {
                _print "🚫 Workflow was cancelled"
                break
            },
            _ => {
                _print $"⏳ Status: ($status)"
            }
        }

        _print ""
        _print "Refreshing in 3 seconds... (Ctrl+C to stop)"
        sleep 3sec
    }
}
# Summarise workflows by status.
#
# Returns a record with the total count, one counter per status, and the
# percentage of completed workflows rounded to an integer (0 when there are
# no workflows at all).
export def "workflow stats" [
    --orchestrator: string = ""  # Orchestrator URL (optional, uses platform config if not provided)
]: nothing -> record {
    let endpoint = (get-orchestrator-url --orchestrator=$orchestrator)
    let all_tasks = (workflow list --orchestrator $endpoint)

    # Count the tasks whose status equals the given label
    let count_of = {|label| $all_tasks | where status == $label | length }

    let n_total = ($all_tasks | length)
    let n_completed = (do $count_of "Completed")
    let n_failed = (do $count_of "Failed")
    let n_running = (do $count_of "Running")
    let n_pending = (do $count_of "Pending")
    let n_cancelled = (do $count_of "Cancelled")

    let rate = if $n_total > 0 {
        ($n_completed / $n_total * 100) | math round | into int
    } else {
        0
    }

    {
        total: $n_total,
        completed: $n_completed,
        failed: $n_failed,
        running: $n_running,
        pending: $n_pending,
        cancelled: $n_cancelled,
        success_rate: $rate
    }
}
# Clean up old completed workflows.
#
# Selects workflows in a terminal state (Completed, Failed, Cancelled) whose
# `completed_at` is older than --days and prints them. Deletion itself is not
# implemented: both the dry-run and the real run only report.
export def "workflow cleanup" [
    --orchestrator: string = ""  # Orchestrator URL (optional, uses platform config if not provided)
    --days: int = 7              # Remove workflows older than this many days
    --dry-run                    # Show what would be removed without actually removing
]: nothing -> nothing {
    let orch_url = (get-orchestrator-url --orchestrator=$orchestrator)

    _print $"Cleaning up workflows older than ($days) days..."

    let cutoff_date = ((date now) - ($days * 86400 | into duration --unit sec))
    let tasks = (workflow list --orchestrator $orch_url)

    let old_tasks = ($tasks | where {|task|
        # Check the status and the presence of a completion time first:
        # unfinished tasks (and rows listed without a `completed_at` column)
        # have nothing to parse, and `into datetime` must not run on them.
        let finished_at = $task.completed_at?
        if ($task.status not-in ["Completed", "Failed", "Cancelled"]) or ($finished_at | is-empty) {
            false
        } else {
            ($finished_at | into datetime) < $cutoff_date
        }
    })

    if ($old_tasks | is-empty) {
        _print "No old workflows found for cleanup."
        return
    }

    _print $"Found ($old_tasks | length) workflows for cleanup:"
    # Print the table explicitly: the value of a pipeline in the middle of a
    # command body is discarded, so a bare `... | table` shows nothing.
    _print ($old_tasks | select id name? status completed_at | table)

    if $dry_run {
        _print ""
        _print "Dry run mode - no workflows were actually removed."
        _print "Run without --dry-run to perform the cleanup."
    } else {
        _print ""
        _print "Note: Actual cleanup would require orchestrator API support for deletion."
        _print "This is a placeholder for future implementation."
    }
}
# Orchestrator health and info.
#
# Returns a record with `status`, `message` and `orchestrator_url`; when the
# orchestrator is reachable it also carries `workflow_stats` (and
# `plugin_mode: true` on the local plugin path).
export def "workflow orchestrator" [
    --orchestrator: string = "http://localhost:8080"  # Orchestrator URL
]: nothing -> record {
    # Use plugin for local orchestrator (<5ms vs ~50ms with HTTP)
    if (use-local-plugin $orchestrator) {
        let status = (orch status)
        let stats = (workflow stats --orchestrator $orchestrator)

        return {
            status: (if $status.running { "healthy" } else { "stopped" }),
            message: $"Orchestrator running: ($status.running)",
            orchestrator_url: $orchestrator,
            workflow_stats: $stats,
            plugin_mode: true
        }
    }

    # Fall back to HTTP for remote orchestrators. A connection failure makes
    # `http get` raise, so it is caught and mapped onto the same unsuccessful
    # envelope; otherwise the "unreachable" result below could never be
    # returned.
    let health_response = try {
        http get $"($orchestrator)/health"
    } catch {
        { success: false }
    }

    if not ($health_response | get success) {
        return {
            status: "unreachable",
            message: "Cannot connect to orchestrator",
            orchestrator_url: $orchestrator
        }
    }

    # Only query statistics once the health check has passed; asking an
    # unreachable orchestrator for its task list would just fail again.
    let stats = (workflow stats --orchestrator $orchestrator)

    {
        status: "healthy",
        message: ($health_response | get data),
        orchestrator_url: $orchestrator,
        workflow_stats: $stats
    }
}
# Submit a workflow by dispatching to the matching workflow module.
#
# "server", "taskserv" and "cluster" are forwarded to their module's entry
# command; any other type yields an error record.
# NOTE(review): --depends-on is accepted but not forwarded to any of the
# workflow commands here, and `operation` is not used for the "server" type.
export def "workflow submit" [
    workflow_type: string              # server, taskserv, cluster
    operation: string                  # create, delete, etc.
    target: string                     # specific target (server name, taskserv name, etc.)
    infra?: string                     # Infrastructure target
    settings?: string                  # Settings file path
    --depends-on: list<string> = []    # Task IDs this workflow depends on
    --check (-c)                       # Check mode only
    --wait (-w)                        # Wait for completion
    --orchestrator: string = "http://localhost:8080"  # Orchestrator URL
]: nothing -> record {
    if $workflow_type == "server" {
        use server_create.nu
        server_create_workflow $infra $settings [$target] --check=$check --wait=$wait --orchestrator $orchestrator
    } else if $workflow_type == "taskserv" {
        use taskserv.nu
        taskserv_workflow $target $operation $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
    } else if $workflow_type == "cluster" {
        use cluster.nu
        cluster_workflow $target $operation $infra $settings --check=$check --wait=$wait --orchestrator $orchestrator
    } else {
        { status: "error", message: $"Unknown workflow type: ($workflow_type)" }
    }
}