#!/usr/bin/env python3 """ Provisioning API Python Client Example """ import os import sys import time import json import requests from typing import Dict, Optional, Any class ProvisioningClient: """Client for Provisioning API""" def __init__(self, base_url: str = "http://localhost:8083", username: str = "admin", password: str = "admin123"): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.token: Optional[str] = None self.refresh_token: Optional[str] = None def login(self) -> Dict[str, Any]: """Login and get access token""" url = f"{self.base_url}/v1/auth/login" data = { "username": self.username, "password": self.password } response = requests.post(url, json=data) response.raise_for_status() result = response.json() self.token = result["token"] self.refresh_token = result["refresh_token"] return result def refresh_access_token(self) -> Dict[str, Any]: """Refresh access token""" if not self.refresh_token: raise ValueError("No refresh token available") url = f"{self.base_url}/v1/auth/refresh" data = {"refresh_token": self.refresh_token} response = requests.post(url, json=data) response.raise_for_status() result = response.json() self.token = result["token"] self.refresh_token = result["refresh_token"] return result def _headers(self) -> Dict[str, str]: """Get authorization headers""" if not self.token: self.login() return { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } def health_check(self) -> Dict[str, Any]: """Check server health""" url = f"{self.base_url}/health" response = requests.get(url) response.raise_for_status() return response.json() def list_servers(self) -> list: """List all servers""" url = f"{self.base_url}/v1/servers" response = requests.get(url, headers=self._headers()) response.raise_for_status() return response.json() def create_server(self, workspace: str, provider: str, plan: str, hostname: str, zone: Optional[str] = None, check_mode: bool = False, tags: Optional[list] = None) -> Dict[str, Any]: """Create a new server""" url = f"{self.base_url}/v1/servers/create" data = { "workspace": workspace, "provider": provider, "plan": plan, "hostname": hostname, "check_mode": check_mode } if zone: data["zone"] = zone if tags: data["tags"] = tags response = requests.post(url, headers=self._headers(), json=data) response.raise_for_status() return response.json() def delete_server(self, server_id: str, workspace: str, force: bool = False) -> Dict[str, Any]: """Delete a server""" url = f"{self.base_url}/v1/servers/{server_id}" data = { "workspace": workspace, "server_id": server_id, "force": force } response = requests.delete(url, headers=self._headers(), json=data) response.raise_for_status() return response.json() def list_taskservs(self) -> list: """List all taskservs""" url = f"{self.base_url}/v1/taskservs" response = requests.get(url, headers=self._headers()) response.raise_for_status() return response.json() def create_taskserv(self, workspace: str, name: str, servers: list, config: Optional[Dict] = None, check_mode: bool = False) -> Dict[str, Any]: """Create a new taskserv""" url = f"{self.base_url}/v1/taskservs/create" data = { "workspace": workspace, "name": name, "servers": servers, "check_mode": check_mode } if config: data["config"] = config response = requests.post(url, headers=self._headers(), json=data) response.raise_for_status() return response.json() def submit_workflow(self, workspace: str, workflow_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]: """Submit a workflow""" url = f"{self.base_url}/v1/workflows/submit" data = { "workspace": workspace, "workflow_type": workflow_type, "parameters": parameters } response = requests.post(url, headers=self._headers(), json=data) response.raise_for_status() return response.json() def get_operation(self, operation_id: str) -> Dict[str, Any]: """Get operation status""" url = f"{self.base_url}/v1/operations/{operation_id}" response = requests.get(url, headers=self._headers()) response.raise_for_status() return response.json() def wait_for_operation(self, operation_id: str, timeout: int = 300, interval: int = 5) -> Dict[str, Any]: """Wait for operation to complete""" start_time = time.time() while time.time() - start_time < timeout: operation = self.get_operation(operation_id) status = operation["status"] if status == "completed": return operation elif status == "failed": raise Exception(f"Operation failed: {operation.get('error')}") elif status == "cancelled": raise Exception("Operation was cancelled") time.sleep(interval) raise TimeoutError(f"Operation timed out after {timeout} seconds") def list_operations(self) -> list: """List all operations""" url = f"{self.base_url}/v1/operations" response = requests.get(url, headers=self._headers()) response.raise_for_status() return response.json() def cancel_operation(self, operation_id: str) -> Dict[str, Any]: """Cancel an operation""" url = f"{self.base_url}/v1/operations/{operation_id}/cancel" response = requests.post(url, headers=self._headers()) response.raise_for_status() return response.json() def main(): """Example usage""" # Get configuration from environment api_url = os.getenv("API_URL", "http://localhost:8083") username = os.getenv("USERNAME", "admin") password = os.getenv("PASSWORD", "admin123") # Create client client = ProvisioningClient(api_url, username, password) # Health check print("Checking server health...") health = client.health_check() print(f"Server status: {health['status']}") print(f"Version: {health['version']}") print() # Login print("Logging in...") client.login() print("Login successful") print() # List servers print("Listing servers...") servers = client.list_servers() print(f"Found {len(servers)} servers") for server in servers: print(f" - {server.get('hostname', 'unknown')}") print() # Create server (check mode) print("Creating server (check mode)...") operation = client.create_server( workspace="default", provider="upcloud", plan="1xCPU-2GB", hostname="test-server", zone="de-fra1", check_mode=True, tags=["test", "api"] ) operation_id = operation["id"] print(f"Operation ID: {operation_id}") print() # Wait for operation to complete print("Waiting for operation to complete...") try: result = client.wait_for_operation(operation_id, timeout=60) print(f"Operation completed: {result['status']}") if result.get('result'): print(f"Result: {json.dumps(result['result'], indent=2)}") except Exception as e: print(f"Error: {e}") print() # List operations print("Listing recent operations...") operations = client.list_operations() print(f"Found {len(operations)} operations") for op in operations[:5]: # Show last 5 print(f" - {op['id']}: {op['status']}") print() if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\nInterrupted by user") sys.exit(0) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1)