//! Integration test: ActionGraph + PipelineContext + StateTracker working together.
//!
//! Simulates a two-stage pipeline data plane (lint → build) without requiring
//! Vault, Nu executors, or external services.
use std::{path::PathBuf, sync::Arc};
use stratum_graph::{
types::{ActionNode, BackoffStrategy, Capability, NodeId, RetryPolicy},
ActionGraph,
};
use stratum_orchestrator::context::PipelineContext;
use stratum_state::{InMemoryStateTracker, PipelineStatus, StateTracker};
/// Convenience constructor: wrap a string slice in a [`Capability`].
fn cap(s: &str) -> Capability {
    Capability(s.to_string())
}
/// Build a minimal [`ActionNode`] for tests.
///
/// Handler and schema paths are derived from the node id / capability names;
/// retries are disabled (`max: 0`) and the timeout is fixed at 30 seconds.
fn node(id: &str, inputs: &[&str], outputs: &[&str], triggers: &[&str]) -> ActionNode {
    // Both schema maps are built the same way: capability → schemas/<name>.ncl
    let schema_map = |names: &[&str]| {
        names
            .iter()
            .map(|name| (cap(name), PathBuf::from(format!("schemas/{name}.ncl"))))
            .collect()
    };
    ActionNode {
        id: NodeId(id.to_string()),
        handler: PathBuf::from(format!("scripts/nu/{id}.nu")),
        input_schemas: schema_map(inputs),
        output_schemas: schema_map(outputs),
        compensate: None,
        retry: RetryPolicy {
            max: 0,
            backoff_secs: 1,
            strategy: BackoffStrategy::Fixed,
        },
        timeout_secs: 30,
        atomic: false,
        triggers: triggers.iter().map(|t| (*t).to_owned()).collect(),
    }
}
/// Create a fresh [`PipelineContext`] backed by an in-memory state tracker.
///
/// Returns the tracker alongside the context so tests can assert on
/// persisted state directly.
async fn make_ctx() -> (PipelineContext, Arc<InMemoryStateTracker>) {
    let tracker = Arc::new(InMemoryStateTracker::new());
    let tracker_dyn: Arc<dyn StateTracker> = Arc::clone(&tracker) as _;
    let ctx = PipelineContext::new(
        "dev.crate.foo.modified".to_string(),
        serde_json::json!({"crate": "foo"}),
        tracker_dyn,
        PathBuf::from("/tmp/no-schemas"),
    )
    .await
    .unwrap();
    (ctx, tracker)
}
#[tokio::test]
async fn test_two_stage_pipeline_data_plane() {
    let (ctx, state) = make_ctx().await;

    // Graph under test: lint (stage 0) → build (stage 1).
    let graph = ActionGraph::from_nodes(vec![
        node("lint", &[], &["linted"], &["dev.crate.>", "dev.>", "dev.crate.foo.modified"]),
        node("build", &["linted"], &["built"], &[]),
    ])
    .unwrap();

    // Planning for the trigger subject must yield the two ordered stages.
    let stages = graph
        .plan("dev.crate.foo.modified", &ctx.trigger_payload)
        .unwrap();
    assert_eq!(stages.len(), 2, "expected two stages");
    assert_eq!(stages[0], vec![NodeId("lint".into())]);
    assert_eq!(stages[1], vec![NodeId("build".into())]);

    // Stage 0: lint has no inputs; it deposits the "linted" capability.
    let lint_output = serde_json::json!({"warnings": 0, "errors": 0});
    ctx.deposit(&cap("linted"), lint_output.clone())
        .await
        .unwrap();

    // Stage 1: build consumes "linted" produced by stage 0, deposits "built".
    let stage1_inputs = ctx.extract_inputs(&[cap("linted")]).await.unwrap();
    assert_eq!(stage1_inputs[&cap("linted")], lint_output);
    let build_output = serde_json::json!({"artifact": "target/release/foo", "size_bytes": 4096});
    ctx.deposit(&cap("built"), build_output.clone())
        .await
        .unwrap();

    // Mark the run successful and verify the recorded run metadata.
    state
        .update_status(&ctx.run_id, PipelineStatus::Success)
        .await
        .unwrap();
    let run = state.get_run(&ctx.run_id).await.unwrap().unwrap();
    assert_eq!(run.status, PipelineStatus::Success);
    assert_eq!(run.trigger_subject, "dev.crate.foo.modified");

    // Both deposited capabilities must round-trip through the tracker.
    for (name, expected) in [("linted", lint_output), ("built", build_output)] {
        let loaded = state
            .load_capability(&ctx.run_id, &cap(name))
            .await
            .unwrap();
        assert_eq!(loaded, Some(expected));
    }
}
#[tokio::test]
async fn test_graph_plan_wildcard_subject_routing() {
    let graph = ActionGraph::from_nodes(vec![
        node("lint", &[], &["linted"], &["dev.crate.>"]),
        node("build", &["linted"], &["built"], &[]),
    ])
    .unwrap();

    // Every subject under `dev.crate.>` must route to a plan starting at lint.
    let matching = ["dev.crate.foo.modified", "dev.crate.bar.changed", "dev.crate.x"];
    for subject in matching {
        let stages = graph.plan(subject, &serde_json::Value::Null).unwrap();
        assert!(!stages.is_empty(), "no plan for subject '{subject}'");
        assert_eq!(stages[0][0], NodeId("lint".into()));
    }

    // A subject outside the trigger space must be rejected with an error.
    let err = graph
        .plan("prod.deploy.triggered", &serde_json::Value::Null)
        .unwrap_err();
    assert!(err.to_string().contains("no entry nodes match"));
}
#[tokio::test]
async fn test_multiple_contexts_share_state() {
    // Two independent pipelines on one StateTracker must keep data isolated.
    let state = Arc::new(InMemoryStateTracker::new());

    // Both contexts are built identically except for the trigger subject.
    let mk = |subject: &str| {
        PipelineContext::new(
            subject.to_string(),
            serde_json::Value::Null,
            Arc::clone(&state) as Arc<dyn StateTracker>,
            PathBuf::from("/tmp/no-schemas"),
        )
    };
    let ctx_a = mk("dev.crate.a.modified").await.unwrap();
    let ctx_b = mk("dev.crate.b.modified").await.unwrap();

    let linted_a = serde_json::json!({"crate": "a", "warnings": 0});
    let linted_b = serde_json::json!({"crate": "b", "warnings": 2});
    ctx_a.deposit(&cap("linted"), linted_a.clone()).await.unwrap();
    ctx_b.deposit(&cap("linted"), linted_b.clone()).await.unwrap();

    // Each context observes only its own deposit, never the other's.
    assert_eq!(ctx_a.extract(&cap("linted")).await.unwrap(), linted_a);
    assert_eq!(ctx_b.extract(&cap("linted")).await.unwrap(), linted_b);

    // The tracker itself keys capabilities by run id, so both survive.
    let val_a = state.load_capability(&ctx_a.run_id, &cap("linted")).await.unwrap();
    let val_b = state.load_capability(&ctx_b.run_id, &cap("linted")).await.unwrap();
    assert_eq!(val_a, Some(linted_a));
    assert_eq!(val_b, Some(linted_b));
}