Vapora/crates/vapora-analytics/benches/pipeline_benchmarks.rs
Jesús Pérez fe4d138a14
Some checks failed
Rust CI / Security Audit (push) Has been cancelled
Rust CI / Check + Test + Lint (nightly) (push) Has been cancelled
Rust CI / Check + Test + Lint (stable) (push) Has been cancelled
feat: CLI arguments, distribution management, and approval gates
- Add CLI support (--config, --help) with env var override for backend/agents
  - Implement distro justfile recipes: list-targets, install-targets, build-target, install
  - Fix OpenTelemetry API incompatibilities and remove deprecated calls
  - Add tokio "time" feature for timeout support
  - Fix Cargo profile warnings and Nushell script syntax
  - Update all dead_code warnings with strategic annotations
  - Zero compiler warnings in vapora codebase
  - Comprehensive CHANGELOG documenting risk-based approval gates system
2026-02-03 21:35:00 +00:00

142 lines
4.9 KiB
Rust

use std::hint::black_box;
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::sync::mpsc;
use vapora_analytics::{AgentEvent, EventPipeline};
fn create_test_event(i: usize) -> AgentEvent {
if i % 20 == 0 {
AgentEvent::new_task_failed(
format!("agent-{}", i % 5),
format!("task-{}", i),
"timeout error".to_string(),
)
} else {
AgentEvent::new_task_completed(
format!("agent-{}", i % 5),
format!("task-{}", i),
1000 + (i as u64 * 100),
100 + (i as u64 * 10),
50,
)
}
}
fn pipeline_emit_event(c: &mut Criterion) {
c.bench_function("emit_single_event", |b| {
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter(|| async {
let (alert_tx, _alert_rx) = mpsc::unbounded_channel();
let (pipeline, _) = EventPipeline::new(alert_tx);
let event = AgentEvent::new_task_completed(
black_box("agent-1".to_string()),
black_box("task-1".to_string()),
1000,
100,
50,
);
black_box(pipeline.emit_event(black_box(event)).await)
});
});
}
fn pipeline_filter_events(c: &mut Criterion) {
c.bench_function("filter_events_100_events", |b| {
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let (alert_tx, _alert_rx) = mpsc::unbounded_channel();
let (pipeline, _) = EventPipeline::new(alert_tx);
for i in 0..100 {
let event = AgentEvent::new_task_completed(
format!("agent-{}", i % 5),
format!("task-{}", i),
1000 + (i as u64 * 100),
100 + (i as u64 * 10),
50,
);
pipeline.emit_event(event).await.ok();
}
pipeline
})
},
|pipeline| async move {
black_box(
pipeline
.filter_events(|e| e.agent_id == "agent-1")
)
},
criterion::BatchSize::SmallInput,
);
});
}
fn pipeline_get_error_rate(c: &mut Criterion) {
c.bench_function("get_error_rate_200_events", |b| {
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let (alert_tx, _alert_rx) = mpsc::unbounded_channel();
let (pipeline, _) = EventPipeline::new(alert_tx);
for i in 0..200 {
let event = create_test_event(i);
pipeline.emit_event(event).await.ok();
}
pipeline
})
},
|pipeline| async move { black_box(pipeline.get_error_rate(60).await.ok()) },
criterion::BatchSize::SmallInput,
);
});
}
fn pipeline_get_top_agents(c: &mut Criterion) {
c.bench_function("get_top_agents_500_events", |b| {
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let (alert_tx, _alert_rx) = mpsc::unbounded_channel();
let (pipeline, _) = EventPipeline::new(alert_tx);
for i in 0..500 {
let event = AgentEvent::new_task_completed(
format!("agent-{}", i % 10),
format!("task-{}", i),
1000 + (i as u64 * 100) % 5000,
100 + (i as u64 * 10),
50,
);
pipeline.emit_event(event).await.ok();
}
pipeline
})
},
|pipeline| async move { black_box(pipeline.get_top_agents(60).await.ok()) },
criterion::BatchSize::SmallInput,
);
});
}
criterion_group!(
benches,
pipeline_emit_event,
pipeline_filter_events,
pipeline_get_error_rate,
pipeline_get_top_agents
);
criterion_main!(benches);