Add DuckTrack as initial annotation tool; Initial multimodal test

This commit is contained in:
Timothyxxx
2023-11-27 00:34:57 +08:00
parent 8c0525c20e
commit 8272e93953
53 changed files with 1705 additions and 0 deletions

172
annotation/.gitignore vendored Normal file

@@ -0,0 +1,172 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# experiments
experiments/**/*.png
experiments/**/*.csv
experiments/**/*.mp4
experiments/**/*.jsonl
experiments/**/*.json
experiments/**/*.md
experiments/**/*.txt
# macos
*DS_Store*

21
annotation/LICENSE Normal file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 DuckAI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

50
annotation/OBS_SETUP.md Normal file

@@ -0,0 +1,50 @@
# OBS Setup
These are instructions for setting up OBS (Open Broadcaster Software) to record screen activity for creating the multimodal computer dataset.
## Installation
1. Go to the OBS Project website: [https://obsproject.com/](https://obsproject.com/).
2. Choose the appropriate installer for your operating system.
![Operating System Selection](readme_images/Screenshot%202023-06-17%20220155.png)
3. Run the installer from your downloads folder and grant OBS the necessary permissions for installation.
![Installer Run](readme_images/Screenshot%202023-06-24%20115916.png)
4. Keep the default settings and proceed through the installation wizard by clicking "Next" and then "Finish."
![Installation Wizard](readme_images/Screenshot%202023-06-24%20120133.png)
5. OBS should now be open. If not, search for and open the application.
![Open OBS](readme_images/Screenshot%202023-06-17%20221407.png)
## Enabling OBS WebSocket Server
1. Click on "Tools" in the Navigation Bar within OBS, and then select "WebSocket Server Settings." A pop-up window will appear.
![WebSocket Server Settings](readme_images/Screenshot%202023-06-17%20221553.png)
2. Check the box next to "Enable WebSocket server" and uncheck the box next to "Enable Authentication." Make sure the port is set to 4455. Click "Apply," then "OK." You should return to the main OBS page.
![Enable WebSocket Server](readme_images/Screenshot%202023-06-24%20120347.png)
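If you want to confirm the WebSocket server is reachable before launching the app, a quick port check works (a minimal sketch using only the standard library; `websocket_port_open` is a hypothetical helper, not part of this repo):

```python
import socket

def websocket_port_open(host="localhost", port=4455):
    # Quick reachability check for the OBS WebSocket server port.
    try:
        with socket.create_connection((host, port), timeout=1):
            return True
    except OSError:
        return False
```

If this returns `False` with OBS running, re-check the "Enable WebSocket server" box and the port number in the settings dialog.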
## Adding Display Capture and Recording
1. Now, back on the home page of OBS, select your "Scene." Under "Sources," click the "+" button and then click "Display Capture." (On macOS this is "macOS Screen Capture.")
![Display Capture](readme_images/Screenshot%202023-06-24%20110823.png)
2. Select "Ok."
![Confirm Display Capture](readme_images/Screenshot%202023-06-24%20111017.png)
3. Make sure "Display" is set to your main display; you should see your screen on the canvas. Select "Ok." _(On macOS, if your screen appears black with a red square in the top left, try disabling and then re-enabling OBS's Screen Recording permission; this has resolved the issue before.)_
![Main Display](readme_images/Screenshot%202023-06-24%20112001.png)
4. You can now close OBS; it will be opened and controlled automatically when you launch the Computer Tracker app. The Computer Tracker app also creates its own OBS profile, so you don't have to worry about your previous settings being disturbed.
![Recording in Progress](readme_images/Screenshot%202023-06-24%20113548.png)

98
annotation/README.md Normal file

@@ -0,0 +1,98 @@
# DuckTrack
This is the repository for the DuckAI DuckTrack app, which records all keyboard and mouse input, as well as the screen, for use in a multimodal computer interaction dataset.
## Installation & Setup
### Download Application
<!-- TODO: add prebuilt applications in github releases -->
Download the pre-built application for your system [here](https://github.com/TheDuckAI/DuckTrack/releases/).
Make sure you have OBS downloaded with the following configuration:
1. Have a screen capture source recording your whole main screen.
2. Enable desktop audio and mute microphone.
3. Make sure the default WebSocket server is enabled (port 4455).
More detailed instructions for OBS setup and installation are located [here](OBS_SETUP.md).
If you are on macOS, make sure to enable the following Privacy & Security permissions before running the app:
1. Accessibility (for playing back actions)
2. Input Monitoring (for reading keyboard inputs)
Make sure to accept all other security permission dialogs to ensure that the app works properly.
### Build from source
Make sure you have Python >= 3.11.
Clone this repo and `cd` into it:
```bash
$ git clone https://github.com/TheDuckAI/DuckTrack
$ cd DuckTrack
```
Install the dependencies for this project:
```bash
$ pip install -r requirements.txt
```
Build the application:
```bash
$ python3 build.py
```
The built application should be located in the generated `dist` directory. After this, follow the remaining relevant setup instructions.
## Running the App
You can run the app like any other desktop app on your computer. If you chose not to download the app or build it from source, just run `python main.py` and it will work the same. You interact with the app through an app tray icon or a small window.
### Recording
From the app tray or GUI, you can start and stop a recording, as well as pause and resume one. Pausing and resuming is important when you want to hide sensitive information like credit card or login credentials. You can optionally name your recording and give it a description when stopping it. You can also view your recordings by pressing the "Show Recordings" option.
### Playback
You can playback a recording, i.e. simulate the series of events from the recording, by pressing "Play Latest Recording", which plays the latest created recording, or by pressing "Play Custom Recording", which lets you choose a recording to play. You can easily replay the most recently played recording by pressing "Replay Recording".
To stop the app mid-playback, just press `shift`+`esc` on your keyboard.
### Misc
To quit the app, you just press the "Quit" option.
## Recording Format
Recordings are stored in `Documents/DuckTrack_Recordings`. Each recording is a directory containing:
1. `events.jsonl` - the sequence of all computer actions that happened. A sample event may look like this:
```json
{"time_stamp": 1234567.89, "action": "move", "x": 69.0, "y": 420.0}
```
2. `metadata.json` - stores metadata about the computer that made the recording
3. `README.md` - stores the description for the recording
4. MP4 file - the OBS screen recording
Here is a [sample recording](example) for further reference.
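Since each line of `events.jsonl` is an independent JSON object, the file can be parsed line by line (a sketch; `load_events` is a hypothetical helper, field names follow the sample above):

```python
import json

def load_events(path):
    # Parse one JSON event per line, as written by the recorder.
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]
```

The resulting list is ordered by `time_stamp`, which is what the playback code relies on when pacing events.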
## Technical Overview
<!-- maybe put a nice graphical representation of the app here -->
*TBD*
## Known Bugs
- After doing lots of playbacks on macOS, a segfault will occur.
- Mouse movement is not captured when the current application uses raw input (e.g., video games).
- OBS may not open in the background properly on some Linux machines.
## Things To Do
- Add logging
- Testing
- CI (with builds and testing)
- Add a way to hide/show the window from the app tray (and save that as a preference?)
- Make saving preferences a general feature, e.g. for natural scrolling too

BIN
annotation/assets/duck.ico Normal file

Binary file not shown. (Size: 6.5 KiB)

BIN
annotation/assets/duck.png Normal file

Binary file not shown. (Size: 2.4 KiB)

27
annotation/build.py Normal file

@@ -0,0 +1,27 @@
import shutil
import sys
from pathlib import Path
from platform import system
from subprocess import CalledProcessError, run
project_dir = Path(".")
assets_dir = project_dir / "assets"
main_py = project_dir / "main.py"
icon_file = assets_dir / ("duck.ico" if system() == "Windows" else "duck.png")
for dir_to_remove in ["dist", "build"]:
dir_path = project_dir / dir_to_remove
if dir_path.exists():
shutil.rmtree(dir_path)
pyinstaller_cmd = [
"pyinstaller", "--onefile", "--windowed",
f"--add-data={assets_dir}{';' if system() == 'Windows' else ':'}{assets_dir}",
"--name=DuckTrack", f"--icon={icon_file}", str(main_py)
]
try:
run(pyinstaller_cmd, check=True)
except CalledProcessError as e:
print("An error occurred while running PyInstaller:", e)
sys.exit(1)
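PyInstaller's `--add-data` separator is platform-dependent (`;` on Windows, `:` elsewhere), which is why the command above assembles it conditionally; the same rule in isolation (a sketch; `add_data_arg` is a hypothetical helper):

```python
from platform import system

def add_data_arg(src: str, dest: str) -> str:
    # PyInstaller expects "src;dest" on Windows and "src:dest" on other OSes.
    sep = ";" if system() == "Windows" else ":"
    return f"--add-data={src}{sep}{dest}"
```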

1
annotation/ducktrack/__init__.py Normal file

@@ -0,0 +1 @@
from .app import MainInterface

251
annotation/ducktrack/app.py Normal file
View File

@@ -0,0 +1,251 @@
import os
import sys
from platform import system
from PyQt6.QtCore import QTimer, pyqtSlot
from PyQt6.QtGui import QAction, QIcon
from PyQt6.QtWidgets import (QApplication, QCheckBox, QDialog, QFileDialog,
QFormLayout, QLabel, QLineEdit, QMenu,
QMessageBox, QPushButton, QSystemTrayIcon,
QTextEdit, QVBoxLayout, QWidget)
from .obs_client import close_obs, is_obs_running, open_obs
from .playback import Player, get_latest_recording
from .recorder import Recorder
from .util import get_recordings_dir, open_file
class TitleDescriptionDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Recording Details")
layout = QVBoxLayout(self)
self.form_layout = QFormLayout()
self.title_label = QLabel("Title:")
self.title_input = QLineEdit(self)
self.form_layout.addRow(self.title_label, self.title_input)
self.description_label = QLabel("Description:")
self.description_input = QTextEdit(self)
self.form_layout.addRow(self.description_label, self.description_input)
layout.addLayout(self.form_layout)
self.submit_button = QPushButton("Save", self)
self.submit_button.clicked.connect(self.accept)
layout.addWidget(self.submit_button)
def get_values(self):
return self.title_input.text(), self.description_input.toPlainText()
class MainInterface(QWidget):
def __init__(self, app: QApplication):
super().__init__()
self.tray = QSystemTrayIcon(QIcon(resource_path("assets/duck.png")))
self.tray.show()
self.app = app
self.init_tray()
self.init_window()
if not is_obs_running():
self.obs_process = open_obs()
def init_window(self):
self.setWindowTitle("DuckTrack")
layout = QVBoxLayout(self)
self.toggle_record_button = QPushButton("Start Recording", self)
self.toggle_record_button.clicked.connect(self.toggle_record)
layout.addWidget(self.toggle_record_button)
self.toggle_pause_button = QPushButton("Pause Recording", self)
self.toggle_pause_button.clicked.connect(self.toggle_pause)
self.toggle_pause_button.setEnabled(False)
layout.addWidget(self.toggle_pause_button)
self.show_recordings_button = QPushButton("Show Recordings", self)
self.show_recordings_button.clicked.connect(lambda: open_file(get_recordings_dir()))
layout.addWidget(self.show_recordings_button)
self.play_latest_button = QPushButton("Play Latest Recording", self)
self.play_latest_button.clicked.connect(self.play_latest_recording)
layout.addWidget(self.play_latest_button)
self.play_custom_button = QPushButton("Play Custom Recording", self)
self.play_custom_button.clicked.connect(self.play_custom_recording)
layout.addWidget(self.play_custom_button)
self.replay_recording_button = QPushButton("Replay Recording", self)
self.replay_recording_button.clicked.connect(self.replay_recording)
self.replay_recording_button.setEnabled(False)
layout.addWidget(self.replay_recording_button)
self.quit_button = QPushButton("Quit", self)
self.quit_button.clicked.connect(self.quit)
layout.addWidget(self.quit_button)
self.natural_scrolling_checkbox = QCheckBox("Natural Scrolling", self, checked=system() == "Darwin")
layout.addWidget(self.natural_scrolling_checkbox)
self.natural_scrolling_checkbox.stateChanged.connect(self.toggle_natural_scrolling)
self.setLayout(layout)
def init_tray(self):
self.menu = QMenu()
self.tray.setContextMenu(self.menu)
self.toggle_record_action = QAction("Start Recording")
self.toggle_record_action.triggered.connect(self.toggle_record)
self.menu.addAction(self.toggle_record_action)
self.toggle_pause_action = QAction("Pause Recording")
self.toggle_pause_action.triggered.connect(self.toggle_pause)
self.toggle_pause_action.setVisible(False)
self.menu.addAction(self.toggle_pause_action)
self.show_recordings_action = QAction("Show Recordings")
self.show_recordings_action.triggered.connect(lambda: open_file(get_recordings_dir()))
self.menu.addAction(self.show_recordings_action)
self.play_latest_action = QAction("Play Latest Recording")
self.play_latest_action.triggered.connect(self.play_latest_recording)
self.menu.addAction(self.play_latest_action)
self.play_custom_action = QAction("Play Custom Recording")
self.play_custom_action.triggered.connect(self.play_custom_recording)
self.menu.addAction(self.play_custom_action)
self.replay_recording_action = QAction("Replay Recording")
self.replay_recording_action.triggered.connect(self.replay_recording)
self.menu.addAction(self.replay_recording_action)
self.replay_recording_action.setVisible(False)
self.quit_action = QAction("Quit")
self.quit_action.triggered.connect(self.quit)
self.menu.addAction(self.quit_action)
self.menu.addSeparator()
self.natural_scrolling_option = QAction("Natural Scrolling", checkable=True, checked=system() == "Darwin")
self.natural_scrolling_option.triggered.connect(self.toggle_natural_scrolling)
self.menu.addAction(self.natural_scrolling_option)
@pyqtSlot()
def replay_recording(self):
player = Player()
if hasattr(self, "last_played_recording_path"):
player.play(self.last_played_recording_path)
else:
self.display_error_message("No recording has been played yet!")
@pyqtSlot()
def play_latest_recording(self):
player = Player()
recording_path = get_latest_recording()
self.last_played_recording_path = recording_path
self.replay_recording_action.setVisible(True)
self.replay_recording_button.setEnabled(True)
player.play(recording_path)
@pyqtSlot()
def play_custom_recording(self):
player = Player()
directory = QFileDialog.getExistingDirectory(None, "Select Recording", get_recordings_dir())
if directory:
self.last_played_recording_path = directory
self.replay_recording_button.setEnabled(True)
self.replay_recording_action.setVisible(True)
player.play(directory)
@pyqtSlot()
def quit(self):
if hasattr(self, "recorder_thread"):
self.toggle_record()
if hasattr(self, "obs_process"):
close_obs(self.obs_process)
self.app.quit()
def closeEvent(self, event):
self.quit()
@pyqtSlot()
def toggle_natural_scrolling(self):
sender = self.sender()
if sender == self.natural_scrolling_checkbox:
state = self.natural_scrolling_checkbox.isChecked()
self.natural_scrolling_option.setChecked(state)
else:
state = self.natural_scrolling_option.isChecked()
self.natural_scrolling_checkbox.setChecked(state)
@pyqtSlot()
def toggle_pause(self):
if self.recorder_thread._is_paused:
self.recorder_thread.resume_recording()
self.toggle_pause_action.setText("Pause Recording")
self.toggle_pause_button.setText("Pause Recording")
else:
self.recorder_thread.pause_recording()
self.toggle_pause_action.setText("Resume Recording")
self.toggle_pause_button.setText("Resume Recording")
@pyqtSlot()
def toggle_record(self):
if not hasattr(self, "recorder_thread"):
self.recorder_thread = Recorder(natural_scrolling=self.natural_scrolling_checkbox.isChecked())
self.recorder_thread.recording_stopped.connect(self.on_recording_stopped)
self.recorder_thread.start()
self.update_menu(True)
else:
self.recorder_thread.stop_recording()
self.recorder_thread.terminate()
recording_dir = self.recorder_thread.recording_path
del self.recorder_thread
dialog = TitleDescriptionDialog()
QTimer.singleShot(0, dialog.raise_)
result = dialog.exec()
if result == QDialog.DialogCode.Accepted:
title, description = dialog.get_values()
if title:
renamed_dir = os.path.join(os.path.dirname(recording_dir), title)
os.rename(recording_dir, renamed_dir)
with open(os.path.join(renamed_dir, 'README.md'), 'w') as f:
f.write(description)
self.on_recording_stopped()
@pyqtSlot()
def on_recording_stopped(self):
self.update_menu(False)
def update_menu(self, is_recording: bool):
self.toggle_record_button.setText("Stop Recording" if is_recording else "Start Recording")
self.toggle_record_action.setText("Stop Recording" if is_recording else "Start Recording")
self.toggle_pause_button.setEnabled(is_recording)
self.toggle_pause_action.setVisible(is_recording)
def display_error_message(self, message):
QMessageBox.critical(None, "Error", message)
def resource_path(relative_path: str) -> str:
if hasattr(sys, '_MEIPASS'):
base_path = getattr(sys, "_MEIPASS")
else:
base_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
return os.path.join(base_path, relative_path)

33
annotation/ducktrack/keycomb.py Normal file

@@ -0,0 +1,33 @@
from pynput.keyboard import Listener
from .util import name_to_key
class KeyCombinationListener:
"""
Simple and bad key combination listener.
"""
def __init__(self):
self.current_keys = set()
self.callbacks = {}
self.listener = Listener(on_press=self.on_key_press, on_release=self.on_key_release)
def add_comb(self, keys, callback):
self.callbacks[tuple([name_to_key(key_name) for key_name in sorted(keys)])] = callback
def on_key_press(self, key):
self.current_keys.add(key)
for comb, callback in self.callbacks.items():
if all(k in self.current_keys for k in comb):
return callback()
def on_key_release(self, key):
if key in self.current_keys:
self.current_keys.remove(key)
def start(self):
self.listener.start()
def stop(self):
self.listener.stop()
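The listener fires a combination's callback when every key in the combination is currently held down; that matching rule in isolation (a sketch using plain strings instead of pynput key objects):

```python
def comb_pressed(current_keys: set, comb: tuple) -> bool:
    # A combination matches when all of its keys are in the held-down set.
    return all(k in current_keys for k in comb)
```

Note that in pynput, a callback returning `False` stops the listener, which is how the player uses `shift`+`esc` to halt playback.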

60
annotation/ducktrack/metadata.py Normal file

@@ -0,0 +1,60 @@
import json
import os
import uuid
from datetime import datetime
from platform import uname
from screeninfo import get_monitors
class MetadataManager:
"""
Handles various system metadata collection.
"""
def __init__(self, recording_path: str, natural_scrolling: bool):
self.recording_path = recording_path
self.metadata = uname()._asdict()
self.metadata["id"] = uuid.getnode()
main_monitor = get_monitors()[0]
self.metadata["screen_width"] = main_monitor.width
self.metadata["screen_height"] = main_monitor.height
try:
match self.metadata["system"]:
case "Windows":
import wmi
for item in wmi.WMI().Win32_ComputerSystem():
self.metadata["model"] = item.Model
break
case "Darwin":
import subprocess
model = subprocess.check_output(["sysctl", "-n", "hw.model"]).decode().strip()
self.metadata["model"] = model
case "Linux":
with open("/sys/devices/virtual/dmi/id/product_name", "r") as f:
self.metadata["model"] = f.read().strip()
except Exception:
self.metadata["model"] = "Unknown"
self.metadata["scroll_direction"] = -1 if natural_scrolling else 1
def save_metadata(self):
metadata_path = os.path.join(self.recording_path, "metadata.json")
with open(metadata_path, "w") as f:
json.dump(self.metadata, f, indent=4)
def collect(self):
self.metadata["start_time"] = self._get_time_stamp()
def end_collect(self):
self.metadata["stop_time"] = self._get_time_stamp()
def add_obs_record_state_timings(self, record_state_events: dict[str, float]):
self.metadata["obs_record_state_timings"] = record_state_events
def _get_time_stamp(self):
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
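The metadata record starts from `platform.uname()` plus a MAC-derived machine id; a minimal sketch of that base record (`base_metadata` is a hypothetical helper):

```python
import uuid
from platform import uname

def base_metadata() -> dict:
    # system/node/release/version/machine come from platform.uname();
    # uuid.getnode() gives a hardware-derived (MAC-based) id.
    metadata = uname()._asdict()
    metadata["id"] = uuid.getnode()
    return metadata
```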

200
annotation/ducktrack/obs_client.py Normal file

@@ -0,0 +1,200 @@
import os
import subprocess
import time
from platform import system
import obsws_python as obs
import psutil
def is_obs_running() -> bool:
try:
for process in psutil.process_iter(attrs=["pid", "name"]):
if "obs" in process.info["name"].lower():
return True
return False
except Exception:
raise Exception("Could not check if OBS is running already. Please check manually.")
def close_obs(obs_process: subprocess.Popen):
if obs_process:
obs_process.terminate()
try:
obs_process.wait(timeout=5)
except subprocess.TimeoutExpired:
obs_process.kill()
def find_obs() -> str:
common_paths = {
"Windows": [
"C:\\Program Files\\obs-studio\\bin\\64bit\\obs64.exe",
"C:\\Program Files (x86)\\obs-studio\\bin\\32bit\\obs32.exe"
],
"Darwin": [
"/Applications/OBS.app/Contents/MacOS/OBS",
"/opt/homebrew/bin/obs"
],
"Linux": [
"/usr/bin/obs",
"/usr/local/bin/obs"
]
}
for path in common_paths.get(system(), []):
if os.path.exists(path):
return path
try:
if system() == "Windows":
obs_path = subprocess.check_output("where obs", shell=True).decode().strip()
else:
obs_path = subprocess.check_output("which obs", shell=True).decode().strip()
if os.path.exists(obs_path):
return obs_path
except subprocess.CalledProcessError:
pass
return "obs"
def open_obs() -> subprocess.Popen:
try:
obs_path = find_obs()
if system() == "Windows":
# you have to change the working directory first for OBS to find the correct locale on windows
os.chdir(os.path.dirname(obs_path))
obs_path = os.path.basename(obs_path)
return subprocess.Popen([obs_path, "--startreplaybuffer", "--minimize-to-tray"])
except Exception:
raise Exception("Failed to find OBS, please open OBS manually.")
class OBSClient:
"""
Controls the OBS client via the OBS websocket.
Sets all the correct settings for recording.
"""
def __init__(
self,
recording_path: str,
metadata: dict,
fps=30,
output_width=1280,
output_height=720,
):
self.metadata = metadata
self.req_client = obs.ReqClient()
self.event_client = obs.EventClient()
self.record_state_events = {}
def on_record_state_changed(data):
output_state = data.output_state
print("record state changed:", output_state)
if output_state not in self.record_state_events:
self.record_state_events[output_state] = []
self.record_state_events[output_state].append(time.perf_counter())
self.event_client.callback.register(on_record_state_changed)
self.old_profile = self.req_client.get_profile_list().current_profile_name
if "computer_tracker" not in self.req_client.get_profile_list().profiles:
self.req_client.create_profile("computer_tracker")
else:
self.req_client.set_current_profile("computer_tracker")
self.req_client.create_profile("temp")
self.req_client.remove_profile("temp")
self.req_client.set_current_profile("computer_tracker")
base_width = metadata["screen_width"]
base_height = metadata["screen_height"]
if metadata["system"] == "Darwin":
# for retina displays
# TODO: check if external displays are messed up by this
base_width *= 2
base_height *= 2
scaled_width, scaled_height = _scale_resolution(base_width, base_height, output_width, output_height)
self.req_client.set_profile_parameter("Video", "BaseCX", str(base_width))
self.req_client.set_profile_parameter("Video", "BaseCY", str(base_height))
self.req_client.set_profile_parameter("Video", "OutputCX", str(scaled_width))
self.req_client.set_profile_parameter("Video", "OutputCY", str(scaled_height))
self.req_client.set_profile_parameter("Video", "ScaleType", "lanczos")
self.req_client.set_profile_parameter("AdvOut", "RescaleRes", f"{base_width}x{base_height}")
self.req_client.set_profile_parameter("AdvOut", "RecRescaleRes", f"{base_width}x{base_height}")
self.req_client.set_profile_parameter("AdvOut", "FFRescaleRes", f"{base_width}x{base_height}")
self.req_client.set_profile_parameter("Video", "FPSCommon", str(fps))
self.req_client.set_profile_parameter("Video", "FPSInt", str(fps))
self.req_client.set_profile_parameter("Video", "FPSNum", str(fps))
self.req_client.set_profile_parameter("Video", "FPSDen", "1")
self.req_client.set_profile_parameter("SimpleOutput", "RecFormat2", "mp4")
bitrate = int(_get_bitrate_mbps(scaled_width, scaled_height, fps=fps) * 1000 / 50) * 50
self.req_client.set_profile_parameter("SimpleOutput", "VBitrate", str(bitrate))
# do this in order to get pause & resume
self.req_client.set_profile_parameter("SimpleOutput", "RecQuality", "Small")
self.req_client.set_profile_parameter("SimpleOutput", "FilePath", recording_path)
# TODO: not all OBS configs have this, maybe just instruct the user to mute themselves
try:
self.req_client.set_input_mute("Mic/Aux", muted=True)
except obs.error.OBSSDKRequestError:
# In case there is no Mic/Aux input, this will throw an error
pass
def start_recording(self):
self.req_client.start_record()
def stop_recording(self):
self.req_client.stop_record()
self.req_client.set_current_profile(self.old_profile) # restore old profile
def pause_recording(self):
self.req_client.pause_record()
def resume_recording(self):
self.req_client.resume_record()
def _get_bitrate_mbps(width: int, height: int, fps=30) -> float:
"""
Gets the YouTube recommended bitrate in Mbps for a given resolution and framerate.
Refer to https://support.google.com/youtube/answer/1722171?hl=en#zippy=%2Cbitrate
"""
resolutions = {
(7680, 4320): {30: 120, 60: 180},
(3840, 2160): {30: 40, 60: 60.5},
(2160, 1440): {30: 16, 60: 24},
(1920, 1080): {30: 8, 60: 12},
(1280, 720): {30: 5, 60: 7.5},
(640, 480): {30: 2.5, 60: 4},
(480, 360): {30: 1, 60: 1.5}
}
if (width, height) in resolutions:
return resolutions[(width, height)].get(fps)
else:
# approximate the bitrate using a simple linear model
area = width * height
multiplier = 3.5982188179592543e-06 if fps == 30 else 5.396175171097084e-06
constant = 2.418399836285939 if fps == 30 else 3.742780056500365
return multiplier * area + constant
def _scale_resolution(base_width: int, base_height: int, target_width: int, target_height: int) -> tuple[int, int]:
target_area = target_width * target_height
aspect_ratio = base_width / base_height
scaled_height = int((target_area / aspect_ratio) ** 0.5)
scaled_width = int(aspect_ratio * scaled_height)
return scaled_width, scaled_height
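`_scale_resolution` matches the requested output pixel area while preserving the source aspect ratio; a quick check of the same arithmetic (copied here for illustration under a different name):

```python
def scale_resolution(base_width, base_height, target_width, target_height):
    # Match the target pixel area while preserving the base aspect ratio.
    target_area = target_width * target_height
    aspect_ratio = base_width / base_height
    scaled_height = int((target_area / aspect_ratio) ** 0.5)
    scaled_width = int(aspect_ratio * scaled_height)
    return scaled_width, scaled_height
```

For a square 1000x1000 source and a 640x480 target, this yields 554x554: the target area (307200 px) is approximated while the 1:1 ratio is kept.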

188
annotation/ducktrack/playback.py Normal file

@@ -0,0 +1,188 @@
import json
import math
import os
import sys
import time
import pyautogui
from pynput.keyboard import Controller as KeyboardController
from pynput.keyboard import Key
from pynput.mouse import Button
from pynput.mouse import Controller as MouseController
from .keycomb import KeyCombinationListener
from .util import (fix_windows_dpi_scaling, get_recordings_dir, name_to_button,
name_to_key)
pyautogui.PAUSE = 0
pyautogui.DARWIN_CATCH_UP_TIME = 0
class Player:
"""
Plays back recordings.
"""
def __init__(self):
self.stop_playback = False
self.listener = KeyCombinationListener()
def stop_comb_pressed():
self.stop_playback = True
return False
self.listener.add_comb(("shift", "esc"), stop_comb_pressed)
self.listener.start()
def play(self, recording_path: str):
with open(os.path.join(recording_path, "events.jsonl"), "r") as f:
events = [json.loads(line) for line in f.readlines()]
with open(os.path.join(recording_path, "metadata.json"), "r") as f:
metadata = json.load(f)
self.playback(events, metadata)
def playback(self, events: list[dict], metadata: dict):
if metadata["system"] == "Windows":
fix_windows_dpi_scaling()
mouse_controller = MouseController()
keyboard_controller = KeyboardController()
if not events:
self.listener.stop()
return
presses_to_skip = 0
releases_to_skip = 0
in_click_sequence = False
for i, event in enumerate(events):
start_time = time.perf_counter()
if self.stop_playback:
return
def do_mouse_press(button):
for j, second_event in enumerate(events[i+1:]):
# make sure the time between mouse clicks is less than 500ms
if second_event["time_stamp"] - event["time_stamp"] > 0.5:
break
if "x" in second_event and "y" in second_event:
# if the mouse moves out of the click radius/rectangle, it is not a click sequence
if math.sqrt((second_event["y"] - event["y"]) ** 2 +
(second_event["x"] - event["x"]) ** 2) > 4:
break
if second_event["action"] == "click" and second_event["pressed"]:
for k, third_event in enumerate(events[i+j+2:]):
if third_event["time_stamp"] - second_event["time_stamp"] > 0.5:
break
if "x" in third_event and "y" in third_event:
if math.sqrt((third_event["y"] - event["y"]) ** 2 +
(third_event["x"] - event["x"]) ** 2) > 5:
break
if third_event["action"] == "click" and third_event["pressed"]:
mouse_controller.click(button, 3)
return 2, 2
mouse_controller.click(button, 2)
return 1, 1
mouse_controller.press(button)
return 0, 0
if event["action"] == "move":
mouse_controller.position = (event["x"], event["y"])
elif event["action"] == "click":
button = name_to_button(event["button"])
if event["pressed"]:
if presses_to_skip == 0:
presses, releases = do_mouse_press(button)
presses_to_skip += presses
releases_to_skip += releases
if presses > 0:
in_click_sequence = True
else:
presses_to_skip -= 1
else:
if releases_to_skip == 0:
mouse_controller.release(button)
if in_click_sequence:
keyboard_controller.press(Key.shift)
mouse_controller.click(Button.left)
keyboard_controller.release(Key.shift)
in_click_sequence = False
else:
releases_to_skip -= 1
elif event["action"] == "scroll":
if metadata["system"] == "Windows":
# for some reason on windows, pynput scroll is correct but pyautogui is not
mouse_controller.scroll(metadata["scroll_direction"] * event["dx"], metadata["scroll_direction"] * event["dy"])
else:
pyautogui.hscroll(clicks=metadata["scroll_direction"] * event["dx"])
pyautogui.vscroll(clicks=metadata["scroll_direction"] * event["dy"])
elif event["action"] in ["press", "release"]:
key = name_to_key(event["name"])
if event["action"] == "press":
keyboard_controller.press(key)
else:
keyboard_controller.release(key)
# sleep for the correct amount of time
end_time = time.perf_counter()
execution_time = end_time - start_time
if i + 1 < len(events):
desired_delay = events[i + 1]["time_stamp"] - event["time_stamp"]
delay = desired_delay - execution_time
if delay < 0:
print(f"warning: behind by {-delay * 1000:.3f} ms")
elif delay != 0:
wait_until = time.perf_counter() + delay
while time.perf_counter() < wait_until:
pass
self.listener.stop()
def get_latest_recording() -> str:
recordings_dir = get_recordings_dir()
if not os.path.exists(recordings_dir):
raise Exception("The recordings directory does not exist")
recordings = [os.path.join(recordings_dir, f) for f in os.listdir(recordings_dir) if os.path.isdir(os.path.join(recordings_dir, f))]
if len(recordings) == 0:
raise Exception("You have no recordings to play back")
latest_recording = max(recordings, key=os.path.getctime)
return latest_recording
def main():
player = Player()
if len(sys.argv) > 1:
recording_path = sys.argv[1]
else:
recording_path = get_latest_recording()
player.play(recording_path)
if __name__ == "__main__":
n = 3
print("press shift+esc to stop the playback")
print(f"starting in {n} seconds...")
time.sleep(n)
main()
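The playback loop above folds rapid presses within 500 ms and a small pixel radius into double- and triple-clicks. The grouping idea can be sketched standalone (`group_clicks` is a hypothetical helper, simplified to timestamped press positions only):

```python
import math

def group_clicks(events, max_gap=0.5, radius=4):
    """Collapse timestamped presses (t, x, y) into click counts (1, 2, or 3).

    Presses belong to the same click sequence while each gap is under
    max_gap seconds and the pointer stays within radius pixels of the
    first press, mirroring the checks in do_mouse_press above.
    """
    groups = []
    i = 0
    while i < len(events):
        t0, x0, y0 = events[i]
        count = 1
        j = i + 1
        while j < len(events) and count < 3:
            t, x, y = events[j]
            if t - events[j - 1][0] > max_gap:
                break  # too slow: next press starts a new sequence
            if math.hypot(x - x0, y - y0) > radius:
                break  # moved out of the click radius
            count += 1
            j += 1
        groups.append(count)
        i = j
    return groups

# three fast presses at nearly the same spot -> one triple click
print(group_clicks([(0.0, 10, 10), (0.1, 11, 10), (0.2, 10, 11)]))  # [3]
# two fast presses then a distant press -> a double click and a single
print(group_clicks([(0.0, 10, 10), (0.1, 10, 10), (0.2, 200, 200)]))  # [2, 1]
```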

View File

@@ -0,0 +1,145 @@
import json
import os
import time
from datetime import datetime
from platform import system
from queue import Queue
from pynput import keyboard, mouse
from pynput.keyboard import KeyCode
from PyQt6.QtCore import QThread, pyqtSignal
from .metadata import MetadataManager
from .obs_client import OBSClient
from .util import fix_windows_dpi_scaling, get_recordings_dir
class Recorder(QThread):
"""
Records user input events to a JSONL file and captures the screen via OBS, running in a background Qt thread.
"""
recording_stopped = pyqtSignal()
def __init__(self, natural_scrolling: bool):
super().__init__()
if system() == "Windows":
fix_windows_dpi_scaling()
self.recording_path = self._get_recording_path()
self._is_recording = False
self._is_paused = False
self.event_queue = Queue()
self.events_file = open(os.path.join(self.recording_path, "events.jsonl"), "a")
self.metadata_manager = MetadataManager(
recording_path=self.recording_path,
natural_scrolling=natural_scrolling
)
self.obs_client = OBSClient(recording_path=self.recording_path,
metadata=self.metadata_manager.metadata)
self.mouse_listener = mouse.Listener(
on_move=self.on_move,
on_click=self.on_click,
on_scroll=self.on_scroll)
self.keyboard_listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release)
def on_move(self, x, y):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "move",
"x": x,
"y": y}, block=False)
def on_click(self, x, y, button, pressed):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "click",
"x": x,
"y": y,
"button": button.name,
"pressed": pressed}, block=False)
def on_scroll(self, x, y, dx, dy):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "scroll",
"x": x,
"y": y,
"dx": dx,
"dy": dy}, block=False)
def on_press(self, key):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "press",
"name": key.char if type(key) == KeyCode else key.name}, block=False)
def on_release(self, key):
if not self._is_paused:
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "release",
"name": key.char if type(key) == KeyCode else key.name}, block=False)
def run(self):
self._is_recording = True
self.metadata_manager.collect()
self.obs_client.start_recording()
self.mouse_listener.start()
self.keyboard_listener.start()
while self._is_recording:
event = self.event_queue.get()
self.events_file.write(json.dumps(event) + "\n")
def stop_recording(self):
if self._is_recording:
self._is_recording = False
self.metadata_manager.end_collect()
self.mouse_listener.stop()
self.keyboard_listener.stop()
self.obs_client.stop_recording()
self.metadata_manager.add_obs_record_state_timings(self.obs_client.record_state_events)
self.events_file.close()
self.metadata_manager.save_metadata()
self.recording_stopped.emit()
def pause_recording(self):
if not self._is_paused and self._is_recording:
self._is_paused = True
self.obs_client.pause_recording()
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "pause"}, block=False)
def resume_recording(self):
if self._is_paused and self._is_recording:
self._is_paused = False
self.obs_client.resume_recording()
self.event_queue.put({"time_stamp": time.perf_counter(),
"action": "resume"}, block=False)
def _get_recording_path(self) -> str:
recordings_dir = get_recordings_dir()
if not os.path.exists(recordings_dir):
os.mkdir(recordings_dir)
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
recording_path = os.path.join(recordings_dir, f"recording-{current_time}")
os.mkdir(recording_path)
return recording_path
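The Recorder above is a producer/consumer pair: pynput callbacks put events onto a Queue and run() drains them to a JSONL file. A minimal sketch of that pattern, with a plain list standing in for the file and a thread standing in for the Qt thread (names here are illustrative, not the Recorder's API):

```python
import json
import queue
import threading
import time

def drain_events(event_queue, out_lines, stop):
    """Consume events from the queue into a JSONL sink until stopped and empty."""
    while not stop.is_set() or not event_queue.empty():
        try:
            event = event_queue.get(timeout=0.05)
        except queue.Empty:
            continue  # no event yet; re-check the stop flag
        out_lines.append(json.dumps(event))

event_queue = queue.Queue()
stop = threading.Event()
lines = []
writer = threading.Thread(target=drain_events, args=(event_queue, lines, stop))
writer.start()

# a stand-in "listener" producing events, as the pynput callbacks do
for i in range(3):
    event_queue.put({"time_stamp": time.perf_counter(),
                     "action": "move", "x": i, "y": i}, block=False)

stop.set()
writer.join()
print(len(lines))  # 3
```

The stop check deliberately also tests `empty()`, so events queued just before stopping are still flushed.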

View File

@@ -0,0 +1,38 @@
import os
import platform
import subprocess
from pathlib import Path
from pynput.keyboard import Key, KeyCode
from pynput.mouse import Button
def name_to_key(name: str) -> Key | KeyCode:
try:
return getattr(Key, name)
except AttributeError:
return KeyCode.from_char(name)
def name_to_button(name: str) -> Button:
return getattr(Button, name)
def get_recordings_dir() -> str:
documents_folder = Path.home() / 'Documents' / 'DuckTrack_Recordings'
return str(documents_folder)
def fix_windows_dpi_scaling():
"""
Fixes DPI scaling issues with legacy Windows applications.
Reference: https://pynput.readthedocs.io/en/latest/mouse.html#ensuring-consistent-coordinates-between-listener-and-controller-on-windows
"""
import ctypes
PROCESS_PER_MONITOR_DPI_AWARE = 2
ctypes.windll.shcore.SetProcessDpiAwareness(PROCESS_PER_MONITOR_DPI_AWARE)
def open_file(path):
if platform.system() == "Windows":
os.startfile(path)
elif platform.system() == "Darwin":
subprocess.Popen(["open", path])
else:
subprocess.Popen(["xdg-open", path])

View File

@@ -0,0 +1,48 @@
import glob
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from scipy.stats import sem, t
def calculate_confidence_interval(data, confidence=0.95):
n = len(data)
m = np.mean(data)
std_err = sem(data)
h = std_err * t.ppf((1 + confidence) / 2, n - 1)
return m, m-h, m+h
runs = glob.glob("run*.txt")
TOTAL_EVENTS = 22509
percent_delays = []
all_delays = []
for run in runs:
with open(run, "r") as f:
delays = [float(line.split()[3]) for line in f if float(line.split()[3]) > 0] # consider only positive delays
percent_delays.append((len(delays) / TOTAL_EVENTS) * 100)
all_delays.extend(delays)
average_percent_delays = np.mean(percent_delays)
confidence_interval_percent_delays = calculate_confidence_interval(percent_delays)
print(f"Average percentage of delayed events across all runs: {average_percent_delays:.2f}%")
print(f"95% Confidence interval: ({confidence_interval_percent_delays[1]:.2f}%, {confidence_interval_percent_delays[2]:.2f}%)")
if all_delays:
mean_delay = np.mean(all_delays)
confidence_interval_delays = calculate_confidence_interval(all_delays)
print(f"Mean delay time: {mean_delay:.2f}")
print(f"95% Confidence interval for delay time: ({confidence_interval_delays[1]:.2f}, {confidence_interval_delays[2]:.2f})")
else:
print("No delay data available for calculation.")
sns.histplot(all_delays, bins=30, kde=False)
plt.xlabel('Delay Time (ms)')
plt.ylabel('Frequency')
plt.yscale('log')
plt.title('Histogram of Delay Times (macOS)')
plt.savefig('delays.png', dpi=300)
plt.show()
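The 95% interval above relies on SciPy's t-distribution. For large samples, a close approximation needs only the standard library, replacing `t.ppf` with `NormalDist().inv_cdf` (a sketch, not a drop-in for the script's t-based interval, which is wider for small n):

```python
import math
from statistics import NormalDist, mean, stdev

def confidence_interval(data, confidence=0.95):
    """Normal-approximation CI: mean +/- z * s / sqrt(n)."""
    n = len(data)
    m = mean(data)
    std_err = stdev(data) / math.sqrt(n)
    z = NormalDist().inv_cdf((1 + confidence) / 2)  # ~1.96 for 95%
    h = z * std_err
    return m, m - h, m + h

m, lo, hi = confidence_interval([1.0, 2.0, 3.0, 4.0, 5.0])
print(round(m, 2), round(lo, 2), round(hi, 2))
```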

View File

@@ -0,0 +1,110 @@
import glob
import os
import cv2
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as stats
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm
# use this: https://sketch.io
def calculate_rmse(imageA, imageB):
err = np.sum((imageA - imageB) ** 2)
err /= float(imageA.shape[0] * imageA.shape[1])
return np.sqrt(err)
def compare_images(ground_truth_path, sample_paths):
results = []
gt_image = cv2.imread(ground_truth_path, cv2.IMREAD_GRAYSCALE)
if gt_image is None:
raise ValueError("Ground truth image could not be read. Please check the file path.")
gt_image = gt_image.astype("float") / 255.0
for path in tqdm(sample_paths):
sample_image = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
if sample_image is None:
print(f"WARNING: Sample image at path {path} could not be read. Skipping this image.")
continue
sample_image = sample_image.astype("float") / 255.0
rmse_value = calculate_rmse(gt_image, sample_image)
ssim_value, _ = ssim(gt_image, sample_image, full=True, data_range=1)  # data_range=1 because images are normalized to [0, 1]
diff_mask = cv2.absdiff(gt_image, sample_image)
# plt.imshow(diff_mask * 255, cmap='gray')
# plt.title(f'Difference Mask for {os.path.basename(path)}\nRMSE: {rmse_value:.5f} - SSIM: {ssim_value:.5f}')
# plt.show()
results.append({
'path': path,
'rmse': rmse_value,
'ssim': ssim_value,
'diff_mask': diff_mask
})
return results
ground_truth = 'ground_truth.png'
sample_images = glob.glob("samples/*.png")
results = compare_images(ground_truth, sample_images)
for res in results:
print(f"Image: {res['path']} - RMSE: {res['rmse']} - SSIM: {res['ssim']}")
def calculate_confidence_interval(data, confidence_level=0.95):
mean = np.mean(data)
sem = stats.sem(data)
df = len(data) - 1
me = sem * stats.t.ppf((1 + confidence_level) / 2, df)
return mean - me, mean + me
rmse_values = [res['rmse'] for res in results]
ssim_values = [res['ssim'] for res in results]
rmse_mean = np.mean(rmse_values)
rmse_median = np.median(rmse_values)
rmse_stdev = np.std(rmse_values, ddof=1)
ssim_mean = np.mean(ssim_values)
ssim_median = np.median(ssim_values)
ssim_stdev = np.std(ssim_values, ddof=1)
rmse_ci = calculate_confidence_interval(rmse_values)
ssim_ci = calculate_confidence_interval(ssim_values)
print(f"\nRMSE - Mean: {rmse_mean}, Median: {rmse_median}, Std Dev: {rmse_stdev}, 95% CI: {rmse_ci}")
print(f"SSIM - Mean: {ssim_mean}, Median: {ssim_median}, Std Dev: {ssim_stdev}, 95% CI: {ssim_ci}")
print(f"RMSE: {rmse_mean} ± {rmse_ci[1] - rmse_mean}")
print(f"SSIM: {ssim_mean} ± {ssim_ci[1] - ssim_mean}")
def save_average_diff_map(results, save_path='average_diff_map.png'):
if not results:
print("No results available to create an average diff map.")
return
avg_diff_map = None
for res in results:
if avg_diff_map is None:
avg_diff_map = np.zeros_like(res['diff_mask'])
avg_diff_map += res['diff_mask']
avg_diff_map /= len(results)
avg_diff_map = (avg_diff_map * 255).astype(np.uint8)
cv2.imwrite(save_path, avg_diff_map)
# Usage
save_average_diff_map(results)

View File

@@ -0,0 +1,4 @@
success = 10
total = 10
print(success / total)

View File

@@ -0,0 +1,48 @@
import csv
import time
import numpy as np
from tqdm import tqdm
def check_sleep(duration, sleep_function):
start = time.perf_counter()
sleep_function(duration)
end = time.perf_counter()
elapsed = end - start
return abs(elapsed - duration)
def busy_sleep(duration):
end_time = time.perf_counter() + duration
while time.perf_counter() < end_time:
pass
def measure_accuracy(sleep_function, durations, iterations=100):
average_errors = []
for duration in tqdm(durations):
errors = [check_sleep(duration, sleep_function) for _ in range(iterations)]
average_error = np.mean(errors)
average_errors.append(average_error)
return average_errors
durations = np.arange(0.001, 0.101, 0.001) # From 1ms to 100ms in 1ms increments
iterations = 100
sleep_errors = measure_accuracy(time.sleep, durations, iterations)
busy_sleep_errors = measure_accuracy(busy_sleep, durations, iterations)
def save_to_csv(filename, durations, sleep_errors, busy_sleep_errors):
with open(filename, 'w', newline='') as csvfile:
fieldnames = ['duration', 'sleep_error', 'busy_sleep_error']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for duration, sleep_error, busy_sleep_error in zip(durations, sleep_errors, busy_sleep_errors):
writer.writerow({
'duration': duration,
'sleep_error': sleep_error,
'busy_sleep_error': busy_sleep_error
})
print("Data saved to", filename)
save_to_csv('sleep_data.csv', durations * 1000, np.array(sleep_errors) * 1000, np.array(busy_sleep_errors) * 1000)
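The benchmark above compares time.sleep against a pure busy-wait. A common middle ground, which the player's busy-wait loop could adopt, is a hybrid: sleep coarsely for most of the interval, then busy-wait the remainder (a sketch; the 1 ms spin margin is a tunable assumption and may need to be larger on Windows, where sleep granularity is coarser):

```python
import time

def hybrid_sleep(duration, spin_margin=0.001):
    """Sleep for most of the interval, then busy-wait the final spin_margin seconds."""
    end_time = time.perf_counter() + duration
    coarse = duration - spin_margin
    if coarse > 0:
        time.sleep(coarse)  # cheap, but may overshoot by the OS tick
    while time.perf_counter() < end_time:
        pass                # precise, but burns CPU for only spin_margin

start = time.perf_counter()
hybrid_sleep(0.02)
elapsed = time.perf_counter() - start
print(f"slept {elapsed * 1000:.2f} ms")
```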

View File

@@ -0,0 +1,33 @@
import csv
import matplotlib.pyplot as plt
def plot_from_csv(filename, save_plot=False):
durations = []
sleep_errors = []
busy_sleep_errors = []
with open(filename, 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
durations.append(float(row['duration']))
sleep_errors.append(float(row['sleep_error']))
busy_sleep_errors.append(float(row['busy_sleep_error']))
plt.figure(figsize=(10, 5))
plt.plot(durations, sleep_errors, label='time.sleep()', marker='o')
plt.plot(durations, busy_sleep_errors, label='busy_sleep()', marker='x')
plt.xlabel('Desired Delay (ms)')
plt.ylabel('Average Error (ms)')
plt.title('Sleep Accuracy: time.sleep() vs Busy-Wait Loop (macOS)')
plt.legend()
plt.grid(True)
if save_plot:
plt.savefig('sleep_accuracy_plot.png', dpi=300)
print("Plot saved as sleep_accuracy_plot.png")
plt.show()
plot_from_csv('sleep_data.csv', save_plot=True)

View File

@@ -0,0 +1,110 @@
import glob
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as stats
import seaborn as sns
# use this: https://www.estopwatch.net/
def read_file(file_path):
df = pd.read_csv(file_path)
df['Elapsed time'] = pd.to_datetime(df['Elapsed time'], errors='coerce')
return df
def analyze_new_error(run_df, groundtruth_df):
cumulative_errors = run_df['Elapsed time'] - groundtruth_df['Elapsed time']
cumulative_errors_in_seconds = cumulative_errors.dt.total_seconds()
new_errors_in_seconds = cumulative_errors_in_seconds.diff().fillna(cumulative_errors_in_seconds[0])
new_error_points = new_errors_in_seconds[new_errors_in_seconds != 0].index.tolist()
return new_errors_in_seconds[new_error_points]
def calculate_statistics(errors):
if len(errors) == 0:
return {
'mean_error': 0,
'median_error': 0,
'stddev_error': 0,
'rmse_error': 0,
'confidence_interval': (0, 0),
'error_frequency': 0
}
mean_error = np.mean(errors)
median_error = np.median(errors)
stddev_error = np.std(errors)
rmse_error = np.sqrt(np.mean(np.square(errors)))
ci_low, ci_high = stats.t.interval(
confidence=0.95,
df=len(errors) - 1,
loc=mean_error,
scale=stats.sem(errors) if len(errors) > 1 else 0
)
return {
'mean_error': mean_error,
'median_error': median_error,
'stddev_error': stddev_error,
'rmse_error': rmse_error,
'confidence_interval': (ci_low, ci_high),
}
def main():
groundtruth_file = 'groundtruth.csv'
run_files = glob.glob('runs/*.csv')
groundtruth_df = read_file(groundtruth_file)
run_dfs = {f'run{i+1}': read_file(file) for i, file in enumerate(run_files)}
total_errors = []
total_points = 0
all_errors = []
for run, df in run_dfs.items():
errors = analyze_new_error(df, groundtruth_df)
total_errors.extend(errors)
all_errors.extend(errors)
total_points += len(df)
results = calculate_statistics(errors)
error_frequency = len(errors) / len(df)
print(f"Results for {run}:")
print(f"Mean New Error: {results['mean_error']:.5f} seconds")
print(f"Median New Error: {results['median_error']:.5f} seconds")
print(f"Standard Deviation of New Error: {results['stddev_error']:.5f} seconds")
print(f"RMSE of New Error: {results['rmse_error']:.5f} seconds")
print(f"95% Confidence Interval of New Error: ({results['confidence_interval'][0]:.5f}, {results['confidence_interval'][1]:.5f}) seconds")
print(f"New Error Frequency: {error_frequency*100:.5f} %")
print('-----------------------------------------')
total_results = calculate_statistics(total_errors)
total_error_frequency = len(total_errors) / total_points
print("Total Statistics:")
print(f"Mean New Error: {total_results['mean_error']:.5f} seconds")
print(f"Median New Error: {total_results['median_error']:.5f} seconds")
print(f"Standard Deviation of New Error: {total_results['stddev_error']:.5f} seconds")
print(f"RMSE of New Error: {total_results['rmse_error']:.5f} seconds")
print(f"95% Confidence Interval of New Error: ({total_results['confidence_interval'][0]:.5f}, {total_results['confidence_interval'][1]:.5f}) seconds")
print(f"New Error Frequency: {total_error_frequency*100:.5f} %")
# do plus minus
print(f"New Error: {total_results['mean_error']:.5f} ± {total_results['confidence_interval'][1] - total_results['mean_error']:.5f} seconds")
plt.figure(figsize=(10, 5))
sns.histplot(all_errors, bins=12, kde=False)
plt.title('Distribution of Newly Introduced Errors (macOS)')
plt.xlabel('Error Duration (seconds)')
plt.ylabel('Frequency')
plt.savefig('error_dist', dpi=300)
plt.show()
if __name__ == "__main__":
main()
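analyze_new_error above recovers newly introduced error as the first difference of the cumulative run-minus-groundtruth gap, keeping only nonzero steps. The same idea with plain lists (illustrative, no pandas):

```python
def new_errors(run_elapsed, truth_elapsed):
    """First difference of cumulative error; keep only nonzero steps."""
    cumulative = [r - t for r, t in zip(run_elapsed, truth_elapsed)]
    # first step is the first cumulative value, as diff().fillna does above
    steps = [cumulative[0]] + [b - a for a, b in zip(cumulative, cumulative[1:])]
    return [s for s in steps if s != 0]

# ground truth ticks every second; the run falls 0.5 s behind at the third tick
truth = [1.0, 2.0, 3.0, 4.0]
run = [1.0, 2.0, 3.5, 4.5]
print(new_errors(run, truth))  # [0.5]
```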

39
annotation/main.py Normal file
View File

@@ -0,0 +1,39 @@
import signal
import sys
import traceback
from PyQt6.QtWidgets import QApplication
from ducktrack import MainInterface
def main():
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(False)
signal.signal(signal.SIGINT, signal.SIG_DFL)
interface = MainInterface(app)
interface.show()
# TODO: come up with a better error solution to this
original_excepthook = sys.excepthook
def handle_exception(exc_type, exc_value, exc_traceback):
print("Exception type:", exc_type)
print("Exception value:", exc_value)
trace_details = traceback.format_exception(exc_type, exc_value, exc_traceback)
trace_string = "".join(trace_details)
print("Exception traceback:", trace_string)
message = f"An error occurred!\n\n{exc_value}\n\n{trace_string}"
interface.display_error_message(message)
original_excepthook(exc_type, exc_value, exc_traceback)
sys.excepthook = handle_exception
sys.exit(app.exec())
if __name__ == "__main__":
main()

Binary files not shown (25 images added, ranging from 13 KiB to 1.8 MiB).

View File

@@ -0,0 +1,9 @@
git+https://github.com/moses-palmer/pynput.git@refs/pull/541/head # to make sure that it works on Apple Silicon
pyautogui
obsws-python
PyQt6
Pillow
screeninfo
wmi
psutil
pyinstaller

View File

0
mm_agents/__init__.py Normal file
View File

BIN
mm_agents/chrome_start.png Normal file

Binary file not shown (16 MiB).

20
mm_agents/fuyu_test.py Normal file
View File

@@ -0,0 +1,20 @@
from transformers import FuyuProcessor, FuyuForCausalLM
from PIL import Image
image = Image.open("stackoverflow.png").convert("RGB")
# load model and processor
model_id = "adept/fuyu-8b"
processor = FuyuProcessor.from_pretrained(model_id)
model = FuyuForCausalLM.from_pretrained(model_id, device_map="cuda:0")
# prepare inputs for the model
text_prompt = "Description:\n"
inputs = processor(text=text_prompt, images=image, return_tensors="pt").to("cuda:0")
# autoregressively generate text
generation_output = model.generate(**inputs, max_new_tokens=100)
generation_text = processor.batch_decode(generation_output[:, -100:], skip_special_tokens=True)
print(generation_text)

BIN
mm_agents/stackoverflow.png Normal file

Binary file not shown (1.0 MiB).