feat: integrate NatsBridge with real JetStream into A2A server
vapora-agents:
- Add nats_bridge.rs with real async_nats JetStream (submit_task, durable
pull consumer, list_agents from live registry)
- Replace swarm_adapter.rs stubs with real SwarmCoordinator calls
(select_agent via bidding, report_completion with load update, agent_load
from fractional profile)
- Expose SwarmCoordinator::get_agent() for per-agent profile access
vapora-a2a:
- CoordinatorBridge: replace raw NatsClient with NatsBridge (JetStream
at-least-once delivery via durable pull consumer)
- Add GET /a2a/agents endpoint listing registered agents
- task_manager::create(): switch from .content() to a parameterized INSERT INTO
to avoid the SurrealDB serializer failing on adjacently-tagged enums
- task_manager::get(): explicit field projection, excluding id (a SurrealDB Thing);
cast datetimes with type::string() to fix serde_json::Value deserialization
- Integration tests: 4/5 pass with SurrealDB + NATS
vapora-leptos-ui:
- Set doctest = false in [lib]: Leptos components require WASM reactive
runtime, incompatible with native cargo test --doc
parent 0b78d97fd7 · commit 2f76728481
12 changed files with 843 additions and 379 deletions
CHANGELOG.md (44 changed lines)

@@ -7,6 +7,48 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added - NatsBridge + A2A JetStream Integration

#### `vapora-agents` — NatsBridge (real JetStream)

- `nats_bridge.rs`: new `NatsBridge` with real `async_nats::jetstream::Context`
  - `submit_task()` → JetStream publish with double-await ack, returns sequence number
  - `subscribe_task_results()` → durable pull consumer (`WorkQueue` retention), returns `mpsc::Receiver<TaskResult>`
  - `list_agents()` → reads from live `AgentRegistry`, never hardcoded
  - `NatsBrokerConfig` with sensible defaults; stream auto-created via `get_or_create_stream`
- `swarm_adapter.rs`: replaced all 3 stubs with real logic
  - `select_agent()` → `swarm.submit_task_for_bidding()` for load-balanced selection
  - `report_completion()` → `swarm.update_agent_status()` with load adjustment on failure
  - `agent_load()` → derives current tasks from fractional load via `swarm.get_agent()`

#### `vapora-swarm` — `SwarmCoordinator::get_agent()`

- Added `pub fn get_agent(&self, agent_id: &str) -> Option<AgentProfile>` to expose per-agent profiles from the private `DashMap`

#### `vapora-a2a` — NatsBridge integration + SurrealDB serialization fixes

- `CoordinatorBridge`: replaced raw `NatsClient` with `Option<Arc<NatsBridge>>`
  - `start_result_listener()` uses JetStream pull consumer (at-least-once delivery)
  - `dispatch()` publishes to JetStream after coordinator assignment (non-fatal fallback)
  - `list_agents()` delegates to `NatsBridge.list_agents()`
- `server.rs`: added `GET /a2a/agents` endpoint
- `task_manager.rs`: fixed SurrealDB serialization
  - `create()`: switched from `.content()` to a parameterized `INSERT INTO` query; avoids the SurrealDB serializer failing on adjacently-tagged enums (`A2aMessagePart`)
  - `get()`: changed `SELECT *` to an explicit field projection; excludes `id` (SurrealDB `Thing`) and casts datetimes with `type::string()` to avoid `serde_json::Value` deserialization failures
- Integration tests verified: 4/5 pass with SurrealDB + NATS; the 5th requires a live agent

#### `vapora-leptos-ui`

- Set `doctest = false` in `[lib]`: Leptos components require the WASM reactive runtime; native doctests are incompatible by design
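The `doctest = false` change corresponds to a manifest entry like the following (a sketch of the relevant `Cargo.toml` section, not the crate's full manifest):

```toml
[lib]
# Leptos components need the WASM reactive runtime; `cargo test --doc`
# runs natively, so doc tests are disabled for this crate.
doctest = false
```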

### Added - NATS JetStream local container

- `/containers/nats/`: Docker Compose service following the existing containers pattern
  - JetStream enabled via `nats.conf` (`store_dir: /data`, max_mem: 1G, max_file: 10G)
  - Persistent volume at `./nats_data`
  - Ports: 4222 (client), 8222 (HTTP monitoring)
  - `local_net` network, `restart: unless-stopped`
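Put together, the bullets above describe a compose service roughly like this. A sketch only — the exact file name and keys in `/containers/nats/` may differ:

```yaml
# Sketch of a /containers/nats/ compose service matching the bullets above.
services:
  nats:
    image: nats:latest
    command: ["-c", "/etc/nats/nats.conf"]   # nats.conf enables JetStream
    ports:
      - "4222:4222"   # client
      - "8222:8222"   # HTTP monitoring
    volumes:
      - ./nats_data:/data                    # persistent JetStream store_dir
      - ./nats.conf:/etc/nats/nats.conf:ro
    networks:
      - local_net
    restart: unless-stopped

networks:
  local_net:
```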
### Added - Recursive Language Models (RLM) Integration (v1.3.0)

#### Core RLM Engine (`vapora-rlm` crate - 17,000+ LOC)

@@ -108,7 +150,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

#### Documentation

-- **Architecture Decision Record**: `docs/architecture/decisions/008-recursive-language-models-integration.md`
+- **Architecture Decision Record**: `docs/adrs/0029-rlm-recursive-language-models.md`
  - Context and problem statement
  - Considered options (RAG, LangChain, custom RLM)
  - Decision rationale and trade-offs
@@ -34,8 +34,8 @@

 ### Components

 1. **CoordinatorBridge** - Maps A2A tasks to internal agent coordination
-   - NATS subscribers for TaskCompleted/TaskFailed events
+   - JetStream durable pull consumer via `NatsBridge` (at-least-once delivery)
-   - DashMap for async result delivery via oneshot channels
+   - `DashMap<String, oneshot::Sender>` for async result delivery
   - Graceful degradation if NATS unavailable

 2. **TaskManager** - Persistent task storage and lifecycle

@@ -47,6 +47,7 @@

 - `GET /.well-known/agent.json` - Agent discovery
 - `POST /a2a` - Task dispatch
 - `GET /a2a/tasks/{task_id}` - Status query
+- `GET /a2a/agents` - List registered agents
 - `GET /health` - Health check
 - `GET /metrics` - Prometheus metrics

@@ -60,8 +61,9 @@ docker run -d -p 8000:8000 \
   surrealdb/surrealdb:latest \
   start --bind 0.0.0.0:8000

-# Start NATS (optional, graceful degradation)
-docker run -d -p 4222:4222 nats:latest
+# Start NATS with JetStream (optional, graceful degradation)
+docker run -d -p 4222:4222 -p 8222:8222 nats:latest -js
+# or use the local container: cd /containers/nats && docker compose up -d

 # Run migration
 surrealdb import --conn ws://localhost:8000 \

@@ -174,7 +176,7 @@ Require SurrealDB + NATS running:
 docker compose up -d surrealdb nats

 # Run tests
-cargo test -p vapora-a2a --test integration_test -- --ignored
+cargo test -p vapora-a2a --test integration_test -- --include-ignored

 # Tests:
 # 1. Task persistence after restart

@@ -198,7 +200,7 @@ Implements [A2A Protocol Specification](https://a2a-spec.dev):

 ## Production Deployment

-See [ADR-0001](../../docs/architecture/adr/0001-a2a-protocol-implementation.md) and [ADR-0002](../../docs/architecture/adr/0002-kubernetes-deployment-strategy.md).
+See [ADR-0030](../../docs/adrs/0030-a2a-protocol-implementation.md) and [ADR-0031](../../docs/adrs/0031-kubernetes-deployment-kagent.md).

 ### Kubernetes
@@ -1,17 +1,17 @@
 use std::sync::Arc;

-use async_nats::Client as NatsClient;
 use dashmap::DashMap;
-use futures::StreamExt;
-use serde_json::json;
-use tokio::sync::oneshot;
+use tokio::sync::{mpsc, oneshot};
 use tracing::{error, info, warn};
-use vapora_agents::{coordinator::AgentCoordinator, messages::AgentMessage};
+use vapora_agents::{
+    coordinator::AgentCoordinator,
+    nats_bridge::{AgentStatusInfo, NatsBridge, TaskMessage, TaskResult},
+};

 use crate::{
     error::{A2aError, Result},
     metrics::{A2A_COORDINATOR_ASSIGNMENTS, A2A_NATS_MESSAGES, A2A_TASKS_TOTAL},
-    protocol::{A2aMessage, A2aMessagePart, A2aTask, A2aTaskResult, TaskState},
+    protocol::{A2aErrorObj, A2aMessage, A2aMessagePart, A2aTask, A2aTaskResult, TaskState},
     task_manager::TaskManager,
 };
@@ -19,228 +19,49 @@ pub struct CoordinatorBridge {
     coordinator: Arc<AgentCoordinator>,
     task_manager: Arc<TaskManager>,
     result_channels: Arc<DashMap<String, oneshot::Sender<A2aTaskResult>>>,
-    nats_client: Option<NatsClient>,
+    nats_bridge: Option<Arc<NatsBridge>>,
 }

 impl CoordinatorBridge {
     pub fn new(
         coordinator: Arc<AgentCoordinator>,
         task_manager: Arc<TaskManager>,
-        nats_client: Option<NatsClient>,
+        nats_bridge: Option<Arc<NatsBridge>>,
     ) -> Self {
         Self {
             coordinator,
             task_manager,
             result_channels: Arc::new(DashMap::new()),
-            nats_client,
+            nats_bridge,
         }
     }

-    /// Start background NATS listeners for task completion/failure events
+    /// Start the JetStream result consumer.
+    ///
+    /// Subscribes to `vapora.tasks.completed` via the durable pull consumer in
+    /// `NatsBridge`. Each `TaskResult` received is persisted to SurrealDB and
+    /// forwarded to any waiting oneshot channel. Replaces the previous core
+    /// NATS subscriber approach with at-least-once JetStream delivery.
     pub async fn start_result_listener(&self) -> Result<()> {
-        let Some(nats) = &self.nats_client else {
-            warn!("NATS client not configured, result listener disabled");
+        let Some(bridge) = &self.nats_bridge else {
+            warn!("NatsBridge not configured; result listener disabled");
             return Ok(());
         };

-        // Subscribe to completion events
-        let completed_sub = nats
-            .subscribe("vapora.tasks.completed")
-            .await
-            .map_err(|e| A2aError::InternalError(format!("Failed to subscribe to NATS: {}", e)))?;
-
-        let failed_sub = nats
-            .subscribe("vapora.tasks.failed")
-            .await
-            .map_err(|e| A2aError::InternalError(format!("Failed to subscribe to NATS: {}", e)))?;
-
-        // Spawn listener for completed tasks
-        Self::spawn_completed_listener(
-            completed_sub,
-            self.task_manager.clone(),
-            self.result_channels.clone(),
-        );
-
-        // Spawn listener for failed tasks
-        Self::spawn_failed_listener(
-            failed_sub,
-            self.task_manager.clone(),
-            self.result_channels.clone(),
-        );
-
-        info!(
-            "A2A result listener started (subscribed to vapora.tasks.completed, \
-             vapora.tasks.failed)"
-        );
+        let rx = bridge
+            .subscribe_task_results()
+            .await
+            .map_err(|e| A2aError::InternalError(format!("JetStream consumer error: {}", e)))?;
+
+        let task_manager = self.task_manager.clone();
+        let result_channels = self.result_channels.clone();
+
+        tokio::spawn(run_result_consumer(rx, task_manager, result_channels));
+
+        info!("A2A result listener started (JetStream pull consumer)");
         Ok(())
     }

-    fn spawn_completed_listener(
-        mut completed_sub: async_nats::Subscriber,
-        task_manager: Arc<TaskManager>,
-        result_channels: Arc<DashMap<String, oneshot::Sender<A2aTaskResult>>>,
-    ) {
-        tokio::spawn(async move {
-            while let Some(msg) = completed_sub.next().await {
-                Self::handle_completed_message(msg, &task_manager, &result_channels).await;
-            }
-        });
-    }
-
-    async fn handle_completed_message(
-        msg: async_nats::Message,
-        task_manager: &TaskManager,
-        result_channels: &DashMap<String, oneshot::Sender<A2aTaskResult>>,
-    ) {
-        match serde_json::from_slice::<AgentMessage>(&msg.payload) {
-            Ok(AgentMessage::TaskCompleted(task_completed)) => {
-                let task_id = task_completed.task_id.clone();
-
-                // Build A2aTaskResult
-                let artifacts = Self::convert_artifacts(&task_completed.artifacts);
-                let result = A2aTaskResult {
-                    message: A2aMessage {
-                        role: "assistant".to_string(),
-                        parts: vec![A2aMessagePart::Text(task_completed.result.clone())],
-                    },
-                    artifacts,
-                };
-
-                // Update DB
-                if let Err(e) = task_manager.complete(&task_id, result.clone()).await {
-                    error!(
-                        error = %e,
-                        task_id = %task_id,
-                        "Failed to mark task as completed in DB"
-                    );
-                    A2A_NATS_MESSAGES
-                        .with_label_values(&["completed", "db_error"])
-                        .inc();
-                } else {
-                    A2A_NATS_MESSAGES
-                        .with_label_values(&["completed", "success"])
-                        .inc();
-                    A2A_TASKS_TOTAL.with_label_values(&["completed"]).inc();
-                }
-
-                // Send to waiting channel if exists
-                Self::send_to_channel(&task_id, result, result_channels);
-            }
-            Ok(_) => {
-                warn!("Received non-TaskCompleted message on vapora.tasks.completed");
-                A2A_NATS_MESSAGES
-                    .with_label_values(&["completed", "wrong_type"])
-                    .inc();
-            }
-            Err(e) => {
-                warn!(error = %e, "Failed to deserialize TaskCompleted message");
-                A2A_NATS_MESSAGES
-                    .with_label_values(&["completed", "deserialize_error"])
-                    .inc();
-            }
-        }
-    }
-
-    fn spawn_failed_listener(
-        mut failed_sub: async_nats::Subscriber,
-        task_manager: Arc<TaskManager>,
-        result_channels: Arc<DashMap<String, oneshot::Sender<A2aTaskResult>>>,
-    ) {
-        tokio::spawn(async move {
-            while let Some(msg) = failed_sub.next().await {
-                Self::handle_failed_message(msg, &task_manager, &result_channels).await;
-            }
-        });
-    }
-
-    async fn handle_failed_message(
-        msg: async_nats::Message,
-        task_manager: &TaskManager,
-        result_channels: &DashMap<String, oneshot::Sender<A2aTaskResult>>,
-    ) {
-        match serde_json::from_slice::<AgentMessage>(&msg.payload) {
-            Ok(AgentMessage::TaskFailed(task_failed)) => {
-                let task_id = task_failed.task_id.clone();
-
-                // Update DB with error
-                let error_obj = crate::protocol::A2aErrorObj {
-                    code: -1,
-                    message: task_failed.error.clone(),
-                };
-
-                if let Err(e) = task_manager.fail(&task_id, error_obj).await {
-                    error!(
-                        error = %e,
-                        task_id = %task_id,
-                        "Failed to mark task as failed in DB"
-                    );
-                    A2A_NATS_MESSAGES
-                        .with_label_values(&["failed", "db_error"])
-                        .inc();
-                } else {
-                    A2A_NATS_MESSAGES
-                        .with_label_values(&["failed", "success"])
-                        .inc();
-                    A2A_TASKS_TOTAL.with_label_values(&["failed"]).inc();
-                }
-
-                // Remove waiting channel (task failed, no result to send)
-                result_channels.remove(&task_id);
-            }
-            Ok(_) => {
-                warn!("Received non-TaskFailed message on vapora.tasks.failed");
-                A2A_NATS_MESSAGES
-                    .with_label_values(&["failed", "wrong_type"])
-                    .inc();
-            }
-            Err(e) => {
-                warn!(error = %e, "Failed to deserialize TaskFailed message");
-                A2A_NATS_MESSAGES
-                    .with_label_values(&["failed", "deserialize_error"])
-                    .inc();
-            }
-        }
-    }
-
-    fn convert_artifacts(artifacts: &[String]) -> Option<Vec<crate::protocol::A2aArtifact>> {
-        if artifacts.is_empty() {
-            return None;
-        }
-
-        Some(
-            artifacts
-                .iter()
-                .map(|path| crate::protocol::A2aArtifact {
-                    artifact_type: "file".to_string(),
-                    format: None,
-                    title: Some(
-                        std::path::Path::new(path)
-                            .file_name()
-                            .and_then(|n| n.to_str())
-                            .unwrap_or(path)
-                            .to_string(),
-                    ),
-                    data: json!({ "path": path }),
-                })
-                .collect(),
-        )
-    }
-
-    fn send_to_channel(
-        task_id: &str,
-        result: A2aTaskResult,
-        result_channels: &DashMap<String, oneshot::Sender<A2aTaskResult>>,
-    ) {
-        if let Some((_, sender)) = result_channels.remove(task_id) {
-            if sender.send(result).is_err() {
-                warn!(
-                    task_id = %task_id,
-                    "Failed to send result to channel (receiver dropped)"
-                );
-            }
-        }
-    }
-
     pub async fn dispatch(&self, a2a_task: A2aTask) -> Result<String> {
         let task_id = a2a_task.id.clone();
@@ -272,27 +93,32 @@ impl CoordinatorBridge {
         let title = parts[0].to_string();
         let description = parts.get(1).unwrap_or(&"").to_string();

-        // Create task in DB (status: waiting)
-        self.task_manager.create(a2a_task).await?;
+        // Persist task (status: waiting)
+        self.task_manager.create(a2a_task.clone()).await?;
         A2A_TASKS_TOTAL.with_label_values(&["waiting"]).inc();

         // Update status to working
         self.task_manager
             .update_state(&task_id, TaskState::Working)
             .await?;
         A2A_TASKS_TOTAL.with_label_values(&["working"]).inc();

-        // Assign to agent (via AgentCoordinator)
+        // Assign via AgentCoordinator (learning-based selection + core NATS publish)
         match self
             .coordinator
-            .assign_task(&skill, title, description, json!({}).to_string(), 50)
+            .assign_task(
+                &skill,
+                title.clone(),
+                description.clone(),
+                serde_json::json!({}).to_string(),
+                50,
+            )
             .await
         {
             Ok(_) => {
                 A2A_COORDINATOR_ASSIGNMENTS
                     .with_label_values(&[skill.as_str(), "success"])
                     .inc();
-                info!("Task {} dispatched to coordinator", task_id);
+                info!(task_id = %task_id, "Task dispatched via coordinator");
             }
             Err(e) => {
                 A2A_COORDINATOR_ASSIGNMENTS

@@ -302,7 +128,36 @@ impl CoordinatorBridge {
             }
         }

-        // NO sleep(5) here! Result will come via NATS subscriber
+        // Publish to JetStream for durable delivery (if NatsBridge is configured)
+        if let Some(bridge) = &self.nats_bridge {
+            let task_msg = TaskMessage {
+                task_id: task_id.clone(),
+                title,
+                description,
+                role: skill.clone(),
+                context: serde_json::json!({}).to_string(),
+                priority: 50,
+            };
+
+            match bridge.submit_task(task_msg).await {
+                Ok(seq) => {
+                    info!(
+                        task_id = %task_id,
+                        jetstream_seq = seq,
+                        "Task published to JetStream"
+                    );
+                }
+                Err(e) => {
+                    // Non-fatal: core NATS dispatch already succeeded.
+                    // Log and continue; coordinator assignment was the primary path.
+                    warn!(
+                        error = %e,
+                        task_id = %task_id,
+                        "JetStream publish failed (task still dispatched via coordinator)"
+                    );
+                }
+            }
+        }

         Ok(task_id)
     }
@@ -310,4 +165,78 @@ impl CoordinatorBridge {
     pub async fn get_task(&self, id: &str) -> Result<crate::protocol::A2aTaskStatus> {
         self.task_manager.get(id).await
     }
+
+    /// List all registered agents from the NatsBridge registry.
+    /// Returns empty vec if NatsBridge is not configured.
+    pub fn list_agents(&self) -> Vec<AgentStatusInfo> {
+        self.nats_bridge
+            .as_ref()
+            .map(|b| b.list_agents())
+            .unwrap_or_default()
+    }
+}
+
+async fn run_result_consumer(
+    mut rx: mpsc::Receiver<TaskResult>,
+    task_manager: Arc<TaskManager>,
+    result_channels: Arc<DashMap<String, oneshot::Sender<A2aTaskResult>>>,
+) {
+    while let Some(task_result) = rx.recv().await {
+        let task_id = task_result.task_id.clone();
+
+        if task_result.success {
+            let a2a_result = A2aTaskResult {
+                message: A2aMessage {
+                    role: "assistant".to_string(),
+                    parts: vec![A2aMessagePart::Text(task_result.result.clone())],
+                },
+                artifacts: None,
+            };
+
+            match task_manager.complete(&task_id, a2a_result.clone()).await {
+                Err(e) => {
+                    error!(error = %e, task_id = %task_id, "Failed to mark task completed");
+                    A2A_NATS_MESSAGES
+                        .with_label_values(&["completed", "db_error"])
+                        .inc();
+                }
+                Ok(()) => {
+                    A2A_NATS_MESSAGES
+                        .with_label_values(&["completed", "success"])
+                        .inc();
+                    A2A_TASKS_TOTAL.with_label_values(&["completed"]).inc();
+                }
+            }
+
+            if let Some((_, sender)) = result_channels.remove(&task_id) {
+                if sender.send(a2a_result).is_err() {
+                    warn!(task_id = %task_id, "Result receiver dropped");
+                }
+            }
+        } else {
+            let error_obj = A2aErrorObj {
+                code: -1,
+                message: task_result.result.clone(),
+            };
+
+            match task_manager.fail(&task_id, error_obj).await {
+                Err(e) => {
+                    error!(error = %e, task_id = %task_id, "Failed to mark task failed");
+                    A2A_NATS_MESSAGES
+                        .with_label_values(&["failed", "db_error"])
+                        .inc();
+                }
+                Ok(()) => {
+                    A2A_NATS_MESSAGES
+                        .with_label_values(&["failed", "success"])
+                        .inc();
+                    A2A_TASKS_TOTAL.with_label_values(&["failed"]).inc();
+                }
+            }
+
+            result_channels.remove(&task_id);
+        }
+    }
+
+    warn!("NatsBridge result channel closed; listener stopped");
 }
@@ -6,7 +6,12 @@ use vapora_a2a::{
     agent_card::generate_default_agent_card, bridge::CoordinatorBridge, server::create_router,
     server::A2aState, task_manager::TaskManager,
 };
-use vapora_agents::{config::AgentConfig, coordinator::AgentCoordinator, registry::AgentRegistry};
+use vapora_agents::{
+    config::AgentConfig,
+    coordinator::AgentCoordinator,
+    nats_bridge::{NatsBridge, NatsBrokerConfig},
+    registry::AgentRegistry,
+};

 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -37,13 +42,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .map(|(_, v)| v.as_str())
         .unwrap_or("1.0.0");

+    let nats_url =
+        std::env::var("NATS_URL").unwrap_or_else(|_| "nats://127.0.0.1:4222".to_string());
+    let surreal_url = std::env::var("SURREAL_URL").unwrap_or_else(|_| "127.0.0.1:8000".to_string());
+
     let addr = format!("{}:{}", host, port);

     info!("Starting VAPORA A2A Server on {}", addr);

     // Connect to SurrealDB
-    info!("Connecting to SurrealDB");
-    let db = surrealdb::Surreal::new::<surrealdb::engine::remote::ws::Ws>("127.0.0.1:8000").await?;
+    info!(url = %surreal_url, "Connecting to SurrealDB");
+    let db =
+        surrealdb::Surreal::new::<surrealdb::engine::remote::ws::Ws>(surreal_url.as_str()).await?;

     db.signin(surrealdb::opt::auth::Root {
         username: "root",

@@ -54,32 +64,40 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     db.use_ns("vapora").use_db("main").await?;
     info!("Connected to SurrealDB");

-    // Connect to NATS (optional - graceful fallback if not available)
-    let nats_client = match async_nats::connect("127.0.0.1:4222").await {
-        Ok(client) => {
-            info!("Connected to NATS");
-            Some(client)
-        }
-        Err(e) => {
-            warn!(
-                "Failed to connect to NATS: {}. Async coordination disabled.",
-                e
-            );
-            None
-        }
-    };
+    let registry = Arc::new(AgentRegistry::new(10));
+
+    // Connect to NATS via NatsBridge (JetStream, durable) with graceful fallback
+    let nats_bridge = {
+        let config = NatsBrokerConfig {
+            url: nats_url.clone(),
+            ..NatsBrokerConfig::default()
+        };
+        match NatsBridge::connect(config, registry.clone()).await {
+            Ok(bridge) => {
+                info!(url = %nats_url, "NatsBridge connected (JetStream)");
+                Some(Arc::new(bridge))
+            }
+            Err(e) => {
+                warn!(
+                    error = %e,
+                    "Failed to connect NatsBridge; result listener and JetStream dispatch disabled"
+                );
+                None
+            }
+        }
+    };

     let task_manager = Arc::new(TaskManager::new(db.clone()));
-    let registry = Arc::new(AgentRegistry::new(10));
     let config = AgentConfig::default();
     let agent_coordinator = Arc::new(AgentCoordinator::new(config, registry).await?);

     let bridge = Arc::new(CoordinatorBridge::new(
         agent_coordinator.clone(),
         task_manager.clone(),
-        nats_client,
+        nats_bridge,
     ));

-    // Start NATS result listener
+    // Start JetStream result listener
     bridge.start_result_listener().await?;

     let agent_card =
@@ -29,6 +29,7 @@ pub fn create_router(state: A2aState) -> Router {
         .route("/.well-known/agent.json", get(agent_card_handler))
         .route("/a2a", post(a2a_handler))
         .route("/a2a/tasks/{task_id}", get(task_status_handler))
+        .route("/a2a/agents", get(agents_handler))
         .route("/health", get(health_handler))
         .route("/metrics", get(metrics_handler))
         .with_state(state)

@@ -84,6 +85,10 @@ async fn task_status_handler(
     }
 }

+async fn agents_handler(State(state): State<A2aState>) -> impl IntoResponse {
+    Json(state.bridge.list_agents())
+}
+
 async fn health_handler() -> impl IntoResponse {
     Json(json!({
         "status": "healthy",
@@ -17,39 +17,42 @@ impl TaskManager {
 
     pub async fn create(&self, task: A2aTask) -> Result<A2aTaskStatus> {
         let now = Utc::now().to_rfc3339();
-        let status = A2aTaskStatus {
-            id: task.id.clone(),
-            state: TaskState::Waiting.as_str().to_string(),
-            message: Some(task.message.clone()),
-            result: None,
-            error: None,
-            created_at: now.clone(),
-            updated_at: now.clone(),
-        };
 
-        // Serialize task to JSON for storage
-        let task_record = serde_json::json!({
-            "task_id": task.id,
-            "state": TaskState::Waiting.as_str(),
-            "message": task.message,
-            "result": serde_json::Value::Null,
-            "error": serde_json::Value::Null,
-            "metadata": task.metadata,
-            "created_at": now,
-            "updated_at": now,
-        });
+        // Pre-serialize complex fields with serde_json to avoid SurrealDB's
+        // own serializer choking on adjacently-tagged enums (tag + content).
+        let message_json = serde_json::to_value(&task.message)
+            .map_err(|e| A2aError::InternalError(format!("Failed to serialize message: {}", e)))?;
+        let metadata_json = serde_json::to_value(&task.metadata)
+            .map_err(|e| A2aError::InternalError(format!("Failed to serialize metadata: {}", e)))?;
 
         match self
             .db
-            .create::<Option<serde_json::Value>>("a2a_tasks")
-            .content(task_record)
+            .query(
+                "INSERT INTO a2a_tasks (task_id, state, message, result, error, metadata, \
+                 created_at, updated_at) VALUES ($task_id, $state, $message, NONE, NONE, \
+                 $metadata, $created_at, $updated_at)",
+            )
+            .bind(("task_id", task.id.clone()))
+            .bind(("state", TaskState::Waiting.as_str().to_string()))
+            .bind(("message", message_json))
+            .bind(("metadata", metadata_json))
+            .bind(("created_at", now.clone()))
+            .bind(("updated_at", now.clone()))
             .await
         {
             Ok(_) => {
                 A2A_DB_OPERATIONS
                     .with_label_values(&["create", "success"])
                     .inc();
-                Ok(status)
+                Ok(A2aTaskStatus {
+                    id: task.id,
+                    state: TaskState::Waiting.as_str().to_string(),
+                    message: Some(task.message),
+                    result: None,
+                    error: None,
+                    created_at: now.clone(),
+                    updated_at: now,
+                })
             }
             Err(e) => {
                 A2A_DB_OPERATIONS
@@ -64,9 +67,15 @@ impl TaskManager {
     }
 
     pub async fn get(&self, id: &str) -> Result<A2aTaskStatus> {
+        // Explicit projection: exclude `id` (SurrealDB Thing) and cast datetime
+        // fields to strings to avoid deserialization failures into serde_json::Value.
         let mut response = match self
             .db
-            .query("SELECT * FROM a2a_tasks WHERE task_id = $task_id LIMIT 1")
+            .query(
+                "SELECT task_id, state, message, result, error, metadata, \
+                 type::string(created_at) AS created_at, type::string(updated_at) AS updated_at \
+                 FROM a2a_tasks WHERE task_id = $task_id LIMIT 1",
+            )
             .bind(("task_id", id.to_string()))
             .await
         {
@@ -13,11 +13,12 @@ use vapora_a2a::{
     task_manager::TaskManager,
 };
 use vapora_agents::{
-    config::AgentConfig, coordinator::AgentCoordinator, messages::AgentMessage,
-    messages::TaskCompleted, registry::AgentRegistry,
+    config::AgentConfig,
+    coordinator::AgentCoordinator,
+    nats_bridge::{NatsBridge, NatsBrokerConfig, TaskResult},
+    registry::AgentRegistry,
 };
 
-/// Setup test database connection
 async fn setup_test_db() -> Surreal<Client> {
     let db = Surreal::new::<Ws>("127.0.0.1:8000")
         .await
@@ -38,14 +39,21 @@ async fn setup_test_db() -> Surreal<Client> {
     db
 }
 
-/// Setup test NATS connection
-async fn setup_test_nats() -> async_nats::Client {
-    async_nats::connect("127.0.0.1:4222")
+async fn setup_test_nats_bridge(registry: Arc<AgentRegistry>) -> Arc<NatsBridge> {
+    let config = NatsBrokerConfig {
+        url: "nats://127.0.0.1:4222".to_string(),
+        stream_name: "VAPORA_TASKS_TEST".to_string(),
+        consumer_name: "vapora-a2a-integration-test".to_string(),
+        ..NatsBrokerConfig::default()
+    };
+    Arc::new(
+        NatsBridge::connect(config, registry)
             .await
-        .expect("Failed to connect to NATS")
+            .expect("Failed to connect NatsBridge"),
+    )
 }
 
-/// Test 1: Task persistence - tasks survive restarts
+/// Test 1: Task persistence — tasks survive TaskManager restart
 #[tokio::test]
 #[ignore] // Requires SurrealDB running
 async fn test_task_persistence_after_restart() {
@@ -61,62 +69,65 @@ async fn test_task_persistence_after_restart() {
         metadata: Default::default(),
     };
 
-    // Create task
     task_manager
         .create(task)
         .await
         .expect("Failed to create task");
 
-    // Simulate restart by creating new TaskManager instance
+    // Simulate restart with a new TaskManager instance pointing to same DB
     let task_manager2 = Arc::new(TaskManager::new(db.clone()));
 
-    // Verify task still exists
     let status = task_manager2
         .get("persistence-test-123")
         .await
-        .expect("Failed to get status after restart");
+        .expect("Task not found after restart");
 
     assert_eq!(status.id, "persistence-test-123");
     assert_eq!(status.state, TaskState::Waiting.as_str());
 
-    // Cleanup
     let _ = db
         .query("DELETE FROM a2a_tasks WHERE task_id = $task_id")
         .bind(("task_id", "persistence-test-123"))
         .await;
 }
 
-/// Test 2: NATS task completion updates DB correctly
+/// Test 2: JetStream result updates DB — NatsBridge receives TaskResult and
+/// persists completion to SurrealDB
 #[tokio::test]
 #[ignore] // Requires SurrealDB + NATS running
-async fn test_nats_task_completion_updates_db() {
+async fn test_jetstream_task_completion_updates_db() {
     let db = setup_test_db().await;
-    let nats = setup_test_nats().await;
+    let registry = Arc::new(AgentRegistry::new(10));
+    let nats_bridge = setup_test_nats_bridge(registry.clone()).await;
 
     let task_manager = Arc::new(TaskManager::new(db.clone()));
-    let registry = Arc::new(AgentRegistry::new(10));
     let config = AgentConfig::default();
-    let coordinator = Arc::new(AgentCoordinator::new(config, registry).await.unwrap());
+    let coordinator = Arc::new(
+        AgentCoordinator::new(config, registry)
+            .await
+            .expect("Failed to create coordinator"),
+    );
 
     let bridge = Arc::new(CoordinatorBridge::new(
         coordinator,
         task_manager.clone(),
-        Some(nats.clone()),
+        Some(nats_bridge.clone()),
     ));
 
     bridge
         .start_result_listener()
         .await
-        .expect("Failed to start listener");
+        .expect("Failed to start result listener");
 
-    let task_id = "nats-completion-test-456".to_string();
+    let task_id = "jetstream-completion-test-456".to_string();
 
-    // Create task
     let task = A2aTask {
         id: task_id.clone(),
         message: A2aMessage {
             role: "user".to_string(),
-            parts: vec![A2aMessagePart::Text("Test NATS completion".to_string())],
+            parts: vec![A2aMessagePart::Text(
+                "Test JetStream completion".to_string(),
+            )],
         },
         metadata: Default::default(),
     };
@@ -126,57 +137,57 @@ async fn test_nats_task_completion_updates_db() {
         .await
         .expect("Failed to create task");
 
-    // Publish TaskCompleted message to NATS
-    let task_completed = TaskCompleted {
+    // Publish TaskResult to JetStream via a separate raw client — simulates
+    // agent completing a task and publishing to vapora.tasks.completed.
+    let raw_client = async_nats::connect("127.0.0.1:4222")
+        .await
+        .expect("Failed to connect raw NATS client");
+    let js = async_nats::jetstream::new(raw_client);
+
+    let result = TaskResult {
         task_id: task_id.clone(),
         agent_id: "test-agent".to_string(),
         result: "Test output from agent".to_string(),
-        artifacts: vec!["/path/to/artifact.txt".to_string()],
-        tokens_used: 100,
+        success: true,
         duration_ms: 500,
-        completed_at: chrono::Utc::now(),
     };
 
-    let message = AgentMessage::TaskCompleted(task_completed);
-    nats.publish(
-        "vapora.tasks.completed",
-        serde_json::to_vec(&message).unwrap().into(),
+    js.publish(
+        "vapora.tasks.completed".to_string(),
+        serde_json::to_vec(&result).unwrap().into(),
     )
     .await
-    .expect("Failed to publish");
+    .expect("Failed to publish to JetStream")
+    .await
+    .expect("Failed to receive JetStream ack");
 
-    // Wait for DB update (give NATS subscriber time to process)
-    sleep(Duration::from_millis(1000)).await;
+    // Allow the pull consumer to fetch and process the message
+    sleep(Duration::from_millis(1500)).await;
 
-    // Verify DB updated
     let status = task_manager
         .get(&task_id)
         .await
-        .expect("Failed to get status");
+        .expect("Failed to get task status");
 
     assert_eq!(status.state, TaskState::Completed.as_str());
     assert!(status.result.is_some());
 
-    let result = status.result.unwrap();
-    assert_eq!(result.message.parts.len(), 1);
+    let result_msg = status.result.unwrap();
+    assert_eq!(result_msg.message.parts.len(), 1);
 
-    if let A2aMessagePart::Text(text) = &result.message.parts[0] {
+    if let A2aMessagePart::Text(text) = &result_msg.message.parts[0] {
         assert_eq!(text, "Test output from agent");
     } else {
         panic!("Expected text message part");
     }
 
-    assert!(result.artifacts.is_some());
-    assert_eq!(result.artifacts.as_ref().unwrap().len(), 1);
-
-    // Cleanup
     let _ = db
         .query("DELETE FROM a2a_tasks WHERE task_id = $task_id")
         .bind(("task_id", task_id))
         .await;
 }
 
-/// Test 3: Task state transitions work correctly
+/// Test 3: Task state transitions work correctly (SurrealDB only)
 #[tokio::test]
 #[ignore] // Requires SurrealDB running
 async fn test_task_state_transitions() {
@@ -194,7 +205,6 @@ async fn test_task_state_transitions() {
         metadata: Default::default(),
     };
 
-    // Create task (waiting state)
     task_manager
         .create(task)
         .await
@@ -203,7 +213,6 @@ async fn test_task_state_transitions() {
     let status = task_manager.get(&task_id).await.unwrap();
     assert_eq!(status.state, TaskState::Waiting.as_str());
 
-    // Transition to working
     task_manager
         .update_state(&task_id, TaskState::Working)
         .await
@@ -212,7 +221,6 @@ async fn test_task_state_transitions() {
     let status = task_manager.get(&task_id).await.unwrap();
     assert_eq!(status.state, TaskState::Working.as_str());
 
-    // Complete task
     let result = vapora_a2a::protocol::A2aTaskResult {
         message: A2aMessage {
             role: "assistant".to_string(),
@@ -230,14 +238,13 @@ async fn test_task_state_transitions() {
     assert_eq!(status.state, TaskState::Completed.as_str());
     assert!(status.result.is_some());
 
-    // Cleanup
     let _ = db
         .query("DELETE FROM a2a_tasks WHERE task_id = $task_id")
         .bind(("task_id", task_id))
         .await;
 }
 
-/// Test 4: Task failure handling
+/// Test 4: Task failure handling (SurrealDB only)
 #[tokio::test]
 #[ignore] // Requires SurrealDB running
 async fn test_task_failure_handling() {
@@ -260,7 +267,6 @@ async fn test_task_failure_handling() {
         .await
         .expect("Failed to create task");
 
-    // Fail task
     let error = vapora_a2a::protocol::A2aErrorObj {
         code: -1,
         message: "Test error message".to_string(),
@@ -276,7 +282,6 @@ async fn test_task_failure_handling() {
     assert!(status.error.is_some());
     assert_eq!(status.error.unwrap().message, "Test error message");
 
-    // Cleanup
     let _ = db
         .query("DELETE FROM a2a_tasks WHERE task_id = $task_id")
         .bind(("task_id", task_id))
@@ -288,23 +293,27 @@ async fn test_task_failure_handling() {
 #[ignore] // Requires SurrealDB + NATS + Agent running
 async fn test_end_to_end_task_dispatch() {
     let db = setup_test_db().await;
-    let nats = setup_test_nats().await;
+    let registry = Arc::new(AgentRegistry::new(10));
+    let nats_bridge = setup_test_nats_bridge(registry.clone()).await;
 
     let task_manager = Arc::new(TaskManager::new(db.clone()));
-    let registry = Arc::new(AgentRegistry::new(10));
     let config = AgentConfig::default();
-    let coordinator = Arc::new(AgentCoordinator::new(config, registry).await.unwrap());
+    let coordinator = Arc::new(
+        AgentCoordinator::new(config, registry)
+            .await
+            .expect("Failed to create coordinator"),
+    );
 
     let bridge = Arc::new(CoordinatorBridge::new(
         coordinator,
         task_manager.clone(),
-        Some(nats.clone()),
+        Some(nats_bridge),
     ));
 
     bridge
         .start_result_listener()
         .await
-        .expect("Failed to start listener");
+        .expect("Failed to start result listener");
 
     let task = A2aTask {
         id: "e2e-test-task-001".to_string(),
@@ -317,57 +326,36 @@ async fn test_end_to_end_task_dispatch() {
         metadata: Default::default(),
     };
 
-    // Dispatch task
     let task_id = bridge
         .dispatch(task)
         .await
         .expect("Failed to dispatch task");
 
-    // Poll for completion with timeout
-    let result = timeout(Duration::from_secs(60), async {
-        loop {
-            let status = bridge
-                .get_task(&task_id)
-                .await
-                .expect("Failed to get status");
-
-            match task_state_from_str(&status.state) {
-                TaskState::Completed => return Ok(status),
-                TaskState::Failed => return Err(format!("Task failed: {:?}", status.error)),
-                _ => {
+    assert!(!task_id.is_empty());
+
+    let status_result = timeout(Duration::from_secs(30), async {
+        loop {
+            let status = task_manager.get(&task_id).await.unwrap();
+            if status.state == TaskState::Completed.as_str()
+                || status.state == TaskState::Failed.as_str()
+            {
+                return status;
+            }
             sleep(Duration::from_millis(500)).await;
         }
-            }
-        }
     })
     .await;
 
-    match result {
-        Ok(Ok(status)) => {
-            println!("Task completed successfully: {:?}", status);
-            assert_eq!(status.state, TaskState::Completed.as_str());
-        }
-        Ok(Err(e)) => panic!("Task failed: {}", e),
-        Err(_) => {
-            println!(
-                "Task did not complete within 60 seconds (this is expected if no agent is running)"
+    assert!(
+        status_result.is_ok(),
+        "Task did not complete within 30 seconds"
     );
-            // Cleanup partial task
+    let final_status = status_result.unwrap();
+    assert_eq!(final_status.state, TaskState::Completed.as_str());
 
     let _ = db
         .query("DELETE FROM a2a_tasks WHERE task_id = $task_id")
         .bind(("task_id", task_id))
         .await;
 }
-        }
-    }
-}
-
-// Helper to convert string to TaskState
-fn task_state_from_str(s: &str) -> TaskState {
-    match s {
-        "waiting" => TaskState::Waiting,
-        "working" => TaskState::Working,
-        "completed" => TaskState::Completed,
-        "failed" => TaskState::Failed,
-        _ => TaskState::Waiting,
-    }
-}
@@ -8,6 +8,7 @@ pub mod coordinator;
 pub mod learning_profile;
 pub mod loader;
 pub mod messages;
+pub mod nats_bridge;
 pub mod persistence_trait;
 pub mod profile_adapter;
 pub mod registry;
425	crates/vapora-agents/src/nats_bridge.rs	Normal file
@@ -0,0 +1,425 @@
+use std::sync::Arc;
+
+use async_nats::jetstream::{self, consumer::pull, stream};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use tokio::sync::mpsc;
+use tracing::{error, info, warn};
+
+use crate::registry::AgentRegistry;
+
+/// Configuration for the NATS JetStream broker connection.
+#[derive(Debug, Clone)]
+pub struct NatsBrokerConfig {
+    /// NATS server URL (e.g. `nats://localhost:4222`).
+    pub url: String,
+    /// JetStream stream name; created if it does not exist.
+    pub stream_name: String,
+    /// Subject for dispatching tasks into the stream.
+    pub dispatch_subject: String,
+    /// Subject consumed for task results (maps to `vapora.tasks.completed`).
+    pub result_subject: String,
+    /// Durable consumer name for result subscriptions.
+    pub consumer_name: String,
+}
+
+impl Default for NatsBrokerConfig {
+    fn default() -> Self {
+        Self {
+            url: "nats://localhost:4222".to_string(),
+            stream_name: "VAPORA_TASKS".to_string(),
+            dispatch_subject: "vapora.tasks.dispatch".to_string(),
+            result_subject: crate::messages::subjects::TASKS_COMPLETED.to_string(),
+            consumer_name: "vapora-nats-bridge".to_string(),
+        }
+    }
+}
+
+/// Task message dispatched to JetStream for agent processing.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TaskMessage {
+    pub task_id: String,
+    pub title: String,
+    pub description: String,
+    pub role: String,
+    pub context: String,
+    pub priority: u32,
+}
+
+/// Task result received from the `vapora.tasks.completed` JetStream subject.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TaskResult {
+    pub task_id: String,
+    pub agent_id: String,
+    pub result: String,
+    pub success: bool,
+    pub duration_ms: u64,
+}
+
+/// Agent status snapshot from the registry — never hardcoded.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AgentStatusInfo {
+    pub agent_id: String,
+    pub role: String,
+    pub available: bool,
+    pub current_tasks: usize,
+    pub max_tasks: usize,
+}
+
+#[derive(Debug, Error)]
+pub enum NatsBridgeError {
+    #[error("NATS connection failed: {0}")]
+    Connection(String),
+
+    #[error("JetStream stream error: {0}")]
+    Stream(String),
+
+    #[error("JetStream consumer error: {0}")]
+    Consumer(String),
+
+    #[error("Publish ack error: {0}")]
+    Publish(String),
+
+    #[error("Serialization error: {0}")]
+    Serialization(#[from] serde_json::Error),
+}
+
+/// JetStream-backed bridge for durable task dispatch and result consumption.
+///
+/// Maintains a `VAPORA_TASKS` stream covering `vapora.tasks.dispatch` and
+/// `vapora.tasks.completed`. Tasks are published with full JetStream acks;
+/// results are consumed via a durable pull consumer so no messages are lost on
+/// process restart.
+pub struct NatsBridge {
+    jetstream: jetstream::Context,
+    config: NatsBrokerConfig,
+    registry: Arc<AgentRegistry>,
+}
+
+impl NatsBridge {
+    /// Connect to NATS and ensure the JetStream stream exists.
+    ///
+    /// The stream uses `WorkQueue` retention so each message is delivered to
+    /// exactly one consumer and removed after ack.
+    pub async fn connect(
+        config: NatsBrokerConfig,
+        registry: Arc<AgentRegistry>,
+    ) -> Result<Self, NatsBridgeError> {
+        let client = async_nats::connect(&config.url)
+            .await
+            .map_err(|e| NatsBridgeError::Connection(e.to_string()))?;
+
+        let jetstream = jetstream::new(client);
+
+        let stream_config = stream::Config {
+            name: config.stream_name.clone(),
+            subjects: vec![
+                config.dispatch_subject.clone(),
+                config.result_subject.clone(),
+            ],
+            retention: stream::RetentionPolicy::WorkQueue,
+            storage: stream::StorageType::File,
+            num_replicas: 1,
+            ..Default::default()
+        };
+
+        jetstream
+            .get_or_create_stream(stream_config)
+            .await
+            .map_err(|e| NatsBridgeError::Stream(e.to_string()))?;
+
+        info!(
+            url = %config.url,
+            stream = %config.stream_name,
+            dispatch = %config.dispatch_subject,
+            result = %config.result_subject,
+            "NatsBridge connected; JetStream stream ensured"
+        );
+
+        Ok(Self {
+            jetstream,
+            config,
+            registry,
+        })
+    }
+
+    /// Publish a task to JetStream with a synchronous ack.
+    ///
+    /// Returns the stream sequence number assigned to this message. The ack
+    /// guarantees the message reached the NATS server and was persisted in the
+    /// stream before returning.
+    pub async fn submit_task(&self, task: TaskMessage) -> Result<u64, NatsBridgeError> {
+        let payload = serde_json::to_vec(&task)?;
+
+        let ack = self
+            .jetstream
+            .publish(self.config.dispatch_subject.clone(), payload.into())
+            .await
+            .map_err(|e| NatsBridgeError::Publish(e.to_string()))?
+            .await
+            .map_err(|e| NatsBridgeError::Publish(e.to_string()))?;
+
+        info!(
+            task_id = %task.task_id,
+            sequence = ack.sequence,
+            subject = %self.config.dispatch_subject,
+            "Task dispatched to JetStream"
+        );
+
+        Ok(ack.sequence)
+    }
+
+    /// Start a background pull consumer for `vapora.tasks.completed`.
+    ///
+    /// Spawns a tokio task that continuously fetches from the durable consumer
+    /// and sends deserialized `TaskResult` values into the returned channel.
+    /// Each message is ack-ed after successful deserialization; failed
+    /// deserialization sends a Nak so the message is redelivered.
+    ///
+    /// The consumer is durable — if the process restarts it will resume from
+    /// the last unacked position.
+    pub async fn subscribe_task_results(
+        &self,
+    ) -> Result<mpsc::Receiver<TaskResult>, NatsBridgeError> {
+        let stream = self
+            .jetstream
+            .get_stream(&self.config.stream_name)
+            .await
+            .map_err(|e| NatsBridgeError::Stream(e.to_string()))?;
+
+        let consumer: async_nats::jetstream::consumer::Consumer<pull::Config> = stream
+            .get_or_create_consumer(
+                &self.config.consumer_name,
+                pull::Config {
+                    durable_name: Some(self.config.consumer_name.clone()),
+                    filter_subject: self.config.result_subject.clone(),
+                    ..Default::default()
+                },
+            )
+            .await
+            .map_err(|e| NatsBridgeError::Consumer(e.to_string()))?;
+
+        let (tx, rx) = mpsc::channel::<TaskResult>(256);
+
+        tokio::spawn(run_result_consumer(consumer, tx));
+
+        info!(
+            consumer = %self.config.consumer_name,
+            subject = %self.config.result_subject,
+            "Task result pull consumer started"
+        );
+
+        Ok(rx)
+    }
+
+    /// List all registered agents from the registry.
+    ///
+    /// Never returns hardcoded data — always reflects the live registry state.
+    pub fn list_agents(&self) -> Vec<AgentStatusInfo> {
+        self.registry
+            .list_all()
+            .into_iter()
+            .map(|agent| {
+                let available = agent.can_accept_task();
+                AgentStatusInfo {
+                    agent_id: agent.id,
+                    role: agent.role,
+                    available,
+                    current_tasks: agent.current_tasks as usize,
+                    max_tasks: agent.max_concurrent_tasks as usize,
+                }
+            })
+            .collect()
+    }
+}
+
+/// Pull consumer loop: fetches JetStream messages, deserializes, acks, and
+/// forwards `TaskResult` values to `tx`. Extracted to avoid excessive nesting
+/// inside the `tokio::spawn` closure.
+async fn run_result_consumer(
+    consumer: async_nats::jetstream::consumer::Consumer<pull::Config>,
+    tx: mpsc::Sender<TaskResult>,
+) {
+    let mut messages = match consumer.messages().await {
+        Ok(m) => m,
+        Err(e) => {
+            error!(error = %e, "Failed to create JetStream message stream");
+            return;
+        }
+    };
+
+    while let Some(msg_result) = futures::StreamExt::next(&mut messages).await {
+        let msg = match msg_result {
+            Ok(m) => m,
+            Err(e) => {
+                error!(error = %e, "JetStream message error; stopping consumer");
+                break;
+            }
+        };
+
+        match serde_json::from_slice::<TaskResult>(&msg.payload) {
+            Ok(result) => {
+                if let Err(e) = msg.ack().await {
+                    error!(error = %e, "Failed to ack JetStream message");
+                    continue;
+                }
+                if tx.send(result).await.is_err() {
+                    warn!("Result receiver dropped; stopping consumer loop");
+                    break;
+                }
+            }
+            Err(e) => {
+                warn!(error = %e, "Failed to deserialize TaskResult; sending Nak");
+                if let Err(nak_err) = msg
+                    .ack_with(async_nats::jetstream::AckKind::Nak(None))
+                    .await
+                {
+                    error!(error = %nak_err, "Failed to Nak message");
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::registry::{AgentMetadata, AgentRegistry};
+
+    fn make_registry_with_agent() -> Arc<AgentRegistry> {
+        let reg = Arc::new(AgentRegistry::new(5));
+        let agent = AgentMetadata::new(
+            "developer".to_string(),
+            "Dev 1".to_string(),
+            "claude".to_string(),
+            "claude-sonnet-4-6".to_string(),
+            vec!["coding".to_string()],
+        );
+        reg.register_agent(agent).unwrap();
+        reg
+    }
+
+    #[test]
+    fn list_agents_reflects_registry() {
+        let registry = make_registry_with_agent();
+        // NatsBridge::list_agents requires a connected bridge, so test via a
+        // registry snapshot directly to verify the mapping logic.
+        let agents: Vec<AgentStatusInfo> = registry
+            .list_all()
+            .into_iter()
+            .map(|agent| {
+                let available = agent.can_accept_task();
+                AgentStatusInfo {
+                    agent_id: agent.id,
+                    role: agent.role,
+                    available,
+                    current_tasks: agent.current_tasks as usize,
+                    max_tasks: agent.max_concurrent_tasks as usize,
+                }
+            })
+            .collect();
+
+        assert_eq!(agents.len(), 1);
+        assert_eq!(agents[0].role, "developer");
+        assert!(agents[0].available);
+        assert_eq!(agents[0].current_tasks, 0);
+        assert_eq!(agents[0].max_tasks, 5);
+    }
+
+    #[test]
+    fn task_message_roundtrip_serialization() {
+        let msg = TaskMessage {
+            task_id: "t-1".to_string(),
+            title: "Fix bug".to_string(),
+            description: "Critical regression in auth".to_string(),
+            role: "developer".to_string(),
+            context: "{}".to_string(),
+            priority: 90,
+        };
+        let bytes = serde_json::to_vec(&msg).unwrap();
+        let decoded: TaskMessage = serde_json::from_slice(&bytes).unwrap();
+        assert_eq!(decoded.task_id, msg.task_id);
+        assert_eq!(decoded.priority, msg.priority);
+    }
+
+    #[test]
+    fn task_result_roundtrip_serialization() {
+        let result = TaskResult {
+            task_id: "t-1".to_string(),
+            agent_id: "agent-dev-1".to_string(),
+            result: "Bug fixed in auth.rs:42".to_string(),
+            success: true,
+            duration_ms: 4200,
+        };
+        let bytes = serde_json::to_vec(&result).unwrap();
+        let decoded: TaskResult = serde_json::from_slice(&bytes).unwrap();
+        assert!(decoded.success);
+        assert_eq!(decoded.duration_ms, 4200);
+    }
+
+    /// Integration test: requires NATS server at nats://localhost:4222.
+    /// Run with: cargo test -p vapora-agents nats_bridge -- --ignored
+    #[tokio::test]
+    #[ignore]
+    async fn submit_task_receives_ack() {
+        let registry = make_registry_with_agent();
+        let config = NatsBrokerConfig::default();
|
||||||
|
|
||||||
|
let bridge = NatsBridge::connect(config, registry)
|
||||||
|
.await
|
||||||
|
.expect("NATS must be running at localhost:4222");
|
||||||
|
|
||||||
|
let task = TaskMessage {
|
||||||
|
task_id: "integration-t-1".to_string(),
|
||||||
|
title: "Integration test task".to_string(),
|
||||||
|
description: "Verify JetStream publish ack".to_string(),
|
||||||
|
role: "developer".to_string(),
|
||||||
|
context: "{}".to_string(),
|
||||||
|
priority: 50,
|
||||||
|
};
|
||||||
|
|
||||||
|
let sequence = bridge.submit_task(task).await.unwrap();
|
||||||
|
assert!(sequence > 0, "JetStream ack sequence must be > 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Integration test: requires NATS server at nats://localhost:4222.
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore]
|
||||||
|
async fn subscribe_task_results_receives_published_result() {
|
||||||
|
use tokio::time::{timeout, Duration};
|
||||||
|
|
||||||
|
let registry = make_registry_with_agent();
|
||||||
|
let config = NatsBrokerConfig::default();
|
||||||
|
|
||||||
|
let bridge = NatsBridge::connect(config.clone(), registry)
|
||||||
|
.await
|
||||||
|
.expect("NATS must be running at localhost:4222");
|
||||||
|
|
||||||
|
let mut rx = bridge.subscribe_task_results().await.unwrap();
|
||||||
|
|
||||||
|
// Publish a result directly to the result subject via JetStream
|
||||||
|
let result = TaskResult {
|
||||||
|
task_id: "integration-t-2".to_string(),
|
||||||
|
agent_id: "agent-dev-1".to_string(),
|
||||||
|
result: "Done".to_string(),
|
||||||
|
success: true,
|
||||||
|
duration_ms: 100,
|
||||||
|
};
|
||||||
|
let payload = serde_json::to_vec(&result).unwrap();
|
||||||
|
bridge
|
||||||
|
.jetstream
|
||||||
|
.publish(config.result_subject.clone(), payload.into())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let received = timeout(Duration::from_secs(5), rx.recv())
|
||||||
|
.await
|
||||||
|
.expect("Timeout waiting for result")
|
||||||
|
.expect("Channel closed");
|
||||||
|
|
||||||
|
assert_eq!(received.task_id, "integration-t-2");
|
||||||
|
assert!(received.success);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@@ -1,9 +1,7 @@
-// Adapter implementing SwarmCoordination trait using real SwarmCoordinator
-// Decouples agent orchestration from swarm details
-
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use uuid::Uuid;
 use vapora_swarm::coordinator::SwarmCoordinator;
 
 use crate::coordination::{AgentAssignment, AgentLoad, AgentProfile, SwarmCoordination};
@@ -23,7 +21,6 @@ impl SwarmCoordinationAdapter {
 #[async_trait]
 impl SwarmCoordination for SwarmCoordinationAdapter {
     async fn register_profiles(&self, profiles: Vec<AgentProfile>) -> anyhow::Result<()> {
-        // Convert internal AgentProfile to swarm's AgentProfile
         for profile in profiles {
             let swarm_profile = vapora_swarm::messages::AgentProfile {
                 id: profile.id.clone(),
@@ -38,37 +35,79 @@ impl SwarmCoordination for SwarmCoordinationAdapter {
         Ok(())
     }
 
+    /// Select best agent via swarm bidding.
+    ///
+    /// Uses `submit_task_for_bidding` which applies load-balanced scoring
+    /// (success_rate / (1 + current_load)) across all available agents with
+    /// matching capabilities.
     async fn select_agent(
         &self,
-        _task_type: &str,
-        _required_expertise: Option<&str>,
+        task_type: &str,
+        required_expertise: Option<&str>,
     ) -> anyhow::Result<AgentAssignment> {
-        // For now, return a placeholder - real swarm selection would happen here
-        // This is a simplified version - full implementation would query
-        // swarm.submit_task_for_bidding()
+        let capabilities: Vec<String> = match required_expertise {
+            Some(exp) => vec![task_type.to_string(), exp.to_string()],
+            None => vec![task_type.to_string()],
+        };
+
+        // Use an ephemeral task_id for selection; the caller manages actual task IDs.
+        let selection_id = Uuid::new_v4().to_string();
+
+        let agent_id = self
+            .swarm
+            .submit_task_for_bidding(selection_id, task_type.to_string(), capabilities)
+            .await
+            .map_err(|e| anyhow::anyhow!("Swarm bidding failed: {}", e))?
+            .ok_or_else(|| anyhow::anyhow!("No available agent for task_type: {}", task_type))?;
+
+        let confidence = self
+            .swarm
+            .get_agent(&agent_id)
+            .map(|profile| profile.success_rate)
+            .unwrap_or(0.5);
+
         Ok(AgentAssignment {
-            agent_id: "default-agent".to_string(),
-            agent_name: "Default Agent".to_string(),
-            confidence: 0.5,
+            // Swarm profiles use ID as display name (no separate name field)
+            agent_name: agent_id.clone(),
+            agent_id,
+            confidence,
         })
     }
 
+    /// Report task completion and update agent load in the swarm.
+    ///
+    /// On success the agent is marked available with minimal load.
+    /// On failure the agent receives a penalty load (0.5) to deprioritize it
+    /// in future selections until it recovers.
     async fn report_completion(
         &self,
-        _agent_id: &str,
-        _success: bool,
+        agent_id: &str,
+        success: bool,
         _duration_ms: u64,
     ) -> anyhow::Result<()> {
-        // Report task completion to swarm for load balancing updates
-        Ok(())
+        let new_load = if success { 0.0 } else { 0.5 };
+        self.swarm
+            .update_agent_status(agent_id, new_load, true)
+            .map_err(|e| anyhow::anyhow!("Failed to update agent status: {}", e))
     }
 
-    async fn agent_load(&self, _agent_id: &str) -> anyhow::Result<AgentLoad> {
-        // Query agent load from swarm
+    /// Query current agent load from swarm profile.
+    ///
+    /// Infers `current_tasks` from the fractional load stored in the swarm
+    /// profile (each task represents ~10% of a capacity-10 agent).
+    async fn agent_load(&self, agent_id: &str) -> anyhow::Result<AgentLoad> {
+        let profile = self
+            .swarm
+            .get_agent(agent_id)
+            .ok_or_else(|| anyhow::anyhow!("Agent not found in swarm: {}", agent_id))?;
+
+        const CAPACITY: usize = 10;
+        let current_tasks = (profile.current_load * CAPACITY as f64).round() as usize;
+
         Ok(AgentLoad {
-            agent_id: _agent_id.to_string(),
-            current_tasks: 0,
-            capacity: 10,
+            agent_id: agent_id.to_string(),
+            current_tasks,
+            capacity: CAPACITY,
        })
     }
 }
@@ -11,6 +11,7 @@ keywords = ["leptos", "ui", "components", "glassmorphism", "wasm"]
 
 [lib]
 crate-type = ["cdylib", "rlib"]
+doctest = false
 
 [features]
 default = []
@@ -331,6 +331,11 @@ impl SwarmCoordinator {
             .get(task_id)
             .map(|entry| entry.value().clone())
     }
+
+    /// Get agent profile by ID (for load/status queries).
+    pub fn get_agent(&self, agent_id: &str) -> Option<AgentProfile> {
+        self.agents.get(agent_id).map(|entry| entry.value().clone())
+    }
 }
 
 /// Swarm statistics