#!/usr/bin/env python3
"""
Provisioning API Python Client Example
"""

import json
import os
import sys
import time
from typing import Dict, Optional, Any
from urllib.parse import quote

import requests


class ProvisioningClient:
    """Client for the Provisioning API.

    Wraps the REST endpoints used by this example (``/health`` and the
    ``/v1/...`` routes) and remembers the bearer token obtained at login.
    Every HTTP call goes through :meth:`_request`, which applies a request
    timeout and raises ``requests.HTTPError`` on 4xx/5xx responses.
    """

    def __init__(self, base_url: str = "http://localhost:8083", username: str = "admin",
                 password: str = "admin123", request_timeout: Optional[float] = 30.0):
        """Store connection settings; no network traffic happens here.

        Args:
            base_url: Root URL of the API; a trailing ``/`` is stripped.
            username: Login name sent to ``/v1/auth/login``.
            password: Password sent to ``/v1/auth/login``.
            request_timeout: Seconds passed to ``requests`` as ``timeout`` for
                every call. ``None`` disables the timeout (the previous
                behaviour, which can block forever on a stalled server).
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.request_timeout = request_timeout
        self.token: Optional[str] = None
        self.refresh_token: Optional[str] = None

    @staticmethod
    def _segment(value: Any) -> str:
        """Percent-encode *value* so it is safe as a single URL path segment."""
        # safe="" also encodes "/", so an ID can never add extra path levels.
        return quote(str(value), safe="")

    def _request(self, method: str, path: str, *, auth: bool = True,
                 payload: Optional[Dict[str, Any]] = None) -> Any:
        """Send one HTTP request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: Path appended to ``base_url`` (must start with ``/``).
            auth: When true, send the headers from :meth:`_headers`
                (which logs in first if no token is stored).
            payload: Optional JSON-serialisable request body.

        Raises:
            requests.HTTPError: The server answered with a 4xx/5xx status.
            requests.Timeout: No response within ``request_timeout`` seconds.
        """
        headers = self._headers() if auth else None
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers=headers,
            json=payload,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def _store_tokens(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Copy ``token`` and ``refresh_token`` from an auth response onto self."""
        self.token = result["token"]
        self.refresh_token = result["refresh_token"]
        return result

    def login(self) -> Dict[str, Any]:
        """Log in with the stored credentials and keep the returned tokens.

        Returns:
            The decoded login response.

        Raises:
            KeyError: The response lacks ``token`` or ``refresh_token``.
        """
        credentials = {
            "username": self.username,
            "password": self.password,
        }
        return self._store_tokens(
            self._request("POST", "/v1/auth/login", auth=False, payload=credentials)
        )

    def refresh_access_token(self) -> Dict[str, Any]:
        """Exchange the stored refresh token for a new token pair.

        Raises:
            ValueError: No refresh token is stored (login has not happened).
            KeyError: The response lacks ``token`` or ``refresh_token``.
        """
        if not self.refresh_token:
            raise ValueError("No refresh token available")

        return self._store_tokens(
            self._request(
                "POST",
                "/v1/auth/refresh",
                auth=False,
                payload={"refresh_token": self.refresh_token},
            )
        )

    def _headers(self) -> Dict[str, str]:
        """Return bearer-auth JSON headers, logging in first if needed."""
        if not self.token:
            self.login()

        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    def health_check(self) -> Dict[str, Any]:
        """Fetch ``/health``; this endpoint is called without credentials."""
        return self._request("GET", "/health", auth=False)

    def list_servers(self) -> list:
        """Return the decoded response of ``GET /v1/servers``."""
        return self._request("GET", "/v1/servers")

    def create_server(self, workspace: str, provider: str, plan: str, hostname: str,
                      zone: Optional[str] = None, check_mode: bool = False,
                      tags: Optional[list] = None) -> Dict[str, Any]:
        """Request creation of a server via ``POST /v1/servers/create``.

        ``zone`` and ``tags`` are only included in the request body when they
        are truthy; the other arguments are always sent under their own names.

        Returns:
            The decoded response body.
        """
        data: Dict[str, Any] = {
            "workspace": workspace,
            "provider": provider,
            "plan": plan,
            "hostname": hostname,
            "check_mode": check_mode,
        }

        if zone:
            data["zone"] = zone
        if tags:
            data["tags"] = tags

        return self._request("POST", "/v1/servers/create", payload=data)

    def delete_server(self, server_id: str, workspace: str, force: bool = False) -> Dict[str, Any]:
        """Delete a server via ``DELETE /v1/servers/{server_id}``.

        The ID is sent both in the URL path (percent-encoded) and, unchanged,
        in the JSON body together with ``workspace`` and ``force``.
        """
        data = {
            "workspace": workspace,
            "server_id": server_id,
            "force": force,
        }
        return self._request(
            "DELETE", f"/v1/servers/{self._segment(server_id)}", payload=data
        )

    def list_taskservs(self) -> list:
        """Return the decoded response of ``GET /v1/taskservs``."""
        return self._request("GET", "/v1/taskservs")

    def create_taskserv(self, workspace: str, name: str, servers: list,
                        config: Optional[Dict] = None, check_mode: bool = False) -> Dict[str, Any]:
        """Request creation of a taskserv via ``POST /v1/taskservs/create``.

        ``config`` is only included in the request body when it is truthy.
        """
        data: Dict[str, Any] = {
            "workspace": workspace,
            "name": name,
            "servers": servers,
            "check_mode": check_mode,
        }

        if config:
            data["config"] = config

        return self._request("POST", "/v1/taskservs/create", payload=data)

    def submit_workflow(self, workspace: str, workflow_type: str,
                        parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Submit a workflow via ``POST /v1/workflows/submit``."""
        data = {
            "workspace": workspace,
            "workflow_type": workflow_type,
            "parameters": parameters,
        }
        return self._request("POST", "/v1/workflows/submit", payload=data)

    def get_operation(self, operation_id: str) -> Dict[str, Any]:
        """Fetch one operation via ``GET /v1/operations/{operation_id}``."""
        return self._request("GET", f"/v1/operations/{self._segment(operation_id)}")

    def wait_for_operation(self, operation_id: str, timeout: int = 300, interval: int = 5) -> Dict[str, Any]:
        """Poll an operation until it reaches a final status.

        Args:
            operation_id: ID passed to :meth:`get_operation` on every poll.
            timeout: Maximum number of seconds to keep polling.
            interval: Seconds to pause between polls.

        Returns:
            The operation document whose ``status`` is ``"completed"``.

        Raises:
            RuntimeError: ``status`` became ``"failed"`` or ``"cancelled"``.
            TimeoutError: No final status was seen within ``timeout`` seconds.
            KeyError: A polled operation has no ``status`` key.
        """
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            operation = self.get_operation(operation_id)
            status = operation["status"]

            if status == "completed":
                return operation
            if status == "failed":
                raise RuntimeError(f"Operation failed: {operation.get('error')}")
            if status == "cancelled":
                raise RuntimeError("Operation was cancelled")

            # Never sleep past the deadline, so the timeout is honoured even
            # when interval is larger than the time that is left.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(interval, remaining)))

        raise TimeoutError(f"Operation timed out after {timeout} seconds")

    def list_operations(self) -> list:
        """Return the decoded response of ``GET /v1/operations``."""
        return self._request("GET", "/v1/operations")

    def cancel_operation(self, operation_id: str) -> Dict[str, Any]:
        """Cancel an operation via ``POST /v1/operations/{operation_id}/cancel``."""
        return self._request(
            "POST", f"/v1/operations/{self._segment(operation_id)}/cancel"
        )


def main():
    """Walk through a demo session against the Provisioning API.

    Reads API_URL, USERNAME and PASSWORD from the environment (with local
    defaults), then: checks health, logs in, lists servers, submits a
    check-mode server creation, waits for that operation, and lists
    operations. Progress is printed to stdout.
    """
    # NOTE(review): USERNAME is often pre-set by the OS or shell, which would
    # silently override the "admin" default -- confirm this is intended.
    env_defaults = (
        ("API_URL", "http://localhost:8083"),
        ("USERNAME", "admin"),
        ("PASSWORD", "admin123"),
    )
    api_url, username, password = (os.getenv(key, fallback) for key, fallback in env_defaults)

    client = ProvisioningClient(api_url, username, password)

    # --- Health check -------------------------------------------------------
    print("Checking server health...")
    health = client.health_check()
    print(f"Server status: {health['status']}")
    print(f"Version: {health['version']}")
    print()

    # --- Login --------------------------------------------------------------
    print("Logging in...")
    client.login()
    print("Login successful")
    print()

    # --- List servers -------------------------------------------------------
    print("Listing servers...")
    servers = client.list_servers()
    print(f"Found {len(servers)} servers")
    for server in servers:
        print(f" - {server.get('hostname', 'unknown')}")
    print()

    # --- Create server (check mode) -----------------------------------------
    print("Creating server (check mode)...")
    server_request = {
        "workspace": "default",
        "provider": "upcloud",
        "plan": "1xCPU-2GB",
        "hostname": "test-server",
        "zone": "de-fra1",
        "check_mode": True,
        "tags": ["test", "api"],
    }
    operation = client.create_server(**server_request)
    operation_id = operation["id"]
    print(f"Operation ID: {operation_id}")
    print()

    # --- Wait for the operation ---------------------------------------------
    print("Waiting for operation to complete...")
    try:
        result = client.wait_for_operation(operation_id, timeout=60)
        print(f"Operation completed: {result['status']}")
        if result.get('result'):
            print(f"Result: {json.dumps(result['result'], indent=2)}")
    except Exception as e:
        # Deliberately best-effort: report the problem and carry on with
        # the rest of the demo.
        print(f"Error: {e}")
    print()

    # --- List operations ----------------------------------------------------
    print("Listing recent operations...")
    operations = client.list_operations()
    print(f"Found {len(operations)} operations")
    # Only the first five entries, in the order the API returned them.
    preview = operations[:5]
    for op in preview:
        print(f" - {op['id']}: {op['status']}")
    print()


if __name__ == "__main__":
    # Script entry point: run the demo and map the outcome to an exit code.
    try:
        main()
    except Exception as err:
        # Any ordinary failure: report on stderr and exit non-zero.
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        # Ctrl-C is treated as a clean, user-requested stop.
        print("\nInterrupted by user")
        sys.exit(0)