569 lines
18 KiB
Plaintext
569 lines
18 KiB
Plaintext
use std log
|
|
use ../lib_provisioning *
|
|
use ../lib_provisioning/config/accessor.nu *
|
|
|
|
# Comprehensive Nushell CLI for batch workflow operations
|
|
# Follows PAP: Configuration-driven operations, no hardcoded logic
|
|
# Integration with orchestrator REST API endpoints
|
|
|
|
# Get orchestrator URL from configuration
|
|
def get-orchestrator-url []: nothing -> string {
|
|
config-get "orchestrator.url" "http://localhost:8080"
|
|
}
|
|
|
|
# Get workflow storage backend from configuration
|
|
def get-storage-backend []: nothing -> string {
|
|
config-get "workflows.storage.backend" "filesystem"
|
|
}
|
|
|
|
# Validate KCL workflow definition
|
|
export def "batch validate" [
|
|
workflow_file: string # Path to KCL workflow definition
|
|
--check-syntax (-s) # Check syntax only
|
|
--check-dependencies (-d) # Validate dependencies
|
|
]: nothing -> record {
|
|
_print $"Validating KCL workflow: ($workflow_file)"
|
|
|
|
if not ($workflow_file | path exists) {
|
|
return {
|
|
valid: false,
|
|
error: $"Workflow file not found: ($workflow_file)"
|
|
}
|
|
}
|
|
|
|
let validation_result = {
|
|
valid: false,
|
|
syntax_valid: false,
|
|
dependencies_valid: false,
|
|
errors: [],
|
|
warnings: []
|
|
}
|
|
|
|
# Check KCL syntax
|
|
if $check_syntax or (not $check_dependencies) {
|
|
let kcl_result = (run-external "kcl" ["fmt", "--check", $workflow_file] | complete)
|
|
if $kcl_result.exit_code == 0 {
|
|
$validation_result | update syntax_valid true
|
|
} else {
|
|
$validation_result | update errors ($validation_result.errors | append $"KCL syntax error: ($kcl_result.stderr)")
|
|
}
|
|
}
|
|
|
|
# Check dependencies if requested
|
|
if $check_dependencies {
|
|
let content = (open $workflow_file | from toml)
|
|
if ($content | get -o dependencies | is-not-empty) {
|
|
let deps = ($content | get dependencies)
|
|
let missing_deps = ($deps | where {|dep| not ($dep | path exists) })
|
|
|
|
if ($missing_deps | length) > 0 {
|
|
$validation_result | update dependencies_valid false
|
|
$validation_result | update errors ($validation_result.errors | append $"Missing dependencies: ($missing_deps | str join ', ')")
|
|
} else {
|
|
$validation_result | update dependencies_valid true
|
|
}
|
|
} else {
|
|
$validation_result | update dependencies_valid true
|
|
}
|
|
}
|
|
|
|
# Determine overall validity
|
|
let is_valid = (
|
|
($validation_result.syntax_valid == true) and
|
|
(not $check_dependencies or $validation_result.dependencies_valid == true)
|
|
)
|
|
|
|
$validation_result | update valid $is_valid
|
|
}
|
|
|
|
# Submit KCL workflow to orchestrator
|
|
export def "batch submit" [
|
|
workflow_file: string # Path to KCL workflow definition
|
|
--name (-n): string # Custom workflow name
|
|
--priority: int = 5 # Workflow priority (1-10)
|
|
--environment: string # Target environment (dev/test/prod)
|
|
--wait (-w) # Wait for completion
|
|
--timeout: duration = 30min # Timeout for waiting
|
|
]: nothing -> record {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
|
|
# Validate workflow first
|
|
let validation = (batch validate $workflow_file --check-syntax --check-dependencies)
|
|
if not $validation.valid {
|
|
return {
|
|
status: "error",
|
|
message: "Workflow validation failed",
|
|
errors: $validation.errors
|
|
}
|
|
}
|
|
|
|
_print $"Submitting workflow: ($workflow_file)"
|
|
|
|
# Parse workflow content
|
|
let workflow_content = (open $workflow_file)
|
|
let workflow_name = if ($name | is-not-empty) {
|
|
$name
|
|
} else {
|
|
($workflow_file | path basename | path parse | get stem)
|
|
}
|
|
|
|
# Prepare submission payload
|
|
let payload = {
|
|
name: $workflow_name,
|
|
workflow_file: $workflow_file,
|
|
content: $workflow_content,
|
|
priority: $priority,
|
|
environment: ($environment | default (config-get "environment" "dev")),
|
|
storage_backend: (get-storage-backend),
|
|
submitted_at: (date now | format date "%Y-%m-%d %H:%M:%S")
|
|
}
|
|
|
|
# Submit to orchestrator
|
|
let response = (http post $"($orchestrator_url)/workflows" $payload)
|
|
|
|
if not ($response | get success) {
|
|
return {
|
|
status: "error",
|
|
message: ($response | get error)
|
|
}
|
|
}
|
|
|
|
let task = ($response | get data)
|
|
let task_id = ($task | get id)
|
|
|
|
_print $"✅ Workflow submitted successfully"
|
|
_print $"Task ID: ($task_id)"
|
|
_print $"Name: ($workflow_name)"
|
|
_print $"Priority: ($priority)"
|
|
|
|
if $wait {
|
|
_print ""
|
|
_print "Waiting for completion..."
|
|
batch monitor $task_id --timeout $timeout
|
|
} else {
|
|
return {
|
|
status: "submitted",
|
|
task_id: $task_id,
|
|
name: $workflow_name,
|
|
message: "Use 'batch monitor' to track progress"
|
|
}
|
|
}
|
|
}
|
|
|
|
# Get workflow status
|
|
export def "batch status" [
|
|
task_id: string # Task ID to check
|
|
--format: string = "table" # Output format: table, json, compact
|
|
]: nothing -> record {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
let response = (http get $"($orchestrator_url)/workflows/($task_id)")
|
|
|
|
if not ($response | get success) {
|
|
return {
|
|
error: ($response | get error),
|
|
task_id: $task_id
|
|
}
|
|
}
|
|
|
|
let task = ($response | get data)
|
|
|
|
match $format {
|
|
"json" => $task,
|
|
"compact" => {
|
|
_print $"($task.id): ($task.name) [($task.status)]"
|
|
$task
|
|
},
|
|
_ => {
|
|
_print $"📊 Workflow Status"
|
|
_print $"═══════════════════"
|
|
_print $"ID: ($task.id)"
|
|
_print $"Name: ($task.name)"
|
|
_print $"Status: ($task.status)"
|
|
_print $"Created: ($task.created_at)"
|
|
_print $"Started: (($task | get -o started_at | default 'Not started'))"
|
|
_print $"Completed: (($task | get -o completed_at | default 'Not completed'))"
|
|
|
|
if ($task | get -o progress | is-not-empty) {
|
|
_print $"Progress: ($task.progress)%"
|
|
}
|
|
|
|
$task
|
|
}
|
|
}
|
|
}
|
|
|
|
# Real-time monitoring of workflow progress
|
|
export def "batch monitor" [
|
|
task_id: string # Task ID to monitor
|
|
--interval: duration = 3sec # Refresh interval
|
|
--timeout: duration = 30min # Maximum monitoring time
|
|
--quiet (-q) # Minimal output
|
|
]: nothing -> nothing {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
let start_time = (date now)
|
|
|
|
if not $quiet {
|
|
_print $"🔍 Monitoring workflow: ($task_id)"
|
|
_print "Press Ctrl+C to stop monitoring"
|
|
_print ""
|
|
}
|
|
|
|
while true {
|
|
let elapsed = ((date now) - $start_time)
|
|
if $elapsed > $timeout {
|
|
_print "⏰ Monitoring timeout reached"
|
|
break
|
|
}
|
|
|
|
let task_status = (batch status $task_id --format "compact")
|
|
|
|
if ($task_status | get -o error | is-not-empty) {
|
|
_print $"❌ Error getting task status: (($task_status | get error))"
|
|
break
|
|
}
|
|
|
|
let status = ($task_status | get status)
|
|
|
|
if not $quiet {
|
|
clear
|
|
let progress = ($task_status | get -o progress | default 0)
|
|
let progress_bar = (generate-progress-bar $progress)
|
|
|
|
_print $"🔍 Monitoring: ($task_id)"
|
|
_print $"Status: ($status) ($progress_bar) ($progress)%"
|
|
_print $"Elapsed: ($elapsed)"
|
|
_print ""
|
|
}
|
|
|
|
match $status {
|
|
"Completed" => {
|
|
_print "✅ Workflow completed successfully!"
|
|
if ($task_status | get -o output | is-not-empty) {
|
|
_print ""
|
|
_print "Output:"
|
|
_print "───────"
|
|
_print ($task_status | get output)
|
|
}
|
|
break
|
|
},
|
|
"Failed" => {
|
|
_print "❌ Workflow failed!"
|
|
if ($task_status | get -o error | is-not-empty) {
|
|
_print ""
|
|
_print "Error:"
|
|
_print "──────"
|
|
_print ($task_status | get error)
|
|
}
|
|
break
|
|
},
|
|
"Cancelled" => {
|
|
_print "🚫 Workflow was cancelled"
|
|
break
|
|
},
|
|
_ => {
|
|
if not $quiet {
|
|
_print $"Refreshing in ($interval)... (Ctrl+C to stop)"
|
|
}
|
|
sleep $interval
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
# Generate ASCII progress bar
|
|
def generate-progress-bar [progress: int]: nothing -> string {
|
|
let width = 20
|
|
let filled = ($progress * $width / 100 | math floor)
|
|
let empty = ($width - $filled)
|
|
|
|
let filled_bar = (1..$filled | each { "█" } | str join)
|
|
let empty_bar = (1..$empty | each { "░" } | str join)
|
|
|
|
$"[($filled_bar)($empty_bar)]"
|
|
}
|
|
|
|
# Rollback workflow operations
|
|
export def "batch rollback" [
|
|
task_id: string # Task ID to rollback
|
|
--checkpoint: string # Rollback to specific checkpoint
|
|
--force (-f) # Force rollback without confirmation
|
|
]: nothing -> record {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
|
|
if not $force {
|
|
let confirm = (input $"Are you sure you want to rollback task ($task_id)? [y/N]: ")
|
|
if $confirm != "y" and $confirm != "Y" {
|
|
return { status: "cancelled", message: "Rollback cancelled by user" }
|
|
}
|
|
}
|
|
|
|
let payload = {
|
|
task_id: $task_id,
|
|
checkpoint: ($checkpoint | default ""),
|
|
force: $force
|
|
}
|
|
|
|
let response = (http post $"($orchestrator_url)/workflows/($task_id)/rollback" $payload)
|
|
|
|
if not ($response | get success) {
|
|
return {
|
|
status: "error",
|
|
message: ($response | get error)
|
|
}
|
|
}
|
|
|
|
_print $"🔄 Rollback initiated for task: ($task_id)"
|
|
($response | get data)
|
|
}
|
|
|
|
# List all workflows with filtering
|
|
export def "batch list" [
|
|
--status: string # Filter by status (Pending, Running, Completed, Failed, Cancelled)
|
|
--environment: string # Filter by environment
|
|
--name: string # Filter by name pattern
|
|
--limit: int = 50 # Maximum number of results
|
|
--format: string = "table" # Output format: table, json, compact
|
|
]: nothing -> table {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
|
|
# Build query string
|
|
let query_parts = []
|
|
let query_parts = if ($status | is-not-empty) {
|
|
$query_parts | append $"status=($status)"
|
|
} else { $query_parts }
|
|
let query_parts = if ($environment | is-not-empty) {
|
|
$query_parts | append $"environment=($environment)"
|
|
} else { $query_parts }
|
|
let query_parts = if ($name | is-not-empty) {
|
|
$query_parts | append $"name_pattern=($name)"
|
|
} else { $query_parts }
|
|
let query_parts = $query_parts | append $"limit=($limit)"
|
|
|
|
let query_string = if ($query_parts | length) > 0 {
|
|
"?" + ($query_parts | str join "&")
|
|
} else {
|
|
""
|
|
}
|
|
|
|
let response = (http get $"($orchestrator_url)/workflows($query_string)")
|
|
|
|
if not ($response | get success) {
|
|
_print $"❌ Error: (($response | get error))"
|
|
return []
|
|
}
|
|
|
|
let workflows = ($response | get data)
|
|
|
|
match $format {
|
|
"json" => ($workflows | to json),
|
|
"compact" => {
|
|
$workflows | each {|w|
|
|
_print $"($w.id): ($w.name) [($w.status)] (($w.created_at))"
|
|
}
|
|
[]
|
|
},
|
|
_ => {
|
|
$workflows | select id name status environment priority created_at started_at completed_at
|
|
}
|
|
}
|
|
}
|
|
|
|
# Cancel running workflow
|
|
export def "batch cancel" [
|
|
task_id: string # Task ID to cancel
|
|
--reason: string # Cancellation reason
|
|
--force (-f) # Force cancellation
|
|
]: nothing -> record {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
|
|
let payload = {
|
|
task_id: $task_id,
|
|
reason: ($reason | default "User requested cancellation"),
|
|
force: $force
|
|
}
|
|
|
|
let response = (http post $"($orchestrator_url)/workflows/($task_id)/cancel" $payload)
|
|
|
|
if not ($response | get success) {
|
|
return {
|
|
status: "error",
|
|
message: ($response | get error)
|
|
}
|
|
}
|
|
|
|
_print $"🚫 Cancellation request sent for task: ($task_id)"
|
|
($response | get data)
|
|
}
|
|
|
|
# Manage workflow templates
|
|
export def "batch template" [
|
|
action: string # Action: list, create, delete, show
|
|
template_name?: string # Template name (required for create, delete, show)
|
|
--from-file: string # Create template from file
|
|
--description: string # Template description
|
|
]: nothing -> any {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
|
|
match $action {
|
|
"list" => {
|
|
let response = (http get $"($orchestrator_url)/templates")
|
|
if ($response | get success) {
|
|
($response | get data) | select name description created_at
|
|
} else {
|
|
_print $"❌ Error: (($response | get error))"
|
|
[]
|
|
}
|
|
},
|
|
"create" => {
|
|
if ($template_name | is-empty) or ($from_file | is-empty) {
|
|
return { error: "Template name and source file are required for creation" }
|
|
}
|
|
|
|
if not ($from_file | path exists) {
|
|
return { error: $"Template file not found: ($from_file)" }
|
|
}
|
|
|
|
let content = (open $from_file)
|
|
let payload = {
|
|
name: $template_name,
|
|
content: $content,
|
|
description: ($description | default "")
|
|
}
|
|
|
|
let response = (http post $"($orchestrator_url)/templates" $payload)
|
|
if ($response | get success) {
|
|
_print $"✅ Template created: ($template_name)"
|
|
($response | get data)
|
|
} else {
|
|
{ error: ($response | get error) }
|
|
}
|
|
},
|
|
"delete" => {
|
|
if ($template_name | is-empty) {
|
|
return { error: "Template name is required for deletion" }
|
|
}
|
|
|
|
let response = (http delete $"($orchestrator_url)/templates/($template_name)")
|
|
if ($response | get success) {
|
|
_print $"✅ Template deleted: ($template_name)"
|
|
($response | get data)
|
|
} else {
|
|
{ error: ($response | get error) }
|
|
}
|
|
},
|
|
"show" => {
|
|
if ($template_name | is-empty) {
|
|
return { error: "Template name is required" }
|
|
}
|
|
|
|
let response = (http get $"($orchestrator_url)/templates/($template_name)")
|
|
if ($response | get success) {
|
|
($response | get data)
|
|
} else {
|
|
{ error: ($response | get error) }
|
|
}
|
|
},
|
|
_ => {
|
|
{ error: $"Unknown template action: ($action). Use: list, create, delete, show" }
|
|
}
|
|
}
|
|
}
|
|
|
|
# Batch workflow statistics and analytics
|
|
export def "batch stats" [
|
|
--period: string = "24h" # Time period: 1h, 24h, 7d, 30d
|
|
--environment: string # Filter by environment
|
|
--detailed (-d) # Show detailed statistics
|
|
]: nothing -> record {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
|
|
# Build query string
|
|
let query_parts = []
|
|
let query_parts = $query_parts | append $"period=($period)"
|
|
let query_parts = if ($environment | is-not-empty) {
|
|
$query_parts | append $"environment=($environment)"
|
|
} else { $query_parts }
|
|
let query_parts = if $detailed {
|
|
$query_parts | append "detailed=true"
|
|
} else { $query_parts }
|
|
|
|
let query_string = if ($query_parts | length) > 0 {
|
|
"?" + ($query_parts | str join "&")
|
|
} else {
|
|
""
|
|
}
|
|
|
|
let response = (http get $"($orchestrator_url)/workflows/stats($query_string)")
|
|
|
|
if not ($response | get success) {
|
|
return { error: ($response | get error) }
|
|
}
|
|
|
|
let stats = ($response | get data)
|
|
|
|
_print $"📊 Workflow Statistics (($period))"
|
|
_print "══════════════════════════════════"
|
|
_print $"Total Workflows: ($stats.total)"
|
|
_print $"Completed: ($stats.completed) (($stats.success_rate)%)"
|
|
_print $"Failed: ($stats.failed)"
|
|
_print $"Running: ($stats.running)"
|
|
_print $"Pending: ($stats.pending)"
|
|
_print $"Cancelled: ($stats.cancelled)"
|
|
|
|
if $detailed {
|
|
_print ""
|
|
_print "Environment Breakdown:"
|
|
if ($stats | get -o by_environment | is-not-empty) {
|
|
($stats.by_environment) | each {|env|
|
|
_print $" ($env.name): ($env.count) workflows"
|
|
} | ignore
|
|
}
|
|
|
|
_print ""
|
|
_print "Average Execution Time: (($stats | get -o avg_execution_time | default 'N/A'))"
|
|
}
|
|
|
|
$stats
|
|
}
|
|
|
|
# Health check for batch workflow system
|
|
export def "batch health" []: nothing -> record {
|
|
let orchestrator_url = (get-orchestrator-url)
|
|
|
|
let result = (do { http get $"($orchestrator_url)/health" } | complete)
|
|
|
|
if $result.exit_code != 0 {
|
|
_print $"❌ Cannot connect to orchestrator: ($orchestrator_url)"
|
|
{
|
|
status: "unreachable",
|
|
orchestrator_url: $orchestrator_url
|
|
}
|
|
} else {
|
|
let response = ($result.stdout | from json)
|
|
|
|
if ($response | get success) {
|
|
let health_data = ($response | get data)
|
|
_print $"✅ Orchestrator: Healthy"
|
|
_print $"Version: (($health_data | get -o version | default 'Unknown'))"
|
|
_print $"Uptime: (($health_data | get -o uptime | default 'Unknown'))"
|
|
|
|
# Check storage backend
|
|
let storage_backend = (get-storage-backend)
|
|
_print $"Storage Backend: ($storage_backend)"
|
|
|
|
{
|
|
status: "healthy",
|
|
orchestrator: $health_data,
|
|
storage_backend: $storage_backend
|
|
}
|
|
} else {
|
|
_print $"❌ Orchestrator: Unhealthy"
|
|
_print $"Error: (($response | get error))"
|
|
|
|
{
|
|
status: "unhealthy",
|
|
error: ($response | get error)
|
|
}
|
|
}
|
|
}
|
|
} |