From 308282e83059ea268f666721c55fd3de339d4fec Mon Sep 17 00:00:00 2001 From: cui0711 <1729461967@qq.com> Date: Fri, 30 Jan 2026 16:27:49 +0800 Subject: [PATCH] feat(server): add cross-platform support and improve screenshot handling --- desktop_env/server/main.py | 197 ++++++++++++++++++++++++++++++++----- 1 file changed, 175 insertions(+), 22 deletions(-) diff --git a/desktop_env/server/main.py b/desktop_env/server/main.py index 02fbf17..0430276 100644 --- a/desktop_env/server/main.py +++ b/desktop_env/server/main.py @@ -4,25 +4,27 @@ import platform import shlex import json import subprocess, signal +import sys import time from pathlib import Path from typing import Any, Optional, Sequence from typing import List, Dict, Tuple, Literal import concurrent.futures -import Xlib import lxml.etree import pyautogui import requests import re from PIL import Image, ImageGrab -from Xlib import display, X from flask import Flask, request, jsonify, send_file, abort # , send_from_directory from lxml.etree import _Element platform_name: str = platform.system() if platform_name == "Linux": + import Xlib + from Xlib import display, X + from pyxcursor import Xcursor import pyatspi from pyatspi import Accessible, StateType, STATE_SHOWING from pyatspi import Action as ATAction @@ -39,9 +41,14 @@ elif platform_name == "Windows": import win32ui, win32gui Accessible = Any + Xlib = None + display = None + X = None + Xcursor = None elif platform_name == "Darwin": import plistlib + from pyxcursor import Xcursor import AppKit import ApplicationServices @@ -51,13 +58,16 @@ elif platform_name == "Darwin": Accessible = Any BaseWrapper = Any + Xlib = None else: # Platform not supported Accessible = None BaseWrapper = Any - -from pyxcursor import Xcursor + Xlib = None + display = None + X = None + Xcursor = None # todo: need to reformat and organize this whole file @@ -89,6 +99,10 @@ def execute_command(): if arg.startswith("~/"): command[i] = os.path.expanduser(arg) + # Replace 'python' with 
sys.executable to use the same Python interpreter as the server + if len(command) > 0 and command[0] in ['python', 'python3', 'python.exe', 'python3.exe']: + command[0] = sys.executable + # Execute the command without any safety checks. try: if platform_name == "Windows": @@ -262,15 +276,12 @@ def launch_app(): @app.route('/screenshot', methods=['GET']) def capture_screen_with_cursor(): - # fixme: when running on virtual machines, the cursor is not captured, don't know why - file_path = os.path.join(os.path.dirname(__file__), "screenshots", "screenshot.png") user_platform = platform.system() # Ensure the screenshots directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) - # fixme: This is a temporary fix for the cursor not being captured on Windows and Linux if user_platform == "Windows": def get_cursor(): hcursor = win32gui.GetCursorInfo()[1] @@ -303,19 +314,53 @@ def capture_screen_with_cursor(): ratio = ctypes.windll.shcore.GetScaleFactorForDevice(0) / 100 + # get logical screen size + user32 = ctypes.windll.user32 + logical_width = user32.GetSystemMetrics(0) + logical_height = user32.GetSystemMetrics(1) + + # ===== Key fix: get cursor position before taking screenshot ===== + # win32gui.GetCursorPos() returns logical coordinates (consistent with pyautogui) + pos_win = win32gui.GetCursorPos() + logger.info(f"Cursor position (logical coordinates): {pos_win}") + + # Take screenshot immediately to reduce time difference img = ImageGrab.grab(bbox=None, include_layered_windows=True) + # ============================================= + + # ===== DPI scaling fix ===== + if ratio != 1.0: + physical_width, physical_height = img.size + logger.info(f"Detected DPI scaling: {ratio}x ({ratio*100}%)") + logger.info(f"Physical screenshot size: {physical_width}x{physical_height}") + logger.info(f"Logical resolution: {logical_width}x{logical_height}") + logger.info(f"Resizing screenshot to match logical resolution...") + img = img.resize((logical_width, 
logical_height), Image.Resampling.LANCZOS) + logger.info(f"Screenshot resized to: {img.size}") + # ========================== try: cursor, (hotspotx, hotspoty) = get_cursor() - pos_win = win32gui.GetCursorPos() - pos = (round(pos_win[0]*ratio - hotspotx), round(pos_win[1]*ratio - hotspoty)) + # ===== Cursor position handling ===== + # win32gui.GetCursorPos() and pyautogui both use logical coordinates + # The screenshot has been resized to logical resolution, so use directly + logical_cursor_x = pos_win[0] + logical_cursor_y = pos_win[1] + + pos = (logical_cursor_x - hotspotx, logical_cursor_y - hotspoty) + + logger.info(f"Cursor position (logical coordinates): ({logical_cursor_x}, {logical_cursor_y})") + logger.info(f"Hotspot offset: ({hotspotx}, {hotspoty})") + logger.info(f"Final paste position: {pos}") + # =================================== img.paste(cursor, pos, cursor) except Exception as e: - logger.warning(f"Failed to capture cursor on Windows, screenshot will not have a cursor. Error: {e}") + logger.warning(f"Failed to capture cursor on Windows, screenshot will not include cursor. Error: {e}") img.save(file_path) + elif user_platform == "Linux": cursor_obj = Xcursor() imgarray = cursor_obj.getCursorImageArrayFast() @@ -324,17 +369,19 @@ def capture_screen_with_cursor(): cursor_x, cursor_y = pyautogui.position() screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img) screenshot.save(file_path) + elif user_platform == "Darwin": # (Mac OS) - # Use the screencapture utility to capture the screen with the cursor subprocess.run(["screencapture", "-C", file_path]) + else: logger.warning(f"The platform you're using ({user_platform}) is not currently supported") return send_file(file_path, mimetype='image/png') + def _has_active_terminal(desktop: Accessible) -> bool: - """ A quick check whether the terminal window is open and active. + """ A quick check whether the terminal window is open and active (Linux only). 
# Lower-cased substrings that mark a top-level window as a terminal.  The
# executable names are kept from the original list even though some of them
# are subsumed by the generic markers ('powershell', 'cmd').
_TERMINAL_TITLE_MARKERS = (
    "windowsterminal.exe",  # Windows Terminal
    "powershell.exe",       # PowerShell
    "pwsh.exe",             # PowerShell Core
    "cmd.exe",              # Command Prompt
    "conhost.exe",          # Console Host
    "terminal",
    "powershell",
    "command prompt",
    "cmd",
)

# Window texts that are nothing but a default terminal title; returning one of
# these would report the caption instead of any terminal content.
_BARE_TERMINAL_TITLES = frozenset({
    "Windows PowerShell",
    "Command Prompt",
    "PowerShell",
    "Administrator: Windows PowerShell",
})


def _is_terminal_title(title: str) -> bool:
    """Return True when the lower-cased window *title* contains a terminal marker."""
    return any(marker in title for marker in _TERMINAL_TITLE_MARKERS)


def _get_windows_terminal_output() -> Optional[str]:
    """Return the text of the first visible terminal window on Windows.

    Top-level windows are enumerated through pywinauto's UIA backend.  A window
    is treated as a terminal when its UIA element name (the window title, not
    the process name) contains one of ``_TERMINAL_TITLE_MARKERS``.  For such a
    window the first non-blank text found in its ``Edit`` controls (falling
    back to ``Document``, then ``Text`` controls) is returned; if none of them
    yields text, the window's own text is returned unless it is just a default
    terminal title.

    Returns:
        The right-stripped text, or ``None`` when no terminal window with
        readable text is found or when pywinauto is unavailable / fails.
    """
    try:
        # Imported lazily: pywinauto only exists on Windows hosts.
        from pywinauto import Desktop

        desktop = Desktop(backend="uia")

        for window in desktop.windows():
            try:
                # Hidden or minimized windows are skipped.
                if not window.is_visible() or window.is_minimized():
                    continue

                # element_info.name may be None for unnamed windows; treat
                # that as an empty title instead of raising on .lower().
                window_title = (window.element_info.name or "").lower()
                if not _is_terminal_title(window_title):
                    continue

                try:
                    # Prefer Edit controls, then Document, then Text; an empty
                    # result list is falsy, so `or` moves on to the next type.
                    text_controls = (
                        window.descendants(control_type="Edit")
                        or window.descendants(control_type="Document")
                        or window.descendants(control_type="Text")
                    )

                    for control in text_controls:
                        try:
                            text = control.window_text()
                        except Exception:
                            # Best effort: one unreadable control must not
                            # abort the scan, but KeyboardInterrupt/SystemExit
                            # are no longer swallowed as with a bare except.
                            continue
                        if text and text.strip():
                            return text.rstrip()

                    # No control produced text: fall back to the window text,
                    # ignoring values that are only a default caption.
                    window_text = window.window_text()
                    if (
                        window_text
                        and window_text.strip()
                        and window_text not in _BARE_TERMINAL_TITLES
                    ):
                        return window_text.rstrip()

                except Exception as e:
                    logger.debug(f"Error getting text from window {window_title}: {e}")
                    continue

            except Exception as e:
                logger.debug(f"Error processing window: {e}")
                continue

        return None

    except Exception as e:
        # Covers a missing pywinauto install as well as UIA/COM failures.
        logger.error(f"Error in _get_windows_terminal_output: {e}")
        return None
recording file: {e}'}), 500 - d = display.Display() - screen_width = d.screen().width_in_pixels - screen_height = d.screen().height_in_pixels - - start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}" + if platform_name == "Linux": + d = display.Display() + screen_width = d.screen().width_in_pixels + screen_height = d.screen().height_in_pixels + start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}" + elif platform_name == "Windows": + user32 = ctypes.windll.user32 + screen_width = user32.GetSystemMetrics(0) + screen_height = user32.GetSystemMetrics(1) + # Use gdigrab for Windows screen capture + start_command = f"ffmpeg -y -f gdigrab -draw_mouse 1 -framerate 30 -video_size {screen_width}x{screen_height} -i desktop -c:v libx264 -r 30 {recording_path}" + else: + return jsonify({'status': 'error', 'message': f'Recording not supported on {platform_name}'}), 501 # Use stderr=PIPE to capture potential errors from ffmpeg recording_process = subprocess.Popen(shlex.split(start_command), @@ -1544,11 +1685,22 @@ def end_recording(): error_output = "" try: # Send SIGINT for a graceful shutdown, allowing ffmpeg to finalize the file. - recording_process.send_signal(signal.SIGINT) + # On Windows, use CTRL_C_EVENT; on Unix, use SIGINT + if platform_name == "Windows": + # On Windows, we need to terminate the process gracefully + # ffmpeg responds to standard input 'q' to quit gracefully + try: + recording_process.stdin.write(b'q') + recording_process.stdin.flush() + except: + # If stdin is not available, use terminate + recording_process.terminate() + else: + recording_process.send_signal(signal.SIGINT) # Wait for ffmpeg to terminate. communicate() gets output and waits. 
_, error_output = recording_process.communicate(timeout=15) except subprocess.TimeoutExpired: - logger.error("ffmpeg did not respond to SIGINT, killing the process.") + logger.error("ffmpeg did not respond to stop signal, killing the process.") recording_process.kill() # After killing, communicate to get any remaining output. _, error_output = recording_process.communicate() @@ -1589,8 +1741,9 @@ def run_python(): f.write(code) # Execute the file using subprocess to capture all output + # Use sys.executable to use the same Python interpreter as the Flask server result = subprocess.run( - ['/usr/bin/python3', temp_filename], + [sys.executable, temp_filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,