Files
sci-gui-agent-benchmark/desktop_env/server/main.py
2024-01-09 09:30:11 +08:00

226 lines
7.1 KiB
Python

import os
import platform
import subprocess
from pathlib import Path
from typing import List
import pyautogui
import requests
from PIL import Image
from flask import Flask, request, jsonify, send_file
from pyxcursor import Xcursor
# Flask application object; every route below is registered on it.
app = Flask(__name__)
# Module-level alias for the application's logger.
logger = app.logger

# Zero out pyautogui's built-in delays.
# NOTE(review): DARWIN_CATCH_UP_TIME is presumably pyautogui's macOS-only
# settle delay -- confirm against the installed pyautogui version.
pyautogui.PAUSE = 0
pyautogui.DARWIN_CATCH_UP_TIME = 0
@app.route('/setup/execute', methods=['POST'])
@app.route('/execute', methods=['POST'])
def execute_command():
    """Run the command described by the JSON request body and report the result.

    Expected JSON fields:
        command: argument list, or a single string when ``shell`` is true.
        shell: optional flag forwarded to ``subprocess.run`` (default ``False``).

    Returns:
        JSON with ``status``, ``output`` (stdout), ``error`` (stderr) and
        ``returncode``. HTTP 400 when no command is supplied, HTTP 500 when
        the command could not be started.
    """
    # silent=True returns None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    shell = data.get('shell', False)
    command = data.get('command', "" if shell else [])
    if not command:
        return jsonify({
            'status': 'error',
            'message': 'command is required'
        }), 400

    # SECURITY: the command is executed exactly as received, without any
    # safety checks; this endpoint must only be reachable by trusted clients.
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True)
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

    # 'status' only says the process was started and finished; callers need
    # 'returncode' to tell whether the command itself succeeded.
    return jsonify({
        'status': 'success',
        'output': result.stdout,
        'error': result.stderr,
        'returncode': result.returncode
    })
@app.route('/setup/launch', methods=["POST"])
def launch_app():
    """Start the program given in the JSON ``command`` field without waiting for it.

    Returns:
        A plain-text success message, HTTP 400 JSON when no command is
        supplied, or HTTP 500 JSON when the process could not be started.
    """
    # silent=True returns None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    command = data.get("command", [])
    if not command:
        return jsonify({"status": "error", "message": "command is required"}), 400

    try:
        subprocess.Popen(command)
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

    # Joining a plain string would interleave spaces between its characters,
    # so only join when the command is a sequence of arguments.
    if isinstance(command, str):
        shown = command
    else:
        shown = " ".join(str(part) for part in command)
    return "{:} launched successfully".format(shown)
@app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor():
    """Capture the screen with the mouse cursor drawn in and return it as a PNG.

    The capture strategy depends on ``platform.system()``:
        * Windows: pyautogui screenshot with a downloaded cursor image pasted
          at the current pointer position.
        * Linux: pyautogui screenshot with the cursor image obtained from
          ``Xcursor`` pasted at the current pointer position.
        * Darwin: the ``screencapture -C`` utility.

    Returns:
        The PNG file on success, HTTP 501 JSON for an unsupported platform,
        or HTTP 500 JSON when capturing fails.
    """
    # fixme: when running on virtual machines, the cursor is not captured, don't know why
    # Use an absolute path so that saving the image and sending it refer to
    # the same file regardless of how relative paths are resolved.
    screenshots_dir = os.path.abspath("screenshots")
    file_path = os.path.join(screenshots_dir, "screenshot.png")
    user_platform = platform.system()

    # Ensure the screenshots directory exists
    os.makedirs(screenshots_dir, exist_ok=True)

    try:
        # fixme: This is a temporary fix for the cursor not being captured on Windows and Linux
        if user_platform == "Windows":
            cursor_path = os.path.join(screenshots_dir, "cursor.png")
            if not os.path.exists(cursor_path):
                cursor_url = "https://vip.helloimg.com/images/2023/12/02/oQPzmt.png"
                # Fail before writing anything: otherwise an HTTP error page
                # would be cached as cursor.png and reused on every request.
                response = requests.get(cursor_url, timeout=10)
                response.raise_for_status()
                with open(cursor_path, 'wb') as file:
                    file.write(response.content)

            screenshot = pyautogui.screenshot()
            cursor_x, cursor_y = pyautogui.position()
            # The image doubles as the paste mask, so it needs an alpha channel.
            cursor = Image.open(cursor_path).convert("RGBA")
            # make the cursor smaller
            cursor = cursor.resize((int(cursor.width / 1.5), int(cursor.height / 1.5)))
            screenshot.paste(cursor, (cursor_x, cursor_y), cursor)
            screenshot.save(file_path)
        elif user_platform == "Linux":
            cursor_obj = Xcursor()
            imgarray = cursor_obj.getCursorImageArrayFast()
            cursor_img = Image.fromarray(imgarray)
            screenshot = pyautogui.screenshot()
            cursor_x, cursor_y = pyautogui.position()
            screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img)
            screenshot.save(file_path)
        elif user_platform == "Darwin":  # (Mac OS)
            # Use the screencapture utility to capture the screen with the cursor
            subprocess.run(["screencapture", "-C", file_path], check=True)
        else:
            logger.warning(f"The platform you're using ({user_platform}) is not currently supported")
            # Nothing was captured; do not fall through and send a missing
            # or stale file.
            return jsonify({
                "status": "error",
                "message": f"Unsupported platform: {user_platform}"
            }), 501
    except Exception as e:
        logger.exception("Failed to capture screenshot")
        return jsonify({"status": "error", "message": str(e)}), 500

    return send_file(file_path, mimetype='image/png')
@app.route('/file', methods=['POST'])
def get_file():
    """Send the file named by the ``file_path`` form field as an attachment.

    Returns HTTP 400 JSON when the field is absent and HTTP 404 JSON when
    sending raises ``FileNotFoundError``.
    """
    requested_path = request.form.get('file_path')
    if requested_path is None:
        return jsonify({"error": "file_path is required"}), 400

    try:
        attachment = send_file(requested_path, as_attachment=True)
    except FileNotFoundError:
        # Missing file: report it as 404 rather than a server error.
        return jsonify({"error": "File not found"}), 404
    return attachment
@app.route("/setup/upload", methods=["POST"])
def upload_file():
    """Store the uploaded ``file_data`` part at the location given by ``file_path``.

    Missing parent directories of the destination are created first.

    Returns:
        ``"File Uploaded"`` on success, HTTP 400 JSON when either form part
        is missing, or HTTP 500 JSON when the file cannot be written.
    """
    if 'file_path' not in request.form or 'file_data' not in request.files:
        return jsonify({"error": "file_path and file_data are required"}), 400

    file_path = request.form['file_path']
    file = request.files["file_data"]
    try:
        # Saving into a directory that does not exist yet would otherwise fail.
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)
        file.save(file_path)
    except OSError as e:
        return jsonify({"error": f"Failed to save {file_path}: {e}"}), 500
    return "File Uploaded"
@app.route('/platform', methods=['GET'])
def get_platform():
    """Return the value of ``platform.system()`` as the response body."""
    system_name = platform.system()
    return system_name
@app.route('/cursor_position', methods=['GET'])
def get_cursor_position():
    """Return the current mouse pointer position as the JSON array ``[x, y]``."""
    # Read the position once so x and y come from the same sample.
    position = pyautogui.position()
    # A bare (x, y) tuple would be interpreted by Flask as (body, status)
    # with an integer body, which is not a valid response; serialise it.
    return jsonify([position.x, position.y])
@app.route("/setup/change_wallpaper", methods=['POST'])
def change_wallpaper():
    """Set the desktop wallpaper to the image at the JSON ``path`` field.

    Returns:
        A success message, HTTP 400 when no path is given, HTTP 404 when the
        file does not exist, HTTP 501 on an unsupported platform, or HTTP 500
        when the platform-specific call fails.
    """
    # silent=True returns None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    path = data.get('path', None)
    if not path:
        return "Path not supplied!", 400

    path = Path(path)
    if not path.exists():
        return f"File not found: {path}", 404

    try:
        # The platform tools below need an absolute location; a relative
        # path would also yield an invalid file:// URI.
        path = path.resolve()
        user_platform = platform.system()
        if user_platform == "Windows":
            import ctypes
            # 20 = SPI_SETDESKWALLPAPER; 3 = SPIF_UPDATEINIFILE | SPIF_SENDCHANGE.
            # The call returns zero on failure.
            if not ctypes.windll.user32.SystemParametersInfoW(20, 0, str(path), 3):
                raise OSError("SystemParametersInfoW failed")
        elif user_platform == "Linux":
            # check=True turns a non-zero exit into an exception so a failure
            # is not reported as success.
            subprocess.run(
                ["gsettings", "set", "org.gnome.desktop.background", "picture-uri", path.as_uri()],
                check=True)
        elif user_platform == "Darwin":  # (Mac OS)
            # Escape characters that would terminate the AppleScript string.
            script_path = str(path).replace("\\", "\\\\").replace('"', '\\"')
            subprocess.run(
                ["osascript", "-e",
                 f'tell application "Finder" to set desktop picture to POSIX file "{script_path}"'],
                check=True)
        else:
            return f"Unsupported platform: {user_platform}", 501
        return "Wallpaper changed successfully"
    except Exception as e:
        return f"Failed to change wallpaper. Error: {e}", 500
@app.route("/setup/download_file", methods=['POST'])
def download_file():
    """Download the JSON ``url`` to the JSON ``path``, trying up to three times.

    Missing parent directories of the destination are created first.

    Returns:
        A success message, HTTP 400 when ``url`` or ``path`` is missing, or
        HTTP 500 with the last error once every attempt has failed.
    """
    # silent=True returns None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    url = data.get('url', None)
    path = data.get('path', None)
    if not url or not path:
        return "Path or URL not supplied!", 400

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    max_retries = 3
    # The name bound by "except ... as e" is deleted when the except clause
    # ends, so the error is kept here for the final failure message.
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            # The timeout keeps a stalled server from blocking the request
            # forever; the context manager releases the connection.
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()
                with open(path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            return "File downloaded successfully"
        except requests.RequestException as e:
            last_error = e
            logger.error(
                "Failed to download %s: %s. (%d attempts left)",
                url, e, max_retries - attempt)

    return f"Failed to download {url}. No retries left. Error: {last_error}", 500
@app.route("/setup/open_file", methods=['POST'])
def open_file():
    """Open the file at the JSON ``path`` field with the platform's opener.

    Windows uses ``os.startfile``; macOS spawns ``open``; every other
    platform spawns ``xdg-open``. Returns HTTP 400 when no path is given,
    HTTP 404 when the file does not exist and HTTP 500 when opening fails.
    """
    payload = request.json
    requested = payload.get('path', None)
    if not requested:
        return "Path not supplied!", 400

    path = Path(requested)
    if not path.exists():
        return f"File not found: {path}", 404

    try:
        system_name = platform.system()
        if system_name == "Windows":
            os.startfile(path)
        elif system_name == "Darwin":
            subprocess.Popen(["open", str(path)])
        else:
            subprocess.Popen(["xdg-open", str(path)])
        return "File opened successfully"
    except Exception as e:
        return f"Failed to open {path}. Error: {e}", 500
# Script entry point: start Flask's built-in server.
if __name__ == '__main__':
    # NOTE(review): debug mode is enabled while listening on all interfaces
    # (0.0.0.0) -- confirm this is only ever run inside an isolated VM.
    app.run(host="0.0.0.0", debug=True)