import os from pathlib import Path import platform import subprocess from pyxcursor import Xcursor # import Xlib.display import pyautogui # from PIL import ImageGrab, Image from PIL import Image import lxml.etree from lxml.etree import _Element import pyatspi from pyatspi import Accessible, StateType from pyatspi import Component, Document from pyatspi import Text as ATText from pyatspi import Value as ATValue from pyatspi import Action as ATAction import requests from flask import Flask, request, jsonify, send_file from typing import List, Dict from typing import Any app = Flask(__name__) pyautogui.PAUSE = 0 pyautogui.DARWIN_CATCH_UP_TIME = 0 logger = app.logger @app.route('/setup/execute', methods=['POST']) @app.route('/execute', methods=['POST']) def execute_command(): data = request.json # The 'command' key in the JSON request should contain the command to be executed. shell = data.get('shell', False) command = data.get('command', "" if shell else []) # Execute the command without any safety checks. try: result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True) return jsonify({ 'status': 'success', 'output': result.stdout, 'error': result.stderr }) except Exception as e: return jsonify({ 'status': 'error', 'message': str(e) }), 500 @app.route('/setup/launch', methods=["POST"]) def launch_app(): data = request.json command: List[str] = data.get("command", []) try: subprocess.Popen(command) return "{:} launched successfully".format(" ".join(command)) except Exception as e: return jsonify( { "status": "error" , "message": str(e) } )\ , 500 @app.route('/screenshot', methods=['GET']) def capture_screen_with_cursor(): # fixme: when running on virtual machines, the cursor is not captured, don't know why file_path = os.path.join("screenshots", "screenshot.png") user_platform = platform.system() # Ensure the screenshots directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # fixme: This is a temporary fix for the cursor not being captured on Windows and Linux if user_platform == "Windows": def _download_image(url, path): response = requests.get(url) with open(path, 'wb') as file: file.write(response.content) cursor_path = os.path.join("screenshots", "cursor.png") if not os.path.exists(cursor_path): cursor_url = "https://vip.helloimg.com/images/2023/12/02/oQPzmt.png" _download_image(cursor_url, cursor_path) screenshot = pyautogui.screenshot() cursor_x, cursor_y = pyautogui.position() cursor = Image.open(cursor_path) # make the cursor smaller cursor = cursor.resize((int(cursor.width / 1.5), int(cursor.height / 1.5))) screenshot.paste(cursor, (cursor_x, cursor_y), cursor) screenshot.save(file_path) elif user_platform == "Linux": cursor_obj = Xcursor() imgarray = cursor_obj.getCursorImageArrayFast() cursor_img = Image.fromarray(imgarray) screenshot = pyautogui.screenshot() cursor_x, cursor_y = pyautogui.position() screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img) screenshot.save(file_path) elif user_platform == "Darwin": # (Mac OS) # Use the screencapture utility to capture the screen with the cursor subprocess.run(["screencapture", "-C", file_path]) else: logger.warning(f"The platform you're using ({user_platform}) is not currently supported") return send_file(file_path, mimetype='image/png') _accessibility_ns_map = { "st": "uri:deskat:state.at-spi.gnome.org" , "attr": "uri:deskat:attributes.at-spi.gnome.org" , "cp": "uri:deskat:component.at-spi.gnome.org" , "doc": "uri:deskat:document.at-spi.gnome.org" , "docattr": "uri:deskat:attributes.document.at-spi.gnome.org" , "txt": "uri:deskat:text.at-spi.gnome.org" , "val": "uri:deskat:value.at-spi.gnome.org" , "act": "uri:deskat:action.at-spi.gnome.org" } def _create_node(node: Accessible) -> _Element: attribute_dict: Dict[str, Any] = {"name": node.name} # States {{{ # states: List[StateType] = node.getState().get_states() for st in states: state_name: str = StateType._enum_lookup[st] attribute_dict[ "{{{:}}}{:}"\ .format( _accessibility_ns_map["st"] , state_name.split("_", maxsplit=1)[1].lower() ) ] = "true" # }}} States # # Attributes {{{ # attributes: List[str] = node.getAttributes() for attrbt in attributes: attribute_name: str attribute_value: str attribute_name, attribute_value = attrbt.split(":", maxsplit=1) attribute_dict[ "{{{:}}}{:}"\ .format( _accessibility_ns_map["attr"] , attribute_name ) ] = attribute_value # }}} Attributes # # Component {{{ # try: component: Component = node.queryComponent() except NotImplementedError: pass else: attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_SCREEN)) attribute_dict["{{{:}}}windowcoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_WINDOW)) attribute_dict["{{{:}}}parentcoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_PARENT)) attribute_dict["{{{:}}}size".format(_accessibility_ns_map["cp"])] = str(component.getSize()) # }}} Component # # Document {{{ # try: document: Document = node.queryDocument() except NotImplementedError: pass else: attribute_dict["{{{:}}}locale".format(_accessibility_ns_map["doc"])] = document.getLocale() attribute_dict["{{{:}}}pagecount".format(_accessibility_ns_map["doc"])] = str(document.getPageCount()) attribute_dict["{{{:}}}currentpage".format(_accessibility_ns_map["doc"])] = str(document.getCurrentPageNumber()) for attrbt in document.getAttributes(): attribute_name: str attribute_value: str attribute_name, attribute_value = attrbt.split(":", maxsplit=1) attribute_dict[ "{{{:}}}{:}"\ .format( _accessibility_ns_map["docattr"] , attribute_name ) ] = attribute_value # }}} Document # # Text {{{ # try: text_obj: ATText = node.queryText() except NotImplementedError: pass else: # only text shown on current screen is available #attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount) text: str = text_obj.getText(0, text_obj.characterCount) # }}} Text # # Selection {{{ # try: node.querySelection() except NotImplementedError: pass else: attribute_dict["selection"] = "true" # }}} Selection # # Value {{{ # try: value: ATValue = node.queryValue() except NotImplementedError: pass else: attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(value.currentValue) attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(value.minimumValue) attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(value.maximumValue) attribute_dict["{{{:}}}step".format(_accessibility_ns_map["val"])] = str(value.minimumIncrement) # }}} Value # # Action {{{ # try: action: ATAction = node.queryAction() except NotImplementedError: pass else: for i in range(action.nActions): action_name: str = action.getName(i).replace(" ", "-") attribute_dict[ "{{{:}}}{:}_desc"\ .format( _accessibility_ns_map["act"] , action_name ) ] = action.getDescription(i) attribute_dict[ "{{{:}}}{:}_kb"\ .format( _accessibility_ns_map["act"] , action_name ) ] = action.getKeyBinding(i) # }}} Action # xml_node = lxml.etree.Element( node.getRoleName().replace(" ", "-") , attrib=attribute_dict , nsmap=_accessibility_ns_map ) if "text" in locals() and len(text)>0: xml_node.text = text for ch in node: xml_node.append(_create_node(ch)) return xml_node @app.route("/accessibility", methods=["GET"]) def get_accessibility_tree(): desktop: Accessible = pyatspi.Registry.getDesktop(0) desktop_xml: _Element = _create_node(desktop) return jsonify({"AT": lxml.etree.tostring(desktop_xml, encoding="unicode")}) @app.route('/file', methods=['POST']) def get_file(): # Retrieve filename from the POST request if 'file_path' in request.form: file_path = request.form['file_path'] else: return jsonify({"error": "file_path is required"}), 400 try: # Check if the file exists and send it to the user return send_file(file_path, as_attachment=True) except FileNotFoundError: # If the file is not found, return a 404 error return jsonify({"error": "File not found"}), 404 @app.route("/setup/upload", methods=["POST"]) def upload_file(): # Retrieve filename from the POST request if 'file_path' in request.form and 'file_data' in request.files: file_path = request.form['file_path'] file = request.files["file_data"] file.save(file_path) return "File Uploaded" else: return jsonify({"error": "file_path and file_data are required"}), 400 @app.route('/platform', methods=['GET']) def get_platform(): return platform.system() @app.route('/cursor_position', methods=['GET']) def get_cursor_position(): return pyautogui.position().x, pyautogui.position().y @app.route("/setup/change_wallpaper", methods=['POST']) def change_wallpaper(): data = request.json path = data.get('path', None) if not path: return "Path not supplied!", 400 path = Path(path) if not path.exists(): return f"File not found: {path}", 404 try: user_platform = platform.system() if user_platform == "Windows": import ctypes ctypes.windll.user32.SystemParametersInfoW(20, 0, str(path), 3) elif user_platform == "Linux": import subprocess subprocess.run(["gsettings", "set", "org.gnome.desktop.background", "picture-uri", f"file://{path}"]) elif user_platform == "Darwin": # (Mac OS) import subprocess subprocess.run( ["osascript", "-e", f'tell application "Finder" to set desktop picture to POSIX file "{path}"']) return "Wallpaper changed successfully" except Exception as e: return f"Failed to change wallpaper. Error: {e}", 500 @app.route("/setup/download_file", methods=['POST']) def download_file(): data = request.json url = data.get('url', None) path = data.get('path', None) if not url or not path: return "Path or URL not supplied!", 400 path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) max_retries = 3 for i in range(max_retries): try: response = requests.get(url, stream=True) response.raise_for_status() with open(path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) return "File downloaded successfully" except requests.RequestException as e: logger.error(f"Failed to download {url}. Retrying... ({max_retries - i - 1} attempts left)") return f"Failed to download {url}. No retries left. Error: {e}", 500 @app.route("/setup/open_file", methods=['POST']) def open_file(): data = request.json path = data.get('path', None) if not path: return "Path not supplied!", 400 path = Path(path) if not path.exists(): return f"File not found: {path}", 404 try: if platform.system() == "Windows": os.startfile(path) else: open_cmd: str = "open" if platform.system() == "Darwin" else "xdg-open" subprocess.Popen([open_cmd, str(path)]) return "File opened successfully" except Exception as e: return f"Failed to open {path}. Error: {e}", 500 if __name__ == '__main__': app.run(debug=True, host="0.0.0.0")