// Jesús Pérez 2d87d60bb5
// Some checks failed
// Rust CI / Security Audit (push) Has been cancelled
// Rust CI / Check + Test + Lint (nightly) (push) Has been cancelled
// Rust CI / Check + Test + Lint (stable) (push) Has been cancelled
// chore: add src code
// 2026-03-13 00:18:14 +00:00
//
// 303 lines — 9.0 KiB — Rust

use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use dashmap::DashMap;
use serde_json::Value;
use tokio::sync::Mutex;
use tracing::debug;
use crate::error::{DaemonError, Result};
/// Identity of one cached export: the file's absolute path, its modification
/// time, and the import path (if any) the export was invoked with. Two keys
/// compare equal only when all three components match.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct CacheKey {
    path: PathBuf,
    mtime: SystemTime,
    import_path: Option<String>,
}

impl CacheKey {
    /// Build a key from borrowed inputs, copying them into owned storage.
    fn new(path: &Path, mtime: SystemTime, import_path: Option<&str>) -> Self {
        CacheKey {
            path: path.to_owned(),
            mtime,
            import_path: import_path.map(|ip| ip.to_string()),
        }
    }
}
/// One cached `nickel export` result plus the instant it was produced.
/// The timestamp is used only for the `age_ms` field in debug logging.
struct CachedExport {
// JSON document as parsed from the subprocess's stdout
json: Value,
// when the export completed; reported as entry age on cache hits
exported_at: Instant,
}
/// Caches `nickel export` subprocess results keyed by file path + mtime.
///
/// First call to a file invokes `nickel export` (~100ms). Subsequent calls
/// with unchanged mtime return the cached JSON (<1ms).
pub struct NclCache {
// exported JSON keyed by (path, mtime, import_path)
cache: DashMap<CacheKey, CachedExport>,
// per-path mutexes that coalesce concurrent misses for the same file,
// so only one subprocess runs per path at a time
inflight: DashMap<PathBuf, Arc<Mutex<()>>>,
// hit/miss counters (relaxed atomics; diagnostic only)
stats: CacheStats,
}
/// Hit/miss counters for the export cache.
///
/// Relaxed ordering is sufficient: the counts are purely diagnostic and
/// never gate any control flow.
struct CacheStats {
    hits: std::sync::atomic::AtomicU64,
    misses: std::sync::atomic::AtomicU64,
}

impl CacheStats {
    /// Fresh counters, both starting at zero.
    fn new() -> Self {
        CacheStats {
            hits: std::sync::atomic::AtomicU64::new(0),
            misses: std::sync::atomic::AtomicU64::new(0),
        }
    }

    /// Record a single cache hit.
    fn hit(&self) {
        use std::sync::atomic::Ordering::Relaxed;
        self.hits.fetch_add(1, Relaxed);
    }

    /// Record a single cache miss.
    fn miss(&self) {
        use std::sync::atomic::Ordering::Relaxed;
        self.misses.fetch_add(1, Relaxed);
    }

    /// Total hits recorded so far.
    fn hits(&self) -> u64 {
        use std::sync::atomic::Ordering::Relaxed;
        self.hits.load(Relaxed)
    }

    /// Total misses recorded so far.
    fn misses(&self) -> u64 {
        use std::sync::atomic::Ordering::Relaxed;
        self.misses.load(Relaxed)
    }
}
impl Default for NclCache {
fn default() -> Self {
Self::new()
}
}
impl NclCache {
    /// Create an empty cache with zeroed statistics.
    pub fn new() -> Self {
        Self {
            cache: DashMap::new(),
            inflight: DashMap::new(),
            stats: CacheStats::new(),
        }
    }
    /// Export a Nickel file to JSON. Returns cached result if mtime unchanged.
    ///
    /// On cache miss, spawns the `nickel export` subprocess on a blocking
    /// thread to avoid stalling the Tokio runtime.
    /// Returns `(json, was_cache_hit)`.
    pub async fn export(&self, path: &Path, import_path: Option<&str>) -> Result<(Value, bool)> {
        // Normalize to an absolute path so cache keys are stable regardless
        // of the caller's working directory.
        let abs_path = if path.is_absolute() {
            path.to_path_buf()
        } else {
            std::env::current_dir()?.join(path)
        };
        // NOTE: std::fs::metadata is a blocking syscall, but a single stat is
        // cheap enough that hopping to spawn_blocking would cost more than it saves.
        let mtime = std::fs::metadata(&abs_path)
            .map_err(|e| DaemonError::NclExport {
                path: abs_path.display().to_string(),
                reason: format!("stat failed: {e}"),
            })?
            .modified()
            .map_err(|e| DaemonError::NclExport {
                path: abs_path.display().to_string(),
                reason: format!("mtime unavailable: {e}"),
            })?;
        let key = CacheKey::new(&abs_path, mtime, import_path);
        if let Some(cached) = self.cache.get(&key) {
            self.stats.hit();
            debug!(
                path = %abs_path.display(),
                age_ms = cached.exported_at.elapsed().as_millis(),
                "cache hit"
            );
            return Ok((cached.json.clone(), true));
        }
        debug!(path = %abs_path.display(), "cache miss — acquiring inflight lock");
        // Acquire per-path lock to coalesce concurrent misses for the same file.
        let lock = self
            .inflight
            .entry(abs_path.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();
        let _guard = lock.lock().await;
        // Re-check cache after acquiring the lock — another task may have filled it.
        // Return as a hit: the subprocess ran, but not on our behalf.
        if let Some(cached) = self.cache.get(&key) {
            self.stats.hit();
            return Ok((cached.json.clone(), true));
        }
        // Confirmed miss — no cached result exists, we will invoke the subprocess.
        self.stats.miss();
        let export_path = abs_path.clone();
        let export_ip = import_path.map(String::from);
        let result = tokio::task::spawn_blocking(move || {
            run_nickel_export(&export_path, export_ip.as_deref())
        })
        .await
        .map_err(|e| DaemonError::NclExport {
            path: abs_path.display().to_string(),
            reason: format!("spawn_blocking join failed: {e}"),
        });
        // On error: release inflight slot so future attempts can retry.
        // On success: insert into cache BEFORE releasing inflight, so concurrent
        // waiters find the result on their re-check instead of re-running the export.
        match result {
            Err(e) => {
                drop(_guard);
                self.inflight.remove(&abs_path);
                Err(e)
            }
            Ok(Err(e)) => {
                drop(_guard);
                self.inflight.remove(&abs_path);
                Err(e)
            }
            Ok(Ok(json)) => {
                // Purge entries for the same (path, import_path) with a
                // different mtime: the key embeds the new mtime, so those
                // entries can never be hit again. Without this, a frequently
                // edited file leaks one dead cache entry per save.
                self.cache.retain(|k, _| {
                    k.path != key.path || k.import_path != key.import_path || k.mtime == key.mtime
                });
                self.cache.insert(
                    key,
                    CachedExport {
                        json: json.clone(),
                        exported_at: Instant::now(),
                    },
                );
                // Release inflight AFTER cache is populated — concurrent waiters
                // will hit the cache on their re-check.
                drop(_guard);
                self.inflight.remove(&abs_path);
                Ok((json, false))
            }
        }
    }
    /// Invalidate all cache entries whose path starts with the given prefix.
    pub fn invalidate_prefix(&self, prefix: &Path) {
        let before = self.cache.len();
        self.cache.retain(|k, _| !k.path.starts_with(prefix));
        // saturating_sub: a concurrent insert can land between the two len()
        // reads, and a plain subtraction would then underflow and panic.
        let evicted = before.saturating_sub(self.cache.len());
        if evicted > 0 {
            debug!(prefix = %prefix.display(), evicted, "cache invalidation");
        }
    }
    /// Invalidate a specific file path (all mtimes).
    ///
    /// Paths from the watcher and API are always resolved to absolute before
    /// calling this. The `debug_assert` catches programming errors in tests.
    pub fn invalidate_file(&self, path: &Path) {
        debug_assert!(path.is_absolute(), "invalidate_file expects absolute path");
        self.cache.retain(|k, _| k.path != path);
    }
    /// Drop all cached entries.
    pub fn invalidate_all(&self) {
        let count = self.cache.len();
        self.cache.clear();
        debug!(count, "full cache invalidation");
    }
    /// Number of entries currently cached.
    pub fn len(&self) -> usize {
        self.cache.len()
    }
    /// True when no entries are cached.
    pub fn is_empty(&self) -> bool {
        self.cache.is_empty()
    }
    /// Total cache hits since construction.
    pub fn hit_count(&self) -> u64 {
        self.stats.hits()
    }
    /// Total cache misses since construction.
    pub fn miss_count(&self) -> u64 {
        self.stats.misses()
    }
}
/// Invoke `nickel export --format json` as a subprocess.
fn run_nickel_export(path: &Path, import_path: Option<&str>) -> Result<Value> {
let mut cmd = Command::new("nickel");
cmd.args(["export", "--format", "json"]).arg(path);
if let Some(ip) = import_path {
cmd.env("NICKEL_IMPORT_PATH", ip);
} else {
cmd.env_remove("NICKEL_IMPORT_PATH");
}
let output = cmd.output().map_err(|e| DaemonError::NclExport {
path: path.display().to_string(),
reason: format!("spawn failed: {e}"),
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(DaemonError::NclExport {
path: path.display().to_string(),
reason: stderr.trim().to_string(),
});
}
let json: Value =
serde_json::from_slice(&output.stdout).map_err(|e| DaemonError::NclExport {
path: path.display().to_string(),
reason: format!("JSON parse failed: {e}"),
})?;
Ok(json)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cache_key_equality() {
        let now = SystemTime::now();
        let a = CacheKey::new(Path::new("/a/b.ncl"), now, Some("path1"));
        let b = CacheKey::new(Path::new("/a/b.ncl"), now, Some("path1"));
        let c = CacheKey::new(Path::new("/a/b.ncl"), now, Some("path2"));
        // Same path + mtime + import path compare equal; import path differs -> unequal.
        assert_eq!(a, b);
        assert_ne!(a, c);
    }

    #[test]
    fn invalidation_by_prefix() {
        let cache = NclCache::new();
        let now = SystemTime::now();
        let paths = [
            "/project/.ontology/core.ncl",
            "/project/.ontology/state.ncl",
            "/project/adrs/adr-001.ncl",
        ];
        // Seed the cache directly instead of spawning real subprocesses.
        for p in paths {
            let entry = CachedExport {
                json: Value::Null,
                exported_at: Instant::now(),
            };
            cache
                .cache
                .insert(CacheKey::new(Path::new(p), now, None), entry);
        }
        assert_eq!(cache.len(), 3);
        cache.invalidate_prefix(Path::new("/project/.ontology"));
        assert_eq!(cache.len(), 1);
    }
}