"""Trajectory recorder.

Listens for host-side mouse and keyboard events (via pynput) and records
the resulting action trajectory together with screenshots taken through
the supplied environment object.
"""

import time
import json
import os
from datetime import datetime
from pynput import mouse, keyboard
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Recorder:
    """Event-driven trajectory recorder that runs on the host side.

    The recorder relies on the following methods of ``jade_env`` (as used
    in this module): ``get_screen_info()``, ``get_mouse_pos()`` and
    ``get_screenshot()``.
    """

    def __init__(self, jade_env, task_id, output_dir):
        """Initialise the recorder and create the screenshot directory.

        Args:
            jade_env: Environment instance used for screen info, mouse
                position and screenshots.
            task_id: Identifier of the task being recorded.
            output_dir: Directory that receives ``actions_raw.json`` and
                the ``screens/`` sub-directory.
        """
        self.env = jade_env
        self.task_id = task_id
        self.output_dir = output_dir

        # Screenshots are stored under <output_dir>/screens.
        self.screens_dir = os.path.join(output_dir, "screens")
        os.makedirs(self.screens_dir, exist_ok=True)

        # Recorded data.
        self.actions = []
        self.metadata = {}
        self.start_time = None
        self.screenshot_counter = 0

        # pynput listeners; created in start().
        self.mouse_listener = None
        self.keyboard_listener = None

        # Recording state flag checked by every event callback.
        self.is_recording = False

        logger.info(f"录制器初始化: 任务={task_id}")

    def start(self):
        """Start recording.

        Collects screen metadata, takes an initial screenshot and starts
        the mouse and keyboard listeners. Does nothing (apart from a
        warning) when a recording is already running.
        """
        if self.is_recording:
            logger.warning("录制已在进行中")
            return

        self.is_recording = True
        self.start_time = time.time()

        # Query screen information; fall back to minimal metadata when the
        # environment cannot provide it.
        try:
            screen_info = self.env.get_screen_info()
            self.metadata = {
                "task_id": self.task_id,
                "vm_resolution": [
                    screen_info['screen_width'],
                    screen_info['screen_height'],
                ],
                "vm_screenshot_resolution": [
                    screen_info['screenshot_width'],
                    screen_info['screenshot_height'],
                ],
                "vm_dpi_scale": screen_info['dpi_scale'],
                "recording_start": datetime.now().isoformat(),
                "recording_end": None
            }
            logger.info(f"虚拟机分辨率: {screen_info['screen_width']}x{screen_info['screen_height']}")
            logger.info(f"截图分辨率: {screen_info['screenshot_width']}x{screen_info['screenshot_height']}")
        except Exception as e:
            logger.warning(f"获取屏幕信息失败: {e}")
            self.metadata = {
                "task_id": self.task_id,
                "recording_start": datetime.now().isoformat(),
                "recording_end": None
            }

        # Screenshot of the state before any action.
        self._capture_screenshot("initial")

        # Start the listeners.
        self.mouse_listener = mouse.Listener(
            on_click=self._on_mouse_click,
            on_scroll=self._on_mouse_scroll
        )
        self.keyboard_listener = keyboard.Listener(
            on_press=self._on_key_press
        )

        self.mouse_listener.start()
        self.keyboard_listener.start()

        logger.info("✅ 录制已启动")
        print("\n" + "=" * 60)
        print("🎥 录制进行中...")
        print("💡 提示:")
        print(" - 请在VMware窗口中操作JADE")
        print(" - 每次点击都会自动截图")
        print(" - 按 Ctrl+C 停止录制")
        print("=" * 60 + "\n")

    def _on_mouse_click(self, x, y, button, pressed):
        """Handle a mouse click event from the mouse listener.

        Only button presses (not releases) during an active recording are
        recorded. Each recorded click also triggers a screenshot.

        Args:
            x: Host X coordinate reported by the listener.
            y: Host Y coordinate reported by the listener.
            button: Button object reported by the listener.
            pressed: True for press, False for release.
        """
        if not self.is_recording or not pressed:
            return

        # Ask the environment for the pointer position right away. An
        # exception escaping a pynput callback stops the listener, so a
        # failing query is downgraded to "no VM position" instead of
        # ending mouse recording for the rest of the session.
        try:
            vm_x, vm_y = self.env.get_mouse_pos()
        except Exception as e:
            logger.warning(f"获取VM鼠标坐标失败: {e}")
            vm_x = vm_y = None

        elapsed = time.time() - self.start_time

        action = {
            "t": round(elapsed, 3),
            "type": "click",
            "button": str(button).replace("Button.", ""),
            "pos_host": [x, y],  # host coordinates, kept for reference
            "pos_vm": [vm_x, vm_y] if vm_x is not None else None  # position reported by the environment
        }

        # Screenshot associated with this click (None when capture failed).
        screenshot_filename = self._capture_screenshot("click")
        action["screenshot"] = screenshot_filename

        self.actions.append(action)

        if vm_x is not None:
            logger.info(f"[{elapsed:.1f}s] 点击: VM({vm_x}, {vm_y}) [Host: {int(x)}, {int(y)}] {action['button']}")
        else:
            logger.info(f"[{elapsed:.1f}s] 点击: Host({int(x)}, {int(y)}) [VM获取失败] {action['button']}")

    def _on_mouse_scroll(self, x, y, dx, dy):
        """Handle a mouse scroll event from the mouse listener.

        Args:
            x: Host X coordinate reported by the listener.
            y: Host Y coordinate reported by the listener.
            dx: Horizontal scroll delta.
            dy: Vertical scroll delta.
        """
        if not self.is_recording:
            return

        elapsed = time.time() - self.start_time

        action = {
            "t": round(elapsed, 3),
            "type": "scroll",
            "pos_host": [x, y],
            "delta": [dx, dy],
            "pos_vm": None
        }

        self.actions.append(action)
        logger.debug(f"[{elapsed:.1f}s] 滚轮: ({x}, {y}) delta=({dx}, {dy})")

    def _on_key_press(self, key):
        """Handle a key press event from the keyboard listener.

        Args:
            key: Key object reported by the listener. Keys exposing a
                non-empty ``char`` attribute are recorded by that
                character; all others by ``str(key)`` with the ``Key.``
                prefix removed.
        """
        if not self.is_recording:
            return

        elapsed = time.time() - self.start_time

        # getattr with a default avoids the need for a blanket except
        # clause when the key has no ``char`` attribute.
        char = getattr(key, 'char', None)
        key_name = char if char else str(key).replace("Key.", "")

        action = {
            "t": round(elapsed, 3),
            "type": "key",
            "key": key_name
        }

        self.actions.append(action)
        logger.debug(f"[{elapsed:.1f}s] 按键: {key_name}")

    def _capture_screenshot(self, tag=""):
        """Capture a screenshot through the environment and save it.

        Args:
            tag: Optional label appended to the file name.

        Returns:
            The path of the saved screenshot relative to the output
            directory (``screens/<file>``), or None when capturing or
            saving failed.
        """
        try:
            screenshot = self.env.get_screenshot()

            # The counter is advanced before saving, so a failed save
            # still consumes a number.
            self.screenshot_counter += 1
            if tag:
                filename = f"{self.screenshot_counter:04d}_{tag}.png"
            else:
                filename = f"{self.screenshot_counter:04d}.png"

            filepath = os.path.join(self.screens_dir, filename)
            screenshot.save(filepath)

            logger.debug(f"截图保存: {filename}")
            return f"screens/{filename}"
        except Exception as e:
            logger.error(f"截图失败: {e}")
            return None

    def stop(self):
        """Stop recording.

        Stops both listeners, takes a final screenshot and completes the
        metadata. Does nothing (apart from a warning) when no recording
        is running.
        """
        if not self.is_recording:
            logger.warning("录制未在进行中")
            return

        self.is_recording = False

        # Stop the listeners.
        if self.mouse_listener:
            self.mouse_listener.stop()
        if self.keyboard_listener:
            self.keyboard_listener.stop()

        # Screenshot of the final state.
        self._capture_screenshot("final")

        # Complete the metadata.
        self.metadata["recording_end"] = datetime.now().isoformat()
        self.metadata["total_duration"] = round(time.time() - self.start_time, 2)
        self.metadata["total_actions"] = len(self.actions)
        self.metadata["total_screenshots"] = self.screenshot_counter

        logger.info("✅ 录制已停止")

    def save(self):
        """Write metadata and actions to ``actions_raw.json``.

        A recording that is still running is stopped first. The data is
        written as recorded, without any coordinate processing.
        """
        if self.is_recording:
            logger.warning("录制仍在进行,先停止录制")
            self.stop()

        output_data = {
            "metadata": self.metadata,
            "actions": self.actions
        }

        raw_path = os.path.join(self.output_dir, "actions_raw.json")
        with open(raw_path, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2, ensure_ascii=False)

        logger.info(f"✅ 轨迹数据已保存: {raw_path}")
        logger.info(f" - 总动作数: {len(self.actions)}")
        logger.info(f" - 截图数: {self.screenshot_counter}")
        logger.info(f" - 总时长: {self.metadata.get('total_duration', 0):.1f}秒")

        print("\n" + "=" * 60)
        print("📊 录制统计:")
        print(f" 动作数: {len(self.actions)}")
        print(f" 截图数: {self.screenshot_counter}")
        print(f" 时长: {self.metadata.get('total_duration', 0):.1f}秒")
        print(f" 保存位置: {raw_path}")
        print("=" * 60)
        print("\n💡 下一步:运行坐标转换")
        print(f" python scripts/tools/process_trajectory.py {self.task_id}")
        print("=" * 60 + "\n")


def record_interactive(jade_env, task_id, output_dir):
    """Record interactively until Ctrl+C is pressed.

    Args:
        jade_env: Environment instance passed on to ``Recorder``.
        task_id: Identifier of the task being recorded.
        output_dir: Output directory passed on to ``Recorder``.

    Returns:
        The ``Recorder`` instance after it has been stopped and saved.
    """
    recorder = Recorder(jade_env, task_id, output_dir)
    recorder.start()

    try:
        # Keep the main thread alive until Ctrl+C (or until the recorder
        # is stopped elsewhere).
        while recorder.is_recording:
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\n⏹ 收到停止信号...")
    finally:
        recorder.stop()
        recorder.save()

    return recorder


if __name__ == "__main__":
    print("Recorder 独立测试模式")
    print("提示: 通常应该通过 collect_task.py 调用")