fix chrome
This commit is contained in:
309
manual_examine.py
Normal file
309
manual_examine.py
Normal file
@@ -0,0 +1,309 @@
|
||||
from __future__ import annotations
|
||||
import argparse
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
import time
|
||||
from typing import List, Dict
|
||||
from tqdm import tqdm
|
||||
from desktop_env.desktop_env import DesktopEnv
|
||||
|
||||
# Global variables for signal handling
|
||||
active_environment = None
|
||||
is_terminating = False
|
||||
|
||||
# load the environment variables from .env file
|
||||
if os.path.exists(".env"):
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
# Logger Configs {{{ #
|
||||
def config() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Manual examination of benchmark tasks"
|
||||
)
|
||||
|
||||
# environment config
|
||||
parser.add_argument("--path_to_vm", type=str, default=None)
|
||||
parser.add_argument(
|
||||
"--headless", action="store_true", help="Run in headless machine"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--action_space", type=str, default="pyautogui", help="Action type"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--observation_type",
|
||||
choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"],
|
||||
default="screenshot",
|
||||
help="Observation type",
|
||||
)
|
||||
parser.add_argument("--screen_width", type=int, default=1920)
|
||||
parser.add_argument("--screen_height", type=int, default=1080)
|
||||
parser.add_argument("--sleep_after_execution", type=float, default=0.0)
|
||||
parser.add_argument("--max_steps", type=int, default=15)
|
||||
|
||||
# agent config
|
||||
parser.add_argument("--max_trajectory_length", type=int, default=3)
|
||||
parser.add_argument(
|
||||
"--test_config_base_dir", type=str, default="evaluation_examples"
|
||||
)
|
||||
|
||||
# example config
|
||||
parser.add_argument("--domain", type=str, required=True, help="Specific domain to examine")
|
||||
parser.add_argument("--example_id", type=str, required=True, help="Specific example ID to examine")
|
||||
parser.add_argument(
|
||||
"--test_all_meta_path", type=str, default="evaluation_examples/test_all.json"
|
||||
)
|
||||
|
||||
# logging related
|
||||
parser.add_argument("--result_dir", type=str, default="./results_manual")
|
||||
parser.add_argument("--log_level", type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
|
||||
default='INFO', help="Set the logging level")
|
||||
|
||||
# aws config
|
||||
parser.add_argument(
|
||||
"--region", type=str, default="us-east-1", help="AWS region for the VM"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
args = config() # Get command line arguments first
|
||||
|
||||
logger = logging.getLogger()
|
||||
log_level = getattr(logging, args.log_level.upper())
|
||||
logger.setLevel(log_level)
|
||||
|
||||
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
|
||||
|
||||
file_handler = logging.FileHandler(
|
||||
os.path.join("logs", "manual-{:}.log".format(datetime_str)), encoding="utf-8"
|
||||
)
|
||||
debug_handler = logging.FileHandler(
|
||||
os.path.join("logs", "manual-debug-{:}.log".format(datetime_str)), encoding="utf-8"
|
||||
)
|
||||
stdout_handler = logging.StreamHandler(sys.stdout)
|
||||
|
||||
file_handler.setLevel(logging.INFO)
|
||||
debug_handler.setLevel(logging.DEBUG)
|
||||
stdout_handler.setLevel(log_level)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
debug_handler.setFormatter(formatter)
|
||||
stdout_handler.setFormatter(formatter)
|
||||
|
||||
stdout_handler.addFilter(logging.Filter("desktopenv"))
|
||||
|
||||
logger.addHandler(file_handler)
|
||||
logger.addHandler(debug_handler)
|
||||
logger.addHandler(stdout_handler)
|
||||
# }}} Logger Configs #
|
||||
|
||||
logger = logging.getLogger("desktopenv.experiment")
|
||||
|
||||
|
||||
def setup_example_logger(example, example_result_dir):
|
||||
"""设置特定样例的日志记录器"""
|
||||
runtime_logger = logging.getLogger(f"desktopenv.example.{example['id']}")
|
||||
runtime_logger.setLevel(logging.DEBUG)
|
||||
runtime_logger.addHandler(logging.FileHandler(os.path.join(example_result_dir, "runtime.log")))
|
||||
return runtime_logger
|
||||
|
||||
|
||||
def run_manual_examination(env, example, instruction, args, example_result_dir):
|
||||
"""手动检查单个样例的函数"""
|
||||
runtime_logger = setup_example_logger(example, example_result_dir)
|
||||
|
||||
# 重置环境并加载任务配置
|
||||
env.reset(task_config=example)
|
||||
logger.info("环境正在初始化,请等待60秒...")
|
||||
time.sleep(60) # Wait for the environment to be ready
|
||||
|
||||
# 获取初始观察
|
||||
obs = env._get_obs()
|
||||
|
||||
# 保存初始状态截图
|
||||
initial_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
|
||||
with open(os.path.join(example_result_dir, f"initial_state_{initial_timestamp}.png"), "wb") as f:
|
||||
f.write(obs['screenshot'])
|
||||
|
||||
# 记录任务信息
|
||||
with open(os.path.join(example_result_dir, "task_info.json"), "w", encoding="utf-8") as f:
|
||||
json.dump({
|
||||
"domain": args.domain,
|
||||
"example_id": args.example_id,
|
||||
"instruction": instruction,
|
||||
"initial_timestamp": initial_timestamp,
|
||||
"example_config": example
|
||||
}, f, indent=2, ensure_ascii=False)
|
||||
|
||||
# 开始录制
|
||||
env.controller.start_recording()
|
||||
|
||||
logger.info("="*80)
|
||||
logger.info(f"任务域: {args.domain}")
|
||||
logger.info(f"样例ID: {args.example_id}")
|
||||
logger.info(f"任务指令: {instruction}")
|
||||
logger.info("="*80)
|
||||
logger.info("环境已准备就绪!")
|
||||
logger.info("请在虚拟机中手动执行任务...")
|
||||
logger.info("完成后请按回车键继续进行评估...")
|
||||
logger.info("="*80)
|
||||
|
||||
# 阻塞等待用户手动操作
|
||||
try:
|
||||
input("按回车键开始评估...")
|
||||
except KeyboardInterrupt:
|
||||
logger.info("用户中断操作")
|
||||
return None
|
||||
|
||||
logger.info("开始评估...")
|
||||
|
||||
# 获取最终状态截图
|
||||
final_obs = env._get_obs()
|
||||
final_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
|
||||
with open(os.path.join(example_result_dir, f"final_state_{final_timestamp}.png"), "wb") as f:
|
||||
f.write(final_obs['screenshot'])
|
||||
|
||||
# 评估结果
|
||||
result = env.evaluate()
|
||||
logger.info(f"评估结果: {result:.2f}")
|
||||
|
||||
# 保存结果
|
||||
with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f:
|
||||
f.write(f"{result}\n")
|
||||
|
||||
# 保存执行记录
|
||||
with open(os.path.join(example_result_dir, "execution_log.jsonl"), "w", encoding="utf-8") as f:
|
||||
f.write(json.dumps({
|
||||
"type": "manual_execution",
|
||||
"initial_timestamp": initial_timestamp,
|
||||
"final_timestamp": final_timestamp,
|
||||
"result": result,
|
||||
"initial_screenshot": f"initial_state_{initial_timestamp}.png",
|
||||
"final_screenshot": f"final_state_{final_timestamp}.png"
|
||||
}, ensure_ascii=False))
|
||||
f.write("\n")
|
||||
|
||||
# 结束录制
|
||||
env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4"))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
"""处理终止信号以优雅关闭环境"""
|
||||
global is_terminating, active_environment
|
||||
|
||||
# 避免重复处理
|
||||
if is_terminating:
|
||||
return
|
||||
|
||||
is_terminating = True
|
||||
logger.info(f"接收到信号 {signum}。正在优雅关闭...")
|
||||
|
||||
# 关闭环境
|
||||
if active_environment:
|
||||
try:
|
||||
logger.info("正在关闭环境...")
|
||||
active_environment.close()
|
||||
logger.info("环境已成功关闭")
|
||||
except Exception as e:
|
||||
logger.error(f"关闭环境时出错: {e}")
|
||||
|
||||
logger.info("关闭完成。退出程序。")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main():
|
||||
global active_environment
|
||||
|
||||
# 注册信号处理器以优雅终止
|
||||
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
|
||||
signal.signal(signal.SIGTERM, signal_handler) # Handle termination signal
|
||||
|
||||
try:
|
||||
args = config()
|
||||
logger.info("参数: %s", args)
|
||||
|
||||
# 加载指定的任务
|
||||
config_file = os.path.join(
|
||||
args.test_config_base_dir, f"examples/{args.domain}/{args.example_id}.json"
|
||||
)
|
||||
|
||||
if not os.path.exists(config_file):
|
||||
logger.error(f"配置文件不存在: {config_file}")
|
||||
return
|
||||
|
||||
with open(config_file, "r", encoding="utf-8") as f:
|
||||
example = json.load(f)
|
||||
|
||||
# 创建结果目录
|
||||
example_result_dir = os.path.join(
|
||||
args.result_dir,
|
||||
args.action_space,
|
||||
args.observation_type,
|
||||
"manual_examination",
|
||||
args.domain,
|
||||
args.example_id,
|
||||
)
|
||||
os.makedirs(example_result_dir, exist_ok=True)
|
||||
|
||||
# 设置环境
|
||||
from desktop_env.providers.aws.manager import IMAGE_ID_MAP
|
||||
REGION = "us-east-1"
|
||||
active_environment = DesktopEnv(
|
||||
path_to_vm=args.path_to_vm,
|
||||
action_space=args.action_space,
|
||||
provider_name="aws",
|
||||
region=REGION,
|
||||
snapshot_name=IMAGE_ID_MAP[REGION],
|
||||
screen_size=(args.screen_width, args.screen_height),
|
||||
headless=args.headless,
|
||||
os_type="Ubuntu",
|
||||
require_a11y_tree=args.observation_type in ["a11y_tree", "screenshot_a11y_tree", "som"],
|
||||
enable_proxy=True
|
||||
)
|
||||
|
||||
# 执行手动检查
|
||||
result = run_manual_examination(
|
||||
active_environment,
|
||||
example,
|
||||
example["instruction"],
|
||||
args,
|
||||
example_result_dir
|
||||
)
|
||||
|
||||
if result is not None:
|
||||
logger.info(f"手动检查完成。最终结果: {result:.2f}")
|
||||
else:
|
||||
logger.info("手动检查被中断")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("主进程接收到KeyboardInterrupt")
|
||||
# 信号处理器会处理清理工作
|
||||
except Exception as e:
|
||||
logger.error(f"主进程中的意外错误: {e}", exc_info=True)
|
||||
# 也触发清理
|
||||
signal_handler(signal.SIGTERM, None)
|
||||
finally:
|
||||
# 最终清理以防任何环境或进程仍然存在
|
||||
logger.info("主进程最终清理...")
|
||||
if active_environment is not None:
|
||||
try:
|
||||
logger.info("在最终清理中关闭环境...")
|
||||
active_environment.close()
|
||||
logger.info("在最终清理中环境已成功关闭")
|
||||
except Exception as e:
|
||||
logger.error(f"最终环境清理期间出错: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 禁用tokenizers并行处理避免警告
|
||||
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
||||
main()
|
||||
Reference in New Issue
Block a user