use std::{path::PathBuf, sync::Arc, time::Duration}; use clap::Parser; use futures::StreamExt; use ops_keeper::{ audit::emit_audit, config::KeeperConfig, nats_client::KeeperNats, pending::{extract_op_type, log_decision, parse_pending}, policy::{load_policy, Decision, PolicyMatcher}, signer::Signer, }; use tracing::{error, info, warn}; #[derive(Parser)] #[command(name = "keeper-daemon", about = "Auto-signing keeper daemon (ADR-037)")] struct Cli { #[arg(short, long, default_value = "keeper-daemon.toml")] config: PathBuf, } #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .init(); let cli = Cli::parse(); let cfg = KeeperConfig::from_toml(&cli.config)?; // Load signing key let private_pem = std::fs::read(&cfg.private_key_path)?; let public_pem = std::fs::read(&cfg.public_key_path)?; let signer = Arc::new(Signer::from_pem_files( &private_pem, &public_pem, cfg.issuer_id.clone(), cfg.workspace.clone(), cfg.token_validity_secs, )?); // Load and compile policy let policy = load_policy(&cfg.policy_path)?; info!(version = policy.version, "policy loaded"); let matcher = Arc::new(PolicyMatcher::from_policy(&policy)?); // Connect to NATS let nats = Arc::new(KeeperNats::connect(&cfg).await?); info!("keeper-daemon ready; subscribing to ops.pending.{}.>", cfg.workspace); let consumer_name = format!("{}-keeper-daemon", cfg.workspace); let mut messages = nats.pending_consumer(&consumer_name).await?; while let Some(msg_result) = messages.next().await { let msg = match msg_result { Ok(m) => m, Err(e) => { error!("NATS message error: {e}"); continue; } }; let subject = msg.subject.to_string(); let op_type = match extract_op_type(&subject) { Some(t) => t.to_string(), None => { warn!(subject, "cannot extract op_type from subject; acking and skipping"); let _ = msg.ack().await; continue; } }; let op = match parse_pending(&msg.payload) { Ok(op) => op, Err(e) => { error!(subject, error = %e, "failed to parse pending op; acking and skipping"); let _ = msg.ack().await; continue; } }; let decision = matcher.decide(&op); log_decision(&op, decision.as_str()); match decision { Decision::AutoSign => { let jwt = match signer.sign_op(&op) { Ok(jwt) => jwt, Err(e) => { error!(error = %e, "JWT signing failed; leaving in pending queue"); // Do NOT ack — leave for retry continue; } }; if let Err(e) = nats.publish_signed(&op_type, &jwt).await { error!(error = %e, "failed to publish signed command; leaving in pending queue"); continue; } if let Err(e) = emit_audit(&nats, &op, &uuid::Uuid::new_v4().to_string(), signer.issuer_id(), decision.as_str()).await { warn!(error = %e, "audit emit failed (non-fatal; signed command already published)"); } if let Err(e) = msg.ack().await { warn!(error = %e, "failed to ack NATS message after signing (idempotent on retry)"); } } Decision::RequireManual => { if let Err(e) = emit_audit(&nats, &op, "hold", signer.issuer_id(), decision.as_str()).await { warn!(error = %e, "audit emit failed for require_manual decision"); } // Re-NAK with backoff so JetStream re-delivers after a delay let _ = msg.ack_with(async_nats::jetstream::AckKind::Nak(Some(Duration::from_secs(30)))).await; } Decision::HoldPending => { // Do not ack — message stays in WorkQueue until manually signed via keeper-cli let _ = msg.ack_with(async_nats::jetstream::AckKind::Nak(Some(Duration::from_secs(30)))).await; } } } warn!("NATS message stream ended; keeper-daemon exiting"); Ok(()) }