Files
sci-gui-agent-benchmark/desktop_env/server/main.py
David Chang cf5d480f44 ver Jan10th
new Thunderbird task config
2024-01-10 17:36:59 +08:00

373 lines
13 KiB
Python

import os
from pathlib import Path
import platform
import subprocess
from pyxcursor import Xcursor
# import Xlib.display
import pyautogui
# from PIL import ImageGrab, Image
from PIL import Image
import lxml.etree
from lxml.etree import _Element
import pyatspi
from pyatspi import Accessible, StateType
from pyatspi import Component, Document
from pyatspi import Text as ATText
from pyatspi import Value as ATValue
from pyatspi import Action as ATAction
import requests
from flask import Flask, request, jsonify, send_file
from typing import List, Dict
from typing import Any
app = Flask(__name__)
pyautogui.PAUSE = 0
pyautogui.DARWIN_CATCH_UP_TIME = 0
logger = app.logger
@app.route('/setup/execute', methods=['POST'])
@app.route('/execute', methods=['POST'])
def execute_command():
data = request.json
# The 'command' key in the JSON request should contain the command to be executed.
shell = data.get('shell', False)
command = data.get('command', "" if shell else [])
# Execute the command without any safety checks.
try:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True)
return jsonify({
'status': 'success',
'output': result.stdout,
'error': result.stderr
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/setup/launch', methods=["POST"])
def launch_app():
data = request.json
command: List[str] = data.get("command", [])
try:
subprocess.Popen(command)
return "{:} launched successfully".format(" ".join(command))
except Exception as e:
return jsonify( { "status": "error"
, "message": str(e)
}
)\
, 500
@app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor():
# fixme: when running on virtual machines, the cursor is not captured, don't know why
file_path = os.path.join("screenshots", "screenshot.png")
user_platform = platform.system()
# Ensure the screenshots directory exists
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# fixme: This is a temporary fix for the cursor not being captured on Windows and Linux
if user_platform == "Windows":
def _download_image(url, path):
response = requests.get(url)
with open(path, 'wb') as file:
file.write(response.content)
cursor_path = os.path.join("screenshots", "cursor.png")
if not os.path.exists(cursor_path):
cursor_url = "https://vip.helloimg.com/images/2023/12/02/oQPzmt.png"
_download_image(cursor_url, cursor_path)
screenshot = pyautogui.screenshot()
cursor_x, cursor_y = pyautogui.position()
cursor = Image.open(cursor_path)
# make the cursor smaller
cursor = cursor.resize((int(cursor.width / 1.5), int(cursor.height / 1.5)))
screenshot.paste(cursor, (cursor_x, cursor_y), cursor)
screenshot.save(file_path)
elif user_platform == "Linux":
cursor_obj = Xcursor()
imgarray = cursor_obj.getCursorImageArrayFast()
cursor_img = Image.fromarray(imgarray)
screenshot = pyautogui.screenshot()
cursor_x, cursor_y = pyautogui.position()
screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img)
screenshot.save(file_path)
elif user_platform == "Darwin": # (Mac OS)
# Use the screencapture utility to capture the screen with the cursor
subprocess.run(["screencapture", "-C", file_path])
else:
logger.warning(f"The platform you're using ({user_platform}) is not currently supported")
return send_file(file_path, mimetype='image/png')
_accessibility_ns_map = { "st": "uri:deskat:state.at-spi.gnome.org"
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
, "cp": "uri:deskat:component.at-spi.gnome.org"
, "doc": "uri:deskat:document.at-spi.gnome.org"
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
, "txt": "uri:deskat:text.at-spi.gnome.org"
, "val": "uri:deskat:value.at-spi.gnome.org"
, "act": "uri:deskat:action.at-spi.gnome.org"
}
def _create_node(node: Accessible) -> _Element:
attribute_dict: Dict[str, Any] = {"name": node.name}
# States {{{ #
states: List[StateType] = node.getState().get_states()
for st in states:
state_name: str = StateType._enum_lookup[st]
attribute_dict[ "{{{:}}}{:}"\
.format( _accessibility_ns_map["st"]
, state_name.split("_", maxsplit=1)[1].lower()
)
] = "true"
# }}} States #
# Attributes {{{ #
attributes: List[str] = node.getAttributes()
for attrbt in attributes:
attribute_name: str
attribute_value: str
attribute_name, attribute_value = attrbt.split(":", maxsplit=1)
attribute_dict[ "{{{:}}}{:}"\
.format( _accessibility_ns_map["attr"]
, attribute_name
)
] = attribute_value
# }}} Attributes #
# Component {{{ #
try:
component: Component = node.queryComponent()
except NotImplementedError:
pass
else:
attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_SCREEN))
attribute_dict["{{{:}}}windowcoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_WINDOW))
attribute_dict["{{{:}}}parentcoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_PARENT))
attribute_dict["{{{:}}}size".format(_accessibility_ns_map["cp"])] = str(component.getSize())
# }}} Component #
# Document {{{ #
try:
document: Document = node.queryDocument()
except NotImplementedError:
pass
else:
attribute_dict["{{{:}}}locale".format(_accessibility_ns_map["doc"])] = document.getLocale()
attribute_dict["{{{:}}}pagecount".format(_accessibility_ns_map["doc"])] = str(document.getPageCount())
attribute_dict["{{{:}}}currentpage".format(_accessibility_ns_map["doc"])] = str(document.getCurrentPageNumber())
for attrbt in document.getAttributes():
attribute_name: str
attribute_value: str
attribute_name, attribute_value = attrbt.split(":", maxsplit=1)
attribute_dict[ "{{{:}}}{:}"\
.format( _accessibility_ns_map["docattr"]
, attribute_name
)
] = attribute_value
# }}} Document #
# Text {{{ #
try:
text_obj: ATText = node.queryText()
except NotImplementedError:
pass
else:
# only text shown on current screen is available
#attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount)
text: str = text_obj.getText(0, text_obj.characterCount)
# }}} Text #
# Selection {{{ #
try:
node.querySelection()
except NotImplementedError:
pass
else:
attribute_dict["selection"] = "true"
# }}} Selection #
# Value {{{ #
try:
value: ATValue = node.queryValue()
except NotImplementedError:
pass
else:
attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(value.currentValue)
attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(value.minimumValue)
attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(value.maximumValue)
attribute_dict["{{{:}}}step".format(_accessibility_ns_map["val"])] = str(value.minimumIncrement)
# }}} Value #
# Action {{{ #
try:
action: ATAction = node.queryAction()
except NotImplementedError:
pass
else:
for i in range(action.nActions):
action_name: str = action.getName(i).replace(" ", "-")
attribute_dict[ "{{{:}}}{:}_desc"\
.format( _accessibility_ns_map["act"]
, action_name
)
] = action.getDescription(i)
attribute_dict[ "{{{:}}}{:}_kb"\
.format( _accessibility_ns_map["act"]
, action_name
)
] = action.getKeyBinding(i)
# }}} Action #
xml_node = lxml.etree.Element( node.getRoleName().replace(" ", "-")
, attrib=attribute_dict
, nsmap=_accessibility_ns_map
)
if "text" in locals() and len(text)>0:
xml_node.text = text
for ch in node:
xml_node.append(_create_node(ch))
return xml_node
@app.route("/accessibility", methods=["GET"])
def get_accessibility_tree():
desktop: Accessible = pyatspi.Registry.getDesktop(0)
desktop_xml: _Element = _create_node(desktop)
return jsonify({"AT": lxml.etree.tostring(desktop_xml, encoding="unicode")})
@app.route('/file', methods=['POST'])
def get_file():
# Retrieve filename from the POST request
if 'file_path' in request.form:
file_path = request.form['file_path']
else:
return jsonify({"error": "file_path is required"}), 400
try:
# Check if the file exists and send it to the user
return send_file(file_path, as_attachment=True)
except FileNotFoundError:
# If the file is not found, return a 404 error
return jsonify({"error": "File not found"}), 404
@app.route("/setup/upload", methods=["POST"])
def upload_file():
# Retrieve filename from the POST request
if 'file_path' in request.form and 'file_data' in request.files:
file_path = request.form['file_path']
file = request.files["file_data"]
file.save(file_path)
return "File Uploaded"
else:
return jsonify({"error": "file_path and file_data are required"}), 400
@app.route('/platform', methods=['GET'])
def get_platform():
return platform.system()
@app.route('/cursor_position', methods=['GET'])
def get_cursor_position():
return pyautogui.position().x, pyautogui.position().y
@app.route("/setup/change_wallpaper", methods=['POST'])
def change_wallpaper():
data = request.json
path = data.get('path', None)
if not path:
return "Path not supplied!", 400
path = Path(path)
if not path.exists():
return f"File not found: {path}", 404
try:
user_platform = platform.system()
if user_platform == "Windows":
import ctypes
ctypes.windll.user32.SystemParametersInfoW(20, 0, str(path), 3)
elif user_platform == "Linux":
import subprocess
subprocess.run(["gsettings", "set", "org.gnome.desktop.background", "picture-uri", f"file://{path}"])
elif user_platform == "Darwin": # (Mac OS)
import subprocess
subprocess.run(
["osascript", "-e", f'tell application "Finder" to set desktop picture to POSIX file "{path}"'])
return "Wallpaper changed successfully"
except Exception as e:
return f"Failed to change wallpaper. Error: {e}", 500
@app.route("/setup/download_file", methods=['POST'])
def download_file():
data = request.json
url = data.get('url', None)
path = data.get('path', None)
if not url or not path:
return "Path or URL not supplied!", 400
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
max_retries = 3
for i in range(max_retries):
try:
response = requests.get(url, stream=True)
response.raise_for_status()
with open(path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return "File downloaded successfully"
except requests.RequestException as e:
logger.error(f"Failed to download {url}. Retrying... ({max_retries - i - 1} attempts left)")
return f"Failed to download {url}. No retries left. Error: {e}", 500
@app.route("/setup/open_file", methods=['POST'])
def open_file():
data = request.json
path = data.get('path', None)
if not path:
return "Path not supplied!", 400
path = Path(path)
if not path.exists():
return f"File not found: {path}", 404
try:
if platform.system() == "Windows":
os.startfile(path)
else:
open_cmd: str = "open" if platform.system() == "Darwin" else "xdg-open"
subprocess.Popen([open_cmd, str(path)])
return "File opened successfully"
except Exception as e:
return f"Failed to open {path}. Error: {e}", 500
if __name__ == '__main__':
app.run(debug=True, host="0.0.0.0")