677 lines
20 KiB
Rust
Raw Normal View History

//! Configuration loading and management for the orchestrator
//!
//! This module handles loading configuration from TOML files with support for:
//! - Default configuration (config.defaults.toml)
//! - User overrides (config.user.toml)
//! - Environment-specific configuration
//! - CLI argument overrides
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use platform_config::ConfigLoader;
use serde::{Deserialize, Serialize};
/// Complete orchestrator configuration.
///
/// Root of the (de)serialized configuration document: everything lives under
/// a single top-level `orchestrator` key. `Default` is derived, so the default
/// value is built from `OrchestratorSettings::default()`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OrchestratorConfig {
/// All orchestrator settings (the `[orchestrator]` table and its sub-tables
/// when the document is TOML).
pub orchestrator: OrchestratorSettings,
}
/// Settings stored under the `orchestrator` key of `OrchestratorConfig`.
///
/// No field carries a `#[serde(default)]` attribute, so deserialization
/// requires every field (and every nested section) to be present in the
/// input. The hand-written `Default` impl is only used when a value is
/// constructed in code, e.g. as the fallback in
/// `ConfigLoader::load_from_hierarchy`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestratorSettings {
/// Enabled flag; `true` by default. How it is consumed is not visible in
/// this module.
pub enabled: bool,
/// Service name; defaults to `"orchestrator"`.
pub name: String,
/// Version string; defaults to `"1.0.0"`.
pub version: String,
/// `server` section.
pub server: ServerConfig,
/// `paths` section.
pub paths: PathsConfig,
/// `storage` section (includes the nested SurrealDB settings).
pub storage: StorageConfig,
/// `queue` section.
pub queue: QueueConfig,
/// `batch` section.
pub batch: BatchConfig,
/// `monitoring` section.
pub monitoring: MonitoringConfig,
/// `rollback` section.
pub rollback: RollbackConfig,
/// `state` section.
pub state: StateConfig,
/// `logging` section.
pub logging: LoggingConfig,
/// `dns` section.
pub dns: DnsConfig,
/// `oci` section.
pub oci: OciConfig,
/// `extensions` section.
pub extensions: ExtensionsConfig,
/// `services` section.
pub services: ServicesConfig,
}
/// Server settings (`[orchestrator.server]`).
///
/// `host`, `port` and `workers` can be overridden through the
/// `ORCHESTRATOR_SERVER_HOST`, `ORCHESTRATOR_SERVER_PORT` and
/// `ORCHESTRATOR_SERVER_WORKERS` environment variables. `port` is also
/// unconditionally replaced by `OrchestratorConfig::apply_cli_overrides`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
/// Host string; defaults to `127.0.0.1`. Presumably the bind address —
/// confirm at the call site.
pub host: String,
/// Port number; defaults to `9090`.
pub port: u16,
/// Worker count; defaults to `4`.
pub workers: usize,
/// Keep-alive value; defaults to `75`. The unit is not stated in this
/// module (presumably seconds — TODO confirm).
pub keep_alive: u64,
/// Connection limit; defaults to `1000`.
pub max_connections: usize,
}
/// Filesystem locations (`[orchestrator.paths]`).
///
/// All values are kept as plain strings. `data_dir`, `nu_path` and
/// `provisioning_path` are unconditionally replaced by
/// `OrchestratorConfig::apply_cli_overrides`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathsConfig {
/// Base directory; defaults to `/tmp/.orchestrator`.
pub base: String,
/// Data directory; defaults to `/tmp/.orchestrator/data`.
pub data_dir: String,
/// Log directory; defaults to `/tmp/.orchestrator/logs`.
pub logs_dir: String,
/// Queue directory; defaults to `/tmp/.orchestrator/data/queue`.
pub queue_dir: String,
/// Path of the Nushell (`nu`) binary; defaults to `/usr/local/bin/nu`.
pub nu_path: String,
/// Provisioning path; defaults to the relative value `provisioning`.
pub provisioning_path: String,
}
/// Storage settings (`[orchestrator.storage]`).
///
/// `storage_type` and `backend_path` can be overridden through the
/// `ORCHESTRATOR_STORAGE_TYPE` and `ORCHESTRATOR_STORAGE_PATH` environment
/// variables; `storage_type` is also unconditionally replaced by
/// `OrchestratorConfig::apply_cli_overrides`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
/// Backend selector, (de)serialized under the key `type`. Defaults to
/// `"surrealdb"`; the unit test in this file also uses `"filesystem"`.
/// The full set of accepted values is not defined in this module.
#[serde(rename = "type")]
pub storage_type: String,
/// Backend path; defaults to `/tmp/.orchestrator/storage`.
pub backend_path: String,
/// Nested `[orchestrator.storage.surrealdb]` table. It is a required
/// field, so it must be present whatever `storage_type` is.
pub surrealdb: SurrealDbConfig,
}
/// SurrealDB connection settings (`[orchestrator.storage.surrealdb]`).
///
/// Every field can be overridden through the matching
/// `ORCHESTRATOR_SURREALDB_*` environment variable (`URL`, `NAMESPACE`,
/// `DATABASE`, `USERNAME`, `PASSWORD`) and through the optional CLI arguments
/// handled in `OrchestratorConfig::apply_cli_overrides`.
///
/// `Debug` is implemented by hand (instead of derived) so that the password is
/// never written to logs or panic messages. Serialization is unaffected and
/// still emits the real value.
#[derive(Clone, Serialize, Deserialize)]
pub struct SurrealDbConfig {
    /// Connection URL; defaults to `ws://127.0.0.1:8000`.
    pub url: String,
    /// Namespace; defaults to `provisioning`.
    pub namespace: String,
    /// Database name; defaults to `orchestrator`.
    pub database: String,
    /// User name; defaults to `root`.
    pub username: String,
    /// Password; defaults to `root`. Redacted in `Debug` output.
    pub password: String,
}

impl std::fmt::Debug for SurrealDbConfig {
    /// Formats like the derived implementation, except that `password` is
    /// replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SurrealDbConfig")
            .field("url", &self.url)
            .field("namespace", &self.namespace)
            .field("database", &self.database)
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .finish()
    }
}
/// Task queue settings (`[orchestrator.queue]`).
///
/// `max_concurrent_tasks` and `retry_attempts` can be overridden through
/// `ORCHESTRATOR_QUEUE_MAX_CONCURRENT_TASKS` and
/// `ORCHESTRATOR_QUEUE_RETRY_ATTEMPTS`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueConfig {
/// Maximum number of concurrent tasks; defaults to `10`.
pub max_concurrent_tasks: usize,
/// Number of retry attempts; defaults to `3`.
pub retry_attempts: u32,
/// Delay between retries, in seconds; defaults to `5`.
pub retry_delay_seconds: u64,
/// Task timeout, in minutes; defaults to `30`.
pub task_timeout_minutes: u64,
}
/// Batch settings (`[orchestrator.batch]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
/// Parallelism limit; defaults to `5`.
pub parallel_limit: usize,
/// Operation timeout, in minutes; defaults to `60`.
pub operation_timeout_minutes: u64,
}
/// Monitoring settings (`[orchestrator.monitoring]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
/// Enabled flag; defaults to `true`.
pub enabled: bool,
/// Metrics interval, in seconds; defaults to `60`.
pub metrics_interval_seconds: u64,
/// Health-check interval, in seconds; defaults to `30`.
pub health_check_interval_seconds: u64,
/// Memory threshold in MB; defaults to `512`. How the threshold is
/// enforced is not visible in this module.
pub min_memory_mb: u64,
/// CPU threshold in percent; defaults to `80.0`.
pub max_cpu_percent: f64,
}
/// Rollback / checkpoint settings (`[orchestrator.rollback]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollbackConfig {
/// Checkpoint interval, in seconds; defaults to `300`.
pub checkpoint_interval_seconds: u64,
/// Whether automatic checkpoints are enabled; defaults to `true`.
pub auto_checkpoint_enabled: bool,
/// Strategy name; defaults to `"incremental"` (the unit test in this file
/// uses `"config-driven"`). Accepted values are not defined in this module.
pub strategy: String,
/// Maximum number of checkpoints; defaults to `10`.
pub max_checkpoints: usize,
}
/// State snapshot settings (`[orchestrator.state]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateConfig {
/// Snapshot interval, in seconds; defaults to `600`.
pub snapshot_interval_seconds: u64,
/// Maximum snapshots per workflow; defaults to `5`.
pub max_snapshots_per_workflow: usize,
/// Cleanup interval, in minutes; defaults to `1440`.
pub cleanup_interval_minutes: u64,
}
/// Logging settings (`[orchestrator.logging]`).
///
/// `level` and `format` can be overridden through `ORCHESTRATOR_LOG_LEVEL`
/// and `ORCHESTRATOR_LOG_FORMAT` (note: `LOG`, not `LOGGING`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingConfig {
/// Log level name; defaults to `"info"`. Not validated in this module.
pub level: String,
/// Log format name; defaults to `"json"`. Not validated in this module.
pub format: String,
/// Maximum log file size as a free-form string; defaults to `"100MB"`.
/// Parsing of the size suffix happens elsewhere, if at all.
pub max_file_size: String,
/// Maximum number of log files; defaults to `10`.
pub max_files: usize,
}
/// DNS settings (`[orchestrator.dns]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DnsConfig {
/// CoreDNS URL; defaults to `http://127.0.0.1:9053`.
pub coredns_url: String,
/// Auto-registration flag; defaults to `true`.
pub auto_register: bool,
/// TTL value; defaults to `300`. The unit is not stated in this module
/// (presumably seconds — TODO confirm).
pub ttl: u32,
}
/// OCI registry settings (`[orchestrator.oci]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OciConfig {
/// Registry URL; defaults to `http://127.0.0.1:5000`.
pub registry_url: String,
/// Registry namespace; defaults to `provisioning`.
pub namespace: String,
/// Cache directory; defaults to `/tmp/.orchestrator/oci-cache`.
pub cache_dir: String,
}
/// Extension settings (`[orchestrator.extensions]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtensionsConfig {
/// Auto-load flag; defaults to `true`.
pub auto_load: bool,
/// Cache directory; defaults to `/tmp/.orchestrator/extensions`.
pub cache_dir: String,
}
/// Service manager settings (`[orchestrator.services]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServicesConfig {
/// Whether the service manager is enabled; defaults to `true`.
pub manager_enabled: bool,
/// Whether dependencies are started automatically; defaults to `true`.
pub auto_start_dependencies: bool,
}
impl OrchestratorConfig {
    /// Reads the file at `path` and parses its contents as TOML.
    ///
    /// # Errors
    ///
    /// Fails when the file cannot be read or when its contents do not
    /// deserialize into an `OrchestratorConfig`. Both errors carry the
    /// offending path as context.
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = path.as_ref();
        let raw = std::fs::read_to_string(file)
            .with_context(|| format!("Failed to read config file: {:?}", file))?;
        toml::from_str(&raw).with_context(|| format!("Failed to parse config file: {:?}", file))
    }

    /// Resolves a configuration file, then layers environment overrides on top.
    ///
    /// The file is the first match of:
    /// 1. the path in the `ORCHESTRATOR_CONFIG` environment variable,
    /// 2. `provisioning/platform/config/orchestrator.{mode}.toml`, with `mode`
    ///    taken from `ORCHESTRATOR_MODE` (skipped when that file is missing),
    /// 3. `config.user.toml` (legacy user config),
    /// 4. `config.defaults.toml`.
    ///
    /// Afterwards the individual `ORCHESTRATOR_*` variables handled by
    /// `apply_env_overrides` replace the corresponding fields.
    pub fn load() -> Result<Self> {
        Self::load_from_hierarchy().and_then(|mut resolved| {
            Self::apply_env_overrides(&mut resolved).map(|()| resolved)
        })
    }

    /// Picks and parses the configuration file; no env field overrides here.
    fn load_from_hierarchy() -> Result<Self> {
        // 1. An explicit path always wins, even if the file turns out to be
        //    unreadable (the error is returned, nothing else is tried).
        if let Ok(explicit) = std::env::var("ORCHESTRATOR_CONFIG") {
            return Self::from_file(&explicit);
        }

        // 2. Mode-specific file, resolved relative to the working directory.
        if let Ok(mode) = std::env::var("ORCHESTRATOR_MODE") {
            let candidate = format!("provisioning/platform/config/orchestrator.{}.toml", mode);
            if Path::new(&candidate).exists() {
                return Self::from_file(&candidate);
            }
        }

        // 3. + 4. Legacy user file, then the shipped defaults. Both are looked
        //    up in the crate directory that was recorded at compile time.
        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        for file_name in ["config.user.toml", "config.defaults.toml"] {
            let candidate = manifest_dir.join(file_name);
            if candidate.exists() {
                return Self::from_file(candidate);
            }
        }

        anyhow::bail!(
            "No configuration file found. Set ORCHESTRATOR_CONFIG, ORCHESTRATOR_MODE, or ensure \
             config.defaults.toml exists"
        )
    }

    /// Replaces individual fields with values from `ORCHESTRATOR_*` variables.
    ///
    /// Supported variables:
    /// `ORCHESTRATOR_SERVER_{HOST,PORT,WORKERS}`,
    /// `ORCHESTRATOR_STORAGE_{TYPE,PATH}`,
    /// `ORCHESTRATOR_SURREALDB_{URL,NAMESPACE,DATABASE,USERNAME,PASSWORD}`,
    /// `ORCHESTRATOR_QUEUE_{MAX_CONCURRENT_TASKS,RETRY_ATTEMPTS}` and
    /// `ORCHESTRATOR_LOG_{LEVEL,FORMAT}`.
    ///
    /// Variables are processed in that order. A numeric variable that does
    /// not parse aborts with an error, leaving earlier overrides applied.
    fn apply_env_overrides(config: &mut Self) -> Result<()> {
        /// Value of `var`, or `None` when it is unset or not valid Unicode.
        fn text(var: &str) -> Option<String> {
            std::env::var(var).ok()
        }

        /// Parsed value of `var`; `invalid` becomes the error context when
        /// the variable is set but does not parse as `T`.
        fn parsed<T>(var: &str, invalid: &'static str) -> Result<Option<T>>
        where
            T: std::str::FromStr,
            T::Err: std::error::Error + Send + Sync + 'static,
        {
            text(var)
                .map(|raw| raw.parse::<T>().context(invalid))
                .transpose()
        }

        let settings = &mut config.orchestrator;

        // Server
        if let Some(host) = text("ORCHESTRATOR_SERVER_HOST") {
            settings.server.host = host;
        }
        if let Some(port) = parsed::<u16>(
            "ORCHESTRATOR_SERVER_PORT",
            "ORCHESTRATOR_SERVER_PORT must be a valid port number",
        )? {
            settings.server.port = port;
        }
        if let Some(workers) = parsed::<usize>(
            "ORCHESTRATOR_SERVER_WORKERS",
            "ORCHESTRATOR_SERVER_WORKERS must be a valid number",
        )? {
            settings.server.workers = workers;
        }

        // Storage
        if let Some(kind) = text("ORCHESTRATOR_STORAGE_TYPE") {
            settings.storage.storage_type = kind;
        }
        if let Some(location) = text("ORCHESTRATOR_STORAGE_PATH") {
            settings.storage.backend_path = location;
        }

        // SurrealDB
        let db = &mut settings.storage.surrealdb;
        if let Some(url) = text("ORCHESTRATOR_SURREALDB_URL") {
            db.url = url;
        }
        if let Some(namespace) = text("ORCHESTRATOR_SURREALDB_NAMESPACE") {
            db.namespace = namespace;
        }
        if let Some(database) = text("ORCHESTRATOR_SURREALDB_DATABASE") {
            db.database = database;
        }
        if let Some(username) = text("ORCHESTRATOR_SURREALDB_USERNAME") {
            db.username = username;
        }
        if let Some(password) = text("ORCHESTRATOR_SURREALDB_PASSWORD") {
            db.password = password;
        }

        // Queue
        if let Some(limit) = parsed::<usize>(
            "ORCHESTRATOR_QUEUE_MAX_CONCURRENT_TASKS",
            "ORCHESTRATOR_QUEUE_MAX_CONCURRENT_TASKS must be a valid number",
        )? {
            settings.queue.max_concurrent_tasks = limit;
        }
        if let Some(attempts) = parsed::<u32>(
            "ORCHESTRATOR_QUEUE_RETRY_ATTEMPTS",
            "ORCHESTRATOR_QUEUE_RETRY_ATTEMPTS must be a valid number",
        )? {
            settings.queue.retry_attempts = attempts;
        }

        // Logging
        if let Some(level) = text("ORCHESTRATOR_LOG_LEVEL") {
            settings.logging.level = level;
        }
        if let Some(format) = text("ORCHESTRATOR_LOG_FORMAT") {
            settings.logging.format = format;
        }

        Ok(())
    }

    /// Copies command-line values into the configuration.
    ///
    /// `port`, `data_dir`, `storage_type`, `nu_path` and `provisioning_path`
    /// are always taken from `args`. The SurrealDB connection fields are only
    /// replaced when the matching optional argument is present.
    pub fn apply_cli_overrides(&mut self, args: &crate::Args) {
        let settings = &mut self.orchestrator;

        // Unconditional overrides.
        settings.server.port = args.port;
        settings.storage.storage_type = args.storage_type.clone();

        let paths = &mut settings.paths;
        paths.data_dir = args.data_dir.clone();
        paths.nu_path = args.nu_path.clone();
        paths.provisioning_path = args.provisioning_path.clone();

        // Optional overrides: (CLI value, field it replaces when present).
        let db = &mut settings.storage.surrealdb;
        let optional = [
            (&args.surrealdb_url, &mut db.url),
            (&args.surrealdb_namespace, &mut db.namespace),
            (&args.surrealdb_database, &mut db.database),
            (&args.surrealdb_username, &mut db.username),
            (&args.surrealdb_password, &mut db.password),
        ];
        for (source, target) in optional {
            if let Some(value) = source {
                *target = value.clone();
            }
        }
    }

    /// Borrows the `server` section.
    pub fn server(&self) -> &ServerConfig {
        &self.orchestrator.server
    }

    /// Borrows the `paths` section.
    pub fn paths(&self) -> &PathsConfig {
        &self.orchestrator.paths
    }

    /// Borrows the `storage` section.
    pub fn storage(&self) -> &StorageConfig {
        &self.orchestrator.storage
    }

    /// Borrows the `queue` section.
    pub fn queue(&self) -> &QueueConfig {
        &self.orchestrator.queue
    }

    /// Borrows the `logging` section.
    pub fn logging(&self) -> &LoggingConfig {
        &self.orchestrator.logging
    }

    /// Borrows the `dns` section.
    pub fn dns(&self) -> &DnsConfig {
        &self.orchestrator.dns
    }

    /// Borrows the `oci` section.
    pub fn oci(&self) -> &OciConfig {
        &self.orchestrator.oci
    }

    /// Borrows the `extensions` section.
    pub fn extensions(&self) -> &ExtensionsConfig {
        &self.orchestrator.extensions
    }

    /// Borrows the `services` section.
    pub fn services(&self) -> &ServicesConfig {
        &self.orchestrator.services
    }
}
impl ConfigLoader for OrchestratorConfig {
    /// Service identifier handed to `platform_config` when resolving the
    /// configuration path.
    fn service_name() -> &'static str {
        "orchestrator"
    }

    /// Loads the file resolved by `platform_config::resolve_config_path`, or
    /// falls back to `Self::default()` when no path is resolved.
    fn load_from_hierarchy() -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>>
    {
        let service = Self::service_name();
        if let Some(path) = platform_config::resolve_config_path(service) {
            return Self::from_path(&path);
        }
        // Fallback to defaults
        Ok(Self::default())
    }

    /// Delegates to the inherent `OrchestratorConfig::apply_env_overrides`
    /// (inherent associated functions take precedence over trait ones for a
    /// `Self::` path, so this does not recurse).
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` whose message is the full `anyhow` error
    /// chain. The alternate (`{:#}`) format is used because the plain
    /// `to_string()` would keep only the outermost context and drop the
    /// underlying cause, such as the integer parse error.
    fn apply_env_overrides(
        &mut self,
    ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Self::apply_env_overrides(self).map_err(|e| {
            Box::new(std::io::Error::other(format!("{:#}", e)))
                as Box<dyn std::error::Error + Send + Sync>
        })
    }

    /// Loads `path` through `platform_config::format::load_config` and
    /// deserializes the resulting JSON value into `Self`.
    ///
    /// # Errors
    ///
    /// Propagates the loader's error boxed as-is. Deserialization failures
    /// are reported as an `InvalidData` I/O error that names the path.
    fn from_path<P: AsRef<Path>>(
        path: P,
    ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let path = path.as_ref();
        let json_value = platform_config::format::load_config(path)
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
        serde_json::from_value(json_value).map_err(|e| {
            let err_msg = format!(
                "Failed to deserialize orchestrator config from {:?}: {}",
                path, e
            );
            Box::new(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                err_msg,
            )) as Box<dyn std::error::Error + Send + Sync>
        })
    }
}
impl Default for OrchestratorSettings {
fn default() -> Self {
Self {
enabled: true,
name: "orchestrator".to_string(),
version: "1.0.0".to_string(),
server: ServerConfig::default(),
paths: PathsConfig::default(),
storage: StorageConfig::default(),
queue: QueueConfig::default(),
batch: BatchConfig::default(),
monitoring: MonitoringConfig::default(),
rollback: RollbackConfig::default(),
state: StateConfig::default(),
logging: LoggingConfig::default(),
dns: DnsConfig::default(),
oci: OciConfig::default(),
extensions: ExtensionsConfig::default(),
services: ServicesConfig::default(),
}
}
}
impl Default for ServerConfig {
    /// Loopback host on port 9090 with 4 workers.
    fn default() -> Self {
        let host = String::from("127.0.0.1");
        Self {
            host,
            port: 9090,
            workers: 4,
            keep_alive: 75,
            max_connections: 1000,
        }
    }
}
impl Default for PathsConfig {
    /// Everything under `/tmp/.orchestrator`, Nushell at `/usr/local/bin/nu`
    /// and a relative `provisioning` path.
    fn default() -> Self {
        Self {
            base: String::from("/tmp/.orchestrator"),
            data_dir: String::from("/tmp/.orchestrator/data"),
            logs_dir: String::from("/tmp/.orchestrator/logs"),
            queue_dir: String::from("/tmp/.orchestrator/data/queue"),
            nu_path: String::from("/usr/local/bin/nu"),
            provisioning_path: String::from("provisioning"),
        }
    }
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
storage_type: "surrealdb".to_string(),
backend_path: "/tmp/.orchestrator/storage".to_string(),
surrealdb: SurrealDbConfig::default(),
}
}
}
impl Default for SurrealDbConfig {
    /// Local WebSocket endpoint, `provisioning` / `orchestrator` as namespace
    /// and database, and `root` / `root` credentials.
    fn default() -> Self {
        Self {
            url: String::from("ws://127.0.0.1:8000"),
            namespace: String::from("provisioning"),
            database: String::from("orchestrator"),
            username: String::from("root"),
            password: String::from("root"),
        }
    }
}
impl Default for QueueConfig {
    /// 10 concurrent tasks, 3 retries spaced 5 seconds apart, 30-minute
    /// task timeout.
    fn default() -> Self {
        let (max_concurrent_tasks, retry_attempts) = (10, 3);
        let (retry_delay_seconds, task_timeout_minutes) = (5, 30);
        Self {
            max_concurrent_tasks,
            retry_attempts,
            retry_delay_seconds,
            task_timeout_minutes,
        }
    }
}
impl Default for BatchConfig {
    /// Parallel limit of 5 and a 60-minute operation timeout.
    fn default() -> Self {
        let parallel_limit = 5;
        let operation_timeout_minutes = 60;
        Self {
            parallel_limit,
            operation_timeout_minutes,
        }
    }
}
impl Default for MonitoringConfig {
    /// Monitoring on: metrics every 60 s, health checks every 30 s,
    /// thresholds of 512 MB memory and 80 % CPU.
    fn default() -> Self {
        let (metrics_interval_seconds, health_check_interval_seconds) = (60, 30);
        Self {
            enabled: true,
            metrics_interval_seconds,
            health_check_interval_seconds,
            min_memory_mb: 512,
            max_cpu_percent: 80.0,
        }
    }
}
impl Default for RollbackConfig {
    /// Automatic `incremental` checkpoints every 300 seconds, at most 10 kept.
    fn default() -> Self {
        let strategy = String::from("incremental");
        Self {
            checkpoint_interval_seconds: 300,
            auto_checkpoint_enabled: true,
            strategy,
            max_checkpoints: 10,
        }
    }
}
impl Default for StateConfig {
    /// Snapshot every 600 seconds, 5 snapshots per workflow, cleanup every
    /// 1440 minutes.
    fn default() -> Self {
        let snapshot_interval_seconds = 600;
        let max_snapshots_per_workflow = 5;
        let cleanup_interval_minutes = 1440;
        Self {
            snapshot_interval_seconds,
            max_snapshots_per_workflow,
            cleanup_interval_minutes,
        }
    }
}
impl Default for LoggingConfig {
    /// `info` level, `json` format, files capped at `100MB`, 10 files kept.
    fn default() -> Self {
        Self {
            level: String::from("info"),
            format: String::from("json"),
            max_file_size: String::from("100MB"),
            max_files: 10,
        }
    }
}
impl Default for DnsConfig {
    /// Local CoreDNS endpoint, auto-registration on, TTL of 300.
    fn default() -> Self {
        let coredns_url = String::from("http://127.0.0.1:9053");
        Self {
            coredns_url,
            auto_register: true,
            ttl: 300,
        }
    }
}
impl Default for OciConfig {
    /// Local registry, `provisioning` namespace, cache under
    /// `/tmp/.orchestrator`.
    fn default() -> Self {
        Self {
            registry_url: String::from("http://127.0.0.1:5000"),
            namespace: String::from("provisioning"),
            cache_dir: String::from("/tmp/.orchestrator/oci-cache"),
        }
    }
}
impl Default for ExtensionsConfig {
    /// Auto-loading on, cache under `/tmp/.orchestrator/extensions`.
    fn default() -> Self {
        let cache_dir = String::from("/tmp/.orchestrator/extensions");
        Self {
            auto_load: true,
            cache_dir,
        }
    }
}
impl Default for ServicesConfig {
    /// Service manager on, dependencies started automatically.
    fn default() -> Self {
        let (manager_enabled, auto_start_dependencies) = (true, true);
        Self {
            manager_enabled,
            auto_start_dependencies,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A complete configuration document: every section and every field is
    /// present, as deserialization requires.
    const FULL_CONFIG_TOML: &str = r#"
[orchestrator]
enabled = true
name = "orchestrator"
version = "1.0.0"
[orchestrator.server]
host = "127.0.0.1"
port = 9090
workers = 4
keep_alive = 75
max_connections = 1000
[orchestrator.paths]
base = "/tmp/.orchestrator"
data_dir = "/tmp/.orchestrator/data"
logs_dir = "/tmp/.orchestrator/logs"
queue_dir = "/tmp/.orchestrator/data/queue"
nu_path = "/usr/local/bin/nu"
provisioning_path = "/usr/local/bin/provisioning"
[orchestrator.storage]
type = "filesystem"
backend_path = "/tmp/.orchestrator/data/queue.rkvs"
[orchestrator.storage.surrealdb]
url = "ws://localhost:8000"
namespace = "orchestrator"
database = "tasks"
username = ""
password = ""
[orchestrator.queue]
max_concurrent_tasks = 5
retry_attempts = 3
retry_delay_seconds = 5
task_timeout_minutes = 60
[orchestrator.batch]
parallel_limit = 5
operation_timeout_minutes = 30
[orchestrator.monitoring]
enabled = true
metrics_interval_seconds = 60
health_check_interval_seconds = 30
min_memory_mb = 1024
max_cpu_percent = 80.0
[orchestrator.rollback]
checkpoint_interval_seconds = 300
auto_checkpoint_enabled = true
strategy = "config-driven"
max_checkpoints = 50
[orchestrator.state]
snapshot_interval_seconds = 120
max_snapshots_per_workflow = 10
cleanup_interval_minutes = 60
[orchestrator.logging]
level = "info"
format = "json"
max_file_size = "100MB"
max_files = 10
[orchestrator.dns]
coredns_url = "http://localhost:53"
auto_register = true
ttl = 300
[orchestrator.oci]
registry_url = "http://localhost:5000"
namespace = "provisioning-extensions"
cache_dir = "/tmp/oci-cache"
[orchestrator.extensions]
auto_load = true
cache_dir = "/tmp/extensions"
[orchestrator.services]
manager_enabled = true
auto_start_dependencies = true
"#;

    /// The full document parses, and a few representative fields (including
    /// the renamed `type` key) land in the right place.
    #[test]
    fn test_config_deserialization() {
        let parsed = toml::from_str::<OrchestratorConfig>(FULL_CONFIG_TOML).unwrap();
        let settings = &parsed.orchestrator;

        assert_eq!(settings.name, "orchestrator");
        assert_eq!(settings.server.port, 9090);
        assert_eq!(settings.storage.storage_type, "filesystem");
        assert_eq!(settings.queue.max_concurrent_tasks, 5);
    }
}