Merge branch 'main' into zdy

This commit is contained in:
David Chang
2024-01-15 12:12:05 +08:00
46 changed files with 1585 additions and 457 deletions

View File

@@ -197,8 +197,10 @@ class PythonController:
if "text" not in parameters:
raise Exception(f"Unknown parameters: {parameters}")
# deal with special ' and \ characters
text = parameters["text"].replace("\\", "\\\\").replace("'", "\\'")
self.execute_python_command(f"pyautogui.typewrite('{text}')")
# text = parameters["text"].replace("\\", "\\\\").replace("'", "\\'")
# self.execute_python_command(f"pyautogui.typewrite('{text}')")
text = parameters["text"]
self.execute_python_command("pyautogui.typewrite({:})".format(repr(text)))
elif action_type == "PRESS":
if "key" not in parameters:
@@ -237,6 +239,9 @@ class PythonController:
keys_para_rep = "', '".join(keys)
self.execute_python_command(f"pyautogui.hotkey('{keys_para_rep}')")
elif action_type in ['WAIT', 'FAIL', 'DONE']:
pass
else:
raise Exception(f"Unknown action type: {action_type}")
@@ -280,3 +285,31 @@ class PythonController:
else:
logger.error("Failed to get wallpaper. Status code: %d", response.status_code)
return None
def get_vm_desktop_path(self):
    """
    Ask the VM's HTTP server for the path of its desktop folder.

    Sends a POST request to ``<http_server>/desktop_path``.

    Returns:
        The ``"desktop_path"`` field of the JSON reply when the server
        answers with status 200, otherwise ``None`` (the failure is logged).
    """
    reply = requests.post(self.http_server + "/desktop_path")

    # Bail out early on any non-200 answer; the caller only gets None.
    if reply.status_code != 200:
        logger.error("Failed to get desktop path. Status code: %d", reply.status_code)
        return None

    logger.info("Desktop path downloaded successfully")
    return reply.json()["desktop_path"]
def get_vm_directory_tree(self, path):
    """
    Ask the VM's HTTP server for the directory tree rooted at ``path``.

    Sends a JSON POST request (``{"path": path}``) to
    ``<http_server>/list_directory``.

    Args:
        path: directory on the VM whose tree is requested.

    Returns:
        The ``"directory_tree"`` field of the JSON reply when the server
        answers with status 200, otherwise ``None`` (the failure is logged).
    """
    reply = requests.post(
        self.http_server + "/list_directory",
        headers={'Content-Type': 'application/json'},
        data=json.dumps({"path": path}),
    )

    # Bail out early on any non-200 answer; the caller only gets None.
    if reply.status_code != 200:
        logger.error("Failed to get directory tree. Status code: %d", reply.status_code)
        return None

    logger.info("Directory tree downloaded successfully")
    return reply.json()["directory_tree"]

View File

@@ -1,18 +1,18 @@
import json
import time
import logging
import os.path
import time
import traceback
import uuid
from typing import Dict, List
from typing import Any, Union, Optional
from typing import Dict, List
import requests
from playwright.sync_api import sync_playwright
from requests_toolbelt.multipart.encoder import MultipartEncoder
from desktop_env.evaluators.metrics.utils import compare_urls
import logging
logger = logging.getLogger("desktopenv.setup")
@@ -20,6 +20,7 @@ class SetupController:
def __init__(self, vm_ip: str, cache_dir: str):
self.vm_ip: str = vm_ip
self.http_server: str = f"http://{vm_ip}:5000"
self.http_server_setup_root: str = f"http://{vm_ip}:5000/setup"
self.cache_dir: str = cache_dir
def reset_cache_dir(self, cache_dir: str):
@@ -57,31 +58,31 @@ class SetupController:
# can add other setup steps
# ZDY_COMMENT: merged with launch
#def _command_setup(self, command: str):
#"""
#Directly send a command into the virtual machine os for setting up.
#"""
#payload = json.dumps({"command": command})
#headers = {
#'Content-Type': 'application/json'
#}
#timeout = 5
#timout_whitelist = ["vlc"]
#
#try:
#
#response = requests.post(self.http_server + "/execute", headers=headers, data=payload, timeout=timeout)
#if response.status_code == 200:
#print("Command executed successfully:", response.text)
#else:
#print("Failed to execute command. Status code:", response.status_code)
#except requests.exceptions.Timeout as e:
#if command in timout_whitelist:
#print("Command executed successfully:", command)
#else:
#print("An error occurred while trying to execute the command:", e)
#except requests.exceptions.RequestException as e:
#print("An error occurred while trying to execute the command:", e)
# def _command_setup(self, command: str):
# """
# Directly send a command into the virtual machine os for setting up.
# """
# payload = json.dumps({"command": command})
# headers = {
# 'Content-Type': 'application/json'
# }
# timeout = 5
# timout_whitelist = ["vlc"]
#
# try:
#
# response = requests.post(self.http_server + "/execute", headers=headers, data=payload, timeout=timeout)
# if response.status_code == 200:
# print("Command executed successfully:", response.text)
# else:
# print("Failed to execute command. Status code:", response.status_code)
# except requests.exceptions.Timeout as e:
# if command in timout_whitelist:
# print("Command executed successfully:", command)
# else:
# print("An error occurred while trying to execute the command:", e)
# except requests.exceptions.RequestException as e:
# print("An error occurred while trying to execute the command:", e)
def _download_setup(self, files: List[Dict[str, str]]):
"""
@@ -224,9 +225,14 @@ class SetupController:
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
def _execute_setup( self, command: List[str]
, stdout: str = "", stderr: str = ""
, shell: bool = False, until: Optional[Dict[str, Any]] = None):
def _execute_setup(
self,
command: List[str],
stdout: str = "",
stderr: str = "",
shell: bool = False,
until: Optional[Dict[str, Any]] = None
):
if not command:
raise Exception("Empty comman to launch.")
@@ -248,10 +254,10 @@ class SetupController:
if stderr:
with open(os.path.join(self.cache_dir, stderr), "w") as f:
f.write(results["error"])
logger.info( "Command executed successfully: %s -> %s"
, " ".join(command)
, response.text
)
logger.info("Command executed successfully: %s -> %s"
, " ".join(command)
, response.text
)
else:
logger.error("Failed to launch application. Status code: %s", response.text)
results = None
@@ -263,13 +269,13 @@ class SetupController:
results = None
nb_failings += 1
if len(until)==0:
if len(until) == 0:
terminates = True
elif results is not None:
terminates = "returncode" in until and results["returncode"]==until["returncode"]\
or "stdout" in until and until["stdout"] in results["output"]\
or "stderr" in until and until["stderr"] in results["error"]
terminates = terminates or nb_failings>=5
terminates = "returncode" in until and results["returncode"] == until["returncode"] \
or "stdout" in until and until["stdout"] in results["output"] \
or "stderr" in until and until["stderr"] in results["error"]
terminates = terminates or nb_failings >= 5
if not terminates:
time.sleep(0.3)
@@ -292,6 +298,25 @@ class SetupController:
# TODO
raise NotImplementedError()
def _activate_window_setup(self, window_name: str):
    """
    Ask the VM's setup server to activate (focus) a window.

    Sends a JSON POST request (``{"window_name": window_name}``) to
    ``<http_server>/setup/activate_window``. The outcome is only logged;
    nothing is returned.

    Args:
        window_name: name of the window to activate; must be non-empty.

    Raises:
        Exception: if ``window_name`` is empty or ``None``.
    """
    if not window_name:
        raise Exception(f"Setup Activate Window - Invalid window name ({window_name}).")

    payload = json.dumps({"window_name": window_name})
    headers = {
        'Content-Type': 'application/json'
    }

    # send request to server to activate the window
    try:
        response = requests.post(self.http_server + "/setup" + "/activate_window", headers=headers, data=payload)
        if response.status_code == 200:
            logger.info("Command executed successfully: %s", response.text)
        else:
            # Pass the window name as a lazy logging argument: embedding it in
            # the format string would break on names that contain '%'.
            logger.error(
                "Failed to activate window %s. Status code: %d. Response: %s",
                window_name, response.status_code, response.text
            )
    except requests.exceptions.RequestException as e:
        # Network-level failures are logged but deliberately not re-raised.
        logger.error("An error occurred while trying to send the request: %s", e)
# Chrome setup
def _chrome_open_tabs_setup(self, urls_to_open: List[str]):
host = self.vm_ip

View File

@@ -186,5 +186,18 @@ ACTION_SPACE = [
"optional": False,
}
}
},
############################################################################################################
{
"action_type": "WAIT",
"note": "wait until the next action",
},
{
"action_type": "FAIL",
"note": "decide the task can not be performed",
},
{
"action_type": "DONE",
"note": "decide the task is done",
}
]

View File

@@ -1,28 +1,30 @@
from __future__ import annotations
import logging
import os
import subprocess
import tempfile
import time
from typing import Callable, Any, Optional
# import uuid
# import platform
from typing import List, Dict
from typing import Callable, Any, Optional
import tempfile
import gymnasium as gym
# import requests
from desktop_env.controllers.python import PythonController
from desktop_env.controllers.setup import SetupController
# from desktop_env.evaluators import eval_funcs
from desktop_env.evaluators import metrics, getters
import logging
# import requests
logger = logging.getLogger("desktopenv.env")
Metric = Callable[[Any, Any], float]
Getter = Callable[[gym.Env, Dict[str, Any]], Any]
def _execute_command(command: List[str]) -> None:
if command[:4] == ["vmrun", "-T", "ws", "start"]:
p = subprocess.Popen(command)
@@ -84,8 +86,8 @@ class DesktopEnv(gym.Env):
self.setup_controller = SetupController(vm_ip=self.vm_ip, cache_dir=self.cache_dir)
# Meta info of the VM, move to the reset() function
self.vm_platform: str = "" # self.controller.get_vm_platform()
self.vm_screen_size = None # self.controller.get_vm_screen_size()
self.vm_platform: str = "" # self.controller.get_vm_platform()
self.vm_screen_size = None # self.controller.get_vm_screen_size()
# mode: human or machine
assert action_space in ["computer_13", "pyautogui"]
@@ -164,7 +166,7 @@ class DesktopEnv(gym.Env):
self.evaluator["expected"]["type"])) if "expected" in self.evaluator else None
self.metric_options: Dict[str, Any] = self.evaluator.get("options", {})
def reset(self, task_config: Optional[Dict[str, Any]] = None, seed=None, options=None):
def reset(self, task_config: Optional[Dict[str, Any]] = None, seed=None, options=None) -> Dict[str, Any]:
logger.info("Resetting environment...")
logger.info("Switching task...")
@@ -202,11 +204,27 @@ class DesktopEnv(gym.Env):
time.sleep(5)
logger.info("Environment setup complete.")
observation = self._get_obs()
observation = {"screenshot": self._get_obs()}
return observation
def step(self, action, pause=0.5):
self._step_no += 1
self.action_history.append(action)
reward = 0 # todo: Define reward calculation for each example
done = False # todo: Define episode termination condition for each example
info = {}
# handle the special actions
if action in ['WAIT', 'FAIL', 'DONE']:
if action == 'WAIT':
time.sleep(pause)
elif action == 'FAIL':
done = True
info = {"fail": True}
elif action == 'DONE':
done = True
info = {"done": True}
# fixme: add reminding logic here, decide if the action is valid for the current action_space
if self.action_space == "computer_13":
@@ -215,18 +233,14 @@ class DesktopEnv(gym.Env):
elif self.action_space == "pyautogui":
# the set of all possible python commands insides `pyautogui`
self.controller.execute_python_command(action)
self.action_history.append(action)
# todo: maybe for the better here we need to add a logic to wait until the rendering is done
time.sleep(pause)
observation = {
"screenshot": self._get_obs(),
"accessibility_tree": self.controller.get_accessibility_tree(),
"terminal": self.controller.get_terminal_output(),
"instruction": self.instruction
}
reward = 0 # todo: Define reward calculation for each example
done = False # todo: Define episode termination condition for each example
info = {}
return observation, reward, done, info
def evaluate(self):

View File

@@ -1,5 +1,9 @@
from .chrome import get_default_search_engine, get_cookie_data, get_bookmarks, get_open_tabs_info, get_pdf_from_url, \
get_shortcuts_on_desktop
from .file import get_cloud_file, get_vm_file, get_cache_file
from .general import get_vm_command_line
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
from .misc import get_rule, get_accessibility_tree
from .replay import get_replay
from .vlc import get_vlc_playing_info, get_vlc_config
from .chrome import get_default_search_engine, get_bookmarks, get_open_tabs_info
from .vscode import get_vscode_config

View File

@@ -46,6 +46,10 @@ def get_default_search_engine(env, config: Dict[str, str]):
def get_cookie_data(env, config: Dict[str, str]):
"""
Get the cookies from the Chrome browser.
Assume the cookies are stored in the default location, not encrypted and not large in size.
"""
os_type = env.vm_platform
if os_type == 'Windows':
chrome_cookie_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
@@ -61,21 +65,23 @@ def get_cookie_data(env, config: Dict[str, str]):
else:
raise Exception('Unsupported operating system')
# todo: add a new controller function to connect the cookie database
#############
try:
conn = sqlite3.connect(chrome_cookie_file_path)
content = env.controller.get_file(chrome_cookie_file_path)
_path = os.path.join(env.cache_dir, config["dest"])
with open(_path, "wb") as f:
f.write(content)
conn = sqlite3.connect(_path)
cursor = conn.cursor()
# Query to check for OpenAI cookies
cursor.execute("SELECT * FROM cookies")
cookies = cursor.fetchall()
return cookies
except Exception as e:
logger.error(f"Error: {e}")
return None
#############
def get_bookmarks(env, config: Dict[str, str]):
@@ -94,17 +100,12 @@ def get_bookmarks(env, config: Dict[str, str]):
else:
raise Exception('Unsupported operating system')
try:
content = env.controller.get_file(preference_file_path)
# make content json variable
data = json.load(content)
bookmarks = data.get('roots', {})
return bookmarks
except Exception as e:
logger.error(f"Error: {e}")
return None
content = env.controller.get_file(preference_file_path)
if not content:
return []
data = json.loads(content)
bookmarks = data.get('roots', {})
return bookmarks
# todo: move this to the main.py
@@ -190,3 +191,83 @@ def get_active_tab_info(env, config: Dict[str, str]):
browser.close()
return active_tab_info
def get_pdf_from_url(env, config: Dict[str, str]) -> str:
    """
    Render a web page to a PDF file using the Chrome instance on the VM.

    Config:
        path (str): URL of the page to open
        dest (str): file name of the PDF, created inside ``env.cache_dir``

    Returns:
        The local path of the written PDF file.
    """
    page_url = config["path"]
    pdf_path = os.path.join(env.cache_dir, config["dest"])

    vm_host = env.vm_ip
    cdp_port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    cdp_endpoint = f"http://{vm_host}:{cdp_port}"

    with sync_playwright() as playwright:
        # Attach to the already running browser over its remote debugging port.
        remote_browser = playwright.chromium.connect_over_cdp(cdp_endpoint)
        tab = remote_browser.new_page()
        tab.goto(page_url)
        tab.pdf(path=pdf_path)
        remote_browser.close()

    return pdf_path
# fixme: needs to be changed (maybe through post-processing) since it's not working
def get_chrome_saved_address(env, config: Dict[str, str]):
    """
    Fetch the HTML of Chrome's address-autofill settings page on the VM.

    Connects to the Chrome instance on ``env.vm_ip`` through its remote
    debugging port, opens ``chrome://settings/addresses`` in a new page and
    returns that page's HTML content.

    Args:
        env: environment object; only ``env.vm_ip`` is read.
        config: unused by this getter.

    Returns:
        The HTML content of the settings page as returned by ``page.content()``.
    """
    # Use the VM address of the running environment (as the other getters in
    # this file do) instead of a hard-coded debugging IP.
    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        # connect to remote Chrome instance
        browser = p.chromium.connect_over_cdp(remote_debugging_url)
        page = browser.new_page()

        # Navigate to Chrome's settings page for autofill
        page.goto("chrome://settings/addresses")

        # Get the HTML content of the page
        content = page.content()

        browser.close()

    return content
def get_shortcuts_on_desktop(env, config: Dict[str, str]):
    """
    Collect the shortcut files found on the VM's desktop.

    The shortcut file extension is chosen from ``env.vm_platform``; every
    desktop entry whose name ends with that extension is downloaded from the
    VM and decoded as UTF-8.

    Args:
        env: environment object; ``env.vm_platform`` and ``env.controller``
            are used.
        config: unused by this getter.

    Returns:
        A dict mapping shortcut file name -> file content (str). An empty
        dict is returned when the platform is unsupported or the desktop
        cannot be listed, so callers can always treat the result as a dict.
    """
    # Shortcut file extension per platform:
    #   Windows shortcuts are typically .url or .lnk files,
    #   macOS's shortcuts are .webloc files,
    #   Linux (Ubuntu, etc.) shortcuts are typically .desktop files.
    extension_by_os = {
        'Windows': '.lnk',
        'Darwin': '.webloc',
        'Linux': '.desktop',
    }

    os_name = env.vm_platform
    shortcut_extension = extension_by_os.get(os_name)
    if shortcut_extension is None:
        logger.error(f"Unsupported operating system: {os_name}")
        return {}

    # Get the path to the desktop folder; the controller returns None on failure.
    desktop_path = env.controller.get_vm_desktop_path()
    if desktop_path is None:
        logger.error("Could not determine the desktop path of the VM")
        return {}

    desktop_directory_tree = env.controller.get_vm_directory_tree(desktop_path)
    if not desktop_directory_tree:
        logger.error("Could not list the desktop directory: %s", desktop_path)
        return {}

    # presumably the tree is {"children": [{"name": ...}, ...]} -- verify against the server
    shortcut_names = [entry['name'] for entry in desktop_directory_tree.get('children') or []
                      if entry['name'].endswith(shortcut_extension)]

    short_cuts = {}
    for shortcut_name in shortcut_names:
        # repr() quotes the file name safely; a name containing a quote would
        # otherwise produce invalid Python source on the VM side.
        command = (
            "import os; print(os.path.join(os.path.expanduser('~'), 'Desktop', "
            f"{shortcut_name!r}))"
        )
        result = env.controller.execute_python_command(command)
        if not result:
            logger.error("Could not resolve the path of shortcut: %s", shortcut_name)
            continue

        content = env.controller.get_file(result['output'].strip())
        if content is None:
            logger.error("Could not download shortcut: %s", shortcut_name)
            continue

        short_cuts[shortcut_name] = content.decode('utf-8')

    return short_cuts

View File

@@ -40,7 +40,7 @@ def get_vm_file(env, config: Dict[str, str]) -> Optional[str]:
file = env.controller.get_file(config["path"])
if file is None:
return None
#raise FileNotFoundError("File not found on VM: {:}".format(config["path"]))
# raise FileNotFoundError("File not found on VM: {:}".format(config["path"]))
with open(_path, "wb") as f:
f.write(file)

View File

@@ -1,23 +1,19 @@
import logging
from typing import Dict
import os
import requests
logger = logging.getLogger("desktopenv.getters.general")
def get_string(env, config: Dict[str, str]) -> str:
"""
Config:
string (str)
"""
return config["string"]
def get_vm_command_line(env, config: Dict[str, str]):
vm_ip = env.vm_ip
port = 5000
command = config["command"]
def get_command_line(env, config: Dict[str, str]) -> str:
"""
Config:
string (str)
"""
f = os.popen(config["command"])
return f.read()
response = requests.post(f"http://{vm_ip}:{port}/execute", json={"command": command})
if response.status_code == 200:
return response.json()["output"]
else:
logger.error("Failed to get vm command line. Status code: %d", response.status_code)
return None

View File

@@ -1,6 +1,5 @@
import logging
from typing import TypeVar
#from typing import Dict, List
logger = logging.getLogger("desktopenv.getters.misc")
@@ -13,6 +12,7 @@ def get_rule(env, config: R) -> R:
"""
return config["rules"]
def get_accessibility_tree(env, *args) -> str:
accessibility_tree: str = env.controller.get_accessibility_tree()
logger.debug("AT@eval: %s", accessibility_tree)

View File

@@ -0,0 +1,20 @@
from typing import List, Dict, Any
def get_replay(env, trajectory: List[Dict[str, Any]]) -> None:
    """
    Replay a list of keyboard actions on the VM through pyautogui.

    Each action is a dict with a ``"type"`` (``"hotkey"``, ``"typewrite"`` or
    ``"press"``) and a ``"param"`` (list of key names for ``hotkey``, a string
    otherwise). Every action is turned into one pyautogui call and sent to
    ``env.controller.execute_python_command`` in order.

    Raises:
        ValueError: if an action has an unknown ``"type"``.
    """
    # fixme: need to be combined with the accessibility tree to activate the selection of the target window

    def parse(action):
        """Translate one action dict into a line of pyautogui source code."""
        action_type = action["type"]
        # repr() is used for every argument so that quotes and backslashes in
        # keys or typed text cannot break the generated source.
        if action_type == "hotkey":
            keys = ", ".join(repr(key) for key in action["param"])
            return f"pyautogui.hotkey({keys})"

        if action_type == "typewrite":
            return f"pyautogui.typewrite({action['param']!r})"

        if action_type == "press":
            return f"pyautogui.press({action['param']!r})"

        # Fail loudly rather than sending None to the controller.
        raise ValueError(f"Unknown replay action type: {action_type}")

    for action in trajectory:
        env.controller.execute_python_command(parse(action))

View File

@@ -0,0 +1,34 @@
import logging
from typing import Any, Dict
from .file import get_vm_file
from .replay import get_replay
logger = logging.getLogger("desktopenv.getters.vscode")
def get_vscode_config(env, config: Dict[str, Any]) -> str:
    """
    Run a command through the VS Code command palette, then fetch a file.

    Replays "open command palette -> type the command -> Enter" on the VM
    and afterwards downloads ``config["path"]`` via ``get_vm_file``.

    Config:
        vscode_extension_command (str): text typed into the command palette
        path (str): file on the VM to download afterwards
        dest (str): destination file name passed on to ``get_vm_file``

    Returns:
        Whatever ``get_vm_file`` returns for the requested file.
    """
    os_type = env.vm_platform
    vscode_extension_command = config["vscode_extension_command"]

    # fixme: depends on how we config and install the vscode in virtual machine, need to be aligned and double-checked

    # The command palette uses the Command key on macOS and Ctrl elsewhere.
    # Other code in this project compares vm_platform against "Darwin", so
    # accept it alongside the legacy "MacOS" spelling.
    modifier = "command" if os_type in ("Darwin", "MacOS") else "ctrl"

    trajectory = [
        {"type": "hotkey", "param": [modifier, "shift", "p"]},
        {"type": "typewrite", "param": vscode_extension_command},
        {"type": "press", "param": "enter"}
    ]

    get_replay(env, trajectory)

    return get_vm_file(env, {
        "path": config["path"],
        "dest": config["dest"]
    })

View File

@@ -1,4 +1,4 @@
from .chrome import is_expected_tabs, is_expected_bookmarks
from .chrome import is_expected_tabs, is_expected_bookmarks, compare_pdfs, is_cookie_deleted, is_shortcut_on_desktop
from .docs import compare_font_names, compare_subscript_contains, has_page_numbers_in_footers
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
compare_insert_equation
@@ -13,4 +13,5 @@ from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, co
from .gimp import increase_saturation, decrease_brightness, check_file_exists, compare_triangle_positions
from .general import check_csv, check_accessibility_tree, check_list, run_sqlite3
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter
from .vscode import compare_text_file, compare_config, compare_answer, is_extension_installed
from .impress import check_slide_numbers_color, compare_pptx_files, check_for_two_lines

View File

@@ -1,5 +1,9 @@
import logging
from typing import Any, Dict, List
import fitz # PyMuPDF
import rapidfuzz.fuzz as fuzz
from desktop_env.evaluators.metrics.utils import are_lists_equal, compare_urls
logger = logging.getLogger("desktopenv.metrics.chrome")
@@ -22,18 +26,72 @@ def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> f
return 0
def is_expected_bookmarks(bookmarks: List[Dict[str, Any]], rule: Dict[str, Any]) -> float:
def is_expected_bookmarks(bookmarks: List[str], rule: Dict[str, Any]) -> float:
"""
Checks if the expected bookmarks are in Chrome.
"""
# todo
match_type = rule['type']
if match_type == "url":
expected_urls = rule['urls']
actual_urls = [bookmark['url'] for bookmark in bookmarks]
return 1 if are_lists_equal(expected_urls, actual_urls, compare_urls) else 0
if not bookmarks:
return 0.
elif rule['type'] == "bookmark_bar_folders_names":
bookmark_bar_folders_names = [bookmark['name'] for bookmark in bookmarks['bookmark_bar']['children'] if
bookmark['type'] == 'folder']
return 1. if set(bookmark_bar_folders_names) == set(rule['names']) else 0.
elif rule['type'] == "bookmark_bar_websites_urls":
bookmark_bar_websites_urls = [bookmark['url'] for bookmark in bookmarks['bookmark_bar']['children'] if
bookmark['type'] == 'url']
return 1. if set(bookmark_bar_websites_urls) == set(rule['urls']) else 0.
else:
logger.error(f"Unknown type: {match_type}")
return 0
raise TypeError(f"{rule['type']} not support yet!")
def compare_pdfs(pdf1_path, pdf2_path):
    """
    Compare two PDF files by the similarity of their extracted text.

    Args:
        pdf1_path: path of the first PDF file.
        pdf2_path: path of the second PDF file.

    Returns:
        A float between 0 and 1: the fuzzy-match ratio of the two texts
        divided by 100. Returns 0. when either path is missing (``None`` or
        empty), since a file that could not be fetched cannot match.
    """
    # A getter hands over None when the file could not be obtained; never let
    # two missing files be compared against each other.
    if not pdf1_path or not pdf2_path:
        return 0.

    def extract_text_from_pdf(pdf_path):
        """Extract text from each page of the PDF."""
        with fitz.open(pdf_path) as pdf:
            return "".join(page.get_text() for page in pdf).strip()

    text1 = extract_text_from_pdf(pdf1_path)
    text2 = extract_text_from_pdf(pdf2_path)

    return fuzz.ratio(text1, text2) / 100
def is_cookie_deleted(cookie_data, rule):
    """
    Check that no cookie of the given domains is left.

    Args:
        cookie_data: iterable of cookie rows; the element at index 1 of each
            row is compared against the domains
            (presumably the host column of Chrome's cookies table -- verify
            against the getter). ``None`` means the cookies could not be read.
        rule: dict with ``"type"`` (only ``"domains"`` is supported) and
            ``"domains"``, the list of domains that must have no cookie.

    Returns:
        1. if no cookie matches any of the domains, 0. if at least one does
        or if ``cookie_data`` is ``None``.

    Raises:
        TypeError: if ``rule["type"]`` is not ``"domains"``.
    """
    if rule['type'] != 'domains':
        raise TypeError(f"{rule['type']} not support yet!")

    # Without cookie data the deletion cannot be confirmed.
    if cookie_data is None:
        return 0.

    cookies_domains = [cookie[1] for cookie in cookie_data]
    any_left = any(
        compare_urls(domain, cookies_domain)
        for domain in rule['domains']
        for cookies_domain in cookies_domains
    )
    return 0. if any_left else 1.
def is_shortcut_on_desktop(shortcuts: Dict[str, str], rule):
    """
    Check if the shortcut is on the desktop.

    Args:
        shortcuts: mapping of shortcut file name -> file content. An empty
            or missing collection counts as "no shortcut found".
        rule: dict with ``"type"`` (only ``"name"`` is supported) and
            ``"name"``, the expected shortcut name.

    Returns:
        1. if any shortcut file contains the line ``Name=<rule["name"]>``,
        otherwise 0.

    Raises:
        TypeError: if ``rule["type"]`` is anything other than ``"name"``
            (``"url"`` and ``"id"`` are not supported yet).
    """
    # fixme: if the name of the website changed in the future, this will not work; can be replaced with url
    if rule['type'] != 'name':
        raise TypeError(f"{rule['type']} not support yet!")

    if not shortcuts:
        return 0.

    # Compare whole lines: this tolerates CRLF endings and a final line
    # without a newline, and keeps keys such as "GenericName=" from matching.
    expected_line = "Name=" + rule['name']
    for shortcut_content in shortcuts.values():
        if expected_line in shortcut_content.splitlines():
            return 1.
    return 0.

View File

@@ -1,12 +1,14 @@
import xml.etree.ElementTree as ET
import logging
import os
import xml.etree.ElementTree as ET
from typing import List, Dict, Any
from docx import Document
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import logging
logger = logging.getLogger("desktopenv.metric.docs")
def find_default_font(config_file_path, rules):
"""Find the default font in LibreOffice Writer."""
default_font = None

View File

@@ -1,4 +1,75 @@
from pptx import Presentation
import os
def is_red_color(color):
    """
    Tell whether a color object is pure red.

    Args:
        color: object exposing an ``rgb`` attribute, or ``None``.

    Returns:
        True if ``color.rgb`` equals ``(255, 0, 0)``; False otherwise,
        including when ``color`` is ``None`` or has no usable ``rgb``.
    """
    if color is None:
        return False
    # getattr with a default also covers color objects whose ``rgb`` property
    # raises AttributeError
    # (presumably python-pptx does this for non-RGB colors -- TODO confirm).
    return getattr(color, "rgb", None) == (255, 0, 0)
def get_master_placeholder_color(prs):
    """
    Find the font color of the slide-number placeholder in the slide masters.

    Walks every placeholder of every slide master and picks the first one
    that has a text frame, whose text is ``"<number>"`` and whose text frame
    has at least one paragraph.

    Returns:
        The ``font.color`` of that placeholder's first paragraph, or ``None``
        if no such placeholder exists.
    """
    for master in prs.slide_masters:
        for placeholder in master.placeholders:
            is_number_placeholder = placeholder.has_text_frame and placeholder.text == "<number>"
            if not is_number_placeholder:
                continue

            paragraphs = placeholder.text_frame.paragraphs
            if paragraphs:
                return paragraphs[0].font.color
    return None
def check_slide_numbers_color(pptx_file_path):
    """
    Check whether the slide numbers of a presentation are red.

    Looks for the first shape whose text consists only of digits (taken to
    be a slide number) and then tests the font color of the slide-number
    placeholder of the slide master.

    Args:
        pptx_file_path: path of the .pptx file to inspect.

    Returns:
        1 if a numeric shape exists and the master placeholder color is red,
        otherwise 0 (including when no numeric shape is found at all).
    """
    presentation = Presentation(pptx_file_path)

    for slide in presentation.slides:
        for shape in slide.shapes:
            # check if the shape is a text box holding only digits
            if hasattr(shape, "text") and shape.text.isdigit():
                # The color itself is read from the "<number>" placeholder in
                # the slide master, not from the shape on the slide.
                font_color = get_master_placeholder_color(presentation)
                return 1 if font_color is not None and is_red_color(font_color) else 0

    # No slide number found anywhere: report failure instead of None.
    return 0
def compare_pptx_files(file1_path, file2_path):
    """
    Compare the text content of two presentations.

    The files are considered equal when they have the same number of slides
    and, slide by slide, every pair of shapes that both carry text has the
    same text. Shapes are paired positionally with ``zip``, so extra shapes
    on one side are not compared.

    Returns:
        1 if the presentations match, 0 otherwise.
    """
    first_deck = Presentation(file1_path)
    second_deck = Presentation(file2_path)

    # compare the number of slides
    if len(first_deck.slides) != len(second_deck.slides):
        return 0

    # compare the content of each slide
    for first_slide, second_slide in zip(first_deck.slides, second_deck.slides):
        for first_shape, second_shape in zip(first_slide.shapes, second_slide.shapes):
            both_have_text = hasattr(first_shape, "text") and hasattr(second_shape, "text")
            if both_have_text and first_shape.text != second_shape.text:
                return 0

    return 1
def has_two_lines_on_page(slide):
    """
    Tell whether a slide holds at least two shapes of shape type 1.

    Returns:
        True as soon as two such shapes have been seen, False otherwise.
    """
    # 1 denotes the Line shape (per the original author's comment).
    # NOTE(review): python-pptx's MSO_SHAPE_TYPE lists AUTO_SHAPE as 1 and
    # LINE as 9 -- confirm which value the task files really produce.
    still_needed = 2
    for shape in slide.shapes:
        if shape.shape_type != 1:
            continue
        still_needed -= 1
        if still_needed == 0:
            return True
    return False
def check_for_two_lines(prs):
    """
    Check whether any slide of a presentation has two lines on it.

    Args:
        prs: path of the .pptx file (it is opened with ``Presentation``).

    Returns:
        1 if ``has_two_lines_on_page`` is true for at least one slide,
        otherwise 0.
    """
    deck = Presentation(prs)
    found = any(has_two_lines_on_page(slide) for slide in deck.slides)
    return 1 if found else 0
def check_file_exists(directory, filename):
    """
    Check that ``filename`` exists as a regular file inside ``directory``.

    Returns:
        1 if the joined path is an existing file, otherwise 0.
    """
    if os.path.isfile(os.path.join(directory, filename)):
        return 1
    return 0
if __name__ == "__main__":
path1 = "../../任务数据/LibreOffice Impress/Change_Color_Slide_Number_gold_textbox.pptx"

View File

@@ -1,37 +1,38 @@
import lxml.cssselect
from lxml.etree import _Element as Element
import lxml.etree
import fnmatch
from typing import Dict, List
import lxml.cssselect
import lxml.etree
from lxml.etree import _Element as Element
_libconf_namespaces = [("oor", "http://openoffice.org/2001/registry")]
_libconf_ns_mapping = dict(_libconf_namespaces)
_setup_locale_selector = lxml.cssselect.CSSSelector( 'item[oor|path$=L10N]>prop[oor|name=ooSetupSystemLocale]>value'
, namespaces=_libconf_ns_mapping
)
_locale_selector = lxml.cssselect.CSSSelector( 'item[oor|path$=L10N]>prop[oor|name=ooLocale]>value'
, namespaces=_libconf_ns_mapping
)
_setup_locale_selector = lxml.cssselect.CSSSelector('item[oor|path$=L10N]>prop[oor|name=ooSetupSystemLocale]>value',
namespaces=_libconf_ns_mapping)
_locale_selector = lxml.cssselect.CSSSelector('item[oor|path$=L10N]>prop[oor|name=ooLocale]>value',
namespaces=_libconf_ns_mapping)
def check_libre_locale(config_file: str, rules: Dict[str, List[str]]) -> float:
config: Element = lxml.etree.parse(config_file).getroot()
setup_locale_setting: List[Element] = _setup_locale_selector(config)
locale_setting: List[Element] = _locale_selector(config)
setup_locale_setting: str = setup_locale_setting[0].text\
if len(setup_locale_setting)>0\
else locale_setting[0].text
setup_locale_setting: str = setup_locale_setting[0].text \
if len(setup_locale_setting) > 0 \
else locale_setting[0].text
return float( any( fnmatch.fnmatchcase(setup_locale_setting, ptn)\
for ptn in rules["locale_set"]
return float(any(fnmatch.fnmatchcase(setup_locale_setting, ptn) \
for ptn in rules["locale_set"]
)
)
)
if __name__ == "__main__":
path1 = "../../任务数据/LibreOffice Calc/registrymodifications.ru.xcu"
print( check_libre_locale( path1, { "locale_set": [ "ru-*", "de-*", "fr-*"
, "pt-*", "es-*", "it-*"
]
}
print(check_libre_locale(path1, {"locale_set": ["ru-*", "de-*", "fr-*"
, "pt-*", "es-*", "it-*"
]
}
)
)
)

View File

@@ -1,13 +1,11 @@
from pypdf import PdfReader
import operator
from typing import Dict
from typing import Any
from typing import Dict
from pypdf import PdfReader
def check_pdf_pages(pdf_file: str, rules: Dict[str, Any]) -> float:
reader = PdfReader(pdf_file)
nb_pages: int = len(reader.pages)
return float( getattr(operator, rules["relation"])( nb_pages
, rules["ref_value"]
)
)
return float(getattr(operator, rules["relation"])(nb_pages, rules["ref_value"]))

View File

@@ -1,18 +1,19 @@
import pandas as pd
import logging
import operator
from numbers import Number
from typing import Any, Union
from typing import Dict, List
import openpyxl
import pandas as pd
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from .utils import load_charts, load_sparklines
import operator
from typing import Dict, List
from typing import Any, Union
from numbers import Number
import logging
logger = logging.getLogger("desktopenv.metric.table")
def compare_table(actual: str, expected: str, **options) -> float:
"""
Args:
@@ -44,28 +45,28 @@ def compare_table(actual: str, expected: str, **options) -> float:
workbook1: Workbook = openpyxl.load_workbook(actual)
workbook2: Workbook = openpyxl.load_workbook(expected)
if ftr=="sparkline":
if ftr == "sparkline":
sp1 = load_sparklines(actual)
sp2 = load_sparklines(expected)
new_metric: bool = sp1 == sp2
logger.debug("Sparkline Metric: {:}".format(new_metric))
elif ftr=="chart":
elif ftr == "chart":
charts1 = load_charts(workbook1, **options)
charts2 = load_charts(workbook2, **options)
new_metric: bool = charts1 == charts2
logger.debug("Chart Metric: {:}".format(new_metric))
elif ftr=="number_format":
number_formats1: List[str] = [ c.number_format.lower()\
for col in workbook1.active.iter_cols()\
for c in col\
if c.data_type=="n"
]
number_formats2: List[str] = [ c.number_format.lower()\
for col in workbook2.active.iter_cols()\
for c in col\
if c.data_type=="n"
]
new_metric: bool = number_formats1==number_formats2
elif ftr == "number_format":
number_formats1: List[str] = [c.number_format.lower() \
for col in workbook1.active.iter_cols() \
for c in col \
if c.data_type == "n"
]
number_formats2: List[str] = [c.number_format.lower() \
for col in workbook2.active.iter_cols() \
for c in col \
if c.data_type == "n"
]
new_metric: bool = number_formats1 == number_formats2
logger.debug("Number Format Metric: {:}".format(new_metric))
else:
raise NotImplementedError("Unsupported xlsx feature: {:}".format(ftr))
@@ -73,6 +74,7 @@ def compare_table(actual: str, expected: str, **options) -> float:
return float(metric)
def check_sheet_list(result: str, rules: List[Dict[str, Any]]) -> float:
if result is None:
return 0.
@@ -114,6 +116,7 @@ def check_sheet_list(result: str, rules: List[Dict[str, Any]]) -> float:
return float(passes)
def check_xlsx_freeze(result: str, rules: Dict[str, str]) -> float:
if result is None:
return 0.
@@ -121,16 +124,18 @@ def check_xlsx_freeze(result: str, rules: Dict[str, str]) -> float:
worksheet: Worksheet = openpyxl.load_workbook(filename=result).active
return float(worksheet.freeze_panes == rules["position"])
def check_xlsx_zoom(result: str, rules: Dict[str, Union[str, Number]]) -> float:
if result is None:
return 0.
worksheet = openpyxl.load_workbook(filename=result).active
zoom_scale: Number = worksheet.sheet_view.zoomScale or 100.
return float( getattr(operator, rules["relation"])( zoom_scale
return float(getattr(operator, rules["relation"])(zoom_scale
, rules["ref_value"]
)
)
)
if __name__ == '__main__':
# path1 = ""
@@ -168,51 +173,51 @@ if __name__ == '__main__':
# ]
# print(check_sheet_list(path1, rule))
#path1 = "../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold.xlsx"
#path2 = "../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold2.xlsx"
#print(compare_table(path1, path2, features=["chart"], chart_props=["type", "direction"]))
# path1 = "../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold.xlsx"
# path2 = "../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold2.xlsx"
# print(compare_table(path1, path2, features=["chart"], chart_props=["type", "direction"]))
#path1 = "../../任务数据/LibreOffice Calc/Represent_in_millions_billions_gold.xlsx"
#path2 = "../../任务数据/LibreOffice Calc/Represent_in_millions_billions_gold3.xlsx"
#path1 = "../../任务数据/LibreOffice Calc/Set_Decimal_Separator_Dot.xlsx"
#path2 = "../../任务数据/LibreOffice Calc/Set_Decimal_Separator_Dot_gold.xlsx"
#workbook1: Workbook = openpyxl.load_workbook(filename=path1)
#worksheet1: Worksheet = workbook1.active
#import itertools
#for col, r in itertools.product( ['A', 'B']
#, range(1, 20)
#):
#position: str = "{:}{:d}".format(col, r)
#print(worksheet1[position])
#print(worksheet1[position].value)
#print(worksheet1[position].number_format)
#workbook2: Workbook = openpyxl.load_workbook(filename=path2)
#worksheet2: Worksheet = workbook2.active
#for col, r in itertools.product( ['A', 'B']
#, range(1, 20)
#):
#position: str = "{:}{:d}".format(col, r)
#print(worksheet2[position])
#print(worksheet2[position].value)
#print(worksheet2[position].number_format)
#print(compare_table(path1, path2, features=["number_format"]))
# path1 = "../../任务数据/LibreOffice Calc/Represent_in_millions_billions_gold.xlsx"
# path2 = "../../任务数据/LibreOffice Calc/Represent_in_millions_billions_gold3.xlsx"
# path1 = "../../任务数据/LibreOffice Calc/Set_Decimal_Separator_Dot.xlsx"
# path2 = "../../任务数据/LibreOffice Calc/Set_Decimal_Separator_Dot_gold.xlsx"
# workbook1: Workbook = openpyxl.load_workbook(filename=path1)
# worksheet1: Worksheet = workbook1.active
# import itertools
# for col, r in itertools.product( ['A', 'B']
# , range(1, 20)
# ):
# position: str = "{:}{:d}".format(col, r)
# print(worksheet1[position])
# print(worksheet1[position].value)
# print(worksheet1[position].number_format)
# workbook2: Workbook = openpyxl.load_workbook(filename=path2)
# worksheet2: Worksheet = workbook2.active
# for col, r in itertools.product( ['A', 'B']
# , range(1, 20)
# ):
# position: str = "{:}{:d}".format(col, r)
# print(worksheet2[position])
# print(worksheet2[position].value)
# print(worksheet2[position].number_format)
# print(compare_table(path1, path2, features=["number_format"]))
#path1 = "../../任务数据/LibreOffice Calc/Zoom_Out_Oversized_Cells_gold.xlsx"
#path2 = "../../任务数据/LibreOffice Calc/Zoom_Out_Oversized_Cells.xlsx"
#workbook1: Workbook = openpyxl.load_workbook(filename=path1)
#worksheet1: Worksheet = workbook1.active
#print(worksheet1.sheet_view.zoomScale)
#print(type(worksheet1.sheet_view.zoomScale))
#
#import os
#import os.path
#for wb in filter( lambda f: f.endswith(".xlsx")
#, os.listdir("../../任务数据/LibreOffice Calc/")
#):
#path = os.path.join("../../任务数据/LibreOffice Calc/", wb)
#print(wb, openpyxl.load_workbook(filename=path).active.sheet_view.zoomScale)
#print(check_zoom(path1, {"relation": "lt", "ref_value": 100}))
#print(check_zoom(path2, {"relation": "lt", "ref_value": 100}))
# path1 = "../../任务数据/LibreOffice Calc/Zoom_Out_Oversized_Cells_gold.xlsx"
# path2 = "../../任务数据/LibreOffice Calc/Zoom_Out_Oversized_Cells.xlsx"
# workbook1: Workbook = openpyxl.load_workbook(filename=path1)
# worksheet1: Worksheet = workbook1.active
# print(worksheet1.sheet_view.zoomScale)
# print(type(worksheet1.sheet_view.zoomScale))
#
# import os
# import os.path
# for wb in filter( lambda f: f.endswith(".xlsx")
# , os.listdir("../../任务数据/LibreOffice Calc/")
# ):
# path = os.path.join("../../任务数据/LibreOffice Calc/", wb)
# print(wb, openpyxl.load_workbook(filename=path).active.sheet_view.zoomScale)
# print(check_zoom(path1, {"relation": "lt", "ref_value": 100}))
# print(check_zoom(path2, {"relation": "lt", "ref_value": 100}))
path1 = "../../任务数据/LibreOffice Calc/Padding_Decimals_In_Formular_gold.xlsx"
data_frame: pd.DataFrame = pd.read_excel(path1)

View File

@@ -1,16 +1,18 @@
from typing import Dict
def compare_text_file(actual: str, expected: str, **options) -> float:
"""
Args:
actual (str): path to result xlsx
expected (str): path to gold xlsx
options (Dict[str, List[str]]): dict like
{
}
actual (str): path to result text file
expected (str): path to gold text file
Return:
float: the score
"""
if not actual:
return 0.
with open(actual) as f1:
actual_text = f1.read()
with open(expected) as f2:
@@ -20,13 +22,46 @@ def compare_text_file(actual: str, expected: str, **options) -> float:
return 1.0
return 0.0
def compare_answer(actual: str, expected: str, **options) -> float:
if actual == expected:
def compare_config(actual: str, rules: Dict, **options) -> float:
    """
    Args:
        actual (str): path to the result file; a falsy value yields 0
        rules (Dict): dict like
          {
            "expect": the exact text the file is expected to contain
          }

    Return:
        float: 1.0 if the file content equals rules['expect'], else 0.0
    """

    if not actual:
        return 0.

    with open(actual) as result_file:
        content = result_file.read()

    return 1.0 if content == rules['expect'] else 0.0
def compare_answer(actual: str, rules: Dict, **options) -> float:
    """
    Args:
        actual (str): result string; a falsy value yields 0
        rules (Dict): dict like
          {
            "expect": the gold string
          }

    Return:
        float: 1.0 on an exact match with rules['expect'], else 0.0
    """

    # TODO: can use text embedding to get non-zero return
    matched = bool(actual) and actual == rules['expect']
    return 1.0 if matched else 0.0
if __name__ == "__main__":
    # Manual check: compare README.md against itself and print the score.
    score = compare_text_file("README.md", "README.md")
    print(score)
def is_extension_installed(actual: str, rules: Dict, **options):
    """
    Score whether `actual` does (or does not) contain rules['expected'].

    Args:
        actual (str): string that is searched
        rules (Dict): dict like
          {
            "type": "contain" or "not_contain"
            "expected": the value looked up in `actual`
          }

    Return:
        float: 1.0 if the containment check demanded by rules['type'] holds,
          else 0.0

    Raises:
        NotImplementedError: for any other rules['type']
    """

    mode = rules['type']
    if mode not in ('contain', 'not_contain'):
        raise NotImplementedError

    present = rules['expected'] in actual
    wanted = mode == 'contain'
    return 1.0 if present == wanted else 0.0

View File

@@ -71,3 +71,10 @@ You can use accerciser to check the accessibility tree on GNOME VM.
```sh
sudo apt install accerciser
```
### Additional Installation
Window activation on Linux relies on `wmctrl`, which must be installed separately:
```bash
sudo apt install wmctrl
```

View File

@@ -3,29 +3,26 @@ import os
import platform
import subprocess
from pathlib import Path
from typing import Any, Optional
from typing import List, Dict
import Xlib
import lxml.etree
from lxml.etree import _Element
import pyatspi
import pyautogui
import requests
from PIL import Image
from Xlib import display, X
from flask import Flask, request, jsonify, send_file, abort
from lxml.etree import _Element
from pyatspi import Accessible, StateType
from pyatspi import Action as ATAction
from pyatspi import Component, Document
from pyatspi import Text as ATText
from pyatspi import Value as ATValue
from pyatspi import Action as ATAction
from typing import List, Dict
from typing import Any, Optional
import Xlib
import pyautogui
from PIL import Image
from Xlib import display, X
from pyxcursor import Xcursor
import requests
from flask import Flask, request, jsonify, send_file, abort
from werkzeug.utils import secure_filename
app = Flask(__name__)
pyautogui.PAUSE = 0
@@ -141,22 +138,24 @@ def get_terminal_output():
xpath = '//application[@name="gnome-terminal-server"]/frame[@st:active="true"]//terminal[@st:focused="true"]'
terminals: List[_Element] = desktop_xml.xpath(xpath, namespaces=_accessibility_ns_map)
output = terminals[0].text.rstrip() if len(terminals) == 1 else None
else: # windows and macos platform is not implemented currently
else: # windows and macos platform is not implemented currently
raise NotImplementedError
return jsonify({"output": output, "status": "success"})
except:
return jsonify({"output": None, "status": "error"})
_accessibility_ns_map = { "st": "uri:deskat:state.at-spi.gnome.org"
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
, "cp": "uri:deskat:component.at-spi.gnome.org"
, "doc": "uri:deskat:document.at-spi.gnome.org"
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
, "txt": "uri:deskat:text.at-spi.gnome.org"
, "val": "uri:deskat:value.at-spi.gnome.org"
, "act": "uri:deskat:action.at-spi.gnome.org"
}
_accessibility_ns_map = {"st": "uri:deskat:state.at-spi.gnome.org"
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
, "cp": "uri:deskat:component.at-spi.gnome.org"
, "doc": "uri:deskat:document.at-spi.gnome.org"
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
, "txt": "uri:deskat:text.at-spi.gnome.org"
, "val": "uri:deskat:value.at-spi.gnome.org"
, "act": "uri:deskat:action.at-spi.gnome.org"
}
def _create_node(node: Accessible) -> _Element:
attribute_dict: Dict[str, Any] = {"name": node.name}
@@ -164,11 +163,11 @@ def _create_node(node: Accessible) -> _Element:
states: List[StateType] = node.getState().get_states()
for st in states:
state_name: str = StateType._enum_lookup[st]
attribute_dict[ "{{{:}}}{:}"\
.format( _accessibility_ns_map["st"]
, state_name.split("_", maxsplit=1)[1].lower()
)
] = "true"
attribute_dict["{{{:}}}{:}" \
.format(_accessibility_ns_map["st"]
, state_name.split("_", maxsplit=1)[1].lower()
)
] = "true"
# }}} States #
# Attributes {{{ #
@@ -177,11 +176,11 @@ def _create_node(node: Accessible) -> _Element:
attribute_name: str
attribute_value: str
attribute_name, attribute_value = attrbt.split(":", maxsplit=1)
attribute_dict[ "{{{:}}}{:}"\
.format( _accessibility_ns_map["attr"]
, attribute_name
)
] = attribute_value
attribute_dict["{{{:}}}{:}" \
.format(_accessibility_ns_map["attr"]
, attribute_name
)
] = attribute_value
# }}} Attributes #
# Component {{{ #
@@ -190,9 +189,12 @@ def _create_node(node: Accessible) -> _Element:
except NotImplementedError:
pass
else:
attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_SCREEN))
attribute_dict["{{{:}}}windowcoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_WINDOW))
attribute_dict["{{{:}}}parentcoord".format(_accessibility_ns_map["cp"])] = str(component.getPosition(pyatspi.XY_PARENT))
attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = str(
component.getPosition(pyatspi.XY_SCREEN))
attribute_dict["{{{:}}}windowcoord".format(_accessibility_ns_map["cp"])] = str(
component.getPosition(pyatspi.XY_WINDOW))
attribute_dict["{{{:}}}parentcoord".format(_accessibility_ns_map["cp"])] = str(
component.getPosition(pyatspi.XY_PARENT))
attribute_dict["{{{:}}}size".format(_accessibility_ns_map["cp"])] = str(component.getSize())
# }}} Component #
@@ -209,11 +211,11 @@ def _create_node(node: Accessible) -> _Element:
attribute_name: str
attribute_value: str
attribute_name, attribute_value = attrbt.split(":", maxsplit=1)
attribute_dict[ "{{{:}}}{:}"\
.format( _accessibility_ns_map["docattr"]
, attribute_name
)
] = attribute_value
attribute_dict["{{{:}}}{:}" \
.format(_accessibility_ns_map["docattr"]
, attribute_name
)
] = attribute_value
# }}} Document #
# Text {{{ #
@@ -223,13 +225,13 @@ def _create_node(node: Accessible) -> _Element:
pass
else:
# only text shown on current screen is available
#attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount)
# attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount)
text: str = text_obj.getText(0, text_obj.characterCount)
# }}} Text #
# Selection {{{ #
try:
node.querySelection()
node.querySelection()
except NotImplementedError:
pass
else:
@@ -256,34 +258,36 @@ def _create_node(node: Accessible) -> _Element:
else:
for i in range(action.nActions):
action_name: str = action.getName(i).replace(" ", "-")
attribute_dict[ "{{{:}}}{:}_desc"\
.format( _accessibility_ns_map["act"]
, action_name
)
] = action.getDescription(i)
attribute_dict[ "{{{:}}}{:}_kb"\
.format( _accessibility_ns_map["act"]
, action_name
)
] = action.getKeyBinding(i)
attribute_dict["{{{:}}}{:}_desc" \
.format(_accessibility_ns_map["act"]
, action_name
)
] = action.getDescription(i)
attribute_dict["{{{:}}}{:}_kb" \
.format(_accessibility_ns_map["act"]
, action_name
)
] = action.getKeyBinding(i)
# }}} Action #
xml_node = lxml.etree.Element( node.getRoleName().replace(" ", "-")
, attrib=attribute_dict
, nsmap=_accessibility_ns_map
)
if "text" in locals() and len(text)>0:
xml_node = lxml.etree.Element(node.getRoleName().replace(" ", "-")
, attrib=attribute_dict
, nsmap=_accessibility_ns_map
)
if "text" in locals() and len(text) > 0:
xml_node.text = text
for ch in node:
xml_node.append(_create_node(ch))
return xml_node
@app.route("/accessibility", methods=["GET"])
def get_accessibility_tree():
    """
    Return the accessibility tree of desktop 0 as an XML string.

    The tree is built by `_create_node`, serialized with lxml and sent back
    as JSON under the key "AT".
    """
    root: Accessible = pyatspi.Registry.getDesktop(0)
    tree: _Element = _create_node(root)
    serialized = lxml.etree.tostring(tree, encoding="unicode")
    return jsonify({"AT": serialized})
@app.route('/screen_size', methods=['POST'])
def get_screen_size():
d = display.Display()
@@ -563,5 +567,43 @@ def open_file():
return f"Failed to open {path}. Error: {e}", 500
@app.route("/setup/activate_window", methods=['POST'])
def activate_window():
    """
    Bring the window whose title matches `window_name` to the front.

    Expects a JSON body like {"window_name": "<title>"}.

    Return:
        a (message, status code) pair:
          200 - activation was carried out (Windows, macOS) or requested (Linux)
          400 - `window_name` is missing / not a string, or the operating
                system is not supported
          404 - no window with that title was found (Windows and macOS only)
          500 - `wmctrl` is not installed (Linux only)
    """
    data = request.json or {}
    window_name = data.get('window_name', None)
    # Reject bad input up front instead of handing None to pygetwindow/Popen.
    if not isinstance(window_name, str) or not window_name:
        return "window_name required", 400

    os_name = platform.system()

    if os_name in ('Windows', 'Darwin'):
        import pygetwindow as gw

        try:
            window = gw.getWindowsWithTitle(window_name)[0]
        except IndexError:
            return f"Window {window_name} not found.", 404

        if os_name == 'Darwin':
            # Un-minimize the window before bringing it to the front
            window.unminimize()
        window.activate()
    elif os_name == 'Linux':
        # wmctrl is started without waiting for it, so its exit status (for
        # instance when no window matches) is not inspected here.
        try:
            subprocess.Popen(["wmctrl", "-a", window_name])
        except FileNotFoundError:
            return "wmctrl is not installed; cannot activate window.", 500
    else:
        return f"Operating system {os_name} not supported.", 400

    return "Window activated successfully", 200
if __name__ == "__main__":
    # NOTE(review): debug mode combined with host 0.0.0.0 makes the Flask
    # debugger reachable from every network interface - confirm this server
    # is only ever started inside an isolated VM.
    app.run(host="0.0.0.0", debug=True)