419 lines
16 KiB
Rust
Raw Normal View History

2026-03-13 00:18:14 +00:00
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
2026-03-13 00:18:14 +00:00
use std::sync::Arc;
use std::time::Duration;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::{mpsc, Semaphore};
2026-03-13 00:18:14 +00:00
use tracing::{debug, info, warn};
use crate::actors::ActorRegistry;
use crate::cache::NclCache;
use crate::notifications::NotificationStore;
use crate::registry::{KeyEntry, ProjectRegistry};
2026-03-13 00:18:14 +00:00
/// Directories to watch for NCL changes relative to a project root.
///
/// Watching is recursive per directory; only `.ncl` and `.jsonl` files
/// trigger invalidation. Changes under `.ontology` additionally trigger a
/// DB re-seed when the `db` feature is enabled.
const WATCH_DIRS: &[&str] = &[".ontology", "adrs", "reflection", "ontology"];
/// File watcher that invalidates the NCL cache on filesystem changes
/// and pushes notifications to the notification store.
pub struct FileWatcher {
    /// Held only to keep the OS-level watch alive; dropping it stops watching.
    _watcher: RecommendedWatcher,
    /// Handle to the async debounce/invalidation loop; cancelled via `abort()`.
    debounce_task: tokio::task::JoinHandle<()>,
}
impl FileWatcher {
    /// Cancel the debounce task and stop watching.
    ///
    /// The underlying `notify` watcher drops when `FileWatcher` is dropped.
    /// Calling `abort()` before drop ensures the async task is cancelled
    /// immediately rather than running until its next yield point.
    /// Aborting a task that has already finished is a no-op, so this is
    /// safe to call more than once.
    pub fn abort(&self) {
        self.debounce_task.abort();
    }
}
/// Optional dependencies injected into the file watcher.
pub struct WatcherDeps {
    /// Project slug used for DB seeding, NATS events, and notification scoping.
    /// Falls back to the directory name of `project_root` when empty.
    pub slug: String,
    /// Database handle; when present, `.ontology` changes trigger a re-seed.
    #[cfg(feature = "db")]
    pub db: Option<Arc<stratum_db::StratumDb>>,
    /// Extra import path forwarded verbatim to the ontology seeding routine.
    pub import_path: Option<String>,
    /// Store receiving one batch of notifications per debounce window.
    pub notifications: Arc<NotificationStore>,
    /// Registry used to bump per-actor pending counters on new notifications.
    pub actors: Arc<ActorRegistry>,
    /// Optional publisher for `file.changed` and per-event notifications.
    #[cfg(feature = "nats")]
    pub nats: Option<Arc<crate::nats::NatsPublisher>>,
    /// Shared with `ProjectContext` — prevents concurrent re-seeds per project.
    pub seed_lock: Arc<Semaphore>,
    /// Shared with `ProjectContext` — incremented after each successful seed.
    pub ontology_version: Arc<AtomicU64>,
    /// Shared with `ProjectContext` — per-file change counters, keyed by
    /// canonical path. Incremented unconditionally on every cache invalidation.
    pub file_versions: Arc<dashmap::DashMap<std::path::PathBuf, u64>>,
}
impl FileWatcher {
    /// Start watching NCL-relevant directories under `project_root`.
    ///
    /// Changes are debounced (200ms) before invalidating the cache.
    /// A periodic full invalidation runs every `full_invalidation_secs` as
    /// safety net.
    ///
    /// # Errors
    ///
    /// Returns [`crate::error::DaemonError::Watcher`] when the OS watcher
    /// cannot be created. Failing to watch an individual directory is only
    /// logged, never fatal.
    pub fn start(
        project_root: &Path,
        cache: Arc<NclCache>,
        full_invalidation_secs: u64,
        deps: WatcherDeps,
    ) -> std::result::Result<Self, crate::error::DaemonError> {
        let (tx, rx) = mpsc::channel::<Vec<PathBuf>>(256);
        // Canonicalize the root so the debounce loop (which canonicalizes
        // event paths) can strip it as a prefix for relative-path reporting.
        let project_root_owned = project_root
            .canonicalize()
            .unwrap_or_else(|_| project_root.to_path_buf());
        // Move the only sender into the event callback: the channel closes
        // exactly when the watcher is dropped. (Previously `tx` was cloned
        // and the original left unused.)
        let mut watcher = RecommendedWatcher::new(
            move |res: std::result::Result<Event, notify::Error>| match res {
                Ok(event) => {
                    // Only NCL sources and JSONL logs are cache-relevant.
                    let ncl_paths: Vec<PathBuf> = event
                        .paths
                        .into_iter()
                        .filter(|p| {
                            p.extension()
                                .is_some_and(|ext| ext == "ncl" || ext == "jsonl")
                        })
                        .collect();
                    if !ncl_paths.is_empty() {
                        // try_send: dropping bursts under backpressure is fine —
                        // the periodic full invalidation catches missed events.
                        let _ = tx.try_send(ncl_paths);
                    }
                }
                Err(e) => warn!(error = %e, "file watcher error"),
            },
            Config::default(),
        )
        .map_err(|e| crate::error::DaemonError::Watcher(e.to_string()))?;
        let mut watched_count = 0;
        for dir_name in WATCH_DIRS {
            let dir = project_root.join(dir_name);
            if dir.is_dir() {
                if let Err(e) = watcher.watch(&dir, RecursiveMode::Recursive) {
                    warn!(dir = %dir.display(), error = %e, "failed to watch directory");
                } else {
                    info!(dir = %dir.display(), "watching directory");
                    watched_count += 1;
                }
            }
        }
        if watched_count == 0 {
            // Not fatal, but worth surfacing: only the periodic full
            // invalidation safety net will run.
            warn!("no watchable directories found under project root");
        }
        info!(watched_count, "file watcher started");
        let debounce_task = tokio::spawn(debounce_loop(
            rx,
            cache,
            project_root_owned,
            full_invalidation_secs,
            deps,
        ));
        Ok(Self {
            _watcher: watcher,
            debounce_task,
        })
    }
}
/// Debounce filesystem events: collect paths over 200ms windows, then
/// invalidate once. Also runs periodic full invalidation as safety net.
/// Pushes notifications to the store and optionally publishes via NATS.
///
/// Runs until the sender half of `rx` (held by the `notify` callback) is
/// dropped, or until the owning task is aborted via [`FileWatcher::abort`].
async fn debounce_loop(
    mut rx: mpsc::Receiver<Vec<PathBuf>>,
    cache: Arc<NclCache>,
    project_root: PathBuf,
    full_invalidation_secs: u64,
    deps: WatcherDeps,
) {
    let debounce = Duration::from_millis(200);
    // `tokio::time::interval` panics on a zero period — map 0 to a 60s default.
    let effective_secs = if full_invalidation_secs == 0 {
        60
    } else {
        full_invalidation_secs
    };
    let mut full_tick = tokio::time::interval(Duration::from_secs(effective_secs));
    full_tick.tick().await; // consume immediate first tick
    // Project name for seeding/notifications/NATS: explicit slug, else the
    // root directory's file name, else "unknown".
    let project_name = if deps.slug.is_empty() {
        project_root
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("unknown")
            .to_string()
    } else {
        deps.slug.clone()
    };
    loop {
        tokio::select! {
            recv = rx.recv() => match recv {
                None => {
                    debug!("watcher channel closed — debounce task exiting");
                    return;
                }
                Some(paths) => {
                    // Collect all events within debounce window
                    let mut all_paths = paths;
                    tokio::time::sleep(debounce).await;
                    while let Ok(more) = rx.try_recv() {
                        all_paths.extend(more);
                    }
                    // Canonicalize, deduplicate, and invalidate. Paths that
                    // cannot be canonicalized (e.g. files deleted before this
                    // window fired) are silently skipped.
                    let mut canonical: Vec<PathBuf> = all_paths
                        .into_iter()
                        .filter_map(|p| p.canonicalize().ok())
                        .collect();
                    canonical.sort();
                    canonical.dedup();
                    let file_names: Vec<String> = canonical
                        .iter()
                        .filter_map(|p| p.file_name())
                        .map(|n| n.to_string_lossy().to_string())
                        .collect();
                    for path in &canonical {
                        cache.invalidate_file(path);
                        // Bump the per-file reload counter shared with
                        // `ProjectContext` — incremented even if the cached
                        // entry was already absent.
                        *deps.file_versions.entry(path.clone()).or_insert(0) += 1;
                    }
                    info!(
                        files = canonical.len(),
                        names = %file_names.join(", "),
                        "cache invalidated — files changed"
                    );
                    // Convert to relative paths for notification matching
                    let relative_paths: Vec<String> = canonical
                        .iter()
                        .filter_map(|p| {
                            p.strip_prefix(&project_root)
                                .ok()
                                .map(|rel| rel.to_string_lossy().to_string())
                        })
                        .collect();
                    // Publish general file.changed event via NATS (all files, not just ack-required)
                    #[cfg(feature = "nats")]
                    {
                        if !relative_paths.is_empty() {
                            if let Some(ref nats) = deps.nats {
                                if let Err(e) = nats.publish_file_changed(&project_name, &relative_paths).await {
                                    warn!(error = %e, "NATS file.changed publish failed");
                                }
                            }
                        }
                    }
                    // Push notifications — one per event type, actors need to ack
                    if !relative_paths.is_empty() {
                        let notification_ids = deps.notifications.push(
                            &project_name,
                            relative_paths.clone(),
                            None, // source_actor unknown from fs event
                        );
                        if !notification_ids.is_empty() {
                            let actor_tokens = deps.actors.tokens_for_project(&project_name);
                            // Increment pending count on each actor for each notification
                            for token in &actor_tokens {
                                for _ in &notification_ids {
                                    deps.actors.increment_pending(token);
                                }
                            }
                            info!(
                                notifications = notification_ids.len(),
                                project = %project_name,
                                actors = actor_tokens.len(),
                                "notifications pushed"
                            );
                            // Publish via NATS — derive events from the file paths directly.
                            // Each distinct event type is published once, carrying the
                            // subset of changed files that map to it.
                            #[cfg(feature = "nats")]
                            {
                                if let Some(ref nats) = deps.nats {
                                    let mut published_events = std::collections::HashSet::new();
                                    for file in &relative_paths {
                                        if let Some(event) = crate::notifications::NotificationEvent::from_path(file) {
                                            if published_events.insert(event) {
                                                let event_files: Vec<String> = relative_paths
                                                    .iter()
                                                    .filter(|f| crate::notifications::NotificationEvent::from_path(f) == Some(event))
                                                    .cloned()
                                                    .collect();
                                                if let Err(e) = nats.publish_notification(
                                                    &project_name,
                                                    &event,
                                                    &event_files,
                                                ).await {
                                                    warn!(error = %e, "NATS notification publish failed");
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    // Re-seed DB if ontology files changed.
                    // The seed_lock semaphore (permits=1) prevents concurrent re-seeds
                    // caused by rapid successive debounce windows on the same project.
                    #[cfg(feature = "db")]
                    {
                        // A file counts as "ontology" when any path component
                        // is the `.ontology` directory.
                        let ontology_changed = canonical.iter().any(|p| {
                            p.components().any(|c| {
                                c.as_os_str() == std::ffi::OsStr::new(".ontology")
                            })
                        });
                        if ontology_changed {
                            if let Some(ref db) = deps.db {
                                let _permit = deps.seed_lock.acquire().await;
                                info!("re-seeding ontology tables from changed files");
                                crate::seed::seed_ontology(
                                    db,
                                    &project_name,
                                    &project_root,
                                    &cache,
                                    deps.import_path.as_deref(),
                                )
                                .await;
                                // Release ordering pairs with Acquire loads in
                                // readers of the shared version counter.
                                deps.ontology_version
                                    .fetch_add(1, std::sync::atomic::Ordering::Release);
                                // _permit drops here, releasing the semaphore
                            }
                        }
                    }
                }
            },
            _ = full_tick.tick() => {
                // Periodic full invalidation as safety net against missed events.
                let before = cache.len();
                cache.invalidate_all();
                if before > 0 {
                    info!(evicted = before, "periodic full cache invalidation");
                }
            }
        }
    }
}
// ── Config Hot-Reload
// ─────────────────────────────────────────────────────────
/// Watches `keys-overlay.json` and hot-applies credential changes to all
/// registered `ProjectContext`s and the primary project's key set.
///
/// The parent directory is watched (not the file itself) so atomic editor
/// saves — which create a new inode and rename it — are detected reliably.
pub struct ConfigWatcher {
    /// Held only to keep the OS-level watch alive; dropping it stops watching.
    _watcher: RecommendedWatcher,
    /// Background reload task handle; never awaited or aborted explicitly.
    _task: tokio::task::JoinHandle<()>,
}
impl ConfigWatcher {
/// Start watching `overlay_path` for changes.
///
/// On any write, re-reads the file and applies keys for every slug.
/// The reserved slug `"_primary"` resolves to `registry.primary_slug()`
/// so callers do not need to know the actual project name.
pub fn start(
overlay_path: PathBuf,
registry: Arc<ProjectRegistry>,
) -> std::result::Result<Self, crate::error::DaemonError> {
let (tx, rx) = mpsc::channel::<()>(8);
let tx_notify = tx.clone();
let mut watcher = RecommendedWatcher::new(
move |res: std::result::Result<Event, notify::Error>| {
if res.is_ok() {
// Any event in the parent directory triggers a debounced reload.
// The reload task re-reads the file and skips if unchanged.
let _ = tx_notify.try_send(());
}
},
Config::default(),
)
.map_err(|e| crate::error::DaemonError::Watcher(e.to_string()))?;
if let Some(parent) = overlay_path.parent() {
if parent.is_dir() {
watcher
.watch(parent, RecursiveMode::NonRecursive)
.map_err(|e| crate::error::DaemonError::Watcher(e.to_string()))?;
info!(path = %overlay_path.display(), "config watcher started");
}
}
let task = tokio::spawn(config_reload_loop(rx, overlay_path, registry));
Ok(Self {
_watcher: watcher,
_task: task,
})
}
}
/// Debounced reload loop for the keys overlay file.
///
/// Waits for a change signal, drains the burst within a 500ms window, then
/// re-reads and re-applies the overlay. Exits when all senders (the watcher
/// callback) are dropped.
async fn config_reload_loop(
    mut rx: mpsc::Receiver<()>,
    overlay_path: PathBuf,
    registry: Arc<ProjectRegistry>,
) {
    let debounce = Duration::from_millis(500);
    while rx.recv().await.is_some() {
        // Drain burst within the debounce window.
        tokio::time::sleep(debounce).await;
        while rx.try_recv().is_ok() {}
        // Read directly and treat "not found" as "nothing to apply"; this
        // avoids the TOCTOU race of a separate `exists()` check before the
        // read (the file may vanish between the two calls).
        let data = match std::fs::read_to_string(&overlay_path) {
            Ok(d) => d,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
            Err(e) => {
                warn!(path = %overlay_path.display(), error = %e, "config watcher: failed to read overlay");
                continue;
            }
        };
        // Overlay format: { slug -> [KeyEntry, ...] }.
        let overrides = match serde_json::from_str::<std::collections::HashMap<String, Vec<KeyEntry>>>(
            &data,
        ) {
            Ok(o) => o,
            Err(e) => {
                warn!(path = %overlay_path.display(), error = %e, "config watcher: overlay is invalid JSON — skipped");
                continue;
            }
        };
        let mut updated = 0usize;
        for (slug, keys) in overrides {
            // "_primary" is a reserved alias for the primary project's slug.
            let resolved = if slug == "_primary" {
                registry.primary_slug().to_string()
            } else {
                slug
            };
            if registry.update_keys(&resolved, keys).is_some() {
                updated += 1;
            } else {
                warn!(%resolved, "config watcher: no registered project with this slug — key update skipped");
            }
        }
        info!(updated, "keys-overlay hot-reloaded");
    }
}