"""Flask server exposing desktop-control endpoints.

The routes run commands, capture screenshots, query screen/window geometry,
read and change the wallpaper, and transfer or open files on the host the
server runs on.
"""
import ctypes
import os
import platform
import subprocess
from pathlib import Path
from typing import List

import Xlib
import pyautogui
import requests
from PIL import Image
from Xlib import display, X
from flask import Flask, request, jsonify, send_file, abort
from werkzeug.utils import secure_filename

from pyxcursor import Xcursor

app = Flask(__name__)

pyautogui.PAUSE = 0
pyautogui.DARWIN_CATCH_UP_TIME = 0

logger = app.logger


@app.route('/setup/execute', methods=['POST'])
@app.route('/execute', methods=['POST'])
def execute_command():
    """Run the command given in the JSON body and return its output.

    JSON keys read: ``shell`` (default ``False``) and ``command`` (default
    ``""`` when ``shell`` is truthy, otherwise ``[]``).

    Returns a JSON object with ``status``, ``output`` (stdout) and ``error``
    (stderr); if launching the command raises, returns ``status``/``message``
    with HTTP 500.
    """
    data = request.json
    # The 'command' key in the JSON request should contain the command to be executed.
    shell = data.get('shell', False)
    command = data.get('command', "" if shell else [])

    # Execute the command without any safety checks.
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True)
        return jsonify({
            'status': 'success',
            'output': result.stdout,
            'error': result.stderr
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500


@app.route('/setup/launch', methods=["POST"])
def launch_app():
    """Start the process described by the JSON ``command`` list without waiting for it.

    Returns a plain-text confirmation, or a JSON error with HTTP 500 if the
    process could not be started.
    """
    data = request.json
    command: List[str] = data.get("command", [])

    try:
        subprocess.Popen(command)
        return "{:} launched successfully".format(" ".join(command))
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor():
    """Capture the screen (with the cursor drawn in) and return it as a PNG.

    The image is written to ``screenshots/screenshot.png`` and then sent back.
    Aborts with HTTP 400 on platforms other than Windows, Linux and macOS.
    """
    # fixme: when running on virtual machines, the cursor is not captured, don't know why

    file_path = os.path.join("screenshots", "screenshot.png")
    user_platform = platform.system()

    # Ensure the screenshots directory exists
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # fixme: This is a temporary fix for the cursor not being captured on Windows and Linux
    if user_platform == "Windows":
        def _download_image(url, path):
            response = requests.get(url)
            # Fail before writing so an HTTP error page is never cached as the cursor image.
            response.raise_for_status()
            with open(path, 'wb') as file:
                file.write(response.content)

        cursor_path = os.path.join("screenshots", "cursor.png")
        if not os.path.exists(cursor_path):
            cursor_url = "https://vip.helloimg.com/images/2023/12/02/oQPzmt.png"
            _download_image(cursor_url, cursor_path)
        screenshot = pyautogui.screenshot()
        cursor_x, cursor_y = pyautogui.position()
        cursor = Image.open(cursor_path)
        # make the cursor smaller
        cursor = cursor.resize((int(cursor.width / 1.5), int(cursor.height / 1.5)))
        screenshot.paste(cursor, (cursor_x, cursor_y), cursor)
        screenshot.save(file_path)
    elif user_platform == "Linux":
        cursor_obj = Xcursor()
        imgarray = cursor_obj.getCursorImageArrayFast()
        cursor_img = Image.fromarray(imgarray)
        screenshot = pyautogui.screenshot()
        cursor_x, cursor_y = pyautogui.position()
        screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img)
        screenshot.save(file_path)
    elif user_platform == "Darwin":  # (Mac OS)
        # Use the screencapture utility to capture the screen with the cursor
        subprocess.run(["screencapture", "-C", file_path])
    else:
        logger.warning(f"The platform you're using ({user_platform}) is not currently supported")
        # Nothing was captured; do not fall through and serve a stale or missing file.
        abort(400, description="Unsupported OS")

    return send_file(file_path, mimetype='image/png')


@app.route('/screen_size', methods=['POST'])
def get_screen_size():
    """Return the X screen's ``width`` and ``height`` in pixels as JSON."""
    d = display.Display()
    try:
        screen = d.screen()
        screen_width = screen.width_in_pixels
        screen_height = screen.height_in_pixels
    finally:
        # Each request opens its own X connection; release it explicitly.
        d.close()
    return jsonify(
        {
            "width": screen_width,
            "height": screen_height
        }
    )


@app.route('/window_size', methods=['POST'])
def get_window_size():
    """Return the size of the first window whose WM_CLASS matches ``app_class_name``.

    Reads ``app_class_name`` from the form data and compares it
    case-insensitively against each entry of every window's WM_CLASS, for the
    windows listed in the root window's ``_NET_CLIENT_LIST`` property.

    Returns JSON ``width``/``height``; HTTP 400 if the form field is missing,
    HTTP 404 if no window matches.
    """
    if 'app_class_name' in request.form:
        app_class_name = request.form['app_class_name']
    else:
        return jsonify({"error": "app_class_name is required"}), 400

    wanted = app_class_name.lower()

    d = display.Display()
    try:
        root = d.screen().root
        client_list = root.get_full_property(d.intern_atom('_NET_CLIENT_LIST'), X.AnyPropertyType)
        # get_full_property returns None when the property is not set on the root window.
        window_ids = client_list.value if client_list is not None else []

        for window_id in window_ids:
            try:
                window = d.create_resource_object('window', window_id)
                wm_class = window.get_wm_class()

                if wm_class is None:
                    continue

                if wanted in [name.lower() for name in wm_class]:
                    geom = window.get_geometry()
                    return jsonify(
                        {
                            "width": geom.width,
                            "height": geom.height
                        }
                    )
            except Xlib.error.XError:  # Ignore windows that give an error
                continue
    finally:
        d.close()

    # A Flask view must not return None; report the miss explicitly.
    return jsonify({"error": "Window not found"}), 404


@app.route('/desktop_path', methods=['POST'])
def get_desktop_path():
    """Return the current user's ``Desktop`` directory as JSON.

    Responds with HTTP 404 when the platform is not Windows, macOS or Linux,
    or when the directory does not exist.
    """
    # Get the home directory in a platform-independent manner using pathlib
    home_directory = str(Path.home())

    # Determine the desktop path based on the operating system
    desktop_path = {
        "Windows": os.path.join(home_directory, "Desktop"),
        "Darwin": os.path.join(home_directory, "Desktop"),  # macOS
        "Linux": os.path.join(home_directory, "Desktop")
    }.get(platform.system(), None)

    # Check if the operating system is supported and the desktop path exists
    if desktop_path and os.path.exists(desktop_path):
        return jsonify(desktop_path=desktop_path)
    else:
        return jsonify(error="Unsupported operating system or desktop path not found"), 404


@app.route('/wallpaper', methods=['POST'])
def get_wallpaper():
    """Send the current desktop wallpaper file.

    Aborts with HTTP 400 on an unsupported OS, 404 when no wallpaper path
    could be determined, and 500 when the file cannot be served.
    """
    def get_wallpaper_windows():
        SPI_GETDESKWALLPAPER = 0x73
        MAX_PATH = 260
        buffer = ctypes.create_unicode_buffer(MAX_PATH)
        ctypes.windll.user32.SystemParametersInfoW(SPI_GETDESKWALLPAPER, MAX_PATH, buffer, 0)
        return buffer.value

    def get_wallpaper_macos():
        script = """
            tell application "System Events" to tell every desktop to get picture
            """
        process = subprocess.Popen(['osascript', '-e', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()
        if error:
            app.logger.error("Error: %s", error.decode('utf-8'))
            return None
        return output.strip().decode('utf-8')

    def get_wallpaper_linux():
        try:
            output = subprocess.check_output(
                ["gsettings", "get", "org.gnome.desktop.background", "picture-uri"],
                stderr=subprocess.PIPE
            )
            # gsettings prints a quoted file:// URI; strip both to get a plain path.
            return output.decode('utf-8').strip().replace('file://', '').replace("'", "")
        except subprocess.CalledProcessError as e:
            app.logger.error("Error: %s", e)
            return None

    os_name = platform.system()
    wallpaper_path = None
    if os_name == 'Windows':
        wallpaper_path = get_wallpaper_windows()
    elif os_name == 'Darwin':
        wallpaper_path = get_wallpaper_macos()
    elif os_name == 'Linux':
        wallpaper_path = get_wallpaper_linux()
    else:
        app.logger.error(f"Unsupported OS: {os_name}")
        abort(400, description="Unsupported OS")

    if wallpaper_path:
        try:
            # NOTE: the response is always labelled image/png, whatever the file's real format.
            return send_file(wallpaper_path, mimetype='image/png')
        except Exception as e:
            app.logger.error(f"An error occurred while serving the wallpaper file: {e}")
            abort(500, description="Unable to serve the wallpaper file")
    else:
        abort(404, description="Wallpaper file not found")


@app.route('/list_directory', methods=['POST'])
def get_directory_tree():
    """Return a recursive listing of the directory named by the JSON ``path``.

    Responds with HTTP 400 when ``path`` is missing or is not a directory.
    """
    def _list_dir_contents(directory):
        """
        List the contents of a directory recursively, building a tree structure.

        :param directory: The path of the directory to inspect.
        :return: A nested dictionary with the contents of the directory.
        """
        tree = {'type': 'directory', 'name': os.path.basename(directory), 'children': []}
        try:
            # List all files and directories in the current directory
            for entry in os.listdir(directory):
                full_path = os.path.join(directory, entry)
                # If entry is a directory, recurse into it
                if os.path.isdir(full_path):
                    tree['children'].append(_list_dir_contents(full_path))
                else:
                    tree['children'].append({'type': 'file', 'name': entry})
        except OSError as e:
            # If the directory cannot be accessed, return the exception message
            tree = {'error': str(e)}
        return tree

    # Extract the 'path' parameter from the JSON request
    data = request.get_json()
    # A body such as `null` or a JSON array has no 'path' key to look up.
    if not isinstance(data, dict) or 'path' not in data:
        return jsonify(error="Missing 'path' parameter"), 400

    start_path = data['path']
    # Ensure the provided path is a directory
    if not os.path.isdir(start_path):
        return jsonify(error="The provided path is not a directory"), 400

    # Generate the directory tree starting from the provided path
    directory_tree = _list_dir_contents(start_path)
    return jsonify(directory_tree=directory_tree)


@app.route('/file', methods=['POST'])
def get_file():
    """Send the file named by the ``file_path`` form field as an attachment.

    Responds with HTTP 400 when the field is missing and 404 when the file
    does not exist.
    """
    # Retrieve filename from the POST request
    if 'file_path' in request.form:
        file_path = request.form['file_path']
    else:
        return jsonify({"error": "file_path is required"}), 400

    try:
        # Check if the file exists and send it to the user
        return send_file(file_path, as_attachment=True)
    except FileNotFoundError:
        # If the file is not found, return a 404 error
        return jsonify({"error": "File not found"}), 404


@app.route("/setup/upload", methods=["POST"])
def upload_file():
    """Save the uploaded ``file_data`` to the location given by ``file_path``.

    Responds with HTTP 400 when either form part is missing.
    """
    # Retrieve filename from the POST request
    if 'file_path' in request.form and 'file_data' in request.files:
        file_path = request.form['file_path']
        file = request.files["file_data"]
        file.save(file_path)
        return "File Uploaded"
    else:
        return jsonify({"error": "file_path and file_data are required"}), 400


@app.route('/platform', methods=['GET'])
def get_platform():
    """Return ``platform.system()`` as the plain-text response body."""
    return platform.system()


@app.route('/cursor_position', methods=['GET'])
def get_cursor_position():
    """Return the current cursor position as a JSON array ``[x, y]``."""
    position = pyautogui.position()
    # A bare (int, int) tuple is read by Flask as (body, status) and rejected,
    # so serialise the coordinates explicitly.
    return jsonify(position.x, position.y)


@app.route("/setup/change_wallpaper", methods=['POST'])
def change_wallpaper():
    """Set the desktop wallpaper to the image at the JSON ``path``.

    Responds with HTTP 400 when ``path`` is missing, 404 when the file does
    not exist, and 500 when the platform call raises.
    """
    data = request.json
    path = data.get('path', None)

    if not path:
        return "Path not supplied!", 400

    path = Path(path)

    if not path.exists():
        return f"File not found: {path}", 404

    try:
        user_platform = platform.system()
        if user_platform == "Windows":
            ctypes.windll.user32.SystemParametersInfoW(20, 0, str(path), 3)
        elif user_platform == "Linux":
            subprocess.run(["gsettings", "set", "org.gnome.desktop.background", "picture-uri", f"file://{path}"])
        elif user_platform == "Darwin":  # (Mac OS)
            subprocess.run(
                ["osascript", "-e", f'tell application "Finder" to set desktop picture to POSIX file "{path}"'])
        return "Wallpaper changed successfully"
    except Exception as e:
        return f"Failed to change wallpaper. Error: {e}", 500


@app.route("/setup/download_file", methods=['POST'])
def download_file():
    """Download the JSON ``url`` to the JSON ``path``, trying up to three times.

    Parent directories of ``path`` are created as needed. Responds with HTTP
    400 when either key is missing and 500 when every attempt fails.
    """
    data = request.json
    url = data.get('url', None)
    path = data.get('path', None)

    if not url or not path:
        return "Path or URL not supplied!", 400

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    max_retries = 3
    # The name bound by `except ... as e` is deleted when the clause ends, so
    # keep our own reference for the final error message.
    last_error = None
    for i in range(max_retries):
        try:
            response = requests.get(url, stream=True)
            response.raise_for_status()

            with open(path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            return "File downloaded successfully"

        except requests.RequestException as e:
            last_error = e
            logger.error(f"Failed to download {url}. Retrying... ({max_retries - i - 1} attempts left)")

    return f"Failed to download {url}. No retries left. Error: {last_error}", 500


@app.route("/setup/open_file", methods=['POST'])
def open_file():
    """Open the file at the JSON ``path`` with the platform's default handler.

    Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
    elsewhere. Responds with HTTP 400 when ``path`` is missing, 404 when the
    file does not exist, and 500 when launching the handler raises.
    """
    data = request.json
    path = data.get('path', None)

    if not path:
        return "Path not supplied!", 400

    path = Path(path)

    if not path.exists():
        return f"File not found: {path}", 404

    try:
        if platform.system() == "Windows":
            os.startfile(path)
        else:
            open_cmd: str = "open" if platform.system() == "Darwin" else "xdg-open"
            subprocess.Popen([open_cmd, str(path)])
        return "File opened successfully"
    except Exception as e:
        return f"Failed to open {path}. Error: {e}", 500


if __name__ == '__main__':
    # NOTE(review): debug=True together with host="0.0.0.0" exposes Flask's
    # debug mode on every network interface - confirm this is intended.
    app.run(debug=True, host="0.0.0.0")