Files
sci-gui-agent-benchmark/desktop_env/server/main.py
Timothyxxx 287876affc Merge remote-tracking branch 'origin/main'
# Conflicts:
#	desktop_env/evaluators/getters/__init__.py
#	desktop_env/evaluators/metrics/__init__.py
#	requirements.txt
2024-01-10 23:20:49 +08:00

533 lines
19 KiB
Python

import ctypes
import os
import platform
import subprocess
from pathlib import Path
import lxml.etree
from lxml.etree import _Element
import pyatspi
from pyatspi import Accessible, StateType
from pyatspi import Component, Document
from pyatspi import Text as ATText
from pyatspi import Value as ATValue
from pyatspi import Action as ATAction
from typing import List, Dict
from typing import Any
import Xlib
import pyautogui
from PIL import Image
from Xlib import display, X
from pyxcursor import Xcursor
import requests
from flask import Flask, request, jsonify, send_file, abort
from werkzeug.utils import secure_filename
# Flask application object; every @app.route decorator in this module
# registers its handler on it.
app = Flask(__name__)
# Set pyautogui's PAUSE and DARWIN_CATCH_UP_TIME settings to zero
# (presumably so automated input is not artificially delayed -- confirm
# against the pyautogui documentation).
pyautogui.PAUSE = 0
pyautogui.DARWIN_CATCH_UP_TIME = 0
# Module-wide alias for the Flask application's logger.
logger = app.logger
@app.route('/setup/execute', methods=['POST'])
@app.route('/execute', methods=['POST'])
def execute_command():
    """Run the command described by the JSON request body and report its output.

    The body may carry ``shell`` (defaults to ``False``) and ``command``
    (defaults to ``""`` when ``shell`` is truthy, otherwise ``[]``). Both are
    handed to ``subprocess.run`` as received.

    Returns:
        A JSON object with ``status``, ``output`` (captured stdout) and
        ``error`` (captured stderr) once the command has run, or a JSON
        object with ``status`` and ``message`` plus HTTP 500 if running it
        raised an exception.
    """
    payload = request.json
    use_shell = payload.get('shell', False)
    fallback_command = "" if use_shell else []
    cmd = payload.get('command', fallback_command)

    # NOTE: the command is executed exactly as received; nothing restricts it.
    try:
        completed = subprocess.run(
            cmd,
            shell=use_shell,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        success_body = {
            'status': 'success',
            'output': completed.stdout,
            'error': completed.stderr,
        }
        return jsonify(success_body)
    except Exception as exc:
        failure_body = {'status': 'error', 'message': str(exc)}
        return jsonify(failure_body), 500
@app.route('/setup/launch', methods=["POST"])
def launch_app():
    """Start the program given by the ``command`` list without waiting for it.

    Returns:
        A plain-text confirmation naming the launched command, or a JSON
        error object with HTTP 500 if starting the process raised.
    """
    payload = request.json
    argv: List[str] = payload.get("command", [])
    try:
        subprocess.Popen(argv)
        command_line = " ".join(argv)
        return "{:} launched successfully".format(command_line)
    except Exception as exc:
        error_body = {"status": "error", "message": str(exc)}
        return jsonify(error_body), 500
@app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor():
    """Capture the screen with the mouse cursor drawn on it and return it as PNG.

    Per platform:
      * Windows: take a pyautogui screenshot and paste a cursor image
        (downloaded once and cached next to the screenshot, shrunk by a
        factor of 1.5) at the pointer position.
      * Linux: take a pyautogui screenshot and paste the cursor image
        obtained from ``Xcursor`` at the pointer position.
      * macOS: call ``screencapture -C``.

    Returns:
        The PNG file as a response, or a JSON error with HTTP 400 when the
        platform is none of the above.

    Raises:
        requests.RequestException: the cursor image could not be downloaded.
        subprocess.CalledProcessError: ``screencapture`` exited with an error.
    """
    # fixme: when running on virtual machines, the cursor is not captured, don't know why
    # Use an absolute path: the file is written relative to the current
    # working directory, while Flask's send_file resolves relative paths
    # against the application root, which may be a different directory.
    screenshots_dir = os.path.abspath("screenshots")
    file_path = os.path.join(screenshots_dir, "screenshot.png")
    user_platform = platform.system()

    # Ensure the screenshots directory exists
    os.makedirs(screenshots_dir, exist_ok=True)

    # fixme: This is a temporary fix for the cursor not being captured on Windows and Linux
    if user_platform == "Windows":
        cursor_path = os.path.join(screenshots_dir, "cursor.png")
        if not os.path.exists(cursor_path):
            cursor_url = "https://vip.helloimg.com/images/2023/12/02/oQPzmt.png"
            response = requests.get(cursor_url, timeout=30)
            # Fail before writing, so an error page is never cached as the
            # cursor image and reused by every later request.
            response.raise_for_status()
            with open(cursor_path, 'wb') as file:
                file.write(response.content)

        screenshot = pyautogui.screenshot()
        cursor_x, cursor_y = pyautogui.position()
        cursor = Image.open(cursor_path)
        # make the cursor smaller
        cursor = cursor.resize((int(cursor.width / 1.5), int(cursor.height / 1.5)))
        # The cursor image doubles as its own paste mask.
        screenshot.paste(cursor, (cursor_x, cursor_y), cursor)
        screenshot.save(file_path)
    elif user_platform == "Linux":
        cursor_obj = Xcursor()
        imgarray = cursor_obj.getCursorImageArrayFast()
        cursor_img = Image.fromarray(imgarray)
        screenshot = pyautogui.screenshot()
        cursor_x, cursor_y = pyautogui.position()
        screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img)
        screenshot.save(file_path)
    elif user_platform == "Darwin":  # (Mac OS)
        # Use the screencapture utility to capture the screen with the cursor.
        # check=True: a failed capture must not silently serve a stale file.
        subprocess.run(["screencapture", "-C", file_path], check=True)
    else:
        logger.warning(f"The platform you're using ({user_platform}) is not currently supported")
        # No screenshot was written, so there is nothing to send.
        return jsonify({"error": f"Unsupported platform: {user_platform}"}), 400

    return send_file(file_path, mimetype='image/png')
# Namespace prefix -> namespace URI table. _create_node uses the URIs to
# qualify attribute names and passes the whole mapping as the element nsmap.
_accessibility_ns_map = {
    "st": "uri:deskat:state.at-spi.gnome.org",
    "attr": "uri:deskat:attributes.at-spi.gnome.org",
    "cp": "uri:deskat:component.at-spi.gnome.org",
    "doc": "uri:deskat:document.at-spi.gnome.org",
    "docattr": "uri:deskat:attributes.document.at-spi.gnome.org",
    "txt": "uri:deskat:text.at-spi.gnome.org",
    "val": "uri:deskat:value.at-spi.gnome.org",
    "act": "uri:deskat:action.at-spi.gnome.org",
}
def _create_node(node: Accessible) -> _Element:
    """Convert an accessible object and all of its children into an XML element.

    The element's tag is the node's role name with spaces replaced by
    hyphens. Its attributes are collected, in this order, from the node's
    name, states, attributes, and the Component, Document, Selection, Value
    and Action interfaces; an interface whose query raises
    ``NotImplementedError`` is skipped. If the Text interface is available
    and yields a non-empty string, that string becomes the element text.
    Children are converted recursively and appended in iteration order.

    Args:
        node: The accessible object to convert.

    Returns:
        The lxml element for ``node`` with its subtree attached.
    """

    def _qualified(prefix: str, local_name: str) -> str:
        # "{namespace-uri}local-name" form for a namespaced attribute.
        return "{{{:}}}{:}".format(_accessibility_ns_map[prefix], local_name)

    attribute_dict: Dict[str, Any] = {"name": node.name}

    #  States {{{ #
    active_states: List[StateType] = node.getState().get_states()
    for state in active_states:
        state_name: str = StateType._enum_lookup[state]
        # Keep only the part after the first underscore, lower-cased.
        short_name = state_name.split("_", maxsplit=1)[1].lower()
        attribute_dict[_qualified("st", short_name)] = "true"
    #  }}} States #

    #  Attributes {{{ #
    raw_attributes: List[str] = node.getAttributes()
    for raw_attribute in raw_attributes:
        # Each entry is split at its first colon into name and value.
        key, val = raw_attribute.split(":", maxsplit=1)
        attribute_dict[_qualified("attr", key)] = val
    #  }}} Attributes #

    #  Component {{{ #
    try:
        component: Component = node.queryComponent()
    except NotImplementedError:
        pass
    else:
        attribute_dict[_qualified("cp", "screencoord")] = str(component.getPosition(pyatspi.XY_SCREEN))
        attribute_dict[_qualified("cp", "windowcoord")] = str(component.getPosition(pyatspi.XY_WINDOW))
        attribute_dict[_qualified("cp", "parentcoord")] = str(component.getPosition(pyatspi.XY_PARENT))
        attribute_dict[_qualified("cp", "size")] = str(component.getSize())
    #  }}} Component #

    #  Document {{{ #
    try:
        document: Document = node.queryDocument()
    except NotImplementedError:
        pass
    else:
        attribute_dict[_qualified("doc", "locale")] = document.getLocale()
        attribute_dict[_qualified("doc", "pagecount")] = str(document.getPageCount())
        attribute_dict[_qualified("doc", "currentpage")] = str(document.getCurrentPageNumber())
        for raw_attribute in document.getAttributes():
            key, val = raw_attribute.split(":", maxsplit=1)
            attribute_dict[_qualified("docattr", key)] = val
    #  }}} Document #

    #  Text {{{ #
    # Stays empty when the Text interface is unavailable, which leaves the
    # element text unset below.
    text_content: str = ""
    try:
        text_obj: ATText = node.queryText()
    except NotImplementedError:
        pass
    else:
        # only text shown on current screen is available
        text_content = text_obj.getText(0, text_obj.characterCount)
    #  }}} Text #

    #  Selection {{{ #
    try:
        node.querySelection()
    except NotImplementedError:
        pass
    else:
        attribute_dict["selection"] = "true"
    #  }}} Selection #

    #  Value {{{ #
    try:
        value: ATValue = node.queryValue()
    except NotImplementedError:
        pass
    else:
        attribute_dict[_qualified("val", "value")] = str(value.currentValue)
        attribute_dict[_qualified("val", "min")] = str(value.minimumValue)
        attribute_dict[_qualified("val", "max")] = str(value.maximumValue)
        attribute_dict[_qualified("val", "step")] = str(value.minimumIncrement)
    #  }}} Value #

    #  Action {{{ #
    try:
        action: ATAction = node.queryAction()
    except NotImplementedError:
        pass
    else:
        for index in range(action.nActions):
            action_name: str = action.getName(index).replace(" ", "-")
            attribute_dict[_qualified("act", action_name + "_desc")] = action.getDescription(index)
            attribute_dict[_qualified("act", action_name + "_kb")] = action.getKeyBinding(index)
    #  }}} Action #

    role_tag = node.getRoleName().replace(" ", "-")
    xml_node = lxml.etree.Element(
        role_tag,
        attrib=attribute_dict,
        nsmap=_accessibility_ns_map,
    )
    if len(text_content) > 0:
        xml_node.text = text_content
    for child in node:
        xml_node.append(_create_node(child))
    return xml_node
@app.route("/accessibility", methods=["GET"])
def get_accessibility_tree():
    """Return the accessibility tree of desktop 0 as an XML string.

    Returns:
        A JSON object whose ``AT`` key holds the serialised XML.
    """
    root_accessible: Accessible = pyatspi.Registry.getDesktop(0)
    tree: _Element = _create_node(root_accessible)
    serialized = lxml.etree.tostring(tree, encoding="unicode")
    return jsonify({"AT": serialized})
@app.route('/screen_size', methods=['POST'])
def get_screen_size():
    """Report the pixel dimensions of the default X screen.

    Returns:
        A JSON object with ``width`` and ``height``.
    """
    d = display.Display()
    try:
        screen = d.screen()
        screen_width = screen.width_in_pixels
        screen_height = screen.height_in_pixels
    finally:
        # Each request opens its own X connection; close it so repeated
        # calls do not accumulate open connections to the X server.
        d.close()
    return jsonify(
        {
            "width": screen_width,
            "height": screen_height
        }
    )
@app.route('/window_size', methods=['POST'])
def get_window_size():
    """Report the size of the first window whose WM_CLASS matches a name.

    The form field ``app_class_name`` is compared case-insensitively with
    every entry of each window's WM_CLASS, for the windows listed in the
    root window's ``_NET_CLIENT_LIST`` property.

    Returns:
        A JSON object with ``width`` and ``height`` for the first match;
        a JSON error with HTTP 400 if ``app_class_name`` is missing; a JSON
        error with HTTP 404 if no window matches.
    """
    if 'app_class_name' not in request.form:
        return jsonify({"error": "app_class_name is required"}), 400
    app_class_name = request.form['app_class_name']
    wanted = app_class_name.lower()

    d = display.Display()
    try:
        root = d.screen().root
        client_list = root.get_full_property(d.intern_atom('_NET_CLIENT_LIST'), X.AnyPropertyType)
        # The property is absent (None) when the root window does not carry it.
        window_ids = client_list.value if client_list is not None else []

        for window_id in window_ids:
            try:
                window = d.create_resource_object('window', window_id)
                wm_class = window.get_wm_class()
                if wm_class is None:
                    continue
                if wanted in (name.lower() for name in wm_class):
                    geom = window.get_geometry()
                    return jsonify(
                        {
                            "width": geom.width,
                            "height": geom.height
                        }
                    )
            except Xlib.error.XError:  # Ignore windows that give an error
                continue
    finally:
        # Release the per-request X connection on every exit path.
        d.close()

    # A Flask view must not return None; report the miss explicitly.
    return jsonify({"error": f"No window found for app_class_name: {app_class_name}"}), 404
@app.route('/desktop_path', methods=['POST'])
def get_desktop_path():
    """Return the path of the current user's ``Desktop`` directory.

    Returns:
        A JSON object with ``desktop_path`` when the platform is Windows,
        macOS or Linux and the directory exists; otherwise a JSON error
        with HTTP 404.
    """
    home_directory = str(Path.home())

    # All supported systems use the same "Desktop" folder under the home dir.
    if platform.system() in ("Windows", "Darwin", "Linux"):
        candidate = os.path.join(home_directory, "Desktop")
    else:
        candidate = None

    if not (candidate and os.path.exists(candidate)):
        return jsonify(error="Unsupported operating system or desktop path not found"), 404
    return jsonify(desktop_path=candidate)
@app.route('/wallpaper', methods=['POST'])
def get_wallpaper():
    """Send the current desktop wallpaper file to the client.

    The wallpaper path is looked up with a platform-specific helper and the
    file is sent with the ``image/png`` mimetype.

    Aborts with:
        400 if the platform is not Windows, macOS or Linux;
        404 if the helper produced no path;
        500 if sending the file raised an exception.
    """

    def _from_windows():
        # Ask user32 for the wallpaper path (SPI_GETDESKWALLPAPER).
        spi_get_desk_wallpaper = 0x73
        max_path = 260
        path_buffer = ctypes.create_unicode_buffer(max_path)
        ctypes.windll.user32.SystemParametersInfoW(spi_get_desk_wallpaper, max_path, path_buffer, 0)
        return path_buffer.value

    def _from_macos():
        # Query System Events through osascript; any stderr output is
        # treated as failure.
        script = '\ntell application "System Events" to tell every desktop to get picture\n'
        process = subprocess.Popen(['osascript', '-e', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout_data, stderr_data = process.communicate()
        if stderr_data:
            app.logger.error("Error: %s", stderr_data.decode('utf-8'))
            return None
        return stdout_data.strip().decode('utf-8')

    def _from_linux():
        # Read the GNOME background URI, then drop the file:// scheme and
        # the quotes gsettings prints around the value.
        try:
            raw = subprocess.check_output(
                ["gsettings", "get", "org.gnome.desktop.background", "picture-uri"],
                stderr=subprocess.PIPE
            )
        except subprocess.CalledProcessError as e:
            app.logger.error("Error: %s", e)
            return None
        uri = raw.decode('utf-8').strip()
        return uri.replace('file://', '').replace("'", "")

    lookups = {
        'Windows': _from_windows,
        'Darwin': _from_macos,
        'Linux': _from_linux,
    }

    os_name = platform.system()
    lookup = lookups.get(os_name)
    if lookup is None:
        app.logger.error(f"Unsupported OS: {os_name}")
        abort(400, description="Unsupported OS")

    wallpaper_path = lookup()
    if not wallpaper_path:
        abort(404, description="Wallpaper file not found")

    try:
        return send_file(wallpaper_path, mimetype='image/png')
    except Exception as e:
        app.logger.error(f"An error occurred while serving the wallpaper file: {e}")
        abort(500, description="Unable to serve the wallpaper file")
@app.route('/list_directory', methods=['POST'])
def get_directory_tree():
    """Return a recursive listing of the directory named in the JSON body.

    The body must contain ``path``.

    Returns:
        A JSON object whose ``directory_tree`` key holds the nested listing,
        or a JSON error with HTTP 400 when ``path`` is missing or is not a
        directory.
    """

    def _walk(directory):
        """
        Build the nested listing for one directory.

        :param directory: The path of the directory to inspect.
        :return: A dict with ``type``, ``name`` and ``children``; or a dict
                 with only ``error`` if listing the directory failed.
        """
        node = {'type': 'directory', 'name': os.path.basename(directory), 'children': []}
        try:
            for entry in os.listdir(directory):
                full_path = os.path.join(directory, entry)
                if os.path.isdir(full_path):
                    # Sub-directories are expanded recursively.
                    child = _walk(full_path)
                else:
                    child = {'type': 'file', 'name': entry}
                node['children'].append(child)
        except OSError as e:
            # An unreadable directory is reported by its error message only.
            node = {'error': str(e)}
        return node

    payload = request.get_json()
    if 'path' not in payload:
        return jsonify(error="Missing 'path' parameter"), 400

    start_path = payload['path']
    if not os.path.isdir(start_path):
        return jsonify(error="The provided path is not a directory"), 400

    return jsonify(directory_tree=_walk(start_path))
@app.route('/file', methods=['POST'])
def get_file():
    """Send the file named by the ``file_path`` form field as an attachment.

    Returns:
        The file response; a JSON error with HTTP 400 if ``file_path`` is
        missing; a JSON error with HTTP 404 if sending raised
        ``FileNotFoundError``.
    """
    if 'file_path' not in request.form:
        return jsonify({"error": "file_path is required"}), 400

    requested_path = request.form['file_path']
    try:
        return send_file(requested_path, as_attachment=True)
    except FileNotFoundError:
        return jsonify({"error": "File not found"}), 404
@app.route("/setup/upload", methods=["POST"])
def upload_file():
    """Save the uploaded ``file_data`` to the location given by ``file_path``.

    Returns:
        The text ``File Uploaded`` on success, or a JSON error with HTTP 400
        when either form part is missing.
    """
    if not ('file_path' in request.form and 'file_data' in request.files):
        return jsonify({"error": "file_path and file_data are required"}), 400

    destination = request.form['file_path']
    uploaded = request.files["file_data"]
    uploaded.save(destination)
    return "File Uploaded"
@app.route('/platform', methods=['GET'])
def get_platform():
    """Return the value of ``platform.system()`` as the response body."""
    system_name = platform.system()
    return system_name
@app.route('/cursor_position', methods=['GET'])
def get_cursor_position():
    """Return the current pointer position as a JSON array ``[x, y]``.

    A bare ``(x, y)`` tuple cannot be returned from a Flask view: Flask reads
    a two-element tuple as (body, status), and an integer is not a valid
    body. The coordinates are therefore serialised with ``jsonify``.
    """
    # Sample the pointer once so x and y come from the same instant.
    position = pyautogui.position()
    return jsonify(position.x, position.y)
@app.route("/setup/change_wallpaper", methods=['POST'])
def change_wallpaper():
    """Set the desktop wallpaper to the image file named in the JSON body.

    The body must contain ``path``.

    Returns:
        A success message; HTTP 400 if ``path`` is missing or the platform
        is not Windows, Linux or macOS; HTTP 404 if the file does not
        exist; HTTP 500 if changing the wallpaper failed.
    """
    data = request.json
    path = data.get('path', None)

    if not path:
        return "Path not supplied!", 400

    path = Path(path)

    if not path.exists():
        return f"File not found: {path}", 404

    try:
        user_platform = platform.system()
        if user_platform == "Windows":
            # 20 is SPI_SETDESKWALLPAPER; the call returns zero on failure.
            if not ctypes.windll.user32.SystemParametersInfoW(20, 0, str(path), 3):
                raise OSError("SystemParametersInfoW reported failure")
        elif user_platform == "Linux":
            # check=True so a failing gsettings call is not reported as success.
            subprocess.run(
                ["gsettings", "set", "org.gnome.desktop.background", "picture-uri", f"file://{path}"],
                check=True,
            )
        elif user_platform == "Darwin":  # (Mac OS)
            subprocess.run(
                ["osascript", "-e", f'tell application "Finder" to set desktop picture to POSIX file "{path}"'],
                check=True,
            )
        else:
            # Nothing was changed, so do not claim success.
            return f"Unsupported platform: {user_platform}", 400
        return "Wallpaper changed successfully"
    except Exception as e:
        return f"Failed to change wallpaper. Error: {e}", 500
@app.route("/setup/download_file", methods=['POST'])
def download_file():
    """Download ``url`` to ``path`` (both from the JSON body), retrying on failure.

    Missing parent directories of ``path`` are created. The download is
    attempted up to three times.

    Returns:
        A success message; HTTP 400 if ``url`` or ``path`` is missing;
        HTTP 500 with the last error once every attempt has failed.
    """
    data = request.json
    url = data.get('url', None)
    path = data.get('path', None)

    if not url or not path:
        return "Path or URL not supplied!", 400

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    max_retries = 3
    # Python unbinds the ``except ... as`` name when the clause ends, so the
    # last failure is kept here for the final error message.
    last_error = None
    for attempt in range(max_retries):
        try:
            # The context manager releases the streamed connection even
            # when the transfer fails part-way.
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                with open(path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            return "File downloaded successfully"
        except requests.RequestException as e:
            last_error = e
            logger.error(f"Failed to download {url}. Retrying... ({max_retries - attempt - 1} attempts left)")

    return f"Failed to download {url}. No retries left. Error: {last_error}", 500
@app.route("/setup/open_file", methods=['POST'])
def open_file():
    """Open the file named in the JSON body with the platform's opener.

    Windows uses ``os.startfile``; macOS runs ``open``; every other system
    runs ``xdg-open``.

    Returns:
        A success message; HTTP 400 if ``path`` is missing; HTTP 404 if the
        file does not exist; HTTP 500 if opening raised an exception.
    """
    payload = request.json
    requested = payload.get('path', None)
    if not requested:
        return "Path not supplied!", 400

    path = Path(requested)
    if not path.exists():
        return f"File not found: {path}", 404

    try:
        system_name = platform.system()
        if system_name != "Windows":
            open_cmd: str = "xdg-open" if system_name != "Darwin" else "open"
            subprocess.Popen([open_cmd, str(path)])
        else:
            os.startfile(path)
        return "File opened successfully"
    except Exception as e:
        return f"Failed to open {path}. Error: {e}", 500
# Start Flask's built-in server when this file is run as a script.
# NOTE(review): debug=True together with host="0.0.0.0" makes the debug
# server reachable from other machines -- presumably acceptable only inside
# an isolated benchmark VM; confirm before running it anywhere else.
if __name__ == '__main__':
    app.run(debug=True, host="0.0.0.0")