//! Batch operation coordinator for infrastructure provisioning
//!
//! This module provides batch coordination capabilities for infrastructure
//! operations across multiple providers (UpCloud, AWS, local). It integrates
//! with the workflow engine and provides higher-level abstractions for common
//! provisioning patterns.
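//!
//! A minimal end-to-end sketch (assuming a `TaskStorage` implementation, an
//! `Args` value, and a `BatchConfig` whose `infrastructure` map contains a
//! `production` entry; all three are placeholders here):
//!
//! ```ignore
//! let coordinator = BatchCoordinator::new(storage, config, args);
//! let request = BatchOperationRequest {
//!     operation_id: None, // a UUID is generated when omitted
//!     operation_type: BatchOperationType::Servers {
//!         operation: ServerOperation::Create,
//!         servers: vec!["web-01".to_string()],
//!     },
//!     infrastructure: "production".to_string(),
//!     config: None, // fall back to the coordinator's configuration
//!     metadata: None,
//! };
//! let result = coordinator.execute_batch_operation(request).await?;
//! println!("{} tasks, {} failed", result.summary.total_tasks, result.summary.failed_tasks);
//! ```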
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use tracing::info;
use uuid::Uuid;
use crate::{
storage::TaskStorage,
workflow::{
BatchWorkflowEngine, WorkflowConfig, WorkflowDefinition, WorkflowExecutionState,
WorkflowTaskDefinition,
},
Args, TaskStatus,
};
/// Configuration for batch operations
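///
/// Registering an infrastructure target (a sketch; paths and names are illustrative):
///
/// ```ignore
/// let mut config = BatchConfig::default();
/// config.infrastructure.insert(
///     "production".to_string(),
///     InfrastructureConfig {
///         name: "production".to_string(),
///         provider: "upcloud".to_string(),
///         settings_path: "./infra/production/settings.toml".to_string(),
///         environment: "prod".to_string(),
///         targets: vec!["web-01".to_string()],
///         metadata: HashMap::new(),
///     },
/// );
/// ```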
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
/// Workflow execution configuration
pub workflow: WorkflowConfig,
/// Provider-specific settings
pub providers: HashMap<String, ProviderConfig>,
/// Infrastructure targets
pub infrastructure: HashMap<String, InfrastructureConfig>,
/// Default provisioning settings
pub defaults: BatchDefaults,
}
impl Default for BatchConfig {
fn default() -> Self {
let mut providers = HashMap::new();
providers.insert("upcloud".to_string(), ProviderConfig::default());
providers.insert("aws".to_string(), ProviderConfig::default());
providers.insert("local".to_string(), ProviderConfig::default());
Self {
workflow: WorkflowConfig::default(),
providers,
infrastructure: HashMap::new(),
defaults: BatchDefaults::default(),
}
}
}
/// Provider-specific configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderConfig {
/// Provider type
pub provider_type: String,
/// Maximum concurrent operations
pub max_concurrent: usize,
/// Timeout for operations in seconds
pub timeout_seconds: u64,
/// Retry configuration
pub retry_config: RetryConfig,
/// Provider-specific settings
pub settings: HashMap<String, String>,
}
impl Default for ProviderConfig {
fn default() -> Self {
Self {
provider_type: "unknown".to_string(),
max_concurrent: 2,
timeout_seconds: 3600,
retry_config: RetryConfig::default(),
settings: HashMap::new(),
}
}
}
/// Retry configuration
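///
/// One plausible interpretation of these fields (the actual backoff logic
/// lives in the workflow engine; this is only a sketch):
///
/// ```ignore
/// fn retry_delay(cfg: &RetryConfig, attempt: u32) -> u64 {
///     let delay = if cfg.exponential_backoff {
///         cfg.base_delay_seconds.saturating_mul(2u64.saturating_pow(attempt))
///     } else {
///         cfg.base_delay_seconds
///     };
///     delay.min(cfg.max_delay_seconds)
/// }
/// ```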
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
pub max_attempts: u8,
pub base_delay_seconds: u64,
pub max_delay_seconds: u64,
pub exponential_backoff: bool,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: 3,
base_delay_seconds: 30,
max_delay_seconds: 600,
exponential_backoff: true,
}
}
}
/// Infrastructure configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InfrastructureConfig {
/// Infrastructure name
pub name: String,
/// Provider type
pub provider: String,
/// Settings file path
pub settings_path: String,
/// Environment (dev, test, prod)
pub environment: String,
/// Target servers or resources
pub targets: Vec<String>,
/// Metadata for the infrastructure
pub metadata: HashMap<String, String>,
}
/// Default batch operation settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchDefaults {
/// Default check mode
pub check_mode: bool,
/// Default wait for completion
pub wait: bool,
/// Default provisioning script path
pub provisioning_path: String,
/// Default nushell executable path
pub nu_path: String,
/// Common environment variables
pub environment: HashMap<String, String>,
}
impl Default for BatchDefaults {
fn default() -> Self {
let mut environment = HashMap::new();
environment.insert("PROVISIONING_LOG_LEVEL".to_string(), "INFO".to_string());
Self {
check_mode: false,
wait: true,
provisioning_path: "./core/nulib/provisioning".to_string(),
nu_path: "nu".to_string(),
environment,
}
}
}
/// Batch operation type
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BatchOperationType {
/// Server operations
Servers {
operation: ServerOperation,
servers: Vec<String>,
},
/// Task service operations
TaskServices {
operation: TaskServiceOperation,
services: Vec<String>,
},
/// Cluster operations
Clusters {
operation: ClusterOperation,
clusters: Vec<String>,
},
/// Custom workflow
CustomWorkflow { workflow: WorkflowDefinition },
}
/// Server operation types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServerOperation {
Create,
Delete,
Update,
List,
Start,
Stop,
Restart,
}
/// Task service operation types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskServiceOperation {
Create,
Delete,
Update,
Generate,
CheckUpdates,
ListVersions,
}
/// Cluster operation types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClusterOperation {
Create,
Delete,
Update,
Scale,
Backup,
Restore,
}
/// Batch operation request
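///
/// For example, a taskserv rollout request (values illustrative):
///
/// ```ignore
/// let request = BatchOperationRequest {
///     operation_id: Some("rollout-001".to_string()),
///     operation_type: BatchOperationType::TaskServices {
///         operation: TaskServiceOperation::Create,
///         services: vec!["kubernetes".to_string(), "cilium".to_string()],
///     },
///     infrastructure: "staging".to_string(),
///     config: None,
///     metadata: None,
/// };
/// ```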
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperationRequest {
/// Operation identifier
pub operation_id: Option<String>,
/// Operation type
pub operation_type: BatchOperationType,
/// Target infrastructure
pub infrastructure: String,
/// Operation configuration
pub config: Option<BatchConfig>,
/// Additional metadata
pub metadata: Option<HashMap<String, String>>,
}
/// Batch operation result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperationResult {
/// Operation identifier
pub operation_id: String,
/// Workflow execution state
pub execution_state: WorkflowExecutionState,
/// Individual task results
pub task_results: HashMap<String, TaskResult>,
/// Operation summary
pub summary: OperationSummary,
}
/// Individual task result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
pub task_id: String,
pub status: TaskStatus,
pub output: Option<String>,
pub error: Option<String>,
pub duration_ms: Option<u64>,
pub provider: Option<String>,
}
/// Operation summary
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationSummary {
pub total_tasks: usize,
pub successful_tasks: usize,
pub failed_tasks: usize,
pub skipped_tasks: usize,
pub total_duration_ms: u64,
pub errors: Vec<String>,
}
/// Main batch operation coordinator
pub struct BatchCoordinator {
workflow_engine: BatchWorkflowEngine,
storage: Arc<dyn TaskStorage>,
config: BatchConfig,
args: Args,
}
impl BatchCoordinator {
/// Create new batch coordinator
pub fn new(storage: Arc<dyn TaskStorage>, config: BatchConfig, args: Args) -> Self {
let workflow_engine = BatchWorkflowEngine::new(storage.clone(), config.workflow.clone());
Self {
workflow_engine,
storage,
config,
args,
}
}
/// Execute a batch operation
pub async fn execute_batch_operation(
&self,
request: BatchOperationRequest,
) -> Result<BatchOperationResult> {
let operation_id = request
.operation_id
.unwrap_or_else(|| Uuid::new_v4().to_string());
info!("Starting batch operation: {}", operation_id);
// Get infrastructure configuration
let infra_config = self
.config
.infrastructure
.get(&request.infrastructure)
.ok_or_else(|| {
anyhow::anyhow!(
"Infrastructure '{}' not found in configuration",
request.infrastructure
)
})?
.clone();
// Build workflow definition
let workflow_def = self.build_workflow_definition(
operation_id.clone(),
request.operation_type,
infra_config,
request.config,
)?;
// Execute workflow
let execution_state = self.workflow_engine.execute_workflow(workflow_def).await?;
// Collect task results
let task_results = self.collect_task_results(&execution_state).await?;
// Generate summary
let summary = self.generate_operation_summary(&execution_state, &task_results);
let result = BatchOperationResult {
operation_id,
execution_state,
task_results,
summary,
};
info!(
"Batch operation completed: {} ({})",
result.operation_id, result.execution_state.status
);
Ok(result)
}
/// Build workflow definition from operation request
fn build_workflow_definition(
&self,
operation_id: String,
operation_type: BatchOperationType,
infra_config: InfrastructureConfig,
config_override: Option<BatchConfig>,
) -> Result<WorkflowDefinition> {
let config = config_override.as_ref().unwrap_or(&self.config);
let provider_config = config
.providers
.get(&infra_config.provider)
.ok_or_else(|| {
anyhow::anyhow!(
"Provider '{}' not found in configuration",
infra_config.provider
)
})?;
let mut tasks = Vec::new();
match operation_type {
BatchOperationType::Servers { operation, servers } => {
tasks.extend(self.build_server_tasks(
operation,
servers,
&infra_config,
provider_config,
)?);
}
BatchOperationType::TaskServices {
operation,
services,
} => {
tasks.extend(self.build_taskservice_tasks(
operation,
services,
&infra_config,
provider_config,
)?);
}
BatchOperationType::Clusters {
operation,
clusters,
} => {
tasks.extend(self.build_cluster_tasks(
operation,
clusters,
&infra_config,
provider_config,
)?);
}
BatchOperationType::CustomWorkflow { workflow } => {
return Ok(workflow);
}
}
Ok(WorkflowDefinition {
name: format!("batch_{}", operation_id),
description: Some(format!(
"Batch operation for infrastructure: {}",
infra_config.name
)),
tasks,
config: Some(config.workflow.clone()),
})
}
/// Build server operation tasks
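///
/// Each server yields one workflow task whose command line has the shape
/// `<provisioning_path> server <op> --infra <name> --settings <path> <server> [--check] [--wait]`
/// (the server argument is omitted for `list`).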
fn build_server_tasks(
&self,
operation: ServerOperation,
servers: Vec<String>,
infra_config: &InfrastructureConfig,
provider_config: &ProviderConfig,
) -> Result<Vec<WorkflowTaskDefinition>> {
let mut tasks = Vec::new();
let operation_cmd = match operation {
ServerOperation::Create => "server create",
ServerOperation::Delete => "server delete",
ServerOperation::Update => "server update",
ServerOperation::List => "server list",
ServerOperation::Start => "server start",
ServerOperation::Stop => "server stop",
ServerOperation::Restart => "server restart",
};
for (i, server) in servers.iter().enumerate() {
let task_name = format!(
"{}_{}_server_{}",
operation_cmd.replace(' ', "_"),
i,
server
);
let mut args = vec![
"--infra".to_string(),
infra_config.name.clone(),
"--settings".to_string(),
infra_config.settings_path.clone(),
];
// Add the server name for every operation except `list`, which takes no target
if !matches!(operation, ServerOperation::List) {
args.push(server.clone());
}
if self.config.defaults.check_mode {
args.push("--check".to_string());
}
if self.config.defaults.wait {
args.push("--wait".to_string());
}
let mut environment = self.config.defaults.environment.clone();
environment.insert(
"PROVISIONING_ENV".to_string(),
infra_config.environment.clone(),
);
let task = WorkflowTaskDefinition {
name: task_name,
command: format!(
"{} {}",
self.config.defaults.provisioning_path, operation_cmd
),
args,
dependencies: vec![],
provider: Some(infra_config.provider.clone()),
timeout_seconds: Some(provider_config.timeout_seconds),
max_retries: Some(provider_config.retry_config.max_attempts),
environment: Some(environment),
metadata: Some({
let mut metadata = HashMap::new();
metadata.insert("server".to_string(), server.clone());
metadata.insert("provider".to_string(), infra_config.provider.clone());
metadata.insert("operation".to_string(), format!("{:?}", operation));
metadata
}),
};
tasks.push(task);
}
Ok(tasks)
}
/// Build task service operation tasks
fn build_taskservice_tasks(
&self,
operation: TaskServiceOperation,
services: Vec<String>,
infra_config: &InfrastructureConfig,
provider_config: &ProviderConfig,
) -> Result<Vec<WorkflowTaskDefinition>> {
let mut tasks = Vec::new();
let operation_cmd = match operation {
TaskServiceOperation::Create => "taskserv create",
TaskServiceOperation::Delete => "taskserv delete",
TaskServiceOperation::Update => "taskserv update",
TaskServiceOperation::Generate => "taskserv generate",
TaskServiceOperation::CheckUpdates => "taskserv check-updates",
TaskServiceOperation::ListVersions => "taskserv versions",
};
// Build dependency order for taskservs
let ordered_services = self.order_taskservices(&services)?;
for (i, service) in ordered_services.iter().enumerate() {
let task_name = format!(
"{}_{}_taskserv_{}",
operation_cmd.replace(' ', "_"),
i,
service
);
let mut args = vec![
service.clone(),
"--infra".to_string(),
infra_config.name.clone(),
"--settings".to_string(),
infra_config.settings_path.clone(),
];
if self.config.defaults.check_mode {
args.push("--check".to_string());
}
if self.config.defaults.wait {
args.push("--wait".to_string());
}
let mut environment = self.config.defaults.environment.clone();
environment.insert(
"PROVISIONING_ENV".to_string(),
infra_config.environment.clone(),
);
// Chain each create task to its predecessor so services install sequentially in dependency order
let dependencies = if matches!(operation, TaskServiceOperation::Create) && i > 0 {
vec![format!(
"{}_{}_taskserv_{}",
operation_cmd.replace(' ', "_"),
i - 1,
ordered_services[i - 1]
)]
} else {
vec![]
};
let task = WorkflowTaskDefinition {
name: task_name,
command: format!(
"{} {}",
self.config.defaults.provisioning_path, operation_cmd
),
args,
dependencies,
provider: Some(infra_config.provider.clone()),
timeout_seconds: Some(provider_config.timeout_seconds),
max_retries: Some(provider_config.retry_config.max_attempts),
environment: Some(environment),
metadata: Some({
let mut metadata = HashMap::new();
metadata.insert("service".to_string(), service.clone());
metadata.insert("provider".to_string(), infra_config.provider.clone());
metadata.insert("operation".to_string(), format!("{:?}", operation));
metadata
}),
};
tasks.push(task);
}
Ok(tasks)
}
/// Build cluster operation tasks
fn build_cluster_tasks(
&self,
operation: ClusterOperation,
clusters: Vec<String>,
infra_config: &InfrastructureConfig,
provider_config: &ProviderConfig,
) -> Result<Vec<WorkflowTaskDefinition>> {
let mut tasks = Vec::new();
let operation_cmd = match operation {
ClusterOperation::Create => "cluster create",
ClusterOperation::Delete => "cluster delete",
ClusterOperation::Update => "cluster update",
ClusterOperation::Scale => "cluster scale",
ClusterOperation::Backup => "cluster backup",
ClusterOperation::Restore => "cluster restore",
};
for (i, cluster) in clusters.iter().enumerate() {
let task_name = format!(
"{}_{}_cluster_{}",
operation_cmd.replace(' ', "_"),
i,
cluster
);
let mut args = vec![
cluster.clone(),
"--infra".to_string(),
infra_config.name.clone(),
"--settings".to_string(),
infra_config.settings_path.clone(),
];
if self.config.defaults.check_mode {
args.push("--check".to_string());
}
if self.config.defaults.wait {
args.push("--wait".to_string());
}
let mut environment = self.config.defaults.environment.clone();
environment.insert(
"PROVISIONING_ENV".to_string(),
infra_config.environment.clone(),
);
let task = WorkflowTaskDefinition {
name: task_name,
command: format!(
"{} {}",
self.config.defaults.provisioning_path, operation_cmd
),
args,
dependencies: vec![],
provider: Some(infra_config.provider.clone()),
timeout_seconds: Some(provider_config.timeout_seconds),
max_retries: Some(provider_config.retry_config.max_attempts),
environment: Some(environment),
metadata: Some({
let mut metadata = HashMap::new();
metadata.insert("cluster".to_string(), cluster.clone());
metadata.insert("provider".to_string(), infra_config.provider.clone());
metadata.insert("operation".to_string(), format!("{:?}", operation));
metadata
}),
};
tasks.push(task);
}
Ok(tasks)
}
/// Order task services by dependencies
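///
/// For example, `["cilium", "kubernetes", "custom-svc"]` becomes
/// `["kubernetes", "cilium", "custom-svc"]`: known services follow the
/// predefined order below, and unknown services are appended afterwards in
/// their original order.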
fn order_taskservices(&self, services: &[String]) -> Result<Vec<String>> {
// Predefined task service dependency order
let dependency_order = [
"kubernetes", // Core Kubernetes first
"containerd", // Container runtime
"cri-o", // Alternative container runtime
"cilium", // Networking
"coredns", // DNS
"rook-ceph", // Storage
"mayastor", // Alternative storage
"external-nfs", // Network file system
"oci-registry", // Container registry
"haproxy", // Load balancing
"postgresql", // Database
"gitea", // Git server
"provisioning", // Provisioning tools
"nushell", // Shell tools
];
let mut ordered = Vec::new();
let service_set: HashSet<&str> = services.iter().map(String::as_str).collect();
// Add known services in the predefined dependency order
for service in dependency_order {
if service_set.contains(service) {
ordered.push(service.to_string());
}
}
// Add any remaining services
for service in services {
if !ordered.contains(service) {
ordered.push(service.clone());
}
}
Ok(ordered)
}
/// Collect task results from execution state
async fn collect_task_results(
&self,
execution_state: &WorkflowExecutionState,
) -> Result<HashMap<String, TaskResult>> {
let mut task_results = HashMap::new();
for (task_name, task_state) in &execution_state.task_states {
let task_result = TaskResult {
task_id: task_state.task_id.clone(),
status: task_state.status.clone(),
output: None, // Would need to get from storage
error: task_state.error.clone(),
duration_ms: task_state.duration_ms,
provider: None, // Would extract from metadata
};
task_results.insert(task_name.clone(), task_result);
}
Ok(task_results)
}
/// Generate operation summary
fn generate_operation_summary(
&self,
execution_state: &WorkflowExecutionState,
task_results: &HashMap<String, TaskResult>,
) -> OperationSummary {
let total_tasks = execution_state.statistics.total_tasks;
let successful_tasks = execution_state.statistics.completed_tasks;
let failed_tasks = execution_state.statistics.failed_tasks;
let skipped_tasks = 0; // skipped tasks are not yet tracked by the workflow engine
let errors: Vec<String> = task_results
.values()
.filter_map(|result| result.error.as_ref())
.cloned()
.collect();
OperationSummary {
total_tasks,
successful_tasks,
failed_tasks,
skipped_tasks,
total_duration_ms: execution_state.statistics.total_duration_ms,
errors,
}
}
/// Get batch operation status
pub async fn get_operation_status(&self, operation_id: &str) -> Option<WorkflowExecutionState> {
self.workflow_engine.get_workflow_state(operation_id).await
}
/// List all batch operations
pub async fn list_operations(&self) -> Vec<WorkflowExecutionState> {
self.workflow_engine.list_workflows().await
}
/// Cancel a batch operation
pub async fn cancel_operation(&self, operation_id: &str) -> Result<()> {
self.workflow_engine.cancel_workflow(operation_id).await
}
}
/// Factory for creating batch coordinators
pub struct BatchCoordinatorFactory;
impl BatchCoordinatorFactory {
/// Create batch coordinator from args
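///
/// A sketch of the expected call site (`storage` and `args` come from the caller):
///
/// ```ignore
/// let coordinator = BatchCoordinatorFactory::create_from_args(storage, args).await?;
/// ```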
pub async fn create_from_args(
storage: Arc<dyn TaskStorage>,
args: Args,
) -> Result<BatchCoordinator> {
// Load batch configuration (would typically come from config files)
let config = Self::load_batch_config(&args).await?;
Ok(BatchCoordinator::new(storage, config, args))
}
/// Load batch configuration from files
async fn load_batch_config(args: &Args) -> Result<BatchConfig> {
// For now, return default config
// In a real implementation, this would load from:
// - config.defaults.toml
// - config.user.toml
// - environment-specific configs
let mut config = BatchConfig::default();
// Override with command-line args
config.defaults.provisioning_path = args.provisioning_path.clone();
config.defaults.nu_path = args.nu_path.clone();
Ok(config)
}
}
/// Configuration-driven batch operation templates
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchTemplate {
pub name: String,
pub description: String,
pub operation_type: BatchOperationType,
pub default_config: BatchConfig,
pub required_parameters: Vec<String>,
pub optional_parameters: HashMap<String, String>,
}
/// Template registry for common batch operations
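///
/// Looking up the built-in template (a sketch):
///
/// ```ignore
/// let registry = BatchTemplateRegistry::new();
/// if let Some(template) = registry.get_template("full_deployment") {
///     println!("{}: {}", template.name, template.description);
/// }
/// ```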
#[derive(Default)]
pub struct BatchTemplateRegistry {
templates: HashMap<String, BatchTemplate>,
}
impl BatchTemplateRegistry {
/// Create new template registry
pub fn new() -> Self {
let mut registry = Self::default();
registry.register_default_templates();
registry
}
/// Register default templates
fn register_default_templates(&mut self) {
// Full infrastructure deployment template
let full_deployment = BatchTemplate {
name: "full_infrastructure_deployment".to_string(),
description: "Complete infrastructure deployment with servers, Kubernetes, and \
services"
.to_string(),
operation_type: BatchOperationType::CustomWorkflow {
workflow: WorkflowDefinition {
name: "full_deployment".to_string(),
description: Some("Complete infrastructure deployment".to_string()),
tasks: vec![], // Would be populated dynamically
config: None,
},
},
default_config: BatchConfig::default(),
required_parameters: vec!["infrastructure".to_string()],
optional_parameters: {
let mut params = HashMap::new();
params.insert("check_mode".to_string(), "false".to_string());
params.insert("wait".to_string(), "true".to_string());
params
},
};
self.templates
.insert("full_deployment".to_string(), full_deployment);
}
/// Get template by name
pub fn get_template(&self, name: &str) -> Option<&BatchTemplate> {
self.templates.get(name)
}
/// List available templates
pub fn list_templates(&self) -> Vec<&BatchTemplate> {
self.templates.values().collect()
}
}