Files
sci-gui-agent-benchmark/desktop_env/controllers/setup.py
2024-01-05 15:55:41 +08:00

256 lines
9.8 KiB
Python

import requests
import json
from requests_toolbelt.multipart.encoder import MultipartEncoder
import uuid
import os.path
from typing import Dict, List
from typing import Any, Union
import logging
# Module-level logger named "desktopenv.setup"; the setup steps below report
# their progress and failures through it.
logger = logging.getLogger("desktopenv.setup")
import traceback
class SetupController:
    """Run environment setup steps through the HTTP server's ``/setup`` API.

    Every setup step is implemented as a method named ``_<type>_setup``;
    :meth:`setup` dispatches to those methods by name. Private helpers must
    therefore NOT follow the ``_<type>_setup`` naming pattern, otherwise they
    would become reachable from a configuration file.
    """

    def __init__(self, http_server: str, cache_dir: str):
        """
        Args:
            http_server (str): base URL of the HTTP server, without a trailing
              ``/setup`` (that suffix is appended here)
            cache_dir (str): local directory used to cache downloaded files
              and to store captured command output
        """
        self.http_server: str = http_server
        self.http_server_setup_root = http_server + "/setup"
        self.cache_dir: str = cache_dir

    def reset_cache_dir(self, cache_dir: str):
        """Point the controller at a different local cache directory."""
        self.cache_dir = cache_dir

    def setup(self, config: List[Dict[str, Any]]):
        """
        Run every setup step described by `config`, in order.

        Args:
            config (List[Dict[str, Any]]): list of dict like {str: Any}. each
              config dict has the structure like
                {
                    "type": str, corresponding to the `_{:}_setup` methods of
                      this class
                    "parameters": dict like {str, Any} providing the keyword
                      parameters
                }

        Raises:
            AssertionError: if a step names a type that has no matching
              `_{:}_setup` method
        """
        for cfg in config:
            config_type: str = cfg["type"]
            parameters: Dict[str, Any] = cfg["parameters"]

            # All setup functions follow this naming protocol.
            setup_function: str = "_{:}_setup".format(config_type)
            handler = getattr(self, setup_function, None)
            if handler is None:
                # Raised explicitly (rather than via `assert`) so the check
                # still runs under `python -O`; the exception type is kept
                # for callers that relied on the original assertion.
                raise AssertionError(
                    "Unknown setup type {!r}: no method {:} on {:}".format(
                        config_type, setup_function, type(self).__name__))
            handler(**parameters)

    # NOTE: the former `_command_setup` step was merged into `_launch_setup`.
    # Further setup steps can be added as new `_<type>_setup` methods.

    def _post_json(self, endpoint: str, payload: Dict[str, Any], failure_message: str) -> None:
        """
        POST `payload` as JSON to `<setup root><endpoint>` and log the outcome.

        Request errors are logged, not raised, so a failing step does not
        abort the remaining setup steps.

        Args:
            endpoint (str): path below the setup root, e.g. "/launch"
            payload (Dict[str, Any]): JSON-serializable request body
            failure_message (str): log prefix used for a non-200 response
        """
        headers = {"Content-Type": "application/json"}
        try:
            response = requests.post(self.http_server_setup_root + endpoint,
                                     headers=headers,
                                     data=json.dumps(payload))
            if response.status_code == 200:
                logger.info("Command executed successfully: %s", response.text)
            else:
                logger.error("%s Status code: %s. Response: %s",
                             failure_message, response.status_code, response.text)
        except requests.exceptions.RequestException as e:
            logger.error("An error occurred while trying to send the request: %s", e)

    def _download_to_cache(self, url: str, cache_path: str, max_retries: int = 3) -> None:
        """
        Download `url` into `cache_path`, retrying on request errors.

        The body is streamed into a sibling ``.part`` file and only moved to
        `cache_path` once complete, so an interrupted download can never be
        mistaken for a valid cache entry on a later run.

        Raises:
            requests.RequestException: if every attempt failed
        """
        partial_path = cache_path + ".part"
        last_error = None
        for attempt in range(max_retries):
            try:
                # The context manager releases the streamed connection.
                with requests.get(url, stream=True) as response:
                    response.raise_for_status()
                    with open(partial_path, "wb") as cache_file:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                cache_file.write(chunk)
                os.replace(partial_path, cache_path)
                logger.info("File downloaded successfully")
                return
            except requests.RequestException as e:
                # `e` is unbound once this clause ends, so keep a reference
                # for the final error message.
                last_error = e
                logger.error(f"Failed to download {url}. Retrying... ({max_retries - attempt - 1} attempts left)")

        try:
            os.remove(partial_path)
        except FileNotFoundError:
            # Nothing was written (e.g. the connection never succeeded).
            pass
        raise requests.RequestException(
            f"Failed to download {url}. No retries left. Error: {last_error}") from last_error

    def _upload_file(self, cache_path: str, path: str) -> None:
        """
        Upload the cached file at `cache_path` to the server, to be stored at
        `path`. Request errors are logged, not raised.
        """
        # The encoder streams from the file object while the request is being
        # sent, so the file must stay open until `requests.post` returns.
        with open(cache_path, "rb") as cached_file:
            form = MultipartEncoder({
                "file_path": path,
                "file_data": (os.path.basename(path), cached_file)
            })
            headers = {"Content-Type": form.content_type}
            logger.debug(form.content_type)

            try:
                logger.debug("REQUEST ADDRESS: %s", self.http_server_setup_root + "/upload")
                response = requests.post(self.http_server_setup_root + "/upload", headers=headers, data=form)
                if response.status_code == 200:
                    logger.info("Command executed successfully: %s", response.text)
                else:
                    logger.error("Failed to upload file. Status code: %s. Response: %s",
                                 response.status_code, response.text)
            except requests.exceptions.RequestException as e:
                logger.error("An error occurred while trying to send the request: %s", e)

    def _download_setup(self, files: List[Dict[str, str]]):
        """
        Download each file (or reuse its cached copy) and upload it.

        Args:
            files (List[Dict[str, str]]): files to download. list of dict like
              {
                "url": str, the url to download
                "path": str, the path on the VM to store the downloaded file
              }

        Raises:
            Exception: if an entry has an empty/missing url or path
            requests.RequestException: if a download fails on every retry
        """
        for file_spec in files:
            url: str = file_spec["url"]
            path: str = file_spec["path"]

            # Validate before deriving the cache path; uuid5()/basename()
            # would otherwise fail on None with an unrelated TypeError.
            if not url or not path:
                raise Exception(f"Setup Download - Invalid URL ({url}) or path ({path}).")

            # Cache key: deterministic UUID of the URL plus the target name.
            cache_path: str = os.path.join(self.cache_dir, "{:}_{:}".format(
                uuid.uuid5(uuid.NAMESPACE_URL, url),
                os.path.basename(path)))

            if not os.path.exists(cache_path):
                self._download_to_cache(url, cache_path)

            self._upload_file(cache_path, path)

    def _change_wallpaper_setup(self, path: str):
        """
        Ask the server to change the wallpaper to the image at `path`.

        Raises:
            Exception: if `path` is empty
        """
        if not path:
            raise Exception(f"Setup Wallpaper - Invalid path ({path}).")
        self._post_json("/change_wallpaper", {"path": path}, "Failed to change wallpaper.")

    def _tidy_desktop_setup(self, **config):
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError()

    def _open_setup(self, path: str):
        """
        Ask the server to open the file at `path`.

        Raises:
            Exception: if `path` is empty
        """
        if not path:
            raise Exception(f"Setup Open - Invalid path ({path}).")
        self._post_json("/open_file", {"path": path}, "Failed to open file.")

    def _launch_setup(self, command: List[str]):
        """
        Ask the server to launch `command`.

        Raises:
            Exception: if `command` is empty
        """
        if not command:
            raise Exception("Empty command to launch.")
        self._post_json("/launch", {"command": command}, "Failed to launch application.")

    def _execute_setup(self, command: List[str], stdout: str = "", stderr: str = ""):
        """
        Ask the server to execute `command` and optionally save its output.

        Args:
            command (List[str]): the command to execute
            stdout (str): if non-empty, file name under the cache directory
              that receives the "output" field of the server's JSON reply
            stderr (str): if non-empty, file name under the cache directory
              that receives the "error" field of the server's JSON reply

        Raises:
            Exception: if `command` is empty
        """
        if not command:
            raise Exception("Empty command to execute.")

        payload = json.dumps({"command": command})
        headers = {"Content-Type": "application/json"}

        try:
            response = requests.post(self.http_server_setup_root + "/execute", headers=headers, data=payload)
            if response.status_code == 200:
                results: Dict[str, str] = response.json()
                if stdout:
                    with open(os.path.join(self.cache_dir, stdout), "w") as out_file:
                        out_file.write(results["output"])
                if stderr:
                    with open(os.path.join(self.cache_dir, stderr), "w") as err_file:
                        err_file.write(results["error"])
                # Joining a plain string would space-separate its characters.
                command_text = command if isinstance(command, str) else " ".join(command)
                logger.info("Command executed successfully: %s -> %s",
                            command_text, response.text)
            else:
                logger.error("Failed to execute command. Status code: %s. Response: %s",
                             response.status_code, response.text)
        except requests.exceptions.RequestException as e:
            logger.error("An error occurred while trying to send the request: %s", e)
            traceback.print_exc()

    def _act_setup(self, action_seq: List[Union[Dict[str, Any], str]]):
        """Not implemented yet; always raises NotImplementedError."""
        # TODO
        raise NotImplementedError()

    def _replay_setup(self, trajectory: str):
        """
        Not implemented yet; always raises NotImplementedError.

        Args:
            trajectory (str): path to the replay trajectory file
        """
        # TODO
        raise NotImplementedError()