import os
import platform
import random
import re
import subprocess
import threading
import uuid
import zipfile
from time import sleep

import psutil
import requests
from tqdm import tqdm

__version__ = "0.1.6"

MAX_RETRY_TIMES = 10
UBUNTU_ARM_URL = "https://huggingface.co/datasets/xlangai/ubuntu_arm/resolve/main/Ubuntu.zip"
UBUNTU_X86_URL = "https://huggingface.co/datasets/xlangai/ubuntu_x86/resolve/main/Ubuntu.zip"
REGISTRY_PATH = '.vms'
REGISTRY_IDX_PATH = ".vms_idx"

# Held by _update_vm for the whole rewrite-and-rename sequence.
update_lock = threading.Lock()


class VirtualMachineManager:
    """File-backed registry of virtual machines and who is using them.

    The registry file holds one ``<vm_path>|<status>`` record per line, where
    ``<status>`` is either the literal ``free`` or the PID passed to
    ``occupy_vm``. A second file holds the integer index used to name the
    next VM.

    Every public method runs under ``self.lock``. The lock is created per
    instance, so it only serializes callers that share this instance.
    """

    _FREE = "free"

    def __init__(self, registry_path=REGISTRY_PATH, registry_idx_path=REGISTRY_IDX_PATH):
        self.registry_path = registry_path
        self.registry_idx_path = registry_idx_path
        self.lock = threading.Lock()
        self.initialize_registry()

    def initialize_registry(self):
        """Create the registry file (empty) and the index file (``0``) if missing."""
        with self.lock:
            if not os.path.exists(self.registry_path):
                with open(self.registry_path, 'w') as file:
                    file.write('')
            if not os.path.exists(self.registry_idx_path):
                with open(self.registry_idx_path, 'w') as file:
                    file.write('0')

    def _read_entries(self):
        """Return the registry as a list of ``(vm_path, status)`` tuples.

        The caller must already hold ``self.lock``.
        """
        entries = []
        with open(self.registry_path, 'r') as file:
            for line in file:
                record = line.strip()
                if not record:
                    # Tolerate blank lines instead of failing to unpack.
                    continue
                # Split on the LAST separator so a '|' inside the path
                # cannot break parsing.
                vm_path, status = record.rsplit('|', 1)
                entries.append((vm_path, status))
        return entries

    def _write_entries(self, entries):
        """Overwrite the registry with ``entries``, one normalised record per line.

        The caller must already hold ``self.lock``.
        """
        with open(self.registry_path, 'w') as file:
            file.writelines(f'{vm_path}|{status}\n' for vm_path, status in entries)

    def _set_status(self, vm_path, status):
        """Set the status of every record whose path equals ``vm_path``."""
        with self.lock:
            entries = [
                (path, status if path == vm_path else current)
                for path, current in self._read_entries()
            ]
            self._write_entries(entries)

    def add_vm(self, vm_path):
        """Append ``vm_path`` to the registry, marked as free."""
        with self.lock:
            entries = self._read_entries()
            entries.append((vm_path, self._FREE))
            self._write_entries(entries)

    def occupy_vm(self, vm_path, pid):
        """Record ``pid`` as the occupant of ``vm_path``."""
        self._set_status(vm_path, pid)

    def release_vm(self, vm_path):
        """Mark ``vm_path`` as free again."""
        self._set_status(vm_path, self._FREE)

    def check_and_clean(self):
        """Free every VM whose recorded PID is not a currently running process.

        Raises:
            ValueError: if a status is neither ``free`` nor an integer.
        """
        with self.lock:
            active_pids = set(psutil.pids())
            cleaned = []
            for vm_path, status in self._read_entries():
                if status != self._FREE and int(status) not in active_pids:
                    status = self._FREE
                cleaned.append((vm_path, status))
            self._write_entries(cleaned)

    def list_vms(self):
        """Return all records as ``(vm_path, status)`` tuples."""
        with self.lock:
            return self._read_entries()

    def list_free_vms(self):
        """Return the ``(vm_path, status)`` tuples whose status is ``free``."""
        with self.lock:
            return [entry for entry in self._read_entries() if entry[1] == self._FREE]

    def generate_new_vm_name(self):
        """Return ``Ubuntu<idx>`` for the stored index and store ``idx + 1``."""
        with self.lock:
            with open(self.registry_idx_path, 'r') as file:
                idx = int(file.read())
            with open(self.registry_idx_path, 'w') as file:
                file.write(str(idx + 1))
            return f"Ubuntu{idx}"


def _generate_mac_address():
    """Return a random MAC address string starting with ``00:0c:29``."""
    # VMware MAC address range starts with 00:0c:29
    octets = [0x00, 0x0c, 0x29,
              random.randint(0x00, 0x7f),
              random.randint(0x00, 0xff),
              random.randint(0x00, 0xff)]
    return ':'.join("%02x" % octet for octet in octets)


def _replace_vmx_value(content, key, value):
    """Replace every ``key = "..."`` occurrence in ``content`` with ``value``."""
    # Escape the key so dots (e.g. in "uuid.bios") match literally, and use a
    # callable replacement so backslashes in ``value`` are not interpreted as
    # regex escapes.
    pattern = rf'{re.escape(key)} = ".*?"'
    replacement = f'{key} = "{value}"'
    return re.sub(pattern, lambda _match: replacement, content)


def _update_vm(vmx_path, target_vm_name):
    """Update the VMX file with the new VM name and other parameters, so that the VM can be started successfully
    without conflict with the original VM.

    Rewrites ``displayName``, ``uuid.bios``, ``uuid.location``,
    ``ethernet0.generatedAddress`` and ``vmci0.id`` in the file, renames the
    ``.vmx``/``.nvram``/``.vmsd``/``.vmxf`` files to ``target_vm_name`` and
    finally renames the containing directory to ``target_vm_name``.

    Args:
        vmx_path: path of the ``Ubuntu.vmx`` file to update.
        target_vm_name: new display name, file stem and directory name.

    Raises:
        AssertionError: if the file is not named ``Ubuntu.vmx``.
        OSError: if the file cannot be read/written or a rename fails.
    """
    with update_lock:
        dir_path, vmx_file = os.path.split(vmx_path)

        # Validate before touching anything on disk, so a wrong path does not
        # leave a half-updated VM behind.
        assert vmx_file == "Ubuntu.vmx", "The VMX file should be named 'Ubuntu.vmx'."
        vmx_file_base_name = os.path.splitext(vmx_file)[0]

        with open(vmx_path, 'r') as file:
            original_content = file.read()

        # Fresh identifiers for the VM.
        new_values = {
            'displayName': target_vm_name,
            'uuid.bios': str(uuid.uuid4()),
            'uuid.location': str(uuid.uuid4()),
            'ethernet0.generatedAddress': _generate_mac_address(),
            'vmci0.id': str(random.randint(-2147483648, 2147483647)),  # Random 32-bit integer
        }
        updated_content = original_content
        for key, value in new_values.items():
            updated_content = _replace_vmx_value(updated_content, key, value)

        # Write the updated content back to the file
        with open(vmx_path, 'w') as file:
            file.write(updated_content)
        print(".vmx file updated successfully.")

        for ext in ('vmx', 'nvram', 'vmsd', 'vmxf'):
            original_file = os.path.join(dir_path, f"{vmx_file_base_name}.{ext}")
            target_file = os.path.join(dir_path, f"{target_vm_name}.{ext}")
            os.rename(original_file, target_file)

        # Rename the containing directory: keep its parent, replace only the
        # last path component with the target VM name.
        parent_dir = os.path.dirname(dir_path.rstrip(os.sep))
        target_dir_path = os.path.join(parent_dir, target_vm_name)
        os.rename(dir_path, target_dir_path)
        print("VM files renamed successfully.")


def _select_vm_image_url():
    """Return the image URL matching ``platform.machine()``.

    Raises:
        Exception: if the machine type is not one of the handled values.
    """
    machine = platform.machine()
    if machine == 'arm64':  # macOS with Apple Silicon
        return UBUNTU_ARM_URL
    if machine.lower() in ('amd64', 'x86_64'):
        return UBUNTU_X86_URL
    raise Exception("Unsupported platform or architecture")


def _download_vm_image(url, downloaded_file_path):
    """Download ``url`` to ``downloaded_file_path``, resuming a partial file.

    A stream error during the transfer is printed and the download is retried
    after one second, continuing from the bytes already on disk. HTTP error
    statuses other than 416 propagate via ``raise_for_status``.
    """
    while True:
        headers = {}
        downloaded_size = 0
        if os.path.exists(downloaded_file_path):
            downloaded_size = os.path.getsize(downloaded_file_path)
            headers["Range"] = f"bytes={downloaded_size}-"

        with requests.get(url, headers=headers, stream=True) as response:
            if response.status_code == 416:
                # This means the range was not satisfiable, possibly the file was fully downloaded
                print("Fully downloaded or the file sized changed.")
                return

            response.raise_for_status()

            # Only a 206 answer continues from our offset. Any other success
            # status carries the whole file, so appending it to the partial
            # file would corrupt the archive: start over instead.
            resuming = response.status_code == 206
            if not resuming:
                downloaded_size = 0

            # Content-Length covers only the bytes of this response, so the
            # bytes already on disk are added to get the real total.
            total_size = downloaded_size + int(response.headers.get('content-length', 0))

            with open(downloaded_file_path, "ab" if resuming else "wb") as file, tqdm(
                    desc="Progress",
                    total=total_size,
                    unit='iB',
                    unit_scale=True,
                    unit_divisor=1024,
                    initial=downloaded_size,
                    ascii=True
            ) as progress_bar:
                try:
                    for data in response.iter_content(chunk_size=1024):
                        progress_bar.update(file.write(data))
                except (requests.exceptions.RequestException, IOError) as e:
                    print(f"Download error: {e}")
                    sleep(1)  # Wait for 1 second before retrying
                    print("Retrying...")
                else:
                    print("Download succeeds.")
                    return  # Download completed successfully


def _vmrun_command(*args):
    """Return the argv list for ``vmrun`` with the host type for this OS.

    Raises:
        Exception: if ``platform.system()`` is not Windows, Linux or Darwin.
    """
    system = platform.system()
    if system in ('Windows', 'Linux'):
        host_type = 'ws'
    elif system == 'Darwin':  # Darwin is the system name for macOS
        host_type = 'fusion'
    else:
        raise Exception("Unsupported operating system")
    # An argv list (no shell) keeps paths with spaces, quotes or shell
    # metacharacters intact.
    return ['vmrun', '-T', host_type, *args]


def _vm_is_ready(ip):
    """Return True if ``http://<ip>:5000/screenshot`` answers with HTTP 200.

    On any failure the error is printed (for exceptions) and the function
    sleeps two seconds before returning False.
    """
    url = f"http://{ip}:5000/screenshot"
    try:
        # 1 second connect timeout, 1 second read timeout
        response = requests.get(url, timeout=(1, 1))
        if response.status_code == 200:
            return True
    except Exception as e:
        print(f"Error: {e}")
        print(f"Type: {type(e).__name__}")
        print(f"Error detail: {str(e)}")
    sleep(2)
    return False


def _install_virtual_machine(vm_name, working_dir="./vm_data", downloaded_file_name="Ubuntu.zip",
                             original_vm_name="Ubuntu"):
    """Provision (if needed), start and snapshot a VM, returning its .vmx path.

    If ``<working_dir>/<vm_name>/<vm_name>/<vm_name>.vmx`` does not exist, the
    image is downloaded, extracted into ``<working_dir>/<vm_name>`` and
    renamed via ``_update_vm``. The VM is then started with ``vmrun``, its IP
    is queried, the function waits until the screenshot endpoint on port 5000
    answers, and a snapshot named ``init_state`` is taken.

    Args:
        vm_name: name of the VM to create or reuse.
        working_dir: directory holding the downloaded archive and the VMs.
        downloaded_file_name: file name of the archive inside ``working_dir``.
        original_vm_name: name of the VM inside the archive.

    Returns:
        The path of the VM's ``.vmx`` file.

    Raises:
        Exception: for an unsupported architecture or operating system.
        RuntimeError: if no guest IP is obtained after ``MAX_RETRY_TIMES`` tries.
    """
    os.makedirs(working_dir, exist_ok=True)
    vm_dir = os.path.join(working_dir, vm_name)
    vm_path = os.path.join(vm_dir, vm_name, vm_name + ".vmx")

    if not os.path.exists(vm_path):
        # Pick the image for this architecture before downloading anything.
        url = _select_vm_image_url()
        downloaded_file_path = os.path.join(working_dir, downloaded_file_name)

        print("Downloading the virtual machine image...")
        _download_vm_image(url, downloaded_file_path)

        print("Unzipping the downloaded file...☕️")
        with zipfile.ZipFile(downloaded_file_path, 'r') as zip_ref:
            zip_ref.extractall(vm_dir)
        print("Files have been successfully extracted to the directory:", vm_dir)

        _update_vm(os.path.join(vm_dir, original_vm_name, original_vm_name + ".vmx"), vm_name)
    else:
        print(f"Virtual machine exists: {vm_path}")

    # Start the virtual machine
    subprocess.run(_vmrun_command('start', vm_path, 'nogui'))
    print("Starting virtual machine...")

    # Get the IP address of the virtual machine
    for _ in range(MAX_RETRY_TIMES):
        get_vm_ip = subprocess.run(_vmrun_command('getGuestIPAddress', vm_path, '-wait'),
                                   capture_output=True, text=True)
        if "Error" in get_vm_ip.stdout:
            print("Retry on getting IP")
            continue
        vm_ip = get_vm_ip.stdout.strip()
        print("Virtual machine IP address:", vm_ip)
        break
    else:
        # Without this, the error text would be used as the IP address and
        # the readiness loop below would never terminate.
        raise RuntimeError(
            f"Could not get the IP address of the virtual machine after {MAX_RETRY_TIMES} attempts: {vm_path}")

    # Poll the screenshot endpoint until the virtual machine answers
    while not _vm_is_ready(vm_ip):
        print("Check whether the virtual machine is ready...")

    print("Virtual machine is ready. Start to make a snapshot on the virtual machine. It would take a while...")

    # Create a snapshot of the virtual machine
    subprocess.run(_vmrun_command('snapshot', vm_path, 'init_state'))
    print("Snapshot created.")

    return vm_path


def _get_vm_path():
    """Return the path of a VM reserved for the current process.

    Stale reservations are cleaned first. The first free VM in the registry
    is reused; if there is none, a new VM is installed and registered. In
    both cases the VM is marked as occupied by ``os.getpid()``.
    """
    vm_manager = VirtualMachineManager(REGISTRY_PATH)
    vm_manager.check_and_clean()
    free_vms_paths = vm_manager.list_free_vms()

    if not free_vms_paths:
        # No free virtual machine available, generate a new one
        print("No free virtual machine available. Generating a new one, which would take a while...☕")
        new_vm_name = vm_manager.generate_new_vm_name()
        new_vm_path = _install_virtual_machine(new_vm_name)
        vm_manager.add_vm(new_vm_path)
        vm_manager.occupy_vm(new_vm_path, os.getpid())
        return new_vm_path

    # Choose the first free virtual machine
    chosen_vm_path = free_vms_paths[0][0]
    vm_manager.occupy_vm(chosen_vm_path, os.getpid())
    return chosen_vm_path