351 lines
16 KiB
Python
351 lines
16 KiB
Python
import sys, pathlib;
|
|
|
|
sys.path.append(str(pathlib.Path(__file__).parents[1]))
|
|
|
|
import os
|
|
import math
|
|
import json
|
|
import numpy as np
|
|
from typing import List
|
|
from copy import deepcopy
|
|
|
|
# Key names as recorded in the event stream -> the spelling used in emitted actions.
pynput2pyautogui_key = dict(
    alt_l="altleft",
    alt_r="altright",
)

# Key names treated as command (non-text) keys: a press of any of these is
# never folded into a TYPING action by the converter.
COMMAND_KEYS = [
    "accept", "add", "alt", "altleft", "altright", "apps", "backspace",
    "browserback", "browserfavorites", "browserforward", "browserhome",
    "browserrefresh", "browsersearch", "browserstop", "capslock", "clear",
    "convert", "ctrl", "ctrlleft", "ctrlright", "decimal", "del", "delete",
    "divide", "down", "end", "enter", "esc", "escape", "execute",
    "f1", "f10", "f11", "f12", "f13", "f14", "f15", "f16", "f17", "f18",
    "f19", "f2", "f20", "f21", "f22", "f23", "f24", "f3", "f4", "f5", "f6",
    "f7", "f8", "f9",
    "final", "fn", "hanguel", "hangul", "hanja", "help", "home", "insert",
    "junja", "kana", "kanji", "launchapp1", "launchapp2", "launchmail",
    "launchmediaselect", "left", "modechange", "multiply", "nexttrack",
    "nonconvert",
    "num0", "num1", "num2", "num3", "num4", "num5", "num6", "num7", "num8",
    "num9", "numlock",
    "pagedown", "pageup", "pause", "pgdn", "pgup", "playpause", "prevtrack",
    "print", "printscreen", "prntscrn", "prtsc", "prtscr", "return", "right",
    "scrolllock", "select", "separator", "shift", "shiftleft", "shiftright",
    "sleep", "stop", "subtract", "tab", "up", "volumedown", "volumemute",
    "volumeup", "win", "winleft", "winright", "yen", "command", "option",
    "optionleft", "optionright",
    "alt_l", "alt_r",
]

# Typed key names whose text differs from the recorded name.
typingkey2str = dict(space=" ")
|
|
|
|
class DuckTrackEventActionConverter:
    """Convert recorded DuckTrack input events into higher-level actions.

    Events are dicts carrying an ``"action"`` key (``"move"``, ``"click"``,
    ``"press"``, ``"release"`` or ``"scroll"``) plus the fields each handler
    below reads. The converter is stateless; all bookkeeping lives in locals
    of :meth:`ducktrack_event_file_to_action`.
    """

    def __init__(self, ):
        """Create a converter; there is no instance state to initialise."""

    ### Enumerations ###
    def move_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a mouse move event to its corresponding action.

        Returns a ``MOVE_TO`` dict for ``"computer_13"``, a
        ``pyautogui.moveTo(...)`` source string for ``"pyautogui"``, and
        ``None`` for any other ``action_space``.
        """
        if action_space == "computer_13":
            return {
                "action_type": "MOVE_TO",
                "parameters": {
                    "x": event["x"],
                    "y": event["y"]
                }
            }
        if action_space == "pyautogui":
            return "pyautogui.moveTo({}, {})".format(event["x"], event["y"])
        return None

    def click_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a mouse button event to ``MOUSE_DOWN`` / ``MOUSE_UP``.

        Raises:
            NotImplementedError: if ``event["button"]`` is not ``"left"``,
                ``"right"`` or ``"middle"``.
        """
        mouse_button = event["button"]
        mouse_pressed = event["pressed"]

        if mouse_button not in ("left", "right", "middle"):
            raise NotImplementedError(mouse_button)

        return {
            "action_type": "MOUSE_DOWN" if mouse_pressed else "MOUSE_UP",
            "parameters": {
                "button": mouse_button
            }
        }

    def press_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a key down event to a ``KEY_DOWN`` action."""
        # NOTE: the `key down`, `press` have the same meaning here, while different in pyautogui
        name = event["name"]
        return {
            "action_type": "KEY_DOWN",
            "parameters": {
                "key": pynput2pyautogui_key.get(name, name)
            }
        }

    def release_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a key release event to a ``KEY_UP`` action."""
        name = event["name"]
        return {
            "action_type": "KEY_UP",
            "parameters": {
                "key": pynput2pyautogui_key.get(name, name)
            }
        }

    def scroll_event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Convert a scroll event to a ``SCROLL`` action carrying ``dx``/``dy``."""
        return {
            "action_type": "SCROLL",
            "parameters": {
                "dx": event["dx"],
                "dy": event["dy"]
            }
        }

    def event_to_action(self, event: dict, action_space: str = "computer_13"):
        """Dispatch an event to the handler matching ``event["action"]``.

        ``action_space`` is forwarded to the handler (only the move handler
        currently distinguishes action spaces).

        Raises:
            NotImplementedError: for an unrecognised ``event["action"]``.
        """
        kind = event["action"]
        if kind == "move":
            return self.move_event_to_action(event, action_space)
        if kind == "click":
            return self.click_event_to_action(event, action_space)
        if kind == "press":
            return self.press_event_to_action(event, action_space)
        if kind == "release":
            return self.release_event_to_action(event, action_space)
        if kind == "scroll":
            return self.scroll_event_to_action(event, action_space)
        raise NotImplementedError(kind)

    ### Compressing ###
    def compress_mouse_move(self, data: List[dict], index: int):
        """Collapse the run of consecutive move events starting at ``index``.

        Returns ``(last_move, next_index)``: the final move of the run and the
        index of the first event after it. If ``data[index]`` is not a move,
        that event and ``index`` are returned unchanged.
        """
        last_move = data[index]
        while index < len(data) and data[index]["action"] == "move":
            last_move = data[index]
            index += 1
        return last_move, index

    def compress_scroll(self, data: List[dict], index: int):
        """Merge consecutive same-direction scroll events starting at ``index``.

        A run continues while each event is a scroll whose ``dx`` and ``dy``
        have the same signs as the first event's. Returns
        ``(merged, next_index)`` where ``merged`` is a copy of the last event
        of the run with ``dx``/``dy`` replaced by the run's sums. The input
        events are left untouched.
        """
        first = data[index]
        sign_dx, sign_dy = np.sign(first["dx"]), np.sign(first["dy"])
        start = index
        last_scroll = first
        # Start from zero: the loop below visits the first event too, so
        # seeding the totals with its deltas would count it twice.
        total_dx = total_dy = 0
        while (index < len(data)
               and data[index]["action"] == "scroll"
               and np.sign(data[index]["dx"]) == sign_dx
               and np.sign(data[index]["dy"]) == sign_dy):
            last_scroll = data[index]
            total_dx += last_scroll["dx"]
            total_dy += last_scroll["dy"]
            index += 1

        merged = dict(last_scroll)
        if index > start:
            merged["dx"], merged["dy"] = total_dx, total_dy
        return merged, index

    @staticmethod
    def _append_click(result: list, button: str, x, y, num_clicks: int):
        """Append a CLICK action, replacing a directly preceding MOVE_TO."""
        # `result` may still be empty when the recording starts with a click.
        if result and result[-1]["action_type"] == "MOVE_TO":
            result.pop()
        result.append({
            "action_type": "CLICK",
            "parameters": {
                "button": button,
                "x": x,
                "y": y,
                "num_clicks": num_clicks
            }
        })

    def _compress_mouse_press(self, events: List[dict], index: int, result: list, button: str,
                              compress_click: bool, compress_drag: bool):
        """Look ahead from the mouse press at ``events[index]`` and emit one action.

        Emits a CLICK (up to 3 clicks), a DRAG_TO, or a plain MOUSE_DOWN onto
        ``result`` and returns ``(presses_to_skip, releases_to_skip, index,
        moves_to_skip)`` for the main loop's bookkeeping.
        """
        press = events[index]
        click_x, click_y = press["x"], press["y"]
        num_clicks = 0
        mouse_pressed = True
        skip_move = 0

        for next_event in events[index + 1:]:
            # make sure the time between mouse clicks is less than 500ms
            if next_event["time_stamp"] - press["time_stamp"] > 0.5:
                if num_clicks > 0:
                    self._append_click(result, button, click_x, click_y, num_clicks)
                    return num_clicks - 1, num_clicks, index, skip_move
                break

            if "x" in next_event and "y" in next_event:
                # if the mouse moves out of the click radius/rectangle, it is not a click sequence
                if math.sqrt((next_event["y"] - press["y"]) ** 2 +
                             (next_event["x"] - press["x"]) ** 2) > 4:
                    if num_clicks > 0:
                        self._append_click(result, button, click_x, click_y, num_clicks)
                        return num_clicks - 1, num_clicks, index, skip_move
                    break

            same_spot_move = (
                next_event["action"] == "move"
                and next_event["x"] == click_x
                and next_event["y"] == click_y
            )

            if next_event["action"] == "click" and compress_click:
                if not next_event["pressed"]:
                    num_clicks += 1
                    mouse_pressed = False
                    if num_clicks == 3:
                        # Sequences are capped at a triple click.
                        self._append_click(result, button, click_x, click_y, 3)
                        return 2, 3, index, skip_move
                else:
                    mouse_pressed = True
            elif next_event["action"] != "click" and not mouse_pressed:
                # Button is up and something other than a click follows:
                # the click sequence is over (moves that stay put are absorbed).
                if same_spot_move:
                    skip_move += 1
                    continue
                self._append_click(result, button, click_x, click_y, num_clicks)
                return num_clicks - 1, num_clicks, index, skip_move
            # Compress {MOUSE_DOWN, MOVE, MOUSE_UP} into DRAG_TO event
            elif next_event["action"] == "move" and compress_drag:
                if same_spot_move:
                    skip_move += 1
                    continue
                last_move, index = self.compress_mouse_move(events, index + 1)
                result.append({
                    "action_type": "DRAG_TO",
                    "parameters": {
                        "x": last_move["x"],
                        "y": last_move["y"]
                    }
                })
                # NOTE(review): `index` now points at the first non-move event
                # and the caller advances once more, so that event is skipped
                # in addition to the one release requested here - confirm this
                # is intended.
                return 0, 1, index, skip_move

        result.append({
            "action_type": "MOUSE_DOWN",
            "parameters": {
                "button": button
            }
        })
        return 0, 0, index, skip_move

    ### Converting ###
    def ducktrack_event_file_to_action(self, ducktrack_event_file: str, out_file: str, compress_move: bool = True, compress_scroll: bool = True, compress_click: bool = True, compress_drag: bool = True, compress_press_key: bool = True, compress_typing: bool = True):
        """Convert a DuckTrack JSON-lines event file to a JSON list of actions.

        Args:
            ducktrack_event_file: path to the input file, one JSON event per line
                (blank lines are ignored).
            out_file: path the resulting action list is written to as JSON.
            compress_move: keep only the last of consecutive move events.
            compress_scroll: merge consecutive same-direction scroll events.
            compress_click: fold press/release pairs into CLICK actions.
            compress_drag: fold press + moves into a DRAG_TO action.
            compress_press_key: emit PRESS for key presses and drop releases.
            compress_typing: fold runs of non-command keys into TYPING actions.

        Raises:
            FileNotFoundError: if ``ducktrack_event_file`` does not exist.
        """
        if not os.path.exists(ducktrack_event_file):
            raise FileNotFoundError(ducktrack_event_file)

        with open(ducktrack_event_file, 'r') as file:
            events = [json.loads(line) for line in file if line.strip()]

        # Save the compressed actions in a list
        result = []
        index = 0
        # Counters for raw events already represented by an emitted action.
        presses_to_skip = 0
        releases_to_skip = 0
        move_to_skip = 0

        while index < len(events):
            event = events[index]
            kind = event["action"]

            if kind == "move":
                if move_to_skip > 0:
                    move_to_skip -= 1
                    index += 1
                    continue
                if compress_move:
                    last_move, index = self.compress_mouse_move(events, index)
                    result.append(self.event_to_action(last_move))
                else:
                    # Without compression every move is emitted on its own;
                    # the index must still advance or the loop never ends.
                    result.append(self.event_to_action(event))
                    index += 1

            elif kind == "scroll" and compress_scroll:
                last_scroll, index = self.compress_scroll(events, index)
                result.append(self.event_to_action(last_scroll))

            elif kind == "click":
                button = event["button"]

                if event["pressed"]:
                    if presses_to_skip == 0:
                        presses, releases, index, moves = self._compress_mouse_press(
                            events, index, result, button, compress_click, compress_drag)
                        presses_to_skip += presses
                        releases_to_skip += releases
                        move_to_skip += moves
                    else:
                        presses_to_skip -= 1
                else:
                    if releases_to_skip == 0:
                        result.append({
                            "action_type": "MOUSE_UP",
                            "parameters": {
                                "button": button
                            }
                        })
                    else:
                        releases_to_skip -= 1
                index += 1

            elif kind == "press" and event["name"] not in COMMAND_KEYS and compress_typing:
                # Gather the run of non-command key presses/releases into one text.
                typed = []
                while (index < len(events)
                       and events[index]["action"] in ["press", "release"]
                       and events[index]["name"] not in COMMAND_KEYS):
                    if events[index]["action"] == "press":
                        name = events[index]["name"]
                        typed.append(typingkey2str.get(name, name))
                    index += 1
                typing_words = "".join(typed)
                if len(typing_words) > 1:
                    result.append({
                        "action_type": "TYPING",
                        "parameters": {
                            "text": typing_words
                        }
                    })
                else:
                    result.append({
                        "action_type": "PRESS",
                        "parameters": {
                            "key": typing_words
                        }
                    })

            elif kind == "press" and compress_press_key:
                name = event["name"]
                result.append({
                    "action_type": "PRESS",
                    "parameters": {
                        "key": pynput2pyautogui_key.get(name, name)
                    }
                })
                index += 1

            elif kind == "release" and compress_press_key:
                # The matching press was already emitted as PRESS; the release
                # adds nothing (and may arrive without a recorded press).
                index += 1

            else:
                result.append(self.event_to_action(event))
                index += 1

        with open(out_file, "w") as f:
            json.dump(result, f)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: convert the sample recording with every
    # compression stage switched on.
    compression_flags = {
        "compress_move": True,
        "compress_scroll": True,
        "compress_click": True,
        "compress_drag": True,
        "compress_press_key": True,
        "compress_typing": True,
    }
    DuckTrackEventActionConverter().ducktrack_event_file_to_action(
        ducktrack_event_file="complex_clicking.jsonl",
        out_file="complex_clicking5.json",
        **compression_flags,
    )
|