569 lines
18 KiB
Plaintext
Raw Normal View History

2025-10-07 10:32:04 +01:00
use std log
use ../lib_provisioning *
use ../lib_provisioning/config/accessor.nu *
# Comprehensive Nushell CLI for batch workflow operations
# Follows PAP: Configuration-driven operations, no hardcoded logic
# Integration with orchestrator REST API endpoints
# Get orchestrator URL from configuration
def get-orchestrator-url []: nothing -> string {
config-get "orchestrator.url" "http://localhost:8080"
}
# Get workflow storage backend from configuration
def get-storage-backend []: nothing -> string {
config-get "workflows.storage.backend" "filesystem"
}
# Validate KCL workflow definition
export def "batch validate" [
workflow_file: string # Path to KCL workflow definition
--check-syntax (-s) # Check syntax only
--check-dependencies (-d) # Validate dependencies
]: nothing -> record {
_print $"Validating KCL workflow: ($workflow_file)"
if not ($workflow_file | path exists) {
return {
valid: false,
error: $"Workflow file not found: ($workflow_file)"
}
}
let validation_result = {
valid: false,
syntax_valid: false,
dependencies_valid: false,
errors: [],
warnings: []
}
# Check KCL syntax
if $check_syntax or (not $check_dependencies) {
let kcl_result = (run-external "kcl" ["fmt", "--check", $workflow_file] | complete)
if $kcl_result.exit_code == 0 {
$validation_result | update syntax_valid true
} else {
$validation_result | update errors ($validation_result.errors | append $"KCL syntax error: ($kcl_result.stderr)")
}
}
# Check dependencies if requested
if $check_dependencies {
let content = (open $workflow_file | from toml)
if ($content | get -o dependencies | is-not-empty) {
let deps = ($content | get dependencies)
let missing_deps = ($deps | where {|dep| not ($dep | path exists) })
if ($missing_deps | length) > 0 {
$validation_result | update dependencies_valid false
$validation_result | update errors ($validation_result.errors | append $"Missing dependencies: ($missing_deps | str join ', ')")
} else {
$validation_result | update dependencies_valid true
}
} else {
$validation_result | update dependencies_valid true
}
}
# Determine overall validity
let is_valid = (
($validation_result.syntax_valid == true) and
(not $check_dependencies or $validation_result.dependencies_valid == true)
)
$validation_result | update valid $is_valid
}
# Submit KCL workflow to orchestrator
export def "batch submit" [
workflow_file: string # Path to KCL workflow definition
--name (-n): string # Custom workflow name
--priority: int = 5 # Workflow priority (1-10)
--environment: string # Target environment (dev/test/prod)
--wait (-w) # Wait for completion
--timeout: duration = 30min # Timeout for waiting
]: nothing -> record {
let orchestrator_url = (get-orchestrator-url)
# Validate workflow first
let validation = (batch validate $workflow_file --check-syntax --check-dependencies)
if not $validation.valid {
return {
status: "error",
message: "Workflow validation failed",
errors: $validation.errors
}
}
_print $"Submitting workflow: ($workflow_file)"
# Parse workflow content
let workflow_content = (open $workflow_file)
let workflow_name = if ($name | is-not-empty) {
$name
} else {
($workflow_file | path basename | path parse | get stem)
}
# Prepare submission payload
let payload = {
name: $workflow_name,
workflow_file: $workflow_file,
content: $workflow_content,
priority: $priority,
environment: ($environment | default (config-get "environment" "dev")),
storage_backend: (get-storage-backend),
submitted_at: (date now | format date "%Y-%m-%d %H:%M:%S")
}
# Submit to orchestrator
let response = (http post $"($orchestrator_url)/workflows" $payload)
if not ($response | get success) {
return {
status: "error",
message: ($response | get error)
}
}
let task = ($response | get data)
let task_id = ($task | get id)
_print $"✅ Workflow submitted successfully"
_print $"Task ID: ($task_id)"
_print $"Name: ($workflow_name)"
_print $"Priority: ($priority)"
if $wait {
_print ""
_print "Waiting for completion..."
batch monitor $task_id --timeout $timeout
} else {
return {
status: "submitted",
task_id: $task_id,
name: $workflow_name,
message: "Use 'batch monitor' to track progress"
}
}
}
# Get workflow status
export def "batch status" [
task_id: string # Task ID to check
--format: string = "table" # Output format: table, json, compact
]: nothing -> record {
let orchestrator_url = (get-orchestrator-url)
let response = (http get $"($orchestrator_url)/workflows/($task_id)")
if not ($response | get success) {
return {
error: ($response | get error),
task_id: $task_id
}
}
let task = ($response | get data)
match $format {
"json" => $task,
"compact" => {
_print $"($task.id): ($task.name) [($task.status)]"
$task
},
_ => {
_print $"📊 Workflow Status"
_print $"═══════════════════"
_print $"ID: ($task.id)"
_print $"Name: ($task.name)"
_print $"Status: ($task.status)"
_print $"Created: ($task.created_at)"
_print $"Started: (($task | get -o started_at | default 'Not started'))"
_print $"Completed: (($task | get -o completed_at | default 'Not completed'))"
if ($task | get -o progress | is-not-empty) {
_print $"Progress: ($task.progress)%"
}
$task
}
}
}
# Real-time monitoring of workflow progress
export def "batch monitor" [
task_id: string # Task ID to monitor
--interval: duration = 3sec # Refresh interval
--timeout: duration = 30min # Maximum monitoring time
--quiet (-q) # Minimal output
]: nothing -> nothing {
let orchestrator_url = (get-orchestrator-url)
let start_time = (date now)
if not $quiet {
_print $"🔍 Monitoring workflow: ($task_id)"
_print "Press Ctrl+C to stop monitoring"
_print ""
}
while true {
let elapsed = ((date now) - $start_time)
if $elapsed > $timeout {
_print "⏰ Monitoring timeout reached"
break
}
let task_status = (batch status $task_id --format "compact")
if ($task_status | get -o error | is-not-empty) {
_print $"❌ Error getting task status: (($task_status | get error))"
break
}
let status = ($task_status | get status)
if not $quiet {
clear
let progress = ($task_status | get -o progress | default 0)
let progress_bar = (generate-progress-bar $progress)
_print $"🔍 Monitoring: ($task_id)"
_print $"Status: ($status) ($progress_bar) ($progress)%"
_print $"Elapsed: ($elapsed)"
_print ""
}
match $status {
"Completed" => {
_print "✅ Workflow completed successfully!"
if ($task_status | get -o output | is-not-empty) {
_print ""
_print "Output:"
_print "───────"
_print ($task_status | get output)
}
break
},
"Failed" => {
_print "❌ Workflow failed!"
if ($task_status | get -o error | is-not-empty) {
_print ""
_print "Error:"
_print "──────"
_print ($task_status | get error)
}
break
},
"Cancelled" => {
_print "🚫 Workflow was cancelled"
break
},
_ => {
if not $quiet {
_print $"Refreshing in ($interval)... (Ctrl+C to stop)"
}
sleep $interval
}
}
}
}
# Generate ASCII progress bar
def generate-progress-bar [progress: int]: nothing -> string {
let width = 20
let filled = ($progress * $width / 100 | math floor)
let empty = ($width - $filled)
let filled_bar = (1..$filled | each { "█" } | str join)
let empty_bar = (1..$empty | each { "░" } | str join)
$"[($filled_bar)($empty_bar)]"
}
# Rollback workflow operations
export def "batch rollback" [
task_id: string # Task ID to rollback
--checkpoint: string # Rollback to specific checkpoint
--force (-f) # Force rollback without confirmation
]: nothing -> record {
let orchestrator_url = (get-orchestrator-url)
if not $force {
let confirm = (input $"Are you sure you want to rollback task ($task_id)? [y/N]: ")
if $confirm != "y" and $confirm != "Y" {
return { status: "cancelled", message: "Rollback cancelled by user" }
}
}
let payload = {
task_id: $task_id,
checkpoint: ($checkpoint | default ""),
force: $force
}
let response = (http post $"($orchestrator_url)/workflows/($task_id)/rollback" $payload)
if not ($response | get success) {
return {
status: "error",
message: ($response | get error)
}
}
_print $"🔄 Rollback initiated for task: ($task_id)"
($response | get data)
}
# List all workflows with filtering
export def "batch list" [
--status: string # Filter by status (Pending, Running, Completed, Failed, Cancelled)
--environment: string # Filter by environment
--name: string # Filter by name pattern
--limit: int = 50 # Maximum number of results
--format: string = "table" # Output format: table, json, compact
]: nothing -> table {
let orchestrator_url = (get-orchestrator-url)
# Build query string
let query_parts = []
let query_parts = if ($status | is-not-empty) {
$query_parts | append $"status=($status)"
} else { $query_parts }
let query_parts = if ($environment | is-not-empty) {
$query_parts | append $"environment=($environment)"
} else { $query_parts }
let query_parts = if ($name | is-not-empty) {
$query_parts | append $"name_pattern=($name)"
} else { $query_parts }
let query_parts = $query_parts | append $"limit=($limit)"
let query_string = if ($query_parts | length) > 0 {
"?" + ($query_parts | str join "&")
} else {
""
}
let response = (http get $"($orchestrator_url)/workflows($query_string)")
if not ($response | get success) {
_print $"❌ Error: (($response | get error))"
return []
}
let workflows = ($response | get data)
match $format {
"json" => ($workflows | to json),
"compact" => {
$workflows | each {|w|
_print $"($w.id): ($w.name) [($w.status)] (($w.created_at))"
}
[]
},
_ => {
$workflows | select id name status environment priority created_at started_at completed_at
}
}
}
# Cancel running workflow
export def "batch cancel" [
task_id: string # Task ID to cancel
--reason: string # Cancellation reason
--force (-f) # Force cancellation
]: nothing -> record {
let orchestrator_url = (get-orchestrator-url)
let payload = {
task_id: $task_id,
reason: ($reason | default "User requested cancellation"),
force: $force
}
let response = (http post $"($orchestrator_url)/workflows/($task_id)/cancel" $payload)
if not ($response | get success) {
return {
status: "error",
message: ($response | get error)
}
}
_print $"🚫 Cancellation request sent for task: ($task_id)"
($response | get data)
}
# Manage workflow templates
export def "batch template" [
action: string # Action: list, create, delete, show
template_name?: string # Template name (required for create, delete, show)
--from-file: string # Create template from file
--description: string # Template description
]: nothing -> any {
let orchestrator_url = (get-orchestrator-url)
match $action {
"list" => {
let response = (http get $"($orchestrator_url)/templates")
if ($response | get success) {
($response | get data) | select name description created_at
} else {
_print $"❌ Error: (($response | get error))"
[]
}
},
"create" => {
if ($template_name | is-empty) or ($from_file | is-empty) {
return { error: "Template name and source file are required for creation" }
}
if not ($from_file | path exists) {
return { error: $"Template file not found: ($from_file)" }
}
let content = (open $from_file)
let payload = {
name: $template_name,
content: $content,
description: ($description | default "")
}
let response = (http post $"($orchestrator_url)/templates" $payload)
if ($response | get success) {
_print $"✅ Template created: ($template_name)"
($response | get data)
} else {
{ error: ($response | get error) }
}
},
"delete" => {
if ($template_name | is-empty) {
return { error: "Template name is required for deletion" }
}
let response = (http delete $"($orchestrator_url)/templates/($template_name)")
if ($response | get success) {
_print $"✅ Template deleted: ($template_name)"
($response | get data)
} else {
{ error: ($response | get error) }
}
},
"show" => {
if ($template_name | is-empty) {
return { error: "Template name is required" }
}
let response = (http get $"($orchestrator_url)/templates/($template_name)")
if ($response | get success) {
($response | get data)
} else {
{ error: ($response | get error) }
}
},
_ => {
{ error: $"Unknown template action: ($action). Use: list, create, delete, show" }
}
}
}
# Batch workflow statistics and analytics
export def "batch stats" [
--period: string = "24h" # Time period: 1h, 24h, 7d, 30d
--environment: string # Filter by environment
--detailed (-d) # Show detailed statistics
]: nothing -> record {
let orchestrator_url = (get-orchestrator-url)
# Build query string
let query_parts = []
let query_parts = $query_parts | append $"period=($period)"
let query_parts = if ($environment | is-not-empty) {
$query_parts | append $"environment=($environment)"
} else { $query_parts }
let query_parts = if $detailed {
$query_parts | append "detailed=true"
} else { $query_parts }
let query_string = if ($query_parts | length) > 0 {
"?" + ($query_parts | str join "&")
} else {
""
}
let response = (http get $"($orchestrator_url)/workflows/stats($query_string)")
if not ($response | get success) {
return { error: ($response | get error) }
}
let stats = ($response | get data)
_print $"📊 Workflow Statistics (($period))"
_print "══════════════════════════════════"
_print $"Total Workflows: ($stats.total)"
_print $"Completed: ($stats.completed) (($stats.success_rate)%)"
_print $"Failed: ($stats.failed)"
_print $"Running: ($stats.running)"
_print $"Pending: ($stats.pending)"
_print $"Cancelled: ($stats.cancelled)"
if $detailed {
_print ""
_print "Environment Breakdown:"
if ($stats | get -o by_environment | is-not-empty) {
($stats.by_environment) | each {|env|
_print $" ($env.name): ($env.count) workflows"
} | ignore
}
_print ""
_print "Average Execution Time: (($stats | get -o avg_execution_time | default 'N/A'))"
}
$stats
}
# Health check for batch workflow system
export def "batch health" []: nothing -> record {
let orchestrator_url = (get-orchestrator-url)
let result = (do { http get $"($orchestrator_url)/health" } | complete)
if $result.exit_code != 0 {
_print $"❌ Cannot connect to orchestrator: ($orchestrator_url)"
{
status: "unreachable",
orchestrator_url: $orchestrator_url
}
} else {
let response = ($result.stdout | from json)
if ($response | get success) {
let health_data = ($response | get data)
_print $"✅ Orchestrator: Healthy"
_print $"Version: (($health_data | get -o version | default 'Unknown'))"
_print $"Uptime: (($health_data | get -o uptime | default 'Unknown'))"
# Check storage backend
let storage_backend = (get-storage-backend)
_print $"Storage Backend: ($storage_backend)"
{
status: "healthy",
orchestrator: $health_data,
storage_backend: $storage_backend
}
} else {
_print $"❌ Orchestrator: Unhealthy"
_print $"Error: (($response | get error))"
{
status: "unhealthy",
error: ($response | get error)
}
}
}
}