feat: enhance logging and signal handling in run_multienv_claude.py
- Refactored logging configuration to support dynamic log levels via command-line arguments, allowing for better control over log verbosity. - Introduced a new signal handler for graceful shutdown of environments and processes, improving robustness during termination. - Added functionality to save command-line arguments to a JSON file for better traceability of execution parameters. - Maintained existing logic while enhancing the overall structure and error handling capabilities of the script.
This commit is contained in:
@@ -8,48 +8,24 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
from typing import List, Dict
|
import signal
|
||||||
import math
|
import time
|
||||||
from tqdm import tqdm
|
from typing import List
|
||||||
from multiprocessing import Process, Manager, current_process
|
from multiprocessing import Process, Manager, current_process
|
||||||
import lib_run_single
|
import lib_run_single
|
||||||
from desktop_env.desktop_env import DesktopEnv
|
from desktop_env.desktop_env import DesktopEnv
|
||||||
from mm_agents.anthropic import AnthropicAgent
|
from mm_agents.anthropic import AnthropicAgent
|
||||||
|
|
||||||
|
# Global variables for signal handling
|
||||||
|
active_environments = []
|
||||||
|
processes = []
|
||||||
|
is_terminating = False
|
||||||
|
|
||||||
# .env
|
# .env
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
|
|
||||||
# Logger Configs {{{ #
|
# Logger Configs {{{ #
|
||||||
logger = logging.getLogger()
|
|
||||||
logger.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
|
|
||||||
|
|
||||||
file_handler = logging.FileHandler(
|
|
||||||
os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8"
|
|
||||||
)
|
|
||||||
stdout_handler = logging.StreamHandler(sys.stdout)
|
|
||||||
|
|
||||||
file_handler.setLevel(logging.INFO)
|
|
||||||
stdout_handler.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
formatter = logging.Formatter(
|
|
||||||
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
|
|
||||||
)
|
|
||||||
file_handler.setFormatter(formatter)
|
|
||||||
stdout_handler.setFormatter(formatter)
|
|
||||||
|
|
||||||
stdout_handler.addFilter(logging.Filter("desktopenv"))
|
|
||||||
|
|
||||||
logger.addHandler(file_handler)
|
|
||||||
logger.addHandler(stdout_handler)
|
|
||||||
# }}} Logger Configs #
|
|
||||||
|
|
||||||
logger = logging.getLogger("desktopenv.experiment")
|
|
||||||
|
|
||||||
|
|
||||||
def config() -> argparse.Namespace:
|
def config() -> argparse.Namespace:
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="Run end-to-end evaluation on the benchmark"
|
description="Run end-to-end evaluation on the benchmark"
|
||||||
@@ -106,6 +82,8 @@ def config() -> argparse.Namespace:
|
|||||||
# logging related
|
# logging related
|
||||||
parser.add_argument("--result_dir", type=str, default="./results")
|
parser.add_argument("--result_dir", type=str, default="./results")
|
||||||
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to run in parallel")
|
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to run in parallel")
|
||||||
|
parser.add_argument("--log_level", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
|
||||||
|
default='INFO', help="Set the logging level")
|
||||||
|
|
||||||
# aws config
|
# aws config
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
@@ -115,6 +93,42 @@ def config() -> argparse.Namespace:
|
|||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
return args
|
return args
|
||||||
|
|
||||||
|
args = config() # Get command line arguments first
|
||||||
|
|
||||||
|
logger = logging.getLogger()
|
||||||
|
log_level = getattr(logging, args.log_level.upper())
|
||||||
|
logger.setLevel(log_level)
|
||||||
|
|
||||||
|
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
|
||||||
|
|
||||||
|
file_handler = logging.FileHandler(
|
||||||
|
os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8"
|
||||||
|
)
|
||||||
|
debug_handler = logging.FileHandler(
|
||||||
|
os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8"
|
||||||
|
)
|
||||||
|
stdout_handler = logging.StreamHandler(sys.stdout)
|
||||||
|
|
||||||
|
file_handler.setLevel(logging.INFO)
|
||||||
|
debug_handler.setLevel(logging.DEBUG)
|
||||||
|
stdout_handler.setLevel(log_level)
|
||||||
|
|
||||||
|
formatter = logging.Formatter(
|
||||||
|
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
|
||||||
|
)
|
||||||
|
file_handler.setFormatter(formatter)
|
||||||
|
debug_handler.setFormatter(formatter)
|
||||||
|
stdout_handler.setFormatter(formatter)
|
||||||
|
|
||||||
|
stdout_handler.addFilter(logging.Filter("desktopenv"))
|
||||||
|
|
||||||
|
logger.addHandler(file_handler)
|
||||||
|
logger.addHandler(debug_handler)
|
||||||
|
logger.addHandler(stdout_handler)
|
||||||
|
# }}} Logger Configs #
|
||||||
|
|
||||||
|
logger = logging.getLogger("desktopenv.experiment")
|
||||||
|
|
||||||
|
|
||||||
def distribute_tasks(test_all_meta: dict) -> List[tuple]:
|
def distribute_tasks(test_all_meta: dict) -> List[tuple]:
|
||||||
"""Distribute tasks evenly across environments."""
|
"""Distribute tasks evenly across environments."""
|
||||||
@@ -127,6 +141,27 @@ def distribute_tasks(test_all_meta: dict) -> List[tuple]:
|
|||||||
return all_tasks
|
return all_tasks
|
||||||
|
|
||||||
|
|
||||||
|
def process_signal_handler(signum, frame, env_idx):
|
||||||
|
"""Signal handler for child processes to gracefully shut down their environments."""
|
||||||
|
logger.info(f"Process {env_idx + 1} received signal {signum}. Shutting down...")
|
||||||
|
|
||||||
|
# Get the active_environments from the caller's frame
|
||||||
|
local_vars = frame.f_locals
|
||||||
|
active_environments = local_vars.get('active_environments', [])
|
||||||
|
|
||||||
|
# Close environment in the current process context
|
||||||
|
for env in active_environments:
|
||||||
|
if env is not None:
|
||||||
|
try:
|
||||||
|
logger.info(f"Process {env_idx + 1} closing environment...")
|
||||||
|
env.close()
|
||||||
|
logger.info(f"Process {env_idx + 1} environment closed successfully")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Process {env_idx + 1} error closing environment: {e}")
|
||||||
|
|
||||||
|
logger.info(f"Process {env_idx + 1} shutdown complete. Exiting.")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
def run_env_tasks(task_queue, args, shared_scores):
|
def run_env_tasks(task_queue, args, shared_scores):
|
||||||
"""Run tasks for a single environment."""
|
"""Run tasks for a single environment."""
|
||||||
@@ -235,28 +270,18 @@ def run_env_tasks(task_queue, args, shared_scores):
|
|||||||
logger.error(f"{current_process().name} error during environment cleanup: {e}")
|
logger.error(f"{current_process().name} error during environment cleanup: {e}")
|
||||||
|
|
||||||
|
|
||||||
def process_signal_handler(signum, frame, env_idx):
|
|
||||||
logger.info(f"Process {env_idx + 1} received signal {signum}. Shutting down...")
|
|
||||||
local_vars = frame.f_locals
|
|
||||||
active_environments = local_vars.get('active_environments', [])
|
|
||||||
for env in active_environments:
|
|
||||||
if env is not None:
|
|
||||||
try:
|
|
||||||
logger.info(f"Process {env_idx + 1} closing environment...")
|
|
||||||
env.close()
|
|
||||||
logger.info(f"Process {env_idx + 1} environment closed successfully")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Process {env_idx + 1} error closing environment: {e}")
|
|
||||||
logger.info(f"Process {env_idx + 1} shutdown complete. Exiting.")
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
|
|
||||||
def signal_handler(signum, frame):
|
def signal_handler(signum, frame):
|
||||||
|
"""Handle termination signals (SIGINT, SIGTERM) to gracefully shutdown environments."""
|
||||||
global is_terminating, active_environments, processes
|
global is_terminating, active_environments, processes
|
||||||
|
|
||||||
|
# Avoid duplicate handling
|
||||||
if is_terminating:
|
if is_terminating:
|
||||||
return
|
return
|
||||||
|
|
||||||
is_terminating = True
|
is_terminating = True
|
||||||
logger.info(f"Received signal {signum}. Gracefully shutting down...")
|
logger.info(f"Received signal {signum}. Gracefully shutting down...")
|
||||||
|
|
||||||
|
# Close all registered environments in the main process
|
||||||
for env in active_environments:
|
for env in active_environments:
|
||||||
try:
|
try:
|
||||||
logger.info(f"Closing environment...")
|
logger.info(f"Closing environment...")
|
||||||
@@ -264,6 +289,8 @@ def signal_handler(signum, frame):
|
|||||||
logger.info(f"Environment closed successfully")
|
logger.info(f"Environment closed successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error closing environment: {e}")
|
logger.error(f"Error closing environment: {e}")
|
||||||
|
|
||||||
|
# Send termination signal to all child processes first
|
||||||
for p in processes:
|
for p in processes:
|
||||||
if p.is_alive():
|
if p.is_alive():
|
||||||
try:
|
try:
|
||||||
@@ -271,7 +298,11 @@ def signal_handler(signum, frame):
|
|||||||
p.terminate()
|
p.terminate()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error sending termination signal to process: {e}")
|
logger.error(f"Error sending termination signal to process: {e}")
|
||||||
|
|
||||||
|
# Allow a short time for processes to handle their own cleanup
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Forcefully terminate any processes that didn't exit
|
||||||
for p in processes:
|
for p in processes:
|
||||||
if p.is_alive():
|
if p.is_alive():
|
||||||
try:
|
try:
|
||||||
@@ -280,6 +311,7 @@ def signal_handler(signum, frame):
|
|||||||
os.kill(p.pid, sig.SIGKILL)
|
os.kill(p.pid, sig.SIGKILL)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error forcefully terminating process: {e}")
|
logger.error(f"Error forcefully terminating process: {e}")
|
||||||
|
|
||||||
logger.info("Shutdown complete. Exiting.")
|
logger.info("Shutdown complete. Exiting.")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
@@ -424,12 +456,25 @@ def get_result(action_space, use_model, observation_type, result_dir, total_file
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
####### The complete version of the list of examples #######
|
####### The complete version of the list of examples #######
|
||||||
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
||||||
import signal
|
|
||||||
import time
|
# Register signal handlers for graceful termination
|
||||||
signal.signal(signal.SIGINT, signal_handler)
|
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
|
||||||
signal.signal(signal.SIGTERM, signal_handler)
|
signal.signal(signal.SIGTERM, signal_handler) # Handle termination signal
|
||||||
|
|
||||||
try:
|
try:
|
||||||
args = config()
|
# args already defined globally above
|
||||||
|
|
||||||
|
# save args to json in result_dir/action_space/observation_type/model/args.json
|
||||||
|
path_to_args = os.path.join(
|
||||||
|
args.result_dir,
|
||||||
|
args.action_space,
|
||||||
|
args.observation_type,
|
||||||
|
args.model,
|
||||||
|
"args.json",
|
||||||
|
)
|
||||||
|
os.makedirs(os.path.dirname(path_to_args), exist_ok=True)
|
||||||
|
with open(path_to_args, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(vars(args), f, indent=4)
|
||||||
|
|
||||||
with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
|
with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
|
||||||
test_all_meta = json.load(f)
|
test_all_meta = json.load(f)
|
||||||
@@ -459,8 +504,41 @@ if __name__ == "__main__":
|
|||||||
test(args, test_file_list)
|
test(args, test_file_list)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.info("Main process received KeyboardInterrupt.")
|
logger.info("Main process received KeyboardInterrupt.")
|
||||||
|
# Signal handler will take care of cleanup
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Unexpected error in main process: {e}", exc_info=True)
|
logger.error(f"Unexpected error in main process: {e}", exc_info=True)
|
||||||
|
# Also trigger cleanup for unhandled exceptions
|
||||||
signal_handler(signal.SIGTERM, None)
|
signal_handler(signal.SIGTERM, None)
|
||||||
finally:
|
finally:
|
||||||
|
# Final cleanup in case any environments or processes remain
|
||||||
logger.info("Main process final cleanup...")
|
logger.info("Main process final cleanup...")
|
||||||
|
for env in active_environments:
|
||||||
|
if env is not None:
|
||||||
|
try:
|
||||||
|
logger.info(f"Closing environment in final cleanup...")
|
||||||
|
env.close()
|
||||||
|
logger.info(f"Environment closed successfully in final cleanup")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during final environment cleanup: {e}")
|
||||||
|
|
||||||
|
# First try gentle termination
|
||||||
|
for p in processes:
|
||||||
|
if p is not None and p.is_alive():
|
||||||
|
try:
|
||||||
|
logger.info(f"Terminating process {p.name}...")
|
||||||
|
p.terminate()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error terminating process: {e}")
|
||||||
|
|
||||||
|
# Wait a moment for processes to terminate
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Then force kill if needed
|
||||||
|
for p in processes:
|
||||||
|
if p is not None and p.is_alive():
|
||||||
|
try:
|
||||||
|
logger.info(f"Force killing process {p.name}...")
|
||||||
|
os.kill(p.pid, signal.SIGKILL)
|
||||||
|
logger.info(f"Process {p.name} force killed")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error force killing process: {e}")
|
||||||
Reference in New Issue
Block a user