/// Integration test: ActionGraph + PipelineContext + StateTracker working together. /// /// Simulates a two-stage pipeline data plane (lint → build) without requiring /// Vault, Nu executors, or external services. use std::{path::PathBuf, sync::Arc}; use stratum_graph::{ types::{ActionNode, BackoffStrategy, Capability, NodeId, RetryPolicy}, ActionGraph, }; use stratum_orchestrator::context::PipelineContext; use stratum_state::{InMemoryStateTracker, PipelineStatus, StateTracker}; fn cap(s: &str) -> Capability { Capability(s.to_owned()) } fn node(id: &str, inputs: &[&str], outputs: &[&str], triggers: &[&str]) -> ActionNode { ActionNode { id: NodeId(id.to_owned()), handler: PathBuf::from(format!("scripts/nu/{id}.nu")), input_schemas: inputs .iter() .map(|c| (cap(c), PathBuf::from(format!("schemas/{c}.ncl")))) .collect(), output_schemas: outputs .iter() .map(|c| (cap(c), PathBuf::from(format!("schemas/{c}.ncl")))) .collect(), compensate: None, retry: RetryPolicy { max: 0, backoff_secs: 1, strategy: BackoffStrategy::Fixed, }, timeout_secs: 30, atomic: false, triggers: triggers.iter().map(|t| t.to_string()).collect(), } } async fn make_ctx() -> (PipelineContext, Arc) { let state = Arc::new(InMemoryStateTracker::new()); let ctx = PipelineContext::new( "dev.crate.foo.modified".to_string(), serde_json::json!({"crate": "foo"}), Arc::clone(&state) as Arc, PathBuf::from("/tmp/no-schemas"), ) .await .unwrap(); (ctx, state) } #[tokio::test] async fn test_two_stage_pipeline_data_plane() { let (ctx, state) = make_ctx().await; // Build the graph: lint (stage 0) → build (stage 1) let nodes = vec![ node("lint", &[], &["linted"], &["dev.crate.>", "dev.>", "dev.crate.foo.modified"]), node("build", &["linted"], &["built"], &[]), ]; let graph = ActionGraph::from_nodes(nodes).unwrap(); // Plan the graph for our trigger subject let stages = graph .plan("dev.crate.foo.modified", &ctx.trigger_payload) .unwrap(); assert_eq!(stages.len(), 2, "expected two stages"); assert_eq!(stages[0], vec![NodeId("lint".into())]); assert_eq!(stages[1], vec![NodeId("build".into())]); // ── Stage 0: lint ────────────────────────────────────────────────────── // lint has no inputs; deposits "linted" capability let linted_value = serde_json::json!({"warnings": 0, "errors": 0}); ctx.deposit(&cap("linted"), linted_value.clone()) .await .unwrap(); // ── Stage 1: build ───────────────────────────────────────────────────── // build reads "linted" produced by stage 0 let inputs = ctx.extract_inputs(&[cap("linted")]).await.unwrap(); assert_eq!(inputs[&cap("linted")], linted_value); let built_value = serde_json::json!({"artifact": "target/release/foo", "size_bytes": 4096}); ctx.deposit(&cap("built"), built_value.clone()).await.unwrap(); // ── Final state assertions ───────────────────────────────────────────── state .update_status(&ctx.run_id, PipelineStatus::Success) .await .unwrap(); let run = state.get_run(&ctx.run_id).await.unwrap().unwrap(); assert_eq!(run.status, PipelineStatus::Success); assert_eq!(run.trigger_subject, "dev.crate.foo.modified"); // Both capabilities must be retrievable let loaded_linted = state .load_capability(&ctx.run_id, &cap("linted")) .await .unwrap(); let loaded_built = state .load_capability(&ctx.run_id, &cap("built")) .await .unwrap(); assert_eq!(loaded_linted, Some(linted_value)); assert_eq!(loaded_built, Some(built_value)); } #[tokio::test] async fn test_graph_plan_wildcard_subject_routing() { let nodes = vec![ node("lint", &[], &["linted"], &["dev.crate.>"]), node("build", &["linted"], &["built"], &[]), ]; let graph = ActionGraph::from_nodes(nodes).unwrap(); // Any subject matching `dev.crate.>` should produce a plan for subject in ["dev.crate.foo.modified", "dev.crate.bar.changed", "dev.crate.x"] { let stages = graph.plan(subject, &serde_json::Value::Null).unwrap(); assert!(!stages.is_empty(), "no plan for subject '{subject}'"); assert_eq!(stages[0][0], NodeId("lint".into())); } // Subject not matching should return an error let err = graph .plan("prod.deploy.triggered", &serde_json::Value::Null) .unwrap_err(); assert!(err.to_string().contains("no entry nodes match")); } #[tokio::test] async fn test_multiple_contexts_share_state() { // Two independent pipelines on the same StateTracker must not interfere. let state = Arc::new(InMemoryStateTracker::new()); let ctx_a = PipelineContext::new( "dev.crate.a.modified".to_string(), serde_json::Value::Null, Arc::clone(&state) as Arc, PathBuf::from("/tmp/no-schemas"), ) .await .unwrap(); let ctx_b = PipelineContext::new( "dev.crate.b.modified".to_string(), serde_json::Value::Null, Arc::clone(&state) as Arc, PathBuf::from("/tmp/no-schemas"), ) .await .unwrap(); let linted_a = serde_json::json!({"crate": "a", "warnings": 0}); let linted_b = serde_json::json!({"crate": "b", "warnings": 2}); ctx_a.deposit(&cap("linted"), linted_a.clone()).await.unwrap(); ctx_b.deposit(&cap("linted"), linted_b.clone()).await.unwrap(); // Each context sees its own value, not the other's assert_eq!(ctx_a.extract(&cap("linted")).await.unwrap(), linted_a); assert_eq!(ctx_b.extract(&cap("linted")).await.unwrap(), linted_b); // Verify via StateTracker too let val_a = state.load_capability(&ctx_a.run_id, &cap("linted")).await.unwrap(); let val_b = state.load_capability(&ctx_b.run_id, &cap("linted")).await.unwrap(); assert_eq!(val_a, Some(linted_a)); assert_eq!(val_b, Some(linted_b)); }