use std::env;
use std::sync::Arc;
use anyhow::{Context, Result};
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use clap::Parser;
use platform_config::{load_deployment_mode, PlatformStartup};
// Use types from the library
use provisioning_orchestrator::{
audit::{AuditEvent, AuditFilter, AuditQuery, RetentionPolicy, SiemFormat},
batch::{BatchOperationRequest, BatchOperationResult},
compliance_routes,
config::OrchestratorConfig,
monitor::{MonitoringEvent, MonitoringEventType, SystemHealthStatus},
rollback::{Checkpoint, RollbackResult, RollbackStatistics},
state::{ProgressInfo, StateManagerStatistics, StateSnapshot, SystemMetrics},
test_environment::{
CreateTestEnvironmentRequest, RunTestRequest, TestEnvironment, TestEnvironmentResponse,
TestResult,
},
webhooks::{handle_webhook, WebhookState, WorkspaceRegistry},
workflow::WorkflowExecutionState,
AppState, Args, ClusterWorkflow, CreateServerWorkflow, SharedState, TaskStatus,
TaskservWorkflow, WorkflowTask,
};
use serde::{Deserialize, Serialize};
use tower_http::cors::CorsLayer;
use tracing::{error, info, warn};
use uuid::Uuid;
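/// Uniform JSON envelope returned by every REST handler. With the derived
/// serializer, success and error responses take the shape (payload values
/// illustrative):
///
/// ```text
/// { "success": true,  "data": <payload>, "error": null }
/// { "success": false, "data": null,      "error": "message" }
/// ```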
#[derive(Serialize)]
struct ApiResponse<T> {
success: bool,
data: Option<T>,
error: Option<String>,
}
impl<T> ApiResponse<T> {
fn success(data: T) -> Self {
Self {
success: true,
data: Some(data),
error: None,
}
}
fn error(message: String) -> Self {
Self {
success: false,
data: None,
error: Some(message),
}
}
}
// REST API Routes
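/// Enqueue a server-creation workflow. A minimal sketch of driving this
/// endpoint over HTTP, assuming default serde field names and an
/// illustrative port of 8080 (the real port comes from the NCL config;
/// all values shown are placeholders):
///
/// ```text
/// curl -X POST http://localhost:8080/workflows/servers/create \
///   -H 'Content-Type: application/json' \
///   -d '{"infra":"demo","settings":"settings.ncl","servers":[],"check_mode":false,"wait":true}'
/// ```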
async fn create_server_workflow(
State(state): State<SharedState>,
Json(workflow): Json<CreateServerWorkflow>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
let task_id = Uuid::new_v4().to_string();
// PROPER ARCHITECTURE: the CLI renders the script, the orchestrator executes it.
// If script_compressed is provided, execute it; that is ALL the orchestrator does.
// If it is NOT provided, fall back to legacy command construction (deprecated).
let task = if let Some(ref script_compressed) = workflow.script_compressed {
// CLI has provided the COMPLETE SCRIPT ready to execute
// No command construction, no decision logic
// Just: decompress -> execute
// Store script in temp file for execution
let script_file = format!("/tmp/orchestrator_script_{}.tar.gz.b64", task_id);
// Fail fast if the payload cannot be persisted; otherwise the task would
// only fail later, at execution time, with a confusing error.
if let Err(e) = std::fs::write(&script_file, script_compressed) {
error!("Failed to write script payload for task {}: {}", task_id, e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
WorkflowTask {
id: task_id.clone(),
name: if workflow.servers.is_empty() {
"execute_servers_script_all".to_string()
} else {
format!("execute_servers_script_{}", workflow.servers.join("_"))
},
// Execute the decompressed script directly
command: "bash".to_string(),
args: vec![
"-c".to_string(),
// Decompress: base64 decode -> gunzip -> extract script.sh -> execute
// CRITICAL: Use '+x' to DISABLE debug mode and prevent credential exposure
// Even if script contains 'set -x', it won't execute with +x flag
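// For reference, a compatible payload could be produced upstream with a
// pipeline along the lines of `tar -czf - script.sh | base64` (a sketch;
// the CLI's actual packaging may differ).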
format!(
"base64 -d < {} | gunzip | tar -xOf - script.sh | bash +x",
script_file
),
],
dependencies: vec![],
status: TaskStatus::Pending,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
output: None,
error: None,
}
} else {
// LEGACY: Construct command from parameters (deprecated)
let mut args = vec![
"--infra".to_string(),
workflow.infra.clone(),
"--settings".to_string(),
workflow.settings.clone(),
];
for server in &workflow.servers {
args.push(server.clone());
}
if workflow.check_mode {
args.push("--check".to_string());
}
if workflow.wait {
args.push("--wait".to_string());
}
WorkflowTask {
id: task_id.clone(),
name: if workflow.servers.is_empty() {
"create_servers_all".to_string()
} else {
format!("create_servers_{}", workflow.servers.join("_"))
},
command: format!("{} servers create", state.args.provisioning_path),
args,
dependencies: vec![],
status: TaskStatus::Pending,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
output: None,
error: None,
}
};
let server_summary = if workflow.servers.is_empty() {
"all servers".to_string()
} else {
format!("{} server(s)", workflow.servers.len())
};
match state.task_storage.enqueue(task, 5).await {
Ok(()) => {
info!(
"Enqueued server creation workflow ({}): {} | infra: {}",
server_summary, task_id, workflow.infra
);
Ok(Json(ApiResponse::success(task_id)))
}
Err(e) => {
error!(
"Failed to enqueue server creation task ({}): {}",
server_summary, e
);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn create_taskserv_workflow(
State(state): State<SharedState>,
Json(workflow): Json<TaskservWorkflow>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
let task_id = Uuid::new_v4().to_string();
let task = WorkflowTask {
id: task_id.clone(),
name: format!("{}_taskserv", workflow.operation),
command: format!(
"{} taskserv {}",
state.args.provisioning_path, workflow.operation
),
args: vec![
workflow.taskserv.clone(),
"--infra".to_string(),
workflow.infra.clone(),
"--settings".to_string(),
workflow.settings.clone(),
if workflow.check_mode {
"--check".to_string()
} else {
"".to_string()
},
if workflow.wait {
"--wait".to_string()
} else {
"".to_string()
},
]
.into_iter()
.filter(|s| !s.is_empty())
.collect(),
dependencies: vec![],
status: TaskStatus::Pending,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
output: None,
error: None,
};
match state.task_storage.enqueue(task, 6).await {
Ok(()) => {
info!(
"Enqueued taskserv {} workflow: {}",
workflow.operation, task_id
);
Ok(Json(ApiResponse::success(task_id)))
}
Err(e) => {
error!("Failed to enqueue task: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn create_cluster_workflow(
State(state): State<SharedState>,
Json(workflow): Json<ClusterWorkflow>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
let task_id = Uuid::new_v4().to_string();
let task = WorkflowTask {
id: task_id.clone(),
name: format!("{}_cluster", workflow.operation),
command: format!(
"{} cluster {}",
state.args.provisioning_path, workflow.operation
),
args: vec![
workflow.cluster_type.clone(),
"--infra".to_string(),
workflow.infra.clone(),
"--settings".to_string(),
workflow.settings.clone(),
if workflow.check_mode {
"--check".to_string()
} else {
"".to_string()
},
if workflow.wait {
"--wait".to_string()
} else {
"".to_string()
},
]
.into_iter()
.filter(|s| !s.is_empty())
.collect(),
dependencies: vec![],
status: TaskStatus::Pending,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
output: None,
error: None,
};
match state.task_storage.enqueue(task, 7).await {
Ok(()) => {
info!(
"Enqueued cluster {} workflow: {}",
workflow.operation, task_id
);
Ok(Json(ApiResponse::success(task_id)))
}
Err(e) => {
error!("Failed to enqueue task: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn get_task_status(
State(state): State<SharedState>,
Path(task_id): Path<String>,
) -> Result<Json<ApiResponse<WorkflowTask>>, StatusCode> {
match state.task_storage.get_task(&task_id).await {
Ok(Some(task)) => Ok(Json(ApiResponse::success(task))),
Ok(None) => Err(StatusCode::NOT_FOUND),
Err(e) => {
error!("Failed to get task {}: {}", task_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn list_tasks(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<WorkflowTask>>>, StatusCode> {
match state.task_storage.list_tasks(None).await {
Ok(task_list) => Ok(Json(ApiResponse::success(task_list))),
Err(e) => {
error!("Failed to list tasks: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
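/// Liveness probe; mounted at both /health and /api/v1/health.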
async fn health_check() -> Json<ApiResponse<String>> {
Json(ApiResponse::success("Orchestrator is healthy".to_string()))
}
// Batch operation routes
async fn execute_batch_operation(
State(state): State<SharedState>,
Json(request): Json<BatchOperationRequest>,
) -> Result<Json<ApiResponse<BatchOperationResult>>, StatusCode> {
match state
.batch_coordinator
.execute_batch_operation(request)
.await
{
Ok(result) => Ok(Json(ApiResponse::success(result))),
Err(e) => {
error!("Failed to execute batch operation: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn get_batch_operation_status(
State(state): State<SharedState>,
Path(operation_id): Path<String>,
) -> Result<Json<ApiResponse<WorkflowExecutionState>>, StatusCode> {
match state
.batch_coordinator
.get_operation_status(&operation_id)
.await
{
Some(state) => Ok(Json(ApiResponse::success(state))),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn list_batch_operations(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<WorkflowExecutionState>>>, StatusCode> {
let operations = state.batch_coordinator.list_operations().await;
Ok(Json(ApiResponse::success(operations)))
}
async fn cancel_batch_operation(
State(state): State<SharedState>,
Path(operation_id): Path<String>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.batch_coordinator
.cancel_operation(&operation_id)
.await
{
Ok(_) => Ok(Json(ApiResponse::success(format!(
"Operation {} cancelled",
operation_id
)))),
Err(e) => {
error!("Failed to cancel operation {}: {}", operation_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Rollback and recovery routes
#[derive(Deserialize)]
struct CreateCheckpointRequest {
name: String,
description: Option<String>,
}
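/// Exactly one of the two fields should be set: `checkpoint_id` requests a
/// full rollback to that checkpoint, `operation_ids` a partial rollback of
/// the named operations. `execute_rollback` rejects a request carrying
/// neither with 400 Bad Request.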
#[derive(Deserialize)]
struct RollbackRequest {
checkpoint_id: Option<String>,
operation_ids: Option<Vec<String>>,
}
async fn create_checkpoint(
State(state): State<SharedState>,
Json(request): Json<CreateCheckpointRequest>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.rollback_system
.checkpoint_manager
.create_checkpoint(request.name, request.description)
.await
{
Ok(checkpoint_id) => {
info!("Created checkpoint: {}", checkpoint_id);
Ok(Json(ApiResponse::success(checkpoint_id)))
}
Err(e) => {
error!("Failed to create checkpoint: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn list_checkpoints(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<Checkpoint>>>, StatusCode> {
let checkpoints = state
.rollback_system
.checkpoint_manager
.list_checkpoints()
.await;
Ok(Json(ApiResponse::success(checkpoints)))
}
async fn get_checkpoint(
State(state): State<SharedState>,
Path(checkpoint_id): Path<String>,
) -> Result<Json<ApiResponse<Checkpoint>>, StatusCode> {
match state
.rollback_system
.checkpoint_manager
.get_checkpoint(&checkpoint_id)
.await
{
Some(checkpoint) => Ok(Json(ApiResponse::success(checkpoint))),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn execute_rollback(
State(state): State<SharedState>,
Json(request): Json<RollbackRequest>,
) -> Result<Json<ApiResponse<RollbackResult>>, StatusCode> {
let result = if let Some(checkpoint_id) = request.checkpoint_id {
// Rollback to specific checkpoint
match state
.rollback_system
.rollback_executor
.rollback_to_checkpoint(&checkpoint_id)
.await
{
Ok(result) => result,
Err(e) => {
error!(
"Failed to execute rollback to checkpoint {}: {}",
checkpoint_id, e
);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
} else if let Some(operation_ids) = request.operation_ids {
// Partial rollback of specific operations
match state
.rollback_system
.rollback_executor
.rollback_operations(operation_ids)
.await
{
Ok(result) => result,
Err(e) => {
error!("Failed to execute partial rollback: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
} else {
return Err(StatusCode::BAD_REQUEST);
};
if result.success {
info!(
"Rollback executed successfully: {} operations",
result.operations_executed
);
} else {
warn!(
"Rollback completed with errors: {}/{} operations failed",
result.operations_failed,
result.operations_executed + result.operations_failed
);
}
Ok(Json(ApiResponse::success(result)))
}
async fn get_rollback_statistics(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<RollbackStatistics>>, StatusCode> {
let stats = state.rollback_system.get_statistics().await;
Ok(Json(ApiResponse::success(stats)))
}
async fn restore_from_checkpoint(
State(state): State<SharedState>,
Path(checkpoint_id): Path<String>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.rollback_system
.state_restorer
.restore_from_checkpoint(&checkpoint_id)
.await
{
Ok(_) => {
info!(
"Successfully restored state from checkpoint: {}",
checkpoint_id
);
Ok(Json(ApiResponse::success(format!(
"State restored from checkpoint {}",
checkpoint_id
))))
}
Err(e) => {
error!("Failed to restore from checkpoint {}: {}", checkpoint_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// State management and monitoring routes
async fn get_workflow_progress(
State(state): State<SharedState>,
Path(workflow_id): Path<String>,
) -> Result<Json<ApiResponse<ProgressInfo>>, StatusCode> {
match state
.progress_tracker
.get_real_time_progress(&workflow_id)
.await
{
Some(progress) => Ok(Json(ApiResponse::success(progress))),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn get_workflow_snapshots(
State(state): State<SharedState>,
Path(workflow_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<StateSnapshot>>>, StatusCode> {
let snapshots = state
.state_manager
.get_workflow_snapshots(&workflow_id)
.await;
Ok(Json(ApiResponse::success(snapshots)))
}
async fn get_system_metrics(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<SystemMetrics>>, StatusCode> {
let metrics = state.state_manager.get_system_metrics().await;
Ok(Json(ApiResponse::success(metrics)))
}
async fn get_system_health(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<SystemHealthStatus>>, StatusCode> {
let health_status = state
.monitoring_system
.health_monitor()
.get_system_health()
.await;
Ok(Json(ApiResponse::success(health_status)))
}
async fn get_state_statistics(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<StateManagerStatistics>>, StatusCode> {
match state.state_manager.get_statistics().await {
Ok(stats) => Ok(Json(ApiResponse::success(stats))),
Err(e) => {
error!("Failed to get state statistics: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Test environment routes
async fn create_test_environment(
State(state): State<SharedState>,
Json(request): Json<CreateTestEnvironmentRequest>,
) -> Result<Json<ApiResponse<TestEnvironmentResponse>>, StatusCode> {
match state.test_orchestrator.create_environment(request).await {
Ok(response) => Ok(Json(ApiResponse::success(response))),
Err(e) => {
error!("Failed to create test environment: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn get_test_environment(
State(state): State<SharedState>,
Path(env_id): Path<String>,
) -> Result<Json<ApiResponse<TestEnvironment>>, StatusCode> {
match state.test_orchestrator.get_environment(&env_id).await {
Some(env) => Ok(Json(ApiResponse::success(env))),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn list_test_environments(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<TestEnvironment>>>, StatusCode> {
let environments = state.test_orchestrator.list_environments().await;
Ok(Json(ApiResponse::success(environments)))
}
async fn run_environment_tests(
State(state): State<SharedState>,
Path(env_id): Path<String>,
Json(request): Json<RunTestRequest>,
) -> Result<Json<ApiResponse<Vec<TestResult>>>, StatusCode> {
match state.test_orchestrator.run_tests(&env_id, request).await {
Ok(results) => Ok(Json(ApiResponse::success(results))),
Err(e) => {
error!("Failed to run tests: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn cleanup_test_environment(
State(state): State<SharedState>,
Path(env_id): Path<String>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.test_orchestrator
.cleanup_environment(&env_id, false)
.await
{
Ok(_) => Ok(Json(ApiResponse::success(format!(
"Environment {} cleaned up",
env_id
)))),
Err(e) => {
error!("Failed to cleanup environment: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn get_environment_logs(
State(state): State<SharedState>,
Path(env_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<String>>>, StatusCode> {
match state.test_orchestrator.get_environment_logs(&env_id).await {
Ok(logs) => Ok(Json(ApiResponse::success(logs))),
Err(e) => {
error!("Failed to get environment logs: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// DNS Management Routes
async fn list_dns_records(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::dns::DnsRecord>>>, StatusCode> {
match state.dns_manager.list_records().await {
Ok(records) => Ok(Json(ApiResponse::success(records))),
Err(e) => {
error!("Failed to list DNS records: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Extension Management Routes
async fn list_loaded_extensions(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::extensions::Extension>>>, StatusCode> {
let extensions = state.extension_manager.list_loaded_extensions().await;
Ok(Json(ApiResponse::success(extensions)))
}
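/// Example body; `extension_type` must be "provider", "taskserv" or
/// "cluster", and the `name` value here is illustrative:
/// `{ "extension_type": "taskserv", "name": "kubernetes" }`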
#[derive(Deserialize)]
struct ReloadExtensionRequest {
extension_type: String,
name: String,
}
async fn reload_extension(
State(state): State<SharedState>,
Json(request): Json<ReloadExtensionRequest>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
use provisioning_orchestrator::extensions::ExtensionType;
let ext_type = match request.extension_type.as_str() {
"provider" => ExtensionType::Provider,
"taskserv" => ExtensionType::Taskserv,
"cluster" => ExtensionType::Cluster,
_ => return Err(StatusCode::BAD_REQUEST),
};
match state
.extension_manager
.reload_extension(ext_type, request.name.clone())
.await
{
Ok(_) => Ok(Json(ApiResponse::success(format!(
"Extension {} reloaded",
request.name
)))),
Err(e) => {
error!("Failed to reload extension: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// OCI Registry Routes
#[derive(Deserialize)]
struct ListOciArtifactsRequest {
namespace: String,
}
async fn list_oci_artifacts(
State(state): State<SharedState>,
Json(request): Json<ListOciArtifactsRequest>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::oci::OciArtifact>>>, StatusCode> {
match state
.oci_manager
.list_oci_artifacts(&request.namespace)
.await
{
Ok(artifacts) => Ok(Json(ApiResponse::success(artifacts))),
Err(e) => {
error!("Failed to list OCI artifacts: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Service Orchestration Routes
async fn list_services(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::services::Service>>>, StatusCode> {
let services = state.service_orchestrator.list_services().await;
Ok(Json(ApiResponse::success(services)))
}
#[derive(Serialize)]
struct ServiceStatusResponse {
name: String,
status: provisioning_orchestrator::services::ServiceStatus,
}
async fn get_services_status(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<ServiceStatusResponse>>>, StatusCode> {
let services = state.service_orchestrator.list_services().await;
let mut statuses = Vec::new();
for service in services {
if let Ok(status) = state
.service_orchestrator
.get_service_status(&service.name)
.await
{
statuses.push(ServiceStatusResponse {
name: service.name,
status,
});
}
}
Ok(Json(ApiResponse::success(statuses)))
}
// ============================================================================
// Audit API Handlers
// ============================================================================
/// Query audit logs with filters
async fn query_audit_logs(
State(state): State<SharedState>,
Json(query): Json<AuditQuery>,
) -> Result<Json<ApiResponse<Vec<AuditEvent>>>, StatusCode> {
match state.audit_logger.query(query).await {
Ok(events) => Ok(Json(ApiResponse::success(events))),
Err(e) => {
error!("Failed to query audit logs: {}", e);
Ok(Json(ApiResponse::error(format!("Query failed: {}", e))))
}
}
}
/// Export audit logs to specific format
#[derive(Debug, Deserialize)]
struct ExportRequest {
format: SiemFormat,
filter: Option<AuditFilter>,
}
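/// Streams the raw export back to the caller: Content-Type is text/csv for
/// SiemFormat::Csv and application/json for every other format.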
async fn export_audit_logs(
State(state): State<SharedState>,
Json(request): Json<ExportRequest>,
) -> Result<axum::response::Response, StatusCode> {
match state
.audit_logger
.export(request.format, request.filter)
.await
{
Ok(data) => {
let content_type = match request.format {
SiemFormat::Json
| SiemFormat::JsonLines
| SiemFormat::SplunkCim
| SiemFormat::ElasticEcs => "application/json",
SiemFormat::Csv => "text/csv",
};
Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header("Content-Type", content_type)
.body(axum::body::Body::from(data))
.unwrap())
}
Err(e) => {
error!("Failed to export audit logs: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Get audit logger statistics
async fn get_audit_stats(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
let stats = state.audit_logger.stats().await;
Ok(Json(ApiResponse::success(
serde_json::to_value(stats).unwrap_or_default(),
)))
}
/// Get audit storage statistics
async fn get_audit_storage_stats(
State(_state): State<SharedState>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
// Note: This would require access to the storage backend
// For now, return a placeholder
Ok(Json(ApiResponse::success(serde_json::json!({
"total_events": 0,
"total_size_bytes": 0,
"oldest_event": null,
"newest_event": null,
"note": "Storage stats require direct storage access"
}))))
}
/// Apply retention policy to audit logs
async fn apply_audit_retention(
State(state): State<SharedState>,
Json(_policy): Json<RetentionPolicy>,
) -> Result<Json<ApiResponse<usize>>, StatusCode> {
// NOTE: the submitted policy is currently ignored (see the `_policy`
// binding); the logger applies its own configured retention. In production,
// retention should be managed through configuration.
match state.audit_logger.apply_retention().await {
Ok(deleted) => Ok(Json(ApiResponse::success(deleted))),
Err(e) => {
error!("Failed to apply retention policy: {}", e);
Ok(Json(ApiResponse::error(format!("Retention failed: {}", e))))
}
}
}
/// Search audit logs with text query
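/// Example body; `field` selects one filter ("user", "resource" or
/// "workspace"; anything else falls through to an unfiltered query) and the
/// values are illustrative: `{ "query": "alice", "field": "user" }`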
#[derive(Debug, Deserialize)]
struct SearchRequest {
query: String,
field: String,
}
async fn search_audit_logs(
State(state): State<SharedState>,
Json(request): Json<SearchRequest>,
) -> Result<Json<ApiResponse<Vec<AuditEvent>>>, StatusCode> {
// Build filter based on field
let filter = match request.field.as_str() {
"user" => AuditFilter {
user_id: Some(request.query.clone()),
..Default::default()
},
"resource" => AuditFilter {
resource: Some(request.query.clone()),
..Default::default()
},
"workspace" => AuditFilter {
workspace: Some(request.query.clone()),
..Default::default()
},
_ => {
// Search all fields - this is a simple implementation
// In production, you'd want full-text search
AuditFilter::default()
}
};
let query = AuditQuery {
filter,
limit: Some(100),
offset: None,
sort_by: Some("timestamp".to_string()),
sort_desc: true,
};
match state.audit_logger.query(query).await {
Ok(events) => Ok(Json(ApiResponse::success(events))),
Err(e) => {
error!("Failed to search audit logs: {}", e);
Ok(Json(ApiResponse::error(format!("Search failed: {}", e))))
}
}
}
// Task processor - runs tasks from the queue
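// Lifecycle per task: dequeue -> mark Running -> execute via Nushell ->
// mark Completed (with output) or Failed (with error, then attempt a requeue
// for retry), recording metrics and publishing monitoring events throughout.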
async fn process_tasks(state: SharedState) {
info!("Starting task processor");
let metrics_collector = state.monitoring_system.metrics_collector();
loop {
match state.task_storage.dequeue().await {
Ok(Some(mut task)) => {
// Track task dequeue
metrics_collector.increment_task_counter();
metrics_collector.record_storage_operation(true);
if let Err(e) = state
.task_storage
.update_task_status(&task.id, TaskStatus::Running)
.await
{
error!("Failed to update task status: {}", e);
metrics_collector.record_storage_operation(false);
continue;
}
#[cfg(feature = "nats")]
state
.publish_task_status(&task.id, "running", Some(0), None)
.await;
info!("Processing task: {} ({})", task.id, task.name);
let task_start = std::time::Instant::now();
let result = state
.execute_nushell_command(&task.command, &task.args)
.await;
let task_duration = task_start.elapsed();
match result {
Ok(output) => {
info!("Task {} completed successfully", task.id);
task.output = Some(output);
task.status = TaskStatus::Completed;
task.completed_at = Some(chrono::Utc::now());
#[cfg(feature = "nats")]
state
.publish_task_status(&task.id, "completed", Some(100), None)
.await;
// Record metrics
metrics_collector.record_task_completion(task_duration.as_millis() as u64);
// Publish monitoring event
let event = MonitoringEvent {
event_type: MonitoringEventType::TaskStatusChanged,
timestamp: chrono::Utc::now(),
data: serde_json::to_value(&task).unwrap_or_default(),
metadata: {
let mut meta = std::collections::HashMap::new();
meta.insert("task_id".to_string(), task.id.clone());
meta.insert("status".to_string(), "completed".to_string());
meta.insert(
"duration_ms".to_string(),
task_duration.as_millis().to_string(),
);
meta
},
};
if let Err(e) = state.monitoring_system.publish_event(event).await {
error!("Failed to publish monitoring event: {}", e);
}
if let Err(e) = state.task_storage.update_task(task).await {
error!("Failed to update task: {}", e);
metrics_collector.record_storage_operation(false);
} else {
metrics_collector.record_storage_operation(true);
}
}
Err(e) => {
error!("Task {} failed: {}", task.id, e);
task.error = Some(e.to_string());
task.status = TaskStatus::Failed;
task.completed_at = Some(chrono::Utc::now());
#[cfg(feature = "nats")]
state
.publish_task_status(&task.id, "failed", None, Some(&e.to_string()))
.await;
// Record metrics
metrics_collector.record_task_failure();
// Publish monitoring event
let event = MonitoringEvent {
event_type: MonitoringEventType::TaskStatusChanged,
timestamp: chrono::Utc::now(),
data: serde_json::to_value(&task).unwrap_or_default(),
metadata: {
let mut meta = std::collections::HashMap::new();
meta.insert("task_id".to_string(), task.id.clone());
meta.insert("status".to_string(), "failed".to_string());
meta.insert("error".to_string(), e.to_string());
meta
},
};
if let Err(e) = state.monitoring_system.publish_event(event).await {
error!("Failed to publish monitoring event: {}", e);
}
if let Err(e) = state.task_storage.update_task(task.clone()).await {
error!("Failed to update task: {}", e);
metrics_collector.record_storage_operation(false);
} else {
metrics_collector.record_storage_operation(true);
}
// Try to requeue for retry
if let Err(e) = state.task_storage.requeue_failed_task(&task.id).await {
error!("Failed to requeue task: {}", e);
metrics_collector.record_storage_operation(false);
}
}
}
}
Ok(None) => {
// No tasks in queue, sleep
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Error dequeuing task: {}", e);
metrics_collector.record_storage_operation(false);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
}
/// Solo mode helpers: spawn nats-server child process and wait for readiness
#[cfg(feature = "nats")]
mod solo_nats {
use anyhow::{Context, Result};
use tokio::net::TcpStream;
use tokio::process::Command;
use tokio::time::{timeout, Duration, Instant};
use tracing::info;
/// Spawn `nats-server` as a child process with JetStream enabled.
///
/// The returned `Child` holds the process alive; drop it to kill the
/// server. Uses `kill_on_drop(true)` so the process is cleaned up when
/// `Child` is dropped.
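///
/// The spawned child is equivalent to running
/// `nats-server -js -sd <data_dir>/nats -p 4222` by hand; readiness is
/// detected solely by the TCP port accepting connections.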
pub async fn spawn_nats_server(data_dir: &str) -> Result<tokio::process::Child> {
let nats_store_dir = format!("{}/nats", data_dir);
std::fs::create_dir_all(&nats_store_dir)
.context("Failed to create NATS storage directory")?;
let child = Command::new("nats-server")
.args(["-js", "-sd", &nats_store_dir, "-p", "4222"])
.kill_on_drop(true)
.spawn()
.context("Failed to spawn nats-server — ensure nats-server is in PATH")?;
wait_for_nats(4222).await?;
info!("✓ NATS server (solo mode) ready on port 4222");
Ok(child)
}
/// Attempt TCP connect to 127.0.0.1:{port} in a loop until ready or
/// timeout.
async fn wait_for_nats(port: u16) -> Result<()> {
let addr = format!("127.0.0.1:{}", port);
let deadline = Instant::now() + Duration::from_secs(10);
loop {
if Instant::now() > deadline {
return Err(anyhow::anyhow!(
"NATS server did not become ready within 10 seconds on port {}",
port
));
}
// timeout() returning Ok only means the connect attempt finished in time;
// the inner Result must also be Ok for the port to actually be accepting.
if matches!(
timeout(Duration::from_millis(200), TcpStream::connect(&addr)).await,
Ok(Ok(_))
) {
return Ok(());
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
// Parse CLI arguments FIRST (so --help works before any other processing)
let args = Args::parse();
// Initialize centralized observability (logging, metrics, health checks)
let _guard = observability::init_from_env("orchestrator", env!("CARGO_PKG_VERSION"))
.context("Failed to initialize observability")?;
// Initialize platform startup manager
let deployment = load_deployment_mode().context("Failed to load deployment-mode.ncl")?;
// Check if orchestrator is enabled
if !deployment.is_service_enabled("orchestrator")? {
warn!("⚠ Orchestrator is DISABLED in deployment-mode.ncl");
std::process::exit(1);
}
info!("✓ Orchestrator is ENABLED in deployment-mode.ncl");
// Validate dependencies
let startup = PlatformStartup::new(&deployment.config)
.context("Failed to initialize platform startup")?;
startup
.validate_dependencies("orchestrator")
.context("Failed to validate orchestrator dependencies")?;
// Setup Git repositories
let (_schemas_path, _configs_path) = startup
.setup_git_repos()
.context("Failed to setup Git repositories")?;
// Load orchestrator configuration from Nickel, then apply CLI overrides
let mut config =
OrchestratorConfig::load().context("Failed to load orchestrator configuration")?;
config.apply_cli_overrides(&args);
let port = config.orchestrator.server.port;
info!(
"🔧 Loaded orchestrator configuration from NCL, binding to port {}",
port
);
// Solo mode: spawn embedded NATS server before connecting as client
// The child is kept alive for the entire lifetime of main()
#[cfg(feature = "nats")]
let _solo_nats_child = if args.mode.as_deref() == Some("solo") {
info!("Solo mode: starting embedded NATS server");
Some(solo_nats::spawn_nats_server(&args.data_dir).await?)
} else {
None
};
let state = Arc::new(AppState::new(args).await?);
// Build webhook state with empty registry (workspaces registered via API or
// config)
let webhook_state = Arc::new(WebhookState {
registry: Arc::new(parking_lot::RwLock::new(WorkspaceRegistry::new())),
});
// Start audit collector (NATS → SurrealDB)
#[cfg(feature = "nats")]
{
use provisioning_orchestrator::audit::collector::run_audit_collector;
let nats = Arc::clone(&state.nats);
let db = Arc::clone(&state.db);
tokio::spawn(async move {
run_audit_collector(nats, db).await;
});
info!("✓ NATS audit collector started");
}
// Start task processor
let processor_state = state.clone();
tokio::spawn(async move {
process_tasks(processor_state).await;
});
let app = Router::new()
.route("/health", get(health_check))
.route("/api/v1/health", get(health_check))
.route("/tasks", get(list_tasks))
.route("/tasks/{id}", get(get_task_status))
.route("/workflows/servers/create", post(create_server_workflow))
.route("/workflows/taskserv/create", post(create_taskserv_workflow))
.route("/workflows/cluster/create", post(create_cluster_workflow))
// Batch operation routes
.route("/batch/execute", post(execute_batch_operation))
.route("/batch/operations", get(list_batch_operations))
.route("/batch/operations/{id}", get(get_batch_operation_status))
.route(
"/batch/operations/{id}/cancel",
post(cancel_batch_operation),
)
// State management routes
.route("/state/workflows/{id}/progress", get(get_workflow_progress))
.route(
"/state/workflows/{id}/snapshots",
get(get_workflow_snapshots),
)
.route("/state/system/metrics", get(get_system_metrics))
.route("/state/system/health", get(get_system_health))
.route("/state/statistics", get(get_state_statistics))
// Rollback and recovery routes
.route("/rollback/checkpoints", post(create_checkpoint))
.route("/rollback/checkpoints", get(list_checkpoints))
.route("/rollback/checkpoints/{id}", get(get_checkpoint))
.route("/rollback/execute", post(execute_rollback))
.route("/rollback/restore/{id}", post(restore_from_checkpoint))
.route("/rollback/statistics", get(get_rollback_statistics))
// Test environment routes
.route("/test/environments/create", post(create_test_environment))
.route("/test/environments", get(list_test_environments))
.route("/test/environments/{id}", get(get_test_environment))
.route("/test/environments/{id}/run", post(run_environment_tests))
.route(
"/test/environments/{id}",
axum::routing::delete(cleanup_test_environment),
)
.route("/test/environments/{id}/logs", get(get_environment_logs))
// DNS integration routes
.route("/api/v1/dns/records", get(list_dns_records))
// Extension loading routes
.route("/api/v1/extensions/loaded", get(list_loaded_extensions))
.route("/api/v1/extensions/reload", post(reload_extension))
// OCI registry routes
.route("/api/v1/oci/artifacts", post(list_oci_artifacts))
// Service orchestration routes
.route("/api/v1/services/list", get(list_services))
.route("/api/v1/services/status", get(get_services_status))
// Audit logging routes
.route("/api/v1/audit/query", post(query_audit_logs))
.route("/api/v1/audit/export", post(export_audit_logs))
.route("/api/v1/audit/stats", get(get_audit_stats))
.route("/api/v1/audit/storage-stats", get(get_audit_storage_stats))
.route("/api/v1/audit/apply-retention", post(apply_audit_retention))
.route("/api/v1/audit/search", post(search_audit_logs))
// Compliance routes
.nest(
"/api/v1/compliance",
compliance_routes(state.compliance_service.clone()),
)
// Merge monitoring routes (includes /metrics, /ws, /events)
.merge(state.monitoring_system.create_routes())
// Webhook handler (separate state — workspace registry)
.route(
"/api/v1/webhooks/:workspace_id",
post(handle_webhook).with_state(webhook_state),
)
.layer(CorsLayer::permissive())
.with_state(state);
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))
.await
.context("Failed to bind to address")?;
info!("Orchestrator listening on port {}", port);
axum::serve(listener, app).await.context("Server error")?;
Ok(())
}