use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{delete, get, post, put};
use axum::Json;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use tracing::{error, warn};

use crate::actors::{ActorRegistry, ActorSessionView, RegisterRequest};
use crate::cache::NclCache;
use crate::notifications::{AckRequest, NotificationStore, NotificationView};
use crate::watcher::FileWatcher;

/// Fixed-window per-IP rate limiter for failed Bearer authentication attempts.
///
/// Disabled automatically when the daemon binds to a loopback address — rate
/// limiting is only meaningful when external clients can reach the endpoint.
/// When `enabled = false`, every method is a no-op.
///
/// NOTE(review): entries are only removed by `record_success`; nothing in this
/// type evicts expired windows, so the map grows with the number of distinct
/// failing IPs.
pub struct AuthRateLimiter {
    enabled: bool,
    /// Length of one counting window, in seconds.
    window_secs: u64,
    /// Failure count at which an IP is considered limited.
    max_failures: u32,
    // NOTE(review): the map's type arguments are missing in this copy of the
    // file. The methods below key it by `IpAddr` and treat each value as a
    // `(failure_count, window_start)` pair.
    counts: DashMap,
}

impl AuthRateLimiter {
    /// Create a limiter with a 60-second window and a threshold of 10 failures.
    pub fn new(enabled: bool) -> Self {
        Self {
            enabled,
            window_secs: 60,
            max_failures: 10,
            counts: DashMap::new(),
        }
    }

    /// Returns `true` if this IP has exceeded the failure threshold within the
    /// window.
    ///
    /// An entry whose window has already elapsed is treated as not limited; it
    /// is left in the map and reset lazily by the next `record_failure`.
    pub fn is_limited(&self, ip: IpAddr) -> bool {
        if !self.enabled {
            return false;
        }
        self.counts
            .get(&ip)
            .is_some_and(|e| e.1.elapsed().as_secs() < self.window_secs && e.0 >= self.max_failures)
    }

    /// Record a failed attempt. Returns `true` if the IP is now rate-limited.
    pub fn record_failure(&self, ip: IpAddr) -> bool {
        if !self.enabled {
            return false;
        }
        let now = Instant::now();
        let mut entry = self.counts.entry(ip).or_insert((0, now));
        // Fixed window: once the window has elapsed, start a fresh one at `now`
        // before counting this failure.
        if entry.1.elapsed().as_secs() >= self.window_secs {
            entry.0 = 0;
            entry.1 = now;
        }
        entry.0 += 1;
        entry.0 >= self.max_failures
    }

    /// Clear failure count after a successful authentication.
    pub fn record_success(&self, ip: IpAddr) {
        if self.enabled {
            self.counts.remove(&ip);
        }
    }
}

/// Returns true if `s` has the format of a UUID v4 (36 chars, hyphens at
/// positions 8/13/18/23). Used to distinguish session tokens from raw passwords
/// in `check_primary_auth` without needing to attempt argon2 on token strings.
///
/// Only the textual layout is checked (length, hyphen positions, hex digits
/// everywhere else); the version and variant nibbles are not inspected, so any
/// UUID-shaped string passes.
#[cfg(feature = "ui")]
fn is_uuid_v4(s: &str) -> bool {
    // `len()` is a byte length, so the fixed-index accesses below are in bounds.
    if s.len() != 36 {
        return false;
    }
    let b = s.as_bytes();
    b[8] == b'-'
        && b[13] == b'-'
        && b[18] == b'-'
        && b[23] == b'-'
        && b.iter()
            .enumerate()
            .all(|(i, &c)| matches!(i, 8 | 13 | 18 | 23) || c.is_ascii_hexdigit())
}

/// Shared application state injected into handlers.
// NOTE(review): the generic arguments of most fields below were lost in this
// copy of the file (`Arc,`, `Option>`, ...). They are reproduced as found;
// restore them from version control before building.
#[derive(Clone)]
pub struct AppState {
    /// NCL export cache of the primary project.
    pub cache: Arc,
    /// Root directory of the primary project.
    pub project_root: PathBuf,
    pub ontoref_root: Option,
    /// Monotonic start time; `health` reports uptime from it.
    pub started_at: Instant,
    /// Unix timestamp (seconds) of the last handled request, written by
    /// `touch_activity`.
    pub last_activity: Arc,
    pub actors: Arc,
    pub notifications: Arc,
    /// Resolved NICKEL_IMPORT_PATH for UI-initiated NCL exports.
    pub nickel_import_path: Option,
    #[cfg(feature = "db")]
    pub db: Option>,
    #[cfg(feature = "nats")]
    pub nats: Option>,
    #[cfg(feature = "ui")]
    pub tera: Option>>,
    #[cfg(feature = "ui")]
    pub public_dir: Option,
    /// Project registry. Always present — primary project is always registered
    /// under `registry.primary_slug()`. Registry and primary project auth share
    /// the same `ProjectContext` code path; `primary_keys` is gone.
    pub registry: Arc,
    /// Channel to start a file watcher for a project added at runtime.
    pub new_project_tx: Option>>>,
    /// Path to `~/.config/ontoref/` — used to persist key overrides.
    pub config_dir: Option,
    #[cfg(feature = "ui")]
    pub sessions: Arc,
    /// Current project set by `set_project` MCP tool — shared across all
    /// connections.
    #[cfg(feature = "mcp")]
    pub mcp_current_project: Arc>>,
    /// Live watchers for runtime-added registry projects.
    /// Keyed by slug; removing an entry drops the `FileWatcher` and stops
    /// watching.
    pub watcher_map: Arc>>,
    /// Per-IP failed auth rate limiter. Disabled on loopback binds.
    pub auth_rate_limiter: Arc,
    /// Per-path mutex registry for NCL file mutations (backlog, QA).
    /// Serializes concurrent read-mutate-write sequences on the same file.
    #[cfg(feature = "ui")]
    pub ncl_write_lock: Arc,
    /// Argon2id hash of the daemon-level admin password.
    /// Set via ONTOREF_ADMIN_TOKEN (inline hash) or ONTOREF_ADMIN_TOKEN_FILE
    /// (path to a file containing the hash). When present, /ui/manage/login
    /// accepts this password and creates a "_daemon" session with Role::Admin.
    #[cfg(feature = "ui")]
    pub daemon_admin_hash: Option,
}

impl AppState {
    /// Store the current Unix time (seconds) in `last_activity`.
    ///
    /// A system clock set before the Unix epoch is recorded as `0` rather than
    /// panicking.
    fn touch_activity(&self) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        self.last_activity.store(now, Ordering::Relaxed);
    }

    /// Check `Authorization: Bearer` against the primary project's keys.
    ///
    /// Accepts two credential forms (tried in order):
    /// 1. Session token (UUID v4) — looked up in SessionStore; O(1), cheap.
    /// 2. Raw password — verified via argon2id against project keys; O(~100ms).
    ///
    /// Returns `None` when the request should proceed, `Some(response)` for
    /// 429 (rate-limited) or 401 (bad/missing credentials).
    ///
    /// Order of checks, as implemented:
    /// - the rate limit is consulted first, so a limited IP gets 429 even when
    ///   the primary project has auth disabled or the credential is valid;
    /// - a missing/empty Bearer header yields 401 without recording a failure;
    /// - only a failed password verification records a failure.
    pub(crate) fn check_primary_auth(
        &self,
        headers: &axum::http::HeaderMap,
        client_ip: IpAddr,
    ) -> Option {
        if self.auth_rate_limiter.is_limited(client_ip) {
            return Some(
                (
                    StatusCode::TOO_MANY_REQUESTS,
                    // The literal 60 mirrors the `window_secs` value set in
                    // `AuthRateLimiter::new`; keep the two in sync.
                    Json(serde_json::json!({
                        "error": "too many failed auth attempts",
                        "retry_after_secs": 60
                    })),
                )
                    .into_response(),
            );
        }
        let primary = self.registry.primary();
        if !primary.auth_enabled() {
            return None;
        }
        let Some(bearer) = extract_bearer(headers) else {
            return Some(
                (
                    StatusCode::UNAUTHORIZED,
                    Json(serde_json::json!({
                        "error": "Authorization: Bearer required"
                    })),
                )
                    .into_response(),
            );
        };
        // Try session token first (UUID v4 format).
        #[cfg(feature = "ui")]
        if is_uuid_v4(bearer) {
            if let Some(entry) = self.sessions.get(bearer) {
                // Sessions for the primary project and daemon-level ("_daemon")
                // sessions are both accepted here.
                if entry.slug == primary.slug || entry.slug == "_daemon" {
                    self.auth_rate_limiter.record_success(client_ip);
                    return None;
                }
            }
            // UUID format but not a valid session — fall through to password
            // check.
        }
        // Password verification (backward-compatible).
        if primary.verify_key(bearer).is_some() {
            self.auth_rate_limiter.record_success(client_ip);
            None
        } else {
            self.auth_rate_limiter.record_failure(client_ip);
            Some(
                (
                    StatusCode::UNAUTHORIZED,
                    Json(serde_json::json!({"error": "invalid credentials"})),
                )
                    .into_response(),
            )
        }
    }

    /// The slug of the primary (local) project.
    pub fn default_project_name(&self) -> String {
        self.registry.primary_slug().to_string()
    }
}

/// Validate a Bearer session token and return the `SessionEntry`.
///
/// Only accepts UUID v4 tokens (not raw passwords). Returns a 401 response on
/// any failure so callers can propagate directly with `?`.
///
/// Unlike `check_primary_auth`, this function neither consults nor updates the
/// auth rate limiter.
// NOTE(review): the return type's generic arguments are missing in this copy
// of the file (`Result>`); the body returns the session entry on success and a
// boxed response on failure.
#[cfg(feature = "ui")]
fn require_session(
    state: &AppState,
    headers: &axum::http::HeaderMap,
) -> Result> {
    let Some(bearer) = extract_bearer(headers) else {
        return Err(Box::new(
            (
                StatusCode::UNAUTHORIZED,
                Json(
                    serde_json::json!({"error": "Authorization: Bearer required"}),
                ),
            )
                .into_response(),
        ));
    };
    // Reject anything that is not UUID-shaped before touching the session store.
    if !is_uuid_v4(bearer) {
        return Err(Box::new(
            (
                StatusCode::UNAUTHORIZED,
                Json(serde_json::json!({"error": "session token required; use POST /sessions to obtain one"})),
            )
                .into_response(),
        ));
    }
    state.sessions.get(bearer).ok_or_else(|| {
        Box::new(
            (
                StatusCode::UNAUTHORIZED,
                Json(serde_json::json!({"error": "invalid or expired session"})),
            )
                .into_response(),
        )
    })
}

/// Extract the Bearer credential (session token or raw password) from
/// `Authorization: Bearer <credential>`.
pub(crate) fn extract_bearer(headers: &axum::http::HeaderMap) -> Option<&str> { headers .get(axum::http::header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")) .map(str::trim) .filter(|s| !s.is_empty()) } pub fn router(state: AppState) -> axum::Router { let app = axum::Router::new() // Existing endpoints .route("/api/catalog", get(api_catalog_handler)) .route("/health", get(health)) .route("/nickel/export", post(nickel_export)) .route("/cache/stats", get(cache_stats)) .route("/cache/invalidate", post(cache_invalidate)) // Actor endpoints .route("/actors/register", post(actor_register)) .route("/actors/{token}", delete(actor_deregister)) .route("/actors/{token}/touch", post(actor_touch)) .route("/actors/{token}/profile", post(actor_update_profile)) .route("/actors", get(actors_list)) // Notification endpoints .route("/notifications/pending", get(notifications_pending)) .route("/notifications/ack", post(notifications_ack)) // SSE push stream — actor subscribes once instead of polling. .route("/notifications/stream", get(notifications_stream)) // Git hook endpoint — actor signs a file-change it caused. 
.route("/ontology/changed", post(ontology_changed)) // Search endpoint .route("/search", get(search)) // Describe endpoints .route("/describe/project", get(describe_project)) .route("/describe/capabilities", get(describe_capabilities)) .route("/describe/connections", get(describe_connections)) .route("/describe/actor-init", get(describe_actor_init)) .route("/describe/guides", get(describe_guides)) // ADR read + validation endpoints .route("/validate/adrs", get(validate_adrs)) .route("/adr/{id}", get(get_adr)) // Ontology extension endpoints .route("/ontology", get(list_ontology_extensions)) .route("/ontology/{file}", get(get_ontology_extension)) // Graph endpoints (impact analysis + federation) .route("/graph/impact", get(graph_impact)) .route("/graph/node/{id}", get(graph_node)) // Backlog JSON endpoint .route("/backlog-json", get(backlog_json)) // Q&A read endpoint .route("/qa-json", get(qa_json)) // Push-based sync endpoint: projects export their NCL and POST here. // Body limited to 4 MiB — ontology payloads are small; larger bodies are rejected. // Auth: Bearer token verified against project keys (see sync.rs). .route( "/sync", post(crate::sync::sync_push) .layer(axum::extract::DefaultBodyLimit::max(4 * 1024 * 1024)), ) // Runtime key rotation for registered projects. // Requires Bearer token with admin role (or no auth if project has no keys yet). .route("/projects/{slug}/keys", put(project_update_keys)) // Per-file ontology version counters — incremented on every cache invalidation. .route( "/projects/{slug}/ontology/versions", get(project_file_versions), ) // Project registry management. 
.route("/projects", get(projects_list).post(project_add)) .route("/projects/{slug}", delete(project_delete)) // Config surface — read .route("/projects/{slug}/config", get(project_config)) .route("/projects/{slug}/config/schema", get(project_config_schema)) .route( "/projects/{slug}/config/coherence", get(project_config_coherence), ) .route( "/projects/{slug}/config/quickref", get(project_config_quickref), ) .route( "/projects/{slug}/config/{section}", get(project_config_section), ) // Config surface — cross-project comparison (no slug) .route("/config/cross-project", get(config_cross_project)) // Config surface — mutation via override layer (admin only) .route( "/projects/{slug}/config/{section}", put(project_config_update), ); // Session endpoints — gated on ui feature (requires SessionStore). #[cfg(feature = "ui")] let app = app .route("/sessions", post(session_create).get(sessions_list)) .route("/sessions/{id}", delete(session_revoke)); // Gate the mutation endpoint behind the ui feature (requires crate::ui). #[cfg(feature = "ui")] let app = app .route("/qa/add", post(crate::ui::handlers::qa_add)) .route("/qa/delete", post(crate::ui::handlers::qa_delete)) .route("/qa/update", post(crate::ui::handlers::qa_update)) .route( "/search/bookmark/add", post(crate::ui::handlers::search_bookmark_add), ) .route( "/search/bookmark/delete", post(crate::ui::handlers::search_bookmark_delete), ); let app = app.with_state(state.clone()); #[cfg(feature = "ui")] let app = { use axum::response::Redirect; use tower_http::services::ServeDir; let app = app .route("/ui", get(|| async { Redirect::permanent("/ui/") })) .nest("/ui/", crate::ui::router(state.clone())); if let Some(ref public_dir) = state.public_dir { app.nest_service("/public", ServeDir::new(public_dir)) } else { app } }; // MCP streamable-HTTP endpoint — stateless per-request factory. 
#[cfg(feature = "mcp")] let app = { use rmcp::transport::streamable_http_server::{ session::local::LocalSessionManager, StreamableHttpServerConfig, StreamableHttpService, }; let mcp_state = state.clone(); let mcp_svc = StreamableHttpService::new( move || Ok(crate::mcp::OntoreServer::new(mcp_state.clone())), std::sync::Arc::new(LocalSessionManager::default()), StreamableHttpServerConfig::default(), ); app.nest_service("/mcp", mcp_svc) }; app } // ── Health ────────────────────────────────────────────────────────────── #[derive(Serialize)] struct HealthResponse { status: &'static str, uptime_secs: u64, cache_entries: usize, cache_hits: u64, cache_misses: u64, project_root: String, active_actors: usize, ontology_version: u64, #[serde(skip_serializing_if = "Option::is_none")] db_enabled: Option, } /// Return the full API catalog — all endpoints registered via `#[onto_api]`, /// sorted by path then method. #[ontoref_derive::onto_api( method = "GET", path = "/api/catalog", description = "Full catalog of daemon HTTP endpoints with metadata: auth, actors, params, tags", auth = "none", actors = "agent, developer, ci, admin", tags = "meta, catalog" )] async fn api_catalog_handler() -> impl IntoResponse { let routes = crate::api_catalog::catalog(); Json(serde_json::json!({ "count": routes.len(), "routes": routes })) } #[ontoref_derive::onto_api( method = "GET", path = "/health", description = "Daemon health check: uptime, version, feature flags, active projects", auth = "none", actors = "agent, developer, ci, admin", tags = "meta" )] async fn health(State(state): State) -> Json { state.touch_activity(); let db_enabled = { #[cfg(feature = "db")] { Some(state.db.is_some()) } #[cfg(not(feature = "db"))] { None } }; Json(HealthResponse { status: "ok", uptime_secs: state.started_at.elapsed().as_secs(), cache_entries: state.cache.len(), cache_hits: state.cache.hit_count(), cache_misses: state.cache.miss_count(), project_root: state.project_root.display().to_string(), active_actors: 
state.actors.count(), ontology_version: state .registry .primary() .ontology_version .load(Ordering::Acquire), db_enabled, }) } // ── Nickel Export ─────────────────────────────────────────────────────── #[derive(Deserialize)] struct ExportRequest { path: String, import_path: Option, } #[derive(Serialize)] struct ExportResponse { data: serde_json::Value, cached: bool, elapsed_ms: u64, } #[ontoref_derive::onto_api( method = "POST", path = "/nickel/export", description = "Export a Nickel file to JSON, using the cache when the file is unchanged", auth = "viewer", actors = "developer, agent", params = "file:string:required:Absolute path to the .ncl file to export; \ import_path:string:optional:NICKEL_IMPORT_PATH override", tags = "nickel, cache" )] async fn nickel_export( State(state): State, headers: axum::http::HeaderMap, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, Json(req): Json, ) -> axum::response::Response { state.touch_activity(); if let Some(resp) = state.check_primary_auth(&headers, addr.ip()) { return resp; } let start = Instant::now(); // Accept absolute paths — daemon is loopback-only; OS permissions are the // boundary. Relative paths are still resolved against project_root for // backward compatibility. 
let file_path = match resolve_any_path(&state.project_root, &req.path) { Ok(p) => p, Err((status, msg)) => { return (status, Json(serde_json::json!({"error": msg}))).into_response() } }; let inherited_ip = std::env::var("NICKEL_IMPORT_PATH").unwrap_or_default(); let merged_ip: Option = match req.import_path.as_deref() { Some(caller_ip) => { if inherited_ip.is_empty() { Some(caller_ip.to_string()) } else { Some(format!("{caller_ip}:{inherited_ip}")) } } None => { if inherited_ip.is_empty() { None } else { Some(inherited_ip) } } }; let (data, was_hit) = match state.cache.export(&file_path, merged_ip.as_deref()).await { Ok(r) => r, Err(e) => { error!(path = %file_path.display(), error = %e, "nickel export failed"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(); } }; Json(ExportResponse { data, cached: was_hit, elapsed_ms: start.elapsed().as_millis() as u64, }) .into_response() } // ── Cache Management ──────────────────────────────────────────────────── #[derive(Serialize)] struct CacheStatsResponse { entries: usize, hits: u64, misses: u64, hit_rate: f64, } #[ontoref_derive::onto_api( method = "GET", path = "/cache/stats", description = "NCL export cache statistics: entry count, hit/miss counters", auth = "viewer", actors = "developer, admin", tags = "cache, meta" )] async fn cache_stats(State(state): State) -> Json { state.touch_activity(); let hits = state.cache.hit_count(); let misses = state.cache.miss_count(); let total = hits + misses; Json(CacheStatsResponse { entries: state.cache.len(), hits, misses, hit_rate: if total > 0 { hits as f64 / total as f64 } else { 0.0 }, }) } #[derive(Deserialize)] struct InvalidateRequest { /// Optional project slug. When present, invalidates the registry project's /// cache instead of the primary project cache. 
slug: Option, prefix: Option, file: Option, all: Option, } #[derive(Serialize)] struct InvalidateResponse { invalidated: bool, entries_remaining: usize, } #[ontoref_derive::onto_api( method = "POST", path = "/cache/invalidate", description = "Invalidate one or all NCL cache entries, forcing re-export on next request", auth = "admin", actors = "developer, admin", params = "file:string:optional:Specific file path to invalidate (omit to invalidate all)", tags = "cache" )] async fn cache_invalidate( State(state): State, headers: axum::http::HeaderMap, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, Json(req): Json, ) -> std::result::Result, (StatusCode, String)> { state.touch_activity(); if let Some(resp) = state.check_primary_auth(&headers, addr.ip()) { let status = resp.status(); return Err((status, "authentication required".to_string())); } // Resolve which cache to operate on. let cache = match req.slug.as_deref() { Some(slug) => { let ctx = state.registry.get(slug).ok_or_else(|| { ( StatusCode::NOT_FOUND, format!("project '{slug}' not registered"), ) })?; ctx.cache.clone() } None => state.cache.clone(), }; let project_root = match req.slug.as_deref() { Some(slug) => state .registry .get(slug) .map(|ctx| ctx.root.clone()) .unwrap_or_else(|| state.project_root.clone()), None => state.project_root.clone(), }; if req.all.unwrap_or(false) { cache.invalidate_all(); } else if let Some(prefix) = &req.prefix { let path = resolve_path(&project_root, prefix)?; cache.invalidate_prefix(&path); } else if let Some(file) = &req.file { let path = resolve_path(&project_root, file)?; cache.invalidate_file(&path); } else { return Err(( StatusCode::BAD_REQUEST, "at least one of 'all', 'prefix', or 'file' must be specified".to_string(), )); } Ok(Json(InvalidateResponse { invalidated: true, entries_remaining: cache.len(), })) } // ── Actor Endpoints ───────────────────────────────────────────────────── #[derive(Serialize)] struct RegisterResponse { token: String, 
actors_connected: usize, } #[ontoref_derive::onto_api( method = "POST", path = "/actors/register", description = "Register an actor session and receive a bearer token for subsequent calls", auth = "none", actors = "agent, developer, ci", params = "actor:string:required:Actor type (agent|developer|ci|admin); \ project:string:optional:Project slug to associate with; label:string:optional:Human \ label for audit trail", tags = "actors, auth" )] async fn actor_register( State(state): State, Json(req): Json, ) -> (StatusCode, Json) { state.touch_activity(); #[cfg(feature = "nats")] let actor_type = req.actor_type.clone(); #[cfg(feature = "nats")] let project = req.project.clone(); let token = state.actors.register(req); let count = state.actors.count(); #[cfg(feature = "nats")] { if let Some(ref nats) = state.nats { if let Err(e) = nats .publish_actor_registered(&token, &actor_type, &project) .await { tracing::warn!(error = %e, "failed to publish actor.registered event"); } } } ( StatusCode::CREATED, Json(RegisterResponse { token, actors_connected: count, }), ) } #[ontoref_derive::onto_api( method = "DELETE", path = "/actors/{token}", description = "Deregister an actor session and invalidate its bearer token", auth = "none", actors = "agent, developer, ci", tags = "actors, auth" )] async fn actor_deregister(State(state): State, Path(token): Path) -> StatusCode { state.touch_activity(); if state.actors.deregister(&token) { #[cfg(feature = "nats")] { if let Some(ref nats) = state.nats { if let Err(e) = nats.publish_actor_deregistered(&token, "explicit").await { tracing::warn!(error = %e, "failed to publish actor.deregistered event"); } } } StatusCode::NO_CONTENT } else { StatusCode::NOT_FOUND } } #[ontoref_derive::onto_api( method = "POST", path = "/actors/{token}/touch", description = "Extend actor session TTL; prevents the session from expiring due to inactivity", auth = "none", actors = "agent, developer, ci", tags = "actors" )] async fn actor_touch(State(state): 
State, Path(token): Path) -> StatusCode { state.touch_activity(); if state.actors.touch(&token) { StatusCode::NO_CONTENT } else { StatusCode::NOT_FOUND } } #[derive(Deserialize)] struct ProfileRequest { #[serde(default)] role: Option, #[serde(default)] preferences: Option, } #[ontoref_derive::onto_api( method = "POST", path = "/actors/{token}/profile", description = "Update actor profile metadata: display name, role, and custom context fields", auth = "none", actors = "agent, developer", tags = "actors" )] async fn actor_update_profile( State(state): State, Path(token): Path, Json(req): Json, ) -> StatusCode { state.touch_activity(); if state .actors .update_profile(&token, req.role, req.preferences) { StatusCode::NO_CONTENT } else { StatusCode::NOT_FOUND } } #[derive(Serialize)] struct ActorsListResponse { actors: Vec, total: usize, } #[derive(Serialize)] struct ActorEntry { token: String, #[serde(flatten)] session: ActorSessionView, } #[derive(Deserialize)] struct ActorsQuery { project: Option, } #[ontoref_derive::onto_api( method = "GET", path = "/actors", description = "List all registered actor sessions with their last-seen timestamp and pending \ notification count", auth = "viewer", actors = "developer, admin", params = "project:string:optional:Filter by project slug", tags = "actors" )] async fn actors_list( State(state): State, Query(query): Query, ) -> Json { state.touch_activity(); let entries = match query.project { Some(ref project) => state.actors.list_for_project(project), None => state.actors.list(), }; let total = entries.len(); let actors = entries .into_iter() .map(|(token, session)| ActorEntry { token, session }) .collect(); Json(ActorsListResponse { actors, total }) } // ── Notification Endpoints ────────────────────────────────────────────── #[derive(Deserialize)] struct PendingQuery { token: String, project: Option, #[serde(default)] check_only: bool, } #[derive(Serialize)] struct PendingResponse { pending: usize, #[serde(skip_serializing_if 
= "Option::is_none")] notifications: Option>, } #[ontoref_derive::onto_api( method = "GET", path = "/notifications/pending", description = "Poll pending notifications for an actor; optionally marks them as seen", auth = "none", actors = "agent, developer, ci", params = "token:string:required:Actor bearer token; project:string:optional:Project slug \ filter; check_only:bool:default=false:Return count without marking seen", tags = "notifications" )] async fn notifications_pending( State(state): State, Query(query): Query, ) -> impl IntoResponse { state.touch_activity(); if state.actors.get_and_touch(&query.token).is_none() { return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "unknown or expired actor token"})), ) .into_response(); } let project = query .project .unwrap_or_else(|| state.default_project_name()); if query.check_only { let count = state.notifications.pending_count(&project, &query.token); Json(PendingResponse { pending: count, notifications: None, }) .into_response() } else { let notifications = state.notifications.pending(&project, &query.token); let count = notifications.len(); Json(PendingResponse { pending: count, notifications: Some(notifications), }) .into_response() } } #[derive(Serialize)] struct AckResponse { acknowledged: usize, } #[ontoref_derive::onto_api( method = "POST", path = "/notifications/ack", description = "Acknowledge one or more notifications; removes them from the pending queue", auth = "none", actors = "agent, developer, ci", params = "token:string:required:Actor bearer token; ids:string:required:Comma-separated \ notification ids to acknowledge", tags = "notifications" )] async fn notifications_ack( State(state): State, Json(req): Json, ) -> std::result::Result, (StatusCode, String)> { state.touch_activity(); state.actors.touch(&req.token); let project = req.project.unwrap_or_else(|| state.default_project_name()); let count = if req.all { let acked = state.notifications.ack_all(&project, &req.token); 
state.actors.clear_pending(&req.token); acked } else if let Some(id) = req.notification_id { if state.notifications.ack_one(&project, &req.token, id) { 1 } else { 0 } } else { return Err(( StatusCode::BAD_REQUEST, "either 'all: true' or 'notification_id' must be specified".to_string(), )); }; Ok(Json(AckResponse { acknowledged: count, })) } // ── SSE Notification Stream ─────────────────────────────────────────────── /// Query params for `GET /notifications/stream`. #[derive(Deserialize)] struct StreamQuery { /// Actor token — used to scope the stream to this actor's project. token: String, /// Optional project slug override. Defaults to the actor's project. project: Option, } /// `GET /notifications/stream?token=[&project=]` /// /// Server-Sent Events stream. Each event is a JSON-serialized /// `NotificationView`. Clients receive push notifications without polling. /// Reconnects automatically pick up new events (no replay of missed events — /// use `/notifications/pending` for that). #[ontoref_derive::onto_api( method = "GET", path = "/notifications/stream", description = "SSE push stream: actor subscribes once and receives notification events as \ they occur", auth = "none", actors = "agent, developer", params = "token:string:required:Actor bearer token; project:string:optional:Project slug \ filter", tags = "notifications, sse" )] async fn notifications_stream( State(state): State, Query(params): Query, ) -> axum::response::Response { use axum::response::sse::{Event, KeepAlive}; use axum::response::Sse; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; let session = match state.actors.get_and_touch(¶ms.token) { Some(s) => s, None => { return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "unknown or expired actor token"})), ) .into_response(); } }; // Resolve which notification store to subscribe to. 
let notifications = if let Some(slug) = ¶ms.project { state .registry .get(slug) .map(|ctx| Arc::clone(&ctx.notifications)) .unwrap_or_else(|| Arc::clone(&state.notifications)) } else { // Derive project from the actor session — reuse the snapshot from // get_and_touch. let actor_project = Some(session.project.clone()); actor_project .as_deref() .and_then(|p| state.registry.get(p)) .map(|ctx| Arc::clone(&ctx.notifications)) .unwrap_or_else(|| Arc::clone(&state.notifications)) }; let rx = notifications.subscribe(); let stream = BroadcastStream::new(rx).filter_map(|result| match result { Ok(view) => { let json = serde_json::to_string(&view).unwrap_or_default(); Some(Ok::<_, std::convert::Infallible>( Event::default().event("notification").data(json), )) } // Lagged — receiver missed events; client should call /notifications/pending to catch up. Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => { let msg = format!("{{\"lagged\":{n}}}"); Some(Ok(Event::default().event("lagged").data(msg))) } }); Sse::new(stream) .keep_alive(KeepAlive::default()) .into_response() } // ── Git Hook — Ontology Changed ─────────────────────────────────────────── /// Body for `POST /ontology/changed` — sent by a git hook after NCL files /// change. #[derive(Deserialize)] struct OntologyChangedRequest { /// Actor token that caused the change (from `ONTOREF_TOKEN` env var). token: String, /// Relative file paths that changed (e.g. `[".ontology/core.ncl"]`). files: Vec, /// Project slug. Defaults to the actor's registered project. #[serde(default)] project: Option, } /// `POST /ontology/changed` — actor-attributed file-change notification. /// /// Called by git hooks (post-merge, post-commit) so the daemon knows *who* /// caused the change. Creates a notification with `source_actor` set, enabling /// multi-actor coordination UIs to display attribution. 
// Flow: validate the actor token (401), enforce that an explicit `project`
// matches the token's project (403), reject an empty `files` list (400), push
// the notification, then bump the pending counter of every other actor in the
// project.
#[ontoref_derive::onto_api(
    method = "POST",
    path = "/ontology/changed",
    description = "Git hook endpoint: actor signs a file-change event it caused to suppress \
                   self-notification",
    auth = "viewer",
    actors = "developer, ci",
    params = "token:string:required:Actor bearer token; files:string:required:JSON array of \
              changed file paths",
    tags = "ontology, notifications"
)]
async fn ontology_changed(
    State(state): State,
    Json(req): Json,
) -> impl IntoResponse {
    // Reject unknown tokens — git hook attribution requires a registered actor.
    let actor_project = match state.actors.get(&req.token) {
        Some(s) => s.project.clone(),
        None => {
            return (
                StatusCode::UNAUTHORIZED,
                Json(serde_json::json!({"error": "unknown or expired actor token"})),
            )
                .into_response();
        }
    };

    // Resolve project and notification store from the actor token.
    let (project, notifications) = {
        let project = match &req.project {
            // If the caller explicitly names a project, it must match the
            // token's registered project — prevents cross-project injection.
            Some(req_project) if req_project != &actor_project => {
                return (
                    StatusCode::FORBIDDEN,
                    Json(serde_json::json!({
                        "error": format!(
                            "token is registered to project '{}', not '{req_project}'",
                            actor_project
                        )
                    })),
                )
                    .into_response();
            }
            Some(req_project) => req_project.clone(),
            None => actor_project,
        };
        // Projects missing from the registry use the primary project's store.
        let store = state
            .registry
            .get(&project)
            .map(|ctx| Arc::clone(&ctx.notifications))
            .unwrap_or_else(|| Arc::clone(&state.notifications));
        (project, store)
    };

    if req.files.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({"error": "files must not be empty"})),
        )
            .into_response();
    }

    let ids = notifications.push(&project, req.files, Some(req.token.clone()));

    // Increment pending for all actors in the project except the source actor.
    if !ids.is_empty() {
        for token in state.actors.tokens_for_project(&project) {
            if token != req.token {
                state.actors.increment_pending(&token);
            }
        }
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "project": project,
            "notifications_created": ids.len(),
            "source_actor": req.token,
        })),
    )
        .into_response()
}

// ── Search ───────────────────────────────────────────────────────────────

/// Query string for `GET /search`. The `slug` field only exists when the
/// daemon is built with the `ui` feature.
// NOTE(review): field type arguments are missing in this copy of the file.
#[derive(Deserialize)]
struct SearchQuery {
    q: Option,
    #[cfg(feature = "ui")]
    slug: Option,
}

/// Response body for `GET /search`: the trimmed query and its results.
// NOTE(review): the element type of `results` is missing in this copy of the
// file; it is whatever `crate::search::search_project` yields.
#[derive(Serialize)]
struct SearchResponse {
    query: String,
    results: Vec,
}

/// `GET /search` — search a project. A missing or whitespace-only `q` returns
/// an empty result list without searching.
#[ontoref_derive::onto_api(
    method = "GET",
    path = "/search",
    description = "Full-text search over ontology nodes, ADRs, practices and Q&A entries",
    auth = "none",
    actors = "agent, developer",
    params = "q:string:required:Search query string; slug:string:optional:Project slug (ui \
              feature only)",
    tags = "search"
)]
async fn search(
    State(state): State,
    Query(params): Query,
) -> impl IntoResponse {
    let q = params.q.as_deref().unwrap_or("").trim().to_string();
    if q.is_empty() {
        return Json(SearchResponse {
            query: q,
            results: vec![],
        });
    }

    // In multi-project mode, resolve context from slug. An absent slug searches
    // the primary project; a slug that is not registered yields an empty result
    // list (it does NOT fall back to the primary project).
    #[cfg(feature = "ui")]
    let results = {
        if let Some(slug) = params.slug.as_deref() {
            if let Some(ctx) = state.registry.get(slug) {
                crate::search::search_project(&ctx.root, &ctx.cache, ctx.import_path.as_deref(), &q)
                    .await
            } else {
                vec![]
            }
        } else {
            crate::search::search_project(
                &state.project_root,
                &state.cache,
                state.nickel_import_path.as_deref(),
                &q,
            )
            .await
        }
    };

    // Without the `ui` feature there is no slug parameter: always search the
    // primary project.
    #[cfg(not(feature = "ui"))]
    let results = crate::search::search_project(
        &state.project_root,
        &state.cache,
        state.nickel_import_path.as_deref(),
        &q,
    )
    .await;

    Json(SearchResponse { query: q, results })
}

// ── Describe Endpoints ───────────────────────────────────────────────────

/// Query string shared by the `/describe/*` endpoints that only take a slug.
#[derive(Deserialize)]
struct DescribeQuery {
    slug: Option,
}

/// Query string carrying an optional actor and an optional project slug.
#[derive(Deserialize)]
struct ActorInitQuery {
    actor: Option,
    slug: Option,
}

/// Query string carrying an optional project slug and an optional actor.
#[derive(Deserialize)]
struct GuidesQuery {
    slug: Option,
    actor: Option,
}

/// Resolve project context from an optional slug.
/// Falls back to the primary project when slug is absent or not found in
/// registry.
#[cfg(feature = "ui")] fn resolve_project_ctx( state: &AppState, slug: Option<&str>, ) -> (PathBuf, Arc, Option) { if let Some(s) = slug { if let Some(ctx) = state.registry.get(s) { return (ctx.root.clone(), ctx.cache.clone(), ctx.import_path.clone()); } } ( state.project_root.clone(), state.cache.clone(), state.nickel_import_path.clone(), ) } #[cfg(not(feature = "ui"))] fn resolve_project_ctx( state: &AppState, _slug: Option<&str>, ) -> (PathBuf, Arc, Option) { ( state.project_root.clone(), state.cache.clone(), state.nickel_import_path.clone(), ) } #[ontoref_derive::onto_api( method = "GET", path = "/describe/project", description = "Project self-description: identity, axioms, tensions, practices, gates, ADRs, \ dimensions", auth = "none", actors = "agent, developer, ci, admin", params = "slug:string:optional:Project slug (defaults to primary)", tags = "describe, ontology" )] async fn describe_project( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let path = root.join(".ontology").join("core.ncl"); if !path.exists() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "core.ncl not found" })), ); } match cache.export(&path, import_path.as_deref()).await { Ok((data, _)) => (StatusCode::OK, Json(data)), Err(e) => { error!(path = %path.display(), error = %e, "describe_project export failed"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() })), ) } } } #[ontoref_derive::onto_api( method = "GET", path = "/describe/connections", description = "Cross-project connection declarations: upstream, downstream, peers with \ addressing", auth = "none", actors = "agent, developer", params = "slug:string:optional:Project slug (defaults to primary)", tags = "describe, federation" )] async fn describe_connections( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, 
cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let path = root.join(".ontology").join("connections.ncl"); if !path.exists() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "connections.ncl not found" })), ); } match cache.export(&path, import_path.as_deref()).await { Ok((data, _)) => (StatusCode::OK, Json(data)), Err(e) => { error!(path = %path.display(), error = %e, "describe_connections export failed"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() })), ) } } } #[ontoref_derive::onto_api( method = "GET", path = "/validate/adrs", description = "Execute typed ADR constraint checks and return per-constraint pass/fail results", auth = "viewer", actors = "developer, ci, agent", params = "slug:string:optional:Project slug (defaults to primary)", tags = "validate, adrs" )] async fn validate_adrs( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, _cache, _import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let output = match tokio::process::Command::new("nu") .args([ "--no-config-file", "-c", "use reflection/modules/validate.nu *; validate check-all --fmt json", ]) .current_dir(&root) .output() .await { Ok(o) => o, Err(e) => { error!(error = %e, "validate_adrs: failed to spawn nu"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("spawn failed: {e}") })), ); } }; let stdout = String::from_utf8_lossy(&output.stdout); match serde_json::from_str::(stdout.trim()) { Ok(v) => (StatusCode::OK, Json(v)), Err(e) => { let stderr = String::from_utf8_lossy(&output.stderr); error!(error = %e, stderr = %stderr, "validate_adrs: nu output is not valid JSON"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("invalid JSON from validate: {e}"), "stderr": stderr.trim(), })), ) } } } #[derive(Deserialize)] struct ImpactQuery { slug: Option, node: String, #[serde(default = 
"default_depth")] depth: u32, #[serde(default)] include_external: bool, } fn default_depth() -> u32 { 2 } #[ontoref_derive::onto_api( method = "GET", path = "/graph/impact", description = "BFS impact graph from an ontology node; optionally traverses cross-project \ connections", auth = "none", actors = "agent, developer", params = "node:string:required:Ontology node id to start from; depth:u32:default=2:Max BFS \ hops (capped at 5); include_external:bool:default=false:Follow connections.ncl to \ external projects; slug:string:optional:Project slug (defaults to primary)", tags = "graph, federation" )] async fn graph_impact( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let effective_slug = q .slug .clone() .unwrap_or_else(|| state.registry.primary_slug().to_owned()); let fed = crate::federation::FederatedQuery::new(Arc::clone(&state.registry)); let impacts = fed .impact_graph(&effective_slug, &q.node, q.depth, q.include_external) .await; ( StatusCode::OK, Json(serde_json::json!({ "slug": effective_slug, "node": q.node, "depth": q.depth, "include_external": q.include_external, "impacts": impacts, })), ) } #[ontoref_derive::onto_api( method = "GET", path = "/graph/node/{id}", description = "Resolve a single ontology node by id from the local cache (used by federation)", auth = "none", actors = "agent, developer", params = "slug:string:optional:Project slug (defaults to primary)", tags = "graph, federation" )] async fn graph_node( State(state): State, Path(id): Path, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let core_path = root.join(".ontology").join("core.ncl"); if !core_path.exists() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "core.ncl not found" })), ); } match cache.export(&core_path, import_path.as_deref()).await { Ok((json, _)) => { let node = json .get("nodes") .and_then(|n| n.as_array()) 
.and_then(|nodes| { nodes .iter() .find(|n| n.get("id").and_then(|v| v.as_str()) == Some(id.as_str())) .cloned() }); match node { Some(n) => (StatusCode::OK, Json(n)), None => ( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": format!("node '{}' not found", id) })), ), } } Err(e) => { error!(node = %id, error = %e, "graph_node: core export failed"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() })), ) } } } #[ontoref_derive::onto_api( method = "GET", path = "/describe/guides", description = "Complete operational context for an actor: identity, axioms, practices, \ constraints, gate state, modes, actor policy, connections, content assets", auth = "none", actors = "agent, developer, ci", params = "slug:string:optional:Project slug (defaults to primary); \ actor:string:optional:Actor context filters the policy (agent|developer|ci|admin)", tags = "describe, guides" )] async fn describe_guides( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, _cache, _import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let actor = q.actor.as_deref().unwrap_or("developer"); let nu_cmd = format!( "use reflection/modules/describe.nu *; describe guides --actor {} --fmt json", actor, ); let output = match tokio::process::Command::new("nu") .args(["--no-config-file", "-c", &nu_cmd]) .current_dir(&root) .output() .await { Ok(o) => o, Err(e) => { error!(error = %e, "describe_guides: failed to spawn nu"); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("spawn failed: {e}") })), ); } }; let stdout = String::from_utf8_lossy(&output.stdout); match serde_json::from_str::(stdout.trim()) { Ok(v) => (StatusCode::OK, Json(v)), Err(e) => { let stderr = String::from_utf8_lossy(&output.stderr); error!(error = %e, stderr = %stderr, "describe_guides: nu output is not valid JSON"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": 
format!("invalid JSON from describe guides: {e}"), "stderr": stderr.trim(), })), ) } } } #[ontoref_derive::onto_api( method = "GET", path = "/describe/capabilities", description = "Available reflection modes, just recipes, Claude capabilities and CI tools for \ the project", auth = "none", actors = "agent, developer, ci", params = "slug:string:optional:Project slug (defaults to primary)", tags = "describe" )] async fn describe_capabilities( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let modes_dir = root.join("reflection").join("modes"); let adrs_dir = root.join("adrs"); let forms_dir = root.join("reflection").join("forms"); // Modes: export each NCL for id + trigger let mut modes: Vec = Vec::new(); if let Ok(entries) = std::fs::read_dir(&modes_dir) { for entry in entries.flatten() { let path = entry.path(); if path.extension().and_then(|e| e.to_str()) != Some("ncl") { continue; } let stem = path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .to_string(); match cache.export(&path, import_path.as_deref()).await { Ok((json, _)) => { let id = json .get("id") .and_then(|v| v.as_str()) .unwrap_or(&stem) .to_string(); let trigger = json .get("trigger") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); modes.push(serde_json::json!({ "id": id, "trigger": trigger })); } Err(e) => { warn!(path = %path.display(), error = %e, "capabilities: mode export failed"); modes.push(serde_json::json!({ "id": stem, "trigger": "" })); } } } } modes.sort_by_key(|v| v["id"].as_str().unwrap_or("").to_string()); // ADRs: count only let adr_count = std::fs::read_dir(&adrs_dir) .map(|rd| { rd.flatten() .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("ncl")) .count() }) .unwrap_or(0); // Forms: list file stems let mut forms: Vec = std::fs::read_dir(&forms_dir) .map(|rd| { rd.flatten() .filter_map(|e| { let p = e.path(); if 
p.extension().and_then(|x| x.to_str()) != Some("ncl") { return None; } p.file_stem().and_then(|s| s.to_str()).map(str::to_string) }) .collect() }) .unwrap_or_default(); forms.sort(); let mode_count = modes.len(); let form_count = forms.len(); ( StatusCode::OK, Json(serde_json::json!({ "modes": modes, "adrs": adr_count, "forms": forms, "mode_count": mode_count, "adr_count": adr_count, "form_count": form_count, })), ) } #[ontoref_derive::onto_api( method = "GET", path = "/describe/actor-init", description = "Minimal onboarding payload for a new actor session: what to register as and \ what to do first", auth = "none", actors = "agent", params = "actor:string:optional:Actor type to onboard as; slug:string:optional:Project slug", tags = "describe, actors" )] async fn describe_actor_init( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let actor = q.actor.as_deref().unwrap_or("agent"); let config_path = root.join(".ontoref").join("config.ncl"); if !config_path.exists() { return ( StatusCode::OK, Json(serde_json::json!({ "mode": "", "auto_run": false })), ); } match cache.export(&config_path, import_path.as_deref()).await { Ok((json, _)) => { let entry = json .get("actor_init") .and_then(|v| v.as_array()) .and_then(|arr| { arr.iter() .find(|e| e.get("actor").and_then(|v| v.as_str()) == Some(actor)) }) .cloned(); let result = match entry { Some(e) => { let mode = e .get("mode") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); let auto_run = e.get("auto_run").and_then(|v| v.as_bool()).unwrap_or(false); serde_json::json!({ "mode": mode, "auto_run": auto_run }) } None => serde_json::json!({ "mode": "", "auto_run": false }), }; (StatusCode::OK, Json(result)) } Err(e) => { warn!(path = %config_path.display(), error = %e, "describe_actor_init export failed"); ( StatusCode::OK, Json(serde_json::json!({ "mode": "", "auto_run": false })), ) } } } // ── ADR 
read endpoint ──────────────────────────────────────────────────────── #[derive(Deserialize)] struct AdrQuery { slug: Option, } #[ontoref_derive::onto_api( method = "GET", path = "/adr/{id}", description = "Read a single ADR by id, exported from NCL as structured JSON", auth = "none", actors = "agent, developer", params = "slug:string:optional:Project slug (defaults to primary)", tags = "adrs" )] async fn get_adr( State(state): State, Path(id): Path, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let adrs_dir = root.join("adrs"); let entries = match std::fs::read_dir(&adrs_dir) { Ok(e) => e, Err(_) => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "adrs directory not found" })), ); } }; for entry in entries.flatten() { let path = entry.path(); if path.extension().and_then(|e| e.to_str()) != Some("ncl") { continue; } let stem = path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .to_string(); if !stem.contains(id.as_str()) { continue; } return match cache.export(&path, import_path.as_deref()).await { Ok((v, _)) => (StatusCode::OK, Json(v)), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() })), ), }; } ( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": format!("ADR '{}' not found", id) })), ) } // ── Ontology extension endpoints ───────────────────────────────────────────── const CORE_FILES: &[&str] = &["core.ncl", "state.ncl", "gate.ncl"]; #[derive(Deserialize)] struct OntologyQuery { slug: Option, } #[ontoref_derive::onto_api( method = "GET", path = "/ontology", description = "List available ontology extension files beyond core, state, gate, manifest", auth = "none", actors = "agent, developer", params = "slug:string:optional:Project slug (defaults to primary)", tags = "ontology" )] async fn list_ontology_extensions( State(state): State, Query(q): Query, ) -> impl IntoResponse { 
state.touch_activity(); let (root, _, _) = resolve_project_ctx(&state, q.slug.as_deref()); let ontology_dir = root.join(".ontology"); let entries = match std::fs::read_dir(&ontology_dir) { Ok(e) => e, Err(_) => { return ( StatusCode::OK, Json(serde_json::json!({ "extensions": [] })), ); } }; let mut extensions: Vec = entries .flatten() .filter_map(|e| { let path = e.path(); if path.extension().and_then(|x| x.to_str()) != Some("ncl") { return None; } let name = path.file_name()?.to_str()?.to_string(); if CORE_FILES.contains(&name.as_str()) { return None; } let stem = path.file_stem()?.to_str()?.to_string(); Some(serde_json::json!({ "file": name, "id": stem })) }) .collect(); extensions.sort_by_key(|v| v["id"].as_str().unwrap_or("").to_string()); ( StatusCode::OK, Json(serde_json::json!({ "extensions": extensions })), ) } #[ontoref_derive::onto_api( method = "GET", path = "/ontology/{file}", description = "Export a specific ontology extension file to JSON", auth = "none", actors = "agent, developer", params = "slug:string:optional:Project slug (defaults to primary)", tags = "ontology" )] async fn get_ontology_extension( State(state): State, Path(file): Path, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); // Reject traversal attempts and core files — they have dedicated endpoints. 
if file.contains('/') || file.contains("..") || CORE_FILES.contains(&file.as_str()) { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "invalid file name" })), ); } let file = if file.ends_with(".ncl") { file } else { format!("{file}.ncl") }; let path = root.join(".ontology").join(&file); if !path.exists() { return ( StatusCode::NOT_FOUND, Json( serde_json::json!({ "error": format!("ontology extension '{}' not found", file) }), ), ); } match cache.export(&path, import_path.as_deref()).await { Ok((v, _)) => (StatusCode::OK, Json(v)), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() })), ), } } #[ontoref_derive::onto_api( method = "GET", path = "/backlog-json", description = "Export the project backlog as structured JSON from reflection/backlog.ncl", auth = "viewer", actors = "developer, agent", params = "slug:string:optional:Project slug (defaults to primary)", tags = "backlog" )] async fn backlog_json( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let backlog_path = root.join("reflection").join("backlog.ncl"); if !backlog_path.exists() { return (StatusCode::OK, Json(serde_json::json!([]))); } match cache.export(&backlog_path, import_path.as_deref()).await { Ok((json, _)) => { let items = json .get("items") .and_then(|v| v.as_array()) .cloned() .unwrap_or_default(); (StatusCode::OK, Json(serde_json::Value::Array(items))) } Err(e) => { error!(path = %backlog_path.display(), error = %e, "backlog_json export failed"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() })), ) } } } // ── Q&A endpoints ─────────────────────────────────────────────────────── #[ontoref_derive::onto_api( method = "GET", path = "/qa-json", description = "Export the Q&A knowledge store as structured JSON from reflection/qa.ncl", auth = "none", actors = "agent, developer", 
params = "slug:string:optional:Project slug (defaults to primary)", tags = "qa" )] async fn qa_json( State(state): State, Query(q): Query, ) -> impl IntoResponse { state.touch_activity(); let (root, cache, import_path) = resolve_project_ctx(&state, q.slug.as_deref()); let qa_path = root.join("reflection").join("qa.ncl"); if !qa_path.exists() { return (StatusCode::OK, Json(serde_json::json!([]))); } match cache.export(&qa_path, import_path.as_deref()).await { Ok((json, _)) => { let entries = json .get("entries") .and_then(|v| v.as_array()) .cloned() .unwrap_or_default(); (StatusCode::OK, Json(serde_json::Value::Array(entries))) } Err(e) => { error!(path = %qa_path.display(), error = %e, "qa_json export failed"); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() })), ) } } } // ── Project Registry Endpoints ─────────────────────────────────────────────── #[derive(Serialize)] struct ProjectView { slug: String, root: String, push_only: bool, remote_url: String, auth_enabled: bool, /// Monotonically increasing counter — incremented after each successful /// seed. Clients compare this value to detect stale local state. 
ontology_version: u64, } #[ontoref_derive::onto_api( method = "GET", path = "/projects", description = "List all registered projects with slug, root, push_only flag and import path", auth = "admin", actors = "admin", tags = "projects, registry" )] async fn projects_list(State(state): State) -> impl IntoResponse { use std::sync::atomic::Ordering; state.touch_activity(); let projects: Vec = state .registry .all() .into_iter() .map(|ctx| ProjectView { slug: ctx.slug.clone(), root: ctx.root.display().to_string(), push_only: ctx.push_only, remote_url: ctx.remote_url.clone(), auth_enabled: ctx.auth_enabled(), ontology_version: ctx.ontology_version.load(Ordering::Acquire), }) .collect(); Json(serde_json::json!({ "projects": projects, "total": projects.len() })) } /// Validate that a project slug contains only alphanumeric characters, `-`, or /// `_`. Rejects slugs with `/`, `.`, null bytes, or other path-significant /// characters. fn validate_slug(slug: &str) -> std::result::Result<(), (StatusCode, String)> { if slug.is_empty() { return Err((StatusCode::BAD_REQUEST, "slug must not be empty".into())); } if slug.len() > 64 { return Err(( StatusCode::BAD_REQUEST, "slug must not exceed 64 characters".into(), )); } let valid = slug .chars() .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_'); if !valid { return Err(( StatusCode::BAD_REQUEST, format!("slug '{slug}' contains invalid characters — only [a-zA-Z0-9_-] are allowed"), )); } Ok(()) } #[ontoref_derive::onto_api( method = "POST", path = "/projects", description = "Register a new project at runtime without daemon restart", auth = "admin", actors = "admin", tags = "projects, registry" )] async fn project_add( State(state): State, Json(entry): Json, ) -> impl IntoResponse { state.touch_activity(); if let Err(e) = validate_slug(&entry.slug) { return e.into_response(); } let registry = &state.registry; let slug = entry.slug.clone(); if let Err(e) = registry.add_project(entry) { return ( StatusCode::CONFLICT, 
Json(serde_json::json!({"error": e.to_string()})), ) .into_response(); } // Notify the runtime watcher task to start watching the new project. if let Some(ctx) = registry.get(&slug) { if let Some(ref tx) = state.new_project_tx { let _ = tx.send(ctx); } } ( StatusCode::CREATED, Json(serde_json::json!({"slug": slug, "registered": true})), ) .into_response() } #[ontoref_derive::onto_api( method = "DELETE", path = "/projects/{slug}", description = "Deregister a project and stop its file watcher", auth = "admin", actors = "admin", tags = "projects, registry" )] async fn project_delete( State(state): State, Path(slug): Path, ) -> impl IntoResponse { state.touch_activity(); let registry = &state.registry; if registry.get(&slug).is_none() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("project '{slug}' not registered")})), ) .into_response(); } registry.remove_project(&slug); // Abort and drop the file watcher for this project so the OS watch // handle is released and the debounce task doesn't keep running. if let Some(watcher) = state.watcher_map.lock().await.remove(&slug) { watcher.abort(); } StatusCode::NO_CONTENT.into_response() } // ── Project Key Rotation ───────────────────────────────────────────────────── #[derive(Deserialize)] struct UpdateKeysRequest { keys: Vec, } #[derive(Serialize)] struct UpdateKeysResponse { slug: String, keys_active: usize, } /// `PUT /projects/{slug}/keys` — replace the key set for a registered project. /// /// Auth: /// - If the project has existing keys, requires `Authorization: Bearer /// ` with **admin** role. This prevents a viewer or an attacker /// from rotating keys. /// - If the project has no keys yet (bootstrap case), the request is accepted /// without credentials — the daemon is loopback-only, so OS-level access /// controls apply. 
#[ontoref_derive::onto_api( method = "PUT", path = "/projects/{slug}/keys", description = "Hot-rotate credentials for a project; invalidates all existing actor and UI \ sessions", auth = "admin", actors = "admin", tags = "projects, auth" )] async fn project_update_keys( State(state): State, headers: axum::http::HeaderMap, Path(slug): Path, Json(req): Json, ) -> impl IntoResponse { let registry = &state.registry; let ctx = match registry.get(&slug) { Some(c) => c, None => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("project '{slug}' not registered")})), ) .into_response(); } }; // If the project already has keys, require admin credentials before allowing // rotation. if ctx.auth_enabled() { let bearer = headers .get(axum::http::header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")) .map(str::trim) .filter(|s| !s.is_empty()); match bearer { None => { return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Authorization: Bearer required"})), ) .into_response(); } Some(password) => match ctx.verify_key(password).map(|m| m.role) { Some(crate::registry::Role::Admin) => {} Some(crate::registry::Role::Viewer) => { return ( StatusCode::FORBIDDEN, Json(serde_json::json!({"error": "admin role required to rotate keys"})), ) .into_response(); } None => { return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "invalid credentials"})), ) .into_response(); } }, } } let keys_active = ctx.set_keys(req.keys); // Invalidate all active actor sessions. let evicted_actors = ctx.actors.deregister_all_for_project(&slug); if evicted_actors > 0 { tracing::info!(slug = %slug, evicted = evicted_actors, "actor sessions invalidated after key rotation"); } // Invalidate all UI/API sessions for this project — they authenticated // against the old key set and must re-authenticate after rotation. 
#[cfg(feature = "ui")] { let evicted_sessions = state.sessions.revoke_all_for_slug(&slug); if evicted_sessions > 0 { tracing::info!(slug = %slug, evicted = evicted_sessions, "UI sessions invalidated after key rotation"); } } // Persist keys to keys-overlay.json so they survive daemon restart. if let Some(ref config_dir) = state.config_dir { persist_keys_overlay(config_dir, &state.registry); } ( StatusCode::OK, Json(UpdateKeysResponse { slug, keys_active }), ) .into_response() } #[ontoref_derive::onto_api( method = "GET", path = "/projects/{slug}/ontology/versions", description = "Per-file ontology change counters for a project; incremented on every cache \ invalidation", auth = "none", actors = "agent, developer", tags = "projects, ontology, cache" )] /// Return per-file ontology version counters for a registered project. /// /// Each counter is incremented every time the watcher invalidates that specific /// file in the NCL cache. Clients can snapshot and compare between polls to /// detect which individual files changed, without re-fetching all content. async fn project_file_versions( State(state): State, Path(slug): Path, ) -> impl IntoResponse { let Some(ctx) = state.registry.get(&slug) else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("project '{slug}' not registered")})), ) .into_response(); }; let versions: std::collections::BTreeMap = ctx .file_versions .iter() .map(|r| (r.key().display().to_string(), *r.value())) .collect(); Json(serde_json::json!({ "slug": slug, "global_version": ctx.ontology_version.load(std::sync::atomic::Ordering::Acquire), "files": versions, })) .into_response() } // ── Config surface endpoints // ────────────────────────────────────────────────── /// Resolve a project context or return 404. macro_rules! 
require_project { ($state:expr, $slug:expr) => { match $state.registry.get(&$slug) { Some(ctx) => ctx, None => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("project '{}' not registered", $slug)})), ) .into_response() } } }; } /// Resolve the config surface for a project or return 404. macro_rules! require_config_surface { ($ctx:expr, $slug:expr) => { match &$ctx.config_surface { Some(s) => s.clone(), None => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("project '{}' has no config_surface in manifest.ncl", $slug)})), ) .into_response() } } }; } #[ontoref_derive::onto_api( method = "GET", path = "/projects/{slug}/config", description = "Full config export for a registered project (merged with any active overrides)", auth = "none", actors = "agent, developer", params = "slug:string:required:Project slug", tags = "config" )] async fn project_config( State(state): State, Path(slug): Path, ) -> impl IntoResponse { state.touch_activity(); let ctx = require_project!(state, slug); let surface = require_config_surface!(ctx, slug); let entry = surface.entry_point_path(&ctx.root); match ctx.cache.export(&entry, ctx.import_path.as_deref()).await { Ok((json, _)) => Json(json).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[ontoref_derive::onto_api( method = "GET", path = "/projects/{slug}/config/schema", description = "Config surface schema: sections with descriptions, rationales, contracts, and \ declared consumers", auth = "none", actors = "agent, developer", params = "slug:string:required:Project slug", tags = "config" )] async fn project_config_schema( State(state): State, Path(slug): Path, ) -> impl IntoResponse { state.touch_activity(); let ctx = require_project!(state, slug); let surface = require_config_surface!(ctx, slug); let sections: Vec = surface .sections .iter() .map(|s| { serde_json::json!({ "id": s.id, "file": 
s.file, "contract": s.contract, "description": s.description, "rationale": s.rationale, "mutable": s.mutable, "consumers": s.consumers.iter().map(|c| serde_json::json!({ "id": c.id, "kind": format!("{:?}", c.kind), "ref": c.reference, "fields": c.fields, })).collect::>(), }) }) .collect(); Json(serde_json::json!({ "slug": slug, "config_root": surface.config_root.display().to_string(), "entry_point": surface.entry_point, "kind": format!("{:?}", surface.kind), "contracts_path": surface.contracts_path, "sections": sections, })) .into_response() } #[ontoref_derive::onto_api( method = "GET", path = "/projects/{slug}/config/{section}", description = "Values for a single config section (from the merged NCL export)", auth = "none", actors = "agent, developer", params = "slug:string:required:Project slug; section:string:required:Section id", tags = "config" )] async fn project_config_section( State(state): State, Path((slug, section)): Path<(String, String)>, ) -> impl IntoResponse { state.touch_activity(); let ctx = require_project!(state, slug); let surface = require_config_surface!(ctx, slug); if surface.section(§ion).is_none() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("section '{section}' not declared in config_surface")})), ) .into_response(); } let entry = surface.entry_point_path(&ctx.root); match ctx.cache.export(&entry, ctx.import_path.as_deref()).await { Ok((json, _)) => { let val = json .get(§ion) .cloned() .unwrap_or(serde_json::Value::Null); Json(serde_json::json!({"slug": slug, "section": section, "values": val})) .into_response() } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[ontoref_derive::onto_api( method = "GET", path = "/projects/{slug}/config/coherence", description = "Multi-consumer coherence report: unclaimed NCL fields, consumer field \ mismatches", auth = "none", actors = "agent, developer", params = "slug:string:required:Project slug; 
section:string:optional:Filter to one section", tags = "config" )] async fn project_config_coherence( State(state): State, Path(slug): Path, Query(q): Query>, ) -> impl IntoResponse { state.touch_activity(); let ctx = require_project!(state, slug); let surface = require_config_surface!(ctx, slug); let section_filter = q.get("section").map(String::as_str); let report = crate::config_coherence::check_project( &slug, &surface, &ctx.root, &ctx.cache, ctx.import_path.as_deref(), section_filter, ) .await; Json(serde_json::to_value(&report).unwrap_or(serde_json::Value::Null)).into_response() } #[ontoref_derive::onto_api( method = "GET", path = "/projects/{slug}/config/quickref", description = "Generated config documentation with rationales, override history, and \ coherence status", auth = "none", actors = "agent, developer", params = "slug:string:required:Project slug; section:string:optional:Filter to one section; \ format:string:optional:Output format (json|markdown)", tags = "config" )] async fn project_config_quickref( State(state): State, Path(slug): Path, Query(q): Query>, ) -> impl IntoResponse { state.touch_activity(); let ctx = require_project!(state, slug); let surface = require_config_surface!(ctx, slug); let section_filter = q.get("section").map(String::as_str); let quickref = crate::config_coherence::build_quickref( &slug, &surface, &ctx.root, &ctx.cache, ctx.import_path.as_deref(), section_filter, ) .await; Json(quickref).into_response() } fn index_section_fields( sec_val: &serde_json::Map, section_id: &str, slug: &str, index: &mut std::collections::BTreeMap<(String, String), Vec<(String, serde_json::Value)>>, ) { for (field, value) in sec_val { if field.starts_with("_meta_") || field == "_overrides_meta" { continue; } index .entry((section_id.to_owned(), field.clone())) .or_default() .push((slug.to_owned(), value.clone())); } } #[ontoref_derive::onto_api( method = "GET", path = "/config/cross-project", description = "Compare config surfaces across all 
registered projects: shared values, \ conflicts, coverage gaps", auth = "none", actors = "agent, developer", tags = "config" )] async fn config_cross_project(State(state): State) -> impl IntoResponse { state.touch_activity(); // Collect all registered projects that have a config_surface. let candidates: Vec> = state .registry .all() .into_iter() .filter(|ctx| ctx.config_surface.is_some()) .collect(); // Export each project's full config. NclCache makes repeated calls cheap. // (section_id, field_path) → Vec<(slug, serde_json::Value)> let mut field_index: std::collections::BTreeMap< (String, String), Vec<(String, serde_json::Value)>, > = std::collections::BTreeMap::new(); let mut project_summaries: Vec = Vec::new(); for ctx in &candidates { let surface = ctx.config_surface.as_ref().unwrap(); let entry = surface.entry_point_path(&ctx.root); let export = ctx .cache .export(&entry, ctx.import_path.as_deref()) .await .ok() .map(|(j, _)| j); let section_ids: Vec = surface.sections.iter().map(|s| s.id.clone()).collect(); if let Some(ref full) = export { for section in &surface.sections { let Some(sec_val) = full.get(§ion.id).and_then(|v| v.as_object()) else { continue; }; index_section_fields(sec_val, §ion.id, &ctx.slug, &mut field_index); } } project_summaries.push(serde_json::json!({ "slug": ctx.slug, "config_root": surface.config_root.display().to_string(), "kind": format!("{:?}", surface.kind), "sections": section_ids, "export_ok": export.is_some(), })); } // Shared values: same (section, field) present in ≥2 projects with identical // value. let mut shared: Vec = Vec::new(); // Conflicts: same (section, field) with differing values across projects. let mut conflicts: Vec = Vec::new(); // Port collisions: numeric fields named `port` or ending in `_port`. 
let mut port_collisions: Vec = Vec::new(); for ((section_id, field), entries) in &field_index { if entries.len() < 2 { continue; } let first_val = &entries[0].1; let all_same = entries.iter().all(|(_, v)| v == first_val); let is_port_field = field == "port" || field.ends_with("_port"); if is_port_field { if !all_same { // Different ports — not necessarily a conflict, but flag them. port_collisions.push(serde_json::json!({ "section": section_id, "field": field, "values": entries.iter().map(|(slug, v)| serde_json::json!({ "slug": slug, "value": v })).collect::>(), })); } } else if all_same { shared.push(serde_json::json!({ "section": section_id, "field": field, "value": first_val, "projects": entries.iter().map(|(s, _)| s).collect::>(), })); } else { conflicts.push(serde_json::json!({ "section": section_id, "field": field, "values": entries.iter().map(|(slug, v)| serde_json::json!({ "slug": slug, "value": v })).collect::>(), })); } } // Coverage gaps: section present in some projects but not others. 
let all_sections: std::collections::BTreeSet = field_index.keys().map(|(s, _)| s.clone()).collect(); let mut coverage_gaps: Vec = Vec::new(); for section_id in &all_sections { let present_in: Vec = candidates .iter() .filter(|ctx| { ctx.config_surface .as_ref() .map(|s| s.sections.iter().any(|sec| &sec.id == section_id)) .unwrap_or(false) }) .map(|ctx| ctx.slug.clone()) .collect(); if present_in.len() < candidates.len() { let absent_in: Vec = candidates .iter() .filter(|ctx| !present_in.contains(&ctx.slug)) .map(|ctx| ctx.slug.clone()) .collect(); coverage_gaps.push(serde_json::json!({ "section": section_id, "present_in": present_in, "absent_in": absent_in, })); } } Json(serde_json::json!({ "projects": project_summaries, "shared_values": shared, "conflicts": conflicts, "port_report": port_collisions, "coverage_gaps": coverage_gaps, "total_projects": candidates.len(), })) .into_response() } #[derive(Deserialize)] pub struct ConfigUpdateRequest { /// JSON object with fields to set in this section. pub values: serde_json::Value, /// Reason for the change — written as a comment in the override file. #[serde(default)] pub reason: String, /// When true (default for safety): return the proposed override NCL /// without writing to disk. #[serde(default = "default_dry_run")] pub dry_run: bool, } fn default_dry_run() -> bool { true } #[ontoref_derive::onto_api( method = "PUT", path = "/projects/{slug}/config/{section}", description = "Mutate a config section via the override layer. 
dry_run=true (default) returns \ the proposed change without writing.", auth = "admin", actors = "agent, developer", params = "slug:string:required:Project slug; section:string:required:Section id", tags = "config" )] async fn project_config_update( State(state): State, headers: axum::http::HeaderMap, Path((slug, section)): Path<(String, String)>, Json(req): Json, ) -> impl IntoResponse { state.touch_activity(); let ctx = require_project!(state, slug); // Require admin auth if the project has keys. if ctx.auth_enabled() { let bearer = headers .get(axum::http::header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")) .map(str::trim) .filter(|s| !s.is_empty()); match bearer { None => { return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Authorization: Bearer required"})), ) .into_response(); } Some(password) => match ctx.verify_key(password).map(|m| m.role) { Some(crate::registry::Role::Admin) => {} Some(crate::registry::Role::Viewer) => { return ( StatusCode::FORBIDDEN, Json(serde_json::json!({"error": "admin role required to mutate config"})), ) .into_response(); } None => { return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "invalid credentials"})), ) .into_response(); } }, } } let surface = require_config_surface!(ctx, slug); // TypeDialog projects manage their own write pipeline (form.toml → // validators → fragments). The NCL override layer is not applicable. if matches!(surface.kind, crate::registry::ConfigKind::TypeDialog) { return ( StatusCode::METHOD_NOT_ALLOWED, Json(serde_json::json!({ "error": "TypeDialog config surfaces are not mutable via this endpoint", "detail": "TypeDialog projects use form.toml + validators + fragments. 
\ Mutate values through the TypeDialog pipeline directly.", "kind": "TypeDialog", })), ) .into_response(); } let section_meta = match surface.section(§ion) { Some(s) => s.clone(), None => return ( StatusCode::NOT_FOUND, Json( serde_json::json!({"error": format!("section '{section}' not in config_surface")}), ), ) .into_response(), }; if !section_meta.mutable { return ( StatusCode::METHOD_NOT_ALLOWED, Json(serde_json::json!({ "error": format!("section '{section}' is marked immutable in manifest.ncl"), "detail": "Set mutable = true in the config_section declaration to enable writes.", })), ) .into_response(); } if ctx.push_only { // push_only projects never have writable local files. return generate_override_diff(&surface, §ion, &req, &ctx.root).into_response(); } if req.dry_run { return generate_override_diff(&surface, §ion, &req, &ctx.root).into_response(); } // Apply the override: write {section}.overrides.ncl, patch entry point, // validate with nickel export, revert on failure. match apply_config_override( &surface, §ion, &req, &ctx.root, &ctx.cache, ctx.import_path.as_deref(), ) .await { Ok(result) => Json(result).into_response(), Err(e) => ( StatusCode::UNPROCESSABLE_ENTITY, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } /// Generate a dry-run diff as a `serde_json::Value` (shared by HTTP and MCP). 
///
/// Nothing is written to disk. The returned object carries `dry_run: true`,
/// the section id, the path the override file would be written to, the
/// rendered NCL text and the requested values.
pub fn generate_override_diff_value(
    surface: &crate::registry::ConfigSurface,
    section: &str,
    req: &ConfigUpdateRequest,
    project_root: &std::path::Path,
) -> serde_json::Value {
    let override_path = project_root
        .join(surface.resolved_overrides_dir())
        .join(format!("{section}.overrides.ncl"));
    let proposed_ncl = render_override_ncl(section, &req.values, &req.reason, project_root);

    serde_json::json!({
        "dry_run": true,
        "section": section,
        "override_file": override_path.display().to_string(),
        "proposed_ncl": proposed_ncl,
        "values": req.values,
    })
}

/// Generate a dry-run HTTP response showing what the override file would look
/// like.
///
/// Thin JSON wrapper around [`generate_override_diff_value`].
fn generate_override_diff(
    surface: &crate::registry::ConfigSurface,
    section: &str,
    req: &ConfigUpdateRequest,
    project_root: &std::path::Path,
) -> impl IntoResponse {
    Json(generate_override_diff_value(
        surface,
        section,
        req,
        project_root,
    ))
}

/// Render the NCL content for an override file.
///
/// The output is a record holding an `_overrides_meta` block (`managed_by`,
/// `updated_at` as seconds since the Unix epoch, and `reason` when non-empty)
/// followed by one `{section}.{key} = value` line per entry of `values`.
/// `values` that is not a JSON object contributes no field lines.
///
/// `reason` is written as a string literal with `%{` escaped, so free-form
/// text can never be evaluated as Nickel string interpolation. Keys that are
/// not plain identifiers are emitted as quoted field names.
fn render_override_ncl(
    section: &str,
    values: &serde_json::Value,
    reason: &str,
    _project_root: &std::path::Path,
) -> String {
    /// Quote `s` as a string literal, neutralising `%{` interpolation.
    fn quote(s: &str) -> String {
        format!("{s:?}").replace("%{", "\\%{")
    }

    /// Emit `key` bare when it is a plain identifier, quoted otherwise.
    fn field_name(key: &str) -> String {
        let mut chars = key.chars();
        let plain = chars
            .next()
            .is_some_and(|c| c.is_ascii_alphabetic() || c == '_')
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-'));
        if plain {
            key.to_owned()
        } else {
            quote(key)
        }
    }

    // A clock set before the epoch degrades to 0 instead of failing the render.
    let ts = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    let mut lines = vec![
        format!("# {section}.overrides.ncl — generated by ontoref"),
        "# DO NOT edit manually — managed by ontoref config surface.".to_owned(),
        "# To revert: delete this file and remove the import from the entry point.".to_owned(),
        "{".to_owned(),
        "  _overrides_meta = {".to_owned(),
        "    managed_by = \"ontoref\",".to_owned(),
        format!("    updated_at = {ts},"),
    ];
    if !reason.is_empty() {
        lines.push(format!("    reason = {},", quote(reason)));
    }
    lines.push("  },".to_owned());

    // Emit each key as a top-level field override.
    if let Some(obj) = values.as_object() {
        lines.extend(obj.iter().map(|(key, val)| {
            format!(
                "  {section}.{} = {},",
                field_name(key),
                json_to_ncl_literal(val)
            )
        }));
    }

    lines.push("}".to_owned());
    lines.join("\n")
}

/// Best-effort conversion of a JSON value to a NCL literal.
/// Supports primitives, arrays of primitives, and nested objects. fn json_to_ncl_literal(val: &serde_json::Value) -> String { match val { serde_json::Value::Null => "null".to_owned(), serde_json::Value::Bool(b) => b.to_string(), serde_json::Value::Number(n) => n.to_string(), serde_json::Value::String(s) => format!("{s:?}"), serde_json::Value::Array(arr) => { let items: Vec = arr.iter().map(json_to_ncl_literal).collect(); format!("[{}]", items.join(", ")) } serde_json::Value::Object(obj) => { let fields: Vec = obj .iter() .map(|(k, v)| format!(" {k} = {}", json_to_ncl_literal(v))) .collect(); format!("{{\n{}\n}}", fields.join(",\n")) } } } /// Write the override file, patch the entry point, and validate. /// Reverts on validation failure. pub async fn apply_config_override( surface: &crate::registry::ConfigSurface, section: &str, req: &ConfigUpdateRequest, project_root: &std::path::Path, cache: &crate::cache::NclCache, import_path: Option<&str>, ) -> anyhow::Result { use std::io::Write; let overrides_dir = project_root.join(surface.resolved_overrides_dir()); let override_file = overrides_dir.join(format!("{section}.overrides.ncl")); let entry_point = surface.entry_point_path(project_root); let ncl_content = render_override_ncl(section, &req.values, &req.reason, project_root); // Backup old override file if it exists. let backup = override_file.with_extension("ncl.bak"); if override_file.exists() { std::fs::copy(&override_file, &backup)?; } // Write the override file. { let mut f = std::fs::File::create(&override_file)?; f.write_all(ncl_content.as_bytes())?; } // Patch entry point to import the override if not already present. let override_import = format!("(import \"./{section}.overrides.ncl\")"); let entry_original = std::fs::read_to_string(&entry_point)?; let patched = if !entry_original.contains(&override_import) { // Append import at the end — NCL merge chain, override wins. 
Some(format!("{entry_original}\n& {override_import}\n")) } else { None }; if let Some(ref new_content) = patched { std::fs::write(&entry_point, new_content)?; } // Validate by re-exporting the entry point. cache.invalidate_file(&override_file); cache.invalidate_file(&entry_point); match cache.export(&entry_point, import_path).await { Ok(_) => { // Clean up backup on success. let _ = std::fs::remove_file(&backup); Ok(serde_json::json!({ "applied": true, "section": section, "override_file": override_file.display().to_string(), "values": req.values, })) } Err(e) => { // Revert: restore backup or remove the override file. if backup.exists() { let _ = std::fs::copy(&backup, &override_file); let _ = std::fs::remove_file(&backup); } else { let _ = std::fs::remove_file(&override_file); } // Restore entry point if we patched it. if patched.is_some() { let _ = std::fs::write(&entry_point, &entry_original); } cache.invalidate_file(&override_file); cache.invalidate_file(&entry_point); anyhow::bail!("nickel export validation failed after override: {e}") } } } /// Exchange a key for a session token. /// /// Accepts project keys (looked up by slug) or the daemon admin password. /// Returns a short-lived token suitable for use as `Authorization: Bearer /// ` or as a browser session cookie (set by the UI login flow). /// /// Rate-limited: repeated failures from the same IP exhaust the same budget as /// failed Bearer attempts on all other endpoints. #[cfg(feature = "ui")] #[derive(Deserialize)] struct SessionRequest { /// Raw (unhashed) key to verify. key: String, /// Project slug to authenticate against. `"_daemon"` or absent → daemon /// admin. 
project: Option, } #[cfg(feature = "ui")] #[derive(Serialize)] struct SessionResponse { token: String, role: String, key_label: String, project: String, expires_at: u64, } #[cfg(feature = "ui")] async fn session_create( State(state): State, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, Json(req): Json, ) -> impl IntoResponse { use std::time::{SystemTime, UNIX_EPOCH}; let client_ip = addr.ip(); if state.auth_rate_limiter.is_limited(client_ip) { return ( StatusCode::TOO_MANY_REQUESTS, Json(serde_json::json!({ "error": "too many failed auth attempts", "retry_after_secs": 60 })), ) .into_response(); } let target = req.project.as_deref().unwrap_or("_daemon"); // Daemon admin path. if target == "_daemon" { let Some(ref hash) = state.daemon_admin_hash else { state.auth_rate_limiter.record_failure(client_ip); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "daemon admin auth not configured"})), ) .into_response(); }; let key_entry = crate::registry::KeyEntry { role: crate::registry::Role::Admin, hash: hash.clone(), label: "daemon-admin".to_string(), }; if crate::registry::verify_keys_list(std::slice::from_ref(&key_entry), &req.key).is_some() { state.auth_rate_limiter.record_success(client_ip); let token = state.sessions.create( "_daemon".to_string(), crate::registry::Role::Admin, "daemon-admin".to_string(), ); let expires_at = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs() + 30 * 24 * 3600; return Json(SessionResponse { token, role: "admin".to_string(), key_label: "daemon-admin".to_string(), project: "_daemon".to_string(), expires_at, }) .into_response(); } state.auth_rate_limiter.record_failure(client_ip); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "invalid daemon admin credentials"})), ) .into_response(); } // Project key path. 
let Some(ctx) = state.registry.get(target) else { state.auth_rate_limiter.record_failure(client_ip); return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("project '{target}' not registered")})), ) .into_response(); }; match ctx.verify_key(&req.key) { Some(km) => { state.auth_rate_limiter.record_success(client_ip); let role_str = match km.role { crate::registry::Role::Admin => "admin", crate::registry::Role::Viewer => "viewer", }; let token = state .sessions .create(target.to_string(), km.role, km.label.clone()); let expires_at = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs() + 30 * 24 * 3600; Json(SessionResponse { token, role: role_str.to_string(), key_label: km.label, project: target.to_string(), expires_at, }) .into_response() } None => { state.auth_rate_limiter.record_failure(client_ip); ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "invalid credentials"})), ) .into_response() } } } /// `GET /sessions?project=` — list active sessions for a project. /// /// Auth: /// - Any valid session for the requested project (viewer or admin). /// - Daemon admin session (slug == `_daemon`) can query any project. /// - Without `?project=`, requires daemon admin and returns all sessions. #[cfg(feature = "ui")] #[derive(Deserialize)] struct SessionsQuery { project: Option, } #[cfg(feature = "ui")] async fn sessions_list( State(state): State, headers: axum::http::HeaderMap, Query(q): Query, ) -> axum::response::Response { let acting = match require_session(&state, &headers) { Ok(e) => e, Err(r) => return *r, }; let views = match q.project.as_deref() { Some(slug) => { // Must be a session for that project or daemon admin. 
let authorized = acting.slug == "_daemon" || acting.slug == slug; if !authorized { return ( StatusCode::FORBIDDEN, Json(serde_json::json!({"error": "session is not scoped to this project"})), ) .into_response(); } if state.registry.get(slug).is_none() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("project '{slug}' not registered")})), ) .into_response(); } state.sessions.list_for_slug(slug) } None => { // No project filter — daemon admin only. if acting.slug != "_daemon" { return ( StatusCode::FORBIDDEN, Json(serde_json::json!({"error": "daemon admin session required to list all sessions"})), ) .into_response(); } state.sessions.list_all() } }; Json(serde_json::json!({"sessions": views})).into_response() } /// `DELETE /sessions/{id}` — revoke a session by its public id. /// /// Auth: /// - Project admin: may revoke sessions scoped to their own project. /// - Daemon admin: may revoke any session. #[cfg(feature = "ui")] async fn session_revoke( State(state): State, headers: axum::http::HeaderMap, Path(id): Path, ) -> axum::response::Response { let acting = match require_session(&state, &headers) { Ok(e) => e, Err(r) => return *r, }; use crate::session::RevokeResult; match state.sessions.revoke_by_id(&id, &acting.slug, &acting.role) { RevokeResult::Revoked => StatusCode::NO_CONTENT.into_response(), RevokeResult::NotFound => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "session not found or already expired"})), ) .into_response(), RevokeResult::Forbidden => ( StatusCode::FORBIDDEN, Json(serde_json::json!({"error": "not authorized to revoke this session"})), ) .into_response(), } } /// Write a snapshot of all project keys to `{config_dir}/keys-overlay.json`. /// Format: `{ "slug": [KeyEntry, ...], ... }` — loaded at next boot by main.rs. 
fn persist_keys_overlay(config_dir: &std::path::Path, registry: &crate::registry::ProjectRegistry) { let map: std::collections::HashMap> = registry .all() .into_iter() .filter_map(|ctx| { let keys = ctx.keys.read().unwrap_or_else(|e| e.into_inner()).clone(); if keys.is_empty() { None } else { Some((ctx.slug.clone(), keys)) } }) .collect(); let path = config_dir.join("keys-overlay.json"); match serde_json::to_string_pretty(&map) { Ok(json) => { if let Err(e) = std::fs::write(&path, json) { tracing::warn!(error = %e, path = %path.display(), "failed to persist keys overlay"); } } Err(e) => tracing::warn!(error = %e, "failed to serialize keys overlay"), } } // ── Helpers ───────────────────────────────────────────────────────────── /// Resolve a path that may be absolute or relative to project_root. /// Absolute paths are accepted as-is (daemon is loopback-only; OS enforces /// access). Relative paths are resolved against project_root and must stay /// within it. fn resolve_any_path( project_root: &std::path::Path, path: &str, ) -> std::result::Result { let p = PathBuf::from(path); if p.is_absolute() { return p.canonicalize().map_err(|e| { ( StatusCode::BAD_REQUEST, format!("path does not exist or is inaccessible: {e}"), ) }); } let joined = project_root.join(p); let canonical = joined.canonicalize().map_err(|e| { ( StatusCode::BAD_REQUEST, format!("path does not exist or is inaccessible: {e}"), ) })?; if !canonical.starts_with(project_root) { return Err(( StatusCode::BAD_REQUEST, format!("path escapes project root: {path}"), )); } Ok(canonical) } /// Resolve a path relative to the project root and verify it stays within. /// Rejects absolute paths — used for cache management operations scoped to a /// project. 
fn resolve_path( project_root: &std::path::Path, path: &str, ) -> std::result::Result { let p = PathBuf::from(path); if p.is_absolute() { return Err(( StatusCode::BAD_REQUEST, "absolute paths are not accepted".to_string(), )); } let joined = project_root.join(p); let canonical = joined.canonicalize().map_err(|e| { ( StatusCode::BAD_REQUEST, format!("path does not exist or is inaccessible: {e}"), ) })?; if !canonical.starts_with(project_root) { return Err(( StatusCode::BAD_REQUEST, format!("path escapes project root: {path}"), )); } Ok(canonical) } impl IntoResponse for crate::error::DaemonError { fn into_response(self) -> axum::response::Response { let body = serde_json::json!({"error": self.to_string()}); (StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response() } } #[cfg(test)] mod tests { use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; use axum::body::to_bytes; use axum::http::{Request, StatusCode}; use tower::ServiceExt as _; use super::*; use crate::registry::{make_context, ContextSpec, ProjectRegistry}; /// Build a minimal `AppState` suitable for handler tests. /// /// Auth is disabled (no keys). Registry primary slug is "test". /// All optional feature-gated fields are `None`. 
fn test_state(project_root: std::path::PathBuf) -> AppState { let ctx = make_context(ContextSpec { slug: "test".into(), root: project_root.clone(), import_path: None, keys: vec![], remote_url: String::new(), push_only: false, stale_actor_timeout: 300, max_notifications: 100, ack_required: vec![], config_surface: None, }); let ctx = Arc::new(ctx); let actors = Arc::clone(&ctx.actors); let notifications = Arc::clone(&ctx.notifications); let cache = Arc::clone(&ctx.cache); let registry = Arc::new(ProjectRegistry::with_primary( "test".into(), ctx, vec![], 300, 100, )); AppState { cache, project_root, ontoref_root: None, started_at: Instant::now(), last_activity: Arc::new(AtomicU64::new(0)), actors, notifications, nickel_import_path: None, #[cfg(feature = "db")] db: None, #[cfg(feature = "nats")] nats: None, #[cfg(feature = "ui")] tera: None, #[cfg(feature = "ui")] public_dir: None, registry, new_project_tx: None, config_dir: None, #[cfg(feature = "ui")] sessions: Arc::new(crate::session::SessionStore::new()), #[cfg(feature = "mcp")] mcp_current_project: Arc::new(std::sync::RwLock::new(None)), watcher_map: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), auth_rate_limiter: Arc::new(AuthRateLimiter::new(false)), #[cfg(feature = "ui")] ncl_write_lock: Arc::new(crate::ui::ncl_write::NclWriteLock::new()), #[cfg(feature = "ui")] daemon_admin_hash: None, } } async fn body_json(resp: axum::response::Response) -> serde_json::Value { let bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap(); serde_json::from_slice(&bytes).unwrap() } #[tokio::test] async fn health_returns_ok_status() { let state = test_state(std::path::PathBuf::from("/tmp/test-project")); let app = router(state); let resp = app .oneshot( Request::builder() .uri("/health") .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::OK); let json = body_json(resp).await; assert_eq!(json["status"], "ok"); assert_eq!(json["project_root"], 
"/tmp/test-project"); assert_eq!(json["active_actors"], 0); assert_eq!(json["cache_entries"], 0); } #[tokio::test] async fn cache_stats_empty_on_fresh_state() { let state = test_state(std::path::PathBuf::from("/tmp/test-project")); let app = router(state); let resp = app .oneshot( Request::builder() .uri("/cache/stats") .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::OK); let json = body_json(resp).await; assert_eq!(json["entries"], 0); assert_eq!(json["hits"], 0); assert_eq!(json["misses"], 0); assert_eq!(json["hit_rate"], 0.0); } #[tokio::test] async fn actor_register_returns_201_with_token() { let state = test_state(std::path::PathBuf::from("/tmp/test-project")); let app = router(state); let body = serde_json::json!({ "actor_type": "developer", "project": "test", "hostname": "host-a", "pid": 1001 }); let resp = app .oneshot( Request::builder() .method("POST") .uri("/actors/register") .header("content-type", "application/json") .body(axum::body::Body::from(body.to_string())) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::CREATED); let json = body_json(resp).await; assert!(json["token"].is_string(), "token must be present"); assert_eq!(json["actors_connected"], 1); } #[tokio::test] async fn actor_deregister_204_then_404() { let state = test_state(std::path::PathBuf::from("/tmp/test-project")); let register_body = serde_json::json!({ "actor_type": "agent", "project": "test", "hostname": "host-b", "pid": 1002 }); let register_resp = router(state.clone()) .oneshot( Request::builder() .method("POST") .uri("/actors/register") .header("content-type", "application/json") .body(axum::body::Body::from(register_body.to_string())) .unwrap(), ) .await .unwrap(); let token = body_json(register_resp).await["token"] .as_str() .unwrap() .to_string(); let del_resp = router(state.clone()) .oneshot( Request::builder() .method("DELETE") .uri(format!("/actors/{token}")) .body(axum::body::Body::empty()) 
.unwrap(), ) .await .unwrap(); assert_eq!(del_resp.status(), StatusCode::NO_CONTENT); let del_again = router(state) .oneshot( Request::builder() .method("DELETE") .uri(format!("/actors/{token}")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(del_again.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn actor_touch_204_for_known_404_for_unknown() { let state = test_state(std::path::PathBuf::from("/tmp/test-project")); let reg_body = serde_json::json!({ "actor_type": "ci", "project": "test", "hostname": "host-c", "pid": 1003 }); let reg_resp = router(state.clone()) .oneshot( Request::builder() .method("POST") .uri("/actors/register") .header("content-type", "application/json") .body(axum::body::Body::from(reg_body.to_string())) .unwrap(), ) .await .unwrap(); let token = body_json(reg_resp).await["token"] .as_str() .unwrap() .to_string(); let touch_ok = router(state.clone()) .oneshot( Request::builder() .method("POST") .uri(format!("/actors/{token}/touch")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(touch_ok.status(), StatusCode::NO_CONTENT); let touch_miss = router(state) .oneshot( Request::builder() .method("POST") .uri("/actors/nonexistent-token/touch") .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(touch_miss.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn actors_list_reflects_registered_actors() { let state = test_state(std::path::PathBuf::from("/tmp/test-project")); for i in 0..3 { let body = serde_json::json!({ "actor_type": "developer", "project": "test", "hostname": format!("host-{i}"), "pid": 2000u32 + i as u32 }); router(state.clone()) .oneshot( Request::builder() .method("POST") .uri("/actors/register") .header("content-type", "application/json") .body(axum::body::Body::from(body.to_string())) .unwrap(), ) .await .unwrap(); } let list_resp = router(state) .oneshot( Request::builder() .uri("/actors") .body(axum::body::Body::empty()) .unwrap(), ) 
.await .unwrap(); assert_eq!(list_resp.status(), StatusCode::OK); let json = body_json(list_resp).await; assert_eq!(json["total"], 3); assert_eq!(json["actors"].as_array().unwrap().len(), 3); } /// `check_primary_auth` is called per-handler, not via middleware, so we /// test it as a unit rather than through the full HTTP stack. #[test] fn check_primary_auth_returns_401_without_bearer() { use argon2::password_hash::{rand_core::OsRng, SaltString}; use argon2::{Argon2, PasswordHasher}; use crate::registry::{KeyEntry, Role}; let salt = SaltString::generate(&mut OsRng); let hash = Argon2::default() .hash_password(b"s3cr3t", &salt) .unwrap() .to_string(); let state = test_state(std::path::PathBuf::from("/tmp/test-project")); state.registry.primary().set_keys(vec![KeyEntry { role: Role::Admin, hash, label: String::new(), }]); let client_ip = "10.0.0.1".parse::().unwrap(); let headers = axum::http::HeaderMap::new(); let result = state.check_primary_auth(&headers, client_ip); assert!(result.is_some(), "must reject missing Bearer"); let resp = result.unwrap(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); } #[test] fn check_primary_auth_accepts_correct_bearer() { use argon2::password_hash::{rand_core::OsRng, SaltString}; use argon2::{Argon2, PasswordHasher}; use crate::registry::{KeyEntry, Role}; let salt = SaltString::generate(&mut OsRng); let hash = Argon2::default() .hash_password(b"s3cr3t", &salt) .unwrap() .to_string(); let state = test_state(std::path::PathBuf::from("/tmp/test-project")); state.registry.primary().set_keys(vec![KeyEntry { role: Role::Admin, hash, label: String::new(), }]); let client_ip = "10.0.0.1".parse::().unwrap(); let mut headers = axum::http::HeaderMap::new(); headers.insert( axum::http::header::AUTHORIZATION, "Bearer s3cr3t".parse().unwrap(), ); let result = state.check_primary_auth(&headers, client_ip); assert!(result.is_none(), "valid credentials must pass through"); } }