KCL Best Practices for Provisioning
This document outlines best practices for using and developing with the provisioning KCL package, covering schema design, workflow patterns, and operational guidelines.
Table of Contents
- Schema Design
- Workflow Patterns
- Error Handling
- Performance Optimization
- Security Considerations
- Testing Strategies
- Maintenance Guidelines
Schema Design
1. Clear Naming Conventions
# ✅ Good: Descriptive, consistent naming
schema ProductionWebServer:
"""Web server optimized for production workloads"""
hostname: str # Clear, specific field names
fully_qualified_domain_name?: str
environment_classification: "dev" | "staging" | "prod"
cost_allocation_center: str
operational_team_owner: str
# ✅ Good: Consistent prefixes for related schemas
schema K8sDeploymentSpec:
"""Kubernetes deployment specification"""
replica_count: int
container_definitions: [K8sContainerSpec]
volume_mount_configs: [K8sVolumeMountSpec]
schema K8sContainerSpec:
"""Kubernetes container specification"""
image_reference: str
resource_requirements: K8sResourceRequirements
# ❌ Avoid: Ambiguous or inconsistent naming
schema Server: # ❌ Too generic
name: str # ❌ Ambiguous - hostname? display name?
env: str # ❌ Unclear - environment? variables?
cfg: {str: str} # ❌ Cryptic abbreviations
2. Comprehensive Documentation
# ✅ Good: Detailed documentation with examples
import regex

schema ServerConfiguration:
"""
Production server configuration following company standards.
This schema defines servers for multi-tier applications with
proper security, monitoring, and operational requirements.
Example:
web_server: ServerConfiguration = ServerConfiguration {
hostname: "prod-web-01"
server_role: "frontend"
environment: "production"
cost_center: "engineering"
}
"""
# Core identification (required)
hostname: str # DNS-compliant hostname (RFC 1123)
server_role: "frontend" | "backend" | "database" | "cache"
# Environment and operational metadata
environment: "development" | "staging" | "production"
cost_center: str # Billing allocation identifier
primary_contact_team: str # Team responsible for maintenance
# Security and compliance
security_zone: "dmz" | "internal" | "restricted"
compliance_requirements: [str] # e.g., ["pci", "sox", "hipaa"]
# Optional operational settings
backup_policy?: str # Backup schedule identifier
monitoring_profile?: str # Monitoring configuration profile
check:
# Hostname validation (DNS RFC 1123)
regex.match(hostname, "^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$"),
"Hostname must be DNS-compliant (RFC 1123): ${hostname}"
# Environment-specific validations
environment == "production" and len(primary_contact_team) > 0,
"Production servers must specify primary contact team"
# Security requirements
security_zone == "restricted" and "encryption" in compliance_requirements,
"Restricted zone servers must have encryption compliance"
# ❌ Avoid: Minimal or missing documentation
schema Srv: # ❌ No documentation
h: str # ❌ No field documentation
t: str # ❌ Cryptic field names
3. Hierarchical Schema Design
# ✅ Good: Base schemas with specialized extensions
import regex

schema BaseInfrastructureResource:
"""Foundation schema for all infrastructure resources"""
# Universal metadata
resource_name: str
creation_timestamp?: str
last_modified_timestamp?: str
created_by_user?: str
# Organizational metadata
cost_center: str
project_identifier: str
environment: "dev" | "staging" | "prod"
# Operational metadata
tags: {str: str} = {}
monitoring_enabled: bool = True
check:
len(resource_name) > 0 and len(resource_name) <= 63,
"Resource name must be 1-63 characters"
regex.match(resource_name, "^[a-z0-9]([a-z0-9-]*[a-z0-9])?$"),
"Resource name must be DNS-label compatible"
schema ComputeResource(BaseInfrastructureResource):
"""Compute resources with CPU/memory specifications"""
# Hardware specifications
cpu_cores: int
memory_gigabytes: int
storage_gigabytes: int
# Performance characteristics
cpu_architecture: "x86_64" | "arm64"
performance_tier: "burstable" | "standard" | "high_performance"
check:
cpu_cores > 0 and cpu_cores <= 128,
"CPU cores must be between 1 and 128"
memory_gigabytes > 0 and memory_gigabytes <= 1024,
"Memory must be between 1GB and 1TB"
schema ManagedDatabaseResource(BaseInfrastructureResource):
"""Managed database service configuration"""
# Database specifications
database_engine: "postgresql" | "mysql" | "redis" | "mongodb"
engine_version: str
instance_class: str
# High availability and backup
multi_availability_zone: bool = False
backup_retention_days: int = 7
automated_backup_enabled: bool = True
# Security
encryption_at_rest: bool = True
encryption_in_transit: bool = True
check:
environment == "prod" and multi_availability_zone == True,
"Production databases must enable multi-AZ"
environment == "prod" and backup_retention_days >= 30,
"Production databases need minimum 30 days backup retention"
4. Flexible Configuration Patterns
# ✅ Good: Environment-aware defaults
schema EnvironmentAdaptiveConfiguration:
"""Configuration that adapts based on environment"""
environment: "dev" | "staging" | "prod"
# Computed defaults based on environment
default_timeout_seconds: int = (
300 if environment == "prod" else (180 if environment == "staging" else 60)
)
default_retry_attempts: int = (
5 if environment == "prod" else (3 if environment == "staging" else 1)
)
resource_allocation: ComputeResource = ComputeResource {
resource_name: "default-compute"
cost_center: "shared"
project_identifier: "infrastructure"
environment: environment
# Environment-specific resource sizing
cpu_cores: 4 if environment == "prod" else (2 if environment == "staging" else 1)
memory_gigabytes: 8 if environment == "prod" else (4 if environment == "staging" else 2)
storage_gigabytes: 100 if environment == "prod" else 50
cpu_architecture: "x86_64"
performance_tier: "high_performance" if environment == "prod" else "standard"
}
monitoring_configuration: MonitoringConfig = MonitoringConfig {
collection_interval_seconds: 15 if environment == "prod" else 60
retention_days: 90 if environment == "prod" else 30
alert_thresholds: "strict" if environment == "prod" else "relaxed"
}
# ✅ Good: Composable configuration with mixins
schema SecurityMixin:
"""Security-related configuration that can be mixed into other schemas"""
encryption_enabled: bool = True
access_logging_enabled: bool = True
security_scan_enabled: bool = True
# Security-specific validations
check:
encryption_enabled == True,
"Encryption must be enabled for security compliance"
schema ComplianceMixin:
"""Compliance-related configuration"""
compliance_frameworks: [str] = []
audit_logging_enabled: bool = False
data_retention_policy?: str
check:
audit_logging_enabled if len(compliance_frameworks) > 0,
"Compliance frameworks require audit logging"
schema SecureComputeResource(ComputeResource):
"""Compute resource with security and compliance requirements"""
mixin [SecurityMixin, ComplianceMixin]
# Additional security requirements for compute
secure_boot_enabled: bool = True
encrypted_storage: bool = True
check:
# Inherit all parent validations, plus additional ones
"pci" in compliance_frameworks and encrypted_storage == True,
"PCI compliance requires encrypted storage"
Workflow Patterns
1. Dependency Management
# ✅ Good: Clear dependency patterns with proper error handling
schema InfrastructureWorkflow(main.BatchWorkflow):
"""Infrastructure deployment with proper dependency management"""
# Categorize operations for dependency analysis
foundation_operations: [str] = [] # Network, security groups, etc.
compute_operations: [str] = [] # Servers, instances
service_operations: [str] = [] # Applications, databases
validation_operations: [str] = [] # Testing, health checks
check:
# Foundation must come first
all_true([
len([dep for dep in op.dependencies or []
if dep.target_operation_id in foundation_operations]) > 0
for op in operations
if op.operation_id in compute_operations
]) or len(compute_operations) == 0,
"Compute operations must depend on foundation operations"
# Services depend on compute
all_true([
len([dep for dep in op.dependencies or []
if dep.target_operation_id in compute_operations]) > 0
for op in operations
if op.operation_id in service_operations
]) or len(service_operations) == 0,
"Service operations must depend on compute operations"
# Example usage with proper dependency chains
production_deployment: InfrastructureWorkflow = InfrastructureWorkflow {
workflow_id: "prod-infra-2025-001"
name: "Production Infrastructure Deployment"
foundation_operations: ["create_vpc", "setup_security_groups"]
compute_operations: ["create_web_servers", "create_db_servers"]
service_operations: ["install_applications", "configure_databases"]
validation_operations: ["run_health_checks", "validate_connectivity"]
operations: [
# Foundation layer
main.BatchOperation {
operation_id: "create_vpc"
name: "Create VPC and Networking"
operation_type: "custom"
action: "create"
parameters: {"cidr": "10.0.0.0/16"}
priority: 10
timeout: 600
},
# Compute layer (depends on foundation)
main.BatchOperation {
operation_id: "create_web_servers"
name: "Create Web Servers"
operation_type: "server"
action: "create"
parameters: {"count": "3", "type": "web"}
dependencies: [
main.DependencyDef {
target_operation_id: "create_vpc"
dependency_type: "sequential"
timeout: 300
fail_on_dependency_error: True
}
]
priority: 8
timeout: 900
},
# Service layer (depends on compute)
main.BatchOperation {
operation_id: "install_applications"
name: "Install Web Applications"
operation_type: "taskserv"
action: "create"
parameters: {"apps": ["nginx", "prometheus"]}
dependencies: [
main.DependencyDef {
target_operation_id: "create_web_servers"
dependency_type: "conditional"
conditions: ["servers_ready", "ssh_accessible"]
timeout: 600
}
]
priority: 6
}
]
}
2. Multi-Environment Workflows
# ✅ Good: Environment-specific workflow configurations
schema MultiEnvironmentWorkflow:
"""Workflow that adapts to different environments"""
base_workflow: main.BatchWorkflow
target_environment: "dev" | "staging" | "prod"
# Environment-specific overrides
environment_config: EnvironmentConfig = EnvironmentConfig {
environment: target_environment
# Adjust parallelism based on environment
max_parallel: 3 if target_environment == "prod" else 5
# Adjust timeouts
operation_timeout_multiplier: 1.5 if target_environment == "prod" else 1.0
# Monitoring intensity
monitoring_level: "comprehensive" if target_environment == "prod" else "basic"
}
# Generate final workflow with environment adaptations
final_workflow: main.BatchWorkflow = main.BatchWorkflow {
workflow_id: f"{base_workflow.workflow_id}-{target_environment}"
name: f"{base_workflow.name} ({target_environment})"
description: base_workflow.description
operations: [
main.BatchOperation {
operation_id: op.operation_id
name: op.name
operation_type: op.operation_type
provider: op.provider
action: op.action
parameters: op.parameters
dependencies: op.dependencies
# Environment-adapted timeout
timeout: int(op.timeout * environment_config.operation_timeout_multiplier)
# Environment-adapted priority
priority: op.priority
allow_parallel: op.allow_parallel
# Environment-specific retry policy
retry_policy: main.RetryPolicy {
max_attempts: 3 if target_environment == "prod" else 2
initial_delay: 30 if target_environment == "prod" else 10
backoff_multiplier: 2
}
}
for op in base_workflow.operations
]
max_parallel_operations: environment_config.max_parallel
global_timeout: base_workflow.global_timeout
fail_fast: target_environment != "prod"
# Environment-specific storage
storage: main.StorageConfig {
backend: "surrealdb" if target_environment == "prod" else "filesystem"
base_path: "./workflows/${target_environment}"
enable_persistence: target_environment != "dev"
retention_hours: 2160 if target_environment == "prod" else 168 # 90 days vs 1 week
}
# Environment-specific monitoring
monitoring: main.MonitoringConfig {
enabled: True
backend: "prometheus"
enable_tracing: target_environment == "prod"
enable_notifications: target_environment != "dev"
log_level: "debug" if target_environment == "dev" else "info"
}
}
# Usage for different environments
dev_deployment: MultiEnvironmentWorkflow = MultiEnvironmentWorkflow {
target_environment: "dev"
base_workflow: main.BatchWorkflow {
workflow_id: "webapp-deploy"
name: "Web Application Deployment"
operations: [
# ... base operations
]
}
}
prod_deployment: MultiEnvironmentWorkflow = MultiEnvironmentWorkflow {
target_environment: "prod"
base_workflow: dev_deployment.base_workflow # Reuse same base workflow
}
3. Error Recovery Patterns
# ✅ Good: Comprehensive error recovery strategy
schema ResilientWorkflow(main.BatchWorkflow):
"""Workflow with advanced error recovery capabilities"""
# Error categorization
critical_operations: [str] = [] # Operations that cannot fail
optional_operations: [str] = [] # Operations that can be skipped
retry_operations: [str] = [] # Operations with custom retry logic
# Recovery strategies
global_error_strategy: "fail_fast" | "continue_on_error" | "intelligent" = "intelligent"
# Enhanced operations with error handling
enhanced_operations: [EnhancedBatchOperation] = [
EnhancedBatchOperation {
base_operation: op
is_critical: op.operation_id in critical_operations
is_optional: op.operation_id in optional_operations
custom_retry: op.operation_id in retry_operations
# Adaptive retry policy based on operation characteristics
adaptive_retry_policy: main.RetryPolicy {
max_attempts: 5 if is_critical else (1 if is_optional else 3)
initial_delay: 60 if is_critical else 30
max_delay: 900 if is_critical else 300
backoff_multiplier: 2
retry_on_errors: [
"timeout",
"connection_error",
"rate_limit"
] + ([
"resource_unavailable",
"quota_exceeded"
] if is_critical else [])
}
# Adaptive rollback strategy
adaptive_rollback_strategy: main.RollbackStrategy {
enabled: True
strategy: "manual" if is_critical else "immediate"
preserve_partial_state: is_critical
custom_rollback_operations: [
"notify_engineering_team",
"create_incident_ticket",
"preserve_debug_info"
] if is_critical else []
}
}
for op in operations
]
schema EnhancedBatchOperation:
"""Batch operation with enhanced error handling"""
base_operation: main.BatchOperation
is_critical: bool = False
is_optional: bool = False
custom_retry: bool = False
adaptive_retry_policy: main.RetryPolicy
adaptive_rollback_strategy: main.RollbackStrategy
# Circuit breaker pattern
failure_threshold: int = 3
recovery_timeout_seconds: int = 300
check:
not (is_critical and is_optional),
"Operation cannot be both critical and optional"
Error Handling
1. Graceful Degradation
# ✅ Good: Graceful degradation for non-critical components
schema GracefulDegradationWorkflow(main.BatchWorkflow):
"""Workflow that can degrade gracefully on partial failures"""
# Categorize operations by importance
core_operations: [str] = [] # Must succeed
enhancement_operations: [str] = [] # Nice to have
monitoring_operations: [str] = [] # Can be skipped if needed
# Minimum viable deployment definition
minimum_viable_operations: [str] = core_operations
# Degradation strategy
degradation_policy: DegradationPolicy = DegradationPolicy {
allow_partial_deployment: True
minimum_success_percentage: 80.0
operation_priorities: {
# Core operations (must succeed)
op_id: 10 for op_id in core_operations
} | {
# Enhancement operations (should succeed)
op_id: 5 for op_id in enhancement_operations
} | {
# Monitoring operations (can fail)
op_id: 1 for op_id in monitoring_operations
}
}
check:
# Ensure minimum viable deployment is achievable
len(minimum_viable_operations) > 0,
"Must specify at least one operation for minimum viable deployment"
# Core operations should not depend on enhancement operations
all_true([
all_true([
dep.target_operation_id not in enhancement_operations
for dep in op.dependencies or []
])
for op in operations
if op.operation_id in core_operations
]),
"Core operations should not depend on enhancement operations"
schema DegradationPolicy:
"""Policy for graceful degradation"""
allow_partial_deployment: bool = False
minimum_success_percentage: float = 100.0
operation_priorities: {str: int} = {}
# Fallback configurations
fallback_configurations: {str: str} = {}
emergency_contacts: [str] = []
check:
0.0 <= minimum_success_percentage and minimum_success_percentage <= 100.0,
"Success percentage must be between 0 and 100"
2. Circuit Breaker Patterns
# ✅ Good: Circuit breaker for external dependencies
schema CircuitBreakerOperation(main.BatchOperation):
"""Operation with circuit breaker pattern for external dependencies"""
# Circuit breaker configuration
circuit_breaker_enabled: bool = False
failure_threshold: int = 5
recovery_timeout_seconds: int = 300
# Health check configuration
health_check_endpoint?: str
health_check_interval_seconds: int = 30
# Fallback behavior
fallback_enabled: bool = False
fallback_operation?: main.BatchOperation
check:
failure_threshold > 0 if circuit_breaker_enabled,
"Circuit breaker must have positive failure threshold"
recovery_timeout_seconds > 0 if circuit_breaker_enabled,
"Circuit breaker must have positive recovery timeout"
fallback_operation != Undefined if fallback_enabled,
"Fallback requires fallback operation definition"
# Example: Database operation with circuit breaker
database_operation_with_circuit_breaker: CircuitBreakerOperation = CircuitBreakerOperation {
# Base operation
operation_id: "setup_database"
name: "Setup Production Database"
operation_type: "server"
action: "create"
parameters: {"service": "postgresql", "version": "15"}
timeout: 1800
# Circuit breaker settings
circuit_breaker_enabled: True
failure_threshold: 3
recovery_timeout_seconds: 600
# Health monitoring
health_check_endpoint: "http://db-health.internal/health"
health_check_interval_seconds: 60
# Fallback to read replica
fallback_enabled: True
fallback_operation: main.BatchOperation {
operation_id: "setup_database_readonly"
name: "Setup Read-Only Database Fallback"
operation_type: "server"
action: "create"
parameters: {"service": "postgresql", "mode": "readonly"}
timeout: 900
}
}
Performance Optimization
1. Parallel Execution Strategies
# ✅ Good: Intelligent parallelization
schema OptimizedParallelWorkflow(main.BatchWorkflow):
"""Workflow optimized for parallel execution"""
# Parallel execution groups
parallel_groups: [[str]] = [] # Groups of operations that can run in parallel
# Resource-aware scheduling
resource_requirements: {str: ResourceRequirement} = {}
total_available_resources: ResourceCapacity = ResourceCapacity {
max_cpu_cores: 16
max_memory_gb: 64
max_network_bandwidth_mbps: 1000
max_concurrent_operations: 10
}
# Computed optimal parallelism
optimal_parallel_limit: int = min([
total_available_resources.max_concurrent_operations,
len(operations),
8 # Reasonable default maximum
])
# Generate workflow with optimized settings
optimized_workflow: main.BatchWorkflow = main.BatchWorkflow {
workflow_id: workflow_id
name: name
description: description
operations: [
OptimizedBatchOperation {
base_operation: op
resource_hint: resource_requirements[op.operation_id] if op.operation_id in resource_requirements else ResourceRequirement {
cpu_cores: 1
memory_gb: 2
estimated_duration_seconds: op.timeout / 2
}
# Enable parallelism for operations in parallel groups
computed_allow_parallel: any_true([
op.operation_id in group and len(group) > 1
for group in parallel_groups
])
}.optimized_operation
for op in operations
]
max_parallel_operations: optimal_parallel_limit
global_timeout: global_timeout
fail_fast: fail_fast
# Optimize storage for performance
storage: main.StorageConfig {
backend: "surrealdb" # Better for concurrent access
enable_compression: False # Trade space for speed
connection_config: {
"connection_pool_size": str(optimal_parallel_limit * 2)
"max_retries": "3"
"timeout": "30"
}
}
}
schema OptimizedBatchOperation:
"""Batch operation with performance optimizations"""
base_operation: main.BatchOperation
resource_hint: ResourceRequirement
computed_allow_parallel: bool
# Performance-optimized operation
optimized_operation: main.BatchOperation = main.BatchOperation {
operation_id: base_operation.operation_id
name: base_operation.name
operation_type: base_operation.operation_type
provider: base_operation.provider
action: base_operation.action
parameters: base_operation.parameters
dependencies: base_operation.dependencies
# Optimized settings
timeout: max([base_operation.timeout, resource_hint.estimated_duration_seconds * 2])
allow_parallel: computed_allow_parallel
priority: base_operation.priority
# Performance-oriented retry policy
retry_policy: main.RetryPolicy {
max_attempts: 2 # Fewer retries for faster failure detection
initial_delay: 10
max_delay: 60
backoff_multiplier: 1.5
retry_on_errors: ["timeout", "rate_limit"] # Only retry fast-failing errors
}
}
schema ResourceRequirement:
"""Resource requirements for performance planning"""
cpu_cores: int = 1
memory_gb: int = 2
estimated_duration_seconds: int = 300
io_intensive: bool = False
network_intensive: bool = False
schema ResourceCapacity:
"""Available resource capacity"""
max_cpu_cores: int
max_memory_gb: int
max_network_bandwidth_mbps: int
max_concurrent_operations: int
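A compact sketch of how parallel groups and resource hints might be declared is shown below; server names and sizes are assumptions for illustration:

    parallel_web_tier: OptimizedParallelWorkflow = OptimizedParallelWorkflow {
        workflow_id: "parallel-web-tier-007"
        name: "Parallel Web Tier Deployment"
        # The two servers are independent, so they form a single parallel group
        parallel_groups: [["create_web_01", "create_web_02"]]
        resource_requirements: {
            "create_web_01": ResourceRequirement {cpu_cores: 2, memory_gb: 4}
            "create_web_02": ResourceRequirement {cpu_cores: 2, memory_gb: 4}
        }
        operations: [
            main.BatchOperation {
                operation_id: "create_web_01"
                name: "Create Web Server 01"
                operation_type: "server"
                action: "create"
                parameters: {}
            },
            main.BatchOperation {
                operation_id: "create_web_02"
                name: "Create Web Server 02"
                operation_type: "server"
                action: "create"
                parameters: {}
            }
        ]
    }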
2. Caching and Memoization
# ✅ Good: Caching for expensive operations
schema CachedOperation(main.BatchOperation):
"""Operation with caching capabilities"""
# Caching configuration
cache_enabled: bool = False
cache_key_template: str = "${operation_id}-${provider}-${action}"
cache_ttl_seconds: int = 3600 # 1 hour default
# Cache invalidation rules
cache_invalidation_triggers: [str] = []
force_cache_refresh: bool = False
# Computed cache key
computed_cache_key: str = f"{operation_id}-{provider}-{action}"
# Cache-aware timeout (shorter if cache hit expected)
cache_aware_timeout: int = cache_enabled ? timeout / 2 : timeout
check:
cache_ttl_seconds > 0 if cache_enabled,
"Cache TTL must be positive when caching is enabled"
# Example: Cached provider operations
cached_server_creation: CachedOperation = CachedOperation {
# Base operation
operation_id: "create_standardized_servers"
name: "Create Standardized Web Servers"
operation_type: "server"
provider: "upcloud"
action: "create"
parameters: {
"plan": "2xCPU-4GB"
"zone": "fi-hel2"
"image": "ubuntu-22.04"
}
timeout: 900
# Caching settings
cache_enabled: True
cache_key_template: "server-${plan}-${zone}-${image}"
cache_ttl_seconds: 7200 # 2 hours
# Cache invalidation
cache_invalidation_triggers: ["image_updated", "plan_changed"]
}
Security Considerations
1. Secure Configuration Management
# ✅ Good: Secure configuration with proper secret handling
schema SecureConfiguration:
"""Security-first configuration management"""
# Secret management
secrets_provider: main.SecretProvider = main.SecretProvider {
provider: "sops"
sops_config: main.SopsConfig {
config_path: "./.sops.yaml"
age_key_file: "{{env.HOME}}/.config/sops/age/keys.txt"
use_age: True
}
}
# Security classifications
data_classification: "public" | "internal" | "confidential" | "restricted"
encryption_required: bool = data_classification != "public"
audit_logging_required: bool = data_classification in ["confidential", "restricted"]
# Access control
allowed_environments: [str] = ["dev", "staging", "prod"]
environment_access_matrix: {str: [str]} = {
"dev": ["developers", "qa_team"]
"staging": ["developers", "qa_team", "release_team"]
"prod": ["release_team", "operations_team"]
}
# Network security
network_isolation_required: bool = data_classification in ["confidential", "restricted"]
vpc_isolation: bool = network_isolation_required
private_subnets_only: bool = data_classification == "restricted"
# Audit log destinations (required when audit logging is required)
audit_log_destinations: [str] = []
check:
encryption_required if data_classification == "restricted",
"Restricted data must be encrypted"
len(audit_log_destinations) > 0 if audit_logging_required,
"Audit logging destinations must be specified for sensitive data"
# Example: Production security configuration
production_security: SecureConfiguration = SecureConfiguration {
data_classification: "confidential"
# encryption_required automatically becomes True
# audit_logging_required automatically becomes True
# network_isolation_required automatically becomes True
allowed_environments: ["staging", "prod"]
environment_access_matrix: {
"staging": ["release_team", "security_team"]
"prod": ["operations_team", "security_team"]
}
audit_log_destinations: [
"siem://security.company.com",
"s3://audit-logs-prod/workflows"
]
}
2. Compliance and Auditing
# ✅ Good: Compliance-aware workflow design
schema ComplianceWorkflow(main.BatchWorkflow):
"""Workflow with built-in compliance features"""
# Compliance framework requirements
compliance_frameworks: [str] = []
compliance_metadata: ComplianceMetadata = ComplianceMetadata {
frameworks: compliance_frameworks
audit_trail_required: "sox" in compliance_frameworks or "pci" in compliance_frameworks
data_residency_requirements: ["eu"] if "gdpr" in compliance_frameworks else []
retention_requirements: get_retention_requirements(compliance_frameworks)
}
# Enhanced workflow with compliance features
compliant_workflow: main.BatchWorkflow = main.BatchWorkflow {
workflow_id: workflow_id
name: name
description: description
operations: [
ComplianceAwareBatchOperation {
base_operation: op
compliance_metadata: compliance_metadata
}.compliant_operation
for op in operations
]
# Compliance-aware storage
storage: main.StorageConfig {
backend: "surrealdb"
enable_persistence: True
retention_hours: compliance_metadata.retention_requirements.workflow_data_hours
enable_compression: False # For audit clarity
encryption: main.SecretProvider {
provider: "sops"
sops_config: main.SopsConfig {
config_path: "./.sops.yaml"
age_key_file: "{{env.HOME}}/.config/sops/age/keys.txt"
use_age: True
}
} if compliance_metadata.audit_trail_required else Undefined
}
# Compliance-aware monitoring
monitoring: main.MonitoringConfig {
enabled: True
backend: "prometheus"
enable_tracing: compliance_metadata.audit_trail_required
enable_notifications: True
log_level: "info"
collection_interval: 15 if compliance_metadata.audit_trail_required else 30
}
# Audit trail in execution context
execution_context: execution_context | {
"compliance_frameworks": str(compliance_frameworks)
"audit_trail_enabled": str(compliance_metadata.audit_trail_required)
"data_classification": "confidential"
}
}
schema ComplianceMetadata:
"""Metadata for compliance requirements"""
frameworks: [str]
audit_trail_required: bool
data_residency_requirements: [str]
retention_requirements: RetentionRequirements
schema RetentionRequirements:
"""Data retention requirements based on compliance"""
workflow_data_hours: int = 8760 # 1 year default
audit_log_hours: int = 26280 # 3 years default
backup_retention_hours: int = 43800 # 5 years default
schema ComplianceAwareBatchOperation:
"""Batch operation with compliance awareness"""
base_operation: main.BatchOperation
compliance_metadata: ComplianceMetadata
compliant_operation: main.BatchOperation = main.BatchOperation {
operation_id: base_operation.operation_id
name: base_operation.name
operation_type: base_operation.operation_type
provider: base_operation.provider
action: base_operation.action
parameters: base_operation.parameters | (
compliance_metadata.audit_trail_required ? {
"audit_enabled": "true"
"compliance_mode": "strict"
} : {}
)
dependencies: base_operation.dependencies
timeout: base_operation.timeout
allow_parallel: base_operation.allow_parallel
priority: base_operation.priority
# Enhanced retry for compliance
retry_policy: main.RetryPolicy {
max_attempts: 5 if compliance_metadata.audit_trail_required else 3
initial_delay: 30
max_delay: 300
backoff_multiplier: 2
retry_on_errors: ["timeout", "connection_error", "rate_limit"]
}
# Conservative rollback for compliance
rollback_strategy: main.RollbackStrategy {
enabled: True
strategy: "manual" # Manual approval for compliance
preserve_partial_state: True
rollback_timeout: 1800
custom_rollback_operations: [
"create_audit_entry",
"notify_compliance_team",
"preserve_evidence"
]
}
}
# Helper for retention requirements (KCL uses lambda expressions rather than def functions)
get_retention_requirements = lambda frameworks: [str] -> RetentionRequirements {
    RetentionRequirements {
        workflow_data_hours: 43800     # 5 years
        audit_log_hours: 61320         # 7 years
        backup_retention_hours: 87600  # 10 years
    } if "sox" in frameworks else RetentionRequirements {
        workflow_data_hours: 8760      # 1 year
        audit_log_hours: 26280         # 3 years
        backup_retention_hours: 43800  # 5 years
    } if "pci" in frameworks else RetentionRequirements {
        workflow_data_hours: 8760      # 1 year default
        audit_log_hours: 26280         # 3 years default
        backup_retention_hours: 43800  # 5 years default
    }
}
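Usage note: when several frameworks apply, the first matching branch wins, so the stricter SOX profile takes precedence over PCI:

    # Evaluates to the SOX retention profile (5/7/10 years) even though "pci" is also listed
    mixed_retention: RetentionRequirements = get_retention_requirements(["pci", "sox"])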
Testing Strategies
1. Schema Testing
#!/bin/bash
# Schema testing script
# Test 1: Basic syntax validation
echo "Testing schema syntax..."
find . -name "*.k" -exec kcl fmt {} \;
# Test 2: Schema compilation
echo "Testing schema compilation..."
for file in *.k; do
echo "Testing $file"
kcl run "$file" > /dev/null || echo "FAILED: $file"
done
# Test 3: Constraint validation
echo "Testing constraints..."
kcl run test_constraints.k
# Test 4: JSON serialization
echo "Testing JSON serialization..."
kcl run examples/simple_workflow.k --format json | jq '.' > /dev/null
# Test 5: Cross-schema compatibility
echo "Testing cross-schema compatibility..."
kcl run integration_test.k
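The script above references a `test_constraints.k` file that is not shown here; a minimal sketch of what such a file could contain follows (the schema is a stand-in written for this example, not part of the package):

    # test_constraints.k -- boundary cases for schema constraints.
    # `kcl run test_constraints.k` exits non-zero if any check or assert fails.
    import regex

    schema HostnameCase:
        """Stand-in schema mirroring the hostname rules used elsewhere in this guide"""
        hostname: str

        check:
            regex.match(hostname, "^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$"), "Hostname must be DNS-compliant: ${hostname}"

    # Valid boundary cases: a single character and a hyphenated name
    shortest_valid: HostnameCase = HostnameCase {hostname: "a"}
    hyphenated_valid: HostnameCase = HostnameCase {hostname: "prod-web-01"}

    # Plain assertions for values that are not wrapped in a schema
    assert len(hyphenated_valid.hostname) <= 63, "hostnames must stay within 63 characters"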
2. Validation Testing
# Test configurations for validation (declared top-level so each case is evaluated on `kcl run`)
# Valid cases
valid_server: main.Server = main.Server {
hostname: "test-01"
title: "Test Server"
labels: "env: test"
user: "test"
}
# Edge cases
minimal_workflow: main.BatchWorkflow = main.BatchWorkflow {
workflow_id: "minimal"
name: "Minimal Test Workflow"
operations: [
main.BatchOperation {
operation_id: "test_op"
name: "Test Operation"
operation_type: "custom"
action: "test"
parameters: {}
}
]
}
# Boundary testing
max_timeout_operation: main.BatchOperation = main.BatchOperation {
operation_id: "max_timeout"
name: "Maximum Timeout Test"
operation_type: "custom"
action: "test"
parameters: {}
timeout: 86400 # 24 hours - test upper boundary
}
Maintenance Guidelines
1. Schema Evolution
# ✅ Good: Backward-compatible schema evolution
schema ServerV2(main.Server):
"""Enhanced server schema with backward compatibility"""
# New optional fields (backward compatible)
performance_profile?: "standard" | "high_performance" | "burstable"
auto_scaling_enabled?: bool = False
# Deprecated fields (marked but still supported)
deprecated_field?: str # TODO: Remove in v3.0
# Version metadata
schema_version: str = "2.0"
check:
# Maintain existing validations
len(hostname) > 0, "Hostname required"
len(title) > 0, "Title required"
# New validations for new fields
performance_profile != "burstable" if auto_scaling_enabled and performance_profile,
"Auto-scaling not compatible with burstable performance profile"
# Migration helper
schema ServerMigration:
"""Helper for migrating from ServerV1 to ServerV2"""
v1_server: main.Server
v2_server: ServerV2 = ServerV2 {
# Copy all existing fields
hostname: v1_server.hostname
title: v1_server.title
labels: v1_server.labels
user: v1_server.user
# Set defaults for new fields
performance_profile: "standard"
auto_scaling_enabled: False
# Copy optional fields if they exist
taskservs: v1_server.taskservs
cluster: v1_server.cluster
}
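A short usage sketch follows; the main.Server field values are assumptions for illustration:

    # Existing v1 definition
    legacy_web: main.Server = main.Server {
        hostname: "prod-web-01"
        title: "Production Web Server"
        labels: "env: prod"
        user: "deploy"
    }

    # migrated.v2_server carries the v1 values plus defaults for the new V2 fields
    migrated: ServerMigration = ServerMigration {
        v1_server: legacy_web
    }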
2. Documentation Updates
# ✅ Good: Self-documenting schemas with examples
schema DocumentedWorkflow(main.BatchWorkflow):
"""
Production workflow with comprehensive documentation
This workflow follows company best practices for:
- Multi-environment deployment
- Error handling and recovery
- Security and compliance
- Performance optimization
Example Usage:
prod_workflow: DocumentedWorkflow = DocumentedWorkflow {
environment: "production"
security_level: "high"
base_workflow: main.BatchWorkflow {
workflow_id: "webapp-deploy-001"
name: "Web Application Deployment"
operations: [...]
}
}
See Also:
- examples/production_workflow.k
- docs/WORKFLOW_PATTERNS.md
- docs/SECURITY_GUIDELINES.md
"""
# Required metadata for documentation
environment: "dev" | "staging" | "prod"
security_level: "low" | "medium" | "high"
base_workflow: main.BatchWorkflow
# Auto-generated documentation fields
documentation_generated_at: str = "{{now.date}}"
schema_version: str = "1.0"
check:
environment == "prod" and security_level == "high",
"Production workflows must use high security level"
This comprehensive best practices guide provides the foundation for creating maintainable, secure, and performant KCL configurations for the provisioning system.