From a37fe869258f0b02b60726a01bcd9765b53be89e Mon Sep 17 00:00:00 2001 From: yuanmengqi Date: Mon, 28 Jul 2025 07:43:13 +0000 Subject: [PATCH] feat: enhance logging and signal handling in run_multienv_claude.py - Refactored logging configuration to support dynamic log levels via command-line arguments, allowing for better control over log verbosity. - Introduced a new signal handler for graceful shutdown of environments and processes, improving robustness during termination. - Added functionality to save command-line arguments to a JSON file for better traceability of execution parameters. - Maintained existing logic while enhancing the overall structure and error handling capabilities of the script. --- run_multienv_claude.py | 186 +++++++++++++++++++++++++++++------------ 1 file changed, 132 insertions(+), 54 deletions(-) diff --git a/run_multienv_claude.py b/run_multienv_claude.py index 6f6d3b7..ca6aba2 100644 --- a/run_multienv_claude.py +++ b/run_multienv_claude.py @@ -8,48 +8,24 @@ import json import logging import os import sys -from typing import List, Dict -import math -from tqdm import tqdm +import signal +import time +from typing import List from multiprocessing import Process, Manager, current_process import lib_run_single from desktop_env.desktop_env import DesktopEnv from mm_agents.anthropic import AnthropicAgent +# Global variables for signal handling +active_environments = [] +processes = [] +is_terminating = False + # .env from dotenv import load_dotenv load_dotenv() - # Logger Configs {{{ # -logger = logging.getLogger() -logger.setLevel(logging.INFO) - -datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S") - -file_handler = logging.FileHandler( - os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8" -) -stdout_handler = logging.StreamHandler(sys.stdout) - -file_handler.setLevel(logging.INFO) -stdout_handler.setLevel(logging.INFO) - -formatter = logging.Formatter( - fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s" -) -file_handler.setFormatter(formatter) -stdout_handler.setFormatter(formatter) - -stdout_handler.addFilter(logging.Filter("desktopenv")) - -logger.addHandler(file_handler) -logger.addHandler(stdout_handler) -# }}} Logger Configs # - -logger = logging.getLogger("desktopenv.experiment") - - def config() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Run end-to-end evaluation on the benchmark" @@ -106,6 +82,8 @@ def config() -> argparse.Namespace: # logging related parser.add_argument("--result_dir", type=str, default="./results") parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to run in parallel") + parser.add_argument("--log_level", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], + default='INFO', help="Set the logging level") # aws config parser.add_argument( @@ -115,6 +93,42 @@ def config() -> argparse.Namespace: args = parser.parse_args() return args +args = config() # Get command line arguments first + +logger = logging.getLogger() +log_level = getattr(logging, args.log_level.upper()) +logger.setLevel(log_level) + +datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S") + +file_handler = logging.FileHandler( + os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8" +) +debug_handler = logging.FileHandler( + os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8" +) +stdout_handler = logging.StreamHandler(sys.stdout) + +file_handler.setLevel(logging.INFO) +debug_handler.setLevel(logging.DEBUG) +stdout_handler.setLevel(log_level) + +formatter = logging.Formatter( + fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s" +) +file_handler.setFormatter(formatter) +debug_handler.setFormatter(formatter) +stdout_handler.setFormatter(formatter) + +stdout_handler.addFilter(logging.Filter("desktopenv")) + +logger.addHandler(file_handler) +logger.addHandler(debug_handler) +logger.addHandler(stdout_handler) +# }}} Logger Configs # + +logger = logging.getLogger("desktopenv.experiment") + def distribute_tasks(test_all_meta: dict) -> List[tuple]: """Distribute tasks evenly across environments.""" @@ -127,6 +141,27 @@ def distribute_tasks(test_all_meta: dict) -> List[tuple]: return all_tasks +def process_signal_handler(signum, frame, env_idx): + """Signal handler for child processes to gracefully shut down their environments.""" + logger.info(f"Process {env_idx + 1} received signal {signum}. Shutting down...") + + # Get the active_environments from the caller's frame + local_vars = frame.f_locals + active_environments = local_vars.get('active_environments', []) + + # Close environment in the current process context + for env in active_environments: + if env is not None: + try: + logger.info(f"Process {env_idx + 1} closing environment...") + env.close() + logger.info(f"Process {env_idx + 1} environment closed successfully") + except Exception as e: + logger.error(f"Process {env_idx + 1} error closing environment: {e}") + + logger.info(f"Process {env_idx + 1} shutdown complete. Exiting.") + sys.exit(0) + def run_env_tasks(task_queue, args, shared_scores): """Run tasks for a single environment.""" @@ -235,28 +270,18 @@ def run_env_tasks(task_queue, args, shared_scores): logger.error(f"{current_process().name} error during environment cleanup: {e}") -def process_signal_handler(signum, frame, env_idx): - logger.info(f"Process {env_idx + 1} received signal {signum}. Shutting down...") - local_vars = frame.f_locals - active_environments = local_vars.get('active_environments', []) - for env in active_environments: - if env is not None: - try: - logger.info(f"Process {env_idx + 1} closing environment...") - env.close() - logger.info(f"Process {env_idx + 1} environment closed successfully") - except Exception as e: - logger.error(f"Process {env_idx + 1} error closing environment: {e}") - logger.info(f"Process {env_idx + 1} shutdown complete. Exiting.") - sys.exit(0) - - def signal_handler(signum, frame): + """Handle termination signals (SIGINT, SIGTERM) to gracefully shutdown environments.""" global is_terminating, active_environments, processes + + # Avoid duplicate handling if is_terminating: return + is_terminating = True logger.info(f"Received signal {signum}. Gracefully shutting down...") + + # Close all registered environments in the main process for env in active_environments: try: logger.info(f"Closing environment...") @@ -264,6 +289,8 @@ def signal_handler(signum, frame): logger.info(f"Environment closed successfully") except Exception as e: logger.error(f"Error closing environment: {e}") + + # Send termination signal to all child processes first for p in processes: if p.is_alive(): try: @@ -271,7 +298,11 @@ def signal_handler(signum, frame): p.terminate() except Exception as e: logger.error(f"Error sending termination signal to process: {e}") + + # Allow a short time for processes to handle their own cleanup time.sleep(1) + + # Forcefully terminate any processes that didn't exit for p in processes: if p.is_alive(): try: @@ -280,6 +311,7 @@ def signal_handler(signum, frame): os.kill(p.pid, sig.SIGKILL) except Exception as e: logger.error(f"Error forcefully terminating process: {e}") + logger.info("Shutdown complete. Exiting.") sys.exit(0) @@ -424,12 +456,25 @@ def get_result(action_space, use_model, observation_type, result_dir, total_file if __name__ == "__main__": ####### The complete version of the list of examples ####### os.environ["TOKENIZERS_PARALLELISM"] = "false" - import signal - import time - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) + + # Register signal handlers for graceful termination + signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C + signal.signal(signal.SIGTERM, signal_handler) # Handle termination signal + try: - args = config() + # args already defined globally above + + # save args to json in result_dir/action_space/observation_type/model/args.json + path_to_args = os.path.join( + args.result_dir, + args.action_space, + args.observation_type, + args.model, + "args.json", + ) + os.makedirs(os.path.dirname(path_to_args), exist_ok=True) + with open(path_to_args, "w", encoding="utf-8") as f: + json.dump(vars(args), f, indent=4) with open(args.test_all_meta_path, "r", encoding="utf-8") as f: test_all_meta = json.load(f) @@ -459,8 +504,41 @@ if __name__ == "__main__": test(args, test_file_list) except KeyboardInterrupt: logger.info("Main process received KeyboardInterrupt.") + # Signal handler will take care of cleanup except Exception as e: logger.error(f"Unexpected error in main process: {e}", exc_info=True) + # Also trigger cleanup for unhandled exceptions signal_handler(signal.SIGTERM, None) finally: - logger.info("Main process final cleanup...") \ No newline at end of file + # Final cleanup in case any environments or processes remain + logger.info("Main process final cleanup...") + for env in active_environments: + if env is not None: + try: + logger.info(f"Closing environment in final cleanup...") + env.close() + logger.info(f"Environment closed successfully in final cleanup") + except Exception as e: + logger.error(f"Error during final environment cleanup: {e}") + + # First try gentle termination + for p in processes: + if p is not None and p.is_alive(): + try: + logger.info(f"Terminating process {p.name}...") + p.terminate() + except Exception as e: + logger.error(f"Error terminating process: {e}") + + # Wait a moment for processes to terminate + time.sleep(1) + + # Then force kill if needed + for p in processes: + if p is not None and p.is_alive(): + try: + logger.info(f"Force killing process {p.name}...") + os.kill(p.pid, signal.SIGKILL) + logger.info(f"Process {p.name} force killed") + except Exception as e: + logger.error(f"Error force killing process: {e}") \ No newline at end of file