296 lines
9.2 KiB
Rust
Raw Normal View History

//! Orchestrator API Integration Service
//!
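//! Handles communication with the provisioning-orchestrator service: submitting
//! deployment plans as workflows, querying workflow status and details, and
//! cancelling workflows.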
use anyhow::Context;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::error::{http, infrastructure, ControlCenterError, Result};
/// One task within a workflow sent to the orchestrator.
///
/// `OrchestratorService::submit_workflow` builds one of these for every task
/// of a deployment plan, copying each field across.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct OrchestratorTask {
    /// Task identifier.
    pub id: String,
    /// Kind of task, as a free-form string.
    pub task_type: String,
    /// Task name.
    pub name: String,
    /// Task description.
    pub description: String,
    /// Arbitrary JSON parameters attached to the task.
    pub parameters: serde_json::Value,
    /// Dependency list; the key is left out of the serialized output when
    /// this is `None`.
    // NOTE(review): presumably these are ids of tasks that must run first —
    // confirm against the orchestrator API.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub depends_on: Option<Vec<String>>,
}
/// Request body sent when submitting a workflow.
///
/// `OrchestratorService::submit_workflow` serializes this as the JSON payload
/// of its POST request.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct WorkflowSubmissionRequest {
    /// Workflow name.
    pub name: String,
    /// Workflow description.
    pub description: String,
    /// Organization the workflow is submitted for.
    pub organization: String,
    /// Tasks that make up the workflow.
    pub tasks: Vec<OrchestratorTask>,
}
/// Response body parsed after a successful workflow submission.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct WorkflowSubmissionResponse {
    /// Identifier of the submitted workflow.
    pub workflow_id: String,
    /// Workflow status, as a free-form string.
    pub status: String,
    /// Creation timestamp, kept as the string the orchestrator sent.
    pub created_at: String,
}
/// Response body parsed by `OrchestratorService::get_workflow_status`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct WorkflowStatus {
    /// Identifier of the workflow this status describes.
    pub workflow_id: String,
    /// Workflow status, as a free-form string (the tests use `"running"`).
    pub status: String,
    /// Progress as a percentage.
    // NOTE(review): presumably in the range 0-100; nothing here enforces it.
    pub progress_percentage: u32,
    /// Number of tasks completed so far.
    pub completed_tasks: u32,
    /// Total number of tasks in the workflow.
    pub total_tasks: u32,
    /// Start timestamp as a string, if present.
    pub started_at: Option<String>,
    /// Completion timestamp as a string, if present.
    pub completed_at: Option<String>,
    /// Error message, if present.
    pub error_message: Option<String>,
}
/// Client for the provisioning-orchestrator HTTP API.
///
/// Derives `Debug` and `Clone` so the service can be embedded in debuggable
/// application state and handed to multiple tasks by value. `reqwest::Client`
/// is documented as using an `Arc` internally, so a clone reuses the same
/// underlying client rather than building a new one.
#[derive(Debug, Clone)]
pub struct OrchestratorService {
    /// Root URL that every endpoint path is appended to.
    base_url: String,
    /// HTTP client used for all requests to the orchestrator.
    http_client: reqwest::Client,
}
impl OrchestratorService {
/// Create a new orchestrator service
pub fn new(base_url: Option<String>) -> Self {
let url = base_url.unwrap_or_else(|| "http://localhost:9090".to_string());
Self {
base_url: url,
http_client: reqwest::Client::new(),
}
}
/// Submit a deployment plan as a workflow to the orchestrator
pub async fn submit_workflow(
&self,
deployment_plan: &crate::services::iac_deployment::DeploymentPlan,
) -> Result<WorkflowSubmissionResponse> {
debug!("Submitting workflow for deployment: {}", deployment_plan.id);
// Convert deployment plan to orchestrator workflow format
let tasks = deployment_plan
.tasks
.iter()
.map(|t| OrchestratorTask {
id: t.id.clone(),
task_type: t.task_type.clone(),
name: t.name.clone(),
description: t.description.clone(),
parameters: t.parameters.clone(),
depends_on: if t.depends_on.is_empty() {
None
} else {
Some(t.depends_on.clone())
},
})
.collect();
let request = WorkflowSubmissionRequest {
name: deployment_plan.name.clone(),
description: deployment_plan.description.clone(),
organization: deployment_plan.organization.clone(),
tasks,
};
// Submit to orchestrator
let url = format!("{}/workflows/deployment/create", self.base_url);
debug!("Submitting to orchestrator at: {}", url);
let response = self
.http_client
.post(&url)
.json(&request)
.send()
.await
.context("Failed to connect to orchestrator API")?;
if !response.status().is_success() {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
warn!("Orchestrator API error: {}", error_text);
return Err(ControlCenterError::Infrastructure(
infrastructure::InfrastructureError::External(format!(
"Orchestrator submission failed: {}",
error_text
)),
));
}
let workflow_response: WorkflowSubmissionResponse = response
.json()
.await
.context("Failed to parse orchestrator response")?;
debug!(
"Workflow submitted successfully: {}",
workflow_response.workflow_id
);
Ok(workflow_response)
}
/// Get the status of a submitted workflow
pub async fn get_workflow_status(&self, workflow_id: &str) -> Result<WorkflowStatus> {
debug!("Fetching workflow status for: {}", workflow_id);
let url = format!("{}/workflows/{}/status", self.base_url, workflow_id);
let response = self
.http_client
.get(&url)
.send()
.await
.context("Failed to connect to orchestrator API")?;
if !response.status().is_success() {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
warn!("Orchestrator API error: {}", error_text);
return Err(ControlCenterError::Infrastructure(
infrastructure::InfrastructureError::External(format!(
"Failed to get workflow status: {}",
error_text
)),
));
}
let status: WorkflowStatus = response
.json()
.await
.context("Failed to parse workflow status")?;
Ok(status)
}
/// Cancel a submitted workflow
pub async fn cancel_workflow(&self, workflow_id: &str) -> Result<()> {
debug!("Cancelling workflow: {}", workflow_id);
let url = format!("{}/workflows/{}/cancel", self.base_url, workflow_id);
let response = self
.http_client
.post(&url)
.send()
.await
.context("Failed to connect to orchestrator API")?;
if !response.status().is_success() {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
warn!("Orchestrator API error: {}", error_text);
return Err(ControlCenterError::Infrastructure(
infrastructure::InfrastructureError::External(format!(
"Failed to cancel workflow: {}",
error_text
)),
));
}
debug!("Workflow cancelled successfully: {}", workflow_id);
Ok(())
}
/// Get detailed workflow information
pub async fn get_workflow_details(&self, workflow_id: &str) -> Result<serde_json::Value> {
debug!("Fetching workflow details for: {}", workflow_id);
let url = format!("{}/workflows/{}", self.base_url, workflow_id);
let response = self
.http_client
.get(&url)
.send()
.await
.context("Failed to connect to orchestrator API")?;
if !response.status().is_success() {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
warn!("Orchestrator API error: {}", error_text);
return Err(ControlCenterError::Infrastructure(
infrastructure::InfrastructureError::External(format!(
"Failed to get workflow details: {}",
error_text
)),
));
}
let details: serde_json::Value = response
.json()
.await
.context("Failed to parse workflow details")?;
Ok(details)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Without an explicit URL the service targets the local default.
    #[test]
    fn test_orchestrator_service_creation() {
        assert_eq!(
            OrchestratorService::new(None).base_url,
            "http://localhost:9090"
        );
    }

    /// An explicitly supplied URL is stored on the service.
    #[test]
    fn test_orchestrator_service_custom_url() {
        let custom_url = String::from("http://custom-orchestrator:8080");
        let service = OrchestratorService::new(Some(custom_url.clone()));
        assert_eq!(service.base_url, custom_url);
    }

    /// A request with no tasks serializes its name and an empty `tasks` array.
    #[test]
    fn test_workflow_submission_request_serialization() {
        let json = serde_json::to_string(&WorkflowSubmissionRequest {
            name: String::from("Test Deployment"),
            description: String::from("Test deployment"),
            organization: String::from("default"),
            tasks: Vec::new(),
        })
        .expect("Serialization failed");

        for needle in ["Test Deployment", "\"tasks\":[]"] {
            assert!(json.contains(needle));
        }
    }

    /// A status payload with null optional fields deserializes correctly.
    #[test]
    fn test_workflow_status_parsing() {
        let json = r#"{
"workflow_id": "wf-123",
"status": "running",
"progress_percentage": 50,
"completed_tasks": 2,
"total_tasks": 4,
"started_at": "2025-11-14T10:00:00Z",
"completed_at": null,
"error_message": null
}"#;

        let status = serde_json::from_str::<WorkflowStatus>(json).expect("Parsing failed");

        assert_eq!(status.workflow_id, "wf-123");
        assert_eq!(status.progress_percentage, 50);
        assert_eq!(status.completed_tasks, 2);
    }
}