434 lines
13 KiB
Rust
434 lines
13 KiB
Rust
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use std::path::PathBuf;
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use std::process::Command;
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|||
|
|
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use anyhow::{anyhow, Result};
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use bytes::Bytes;
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use platform_nats::{EventStream, NatsConnectionConfig, TopologyConfig};
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use serde_json::{json, Value};
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
use tracing::{info, warn};
|
|||
|
|
|
|||
|
|
/// NATS JetStream publisher for daemon lifecycle events.
|
|||
|
|
///
|
|||
|
|
/// Uses platform-nats `connect_client()` — connection + auth only.
|
|||
|
|
/// Stream/consumer topology comes entirely from `nats/streams.json` (or
|
|||
|
|
/// equivalent), referenced by `nats_events.streams_config` in
|
|||
|
|
/// `.ontoref/config.ncl`.
|
|||
|
|
///
|
|||
|
|
/// Gracefully degrades if NATS is unavailable or disabled in config.
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
pub struct NatsPublisher {
|
|||
|
|
stream: EventStream,
|
|||
|
|
project: String,
|
|||
|
|
port: u16,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
impl NatsPublisher {
|
|||
|
|
/// Connect to NATS JetStream, apply topology from config, bind consumer.
|
|||
|
|
/// Reads `nats_events` section from `.ontoref/config.ncl`.
|
|||
|
|
/// Returns `Ok(None)` if disabled or unavailable (graceful degradation).
|
|||
|
|
pub async fn connect(
|
|||
|
|
config_path: &PathBuf,
|
|||
|
|
project: String,
|
|||
|
|
port: u16,
|
|||
|
|
) -> Result<Option<Self>> {
|
|||
|
|
let config = load_nats_config(config_path)?;
|
|||
|
|
|
|||
|
|
let nats_section = match config.get("nats_events") {
|
|||
|
|
Some(section) => section,
|
|||
|
|
None => return Ok(None),
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
let enabled = nats_section
|
|||
|
|
.get("enabled")
|
|||
|
|
.and_then(|e| e.as_bool())
|
|||
|
|
.unwrap_or(false);
|
|||
|
|
|
|||
|
|
if !enabled {
|
|||
|
|
return Ok(None);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
let url = nats_section
|
|||
|
|
.get("url")
|
|||
|
|
.and_then(|u| u.as_str())
|
|||
|
|
.unwrap_or("nats://localhost:4222")
|
|||
|
|
.to_string();
|
|||
|
|
|
|||
|
|
let nkey_seed = nats_section
|
|||
|
|
.get("nkey_seed")
|
|||
|
|
.and_then(|s| s.as_str())
|
|||
|
|
.map(|s| s.to_string());
|
|||
|
|
|
|||
|
|
let require_signed = nats_section
|
|||
|
|
.get("require_signed_messages")
|
|||
|
|
.and_then(|r| r.as_bool())
|
|||
|
|
.unwrap_or(false);
|
|||
|
|
|
|||
|
|
let trusted_nkeys = nats_section
|
|||
|
|
.get("trusted_nkeys")
|
|||
|
|
.and_then(|t| t.as_array())
|
|||
|
|
.map(|arr| {
|
|||
|
|
arr.iter()
|
|||
|
|
.filter_map(|v| v.as_str().map(String::from))
|
|||
|
|
.collect()
|
|||
|
|
})
|
|||
|
|
.unwrap_or_default();
|
|||
|
|
|
|||
|
|
let conn_cfg = NatsConnectionConfig {
|
|||
|
|
url: url.clone(),
|
|||
|
|
nkey_seed,
|
|||
|
|
require_signed_messages: require_signed,
|
|||
|
|
trusted_nkeys,
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
let mut stream = match tokio::time::timeout(
|
|||
|
|
std::time::Duration::from_secs(3),
|
|||
|
|
EventStream::connect_client(&conn_cfg),
|
|||
|
|
)
|
|||
|
|
.await
|
|||
|
|
{
|
|||
|
|
Ok(Ok(s)) => s,
|
|||
|
|
Ok(Err(e)) => {
|
|||
|
|
warn!(error = %e, url = %url, "NATS connection failed — running without events");
|
|||
|
|
return Ok(None);
|
|||
|
|
}
|
|||
|
|
Err(_) => {
|
|||
|
|
warn!(url = %url, "NATS connection timed out — running without events");
|
|||
|
|
return Ok(None);
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
info!(url = %url, "NATS connected");
|
|||
|
|
|
|||
|
|
// Apply topology from streams_config file declared in project config.
|
|||
|
|
// Fallback: NATS_STREAMS_CONFIG env var.
|
|||
|
|
let topology_path = nats_section
|
|||
|
|
.get("streams_config")
|
|||
|
|
.and_then(|s| s.as_str())
|
|||
|
|
.map(std::path::PathBuf::from);
|
|||
|
|
|
|||
|
|
let topology = match TopologyConfig::load(topology_path.as_deref()) {
|
|||
|
|
Ok(Some(t)) => Some(t),
|
|||
|
|
Ok(None) => {
|
|||
|
|
warn!("no topology config found — publish-only mode (no consumer bound)");
|
|||
|
|
None
|
|||
|
|
}
|
|||
|
|
Err(e) => {
|
|||
|
|
warn!(error = %e, "topology config load failed — publish-only mode");
|
|||
|
|
None
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
if let Some(ref topo) = topology {
|
|||
|
|
match stream.apply_topology(topo).await {
|
|||
|
|
Ok(report) => info!(
|
|||
|
|
streams = report.streams_applied,
|
|||
|
|
consumers = report.consumers_applied,
|
|||
|
|
"topology applied"
|
|||
|
|
),
|
|||
|
|
Err(e) => warn!(error = %e, "topology apply failed — publish-only mode"),
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Bind to daemon consumer on the first stream (convention: "daemon-{project}").
|
|||
|
|
if let Some(ref topo) = topology {
|
|||
|
|
if let Some(first_stream) = topo.streams.first() {
|
|||
|
|
let consumer_name = format!("daemon-{project}");
|
|||
|
|
if let Err(e) = stream
|
|||
|
|
.bind_consumer(&first_stream.name, &consumer_name)
|
|||
|
|
.await
|
|||
|
|
{
|
|||
|
|
warn!(error = %e, "failed to bind daemon consumer — pull_events disabled");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
Ok(Some(Self {
|
|||
|
|
stream,
|
|||
|
|
project,
|
|||
|
|
port,
|
|||
|
|
}))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_started(&self) -> Result<()> {
|
|||
|
|
let payload = json!({
|
|||
|
|
"project": self.project,
|
|||
|
|
"port": self.port,
|
|||
|
|
"timestamp": iso8601_now(),
|
|||
|
|
});
|
|||
|
|
self.stream
|
|||
|
|
.publish("ecosystem.daemon.started", Bytes::from(payload.to_string()))
|
|||
|
|
.await?;
|
|||
|
|
info!(port = self.port, "published daemon.started");
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_stopped(&self, uptime_secs: u64) -> Result<()> {
|
|||
|
|
let payload = json!({
|
|||
|
|
"project": self.project,
|
|||
|
|
"uptime_seconds": uptime_secs,
|
|||
|
|
"timestamp": iso8601_now(),
|
|||
|
|
});
|
|||
|
|
self.stream
|
|||
|
|
.publish("ecosystem.daemon.stopped", Bytes::from(payload.to_string()))
|
|||
|
|
.await?;
|
|||
|
|
info!(uptime = uptime_secs, "published daemon.stopped");
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// Publish a file change notification for a specific project and event
|
|||
|
|
/// type.
|
|||
|
|
pub async fn publish_notification(
|
|||
|
|
&self,
|
|||
|
|
project: &str,
|
|||
|
|
event: &crate::notifications::NotificationEvent,
|
|||
|
|
files: &[String],
|
|||
|
|
) -> Result<()> {
|
|||
|
|
let subject = format!("ecosystem.{project}.{}", event.nats_suffix());
|
|||
|
|
let payload = json!({
|
|||
|
|
"project": project,
|
|||
|
|
"event": format!("{event:?}"),
|
|||
|
|
"files": files,
|
|||
|
|
"timestamp": iso8601_now(),
|
|||
|
|
});
|
|||
|
|
self.stream
|
|||
|
|
.publish(&subject, Bytes::from(payload.to_string()))
|
|||
|
|
.await?;
|
|||
|
|
info!(subject = %subject, files = files.len(), "published notification");
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// Publish an actor registration event.
|
|||
|
|
pub async fn publish_actor_registered(
|
|||
|
|
&self,
|
|||
|
|
token: &str,
|
|||
|
|
actor_type: &str,
|
|||
|
|
project: &str,
|
|||
|
|
) -> Result<()> {
|
|||
|
|
let payload = json!({
|
|||
|
|
"token": token,
|
|||
|
|
"actor_type": actor_type,
|
|||
|
|
"project": project,
|
|||
|
|
"timestamp": iso8601_now(),
|
|||
|
|
});
|
|||
|
|
self.stream
|
|||
|
|
.publish(
|
|||
|
|
"ecosystem.actor.registered",
|
|||
|
|
Bytes::from(payload.to_string()),
|
|||
|
|
)
|
|||
|
|
.await?;
|
|||
|
|
info!(token = %token, "published actor.registered");
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// Publish an actor deregistration event.
|
|||
|
|
pub async fn publish_actor_deregistered(&self, token: &str, reason: &str) -> Result<()> {
|
|||
|
|
let payload = json!({
|
|||
|
|
"token": token,
|
|||
|
|
"reason": reason,
|
|||
|
|
"timestamp": iso8601_now(),
|
|||
|
|
});
|
|||
|
|
self.stream
|
|||
|
|
.publish(
|
|||
|
|
"ecosystem.actor.deregistered",
|
|||
|
|
Bytes::from(payload.to_string()),
|
|||
|
|
)
|
|||
|
|
.await?;
|
|||
|
|
info!(token = %token, reason = %reason, "published actor.deregistered");
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// Publish a file change event for a project (general, not requiring ack).
|
|||
|
|
pub async fn publish_file_changed(&self, project: &str, files: &[String]) -> Result<()> {
|
|||
|
|
let subject = format!("ecosystem.{project}.file.changed");
|
|||
|
|
let payload = json!({
|
|||
|
|
"project": project,
|
|||
|
|
"files": files,
|
|||
|
|
"timestamp": iso8601_now(),
|
|||
|
|
});
|
|||
|
|
self.stream
|
|||
|
|
.publish(&subject, Bytes::from(payload.to_string()))
|
|||
|
|
.await?;
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_cache_invalidated(&self, reason: &str) -> Result<()> {
|
|||
|
|
let payload = json!({
|
|||
|
|
"project": self.project,
|
|||
|
|
"reason": reason,
|
|||
|
|
"affected_keys": [],
|
|||
|
|
"timestamp": iso8601_now(),
|
|||
|
|
});
|
|||
|
|
self.stream
|
|||
|
|
.publish(
|
|||
|
|
"ecosystem.daemon.cache.invalidated",
|
|||
|
|
Bytes::from(payload.to_string()),
|
|||
|
|
)
|
|||
|
|
.await?;
|
|||
|
|
info!(reason = %reason, "published daemon.cache.invalidated");
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// Pull pending messages from the bound JetStream consumer.
|
|||
|
|
/// Returns (subject, parsed_json) for each valid message.
|
|||
|
|
/// Returns empty vec if no consumer is bound (publish-only mode).
|
|||
|
|
pub async fn pull_events(&self, max_msgs: usize) -> Result<Vec<(String, Value)>> {
|
|||
|
|
let batch = match self.stream.pull_batch(max_msgs).await {
|
|||
|
|
Ok(b) => b,
|
|||
|
|
Err(e) => {
|
|||
|
|
let msg = e.to_string();
|
|||
|
|
if msg.contains("no consumer bound") {
|
|||
|
|
return Ok(Vec::new());
|
|||
|
|
}
|
|||
|
|
return Err(e);
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
let mut events = Vec::with_capacity(batch.len());
|
|||
|
|
|
|||
|
|
for (subject, payload_bytes, msg) in batch {
|
|||
|
|
match serde_json::from_slice::<Value>(&payload_bytes) {
|
|||
|
|
Ok(json) => events.push((subject, json)),
|
|||
|
|
Err(e) => {
|
|||
|
|
warn!(error = %e, subject = %subject, "invalid JSON in NATS message — skipping");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
let _ = msg.ack().await;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
Ok(events)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// Extract (mode_id, params) from a reflection.request payload.
|
|||
|
|
pub fn parse_reflection_request(payload: &Value) -> Option<(String, Value)> {
|
|||
|
|
let mode_id = payload.get("mode_id")?.as_str()?.to_string();
|
|||
|
|
let params = payload
|
|||
|
|
.get("params")
|
|||
|
|
.cloned()
|
|||
|
|
.unwrap_or(Value::Object(Default::default()));
|
|||
|
|
Some((mode_id, params))
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// Load project NATS config from .ontoref/config.ncl via nickel export.
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
fn load_nats_config(config_path: &PathBuf) -> Result<Value> {
|
|||
|
|
if !config_path.exists() {
|
|||
|
|
return Ok(json!({}));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
let output = Command::new("nickel")
|
|||
|
|
.arg("export")
|
|||
|
|
.arg(config_path)
|
|||
|
|
.output()
|
|||
|
|
.map_err(|e| anyhow!("running nickel export: {e}"))?;
|
|||
|
|
|
|||
|
|
if !output.status.success() {
|
|||
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|||
|
|
return Err(anyhow!("nickel export failed: {stderr}"));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
serde_json::from_slice(&output.stdout).map_err(|e| anyhow!("parsing nickel export output: {e}"))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// ISO 8601 timestamp (UTC) without external dependency.
|
|||
|
|
#[cfg(feature = "nats")]
|
|||
|
|
fn iso8601_now() -> String {
|
|||
|
|
let now = SystemTime::now()
|
|||
|
|
.duration_since(UNIX_EPOCH)
|
|||
|
|
.unwrap_or_default();
|
|||
|
|
|
|||
|
|
let secs = now.as_secs();
|
|||
|
|
let micros = now.subsec_micros();
|
|||
|
|
|
|||
|
|
let days = secs / 86400;
|
|||
|
|
let day_secs = secs % 86400;
|
|||
|
|
|
|||
|
|
// Gregorian approximation (valid for 1970–2099)
|
|||
|
|
let year = 1970 + (days / 365);
|
|||
|
|
let month = ((days % 365) / 30) + 1;
|
|||
|
|
let day = ((days % 365) % 30) + 1;
|
|||
|
|
|
|||
|
|
format!(
|
|||
|
|
"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z",
|
|||
|
|
year,
|
|||
|
|
month,
|
|||
|
|
day,
|
|||
|
|
day_secs / 3600,
|
|||
|
|
(day_secs % 3600) / 60,
|
|||
|
|
day_secs % 60,
|
|||
|
|
micros,
|
|||
|
|
)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ── No-op implementation when nats feature is disabled ────────────────
|
|||
|
|
|
|||
|
|
#[cfg(not(feature = "nats"))]
|
|||
|
|
pub struct NatsPublisher;
|
|||
|
|
|
|||
|
|
#[cfg(not(feature = "nats"))]
|
|||
|
|
impl NatsPublisher {
|
|||
|
|
pub async fn connect(
|
|||
|
|
_config_path: &std::path::PathBuf,
|
|||
|
|
_project: String,
|
|||
|
|
_port: u16,
|
|||
|
|
) -> anyhow::Result<Option<Self>> {
|
|||
|
|
Ok(None)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_started(&self) -> anyhow::Result<()> {
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_stopped(&self, _uptime_secs: u64) -> anyhow::Result<()> {
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_notification(
|
|||
|
|
&self,
|
|||
|
|
_project: &str,
|
|||
|
|
_event: &crate::notifications::NotificationEvent,
|
|||
|
|
_files: &[String],
|
|||
|
|
) -> anyhow::Result<()> {
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_actor_registered(
|
|||
|
|
&self,
|
|||
|
|
_token: &str,
|
|||
|
|
_actor_type: &str,
|
|||
|
|
_project: &str,
|
|||
|
|
) -> anyhow::Result<()> {
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_actor_deregistered(
|
|||
|
|
&self,
|
|||
|
|
_token: &str,
|
|||
|
|
_reason: &str,
|
|||
|
|
) -> anyhow::Result<()> {
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_file_changed(
|
|||
|
|
&self,
|
|||
|
|
_project: &str,
|
|||
|
|
_files: &[String],
|
|||
|
|
) -> anyhow::Result<()> {
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pub async fn publish_cache_invalidated(&self, _reason: &str) -> anyhow::Result<()> {
|
|||
|
|
Ok(())
|
|||
|
|
}
|
|||
|
|
}
|