# Workflow Engine: Persistence, Saga Compensation & Cedar Authorization
How to configure and operate the three hardening layers added in v1.3.0:
crash-recoverable state, Saga-based rollback, and per-stage access control.
## Prerequisites
- Running SurrealDB instance (same one used by the backend)
- `vapora-workflow-engine` v1.3.0+
- Migration `009_workflow_state.surql` applied (step 1 below)
## 1. Apply the Database Migration
```bash
surreal import \
--conn ws://localhost:8000 \
--user root \
--pass root \
--ns vapora \
--db vapora \
migrations/009_workflow_state.surql
```
This creates the `workflow_instances` SCHEMAFULL table with indexes on
`template_name` and `created_at`.
## 2. Wire the Store into the Orchestrator
Pass the existing `Surreal<Client>` when constructing `WorkflowOrchestrator`:
```rust
use std::sync::Arc;

use surrealdb::{Surreal, engine::remote::ws::Client};
use vapora_workflow_engine::WorkflowOrchestrator;
// SwarmCoordinator and KGPersistence come from the other Vapora crates;
// import them from the same paths your backend already uses.

async fn build_orchestrator(
    db: Arc<Surreal<Client>>,
    swarm: Arc<SwarmCoordinator>,
    kg: Arc<KGPersistence>,
    nats: Arc<async_nats::Client>,
) -> anyhow::Result<Arc<WorkflowOrchestrator>> {
    let orchestrator = WorkflowOrchestrator::new(
        "config/workflows.toml",
        swarm,
        kg,
        nats,
        (*db).clone(), // expects Surreal<Client> by value, not Arc<Surreal<Client>>
    )
    .await?;
    Ok(Arc::new(orchestrator))
}
```
On startup the orchestrator calls `store.load_active()` and reinserts any
non-terminal instances from SurrealDB into the in-memory `DashMap`. Workflows
in progress before a crash resume from their last persisted state.
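The recovery rule can be sketched with standard-library types only. This is a minimal sketch, not the engine's code: `WorkflowStatus`, `WorkflowInstance`, `load_active`, and `recover` are hypothetical stand-ins, and a `HashMap` takes the place of the `DashMap`. It shows the one property the startup path relies on: terminal instances (`Completed`, `Failed`) are filtered out, and everything else is re-inserted by id.

```rust
use std::collections::HashMap;

/// Hypothetical status enum; the engine's real state type may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkflowStatus {
    Running,
    WaitingApproval,
    Completed,
    Failed,
}

impl WorkflowStatus {
    /// Terminal states are never reloaded after a restart.
    fn is_terminal(self) -> bool {
        matches!(self, WorkflowStatus::Completed | WorkflowStatus::Failed)
    }
}

#[derive(Debug, Clone)]
struct WorkflowInstance {
    id: String,
    status: WorkflowStatus,
}

/// Stand-in for `store.load_active()`: keep only non-terminal instances.
fn load_active(persisted: &[WorkflowInstance]) -> Vec<WorkflowInstance> {
    persisted
        .iter()
        .filter(|w| !w.status.is_terminal())
        .cloned()
        .collect()
}

/// Stand-in for repopulating the in-memory map, keyed by workflow id.
fn recover(persisted: &[WorkflowInstance]) -> HashMap<String, WorkflowInstance> {
    load_active(persisted)
        .into_iter()
        .map(|w| (w.id.clone(), w))
        .collect()
}

fn main() {
    let persisted = vec![
        WorkflowInstance { id: "a".into(), status: WorkflowStatus::Running },
        WorkflowInstance { id: "b".into(), status: WorkflowStatus::WaitingApproval },
        WorkflowInstance { id: "c".into(), status: WorkflowStatus::Completed },
        WorkflowInstance { id: "d".into(), status: WorkflowStatus::Failed },
    ];
    let active = recover(&persisted);
    let mut ids: Vec<_> = active.keys().cloned().collect();
    ids.sort();
    println!("{ids:?}"); // ["a", "b"]
}
```

A `WaitingApproval` instance survives the filter, which matches the crash-recovery table in section 5: it is reloaded and simply waits for `approve_stage()`.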
## 3. Configure Saga Compensation
Add `compensation_agents` to each stage that should trigger a rollback task
when the workflow fails after that stage has completed:
```toml
# config/workflows.toml
[engine]
max_parallel_tasks = 10
workflow_timeout = 3600
approval_gates_enabled = true
[[workflows]]
name = "deploy_pipeline"
trigger = "manual"
[[workflows.stages]]
name = "provision"
agents = ["devops"]
compensation_agents = ["devops"] # ← Saga: send rollback task to devops
[[workflows.stages]]
name = "migrate_db"
agents = ["backend"]
compensation_agents = ["backend"] # ← Saga: send DB rollback task to backend
[[workflows.stages]]
name = "smoke_test"
agents = ["tester"]
# no compensation_agents → skipped in Saga reversal
```
### Compensation Task Payload
When a non-retryable failure occurs at stage N, the `SagaCompensator` walks the
stages that completed before it (`0..N`) in reverse order. For each of those
stages that defines `compensation_agents`, it dispatches a compensation task via
`SwarmCoordinator`:
```json
{
"type": "compensation",
"stage_name": "migrate_db",
"workflow_id": "abc-123",
"original_context": { "…": "…" },
"artifacts_to_undo": ["artifact-id-1"]
}
```
Compensation is **best-effort**: errors are logged but never fail the workflow
state machine. The workflow is already marked `Failed` before Saga fires.
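The reversal order and best-effort semantics can be sketched as follows. This assumes a simplified, hypothetical `Stage` type and a `dispatch` closure standing in for `SwarmCoordinator`; neither is the engine's actual API. The sketch compensates only the stages before the failing index, newest first, skips stages with no `compensation_agents`, and logs dispatch errors instead of returning them.

```rust
/// Hypothetical, simplified stage mirroring the TOML keys above.
struct Stage {
    name: &'static str,
    compensation_agents: Vec<&'static str>,
}

/// Saga reversal sketch: compensate the stages before `failed_index`
/// (the ones that completed), newest first. `dispatch` stands in for sending
/// a compensation task through SwarmCoordinator. Returns every (stage, agent)
/// pair that was attempted, in dispatch order.
fn compensate<F>(stages: &[Stage], failed_index: usize, mut dispatch: F) -> Vec<(String, String)>
where
    F: FnMut(&str, &str) -> Result<(), String>,
{
    let completed = &stages[..failed_index.min(stages.len())];
    let mut attempted = Vec::new();
    for stage in completed.iter().rev() {
        // An empty compensation_agents list means the stage is skipped.
        for &agent in &stage.compensation_agents {
            attempted.push((stage.name.to_string(), agent.to_string()));
            // Best-effort: log the failure and keep going; never propagate it.
            if let Err(err) = dispatch(stage.name, agent) {
                eprintln!("WARN compensation dispatch failed stage={} error={}", stage.name, err);
            }
        }
    }
    attempted
}

fn main() {
    let stages = vec![
        Stage { name: "provision", compensation_agents: vec!["devops"] },
        Stage { name: "migrate_db", compensation_agents: vec!["backend"] },
        Stage { name: "smoke_test", compensation_agents: vec![] },
    ];
    // smoke_test (index 2) fails; migrate_db's rollback dispatch also fails,
    // but provision is still compensated afterwards.
    let attempted = compensate(&stages, 2, |stage, _agent| {
        if stage == "migrate_db" {
            Err("agent unreachable".to_string())
        } else {
            Ok(())
        }
    });
    println!("{attempted:?}"); // [("migrate_db", "backend"), ("provision", "devops")]
}
```

Returning the attempted pairs rather than a `Result` mirrors the rule above: a failed rollback dispatch never changes the outcome, and the next stage's compensation is still attempted.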
### Agent Implementation
Agents receive compensation tasks on the same NATS subjects as regular tasks.
Distinguish by the `"type": "compensation"` field:
```rust
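// In your agent task handler (assuming task.payload is a serde_json::Value)
if task.payload["type"] == "compensation" {
    let stage = task.payload["stage_name"].as_str().unwrap_or_default();
    let wf_id = task.payload["workflow_id"].as_str().unwrap_or_default();
    // perform rollback for `stage` of workflow `wf_id` …
    return Ok(());
}
// normal task handling …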
```
## 4. Configure Cedar Authorization
Cedar authorization is **opt-in**: if `cedar_policy_dir` is absent, all stages
execute without policy checks.
### Enable Cedar
```toml
[engine]
max_parallel_tasks = 10
workflow_timeout = 3600
approval_gates_enabled = true
cedar_policy_dir = "/etc/vapora/cedar" # directory with *.cedar files
```
### Write Policies
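Create `.cedar` files in the configured directory. Every file is loaded at
startup and merged into a single `PolicySet`. Cedar denies any request that no
`permit` matches, and a matching `forbid` overrides any `permit`, so once
`cedar_policy_dir` is set, a stage runs only if some policy explicitly permits it.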
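One way to implement the load-and-merge step with only the standard library is sketched below. It rests on assumptions: `merged_policy_source` is a hypothetical helper, files are read non-recursively and sorted by path, and the engine's real loader may instead parse each file separately. The merged text is what you would hand to a Cedar parser; in the `cedar-policy` crate, `PolicySet` implements `FromStr`.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical helper: read every `*.cedar` file directly inside `dir`
/// (no recursion) and join their sources into one policy text.
fn merged_policy_source(dir: &Path) -> io::Result<String> {
    let mut paths = Vec::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        let is_cedar = path.extension().and_then(|ext| ext.to_str()) == Some("cedar");
        if path.is_file() && is_cedar {
            paths.push(path);
        }
    }
    // read_dir order is platform-dependent; sort for a deterministic merge.
    paths.sort();

    let mut merged = String::new();
    for path in &paths {
        merged.push_str(&fs::read_to_string(path)?);
        // Separate files so a trailing `//` comment cannot swallow the next policy.
        merged.push('\n');
    }
    Ok(merged)
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("vapora-cedar-sketch");
    fs::create_dir_all(&dir)?;
    fs::write(dir.join("allow-all.cedar"), "// allow-all policy goes here")?;
    fs::write(dir.join("README.txt"), "not a policy; skipped")?;
    print!("{}", merged_policy_source(&dir)?);
    fs::remove_dir_all(&dir)
}
```

Sorting by path only makes the merged text reproducible; Cedar's decision does not depend on policy order.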
**Allow all stages** (permissive default):
```cedar
// /etc/vapora/cedar/allow-all.cedar
permit(
    principal == "vapora-orchestrator",
    action == Action::"execute-stage",
    resource
);
```
**Forbid the `deployment` stage unless `context.approved` is true**:
```cedar
// /etc/vapora/cedar/restrict-deploy.cedar
forbid(
    principal == "vapora-orchestrator",
    action == Action::"execute-stage",
    resource == Stage::"deployment"
) unless {
    context.approved == true
};
```
**Allow only specific stages**:
```cedar
// /etc/vapora/cedar/workflow-policy.cedar
permit(
    principal == "vapora-orchestrator",
    action == Action::"execute-stage",
    resource
) when {
    resource in [Stage::"architecture_design", Stage::"implementation", Stage::"testing"]
};
forbid(
    principal == "vapora-orchestrator",
    action == Action::"execute-stage",
    resource == Stage::"production_deploy"
);
```
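Cedar combines matching policies with a fixed rule: if any `forbid` matches, the request is denied; otherwise any matching `permit` allows it; with no match, the default is deny. The toy model below applies that rule to the policy file above. It is an illustration only: `Policy`, `Effect`, `Decision`, and `decide` are hypothetical, and the model looks only at the stage name, ignoring principal, action, and context.

```rust
/// Whether a toy policy permits or forbids.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Effect {
    Permit,
    Forbid,
}

#[derive(Debug, PartialEq, Eq)]
enum Decision {
    Allow,
    Deny,
}

/// Toy policy: an effect plus the stage names (resources) it matches.
struct Policy {
    effect: Effect,
    stages: &'static [&'static str],
}

/// Cedar's combining rule: a matching forbid always wins; otherwise a
/// matching permit allows; with no matching policy the answer is deny.
fn decide(policies: &[Policy], stage: &str) -> Decision {
    let any_match = |effect: Effect| {
        policies
            .iter()
            .any(|p| p.effect == effect && p.stages.iter().any(|s| *s == stage))
    };
    if any_match(Effect::Forbid) {
        Decision::Deny
    } else if any_match(Effect::Permit) {
        Decision::Allow
    } else {
        Decision::Deny
    }
}

fn main() {
    // Mirrors workflow-policy.cedar: a permit list plus one explicit forbid.
    let policies = [
        Policy { effect: Effect::Permit, stages: &["architecture_design", "implementation", "testing"] },
        Policy { effect: Effect::Forbid, stages: &["production_deploy"] },
    ];
    for stage in ["implementation", "production_deploy", "code_review"] {
        println!("{stage}: {:?}", decide(&policies, stage));
    }
    // implementation: Allow
    // production_deploy: Deny
    // code_review: Deny
}
```

A stage that appears in no policy (here the hypothetical `code_review`) is denied even though nothing forbids it. Because every file in the directory is merged into one `PolicySet`, keeping `allow-all.cedar` next to this file would permit every stage, leaving the `forbid` as the only restriction.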
### Authorization Check
Before dispatching each stage, the engine calls:
```rust
// principal: "vapora-orchestrator"
// action: "execute-stage"
// resource: Stage::"<stage_name>"
cedar.authorize("vapora-orchestrator", "execute-stage", &stage_name)?;
```
A `Deny` decision returns `WorkflowError::Unauthorized` and halts the workflow
immediately. The stage is never dispatched to `SwarmCoordinator`.
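The ordering guarantee (check first, dispatch only on `Allow`, stop the workflow on `Deny`) can be sketched without Cedar itself. In this sketch the `Authorizer` trait, `DenyStage`, `dispatch_stage`, and `run_stages` are hypothetical; `WorkflowError` is a local stand-in whose fields may not match the engine's type; and `None` models an absent `cedar_policy_dir`, where every stage runs unchecked.

```rust
/// Local stand-in for the engine's error type; the real variant's fields may differ.
#[derive(Debug, PartialEq)]
enum WorkflowError {
    Unauthorized { stage: String },
}

/// Hypothetical authorizer interface: (principal, action, resource stage).
trait Authorizer {
    fn is_allowed(&self, principal: &str, action: &str, stage: &str) -> bool;
}

/// Check one stage, then dispatch it only if allowed.
/// `cedar == None` models an absent `cedar_policy_dir`: no checks at all.
fn dispatch_stage<A: Authorizer>(
    cedar: Option<&A>,
    stage: &str,
    dispatched: &mut Vec<String>,
) -> Result<(), WorkflowError> {
    if let Some(cedar) = cedar {
        if !cedar.is_allowed("vapora-orchestrator", "execute-stage", stage) {
            // Deny: return before anything is sent to the swarm.
            return Err(WorkflowError::Unauthorized { stage: stage.to_string() });
        }
    }
    // Stand-in for handing the stage to SwarmCoordinator.
    dispatched.push(stage.to_string());
    Ok(())
}

/// Run stages in order; `?` halts the workflow at the first denial.
fn run_stages<A: Authorizer>(
    cedar: Option<&A>,
    stages: &[&str],
    dispatched: &mut Vec<String>,
) -> Result<(), WorkflowError> {
    for &stage in stages {
        dispatch_stage(cedar, stage, dispatched)?;
    }
    Ok(())
}

/// Toy authorizer that denies exactly one stage name.
struct DenyStage(&'static str);

impl Authorizer for DenyStage {
    fn is_allowed(&self, _principal: &str, _action: &str, stage: &str) -> bool {
        stage != self.0
    }
}

fn main() {
    let policy = DenyStage("production_deploy");
    let mut sent = Vec::new();
    let result = run_stages(Some(&policy), &["build", "production_deploy", "notify"], &mut sent);
    // "notify" is never reached: the denial stops the loop.
    println!("{result:?} dispatched={sent:?}");
}
```

The stage names `build` and `notify` are illustrative only. Putting the check inside the dispatch function, rather than in the caller, is what makes "never dispatched" hold for every code path that sends a stage.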
## 5. Crash Recovery Behavior
| Scenario | Behavior |
|----------|----------|
| Server restart with active workflows | `load_active()` re-populates `DashMap`; event listener resumes NATS subscriptions; in-flight tasks re-dispatch on next `TaskCompleted`/`TaskFailed` event |
| Workflow in `Completed` or `Failed` state | Filtered out by `load_active()`; not reloaded |
| Workflow in `WaitingApproval` | Reloaded; waits for `approve_stage()` call |
| SurrealDB unavailable at startup | `load_active()` returns error; orchestrator fails to start |
## 6. Observability
Saga dispatch failures are logged at `warn` level:
```text
WARN vapora_workflow_engine::saga: compensation dispatch failed
workflow_id="abc-123" stage="migrate_db" error="…"
```
Cedar denials are logged at `error` level:
```text
ERROR vapora_workflow_engine::orchestrator: Cedar authorization denied
workflow_id="abc-123" stage="production_deploy"
```
Existing Prometheus metrics track workflow lifecycle:
```text
vapora_workflows_failed_total # includes Cedar-denied workflows
vapora_active_workflows # drops immediately on denial
```
## 7. Full Config Reference
```toml
[engine]
max_parallel_tasks = 10
workflow_timeout = 3600
approval_gates_enabled = true
cedar_policy_dir = "/etc/vapora/cedar" # optional
[[workflows]]
name = "my_workflow"
trigger = "manual"
[[workflows.stages]]
name = "stage_a"
agents = ["agent_role"]
parallel = false
max_parallel = 1
approval_required = false
compensation_agents = ["agent_role"] # optional; omit to skip Saga for this stage
```
## Related
- [ADR-0033: Workflow Engine Hardening](../adrs/0033-stratum-orchestrator-workflow-hardening.md)
- [ADR-0028: Workflow Orchestrator](../adrs/0028-workflow-orchestrator.md)
- [ADR-0010: Cedar Authorization](../adrs/0010-cedar-authorization.md)
- [Workflow Orchestrator Feature Reference](../features/workflow-orchestrator.md)