Files
sci-gui-agent-benchmark/annotation/ducktrack/playback.py

189 lines
7.0 KiB
Python

import json
import math
import os
import sys
import time
import pyautogui
from pynput.keyboard import Controller as KeyboardController
from pynput.keyboard import Key
from pynput.mouse import Button
from pynput.mouse import Controller as MouseController
from .keycomb import KeyCombinationListener
from .util import (fix_windows_dpi_scaling, get_recordings_dir, name_to_button,
name_to_key)
# Zero out pyautogui's module-level delay settings.
# NOTE(review): per the pyautogui docs, PAUSE is the pause inserted after each
# pyautogui call and DARWIN_CATCH_UP_TIME is an extra macOS-only wait;
# presumably they are disabled so playback timing comes only from the recorded
# time stamps — confirm.
pyautogui.PAUSE = 0
pyautogui.DARWIN_CATCH_UP_TIME = 0
class Player:
    """
    Plays back recordings.

    A recording is a directory holding ``events.jsonl`` (one JSON object per
    line) and ``metadata.json``. Events are replayed through the pynput
    controllers (and pyautogui for scrolling on non-Windows systems) while
    reproducing the recorded delay between consecutive events.

    Pressing shift+esc sets ``stop_playback``, which ends the replay at the
    next event boundary or during the wait between two events.
    """

    # Maximum time, in seconds, between two presses of one click sequence.
    _CLICK_SEQUENCE_INTERVAL = 0.5
    # Maximum pointer travel (in event coordinate units) from the first press
    # before a pending double / triple click is abandoned.
    _DOUBLE_CLICK_RADIUS = 4
    _TRIPLE_CLICK_RADIUS = 5

    def __init__(self):
        self.stop_playback = False
        self.listener = KeyCombinationListener()

        def stop_comb_pressed():
            self.stop_playback = True
            # NOTE(review): returning False presumably tells the listener to
            # stop itself (pynput callback convention) — confirm in keycomb.
            return False

        self.listener.add_comb(("shift", "esc"), stop_comb_pressed)
        self.listener.start()

    def play(self, recording_path: str):
        """
        Load the recording stored in *recording_path* and replay it.

        Reads ``events.jsonl`` and ``metadata.json`` from the directory and
        hands the parsed content to :meth:`playback`. Blank lines in
        ``events.jsonl`` are ignored instead of aborting the load.
        """
        with open(os.path.join(recording_path, "events.jsonl"), "r") as f:
            # Iterate lazily and skip empty / whitespace-only lines, which are
            # not valid JSON documents and would raise JSONDecodeError.
            events = [json.loads(line) for line in f if line.strip()]

        with open(os.path.join(recording_path, "metadata.json"), "r") as f:
            metadata = json.load(f)

        self.playback(events, metadata)

    def playback(self, events: list[dict], metadata: dict):
        """
        Replay *events* in order, honouring the recorded timing.

        Each event is a dict with an ``"action"`` key (``"move"``, ``"click"``,
        ``"scroll"``, ``"press"`` or ``"release"``) and a ``"time_stamp"`` in
        seconds; the remaining keys read here are ``x``/``y``, ``button``,
        ``pressed``, ``dx``/``dy`` and ``name``. *metadata* must provide
        ``"system"`` and, for scroll events, ``"scroll_direction"``.

        Returns early, without touching the listener, when the stop hotkey
        has been pressed; otherwise the listener is stopped at the end.
        """
        if metadata["system"] == "Windows":
            fix_windows_dpi_scaling()

        mouse_controller = MouseController()
        keyboard_controller = KeyboardController()

        if not events:
            self.listener.stop()
            return

        # Recorded presses / releases that were already emitted as part of a
        # synthesized double or triple click and therefore must be skipped.
        presses_to_skip = 0
        releases_to_skip = 0
        in_click_sequence = False

        for i, event in enumerate(events):
            start_time = time.perf_counter()

            if self.stop_playback:
                return

            action = event["action"]

            if action == "move":
                mouse_controller.position = (event["x"], event["y"])

            elif action == "click":
                button = name_to_button(event["button"])

                if event["pressed"]:
                    if presses_to_skip == 0:
                        presses, releases = self._do_mouse_press(
                            events, i, button, mouse_controller
                        )
                        presses_to_skip += presses
                        releases_to_skip += releases

                        if presses > 0:
                            in_click_sequence = True
                    else:
                        presses_to_skip -= 1
                else:
                    if releases_to_skip == 0:
                        mouse_controller.release(button)

                        if in_click_sequence:
                            # First real release after a multi-click: issue a
                            # shift+left-click at the current position.
                            keyboard_controller.press(Key.shift)
                            mouse_controller.click(Button.left)
                            keyboard_controller.release(Key.shift)
                            in_click_sequence = False
                    else:
                        releases_to_skip -= 1

            elif action == "scroll":
                if metadata["system"] == "Windows":
                    # for some reason on windows, pynput scroll is correct but pyautogui is not
                    mouse_controller.scroll(
                        metadata["scroll_direction"] * event["dx"],
                        metadata["scroll_direction"] * event["dy"],
                    )
                else:
                    pyautogui.hscroll(clicks=metadata["scroll_direction"] * event["dx"])
                    pyautogui.vscroll(clicks=metadata["scroll_direction"] * event["dy"])

            elif action in ["press", "release"]:
                key = name_to_key(event["name"])
                if action == "press":
                    keyboard_controller.press(key)
                else:
                    keyboard_controller.release(key)

            # sleep for the correct amount of time
            end_time = time.perf_counter()
            execution_time = end_time - start_time

            if i + 1 < len(events):
                desired_delay = events[i + 1]["time_stamp"] - event["time_stamp"]
                delay = desired_delay - execution_time
                if delay < 0:
                    print(f"warning: behind by {-delay * 1000:.3f} ms")
                elif delay != 0:
                    wait_until = time.perf_counter() + delay
                    # Busy-wait for timing precision, but keep polling the
                    # stop flag so the hotkey also works during long pauses.
                    while time.perf_counter() < wait_until and not self.stop_playback:
                        pass

        self.listener.stop()

    def _do_mouse_press(self, events, index, button, mouse_controller):
        """
        Emit the press at ``events[index]``, merging it into a multi-click.

        Looks ahead for one or two further presses that follow closely in
        time and space. Returns ``(presses, releases)``: how many upcoming
        recorded press and release events the caller must skip because they
        were already emitted here.
        """
        first = events[index]
        total = len(events)
        interval = self._CLICK_SEQUENCE_INTERVAL

        # Index-based look-ahead avoids copying the tail of the event list.
        for j in range(index + 1, total):
            second = events[j]

            # make sure the time between mouse clicks is less than 500ms
            if second["time_stamp"] - first["time_stamp"] > interval:
                break

            # if the mouse moves out of the click radius, it is not a click sequence
            if self._moved_beyond(first, second, self._DOUBLE_CLICK_RADIUS):
                break

            if second["action"] == "click" and second["pressed"]:
                for k in range(j + 1, total):
                    third = events[k]

                    if third["time_stamp"] - second["time_stamp"] > interval:
                        break

                    if self._moved_beyond(first, third, self._TRIPLE_CLICK_RADIUS):
                        break

                    if third["action"] == "click" and third["pressed"]:
                        mouse_controller.click(button, 3)
                        return 2, 2

                mouse_controller.click(button, 2)
                return 1, 1

        mouse_controller.press(button)
        return 0, 0

    @staticmethod
    def _moved_beyond(origin, other, radius):
        """Return True if *other* has coordinates farther than *radius* from *origin*."""
        if "x" in other and "y" in other:
            distance = math.sqrt((other["y"] - origin["y"]) ** 2 +
                                 (other["x"] - origin["x"]) ** 2)
            return distance > radius
        return False
def get_latest_recording() -> str:
    """
    Return the path of the newest recording directory.

    Only sub-directories of the recordings directory are considered; the one
    with the greatest ``os.path.getctime`` value wins.

    Raises:
        Exception: if the recordings directory does not exist or contains
            no sub-directories.
    """
    root = get_recordings_dir()
    if not os.path.exists(root):
        raise Exception("The recordings directory does not exist")

    candidates = []
    for entry in os.listdir(root):
        full_path = os.path.join(root, entry)
        if os.path.isdir(full_path):
            candidates.append(full_path)

    if not candidates:
        raise Exception("You have no recordings to play back")

    return max(candidates, key=os.path.getctime)
def main():
    """
    Replay a recording chosen from the command line.

    Uses the first command-line argument as the recording path when one is
    given, otherwise falls back to the latest recording.
    """
    # The player is created first, before the recording path is resolved.
    player = Player()
    cli_args = sys.argv[1:]
    recording_path = cli_args[0] if cli_args else get_latest_recording()
    player.play(recording_path)
if __name__ == "__main__":
    # Give the user a short grace period to focus the target window.
    n = 3
    print(
        "press shift+esc to stop the playback",
        f"starting in {n} seconds...",
        sep="\n",
    )
    time.sleep(n)
    main()