Files
sci-gui-agent-benchmark/annotation/ducktrack/recorder.py

145 lines
5.0 KiB
Python

import json
import os
import time
from datetime import datetime
from platform import system
from queue import Queue
from pynput import keyboard, mouse
from pynput.keyboard import KeyCode
from PyQt6.QtCore import QThread, pyqtSignal
from .metadata import MetadataManager
from .obs_client import OBSClient
from .util import fix_windows_dpi_scaling, get_recordings_dir
class Recorder(QThread):
"""
Makes recordings.
"""
recording_stopped = pyqtSignal()
def __init__(self, natural_scrolling: bool):
super().__init__()
if system() == "Windows":
fix_windows_dpi_scaling()
self.recording_path = self._get_recording_path()
self._is_recording = False
self._is_paused = False
self.event_queue = Queue()
self.events_file = open(os.path.join(self.recording_path, "events.jsonl"), "a")
self.metadata_manager = MetadataManager(
recording_path=self.recording_path,
natural_scrolling=natural_scrolling
)
self.obs_client = OBSClient(recording_path=self.recording_path,
metadata=self.metadata_manager.metadata)
self.mouse_listener = mouse.Listener(
on_move=self.on_move,
on_click=self.on_click,
on_scroll=self.on_scroll)
self.keyboard_listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release)
def on_move(self, x, y):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "move",
"x": x,
"y": y}, block=False)
def on_click(self, x, y, button, pressed):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "click",
"x": x,
"y": y,
"button": button.name,
"pressed": pressed}, block=False)
def on_scroll(self, x, y, dx, dy):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "scroll",
"x": x,
"y": y,
"dx": dx,
"dy": dy}, block=False)
def on_press(self, key):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "press",
"name": key.char if type(key) == KeyCode else key.name}, block=False)
def on_release(self, key):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "release",
"name": key.char if type(key) == KeyCode else key.name}, block=False)
def run(self):
self._is_recording = True
self.metadata_manager.collect()
self.obs_client.start_recording()
self.mouse_listener.start()
self.keyboard_listener.start()
while self._is_recording:
event = self.event_queue.get()
self.events_file.write(json.dumps(event) + "\n")
def stop_recording(self):
if self._is_recording:
self._is_recording = False
self.metadata_manager.end_collect()
self.mouse_listener.stop()
self.keyboard_listener.stop()
self.obs_client.stop_recording()
self.metadata_manager.add_obs_record_state_timings(self.obs_client.record_state_events)
self.events_file.close()
self.metadata_manager.save_metadata()
self.recording_stopped.emit()
def pause_recording(self):
if not self._is_paused and self._is_recording:
self._is_paused = True
self.obs_client.pause_recording()
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "pause"}, block=False)
def resume_recording(self):
if self._is_paused and self._is_recording:
self._is_paused = False
self.obs_client.resume_recording()
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "resume"}, block=False)
def _get_recording_path(self) -> str:
recordings_dir = get_recordings_dir()
if not os.path.exists(recordings_dir):
os.mkdir(recordings_dir)
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
recording_path = os.path.join(recordings_dir, f"recording-{current_time}")
os.mkdir(recording_path)
return recording_path