feat(workflow-engine): autonomous scheduling with timezone and distributed lock

Add cron-based autonomous workflow firing with two hardening layers:

  - Timezone-aware scheduling via chrono-tz: ScheduledWorkflow.timezone
    (IANA identifier), compute_next_fire_at/after_tz, validate_timezone;
    DST-safe, UTC fallback when absent; validated at config load and REST API

  - Distributed fire-lock via SurrealDB conditional UPDATE (locked_by/locked_at
    fields, 120 s TTL); WorkflowScheduler gains instance_id (UUID) as lock owner;
    prevents double-fires across multi-instance deployments without extra infra

  - ScheduleStore: try_acquire_fire_lock, release_fire_lock (own-instance guard),
    full CRUD (load_one/all, full_upsert, patch, delete, load_runs)

  - REST: 7 endpoints (GET/PUT/PATCH/DELETE schedules, runs history, manual fire)
    with timezone field in all request/response types

  - Migrations 010 (schedule tables) + 011 (timezone + lock columns)
  - Tests: 48 passing (was 26); ADR-0034; changelog; feature docs updated
Jesús Pérez 2026-02-26 11:34:44 +00:00
parent b9e2cee9f7
commit bb55c80d2b
Signed by: jesus
GPG Key ID: 9F243E355E0BC939
21 changed files with 2071 additions and 5 deletions


@ -7,6 +7,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added - Autonomous Scheduling: Timezone Support and Distributed Fire-Lock
#### `vapora-workflow-engine` — scheduling hardening
- **Timezone-aware cron evaluation** (`chrono-tz = "0.10"`):
- `ScheduledWorkflow.timezone: Option<String>` — IANA identifier stored per-schedule
- `compute_next_fire_at_tz(expr, tz)` / `compute_next_fire_after_tz(expr, after, tz)` — generic over `chrono_tz::Tz`; UTC fallback when `tz = None`
- `validate_timezone(tz)` — validates against chrono-tz's compile-time IANA database, rejects unknown identifiers
- `compute_fire_times_tz` in `scheduler.rs` — catch-up and normal firing both timezone-aware
- Config-load validation: `[workflows.schedule] timezone = "..."` validated at startup (fail-fast)
- **Distributed fire-lock** (SurrealDB document-level atomic CAS):
- `scheduled_workflows` gains `locked_by: option<string>` and `locked_at: option<datetime>` (migration 011)
- `ScheduleStore::try_acquire_fire_lock(id, instance_id, now)` — conditional `UPDATE ... WHERE locked_by IS NONE OR locked_at < $expiry`; returns `true` only if update succeeded (non-empty result = lock acquired)
- `ScheduleStore::release_fire_lock(id, instance_id)` — `WHERE locked_by = $instance_id` guard prevents stale release after TTL expiry
- `WorkflowScheduler.instance_id: String` — UUID generated at startup, identifies lock owner
- 120-second TTL: crashed instance's lock auto-expires within two scheduler ticks
- Lock acquired before `fire_with_lock`, released in `finally`-style block after (warn on release failure, TTL fallback)
- New tests: `test_validate_timezone_valid`, `test_validate_timezone_invalid`, `test_compute_next_fire_at_tz_utc`, `test_compute_next_fire_at_tz_named`, `test_compute_next_fire_at_tz_invalid_tz_fallback`, `test_compute_fires_with_catchup_named_tz`, `test_instance_id_is_unique`
- Test count: 48 (was 41)
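The fire-lock's compare-and-swap semantics (acquire only when unlocked or expired, release only by the owner) can be sketched with an in-memory stand-in for the SurrealDB conditional `UPDATE`. This is illustrative only — method names mirror `try_acquire_fire_lock` / `release_fire_lock` above, but the real implementation is a single atomic query against `scheduled_workflows`:

```rust
use std::collections::HashMap;

const LOCK_TTL_SECS: u64 = 120;

/// In-memory stand-in for the `locked_by` / `locked_at` columns.
#[derive(Default)]
struct LockTable {
    // schedule id -> (owning instance, locked_at as unix seconds)
    locks: HashMap<String, (String, u64)>,
}

impl LockTable {
    /// Mirrors `try_acquire_fire_lock`: succeeds when no lock is held or
    /// the previous holder's lock is past the 120 s TTL.
    fn try_acquire(&mut self, id: &str, instance: &str, now: u64) -> bool {
        match self.locks.get(id) {
            Some((_, locked_at)) if now < locked_at + LOCK_TTL_SECS => false,
            _ => {
                self.locks.insert(id.to_string(), (instance.to_string(), now));
                true
            }
        }
    }

    /// Mirrors `release_fire_lock`: the `WHERE locked_by = $instance_id`
    /// guard means only the current owner can release.
    fn release(&mut self, id: &str, instance: &str) -> bool {
        match self.locks.get(id) {
            Some((owner, _)) if owner == instance => {
                self.locks.remove(id);
                true
            }
            _ => false,
        }
    }
}

fn main() {
    let mut t = LockTable::default();
    assert!(t.try_acquire("s1", "inst-a", 0)); // first instance wins the fire
    assert!(!t.try_acquire("s1", "inst-b", 10)); // second instance skips
    assert!(!t.release("s1", "inst-b")); // own-instance guard holds
    assert!(t.try_acquire("s1", "inst-b", 121)); // crashed holder's lock expired
    println!("ok");
}
```

This is why a crashed instance cannot wedge a schedule: its stale lock simply becomes acquirable after the TTL, with no separate lock service involved.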
#### `vapora-backend` — schedule REST API surface
- `ScheduleResponse`, `PutScheduleRequest`, `PatchScheduleRequest` gain `timezone: Option<String>`
- `validate_tz()` helper validates at API boundary → `400 InvalidInput` on unknown identifier
- `put_schedule` and `patch_schedule` use `compute_next_fire_at_tz` / `compute_next_fire_after_tz`
- `fire_schedule` uses `compute_next_fire_after_tz` with schedule's stored timezone
#### Migrations
- **`migrations/011_schedule_tz_lock.surql`**: `DEFINE FIELD timezone`, `locked_by`, `locked_at` on `scheduled_workflows`
#### Documentation
- **ADR-0034**: design rationale for `chrono-tz` selection and SurrealDB conditional UPDATE lock
- **`docs/features/workflow-orchestrator.md`**: Autonomous Scheduling section with TOML config, REST API table, timezone/distributed lock explanations, Prometheus metrics
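As context for the cron handling above: the engine accepts 5-field standard cron but the `cron` crate parses a 7-field format, so expressions are normalised before parsing (mirroring the inline normalisation in `validate_schedule_config` later in this diff). A std-only sketch:

```rust
/// Normalise a cron expression the way config validation does: the cron
/// crate expects 7 fields (sec min hour dom month dow year), so 5-field
/// standard cron gets a seconds field prepended and a year wildcard
/// appended, and 6-field input gets only the year wildcard.
fn normalize_cron(expr: &str) -> String {
    match expr.split_whitespace().count() {
        5 => format!("0 {} *", expr),
        6 => format!("{} *", expr),
        _ => expr.to_string(),
    }
}

fn main() {
    // "Nightly at 02:00" in standard cron becomes a full 7-field expression.
    assert_eq!(normalize_cron("0 2 * * *"), "0 0 2 * * * *");
    assert_eq!(normalize_cron("0 0 2 * * *"), "0 0 2 * * * *");
    println!("{}", normalize_cron("0 2 * * *"));
}
```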
---
### Added - Workflow Engine Hardening (Persistence · Saga · Cedar)
#### `vapora-workflow-engine` — three new hardening layers

Cargo.lock generated

@ -1679,6 +1679,16 @@ dependencies = [
"phf 0.11.3",
]
[[package]]
name = "chrono-tz"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3"
dependencies = [
"chrono",
"phf 0.12.1",
]
[[package]]
name = "chrono-tz-build"
version = "0.3.0"
@ -2236,6 +2246,17 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
[[package]]
name = "cron"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07"
dependencies = [
"chrono",
"nom 7.1.3",
"once_cell",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
@ -7051,6 +7072,15 @@ dependencies = [
"phf_shared 0.11.3",
]
[[package]]
name = "phf"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7"
dependencies = [
"phf_shared 0.12.1",
]
[[package]]
name = "phf"
version = "0.13.1"
@ -7130,6 +7160,15 @@ dependencies = [
"unicase",
]
[[package]]
name = "phf_shared"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981"
dependencies = [
"siphasher",
]
[[package]]
name = "phf_shared"
version = "0.13.1" version = "0.13.1"
@ -10937,7 +10976,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8004bca281f2d32df3bacd59bc67b312cb4c70cea46cbd79dbe8ac5ed206722"
dependencies = [
"chrono",
"chrono-tz 0.9.0",
"globwalk",
"humansize",
"lazy_static",
@ -12643,6 +12682,8 @@ dependencies = [
"async-trait",
"cedar-policy 4.9.0",
"chrono",
"chrono-tz 0.10.4",
"cron",
"dashmap 6.1.0",
"futures",
"mockall",


@ -12,6 +12,7 @@ pub mod proposals;
pub mod provider_analytics;
pub mod provider_metrics;
pub mod rlm;
pub mod schedules;
pub mod state;
pub mod swarm;
pub mod tasks;


@ -0,0 +1,451 @@
use std::sync::Arc;
use axum::{
extract::{Path, State},
http::StatusCode,
Json,
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};
use vapora_shared::VaporaError;
use vapora_workflow_engine::{
compute_next_fire_after_tz, compute_next_fire_at_tz, validate_cron_expression,
validate_timezone, RunStatus, ScheduleRun, ScheduleStore, ScheduledWorkflow,
};
use crate::api::error::ApiError;
use crate::api::state::AppState;
// ─── Response types ──────────────────────────────────────────────────────────
#[derive(Debug, Serialize)]
pub struct ScheduleResponse {
pub id: String,
pub template_name: String,
pub cron_expression: String,
pub initial_context: serde_json::Value,
pub enabled: bool,
pub allow_concurrent: bool,
pub catch_up: bool,
pub timezone: Option<String>,
pub last_fired_at: Option<String>,
pub next_fire_at: Option<String>,
pub runs_count: u64,
pub created_at: String,
pub updated_at: String,
}
impl From<ScheduledWorkflow> for ScheduleResponse {
fn from(s: ScheduledWorkflow) -> Self {
Self {
id: s.id,
template_name: s.template_name,
cron_expression: s.cron_expression,
initial_context: s.initial_context,
enabled: s.enabled,
allow_concurrent: s.allow_concurrent,
catch_up: s.catch_up,
timezone: s.timezone,
last_fired_at: s.last_fired_at.map(|t| t.to_rfc3339()),
next_fire_at: s.next_fire_at.map(|t| t.to_rfc3339()),
runs_count: s.runs_count,
created_at: s.created_at.to_rfc3339(),
updated_at: s.updated_at.to_rfc3339(),
}
}
}
#[derive(Debug, Serialize)]
pub struct ScheduleRunResponse {
pub id: String,
pub schedule_id: String,
pub workflow_instance_id: Option<String>,
pub fired_at: String,
pub status: String,
pub notes: Option<String>,
}
impl From<ScheduleRun> for ScheduleRunResponse {
fn from(r: ScheduleRun) -> Self {
Self {
id: r.id,
schedule_id: r.schedule_id,
workflow_instance_id: r.workflow_instance_id,
fired_at: r.fired_at.to_rfc3339(),
status: match r.status {
RunStatus::Fired => "fired".to_string(),
RunStatus::Skipped => "skipped".to_string(),
RunStatus::Failed => "failed".to_string(),
},
notes: r.notes,
}
}
}
#[derive(Debug, Serialize)]
pub struct ScheduleListResponse {
pub schedules: Vec<ScheduleResponse>,
pub total: usize,
}
#[derive(Debug, Serialize)]
pub struct RunListResponse {
pub runs: Vec<ScheduleRunResponse>,
pub total: usize,
}
#[derive(Debug, Serialize)]
pub struct MessageResponse {
pub success: bool,
pub message: String,
}
// ─── Request types ───────────────────────────────────────────────────────────
/// Body for `PUT /api/v1/schedules/:id` — full replacement.
#[derive(Debug, Deserialize)]
pub struct PutScheduleRequest {
pub template_name: String,
pub cron_expression: String,
#[serde(default)]
pub initial_context: Option<serde_json::Value>,
#[serde(default)]
pub enabled: Option<bool>,
#[serde(default)]
pub allow_concurrent: bool,
#[serde(default)]
pub catch_up: bool,
/// IANA timezone for cron evaluation (e.g. `"America/New_York"`). UTC when
/// absent.
#[serde(default)]
pub timezone: Option<String>,
}
/// Body for `PATCH /api/v1/schedules/:id` — partial update.
#[derive(Debug, Deserialize)]
pub struct PatchScheduleRequest {
pub enabled: Option<bool>,
pub cron_expression: Option<String>,
pub allow_concurrent: Option<bool>,
pub catch_up: Option<bool>,
pub initial_context: Option<serde_json::Value>,
/// Update the timezone. Omitted (or `null`, which deserializes identically)
/// leaves the stored value unchanged; to clear it and revert to UTC, issue a
/// full `PUT` without `timezone`.
#[serde(default)]
pub timezone: Option<String>,
}
// ─── Helper ──────────────────────────────────────────────────────────────────
fn require_store(state: &AppState) -> Result<Arc<ScheduleStore>, ApiError> {
state.schedule_store.clone().ok_or_else(|| {
ApiError(VaporaError::InternalError(
"Schedule store not available".to_string(),
))
})
}
fn validate_cron(expr: &str) -> Result<(), ApiError> {
validate_cron_expression(expr).map_err(|e| {
ApiError(VaporaError::InvalidInput(format!(
"Invalid cron expression '{}': {}",
expr, e
)))
})
}
fn validate_tz(tz: &str) -> Result<(), ApiError> {
validate_timezone(tz).map_err(|e| ApiError(VaporaError::InvalidInput(e)))
}
// ─── Handlers ────────────────────────────────────────────────────────────────
/// `GET /api/v1/schedules` — list all schedules.
pub async fn list_schedules(
State(state): State<AppState>,
) -> Result<Json<ScheduleListResponse>, ApiError> {
let store = require_store(&state)?;
let schedules = store.load_all().await.map_err(|e| {
error!("list_schedules: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?;
let total = schedules.len();
Ok(Json(ScheduleListResponse {
total,
schedules: schedules.into_iter().map(ScheduleResponse::from).collect(),
}))
}
/// `GET /api/v1/schedules/:id` — get a single schedule.
pub async fn get_schedule(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<ScheduleResponse>, ApiError> {
let store = require_store(&state)?;
let schedule = store
.load_one(&id)
.await
.map_err(|e| {
error!(schedule_id = %id, "get_schedule: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?
.ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?;
Ok(Json(ScheduleResponse::from(schedule)))
}
/// `PUT /api/v1/schedules/:id` — create or fully replace a schedule.
///
/// Preserves `last_fired_at` and `runs_count` from the existing record.
pub async fn put_schedule(
State(state): State<AppState>,
Path(id): Path<String>,
Json(req): Json<PutScheduleRequest>,
) -> Result<(StatusCode, Json<ScheduleResponse>), ApiError> {
let store = require_store(&state)?;
validate_cron(&req.cron_expression)?;
if let Some(ref tz) = req.timezone {
validate_tz(tz)?;
}
let tz = req.timezone.as_deref();
let next_fire_at = compute_next_fire_at_tz(&req.cron_expression, tz);
let now = Utc::now();
let s = ScheduledWorkflow {
id: id.clone(),
template_name: req.template_name.clone(),
cron_expression: req.cron_expression.clone(),
initial_context: req
.initial_context
.unwrap_or(serde_json::Value::Object(Default::default())),
enabled: req.enabled.unwrap_or(true),
allow_concurrent: req.allow_concurrent,
catch_up: req.catch_up,
timezone: req.timezone.clone(),
last_fired_at: None,
next_fire_at,
runs_count: 0,
created_at: now,
updated_at: now,
};
store.full_upsert(&s).await.map_err(|e| {
error!(schedule_id = %id, "put_schedule: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?;
let updated = store
.load_one(&id)
.await
.map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))?
.ok_or_else(|| {
ApiError(VaporaError::InternalError(
"Schedule vanished after upsert".into(),
))
})?;
info!(schedule_id = %id, template = %req.template_name, "Schedule PUT");
Ok((StatusCode::OK, Json(ScheduleResponse::from(updated))))
}
/// `PATCH /api/v1/schedules/:id` — partial update.
///
/// Only the provided fields are changed. If `cron_expression` is updated,
/// `next_fire_at` is recomputed automatically.
pub async fn patch_schedule(
State(state): State<AppState>,
Path(id): Path<String>,
Json(req): Json<PatchScheduleRequest>,
) -> Result<Json<ScheduleResponse>, ApiError> {
let store = require_store(&state)?;
// Ensure the schedule exists first; also used below so a cron-only patch
// keeps evaluating in the schedule's stored timezone.
let existing = store
.load_one(&id)
.await
.map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))?
.ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?;
let mut patch = serde_json::Map::new();
if let Some(enabled) = req.enabled {
patch.insert("enabled".into(), serde_json::json!(enabled));
}
if let Some(allow_concurrent) = req.allow_concurrent {
patch.insert(
"allow_concurrent".into(),
serde_json::json!(allow_concurrent),
);
}
if let Some(catch_up) = req.catch_up {
patch.insert("catch_up".into(), serde_json::json!(catch_up));
}
if let Some(ctx) = req.initial_context {
patch.insert("initial_context".into(), ctx);
}
if let Some(ref tz) = req.timezone {
validate_tz(tz)?;
patch.insert("timezone".into(), serde_json::json!(tz));
}
if let Some(ref cron) = req.cron_expression {
validate_cron(cron)?;
// Recompute next_fire_at using the new cron and the timezone from this
// request when provided, otherwise the schedule's stored timezone
// (None → UTC).
let tz = req.timezone.as_deref().or(existing.timezone.as_deref());
let next_fire_at = compute_next_fire_at_tz(cron, tz);
patch.insert("cron_expression".into(), serde_json::json!(cron));
patch.insert("next_fire_at".into(), serde_json::json!(next_fire_at));
}
}
patch.insert("updated_at".into(), serde_json::json!(Utc::now()));
let updated = store
.patch(&id, serde_json::Value::Object(patch))
.await
.map_err(|e| {
error!(schedule_id = %id, "patch_schedule: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?
.ok_or_else(|| {
ApiError(VaporaError::NotFound(format!(
"Schedule {} not found after patch",
id
)))
})?;
info!(schedule_id = %id, "Schedule PATCHed");
Ok(Json(ScheduleResponse::from(updated)))
}
/// `DELETE /api/v1/schedules/:id` — permanently remove a schedule.
pub async fn delete_schedule(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<(StatusCode, Json<MessageResponse>), ApiError> {
let store = require_store(&state)?;
// Verify existence before delete.
store
.load_one(&id)
.await
.map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))?
.ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?;
store.delete(&id).await.map_err(|e| {
error!(schedule_id = %id, "delete_schedule: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?;
info!(schedule_id = %id, "Schedule deleted");
Ok((
StatusCode::OK,
Json(MessageResponse {
success: true,
message: format!("Schedule {} deleted", id),
}),
))
}
/// `GET /api/v1/schedules/:id/runs` — execution history (last 100, desc).
pub async fn list_schedule_runs(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<RunListResponse>, ApiError> {
let store = require_store(&state)?;
// Ensure schedule exists.
store
.load_one(&id)
.await
.map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))?
.ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?;
let runs = store.load_runs(&id).await.map_err(|e| {
error!(schedule_id = %id, "list_schedule_runs: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?;
let total = runs.len();
Ok(Json(RunListResponse {
total,
runs: runs.into_iter().map(ScheduleRunResponse::from).collect(),
}))
}
/// `POST /api/v1/schedules/:id/fire` — immediately trigger a scheduled
/// workflow bypassing the cron timer.
///
/// Records an auditable `ScheduleRun` with `status = Fired` and advances
/// `last_fired_at` / `next_fire_at` exactly like the background scheduler.
pub async fn fire_schedule(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<(StatusCode, Json<ScheduleRunResponse>), ApiError> {
let store = require_store(&state)?;
let orchestrator = state.workflow_orchestrator.as_ref().ok_or_else(|| {
ApiError(VaporaError::InternalError(
"Workflow orchestrator not available".to_string(),
))
})?;
let schedule = store
.load_one(&id)
.await
.map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))?
.ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?;
if !schedule.enabled {
return Err(ApiError(VaporaError::InvalidInput(format!(
"Schedule {} is disabled",
id
))));
}
let now = Utc::now();
let workflow_id = orchestrator
.start_workflow(&schedule.template_name, schedule.initial_context.clone())
.await
.map_err(|e| {
error!(schedule_id = %id, "fire_schedule start_workflow: {e}");
ApiError(VaporaError::WorkflowError(e.to_string()))
})?;
let run = ScheduleRun {
id: uuid::Uuid::new_v4().to_string(),
schedule_id: id.clone(),
workflow_instance_id: Some(workflow_id.clone()),
fired_at: now,
status: RunStatus::Fired,
notes: Some("Manual fire via API".to_string()),
};
store.record_run(&run).await.map_err(|e| {
warn!(schedule_id = %id, "fire_schedule record_run: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?;
let next_fire_at = compute_next_fire_after_tz(
&schedule.cron_expression,
&now,
schedule.timezone.as_deref(),
);
store
.update_after_fire(&id, now, next_fire_at)
.await
.map_err(|e| {
warn!(schedule_id = %id, "fire_schedule update_after_fire: {e}");
ApiError(VaporaError::InternalError(e.to_string()))
})?;
info!(
schedule_id = %id,
template = %schedule.template_name,
workflow_id = %workflow_id,
"Schedule manually fired via API"
);
Ok((StatusCode::CREATED, Json(ScheduleRunResponse::from(run))))
}


@ -4,7 +4,7 @@ use std::sync::Arc;
use vapora_rlm::storage::SurrealDBStorage;
use vapora_rlm::RLMEngine;
use vapora_workflow_engine::{ScheduleStore, WorkflowOrchestrator};
use crate::services::{
AgentService, ProjectService, ProposalService, ProviderAnalyticsService, TaskService,
@ -20,6 +20,7 @@ pub struct AppState {
pub provider_analytics_service: Arc<ProviderAnalyticsService>,
pub workflow_orchestrator: Option<Arc<WorkflowOrchestrator>>,
pub rlm_engine: Option<Arc<RLMEngine<SurrealDBStorage>>>,
pub schedule_store: Option<Arc<ScheduleStore>>,
}
impl AppState {
@ -39,6 +40,7 @@ impl AppState {
provider_analytics_service: Arc::new(provider_analytics_service),
workflow_orchestrator: None,
rlm_engine: None,
schedule_store: None,
}
}
@ -54,4 +56,10 @@ impl AppState {
self.rlm_engine = Some(rlm_engine);
self
}
/// Add schedule store to state
pub fn with_schedule_store(mut self, store: Arc<ScheduleStore>) -> Self {
self.schedule_store = Some(store);
self
}
}


@ -290,6 +290,7 @@ mod tests {
approval_required: false,
compensation_agents: None,
}],
schedule: None,
};
let instance = WorkflowInstance::new(&config, serde_json::json!({}));


@ -19,6 +19,7 @@ use clap::Parser;
use tower_http::cors::{Any, CorsLayer};
use tracing::{info, Level};
use vapora_swarm::{SwarmCoordinator, SwarmMetrics};
use vapora_workflow_engine::ScheduleStore;
use crate::api::AppState;
use crate::config::Config;
@ -104,6 +105,10 @@ async fn main() -> Result<()> {
)?);
info!("RLM engine initialized for Phase 8");
// Initialize schedule store (backed by the same SurrealDB connection)
let schedule_store = Arc::new(ScheduleStore::new(Arc::new(db.clone())));
info!("ScheduleStore initialized for autonomous scheduling");
// Create application state
let app_state = AppState::new(
project_service,
@ -112,7 +117,8 @@ async fn main() -> Result<()> {
proposal_service,
provider_analytics_service,
)
.with_rlm_engine(rlm_engine)
.with_schedule_store(schedule_store);
// Create SwarmMetrics for Prometheus monitoring
let metrics = match SwarmMetrics::new() {
@ -331,6 +337,23 @@ async fn main() -> Result<()> {
.route("/api/v1/rlm/documents", post(api::rlm::load_document))
.route("/api/v1/rlm/query", post(api::rlm::query_document))
.route("/api/v1/rlm/analyze", post(api::rlm::analyze_document))
// Schedule endpoints
.route("/api/v1/schedules", get(api::schedules::list_schedules))
.route(
"/api/v1/schedules/:id",
get(api::schedules::get_schedule)
.put(api::schedules::put_schedule)
.patch(api::schedules::patch_schedule)
.delete(api::schedules::delete_schedule),
)
.route(
"/api/v1/schedules/:id/runs",
get(api::schedules::list_schedule_runs),
)
.route(
"/api/v1/schedules/:id/fire",
post(api::schedules::fire_schedule),
)
// Apply CORS, state, and extensions
.layer(Extension(swarm_coordinator))
.layer(cors)


@ -50,6 +50,10 @@ surrealdb = { workspace = true }
# Authorization
cedar-policy = "4.9"
# Scheduling
cron = "0.12"
chrono-tz = "0.10"
[dev-dependencies]
mockall = { workspace = true }
wiremock = { workspace = true }

View File

@ -1,4 +1,5 @@
use std::path::Path;
use std::str::FromStr;
use serde::{Deserialize, Serialize};
@ -24,6 +25,31 @@ pub struct WorkflowConfig {
pub name: String,
pub trigger: String,
pub stages: Vec<StageConfig>,
#[serde(default)]
pub schedule: Option<ScheduleConfig>,
}
/// Cron-based scheduling configuration for `trigger = "schedule"` workflows.
#[derive(Debug, Clone, Deserialize)]
pub struct ScheduleConfig {
/// 5-field standard cron (`min hour dom month dow`) or 7-field cron crate
/// format (`sec min hour dom month dow year`).
pub cron: String,
/// IANA timezone identifier for cron evaluation (e.g.
/// `"America/New_York"`). Defaults to UTC when absent.
#[serde(default)]
pub timezone: Option<String>,
/// Allow a new instance to start while a previous one is still running.
#[serde(default)]
pub allow_concurrent: bool,
/// Fire all missed slots on restart (capped at 10). When false, only the
/// next slot is fired and missed slots are discarded.
#[serde(default)]
pub catch_up: bool,
/// Optional JSON object merged into each triggered workflow's initial
/// context.
#[serde(default)]
pub initial_context: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -75,6 +101,13 @@ impl WorkflowsConfig {
.into());
}
}
// Trigger/schedule alignment: "schedule" requires a [schedule] block
// and a parseable cron expression (fail-fast rather than silently
// skipping at runtime).
if workflow.trigger == "schedule" {
validate_schedule_config(&workflow.name, &workflow.schedule)?;
}
}
Ok(())
@ -85,6 +118,44 @@ impl WorkflowsConfig {
}
}
fn validate_schedule_config(
workflow_name: &str,
schedule: &Option<ScheduleConfig>,
) -> std::result::Result<(), ConfigError> {
let sc = schedule.as_ref().ok_or_else(|| {
ConfigError::Invalid(format!(
"Workflow '{}' has trigger = \"schedule\" but no [schedule] block",
workflow_name
))
})?;
// Validate cron expression (inline normalisation avoids circular dep with
// schedule.rs).
let normalized = match sc.cron.split_whitespace().count() {
5 => format!("0 {} *", sc.cron),
6 => format!("{} *", sc.cron),
_ => sc.cron.clone(),
};
cron::Schedule::from_str(&normalized).map_err(|e| {
ConfigError::Invalid(format!(
"Workflow '{}' has invalid cron '{}': {}",
workflow_name, sc.cron, e
))
})?;
// Validate timezone when provided.
if let Some(tz) = &sc.timezone {
tz.parse::<chrono_tz::Tz>().map_err(|_| {
ConfigError::Invalid(format!(
"Workflow '{}' has invalid timezone '{}': not a valid IANA identifier",
workflow_name, tz
))
})?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@ -190,4 +261,89 @@ agents = ["agent2"]
assert!(config.get_workflow("workflow_b").is_some());
assert!(config.get_workflow("nonexistent").is_none());
}
#[test]
fn test_schedule_trigger_valid() {
let toml_str = r#"
[engine]
max_parallel_tasks = 4
workflow_timeout = 3600
approval_gates_enabled = false
[[workflows]]
name = "nightly_analysis"
trigger = "schedule"
[workflows.schedule]
cron = "0 2 * * *"
allow_concurrent = false
catch_up = false
[[workflows.stages]]
name = "analyze"
agents = ["analyst"]
"#;
let config: WorkflowsConfig = toml::from_str(toml_str).unwrap();
assert!(
config.validate().is_ok(),
"Valid schedule config should pass"
);
let wf = config.get_workflow("nightly_analysis").unwrap();
let sc = wf.schedule.as_ref().unwrap();
assert_eq!(sc.cron, "0 2 * * *");
assert!(!sc.allow_concurrent);
assert!(!sc.catch_up);
}
#[test]
fn test_schedule_trigger_missing_block() {
let toml_str = r#"
[engine]
max_parallel_tasks = 4
workflow_timeout = 3600
approval_gates_enabled = false
[[workflows]]
name = "no_schedule_block"
trigger = "schedule"
[[workflows.stages]]
name = "work"
agents = ["worker"]
"#;
let config: WorkflowsConfig = toml::from_str(toml_str).unwrap();
assert!(
config.validate().is_err(),
"schedule trigger without [schedule] block must fail validation"
);
}
#[test]
fn test_schedule_trigger_invalid_cron() {
let toml_str = r#"
[engine]
max_parallel_tasks = 4
workflow_timeout = 3600
approval_gates_enabled = false
[[workflows]]
name = "bad_cron"
trigger = "schedule"
[workflows.schedule]
cron = "not a valid cron expression"
[[workflows.stages]]
name = "work"
agents = ["worker"]
"#;
let config: WorkflowsConfig = toml::from_str(toml_str).unwrap();
assert!(
config.validate().is_err(),
"Invalid cron expression must fail validation"
);
}
}


@ -49,6 +49,18 @@ pub enum WorkflowError {
#[error("Internal error: {0}")]
Internal(String),
#[error("Schedule error: {0}")]
Schedule(#[from] ScheduleError),
}
#[derive(Error, Debug)]
pub enum ScheduleError {
#[error("Invalid cron expression '{expr}': {reason}")]
InvalidCron { expr: String, reason: String },
#[error("Schedule not found: {0}")]
NotFound(String),
}
#[derive(Error, Debug)]


@ -226,6 +226,7 @@ mod tests {
compensation_agents: None,
},
],
schedule: None,
}
}


@ -31,15 +31,25 @@ pub mod metrics;
pub mod orchestrator;
pub mod persistence;
pub mod saga;
pub mod schedule;
pub mod schedule_store;
pub mod scheduler;
pub mod stage;
pub use artifact::{Artifact, ArtifactType};
pub use auth::CedarAuthorizer;
pub use config::{EngineConfig, ScheduleConfig, StageConfig, WorkflowConfig, WorkflowsConfig};
pub use error::{ConfigError, Result, ScheduleError, WorkflowError};
pub use instance::{WorkflowInstance, WorkflowStatus};
pub use metrics::WorkflowMetrics;
pub use orchestrator::WorkflowOrchestrator;
pub use persistence::SurrealWorkflowStore;
pub use saga::SagaCompensator;
pub use schedule::{
compute_next_fire_after, compute_next_fire_after_tz, compute_next_fire_at,
compute_next_fire_at_tz, validate_cron_expression, validate_timezone, RunStatus, ScheduleRun,
ScheduledWorkflow,
};
pub use schedule_store::ScheduleStore;
pub use scheduler::WorkflowScheduler;
pub use stage::{StageState, StageStatus, TaskState, TaskStatus};


@ -10,6 +10,11 @@ pub struct WorkflowMetrics {
pub active_workflows: IntGauge,
pub stage_duration_seconds: Histogram,
pub workflow_duration_seconds: Histogram,
// Scheduling subsystem
pub schedules_fired: Counter,
pub schedules_skipped: Counter,
pub schedules_failed: Counter,
pub active_schedules: IntGauge,
}
impl WorkflowMetrics {
@ -46,6 +51,22 @@ impl WorkflowMetrics {
)
.buckets(vec![60.0, 300.0, 600.0, 1800.0, 3600.0]),
)?,
schedules_fired: register_counter!(
"vapora_schedules_fired_total",
"Total schedule fires that launched a workflow"
)?,
schedules_skipped: register_counter!(
"vapora_schedules_skipped_total",
"Total schedule fires skipped due to allow_concurrent=false"
)?,
schedules_failed: register_counter!(
"vapora_schedules_failed_total",
"Total schedule fires that failed to start a workflow"
)?,
active_schedules: register_int_gauge!(
"vapora_active_schedules",
"Number of enabled scheduled workflows"
)?,
})
}
@ -57,6 +78,10 @@ impl WorkflowMetrics {
registry.register(Box::new(self.active_workflows.clone()))?;
registry.register(Box::new(self.stage_duration_seconds.clone()))?;
registry.register(Box::new(self.workflow_duration_seconds.clone()))?;
registry.register(Box::new(self.schedules_fired.clone()))?;
registry.register(Box::new(self.schedules_skipped.clone()))?;
registry.register(Box::new(self.schedules_failed.clone()))?;
registry.register(Box::new(self.active_schedules.clone()))?;
Ok(())
}
}


@ -6,6 +6,7 @@ use futures::StreamExt;
use serde_json::Value;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
use vapora_agents::messages::{AgentMessage, TaskCompleted, TaskFailed};
use vapora_knowledge_graph::persistence::KGPersistence;
@ -19,6 +20,9 @@ use crate::instance::{WorkflowInstance, WorkflowStatus};
use crate::metrics::WorkflowMetrics;
use crate::persistence::SurrealWorkflowStore;
use crate::saga::SagaCompensator;
use crate::schedule::ScheduledWorkflow;
use crate::schedule_store::ScheduleStore;
use crate::scheduler::WorkflowScheduler;
use crate::stage::{StageState, StageStatus, TaskState};
pub struct WorkflowOrchestrator {
@ -653,6 +657,49 @@ impl WorkflowOrchestrator {
Ok(())
}
/// Returns true if any non-terminal workflow instance is running for the
/// given template name. Used by `WorkflowScheduler` to enforce
/// `allow_concurrent = false`.
pub fn has_active_workflow_for_template(&self, template_name: &str) -> bool {
self.active_workflows
.iter()
.any(|e| e.value().template_name == template_name)
}
/// Build a `WorkflowScheduler` seeded from the TOML-configured scheduled
/// workflows.
///
/// TOML is the source of truth for static config; the DB owns runtime state
/// (`last_fired_at`, `runs_count`, `next_fire_at`). The returned
/// `watch::Sender<bool>` drives graceful shutdown: `sender.send(true)`
/// terminates the scheduler loop.
pub async fn build_scheduler(
self: Arc<Self>,
db: Arc<Surreal<Client>>,
) -> Result<(WorkflowScheduler, watch::Sender<bool>)> {
let store = Arc::new(ScheduleStore::new(Arc::clone(&db)));
for wf in self
.config
.workflows
.iter()
.filter(|w| w.trigger == "schedule")
{
if let Some(sc) = &wf.schedule {
let entry = ScheduledWorkflow::from_config(&wf.name, sc);
store.upsert(&entry).await?;
}
}
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let nats = Some(Arc::clone(&self.nats));
let metrics = Arc::clone(&self.metrics);
let orchestrator = Arc::clone(&self);
let scheduler = WorkflowScheduler::new(store, orchestrator, nats, metrics, shutdown_rx);
Ok((scheduler, shutdown_tx))
}
pub fn list_templates(&self) -> Vec<String> {
self.config
.workflows


@ -0,0 +1,248 @@
use std::str::FromStr;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::config::ScheduleConfig;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RunStatus {
Fired,
Skipped,
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleRun {
pub id: String,
pub schedule_id: String,
pub workflow_instance_id: Option<String>,
pub fired_at: DateTime<Utc>,
pub status: RunStatus,
pub notes: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledWorkflow {
pub id: String,
pub template_name: String,
pub cron_expression: String,
pub initial_context: serde_json::Value,
pub enabled: bool,
pub allow_concurrent: bool,
pub catch_up: bool,
/// IANA timezone identifier for cron evaluation (e.g. "America/New_York").
/// When `None`, UTC is used.
#[serde(default)]
pub timezone: Option<String>,
pub last_fired_at: Option<DateTime<Utc>>,
pub next_fire_at: Option<DateTime<Utc>>,
pub runs_count: u64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl ScheduledWorkflow {
/// Build from TOML config, deriving a stable ID from the template name
/// so repeated calls to `store.upsert` are idempotent.
pub fn from_config(template_name: &str, sc: &ScheduleConfig) -> Self {
let next_fire_at = compute_next_fire_at_tz(&sc.cron, sc.timezone.as_deref());
Self {
id: template_name.to_string(),
template_name: template_name.to_string(),
cron_expression: sc.cron.clone(),
initial_context: sc
.initial_context
.clone()
.unwrap_or(serde_json::Value::Object(Default::default())),
enabled: true,
allow_concurrent: sc.allow_concurrent,
catch_up: sc.catch_up,
timezone: sc.timezone.clone(),
last_fired_at: None,
next_fire_at,
runs_count: 0,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
/// True when the scheduled fire time has arrived or passed.
pub fn is_due(&self, now: DateTime<Utc>) -> bool {
self.next_fire_at.is_some_and(|t| t <= now)
}
}
/// Normalise a cron expression to the 7-field format required by the `cron`
/// crate (`sec min hour dom month dow year`).
///
/// - 5-field standard shell cron (`min hour dom month dow`) → prepend `0`
/// (seconds) and append `*` (any year).
/// - 6-field (`sec min hour dom month dow`) → append `*` (any year).
/// - 7-field → unchanged.
pub(crate) fn normalize_cron(expr: &str) -> String {
match expr.split_whitespace().count() {
5 => format!("0 {} *", expr),
6 => format!("{} *", expr),
_ => expr.to_string(),
}
}
/// Validate a cron expression (5-field, 6-field, or 7-field).
///
/// Returns `Ok(())` if parseable, `Err(description)` otherwise.
pub fn validate_cron_expression(expr: &str) -> Result<(), String> {
let normalized = normalize_cron(expr);
cron::Schedule::from_str(&normalized)
.map(|_| ())
.map_err(|e| e.to_string())
}
/// Validate an IANA timezone string (e.g. `"America/New_York"`).
///
/// Returns `Ok(())` when recognised, `Err(description)` otherwise.
pub fn validate_timezone(tz: &str) -> Result<(), String> {
tz.parse::<chrono_tz::Tz>()
.map(|_| ())
.map_err(|_| format!("'{}' is not a valid IANA timezone identifier", tz))
}
/// Compute the next scheduled fire time after UTC now, evaluated in `tz`.
///
/// Returns `None` if the expression is invalid or has no future occurrences.
/// When `tz` is `None`, UTC is used.
pub fn compute_next_fire_at_tz(expr: &str, tz: Option<&str>) -> Option<DateTime<Utc>> {
let normalized = normalize_cron(expr);
let schedule = cron::Schedule::from_str(&normalized).ok()?;
match tz.and_then(|s| s.parse::<chrono_tz::Tz>().ok()) {
Some(tz) => schedule.upcoming(tz).next().map(|t| t.with_timezone(&Utc)),
None => schedule.upcoming(Utc).next(),
}
}
/// Compute the next scheduled fire time after `after`, evaluated in `tz`.
///
/// When `tz` is `None`, UTC is used.
pub fn compute_next_fire_after_tz(
expr: &str,
after: &DateTime<Utc>,
tz: Option<&str>,
) -> Option<DateTime<Utc>> {
let normalized = normalize_cron(expr);
let schedule = cron::Schedule::from_str(&normalized).ok()?;
match tz.and_then(|s| s.parse::<chrono_tz::Tz>().ok()) {
Some(tz) => schedule
.after(&after.with_timezone(&tz))
.next()
.map(|t| t.with_timezone(&Utc)),
None => schedule.after(after).next(),
}
}
/// UTC convenience wrapper — delegates to `compute_next_fire_at_tz` with no
/// timezone.
pub fn compute_next_fire_at(expr: &str) -> Option<DateTime<Utc>> {
compute_next_fire_at_tz(expr, None)
}
/// UTC convenience wrapper — delegates to `compute_next_fire_after_tz` with no
/// timezone.
pub fn compute_next_fire_after(expr: &str, after: &DateTime<Utc>) -> Option<DateTime<Utc>> {
compute_next_fire_after_tz(expr, after, None)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_run_status_serde() {
for status in [RunStatus::Fired, RunStatus::Skipped, RunStatus::Failed] {
let json = serde_json::to_string(&status).unwrap();
let round_tripped: RunStatus = serde_json::from_str(&json).unwrap();
assert_eq!(status, round_tripped);
}
}
#[test]
fn test_schedule_is_due() {
let now = Utc::now();
let past = now - chrono::Duration::seconds(60);
let future = now + chrono::Duration::seconds(60);
let mut s = ScheduledWorkflow {
id: "test".to_string(),
template_name: "test".to_string(),
cron_expression: "0 * * * *".to_string(),
initial_context: serde_json::json!({}),
enabled: true,
allow_concurrent: false,
catch_up: false,
timezone: None,
last_fired_at: None,
next_fire_at: None,
runs_count: 0,
created_at: now,
updated_at: now,
};
assert!(!s.is_due(now), "None next_fire_at should not be due");
s.next_fire_at = Some(past);
assert!(s.is_due(now), "Past next_fire_at should be due");
s.next_fire_at = Some(now);
assert!(s.is_due(now), "Exact now should be due");
s.next_fire_at = Some(future);
assert!(!s.is_due(now), "Future next_fire_at should not be due");
}
#[test]
fn test_validate_timezone_valid() {
assert!(validate_timezone("America/New_York").is_ok());
assert!(validate_timezone("Europe/London").is_ok());
assert!(validate_timezone("Asia/Tokyo").is_ok());
assert!(validate_timezone("UTC").is_ok());
}
#[test]
fn test_validate_timezone_invalid() {
assert!(validate_timezone("Not/ATimezone").is_err());
assert!(validate_timezone("New York").is_err());
assert!(validate_timezone("").is_err());
}
#[test]
fn test_compute_next_fire_at_tz_utc() {
// UTC fallback: any future result is valid
let result = compute_next_fire_at_tz("0 9 * * *", None);
assert!(result.is_some(), "should produce a future time");
assert!(result.unwrap() > Utc::now());
}
#[test]
fn test_compute_next_fire_at_tz_named() {
let result_tz = compute_next_fire_at_tz("0 9 * * *", Some("America/New_York"));
let result_utc = compute_next_fire_at_tz("0 9 * * *", None);
// Both must be Some and in the future.
assert!(result_tz.is_some());
assert!(result_utc.is_some());
// They represent 09:00 in different timezones, so they must differ
// (EST = UTC-5, EDT = UTC-4; New_York 09:00 != UTC 09:00).
assert_ne!(
result_tz.unwrap(),
result_utc.unwrap(),
"timezone-aware result must differ from UTC result"
);
}
#[test]
fn test_compute_next_fire_at_tz_invalid_tz_fallback() {
// Unknown timezone is silently treated as UTC (parse fails, match hits None
// arm).
let result = compute_next_fire_at_tz("0 9 * * *", Some("Invalid/Zone"));
assert!(result.is_some(), "falls back to UTC on unknown timezone");
}
}


@ -0,0 +1,384 @@
use std::sync::Arc;
use chrono::{DateTime, Utc};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tracing::debug;
use crate::error::{Result, WorkflowError};
use crate::schedule::{ScheduleRun, ScheduledWorkflow};
/// Persists `ScheduledWorkflow` and `ScheduleRun` records to SurrealDB.
///
/// Follows the same injection pattern as `SurrealWorkflowStore` — receives the
/// shared DB connection; does not create its own. Tables are defined by
/// migration 010.
pub struct ScheduleStore {
db: Arc<Surreal<Client>>,
}
impl ScheduleStore {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
/// Idempotent upsert for a TOML-seeded schedule.
///
/// - First call: creates the record with full initial state (including
/// computed `next_fire_at`).
/// - Subsequent calls: merges only static config fields (`cron_expression`,
/// `allow_concurrent`, `catch_up`, `initial_context`), leaving runtime
/// state (`last_fired_at`, `next_fire_at`, `runs_count`) intact.
pub async fn upsert(&self, s: &ScheduledWorkflow) -> Result<()> {
let existing: Option<serde_json::Value> = self
.db
.select(("scheduled_workflows", &*s.id))
.await
.map_err(|e| WorkflowError::DatabaseError(format!("check schedule {}: {e}", s.id)))?;
if existing.is_none() {
let json = serde_json::to_value(s).map_err(|e| {
WorkflowError::DatabaseError(format!("serialize schedule {}: {e}", s.id))
})?;
let _: Option<serde_json::Value> = self
.db
.create(("scheduled_workflows", &*s.id))
.content(json)
.await
.map_err(|e| {
WorkflowError::DatabaseError(format!("create schedule {}: {e}", s.id))
})?;
debug!(schedule_id = %s.id, template = %s.template_name, "Schedule created");
} else {
let _: Option<serde_json::Value> = self
.db
.update(("scheduled_workflows", &*s.id))
.merge(serde_json::json!({
"cron_expression": s.cron_expression,
"allow_concurrent": s.allow_concurrent,
"catch_up": s.catch_up,
"initial_context": s.initial_context,
"timezone": s.timezone,
"updated_at": Utc::now(),
}))
.await
.map_err(|e| {
WorkflowError::DatabaseError(format!("merge schedule {}: {e}", s.id))
})?;
debug!(schedule_id = %s.id, template = %s.template_name, "Schedule config merged");
}
Ok(())
}
/// Load all enabled schedules. Filters in Rust to avoid complex SurrealQL
/// against serde-tagged enums (same rationale as `SurrealWorkflowStore`).
pub async fn load_enabled(&self) -> Result<Vec<ScheduledWorkflow>> {
let mut response = self
.db
.query("SELECT * FROM scheduled_workflows")
.await
.map_err(|e| WorkflowError::DatabaseError(format!("load_enabled query: {e}")))?;
let raw: Vec<serde_json::Value> = response
.take(0)
.map_err(|e| WorkflowError::DatabaseError(format!("load_enabled take: {e}")))?;
let schedules: Vec<ScheduledWorkflow> = raw
.into_iter()
.filter_map(|v| serde_json::from_value(v).ok())
.collect();
Ok(schedules.into_iter().filter(|s| s.enabled).collect())
}
/// Atomically advance runtime state after one or more fires.
///
/// Uses `runs_count = runs_count + 1` in SurrealQL to avoid a read-modify-
/// write race when multiple scheduler instances run (future distributed
/// deployments).
pub async fn update_after_fire(
&self,
id: &str,
fired_at: DateTime<Utc>,
next_fire_at: Option<DateTime<Utc>>,
) -> Result<()> {
self.db
.query(
"UPDATE type::thing('scheduled_workflows', $id) SET last_fired_at = $fired_at, \
next_fire_at = $next_fire_at, runs_count = runs_count + 1, updated_at = \
time::now()",
)
.bind(("id", id.to_string()))
.bind(("fired_at", fired_at))
.bind(("next_fire_at", next_fire_at))
.await
.map_err(|e| WorkflowError::DatabaseError(format!("update_after_fire {id}: {e}")))?;
debug!(schedule_id = %id, "Schedule runtime state updated");
Ok(())
}
/// Load a single schedule by its ID.
pub async fn load_one(&self, id: &str) -> Result<Option<ScheduledWorkflow>> {
let raw: Option<serde_json::Value> = self
.db
.select(("scheduled_workflows", id))
.await
.map_err(|e| WorkflowError::DatabaseError(format!("load_one schedule {id}: {e}")))?;
raw.map(|v| {
serde_json::from_value(v).map_err(|e| {
WorkflowError::DatabaseError(format!("deserialize schedule {id}: {e}"))
})
})
.transpose()
}
/// Load all schedules (enabled and disabled).
pub async fn load_all(&self) -> Result<Vec<ScheduledWorkflow>> {
let mut response = self
.db
.query("SELECT * FROM scheduled_workflows")
.await
.map_err(|e| WorkflowError::DatabaseError(format!("load_all query: {e}")))?;
let raw: Vec<serde_json::Value> = response
.take(0)
.map_err(|e| WorkflowError::DatabaseError(format!("load_all take: {e}")))?;
Ok(raw
.into_iter()
.filter_map(|v| serde_json::from_value(v).ok())
.collect())
}
/// Full-replace upsert for `PUT` semantics.
///
/// Replaces all config fields and recomputes `next_fire_at`. Preserves
/// `last_fired_at` and `runs_count` from the existing record so PUT
/// doesn't erase operational history.
pub async fn full_upsert(&self, s: &ScheduledWorkflow) -> Result<()> {
let existing: Option<serde_json::Value> = self
.db
.select(("scheduled_workflows", &*s.id))
.await
.map_err(|e| WorkflowError::DatabaseError(format!("check schedule {}: {e}", s.id)))?;
if existing.is_none() {
let json = serde_json::to_value(s).map_err(|e| {
WorkflowError::DatabaseError(format!("serialize schedule {}: {e}", s.id))
})?;
let _: Option<serde_json::Value> = self
.db
.create(("scheduled_workflows", &*s.id))
.content(json)
.await
.map_err(|e| {
WorkflowError::DatabaseError(format!("create schedule {}: {e}", s.id))
})?;
} else {
// Replace all config fields but preserve operational counters.
let _: Option<serde_json::Value> = self
.db
.update(("scheduled_workflows", &*s.id))
.merge(serde_json::json!({
"template_name": s.template_name,
"cron_expression": s.cron_expression,
"initial_context": s.initial_context,
"enabled": s.enabled,
"allow_concurrent": s.allow_concurrent,
"catch_up": s.catch_up,
"timezone": s.timezone,
"next_fire_at": s.next_fire_at,
"updated_at": Utc::now(),
}))
.await
.map_err(|e| {
WorkflowError::DatabaseError(format!("full_upsert schedule {}: {e}", s.id))
})?;
}
debug!(schedule_id = %s.id, "Schedule full-upserted");
Ok(())
}
/// Partial update: only touches the fields provided (PATCH semantics).
///
/// If `cron_expression` changes, the caller must compute and pass the new
/// `next_fire_at`.
pub async fn patch(
&self,
id: &str,
patch: serde_json::Value,
) -> Result<Option<ScheduledWorkflow>> {
let _: Option<serde_json::Value> = self
.db
.update(("scheduled_workflows", id))
.merge(patch)
.await
.map_err(|e| WorkflowError::DatabaseError(format!("patch schedule {id}: {e}")))?;
self.load_one(id).await
}
/// Delete a schedule definition permanently.
pub async fn delete(&self, id: &str) -> Result<()> {
let _: Option<serde_json::Value> = self
.db
.delete(("scheduled_workflows", id))
.await
.map_err(|e| WorkflowError::DatabaseError(format!("delete schedule {id}: {e}")))?;
debug!(schedule_id = %id, "Schedule deleted");
Ok(())
}
/// Load the run history for a specific schedule, ordered by fired_at desc.
pub async fn load_runs(&self, schedule_id: &str) -> Result<Vec<ScheduleRun>> {
let mut response = self
.db
.query(
"SELECT * FROM schedule_runs WHERE schedule_id = $schedule_id ORDER BY fired_at \
DESC LIMIT 100",
)
.bind(("schedule_id", schedule_id.to_string()))
.await
.map_err(|e| WorkflowError::DatabaseError(format!("load_runs query: {e}")))?;
let raw: Vec<serde_json::Value> = response
.take(0)
.map_err(|e| WorkflowError::DatabaseError(format!("load_runs take: {e}")))?;
Ok(raw
.into_iter()
.filter_map(|v| serde_json::from_value(v).ok())
.collect())
}
/// Attempt to acquire an exclusive fire-lock for distributed deployments.
///
/// Uses a conditional SurrealDB UPDATE: succeeds only when the record has
/// no current lock holder, or the existing lock has expired (> 120 s old).
/// Returns `true` if the lock was acquired by this call, `false` if another
/// instance already holds it.
///
/// The 120-second TTL means a crashed instance releases its lock
/// automatically on the next scheduler tick after that window.
pub async fn try_acquire_fire_lock(
&self,
id: &str,
instance_id: &str,
now: &chrono::DateTime<Utc>,
) -> Result<bool> {
let expiry = *now - chrono::Duration::seconds(120);
let mut resp = self
.db
.query(
"UPDATE type::thing('scheduled_workflows', $id) SET locked_by = $instance_id, \
locked_at = $now WHERE locked_by IS NONE OR locked_at < $expiry",
)
.bind(("id", id.to_string()))
.bind(("instance_id", instance_id.to_string()))
.bind(("now", *now))
.bind(("expiry", expiry))
.await
.map_err(|e| {
WorkflowError::DatabaseError(format!("try_acquire_fire_lock {id}: {e}"))
})?;
let records: Vec<serde_json::Value> = resp.take(0).map_err(|e| {
WorkflowError::DatabaseError(format!("try_acquire_fire_lock take {id}: {e}"))
})?;
Ok(!records.is_empty())
}
/// Release the fire-lock held by `instance_id`.
///
/// The WHERE guard ensures an instance can only release its own lock,
/// preventing a slow instance from accidentally clearing another's lock
/// after its TTL expired and a second instance re-acquired it.
pub async fn release_fire_lock(&self, id: &str, instance_id: &str) -> Result<()> {
self.db
.query(
"UPDATE type::thing('scheduled_workflows', $id) SET locked_by = NONE, locked_at = \
NONE WHERE locked_by = $instance_id",
)
.bind(("id", id.to_string()))
.bind(("instance_id", instance_id.to_string()))
.await
.map_err(|e| WorkflowError::DatabaseError(format!("release_fire_lock {id}: {e}")))?;
debug!(schedule_id = %id, instance_id = %instance_id, "Fire lock released");
Ok(())
}
/// Append a run record to the immutable audit log.
pub async fn record_run(&self, run: &ScheduleRun) -> Result<()> {
let json = serde_json::to_value(run)
.map_err(|e| WorkflowError::DatabaseError(format!("serialize run {}: {e}", run.id)))?;
let _: Option<serde_json::Value> = self
.db
.create(("schedule_runs", &*run.id))
.content(json)
.await
.map_err(|e| WorkflowError::DatabaseError(format!("record_run {}: {e}", run.id)))?;
debug!(
run_id = %run.id,
schedule = %run.schedule_id,
status = ?run.status,
"Schedule run recorded"
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schedule::RunStatus;
#[test]
fn test_schedule_serialization() {
let now = Utc::now();
let s = ScheduledWorkflow {
id: "nightly_analysis".to_string(),
template_name: "nightly_analysis".to_string(),
cron_expression: "0 2 * * *".to_string(),
initial_context: serde_json::json!({"env": "prod"}),
enabled: true,
allow_concurrent: false,
catch_up: false,
timezone: None,
last_fired_at: Some(now),
next_fire_at: Some(now + chrono::Duration::hours(24)),
runs_count: 42,
created_at: now,
updated_at: now,
};
let json = serde_json::to_string(&s).expect("serialize");
let decoded: ScheduledWorkflow = serde_json::from_str(&json).expect("deserialize");
assert_eq!(decoded.id, s.id);
assert_eq!(decoded.template_name, s.template_name);
assert_eq!(decoded.cron_expression, s.cron_expression);
assert_eq!(decoded.runs_count, 42);
assert!(!decoded.allow_concurrent);
assert!(!decoded.catch_up);
let run = ScheduleRun {
id: uuid::Uuid::new_v4().to_string(),
schedule_id: s.id.clone(),
workflow_instance_id: Some("wf-abc".to_string()),
fired_at: now,
status: RunStatus::Fired,
notes: None,
};
let run_json = serde_json::to_string(&run).expect("serialize run");
let decoded_run: ScheduleRun = serde_json::from_str(&run_json).expect("deserialize run");
assert_eq!(decoded_run.status, RunStatus::Fired);
}
}


@ -0,0 +1,388 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use tokio::sync::watch;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, warn};
use crate::error::{Result, ScheduleError, WorkflowError};
use crate::metrics::WorkflowMetrics;
use crate::orchestrator::WorkflowOrchestrator;
use crate::schedule::{
compute_next_fire_after_tz, compute_next_fire_at_tz, normalize_cron, RunStatus, ScheduleRun,
ScheduledWorkflow,
};
use crate::schedule_store::ScheduleStore;
pub struct WorkflowScheduler {
store: Arc<ScheduleStore>,
orchestrator: Arc<WorkflowOrchestrator>,
nats: Option<Arc<async_nats::Client>>,
metrics: Arc<WorkflowMetrics>,
tick_interval: Duration,
shutdown: watch::Receiver<bool>,
/// UUID identifying this process instance. Used as the lock owner in
/// distributed deployments to prevent double-fires across instances.
instance_id: String,
}
impl WorkflowScheduler {
pub fn new(
store: Arc<ScheduleStore>,
orchestrator: Arc<WorkflowOrchestrator>,
nats: Option<Arc<async_nats::Client>>,
metrics: Arc<WorkflowMetrics>,
shutdown: watch::Receiver<bool>,
) -> Self {
Self {
store,
orchestrator,
nats,
metrics,
tick_interval: Duration::from_secs(30),
shutdown,
instance_id: uuid::Uuid::new_v4().to_string(),
}
}
/// Override the default 30-second tick interval (useful in tests).
pub fn with_tick_interval(mut self, interval: Duration) -> Self {
self.tick_interval = interval;
self
}
/// Drive the scheduling loop until the shutdown signal fires.
///
/// Call from a `tokio::spawn` — the returned future completes when
/// `watch::Sender::send(true)` is called on the paired sender.
pub async fn run(self: Arc<Self>) {
let mut shutdown = self.shutdown.clone();
let mut interval = tokio::time::interval(self.tick_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
info!(
instance_id = %self.instance_id,
tick = ?self.tick_interval,
"WorkflowScheduler started"
);
loop {
tokio::select! {
changed = shutdown.changed() => {
if changed.is_ok() && *shutdown.borrow() {
info!(instance_id = %self.instance_id, "WorkflowScheduler shutting down");
break;
}
}
_ = interval.tick() => {
let now = Utc::now();
match self.store.load_enabled().await {
Ok(schedules) => {
for s in schedules.into_iter().filter(|s| s.is_due(now)) {
let scheduler = Arc::clone(&self);
let s = s.clone();
tokio::spawn(async move {
if let Err(e) = scheduler.fire_schedule(&s, now).await {
error!(
schedule_id = %s.id,
template = %s.template_name,
error = %e,
"Schedule fire error"
);
}
});
}
}
Err(e) => error!(error = %e, "Failed to load enabled schedules"),
}
}
}
}
}
async fn fire_schedule(&self, s: &ScheduledWorkflow, now: DateTime<Utc>) -> Result<()> {
// Acquire distributed lock — prevents double-fires in multi-instance
// deployments. Lock has a 120-second TTL enforced by the store.
let acquired = self
.store
.try_acquire_fire_lock(&s.id, &self.instance_id, &now)
.await?;
if !acquired {
info!(
schedule_id = %s.id,
template = %s.template_name,
instance_id = %self.instance_id,
"Fire lock held by another instance — skipping this tick"
);
self.metrics.schedules_skipped.inc();
return Ok(());
}
let result = self.fire_with_lock(s, now).await;
// Release the lock unconditionally. A failure here is non-fatal since
// the TTL will expire automatically after 120 s.
if let Err(e) = self.store.release_fire_lock(&s.id, &self.instance_id).await {
warn!(
schedule_id = %s.id,
instance_id = %self.instance_id,
error = %e,
"Failed to release schedule fire lock (TTL will expire automatically)"
);
}
result
}
/// Execute a single schedule fire — called after the distributed lock is
/// held.
async fn fire_with_lock(&self, s: &ScheduledWorkflow, now: DateTime<Utc>) -> Result<()> {
let tz = s.timezone.as_deref();
let normalized = normalize_cron(&s.cron_expression);
let cron_schedule = cron::Schedule::from_str(&normalized).map_err(|e| {
WorkflowError::Schedule(ScheduleError::InvalidCron {
expr: s.cron_expression.clone(),
reason: e.to_string(),
})
})?;
// Concurrency guard: skip if an active workflow for this template exists.
// Now safe to check without races because we hold the distributed lock.
if !s.allow_concurrent
&& self
.orchestrator
.has_active_workflow_for_template(&s.template_name)
{
let next_fire_at = compute_next_fire_at_tz(&s.cron_expression, tz);
self.store
.update_after_fire(&s.id, now, next_fire_at)
.await?;
self.store
.record_run(&ScheduleRun {
id: uuid::Uuid::new_v4().to_string(),
schedule_id: s.id.clone(),
workflow_instance_id: None,
fired_at: now,
status: RunStatus::Skipped,
notes: Some("Active workflow for template already exists".to_string()),
})
.await?;
self.metrics.schedules_skipped.inc();
info!(
schedule_id = %s.id,
template = %s.template_name,
"Schedule skipped: concurrent run in progress"
);
return Ok(());
}
// Determine which time slots to fire — timezone-aware.
let last = s.last_fired_at.unwrap_or(s.created_at);
let fire_times = compute_fire_times_tz(&cron_schedule, last, now, s.catch_up, tz);
let mut last_fire_time = now;
for &fire_time in &fire_times {
last_fire_time = fire_time;
match self
.orchestrator
.start_workflow(&s.template_name, s.initial_context.clone())
.await
{
Ok(workflow_id) => {
self.store
.record_run(&ScheduleRun {
id: uuid::Uuid::new_v4().to_string(),
schedule_id: s.id.clone(),
workflow_instance_id: Some(workflow_id.clone()),
fired_at: fire_time,
status: RunStatus::Fired,
notes: None,
})
.await?;
self.metrics.schedules_fired.inc();
info!(
schedule_id = %s.id,
template = %s.template_name,
workflow_id = %workflow_id,
fired_at = %fire_time,
timezone = ?tz,
"Schedule fired"
);
}
Err(e) => {
error!(
schedule_id = %s.id,
template = %s.template_name,
error = %e,
"Workflow start failed for scheduled fire"
);
self.store
.record_run(&ScheduleRun {
id: uuid::Uuid::new_v4().to_string(),
schedule_id: s.id.clone(),
workflow_instance_id: None,
fired_at: fire_time,
status: RunStatus::Failed,
notes: Some(e.to_string()),
})
.await?;
self.metrics.schedules_failed.inc();
}
}
}
// Advance the pointer to the next scheduled time (timezone-aware).
let next_fire_at = compute_next_fire_after_tz(&s.cron_expression, &last_fire_time, tz);
self.store
.update_after_fire(&s.id, last_fire_time, next_fire_at)
.await?;
// Publish NATS event — graceful skip when NATS is unavailable.
if let Some(nats) = &self.nats {
let event = serde_json::json!({
"type": "schedule_fired",
"schedule_id": s.id,
"template": s.template_name,
"timezone": s.timezone,
"fires": fire_times.len(),
"timestamp": Utc::now().to_rfc3339(),
});
if let Err(e) = nats
.publish("vapora.schedule.fired", event.to_string().into())
.await
{
warn!(error = %e, "Failed to publish vapora.schedule.fired");
}
}
Ok(())
}
}
/// Compute which UTC time slots to fire, optionally in a named timezone.
///
/// - `catch_up = true`: all missed slots since `last`, capped at 10, converted
/// to UTC.
/// - `catch_up = false`: always exactly one slot (`now`), timezone ignored.
///
/// When `tz` is `Some` the cron iterator runs in that timezone so that
/// e.g. "every day at 09:00 America/New_York" fires at 14:00 UTC (or 13:00
/// during EDT).
fn compute_fire_times_tz(
schedule: &cron::Schedule,
last: DateTime<Utc>,
now: DateTime<Utc>,
catch_up: bool,
tz: Option<&str>,
) -> Vec<DateTime<Utc>> {
if !catch_up {
return vec![now];
}
let collected: Vec<DateTime<Utc>> = match tz.and_then(|s| s.parse::<chrono_tz::Tz>().ok()) {
Some(tz) => schedule
.after(&last.with_timezone(&tz))
.take_while(|t| t.with_timezone(&Utc) <= now)
.take(10)
.map(|t| t.with_timezone(&Utc))
.collect(),
None => schedule
.after(&last)
.take_while(|t| t <= &now)
.take(10)
.collect(),
};
if collected.is_empty() {
vec![now]
    } else {
        collected
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::schedule::normalize_cron;

    #[test]
    fn test_normalize_cron_5field() {
        let result = normalize_cron("0 9 * * 1-5");
        assert_eq!(result, "0 0 9 * * 1-5 *");
    }

    #[test]
    fn test_normalize_cron_7field() {
        let expr = "0 0 9 * * 1-5 *";
        let result = normalize_cron(expr);
        assert_eq!(result, expr);
    }

    #[test]
    fn test_compute_fires_no_catchup() {
        let normalized = normalize_cron("0 0 * * *"); // Every day at midnight
        let schedule = cron::Schedule::from_str(&normalized).expect("valid cron");
        let now = Utc::now();
        let last = now - chrono::Duration::days(3);
        let fires = compute_fire_times_tz(&schedule, last, now, false, None);
        // Without catch_up: always exactly one fire at `now`.
        assert_eq!(fires.len(), 1);
        assert_eq!(fires[0], now);
    }

    #[test]
    fn test_compute_fires_with_catchup() {
        let normalized = normalize_cron("0 0 * * *"); // Every day at midnight
        let schedule = cron::Schedule::from_str(&normalized).expect("valid cron");
        // Last fired 5 days ago → should produce up to 5 missed midnight slots.
        let now = Utc::now();
        let last = now - chrono::Duration::days(5);
        let fires = compute_fire_times_tz(&schedule, last, now, true, None);
        assert!(fires.len() > 1, "catch_up should produce multiple fires");
        assert!(fires.len() <= 10, "catch_up is capped at 10");
        for &t in &fires {
            assert!(t <= now, "catch_up fire time must not be in the future");
        }
    }

    #[test]
    fn test_compute_fires_with_catchup_named_tz() {
        let normalized = normalize_cron("0 0 * * *"); // Every day at midnight (in tz)
        let schedule = cron::Schedule::from_str(&normalized).expect("valid cron");
        let now = Utc::now();
        let last = now - chrono::Duration::days(3);
        let fires_utc = compute_fire_times_tz(&schedule, last, now, true, None);
        let fires_tz = compute_fire_times_tz(&schedule, last, now, true, Some("America/New_York"));
        // Both produce multiple slots. The UTC times differ because midnight NY
        // is 05:00 UTC (EST) or 04:00 UTC (EDT) — same count, different instants.
        assert!(!fires_utc.is_empty());
        assert!(!fires_tz.is_empty());
        if fires_utc.len() == fires_tz.len() {
            // Timestamps must differ (different timezone means different UTC offset).
            assert_ne!(
                fires_utc[0], fires_tz[0],
                "UTC and NY midnight must be different UTC instants"
            );
        }
    }

    #[test]
    fn test_instance_id_is_unique() {
        // Each WorkflowScheduler gets a distinct UUID to prevent lock collisions.
        let id1 = uuid::Uuid::new_v4().to_string();
        let id2 = uuid::Uuid::new_v4().to_string();
        assert_ne!(id1, id2);
    }
}


@ -0,0 +1,101 @@
# ADR-0034: Autonomous Cron Scheduling — Timezone Support and Distributed Fire-Lock
**Status**: Implemented
**Date**: 2026-02-26
**Deciders**: VAPORA Team
**Technical Story**: `vapora-workflow-engine` scheduler fired cron jobs only in UTC and had no protection against double-fires in multi-instance deployments.
---
## Decision
Extend the autonomous scheduling subsystem with two independent hardening layers:
1. **Timezone-aware scheduling** (`chrono-tz`) — cron expressions evaluated in any IANA timezone, stored per-schedule, validated at API and config-load boundaries.
2. **Distributed fire-lock** — SurrealDB conditional `UPDATE ... WHERE locked_by IS NONE OR locked_at < $expiry` provides atomic, TTL-backed mutual exclusion across instances without additional infrastructure.
---
## Context
### Gaps Addressed
| Gap | Consequence |
|-----|-------------|
| UTC-only cron evaluation | `"0 9 * * *"` fires at 09:00 UTC regardless of business timezone; scheduled reports or maintenance windows drift by the UTC offset |
| No distributed coordination | Two `vapora-workflow-engine` instances reading the same `scheduled_workflows` table both fire the same schedule at the same tick |
### Why These Approaches
**`chrono-tz`** over manual UTC-offset arithmetic:
- Compile-time exhaustive enum of all IANA timezone names — invalid names are rejected at parse time.
- The `cron` crate's `Schedule::upcoming(tz)` / `Schedule::after(&dt_in_tz)` are generic over any `TimeZone`, so timezone-awareness requires no special-casing in iteration logic: pass `DateTime<chrono_tz::Tz>` instead of `DateTime<Utc>`, convert output with `.with_timezone(&Utc)`.
- DST transitions handled automatically by `chrono-tz` — no application code needed.
**SurrealDB conditional UPDATE** over external distributed lock (Redis, etcd):
- No additional infrastructure dependency.
- SurrealDB applies document-level write locking; `UPDATE record WHERE condition` is atomic — two concurrent instances race on the same document and only one succeeds (non-empty return array = lock acquired).
- 120-second TTL enforced in application code: `locked_at < $expiry` in the WHERE clause auto-expires a lock from a crashed instance within two scheduler ticks.
---
## Implementation
### New Fields
`scheduled_workflows` table gains three columns (migration 011):
| Field | Type | Purpose |
|-------|------|---------|
| `timezone` | `option<string>` | IANA identifier (`"America/New_York"`) or `NONE` for UTC |
| `locked_by` | `option<string>` | UUID of the instance holding the current fire-lock |
| `locked_at` | `option<datetime>` | When the lock was acquired; used for TTL expiry |
### Lock Protocol
```
Tick N fires schedule S:
try_acquire_fire_lock(id, instance_id, now)
→ UPDATE ... WHERE locked_by IS NONE OR locked_at < (now - 120s)
→ returns true (non-empty) or false (empty)
if false: log + inc schedules_skipped, return
fire_with_lock(S, now) ← actual workflow start
release_fire_lock(id, instance_id)
→ UPDATE ... WHERE locked_by = instance_id
→ own-instance guard prevents stale release
```
Lock release is always attempted even on `fire_with_lock` error; a `warn!` is emitted if release fails (TTL provides fallback).
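As a concrete sketch, the two statements behind this protocol look roughly like the following SurrealQL (parameter names are illustrative, not the exact queries shipped in `ScheduleStore`):

```
-- try_acquire_fire_lock: succeeds only when unlocked or the 120 s TTL has lapsed.
-- A non-empty result array means this instance won the race.
UPDATE type::thing("scheduled_workflows", $id)
SET locked_by = $instance_id, locked_at = $now
WHERE locked_by IS NONE OR locked_at < ($now - 120s);

-- release_fire_lock: own-instance guard prevents clearing another holder's lock.
UPDATE type::thing("scheduled_workflows", $id)
SET locked_by = NONE, locked_at = NONE
WHERE locked_by = $instance_id;
```

Because SurrealDB evaluates the `WHERE` clause atomically per record, two instances issuing the first statement concurrently cannot both see an empty `locked_by`.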
### Timezone-Aware Cron Evaluation
```
compute_fire_times_tz(schedule, last, now, catch_up, tz):
match tz.parse::<chrono_tz::Tz>():
Some(tz) → schedule.after(&last.with_timezone(&tz))
.take_while(|t| t.with_timezone(&Utc) <= now)
.map(|t| t.with_timezone(&Utc))
None → schedule.after(&last) ← UTC
```
Parsing an unknown/invalid timezone string silently falls back to UTC — avoids a hard error at runtime if a previously valid TZ identifier is removed from the `chrono-tz` database in a future upgrade.
### API Surface Changes
`PUT /api/v1/schedules/:id` and `PATCH /api/v1/schedules/:id` accept and return `timezone: Option<String>`. Timezone is validated at the API boundary using `validate_timezone()` (returns `400 InvalidInput` for unknown identifiers). Config-file `[schedule]` blocks also accept `timezone` and are validated at startup (fail-fast, same as `cron`).
---
## Consequences
### Positive
- Schedules expressed in business-local time — no mental UTC arithmetic for operators.
- Multi-instance deployments safe by default; no external lock service required.
- `ScheduledWorkflow.timezone` is nullable/optional — all existing schedules without the field default to UTC with no migration required.
### Negative / Trade-offs
- `chrono-tz` adds ~2 MB of IANA timezone data to the binary (compile-time embedded).
- Distributed lock TTL of 120 s means a worst-case window of one double-fire per 120 s if the winning instance crashes between acquiring the lock and calling `update_after_fire`. Acceptable given the `schedule_runs` audit log makes duplicates visible.
- Timezone cannot be cleared via PATCH: `timezone: null` in JSON is indistinguishable from an absent field (`#[serde(default)]`). Reverting a schedule to UTC requires a full PUT.


@ -528,11 +528,85 @@ docker logs vapora-backend
nats sub "vapora.tasks.>"
```
## Autonomous Scheduling
Workflows with `trigger = "schedule"` fire automatically on a cron expression without any REST trigger.
### TOML Configuration
```toml
[[workflows]]
name = "nightly_analysis"
trigger = "schedule"
[workflows.schedule]
cron = "0 2 * * *" # 5-field: min hour dom month dow
timezone = "America/New_York" # IANA identifier; omit for UTC
allow_concurrent = false # skip if previous run is still active
catch_up = false # fire missed slots on restart (capped 10)
[[workflows.stages]]
name = "analyze"
agents = ["analyst"]
```
Cron accepts 5-field (standard crontab), 6-field (with seconds), or 7-field (with seconds + year) expressions. The expression is validated at config-load time — startup fails on an invalid cron expression or unknown timezone.
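The normalization from 5- or 6-field input to the 7-field form the `cron` crate parses can be sketched as follows (an assumption matching the test expectations, not necessarily the shipped implementation):

```rust
/// Sketch: pad shell-style cron to 7 fields (sec min hour dom month dow year).
fn normalize_cron(expr: &str) -> String {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    match fields.len() {
        5 => format!("0 {} *", fields.join(" ")), // prepend seconds, append year
        6 => format!("{} *", fields.join(" ")),   // append year
        _ => expr.to_string(),                    // already 7-field (validated elsewhere)
    }
}

fn main() {
    assert_eq!(normalize_cron("0 2 * * *"), "0 0 2 * * * *");
    assert_eq!(normalize_cron("0 0 9 * * 1-5 *"), "0 0 9 * * 1-5 *");
}
```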
### Schedule REST API
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/api/v1/schedules` | List all schedules |
| `GET` | `/api/v1/schedules/:id` | Get one schedule |
| `PUT` | `/api/v1/schedules/:id` | Create or fully replace |
| `PATCH` | `/api/v1/schedules/:id` | Partial update |
| `DELETE` | `/api/v1/schedules/:id` | Remove |
| `GET` | `/api/v1/schedules/:id/runs` | Execution history (last 100) |
| `POST` | `/api/v1/schedules/:id/fire` | Manual trigger bypassing cron |
**PUT body** (all fields):
```json
{
"template_name": "nightly_analysis",
"cron_expression": "0 2 * * *",
"timezone": "America/New_York",
"enabled": true,
"allow_concurrent": false,
"catch_up": false,
"initial_context": {}
}
```
**PATCH body** (only changed fields):
```json
{ "enabled": false }
```
### Timezone Support
`timezone` is an IANA timezone identifier (e.g. `"America/New_York"`, `"Europe/Berlin"`, `"Asia/Tokyo"`). When absent, UTC is used. DST transitions are handled automatically.
The REST API validates the timezone at the boundary — an unknown identifier returns `400 InvalidInput`.
### Distributed Fire-Lock
When multiple VAPORA backend instances run against the same SurrealDB, the scheduler uses a conditional `UPDATE ... WHERE locked_by IS NONE OR locked_at < (now - 120s)` to ensure only one instance fires each schedule per tick. The lock holder is identified by a per-process UUID stored in `locked_by`; it expires automatically after 120 seconds, handling crashed instances.
### Schedule Metrics (Prometheus)
- `vapora_schedules_fired_total` — successful fires
- `vapora_schedules_skipped_total` — skipped (concurrent guard or distributed lock contention)
- `vapora_schedules_failed_total` — workflow start failures
- `vapora_active_schedules` — current count (gauge)
## Related Documentation
- [CLI Commands Guide](../setup/cli-commands.md) - Command-line usage
- [Multi-Agent Workflows](../architecture/multi-agent-workflows.md) - Architecture overview
- [Agent Registry & Coordination](../architecture/agent-registry-coordination.md) - Agent management
- [ADR-0028: Workflow Orchestrator](../adrs/0028-workflow-orchestrator.md) - Decision rationale
- [ADR-0034: Autonomous Scheduling](../adrs/0034-autonomous-scheduling.md) - Scheduling design decisions
- [ADR-0014: Learning-Based Agent Selection](../adrs/0014-learning-profiles.md) - Agent selection
- [ADR-0015: Budget Enforcement](../adrs/0015-budget-enforcement.md) - Cost control


@ -0,0 +1,44 @@
-- Migration 010: Scheduled Workflow Definitions and Run History
-- Enables autonomous cron-based workflow firing without REST triggers.
-- Two tables: schedule definitions (managed by TOML + DB) and an append-only run log.
DEFINE TABLE scheduled_workflows SCHEMAFULL;
DEFINE FIELD id ON TABLE scheduled_workflows TYPE record<scheduled_workflows>;
DEFINE FIELD template_name ON TABLE scheduled_workflows TYPE string ASSERT $value != NONE;
DEFINE FIELD cron_expression ON TABLE scheduled_workflows TYPE string ASSERT $value != NONE;
DEFINE FIELD initial_context ON TABLE scheduled_workflows FLEXIBLE TYPE object DEFAULT {};
DEFINE FIELD enabled ON TABLE scheduled_workflows TYPE bool DEFAULT true;
DEFINE FIELD allow_concurrent ON TABLE scheduled_workflows TYPE bool DEFAULT false;
DEFINE FIELD catch_up ON TABLE scheduled_workflows TYPE bool DEFAULT false;
DEFINE FIELD last_fired_at ON TABLE scheduled_workflows TYPE option<datetime> DEFAULT NONE;
DEFINE FIELD next_fire_at ON TABLE scheduled_workflows TYPE option<datetime> DEFAULT NONE;
DEFINE FIELD runs_count ON TABLE scheduled_workflows TYPE int DEFAULT 0;
DEFINE FIELD created_at ON TABLE scheduled_workflows TYPE datetime DEFAULT time::now();
DEFINE FIELD updated_at ON TABLE scheduled_workflows TYPE datetime DEFAULT time::now() VALUE time::now();
DEFINE INDEX idx_scheduled_workflows_template
ON TABLE scheduled_workflows COLUMNS template_name;
DEFINE INDEX idx_scheduled_workflows_enabled
ON TABLE scheduled_workflows COLUMNS enabled;
-- Append-only execution history for audit and debugging.
DEFINE TABLE schedule_runs SCHEMAFULL;
DEFINE FIELD id ON TABLE schedule_runs TYPE record<schedule_runs>;
DEFINE FIELD schedule_id ON TABLE schedule_runs TYPE string ASSERT $value != NONE;
DEFINE FIELD workflow_instance_id ON TABLE schedule_runs TYPE option<string> DEFAULT NONE;
DEFINE FIELD fired_at ON TABLE schedule_runs TYPE datetime ASSERT $value != NONE;
DEFINE FIELD status ON TABLE schedule_runs TYPE string
ASSERT $value INSIDE ['Fired', 'Skipped', 'Failed'];
DEFINE FIELD notes ON TABLE schedule_runs TYPE option<string> DEFAULT NONE;
DEFINE INDEX idx_schedule_runs_schedule_id
ON TABLE schedule_runs COLUMNS schedule_id;
DEFINE INDEX idx_schedule_runs_fired_at
ON TABLE schedule_runs COLUMNS fired_at;
DEFINE INDEX idx_schedule_runs_schedule_fired
ON TABLE schedule_runs COLUMNS schedule_id, fired_at;


@ -0,0 +1,9 @@
-- Migration 011: Timezone and Distributed Fire-Lock for Scheduled Workflows
-- Extends the scheduled_workflows table with:
-- timezone — IANA timezone identifier for cron evaluation (e.g. "America/New_York")
-- locked_by — instance UUID holding the current fire lock (prevents double-fires)
-- locked_at — when the lock was acquired; TTL = 120 s is enforced in application code
DEFINE FIELD timezone ON TABLE scheduled_workflows TYPE option<string> DEFAULT NONE;
DEFINE FIELD locked_by ON TABLE scheduled_workflows TYPE option<string> DEFAULT NONE;
DEFINE FIELD locked_at ON TABLE scheduled_workflows TYPE option<datetime> DEFAULT NONE;