use std log use ../lib_provisioning * use ../lib_provisioning/config/accessor.nu * # Comprehensive Nushell CLI for batch workflow operations # Follows PAP: Configuration-driven operations, no hardcoded logic # Integration with orchestrator REST API endpoints # Get orchestrator URL from configuration def get-orchestrator-url []: nothing -> string { config-get "orchestrator.url" "http://localhost:8080" } # Get workflow storage backend from configuration def get-storage-backend []: nothing -> string { config-get "workflows.storage.backend" "filesystem" } # Validate KCL workflow definition export def "batch validate" [ workflow_file: string # Path to KCL workflow definition --check-syntax (-s) # Check syntax only --check-dependencies (-d) # Validate dependencies ]: nothing -> record { _print $"Validating KCL workflow: ($workflow_file)" if not ($workflow_file | path exists) { return { valid: false, error: $"Workflow file not found: ($workflow_file)" } } let validation_result = { valid: false, syntax_valid: false, dependencies_valid: false, errors: [], warnings: [] } # Check KCL syntax if $check_syntax or (not $check_dependencies) { let kcl_result = (run-external "kcl" ["fmt", "--check", $workflow_file] | complete) if $kcl_result.exit_code == 0 { $validation_result | update syntax_valid true } else { $validation_result | update errors ($validation_result.errors | append $"KCL syntax error: ($kcl_result.stderr)") } } # Check dependencies if requested if $check_dependencies { let content = (open $workflow_file | from toml) if ($content | get -o dependencies | is-not-empty) { let deps = ($content | get dependencies) let missing_deps = ($deps | where {|dep| not ($dep | path exists) }) if ($missing_deps | length) > 0 { $validation_result | update dependencies_valid false $validation_result | update errors ($validation_result.errors | append $"Missing dependencies: ($missing_deps | str join ', ')") } else { $validation_result | update dependencies_valid true } } else { $validation_result | update dependencies_valid true } } # Determine overall validity let is_valid = ( ($validation_result.syntax_valid == true) and (not $check_dependencies or $validation_result.dependencies_valid == true) ) $validation_result | update valid $is_valid } # Submit KCL workflow to orchestrator export def "batch submit" [ workflow_file: string # Path to KCL workflow definition --name (-n): string # Custom workflow name --priority: int = 5 # Workflow priority (1-10) --environment: string # Target environment (dev/test/prod) --wait (-w) # Wait for completion --timeout: duration = 30min # Timeout for waiting ]: nothing -> record { let orchestrator_url = (get-orchestrator-url) # Validate workflow first let validation = (batch validate $workflow_file --check-syntax --check-dependencies) if not $validation.valid { return { status: "error", message: "Workflow validation failed", errors: $validation.errors } } _print $"Submitting workflow: ($workflow_file)" # Parse workflow content let workflow_content = (open $workflow_file) let workflow_name = if ($name | is-not-empty) { $name } else { ($workflow_file | path basename | path parse | get stem) } # Prepare submission payload let payload = { name: $workflow_name, workflow_file: $workflow_file, content: $workflow_content, priority: $priority, environment: ($environment | default (config-get "environment" "dev")), storage_backend: (get-storage-backend), submitted_at: (date now | format date "%Y-%m-%d %H:%M:%S") } # Submit to orchestrator let response = (http post $"($orchestrator_url)/workflows" $payload) if not ($response | get success) { return { status: "error", message: ($response | get error) } } let task = ($response | get data) let task_id = ($task | get id) _print $"✅ Workflow submitted successfully" _print $"Task ID: ($task_id)" _print $"Name: ($workflow_name)" _print $"Priority: ($priority)" if $wait { _print "" _print "Waiting for completion..." batch monitor $task_id --timeout $timeout } else { return { status: "submitted", task_id: $task_id, name: $workflow_name, message: "Use 'batch monitor' to track progress" } } } # Get workflow status export def "batch status" [ task_id: string # Task ID to check --format: string = "table" # Output format: table, json, compact ]: nothing -> record { let orchestrator_url = (get-orchestrator-url) let response = (http get $"($orchestrator_url)/workflows/($task_id)") if not ($response | get success) { return { error: ($response | get error), task_id: $task_id } } let task = ($response | get data) match $format { "json" => $task, "compact" => { _print $"($task.id): ($task.name) [($task.status)]" $task }, _ => { _print $"📊 Workflow Status" _print $"═══════════════════" _print $"ID: ($task.id)" _print $"Name: ($task.name)" _print $"Status: ($task.status)" _print $"Created: ($task.created_at)" _print $"Started: (($task | get -o started_at | default 'Not started'))" _print $"Completed: (($task | get -o completed_at | default 'Not completed'))" if ($task | get -o progress | is-not-empty) { _print $"Progress: ($task.progress)%" } $task } } } # Real-time monitoring of workflow progress export def "batch monitor" [ task_id: string # Task ID to monitor --interval: duration = 3sec # Refresh interval --timeout: duration = 30min # Maximum monitoring time --quiet (-q) # Minimal output ]: nothing -> nothing { let orchestrator_url = (get-orchestrator-url) let start_time = (date now) if not $quiet { _print $"🔍 Monitoring workflow: ($task_id)" _print "Press Ctrl+C to stop monitoring" _print "" } while true { let elapsed = ((date now) - $start_time) if $elapsed > $timeout { _print "⏰ Monitoring timeout reached" break } let task_status = (batch status $task_id --format "compact") if ($task_status | get -o error | is-not-empty) { _print $"❌ Error getting task status: (($task_status | get error))" break } let status = ($task_status | get status) if not $quiet { clear let progress = ($task_status | get -o progress | default 0) let progress_bar = (generate-progress-bar $progress) _print $"🔍 Monitoring: ($task_id)" _print $"Status: ($status) ($progress_bar) ($progress)%" _print $"Elapsed: ($elapsed)" _print "" } match $status { "Completed" => { _print "✅ Workflow completed successfully!" if ($task_status | get -o output | is-not-empty) { _print "" _print "Output:" _print "───────" _print ($task_status | get output) } break }, "Failed" => { _print "❌ Workflow failed!" if ($task_status | get -o error | is-not-empty) { _print "" _print "Error:" _print "──────" _print ($task_status | get error) } break }, "Cancelled" => { _print "🚫 Workflow was cancelled" break }, _ => { if not $quiet { _print $"Refreshing in ($interval)... (Ctrl+C to stop)" } sleep $interval } } } } # Generate ASCII progress bar def generate-progress-bar [progress: int]: nothing -> string { let width = 20 let filled = ($progress * $width / 100 | math floor) let empty = ($width - $filled) let filled_bar = (1..$filled | each { "█" } | str join) let empty_bar = (1..$empty | each { "░" } | str join) $"[($filled_bar)($empty_bar)]" } # Rollback workflow operations export def "batch rollback" [ task_id: string # Task ID to rollback --checkpoint: string # Rollback to specific checkpoint --force (-f) # Force rollback without confirmation ]: nothing -> record { let orchestrator_url = (get-orchestrator-url) if not $force { let confirm = (input $"Are you sure you want to rollback task ($task_id)? [y/N]: ") if $confirm != "y" and $confirm != "Y" { return { status: "cancelled", message: "Rollback cancelled by user" } } } let payload = { task_id: $task_id, checkpoint: ($checkpoint | default ""), force: $force } let response = (http post $"($orchestrator_url)/workflows/($task_id)/rollback" $payload) if not ($response | get success) { return { status: "error", message: ($response | get error) } } _print $"🔄 Rollback initiated for task: ($task_id)" ($response | get data) } # List all workflows with filtering export def "batch list" [ --status: string # Filter by status (Pending, Running, Completed, Failed, Cancelled) --environment: string # Filter by environment --name: string # Filter by name pattern --limit: int = 50 # Maximum number of results --format: string = "table" # Output format: table, json, compact ]: nothing -> table { let orchestrator_url = (get-orchestrator-url) # Build query string let query_parts = [] let query_parts = if ($status | is-not-empty) { $query_parts | append $"status=($status)" } else { $query_parts } let query_parts = if ($environment | is-not-empty) { $query_parts | append $"environment=($environment)" } else { $query_parts } let query_parts = if ($name | is-not-empty) { $query_parts | append $"name_pattern=($name)" } else { $query_parts } let query_parts = $query_parts | append $"limit=($limit)" let query_string = if ($query_parts | length) > 0 { "?" + ($query_parts | str join "&") } else { "" } let response = (http get $"($orchestrator_url)/workflows($query_string)") if not ($response | get success) { _print $"❌ Error: (($response | get error))" return [] } let workflows = ($response | get data) match $format { "json" => ($workflows | to json), "compact" => { $workflows | each {|w| _print $"($w.id): ($w.name) [($w.status)] (($w.created_at))" } [] }, _ => { $workflows | select id name status environment priority created_at started_at completed_at } } } # Cancel running workflow export def "batch cancel" [ task_id: string # Task ID to cancel --reason: string # Cancellation reason --force (-f) # Force cancellation ]: nothing -> record { let orchestrator_url = (get-orchestrator-url) let payload = { task_id: $task_id, reason: ($reason | default "User requested cancellation"), force: $force } let response = (http post $"($orchestrator_url)/workflows/($task_id)/cancel" $payload) if not ($response | get success) { return { status: "error", message: ($response | get error) } } _print $"🚫 Cancellation request sent for task: ($task_id)" ($response | get data) } # Manage workflow templates export def "batch template" [ action: string # Action: list, create, delete, show template_name?: string # Template name (required for create, delete, show) --from-file: string # Create template from file --description: string # Template description ]: nothing -> any { let orchestrator_url = (get-orchestrator-url) match $action { "list" => { let response = (http get $"($orchestrator_url)/templates") if ($response | get success) { ($response | get data) | select name description created_at } else { _print $"❌ Error: (($response | get error))" [] } }, "create" => { if ($template_name | is-empty) or ($from_file | is-empty) { return { error: "Template name and source file are required for creation" } } if not ($from_file | path exists) { return { error: $"Template file not found: ($from_file)" } } let content = (open $from_file) let payload = { name: $template_name, content: $content, description: ($description | default "") } let response = (http post $"($orchestrator_url)/templates" $payload) if ($response | get success) { _print $"✅ Template created: ($template_name)" ($response | get data) } else { { error: ($response | get error) } } }, "delete" => { if ($template_name | is-empty) { return { error: "Template name is required for deletion" } } let response = (http delete $"($orchestrator_url)/templates/($template_name)") if ($response | get success) { _print $"✅ Template deleted: ($template_name)" ($response | get data) } else { { error: ($response | get error) } } }, "show" => { if ($template_name | is-empty) { return { error: "Template name is required" } } let response = (http get $"($orchestrator_url)/templates/($template_name)") if ($response | get success) { ($response | get data) } else { { error: ($response | get error) } } }, _ => { { error: $"Unknown template action: ($action). Use: list, create, delete, show" } } } } # Batch workflow statistics and analytics export def "batch stats" [ --period: string = "24h" # Time period: 1h, 24h, 7d, 30d --environment: string # Filter by environment --detailed (-d) # Show detailed statistics ]: nothing -> record { let orchestrator_url = (get-orchestrator-url) # Build query string let query_parts = [] let query_parts = $query_parts | append $"period=($period)" let query_parts = if ($environment | is-not-empty) { $query_parts | append $"environment=($environment)" } else { $query_parts } let query_parts = if $detailed { $query_parts | append "detailed=true" } else { $query_parts } let query_string = if ($query_parts | length) > 0 { "?" + ($query_parts | str join "&") } else { "" } let response = (http get $"($orchestrator_url)/workflows/stats($query_string)") if not ($response | get success) { return { error: ($response | get error) } } let stats = ($response | get data) _print $"📊 Workflow Statistics (($period))" _print "══════════════════════════════════" _print $"Total Workflows: ($stats.total)" _print $"Completed: ($stats.completed) (($stats.success_rate)%)" _print $"Failed: ($stats.failed)" _print $"Running: ($stats.running)" _print $"Pending: ($stats.pending)" _print $"Cancelled: ($stats.cancelled)" if $detailed { _print "" _print "Environment Breakdown:" if ($stats | get -o by_environment | is-not-empty) { ($stats.by_environment) | each {|env| _print $" ($env.name): ($env.count) workflows" } | ignore } _print "" _print "Average Execution Time: (($stats | get -o avg_execution_time | default 'N/A'))" } $stats } # Health check for batch workflow system export def "batch health" []: nothing -> record { let orchestrator_url = (get-orchestrator-url) let result = (do { http get $"($orchestrator_url)/health" } | complete) if $result.exit_code != 0 { _print $"❌ Cannot connect to orchestrator: ($orchestrator_url)" { status: "unreachable", orchestrator_url: $orchestrator_url } } else { let response = ($result.stdout | from json) if ($response | get success) { let health_data = ($response | get data) _print $"✅ Orchestrator: Healthy" _print $"Version: (($health_data | get -o version | default 'Unknown'))" _print $"Uptime: (($health_data | get -o uptime | default 'Unknown'))" # Check storage backend let storage_backend = (get-storage-backend) _print $"Storage Backend: ($storage_backend)" { status: "healthy", orchestrator: $health_data, storage_backend: $storage_backend } } else { _print $"❌ Orchestrator: Unhealthy" _print $"Error: (($response | get error))" { status: "unhealthy", error: ($response | get error) } } } }