"""Proxmox VE provider that drives VM lifecycle by running `qm` over SSH."""

import ipaddress
import json
import logging
import os
import shlex
import subprocess
import time
from typing import Optional

import requests

from desktop_env.providers.base import Provider

logger = logging.getLogger("desktopenv.providers.proxmox.ProxmoxProvider")
logger.setLevel(logging.INFO)

WAIT_TIME = 5  # seconds to pause after a snapshot / rollback command
RETRY_INTERVAL = 3  # seconds between two polls in the wait loops
MAX_WAIT_READY = 300  # seconds to wait for VM HTTP server to be ready


def _quote(value) -> str:
    """Return *value* quoted so the remote shell treats it as one literal word."""
    return shlex.quote(str(value))


def _is_usable_ipv4(candidate) -> bool:
    """Return True if *candidate* is an IPv4 address worth connecting to.

    Loopback, link-local (169.254.0.0/16), unspecified, IPv6 and malformed
    values are all rejected.
    """
    if not isinstance(candidate, str) or not candidate:
        return False
    try:
        parsed = ipaddress.ip_address(candidate)
    except ValueError:
        return False
    if parsed.version != 4:
        return False
    return not (parsed.is_loopback or parsed.is_link_local or parsed.is_unspecified)


class ProxmoxProvider(Provider):
    """
    Proxmox VE provider that manages VMs via SSH to the Proxmox host,
    executing `qm` commands for VM lifecycle management.

    Configuration via environment variables:
        PROXMOX_SSH_HOST: SSH target (default: root@10.10.17.3)
        PROXMOX_VM_IP: Fallback VM IP if guest agent is unavailable (default: 10.10.17.10)
    """

    def __init__(self, region: Optional[str] = None):
        """Read the SSH target and fallback VM IP from the environment.

        Args:
            region: Passed through unchanged to the base ``Provider``.
        """
        super().__init__(region)
        self.ssh_host = os.environ.get("PROXMOX_SSH_HOST", "root@10.10.17.3")
        self.vm_ip_fallback = os.environ.get("PROXMOX_VM_IP", "10.10.17.10")
        # Resolved VM IP; reset to None whenever the VM is stopped or rolled back.
        self._vm_ip_cache = None

    def _ssh_exec(self, command: str, timeout: int = 120, check: bool = True) -> str:
        """Execute a command on the Proxmox host via SSH.

        This method never raises: timeouts and execution errors are logged
        and reported as an empty string, so polling callers can keep going.

        Args:
            command: The command line to run on the remote host. It is parsed
                by the remote shell, so callers must quote any dynamic parts.
            timeout: Timeout in seconds for the whole SSH invocation.
            check: If True, log an error when the exit code is non-zero.
                The captured stdout is still returned in that case.

        Returns:
            stdout output as a stripped string, or "" on timeout / failure
            to launch SSH.
        """
        ssh_cmd = [
            "ssh",
            # NOTE(review): host key verification is disabled here, which
            # leaves the connection open to man-in-the-middle attacks.
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=10",
            # BatchMode makes ssh fail instead of prompting for a password.
            "-o", "BatchMode=yes",
            self.ssh_host,
            command,
        ]
        logger.debug("SSH exec: %s", " ".join(ssh_cmd))
        try:
            result = subprocess.run(
                ssh_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                # Undecodable bytes must not discard the whole output.
                errors="replace",
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            logger.error("SSH command timed out after %ss: %s", timeout, command)
            return ""
        except Exception as e:  # best-effort boundary: report, never raise
            logger.error("SSH execution error: %s", e)
            return ""

        if check and result.returncode != 0:
            logger.error(
                "SSH command failed (rc=%s): %s",
                result.returncode,
                result.stderr.strip(),
            )
        return result.stdout.strip()

    def _get_vm_status(self, vmid: str) -> str:
        """Get the current status of a VM (e.g. 'running', 'stopped').

        Returns:
            The text after the first colon of the `qm status` output, or the
            whole stripped output if it contains no colon ("" on SSH failure).
        """
        output = self._ssh_exec(f"qm status {_quote(vmid)}")
        # output format: "status: running"
        _, colon, value = output.partition(":")
        return value.strip() if colon else output.strip()

    def _wait_for_status(self, vmid: str, target_status: str, timeout: int = 120):
        """Poll VM status until it matches target_status.

        Returns:
            True once the status matches, False if *timeout* seconds elapse first.
        """
        # Monotonic clock: the deadline is immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            status = self._get_vm_status(vmid)
            logger.info("VM %s status: %s (waiting for %s)", vmid, status, target_status)
            if status == target_status:
                return True
            time.sleep(RETRY_INTERVAL)
        logger.error(
            "VM %s did not reach status '%s' within %ss", vmid, target_status, timeout
        )
        return False

    def _wait_for_vm_ready(self, vm_ip: str, server_port: int = 5000, timeout: int = MAX_WAIT_READY):
        """Poll the VM's HTTP server until `/screenshot` answers with HTTP 200.

        Returns:
            True once the server responds with 200, False if *timeout*
            seconds elapse first.
        """
        url = f"http://{vm_ip}:{server_port}/screenshot"
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                response = requests.get(url, timeout=(10, 10))
            except requests.RequestException as e:
                # Expected while the guest is still booting; keep polling.
                logger.debug("VM HTTP server not reachable yet: %s", e)
            else:
                if response.status_code == 200:
                    logger.info("VM HTTP server is ready at %s", url)
                    return True
            logger.info("Waiting for VM HTTP server at %s...", url)
            time.sleep(RETRY_INTERVAL)
        logger.error("VM HTTP server at %s not ready within %ss", url, timeout)
        return False

    def start_emulator(self, path_to_vm: str, headless: bool, os_type: str = "Windows"):
        """Start the Proxmox VM and wait for its HTTP server.

        A readiness timeout is logged by the wait helper but not raised.

        Args:
            path_to_vm: The VM ID as a string (e.g. "102").
            headless: Ignored for Proxmox (VMs are always headless on server).
            os_type: OS type of the VM. Not used by this provider.

        Raises:
            RuntimeError: If the VM does not report 'running' within 120 s.
        """
        vmid = path_to_vm
        logger.info("Starting Proxmox VM %s...", vmid)
        print(f"Starting Proxmox VM {vmid}...")

        if self._get_vm_status(vmid) == "running":
            logger.info("VM %s is already running.", vmid)
        else:
            self._ssh_exec(f"qm start {_quote(vmid)}")
            if not self._wait_for_status(vmid, "running", timeout=120):
                raise RuntimeError(f"Failed to start VM {vmid}")

        # Wait for Flask HTTP server inside VM to be ready
        vm_ip = self._resolve_vm_ip(vmid)
        self._wait_for_vm_ready(vm_ip)

    def _query_guest_agent_ip(self, vmid: str) -> Optional[str]:
        """Ask the QEMU Guest Agent for the first usable IPv4 address, or None."""
        output = self._ssh_exec(
            f"qm guest cmd {_quote(vmid)} network-get-interfaces",
            timeout=15,
            check=False,  # an unavailable agent is expected, not an error
        )
        if not output or "ip-address" not in output:
            return None
        try:
            for iface in json.loads(output):
                for addr in iface.get("ip-addresses", []):
                    candidate = addr.get("ip-address", "")
                    if _is_usable_ipv4(candidate):
                        return candidate
        except (ValueError, TypeError, AttributeError) as e:
            # Malformed JSON or an unexpected payload shape.
            logger.debug("Guest agent query failed: %s", e)
        return None

    def _resolve_vm_ip(self, vmid: str) -> str:
        """Try to get VM IP via QEMU Guest Agent, fall back to env var.

        The result, including the fallback, is cached on the instance until
        the VM is stopped or rolled back.
        """
        if self._vm_ip_cache:
            return self._vm_ip_cache

        ip = self._query_guest_agent_ip(vmid)
        if ip:
            logger.info("Got VM %s IP from guest agent: %s", vmid, ip)
        else:
            # Fallback to env var / default
            ip = self.vm_ip_fallback
            logger.info("Using fallback VM IP: %s", ip)
        self._vm_ip_cache = ip
        return ip

    def get_ip_address(self, path_to_vm: str) -> str:
        """Return the VM's IP address.

        Args:
            path_to_vm: The VM ID as a string.

        Returns:
            IP address string (e.g. "10.10.17.10").
        """
        return self._resolve_vm_ip(path_to_vm)

    def save_state(self, path_to_vm: str, snapshot_name: str):
        """Create a snapshot of the VM.

        Args:
            path_to_vm: The VM ID.
            snapshot_name: Name for the snapshot.
        """
        vmid = path_to_vm
        logger.info("Creating snapshot '%s' for VM %s...", snapshot_name, vmid)
        self._ssh_exec(
            f"qm snapshot {_quote(vmid)} {_quote(snapshot_name)}", timeout=120
        )
        time.sleep(WAIT_TIME)
        logger.info("Snapshot '%s' created for VM %s.", snapshot_name, vmid)

    def revert_to_snapshot(self, path_to_vm: str, snapshot_name: str):
        """Stop the VM if it is running and roll it back to a snapshot.

        This method does not start the VM again after the rollback.

        Args:
            path_to_vm: The VM ID.
            snapshot_name: Name of the snapshot to revert to.

        Returns:
            The VM ID (path_to_vm).
        """
        vmid = path_to_vm
        logger.info("Reverting VM %s to snapshot '%s'...", vmid, snapshot_name)

        # Stop VM first if running
        if self._get_vm_status(vmid) == "running":
            self._ssh_exec(f"qm stop {_quote(vmid)}", timeout=60)
            self._wait_for_status(vmid, "stopped", timeout=60)

        # Rollback to snapshot
        self._ssh_exec(
            f"qm rollback {_quote(vmid)} {_quote(snapshot_name)}", timeout=120
        )
        time.sleep(WAIT_TIME)

        # Clear IP cache since IP might change after rollback
        self._vm_ip_cache = None

        logger.info("VM %s reverted to snapshot '%s'.", vmid, snapshot_name)
        return path_to_vm

    def stop_emulator(self, path_to_vm: str, region=None, *args, **kwargs):
        """Stop the VM.

        Args:
            path_to_vm: The VM ID.
            region: Not used by this provider.
        """
        vmid = path_to_vm
        logger.info("Stopping Proxmox VM %s...", vmid)

        if self._get_vm_status(vmid) == "stopped":
            logger.info("VM %s is already stopped.", vmid)
            return

        self._ssh_exec(f"qm stop {_quote(vmid)}", timeout=60)
        stopped = self._wait_for_status(vmid, "stopped", timeout=60)
        self._vm_ip_cache = None
        # Only report success when the status was confirmed; the timeout
        # case has already been logged as an error by the wait helper.
        if stopped:
            logger.info("VM %s stopped.", vmid)