stratumiops/crates/stratum-orchestrator/tests/test_saga_compensation.rs
Jesús Pérez 9095ea6d8e
Some checks failed
Nickel Type Check / Nickel Type Checking (push) Has been cancelled
Rust CI / Security Audit (push) Has been cancelled
Rust CI / Check + Test + Lint (nightly) (push) Has been cancelled
Rust CI / Check + Test + Lint (stable) (push) Has been cancelled
feat: add stratum-orchestrator with graph, state, NATS, and Nickel action nodes
New crates: stratum-orchestrator (Cedar authz, Vault secrets, Nu/agent executors,
  saga runner), stratum-graph (petgraph DAG + SurrealDB repo), stratum-state
  (SurrealDB tracker), platform-nats (NKey auth client), ncl-import-resolver.

  Updates: stratum-embeddings (SurrealDB store + persistent cache), stratum-llm
  circuit breaker. Adds Nickel action-nodes, schemas, config, Nushell scripts,
  docker-compose dev stack, and ADR-003.
2026-02-22 21:33:26 +00:00

150 lines
4.9 KiB
Rust

/// Integration test: Saga compensation state machine.
///
/// Validates the state transition sequence
/// Running → (stage 0 succeeds, stage 1 fails) → Compensating → Compensated,
/// and that capabilities deposited in stage 0 remain accessible during compensation.
use std::{path::PathBuf, sync::Arc};
use stratum_graph::types::Capability;
use stratum_orchestrator::context::PipelineContext;
use stratum_state::{InMemoryStateTracker, PipelineStatus, StateTracker, StepRecord};
use stratum_graph::types::NodeId;
/// Test shorthand: wraps an owned copy of `s` in a [`Capability`].
fn cap(s: &str) -> Capability {
    Capability(String::from(s))
}
/// Builds a fresh [`InMemoryStateTracker`] and a [`PipelineContext`] that
/// shares it.
///
/// Both are returned so a test can act through the context and inspect the
/// tracker directly. Panics if `PipelineContext::new` returns an error.
async fn make_ctx() -> (PipelineContext, Arc<InMemoryStateTracker>) {
    let tracker = Arc::new(InMemoryStateTracker::new());

    // The context takes the tracker as a trait object; the concrete handle is
    // kept so callers still get the `InMemoryStateTracker` type back.
    let tracker_dyn = Arc::clone(&tracker) as Arc<dyn StateTracker>;
    let event = String::from("dev.crate.foo.modified");
    let schema_dir = PathBuf::from("/tmp/no-schemas");

    let pipeline = PipelineContext::new(event, serde_json::Value::Null, tracker_dyn, schema_dir)
        .await
        .unwrap();

    (pipeline, tracker)
}
/// Drives one run through the Compensating and Compensated statuses after a
/// single successful step, checking that the capability deposited by that
/// step can still be loaded from the state tracker while compensating.
#[tokio::test]
async fn test_compensation_state_transitions() {
    let (ctx, state) = make_ctx().await;
    let run_id = &ctx.run_id;

    // --- Stage 0 (lint) succeeds: record start, deposit, record success ---
    let lint_node = NodeId("lint".into());
    let started = StepRecord::start(lint_node.clone());
    state.record_step(run_id, &started).await.unwrap();

    let payload = serde_json::json!({"warnings": 0});
    ctx.deposit(&cap("linted"), payload.clone()).await.unwrap();

    let finished = started.succeed(vec![cap("linted")]);
    state.record_step(run_id, &finished).await.unwrap();

    // --- Stage 1 (build) fails: the run moves to Compensating ---
    state
        .update_status(run_id, PipelineStatus::Compensating)
        .await
        .unwrap();
    let fetched = state.get_run(run_id).await.unwrap();
    let run = fetched.unwrap();
    assert_eq!(run.status, PipelineStatus::Compensating);

    // Lint's rollback needs the "linted" capability. In production the
    // NuExecutor would invoke the compensation script; this test only checks
    // that the value can be read back through the state tracker.
    let stored = state.load_capability(run_id, &cap("linted")).await.unwrap();
    assert_eq!(
        stored,
        Some(payload),
        "linted capability must survive into compensation phase"
    );

    // --- Compensation finishes ---
    state
        .update_status(run_id, PipelineStatus::Compensated)
        .await
        .unwrap();
    let fetched = state.get_run(run_id).await.unwrap();
    let run = fetched.unwrap();
    assert_eq!(run.status, PipelineStatus::Compensated);

    // The only step on record is lint, carrying the capability it deposited.
    assert_eq!(run.steps.len(), 1, "exactly one step record (lint)");
    let lint_step = &run.steps[0];
    assert_eq!(lint_step.node_id, lint_node);
    assert_eq!(lint_step.capabilities_deposited, vec![cap("linted")]);
}
/// Recording the same step twice (started, then succeeded) must leave a
/// single step record that reflects the later write.
#[tokio::test]
async fn test_step_upsert_within_saga() {
    let (ctx, state) = make_ctx().await;
    let run_id = &ctx.run_id;

    // First write: the step in its started form.
    let started = StepRecord::start(NodeId("fmt".into()));
    state.record_step(run_id, &started).await.unwrap();

    // Second write for the same step: now succeeded, with one capability.
    let finished = started.succeed(vec![cap("formatted")]);
    state.record_step(run_id, &finished).await.unwrap();

    let fetched = state.get_run(run_id).await.unwrap();
    let run = fetched.unwrap();

    assert_eq!(run.steps.len(), 1, "upsert must not duplicate step records");
    let only_step = &run.steps[0];
    assert_eq!(
        only_step.capabilities_deposited,
        vec![cap("formatted")],
        "step must reflect the succeeded record"
    );
}
/// Two stages succeed before a failure; both of their capabilities must be
/// loadable while compensating, and the finished run must hold two step
/// records with status `Compensated`.
///
/// NOTE(review): despite the name, nothing here asserts the order of the
/// step records or of compensation — only counts and accessibility.
#[tokio::test]
async fn test_multiple_stages_compensated_in_order() {
    let (ctx, state) = make_ctx().await;
    let run_id = &ctx.run_id;

    // (node name, capability name) for each stage that completes successfully.
    let stages = [("lint", "linted"), ("fmt", "formatted")];

    // Simulate the successful stages: start, deposit, succeed.
    for (node_name, cap_name) in stages {
        let started = StepRecord::start(NodeId(node_name.into()));
        state.record_step(run_id, &started).await.unwrap();

        ctx.deposit(&cap(cap_name), serde_json::json!({"ok": true}))
            .await
            .unwrap();

        let finished = started.succeed(vec![cap(cap_name)]);
        state.record_step(run_id, &finished).await.unwrap();
    }

    // A third stage fails, so the run enters compensation.
    state
        .update_status(run_id, PipelineStatus::Compensating)
        .await
        .unwrap();

    // Capabilities from stages 0 and 1 must both still be loadable.
    for (_, cap_name) in stages {
        let loaded = state.load_capability(run_id, &cap(cap_name)).await.unwrap();
        assert!(
            loaded.is_some(),
            "capability '{cap_name}' must be accessible during compensation"
        );
    }

    state
        .update_status(run_id, PipelineStatus::Compensated)
        .await
        .unwrap();

    let fetched = state.get_run(run_id).await.unwrap();
    let run = fetched.unwrap();
    assert_eq!(run.status, PipelineStatus::Compensated);
    assert_eq!(run.steps.len(), 2, "two step records from two stages");
}