ver Jan4th
updated interfaces for thunderbird evaluation, not tested
This commit is contained in:
@@ -6,7 +6,7 @@ import uuid
|
||||
import os.path
|
||||
|
||||
from typing import Dict, List
|
||||
from typing import Any
|
||||
from typing import Any, Union
|
||||
|
||||
|
||||
class SetupController:
|
||||
@@ -145,7 +145,7 @@ class SetupController:
|
||||
print("An error occurred while trying to send the request:", e)
|
||||
|
||||
def _tidy_desktop_setup(self, **config):
|
||||
raise NotImplementedError
|
||||
raise NotImplementedError()
|
||||
|
||||
def _open_setup(self, path: str):
|
||||
# if not config:
|
||||
@@ -170,3 +170,56 @@ class SetupController:
|
||||
print("Failed to open file. Status code:", response.text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("An error occurred while trying to send the request:", e)
|
||||
|
||||
def _launch_setup(self, command: List[str]):
|
||||
if not command:
|
||||
raise Exception("Empty comman to launch.")
|
||||
|
||||
payload = json.dumps({"command": command})
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
try:
|
||||
response = requests.post(self.http_server + "/launch", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
print("Command executed successfully:", response.text)
|
||||
else:
|
||||
print("Failed to launch application. Status code:", response.text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("An error occurred while trying to send the request:", e)
|
||||
|
||||
def _execute_setup(self, command: List[str], stdout: str = "", stderr: str = ""):
|
||||
if not command:
|
||||
raise Exception("Empty comman to launch.")
|
||||
|
||||
payload = json.dumps({"command": command})
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
try:
|
||||
response = requests.post(self.http_server + "/launch", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
results: Dict[str, str] = response.json()
|
||||
if stdout:
|
||||
with open(os.path.join(self.cache_dir, stdout), "w") as f:
|
||||
f.write(results["output"])
|
||||
if stderr:
|
||||
with open(os.path.join(self.cache_dir, stderr), "w") as f:
|
||||
f.write(results["error"])
|
||||
print( "Command executed successfully: {:} ->".format(" ".join(command))
|
||||
, response.text
|
||||
)
|
||||
else:
|
||||
print("Failed to launch application. Status code:", response.text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("An error occurred while trying to send the request:", e)
|
||||
|
||||
def _act_setup(self, action_seq: List[Union[Dict[str, Any], str]]):
|
||||
# TODO
|
||||
raise NotImplementedError()
|
||||
def _replay_setup(self, trajectory: str):
|
||||
"""
|
||||
Args:
|
||||
trajectory (str): path to the replay trajectory file
|
||||
"""
|
||||
|
||||
# TODO
|
||||
raise NotImplementedError()
|
||||
|
||||
Reference in New Issue
Block a user