# 741 lines, 28 KiB, Python
from __future__ import annotations
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import signal
|
|
import time
|
|
from typing import List, Dict, Any, Optional
|
|
import math
|
|
from tqdm import tqdm
|
|
from multiprocessing import Process, Manager
|
|
from multiprocessing import current_process
|
|
import lib_run_single
|
|
from desktop_env.desktop_env import DesktopEnv, _fix_pyautogui_less_than_bug
|
|
from mm_agents.aworldguiagent.agent import AworldGUIAgent
|
|
from mm_agents.aworldguiagent.grounding import OSWorldACI
|
|
|
|
# Upper bound on environment-setup attempts made by CustomDesktopEnv.reset().
MAX_RETRIES = 5

# Module-level state read by signal_handler() and the final cleanup in the
# __main__ block.
active_environments = []
processes = []
is_terminating = False

# import wandb

# Load environment variables from a .env file in the working directory, if any.
# python-dotenv is only imported when such a file actually exists.
if os.path.exists(".env"):
    from dotenv import load_dotenv

    load_dotenv()
|
|
|
|
|
|
# Logger Configs {{{ #
|
|
def config() -> argparse.Namespace:
    """Define the runner's command-line options and parse ``sys.argv``.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``.
    """
    parser = argparse.ArgumentParser(
        description="Run end-to-end evaluation on the benchmark"
    )

    # Each entry is (flag, keyword arguments for add_argument); the options are
    # registered in list order.
    option_specs = [
        # environment config
        ("--path_to_vm", dict(type=str, default=None)),
        ("--headless", dict(action="store_true", help="Run in headless machine")),
        ("--action_space", dict(type=str, default="pyautogui", help="Action type")),
        (
            "--observation_type",
            dict(
                choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"],
                default="screenshot",
                help="Observation type",
            ),
        ),
        ("--sleep_after_execution", dict(type=float, default=0.0)),
        ("--max_steps", dict(type=int, default=15)),
        # agent config
        ("--test_config_base_dir", dict(type=str, default="evaluation_examples")),
        # lm config
        ("--model", dict(type=str, default="o3")),
        # example config
        ("--domain", dict(type=str, default="all")),
        (
            "--test_all_meta_path",
            dict(type=str, default="evaluation_examples/test_all.json"),
        ),
        # logging related
        ("--result_dir", dict(type=str, default="./results")),
        (
            "--num_envs",
            dict(type=int, default=1, help="Number of environments to run in parallel"),
        ),
        (
            "--log_level",
            dict(
                type=str,
                choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                default='INFO',
                help="Set the logging level",
            ),
        ),
        # aws config
        ("--region", dict(type=str, default="us-east-1", help="AWS region for the VM")),
        (
            "--provider_name",
            dict(
                type=str,
                default="aws",
                choices=["aws", "virtualbox", "vmware", "docker", "azure"],
                help="Provider name",
            ),
        ),
        ("--client_password", dict(type=str, default="", help="Client password")),
        ("--screen_width", dict(type=int, default=1920, help="Screen width")),
        ("--screen_height", dict(type=int, default=1080, help="Screen height")),
        # agent S2 config
        ("--model_provider", dict(type=str, default="openai")),
        (
            "--model_url",
            dict(
                type=str,
                default="",
                help="The URL of the main generation model API.",
            ),
        ),
        (
            "--model_api_key",
            dict(
                type=str,
                default="",
                help="The API key of the main generation model.",
            ),
        ),
        (
            "--model_temperature",
            dict(
                type=float,
                default=None,
                help="Temperature to fix the generation model at (e.g. o3 can only be run with 1.0)",
            ),
        ),
        (
            "--ground_provider",
            dict(type=str, required=True, help="The provider for the grounding model"),
        ),
        (
            "--ground_url",
            dict(type=str, required=True, help="The URL of the grounding model"),
        ),
        (
            "--ground_api_key",
            dict(
                type=str,
                default="",
                help="The API key of the grounding model.",
            ),
        ),
        (
            "--ground_model",
            dict(type=str, required=True, help="The model name for the grounding model"),
        ),
        (
            "--grounding_width",
            dict(
                type=int,
                required=True,
                help="Width of screenshot image after processor rescaling",
            ),
        ),
        (
            "--grounding_height",
            dict(
                type=int,
                required=True,
                help="Height of screenshot image after processor rescaling",
            ),
        ),
    ]
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    return parser.parse_args()
|
|
|
|
|
|
args = config()  # Get command line arguments first

# Root logger: its level comes from --log_level.
logger = logging.getLogger()
log_level = getattr(logging, args.log_level.upper())
logger.setLevel(log_level)

# Timestamp used in the log file names of this run.
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")

# FileHandler opens its target file when it is constructed, so the directory
# has to exist first; without this a fresh checkout fails at import time.
os.makedirs("logs", exist_ok=True)

file_handler = logging.FileHandler(
    os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8"
)
debug_handler = logging.FileHandler(
    os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8"
)
stdout_handler = logging.StreamHandler(sys.stdout)

file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(log_level)

# One shared format (with ANSI colour escapes) for all three handlers.
formatter = logging.Formatter(
    fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
)
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)

# Only records from the "desktopenv" logger hierarchy are echoed to stdout.
stdout_handler.addFilter(logging.Filter("desktopenv"))

logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
# }}} Logger Configs #

# Logger used by the rest of this module.
logger = logging.getLogger("desktopenv.experiment")
|
|
|
|
|
|
class CustomDesktopEnv(DesktopEnv):
    """DesktopEnv variant used by this runner.

    ``reset`` additionally clears Chromium browsing data before task setup and
    launches a headless ``soffice`` listener afterwards; ``step`` returns the
    controller's response to the executed command in the observation under
    ``"action_response"``.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``DesktopEnv`` and log the initialization."""
        super().__init__(*args, **kwargs)
        logger.info("CustomDesktopEnv class initialized.")

    def reset(self, task_config: Optional[Dict[str, Any]] = None, seed=None, options=None) -> Dict[str, Any]:
        """Reset counters, prepare the environment for ``task_config`` and return an observation.

        Setup is attempted up to ``MAX_RETRIES`` times. If every attempt fails
        the failure is logged as an error and the method still goes on to
        launch soffice and return an observation (no exception is raised).

        Args:
            task_config: Task description; when ``None`` no task setup is run.
            seed: Unused here.
            options: Unused here.

        Returns:
            The observation produced by ``self._get_obs()``.
        """
        # Reset to certain task in OSWorld
        logger.info("Resetting environment...")
        logger.info("Switching task...")
        logger.info("Setting counters...")
        self._traj_no += 1
        self._step_no = 0
        self.action_history.clear()

        setup_ok = False
        for attempt in range(MAX_RETRIES):
            # Only revert to snapshot if environment has been used (step/setup)
            # This optimization is especially important for cloud providers like AWS
            # where unnecessary snapshot operations are costly and time-consuming

            if task_config is not None:
                # Only consider task proxy requirement if proxy is enabled at system level
                task_use_proxy = task_config.get("proxy", False) and self.enable_proxy
                if not self.enable_proxy and task_config.get("proxy", False):
                    logger.info(
                        "Task requires proxy but proxy is disabled at system level, ignoring proxy requirement.")

                if task_use_proxy != self.current_use_proxy:
                    # keep because get_info_from_website depend on this
                    self.current_use_proxy = task_use_proxy

            if self.is_environment_used:
                logger.info("Environment has been used, reverting to snapshot {}...".format(self.snapshot_name))
                self._revert_to_snapshot()
                logger.info("Starting emulator...")
                self._start_emulator()
                logger.info("Emulator started.")
                # Reset the usage flag after reverting
                self.is_environment_used = False
            else:
                logger.info("Environment is clean, skipping snapshot revert (provider: {}).".format(self.provider_name))

            if task_config is None:
                # Nothing to set up.
                setup_ok = True
                break

            if task_config.get("proxy", False) and self.enable_proxy:
                # If using proxy and proxy is enabled, set up the proxy configuration
                self.setup_controller._proxy_setup(self.client_password)
            self._set_task_info(task_config)
            self.setup_controller.reset_cache_dir(self.cache_dir)

            # Best effort: a failure to clear browser data must not abort the reset.
            logger.info("Clearing browser cache and browsing data...")
            try:
                self.setup_controller._delete_all_browsing_data_chromium_setup()
                logger.info("Browser cache cleared successfully")
            except Exception as e:
                logger.warning(f"Failed to clear browser cache: {e}")

            logger.info("Setting up environment...")
            success = self.setup_controller.setup(self.config,
                                                  task_config.get("proxy", False) and self.enable_proxy)
            if success:
                # Mark environment as used when setup is successfully executed
                if self.config:  # Only mark as used if there were actual setup operations
                    self.is_environment_used = True
                setup_ok = True
                break

            logger.error(
                "Environment setup failed, retrying (%d/%d)...",
                attempt + 1,
                MAX_RETRIES,
            )
            time.sleep(5)

        if setup_ok:
            logger.info("Environment setup complete.")
        else:
            # Previously this case was also reported as "complete".
            logger.error("Environment setup failed after %d attempts.", MAX_RETRIES)

        # start soffice service for office tools
        self.setup_controller._launch_setup(
            'soffice --headless --accept="socket,host=localhost,port=2002;urp;" --norestore --nologo --nodefault', shell=True)
        time.sleep(5)

        observation = self._get_obs()
        return observation

    def step(self, action, pause=2):
        """Execute one action and return ``(observation, reward, done, info)``.

        Args:
            action: ``'WAIT'``/``'FAIL'``/``'DONE'``, a command string, or a
                dict (with a ``'command'`` entry for the pyautogui-style
                action spaces).
            pause: Seconds to sleep after the action (and for ``'WAIT'``).

        Returns:
            A tuple ``(observation, reward, done, info)``; the observation
            carries the controller response under ``"action_response"``
            (``None`` when no python command was executed).
        """
        self._step_no += 1
        self.action_history.append(action)

        # Mark environment as used when step is called
        self.is_environment_used = True

        reward = 0  # todo: Define reward calculation for each example
        done = False  # todo: Define episode termination condition for each example
        response = None
        info = {}
        logger.info(f"Step {self._step_no} in trajectory {self._traj_no} with action: {action}")

        # handle the special actions
        # Only the plain-string forms change done/info, exactly as before. The
        # former outer test indexed action['action_type'] and therefore raised
        # KeyError for dict actions that only carry a 'command'.
        if action == 'WAIT':
            time.sleep(pause)
        elif action == 'FAIL':
            done = True
            info = {"fail": True}
        elif action == 'DONE':
            done = True
            info = {"done": True}

        if self.action_space == "computer_13":
            # the set of all possible actions defined in the action representation
            self.controller.execute_action(action)
        elif self.action_space == "pyautogui" or self.action_space == "claude_computer_use":
            if action in ['WAIT', 'FAIL', 'DONE']:
                self.controller.execute_action(action)
            else:
                # the set of all possible python commands insides `pyautogui`
                if isinstance(action, str):
                    # Fix PyAutoGUI '<' character bug before execution
                    fixed_command = _fix_pyautogui_less_than_bug(action)
                    response = self.controller.execute_python_command(fixed_command)
                elif isinstance(action, dict):
                    # Fix PyAutoGUI '<' character bug before execution
                    fixed_command = _fix_pyautogui_less_than_bug(action['command'])
                    response = self.controller.execute_python_command(fixed_command)

        time.sleep(pause)
        observation = self._get_obs()
        observation["action_response"] = response
        return observation, reward, done, info
|
|
|
|
|
|
def distribute_tasks(test_all_meta: dict) -> List[tuple]:
    """Flatten ``{domain: [example_id, ...]}`` into ``(domain, example_id)`` pairs.

    Pairs follow the mapping's iteration order, then each list's order.
    """
    return [
        (domain, example_id)
        for domain, examples in test_all_meta.items()
        for example_id in examples
    ]
|
|
|
|
|
|
def process_signal_handler(signum, frame, env_idx):
    """Signal handler for child processes to gracefully shut down their environments.

    Environments are taken from a local variable named ``active_environments``
    in the interrupted frame (none are closed if that name is absent). The
    process then exits with status 0.
    """
    worker_no = env_idx + 1
    logger.info(f"Process {worker_no} received signal {signum}. Shutting down...")

    # Look the environments up in the interrupted frame's locals.
    tracked_envs = frame.f_locals.get('active_environments', [])

    # Close each environment in the current process context.
    for tracked in tracked_envs:
        if tracked is None:
            continue
        try:
            logger.info(f"Process {worker_no} closing environment...")
            tracked.close()
            logger.info(f"Process {worker_no} environment closed successfully")
        except Exception as e:
            logger.error(f"Process {worker_no} error closing environment: {e}")

    logger.info(f"Process {worker_no} shutdown complete. Exiting.")
    sys.exit(0)
|
|
|
|
|
|
def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: list):
    """Worker entry point: build one environment and agent, then run queued tasks.

    Tasks are ``(domain, example_id)`` pairs pulled from ``task_queue`` until a
    ``get`` does not deliver an item within 5 seconds. Errors of a single task
    are logged (and appended to that task's ``traj.jsonl``) without stopping
    the worker. The environment is closed when the worker finishes.

    Args:
        task_queue: Queue of ``(domain, example_id)`` pairs.
        args: Parsed command-line arguments.
        shared_scores: List passed through to ``lib_run_single.run_single_example``.
    """
    import traceback

    # Kept under this name because process_signal_handler() looks it up in the
    # interrupted frame's locals.
    active_environments = []
    env = None
    try:
        # The former AWS IMAGE_ID_MAP import and ami_id lookup were removed:
        # the result was never used (snapshot_name is not passed below) and the
        # lookup could fail for non-AWS providers or regions missing in the map.
        screen_size = (args.screen_width, args.screen_height)
        env = CustomDesktopEnv(
            path_to_vm=args.path_to_vm,
            action_space=args.action_space,
            provider_name=args.provider_name,
            region=args.region,
            screen_size=screen_size,
            headless=args.headless,
            os_type="Ubuntu",
            require_a11y_tree=args.observation_type in ["a11y_tree", "screenshot_a11y_tree", "som"],
            enable_proxy=False,
            client_password=args.client_password
        )
        active_environments.append(env)

        # AgentS2 configuration
        engine_params = {
            "engine_type": args.model_provider,
            "model": args.model,
            "base_url": getattr(args, 'model_url', ''),
            "api_key": getattr(args, 'model_api_key', ''),
            "temperature": getattr(args, 'model_temperature', None),
        }

        engine_params_for_grounding = {
            "engine_type": args.ground_provider,
            "model": args.ground_model,
            "base_url": getattr(args, 'ground_url', ''),
            "api_key": getattr(args, 'ground_api_key', ''),
            "grounding_width": args.grounding_width,
            "grounding_height": args.grounding_height,
        }

        # Create grounding agent
        grounding_agent = OSWorldACI(
            platform="linux",
            engine_params_for_generation=engine_params,
            engine_params_for_grounding=engine_params_for_grounding,
            width=args.screen_width,
            height=args.screen_height,
        )

        # Create AgentS2 worker
        agent = AworldGUIAgent(
            engine_params,
            grounding_agent,
            platform="linux",
        )

        logger.info(f"Process {current_process().name} started.")
        while True:
            try:
                item = task_queue.get(timeout=5)
            except Exception:
                # No item within the timeout (or the queue is unreachable):
                # treat it as "no more work" and stop this worker.
                break
            domain, example_id = item
            try:
                config_file = os.path.join(
                    args.test_config_base_dir, f"examples/{domain}/{example_id}.json"
                )
                with open(config_file, "r", encoding="utf-8") as f:
                    example = json.load(f)
                logger.info(f"[{current_process().name}][Domain]: {domain}")
                logger.info(f"[{current_process().name}][Example ID]: {example_id}")
                logger.info(f"[{current_process().name}][Instruction]: {example['instruction']}")
                example_result_dir = os.path.join(
                    args.result_dir,
                    args.action_space,
                    args.observation_type,
                    args.model,
                    domain,
                    example_id,
                )
                os.makedirs(example_result_dir, exist_ok=True)
                try:
                    lib_run_single.run_single_example(
                        agent,
                        env,
                        example,
                        args.max_steps,
                        example["instruction"],
                        args,
                        example_result_dir,
                        shared_scores,
                    )
                except Exception as e:
                    logger.error(f"Exception in {current_process().name} {domain}/{example_id}: {e}")
                    logger.error(traceback.format_exc())
                    # Try to finalize the recording, then note the error in the
                    # task's trajectory file.
                    try:
                        env.controller.end_recording(
                            os.path.join(example_result_dir, "recording.mp4")
                        )
                    except Exception as rec_e:
                        logger.error(f"Failed to end recording: {rec_e}")
                    with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
                        f.write(
                            json.dumps(
                                {"Error": f"{domain}/{example_id} - {e}"}
                            )
                        )
                        f.write("\n")
            except Exception as e:
                logger.error(f"Task-level error in {current_process().name}: {e}")
                logger.error(traceback.format_exc())
    except Exception as e:
        logger.error(f"Process-level error in {current_process().name}: {e}")
        logger.error(traceback.format_exc())
    finally:
        logger.info(f"{current_process().name} cleaning up environment...")
        try:
            if env:
                env.close()
                logger.info(f"{current_process().name} environment closed successfully")
        except Exception as e:
            logger.error(f"{current_process().name} error during environment cleanup: {e}")
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Handle termination signals (SIGINT, SIGTERM) to gracefully shutdown environments.

    Closes the environments in ``active_environments``, terminates the child
    processes in ``processes``, kills those still alive one second later, and
    exits with status 0. A call made while a shutdown is already in progress
    returns immediately.
    """
    global is_terminating, active_environments, processes
    import signal as signal_module

    # Avoid duplicate handling
    if is_terminating:
        return
    is_terminating = True

    logger.info(f"Received signal {signum}. Gracefully shutting down...")

    # Close all registered environments in the main process
    for registered_env in active_environments:
        try:
            logger.info("Closing environment...")
            registered_env.close()
            logger.info("Environment closed successfully")
        except Exception as e:
            logger.error(f"Error closing environment: {e}")

    # Send termination signal to all child processes first
    for child in processes:
        if not child.is_alive():
            continue
        try:
            logger.info(f"Sending termination signal to process {child.name}...")
            child.terminate()
        except Exception as e:
            logger.error(f"Error sending termination signal to process: {e}")

    # Allow a short time for processes to handle their own cleanup
    time.sleep(1)

    # Forcefully terminate any processes that didn't exit
    for child in processes:
        if not child.is_alive():
            continue
        try:
            logger.info(f"Forcefully terminating process {child.name}...")
            os.kill(child.pid, signal_module.SIGKILL)
        except Exception as e:
            logger.error(f"Error forcefully terminating process: {e}")

    logger.info("Shutdown complete. Exiting.")
    sys.exit(0)
|
|
|
|
|
|
def test(args: argparse.Namespace, test_all_meta: dict) -> None:
    """Run all tasks in ``test_all_meta`` across ``args.num_envs`` worker processes.

    The tasks are put on a manager queue that the workers (``run_env_tasks``)
    consume. While tasks remain queued, dead workers are restarted. Once the
    queue is empty the workers are joined and the average of the collected
    scores is logged.

    Args:
        args: Parsed command-line arguments.
        test_all_meta: Mapping of domain to the example ids to run.

    Raises:
        KeyboardInterrupt: Re-raised after logging.
        Exception: Any other error raised while supervising; live workers are
            terminated before it is re-raised.
    """
    global processes
    logger.info("Args: %s", args)
    all_tasks = distribute_tasks(test_all_meta)
    logger.info(f"Total tasks: {len(all_tasks)}")
    with Manager() as manager:
        shared_scores = manager.list()
        task_queue = manager.Queue()
        for item in all_tasks:
            task_queue.put(item)

        def spawn_worker(name):
            # Start one daemonic worker process consuming the shared queue.
            worker = Process(
                target=run_env_tasks,
                args=(task_queue, args, shared_scores),
                name=name
            )
            worker.daemon = True
            worker.start()
            return worker

        num_envs = args.num_envs
        processes = []
        for i in range(num_envs):
            p = spawn_worker(f"EnvProcess-{i + 1}")
            processes.append(p)
            logger.info(f"Started process {p.name} with PID {p.pid}")
        try:
            while True:
                # Check the queue before restarting anything: a worker that
                # exited because the queue ran dry must not be respawned (that
                # would build a fresh environment with no work left for it).
                if task_queue.empty():
                    logger.info("All tasks finished.")
                    break
                alive_count = 0
                for idx, p in enumerate(processes):
                    if not p.is_alive():
                        logger.warning(f"Process {p.name} died, restarting...")
                        new_p = spawn_worker(f"EnvProcess-Restart-{idx + 1}")
                        processes[idx] = new_p
                        logger.info(f"Restarted process {new_p.name} with PID {new_p.pid}")
                    else:
                        alive_count += 1
                # alive_count only counts workers that were alive before any
                # restart in this pass (unchanged from the previous behavior).
                if alive_count == 0:
                    logger.error("All processes died, exiting.")
                    break
                time.sleep(5)
            # Workers still busy with their last task are waited for here.
            for p in processes:
                p.join()
        except KeyboardInterrupt:
            logger.info("Main process received KeyboardInterrupt. Initiating graceful shutdown...")
            raise
        except Exception as e:
            logger.error(f"Unexpected error while waiting for processes: {e}", exc_info=True)
            for p in processes:
                if p.is_alive():
                    try:
                        logger.info(f"Terminating process {p.name} due to error...")
                        p.terminate()
                    except Exception as term_e:
                        logger.error(f"Error terminating process {p.name}: {term_e}")
            raise
        # Copy the scores while the manager is still running.
        scores = list(shared_scores)
    logger.info(f"Average score: {sum(scores) / len(scores) if scores else 0}")
|
|
|
|
|
|
def get_unfinished(
    action_space, use_model, observation_type, result_dir, total_file_json
):
    """Drop already-finished examples from ``total_file_json``.

    Results live under ``result_dir/action_space/observation_type/use_model/
    <domain>/<example_id>/``. An example counts as finished when its directory
    contains ``result.txt``; any other example directory is emptied so the
    example starts from a clean state. Entries named ``onboard`` are skipped.

    Args:
        action_space: Action-space path component.
        use_model: Model-name path component.
        observation_type: Observation-type path component.
        result_dir: Root directory of the results.
        total_file_json: Mapping of domain to example ids; updated in place.

    Returns:
        ``total_file_json`` with the finished examples removed.
    """
    import shutil  # local import: only needed to clear nested directories

    target_dir = os.path.join(result_dir, action_space, observation_type, use_model)

    if not os.path.exists(target_dir):
        return total_file_json

    finished = {}
    for domain in os.listdir(target_dir):
        domain_path = os.path.join(target_dir, domain)
        if not os.path.isdir(domain_path):
            # Plain files in the results root (e.g. args.json) are not domains.
            continue
        done_ids = finished.setdefault(domain, [])
        for example_id in os.listdir(domain_path):
            if example_id == "onboard":
                continue
            example_path = os.path.join(domain_path, example_id)
            if not os.path.isdir(example_path):
                continue
            entries = os.listdir(example_path)
            if "result.txt" in entries:
                done_ids.append(example_id)
                continue
            # Unfinished run: remove everything under example_id. os.remove
            # cannot delete directories, so nested ones go through rmtree.
            for entry in entries:
                entry_path = os.path.join(example_path, entry)
                if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                    shutil.rmtree(entry_path)
                else:
                    os.remove(entry_path)

    if not finished:
        return total_file_json

    for domain, examples in finished.items():
        if domain in total_file_json:
            total_file_json[domain] = [
                x for x in total_file_json[domain] if x not in examples
            ]

    return total_file_json
|
|
|
|
|
|
def get_result(action_space, use_model, observation_type, result_dir, total_file_json):
    """Collect the scores recorded so far and print the current success rate.

    Every ``result.txt`` found under ``result_dir/action_space/
    observation_type/use_model/<domain>/<example_id>/`` is read as a float; a
    file that cannot be read or parsed counts as ``0.0``.

    Args:
        action_space: Action-space path component.
        use_model: Model-name path component.
        observation_type: Observation-type path component.
        result_dir: Root directory of the results.
        total_file_json: Unused; kept for interface compatibility.

    Returns:
        The list of scores, or ``None`` when no result exists yet.
    """
    target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
    if not os.path.exists(target_dir):
        print("New experiment, no result yet.")
        return None

    all_result = []

    for domain in os.listdir(target_dir):
        domain_path = os.path.join(target_dir, domain)
        if not os.path.isdir(domain_path):
            continue
        for example_id in os.listdir(domain_path):
            example_path = os.path.join(domain_path, example_id)
            if not os.path.isdir(example_path):
                continue
            if "result.txt" not in os.listdir(example_path):
                continue
            try:
                # The context manager closes the file; it used to be left open.
                with open(os.path.join(example_path, "result.txt"), "r") as result_file:
                    all_result.append(float(result_file.read()))
            except (OSError, ValueError):
                # Unreadable or non-numeric result: count it as a failure.
                all_result.append(0.0)

    if not all_result:
        print("New experiment, no result yet.")
        return None
    else:
        print("Current Success Rate:", sum(all_result) / len(all_result) * 100, "%")
        return all_result
|
|
|
|
|
|
if __name__ == "__main__":
    ####### The complete version of the list of examples #######
    os.environ["TOKENIZERS_PARALLELISM"] = "false"

    # Register signal handlers for graceful termination
    signal.signal(signal.SIGINT, signal_handler)  # Handle Ctrl+C
    signal.signal(signal.SIGTERM, signal_handler)  # Handle termination signal

    # Becomes non-zero when the run aborts on an unexpected error, so callers
    # of the script can tell a crash from a clean finish.
    exit_status = 0

    try:
        args = config()

        # save args to json in result_dir/action_space/observation_type/model/args.json
        path_to_args = os.path.join(
            args.result_dir,
            args.action_space,
            args.observation_type,
            args.model,
            "args.json",
        )
        os.makedirs(os.path.dirname(path_to_args), exist_ok=True)
        with open(path_to_args, "w", encoding="utf-8") as f:
            json.dump(vars(args), f, indent=4)

        with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
            test_all_meta = json.load(f)

        if args.domain != "all":
            test_all_meta = {args.domain: test_all_meta[args.domain]}

        # Skip examples that already have a result from an earlier run.
        test_file_list = get_unfinished(
            args.action_space,
            args.model,
            args.observation_type,
            args.result_dir,
            test_all_meta,
        )
        left_info = "".join(
            f"{domain}: {len(test_file_list[domain])}\n" for domain in test_file_list
        )
        logger.info(f"Left tasks:\n{left_info}")

        get_result(
            args.action_space,
            args.model,
            args.observation_type,
            args.result_dir,
            test_all_meta,
        )
        test(args, test_file_list)
    except KeyboardInterrupt:
        logger.info("Main process received KeyboardInterrupt.")
        # Signal handler will take care of cleanup
    except Exception as e:
        logger.error(f"Unexpected error in main process: {e}", exc_info=True)
        exit_status = 1
        # Also trigger cleanup for unhandled exceptions. signal_handler ends
        # with sys.exit(0); intercept that so the failure is not reported as a
        # successful run.
        try:
            signal_handler(signal.SIGTERM, None)
        except SystemExit:
            pass
    finally:
        # Final cleanup in case any environments or processes remain
        logger.info("Main process final cleanup...")
        for env in active_environments:
            if env is not None:
                try:
                    logger.info("Closing environment in final cleanup...")
                    env.close()
                    logger.info("Environment closed successfully in final cleanup")
                except Exception as e:
                    logger.error(f"Error during final environment cleanup: {e}")

        # First try gentle termination
        for p in processes:
            if p is not None and p.is_alive():
                try:
                    logger.info(f"Terminating process {p.name}...")
                    p.terminate()
                except Exception as e:
                    logger.error(f"Error terminating process: {e}")

        # Wait a moment for processes to terminate
        time.sleep(1)

        # Then force kill if needed
        for p in processes:
            if p is not None and p.is_alive():
                try:
                    logger.info(f"Force killing process {p.name}...")
                    os.kill(p.pid, signal.SIGKILL)
                    logger.info(f"Process {p.name} force killed")
                except Exception as e:
                    logger.error(f"Error force killing process: {e}")

    if exit_status:
        sys.exit(exit_status)
|