From 27a290b3697d1439dfc99a2d065c87bead70686d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesu=CC=81s=20Pe=CC=81rez?= Date: Thu, 26 Feb 2026 15:32:44 +0000 Subject: [PATCH] feat(kg,channels): hybrid search + agent-inactive notifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KG: HNSW + BM25 + RRF(k=60) hybrid search via SurrealDB 3 native indexes - Fix schema bug: kg_executions missing agent_role/provider/cost_cents (silent empty reads) - channels: on_agent_inactive hook (AgentStatus::Inactive → Message::error) - migration 012: adds missing fields + HNSW + BM25 indexes - docs: ADR-0036, update ADR-0035 + notification-channels feature doc --- CHANGELOG.md | 31 ++ README.md | 11 +- crates/vapora-backend/src/api/agents.rs | 12 +- crates/vapora-backend/src/config.rs | 4 + crates/vapora-knowledge-graph/src/lib.rs | 6 +- crates/vapora-knowledge-graph/src/models.rs | 17 + .../vapora-knowledge-graph/src/persistence.rs | 340 +++++++++++++++++- docs/adrs/0035-notification-channels.md | 4 +- docs/adrs/0036-kg-hybrid-search.md | 181 ++++++++++ docs/adrs/README.md | 1 + docs/features/notification-channels.md | 2 + migrations/012_kg_hybrid_search.surql | 31 ++ 12 files changed, 617 insertions(+), 23 deletions(-) create mode 100644 docs/adrs/0036-kg-hybrid-search.md create mode 100644 migrations/012_kg_hybrid_search.surql diff --git a/CHANGELOG.md b/CHANGELOG.md index 5943392..a30c4d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 --- +### Added - Knowledge Graph Hybrid Search (HNSW + BM25 + RRF) + +#### `vapora-knowledge-graph` — real retrieval replaces stub + +- `find_similar_executions`: was returning recent records ordered by timestamp; now uses SurrealDB 3 HNSW ANN query (`<|100,64|>`) against the `embedding` field +- `hybrid_search`: new method combining HNSW semantic + BM25 lexical via RRF(k=60) fusion; returns `Vec` with 
individual `semantic_score`, `lexical_score`, `hybrid_score`, and rank fields +- `find_similar_rlm_tasks`: was ignoring `query_embedding`; now uses in-memory cosine similarity over SCHEMALESS `rlm_executions` records +- `HybridSearchResult` added to `models.rs` and re-exported from `lib.rs` +- 5 new unit tests: `cosine_similarity` edge cases (orthogonal, identical, empty, partial) + RRF fusion consensus validation + +#### `migrations/012_kg_hybrid_search.surql` — schema fix + indexes + +- **Schema bug fixed**: `kg_executions` (SCHEMAFULL) was missing `agent_role`, `provider`, `cost_cents` — SurrealDB silently dropped these fields on INSERT, causing all reads to fail deserialization silently; all three fields now declared +- `DEFINE ANALYZER kg_text_analyzer` — `class` tokenizer + `lowercase` + `snowball(english)` filters +- `DEFINE INDEX idx_kg_executions_ft` — BM25 full-text index on `task_description` +- `DEFINE INDEX idx_kg_executions_hnsw` — HNSW index on `embedding` (1536-dim, cosine, F32, M=16, EF=200) + +#### Documentation + +- **ADR-0036**: documents HNSW+BM25+RRF decision, the schema bug root cause, and why `stratum-embeddings` brute-force is unsuitable for unbounded KG datasets + +--- + +### Added - `on_agent_inactive` Notification Hook + +- `NotificationConfig` gains `on_agent_inactive: Vec` — fires when `update_agent_status` transitions an agent to `AgentStatus::Inactive` +- `update_agent_status` handler in `agents.rs` fires `Message::error("Agent Inactive", ...)` via `state.notify` +- Docs: `on_agent_inactive` added to Events Reference table in `docs/features/notification-channels.md` and to the backend integration section in ADR-0035 + +--- + ### Added - Autonomous Scheduling: Timezone Support and Distributed Fire-Lock #### `vapora-workflow-engine` — scheduling hardening diff --git a/README.md b/README.md index 8161f42..5ee4546 100644 --- a/README.md +++ b/README.md @@ -86,10 +86,19 @@ - **Chunking Strategies**: Fixed-size, semantic (sentence-aware), 
code-aware (AST-based for Rust/Python/JS) - **Sandbox Execution**: WASM tier (<10ms) + Docker tier (80-150ms) with automatic tier selection - **Multi-Provider LLM**: OpenAI, Claude, Ollama integration with cost tracking -- **Knowledge Graph**: Execution history persistence with learning curves +- **Knowledge Graph**: Temporal execution history with hybrid retrieval — HNSW (semantic) + BM25 (lexical) + RRF fusion via SurrealDB 3 native indexes - **Production Ready**: 38/38 tests passing, 0 clippy warnings, real SurrealDB persistence - **Cost Efficient**: Chunk-based processing reduces token usage vs full-document LLM calls +### 🔔 Webhook Notification Channels + +- **Three providers**: Slack (Incoming Webhook), Discord (Webhook embed), Telegram (Bot API) — no vendor SDKs, plain HTTP POST +- **Secret resolution at startup**: `${VAR}` / `${VAR:-default}` interpolation built into `ChannelRegistry`; a raw placeholder never reaches the HTTP layer +- **Fire-and-forget**: channel failures never surface as API errors; delivery is logged and continues for remaining targets +- **Backend events**: `on_task_done`, `on_proposal_approved`, `on_proposal_rejected`, `on_agent_inactive` +- **Workflow events**: `on_stage_complete`, `on_stage_failed`, `on_completed`, `on_cancelled` — per-workflow routing config +- **REST API**: `GET /api/v1/channels` (list), `POST /api/v1/channels/:name/test` (connectivity check) + ### 🧠 Intelligent Learning & Cost Optimization (Phase 5.3 + 5.4) - **Per-Task-Type Learning**: Agents build expertise profiles from execution history diff --git a/crates/vapora-backend/src/api/agents.rs b/crates/vapora-backend/src/api/agents.rs index b805648..417ffeb 100644 --- a/crates/vapora-backend/src/api/agents.rs +++ b/crates/vapora-backend/src/api/agents.rs @@ -7,6 +7,7 @@ use axum::{ Json, }; use serde::Deserialize; +use vapora_channels::Message; use vapora_shared::models::{Agent, AgentStatus}; use crate::api::state::AppState; @@ -90,8 +91,17 @@ pub async fn 
update_agent_status( ) -> ApiResult { let updated = state .agent_service - .update_agent_status(&id, payload.status) + .update_agent_status(&id, payload.status.clone()) .await?; + + if payload.status == AgentStatus::Inactive { + let msg = Message::error( + "Agent Inactive", + format!("Agent '{}' has transitioned to inactive", id), + ); + state.notify(&state.notification_config.clone().on_agent_inactive, msg); + } + Ok(Json(updated)) } diff --git a/crates/vapora-backend/src/config.rs b/crates/vapora-backend/src/config.rs index 60de8ce..d0c4ca9 100644 --- a/crates/vapora-backend/src/config.rs +++ b/crates/vapora-backend/src/config.rs @@ -35,6 +35,7 @@ pub struct Config { /// on_task_done = ["team-slack"] /// on_proposal_approved = ["team-slack"] /// on_proposal_rejected = ["team-slack", "ops-telegram"] +/// on_agent_inactive = ["ops-telegram"] /// ``` #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NotificationConfig { @@ -44,6 +45,9 @@ pub struct NotificationConfig { pub on_proposal_approved: Vec, #[serde(default)] pub on_proposal_rejected: Vec, + /// Fires when an agent transitions to `AgentStatus::Inactive`. 
+ #[serde(default)] + pub on_agent_inactive: Vec, } /// Server configuration diff --git a/crates/vapora-knowledge-graph/src/lib.rs b/crates/vapora-knowledge-graph/src/lib.rs index a210e63..e2d33a9 100644 --- a/crates/vapora-knowledge-graph/src/lib.rs +++ b/crates/vapora-knowledge-graph/src/lib.rs @@ -20,7 +20,11 @@ pub use analytics::{ pub use error::{KGError, Result}; pub use learning::{apply_recency_bias, calculate_learning_curve}; pub use metrics::{AnalyticsComputation, TimePeriod}; -pub use models::*; +pub use models::{ + AgentProfile, CausalRelationship, ExecutionRecord, ExecutionRelation, GraphStatistics, + HybridSearchResult, ProviderAnalytics, ProviderCostForecast, ProviderEfficiency, + ProviderTaskTypeMetrics, Recommendation, SimilarityResult, +}; pub use persistence::{ KGPersistence, PersistedExecution, PersistedRlmExecution, RlmExecutionBuilder, }; diff --git a/crates/vapora-knowledge-graph/src/models.rs b/crates/vapora-knowledge-graph/src/models.rs index 2707810..0fcc495 100644 --- a/crates/vapora-knowledge-graph/src/models.rs +++ b/crates/vapora-knowledge-graph/src/models.rs @@ -128,6 +128,23 @@ pub struct ProviderTaskTypeMetrics { pub avg_duration_ms: f64, } +/// Result from hybrid search combining HNSW semantic and BM25 lexical retrieval +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HybridSearchResult { + pub execution: crate::persistence::PersistedExecution, + /// Cosine similarity score from HNSW vector search [0.0, 1.0] + pub semantic_score: f64, + /// BM25 relevance score from full-text search (unnormalized, higher = more + /// relevant) + pub lexical_score: f64, + /// RRF fused score: sum of 1/(60 + rank) across both methods + pub hybrid_score: f64, + /// 1-indexed position in the semantic ranked list (0 if absent) + pub semantic_rank: usize, + /// 1-indexed position in the lexical ranked list (0 if absent) + pub lexical_rank: usize, +} + /// Provider cost forecast #[derive(Debug, Clone, Serialize, Deserialize)] pub struct 
ProviderCostForecast { diff --git a/crates/vapora-knowledge-graph/src/persistence.rs b/crates/vapora-knowledge-graph/src/persistence.rs index b4d447b..830d9c2 100644 --- a/crates/vapora-knowledge-graph/src/persistence.rs +++ b/crates/vapora-knowledge-graph/src/persistence.rs @@ -310,27 +310,227 @@ impl KGPersistence { Ok(()) } - /// Load historical executions for similar task (using vector similarity) + /// Find similar executions using SurrealDB 3 HNSW approximate nearest + /// neighbor search. + /// + /// The `<|100,64|>` operator retrieves 100 HNSW candidates with ef=64 + /// search expansion, then reranks by exact cosine similarity. Falls + /// back to recent successful executions when no embedding is provided. pub async fn find_similar_executions( &self, - _embedding: &[f32], + embedding: &[f32], limit: usize, ) -> anyhow::Result> { - debug!("Searching for similar executions (limit: {})", limit); - - // SurrealDB vector similarity queries require different syntax - // For now, return recent successful executions - let query = format!( - "SELECT * FROM kg_executions WHERE outcome = 'success' LIMIT {}", + debug!( + "HNSW semantic search for similar executions (limit: {})", limit ); - let mut response = self.db.query(&query).await?; + if embedding.is_empty() { + let query = format!( + "SELECT * FROM kg_executions WHERE outcome = 'success' ORDER BY executed_at DESC \ + LIMIT {limit}" + ); + let mut response = self.db.query(&query).await?; + let raw: Vec = response.take(0)?; + return Ok(raw + .into_iter() + .filter_map(|v| serde_json::from_value(v).ok()) + .collect()); + } + + let results = self + .search_by_embedding_inner(embedding, limit) + .await? + .into_iter() + .map(|(exec, _score)| exec) + .collect(); + + Ok(results) + } + + /// Hybrid search combining HNSW vector similarity and BM25 full-text + /// search, fused via Reciprocal Rank Fusion (RRF, k=60). + /// + /// Requires migration 012 (HNSW + BM25 indexes) to be applied. 
+ pub async fn hybrid_search( + &self, + embedding: &[f32], + text_query: &str, + limit: usize, + ) -> anyhow::Result> { + use std::collections::HashMap; + + let candidate_limit = (limit * 3).max(30); + + let semantic_results = if !embedding.is_empty() { + self.search_by_embedding_inner(embedding, candidate_limit) + .await + .unwrap_or_default() + } else { + Vec::new() + }; + + let lexical_results = if !text_query.trim().is_empty() { + self.search_by_text_inner(text_query, candidate_limit) + .await + .unwrap_or_default() + } else { + Vec::new() + }; + + if semantic_results.is_empty() && lexical_results.is_empty() { + return Ok(Vec::new()); + } + + let semantic_ranked: Vec = semantic_results + .iter() + .map(|(e, _)| e.execution_id.clone()) + .collect(); + let lexical_ranked: Vec = lexical_results + .iter() + .map(|(e, _)| e.execution_id.clone()) + .collect(); + + let semantic_score_map: HashMap = semantic_results + .iter() + .map(|(e, s)| (e.execution_id.clone(), *s)) + .collect(); + let lexical_score_map: HashMap = lexical_results + .iter() + .map(|(e, s)| (e.execution_id.clone(), *s)) + .collect(); + + // Merge execution records without double-fetching (prefer semantic) + let mut exec_map: HashMap = HashMap::new(); + for (exec, _) in semantic_results.into_iter().chain(lexical_results) { + exec_map.entry(exec.execution_id.clone()).or_insert(exec); + } + + // Collect unique IDs for RRF + let mut all_ids: Vec = Vec::with_capacity(exec_map.len()); + let mut seen = std::collections::HashSet::new(); + for id in semantic_ranked.iter().chain(lexical_ranked.iter()) { + if seen.insert(id.clone()) { + all_ids.push(id.clone()); + } + } + + let mut fused: Vec = all_ids + .into_iter() + .filter_map(|id| { + // RRF: 1-indexed ranks; absent items get rank = list_len + 1 + let sem_rank = semantic_ranked + .iter() + .position(|x| x == &id) + .map_or(semantic_ranked.len() + 1, |r| r + 1); + let lex_rank = lexical_ranked + .iter() + .position(|x| x == &id) + 
.map_or(lexical_ranked.len() + 1, |r| r + 1); + + const K: f64 = 60.0; + let hybrid_score = 1.0 / (K + sem_rank as f64) + 1.0 / (K + lex_rank as f64); + + let execution = exec_map.remove(&id)?; + + Some(crate::models::HybridSearchResult { + semantic_score: semantic_score_map.get(&id).copied().unwrap_or(0.0), + lexical_score: lexical_score_map.get(&id).copied().unwrap_or(0.0), + hybrid_score, + semantic_rank: sem_rank, + lexical_rank: lex_rank, + execution, + }) + }) + .collect(); + + fused.sort_by(|a, b| { + b.hybrid_score + .partial_cmp(&a.hybrid_score) + .unwrap_or(std::cmp::Ordering::Equal) + }); + fused.truncate(limit); + + debug!( + "Hybrid search returned {} results (semantic: {}, lexical: {} unique candidates)", + fused.len(), + semantic_ranked.len(), + lexical_ranked.len() + ); + + Ok(fused) + } + + /// HNSW approximate nearest neighbor search on `embedding` field. + /// + /// `<|100,64|>`: retrieve 100 candidates from HNSW index with ef=64. + /// Results are reranked by exact cosine similarity before returning. + async fn search_by_embedding_inner( + &self, + embedding: &[f32], + limit: usize, + ) -> anyhow::Result> { + let mut response = self + .db + .query( + "SELECT *, vector::similarity::cosine(embedding, $q) AS cosine_score FROM \ + kg_executions WHERE embedding <|100,64|> $q ORDER BY cosine_score DESC", + ) + .bind(("q", embedding.to_vec())) + .await?; + let raw: Vec = response.take(0)?; let results = raw .into_iter() - .filter_map(|v| serde_json::from_value(v).ok()) + .filter_map(|mut v| { + let score = v.get("cosine_score")?.as_f64()?; + if let Some(obj) = v.as_object_mut() { + obj.remove("cosine_score"); + obj.remove("id"); + } + let exec: PersistedExecution = serde_json::from_value(v).ok()?; + Some((exec, score)) + }) + .take(limit) .collect(); + + Ok(results) + } + + /// BM25 full-text search on `task_description` field. + /// + /// Requires `idx_kg_executions_ft` index (migration 012). 
+ /// The `@1@` predicate tag pairs with `search::score(1)` for BM25 scoring. + async fn search_by_text_inner( + &self, + text_query: &str, + limit: usize, + ) -> anyhow::Result> { + let mut response = self + .db + .query( + "SELECT *, search::score(1) AS bm25_score FROM kg_executions WHERE \ + task_description @1@ $text ORDER BY bm25_score DESC LIMIT 100", + ) + .bind(("text", text_query.to_string())) + .await?; + + let raw: Vec = response.take(0)?; + let results = raw + .into_iter() + .filter_map(|mut v| { + let score = v.get("bm25_score")?.as_f64().unwrap_or(0.0); + if let Some(obj) = v.as_object_mut() { + obj.remove("bm25_score"); + obj.remove("id"); + } + let exec: PersistedExecution = serde_json::from_value(v).ok()?; + Some((exec, score)) + }) + .take(limit) + .collect(); + Ok(results) } @@ -751,26 +951,49 @@ impl KGPersistence { .collect()) } - /// Find similar RLM tasks using query embedding similarity - /// Uses cosine similarity on query_embedding field + /// Find similar RLM tasks using cosine similarity on `query_embedding`. + /// + /// `rlm_executions` is SCHEMALESS with nullable `query_embedding`, making + /// HNSW indexing impractical. In-memory cosine similarity is used instead + /// since RLM datasets are per-document and bounded in size. 
pub async fn find_similar_rlm_tasks( &self, - _query_embedding: &[f32], + query_embedding: &[f32], limit: usize, ) -> anyhow::Result> { - debug!("Searching for similar RLM tasks (limit: {})", limit); - - let query = format!( - "SELECT * FROM rlm_executions WHERE success = true ORDER BY executed_at DESC LIMIT {}", + debug!( + "Cosine similarity search for similar RLM tasks (limit: {})", limit ); + let candidate_limit = (limit * 10).max(200); + let query = format!( + "SELECT * FROM rlm_executions WHERE success = true ORDER BY executed_at DESC LIMIT \ + {candidate_limit}" + ); let mut response = self.db.query(&query).await?; let raw: Vec = response.take(0)?; - Ok(raw + let candidates: Vec = raw .into_iter() .filter_map(|v| serde_json::from_value(v).ok()) - .collect()) + .collect(); + + if query_embedding.is_empty() || candidates.is_empty() { + return Ok(candidates.into_iter().take(limit).collect()); + } + + let mut scored: Vec<(PersistedRlmExecution, f64)> = candidates + .into_iter() + .filter_map(|exec| { + let emb = exec.query_embedding.as_deref()?; + let sim = cosine_similarity(query_embedding, emb); + Some((exec, sim)) + }) + .collect(); + + scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); + + Ok(scored.into_iter().take(limit).map(|(e, _)| e).collect()) } /// Get RLM success rate for a specific document @@ -856,6 +1079,22 @@ impl KGPersistence { } } +/// Cosine similarity between two equal-length f32 vectors. +/// Returns 0.0 for empty or mismatched-dimension inputs. 
+fn cosine_similarity(a: &[f32], b: &[f32]) -> f64 { + if a.len() != b.len() || a.is_empty() { + return 0.0; + } + let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum(); + let norm_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); + let norm_b: f32 = b.iter().map(|x| x * x).sum::().sqrt(); + if norm_a > 0.0 && norm_b > 0.0 { + (dot / (norm_a * norm_b)) as f64 + } else { + 0.0 + } +} + #[cfg(test)] mod tests { use super::*; @@ -961,4 +1200,67 @@ mod tests { assert_eq!(execution.success(), true); assert_eq!(execution.duration_ms(), 1000); } + + #[test] + fn test_cosine_similarity_orthogonal() { + let a = vec![1.0_f32, 0.0, 0.0]; + let b = vec![0.0_f32, 1.0, 0.0]; + assert!((cosine_similarity(&a, &b) - 0.0).abs() < 1e-6); + } + + #[test] + fn test_cosine_similarity_identical() { + let v = vec![0.3_f32, 0.4, 0.5]; + let sim = cosine_similarity(&v, &v); + assert!((sim - 1.0).abs() < 1e-5, "identical vectors: {sim}"); + } + + #[test] + fn test_cosine_similarity_empty() { + assert_eq!(cosine_similarity(&[], &[]), 0.0); + assert_eq!(cosine_similarity(&[1.0], &[]), 0.0); + } + + #[test] + fn test_cosine_similarity_partial() { + // [1,0] · [0.5, 0.866] ≈ cos(60°) ≈ 0.5 + let a = vec![1.0_f32, 0.0]; + let b = vec![0.5_f32, 0.866]; + let sim = cosine_similarity(&a, &b); + assert!((sim - 0.5).abs() < 0.01, "got {sim}"); + } + + #[test] + fn test_rrf_fusion_consensus_wins() { + // Simulate the RRF logic: IDs in both lists should score higher + const K: f64 = 60.0; + let semantic = vec!["exec-a".to_string(), "exec-b".to_string()]; + let lexical = vec!["exec-b".to_string(), "exec-c".to_string()]; + + let score_for = |id: &str| { + let sem_rank = semantic + .iter() + .position(|x| x == id) + .map_or(semantic.len() + 1, |r| r + 1); + let lex_rank = lexical + .iter() + .position(|x| x == id) + .map_or(lexical.len() + 1, |r| r + 1); + 1.0 / (K + sem_rank as f64) + 1.0 / (K + lex_rank as f64) + }; + + let score_b = score_for("exec-b"); + let score_a = score_for("exec-a"); + let 
score_c = score_for("exec-c"); + + // exec-b appears in both lists, should rank highest + assert!( + score_b > score_a, + "exec-b ({score_b}) should beat exec-a ({score_a})" + ); + assert!( + score_b > score_c, + "exec-b ({score_b}) should beat exec-c ({score_c})" + ); + } } diff --git a/docs/adrs/0035-notification-channels.md b/docs/adrs/0035-notification-channels.md index fcea436..c392c6e 100644 --- a/docs/adrs/0035-notification-channels.md +++ b/docs/adrs/0035-notification-channels.md @@ -111,13 +111,15 @@ webhook_url = "${SLACK_WEBHOOK_URL}" on_task_done = ["team-slack"] on_proposal_approved = ["team-slack", "ops-discord"] on_proposal_rejected = ["ops-discord"] +on_agent_inactive = ["ops-telegram"] ``` -`AppState` gains `channel_registry: Option>` and `notification_config: Arc`. Hooks in three existing handlers: +`AppState` gains `channel_registry: Option>` and `notification_config: Arc`. Hooks in four existing handlers: - `update_task_status` — fires `Message::success` on `TaskStatus::Done` - `approve_proposal` — fires `Message::success` - `reject_proposal` — fires `Message::warning` +- `update_agent_status` — fires `Message::error` on `AgentStatus::Inactive` #### New REST Endpoints diff --git a/docs/adrs/0036-kg-hybrid-search.md b/docs/adrs/0036-kg-hybrid-search.md new file mode 100644 index 0000000..d228c69 --- /dev/null +++ b/docs/adrs/0036-kg-hybrid-search.md @@ -0,0 +1,181 @@ +# ADR-0036: Knowledge Graph Hybrid Search — HNSW + BM25 + RRF + +**Status**: Implemented +**Date**: 2026-02-26 +**Deciders**: VAPORA Team +**Technical Story**: `find_similar_executions` was a stub returning recent records; `find_similar_rlm_tasks` ignored embeddings entirely. A missing schema migration caused all `kg_executions` reads to silently fail deserialization. + +--- + +## Decision + +Replace the stub similarity functions in `KGPersistence` with a **hybrid retrieval pipeline** combining: + +1. 
**HNSW** (SurrealDB 3 native) — approximate nearest-neighbor vector search over `embedding` field +2. **BM25** (SurrealDB 3 native full-text search) — lexical scoring over `task_description` field +3. **Reciprocal Rank Fusion (RRF, k=60)** — scale-invariant score fusion + +Add migration `012_kg_hybrid_search.surql` that fixes a pre-existing schema bug (three fields missing from the `SCHEMAFULL` table) and defines the required indexes. + +--- + +## Context + +### The Stub Problem + +`find_similar_executions` in `persistence.rs` discarded its `embedding: &[f32]` argument entirely and returned the N most-recent successful executions, ordered by timestamp. Any caller relying on semantic proximity was silently receiving chronological results — a correctness bug, not a performance issue. + +### The Silent Schema Bug + +`kg_executions` was declared `SCHEMAFULL` in migration 005 but three fields used by `PersistedExecution` (`agent_role`, `provider`, `cost_cents`) were absent from the schema. SurrealDB drops undefined fields on `INSERT` in SCHEMAFULL tables. All subsequent `SELECT` queries returned records that failed `serde_json::from_value` deserialization, which was swallowed by `.filter_map(|v| v.ok())`. The persistence layer appeared to work (no errors) while returning empty results for every query. + +### Why Not `stratum-embeddings` SurrealDbStore + +`stratumiops/crates/stratum-embeddings/src/store/surrealdb.rs` implements vector search as a brute-force full-scan: it loads all records into memory and computes cosine similarity in-process. This works for document chunk retrieval (bounded dataset per document), but is unsuitable for the knowledge graph which accumulates unbounded execution records across all agents and tasks over time. 
+ +### Why Hybrid Over Pure Semantic + +Embedding-only retrieval misses exact keyword matches: an agent searching for "cargo clippy warnings" may not find a record titled "clippy deny warnings fix" if the embedding model compresses the phrase differently than the query. BM25 handles exact token overlap that embeddings smooth over. + +--- + +## Alternatives Considered + +### ❌ Pure HNSW semantic search only + +- Misses exact keyword matches (e.g., specific error codes, crate names) +- Embedding quality varies across providers; degrades if provider changes + +### ❌ Pure BM25 lexical search only + +- Misses paraphrases and semantic variants ("task failed" vs "execution error") +- No relevance for structurally similar tasks with different wording + +### ❌ Tantivy / external FTS engine + +- Adds a new process dependency for a capability SurrealDB 3 provides natively +- Requires synchronizing two stores; adds operational complexity + +### ✅ SurrealDB 3 HNSW + BM25 + RRF (chosen) + +Single data store, two native index types, no new dependencies, no sync complexity. 
+ +--- + +## Implementation + +### Migration 012 + +```sql +-- Fix missing fields causing silent deserialization failure +DEFINE FIELD agent_role ON TABLE kg_executions TYPE option<string>; +DEFINE FIELD provider ON TABLE kg_executions TYPE string DEFAULT 'unknown'; +DEFINE FIELD cost_cents ON TABLE kg_executions TYPE int DEFAULT 0; + +-- BM25 full-text index on task_description +DEFINE ANALYZER kg_text_analyzer + TOKENIZERS class + FILTERS lowercase, snowball(english); + +DEFINE INDEX idx_kg_executions_ft + ON TABLE kg_executions + FIELDS task_description + SEARCH ANALYZER kg_text_analyzer BM25; + +-- HNSW ANN index on embedding (1536-dim, cosine, float32) +DEFINE INDEX idx_kg_executions_hnsw + ON TABLE kg_executions + FIELDS embedding + HNSW DIMENSION 1536 DIST COSINE TYPE F32 M 16 EF_CONSTRUCTION 200; +``` + +HNSW parameters: `M 16` (16 edges per node, standard for 1536-dim); `EF_CONSTRUCTION 200` (index build quality vs. insert speed; 200 is the standard default). + +### Query Patterns + +**HNSW semantic search** (`<|100,64|>` = 100 candidates, ef=64 at query time): + +```surql +SELECT *, vector::similarity::cosine(embedding, $q) AS cosine_score +FROM kg_executions +WHERE embedding <|100,64|> $q +ORDER BY cosine_score DESC +LIMIT 20 +``` + +**BM25 lexical search** (`@1@` assigns predicate ID 1; paired with `search::score(1)`): + +```surql +SELECT *, search::score(1) AS bm25_score +FROM kg_executions +WHERE task_description @1@ $text +ORDER BY bm25_score DESC +LIMIT 100 +``` + +### RRF Fusion + +Cosine similarity is bounded `[0.0, 1.0]`; BM25 is unbounded `[0, ∞)`. Linear blending requires per-corpus normalization. RRF is scale-invariant: + +``` +hybrid_score(id) = 1 / (60 + rank_semantic) + 1 / (60 + rank_lexical) +``` + +`k=60` is the standard RRF constant (Cormack, Clarke & Büttcher, 2009). IDs absent from one ranked list receive rank `list_len + 1`, contributing the small but strictly positive term `1/(60 + list_len + 1)` — never 0, preventing complete suppression of single-method results.
+ +### RLM Executions + +`rlm_executions` is `SCHEMALESS` with a nullable `query_embedding` field. HNSW indexes require a `SCHEMAFULL` table with a non-nullable typed field. `find_similar_rlm_tasks` uses in-memory cosine similarity: loads candidate records, filters those with non-empty embeddings, sorts by cosine score. Acceptable because the RLM dataset is bounded per document. + +### New Public API + +```rust +impl KGPersistence { + // Was stub (returned recent records). Now uses HNSW ANN query. + pub async fn find_similar_executions( + &self, + embedding: &[f32], + limit: usize, + ) -> anyhow::Result>; + + // New. HNSW + BM25 + RRF. Either argument may be empty (degrades gracefully). + pub async fn hybrid_search( + &self, + embedding: &[f32], + text_query: &str, + limit: usize, + ) -> anyhow::Result>; +} +``` + +`HybridSearchResult` exposes `semantic_score`, `lexical_score`, `hybrid_score`, `semantic_rank`, `lexical_rank` — callers can inspect individual signal contributions. + +--- + +## Consequences + +### Positive + +- `find_similar_executions` returns semantically similar past executions, not recent ones. The correctness bug is fixed. +- `hybrid_search` exposes both signals; callers can filter by `semantic_score ≥ 0.7` for high-confidence-only retrieval. +- No new dependencies. The two indexes are defined in a migration; no Rust dependency change. +- The schema bug fix means all existing `kg_executions` records round-trip correctly after migration 012 is applied. + +### Negative / Trade-offs + +- HNSW index build is `O(n log n)` in SurrealDB; large existing datasets will cause migration 012 to take longer than typical DDL migrations. No data migration is needed — only index creation. +- BM25 requires the `task_description` field to be populated at insert time. Records inserted before this migration with empty or null descriptions will not appear in lexical results. +- `rlm_executions` hybrid search remains in-memory. 
A future migration converting `rlm_executions` to SCHEMAFULL would enable native HNSW for that table too. + +### Supersedes + +- The stub implementation of `find_similar_executions` (existed since persistence.rs was written). +- Extends ADR-0013 (KG temporal design) with the retrieval layer decision. + +--- + +## Related + +- [ADR-0013: Knowledge Graph Temporal](./0013-knowledge-graph.md) — original KG design +- [ADR-0029: RLM Recursive Language Models](./0029-rlm-recursive-language-models.md) — RLM hybrid search (different use case: document chunks, not execution records) +- [ADR-0004: SurrealDB](./0004-surrealdb-database.md) — database foundation diff --git a/docs/adrs/README.md b/docs/adrs/README.md index 5a9417e..79ca4b5 100644 --- a/docs/adrs/README.md +++ b/docs/adrs/README.md @@ -83,6 +83,7 @@ Decisiones únicas que diferencian a VAPORA de otras plataformas de orquestació | [033](./0033-stratum-orchestrator-workflow-hardening.md) | Workflow Engine Hardening — Persistence · Saga · Cedar | SurrealDB persistence + Saga best-effort rollback + Cedar per-stage auth; stratum patterns implemented natively (no path dep) | ✅ Implemented | | [034](./0034-autonomous-scheduling.md) | Autonomous Scheduling — Timezone Support and Distributed Fire-Lock | `chrono-tz` IANA-aware cron evaluation + SurrealDB conditional UPDATE fire-lock; no external lock service required | ✅ Implemented | | [035](./0035-notification-channels.md) | Webhook-Based Notification Channels — `vapora-channels` Crate | Trait-based webhook delivery (Slack/Discord/Telegram) + `${VAR}` secret resolution built into `ChannelRegistry::from_config`; fire-and-forget via `tokio::spawn` | ✅ Implemented | +| [036](./0036-kg-hybrid-search.md) | Knowledge Graph Hybrid Search — HNSW + BM25 + RRF | SurrealDB 3 native HNSW vector index + BM25 full-text; RRF(k=60) fusion; fixes schema bug causing silent empty reads | ✅ Implemented | --- diff --git a/docs/features/notification-channels.md 
b/docs/features/notification-channels.md index 9320f77..39e8b17 100644 --- a/docs/features/notification-channels.md +++ b/docs/features/notification-channels.md @@ -42,6 +42,7 @@ Channel names (`team-slack`, `ops-discord`, `alerts-telegram`) are arbitrary ide on_task_done = ["team-slack"] on_proposal_approved = ["team-slack", "ops-discord"] on_proposal_rejected = ["ops-discord"] +on_agent_inactive = ["ops-telegram"] ``` Each key is an event name; the value is a list of channel names declared in `[channels.*]`. An empty list or absent key means no notification for that event. @@ -199,6 +200,7 @@ There is no built-in retry. A channel that is consistently unreachable produces | `on_task_done` | Task moved to `Done` status | `Success` | | `on_proposal_approved` | Proposal approved via API | `Success` | | `on_proposal_rejected` | Proposal rejected via API | `Warning` | +| `on_agent_inactive` | Agent status transitions to `Inactive` | `Error` | | `on_stage_complete` | Workflow stage finished | `Info` | | `on_stage_failed` | Workflow stage failed | `Warning` | | `on_completed` | Workflow reached terminal `Completed` state | `Success` | diff --git a/migrations/012_kg_hybrid_search.surql b/migrations/012_kg_hybrid_search.surql new file mode 100644 index 0000000..c97b2c8 --- /dev/null +++ b/migrations/012_kg_hybrid_search.surql @@ -0,0 +1,31 @@ +-- Migration 012: Knowledge Graph Hybrid Search +-- Adds HNSW vector index and BM25 full-text index for hybrid retrieval +-- Fixes missing fields in kg_executions (agent_role, provider, cost_cents) +-- that caused deserialization failures on SELECT queries. 
+ +-- Missing fields added to make kg_executions round-trip correctly +DEFINE FIELD agent_role ON TABLE kg_executions TYPE option<string>; +DEFINE FIELD provider ON TABLE kg_executions TYPE string DEFAULT 'unknown'; +DEFINE FIELD cost_cents ON TABLE kg_executions TYPE int DEFAULT 0; + +-- BM25 full-text search on task descriptions +-- class tokenizer preserves word boundaries (code terms, identifiers) +-- snowball(english) reduces "compiling" → "compil" for better recall +DEFINE ANALYZER kg_text_analyzer + TOKENIZERS class + FILTERS lowercase, snowball(english); + +DEFINE INDEX idx_kg_executions_ft + ON TABLE kg_executions + FIELDS task_description + SEARCH ANALYZER kg_text_analyzer BM25; + +-- HNSW approximate nearest neighbor index for semantic similarity +-- DIST COSINE: matches cosine similarity used throughout the codebase +-- TYPE F32: float32 embeddings (OpenAI ada-002 / compatible providers) +-- M 16: graph connectivity (16 edges per node, standard for 1536-dim) +-- EF_CONSTRUCTION 200: index build quality vs speed tradeoff +DEFINE INDEX idx_kg_executions_hnsw + ON TABLE kg_executions + FIELDS embedding + HNSW DIMENSION 1536 DIST COSINE TYPE F32 M 16 EF_CONSTRUCTION 200;