Update compressor for data annotation

This commit is contained in:
Timothyxxx
2023-12-04 00:51:33 +08:00
parent 8cb31f1bb0
commit 8818779329
8 changed files with 2519 additions and 151 deletions

View File

@@ -3,78 +3,93 @@ import sys, pathlib;
sys.path.append(str(pathlib.Path(__file__).parents[1]))
import os
import math
import json
from typing import List
from desktop_env.envs.desktop_env import Action, MouseClick
from copy import deepcopy
pynput2pyautogui_key = {
"alt_l": "altleft",
"alt_r": "altright",
}
class DuckTrackEventActionConverter:
def __init__(self, human_readable: str, compress_move: bool = True):
self.human_readable = human_readable
self.compress_move = compress_move
def __init__(self, ):
""""""
def enum_to_str(self, enum):
"""Converts an enum to its string representation if HUMAN_READABLE is True, otherwise returns its value."""
return enum.name if self.human_readable else enum.value
### Enumerations ###
def move_event_to_action(self, event: dict, action_space: str = "computer_13"):
"""Converts a mouse move event to its corresponding action."""
if action_space == "computer_13":
return {
"action_type": "MOVE_TO",
"parameters": {
"x": event["x"],
"y": event["y"]
}
}
elif action_space == "pyautogui":
return "pyautogui.moveTo({}, {})".format(event["x"], event["y"])
def compress_mouse_move(self, data: List[dict], index: int):
"""Compresses consecutive mouse move events into first and last move events."""
first_move, last_move = data[index], data[index]
while index < len(data) and data[index]["action"] == "move":
last_move = data[index]
index += 1
return first_move, last_move, index
def click_event_to_action(self, event: dict, action_space: str = "computer_13"):
"""Converts a mouse click event to its corresponding action."""
action = {
"action_type": None,
"parameters": {
"button": None
}
}
def move_event_to_action(self, event: dict):
return {"action_type": self.enum_to_str(Action.MOUSE_MOVE),
"x": event["x"],
"y": event["y"]}
def click_event_to_action(self, event: dict):
action = {}
mouse_button = event["button"]
mouse_pressed = event["pressed"]
if mouse_pressed:
action["action_type"] = self.enum_to_str(Action.MOUSE_DOWN)
action["action_type"] = "MOUSE_DOWN"
elif not mouse_pressed:
action["action_type"] = self.enum_to_str(Action.MOUSE_UP)
action["action_type"] = "MOUSE_UP"
else:
raise NotImplementedError(mouse_pressed)
if mouse_button == "left":
action["click_type"] = self.enum_to_str(MouseClick.LEFT)
elif mouse_button == "right":
action["click_type"] = self.enum_to_str(MouseClick.RIGHT)
elif mouse_button == "middle":
action["click_type"] = self.enum_to_str(MouseClick.MIDDLE)
if mouse_button in ["left", "right", "middle"]:
action["parameters"]["button"] = mouse_button
else:
raise NotImplementedError(mouse_button)
return action
def press_event_to_action(self, event: dict):
return {"action_type": self.enum_to_str(Action.KEY_DOWN),
"key": [ord(c) for c in event["name"]]}
def press_event_to_action(self, event: dict, action_space: str = "computer_13"):
"""Converts a key down event to its corresponding action."""
# NOTE: the `key down`, `press` have the same meaning here, while different in pyautogui
return {
"action_type": "KEY_DOWN",
"parameters": {
"key": event["name"] if event["name"] not in pynput2pyautogui_key else pynput2pyautogui_key[
event["name"]]
}
}
def release_event_to_action(self, event: dict):
return {"action_type": self.enum_to_str(Action.KEY_UP),
"key": [ord(c) for c in event["name"]]}
def release_event_to_action(self, event: dict, action_space: str = "computer_13"):
"""Converts a key release event to its corresponding action."""
return {
"action_type": "KEY_UP",
"parameters": {
"key": event["name"] if event["name"] not in pynput2pyautogui_key else pynput2pyautogui_key[
event["name"]]
}
}
def scroll_event_to_action(self, event: dict):
# TODO: need to confirm if df < 0 means scroll up or down
def scroll_event_to_action(self, event: dict, action_space: str = "computer_13"):
"""Converts a scroll event to its corresponding action."""
return {
"action_type": "SCROLL",
"parameters": {
"dx": event["dx"],
"dy": event["dy"]
}
}
# TODO: NEED to be test to match the scroll up and down with our action, e.g. scroll here once is equal to scroll 10 or scroll 20?
if event["dy"] < 0:
down = False
else:
down = True
return {"action_type": self.enum_to_str(Action.CLICK),
"click_type": self.enum_to_str(MouseClick.WHEEL_DOWN) if down else self.enum_to_str(
MouseClick.WHEEL_UP)}
def event_to_action(self, event: dict):
def event_to_action(self, event: dict, action_space: str = "computer_13"):
"""Converts an event to its corresponding action based on the event type."""
if event["action"] == "move":
return self.move_event_to_action(event)
@@ -89,114 +104,121 @@ class DuckTrackEventActionConverter:
else:
raise NotImplementedError(event["action"])
def ducktrack_event_file_to_action(self, ducktrack_event_file: str, out_file: str, compress_move: bool = None):
### Compressing ###
def compress_mouse_move(self, data: List[dict], index: int):
"""Compresses consecutive mouse move events into first and last move events."""
first_move, last_move = data[index], data[index]
while index < len(data) and data[index]["action"] == "move":
last_move = data[index]
index += 1
return first_move, last_move, index
### Converting ###
def ducktrack_event_file_to_action(self, ducktrack_event_file: str, out_file: str, compress_move: bool = True):
"""Converts DuckTrack event data to a list of actions and saves them to a file."""
if not os.path.exists(ducktrack_event_file):
raise FileNotFoundError(ducktrack_event_file)
# set to default
if compress_move is None:
compress_move = self.compress_move
with open(ducktrack_event_file, 'r') as file:
data = [json.loads(line) for line in file]
events = [json.loads(line) for line in file]
result = {"action": [], "event": []}
# Save the compressed actions in a list
result = []
index = 0
presses_to_skip = 0
releases_to_skip = 0
# Compress the mouse move events
while index < len(data):
event = data[index]
while index < len(events):
event = events[index]
def do_mouse_press(button: str):
for j, second_event in enumerate(events[index + 1:]):
# make sure the time between mouse clicks is less than 500ms
if second_event["time_stamp"] - event["time_stamp"] > 0.5:
break
if "x" in second_event and "y" in second_event:
# if the mouse moves out of the click radius/rectangle, it is not a click sequence
if math.sqrt((second_event["y"] - event["y"]) ** 2 +
(second_event["x"] - event["x"]) ** 2) > 4:
break
if second_event["action"] == "click" and second_event["pressed"]:
for k, third_event in enumerate(events[index + j + 2:]):
if third_event["time_stamp"] - second_event["time_stamp"] > 0.5:
break
if "x" in third_event and "y" in third_event:
if math.sqrt((third_event["y"] - event["y"]) ** 2 +
(third_event["x"] - event["x"]) ** 2) > 5:
break
if third_event["action"] == "click" and third_event["pressed"]:
result.append({
"action_type": "CLICK",
"parameters": {
"button": button,
"num_clicks": 3
}
})
return 2, 2
result.append({
"action_type": "CLICK",
"parameters": {
"button": button,
"num_clicks": 2
}
})
return 1, 1
result.append({
"action_type": "MOUSE_DOWN",
"parameters": {
"button": button
}
})
return 0, 0
if event["action"] == "move" and compress_move:
first_move, last_move, index = self.compress_mouse_move(data, index)
result["action"].extend([self.event_to_action(last_move)])
result["event"].extend([last_move])
else:
result["action"].append(self.event_to_action(event))
result["event"].append(event)
first_move, last_move, index = self.compress_mouse_move(events, index)
result.extend([self.event_to_action(last_move)])
elif event["action"] == "click":
button = event["button"]
if event["pressed"]:
if presses_to_skip == 0:
presses, releases = do_mouse_press(button)
presses_to_skip += presses
releases_to_skip += releases
else:
presses_to_skip -= 1
else:
if releases_to_skip == 0:
result.append({
"action_type": "MOUSE_UP",
"parameters": {
"button": button
}
})
else:
releases_to_skip -= 1
index += 1
# Compress the key down and key up actions
# todo: handling the key down and key up events
_new_actions = []
_action = list(result["action"])
idx = 0
while True:
if idx >= len(_action):
break
if _action[idx]["action_type"] == self.enum_to_str(Action.KEY_DOWN):
typed_text = []
while idx < len(_action) and _action[idx]["action_type"] in [self.enum_to_str(Action.KEY_DOWN), self.enum_to_str(Action.KEY_UP)] and len(_action[idx]["key"]) == 1:
if _action[idx]["action_type"] == self.enum_to_str(Action.KEY_DOWN):
typed_text.append(chr(_action[idx]["key"][0]))
idx += 1
if typed_text:
_new_actions.append({"action_type": self.enum_to_str(Action.TYPE), "text": typed_text})
else:
_new_actions.append(_action[idx])
idx += 1
else:
_new_actions.append(_action[idx])
idx += 1
result["action"] = _new_actions
# Compress the scroll up and scroll down events
# todo: handling the key down and key up events
_new_actions = []
_action = list(result["action"])
idx = 0
while True:
if idx >= len(_action):
break
if _action[idx]["action_type"] == self.enum_to_str(Action.CLICK) and _action[idx]["click_type"] in [self.enum_to_str(MouseClick.WHEEL_UP), self.enum_to_str(MouseClick.WHEEL_DOWN)]:
typed_text = []
while idx < len(_action) and _action[idx]["action_type"] == self.enum_to_str(Action.CLICK) and _action[idx]["click_type"] in [self.enum_to_str(MouseClick.WHEEL_UP), self.enum_to_str(MouseClick.WHEEL_DOWN)]:
if _action[idx]["click_type"] == self.enum_to_str(MouseClick.WHEEL_UP):
typed_text.append("UP")
idx += 1
elif _action[idx]["click_type"] == self.enum_to_str(MouseClick.WHEEL_DOWN):
typed_text.append("DOWN")
idx += 1
_new_actions.append({"action_type": self.enum_to_str(Action.CLICK), "click_type": "SCROLL", "text": typed_text})
else:
_new_actions.append(_action[idx])
idx += 1
result["action"] = _new_actions
# Compress the mouse down and mouse up actions
# todo: handling the key down and key up events
_new_actions = []
_action = list(result["action"])
idx = 0
while True:
if idx >= len(_action):
break
if _action[idx]["action_type"] == self.enum_to_str(Action.MOUSE_DOWN):
if idx + 1 < len(_action) and _action[idx+1]["action_type"] == self.enum_to_str(Action.MOUSE_UP):
_new_actions.append({"action_type": self.enum_to_str(Action.CLICK), "click_type": _action[idx]["click_type"]})
idx += 2
else:
_new_actions.append(_action[idx])
idx += 1
else:
_new_actions.append(_action[idx])
idx += 1
result["action"] = _new_actions
result.append(self.event_to_action(event))
index += 1
with open(out_file, "w") as f:
json.dump(result, f)
if __name__ == "__main__":
converter = DuckTrackEventActionConverter(human_readable=True)
converter.ducktrack_event_file_to_action(ducktrack_event_file="sample.jsonl",
out_file="output.json",
compress_move=True)
converter = DuckTrackEventActionConverter()
converter.ducktrack_event_file_to_action(
ducktrack_event_file="events_calc.jsonl",
out_file="events_calc.json",
compress_move=True
)