use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::Command;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use clap::Parser;
use ontoref_daemon::actors::ActorRegistry;
use ontoref_daemon::api::{self, AppState};
use ontoref_daemon::cache::NclCache;
use ontoref_daemon::notifications::NotificationStore;
use ontoref_daemon::watcher::{FileWatcher, WatcherDeps};
use tokio::net::TcpListener;
use tokio::sync::watch;
use tower_http::trace::TraceLayer;
use tracing::{error, info, warn};

/// Load daemon config from .ontoref/config.ncl and override CLI defaults.
/// Returns the resolved NICKEL_IMPORT_PATH from config (colon-separated).
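///
/// A config.ncl that exercises the keys read below might look like this
/// (a sketch; values are illustrative, not defaults):
///
/// ```text
/// {
///   daemon = { port = 7891, idle_timeout = 1800, max_notifications = 1000 },
///   db = { url = "ws://127.0.0.1:8000", namespace = "ontoref" },
///   nickel_import_paths = ["schemas", "modules"],
/// }
/// ```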
fn load_config_overrides(cli: &mut Cli) -> Option<String> {
    let config_path = cli.project_root.join(".ontoref").join("config.ncl");
    if !config_path.exists() {
        return None;
    }

    let output = match Command::new("nickel")
        .arg("export")
        .arg(&config_path)
        .output()
    {
        Ok(o) => o,
        Err(e) => {
            warn!(error = %e, path = %config_path.display(), "failed to read config");
            return None;
        }
    };

    if !output.status.success() {
        warn!("nickel export failed for config");
        return None;
    }

    let config_json: serde_json::Value = match serde_json::from_slice(&output.stdout) {
        Ok(v) => v,
        Err(e) => {
            warn!(error = %e, "failed to parse config JSON");
            return None;
        }
    };

    // Extract daemon config
    if let Some(daemon) = config_json.get("daemon").and_then(|d| d.as_object()) {
        if let Some(port) = daemon.get("port").and_then(|p| p.as_u64()) {
            cli.port = port as u16;
        }
        if let Some(timeout) = daemon.get("idle_timeout").and_then(|t| t.as_u64()) {
            cli.idle_timeout = timeout;
        }
        if let Some(interval) = daemon.get("invalidation_interval").and_then(|i| i.as_u64()) {
            cli.invalidation_interval = interval;
        }
        if let Some(sweep) = daemon.get("actor_sweep_interval").and_then(|s| s.as_u64()) {
            cli.actor_sweep_interval = sweep;
        }
        if let Some(stale) = daemon.get("actor_stale_timeout").and_then(|s| s.as_u64()) {
            cli.actor_stale_timeout = stale;
        }
        if let Some(max) = daemon.get("max_notifications").and_then(|m| m.as_u64()) {
            cli.max_notifications = max as usize;
        }
        if let Some(ack_dirs) = daemon
            .get("notification_ack_required")
            .and_then(|a| a.as_array())
        {
            cli.notification_ack_required = ack_dirs
                .iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect();
        }
    }

    // Extract db config (if db feature enabled)
    #[cfg(feature = "db")]
    {
        if let Some(db) = config_json.get("db").and_then(|d| d.as_object()) {
            if let Some(url) = db.get("url").and_then(|u| u.as_str()) {
                if !url.is_empty() {
                    cli.db_url = Some(url.to_string());
                }
            }
            if let Some(ns) = db.get("namespace").and_then(|n| n.as_str()) {
                if !ns.is_empty() {
                    cli.db_namespace = Some(ns.to_string());
                }
            }
            if let Some(user) = db.get("username").and_then(|u| u.as_str()) {
                if !user.is_empty() {
                    cli.db_username = user.to_string();
                }
            }
            if let Some(pass) = db.get("password").and_then(|p| p.as_str()) {
                if !pass.is_empty() {
                    cli.db_password = pass.to_string();
                }
            }
        }
    }

    // Env var overrides for DB credentials — not persisted to disk.
    #[cfg(feature = "db")]
    {
        if let Ok(user) = std::env::var("ONTOREF_DB_USERNAME") {
            if !user.is_empty() {
                cli.db_username = user;
            }
        }
        if let Ok(pass) = std::env::var("ONTOREF_DB_PASSWORD") {
            if !pass.is_empty() {
                cli.db_password = pass;
            }
        }
    }

    // UI config section — only populates fields not already set via CLI.
    #[cfg(feature = "ui")]
    apply_ui_config(cli, &config_json);

    info!("config loaded from {}", config_path.display());

    config_json
        .get("nickel_import_paths")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str())
                .collect::<Vec<_>>()
                .join(":")
        })
        .filter(|s| !s.is_empty())
}

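// Typical invocation (a sketch; clap derives the kebab-case flags from the
// field names below, and the paths are illustrative):
//   ontoref-daemon --project-root /path/to/project --port 7891 --pid-file /tmp/ontoref-daemon.pid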
#[derive(Parser)]
#[command(name = "ontoref-daemon", about = "Ontoref cache daemon")]
struct Cli {
    /// Project root directory (where .ontoref/config.ncl lives)
    #[arg(long, default_value = ".")]
    project_root: PathBuf,

    /// Ontoref root directory (for shared schemas/modules)
    #[arg(long)]
    ontoref_root: Option<PathBuf>,

    /// HTTP listen port (overridden by config if present)
    #[arg(long, default_value_t = 7891)]
    port: u16,

    /// Seconds of inactivity before auto-shutdown (overridden by config)
    #[arg(long, default_value_t = 1800)]
    idle_timeout: u64,

    /// Full cache invalidation interval in seconds (overridden by config)
    #[arg(long, default_value_t = 60)]
    invalidation_interval: u64,

    /// PID file path
    #[arg(long)]
    pid_file: Option<PathBuf>,

    /// Actor sweep interval in seconds (reap stale sessions)
    #[arg(long, default_value_t = 30)]
    actor_sweep_interval: u64,

    /// Seconds before a remote actor (no `kill -0` check) is considered stale
    #[arg(long, default_value_t = 120)]
    actor_stale_timeout: u64,

    /// Maximum notifications to retain per project (ring buffer)
    #[arg(long, default_value_t = 1000)]
    max_notifications: usize,

    /// Directories requiring notification acknowledgment before commit
    #[arg(long, value_delimiter = ',')]
    notification_ack_required: Vec<String>,

    /// Directory containing Tera HTML templates for the web UI
    #[cfg(feature = "ui")]
    #[arg(long)]
    templates_dir: Option<PathBuf>,

    /// Directory to serve as /public (CSS, JS assets)
    #[cfg(feature = "ui")]
    #[arg(long)]
    public_dir: Option<PathBuf>,

    /// Path to registry.toml for multi-project mode
    #[cfg(feature = "ui")]
    #[arg(long)]
    registry: Option<PathBuf>,

    /// Hash a password with argon2id and print the PHC string, then exit
    #[cfg(feature = "ui")]
    #[arg(long, value_name = "PASSWORD")]
    hash_password: Option<String>,

    /// Run as an MCP server over stdin/stdout (for Claude Desktop, Cursor,
    /// etc.). No HTTP server is started in this mode.
    #[cfg(feature = "mcp")]
    #[arg(long)]
    mcp_stdio: bool,

    /// TLS certificate file (PEM). Enables HTTPS when combined with --tls-key
    #[cfg(feature = "tls")]
    #[arg(long)]
    tls_cert: Option<PathBuf>,

    /// TLS private key file (PEM). Enables HTTPS when combined with --tls-cert
    #[cfg(feature = "tls")]
    #[arg(long)]
    tls_key: Option<PathBuf>,

    /// SurrealDB remote WebSocket URL (e.g., ws://127.0.0.1:8000)
    #[cfg(feature = "db")]
    #[arg(long)]
    db_url: Option<String>,

    /// SurrealDB namespace for this daemon instance
    #[cfg(feature = "db")]
    #[arg(long)]
    db_namespace: Option<String>,

    /// SurrealDB username
    #[cfg(feature = "db")]
    #[arg(long, default_value = "root")]
    db_username: String,

    /// SurrealDB password
    #[cfg(feature = "db")]
    #[arg(long, default_value = "root")]
    db_password: String,
}

#[tokio::main]
async fn main() {
    // Parse CLI first so we can redirect logs to stderr in stdio MCP mode.
    // In stdio mode stdout is the MCP JSON-RPC transport; any log line there
    // corrupts the framing and the client silently drops or errors.
    let mut cli = Cli::parse();

    #[cfg(feature = "mcp")]
    let use_stderr = cli.mcp_stdio;
    #[cfg(not(feature = "mcp"))]
    let use_stderr = false;

    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| "ontoref_daemon=info,tower_http=debug".into());

    if use_stderr {
        tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .with_writer(std::io::stderr)
            .init();
    } else {
        tracing_subscriber::fmt().with_env_filter(env_filter).init();
    }

    #[cfg(feature = "ui")]
    if let Some(ref password) = cli.hash_password {
        use argon2::{
            password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
            Argon2,
        };
        let salt = SaltString::generate(&mut OsRng);
        let hash = Argon2::default()
            .hash_password(password.as_bytes(), &salt)
            .expect("argon2 hash failed")
            .to_string();
        println!("{hash}");
        return;
    }

    // Read config from project's .ontoref/config.ncl and override CLI defaults
    let nickel_import_path = load_config_overrides(&mut cli);

    let project_root = match cli.project_root.canonicalize() {
        Ok(p) if p.is_dir() => p,
        Ok(p) => {
            error!(
                path = %p.display(),
                "project_root is not a directory — aborting"
            );
            std::process::exit(1);
        }
        Err(e) => {
            error!(
                path = %cli.project_root.display(),
                error = %e,
                "project_root does not exist or is inaccessible — aborting"
            );
            std::process::exit(1);
        }
    };

    info!(
        project_root = %project_root.display(),
        port = cli.port,
        idle_timeout = cli.idle_timeout,
        "starting ontoref-daemon"
    );

    let cache = Arc::new(NclCache::new());

    #[cfg(feature = "ui")]
    let registry: Option<Arc<ontoref_daemon::registry::ProjectRegistry>> = {
        let reg_path: Option<std::path::PathBuf> = cli.registry.clone().or_else(|| {
            let candidate = project_root.join(".ontoref").join("registry.toml");
            if candidate.exists() {
                info!(path = %candidate.display(), "auto-discovered registry.toml");
                Some(candidate)
            } else {
                None
            }
        });

        if let Some(ref reg_path) = reg_path {
            match ontoref_daemon::registry::ProjectRegistry::load(
                reg_path,
                cli.actor_stale_timeout,
                cli.max_notifications,
            ) {
                Ok(r) => {
                    info!(path = %reg_path.display(), projects = r.count(), "registry loaded");
                    Some(Arc::new(r))
                }
                Err(e) => {
                    error!(error = %e, path = %reg_path.display(), "failed to load registry — aborting");
                    std::process::exit(1);
                }
            }
        } else {
            None
        }
    };

    #[cfg(feature = "ui")]
    let sessions = Arc::new(ontoref_daemon::session::SessionStore::new());

    let actors = Arc::new(ActorRegistry::new(cli.actor_stale_timeout));

    // Initialize Tera template engine from the configured templates directory.
    #[cfg(feature = "ui")]
    let tera_instance: Option<Arc<tokio::sync::RwLock<tera::Tera>>> = {
        if let Some(ref tdir) = cli.templates_dir {
            let glob = format!("{}/**/*.html", tdir.display());
            match tera::Tera::new(&glob) {
                Ok(t) => {
                    info!(templates_dir = %tdir.display(), "Tera templates loaded");
                    Some(Arc::new(tokio::sync::RwLock::new(t)))
                }
                Err(e) => {
                    warn!(error = %e, templates_dir = %tdir.display(), "Tera init failed — UI disabled");
                    None
                }
            }
        } else {
            info!("--templates-dir not set — web UI disabled");
            None
        }
    };

    // Notification store with configurable capacity and ack requirements
    let ack_required = if cli.notification_ack_required.is_empty() {
        vec![".ontology".to_string(), "adrs".to_string()]
    } else {
        cli.notification_ack_required.clone()
    };
    let notifications = Arc::new(NotificationStore::new(cli.max_notifications, ack_required));

    // Optional DB connection with health check
    #[cfg(feature = "db")]
    let db = {
        if cli.db_url.is_some() {
            info!(url = %cli.db_url.as_deref().unwrap_or(""), "connecting to SurrealDB...");
            connect_db(&cli).await
        } else {
            info!("SurrealDB not configured — running cache-only");
            None
        }
    };

    // Seed ontology tables from local NCL files → DB projection.
    #[cfg(feature = "db")]
    {
        if let Some(ref db) = db {
            info!("seeding ontology tables from local files...");
            ontoref_daemon::seed::seed_ontology(
                db,
                &project_root,
                &cache,
                nickel_import_path.as_deref(),
            )
            .await;
        }
    }

    // Initialize NATS publisher
    #[cfg(feature = "nats")]
    info!("connecting to NATS...");
    #[cfg(feature = "nats")]
    let nats_publisher = {
        let project_name = project_root
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("unknown")
            .to_string();
        match ontoref_daemon::nats::NatsPublisher::connect(
            &project_root.join(".ontoref").join("config.ncl"),
            project_name,
            cli.port,
        )
        .await
        {
            Ok(Some(pub_)) => {
                info!("NATS publisher initialized");
                Some(Arc::new(pub_))
            }
            Ok(None) => {
                info!("NATS disabled or unavailable");
                None
            }
            Err(e) => {
                warn!(error = %e, "NATS initialization failed");
                None
            }
        }
    };

    // Start file watcher — after DB so it can re-seed on changes
    let watcher_deps = WatcherDeps {
        #[cfg(feature = "db")]
        db: db.clone(),
        import_path: nickel_import_path.clone(),
        notifications: Arc::clone(&notifications),
        actors: Arc::clone(&actors),
        #[cfg(feature = "nats")]
        nats: nats_publisher.clone(),
    };
    let _watcher = match FileWatcher::start(
        &project_root,
        Arc::clone(&cache),
        cli.invalidation_interval,
        watcher_deps,
    ) {
        Ok(w) => Some(w),
        Err(e) => {
            error!(error = %e, "file watcher failed to start — running without auto-invalidation");
            None
        }
    };

    let epoch_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let last_activity = Arc::new(AtomicU64::new(epoch_secs));

    // Capture display values before they are moved into AppState.
    #[cfg(feature = "ui")]
    let project_root_str = project_root.display().to_string();
    #[cfg(feature = "ui")]
    let ui_startup: Option<(String, String)> = cli.templates_dir.as_ref().map(|tdir| {
        let public = cli
            .public_dir
            .as_ref()
            .map(|p| p.display().to_string())
            .unwrap_or_else(|| "—".to_string());
        (tdir.display().to_string(), public)
    });

    let state = {
        #[cfg(feature = "nats")]
        {
            AppState {
                cache,
                project_root,
                ontoref_root: cli.ontoref_root,
                started_at: Instant::now(),
                last_activity: Arc::clone(&last_activity),
                actors: Arc::clone(&actors),
                notifications: Arc::clone(&notifications),
                nickel_import_path: nickel_import_path.clone(),
                #[cfg(feature = "db")]
                db,
                nats: nats_publisher.clone(),
                #[cfg(feature = "ui")]
                tera: tera_instance,
                #[cfg(feature = "ui")]
                public_dir: cli.public_dir,
                #[cfg(feature = "ui")]
                registry: registry.clone(),
                #[cfg(feature = "ui")]
                sessions: Arc::clone(&sessions),
                #[cfg(feature = "mcp")]
                mcp_current_project: Arc::new(std::sync::RwLock::new(None)),
            }
        }
        #[cfg(not(feature = "nats"))]
        {
            AppState {
                cache,
                project_root,
                ontoref_root: cli.ontoref_root,
                started_at: Instant::now(),
                last_activity: Arc::clone(&last_activity),
                actors: Arc::clone(&actors),
                notifications: Arc::clone(&notifications),
                nickel_import_path: nickel_import_path.clone(),
                #[cfg(feature = "db")]
                db,
                #[cfg(feature = "ui")]
                tera: tera_instance,
                #[cfg(feature = "ui")]
                public_dir: cli.public_dir,
                #[cfg(feature = "ui")]
                registry: registry.clone(),
                #[cfg(feature = "ui")]
                sessions: Arc::clone(&sessions),
                #[cfg(feature = "mcp")]
                mcp_current_project: Arc::new(std::sync::RwLock::new(None)),
            }
        }
    };

    // Start template hot-reload watcher if templates dir is configured.
    #[cfg(feature = "ui")]
    let _template_watcher = {
        if let (Some(ref tdir), Some(ref tera)) = (&cli.templates_dir, &state.tera) {
            match ontoref_daemon::ui::TemplateWatcher::start(tdir, Arc::clone(tera)) {
                Ok(w) => Some(w),
                Err(e) => {
                    warn!(error = %e, "template watcher failed to start — hot-reload disabled");
                    None
                }
            }
        } else {
            None
        }
    };

    // Start passive drift observer (scan+diff, no apply).
    #[cfg(feature = "ui")]
    let _drift_watcher = {
        let project_name = state.default_project_name();
        let notif_store = Arc::clone(&state.notifications);
        match ontoref_daemon::ui::DriftWatcher::start(
            &state.project_root,
            project_name,
            notif_store,
        ) {
            Ok(w) => {
                info!("drift watcher started");
                Some(w)
            }
            Err(e) => {
                warn!(error = %e, "drift watcher failed to start");
                None
            }
        }
    };

    // MCP stdio mode — skips HTTP entirely; serves stdin/stdout to AI client.
    #[cfg(feature = "mcp")]
    if cli.mcp_stdio {
        if let Err(e) = ontoref_daemon::mcp::serve_stdio(state).await {
            error!(error = %e, "MCP stdio server error");
            std::process::exit(1);
        }
        return;
    }

    let app = api::router(state).layer(TraceLayer::new_for_http());

    let addr = SocketAddr::from(([127, 0, 0, 1], cli.port));
    let listener = TcpListener::bind(addr).await.unwrap_or_else(|e| {
        error!(addr = %addr, error = %e, "failed to bind");
        std::process::exit(1);
    });

    // Write PID file only after successful bind
    if let Some(ref pid_path) = cli.pid_file {
        if let Err(e) = write_pid_file(pid_path) {
            error!(path = %pid_path.display(), error = %e, "failed to write PID file");
        }
    }

    info!(addr = %addr, "listening");

    #[cfg(feature = "ui")]
    if let Some((ref tdir, ref public)) = ui_startup {
        #[cfg(feature = "tls")]
        let scheme = if cli.tls_cert.is_some() && cli.tls_key.is_some() {
            "https"
        } else {
            "http"
        };
        #[cfg(not(feature = "tls"))]
        let scheme = "http";
        info!(
            url = %format!("{scheme}://{addr}/ui/"),
            project_root = %project_root_str,
            templates_dir = %tdir,
            public_dir = %public,
            "web UI available"
        );
    }

    // Publish daemon.started event
    #[cfg(feature = "nats")]
    {
        if let Some(ref nats) = nats_publisher {
            if let Err(e) = nats.publish_started().await {
                warn!(error = %e, "failed to publish daemon.started event");
            }
        }
    }

    // Spawn NATS event polling handler if enabled
    #[cfg(feature = "nats")]
    let _nats_handler = if let Some(ref nats) = nats_publisher {
        let nats_clone = Arc::clone(nats);
        let handle = tokio::spawn(async move {
            handle_nats_events(nats_clone).await;
        });
        Some(handle)
    } else {
        None
    };

    // Spawn actor sweep task — reaps stale sessions periodically
    let sweep_actors = Arc::clone(&actors);
    #[cfg(feature = "nats")]
    let sweep_nats = nats_publisher.clone();
    let sweep_interval = cli.actor_sweep_interval;
    let _sweep_task = tokio::spawn(async move {
        actor_sweep_loop(
            sweep_actors,
            sweep_interval,
            #[cfg(feature = "nats")]
            sweep_nats,
        )
        .await;
    });

    // Idle timeout: spawn a watchdog that signals shutdown via watch channel.
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
    if cli.idle_timeout > 0 {
        let idle_secs = cli.idle_timeout;
        let activity = Arc::clone(&last_activity);
        tokio::spawn(idle_watchdog(activity, idle_secs, shutdown_tx));
    }

    // TLS serve path — takes priority when cert + key are both configured.
    #[cfg(feature = "tls")]
    if let (Some(cert), Some(key)) = (&cli.tls_cert, &cli.tls_key) {
        let tls_config = match axum_server::tls_rustls::RustlsConfig::from_pem_file(cert, key).await
        {
            Ok(c) => c,
            Err(e) => {
                error!(error = %e, cert = %cert.display(), key = %key.display(),
                    "TLS config failed — aborting");
                std::process::exit(1);
            }
        };

        let handle = axum_server::Handle::new();
        let shutdown_handle = handle.clone();
        let mut tls_rx = shutdown_rx.clone();
        tokio::spawn(async move {
            let _ = tls_rx.wait_for(|&v| v).await;
            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(30)));
        });

        let std_listener = listener.into_std().unwrap_or_else(|e| {
            error!(error = %e, "listener conversion failed");
            std::process::exit(1);
        });

        #[cfg(feature = "nats")]
        let tls_start = Instant::now();

        if let Err(e) = axum_server::from_tcp_rustls(std_listener, tls_config)
            .handle(handle)
            .serve(app.into_make_service())
            .await
        {
            error!(error = %e, "TLS server error");
        }

        #[cfg(feature = "nats")]
        if let Some(ref nats) = nats_publisher {
            let _ = nats.publish_stopped(tls_start.elapsed().as_secs()).await;
        }

        if let Some(ref pid_path) = cli.pid_file {
            let _ = std::fs::remove_file(pid_path);
        }
        return;
    }

    // Plain HTTP serve path.
    #[cfg(feature = "nats")]
    let startup_instant = Instant::now();
    let graceful = async move {
        let _ = shutdown_rx.wait_for(|&v| v).await;
    };

    if let Err(e) = axum::serve(listener, app)
        .with_graceful_shutdown(graceful)
        .await
    {
        error!(error = %e, "server error");
    }

    // Publish daemon.stopped event on graceful shutdown
    #[cfg(feature = "nats")]
    {
        if let Some(ref nats) = nats_publisher {
            let uptime_secs = startup_instant.elapsed().as_secs();
            if let Err(e) = nats.publish_stopped(uptime_secs).await {
                warn!(error = %e, "failed to publish daemon.stopped event");
            }
        }
    }

    // Cleanup PID file
    if let Some(ref pid_path) = cli.pid_file {
        let _ = std::fs::remove_file(pid_path);
    }
}

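/// Idle-timeout watchdog: polls the last-activity timestamp every 30 seconds
/// and signals shutdown through the watch channel once the idle budget is
/// exhausted.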
async fn idle_watchdog(activity: Arc<AtomicU64>, idle_secs: u64, shutdown: watch::Sender<bool>) {
    let check_interval = std::time::Duration::from_secs(30);
    loop {
        tokio::time::sleep(check_interval).await;
        let last = activity.load(Ordering::Relaxed);
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let idle = now.saturating_sub(last);
        if idle >= idle_secs {
            info!(idle, idle_secs, "idle timeout reached — shutting down");
            let _ = shutdown.send(true);
            return;
        }
    }
}

/// Periodic sweep of stale actor sessions.
/// Local actors: checked via `kill -0 <pid>`. Remote actors: `last_seen`
/// timeout. Publishes `actor.deregistered` events via NATS for reaped sessions.
async fn actor_sweep_loop(
    actors: Arc<ActorRegistry>,
    interval_secs: u64,
    #[cfg(feature = "nats")] nats: Option<Arc<ontoref_daemon::nats::NatsPublisher>>,
) {
    let interval = std::time::Duration::from_secs(interval_secs);
    loop {
        tokio::time::sleep(interval).await;
        let reaped = actors.sweep_stale();

        #[cfg(feature = "nats")]
        publish_reaped_actors(&nats, &reaped).await;

        #[cfg(not(feature = "nats"))]
        let _ = reaped;
    }
}

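/// Fan out an `actor.deregistered` event for each session reaped by the sweep.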
#[cfg(feature = "nats")]
|
|
async fn publish_reaped_actors(
|
|
nats: &Option<Arc<ontoref_daemon::nats::NatsPublisher>>,
|
|
reaped: &[String],
|
|
) {
|
|
let Some(ref nats) = nats else { return };
|
|
for token in reaped {
|
|
if let Err(e) = nats.publish_actor_deregistered(token, "stale_sweep").await {
|
|
warn!(error = %e, token = %token, "failed to publish actor.deregistered");
|
|
}
|
|
}
|
|
}
|
|
|
|
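/// Connect to SurrealDB with a 5s timeout, run a health check, and initialize
/// the ontology tables. Any failure degrades to cache-only mode by returning
/// `None` rather than aborting the daemon.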
#[cfg(feature = "db")]
|
|
async fn connect_db(cli: &Cli) -> Option<Arc<stratum_db::StratumDb>> {
|
|
let db_url = cli.db_url.as_ref()?;
|
|
let namespace = cli.db_namespace.as_deref().unwrap_or("ontoref");
|
|
|
|
let connect_timeout = std::time::Duration::from_secs(5);
|
|
let db = match tokio::time::timeout(
|
|
connect_timeout,
|
|
stratum_db::StratumDb::connect_remote(
|
|
db_url,
|
|
namespace,
|
|
"daemon",
|
|
&cli.db_username,
|
|
&cli.db_password,
|
|
),
|
|
)
|
|
.await
|
|
{
|
|
Ok(Ok(db)) => db,
|
|
Ok(Err(e)) => {
|
|
error!(error = %e, "SurrealDB connection failed — running without persistence");
|
|
return None;
|
|
}
|
|
Err(_) => {
|
|
error!(url = %db_url, "SurrealDB connection timed out (5s) — running without persistence");
|
|
return None;
|
|
}
|
|
};
|
|
|
|
let health_timeout = std::time::Duration::from_secs(5);
|
|
match tokio::time::timeout(health_timeout, db.health_check()).await {
|
|
Ok(Ok(())) => {
|
|
info!(url = %db_url, namespace = %namespace, "SurrealDB connected and healthy");
|
|
}
|
|
Ok(Err(e)) => {
|
|
error!(error = %e, "SurrealDB health check failed — running without persistence");
|
|
return None;
|
|
}
|
|
Err(_) => {
|
|
error!("SurrealDB health check timed out — running without persistence");
|
|
return None;
|
|
}
|
|
}
|
|
|
|
if let Err(e) = db.initialize_tables().await {
|
|
warn!(error = %e, "table initialization failed — proceeding with cache only");
|
|
return None;
|
|
}
|
|
|
|
info!("Level 1 ontology tables initialized");
|
|
Some(Arc::new(db))
|
|
}
|
|
|
|
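/// Apply the `ui` section of config.ncl. CLI flags win: a field is only
/// populated from config when it was not already set on the command line.
/// Relative paths are resolved against the project root.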
#[cfg(feature = "ui")]
|
|
fn apply_ui_config(cli: &mut Cli, config: &serde_json::Value) {
|
|
let Some(ui) = config.get("ui").and_then(|u| u.as_object()) else {
|
|
return;
|
|
};
|
|
if cli.templates_dir.is_none() {
|
|
let dir = ui
|
|
.get("templates_dir")
|
|
.and_then(|d| d.as_str())
|
|
.unwrap_or("");
|
|
if !dir.is_empty() {
|
|
cli.templates_dir = Some(cli.project_root.join(dir));
|
|
}
|
|
}
|
|
if cli.public_dir.is_none() {
|
|
let dir = ui.get("public_dir").and_then(|d| d.as_str()).unwrap_or("");
|
|
if !dir.is_empty() {
|
|
cli.public_dir = Some(cli.project_root.join(dir));
|
|
}
|
|
}
|
|
#[cfg(feature = "tls")]
|
|
{
|
|
if cli.tls_cert.is_none() {
|
|
let p = ui.get("tls_cert").and_then(|d| d.as_str()).unwrap_or("");
|
|
if !p.is_empty() {
|
|
cli.tls_cert = Some(cli.project_root.join(p));
|
|
}
|
|
}
|
|
if cli.tls_key.is_none() {
|
|
let p = ui.get("tls_key").and_then(|d| d.as_str()).unwrap_or("");
|
|
if !p.is_empty() {
|
|
cli.tls_key = Some(cli.project_root.join(p));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
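/// Write the current process ID to `path`, creating parent directories as
/// needed. Called only after the listener has bound successfully.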
fn write_pid_file(path: &PathBuf) -> std::io::Result<()> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    std::fs::write(path, std::process::id().to_string())
}

/// Poll NATS JetStream for incoming events.
#[cfg(feature = "nats")]
async fn handle_nats_events(nats: Arc<ontoref_daemon::nats::NatsPublisher>) {
    loop {
        let events = match nats.pull_events(10).await {
            Ok(ev) => ev,
            Err(e) => {
                warn!("NATS poll error: {e}");
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                continue;
            }
        };

        for (subject, payload) in events {
            dispatch_nats_event(&subject, &payload);
        }

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}

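/// Route a pulled JetStream event by subject. Currently only
/// `ecosystem.reflection.request` is recognized; all other subjects are
/// ignored.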
#[cfg(feature = "nats")]
|
|
fn dispatch_nats_event(subject: &str, payload: &serde_json::Value) {
|
|
use ontoref_daemon::nats::NatsPublisher;
|
|
|
|
if subject != "ecosystem.reflection.request" {
|
|
return;
|
|
}
|
|
|
|
if let Some((mode_id, _params)) = NatsPublisher::parse_reflection_request(payload) {
|
|
info!(mode_id = %mode_id, "received reflection.request via JetStream");
|
|
}
|
|
}
|