2025-10-07 10:59:52 +01:00

272 lines
8.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Provisioning API Python Client Example
"""
import os
import sys
import time
import json
import requests
from typing import Dict, Optional, Any
class ProvisioningClient:
    """Client for Provisioning API.

    Each public method wraps one HTTP endpoint under ``base_url`` and returns
    the decoded JSON body of the response. Authenticated endpoints send a
    bearer token; if no token has been obtained yet, the client logs in first.

    Every HTTP call raises ``requests.HTTPError`` on an error status (via
    ``raise_for_status``) and may raise other ``requests`` exceptions on
    connection problems or when ``timeout`` elapses.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:8083",
        username: str = "admin",
        password: str = "admin123",
        timeout: Optional[float] = 30.0,
    ):
        """Store connection settings; no network traffic happens here.

        Args:
            base_url: Root URL of the API. Trailing slashes are removed so
                paths can be appended directly.
            username: Account name sent to the login endpoint.
            password: Password sent to the login endpoint.
            timeout: Seconds passed to ``requests`` as the per-call timeout.
                ``None`` disables the limit, in which case a stalled server
                can block the caller indefinitely.
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.timeout = timeout
        # Both tokens are populated by login() / refresh_access_token().
        self.token: Optional[str] = None
        self.refresh_token: Optional[str] = None

    def _request(
        self,
        method: str,
        path: str,
        *,
        auth: bool = True,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Send one HTTP request and return the decoded JSON response.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: Path appended to ``base_url`` (must start with ``/``).
            auth: When true, attach the headers from ``_headers()``.
            payload: Optional JSON-serialisable request body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        kwargs: Dict[str, Any] = {"timeout": self.timeout}
        if auth:
            kwargs["headers"] = self._headers()
        if payload is not None:
            kwargs["json"] = payload
        response = requests.request(method, f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

    def _store_tokens(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Remember the token pair from an auth response and return it."""
        self.token = result["token"]
        self.refresh_token = result["refresh_token"]
        return result

    def login(self) -> Dict[str, Any]:
        """Login and get access token.

        Posts the stored username and password to ``/v1/auth/login`` and
        keeps the ``token`` and ``refresh_token`` fields of the response.

        Returns:
            The full decoded login response.
        """
        credentials = {
            "username": self.username,
            "password": self.password
        }
        return self._store_tokens(
            self._request("POST", "/v1/auth/login", auth=False, payload=credentials)
        )

    def refresh_access_token(self) -> Dict[str, Any]:
        """Refresh access token.

        Posts the stored refresh token to ``/v1/auth/refresh`` and replaces
        both stored tokens with the ones in the response.

        Raises:
            ValueError: If no refresh token has been stored yet.
        """
        if not self.refresh_token:
            raise ValueError("No refresh token available")
        return self._store_tokens(
            self._request(
                "POST",
                "/v1/auth/refresh",
                auth=False,
                payload={"refresh_token": self.refresh_token},
            )
        )

    def _headers(self) -> Dict[str, str]:
        """Get authorization headers, logging in first if there is no token."""
        if not self.token:
            self.login()
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def health_check(self) -> Dict[str, Any]:
        """Check server health (``GET /health``, sent without auth headers)."""
        return self._request("GET", "/health", auth=False)

    def list_servers(self) -> list:
        """List all servers (``GET /v1/servers``)."""
        return self._request("GET", "/v1/servers")

    def create_server(self, workspace: str, provider: str, plan: str, hostname: str,
                      zone: Optional[str] = None, check_mode: bool = False,
                      tags: Optional[list] = None) -> Dict[str, Any]:
        """Create a new server (``POST /v1/servers/create``).

        ``zone`` and ``tags`` are only included in the request body when they
        are truthy; the other arguments are always sent.
        """
        data: Dict[str, Any] = {
            "workspace": workspace,
            "provider": provider,
            "plan": plan,
            "hostname": hostname,
            "check_mode": check_mode
        }
        if zone:
            data["zone"] = zone
        if tags:
            data["tags"] = tags
        return self._request("POST", "/v1/servers/create", payload=data)

    def delete_server(self, server_id: str, workspace: str, force: bool = False) -> Dict[str, Any]:
        """Delete a server (``DELETE /v1/servers/{server_id}``).

        The workspace, server id and ``force`` flag are sent as a JSON body
        in addition to the id in the URL.
        """
        data = {
            "workspace": workspace,
            "server_id": server_id,
            "force": force
        }
        return self._request("DELETE", f"/v1/servers/{server_id}", payload=data)

    def list_taskservs(self) -> list:
        """List all taskservs (``GET /v1/taskservs``)."""
        return self._request("GET", "/v1/taskservs")

    def create_taskserv(self, workspace: str, name: str, servers: list,
                        config: Optional[Dict] = None, check_mode: bool = False) -> Dict[str, Any]:
        """Create a new taskserv (``POST /v1/taskservs/create``).

        ``config`` is only included in the request body when it is truthy.
        """
        data: Dict[str, Any] = {
            "workspace": workspace,
            "name": name,
            "servers": servers,
            "check_mode": check_mode
        }
        if config:
            data["config"] = config
        return self._request("POST", "/v1/taskservs/create", payload=data)

    def submit_workflow(self, workspace: str, workflow_type: str,
                        parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Submit a workflow (``POST /v1/workflows/submit``)."""
        data = {
            "workspace": workspace,
            "workflow_type": workflow_type,
            "parameters": parameters
        }
        return self._request("POST", "/v1/workflows/submit", payload=data)

    def get_operation(self, operation_id: str) -> Dict[str, Any]:
        """Get operation status (``GET /v1/operations/{operation_id}``)."""
        return self._request("GET", f"/v1/operations/{operation_id}")

    def wait_for_operation(self, operation_id: str, timeout: int = 300, interval: int = 5) -> Dict[str, Any]:
        """Wait for operation to complete.

        Polls ``get_operation`` until its ``status`` field is ``"completed"``,
        ``"failed"`` or ``"cancelled"``. Any other status means "keep waiting".
        The operation is always checked at least once, and the pause between
        polls is capped so the total wait does not overshoot ``timeout``.

        Args:
            operation_id: Identifier of the operation to watch.
            timeout: Maximum number of seconds to wait.
            interval: Seconds to pause between polls.

        Returns:
            The operation record whose status is ``"completed"``.

        Raises:
            RuntimeError: If the operation reports ``"failed"`` or
                ``"cancelled"``.
            TimeoutError: If the deadline passes before a final status is seen.
        """
        # A monotonic clock keeps the deadline stable if the system time changes.
        deadline = time.monotonic() + timeout
        while True:
            operation = self.get_operation(operation_id)
            status = operation["status"]
            if status == "completed":
                return operation
            if status == "failed":
                raise RuntimeError(f"Operation failed: {operation.get('error')}")
            if status == "cancelled":
                raise RuntimeError("Operation was cancelled")
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline; the next iteration does a last check.
            time.sleep(min(interval, remaining))
        raise TimeoutError(f"Operation timed out after {timeout} seconds")

    def list_operations(self) -> list:
        """List all operations (``GET /v1/operations``)."""
        return self._request("GET", "/v1/operations")

    def cancel_operation(self, operation_id: str) -> Dict[str, Any]:
        """Cancel an operation (``POST /v1/operations/{operation_id}/cancel``)."""
        return self._request("POST", f"/v1/operations/{operation_id}/cancel")
def main():
    """Example usage.

    Runs a short demonstration against the API: health check, login, server
    listing, a check-mode server creation, waiting for that operation, and a
    listing of operations. Progress is printed to stdout.

    Connection settings are read from the ``API_URL``, ``USERNAME`` and
    ``PASSWORD`` environment variables, with local defaults when unset.
    """
    # Get configuration from environment
    api_url = os.getenv("API_URL", "http://localhost:8083")
    # NOTE(review): USERNAME is commonly pre-set by the OS or shell to the
    # login name, which would override the "admin" default - confirm that this
    # is the intended variable name.
    username = os.getenv("USERNAME", "admin")
    password = os.getenv("PASSWORD", "admin123")

    # Create client
    client = ProvisioningClient(api_url, username, password)

    # Health check
    print("Checking server health...")
    health = client.health_check()
    print(f"Server status: {health['status']}")
    print(f"Version: {health['version']}")
    print()

    # Login
    print("Logging in...")
    client.login()
    print("Login successful")
    print()

    # List servers
    print("Listing servers...")
    servers = client.list_servers()
    print(f"Found {len(servers)} servers")
    for server in servers:
        print(f" - {server.get('hostname', 'unknown')}")
    print()

    # Create server (check mode)
    print("Creating server (check mode)...")
    operation = client.create_server(
        workspace="default",
        provider="upcloud",
        plan="1xCPU-2GB",
        hostname="test-server",
        zone="de-fra1",
        check_mode=True,
        tags=["test", "api"]
    )
    operation_id = operation["id"]
    print(f"Operation ID: {operation_id}")
    print()

    # Wait for operation to complete
    print("Waiting for operation to complete...")
    try:
        result = client.wait_for_operation(operation_id, timeout=60)
        print(f"Operation completed: {result['status']}")
        if result.get('result'):
            print(f"Result: {json.dumps(result['result'], indent=2)}")
    except Exception as e:
        # Best effort: a failed, cancelled or timed-out operation is reported
        # and the demonstration continues with the operations listing.
        print(f"Error: {e}")
    print()

    # List operations
    print("Listing recent operations...")
    operations = client.list_operations()
    print(f"Found {len(operations)} operations")
    for op in operations[:5]:  # Show at most the first five entries returned
        print(f" - {op['id']}: {op['status']}")
    print()
# Script entry point: run the example and map the outcome to an exit status.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal, user-requested stop (exit status 0).
        print("\nInterrupted by user")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report any other failure on stderr, exit status 1.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)