# File-viewer header captured together with the source:
# Files
# sci-gui-agent-benchmark/utils/ducktrack.py
# 2023-12-06 21:35:30 +08:00
#
# 353 lines
# 16 KiB
# Python

import sys, pathlib;
sys.path.append(str(pathlib.Path(__file__).parents[1]))
import os
import math
import json
import numpy as np
from typing import List
from copy import deepcopy
# Key names that are rewritten before being written into KEY_DOWN / KEY_UP /
# PRESS actions; any name not listed here is emitted unchanged.
pynput2pyautogui_key = {
    "alt_l": "altleft",
    "alt_r": "altright",
}

# Key names treated as command (non-text) keys: a press of one of these is
# never folded into a TYPING action by the converter.
COMMAND_KEYS = [
    'accept', 'add', 'alt', 'altleft', 'altright', 'apps', 'backspace',
    'browserback', 'browserfavorites', 'browserforward', 'browserhome',
    'browserrefresh', 'browsersearch', 'browserstop', 'capslock', 'clear',
    'convert', 'ctrl', 'ctrlleft', 'ctrlright', 'decimal', 'del', 'delete',
    'divide', 'down', 'end', 'enter', 'esc', 'escape', 'execute',
    'f1', 'f10', 'f11', 'f12', 'f13', 'f14', 'f15', 'f16', 'f17', 'f18',
    'f19', 'f2', 'f20', 'f21', 'f22', 'f23', 'f24', 'f3', 'f4', 'f5', 'f6',
    'f7', 'f8', 'f9',
    'final', 'fn', 'hanguel', 'hangul', 'hanja', 'help', 'home', 'insert',
    'junja', 'kana', 'kanji', 'launchapp1', 'launchapp2', 'launchmail',
    'launchmediaselect', 'left', 'modechange', 'multiply', 'nexttrack',
    'nonconvert',
    'num0', 'num1', 'num2', 'num3', 'num4', 'num5', 'num6', 'num7', 'num8',
    'num9', 'numlock',
    'pagedown', 'pageup', 'pause', 'pgdn', 'pgup', 'playpause', 'prevtrack',
    'print', 'printscreen', 'prntscrn', 'prtsc', 'prtscr', 'return', 'right',
    'scrolllock', 'select', 'separator', 'shift', 'shiftleft', 'shiftright',
    'sleep', 'stop', 'subtract', 'tab', 'up', 'volumedown', 'volumemute',
    'volumeup', 'win', 'winleft', 'winright', 'yen', 'command', 'option',
    'optionleft', 'optionright',
    'alt_l', 'alt_r',
]

# Key names whose typed text differs from the recorded name.
typingkey2str = {
    "space": " ",
}
class DuckTrackEventActionConverter:
    """Convert recorded DuckTrack input events into action dictionaries.

    Events are dicts with an ``"action"`` field (``"move"``, ``"click"``,
    ``"press"``, ``"release"`` or ``"scroll"``) plus the fields each
    ``*_event_to_action`` method reads.  Besides the one-to-one conversions,
    :meth:`ducktrack_event_file_to_action` can merge runs of low-level events
    into higher-level ``CLICK`` / ``DRAG_TO`` / ``TYPING`` / ``PRESS`` actions.
    """

    # Largest allowed difference between the "time_stamp" of the initial press
    # and a later event for that event to still belong to the same click
    # sequence (the original code describes this limit as 500ms).
    CLICK_MAX_INTERVAL = 0.5
    # Largest distance, in the units of the recorded x/y coordinates, that a
    # later event may be from the initial press and still belong to the
    # click sequence.
    CLICK_MAX_RADIUS = 4
    # A click sequence is emitted as soon as this many releases were counted.
    MAX_CLICKS = 3

    def __init__(self):
        """Create a converter; no per-instance state is kept."""

    ### Enumerations ###
    def move_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a mouse move event to its corresponding action.

        Returns a ``MOVE_TO`` dict for ``"computer_13"``, a
        ``pyautogui.moveTo(...)`` source string for ``"pyautogui"`` and
        ``None`` for any other ``action_space``.
        """
        if action_space == "computer_13":
            return {
                "action_type": "MOVE_TO",
                "parameters": {
                    "x": event["x"],
                    "y": event["y"]
                }
            }
        if action_space == "pyautogui":
            return "pyautogui.moveTo({}, {})".format(event["x"], event["y"])
        return None

    def click_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a mouse button event to ``MOUSE_DOWN`` / ``MOUSE_UP``.

        Raises:
            NotImplementedError: if the button is not left, right or middle.
        """
        mouse_button = event["button"]
        mouse_pressed = event["pressed"]
        if mouse_button not in ("left", "right", "middle"):
            raise NotImplementedError(mouse_button)
        return {
            "action_type": "MOUSE_DOWN" if mouse_pressed else "MOUSE_UP",
            "parameters": {
                "button": mouse_button
            }
        }

    @staticmethod
    def _mapped_key(name: str) -> str:
        """Return the name emitted for a recorded key (see pynput2pyautogui_key)."""
        return pynput2pyautogui_key.get(name, name)

    def press_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a key down event to a ``KEY_DOWN`` action."""
        # NOTE: the `key down`, `press` have the same meaning here, while different in pyautogui
        return {
            "action_type": "KEY_DOWN",
            "parameters": {
                "key": self._mapped_key(event["name"])
            }
        }

    def release_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a key release event to a ``KEY_UP`` action."""
        return {
            "action_type": "KEY_UP",
            "parameters": {
                "key": self._mapped_key(event["name"])
            }
        }

    def scroll_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a scroll event to a ``SCROLL`` action carrying dx/dy."""
        return {
            "action_type": "SCROLL",
            "parameters": {
                "dx": event["dx"],
                "dy": event["dy"]
            }
        }

    def event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert an event to its action, dispatching on ``event["action"]``.

        Raises:
            NotImplementedError: for an unknown event type.
        """
        # NOTE(review): `action_space` is accepted but, as before, not passed
        # on to the per-event converters, so the "computer_13" form is always
        # produced here -- confirm whether forwarding is wanted.
        handlers = {
            "move": self.move_event_to_action,
            "click": self.click_event_to_action,
            "press": self.press_event_to_action,
            "release": self.release_event_to_action,
            "scroll": self.scroll_event_to_action,
        }
        kind = event["action"]
        if kind not in handlers:
            raise NotImplementedError(kind)
        return handlers[kind](event)

    ### Compressing ###
    def compress_mouse_move(self, data: List[dict], index: int):
        """Collapse the run of consecutive move events starting at ``index``.

        Returns ``(last_move, next_index)`` where ``last_move`` is the final
        move of the run and ``next_index`` points at the first event after it.
        If ``data[index]`` is not a move, it is returned with ``index``
        unchanged.
        """
        last_move = data[index]
        while index < len(data) and data[index]["action"] == "move":
            last_move = data[index]
            index += 1
        return last_move, index

    def compress_scroll(self, data: List[dict], index: int):
        """Merge the run of same-direction scroll events starting at ``index``.

        The run ends at the first event that is not a scroll or whose dx/dy
        sign differs from the first event's.  Returns ``(scroll, next_index)``
        where ``scroll`` is a copy of the last event of the run with ``dx`` and
        ``dy`` replaced by the run's totals; the input events are not modified.
        """
        start = index
        first = data[index]
        # Same-sign values never flip the sign of their sum, so comparing with
        # the first event is equivalent to comparing with the running total.
        sign_dx, sign_dy = np.sign(first["dx"]), np.sign(first["dy"])
        total_dx = total_dy = 0
        last_scroll = first
        while (index < len(data)
               and data[index]["action"] == "scroll"
               and np.sign(data[index]["dx"]) == sign_dx
               and np.sign(data[index]["dy"]) == sign_dy):
            last_scroll = data[index]
            # Each event is added exactly once (the first one used to be
            # counted twice).
            total_dx += last_scroll["dx"]
            total_dy += last_scroll["dy"]
            index += 1
        merged = dict(last_scroll)
        if index > start:
            merged["dx"], merged["dy"] = total_dx, total_dy
        return merged, index

    @staticmethod
    def _click_action(button: str, x, y, num_clicks: int) -> dict:
        """Build a ``CLICK`` action."""
        return {
            "action_type": "CLICK",
            "parameters": {
                "button": button,
                "x": x,
                "y": y,
                "num_clicks": num_clicks
            }
        }

    @staticmethod
    def _append_replacing_move(result: List[dict], action: dict) -> None:
        """Append ``action``, first dropping a directly preceding ``MOVE_TO``.

        Safe on an empty ``result`` (the former ``result[-1:][0]`` lookup
        raised IndexError there).
        """
        if result and result[-1]["action_type"] == "MOVE_TO":
            result.pop()
        result.append(action)

    def _ends_click_sequence(self, press_event: dict, next_event: dict) -> bool:
        """Tell whether ``next_event`` is too late or too far from the press."""
        if next_event["time_stamp"] - press_event["time_stamp"] > self.CLICK_MAX_INTERVAL:
            return True
        if "x" in next_event and "y" in next_event:
            distance = math.hypot(next_event["x"] - press_event["x"],
                                  next_event["y"] - press_event["y"])
            return distance > self.CLICK_MAX_RADIUS
        return False

    def _compress_mouse_press(self, events: List[dict], index: int, result: List[dict],
                              button: str, compress_click: bool, compress_drag: bool):
        """Turn the button press at ``events[index]`` into one action.

        Looks ahead from the press and appends to ``result`` either a
        ``CLICK`` (with the number of releases counted), a ``DRAG_TO`` or,
        when nothing can be merged, a plain ``MOUSE_DOWN``.

        Returns ``(presses_to_skip, releases_to_skip, index, moves_to_skip)``:
        how many upcoming press / release / move events the caller must drop
        because they are already represented by the appended action, and the
        index of the last event consumed here (the caller advances past it).
        """
        press_event = events[index]
        click_x, click_y = press_event["x"], press_event["y"]
        num_clicks = 0
        mouse_pressed = True
        skip_move = 0

        for next_event in events[index + 1:]:
            if self._ends_click_sequence(press_event, next_event):
                if num_clicks > 0:
                    self._append_replacing_move(
                        result, self._click_action(button, click_x, click_y, num_clicks))
                    return num_clicks - 1, num_clicks, index, skip_move
                break

            action = next_event["action"]
            at_click_position = (action == "move"
                                 and next_event["x"] == click_x
                                 and next_event["y"] == click_y)

            if action == "click" and compress_click:
                if not next_event["pressed"]:
                    num_clicks += 1
                    mouse_pressed = False
                    if num_clicks == self.MAX_CLICKS:
                        self._append_replacing_move(
                            result, self._click_action(button, click_x, click_y, num_clicks))
                        return num_clicks - 1, num_clicks, index, skip_move
                else:
                    mouse_pressed = True
            elif action != "click" and not mouse_pressed:
                # Button is up again: anything but a move that stays on the
                # click position finishes the click sequence.
                if at_click_position:
                    skip_move += 1
                    continue
                self._append_replacing_move(
                    result, self._click_action(button, click_x, click_y, num_clicks))
                return num_clicks - 1, num_clicks, index, skip_move
            # Compress {MOUSE_DOWN, MOVE, MOUSE_UP} into DRAG_TO event
            elif action == "move" and compress_drag:
                if at_click_position:
                    skip_move += 1
                    continue
                # NOTE(review): the scan starts right after the press, which
                # presumes only move events lie between the press and this
                # move -- confirm for recordings with keys held while dragging.
                last_move, after_moves = self.compress_mouse_move(events, index + 1)
                # NOTE(review): as before, a directly preceding MOVE_TO is
                # dropped although DRAG_TO only carries the destination --
                # confirm the replayer does not need the start position.
                self._append_replacing_move(result, {
                    "action_type": "DRAG_TO",
                    "parameters": {
                        "x": last_move["x"],
                        "y": last_move["y"]
                    }
                })
                # Point at the last consumed move so that the caller's
                # `index += 1` lands on the event after the run (normally the
                # release, dropped through releases_to_skip).  The moves were
                # jumped over, so none remain to be skipped.
                return 0, 1, after_moves - 1, 0

        result.append({
            "action_type": "MOUSE_DOWN",
            "parameters": {
                "button": button
            }
        })
        return 0, 0, index, skip_move

    ### Converting ###
    def ducktrack_event_file_to_action(self, ducktrack_event_file: str, out_file: str,
                                       compress_move: bool = True, compress_scroll: bool = True,
                                       compress_click: bool = True, compress_drag: bool = True,
                                       compress_press_key: bool = True, compress_typing: bool = True):
        """Convert a DuckTrack event file to a list of actions saved as JSON.

        Args:
            ducktrack_event_file: path of the input file, one JSON event per
                line (blank lines are ignored).
            out_file: path the resulting JSON list of actions is written to.
            compress_move: keep only the last move of each run of moves.
            compress_scroll: sum each run of same-direction scrolls.
            compress_click: merge press/release sequences into ``CLICK``.
            compress_drag: merge press + moves + release into ``DRAG_TO``.
            compress_press_key: emit one ``PRESS`` per key press and drop the
                matching release instead of ``KEY_DOWN`` / ``KEY_UP``.
            compress_typing: merge runs of keys not in ``COMMAND_KEYS`` into a
                ``TYPING`` action (``PRESS`` when the text is one character
                or empty).

        Raises:
            FileNotFoundError: if ``ducktrack_event_file`` does not exist.
            NotImplementedError: for an event type or mouse button the
                per-event converters do not know.
        """
        if not os.path.exists(ducktrack_event_file):
            raise FileNotFoundError(ducktrack_event_file)

        with open(ducktrack_event_file, 'r') as file:
            events = [json.loads(line) for line in file if line.strip()]

        # Save the compressed actions in a list
        result = []
        index = 0

        # Upcoming events already represented by an emitted CLICK / DRAG_TO.
        presses_to_skip = 0
        releases_to_skip = 0
        move_to_skip = 0

        while index < len(events):
            event = events[index]
            kind = event["action"]

            if kind == "move":
                if move_to_skip > 0:
                    move_to_skip -= 1
                    index += 1
                elif compress_move:
                    last_move, index = self.compress_mouse_move(events, index)
                    result.append(self.event_to_action(last_move))
                else:
                    # Without compression every move is emitted; this branch
                    # used to leave `index` untouched and loop forever.
                    result.append(self.event_to_action(event))
                    index += 1

            elif kind == "scroll" and compress_scroll:
                last_scroll, index = self.compress_scroll(events, index)
                result.append(self.event_to_action(last_scroll))

            elif kind == "click":
                button = event["button"]
                if event["pressed"]:
                    if presses_to_skip == 0:
                        presses, releases, index, moves = self._compress_mouse_press(
                            events, index, result, button, compress_click, compress_drag)
                        presses_to_skip += presses
                        releases_to_skip += releases
                        move_to_skip += moves
                    else:
                        presses_to_skip -= 1
                elif releases_to_skip == 0:
                    result.append({
                        "action_type": "MOUSE_UP",
                        "parameters": {
                            "button": button
                        }
                    })
                else:
                    releases_to_skip -= 1
                index += 1

            elif kind == "press" and event["name"] not in COMMAND_KEYS and compress_typing:
                # Gather every following press/release of a non-command key;
                # only presses contribute text.
                typed = []
                while (index < len(events)
                       and events[index]["action"] in ("press", "release")
                       and events[index]["name"] not in COMMAND_KEYS):
                    if events[index]["action"] == "press":
                        name = events[index]["name"]
                        typed.append(typingkey2str.get(name, name))
                    index += 1
                typing_words = "".join(typed)
                if len(typing_words) > 1:
                    result.append({
                        "action_type": "TYPING",
                        "parameters": {
                            "text": typing_words
                        }
                    })
                else:
                    result.append({
                        "action_type": "PRESS",
                        "parameters": {
                            "key": typing_words
                        }
                    })

            elif kind == "press" and compress_press_key:
                result.append({
                    "action_type": "PRESS",
                    "parameters": {
                        "key": self._mapped_key(event["name"])
                    }
                })
                index += 1

            elif kind == "release" and compress_press_key:
                # The release is implied by the PRESS action emitted earlier.
                index += 1

            else:
                result.append(self.event_to_action(event))
                index += 1

        with open(out_file, "w") as f:
            json.dump(result, f)
if __name__ == "__main__":
    # Demo run: convert a recorded session with every compression enabled.
    compression_flags = {
        "compress_move": True,
        "compress_scroll": True,
        "compress_click": True,
        "compress_drag": True,
        "compress_press_key": True,
        "compress_typing": True,
    }
    DuckTrackEventActionConverter().ducktrack_event_file_to_action(
        ducktrack_event_file="complex_clicking.jsonl",
        out_file="complex_clicking5.json",
        **compression_flags,
    )