# Integration Examples This document provides comprehensive examples and patterns for integrating with provisioning APIs, including client libraries, SDKs, error handling strategies, and performance optimization. ## Overview Provisioning offers multiple integration points: - REST APIs for workflow management - WebSocket APIs for real-time monitoring - Configuration APIs for system setup - Extension APIs for custom providers and services ## Complete Integration Examples ### Python Integration #### Full-Featured Python Client ``` import asyncio import json import logging import time import requests import websockets from typing import Dict, List, Optional, Callable from dataclasses import dataclass from enum import Enum class TaskStatus(Enum): PENDING = "Pending" RUNNING = "Running" COMPLETED = "Completed" FAILED = "Failed" CANCELLED = "Cancelled" @dataclass class WorkflowTask: id: str name: str status: TaskStatus created_at: str started_at: Optional[str] = None completed_at: Optional[str] = None output: Optional[str] = None error: Optional[str] = None progress: Optional[float] = None class ProvisioningAPIError(Exception): """Base exception for provisioning API errors""" pass class AuthenticationError(ProvisioningAPIError): """Authentication failed""" pass class ValidationError(ProvisioningAPIError): """Request validation failed""" pass class ProvisioningClient: """ Complete Python client for provisioning Features: - REST API integration - WebSocket support for real-time updates - Automatic token refresh - Retry logic with exponential backoff - Comprehensive error handling """ def __init__(self, base_url: str = "http://localhost:9090", auth_url: str = "http://localhost:8081", username: str = None, password: str = None, token: str = None): self.base_url = base_url self.auth_url = auth_url self.username = username self.password = password self.token = token self.session = requests.Session() self.websocket = None self.event_handlers = {} # Setup logging self.logger = 
logging.getLogger(__name__) # Configure session with retries from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry retry_strategy = Retry( total=3, status_forcelist=[429, 500, 502, 503, 504], method_whitelist=["HEAD", "GET", "OPTIONS"], backoff_factor=1 ) adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("http://", adapter) self.session.mount("https://", adapter) async def authenticate(self) -> str: """Authenticate and get JWT token""" if self.token: return self.token if not self.username or not self.password: raise AuthenticationError("Username and password required for authentication") auth_data = { "username": self.username, "password": self.password } try: response = requests.post(f"{self.auth_url}/auth/login", json=auth_data) response.raise_for_status() result = response.json() if not result.get('success'): raise AuthenticationError(result.get('error', 'Authentication failed')) self.token = result['data']['token'] self.session.headers.update({ 'Authorization': f'Bearer {self.token}' }) self.logger.info("Authentication successful") return self.token except requests.RequestException as e: raise AuthenticationError(f"Authentication request failed: {e}") def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict: """Make authenticated HTTP request with error handling""" if not self.token: raise AuthenticationError("Not authenticated. 
Call authenticate() first.") url = f"{self.base_url}{endpoint}" try: response = self.session.request(method, url, **kwargs) response.raise_for_status() result = response.json() if not result.get('success'): error_msg = result.get('error', 'Request failed') if response.status_code == 400: raise ValidationError(error_msg) else: raise ProvisioningAPIError(error_msg) return result['data'] except requests.RequestException as e: self.logger.error(f"Request failed: {method} {url} - {e}") raise ProvisioningAPIError(f"Request failed: {e}") # Workflow Management Methods def create_server_workflow(self, infra: str, settings: str = "config.ncl", check_mode: bool = False, wait: bool = False) -> str: """Create a server provisioning workflow""" data = { "infra": infra, "settings": settings, "check_mode": check_mode, "wait": wait } task_id = self._make_request("POST", "/workflows/servers/create", json=data) self.logger.info(f"Server workflow created: {task_id}") return task_id def create_taskserv_workflow(self, operation: str, taskserv: str, infra: str, settings: str = "config.ncl", check_mode: bool = False, wait: bool = False) -> str: """Create a task service workflow""" data = { "operation": operation, "taskserv": taskserv, "infra": infra, "settings": settings, "check_mode": check_mode, "wait": wait } task_id = self._make_request("POST", "/workflows/taskserv/create", json=data) self.logger.info(f"Taskserv workflow created: {task_id}") return task_id def create_cluster_workflow(self, operation: str, cluster_type: str, infra: str, settings: str = "config.ncl", check_mode: bool = False, wait: bool = False) -> str: """Create a cluster workflow""" data = { "operation": operation, "cluster_type": cluster_type, "infra": infra, "settings": settings, "check_mode": check_mode, "wait": wait } task_id = self._make_request("POST", "/workflows/cluster/create", json=data) self.logger.info(f"Cluster workflow created: {task_id}") return task_id def get_task_status(self, task_id: str) -> 
WorkflowTask: """Get the status of a specific task""" data = self._make_request("GET", f"/tasks/{task_id}") return WorkflowTask( id=data['id'], name=data['name'], status=TaskStatus(data['status']), created_at=data['created_at'], started_at=data.get('started_at'), completed_at=data.get('completed_at'), output=data.get('output'), error=data.get('error'), progress=data.get('progress') ) def list_tasks(self, status_filter: Optional[str] = None) -> List[WorkflowTask]: """List all tasks, optionally filtered by status""" params = {} if status_filter: params['status'] = status_filter data = self._make_request("GET", "/tasks", params=params) return [ WorkflowTask( id=task['id'], name=task['name'], status=TaskStatus(task['status']), created_at=task['created_at'], started_at=task.get('started_at'), completed_at=task.get('completed_at'), output=task.get('output'), error=task.get('error') ) for task in data ] def wait_for_task_completion(self, task_id: str, timeout: int = 300, poll_interval: int = 5) -> WorkflowTask: """Wait for a task to complete""" start_time = time.time() while time.time() - start_time < timeout: task = self.get_task_status(task_id) if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]: self.logger.info(f"Task {task_id} finished with status: {task.status}") return task self.logger.debug(f"Task {task_id} status: {task.status}") time.sleep(poll_interval) raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds") # Batch Operations def execute_batch_operation(self, batch_config: Dict) -> Dict: """Execute a batch operation""" return self._make_request("POST", "/batch/execute", json=batch_config) def get_batch_status(self, batch_id: str) -> Dict: """Get batch operation status""" return self._make_request("GET", f"/batch/operations/{batch_id}") def cancel_batch_operation(self, batch_id: str) -> str: """Cancel a running batch operation""" return self._make_request("POST", f"/batch/operations/{batch_id}/cancel") # 
System Health and Monitoring def get_system_health(self) -> Dict: """Get system health status""" return self._make_request("GET", "/state/system/health") def get_system_metrics(self) -> Dict: """Get system metrics""" return self._make_request("GET", "/state/system/metrics") # WebSocket Integration async def connect_websocket(self, event_types: List[str] = None): """Connect to WebSocket for real-time updates""" if not self.token: await self.authenticate() ws_url = f"ws://localhost:9090/ws?token={self.token}" if event_types: ws_url += f"&events={','.join(event_types)}" try: self.websocket = await websockets.connect(ws_url) self.logger.info("WebSocket connected") # Start listening for messages asyncio.create_task(self._websocket_listener()) except Exception as e: self.logger.error(f"WebSocket connection failed: {e}") raise async def _websocket_listener(self): """Listen for WebSocket messages""" try: async for message in self.websocket: try: data = json.loads(message) await self._handle_websocket_message(data) except json.JSONDecodeError: self.logger.error(f"Invalid JSON received: {message}") except Exception as e: self.logger.error(f"WebSocket listener error: {e}") async def _handle_websocket_message(self, data: Dict): """Handle incoming WebSocket messages""" event_type = data.get('event_type') if event_type and event_type in self.event_handlers: for handler in self.event_handlers[event_type]: try: await handler(data) except Exception as e: self.logger.error(f"Error in event handler for {event_type}: {e}") def on_event(self, event_type: str, handler: Callable): """Register an event handler""" if event_type not in self.event_handlers: self.event_handlers[event_type] = [] self.event_handlers[event_type].append(handler) async def disconnect_websocket(self): """Disconnect from WebSocket""" if self.websocket: await self.websocket.close() self.websocket = None self.logger.info("WebSocket disconnected") # Usage Example async def main(): # Initialize client client = 
ProvisioningClient( username="admin", password="password" ) try: # Authenticate await client.authenticate() # Create a server workflow task_id = client.create_server_workflow( infra="production", settings="prod-settings.ncl", wait=False ) print(f"Server workflow created: {task_id}") # Set up WebSocket event handlers async def on_task_update(event): print(f"Task update: {event['data']['task_id']} -> {event['data']['status']}") async def on_system_health(event): print(f"System health: {event['data']['overall_status']}") client.on_event('TaskStatusChanged', on_task_update) client.on_event('SystemHealthUpdate', on_system_health) # Connect to WebSocket await client.connect_websocket(['TaskStatusChanged', 'SystemHealthUpdate']) # Wait for task completion final_task = client.wait_for_task_completion(task_id, timeout=600) print(f"Task completed with status: {final_task.status}") if final_task.status == TaskStatus.COMPLETED: print(f"Output: {final_task.output}") elif final_task.status == TaskStatus.FAILED: print(f"Error: {final_task.error}") except ProvisioningAPIError as e: print(f"API Error: {e}") except Exception as e: print(f"Unexpected error: {e}") finally: await client.disconnect_websocket() if __name__ == "__main__": asyncio.run(main()) ``` ### Node.js/JavaScript Integration #### Complete JavaScript/TypeScript Client ``` import axios, { AxiosInstance, AxiosResponse } from 'axios'; import WebSocket from 'ws'; import { EventEmitter } from 'events'; interface Task { id: string; name: string; status: 'Pending' | 'Running' | 'Completed' | 'Failed' | 'Cancelled'; created_at: string; started_at?: string; completed_at?: string; output?: string; error?: string; progress?: number; } interface BatchConfig { name: string; version: string; storage_backend: string; parallel_limit: number; rollback_enabled: boolean; operations: Array<{ id: string; type: string; provider: string; dependencies: string[]; [key: string]: any; }>; } interface WebSocketEvent { event_type: string; 
timestamp: string;
  data: any;
  metadata: Record<string, any>;
}

class ProvisioningClient extends EventEmitter {
  private httpClient: AxiosInstance;
  private authClient: AxiosInstance;
  private websocket?: WebSocket;
  private token?: string;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private reconnectInterval = 5000;

  /**
   * @param baseUrl  Base URL of the provisioning API.
   * @param authUrl  Base URL of the authentication service.
   * @param username Login name used by authenticate().
   * @param password Password used by authenticate().
   * @param token    Optional pre-issued token; skips the login call.
   */
  constructor(
    private baseUrl = 'http://localhost:9090',
    private authUrl = 'http://localhost:8081',
    private username?: string,
    private password?: string,
    token?: string
  ) {
    super();

    this.token = token;

    // Setup HTTP clients
    this.httpClient = axios.create({
      baseURL: baseUrl,
      timeout: 30000,
    });

    this.authClient = axios.create({
      baseURL: authUrl,
      timeout: 10000,
    });

    // Setup request interceptors
    this.setupInterceptors();
  }

  /**
   * Installs the axios interceptors: the request interceptor adds the bearer
   * token, the response interceptor re-authenticates once on HTTP 401 and
   * replays the failed request.
   */
  private setupInterceptors(): void {
    // Request interceptor to add auth token
    this.httpClient.interceptors.request.use((config) => {
      if (this.token) {
        config.headers.Authorization = `Bearer ${this.token}`;
      }
      return config;
    });

    // Response interceptor for error handling
    this.httpClient.interceptors.response.use(
      (response) => response,
      async (error) => {
        const originalRequest = error.config;
        const canReauthenticate =
          error.response?.status === 401 &&
          this.username &&
          this.password &&
          originalRequest &&
          !originalRequest._retry;

        if (canReauthenticate) {
          // Replay each request at most once, otherwise a permanently
          // rejected login would loop forever.
          originalRequest._retry = true;

          // Drop the rejected token first: authenticate() returns the cached
          // token when one is set, so no new login would happen otherwise.
          this.token = undefined;

          try {
            await this.authenticate();
            // Retry the original request
            originalRequest.headers.Authorization = `Bearer ${this.token}`;
            return this.httpClient.request(originalRequest);
          } catch (authError) {
            this.emit('authError', authError);
            throw error;
          }
        }

        throw error;
      }
    );
  }

  /**
   * Returns the cached token, or logs in with username/password and caches
   * the token from the response. Emits 'authenticated' after a login.
   *
   * @throws Error when credentials are missing or the login fails.
   */
  async authenticate(): Promise<string> {
    if (this.token) {
      return this.token;
    }

    if (!this.username || !this.password) {
      throw new Error('Username and password required for authentication');
    }

    try {
      const response = await this.authClient.post('/auth/login', {
        username: this.username,
        password: this.password,
      });

      const result = response.data;
      if (!result.success) {
        throw new Error(result.error || 'Authentication failed');
      }

      this.token = result.data.token;
      console.log('Authentication 
successful'); this.emit('authenticated', this.token); return this.token; } catch (error) { console.error('Authentication failed:', error); throw new Error(`Authentication failed: ${error.message}`); } } private async makeRequest(method: string, endpoint: string, data?: any): Promise { try { const response: AxiosResponse = await this.httpClient.request({ method, url: endpoint, data, }); const result = response.data; if (!result.success) { throw new Error(result.error || 'Request failed'); } return result.data; } catch (error) { console.error(`Request failed: ${method} ${endpoint}`, error); throw error; } } // Workflow Management Methods async createServerWorkflow(config: { infra: string; settings?: string; check_mode?: boolean; wait?: boolean; }): Promise { const data = { infra: config.infra, settings: config.settings || 'config.ncl', check_mode: config.check_mode || false, wait: config.wait || false, }; const taskId = await this.makeRequest('POST', '/workflows/servers/create', data); console.log(`Server workflow created: ${taskId}`); this.emit('workflowCreated', { type: 'server', taskId }); return taskId; } async createTaskservWorkflow(config: { operation: string; taskserv: string; infra: string; settings?: string; check_mode?: boolean; wait?: boolean; }): Promise { const data = { operation: config.operation, taskserv: config.taskserv, infra: config.infra, settings: config.settings || 'config.ncl', check_mode: config.check_mode || false, wait: config.wait || false, }; const taskId = await this.makeRequest('POST', '/workflows/taskserv/create', data); console.log(`Taskserv workflow created: ${taskId}`); this.emit('workflowCreated', { type: 'taskserv', taskId }); return taskId; } async createClusterWorkflow(config: { operation: string; cluster_type: string; infra: string; settings?: string; check_mode?: boolean; wait?: boolean; }): Promise { const data = { operation: config.operation, cluster_type: config.cluster_type, infra: config.infra, settings: config.settings || 
'config.ncl', check_mode: config.check_mode || false, wait: config.wait || false, }; const taskId = await this.makeRequest('POST', '/workflows/cluster/create', data); console.log(`Cluster workflow created: ${taskId}`); this.emit('workflowCreated', { type: 'cluster', taskId }); return taskId; } async getTaskStatus(taskId: string): Promise { return this.makeRequest('GET', `/tasks/${taskId}`); } async listTasks(statusFilter?: string): Promise { const params = statusFilter ? `?status=${statusFilter}` : ''; return this.makeRequest('GET', `/tasks${params}`); } async waitForTaskCompletion( taskId: string, timeout = 300000, // 5 minutes pollInterval = 5000 // 5 seconds ): Promise { return new Promise((resolve, reject) => { const startTime = Date.now(); const poll = async () => { try { const task = await this.getTaskStatus(taskId); if (['Completed', 'Failed', 'Cancelled'].includes(task.status)) { console.log(`Task ${taskId} finished with status: ${task.status}`); resolve(task); return; } if (Date.now() - startTime > timeout) { reject(new Error(`Task ${taskId} did not complete within ${timeout}ms`)); return; } console.log(`Task ${taskId} status: ${task.status}`); this.emit('taskProgress', task); setTimeout(poll, pollInterval); } catch (error) { reject(error); } }; poll(); }); } // Batch Operations async executeBatchOperation(batchConfig: BatchConfig): Promise { const result = await this.makeRequest('POST', '/batch/execute', batchConfig); console.log(`Batch operation started: ${result.batch_id}`); this.emit('batchStarted', result); return result; } async getBatchStatus(batchId: string): Promise { return this.makeRequest('GET', `/batch/operations/${batchId}`); } async cancelBatchOperation(batchId: string): Promise { return this.makeRequest('POST', `/batch/operations/${batchId}/cancel`); } // System Monitoring async getSystemHealth(): Promise { return this.makeRequest('GET', '/state/system/health'); } async getSystemMetrics(): Promise { return this.makeRequest('GET', 
'/state/system/metrics');
  }

  // WebSocket Integration

  /**
   * Opens the WebSocket to `<baseUrl>/ws`, authenticating first when no token
   * is set. Resolves once the socket is open and rejects on a socket error.
   * After an unexpected close the connection is re-established up to
   * `maxReconnectAttempts` times.
   *
   * @param eventTypes Optional event-type filter sent as the `events` query
   *                   parameter.
   */
  async connectWebSocket(eventTypes?: string[]): Promise<void> {
    if (!this.token) {
      await this.authenticate();
    }

    // Derive the endpoint from baseUrl instead of hard-coding localhost
    // (http -> ws, https -> wss).
    const wsBase = this.baseUrl.replace(/^http/, 'ws').replace(/\/+$/, '');
    let wsUrl = `${wsBase}/ws?token=${this.token}`;
    if (eventTypes && eventTypes.length > 0) {
      wsUrl += `&events=${eventTypes.join(',')}`;
    }

    return new Promise<void>((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      this.websocket = ws;

      ws.on('open', () => {
        console.log('WebSocket connected');
        this.reconnectAttempts = 0;
        this.emit('websocketConnected');
        resolve();
      });

      ws.on('message', (data: WebSocket.Data) => {
        try {
          const event: WebSocketEvent = JSON.parse(data.toString());
          this.handleWebSocketMessage(event);
        } catch (error) {
          console.error('Failed to parse WebSocket message:', error);
        }
      });

      ws.on('close', (code: number, reason: string) => {
        console.log(`WebSocket disconnected: ${code} - ${reason}`);
        this.emit('websocketDisconnected', { code, reason });

        // Reconnect only when this socket is still the active one.
        // disconnectWebSocket() clears `this.websocket` before closing, so an
        // intentional disconnect no longer triggers a reconnect.
        if (this.websocket !== ws) {
          return;
        }

        if (this.reconnectAttempts < this.maxReconnectAttempts) {
          setTimeout(() => {
            // The client may have been disconnected while the timer ran.
            if (this.websocket !== ws) {
              return;
            }
            this.reconnectAttempts++;
            console.log(`Reconnecting... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
            this.connectWebSocket(eventTypes).catch(() => {
              // Already reported through the 'websocketError' event; the
              // catch only prevents an unhandled promise rejection.
            });
          }, this.reconnectInterval);
        }
      });

      ws.on('error', (error: Error) => {
        console.error('WebSocket error:', error);
        this.emit('websocketError', error);
        reject(error);
      });
    });
  }

  /**
   * Re-emits an incoming event under its own event type, under the generic
   * 'websocketMessage' name and, for the known types, under a camelCase alias
   * that carries only `event.data`.
   */
  private handleWebSocketMessage(event: WebSocketEvent): void {
    console.log(`WebSocket event: ${event.event_type}`);

    // Emit specific event
    this.emit(event.event_type, event);

    // Emit general event
    this.emit('websocketMessage', event);

    // Handle specific event types
    switch (event.event_type) {
      case 'TaskStatusChanged':
        this.emit('taskStatusChanged', event.data);
        break;
      case 'WorkflowProgressUpdate':
        this.emit('workflowProgress', event.data);
        break;
      case 'SystemHealthUpdate':
        this.emit('systemHealthUpdate', event.data);
        break;
      case 'BatchOperationUpdate':
        this.emit('batchUpdate', event.data);
        break;
    }
  }

  /** Closes the WebSocket, if any, without triggering the auto-reconnect. */
  disconnectWebSocket(): void {
    if (this.websocket) {
      const ws = this.websocket;
      // Clear the reference before closing so the 'close' handler can tell
      // this shutdown apart from an unexpected connection loss.
      this.websocket = undefined;
      ws.close();
      console.log('WebSocket disconnected');
    }
  }

  // Utility Methods

  /** Returns the `success` flag of GET /health, or false when the call fails. */
  async healthCheck(): Promise<boolean> {
    try {
      const response = await this.httpClient.get('/health');
      return response.data.success;
    } catch (error) {
      return false;
    }
  }
}

// Usage Example
async function main() {
  const client = new ProvisioningClient(
    'http://localhost:9090',
    'http://localhost:8081',
    'admin',
    'password'
  );

  try {
    // Authenticate
    await client.authenticate();

    // Set up event listeners
    client.on('taskStatusChanged', (task) => {
      console.log(`Task ${task.task_id} status changed to: ${task.status}`);
    });

    client.on('workflowProgress', (progress) => {
      console.log(`Workflow progress: ${progress.progress}% - ${progress.current_step}`);
    });

    client.on('systemHealthUpdate', (health) => {
      console.log(`System health: ${health.overall_status}`);
    });

    // Connect WebSocket
    await client.connectWebSocket(['TaskStatusChanged', 'WorkflowProgressUpdate', 'SystemHealthUpdate']);

    // Create workflows
    const serverTaskId = await 
client.createServerWorkflow({ infra: 'production', settings: 'prod-settings.ncl', }); const taskservTaskId = await client.createTaskservWorkflow({ operation: 'create', taskserv: 'kubernetes', infra: 'production', }); // Wait for completion const [serverTask, taskservTask] = await Promise.all([ client.waitForTaskCompletion(serverTaskId), client.waitForTaskCompletion(taskservTaskId), ]); console.log('All workflows completed'); console.log(`Server task: ${serverTask.status}`); console.log(`Taskserv task: ${taskservTask.status}`); // Create batch operation const batchConfig: BatchConfig = { name: 'test_deployment', version: '1.0.0', storage_backend: 'filesystem', parallel_limit: 3, rollback_enabled: true, operations: [ { id: 'servers', type: 'server_batch', provider: 'upcloud', dependencies: [], server_configs: [ { name: 'web-01', plan: '1xCPU-2 GB', zone: 'de-fra1' }, { name: 'web-02', plan: '1xCPU-2 GB', zone: 'de-fra1' }, ], }, { id: 'taskservs', type: 'taskserv_batch', provider: 'upcloud', dependencies: ['servers'], taskservs: ['kubernetes', 'cilium'], }, ], }; const batchResult = await client.executeBatchOperation(batchConfig); console.log(`Batch operation started: ${batchResult.batch_id}`); // Monitor batch operation const monitorBatch = setInterval(async () => { try { const batchStatus = await client.getBatchStatus(batchResult.batch_id); console.log(`Batch status: ${batchStatus.status} - ${batchStatus.progress}%`); if (['Completed', 'Failed', 'Cancelled'].includes(batchStatus.status)) { clearInterval(monitorBatch); console.log(`Batch operation finished: ${batchStatus.status}`); } } catch (error) { console.error('Error checking batch status:', error); clearInterval(monitorBatch); } }, 10000); } catch (error) { console.error('Integration example failed:', error); } finally { client.disconnectWebSocket(); } } // Run example if (require.main === module) { main().catch(console.error); } export { ProvisioningClient, Task, BatchConfig }; ``` ## Error Handling 
Strategies

### Comprehensive Error Handling

```python
import asyncio
import functools
import logging
import random
from typing import Callable, Optional

import requests

# ProvisioningClient and AuthenticationError are the classes defined in the
# Python client example above.

logger = logging.getLogger(__name__)


class ProvisioningErrorHandler:
    """Centralized error handling for provisioning operations"""

    def __init__(self, client: ProvisioningClient):
        self.client = client
        # Maps the labels produced by _classify_error() to a function
        # (attempt, error) -> seconds to wait before the next attempt.
        self.retry_strategies = {
            'network_error': self._exponential_backoff,
            'rate_limit': self._rate_limit_backoff,
            'server_error': self._server_error_strategy,
            'auth_error': self._auth_error_strategy,
        }

    async def execute_with_retry(self, operation: Callable, *args, **kwargs):
        """Execute operation with intelligent retry logic.

        ``operation`` may be a coroutine function or a plain (blocking)
        callable; blocking callables are run in the default executor so the
        event loop stays responsive. Up to three attempts are made; the last
        exception is re-raised unchanged.
        """
        max_attempts = 3
        attempt = 0

        while attempt < max_attempts:
            try:
                return await self._invoke(operation, *args, **kwargs)
            except Exception as e:
                attempt += 1
                error_type = self._classify_error(e)

                if attempt >= max_attempts:
                    self._log_final_failure(operation.__name__, e, attempt)
                    raise

                retry_strategy = self.retry_strategies.get(error_type, self._default_retry)
                wait_time = retry_strategy(attempt, e)

                self._log_retry_attempt(operation.__name__, e, attempt, wait_time)
                await asyncio.sleep(wait_time)

                if error_type == 'auth_error':
                    # _auth_error_strategy() cleared the stale token, so this
                    # performs a real login before the next attempt.
                    await self.client.authenticate()

    async def _invoke(self, operation: Callable, *args, **kwargs):
        """Run a coroutine function directly, anything else in the executor."""
        if asyncio.iscoroutinefunction(operation):
            return await operation(*args, **kwargs)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(operation, *args, **kwargs)
        )

    def _underlying_request_error(self, error: Exception) -> Optional[Exception]:
        """Return the requests exception behind ``error``, if there is one.

        The client wraps transport errors in its own exception types, so the
        original exception has to be looked up via __cause__ / __context__.
        """
        seen = set()
        current = error
        while current is not None and id(current) not in seen:
            if isinstance(current, requests.RequestException):
                return current
            seen.add(id(current))
            current = current.__cause__ or current.__context__
        return None

    def _classify_error(self, error: Exception) -> str:
        """Classify error type for appropriate retry strategy"""
        if isinstance(error, AuthenticationError):
            return 'auth_error'

        cause = self._underlying_request_error(error)
        if isinstance(cause, requests.ConnectionError):
            return 'network_error'
        if isinstance(cause, requests.HTTPError) and cause.response is not None:
            status = cause.response.status_code
            if status == 429:
                return 'rate_limit'
            if 500 <= status < 600:
                return 'server_error'
            if status == 401:
                return 'auth_error'
        return 'unknown'

    def _exponential_backoff(self, attempt: int, error: Exception) -> float:
        """Exponential backoff for network errors"""
        return min(2 ** attempt + random.uniform(0, 1), 60)

    def _rate_limit_backoff(self, attempt: int, error: Exception) -> float:
        """Handle rate limiting with appropriate backoff"""
        response = getattr(self._underlying_request_error(error), 'response', None)
        retry_after = response.headers.get('Retry-After') if response is not None else None
        if retry_after:
            try:
                return float(retry_after)
            except ValueError:
                # Retry-After may also be an HTTP date; use the default then.
                pass
        return 60  # Default to 60 seconds

    def _server_error_strategy(self, attempt: int, error: Exception) -> float:
        """Handle server errors"""
        return min(10 * attempt, 60)

    def _auth_error_strategy(self, attempt: int, error: Exception) -> float:
        """Handle authentication errors"""
        # Forget the rejected token; execute_with_retry() re-authenticates
        # after the wait. authenticate() would otherwise return the cached
        # token without contacting the auth service.
        self.client.token = None
        return 5

    def _default_retry(self, attempt: int, error: Exception) -> float:
        """Default retry strategy"""
        return min(5 * attempt, 30)

    def _log_retry_attempt(self, name: str, error: Exception, attempt: int, wait_time: float) -> None:
        """Log that ``name`` failed and will be retried after ``wait_time`` seconds."""
        logger.warning(
            "%s failed (attempt %d): %s - retrying in %.1fs",
            name, attempt, error, wait_time
        )

    def _log_final_failure(self, name: str, error: Exception, attempt: int) -> None:
        """Log that ``name`` failed on its last attempt."""
        logger.error("%s failed after %d attempts: %s", name, attempt, error)


# Usage example
async def robust_workflow_execution():
    client = ProvisioningClient(username="admin", password="password")
    handler = ProvisioningErrorHandler(client)

    try:
        # The client refuses requests until a token is available
        await client.authenticate()

        # Execute with automatic retry
        task_id = await handler.execute_with_retry(
            client.create_server_workflow,
            infra="production",
            settings="config.ncl"
        )

        # Wait for completion with retry
        task = await handler.execute_with_retry(
            client.wait_for_task_completion,
            task_id,
            timeout=600
        )

        return task

    except Exception as e:
        # Log detailed error information
        logger.error(f"Workflow execution failed after all retries: {e}")
        # Implement fallback strategy
        # (fallback_workflow_strategy is an application-defined coroutine)
        return await fallback_workflow_strategy()
```

### Circuit Breaker Pattern

```typescript
class CircuitBreaker {
  private failures = 0;
  private nextAttempt = Date.now();
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';

  /**
   * @param threshold        Consecutive failures after which the circuit opens.
   * @param timeout          Milliseconds the circuit stays open.
   * @param monitoringPeriod Accepted for configuration; not read by this class.
   */
  constructor(
    private threshold = 5,
    private timeout = 60000, // 1 minute
    private monitoringPeriod = 10000 // 10 seconds
  ) {}

  /**
   * Runs `operation` unless the circuit is open. A success closes the circuit
   * and resets the failure count; a failure counts towards the threshold and
   * is re-thrown.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error('Circuit breaker is OPEN');
      }
      // The open period has elapsed: let one trial call through.
      this.state = 'HALF_OPEN';
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  private onFailure(): void {
    this.failures++;
    if (this.failures >= this.threshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.timeout;
    }
  }

  getState(): string {
    return this.state;
  }

  getFailures(): number {
    return this.failures;
  }
}

// Usage with ProvisioningClient
class ResilientProvisioningClient {
  private circuitBreaker = new 
CircuitBreaker(); constructor(private client: ProvisioningClient) {} async createServerWorkflow(config: any): Promise { return this.circuitBreaker.execute(async () => { return this.client.createServerWorkflow(config); }); } async getTaskStatus(taskId: string): Promise { return this.circuitBreaker.execute(async () => { return this.client.getTaskStatus(taskId); }); } } ``` ## Performance Optimization ### Connection Pooling and Caching ``` import asyncio import aiohttp from cachetools import TTLCache import time class OptimizedProvisioningClient: """High-performance client with connection pooling and caching""" def __init__(self, base_url: str, max_connections: int = 100): self.base_url = base_url self.session = None self.cache = TTLCache(maxsize=1000, ttl=300) # 5-minute cache self.max_connections = max_connections async def __aenter__(self): """Async context manager entry""" connector = aiohttp.TCPConnector( limit=self.max_connections, limit_per_host=20, keepalive_timeout=30, enable_cleanup_closed=True ) timeout = aiohttp.ClientTimeout(total=30, connect=5) self.session = aiohttp.ClientSession( connector=connector, timeout=timeout, headers={'User-Agent': 'ProvisioningClient/2.0.0'} ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit""" if self.session: await self.session.close() async def get_task_status_cached(self, task_id: str) -> dict: """Get task status with caching""" cache_key = f"task_status:{task_id}" # Check cache first if cache_key in self.cache: return self.cache[cache_key] # Fetch from API result = await self._make_request('GET', f'/tasks/{task_id}') # Cache completed tasks for longer if result.get('status') in ['Completed', 'Failed', 'Cancelled']: self.cache[cache_key] = result return result async def batch_get_task_status(self, task_ids: list) -> dict: """Get multiple task statuses in parallel""" tasks = [self.get_task_status_cached(task_id) for task_id in task_ids] results = await asyncio.gather(*tasks, 
return_exceptions=True) return { task_id: result for task_id, result in zip(task_ids, results) if not isinstance(result, Exception) } async def _make_request(self, method: str, endpoint: str, **kwargs): """Optimized HTTP request method""" url = f"{self.base_url}{endpoint}" start_time = time.time() async with self.session.request(method, url, **kwargs) as response: request_time = time.time() - start_time # Log slow requests if request_time > 5.0: print(f"Slow request: {method} {endpoint} took {request_time:.2f}s") response.raise_for_status() result = await response.json() if not result.get('success'): raise Exception(result.get('error', 'Request failed')) return result['data'] # Usage example async def high_performance_workflow(): async with OptimizedProvisioningClient('http://localhost:9090') as client: # Create multiple workflows in parallel workflow_tasks = [ client.create_server_workflow({'infra': f'server-{i}'}) for i in range(10) ] task_ids = await asyncio.gather(*workflow_tasks) print(f"Created {len(task_ids)} workflows") # Monitor all tasks efficiently while True: # Batch status check statuses = await client.batch_get_task_status(task_ids) completed = [ task_id for task_id, status in statuses.items() if status.get('status') in ['Completed', 'Failed', 'Cancelled'] ] print(f"Completed: {len(completed)}/{len(task_ids)}") if len(completed) == len(task_ids): break await asyncio.sleep(10) ``` ### WebSocket Connection Pooling ``` class WebSocketPool { constructor(maxConnections = 5) { this.maxConnections = maxConnections; this.connections = new Map(); this.connectionQueue = []; } async getConnection(token, eventTypes = []) { const key = `${token}:${eventTypes.sort().join(',')}`; if (this.connections.has(key)) { return this.connections.get(key); } if (this.connections.size >= this.maxConnections) { // Wait for available connection await this.waitForAvailableSlot(); } const connection = await this.createConnection(token, eventTypes); this.connections.set(key, 
connection); return connection; } async createConnection(token, eventTypes) { const ws = new WebSocket(`ws://localhost:9090/ws?token=${token}&events=${eventTypes.join(',')}`); return new Promise((resolve, reject) => { ws.onopen = () => resolve(ws); ws.onerror = (error) => reject(error); ws.onclose = () => { // Remove from pool when closed for (const [key, conn] of this.connections.entries()) { if (conn === ws) { this.connections.delete(key); break; } } }; }); } async waitForAvailableSlot() { return new Promise((resolve) => { this.connectionQueue.push(resolve); }); } releaseConnection(ws) { if (this.connectionQueue.length > 0) { const waitingResolver = this.connectionQueue.shift(); waitingResolver(); } } } ``` ## SDK Documentation ### Python SDK The Python SDK provides a comprehensive interface for provisioning: #### Installation ``` pip install provisioning-client ``` #### Quick Start ``` from provisioning_client import ProvisioningClient # Initialize client client = ProvisioningClient( base_url="http://localhost:9090", username="admin", password="password" ) # Create workflow task_id = await client.create_server_workflow( infra="production", settings="config.ncl" ) # Wait for completion task = await client.wait_for_task_completion(task_id) print(f"Workflow completed: {task.status}") ``` #### Advanced Usage ``` # Use with async context manager async with ProvisioningClient() as client: # Batch operations batch_config = { "name": "deployment", "operations": [...] 
class WorkflowPipeline:
    """Orchestrate multi-step workflows whose steps may depend on each other.

    Steps are registered with ``add_step`` and run by ``execute``. Every step
    whose dependencies have all completed is started in the same parallel
    batch. Step state is kept in ``self.steps`` as dicts with the keys
    ``name``, ``operation``, ``dependencies``, ``status``
    (``'pending'`` / ``'running'`` / ``'completed'`` / ``'failed'``),
    ``result`` and, after a failure, ``error``.

    Step statuses are never reset, so a pipeline instance is meant to be
    executed once.
    """

    def __init__(self, client: "ProvisioningClient"):
        # The client is only stored here; step operations receive it through
        # their own closures.
        self.client = client
        self.steps = []

    def add_step(self, name: str, operation: Callable, dependencies: list = None):
        """Add a step to the pipeline.

        Args:
            name: Unique step name; other steps refer to it in
                ``dependencies``.
            operation: Zero-argument callable returning an awaitable.
            dependencies: Names of steps that must complete first.

        Raises:
            ValueError: If a step with the same name was already added.
                Completion is tracked by name, so duplicates could never all
                be marked complete.
        """
        if any(step['name'] == name for step in self.steps):
            raise ValueError(f"Duplicate step name: {name}")

        self.steps.append({
            'name': name,
            'operation': operation,
            # Copy so later mutation of the caller's list cannot change the
            # dependency graph.
            'dependencies': list(dependencies or []),
            'status': 'pending',
            'result': None
        })

    async def execute(self):
        """Execute the pipeline.

        Raises:
            Exception: If a step depends on an unknown step, if no pending
                step can make progress (dependency cycle), or if a step
                fails. On failure the statuses and results of every step in
                the same parallel batch are recorded before raising, and the
                original error is chained as ``__cause__``.
        """
        known_names = {step['name'] for step in self.steps}
        for step in self.steps:
            missing = [dep for dep in step['dependencies'] if dep not in known_names]
            if missing:
                # Report this up front instead of as a misleading "deadlock".
                raise Exception(
                    f"Step {step['name']} depends on unknown step(s): {missing}"
                )

        completed_steps = set()

        while len(completed_steps) < len(self.steps):
            # Find steps ready to execute
            ready_steps = [
                step for step in self.steps
                if (step['status'] == 'pending' and
                    all(dep in completed_steps for dep in step['dependencies']))
            ]

            if not ready_steps:
                raise Exception("Pipeline deadlock detected")

            # Execute ready steps in parallel
            for step in ready_steps:
                step['status'] = 'running'

            results = await asyncio.gather(
                *(self._execute_step(step) for step in ready_steps),
                return_exceptions=True,
            )

            # Record the outcome of the whole batch before raising, so that
            # steps which succeeded next to a failing one are not left
            # 'running' with their results dropped.
            first_failure = None
            for step, result in zip(ready_steps, results):
                # BaseException also covers asyncio.CancelledError, which
                # gather() can return when return_exceptions=True.
                if isinstance(result, BaseException):
                    step['status'] = 'failed'
                    step['error'] = str(result)
                    if first_failure is None:
                        first_failure = (step, result)
                else:
                    step['status'] = 'completed'
                    step['result'] = result
                    completed_steps.add(step['name'])

            if first_failure is not None:
                failed_step, cause = first_failure
                raise Exception(
                    f"Step {failed_step['name']} failed: {cause}"
                ) from cause

    async def _execute_step(self, step):
        """Execute a single step, reporting a failure before re-raising it."""
        try:
            return await step['operation']()
        except Exception as e:
            print(f"Step {step['name']} failed: {e}")
            raise
workflow.id, taskId }); } catch (error) { workflow.status = 'failed'; workflow.error = error.message; this.emit('workflowFailed', { workflowId: workflow.id, error }); } } handleTaskStatusChange(event) { // Find workflows containing this task for (const [workflowId, workflow] of this.workflows) { const task = workflow.tasks.find(t => t.id === event.data.task_id); if (task) { task.status = event.data.status; this.updateWorkflowProgress(workflow); // Trigger next steps based on task completion if (event.data.status === 'Completed') { this.triggerNextSteps(workflow, task); } } } } updateWorkflowProgress(workflow) { const completedTasks = workflow.tasks.filter(t => ['Completed', 'Failed'].includes(t.status) ).length; workflow.progress = (completedTasks / workflow.tasks.length) * 100; if (completedTasks === workflow.tasks.length) { const failedTasks = workflow.tasks.filter(t => t.status === 'Failed'); workflow.status = failedTasks.length > 0 ? 'failed' : 'completed'; this.emit('workflowCompleted', { workflowId: workflow.id, status: workflow.status }); } } async triggerNextSteps(workflow, completedTask) { // Define workflow dependencies and next steps const nextSteps = this.getNextSteps(workflow, completedTask); for (const nextStep of nextSteps) { try { const taskId = await this.executeWorkflowStep(nextStep); workflow.tasks.push({ id: taskId, type: nextStep.type, status: 'pending', dependencies: [completedTask.id] }); } catch (error) { console.error(`Failed to trigger next step: ${error.message}`); } } } getNextSteps(workflow, completedTask) { // Define workflow logic based on completed task type switch (completedTask.type) { case 'server_creation': return [ { type: 'kubernetes_installation', taskserv: 'kubernetes' }, { type: 'monitoring_setup', taskserv: 'prometheus' } ]; case 'kubernetes_installation': return [ { type: 'networking_setup', taskserv: 'cilium' } ]; default: return []; } } } ``` This comprehensive integration documentation provides developers with 
everything needed to successfully integrate with provisioning, including complete client implementations, error handling strategies, performance optimizations, and common integration patterns.