434 lines
13 KiB
Rust
Raw Normal View History

2026-03-13 00:18:14 +00:00
#[cfg(feature = "nats")]
use std::path::PathBuf;
#[cfg(feature = "nats")]
use std::process::Command;
#[cfg(feature = "nats")]
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(feature = "nats")]
use anyhow::{anyhow, Result};
#[cfg(feature = "nats")]
use bytes::Bytes;
#[cfg(feature = "nats")]
use platform_nats::{EventStream, NatsConnectionConfig, TopologyConfig};
#[cfg(feature = "nats")]
use serde_json::{json, Value};
#[cfg(feature = "nats")]
use tracing::{info, warn};
/// NATS JetStream publisher for daemon lifecycle events.
///
/// Uses platform-nats `connect_client()` — connection + auth only.
/// Stream/consumer topology comes entirely from `nats/streams.json` (or
/// equivalent), referenced by `nats_events.streams_config` in
/// `.ontoref/config.ncl`.
///
/// Gracefully degrades if NATS is unavailable or disabled in config.
#[cfg(feature = "nats")]
pub struct NatsPublisher {
stream: EventStream,
project: String,
port: u16,
}
#[cfg(feature = "nats")]
impl NatsPublisher {
/// Connect to NATS JetStream, apply topology from config, bind consumer.
/// Reads `nats_events` section from `.ontoref/config.ncl`.
/// Returns `Ok(None)` if disabled or unavailable (graceful degradation).
pub async fn connect(
config_path: &PathBuf,
project: String,
port: u16,
) -> Result<Option<Self>> {
let config = load_nats_config(config_path)?;
let nats_section = match config.get("nats_events") {
Some(section) => section,
None => return Ok(None),
};
let enabled = nats_section
.get("enabled")
.and_then(|e| e.as_bool())
.unwrap_or(false);
if !enabled {
return Ok(None);
}
let url = nats_section
.get("url")
.and_then(|u| u.as_str())
.unwrap_or("nats://localhost:4222")
.to_string();
let nkey_seed = nats_section
.get("nkey_seed")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
let require_signed = nats_section
.get("require_signed_messages")
.and_then(|r| r.as_bool())
.unwrap_or(false);
let trusted_nkeys = nats_section
.get("trusted_nkeys")
.and_then(|t| t.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let conn_cfg = NatsConnectionConfig {
url: url.clone(),
nkey_seed,
require_signed_messages: require_signed,
trusted_nkeys,
};
let mut stream = match tokio::time::timeout(
std::time::Duration::from_secs(3),
EventStream::connect_client(&conn_cfg),
)
.await
{
Ok(Ok(s)) => s,
Ok(Err(e)) => {
warn!(error = %e, url = %url, "NATS connection failed — running without events");
return Ok(None);
}
Err(_) => {
warn!(url = %url, "NATS connection timed out — running without events");
return Ok(None);
}
};
info!(url = %url, "NATS connected");
// Apply topology from streams_config file declared in project config.
// Fallback: NATS_STREAMS_CONFIG env var.
let topology_path = nats_section
.get("streams_config")
.and_then(|s| s.as_str())
.map(std::path::PathBuf::from);
let topology = match TopologyConfig::load(topology_path.as_deref()) {
Ok(Some(t)) => Some(t),
Ok(None) => {
warn!("no topology config found — publish-only mode (no consumer bound)");
None
}
Err(e) => {
warn!(error = %e, "topology config load failed — publish-only mode");
None
}
};
if let Some(ref topo) = topology {
match stream.apply_topology(topo).await {
Ok(report) => info!(
streams = report.streams_applied,
consumers = report.consumers_applied,
"topology applied"
),
Err(e) => warn!(error = %e, "topology apply failed — publish-only mode"),
}
}
// Bind to daemon consumer on the first stream (convention: "daemon-{project}").
if let Some(ref topo) = topology {
if let Some(first_stream) = topo.streams.first() {
let consumer_name = format!("daemon-{project}");
if let Err(e) = stream
.bind_consumer(&first_stream.name, &consumer_name)
.await
{
warn!(error = %e, "failed to bind daemon consumer — pull_events disabled");
}
}
}
Ok(Some(Self {
stream,
project,
port,
}))
}
pub async fn publish_started(&self) -> Result<()> {
let payload = json!({
"project": self.project,
"port": self.port,
"timestamp": iso8601_now(),
});
self.stream
.publish("ecosystem.daemon.started", Bytes::from(payload.to_string()))
.await?;
info!(port = self.port, "published daemon.started");
Ok(())
}
pub async fn publish_stopped(&self, uptime_secs: u64) -> Result<()> {
let payload = json!({
"project": self.project,
"uptime_seconds": uptime_secs,
"timestamp": iso8601_now(),
});
self.stream
.publish("ecosystem.daemon.stopped", Bytes::from(payload.to_string()))
.await?;
info!(uptime = uptime_secs, "published daemon.stopped");
Ok(())
}
/// Publish a file change notification for a specific project and event
/// type.
pub async fn publish_notification(
&self,
project: &str,
event: &crate::notifications::NotificationEvent,
files: &[String],
) -> Result<()> {
let subject = format!("ecosystem.{project}.{}", event.nats_suffix());
let payload = json!({
"project": project,
"event": format!("{event:?}"),
"files": files,
"timestamp": iso8601_now(),
});
self.stream
.publish(&subject, Bytes::from(payload.to_string()))
.await?;
info!(subject = %subject, files = files.len(), "published notification");
Ok(())
}
/// Publish an actor registration event.
pub async fn publish_actor_registered(
&self,
token: &str,
actor_type: &str,
project: &str,
) -> Result<()> {
let payload = json!({
"token": token,
"actor_type": actor_type,
"project": project,
"timestamp": iso8601_now(),
});
self.stream
.publish(
"ecosystem.actor.registered",
Bytes::from(payload.to_string()),
)
.await?;
info!(token = %token, "published actor.registered");
Ok(())
}
/// Publish an actor deregistration event.
pub async fn publish_actor_deregistered(&self, token: &str, reason: &str) -> Result<()> {
let payload = json!({
"token": token,
"reason": reason,
"timestamp": iso8601_now(),
});
self.stream
.publish(
"ecosystem.actor.deregistered",
Bytes::from(payload.to_string()),
)
.await?;
info!(token = %token, reason = %reason, "published actor.deregistered");
Ok(())
}
/// Publish a file change event for a project (general, not requiring ack).
pub async fn publish_file_changed(&self, project: &str, files: &[String]) -> Result<()> {
let subject = format!("ecosystem.{project}.file.changed");
let payload = json!({
"project": project,
"files": files,
"timestamp": iso8601_now(),
});
self.stream
.publish(&subject, Bytes::from(payload.to_string()))
.await?;
Ok(())
}
pub async fn publish_cache_invalidated(&self, reason: &str) -> Result<()> {
let payload = json!({
"project": self.project,
"reason": reason,
"affected_keys": [],
"timestamp": iso8601_now(),
});
self.stream
.publish(
"ecosystem.daemon.cache.invalidated",
Bytes::from(payload.to_string()),
)
.await?;
info!(reason = %reason, "published daemon.cache.invalidated");
Ok(())
}
/// Pull pending messages from the bound JetStream consumer.
/// Returns (subject, parsed_json) for each valid message.
/// Returns empty vec if no consumer is bound (publish-only mode).
pub async fn pull_events(&self, max_msgs: usize) -> Result<Vec<(String, Value)>> {
let batch = match self.stream.pull_batch(max_msgs).await {
Ok(b) => b,
Err(e) => {
let msg = e.to_string();
if msg.contains("no consumer bound") {
return Ok(Vec::new());
}
return Err(e);
}
};
let mut events = Vec::with_capacity(batch.len());
for (subject, payload_bytes, msg) in batch {
match serde_json::from_slice::<Value>(&payload_bytes) {
Ok(json) => events.push((subject, json)),
Err(e) => {
warn!(error = %e, subject = %subject, "invalid JSON in NATS message — skipping");
}
}
let _ = msg.ack().await;
}
Ok(events)
}
/// Extract (mode_id, params) from a reflection.request payload.
pub fn parse_reflection_request(payload: &Value) -> Option<(String, Value)> {
let mode_id = payload.get("mode_id")?.as_str()?.to_string();
let params = payload
.get("params")
.cloned()
.unwrap_or(Value::Object(Default::default()));
Some((mode_id, params))
}
}
/// Load project NATS config from .ontoref/config.ncl via nickel export.
#[cfg(feature = "nats")]
fn load_nats_config(config_path: &PathBuf) -> Result<Value> {
if !config_path.exists() {
return Ok(json!({}));
}
let output = Command::new("nickel")
.arg("export")
.arg(config_path)
.output()
.map_err(|e| anyhow!("running nickel export: {e}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(anyhow!("nickel export failed: {stderr}"));
}
serde_json::from_slice(&output.stdout).map_err(|e| anyhow!("parsing nickel export output: {e}"))
}
/// ISO 8601 timestamp (UTC) without external dependency.
#[cfg(feature = "nats")]
fn iso8601_now() -> String {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
let secs = now.as_secs();
let micros = now.subsec_micros();
let days = secs / 86400;
let day_secs = secs % 86400;
// Gregorian approximation (valid for 19702099)
let year = 1970 + (days / 365);
let month = ((days % 365) / 30) + 1;
let day = ((days % 365) % 30) + 1;
format!(
"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z",
year,
month,
day,
day_secs / 3600,
(day_secs % 3600) / 60,
day_secs % 60,
micros,
)
}
// ── No-op implementation when nats feature is disabled ────────────────
#[cfg(not(feature = "nats"))]
pub struct NatsPublisher;
#[cfg(not(feature = "nats"))]
impl NatsPublisher {
pub async fn connect(
_config_path: &std::path::PathBuf,
_project: String,
_port: u16,
) -> anyhow::Result<Option<Self>> {
Ok(None)
}
pub async fn publish_started(&self) -> anyhow::Result<()> {
Ok(())
}
pub async fn publish_stopped(&self, _uptime_secs: u64) -> anyhow::Result<()> {
Ok(())
}
pub async fn publish_notification(
&self,
_project: &str,
_event: &crate::notifications::NotificationEvent,
_files: &[String],
) -> anyhow::Result<()> {
Ok(())
}
pub async fn publish_actor_registered(
&self,
_token: &str,
_actor_type: &str,
_project: &str,
) -> anyhow::Result<()> {
Ok(())
}
pub async fn publish_actor_deregistered(
&self,
_token: &str,
_reason: &str,
) -> anyhow::Result<()> {
Ok(())
}
pub async fn publish_file_changed(
&self,
_project: &str,
_files: &[String],
) -> anyhow::Result<()> {
Ok(())
}
pub async fn publish_cache_invalidated(&self, _reason: &str) -> anyhow::Result<()> {
Ok(())
}
}