/// Integration test: Saga compensation state machine. /// /// Validates the state transition sequence: /// Running → (stage 1 succeeds) → Compensating → Compensated /// and that capabilities deposited in stage 1 remain accessible during compensation. use std::{path::PathBuf, sync::Arc}; use stratum_graph::types::Capability; use stratum_orchestrator::context::PipelineContext; use stratum_state::{InMemoryStateTracker, PipelineStatus, StateTracker, StepRecord}; use stratum_graph::types::NodeId; fn cap(s: &str) -> Capability { Capability(s.to_owned()) } async fn make_ctx() -> (PipelineContext, Arc) { let state = Arc::new(InMemoryStateTracker::new()); let ctx = PipelineContext::new( "dev.crate.foo.modified".to_string(), serde_json::Value::Null, Arc::clone(&state) as Arc, PathBuf::from("/tmp/no-schemas"), ) .await .unwrap(); (ctx, state) } #[tokio::test] async fn test_compensation_state_transitions() { let (ctx, state) = make_ctx().await; // Stage 0 (lint) succeeds — deposit capability and record step let lint_node = NodeId("lint".into()); let step_start = StepRecord::start(lint_node.clone()); state.record_step(&ctx.run_id, &step_start).await.unwrap(); let linted_value = serde_json::json!({"warnings": 0}); ctx.deposit(&cap("linted"), linted_value.clone()) .await .unwrap(); let step_done = step_start.succeed(vec![cap("linted")]); state.record_step(&ctx.run_id, &step_done).await.unwrap(); // Stage 1 (build) fails — transition to Compensating state .update_status(&ctx.run_id, PipelineStatus::Compensating) .await .unwrap(); let run = state.get_run(&ctx.run_id).await.unwrap().unwrap(); assert_eq!(run.status, PipelineStatus::Compensating); // Compensation: lint's rollback reads the "linted" capability from DB // (in production NuExecutor would call the compensation script; // here we verify the data is accessible) let linted_in_db = state .load_capability(&ctx.run_id, &cap("linted")) .await .unwrap(); assert_eq!( linted_in_db, Some(linted_value), "linted capability must survive into compensation phase" ); // Compensation completes state .update_status(&ctx.run_id, PipelineStatus::Compensated) .await .unwrap(); let run = state.get_run(&ctx.run_id).await.unwrap().unwrap(); assert_eq!(run.status, PipelineStatus::Compensated); // Step record must show lint succeeded before compensation started assert_eq!(run.steps.len(), 1, "exactly one step record (lint)"); assert_eq!(run.steps[0].node_id, lint_node); assert_eq!(run.steps[0].capabilities_deposited, vec![cap("linted")]); } #[tokio::test] async fn test_step_upsert_within_saga() { let (ctx, state) = make_ctx().await; let node_id = NodeId("fmt".into()); // Record start, then update to success (upsert must not duplicate) let start = StepRecord::start(node_id.clone()); state.record_step(&ctx.run_id, &start).await.unwrap(); let done = start.succeed(vec![cap("formatted")]); state.record_step(&ctx.run_id, &done).await.unwrap(); let run = state.get_run(&ctx.run_id).await.unwrap().unwrap(); assert_eq!(run.steps.len(), 1, "upsert must not duplicate step records"); assert_eq!( run.steps[0].capabilities_deposited, vec![cap("formatted")], "step must reflect the succeeded record" ); } #[tokio::test] async fn test_multiple_stages_compensated_in_order() { let (ctx, state) = make_ctx().await; // Simulate two successful stages before failure for (name, cap_name) in [("lint", "linted"), ("fmt", "formatted")] { let step = StepRecord::start(NodeId(name.into())); state.record_step(&ctx.run_id, &step).await.unwrap(); ctx.deposit(&cap(cap_name), serde_json::json!({"ok": true})) .await .unwrap(); let done = step.succeed(vec![cap(cap_name)]); state.record_step(&ctx.run_id, &done).await.unwrap(); } // Third stage fails → compensate state .update_status(&ctx.run_id, PipelineStatus::Compensating) .await .unwrap(); // Both capabilities from stages 0 and 1 must remain accessible for cap_name in ["linted", "formatted"] { let v = state .load_capability(&ctx.run_id, &cap(cap_name)) .await .unwrap(); assert!( v.is_some(), "capability '{cap_name}' must be accessible during compensation" ); } state .update_status(&ctx.run_id, PipelineStatus::Compensated) .await .unwrap(); let run = state.get_run(&ctx.run_id).await.unwrap().unwrap(); assert_eq!(run.status, PipelineStatus::Compensated); assert_eq!(run.steps.len(), 2, "two step records from two stages"); }