Merge branch 'main' into zdy

This commit is contained in:
David Chang
2024-01-10 22:16:25 +08:00
1971 changed files with 303852 additions and 225 deletions

View File

@@ -1,11 +1,15 @@
import json
import logging
import random
from typing import Any, Dict, Optional
import requests
from desktop_env.envs.actions import KEYBOARD_KEYS
import logging
logger = logging.getLogger("desktopenv.pycontroller")
class PythonController:
def __init__(self, http_server: str, pkgs_prefix: str = "python -c \"import pyautogui; {command}\""):
self.http_server = http_server
@@ -71,6 +75,10 @@ class PythonController:
action_type = action["action_type"]
parameters = action["parameters"] if "parameters" in action else {}
move_mode = random.choice(
["pyautogui.easeInQuad", "pyautogui.easeOutQuad", "pyautogui.easeInOutQuad", "pyautogui.easeInBounce",
"pyautogui.easeInElastic"])
duration = random.uniform(0.5, 1)
if action_type == "MOVE_TO":
if parameters == {} or None:
@@ -78,7 +86,7 @@ class PythonController:
elif "x" in parameters and "y" in parameters:
x = parameters["x"]
y = parameters["y"]
self.execute_python_command(f"pyautogui.moveTo({x}, {y})")
self.execute_python_command(f"pyautogui.moveTo({x}, {y}, {duration}, {move_mode})")
else:
raise Exception(f"Unknown parameters: {parameters}")
@@ -220,6 +228,27 @@ class PythonController:
else:
raise Exception(f"Unknown action type: {action_type}")
def get_vm_screen_size(self):
    """
    Ask the VM's HTTP server for the size of its screen.

    Sends a POST request to ``<http_server>/screen_size``.

    Returns:
        The decoded JSON body when the server answers with status 200;
        otherwise None (the failing status code is logged).
    """
    endpoint = self.http_server + "/screen_size"
    response = requests.post(endpoint)
    if response.status_code != 200:
        logger.error("Failed to get screen size. Status code: %d", response.status_code)
        return None
    return response.json()
def get_vm_window_size(self, app_class_name: str):
    """
    Ask the VM's HTTP server for the size of an application window.

    Sends a POST request to ``<http_server>/window_size`` with
    ``app_class_name`` as form data.

    Args:
        app_class_name: class name of the application whose window is queried

    Returns:
        The decoded JSON body when the server answers with status 200;
        otherwise None (the failing status code is logged).
    """
    endpoint = self.http_server + "/window_size"
    form_fields = {"app_class_name": app_class_name}
    response = requests.post(endpoint, data=form_fields)
    if response.status_code != 200:
        logger.error("Failed to get window size. Status code: %d", response.status_code)
        return None
    return response.json()
def get_vlc_status(self, host='localhost', port=8080, password='password'):
url = f'http://{host}:{port}/requests/status.xml'
@@ -227,8 +256,7 @@ class PythonController:
response = requests.get(url, auth=('', password))
if response.status_code == 200:
print("File downloaded successfully")
return response.content
else:
print("Failed to get vlc status. Status code:", response.status_code)
logger.error("Failed to get vlc status. Status code: %d", response.status_code)
return None

View File

@@ -199,9 +199,13 @@ class SetupController:
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
def _launch_setup(self, command: List[str]):
def _launch_setup(self, command: Union[str, List[str]]):
if not command:
raise Exception("Empty comman to launch.")
raise Exception("Empty command to launch.")
if isinstance(command, str) and len(command.split()) > 1:
logger.warning("Command should be a list of strings. Now it is a string. Will split it by space.")
command = command.split()
payload = json.dumps({"command": command})
headers = {"Content-Type": "application/json"}

View File

@@ -1,3 +1,3 @@
from .file import get_cloud_file, get_vm_file, get_cache_file
from .misc import get_rule, get_desktop_path, get_wallpaper, get_accessibility_tree
from .vlc import get_vlc_playing_info
from .vlc import get_vlc_playing_info, get_vlc_config

View File

@@ -1,7 +1,7 @@
import os
from typing import Dict
from typing import Optional
import os
import requests
@@ -40,11 +40,13 @@ def get_vm_file(env, config: Dict[str, str]) -> Optional[str]:
file = env.controller.get_file(config["path"])
if file is None:
return None
#raise FileNotFoundError("File not found on VM: {:}".format(config["path"]))
with open(_path, "wb") as f:
f.write(file)
return _path
def get_cache_file(env, config: Dict[str, str]) -> str:
"""
Config:

View File

@@ -0,0 +1,23 @@
from typing import Dict
import os
import requests
def get_string(env, config: Dict[str, str]) -> str:
    """
    Hand back the literal string stored in the getter config.

    Config:
        string (str): the value returned unchanged

    `env` is not used by this getter.
    """
    text = config["string"]
    return text
def get_command_line(env, config: Dict[str, str]) -> str:
    """
    Run a shell command and return what it wrote to standard output.

    Config:
        command (str): command line handed to the shell through os.popen

    Returns:
        str: the complete standard output of the command

    `env` is not used by this getter.

    NOTE(review): the command string is executed by a shell as-is, so it
    must only ever come from trusted task configuration.
    """
    # The context manager closes the pipe (and waits for the child process);
    # previously the pipe object was left open after reading.
    with os.popen(config["command"]) as pipe:
        return pipe.read()

View File

@@ -1,95 +1,19 @@
import logging
from typing import TypeVar
#from typing import Dict, List
import platform
import subprocess
import ctypes
import os
#import pyatspi
#from pyatspi import Accessible, StateType
#from pyatspi import Component, Document
#from pyatspi import Text as ATText
#from pyatspi import Value as ATValue
#from pyatspi import Action as ATAction
#import lxml.etree
#from lxml.etree import _Element
import logging
logger = logging.getLogger("desktopenv.getters.misc")
# The name passed to TypeVar has to match the variable it is assigned to;
# static type checkers reject a mismatch such as R = TypeVar("Rule").
R = TypeVar("R")


def get_rule(env, config: R) -> R:
    """
    Return the rules stored in the getter config, as-is.

    Args:
        env: not used
        config: mapping that holds the rules under the key "rules"

    Returns:
        The object stored at ``config["rules"]``, without copying or
        transforming it.
    """
    return config["rules"]
def get_desktop_path(*args):
    """
    Build the conventional Desktop directory path of the current user.

    Positional arguments are accepted and ignored.

    Returns:
        str: ``C:\\Users\\<user>\\Desktop`` on Windows,
        ``/Users/<user>/Desktop`` on macOS, ``/home/<user>/Desktop`` on Linux.

    Raises:
        Exception: if platform.system() reports any other operating system.
    """
    # Imported locally so the block does not depend on a new file-level import.
    import getpass

    # os.getlogin() needs a controlling terminal and raises OSError when the
    # process runs as a service; getpass.getuser() falls back to environment
    # variables / the password database instead.
    username = getpass.getuser()
    system = platform.system()
    if system == "Windows":
        # "C:" without a separator makes os.path.join produce the
        # drive-relative path "C:Users\..."; the root must be spelled "C:\".
        return os.path.join("C:\\", "Users", username, "Desktop")
    elif system == "Darwin":  # macOS is identified as 'Darwin'
        return os.path.join("/Users", username, "Desktop")
    elif system == "Linux":
        return os.path.join("/home", username, "Desktop")
    else:
        raise Exception("Unsupported operating system")
def get_wallpaper(*args):
    """
    Look up the path of the current desktop wallpaper on the local machine.

    Positional arguments are accepted and ignored.

    Returns:
        The wallpaper path as a string, None when the platform-specific lookup
        fails, or the string "Unsupported OS" when platform.system() is not
        Windows, Darwin or Linux.
    """

    def get_wallpaper_windows():
        # SystemParametersInfoW fills the buffer with the wallpaper path.
        SPI_GETDESKWALLPAPER = 0x73
        MAX_PATH = 260
        buffer = ctypes.create_unicode_buffer(MAX_PATH)
        ctypes.windll.user32.SystemParametersInfoW(SPI_GETDESKWALLPAPER, MAX_PATH, buffer, 0)
        return buffer.value

    def get_wallpaper_macos():
        script = """
        tell application "System Events" to tell every desktop to get picture
        """
        # stderr has to be piped as well; without it communicate() always
        # returns None for the error stream and failures go unnoticed.
        process = subprocess.Popen(['osascript', '-e', script],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()
        if error:
            logger.error("Error: %s", error.decode('utf-8', errors='replace'))
            return None
        return output.strip().decode('utf-8')

    def get_wallpaper_linux():
        try:
            output = subprocess.check_output(["gsettings", "get", "org.gnome.desktop.background", "picture-uri"])
            # gsettings prints a quoted file:// URI; reduce it to a plain path.
            return output.decode('utf-8').strip().replace('file://', '').replace("'", "")
        except Exception as e:
            logger.error("Error: %s", e)
            return None

    os_name = platform.system()
    if os_name == 'Windows':
        return get_wallpaper_windows()
    elif os_name == 'Darwin':
        return get_wallpaper_macos()
    elif os_name == 'Linux':
        return get_wallpaper_linux()
    else:
        return "Unsupported OS"
#def get_accessibility_tree(*args) -> _Element:
#desktop: Accessible = pyatspi.Registry.getDesktop(0)
#desktop_xml: _Element = _create_node(desktop)
#return desktop_xml
def get_accessibility_tree(env, *args) -> str:
    """
    Fetch the accessibility tree through the environment's controller.

    The value returned by ``env.controller.get_accessibility_tree()`` is
    logged at debug level and handed back unchanged. Extra positional
    arguments are ignored.
    """
    tree: str = env.controller.get_accessibility_tree()
    logger.debug("AT@eval: %s", tree)
    return tree
#if __name__ == "__main__":
#import sys
#with open(sys.argv[1], "w") as f:
#f.write( lxml.etree.tostring( get_accessibility_tree()
#, encoding="unicode"
#, pretty_print=True
#)
#)

View File

@@ -1,6 +1,9 @@
import logging
import os
from typing import Dict
logger = logging.getLogger("desktopenv.getters.vlc")
def get_vlc_playing_info(env, config: Dict[str, str]):
"""
@@ -13,7 +16,33 @@ def get_vlc_playing_info(env, config: Dict[str, str]):
password = 'password'
content = env.controller.get_vlc_status(host, port, password)
print("content: ", content)
with open(_path, "wb") as f:
f.write(content)
return _path
def get_vlc_config(env, config: Dict[str, str]):
"""
Reads the VLC configuration file to check setting.
"""
_path = os.path.join(env.cache_dir, config["dest"])
os_type = env.controller.execute_python_command("import platform; print(platform.system())")['output'].strip()
# fixme: depends on how we config and install the vlc in virtual machine, need to be aligned and double-checked
if os_type == "Linux":
config_path = \
env.controller.execute_python_command("import os; print(os.path.expanduser('~/.config/vlc/vlcrc'))")[
'output'].strip()
elif os_type == "Darwin":
config_path = env.controller.execute_python_command(
"import os; print(os.path.expanduser('~/Library/Preferences/org.videolan.vlc/vlcrc'))")['output'].strip()
elif os_type == "Windows":
config_path = env.controller.execute_python_command(
"import os; print(os.path.expanduser('~\\AppData\\Roaming\\vlc\\vlcrc'))")['output'].strip()
content = env.controller.get_file(config_path)
with open(_path, "wb") as f:
f.write(content)

View File

@@ -130,6 +130,12 @@ To enable and use the HTTP interface in VLC Media Player for remote control and
- You will be prompted for a password. Enter the password you set in the Lua HTTP settings.
- Once logged in, you will have access to VLC's HTTP interface for remote control.
#### Packages
```bash
pip install opencv-python-headless Pillow imagehash
```
#### Troubleshooting
- If you cannot access the HTTP interface, check if your firewall or security software is blocking the connection.

View File

@@ -5,5 +5,5 @@ from .docs import compare_font_names, compare_subscript_contains, has_page_numbe
from .docs import is_first_line_centered, check_file_exists, compare_contains_image
from .pdf import check_pdf_pages
from .libreoffice import check_libre_locale
#from .vlc import is_vlc_playing
from .vlc import is_vlc_playing, is_vlc_recordings_folder
from .general import check_csv, check_accessibility_tree, check_list

View File

@@ -1,86 +1,142 @@
import os
import platform
from xml.etree import ElementTree
import pygetwindow as gw
import pyautogui
from typing import Dict
import logging
import os
import subprocess
from typing import Dict
from xml.etree import ElementTree
import acoustid
import cv2
import imagehash
from PIL import Image
logger = logging.getLogger("desktopenv.metrics.vlc")
def get_vlc_config(setting_name):
    """
    Search the local VLC configuration file (vlcrc) for a setting.

    Args:
        setting_name: prefix to look for at the start of a line,
            e.g. 'recordings_folder='

    Returns:
        The first line starting with `setting_name`, stripped of surrounding
        whitespace; None when the config file does not exist for this
        operating system, cannot be read, or holds no matching line.
    """
    # Per-OS location of vlcrc, relative to the user's home directory.
    templates = {
        'Windows': '~\\AppData\\Roaming\\vlc\\vlcrc',
        'Darwin': '~/Library/Preferences/org.videolan.vlc/vlcrc',
        'Linux': '~/.config/vlc/vlcrc',
    }
    template = templates.get(platform.system())
    config_path = os.path.expanduser(template) if template else None

    if not config_path or not os.path.exists(config_path):
        logger.warning("VLC config file not found for this operating system.")
        return None

    try:
        with open(config_path, 'r', encoding="utf-8") as file:
            match = next((line.strip() for line in file if line.startswith(setting_name)), None)
            if match is not None:
                return match
    except IOError as e:
        logger.error(f"Error reading config file: {e}")

    return None
def is_vlc_playing(actual: str, rule: Dict[str, str]) -> float:
def is_vlc_playing(actual_status_path: str, rule: Dict[str, str]) -> float:
"""
Checks if VLC is currently playing a file.
"""
with open(actual, 'rb') as file:
with open(actual_status_path, 'rb') as file:
actual_status = file.read().decode('utf-8')
tree = ElementTree.fromstring(actual_status)
status = tree.find('state').text
if status == 'playing':
file_info = tree.find('information/category[@name="meta"]/info[@name="filename"]').text
print("file_info: ", file_info)
if file_info:
return 1 if file_info.endswith(rule['expected']) else 0
if rule['type'] == 'file_name':
file_info = tree.find('information/category[@name="meta"]/info[@name="filename"]').text
if file_info:
return 1 if file_info.endswith(rule['file_name']) else 0
elif rule['type'] == 'url':
file_info = tree.find('information/category[@name="meta"]/info[@name="url"]').text
if file_info:
return 1 if file_info.endswith(rule['url']) else 0
else:
logger.error(f"Unknown type: {rule['type']}")
return 0
else:
return 0
def is_vlc_fullscreen():
def is_vlc_recordings_folder(actual_config_path: str, rule: Dict[str, str]) -> float:
"""
Checks if the VLC window is in full-screen mode.
Checks if VLC's recording folder is set to the expected value.
"""
with open(actual_config_path, 'rb') as file:
config_file = file.read().decode('utf-8')
expected_recording_file_path = rule['recording_file_path']
When VLC is in full-screen mode, its window size matches the screen size with no borders.
"""
try:
# Get the VLC window; adjust the title as per your VLC window's title
vlc_window = gw.getWindowsWithTitle('VLC media player')[0] # Adjust title if needed
if not vlc_window:
for line in config_file:
# Skip comments and empty lines
if line.startswith('#') or not line.strip():
continue
# Check if the line contains the recording path setting
if 'recorded_files_path' in line:
# Extract the value of the recording path and remove surrounding whitespace
current_path = line.split('=')[-1].strip()
# Compare with the Desktop path
if current_path == expected_recording_file_path:
return True
else:
return False
# The configuration key was not found in the file
return False
# Get screen size
screen_width, screen_height = pyautogui.size()
# Check if VLC window size matches the screen size
return (vlc_window.width == screen_width and vlc_window.height == screen_height)
except IndexError:
# VLC window not found
logger.error("VLC window not found.")
except FileNotFoundError:
logger.error("VLC configuration file not found.")
return False
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
def is_vlc_fullscreen(actual_window_size, screen_size):
    """
    Tell whether a window covers the whole screen.

    Both arguments are mappings with 'width' and 'height' entries.

    Returns:
        True when width and height are equal in both mappings, else False.
    """
    if actual_window_size['width'] != screen_size['width']:
        return False
    if actual_window_size['height'] != screen_size['height']:
        return False
    return True
def compare_videos(video_path1, video_path2, max_frames_to_check=100, threshold=5):
    """
    Compare two videos frame by frame using perceptual hashes.

    Args:
        video_path1: path of the first video file
        video_path2: path of the second video file
        max_frames_to_check: upper bound on the number of frame pairs compared
        threshold: used twice - a frame pair counts as a mismatch when the
            hash difference exceeds it, and the videos are declared different
            once more than `threshold` pairs mismatched

    Returns:
        bool: False when either file cannot be opened or too many frames
        mismatch; when a video runs out of frames, whether both ran out at
        the same time; True when `max_frames_to_check` pairs were compared
        without exceeding the mismatch limit.
    """
    # Open both video files
    cap1 = cv2.VideoCapture(video_path1)
    cap2 = cv2.VideoCapture(video_path2)

    try:
        # Two files that cannot be read at all must not be reported as equal
        # (both read() calls would fail at once and look like "same length").
        if not cap1.isOpened() or not cap2.isOpened():
            return False

        frames_checked = 0
        mismatch_count = 0

        while frames_checked < max_frames_to_check:
            # Read frames from both videos
            ret1, frame1 = cap1.read()
            ret2, frame2 = cap2.read()

            # If a video ends, then check if both ended to confirm they are of the same length
            if not ret1 or not ret2:
                return ret1 == ret2

            # Convert frames to PIL Images (OpenCV delivers BGR)
            frame1 = Image.fromarray(cv2.cvtColor(frame1, cv2.COLOR_BGR2RGB))
            frame2 = Image.fromarray(cv2.cvtColor(frame2, cv2.COLOR_BGR2RGB))

            # Compute the perceptual hash for each frame
            hash1 = imagehash.phash(frame1)
            hash2 = imagehash.phash(frame2)

            frames_checked += 1

            # Count the pair as mismatching when the hashes differ too much
            if hash1 - hash2 > threshold:
                mismatch_count += 1
                # Too many differing frames: the videos are not the same
                if mismatch_count > threshold:
                    return False

        # If we reach here, the content appears to be the same
        return True
    finally:
        # Release the capture handles on every exit path.
        cap1.release()
        cap2.release()
def are_audio_files_similar(mp3_file_path, mp4_file_path):
    """
    Check whether an MP3 file matches the audio track of an MP4 file.

    The MP4's audio is extracted with ffmpeg into a temporary
    ``<mp4 name>_extracted.mp3`` next to the MP4, fingerprinted with
    acoustid, and the temporary file is removed again.

    Args:
        mp3_file_path: path of the MP3 file
        mp4_file_path: path of the MP4 file

    Returns:
        bool: True when the MP3 is at least as long as the extracted audio and
        both fingerprints are equal; False otherwise, including when ffmpeg
        fails to extract the audio.
    """
    # Extract audio fingerprint from MP3 file
    mp3_fingerprint, mp3_duration = acoustid.fingerprint_file(mp3_file_path)

    # Extract the audio stream from the MP4 file
    mp4_audio_path = os.path.splitext(mp4_file_path)[0] + '_extracted.mp3'
    try:
        try:
            subprocess.run(["ffmpeg", "-i", mp4_file_path, "-vn", "-ar", "44100", "-ac", "2", "-ab", "192k", "-f", "mp3",
                            mp4_audio_path], check=True)
        except subprocess.CalledProcessError as e:
            logger.error("An error occurred during audio extraction from MP4: %s", e)
            return False

        # Extract audio fingerprint from the extracted audio
        mp4_fingerprint, mp4_duration = acoustid.fingerprint_file(mp4_audio_path)
    finally:
        # Clean up the temporary file on every path, including a failed
        # extraction or a fingerprinting error.
        if os.path.exists(mp4_audio_path):
            os.remove(mp4_audio_path)

    # Compare fingerprints (rudimentary comparison)
    if mp3_duration >= mp4_duration and mp3_fingerprint == mp4_fingerprint:
        return True

    return False

View File

@@ -0,0 +1,32 @@
def compare_text_file(actual: str, expected: str, **options) -> float:
    """
    Score two text files by exact equality of their contents.

    Args:
        actual (str): path to the result file
        expected (str): path to the gold file
        options: accepted but not used

    Return:
        float: 1.0 when both files hold identical text, otherwise 0.0
    """
    contents = []
    for path in (actual, expected):
        with open(path) as handle:
            contents.append(handle.read())

    actual_text, expected_text = contents
    return 1.0 if actual_text == expected_text else 0.0
def compare_answer(actual: str, expected: str, **options) -> float:
    """
    Score an answer by exact equality with the expected value.

    Returns 1.0 when `actual` equals `expected`, otherwise 0.0. Extra keyword
    options are accepted but not used.
    """
    # TODO: can use text embedding to get non-zero return
    return 1.0 if actual == expected else 0.0
if __name__ == '__main__':
    # Manual smoke check: compares README.md (looked up in the current
    # working directory) with itself and prints the score.
    print(compare_text_file("README.md", "README.md"))

View File

@@ -1,13 +1,9 @@
import ctypes
import os
from pathlib import Path
import platform
import subprocess
from pathlib import Path
from pyxcursor import Xcursor
# import Xlib.display
import pyautogui
# from PIL import ImageGrab, Image
from PIL import Image
import lxml.etree
from lxml.etree import _Element
import pyatspi
@@ -17,12 +13,19 @@ from pyatspi import Text as ATText
from pyatspi import Value as ATValue
from pyatspi import Action as ATAction
import requests
from flask import Flask, request, jsonify, send_file
from typing import List, Dict
from typing import Any
import Xlib
import pyautogui
from PIL import Image
from Xlib import display, X
from pyxcursor import Xcursor
import requests
from flask import Flask, request, jsonify, send_file, abort
from werkzeug.utils import secure_filename
app = Flask(__name__)
pyautogui.PAUSE = 0
@@ -30,6 +33,7 @@ pyautogui.DARWIN_CATCH_UP_TIME = 0
logger = app.logger
@app.route('/setup/execute', methods=['POST'])
@app.route('/execute', methods=['POST'])
def execute_command():
@@ -52,6 +56,7 @@ def execute_command():
'message': str(e)
}), 500
@app.route('/setup/launch', methods=["POST"])
def launch_app():
data = request.json
@@ -61,11 +66,7 @@ def launch_app():
subprocess.Popen(command)
return "{:} launched successfully".format(" ".join(command))
except Exception as e:
return jsonify( { "status": "error"
, "message": str(e)
}
)\
, 500
return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/screenshot', methods=['GET'])
@@ -248,6 +249,164 @@ def get_accessibility_tree():
desktop_xml: _Element = _create_node(desktop)
return jsonify({"AT": lxml.etree.tostring(desktop_xml, encoding="unicode")})
@app.route('/screen_size', methods=['POST'])
def get_screen_size():
    """
    Report the pixel size of the default X screen.

    Returns:
        JSON object with integer fields "width" and "height".
    """
    d = display.Display()
    try:
        screen = d.screen()
        size = {
            "width": screen.width_in_pixels,
            "height": screen.height_in_pixels
        }
    finally:
        # Every Display() opens a new connection to the X server; close it so
        # repeated requests do not pile up open connections.
        d.close()
    return jsonify(size)
@app.route('/window_size', methods=['POST'])
def get_window_size():
    """
    Report the size of the first window whose WM_CLASS matches a class name.

    Form fields:
        app_class_name: class name to look for (compared case-insensitively
            against every entry of each window's WM_CLASS)

    Returns:
        200 with JSON {"width", "height"} for the first matching window,
        400 when app_class_name is missing,
        404 when no window in _NET_CLIENT_LIST matches.
    """
    if 'app_class_name' not in request.form:
        return jsonify({"error": "app_class_name is required"}), 400
    wanted = request.form['app_class_name'].lower()

    d = display.Display()
    try:
        root = d.screen().root
        client_list = root.get_full_property(d.intern_atom('_NET_CLIENT_LIST'), X.AnyPropertyType)
        # get_full_property returns None when the window manager does not
        # publish the property; treat that as "no windows".
        window_ids = client_list.value if client_list is not None else []

        for window_id in window_ids:
            try:
                window = d.create_resource_object('window', window_id)
                wm_class = window.get_wm_class()
                if wm_class is None:
                    continue

                if wanted in [name.lower() for name in wm_class]:
                    geom = window.get_geometry()
                    return jsonify(
                        {
                            "width": geom.width,
                            "height": geom.height
                        }
                    )
            except Xlib.error.XError:  # Ignore windows that give an error
                continue
    finally:
        # Close the per-request X connection on every exit path.
        d.close()

    # A Flask view must not return None (that raises and yields a 500);
    # answer with an explicit "not found" instead.
    return jsonify({"error": "No window found for the given app_class_name"}), 404
@app.route('/desktop_path', methods=['POST'])
def get_desktop_path():
    """
    Report the Desktop directory inside the current user's home directory.

    Returns:
        200 with JSON {"desktop_path": ...} when the OS is Windows, macOS or
        Linux and the directory exists; otherwise 404 with an error message.
    """
    # The home directory comes from pathlib, so it is platform-independent.
    home_directory = str(Path.home())

    # All supported systems use "<home>/Desktop".
    supported_systems = ("Windows", "Darwin", "Linux")
    if platform.system() in supported_systems:
        desktop_path = os.path.join(home_directory, "Desktop")
    else:
        desktop_path = None

    if not (desktop_path and os.path.exists(desktop_path)):
        return jsonify(error="Unsupported operating system or desktop path not found"), 404
    return jsonify(desktop_path=desktop_path)
@app.route('/wallpaper', methods=['POST'])
def get_wallpaper():
    """
    Send the current desktop wallpaper image file to the client.

    Returns:
        The wallpaper file on success. Aborts with 400 on an unsupported OS,
        404 when no wallpaper path could be determined, and 500 when the file
        cannot be served.
    """

    def get_wallpaper_windows():
        # SystemParametersInfoW fills the buffer with the wallpaper path.
        SPI_GETDESKWALLPAPER = 0x73
        MAX_PATH = 260
        buffer = ctypes.create_unicode_buffer(MAX_PATH)
        ctypes.windll.user32.SystemParametersInfoW(SPI_GETDESKWALLPAPER, MAX_PATH, buffer, 0)
        return buffer.value

    def get_wallpaper_macos():
        script = """
        tell application "System Events" to tell every desktop to get picture
        """
        process = subprocess.Popen(['osascript', '-e', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()
        if error:
            app.logger.error("Error: %s", error.decode('utf-8'))
            return None
        return output.strip().decode('utf-8')

    def get_wallpaper_linux():
        try:
            output = subprocess.check_output(
                ["gsettings", "get", "org.gnome.desktop.background", "picture-uri"],
                stderr=subprocess.PIPE
            )
            # gsettings prints a quoted file:// URI; reduce it to a plain path.
            return output.decode('utf-8').strip().replace('file://', '').replace("'", "")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing gsettings binary, which would otherwise
            # escape as an unhandled exception.
            app.logger.error("Error: %s", e)
            return None

    os_name = platform.system()
    wallpaper_path = None
    if os_name == 'Windows':
        wallpaper_path = get_wallpaper_windows()
    elif os_name == 'Darwin':
        wallpaper_path = get_wallpaper_macos()
    elif os_name == 'Linux':
        wallpaper_path = get_wallpaper_linux()
    else:
        app.logger.error(f"Unsupported OS: {os_name}")
        abort(400, description="Unsupported OS")

    if wallpaper_path:
        try:
            # Ensure the filename is secure
            filename = secure_filename(os.path.basename(wallpaper_path))
            try:
                # Flask >= 2.0 names this parameter download_name; the old
                # attachment_filename keyword was removed in Flask 2.2.
                return send_file(wallpaper_path, download_name=filename)
            except TypeError:
                # Older Flask releases only understand the previous keyword.
                return send_file(wallpaper_path, attachment_filename=filename)
        except Exception as e:
            app.logger.error(f"An error occurred while serving the wallpaper file: {e}")
            abort(500, description="Unable to serve the wallpaper file")
    else:
        abort(404, description="Wallpaper file not found")
@app.route('/list_directory', methods=['POST'])
def get_directory_tree():
    """
    Return the recursive listing of a directory as nested JSON.

    JSON body:
        path: directory to list

    Returns:
        200 with {"directory_tree": ...}; 400 when the body is not a JSON
        object, lacks 'path', or 'path' is not a directory.
    """

    def _list_dir_contents(directory):
        """
        List the contents of a directory recursively, building a tree structure.

        :param directory: The path of the directory to inspect.
        :return: A nested dictionary with the contents of the directory, or
            {'error': message} when the directory cannot be listed.
        """
        tree = {'type': 'directory', 'name': os.path.basename(directory), 'children': []}
        try:
            # List all files and directories in the current directory
            for entry in os.listdir(directory):
                full_path = os.path.join(directory, entry)
                # If entry is a directory, recurse into it
                if os.path.isdir(full_path):
                    tree['children'].append(_list_dir_contents(full_path))
                else:
                    tree['children'].append({'type': 'file', 'name': entry})
        except OSError as e:
            # If the directory cannot be accessed, return the exception message
            tree = {'error': str(e)}
        return tree

    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so such requests get the same 400 as a missing parameter.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'path' not in data:
        return jsonify(error="Missing 'path' parameter"), 400

    start_path = data['path']
    # Ensure the provided path is a directory
    if not os.path.isdir(start_path):
        return jsonify(error="The provided path is not a directory"), 400

    # Generate the directory tree starting from the provided path
    directory_tree = _list_dir_contents(start_path)
    return jsonify(directory_tree=directory_tree)
@app.route('/file', methods=['POST'])
def get_file():
# Retrieve filename from the POST request
@@ -263,6 +422,7 @@ def get_file():
# If the file is not found, return a 404 error
return jsonify({"error": "File not found"}), 404
@app.route("/setup/upload", methods=["POST"])
def upload_file():
# Retrieve filename from the POST request
@@ -274,6 +434,7 @@ def upload_file():
else:
return jsonify({"error": "file_path and file_data are required"}), 400
@app.route('/platform', methods=['GET'])
def get_platform():
    """Return the operating-system name reported by platform.system()."""
    system_name = platform.system()
    return system_name