#[cfg(feature = "nats")] use std::time::{SystemTime, UNIX_EPOCH}; #[cfg(feature = "nats")] use anyhow::Result; #[cfg(feature = "nats")] use bytes::Bytes; #[cfg(feature = "nats")] use platform_nats::{EventStream, NatsConnectionConfig, TopologyConfig}; #[cfg(feature = "nats")] use serde_json::{json, Value}; #[cfg(feature = "nats")] use tracing::{info, warn}; /// NATS JetStream publisher for daemon lifecycle events. /// /// Uses platform-nats `connect_client()` — connection + auth only. /// Stream/consumer topology comes entirely from `nats/streams.json` (or /// equivalent), referenced by `nats_events.streams_config` in /// `.ontoref/config.ncl`. /// /// Gracefully degrades if NATS is unavailable or disabled in config. #[cfg(feature = "nats")] pub struct NatsPublisher { stream: EventStream, project: String, port: u16, } #[cfg(feature = "nats")] impl NatsPublisher { /// Connect to NATS JetStream, apply topology from config, bind consumer. /// Reads `nats_events` section from `.ontoref/config.ncl`. /// Returns `Ok(None)` if disabled or unavailable (graceful degradation). pub async fn connect( config: Option<&serde_json::Value>, project: String, port: u16, ) -> Result> { let config = match config { Some(v) => v.clone(), None => return Ok(None), }; let nats_section = match config.get("nats_events") { Some(section) => section, None => return Ok(None), }; let enabled = nats_section .get("enabled") .and_then(|e| e.as_bool()) .unwrap_or(false); if !enabled { return Ok(None); } info!("connecting to NATS..."); let url = nats_section .get("url") .and_then(|u| u.as_str()) .unwrap_or("nats://localhost:4222") .to_string(); let nkey_seed = nats_section .get("nkey_seed") .and_then(|s| s.as_str()) .map(|s| s.to_string()); let require_signed = nats_section .get("require_signed_messages") .and_then(|r| r.as_bool()) .unwrap_or(false); let trusted_nkeys = nats_section .get("trusted_nkeys") .and_then(|t| t.as_array()) .map(|arr| { arr.iter() .filter_map(|v| v.as_str().map(String::from)) .collect() }) .unwrap_or_default(); let conn_cfg = NatsConnectionConfig { url: url.clone(), nkey_seed, require_signed_messages: require_signed, trusted_nkeys, }; let mut stream = match tokio::time::timeout( std::time::Duration::from_secs(3), EventStream::connect_client(&conn_cfg), ) .await { Ok(Ok(s)) => s, Ok(Err(e)) => { warn!(error = %e, url = %url, "NATS connection failed — running without events"); return Ok(None); } Err(_) => { warn!(url = %url, "NATS connection timed out — running without events"); return Ok(None); } }; info!(url = %url, "NATS connected"); // Apply topology from streams_config file declared in project config. // Empty string → None so TopologyConfig::load falls back to NATS_STREAMS_CONFIG // env var (set by ontoref-daemon-boot to // ~/.config/ontoref/streams.json). let topology_path = nats_section .get("streams_config") .and_then(|s| s.as_str()) .filter(|s| !s.is_empty()) .map(std::path::PathBuf::from); let topology = match TopologyConfig::load(topology_path.as_deref()) { Ok(Some(t)) => Some(t), Ok(None) => { warn!("no topology config found — publish-only mode (no consumer bound)"); None } Err(e) => { warn!(error = %e, "topology config load failed — publish-only mode"); None } }; if let Some(ref topo) = topology { match stream.apply_topology(topo).await { Ok(report) => info!( streams = report.streams_applied, consumers = report.consumers_applied, "topology applied" ), Err(e) => warn!(error = %e, "topology apply failed — publish-only mode"), } } // Bind to daemon consumer on the first stream (convention: "daemon-{project}"). if let Some(ref topo) = topology { if let Some(first_stream) = topo.streams.first() { let consumer_name = format!("daemon-{project}"); if let Err(e) = stream .bind_consumer(&first_stream.name, &consumer_name) .await { warn!(error = %e, "failed to bind daemon consumer — pull_events disabled"); } } } Ok(Some(Self { stream, project, port, })) } pub async fn publish_started(&self) -> Result<()> { let payload = json!({ "project": self.project, "port": self.port, "timestamp": iso8601_now(), }); self.stream .publish("ecosystem.daemon.started", Bytes::from(payload.to_string())) .await?; info!(port = self.port, "published daemon.started"); Ok(()) } pub async fn publish_stopped(&self, uptime_secs: u64) -> Result<()> { let payload = json!({ "project": self.project, "uptime_seconds": uptime_secs, "timestamp": iso8601_now(), }); self.stream .publish("ecosystem.daemon.stopped", Bytes::from(payload.to_string())) .await?; info!(uptime = uptime_secs, "published daemon.stopped"); Ok(()) } /// Publish a file change notification for a specific project and event /// type. pub async fn publish_notification( &self, project: &str, event: &crate::notifications::NotificationEvent, files: &[String], ) -> Result<()> { let subject = format!("ecosystem.{project}.{}", event.nats_suffix()); let payload = json!({ "project": project, "event": format!("{event:?}"), "files": files, "timestamp": iso8601_now(), }); self.stream .publish(&subject, Bytes::from(payload.to_string())) .await?; info!(subject = %subject, files = files.len(), "published notification"); Ok(()) } /// Publish an actor registration event. pub async fn publish_actor_registered( &self, token: &str, actor_type: &str, project: &str, ) -> Result<()> { let payload = json!({ "token": token, "actor_type": actor_type, "project": project, "timestamp": iso8601_now(), }); self.stream .publish( "ecosystem.actor.registered", Bytes::from(payload.to_string()), ) .await?; info!(token = %token, "published actor.registered"); Ok(()) } /// Publish an actor deregistration event. pub async fn publish_actor_deregistered(&self, token: &str, reason: &str) -> Result<()> { let payload = json!({ "token": token, "reason": reason, "timestamp": iso8601_now(), }); self.stream .publish( "ecosystem.actor.deregistered", Bytes::from(payload.to_string()), ) .await?; info!(token = %token, reason = %reason, "published actor.deregistered"); Ok(()) } /// Publish a file change event for a project (general, not requiring ack). pub async fn publish_file_changed(&self, project: &str, files: &[String]) -> Result<()> { let subject = format!("ecosystem.{project}.file.changed"); let payload = json!({ "project": project, "files": files, "timestamp": iso8601_now(), }); self.stream .publish(&subject, Bytes::from(payload.to_string())) .await?; Ok(()) } pub async fn publish_cache_invalidated(&self, reason: &str) -> Result<()> { let payload = json!({ "project": self.project, "reason": reason, "affected_keys": [], "timestamp": iso8601_now(), }); self.stream .publish( "ecosystem.daemon.cache.invalidated", Bytes::from(payload.to_string()), ) .await?; info!(reason = %reason, "published daemon.cache.invalidated"); Ok(()) } /// Pull pending messages from the bound JetStream consumer. /// Returns (subject, parsed_json) for each valid message. /// Returns empty vec if no consumer is bound (publish-only mode). pub async fn pull_events(&self, max_msgs: usize) -> Result> { let batch = match self.stream.pull_batch(max_msgs).await { Ok(b) => b, Err(e) => { let msg = e.to_string(); if msg.contains("no consumer bound") { return Ok(Vec::new()); } return Err(e); } }; let mut events = Vec::with_capacity(batch.len()); for (subject, payload_bytes, msg) in batch { match serde_json::from_slice::(&payload_bytes) { Ok(json) => events.push((subject, json)), Err(e) => { warn!(error = %e, subject = %subject, "invalid JSON in NATS message — skipping"); } } let _ = msg.ack().await; } Ok(events) } /// Extract (mode_id, params) from a reflection.request payload. pub fn parse_reflection_request(payload: &Value) -> Option<(String, Value)> { let mode_id = payload.get("mode_id")?.as_str()?.to_string(); let params = payload .get("params") .cloned() .unwrap_or(Value::Object(Default::default())); Some((mode_id, params)) } } /// ISO 8601 timestamp (UTC) without external dependency. #[cfg(feature = "nats")] fn iso8601_now() -> String { iso8601_from_duration( SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default(), ) } fn iso8601_from_duration(now: std::time::Duration) -> String { let total_secs = now.as_secs(); let micros = now.subsec_micros(); let mut remaining_days = (total_secs / 86400) as u32; let day_secs = (total_secs % 86400) as u32; let mut year = 1970u32; loop { let diy = days_in_year(year); if remaining_days < diy { break; } remaining_days -= diy; year += 1; } let month_lengths: [u32; 12] = if is_leap_year(year) { [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] } else { [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] }; let mut month = 1u32; for &ml in &month_lengths { if remaining_days < ml { break; } remaining_days -= ml; month += 1; } let day = remaining_days + 1; format!( "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z", year, month, day, day_secs / 3600, (day_secs % 3600) / 60, day_secs % 60, micros, ) } fn is_leap_year(y: u32) -> bool { (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400) } fn days_in_year(y: u32) -> u32 { if is_leap_year(y) { 366 } else { 365 } } #[cfg(test)] mod tests { use std::time::Duration; use super::*; #[test] fn epoch_zero_is_unix_origin() { assert_eq!( iso8601_from_duration(Duration::ZERO), "1970-01-01T00:00:00.000000Z" ); } #[test] fn known_date_non_leap() { // 2001-03-01T00:00:00Z — first day of March in a non-leap year // 2001-01-01 = 978307200; +59 days (Jan31 + Feb28) = 978307200 + 5097600 = // 983404800 let secs = 978_307_200 + 31 * 86400 + 28 * 86400; assert_eq!( iso8601_from_duration(Duration::from_secs(secs)), "2001-03-01T00:00:00.000000Z" ); } #[test] fn leap_year_feb29() { // 2000-02-29T00:00:00Z — only exists in a leap year // 2000-01-01 = 946684800; +59 days (Jan31 + Feb29 is day 60, so +58 days from // Jan1) let secs = 946_684_800 + 31 * 86400 + 28 * 86400; assert_eq!( iso8601_from_duration(Duration::from_secs(secs)), "2000-02-29T00:00:00.000000Z" ); } #[test] fn century_non_leap_skips_feb29() { // 1900 would be non-leap (div 100, not div 400); but our epoch starts at 1970 // Use 2100: it's div 100 but not div 400 → non-leap. // 2100-02-28 exists; 2100-03-01 follows immediately (no Feb 29). // Verify is_leap_year rejects 2100. assert!(!is_leap_year(2100)); assert!(is_leap_year(2000)); assert!(is_leap_year(2024)); assert!(!is_leap_year(2023)); } #[test] fn subsecond_micros_preserved() { let d = Duration::new(0, 123_456_000); // 123456 microseconds let s = iso8601_from_duration(d); assert!(s.ends_with(".123456Z"), "got: {s}"); } #[test] fn time_components_correct() { // 1970-01-01T01:02:03 let secs = 3600 + 2 * 60 + 3; assert_eq!( iso8601_from_duration(Duration::from_secs(secs)), "1970-01-01T01:02:03.000000Z" ); } #[test] fn iso8601_now_format() { let s = iso8601_now(); assert_eq!(s.len(), 27, "unexpected length: {s}"); assert!(s.ends_with('Z')); assert_eq!(&s[4..5], "-"); assert_eq!(&s[7..8], "-"); assert_eq!(&s[10..11], "T"); assert_eq!(&s[13..14], ":"); assert_eq!(&s[16..17], ":"); assert_eq!(&s[19..20], "."); } } // ── No-op implementation when nats feature is disabled ──────────────── #[cfg(not(feature = "nats"))] pub struct NatsPublisher; #[cfg(not(feature = "nats"))] impl NatsPublisher { pub async fn connect( _config: Option<&serde_json::Value>, _project: String, _port: u16, ) -> anyhow::Result> { Ok(None) } pub async fn publish_started(&self) -> anyhow::Result<()> { Ok(()) } pub async fn publish_stopped(&self, _uptime_secs: u64) -> anyhow::Result<()> { Ok(()) } pub async fn publish_notification( &self, _project: &str, _event: &crate::notifications::NotificationEvent, _files: &[String], ) -> anyhow::Result<()> { Ok(()) } pub async fn publish_actor_registered( &self, _token: &str, _actor_type: &str, _project: &str, ) -> anyhow::Result<()> { Ok(()) } pub async fn publish_actor_deregistered( &self, _token: &str, _reason: &str, ) -> anyhow::Result<()> { Ok(()) } pub async fn publish_file_changed( &self, _project: &str, _files: &[String], ) -> anyhow::Result<()> { Ok(()) } pub async fn publish_cache_invalidated(&self, _reason: &str) -> anyhow::Result<()> { Ok(()) } }