diff --git a/.gitignore b/.gitignore index 4ac4beb..e6d1c54 100644 --- a/.gitignore +++ b/.gitignore @@ -67,3 +67,4 @@ vendordiff.patch # Generated SBOM files SBOM.*.json *.sbom.json +.claude/settings.local.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b2024a..5943392 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,45 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added - Webhook Notification Channels (`vapora-channels`) + +#### `vapora-channels` — new crate + +- `NotificationChannel` trait: single `async fn send(&Message) -> Result<()>` — no vendor SDK dependency +- Three webhook implementations: `SlackChannel` (Incoming Webhook), `DiscordChannel` (Webhook embed), `TelegramChannel` (Bot API `sendMessage`) +- `ChannelRegistry`: name-keyed routing hub; `from_config(HashMap)` resolves secrets at construction time +- `Message { title, body, level }` — four constructors: `info`, `success`, `warning`, `error` +- **Secret resolution built-in**: `${VAR}` / `${VAR:-default}` interpolation via `OnceLock` in `config.rs`; `ChannelError::SecretNotFound` if env var absent and no default — callers cannot bypass resolution +- `ChannelError`: `NotFound`, `ApiError { channel, status, body }`, `SecretNotFound`, `SerializationError` +- 7 unit tests for `interpolate()`: plain string (no-op fast-path), single var, default fallback, missing var error, nested vars, whitespace, multiple vars in one string + +#### `vapora-workflow-engine` — notification hooks + +- `WorkflowNotifications` struct in `config.rs`: `on_stage_complete`, `on_stage_failed`, `on_completed`, `on_cancelled` — each a `Vec` of channel names +- `WorkflowConfig.notifications: WorkflowNotifications` (default: empty, no regression) +- `WorkflowOrchestrator` gains `Option>`; four `notify_*` methods spawn `dispatch_notifications` +- 6 new tests in `tests/notification_config.rs`: config parsing, all four event hooks, empty-targets no-op + +#### `vapora-backend` — event hooks and REST endpoints + +- `Config.channels: HashMap` and `Config.notifications: NotificationConfig` (TOML config) +- `NotificationConfig { on_task_done, on_proposal_approved, on_proposal_rejected }` — per-event channel-name lists +- `AppState` gains `channel_registry: Option>` and `notification_config: Arc` +- `AppState::notify(&[String], Message)` — fire-and-forget; `tokio::spawn(dispatch_notifications(...))` +- `pub(crate) async fn dispatch_notifications(Option>, Vec, Message)` — extracted for testability without DB +- Notification hooks added to three existing handlers: + - `update_task_status` — `Message::success` when `TaskStatus::Done` + - `approve_proposal` — `Message::success` + - `reject_proposal` — `Message::warning` +- New endpoints: `GET /api/v1/channels` (list names), `POST /api/v1/channels/:name/test` (connectivity check) +- 5 unit tests in `state.rs`: `RecordingChannel` + `FailingChannel` test doubles; dispatch no-op, single delivery, multi-channel, resilience after failure, warn on unknown channel + +#### Documentation + +- **ADR-0035**: design rationale for trait-based channels, built-in secret resolution, and fire-and-forget delivery + +--- + ### Added - Autonomous Scheduling: Timezone Support and Distributed Fire-Lock #### `vapora-workflow-engine` — scheduling hardening diff --git a/Cargo.lock b/Cargo.lock index 7ce3b30..c7e291c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12419,6 +12419,7 @@ dependencies = [ "tracing-subscriber", "uuid", "vapora-agents", + "vapora-channels", "vapora-knowledge-graph", "vapora-llm-router", "vapora-rlm", @@ -12429,6 +12430,21 @@ dependencies = [ "wiremock", ] +[[package]] +name = "vapora-channels" +version = "1.2.0" +dependencies = [ + "async-trait", + "regex", + "reqwest 0.13.1", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "wiremock", +] + [[package]] name = "vapora-cli" version = "1.2.0" @@ -12698,6 +12714,7 @@ dependencies = [ "tracing", "uuid", "vapora-agents", + "vapora-channels", "vapora-knowledge-graph", "vapora-shared", "vapora-swarm", diff --git a/Cargo.toml b/Cargo.toml index 49c8a9d..bd31825 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ + "crates/vapora-channels", "crates/vapora-backend", "crates/vapora-frontend", "crates/vapora-leptos-ui", @@ -36,6 +37,7 @@ categories = ["development-tools", "web-programming"] [workspace.dependencies] # Vapora internal crates +vapora-channels = { path = "crates/vapora-channels" } vapora-shared = { path = "crates/vapora-shared" } vapora-leptos-ui = { path = "crates/vapora-leptos-ui" } vapora-agents = { path = "crates/vapora-agents" } diff --git a/assets/web/index.html b/assets/web/index.html index e21de4b..09b6cbc 100644 --- a/assets/web/index.html +++ b/assets/web/index.html @@ -1 +1 @@ - Vapora
✅ v1.2.0 | 354 Tests | 100% Pass Rate
Vapora - Development Orchestration

Evaporate complexity

Development Flows

Specialized agents orchestrate pipelines for design, implementation, testing, documentation and deployment. Agents learn from history and optimize costs automatically.
100% self-hosted.

The 4 Problems It Solves

01

Context Switching

Developers jump between tools constantly. Vapora unifies everything in one intelligent system where context flows.

02

Knowledge Fragmentation

Decisions lost in threads, code scattered, docs unmaintained. RLM (Recursive Language Models) with hybrid search (BM25 + semantic) and chunking makes knowledge discoverable even in 100k+ token documents.

03

Manual Coordination

Orchestrating code review, testing, documentation and deployment manually creates bottlenecks. Multi-agent workflows solve this.

04

Dev-Ops Friction

Handoffs between developers and operations lack visibility and context. Vapora maintains unified deployment readiness.

How It Works

🤖

Specialized Agents

71 tests verify agent orchestration, learning profiles, and task assignment. Agents track expertise per task type with 7-day recency bias (3× weight). Real SurrealDB persistence + NATS coordination.

🧠

Intelligent Orchestration

53 tests verify multi-provider routing (Claude, OpenAI, Gemini, Ollama), per-role budget limits, cost tracking, and automatic fallback chains. Swarm coordination with load-balanced assignment using success_rate / (1 + load) formula.

📚

Recursive Language Models (RLM)

Process 100k+ token documents without context limits. Hybrid search combines BM25 (keywords) + semantic embeddings via RRF fusion. Intelligent chunking (Fixed/Semantic/Code) with SurrealDB persistence. Perfect for large codebases and documentation.

🔗

Agent-to-Agent (A2A) Protocol

Distributed agent coordination with task dispatch, status tracking, and result collection. Real SurrealDB persistence (no in-memory HashMap). NATS messaging for async completion. Exponential backoff retry with circuit breaker. 12 integration tests verify real behavior.

🕸️

Knowledge Graph

Temporal execution history with causal relationships. Learning curves from daily windowed aggregations. Similarity search recommends solutions from past tasks. 20 tests verify graph persistence, learning profiles, and execution tracking.

NATS JetStream

Reliable message delivery for agent coordination. JetStream streams for workflow events, task completion, and status updates. Graceful fallback when NATS unavailable. Background subscribers with DashMap for async result delivery.

🗄️

SurrealDB

Multi-model database with graph capabilities. Multi-tenant scopes for workspace isolation. Native graph relations for Knowledge Graph. All queries use parameterized bindings for security. SCHEMAFULL tables with explicit indexes.

🔌

Backend API & MCP Connectors

40+ REST endpoints (projects, tasks, agents, workflows, swarm). WebSocket real-time updates. MCP gateway for external tool integration and plugin system. Multi-tenant SurrealDB scopes. Prometheus metrics at /metrics. 161 tests verify API correctness.

☸️

Cloud-Native & Self-Hosted

161 backend tests + K8s manifests with Kustomize overlays. Health checks, Prometheus metrics (/metrics endpoint), StatefulSets with anti-affinity. Local Docker Compose for development. Zero vendor lock-in.

Technology Stack

Rust (17 crates)Axum REST APISurrealDBNATS JetStreamLeptos WASMKubernetesPrometheusKnowledge GraphRLM (Hybrid Search)A2A ProtocolMCP Server

Available Agents

ArchitectSystem design
DeveloperCode implementation
CodeReviewerQuality assurance
TesterTests & benchmarks
DocumenterDocumentation
MarketerMarketing content
PresenterPresentations
DevOpsCI/CD deployment
MonitorHealth & alerting
SecurityAudit & compliance
ProjectManagerRoadmap tracking
DecisionMakerConflict resolution

Ready for intelligent orchestration?

Built with Rust 🦀 | Open Source | Self-Hosted

Explore on GitHub →

Vapora v1.2.0

Made with Vapora dreams and Rust reality ✨

Intelligent Development Orchestration | Multi-Agent Multi-IA Platform

+ Vapora
✅ v1.2.0 | 372 Tests | 100% Pass Rate
Vapora - Development Orchestration

Evaporate complexity

Development Flows

Specialized agents orchestrate pipelines for design, implementation, testing, documentation and deployment. Agents learn from history and optimize costs automatically.
100% self-hosted.

The 4 Problems It Solves

01

Context Switching

Developers jump between tools constantly. Vapora unifies everything in one intelligent system where context flows.

02

Knowledge Fragmentation

Decisions lost in threads, code scattered, docs unmaintained. RLM (Recursive Language Models) with hybrid search (BM25 + semantic) and chunking makes knowledge discoverable even in 100k+ token documents.

03

Manual Coordination

Orchestrating code review, testing, documentation and deployment manually creates bottlenecks. Multi-agent workflows solve this.

04

Dev-Ops Friction

Handoffs between developers and operations lack visibility and context. Vapora maintains unified deployment readiness.

How It Works

🤖

Specialized Agents

71 tests verify agent orchestration, learning profiles, and task assignment. Agents track expertise per task type with 7-day recency bias (3× weight). Real SurrealDB persistence + NATS coordination.

🧠

Intelligent Orchestration

53 tests verify multi-provider routing (Claude, OpenAI, Gemini, Ollama), per-role budget limits, cost tracking, and automatic fallback chains. Swarm coordination with load-balanced assignment using success_rate / (1 + load) formula.

📚

Recursive Language Models (RLM)

Process 100k+ token documents without context limits. Hybrid search combines BM25 (keywords) + semantic embeddings via RRF fusion. Intelligent chunking (Fixed/Semantic/Code) with SurrealDB persistence. Perfect for large codebases and documentation.

🔗

Agent-to-Agent (A2A) Protocol

Distributed agent coordination with task dispatch, status tracking, and result collection. Real SurrealDB persistence (no in-memory HashMap). NATS messaging for async completion. Exponential backoff retry with circuit breaker. 12 integration tests verify real behavior.

🕸️

Knowledge Graph

Temporal execution history with causal relationships. Learning curves from daily windowed aggregations. Similarity search recommends solutions from past tasks. 20 tests verify graph persistence, learning profiles, and execution tracking.

NATS JetStream

Reliable message delivery for agent coordination. JetStream streams for workflow events, task completion, and status updates. Graceful fallback when NATS unavailable. Background subscribers with DashMap for async result delivery.

🗄️

SurrealDB

Multi-model database with graph capabilities. Multi-tenant scopes for workspace isolation. Native graph relations for Knowledge Graph. All queries use parameterized bindings for security. SCHEMAFULL tables with explicit indexes.

🔌

Backend API & MCP Connectors

40+ REST endpoints (projects, tasks, agents, workflows, swarm). WebSocket real-time updates. MCP gateway for external tool integration and plugin system. Multi-tenant SurrealDB scopes. Prometheus metrics at /metrics. 161 tests verify API correctness.

☸️

Cloud-Native & Self-Hosted

161 backend tests + K8s manifests with Kustomize overlays. Health checks, Prometheus metrics (/metrics endpoint), StatefulSets with anti-affinity. Local Docker Compose for development. Zero vendor lock-in.

Technology Stack

Rust (18 crates)Axum REST APISurrealDBNATS JetStreamLeptos WASMKubernetesPrometheusKnowledge GraphRLM (Hybrid Search)A2A ProtocolMCP Server

Available Agents

ArchitectSystem design
DeveloperCode implementation
CodeReviewerQuality assurance
TesterTests & benchmarks
DocumenterDocumentation
MarketerMarketing content
PresenterPresentations
DevOpsCI/CD deployment
MonitorHealth & alerting
SecurityAudit & compliance
ProjectManagerRoadmap tracking
DecisionMakerConflict resolution

Ready for intelligent orchestration?

Built with Rust 🦀 | Open Source | Self-Hosted

Explore on GitHub →

Vapora v1.2.0

Made with Vapora dreams and Rust reality ✨

Intelligent Development Orchestration | Multi-Agent Multi-IA Platform

diff --git a/assets/web/src/index.html b/assets/web/src/index.html index 3b3f6c5..61aaece 100644 --- a/assets/web/src/index.html +++ b/assets/web/src/index.html @@ -516,8 +516,8 @@
- ✅ v1.2.0 | 354 Tests | 100% Pass Rate✅ v1.2.0 | 372 Tests | 100% Pass Rate
Vapora - Development Orchestration @@ -785,6 +785,42 @@ 161 backend tests + K8s manifests with Kustomize overlays. Health checks, Prometheus metrics (/metrics endpoint), StatefulSets with anti-affinity. Local Docker Compose for development. Zero vendor lock-in.

+
+
+

+ Autonomous Scheduling +

+

+ Cron-triggered workflow execution with IANA timezone support via chrono-tz. Distributed fire-lock using SurrealDB conditional UPDATE prevents double-fires across multi-instance deployments — no external lock service required. 48 tests. +

+
+
+
🔔
+

+ Webhook Notifications +

+

+ Real-time alerts to Slack, Discord, and Telegram — no vendor SDKs. ${VAR} secret resolution is built into ChannelRegistry construction; tokens never reach the HTTP layer unresolved. Fire-and-forget hooks on task completion, proposal approval/rejection, and workflow lifecycle events. +

+
@@ -795,7 +831,7 @@ >
- Rust (17 crates) + Rust (18 crates) Axum REST API SurrealDB NATS JetStream @@ -806,6 +842,8 @@ RLM (Hybrid Search) A2A Protocol MCP Server + chrono-tz (Cron) + Webhook Channels
diff --git a/assets/web/vapora_architecture.svg b/assets/web/vapora_architecture.svg index 024d66f..7afbe81 100644 --- a/assets/web/vapora_architecture.svg +++ b/assets/web/vapora_architecture.svg @@ -106,7 +106,7 @@ VAPORA ARCHITECTURE - 18 CRATES · 354 TESTS · 100% RUST + 18 CRATES · 372 TESTS · 100% RUST PRESENTATION diff --git a/crates/vapora-backend/Cargo.toml b/crates/vapora-backend/Cargo.toml index 767c3c7..afb1eb7 100644 --- a/crates/vapora-backend/Cargo.toml +++ b/crates/vapora-backend/Cargo.toml @@ -25,6 +25,7 @@ vapora-swarm = { workspace = true } vapora-tracking = { path = "../vapora-tracking" } vapora-knowledge-graph = { path = "../vapora-knowledge-graph" } vapora-workflow-engine = { workspace = true } +vapora-channels = { workspace = true } vapora-rlm = { path = "../vapora-rlm" } # Secrets management diff --git a/crates/vapora-backend/src/api/channels.rs b/crates/vapora-backend/src/api/channels.rs new file mode 100644 index 0000000..7325525 --- /dev/null +++ b/crates/vapora-backend/src/api/channels.rs @@ -0,0 +1,62 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use serde::Serialize; +use vapora_channels::{ChannelError, Message}; +use vapora_shared::VaporaError; + +use crate::api::state::AppState; +use crate::api::ApiResult; + +#[derive(Serialize)] +struct ChannelListResponse { + channels: Vec, +} + +/// List all registered notification channels. +/// +/// GET /api/v1/channels +pub async fn list_channels(State(state): State) -> impl IntoResponse { + let names = match &state.channel_registry { + Some(r) => { + let mut names: Vec = r.channel_names().into_iter().map(str::to_owned).collect(); + names.sort_unstable(); + names + } + None => vec![], + }; + Json(ChannelListResponse { channels: names }) +} + +/// Send a test message to a specific notification channel. +/// +/// POST /api/v1/channels/:name/test +/// +/// Returns 200 on successful delivery, 404 if the channel is unknown or not +/// configured, 502 if delivery fails at the remote platform. +pub async fn test_channel( + State(state): State, + Path(name): Path, +) -> ApiResult { + let registry = state.channel_registry.as_ref().ok_or_else(|| { + VaporaError::NotFound(format!( + "Channel '{}' not found — no channels configured", + name + )) + })?; + + let msg = Message::info( + "Test notification", + format!("Connectivity test from VAPORA backend for channel '{name}'"), + ); + + registry.send(&name, msg).await.map_err(|e| match e { + ChannelError::NotFound(_) => VaporaError::NotFound(e.to_string()), + other => VaporaError::InternalError(other.to_string()), + })?; + + Ok(StatusCode::OK) +} diff --git a/crates/vapora-backend/src/api/mod.rs b/crates/vapora-backend/src/api/mod.rs index 3d4e234..5119405 100644 --- a/crates/vapora-backend/src/api/mod.rs +++ b/crates/vapora-backend/src/api/mod.rs @@ -3,6 +3,7 @@ pub mod agents; pub mod analytics; pub mod analytics_metrics; +pub mod channels; pub mod error; pub mod health; pub mod metrics; diff --git a/crates/vapora-backend/src/api/proposals.rs b/crates/vapora-backend/src/api/proposals.rs index 76bc5fc..ab911db 100644 --- a/crates/vapora-backend/src/api/proposals.rs +++ b/crates/vapora-backend/src/api/proposals.rs @@ -7,6 +7,7 @@ use axum::{ Json, }; use serde::Deserialize; +use vapora_channels::Message; use vapora_shared::models::{Proposal, ProposalReview, ProposalStatus, RiskLevel}; use crate::api::state::AppState; @@ -186,6 +187,12 @@ pub async fn approve_proposal( .approve_proposal(&id, tenant_id) .await?; + let msg = Message::success( + "Proposal approved", + format!("'{}' has been approved", proposal.title), + ); + state.notify(&state.notification_config.clone().on_proposal_approved, msg); + Ok(Json(proposal)) } @@ -203,6 +210,12 @@ pub async fn reject_proposal( .reject_proposal(&id, tenant_id) .await?; + let msg = Message::warning( + "Proposal rejected", + format!("'{}' has been rejected", proposal.title), + ); + state.notify(&state.notification_config.clone().on_proposal_rejected, msg); + Ok(Json(proposal)) } diff --git a/crates/vapora-backend/src/api/state.rs b/crates/vapora-backend/src/api/state.rs index a158f9b..0ceca9b 100644 --- a/crates/vapora-backend/src/api/state.rs +++ b/crates/vapora-backend/src/api/state.rs @@ -2,10 +2,12 @@ use std::sync::Arc; +use vapora_channels::ChannelRegistry; use vapora_rlm::storage::SurrealDBStorage; use vapora_rlm::RLMEngine; use vapora_workflow_engine::{ScheduleStore, WorkflowOrchestrator}; +use crate::config::NotificationConfig; use crate::services::{ AgentService, ProjectService, ProposalService, ProviderAnalyticsService, TaskService, }; @@ -21,6 +23,11 @@ pub struct AppState { pub workflow_orchestrator: Option>, pub rlm_engine: Option>>, pub schedule_store: Option>, + /// Outbound notification channels; `None` when `[channels]` is absent from + /// config. + pub channel_registry: Option>, + /// Backend-level event → channel-name mappings. + pub notification_config: Arc, } impl AppState { @@ -41,6 +48,8 @@ impl AppState { workflow_orchestrator: None, rlm_engine: None, schedule_store: None, + channel_registry: None, + notification_config: Arc::new(NotificationConfig::default()), } } @@ -62,4 +71,192 @@ impl AppState { self.schedule_store = Some(store); self } + + /// Attach the notification channel registry built from `[channels]` config. + pub fn with_channel_registry(mut self, registry: Arc) -> Self { + self.channel_registry = Some(registry); + self + } + + /// Attach the per-event notification targets. + pub fn with_notification_config(mut self, cfg: NotificationConfig) -> Self { + self.notification_config = Arc::new(cfg); + self + } + + /// Fire-and-forget: send `msg` to each channel in `targets`. + /// + /// Spawns a background task; delivery failures are logged as `warn!` and + /// never surface to the caller. + pub fn notify(&self, targets: &[String], msg: vapora_channels::Message) { + if targets.is_empty() { + return; + } + let registry = self.channel_registry.clone(); + let targets = targets.to_vec(); + tokio::spawn(dispatch_notifications(registry, targets, msg)); + } +} + +/// Deliver `msg` to every channel name in `targets` using `registry`. +/// +/// A `None` registry or an unknown channel name is silent (warn-logged for +/// unknown names). Failures in one channel do not abort delivery to others. +/// +/// Extracted from [`AppState::notify`] to be directly callable in tests +/// without needing a fully-constructed [`AppState`]. +pub(crate) async fn dispatch_notifications( + registry: Option>, + targets: Vec, + msg: vapora_channels::Message, +) { + let Some(registry) = registry else { + return; + }; + for name in &targets { + if let Err(e) = registry.send(name, msg.clone()).await { + tracing::warn!(channel = %name, error = %e, "Notification delivery failed"); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + + use async_trait::async_trait; + use vapora_channels::Result as ChannelResult; + use vapora_channels::{ChannelError, ChannelRegistry, Message, NotificationChannel}; + + use super::dispatch_notifications; + + struct RecordingChannel { + name: String, + captured: Arc>>, + } + + impl RecordingChannel { + fn new(name: &str) -> (Self, Arc>>) { + let captured = Arc::new(Mutex::new(vec![])); + ( + Self { + name: name.to_string(), + captured: Arc::clone(&captured), + }, + captured, + ) + } + } + + #[async_trait] + impl NotificationChannel for RecordingChannel { + fn name(&self) -> &str { + &self.name + } + async fn send(&self, msg: &Message) -> ChannelResult<()> { + self.captured.lock().unwrap().push(msg.clone()); + Ok(()) + } + } + + struct FailingChannel { + name: String, + } + + #[async_trait] + impl NotificationChannel for FailingChannel { + fn name(&self) -> &str { + &self.name + } + async fn send(&self, _msg: &Message) -> ChannelResult<()> { + Err(ChannelError::ApiError { + channel: self.name.clone(), + status: 503, + body: "unavailable".to_string(), + }) + } + } + + #[tokio::test] + async fn dispatch_is_noop_when_registry_is_none() { + // Must not panic; targets are non-empty so the only short-circuit is None + // registry. + dispatch_notifications(None, vec!["ch".to_string()], Message::info("Test", "body")).await; + } + + #[tokio::test] + async fn dispatch_delivers_to_named_channel() { + let (recording, captured) = RecordingChannel::new("team-slack"); + let mut registry = ChannelRegistry::new(); + registry.register(Arc::new(recording)); + + dispatch_notifications( + Some(Arc::new(registry)), + vec!["team-slack".to_string()], + Message::success("Deploy done", "v1.0 → prod"), + ) + .await; + + let msgs = captured.lock().unwrap(); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].title, "Deploy done"); + } + + #[tokio::test] + async fn dispatch_delivers_to_multiple_targets() { + let (ch_a, cap_a) = RecordingChannel::new("ch-a"); + let (ch_b, cap_b) = RecordingChannel::new("ch-b"); + let mut registry = ChannelRegistry::new(); + registry.register(Arc::new(ch_a)); + registry.register(Arc::new(ch_b)); + + dispatch_notifications( + Some(Arc::new(registry)), + vec!["ch-a".to_string(), "ch-b".to_string()], + Message::info("Test", "broadcast"), + ) + .await; + + assert_eq!(cap_a.lock().unwrap().len(), 1); + assert_eq!(cap_b.lock().unwrap().len(), 1); + } + + #[tokio::test] + async fn dispatch_continues_after_channel_failure() { + let bad = FailingChannel { + name: "bad".to_string(), + }; + let (good, cap_good) = RecordingChannel::new("good"); + let mut registry = ChannelRegistry::new(); + registry.register(Arc::new(bad)); + registry.register(Arc::new(good)); + + // Must not panic; "good" receives despite "bad" returning an error. + dispatch_notifications( + Some(Arc::new(registry)), + vec!["bad".to_string(), "good".to_string()], + Message::error("Alert", "system down"), + ) + .await; + + assert_eq!(cap_good.lock().unwrap().len(), 1); + } + + #[tokio::test] + async fn dispatch_logs_warn_on_unknown_channel_but_continues() { + let (present, cap) = RecordingChannel::new("present"); + let mut registry = ChannelRegistry::new(); + registry.register(Arc::new(present)); + + // "missing" → ChannelError::NotFound logged as warn; "present" still + // receives its message. + dispatch_notifications( + Some(Arc::new(registry)), + vec!["missing".to_string(), "present".to_string()], + Message::info("Test", "body"), + ) + .await; + + assert_eq!(cap.lock().unwrap().len(), 1); + } } diff --git a/crates/vapora-backend/src/api/tasks.rs b/crates/vapora-backend/src/api/tasks.rs index d915811..9145921 100644 --- a/crates/vapora-backend/src/api/tasks.rs +++ b/crates/vapora-backend/src/api/tasks.rs @@ -7,6 +7,7 @@ use axum::{ Json, }; use serde::Deserialize; +use vapora_channels::Message; use vapora_shared::models::{Task, TaskPriority, TaskStatus}; use crate::api::state::AppState; @@ -160,8 +161,17 @@ pub async fn update_task_status( let updated = state .task_service - .update_task_status(&id, tenant_id, status) + .update_task_status(&id, tenant_id, status.clone()) .await?; + + if status == TaskStatus::Done { + let msg = Message::success( + "Task completed", + format!("'{}' moved to Done", updated.title), + ); + state.notify(&state.notification_config.clone().on_task_done, msg); + } + Ok(Json(updated)) } diff --git a/crates/vapora-backend/src/api/workflow_orchestrator.rs b/crates/vapora-backend/src/api/workflow_orchestrator.rs index 352093f..beb89bf 100644 --- a/crates/vapora-backend/src/api/workflow_orchestrator.rs +++ b/crates/vapora-backend/src/api/workflow_orchestrator.rs @@ -291,6 +291,7 @@ mod tests { compensation_agents: None, }], schedule: None, + notifications: Default::default(), }; let instance = WorkflowInstance::new(&config, serde_json::json!({})); diff --git a/crates/vapora-backend/src/config.rs b/crates/vapora-backend/src/config.rs index 2f6dbc4..60de8ce 100644 --- a/crates/vapora-backend/src/config.rs +++ b/crates/vapora-backend/src/config.rs @@ -1,10 +1,12 @@ // Configuration module for VAPORA Backend // Loads config from vapora.toml with environment variable interpolation +use std::collections::HashMap; use std::fs; use std::path::Path; use serde::{Deserialize, Serialize}; +use vapora_channels::config::ChannelConfig; use vapora_shared::{Result, VaporaError}; /// Main configuration structure @@ -16,6 +18,32 @@ pub struct Config { pub auth: AuthConfig, pub logging: LoggingConfig, pub metrics: MetricsConfig, + /// Named outbound notification channels (`[channels.name]` blocks in TOML). + /// Credential fields support `${VAR}` / `${VAR:-default}` interpolation — + /// resolution happens automatically in [`ChannelRegistry::from_map`]. + #[serde(default)] + pub channels: HashMap, + /// Backend-level event → channel-name mappings. + #[serde(default)] + pub notifications: NotificationConfig, +} + +/// Per-event lists of channel names to notify. +/// +/// ```toml +/// [notifications] +/// on_task_done = ["team-slack"] +/// on_proposal_approved = ["team-slack"] +/// on_proposal_rejected = ["team-slack", "ops-telegram"] +/// ``` +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct NotificationConfig { + #[serde(default)] + pub on_task_done: Vec, + #[serde(default)] + pub on_proposal_approved: Vec, + #[serde(default)] + pub on_proposal_rejected: Vec, } /// Server configuration @@ -199,7 +227,8 @@ mod tests { #[test] fn test_env_var_interpolation() { - std::env::set_var("TEST_VAR", "test_value"); + // SAFETY: single-threaded test, no concurrent env access. + unsafe { std::env::set_var("TEST_VAR", "test_value") }; let input = "host = \"${TEST_VAR}\""; let result = Config::interpolate_env_vars(input).unwrap(); @@ -245,6 +274,8 @@ mod tests { enabled: true, port: 9090, }, + channels: HashMap::new(), + notifications: NotificationConfig::default(), }; assert!(config.validate().is_err()); diff --git a/crates/vapora-backend/src/main.rs b/crates/vapora-backend/src/main.rs index dcfae38..342b152 100644 --- a/crates/vapora-backend/src/main.rs +++ b/crates/vapora-backend/src/main.rs @@ -18,6 +18,7 @@ use axum::{ use clap::Parser; use tower_http::cors::{Any, CorsLayer}; use tracing::{info, Level}; +use vapora_channels::ChannelRegistry; use vapora_swarm::{SwarmCoordinator, SwarmMetrics}; use vapora_workflow_engine::ScheduleStore; @@ -109,8 +110,28 @@ async fn main() -> Result<()> { let schedule_store = Arc::new(ScheduleStore::new(Arc::new(db.clone()))); info!("ScheduleStore initialized for autonomous scheduling"); + // Build notification channel registry from [channels] config block. + // Absent block → no notifications sent; a build error is non-fatal (warns). + let channel_registry = if config.channels.is_empty() { + None + } else { + match ChannelRegistry::from_map(config.channels.clone()) { + Ok(r) => { + info!( + "Channel registry built ({} channels)", + r.channel_names().len() + ); + Some(std::sync::Arc::new(r)) + } + Err(e) => { + tracing::warn!("Failed to build channel registry: {e}; notifications disabled"); + None + } + } + }; + // Create application state - let app_state = AppState::new( + let mut app_state = AppState::new( project_service, task_service, agent_service, @@ -118,7 +139,11 @@ async fn main() -> Result<()> { provider_analytics_service, ) .with_rlm_engine(rlm_engine) - .with_schedule_store(schedule_store); + .with_schedule_store(schedule_store) + .with_notification_config(config.notifications.clone()); + if let Some(registry) = channel_registry { + app_state = app_state.with_channel_registry(registry); + } // Create SwarmMetrics for Prometheus monitoring let metrics = match SwarmMetrics::new() { @@ -333,6 +358,12 @@ async fn main() -> Result<()> { "/api/v1/analytics/providers/:provider/tasks/:task_type", get(api::provider_analytics::get_provider_task_type_metrics), ) + // Channel endpoints + .route("/api/v1/channels", get(api::channels::list_channels)) + .route( + "/api/v1/channels/:name/test", + post(api::channels::test_channel), + ) // RLM endpoints (Phase 8) .route("/api/v1/rlm/documents", post(api::rlm::load_document)) .route("/api/v1/rlm/query", post(api::rlm::query_document)) diff --git a/crates/vapora-channels/Cargo.toml b/crates/vapora-channels/Cargo.toml new file mode 100644 index 0000000..8b760bf --- /dev/null +++ b/crates/vapora-channels/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "vapora-channels" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +homepage = "https://vapora.dev" +rust-version.workspace = true +description = "Outbound notification channels: Slack, Discord, Telegram" + +[dependencies] +reqwest = { workspace = true } +async-trait = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +regex = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } +wiremock = { workspace = true } diff --git a/crates/vapora-channels/src/channel.rs b/crates/vapora-channels/src/channel.rs new file mode 100644 index 0000000..4c2b01e --- /dev/null +++ b/crates/vapora-channels/src/channel.rs @@ -0,0 +1,9 @@ +use async_trait::async_trait; + +use crate::{error::Result, message::Message}; + +#[async_trait] +pub trait NotificationChannel: Send + Sync { + fn name(&self) -> &str; + async fn send(&self, msg: &Message) -> Result<()>; +} diff --git a/crates/vapora-channels/src/config.rs b/crates/vapora-channels/src/config.rs new file mode 100644 index 0000000..5d5399b --- /dev/null +++ b/crates/vapora-channels/src/config.rs @@ -0,0 +1,246 @@ +use std::collections::HashMap; +use std::sync::OnceLock; + +use regex::Regex; +use serde::{Deserialize, Serialize}; + +use crate::error::{ChannelError, Result}; + +/// Top-level config section; embed under `[channels]` in your TOML. +/// +/// Credential fields (`webhook_url`, `bot_token`, etc.) support `${VAR}` and +/// `${VAR:-default}` interpolation. Resolution is performed automatically by +/// [`ChannelRegistry::from_config`] / [`ChannelRegistry::from_map`] via +/// [`ChannelConfig::resolve_secrets`]. Plain literals pass through unchanged. +/// +/// ```toml +/// [channels.team-slack] +/// type = "slack" +/// webhook_url = "${SLACK_WEBHOOK_URL}" +/// +/// [channels.ops-discord] +/// type = "discord" +/// webhook_url = "${DISCORD_WEBHOOK_URL}" +/// +/// [channels.alerts-telegram] +/// type = "telegram" +/// bot_token = "${TELEGRAM_BOT_TOKEN}" +/// chat_id = "${TELEGRAM_CHAT_ID:-100999}" +/// ``` +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ChannelsConfig { + #[serde(default)] + pub channels: HashMap, +} + +impl ChannelsConfig { + /// Resolve all `${VAR}` references in every channel entry. + /// + /// Consumes `self` and returns a new `ChannelsConfig` with literals. Fails + /// on the first channel whose secrets cannot be resolved. + pub fn resolve_secrets(self) -> Result { + let channels = self + .channels + .into_iter() + .map(|(name, cfg)| cfg.resolve_secrets().map(|c| (name, c))) + .collect::>()?; + Ok(Self { channels }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum ChannelConfig { + Slack(SlackConfig), + Discord(DiscordConfig), + Telegram(TelegramConfig), +} + +impl ChannelConfig { + /// Resolve `${VAR}` / `${VAR:-default}` references in all credential + /// fields. Plain string literals are returned unchanged. + pub fn resolve_secrets(self) -> Result { + match self { + Self::Slack(c) => Ok(Self::Slack(c.resolve_secrets()?)), + Self::Discord(c) => Ok(Self::Discord(c.resolve_secrets()?)), + Self::Telegram(c) => Ok(Self::Telegram(c.resolve_secrets()?)), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SlackConfig { + pub webhook_url: String, + /// Channel override (e.g. `#alerts`). The webhook already targets a + /// channel; this overrides it for workspaces that allow it. + pub channel: Option, + pub username: Option, +} + +impl SlackConfig { + pub fn resolve_secrets(self) -> Result { + Ok(Self { + webhook_url: interpolate(&self.webhook_url)?, + channel: self.channel.map(|s| interpolate(&s)).transpose()?, + username: self.username.map(|s| interpolate(&s)).transpose()?, + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DiscordConfig { + pub webhook_url: String, + pub username: Option, + pub avatar_url: Option, +} + +impl DiscordConfig { + pub fn resolve_secrets(self) -> Result { + Ok(Self { + webhook_url: interpolate(&self.webhook_url)?, + username: self.username.map(|s| interpolate(&s)).transpose()?, + avatar_url: self.avatar_url.map(|s| interpolate(&s)).transpose()?, + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelegramConfig { + pub bot_token: String, + /// Numeric chat ID (e.g. `-1001234567890` for a supergroup). + pub chat_id: String, + /// Override the Bot API base URL. Leave `None` for production. + /// Useful for pointing at a local mock server during tests. + pub api_base: Option, +} + +impl TelegramConfig { + pub fn api_url(&self) -> String { + let base = self + .api_base + .as_deref() + .unwrap_or("https://api.telegram.org"); + format!("{}/bot{}/sendMessage", base, self.bot_token) + } + + pub fn resolve_secrets(self) -> Result { + Ok(Self { + bot_token: interpolate(&self.bot_token)?, + chat_id: interpolate(&self.chat_id)?, + api_base: self.api_base.map(|s| interpolate(&s)).transpose()?, + }) + } +} + +/// Expand every `${VAR}` / `${VAR:-default}` reference found anywhere in `s`. +/// +/// - `${FOO}` → value of `FOO`, error if unset +/// - `${FOO:-bar}` → value of `FOO` if set, `"bar"` otherwise +/// - Anything else → returned unchanged +fn interpolate(s: &str) -> Result { + static RE: OnceLock = OnceLock::new(); + let re = RE.get_or_init(|| { + // Matches ${VAR} and ${VAR:-default} anywhere in the string. + Regex::new(r"\$\{([^}:]+)(?::-(.*?))?\}").expect("static regex is valid") + }); + + // Fast path: no placeholder in the string. + if !s.contains("${") { + return Ok(s.to_string()); + } + + let mut result = s.to_string(); + for cap in re.captures_iter(s) { + let full = cap.get(0).unwrap().as_str(); + let var_name = cap.get(1).unwrap().as_str(); + let default = cap.get(2).map(|m| m.as_str()); + + let value = match std::env::var(var_name) { + Ok(v) => v, + Err(_) => match default { + Some(d) => d.to_string(), + None => return Err(ChannelError::SecretNotFound(var_name.to_string())), + }, + }; + result = result.replace(full, &value); + } + Ok(result) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn literal_passthrough() { + let v = interpolate("https://hooks.slack.com/services/T/B/token").unwrap(); + assert_eq!(v, "https://hooks.slack.com/services/T/B/token"); + } + + #[test] + fn env_var_resolved() { + // SAFETY: single-threaded test process, no concurrent env access. + unsafe { std::env::set_var("TEST_CHANNELS_WEBHOOK", "https://resolved.example.com") }; + let v = interpolate("${TEST_CHANNELS_WEBHOOK}").unwrap(); + assert_eq!(v, "https://resolved.example.com"); + } + + #[test] + fn env_var_with_default_used_when_unset() { + // SAFETY: single-threaded test process, no concurrent env access. + unsafe { std::env::remove_var("TEST_CHANNELS_MISSING") }; + let v = interpolate("${TEST_CHANNELS_MISSING:-fallback-token}").unwrap(); + assert_eq!(v, "fallback-token"); + } + + #[test] + fn env_var_missing_no_default_errors() { + // SAFETY: single-threaded test process, no concurrent env access. + unsafe { std::env::remove_var("TEST_CHANNELS_REQUIRED") }; + let err = interpolate("${TEST_CHANNELS_REQUIRED}").unwrap_err(); + assert!( + matches!(err, ChannelError::SecretNotFound(ref v) if v == "TEST_CHANNELS_REQUIRED") + ); + } + + #[test] + fn partial_interpolation_in_url() { + // SAFETY: single-threaded test process, no concurrent env access. + unsafe { std::env::set_var("TEST_CHANNELS_PARTIAL_TOKEN", "abc123") }; + let v = + interpolate("https://hooks.example.com/services/${TEST_CHANNELS_PARTIAL_TOKEN}/end") + .unwrap(); + assert_eq!(v, "https://hooks.example.com/services/abc123/end"); + } + + #[test] + fn slack_config_resolves_secrets() { + // SAFETY: single-threaded test process, no concurrent env access. + unsafe { std::env::set_var("TEST_SLACK_WEBHOOK", "https://hooks.slack.com/s/t/b/x") }; + let cfg = SlackConfig { + webhook_url: "${TEST_SLACK_WEBHOOK}".to_string(), + channel: Some("#alerts".to_string()), + username: None, + }; + let resolved = cfg.resolve_secrets().unwrap(); + assert_eq!(resolved.webhook_url, "https://hooks.slack.com/s/t/b/x"); + assert_eq!(resolved.channel.as_deref(), Some("#alerts")); + } + + #[test] + fn telegram_config_resolves_secrets() { + // SAFETY: single-threaded test process, no concurrent env access. + unsafe { + std::env::set_var("TEST_TG_TOKEN", "999:TOKEN"); + std::env::set_var("TEST_TG_CHAT", "-100999"); + } + let cfg = TelegramConfig { + bot_token: "${TEST_TG_TOKEN}".to_string(), + chat_id: "${TEST_TG_CHAT}".to_string(), + api_base: None, + }; + let resolved = cfg.resolve_secrets().unwrap(); + assert_eq!(resolved.bot_token, "999:TOKEN"); + assert_eq!(resolved.chat_id, "-100999"); + } +} diff --git a/crates/vapora-channels/src/discord.rs b/crates/vapora-channels/src/discord.rs new file mode 100644 index 0000000..23f9b4d --- /dev/null +++ b/crates/vapora-channels/src/discord.rs @@ -0,0 +1,154 @@ +use async_trait::async_trait; +use reqwest::Client; +use serde_json::{json, Value}; +use tracing::instrument; + +use crate::{ + channel::NotificationChannel, + config::DiscordConfig, + error::{ChannelError, Result}, + message::Message, +}; + +pub struct DiscordChannel { + name: String, + config: DiscordConfig, + client: Client, +} + +impl DiscordChannel { + pub fn new(name: impl Into, config: DiscordConfig, client: Client) -> Self { + Self { + name: name.into(), + config, + client, + } + } +} + +/// Builds the Discord webhook JSON payload from a message. +pub(crate) fn build_payload( + msg: &Message, + username_override: Option<&str>, + avatar_url: Option<&str>, +) -> Value { + let fields: Vec = msg + .metadata + .iter() + .map(|(k, v)| { + json!({ + "name": k, + "value": v, + "inline": true + }) + }) + .collect(); + + let mut payload = json!({ + "embeds": [{ + "title": msg.title, + "description": msg.body, + "color": msg.level.discord_color(), + "fields": fields, + "footer": { "text": "vapora" } + }] + }); + + if let Some(u) = username_override { + payload["username"] = json!(u); + } + if let Some(av) = avatar_url { + payload["avatar_url"] = json!(av); + } + + payload +} + +#[async_trait] +impl NotificationChannel for DiscordChannel { + fn name(&self) -> &str { + &self.name + } + + #[instrument(skip(self, msg), fields(channel = %self.name))] + async fn send(&self, msg: &Message) -> Result<()> { + let payload = build_payload( + msg, + self.config.username.as_deref(), + self.config.avatar_url.as_deref(), + ); + + let resp = self + .client + .post(&self.config.webhook_url) + .json(&payload) + .send() + .await + .map_err(|e| ChannelError::HttpError { + channel: self.name.clone(), + source: e, + })?; + + // Discord returns 204 No Content on success. + if !resp.status().is_success() { + let status = resp.status().as_u16(); + let body = resp.text().await.unwrap_or_default(); + return Err(ChannelError::ApiError { + channel: self.name.clone(), + status, + body, + }); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn error_message_uses_red_discord_color() { + let msg = Message::error("Service down", "Health check failed"); + let payload = build_payload(&msg, None, None); + assert_eq!( + payload["embeds"][0]["color"].as_u64().unwrap(), + 0xcc0000_u64 + ); + } + + #[test] + fn success_message_uses_green_discord_color() { + let msg = Message::success("Deploy complete", "v1.2.0"); + let payload = build_payload(&msg, None, None); + assert_eq!( + payload["embeds"][0]["color"].as_u64().unwrap(), + 0x36a64f_u64 + ); + } + + #[test] + fn metadata_maps_to_inline_fields() { + let msg = Message::info("Test", "Body").with_metadata("region", "eu-west-1"); + let payload = build_payload(&msg, None, None); + let fields = payload["embeds"][0]["fields"].as_array().unwrap(); + assert_eq!(fields.len(), 1); + assert_eq!(fields[0]["inline"], json!(true)); + } + + #[test] + fn username_and_avatar_appear_at_top_level() { + let msg = Message::info("Test", "Body"); + let payload = build_payload( + &msg, + Some("vapora-bot"), + Some("https://example.com/avatar.png"), + ); + assert_eq!(payload["username"], json!("vapora-bot")); + assert_eq!( + payload["avatar_url"], + json!("https://example.com/avatar.png") + ); + } +} diff --git a/crates/vapora-channels/src/error.rs b/crates/vapora-channels/src/error.rs new file mode 100644 index 0000000..37220af --- /dev/null +++ b/crates/vapora-channels/src/error.rs @@ -0,0 +1,31 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ChannelError { + #[error("HTTP request failed for channel '{channel}': {source}")] + HttpError { + channel: String, + #[source] + source: reqwest::Error, + }, + + #[error("Channel '{0}' not found in registry")] + NotFound(String), + + #[error("Channel '{channel}' returned non-success status {status}: {body}")] + ApiError { + channel: String, + status: u16, + body: String, + }, + + #[error("Failed to build HTTP client: {0}")] + HttpClientBuild(String), + + /// Raised when a `${VAR}` reference is present in config but the env var + /// is not set and no `:-default` was provided. + #[error("Secret reference '${{{{0}}}}' not resolved: env var not set and no default provided")] + SecretNotFound(String), +} + +pub type Result = std::result::Result; diff --git a/crates/vapora-channels/src/lib.rs b/crates/vapora-channels/src/lib.rs new file mode 100644 index 0000000..b518fc6 --- /dev/null +++ b/crates/vapora-channels/src/lib.rs @@ -0,0 +1,54 @@ +//! Outbound notification channels for VAPORA. +//! +//! Delivers workflow events and agent completion signals to external team +//! communication platforms. All three providers use HTTP webhooks / Bot API — +//! no vendor SDKs are required. +//! +//! # Supported Channels +//! +//! - **Slack** — Incoming Webhooks (POST JSON with `attachments`) +//! - **Discord** — Incoming Webhooks (POST JSON with `embeds`, 204 response) +//! - **Telegram** — Bot API `sendMessage` with HTML `parse_mode` +//! +//! # Quick Start +//! +//! ```toml +//! # vapora.toml (under your [channels] section) +//! [channels.team-slack] +//! type = "slack" +//! webhook_url = "https://hooks.slack.com/services/…" +//! +//! [channels.ops-discord] +//! type = "discord" +//! webhook_url = "https://discord.com/api/webhooks/…" +//! +//! [channels.alerts] +//! type = "telegram" +//! bot_token = "123456:ABC-DEF…" +//! chat_id = "-1001234567890" +//! ``` +//! +//! ```rust,ignore +//! let config: ChannelsConfig = toml::from_str(toml_str)?; +//! let registry = ChannelRegistry::from_config(config)?; +//! +//! registry.send("team-slack", Message::success( +//! "Deploy complete", +//! "v1.2.0 is live on production", +//! )).await?; +//! ``` + +pub mod channel; +pub mod config; +pub mod discord; +pub mod error; +pub mod message; +pub mod registry; +pub mod slack; +pub mod telegram; + +pub use channel::NotificationChannel; +pub use config::{ChannelConfig, ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig}; +pub use error::{ChannelError, Result}; +pub use message::{Message, MessageLevel}; +pub use registry::ChannelRegistry; diff --git a/crates/vapora-channels/src/message.rs b/crates/vapora-channels/src/message.rs new file mode 100644 index 0000000..b0d318d --- /dev/null +++ b/crates/vapora-channels/src/message.rs @@ -0,0 +1,127 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum MessageLevel { + Info, + Success, + Warning, + Error, +} + +impl MessageLevel { + /// Slack attachment hex color string. + pub fn slack_color(self) -> &'static str { + match self { + Self::Info => "#0099ff", + Self::Success => "#36a64f", + Self::Warning => "#ffcc00", + Self::Error => "#cc0000", + } + } + + /// Discord embed color as 0xRRGGBB integer. + pub fn discord_color(self) -> u32 { + match self { + Self::Info => 0x0099ff, + Self::Success => 0x36a64f, + Self::Warning => 0xffcc00, + Self::Error => 0xcc0000, + } + } + + /// Unicode emoji prefix for plain-text formats. + pub fn emoji(self) -> &'static str { + match self { + Self::Info => "ℹ️", + Self::Success => "✅", + Self::Warning => "⚠️", + Self::Error => "🔴", + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Message { + pub title: String, + pub body: String, + pub level: MessageLevel, + #[serde(default)] + pub metadata: HashMap, +} + +impl Message { + pub fn new(title: impl Into, body: impl Into, level: MessageLevel) -> Self { + Self { + title: title.into(), + body: body.into(), + level, + metadata: HashMap::new(), + } + } + + pub fn info(title: impl Into, body: impl Into) -> Self { + Self::new(title, body, MessageLevel::Info) + } + + pub fn success(title: impl Into, body: impl Into) -> Self { + Self::new(title, body, MessageLevel::Success) + } + + pub fn warning(title: impl Into, body: impl Into) -> Self { + Self::new(title, body, MessageLevel::Warning) + } + + pub fn error(title: impl Into, body: impl Into) -> Self { + Self::new(title, body, MessageLevel::Error) + } + + pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.insert(key.into(), value.into()); + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn level_colors_are_distinct() { + let levels = [ + MessageLevel::Info, + MessageLevel::Success, + MessageLevel::Warning, + MessageLevel::Error, + ]; + let slack_colors: Vec<_> = levels.iter().map(|l| l.slack_color()).collect(); + let discord_colors: Vec<_> = levels.iter().map(|l| l.discord_color()).collect(); + // All four Slack colors are unique. + let mut deduped = slack_colors.clone(); + deduped.dedup(); + assert_eq!(deduped.len(), 4); + // All four Discord colors are unique. + let mut deduped = discord_colors.clone(); + deduped.dedup(); + assert_eq!(deduped.len(), 4); + } + + #[test] + fn constructors_set_correct_level() { + assert_eq!(Message::info("t", "b").level, MessageLevel::Info); + assert_eq!(Message::success("t", "b").level, MessageLevel::Success); + assert_eq!(Message::warning("t", "b").level, MessageLevel::Warning); + assert_eq!(Message::error("t", "b").level, MessageLevel::Error); + } + + #[test] + fn with_metadata_is_additive() { + let msg = Message::info("t", "b") + .with_metadata("env", "prod") + .with_metadata("region", "eu-west-1"); + assert_eq!(msg.metadata.len(), 2); + assert_eq!(msg.metadata["env"], "prod"); + } +} diff --git a/crates/vapora-channels/src/registry.rs b/crates/vapora-channels/src/registry.rs new file mode 100644 index 0000000..c2780bc --- /dev/null +++ b/crates/vapora-channels/src/registry.rs @@ -0,0 +1,122 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use tracing::{error, instrument}; + +use crate::{ + channel::NotificationChannel, + config::{ChannelConfig, ChannelsConfig}, + discord::DiscordChannel, + error::{ChannelError, Result}, + message::Message, + slack::SlackChannel, + telegram::TelegramChannel, +}; + +/// Routes outbound notifications to named channels. +/// +/// Each channel is addressed by the name given in the config (e.g. +/// `"team-slack"`, `"ops-discord"`). `send` delivers to one channel; +/// `broadcast` fans out to all registered channels in parallel. +pub struct ChannelRegistry { + channels: HashMap>, +} + +impl ChannelRegistry { + /// Creates an empty registry. Use `register` to add channels individually + /// (e.g. in tests with pre-built clients). + pub fn new() -> Self { + Self { + channels: HashMap::new(), + } + } + + /// Builds all channels from `config` with a shared reqwest::Client. + /// + /// The client is configured with a 10 s timeout and a `vapora-channels` + /// User-Agent. Fails if the TLS backend cannot be initialised. + pub fn from_config(config: ChannelsConfig) -> Result { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(10)) + .user_agent(concat!("vapora-channels/", env!("CARGO_PKG_VERSION"))) + .build() + .map_err(|e| ChannelError::HttpClientBuild(e.to_string()))?; + + // Resolve ${VAR} references in every channel's credential fields before + // constructing any HTTP client — this is the single mandatory call site. + let config = config.resolve_secrets()?; + + let mut registry = Self::new(); + for (name, ch_config) in config.channels { + let channel: Arc = match ch_config { + ChannelConfig::Slack(cfg) => { + Arc::new(SlackChannel::new(name.clone(), cfg, client.clone())) + } + ChannelConfig::Discord(cfg) => { + Arc::new(DiscordChannel::new(name.clone(), cfg, client.clone())) + } + ChannelConfig::Telegram(cfg) => { + Arc::new(TelegramChannel::new(name.clone(), cfg, client.clone())) + } + }; + registry.channels.insert(name, channel); + } + Ok(registry) + } + + /// Builds all channels from a flat map, creating a shared + /// `reqwest::Client`. + /// + /// Equivalent to wrapping the map in `ChannelsConfig` and calling + /// `from_config`. Use this when you hold the channel entries directly + /// (e.g. from `WorkflowsConfig.channels`). + pub fn from_map(channels: std::collections::HashMap) -> Result { + Self::from_config(ChannelsConfig { channels }) + } + + /// Registers an already-constructed channel implementation. + pub fn register(&mut self, channel: Arc) -> &mut Self { + self.channels.insert(channel.name().to_string(), channel); + self + } + + /// Sends `msg` to a single channel identified by `name`. + #[instrument(skip(self, msg), fields(channel = %name))] + pub async fn send(&self, name: &str, msg: Message) -> Result<()> { + let channel = self + .channels + .get(name) + .ok_or_else(|| ChannelError::NotFound(name.to_string()))?; + channel.send(&msg).await + } + + /// Sends `msg` to every registered channel sequentially. + /// + /// Returns a `Vec` of `(channel_name, Result)` — failures do not abort + /// delivery to remaining channels. + pub async fn broadcast(&self, msg: Message) -> Vec<(String, Result<()>)> { + let mut results = Vec::with_capacity(self.channels.len()); + for (name, channel) in &self.channels { + let result = channel.send(&msg).await; + if let Err(ref e) = result { + error!(channel = %name, error = %e, "Broadcast delivery failed"); + } + results.push((name.clone(), result)); + } + results + } + + /// Returns the names of all registered channels. + pub fn channel_names(&self) -> Vec<&str> { + self.channels.keys().map(String::as_str).collect() + } + + pub fn is_empty(&self) -> bool { + self.channels.is_empty() + } +} + +impl Default for ChannelRegistry { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/vapora-channels/src/slack.rs b/crates/vapora-channels/src/slack.rs new file mode 100644 index 0000000..2273fb5 --- /dev/null +++ b/crates/vapora-channels/src/slack.rs @@ -0,0 +1,148 @@ +use async_trait::async_trait; +use reqwest::Client; +use serde_json::{json, Value}; +use tracing::instrument; + +use crate::{ + channel::NotificationChannel, + config::SlackConfig, + error::{ChannelError, Result}, + message::Message, +}; + +pub struct SlackChannel { + name: String, + config: SlackConfig, + client: Client, +} + +impl SlackChannel { + pub fn new(name: impl Into, config: SlackConfig, client: Client) -> Self { + Self { + name: name.into(), + config, + client, + } + } +} + +/// Builds the Slack webhook JSON payload from a message. +/// +/// Extracted as a free function so payload shape can be unit-tested without +/// mocking HTTP. +pub(crate) fn build_payload( + msg: &Message, + channel_override: Option<&str>, + username_override: Option<&str>, +) -> Value { + let fields: Vec = msg + .metadata + .iter() + .map(|(k, v)| { + json!({ + "title": k, + "value": v, + "short": true + }) + }) + .collect(); + + let mut payload = json!({ + "attachments": [{ + "color": msg.level.slack_color(), + "title": msg.title, + "text": msg.body, + "footer": "vapora", + "fields": fields + }] + }); + + if let Some(ch) = channel_override { + payload["channel"] = json!(ch); + } + if let Some(u) = username_override { + payload["username"] = json!(u); + } + + payload +} + +#[async_trait] +impl NotificationChannel for SlackChannel { + fn name(&self) -> &str { + &self.name + } + + #[instrument(skip(self, msg), fields(channel = %self.name))] + async fn send(&self, msg: &Message) -> Result<()> { + let payload = build_payload( + msg, + self.config.channel.as_deref(), + self.config.username.as_deref(), + ); + + let resp = self + .client + .post(&self.config.webhook_url) + .json(&payload) + .send() + .await + .map_err(|e| ChannelError::HttpError { + channel: self.name.clone(), + source: e, + })?; + + if !resp.status().is_success() { + let status = resp.status().as_u16(); + let body = resp.text().await.unwrap_or_default(); + return Err(ChannelError::ApiError { + channel: self.name.clone(), + status, + body, + }); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn error_message_uses_red_color() { + let msg = Message::error("Deploy failed", "Rollback triggered"); + let payload = build_payload(&msg, None, None); + assert_eq!( + payload["attachments"][0]["color"].as_str().unwrap(), + "#cc0000" + ); + } + + #[test] + fn metadata_maps_to_fields_array() { + let msg = Message::info("Test", "Body") + .with_metadata("env", "production") + .with_metadata("version", "1.2.0"); + let payload = build_payload(&msg, None, None); + let fields = payload["attachments"][0]["fields"].as_array().unwrap(); + assert_eq!(fields.len(), 2); + } + + #[test] + fn channel_override_appears_at_top_level() { + let msg = Message::info("Test", "Body"); + let payload = build_payload(&msg, Some("#alerts"), Some("vapora-bot")); + assert_eq!(payload["channel"], json!("#alerts")); + assert_eq!(payload["username"], json!("vapora-bot")); + } + + #[test] + fn no_overrides_leaves_keys_absent() { + let msg = Message::info("Test", "Body"); + let payload = build_payload(&msg, None, None); + assert!(payload.get("channel").is_none()); + assert!(payload.get("username").is_none()); + } +} diff --git a/crates/vapora-channels/src/telegram.rs b/crates/vapora-channels/src/telegram.rs new file mode 100644 index 0000000..f9feb67 --- /dev/null +++ b/crates/vapora-channels/src/telegram.rs @@ -0,0 +1,178 @@ +use async_trait::async_trait; +use reqwest::Client; +use serde_json::{json, Value}; +use tracing::instrument; + +use crate::{ + channel::NotificationChannel, + config::TelegramConfig, + error::{ChannelError, Result}, + message::Message, +}; + +pub struct TelegramChannel { + name: String, + config: TelegramConfig, + client: Client, +} + +impl TelegramChannel { + pub fn new(name: impl Into, config: TelegramConfig, client: Client) -> Self { + Self { + name: name.into(), + config, + client, + } + } +} + +/// Escapes the three characters that have meaning in Telegram HTML mode. +fn html_escape(s: &str) -> String { + // Telegram HTML supports , , ,
, .
+    // Only &, < and > need escaping.
+    let mut out = String::with_capacity(s.len());
+    for ch in s.chars() {
+        match ch {
+            '&' => out.push_str("&"),
+            '<' => out.push_str("<"),
+            '>' => out.push_str(">"),
+            other => out.push(other),
+        }
+    }
+    out
+}
+
+/// Builds the Telegram sendMessage JSON body.
+pub(crate) fn build_payload(msg: &Message, chat_id: &str) -> Value {
+    let mut text = format!(
+        "{} {}\n\n{}",
+        msg.level.emoji(),
+        html_escape(&msg.title),
+        html_escape(&msg.body),
+    );
+
+    if !msg.metadata.is_empty() {
+        text.push('\n');
+        for (k, v) in &msg.metadata {
+            text.push_str(&format!("\n{}: {}", html_escape(k), html_escape(v)));
+        }
+    }
+
+    json!({
+        "chat_id": chat_id,
+        "text": text,
+        "parse_mode": "HTML"
+    })
+}
+
+#[async_trait]
+impl NotificationChannel for TelegramChannel {
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    #[instrument(skip(self, msg), fields(channel = %self.name))]
+    async fn send(&self, msg: &Message) -> Result<()> {
+        let payload = build_payload(msg, &self.config.chat_id);
+
+        let resp = self
+            .client
+            .post(self.config.api_url())
+            .json(&payload)
+            .send()
+            .await
+            .map_err(|e| ChannelError::HttpError {
+                channel: self.name.clone(),
+                source: e,
+            })?;
+
+        if !resp.status().is_success() {
+            let status = resp.status().as_u16();
+            let body = resp.text().await.unwrap_or_default();
+            return Err(ChannelError::ApiError {
+                channel: self.name.clone(),
+                status,
+                body,
+            });
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn html_escape_handles_all_three_special_chars() {
+        assert_eq!(html_escape("a < b & c > d"), "a < b & c > d");
+    }
+
+    #[test]
+    fn html_escape_is_noop_for_plain_text() {
+        let plain = "Deploy complete: v1.2.0 to production";
+        assert_eq!(html_escape(plain), plain);
+    }
+
+    #[test]
+    fn payload_uses_html_parse_mode() {
+        let msg = Message::info("Test", "Body");
+        let payload = build_payload(&msg, "-100123456");
+        assert_eq!(payload["parse_mode"], json!("HTML"));
+        assert_eq!(payload["chat_id"], json!("-100123456"));
+    }
+
+    #[test]
+    fn payload_contains_emoji_and_bold_title() {
+        let msg = Message::error("Service down", "Health check failed");
+        let payload = build_payload(&msg, "-100");
+        let text = payload["text"].as_str().unwrap();
+        assert!(text.contains("🔴"));
+        assert!(text.contains(""));
+        assert!(text.contains("Service down"));
+    }
+
+    #[test]
+    fn payload_escapes_html_in_title_and_body() {
+        let msg = Message::warning("a < b", "x & y > z");
+        let payload = build_payload(&msg, "-100");
+        let text = payload["text"].as_str().unwrap();
+        assert!(text.contains("a < b"));
+        assert!(text.contains("x & y > z"));
+    }
+
+    #[test]
+    fn metadata_appended_as_bold_key_value_lines() {
+        let msg = Message::info("Test", "Body").with_metadata("env", "prod");
+        let payload = build_payload(&msg, "-100");
+        let text = payload["text"].as_str().unwrap();
+        assert!(text.contains("env: prod"));
+    }
+
+    #[test]
+    fn api_url_default_base() {
+        let cfg = TelegramConfig {
+            bot_token: "123:ABC".to_string(),
+            chat_id: "-100".to_string(),
+            api_base: None,
+        };
+        assert_eq!(
+            cfg.api_url(),
+            "https://api.telegram.org/bot123:ABC/sendMessage"
+        );
+    }
+
+    #[test]
+    fn api_url_custom_base_for_testing() {
+        let cfg = TelegramConfig {
+            bot_token: "123:ABC".to_string(),
+            chat_id: "-100".to_string(),
+            api_base: Some("http://localhost:8080".to_string()),
+        };
+        assert_eq!(
+            cfg.api_url(),
+            "http://localhost:8080/bot123:ABC/sendMessage"
+        );
+    }
+}
diff --git a/crates/vapora-channels/tests/integration.rs b/crates/vapora-channels/tests/integration.rs
new file mode 100644
index 0000000..be1bbd2
--- /dev/null
+++ b/crates/vapora-channels/tests/integration.rs
@@ -0,0 +1,285 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use vapora_channels::{
+    config::{DiscordConfig, SlackConfig, TelegramConfig},
+    discord::DiscordChannel,
+    error::ChannelError,
+    message::Message,
+    registry::ChannelRegistry,
+    slack::SlackChannel,
+    telegram::TelegramChannel,
+    NotificationChannel, Result,
+};
+use wiremock::{
+    matchers::{method, path},
+    Mock, MockServer, ResponseTemplate,
+};
+
+// ── Slack ─────────────────────────────────────────────────────────────────────
+
+#[tokio::test]
+async fn slack_send_returns_ok_on_200() {
+    let server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/hooks/slack"))
+        .respond_with(ResponseTemplate::new(200).set_body_string("ok"))
+        .mount(&server)
+        .await;
+
+    let cfg = SlackConfig {
+        webhook_url: format!("{}/hooks/slack", server.uri()),
+        channel: None,
+        username: None,
+    };
+    let channel = SlackChannel::new("slack", cfg, reqwest::Client::new());
+    let msg = Message::success("Deploy complete", "v1.2.0 → production");
+
+    channel.send(&msg).await.expect("should succeed on 200");
+}
+
+#[tokio::test]
+async fn slack_send_returns_api_error_on_500() {
+    let server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/hooks/slack"))
+        .respond_with(ResponseTemplate::new(500).set_body_string("internal_error"))
+        .mount(&server)
+        .await;
+
+    let cfg = SlackConfig {
+        webhook_url: format!("{}/hooks/slack", server.uri()),
+        channel: None,
+        username: None,
+    };
+    let channel = SlackChannel::new("slack", cfg, reqwest::Client::new());
+    let err = channel
+        .send(&Message::info("Test", "Body"))
+        .await
+        .unwrap_err();
+
+    assert!(
+        matches!(
+            err,
+            ChannelError::ApiError {
+                status: 500,
+                ref body,
+                ..
+            } if body == "internal_error"
+        ),
+        "unexpected error variant: {err}"
+    );
+}
+
+// ── Discord
+// ───────────────────────────────────────────────────────────────────
+
+#[tokio::test]
+async fn discord_send_returns_ok_on_204() {
+    let server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/webhooks/discord"))
+        .respond_with(ResponseTemplate::new(204))
+        .mount(&server)
+        .await;
+
+    let cfg = DiscordConfig {
+        webhook_url: format!("{}/webhooks/discord", server.uri()),
+        username: None,
+        avatar_url: None,
+    };
+    let channel = DiscordChannel::new("discord", cfg, reqwest::Client::new());
+
+    channel
+        .send(&Message::warning("High latency", "p99 > 500 ms"))
+        .await
+        .expect("should succeed on 204");
+}
+
+#[tokio::test]
+async fn discord_send_returns_api_error_on_400() {
+    let server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/webhooks/discord"))
+        .respond_with(ResponseTemplate::new(400).set_body_string("{\"code\":50006}"))
+        .mount(&server)
+        .await;
+
+    let cfg = DiscordConfig {
+        webhook_url: format!("{}/webhooks/discord", server.uri()),
+        username: None,
+        avatar_url: None,
+    };
+    let channel = DiscordChannel::new("discord", cfg, reqwest::Client::new());
+    let err = channel
+        .send(&Message::info("Test", "Body"))
+        .await
+        .unwrap_err();
+
+    assert!(
+        matches!(err, ChannelError::ApiError { status: 400, .. }),
+        "unexpected error variant: {err}"
+    );
+}
+
+// ── Telegram
+// ──────────────────────────────────────────────────────────────────
+
+#[tokio::test]
+async fn telegram_send_returns_ok_on_200() {
+    let server = MockServer::start().await;
+
+    // Telegram returns {"ok": true, "result": {...}} with HTTP 200.
+    Mock::given(method("POST"))
+        .and(path("/botTEST_TOKEN/sendMessage"))
+        .respond_with(
+            ResponseTemplate::new(200).set_body_string(r#"{"ok":true,"result":{"message_id":1}}"#),
+        )
+        .mount(&server)
+        .await;
+
+    let cfg = TelegramConfig {
+        bot_token: "TEST_TOKEN".to_string(),
+        chat_id: "-100999".to_string(),
+        api_base: Some(server.uri()),
+    };
+    let channel = TelegramChannel::new("telegram", cfg, reqwest::Client::new());
+
+    channel
+        .send(&Message::error("Service down", "Critical alert"))
+        .await
+        .expect("should succeed on 200");
+}
+
+#[tokio::test]
+async fn telegram_send_returns_api_error_on_400() {
+    let server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/botBAD_TOKEN/sendMessage"))
+        .respond_with(
+            ResponseTemplate::new(400)
+                .set_body_string(r#"{"ok":false,"description":"Unauthorized"}"#),
+        )
+        .mount(&server)
+        .await;
+
+    let cfg = TelegramConfig {
+        bot_token: "BAD_TOKEN".to_string(),
+        chat_id: "-100".to_string(),
+        api_base: Some(server.uri()),
+    };
+    let channel = TelegramChannel::new("telegram", cfg, reqwest::Client::new());
+    let err = channel
+        .send(&Message::info("Test", "Body"))
+        .await
+        .unwrap_err();
+
+    assert!(
+        matches!(err, ChannelError::ApiError { status: 400, .. }),
+        "unexpected error variant: {err}"
+    );
+}
+
+// ── Registry
+// ──────────────────────────────────────────────────────────────────
+
+struct AlwaysOkChannel {
+    name: String,
+}
+
+#[async_trait]
+impl NotificationChannel for AlwaysOkChannel {
+    fn name(&self) -> &str {
+        &self.name
+    }
+    async fn send(&self, _msg: &Message) -> Result<()> {
+        Ok(())
+    }
+}
+
+struct AlwaysFailChannel {
+    name: String,
+}
+
+#[async_trait]
+impl NotificationChannel for AlwaysFailChannel {
+    fn name(&self) -> &str {
+        &self.name
+    }
+    async fn send(&self, _msg: &Message) -> Result<()> {
+        Err(ChannelError::ApiError {
+            channel: self.name.clone(),
+            status: 503,
+            body: "unavailable".to_string(),
+        })
+    }
+}
+
+#[tokio::test]
+async fn registry_send_routes_to_named_channel() {
+    let mut registry = ChannelRegistry::new();
+    registry.register(Arc::new(AlwaysOkChannel {
+        name: "ok-channel".to_string(),
+    }));
+
+    registry
+        .send("ok-channel", Message::info("Test", "Body"))
+        .await
+        .expect("should route to ok-channel and succeed");
+}
+
+#[tokio::test]
+async fn registry_send_returns_not_found_for_unknown_channel() {
+    let registry = ChannelRegistry::new();
+    let err = registry
+        .send("does-not-exist", Message::info("Test", "Body"))
+        .await
+        .unwrap_err();
+
+    assert!(
+        matches!(err, ChannelError::NotFound(ref n) if n == "does-not-exist"),
+        "unexpected error: {err}"
+    );
+}
+
+#[tokio::test]
+async fn registry_broadcast_delivers_to_all_channels() {
+    let mut registry = ChannelRegistry::new();
+    registry.register(Arc::new(AlwaysOkChannel {
+        name: "ch-a".to_string(),
+    }));
+    registry.register(Arc::new(AlwaysOkChannel {
+        name: "ch-b".to_string(),
+    }));
+
+    let results = registry
+        .broadcast(Message::success("All systems green", ""))
+        .await;
+
+    assert_eq!(results.len(), 2);
+    assert!(results.iter().all(|(_, r)| r.is_ok()));
+}
+
+#[tokio::test]
+async fn registry_broadcast_continues_after_partial_failure() {
+    let mut registry = ChannelRegistry::new();
+    registry.register(Arc::new(AlwaysOkChannel {
+        name: "good".to_string(),
+    }));
+    registry.register(Arc::new(AlwaysFailChannel {
+        name: "bad".to_string(),
+    }));
+
+    let results = registry.broadcast(Message::info("Test", "Body")).await;
+
+    assert_eq!(results.len(), 2);
+    let ok_count = results.iter().filter(|(_, r)| r.is_ok()).count();
+    let err_count = results.iter().filter(|(_, r)| r.is_err()).count();
+    assert_eq!(ok_count, 1);
+    assert_eq!(err_count, 1);
+}
diff --git a/crates/vapora-workflow-engine/Cargo.toml b/crates/vapora-workflow-engine/Cargo.toml
index 7431518..b944539 100644
--- a/crates/vapora-workflow-engine/Cargo.toml
+++ b/crates/vapora-workflow-engine/Cargo.toml
@@ -12,6 +12,7 @@ categories.workspace = true
 
 [dependencies]
 vapora-shared = { workspace = true }
+vapora-channels = { workspace = true }
 vapora-swarm = { workspace = true }
 vapora-agents = { workspace = true }
 vapora-knowledge-graph = { workspace = true }
diff --git a/crates/vapora-workflow-engine/src/config.rs b/crates/vapora-workflow-engine/src/config.rs
index 0f59631..dc40099 100644
--- a/crates/vapora-workflow-engine/src/config.rs
+++ b/crates/vapora-workflow-engine/src/config.rs
@@ -2,6 +2,7 @@ use std::path::Path;
 use std::str::FromStr;
 
 use serde::{Deserialize, Serialize};
+use vapora_channels::config::ChannelConfig;
 
 use crate::error::{ConfigError, Result};
 
@@ -9,6 +10,10 @@ use crate::error::{ConfigError, Result};
 pub struct WorkflowsConfig {
     pub engine: EngineConfig,
     pub workflows: Vec,
+    /// Outbound notification channels keyed by name. Absent from TOML → no
+    /// notifications sent. Each entry becomes a channel in `ChannelRegistry`.
+    #[serde(default)]
+    pub channels: std::collections::HashMap,
 }
 
 #[derive(Debug, Clone, Deserialize)]
@@ -20,6 +25,30 @@ pub struct EngineConfig {
     pub cedar_policy_dir: Option,
 }
 
+/// Per-workflow notification targets, keyed by event type.
+///
+/// Each field is a list of channel names registered in `[channels]`.
+///
+/// ```toml
+/// [[workflows]]
+/// name = "deploy-prod"
+/// trigger = "schedule"
+///
+/// [workflows.notifications]
+/// on_completed          = ["team-slack"]
+/// on_failed             = ["team-slack", "ops-telegram"]
+/// on_approval_required  = ["team-slack"]
+/// ```
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct WorkflowNotifications {
+    #[serde(default)]
+    pub on_completed: Vec,
+    #[serde(default)]
+    pub on_failed: Vec,
+    #[serde(default)]
+    pub on_approval_required: Vec,
+}
+
 #[derive(Debug, Clone, Deserialize)]
 pub struct WorkflowConfig {
     pub name: String,
@@ -27,6 +56,8 @@ pub struct WorkflowConfig {
     pub stages: Vec,
     #[serde(default)]
     pub schedule: Option,
+    #[serde(default)]
+    pub notifications: WorkflowNotifications,
 }
 
 /// Cron-based scheduling configuration for `trigger = "schedule"` workflows.
@@ -78,7 +109,7 @@ impl WorkflowsConfig {
         Ok(config)
     }
 
-    fn validate(&self) -> Result<()> {
+    pub fn validate(&self) -> Result<()> {
         if self.workflows.is_empty() {
             return Err(ConfigError::Invalid("No workflows defined".to_string()).into());
         }
@@ -207,6 +238,7 @@ approval_required = false
                 cedar_policy_dir: None,
             },
             workflows: vec![],
+            channels: std::collections::HashMap::new(),
         };
 
         assert!(config.validate().is_err());
diff --git a/crates/vapora-workflow-engine/src/instance.rs b/crates/vapora-workflow-engine/src/instance.rs
index 1d0f73f..ca9d8e7 100644
--- a/crates/vapora-workflow-engine/src/instance.rs
+++ b/crates/vapora-workflow-engine/src/instance.rs
@@ -227,6 +227,7 @@ mod tests {
                 },
             ],
             schedule: None,
+            notifications: Default::default(),
         }
     }
 
diff --git a/crates/vapora-workflow-engine/src/orchestrator.rs b/crates/vapora-workflow-engine/src/orchestrator.rs
index e5cd78a..7384732 100644
--- a/crates/vapora-workflow-engine/src/orchestrator.rs
+++ b/crates/vapora-workflow-engine/src/orchestrator.rs
@@ -9,6 +9,7 @@ use surrealdb::Surreal;
 use tokio::sync::watch;
 use tracing::{debug, error, info, warn};
 use vapora_agents::messages::{AgentMessage, TaskCompleted, TaskFailed};
+use vapora_channels::{ChannelRegistry, Message};
 use vapora_knowledge_graph::persistence::KGPersistence;
 use vapora_swarm::coordinator::SwarmCoordinator;
 
@@ -36,6 +37,8 @@ pub struct WorkflowOrchestrator {
     store: Arc,
     saga: SagaCompensator,
     cedar: Option>,
+    /// Outbound notification registry. `None` when no channels are configured.
+    channels: Option>,
 }
 
 impl WorkflowOrchestrator {
@@ -68,6 +71,16 @@ impl WorkflowOrchestrator {
             info!("Cedar authorization enabled for workflow stages");
         }
 
+        let channels = if config.channels.is_empty() {
+            None
+        } else {
+            let count = config.channels.len();
+            let registry = ChannelRegistry::from_map(config.channels.clone())
+                .map_err(|e| WorkflowError::Internal(format!("Channel registry init: {e}")))?;
+            info!(count, "Notification channels registered");
+            Some(Arc::new(registry))
+        };
+
         // Crash recovery: restore active workflows from DB
         let active_workflows = DashMap::new();
         match store.load_active().await {
@@ -96,6 +109,7 @@ impl WorkflowOrchestrator {
             store,
             saga,
             cedar,
+            channels,
         })
     }
 
@@ -388,13 +402,16 @@ impl WorkflowOrchestrator {
         if should_continue {
             self.execute_current_stage(workflow_id).await?;
         } else {
-            let duration = {
+            let (duration, template_name) = {
                 let instance = self
                     .active_workflows
                     .get(workflow_id)
                     .ok_or_else(|| WorkflowError::WorkflowNotFound(workflow_id.to_string()))?;
 
-                (Utc::now() - instance.created_at).num_seconds() as f64
+                (
+                    (Utc::now() - instance.created_at).num_seconds() as f64,
+                    instance.template_name.clone(),
+                )
             };
 
             self.metrics.workflow_duration_seconds.observe(duration);
@@ -408,6 +425,8 @@ impl WorkflowOrchestrator {
             );
 
             self.publish_workflow_completed(workflow_id).await?;
+            self.notify_workflow_completed(workflow_id, &template_name, duration)
+                .await;
 
             // Remove from DB — terminal state is cleaned up
             if let Err(e) = self.store.delete(workflow_id).await {
@@ -550,7 +569,7 @@ impl WorkflowOrchestrator {
         // `mark_current_task_failed` encapsulates the mutable stage borrow so
         // the DashMap entry can be re-accessed without nesting or borrow
         // conflicts.
-        let compensation_data: Option<(Vec, Value, String)> = {
+        let compensation_data: Option<(Vec, Value, String, String)> = {
             let mut instance = self
                 .active_workflows
                 .get_mut(&workflow_id)
@@ -576,6 +595,7 @@ impl WorkflowOrchestrator {
                     let current_idx = instance.current_stage_idx;
                     let executed_stages = instance.stages[..current_idx].to_vec();
                     let context = instance.initial_context.clone();
+                    let template_name = instance.template_name.clone();
 
                     instance.fail(format!("Stage {} failed: {}", stage_name, msg.error));
 
@@ -589,12 +609,12 @@ impl WorkflowOrchestrator {
                         "Workflow failed"
                     );
 
-                    Some((executed_stages, context, stage_name))
+                    Some((executed_stages, context, stage_name, template_name))
                 }
             }
         }; // DashMap lock released here
 
-        if let Some((executed_stages, context, _stage_name)) = compensation_data {
+        if let Some((executed_stages, context, stage_name, template_name)) = compensation_data {
             // Saga compensation: dispatch rollback tasks in reverse order (best-effort)
             self.saga
                 .compensate(&workflow_id, &executed_stages, &context)
@@ -604,6 +624,9 @@ impl WorkflowOrchestrator {
             if let Some(instance) = self.active_workflows.get(&workflow_id) {
                 self.store.save(instance.value()).await?;
             }
+
+            self.notify_workflow_failed(&workflow_id, &template_name, &stage_name, &msg.error)
+                .await;
         }
 
         Ok(())
@@ -631,6 +654,15 @@ impl WorkflowOrchestrator {
             "Approval request published"
         );
 
+        let template_name = self
+            .active_workflows
+            .get(workflow_id)
+            .map(|e| e.template_name.clone())
+            .unwrap_or_default();
+
+        self.notify_approval_required(workflow_id, &template_name, stage_name)
+            .await;
+
         Ok(())
     }
 
@@ -700,6 +732,113 @@ impl WorkflowOrchestrator {
         Ok((scheduler, shutdown_tx))
     }
 
+    /// Sends a completion notification to every channel listed in
+    /// `workflow.notifications.on_completed`. Never propagates errors —
+    /// a channel timeout must not abort the workflow record.
+    async fn notify_workflow_completed(
+        &self,
+        workflow_id: &str,
+        template: &str,
+        duration_secs: f64,
+    ) {
+        let Some(registry) = &self.channels else {
+            return;
+        };
+
+        let targets = self
+            .config
+            .get_workflow(template)
+            .map(|w| w.notifications.on_completed.clone())
+            .unwrap_or_default();
+
+        if targets.is_empty() {
+            return;
+        }
+
+        let msg = Message::success(
+            format!("Workflow completed: {}", template),
+            format!("All stages finished in {:.0}s", duration_secs),
+        )
+        .with_metadata("workflow_id", workflow_id)
+        .with_metadata("template", template)
+        .with_metadata("duration", format!("{:.0}s", duration_secs));
+
+        for target in &targets {
+            if let Err(e) = registry.send(target, msg.clone()).await {
+                warn!(channel = %target, error = %e, "Completion notification failed");
+            }
+        }
+    }
+
+    async fn notify_workflow_failed(
+        &self,
+        workflow_id: &str,
+        template: &str,
+        stage: &str,
+        error: &str,
+    ) {
+        let Some(registry) = &self.channels else {
+            return;
+        };
+
+        let targets = self
+            .config
+            .get_workflow(template)
+            .map(|w| w.notifications.on_failed.clone())
+            .unwrap_or_default();
+
+        if targets.is_empty() {
+            return;
+        }
+
+        let msg = Message::error(
+            format!("Workflow failed: {}", template),
+            format!("Stage `{}` failed: {}", stage, error),
+        )
+        .with_metadata("workflow_id", workflow_id)
+        .with_metadata("template", template)
+        .with_metadata("failed_stage", stage);
+
+        for target in &targets {
+            if let Err(e) = registry.send(target, msg.clone()).await {
+                warn!(channel = %target, error = %e, "Failure notification failed");
+            }
+        }
+    }
+
+    async fn notify_approval_required(&self, workflow_id: &str, template: &str, stage_name: &str) {
+        let Some(registry) = &self.channels else {
+            return;
+        };
+
+        let targets = self
+            .config
+            .get_workflow(template)
+            .map(|w| w.notifications.on_approval_required.clone())
+            .unwrap_or_default();
+
+        if targets.is_empty() {
+            return;
+        }
+
+        let msg = Message::warning(
+            format!("Approval required: {}", stage_name),
+            format!(
+                "Workflow `{}` is waiting for human approval to proceed with stage `{}`.",
+                template, stage_name
+            ),
+        )
+        .with_metadata("workflow_id", workflow_id)
+        .with_metadata("template", template)
+        .with_metadata("stage", stage_name);
+
+        for target in &targets {
+            if let Err(e) = registry.send(target, msg.clone()).await {
+                warn!(channel = %target, error = %e, "Approval notification failed");
+            }
+        }
+    }
+
     pub fn list_templates(&self) -> Vec {
         self.config
             .workflows
diff --git a/crates/vapora-workflow-engine/tests/notification_config.rs b/crates/vapora-workflow-engine/tests/notification_config.rs
new file mode 100644
index 0000000..22034c4
--- /dev/null
+++ b/crates/vapora-workflow-engine/tests/notification_config.rs
@@ -0,0 +1,106 @@
+use vapora_channels::ChannelRegistry;
+use vapora_workflow_engine::config::WorkflowsConfig;
+
+/// Full TOML round-trip: channels section + per-workflow notification targets.
+const TOML_WITH_CHANNELS: &str = r#"
+[engine]
+max_parallel_tasks = 4
+workflow_timeout   = 3600
+approval_gates_enabled = false
+
+[channels.team-slack]
+type        = "slack"
+webhook_url = "https://hooks.slack.com/services/TEST/TEST/TEST"
+
+[channels.ops-telegram]
+type      = "telegram"
+bot_token = "123:TEST"
+chat_id   = "-100999"
+
+[[workflows]]
+name    = "deploy-prod"
+trigger = "manual"
+
+[workflows.notifications]
+on_completed         = ["team-slack"]
+on_failed            = ["team-slack", "ops-telegram"]
+on_approval_required = ["team-slack"]
+
+[[workflows.stages]]
+name   = "build"
+agents = ["developer"]
+
+[[workflows.stages]]
+name              = "deploy"
+agents            = ["deployer"]
+approval_required = true
+"#;
+
+/// Workflow with no [channels] section — should parse without error and leave
+/// the channel map empty (registry skipped by orchestrator).
+const TOML_WITHOUT_CHANNELS: &str = r#"
+[engine]
+max_parallel_tasks    = 4
+workflow_timeout      = 3600
+approval_gates_enabled = false
+
+[[workflows]]
+name    = "ci-pipeline"
+trigger = "manual"
+
+[[workflows.stages]]
+name   = "test"
+agents = ["developer"]
+"#;
+
+#[test]
+fn channels_section_parses_into_config() {
+    let config: WorkflowsConfig = toml::from_str(TOML_WITH_CHANNELS).expect("must parse");
+
+    assert_eq!(config.channels.len(), 2);
+    assert!(config.channels.contains_key("team-slack"));
+    assert!(config.channels.contains_key("ops-telegram"));
+}
+
+#[test]
+fn notification_targets_parse_per_workflow() {
+    let config: WorkflowsConfig = toml::from_str(TOML_WITH_CHANNELS).expect("must parse");
+
+    let wf = config.get_workflow("deploy-prod").expect("workflow exists");
+    assert_eq!(wf.notifications.on_completed, ["team-slack"]);
+    assert_eq!(wf.notifications.on_failed, ["team-slack", "ops-telegram"]);
+    assert_eq!(wf.notifications.on_approval_required, ["team-slack"]);
+}
+
+#[test]
+fn missing_channels_section_defaults_to_empty() {
+    let config: WorkflowsConfig = toml::from_str(TOML_WITHOUT_CHANNELS).expect("must parse");
+
+    assert!(config.channels.is_empty());
+}
+
+#[test]
+fn missing_notifications_block_defaults_to_empty_vecs() {
+    let config: WorkflowsConfig = toml::from_str(TOML_WITHOUT_CHANNELS).expect("must parse");
+
+    let wf = config.get_workflow("ci-pipeline").expect("workflow exists");
+    assert!(wf.notifications.on_completed.is_empty());
+    assert!(wf.notifications.on_failed.is_empty());
+    assert!(wf.notifications.on_approval_required.is_empty());
+}
+
+#[test]
+fn channel_registry_builds_from_config() {
+    let config: WorkflowsConfig = toml::from_str(TOML_WITH_CHANNELS).expect("must parse");
+    let registry = ChannelRegistry::from_map(config.channels).expect("registry must build");
+
+    let mut names = registry.channel_names();
+    names.sort_unstable();
+    assert_eq!(names, ["ops-telegram", "team-slack"]);
+}
+
+#[test]
+fn validation_passes_with_channels_and_notifications() {
+    let config: WorkflowsConfig = toml::from_str(TOML_WITH_CHANNELS).expect("must parse");
+    config.validate().expect("validation must pass");
+}
diff --git a/docs/adrs/0035-notification-channels.md b/docs/adrs/0035-notification-channels.md
new file mode 100644
index 0000000..fcea436
--- /dev/null
+++ b/docs/adrs/0035-notification-channels.md
@@ -0,0 +1,159 @@
+# ADR-0035: Webhook-Based Notification Channels — `vapora-channels` Crate
+
+**Status**: Implemented
+**Date**: 2026-02-26
+**Deciders**: VAPORA Team
+**Technical Story**: Workflow events (task completion, proposal approve/reject, schedule fires) had no outbound delivery path; operators had to poll the API to learn about state changes.
+
+---
+
+## Decision
+
+Introduce a dedicated `vapora-channels` crate implementing a **trait-based webhook delivery layer** with:
+
+1. `NotificationChannel` trait — single `send(&Message) -> Result<()>` method; consumers implement HTTP webhooks (Slack, Discord, Telegram) without vendor SDK dependencies.
+2. `ChannelRegistry` — name-keyed routing hub; `from_config(HashMap)` builds the registry from TOML config, resolving secrets at construction time.
+3. `${VAR}` / `${VAR:-default}` interpolation **inside the library** — secret resolution is mandatory and cannot be bypassed by callers.
+4. Fire-and-forget delivery at both layers: `AppState::notify` (backend) and `WorkflowOrchestrator::notify_*` (workflow engine) spawn background tasks; delivery failures are `warn!`-logged and never surface to API callers.
+5. Per-event routing config (`NotificationConfig`) maps event names to channel-name lists, not hardcoded channel identifiers.
+
+---
+
+## Context
+
+### Gaps Addressed
+
+| Gap | Consequence |
+|-----|-------------|
+| No outbound event delivery | Operators must poll 40+ API endpoints to detect state changes |
+| Secrets in TOML as plain strings | If resolution is left to callers, a `${SLACK_WEBHOOK_URL}` placeholder reaches the HTTP layer verbatim when the caller forgets to interpolate |
+| Tight vendor coupling | Using `slack-rs` / `serenity` locks the feature to specific Slack/Discord API versions and transitive dependency trees |
+
+### Why `NotificationChannel` Trait Over Vendor SDKs
+
+Slack, Discord, and Telegram all accept a simple `POST` with a JSON body to a webhook URL — no OAuth, no persistent connection, no stateful session. A trait with one async method covers all three with less than 50 lines per implementation. Vendor SDKs add 200–500 kB of transitive dependencies and introduce breaking changes on provider API updates.
+
+### Why Secret Resolution in the Library
+
+Placing the responsibility on the caller creates a **TOFU gap**: the first time any caller forgets to call `resolve_secrets()` before constructing `ChannelRegistry`, a raw `${SLACK_WEBHOOK_URL}` string is sent to Slack's API as the URL. The request fails silently (Slack returns 404 or 400), the placeholder leaks in logs, and no compile-time or runtime warning is raised until a log is inspected.
+
+Moving interpolation into `ChannelRegistry::from_config` makes it **structurally impossible to construct a registry with unresolved secrets**: `ChannelError::SecretNotFound(var_name)` is returned immediately if an env var is absent and no default is provided. There is no non-error path that bypasses resolution.
+
+### Why Fire-and-Forget With `tokio::spawn`
+
+Notification delivery is a best-effort side-effect, not part of the request/response contract. A Slack outage should not cause a `POST /api/v1/proposals/:id/approve` to return 500. Spawning an independent task decouples delivery latency from API latency; `warn!` logging provides observability without blocking the caller.
+
+---
+
+## Implementation
+
+### Crate Structure (`vapora-channels`)
+
+```
+vapora-channels/
+├── src/
+│   ├── lib.rs         — pub re-exports (ChannelRegistry, Message, NotificationChannel)
+│   ├── channel.rs     — NotificationChannel trait
+│   ├── config.rs      — ChannelsConfig, ChannelConfig, SlackConfig/DiscordConfig/TelegramConfig
+│   │                    resolve_secrets() chain + interpolate() with OnceLock
+│   ├── error.rs       — ChannelError: NotFound, ApiError, SecretNotFound, SerializationError
+│   ├── message.rs     — Message { title, body, level: Info|Success|Warning|Error }
+│   ├── registry.rs    — ChannelRegistry: name → Arc
+│   └── webhooks/
+│       ├── slack.rs   — SlackChannel: POST IncomingWebhook JSON
+│       ├── discord.rs — DiscordChannel: POST Webhook embed JSON
+│       └── telegram.rs— TelegramChannel: POST bot sendMessage JSON
+```
+
+### Secret Resolution
+
+```
+interpolate(s: &str) -> Result:
+  regex: \$\{([^}:]+)(?::-(.*?))?\}   (compiled once via OnceLock)
+  fast-path: if !s.contains("${") { return Ok(s) }
+  for each capture:
+    var_name = capture[1]
+    default  = capture[2] (optional)
+    match env::var(var_name):
+      Ok(v)  → replace placeholder with v
+      Err(_) → if default.is_some(): replace with default
+               else: return Err(SecretNotFound(var_name))
+```
+
+`resolve_secrets()` is called unconditionally in `ChannelRegistry::from_config` — single mandatory call site, no consumer bypass.
+
+### Integration Points
+
+#### `vapora-workflow-engine`
+
+`WorkflowConfig.notifications: WorkflowNotifications` maps four events to channel-name lists:
+
+```toml
+[workflows.myflow.notifications]
+on_stage_complete = ["team-slack"]
+on_stage_failed   = ["team-slack", "ops-discord"]
+on_completed      = ["team-slack"]
+on_cancelled      = ["ops-discord"]
+```
+
+`WorkflowOrchestrator` holds `Option>` and calls `notify_stage_complete`, `notify_stage_failed`, `notify_completed`, `notify_cancelled` — each spawns `dispatch_notifications`.
+
+#### `vapora-backend`
+
+`Config.channels: HashMap` and `Config.notifications: NotificationConfig`:
+
+```toml
+[channels.team-slack]
+type = "slack"
+webhook_url = "${SLACK_WEBHOOK_URL}"
+
+[notifications]
+on_task_done         = ["team-slack"]
+on_proposal_approved = ["team-slack", "ops-discord"]
+on_proposal_rejected = ["ops-discord"]
+```
+
+`AppState` gains `channel_registry: Option>` and `notification_config: Arc`. Hooks in three existing handlers:
+
+- `update_task_status` — fires `Message::success` on `TaskStatus::Done`
+- `approve_proposal` — fires `Message::success`
+- `reject_proposal` — fires `Message::warning`
+
+#### New REST Endpoints
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `GET` | `/api/v1/channels` | List registered channel names |
+| `POST` | `/api/v1/channels/:name/test` | Send connectivity test; 200 OK / 404 / 502 |
+
+### Testability
+
+`dispatch_notifications` is extracted as `pub(crate) async fn` taking `Option>` directly, making it testable without a DB or a fully-constructed `AppState`. Five inline tests in `state.rs` use `RecordingChannel` (captures messages) and `FailingChannel` (returns 503 error) to verify:
+
+1. No-op when registry is `None`
+2. Single-channel delivery
+3. Multi-channel broadcast
+4. Resilience: delivery continues after one channel fails
+5. Warn-log on unknown channel name, other channels still receive
+
+---
+
+## Consequences
+
+### Positive
+
+- Operators get real-time Slack/Discord/Telegram alerts on task completion, proposal decisions, and workflow lifecycle events.
+- Adding a new channel type requires implementing one trait method and one TOML variant — no changes to routing or dispatch code.
+- Secret resolution failures surface immediately at startup (if `ChannelRegistry::from_config` is called at boot), not silently at first delivery.
+- Zero additional infrastructure: webhooks are outbound-only HTTP POSTs.
+
+### Negative / Trade-offs
+
+- Delivery is best-effort (fire-and-forget). A channel that is consistently down produces `warn!` logs but no alert escalation; consumers needing guaranteed delivery must implement their own retry loop or use a message queue.
+- `${VAR}` interpolation uses `unsafe { std::env::set_var }` in tests (required by Rust 1.80 stabilization of the unsafety annotation). Tests set/unset env vars; multi-threaded test parallelism can cause flaky results if not isolated with `#[serial_test::serial]`.
+- No per-channel rate limiting: a workflow that fires 1,000 stage-complete events will produce 1,000 Slack messages. Operators must configure `notifications` lists deliberately.
+
+### Supersedes / Specializes
+
+- Builds on `SecretumVault` pattern (ADR-0011) philosophy of never storing secrets as plain strings; specializes it to config-file webhook tokens.
+- Parallel to `vapora-a2a-client`'s retry pattern (ADR-0030) — both handle external HTTP delivery, but channels are fire-and-forget while A2A requires confirmed response.
diff --git a/docs/adrs/README.md b/docs/adrs/README.md
index 2dbee85..5a9417e 100644
--- a/docs/adrs/README.md
+++ b/docs/adrs/README.md
@@ -2,8 +2,8 @@
 
 Documentación de las decisiones arquitectónicas clave del proyecto VAPORA.
 
-**Status**: Complete (33 ADRs documented)
-**Last Updated**: 2026-02-21
+**Status**: Complete (35 ADRs documented)
+**Last Updated**: 2026-02-26
 **Format**: Custom VAPORA (Decision, Rationale, Alternatives, Trade-offs, Implementation, Verification, Consequences)
 
 ---
@@ -81,6 +81,8 @@ Decisiones únicas que diferencian a VAPORA de otras plataformas de orquestació
 | [028](./0028-workflow-orchestrator.md) | Workflow Orchestrator para Multi-Agent Pipelines | Short-lived agent contexts + artifact passing para reducir cache tokens 95% | ✅ Accepted |
 | [029](./0029-rlm-recursive-language-models.md) | Recursive Language Models (RLM) | Custom Rust engine: BM25 + semantic hybrid search + distributed LLM dispatch + WASM/Docker sandbox | ✅ Accepted |
 | [033](./0033-stratum-orchestrator-workflow-hardening.md) | Workflow Engine Hardening — Persistence · Saga · Cedar | SurrealDB persistence + Saga best-effort rollback + Cedar per-stage auth; stratum patterns implemented natively (no path dep) | ✅ Implemented |
+| [034](./0034-autonomous-scheduling.md) | Autonomous Scheduling — Timezone Support and Distributed Fire-Lock | `chrono-tz` IANA-aware cron evaluation + SurrealDB conditional UPDATE fire-lock; no external lock service required | ✅ Implemented |
+| [035](./0035-notification-channels.md) | Webhook-Based Notification Channels — `vapora-channels` Crate | Trait-based webhook delivery (Slack/Discord/Telegram) + `${VAR}` secret resolution built into `ChannelRegistry::from_config`; fire-and-forget via `tokio::spawn` | ✅ Implemented |
 
 ---
 
@@ -141,6 +143,8 @@ Patrones de desarrollo y arquitectura utilizados en todo el codebase.
 - **Real-Time WebSocket Updates**: Broadcast channels for efficient multi-client workflow progress updates
 - **Workflow Orchestrator**: Short-lived agent contexts + artifact passing reduce cache token costs ~95% vs monolithic sessions
 - **Recursive Language Models (RLM)**: Hybrid BM25+semantic search + distributed LLM dispatch + WASM/Docker sandbox enables reasoning over 100k+ token documents
+- **Autonomous Scheduling**: `chrono-tz` IANA-aware cron evaluation + SurrealDB CAS fire-lock eliminates double-fires in multi-instance deployments without external lock infrastructure
+- **Notification Channels**: Trait-based webhook delivery with `${VAR}` secret resolution built into `ChannelRegistry` construction — operators get real-time Slack/Discord/Telegram alerts with zero new infrastructure
 
 ### 🔧 Development Patterns
 
diff --git a/docs/features/README.md b/docs/features/README.md
index 7ba8fd6..c2318ef 100644
--- a/docs/features/README.md
+++ b/docs/features/README.md
@@ -5,3 +5,5 @@ VAPORA capabilities and overview documentation.
 ## Contents
 
 - **[Features Overview](overview.md)** — Complete feature list and descriptions including learning-based agent selection, cost optimization, and swarm coordination
+- **[Workflow Orchestrator](workflow-orchestrator.md)** — Multi-stage pipelines, approval gates, artifacts, autonomous scheduling, and distributed fire-lock
+- **[Notification Channels](notification-channels.md)** — Webhook delivery to Slack, Discord, and Telegram with built-in secret resolution
diff --git a/docs/features/notification-channels.md b/docs/features/notification-channels.md
new file mode 100644
index 0000000..9320f77
--- /dev/null
+++ b/docs/features/notification-channels.md
@@ -0,0 +1,236 @@
+# Notification Channels
+
+Real-time outbound alerts to Slack, Discord, and Telegram via webhook delivery.
+
+## Overview
+
+`vapora-channels` provides a trait-based webhook notification layer. When VAPORA events occur (task completion, proposal decisions, workflow lifecycle), configured channels receive a message immediately — no polling required.
+
+**Key properties**:
+
+- No vendor SDKs — plain HTTP POST to webhook URLs
+- Secret tokens resolved from environment variables at startup; a raw `${VAR}` placeholder never reaches the HTTP layer
+- Fire-and-forget delivery: channel failures never surface as API errors
+
+## Configuration
+
+All channel configuration lives in `vapora.toml`.
+
+### Declaring channels
+
+```toml
+[channels.team-slack]
+type = "slack"
+webhook_url = "${SLACK_WEBHOOK_URL}"
+
+[channels.ops-discord]
+type = "discord"
+webhook_url = "${DISCORD_WEBHOOK_URL}"
+
+[channels.alerts-telegram]
+type = "telegram"
+bot_token = "${TELEGRAM_BOT_TOKEN}"
+chat_id   = "${TELEGRAM_CHAT_ID}"
+```
+
+Channel names (`team-slack`, `ops-discord`, `alerts-telegram`) are arbitrary identifiers used in event routing below.
+
+### Routing events to channels
+
+```toml
+[notifications]
+on_task_done         = ["team-slack"]
+on_proposal_approved = ["team-slack", "ops-discord"]
+on_proposal_rejected = ["ops-discord"]
+```
+
+Each key is an event name; the value is a list of channel names declared in `[channels.*]`. An empty list or absent key means no notification for that event.
+
+### Workflow lifecycle notifications
+
+Per-workflow notification targets are set in the workflow template:
+
+```toml
+[[workflows]]
+name = "nightly_analysis"
+trigger = "schedule"
+
+[workflows.nightly_analysis.notifications]
+on_stage_complete = ["team-slack"]
+on_stage_failed   = ["team-slack", "ops-discord"]
+on_completed      = ["team-slack"]
+on_cancelled      = ["ops-discord"]
+```
+
+## Secret Resolution
+
+Token values in `[channels.*]` blocks are interpolated from the environment before any network call is made. Two syntaxes are supported:
+
+| Syntax | Behaviour |
+|--------|-----------|
+| `"${VAR}"` | Replaced with `$VAR`; startup fails if the variable is unset |
+| `"${VAR:-default}"` | Replaced with `$VAR` if set, otherwise `default` |
+
+Resolution happens inside `ChannelRegistry::from_config` — the single mandatory call site. There is no way to construct a registry with an unresolved placeholder.
+
+**Example**:
+
+```bash
+export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/T.../..."
+export DISCORD_WEBHOOK_URL="https://discord.com/api/webhooks/..."
+export TELEGRAM_BOT_TOKEN="123456:ABC..."
+export TELEGRAM_CHAT_ID="-1001234567890"
+```
+
+If a required variable is absent and no default is provided, VAPORA exits at startup with:
+
+```text
+Error: Secret reference '${SLACK_WEBHOOK_URL}' not resolved: env var not set and no default provided
+```
+
+## Supported Channel Types
+
+### Slack
+
+Uses the [Incoming Webhooks](https://api.slack.com/messaging/webhooks) API. The webhook URL is obtained from Slack's app configuration.
+
+```toml
+[channels.my-slack]
+type        = "slack"
+webhook_url = "${SLACK_WEBHOOK_URL}"
+```
+
+Payload format: `{ "text": "**Title**\nBody" }`. No SDK dependency.
+
+### Discord
+
+Uses the [Discord Webhook](https://discord.com/developers/docs/resources/webhook) endpoint. The webhook URL includes the token — obtain it from the channel's Integrations settings.
+
+```toml
+[channels.my-discord]
+type        = "discord"
+webhook_url = "${DISCORD_WEBHOOK_URL}"
+```
+
+Payload format: `{ "embeds": [{ "title": "...", "description": "...", "color":  }] }`.
+
+### Telegram
+
+Uses the [Bot API](https://core.telegram.org/bots/api#sendmessage) `sendMessage` endpoint. Requires a bot token from `@BotFather` and the numeric chat ID of the target group or channel.
+
+```toml
+[channels.my-telegram]
+type      = "telegram"
+bot_token = "${TELEGRAM_BOT_TOKEN}"
+chat_id   = "${TELEGRAM_CHAT_ID}"
+```
+
+Payload format: `{ "chat_id": "...", "text": "**Title**\nBody", "parse_mode": "Markdown" }`.
+
+## Message Levels
+
+Every notification carries a level that controls colour and emoji in the rendered message:
+
+| Level | Constructor | Use case |
+|-------|-------------|----------|
+| `Info` | `Message::info(title, body)` | General status updates |
+| `Success` | `Message::success(title, body)` | Task done, workflow completed |
+| `Warning` | `Message::warning(title, body)` | Proposal rejected, stage failed |
+| `Error` | `Message::error(title, body)` | Unrecoverable failure |
+
+## REST API
+
+Two endpoints are available under `/api/v1/channels`:
+
+### List channels
+
+```http
+GET /api/v1/channels
+```
+
+Returns the names of all registered channels (sorted alphabetically). Returns an empty list when no channels are configured.
+
+**Response**:
+
+```json
+{
+  "channels": ["ops-discord", "team-slack"]
+}
+```
+
+### Test a channel
+
+```http
+POST /api/v1/channels/:name/test
+```
+
+Sends a connectivity test message to the named channel and returns synchronously.
+
+| Status | Meaning |
+|--------|---------|
+| `200 OK` | Message delivered successfully |
+| `404 Not Found` | Channel name unknown or no channels configured |
+| `502 Bad Gateway` | Delivery attempt failed at the remote platform |
+
+**Example**:
+
+```bash
+curl -X POST http://localhost:8001/api/v1/channels/team-slack/test
+```
+
+Expected Slack message: `Test notification — Connectivity test from VAPORA backend for channel 'team-slack'`
+
+## Delivery Semantics
+
+Delivery is **fire-and-forget**: `AppState::notify` spawns a background Tokio task and returns immediately. The API response does not wait for webhook delivery to complete.
+
+Behaviour on failure:
+
+- Unknown channel name: `warn!` log, delivery to other targets continues
+- HTTP error from the remote platform: `warn!` log, delivery to other targets continues
+- No channels configured (`channel_registry = None`): silent no-op
+
+There is no built-in retry. A channel that is consistently unreachable produces `warn!` log lines but no escalation. Use the `/test` endpoint to confirm connectivity after configuration changes.
+
+## Events Reference
+
+| Event key | Trigger | Default level |
+|-----------|---------|---------------|
+| `on_task_done` | Task moved to `Done` status | `Success` |
+| `on_proposal_approved` | Proposal approved via API | `Success` |
+| `on_proposal_rejected` | Proposal rejected via API | `Warning` |
+| `on_stage_complete` | Workflow stage finished | `Info` |
+| `on_stage_failed` | Workflow stage failed | `Warning` |
+| `on_completed` | Workflow reached terminal `Completed` state | `Success` |
+| `on_cancelled` | Workflow cancelled | `Warning` |
+
+## Troubleshooting
+
+### Channel not receiving messages
+
+1. Verify the channel name in `[notifications]` matches the name in `[channels.*]` exactly (case-sensitive).
+2. Confirm the env variable is set: `echo $SLACK_WEBHOOK_URL`.
+3. Send a test message: `POST /api/v1/channels//test`.
+4. Check backend logs for `warn` entries with `channel = ""`.
+
+### Startup fails with `SecretNotFound`
+
+The env variable referenced in `webhook_url` or `bot_token`/`chat_id` is not set. Either export the variable or add a default value:
+
+```toml
+webhook_url = "${SLACK_WEBHOOK_URL:-https://hooks.slack.com/...}"
+```
+
+### Discord returns 400
+
+The webhook URL must end with `/slack` for Slack-compatible mode, or be the raw Discord webhook URL. Ensure the URL copied from Discord's channel settings is used without modification.
+
+### Telegram chat_id not found
+
+The bot must be a member of the target group or channel. For groups, prefix the numeric ID with `-` (e.g. `-1001234567890`). Use `@userinfobot` in Telegram to retrieve your chat ID.
+
+## Related Documentation
+
+- [Workflow Orchestrator](./workflow-orchestrator.md) — workflow lifecycle events and notification config
+- [ADR-0035: Notification Channels](../adrs/0035-notification-channels.md) — design rationale
+- [ADR-0011: SecretumVault](../adrs/0011-secretumvault.md) — secret management philosophy
diff --git a/justfiles/rust-axum b/justfiles/rust-axum
new file mode 120000
index 0000000..92dcf27
--- /dev/null
+++ b/justfiles/rust-axum
@@ -0,0 +1 @@
+/Users/Akasha/Tools/dev-system/languages/rust/just-modules/axum
\ No newline at end of file
diff --git a/justfiles/rust-cargo b/justfiles/rust-cargo
new file mode 120000
index 0000000..3d031bf
--- /dev/null
+++ b/justfiles/rust-cargo
@@ -0,0 +1 @@
+/Users/Akasha/Tools/dev-system/languages/rust/just-modules/cargo
\ No newline at end of file
diff --git a/justfiles/rust-leptos b/justfiles/rust-leptos
new file mode 120000
index 0000000..29df629
--- /dev/null
+++ b/justfiles/rust-leptos
@@ -0,0 +1 @@
+/Users/Akasha/Tools/dev-system/languages/rust/just-modules/leptos
\ No newline at end of file