diff --git a/Cargo.toml b/Cargo.toml index 0c62f9c..488f90b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,13 @@ palette = { version = "0.7", features = ["serializing"] } # Database sqlx = { version = "0.8", features = ["runtime-tokio-native-tls", "sqlite", "macros"] } sqlx-sqlite = "0.8" -surrealdb = { version = "2.3", features = ["kv-mem", "kv-rocksdb"] } +surrealdb = { version = "3", features = ["kv-mem", "kv-surrealkv", "kv-rocksdb", "protocol-ws", "rustls"] } +platform-nats = { path = "../stratumiops/crates/platform-nats" } +stratum-orchestrator = { path = "../stratumiops/crates/stratum-orchestrator" } +stratum-graph = { path = "../stratumiops/crates/stratum-graph" } +stratum-state = { path = "../stratumiops/crates/stratum-state" } +kogral-core = { path = "../kogral/crates/kogral-core" } +bytes = "1.9" serde_bytes = "0.11" # Logging/Tracing diff --git a/config/provctl/installer-settings.toml b/config/provctl/installer-settings.toml index f2e5050..31b1af2 100644 --- a/config/provctl/installer-settings.toml +++ b/config/provctl/installer-settings.toml @@ -47,7 +47,7 @@ default = "sqlite" type = "sqlite" name = "SQLite" description = "File-based, no server" -platforms = [linux, macos, windows] +platforms = ["linux", "macos", "windows"] # Installation steps/checklist [checklist] diff --git a/core/Cargo.toml b/core/Cargo.toml index 5c6a237..683cf11 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,77 +1,3 @@ -# This is a virtual manifest grouped under the parent workspace in /Users/Akasha/Development/syntaxis/Cargo.toml -# All workspace configuration, dependencies, and profiles are defined in the root workspace - -[workspace.package] -version = "0.1.0" -edition = "2021" -rust-version = "1.75" -authors = ["syntaxis contributors"] -license = "MIT OR Apache-2.0" -repository = "https://github.com/syntaxis/core" - -[workspace.dependencies] -# Serialization -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -toml = "0.9" -uuid = { version = "1.18", features = ["v4", "serde"] } - -# Error handling -thiserror = "2.0" -anyhow = "1.0" - -# Async runtime -tokio = { version = "1.48", features = ["full"] } -async-trait = "0.1" -futures = "0.3" - -# Web framework -axum = { version = "0.8", features = ["ws"] } -tower = "0.5" -tower-http = { version = "0.6", features = ["trace", "cors", "fs"] } -tokio-rustls = "0.26" -rustls = "0.23" -rustls-pemfile = "2.2" - -# HTTP client -reqwest = { version = "0.12", features = ["json"] } - -# Logging/Tracing -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } - -# Date/time -chrono = { version = "0.4", features = ["serde"] } - -# File operations -camino = "1.2" -walkdir = "2.5" - -# Templating -handlebars = "6.3" - -# Database -sqlx = { version = "0.8", features = ["runtime-tokio-native-tls", "sqlite", "macros"] } -sqlx-sqlite = "0.8" -surrealdb = { version = "2.3", features = ["kv-mem", "kv-rocksdb"] } -serde_bytes = "0.11" - -# Other utilities -indexmap = "2.12" -regex = "1.12" -moka = { version = "0.12", features = ["future"] } -tokio-tungstenite = "0.28" -jsonwebtoken = { version = "10.2", features = ["aws_lc_rs"] } -once_cell = "1.21" -prometheus = { version = "0.14", features = ["process"] } -async-nats = "0.45" -rand_core = "0.6" -rand = "0.8" - -# Dev dependencies -tokio-test = "0.4" -tempfile = "3.23" -assert_cmd = "2.1" -predicates = "3.1" -criterion = { version = "0.7", features = ["html_reports"] } -mockito = "1.6" +# Sub-directory grouping file — not a workspace root. +# Workspace definitions live in /Users/Akasha/Development/syntaxis/Cargo.toml. +# Packages in core/crates/ resolve { workspace = true } deps from the root workspace. diff --git a/core/crates/api/src/main.rs b/core/crates/api/src/main.rs index d040ec3..0db3344 100644 --- a/core/crates/api/src/main.rs +++ b/core/crates/api/src/main.rs @@ -207,8 +207,7 @@ async fn main() -> Result<(), Box> { ); // Initialize JWT provider if enabled - let jwt_prov = if auth_config.jwt_enabled && auth_config.jwt_secret.is_some() { - let secret = auth_config.jwt_secret.as_ref().unwrap(); + let jwt_prov = if let Some(secret) = auth_config.jwt_secret.as_ref().filter(|_| auth_config.jwt_enabled) { let expiration = auth_config.jwt_expiration as i64; match shared_api_lib::auth::jwt::JwtProvider::new(secret, expiration) { diff --git a/core/crates/cli/.vapora/config/project.toml b/core/crates/cli/.vapora/config/project.toml index 081d304..31a2d5c 100644 --- a/core/crates/cli/.vapora/config/project.toml +++ b/core/crates/cli/.vapora/config/project.toml @@ -3,7 +3,7 @@ mode = "Standalone" primary_language = "rust" languages = ["rust"] phase = "Creation" -database_path = ".project-tools/data/lifecycle.db" +database_path = "data.db" [project] name = "syntaxis-cli" @@ -13,8 +13,8 @@ authors = ["Your Name"] license = "MIT" keywords = [] categories = [] -created = "2025-11-17T00:36:54.732694Z" -last_updated = "2025-11-17T00:36:54.732694Z" +created = "2026-02-22T11:10:00.776028Z" +last_updated = "2026-02-22T11:10:00.776028Z" [phases] allowed_transitions = [] @@ -31,7 +31,7 @@ include_dev = false [export] format = "json" -output_path = ".project-tools/exports/project-{timestamp}.json" +output_path = "exports/project-{timestamp}.json" include_metadata = true include_tags = true pretty = true diff --git a/core/crates/cli/Cargo.toml b/core/crates/cli/Cargo.toml index 2ee5ca8..2f2dda7 100644 --- a/core/crates/cli/Cargo.toml +++ b/core/crates/cli/Cargo.toml @@ -9,7 +9,7 @@ repository.workspace = true description = "CLI tool for syntaxis management" [[bin]] -name = "syntaxis-cli" +name = "syntaxis" path = "src/main.rs" [package.metadata.syntaxis] diff --git a/core/crates/cli/src/handlers/project_resolver.rs b/core/crates/cli/src/handlers/project_resolver.rs index 5825bdc..cc107e6 100644 --- a/core/crates/cli/src/handlers/project_resolver.rs +++ b/core/crates/cli/src/handlers/project_resolver.rs @@ -62,8 +62,8 @@ async fn select_project_interactive() -> Result { if projects.is_empty() { let app_name = ui_config::get_app_name(); return Err(anyhow!( - "No projects found in database. Run '{}init' first.", - format!("{} ", app_name) + "No projects found in database. Run '{} init' first.", + app_name )); } diff --git a/core/crates/cli/src/main.rs b/core/crates/cli/src/main.rs index ba11499..c235c85 100644 --- a/core/crates/cli/src/main.rs +++ b/core/crates/cli/src/main.rs @@ -484,7 +484,7 @@ async fn main() -> Result<()> { // Find config file let config_path = find_config_path_warn_conflicts( "config.toml", - cli.config.as_ref().map(|s| std::path::Path::new(s)), + cli.config.as_deref().map(std::path::Path::new), ) .ok_or_else(|| { anyhow::anyhow!( diff --git a/core/crates/cli/tests/cli_integration_tests.rs b/core/crates/cli/tests/cli_integration_tests.rs index 5a77982..2fccb2d 100644 --- a/core/crates/cli/tests/cli_integration_tests.rs +++ b/core/crates/cli/tests/cli_integration_tests.rs @@ -9,8 +9,8 @@ use tempfile::TempDir; /// Create a temporary test directory with a valid lifecycle.toml config fn create_test_fixture() -> TempDir { let temp_dir = TempDir::new().expect("Failed to create temp directory"); - let config_dir = temp_dir.path().join(".project"); - fs::create_dir_all(&config_dir).expect("Failed to create .project directory"); + let config_dir = temp_dir.path().join(".syntaxis"); + fs::create_dir_all(&config_dir).expect("Failed to create .syntaxis directory"); let config_content = r#"project_type = "MultiLang" mode = "Standalone" diff --git a/core/crates/syntaxis-bridge/Cargo.toml b/core/crates/syntaxis-bridge/Cargo.toml index fcc0bac..2d946ff 100644 --- a/core/crates/syntaxis-bridge/Cargo.toml +++ b/core/crates/syntaxis-bridge/Cargo.toml @@ -1,6 +1,3 @@ -[workspace] -# Independent package - not part of any workspace - [package] name = "syntaxis-bridge" version = "0.1.0" diff --git a/core/crates/syntaxis-bridge/src/registry.rs b/core/crates/syntaxis-bridge/src/registry.rs index 4a87559..22d3363 100644 --- a/core/crates/syntaxis-bridge/src/registry.rs +++ b/core/crates/syntaxis-bridge/src/registry.rs @@ -42,7 +42,7 @@ impl IntegrationRegistry { &self, name: &str, ) -> Result, EcosystemIntegrationError> { - self.integrations.get(name).map(|i| Arc::clone(i)).ok_or( + self.integrations.get(name).map(Arc::clone).ok_or( EcosystemIntegrationError::NotFound { name: name.to_string(), }, diff --git a/core/crates/syntaxis/Cargo.toml b/core/crates/syntaxis/Cargo.toml index 88e42f7..0b86aca 100644 --- a/core/crates/syntaxis/Cargo.toml +++ b/core/crates/syntaxis/Cargo.toml @@ -62,6 +62,5 @@ tokio-test = { workspace = true } tempfile = { workspace = true } [features] -# Enable SurrealDB support (includes embedded database) -surrealdb = ["dep:surrealdb"] -default = ["surrealdb"] +surrealdb-backend = ["dep:surrealdb"] +default = ["surrealdb-backend"] diff --git a/core/crates/syntaxis/src/lib.rs b/core/crates/syntaxis/src/lib.rs index b285f44..29f1881 100644 --- a/core/crates/syntaxis/src/lib.rs +++ b/core/crates/syntaxis/src/lib.rs @@ -49,8 +49,8 @@ pub use config::{ SbomFormat, }; pub use error::{LifecycleError, Result}; -#[cfg(feature = "surrealdb")] -pub use persistence::SurrealDatabase; +#[cfg(feature = "surrealdb-backend")] +pub use persistence::{SurrealDatabase, SurrealDbBackendConfig, SurrealEngineConfig}; pub use persistence::{ Database, DatabaseConfig, DbChecklistItem, DbPhaseHistory, DbPhaseTransition, DbProject, DbSecurityAssessment, DbSecurityAssessmentDetail, DbTeamMember, DbToolConfiguration, diff --git a/core/crates/syntaxis/src/persistence/config.rs b/core/crates/syntaxis/src/persistence/config.rs index 3da2b6c..f9cee6e 100644 --- a/core/crates/syntaxis/src/persistence/config.rs +++ b/core/crates/syntaxis/src/persistence/config.rs @@ -3,13 +3,6 @@ //! Provides configuration structures for different database backends. //! Configuration is typically loaded from TOML files and can be overridden //! by environment variables. -//! -//! # Examples -//! -//! ```ignore -//! let config = DatabaseConfig::load("configs/database.toml")?; -//! let db = config.create_database().await?; -//! ``` use serde::{Deserialize, Serialize}; use std::path::Path; @@ -23,10 +16,11 @@ pub struct DatabaseConfig { /// SQLite configuration (required if engine is "sqlite") pub sqlite: Option, - /// SurrealDB configuration (required if engine is "surrealdb") - pub surrealdb: Option, + /// SurrealDB dual-engine configuration (required if engine is "surrealdb") + #[cfg(feature = "surrealdb-backend")] + pub surrealdb: Option, - /// Optional PostgreSQL configuration for future support + /// PostgreSQL configuration (future support) pub postgresql: Option, } @@ -36,90 +30,125 @@ pub struct SqliteConfig { /// Path to SQLite database file pub path: String, - /// Maximum number of connections in pool (default: 5) + /// Maximum number of connections in pool #[serde(default = "default_sqlite_pool_size")] pub max_connections: u32, - /// Connection timeout in seconds (default: 30) + /// Connection timeout in seconds #[serde(default = "default_timeout_secs")] pub timeout_secs: u64, - /// Enable write-ahead logging (default: true) + /// Enable write-ahead logging #[serde(default = "default_true")] pub wal_mode: bool, - /// PRAGMA synchronous setting: OFF, NORMAL, FULL (default: NORMAL) + /// PRAGMA synchronous setting: OFF, NORMAL, FULL #[serde(default = "default_pragma_synchronous")] pub pragma_synchronous: String, - /// PRAGMA cache_size in pages (default: 2000) + /// PRAGMA cache_size in pages #[serde(default = "default_cache_size")] pub pragma_cache_size: i32, } -/// SurrealDB-specific configuration +/// SurrealDB engine selector — dispatches via `engine::any::connect(url)`. +/// +/// Serialized with an internal `engine` tag so TOML records map directly: +/// `{ engine = "surreal_kv", path = ".data/syntaxis/core" }` → `SurrealKv`. +#[cfg(feature = "surrealdb-backend")] #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SurrealDbConfig { - /// SurrealDB server URL (e.g., "ws://localhost:8000") - pub url: String, - - /// Namespace for isolation (default: "syntaxis") - #[serde(default = "default_namespace")] - pub namespace: String, - - /// Database name (default: "projects") - #[serde(default = "default_database")] - pub database: String, - - /// Username for authentication - pub username: String, - - /// Password for authentication - pub password: String, - - /// Maximum connections (default: 10) - #[serde(default = "default_surrealdb_pool_size")] - pub max_connections: u32, - - /// Connection timeout in seconds (default: 30) - #[serde(default = "default_timeout_secs")] - pub timeout_secs: u64, - - /// Enable TLS (default: false) - #[serde(default)] - pub tls_enabled: bool, - - /// TLS certificate path (optional) - pub tls_cert_path: Option, +#[serde(tag = "engine", rename_all = "snake_case")] +pub enum SurrealEngineConfig { + /// In-memory — tests and ephemeral use only + Mem, + /// SurrealKV embedded (B-tree) — default for relational data (projects, tasks, phases) + SurrealKv { + /// Filesystem path for the SurrealKV database directory + path: String, + }, + /// RocksDB embedded (LSM) — default for append-heavy data (audit logs) + RocksDb { + /// Filesystem path for the RocksDB database directory + path: String, + }, + /// Remote WebSocket — team and cloud deployments + Ws { + /// Full WebSocket URL, e.g. `ws://surrealdb.internal:8000` + url: String, + }, } -/// PostgreSQL configuration (for future support) +#[cfg(feature = "surrealdb-backend")] +impl SurrealEngineConfig { + /// Produce the URL string consumed by `surrealdb::engine::any::connect`. + #[must_use] + pub fn to_url(&self) -> String { + match self { + Self::Mem => "mem://".to_string(), + Self::SurrealKv { path } => format!("surrealkv://{path}"), + Self::RocksDb { path } => format!("rocksdb://{path}"), + Self::Ws { url } => url.clone(), + } + } +} + +/// Dual-engine SurrealDB backend configuration. +/// +/// `core` (default: SurrealKV) stores relational/graph data with a B-tree engine +/// suited for random-access patterns. `hot` (default: RocksDB) stores audit logs +/// and append-heavy data with an LSM engine suited for sequential writes. +#[cfg(feature = "surrealdb-backend")] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SurrealDbBackendConfig { + /// Engine for relational data (projects, tasks, phases, checklists) + #[serde(default = "default_core_engine")] + pub core: SurrealEngineConfig, + /// Engine for hot/append data (phase history, future audit embeddings) + #[serde(default = "default_hot_engine")] + pub hot: SurrealEngineConfig, + /// SurrealDB namespace shared by both engines + #[serde(default = "default_surreal_namespace")] + pub namespace: String, + /// Auth username (used for Ws engine only) + pub username: Option, + /// Auth password (used for Ws engine only) + pub password: Option, +} + +#[cfg(feature = "surrealdb-backend")] +impl Default for SurrealDbBackendConfig { + fn default() -> Self { + Self { + core: default_core_engine(), + hot: default_hot_engine(), + namespace: default_surreal_namespace(), + username: None, + password: None, + } + } +} + +/// PostgreSQL configuration (future support) #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PostgresConfig { /// Connection string pub url: String, - /// Maximum connections (default: 10) + /// Maximum connections #[serde(default = "default_postgres_pool_size")] pub max_connections: u32, - /// Connection timeout in seconds (default: 30) + /// Connection timeout in seconds #[serde(default = "default_timeout_secs")] pub timeout_secs: u64, } -// ============================================================================ -// DEFAULT IMPLEMENTATIONS -// ============================================================================ +// ── default constructors ───────────────────────────────────────────────────── fn default_sqlite_pool_size() -> u32 { 5 } -fn default_surrealdb_pool_size() -> u32 { - 10 -} - fn default_postgres_pool_size() -> u32 { 10 } @@ -140,20 +169,29 @@ fn default_cache_size() -> i32 { 2000 } -fn default_namespace() -> String { +#[cfg(feature = "surrealdb-backend")] +fn default_core_engine() -> SurrealEngineConfig { + SurrealEngineConfig::SurrealKv { + path: ".data/syntaxis/core".to_string(), + } +} + +#[cfg(feature = "surrealdb-backend")] +fn default_hot_engine() -> SurrealEngineConfig { + SurrealEngineConfig::RocksDb { + path: ".data/syntaxis/hot".to_string(), + } +} + +#[cfg(feature = "surrealdb-backend")] +fn default_surreal_namespace() -> String { "syntaxis".to_string() } -fn default_database() -> String { - "projects".to_string() -} - -// ============================================================================ -// IMPLEMENTATIONS -// ============================================================================ +// ── DatabaseConfig impl ────────────────────────────────────────────────────── impl DatabaseConfig { - /// Create a default SQLite configuration + /// Create a default SQLite configuration. pub fn sqlite_default(path: &str) -> Self { Self { engine: "sqlite".to_string(), @@ -165,45 +203,54 @@ impl DatabaseConfig { pragma_synchronous: "NORMAL".to_string(), pragma_cache_size: 2000, }), + #[cfg(feature = "surrealdb-backend")] surrealdb: None, postgresql: None, } } - /// Create a default SurrealDB configuration - pub fn surrealdb_default(url: &str, username: &str, password: &str) -> Self { + /// Create a default SurrealDB configuration using embedded engines. + #[cfg(feature = "surrealdb-backend")] + pub fn surrealdb_default() -> Self { Self { engine: "surrealdb".to_string(), sqlite: None, - surrealdb: Some(SurrealDbConfig { - url: url.to_string(), - namespace: default_namespace(), - database: default_database(), - username: username.to_string(), - password: password.to_string(), - max_connections: default_surrealdb_pool_size(), - timeout_secs: default_timeout_secs(), - tls_enabled: false, - tls_cert_path: None, + surrealdb: Some(SurrealDbBackendConfig::default()), + postgresql: None, + } + } + + /// Create a WebSocket-connected SurrealDB configuration. + #[cfg(feature = "surrealdb-backend")] + pub fn surrealdb_ws(url: &str, username: &str, password: &str) -> Self { + Self { + engine: "surrealdb".to_string(), + sqlite: None, + surrealdb: Some(SurrealDbBackendConfig { + core: SurrealEngineConfig::Ws { url: url.to_string() }, + hot: SurrealEngineConfig::Ws { url: url.to_string() }, + namespace: default_surreal_namespace(), + username: Some(username.to_string()), + password: Some(password.to_string()), }), postgresql: None, } } - /// Load configuration from TOML file + /// Load configuration from a TOML file. pub fn load_from_file>(path: P) -> crate::error::Result { let contents = std::fs::read_to_string(path)?; Self::load_from_str(&contents) } - /// Load configuration from TOML string + /// Load configuration from a TOML string. pub fn load_from_str(toml: &str) -> crate::error::Result { toml::from_str(toml).map_err(|e| { - crate::error::LifecycleError::Config(format!("Failed to parse config: {}", e)) + crate::error::LifecycleError::Config(format!("Failed to parse config: {e}")) }) } - /// Validate configuration + /// Validate that the selected engine has a matching configuration block. pub fn validate(&self) -> crate::error::Result<()> { match self.engine.as_str() { "sqlite" => { @@ -215,12 +262,21 @@ impl DatabaseConfig { Ok(()) } "surrealdb" => { - self.surrealdb.as_ref().ok_or_else(|| { - crate::error::LifecycleError::Config( - "SurrealDB engine selected but no surrealdb config provided".to_string(), - ) - })?; - Ok(()) + #[cfg(feature = "surrealdb-backend")] + { + self.surrealdb.as_ref().ok_or_else(|| { + crate::error::LifecycleError::Config( + "SurrealDB engine selected but no surrealdb config provided" + .to_string(), + ) + })?; + Ok(()) + } + #[cfg(not(feature = "surrealdb-backend"))] + Err(crate::error::LifecycleError::Config( + "SurrealDB engine is not compiled in; enable feature 'surrealdb-backend'" + .to_string(), + )) } "postgresql" => { self.postgresql.as_ref().ok_or_else(|| { @@ -231,28 +287,24 @@ impl DatabaseConfig { Ok(()) } other => Err(crate::error::LifecycleError::Config(format!( - "Unknown database engine: {}", - other + "Unknown database engine: {other}" ))), } } - /// Get the engine type - pub fn get_engine(&self) -> &str { - &self.engine - } - - /// Check if SQLite backend is configured + /// Returns true if SQLite backend is configured. pub fn is_sqlite(&self) -> bool { self.engine == "sqlite" } - /// Check if SurrealDB backend is configured + /// Returns true if SurrealDB backend is configured. pub fn is_surrealdb(&self) -> bool { self.engine == "surrealdb" } } +// ── tests ──────────────────────────────────────────────────────────────────── + #[cfg(test)] mod tests { use super::*; @@ -265,18 +317,6 @@ mod tests { assert_eq!(config.sqlite.unwrap().path, "test.db"); } - #[test] - fn test_surrealdb_default_config() { - let config = DatabaseConfig::surrealdb_default("ws://localhost:8000", "user", "pass"); - assert!(config.is_surrealdb()); - assert!(config.surrealdb.is_some()); - - let sdb = config.surrealdb.unwrap(); - assert_eq!(sdb.url, "ws://localhost:8000"); - assert_eq!(sdb.username, "user"); - assert_eq!(sdb.password, "pass"); - } - #[test] fn test_validate_sqlite() { let config = DatabaseConfig::sqlite_default("test.db"); @@ -288,6 +328,7 @@ mod tests { let config = DatabaseConfig { engine: "mongodb".to_string(), sqlite: None, + #[cfg(feature = "surrealdb-backend")] surrealdb: None, postgresql: None, }; @@ -324,22 +365,80 @@ max_connections = 10 assert!(config.wal_mode); } - #[test] - fn test_surrealdb_config_defaults() { - let config = SurrealDbConfig { - url: "ws://localhost:8000".to_string(), - namespace: default_namespace(), - database: default_database(), - username: "root".to_string(), - password: "root".to_string(), - max_connections: default_surrealdb_pool_size(), - timeout_secs: default_timeout_secs(), - tls_enabled: false, - tls_cert_path: None, - }; + #[cfg(feature = "surrealdb-backend")] + mod surreal_tests { + use super::*; - assert_eq!(config.namespace, "syntaxis"); - assert_eq!(config.database, "projects"); - assert_eq!(config.max_connections, 10); + #[test] + fn test_surrealdb_default_config() { + let config = DatabaseConfig::surrealdb_default(); + assert!(config.is_surrealdb()); + assert!(config.surrealdb.is_some()); + } + + #[test] + fn test_surrealdb_ws_config() { + let config = + DatabaseConfig::surrealdb_ws("ws://localhost:8000", "root", "root"); + assert!(config.is_surrealdb()); + let cfg = config.surrealdb.unwrap(); + assert_eq!(cfg.username, Some("root".to_string())); + assert!(matches!(cfg.core, SurrealEngineConfig::Ws { .. })); + } + + #[test] + fn test_engine_to_url_mem() { + assert_eq!(SurrealEngineConfig::Mem.to_url(), "mem://"); + } + + #[test] + fn test_engine_to_url_surrealkv() { + let eng = SurrealEngineConfig::SurrealKv { + path: "/data/core".to_string(), + }; + assert_eq!(eng.to_url(), "surrealkv:///data/core"); + } + + #[test] + fn test_engine_to_url_rocksdb() { + let eng = SurrealEngineConfig::RocksDb { + path: "/data/hot".to_string(), + }; + assert_eq!(eng.to_url(), "rocksdb:///data/hot"); + } + + #[test] + fn test_engine_to_url_ws() { + let eng = SurrealEngineConfig::Ws { + url: "ws://host:8000".to_string(), + }; + assert_eq!(eng.to_url(), "ws://host:8000"); + } + + #[test] + fn test_backend_config_default() { + let cfg = SurrealDbBackendConfig::default(); + assert_eq!(cfg.namespace, "syntaxis"); + assert!(cfg.username.is_none()); + assert!(matches!(cfg.core, SurrealEngineConfig::SurrealKv { .. })); + assert!(matches!(cfg.hot, SurrealEngineConfig::RocksDb { .. })); + } + + #[test] + fn test_validate_surrealdb() { + let config = DatabaseConfig::surrealdb_default(); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_validate_surrealdb_missing_config() { + let config = DatabaseConfig { + engine: "surrealdb".to_string(), + sqlite: None, + surrealdb: None, + postgresql: None, + }; + assert!(config.validate().is_err()); + } } } diff --git a/core/crates/syntaxis/src/persistence/mod.rs b/core/crates/syntaxis/src/persistence/mod.rs index 3d97b39..5aa83db 100644 --- a/core/crates/syntaxis/src/persistence/mod.rs +++ b/core/crates/syntaxis/src/persistence/mod.rs @@ -26,14 +26,16 @@ pub mod config; pub mod error; pub mod migration; pub mod sqlite_impl; -#[cfg(feature = "surrealdb")] +#[cfg(feature = "surrealdb-backend")] pub mod surrealdb_impl; // Re-export public types pub use config::DatabaseConfig; +#[cfg(feature = "surrealdb-backend")] +pub use config::{SurrealDbBackendConfig, SurrealEngineConfig}; pub use error::PersistenceError; pub use sqlite_impl::SqliteDatabase; -#[cfg(feature = "surrealdb")] +#[cfg(feature = "surrealdb-backend")] pub use surrealdb_impl::SurrealDatabase; use crate::error::Result; diff --git a/core/crates/syntaxis/src/persistence/surrealdb_impl.rs b/core/crates/syntaxis/src/persistence/surrealdb_impl.rs index 506a5a1..a98b043 100644 --- a/core/crates/syntaxis/src/persistence/surrealdb_impl.rs +++ b/core/crates/syntaxis/src/persistence/surrealdb_impl.rs @@ -1,280 +1,284 @@ -//! SurrealDB database implementation for version 2.3+ +//! SurrealDB 3 storage backend — `engine::any` dual-engine dispatcher //! -//! This module provides a complete SurrealDB implementation of the Database trait. -//! It supports both embedded (in-memory and RocksDB) and network modes. +//! Uses a single `Surreal` type for all engine variants, selected at +//! runtime via URL scheme. The dual-engine layout separates: //! -//! Enabled with the `surrealdb` feature (enabled by default). +//! - `core_db` → SurrealKV (default) — B-tree, suited for relational/graph data +//! (projects, tasks, phases, checklists, tools, team) +//! - `hot_db` → RocksDB (default) — LSM, suited for append-heavy data +//! (phase history / audit logs, future embeddings) //! -//! # Features +//! ## SurrealDB 3 and type safety //! -//! - Full async/await support with SurrealDB 2.3+ -//! - Automatic schema initialization -//! - Type-safe queries via serde -//! - Support for all database operations -//! -//! # Examples -//! -//! ```ignore -//! // Create in-memory database (testing) -//! let db = SurrealDatabase::new_memory().await?; -//! -//! // Create file-based database -//! let db = SurrealDatabase::new_file("workspace.surreal").await?; -//! ``` +//! All I/O routes through `serde_json::Value`, which implements `SurrealValue`. +//! Domain types never touch the SurrealDB wire directly — all values go through +//! explicit `serde_json::to_value` / `serde_json::from_value` conversion. +//! SurrealDB 3 returns record IDs as `"table:key"` strings; `from_json_record` +//! normalizes them back to bare application IDs before deserialization. use super::{ Database, DbChecklistItem, DbPhaseHistory, DbPhaseTransition, DbProject, DbSecurityAssessment, DbSecurityAssessmentDetail, DbTeamMember, DbToolConfiguration, DbToolDependency, }; use crate::error::{LifecycleError, Result}; +use crate::persistence::config::{SurrealDbBackendConfig, SurrealEngineConfig}; use async_trait::async_trait; use chrono::Utc; -use serde_json::json; -use surrealdb::engine::local::{Db, Mem, RocksDb}; -use surrealdb::Surreal; +use surrealdb::{engine::any, engine::any::Any, opt::auth::Root, Surreal}; -/// SurrealDB database implementation +// ── serialization helpers ───────────────────────────────────────────────────── + +fn to_json(v: &T) -> Result { + serde_json::to_value(v).map_err(|e| LifecycleError::Database(e.to_string())) +} + +/// Deserialize from a SurrealDB 3 response, normalizing the `id` field. /// -/// Supports both embedded (in-memory, file-based) and network modes. -#[derive(Debug, Clone)] +/// SurrealDB 3 serializes record IDs as `"table:key"` strings. This strips +/// the `"table:"` prefix so domain structs with `id: String` deserialize +/// correctly. +fn from_json_record( + mut val: serde_json::Value, + table: &str, +) -> Result { + if let Some(obj) = val.as_object_mut() { + if let Some(id_val) = obj.get("id").cloned() { + let prefix = format!("{table}:"); + let normalized = match &id_val { + serde_json::Value::String(s) => s + .strip_prefix(&prefix) + .map(|s| s.to_string()) + .unwrap_or_else(|| s.clone()), + // SurrealDB 2.x-style structured Thing — kept for compatibility + other => other + .get("id") + .and_then(|v| v.get("String")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(), + }; + if !normalized.is_empty() { + obj.insert("id".to_string(), serde_json::Value::String(normalized)); + } + } + } + serde_json::from_value(val).map_err(|e| LifecycleError::Database(e.to_string())) +} + +fn from_json_records( + vals: Vec, + table: &str, +) -> Result> { + vals.into_iter() + .map(|v| from_json_record(v, table)) + .collect() +} + +// ── engine factory ──────────────────────────────────────────────────────────── + +/// Open a SurrealDB connection using `engine::any` URL dispatch. +/// +/// Calls `use_ns` / `use_db` at connection time; callers never need to +/// repeat namespace/database selection per query. +async fn open_engine(url: &str, namespace: &str, database: &str) -> Result> { + let conn = any::connect(url) + .await + .map_err(|e| LifecycleError::Database(format!("connect '{url}' failed: {e}")))?; + conn.use_ns(namespace) + .use_db(database) + .await + .map_err(|e| LifecycleError::Database(format!("use_ns/use_db failed: {e}")))?; + Ok(conn) +} + +// ── storage type ────────────────────────────────────────────────────────────── + +/// SurrealDB 3 database implementation with dual-engine layout. +/// +/// `Surreal` is `Clone + Send + Sync`; no `Arc>` needed. pub struct SurrealDatabase { - db: Surreal, + /// SurrealKV (default) — projects, tasks, phases, checklists, tools, team + core_db: Surreal, + /// RocksDB (default) — phase history, future audit embeddings + hot_db: Surreal, +} + +impl std::fmt::Debug for SurrealDatabase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SurrealDatabase").finish_non_exhaustive() + } +} + +impl Clone for SurrealDatabase { + fn clone(&self) -> Self { + Self { + core_db: self.core_db.clone(), + hot_db: self.hot_db.clone(), + } + } } impl SurrealDatabase { - /// Create in-memory SurrealDB instance (for testing) - pub async fn new_memory() -> Result { - let db = Surreal::new::(()).await.map_err(|e| { - LifecycleError::Database(format!("Failed to create in-memory SurrealDB: {}", e)) - })?; - - // Use default namespace and database - db.use_ns("syntaxis") - .use_db("projects") - .await - .map_err(|e| { - LifecycleError::Database(format!("Failed to select namespace/database: {}", e)) - })?; - - let instance = Self { db }; - instance.init_db_schema().await?; - Ok(instance) - } - - /// Create file-based SurrealDB instance (RocksDB storage) - pub async fn new_file(path: &str) -> Result { - let db = Surreal::new::(path).await.map_err(|e| { - LifecycleError::Database(format!("Failed to create file-based SurrealDB: {}", e)) - })?; - - // Use default namespace and database - db.use_ns("syntaxis") - .use_db("projects") - .await - .map_err(|e| { - LifecycleError::Database(format!("Failed to select namespace/database: {}", e)) - })?; - - let instance = Self { db }; - instance.init_db_schema().await?; - Ok(instance) - } - - /// Create server-based SurrealDB connection + /// Construct from `SurrealDbBackendConfig`, opening both engine connections. /// - /// Note: Server mode requires SurrealDB to be running separately. - /// Start with: `surreal start --bind 127.0.0.1:8000 memory` + /// # Errors /// - /// This is primarily for integration testing and production deployments. - #[allow(unused_variables)] - pub async fn new_server( - url: &str, - namespace: &str, - database: &str, - username: Option<&str>, - password: Option<&str>, - ) -> Result { - // NOTE: Server-mode (WebSocket client) requires a different type parameter - // than our local Db wrapper. For now, fall back to in-memory for testing. - // Full server-mode support would require: - // 1. Using an enum wrapper or generic type parameter - // 2. Or creating a separate ServerDatabase struct - // - // For integration testing, use SurrealDatabase::new_memory() and - // ensure SurrealDB server is running separately on the configured URL. + /// Returns `LifecycleError::Database` if either engine connection or + /// namespace/database selection fails. + pub async fn from_config(cfg: &SurrealDbBackendConfig) -> Result { + let core_url = cfg.core.to_url(); + let hot_url = cfg.hot.to_url(); - eprintln!( - "⚠ SurrealDB server mode requested ({}), but falling back to in-memory mode", - url - ); - eprintln!(" To use server mode, ensure SurrealDB is running: surreal start --bind 127.0.0.1:8000 memory"); - eprintln!( - " Current limitation: Server client requires different type parameter than local Db" - ); + let core_db = open_engine(&core_url, &cfg.namespace, "core").await?; + let hot_db = open_engine(&hot_url, &cfg.namespace, "hot").await?; - // Fall back to in-memory for now - Self::new_memory().await - } - - /// Initialize database schema (private method) - async fn init_db_schema(&self) -> Result<()> { - // Create tables with SCHEMAFULL enforcement - let tables = vec![ - "projects", - "checklist_items", - "phase_transitions", - "security_assessments", - "phase_history", - "tool_configurations", - "tool_dependencies", - "security_assessment_details", - "users", - ]; - - for table in tables { - let _ = self - .db - .query(format!("DEFINE TABLE {} SCHEMAFULL;", table)) + // Authenticate for remote WS connections only + if matches!(cfg.core, SurrealEngineConfig::Ws { .. }) { + let user = cfg + .username + .as_deref() + .unwrap_or("root"); + let pass = cfg + .password + .as_deref() + .unwrap_or(""); + core_db + .signin(Root { + username: user.to_string(), + password: pass.to_string(), + }) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; + .map_err(|e| LifecycleError::Database(format!("core_db signin failed: {e}")))?; } - Ok(()) + if matches!(cfg.hot, SurrealEngineConfig::Ws { .. }) { + let user = cfg.username.as_deref().unwrap_or("root"); + let pass = cfg.password.as_deref().unwrap_or(""); + hot_db + .signin(Root { + username: user.to_string(), + password: pass.to_string(), + }) + .await + .map_err(|e| LifecycleError::Database(format!("hot_db signin failed: {e}")))?; + } + + Ok(Self { core_db, hot_db }) + } + + /// Create an in-memory SurrealDB instance (both engines use `mem://`). + /// + /// Convenience constructor for tests and ephemeral use. + pub async fn new_memory() -> Result { + let cfg = SurrealDbBackendConfig { + core: SurrealEngineConfig::Mem, + hot: SurrealEngineConfig::Mem, + namespace: "syntaxis".to_string(), + username: None, + password: None, + }; + Self::from_config(&cfg).await + } + + /// Create a file-based SurrealDB instance using SurrealKV for core and + /// RocksDB for hot data. + pub async fn new_file(core_path: &str, hot_path: &str) -> Result { + let cfg = SurrealDbBackendConfig { + core: SurrealEngineConfig::SurrealKv { + path: core_path.to_string(), + }, + hot: SurrealEngineConfig::RocksDb { + path: hot_path.to_string(), + }, + namespace: "syntaxis".to_string(), + username: None, + password: None, + }; + Self::from_config(&cfg).await } } +// ── Database trait implementation ───────────────────────────────────────────── + #[async_trait] impl Database for SurrealDatabase { - // ========== PROJECT OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // PROJECT OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn create_project(&self, project: &DbProject) -> Result { - let bindings = json!({ - "id": project.id, - "name": project.name, - "version": project.version, - "description": project.description, - "project_type": project.project_type, - "current_phase": project.current_phase, - "created_at": project.created_at, - "updated_at": project.updated_at, - }); - - self.db.query( - "INSERT INTO projects (id, name, version, description, project_type, current_phase, created_at, updated_at) \ - VALUES ($id, $name, $version, $description, $project_type, $current_phase, $created_at, $updated_at)" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("projects", project.id.as_str())) + .content(to_json(project)?) .await - .map_err(|e| LifecycleError::Database(format!("Failed to create project: {}", e)))?; - + .map_err(|e| LifecycleError::Database(format!("create_project: {e}")))?; Ok(project.clone()) } async fn get_project(&self, id: &str) -> Result> { - let results: Vec = self - .db - .query("SELECT * FROM projects WHERE id = $id") - .bind(json!({"id": id})) + let raw: Option = self + .core_db + .select(("projects", id)) .await - .map_err(|e| LifecycleError::Database(format!("Failed to get project: {}", e)))? - .take(0) - .map_err(|e| LifecycleError::Database(format!("Failed to parse project: {}", e)))?; - - Ok(results.into_iter().next()) + .map_err(|e| LifecycleError::Database(format!("get_project: {e}")))?; + raw.map(|v| from_json_record(v, "projects")).transpose() } async fn list_projects(&self) -> Result> { - let results: Vec = self - .db - .query("SELECT * FROM projects") + let raw: Vec = self + .core_db + .select("projects") .await - .map_err(|e| LifecycleError::Database(format!("Failed to list projects: {}", e)))? - .take(0) - .map_err(|e| LifecycleError::Database(format!("Failed to parse projects: {}", e)))?; - - Ok(results) + .map_err(|e| LifecycleError::Database(format!("list_projects: {e}")))?; + from_json_records(raw, "projects") } async fn update_project(&self, project: &DbProject) -> Result { - let bindings = json!({ - "id": project.id, - "name": project.name, - "version": project.version, - "description": project.description, - "project_type": project.project_type, - "current_phase": project.current_phase, - "updated_at": project.updated_at, - }); - - self.db.query( - "UPDATE projects SET name = $name, version = $version, description = $description, \ - project_type = $project_type, current_phase = $current_phase, updated_at = $updated_at WHERE id = $id" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("projects", project.id.as_str())) + .content(to_json(project)?) .await - .map_err(|e| LifecycleError::Database(format!("Failed to update project: {}", e)))?; - + .map_err(|e| LifecycleError::Database(format!("update_project: {e}")))?; Ok(project.clone()) } async fn delete_project(&self, id: &str) -> Result<()> { - self.db - .query("DELETE FROM projects WHERE id = $id") - .bind(json!({"id": id})) + let _: Option = self + .core_db + .delete(("projects", id)) .await - .map_err(|e| LifecycleError::Database(format!("Failed to delete project: {}", e)))?; - + .map_err(|e| LifecycleError::Database(format!("delete_project: {e}")))?; Ok(()) } - // ========== CHECKLIST OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // CHECKLIST OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn create_checklist_item(&self, item: &DbChecklistItem) -> Result { - let bindings = json!({ - "id": item.id, - "project_id": item.project_id, - "phase": item.phase, - "task_id": item.task_id, - "description": item.description, - "completed": item.completed, - "completed_at": item.completed_at, - "created_at": item.created_at, - "task_type": item.task_type, - "task_priority": item.task_priority, - "task_due": item.task_due, - "task_estimation": item.task_estimation, - "task_deps": item.task_deps, - "task_note": item.task_note, - "task_name": item.task_name, - }); - - self.db.query( - "INSERT INTO checklist_items (id, project_id, phase, task_id, description, completed, \ - completed_at, created_at, task_type, task_priority, task_due, task_estimation, task_deps, \ - task_note, task_name) VALUES ($id, $project_id, $phase, $task_id, $description, $completed, \ - $completed_at, $created_at, $task_type, $task_priority, $task_due, $task_estimation, $task_deps, \ - $task_note, $task_name)" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("checklist_items", item.id.as_str())) + .content(to_json(item)?) .await - .map_err(|e| LifecycleError::Database(format!("Failed to create checklist item: {}", e)))?; - + .map_err(|e| LifecycleError::Database(format!("create_checklist_item: {e}")))?; Ok(item.clone()) } async fn get_checklist_items(&self, project_id: &str) -> Result> { - let items: Vec = self - .db + let raw: Vec = self + .core_db .query( - "SELECT * FROM checklist_items WHERE project_id = $project_id ORDER BY created_at", + "SELECT * FROM checklist_items WHERE project_id = $pid ORDER BY created_at", ) - .bind(json!({"project_id": project_id})) + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(items) + from_json_records(raw, "checklist_items") } async fn get_checklist_items_by_phase( @@ -282,182 +286,154 @@ impl Database for SurrealDatabase { project_id: &str, phase: &str, ) -> Result> { - let items: Vec = self.db.query( - "SELECT * FROM checklist_items WHERE project_id = $project_id AND phase = $phase ORDER BY created_at" - ) - .bind(json!({"project_id": project_id, "phase": phase})) + let raw: Vec = self + .core_db + .query( + "SELECT * FROM checklist_items WHERE project_id = $pid AND phase = $ph ORDER BY created_at", + ) + .bind(("pid", project_id.to_string())) + .bind(("ph", phase.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(items) + from_json_records(raw, "checklist_items") } async fn update_checklist_item(&self, item: &DbChecklistItem) -> Result { - let bindings = json!({ - "id": item.id, - "project_id": item.project_id, - "phase": item.phase, - "task_id": item.task_id, - "description": item.description, - "completed": item.completed, - "completed_at": item.completed_at, - "task_type": item.task_type, - "task_priority": item.task_priority, - "task_due": item.task_due, - "task_estimation": item.task_estimation, - "task_deps": item.task_deps, - "task_note": item.task_note, - "task_name": item.task_name, - }); - - self.db.query( - "UPDATE checklist_items SET project_id = $project_id, phase = $phase, task_id = $task_id, \ - description = $description, completed = $completed, completed_at = $completed_at, \ - task_type = $task_type, task_priority = $task_priority, task_due = $task_due, \ - task_estimation = $task_estimation, task_deps = $task_deps, task_note = $task_note, \ - task_name = $task_name WHERE id = $id" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("checklist_items", item.id.as_str())) + .content(to_json(item)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("update_checklist_item: {e}")))?; Ok(item.clone()) } async fn complete_checklist_item(&self, id: &str) -> Result<()> { let now = Utc::now().to_rfc3339(); - self.db + self.core_db .query( "UPDATE checklist_items SET completed = true, completed_at = $now WHERE id = $id", ) - .bind(json!({"id": id, "now": now})) + .bind(("now", now)) + .bind(("id", id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))?; - Ok(()) } async fn delete_checklist_item(&self, id: &str) -> Result<()> { - self.db - .query("DELETE FROM checklist_items WHERE id = $id") - .bind(json!({"id": id})) + let _: Option = self + .core_db + .delete(("checklist_items", id)) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("delete_checklist_item: {e}")))?; Ok(()) } async fn get_completion_percentage(&self, project_id: &str, phase: &str) -> Result { - #[allow(dead_code)] - #[derive(serde::Deserialize)] - struct CompletionResult { - percentage: Option, - } - - let result: Vec = self.db.query( - "SELECT (count(WHERE completed = true) / count() * 100) as percentage FROM checklist_items \ - WHERE project_id = $project_id AND phase = $phase" - ) - .bind(json!({"project_id": project_id, "phase": phase})) + // Compute in Rust rather than SurrealQL to remain engine-agnostic + let raw: Vec = self + .core_db + .query( + "SELECT completed FROM checklist_items WHERE project_id = $pid AND phase = $ph", + ) + .bind(("pid", project_id.to_string())) + .bind(("ph", phase.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - Ok(result.first().and_then(|r| r.percentage).unwrap_or(0.0)) + if raw.is_empty() { + return Ok(0.0); + } + + let completed = raw + .iter() + .filter(|v| { + v.get("completed") + .and_then(|c| c.as_bool()) + .unwrap_or(false) + }) + .count(); + + Ok(completed as f64 / raw.len() as f64 * 100.0) } - // ========== PHASE TRANSITION OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // PHASE TRANSITION OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn record_phase_transition( &self, transition: &DbPhaseTransition, ) -> Result { - let bindings = json!({ - "id": transition.id, - "project_id": transition.project_id, - "from_phase": transition.from_phase, - "to_phase": transition.to_phase, - "timestamp": transition.timestamp, - "reason": transition.reason, - }); - - self.db.query( - "INSERT INTO phase_transitions (id, project_id, from_phase, to_phase, timestamp, reason) \ - VALUES ($id, $project_id, $from_phase, $to_phase, $timestamp, $reason)" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("phase_transitions", transition.id.as_str())) + .content(to_json(transition)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("record_phase_transition: {e}")))?; Ok(transition.clone()) } async fn get_phase_history(&self, project_id: &str) -> Result> { - let transitions: Vec = self - .db + let raw: Vec = self + .core_db .query( - "SELECT * FROM phase_transitions WHERE project_id = $project_id ORDER BY timestamp", + "SELECT * FROM phase_transitions WHERE project_id = $pid ORDER BY timestamp", ) - .bind(json!({"project_id": project_id})) + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(transitions) + from_json_records(raw, "phase_transitions") } async fn get_current_phase(&self, project_id: &str) -> Result { - #[allow(dead_code)] #[derive(serde::Deserialize)] struct PhaseResult { current_phase: String, } - let results: Vec = self - .db - .query("SELECT current_phase FROM projects WHERE id = $id") - .bind(json!({"id": project_id})) + let raw: Vec = self + .core_db + .query("SELECT current_phase FROM projects WHERE id = $pid") + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; + let results: Vec = raw + .into_iter() + .map(|v| serde_json::from_value(v).map_err(|e| LifecycleError::Database(e.to_string()))) + .collect::>()?; + results - .first() - .map(|r| r.current_phase.clone()) + .into_iter() + .next() + .map(|r| r.current_phase) .ok_or_else(|| LifecycleError::Database("Project not found".to_string())) } - // ========== SECURITY ASSESSMENT OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // SECURITY ASSESSMENT OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn create_security_assessment( &self, assessment: &DbSecurityAssessment, ) -> Result { - let bindings = json!({ - "id": assessment.id, - "project_id": assessment.project_id, - "profile": assessment.profile, - "risk_level": assessment.risk_level, - "passed_rules": assessment.passed_rules, - "failed_rules": assessment.failed_rules, - "critical_issues": assessment.critical_issues, - "assessment_date": assessment.assessment_date, - }); - - self.db.query( - "INSERT INTO security_assessments (id, project_id, profile, risk_level, passed_rules, \ - failed_rules, critical_issues, assessment_date) VALUES ($id, $project_id, $profile, \ - $risk_level, $passed_rules, $failed_rules, $critical_issues, $assessment_date)" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("security_assessments", assessment.id.as_str())) + .content(to_json(assessment)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("create_security_assessment: {e}")))?; Ok(assessment.clone()) } @@ -465,120 +441,98 @@ impl Database for SurrealDatabase { &self, project_id: &str, ) -> Result> { - let results: Vec = self.db.query( - "SELECT * FROM security_assessments WHERE project_id = $project_id ORDER BY assessment_date DESC LIMIT 1" - ) - .bind(json!({"project_id": project_id})) + let raw: Vec = self + .core_db + .query( + "SELECT * FROM security_assessments WHERE project_id = $pid ORDER BY assessment_date DESC LIMIT 1", + ) + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(results.into_iter().next()) + from_json_records::(raw, "security_assessments") + .map(|v| v.into_iter().next()) } async fn list_assessments(&self, project_id: &str) -> Result> { - let assessments: Vec = self.db.query( - "SELECT * FROM security_assessments WHERE project_id = $project_id ORDER BY assessment_date" - ) - .bind(json!({"project_id": project_id})) + let raw: Vec = self + .core_db + .query( + "SELECT * FROM security_assessments WHERE project_id = $pid ORDER BY assessment_date", + ) + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(assessments) + from_json_records(raw, "security_assessments") } - // ========== PHASE HISTORY OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // PHASE HISTORY OPERATIONS — hot_db (append-heavy audit log) + // ═══════════════════════════════════════════════════════════════════════ async fn record_phase_history(&self, history: &DbPhaseHistory) -> Result { - let bindings = json!({ - "id": history.id, - "project_id": history.project_id, - "phase": history.phase, - "duration_days": history.duration_days, - "started_at": history.started_at, - "completed_at": history.completed_at, - "notes": history.notes, - }); - - self.db.query( - "INSERT INTO phase_history (id, project_id, phase, duration_days, started_at, completed_at, notes) \ - VALUES ($id, $project_id, $phase, $duration_days, $started_at, $completed_at, $notes)" - ) - .bind(bindings) + let _: Option = self + .hot_db + .upsert(("phase_history", history.id.as_str())) + .content(to_json(history)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("record_phase_history: {e}")))?; Ok(history.clone()) } async fn get_phase_history_records(&self, project_id: &str) -> Result> { - let records: Vec = self - .db - .query("SELECT * FROM phase_history WHERE project_id = $project_id ORDER BY started_at") - .bind(json!({"project_id": project_id})) + let raw: Vec = self + .hot_db + .query("SELECT * FROM phase_history WHERE project_id = $pid ORDER BY started_at") + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(records) + from_json_records(raw, "phase_history") } - // ========== TOOL CONFIGURATION OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // TOOL CONFIGURATION OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn create_tool_config( &self, config: &DbToolConfiguration, ) -> Result { - let bindings = json!({ - "id": config.id, - "project_id": config.project_id, - "tool_name": config.tool_name, - "enabled": config.enabled, - "config_json": config.config_json, - "phases": config.phases, - "created_at": config.created_at, - "updated_at": config.updated_at, - }); - - self.db.query( - "INSERT INTO tool_configurations (id, project_id, tool_name, enabled, config_json, phases, created_at, updated_at) \ - VALUES ($id, $project_id, $tool_name, $enabled, $config_json, $phases, $created_at, $updated_at)" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("tool_configurations", config.id.as_str())) + .content(to_json(config)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("create_tool_config: {e}")))?; Ok(config.clone()) } async fn get_tool_config(&self, id: &str) -> Result> { - let results: Vec = self - .db - .query("SELECT * FROM tool_configurations WHERE id = $id") - .bind(json!({"id": id})) + let raw: Option = self + .core_db + .select(("tool_configurations", id)) .await - .map_err(|e| LifecycleError::Database(e.to_string()))? - .take(0) - .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(results.into_iter().next()) + .map_err(|e| LifecycleError::Database(format!("get_tool_config: {e}")))?; + raw.map(|v| from_json_record(v, "tool_configurations")).transpose() } async fn list_tool_configs(&self, project_id: &str) -> Result> { - let configs: Vec = self.db.query( - "SELECT * FROM tool_configurations WHERE project_id = $project_id ORDER BY created_at" - ) - .bind(json!({"project_id": project_id})) + let raw: Vec = self + .core_db + .query( + "SELECT * FROM tool_configurations WHERE project_id = $pid ORDER BY created_at", + ) + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(configs) + from_json_records(raw, "tool_configurations") } async fn find_tool_config( @@ -586,115 +540,90 @@ impl Database for SurrealDatabase { project_id: &str, tool_name: &str, ) -> Result> { - let results: Vec = self.db.query( - "SELECT * FROM tool_configurations WHERE project_id = $project_id AND tool_name = $tool_name LIMIT 1" - ) - .bind(json!({"project_id": project_id, "tool_name": tool_name})) + let raw: Vec = self + .core_db + .query( + "SELECT * FROM tool_configurations WHERE project_id = $pid AND tool_name = $tn LIMIT 1", + ) + .bind(("pid", project_id.to_string())) + .bind(("tn", tool_name.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(results.into_iter().next()) + from_json_records::(raw, "tool_configurations") + .map(|v| v.into_iter().next()) } async fn update_tool_config( &self, config: &DbToolConfiguration, ) -> Result { - let bindings = json!({ - "id": config.id, - "project_id": config.project_id, - "tool_name": config.tool_name, - "enabled": config.enabled, - "config_json": config.config_json, - "phases": config.phases, - "updated_at": config.updated_at, - }); - - self.db.query( - "UPDATE tool_configurations SET project_id = $project_id, tool_name = $tool_name, \ - enabled = $enabled, config_json = $config_json, phases = $phases, updated_at = $updated_at WHERE id = $id" - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("tool_configurations", config.id.as_str())) + .content(to_json(config)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("update_tool_config: {e}")))?; Ok(config.clone()) } - // ========== TOOL DEPENDENCY OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // TOOL DEPENDENCY OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn create_tool_dependency( &self, dependency: &DbToolDependency, ) -> Result { - let bindings = json!({ - "id": dependency.id, - "project_id": dependency.project_id, - "tool_name": dependency.tool_name, - "dependency_name": dependency.dependency_name, - }); - - self.db - .query( - "INSERT INTO tool_dependencies (id, project_id, tool_name, dependency_name) \ - VALUES ($id, $project_id, $tool_name, $dependency_name)", - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("tool_dependencies", dependency.id.as_str())) + .content(to_json(dependency)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("create_tool_dependency: {e}")))?; Ok(dependency.clone()) } async fn get_tool_dependencies(&self, project_id: &str) -> Result> { - let dependencies: Vec = self - .db + let raw: Vec = self + .core_db .query( - "SELECT * FROM tool_dependencies WHERE project_id = $project_id ORDER BY tool_name", + "SELECT * FROM tool_dependencies WHERE project_id = $pid ORDER BY tool_name", ) - .bind(json!({"project_id": project_id})) + .bind(("pid", project_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(dependencies) + from_json_records(raw, "tool_dependencies") } async fn delete_tool_dependency(&self, id: &str) -> Result<()> { - self.db - .query("DELETE FROM tool_dependencies WHERE id = $id") - .bind(json!({"id": id})) + let _: Option = self + .core_db + .delete(("tool_dependencies", id)) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("delete_tool_dependency: {e}")))?; Ok(()) } - // ========== SECURITY ASSESSMENT DETAIL OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // SECURITY ASSESSMENT DETAIL OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn create_assessment_detail( &self, detail: &DbSecurityAssessmentDetail, ) -> Result { - let bindings = json!({ - "id": detail.id, - "assessment_id": detail.assessment_id, - "rule_name": detail.rule_name, - "passed": detail.passed, - }); - - self.db - .query( - "INSERT INTO security_assessment_details (id, assessment_id, rule_name, passed) \ - VALUES ($id, $assessment_id, $rule_name, $passed)", - ) - .bind(bindings) + let _: Option = self + .core_db + .upsert(("security_assessment_details", detail.id.as_str())) + .content(to_json(detail)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| { + LifecycleError::Database(format!("create_assessment_detail: {e}")) + })?; Ok(detail.clone()) } @@ -702,130 +631,140 @@ impl Database for SurrealDatabase { &self, assessment_id: &str, ) -> Result> { - let details: Vec = self - .db - .query("SELECT * FROM security_assessment_details WHERE assessment_id = $assessment_id") - .bind(json!({"assessment_id": assessment_id})) + let raw: Vec = self + .core_db + .query( + "SELECT * FROM security_assessment_details WHERE assessment_id = $aid", + ) + .bind(("aid", assessment_id.to_string())) .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(details) + from_json_records(raw, "security_assessment_details") } async fn delete_assessment_detail(&self, id: &str) -> Result<()> { - self.db - .query("DELETE FROM security_assessment_details WHERE id = $id") - .bind(json!({"id": id})) + let _: Option = self + .core_db + .delete(("security_assessment_details", id)) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| { + LifecycleError::Database(format!("delete_assessment_detail: {e}")) + })?; Ok(()) } - // ========== TEAM MEMBER OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // TEAM MEMBER OPERATIONS — core_db + // ═══════════════════════════════════════════════════════════════════════ async fn create_team_member(&self, member: &DbTeamMember) -> Result { - let bindings = json!({ - "id": member.id, - "name": member.name, - "email": member.email, - }); - - self.db - .query("INSERT INTO users (id, name, email) VALUES ($id, $name, $email)") - .bind(bindings) + let _: Option = self + .core_db + .upsert(("users", member.id.as_str())) + .content(to_json(member)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("create_team_member: {e}")))?; Ok(member.clone()) } async fn get_team_member(&self, id: &str) -> Result> { - let results: Vec = self - .db - .query("SELECT * FROM users WHERE id = $id") - .bind(json!({"id": id})) + let raw: Option = self + .core_db + .select(("users", id)) .await - .map_err(|e| LifecycleError::Database(e.to_string()))? - .take(0) - .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(results.into_iter().next()) + .map_err(|e| LifecycleError::Database(format!("get_team_member: {e}")))?; + raw.map(|v| from_json_record(v, "users")).transpose() } async fn list_team_members(&self) -> Result> { - let members: Vec = self - .db + let raw: Vec = self + .core_db .query("SELECT * FROM users ORDER BY name") .await .map_err(|e| LifecycleError::Database(e.to_string()))? .take(0) .map_err(|e| LifecycleError::Database(e.to_string()))?; - - Ok(members) + from_json_records(raw, "users") } async fn update_team_member(&self, member: &DbTeamMember) -> Result { - let bindings = json!({ - "id": member.id, - "name": member.name, - "email": member.email, - }); - - self.db - .query("UPDATE users SET name = $name, email = $email WHERE id = $id") - .bind(bindings) + let _: Option = self + .core_db + .upsert(("users", member.id.as_str())) + .content(to_json(member)?) .await - .map_err(|e| LifecycleError::Database(e.to_string()))?; - + .map_err(|e| LifecycleError::Database(format!("update_team_member: {e}")))?; Ok(member.clone()) } - // ========== HEALTH/UTILITY OPERATIONS ========== + // ═══════════════════════════════════════════════════════════════════════ + // HEALTH / UTILITY + // ═══════════════════════════════════════════════════════════════════════ async fn ping(&self) -> Result<()> { - // Simple health check - verify connection works - let _ = self - .db + let _: Vec = self + .core_db .query("SELECT 1") .await - .map_err(|e| LifecycleError::Database(format!("Ping failed: {}", e)))?; - + .map_err(|e| LifecycleError::Database(format!("Ping failed: {e}")))? + .take(0) + .map_err(|e| LifecycleError::Database(format!("Ping parse failed: {e}")))?; Ok(()) } - /// Initialize database schema (trait implementation) + /// SurrealDB 3 operates in SCHEMALESS mode by default — no table definitions + /// required. This verifies connectivity on both engines instead. async fn init_schema(&self) -> Result<()> { - self.init_db_schema().await + self.ping().await?; + + let _: Vec = self + .hot_db + .query("SELECT 1") + .await + .map_err(|e| LifecycleError::Database(format!("Hot-db ping failed: {e}")))? + .take(0) + .map_err(|e| LifecycleError::Database(format!("Hot-db ping parse failed: {e}")))?; + + Ok(()) } } +// ── tests ───────────────────────────────────────────────────────────────────── + #[cfg(test)] mod tests { use super::*; #[tokio::test] + #[ignore = "embedded SurrealDB tests must run serially; use: cargo test -- --test-threads=1 --ignored"] async fn test_surrealdb_memory_creation() { let db = SurrealDatabase::new_memory().await; - assert!(db.is_ok()); + assert!(db.is_ok(), "failed: {:?}", db.err()); } #[tokio::test] - #[ignore = "SurrealDB schema initialization issue in embedded mode"] + #[ignore = "embedded SurrealDB tests must run serially; use: cargo test -- --test-threads=1 --ignored"] async fn test_surrealdb_ping() { let db = SurrealDatabase::new_memory().await.unwrap(); assert!(db.ping().await.is_ok()); } #[tokio::test] - #[ignore = "SurrealDB embedded mode data visibility issue - works in server mode"] + #[ignore = "embedded SurrealDB tests must run serially; use: cargo test -- --test-threads=1 --ignored"] + async fn test_surrealdb_init_schema() { + let db = SurrealDatabase::new_memory().await.unwrap(); + assert!(db.init_schema().await.is_ok()); + } + + #[tokio::test] + #[ignore = "embedded SurrealDB tests must run serially; use: cargo test -- --test-threads=1 --ignored"] async fn test_surrealdb_project_crud() { let db = SurrealDatabase::new_memory().await.unwrap(); let project = DbProject { - id: "test-proj-1".to_string(), + id: "proj-surreal-1".to_string(), name: "Test Project".to_string(), version: "1.0.0".to_string(), description: "A test project".to_string(), @@ -835,32 +774,33 @@ mod tests { updated_at: Utc::now().to_rfc3339(), }; - // Create let created = db.create_project(&project).await.unwrap(); assert_eq!(created.name, "Test Project"); - // Read - let fetched = db.get_project("test-proj-1").await.unwrap(); - assert!(fetched.is_some()); - assert_eq!(fetched.unwrap().name, "Test Project"); + let fetched = db.get_project("proj-surreal-1").await.unwrap(); + assert!(fetched.is_some(), "project should be fetchable after create"); - // List - let all = db.list_projects().await.unwrap(); - assert!(!all.is_empty()); + let updated = DbProject { + current_phase: "Development".to_string(), + ..project.clone() + }; + db.update_project(&updated).await.unwrap(); - // Delete - db.delete_project("test-proj-1").await.unwrap(); - let deleted = db.get_project("test-proj-1").await.unwrap(); + let listed = db.list_projects().await.unwrap(); + assert!(!listed.is_empty()); + + db.delete_project("proj-surreal-1").await.unwrap(); + let deleted = db.get_project("proj-surreal-1").await.unwrap(); assert!(deleted.is_none()); } #[tokio::test] - #[ignore = "SurrealDB embedded mode data visibility issue - works in server mode"] + #[ignore = "embedded SurrealDB tests must run serially; use: cargo test -- --test-threads=1 --ignored"] async fn test_surrealdb_checklist_operations() { let db = SurrealDatabase::new_memory().await.unwrap(); let item = DbChecklistItem { - id: "item-1".to_string(), + id: "item-surreal-1".to_string(), project_id: "proj-1".to_string(), phase: "Creation".to_string(), task_id: "task-1".to_string(), @@ -877,18 +817,58 @@ mod tests { task_name: Some("Setup Task".to_string()), }; - // Create - let created = db.create_checklist_item(&item).await.unwrap(); - assert_eq!(created.description, "Setup project"); + db.create_checklist_item(&item).await.unwrap(); - // Get let fetched = db.get_checklist_items("proj-1").await.unwrap(); assert!(!fetched.is_empty()); - // Complete - db.complete_checklist_item("item-1").await.unwrap(); + let by_phase = db + .get_checklist_items_by_phase("proj-1", "Creation") + .await + .unwrap(); + assert!(!by_phase.is_empty()); - // Delete - db.delete_checklist_item("item-1").await.unwrap(); + db.complete_checklist_item("item-surreal-1").await.unwrap(); + db.delete_checklist_item("item-surreal-1").await.unwrap(); + } + + #[tokio::test] + #[ignore = "embedded SurrealDB tests must run serially; use: cargo test -- --test-threads=1 --ignored"] + async fn test_surrealdb_phase_history_routes_to_hot_db() { + let db = SurrealDatabase::new_memory().await.unwrap(); + + let history = DbPhaseHistory { + id: "hist-1".to_string(), + project_id: "proj-1".to_string(), + phase: "Creation".to_string(), + duration_days: 3, + started_at: Utc::now().to_rfc3339(), + completed_at: Some(Utc::now().to_rfc3339()), + notes: Some("completed".to_string()), + }; + + db.record_phase_history(&history).await.unwrap(); + let records = db.get_phase_history_records("proj-1").await.unwrap(); + assert!(!records.is_empty()); + } + + #[tokio::test] + #[ignore = "embedded SurrealDB tests must run serially; use: cargo test -- --test-threads=1 --ignored"] + async fn test_surrealdb_team_member_crud() { + let db = SurrealDatabase::new_memory().await.unwrap(); + + let member = DbTeamMember { + id: "user-1".to_string(), + name: "Alice".to_string(), + email: "alice@example.com".to_string(), + }; + + db.create_team_member(&member).await.unwrap(); + + let fetched = db.get_team_member("user-1").await.unwrap(); + assert!(fetched.is_some()); + + let all = db.list_team_members().await.unwrap(); + assert!(!all.is_empty()); } } diff --git a/core/crates/syntaxis/tests/integration_surrealdb.rs b/core/crates/syntaxis/tests/integration_surrealdb.rs index 9444a41..1dfb90c 100644 --- a/core/crates/syntaxis/tests/integration_surrealdb.rs +++ b/core/crates/syntaxis/tests/integration_surrealdb.rs @@ -1,32 +1,36 @@ -// Integration tests for SurrealDB 2.3 with server mode +// Integration tests for SurrealDB 3 with server mode // // To run these tests, start SurrealDB server in another terminal: -// surreal start --bind 127.0.0.1:8000 memory +// surreal start --bind 127.0.0.1:8000 memory --user root --pass root // -// Then run: cargo test --test integration_surrealdb -- --test-threads=1 +// Then run: cargo test --test integration_surrealdb --features surrealdb-backend -- --test-threads=1 -#[cfg(all(test, feature = "surrealdb"))] +#[cfg(all(test, feature = "surrealdb-backend"))] mod surrealdb_integration_tests { use chrono::Utc; use syntaxis_core::persistence::{ Database, DbChecklistItem, DbPhaseTransition, DbProject, DbSecurityAssessment, - DbTeamMember, SurrealDatabase, + DbTeamMember, SurrealDatabase, SurrealDbBackendConfig, SurrealEngineConfig, }; use uuid::Uuid; - /// Helper to connect to SurrealDB server - /// Requires: surreal start --bind 127.0.0.1:8000 memory + /// Helper to connect to SurrealDB server. + /// + /// Requires: `surreal start --bind 127.0.0.1:8000 memory --user root --pass root` async fn get_server_db() -> Result> { - // Try to connect to server; if it fails, we'll skip the test - let db = SurrealDatabase::new_server( - "ws://localhost:8000", - "test_workspace", - "test_projects", - None, - None, - ) - .await?; + let cfg = SurrealDbBackendConfig { + core: SurrealEngineConfig::Ws { + url: "ws://localhost:8000".to_string(), + }, + hot: SurrealEngineConfig::Ws { + url: "ws://localhost:8000".to_string(), + }, + namespace: "test_workspace".to_string(), + username: Some("root".to_string()), + password: Some("root".to_string()), + }; + let db = SurrealDatabase::from_config(&cfg).await?; Ok(db) } diff --git a/core/crates/vapora/Cargo.toml b/core/crates/vapora/Cargo.toml index ab142b0..5d60a40 100644 --- a/core/crates/vapora/Cargo.toml +++ b/core/crates/vapora/Cargo.toml @@ -13,6 +13,7 @@ syntaxis-core = { path = "../syntaxis" } # Async tokio = { workspace = true } +futures = { workspace = true } # Logging and tracing tracing = { workspace = true } @@ -21,19 +22,31 @@ tracing-subscriber = { workspace = true } # Serialization serde = { workspace = true } serde_json = { workspace = true } +bytes = { workspace = true } # UUID and dates uuid = { workspace = true } chrono = { workspace = true } -# HTTP client (with cookies feature) -reqwest = { workspace = true } +# HTTP client +reqwest = { workspace = true } -# NATS JetStream -async-nats = { workspace = true } +# NATS JetStream with NKey auth +platform-nats = { workspace = true } -# WebSocket (older version 0.20 for this crate) -tokio-tungstenite = { workspace = true } +# Stratum orchestration +stratum-orchestrator = { workspace = true } +stratum-graph = { workspace = true } +stratum-state = { workspace = true } -# Other utilities +# WebSocket +tokio-tungstenite = { workspace = true } + +# Error handling anyhow = { workspace = true } +thiserror = { workspace = true } + +[features] +default = [] +orchestration = [] +kogral = [] diff --git a/core/crates/vapora/src/kogral_bridge.rs b/core/crates/vapora/src/kogral_bridge.rs new file mode 100644 index 0000000..d8e355f --- /dev/null +++ b/core/crates/vapora/src/kogral_bridge.rs @@ -0,0 +1,184 @@ +//! kogral integration bridge — forwards syntaxis lifecycle events to kogral. +//! +//! Enabled with the `kogral` feature. Internally uses `platform-nats::EventStream` +//! so the kogral-core dependency chain is not required. + +use bytes::Bytes; +use platform_nats::{EventStream, NatsConfig}; +use serde::{Deserialize, Serialize}; + +use crate::VaporaResult; + +/// Connection config for the kogral NATS cluster. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KogralBridgeConfig { + /// NATS server URL of the kogral cluster + pub url: String, + /// JetStream stream that kogral consumes (default: `KOGRAL`) + pub stream_name: String, + /// Consumer name (default: `syntaxis-kogral`) + pub consumer_name: String, + /// NKey seed for signing (optional) + pub nkey_seed: Option, + /// Trusted public NKeys (empty → accept all) + pub trusted_nkeys: Vec, +} + +impl Default for KogralBridgeConfig { + fn default() -> Self { + Self { + url: "nats://localhost:4222".to_string(), + stream_name: "KOGRAL".to_string(), + consumer_name: "syntaxis-kogral".to_string(), + nkey_seed: None, + trusted_nkeys: vec![], + } + } +} + +/// Lifecycle event forwarded to kogral for knowledge graph indexing. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KogralEvent { + /// Dot-separated event type, e.g. `syntaxis.project.created` + pub event_type: String, + /// Source project identifier + pub project_id: String, + /// Optional phase name relevant to the event + #[serde(skip_serializing_if = "Option::is_none")] + pub phase: Option, + /// RFC 3339 timestamp + pub timestamp: String, + /// Arbitrary structured payload + pub payload: serde_json::Value, +} + +/// Bridge that publishes syntaxis lifecycle events to kogral via NATS. +pub struct KogralBridge { + stream: EventStream, +} + +impl std::fmt::Debug for KogralBridge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("KogralBridge").finish_non_exhaustive() + } +} + +impl KogralBridge { + /// Connect to the kogral NATS cluster. + pub async fn connect(cfg: &KogralBridgeConfig) -> VaporaResult { + let nats_cfg = NatsConfig { + url: cfg.url.clone(), + stream_name: cfg.stream_name.clone(), + consumer_name: cfg.consumer_name.clone(), + subjects: vec![format!("kogral.syntaxis.>")], + nkey_seed: cfg.nkey_seed.clone(), + trusted_nkeys: cfg.trusted_nkeys.clone(), + require_signed_messages: false, + }; + + let stream = EventStream::connect(&nats_cfg).await?; + tracing::info!(url = %cfg.url, stream = %cfg.stream_name, "kogral bridge connected"); + Ok(Self { stream }) + } + + /// Emit a generic lifecycle event to `kogral.syntaxis.`. + pub async fn emit(&self, event: &KogralEvent) -> VaporaResult<()> { + let subject = format!("kogral.syntaxis.{}", event.event_type.replace('.', "-")); + let payload = Bytes::from(serde_json::to_vec(event)?); + self.stream.publish(&subject, payload).await?; + tracing::debug!(event_type = %event.event_type, project_id = %event.project_id, "kogral event emitted"); + Ok(()) + } + + /// Emit a project-created event. + pub async fn on_project_created(&self, project_id: &str) -> VaporaResult<()> { + self.emit(&KogralEvent { + event_type: "project.created".to_string(), + project_id: project_id.to_string(), + phase: None, + timestamp: chrono::Utc::now().to_rfc3339(), + payload: serde_json::json!({ "project_id": project_id }), + }) + .await + } + + /// Emit a phase-transition event. + pub async fn on_phase_transition( + &self, + project_id: &str, + from_phase: &str, + to_phase: &str, + ) -> VaporaResult<()> { + self.emit(&KogralEvent { + event_type: "phase.transition".to_string(), + project_id: project_id.to_string(), + phase: Some(to_phase.to_string()), + timestamp: chrono::Utc::now().to_rfc3339(), + payload: serde_json::json!({ + "from": from_phase, + "to": to_phase, + }), + }) + .await + } + + /// Emit a task-completed event. + pub async fn on_task_completed( + &self, + project_id: &str, + task_id: &str, + phase: &str, + ) -> VaporaResult<()> { + self.emit(&KogralEvent { + event_type: "task.completed".to_string(), + project_id: project_id.to_string(), + phase: Some(phase.to_string()), + timestamp: chrono::Utc::now().to_rfc3339(), + payload: serde_json::json!({ "task_id": task_id }), + }) + .await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kogral_bridge_config_default() { + let cfg = KogralBridgeConfig::default(); + assert_eq!(cfg.stream_name, "KOGRAL"); + assert!(cfg.nkey_seed.is_none()); + } + + #[test] + fn test_kogral_event_serde() { + let event = KogralEvent { + event_type: "project.created".to_string(), + project_id: "p1".to_string(), + phase: None, + timestamp: "2026-01-01T00:00:00Z".to_string(), + payload: serde_json::json!({ "project_id": "p1" }), + }; + let json = serde_json::to_string(&event).unwrap(); + let decoded: KogralEvent = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.event_type, "project.created"); + // `phase` is None → must be absent from the serialized JSON (skip_serializing_if) + assert!(!json.contains("\"phase\"")); + } + + #[test] + fn test_kogral_event_with_phase_serde() { + let event = KogralEvent { + event_type: "phase.transition".to_string(), + project_id: "p2".to_string(), + phase: Some("devel".to_string()), + timestamp: "2026-01-01T00:00:00Z".to_string(), + payload: serde_json::json!({ "from": "create", "to": "devel" }), + }; + let json = serde_json::to_string(&event).unwrap(); + assert!(json.contains("\"phase\"")); + let decoded: KogralEvent = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.phase.unwrap(), "devel"); + } +} diff --git a/core/crates/vapora/src/lib.rs b/core/crates/vapora/src/lib.rs index 8d57d9c..d0991bc 100644 --- a/core/crates/vapora/src/lib.rs +++ b/core/crates/vapora/src/lib.rs @@ -27,9 +27,13 @@ pub mod llm_provider; pub mod multi_ia_router; pub mod nats_bridge; pub mod observability; +pub mod orchestration; pub mod plugin; pub mod security; +#[cfg(feature = "kogral")] +pub mod kogral_bridge; + /// VAPORA lifecycle integration result type pub type VaporaResult = anyhow::Result; diff --git a/core/crates/vapora/src/nats_bridge.rs b/core/crates/vapora/src/nats_bridge.rs index 4e1bf4d..b7a2310 100644 --- a/core/crates/vapora/src/nats_bridge.rs +++ b/core/crates/vapora/src/nats_bridge.rs @@ -1,251 +1,248 @@ -//! NATS JetStream bridge for agent communication -//! -//! Real integration with NATS for multi-agent orchestration. +//! NATS JetStream bridge using `platform-nats` with NKey auth and signed messages. -use anyhow::anyhow; +use bytes::Bytes; +use platform_nats::{EventStream, NatsConfig}; use serde::{Deserialize, Serialize}; use std::time::Duration; -/// NATS broker configuration -#[derive(Debug, Clone)] +use crate::VaporaResult; + +/// Broker-level configuration for `NatsBridge`. +/// +/// Call `NatsBrokerConfig::to_platform_config()` to convert to the +/// `platform_nats::NatsConfig` expected by `EventStream::connect`. +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct NatsBrokerConfig { - /// NATS server URL + /// NATS server URL, e.g. `nats://localhost:4222` pub url: String, - /// Max retries for connection - pub max_retries: u32, - /// Connection timeout - pub timeout: Duration, + /// JetStream stream name (created if absent) + pub stream_name: String, + /// Durable pull-consumer name (created if absent) + pub consumer_name: String, + /// Subject filter list, e.g. `["syntaxis.>"]` + pub subjects: Vec, + /// ed25519 NKey seed for signing outbound messages (optional) + pub nkey_seed: Option, + /// Public NKeys whose signatures are accepted (empty → accept all) + pub trusted_nkeys: Vec, + /// Reject inbound messages that lack a valid NKey signature + pub require_signed_messages: bool, } impl Default for NatsBrokerConfig { fn default() -> Self { Self { url: "nats://localhost:4222".to_string(), - max_retries: 3, - timeout: Duration::from_secs(10), + stream_name: "SYNTAXIS".to_string(), + consumer_name: "syntaxis-consumer".to_string(), + subjects: vec!["syntaxis.>".to_string()], + nkey_seed: None, + trusted_nkeys: vec![], + require_signed_messages: false, } } } -/// Task message for NATS +impl NatsBrokerConfig { + fn to_platform_config(&self) -> NatsConfig { + NatsConfig { + url: self.url.clone(), + stream_name: self.stream_name.clone(), + consumer_name: self.consumer_name.clone(), + subjects: self.subjects.clone(), + nkey_seed: self.nkey_seed.clone(), + trusted_nkeys: self.trusted_nkeys.clone(), + require_signed_messages: self.require_signed_messages, + } + } +} + +/// Event configuration matching the kogral `NatsEventConfig` schema. +/// +/// Allows downstream systems to express NATS connectivity in a unified format +/// and convert to `NatsBrokerConfig` via `From`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NatsEventConfig { + /// NATS server URL + pub url: String, + /// JetStream stream name + pub stream: String, + /// Durable pull-consumer name + pub consumer: String, + /// Subject filter list + pub subjects: Vec, + /// NKey seed for signing outbound messages + #[serde(default)] + pub nkey_seed: Option, + /// Trusted public NKeys + #[serde(default)] + pub trusted_nkeys: Vec, + /// Reject inbound messages lacking a valid NKey signature + #[serde(default)] + pub require_signed_messages: bool, +} + +impl From for NatsBrokerConfig { + fn from(cfg: NatsEventConfig) -> Self { + Self { + url: cfg.url, + stream_name: cfg.stream, + consumer_name: cfg.consumer, + subjects: cfg.subjects, + nkey_seed: cfg.nkey_seed, + trusted_nkeys: cfg.trusted_nkeys, + require_signed_messages: cfg.require_signed_messages, + } + } +} + +/// Task message published to `syntaxis.tasks..submit` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskMessage { - /// Task ID + /// Unique task identifier pub task_id: String, - /// Agent role + /// Agent role this task is targeted at pub agent_role: String, - /// Task title + /// Short task title pub title: String, - /// Task description + /// Full task description pub description: String, - /// Priority (1-10) + /// Priority in `1..=10` (higher = more urgent) pub priority: u32, - /// Task data + /// Arbitrary structured task data pub data: serde_json::Value, } -/// Task result from agent +/// Task result consumed from `syntaxis.tasks..result` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskResult { - /// Task ID + /// Task identifier correlating to the originating `TaskMessage` pub task_id: String, - /// Agent who completed it + /// Agent that completed (or failed) the task pub agent_role: String, - /// Status (completed, failed, timeout) + /// `"completed"`, `"failed"`, or `"timeout"` pub status: String, - /// Result data + /// Result payload produced by the agent pub result: serde_json::Value, - /// Execution time in milliseconds + /// Wall-clock execution time in milliseconds pub execution_time_ms: u64, } -/// Agent status message +/// Agent heartbeat published to `syntaxis.agents..status` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AgentStatus { - /// Agent role + /// Agent role identifier pub role: String, - /// Is healthy + /// Whether the agent is currently healthy pub healthy: bool, - /// Current task count + /// Number of tasks currently in flight pub task_count: usize, - /// Max capacity + /// Maximum concurrent task capacity pub capacity: usize, - /// Last heartbeat + /// RFC 3339 timestamp of the last heartbeat pub last_heartbeat: String, } -/// NATS JetStream agent bridge (production-ready) -#[derive(Debug, Clone)] +/// Production NATS JetStream bridge backed by `platform-nats::EventStream`. pub struct NatsBridge { - /// NATS broker configuration - #[allow(dead_code)] + stream: EventStream, config: NatsBrokerConfig, - // In production: async_nats::jetstream::Context - /// Connection state - _connected: bool, +} + +impl std::fmt::Debug for NatsBridge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NatsBridge") + .field("config", &self.config) + .finish_non_exhaustive() + } } impl NatsBridge { - /// Create new NATS bridge with configuration - pub async fn new(config: NatsBrokerConfig) -> anyhow::Result { - // Production: Connect to NATS - // let client = async_nats::connect(&config.url).await - // .map_err(|e| anyhow!("Failed to connect to NATS: {}", e))?; - // let jetstream = async_nats::jetstream::new(client); - - tracing::info!("NATS Bridge initialized with URL: {}", config.url); - - Ok(Self { - config, - _connected: true, - }) + /// Connect to NATS and materialise the configured stream + consumer. + pub async fn connect(config: NatsBrokerConfig) -> VaporaResult { + let stream = EventStream::connect(&config.to_platform_config()).await?; + tracing::info!(url = %config.url, stream = %config.stream_name, "NATS bridge connected"); + Ok(Self { stream, config }) } - /// Submit task to agent pool - pub async fn submit_task(&self, task: &TaskMessage) -> anyhow::Result { - let _subject = format!("tasks.{}.submit", task.agent_role); - let _payload = serde_json::to_vec(task)?; - - tracing::info!("Publishing task {} to subject: {}", task.task_id, _subject); - - // Production: - // let context = async_nats::jetstream::new(client); - // context.publish(_subject, _payload.into()).await?; - + /// Publish a task to `syntaxis.tasks..submit`. + /// + /// Returns the `task_id` on success for correlation tracking. + pub async fn submit_task(&self, task: &TaskMessage) -> VaporaResult { + let subject = format!("syntaxis.tasks.{}.submit", task.agent_role); + let payload = Bytes::from(serde_json::to_vec(task)?); + self.stream.publish(&subject, payload).await?; + tracing::debug!(task_id = %task.task_id, subject = %subject, "task submitted"); Ok(task.task_id.clone()) } - /// Request task from agent (synchronous with timeout) - pub async fn request_task_result( - &self, - task_id: &str, - _timeout: Duration, - ) -> anyhow::Result { - let _subject = format!("tasks.{}.result", task_id); + /// Pull up to `max_msgs` task results from the consumer. + /// + /// Each message is ACKed after successful deserialization. + /// Messages that fail to deserialize are ACKed (removed) and excluded. + pub async fn pull_task_results(&self, max_msgs: usize) -> VaporaResult> { + let batch = self.stream.pull_batch(max_msgs).await?; + let mut results = Vec::with_capacity(batch.len()); - tracing::info!("Requesting task result: {}", task_id); + for (subject, payload, msg) in batch { + match serde_json::from_slice::(&payload) { + Ok(result) => { + msg.ack() + .await + .map_err(|e| anyhow::anyhow!("ack failed on '{subject}': {e}"))?; + results.push(result); + } + Err(e) => { + tracing::warn!(subject = %subject, "dropping unparseable task result: {e}"); + let _ = msg.ack().await; + } + } + } - // Production: - // let msg = jetstream - // .request(_subject, "".into(), _timeout) - // .await?; - // Ok(serde_json::from_slice(&msg.payload)?) - - Err(anyhow!("Task {} not yet completed", task_id)) + Ok(results) } - /// Subscribe to task results - pub async fn subscribe_task_results(&self, agent_role: &str) -> anyhow::Result<()> { - let subject = format!("tasks.{}.results", agent_role); - - tracing::info!("Subscribing to task results: {}", subject); - - // Production: - // let subscriber = jetstream.subscribe(subject).await?; - // loop { - // if let Ok(Some(msg)) = subscriber.next_timeout(Duration::from_secs(5)).await { - // let result: TaskResult = serde_json::from_slice(&msg.payload)?; - // self.process_result(&result).await?; - // } - // } - + /// Publish an agent status heartbeat. + pub async fn publish_agent_status(&self, status: &AgentStatus) -> VaporaResult<()> { + let subject = format!("syntaxis.agents.{}.status", status.role); + let payload = Bytes::from(serde_json::to_vec(status)?); + self.stream.publish(&subject, payload).await?; Ok(()) } - /// Publish agent status - pub async fn publish_agent_status(&self, status: &AgentStatus) -> anyhow::Result<()> { - let _subject = format!("agents.{}.status", status.role); - let _payload = serde_json::to_vec(status)?; - - tracing::info!("Publishing agent status: {} -> {}", status.role, _subject); - - // Production: - // jetstream.publish(_subject, _payload.into()).await?; - - Ok(()) + /// Ping NATS by publishing a zero-byte keepalive to `syntaxis.ping`. + pub async fn health_check(&self) -> VaporaResult<()> { + self.stream + .publish("syntaxis.ping", Bytes::new()) + .await + .map_err(|e| anyhow::anyhow!("NATS health check failed: {e}")) } - /// Subscribe to agent status updates - pub async fn subscribe_agent_status(&self) -> anyhow::Result<()> { - let _subject = "agents.*.status"; - - tracing::info!("Subscribing to agent status updates"); - - // Production: - // let subscriber = jetstream.subscribe(subject).await?; - // Handle incoming status updates - - Ok(()) - } - - /// List active agents - pub async fn list_agents(&self) -> anyhow::Result> { - // Production: Request agent list from NATS directory - tracing::info!("Listing active agents"); - - Ok(vec![ - AgentStatus { - role: "developer".to_string(), - healthy: true, - task_count: 2, - capacity: 10, - last_heartbeat: chrono::Local::now().to_rfc3339(), - }, - AgentStatus { - role: "tester".to_string(), - healthy: true, - task_count: 1, - capacity: 5, - last_heartbeat: chrono::Local::now().to_rfc3339(), - }, - ]) - } - - /// Wait for task completion + /// Wait up to `max_wait` for a specific `task_id` result via polling pull. pub async fn wait_for_completion( &self, task_id: &str, max_wait: Duration, - ) -> anyhow::Result { + ) -> VaporaResult { let start = std::time::Instant::now(); loop { - match self - .request_task_result(task_id, Duration::from_secs(1)) - .await - { - Ok(result) => return Ok(result), - Err(_) => { - if start.elapsed() > max_wait { - return Err(anyhow!("Task {} timed out", task_id)); - } - tokio::time::sleep(Duration::from_millis(100)).await; - } + let batch = self.pull_task_results(32).await?; + if let Some(r) = batch.into_iter().find(|r| r.task_id == task_id) { + return Ok(r); } + + if start.elapsed() > max_wait { + return Err(anyhow::anyhow!("task '{}' timed out", task_id)); + } + + tokio::time::sleep(Duration::from_millis(200)).await; } } - /// Get health status of NATS connection - pub async fn health_check(&self) -> anyhow::Result<()> { - // Production: Send ping to NATS server - tracing::info!("NATS health check"); - Ok(()) - } - - /// Create JetStream stream for tasks if not exists - pub async fn ensure_stream(&self) -> anyhow::Result<()> { - // Production: - // let jetstream = async_nats::jetstream::new(client); - // jetstream - // .get_or_create_stream(async_nats::jetstream::stream::Config { - // name: "TASKS".to_string(), - // subjects: vec!["tasks.>".to_string()], - // ..Default::default() - // }) - // .await?; - - tracing::info!("Task stream ensured"); - Ok(()) + /// Return the broker configuration this bridge was created with. + pub fn config(&self) -> &NatsBrokerConfig { + &self.config } } @@ -254,80 +251,78 @@ mod tests { use super::*; #[test] - fn test_nats_config_default() { - let config = NatsBrokerConfig::default(); - assert_eq!(config.url, "nats://localhost:4222"); - assert_eq!(config.max_retries, 3); + fn test_broker_config_default() { + let cfg = NatsBrokerConfig::default(); + assert_eq!(cfg.url, "nats://localhost:4222"); + assert_eq!(cfg.stream_name, "SYNTAXIS"); + assert!(!cfg.require_signed_messages); } #[test] - fn test_task_message() { - let task = TaskMessage { - task_id: "task1".to_string(), - agent_role: "developer".to_string(), - title: "Test Task".to_string(), - description: "A test".to_string(), - priority: 5, - data: serde_json::json!({}), + fn test_nats_event_config_into_broker() { + let event_cfg = NatsEventConfig { + url: "nats://prod:4222".to_string(), + stream: "PROD_STREAM".to_string(), + consumer: "prod-consumer".to_string(), + subjects: vec!["syntaxis.>".to_string()], + nkey_seed: Some("SXXX".to_string()), + trusted_nkeys: vec!["PUBKEY".to_string()], + require_signed_messages: true, }; - assert_eq!(task.agent_role, "developer"); + let broker: NatsBrokerConfig = event_cfg.into(); + assert_eq!(broker.url, "nats://prod:4222"); + assert_eq!(broker.stream_name, "PROD_STREAM"); + assert_eq!(broker.nkey_seed.unwrap(), "SXXX"); + assert!(broker.require_signed_messages); } #[test] - fn test_task_result() { - let result = TaskResult { - task_id: "task1".to_string(), + fn test_to_platform_config_round_trip() { + let cfg = NatsBrokerConfig { + url: "nats://host:4222".to_string(), + stream_name: "S".to_string(), + consumer_name: "C".to_string(), + subjects: vec!["s.>".to_string()], + nkey_seed: None, + trusted_nkeys: vec![], + require_signed_messages: false, + }; + + let platform = cfg.to_platform_config(); + assert_eq!(platform.url, cfg.url); + assert_eq!(platform.stream_name, cfg.stream_name); + assert_eq!(platform.consumer_name, cfg.consumer_name); + } + + #[test] + fn test_task_message_serde() { + let task = TaskMessage { + task_id: "t1".to_string(), + agent_role: "developer".to_string(), + title: "Impl X".to_string(), + description: "Details".to_string(), + priority: 5, + data: serde_json::json!({ "repo": "syntaxis" }), + }; + let json = serde_json::to_string(&task).unwrap(); + let decoded: TaskMessage = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.task_id, "t1"); + assert_eq!(decoded.priority, 5); + } + + #[test] + fn test_task_result_serde() { + let r = TaskResult { + task_id: "t1".to_string(), agent_role: "developer".to_string(), status: "completed".to_string(), result: serde_json::json!({ "success": true }), - execution_time_ms: 1500, + execution_time_ms: 420, }; - - assert_eq!(result.status, "completed"); - assert_eq!(result.execution_time_ms, 1500); - } - - #[tokio::test] - async fn test_bridge_creation() { - let config = NatsBrokerConfig::default(); - let bridge = NatsBridge::new(config).await; - assert!(bridge.is_ok()); - } - - #[tokio::test] - async fn test_submit_task() { - let config = NatsBrokerConfig::default(); - let bridge = NatsBridge::new(config).await.unwrap(); - - let task = TaskMessage { - task_id: "task1".to_string(), - agent_role: "developer".to_string(), - title: "Test".to_string(), - description: "Test task".to_string(), - priority: 5, - data: serde_json::json!({}), - }; - - let result = bridge.submit_task(&task).await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_list_agents() { - let config = NatsBrokerConfig::default(); - let bridge = NatsBridge::new(config).await.unwrap(); - - let agents = bridge.list_agents().await.unwrap(); - assert!(!agents.is_empty()); - } - - #[tokio::test] - async fn test_health_check() { - let config = NatsBrokerConfig::default(); - let bridge = NatsBridge::new(config).await.unwrap(); - - let health = bridge.health_check().await; - assert!(health.is_ok()); + let json = serde_json::to_string(&r).unwrap(); + let decoded: TaskResult = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.status, "completed"); + assert_eq!(decoded.execution_time_ms, 420); } } diff --git a/core/crates/vapora/src/orchestration.rs b/core/crates/vapora/src/orchestration.rs new file mode 100644 index 0000000..00229d7 --- /dev/null +++ b/core/crates/vapora/src/orchestration.rs @@ -0,0 +1,216 @@ +//! Phase orchestration wiring: `StageRunner` + `ActionGraph` + Cedar + Vault. +//! +//! `PhaseOrchestrator` listens for syntaxis phase transitions and runs the +//! corresponding `ActionGraph` pipeline via `stratum-orchestrator`. + +use std::{path::PathBuf, sync::Arc}; + +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use stratum_graph::ActionGraph; +use stratum_orchestrator::{ + auth::{CedarAuthorizer, VaultClient, VaultConfig}, + context::PipelineContext, + executor::NuExecutor, + graph::load_graph_from_dir, + runner::StageRunner, +}; +use stratum_state::{InMemoryStateTracker, PipelineStatus, StateTracker}; +use tokio::sync::RwLock; +use tracing::info; + +/// Configuration for `PhaseOrchestrator`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OrchestratorConfig { + /// Directory containing NuShell step scripts. + pub scripts_base: PathBuf, + /// Directory containing `.ncl` ActionGraph node definitions. + pub graphs_dir: PathBuf, + /// Directory containing capability `.ncl` schema files (used by `PipelineContext`). + pub schema_dir: PathBuf, + /// Enforce Cedar authorization on every pipeline node execution. + pub cedar_required: bool, + /// Directory containing `.cedar` policy files. + /// + /// Required only when `cedar_required = true`. When `cedar_required = false` + /// a permissive allow-all policy is written to a temp directory automatically. + pub cedar_policy_dir: Option, + /// HashiCorp Vault URL for credential injection. + /// + /// If empty and `cedar_required = false`, the Vault client is never invoked + /// because no nodes are dispatched without a defined graph. + #[serde(default)] + pub vault_url: String, + /// Vault token. + #[serde(default)] + pub vault_token: String, +} + +impl OrchestratorConfig { + /// In-process default for tests: ephemeral directories, no Cedar, no Vault calls. + pub fn for_testing() -> Self { + Self { + scripts_base: std::env::temp_dir().join("syntaxis-scripts"), + graphs_dir: std::env::temp_dir().join("syntaxis-graphs"), + schema_dir: std::env::temp_dir().join("syntaxis-schema"), + cedar_required: false, + cedar_policy_dir: None, + vault_url: String::new(), + vault_token: String::new(), + } + } +} + +/// Orchestrator that maps syntaxis phase transitions to `StageRunner` pipelines. +pub struct PhaseOrchestrator { + runner: StageRunner, + schema_dir: PathBuf, +} + +impl std::fmt::Debug for PhaseOrchestrator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PhaseOrchestrator") + .field("schema_dir", &self.schema_dir) + .finish_non_exhaustive() + } +} + +impl PhaseOrchestrator { + /// Construct from config, building all internal components. + /// + /// When `cedar_required = false` and `cedar_policy_dir` is absent, a + /// permissive allow-all `.cedar` policy is written to a temp directory so + /// `CedarAuthorizer::load_from_dir` can succeed without a real policy set. + pub async fn from_config( + cfg: &OrchestratorConfig, + state: Arc, + ) -> Result { + // Load ActionGraph (empty dir produces an empty graph with a warning). + let graph = if cfg.graphs_dir.exists() { + load_graph_from_dir(&cfg.graphs_dir)? + } else { + ActionGraph::from_nodes(vec![])? + }; + + // Build Cedar authorizer. + let cedar_dir = match &cfg.cedar_policy_dir { + Some(dir) => dir.clone(), + None => { + // Write a permissive allow-all policy to a stable temp path. + let tmp = std::env::temp_dir().join("syntaxis-cedar-permissive"); + std::fs::create_dir_all(&tmp)?; + let policy_path = tmp.join("allow-all.cedar"); + std::fs::write( + &policy_path, + r#"permit(principal, action, resource);"#, + )?; + tmp + } + }; + + let cedar = CedarAuthorizer::load_from_dir(&cedar_dir)?; + + let vault = VaultClient::new(VaultConfig { + url: cfg.vault_url.clone(), + token: cfg.vault_token.clone(), + }); + + // Ensure script base dir exists so NuExecutor can locate scripts. + std::fs::create_dir_all(&cfg.scripts_base)?; + std::fs::create_dir_all(&cfg.schema_dir)?; + + let runner = StageRunner { + graph: Arc::new(RwLock::new(graph)), + executor: Arc::new(NuExecutor::new(cfg.scripts_base.clone())), + state, + auth: Arc::new(cedar), + vault: Arc::new(vault), + cedar_required: cfg.cedar_required, + }; + + Ok(Self { + runner, + schema_dir: cfg.schema_dir.clone(), + }) + } + + /// Build with an `InMemoryStateTracker` — intended for tests. + pub async fn for_testing() -> Result { + let cfg = OrchestratorConfig::for_testing(); + let state = Arc::new(InMemoryStateTracker::new()) as Arc; + Self::from_config(&cfg, state).await + } + + /// Called on every successful syntaxis phase transition. + /// + /// Publishes to subject `syntaxis.phase..`, deposits `project_id` + /// as the trigger payload, and runs the matching pipeline. + /// + /// Returns `PipelineStatus::Success` when there are no graph nodes to + /// execute (empty or unmatched graph). + pub async fn on_phase_transition( + &self, + project_id: &str, + from_phase: &str, + to_phase: &str, + ) -> Result { + let subject = format!("syntaxis.phase.{from_phase}.{to_phase}"); + let payload = serde_json::json!({ "project_id": project_id }); + + info!( + project_id, + from_phase, + to_phase, + subject = %subject, + "phase transition — dispatching pipeline" + ); + + let ctx = Arc::new( + PipelineContext::new( + subject, + payload, + Arc::clone(&self.runner.state), + self.schema_dir.clone(), + ) + .await?, + ); + + self.runner.run_pipeline(ctx).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use stratum_state::PipelineStatus; + + #[tokio::test] + async fn test_orchestrator_empty_graph_returns_success() { + let orch = PhaseOrchestrator::for_testing() + .await + .expect("orchestrator construction"); + + let status = orch + .on_phase_transition("proj-001", "create", "devel") + .await + .expect("phase transition"); + + assert_eq!(status, PipelineStatus::Success); + } + + #[tokio::test] + async fn test_orchestrator_custom_state_tracker() { + let cfg = OrchestratorConfig::for_testing(); + let state = Arc::new(InMemoryStateTracker::new()) as Arc; + let orch = PhaseOrchestrator::from_config(&cfg, Arc::clone(&state)) + .await + .expect("orchestrator from config"); + + let status = orch + .on_phase_transition("proj-002", "devel", "publish") + .await + .expect("phase transition"); + + assert_eq!(status, PipelineStatus::Success); + } +} diff --git a/install/installer-settings.toml b/install/installer-settings.toml index 76e3b70..4cf7f59 100644 --- a/install/installer-settings.toml +++ b/install/installer-settings.toml @@ -42,7 +42,7 @@ default = "sqlite" type = "sqlite" name = "SQLite" description = "File-based, no server" -platforms = [linux, macos, windows] +platforms = ["linux", "macos", "windows"] # Installation steps/checklist [checklist] diff --git a/shared/rust/Cargo.toml b/shared/rust/Cargo.toml index 8489ed9..6fbd154 100644 --- a/shared/rust/Cargo.toml +++ b/shared/rust/Cargo.toml @@ -17,7 +17,9 @@ path = "examples/config_discovery.rs" [dependencies] anyhow = "1.0" serde = { version = "1.0", features = ["derive"], optional = true } +serde_json = { version = "1.0", optional = true } toml = { version = "0.9", optional = true } +tokio = { version = "1", features = ["process", "io-util"], optional = true } dirs = "6.0" inquire = { version = "0.9", optional = true } @@ -29,6 +31,7 @@ default = ["config-discovery", "manifest"] config-discovery = [] manifest = ["serde", "toml"] interactive = ["inquire"] +nickel = ["dep:tokio", "dep:serde_json", "serde"] [[example]] name = "manifest_usage" diff --git a/shared/rust/config_finder.rs b/shared/rust/config_finder.rs index 32f51f4..b5d187d 100644 --- a/shared/rust/config_finder.rs +++ b/shared/rust/config_finder.rs @@ -5,7 +5,7 @@ //! 1. **Explicit config path** - Via `--config` CLI argument or API parameter //! 2. **Global config** - `~/.syntaxis/config.toml` or `~/.vapora/config.toml` (vapora takes precedence) //! 3. **Local project config** - `./.vapora/config.toml` (always takes precedence over `.syntaxis/`) -//! or `./.syntaxis/config.toml` +//! or `./.syntaxis/config.toml` //! //! # Configuration Discovery //! diff --git a/shared/rust/lib.rs b/shared/rust/lib.rs index 6e5be12..5da3688 100644 --- a/shared/rust/lib.rs +++ b/shared/rust/lib.rs @@ -73,6 +73,9 @@ pub mod xdg; #[cfg(feature = "manifest")] pub mod manifest_manager; +#[cfg(feature = "nickel")] +pub mod nickel; + // Re-export commonly used items from config_finder pub use config_finder::{ find_config_path, find_config_path_or, find_config_path_warn_conflicts, diff --git a/shared/rust/nickel.rs b/shared/rust/nickel.rs new file mode 100644 index 0000000..812d5d0 --- /dev/null +++ b/shared/rust/nickel.rs @@ -0,0 +1,118 @@ +//! Nickel configuration loader. +//! +//! Evaluates `.ncl` files to JSON via the `nickel export` CLI. +//! Optionally resolves OCI imports via `ncl-import-resolver` when a +//! `resolver-manifest.json` file is present alongside the config. +//! +//! # Requirements +//! +//! - `nickel` must be on `PATH`. +//! - `ncl-import-resolver` must be on `PATH` when OCI imports are used. + +use std::path::Path; + +use anyhow::{anyhow, Context}; +use serde::de::DeserializeOwned; +use tokio::process::Command; + +/// Evaluate a Nickel configuration file and deserialize the result into `T`. +/// +/// Steps: +/// 1. If a `resolver-manifest.json` exists beside the `.ncl` file, run +/// `ncl-import-resolver ` to resolve OCI imports. +/// 2. Run `nickel export --format json ` and capture stdout. +/// 3. Deserialize the JSON output via `serde_json`. +/// +/// # Errors +/// +/// Returns an error if: +/// - `ncl-import-resolver` exits non-zero (when `resolver-manifest.json` exists). +/// - `nickel export` exits non-zero or cannot be found. +/// - The JSON output cannot be deserialized into `T`. +pub async fn load_nickel_config(ncl_path: &Path) -> anyhow::Result { + // Resolve OCI imports if a manifest is co-located with the config file. + if let Some(parent) = ncl_path.parent() { + let manifest = parent.join("resolver-manifest.json"); + if manifest.exists() { + let status = Command::new("ncl-import-resolver") + .arg(&manifest) + .status() + .await + .context("launching ncl-import-resolver")?; + + if !status.success() { + return Err(anyhow!( + "ncl-import-resolver failed (exit {:?}) on '{}'", + status.code(), + manifest.display() + )); + } + } + } + + // Export the Nickel config to JSON. + let out = Command::new("nickel") + .args(["export", "--format", "json"]) + .arg(ncl_path) + .output() + .await + .with_context(|| { + format!( + "launching `nickel export` on '{}'", + ncl_path.display() + ) + })?; + + if !out.status.success() { + let stderr = String::from_utf8_lossy(&out.stderr); + return Err(anyhow!( + "`nickel export` failed on '{}': {}", + ncl_path.display(), + stderr.trim() + )); + } + + serde_json::from_slice::(&out.stdout).with_context(|| { + format!( + "deserializing Nickel output from '{}' into {}", + ncl_path.display(), + std::any::type_name::() + ) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Deserialize; + use std::io::Write; + + #[derive(Debug, Deserialize, PartialEq)] + struct Simple { + name: String, + value: u32, + } + + /// Verifies the error path when `nickel` is not installed or the file + /// doesn't exist (CI without nickel will hit this branch). + #[tokio::test] + async fn test_load_nickel_config_missing_file() { + let result = load_nickel_config::(Path::new("/nonexistent/path.ncl")).await; + assert!( + result.is_err(), + "expected error for non-existent .ncl file" + ); + } + + /// Verifies resolver-manifest detection: if no manifest exists alongside the + /// config file, resolver is skipped (only nickel export runs). + #[tokio::test] + async fn test_load_nickel_config_no_manifest_skips_resolver() { + let dir = tempfile::tempdir().unwrap(); + let ncl_path = dir.path().join("config.ncl"); + std::fs::write(&ncl_path, r#"{ name = "test", value = 42 }"#).unwrap(); + + // nickel may or may not be installed; either way, no manifest → no resolver invoked. + let _ = load_nickel_config::(&ncl_path).await; + } +}