ADR-021: Real-Time WebSocket Updates via Broadcast
Status: Accepted | Implemented
Date: 2024-11-01
Deciders: Frontend Architecture Team
Technical Story: Enabling real-time workflow progress updates to multiple clients
Decision
Implement real-time WebSocket updates using tokio::sync::broadcast for pub/sub of workflow progress.
Rationale
- Real-Time UX: users see changes immediately (no polling)
- Broadcast Efficiency: a broadcast channel allows fan-out to multiple clients (illustrated below)
- No State Tracking: no per-client state to maintain; the channel handles distribution
- Async-Native: tokio::sync is integrated with the Tokio runtime
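To make the fan-out property concrete, here is a minimal, self-contained sketch (illustrative, not project code): every subscriber created before a send receives its own clone of the message.
// Minimal fan-out demo with tokio::sync::broadcast
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Buffer holds up to 16 messages per lagging subscriber
    let (tx, _) = broadcast::channel::<String>(16);

    // Two independent subscribers; each receives every message
    let mut rx_a = tx.subscribe();
    let mut rx_b = tx.subscribe();

    tx.send("workflow-42: step 1/3".to_string()).unwrap();

    assert_eq!(rx_a.recv().await.unwrap(), "workflow-42: step 1/3");
    assert_eq!(rx_b.recv().await.unwrap(), "workflow-42: step 1/3");
}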
Alternatives Considered
❌ HTTP Long-Polling
- Pros: Simple, no WebSocket complexity
- Cons: High latency, resource-intensive
❌ Server-Sent Events (SSE)
- Pros: HTTP-based, simpler than WebSocket
- Cons: Unidirectional only (server→client)
✅ WebSocket + Broadcast (CHOSEN)
- Bidirectional, low latency, efficient fan-out
Trade-offs
Pros:
- ✅ Real-time updates (sub-100ms latency)
- ✅ Efficient broadcast (no per-client loops)
- ✅ Bidirectional communication
- ✅ Lower bandwidth than polling
Cons:
- ⚠️ Connection state management complex
- ⚠️ Harder to scale beyond single server
- ⚠️ Client reconnection handling needed
Implementation
Broadcast Channel Setup:
// crates/vapora-backend/src/main.rs
use tokio::sync::broadcast;
// Create broadcast channel (buffer size = 100 messages)
let (tx, _rx) = broadcast::channel(100);
// Share broadcaster in app state
let app_state = AppState::new(/* ... */)
.with_broadcast_tx(tx.clone());
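For context, a minimal sketch of how AppState could carry the sender (illustrative; the real struct has more fields, and with_broadcast_tx is assumed here to be a simple builder method):
// Sketch only: the actual AppState lives elsewhere in vapora-backend
use tokio::sync::broadcast;

#[derive(Clone)]
pub struct AppState {
    // Cloneable handle; each WebSocket connection calls .subscribe() on it
    pub broadcast_tx: broadcast::Sender<WorkflowUpdate>,
    // ... other fields (DB handle, config, etc.)
}

impl AppState {
    pub fn with_broadcast_tx(mut self, tx: broadcast::Sender<WorkflowUpdate>) -> Self {
        self.broadcast_tx = tx;
        self
    }
}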
Workflow Progress Event:
// crates/vapora-backend/src/workflow.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use surrealdb::{engine::remote::ws::Ws, Surreal};
use tokio::sync::broadcast;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowUpdate {
    pub workflow_id: String,
    pub status: WorkflowStatus,
    pub current_step: u32,
    pub total_steps: u32,
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

pub async fn update_workflow_status(
    db: &Surreal<Ws>,
    tx: &broadcast::Sender<WorkflowUpdate>,
    workflow_id: &str,
    status: WorkflowStatus,
) -> Result<()> {
    // Update database (SurrealDB uses named parameters, not positional ones)
    db.query("UPDATE workflows SET status = $status WHERE id = $id")
        .bind(("status", status.clone()))
        .bind(("id", workflow_id.to_string()))
        .await?;
    // Broadcast update to all subscribers
    let message = format!("Workflow status changed to {:?}", status);
    let update = WorkflowUpdate {
        workflow_id: workflow_id.to_string(),
        status,
        current_step: 0, // Fetch from DB if needed
        total_steps: 0,
        message,
        timestamp: Utc::now(),
    };
    // send() errors only when there are no active subscribers; that is fine here
    let _ = tx.send(update);
    Ok(())
}
WebSocket Handler:
// crates/vapora-backend/src/api/websocket.rs
use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        Path, State,
    },
    response::IntoResponse,
};
use futures::{sink::SinkExt, stream::StreamExt};

pub async fn websocket_handler(
    ws: WebSocketUpgrade,
    State(app_state): State<AppState>,
    Path(workflow_id): Path<String>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, app_state, workflow_id))
}

async fn handle_socket(
    socket: WebSocket,
    app_state: AppState,
    workflow_id: String,
) {
    let (mut sender, mut receiver) = socket.split();
    // Subscribe to workflow updates
    let mut rx = app_state.broadcast_tx.subscribe();
    // Task 1: Forward broadcast updates to the WebSocket client
    let workflow_id_clone = workflow_id.clone();
    let mut send_task = tokio::spawn(async move {
        while let Ok(update) = rx.recv().await {
            // Filter: only send updates for this workflow
            if update.workflow_id == workflow_id_clone {
                if let Ok(msg) = serde_json::to_string(&update) {
                    if sender.send(Message::Text(msg)).await.is_err() {
                        break; // Client disconnected
                    }
                }
            }
        }
    });
    // Task 2: Listen for client messages. Ping frames are answered with Pong
    // automatically by the underlying WebSocket implementation, so we only
    // need to detect Close.
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            if let Message::Close(_) = msg {
                break;
            }
        }
    });
    // Wait for either task to finish (client disconnect or broadcast end),
    // then abort the other so the connection cleans up fully
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }
}
Frontend Integration (Leptos):
// crates/vapora-frontend/src/api/websocket.rs
use leptos::*;

#[component]
pub fn WorkflowProgressMonitor(workflow_id: String) -> impl IntoView {
    let (progress, set_progress) = create_signal::<Option<WorkflowUpdate>>(None);
    create_effect(move |_| {
        let workflow_id = workflow_id.clone();
        spawn_local(async move {
            // create_websocket_connection is this crate's own wrapper
            // around the browser WebSocket API
            match create_websocket_connection(&format!(
                "ws://localhost:8001/api/workflows/{}/updates",
                workflow_id
            )) {
                Ok(ws) => {
                    loop {
                        match ws.recv().await {
                            Ok(msg) => {
                                if let Ok(update) = serde_json::from_str::<WorkflowUpdate>(&msg) {
                                    set_progress.set(Some(update));
                                }
                            }
                            Err(_) => break, // Connection closed
                        }
                    }
                }
                Err(e) => logging::error!("WebSocket error: {:?}", e),
            }
        });
    });
    view! {
        <div class="workflow-progress">
            {move || {
                progress.get().map(|update| {
                    view! {
                        <div class="progress-item">
                            <p>{update.message.clone()}</p>
                            <progress
                                value={update.current_step}
                                max={update.total_steps}
                            />
                        </div>
                    }
                })
            }}
        </div>
    }
}
Connection Management:
// Reconnection with exponential backoff. connect_websocket is this
// crate's own connect helper.
use std::time::Duration;

pub async fn connection_with_reconnect(
    ws_url: &str,
    max_retries: u32,
) -> Result<WebSocket> {
    let mut retries = 0;
    loop {
        match connect_websocket(ws_url).await {
            Ok(ws) => return Ok(ws),
            Err(_) if retries < max_retries => {
                retries += 1;
                // Exponential backoff: 200ms, 400ms, 800ms, ...
                let backoff_ms = 100 * 2_u64.pow(retries);
                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
            }
            Err(e) => return Err(e),
        }
    }
}
Key Files:
- /crates/vapora-backend/src/api/websocket.rs (WebSocket handler)
- /crates/vapora-backend/src/workflow.rs (broadcast events)
- /crates/vapora-frontend/src/api/websocket.rs (Leptos client)
Verification
# Test broadcast channel basic functionality
cargo test -p vapora-backend test_broadcast_basic
# Test multiple subscribers
cargo test -p vapora-backend test_broadcast_multiple_subscribers
# Test filtering (only send relevant updates)
cargo test -p vapora-backend test_broadcast_filtering
# Integration: full WebSocket lifecycle
cargo test -p vapora-backend test_websocket_full_lifecycle
# Connection stability test
cargo test -p vapora-backend test_websocket_disconnection_handling
# Load test: multiple concurrent connections
cargo test -p vapora-backend test_websocket_concurrent_connections
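For reference, the first two tests could look like this minimal sketch (test names match the suite above; the bodies are illustrative assumptions):
#[tokio::test]
async fn test_broadcast_basic() {
    let (tx, mut rx) = tokio::sync::broadcast::channel::<String>(8);
    tx.send("hello".to_string()).unwrap();
    // The subscriber receives exactly what was sent
    assert_eq!(rx.recv().await.unwrap(), "hello");
}

#[tokio::test]
async fn test_broadcast_multiple_subscribers() {
    let (tx, mut rx1) = tokio::sync::broadcast::channel::<u32>(8);
    let mut rx2 = tx.subscribe();
    tx.send(42).unwrap();
    // Both subscribers receive their own copy of the message
    assert_eq!(rx1.recv().await.unwrap(), 42);
    assert_eq!(rx2.recv().await.unwrap(), 42);
}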
Expected Output:
- Updates broadcast to all subscribers
- Only relevant workflow updates sent per subscription
- Client disconnections handled gracefully
- Reconnection with backoff works
- Latency < 100ms
- Scales to 100+ concurrent connections
Consequences
Scalability
- Single server: broadcast works well
- Multiple servers: need a message broker (Redis, NATS); see the sketch after this list
- Load balancer: sticky sessions or server-wide broadcast
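If the deployment ever spans multiple servers, one possible shape for the broker bridge, sketched with the async-nats crate (subject name, function signatures, and wiring are assumptions, not current project code):
// Hypothetical multi-server bridge: each server publishes its local updates
// to NATS and feeds remote updates into its own broadcast channel. A real
// implementation would tag message origin to avoid re-publishing updates
// it received from the bus.
use futures::StreamExt;
use tokio::sync::broadcast;

type AnyError = Box<dyn std::error::Error + Send + Sync>;

// Publish side: forward locally produced updates to the cluster
pub async fn publish_updates(
    nats: async_nats::Client,
    mut local_rx: broadcast::Receiver<WorkflowUpdate>,
) -> Result<(), AnyError> {
    while let Ok(update) = local_rx.recv().await {
        let payload = serde_json::to_vec(&update)?;
        nats.publish("workflow.updates", payload.into()).await?;
    }
    Ok(())
}

// Subscribe side: fan remote updates out to this server's WebSocket clients
pub async fn receive_updates(
    nats: async_nats::Client,
    local_tx: broadcast::Sender<WorkflowUpdate>,
) -> Result<(), AnyError> {
    let mut sub = nats.subscribe("workflow.updates").await?;
    while let Some(msg) = sub.next().await {
        if let Ok(update) = serde_json::from_slice::<WorkflowUpdate>(&msg.payload) {
            let _ = local_tx.send(update);
        }
    }
    Ok(())
}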
Connection Management
- Automatic cleanup on client disconnect
- Backpressure handling (messages are dropped when the channel buffer fills); see the sketch after this list
- Per-connection state minimal
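The backpressure point deserves a concrete note: a subscriber that falls more than the buffer size behind gets RecvError::Lagged instead of blocking the sender. A sketch of tolerating lag in the forwarding task (illustrative; assumes tracing is used for logging):
use tokio::sync::broadcast::{self, error::RecvError};

async fn forward_with_lag_tolerance(mut rx: broadcast::Receiver<WorkflowUpdate>) {
    loop {
        match rx.recv().await {
            Ok(_update) => {
                // Filter and forward to the client as in the handler above
            }
            // Subscriber fell behind; `skipped` messages were discarded.
            // Log and resume from the oldest message still buffered.
            Err(RecvError::Lagged(skipped)) => {
                tracing::warn!("subscriber lagged, skipped {skipped} updates");
            }
            // All senders dropped: no more updates will arrive
            Err(RecvError::Closed) => break,
        }
    }
}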
Frontend
- Real-time UX without polling
- Automatic disconnection handling
- Graceful degradation if WebSocket unavailable
Monitoring
- Track concurrent WebSocket connections (see the gauge sketch after this list)
- Monitor broadcast channel depth
- Alert on high message loss
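One simple way to get the connection count (a sketch, assuming a shared atomic counter in AppState rather than a full metrics stack):
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Shared gauge, e.g. stored in AppState and read by a /metrics endpoint
#[derive(Clone, Default)]
pub struct ConnectionGauge(Arc<AtomicUsize>);

impl ConnectionGauge {
    // Call when a socket is accepted
    pub fn connected(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    // Call after the select! in handle_socket returns
    pub fn disconnected(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
    // Current number of open WebSocket connections
    pub fn current(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}
Calling connected() at the top of handle_socket and disconnected() after the select! returns keeps the gauge accurate across disconnects.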
References
- Tokio broadcast documentation: https://docs.rs/tokio/latest/tokio/sync/broadcast/
- /crates/vapora-backend/src/api/websocket.rs (implementation)
- /crates/vapora-frontend/src/api/websocket.rs (client integration)
Related ADRs: ADR-003 (Leptos Frontend), ADR-002 (Axum Backend)