703 lines
24 KiB
Plaintext
Raw Normal View History

2025-10-07 10:32:04 +01:00
use std log
use ../lib_provisioning *
use ../lib_provisioning/config/accessor.nu *
use ../lib_provisioning/plugins/auth.nu *
use ../lib_provisioning/platform *
2025-10-07 10:32:04 +01:00
# Comprehensive Nushell CLI for batch workflow operations
# Follows PAP: Configuration-driven operations, no hardcoded logic
# Integration with orchestrator REST API endpoints
# Resolve the orchestrator base URL.
#
# Platform service discovery is asked first. When discovery raises an error or
# yields nothing, the `orchestrator.url` configuration key is used, with
# "http://localhost:9090" as the final default.
def get-orchestrator-url [] {
    # `complete` only captures external commands; `service-endpoint` is brought
    # in through `use` (presumably a Nushell command - confirm), so a failure
    # has to be intercepted with try/catch instead of `do { } | complete`.
    let discovered = (try { service-endpoint "orchestrator" } catch { null })

    if ($discovered | is-empty) {
        config-get "orchestrator.url" "http://localhost:9090"
    } else {
        $discovered
    }
}
# Tell whether the orchestrator at `orchestrator_url` should be reached through
# the local plugin: true exactly when `detect-platform-mode` reports "local".
def use-local-plugin [orchestrator_url: string] {
    let platform_mode = (detect-platform-mode $orchestrator_url)
    $platform_mode == "local"
}
# Read the workflow storage backend from the `workflows.storage.backend`
# configuration key, defaulting to "filesystem".
def get-storage-backend [] {
    let backend = (config-get "workflows.storage.backend" "filesystem")
    $backend
}
# Validate a Nickel workflow definition.
#
# Returns a record with the fields
#   valid, syntax_valid, dependencies_valid, errors, warnings
# (plus `error` when the file does not exist).
#
# - Syntax is checked with `nickel fmt --check` when --check-syntax is given or
#   when no flag is given at all.
# - With --check-dependencies the workflow is exported to JSON and every entry
#   of its `dependencies` field must exist on disk.
# - `valid` only takes into account the checks that were actually run.
export def "batch validate" [
    workflow_file: string          # Path to Nickel workflow definition
    --check-syntax (-s)            # Check syntax only
    --check-dependencies (-d)      # Validate dependencies
] {
    _print $"Validating Nickel workflow: ($workflow_file)"

    if not ($workflow_file | path exists) {
        let message = $"Workflow file not found: ($workflow_file)"
        # Same shape as the normal result so callers can always read `errors`;
        # `error` is kept for callers that read the single-message field.
        return {
            valid: false,
            syntax_valid: false,
            dependencies_valid: false,
            error: $message,
            errors: [$message],
            warnings: []
        }
    }

    let run_syntax_check = ($check_syntax or (not $check_dependencies))

    # Records bound with `let` are immutable and `update` returns a new value,
    # so the outcome is accumulated in mutable locals instead.
    mut errors = []
    mut syntax_valid = false
    mut dependencies_valid = false

    if $run_syntax_check {
        # A missing `nickel` binary raises instead of producing an exit code,
        # so it is mapped onto a failed result.
        let fmt_result = (
            try {
                ^nickel fmt --check $workflow_file | complete
            } catch {|err|
                { exit_code: 127, stdout: "", stderr: $err.msg }
            }
        )
        if $fmt_result.exit_code == 0 {
            $syntax_valid = true
        } else {
            $errors = ($errors | append $"Nickel syntax error: ($fmt_result.stderr)")
        }
    }

    if $check_dependencies {
        # A Nickel source file is not TOML; evaluate it with `nickel export`
        # and read the `dependencies` field from the resulting JSON.
        let export_result = (
            try {
                ^nickel export --format json $workflow_file | complete
            } catch {|err|
                { exit_code: 127, stdout: "", stderr: $err.msg }
            }
        )
        if $export_result.exit_code != 0 {
            $errors = ($errors | append $"Cannot evaluate workflow for dependency check: ($export_result.stderr)")
        } else {
            let declared = (try { $export_result.stdout | from json | get dependencies? } catch { null })
            let deps = ($declared | default [])
            let missing_deps = ($deps | where {|dep| not ($dep | path exists) })
            if ($missing_deps | is-empty) {
                # No `dependencies` field, or every entry exists.
                $dependencies_valid = true
            } else {
                $errors = ($errors | append $"Missing dependencies: ($missing_deps | str join ', ')")
            }
        }
    }

    let is_valid = (
        ((not $run_syntax_check) or $syntax_valid) and
        ((not $check_dependencies) or $dependencies_valid)
    )

    {
        valid: $is_valid,
        syntax_valid: $syntax_valid,
        dependencies_valid: $dependencies_valid,
        errors: $errors,
        warnings: []
    }
}
# Submit a Nickel workflow to the orchestrator.
#
# Steps: authentication check (environment dependent), audit log entry,
# validation through `batch validate`, then a POST to `<orchestrator>/workflows`.
#
# Returns
#   { status: "error", message, errors? }            on validation or submit failure
#   { status: "submitted", task_id, name, message }  when not waiting
#   the result of `batch monitor`                     when --wait is given
export def "batch submit" [
    workflow_file: string          # Path to Nickel workflow definition
    --name (-n): string            # Custom workflow name
    --priority: int = 5            # Workflow priority (1-10)
    --environment: string          # Target environment (dev/test/prod)
    --wait (-w)                    # Wait for completion
    --timeout: duration = 30min    # Timeout for waiting
    --skip-auth                    # Skip authentication (dev/test only)
] {
    let orchestrator_url = (get-orchestrator-url)

    # Resolved once and reused for the auth decision, audit log and payload.
    let target_env = if ($environment | is-not-empty) {
        $environment
    } else {
        config-get "environment" "dev"
    }

    # Defaults to the file name without extension.
    let workflow_name = if ($name | is-not-empty) {
        $name
    } else {
        $workflow_file | path basename | path parse | get stem
    }

    let operation_name = $"batch workflow submit: ($workflow_name)"

    # Check authentication based on environment
    if $target_env == "prod" {
        # NOTE(review): --skip-auth is documented as dev/test only, yet it also
        # bypasses the production check here - confirm this is intended.
        if not $skip_auth {
            check-auth-for-production $operation_name --allow-skip
        }
    } else {
        # For dev/test, require auth but allow skip
        let allow_skip = (get-config-value "security.bypass.allow_skip_auth" false)
        if not $skip_auth {
            if $allow_skip {
                require-auth $operation_name --allow-skip
            } else {
                require-auth $operation_name
            }
        }
    }

    # Log the operation for audit trail
    if not $skip_auth {
        let auth_metadata = (get-auth-metadata)
        log-authenticated-operation "batch_workflow_submit" {
            workflow_name: $workflow_name
            workflow_file: $workflow_file
            environment: $target_env
            priority: $priority
            user: $auth_metadata.username
        }
    }

    # Validate workflow first
    let validation = (batch validate $workflow_file --check-syntax --check-dependencies)
    if not $validation.valid {
        # Tolerate a validation record that only carries a single `error` field.
        let validation_errors = if ($validation.errors? | is-not-empty) {
            $validation.errors
        } else {
            [$validation.error?] | compact
        }
        return {
            status: "error",
            message: "Workflow validation failed",
            errors: $validation_errors
        }
    }

    _print $"Submitting workflow: ($workflow_file)"

    let payload = {
        name: $workflow_name,
        workflow_file: $workflow_file,
        content: (open $workflow_file),
        priority: $priority,
        environment: $target_env,
        storage_backend: (get-storage-backend),
        submitted_at: (date now | format date "%Y-%m-%d %H:%M:%S")
    }

    # A transport failure is folded into the same { success, error } shape the
    # orchestrator uses, so it is reported as a record instead of aborting.
    let response = (
        try {
            http post --content-type application/json $"($orchestrator_url)/workflows" $payload
        } catch {|err|
            { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
        }
    )

    if not ($response.success? | default false) {
        return {
            status: "error",
            message: ($response.error? | default "Unknown orchestrator error")
        }
    }

    let task_id = ($response | get data | get id)

    _print $"✅ Workflow submitted successfully"
    _print $"Task ID: ($task_id)"
    _print $"Name: ($workflow_name)"
    _print $"Priority: ($priority)"

    if $wait {
        _print ""
        _print "Waiting for completion..."
        batch monitor $task_id --timeout $timeout
    } else {
        return {
            status: "submitted",
            task_id: $task_id,
            name: $workflow_name,
            message: "Use 'batch monitor' to track progress"
        }
    }
}
# Get the status of a single workflow task.
#
# Uses `orch tasks` when the orchestrator is local, otherwise
# GET `<orchestrator>/workflows/<task_id>`.
#
# Returns the task record, or { error, task_id } when the task cannot be
# found or the orchestrator request fails. The "compact" and table formats
# print a summary before returning the task; "json" returns it silently.
export def "batch status" [
    task_id: string                # Task ID to check
    --format: string = "table"     # Output format: table, json, compact
] {
    let orchestrator_url = (get-orchestrator-url)

    let task = if (use-local-plugin $orchestrator_url) {
        let matches = (orch tasks | where id == $task_id)
        # Test for emptiness before taking the first row: `first` on an empty
        # list raises, which would skip the not-found result.
        if ($matches | is-empty) {
            return { error: $"Task ($task_id) not found", task_id: $task_id }
        }
        $matches | first
    } else {
        # Fall back to HTTP for remote orchestrators; a transport failure is
        # reported through the same error record as an orchestrator error.
        let response = (
            try {
                http get $"($orchestrator_url)/workflows/($task_id)"
            } catch {|err|
                { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
            }
        )
        if not ($response.success? | default false) {
            return {
                error: ($response.error? | default "Unknown orchestrator error"),
                task_id: $task_id
            }
        }
        $response | get data
    }

    match $format {
        "json" => $task,
        "compact" => {
            _print $"($task.id): ($task.name) [($task.status)]"
            $task
        },
        _ => {
            # started_at / completed_at / progress may be absent; optional
            # cell paths yield null for a missing field.
            let started_at = ($task.started_at? | default "Not started")
            let completed_at = ($task.completed_at? | default "Not completed")
            let progress = $task.progress?

            _print $"📊 Workflow Status"
            _print $"═══════════════════"
            _print $"ID: ($task.id)"
            _print $"Name: ($task.name)"
            _print $"Status: ($task.status)"
            _print $"Created: ($task.created_at)"
            _print $"Started: ($started_at)"
            _print $"Completed: ($completed_at)"
            if ($progress | is-not-empty) {
                _print $"Progress: ($progress)%"
            }
            $task
        }
    }
}
# Poll a workflow task until it reaches a terminal state or the timeout expires.
#
# Every `--interval` the task is fetched through `batch status`. The loop ends
# on status "Completed", "Failed" or "Cancelled", when the status lookup itself
# fails, or once `--timeout` has elapsed. With --quiet only the status line
# printed by `batch status` and the final outcome are shown.
export def "batch monitor" [
    task_id: string                # Task ID to monitor
    --interval: duration = 3sec    # Refresh interval
    --timeout: duration = 30min    # Maximum monitoring time
    --quiet (-q)                   # Minimal output
] {
    let start_time = (date now)

    if not $quiet {
        _print $"🔍 Monitoring workflow: ($task_id)"
        _print "Press Ctrl+C to stop monitoring"
        _print ""
    }

    while true {
        let elapsed = ((date now) - $start_time)
        if $elapsed > $timeout {
            _print "⏰ Monitoring timeout reached"
            break
        }

        let task_status = (batch status $task_id --format "compact")

        # A failed lookup returns { error, task_id } without a `status` field,
        # whereas a failed *task* may carry both `status` and `error`. Keying
        # on the missing status keeps the "Failed" branch below reachable.
        let status = $task_status.status?
        if $status == null {
            let lookup_error = ($task_status.error? | default "unknown error")
            _print $"❌ Error getting task status: ($lookup_error)"
            break
        }

        if not $quiet {
            clear
            let progress = ($task_status.progress? | default 0)
            # The bar helper takes an int; the raw value is still displayed.
            let progress_bar = (generate-progress-bar ($progress | into int))
            _print $"🔍 Monitoring: ($task_id)"
            _print $"Status: ($status) ($progress_bar) ($progress)%"
            _print $"Elapsed: ($elapsed)"
            _print ""
        }

        match $status {
            "Completed" => {
                _print "✅ Workflow completed successfully!"
                let task_output = $task_status.output?
                if ($task_output | is-not-empty) {
                    _print ""
                    _print "Output:"
                    _print "───────"
                    _print $task_output
                }
                break
            },
            "Failed" => {
                _print "❌ Workflow failed!"
                let task_error = $task_status.error?
                if ($task_error | is-not-empty) {
                    _print ""
                    _print "Error:"
                    _print "──────"
                    _print $task_error
                }
                break
            },
            "Cancelled" => {
                _print "🚫 Workflow was cancelled"
                break
            },
            _ => {
                if not $quiet {
                    # Literal parentheses must be escaped inside $"...",
                    # otherwise their content is evaluated as an expression.
                    _print $"Refreshing in ($interval)... \(Ctrl+C to stop\)"
                }
                sleep $interval
            }
        }
    }
}
# Render a 20-cell progress bar such as "[██████████░░░░░░░░░░]".
#
# `progress` is a percentage; values outside 0..100 are clamped so the bar
# always has exactly 20 cells.
def generate-progress-bar [progress: int] {
    let width = 20
    let clamped = ([0, ([$progress, 100] | math min)] | math max)
    let filled = ($clamped * $width // 100)
    let empty = ($width - $filled)

    # `1..0` is a descending range with two elements, not an empty one, so a
    # zero-length segment must be handled explicitly.
    let filled_bar = if $filled > 0 { 1..$filled | each { "█" } | str join } else { "" }
    let empty_bar = if $empty > 0 { 1..$empty | each { "░" } | str join } else { "" }

    $"[($filled_bar)($empty_bar)]"
}
# Request a rollback of a workflow task.
#
# Unless --force is given the user must confirm with "y" / "Y". The request is
# a POST to `<orchestrator>/workflows/<task_id>/rollback`.
#
# Returns the orchestrator's `data` on success,
# { status: "cancelled", message } when the user declines, or
# { status: "error", message } when the request fails.
export def "batch rollback" [
    task_id: string                # Task ID to rollback
    --checkpoint: string           # Rollback to specific checkpoint
    --force (-f)                   # Force rollback without confirmation
] {
    let orchestrator_url = (get-orchestrator-url)

    if not $force {
        let confirm = (input $"Are you sure you want to rollback task ($task_id)? [y/N]: ")
        # Surrounding whitespace should not turn a "y" into a refusal.
        if ($confirm | str trim | str downcase) != "y" {
            return { status: "cancelled", message: "Rollback cancelled by user" }
        }
    }

    let payload = {
        task_id: $task_id,
        checkpoint: ($checkpoint | default ""),
        force: $force
    }

    # A transport failure is folded into the { success, error } shape so it is
    # reported through the same error record as an orchestrator-side failure.
    let response = (
        try {
            http post --content-type application/json $"($orchestrator_url)/workflows/($task_id)/rollback" $payload
        } catch {|err|
            { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
        }
    )

    if not ($response.success? | default false) {
        return {
            status: "error",
            message: ($response.error? | default "Unknown orchestrator error")
        }
    }

    _print $"🔄 Rollback initiated for task: ($task_id)"
    $response | get data
}
# List workflows, optionally filtered by status, environment and name.
#
# Uses `orch tasks` for a local orchestrator and
# GET `<orchestrator>/workflows?...` otherwise. All filters are applied in
# both modes.
#
# Returns
#   "json"    -> the workflows serialized as a JSON string
#   "compact" -> prints one line per workflow and returns []
#   otherwise -> a table with the main workflow columns
# On a failed remote request an error is printed and [] is returned.
export def "batch list" [
    --status: string               # Filter by status (Pending, Running, Completed, Failed, Cancelled)
    --environment: string          # Filter by environment
    --name: string                 # Filter by name pattern
    --limit: int = 50              # Maximum number of results
    --format: string = "table"     # Output format: table, json, compact
] {
    let orchestrator_url = (get-orchestrator-url)

    let workflows = if (use-local-plugin $orchestrator_url) {
        # NOTE(review): the remote API receives the name as `name_pattern`
        # with unspecified matching rules; a substring match is used locally -
        # confirm this mirrors the orchestrator.
        (
            orch tasks
            | where {|task| ($status | is-empty) or ($task.status? == $status) }
            | where {|task| ($environment | is-empty) or ($task.environment? == $environment) }
            | where {|task| ($name | is-empty) or ($task.name? | default "" | into string | str contains $name) }
            | first $limit
        )
    } else {
        # Fall back to HTTP for remote orchestrators.
        let filters = [
            [key value];
            [status $status]
            [environment $environment]
            [name_pattern $name]
        ]
        # Values are URL-encoded so characters such as "&" or spaces in a
        # pattern cannot corrupt the query string.
        let query_parts = (
            $filters
            | where {|filter| $filter.value | is-not-empty }
            | each {|filter| $"($filter.key)=($filter.value | url encode --all)" }
            | append $"limit=($limit)"
        )
        let query_string = "?" + ($query_parts | str join "&")

        let response = (
            try {
                http get $"($orchestrator_url)/workflows($query_string)"
            } catch {|err|
                { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
            }
        )

        if not ($response.success? | default false) {
            _print $"❌ Error: ($response.error? | default 'Unknown orchestrator error')"
            return []
        }
        $response | get data
    }

    match $format {
        "json" => ($workflows | to json),
        "compact" => {
            for w in $workflows {
                _print $"($w.id): ($w.name) [($w.status)] (($w.created_at))"
            }
            []
        },
        _ => {
            # Columns that not every task carries are selected optionally so a
            # pending task without them does not abort the listing.
            $workflows | select id name status environment? priority? created_at started_at? completed_at?
        }
    }
}
# Ask the orchestrator to cancel a running workflow task.
#
# Sends a POST to `<orchestrator>/workflows/<task_id>/cancel`.
#
# Returns the orchestrator's `data` on success or
# { status: "error", message } when the request fails.
export def "batch cancel" [
    task_id: string                # Task ID to cancel
    --reason: string               # Cancellation reason
    --force (-f)                   # Force cancellation
] {
    let orchestrator_url = (get-orchestrator-url)

    let payload = {
        task_id: $task_id,
        reason: ($reason | default "User requested cancellation"),
        force: $force
    }

    # A transport failure is folded into the { success, error } shape so it is
    # reported through the same error record as an orchestrator-side failure.
    let response = (
        try {
            http post --content-type application/json $"($orchestrator_url)/workflows/($task_id)/cancel" $payload
        } catch {|err|
            { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
        }
    )

    if not ($response.success? | default false) {
        return {
            status: "error",
            message: ($response.error? | default "Unknown orchestrator error")
        }
    }

    _print $"🚫 Cancellation request sent for task: ($task_id)"
    $response | get data
}
# Manage workflow templates stored by the orchestrator.
#
# Actions:
#   list    GET    /templates         -> table of name, description, created_at
#   create  POST   /templates         -> needs template_name and --from-file
#   delete  DELETE /templates/<name>  -> needs template_name
#   show    GET    /templates/<name>  -> needs template_name
#
# Failures are returned as { error: <message> }, except `list`, which prints
# the error and returns [].
export def "batch template" [
    action: string                 # Action: list, create, delete, show
    template_name?: string         # Template name (required for create, delete, show)
    --from-file: string            # Create template from file
    --description: string          # Template description
] {
    let orchestrator_url = (get-orchestrator-url)
    let fallback_error = "Unknown orchestrator error"

    # In every branch a transport failure is folded into the
    # { success, error } shape so it surfaces as an error result.
    match $action {
        "list" => {
            # HTTP required for template management (no plugin support yet)
            let response = (
                try {
                    http get $"($orchestrator_url)/templates"
                } catch {|err|
                    { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
                }
            )
            if ($response.success? | default false) {
                $response | get data | select name description created_at
            } else {
                _print $"❌ Error: ($response.error? | default $fallback_error)"
                []
            }
        },
        "create" => {
            if ($template_name | is-empty) or ($from_file | is-empty) {
                return { error: "Template name and source file are required for creation" }
            }
            if not ($from_file | path exists) {
                return { error: $"Template file not found: ($from_file)" }
            }

            let payload = {
                name: $template_name,
                content: (open $from_file),
                description: ($description | default "")
            }
            let response = (
                try {
                    http post --content-type application/json $"($orchestrator_url)/templates" $payload
                } catch {|err|
                    { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
                }
            )
            if ($response.success? | default false) {
                _print $"✅ Template created: ($template_name)"
                $response | get data
            } else {
                { error: ($response.error? | default $fallback_error) }
            }
        },
        "delete" => {
            if ($template_name | is-empty) {
                return { error: "Template name is required for deletion" }
            }
            let response = (
                try {
                    http delete $"($orchestrator_url)/templates/($template_name)"
                } catch {|err|
                    { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
                }
            )
            if ($response.success? | default false) {
                _print $"✅ Template deleted: ($template_name)"
                $response | get data
            } else {
                { error: ($response.error? | default $fallback_error) }
            }
        },
        "show" => {
            if ($template_name | is-empty) {
                return { error: "Template name is required" }
            }
            let response = (
                try {
                    http get $"($orchestrator_url)/templates/($template_name)"
                } catch {|err|
                    { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
                }
            )
            if ($response.success? | default false) {
                $response | get data
            } else {
                { error: ($response.error? | default $fallback_error) }
            }
        },
        _ => {
            { error: $"Unknown template action: ($action). Use: list, create, delete, show" }
        }
    }
}
# Fetch and print workflow statistics for a time period.
#
# Sends GET `<orchestrator>/workflows/stats?period=...` with optional
# `environment` and `detailed=true` parameters, prints a summary and returns
# the statistics record. Returns { error } when the request fails.
export def "batch stats" [
    --period: string = "24h"       # Time period: 1h, 24h, 7d, 30d
    --environment: string          # Filter by environment
    --detailed (-d)                # Show detailed statistics
] {
    let orchestrator_url = (get-orchestrator-url)

    # Values are URL-encoded so they cannot corrupt the query string.
    let base_parts = [$"period=($period | url encode --all)"]
    let env_parts = if ($environment | is-not-empty) {
        [$"environment=($environment | url encode --all)"]
    } else {
        []
    }
    let detail_parts = if $detailed { ["detailed=true"] } else { [] }
    let query_string = "?" + ($base_parts | append $env_parts | append $detail_parts | str join "&")

    # A transport failure is folded into the { success, error } shape so it is
    # returned as an error record instead of aborting the command.
    let response = (
        try {
            http get $"($orchestrator_url)/workflows/stats($query_string)"
        } catch {|err|
            { success: false, error: $"Cannot reach orchestrator: ($err.msg)" }
        }
    )
    if not ($response.success? | default false) {
        return { error: ($response.error? | default "Unknown orchestrator error") }
    }

    let stats = ($response | get data)

    _print $"📊 Workflow Statistics (($period))"
    _print "══════════════════════════════════"
    _print $"Total Workflows: ($stats.total)"
    _print $"Completed: ($stats.completed) ($stats.success_rate)%"
    _print $"Failed: ($stats.failed)"
    _print $"Running: ($stats.running)"
    _print $"Pending: ($stats.pending)"
    _print $"Cancelled: ($stats.cancelled)"

    if $detailed {
        _print ""
        _print "Environment Breakdown:"
        # by_environment / avg_execution_time are read optionally.
        let by_environment = ($stats.by_environment? | default [])
        # The loop variable must not be called `env`: that name is reserved
        # for the builtin $env.
        for entry in $by_environment {
            _print $" ($entry.name): ($entry.count) workflows"
        }
        _print ""
        let avg_execution_time = ($stats.avg_execution_time? | default "N/A")
        _print $"Average Execution Time: ($avg_execution_time)"
    }

    $stats
}
# Health check for the batch workflow system.
#
# Local orchestrator: reads `orch status` and returns
#   { status: "healthy" | "stopped", orchestrator, storage_backend, plugin_mode: true }
# Remote orchestrator: GET `<orchestrator>/health` and returns one of
#   { status: "unreachable", orchestrator_url }
#   { status: "healthy", orchestrator, storage_backend }
#   { status: "unhealthy", error }
export def "batch health" [] {
    let orchestrator_url = (get-orchestrator-url)

    # Use plugin for local orchestrator
    if (use-local-plugin $orchestrator_url) {
        let status = (orch status)
        let storage_backend = (get-storage-backend)
        let state_label = if $status.running { 'Running' } else { 'Stopped' }

        _print $"✅ Orchestrator: ($state_label)"
        _print $"Tasks Pending: ($status.tasks_pending)"
        _print $"Tasks Running: ($status.tasks_running)"
        _print $"Tasks Completed: ($status.tasks_completed)"
        _print $"Storage Backend: ($storage_backend)"
        # Plain string on purpose: inside $"..." the parenthesised text would
        # be evaluated as a command.
        _print "Plugin Mode: Enabled (10-50x faster)"

        return {
            status: (if $status.running { "healthy" } else { "stopped" }),
            orchestrator: $status,
            storage_backend: $storage_backend,
            plugin_mode: true
        }
    }

    # Fall back to HTTP for remote orchestrators. `http get` is a Nushell
    # command, so a failure is intercepted with try/catch; `complete` only
    # captures external commands.
    let raw_response = (try { http get $"($orchestrator_url)/health" } catch { null })

    if $raw_response == null {
        _print $"❌ Cannot connect to orchestrator: ($orchestrator_url)"
        return {
            status: "unreachable",
            orchestrator_url: $orchestrator_url
        }
    }

    # A body that arrives as text is parsed as JSON; anything else is used
    # as returned by `http get`.
    let response = if ($raw_response | describe) == "string" {
        try {
            $raw_response | from json
        } catch {
            { success: false, error: "Health endpoint returned a non-JSON response" }
        }
    } else {
        $raw_response
    }

    if ($response.success? | default false) {
        let health_data = ($response | get data)
        let version = ($health_data.version? | default "Unknown")
        let uptime = ($health_data.uptime? | default "Unknown")
        let storage_backend = (get-storage-backend)

        _print $"✅ Orchestrator: Healthy"
        _print $"Version: ($version)"
        _print $"Uptime: ($uptime)"
        _print $"Storage Backend: ($storage_backend)"

        {
            status: "healthy",
            orchestrator: $health_data,
            storage_backend: $storage_backend
        }
    } else {
        let health_error = $response.error?
        _print $"❌ Orchestrator: Unhealthy"
        _print $"Error: ($health_error)"
        {
            status: "unhealthy",
            error: $health_error
        }
    }
}