"""Gym-style environment that drives a VMware virtual machine through ``vmrun``."""
from __future__ import annotations

import os
import subprocess
import time
import uuid
from typing import List

import gymnasium as gym

from desktop_env.controllers.python import PythonController


def _execute_command(command: List[str]) -> None:
    """Run *command* and raise if it exits with a non-zero status.

    Args:
        command: The argv list handed to ``subprocess.run`` (no shell).

    Raises:
        Exception: If the return code is non-zero. The message is the
            captured stdout followed by stderr, wrapped in ANSI red codes.
        subprocess.TimeoutExpired: If the command runs longer than 60 seconds.
    """
    result = subprocess.run(command, capture_output=True, timeout=60, text=True)
    if result.returncode != 0:
        # "\033[91m" ... "\033[0m" renders the message in red on ANSI terminals.
        raise Exception("\033[91m" + result.stdout + result.stderr + "\033[0m")


class DesktopEnv(gym.Env):
    """DesktopEnv with OpenAI Gym interface.

    The environment controls a virtual machine with the ``vmrun`` command-line
    tool (always invoked with host type ``-T ws``) and talks to the guest
    through a ``PythonController`` pointed at ``host``.

    Observations are file paths of screenshots saved under ``tmp/<uuid>/``.
    """

    def __init__(
            self,
            path_to_vm: str,
            host: str = "192.168.7.128:5000",
            snapshot_path: str = "base",
            action_space: str = "pyautogui",
    ):
        """Validate the configuration, make sure the VM runs, and connect to it.

        Args:
            path_to_vm: Path of the VM passed to ``vmrun``.
            host: ``host:port`` handed to ``PythonController`` as ``http_server``.
            snapshot_path: Snapshot name used by ``reset`` and ``_save_state``.
            action_space: Either ``"computer_13"`` or ``"pyautogui"``; selects
                which controller method ``step`` uses to execute an action.

        Raises:
            ValueError: If ``action_space`` is not one of the supported values.
        """
        # Validate before any side effect so that a bad argument does not boot
        # the VM first. An explicit raise also survives ``python -O``.
        if action_space not in ("computer_13", "pyautogui"):
            raise ValueError('Unsupported action space: {}'.format(action_space))

        # Initialize environment variables
        self.path_to_vm = path_to_vm
        self.host = host
        self.snapshot_path = snapshot_path  # todo: handling the logic of snapshot directory

        # mode: human or machine
        self.action_space = action_space
        # todo: define the action space and the observation space as gym did, or extend theirs

        # Initialize emulator and controller
        print("Initializing...")
        self._start_emulator()
        self.controller = PythonController(http_server=self.host)

    def _start_emulator(self) -> None:
        """Block until ``vmrun -T ws list`` reports this VM, starting it if needed.

        Raises:
            Exception: Propagated from ``_execute_command`` when ``vmrun start``
                fails.
            FileNotFoundError: If the ``vmrun`` executable cannot be found.
        """
        while True:
            try:
                # Only this call can raise CalledProcessError, so the ``try``
                # is limited to it.
                listing = subprocess.check_output(
                    ["vmrun", "-T", "ws", "list"], stderr=subprocess.STDOUT
                ).decode()
            except subprocess.CalledProcessError as e:
                print(f"Error executing command: {e.output.decode().strip()}")
                # Pause briefly so a persistent failure does not busy-spin.
                time.sleep(1)
                continue

            # Leading '~' and '/' characters are stripped so the configured path
            # is matched as a substring of the listing.
            # NOTE(review): presumably meant to let a "~/..." path match the
            # absolute paths vmrun prints -- confirm against real output.
            if self.path_to_vm.lstrip("~/") in listing:
                print("VM is running.")
                break

            print("Starting VM...")
            _execute_command(["vmrun", "-T", "ws", "start", self.path_to_vm])
            # Give the VM time to boot before polling the list again.
            time.sleep(10)

    def _save_state(self) -> None:
        """Take a snapshot of the VM named ``self.snapshot_path``."""
        # The comma after "ws" matters: without it Python concatenates the two
        # adjacent literals into the single bogus argument "wssnapshot".
        _execute_command(["vmrun", "-T", "ws", "snapshot", self.path_to_vm, self.snapshot_path])

    def _get_screenshot(self) -> str:
        """Fetch a screenshot from the controller and save it to a fresh file.

        Returns:
            The path ``tmp/<random uuid>/screenshot.png`` that was written.
        """
        screenshot_dir = os.path.join("tmp", str(uuid.uuid4()))
        os.makedirs(screenshot_dir, exist_ok=True)
        image_path = os.path.join(screenshot_dir, "screenshot.png")

        # Get the screenshot and save to the image_path
        # (written in binary mode, so the controller presumably returns bytes)
        screenshot = self.controller.get_screenshot()
        with open(image_path, "wb") as f:
            f.write(screenshot)

        return image_path

    def _get_obs(self) -> str:
        """Return the current observation: the path of a new screenshot file."""
        return self._get_screenshot()

    def reset(self, seed=None, options=None):
        """Revert the VM to the configured snapshot and return a first observation.

        Args:
            seed: Accepted for interface compatibility; not used here.
            options: Accepted for interface compatibility; not used here.

        Returns:
            The observation only (a screenshot path), not an ``(obs, info)`` pair.
        """
        print("Resetting environment...")

        print("Reverting to snapshot to {}...".format(self.snapshot_path))
        _execute_command(["vmrun", "-T", "ws", "revertToSnapshot", self.path_to_vm, self.snapshot_path])
        time.sleep(5)

        print("Starting emulator...")
        self._start_emulator()
        print("Emulator started.")

        return self._get_obs()

    def step(self, action, pause=0.5):
        """Execute *action* in the guest, wait, and observe.

        Args:
            action: Passed to ``controller.execute_action`` when the action
                space is ``"computer_13"``, or to
                ``controller.execute_python_command`` when it is ``"pyautogui"``.
            pause: Seconds to sleep after executing the action before the
                screenshot is taken.

        Returns:
            A 4-tuple ``(observation, reward, done, info)``. ``reward`` is
            always ``0``, ``done`` always ``False`` and ``info`` an empty dict
            until the todos below are implemented.
        """
        if self.action_space == "computer_13":
            # the set of all possible actions defined in the action representation
            self.controller.execute_action(action)
        elif self.action_space == "pyautogui":
            # the set of all possible python commands insides `pyautogui`
            self.controller.execute_python_command(action)

        # todo: maybe for the better here we need to add a logic to wait until the rendering is done
        time.sleep(pause)

        observation = self._get_obs()
        reward = 0  # todo: Define reward calculation for each example
        done = False  # todo: Define episode termination condition for each example
        info = {}
        return observation, reward, done, info

    def render(self, mode='rgb_array'):
        """Return a fresh observation for ``mode='rgb_array'``.

        Despite the mode name, the value returned is the screenshot file path
        produced by ``_get_obs``, not an array.

        Raises:
            ValueError: For any other ``mode``.
        """
        if mode != 'rgb_array':
            raise ValueError('Unsupported render mode: {}'.format(mode))
        return self._get_obs()

    def close(self) -> None:
        """Stop the VM."""
        # "-T ws" is given explicitly, consistent with every other vmrun call here.
        _execute_command(["vmrun", "-T", "ws", "stop", self.path_to_vm])