WebSocket API Reference

This document describes the WebSocket API that the provisioning orchestrator exposes for real-time monitoring, event streaming, and live updates.

Overview

The WebSocket API enables real-time communication between clients and the provisioning orchestrator, providing:

  • Live workflow progress updates
  • System health monitoring
  • Event streaming
  • Real-time metrics
  • Interactive debugging sessions

WebSocket Endpoints

Primary WebSocket Endpoint

ws://localhost:9090/ws

The main WebSocket endpoint for real-time events and monitoring.

Connection Parameters:

  • token: JWT authentication token (required)
  • events: Comma-separated list of event types to subscribe to (optional)
  • batch_size: Maximum number of events per message (default: 10)
  • compression: Enable message compression (default: false)

Example Connection:

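// The events parameter lists event_type values, which is what the server-side filter compares against
const ws = new WebSocket('ws://localhost:9090/ws?token=jwt-token&events=TaskStatusChanged,BatchOperationUpdate,SystemHealthUpdate');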
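
Because every connection parameter travels in the query string, it can help to build the URL programmatically so that values are percent-encoded. The following is a minimal sketch under that assumption: `buildWsUrl` and its option names (`events`, `batchSize`, `compression`) are hypothetical helpers, not part of the API; only the query parameter names come from the list above.

```javascript
// Hypothetical helper: builds a connection URL from the documented parameters.
function buildWsUrl(baseUrl, token, options = {}) {
  const params = new URLSearchParams();
  params.set('token', token); // required by the endpoint

  if (Array.isArray(options.events) && options.events.length > 0) {
    params.set('events', options.events.join(',')); // comma-separated list
  }
  if (options.batchSize !== undefined) {
    params.set('batch_size', String(options.batchSize));
  }
  if (options.compression !== undefined) {
    params.set('compression', String(options.compression));
  }

  // URLSearchParams percent-encodes each value when serialized
  return `${baseUrl}/ws?${params.toString()}`;
}
```

A call such as `new WebSocket(buildWsUrl('ws://localhost:9090', jwtToken, { events: ['TaskStatusChanged'], batchSize: 20 }))` would then open the connection with only the parameters that were explicitly set.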

Specialized WebSocket Endpoints

ws://localhost:9090/metrics

Real-time metrics streaming endpoint.

Features:

  • Live system metrics
  • Performance data
  • Resource utilization
  • Custom metric streams

ws://localhost:9090/logs

Live log streaming endpoint.

Features:

  • Real-time log tailing
  • Log level filtering
  • Component-specific logs
  • Search and filtering

Authentication

JWT Token Authentication

All WebSocket connections require authentication via JWT token:

// Include token in connection URL
const ws = new WebSocket('ws://localhost:9090/ws?token=' + jwtToken);

// An 'auth' message can also be sent on an open connection
// (the server-side handler shown later treats it as re-authentication)
ws.onopen = function() {
  ws.send(JSON.stringify({
    type: 'auth',
    token: jwtToken
  }));
};

Connection Authentication Flow

  1. Initial Connection: Client connects with token parameter
  2. Token Validation: Server validates JWT token
  3. Authorization: Server checks token permissions
  4. Subscription: Client subscribes to event types
  5. Event Stream: Server begins streaming events
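
Since the server validates the token at step 2, a client can avoid a rejected connection by checking the token's expiry before connecting. The sketch below assumes the token is a standard three-segment JWT carrying the registered `exp` claim (seconds since the Unix epoch) and an ASCII payload; `decodeJwtPayload` and `isTokenExpired` are hypothetical helper names. It does not verify the signature, which only the server can do.

```javascript
// Decode the payload segment of a JWT (base64url-encoded JSON).
// Assumes an ASCII payload; the signature is NOT verified here.
function decodeJwtPayload(token) {
  const parts = token.split('.');
  if (parts.length !== 3) {
    throw new Error('Not a JWT: expected three dot-separated segments');
  }
  // Convert base64url to standard base64 and restore padding
  const base64 = parts[1].replace(/-/g, '+').replace(/_/g, '/');
  const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4);
  return JSON.parse(atob(padded));
}

// Returns true when the token is expired or within skewSeconds of expiring.
function isTokenExpired(token, nowMs = Date.now(), skewSeconds = 30) {
  const { exp } = decodeJwtPayload(token);
  if (typeof exp !== 'number') {
    return false; // no expiry claim present
  }
  return nowMs / 1000 >= exp - skewSeconds;
}
```

A client could call `isTokenExpired(jwtToken)` before step 1 and refresh the token first when it returns `true`.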

Event Types and Schemas

Core Event Types

Task Status Changed

Fired when a workflow task status changes.

{
  "event_type": "TaskStatusChanged",
  "timestamp": "2025-09-26T10:00:00Z",
  "data": {
    "task_id": "uuid-string",
    "name": "create_servers",
    "status": "Running",
    "previous_status": "Pending",
    "progress": 45.5
  },
  "metadata": {
    "task_id": "uuid-string",
    "workflow_type": "server_creation",
    "infra": "production"
  }
}

Batch Operation Update

Fired when batch operation status changes.

{
  "event_type": "BatchOperationUpdate",
  "timestamp": "2025-09-26T10:00:00Z",
  "data": {
    "batch_id": "uuid-string",
    "name": "multi_cloud_deployment",
    "status": "Running",
    "progress": 65.0,
    "operations": [
      {
        "id": "upcloud_servers",
        "status": "Completed",
        "progress": 100.0
      },
      {
        "id": "aws_taskservs",
        "status": "Running",
        "progress": 30.0
      }
    ]
  },
  "metadata": {
    "total_operations": 5,
    "completed_operations": 2,
    "failed_operations": 0
  }
}

System Health Update

Fired when system health status changes.

{
  "event_type": "SystemHealthUpdate",
  "timestamp": "2025-09-26T10:00:00Z",
  "data": {
    "overall_status": "Healthy",
    "components": {
      "storage": {
        "status": "Healthy",
        "last_check": "2025-09-26T09:59:55Z"
      },
      "batch_coordinator": {
        "status": "Warning",
        "last_check": "2025-09-26T09:59:55Z",
        "message": "High memory usage"
      }
    },
    "metrics": {
      "cpu_usage": 45.2,
      "memory_usage": 2048,
      "disk_usage": 75.5,
      "active_workflows": 5
    }
  },
  "metadata": {
    "check_interval": 30,
    "next_check": "2025-09-26T10:00:30Z"
  }
}
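
As an illustration of consuming this payload, the sketch below lists the components whose status is anything other than `Healthy`. The function name `unhealthyComponents` is hypothetical, and the set of possible status strings beyond the `Healthy` and `Warning` values in the sample is not specified here.

```javascript
// Hypothetical helper: returns the components in a SystemHealthUpdate
// event whose status is not 'Healthy'.
function unhealthyComponents(event) {
  const components = (event.data && event.data.components) || {};
  return Object.entries(components)
    .filter(([, info]) => info.status !== 'Healthy')
    .map(([name, info]) => ({
      name,
      status: info.status,
      message: info.message || '' // message is optional in the sample
    }));
}
```

Applied to the sample above, this would report only `batch_coordinator`, together with its `High memory usage` message.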

Workflow Progress Update

Fired when workflow progress changes.

{
  "event_type": "WorkflowProgressUpdate",
  "timestamp": "2025-09-26T10:00:00Z",
  "data": {
    "workflow_id": "uuid-string",
    "name": "kubernetes_deployment",
    "progress": 75.0,
    "current_step": "Installing CNI",
    "total_steps": 8,
    "completed_steps": 6,
    "estimated_time_remaining": 120,
    "step_details": {
      "step_name": "Installing CNI",
      "step_progress": 45.0,
      "step_message": "Downloading Cilium components"
    }
  },
  "metadata": {
    "infra": "production",
    "provider": "upcloud",
    "started_at": "2025-09-26T09:45:00Z"
  }
}

Log Entry

Real-time log streaming.

{
  "event_type": "LogEntry",
  "timestamp": "2025-09-26T10:00:00Z",
  "data": {
    "level": "INFO",
    "message": "Server web-01 created successfully",
    "component": "server-manager",
    "task_id": "uuid-string",
    "details": {
      "server_id": "server-uuid",
      "hostname": "web-01",
      "ip_address": "10.0.1.100"
    }
  },
  "metadata": {
    "source": "orchestrator",
    "thread": "worker-1"
  }
}

Metric Update

Real-time metrics streaming.

{
  "event_type": "MetricUpdate",
  "timestamp": "2025-09-26T10:00:00Z",
  "data": {
    "metric_name": "workflow_duration",
    "metric_type": "histogram",
    "value": 180.5,
    "labels": {
      "workflow_type": "server_creation",
      "status": "completed",
      "infra": "production"
    }
  },
  "metadata": {
    "interval": 15,
    "aggregation": "average"
  }
}

Custom Event Types

Applications can define custom event types:

{
  "event_type": "CustomApplicationEvent",
  "timestamp": "2025-09-26T10:00:00Z",
  "data": {
    // Custom event data
  },
  "metadata": {
    "custom_field": "custom_value"
  }
}
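
All of the event examples share the same four-field envelope (`event_type`, `timestamp`, `data`, `metadata`). A client that accepts custom event types may want to check that shape before dispatching. The following is a minimal sketch; `isEventEnvelope` is a hypothetical helper, and treating all four fields as mandatory is an assumption drawn from the samples rather than a documented rule.

```javascript
// Hypothetical check that a parsed message has the envelope shape
// shared by the event examples in this section.
function isEventEnvelope(message) {
  const isPlainObject = (v) =>
    v !== null && typeof v === 'object' && !Array.isArray(v);

  if (!isPlainObject(message)) return false;
  if (typeof message.event_type !== 'string' || message.event_type === '') return false;
  // The timestamp must be a string that Date.parse can interpret
  if (typeof message.timestamp !== 'string' || Number.isNaN(Date.parse(message.timestamp))) {
    return false;
  }
  return isPlainObject(message.data) && isPlainObject(message.metadata);
}
```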

Client-Side JavaScript API

Connection Management

class ProvisioningWebSocket {
  constructor(baseUrl, token, options = {}) {
    this.baseUrl = baseUrl;
    this.token = token;
    this.options = {
      reconnect: true,
      reconnectInterval: 5000,
      maxReconnectAttempts: 10,
      ...options
    };
    this.ws = null;
    this.reconnectAttempts = 0;
    this.eventHandlers = new Map();
  }

  connect() {
    const wsUrl = `${this.baseUrl}/ws?token=${this.token}`;
    this.ws = new WebSocket(wsUrl);

    this.ws.onopen = (event) => {
      console.log('WebSocket connected');
      this.reconnectAttempts = 0;
      this.emit('connected', event);
    };

    this.ws.onmessage = (event) => {
      try {
        const message = JSON.parse(event.data);
        this.handleMessage(message);
      } catch (error) {
        console.error('Failed to parse WebSocket message:', error);
      }
    };

    this.ws.onclose = (event) => {
      console.log('WebSocket disconnected');
      this.emit('disconnected', event);

      if (this.options.reconnect && this.reconnectAttempts < this.options.maxReconnectAttempts) {
        setTimeout(() => {
          this.reconnectAttempts++;
          console.log(`Reconnecting... (${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
          this.connect();
        }, this.options.reconnectInterval);
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      this.emit('error', error);
    };
  }

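  handleMessage(message) {
    // The server may wrap several events in one { type: 'batch', events: [...] } message
    const events = message.type === 'batch' && Array.isArray(message.events)
      ? message.events
      : [message];

    events.forEach((event) => {
      if (event.event_type) {
        this.emit(event.event_type, event);
        this.emit('message', event);
      }
    });
  }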

  on(eventType, handler) {
    if (!this.eventHandlers.has(eventType)) {
      this.eventHandlers.set(eventType, []);
    }
    this.eventHandlers.get(eventType).push(handler);
  }

  off(eventType, handler) {
    const handlers = this.eventHandlers.get(eventType);
    if (handlers) {
      const index = handlers.indexOf(handler);
      if (index > -1) {
        handlers.splice(index, 1);
      }
    }
  }

  emit(eventType, data) {
    const handlers = this.eventHandlers.get(eventType);
    if (handlers) {
      handlers.forEach(handler => {
        try {
          handler(data);
        } catch (error) {
          console.error(`Error in event handler for ${eventType}:`, error);
        }
      });
    }
  }

  send(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      console.warn('WebSocket not connected, message not sent');
    }
  }

  disconnect() {
    this.options.reconnect = false;
    if (this.ws) {
      this.ws.close();
    }
  }

  subscribe(eventTypes) {
    this.send({
      type: 'subscribe',
      events: Array.isArray(eventTypes) ? eventTypes : [eventTypes]
    });
  }

  unsubscribe(eventTypes) {
    this.send({
      type: 'unsubscribe',
      events: Array.isArray(eventTypes) ? eventTypes : [eventTypes]
    });
  }
}

// Usage example
const ws = new ProvisioningWebSocket('ws://localhost:9090', 'your-jwt-token');

ws.on('TaskStatusChanged', (event) => {
  console.log(`Task ${event.data.task_id} status: ${event.data.status}`);
  updateTaskUI(event.data);
});

ws.on('WorkflowProgressUpdate', (event) => {
  console.log(`Workflow progress: ${event.data.progress}%`);
  updateProgressBar(event.data.progress);
});

ws.on('SystemHealthUpdate', (event) => {
  console.log('System health:', event.data.overall_status);
  updateHealthIndicator(event.data);
});

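// Subscribe once the socket is open; send() drops messages while the socket is still connecting.
// Registering this on 'connected' also re-subscribes after an automatic reconnect.
ws.on('connected', () => {
  ws.subscribe(['TaskStatusChanged', 'WorkflowProgressUpdate']);
});

ws.connect();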

Real-Time Dashboard Example

class ProvisioningDashboard {
  constructor(wsUrl, token) {
    this.ws = new ProvisioningWebSocket(wsUrl, token);
    this.setupEventHandlers();
    this.connect();
  }

  setupEventHandlers() {
    this.ws.on('TaskStatusChanged', this.handleTaskUpdate.bind(this));
    this.ws.on('BatchOperationUpdate', this.handleBatchUpdate.bind(this));
    this.ws.on('SystemHealthUpdate', this.handleHealthUpdate.bind(this));
    this.ws.on('WorkflowProgressUpdate', this.handleProgressUpdate.bind(this));
    this.ws.on('LogEntry', this.handleLogEntry.bind(this));
  }

  connect() {
    this.ws.connect();
  }

  handleTaskUpdate(event) {
    const taskCard = document.getElementById(`task-${event.data.task_id}`);
    if (taskCard) {
      taskCard.querySelector('.status').textContent = event.data.status;
      taskCard.querySelector('.status').className = `status ${event.data.status.toLowerCase()}`;

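      // Check the type rather than truthiness so that a progress of 0 is still rendered
      if (typeof event.data.progress === 'number') {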
        const progressBar = taskCard.querySelector('.progress-bar');
        progressBar.style.width = `${event.data.progress}%`;
      }
    }
  }

  handleBatchUpdate(event) {
    const batchCard = document.getElementById(`batch-${event.data.batch_id}`);
    if (batchCard) {
      batchCard.querySelector('.batch-progress').style.width = `${event.data.progress}%`;

      event.data.operations.forEach(op => {
        const opElement = batchCard.querySelector(`[data-operation="${op.id}"]`);
        if (opElement) {
          opElement.querySelector('.operation-status').textContent = op.status;
          opElement.querySelector('.operation-progress').style.width = `${op.progress}%`;
        }
      });
    }
  }

  handleHealthUpdate(event) {
    const healthIndicator = document.getElementById('health-indicator');
    healthIndicator.className = `health-indicator ${event.data.overall_status.toLowerCase()}`;
    healthIndicator.textContent = event.data.overall_status;

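    const metricsPanel = document.getElementById('metrics-panel');
    // Assumes memory_usage is reported in bytes; if the server reports megabytes
    // (the sample payload above shows 2048), drop the division below
    metricsPanel.innerHTML = `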
      <div class="metric">CPU: ${event.data.metrics.cpu_usage}%</div>
      <div class="metric">Memory: ${Math.round(event.data.metrics.memory_usage / 1024 / 1024)}MB</div>
      <div class="metric">Disk: ${event.data.metrics.disk_usage}%</div>
      <div class="metric">Active Workflows: ${event.data.metrics.active_workflows}</div>
    `;
  }

  handleProgressUpdate(event) {
    const workflowCard = document.getElementById(`workflow-${event.data.workflow_id}`);
    if (workflowCard) {
      const progressBar = workflowCard.querySelector('.workflow-progress');
      const stepInfo = workflowCard.querySelector('.step-info');

      progressBar.style.width = `${event.data.progress}%`;
      stepInfo.textContent = `${event.data.current_step} (${event.data.completed_steps}/${event.data.total_steps})`;

      if (event.data.estimated_time_remaining) {
        const timeRemaining = workflowCard.querySelector('.time-remaining');
        timeRemaining.textContent = `${Math.round(event.data.estimated_time_remaining / 60)} min remaining`;
      }
    }
  }

  handleLogEntry(event) {
    const logContainer = document.getElementById('log-container');
    const logEntry = document.createElement('div');
    logEntry.className = `log-entry log-${event.data.level.toLowerCase()}`;

    // Build child nodes with textContent so log text is never parsed as HTML
    const fields = [
      ['log-timestamp', new Date(event.timestamp).toLocaleTimeString()],
      ['log-level', event.data.level],
      ['log-component', event.data.component],
      ['log-message', event.data.message]
    ];
    fields.forEach(([className, text]) => {
      const span = document.createElement('span');
      span.className = className;
      span.textContent = text;
      logEntry.append(span, ' ');
    });

    logContainer.appendChild(logEntry);

    // Auto-scroll to bottom
    logContainer.scrollTop = logContainer.scrollHeight;

    // Limit log entries to prevent memory issues
    const maxLogEntries = 1000;
    if (logContainer.children.length > maxLogEntries) {
      logContainer.removeChild(logContainer.firstChild);
    }
  }
}

// Initialize dashboard
const dashboard = new ProvisioningDashboard('ws://localhost:9090', jwtToken);

Server-Side Implementation

Rust WebSocket Handler

The orchestrator implements WebSocket support using Axum and Tokio:

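use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        Query, State,
    },
    response::Response,
};
// SinkExt/StreamExt provide send(), next(), and split(); assumes the futures-util crate is a dependency
use futures_util::{
    sink::SinkExt,
    stream::{SplitSink, StreamExt},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::broadcast;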

#[derive(Debug, Deserialize)]
pub struct WsQuery {
    token: String,
    events: Option<String>,
    batch_size: Option<usize>,
    compression: Option<bool>,
}

#[derive(Debug, Clone, Serialize)]
pub struct WebSocketMessage {
    pub event_type: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub data: serde_json::Value,
    // Metadata values are not always strings (the event examples include numbers)
    pub metadata: HashMap<String, serde_json::Value>,
}

pub async fn websocket_handler(
    ws: WebSocketUpgrade,
    Query(params): Query<WsQuery>,
    State(state): State<SharedState>,
) -> Response {
    // Validate JWT token
    let claims = match state.auth_service.validate_token(&params.token) {
        Ok(claims) => claims,
        Err(_) => return Response::builder()
            .status(401)
            .body("Unauthorized".into())
            .unwrap(),
    };

    ws.on_upgrade(move |socket| handle_socket(socket, params, claims, state))
}

async fn handle_socket(
    socket: WebSocket,
    params: WsQuery,
    claims: Claims,
    state: SharedState,
) {
    let (mut sender, mut receiver) = socket.split();

    // Subscribe to event stream
    let mut event_rx = state.monitoring_system.subscribe_to_events().await;

    // Parse requested event types
    let requested_events: Vec<String> = params.events
        .unwrap_or_default()
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect();

    // Handle incoming messages from the client
    let mut recv_task = tokio::spawn(async move {
        while let Some(msg) = receiver.next().await {
            if let Ok(msg) = msg {
                if let Ok(text) = msg.to_text() {
                    if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
                        handle_client_message(client_msg, &state).await;
                    }
                }
            }
        }
    });

    // Handle outgoing messages to the client
    let mut send_task = tokio::spawn(async move {
        let mut batch = Vec::new();
        let batch_size = params.batch_size.unwrap_or(10);
        // Flush partial batches on a timer; the 1-second period is an illustrative choice
        let mut flush = tokio::time::interval(std::time::Duration::from_secs(1));

        loop {
            tokio::select! {
                event = event_rx.recv() => {
                    let event = match event {
                        Ok(event) => event,
                        Err(_) => break,
                    };

                    // Filter events based on subscription
                    if !requested_events.is_empty() && !requested_events.contains(&event.event_type) {
                        continue;
                    }

                    // Check permissions
                    if !has_event_permission(&claims, &event.event_type) {
                        continue;
                    }

                    batch.push(event);

                    // Send the batch as soon as it is full
                    if batch.len() >= batch_size {
                        send_event_batch(&mut sender, &batch).await;
                        batch.clear();
                    }
                }
                _ = flush.tick() => {
                    // Send whatever has accumulated since the last flush
                    if !batch.is_empty() {
                        send_event_batch(&mut sender, &batch).await;
                        batch.clear();
                    }
                }
            }
        }
    });

    // When either task finishes, abort the other so it does not outlive the connection
    tokio::select! {
        _ = &mut recv_task => send_task.abort(),
        _ = &mut send_task => recv_task.abort(),
    }
}

#[derive(Debug, Deserialize)]
struct ClientMessage {
    #[serde(rename = "type")]
    msg_type: String,
    token: Option<String>,
    events: Option<Vec<String>>,
}

async fn handle_client_message(msg: ClientMessage, state: &SharedState) {
    match msg.msg_type.as_str() {
        "subscribe" => {
            // Handle event subscription
        },
        "unsubscribe" => {
            // Handle event unsubscription
        },
        "auth" => {
            // Handle re-authentication
        },
        _ => {
            // Unknown message type
        }
    }
}

async fn send_event_batch(sender: &mut SplitSink<WebSocket, Message>, batch: &[WebSocketMessage]) {
    let batch_msg = serde_json::json!({
        "type": "batch",
        "events": batch
    });

    if let Ok(msg_text) = serde_json::to_string(&batch_msg) {
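        // .into() converts the String into whichever payload type Message::Text takes in the Axum version in use
        if let Err(e) = sender.send(Message::Text(msg_text.into())).await {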
            eprintln!("Failed to send WebSocket message: {}", e);
        }
    }
}

fn has_event_permission(claims: &Claims, event_type: &str) -> bool {
    // Check if user has permission to receive this event type
    match event_type {
        "SystemHealthUpdate" => claims.role.contains(&"admin".to_string()),
        "LogEntry" => claims.role.contains(&"admin".to_string()) ||
                     claims.role.contains(&"developer".to_string()),
        _ => true, // Most events are accessible to all authenticated users
    }
}

Event Filtering and Subscriptions

Client-Side Filtering

// Subscribe to specific event types
ws.subscribe(['TaskStatusChanged', 'WorkflowProgressUpdate']);

// Subscribe with filters
ws.send({
  type: 'subscribe',
  events: ['TaskStatusChanged'],
  filters: {
    task_name: 'create_servers',
    status: ['Running', 'Completed', 'Failed']
  }
});

// Advanced filtering
ws.send({
  type: 'subscribe',
  events: ['LogEntry'],
  filters: {
    level: ['ERROR', 'WARN'],
    component: ['server-manager', 'batch-coordinator'],
    since: '2025-09-26T10:00:00Z'
  }
});
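
The exact matching rules the server applies to `filters` are not spelled out here. As an illustration only, the sketch below applies one plausible interpretation on the client: each filter key names a field in `event.data`, an array value means "any of", a scalar means equality, and `since` is compared against the event's top-level `timestamp`. The function name `matchesFilters` and these semantics are assumptions, not the documented server behavior.

```javascript
// Hypothetical client-side filter using assumed semantics:
//   - since: event.timestamp must be at or after the given instant
//   - array value: event.data[key] must equal one of the entries
//   - scalar value: event.data[key] must equal the value
function matchesFilters(event, filters = {}) {
  for (const [key, expected] of Object.entries(filters)) {
    if (key === 'since') {
      if (Date.parse(event.timestamp) < Date.parse(expected)) return false;
      continue;
    }
    const actual = event.data ? event.data[key] : undefined;
    const allowed = Array.isArray(expected) ? expected : [expected];
    if (!allowed.includes(actual)) return false;
  }
  return true;
}
```

Such a function could serve as a second line of filtering in an event handler, for example `ws.on('LogEntry', (e) => { if (matchesFilters(e, { level: ['ERROR', 'WARN'] })) showAlert(e); })`, where `showAlert` stands in for application code.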

Server-Side Event Filtering

Events can be filtered on the server side based on:

  • User permissions and roles
  • Event type subscriptions
  • Custom filter criteria
  • Rate limiting

Error Handling and Reconnection

Connection Errors

// Note: ProvisioningWebSocket already reconnects on close when options.reconnect is true.
// Construct it with { reconnect: false } if reconnection is handled here instead.

ws.on('error', (error) => {
  // The error event carries no close code; the code and reason arrive with the close event
  console.error('WebSocket error:', error);
});

ws.on('disconnected', (event) => {
  console.log(`WebSocket disconnected: ${event.code} - ${event.reason}`);

  // Handle different close codes
  switch (event.code) {
    case 1000: // Normal closure
      console.log('Connection closed normally');
      break;
    case 1001: // Going away
      console.log('Endpoint is going away (for example, the server is shutting down)');
      break;
    case 1008: // Policy violation, check token
    case 4001: // Custom: Token expired
      refreshTokenAndReconnect();
      break;
    case 1006: // Abnormal closure (no close frame received)
    default:
      // Attempt reconnection for other errors
      if (shouldReconnect()) {
        scheduleReconnection();
      }
  }
});
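
A fixed retry interval can cause many clients to reconnect at the same moment after a server restart. One common alternative is exponential backoff with jitter. The sketch below is one way to compute the delay; `reconnectDelay` and its default values are illustrative assumptions, not settings defined by this API.

```javascript
// Hypothetical helper: exponential backoff with symmetric jitter.
// attempt is zero-based; random is injectable to make the result testable.
function reconnectDelay(attempt, options = {}) {
  const {
    baseMs = 1000,   // delay for the first retry
    maxMs = 30000,   // upper bound before jitter is applied
    jitter = 0.2,    // +/- 20% randomization
    random = Math.random
  } = options;

  const exponential = Math.min(maxMs, baseMs * 2 ** attempt);
  const spread = exponential * jitter;
  // Pick a value uniformly in [exponential - spread, exponential + spread)
  return Math.round(exponential - spread + random() * 2 * spread);
}
```

A `scheduleReconnection` implementation could then call `setTimeout(() => ws.connect(), reconnectDelay(attempt))` and increment `attempt` on each failure.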

Heartbeat and Keep-Alive

class ProvisioningWebSocket {
  constructor(baseUrl, token, options = {}) {
    // ... existing code ...
    this.heartbeatInterval = options.heartbeatInterval || 30000;
    this.heartbeatTimer = null;
  }

  connect() {
    // ... existing connection code ...

    this.ws.onopen = (event) => {
      console.log('WebSocket connected');
      this.startHeartbeat();
      this.emit('connected', event);
    };

    this.ws.onclose = (event) => {
      this.stopHeartbeat();
      // ... existing close handling ...
    };
  }

  startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.send({ type: 'ping' });
      }
    }, this.heartbeatInterval);
  }

  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  handleMessage(message) {
    if (message.type === 'pong') {
      // Heartbeat response received
      return;
    }

    // ... existing message handling ...
  }
}
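
The heartbeat above sends pings but does not act when pongs stop arriving. The sketch below tracks the time of the last pong so the client can detect a stale connection. It assumes the server answers each `ping` with a `{ type: 'pong' }` message, as the `handleMessage` snippet implies; `HeartbeatMonitor` and its `timeoutMs` default are hypothetical.

```javascript
// Hypothetical helper: tracks pong arrival times to detect a dead connection.
// The clock is injectable so the logic can be tested without real timers.
class HeartbeatMonitor {
  constructor({ timeoutMs = 60000, now = Date.now } = {}) {
    this.timeoutMs = timeoutMs;
    this.now = now;
    this.lastPongAt = now(); // treat the moment of connection as a fresh pong
  }

  // Call when a { type: 'pong' } message is received
  recordPong() {
    this.lastPongAt = this.now();
  }

  // True when no pong has arrived within timeoutMs
  isStale() {
    return this.now() - this.lastPongAt > this.timeoutMs;
  }
}
```

One way to wire it in: create the monitor in `onopen`, call `recordPong()` in the `pong` branch of `handleMessage`, and in the heartbeat timer call `this.ws.close()` when `isStale()` returns `true` so that the normal close handling takes over.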

Performance Considerations

Message Batching

To improve performance, the server can batch multiple events into single WebSocket messages:

{
  "type": "batch",
  "timestamp": "2025-09-26T10:00:00Z",
  "events": [
    {
      "event_type": "TaskStatusChanged",
      "data": { ... }
    },
    {
      "event_type": "WorkflowProgressUpdate",
      "data": { ... }
    }
  ]
}
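
A client therefore has to accept both batch messages and single events. The sketch below normalizes either shape into an array of events; `unwrapEvents` is a hypothetical helper name, and the shapes it recognizes are the ones shown in this document.

```javascript
// Hypothetical helper: returns the events carried by a parsed message,
// whether it is a batch wrapper or a single event envelope.
function unwrapEvents(message) {
  if (message && message.type === 'batch' && Array.isArray(message.events)) {
    return message.events;
  }
  if (message && typeof message.event_type === 'string') {
    return [message];
  }
  return []; // control messages such as pong carry no events
}
```

An `onmessage` handler can then loop over `unwrapEvents(JSON.parse(event.data))` and dispatch each entry by its `event_type`.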

Compression

Enable message compression for large events:

const ws = new WebSocket('ws://localhost:9090/ws?token=jwt&compression=true');

Rate Limiting

The server implements rate limiting to prevent abuse:

  • Maximum connections per user: 10
  • Maximum messages per second: 100
  • Maximum subscription events: 50
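
A client can stay under the message limit by throttling its own sends. The sketch below uses a sliding one-second window. How the server actually measures the rate is not stated here, so the window approach, the class name `SendRateLimiter`, and its interface are assumptions; only the figure of 100 messages per second comes from the list above.

```javascript
// Hypothetical client-side limiter: allows at most maxPerSecond sends
// in any rolling 1000 ms window. The clock is injectable for testing.
class SendRateLimiter {
  constructor({ maxPerSecond = 100, now = Date.now } = {}) {
    this.maxPerSecond = maxPerSecond;
    this.now = now;
    this.sentAt = []; // timestamps (ms) of sends still inside the window
  }

  // Returns true and records the send if the limit allows it, false otherwise
  tryAcquire() {
    const t = this.now();
    // Drop timestamps that have left the one-second window
    while (this.sentAt.length > 0 && t - this.sentAt[0] >= 1000) {
      this.sentAt.shift();
    }
    if (this.sentAt.length >= this.maxPerSecond) {
      return false;
    }
    this.sentAt.push(t);
    return true;
  }
}
```

A wrapper around `send` could call `tryAcquire()` first and queue or drop the message when it returns `false`.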

Security Considerations

Authentication and Authorization

  • All connections require valid JWT tokens
  • Tokens are validated on connection and periodically renewed
  • Event access is controlled by user roles and permissions

Message Validation

  • All incoming messages are validated against schemas
  • Malformed messages are rejected
  • Rate limiting prevents DoS attacks
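
The server-side schemas are not reproduced in this document. As a rough illustration of what such validation might check, the sketch below validates the client message types that appear elsewhere on this page (`subscribe`, `unsubscribe`, `auth`, `ping`). The function `validateClientMessage`, the error strings, and the reading of "Maximum subscription events: 50" as a per-message cap are all assumptions.

```javascript
// Message types that appear in this document's client examples
const KNOWN_TYPES = new Set(['subscribe', 'unsubscribe', 'auth', 'ping']);
// Assumed interpretation of the documented subscription limit
const MAX_SUBSCRIPTION_EVENTS = 50;

// Hypothetical validator: returns a list of problems (empty when valid).
function validateClientMessage(msg) {
  if (msg === null || typeof msg !== 'object' || Array.isArray(msg)) {
    return ['message must be a JSON object'];
  }
  const errors = [];
  if (!KNOWN_TYPES.has(msg.type)) {
    errors.push(`unknown type: ${String(msg.type)}`);
  }
  if (msg.type === 'subscribe' || msg.type === 'unsubscribe') {
    if (!Array.isArray(msg.events) || msg.events.length === 0) {
      errors.push('events must be a non-empty array');
    } else {
      if (!msg.events.every((e) => typeof e === 'string' && e !== '')) {
        errors.push('events must contain only non-empty strings');
      }
      if (msg.events.length > MAX_SUBSCRIPTION_EVENTS) {
        errors.push(`events exceeds ${MAX_SUBSCRIPTION_EVENTS} entries`);
      }
    }
  }
  if (msg.type === 'auth' && (typeof msg.token !== 'string' || msg.token === '')) {
    errors.push('auth requires a non-empty token string');
  }
  return errors;
}
```

Running the same check on the client before calling `send` gives earlier feedback than waiting for the server to reject a malformed message.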

Data Sanitization

  • All event data is sanitized before transmission
  • Sensitive information is filtered based on user permissions
  • PII and secrets are never transmitted

This WebSocket API provides a robust, real-time communication channel for monitoring and managing provisioning with comprehensive security and performance features.