1086 lines
35 KiB
Rust
Raw Normal View History

use std::sync::Arc;
2025-10-07 10:59:52 +01:00
use anyhow::{Context, Result};
use axum::{
extract::{Path, State},
2026-01-12 04:53:31 +00:00
http::StatusCode,
2025-10-07 10:59:52 +01:00
response::Json,
routing::{get, post},
Router,
};
use clap::Parser;
// Use types from the library
use provisioning_orchestrator::{
2026-01-12 04:53:31 +00:00
audit::{AuditEvent, AuditFilter, AuditQuery, RetentionPolicy, SiemFormat},
batch::{BatchOperationRequest, BatchOperationResult},
compliance_routes,
2026-01-12 04:53:31 +00:00
monitor::{MonitoringEvent, MonitoringEventType, SystemHealthStatus},
rollback::{Checkpoint, RollbackResult, RollbackStatistics},
state::{ProgressInfo, StateManagerStatistics, StateSnapshot, SystemMetrics},
test_environment::{
2026-01-12 04:53:31 +00:00
CreateTestEnvironmentRequest, RunTestRequest, TestEnvironment, TestEnvironmentResponse,
TestResult,
},
2026-01-12 04:53:31 +00:00
workflow::WorkflowExecutionState,
AppState, Args, ClusterWorkflow, CreateServerWorkflow, SharedState, TaskStatus,
TaskservWorkflow, WorkflowTask,
2025-10-07 10:59:52 +01:00
};
use serde::{Deserialize, Serialize};
2025-10-07 10:59:52 +01:00
use tower_http::cors::CorsLayer;
use tracing::{error, info, warn};
use uuid::Uuid;
/// Uniform JSON envelope returned by every REST handler in this service.
#[derive(Serialize)]
struct ApiResponse<T> {
    /// True when the request succeeded and `data` is populated.
    success: bool,
    /// Payload on success; `None` on failure.
    data: Option<T>,
    /// Human-readable error message on failure; `None` on success.
    error: Option<String>,
}

impl<T> ApiResponse<T> {
    /// Wrap a successful payload.
    fn success(data: T) -> Self {
        Self {
            success: true,
            data: Some(data),
            error: None,
        }
    }

    /// Wrap an error message.
    ///
    /// Generalized to accept anything convertible to `String` (`String`,
    /// `&str`, ...); existing `ApiResponse::error(String)` callers are
    /// unaffected.
    fn error(message: impl Into<String>) -> Self {
        Self {
            success: false,
            data: None,
            error: Some(message.into()),
        }
    }
}
// REST API Routes
/// POST /workflows/servers/create — enqueue a `servers create` task.
///
/// Returns the generated task id; the task itself is executed
/// asynchronously by the background task processor.
async fn create_server_workflow(
    State(state): State<SharedState>,
    Json(workflow): Json<CreateServerWorkflow>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
    let task_id = Uuid::new_v4().to_string();

    // Append optional flags only when requested — clearer than the old
    // push-empty-string-then-filter idiom, and produces the same argv.
    let mut args = vec![
        "--infra".to_string(),
        workflow.infra.clone(),
        "--settings".to_string(),
        workflow.settings.clone(),
    ];
    if workflow.check_mode {
        args.push("--check".to_string());
    }
    if workflow.wait {
        args.push("--wait".to_string());
    }

    let task = WorkflowTask {
        id: task_id.clone(),
        name: "create_servers".to_string(),
        // NOTE(review): the command embeds a subcommand after a space;
        // assumes `execute_nushell_command` interprets it — confirm.
        command: format!("{} servers create", state.args.provisioning_path),
        args,
        dependencies: vec![],
        status: TaskStatus::Pending,
        created_at: chrono::Utc::now(),
        started_at: None,
        completed_at: None,
        output: None,
        error: None,
    };

    // Enqueue at priority 5 (taskserv workflows use 6, cluster 7).
    match state.task_storage.enqueue(task, 5).await {
        Ok(()) => {
            info!("Enqueued server creation workflow: {}", task_id);
            Ok(Json(ApiResponse::success(task_id)))
        }
        Err(e) => {
            error!("Failed to enqueue task: {}", e);
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}
async fn create_taskserv_workflow(
State(state): State<SharedState>,
Json(workflow): Json<TaskservWorkflow>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
let task_id = Uuid::new_v4().to_string();
let task = WorkflowTask {
id: task_id.clone(),
name: format!("{}_taskserv", workflow.operation),
command: format!(
"{} taskserv {}",
state.args.provisioning_path, workflow.operation
),
2025-10-07 10:59:52 +01:00
args: vec![
workflow.taskserv.clone(),
"--infra".to_string(),
workflow.infra.clone(),
"--settings".to_string(),
workflow.settings.clone(),
if workflow.check_mode {
"--check".to_string()
} else {
"".to_string()
},
if workflow.wait {
"--wait".to_string()
} else {
"".to_string()
},
]
.into_iter()
.filter(|s| !s.is_empty())
.collect(),
2025-10-07 10:59:52 +01:00
dependencies: vec![],
status: TaskStatus::Pending,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
output: None,
error: None,
};
match state.task_storage.enqueue(task, 6).await {
Ok(()) => {
info!(
"Enqueued taskserv {} workflow: {}",
workflow.operation, task_id
);
2025-10-07 10:59:52 +01:00
Ok(Json(ApiResponse::success(task_id)))
}
Err(e) => {
error!("Failed to enqueue task: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn create_cluster_workflow(
State(state): State<SharedState>,
Json(workflow): Json<ClusterWorkflow>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
let task_id = Uuid::new_v4().to_string();
let task = WorkflowTask {
id: task_id.clone(),
name: format!("{}_cluster", workflow.operation),
command: format!(
"{} cluster {}",
state.args.provisioning_path, workflow.operation
),
2025-10-07 10:59:52 +01:00
args: vec![
workflow.cluster_type.clone(),
"--infra".to_string(),
workflow.infra.clone(),
"--settings".to_string(),
workflow.settings.clone(),
if workflow.check_mode {
"--check".to_string()
} else {
"".to_string()
},
if workflow.wait {
"--wait".to_string()
} else {
"".to_string()
},
]
.into_iter()
.filter(|s| !s.is_empty())
.collect(),
2025-10-07 10:59:52 +01:00
dependencies: vec![],
status: TaskStatus::Pending,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
output: None,
error: None,
};
match state.task_storage.enqueue(task, 7).await {
Ok(()) => {
info!(
"Enqueued cluster {} workflow: {}",
workflow.operation, task_id
);
2025-10-07 10:59:52 +01:00
Ok(Json(ApiResponse::success(task_id)))
}
Err(e) => {
error!("Failed to enqueue task: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn get_task_status(
State(state): State<SharedState>,
Path(task_id): Path<String>,
) -> Result<Json<ApiResponse<WorkflowTask>>, StatusCode> {
match state.task_storage.get_task(&task_id).await {
Ok(Some(task)) => Ok(Json(ApiResponse::success(task))),
Ok(None) => Err(StatusCode::NOT_FOUND),
Err(e) => {
error!("Failed to get task {}: {}", task_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// GET /tasks — list every known task (no status filter).
async fn list_tasks(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<WorkflowTask>>>, StatusCode> {
    state
        .task_storage
        .list_tasks(None)
        .await
        .map(|tasks| Json(ApiResponse::success(tasks)))
        .map_err(|e| {
            error!("Failed to list tasks: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })
}
/// GET /health — liveness probe; always succeeds.
async fn health_check() -> Json<ApiResponse<String>> {
    let message = String::from("Orchestrator is healthy");
    Json(ApiResponse::success(message))
}
// Batch operation routes
async fn execute_batch_operation(
State(state): State<SharedState>,
Json(request): Json<BatchOperationRequest>,
) -> Result<Json<ApiResponse<BatchOperationResult>>, StatusCode> {
match state
.batch_coordinator
.execute_batch_operation(request)
.await
{
2025-10-07 10:59:52 +01:00
Ok(result) => Ok(Json(ApiResponse::success(result))),
Err(e) => {
error!("Failed to execute batch operation: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn get_batch_operation_status(
State(state): State<SharedState>,
Path(operation_id): Path<String>,
) -> Result<Json<ApiResponse<WorkflowExecutionState>>, StatusCode> {
match state
.batch_coordinator
.get_operation_status(&operation_id)
.await
{
2025-10-07 10:59:52 +01:00
Some(state) => Ok(Json(ApiResponse::success(state))),
None => Err(StatusCode::NOT_FOUND),
}
}
/// GET /batch/operations — execution state of all batch operations.
async fn list_batch_operations(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<WorkflowExecutionState>>>, StatusCode> {
    let ops = state.batch_coordinator.list_operations().await;
    Ok(Json(ApiResponse::success(ops)))
}
async fn cancel_batch_operation(
State(state): State<SharedState>,
Path(operation_id): Path<String>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.batch_coordinator
.cancel_operation(&operation_id)
.await
{
Ok(_) => Ok(Json(ApiResponse::success(format!(
"Operation {} cancelled",
operation_id
)))),
2025-10-07 10:59:52 +01:00
Err(e) => {
error!("Failed to cancel operation {}: {}", operation_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Rollback and recovery routes
/// Request body for POST /rollback/checkpoints.
#[derive(Deserialize)]
struct CreateCheckpointRequest {
    /// Human-readable checkpoint name.
    name: String,
    /// Optional free-text description.
    description: Option<String>,
}
/// Request body for POST /rollback/execute.
///
/// Exactly one of the fields should be provided. If both are set,
/// `checkpoint_id` takes precedence (see `execute_rollback`); if
/// neither is set the request is rejected with 400.
#[derive(Deserialize)]
struct RollbackRequest {
    /// Roll the whole system back to this checkpoint.
    checkpoint_id: Option<String>,
    /// Or: partially roll back just these operations.
    operation_ids: Option<Vec<String>>,
}
async fn create_checkpoint(
State(state): State<SharedState>,
Json(request): Json<CreateCheckpointRequest>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.rollback_system
.checkpoint_manager
.create_checkpoint(request.name, request.description)
.await
{
2025-10-07 10:59:52 +01:00
Ok(checkpoint_id) => {
info!("Created checkpoint: {}", checkpoint_id);
Ok(Json(ApiResponse::success(checkpoint_id)))
}
Err(e) => {
error!("Failed to create checkpoint: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn list_checkpoints(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<Checkpoint>>>, StatusCode> {
let checkpoints = state
.rollback_system
.checkpoint_manager
.list_checkpoints()
.await;
2025-10-07 10:59:52 +01:00
Ok(Json(ApiResponse::success(checkpoints)))
}
async fn get_checkpoint(
State(state): State<SharedState>,
Path(checkpoint_id): Path<String>,
) -> Result<Json<ApiResponse<Checkpoint>>, StatusCode> {
match state
.rollback_system
.checkpoint_manager
.get_checkpoint(&checkpoint_id)
.await
{
2025-10-07 10:59:52 +01:00
Some(checkpoint) => Ok(Json(ApiResponse::success(checkpoint))),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn execute_rollback(
State(state): State<SharedState>,
Json(request): Json<RollbackRequest>,
) -> Result<Json<ApiResponse<RollbackResult>>, StatusCode> {
let result = if let Some(checkpoint_id) = request.checkpoint_id {
// Rollback to specific checkpoint
match state
.rollback_system
.rollback_executor
.rollback_to_checkpoint(&checkpoint_id)
.await
{
2025-10-07 10:59:52 +01:00
Ok(result) => result,
Err(e) => {
error!(
"Failed to execute rollback to checkpoint {}: {}",
checkpoint_id, e
);
2025-10-07 10:59:52 +01:00
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
} else if let Some(operation_ids) = request.operation_ids {
// Partial rollback of specific operations
match state
.rollback_system
.rollback_executor
.rollback_operations(operation_ids)
.await
{
2025-10-07 10:59:52 +01:00
Ok(result) => result,
Err(e) => {
error!("Failed to execute partial rollback: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
} else {
return Err(StatusCode::BAD_REQUEST);
};
if result.success {
info!(
"Rollback executed successfully: {} operations",
result.operations_executed
);
2025-10-07 10:59:52 +01:00
} else {
warn!(
"Rollback completed with errors: {}/{} operations failed",
result.operations_failed,
result.operations_executed + result.operations_failed
);
2025-10-07 10:59:52 +01:00
}
Ok(Json(ApiResponse::success(result)))
}
/// GET /rollback/statistics — aggregate rollback-system statistics.
async fn get_rollback_statistics(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<RollbackStatistics>>, StatusCode> {
    Ok(Json(ApiResponse::success(
        state.rollback_system.get_statistics().await,
    )))
}
async fn restore_from_checkpoint(
State(state): State<SharedState>,
Path(checkpoint_id): Path<String>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.rollback_system
.state_restorer
.restore_from_checkpoint(&checkpoint_id)
.await
{
2025-10-07 10:59:52 +01:00
Ok(_) => {
info!(
"Successfully restored state from checkpoint: {}",
checkpoint_id
);
Ok(Json(ApiResponse::success(format!(
"State restored from checkpoint {}",
checkpoint_id
))))
2025-10-07 10:59:52 +01:00
}
Err(e) => {
error!("Failed to restore from checkpoint {}: {}", checkpoint_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// State management and monitoring routes
async fn get_workflow_progress(
State(state): State<SharedState>,
Path(workflow_id): Path<String>,
) -> Result<Json<ApiResponse<ProgressInfo>>, StatusCode> {
match state
.progress_tracker
.get_real_time_progress(&workflow_id)
.await
{
2025-10-07 10:59:52 +01:00
Some(progress) => Ok(Json(ApiResponse::success(progress))),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn get_workflow_snapshots(
State(state): State<SharedState>,
Path(workflow_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<StateSnapshot>>>, StatusCode> {
let snapshots = state
.state_manager
.get_workflow_snapshots(&workflow_id)
.await;
2025-10-07 10:59:52 +01:00
Ok(Json(ApiResponse::success(snapshots)))
}
/// GET /state/system/metrics — current system-level metrics.
async fn get_system_metrics(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<SystemMetrics>>, StatusCode> {
    Ok(Json(ApiResponse::success(
        state.state_manager.get_system_metrics().await,
    )))
}
async fn get_system_health(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<SystemHealthStatus>>, StatusCode> {
let health_status = state
.monitoring_system
.health_monitor()
.get_system_health()
.await;
2025-10-07 10:59:52 +01:00
Ok(Json(ApiResponse::success(health_status)))
}
/// GET /state/statistics — state-manager statistics.
async fn get_state_statistics(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<StateManagerStatistics>>, StatusCode> {
    state
        .state_manager
        .get_statistics()
        .await
        .map(|stats| Json(ApiResponse::success(stats)))
        .map_err(|e| {
            error!("Failed to get state statistics: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })
}
// Test environment routes
/// POST /test/environments/create — provision a new test environment.
async fn create_test_environment(
    State(state): State<SharedState>,
    Json(request): Json<CreateTestEnvironmentRequest>,
) -> Result<Json<ApiResponse<TestEnvironmentResponse>>, StatusCode> {
    state
        .test_orchestrator
        .create_environment(request)
        .await
        .map(|resp| Json(ApiResponse::success(resp)))
        .map_err(|e| {
            error!("Failed to create test environment: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })
}
/// GET /test/environments/{id} — one environment; 404 when unknown.
async fn get_test_environment(
    State(state): State<SharedState>,
    Path(env_id): Path<String>,
) -> Result<Json<ApiResponse<TestEnvironment>>, StatusCode> {
    state
        .test_orchestrator
        .get_environment(&env_id)
        .await
        .map(|env| Json(ApiResponse::success(env)))
        .ok_or(StatusCode::NOT_FOUND)
}
/// GET /test/environments — all known test environments.
async fn list_test_environments(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<TestEnvironment>>>, StatusCode> {
    let envs = state.test_orchestrator.list_environments().await;
    Ok(Json(ApiResponse::success(envs)))
}
/// POST /test/environments/{id}/run — execute tests in an environment
/// and return the per-test results.
async fn run_environment_tests(
    State(state): State<SharedState>,
    Path(env_id): Path<String>,
    Json(request): Json<RunTestRequest>,
) -> Result<Json<ApiResponse<Vec<TestResult>>>, StatusCode> {
    state
        .test_orchestrator
        .run_tests(&env_id, request)
        .await
        .map(|results| Json(ApiResponse::success(results)))
        .map_err(|e| {
            error!("Failed to run tests: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })
}
async fn cleanup_test_environment(
State(state): State<SharedState>,
Path(env_id): Path<String>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
match state
.test_orchestrator
.cleanup_environment(&env_id, false)
.await
{
Ok(_) => Ok(Json(ApiResponse::success(format!(
"Environment {} cleaned up",
env_id
)))),
2025-10-07 10:59:52 +01:00
Err(e) => {
error!("Failed to cleanup environment: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// GET /test/environments/{id}/logs — log lines for an environment.
async fn get_environment_logs(
    State(state): State<SharedState>,
    Path(env_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<String>>>, StatusCode> {
    state
        .test_orchestrator
        .get_environment_logs(&env_id)
        .await
        .map(|logs| Json(ApiResponse::success(logs)))
        .map_err(|e| {
            error!("Failed to get environment logs: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })
}
// DNS Management Routes
/// GET /api/v1/dns/records — all DNS records known to the DNS manager.
async fn list_dns_records(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::dns::DnsRecord>>>, StatusCode> {
    state
        .dns_manager
        .list_records()
        .await
        .map(|records| Json(ApiResponse::success(records)))
        .map_err(|e| {
            error!("Failed to list DNS records: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })
}
// Extension Management Routes
/// GET /api/v1/extensions/loaded — currently loaded extensions.
async fn list_loaded_extensions(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::extensions::Extension>>>, StatusCode> {
    let loaded = state.extension_manager.list_loaded_extensions().await;
    Ok(Json(ApiResponse::success(loaded)))
}
/// Request body for POST /api/v1/extensions/reload.
#[derive(Deserialize)]
struct ReloadExtensionRequest {
    /// One of "provider", "taskserv", "cluster" (see `reload_extension`).
    extension_type: String,
    /// Name of the extension to reload.
    name: String,
}
async fn reload_extension(
State(state): State<SharedState>,
Json(request): Json<ReloadExtensionRequest>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
use provisioning_orchestrator::extensions::ExtensionType;
let ext_type = match request.extension_type.as_str() {
"provider" => ExtensionType::Provider,
"taskserv" => ExtensionType::Taskserv,
"cluster" => ExtensionType::Cluster,
_ => return Err(StatusCode::BAD_REQUEST),
};
match state
.extension_manager
.reload_extension(ext_type, request.name.clone())
.await
{
Ok(_) => Ok(Json(ApiResponse::success(format!(
"Extension {} reloaded",
request.name
)))),
2025-10-07 10:59:52 +01:00
Err(e) => {
error!("Failed to reload extension: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// OCI Registry Routes
/// Request body for POST /api/v1/oci/artifacts.
#[derive(Deserialize)]
struct ListOciArtifactsRequest {
    /// OCI registry namespace to list artifacts from.
    namespace: String,
}
async fn list_oci_artifacts(
State(state): State<SharedState>,
Json(request): Json<ListOciArtifactsRequest>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::oci::OciArtifact>>>, StatusCode> {
match state
.oci_manager
.list_oci_artifacts(&request.namespace)
.await
{
2025-10-07 10:59:52 +01:00
Ok(artifacts) => Ok(Json(ApiResponse::success(artifacts))),
Err(e) => {
error!("Failed to list OCI artifacts: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Service Orchestration Routes
/// GET /api/v1/services/list — all services the orchestrator manages.
async fn list_services(
    State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<provisioning_orchestrator::services::Service>>>, StatusCode> {
    Ok(Json(ApiResponse::success(
        state.service_orchestrator.list_services().await,
    )))
}
/// One entry in the GET /api/v1/services/status response.
#[derive(Serialize)]
struct ServiceStatusResponse {
    /// Service name.
    name: String,
    /// Current status as reported by the service orchestrator.
    status: provisioning_orchestrator::services::ServiceStatus,
}
async fn get_services_status(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<Vec<ServiceStatusResponse>>>, StatusCode> {
let services = state.service_orchestrator.list_services().await;
let mut statuses = Vec::new();
for service in services {
if let Ok(status) = state
.service_orchestrator
.get_service_status(&service.name)
.await
{
2025-10-07 10:59:52 +01:00
statuses.push(ServiceStatusResponse {
name: service.name,
status,
});
}
}
Ok(Json(ApiResponse::success(statuses)))
}
// ============================================================================
// Audit API Handlers
// ============================================================================
/// Query audit logs with filters
async fn query_audit_logs(
State(state): State<SharedState>,
Json(query): Json<AuditQuery>,
) -> Result<Json<ApiResponse<Vec<AuditEvent>>>, StatusCode> {
match state.audit_logger.query(query).await {
Ok(events) => Ok(Json(ApiResponse::success(events))),
Err(e) => {
error!("Failed to query audit logs: {}", e);
Ok(Json(ApiResponse::error(format!("Query failed: {}", e))))
}
}
}
/// Request body for POST /api/v1/audit/export.
#[derive(Debug, Deserialize)]
struct ExportRequest {
    /// Target SIEM output format (JSON, JSON Lines, Splunk CIM,
    /// Elastic ECS, or CSV).
    format: SiemFormat,
    /// Optional filter restricting which events are exported.
    filter: Option<AuditFilter>,
}
async fn export_audit_logs(
State(state): State<SharedState>,
Json(request): Json<ExportRequest>,
) -> Result<axum::response::Response, StatusCode> {
match state
.audit_logger
.export(request.format, request.filter)
.await
{
Ok(data) => {
let content_type = match request.format {
SiemFormat::Json
| SiemFormat::JsonLines
| SiemFormat::SplunkCim
| SiemFormat::ElasticEcs => "application/json",
SiemFormat::Csv => "text/csv",
};
Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header("Content-Type", content_type)
.body(axum::body::Body::from(data))
.unwrap())
}
Err(e) => {
error!("Failed to export audit logs: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Get audit logger statistics
async fn get_audit_stats(
State(state): State<SharedState>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
let stats = state.audit_logger.stats().await;
Ok(Json(ApiResponse::success(
serde_json::to_value(stats).unwrap_or_default(),
)))
}
/// GET /api/v1/audit/storage-stats — placeholder storage statistics.
///
/// The storage backend is not reachable from this handler, so fixed
/// zero values are returned; the payload's "note" field says so.
async fn get_audit_storage_stats(
    State(_state): State<SharedState>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
    let placeholder = serde_json::json!({
        "total_events": 0,
        "total_size_bytes": 0,
        "oldest_event": null,
        "newest_event": null,
        "note": "Storage stats require direct storage access"
    });
    Ok(Json(ApiResponse::success(placeholder)))
}
/// POST /api/v1/audit/apply-retention — trigger retention pruning and
/// return the number of deleted events.
///
/// NOTE(review): the submitted `RetentionPolicy` body is currently
/// ignored — `apply_retention()` runs with the logger's own configured
/// policy. Either wire the request body through or drop it from the
/// API contract.
async fn apply_audit_retention(
    State(state): State<SharedState>,
    Json(_policy): Json<RetentionPolicy>,
) -> Result<Json<ApiResponse<usize>>, StatusCode> {
    // Temporarily update the audit logger's retention policy
    // In production, this should be done through configuration
    match state.audit_logger.apply_retention().await {
        Ok(deleted) => Ok(Json(ApiResponse::success(deleted))),
        Err(e) => {
            error!("Failed to apply retention policy: {}", e);
            // Errors are reported in-band (HTTP 200, success=false),
            // matching the other audit handlers in this file.
            Ok(Json(ApiResponse::error(format!("Retention failed: {}", e))))
        }
    }
}
/// Request body for POST /api/v1/audit/search.
#[derive(Debug, Deserialize)]
struct SearchRequest {
    /// Text to match against the chosen field.
    query: String,
    /// Which field to search: "user", "resource", or "workspace"
    /// (see `search_audit_logs` for fallback behavior).
    field: String,
}
/// POST /api/v1/audit/search — simple single-field audit-log search.
///
/// Supported `field` values: "user", "resource", "workspace".
/// NOTE(review): any other field value falls through to an empty
/// filter, which matches ALL events (capped at 100) instead of
/// returning an error — confirm this is intended.
async fn search_audit_logs(
    State(state): State<SharedState>,
    Json(request): Json<SearchRequest>,
) -> Result<Json<ApiResponse<Vec<AuditEvent>>>, StatusCode> {
    // Build filter based on field
    let filter = match request.field.as_str() {
        "user" => AuditFilter {
            user_id: Some(request.query.clone()),
            ..Default::default()
        },
        "resource" => AuditFilter {
            resource: Some(request.query.clone()),
            ..Default::default()
        },
        "workspace" => AuditFilter {
            workspace: Some(request.query.clone()),
            ..Default::default()
        },
        _ => {
            // Search all fields - this is a simple implementation
            // In production, you'd want full-text search
            AuditFilter::default()
        }
    };
    // Newest-first, capped at 100 results.
    let query = AuditQuery {
        filter,
        limit: Some(100),
        offset: None,
        sort_by: Some("timestamp".to_string()),
        sort_desc: true,
    };
    match state.audit_logger.query(query).await {
        Ok(events) => Ok(Json(ApiResponse::success(events))),
        Err(e) => {
            error!("Failed to search audit logs: {}", e);
            // Errors are reported in-band (HTTP 200, success=false).
            Ok(Json(ApiResponse::error(format!("Search failed: {}", e))))
        }
    }
}
2025-10-07 10:59:52 +01:00
/// Background task processor: pulls tasks off the queue and executes
/// them one at a time, forever.
///
/// For each dequeued task it (in order):
///   1. marks the task Running,
///   2. runs the task's command via `execute_nushell_command`,
///   3. records success/failure metrics,
///   4. publishes a `TaskStatusChanged` monitoring event,
///   5. persists the updated task, and
///   6. on failure, asks storage to requeue the task for retry.
///
/// When the queue is empty it sleeps 1s; after a dequeue error, 5s.
async fn process_tasks(state: SharedState) {
    info!("Starting task processor");
    let metrics_collector = state.monitoring_system.metrics_collector();
    loop {
        match state.task_storage.dequeue().await {
            Ok(Some(mut task)) => {
                // Track task dequeue
                metrics_collector.increment_task_counter();
                metrics_collector.record_storage_operation(true);
                // If we cannot mark the task Running, skip it entirely —
                // it remains in whatever state storage has for it.
                if let Err(e) = state
                    .task_storage
                    .update_task_status(&task.id, TaskStatus::Running)
                    .await
                {
                    error!("Failed to update task status: {}", e);
                    metrics_collector.record_storage_operation(false);
                    continue;
                }
                info!("Processing task: {} ({})", task.id, task.name);
                let task_start = std::time::Instant::now();
                let result = state
                    .execute_nushell_command(&task.command, &task.args)
                    .await;
                let task_duration = task_start.elapsed();
                match result {
                    Ok(output) => {
                        info!("Task {} completed successfully", task.id);
                        task.output = Some(output);
                        task.status = TaskStatus::Completed;
                        task.completed_at = Some(chrono::Utc::now());
                        // Record metrics
                        metrics_collector.record_task_completion(task_duration.as_millis() as u64);
                        // Publish monitoring event
                        let event = MonitoringEvent {
                            event_type: MonitoringEventType::TaskStatusChanged,
                            timestamp: chrono::Utc::now(),
                            data: serde_json::to_value(&task).unwrap_or_default(),
                            metadata: {
                                let mut meta = std::collections::HashMap::new();
                                meta.insert("task_id".to_string(), task.id.clone());
                                meta.insert("status".to_string(), "completed".to_string());
                                meta.insert(
                                    "duration_ms".to_string(),
                                    task_duration.as_millis().to_string(),
                                );
                                meta
                            },
                        };
                        if let Err(e) = state.monitoring_system.publish_event(event).await {
                            error!("Failed to publish monitoring event: {}", e);
                        }
                        if let Err(e) = state.task_storage.update_task(task).await {
                            error!("Failed to update task: {}", e);
                            metrics_collector.record_storage_operation(false);
                        } else {
                            metrics_collector.record_storage_operation(true);
                        }
                    }
                    Err(e) => {
                        error!("Task {} failed: {}", task.id, e);
                        task.error = Some(e.to_string());
                        task.status = TaskStatus::Failed;
                        task.completed_at = Some(chrono::Utc::now());
                        // Record metrics
                        metrics_collector.record_task_failure();
                        // Publish monitoring event
                        let event = MonitoringEvent {
                            event_type: MonitoringEventType::TaskStatusChanged,
                            timestamp: chrono::Utc::now(),
                            data: serde_json::to_value(&task).unwrap_or_default(),
                            metadata: {
                                let mut meta = std::collections::HashMap::new();
                                meta.insert("task_id".to_string(), task.id.clone());
                                meta.insert("status".to_string(), "failed".to_string());
                                meta.insert("error".to_string(), e.to_string());
                                meta
                            },
                        };
                        if let Err(e) = state.monitoring_system.publish_event(event).await {
                            error!("Failed to publish monitoring event: {}", e);
                        }
                        // Clone: the task id is needed again below for requeue.
                        if let Err(e) = state.task_storage.update_task(task.clone()).await {
                            error!("Failed to update task: {}", e);
                            metrics_collector.record_storage_operation(false);
                        } else {
                            metrics_collector.record_storage_operation(true);
                        }
                        // Try to requeue for retry
                        if let Err(e) = state.task_storage.requeue_failed_task(&task.id).await {
                            error!("Failed to requeue task: {}", e);
                            metrics_collector.record_storage_operation(false);
                        }
                    }
                }
            }
            Ok(None) => {
                // No tasks in queue, sleep
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
            Err(e) => {
                error!("Error dequeuing task: {}", e);
                metrics_collector.record_storage_operation(false);
                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
            }
        }
    }
}
/// Entry point: initialize logging, parse CLI args, build shared app
/// state, spawn the background task processor, and serve the REST API.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();
    let args = Args::parse();
    let port = args.port;
    info!("Starting provisioning orchestrator on port {}", port);
    let state = Arc::new(AppState::new(args).await?);
    // Start task processor — detached; runs for the process lifetime.
    let processor_state = state.clone();
    tokio::spawn(async move {
        process_tasks(processor_state).await;
    });
    // Route table. Path parameters use the `{id}` capture syntax.
    let app = Router::new()
        .route("/health", get(health_check))
        .route("/tasks", get(list_tasks))
        .route("/tasks/{id}", get(get_task_status))
        .route("/workflows/servers/create", post(create_server_workflow))
        .route("/workflows/taskserv/create", post(create_taskserv_workflow))
        .route("/workflows/cluster/create", post(create_cluster_workflow))
        // Batch operation routes
        .route("/batch/execute", post(execute_batch_operation))
        .route("/batch/operations", get(list_batch_operations))
        .route("/batch/operations/{id}", get(get_batch_operation_status))
        .route(
            "/batch/operations/{id}/cancel",
            post(cancel_batch_operation),
        )
        // State management routes
        .route("/state/workflows/{id}/progress", get(get_workflow_progress))
        .route(
            "/state/workflows/{id}/snapshots",
            get(get_workflow_snapshots),
        )
        .route("/state/system/metrics", get(get_system_metrics))
        .route("/state/system/health", get(get_system_health))
        .route("/state/statistics", get(get_state_statistics))
        // Rollback and recovery routes
        .route("/rollback/checkpoints", post(create_checkpoint))
        .route("/rollback/checkpoints", get(list_checkpoints))
        .route("/rollback/checkpoints/{id}", get(get_checkpoint))
        .route("/rollback/execute", post(execute_rollback))
        .route("/rollback/restore/{id}", post(restore_from_checkpoint))
        .route("/rollback/statistics", get(get_rollback_statistics))
        // Test environment routes
        .route("/test/environments/create", post(create_test_environment))
        .route("/test/environments", get(list_test_environments))
        .route("/test/environments/{id}", get(get_test_environment))
        .route("/test/environments/{id}/run", post(run_environment_tests))
        .route(
            "/test/environments/{id}",
            axum::routing::delete(cleanup_test_environment),
        )
        .route("/test/environments/{id}/logs", get(get_environment_logs))
        // DNS integration routes
        .route("/api/v1/dns/records", get(list_dns_records))
        // Extension loading routes
        .route("/api/v1/extensions/loaded", get(list_loaded_extensions))
        .route("/api/v1/extensions/reload", post(reload_extension))
        // OCI registry routes
        .route("/api/v1/oci/artifacts", post(list_oci_artifacts))
        // Service orchestration routes
        .route("/api/v1/services/list", get(list_services))
        .route("/api/v1/services/status", get(get_services_status))
        // Audit logging routes
        .route("/api/v1/audit/query", post(query_audit_logs))
        .route("/api/v1/audit/export", post(export_audit_logs))
        .route("/api/v1/audit/stats", get(get_audit_stats))
        .route("/api/v1/audit/storage-stats", get(get_audit_storage_stats))
        .route("/api/v1/audit/apply-retention", post(apply_audit_retention))
        .route("/api/v1/audit/search", post(search_audit_logs))
        // Compliance routes
        .nest(
            "/api/v1/compliance",
            compliance_routes(state.compliance_service.clone()),
        )
        // Merge monitoring routes (includes /metrics, /ws, /events)
        .merge(state.monitoring_system.create_routes())
        .layer(CorsLayer::permissive())
        .with_state(state);
    // NOTE(review): binds on all interfaces with permissive CORS and no
    // auth visible in this file — confirm that is intended for the
    // deployment environment.
    let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))
        .await
        .context("Failed to bind to address")?;
    info!("Orchestrator listening on port {}", port);
    axum::serve(listener, app).await.context("Server error")?;
    Ok(())
}