173 lines
6.4 KiB
Rust
173 lines
6.4 KiB
Rust
|
|
//! Integration test: ActionGraph + PipelineContext + StateTracker working together.
//!
//! Simulates a two-stage pipeline data plane (lint → build) without requiring
//! Vault, Nu executors, or external services.
|
||
|
|
use std::{path::PathBuf, sync::Arc};
|
||
|
|
|
||
|
|
use stratum_graph::{
|
||
|
|
types::{ActionNode, BackoffStrategy, Capability, NodeId, RetryPolicy},
|
||
|
|
ActionGraph,
|
||
|
|
};
|
||
|
|
use stratum_orchestrator::context::PipelineContext;
|
||
|
|
use stratum_state::{InMemoryStateTracker, PipelineStatus, StateTracker};
|
||
|
|
|
||
|
|
fn cap(s: &str) -> Capability {
|
||
|
|
Capability(s.to_owned())
|
||
|
|
}
|
||
|
|
|
||
|
|
fn node(id: &str, inputs: &[&str], outputs: &[&str], triggers: &[&str]) -> ActionNode {
|
||
|
|
ActionNode {
|
||
|
|
id: NodeId(id.to_owned()),
|
||
|
|
handler: PathBuf::from(format!("scripts/nu/{id}.nu")),
|
||
|
|
input_schemas: inputs
|
||
|
|
.iter()
|
||
|
|
.map(|c| (cap(c), PathBuf::from(format!("schemas/{c}.ncl"))))
|
||
|
|
.collect(),
|
||
|
|
output_schemas: outputs
|
||
|
|
.iter()
|
||
|
|
.map(|c| (cap(c), PathBuf::from(format!("schemas/{c}.ncl"))))
|
||
|
|
.collect(),
|
||
|
|
compensate: None,
|
||
|
|
retry: RetryPolicy {
|
||
|
|
max: 0,
|
||
|
|
backoff_secs: 1,
|
||
|
|
strategy: BackoffStrategy::Fixed,
|
||
|
|
},
|
||
|
|
timeout_secs: 30,
|
||
|
|
atomic: false,
|
||
|
|
triggers: triggers.iter().map(|t| t.to_string()).collect(),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
async fn make_ctx() -> (PipelineContext, Arc<InMemoryStateTracker>) {
|
||
|
|
let state = Arc::new(InMemoryStateTracker::new());
|
||
|
|
let ctx = PipelineContext::new(
|
||
|
|
"dev.crate.foo.modified".to_string(),
|
||
|
|
serde_json::json!({"crate": "foo"}),
|
||
|
|
Arc::clone(&state) as Arc<dyn StateTracker>,
|
||
|
|
PathBuf::from("/tmp/no-schemas"),
|
||
|
|
)
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
(ctx, state)
|
||
|
|
}
|
||
|
|
|
||
|
|
#[tokio::test]
|
||
|
|
async fn test_two_stage_pipeline_data_plane() {
|
||
|
|
let (ctx, state) = make_ctx().await;
|
||
|
|
|
||
|
|
// Build the graph: lint (stage 0) → build (stage 1)
|
||
|
|
let nodes = vec![
|
||
|
|
node("lint", &[], &["linted"], &["dev.crate.>", "dev.>", "dev.crate.foo.modified"]),
|
||
|
|
node("build", &["linted"], &["built"], &[]),
|
||
|
|
];
|
||
|
|
let graph = ActionGraph::from_nodes(nodes).unwrap();
|
||
|
|
|
||
|
|
// Plan the graph for our trigger subject
|
||
|
|
let stages = graph
|
||
|
|
.plan("dev.crate.foo.modified", &ctx.trigger_payload)
|
||
|
|
.unwrap();
|
||
|
|
assert_eq!(stages.len(), 2, "expected two stages");
|
||
|
|
assert_eq!(stages[0], vec![NodeId("lint".into())]);
|
||
|
|
assert_eq!(stages[1], vec![NodeId("build".into())]);
|
||
|
|
|
||
|
|
// ── Stage 0: lint ──────────────────────────────────────────────────────
|
||
|
|
// lint has no inputs; deposits "linted" capability
|
||
|
|
let linted_value = serde_json::json!({"warnings": 0, "errors": 0});
|
||
|
|
ctx.deposit(&cap("linted"), linted_value.clone())
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
|
||
|
|
// ── Stage 1: build ─────────────────────────────────────────────────────
|
||
|
|
// build reads "linted" produced by stage 0
|
||
|
|
let inputs = ctx.extract_inputs(&[cap("linted")]).await.unwrap();
|
||
|
|
assert_eq!(inputs[&cap("linted")], linted_value);
|
||
|
|
|
||
|
|
let built_value = serde_json::json!({"artifact": "target/release/foo", "size_bytes": 4096});
|
||
|
|
ctx.deposit(&cap("built"), built_value.clone()).await.unwrap();
|
||
|
|
|
||
|
|
// ── Final state assertions ─────────────────────────────────────────────
|
||
|
|
state
|
||
|
|
.update_status(&ctx.run_id, PipelineStatus::Success)
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
|
||
|
|
let run = state.get_run(&ctx.run_id).await.unwrap().unwrap();
|
||
|
|
assert_eq!(run.status, PipelineStatus::Success);
|
||
|
|
assert_eq!(run.trigger_subject, "dev.crate.foo.modified");
|
||
|
|
|
||
|
|
// Both capabilities must be retrievable
|
||
|
|
let loaded_linted = state
|
||
|
|
.load_capability(&ctx.run_id, &cap("linted"))
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
let loaded_built = state
|
||
|
|
.load_capability(&ctx.run_id, &cap("built"))
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
assert_eq!(loaded_linted, Some(linted_value));
|
||
|
|
assert_eq!(loaded_built, Some(built_value));
|
||
|
|
}
|
||
|
|
|
||
|
|
#[tokio::test]
|
||
|
|
async fn test_graph_plan_wildcard_subject_routing() {
|
||
|
|
let nodes = vec![
|
||
|
|
node("lint", &[], &["linted"], &["dev.crate.>"]),
|
||
|
|
node("build", &["linted"], &["built"], &[]),
|
||
|
|
];
|
||
|
|
let graph = ActionGraph::from_nodes(nodes).unwrap();
|
||
|
|
|
||
|
|
// Any subject matching `dev.crate.>` should produce a plan
|
||
|
|
for subject in ["dev.crate.foo.modified", "dev.crate.bar.changed", "dev.crate.x"] {
|
||
|
|
let stages = graph.plan(subject, &serde_json::Value::Null).unwrap();
|
||
|
|
assert!(!stages.is_empty(), "no plan for subject '{subject}'");
|
||
|
|
assert_eq!(stages[0][0], NodeId("lint".into()));
|
||
|
|
}
|
||
|
|
|
||
|
|
// Subject not matching should return an error
|
||
|
|
let err = graph
|
||
|
|
.plan("prod.deploy.triggered", &serde_json::Value::Null)
|
||
|
|
.unwrap_err();
|
||
|
|
assert!(err.to_string().contains("no entry nodes match"));
|
||
|
|
}
|
||
|
|
|
||
|
|
#[tokio::test]
|
||
|
|
async fn test_multiple_contexts_share_state() {
|
||
|
|
// Two independent pipelines on the same StateTracker must not interfere.
|
||
|
|
let state = Arc::new(InMemoryStateTracker::new());
|
||
|
|
|
||
|
|
let ctx_a = PipelineContext::new(
|
||
|
|
"dev.crate.a.modified".to_string(),
|
||
|
|
serde_json::Value::Null,
|
||
|
|
Arc::clone(&state) as Arc<dyn StateTracker>,
|
||
|
|
PathBuf::from("/tmp/no-schemas"),
|
||
|
|
)
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
|
||
|
|
let ctx_b = PipelineContext::new(
|
||
|
|
"dev.crate.b.modified".to_string(),
|
||
|
|
serde_json::Value::Null,
|
||
|
|
Arc::clone(&state) as Arc<dyn StateTracker>,
|
||
|
|
PathBuf::from("/tmp/no-schemas"),
|
||
|
|
)
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
|
||
|
|
let linted_a = serde_json::json!({"crate": "a", "warnings": 0});
|
||
|
|
let linted_b = serde_json::json!({"crate": "b", "warnings": 2});
|
||
|
|
|
||
|
|
ctx_a.deposit(&cap("linted"), linted_a.clone()).await.unwrap();
|
||
|
|
ctx_b.deposit(&cap("linted"), linted_b.clone()).await.unwrap();
|
||
|
|
|
||
|
|
// Each context sees its own value, not the other's
|
||
|
|
assert_eq!(ctx_a.extract(&cap("linted")).await.unwrap(), linted_a);
|
||
|
|
assert_eq!(ctx_b.extract(&cap("linted")).await.unwrap(), linted_b);
|
||
|
|
|
||
|
|
// Verify via StateTracker too
|
||
|
|
let val_a = state.load_capability(&ctx_a.run_id, &cap("linted")).await.unwrap();
|
||
|
|
let val_b = state.load_capability(&ctx_b.run_id, &cap("linted")).await.unwrap();
|
||
|
|
assert_eq!(val_a, Some(linted_a));
|
||
|
|
assert_eq!(val_b, Some(linted_b));
|
||
|
|
}
|