Files
sci-gui-agent-benchmark/desktop_env/envs/__init__.py
zdy023 c1a719d141 ver May1st
a temporary workaround for Mac hosts.
2024-05-01 21:40:35 +08:00

344 lines
14 KiB
Python

import os
import platform
import random
import re
import subprocess
import threading
import uuid
import zipfile
from time import sleep
import logging
logger = logging.getLogger("desktopenv.envinit")
import psutil
import requests
from tqdm import tqdm
# Version string of this package.
__version__ = "0.1.6"
# Maximum number of attempts made when asking vmrun for the guest IP address.
MAX_RETRY_TIMES = 10
# Download locations of the zipped Ubuntu VM images (ARM build and x86 build).
UBUNTU_ARM_URL = "https://huggingface.co/datasets/xlangai/ubuntu_arm/resolve/main/Ubuntu.zip"
UBUNTU_X86_URL = "https://huggingface.co/datasets/xlangai/ubuntu_x86/resolve/main/Ubuntu.zip"
# Registry file (relative to the current working directory): one
# "<vm_path>|<status>" record per line, where <status> is "free" or a PID.
REGISTRY_PATH = '.vms'
# File holding the integer index used to name the next generated VM ("Ubuntu<idx>").
REGISTRY_IDX_PATH = ".vms_idx"
# Module-wide lock held by _update_vm while it rewrites and renames VM files.
update_lock = threading.Lock()
class VirtualMachineManager:
    """File-backed registry of virtual machines and the processes using them.

    The registry file holds one ``<vm_path>|<status>`` record per line, where
    ``<status>`` is either the literal ``free`` or the PID of the process that
    currently occupies the VM. A second file stores the integer index used to
    name the next generated VM.

    Every public method runs under ``self.lock``, a per-instance
    ``threading.Lock``; it therefore only serializes callers that share this
    instance.
    """

    _FREE = "free"

    def __init__(self, registry_path=REGISTRY_PATH, registry_idx_path=REGISTRY_IDX_PATH):
        """Remember the two file locations and create the files if missing."""
        self.registry_path = registry_path
        self.registry_idx_path = registry_idx_path
        self.lock = threading.Lock()
        self.initialize_registry()

    def initialize_registry(self):
        """Create an empty registry file and a ``0`` index file when absent."""
        with self.lock:
            if not os.path.exists(self.registry_path):
                with open(self.registry_path, 'w') as file:
                    file.write('')
            if not os.path.exists(self.registry_idx_path):
                with open(self.registry_idx_path, 'w') as file:
                    file.write('0')

    def _read_entries(self):
        """Return the registry as a list of ``(vm_path, status)`` tuples.

        The caller must already hold ``self.lock``. Blank lines are skipped.

        Raises:
            ValueError: if a non-blank line contains no ``|`` separator.
        """
        entries = []
        with open(self.registry_path, 'r') as file:
            for line in file:
                record = line.strip()
                if not record:
                    continue
                # Split on the LAST separator only: the status never contains
                # '|', but a VM path may.
                parts = record.rsplit('|', 1)
                if len(parts) != 2:
                    raise ValueError(f"Malformed registry line: {record!r}")
                entries.append((parts[0], parts[1]))
        return entries

    def _write_entries(self, entries):
        """Rewrite the registry from ``(vm_path, status)`` tuples.

        The caller must already hold ``self.lock``. Every record is written
        with its own trailing newline so records can never run together.
        """
        with open(self.registry_path, 'w') as file:
            file.writelines(f'{vm_path}|{status}\n' for vm_path, status in entries)

    def _set_status(self, vm_path, status):
        """Set the status of every record whose path equals *vm_path*."""
        with self.lock:
            entries = [
                (path, status if path == vm_path else current)
                for path, current in self._read_entries()
            ]
            self._write_entries(entries)

    def add_vm(self, vm_path):
        """Append *vm_path* to the registry, marked as free."""
        with self.lock:
            entries = self._read_entries()
            entries.append((vm_path, self._FREE))
            self._write_entries(entries)

    def occupy_vm(self, vm_path, pid):
        """Mark *vm_path* as occupied by process *pid*."""
        self._set_status(vm_path, str(pid))

    def release_vm(self, vm_path):
        """Mark *vm_path* as free again."""
        self._set_status(vm_path, self._FREE)

    def check_and_clean(self):
        """Free every VM whose recorded owner PID is no longer running.

        Raises:
            ValueError: if a status is neither ``free`` nor an integer.
        """
        with self.lock:
            active_pids = set(psutil.pids())
            cleaned = []
            for vm_path, status in self._read_entries():
                if status != self._FREE and int(status) not in active_pids:
                    status = self._FREE
                cleaned.append((vm_path, status))
            self._write_entries(cleaned)

    def list_vms(self):
        """Return all records as a list of ``(vm_path, status)`` tuples."""
        with self.lock:
            return self._read_entries()

    def list_free_vms(self):
        """Return only the records whose status is ``free``."""
        with self.lock:
            return [entry for entry in self._read_entries() if entry[1] == self._FREE]

    def generate_new_vm_name(self):
        """Return ``Ubuntu<idx>`` and advance the stored index by one."""
        with self.lock:
            with open(self.registry_idx_path, 'r') as file:
                raw_idx = file.read().strip()
            # An empty index file is treated as a fresh counter.
            idx = int(raw_idx) if raw_idx else 0
            with open(self.registry_idx_path, 'w') as file:
                file.write(str(idx + 1))
            return f"Ubuntu{idx}"
def _update_vm(vmx_path, target_vm_name):
    """Give a freshly unpacked VM its own identity and rename it on disk.

    Rewrites the display name, BIOS/location UUIDs, generated MAC address and
    VMCI id inside the VMX file so the VM can be started without conflicting
    with the original VM, then renames the ``.vmx``/``.nvram``/``.vmsd``/
    ``.vmxf`` files and the containing directory to *target_vm_name*.

    Args:
        vmx_path: Path to the VMX file; its file name must be ``Ubuntu.vmx``.
        target_vm_name: New name for the VM, its files and its directory.

    Raises:
        AssertionError: if the VMX file is not named ``Ubuntu.vmx``.
        OSError: if the VMX file cannot be read or written, or a rename fails.
    """
    with update_lock:
        dir_path, vmx_file = os.path.split(vmx_path)

        # Validate before touching anything on disk. Raised explicitly (rather
        # than via `assert`) so the check survives `python -O`; the exception
        # type is kept for compatibility.
        if vmx_file != "Ubuntu.vmx":
            raise AssertionError("The VMX file should be named 'Ubuntu.vmx'.")

        def _generate_mac_address():
            # Uses the 00:0c:29 prefix (VMware generated-address range); the
            # fourth octet is limited to 0x00-0x7f as in the original code.
            mac = [0x00, 0x0c, 0x29,
                   random.randint(0x00, 0x7f),
                   random.randint(0x00, 0xff),
                   random.randint(0x00, 0xff)]
            return ':'.join(f"{octet:02x}" for octet in mac)

        with open(vmx_path, 'r') as file:
            original_content = file.read()

        # VMX key -> fresh value. Insertion order matches the order in which
        # the substitutions are applied.
        new_values = {
            'displayName': target_vm_name,
            'uuid.bios': str(uuid.uuid4()),
            'uuid.location': str(uuid.uuid4()),
            'ethernet0.generatedAddress': _generate_mac_address(),
            'vmci0.id': str(random.randint(-2147483648, 2147483647)),  # random 32-bit integer
        }

        updated_content = original_content
        for key, value in new_values.items():
            # Escape the key so '.' matches a literal dot only.
            pattern = re.escape(key) + r' = ".*?"'
            replacement = f'{key} = "{value}"'
            # A callable replacement is inserted verbatim, so backslashes in
            # the value are not interpreted as regex escapes.
            updated_content = re.sub(pattern, lambda _match, text=replacement: text, updated_content)

        with open(vmx_path, 'w') as file:
            file.write(updated_content)
        print(".vmx file updated successfully.")

        vmx_file_base_name = os.path.splitext(vmx_file)[0]
        for ext in ('vmx', 'nvram', 'vmsd', 'vmxf'):
            original_file = os.path.join(dir_path, f"{vmx_file_base_name}.{ext}")
            target_file = os.path.join(dir_path, f"{target_vm_name}.{ext}")
            os.rename(original_file, target_file)

        # Rename the containing directory: replace only its last path
        # component with the target VM name.
        parent_dir = os.path.dirname(dir_path.rstrip(os.sep))
        target_dir_path = os.path.join(parent_dir, target_vm_name)
        os.rename(dir_path, target_dir_path)
        print("VM files renamed successfully.")
def _install_virtual_machine(vm_name, working_dir="./vm_data", downloaded_file_name="Ubuntu.zip", original_vm_name="Ubuntu"):
    """Make sure a VM called *vm_name* exists, boot it and snapshot it.

    If ``<working_dir>/<vm_name>/<vm_name>/<vm_name>.vmx`` does not exist, the
    Ubuntu image is downloaded (resuming a partial download when possible),
    unzipped and renamed via ``_update_vm``. The VM is then started with
    ``vmrun``, polled until its screenshot endpoint on port 5000 answers, and
    a snapshot named ``init_state`` is taken.

    Args:
        vm_name: Name of the VM to create or reuse.
        working_dir: Directory that holds the downloaded archive and the VMs.
        downloaded_file_name: File name of the downloaded archive.
        original_vm_name: Name of the VM directory and VMX file inside the archive.

    Returns:
        Path to the VM's ``.vmx`` file.

    Raises:
        Exception: if the host platform/architecture or OS is unsupported.
        RuntimeError: if the guest IP address cannot be obtained after
            ``MAX_RETRY_TIMES`` attempts.
        requests.exceptions.RequestException: if the download request fails
            before streaming starts (including HTTP error statuses).
    """
    os.makedirs(working_dir, exist_ok=True)

    def __download_and_unzip_vm():
        # Determine the platform and CPU architecture to decide the correct VM image to download
        if platform.system() == 'Darwin':  # macOS
            # Workaround: always use the ARM image on macOS, because most new
            # Macs use Apple Silicon and are frequently misrecognized as x86_64.
            url = UBUNTU_ARM_URL
            logger.warning("Your platform is temporarily considered to be ARM. If this is a mistake, please manually replace the downloaded VM under ./vm_data with the VM of x86 version from %s", UBUNTU_X86_URL)
        elif platform.machine().lower() in ['amd64', 'x86_64']:
            url = UBUNTU_X86_URL
        else:
            raise Exception("Unsupported platform or architecture")

        # Download the virtual machine image
        print("Downloading the virtual machine image...")
        downloaded_file_path = os.path.join(working_dir, downloaded_file_name)

        while True:
            headers = {}
            downloaded_size = 0
            if os.path.exists(downloaded_file_path):
                # Ask the server to continue from where the partial file ends.
                downloaded_size = os.path.getsize(downloaded_file_path)
                headers["Range"] = f"bytes={downloaded_size}-"

            with requests.get(url, headers=headers, stream=True) as response:
                if response.status_code == 416:
                    # This means the range was not satisfiable, possibly the file was fully downloaded
                    print("Fully downloaded or the file sized changed.")
                    break

                response.raise_for_status()

                # Only a 206 reply carries the requested tail. Any other
                # success status means the server sent the whole file, so the
                # partial file must be overwritten rather than appended to.
                resuming = response.status_code == 206
                if not resuming:
                    downloaded_size = 0
                remaining_size = int(response.headers.get('content-length', 0))

                with open(downloaded_file_path, "ab" if resuming else "wb") as file, tqdm(
                    desc="Progress",
                    # content-length covers only the bytes still to come.
                    total=downloaded_size + remaining_size,
                    unit='iB',
                    unit_scale=True,
                    unit_divisor=1024,
                    initial=downloaded_size,
                    ascii=True
                ) as progress_bar:
                    try:
                        for data in response.iter_content(chunk_size=1024):
                            size = file.write(data)
                            progress_bar.update(size)
                    except (requests.exceptions.RequestException, IOError) as e:
                        print(f"Download error: {e}")
                        sleep(1)  # Wait for 1 second before retrying
                        print("Retrying...")
                    else:
                        print("Download succeeds.")
                        break  # Download completed successfully

        # Unzip the downloaded file
        print("Unzipping the downloaded file...☕️")
        extract_dir = os.path.join(working_dir, vm_name)
        with zipfile.ZipFile(downloaded_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        print("Files have been successfully extracted to the directory:", extract_dir)

    vm_path = os.path.join(working_dir, vm_name, vm_name, vm_name + ".vmx")

    # Download and unzip the VM, then update its metadata, unless it already exists
    if not os.path.exists(vm_path):
        __download_and_unzip_vm()
        _update_vm(os.path.join(working_dir, vm_name, original_vm_name, original_vm_name + ".vmx"), vm_name)
    else:
        print(f"Virtual machine exists: {vm_path}")

    # Determine the platform of the host machine and decide the parameter for vmrun
    def get_vmrun_type():
        system = platform.system()
        if system in ('Windows', 'Linux'):
            return '-T ws'
        elif system == 'Darwin':  # Darwin is the system name for macOS
            return '-T fusion'
        else:
            raise Exception("Unsupported operating system")

    # Commands are passed as argument lists (no shell), so a vm_path with
    # quotes or other shell metacharacters cannot break the command line.
    vmrun_base = ['vmrun', *get_vmrun_type().split()]

    # Start the virtual machine
    subprocess.run(vmrun_base + ['start', vm_path, 'nogui'])
    print("Starting virtual machine...")

    # Get the IP address of the virtual machine
    vm_ip = None
    for _ in range(MAX_RETRY_TIMES):
        get_vm_ip = subprocess.run(vmrun_base + ['getGuestIPAddress', vm_path, '-wait'],
                                   capture_output=True,
                                   text=True)
        if "Error" in get_vm_ip.stdout:
            print("Retry on getting IP")
            sleep(1)  # brief pause before asking again
            continue
        vm_ip = get_vm_ip.stdout.strip()
        print("Virtual machine IP address:", vm_ip)
        break
    if vm_ip is None:
        # Without this check the error text would be used as the IP and the
        # readiness loop below would never terminate.
        raise RuntimeError(
            f"Failed to get the IP address of the virtual machine after {MAX_RETRY_TIMES} attempts: "
            f"{get_vm_ip.stdout.strip()}"
        )

    # Function used to check whether the virtual machine is ready
    def download_screenshot(ip):
        url = f"http://{ip}:5000/screenshot"
        try:
            # Single attempt; 10 s connect timeout and 10 s read timeout.
            response = requests.get(url, timeout=(10, 10))
            if response.status_code == 200:
                return True
        except Exception as e:
            # Deliberately broad: this is a best-effort readiness poll, so any
            # failure just means "not ready yet".
            print(f"Error: {e}")
            print(f"Type: {type(e).__name__}")
            print(f"Error detail: {str(e)}")
            sleep(2)
        return False

    # Try downloading the screenshot until successful
    while not download_screenshot(vm_ip):
        print("Check whether the virtual machine is ready...")

    print("Virtual machine is ready. Start to make a snapshot on the virtual machine. It would take a while...")

    # Create a snapshot of the virtual machine
    subprocess.run(vmrun_base + ['snapshot', vm_path, 'init_state'])
    print("Snapshot created.")

    return vm_path
def _get_vm_path():
    """Return the path of a VM reserved for the current process.

    Stale reservations are cleaned first. The first free VM in the registry
    is reused when there is one; otherwise a new VM is installed and
    registered. In both cases the VM is marked as occupied by this process's
    PID before its path is returned.
    """
    registry = VirtualMachineManager(REGISTRY_PATH)
    registry.check_and_clean()
    owner_pid = os.getpid()

    candidates = registry.list_free_vms()
    if len(candidates) != 0:
        # Reuse path: take the first free record; element 0 is the VM path.
        reused_path, _status = candidates[0]
        registry.occupy_vm(reused_path, owner_pid)
        return reused_path

    # Nothing free: build a brand-new VM, register it, then claim it.
    print("No free virtual machine available. Generating a new one, which would take a while...☕")
    fresh_name = registry.generate_new_vm_name()
    fresh_path = _install_virtual_machine(fresh_name)
    registry.add_vm(fresh_path)
    registry.occupy_vm(fresh_path, owner_pid)
    return fresh_path