ADR-019: Temporal Execution History with Daily Windowing

Status: Accepted | Implemented
Date: 2024-11-01
Deciders: Knowledge Graph Team
Technical Story: Tracking agent execution history with daily aggregation for learning curves


Decision

Implement temporal execution history with daily windowed aggregations to compute learning curves.


Rationale

  1. Learning Curves: Daily aggregations expose trends (improving/stable/declining)
  2. Causal Reasoning: The history lets problems be traced back to their root cause
  3. Temporal Analysis: Compare performance across days and weeks
  4. Efficient Queries: Daily windows allow efficient group-by queries
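
The group-by idea in point 4 can be sketched in memory with the standard library alone. This is an illustration, not the project's query path: `Exec` and `group_by_window` are hypothetical names, and the real grouping is assumed to happen in the database.

```rust
use std::collections::BTreeMap;

/// Hypothetical, trimmed-down execution row (illustration only).
pub struct Exec {
    pub agent_id: &'static str,
    pub task_type: &'static str,
    pub daily_window: &'static str, // YYYY-MM-DD
    pub success: bool,
}

/// Key of one daily window: (agent, task type, day).
pub type WindowKey = (&'static str, &'static str, &'static str);

/// Groups rows by window key and returns (executions, successes) per group.
pub fn group_by_window(rows: &[Exec]) -> BTreeMap<WindowKey, (u32, u32)> {
    let mut groups: BTreeMap<WindowKey, (u32, u32)> = BTreeMap::new();
    for r in rows {
        let entry = groups
            .entry((r.agent_id, r.task_type, r.daily_window))
            .or_insert((0, 0));
        entry.0 += 1;
        if r.success {
            entry.1 += 1;
        }
    }
    groups
}
```

Because `YYYY-MM-DD` strings sort lexicographically in chronological order, the ordered map yields each agent's windows oldest first without parsing any dates.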

Alternatives Considered

❌ Per-Execution Only (No Aggregation)

  • Pros: Maximum detail
  • Cons: Queries slow, hard to identify trends

❌ Monthly Aggregation Only

  • Pros: Compact
  • Cons: Misses weekly trends, loses detail

✅ Daily Windows (CHOSEN)

  • Good balance: detail + trend visibility

Trade-offs

Pros:

  • ✅ Trends visible at daily granularity
  • ✅ Learning curves computable
  • ✅ Efficient aggregation queries
  • ✅ Retention policy compatible

Cons:

  • ⚠️ Storage overhead (daily windows)
  • ⚠️ Intra-day trends hidden (needs hourly for detail)
  • ⚠️ Rollup complexity

Implementation

Execution Record Model:

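// crates/vapora-knowledge-graph/src/models.rs

// Serializing DateTime<Utc> requires chrono's "serde" feature.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// One row per agent execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
    pub id: String,
    pub agent_id: String,
    pub task_id: String,
    pub task_type: String,
    pub success: bool,
    pub quality_score: f32,
    pub latency_ms: u32,
    pub cost_cents: u32,
    pub timestamp: DateTime<Utc>,
    pub daily_window: String,  // YYYY-MM-DD, derived from `timestamp`
}

/// One row per (agent, task type, day).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DailyAggregation {
    pub id: String,
    pub agent_id: String,
    pub task_type: String,
    pub day: String,           // YYYY-MM-DD
    pub execution_count: u32,
    pub success_count: u32,
    pub success_rate: f32,     // success_count / execution_count
    pub avg_quality: f32,
    pub avg_latency_ms: f32,
    pub total_cost_cents: u32,
    pub trend: TrendDirection,  // Improving, Stable, Declining
}

/// Day-over-day direction of the success rate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TrendDirection {
    Improving,
    Stable,
    Declining,
}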

Recording Execution:

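use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;

// `Result` is the crate's own alias; its definition is not shown in this ADR.
pub async fn record_execution(
    db: &Surreal<Client>,
    mut record: ExecutionRecord,
) -> Result<String> {
    // Derive the daily window from the UTC timestamp, overriding any caller-supplied value
    record.daily_window = record.timestamp.format("%Y-%m-%d").to_string();

    // Insert the execution record. Assumes the surrealdb 2.x SDK, where
    // `create(table).content(..)` resolves to `Option<T>`.
    let created: Option<ExecutionRecord> = db
        .create("executions")
        .content(record.clone())
        .await?;
    // Fall back to the caller-supplied id rather than panicking if no row comes back
    let id = created.map_or_else(|| record.id.clone(), |r| r.id);

    // Recompute the daily aggregation in the background (fire-and-forget:
    // the spawned task's Result is not observed here)
    tokio::spawn(aggregate_daily_window(
        db.clone(),
        record.agent_id.clone(),
        record.task_type.clone(),
        record.daily_window.clone(),
    ));

    Ok(id)
}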
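
The window boundary is the UTC calendar day, so two executions one second apart can land in different windows. The sketch below reproduces the `%Y-%m-%d` bucketing with the standard library only, to make that boundary explicit. `daily_window_utc` is a hypothetical helper, not part of the crate, and the date conversion is the widely used days-to-civil-date algorithm rather than anything taken from this project.

```rust
/// Seconds in one UTC day.
const SECS_PER_DAY: i64 = 86_400;

/// Index of the UTC day (days since 1970-01-01) that contains `unix_secs`.
pub fn utc_day_index(unix_secs: i64) -> i64 {
    unix_secs.div_euclid(SECS_PER_DAY)
}

/// Converts a day index into (year, month, day) in the proleptic Gregorian calendar.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year cycles
    let doe = z.rem_euclid(146_097); // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let y = yoe + era * 400 + if m <= 2 { 1 } else { 0 };
    (y, m, d)
}

/// Formats the UTC daily window (YYYY-MM-DD) for a Unix timestamp in seconds.
pub fn daily_window_utc(unix_secs: i64) -> String {
    let (y, m, d) = civil_from_days(utc_day_index(unix_secs));
    format!("{:04}-{:02}-{:02}", y, m, d)
}
```

An execution at 23:59:59 UTC and one at 00:00:00 UTC a second later therefore feed two different daily aggregations, regardless of the local time zone of the agent or operator.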

Daily Aggregation:

use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;

pub async fn aggregate_daily_window(
    db: Surreal<Client>,
    agent_id: String,
    task_type: String,
    day: String,
) -> Result<()> {
    // Query all executions for this day/agent/task type.
    // SurrealQL parameters are named ($agent_id), not positional.
    let executions: Vec<ExecutionRecord> = db
        .query(
            "SELECT * FROM executions \
             WHERE agent_id = $agent_id AND task_type = $task_type AND daily_window = $day"
        )
        .bind(("agent_id", agent_id.clone()))
        .bind(("task_type", task_type.clone()))
        .bind(("day", day.clone()))
        .await?
        .take(0)?;

    // Nothing to aggregate; this also avoids dividing by zero below
    if executions.is_empty() {
        return Ok(());
    }

    // Compute aggregates
    let execution_count = executions.len() as u32;
    let success_count = executions.iter().filter(|e| e.success).count() as u32;
    let success_rate = success_count as f32 / execution_count as f32;
    let avg_quality: f32 = executions.iter().map(|e| e.quality_score).sum::<f32>() / execution_count as f32;
    let avg_latency_ms: f32 = executions.iter().map(|e| e.latency_ms as f32).sum::<f32>() / execution_count as f32;
    let total_cost_cents: u32 = executions.iter().map(|e| e.cost_cents).sum();

    // Compute trend (compare to yesterday)
    let yesterday = (chrono::NaiveDate::parse_from_str(&day, "%Y-%m-%d")?
        - chrono::Duration::days(1))
        .format("%Y-%m-%d")
        .to_string();

    // SELECT VALUE returns bare numbers, so the rows deserialize as f32
    // instead of partially filled DailyAggregation structs
    let yesterday_rates: Vec<f32> = db
        .query(
            "SELECT VALUE success_rate FROM daily_aggregations \
             WHERE agent_id = $agent_id AND task_type = $task_type AND day = $day"
        )
        .bind(("agent_id", agent_id.clone()))
        .bind(("task_type", task_type.clone()))
        .bind(("day", yesterday))
        .await?
        .take(0)?;

    // A change of more than 5 percentage points counts as a trend;
    // with no aggregation for yesterday the day is reported as Stable
    let trend = if let Some(prev_rate) = yesterday_rates.first() {
        let change = success_rate - prev_rate;
        if change > 0.05 {
            TrendDirection::Improving
        } else if change < -0.05 {
            TrendDirection::Declining
        } else {
            TrendDirection::Stable
        }
    } else {
        TrendDirection::Stable
    };

    // Build the aggregation record with a deterministic id
    let agg = DailyAggregation {
        id: format!("{}-{}-{}", &agent_id, &task_type, &day),
        agent_id,
        task_type,
        day,
        execution_count,
        success_count,
        success_rate,
        avg_quality,
        avg_latency_ms,
        total_cost_cents,
        trend,
    };

    // Create or update the row keyed by (table, id).
    // Assumes the surrealdb 2.x SDK `upsert`.
    let _: Option<DailyAggregation> = db
        .upsert(("daily_aggregations", agg.id.clone()))
        .content(agg)
        .await?;

    Ok(())
}
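
The trend rule is easier to unit test when pulled out of the database code. The sketch below isolates it as a pure function. `classify_trend` and `TREND_THRESHOLD` are hypothetical names introduced here; the 0.05 threshold and the "no previous day means Stable" rule come from the aggregation code above.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrendDirection {
    Improving,
    Stable,
    Declining,
}

/// Minimum day-over-day change in success rate that counts as a trend
/// (5 percentage points).
pub const TREND_THRESHOLD: f32 = 0.05;

/// Classifies today's success rate against yesterday's, if yesterday exists.
pub fn classify_trend(previous: Option<f32>, current: f32) -> TrendDirection {
    match previous {
        Some(prev) if current - prev > TREND_THRESHOLD => TrendDirection::Improving,
        Some(prev) if current - prev < -TREND_THRESHOLD => TrendDirection::Declining,
        // Small change, or no aggregation for the previous day
        _ => TrendDirection::Stable,
    }
}
```

Note that the comparison is against the immediately preceding calendar day only: if an agent did not run yesterday, today is reported as Stable even when the last recorded day was very different.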

Learning Curve Query:

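use chrono::Utc;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;

pub async fn get_learning_curve(
    db: &Surreal<Client>,
    agent_id: &str,
    task_type: &str,
    days: u32,
) -> Result<Vec<DailyAggregation>> {
    // Lower bound of the range; YYYY-MM-DD strings compare in chronological order
    let since = (Utc::now() - chrono::Duration::days(i64::from(days)))
        .format("%Y-%m-%d")
        .to_string();

    // SurrealQL parameters are named and bound as (name, value) pairs
    let curve: Vec<DailyAggregation> = db
        .query(
            "SELECT * FROM daily_aggregations \
             WHERE agent_id = $agent_id AND task_type = $task_type AND day >= $since \
             ORDER BY day ASC"
        )
        .bind(("agent_id", agent_id.to_owned()))
        .bind(("task_type", task_type.to_owned()))
        .bind(("since", since))
        .await?
        .take(0)?;

    // Days without executions have no row, so the curve may contain gaps
    Ok(curve)
}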

Trend Analysis:

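/// Overall direction of a learning curve.
/// The variants are inferred from their use in `analyze_trend`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrendAnalysis {
    Improving,
    Stable,
    Declining,
    InsufficientData,
}

pub fn analyze_trend(curve: &[DailyAggregation]) -> TrendAnalysis {
    // At least two days are needed to say anything about direction
    if curve.len() < 2 {
        return TrendAnalysis::InsufficientData;
    }

    // Majority vote over the per-day trend labels
    let improving_days = curve.iter().filter(|d| d.trend == TrendDirection::Improving).count();
    let declining_days = curve.iter().filter(|d| d.trend == TrendDirection::Declining).count();

    if improving_days > declining_days {
        TrendAnalysis::Improving
    } else if declining_days > improving_days {
        TrendAnalysis::Declining
    } else {
        TrendAnalysis::Stable
    }
}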
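
The majority vote above counts labels and ignores magnitude, so one large drop and two small rises still read as improving. A possible complement, shown only as a sketch and not as something the crate implements, is the least-squares slope of the success rate over the curve. `success_rate_slope` is a hypothetical helper and assumes one point per consecutive day with no gaps.

```rust
/// Least-squares slope of `rates` against the day index 0, 1, 2, ...
/// The result is the average change in success rate per day.
/// Returns None for fewer than two points.
pub fn success_rate_slope(rates: &[f32]) -> Option<f32> {
    let n = rates.len();
    if n < 2 {
        return None;
    }
    let n_f = n as f32;
    let mean_x = (n_f - 1.0) / 2.0; // mean of 0..n-1
    let mean_y = rates.iter().sum::<f32>() / n_f;

    let mut covariance = 0.0f32;
    let mut variance = 0.0f32;
    for (i, &y) in rates.iter().enumerate() {
        let dx = i as f32 - mean_x;
        covariance += dx * (y - mean_y);
        variance += dx * dx;
    }
    // variance > 0 whenever n >= 2, so the division is safe
    Some(covariance / variance)
}
```

For a curve built from `get_learning_curve`, the input would be the `success_rate` of each `DailyAggregation` in order; a positive slope suggests improvement and a negative one suggests decline.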

Key Files:

  • /crates/vapora-knowledge-graph/src/models.rs (models)
  • /crates/vapora-knowledge-graph/src/aggregation.rs (daily aggregation)
  • /crates/vapora-knowledge-graph/src/learning.rs (learning curves)

Verification

# Test execution recording with daily window
cargo test -p vapora-knowledge-graph test_execution_recording

# Test daily aggregation
cargo test -p vapora-knowledge-graph test_daily_aggregation

# Test learning curve computation (7 days)
cargo test -p vapora-knowledge-graph test_learning_curve_7day

# Test trend detection
cargo test -p vapora-knowledge-graph test_trend_detection

# Integration: full lifecycle
cargo test -p vapora-knowledge-graph test_temporal_history_lifecycle

Expected Output:

  • Executions recorded with daily_window set
  • Daily aggregations computed correctly
  • Learning curves show trends
  • Trends detected accurately (improving/stable/declining)
  • Queries efficient with daily windows

Consequences

Data Retention

  • Daily aggregations permanent (minimal storage)
  • Individual execution records archived after 30 days
  • Trend analysis available indefinitely
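
The 30-day rule can be expressed as a small predicate over UTC day indices. This is a sketch under assumptions: `is_archivable` and `RETENTION_DAYS` are hypothetical names, and "after 30 days" is read here as strictly more than 30 days old; the ADR does not say which side of the boundary applies.

```rust
/// Number of days an individual execution record is kept before archival.
pub const RETENTION_DAYS: i64 = 30;

/// True when a record from `record_day` may be archived on `today`.
/// Both arguments are UTC day indices (days since 1970-01-01).
pub fn is_archivable(record_day: i64, today: i64) -> bool {
    today - record_day > RETENTION_DAYS
}

/// Returns the day indices from `record_days` that are due for archival.
pub fn days_to_archive(record_days: &[i64], today: i64) -> Vec<i64> {
    record_days
        .iter()
        .copied()
        .filter(|&day| is_archivable(day, today))
        .collect()
}
```

Archiving only touches per-execution rows; the daily aggregations for those days stay in place, which is what keeps trend analysis available indefinitely.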

Trend Visibility

  • Daily trends visible immediately
  • Week-over-week comparisons possible
  • Month-over-month trends computable
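
Weekly or monthly figures can be derived from the daily rows, but the means must be weighted by `execution_count`; averaging the daily averages over-weights quiet days. The sketch below shows one way to do the rollup. `DailyStats`, `PeriodStats`, and `rollup` are hypothetical names that mirror a subset of the `DailyAggregation` fields.

```rust
/// Subset of a daily aggregation needed for a rollup (illustration only).
pub struct DailyStats {
    pub execution_count: u32,
    pub success_count: u32,
    pub avg_quality: f32,
}

/// Aggregate over a longer period such as a week or a month.
pub struct PeriodStats {
    pub execution_count: u32,
    pub success_rate: f32,
    pub avg_quality: f32,
}

/// Rolls daily rows up into one period, weighting each day by its execution count.
/// Returns None when the period contains no executions.
pub fn rollup(days: &[DailyStats]) -> Option<PeriodStats> {
    let total: u32 = days.iter().map(|d| d.execution_count).sum();
    if total == 0 {
        return None;
    }
    let successes: u32 = days.iter().map(|d| d.success_count).sum();
    // Recover each day's quality sum from its mean before re-averaging
    let quality_sum: f32 = days
        .iter()
        .map(|d| d.avg_quality * d.execution_count as f32)
        .sum();
    Some(PeriodStats {
        execution_count: total,
        success_rate: successes as f32 / total as f32,
        avg_quality: quality_sum / total as f32,
    })
}
```

For example, a day with 10 executions at a 90% success rate followed by a day with 90 executions at 50% rolls up to 54%, whereas the unweighted mean of the two daily rates would report 70%.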

Performance

  • Aggregation queries use indexes (efficient)
  • Daily rollup automatic (background task)
  • No real-time overhead

Monitoring

  • Trends inform agent selection decisions
  • Declining agents flagged for investigation
  • Improving agents promoted
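
The monitoring rules above amount to a mapping from trend to action. A minimal sketch follows, assuming a hypothetical `MonitoringAction` type and `action_for` function; how promotion and investigation are actually carried out is not described in this ADR.

```rust
/// Overall direction of a learning curve (same variants as in the trend analysis).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrendAnalysis {
    Improving,
    Stable,
    Declining,
    InsufficientData,
}

/// Hypothetical follow-up action for an agent (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MonitoringAction {
    Promote,
    Keep,
    Investigate,
}

/// Maps a trend to the follow-up action listed under Monitoring.
pub fn action_for(trend: TrendAnalysis) -> MonitoringAction {
    match trend {
        TrendAnalysis::Improving => MonitoringAction::Promote,
        TrendAnalysis::Declining => MonitoringAction::Investigate,
        // Without a clear signal the agent's standing is left unchanged
        TrendAnalysis::Stable | TrendAnalysis::InsufficientData => MonitoringAction::Keep,
    }
}
```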

References

  • /crates/vapora-knowledge-graph/src/aggregation.rs (implementation)
  • /crates/vapora-knowledge-graph/src/learning.rs (usage)
  • ADR-013 (Knowledge Graph)
  • ADR-014 (Learning Profiles)

Related ADRs: ADR-013 (Knowledge Graph), ADR-014 (Learning Profiles), ADR-020 (Audit Trail)