import json
import logging
import os.path
import time
import traceback
import uuid
from typing import Any, Dict, List, Optional, Union

import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder

logger = logging.getLogger("desktopenv.setup")


class SetupController:
    """Drive the setup endpoints exposed under ``<http_server>/setup``.

    Each ``_<type>_setup`` method implements one setup step. ``setup()``
    dispatches a list of config dicts to those methods by name.
    """

    def __init__(self, http_server: str, cache_dir: str):
        """
        Args:
            http_server (str): base URL of the server; "/setup" is appended
              to build the root of every setup endpoint
            cache_dir (str): local directory used for downloaded files and
              for captured command output
        """
        self.http_server: str = http_server
        self.http_server_setup_root: str = http_server + "/setup"
        self.cache_dir: str = cache_dir

    def reset_cache_dir(self, cache_dir: str):
        """Replace the local cache directory used by later setup steps."""
        self.cache_dir = cache_dir

    def setup(self, config: List[Dict[str, Any]]):
        """
        Run every setup step described in `config`, in order.

        Args:
            config (List[Dict[str, Any]]): list of dict like {str: Any}. each
              config dict has the structure like
                {
                    "type": str, corresponding to the `_{:}_setup` methods of
                      this class
                    "parameters": dict like {str, Any} providing the keyword
                      parameters
                }

        Raises:
            AssertionError: if a "type" has no matching `_{type}_setup` method
        """
        for cfg in config:
            config_type: str = cfg["type"]
            parameters: Dict[str, Any] = cfg["parameters"]

            # All setup functions are assumed to follow this naming protocol.
            setup_function: str = "_{:}_setup".format(config_type)
            assert hasattr(self, setup_function), \
                "Unknown setup type {!r}: no method {:}".format(config_type, setup_function)
            getattr(self, setup_function)(**parameters)

            logger.info("SETUP: %s(%s)", setup_function, str(parameters))

    # NOTE: an earlier standalone `_command_setup` implementation was merged
    # with launch; the current `_command_setup` delegates to `_execute_setup`.

    def _post_json(self, endpoint: str, payload: Dict[str, Any], failure_message: str) -> None:
        """POST `payload` as JSON to a setup endpoint and log the outcome.

        Request errors are logged rather than raised, so a failing step does
        not abort the remaining setup steps.

        Args:
            endpoint (str): path appended to the setup root, e.g. "/launch"
            payload (Dict[str, Any]): JSON-serializable request body
            failure_message (str): log prefix used for a non-200 response
        """
        headers = {"Content-Type": "application/json"}
        try:
            response = requests.post(
                self.http_server_setup_root + endpoint,
                headers=headers,
                data=json.dumps(payload),
            )
            if response.status_code == 200:
                logger.info("Command executed successfully: %s", response.text)
            else:
                logger.error(
                    "%s Status code: %s, response: %s",
                    failure_message, response.status_code, response.text,
                )
        except requests.exceptions.RequestException as e:
            logger.error("An error occurred while trying to send the request: %s", e)

    def _download_to_cache(self, url: str, cache_path: str, max_retries: int = 3) -> None:
        """Download `url` into `cache_path`, retrying on request errors.

        The body is streamed to a temporary ".part" file that is moved into
        place only after a complete download, so an interrupted transfer is
        never mistaken for a valid cache entry on a later run.

        Raises:
            requests.RequestException: if every attempt fails
        """
        partial_path = cache_path + ".part"
        last_error: Optional[requests.RequestException] = None

        for attempt in range(max_retries):
            try:
                with requests.get(url, stream=True) as response:
                    response.raise_for_status()
                    with open(partial_path, "wb") as cache_file:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                cache_file.write(chunk)
            except requests.RequestException as e:
                # `e` is unbound once this clause ends; keep it for the
                # final error message.
                last_error = e
                if os.path.exists(partial_path):
                    os.remove(partial_path)
                logger.error(
                    "Failed to download %s. Retrying... (%d attempts left)",
                    url, max_retries - attempt - 1,
                )
            else:
                os.replace(partial_path, cache_path)
                logger.info("File downloaded successfully")
                return

        raise requests.RequestException(
            f"Failed to download {url}. No retries left. Error: {last_error}"
        ) from last_error

    def _download_setup(self, files: List[Dict[str, str]]):
        """
        Download each file into the local cache (if not already cached) and
        upload it to the server's "/upload" endpoint.

        Args:
            files (List[Dict[str, str]]): files to download. list of dict like
              {
                "url": str, the url to download
                "path": str, the path on the VM to store the downloaded file
              }

        Raises:
            Exception: if an entry has an empty url or path
            requests.RequestException: if a download fails on every attempt
        """
        for file_spec in files:
            url: str = file_spec["url"]
            path: str = file_spec["path"]

            # Validate before deriving the cache path so that an empty or
            # missing value produces this error rather than a TypeError.
            if not url or not path:
                raise Exception(f"Setup Download - Invalid URL ({url}) or path ({path}).")

            cache_path: str = os.path.join(
                self.cache_dir,
                "{:}_{:}".format(uuid.uuid5(uuid.NAMESPACE_URL, url), os.path.basename(path)),
            )

            if not os.path.exists(cache_path):
                self._download_to_cache(url, cache_path)

            # Keep the cached file open only for the duration of the upload.
            with open(cache_path, "rb") as cached_file:
                form = MultipartEncoder({
                    "file_path": path,
                    "file_data": (os.path.basename(path), cached_file),
                })
                headers = {"Content-Type": form.content_type}
                logger.debug(form.content_type)

                # send request to server to upload file
                try:
                    logger.debug("REQUEST ADDRESS: %s", self.http_server_setup_root + "/upload")
                    response = requests.post(
                        self.http_server_setup_root + "/upload", headers=headers, data=form
                    )
                    if response.status_code == 200:
                        logger.info("Command executed successfully: %s", response.text)
                    else:
                        logger.error(
                            "Failed to upload file. Status code: %s, response: %s",
                            response.status_code, response.text,
                        )
                except requests.exceptions.RequestException as e:
                    logger.error("An error occurred while trying to send the request: %s", e)

    def _change_wallpaper_setup(self, path: str):
        """
        Ask the server to change the wallpaper via "/change_wallpaper".

        Args:
            path (str): wallpaper path sent to the server

        Raises:
            Exception: if `path` is empty
        """
        if not path:
            raise Exception(f"Setup Wallpaper - Invalid path ({path}).")

        self._post_json("/change_wallpaper", {"path": path}, "Failed to change wallpaper.")

    def _tidy_desktop_setup(self, **config):
        raise NotImplementedError()

    def _open_setup(self, path: str):
        """
        Ask the server to open a file via "/open_file".

        Args:
            path (str): path of the file sent to the server

        Raises:
            Exception: if `path` is empty
        """
        if not path:
            raise Exception(f"Setup Open - Invalid path ({path}).")

        self._post_json("/open_file", {"path": path}, "Failed to open file.")

    def _launch_setup(self, command: Union[str, List[str]]):
        """
        Send a command to the server's "/launch" endpoint.

        Args:
            command (Union[str, List[str]]): the command to launch. A string
              containing more than one whitespace-separated token is split
              into a list (with a warning).

        Raises:
            Exception: if `command` is empty
        """
        if not command:
            raise Exception("Empty command to launch.")

        if isinstance(command, str) and len(command.split()) > 1:
            logger.warning("Command should be a list of strings. Now it is a string. Will split it by space.")
            command = command.split()

        self._post_json("/launch", {"command": command}, "Failed to launch application.")

    def _execute_setup(
        self,
        command: Union[str, List[str]],
        stdout: str = "",
        stderr: str = "",
        shell: bool = False,
        until: Optional[Dict[str, Any]] = None,
    ):
        """
        Send a command to the server's "/execute" endpoint, optionally
        repeating it until a condition on its result holds.

        Args:
            command (Union[str, List[str]]): the command to execute
            stdout (str): if non-empty, file name under the cache dir that
              receives the "output" field of the response
            stderr (str): if non-empty, file name under the cache dir that
              receives the "error" field of the response
            shell (bool): forwarded to the server as the "shell" field
            until (Optional[Dict[str, Any]]): stop condition. If empty or
              None the command is sent once. Otherwise it is re-sent every
              0.3 s until one of the following matches, or until 5 requests
              have failed:
                "returncode": equals the "returncode" field of the response
                "stdout": is contained in the "output" field
                "stderr": is contained in the "error" field

        Raises:
            Exception: if `command` is empty
        """
        if not command:
            raise Exception("Empty comman to launch.")

        until = until or {}
        max_failures = 5
        retry_interval = 0.3

        terminates: bool = False
        nb_failings = 0

        payload = json.dumps({"command": command, "shell": shell})
        headers = {"Content-Type": "application/json"}
        # A plain string must not go through " ".join, which would insert a
        # space between every character.
        command_text = command if isinstance(command, str) else " ".join(command)

        while not terminates:
            results: Optional[Dict[str, Any]] = None
            try:
                response = requests.post(
                    self.http_server_setup_root + "/execute", headers=headers, data=payload
                )
                if response.status_code == 200:
                    results = response.json()
                    if stdout:
                        with open(os.path.join(self.cache_dir, stdout), "w") as f:
                            f.write(results["output"])
                    if stderr:
                        with open(os.path.join(self.cache_dir, stderr), "w") as f:
                            f.write(results["error"])
                    logger.info(
                        "Command executed successfully: %s -> %s",
                        command_text, response.text,
                    )
                else:
                    logger.error(
                        "Failed to launch application. Status code: %s, response: %s",
                        response.status_code, response.text,
                    )
                    nb_failings += 1
            except requests.exceptions.RequestException as e:
                logger.error("An error occurred while trying to send the request: %s", e)
                traceback.print_exc()
                results = None
                nb_failings += 1

            if not until:
                terminates = True
            elif results is not None:
                terminates = (
                    ("returncode" in until and results["returncode"] == until["returncode"])
                    or ("stdout" in until and until["stdout"] in results["output"])
                    or ("stderr" in until and until["stderr"] in results["error"])
                )
            # Give up after too many failed requests even if the condition
            # was never met.
            terminates = terminates or nb_failings >= max_failures
            if not terminates:
                time.sleep(retry_interval)

    def _command_setup(self, command: Union[str, List[str]], **kwargs):
        """Alias of `_execute_setup`; keyword arguments are passed through."""
        self._execute_setup(command, **kwargs)

    def _sleep_setup(self, seconds: float):
        """Pause the setup sequence for `seconds` seconds."""
        time.sleep(seconds)

    def _act_setup(self, action_seq: List[Union[Dict[str, Any], str]]):
        # TODO
        raise NotImplementedError()

    def _replay_setup(self, trajectory: str):
        """
        Args:
            trajectory (str): path to the replay trajectory file
        """

        # TODO
        raise NotImplementedError()