diff --git a/desktop_env/controllers/python.py b/desktop_env/controllers/python.py index a3b19c5..c572083 100644 --- a/desktop_env/controllers/python.py +++ b/desktop_env/controllers/python.py @@ -12,9 +12,10 @@ logger = logging.getLogger("desktopenv.pycontroller") class PythonController: def __init__(self, vm_ip: str, + server_port: int, pkgs_prefix: str = "import pyautogui; import time; pyautogui.FAILSAFE = False; {command}"): self.vm_ip = vm_ip - self.http_server = f"http://{vm_ip}:5000" + self.http_server = f"http://{vm_ip}:{server_port}" self.pkgs_prefix = pkgs_prefix # fixme: this is a hacky way to execute python commands. fix it and combine it with installation of packages self.retry_times = 3 self.retry_interval = 5 diff --git a/desktop_env/controllers/setup.py b/desktop_env/controllers/setup.py index b4ae4b2..dfac4b3 100644 --- a/desktop_env/controllers/setup.py +++ b/desktop_env/controllers/setup.py @@ -28,10 +28,10 @@ FILE_PATH = os.path.dirname(os.path.abspath(__file__)) class SetupController: - def __init__(self, vm_ip: str, cache_dir: str): + def __init__(self, vm_ip: str, server_port: int, cache_dir: str): self.vm_ip: str = vm_ip - self.http_server: str = f"http://{vm_ip}:5000" - self.http_server_setup_root: str = f"http://{vm_ip}:5000/setup" + self.http_server: str = f"http://{vm_ip}:{server_port}" + self.http_server_setup_root: str = f"http://{vm_ip}:{server_port}/setup" self.cache_dir: str = cache_dir def reset_cache_dir(self, cache_dir: str): diff --git a/desktop_env/desktop_env.py b/desktop_env/desktop_env.py index e02eacf..a03ebb2 100644 --- a/desktop_env/desktop_env.py +++ b/desktop_env/desktop_env.py @@ -26,7 +26,7 @@ class DesktopEnv(gym.Env): def __init__( self, - provider_name: str = "vmware", + provider_name: str = "docker", region: str = None, path_to_vm: str = None, snapshot_name: str = "init_state", @@ -54,6 +54,11 @@ class DesktopEnv(gym.Env): """ # Initialize VM manager and vitualization provider self.region = region + + # Default + self.server_port = 5000 + self.chromium_port = 9222 + self.vnc_port = 8006 self.manager, self.provider = create_vm_manager_and_provider(provider_name, region) self.os_type = os_type @@ -92,9 +97,14 @@ class DesktopEnv(gym.Env): self.provider.start_emulator(self.path_to_vm, self.headless, self.os_type) # Get the ip from the virtual machine, and setup the controller - self.vm_ip = self.provider.get_ip_address(self.path_to_vm) + vm_ip_ports = self.provider.get_ip_address(self.path_to_vm).split(':') + self.vm_ip = vm_ip_ports[0] + if len(vm_ip_ports) > 1: + self.server_port = int(vm_ip_ports[1]) + self.chromium_port = int(vm_ip_ports[2]) + self.vnc_port = int(vm_ip_ports[3]) self.controller = PythonController(vm_ip=self.vm_ip) - self.setup_controller = SetupController(vm_ip=self.vm_ip, cache_dir=self.cache_dir_base) + self.setup_controller = SetupController(vm_ip=self.vm_ip, server_port=self.server_port, cache_dir=self.cache_dir_base) def _revert_to_snapshot(self): # Revert to certain snapshot of the virtual machine, and refresh the path to vm and ip of vm diff --git a/desktop_env/evaluators/getters/general.py b/desktop_env/evaluators/getters/general.py index 81ad69b..d5d965c 100644 --- a/desktop_env/evaluators/getters/general.py +++ b/desktop_env/evaluators/getters/general.py @@ -23,7 +23,7 @@ def get_vm_command_line(env, config: Dict[str, str]): def get_vm_command_error(env, config: Dict[str, str]): vm_ip = env.vm_ip - port = 5000 + port = env.server_port command = config["command"] shell = config.get("shell", False) diff --git a/desktop_env/providers/docker/manager.py b/desktop_env/providers/docker/manager.py index 86fba12..36dfbf1 100644 --- a/desktop_env/providers/docker/manager.py +++ b/desktop_env/providers/docker/manager.py @@ -25,7 +25,7 @@ logger.setLevel(logging.INFO) MAX_RETRY_TIMES = 10 RETRY_INTERVAL = 5 -UBUNTU_X86_URL = "https://huggingface.co/datasets/xlangai/ubuntu_osworld/resolve/main/Ubuntu.qcow2" +UBUNTU_X86_URL = "docker-osworld-x86" # Determine the platform and CPU architecture to decide the correct VM image to download # if platform.system() == 'Darwin': # macOS @@ -41,289 +41,65 @@ UBUNTU_X86_URL = "https://huggingface.co/datasets/xlangai/ubuntu_osworld/resolve URL = UBUNTU_X86_URL DOWNLOADED_FILE_NAME = URL.split('/')[-1] -REGISTRY_PATH = '.docker_vms' -LOCK_FILE_NAME = '.docker_lck' -VMS_DIR = "./docker_vm_data" -update_lock = threading.Lock() if platform.system() == 'Windows': docker_path = r"C:\Program Files\Docker\Docker" os.environ["PATH"] += os.pathsep + docker_path -def generate_new_vm_name(vms_dir, os_type): - registry_idx = 0 - prefix = os_type +UBUNTU_X86_URL = "https://huggingface.co/datasets/xlangai/ubuntu_osworld/resolve/main/Ubuntu.qcow2" +VMS_DIR = "./vmware_vm_data" + +def __download_vm(vms_dir: str): + # Download the virtual machine image + logger.info("Downloading the virtual machine image...") + downloaded_size = 0 + + URL = UBUNTU_X86_URL + DOWNLOADED_FILE_NAME = URL.split('/')[-1] + downloaded_file_name = DOWNLOADED_FILE_NAME + while True: - attempted_new_name = f"{prefix}{registry_idx}" - if os.path.exists( - os.path.join(vms_dir, attempted_new_name, attempted_new_name + ".qcow2")): - registry_idx += 1 - else: - return attempted_new_name + downloaded_file_path = os.path.join(vms_dir, downloaded_file_name) + headers = {} + if os.path.exists(downloaded_file_path): + downloaded_size = os.path.getsize(downloaded_file_path) + headers["Range"] = f"bytes={downloaded_size}-" -def _install_vm(vm_name, vms_dir, downloaded_file_name, os_type, original_vm_name="Ubuntu"): - os.makedirs(vms_dir, exist_ok=True) + with requests.get(URL, headers=headers, stream=True) as response: + if response.status_code == 416: + # This means the range was not satisfiable, possibly the file was fully downloaded + logger.info("Fully downloaded or the file size changed.") + break - def __download_and_unzip_vm(): - # Download the virtual machine image - logger.info("Downloading the virtual machine image...") - downloaded_size = 0 - - if os_type == "Ubuntu": - if platform.system() == 'Darwin': - URL = UBUNTU_X86_URL - elif platform.machine().lower() in ['amd64', 'x86_64']: - URL = UBUNTU_X86_URL - elif os_type == "Windows": - if platform.machine().lower() in ['amd64', 'x86_64']: - URL = WINDOWS_X86_URL - DOWNLOADED_FILE_NAME = URL.split('/')[-1] - downloaded_file_name = DOWNLOADED_FILE_NAME - - while True: - downloaded_file_path = os.path.join(vms_dir, downloaded_file_name) - headers = {} - if os.path.exists(downloaded_file_path): - downloaded_size = os.path.getsize(downloaded_file_path) - headers["Range"] = f"bytes={downloaded_size}-" - - with requests.get(URL, headers=headers, stream=True) as response: - if response.status_code == 416: - # This means the range was not satisfiable, possibly the file was fully downloaded - logger.info("Fully downloaded or the file size changed.") - break - - response.raise_for_status() - total_size = int(response.headers.get('content-length', 0)) - - with open(downloaded_file_path, "ab") as file, tqdm( - desc="Progress", - total=total_size, - unit='iB', - unit_scale=True, - unit_divisor=1024, - initial=downloaded_size, - ascii=True - ) as progress_bar: - try: - for data in response.iter_content(chunk_size=1024): - size = file.write(data) - progress_bar.update(size) - except (requests.exceptions.RequestException, IOError) as e: - logger.error(f"Download error: {e}") - sleep(RETRY_INTERVAL) - logger.error("Retrying...") - else: - logger.info("Download succeeds.") - break # Download completed successfully - - vm_path = os.path.join(vms_dir, vm_name, vm_name + ".qcow2") - - # Execute the function to download and unzip the VM, and update the vm metadata - if not os.path.exists(vm_path): - __download_and_unzip_vm() - else: - logger.info(f"Virtual machine exists: {vm_path}") - - # Start the virtual machine - def start_vm(vm_path, max_retries=20): - pass - - if not start_vm(vm_path): - raise ValueError("Error encountered during installation, please rerun the code for retrying.") - - def get_vm_ip_and_port(vm_path, max_retries=20): - pass - - vm_ip, vm_port = get_vm_ip_and_port(vm_path) - if not vm_ip: - raise ValueError("Error encountered during installation, please rerun the code for retrying.") - - # Function used to check whether the virtual machine is ready - def download_screenshot(ip, port): - url = f"http://{ip}:{port}/screenshot" - try: - # max trey times 1, max timeout 1 - response = requests.get(url, timeout=(10, 10)) - if response.status_code == 200: - return True - except Exception as e: - logger.error(f"Error: {e}") - logger.error(f"Type: {type(e).__name__}") - logger.error(f"Error detail: {str(e)}") - sleep(RETRY_INTERVAL) - return False - - # Try downloading the screenshot until successful - while not download_screenshot(vm_ip, vm_port): - logger.info("Check whether the virtual machine is ready...") - - logger.info("Virtual machine is ready. Start to make a snapshot on the virtual machine. It would take a while...") + response.raise_for_status() + total_size = int(response.headers.get('content-length', 0)) + with open(downloaded_file_path, "ab") as file, tqdm( + desc="Progress", + total=total_size, + unit='iB', + unit_scale=True, + unit_divisor=1024, + initial=downloaded_size, + ascii=True + ) as progress_bar: + try: + for data in response.iter_content(chunk_size=1024): + size = file.write(data) + progress_bar.update(size) + except (requests.exceptions.RequestException, IOError) as e: + logger.error(f"Download error: {e}") + sleep(RETRY_INTERVAL) + logger.error("Retrying...") + else: + logger.info("Download succeeds.") + break # Download completed successfully class DockerVMManager(VMManager): - def __init__(self, registry_path=REGISTRY_PATH): - self.registry_path = registry_path - self.lock = FileLock(LOCK_FILE_NAME, timeout=60) - self.initialize_registry() - self.client = docker.from_env() + def __init__(self, registry_path=""): + pass - def initialize_registry(self): - with self.lock: # Locking during initialization - if not os.path.exists(self.registry_path): - with open(self.registry_path, 'w') as file: - file.write('') - - def add_vm(self, vm_path, lock_needed=True): - if lock_needed: - with self.lock: - self._add_vm(vm_path) - else: - self._add_vm(vm_path) - - def _add_vm(self, vm_path, region=None): - assert region in [None, 'local'], "For VMware provider, the region should be neither None or 'local'." - with self.lock: - with open(self.registry_path, 'r') as file: - lines = file.readlines() - new_lines = lines + [f'{vm_path}|free\n'] - with open(self.registry_path, 'w') as file: - file.writelines(new_lines) - - def occupy_vm(self, vm_path, pid, lock_needed=True): - if lock_needed: - with self.lock: - self._occupy_vm(vm_path, pid) - else: - self._occupy_vm(vm_path, pid) - - def _occupy_vm(self, vm_path, pid, region=None): - assert region in [None, 'local'], "For VMware provider, the region should be neither None or 'local'." - with self.lock: - new_lines = [] - with open(self.registry_path, 'r') as file: - lines = file.readlines() - for line in lines: - registered_vm_path, _ = line.strip().split('|') - if registered_vm_path == vm_path: - new_lines.append(f'{registered_vm_path}|{pid}\n') - else: - new_lines.append(line) - with open(self.registry_path, 'w') as file: - file.writelines(new_lines) - - def delete_vm(self, vm_path, lock_needed=True): - if lock_needed: - with self.lock: - self._delete_vm(vm_path) - else: - self._delete_vm(vm_path) - - def _delete_vm(self, vm_path): - raise NotImplementedError - - def check_and_clean(self, vms_dir, lock_needed=True): - if lock_needed: - with self.lock: - self._check_and_clean(vms_dir) - else: - self._check_and_clean(vms_dir) - - def _check_and_clean(self, vms_dir): - with self.lock: # Lock when cleaning up the registry and vms_dir - # Check and clean on the running vms, detect the released ones and mark then as 'free' - active_pids = {p.pid for p in psutil.process_iter()} - new_lines = [] - vm_paths = [] - - with open(self.registry_path, 'r') as file: - lines = file.readlines() - for line in lines: - vm_path, pid_str = line.strip().split('|') - if not os.path.exists(vm_path): - logger.info(f"VM {vm_path} not found, releasing it.") - new_lines.append(f'{vm_path}|free\n') - continue - - vm_paths.append(vm_path) - if pid_str == "free": - new_lines.append(line) - continue - - if int(pid_str) in active_pids: - new_lines.append(line) - else: - new_lines.append(f'{vm_path}|free\n') - with open(self.registry_path, 'w') as file: - file.writelines(new_lines) - - # Check and clean on the files inside vms_dir, delete the unregistered ones - os.makedirs(vms_dir, exist_ok=True) - vm_names = os.listdir(vms_dir) - for vm_name in vm_names: - # skip the downloaded .zip file - if vm_name == DOWNLOADED_FILE_NAME: - continue - # Skip the .DS_Store file on macOS - if vm_name == ".DS_Store": - continue - - flag = True - for vm_path in vm_paths: - if vm_name + ".qcow2" in vm_path: - flag = False - if flag: - shutil.rmtree(os.path.join(vms_dir, vm_name)) - - def list_free_vms(self, lock_needed=True): - if lock_needed: - with self.lock: - return self._list_free_vms() - else: - return self._list_free_vms() - - def _list_free_vms(self): - with self.lock: # Lock when reading the registry - free_vms = [] - with open(self.registry_path, 'r') as file: - lines = file.readlines() - for line in lines: - vm_path, pid_str = line.strip().split('|') - if pid_str == "free": - free_vms.append((vm_path, pid_str)) - return free_vms - - def get_vm_path(self, os_type, region=None): - with self.lock: - if not DockerVMManager.checked_and_cleaned: - DockerVMManager.checked_and_cleaned = True - self._check_and_clean(vms_dir=VMS_DIR) - - allocation_needed = False - with self.lock: - free_vms_paths = self._list_free_vms() - if len(free_vms_paths) == 0: - # No free virtual machine available, generate a new one - allocation_needed = True - else: - # Choose the first free virtual machine - chosen_vm_path = free_vms_paths[0][0] - self._occupy_vm(chosen_vm_path, os.getpid()) - return chosen_vm_path - - if allocation_needed: - logger.info("No free virtual machine available. Generating a new one, which would take a while...☕") - new_vm_name = generate_new_vm_name(vms_dir=VMS_DIR, os_type=os_type) - - original_vm_name = None - if os_type == "Ubuntu": - original_vm_name = "Ubuntu" - elif os_type == "Windows": - original_vm_name = "Windows 10 x64" - - new_vm_path = _install_vm(new_vm_name, vms_dir=VMS_DIR, - downloaded_file_name=DOWNLOADED_FILE_NAME, original_vm_name=original_vm_name, os_type=os_type) - with self.lock: - self._add_vm(new_vm_path) - self._occupy_vm(new_vm_path, os.getpid()) - return new_vm_path + def get_vm_path(self, region): + if not os.path.exists(os.path.join(VMS_DIR, DOWNLOADED_FILE_NAME)): + __download_vm(VMS_DIR) + return os.path.join(VMS_DIR, DOWNLOADED_FILE_NAME) \ No newline at end of file diff --git a/desktop_env/providers/docker/provider.py b/desktop_env/providers/docker/provider.py index fb3f77b..6920a9e 100644 --- a/desktop_env/providers/docker/provider.py +++ b/desktop_env/providers/docker/provider.py @@ -4,6 +4,7 @@ import platform import subprocess import time import docker +import psutil from desktop_env.providers.base import Provider @@ -15,42 +16,35 @@ WAIT_TIME = 3 class DockerProvider(Provider): def __init__(self, region: str): self.client = docker.from_env() + self.vnc_port = self._get_available_port(8006) + self.server_port = self._get_available_port(5000) + # self.remote_debugging_port = self._get_available_port(1337) + self.chromium_port = self._get_available_port(9222) + self.environment = {"DISK_SIZE": "64G", "RAM_SIZE": "4G", "CPU_CORES": "2"} # Modify if needed @staticmethod - def _execute_command(command: list, return_output=False): - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - encoding="utf-8" - ) - - if return_output: - output = process.communicate()[0].strip() - return output - else: - return None + def _get_available_port(port: int): + while port < 65354: + if port not in [conn.laddr.port for conn in psutil.net_connections()]: + return port + port += 1 def start_emulator(self, path_to_vm: str, headless: bool, os_type: str): - self.container = self.client.containers.run('qemux/qemu-docker', environment={"DISK_SIZE": "64G", "RAM_SIZE": "6G", "CPU_CORES": "8"}, volumes={"/Users/happysix/Programs/HKUNLP/Qemu/Ubuntu.qcow2": {"bind": "/Ubuntu.qcow2", "mode": "ro"}, "/Users/happysix/Programs/HKUNLP/Qemu/snapshot.qcow2": {"bind": "/boot.qcow2", "mode": "rw"}}, cap_add=["NET_ADMIN"], ports={8006: 8006, 5000: 5001}, detach=True) + # self.container = self.client.containers.run('qemux/qemu-docker', environment={"DISK_SIZE": "64G", "RAM_SIZE": "6G", "CPU_CORES": "8"}, volumes={"/Users/happysix/Programs/HKUNLP/Qemu/Ubuntu.qcow2": {"bind": "/Ubuntu.qcow2", "mode": "ro"}, "/Users/happysix/Programs/HKUNLP/Qemu/snapshot.qcow2": {"bind": "/boot.qcow2", "mode": "rw"}}, cap_add=["NET_ADMIN"], ports={8006: self.vnc_port, 5000: self.server_port}, detach=True) + self.container = self.client.containers.run(path_to_vm, environment=self.environment, cap_add=["NET_ADMIN"], volumes={"/Users/happysix/Programs/HKUNLP/Qemu/Ubuntu.qcow2": {"bind": "/Ubuntu.qcow2", "mode": "ro"}}, ports={8006: self.vnc_port, 5000: self.server_port}, detach=True) def get_ip_address(self, path_to_vm: str) -> str: - pass + return f"localhost:{self.server_port}:{self.chromium_port}:{self.vnc_port}" def save_state(self, path_to_vm: str, snapshot_name: str): - logger.info("Saving VM state...") - DockerProvider._execute_command( - ["qemu-img", "convert", "-O", "qcow2", snapshot_name, "temp.qcow2"]) - DockerProvider._execute_command( - ["mv", "temp.qcow2", path_to_vm] - ) - time.sleep(WAIT_TIME) # Wait for the VM to save + raise NotImplementedError("Not available for Docker.") def revert_to_snapshot(self, path_to_vm: str, snapshot_name: str): pass def stop_emulator(self, path_to_vm: str): - logger.info("Stopping VMware VM...") + logger.info("Stopping VM...") self.container.stop(WAIT_TIME) - self.container.remove() \ No newline at end of file + self.container.remove() + +# docker run -it --rm -e "DISK_SIZE=64G" -e "RAM_SIZE=8G" -e "CPU_CORES=8" --volume C:\Users\admin\Documents\Ubuntu.qcow2:/boot.qcow2 --cap-add NET_ADMIN --device /dev/kvm -p 8006:8006 -p 5000:5000 qemux/qemu-docker \ No newline at end of file