Files
sci-gui-agent-benchmark/scripts/core/recorder.py
2026-01-12 18:30:12 +08:00

296 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
轨迹录制器
监听鼠标键盘事件,记录操作轨迹和截图
"""
import time
import json
import os
from datetime import datetime
from pynput import mouse, keyboard
import logging
# Configure the root logger at INFO level as a side effect of importing this
# module, then create the module-level logger that the code below writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Recorder:
    """Trajectory recorder: event-driven recording on the host side.

    Listens for mouse and keyboard events through ``pynput`` listeners, stores
    every event as an action dict, and saves a screenshot obtained from the
    environment for each mouse click (plus one at start and one at stop).
    """

    def __init__(self, jade_env, task_id, output_dir):
        """Initialise the recorder and create the screenshot directory.

        Args:
            jade_env: JadeEnv instance. The recorder calls its
                ``get_screen_info()``, ``get_screenshot()`` and
                ``get_mouse_pos()`` methods.
            task_id: Task ID.
            output_dir: Output directory (e.g. ``human_demo/``).
        """
        self.env = jade_env
        self.task_id = task_id
        self.output_dir = output_dir

        # Create the output directory (also creates output_dir itself).
        self.screens_dir = os.path.join(output_dir, "screens")
        os.makedirs(self.screens_dir, exist_ok=True)

        # Recorded data.
        self.actions = []
        self.metadata = {}
        self.start_time = None
        self.screenshot_counter = 0

        # Input listeners (created in start()).
        self.mouse_listener = None
        self.keyboard_listener = None

        # State.
        self.is_recording = False
        logger.info(f"录制器初始化: 任务={task_id}")

    def start(self):
        """Start recording: collect metadata, take the initial screenshot and
        start the mouse and keyboard listeners.

        Does nothing (besides logging a warning) when already recording.
        """
        if self.is_recording:
            logger.warning("录制已在进行中")
            return

        self.is_recording = True
        self.start_time = time.time()

        # Query the VM screen information; recording continues with reduced
        # metadata when the query fails.
        try:
            screen_info = self.env.get_screen_info()
            self.metadata = {
                "task_id": self.task_id,
                "vm_resolution": [screen_info['screen_width'], screen_info['screen_height']],
                "vm_screenshot_resolution": [screen_info['screenshot_width'], screen_info['screenshot_height']],
                "vm_dpi_scale": screen_info['dpi_scale'],
                "recording_start": datetime.now().isoformat(),
                "recording_end": None
            }
            logger.info(f"虚拟机分辨率: {screen_info['screen_width']}x{screen_info['screen_height']}")
            logger.info(f"截图分辨率: {screen_info['screenshot_width']}x{screen_info['screenshot_height']}")
        except Exception as e:
            logger.warning(f"获取屏幕信息失败: {e}")
            self.metadata = {
                "task_id": self.task_id,
                "recording_start": datetime.now().isoformat(),
                "recording_end": None
            }

        # Record the initial screenshot.
        self._capture_screenshot("initial")

        # Start the listeners.
        self.mouse_listener = mouse.Listener(
            on_click=self._on_mouse_click,
            on_scroll=self._on_mouse_scroll
        )
        self.keyboard_listener = keyboard.Listener(
            on_press=self._on_key_press
        )
        self.mouse_listener.start()
        self.keyboard_listener.start()

        logger.info("✅ 录制已启动")
        print("\n" + "=" * 60)
        print("🎥 录制进行中...")
        print("💡 提示:")
        print(" - 请在VMware窗口中操作JADE")
        print(" - 每次点击都会自动截图")
        print(" - 按 Ctrl+C 停止录制")
        print("=" * 60 + "\n")

    def _get_vm_mouse_pos(self):
        """Return ``(x, y)`` as reported by the environment, or ``(None, None)``.

        Any exception raised by ``env.get_mouse_pos()`` and any result that is
        not a two-item sequence are treated as "position unavailable", so that
        a failing query cannot raise inside a listener callback.
        """
        try:
            pos = self.env.get_mouse_pos()
        except Exception as e:
            logger.warning(f"获取VM鼠标坐标失败: {e}")
            return None, None
        try:
            vm_x, vm_y = pos
        except (TypeError, ValueError):
            return None, None
        return vm_x, vm_y

    def _on_mouse_click(self, x, y, button, pressed):
        """Mouse click handler: record the press and take a screenshot.

        Button releases (``pressed`` false) and events arriving while not
        recording are ignored.
        """
        if not self.is_recording or not pressed:
            return

        # Fetch the position from the VM right away; the host coordinates are
        # kept only for reference.
        vm_x, vm_y = self._get_vm_mouse_pos()
        elapsed = time.time() - self.start_time

        # Record the action.
        action = {
            "t": round(elapsed, 3),
            "type": "click",
            "button": str(button).replace("Button.", ""),
            "pos_host": [x, y],  # host coordinates (kept for reference)
            "pos_vm": [vm_x, vm_y] if vm_x is not None else None  # VM coordinates
        }

        # Screenshot (None when capturing failed).
        screenshot_filename = self._capture_screenshot("click")
        action["screenshot"] = screenshot_filename
        self.actions.append(action)

        if vm_x is not None:
            logger.info(f"[{elapsed:.1f}s] 点击: VM({vm_x}, {vm_y}) [Host: {int(x)}, {int(y)}] {action['button']}")
        else:
            logger.info(f"[{elapsed:.1f}s] 点击: Host({int(x)}, {int(y)}) [VM获取失败] {action['button']}")

    def _on_mouse_scroll(self, x, y, dx, dy):
        """Mouse wheel handler: record the scroll delta at the host position."""
        if not self.is_recording:
            return

        elapsed = time.time() - self.start_time
        action = {
            "t": round(elapsed, 3),
            "type": "scroll",
            "pos_host": [x, y],
            "delta": [dx, dy],
            "pos_vm": None
        }
        self.actions.append(action)
        logger.debug(f"[{elapsed:.1f}s] 滚轮: ({x}, {y}) delta=({dx}, {dy})")

    def _on_key_press(self, key):
        """Keyboard handler: record the pressed key by name."""
        if not self.is_recording:
            return

        elapsed = time.time() - self.start_time

        # Use the key's character when it has a non-empty one; otherwise fall
        # back to its string form with the "Key." prefix removed.
        char = getattr(key, 'char', None)
        if char:
            key_name = char
        else:
            key_name = str(key).replace("Key.", "")

        action = {
            "t": round(elapsed, 3),
            "type": "key",
            "key": key_name
        }
        self.actions.append(action)
        logger.debug(f"[{elapsed:.1f}s] 按键: {key_name}")

    def _capture_screenshot(self, tag=""):
        """Capture a screenshot from the environment and save it as a PNG.

        Args:
            tag: Label appended to the file name (omitted when empty).

        Returns:
            str: Path of the screenshot relative to the output directory
            (``screens/<file>``), or ``None`` when capturing or saving failed.
        """
        try:
            screenshot = self.env.get_screenshot()

            # Build the file name from the next index, but commit the counter
            # only after the file was written, so failed captures are not
            # counted and do not leave gaps in the numbering.
            next_index = self.screenshot_counter + 1
            if tag:
                filename = f"{next_index:04d}_{tag}.png"
            else:
                filename = f"{next_index:04d}.png"
            filepath = os.path.join(self.screens_dir, filename)
            screenshot.save(filepath)
        except Exception as e:
            logger.error(f"截图失败: {e}")
            return None

        self.screenshot_counter = next_index
        logger.debug(f"截图保存: {filename}")
        return f"screens/{filename}"

    def stop(self):
        """Stop recording: stop the listeners, take the final screenshot and
        complete the metadata.

        Does nothing (besides logging a warning) when not recording.
        """
        if not self.is_recording:
            logger.warning("录制未在进行中")
            return

        self.is_recording = False

        # Stop the listeners.
        if self.mouse_listener:
            self.mouse_listener.stop()
        if self.keyboard_listener:
            self.keyboard_listener.stop()

        # Record the final screenshot.
        self._capture_screenshot("final")

        # Update the metadata.
        self.metadata["recording_end"] = datetime.now().isoformat()
        self.metadata["total_duration"] = round(time.time() - self.start_time, 2)
        self.metadata["total_actions"] = len(self.actions)
        self.metadata["total_screenshots"] = self.screenshot_counter
        logger.info("✅ 录制已停止")

    def save(self):
        """Write metadata and actions to ``actions_raw.json`` in the output
        directory and print a summary.

        Stops the recording first when it is still running.
        """
        if self.is_recording:
            logger.warning("录制仍在进行,先停止录制")
            self.stop()

        # Save the raw data (coordinates not yet processed).
        output_data = {
            "metadata": self.metadata,
            "actions": self.actions
        }
        raw_path = os.path.join(self.output_dir, "actions_raw.json")
        with open(raw_path, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2, ensure_ascii=False)

        logger.info(f"✅ 轨迹数据已保存: {raw_path}")
        logger.info(f" - 总动作数: {len(self.actions)}")
        logger.info(f" - 截图数: {self.screenshot_counter}")
        logger.info(f" - 总时长: {self.metadata.get('total_duration', 0):.1f}")

        print("\n" + "=" * 60)
        print("📊 录制统计:")
        print(f" 动作数: {len(self.actions)}")
        print(f" 截图数: {self.screenshot_counter}")
        print(f" 时长: {self.metadata.get('total_duration', 0):.1f}")
        print(f" 保存位置: {raw_path}")
        print("=" * 60)
        print("\n💡 下一步:运行坐标转换")
        print(f" python scripts/tools/process_trajectory.py {self.task_id}")
        print("=" * 60 + "\n")
def record_interactive(jade_env, task_id, output_dir):
    """Record interactively until the recording ends or Ctrl+C is pressed.

    Creates a ``Recorder``, starts it, and polls its ``is_recording`` flag.
    Whether the loop ends normally or through ``KeyboardInterrupt``, the
    recorder is stopped and its data saved before returning.

    Args:
        jade_env: JadeEnv instance.
        task_id: Task ID.
        output_dir: Output directory.

    Returns:
        The ``Recorder`` that was used for the session.
    """
    poll_interval = 0.1
    session = Recorder(jade_env, task_id, output_dir)
    session.start()
    try:
        # Idle on the calling thread while the recorder reports it is active.
        while True:
            if not session.is_recording:
                break
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        print("\n\n⏹ 收到停止信号...")
    finally:
        session.stop()
        session.save()
    return session
if __name__ == "__main__":
    # Running the module directly only prints two informational lines.
    for notice in ("Recorder 独立测试模式", "提示: 通常应该通过 collect_task.py 调用"):
        print(notice)