import requests import json from requests_toolbelt.multipart.encoder import MultipartEncoder import uuid import os.path from typing import Dict, List from typing import Any, Union import logging logger = logging.getLogger("desktopenv.setup") import traceback class SetupController: def __init__( self , http_server: str , cache_dir: str ): self.http_server = http_server + "/setup" self.cache_dir: str = cache_dir def reset_cache_dir(self, cache_dir: str): self.cache_dir = cache_dir def setup(self, config: List[Dict[str, Any]]): """ Args: config (List[Dict[str, Any]]): list of dict like {str: Any}. each config dict has the structure like { "type": str, corresponding to the `_{:}_setup` methods of this class "parameters": dick like {str, Any} providing the keyword parameters } """ for cfg in config: config_type: str = cfg["type"] parameters: Dict[str, Any] = cfg["parameters"] # Assumes all the setup the functions should follow this name # protocol setup_function: str = "_{:}_setup".format(config_type) assert hasattr(self, setup_function) getattr(self, setup_function)(**parameters) # self._download_setup(config) # self._change_wallpaper(config) # self._tidy_desktop(config) todo: implement this # self._open_setup(config) # can add other setup steps def _download_setup(self, files: List[Dict[str, str]]): """ Args: files (List[Dict[str, str]]): files to download. lisf of dict like { "url": str, the url to download "path": str, the path on the VM to store the downloaded file } """ # if not config: # return # if not 'download' in config: # return # for url, path in config['download']: for f in files: url: str = f["url"] path: str = f["path"] cache_path: str = os.path.join( self.cache_dir , "{:}_{:}".format( uuid.uuid5(uuid.NAMESPACE_URL, url) , os.path.basename(path) ) ) if not url or not path: raise Exception(f"Setup Download - Invalid URL ({url}) or path ({path}).") if not os.path.exists(cache_path): max_retries = 3 downloaded = False for i in range(max_retries): try: response = requests.get(url, stream=True) response.raise_for_status() with open(cache_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) logger.info("File downloaded successfully") downloaded = True break except requests.RequestException as e: logger.error(f"Failed to download {url}. Retrying... ({max_retries - i - 1} attempts left)") if not downloaded: raise requests.RequestException(f"Failed to download {url}. No retries left. Error: {e}") #payload = json.dumps({"url": url, "path": path}) #headers = { #'Content-Type': 'application/json' #} form = MultipartEncoder( { "file_path": path , "file_data": (os.path.basename(path), open(cache_path, "rb")) } ) headers = {"Content-Type": form.content_type} logger.debug(form.content_type) # send request to server to upload file try: logger.debug("REQUEST ADDRESS: %s", self.http_server + "/upload") response = requests.post(self.http_server + "/upload", headers=headers, data=form) if response.status_code == 200: logger.info("Command executed successfully: %s", response.text) else: logger.error("Failed to upload file. Status code: %s", response.text) except requests.exceptions.RequestException as e: logger.error("An error occurred while trying to send the request: %s", e) def _change_wallpaper_setup(self, path: str): # if not config: # return # if not 'wallpaper' in config: # return # path = config['wallpaper'] if not path: raise Exception(f"Setup Wallpaper - Invalid path ({path}).") payload = json.dumps({"path": path}) headers = { 'Content-Type': 'application/json' } # send request to server to change wallpaper try: response = requests.post(self.http_server + "/change_wallpaper", headers=headers, data=payload) if response.status_code == 200: logger.info("Command executed successfully: %s", response.text) else: logger.error("Failed to change wallpaper. Status code: %s", response.text) except requests.exceptions.RequestException as e: logger.error("An error occurred while trying to send the request: %s", e) def _tidy_desktop_setup(self, **config): raise NotImplementedError() def _open_setup(self, path: str): # if not config: # return # if not 'open' in config: # return # for path in config['open']: if not path: raise Exception(f"Setup Open - Invalid path ({path}).") payload = json.dumps({"path": path}) headers = { 'Content-Type': 'application/json' } # send request to server to open file try: response = requests.post(self.http_server + "/open_file", headers=headers, data=payload) if response.status_code == 200: logger.info("Command executed successfully: %s", response.text) else: logger.error("Failed to open file. Status code: %s", response.text) except requests.exceptions.RequestException as e: logger.error("An error occurred while trying to send the request: %s", e) def _launch_setup(self, command: List[str]): if not command: raise Exception("Empty comman to launch.") payload = json.dumps({"command": command}) headers = {"Content-Type": "application/json"} try: response = requests.post(self.http_server + "/launch", headers=headers, data=payload) if response.status_code == 200: logger.info("Command executed successfully: %s", response.text) else: logger.error("Failed to launch application. Status code: %s", response.text) except requests.exceptions.RequestException as e: logger.error("An error occurred while trying to send the request: %s", e) def _execute_setup(self, command: List[str], stdout: str = "", stderr: str = ""): if not command: raise Exception("Empty comman to launch.") payload = json.dumps({"command": command}) headers = {"Content-Type": "application/json"} try: response = requests.post(self.http_server + "/execute", headers=headers, data=payload) if response.status_code == 200: results: Dict[str, str] = response.json() if stdout: with open(os.path.join(self.cache_dir, stdout), "w") as f: f.write(results["output"]) if stderr: with open(os.path.join(self.cache_dir, stderr), "w") as f: f.write(results["error"]) logger.info( "Command executed successfully: %s -> %s" , " ".join(command) , response.text ) else: logger.error("Failed to launch application. Status code: %s", response.text) except requests.exceptions.RequestException as e: logger.error("An error occurred while trying to send the request: %s", e) traceback.print_exc() def _act_setup(self, action_seq: List[Union[Dict[str, Any], str]]): # TODO raise NotImplementedError() def _replay_setup(self, trajectory: str): """ Args: trajectory (str): path to the replay trajectory file """ # TODO raise NotImplementedError()