48 KiB
48 KiB
\nimport asyncio\nimport json\nimport logging\nimport time\nimport requests\nimport websockets\nfrom typing import Dict, List, Optional, Callable\nfrom dataclasses import dataclass\nfrom enum import Enum\n\nclass TaskStatus(Enum):\n PENDING = "Pending"\n RUNNING = "Running"\n COMPLETED = "Completed"\n FAILED = "Failed"\n CANCELLED = "Cancelled"\n\n@dataclass\nclass WorkflowTask:\n id: str\n name: str\n status: TaskStatus\n created_at: str\n started_at: Optional[str] = None\n completed_at: Optional[str] = None\n output: Optional[str] = None\n error: Optional[str] = None\n progress: Optional[float] = None\n\nclass ProvisioningAPIError(Exception):\n """Base exception for provisioning API errors"""\n pass\n\nclass AuthenticationError(ProvisioningAPIError):\n """Authentication failed"""\n pass\n\nclass ValidationError(ProvisioningAPIError):\n """Request validation failed"""\n pass\n\nclass ProvisioningClient:\n """\n Complete Python client for provisioning\n\n Features:\n - REST API integration\n - WebSocket support for real-time updates\n - Automatic token refresh\n - Retry logic with exponential backoff\n - Comprehensive error handling\n """\n\n def __init__(self,\n base_url: str = "http://localhost:9090",\n auth_url: str = "http://localhost:8081",\n username: str = None,\n password: str = None,\n token: str = None):\n self.base_url = base_url\n self.auth_url = auth_url\n self.username = username\n self.password = password\n self.token = token\n self.session = requests.Session()\n self.websocket = None\n self.event_handlers = {}\n\n # Setup logging\n self.logger = logging.getLogger(__name__)\n\n # Configure session with retries\n from requests.adapters import HTTPAdapter\n from urllib3.util.retry import Retry\n\n retry_strategy = Retry(\n total=3,\n status_forcelist=[429, 500, 502, 503, 504],\n method_whitelist=["HEAD", "GET", "OPTIONS"],\n backoff_factor=1\n )\n\n adapter = HTTPAdapter(max_retries=retry_strategy)\n self.session.mount("http://", adapter)\n 
self.session.mount("https://", adapter)\n\n async def authenticate(self) -> str:\n """Authenticate and get JWT token"""\n if self.token:\n return self.token\n\n if not self.username or not self.password:\n raise AuthenticationError("Username and password required for authentication")\n\n auth_data = {\n "username": self.username,\n "password": self.password\n }\n\n try:\n response = requests.post(f"{self.auth_url}/auth/login", json=auth_data)\n response.raise_for_status()\n\n result = response.json()\n if not result.get('success'):\n raise AuthenticationError(result.get('error', 'Authentication failed'))\n\n self.token = result['data']['token']\n self.session.headers.update({\n 'Authorization': f'Bearer {self.token}'\n })\n\n self.logger.info("Authentication successful")\n return self.token\n\n except requests.RequestException as e:\n raise AuthenticationError(f"Authentication request failed: {e}")\n\n def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict:\n """Make authenticated HTTP request with error handling"""\n if not self.token:\n raise AuthenticationError("Not authenticated. 
Call authenticate() first.")\n\n url = f"{self.base_url}{endpoint}"\n\n try:\n response = self.session.request(method, url, **kwargs)\n response.raise_for_status()\n\n result = response.json()\n if not result.get('success'):\n error_msg = result.get('error', 'Request failed')\n if response.status_code == 400:\n raise ValidationError(error_msg)\n else:\n raise ProvisioningAPIError(error_msg)\n\n return result['data']\n\n except requests.RequestException as e:\n self.logger.error(f"Request failed: {method} {url} - {e}")\n raise ProvisioningAPIError(f"Request failed: {e}")\n\n # Workflow Management Methods\n\n def create_server_workflow(self,\n infra: str,\n settings: str = "config.ncl",\n check_mode: bool = False,\n wait: bool = False) -> str:\n """Create a server provisioning workflow"""\n data = {\n "infra": infra,\n "settings": settings,\n "check_mode": check_mode,\n "wait": wait\n }\n\n task_id = self._make_request("POST", "/workflows/servers/create", json=data)\n self.logger.info(f"Server workflow created: {task_id}")\n return task_id\n\n def create_taskserv_workflow(self,\n operation: str,\n taskserv: str,\n infra: str,\n settings: str = "config.ncl",\n check_mode: bool = False,\n wait: bool = False) -> str:\n """Create a task service workflow"""\n data = {\n "operation": operation,\n "taskserv": taskserv,\n "infra": infra,\n "settings": settings,\n "check_mode": check_mode,\n "wait": wait\n }\n\n task_id = self._make_request("POST", "/workflows/taskserv/create", json=data)\n self.logger.info(f"Taskserv workflow created: {task_id}")\n return task_id\n\n def create_cluster_workflow(self,\n operation: str,\n cluster_type: str,\n infra: str,\n settings: str = "config.ncl",\n check_mode: bool = False,\n wait: bool = False) -> str:\n """Create a cluster workflow"""\n data = {\n "operation": operation,\n "cluster_type": cluster_type,\n "infra": infra,\n "settings": settings,\n "check_mode": check_mode,\n "wait": wait\n }\n\n task_id = self._make_request("POST", 
"/workflows/cluster/create", json=data)\n self.logger.info(f"Cluster workflow created: {task_id}")\n return task_id\n\n def get_task_status(self, task_id: str) -> WorkflowTask:\n """Get the status of a specific task"""\n data = self._make_request("GET", f"/tasks/{task_id}")\n return WorkflowTask(\n id=data['id'],\n name=data['name'],\n status=TaskStatus(data['status']),\n created_at=data['created_at'],\n started_at=data.get('started_at'),\n completed_at=data.get('completed_at'),\n output=data.get('output'),\n error=data.get('error'),\n progress=data.get('progress')\n )\n\n def list_tasks(self, status_filter: Optional[str] = None) -> List[WorkflowTask]:\n """List all tasks, optionally filtered by status"""\n params = {}\n if status_filter:\n params['status'] = status_filter\n\n data = self._make_request("GET", "/tasks", params=params)\n return [\n WorkflowTask(\n id=task['id'],\n name=task['name'],\n status=TaskStatus(task['status']),\n created_at=task['created_at'],\n started_at=task.get('started_at'),\n completed_at=task.get('completed_at'),\n output=task.get('output'),\n error=task.get('error')\n )\n for task in data\n ]\n\n def wait_for_task_completion(self,\n task_id: str,\n timeout: int = 300,\n poll_interval: int = 5) -> WorkflowTask:\n """Wait for a task to complete"""\n start_time = time.time()\n\n while time.time() - start_time < timeout:\n task = self.get_task_status(task_id)\n\n if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:\n self.logger.info(f"Task {task_id} finished with status: {task.status}")\n return task\n\n self.logger.debug(f"Task {task_id} status: {task.status}")\n time.sleep(poll_interval)\n\n raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds")\n\n # Batch Operations\n\n def execute_batch_operation(self, batch_config: Dict) -> Dict:\n """Execute a batch operation"""\n return self._make_request("POST", "/batch/execute", json=batch_config)\n\n def get_batch_status(self, batch_id: 
str) -> Dict:\n """Get batch operation status"""\n return self._make_request("GET", f"/batch/operations/{batch_id}")\n\n def cancel_batch_operation(self, batch_id: str) -> str:\n """Cancel a running batch operation"""\n return self._make_request("POST", f"/batch/operations/{batch_id}/cancel")\n\n # System Health and Monitoring\n\n def get_system_health(self) -> Dict:\n """Get system health status"""\n return self._make_request("GET", "/state/system/health")\n\n def get_system_metrics(self) -> Dict:\n """Get system metrics"""\n return self._make_request("GET", "/state/system/metrics")\n\n # WebSocket Integration\n\n async def connect_websocket(self, event_types: List[str] = None):\n """Connect to WebSocket for real-time updates"""\n if not self.token:\n await self.authenticate()\n\n ws_url = f"ws://localhost:9090/ws?token={self.token}"\n if event_types:\n ws_url += f"&events={','.join(event_types)}"\n\n try:\n self.websocket = await websockets.connect(ws_url)\n self.logger.info("WebSocket connected")\n\n # Start listening for messages\n asyncio.create_task(self._websocket_listener())\n\n except Exception as e:\n self.logger.error(f"WebSocket connection failed: {e}")\n raise\n\n async def _websocket_listener(self):\n """Listen for WebSocket messages"""\n try:\n async for message in self.websocket:\n try:\n data = json.loads(message)\n await self._handle_websocket_message(data)\n except json.JSONDecodeError:\n self.logger.error(f"Invalid JSON received: {message}")\n except Exception as e:\n self.logger.error(f"WebSocket listener error: {e}")\n\n async def _handle_websocket_message(self, data: Dict):\n """Handle incoming WebSocket messages"""\n event_type = data.get('event_type')\n if event_type and event_type in self.event_handlers:\n for handler in self.event_handlers[event_type]:\n try:\n await handler(data)\n except Exception as e:\n self.logger.error(f"Error in event handler for {event_type}: {e}")\n\n def on_event(self, event_type: str, handler: Callable):\n 
"""Register an event handler"""\n if event_type not in self.event_handlers:\n self.event_handlers[event_type] = []\n self.event_handlers[event_type].append(handler)\n\n async def disconnect_websocket(self):\n """Disconnect from WebSocket"""\n if self.websocket:\n await self.websocket.close()\n self.websocket = None\n self.logger.info("WebSocket disconnected")\n\n# Usage Example\nasync def main():\n # Initialize client\n client = ProvisioningClient(\n username="admin",\n password="password"\n )\n\n try:\n # Authenticate\n await client.authenticate()\n\n # Create a server workflow\n task_id = client.create_server_workflow(\n infra="production",\n settings="prod-settings.ncl",\n wait=False\n )\n print(f"Server workflow created: {task_id}")\n\n # Set up WebSocket event handlers\n async def on_task_update(event):\n print(f"Task update: {event['data']['task_id']} -> {event['data']['status']}")\n\n async def on_system_health(event):\n print(f"System health: {event['data']['overall_status']}")\n\n client.on_event('TaskStatusChanged', on_task_update)\n client.on_event('SystemHealthUpdate', on_system_health)\n\n # Connect to WebSocket\n await client.connect_websocket(['TaskStatusChanged', 'SystemHealthUpdate'])\n\n # Wait for task completion\n final_task = client.wait_for_task_completion(task_id, timeout=600)\n print(f"Task completed with status: {final_task.status}")\n\n if final_task.status == TaskStatus.COMPLETED:\n print(f"Output: {final_task.output}")\n elif final_task.status == TaskStatus.FAILED:\n print(f"Error: {final_task.error}")\n\n except ProvisioningAPIError as e:\n print(f"API Error: {e}")\n except Exception as e:\n print(f"Unexpected error: {e}")\n finally:\n await client.disconnect_websocket()\n\nif __name__ == "__main__":\n asyncio.run(main())\n\n\n### Node.js/JavaScript Integration\n\n#### Complete JavaScript/TypeScript Client\n\n\nimport axios, { AxiosInstance, AxiosResponse } from 'axios';\nimport WebSocket from 'ws';\nimport { EventEmitter } from 
'events';\n\ninterface Task {\n id: string;\n name: string;\n status: 'Pending' | 'Running' | 'Completed' | 'Failed' | 'Cancelled';\n created_at: string;\n started_at?: string;\n completed_at?: string;\n output?: string;\n error?: string;\n progress?: number;\n}\n\ninterface BatchConfig {\n name: string;\n version: string;\n storage_backend: string;\n parallel_limit: number;\n rollback_enabled: boolean;\n operations: Array<{\n id: string;\n type: string;\n provider: string;\n dependencies: string[];\n [key: string]: any;\n }>;\n}\n\ninterface WebSocketEvent {\n event_type: string;\n timestamp: string;\n data: any;\n metadata: Record<string, any>;\n}\n\nclass ProvisioningClient extends EventEmitter {\n private httpClient: AxiosInstance;\n private authClient: AxiosInstance;\n private websocket?: WebSocket;\n private token?: string;\n private reconnectAttempts = 0;\n private maxReconnectAttempts = 10;\n private reconnectInterval = 5000;\n\n constructor(\n private baseUrl = 'http://localhost:9090',\n private authUrl = 'http://localhost:8081',\n private username?: string,\n private password?: string,\n token?: string\n ) {\n super();\n\n this.token = token;\n\n // Setup HTTP clients\n this.httpClient = axios.create({\n baseURL: baseUrl,\n timeout: 30000,\n });\n\n this.authClient = axios.create({\n baseURL: authUrl,\n timeout: 10000,\n });\n\n // Setup request interceptors\n this.setupInterceptors();\n }\n\n private setupInterceptors(): void {\n // Request interceptor to add auth token\n this.httpClient.interceptors.request.use((config) => {\n if (this.token) {\n config.headers.Authorization = `Bearer ${this.token}`;\n }\n return config;\n });\n\n // Response interceptor for error handling\n this.httpClient.interceptors.response.use(\n (response) => response,\n async (error) => {\n // Retry at most once per request; without this flag a persistent 401 would loop forever\n if (error.response?.status === 401 && this.username && this.password && error.config && !error.config._retried) {\n // Token expired, try to refresh\n try {\n error.config._retried = true;\n // authenticate() returns the cached token when one is set, so drop the stale one first\n this.token = undefined;\n await this.authenticate();\n // Retry the original request\n const 
originalRequest = error.config;\n originalRequest.headers.Authorization = `Bearer ${this.token}`;\n return this.httpClient.request(originalRequest);\n } catch (authError) {\n this.emit('authError', authError);\n throw error;\n }\n }\n throw error;\n }\n );\n }\n\n async authenticate(): Promise<string> {\n if (this.token) {\n return this.token;\n }\n\n if (!this.username || !this.password) {\n throw new Error('Username and password required for authentication');\n }\n\n try {\n const response = await this.authClient.post('/auth/login', {\n username: this.username,\n password: this.password,\n });\n\n const result = response.data;\n if (!result.success) {\n throw new Error(result.error || 'Authentication failed');\n }\n\n this.token = result.data.token;\n console.log('Authentication successful');\n this.emit('authenticated', this.token);\n\n return this.token;\n } catch (error) {\n console.error('Authentication failed:', error);\n throw new Error(`Authentication failed: ${error.message}`);\n }\n }\n\n private async makeRequest<T>(method: string, endpoint: string, data?: any): Promise<T> {\n try {\n const response: AxiosResponse = await this.httpClient.request({\n method,\n url: endpoint,\n data,\n });\n\n const result = response.data;\n if (!result.success) {\n throw new Error(result.error || 'Request failed');\n }\n\n return result.data;\n } catch (error) {\n console.error(`Request failed: ${method} ${endpoint}`, error);\n throw error;\n }\n }\n\n // Workflow Management Methods\n\n async createServerWorkflow(config: {\n infra: string;\n settings?: string;\n check_mode?: boolean;\n wait?: boolean;\n }): Promise<string> {\n const data = {\n infra: config.infra,\n settings: config.settings || 'config.ncl',\n check_mode: config.check_mode || false,\n wait: config.wait || false,\n };\n\n const taskId = await this.makeRequest<string>('POST', '/workflows/servers/create', data);\n console.log(`Server workflow created: ${taskId}`);\n this.emit('workflowCreated', { type: 
'server', taskId });\n return taskId;\n }\n\n async createTaskservWorkflow(config: {\n operation: string;\n taskserv: string;\n infra: string;\n settings?: string;\n check_mode?: boolean;\n wait?: boolean;\n }): Promise<string> {\n const data = {\n operation: config.operation,\n taskserv: config.taskserv,\n infra: config.infra,\n settings: config.settings || 'config.ncl',\n check_mode: config.check_mode || false,\n wait: config.wait || false,\n };\n\n const taskId = await this.makeRequest<string>('POST', '/workflows/taskserv/create', data);\n console.log(`Taskserv workflow created: ${taskId}`);\n this.emit('workflowCreated', { type: 'taskserv', taskId });\n return taskId;\n }\n\n async createClusterWorkflow(config: {\n operation: string;\n cluster_type: string;\n infra: string;\n settings?: string;\n check_mode?: boolean;\n wait?: boolean;\n }): Promise<string> {\n const data = {\n operation: config.operation,\n cluster_type: config.cluster_type,\n infra: config.infra,\n settings: config.settings || 'config.ncl',\n check_mode: config.check_mode || false,\n wait: config.wait || false,\n };\n\n const taskId = await this.makeRequest<string>('POST', '/workflows/cluster/create', data);\n console.log(`Cluster workflow created: ${taskId}`);\n this.emit('workflowCreated', { type: 'cluster', taskId });\n return taskId;\n }\n\n async getTaskStatus(taskId: string): Promise<Task> {\n return this.makeRequest<Task>('GET', `/tasks/${taskId}`);\n }\n\n async listTasks(statusFilter?: string): Promise<Task[]> {\n const params = statusFilter ? 
`?status=${statusFilter}` : '';\n return this.makeRequest<Task[]>('GET', `/tasks${params}`);\n }\n\n async waitForTaskCompletion(\n taskId: string,\n timeout = 300000, // 5 minutes\n pollInterval = 5000 // 5 seconds\n ): Promise<Task> {\n return new Promise((resolve, reject) => {\n const startTime = Date.now();\n\n const poll = async () => {\n try {\n const task = await this.getTaskStatus(taskId);\n\n if (['Completed', 'Failed', 'Cancelled'].includes(task.status)) {\n console.log(`Task ${taskId} finished with status: ${task.status}`);\n resolve(task);\n return;\n }\n\n if (Date.now() - startTime > timeout) {\n reject(new Error(`Task ${taskId} did not complete within ${timeout}ms`));\n return;\n }\n\n console.log(`Task ${taskId} status: ${task.status}`);\n this.emit('taskProgress', task);\n setTimeout(poll, pollInterval);\n } catch (error) {\n reject(error);\n }\n };\n\n poll();\n });\n }\n\n // Batch Operations\n\n async executeBatchOperation(batchConfig: BatchConfig): Promise<any> {\n const result = await this.makeRequest('POST', '/batch/execute', batchConfig);\n console.log(`Batch operation started: ${result.batch_id}`);\n this.emit('batchStarted', result);\n return result;\n }\n\n async getBatchStatus(batchId: string): Promise<any> {\n return this.makeRequest('GET', `/batch/operations/${batchId}`);\n }\n\n async cancelBatchOperation(batchId: string): Promise<string> {\n return this.makeRequest('POST', `/batch/operations/${batchId}/cancel`);\n }\n\n // System Monitoring\n\n async getSystemHealth(): Promise<any> {\n return this.makeRequest('GET', '/state/system/health');\n }\n\n async getSystemMetrics(): Promise<any> {\n return this.makeRequest('GET', '/state/system/metrics');\n }\n\n // WebSocket Integration\n\n async connectWebSocket(eventTypes?: string[]): Promise<void> {\n if (!this.token) {\n await this.authenticate();\n }\n\n // Derive the socket endpoint from baseUrl (http -> ws, https -> wss) instead of a fixed host;\n // the default baseUrl still yields ws://localhost:9090\n const wsBase = this.baseUrl.replace(/^http/, 'ws');\n let wsUrl = `${wsBase}/ws?token=${this.token}`;\n if (eventTypes && eventTypes.length > 0) {\n wsUrl += 
`&events=${eventTypes.join(',')}`;\n }\n\n return new Promise((resolve, reject) => {\n this.websocket = new WebSocket(wsUrl);\n\n this.websocket.on('open', () => {\n console.log('WebSocket connected');\n this.reconnectAttempts = 0;\n this.emit('websocketConnected');\n resolve();\n });\n\n this.websocket.on('message', (data: WebSocket.Data) => {\n try {\n const event: WebSocketEvent = JSON.parse(data.toString());\n this.handleWebSocketMessage(event);\n } catch (error) {\n console.error('Failed to parse WebSocket message:', error);\n }\n });\n\n this.websocket.on('close', (code: number, reason: string) => {\n console.log(`WebSocket disconnected: ${code} - ${reason}`);\n this.emit('websocketDisconnected', { code, reason });\n\n if (this.reconnectAttempts < this.maxReconnectAttempts) {\n setTimeout(() => {\n this.reconnectAttempts++;\n console.log(`Reconnecting... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);\n this.connectWebSocket(eventTypes);\n }, this.reconnectInterval);\n }\n });\n\n this.websocket.on('error', (error: Error) => {\n console.error('WebSocket error:', error);\n this.emit('websocketError', error);\n reject(error);\n });\n });\n }\n\n private handleWebSocketMessage(event: WebSocketEvent): void {\n console.log(`WebSocket event: ${event.event_type}`);\n\n // Emit specific event\n this.emit(event.event_type, event);\n\n // Emit general event\n this.emit('websocketMessage', event);\n\n // Handle specific event types\n switch (event.event_type) {\n case 'TaskStatusChanged':\n this.emit('taskStatusChanged', event.data);\n break;\n case 'WorkflowProgressUpdate':\n this.emit('workflowProgress', event.data);\n break;\n case 'SystemHealthUpdate':\n this.emit('systemHealthUpdate', event.data);\n break;\n case 'BatchOperationUpdate':\n this.emit('batchUpdate', event.data);\n break;\n }\n }\n\n disconnectWebSocket(): void {\n if (this.websocket) {\n this.websocket.close();\n this.websocket = undefined;\n console.log('WebSocket disconnected');\n }\n 
}\n\n // Utility Methods\n\n async healthCheck(): Promise<boolean> {\n try {\n const response = await this.httpClient.get('/health');\n return response.data.success;\n } catch (error) {\n return false;\n }\n }\n}\n\n// Usage Example\nasync function main() {\n const client = new ProvisioningClient(\n 'http://localhost:9090',\n 'http://localhost:8081',\n 'admin',\n 'password'\n );\n\n try {\n // Authenticate\n await client.authenticate();\n\n // Set up event listeners\n client.on('taskStatusChanged', (task) => {\n console.log(`Task ${task.task_id} status changed to: ${task.status}`);\n });\n\n client.on('workflowProgress', (progress) => {\n console.log(`Workflow progress: ${progress.progress}% - ${progress.current_step}`);\n });\n\n client.on('systemHealthUpdate', (health) => {\n console.log(`System health: ${health.overall_status}`);\n });\n\n // Connect WebSocket\n await client.connectWebSocket(['TaskStatusChanged', 'WorkflowProgressUpdate', 'SystemHealthUpdate']);\n\n // Create workflows\n const serverTaskId = await client.createServerWorkflow({\n infra: 'production',\n settings: 'prod-settings.ncl',\n });\n\n const taskservTaskId = await client.createTaskservWorkflow({\n operation: 'create',\n taskserv: 'kubernetes',\n infra: 'production',\n });\n\n // Wait for completion\n const [serverTask, taskservTask] = await Promise.all([\n client.waitForTaskCompletion(serverTaskId),\n client.waitForTaskCompletion(taskservTaskId),\n ]);\n\n console.log('All workflows completed');\n console.log(`Server task: ${serverTask.status}`);\n console.log(`Taskserv task: ${taskservTask.status}`);\n\n // Create batch operation\n const batchConfig: BatchConfig = {\n name: 'test_deployment',\n version: '1.0.0',\n storage_backend: 'filesystem',\n parallel_limit: 3,\n rollback_enabled: true,\n operations: [\n {\n id: 'servers',\n type: 'server_batch',\n provider: 'upcloud',\n dependencies: [],\n server_configs: [\n { name: 'web-01', plan: '1xCPU-2 GB', zone: 'de-fra1' },\n { name: 
'web-02', plan: '1xCPU-2 GB', zone: 'de-fra1' },\n ],\n },\n {\n id: 'taskservs',\n type: 'taskserv_batch',\n provider: 'upcloud',\n dependencies: ['servers'],\n taskservs: ['kubernetes', 'cilium'],\n },\n ],\n };\n\n const batchResult = await client.executeBatchOperation(batchConfig);\n console.log(`Batch operation started: ${batchResult.batch_id}`);\n\n // Monitor batch operation\n const monitorBatch = setInterval(async () => {\n try {\n const batchStatus = await client.getBatchStatus(batchResult.batch_id);\n console.log(`Batch status: ${batchStatus.status} - ${batchStatus.progress}%`);\n\n if (['Completed', 'Failed', 'Cancelled'].includes(batchStatus.status)) {\n clearInterval(monitorBatch);\n console.log(`Batch operation finished: ${batchStatus.status}`);\n }\n } catch (error) {\n console.error('Error checking batch status:', error);\n clearInterval(monitorBatch);\n }\n }, 10000);\n\n } catch (error) {\n console.error('Integration example failed:', error);\n } finally {\n client.disconnectWebSocket();\n }\n}\n\n// Run example\nif (require.main === module) {\n main().catch(console.error);\n}\n\nexport { ProvisioningClient, Task, BatchConfig };\n\n\n## Error Handling Strategies\n\n### Comprehensive Error Handling\n\n\nclass ProvisioningErrorHandler:\n """Centralized error handling for provisioning operations"""\n\n def __init__(self, client: ProvisioningClient):\n self.client = client\n self.retry_strategies = {\n 'network_error': self._exponential_backoff,\n 'rate_limit': self._rate_limit_backoff,\n 'server_error': self._server_error_strategy,\n 'auth_error': self._auth_error_strategy,\n }\n\n async def execute_with_retry(self, operation: Callable, *args, **kwargs):\n """Execute operation with intelligent retry logic"""\n max_attempts = 3\n attempt = 0\n\n while attempt < max_attempts:\n try:\n return await operation(*args, **kwargs)\n except Exception as e:\n attempt += 1\n error_type = self._classify_error(e)\n\n if attempt >= max_attempts:\n 
self._log_final_failure(operation.__name__, e, attempt)\n raise\n\n retry_strategy = self.retry_strategies.get(error_type, self._default_retry)\n wait_time = retry_strategy(attempt, e)\n\n self._log_retry_attempt(operation.__name__, e, attempt, wait_time)\n await asyncio.sleep(wait_time)\n\n def _classify_error(self, error: Exception) -> str:\n """Classify error type for appropriate retry strategy"""\n if isinstance(error, requests.ConnectionError):\n return 'network_error'\n elif isinstance(error, requests.HTTPError):\n if error.response.status_code == 429:\n return 'rate_limit'\n elif 500 <= error.response.status_code < 600:\n return 'server_error'\n elif error.response.status_code == 401:\n return 'auth_error'\n return 'unknown'\n\n def _exponential_backoff(self, attempt: int, error: Exception) -> float:\n """Exponential backoff for network errors"""\n return min(2 ** attempt + random.uniform(0, 1), 60)\n\n def _rate_limit_backoff(self, attempt: int, error: Exception) -> float:\n """Handle rate limiting with appropriate backoff"""\n retry_after = getattr(error.response, 'headers', {}).get('Retry-After')\n if retry_after:\n return float(retry_after)\n return 60 # Default to 60 seconds\n\n def _server_error_strategy(self, attempt: int, error: Exception) -> float:\n """Handle server errors"""\n return min(10 * attempt, 60)\n\n def _auth_error_strategy(self, attempt: int, error: Exception) -> float:\n """Handle authentication errors"""\n # Re-authenticate before retry\n asyncio.create_task(self.client.authenticate())\n return 5\n\n def _default_retry(self, attempt: int, error: Exception) -> float:\n """Default retry strategy"""\n return min(5 * attempt, 30)\n\n# Usage example\nasync def robust_workflow_execution():\n client = ProvisioningClient()\n handler = ProvisioningErrorHandler(client)\n\n try:\n # Execute with automatic retry\n task_id = await handler.execute_with_retry(\n client.create_server_workflow,\n infra="production",\n settings="config.ncl"\n )\n\n # 
Wait for completion with retry\n task = await handler.execute_with_retry(\n client.wait_for_task_completion,\n task_id,\n timeout=600\n )\n\n return task\n except Exception as e:\n # Log detailed error information\n logger.error(f"Workflow execution failed after all retries: {e}")\n # Implement fallback strategy\n return await fallback_workflow_strategy()\n\n\n### Circuit Breaker Pattern\n\n\nclass CircuitBreaker {\n private failures = 0;\n private nextAttempt = Date.now();\n private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';\n\n constructor(\n private threshold = 5,\n private timeout = 60000, // 1 minute\n private monitoringPeriod = 10000 // 10 seconds\n ) {}\n\n async execute<T>(operation: () => Promise<T>): Promise<T> {\n if (this.state === 'OPEN') {\n if (Date.now() < this.nextAttempt) {\n throw new Error('Circuit breaker is OPEN');\n }\n this.state = 'HALF_OPEN';\n }\n\n try {\n const result = await operation();\n this.onSuccess();\n return result;\n } catch (error) {\n this.onFailure();\n throw error;\n }\n }\n\n private onSuccess(): void {\n this.failures = 0;\n this.state = 'CLOSED';\n }\n\n private onFailure(): void {\n this.failures++;\n if (this.failures >= this.threshold) {\n this.state = 'OPEN';\n this.nextAttempt = Date.now() + this.timeout;\n }\n }\n\n getState(): string {\n return this.state;\n }\n\n getFailures(): number {\n return this.failures;\n }\n}\n\n// Usage with ProvisioningClient\nclass ResilientProvisioningClient {\n private circuitBreaker = new CircuitBreaker();\n\n constructor(private client: ProvisioningClient) {}\n\n async createServerWorkflow(config: any): Promise<string> {\n return this.circuitBreaker.execute(async () => {\n return this.client.createServerWorkflow(config);\n });\n }\n\n async getTaskStatus(taskId: string): Promise<Task> {\n return this.circuitBreaker.execute(async () => {\n return this.client.getTaskStatus(taskId);\n });\n }\n}\n\n\n## Performance Optimization\n\n### Connection Pooling and 
Caching\n\n\nimport asyncio\nimport aiohttp\nfrom cachetools import TTLCache\nimport time\n\nclass OptimizedProvisioningClient:\n """High-performance client with connection pooling and caching"""\n\n def __init__(self, base_url: str, max_connections: int = 100):\n self.base_url = base_url\n self.session = None\n self.cache = TTLCache(maxsize=1000, ttl=300) # 5-minute cache\n self.max_connections = max_connections\n\n async def __aenter__(self):\n """Async context manager entry"""\n connector = aiohttp.TCPConnector(\n limit=self.max_connections,\n limit_per_host=20,\n keepalive_timeout=30,\n enable_cleanup_closed=True\n )\n\n timeout = aiohttp.ClientTimeout(total=30, connect=5)\n\n self.session = aiohttp.ClientSession(\n connector=connector,\n timeout=timeout,\n headers={'User-Agent': 'ProvisioningClient/2.0.0'}\n )\n\n return self\n\n async def __aexit__(self, exc_type, exc_val, exc_tb):\n """Async context manager exit"""\n if self.session:\n await self.session.close()\n\n async def get_task_status_cached(self, task_id: str) -> dict:\n """Get task status with caching"""\n cache_key = f"task_status:{task_id}"\n\n # Check cache first\n if cache_key in self.cache:\n return self.cache[cache_key]\n\n # Fetch from API\n result = await self._make_request('GET', f'/tasks/{task_id}')\n\n # Cache completed tasks for longer\n if result.get('status') in ['Completed', 'Failed', 'Cancelled']:\n self.cache[cache_key] = result\n\n return result\n\n async def batch_get_task_status(self, task_ids: list) -> dict:\n """Get multiple task statuses in parallel"""\n tasks = [self.get_task_status_cached(task_id) for task_id in task_ids]\n results = await asyncio.gather(*tasks, return_exceptions=True)\n\n return {\n task_id: result for task_id, result in zip(task_ids, results)\n if not isinstance(result, Exception)\n }\n\n async def _make_request(self, method: str, endpoint: str, **kwargs):\n """Optimized HTTP request method"""\n url = f"{self.base_url}{endpoint}"\n\n start_time = 
time.time()\n async with self.session.request(method, url, **kwargs) as response:\n request_time = time.time() - start_time\n\n # Log slow requests\n if request_time > 5.0:\n print(f"Slow request: {method} {endpoint} took {request_time:.2f}s")\n\n response.raise_for_status()\n result = await response.json()\n\n if not result.get('success'):\n raise Exception(result.get('error', 'Request failed'))\n\n return result['data']\n\n# Usage example\nasync def high_performance_workflow():\n async with OptimizedProvisioningClient('http://localhost:9090') as client:\n # Create multiple workflows in parallel\n workflow_tasks = [\n client.create_server_workflow({'infra': f'server-{i}'})\n for i in range(10)\n ]\n\n task_ids = await asyncio.gather(*workflow_tasks)\n print(f"Created {len(task_ids)} workflows")\n\n # Monitor all tasks efficiently\n while True:\n # Batch status check\n statuses = await client.batch_get_task_status(task_ids)\n\n completed = [\n task_id for task_id, status in statuses.items()\n if status.get('status') in ['Completed', 'Failed', 'Cancelled']\n ]\n\n print(f"Completed: {len(completed)}/{len(task_ids)}")\n\n if len(completed) == len(task_ids):\n break\n\n await asyncio.sleep(10)\n\n\n### WebSocket Connection Pooling\n\n\nclass WebSocketPool {\n constructor(maxConnections = 5) {\n this.maxConnections = maxConnections;\n this.connections = new Map();\n this.connectionQueue = [];\n }\n\n async getConnection(token, eventTypes = []) {\n const key = `${token}:${eventTypes.sort().join(',')}`;\n\n if (this.connections.has(key)) {\n return this.connections.get(key);\n }\n\n if (this.connections.size >= this.maxConnections) {\n // Wait for available connection\n await this.waitForAvailableSlot();\n }\n\n const connection = await this.createConnection(token, eventTypes);\n this.connections.set(key, connection);\n\n return connection;\n }\n\n async createConnection(token, eventTypes) {\n const ws = new 
/**
 * Bounded pool of WebSocket connections keyed by token and event types.
 *
 * Callers asking for the same token and the same set of event types share
 * one socket. When the pool is full, getConnection() waits until a pooled
 * socket closes.
 */
class WebSocketPool {
  /**
   * @param {number} maxConnections Maximum number of sockets (open or connecting).
   * @param {string} baseUrl WebSocket endpoint; defaults to the previously hard-coded URL.
   */
  constructor(maxConnections = 5, baseUrl = 'ws://localhost:9090/ws') {
    this.maxConnections = maxConnections;
    this.baseUrl = baseUrl;
    this.connections = new Map(); // key -> open WebSocket
    this.pending = new Map(); // key -> Promise<WebSocket> still connecting
    this.connectionQueue = []; // resolvers of callers waiting for a free slot
  }

  /**
   * Return the pooled socket for (token, eventTypes), opening it if needed.
   *
   * @param {string} token Authentication token.
   * @param {string[]} eventTypes Event types to subscribe to; not modified.
   * @returns {Promise<WebSocket>}
   */
  async getConnection(token, eventTypes = []) {
    // Sort a copy: Array.prototype.sort() works in place and would
    // otherwise reorder the caller's array.
    const sortedTypes = [...eventTypes].sort();
    const key = `${token}:${sortedTypes.join(',')}`;

    // Re-evaluate after every wake-up: the key may have been created by
    // another caller, or the freed slot may already be taken again.
    for (;;) {
      if (this.connections.has(key)) {
        return this.connections.get(key);
      }
      if (this.pending.has(key)) {
        // Share the in-flight connect instead of opening a duplicate socket.
        return this.pending.get(key);
      }
      if (this.connections.size + this.pending.size < this.maxConnections) {
        break;
      }
      await this.waitForAvailableSlot();
    }

    const connecting = this.createConnection(token, sortedTypes);
    this.pending.set(key, connecting);

    try {
      const connection = await connecting;
      this.pending.delete(key);
      this.connections.set(key, connection);
      return connection;
    } catch (error) {
      // The reserved slot is free again; let one waiter retry.
      this.pending.delete(key);
      this.releaseConnection();
      throw error;
    }
  }

  /**
   * Open a socket and resolve once it is connected.
   *
   * @param {string} token Authentication token.
   * @param {string[]} eventTypes Event types to subscribe to.
   * @returns {Promise<WebSocket>} Rejects with the error event if connecting fails.
   */
  async createConnection(token, eventTypes) {
    // Tokens and event names may contain characters that are reserved in
    // a query string, so both values are percent-encoded.
    const query =
      `token=${encodeURIComponent(token)}` +
      `&events=${encodeURIComponent(eventTypes.join(','))}`;
    const ws = new WebSocket(`${this.baseUrl}?${query}`);

    return new Promise((resolve, reject) => {
      ws.onopen = () => resolve(ws);
      ws.onerror = (error) => reject(error);

      ws.onclose = () => {
        // Remove from pool when closed, then hand the freed slot to a waiter.
        for (const [key, conn] of this.connections.entries()) {
          if (conn === ws) {
            this.connections.delete(key);
            this.releaseConnection(ws);
            break;
          }
        }
      };
    });
  }

  /**
   * Resolve when releaseConnection() signals that a slot may be free.
   *
   * @returns {Promise<void>}
   */
  async waitForAvailableSlot() {
    return new Promise((resolve) => {
      this.connectionQueue.push(resolve);
    });
  }

  /**
   * Wake the longest-waiting caller, if any.
   *
   * The woken caller re-checks capacity itself, so calling this without a
   * slot actually being free cannot push the pool over its limit.
   *
   * @param {WebSocket} [ws] Socket that was released (not inspected).
   */
  releaseConnection(ws) {
    if (this.connectionQueue.length > 0) {
      const waitingResolver = this.connectionQueue.shift();
      waitingResolver();
    }
  }
}
class WorkflowPipeline:
    """Orchestrate multi-step workflows whose steps depend on one another.

    Steps are registered with ``add_step`` and run by ``execute``. All steps
    whose dependencies are satisfied run concurrently; the next wave starts
    once the current one has finished.
    """

    def __init__(self, client: "ProvisioningClient"):
        self.client = client
        self.steps = []

    def add_step(self, name: str, operation: Callable, dependencies: Optional[list] = None):
        """Add a step to the pipeline.

        Args:
            name: Unique step name; other steps refer to it in ``dependencies``.
            operation: Zero-argument callable returning an awaitable.
            dependencies: Names of steps that must complete first.

        Raises:
            ValueError: If a step with the same name already exists.
        """
        # Completion is tracked by name, so a duplicate name could never be
        # told apart and the pipeline would end in a bogus "deadlock".
        if any(step['name'] == name for step in self.steps):
            raise ValueError(f"Duplicate step name: {name}")

        self.steps.append({
            'name': name,
            'operation': operation,
            # Copy so later changes to the caller's list do not leak in.
            'dependencies': list(dependencies or []),
            'status': 'pending',
            'result': None,
        })

    async def execute(self):
        """Execute the pipeline.

        Every call starts a fresh run: step statuses and results from a
        previous run are cleared first.

        Raises:
            Exception: If a step fails (chained to the original error), or
                if the remaining steps have dependencies that can never be
                satisfied.
        """
        for step in self.steps:
            step['status'] = 'pending'
            step['result'] = None
            step.pop('error', None)

        completed_steps = set()

        while len(completed_steps) < len(self.steps):
            # Find steps ready to execute
            ready_steps = [
                step for step in self.steps
                if step['status'] == 'pending'
                and all(dep in completed_steps for dep in step['dependencies'])
            ]

            if not ready_steps:
                blocked = [
                    step['name'] for step in self.steps
                    if step['status'] == 'pending'
                ]
                raise Exception(
                    "Pipeline deadlock detected: "
                    f"unsatisfiable dependencies for steps {blocked}"
                )

            # Execute ready steps in parallel
            for step in ready_steps:
                step['status'] = 'running'

            results = await asyncio.gather(
                *(self._execute_step(step) for step in ready_steps),
                return_exceptions=True,
            )

            # Record the outcome of every step in the wave before raising,
            # so siblings of a failed step are not left in 'running'.
            first_failure = None
            for step, result in zip(ready_steps, results):
                # BaseException: a cancelled step is returned by gather()
                # as CancelledError, which is not an Exception subclass.
                if isinstance(result, BaseException):
                    step['status'] = 'failed'
                    step['error'] = str(result)
                    if first_failure is None:
                        first_failure = (step, result)
                else:
                    step['status'] = 'completed'
                    step['result'] = result
                    completed_steps.add(step['name'])

            if first_failure is not None:
                failed_step, cause = first_failure
                raise Exception(
                    f"Step {failed_step['name']} failed: {cause}"
                ) from cause

    async def _execute_step(self, step):
        """Run one step's operation, reporting a failure before re-raising."""
        try:
            return await step['operation']()
        except Exception as e:
            print(f"Step {step['name']} failed: {e}")
            raise
/**
 * Drives multi-task workflows from events published by the client.
 *
 * The manager subscribes to client events, tracks the tasks that belong to
 * each workflow, starts follow-up tasks when a task completes, and emits
 * its own events ('workflowStarted', 'workflowFailed', 'workflowCompleted',
 * 'workflowProgress', 'systemHealth') to listeners registered with on().
 */
class EventDrivenWorkflowManager {
  /**
   * @param {object} client Client exposing on(eventName, handler) and the
   *   workflow-creation methods used below.
   */
  constructor(client) {
    this.client = client;
    this.workflows = new Map();
    this.listeners = new Map(); // eventName -> array of listener functions
    this.setupEventHandlers();
  }

  /**
   * Register a listener for an event emitted by this manager.
   *
   * @param {string} eventName
   * @param {Function} listener Called with the event payload.
   * @returns {EventDrivenWorkflowManager} this, for chaining.
   */
  on(eventName, listener) {
    if (!this.listeners.has(eventName)) {
      this.listeners.set(eventName, []);
    }
    this.listeners.get(eventName).push(listener);
    return this;
  }

  /**
   * Call every listener registered for eventName with payload.
   *
   * @returns {boolean} true if at least one listener was registered.
   */
  emit(eventName, payload) {
    const registered = this.listeners.get(eventName) || [];
    // Iterate over a copy so a listener may register further listeners.
    for (const listener of [...registered]) {
      listener(payload);
    }
    return registered.length > 0;
  }

  /** Subscribe to the client events this manager reacts to. */
  setupEventHandlers() {
    this.client.on('TaskStatusChanged', this.handleTaskStatusChange.bind(this));
    this.client.on('WorkflowProgressUpdate', this.handleProgressUpdate.bind(this));
    this.client.on('SystemHealthUpdate', this.handleHealthUpdate.bind(this));
  }

  /**
   * Register a new workflow and start it.
   *
   * @param {object} config Passed to client.createServerWorkflow().
   * @returns {Promise<string>} Id of the new workflow.
   */
  async createWorkflow(config) {
    // generateUUID is not defined in this class; it must be provided by
    // the surrounding module.
    const workflowId = generateUUID();
    const workflow = {
      id: workflowId,
      config,
      tasks: [],
      status: 'pending',
      progress: 0,
      events: []
    };

    this.workflows.set(workflowId, workflow);

    // Start workflow execution
    await this.executeWorkflow(workflow);

    return workflowId;
  }

  /**
   * Create the initial task of a workflow. Failures are recorded on the
   * workflow and reported through 'workflowFailed' rather than thrown.
   */
  async executeWorkflow(workflow) {
    try {
      workflow.status = 'running';

      // Create initial tasks based on configuration
      const taskId = await this.client.createServerWorkflow(workflow.config);
      workflow.tasks.push({
        id: taskId,
        type: 'server_creation',
        status: 'pending'
      });

      this.emit('workflowStarted', { workflowId: workflow.id, taskId });
    } catch (error) {
      workflow.status = 'failed';
      workflow.error = error.message;
      this.emit('workflowFailed', { workflowId: workflow.id, error });
    }
  }

  /**
   * Apply a task status change to every workflow containing that task.
   *
   * Follow-up steps are appended BEFORE progress is recomputed; otherwise
   * a workflow would be reported as completed as soon as its first task
   * finished.
   *
   * @param {{data: {task_id: string, status: string}}} event
   */
  async handleTaskStatusChange(event) {
    try {
      const { task_id: taskId, status } = event.data;

      for (const workflow of this.workflows.values()) {
        const task = workflow.tasks.find((t) => t.id === taskId);
        // Skip unrelated workflows and repeated notifications of the same
        // status, which would trigger the follow-up steps a second time.
        if (!task || task.status === status) {
          continue;
        }

        task.status = status;

        // Trigger next steps based on task completion
        if (status === 'Completed') {
          await this.triggerNextSteps(workflow, task);
        }

        this.updateWorkflowProgress(workflow);
      }
    } catch (error) {
      // This runs as an event handler: nothing awaits the returned
      // promise, so errors are reported here instead of being lost.
      console.error(`Failed to handle task status change: ${error.message}`);
    }
  }

  /** Forward a client progress event to this manager's listeners. */
  handleProgressUpdate(event) {
    this.emit('workflowProgress', event);
  }

  /** Forward a client health event to this manager's listeners. */
  handleHealthUpdate(event) {
    this.emit('systemHealth', event);
  }

  /**
   * Recompute workflow.progress and, once every task is in a terminal
   * state, set the final status and emit 'workflowCompleted'.
   */
  updateWorkflowProgress(workflow) {
    const terminalStatuses = ['Completed', 'Failed', 'Cancelled'];
    const totalTasks = workflow.tasks.length;

    // No tasks yet: avoid 0 / 0 (NaN) and do not report completion.
    if (totalTasks === 0) {
      workflow.progress = 0;
      return;
    }

    const finishedTasks = workflow.tasks.filter((t) =>
      terminalStatuses.includes(t.status)
    ).length;

    workflow.progress = (finishedTasks / totalTasks) * 100;

    if (finishedTasks !== totalTasks) {
      return;
    }

    if (workflow.tasks.some((t) => t.status === 'Failed')) {
      workflow.status = 'failed';
    } else if (workflow.tasks.some((t) => t.status === 'Cancelled')) {
      workflow.status = 'cancelled';
    } else {
      workflow.status = 'completed';
    }

    this.emit('workflowCompleted', {
      workflowId: workflow.id,
      status: workflow.status
    });
  }

  /**
   * Start the steps that follow completedTask and add them to the workflow.
   *
   * A step that cannot be started is recorded as a 'Failed' task so the
   * workflow still reaches a final state instead of stalling.
   */
  async triggerNextSteps(workflow, completedTask) {
    // Define workflow dependencies and next steps
    const nextSteps = this.getNextSteps(workflow, completedTask);

    for (const nextStep of nextSteps) {
      try {
        const taskId = await this.executeWorkflowStep(nextStep, workflow);
        workflow.tasks.push({
          id: taskId,
          type: nextStep.type,
          status: 'pending',
          dependencies: [completedTask.id]
        });
      } catch (error) {
        console.error(`Failed to trigger next step: ${error.message}`);
        workflow.tasks.push({
          id: null,
          type: nextStep.type,
          status: 'Failed',
          error: error.message,
          dependencies: [completedTask.id]
        });
      }
    }
  }

  /**
   * Start a single follow-up step and return the id of the created task.
   *
   * NOTE(review): assumes the client exposes createTaskservWorkflow(),
   * the camelCase counterpart of the Python client's
   * create_taskserv_workflow(); confirm against the JavaScript SDK.
   */
  async executeWorkflowStep(step, workflow) {
    return this.client.createTaskservWorkflow({
      operation: 'create',
      taskserv: step.taskserv,
      infra: workflow.config.infra
    });
  }

  /**
   * Return the steps that follow a completed task, based on its type.
   *
   * @returns {Array<{type: string, taskserv: string}>}
   */
  getNextSteps(workflow, completedTask) {
    // Define workflow logic based on completed task type
    switch (completedTask.type) {
      case 'server_creation':
        return [
          { type: 'kubernetes_installation', taskserv: 'kubernetes' },
          { type: 'monitoring_setup', taskserv: 'prometheus' }
        ];
      case 'kubernetes_installation':
        return [
          { type: 'networking_setup', taskserv: 'cilium' }
        ];
      default:
        return [];
    }
  }
}
successfully integrate with provisioning, including\ncomplete client implementations, error-handling strategies, performance optimizations, and common integration patterns.