prvng_platform/orchestrator/batch_workflow_plan.md

 Provider-Agnostic Batch Workflow System Plan

 Architecture Overview

 Create a comprehensive batch workflow system defined in KCL schemas that supports:
 - Multi-provider infrastructure (UpCloud, AWS, local)
 - Taskservs with dependency resolution
 - CRUD operations for servers and services
 - Full observability (logging, monitoring, state tracking)
 - Enterprise features (rollback, retry, SOPS/KMS encryption)

 1. KCL Schema Design (kcl/workflows.k)

 Core Workflow Schemas

 schema WorkflowDef:
     """Batch workflow definition"""
     name: str
     type: "batch" | "sequential" | "parallel" = "batch"
     provider_agnostic: bool = True
     operations: [OperationDef]
     dependencies: [DependencyDef]
     rollback_strategy: RollbackStrategy
     retry_policy: RetryPolicy
     logging: LoggingConfig
     secrets: SecretProvider  # SOPS/KMS support
     templates: [str]  # Template references
     monitoring: MonitoringConfig

 schema OperationDef:
     """Single operation in workflow"""
     id: str
     type: "create" | "update" | "delete" | "check"
     target: "server" | "taskserv" | "cluster"
     providers: [str] = []  # Empty = all providers
     items: [str]  # Server/taskserv names
     depends_on: [str]  # Operation IDs
     parallel: bool = True
     timeout: int = 300
     on_failure: "fail" | "continue" | "rollback" = "rollback"

 schema DependencyDef:
     """Dependency between resources"""
     source: str  # resource.name format
     target: str
     type: "hard" | "soft" | "optional" = "hard"
     wait_condition: str  # Expression to evaluate

 schema RollbackStrategy:
     """Rollback configuration"""
     enabled: bool = True
     on_partial_failure: bool = True
     checkpoint_interval: int = 5  # Operations between checkpoints
     preserve_successful: bool = False
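
 A minimal sketch of how checkpoint_interval could drive checkpointing in the orchestrator (Rust, mirroring the KCL fields above; the struct layout and function name are assumptions for illustration, not existing orchestrator code):

```rust
// Decide whether to write a checkpoint after an operation completes,
// based on RollbackStrategy.checkpoint_interval (operations between
// checkpoints). Fields mirror the KCL schema.
struct RollbackStrategy {
    enabled: bool,
    checkpoint_interval: usize,
}

fn should_checkpoint(strategy: &RollbackStrategy, completed_ops: usize) -> bool {
    strategy.enabled
        && completed_ops > 0
        && completed_ops % strategy.checkpoint_interval == 0
}

fn main() {
    let s = RollbackStrategy { enabled: true, checkpoint_interval: 5 };
    // Checkpoints land after operations 5, 10, 15, ...
    assert!(should_checkpoint(&s, 5));
    assert!(!should_checkpoint(&s, 3));
}
```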

 schema RetryPolicy:
     """Retry configuration"""
     max_attempts: int = 3
     backoff: "linear" | "exponential" = "exponential"
     initial_delay: int = 5
     max_delay: int = 300
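
 With exponential backoff, the delay doubles each attempt starting from initial_delay and is capped at max_delay. A sketch of that computation (struct mirrors the KCL schema; delays assumed to be in seconds):

```rust
// Compute the wait before retry `attempt` (1-based) under an
// exponential backoff policy: initial_delay * 2^(attempt-1), capped
// at max_delay.
struct RetryPolicy {
    max_attempts: u32,
    initial_delay: u64, // seconds
    max_delay: u64,     // seconds
}

fn delay_for_attempt(policy: &RetryPolicy, attempt: u32) -> u64 {
    // Clamp the shift so very high attempt counts cannot overflow.
    let shift = attempt.saturating_sub(1).min(30);
    let delay = policy.initial_delay.saturating_mul(1u64 << shift);
    delay.min(policy.max_delay)
}

fn main() {
    let p = RetryPolicy { max_attempts: 3, initial_delay: 5, max_delay: 300 };
    assert_eq!(delay_for_attempt(&p, 1), 5);   // 5s
    assert_eq!(delay_for_attempt(&p, 2), 10);  // 10s
    assert_eq!(delay_for_attempt(&p, 7), 300); // 5*64=320s, capped at 300
}
```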

 schema LoggingConfig:
     """Logging configuration"""
     level: "debug" | "info" | "warn" | "error" = "info"
     destinations: [str] = ["file", "stdout"]
     format: "json" | "text" = "json"
     retention_days: int = 30
     path: str = "./logs/workflows"

 schema MonitoringConfig:
     """Monitoring and state tracking"""
     enabled: bool = True
     metrics_endpoint: str = "/metrics"
     state_store: "file" | "rkvs" | "postgres" = "rkvs"
     checkpoint_frequency: int = 10  # seconds
     telemetry: TelemetryConfig

 2. Infrastructure Integration (klab/wuji/workflows.k)

 Example Batch Workflow Definition

 import workflows

 batch_create_infrastructure = workflows.WorkflowDef {
     name = "wuji-full-deployment"
     type = "batch"

     operations = [
         # Phase 1: Create servers across providers
         {
             id = "create-servers"
             type = "create"
             target = "server"
             items = ["wuji-cp-0", "wuji-strg-0", "wuji-worker-0"]
             parallel = True  # Create all servers in parallel
         }
         # Phase 2: Install base taskservs
         {
             id = "install-base"
             type = "create"
             target = "taskserv"
             items = ["os", "resolv", "containerd"]
             depends_on = ["create-servers"]
             parallel = True
         }
         # Phase 3: Install Kubernetes
         {
             id = "install-k8s"
             type = "create"
             target = "taskserv"
             items = ["kubernetes"]
             depends_on = ["install-base"]
         }
         # Phase 4: Install cluster services
         {
             id = "install-cluster"
             type = "create"
             target = "taskserv"
             items = ["rook-ceph", "external-nfs", "cilium"]
             depends_on = ["install-k8s"]
             parallel = True
         }
     ]

     rollback_strategy = {
         enabled = True
         on_partial_failure = True
         checkpoint_interval = 2
     }

     retry_policy = {
         max_attempts = 3
         backoff = "exponential"
     }

     secrets = {
         provider = "sops"
         sops_config = {
             age_key_file = "~/.config/sops/age/keys.txt"
         }
     }
 }

 3. Provider & Taskserv Enhancements

 Provider Dependencies (providers/upcloud/kcl/dependencies.k)

 schema ProviderDependencies:
     """Provider-specific dependencies"""
     requires_network: bool = True
     network_before_servers: bool = True
     shared_resources: [str] = ["private_network", "security_group"]
     api_rate_limit: int = 10  # requests per second
     batch_size: int = 5  # Max parallel operations
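
 The batch coordinator would honor batch_size by splitting a provider's operations into bounded batches and running one batch at a time. A sketch of the splitting step (helper name is an assumption):

```rust
// Split a list of operations into batches of at most `batch_size`
// items; the coordinator then runs each batch's items in parallel,
// batch by batch, to respect the provider's limits.
fn into_batches<T: Clone>(items: &[T], batch_size: usize) -> Vec<Vec<T>> {
    // Guard against a zero batch_size so chunks() never panics.
    items.chunks(batch_size.max(1)).map(|c| c.to_vec()).collect()
}

fn main() {
    let servers = ["s1", "s2", "s3", "s4", "s5", "s6", "s7"];
    let batches = into_batches(&servers, 5);
    assert_eq!(batches.len(), 2);    // 5 + 2
    assert_eq!(batches[0].len(), 5);
    assert_eq!(batches[1].len(), 2);
}
```

 The api_rate_limit field would additionally throttle request issuance inside each batch (e.g. a token bucket), which is omitted here.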

 Taskserv Dependencies (taskservs/kubernetes/kcl/dependencies.k)

 schema TaskservDependencies:
     """Taskserv dependency declaration"""
     requires: [str] = ["containerd", "resolv"]  # Other taskservs
     conflicts_with: [str] = ["docker"]
     minimum_resources: {str:int} = {
         cpu = 2
         memory = 4096
         disk = 20
     }
     health_check: {str:any} = {
         command = "kubectl get nodes"
         timeout = 60
         retries = 5
     }
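
 The health_check block implies a retry loop in the executor: run the command up to `retries` times and treat any success as healthy. A sketch with the command runner injected as a closure (so the example needs no real kubectl):

```rust
// Retry a health check up to `retries` times; returns true as soon
// as one attempt succeeds. The real executor would run the configured
// command with a timeout and sleep between attempts.
fn wait_healthy<F: FnMut() -> bool>(retries: u32, mut check: F) -> bool {
    for _ in 0..retries {
        if check() {
            return true;
        }
        // A real implementation would sleep/backoff here.
    }
    false
}

fn main() {
    // Simulated check that succeeds on the third attempt.
    let mut calls = 0;
    let healthy = wait_healthy(5, || {
        calls += 1;
        calls >= 3
    });
    assert!(healthy);
    assert_eq!(calls, 3);
}
```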

 4. Orchestrator Enhancement (src/orchestrator/)

 New Modules

 - src/orchestrator/src/workflow.rs: Workflow execution engine
 - src/orchestrator/src/batch.rs: Batch operation coordinator
 - src/orchestrator/src/dependency.rs: Dependency resolver
 - src/orchestrator/src/rollback.rs: Rollback manager
 - src/orchestrator/src/monitor.rs: State tracking & monitoring
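
 The dependency resolver's core is a topological sort over operation IDs and their depends_on edges. A sketch using Kahn's algorithm (a standard approach; the function shape is an assumption, not the module's actual API):

```rust
use std::collections::{HashMap, VecDeque};

// Kahn's topological sort: given (operation_id, depends_on) pairs,
// return an execution order, or None if the graph has a cycle (or an
// edge to an undeclared operation, which can never be satisfied).
fn resolve_order(ops: &[(&str, Vec<&str>)]) -> Option<Vec<String>> {
    let mut indegree: HashMap<&str, usize> =
        ops.iter().map(|(id, _)| (*id, 0)).collect();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (id, requires) in ops {
        for r in requires {
            *indegree.get_mut(id)? += 1;
            dependents.entry(*r).or_default().push(*id);
        }
    }
    // Seed the queue in declaration order so output is deterministic.
    let mut queue: VecDeque<&str> = ops
        .iter()
        .filter(|(id, _)| indegree[id] == 0)
        .map(|(id, _)| *id)
        .collect();
    let mut order = Vec::new();
    while let Some(id) = queue.pop_front() {
        order.push(id.to_string());
        for d in dependents.get(id).into_iter().flatten() {
            let e = indegree.get_mut(d)?;
            *e -= 1;
            if *e == 0 {
                queue.push_back(*d);
            }
        }
    }
    if order.len() == ops.len() { Some(order) } else { None }
}

fn main() {
    // The four phases from the wuji example above.
    let ops = [
        ("create-servers", vec![]),
        ("install-base", vec!["create-servers"]),
        ("install-k8s", vec!["install-base"]),
        ("install-cluster", vec!["install-k8s"]),
    ];
    let order = resolve_order(&ops).unwrap();
    assert_eq!(order[0], "create-servers");
    assert_eq!(order[3], "install-cluster");
}
```

 Operations that end up adjacent with no edge between them (and have parallel = True) can then be dispatched concurrently within their level.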

 API Endpoints

 POST /workflows/batch        - Submit batch workflow
 GET  /workflows/{id}/status  - Get workflow status
 GET  /workflows/{id}/logs    - Stream workflow logs
 POST /workflows/{id}/rollback - Trigger rollback
 GET  /workflows/{id}/metrics - Get workflow metrics
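
 A sketch of how these endpoints could be routed inside the orchestrator; the handler names on the right are hypothetical, only the method/path shapes come from the table above:

```rust
// Match (method, path) against the planned workflow endpoints and
// name the handler that would serve each; `_id` captures the
// {id} path segment.
fn route(method: &str, path: &str) -> Option<&'static str> {
    let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
    match (method, parts.as_slice()) {
        ("POST", ["workflows", "batch"]) => Some("submit_batch"),
        ("GET", ["workflows", _id, "status"]) => Some("get_status"),
        ("GET", ["workflows", _id, "logs"]) => Some("stream_logs"),
        ("POST", ["workflows", _id, "rollback"]) => Some("trigger_rollback"),
        ("GET", ["workflows", _id, "metrics"]) => Some("get_metrics"),
        _ => None,
    }
}

fn main() {
    assert_eq!(route("GET", "/workflows/w-42/status"), Some("get_status"));
    assert_eq!(route("POST", "/workflows/batch"), Some("submit_batch"));
    assert_eq!(route("DELETE", "/workflows/w-42"), None);
}
```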

 5. Nushell Workflow Module (core/nulib/workflows/batch.nu)

 Core Functions

 # Submit batch workflow
 export def "batch submit" [
     workflow_file: string  # KCL workflow definition
     --validate           # Validate only
     --dry-run           # Simulate execution
     --watch             # Monitor progress
 ]

 # Monitor batch status
 export def "batch status" [
     workflow_id: string
     --follow           # Stream updates
     --format: string   # json, yaml, table
 ]

 # Manage batch operations
 export def "batch rollback" [workflow_id: string]
 export def "batch retry" [workflow_id: string, operation_id?: string]
 export def "batch pause" [workflow_id: string]
 export def "batch resume" [workflow_id: string]

 6. Monitoring & Observability

 Features

 - Real-time Progress: WebSocket/SSE for live updates
 - Structured Logging: JSON logs with trace IDs
 - Metrics Export: Prometheus-compatible metrics
 - State Persistence: Checkpoint/restore capability
 - Audit Trail: Complete operation history

 State Tracking

 struct WorkflowState {
     id: String,
     status: WorkflowStatus,
     operations: Vec<OperationState>,
     checkpoints: Vec<Checkpoint>,
     metrics: WorkflowMetrics,
     logs: Vec<LogEntry>,
 }
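
 The pause/resume/rollback operations imply a status state machine behind WorkflowStatus. A sketch of plausible transitions (the variant names are assumptions; the plan only names the type):

```rust
// Legal workflow status transitions: pause only from Running, resume
// only from Paused, rollback from Running or Failed, and terminal
// outcomes from Running or RollingBack.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WorkflowStatus {
    Pending,
    Running,
    Paused,
    RollingBack,
    Completed,
    Failed,
}

fn can_transition(from: WorkflowStatus, to: WorkflowStatus) -> bool {
    use WorkflowStatus::*;
    matches!(
        (from, to),
        (Pending, Running)
            | (Running, Paused)
            | (Paused, Running)
            | (Running, Completed)
            | (Running, Failed)
            | (Running, RollingBack)
            | (Failed, RollingBack)
            | (RollingBack, Completed)
            | (RollingBack, Failed)
    )
}

fn main() {
    use WorkflowStatus::*;
    assert!(can_transition(Running, Paused));
    assert!(can_transition(Failed, RollingBack));
    assert!(!can_transition(Completed, Running)); // terminal
}
```

 Guarding every status write through a check like this keeps the persisted state consistent across checkpoint/restore.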

 7. Templates & Automation

 Template Support

 - Jinja2 templates for operations
 - Variable interpolation from KCL
 - Dynamic resource naming
 - Environment-specific configs

 8. Security Features

 SOPS/KMS Integration

 - Automatic secret decryption
 - Secure credential passing
 - SSH key management
 - API token rotation

 Benefits

 1. Provider Agnostic: Works across UpCloud, AWS, local
 2. Dependency Aware: Automatic resolution and ordering
 3. Fault Tolerant: Retry, rollback, checkpoint/restore
 4. Observable: Full logging, monitoring, state tracking
 5. Scalable: Batch operations with parallelization
 6. Secure: SOPS/KMS encryption, credential management
 7. Declarative: KCL-based configuration
 8. Extensible: Plugin architecture for providers/taskservs

 This design provides enterprise-grade batch workflow capabilities while maintaining the simplicity of KCL declarations and the power of the orchestrator.