feat: integrate stratum orchestration, kogral bridge, and NATS
Some checks failed
Build - Verify Code & Build Binaries / Check Code Format (push) Has been cancelled
Build - Verify Code & Build Binaries / Lint with Clippy (push) Has been cancelled
Build - Verify Code & Build Binaries / Test Suite (push) Has been cancelled
Build - Verify Code & Build Binaries / Cargo Check (push) Has been cancelled
Build - Verify Code & Build Binaries / Security Audit (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Debug) - macos-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Debug) - ubuntu-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Debug) - windows-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Release) - macos-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Release) - ubuntu-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Release) - windows-latest (push) Has been cancelled
CI/CD with Staging Preset / Validate Installation with Staging Preset (macos-latest) (push) Has been cancelled
CI/CD with Staging Preset / Validate Installation with Staging Preset (ubuntu-latest) (push) Has been cancelled
CI/CD with Staging Preset / Validate Documentation (push) Has been cancelled
Build - Verify Code & Build Binaries / All Checks Passed (push) Has been cancelled
CI/CD with Staging Preset / Build and Test with Staging Preset (push) Has been cancelled
CI/CD with Staging Preset / Integration Test with Docker Compose (push) Has been cancelled
CI/CD with Staging Preset / Test Summary (push) Has been cancelled
Some checks failed
Build - Verify Code & Build Binaries / Check Code Format (push) Has been cancelled
Build - Verify Code & Build Binaries / Lint with Clippy (push) Has been cancelled
Build - Verify Code & Build Binaries / Test Suite (push) Has been cancelled
Build - Verify Code & Build Binaries / Cargo Check (push) Has been cancelled
Build - Verify Code & Build Binaries / Security Audit (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Debug) - macos-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Debug) - ubuntu-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Debug) - windows-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Release) - macos-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Release) - ubuntu-latest (push) Has been cancelled
Build - Verify Code & Build Binaries / Build (Release) - windows-latest (push) Has been cancelled
CI/CD with Staging Preset / Validate Installation with Staging Preset (macos-latest) (push) Has been cancelled
CI/CD with Staging Preset / Validate Installation with Staging Preset (ubuntu-latest) (push) Has been cancelled
CI/CD with Staging Preset / Validate Documentation (push) Has been cancelled
Build - Verify Code & Build Binaries / All Checks Passed (push) Has been cancelled
CI/CD with Staging Preset / Build and Test with Staging Preset (push) Has been cancelled
CI/CD with Staging Preset / Integration Test with Docker Compose (push) Has been cancelled
CI/CD with Staging Preset / Test Summary (push) Has been cancelled
platform - Add orchestration.rs and kogral_bridge.rs to syntaxis-vapora - Replace async-nats with platform-nats (NKey auth support) - Wire stratum-orchestrator, stratum-graph, stratum-state deps - Upgrade surrealdb 2.3 → 3 with kv-surrealkv and rustls features - Consolidate core/Cargo.toml into root workspace (remove virtual manifest) - Add shared/rust/nickel.rs for Nickel config integration - Rename CLI binary from syntaxis-cli to syntaxis
This commit is contained in:
parent
48d7503b48
commit
3faf7a5fc9
@ -74,7 +74,13 @@ palette = { version = "0.7", features = ["serializing"] }
|
||||
# Database
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio-native-tls", "sqlite", "macros"] }
|
||||
sqlx-sqlite = "0.8"
|
||||
surrealdb = { version = "2.3", features = ["kv-mem", "kv-rocksdb"] }
|
||||
surrealdb = { version = "3", features = ["kv-mem", "kv-surrealkv", "kv-rocksdb", "protocol-ws", "rustls"] }
|
||||
platform-nats = { path = "../stratumiops/crates/platform-nats" }
|
||||
stratum-orchestrator = { path = "../stratumiops/crates/stratum-orchestrator" }
|
||||
stratum-graph = { path = "../stratumiops/crates/stratum-graph" }
|
||||
stratum-state = { path = "../stratumiops/crates/stratum-state" }
|
||||
kogral-core = { path = "../kogral/crates/kogral-core" }
|
||||
bytes = "1.9"
|
||||
serde_bytes = "0.11"
|
||||
|
||||
# Logging/Tracing
|
||||
|
||||
@ -47,7 +47,7 @@ default = "sqlite"
|
||||
type = "sqlite"
|
||||
name = "SQLite"
|
||||
description = "File-based, no server"
|
||||
platforms = [linux, macos, windows]
|
||||
platforms = ["linux", "macos", "windows"]
|
||||
|
||||
# Installation steps/checklist
|
||||
[checklist]
|
||||
|
||||
@ -1,77 +1,3 @@
|
||||
# This is a virtual manifest grouped under the parent workspace in /Users/Akasha/Development/syntaxis/Cargo.toml
|
||||
# All workspace configuration, dependencies, and profiles are defined in the root workspace
|
||||
|
||||
[workspace.package]
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
rust-version = "1.75"
|
||||
authors = ["syntaxis contributors"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
repository = "https://github.com/syntaxis/core"
|
||||
|
||||
[workspace.dependencies]
|
||||
# Serialization
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
toml = "0.9"
|
||||
uuid = { version = "1.18", features = ["v4", "serde"] }
|
||||
|
||||
# Error handling
|
||||
thiserror = "2.0"
|
||||
anyhow = "1.0"
|
||||
|
||||
# Async runtime
|
||||
tokio = { version = "1.48", features = ["full"] }
|
||||
async-trait = "0.1"
|
||||
futures = "0.3"
|
||||
|
||||
# Web framework
|
||||
axum = { version = "0.8", features = ["ws"] }
|
||||
tower = "0.5"
|
||||
tower-http = { version = "0.6", features = ["trace", "cors", "fs"] }
|
||||
tokio-rustls = "0.26"
|
||||
rustls = "0.23"
|
||||
rustls-pemfile = "2.2"
|
||||
|
||||
# HTTP client
|
||||
reqwest = { version = "0.12", features = ["json"] }
|
||||
|
||||
# Logging/Tracing
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
# Date/time
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
|
||||
# File operations
|
||||
camino = "1.2"
|
||||
walkdir = "2.5"
|
||||
|
||||
# Templating
|
||||
handlebars = "6.3"
|
||||
|
||||
# Database
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio-native-tls", "sqlite", "macros"] }
|
||||
sqlx-sqlite = "0.8"
|
||||
surrealdb = { version = "2.3", features = ["kv-mem", "kv-rocksdb"] }
|
||||
serde_bytes = "0.11"
|
||||
|
||||
# Other utilities
|
||||
indexmap = "2.12"
|
||||
regex = "1.12"
|
||||
moka = { version = "0.12", features = ["future"] }
|
||||
tokio-tungstenite = "0.28"
|
||||
jsonwebtoken = { version = "10.2", features = ["aws_lc_rs"] }
|
||||
once_cell = "1.21"
|
||||
prometheus = { version = "0.14", features = ["process"] }
|
||||
async-nats = "0.45"
|
||||
rand_core = "0.6"
|
||||
rand = "0.8"
|
||||
|
||||
# Dev dependencies
|
||||
tokio-test = "0.4"
|
||||
tempfile = "3.23"
|
||||
assert_cmd = "2.1"
|
||||
predicates = "3.1"
|
||||
criterion = { version = "0.7", features = ["html_reports"] }
|
||||
mockito = "1.6"
|
||||
# Sub-directory grouping file — not a workspace root.
|
||||
# Workspace definitions live in /Users/Akasha/Development/syntaxis/Cargo.toml.
|
||||
# Packages in core/crates/ resolve { workspace = true } deps from the root workspace.
|
||||
|
||||
@ -207,8 +207,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
);
|
||||
|
||||
// Initialize JWT provider if enabled
|
||||
let jwt_prov = if auth_config.jwt_enabled && auth_config.jwt_secret.is_some() {
|
||||
let secret = auth_config.jwt_secret.as_ref().unwrap();
|
||||
let jwt_prov = if let Some(secret) = auth_config.jwt_secret.as_ref().filter(|_| auth_config.jwt_enabled) {
|
||||
let expiration = auth_config.jwt_expiration as i64;
|
||||
|
||||
match shared_api_lib::auth::jwt::JwtProvider::new(secret, expiration) {
|
||||
|
||||
@ -3,7 +3,7 @@ mode = "Standalone"
|
||||
primary_language = "rust"
|
||||
languages = ["rust"]
|
||||
phase = "Creation"
|
||||
database_path = ".project-tools/data/lifecycle.db"
|
||||
database_path = "data.db"
|
||||
|
||||
[project]
|
||||
name = "syntaxis-cli"
|
||||
@ -13,8 +13,8 @@ authors = ["Your Name"]
|
||||
license = "MIT"
|
||||
keywords = []
|
||||
categories = []
|
||||
created = "2025-11-17T00:36:54.732694Z"
|
||||
last_updated = "2025-11-17T00:36:54.732694Z"
|
||||
created = "2026-02-22T11:10:00.776028Z"
|
||||
last_updated = "2026-02-22T11:10:00.776028Z"
|
||||
|
||||
[phases]
|
||||
allowed_transitions = []
|
||||
@ -31,7 +31,7 @@ include_dev = false
|
||||
|
||||
[export]
|
||||
format = "json"
|
||||
output_path = ".project-tools/exports/project-{timestamp}.json"
|
||||
output_path = "exports/project-{timestamp}.json"
|
||||
include_metadata = true
|
||||
include_tags = true
|
||||
pretty = true
|
||||
|
||||
@ -9,7 +9,7 @@ repository.workspace = true
|
||||
description = "CLI tool for syntaxis management"
|
||||
|
||||
[[bin]]
|
||||
name = "syntaxis-cli"
|
||||
name = "syntaxis"
|
||||
path = "src/main.rs"
|
||||
|
||||
[package.metadata.syntaxis]
|
||||
|
||||
@ -63,7 +63,7 @@ async fn select_project_interactive() -> Result<String> {
|
||||
let app_name = ui_config::get_app_name();
|
||||
return Err(anyhow!(
|
||||
"No projects found in database. Run '{} init' first.",
|
||||
format!("{} ", app_name)
|
||||
app_name
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@ -484,7 +484,7 @@ async fn main() -> Result<()> {
|
||||
// Find config file
|
||||
let config_path = find_config_path_warn_conflicts(
|
||||
"config.toml",
|
||||
cli.config.as_ref().map(|s| std::path::Path::new(s)),
|
||||
cli.config.as_deref().map(std::path::Path::new),
|
||||
)
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
|
||||
@ -9,8 +9,8 @@ use tempfile::TempDir;
|
||||
/// Create a temporary test directory with a valid lifecycle.toml config
|
||||
fn create_test_fixture() -> TempDir {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let config_dir = temp_dir.path().join(".project");
|
||||
fs::create_dir_all(&config_dir).expect("Failed to create .project directory");
|
||||
let config_dir = temp_dir.path().join(".syntaxis");
|
||||
fs::create_dir_all(&config_dir).expect("Failed to create .syntaxis directory");
|
||||
|
||||
let config_content = r#"project_type = "MultiLang"
|
||||
mode = "Standalone"
|
||||
|
||||
@ -1,6 +1,3 @@
|
||||
[workspace]
|
||||
# Independent package - not part of any workspace
|
||||
|
||||
[package]
|
||||
name = "syntaxis-bridge"
|
||||
version = "0.1.0"
|
||||
|
||||
@ -42,7 +42,7 @@ impl IntegrationRegistry {
|
||||
&self,
|
||||
name: &str,
|
||||
) -> Result<Arc<dyn EcosystemIntegration>, EcosystemIntegrationError> {
|
||||
self.integrations.get(name).map(|i| Arc::clone(i)).ok_or(
|
||||
self.integrations.get(name).map(Arc::clone).ok_or(
|
||||
EcosystemIntegrationError::NotFound {
|
||||
name: name.to_string(),
|
||||
},
|
||||
|
||||
@ -62,6 +62,5 @@ tokio-test = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
|
||||
[features]
|
||||
# Enable SurrealDB support (includes embedded database)
|
||||
surrealdb = ["dep:surrealdb"]
|
||||
default = ["surrealdb"]
|
||||
surrealdb-backend = ["dep:surrealdb"]
|
||||
default = ["surrealdb-backend"]
|
||||
|
||||
@ -49,8 +49,8 @@ pub use config::{
|
||||
SbomFormat,
|
||||
};
|
||||
pub use error::{LifecycleError, Result};
|
||||
#[cfg(feature = "surrealdb")]
|
||||
pub use persistence::SurrealDatabase;
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
pub use persistence::{SurrealDatabase, SurrealDbBackendConfig, SurrealEngineConfig};
|
||||
pub use persistence::{
|
||||
Database, DatabaseConfig, DbChecklistItem, DbPhaseHistory, DbPhaseTransition, DbProject,
|
||||
DbSecurityAssessment, DbSecurityAssessmentDetail, DbTeamMember, DbToolConfiguration,
|
||||
|
||||
@ -3,13 +3,6 @@
|
||||
//! Provides configuration structures for different database backends.
|
||||
//! Configuration is typically loaded from TOML files and can be overridden
|
||||
//! by environment variables.
|
||||
//!
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```ignore
|
||||
//! let config = DatabaseConfig::load("configs/database.toml")?;
|
||||
//! let db = config.create_database().await?;
|
||||
//! ```
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::Path;
|
||||
@ -23,10 +16,11 @@ pub struct DatabaseConfig {
|
||||
/// SQLite configuration (required if engine is "sqlite")
|
||||
pub sqlite: Option<SqliteConfig>,
|
||||
|
||||
/// SurrealDB configuration (required if engine is "surrealdb")
|
||||
pub surrealdb: Option<SurrealDbConfig>,
|
||||
/// SurrealDB dual-engine configuration (required if engine is "surrealdb")
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
pub surrealdb: Option<SurrealDbBackendConfig>,
|
||||
|
||||
/// Optional PostgreSQL configuration for future support
|
||||
/// PostgreSQL configuration (future support)
|
||||
pub postgresql: Option<PostgresConfig>,
|
||||
}
|
||||
|
||||
@ -36,90 +30,125 @@ pub struct SqliteConfig {
|
||||
/// Path to SQLite database file
|
||||
pub path: String,
|
||||
|
||||
/// Maximum number of connections in pool (default: 5)
|
||||
/// Maximum number of connections in pool
|
||||
#[serde(default = "default_sqlite_pool_size")]
|
||||
pub max_connections: u32,
|
||||
|
||||
/// Connection timeout in seconds (default: 30)
|
||||
/// Connection timeout in seconds
|
||||
#[serde(default = "default_timeout_secs")]
|
||||
pub timeout_secs: u64,
|
||||
|
||||
/// Enable write-ahead logging (default: true)
|
||||
/// Enable write-ahead logging
|
||||
#[serde(default = "default_true")]
|
||||
pub wal_mode: bool,
|
||||
|
||||
/// PRAGMA synchronous setting: OFF, NORMAL, FULL (default: NORMAL)
|
||||
/// PRAGMA synchronous setting: OFF, NORMAL, FULL
|
||||
#[serde(default = "default_pragma_synchronous")]
|
||||
pub pragma_synchronous: String,
|
||||
|
||||
/// PRAGMA cache_size in pages (default: 2000)
|
||||
/// PRAGMA cache_size in pages
|
||||
#[serde(default = "default_cache_size")]
|
||||
pub pragma_cache_size: i32,
|
||||
}
|
||||
|
||||
/// SurrealDB-specific configuration
|
||||
/// SurrealDB engine selector — dispatches via `engine::any::connect(url)`.
|
||||
///
|
||||
/// Serialized with an internal `engine` tag so TOML records map directly:
|
||||
/// `{ engine = "surreal_kv", path = ".data/syntaxis/core" }` → `SurrealKv`.
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SurrealDbConfig {
|
||||
/// SurrealDB server URL (e.g., "ws://localhost:8000")
|
||||
pub url: String,
|
||||
|
||||
/// Namespace for isolation (default: "syntaxis")
|
||||
#[serde(default = "default_namespace")]
|
||||
pub namespace: String,
|
||||
|
||||
/// Database name (default: "projects")
|
||||
#[serde(default = "default_database")]
|
||||
pub database: String,
|
||||
|
||||
/// Username for authentication
|
||||
pub username: String,
|
||||
|
||||
/// Password for authentication
|
||||
pub password: String,
|
||||
|
||||
/// Maximum connections (default: 10)
|
||||
#[serde(default = "default_surrealdb_pool_size")]
|
||||
pub max_connections: u32,
|
||||
|
||||
/// Connection timeout in seconds (default: 30)
|
||||
#[serde(default = "default_timeout_secs")]
|
||||
pub timeout_secs: u64,
|
||||
|
||||
/// Enable TLS (default: false)
|
||||
#[serde(default)]
|
||||
pub tls_enabled: bool,
|
||||
|
||||
/// TLS certificate path (optional)
|
||||
pub tls_cert_path: Option<String>,
|
||||
#[serde(tag = "engine", rename_all = "snake_case")]
|
||||
pub enum SurrealEngineConfig {
|
||||
/// In-memory — tests and ephemeral use only
|
||||
Mem,
|
||||
/// SurrealKV embedded (B-tree) — default for relational data (projects, tasks, phases)
|
||||
SurrealKv {
|
||||
/// Filesystem path for the SurrealKV database directory
|
||||
path: String,
|
||||
},
|
||||
/// RocksDB embedded (LSM) — default for append-heavy data (audit logs)
|
||||
RocksDb {
|
||||
/// Filesystem path for the RocksDB database directory
|
||||
path: String,
|
||||
},
|
||||
/// Remote WebSocket — team and cloud deployments
|
||||
Ws {
|
||||
/// Full WebSocket URL, e.g. `ws://surrealdb.internal:8000`
|
||||
url: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// PostgreSQL configuration (for future support)
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
impl SurrealEngineConfig {
|
||||
/// Produce the URL string consumed by `surrealdb::engine::any::connect`.
|
||||
#[must_use]
|
||||
pub fn to_url(&self) -> String {
|
||||
match self {
|
||||
Self::Mem => "mem://".to_string(),
|
||||
Self::SurrealKv { path } => format!("surrealkv://{path}"),
|
||||
Self::RocksDb { path } => format!("rocksdb://{path}"),
|
||||
Self::Ws { url } => url.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Dual-engine SurrealDB backend configuration.
|
||||
///
|
||||
/// `core` (default: SurrealKV) stores relational/graph data with a B-tree engine
|
||||
/// suited for random-access patterns. `hot` (default: RocksDB) stores audit logs
|
||||
/// and append-heavy data with an LSM engine suited for sequential writes.
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SurrealDbBackendConfig {
|
||||
/// Engine for relational data (projects, tasks, phases, checklists)
|
||||
#[serde(default = "default_core_engine")]
|
||||
pub core: SurrealEngineConfig,
|
||||
/// Engine for hot/append data (phase history, future audit embeddings)
|
||||
#[serde(default = "default_hot_engine")]
|
||||
pub hot: SurrealEngineConfig,
|
||||
/// SurrealDB namespace shared by both engines
|
||||
#[serde(default = "default_surreal_namespace")]
|
||||
pub namespace: String,
|
||||
/// Auth username (used for Ws engine only)
|
||||
pub username: Option<String>,
|
||||
/// Auth password (used for Ws engine only)
|
||||
pub password: Option<String>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
impl Default for SurrealDbBackendConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
core: default_core_engine(),
|
||||
hot: default_hot_engine(),
|
||||
namespace: default_surreal_namespace(),
|
||||
username: None,
|
||||
password: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// PostgreSQL configuration (future support)
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PostgresConfig {
|
||||
/// Connection string
|
||||
pub url: String,
|
||||
|
||||
/// Maximum connections (default: 10)
|
||||
/// Maximum connections
|
||||
#[serde(default = "default_postgres_pool_size")]
|
||||
pub max_connections: u32,
|
||||
|
||||
/// Connection timeout in seconds (default: 30)
|
||||
/// Connection timeout in seconds
|
||||
#[serde(default = "default_timeout_secs")]
|
||||
pub timeout_secs: u64,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// DEFAULT IMPLEMENTATIONS
|
||||
// ============================================================================
|
||||
// ── default constructors ─────────────────────────────────────────────────────
|
||||
|
||||
fn default_sqlite_pool_size() -> u32 {
|
||||
5
|
||||
}
|
||||
|
||||
fn default_surrealdb_pool_size() -> u32 {
|
||||
10
|
||||
}
|
||||
|
||||
fn default_postgres_pool_size() -> u32 {
|
||||
10
|
||||
}
|
||||
@ -140,20 +169,29 @@ fn default_cache_size() -> i32 {
|
||||
2000
|
||||
}
|
||||
|
||||
fn default_namespace() -> String {
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
fn default_core_engine() -> SurrealEngineConfig {
|
||||
SurrealEngineConfig::SurrealKv {
|
||||
path: ".data/syntaxis/core".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
fn default_hot_engine() -> SurrealEngineConfig {
|
||||
SurrealEngineConfig::RocksDb {
|
||||
path: ".data/syntaxis/hot".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
fn default_surreal_namespace() -> String {
|
||||
"syntaxis".to_string()
|
||||
}
|
||||
|
||||
fn default_database() -> String {
|
||||
"projects".to_string()
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// IMPLEMENTATIONS
|
||||
// ============================================================================
|
||||
// ── DatabaseConfig impl ──────────────────────────────────────────────────────
|
||||
|
||||
impl DatabaseConfig {
|
||||
/// Create a default SQLite configuration
|
||||
/// Create a default SQLite configuration.
|
||||
pub fn sqlite_default(path: &str) -> Self {
|
||||
Self {
|
||||
engine: "sqlite".to_string(),
|
||||
@ -165,45 +203,54 @@ impl DatabaseConfig {
|
||||
pragma_synchronous: "NORMAL".to_string(),
|
||||
pragma_cache_size: 2000,
|
||||
}),
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
surrealdb: None,
|
||||
postgresql: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a default SurrealDB configuration
|
||||
pub fn surrealdb_default(url: &str, username: &str, password: &str) -> Self {
|
||||
/// Create a default SurrealDB configuration using embedded engines.
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
pub fn surrealdb_default() -> Self {
|
||||
Self {
|
||||
engine: "surrealdb".to_string(),
|
||||
sqlite: None,
|
||||
surrealdb: Some(SurrealDbConfig {
|
||||
url: url.to_string(),
|
||||
namespace: default_namespace(),
|
||||
database: default_database(),
|
||||
username: username.to_string(),
|
||||
password: password.to_string(),
|
||||
max_connections: default_surrealdb_pool_size(),
|
||||
timeout_secs: default_timeout_secs(),
|
||||
tls_enabled: false,
|
||||
tls_cert_path: None,
|
||||
surrealdb: Some(SurrealDbBackendConfig::default()),
|
||||
postgresql: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a WebSocket-connected SurrealDB configuration.
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
pub fn surrealdb_ws(url: &str, username: &str, password: &str) -> Self {
|
||||
Self {
|
||||
engine: "surrealdb".to_string(),
|
||||
sqlite: None,
|
||||
surrealdb: Some(SurrealDbBackendConfig {
|
||||
core: SurrealEngineConfig::Ws { url: url.to_string() },
|
||||
hot: SurrealEngineConfig::Ws { url: url.to_string() },
|
||||
namespace: default_surreal_namespace(),
|
||||
username: Some(username.to_string()),
|
||||
password: Some(password.to_string()),
|
||||
}),
|
||||
postgresql: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Load configuration from TOML file
|
||||
/// Load configuration from a TOML file.
|
||||
pub fn load_from_file<P: AsRef<Path>>(path: P) -> crate::error::Result<Self> {
|
||||
let contents = std::fs::read_to_string(path)?;
|
||||
Self::load_from_str(&contents)
|
||||
}
|
||||
|
||||
/// Load configuration from TOML string
|
||||
/// Load configuration from a TOML string.
|
||||
pub fn load_from_str(toml: &str) -> crate::error::Result<Self> {
|
||||
toml::from_str(toml).map_err(|e| {
|
||||
crate::error::LifecycleError::Config(format!("Failed to parse config: {}", e))
|
||||
crate::error::LifecycleError::Config(format!("Failed to parse config: {e}"))
|
||||
})
|
||||
}
|
||||
|
||||
/// Validate configuration
|
||||
/// Validate that the selected engine has a matching configuration block.
|
||||
pub fn validate(&self) -> crate::error::Result<()> {
|
||||
match self.engine.as_str() {
|
||||
"sqlite" => {
|
||||
@ -215,13 +262,22 @@ impl DatabaseConfig {
|
||||
Ok(())
|
||||
}
|
||||
"surrealdb" => {
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
{
|
||||
self.surrealdb.as_ref().ok_or_else(|| {
|
||||
crate::error::LifecycleError::Config(
|
||||
"SurrealDB engine selected but no surrealdb config provided".to_string(),
|
||||
"SurrealDB engine selected but no surrealdb config provided"
|
||||
.to_string(),
|
||||
)
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(not(feature = "surrealdb-backend"))]
|
||||
Err(crate::error::LifecycleError::Config(
|
||||
"SurrealDB engine is not compiled in; enable feature 'surrealdb-backend'"
|
||||
.to_string(),
|
||||
))
|
||||
}
|
||||
"postgresql" => {
|
||||
self.postgresql.as_ref().ok_or_else(|| {
|
||||
crate::error::LifecycleError::Config(
|
||||
@ -231,28 +287,24 @@ impl DatabaseConfig {
|
||||
Ok(())
|
||||
}
|
||||
other => Err(crate::error::LifecycleError::Config(format!(
|
||||
"Unknown database engine: {}",
|
||||
other
|
||||
"Unknown database engine: {other}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the engine type
|
||||
pub fn get_engine(&self) -> &str {
|
||||
&self.engine
|
||||
}
|
||||
|
||||
/// Check if SQLite backend is configured
|
||||
/// Returns true if SQLite backend is configured.
|
||||
pub fn is_sqlite(&self) -> bool {
|
||||
self.engine == "sqlite"
|
||||
}
|
||||
|
||||
/// Check if SurrealDB backend is configured
|
||||
/// Returns true if SurrealDB backend is configured.
|
||||
pub fn is_surrealdb(&self) -> bool {
|
||||
self.engine == "surrealdb"
|
||||
}
|
||||
}
|
||||
|
||||
// ── tests ────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@ -265,18 +317,6 @@ mod tests {
|
||||
assert_eq!(config.sqlite.unwrap().path, "test.db");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_surrealdb_default_config() {
|
||||
let config = DatabaseConfig::surrealdb_default("ws://localhost:8000", "user", "pass");
|
||||
assert!(config.is_surrealdb());
|
||||
assert!(config.surrealdb.is_some());
|
||||
|
||||
let sdb = config.surrealdb.unwrap();
|
||||
assert_eq!(sdb.url, "ws://localhost:8000");
|
||||
assert_eq!(sdb.username, "user");
|
||||
assert_eq!(sdb.password, "pass");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_sqlite() {
|
||||
let config = DatabaseConfig::sqlite_default("test.db");
|
||||
@ -288,6 +328,7 @@ mod tests {
|
||||
let config = DatabaseConfig {
|
||||
engine: "mongodb".to_string(),
|
||||
sqlite: None,
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
surrealdb: None,
|
||||
postgresql: None,
|
||||
};
|
||||
@ -324,22 +365,80 @@ max_connections = 10
|
||||
assert!(config.wal_mode);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_surrealdb_config_defaults() {
|
||||
let config = SurrealDbConfig {
|
||||
url: "ws://localhost:8000".to_string(),
|
||||
namespace: default_namespace(),
|
||||
database: default_database(),
|
||||
username: "root".to_string(),
|
||||
password: "root".to_string(),
|
||||
max_connections: default_surrealdb_pool_size(),
|
||||
timeout_secs: default_timeout_secs(),
|
||||
tls_enabled: false,
|
||||
tls_cert_path: None,
|
||||
};
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
mod surreal_tests {
|
||||
use super::*;
|
||||
|
||||
assert_eq!(config.namespace, "syntaxis");
|
||||
assert_eq!(config.database, "projects");
|
||||
assert_eq!(config.max_connections, 10);
|
||||
#[test]
|
||||
fn test_surrealdb_default_config() {
|
||||
let config = DatabaseConfig::surrealdb_default();
|
||||
assert!(config.is_surrealdb());
|
||||
assert!(config.surrealdb.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_surrealdb_ws_config() {
|
||||
let config =
|
||||
DatabaseConfig::surrealdb_ws("ws://localhost:8000", "root", "root");
|
||||
assert!(config.is_surrealdb());
|
||||
let cfg = config.surrealdb.unwrap();
|
||||
assert_eq!(cfg.username, Some("root".to_string()));
|
||||
assert!(matches!(cfg.core, SurrealEngineConfig::Ws { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_engine_to_url_mem() {
|
||||
assert_eq!(SurrealEngineConfig::Mem.to_url(), "mem://");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_engine_to_url_surrealkv() {
|
||||
let eng = SurrealEngineConfig::SurrealKv {
|
||||
path: "/data/core".to_string(),
|
||||
};
|
||||
assert_eq!(eng.to_url(), "surrealkv:///data/core");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_engine_to_url_rocksdb() {
|
||||
let eng = SurrealEngineConfig::RocksDb {
|
||||
path: "/data/hot".to_string(),
|
||||
};
|
||||
assert_eq!(eng.to_url(), "rocksdb:///data/hot");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_engine_to_url_ws() {
|
||||
let eng = SurrealEngineConfig::Ws {
|
||||
url: "ws://host:8000".to_string(),
|
||||
};
|
||||
assert_eq!(eng.to_url(), "ws://host:8000");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_backend_config_default() {
|
||||
let cfg = SurrealDbBackendConfig::default();
|
||||
assert_eq!(cfg.namespace, "syntaxis");
|
||||
assert!(cfg.username.is_none());
|
||||
assert!(matches!(cfg.core, SurrealEngineConfig::SurrealKv { .. }));
|
||||
assert!(matches!(cfg.hot, SurrealEngineConfig::RocksDb { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_surrealdb() {
|
||||
let config = DatabaseConfig::surrealdb_default();
|
||||
assert!(config.validate().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_surrealdb_missing_config() {
|
||||
let config = DatabaseConfig {
|
||||
engine: "surrealdb".to_string(),
|
||||
sqlite: None,
|
||||
surrealdb: None,
|
||||
postgresql: None,
|
||||
};
|
||||
assert!(config.validate().is_err());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,14 +26,16 @@ pub mod config;
|
||||
pub mod error;
|
||||
pub mod migration;
|
||||
pub mod sqlite_impl;
|
||||
#[cfg(feature = "surrealdb")]
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
pub mod surrealdb_impl;
|
||||
|
||||
// Re-export public types
|
||||
pub use config::DatabaseConfig;
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
pub use config::{SurrealDbBackendConfig, SurrealEngineConfig};
|
||||
pub use error::PersistenceError;
|
||||
pub use sqlite_impl::SqliteDatabase;
|
||||
#[cfg(feature = "surrealdb")]
|
||||
#[cfg(feature = "surrealdb-backend")]
|
||||
pub use surrealdb_impl::SurrealDatabase;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -1,32 +1,36 @@
|
||||
// Integration tests for SurrealDB 2.3 with server mode
|
||||
// Integration tests for SurrealDB 3 with server mode
|
||||
//
|
||||
// To run these tests, start SurrealDB server in another terminal:
|
||||
// surreal start --bind 127.0.0.1:8000 memory
|
||||
// surreal start --bind 127.0.0.1:8000 memory --user root --pass root
|
||||
//
|
||||
// Then run: cargo test --test integration_surrealdb -- --test-threads=1
|
||||
// Then run: cargo test --test integration_surrealdb --features surrealdb-backend -- --test-threads=1
|
||||
|
||||
#[cfg(all(test, feature = "surrealdb"))]
|
||||
#[cfg(all(test, feature = "surrealdb-backend"))]
|
||||
mod surrealdb_integration_tests {
|
||||
use chrono::Utc;
|
||||
use syntaxis_core::persistence::{
|
||||
Database, DbChecklistItem, DbPhaseTransition, DbProject, DbSecurityAssessment,
|
||||
DbTeamMember, SurrealDatabase,
|
||||
DbTeamMember, SurrealDatabase, SurrealDbBackendConfig, SurrealEngineConfig,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Helper to connect to SurrealDB server
|
||||
/// Requires: surreal start --bind 127.0.0.1:8000 memory
|
||||
/// Helper to connect to SurrealDB server.
|
||||
///
|
||||
/// Requires: `surreal start --bind 127.0.0.1:8000 memory --user root --pass root`
|
||||
async fn get_server_db() -> Result<SurrealDatabase, Box<dyn std::error::Error>> {
|
||||
// Try to connect to server; if it fails, we'll skip the test
|
||||
let db = SurrealDatabase::new_server(
|
||||
"ws://localhost:8000",
|
||||
"test_workspace",
|
||||
"test_projects",
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let cfg = SurrealDbBackendConfig {
|
||||
core: SurrealEngineConfig::Ws {
|
||||
url: "ws://localhost:8000".to_string(),
|
||||
},
|
||||
hot: SurrealEngineConfig::Ws {
|
||||
url: "ws://localhost:8000".to_string(),
|
||||
},
|
||||
namespace: "test_workspace".to_string(),
|
||||
username: Some("root".to_string()),
|
||||
password: Some("root".to_string()),
|
||||
};
|
||||
|
||||
let db = SurrealDatabase::from_config(&cfg).await?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
|
||||
@ -13,6 +13,7 @@ syntaxis-core = { path = "../syntaxis" }
|
||||
|
||||
# Async
|
||||
tokio = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
|
||||
# Logging and tracing
|
||||
tracing = { workspace = true }
|
||||
@ -21,19 +22,31 @@ tracing-subscriber = { workspace = true }
|
||||
# Serialization
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
|
||||
# UUID and dates
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
# HTTP client (with cookies feature)
|
||||
# HTTP client
|
||||
reqwest = { workspace = true }
|
||||
|
||||
# NATS JetStream
|
||||
async-nats = { workspace = true }
|
||||
# NATS JetStream with NKey auth
|
||||
platform-nats = { workspace = true }
|
||||
|
||||
# WebSocket (older version 0.20 for this crate)
|
||||
# Stratum orchestration
|
||||
stratum-orchestrator = { workspace = true }
|
||||
stratum-graph = { workspace = true }
|
||||
stratum-state = { workspace = true }
|
||||
|
||||
# WebSocket
|
||||
tokio-tungstenite = { workspace = true }
|
||||
|
||||
# Other utilities
|
||||
# Error handling
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
orchestration = []
|
||||
kogral = []
|
||||
|
||||
184
core/crates/vapora/src/kogral_bridge.rs
Normal file
184
core/crates/vapora/src/kogral_bridge.rs
Normal file
@ -0,0 +1,184 @@
|
||||
//! kogral integration bridge — forwards syntaxis lifecycle events to kogral.
|
||||
//!
|
||||
//! Enabled with the `kogral` feature. Internally uses `platform-nats::EventStream`
|
||||
//! so the kogral-core dependency chain is not required.
|
||||
|
||||
use bytes::Bytes;
|
||||
use platform_nats::{EventStream, NatsConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::VaporaResult;
|
||||
|
||||
/// Connection config for the kogral NATS cluster.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct KogralBridgeConfig {
|
||||
/// NATS server URL of the kogral cluster
|
||||
pub url: String,
|
||||
/// JetStream stream that kogral consumes (default: `KOGRAL`)
|
||||
pub stream_name: String,
|
||||
/// Consumer name (default: `syntaxis-kogral`)
|
||||
pub consumer_name: String,
|
||||
/// NKey seed for signing (optional)
|
||||
pub nkey_seed: Option<String>,
|
||||
/// Trusted public NKeys (empty → accept all)
|
||||
pub trusted_nkeys: Vec<String>,
|
||||
}
|
||||
|
||||
impl Default for KogralBridgeConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
url: "nats://localhost:4222".to_string(),
|
||||
stream_name: "KOGRAL".to_string(),
|
||||
consumer_name: "syntaxis-kogral".to_string(),
|
||||
nkey_seed: None,
|
||||
trusted_nkeys: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Lifecycle event forwarded to kogral for knowledge graph indexing.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct KogralEvent {
|
||||
/// Dot-separated event type, e.g. `syntaxis.project.created`
|
||||
pub event_type: String,
|
||||
/// Source project identifier
|
||||
pub project_id: String,
|
||||
/// Optional phase name relevant to the event
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub phase: Option<String>,
|
||||
/// RFC 3339 timestamp
|
||||
pub timestamp: String,
|
||||
/// Arbitrary structured payload
|
||||
pub payload: serde_json::Value,
|
||||
}
|
||||
|
||||
/// Bridge that publishes syntaxis lifecycle events to kogral via NATS.
|
||||
pub struct KogralBridge {
|
||||
stream: EventStream,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for KogralBridge {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("KogralBridge").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl KogralBridge {
|
||||
/// Connect to the kogral NATS cluster.
|
||||
pub async fn connect(cfg: &KogralBridgeConfig) -> VaporaResult<Self> {
|
||||
let nats_cfg = NatsConfig {
|
||||
url: cfg.url.clone(),
|
||||
stream_name: cfg.stream_name.clone(),
|
||||
consumer_name: cfg.consumer_name.clone(),
|
||||
subjects: vec![format!("kogral.syntaxis.>")],
|
||||
nkey_seed: cfg.nkey_seed.clone(),
|
||||
trusted_nkeys: cfg.trusted_nkeys.clone(),
|
||||
require_signed_messages: false,
|
||||
};
|
||||
|
||||
let stream = EventStream::connect(&nats_cfg).await?;
|
||||
tracing::info!(url = %cfg.url, stream = %cfg.stream_name, "kogral bridge connected");
|
||||
Ok(Self { stream })
|
||||
}
|
||||
|
||||
/// Emit a generic lifecycle event to `kogral.syntaxis.<event_type>`.
|
||||
pub async fn emit(&self, event: &KogralEvent) -> VaporaResult<()> {
|
||||
let subject = format!("kogral.syntaxis.{}", event.event_type.replace('.', "-"));
|
||||
let payload = Bytes::from(serde_json::to_vec(event)?);
|
||||
self.stream.publish(&subject, payload).await?;
|
||||
tracing::debug!(event_type = %event.event_type, project_id = %event.project_id, "kogral event emitted");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emit a project-created event.
|
||||
pub async fn on_project_created(&self, project_id: &str) -> VaporaResult<()> {
|
||||
self.emit(&KogralEvent {
|
||||
event_type: "project.created".to_string(),
|
||||
project_id: project_id.to_string(),
|
||||
phase: None,
|
||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||
payload: serde_json::json!({ "project_id": project_id }),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Emit a phase-transition event.
|
||||
pub async fn on_phase_transition(
|
||||
&self,
|
||||
project_id: &str,
|
||||
from_phase: &str,
|
||||
to_phase: &str,
|
||||
) -> VaporaResult<()> {
|
||||
self.emit(&KogralEvent {
|
||||
event_type: "phase.transition".to_string(),
|
||||
project_id: project_id.to_string(),
|
||||
phase: Some(to_phase.to_string()),
|
||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||
payload: serde_json::json!({
|
||||
"from": from_phase,
|
||||
"to": to_phase,
|
||||
}),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Emit a task-completed event.
|
||||
pub async fn on_task_completed(
|
||||
&self,
|
||||
project_id: &str,
|
||||
task_id: &str,
|
||||
phase: &str,
|
||||
) -> VaporaResult<()> {
|
||||
self.emit(&KogralEvent {
|
||||
event_type: "task.completed".to_string(),
|
||||
project_id: project_id.to_string(),
|
||||
phase: Some(phase.to_string()),
|
||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||
payload: serde_json::json!({ "task_id": task_id }),
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_kogral_bridge_config_default() {
|
||||
let cfg = KogralBridgeConfig::default();
|
||||
assert_eq!(cfg.stream_name, "KOGRAL");
|
||||
assert!(cfg.nkey_seed.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_kogral_event_serde() {
|
||||
let event = KogralEvent {
|
||||
event_type: "project.created".to_string(),
|
||||
project_id: "p1".to_string(),
|
||||
phase: None,
|
||||
timestamp: "2026-01-01T00:00:00Z".to_string(),
|
||||
payload: serde_json::json!({ "project_id": "p1" }),
|
||||
};
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let decoded: KogralEvent = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(decoded.event_type, "project.created");
|
||||
// `phase` is None → must be absent from the serialized JSON (skip_serializing_if)
|
||||
assert!(!json.contains("\"phase\""));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_kogral_event_with_phase_serde() {
|
||||
let event = KogralEvent {
|
||||
event_type: "phase.transition".to_string(),
|
||||
project_id: "p2".to_string(),
|
||||
phase: Some("devel".to_string()),
|
||||
timestamp: "2026-01-01T00:00:00Z".to_string(),
|
||||
payload: serde_json::json!({ "from": "create", "to": "devel" }),
|
||||
};
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
assert!(json.contains("\"phase\""));
|
||||
let decoded: KogralEvent = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(decoded.phase.unwrap(), "devel");
|
||||
}
|
||||
}
|
||||
@ -27,9 +27,13 @@ pub mod llm_provider;
|
||||
pub mod multi_ia_router;
|
||||
pub mod nats_bridge;
|
||||
pub mod observability;
|
||||
pub mod orchestration;
|
||||
pub mod plugin;
|
||||
pub mod security;
|
||||
|
||||
#[cfg(feature = "kogral")]
|
||||
pub mod kogral_bridge;
|
||||
|
||||
/// VAPORA lifecycle integration result type
|
||||
pub type VaporaResult<T> = anyhow::Result<T>;
|
||||
|
||||
|
||||
@ -1,251 +1,248 @@
|
||||
//! NATS JetStream bridge for agent communication
|
||||
//!
|
||||
//! Real integration with NATS for multi-agent orchestration.
|
||||
//! NATS JetStream bridge using `platform-nats` with NKey auth and signed messages.
|
||||
|
||||
use anyhow::anyhow;
|
||||
use bytes::Bytes;
|
||||
use platform_nats::{EventStream, NatsConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
|
||||
/// NATS broker configuration
|
||||
#[derive(Debug, Clone)]
|
||||
use crate::VaporaResult;
|
||||
|
||||
/// Broker-level configuration for `NatsBridge`.
|
||||
///
|
||||
/// Call `NatsBrokerConfig::to_platform_config()` to convert to the
|
||||
/// `platform_nats::NatsConfig` expected by `EventStream::connect`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NatsBrokerConfig {
|
||||
/// NATS server URL
|
||||
/// NATS server URL, e.g. `nats://localhost:4222`
|
||||
pub url: String,
|
||||
/// Max retries for connection
|
||||
pub max_retries: u32,
|
||||
/// Connection timeout
|
||||
pub timeout: Duration,
|
||||
/// JetStream stream name (created if absent)
|
||||
pub stream_name: String,
|
||||
/// Durable pull-consumer name (created if absent)
|
||||
pub consumer_name: String,
|
||||
/// Subject filter list, e.g. `["syntaxis.>"]`
|
||||
pub subjects: Vec<String>,
|
||||
/// ed25519 NKey seed for signing outbound messages (optional)
|
||||
pub nkey_seed: Option<String>,
|
||||
/// Public NKeys whose signatures are accepted (empty → accept all)
|
||||
pub trusted_nkeys: Vec<String>,
|
||||
/// Reject inbound messages that lack a valid NKey signature
|
||||
pub require_signed_messages: bool,
|
||||
}
|
||||
|
||||
impl Default for NatsBrokerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
url: "nats://localhost:4222".to_string(),
|
||||
max_retries: 3,
|
||||
timeout: Duration::from_secs(10),
|
||||
stream_name: "SYNTAXIS".to_string(),
|
||||
consumer_name: "syntaxis-consumer".to_string(),
|
||||
subjects: vec!["syntaxis.>".to_string()],
|
||||
nkey_seed: None,
|
||||
trusted_nkeys: vec![],
|
||||
require_signed_messages: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Task message for NATS
|
||||
impl NatsBrokerConfig {
|
||||
fn to_platform_config(&self) -> NatsConfig {
|
||||
NatsConfig {
|
||||
url: self.url.clone(),
|
||||
stream_name: self.stream_name.clone(),
|
||||
consumer_name: self.consumer_name.clone(),
|
||||
subjects: self.subjects.clone(),
|
||||
nkey_seed: self.nkey_seed.clone(),
|
||||
trusted_nkeys: self.trusted_nkeys.clone(),
|
||||
require_signed_messages: self.require_signed_messages,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Event configuration matching the kogral `NatsEventConfig` schema.
|
||||
///
|
||||
/// Allows downstream systems to express NATS connectivity in a unified format
|
||||
/// and convert to `NatsBrokerConfig` via `From`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NatsEventConfig {
|
||||
/// NATS server URL
|
||||
pub url: String,
|
||||
/// JetStream stream name
|
||||
pub stream: String,
|
||||
/// Durable pull-consumer name
|
||||
pub consumer: String,
|
||||
/// Subject filter list
|
||||
pub subjects: Vec<String>,
|
||||
/// NKey seed for signing outbound messages
|
||||
#[serde(default)]
|
||||
pub nkey_seed: Option<String>,
|
||||
/// Trusted public NKeys
|
||||
#[serde(default)]
|
||||
pub trusted_nkeys: Vec<String>,
|
||||
/// Reject inbound messages lacking a valid NKey signature
|
||||
#[serde(default)]
|
||||
pub require_signed_messages: bool,
|
||||
}
|
||||
|
||||
impl From<NatsEventConfig> for NatsBrokerConfig {
|
||||
fn from(cfg: NatsEventConfig) -> Self {
|
||||
Self {
|
||||
url: cfg.url,
|
||||
stream_name: cfg.stream,
|
||||
consumer_name: cfg.consumer,
|
||||
subjects: cfg.subjects,
|
||||
nkey_seed: cfg.nkey_seed,
|
||||
trusted_nkeys: cfg.trusted_nkeys,
|
||||
require_signed_messages: cfg.require_signed_messages,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Task message published to `syntaxis.tasks.<agent_role>.submit`
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TaskMessage {
|
||||
/// Task ID
|
||||
/// Unique task identifier
|
||||
pub task_id: String,
|
||||
/// Agent role
|
||||
/// Agent role this task is targeted at
|
||||
pub agent_role: String,
|
||||
/// Task title
|
||||
/// Short task title
|
||||
pub title: String,
|
||||
/// Task description
|
||||
/// Full task description
|
||||
pub description: String,
|
||||
/// Priority (1-10)
|
||||
/// Priority in `1..=10` (higher = more urgent)
|
||||
pub priority: u32,
|
||||
/// Task data
|
||||
/// Arbitrary structured task data
|
||||
pub data: serde_json::Value,
|
||||
}
|
||||
|
||||
/// Task result from agent
|
||||
/// Task result consumed from `syntaxis.tasks.<task_id>.result`
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TaskResult {
|
||||
/// Task ID
|
||||
/// Task identifier correlating to the originating `TaskMessage`
|
||||
pub task_id: String,
|
||||
/// Agent who completed it
|
||||
/// Agent that completed (or failed) the task
|
||||
pub agent_role: String,
|
||||
/// Status (completed, failed, timeout)
|
||||
/// `"completed"`, `"failed"`, or `"timeout"`
|
||||
pub status: String,
|
||||
/// Result data
|
||||
/// Result payload produced by the agent
|
||||
pub result: serde_json::Value,
|
||||
/// Execution time in milliseconds
|
||||
/// Wall-clock execution time in milliseconds
|
||||
pub execution_time_ms: u64,
|
||||
}
|
||||
|
||||
/// Agent status message
|
||||
/// Agent heartbeat published to `syntaxis.agents.<role>.status`
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AgentStatus {
|
||||
/// Agent role
|
||||
/// Agent role identifier
|
||||
pub role: String,
|
||||
/// Is healthy
|
||||
/// Whether the agent is currently healthy
|
||||
pub healthy: bool,
|
||||
/// Current task count
|
||||
/// Number of tasks currently in flight
|
||||
pub task_count: usize,
|
||||
/// Max capacity
|
||||
/// Maximum concurrent task capacity
|
||||
pub capacity: usize,
|
||||
/// Last heartbeat
|
||||
/// RFC 3339 timestamp of the last heartbeat
|
||||
pub last_heartbeat: String,
|
||||
}
|
||||
|
||||
/// NATS JetStream agent bridge (production-ready)
|
||||
#[derive(Debug, Clone)]
|
||||
/// Production NATS JetStream bridge backed by `platform-nats::EventStream`.
|
||||
pub struct NatsBridge {
|
||||
/// NATS broker configuration
|
||||
#[allow(dead_code)]
|
||||
stream: EventStream,
|
||||
config: NatsBrokerConfig,
|
||||
// In production: async_nats::jetstream::Context
|
||||
/// Connection state
|
||||
_connected: bool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NatsBridge {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("NatsBridge")
|
||||
.field("config", &self.config)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl NatsBridge {
|
||||
/// Create new NATS bridge with configuration
|
||||
pub async fn new(config: NatsBrokerConfig) -> anyhow::Result<Self> {
|
||||
// Production: Connect to NATS
|
||||
// let client = async_nats::connect(&config.url).await
|
||||
// .map_err(|e| anyhow!("Failed to connect to NATS: {}", e))?;
|
||||
// let jetstream = async_nats::jetstream::new(client);
|
||||
|
||||
tracing::info!("NATS Bridge initialized with URL: {}", config.url);
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
_connected: true,
|
||||
})
|
||||
/// Connect to NATS and materialise the configured stream + consumer.
|
||||
pub async fn connect(config: NatsBrokerConfig) -> VaporaResult<Self> {
|
||||
let stream = EventStream::connect(&config.to_platform_config()).await?;
|
||||
tracing::info!(url = %config.url, stream = %config.stream_name, "NATS bridge connected");
|
||||
Ok(Self { stream, config })
|
||||
}
|
||||
|
||||
/// Submit task to agent pool
|
||||
pub async fn submit_task(&self, task: &TaskMessage) -> anyhow::Result<String> {
|
||||
let _subject = format!("tasks.{}.submit", task.agent_role);
|
||||
let _payload = serde_json::to_vec(task)?;
|
||||
|
||||
tracing::info!("Publishing task {} to subject: {}", task.task_id, _subject);
|
||||
|
||||
// Production:
|
||||
// let context = async_nats::jetstream::new(client);
|
||||
// context.publish(_subject, _payload.into()).await?;
|
||||
|
||||
/// Publish a task to `syntaxis.tasks.<agent_role>.submit`.
|
||||
///
|
||||
/// Returns the `task_id` on success for correlation tracking.
|
||||
pub async fn submit_task(&self, task: &TaskMessage) -> VaporaResult<String> {
|
||||
let subject = format!("syntaxis.tasks.{}.submit", task.agent_role);
|
||||
let payload = Bytes::from(serde_json::to_vec(task)?);
|
||||
self.stream.publish(&subject, payload).await?;
|
||||
tracing::debug!(task_id = %task.task_id, subject = %subject, "task submitted");
|
||||
Ok(task.task_id.clone())
|
||||
}
|
||||
|
||||
/// Request task from agent (synchronous with timeout)
|
||||
pub async fn request_task_result(
|
||||
&self,
|
||||
task_id: &str,
|
||||
_timeout: Duration,
|
||||
) -> anyhow::Result<TaskResult> {
|
||||
let _subject = format!("tasks.{}.result", task_id);
|
||||
/// Pull up to `max_msgs` task results from the consumer.
|
||||
///
|
||||
/// Each message is ACKed after successful deserialization.
|
||||
/// Messages that fail to deserialize are ACKed (removed) and excluded.
|
||||
pub async fn pull_task_results(&self, max_msgs: usize) -> VaporaResult<Vec<TaskResult>> {
|
||||
let batch = self.stream.pull_batch(max_msgs).await?;
|
||||
let mut results = Vec::with_capacity(batch.len());
|
||||
|
||||
tracing::info!("Requesting task result: {}", task_id);
|
||||
|
||||
// Production:
|
||||
// let msg = jetstream
|
||||
// .request(_subject, "".into(), _timeout)
|
||||
// .await?;
|
||||
// Ok(serde_json::from_slice(&msg.payload)?)
|
||||
|
||||
Err(anyhow!("Task {} not yet completed", task_id))
|
||||
for (subject, payload, msg) in batch {
|
||||
match serde_json::from_slice::<TaskResult>(&payload) {
|
||||
Ok(result) => {
|
||||
msg.ack()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("ack failed on '{subject}': {e}"))?;
|
||||
results.push(result);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(subject = %subject, "dropping unparseable task result: {e}");
|
||||
let _ = msg.ack().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscribe to task results
|
||||
pub async fn subscribe_task_results(&self, agent_role: &str) -> anyhow::Result<()> {
|
||||
let subject = format!("tasks.{}.results", agent_role);
|
||||
|
||||
tracing::info!("Subscribing to task results: {}", subject);
|
||||
|
||||
// Production:
|
||||
// let subscriber = jetstream.subscribe(subject).await?;
|
||||
// loop {
|
||||
// if let Ok(Some(msg)) = subscriber.next_timeout(Duration::from_secs(5)).await {
|
||||
// let result: TaskResult = serde_json::from_slice(&msg.payload)?;
|
||||
// self.process_result(&result).await?;
|
||||
// }
|
||||
// }
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Publish an agent status heartbeat.
|
||||
pub async fn publish_agent_status(&self, status: &AgentStatus) -> VaporaResult<()> {
|
||||
let subject = format!("syntaxis.agents.{}.status", status.role);
|
||||
let payload = Bytes::from(serde_json::to_vec(status)?);
|
||||
self.stream.publish(&subject, payload).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Publish agent status
|
||||
pub async fn publish_agent_status(&self, status: &AgentStatus) -> anyhow::Result<()> {
|
||||
let _subject = format!("agents.{}.status", status.role);
|
||||
let _payload = serde_json::to_vec(status)?;
|
||||
|
||||
tracing::info!("Publishing agent status: {} -> {}", status.role, _subject);
|
||||
|
||||
// Production:
|
||||
// jetstream.publish(_subject, _payload.into()).await?;
|
||||
|
||||
Ok(())
|
||||
/// Ping NATS by publishing a zero-byte keepalive to `syntaxis.ping`.
|
||||
pub async fn health_check(&self) -> VaporaResult<()> {
|
||||
self.stream
|
||||
.publish("syntaxis.ping", Bytes::new())
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("NATS health check failed: {e}"))
|
||||
}
|
||||
|
||||
/// Subscribe to agent status updates
|
||||
pub async fn subscribe_agent_status(&self) -> anyhow::Result<()> {
|
||||
let _subject = "agents.*.status";
|
||||
|
||||
tracing::info!("Subscribing to agent status updates");
|
||||
|
||||
// Production:
|
||||
// let subscriber = jetstream.subscribe(subject).await?;
|
||||
// Handle incoming status updates
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List active agents
|
||||
pub async fn list_agents(&self) -> anyhow::Result<Vec<AgentStatus>> {
|
||||
// Production: Request agent list from NATS directory
|
||||
tracing::info!("Listing active agents");
|
||||
|
||||
Ok(vec![
|
||||
AgentStatus {
|
||||
role: "developer".to_string(),
|
||||
healthy: true,
|
||||
task_count: 2,
|
||||
capacity: 10,
|
||||
last_heartbeat: chrono::Local::now().to_rfc3339(),
|
||||
},
|
||||
AgentStatus {
|
||||
role: "tester".to_string(),
|
||||
healthy: true,
|
||||
task_count: 1,
|
||||
capacity: 5,
|
||||
last_heartbeat: chrono::Local::now().to_rfc3339(),
|
||||
},
|
||||
])
|
||||
}
|
||||
|
||||
/// Wait for task completion
|
||||
/// Wait up to `max_wait` for a specific `task_id` result via polling pull.
|
||||
pub async fn wait_for_completion(
|
||||
&self,
|
||||
task_id: &str,
|
||||
max_wait: Duration,
|
||||
) -> anyhow::Result<TaskResult> {
|
||||
) -> VaporaResult<TaskResult> {
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
loop {
|
||||
match self
|
||||
.request_task_result(task_id, Duration::from_secs(1))
|
||||
.await
|
||||
{
|
||||
Ok(result) => return Ok(result),
|
||||
Err(_) => {
|
||||
let batch = self.pull_task_results(32).await?;
|
||||
if let Some(r) = batch.into_iter().find(|r| r.task_id == task_id) {
|
||||
return Ok(r);
|
||||
}
|
||||
|
||||
if start.elapsed() > max_wait {
|
||||
return Err(anyhow!("Task {} timed out", task_id));
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
return Err(anyhow::anyhow!("task '{}' timed out", task_id));
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get health status of NATS connection
|
||||
pub async fn health_check(&self) -> anyhow::Result<()> {
|
||||
// Production: Send ping to NATS server
|
||||
tracing::info!("NATS health check");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create JetStream stream for tasks if not exists
|
||||
pub async fn ensure_stream(&self) -> anyhow::Result<()> {
|
||||
// Production:
|
||||
// let jetstream = async_nats::jetstream::new(client);
|
||||
// jetstream
|
||||
// .get_or_create_stream(async_nats::jetstream::stream::Config {
|
||||
// name: "TASKS".to_string(),
|
||||
// subjects: vec!["tasks.>".to_string()],
|
||||
// ..Default::default()
|
||||
// })
|
||||
// .await?;
|
||||
|
||||
tracing::info!("Task stream ensured");
|
||||
Ok(())
|
||||
/// Return the broker configuration this bridge was created with.
|
||||
pub fn config(&self) -> &NatsBrokerConfig {
|
||||
&self.config
|
||||
}
|
||||
}
|
||||
|
||||
@ -254,80 +251,78 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_nats_config_default() {
|
||||
let config = NatsBrokerConfig::default();
|
||||
assert_eq!(config.url, "nats://localhost:4222");
|
||||
assert_eq!(config.max_retries, 3);
|
||||
fn test_broker_config_default() {
|
||||
let cfg = NatsBrokerConfig::default();
|
||||
assert_eq!(cfg.url, "nats://localhost:4222");
|
||||
assert_eq!(cfg.stream_name, "SYNTAXIS");
|
||||
assert!(!cfg.require_signed_messages);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_task_message() {
|
||||
let task = TaskMessage {
|
||||
task_id: "task1".to_string(),
|
||||
agent_role: "developer".to_string(),
|
||||
title: "Test Task".to_string(),
|
||||
description: "A test".to_string(),
|
||||
priority: 5,
|
||||
data: serde_json::json!({}),
|
||||
fn test_nats_event_config_into_broker() {
|
||||
let event_cfg = NatsEventConfig {
|
||||
url: "nats://prod:4222".to_string(),
|
||||
stream: "PROD_STREAM".to_string(),
|
||||
consumer: "prod-consumer".to_string(),
|
||||
subjects: vec!["syntaxis.>".to_string()],
|
||||
nkey_seed: Some("SXXX".to_string()),
|
||||
trusted_nkeys: vec!["PUBKEY".to_string()],
|
||||
require_signed_messages: true,
|
||||
};
|
||||
|
||||
assert_eq!(task.agent_role, "developer");
|
||||
let broker: NatsBrokerConfig = event_cfg.into();
|
||||
assert_eq!(broker.url, "nats://prod:4222");
|
||||
assert_eq!(broker.stream_name, "PROD_STREAM");
|
||||
assert_eq!(broker.nkey_seed.unwrap(), "SXXX");
|
||||
assert!(broker.require_signed_messages);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_task_result() {
|
||||
let result = TaskResult {
|
||||
task_id: "task1".to_string(),
|
||||
fn test_to_platform_config_round_trip() {
|
||||
let cfg = NatsBrokerConfig {
|
||||
url: "nats://host:4222".to_string(),
|
||||
stream_name: "S".to_string(),
|
||||
consumer_name: "C".to_string(),
|
||||
subjects: vec!["s.>".to_string()],
|
||||
nkey_seed: None,
|
||||
trusted_nkeys: vec![],
|
||||
require_signed_messages: false,
|
||||
};
|
||||
|
||||
let platform = cfg.to_platform_config();
|
||||
assert_eq!(platform.url, cfg.url);
|
||||
assert_eq!(platform.stream_name, cfg.stream_name);
|
||||
assert_eq!(platform.consumer_name, cfg.consumer_name);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_task_message_serde() {
|
||||
let task = TaskMessage {
|
||||
task_id: "t1".to_string(),
|
||||
agent_role: "developer".to_string(),
|
||||
title: "Impl X".to_string(),
|
||||
description: "Details".to_string(),
|
||||
priority: 5,
|
||||
data: serde_json::json!({ "repo": "syntaxis" }),
|
||||
};
|
||||
let json = serde_json::to_string(&task).unwrap();
|
||||
let decoded: TaskMessage = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(decoded.task_id, "t1");
|
||||
assert_eq!(decoded.priority, 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_task_result_serde() {
|
||||
let r = TaskResult {
|
||||
task_id: "t1".to_string(),
|
||||
agent_role: "developer".to_string(),
|
||||
status: "completed".to_string(),
|
||||
result: serde_json::json!({ "success": true }),
|
||||
execution_time_ms: 1500,
|
||||
execution_time_ms: 420,
|
||||
};
|
||||
|
||||
assert_eq!(result.status, "completed");
|
||||
assert_eq!(result.execution_time_ms, 1500);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bridge_creation() {
|
||||
let config = NatsBrokerConfig::default();
|
||||
let bridge = NatsBridge::new(config).await;
|
||||
assert!(bridge.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_submit_task() {
|
||||
let config = NatsBrokerConfig::default();
|
||||
let bridge = NatsBridge::new(config).await.unwrap();
|
||||
|
||||
let task = TaskMessage {
|
||||
task_id: "task1".to_string(),
|
||||
agent_role: "developer".to_string(),
|
||||
title: "Test".to_string(),
|
||||
description: "Test task".to_string(),
|
||||
priority: 5,
|
||||
data: serde_json::json!({}),
|
||||
};
|
||||
|
||||
let result = bridge.submit_task(&task).await;
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_agents() {
|
||||
let config = NatsBrokerConfig::default();
|
||||
let bridge = NatsBridge::new(config).await.unwrap();
|
||||
|
||||
let agents = bridge.list_agents().await.unwrap();
|
||||
assert!(!agents.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_health_check() {
|
||||
let config = NatsBrokerConfig::default();
|
||||
let bridge = NatsBridge::new(config).await.unwrap();
|
||||
|
||||
let health = bridge.health_check().await;
|
||||
assert!(health.is_ok());
|
||||
let json = serde_json::to_string(&r).unwrap();
|
||||
let decoded: TaskResult = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(decoded.status, "completed");
|
||||
assert_eq!(decoded.execution_time_ms, 420);
|
||||
}
|
||||
}
|
||||
|
||||
216
core/crates/vapora/src/orchestration.rs
Normal file
216
core/crates/vapora/src/orchestration.rs
Normal file
@ -0,0 +1,216 @@
|
||||
//! Phase orchestration wiring: `StageRunner` + `ActionGraph` + Cedar + Vault.
|
||||
//!
|
||||
//! `PhaseOrchestrator` listens for syntaxis phase transitions and runs the
|
||||
//! corresponding `ActionGraph` pipeline via `stratum-orchestrator`.
|
||||
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use stratum_graph::ActionGraph;
|
||||
use stratum_orchestrator::{
|
||||
auth::{CedarAuthorizer, VaultClient, VaultConfig},
|
||||
context::PipelineContext,
|
||||
executor::NuExecutor,
|
||||
graph::load_graph_from_dir,
|
||||
runner::StageRunner,
|
||||
};
|
||||
use stratum_state::{InMemoryStateTracker, PipelineStatus, StateTracker};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::info;
|
||||
|
||||
/// Configuration for `PhaseOrchestrator`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct OrchestratorConfig {
|
||||
/// Directory containing NuShell step scripts.
|
||||
pub scripts_base: PathBuf,
|
||||
/// Directory containing `.ncl` ActionGraph node definitions.
|
||||
pub graphs_dir: PathBuf,
|
||||
/// Directory containing capability `.ncl` schema files (used by `PipelineContext`).
|
||||
pub schema_dir: PathBuf,
|
||||
/// Enforce Cedar authorization on every pipeline node execution.
|
||||
pub cedar_required: bool,
|
||||
/// Directory containing `.cedar` policy files.
|
||||
///
|
||||
/// Required only when `cedar_required = true`. When `cedar_required = false`
|
||||
/// a permissive allow-all policy is written to a temp directory automatically.
|
||||
pub cedar_policy_dir: Option<PathBuf>,
|
||||
/// HashiCorp Vault URL for credential injection.
|
||||
///
|
||||
/// If empty and `cedar_required = false`, the Vault client is never invoked
|
||||
/// because no nodes are dispatched without a defined graph.
|
||||
#[serde(default)]
|
||||
pub vault_url: String,
|
||||
/// Vault token.
|
||||
#[serde(default)]
|
||||
pub vault_token: String,
|
||||
}
|
||||
|
||||
impl OrchestratorConfig {
|
||||
/// In-process default for tests: ephemeral directories, no Cedar, no Vault calls.
|
||||
pub fn for_testing() -> Self {
|
||||
Self {
|
||||
scripts_base: std::env::temp_dir().join("syntaxis-scripts"),
|
||||
graphs_dir: std::env::temp_dir().join("syntaxis-graphs"),
|
||||
schema_dir: std::env::temp_dir().join("syntaxis-schema"),
|
||||
cedar_required: false,
|
||||
cedar_policy_dir: None,
|
||||
vault_url: String::new(),
|
||||
vault_token: String::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Orchestrator that maps syntaxis phase transitions to `StageRunner` pipelines.
|
||||
pub struct PhaseOrchestrator {
|
||||
runner: StageRunner,
|
||||
schema_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PhaseOrchestrator {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("PhaseOrchestrator")
|
||||
.field("schema_dir", &self.schema_dir)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl PhaseOrchestrator {
|
||||
/// Construct from config, building all internal components.
|
||||
///
|
||||
/// When `cedar_required = false` and `cedar_policy_dir` is absent, a
|
||||
/// permissive allow-all `.cedar` policy is written to a temp directory so
|
||||
/// `CedarAuthorizer::load_from_dir` can succeed without a real policy set.
|
||||
pub async fn from_config(
|
||||
cfg: &OrchestratorConfig,
|
||||
state: Arc<dyn StateTracker>,
|
||||
) -> Result<Self> {
|
||||
// Load ActionGraph (empty dir produces an empty graph with a warning).
|
||||
let graph = if cfg.graphs_dir.exists() {
|
||||
load_graph_from_dir(&cfg.graphs_dir)?
|
||||
} else {
|
||||
ActionGraph::from_nodes(vec![])?
|
||||
};
|
||||
|
||||
// Build Cedar authorizer.
|
||||
let cedar_dir = match &cfg.cedar_policy_dir {
|
||||
Some(dir) => dir.clone(),
|
||||
None => {
|
||||
// Write a permissive allow-all policy to a stable temp path.
|
||||
let tmp = std::env::temp_dir().join("syntaxis-cedar-permissive");
|
||||
std::fs::create_dir_all(&tmp)?;
|
||||
let policy_path = tmp.join("allow-all.cedar");
|
||||
std::fs::write(
|
||||
&policy_path,
|
||||
r#"permit(principal, action, resource);"#,
|
||||
)?;
|
||||
tmp
|
||||
}
|
||||
};
|
||||
|
||||
let cedar = CedarAuthorizer::load_from_dir(&cedar_dir)?;
|
||||
|
||||
let vault = VaultClient::new(VaultConfig {
|
||||
url: cfg.vault_url.clone(),
|
||||
token: cfg.vault_token.clone(),
|
||||
});
|
||||
|
||||
// Ensure script base dir exists so NuExecutor can locate scripts.
|
||||
std::fs::create_dir_all(&cfg.scripts_base)?;
|
||||
std::fs::create_dir_all(&cfg.schema_dir)?;
|
||||
|
||||
let runner = StageRunner {
|
||||
graph: Arc::new(RwLock::new(graph)),
|
||||
executor: Arc::new(NuExecutor::new(cfg.scripts_base.clone())),
|
||||
state,
|
||||
auth: Arc::new(cedar),
|
||||
vault: Arc::new(vault),
|
||||
cedar_required: cfg.cedar_required,
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
runner,
|
||||
schema_dir: cfg.schema_dir.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Build with an `InMemoryStateTracker` — intended for tests.
|
||||
pub async fn for_testing() -> Result<Self> {
|
||||
let cfg = OrchestratorConfig::for_testing();
|
||||
let state = Arc::new(InMemoryStateTracker::new()) as Arc<dyn StateTracker>;
|
||||
Self::from_config(&cfg, state).await
|
||||
}
|
||||
|
||||
/// Called on every successful syntaxis phase transition.
|
||||
///
|
||||
/// Publishes to subject `syntaxis.phase.<from>.<to>`, deposits `project_id`
|
||||
/// as the trigger payload, and runs the matching pipeline.
|
||||
///
|
||||
/// Returns `PipelineStatus::Success` when there are no graph nodes to
|
||||
/// execute (empty or unmatched graph).
|
||||
pub async fn on_phase_transition(
|
||||
&self,
|
||||
project_id: &str,
|
||||
from_phase: &str,
|
||||
to_phase: &str,
|
||||
) -> Result<PipelineStatus> {
|
||||
let subject = format!("syntaxis.phase.{from_phase}.{to_phase}");
|
||||
let payload = serde_json::json!({ "project_id": project_id });
|
||||
|
||||
info!(
|
||||
project_id,
|
||||
from_phase,
|
||||
to_phase,
|
||||
subject = %subject,
|
||||
"phase transition — dispatching pipeline"
|
||||
);
|
||||
|
||||
let ctx = Arc::new(
|
||||
PipelineContext::new(
|
||||
subject,
|
||||
payload,
|
||||
Arc::clone(&self.runner.state),
|
||||
self.schema_dir.clone(),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
self.runner.run_pipeline(ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use stratum_state::PipelineStatus;

    /// With the default test config the graphs dir is absent/empty, so a
    /// phase transition dispatches no nodes and must report `Success`.
    #[tokio::test]
    async fn test_orchestrator_empty_graph_returns_success() {
        let orch = PhaseOrchestrator::for_testing()
            .await
            .expect("orchestrator construction");

        let status = orch
            .on_phase_transition("proj-001", "create", "devel")
            .await
            .expect("phase transition");

        assert_eq!(status, PipelineStatus::Success);
    }

    /// Same empty-graph path, but wiring an explicitly supplied
    /// `StateTracker` through `from_config` instead of `for_testing`.
    #[tokio::test]
    async fn test_orchestrator_custom_state_tracker() {
        let cfg = OrchestratorConfig::for_testing();
        let state = Arc::new(InMemoryStateTracker::new()) as Arc<dyn StateTracker>;
        let orch = PhaseOrchestrator::from_config(&cfg, Arc::clone(&state))
            .await
            .expect("orchestrator from config");

        let status = orch
            .on_phase_transition("proj-002", "devel", "publish")
            .await
            .expect("phase transition");

        assert_eq!(status, PipelineStatus::Success);
    }
}
|
||||
@ -42,7 +42,7 @@ default = "sqlite"
|
||||
type = "sqlite"
|
||||
name = "SQLite"
|
||||
description = "File-based, no server"
|
||||
platforms = [linux, macos, windows]
|
||||
platforms = ["linux", "macos", "windows"]
|
||||
|
||||
# Installation steps/checklist
|
||||
[checklist]
|
||||
|
||||
@ -17,7 +17,9 @@ path = "examples/config_discovery.rs"
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"], optional = true }
|
||||
serde_json = { version = "1.0", optional = true }
|
||||
toml = { version = "0.9", optional = true }
|
||||
tokio = { version = "1", features = ["process", "io-util"], optional = true }
|
||||
dirs = "6.0"
|
||||
inquire = { version = "0.9", optional = true }
|
||||
|
||||
@ -29,6 +31,7 @@ default = ["config-discovery", "manifest"]
|
||||
config-discovery = []
|
||||
manifest = ["serde", "toml"]
|
||||
interactive = ["inquire"]
|
||||
nickel = ["dep:tokio", "dep:serde_json", "serde"]
|
||||
|
||||
[[example]]
|
||||
name = "manifest_usage"
|
||||
|
||||
@ -73,6 +73,9 @@ pub mod xdg;
|
||||
#[cfg(feature = "manifest")]
|
||||
pub mod manifest_manager;
|
||||
|
||||
#[cfg(feature = "nickel")]
|
||||
pub mod nickel;
|
||||
|
||||
// Re-export commonly used items from config_finder
|
||||
pub use config_finder::{
|
||||
find_config_path, find_config_path_or, find_config_path_warn_conflicts,
|
||||
|
||||
118
shared/rust/nickel.rs
Normal file
118
shared/rust/nickel.rs
Normal file
@ -0,0 +1,118 @@
|
||||
//! Nickel configuration loader.
|
||||
//!
|
||||
//! Evaluates `.ncl` files to JSON via the `nickel export` CLI.
|
||||
//! Optionally resolves OCI imports via `ncl-import-resolver` when a
|
||||
//! `resolver-manifest.json` file is present alongside the config.
|
||||
//!
|
||||
//! # Requirements
|
||||
//!
|
||||
//! - `nickel` must be on `PATH`.
|
||||
//! - `ncl-import-resolver` must be on `PATH` when OCI imports are used.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::process::Command;
|
||||
|
||||
/// Evaluate a Nickel configuration file and deserialize the result into `T`.
|
||||
///
|
||||
/// Steps:
|
||||
/// 1. If a `resolver-manifest.json` exists beside the `.ncl` file, run
|
||||
/// `ncl-import-resolver <manifest>` to resolve OCI imports.
|
||||
/// 2. Run `nickel export --format json <ncl_path>` and capture stdout.
|
||||
/// 3. Deserialize the JSON output via `serde_json`.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error if:
|
||||
/// - `ncl-import-resolver` exits non-zero (when `resolver-manifest.json` exists).
|
||||
/// - `nickel export` exits non-zero or cannot be found.
|
||||
/// - The JSON output cannot be deserialized into `T`.
|
||||
pub async fn load_nickel_config<T: DeserializeOwned>(ncl_path: &Path) -> anyhow::Result<T> {
|
||||
// Resolve OCI imports if a manifest is co-located with the config file.
|
||||
if let Some(parent) = ncl_path.parent() {
|
||||
let manifest = parent.join("resolver-manifest.json");
|
||||
if manifest.exists() {
|
||||
let status = Command::new("ncl-import-resolver")
|
||||
.arg(&manifest)
|
||||
.status()
|
||||
.await
|
||||
.context("launching ncl-import-resolver")?;
|
||||
|
||||
if !status.success() {
|
||||
return Err(anyhow!(
|
||||
"ncl-import-resolver failed (exit {:?}) on '{}'",
|
||||
status.code(),
|
||||
manifest.display()
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Export the Nickel config to JSON.
|
||||
let out = Command::new("nickel")
|
||||
.args(["export", "--format", "json"])
|
||||
.arg(ncl_path)
|
||||
.output()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"launching `nickel export` on '{}'",
|
||||
ncl_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
if !out.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&out.stderr);
|
||||
return Err(anyhow!(
|
||||
"`nickel export` failed on '{}': {}",
|
||||
ncl_path.display(),
|
||||
stderr.trim()
|
||||
));
|
||||
}
|
||||
|
||||
serde_json::from_slice::<T>(&out.stdout).with_context(|| {
|
||||
format!(
|
||||
"deserializing Nickel output from '{}' into {}",
|
||||
ncl_path.display(),
|
||||
std::any::type_name::<T>()
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;
    // NOTE: `use std::io::Write;` removed — it was unused (files are written
    // via fully-qualified `std::fs::write`) and trips `-D warnings` in CI.

    /// Minimal shape matching the inline `.ncl` fixture below.
    #[derive(Debug, Deserialize, PartialEq)]
    struct Simple {
        name: String,
        value: u32,
    }

    /// Verifies the error path when `nickel` is not installed or the file
    /// doesn't exist (CI without nickel will hit this branch).
    #[tokio::test]
    async fn test_load_nickel_config_missing_file() {
        let result = load_nickel_config::<Simple>(Path::new("/nonexistent/path.ncl")).await;
        assert!(
            result.is_err(),
            "expected error for non-existent .ncl file"
        );
    }

    /// Verifies resolver-manifest detection: if no manifest exists alongside the
    /// config file, resolver is skipped (only nickel export runs).
    #[tokio::test]
    async fn test_load_nickel_config_no_manifest_skips_resolver() {
        let dir = tempfile::tempdir().unwrap();
        let ncl_path = dir.path().join("config.ncl");
        std::fs::write(&ncl_path, r#"{ name = "test", value = 42 }"#).unwrap();

        // nickel may or may not be installed; either way, no manifest → no resolver invoked.
        let _ = load_nickel_config::<Simple>(&ncl_path).await;
    }
}
|
||||
Loading…
x
Reference in New Issue
Block a user