# Conflicts: # desktop_env/evaluators/getters/__init__.py # desktop_env/evaluators/metrics/__init__.py # requirements.txt
533 lines
19 KiB
Python
533 lines
19 KiB
Python
import ctypes
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import lxml.etree
|
|
from lxml.etree import _Element
|
|
import pyatspi
|
|
from pyatspi import Accessible, StateType
|
|
from pyatspi import Component, Document
|
|
from pyatspi import Text as ATText
|
|
from pyatspi import Value as ATValue
|
|
from pyatspi import Action as ATAction
|
|
|
|
from typing import List, Dict
|
|
from typing import Any
|
|
|
|
import Xlib
|
|
import pyautogui
|
|
from PIL import Image
|
|
from Xlib import display, X
|
|
from pyxcursor import Xcursor
|
|
|
|
import requests
|
|
from flask import Flask, request, jsonify, send_file, abort
|
|
from werkzeug.utils import secure_filename
|
|
|
|
# Flask application object; every route handler in this module is registered on it.
app = Flask(__name__)
# Module-wide logger: reuse the one Flask attaches to the application.
logger = app.logger

# Zero out pyautogui's built-in delays (presumably so automation calls made by
# this server return without the library's default pauses -- confirm against
# the pyautogui documentation).
pyautogui.PAUSE = 0
pyautogui.DARWIN_CATCH_UP_TIME = 0
|
|
|
|
|
|
@app.route('/setup/execute', methods=['POST'])
@app.route('/execute', methods=['POST'])
def execute_command():
    """Run the command described by the JSON request body and report its output.

    Request JSON keys read here:
        shell:   passed straight to ``subprocess.run(shell=...)``; defaults to False.
        command: the command to run; defaults to ``""`` when ``shell`` is truthy
                 and to ``[]`` otherwise.

    Returns a JSON object with ``status``/``output``/``error`` on success, or
    ``status``/``message`` with HTTP 500 when running the command raises.
    """
    payload = request.json
    use_shell = payload.get('shell', False)
    if use_shell:
        fallback_command = ""
    else:
        fallback_command = []
    command = payload.get('command', fallback_command)

    # NOTE: the command is executed exactly as received, with no validation.
    try:
        completed = subprocess.run(
            command,
            shell=use_shell,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        success_body = {
            'status': 'success',
            'output': completed.stdout,
            'error': completed.stderr
        }
        return jsonify(success_body)
    except Exception as exc:
        failure_body = {
            'status': 'error',
            'message': str(exc)
        }
        return jsonify(failure_body), 500
|
|
|
|
|
|
@app.route('/setup/launch', methods=["POST"])
def launch_app():
    """Start the program named by the request's ``command`` list without waiting for it.

    Returns a plain-text confirmation on success, or a JSON error object with
    HTTP 500 if spawning the process (or building the confirmation) raises.
    """
    payload = request.json
    command: List[str] = payload.get("command", [])

    try:
        # Popen returns immediately; the child keeps running on its own.
        subprocess.Popen(command)
        command_line = " ".join(command)
        return "{:} launched successfully".format(command_line)
    except Exception as exc:
        error_body = {"status": "error", "message": str(exc)}
        return jsonify(error_body), 500
|
|
|
|
|
|
@app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor():
    """Capture the screen including the mouse cursor and return it as a PNG.

    The image is written to ``screenshots/screenshot.png`` and then sent back.
    How the cursor gets into the picture depends on ``platform.system()``:

    * Windows: a cursor bitmap (downloaded once and cached as
      ``screenshots/cursor.png``) is pasted at the pointer position.
    * Linux: the live cursor image obtained through ``Xcursor`` is pasted at
      the pointer position.
    * Darwin: ``screencapture -C`` captures screen and cursor in one step.

    Returns:
        The PNG file, or a JSON error with HTTP 501 on any other platform.

    Raises:
        requests.RequestException: the cursor bitmap could not be downloaded.
        subprocess.CalledProcessError: ``screencapture`` exited with an error.
    """
    # fixme: when running on virtual machines, the cursor is not captured, don't know why

    file_path = os.path.join("screenshots", "screenshot.png")
    user_platform = platform.system()

    # Ensure the screenshots directory exists
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # fixme: This is a temporary fix for the cursor not being captured on Windows and Linux
    if user_platform == "Windows":
        cursor_path = os.path.join("screenshots", "cursor.png")
        if not os.path.exists(cursor_path):
            cursor_url = "https://vip.helloimg.com/images/2023/12/02/oQPzmt.png"
            response = requests.get(cursor_url, timeout=30)
            # Fail before writing: an HTTP error page saved as cursor.png
            # would be reused by (and break) every later request.
            response.raise_for_status()
            with open(cursor_path, 'wb') as file:
                file.write(response.content)
        screenshot = pyautogui.screenshot()
        cursor_x, cursor_y = pyautogui.position()
        cursor = Image.open(cursor_path)
        # make the cursor smaller
        cursor = cursor.resize((int(cursor.width / 1.5), int(cursor.height / 1.5)))
        # The cursor image doubles as its own paste mask.
        screenshot.paste(cursor, (cursor_x, cursor_y), cursor)
        screenshot.save(file_path)
    elif user_platform == "Linux":
        cursor_obj = Xcursor()
        imgarray = cursor_obj.getCursorImageArrayFast()
        cursor_img = Image.fromarray(imgarray)
        screenshot = pyautogui.screenshot()
        cursor_x, cursor_y = pyautogui.position()
        screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img)
        screenshot.save(file_path)
    elif user_platform == "Darwin":  # (Mac OS)
        # Use the screencapture utility to capture the screen with the cursor.
        # check=True: a failed capture must not fall through to sending a
        # screenshot left over from an earlier request.
        subprocess.run(["screencapture", "-C", file_path], check=True)
    else:
        logger.warning(f"The platform you're using ({user_platform}) is not currently supported")
        # Nothing was captured, so there is no fresh file to send.
        return jsonify({"error": f"Unsupported platform: {user_platform}"}), 501

    # The file was written relative to the working directory; hand Flask an
    # absolute path so it does not re-resolve it against the application root.
    return send_file(os.path.abspath(file_path), mimetype='image/png')
|
|
|
|
# XML namespace prefix -> namespace URI, used both to qualify attribute names
# in _create_node and as the nsmap of the generated elements.
_accessibility_ns_map = {
    "st": "uri:deskat:state.at-spi.gnome.org",
    "attr": "uri:deskat:attributes.at-spi.gnome.org",
    "cp": "uri:deskat:component.at-spi.gnome.org",
    "doc": "uri:deskat:document.at-spi.gnome.org",
    "docattr": "uri:deskat:attributes.document.at-spi.gnome.org",
    "txt": "uri:deskat:text.at-spi.gnome.org",
    "val": "uri:deskat:value.at-spi.gnome.org",
    "act": "uri:deskat:action.at-spi.gnome.org",
}
|
|
def _qualified_name(prefix: str, local_name: str) -> str:
    """Return ``{namespace-uri}local_name`` for a prefix of ``_accessibility_ns_map``."""
    return "{{{:}}}{:}".format(_accessibility_ns_map[prefix], local_name)


def _create_node(node: Accessible) -> _Element:
    """Convert an accessible object and all of its descendants to an XML element.

    The element's tag is the node's role name (spaces replaced by ``-``). Its
    attributes are collected from the node's name, states, attributes and
    from each of the Component, Document, Selection, Value and Action
    interfaces the node implements; an interface whose ``query*`` call raises
    ``NotImplementedError`` is skipped. Text exposed through the Text
    interface becomes the element's text content. Children are converted
    recursively and appended in iteration order.

    Args:
        node: the accessible object to convert.

    Returns:
        The element representing ``node`` with its subtree attached.
    """
    attribute_dict: Dict[str, Any] = {"name": node.name}

    # States {{{ #
    states: List[StateType] = node.getState().get_states()
    for st in states:
        state_name: str = StateType._enum_lookup[st]
        # Drop everything up to the first "_" of the enum name and lowercase
        # the rest to get the attribute's local name.
        local_name: str = state_name.split("_", maxsplit=1)[1].lower()
        attribute_dict[_qualified_name("st", local_name)] = "true"
    # }}} States #

    # Attributes {{{ #
    attributes: List[str] = node.getAttributes()
    for attrbt in attributes:
        attribute_name: str
        attribute_value: str
        # partition() instead of split(): an entry without ":" yields an
        # empty value rather than raising ValueError on unpacking.
        attribute_name, _, attribute_value = attrbt.partition(":")
        attribute_dict[_qualified_name("attr", attribute_name)] = attribute_value
    # }}} Attributes #

    # Component {{{ #
    try:
        component: Component = node.queryComponent()
    except NotImplementedError:
        pass
    else:
        attribute_dict[_qualified_name("cp", "screencoord")] = str(component.getPosition(pyatspi.XY_SCREEN))
        attribute_dict[_qualified_name("cp", "windowcoord")] = str(component.getPosition(pyatspi.XY_WINDOW))
        attribute_dict[_qualified_name("cp", "parentcoord")] = str(component.getPosition(pyatspi.XY_PARENT))
        attribute_dict[_qualified_name("cp", "size")] = str(component.getSize())
    # }}} Component #

    # Document {{{ #
    try:
        document: Document = node.queryDocument()
    except NotImplementedError:
        pass
    else:
        attribute_dict[_qualified_name("doc", "locale")] = document.getLocale()
        attribute_dict[_qualified_name("doc", "pagecount")] = str(document.getPageCount())
        attribute_dict[_qualified_name("doc", "currentpage")] = str(document.getCurrentPageNumber())
        for attrbt in document.getAttributes():
            attribute_name, _, attribute_value = attrbt.partition(":")
            attribute_dict[_qualified_name("docattr", attribute_name)] = attribute_value
    # }}} Document #

    # Text {{{ #
    # Initialised up front so the check further down needs no locals() lookup.
    text: str = ""
    try:
        text_obj: ATText = node.queryText()
    except NotImplementedError:
        pass
    else:
        # only text shown on current screen is available
        text = text_obj.getText(0, text_obj.characterCount)
    # }}} Text #

    # Selection {{{ #
    try:
        node.querySelection()
    except NotImplementedError:
        pass
    else:
        # Only records that the interface is available; this key is not namespaced.
        attribute_dict["selection"] = "true"
    # }}} Selection #

    # Value {{{ #
    try:
        value: ATValue = node.queryValue()
    except NotImplementedError:
        pass
    else:
        attribute_dict[_qualified_name("val", "value")] = str(value.currentValue)
        attribute_dict[_qualified_name("val", "min")] = str(value.minimumValue)
        attribute_dict[_qualified_name("val", "max")] = str(value.maximumValue)
        attribute_dict[_qualified_name("val", "step")] = str(value.minimumIncrement)
    # }}} Value #

    # Action {{{ #
    try:
        action: ATAction = node.queryAction()
    except NotImplementedError:
        pass
    else:
        for i in range(action.nActions):
            # Spaces are replaced so the action name can be part of an attribute name.
            action_name: str = action.getName(i).replace(" ", "-")
            attribute_dict[_qualified_name("act", action_name + "_desc")] = action.getDescription(i)
            attribute_dict[_qualified_name("act", action_name + "_kb")] = action.getKeyBinding(i)
    # }}} Action #

    xml_node = lxml.etree.Element(
        node.getRoleName().replace(" ", "-"),
        attrib=attribute_dict,
        nsmap=_accessibility_ns_map,
    )
    if text:
        xml_node.text = text
    for ch in node:
        xml_node.append(_create_node(ch))
    return xml_node
|
|
|
|
@app.route("/accessibility", methods=["GET"])
def get_accessibility_tree():
    """Return the accessibility tree of desktop 0 as XML text under the JSON key ``AT``."""
    root_xml: _Element = _create_node(pyatspi.Registry.getDesktop(0))
    # encoding="unicode" makes tostring() return str rather than bytes.
    serialized_tree = lxml.etree.tostring(root_xml, encoding="unicode")
    return jsonify({"AT": serialized_tree})
|
|
|
|
@app.route('/screen_size', methods=['POST'])
def get_screen_size():
    """Return the pixel dimensions of the default X screen as JSON.

    Returns:
        A JSON object with integer ``width`` and ``height`` keys.
    """
    d = display.Display()
    try:
        screen = d.screen()
        return jsonify(
            {
                "width": screen.width_in_pixels,
                "height": screen.height_in_pixels
            }
        )
    finally:
        # A connection is opened per request; close it so repeated calls do
        # not accumulate open connections to the X server.
        d.close()
|
|
|
|
|
|
@app.route('/window_size', methods=['POST'])
def get_window_size():
    """Return the size of the first window whose WM_CLASS matches a given name.

    Form fields:
        app_class_name: name compared case-insensitively with each entry of
            every listed window's WM_CLASS.

    Returns:
        JSON ``{"width", "height"}`` for the first match; a JSON error with
        HTTP 400 when the form field is missing; a JSON error with HTTP 404
        when no listed window matches.
    """
    app_class_name = request.form.get('app_class_name')
    if app_class_name is None:
        return jsonify({"error": "app_class_name is required"}), 400
    wanted = app_class_name.lower()

    d = display.Display()
    try:
        root = d.screen().root
        client_list = root.get_full_property(d.intern_atom('_NET_CLIENT_LIST'), X.AnyPropertyType)
        # get_full_property() gives None when the root window lacks the property.
        window_ids = client_list.value if client_list is not None else []

        for window_id in window_ids:
            try:
                window = d.create_resource_object('window', window_id)
                wm_class = window.get_wm_class()

                if wm_class is None:
                    continue

                if wanted in [name.lower() for name in wm_class]:
                    geom = window.get_geometry()
                    return jsonify(
                        {
                            "width": geom.width,
                            "height": geom.height
                        }
                    )
            except Xlib.error.XError:  # Ignore windows that give an error
                continue
    finally:
        # Release the per-request X connection on every exit path.
        d.close()

    # A Flask view must not return None; report the miss explicitly instead.
    return jsonify({"error": f"No window found for app_class_name: {app_class_name}"}), 404
|
|
|
|
|
|
@app.route('/desktop_path', methods=['POST'])
def get_desktop_path():
    """Return the current user's ``Desktop`` directory as JSON.

    Responds with ``{"desktop_path": ...}`` when the platform is Windows,
    Darwin or Linux and ``<home>/Desktop`` exists; otherwise with a JSON error
    and HTTP 404.
    """
    # pathlib resolves the home directory the same way on every platform.
    home_directory = str(Path.home())

    # All three supported systems use the same "<home>/Desktop" layout.
    if platform.system() in ("Windows", "Darwin", "Linux"):
        desktop_path = os.path.join(home_directory, "Desktop")
        if os.path.exists(desktop_path):
            return jsonify(desktop_path=desktop_path)

    return jsonify(error="Unsupported operating system or desktop path not found"), 404
|
|
|
|
|
|
@app.route('/wallpaper', methods=['POST'])
def get_wallpaper():
    """Send the image file currently configured as the desktop wallpaper.

    The wallpaper path is looked up with a platform-specific helper
    (``SystemParametersInfoW`` on Windows, AppleScript on macOS, ``gsettings``
    on Linux) and the file is returned with a content type guessed from its
    name (``image/png`` when no guess is possible).

    Aborts with:
        400 -- ``platform.system()`` is not Windows, Darwin or Linux.
        404 -- no wallpaper path could be determined.
        500 -- the file could not be served.
    """
    # Function-scope imports: these modules are only needed by this view.
    import mimetypes
    from urllib.parse import unquote

    def get_wallpaper_windows():
        """Return the wallpaper path reported by SystemParametersInfoW."""
        SPI_GETDESKWALLPAPER = 0x73
        MAX_PATH = 260
        buffer = ctypes.create_unicode_buffer(MAX_PATH)
        ctypes.windll.user32.SystemParametersInfoW(SPI_GETDESKWALLPAPER, MAX_PATH, buffer, 0)
        return buffer.value

    def get_wallpaper_macos():
        """Return osascript's answer for the desktop picture, or None on error output."""
        script = """
        tell application "System Events" to tell every desktop to get picture
        """
        process = subprocess.Popen(['osascript', '-e', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()
        if error:
            app.logger.error("Error: %s", error.decode('utf-8'))
            return None
        return output.strip().decode('utf-8')

    def get_wallpaper_linux():
        """Return the path behind GNOME's ``picture-uri`` setting, or None on failure."""
        try:
            output = subprocess.check_output(
                ["gsettings", "get", "org.gnome.desktop.background", "picture-uri"],
                stderr=subprocess.PIPE
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers the gsettings executable not being available.
            app.logger.error("Error: %s", e)
            return None

        # The value is printed as a quoted string; strip only the enclosing
        # quotes so quote characters inside the path survive.
        location = output.decode('utf-8').strip().strip("'\"")
        if location.startswith('file://'):
            location = location[len('file://'):]
        if not os.path.exists(location):
            # The URI may be percent-encoded (e.g. %20 for a space); use the
            # decoded form only when it names an existing file.
            decoded = unquote(location)
            if os.path.exists(decoded):
                return decoded
        return location

    os_name = platform.system()
    wallpaper_path = None
    if os_name == 'Windows':
        wallpaper_path = get_wallpaper_windows()
    elif os_name == 'Darwin':
        wallpaper_path = get_wallpaper_macos()
    elif os_name == 'Linux':
        wallpaper_path = get_wallpaper_linux()
    else:
        app.logger.error(f"Unsupported OS: {os_name}")
        abort(400, description="Unsupported OS")

    if not wallpaper_path:
        abort(404, description="Wallpaper file not found")

    try:
        # Wallpapers are frequently JPEG or other formats; derive the content
        # type from the file name and keep image/png only as the fallback.
        guessed_type = mimetypes.guess_type(wallpaper_path)[0]
        return send_file(wallpaper_path, mimetype=guessed_type or 'image/png')
    except Exception as e:
        app.logger.error(f"An error occurred while serving the wallpaper file: {e}")
        abort(500, description="Unable to serve the wallpaper file")
|
|
|
|
|
|
@app.route('/list_directory', methods=['POST'])
def get_directory_tree():
    """Return the recursive contents of a directory as nested JSON.

    Request JSON keys:
        path: the directory to list.

    Returns:
        ``{"directory_tree": ...}`` on success; a JSON error with HTTP 400
        when the body is not a JSON object, ``path`` is missing, or ``path``
        is not an existing directory.
    """

    def _list_dir_contents(directory, _ancestors=frozenset()):
        """
        List the contents of a directory recursively, building a tree structure.

        :param directory: The path of the directory to inspect.
        :param _ancestors: Resolved paths of the directories currently being
            listed above this one; used to stop at symlink cycles.
        :return: A nested dictionary with the contents of the directory, or
            ``{'error': ...}`` if the directory could not be read.
        """
        tree = {'type': 'directory', 'name': os.path.basename(directory), 'children': []}

        resolved = os.path.realpath(directory)
        if resolved in _ancestors:
            # A symlink leads back to a directory we are already inside;
            # descending again would never terminate.
            return tree
        lineage = _ancestors | {resolved}

        try:
            # List all files and directories in the current directory
            for entry in os.listdir(directory):
                full_path = os.path.join(directory, entry)
                # If entry is a directory, recurse into it
                if os.path.isdir(full_path):
                    tree['children'].append(_list_dir_contents(full_path, lineage))
                else:
                    tree['children'].append({'type': 'file', 'name': entry})
        except OSError as e:
            # If the directory cannot be accessed, return the exception message
            tree = {'error': str(e)}
        return tree

    # Extract the 'path' parameter from the JSON request
    data = request.get_json()
    if not isinstance(data, dict) or 'path' not in data:
        return jsonify(error="Missing 'path' parameter"), 400

    start_path = data['path']
    # Ensure the provided path is a directory (a non-string value cannot be one)
    if not isinstance(start_path, str) or not os.path.isdir(start_path):
        return jsonify(error="The provided path is not a directory"), 400

    # Generate the directory tree starting from the provided path
    directory_tree = _list_dir_contents(start_path)
    return jsonify(directory_tree=directory_tree)
|
|
|
|
|
|
@app.route('/file', methods=['POST'])
def get_file():
    """Send the file named by the ``file_path`` form field as an attachment.

    Responds with HTTP 400 when the field is missing and HTTP 404 when
    sending the file raises ``FileNotFoundError``.
    """
    if 'file_path' not in request.form:
        return jsonify({"error": "file_path is required"}), 400
    requested_path = request.form['file_path']

    try:
        # send_file raises FileNotFoundError for a path that does not exist.
        response = send_file(requested_path, as_attachment=True)
    except FileNotFoundError:
        return jsonify({"error": "File not found"}), 404
    return response
|
|
|
|
|
|
@app.route("/setup/upload", methods=["POST"])
def upload_file():
    """Save the uploaded ``file_data`` part to the path given in the ``file_path`` form field.

    Responds with HTTP 400 when either part of the request is missing.
    """
    has_destination = 'file_path' in request.form
    if not (has_destination and 'file_data' in request.files):
        return jsonify({"error": "file_path and file_data are required"}), 400

    destination = request.form['file_path']
    uploaded = request.files["file_data"]
    uploaded.save(destination)
    return "File Uploaded"
|
|
|
|
|
|
@app.route('/platform', methods=['GET'])
def get_platform():
    """Return the value of ``platform.system()`` as the response body."""
    system_name = platform.system()
    return system_name
|
|
|
|
|
|
@app.route('/cursor_position', methods=['GET'])
def get_cursor_position():
    """Return the current mouse pointer position as the JSON array ``[x, y]``."""
    # Sample once so x and y come from the same instant.
    position = pyautogui.position()
    # A bare (x, y) tuple would be read by Flask as (body, status) and
    # rejected because an int is not a valid response body.
    return jsonify([position.x, position.y])
|
|
|
|
|
|
@app.route("/setup/change_wallpaper", methods=['POST'])
def change_wallpaper():
    """Set the desktop wallpaper to the image at the requested path.

    Request JSON keys:
        path: location of the image file.

    Returns:
        A plain-text message: HTTP 200 on success, 400 when ``path`` is
        missing or the platform is not Windows/Linux/Darwin, 404 when the
        file does not exist, 500 when applying the wallpaper fails.
    """
    data = request.json
    path = data.get('path', None)

    if not path:
        return "Path not supplied!", 400

    path = Path(path)

    if not path.exists():
        return f"File not found: {path}", 404

    # The platform tools below receive the path out of context (inside a
    # file:// URI or an AppleScript string), so make it absolute first.
    path = path.absolute()

    try:
        user_platform = platform.system()
        if user_platform == "Windows":
            # 20 = SPI_SETDESKWALLPAPER, 3 = SPIF_UPDATEINIFILE | SPIF_SENDCHANGE.
            # The call returns zero on failure.
            if not ctypes.windll.user32.SystemParametersInfoW(20, 0, str(path), 3):
                raise OSError("SystemParametersInfoW reported failure")
        elif user_platform == "Linux":
            # check=True turns a non-zero exit status into an exception so a
            # failed command is not reported as success.
            subprocess.run(
                ["gsettings", "set", "org.gnome.desktop.background", "picture-uri", f"file://{path}"],
                check=True)
        elif user_platform == "Darwin":  # (Mac OS)
            subprocess.run(
                ["osascript", "-e", f'tell application "Finder" to set desktop picture to POSIX file "{path}"'],
                check=True)
        else:
            return f"Unsupported platform: {user_platform}", 400
        return "Wallpaper changed successfully"
    except Exception as e:
        return f"Failed to change wallpaper. Error: {e}", 500
|
|
|
|
|
|
@app.route("/setup/download_file", methods=['POST'])
def download_file():
    """Download a URL to a local path, retrying on request failures.

    Request JSON keys:
        url:  address to fetch.
        path: destination file; missing parent directories are created.

    Returns:
        A plain-text message: HTTP 200 once the file has been written, 400
        when ``url`` or ``path`` is missing, 500 after three failed attempts.
    """
    data = request.json
    url = data.get('url', None)
    path = data.get('path', None)

    if not url or not path:
        return "Path or URL not supplied!", 400

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    max_retries = 3
    # Python unbinds the ``except ... as`` name when the handler ends, so the
    # last failure is kept here for the final error message.
    last_error = None
    for i in range(max_retries):
        try:
            # The context manager releases the streamed connection whether or
            # not the body is read to the end.
            with requests.get(url, stream=True) as response:
                response.raise_for_status()

                with open(path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            return "File downloaded successfully"

        except requests.RequestException as e:
            last_error = e
            logger.error(f"Failed to download {url}. Retrying... ({max_retries - i - 1} attempts left)")

    return f"Failed to download {url}. No retries left. Error: {last_error}", 500
|
|
|
|
|
|
@app.route("/setup/open_file", methods=['POST'])
def open_file():
    """Open a file with the platform's default handler.

    Uses ``os.startfile`` on Windows, the ``open`` command on Darwin and
    ``xdg-open`` everywhere else. Responds with HTTP 400 when ``path`` is
    missing from the request JSON, 404 when the file does not exist and 500
    when launching the handler raises.
    """
    payload = request.json
    raw_path = payload.get('path', None)
    if not raw_path:
        return "Path not supplied!", 400

    path = Path(raw_path)
    if not path.exists():
        return f"File not found: {path}", 404

    try:
        system_name = platform.system()
        if system_name == "Windows":
            os.startfile(path)
        else:
            # Anything that is not macOS is assumed to provide xdg-open.
            open_cmd: str = {"Darwin": "open"}.get(system_name, "xdg-open")
            subprocess.Popen([open_cmd, str(path)])
        return "File opened successfully"
    except Exception as e:
        return f"Failed to open {path}. Error: {e}", 500
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: serve on every network interface with Flask's debug
    # mode enabled.
    # NOTE(review): debug mode together with host 0.0.0.0 makes the debugger
    # reachable from the network -- confirm this is only run in an isolated VM.
    app.run(host="0.0.0.0", debug=True)
|