chore: add ValidationPipeline

Jesús Pérez 2026-01-14 21:12:49 +00:00
parent d3316d75ba
commit a601c1a093
Signed by: jesus
GPG Key ID: 9F243E355E0BC939
27 changed files with 2877 additions and 93 deletions

View File

@@ -25,7 +25,7 @@
// It does NOT catch malformed closing fences with language specifiers (e.g., ```plaintext)
// CommonMark spec requires closing fences to be ``` only (no language)
// Use separate validation script to check closing fences
- "MD040": false, // fenced-code-language (relaxed - flexible language specifiers)
+ "MD040": true, // fenced-code-language (relaxed - flexible language specifiers)
// Formatting - strict whitespace
"MD009": true, // no-hard-tabs

Cargo.lock generated
View File

@@ -8060,7 +8060,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8c27177b12a6399ffc08b98f76f7c9a1f4fe9fc967c784c5a071fa8d93cf7e1"
dependencies = [
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]

[[package]]
@@ -9249,10 +9249,12 @@ name = "vapora-shared"
version = "1.2.0"
dependencies = [
"chrono",
+ "regex",
"serde",
"serde_json",
"surrealdb",
"thiserror 2.0.17",
+ "tokio",
"toml",
"tracing",
"uuid",

File diff suppressed because one or more lines are too long

View File

@@ -461,7 +461,7 @@
<p class="tagline">Evaporate complexity</p>
<h1
data-en="Development Flows<br>When Teams and AI Orchestrate"
- data-es="El Desarrollo Fluye<br>Cuando Equipos e IA Orquestan"
+ data-es="El Desarrollo Fluye<br>Cuando los Equipos y la IA Orquestan"
>
Development Flows
</h1>
@@ -470,19 +470,19 @@
class="highlight"
data-en="Specialized agents"
data-es="Agentes especializados"
- >Specialized agents</span
+ >Specialized agents </span
>
<span
- data-en="orchestrate pipelines for design, implementation, testing, documentation and deployment. Agents learn from history and optimize costs automatically."
+ data-en=" orchestrate pipelines for design, implementation, testing, documentation and deployment. Agents learn from history and optimize costs automatically."
- data-es="orquestan pipelines para diseño, implementación, testing, documentación y deployment. Los agentes aprenden del historial y optimizan costos automáticamente."
+ data-es=" que orquestan pipelines para diseño, implementación, testing, documentación y deployment. Los agentes aprenden del historial y optimizan costos automáticamente."
>orchestrate pipelines for design, implementation, testing,
documentation and deployment. Agents learn from history and optimize
- costs automatically.</span
- >
+ costs automatically.
+ </span>
- <strong data-en="100% self-hosted." data-es="100% self-hosted."
+ <br><span><strong data-en="100% self-hosted." data-es="100% self-hosted."
>100% self-hosted.</strong
>
- <span data-en="" data-es=""></span>
+ </span>
</p>
</header>

View File

@@ -2,12 +2,14 @@
// Phase 2: Complete implementation with NATS integration
use std::collections::HashMap;
+ use std::path::PathBuf;
use std::sync::Arc;

use chrono::Utc;
use thiserror::Error;
use tracing::{debug, info, warn};
use uuid::Uuid;
+ use vapora_shared::validation::{SchemaRegistry, ValidationPipeline};

use crate::learning_profile::{ExecutionData, LearningProfile, TaskTypeExpertise};
use crate::messages::{AgentMessage, TaskAssignment};
@@ -30,6 +32,9 @@ pub enum CoordinatorError {
#[error("Invalid task state: {0}")]
InvalidTaskState(String),
+ #[error("Validation error: {0}")]
+ ValidationError(String),
}

use vapora_llm_router::BudgetManager;
@@ -46,6 +51,7 @@ pub struct AgentCoordinator {
swarm_coordinator: Option<Arc<SwarmCoordinator>>,
learning_profiles: Arc<std::sync::RwLock<HashMap<String, LearningProfile>>>,
budget_manager: Option<Arc<BudgetManager>>,
+ validation: Arc<ValidationPipeline>,
}

impl AgentCoordinator {
@@ -100,12 +106,25 @@ impl AgentCoordinator {
}
});

+ // Initialize validation pipeline
+ let schema_dir =
+     std::env::var("VAPORA_SCHEMA_DIR").unwrap_or_else(|_| "schemas".to_string());
+ let schema_path = PathBuf::from(&schema_dir);
+ let schema_registry = Arc::new(SchemaRegistry::new(schema_path));
+ let validation = Arc::new(ValidationPipeline::new(schema_registry));
+ info!(
+     "Initialized validation pipeline with schema dir: {}",
+     schema_dir
+ );
+
Ok(Self {
registry,
nats_client,
swarm_coordinator: Some(swarm_coordinator),
learning_profiles: Arc::new(std::sync::RwLock::new(HashMap::new())),
budget_manager: None,
+ validation,
})
}
@@ -118,12 +137,20 @@ impl AgentCoordinator {
swarm_coordinator.register_agent(profile.clone()).ok();
}

+ // Initialize validation pipeline
+ let schema_dir =
+     std::env::var("VAPORA_SCHEMA_DIR").unwrap_or_else(|_| "schemas".to_string());
+ let schema_path = PathBuf::from(&schema_dir);
+ let schema_registry = Arc::new(SchemaRegistry::new(schema_path));
+ let validation = Arc::new(ValidationPipeline::new(schema_registry));
+
Self {
registry,
nats_client: None,
swarm_coordinator: Some(swarm_coordinator),
learning_profiles: Arc::new(std::sync::RwLock::new(HashMap::new())),
budget_manager: None,
+ validation,
}
}
@@ -148,6 +175,42 @@ impl AgentCoordinator {
context: String,
priority: u32,
) -> Result<String, CoordinatorError> {
+ // Validate inputs against schema
+ let input = serde_json::json!({
+     "role": role,
+     "title": &title,
+     "description": &description,
+     "context": &context,
+     "priority": priority,
+ });
+
+ let validation_result = self
+     .validation
+     .validate("agents/task_assignment", &input)
+     .await
+     .map_err(|e| CoordinatorError::ValidationError(e.to_string()))?;
+
+ if !validation_result.valid {
+     let error_messages: Vec<String> = validation_result
+         .errors
+         .iter()
+         .map(|e| e.to_string())
+         .collect();
+     warn!(
+         "Task assignment validation failed for role {}: {}",
+         role,
+         error_messages.join(", ")
+     );
+     return Err(CoordinatorError::ValidationError(format!(
+         "Validation failed: {}",
+         error_messages.join(", ")
+     )));
+ }
+
+ info!("Task assignment validated successfully for role: {}", role);
+
// Get all available candidates for role
let all_agents = self.registry.get_agents_by_role(role);
let candidates: Vec<_> = all_agents
@@ -563,6 +626,9 @@ mod tests {
#[tokio::test]
async fn test_coordinator_creation() {
+ // Set schema directory for tests (relative to workspace root)
+ std::env::set_var("VAPORA_SCHEMA_DIR", "../../schemas");
+
let registry = Arc::new(AgentRegistry::new(5));
let coordinator = AgentCoordinator::with_registry(registry);
@@ -571,6 +637,9 @@ mod tests {
#[tokio::test]
async fn test_task_assignment() {
+ // Set schema directory for tests (relative to workspace root)
+ std::env::set_var("VAPORA_SCHEMA_DIR", "../../schemas");
+
let registry = Arc::new(AgentRegistry::new(5));

// Register an agent
@@ -595,11 +664,18 @@ mod tests {
)
.await;

+ match &task_id {
+     Ok(_) => {}
+     Err(e) => eprintln!("Task assignment failed: {}", e),
+ }
+
assert!(task_id.is_ok());
}

#[tokio::test]
async fn test_no_available_agent() {
+ // Set schema directory for tests (relative to workspace root)
+ std::env::set_var("VAPORA_SCHEMA_DIR", "../../schemas");
+
let registry = Arc::new(AgentRegistry::new(5));
let coordinator = AgentCoordinator::with_registry(registry);
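
Taken together, the coordinator now builds a ValidationPipeline at construction time and schema-checks every assign_task call before agent selection. A minimal standalone sketch of that flow, assuming the vapora_shared paths from this diff and a schemas/agents/task_assignment.ncl schema on disk (the input values are illustrative):

use std::path::PathBuf;
use std::sync::Arc;

use vapora_shared::validation::{SchemaRegistry, ValidationPipeline};

async fn validate_assignment_input() {
    // Same convention as the coordinator: VAPORA_SCHEMA_DIR, defaulting to "schemas"
    let schema_dir = std::env::var("VAPORA_SCHEMA_DIR").unwrap_or_else(|_| "schemas".to_string());
    let registry = Arc::new(SchemaRegistry::new(PathBuf::from(schema_dir)));
    let pipeline = ValidationPipeline::new(registry);

    let input = serde_json::json!({
        "role": "developer",
        "title": "Add retry logic to the NATS client",
        "priority": 50,
    });

    match pipeline.validate("agents/task_assignment", &input).await {
        Ok(result) if result.valid => { /* proceed to agent selection */ }
        Ok(result) => eprintln!("rejected: {}", result.error_messages().join(", ")),
        Err(e) => eprintln!("pipeline error: {}", e),
    }
}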

View File

@@ -54,6 +54,10 @@ pub fn error_response(error: VaporaError) -> Response {
format!("TOML error: {}", msg),
),
VaporaError::InternalError(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg),
+ VaporaError::ValidationError(msg) => (
+     StatusCode::BAD_REQUEST,
+     format!("Validation error: {}", msg),
+ ),
};

let body = Json(json!({

View File

@@ -181,12 +181,14 @@ impl ReasoningEngine {
.iter()
.filter(|r| !visited.contains(&r.id) && r.solution.is_some())
{
- if let Some(ref root) = other.root_cause {
-     if root.contains(chain.last().unwrap()) {
-         chain.push(other.solution.clone().unwrap());
-         visited.insert(other.id.clone());
-         break;
-     }
- }
+ let Some(ref root) = other.root_cause else {
+     continue;
+ };
+
+ if root.contains(chain.last().unwrap()) {
+     chain.push(other.solution.clone().unwrap());
+     visited.insert(other.id.clone());
+     break;
+ }
}
}

View File

@@ -116,17 +116,18 @@ impl TemporalKG {
let record = entry.value();
if record.timestamp > cutoff && record.task_type == task_type {
- let similarity = if let Some(ref query_emb) = query_embedding {
-     // Phase 5.1: Use vector embedding similarity
-     if let Ok(Some(record_emb)) = self.get_or_embed(&record.description).await {
-         Self::compute_vector_similarity(query_emb, &record_emb)
-     } else {
-         // Fallback to Jaccard if embedding fails
-         calculate_similarity(description, &record.description)
-     }
- } else {
-     // Fallback to Jaccard if no embedding provider
-     calculate_similarity(description, &record.description)
- };
+ let similarity = match (
+     &query_embedding,
+     self.get_or_embed(&record.description).await,
+ ) {
+     (Some(query_emb), Ok(Some(record_emb))) => {
+         // Phase 5.1: Use vector embedding similarity
+         Self::compute_vector_similarity(query_emb, &record_emb)
+     }
+     _ => {
+         // Fallback to Jaccard if embedding fails or no embedding provider
+         calculate_similarity(description, &record.description)
+     }
+ };

if similarity >= threshold {
@@ -160,15 +161,13 @@ impl TemporalKG {
for task in similar_tasks {
if task.success {
- let confidence = if let Some(ref query_emb) = query_embedding {
-     if let Ok(Some(task_emb)) = self.get_or_embed(&task.description).await {
-         Self::compute_vector_similarity(query_emb, &task_emb)
-     } else {
-         calculate_similarity(description, &task.description)
-     }
- } else {
-     calculate_similarity(description, &task.description)
- };
+ let confidence =
+     match (&query_embedding, self.get_or_embed(&task.description).await) {
+         (Some(query_emb), Ok(Some(task_emb))) => {
+             Self::compute_vector_similarity(query_emb, &task_emb)
+         }
+         _ => calculate_similarity(description, &task.description),
+     };

recommendations.push(Recommendation {
source_record_id: task.id.clone(),
@@ -284,28 +283,27 @@ impl TemporalKG {
for entry in self.records.iter() {
let record = entry.value();
if !record.success {
- if let Some(error) = &record.error {
-     let similarity = if let Some(ref pattern_emb) = pattern_embedding {
-         if let Ok(Some(error_emb)) = self.get_or_embed(error).await {
-             Self::compute_vector_similarity(pattern_emb, &error_emb)
-         } else {
-             calculate_similarity(cause_pattern, error)
-         }
-     } else {
-         calculate_similarity(cause_pattern, error)
-     };
-
-     if similarity >= threshold {
-         relationships.push(CausalRelationship {
-             cause: error.clone(),
-             effect: record
-                 .solution
-                 .clone()
-                 .unwrap_or_else(|| "unknown".to_string()),
-             confidence: similarity,
-             frequency: 1,
-         });
-     }
- }
+ let Some(error) = &record.error else {
+     continue;
+ };
+
+ let similarity = match (&pattern_embedding, self.get_or_embed(error).await) {
+     (Some(pattern_emb), Ok(Some(error_emb))) => {
+         Self::compute_vector_similarity(pattern_emb, &error_emb)
+     }
+     _ => calculate_similarity(cause_pattern, error),
+ };
+
+ if similarity >= threshold {
+     relationships.push(CausalRelationship {
+         cause: error.clone(),
+         effect: record
+             .solution
+             .clone()
+             .unwrap_or_else(|| "unknown".to_string()),
+         confidence: similarity,
+         frequency: 1,
+     });
+ }
}
}
}
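
The refactor repeated in all three spots above: the old code duplicated the Jaccard fallback across two or three else branches, while matching on a tuple of (embedding provider, per-record embed result) leaves exactly one vector arm and one shared fallback arm. Reduced to a self-contained toy (cosine and jaccard below are placeholders, not functions from this crate):

fn similarity(query_emb: Option<&[f32]>, record_emb: Result<Option<Vec<f32>>, ()>) -> f64 {
    fn cosine(_q: &[f32], _r: &[f32]) -> f64 { 1.0 } // placeholder
    fn jaccard() -> f64 { 0.5 }                      // placeholder
    match (query_emb, record_emb) {
        // Vector path only when a provider exists and the record embedded cleanly
        (Some(q), Ok(Some(r))) => cosine(q, &r),
        // No provider, embed error, or missing embedding: one shared fallback
        _ => jaccard(),
    }
}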

View File

@@ -158,32 +158,29 @@ impl LLMRouter {
}

// Check budget if provided
- if let Some(role) = agent_role {
-     if let Some(budget_mgr) = &self.budget_manager {
-         match budget_mgr.check_budget(role).await {
-             Ok(status) => {
-                 if status.exceeded {
-                     // Budget exceeded - use fallback provider
-                     info!(
-                         "Budget exceeded for role {}, using fallback provider: {}",
-                         role, status.fallback_provider
-                     );
-                     return Ok(status.fallback_provider);
-                 }
-
-                 if status.near_threshold {
-                     // Budget near threshold - prefer cost-efficient providers
-                     debug!(
-                         "Budget near threshold for role {}, selecting cost-efficient \
-                          provider",
-                         role
-                     );
-                     return self.select_cost_efficient_provider(task_type).await;
-                 }
-             }
-             Err(e) => {
-                 warn!("Budget check failed: {}, continuing with normal routing", e);
-             }
-         }
-     }
- }
+ if let (Some(role), Some(budget_mgr)) = (agent_role, &self.budget_manager) {
+     match budget_mgr.check_budget(role).await {
+         Ok(status) if status.exceeded => {
+             // Budget exceeded - use fallback provider
+             info!(
+                 "Budget exceeded for role {}, using fallback provider: {}",
+                 role, status.fallback_provider
+             );
+             return Ok(status.fallback_provider);
+         }
+         Ok(status) if status.near_threshold => {
+             // Budget near threshold - prefer cost-efficient providers
+             debug!(
+                 "Budget near threshold for role {}, selecting cost-efficient provider",
+                 role
+             );
+             return self.select_cost_efficient_provider(task_type).await;
+         }
+         Ok(_) => {
+             // Budget ok, continue with normal routing
+         }
+         Err(e) => {
+             warn!("Budget check failed: {}, continuing with normal routing", e);
+         }
+     }
+ }
@@ -327,13 +324,7 @@ impl LLMRouter {
);

// Record spend with budget manager if available
- if let Some(role) = agent_role {
-     if let Some(budget_mgr) = &self.budget_manager {
-         if let Err(e) = budget_mgr.record_spend(role, cost as u32).await {
-             warn!("Failed to record budget spend: {}", e);
-         }
-     }
- }
+ self.record_budget_spend(agent_role, cost as u32).await;
}

Ok(response)
@@ -411,6 +402,15 @@ impl LLMRouter {
latency_ms: provider.latency_ms(),
})
}

+ /// Record budget spend for agent role
+ async fn record_budget_spend(&self, agent_role: Option<&str>, cost: u32) {
+     if let (Some(role), Some(budget_mgr)) = (agent_role, &self.budget_manager) {
+         if let Err(e) = budget_mgr.record_spend(role, cost).await {
+             warn!("Failed to record budget spend: {}", e);
+         }
+     }
+ }
}

#[derive(Debug, Clone)]

View File

@@ -1,10 +1,13 @@
// vapora-mcp-server: Model Context Protocol server for VAPORA v1.0
// Phase 2: Standalone MCP server with HTTP endpoints
+ // Phase 3: Schema validation pipeline integration
use std::net::SocketAddr;
+ use std::path::PathBuf;
+ use std::sync::Arc;

use axum::{
- extract::{Json, Path},
+ extract::{Json, Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
@@ -15,6 +18,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::net::TcpListener;
use tracing::{info, warn};
+ use vapora_shared::validation::{SchemaRegistry, ValidationPipeline};

#[derive(Parser)]
#[command(name = "vapora-mcp-server")]
@@ -56,6 +60,15 @@ struct PromptDefinition {
description: String,
}

+ // ============================================================================
+ // App State
+ // ============================================================================
+
+ #[derive(Clone)]
+ struct AppState {
+     validation: Arc<ValidationPipeline>,
+ }
+
// ============================================================================
// Handlers
// ============================================================================
@@ -153,9 +166,59 @@ async fn list_tools() -> impl IntoResponse {
Json(json!({ "tools": tools }))
}

- async fn invoke_tool(Json(request): Json<InvokeToolRequest>) -> impl IntoResponse {
+ async fn invoke_tool(
+     State(state): State<AppState>,
+     Json(request): Json<InvokeToolRequest>,
+ ) -> impl IntoResponse {
info!("Invoking tool: {}", request.tool);

+ // Validate parameters against schema
+ let schema_name = format!("tools/{}", request.tool);
+ let validation_result = match state
+     .validation
+     .validate(&schema_name, &request.parameters)
+     .await
+ {
+     Ok(result) => result,
+     Err(e) => {
+         warn!("Schema validation error for {}: {}", request.tool, e);
+         return (
+             StatusCode::INTERNAL_SERVER_ERROR,
+             Json(json!({
+                 "success": false,
+                 "error": format!("Schema validation error: {}", e),
+             })),
+         );
+     }
+ };
+
+ // Check if validation passed
+ if !validation_result.valid {
+     let error_messages: Vec<String> = validation_result
+         .errors
+         .iter()
+         .map(|e| e.to_string())
+         .collect();
+     warn!(
+         "Validation failed for {}: {}",
+         request.tool,
+         error_messages.join(", ")
+     );
+     return (
+         StatusCode::BAD_REQUEST,
+         Json(json!({
+             "success": false,
+             "error": "Validation failed",
+             "validation_errors": error_messages,
+         })),
+     );
+ }
+
+ // Use validated data (with defaults applied)
+ let validated_params = validation_result.validated_data.unwrap();
+
let result = match request.tool.as_str() {
"kanban_create_task" => json!({
"success": true,
@@ -167,7 +230,7 @@ async fn invoke_tool(Json(request): Json<InvokeToolRequest>) -> impl IntoRespons
"message": "Task updated successfully"
}),
"get_project_summary" => json!({
- "project_id": request.parameters.get("project_id").and_then(|v| v.as_str()).unwrap_or("unknown"),
+ "project_id": validated_params.get("project_id").and_then(|v| v.as_str()).unwrap_or("unknown"),
"total_tasks": 42,
"completed": 15,
"in_progress": 12,
@@ -188,7 +251,7 @@ async fn invoke_tool(Json(request): Json<InvokeToolRequest>) -> impl IntoRespons
"success": true
}),
"get_agent_capabilities" => json!({
- "agent_id": request.parameters.get("agent_id").and_then(|v| v.as_str()).unwrap_or("unknown"),
+ "agent_id": validated_params.get("agent_id").and_then(|v| v.as_str()).unwrap_or("unknown"),
"role": "Developer",
"capabilities": ["coding", "debugging", "refactoring"],
"llm_provider": "claude",
@@ -301,6 +364,17 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();

+ // Initialize validation pipeline
+ let schema_dir = std::env::var("VAPORA_SCHEMA_DIR").unwrap_or_else(|_| "schemas".to_string());
+ let schema_path = PathBuf::from(&schema_dir);
+ info!("Loading schemas from: {}", schema_path.display());
+
+ let registry = Arc::new(SchemaRegistry::new(schema_path));
+ let validation = Arc::new(ValidationPipeline::new(registry));
+ let state = AppState { validation };
+
// Build router
let app = Router::new()
.route("/health", get(health))
@@ -308,7 +382,8 @@ async fn main() -> anyhow::Result<()> {
.route("/mcp/invoke", post(invoke_tool))
.route("/mcp/resources", get(list_resources))
.route("/mcp/resources/:uri", get(get_resource))
- .route("/mcp/prompts", get(list_prompts));
+ .route("/mcp/prompts", get(list_prompts))
+ .with_state(state);

// Bind address
let addr = format!("{}:{}", args.host, args.port).parse::<SocketAddr>()?;
@@ -366,7 +441,15 @@ mod tests {
#[tokio::test]
async fn test_invoke_tool() {
- let app = Router::new().route("/mcp/invoke", post(invoke_tool));
+ // Create test state
+ let schema_path = PathBuf::from("schemas");
+ let registry = Arc::new(SchemaRegistry::new(schema_path));
+ let validation = Arc::new(ValidationPipeline::new(registry));
+ let state = AppState { validation };
+
+ let app = Router::new()
+     .route("/mcp/invoke", post(invoke_tool))
+     .with_state(state);
let server = TestServer::new(app).unwrap();

let request = json!({
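
With state wired in, invalid parameters now stop at the handler with a 400 instead of reaching the tool. A companion failure-path test sketch, assuming the same axum_test TestServer helpers used above (it needs the schemas/ directory and the nickel CLI available, since a missing schema surfaces as a 500 from the validate call instead):

#[tokio::test]
async fn test_invoke_tool_rejects_invalid_params() {
    let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
    let validation = Arc::new(ValidationPipeline::new(registry));
    let app = Router::new()
        .route("/mcp/invoke", post(invoke_tool))
        .with_state(AppState { validation });
    let server = TestServer::new(app).unwrap();

    // Empty parameters: required fields like project_id should fail validation
    let response = server
        .post("/mcp/invoke")
        .json(&json!({ "tool": "kanban_create_task", "parameters": {} }))
        .await;
    response.assert_status(StatusCode::BAD_REQUEST);
}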

View File

@@ -29,6 +29,12 @@ surrealdb = { workspace = true, optional = true }
# Logging
tracing = { workspace = true }

+ # Validation
+ regex = { workspace = true }
+
+ # Async runtime (for validation pipeline)
+ tokio = { workspace = true, features = ["process", "io-util"] }
+
[features]
default = ["backend"]
backend = ["surrealdb"]

View File

@@ -22,6 +22,10 @@ pub enum VaporaError {
#[error("Invalid input: {0}")]
InvalidInput(String),

+ /// Schema validation error (Nickel contracts)
+ #[error("Validation error: {0}")]
+ ValidationError(String),
+
/// Authentication or authorization error
#[error("Unauthorized: {0}")]
Unauthorized(String),

View File

@@ -3,5 +3,6 @@
pub mod error;
pub mod models;
+ pub mod validation;

pub use error::{Result, VaporaError};

View File

@@ -0,0 +1,12 @@
// vapora-shared: Validation module - Schema validation pipeline with Nickel contracts
// Phase: Schema Validation Pipeline implementation
pub mod nickel_bridge;
pub mod pipeline;
pub mod schema_registry;
pub use nickel_bridge::NickelCli;
pub use pipeline::{ValidationError, ValidationPipeline, ValidationResult};
pub use schema_registry::{
CompiledSchema, Contract, FieldSchema, FieldType, SchemaRegistry, SchemaSource,
};

View File

@@ -0,0 +1,254 @@
// vapora-shared: Nickel CLI bridge - Execute Nickel commands for schema operations
// Provides interface to nickel query, typecheck, and export commands
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::process::Command;
use tracing::debug;
use crate::error::{Result, VaporaError};
/// Nickel CLI bridge for schema operations
#[derive(Debug, Clone)]
pub struct NickelCli {
/// Path to nickel executable
nickel_path: PathBuf,
/// Timeout for nickel commands (milliseconds)
timeout_ms: u64,
}
impl NickelCli {
/// Create new Nickel CLI bridge with default path
pub fn new() -> Self {
Self {
nickel_path: PathBuf::from("nickel"),
timeout_ms: 30_000, // 30 seconds
}
}
/// Create Nickel CLI bridge with custom path
pub fn with_path(nickel_path: PathBuf) -> Self {
Self {
nickel_path,
timeout_ms: 30_000,
}
}
/// Set timeout for Nickel commands
pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = timeout_ms;
self
}
/// Execute `nickel query` to extract schema metadata
///
/// Returns JSON representation of Nickel value
pub async fn query(&self, path: &Path, field: Option<&str>) -> Result<serde_json::Value> {
let mut args = vec!["query", "--format", "json"];
if let Some(f) = field {
args.push("--field");
args.push(f);
}
let path_str = path.to_str().ok_or_else(|| {
VaporaError::ValidationError(format!("Invalid path: {}", path.display()))
})?;
args.push(path_str);
debug!("Executing: nickel {}", args.join(" "));
let output = self.execute_with_timeout(&args).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(VaporaError::ValidationError(format!(
"Nickel query failed: {}",
stderr
)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
serde_json::from_str(&stdout).map_err(|e| {
VaporaError::ValidationError(format!("Failed to parse Nickel output: {}", e))
})
}
/// Execute `nickel typecheck` to validate schema
///
/// Returns Ok(()) if typecheck passes, Err with details if it fails
pub async fn typecheck(&self, path: &Path) -> Result<()> {
let path_str = path.to_str().ok_or_else(|| {
VaporaError::ValidationError(format!("Invalid path: {}", path.display()))
})?;
let args = vec!["typecheck", path_str];
debug!("Executing: nickel {}", args.join(" "));
let output = self.execute_with_timeout(&args).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(VaporaError::ValidationError(format!(
"Nickel typecheck failed: {}",
stderr
)));
}
Ok(())
}
/// Execute `nickel export` to generate JSON output
///
/// Returns exported JSON value
pub async fn export(&self, path: &Path) -> Result<serde_json::Value> {
let path_str = path.to_str().ok_or_else(|| {
VaporaError::ValidationError(format!("Invalid path: {}", path.display()))
})?;
let args = vec!["export", "--format", "json", path_str];
debug!("Executing: nickel {}", args.join(" "));
let output = self.execute_with_timeout(&args).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(VaporaError::ValidationError(format!(
"Nickel export failed: {}",
stderr
)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
serde_json::from_str(&stdout).map_err(|e| {
VaporaError::ValidationError(format!("Failed to parse Nickel export: {}", e))
})
}
/// Check if Nickel CLI is available
pub async fn is_available(&self) -> bool {
let result = Command::new(&self.nickel_path)
.arg("--version")
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.await;
match result {
Ok(status) => status.success(),
Err(_) => false,
}
}
/// Get Nickel CLI version
pub async fn version(&self) -> Result<String> {
let output = Command::new(&self.nickel_path)
.arg("--version")
.output()
.await
.map_err(|e| {
VaporaError::ValidationError(format!("Failed to get Nickel version: {}", e))
})?;
if !output.status.success() {
return Err(VaporaError::ValidationError(
"Failed to get Nickel version".to_string(),
));
}
let version = String::from_utf8_lossy(&output.stdout);
Ok(version.trim().to_string())
}
/// Execute Nickel command with timeout
async fn execute_with_timeout(&self, args: &[&str]) -> Result<std::process::Output> {
let child = Command::new(&self.nickel_path)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| {
VaporaError::ValidationError(format!("Failed to execute Nickel: {}", e))
})?;
// Wait with timeout
let timeout = tokio::time::Duration::from_millis(self.timeout_ms);
let result = tokio::time::timeout(timeout, child.wait_with_output()).await;
match result {
Ok(Ok(output)) => Ok(output),
Ok(Err(e)) => Err(VaporaError::ValidationError(format!(
"Nickel command failed: {}",
e
))),
Err(_) => {
// Timeout occurred - process is still running but we give up
Err(VaporaError::ValidationError(format!(
"Nickel command timed out after {}ms",
self.timeout_ms
)))
}
}
}
}
impl Default for NickelCli {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_nickel_cli_creation() {
let cli = NickelCli::new();
assert_eq!(cli.nickel_path, PathBuf::from("nickel"));
assert_eq!(cli.timeout_ms, 30_000);
}
#[tokio::test]
async fn test_nickel_cli_with_path() {
let custom_path = PathBuf::from("/usr/local/bin/nickel");
let cli = NickelCli::with_path(custom_path.clone());
assert_eq!(cli.nickel_path, custom_path);
}
#[tokio::test]
async fn test_nickel_cli_with_timeout() {
let cli = NickelCli::new().with_timeout(5_000);
assert_eq!(cli.timeout_ms, 5_000);
}
#[tokio::test]
#[ignore] // Requires Nickel CLI installed
async fn test_nickel_is_available() {
let cli = NickelCli::new();
let available = cli.is_available().await;
// This will pass if nickel is in PATH
if available {
println!("Nickel CLI is available");
} else {
println!("Nickel CLI is not available (install it to run this test)");
}
}
#[tokio::test]
#[ignore] // Requires Nickel CLI installed
async fn test_nickel_version() {
let cli = NickelCli::new();
if cli.is_available().await {
let version = cli.version().await.unwrap();
println!("Nickel version: {}", version);
assert!(!version.is_empty());
}
}
}
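
A short usage sketch for the bridge (the schema path is illustrative, and nickel must be on PATH, which is why the corresponding tests above are #[ignore]d):

use std::path::Path;
use vapora_shared::validation::NickelCli;
use vapora_shared::Result;

async fn check_schema() -> Result<()> {
    let cli = NickelCli::new().with_timeout(5_000);
    if !cli.is_available().await {
        eprintln!("nickel not installed; skipping");
        return Ok(());
    }
    println!("using nickel {}", cli.version().await?);

    let path = Path::new("schemas/tools/kanban_create_task.ncl");
    cli.typecheck(path).await?;             // Err(ValidationError) on type errors
    let exported = cli.export(path).await?; // evaluated schema as JSON
    println!("{exported}");
    Ok(())
}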

View File

@@ -0,0 +1,636 @@
// vapora-shared: Validation pipeline - Runtime validation against Nickel contracts
// Validates JSON inputs against compiled schemas with detailed error reporting
use std::sync::Arc;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::{debug, warn};
use super::schema_registry::{CompiledSchema, Contract, FieldType, SchemaRegistry};
use crate::error::Result;
/// Validation error with field context
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationError {
/// Field name that failed validation
pub field: String,
/// Human-readable error message
pub message: String,
/// Contract that was violated
pub contract: String,
/// Expected value/type
pub expected: Option<String>,
/// Actual value received
pub actual: Option<String>,
}
impl ValidationError {
/// Create a new validation error
pub fn new(
field: impl Into<String>,
message: impl Into<String>,
contract: impl Into<String>,
) -> Self {
Self {
field: field.into(),
message: message.into(),
contract: contract.into(),
expected: None,
actual: None,
}
}
/// Add expected value to error
pub fn with_expected(mut self, expected: impl Into<String>) -> Self {
self.expected = Some(expected.into());
self
}
/// Add actual value to error
pub fn with_actual(mut self, actual: impl Into<String>) -> Self {
self.actual = Some(actual.into());
self
}
}
impl std::fmt::Display for ValidationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Field '{}': {}", self.field, self.message)?;
if let Some(expected) = &self.expected {
write!(f, " (expected: {})", expected)?;
}
if let Some(actual) = &self.actual {
write!(f, " (got: {})", actual)?;
}
write!(f, " [contract: {}]", self.contract)
}
}
/// Result of validation pipeline
#[derive(Debug, Serialize, Deserialize)]
pub struct ValidationResult {
/// Is input valid?
pub valid: bool,
/// Validation errors (if any)
pub errors: Vec<ValidationError>,
/// Validated data with defaults applied
pub validated_data: Option<Value>,
/// Schema name that was used
pub schema_name: String,
}
impl ValidationResult {
/// Create successful validation result
pub fn ok(data: Value, schema_name: impl Into<String>) -> Self {
Self {
valid: true,
errors: vec![],
validated_data: Some(data),
schema_name: schema_name.into(),
}
}
/// Create failed validation result
pub fn err(errors: Vec<ValidationError>, schema_name: impl Into<String>) -> Self {
Self {
valid: false,
errors,
validated_data: None,
schema_name: schema_name.into(),
}
}
/// Get all error messages
pub fn error_messages(&self) -> Vec<String> {
self.errors.iter().map(|e| e.to_string()).collect()
}
/// Check if specific field has error
pub fn has_field_error(&self, field_name: &str) -> bool {
self.errors.iter().any(|e| e.field == field_name)
}
}
/// Main validation pipeline
pub struct ValidationPipeline {
/// Schema registry
registry: Arc<SchemaRegistry>,
/// Enable strict mode (fail on unknown fields)
strict_mode: bool,
}
impl ValidationPipeline {
/// Create new validation pipeline
pub fn new(registry: Arc<SchemaRegistry>) -> Self {
Self {
registry,
strict_mode: false,
}
}
/// Enable strict mode (reject unknown fields)
pub fn with_strict_mode(mut self, enabled: bool) -> Self {
self.strict_mode = enabled;
self
}
/// Validate input against a named schema
pub async fn validate(&self, schema_name: &str, input: &Value) -> Result<ValidationResult> {
let schema = self.registry.load_schema(schema_name).await?;
self.validate_against_schema(&schema, input)
}
/// Validate input against compiled schema
pub fn validate_against_schema(
&self,
schema: &CompiledSchema,
input: &Value,
) -> Result<ValidationResult> {
let mut errors = vec![];
let mut validated = serde_json::Map::new();
let input_obj = match input.as_object() {
Some(obj) => obj,
None => {
errors.push(ValidationError::new(
"<root>",
"Input must be a JSON object",
"type",
));
return Ok(ValidationResult::err(errors, &schema.name));
}
};
// Validate all defined fields
for field_schema in &schema.fields {
let field_value = input_obj.get(&field_schema.name);
// Check required fields
if field_schema.required && field_value.is_none() {
errors.push(
ValidationError::new(&field_schema.name, "Required field missing", "required")
.with_expected(format!("{:?}", field_schema.field_type)),
);
continue;
}
// Apply default if missing
let value = match field_value {
Some(v) => v.clone(),
None => match &field_schema.default {
Some(default) => {
debug!(
"Applying default for field {}: {:?}",
field_schema.name, default
);
default.clone()
}
None => continue,
},
};
// Type validation
if let Err(e) = self.validate_type(&field_schema.name, &value, &field_schema.field_type)
{
errors.push(e);
continue;
}
// Contract validation
for contract in &field_schema.contracts {
if let Err(e) = self.validate_contract(&field_schema.name, &value, contract) {
errors.push(e);
}
}
validated.insert(field_schema.name.clone(), value);
}
// Strict mode: check for unknown fields
if self.strict_mode {
for (key, _value) in input_obj {
if !schema.fields.iter().any(|f| &f.name == key) {
errors.push(ValidationError::new(
key,
"Unknown field (strict mode enabled)",
"strict",
));
}
}
}
if errors.is_empty() {
Ok(ValidationResult::ok(Value::Object(validated), &schema.name))
} else {
Ok(ValidationResult::err(errors, &schema.name))
}
}
/// Validate value type
fn validate_type(
&self,
field_name: &str,
value: &Value,
expected: &FieldType,
) -> std::result::Result<(), ValidationError> {
let valid = match expected {
FieldType::String => value.is_string(),
FieldType::Number => value.is_number(),
FieldType::Bool => value.is_boolean(),
FieldType::Array(_) => value.is_array(),
FieldType::Object => value.is_object(),
FieldType::Enum(variants) => value
.as_str()
.map(|s| variants.contains(&s.to_string()))
.unwrap_or(false),
FieldType::Union(types) => {
// Valid if matches any type in union
types
.iter()
.any(|t| self.validate_type(field_name, value, t).is_ok())
}
};
if valid {
Ok(())
} else {
Err(ValidationError::new(field_name, "Type mismatch", "type")
.with_expected(format!("{:?}", expected))
.with_actual(format!("{:?}", value)))
}
}
/// Validate value against contract
fn validate_contract(
&self,
field_name: &str,
value: &Value,
contract: &Contract,
) -> std::result::Result<(), ValidationError> {
match contract {
Contract::NonEmpty => {
if let Some(s) = value.as_str() {
if s.is_empty() {
return Err(ValidationError::new(
field_name,
"String cannot be empty",
"std.string.NonEmpty",
));
}
}
}
Contract::MinLength(min) => {
if let Some(s) = value.as_str() {
if s.len() < *min {
return Err(ValidationError::new(
field_name,
format!("String too short (minimum {} characters)", min),
format!("std.string.length.min {}", min),
)
.with_expected(format!("{} chars", min))
.with_actual(format!("{} chars", s.len())));
}
}
}
Contract::MaxLength(max) => {
if let Some(s) = value.as_str() {
if s.len() > *max {
return Err(ValidationError::new(
field_name,
format!("String too long (maximum {} characters)", max),
format!("std.string.length.max {}", max),
)
.with_expected(format!("{} chars", max))
.with_actual(format!("{} chars", s.len())));
}
}
}
Contract::Pattern(pattern) => {
if let Some(s) = value.as_str() {
let re = Regex::new(pattern).map_err(|e| {
ValidationError::new(
field_name,
format!("Invalid regex pattern: {}", e),
"std.string.match",
)
})?;
if !re.is_match(s) {
return Err(ValidationError::new(
field_name,
"Value does not match required pattern",
format!("std.string.match {}", pattern),
)
.with_expected(format!("pattern: {}", pattern))
.with_actual(s));
}
}
}
Contract::Range { min, max } => {
if let Some(n) = value.as_f64() {
if n < *min || n > *max {
return Err(ValidationError::new(
field_name,
format!("Number must be between {} and {}", min, max),
format!("std.number.between {} {}", min, max),
)
.with_expected(format!("[{}, {}]", min, max))
.with_actual(n.to_string()));
}
}
}
Contract::GreaterThan(min) => {
if let Some(n) = value.as_f64() {
if n <= *min {
return Err(ValidationError::new(
field_name,
format!("Number must be greater than {}", min),
format!("std.number.greater_than {}", min),
)
.with_expected(format!("> {}", min))
.with_actual(n.to_string()));
}
}
}
Contract::LessThan(max) => {
if let Some(n) = value.as_f64() {
if n >= *max {
return Err(ValidationError::new(
field_name,
format!("Number must be less than {}", max),
format!("std.number.less_than {}", max),
)
.with_expected(format!("< {}", max))
.with_actual(n.to_string()));
}
}
}
Contract::Email => {
if let Some(s) = value.as_str() {
// Simple email validation (not RFC-compliant, but good enough)
if !s.contains('@') || !s.contains('.') || s.len() < 5 {
return Err(ValidationError::new(
field_name,
"Invalid email format",
"std.string.Email",
)
.with_expected("user@example.com")
.with_actual(s));
}
// Check for multiple @
if s.matches('@').count() != 1 {
return Err(ValidationError::new(
field_name,
"Invalid email format (multiple @ symbols)",
"std.string.Email",
));
}
}
}
Contract::Url => {
if let Some(s) = value.as_str() {
if !s.starts_with("http://") && !s.starts_with("https://") {
return Err(ValidationError::new(
field_name,
"Invalid URL format (must start with http:// or https://)",
"std.string.Url",
)
.with_expected("https://example.com")
.with_actual(s));
}
}
}
Contract::Uuid => {
if let Some(s) = value.as_str() {
let uuid_pattern =
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$";
let re = Regex::new(uuid_pattern).unwrap();
if !re.is_match(s) {
return Err(ValidationError::new(
field_name,
"Invalid UUID format",
"std.string.Uuid",
)
.with_expected("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
.with_actual(s));
}
}
}
Contract::Custom(source) => {
// Custom contracts require Nickel evaluation
// For now, warn and skip
warn!(
"Custom contract validation not yet implemented for field {}: {}",
field_name, source
);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use serde_json::json;
use super::*;
use crate::validation::schema_registry::{CompiledSchema, Contract, FieldSchema, FieldType};
fn create_test_schema() -> CompiledSchema {
CompiledSchema {
name: "test_schema".to_string(),
fields: vec![
FieldSchema {
name: "title".to_string(),
field_type: FieldType::String,
required: true,
contracts: vec![Contract::NonEmpty, Contract::MaxLength(100)],
default: None,
doc: Some("Task title".to_string()),
},
FieldSchema {
name: "priority".to_string(),
field_type: FieldType::Number,
required: true,
contracts: vec![Contract::Range {
min: 0.0,
max: 100.0,
}],
default: None,
doc: Some("Priority score".to_string()),
},
FieldSchema {
name: "email".to_string(),
field_type: FieldType::String,
required: false,
contracts: vec![Contract::Email],
default: Some(json!("noreply@example.com")),
doc: None,
},
],
custom_contracts: vec![],
source_path: PathBuf::from("test.ncl"),
}
}
#[test]
fn test_validation_error_display() {
let error = ValidationError::new("title", "Field is required", "required")
.with_expected("String")
.with_actual("null");
let display = error.to_string();
assert!(display.contains("title"));
assert!(display.contains("required"));
assert!(display.contains("String"));
}
#[test]
fn test_validation_result_ok() {
let data = json!({"title": "Test"});
let result = ValidationResult::ok(data.clone(), "test_schema");
assert!(result.valid);
assert!(result.errors.is_empty());
assert_eq!(result.validated_data, Some(data));
}
#[test]
fn test_validation_result_err() {
let errors = vec![ValidationError::new("field1", "Error", "contract1")];
let result = ValidationResult::err(errors, "test_schema");
assert!(!result.valid);
assert_eq!(result.errors.len(), 1);
assert!(result.validated_data.is_none());
}
#[tokio::test]
async fn test_validate_valid_input() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("/tmp")));
let pipeline = ValidationPipeline::new(registry);
let schema = create_test_schema();
let input = json!({
"title": "Test Task",
"priority": 50.0
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(result.valid, "Validation should pass for valid input");
assert!(result.errors.is_empty());
}
#[tokio::test]
async fn test_validate_missing_required_field() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("/tmp")));
let pipeline = ValidationPipeline::new(registry);
let schema = create_test_schema();
let input = json!({
"priority": 50.0
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("title"));
}
#[tokio::test]
async fn test_validate_contract_violation() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("/tmp")));
let pipeline = ValidationPipeline::new(registry);
let schema = create_test_schema();
let input = json!({
"title": "", // Empty string violates NonEmpty
"priority": 50.0
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("title"));
}
#[tokio::test]
async fn test_validate_with_defaults() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("/tmp")));
let pipeline = ValidationPipeline::new(registry);
let schema = create_test_schema();
let input = json!({
"title": "Test",
"priority": 50.0
// email not provided, should use default
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(result.valid);
let validated = result.validated_data.unwrap();
assert_eq!(validated["email"], "noreply@example.com");
}
#[tokio::test]
async fn test_validate_number_range() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("/tmp")));
let pipeline = ValidationPipeline::new(registry);
let schema = create_test_schema();
// Out of range
let input = json!({
"title": "Test",
"priority": 150.0 // > 100
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("priority"));
}
#[tokio::test]
async fn test_validate_email_format() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("/tmp")));
let pipeline = ValidationPipeline::new(registry);
let schema = create_test_schema();
let input = json!({
"title": "Test",
"priority": 50.0,
"email": "invalid-email" // Missing @
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("email"));
}
}
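
One knob the tests above leave unexercised is strict mode. A sketch that would sit in the same test module, reusing the create_test_schema helper defined there:

#[tokio::test]
async fn test_strict_mode_rejects_unknown_fields() {
    let registry = Arc::new(SchemaRegistry::new(PathBuf::from("/tmp")));
    let pipeline = ValidationPipeline::new(registry).with_strict_mode(true);
    let schema = create_test_schema();

    let input = json!({
        "title": "Test",
        "priority": 50.0,
        "owner": "bob" // not declared in the schema
    });

    let result = pipeline.validate_against_schema(&schema, &input).unwrap();
    assert!(!result.valid);
    assert!(result.has_field_error("owner"));
}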

View File

@@ -0,0 +1,481 @@
// vapora-shared: Schema registry - Loads and caches Nickel schemas for validation
// Compiles Nickel schemas to internal representation for runtime validation
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use super::nickel_bridge::NickelCli;
use crate::error::{Result, VaporaError};
/// Compiled schema representation (from Nickel)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompiledSchema {
/// Schema name (e.g., "tools/kanban_create_task")
pub name: String,
/// Field definitions with types and contracts
pub fields: Vec<FieldSchema>,
/// Custom contract predicates (Nickel functions)
pub custom_contracts: Vec<String>,
/// Source file path
pub source_path: PathBuf,
}
/// Field schema with type and validation contracts
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldSchema {
/// Field name (flattened path for nested fields)
pub name: String,
/// Field type (Nickel type)
pub field_type: FieldType,
/// Is this field required?
pub required: bool,
/// Validation contracts (predicates)
pub contracts: Vec<Contract>,
/// Default value (if field is optional)
pub default: Option<serde_json::Value>,
/// Documentation string
pub doc: Option<String>,
}
/// Nickel field types
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum FieldType {
/// String type
String,
/// Number type (f64)
Number,
/// Boolean type
Bool,
/// Array type with element type
Array(Box<FieldType>),
/// Object/Record type
Object,
/// Enum with allowed values
Enum(Vec<String>),
/// Union of types (Nickel `|`)
Union(Vec<FieldType>),
}
/// Validation contract (Nickel predicate)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Contract {
/// Non-empty string (std.string.NonEmpty)
NonEmpty,
/// Minimum string length
MinLength(usize),
/// Maximum string length
MaxLength(usize),
/// Regex pattern match
Pattern(String),
/// Numeric range validation
Range { min: f64, max: f64 },
/// Greater than (exclusive)
GreaterThan(f64),
/// Less than (exclusive)
LessThan(f64),
/// Email validation
Email,
/// URL validation
Url,
/// UUID validation
Uuid,
/// Custom Nickel predicate (function source code)
Custom(String),
}
/// Schema source (file or inline definition)
#[derive(Debug, Clone)]
pub enum SchemaSource {
/// Load from Nickel file
File(PathBuf),
/// Inline Nickel code
Inline(String),
/// Pre-compiled schema (for testing)
Compiled(CompiledSchema),
}
/// Registry that loads and caches Nickel schemas
pub struct SchemaRegistry {
/// Cached compiled schemas
schemas: Arc<RwLock<HashMap<String, CompiledSchema>>>,
/// Base directory for schema files
schema_dir: PathBuf,
/// Nickel CLI bridge
nickel_cli: NickelCli,
}
impl SchemaRegistry {
/// Create a new schema registry
pub fn new(schema_dir: PathBuf) -> Self {
Self {
schemas: Arc::new(RwLock::new(HashMap::new())),
schema_dir,
nickel_cli: NickelCli::new(),
}
}
/// Create registry with custom Nickel CLI path
pub fn with_nickel_cli(schema_dir: PathBuf, nickel_path: PathBuf) -> Self {
Self {
schemas: Arc::new(RwLock::new(HashMap::new())),
schema_dir,
nickel_cli: NickelCli::with_path(nickel_path),
}
}
/// Load schema from Nickel file, compile and cache
pub async fn load_schema(&self, schema_name: &str) -> Result<CompiledSchema> {
// Check cache first
{
let cache = self.schemas.read().await;
if let Some(schema) = cache.get(schema_name) {
debug!("Schema {} loaded from cache", schema_name);
return Ok(schema.clone());
}
}
// Load from file
let schema_path = self.resolve_schema_path(schema_name);
let compiled = self.compile_schema(&schema_path, schema_name).await?;
// Cache it
{
let mut cache = self.schemas.write().await;
cache.insert(schema_name.to_string(), compiled.clone());
}
info!("Schema {} compiled and cached", schema_name);
Ok(compiled)
}
/// Load schema from source (file, inline, or pre-compiled)
pub async fn load_from_source(
&self,
source: SchemaSource,
schema_name: &str,
) -> Result<CompiledSchema> {
match source {
SchemaSource::File(path) => self.compile_schema(&path, schema_name).await,
SchemaSource::Inline(code) => self.compile_inline(&code, schema_name).await,
SchemaSource::Compiled(schema) => Ok(schema),
}
}
/// Resolve schema name to file path
fn resolve_schema_path(&self, schema_name: &str) -> PathBuf {
let filename = if schema_name.ends_with(".ncl") {
schema_name.to_string()
} else {
format!("{}.ncl", schema_name)
};
self.schema_dir.join(filename)
}
/// Compile Nickel schema from file to internal representation
async fn compile_schema(&self, path: &Path, name: &str) -> Result<CompiledSchema> {
if !path.exists() {
return Err(VaporaError::ValidationError(format!(
"Schema file not found: {}",
path.display()
)));
}
// Use nickel query to extract schema metadata
let metadata_json = self.nickel_cli.query(path, None).await?;
self.parse_nickel_metadata(&metadata_json, name, path)
}
/// Compile inline Nickel code
async fn compile_inline(&self, code: &str, name: &str) -> Result<CompiledSchema> {
// Write to temp file for Nickel CLI processing
let temp_dir = std::env::temp_dir();
let temp_path = temp_dir.join(format!("vapora_schema_{}.ncl", name.replace('/', "_")));
tokio::fs::write(&temp_path, code).await.map_err(|e| {
VaporaError::ValidationError(format!("Failed to write temp file: {}", e))
})?;
let result = self.compile_schema(&temp_path, name).await;
// Clean up temp file
let _ = tokio::fs::remove_file(&temp_path).await;
result
}
/// Parse Nickel metadata JSON into CompiledSchema
fn parse_nickel_metadata(
&self,
json: &serde_json::Value,
name: &str,
source_path: &Path,
) -> Result<CompiledSchema> {
// Extract schema name from metadata or use provided name
let schema_name = json["tool_name"]
.as_str()
.or_else(|| json["schema_name"].as_str())
.unwrap_or(name)
.to_string();
// Extract parameters/fields object
let params = json["parameters"]
.as_object()
.or_else(|| json["fields"].as_object());
let fields = match params {
Some(params_obj) => params_obj
.iter()
.map(|(field_name, field_def)| self.parse_field(field_name, field_def))
.collect::<Result<Vec<_>>>()?,
None => {
warn!("No parameters or fields found in schema {}", name);
vec![]
}
};
// Extract custom contracts
let custom_contracts = json["contracts"]
.as_object()
.map(|obj| {
obj.iter()
.map(|(name, _code)| name.clone())
.collect::<Vec<_>>()
})
.unwrap_or_default();
Ok(CompiledSchema {
name: schema_name,
fields,
custom_contracts,
source_path: source_path.to_path_buf(),
})
}
/// Parse a single field definition from Nickel metadata
fn parse_field(&self, name: &str, def: &serde_json::Value) -> Result<FieldSchema> {
let field_type = self.parse_type(def)?;
let contracts = self.extract_contracts(def);
let default = def.get("default").cloned();
let required = default.is_none();
let doc = def.get("doc").and_then(|v| v.as_str()).map(String::from);
Ok(FieldSchema {
name: name.to_string(),
field_type,
required,
contracts,
default,
doc,
})
}
/// Parse Nickel type from metadata
fn parse_type(&self, def: &serde_json::Value) -> Result<FieldType> {
// Type information comes from Nickel metadata
let type_str = def["type"].as_str().unwrap_or("String");
let field_type = match type_str {
"String" => FieldType::String,
"Number" => FieldType::Number,
"Bool" => FieldType::Bool,
"Object" | "Record" => FieldType::Object,
t if t.starts_with("Array") => {
// Parse array element type
FieldType::Array(Box::new(FieldType::String))
}
t if t.starts_with("Enum") => {
// Extract enum variants
if let Some(variants) = def["variants"].as_array() {
let values = variants
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
FieldType::Enum(values)
} else {
FieldType::String
}
}
_ => FieldType::String,
};
Ok(field_type)
}
/// Extract validation contracts from Nickel metadata
fn extract_contracts(&self, def: &serde_json::Value) -> Vec<Contract> {
let mut contracts = vec![];
// Parse contract annotations from Nickel metadata
if let Some(annotations) = def["annotations"].as_array() {
for ann in annotations {
if let Some(contract) = self.parse_contract_annotation(ann) {
contracts.push(contract);
}
}
}
contracts
}
/// Parse a single contract annotation
fn parse_contract_annotation(&self, ann: &serde_json::Value) -> Option<Contract> {
let name = ann["name"].as_str()?;
match name {
"std.string.NonEmpty" => Some(Contract::NonEmpty),
"std.string.Email" => Some(Contract::Email),
"std.string.Url" => Some(Contract::Url),
"std.string.Uuid" => Some(Contract::Uuid),
n if n.starts_with("std.string.length.min") => {
let min = ann["args"][0].as_u64()? as usize;
Some(Contract::MinLength(min))
}
n if n.starts_with("std.string.length.max") => {
let max = ann["args"][0].as_u64()? as usize;
Some(Contract::MaxLength(max))
}
n if n.starts_with("std.number.between") => {
let min = ann["args"][0].as_f64()?;
let max = ann["args"][1].as_f64()?;
Some(Contract::Range { min, max })
}
n if n.starts_with("std.number.greater_than") => {
let min = ann["args"][0].as_f64()?;
Some(Contract::GreaterThan(min))
}
n if n.starts_with("std.number.less_than") => {
let max = ann["args"][0].as_f64()?;
Some(Contract::LessThan(max))
}
n if n.starts_with("std.string.match") => {
let pattern = ann["args"][0].as_str()?.to_string();
Some(Contract::Pattern(pattern))
}
_ => {
// Custom contract - store source code
ann["source"]
.as_str()
.map(|source| Contract::Custom(source.to_string()))
}
}
}
/// Invalidate cache for a specific schema (for hot-reload)
pub async fn invalidate(&self, schema_name: &str) {
let mut cache = self.schemas.write().await;
cache.remove(schema_name);
debug!("Schema {} invalidated from cache", schema_name);
}
/// Invalidate all cached schemas
pub async fn invalidate_all(&self) {
let mut cache = self.schemas.write().await;
let count = cache.len();
cache.clear();
info!("Invalidated {} schemas from cache", count);
}
/// Get all cached schema names
pub async fn list_cached(&self) -> Vec<String> {
let cache = self.schemas.read().await;
cache.keys().cloned().collect()
}
/// Check if schema is cached
pub async fn is_cached(&self, schema_name: &str) -> bool {
let cache = self.schemas.read().await;
cache.contains_key(schema_name)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_parse_field_type() {
let registry = SchemaRegistry::new(PathBuf::from("/tmp"));
let string_def = serde_json::json!({"type": "String"});
let field_type = registry.parse_type(&string_def).unwrap();
assert_eq!(field_type, FieldType::String);
let number_def = serde_json::json!({"type": "Number"});
let field_type = registry.parse_type(&number_def).unwrap();
assert_eq!(field_type, FieldType::Number);
}
#[tokio::test]
async fn test_parse_contract_annotation() {
let registry = SchemaRegistry::new(PathBuf::from("/tmp"));
let non_empty = serde_json::json!({"name": "std.string.NonEmpty"});
let contract = registry.parse_contract_annotation(&non_empty);
assert_eq!(contract, Some(Contract::NonEmpty));
let min_length = serde_json::json!({
"name": "std.string.length.min",
"args": [5]
});
let contract = registry.parse_contract_annotation(&min_length);
assert_eq!(contract, Some(Contract::MinLength(5)));
let range = serde_json::json!({
"name": "std.number.between",
"args": [0.0, 100.0]
});
let contract = registry.parse_contract_annotation(&range);
assert_eq!(
contract,
Some(Contract::Range {
min: 0.0,
max: 100.0
})
);
}
#[tokio::test]
async fn test_schema_source() {
let inline_source = SchemaSource::Inline(r#"{ name = "test" }"#.to_string());
assert!(matches!(inline_source, SchemaSource::Inline(_)));
let file_source = SchemaSource::File(PathBuf::from("test.ncl"));
assert!(matches!(file_source, SchemaSource::File(_)));
}
}
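
The cache round-trip in miniature (assumes a schemas/ directory containing the .ncl file and nickel on PATH; load_schema shells out to nickel query on a cache miss):

use std::path::PathBuf;
use vapora_shared::validation::SchemaRegistry;
use vapora_shared::Result;

async fn demo_cache() -> Result<()> {
    let registry = SchemaRegistry::new(PathBuf::from("schemas"));

    // First load compiles and caches; later loads are served from the cache
    registry.load_schema("tools/kanban_create_task").await?;
    assert!(registry.is_cached("tools/kanban_create_task").await);

    // Hot-reload after editing the .ncl file: drop the entry, next load recompiles
    registry.invalidate("tools/kanban_create_task").await;
    assert!(!registry.is_cached("tools/kanban_create_task").await);
    Ok(())
}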

View File

@@ -0,0 +1,352 @@
// Integration tests for ValidationPipeline
// Demonstrates end-to-end schema validation workflow
use std::path::PathBuf;
use std::sync::Arc;
use serde_json::json;
use vapora_shared::validation::{
CompiledSchema, Contract, FieldSchema, FieldType, SchemaRegistry, SchemaSource,
ValidationPipeline,
};
/// Create a test schema for MCP tool validation
fn create_mcp_tool_schema() -> CompiledSchema {
CompiledSchema {
name: "tools/kanban_create_task".to_string(),
fields: vec![
FieldSchema {
name: "project_id".to_string(),
field_type: FieldType::String,
required: true,
contracts: vec![Contract::NonEmpty, Contract::Uuid],
default: None,
doc: Some("Project UUID".to_string()),
},
FieldSchema {
name: "title".to_string(),
field_type: FieldType::String,
required: true,
contracts: vec![
Contract::NonEmpty,
Contract::MinLength(3),
Contract::MaxLength(200),
],
default: None,
doc: Some("Task title (3-200 chars)".to_string()),
},
FieldSchema {
name: "description".to_string(),
field_type: FieldType::String,
required: false,
contracts: vec![],
default: Some(json!("")),
doc: Some("Task description".to_string()),
},
FieldSchema {
name: "priority".to_string(),
field_type: FieldType::String,
required: true,
contracts: vec![],
default: None,
doc: Some("Task priority".to_string()),
},
],
custom_contracts: vec![],
source_path: PathBuf::from("schemas/tools/kanban_create_task.ncl"),
}
}
/// Create a test schema for agent task assignment
fn create_agent_assignment_schema() -> CompiledSchema {
CompiledSchema {
name: "agents/task_assignment".to_string(),
fields: vec![
FieldSchema {
name: "role".to_string(),
field_type: FieldType::Enum(vec![
"developer".to_string(),
"reviewer".to_string(),
"architect".to_string(),
"tester".to_string(),
]),
required: true,
contracts: vec![],
default: None,
doc: Some("Agent role".to_string()),
},
FieldSchema {
name: "title".to_string(),
field_type: FieldType::String,
required: true,
contracts: vec![Contract::NonEmpty, Contract::MaxLength(500)],
default: None,
doc: Some("Task title".to_string()),
},
FieldSchema {
name: "priority".to_string(),
field_type: FieldType::Number,
required: false,
contracts: vec![Contract::Range {
min: 0.0,
max: 100.0,
}],
default: Some(json!(50)),
doc: Some("Priority score (0-100)".to_string()),
},
],
custom_contracts: vec![],
source_path: PathBuf::from("schemas/agents/task_assignment.ncl"),
}
}
#[tokio::test]
async fn test_mcp_tool_valid_input() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
// Load pre-compiled schema
let schema = create_mcp_tool_schema();
let source = SchemaSource::Compiled(schema);
let loaded_schema = registry
.load_from_source(source, "tools/kanban_create_task")
.await
.unwrap();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"project_id": "550e8400-e29b-41d4-a716-446655440000",
"title": "Implement validation pipeline",
"priority": "high"
});
let result = pipeline
.validate_against_schema(&loaded_schema, &input)
.unwrap();
assert!(result.valid, "Valid input should pass validation");
assert!(result.errors.is_empty());
assert!(result.validated_data.is_some());
// Check defaults applied
let validated = result.validated_data.unwrap();
assert_eq!(validated["description"], "");
}
#[tokio::test]
async fn test_mcp_tool_missing_required_field() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"title": "Test Task",
// Missing project_id (required)
"priority": "high"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("project_id"));
assert!(result.error_messages()[0].contains("Required field missing"));
}
#[tokio::test]
async fn test_mcp_tool_invalid_uuid() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"project_id": "not-a-valid-uuid",
"title": "Test Task",
"priority": "high"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("project_id"));
let error_msg = result.error_messages()[0].to_lowercase();
assert!(error_msg.contains("uuid") || error_msg.contains("format"));
}
#[tokio::test]
async fn test_mcp_tool_title_too_short() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"project_id": "550e8400-e29b-41d4-a716-446655440000",
"title": "AB", // Only 2 chars, min is 3
"priority": "high"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("title"));
}
#[tokio::test]
async fn test_mcp_tool_title_too_long() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry);
let long_title = "A".repeat(201); // Max is 200
let input = json!({
"project_id": "550e8400-e29b-41d4-a716-446655440000",
"title": long_title,
"priority": "high"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("title"));
}
#[tokio::test]
async fn test_agent_assignment_valid_input() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_agent_assignment_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"role": "developer",
"title": "Fix authentication bug"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(result.valid);
assert!(result.errors.is_empty());
// Check default priority applied
let validated = result.validated_data.unwrap();
assert_eq!(validated["priority"], 50);
}
#[tokio::test]
async fn test_agent_assignment_invalid_role() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_agent_assignment_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"role": "invalid_role", // Not in enum
"title": "Test Task"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("role"));
}
#[tokio::test]
async fn test_agent_assignment_priority_out_of_range() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_agent_assignment_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"role": "developer",
"title": "Test Task",
"priority": 150 // > 100
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("priority"));
}
#[tokio::test]
async fn test_strict_mode_rejects_unknown_fields() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry).with_strict_mode(true);
let input = json!({
"project_id": "550e8400-e29b-41d4-a716-446655440000",
"title": "Test Task",
"priority": "high",
"unknown_field": "value" // Not in schema
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
assert!(result.has_field_error("unknown_field"));
}
#[tokio::test]
async fn test_non_strict_mode_allows_unknown_fields() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry).with_strict_mode(false);
let input = json!({
"project_id": "550e8400-e29b-41d4-a716-446655440000",
"title": "Test Task",
"priority": "high",
"unknown_field": "value" // Allowed in non-strict mode
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
// Should pass, but unknown_field won't be in validated_data
assert!(result.valid);
let validated = result.validated_data.unwrap();
assert!(!validated.as_object().unwrap().contains_key("unknown_field"));
}
#[tokio::test]
async fn test_multiple_validation_errors() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"project_id": "invalid-uuid",
"title": "AB", // Too short
"priority": "high"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
assert!(!result.valid);
// Should have errors for both project_id and title
assert!(result.errors.len() >= 2);
assert!(result.has_field_error("project_id"));
assert!(result.has_field_error("title"));
}
#[tokio::test]
async fn test_validation_result_serialization() {
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let schema = create_mcp_tool_schema();
let pipeline = ValidationPipeline::new(registry);
let input = json!({
"title": "", // Empty violates NonEmpty
"priority": "high"
});
let result = pipeline.validate_against_schema(&schema, &input).unwrap();
// Serialize to JSON
let json_result = serde_json::to_string(&result).unwrap();
assert!(json_result.contains("valid"));
assert!(json_result.contains("errors"));
// Deserialize back
let _deserialized: vapora_shared::validation::ValidationResult =
serde_json::from_str(&json_result).unwrap();
}

View File

@ -10,13 +10,16 @@ Complete system architecture and design documentation for VAPORA.
- **[Multi-IA Router](multi-ia-router.md)** — Provider selection, routing rules, and fallback mechanisms
- **[Roles, Permissions & Profiles](roles-permissions-profiles.md)** — Cedar policy engine and RBAC implementation
- **[Task, Agent & Doc Manager](task-agent-doc-manager.md)** — Task orchestration and documentation lifecycle
- **[Schema Validation Pipeline](schema-validation-pipeline.md)** — Runtime validation with Nickel contracts for MCP tools and agent tasks
## Overview
These documents cover:
- Complete system architecture and design decisions
- Multi-agent orchestration and coordination patterns
- Provider routing and selection strategies
- Workflow execution and task management
- Security, RBAC, and policy enforcement
- Learning-based agent selection and cost optimization
- Runtime schema validation with Nickel contracts

View File

@ -0,0 +1,507 @@
# Schema Validation Pipeline
Runtime validation system for MCP tools and agent task assignments using Nickel contracts.
## Overview
The Schema Validation Pipeline prevents downstream errors by validating inputs before execution. It uses Nickel schemas with contracts to enforce type safety, business rules, and data constraints at runtime.
**Problem Solved:** VAPORA previously assumed valid inputs or failed downstream. This caused:
- Invalid UUIDs reaching database queries
- Empty strings bypassing business logic
- Out-of-range priorities corrupting task queues
- Malformed contexts breaking agent execution
**Solution:** Validate all inputs against Nickel schemas before execution.
## Architecture
```text
┌─────────────────────────────────────────────────────────────────┐
│                         Client Request                          │
│          (MCP Tool Invocation / Agent Task Assignment)          │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                       ValidationPipeline                        │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ 1. Load schema from SchemaRegistry (cached)              │   │
│  │ 2. Validate types (String, Number, Array, Object)        │   │
│  │ 3. Check required fields                                 │   │
│  │ 4. Apply contracts (NonEmpty, UUID, Range, etc.)         │   │
│  │ 5. Apply default values                                  │   │
│  │ 6. Return ValidationResult (valid + errors + data)       │   │
│  └──────────────────────────────────────────────────────────┘   │
└────────────────────────────┬────────────────────────────────────┘
                             │
                  ┌──────────┴──────────┐
                  │                     │
                  ▼                     ▼
           ┌──────────────┐      ┌─────────────┐
           │    Valid?    │      │  Invalid?   │
           │   Execute    │      │   Reject    │
           │  with data   │      │ with errors │
           └──────────────┘      └─────────────┘
```
## Components
### 1. ValidationPipeline
Core validation engine in `vapora-shared/src/validation/pipeline.rs`.
**Key Methods:**
```rust
pub async fn validate(
&self,
schema_name: &str,
input: &Value
) -> Result<ValidationResult>
```
**Validation Steps:**
1. Load compiled schema from registry
2. Validate field types (String, Number, Bool, Array, Object)
3. Check required fields (reject if missing)
4. Apply contracts (NonEmpty, UUID, Range, Email, etc.)
5. Apply default values for optional fields
6. Return ValidationResult with errors (if any)
**Strict Mode:** Rejects unknown fields not in schema.
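A minimal construction sketch, mirroring the integration tests (`with_strict_mode` is the builder-style toggle):
```rust
use std::path::PathBuf;
use std::sync::Arc;
use vapora_shared::validation::{SchemaRegistry, ValidationPipeline};

// Strict pipeline: any field not declared in the schema fails validation.
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let pipeline = ValidationPipeline::new(registry).with_strict_mode(true);
```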
### 2. SchemaRegistry
Schema loading and caching in `vapora-shared/src/validation/schema_registry.rs`.
**Features:**
- **Caching:** Compiled schemas cached in memory (Arc + RwLock)
- **Hot Reload:** `invalidate(schema_name)` to reload without restart
- **Schema Sources:** File system, embedded string, or URL (future)
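Cache management in practice, using the helpers from `schema_registry.rs` (`is_cached`, `list_cached`, `invalidate`); assumes a `registry` built as in the integration sections below:
```rust
// Inspect the cache, then evict one entry so the next validate() reloads it from disk.
if registry.is_cached("tools/kanban_create_task").await {
    println!("cached schemas: {:?}", registry.list_cached().await);
    registry.invalidate("tools/kanban_create_task").await;
}
```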
**Schema Structure:**
```rust
pub struct CompiledSchema {
    pub name: String,
    pub fields: Vec<FieldSchema>,
    // plus custom_contracts and source_path (see validation/schema_registry.rs)
}
pub struct FieldSchema {
    pub name: String,
    pub field_type: FieldType,
    pub required: bool,
    pub contracts: Vec<Contract>,
    pub default: Option<Value>,
    pub doc: Option<String>,
}
```
### 3. NickelBridge
CLI integration for Nickel operations in `vapora-shared/src/validation/nickel_bridge.rs`.
**Operations:**
- `typecheck(path)` — Validate Nickel syntax
- `export(path)` — Export schema as JSON
- `query(path, field)` — Query specific field
- `is_available()` — Check if Nickel CLI is installed
**Timeout Protection:** 30s default to prevent DoS from malicious Nickel code.
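A hedged sketch of the bridge in use; the operation names come from the list above, while the `NickelBridge::new()` constructor, import path, and async `Result` shapes are assumptions here rather than the confirmed API:
```rust
use std::path::Path;
use vapora_shared::validation::NickelBridge;

// Assumed constructor and return types; see nickel_bridge.rs for the real signatures.
let bridge = NickelBridge::new();
if bridge.is_available() {
    let schema = Path::new("schemas/tools/kanban_create_task.ncl");
    bridge.typecheck(schema).await?;             // syntax and contract check
    let exported = bridge.export(schema).await?; // schema exported as JSON
    println!("{exported}");
}
```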
## Nickel Schemas
Located in `schemas/` directory (workspace root).
### Directory Structure
```text
schemas/
├── tools/ # MCP tool parameter validation
│ ├── kanban_create_task.ncl
│ ├── kanban_update_task.ncl
│ ├── assign_task_to_agent.ncl
│ ├── get_project_summary.ncl
│ └── get_agent_capabilities.ncl
└── agents/ # Agent task assignment validation
└── task_assignment.ncl
```
### Schema Format
```nickel
{
tool_name = "example_tool",
parameters = {
# Required field with contracts
user_id
| String
| doc "User UUID"
| std.string.NonEmpty
| std.string.match "^[0-9a-f]{8}-[0-9a-f]{4}-...$",
# Optional field with default
priority
| Number
| doc "Priority score (0-100)"
| std.number.between 0 100
| default = 50,
},
}
```
### Supported Contracts
| Contract | Description | Example |
|----------|-------------|---------|
| `std.string.NonEmpty` | String cannot be empty | Required text fields |
| `std.string.length.min N` | Minimum length | `min 3` for titles |
| `std.string.length.max N` | Maximum length | `max 200` for titles |
| `std.string.match PATTERN` | Regex validation | UUID format |
| `std.number.between A B` | Numeric range | `between 0 100` |
| `std.number.greater_than N` | Minimum value (exclusive) | `> -1` |
| `std.number.less_than N` | Maximum value (exclusive) | `< 1000` |
| `std.enum.TaggedUnion` | Enum validation | `[| 'low, 'high |]` |
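On the Rust side these annotations map onto the typed `Contract` enum via `SchemaRegistry::parse_contract_annotation`; a test-style sketch (the method is exercised directly in the module's unit tests, so visibility outside that module may differ):
```rust
use serde_json::json;

// An exported Nickel annotation becomes a typed contract.
let ann = json!({ "name": "std.number.between", "args": [0.0, 100.0] });
let contract = registry.parse_contract_annotation(&ann);
assert_eq!(contract, Some(Contract::Range { min: 0.0, max: 100.0 }));
```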
## Integration Points
### MCP Server
Location: `crates/vapora-mcp-server/src/main.rs`
```rust
// Initialize validation pipeline
let schema_dir = std::env::var("VAPORA_SCHEMA_DIR")
.unwrap_or_else(|_| "schemas".to_string());
let registry = Arc::new(SchemaRegistry::new(PathBuf::from(&schema_dir)));
let validation = Arc::new(ValidationPipeline::new(registry));
// Add to AppState
#[derive(Clone)]
struct AppState {
validation: Arc<ValidationPipeline>,
}
// Validate in handler
async fn invoke_tool(
State(state): State<AppState>,
Json(request): Json<InvokeToolRequest>,
) -> impl IntoResponse {
let schema_name = format!("tools/{}", request.tool);
let validation_result = state
.validation
.validate(&schema_name, &request.parameters)
.await?;
if !validation_result.valid {
        return (StatusCode::BAD_REQUEST, Json(validation_result.errors));
}
// Execute with validated data
let validated_params = validation_result.validated_data.unwrap();
// ...
}
```
### Agent Coordinator
Location: `crates/vapora-agents/src/coordinator.rs`
```rust
pub struct AgentCoordinator {
validation: Arc<ValidationPipeline>,
// ...
}
impl AgentCoordinator {
pub async fn assign_task(
&self,
role: &str,
title: String,
description: String,
context: String,
priority: u32,
) -> Result<String, CoordinatorError> {
// Validate inputs
let input = serde_json::json!({
"role": role,
"title": &title,
"description": &description,
"context": &context,
"priority": priority,
});
let validation_result = self
.validation
.validate("agents/task_assignment", &input)
.await?;
if !validation_result.valid {
return Err(CoordinatorError::ValidationError(
            validation_result.error_messages().join(", ")
));
}
// Continue with validated inputs
// ...
}
}
```
## Usage Patterns
### 1. Validating MCP Tool Inputs
```rust
// In MCP server handler
let validation_result = state
.validation
.validate("tools/kanban_create_task", &input)
.await?;
if !validation_result.valid {
let errors: Vec<String> = validation_result
.errors
.iter()
.map(|e| e.to_string())
.collect();
return (StatusCode::BAD_REQUEST, Json(json!({
"success": false,
"validation_errors": errors,
})));
}
// Use validated data with defaults applied
let validated_data = validation_result.validated_data.unwrap();
```
### 2. Validating Agent Task Assignments
```rust
// In AgentCoordinator
let input = serde_json::json!({
"role": role,
"title": title,
"description": description,
"context": context,
"priority": priority,
});
let validation_result = self
.validation
.validate("agents/task_assignment", &input)
.await?;
if !validation_result.valid {
warn!("Validation failed: {:?}", validation_result.errors);
return Err(CoordinatorError::ValidationError(
format!("Invalid input: {}", validation_result.errors.join(", "))
));
}
```
### 3. Hot Reloading Schemas
```rust
// Invalidate single schema
registry.invalidate("tools/kanban_create_task").await;
// Invalidate all schemas (useful for config reload)
registry.invalidate_all().await;
```
## Testing Schemas
### Validate Syntax
```bash
nickel typecheck schemas/tools/kanban_create_task.ncl
```
### Export as JSON
```bash
nickel export schemas/tools/kanban_create_task.ncl
```
### Query Specific Field
```bash
nickel query --field parameters.title schemas/tools/kanban_create_task.ncl
```
## Adding New Schemas
1. Create `.ncl` file in appropriate directory (`tools/` or `agents/`)
2. Define `tool_name` or `schema_name`
3. Define `parameters` or `fields` with types and contracts
4. Add `doc` annotations for documentation
5. Test with `nickel typecheck`
6. Restart services or use hot-reload
**Example:**
```nickel
# schemas/tools/my_new_tool.ncl
{
tool_name = "my_new_tool",
parameters = {
name
| String
| doc "User name (3-50 chars)"
| std.string.NonEmpty
| std.string.length.min 3
| std.string.length.max 50,
age
| Number
| doc "User age (0-120)"
| std.number.between 0 120,
email
| String
| doc "User email address"
| std.string.Email,
active
| Bool
| doc "Account status"
| default = true,
},
}
```
## Configuration
### Environment Variables
- `VAPORA_SCHEMA_DIR` — Schema directory path (default: `"schemas"`)
**In tests:**
```rust
std::env::set_var("VAPORA_SCHEMA_DIR", "../../schemas");
```
**In production:**
```bash
export VAPORA_SCHEMA_DIR=/app/schemas
```
## Performance Characteristics
- **Schema Loading:** ~5-10ms (first load, then cached)
- **Validation:** ~0.1-0.5ms per request (in-memory)
- **Hot Reload:** ~10-20ms (invalidates cache, reloads from disk)
**Optimization:** SchemaRegistry uses `Arc<RwLock<HashMap>>` for concurrent reads.
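A small sketch of the concurrency claim (assuming a tokio runtime): both lookups take only the read lock, so they proceed in parallel instead of serializing:
```rust
// Two concurrent cache reads; neither blocks the other.
let (a, b) = tokio::join!(
    registry.is_cached("tools/kanban_create_task"),
    registry.is_cached("agents/task_assignment"),
);
```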
## Security Considerations
### Timeout Protection
NickelBridge enforces 30s timeout on all CLI operations to prevent:
- Infinite loops in malicious Nickel code
- DoS attacks via crafted schemas
- Resource exhaustion
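A sketch of such a guard, assuming the bridge shells out through `tokio::process` (the real internals may differ):
```rust
use std::time::Duration;
use tokio::process::Command;

// Abort the evaluation if Nickel exceeds the 30s budget;
// kill_on_drop ensures the child process dies when the future is dropped.
let run = Command::new("nickel")
    .arg("typecheck")
    .arg("schemas/tools/kanban_create_task.ncl")
    .kill_on_drop(true)
    .output();
let output = tokio::time::timeout(Duration::from_secs(30), run).await??;
```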
### Input Sanitization
Contracts help mitigate:
- SQL injection (UUID/pattern validation rejects crafted identifiers)
- XSS payloads (length limits and pattern contracts on text fields)
- Resource exhaustion from oversized inputs (max length constraints)
- Type confusion (strict type checking)
### Schema Validation
All schemas must pass `nickel typecheck` before deployment.
## Error Handling
### ValidationResult
```rust
pub struct ValidationResult {
pub valid: bool,
pub errors: Vec<ValidationError>,
pub validated_data: Option<Value>,
}
pub enum ValidationError {
MissingField(String),
TypeMismatch { field: String, expected: String, got: String },
ContractViolation { field: String, contract: String, value: String },
InvalidSchema(String),
}
```
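A sketch of flattening the typed errors into the flat strings used in the response format below (exact message wording is illustrative):
```rust
// Illustrative mapping from typed errors to user-facing strings.
fn render(err: &ValidationError) -> String {
    match err {
        ValidationError::MissingField(f) => format!("Required field missing: {f}"),
        ValidationError::TypeMismatch { field, expected, got } => {
            format!("Field '{field}' expected {expected}, got {got}")
        }
        ValidationError::ContractViolation { field, contract, value } => {
            format!("Field '{field}' = '{value}' violates contract {contract}")
        }
        ValidationError::InvalidSchema(s) => format!("Invalid schema: {s}"),
    }
}
```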
### Error Response Format
```json
{
"success": false,
"error": "Validation failed",
"validation_errors": [
"Field 'project_id' must match UUID pattern",
"Field 'title' must be at least 3 characters",
"Field 'priority' must be between 0 and 100"
]
}
```
## Troubleshooting
### Schema Not Found
**Error:** `Schema file not found: schemas/tools/my_tool.ncl`
**Solution:** Check `VAPORA_SCHEMA_DIR` environment variable and ensure schema file exists.
### Nickel CLI Not Available
**Error:** `Nickel CLI not found in PATH`
**Solution:** Install Nickel CLI:
```bash
cargo install nickel-lang-cli
```
### Validation Always Fails
**Error:** All requests rejected with validation errors
**Solution:** Check schema syntax with `nickel typecheck`, verify field names match exactly.
## Future Enhancements
- [ ] Remote schema loading (HTTP/S3)
- [ ] Schema versioning and migration
- [ ] Custom contract plugins
- [ ] GraphQL schema generation from Nickel
- [ ] OpenAPI spec generation
## Related Documentation
- [Nickel Language Documentation](https://nickel-lang.org/)
- [VAPORA Architecture Overview](vapora-architecture.md)
- [Agent Coordination](agent-registry-coordination.md)
- [MCP Protocol Integration](../integrations/mcp-server.md)
## References
- **Implementation:** `crates/vapora-shared/src/validation/`
- **Schemas:** `schemas/`
- **Tests:** `crates/vapora-shared/tests/validation_integration.rs`
- **MCP Integration:** `crates/vapora-mcp-server/src/main.rs`
- **Agent Integration:** `crates/vapora-agents/src/coordinator.rs`

128
schemas/README.md Normal file
View File

@ -0,0 +1,128 @@
# VAPORA Validation Schemas
Nickel schemas for runtime validation of MCP tools and agent tasks.
## Directory Structure
```text
schemas/
├── tools/ # MCP tool parameter validation
│ ├── kanban_create_task.ncl
│ ├── kanban_update_task.ncl
│ ├── assign_task_to_agent.ncl
│ ├── get_project_summary.ncl
│ └── get_agent_capabilities.ncl
└── agents/ # Agent task assignment validation
└── task_assignment.ncl
```
## Schema Format
Nickel schemas define:
- **Field types** (String, Number, Bool, Array, Object)
- **Required/optional fields** (via `default` values)
- **Validation contracts** (NonEmpty, Email, UUID, Range, Pattern, etc.)
- **Documentation** (via `doc` annotations)
### Example
```nickel
{
tool_name = "example_tool",
parameters = {
# Required field with contracts
user_id
| String
| doc "User UUID"
| std.string.NonEmpty
| std.string.match "^[0-9a-f]{8}-[0-9a-f]{4}-...$",
# Optional field with default
priority
| Number
| doc "Priority score (0-100)"
| std.number.between 0 100
| default = 50,
},
}
```
## Supported Contracts
| Contract | Description | Example |
|----------|-------------|---------|
| `std.string.NonEmpty` | String cannot be empty | Required text fields |
| `std.string.length.min N` | Minimum length | `min 3` for titles |
| `std.string.length.max N` | Maximum length | `max 200` for titles |
| `std.string.match PATTERN` | Regex validation | UUID format |
| `std.number.between A B` | Numeric range | `between 0 100` |
| `std.number.greater_than N` | Minimum value (exclusive) | `> -1` |
| `std.number.less_than N` | Maximum value (exclusive) | `< 1000` |
| `std.string.Email` | Email format | user@example.com |
| `std.string.Url` | URL format | https://... |
| `std.string.Uuid` | UUID format | xxxxxxxx-xxxx-... |
| `std.enum.TaggedUnion` | Enum validation | `[| 'low, 'high |]` |
## Usage
Schemas are loaded by `ValidationPipeline` at runtime:
```rust
use std::path::PathBuf;
use std::sync::Arc;
use vapora_shared::validation::{SchemaRegistry, ValidationPipeline};
let registry = Arc::new(SchemaRegistry::new(PathBuf::from("schemas")));
let pipeline = ValidationPipeline::new(registry);
// Validate MCP tool input
let result = pipeline.validate("tools/kanban_create_task", &input).await?;
if !result.valid {
return (StatusCode::BAD_REQUEST, Json(result.errors));
}
```
## Testing Schemas
Validate schema syntax:
```bash
nickel typecheck schemas/tools/kanban_create_task.ncl
```
Export schema as JSON:
```bash
nickel export schemas/tools/kanban_create_task.ncl
```
Query specific field:
```bash
nickel query --field parameters.title schemas/tools/kanban_create_task.ncl
```
## Adding New Schemas
1. Create `.ncl` file in appropriate directory
2. Define `tool_name` or `schema_name`
3. Define `parameters` or `fields` with types and contracts
4. Add `doc` annotations for documentation
5. Test with `nickel typecheck`
6. Restart services to reload schemas (or use hot-reload)
## Hot Reload
Invalidate cached schema without restart:
```rust
registry.invalidate("tools/kanban_create_task").await;
```
Invalidate all schemas:
```rust
registry.invalidate_all().await;
```

View File

@ -0,0 +1,71 @@
# Schema for agent task assignment
# Used by AgentCoordinator for internal task assignment validation
{
schema_name = "task_assignment",
description = "Validates agent task assignment requests from AgentCoordinator",
fields = {
# Agent role - must match registered roles
role
| String
| doc "Agent role for task execution"
| std.string.NonEmpty
| std.enum.TaggedUnion
| [| 'developer, 'reviewer, 'architect, 'tester, 'documenter, 'devops, 'monitor, 'security |],
# Task title
title
| String
| doc "Task title (3-500 chars)"
| std.string.NonEmpty
| std.string.length.min 3
| std.string.length.max 500,
# Task description
description
| String
| doc "Detailed task description"
| default = "",
# Task context (JSON string)
context
| String
| doc "JSON context for task execution"
| default = "{}",
# Priority score (0-100)
priority
| Number
| doc "Task priority score (0=lowest, 100=highest)"
| std.number.between 0 100
| default = 50,
# Optional deadline (ISO 8601 datetime string)
deadline
| String
| doc "Task deadline (ISO 8601 format, optional)"
| default = "",
},
# Custom validation contracts
contracts = {
# Title must not be only whitespace
title_not_whitespace = fun fields =>
std.string.trim fields.title != "",
# Context must be valid JSON
context_valid_json = fun fields =>
if fields.context == "{}" then
true
else
std.string.is_match "^[\\{\\[]" fields.context,
# If deadline is provided, must match ISO 8601 pattern
deadline_iso8601 = fun fields =>
if fields.deadline == "" then
true
else
std.string.is_match "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}" fields.deadline,
},
}

View File

@ -0,0 +1,56 @@
# Schema for assign_task_to_agent MCP tool
# Validates agent task assignment requests
{
tool_name = "assign_task_to_agent",
description = "Assign a task to an agent by role",
parameters = {
# Agent role - must be valid role
role
| String
| doc "Agent role"
| std.string.NonEmpty
| std.enum.TaggedUnion
| [| 'developer, 'reviewer, 'architect, 'tester, 'documenter, 'devops, 'monitor, 'security |],
# Task title
task_title
| String
| doc "Task title (3-500 chars)"
| std.string.NonEmpty
| std.string.length.min 3
| std.string.length.max 500,
# Task description
task_description
| String
| doc "Task description (detailed requirements)"
| std.string.NonEmpty
| std.string.length.min 10,
# Optional priority (0-100)
priority
| Number
| doc "Task priority (0-100, default: 50)"
| std.number.between 0 100
| default = 50,
# Optional context (JSON string)
context
| String
| doc "Additional context as JSON string (optional)"
| default = "{}",
},
# Validation contracts
contracts = {
# Context must be valid JSON if provided
context_is_json = fun params =>
if params.context == "{}" then
true
else
# Simple JSON validation - starts with { or [
std.string.is_match "^[\\{\\[]" params.context,
},
}

View File

@ -0,0 +1,16 @@
# Schema for get_agent_capabilities MCP tool
# Validates agent capability requests
{
tool_name = "get_agent_capabilities",
description = "Get detailed agent capabilities and status",
parameters = {
# Agent UUID or ID
agent_id
| String
| doc "Agent UUID or unique identifier"
| std.string.NonEmpty
| std.string.length.min 3,
},
}

View File

@ -0,0 +1,16 @@
# Schema for get_project_summary MCP tool
# Validates project summary requests
{
tool_name = "get_project_summary",
description = "Get project summary and statistics",
parameters = {
# Project UUID
project_id
| String
| doc "UUID of the project"
| std.string.NonEmpty
| std.string.match "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
},
}

View File

@ -0,0 +1,45 @@
# Schema for kanban_create_task MCP tool
# Validates task creation requests with contracts
{
tool_name = "kanban_create_task",
description = "Create task in Kanban board with validation",
parameters = {
# Project UUID - must be valid UUID format
project_id
| String
| doc "UUID of the target project"
| std.string.NonEmpty
| std.string.match "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
# Task title - 3 to 200 characters
title
| String
| doc "Task title (3-200 chars)"
| std.string.NonEmpty
| std.string.length.min 3
| std.string.length.max 200,
# Optional description
description
| String
| doc "Task description (optional)"
| default = "",
# Priority level - enum validation
priority
| String
| doc "Task priority level"
| std.string.NonEmpty
| std.enum.TaggedUnion
| [| 'low, 'medium, 'high, 'critical |],
},
# Validation contracts (custom predicates)
contracts = {
# Title must not be only whitespace
title_not_whitespace = fun params =>
std.string.trim params.title != "",
},
}

View File

@ -0,0 +1,31 @@
# Schema for kanban_update_task MCP tool
# Validates task status updates and reordering
{
tool_name = "kanban_update_task",
description = "Update task status and order in Kanban board",
parameters = {
# Task UUID
task_id
| String
| doc "UUID of the task to update"
| std.string.NonEmpty
| std.string.match "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
# New status - enum validation
status
| String
| doc "New task status"
| std.string.NonEmpty
| std.enum.TaggedUnion
| [| 'todo, 'doing, 'review, 'done |],
# Optional order within column
order
| Number
| doc "Order within column (0-based index, optional)"
| std.number.greater_than (-1)
| default = 0,
},
}