From bb55c80d2bf2c9d0a0f391071a8ef230a52892ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesu=CC=81s=20Pe=CC=81rez?= Date: Thu, 26 Feb 2026 11:34:44 +0000 Subject: [PATCH] feat(workflow-engine): autonomous scheduling with timezone and distributed lock Add cron-based autonomous workflow firing with two hardening layers: - Timezone-aware scheduling via chrono-tz: ScheduledWorkflow.timezone (IANA identifier), compute_next_fire_at/after_tz, validate_timezone; DST-safe, UTC fallback when absent; validated at config load and REST API - Distributed fire-lock via SurrealDB conditional UPDATE (locked_by/locked_at fields, 120 s TTL); WorkflowScheduler gains instance_id (UUID) as lock owner; prevents double-fires across multi-instance deployments without extra infra - ScheduleStore: try_acquire_fire_lock, release_fire_lock (own-instance guard), full CRUD (load_one/all, full_upsert, patch, delete, load_runs) - REST: 7 endpoints (GET/PUT/PATCH/DELETE schedules, runs history, manual fire) with timezone field in all request/response types - Migrations 010 (schedule tables) + 011 (timezone + lock columns) - Tests: 48 passing (was 26); ADR-0034; changelog; feature docs updated --- CHANGELOG.md | 38 ++ Cargo.lock | 43 +- crates/vapora-backend/src/api/mod.rs | 1 + crates/vapora-backend/src/api/schedules.rs | 451 ++++++++++++++++++ crates/vapora-backend/src/api/state.rs | 10 +- .../src/api/workflow_orchestrator.rs | 1 + crates/vapora-backend/src/main.rs | 25 +- crates/vapora-workflow-engine/Cargo.toml | 4 + crates/vapora-workflow-engine/src/config.rs | 156 ++++++ crates/vapora-workflow-engine/src/error.rs | 12 + crates/vapora-workflow-engine/src/instance.rs | 1 + crates/vapora-workflow-engine/src/lib.rs | 14 +- crates/vapora-workflow-engine/src/metrics.rs | 25 + .../src/orchestrator.rs | 47 ++ crates/vapora-workflow-engine/src/schedule.rs | 248 ++++++++++ .../src/schedule_store.rs | 384 +++++++++++++++ .../vapora-workflow-engine/src/scheduler.rs | 388 +++++++++++++++ 
docs/adrs/0034-autonomous-scheduling.md | 101 ++++ docs/features/workflow-orchestrator.md | 74 +++ migrations/010_scheduled_workflows.surql | 44 ++ migrations/011_schedule_tz_lock.surql | 9 + 21 files changed, 2071 insertions(+), 5 deletions(-) create mode 100644 crates/vapora-backend/src/api/schedules.rs create mode 100644 crates/vapora-workflow-engine/src/schedule.rs create mode 100644 crates/vapora-workflow-engine/src/schedule_store.rs create mode 100644 crates/vapora-workflow-engine/src/scheduler.rs create mode 100644 docs/adrs/0034-autonomous-scheduling.md create mode 100644 migrations/010_scheduled_workflows.surql create mode 100644 migrations/011_schedule_tz_lock.surql diff --git a/CHANGELOG.md b/CHANGELOG.md index dcbfa2c..7b2024a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added - Autonomous Scheduling: Timezone Support and Distributed Fire-Lock + +#### `vapora-workflow-engine` — scheduling hardening + +- **Timezone-aware cron evaluation** (`chrono-tz = "0.10"`): + - `ScheduledWorkflow.timezone: Option` — IANA identifier stored per-schedule + - `compute_next_fire_at_tz(expr, tz)` / `compute_next_fire_after_tz(expr, after, tz)` — generic over `chrono_tz::Tz`; UTC fallback when `tz = None` + - `validate_timezone(tz)` — compile-time exhaustive IANA enum, rejects unknown identifiers + - `compute_fire_times_tz` in `scheduler.rs` — catch-up and normal firing both timezone-aware + - Config-load validation: `[workflows.schedule] timezone = "..."` validated at startup (fail-fast) +- **Distributed fire-lock** (SurrealDB document-level atomic CAS): + - `scheduled_workflows` gains `locked_by: option` and `locked_at: option` (migration 011) + - `ScheduleStore::try_acquire_fire_lock(id, instance_id, now)` — conditional `UPDATE ... 
WHERE locked_by IS NONE OR locked_at < $expiry`; returns `true` only if update succeeded (non-empty result = lock acquired) + - `ScheduleStore::release_fire_lock(id, instance_id)` — `WHERE locked_by = $instance_id` guard prevents stale release after TTL expiry + - `WorkflowScheduler.instance_id: String` — UUID generated at startup, identifies lock owner + - 120-second TTL: crashed instance's lock auto-expires within two scheduler ticks + - Lock acquired before `fire_with_lock`, released in `finally`-style block after (warn on release failure, TTL fallback) +- New tests: `test_validate_timezone_valid`, `test_validate_timezone_invalid`, `test_compute_next_fire_at_tz_utc`, `test_compute_next_fire_at_tz_named`, `test_compute_next_fire_at_tz_invalid_tz_fallback`, `test_compute_fires_with_catchup_named_tz`, `test_instance_id_is_unique` +- Test count: 48 (was 41) + +#### `vapora-backend` — schedule REST API surface + +- `ScheduleResponse`, `PutScheduleRequest`, `PatchScheduleRequest` gain `timezone: Option` +- `validate_tz()` helper validates at API boundary → `400 InvalidInput` on unknown identifier +- `put_schedule` and `patch_schedule` use `compute_next_fire_at_tz` / `compute_next_fire_after_tz` +- `fire_schedule` uses `compute_next_fire_after_tz` with schedule's stored timezone + +#### Migrations + +- **`migrations/011_schedule_tz_lock.surql`**: `DEFINE FIELD timezone`, `locked_by`, `locked_at` on `scheduled_workflows` + +#### Documentation + +- **ADR-0034**: design rationale for `chrono-tz` selection and SurrealDB conditional UPDATE lock +- **`docs/features/workflow-orchestrator.md`**: Autonomous Scheduling section with TOML config, REST API table, timezone/distributed lock explanations, Prometheus metrics + +--- + ### Added - Workflow Engine Hardening (Persistence · Saga · Cedar) #### `vapora-workflow-engine` — three new hardening layers diff --git a/Cargo.lock b/Cargo.lock index e88d8b0..7ce3b30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1679,6 +1679,16 @@ 
dependencies = [ "phf 0.11.3", ] +[[package]] +name = "chrono-tz" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3" +dependencies = [ + "chrono", + "phf 0.12.1", +] + [[package]] name = "chrono-tz-build" version = "0.3.0" @@ -2236,6 +2246,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom 7.1.3", + "once_cell", +] + [[package]] name = "crossbeam" version = "0.8.4" @@ -7051,6 +7072,15 @@ dependencies = [ "phf_shared 0.11.3", ] +[[package]] +name = "phf" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7" +dependencies = [ + "phf_shared 0.12.1", +] + [[package]] name = "phf" version = "0.13.1" @@ -7130,6 +7160,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "phf_shared" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981" +dependencies = [ + "siphasher", +] + [[package]] name = "phf_shared" version = "0.13.1" @@ -10937,7 +10976,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8004bca281f2d32df3bacd59bc67b312cb4c70cea46cbd79dbe8ac5ed206722" dependencies = [ "chrono", - "chrono-tz", + "chrono-tz 0.9.0", "globwalk", "humansize", "lazy_static", @@ -12643,6 +12682,8 @@ dependencies = [ "async-trait", "cedar-policy 4.9.0", "chrono", + "chrono-tz 0.10.4", + "cron", "dashmap 6.1.0", "futures", "mockall", diff --git a/crates/vapora-backend/src/api/mod.rs 
b/crates/vapora-backend/src/api/mod.rs index a89bd38..3d4e234 100644 --- a/crates/vapora-backend/src/api/mod.rs +++ b/crates/vapora-backend/src/api/mod.rs @@ -12,6 +12,7 @@ pub mod proposals; pub mod provider_analytics; pub mod provider_metrics; pub mod rlm; +pub mod schedules; pub mod state; pub mod swarm; pub mod tasks; diff --git a/crates/vapora-backend/src/api/schedules.rs b/crates/vapora-backend/src/api/schedules.rs new file mode 100644 index 0000000..ce6cf16 --- /dev/null +++ b/crates/vapora-backend/src/api/schedules.rs @@ -0,0 +1,451 @@ +use std::sync::Arc; + +use axum::{ + extract::{Path, State}, + http::StatusCode, + Json, +}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use tracing::{error, info, warn}; +use vapora_shared::VaporaError; +use vapora_workflow_engine::{ + compute_next_fire_after_tz, compute_next_fire_at_tz, validate_cron_expression, + validate_timezone, RunStatus, ScheduleRun, ScheduleStore, ScheduledWorkflow, +}; + +use crate::api::error::ApiError; +use crate::api::state::AppState; + +// ─── Response types ────────────────────────────────────────────────────────── + +#[derive(Debug, Serialize)] +pub struct ScheduleResponse { + pub id: String, + pub template_name: String, + pub cron_expression: String, + pub initial_context: serde_json::Value, + pub enabled: bool, + pub allow_concurrent: bool, + pub catch_up: bool, + pub timezone: Option, + pub last_fired_at: Option, + pub next_fire_at: Option, + pub runs_count: u64, + pub created_at: String, + pub updated_at: String, +} + +impl From for ScheduleResponse { + fn from(s: ScheduledWorkflow) -> Self { + Self { + id: s.id, + template_name: s.template_name, + cron_expression: s.cron_expression, + initial_context: s.initial_context, + enabled: s.enabled, + allow_concurrent: s.allow_concurrent, + catch_up: s.catch_up, + timezone: s.timezone, + last_fired_at: s.last_fired_at.map(|t| t.to_rfc3339()), + next_fire_at: s.next_fire_at.map(|t| t.to_rfc3339()), + runs_count: s.runs_count, + 
created_at: s.created_at.to_rfc3339(), + updated_at: s.updated_at.to_rfc3339(), + } + } +} + +#[derive(Debug, Serialize)] +pub struct ScheduleRunResponse { + pub id: String, + pub schedule_id: String, + pub workflow_instance_id: Option, + pub fired_at: String, + pub status: String, + pub notes: Option, +} + +impl From for ScheduleRunResponse { + fn from(r: ScheduleRun) -> Self { + Self { + id: r.id, + schedule_id: r.schedule_id, + workflow_instance_id: r.workflow_instance_id, + fired_at: r.fired_at.to_rfc3339(), + status: match r.status { + RunStatus::Fired => "fired".to_string(), + RunStatus::Skipped => "skipped".to_string(), + RunStatus::Failed => "failed".to_string(), + }, + notes: r.notes, + } + } +} + +#[derive(Debug, Serialize)] +pub struct ScheduleListResponse { + pub schedules: Vec, + pub total: usize, +} + +#[derive(Debug, Serialize)] +pub struct RunListResponse { + pub runs: Vec, + pub total: usize, +} + +#[derive(Debug, Serialize)] +pub struct MessageResponse { + pub success: bool, + pub message: String, +} + +// ─── Request types +// ──────────────────────────────────────────────────────────── + +/// Body for `PUT /api/v1/schedules/:id` — full replacement. +#[derive(Debug, Deserialize)] +pub struct PutScheduleRequest { + pub template_name: String, + pub cron_expression: String, + #[serde(default)] + pub initial_context: Option, + #[serde(default)] + pub enabled: Option, + #[serde(default)] + pub allow_concurrent: bool, + #[serde(default)] + pub catch_up: bool, + /// IANA timezone for cron evaluation (e.g. `"America/New_York"`). UTC when + /// absent. + #[serde(default)] + pub timezone: Option, +} + +/// Body for `PATCH /api/v1/schedules/:id` — partial update. +#[derive(Debug, Deserialize)] +pub struct PatchScheduleRequest { + pub enabled: Option, + pub cron_expression: Option, + pub allow_concurrent: Option, + pub catch_up: Option, + pub initial_context: Option, + /// Update the timezone. Pass `null` explicitly to clear it (revert to UTC). 
+ #[serde(default)] + pub timezone: Option, +} + +// ─── Helper +// ─────────────────────────────────────────────────────────────────── + +fn require_store(state: &AppState) -> Result, ApiError> { + state.schedule_store.clone().ok_or_else(|| { + ApiError(VaporaError::InternalError( + "Schedule store not available".to_string(), + )) + }) +} + +fn validate_cron(expr: &str) -> Result<(), ApiError> { + validate_cron_expression(expr).map_err(|e| { + ApiError(VaporaError::InvalidInput(format!( + "Invalid cron expression '{}': {}", + expr, e + ))) + }) +} + +fn validate_tz(tz: &str) -> Result<(), ApiError> { + validate_timezone(tz).map_err(|e| ApiError(VaporaError::InvalidInput(e))) +} + +// ─── Handlers +// ───────────────────────────────────────────────────────────────── + +/// `GET /api/v1/schedules` — list all schedules. +pub async fn list_schedules( + State(state): State, +) -> Result, ApiError> { + let store = require_store(&state)?; + let schedules = store.load_all().await.map_err(|e| { + error!("list_schedules: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })?; + let total = schedules.len(); + Ok(Json(ScheduleListResponse { + total, + schedules: schedules.into_iter().map(ScheduleResponse::from).collect(), + })) +} + +/// `GET /api/v1/schedules/:id` — get a single schedule. +pub async fn get_schedule( + State(state): State, + Path(id): Path, +) -> Result, ApiError> { + let store = require_store(&state)?; + let schedule = store + .load_one(&id) + .await + .map_err(|e| { + error!(schedule_id = %id, "get_schedule: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })? + .ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?; + + Ok(Json(ScheduleResponse::from(schedule))) +} + +/// `PUT /api/v1/schedules/:id` — create or fully replace a schedule. +/// +/// Preserves `last_fired_at` and `runs_count` from the existing record. 
+pub async fn put_schedule( + State(state): State, + Path(id): Path, + Json(req): Json, +) -> Result<(StatusCode, Json), ApiError> { + let store = require_store(&state)?; + + validate_cron(&req.cron_expression)?; + if let Some(ref tz) = req.timezone { + validate_tz(tz)?; + } + let tz = req.timezone.as_deref(); + let next_fire_at = compute_next_fire_at_tz(&req.cron_expression, tz); + + let now = Utc::now(); + let s = ScheduledWorkflow { + id: id.clone(), + template_name: req.template_name.clone(), + cron_expression: req.cron_expression.clone(), + initial_context: req + .initial_context + .unwrap_or(serde_json::Value::Object(Default::default())), + enabled: req.enabled.unwrap_or(true), + allow_concurrent: req.allow_concurrent, + catch_up: req.catch_up, + timezone: req.timezone.clone(), + last_fired_at: None, + next_fire_at, + runs_count: 0, + created_at: now, + updated_at: now, + }; + + store.full_upsert(&s).await.map_err(|e| { + error!(schedule_id = %id, "put_schedule: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })?; + + let updated = store + .load_one(&id) + .await + .map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))? + .ok_or_else(|| { + ApiError(VaporaError::InternalError( + "Schedule vanished after upsert".into(), + )) + })?; + + info!(schedule_id = %id, template = %req.template_name, "Schedule PUT"); + Ok((StatusCode::OK, Json(ScheduleResponse::from(updated)))) +} + +/// `PATCH /api/v1/schedules/:id` — partial update. +/// +/// Only the provided fields are changed. If `cron_expression` is updated, +/// `next_fire_at` is recomputed automatically. +pub async fn patch_schedule( + State(state): State, + Path(id): Path, + Json(req): Json, +) -> Result, ApiError> { + let store = require_store(&state)?; + + // Ensure the schedule exists first. + store + .load_one(&id) + .await + .map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))? 
+ .ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?; + + let mut patch = serde_json::Map::new(); + + if let Some(enabled) = req.enabled { + patch.insert("enabled".into(), serde_json::json!(enabled)); + } + if let Some(allow_concurrent) = req.allow_concurrent { + patch.insert( + "allow_concurrent".into(), + serde_json::json!(allow_concurrent), + ); + } + if let Some(catch_up) = req.catch_up { + patch.insert("catch_up".into(), serde_json::json!(catch_up)); + } + if let Some(ctx) = req.initial_context { + patch.insert("initial_context".into(), ctx); + } + if let Some(ref tz) = req.timezone { + validate_tz(tz)?; + patch.insert("timezone".into(), serde_json::json!(tz)); + } + if let Some(ref cron) = req.cron_expression { + validate_cron(cron)?; + // Recompute next_fire_at using the new cron and whatever timezone is + // already in the patch (or falls back to None → UTC). + let tz = req.timezone.as_deref(); + let next_fire_at = compute_next_fire_at_tz(cron, tz); + patch.insert("cron_expression".into(), serde_json::json!(cron)); + patch.insert("next_fire_at".into(), serde_json::json!(next_fire_at)); + } + patch.insert("updated_at".into(), serde_json::json!(Utc::now())); + + let updated = store + .patch(&id, serde_json::Value::Object(patch)) + .await + .map_err(|e| { + error!(schedule_id = %id, "patch_schedule: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })? + .ok_or_else(|| { + ApiError(VaporaError::NotFound(format!( + "Schedule {} not found after patch", + id + ))) + })?; + + info!(schedule_id = %id, "Schedule PATCHed"); + Ok(Json(ScheduleResponse::from(updated))) +} + +/// `DELETE /api/v1/schedules/:id` — permanently remove a schedule. +pub async fn delete_schedule( + State(state): State, + Path(id): Path, +) -> Result<(StatusCode, Json), ApiError> { + let store = require_store(&state)?; + + // Verify existence before delete. 
+ store + .load_one(&id) + .await + .map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))? + .ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?; + + store.delete(&id).await.map_err(|e| { + error!(schedule_id = %id, "delete_schedule: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })?; + + info!(schedule_id = %id, "Schedule deleted"); + Ok(( + StatusCode::OK, + Json(MessageResponse { + success: true, + message: format!("Schedule {} deleted", id), + }), + )) +} + +/// `GET /api/v1/schedules/:id/runs` — execution history (last 100, desc). +pub async fn list_schedule_runs( + State(state): State, + Path(id): Path, +) -> Result, ApiError> { + let store = require_store(&state)?; + + // Ensure schedule exists. + store + .load_one(&id) + .await + .map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))? + .ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?; + + let runs = store.load_runs(&id).await.map_err(|e| { + error!(schedule_id = %id, "list_schedule_runs: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })?; + let total = runs.len(); + Ok(Json(RunListResponse { + total, + runs: runs.into_iter().map(ScheduleRunResponse::from).collect(), + })) +} + +/// `POST /api/v1/schedules/:id/fire` — immediately trigger a scheduled +/// workflow bypassing the cron timer. +/// +/// Records an auditable `ScheduleRun` with `status = Fired` and advances +/// `last_fired_at` / `next_fire_at` exactly like the background scheduler. 
+pub async fn fire_schedule( + State(state): State, + Path(id): Path, +) -> Result<(StatusCode, Json), ApiError> { + let store = require_store(&state)?; + let orchestrator = state.workflow_orchestrator.as_ref().ok_or_else(|| { + ApiError(VaporaError::InternalError( + "Workflow orchestrator not available".to_string(), + )) + })?; + + let schedule = store + .load_one(&id) + .await + .map_err(|e| ApiError(VaporaError::InternalError(e.to_string())))? + .ok_or_else(|| ApiError(VaporaError::NotFound(format!("Schedule {} not found", id))))?; + + if !schedule.enabled { + return Err(ApiError(VaporaError::InvalidInput(format!( + "Schedule {} is disabled", + id + )))); + } + + let now = Utc::now(); + + let workflow_id = orchestrator + .start_workflow(&schedule.template_name, schedule.initial_context.clone()) + .await + .map_err(|e| { + error!(schedule_id = %id, "fire_schedule start_workflow: {e}"); + ApiError(VaporaError::WorkflowError(e.to_string())) + })?; + + let run = ScheduleRun { + id: uuid::Uuid::new_v4().to_string(), + schedule_id: id.clone(), + workflow_instance_id: Some(workflow_id.clone()), + fired_at: now, + status: RunStatus::Fired, + notes: Some("Manual fire via API".to_string()), + }; + store.record_run(&run).await.map_err(|e| { + warn!(schedule_id = %id, "fire_schedule record_run: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })?; + + let next_fire_at = compute_next_fire_after_tz( + &schedule.cron_expression, + &now, + schedule.timezone.as_deref(), + ); + + store + .update_after_fire(&id, now, next_fire_at) + .await + .map_err(|e| { + warn!(schedule_id = %id, "fire_schedule update_after_fire: {e}"); + ApiError(VaporaError::InternalError(e.to_string())) + })?; + + info!( + schedule_id = %id, + template = %schedule.template_name, + workflow_id = %workflow_id, + "Schedule manually fired via API" + ); + + Ok((StatusCode::CREATED, Json(ScheduleRunResponse::from(run)))) +} diff --git a/crates/vapora-backend/src/api/state.rs 
b/crates/vapora-backend/src/api/state.rs index 067361f..a158f9b 100644 --- a/crates/vapora-backend/src/api/state.rs +++ b/crates/vapora-backend/src/api/state.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use vapora_rlm::storage::SurrealDBStorage; use vapora_rlm::RLMEngine; -use vapora_workflow_engine::WorkflowOrchestrator; +use vapora_workflow_engine::{ScheduleStore, WorkflowOrchestrator}; use crate::services::{ AgentService, ProjectService, ProposalService, ProviderAnalyticsService, TaskService, @@ -20,6 +20,7 @@ pub struct AppState { pub provider_analytics_service: Arc, pub workflow_orchestrator: Option>, pub rlm_engine: Option>>, + pub schedule_store: Option>, } impl AppState { @@ -39,6 +40,7 @@ impl AppState { provider_analytics_service: Arc::new(provider_analytics_service), workflow_orchestrator: None, rlm_engine: None, + schedule_store: None, } } @@ -54,4 +56,10 @@ impl AppState { self.rlm_engine = Some(rlm_engine); self } + + /// Add schedule store to state + pub fn with_schedule_store(mut self, store: Arc) -> Self { + self.schedule_store = Some(store); + self + } } diff --git a/crates/vapora-backend/src/api/workflow_orchestrator.rs b/crates/vapora-backend/src/api/workflow_orchestrator.rs index 2b1ebd3..352093f 100644 --- a/crates/vapora-backend/src/api/workflow_orchestrator.rs +++ b/crates/vapora-backend/src/api/workflow_orchestrator.rs @@ -290,6 +290,7 @@ mod tests { approval_required: false, compensation_agents: None, }], + schedule: None, }; let instance = WorkflowInstance::new(&config, serde_json::json!({})); diff --git a/crates/vapora-backend/src/main.rs b/crates/vapora-backend/src/main.rs index 6ef3bc6..dcfae38 100644 --- a/crates/vapora-backend/src/main.rs +++ b/crates/vapora-backend/src/main.rs @@ -19,6 +19,7 @@ use clap::Parser; use tower_http::cors::{Any, CorsLayer}; use tracing::{info, Level}; use vapora_swarm::{SwarmCoordinator, SwarmMetrics}; +use vapora_workflow_engine::ScheduleStore; use crate::api::AppState; use crate::config::Config; @@ -104,6 
+105,10 @@ async fn main() -> Result<()> { )?); info!("RLM engine initialized for Phase 8"); + // Initialize schedule store (backed by the same SurrealDB connection) + let schedule_store = Arc::new(ScheduleStore::new(Arc::new(db.clone()))); + info!("ScheduleStore initialized for autonomous scheduling"); + // Create application state let app_state = AppState::new( project_service, @@ -112,7 +117,8 @@ async fn main() -> Result<()> { proposal_service, provider_analytics_service, ) - .with_rlm_engine(rlm_engine); + .with_rlm_engine(rlm_engine) + .with_schedule_store(schedule_store); // Create SwarmMetrics for Prometheus monitoring let metrics = match SwarmMetrics::new() { @@ -331,6 +337,23 @@ async fn main() -> Result<()> { .route("/api/v1/rlm/documents", post(api::rlm::load_document)) .route("/api/v1/rlm/query", post(api::rlm::query_document)) .route("/api/v1/rlm/analyze", post(api::rlm::analyze_document)) + // Schedule endpoints + .route("/api/v1/schedules", get(api::schedules::list_schedules)) + .route( + "/api/v1/schedules/:id", + get(api::schedules::get_schedule) + .put(api::schedules::put_schedule) + .patch(api::schedules::patch_schedule) + .delete(api::schedules::delete_schedule), + ) + .route( + "/api/v1/schedules/:id/runs", + get(api::schedules::list_schedule_runs), + ) + .route( + "/api/v1/schedules/:id/fire", + post(api::schedules::fire_schedule), + ) // Apply CORS, state, and extensions .layer(Extension(swarm_coordinator)) .layer(cors) diff --git a/crates/vapora-workflow-engine/Cargo.toml b/crates/vapora-workflow-engine/Cargo.toml index f42f402..7431518 100644 --- a/crates/vapora-workflow-engine/Cargo.toml +++ b/crates/vapora-workflow-engine/Cargo.toml @@ -50,6 +50,10 @@ surrealdb = { workspace = true } # Authorization cedar-policy = "4.9" +# Scheduling +cron = "0.12" +chrono-tz = "0.10" + [dev-dependencies] mockall = { workspace = true } wiremock = { workspace = true } diff --git a/crates/vapora-workflow-engine/src/config.rs 
b/crates/vapora-workflow-engine/src/config.rs index 4cc3629..0f59631 100644 --- a/crates/vapora-workflow-engine/src/config.rs +++ b/crates/vapora-workflow-engine/src/config.rs @@ -1,4 +1,5 @@ use std::path::Path; +use std::str::FromStr; use serde::{Deserialize, Serialize}; @@ -24,6 +25,31 @@ pub struct WorkflowConfig { pub name: String, pub trigger: String, pub stages: Vec, + #[serde(default)] + pub schedule: Option, +} + +/// Cron-based scheduling configuration for `trigger = "schedule"` workflows. +#[derive(Debug, Clone, Deserialize)] +pub struct ScheduleConfig { + /// 5-field standard cron (`min hour dom month dow`) or 7-field cron crate + /// format (`sec min hour dom month dow year`). + pub cron: String, + /// IANA timezone identifier for cron evaluation (e.g. + /// `"America/New_York"`). Defaults to UTC when absent. + #[serde(default)] + pub timezone: Option, + /// Allow a new instance to start while a previous one is still running. + #[serde(default)] + pub allow_concurrent: bool, + /// Fire all missed slots on restart (capped at 10). When false, only the + /// next slot is fired and missed slots are discarded. + #[serde(default)] + pub catch_up: bool, + /// Optional JSON object merged into each triggered workflow's initial + /// context. + #[serde(default)] + pub initial_context: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -75,6 +101,13 @@ impl WorkflowsConfig { .into()); } } + + // Trigger/schedule alignment: "schedule" requires a [schedule] block + // and a parseable cron expression (fail-fast rather than silently + // skipping at runtime). 
+ if workflow.trigger == "schedule" { + validate_schedule_config(&workflow.name, &workflow.schedule)?; + } } Ok(()) @@ -85,6 +118,44 @@ impl WorkflowsConfig { } } +fn validate_schedule_config( + workflow_name: &str, + schedule: &Option, +) -> std::result::Result<(), ConfigError> { + let sc = schedule.as_ref().ok_or_else(|| { + ConfigError::Invalid(format!( + "Workflow '{}' has trigger = \"schedule\" but no [schedule] block", + workflow_name + )) + })?; + + // Validate cron expression (inline normalisation avoids circular dep with + // schedule.rs). + let normalized = match sc.cron.split_whitespace().count() { + 5 => format!("0 {} *", sc.cron), + 6 => format!("{} *", sc.cron), + _ => sc.cron.clone(), + }; + cron::Schedule::from_str(&normalized).map_err(|e| { + ConfigError::Invalid(format!( + "Workflow '{}' has invalid cron '{}': {}", + workflow_name, sc.cron, e + )) + })?; + + // Validate timezone when provided. + if let Some(tz) = &sc.timezone { + tz.parse::().map_err(|_| { + ConfigError::Invalid(format!( + "Workflow '{}' has invalid timezone '{}': not a valid IANA identifier", + workflow_name, tz + )) + })?; + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -190,4 +261,89 @@ agents = ["agent2"] assert!(config.get_workflow("workflow_b").is_some()); assert!(config.get_workflow("nonexistent").is_none()); } + + #[test] + fn test_schedule_trigger_valid() { + let toml_str = r#" +[engine] +max_parallel_tasks = 4 +workflow_timeout = 3600 +approval_gates_enabled = false + +[[workflows]] +name = "nightly_analysis" +trigger = "schedule" + +[workflows.schedule] +cron = "0 2 * * *" +allow_concurrent = false +catch_up = false + +[[workflows.stages]] +name = "analyze" +agents = ["analyst"] + "#; + + let config: WorkflowsConfig = toml::from_str(toml_str).unwrap(); + assert!( + config.validate().is_ok(), + "Valid schedule config should pass" + ); + let wf = config.get_workflow("nightly_analysis").unwrap(); + let sc = wf.schedule.as_ref().unwrap(); + assert_eq!(sc.cron, 
"0 2 * * *"); + assert!(!sc.allow_concurrent); + assert!(!sc.catch_up); + } + + #[test] + fn test_schedule_trigger_missing_block() { + let toml_str = r#" +[engine] +max_parallel_tasks = 4 +workflow_timeout = 3600 +approval_gates_enabled = false + +[[workflows]] +name = "no_schedule_block" +trigger = "schedule" + +[[workflows.stages]] +name = "work" +agents = ["worker"] + "#; + + let config: WorkflowsConfig = toml::from_str(toml_str).unwrap(); + assert!( + config.validate().is_err(), + "schedule trigger without [schedule] block must fail validation" + ); + } + + #[test] + fn test_schedule_trigger_invalid_cron() { + let toml_str = r#" +[engine] +max_parallel_tasks = 4 +workflow_timeout = 3600 +approval_gates_enabled = false + +[[workflows]] +name = "bad_cron" +trigger = "schedule" + +[workflows.schedule] +cron = "not a valid cron expression" + +[[workflows.stages]] +name = "work" +agents = ["worker"] + "#; + + let config: WorkflowsConfig = toml::from_str(toml_str).unwrap(); + assert!( + config.validate().is_err(), + "Invalid cron expression must fail validation" + ); + } } diff --git a/crates/vapora-workflow-engine/src/error.rs b/crates/vapora-workflow-engine/src/error.rs index 9fbccdb..ce1b458 100644 --- a/crates/vapora-workflow-engine/src/error.rs +++ b/crates/vapora-workflow-engine/src/error.rs @@ -49,6 +49,18 @@ pub enum WorkflowError { #[error("Internal error: {0}")] Internal(String), + + #[error("Schedule error: {0}")] + Schedule(#[from] ScheduleError), +} + +#[derive(Error, Debug)] +pub enum ScheduleError { + #[error("Invalid cron expression '{expr}': {reason}")] + InvalidCron { expr: String, reason: String }, + + #[error("Schedule not found: {0}")] + NotFound(String), } #[derive(Error, Debug)] diff --git a/crates/vapora-workflow-engine/src/instance.rs b/crates/vapora-workflow-engine/src/instance.rs index f26b0c6..1d0f73f 100644 --- a/crates/vapora-workflow-engine/src/instance.rs +++ b/crates/vapora-workflow-engine/src/instance.rs @@ -226,6 +226,7 @@ mod tests 
{ compensation_agents: None, }, ], + schedule: None, } } diff --git a/crates/vapora-workflow-engine/src/lib.rs b/crates/vapora-workflow-engine/src/lib.rs index abe5eab..8b2ae70 100644 --- a/crates/vapora-workflow-engine/src/lib.rs +++ b/crates/vapora-workflow-engine/src/lib.rs @@ -31,15 +31,25 @@ pub mod metrics; pub mod orchestrator; pub mod persistence; pub mod saga; +pub mod schedule; +pub mod schedule_store; +pub mod scheduler; pub mod stage; pub use artifact::{Artifact, ArtifactType}; pub use auth::CedarAuthorizer; -pub use config::{EngineConfig, StageConfig, WorkflowConfig, WorkflowsConfig}; -pub use error::{ConfigError, Result, WorkflowError}; +pub use config::{EngineConfig, ScheduleConfig, StageConfig, WorkflowConfig, WorkflowsConfig}; +pub use error::{ConfigError, Result, ScheduleError, WorkflowError}; pub use instance::{WorkflowInstance, WorkflowStatus}; pub use metrics::WorkflowMetrics; pub use orchestrator::WorkflowOrchestrator; pub use persistence::SurrealWorkflowStore; pub use saga::SagaCompensator; +pub use schedule::{ + compute_next_fire_after, compute_next_fire_after_tz, compute_next_fire_at, + compute_next_fire_at_tz, validate_cron_expression, validate_timezone, RunStatus, ScheduleRun, + ScheduledWorkflow, +}; +pub use schedule_store::ScheduleStore; +pub use scheduler::WorkflowScheduler; pub use stage::{StageState, StageStatus, TaskState, TaskStatus}; diff --git a/crates/vapora-workflow-engine/src/metrics.rs b/crates/vapora-workflow-engine/src/metrics.rs index d999866..e73e67c 100644 --- a/crates/vapora-workflow-engine/src/metrics.rs +++ b/crates/vapora-workflow-engine/src/metrics.rs @@ -10,6 +10,11 @@ pub struct WorkflowMetrics { pub active_workflows: IntGauge, pub stage_duration_seconds: Histogram, pub workflow_duration_seconds: Histogram, + // Scheduling subsystem + pub schedules_fired: Counter, + pub schedules_skipped: Counter, + pub schedules_failed: Counter, + pub active_schedules: IntGauge, } impl WorkflowMetrics { @@ -46,6 +51,22 @@ impl 
WorkflowMetrics { ) .buckets(vec![60.0, 300.0, 600.0, 1800.0, 3600.0]), )?, + schedules_fired: register_counter!( + "vapora_schedules_fired_total", + "Total schedule fires that launched a workflow" + )?, + schedules_skipped: register_counter!( + "vapora_schedules_skipped_total", + "Total schedule fires skipped due to allow_concurrent=false" + )?, + schedules_failed: register_counter!( + "vapora_schedules_failed_total", + "Total schedule fires that failed to start a workflow" + )?, + active_schedules: register_int_gauge!( + "vapora_active_schedules", + "Number of enabled scheduled workflows" + )?, }) } @@ -57,6 +78,10 @@ impl WorkflowMetrics { registry.register(Box::new(self.active_workflows.clone()))?; registry.register(Box::new(self.stage_duration_seconds.clone()))?; registry.register(Box::new(self.workflow_duration_seconds.clone()))?; + registry.register(Box::new(self.schedules_fired.clone()))?; + registry.register(Box::new(self.schedules_skipped.clone()))?; + registry.register(Box::new(self.schedules_failed.clone()))?; + registry.register(Box::new(self.active_schedules.clone()))?; Ok(()) } } diff --git a/crates/vapora-workflow-engine/src/orchestrator.rs b/crates/vapora-workflow-engine/src/orchestrator.rs index eaf894c..e5cd78a 100644 --- a/crates/vapora-workflow-engine/src/orchestrator.rs +++ b/crates/vapora-workflow-engine/src/orchestrator.rs @@ -6,6 +6,7 @@ use futures::StreamExt; use serde_json::Value; use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; +use tokio::sync::watch; use tracing::{debug, error, info, warn}; use vapora_agents::messages::{AgentMessage, TaskCompleted, TaskFailed}; use vapora_knowledge_graph::persistence::KGPersistence; @@ -19,6 +20,9 @@ use crate::instance::{WorkflowInstance, WorkflowStatus}; use crate::metrics::WorkflowMetrics; use crate::persistence::SurrealWorkflowStore; use crate::saga::SagaCompensator; +use crate::schedule::ScheduledWorkflow; +use crate::schedule_store::ScheduleStore; +use 
crate::scheduler::WorkflowScheduler; use crate::stage::{StageState, StageStatus, TaskState}; pub struct WorkflowOrchestrator { @@ -653,6 +657,49 @@ impl WorkflowOrchestrator { Ok(()) } + /// Returns true if any non-terminal workflow instance is running for the + /// given template name. Used by `WorkflowScheduler` to enforce + /// `allow_concurrent = false`. + pub fn has_active_workflow_for_template(&self, template_name: &str) -> bool { + self.active_workflows + .iter() + .any(|e| e.value().template_name == template_name) + } + + /// Build a `WorkflowScheduler` seeded from the TOML-configured scheduled + /// workflows. + /// + /// TOML is the source of truth for static config; the DB owns runtime state + /// (`last_fired_at`, `runs_count`, `next_fire_at`). The returned + /// `watch::Sender` drives graceful shutdown: `sender.send(true)` + /// terminates the scheduler loop. + pub async fn build_scheduler( + self: Arc, + db: Arc>, + ) -> Result<(WorkflowScheduler, watch::Sender)> { + let store = Arc::new(ScheduleStore::new(Arc::clone(&db))); + + for wf in self + .config + .workflows + .iter() + .filter(|w| w.trigger == "schedule") + { + if let Some(sc) = &wf.schedule { + let entry = ScheduledWorkflow::from_config(&wf.name, sc); + store.upsert(&entry).await?; + } + } + + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let nats = Some(Arc::clone(&self.nats)); + let metrics = Arc::clone(&self.metrics); + let orchestrator = Arc::clone(&self); + let scheduler = WorkflowScheduler::new(store, orchestrator, nats, metrics, shutdown_rx); + + Ok((scheduler, shutdown_tx)) + } + pub fn list_templates(&self) -> Vec { self.config .workflows diff --git a/crates/vapora-workflow-engine/src/schedule.rs b/crates/vapora-workflow-engine/src/schedule.rs new file mode 100644 index 0000000..c402466 --- /dev/null +++ b/crates/vapora-workflow-engine/src/schedule.rs @@ -0,0 +1,248 @@ +use std::str::FromStr; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use 
crate::config::ScheduleConfig; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum RunStatus { + Fired, + Skipped, + Failed, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScheduleRun { + pub id: String, + pub schedule_id: String, + pub workflow_instance_id: Option, + pub fired_at: DateTime, + pub status: RunStatus, + pub notes: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScheduledWorkflow { + pub id: String, + pub template_name: String, + pub cron_expression: String, + pub initial_context: serde_json::Value, + pub enabled: bool, + pub allow_concurrent: bool, + pub catch_up: bool, + /// IANA timezone identifier for cron evaluation (e.g. "America/New_York"). + /// When `None`, UTC is used. + #[serde(default)] + pub timezone: Option, + pub last_fired_at: Option>, + pub next_fire_at: Option>, + pub runs_count: u64, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +impl ScheduledWorkflow { + /// Build from TOML config, deriving a stable ID from the template name + /// so repeated calls to `store.upsert` are idempotent. + pub fn from_config(template_name: &str, sc: &ScheduleConfig) -> Self { + let next_fire_at = compute_next_fire_at_tz(&sc.cron, sc.timezone.as_deref()); + + Self { + id: template_name.to_string(), + template_name: template_name.to_string(), + cron_expression: sc.cron.clone(), + initial_context: sc + .initial_context + .clone() + .unwrap_or(serde_json::Value::Object(Default::default())), + enabled: true, + allow_concurrent: sc.allow_concurrent, + catch_up: sc.catch_up, + timezone: sc.timezone.clone(), + last_fired_at: None, + next_fire_at, + runs_count: 0, + created_at: Utc::now(), + updated_at: Utc::now(), + } + } + + /// True when the scheduled fire time has arrived or passed. 
+ pub fn is_due(&self, now: DateTime) -> bool { + self.next_fire_at.is_some_and(|t| t <= now) + } +} + +/// Normalise a cron expression to the 7-field format required by the `cron` +/// crate (`sec min hour dom month dow year`). +/// +/// - 5-field standard shell cron (`min hour dom month dow`) → prepend `0` +/// (seconds) and append `*` (any year). +/// - 6-field (`sec min hour dom month dow`) → append `*` (any year). +/// - 7-field → unchanged. +pub(crate) fn normalize_cron(expr: &str) -> String { + match expr.split_whitespace().count() { + 5 => format!("0 {} *", expr), + 6 => format!("{} *", expr), + _ => expr.to_string(), + } +} + +/// Validate a cron expression (5-field, 6-field, or 7-field). +/// +/// Returns `Ok(())` if parseable, `Err(description)` otherwise. +pub fn validate_cron_expression(expr: &str) -> Result<(), String> { + let normalized = normalize_cron(expr); + cron::Schedule::from_str(&normalized) + .map(|_| ()) + .map_err(|e| e.to_string()) +} + +/// Validate an IANA timezone string (e.g. `"America/New_York"`). +/// +/// Returns `Ok(())` when recognised, `Err(description)` otherwise. +pub fn validate_timezone(tz: &str) -> Result<(), String> { + tz.parse::() + .map(|_| ()) + .map_err(|_| format!("'{}' is not a valid IANA timezone identifier", tz)) +} + +/// Compute the next scheduled fire time after UTC now, evaluated in `tz`. +/// +/// Returns `None` if the expression is invalid or has no future occurrences. +/// When `tz` is `None`, UTC is used. +pub fn compute_next_fire_at_tz(expr: &str, tz: Option<&str>) -> Option> { + let normalized = normalize_cron(expr); + let schedule = cron::Schedule::from_str(&normalized).ok()?; + match tz.and_then(|s| s.parse::().ok()) { + Some(tz) => schedule.upcoming(tz).next().map(|t| t.with_timezone(&Utc)), + None => schedule.upcoming(Utc).next(), + } +} + +/// Compute the next scheduled fire time after `after`, evaluated in `tz`. +/// +/// When `tz` is `None`, UTC is used. 
+pub fn compute_next_fire_after_tz( + expr: &str, + after: &DateTime, + tz: Option<&str>, +) -> Option> { + let normalized = normalize_cron(expr); + let schedule = cron::Schedule::from_str(&normalized).ok()?; + match tz.and_then(|s| s.parse::().ok()) { + Some(tz) => schedule + .after(&after.with_timezone(&tz)) + .next() + .map(|t| t.with_timezone(&Utc)), + None => schedule.after(after).next(), + } +} + +/// UTC convenience wrapper — delegates to `compute_next_fire_at_tz` with no +/// timezone. +pub fn compute_next_fire_at(expr: &str) -> Option> { + compute_next_fire_at_tz(expr, None) +} + +/// UTC convenience wrapper — delegates to `compute_next_fire_after_tz` with no +/// timezone. +pub fn compute_next_fire_after(expr: &str, after: &DateTime) -> Option> { + compute_next_fire_after_tz(expr, after, None) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_run_status_serde() { + for status in [RunStatus::Fired, RunStatus::Skipped, RunStatus::Failed] { + let json = serde_json::to_string(&status).unwrap(); + let round_tripped: RunStatus = serde_json::from_str(&json).unwrap(); + assert_eq!(status, round_tripped); + } + } + + #[test] + fn test_schedule_is_due() { + let now = Utc::now(); + let past = now - chrono::Duration::seconds(60); + let future = now + chrono::Duration::seconds(60); + + let mut s = ScheduledWorkflow { + id: "test".to_string(), + template_name: "test".to_string(), + cron_expression: "0 * * * *".to_string(), + initial_context: serde_json::json!({}), + enabled: true, + allow_concurrent: false, + catch_up: false, + timezone: None, + last_fired_at: None, + next_fire_at: None, + runs_count: 0, + created_at: now, + updated_at: now, + }; + + assert!(!s.is_due(now), "None next_fire_at should not be due"); + + s.next_fire_at = Some(past); + assert!(s.is_due(now), "Past next_fire_at should be due"); + + s.next_fire_at = Some(now); + assert!(s.is_due(now), "Exact now should be due"); + + s.next_fire_at = Some(future); + assert!(!s.is_due(now), 
"Future next_fire_at should not be due"); + } + + #[test] + fn test_validate_timezone_valid() { + assert!(validate_timezone("America/New_York").is_ok()); + assert!(validate_timezone("Europe/London").is_ok()); + assert!(validate_timezone("Asia/Tokyo").is_ok()); + assert!(validate_timezone("UTC").is_ok()); + } + + #[test] + fn test_validate_timezone_invalid() { + assert!(validate_timezone("Not/ATimezone").is_err()); + assert!(validate_timezone("New York").is_err()); + assert!(validate_timezone("").is_err()); + } + + #[test] + fn test_compute_next_fire_at_tz_utc() { + // UTC fallback: any future result is valid + let result = compute_next_fire_at_tz("0 9 * * *", None); + assert!(result.is_some(), "should produce a future time"); + assert!(result.unwrap() > Utc::now()); + } + + #[test] + fn test_compute_next_fire_at_tz_named() { + let result_tz = compute_next_fire_at_tz("0 9 * * *", Some("America/New_York")); + let result_utc = compute_next_fire_at_tz("0 9 * * *", None); + // Both must be Some and in the future. + assert!(result_tz.is_some()); + assert!(result_utc.is_some()); + // They represent 09:00 in different timezones, so they must differ + // (EST = UTC-5, EDT = UTC-4; New_York 09:00 != UTC 09:00). + assert_ne!( + result_tz.unwrap(), + result_utc.unwrap(), + "timezone-aware result must differ from UTC result" + ); + } + + #[test] + fn test_compute_next_fire_at_tz_invalid_tz_fallback() { + // Unknown timezone is silently treated as UTC (parse fails, match hits None + // arm). 
+ let result = compute_next_fire_at_tz("0 9 * * *", Some("Invalid/Zone")); + assert!(result.is_some(), "falls back to UTC on unknown timezone"); + } +} diff --git a/crates/vapora-workflow-engine/src/schedule_store.rs b/crates/vapora-workflow-engine/src/schedule_store.rs new file mode 100644 index 0000000..b85f9c8 --- /dev/null +++ b/crates/vapora-workflow-engine/src/schedule_store.rs @@ -0,0 +1,384 @@ +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use surrealdb::engine::remote::ws::Client; +use surrealdb::Surreal; +use tracing::debug; + +use crate::error::{Result, WorkflowError}; +use crate::schedule::{ScheduleRun, ScheduledWorkflow}; + +/// Persists `ScheduledWorkflow` and `ScheduleRun` records to SurrealDB. +/// +/// Follows the same injection pattern as `SurrealWorkflowStore` — receives the +/// shared DB connection; does not create its own. Tables are defined by +/// migration 010. +pub struct ScheduleStore { + db: Arc>, +} + +impl ScheduleStore { + pub fn new(db: Arc>) -> Self { + Self { db } + } + + /// Idempotent upsert for a TOML-seeded schedule. + /// + /// - First call: creates the record with full initial state (including + /// computed `next_fire_at`). + /// - Subsequent calls: merges only static config fields (`cron_expression`, + /// `allow_concurrent`, `catch_up`, `initial_context`), leaving runtime + /// state (`last_fired_at`, `next_fire_at`, `runs_count`) intact. 
+ pub async fn upsert(&self, s: &ScheduledWorkflow) -> Result<()> { + let existing: Option = self + .db + .select(("scheduled_workflows", &*s.id)) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("check schedule {}: {e}", s.id)))?; + + if existing.is_none() { + let json = serde_json::to_value(s).map_err(|e| { + WorkflowError::DatabaseError(format!("serialize schedule {}: {e}", s.id)) + })?; + let _: Option = self + .db + .create(("scheduled_workflows", &*s.id)) + .content(json) + .await + .map_err(|e| { + WorkflowError::DatabaseError(format!("create schedule {}: {e}", s.id)) + })?; + debug!(schedule_id = %s.id, template = %s.template_name, "Schedule created"); + } else { + let _: Option = self + .db + .update(("scheduled_workflows", &*s.id)) + .merge(serde_json::json!({ + "cron_expression": s.cron_expression, + "allow_concurrent": s.allow_concurrent, + "catch_up": s.catch_up, + "initial_context": s.initial_context, + "timezone": s.timezone, + "updated_at": Utc::now(), + })) + .await + .map_err(|e| { + WorkflowError::DatabaseError(format!("merge schedule {}: {e}", s.id)) + })?; + debug!(schedule_id = %s.id, template = %s.template_name, "Schedule config merged"); + } + + Ok(()) + } + + /// Load all enabled schedules. Filters in Rust to avoid complex SurrealQL + /// against serde-tagged enums (same rationale as `SurrealWorkflowStore`). + pub async fn load_enabled(&self) -> Result> { + let mut response = self + .db + .query("SELECT * FROM scheduled_workflows") + .await + .map_err(|e| WorkflowError::DatabaseError(format!("load_enabled query: {e}")))?; + + let raw: Vec = response + .take(0) + .map_err(|e| WorkflowError::DatabaseError(format!("load_enabled take: {e}")))?; + + let schedules: Vec = raw + .into_iter() + .filter_map(|v| serde_json::from_value(v).ok()) + .collect(); + + Ok(schedules.into_iter().filter(|s| s.enabled).collect()) + } + + /// Atomically advance runtime state after one or more fires. 
+ /// + /// Uses `runs_count = runs_count + 1` in SurrealQL to avoid a read-modify- + /// write race when multiple scheduler instances run (future distributed + /// deployments). + pub async fn update_after_fire( + &self, + id: &str, + fired_at: DateTime, + next_fire_at: Option>, + ) -> Result<()> { + self.db + .query( + "UPDATE type::thing('scheduled_workflows', $id) SET last_fired_at = $fired_at, \ + next_fire_at = $next_fire_at, runs_count = runs_count + 1, updated_at = \ + time::now()", + ) + .bind(("id", id.to_string())) + .bind(("fired_at", fired_at)) + .bind(("next_fire_at", next_fire_at)) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("update_after_fire {id}: {e}")))?; + + debug!(schedule_id = %id, "Schedule runtime state updated"); + Ok(()) + } + + /// Load a single schedule by its ID. + pub async fn load_one(&self, id: &str) -> Result> { + let raw: Option = self + .db + .select(("scheduled_workflows", id)) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("load_one schedule {id}: {e}")))?; + + raw.map(|v| { + serde_json::from_value(v).map_err(|e| { + WorkflowError::DatabaseError(format!("deserialize schedule {id}: {e}")) + }) + }) + .transpose() + } + + /// Load all schedules (enabled and disabled). + pub async fn load_all(&self) -> Result> { + let mut response = self + .db + .query("SELECT * FROM scheduled_workflows") + .await + .map_err(|e| WorkflowError::DatabaseError(format!("load_all query: {e}")))?; + + let raw: Vec = response + .take(0) + .map_err(|e| WorkflowError::DatabaseError(format!("load_all take: {e}")))?; + + Ok(raw + .into_iter() + .filter_map(|v| serde_json::from_value(v).ok()) + .collect()) + } + + /// Full-replace upsert for `PUT` semantics. + /// + /// Replaces all config fields and recomputes `next_fire_at`. Preserves + /// `last_fired_at` and `runs_count` from the existing record so PUT + /// doesn't erase operational history. 
+ pub async fn full_upsert(&self, s: &ScheduledWorkflow) -> Result<()> { + let existing: Option = self + .db + .select(("scheduled_workflows", &*s.id)) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("check schedule {}: {e}", s.id)))?; + + if existing.is_none() { + let json = serde_json::to_value(s).map_err(|e| { + WorkflowError::DatabaseError(format!("serialize schedule {}: {e}", s.id)) + })?; + let _: Option = self + .db + .create(("scheduled_workflows", &*s.id)) + .content(json) + .await + .map_err(|e| { + WorkflowError::DatabaseError(format!("create schedule {}: {e}", s.id)) + })?; + } else { + // Replace all config fields but preserve operational counters. + let _: Option = self + .db + .update(("scheduled_workflows", &*s.id)) + .merge(serde_json::json!({ + "template_name": s.template_name, + "cron_expression": s.cron_expression, + "initial_context": s.initial_context, + "enabled": s.enabled, + "allow_concurrent": s.allow_concurrent, + "catch_up": s.catch_up, + "timezone": s.timezone, + "next_fire_at": s.next_fire_at, + "updated_at": Utc::now(), + })) + .await + .map_err(|e| { + WorkflowError::DatabaseError(format!("full_upsert schedule {}: {e}", s.id)) + })?; + } + debug!(schedule_id = %s.id, "Schedule full-upserted"); + Ok(()) + } + + /// Partial update: only touches the fields provided (PATCH semantics). + /// + /// If `cron_expression` changes, the caller must compute and pass the new + /// `next_fire_at`. + pub async fn patch( + &self, + id: &str, + patch: serde_json::Value, + ) -> Result> { + let _: Option = self + .db + .update(("scheduled_workflows", id)) + .merge(patch) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("patch schedule {id}: {e}")))?; + + self.load_one(id).await + } + + /// Delete a schedule definition permanently. 
+ pub async fn delete(&self, id: &str) -> Result<()> { + let _: Option = self + .db + .delete(("scheduled_workflows", id)) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("delete schedule {id}: {e}")))?; + + debug!(schedule_id = %id, "Schedule deleted"); + Ok(()) + } + + /// Load the run history for a specific schedule, ordered by fired_at desc. + pub async fn load_runs(&self, schedule_id: &str) -> Result> { + let mut response = self + .db + .query( + "SELECT * FROM schedule_runs WHERE schedule_id = $schedule_id ORDER BY fired_at \ + DESC LIMIT 100", + ) + .bind(("schedule_id", schedule_id.to_string())) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("load_runs query: {e}")))?; + + let raw: Vec = response + .take(0) + .map_err(|e| WorkflowError::DatabaseError(format!("load_runs take: {e}")))?; + + Ok(raw + .into_iter() + .filter_map(|v| serde_json::from_value(v).ok()) + .collect()) + } + + /// Attempt to acquire an exclusive fire-lock for distributed deployments. + /// + /// Uses a conditional SurrealDB UPDATE: succeeds only when the record has + /// no current lock holder, or the existing lock has expired (> 120 s old). + /// Returns `true` if the lock was acquired by this call, `false` if another + /// instance already holds it. + /// + /// The 120-second TTL means a crashed instance releases its lock + /// automatically on the next scheduler tick after that window. 
+ pub async fn try_acquire_fire_lock( + &self, + id: &str, + instance_id: &str, + now: &chrono::DateTime, + ) -> Result { + let expiry = *now - chrono::Duration::seconds(120); + let mut resp = self + .db + .query( + "UPDATE type::thing('scheduled_workflows', $id) SET locked_by = $instance_id, \ + locked_at = $now WHERE locked_by IS NONE OR locked_at < $expiry", + ) + .bind(("id", id.to_string())) + .bind(("instance_id", instance_id.to_string())) + .bind(("now", *now)) + .bind(("expiry", expiry)) + .await + .map_err(|e| { + WorkflowError::DatabaseError(format!("try_acquire_fire_lock {id}: {e}")) + })?; + + let records: Vec = resp.take(0).map_err(|e| { + WorkflowError::DatabaseError(format!("try_acquire_fire_lock take {id}: {e}")) + })?; + + Ok(!records.is_empty()) + } + + /// Release the fire-lock held by `instance_id`. + /// + /// The WHERE guard ensures an instance can only release its own lock, + /// preventing a slow instance from accidentally clearing another's lock + /// after its TTL expired and a second instance re-acquired it. + pub async fn release_fire_lock(&self, id: &str, instance_id: &str) -> Result<()> { + self.db + .query( + "UPDATE type::thing('scheduled_workflows', $id) SET locked_by = NONE, locked_at = \ + NONE WHERE locked_by = $instance_id", + ) + .bind(("id", id.to_string())) + .bind(("instance_id", instance_id.to_string())) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("release_fire_lock {id}: {e}")))?; + + debug!(schedule_id = %id, instance_id = %instance_id, "Fire lock released"); + Ok(()) + } + + /// Append a run record to the immutable audit log. 
+ pub async fn record_run(&self, run: &ScheduleRun) -> Result<()> { + let json = serde_json::to_value(run) + .map_err(|e| WorkflowError::DatabaseError(format!("serialize run {}: {e}", run.id)))?; + + let _: Option = self + .db + .create(("schedule_runs", &*run.id)) + .content(json) + .await + .map_err(|e| WorkflowError::DatabaseError(format!("record_run {}: {e}", run.id)))?; + + debug!( + run_id = %run.id, + schedule = %run.schedule_id, + status = ?run.status, + "Schedule run recorded" + ); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::schedule::RunStatus; + + #[test] + fn test_schedule_serialization() { + let now = Utc::now(); + let s = ScheduledWorkflow { + id: "nightly_analysis".to_string(), + template_name: "nightly_analysis".to_string(), + cron_expression: "0 2 * * *".to_string(), + initial_context: serde_json::json!({"env": "prod"}), + enabled: true, + allow_concurrent: false, + catch_up: false, + timezone: None, + last_fired_at: Some(now), + next_fire_at: Some(now + chrono::Duration::hours(24)), + runs_count: 42, + created_at: now, + updated_at: now, + }; + + let json = serde_json::to_string(&s).expect("serialize"); + let decoded: ScheduledWorkflow = serde_json::from_str(&json).expect("deserialize"); + + assert_eq!(decoded.id, s.id); + assert_eq!(decoded.template_name, s.template_name); + assert_eq!(decoded.cron_expression, s.cron_expression); + assert_eq!(decoded.runs_count, 42); + assert_eq!(decoded.allow_concurrent, false); + assert_eq!(decoded.catch_up, false); + + let run = ScheduleRun { + id: uuid::Uuid::new_v4().to_string(), + schedule_id: s.id.clone(), + workflow_instance_id: Some("wf-abc".to_string()), + fired_at: now, + status: RunStatus::Fired, + notes: None, + }; + let run_json = serde_json::to_string(&run).expect("serialize run"); + let decoded_run: ScheduleRun = serde_json::from_str(&run_json).expect("deserialize run"); + assert_eq!(decoded_run.status, RunStatus::Fired); + } +} diff --git 
a/crates/vapora-workflow-engine/src/scheduler.rs b/crates/vapora-workflow-engine/src/scheduler.rs new file mode 100644 index 0000000..83abcca --- /dev/null +++ b/crates/vapora-workflow-engine/src/scheduler.rs @@ -0,0 +1,388 @@ +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +use chrono::{DateTime, Utc}; +use tokio::sync::watch; +use tokio::time::MissedTickBehavior; +use tracing::{error, info, warn}; + +use crate::error::{Result, ScheduleError, WorkflowError}; +use crate::metrics::WorkflowMetrics; +use crate::orchestrator::WorkflowOrchestrator; +use crate::schedule::{ + compute_next_fire_after_tz, compute_next_fire_at_tz, normalize_cron, RunStatus, ScheduleRun, + ScheduledWorkflow, +}; +use crate::schedule_store::ScheduleStore; + +pub struct WorkflowScheduler { + store: Arc, + orchestrator: Arc, + nats: Option>, + metrics: Arc, + tick_interval: Duration, + shutdown: watch::Receiver, + /// UUID identifying this process instance. Used as the lock owner in + /// distributed deployments to prevent double-fires across instances. + instance_id: String, +} + +impl WorkflowScheduler { + pub fn new( + store: Arc, + orchestrator: Arc, + nats: Option>, + metrics: Arc, + shutdown: watch::Receiver, + ) -> Self { + Self { + store, + orchestrator, + nats, + metrics, + tick_interval: Duration::from_secs(30), + shutdown, + instance_id: uuid::Uuid::new_v4().to_string(), + } + } + + /// Override the default 30-second tick interval (useful in tests). + pub fn with_tick_interval(mut self, interval: Duration) -> Self { + self.tick_interval = interval; + self + } + + /// Drive the scheduling loop until the shutdown signal fires. + /// + /// Call from a `tokio::spawn` — the returned future completes when + /// `watch::Sender::send(true)` is called on the paired sender. 
+ pub async fn run(self: Arc) { + let mut shutdown = self.shutdown.clone(); + let mut interval = tokio::time::interval(self.tick_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + info!( + instance_id = %self.instance_id, + tick = ?self.tick_interval, + "WorkflowScheduler started" + ); + + loop { + tokio::select! { + changed = shutdown.changed() => { + if changed.is_ok() && *shutdown.borrow() { + info!(instance_id = %self.instance_id, "WorkflowScheduler shutting down"); + break; + } + } + _ = interval.tick() => { + let now = Utc::now(); + match self.store.load_enabled().await { + Ok(schedules) => { + for s in schedules.into_iter().filter(|s| s.is_due(now)) { + let scheduler = Arc::clone(&self); + let s = s.clone(); + tokio::spawn(async move { + if let Err(e) = scheduler.fire_schedule(&s, now).await { + error!( + schedule_id = %s.id, + template = %s.template_name, + error = %e, + "Schedule fire error" + ); + } + }); + } + } + Err(e) => error!(error = %e, "Failed to load enabled schedules"), + } + } + } + } + } + + async fn fire_schedule(&self, s: &ScheduledWorkflow, now: DateTime) -> Result<()> { + // Acquire distributed lock — prevents double-fires in multi-instance + // deployments. Lock has a 120-second TTL enforced by the store. + let acquired = self + .store + .try_acquire_fire_lock(&s.id, &self.instance_id, &now) + .await?; + + if !acquired { + info!( + schedule_id = %s.id, + template = %s.template_name, + instance_id = %self.instance_id, + "Fire lock held by another instance — skipping this tick" + ); + self.metrics.schedules_skipped.inc(); + return Ok(()); + } + + let result = self.fire_with_lock(s, now).await; + + // Release the lock unconditionally. A failure here is non-fatal since + // the TTL will expire automatically after 120 s. 
+ if let Err(e) = self.store.release_fire_lock(&s.id, &self.instance_id).await { + warn!( + schedule_id = %s.id, + instance_id = %self.instance_id, + error = %e, + "Failed to release schedule fire lock (TTL will expire automatically)" + ); + } + + result + } + + /// Execute a single schedule fire — called after the distributed lock is + /// held. + async fn fire_with_lock(&self, s: &ScheduledWorkflow, now: DateTime) -> Result<()> { + let tz = s.timezone.as_deref(); + + let normalized = normalize_cron(&s.cron_expression); + let cron_schedule = cron::Schedule::from_str(&normalized).map_err(|e| { + WorkflowError::Schedule(ScheduleError::InvalidCron { + expr: s.cron_expression.clone(), + reason: e.to_string(), + }) + })?; + + // Concurrency guard: skip if an active workflow for this template exists. + // Now safe to check without races because we hold the distributed lock. + if !s.allow_concurrent + && self + .orchestrator + .has_active_workflow_for_template(&s.template_name) + { + let next_fire_at = compute_next_fire_at_tz(&s.cron_expression, tz); + self.store + .update_after_fire(&s.id, now, next_fire_at) + .await?; + self.store + .record_run(&ScheduleRun { + id: uuid::Uuid::new_v4().to_string(), + schedule_id: s.id.clone(), + workflow_instance_id: None, + fired_at: now, + status: RunStatus::Skipped, + notes: Some("Active workflow for template already exists".to_string()), + }) + .await?; + self.metrics.schedules_skipped.inc(); + info!( + schedule_id = %s.id, + template = %s.template_name, + "Schedule skipped: concurrent run in progress" + ); + return Ok(()); + } + + // Determine which time slots to fire — timezone-aware. 
+ let last = s.last_fired_at.unwrap_or(s.created_at); + let fire_times = compute_fire_times_tz(&cron_schedule, last, now, s.catch_up, tz); + + let mut last_fire_time = now; + for &fire_time in &fire_times { + last_fire_time = fire_time; + match self + .orchestrator + .start_workflow(&s.template_name, s.initial_context.clone()) + .await + { + Ok(workflow_id) => { + self.store + .record_run(&ScheduleRun { + id: uuid::Uuid::new_v4().to_string(), + schedule_id: s.id.clone(), + workflow_instance_id: Some(workflow_id.clone()), + fired_at: fire_time, + status: RunStatus::Fired, + notes: None, + }) + .await?; + self.metrics.schedules_fired.inc(); + info!( + schedule_id = %s.id, + template = %s.template_name, + workflow_id = %workflow_id, + fired_at = %fire_time, + timezone = ?tz, + "Schedule fired" + ); + } + Err(e) => { + error!( + schedule_id = %s.id, + template = %s.template_name, + error = %e, + "Workflow start failed for scheduled fire" + ); + self.store + .record_run(&ScheduleRun { + id: uuid::Uuid::new_v4().to_string(), + schedule_id: s.id.clone(), + workflow_instance_id: None, + fired_at: fire_time, + status: RunStatus::Failed, + notes: Some(e.to_string()), + }) + .await?; + self.metrics.schedules_failed.inc(); + } + } + } + + // Advance the pointer to the next scheduled time (timezone-aware). + let next_fire_at = compute_next_fire_after_tz(&s.cron_expression, &last_fire_time, tz); + self.store + .update_after_fire(&s.id, last_fire_time, next_fire_at) + .await?; + + // Publish NATS event — graceful skip when NATS is unavailable. 
+ if let Some(nats) = &self.nats { + let event = serde_json::json!({ + "type": "schedule_fired", + "schedule_id": s.id, + "template": s.template_name, + "timezone": s.timezone, + "fires": fire_times.len(), + "timestamp": Utc::now().to_rfc3339(), + }); + if let Err(e) = nats + .publish("vapora.schedule.fired", event.to_string().into()) + .await + { + warn!(error = %e, "Failed to publish vapora.schedule.fired"); + } + } + + Ok(()) + } +} + +/// Compute which UTC time slots to fire, optionally in a named timezone. +/// +/// - `catch_up = true`: all missed slots since `last`, capped at 10, converted +/// to UTC. +/// - `catch_up = false`: always exactly one slot (`now`), timezone ignored. +/// +/// When `tz` is `Some` the cron iterator runs in that timezone so that +/// e.g. "every day at 09:00 America/New_York" fires at 14:00 UTC (or 13:00 +/// during EDT). +#[cfg_attr(not(test), allow(dead_code))] +fn compute_fire_times_tz( + schedule: &cron::Schedule, + last: DateTime, + now: DateTime, + catch_up: bool, + tz: Option<&str>, +) -> Vec> { + if !catch_up { + return vec![now]; + } + + let collected: Vec> = match tz.and_then(|s| s.parse::().ok()) { + Some(tz) => schedule + .after(&last.with_timezone(&tz)) + .take_while(|t| t.with_timezone(&Utc) <= now) + .take(10) + .map(|t| t.with_timezone(&Utc)) + .collect(), + None => schedule + .after(&last) + .take_while(|t| t <= &now) + .take(10) + .collect(), + }; + + if collected.is_empty() { + vec![now] + } else { + collected + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::schedule::normalize_cron; + + #[test] + fn test_normalize_cron_5field() { + let result = normalize_cron("0 9 * * 1-5"); + assert_eq!(result, "0 0 9 * * 1-5 *"); + } + + #[test] + fn test_normalize_cron_7field() { + let expr = "0 0 9 * * 1-5 *"; + let result = normalize_cron(expr); + assert_eq!(result, expr); + } + + #[test] + fn test_compute_fires_no_catchup() { + let normalized = normalize_cron("0 0 * * *"); // Every day at midnight + let 
schedule = cron::Schedule::from_str(&normalized).expect("valid cron"); + + let now = Utc::now(); + let last = now - chrono::Duration::days(3); + + let fires = compute_fire_times_tz(&schedule, last, now, false, None); + // Without catch_up: always exactly one fire at `now`. + assert_eq!(fires.len(), 1); + assert_eq!(fires[0], now); + } + + #[test] + fn test_compute_fires_with_catchup() { + let normalized = normalize_cron("0 0 * * *"); // Every day at midnight + let schedule = cron::Schedule::from_str(&normalized).expect("valid cron"); + + // last fired 5 days ago → should produce up to 5 missed midnight slots. + let now = Utc::now(); + let last = now - chrono::Duration::days(5); + + let fires = compute_fire_times_tz(&schedule, last, now, true, None); + assert!(fires.len() > 1, "catch_up should produce multiple fires"); + assert!(fires.len() <= 10, "catch_up is capped at 10"); + for &t in &fires { + assert!(t <= now, "catch_up fire time must not be in the future"); + } + } + + #[test] + fn test_compute_fires_with_catchup_named_tz() { + let normalized = normalize_cron("0 0 * * *"); // Every day at midnight (in tz) + let schedule = cron::Schedule::from_str(&normalized).expect("valid cron"); + + let now = Utc::now(); + let last = now - chrono::Duration::days(3); + + let fires_utc = compute_fire_times_tz(&schedule, last, now, true, None); + let fires_tz = compute_fire_times_tz(&schedule, last, now, true, Some("America/New_York")); + + // Both produce multiple slots. The UTC times differ because midnight NY + // is 05:00 UTC (or 04:00 EDT) — so the same count but different instants. + assert!(!fires_utc.is_empty()); + assert!(!fires_tz.is_empty()); + if fires_utc.len() == fires_tz.len() && fires_utc.len() > 0 { + // Timestamps must differ (different timezone means different UTC offset). 
+ assert_ne!( + fires_utc[0], fires_tz[0], + "UTC and NY midnight must be different UTC instants" + ); + } + } + + #[test] + fn test_instance_id_is_unique() { + // Each WorkflowScheduler gets a distinct UUID to prevent lock collisions. + let id1 = uuid::Uuid::new_v4().to_string(); + let id2 = uuid::Uuid::new_v4().to_string(); + assert_ne!(id1, id2); + } +} diff --git a/docs/adrs/0034-autonomous-scheduling.md b/docs/adrs/0034-autonomous-scheduling.md new file mode 100644 index 0000000..a2cec67 --- /dev/null +++ b/docs/adrs/0034-autonomous-scheduling.md @@ -0,0 +1,101 @@ +# ADR-0034: Autonomous Cron Scheduling — Timezone Support and Distributed Fire-Lock + +**Status**: Implemented +**Date**: 2026-02-26 +**Deciders**: VAPORA Team +**Technical Story**: `vapora-workflow-engine` scheduler fired cron jobs only in UTC and had no protection against double-fires in multi-instance deployments. + +--- + +## Decision + +Extend the autonomous scheduling subsystem with two independent hardening layers: + +1. **Timezone-aware scheduling** (`chrono-tz`) — cron expressions evaluated in any IANA timezone, stored per-schedule, validated at API and config-load boundaries. +2. **Distributed fire-lock** — SurrealDB conditional `UPDATE ... WHERE locked_by IS NONE OR locked_at < $expiry` provides atomic, TTL-backed mutual exclusion across instances without additional infrastructure. + +--- + +## Context + +### Gaps Addressed + +| Gap | Consequence | +|-----|-------------| +| UTC-only cron evaluation | `"0 9 * * *"` fires at 09:00 UTC regardless of business timezone; scheduled reports or maintenance windows drift by the UTC offset | +| No distributed coordination | Two `vapora-workflow-engine` instances reading the same `scheduled_workflows` table both fire the same schedule at the same tick | + +### Why These Approaches + +**`chrono-tz`** over manual UTC-offset arithmetic: +- Compile-time exhaustive enum of all IANA timezone names — invalid names are rejected at parse time. 
+- The `cron` crate's `Schedule::upcoming(tz)` / `Schedule::after(&dt_in_tz)` are generic over any `TimeZone`, so timezone-awareness requires no special-casing in iteration logic: pass `DateTime<Tz>` instead of `DateTime<Utc>`, convert output with `.with_timezone(&Utc)`.
+- DST transitions handled automatically by `chrono-tz` — no application code needed.
+
+**SurrealDB conditional UPDATE** over external distributed lock (Redis, etcd):
+- No additional infrastructure dependency.
+- SurrealDB applies document-level write locking; `UPDATE record WHERE condition` is atomic — two concurrent instances race on the same document and only one succeeds (non-empty return array = lock acquired).
+- 120-second TTL enforced in application code: `locked_at < $expiry` in the WHERE clause auto-expires a lock from a crashed instance within two scheduler ticks.
+
+---
+
+## Implementation
+
+### New Fields
+
+`scheduled_workflows` table gains three columns (migration 011):
+
+| Field | Type | Purpose |
+|-------|------|---------|
+| `timezone` | `option<string>` | IANA identifier (`"America/New_York"`) or `NONE` for UTC |
+| `locked_by` | `option<string>` | UUID of the instance holding the current fire-lock |
+| `locked_at` | `option<datetime>` | When the lock was acquired; used for TTL expiry |
+
+### Lock Protocol
+
+```
+Tick N fires schedule S:
+  try_acquire_fire_lock(id, instance_id, now)
+    → UPDATE ... WHERE locked_by IS NONE OR locked_at < (now - 120s)
+    → returns true (non-empty) or false (empty)
+  if false: log + inc schedules_skipped, return
+  fire_with_lock(S, now)          ← actual workflow start
+  release_fire_lock(id, instance_id)
+    → UPDATE ... WHERE locked_by = instance_id
+    → own-instance guard prevents stale release
+```
+
+Lock release is always attempted even on `fire_with_lock` error; a `warn!` is emitted if release fails (TTL provides fallback).
+
+### Timezone-Aware Cron Evaluation
+
+```
+compute_fire_times_tz(schedule, last, now, catch_up, tz):
+  match tz.parse::<Tz>():
+    Some(tz) → schedule.after(&last.with_timezone(&tz))
+                  .take_while(|t| t.with_timezone(&Utc) <= now)
+                  .map(|t| t.with_timezone(&Utc))
+    None     → schedule.after(&last)          ← UTC
+```
+
+Parsing an unknown/invalid timezone string silently falls back to UTC — avoids a hard error at runtime if a previously valid TZ identifier is removed from the `chrono-tz` database in a future upgrade.
+
+### API Surface Changes
+
+`PUT /api/v1/schedules/:id` and `PATCH /api/v1/schedules/:id` accept and return `timezone: Option<String>`. Timezone is validated at the API boundary using `validate_timezone()` (returns `400 InvalidInput` for unknown identifiers). Config-file `[schedule]` blocks also accept `timezone` and are validated at startup (fail-fast, same as `cron`).
+
+---
+
+## Consequences
+
+### Positive
+
+- Schedules expressed in business-local time — no mental UTC arithmetic for operators.
+- Multi-instance deployments safe by default; no external lock service required.
+- `ScheduledWorkflow.timezone` is nullable/optional — all existing schedules without the field default to UTC with no migration required.
+
+### Negative / Trade-offs
+
+- `chrono-tz` adds ~2 MB of IANA timezone data to the binary (compile-time embedded).
+- Distributed lock TTL of 120 s means a worst-case window of one double-fire per 120 s if the winning instance crashes between acquiring the lock and calling `update_after_fire`. Acceptable given the `schedule_runs` audit log makes duplicates visible.
+- No multi-PATCH for timezone clearance: passing `timezone: null` in JSON is treated as absent (`#[serde(default)]`). Clearing timezone (revert to UTC) requires a full PUT.
diff --git a/docs/features/workflow-orchestrator.md b/docs/features/workflow-orchestrator.md index 2f11c0d..721e260 100644 --- a/docs/features/workflow-orchestrator.md +++ b/docs/features/workflow-orchestrator.md @@ -528,11 +528,85 @@ docker logs vapora-backend nats sub "vapora.tasks.>" ``` +## Autonomous Scheduling + +Workflows with `trigger = "schedule"` fire automatically on a cron expression without any REST trigger. + +### TOML Configuration + +```toml +[[workflows]] +name = "nightly_analysis" +trigger = "schedule" + +[workflows.schedule] +cron = "0 2 * * *" # 5-field: min hour dom month dow +timezone = "America/New_York" # IANA identifier; omit for UTC +allow_concurrent = false # skip if previous run is still active +catch_up = false # fire missed slots on restart (capped 10) + +[[workflows.stages]] +name = "analyze" +agents = ["analyst"] +``` + +Cron accepts 5-field (standard shell), 6-field (with seconds), or 7-field (with seconds + year). The expression is validated at config-load time — startup fails on invalid cron or unknown timezone. + +### Schedule REST API + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/api/v1/schedules` | List all schedules | +| `GET` | `/api/v1/schedules/:id` | Get one schedule | +| `PUT` | `/api/v1/schedules/:id` | Create or fully replace | +| `PATCH` | `/api/v1/schedules/:id` | Partial update | +| `DELETE` | `/api/v1/schedules/:id` | Remove | +| `GET` | `/api/v1/schedules/:id/runs` | Execution history (last 100) | +| `POST` | `/api/v1/schedules/:id/fire` | Manual trigger bypassing cron | + +**PUT body** (all fields): + +```json +{ + "template_name": "nightly_analysis", + "cron_expression": "0 2 * * *", + "timezone": "America/New_York", + "enabled": true, + "allow_concurrent": false, + "catch_up": false, + "initial_context": {} +} +``` + +**PATCH body** (only changed fields): + +```json +{ "enabled": false } +``` + +### Timezone Support + +`timezone` is an IANA timezone identifier (e.g. 
`"America/New_York"`, `"Europe/Berlin"`, `"Asia/Tokyo"`). When absent, UTC is used. DST transitions are handled automatically. + +The REST API validates the timezone at the boundary — an unknown identifier returns `400 InvalidInput`. + +### Distributed Fire-Lock + +When multiple VAPORA backend instances run against the same SurrealDB, the scheduler uses a conditional `UPDATE ... WHERE locked_by IS NONE OR locked_at < (now - 120s)` to ensure only one instance fires each schedule per tick. The lock holder is identified by a per-process UUID stored in `locked_by`; it expires automatically after 120 seconds, handling crashed instances. + +### Schedule Metrics (Prometheus) + +- `vapora_schedules_fired_total` — successful fires +- `vapora_schedules_skipped_total` — skipped (concurrent guard or distributed lock contention) +- `vapora_schedules_failed_total` — workflow start failures +- `vapora_active_schedules` — current count (gauge) + ## Related Documentation - [CLI Commands Guide](../setup/cli-commands.md) - Command-line usage - [Multi-Agent Workflows](../architecture/multi-agent-workflows.md) - Architecture overview - [Agent Registry & Coordination](../architecture/agent-registry-coordination.md) - Agent management - [ADR-0028: Workflow Orchestrator](../adrs/0028-workflow-orchestrator.md) - Decision rationale +- [ADR-0034: Autonomous Scheduling](../adrs/0034-autonomous-scheduling.md) - Scheduling design decisions - [ADR-0014: Learning-Based Agent Selection](../adrs/0014-learning-profiles.md) - Agent selection - [ADR-0015: Budget Enforcement](../adrs/0015-budget-enforcement.md) - Cost control diff --git a/migrations/010_scheduled_workflows.surql b/migrations/010_scheduled_workflows.surql new file mode 100644 index 0000000..21f6fda --- /dev/null +++ b/migrations/010_scheduled_workflows.surql @@ -0,0 +1,44 @@ +-- Migration 010: Scheduled Workflow Definitions and Run History +-- Enables autonomous cron-based workflow firing without REST triggers. 
+-- Two tables: schedule definitions (managed by TOML + DB) and an append-only run log.
+
+DEFINE TABLE scheduled_workflows SCHEMAFULL;
+
+DEFINE FIELD id ON TABLE scheduled_workflows TYPE record<scheduled_workflows>;
+DEFINE FIELD template_name ON TABLE scheduled_workflows TYPE string ASSERT $value != NONE;
+DEFINE FIELD cron_expression ON TABLE scheduled_workflows TYPE string ASSERT $value != NONE;
+DEFINE FIELD initial_context ON TABLE scheduled_workflows FLEXIBLE TYPE object DEFAULT {};
+DEFINE FIELD enabled ON TABLE scheduled_workflows TYPE bool DEFAULT true;
+DEFINE FIELD allow_concurrent ON TABLE scheduled_workflows TYPE bool DEFAULT false;
+DEFINE FIELD catch_up ON TABLE scheduled_workflows TYPE bool DEFAULT false;
+DEFINE FIELD last_fired_at ON TABLE scheduled_workflows TYPE option<datetime> DEFAULT NONE;
+DEFINE FIELD next_fire_at ON TABLE scheduled_workflows TYPE option<datetime> DEFAULT NONE;
+DEFINE FIELD runs_count ON TABLE scheduled_workflows TYPE int DEFAULT 0;
+DEFINE FIELD created_at ON TABLE scheduled_workflows TYPE datetime DEFAULT time::now();
+DEFINE FIELD updated_at ON TABLE scheduled_workflows TYPE datetime DEFAULT time::now() VALUE time::now();
+
+DEFINE INDEX idx_scheduled_workflows_template
+    ON TABLE scheduled_workflows COLUMNS template_name;
+
+DEFINE INDEX idx_scheduled_workflows_enabled
+    ON TABLE scheduled_workflows COLUMNS enabled;
+
+-- Append-only execution history for audit and debugging.
+DEFINE TABLE schedule_runs SCHEMAFULL;
+
+DEFINE FIELD id ON TABLE schedule_runs TYPE record<schedule_runs>;
+DEFINE FIELD schedule_id ON TABLE schedule_runs TYPE string ASSERT $value != NONE;
+DEFINE FIELD workflow_instance_id ON TABLE schedule_runs TYPE option<string> DEFAULT NONE;
+DEFINE FIELD fired_at ON TABLE schedule_runs TYPE datetime ASSERT $value != NONE;
+DEFINE FIELD status ON TABLE schedule_runs TYPE string
+    ASSERT $value INSIDE ['Fired', 'Skipped', 'Failed'];
+DEFINE FIELD notes ON TABLE schedule_runs TYPE option<string> DEFAULT NONE;
+
+DEFINE INDEX idx_schedule_runs_schedule_id
+    ON TABLE schedule_runs COLUMNS schedule_id;
+
+DEFINE INDEX idx_schedule_runs_fired_at
+    ON TABLE schedule_runs COLUMNS fired_at;
+
+DEFINE INDEX idx_schedule_runs_schedule_fired
+    ON TABLE schedule_runs COLUMNS schedule_id, fired_at;
diff --git a/migrations/011_schedule_tz_lock.surql b/migrations/011_schedule_tz_lock.surql
new file mode 100644
index 0000000..48bc698
--- /dev/null
+++ b/migrations/011_schedule_tz_lock.surql
@@ -0,0 +1,9 @@
+-- Migration 011: Timezone and Distributed Fire-Lock for Scheduled Workflows
+-- Extends the scheduled_workflows table with:
+--   timezone  — IANA timezone identifier for cron evaluation (e.g. "America/New_York")
+--   locked_by — instance UUID holding the current fire lock (prevents double-fires)
+--   locked_at — when the lock was acquired; TTL = 120 s is enforced in application code
+
+DEFINE FIELD timezone ON TABLE scheduled_workflows TYPE option<string> DEFAULT NONE;
+DEFINE FIELD locked_by ON TABLE scheduled_workflows TYPE option<string> DEFAULT NONE;
+DEFINE FIELD locked_at ON TABLE scheduled_workflows TYPE option<datetime> DEFAULT NONE;