Merge branch 'main' of github.com:ztjhz/DesktopEnv
This commit is contained in:
@@ -11,8 +11,9 @@ logger = logging.getLogger("desktopenv.pycontroller")
|
||||
|
||||
|
||||
class PythonController:
|
||||
def __init__(self, http_server: str, pkgs_prefix: str = "python -c \"import pyautogui; {command}\""):
|
||||
self.http_server = http_server
|
||||
def __init__(self, vm_ip: str, pkgs_prefix: str = "import pyautogui; {command}"):
|
||||
self.vm_ip = vm_ip
|
||||
self.http_server = f"http://{vm_ip}:5000"
|
||||
self.pkgs_prefix = pkgs_prefix # fixme: this is a hacky way to execute python commands. fix it and combine it with installation of packages
|
||||
|
||||
def get_screenshot(self):
|
||||
@@ -26,6 +27,16 @@ class PythonController:
|
||||
logger.error("Failed to get screenshot. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
def get_terminal_output(self):
|
||||
""" Gets the terminal output from the server. None -> no terminal output or unexpected error.
|
||||
"""
|
||||
response = requests.get(self.http_server + "/terminal")
|
||||
if response.status_code == 200:
|
||||
return response.json()["output"]
|
||||
else:
|
||||
logger.error("Failed to get terminal output. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
def get_accessibility_tree(self) -> Optional[str]:
|
||||
|
||||
response: requests.Response = requests.get(self.http_server + "/accessibility")
|
||||
@@ -52,8 +63,8 @@ class PythonController:
|
||||
Executes a python command on the server.
|
||||
It can be used to execute the pyautogui commands, or... any other python command. who knows?
|
||||
"""
|
||||
command = self.pkgs_prefix.format(command=command)
|
||||
payload = json.dumps({"command": command, "shell": True})
|
||||
command_list = ["python", "-c", self.pkgs_prefix.format(command=command)]
|
||||
payload = json.dumps({"command": command_list, "shell": False})
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
@@ -185,7 +196,8 @@ class PythonController:
|
||||
elif action_type == "TYPING":
|
||||
if "text" not in parameters:
|
||||
raise Exception(f"Unknown parameters: {parameters}")
|
||||
text = parameters["text"]
|
||||
# deal with special ' and \ characters
|
||||
text = parameters["text"].replace("\\", "\\\\").replace("'", "\\'")
|
||||
self.execute_python_command(f"pyautogui.typewrite('{text}')")
|
||||
|
||||
elif action_type == "PRESS":
|
||||
@@ -268,15 +280,3 @@ class PythonController:
|
||||
else:
|
||||
logger.error("Failed to get wallpaper. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
# VLC
|
||||
def get_vlc_status(self, host='localhost', port=8080, password='password'):
|
||||
url = f'http://{host}:{port}/requests/status.xml'
|
||||
|
||||
response = requests.get(url, auth=('', password))
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.content
|
||||
else:
|
||||
logger.error("Failed to get vlc status. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
@@ -1,23 +1,25 @@
|
||||
import requests
|
||||
import json
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
||||
|
||||
import uuid
|
||||
import time
|
||||
import os.path
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
from typing import Dict, List
|
||||
from typing import Any, Union, Optional
|
||||
|
||||
import requests
|
||||
from playwright.sync_api import sync_playwright
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
||||
from desktop_env.evaluators.metrics.utils import compare_urls
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger("desktopenv.setup")
|
||||
|
||||
import traceback
|
||||
import time
|
||||
|
||||
class SetupController:
|
||||
def __init__(self, http_server: str, cache_dir: str):
|
||||
self.http_server: str = http_server
|
||||
self.http_server_setup_root = http_server + "/setup"
|
||||
def __init__(self, vm_ip: str, cache_dir: str):
|
||||
self.vm_ip: str = vm_ip
|
||||
self.http_server: str = f"http://{vm_ip}:5000"
|
||||
self.cache_dir: str = cache_dir
|
||||
|
||||
def reset_cache_dir(self, cache_dir: str):
|
||||
@@ -141,8 +143,8 @@ class SetupController:
|
||||
|
||||
# send request to server to upload file
|
||||
try:
|
||||
logger.debug("REQUEST ADDRESS: %s", self.http_server_setup_root + "/upload")
|
||||
response = requests.post(self.http_server_setup_root + "/upload", headers=headers, data=form)
|
||||
logger.debug("REQUEST ADDRESS: %s", self.http_server + "/setup" + "/upload")
|
||||
response = requests.post(self.http_server + "/setup" + "/upload", headers=headers, data=form)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -167,7 +169,7 @@ class SetupController:
|
||||
|
||||
# send request to server to change wallpaper
|
||||
try:
|
||||
response = requests.post(self.http_server_setup_root + "/change_wallpaper", headers=headers, data=payload)
|
||||
response = requests.post(self.http_server + "/setup" + "/change_wallpaper", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -194,7 +196,7 @@ class SetupController:
|
||||
|
||||
# send request to server to open file
|
||||
try:
|
||||
response = requests.post(self.http_server_setup_root + "/open_file", headers=headers, data=payload)
|
||||
response = requests.post(self.http_server + "/setup" + "/open_file", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -214,7 +216,7 @@ class SetupController:
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
try:
|
||||
response = requests.post(self.http_server_setup_root + "/launch", headers=headers, data=payload)
|
||||
response = requests.post(self.http_server + "/setup" + "/launch", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -280,6 +282,7 @@ class SetupController:
|
||||
def _act_setup(self, action_seq: List[Union[Dict[str, Any], str]]):
|
||||
# TODO
|
||||
raise NotImplementedError()
|
||||
|
||||
def _replay_setup(self, trajectory: str):
|
||||
"""
|
||||
Args:
|
||||
@@ -288,3 +291,84 @@ class SetupController:
|
||||
|
||||
# TODO
|
||||
raise NotImplementedError()
|
||||
|
||||
# Chrome setup
|
||||
def _chrome_open_tabs_setup(self, urls_to_open: List[str]):
|
||||
host = self.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
browser = None
|
||||
for attempt in range(15):
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
break
|
||||
except Exception as e:
|
||||
if attempt < 14:
|
||||
logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.error(f"Failed to connect after multiple attempts: {e}")
|
||||
raise e
|
||||
|
||||
if not browser:
|
||||
return
|
||||
|
||||
for i, url in enumerate(urls_to_open):
|
||||
# Use the first context (which should be the only one if using default profile)
|
||||
if i == 0:
|
||||
context = browser.contexts[0]
|
||||
|
||||
page = context.new_page() # Create a new page (tab) within the existing context
|
||||
page.goto(url)
|
||||
logger.info(f"Opened tab {i + 1}: {url}")
|
||||
|
||||
if i == 0:
|
||||
# clear the default tab
|
||||
default_page = context.pages[0]
|
||||
default_page.close()
|
||||
|
||||
# Do not close the context or browser; they will remain open after script ends
|
||||
return browser, context
|
||||
|
||||
def _chrome_close_tabs_setup(self, urls_to_close: List[str]):
|
||||
time.sleep(5) # Wait for Chrome to finish launching
|
||||
|
||||
host = self.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
browser = None
|
||||
for attempt in range(15):
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
break
|
||||
except Exception as e:
|
||||
if attempt < 14:
|
||||
logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.error(f"Failed to connect after multiple attempts: {e}")
|
||||
raise e
|
||||
|
||||
if not browser:
|
||||
return
|
||||
|
||||
for i, url in enumerate(urls_to_close):
|
||||
# Use the first context (which should be the only one if using default profile)
|
||||
if i == 0:
|
||||
context = browser.contexts[0]
|
||||
|
||||
for page in context.pages:
|
||||
|
||||
# if two urls are the same, close the tab
|
||||
if compare_urls(page.url, url):
|
||||
context.pages.pop(context.pages.index(page))
|
||||
page.close()
|
||||
logger.info(f"Closed tab {i + 1}: {url}")
|
||||
break
|
||||
|
||||
# Do not close the context or browser; they will remain open after script ends
|
||||
return browser, context
|
||||
|
||||
@@ -80,13 +80,12 @@ class DesktopEnv(gym.Env):
|
||||
logger.info("Initializing...")
|
||||
self._start_emulator()
|
||||
self.vm_ip = self._get_vm_ip()
|
||||
self.host = f"http://{self.vm_ip}:5000"
|
||||
self.controller = PythonController(http_server=self.host)
|
||||
self.setup_controller = SetupController(http_server=self.host, cache_dir=self.cache_dir)
|
||||
self.controller = PythonController(vm_ip=self.vm_ip)
|
||||
self.setup_controller = SetupController(vm_ip=self.vm_ip, cache_dir=self.cache_dir)
|
||||
|
||||
# Meta info of the VM
|
||||
self.vm_platform = self.controller.get_vm_platform()
|
||||
self.vm_screen_size = self.controller.get_vm_screen_size()
|
||||
# Meta info of the VM, move to the reset() function
|
||||
self.vm_platform: str = "" # self.controller.get_vm_platform()
|
||||
self.vm_screen_size = None # self.controller.get_vm_screen_size()
|
||||
|
||||
# mode: human or machine
|
||||
assert action_space in ["computer_13", "pyautogui"]
|
||||
@@ -193,6 +192,10 @@ class DesktopEnv(gym.Env):
|
||||
self._start_emulator()
|
||||
logger.info("Emulator started.")
|
||||
|
||||
logger.info("Get meta info of the VM...")
|
||||
self.vm_platform = self.controller.get_vm_platform()
|
||||
self.vm_screen_size = self.controller.get_vm_screen_size()
|
||||
|
||||
logger.info("Setting up environment...")
|
||||
self.setup_controller.setup(self.config)
|
||||
|
||||
@@ -218,6 +221,7 @@ class DesktopEnv(gym.Env):
|
||||
time.sleep(pause)
|
||||
observation = {
|
||||
"screenshot": self._get_obs(),
|
||||
"terminal": self.controller.get_terminal_output(),
|
||||
"instruction": self.instruction
|
||||
}
|
||||
reward = 0 # todo: Define reward calculation for each example
|
||||
|
||||
@@ -2,6 +2,6 @@ from .file import get_cloud_file, get_vm_file, get_cache_file
|
||||
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
|
||||
from .misc import get_rule, get_accessibility_tree
|
||||
from .vlc import get_vlc_playing_info, get_vlc_config
|
||||
from .chrome import get_default_search_engine, get_bookmarks
|
||||
from .chrome import get_default_search_engine, get_bookmarks, get_open_tabs_info
|
||||
from .replay import get_replay
|
||||
from .vscode import get_vscode_config
|
||||
|
||||
@@ -4,6 +4,8 @@ import os
|
||||
import sqlite3
|
||||
from typing import Dict
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.chrome")
|
||||
|
||||
"""
|
||||
@@ -13,15 +15,20 @@ WARNING:
|
||||
"""
|
||||
|
||||
|
||||
# The following ones just need to load info from the files of software, no need to connect to the software
|
||||
def get_default_search_engine(env, config: Dict[str, str]):
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Preferences'))""")['output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")['output'].strip()
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")['output'].strip()
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -41,12 +48,16 @@ def get_default_search_engine(env, config: Dict[str, str]):
|
||||
def get_cookie_data(env, config: Dict[str, str]):
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
chrome_cookie_file_path = os.path.join(os.getenv('LOCALAPPDATA'), 'Google\\Chrome\\User Data\\Default\\Cookies')
|
||||
chrome_cookie_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Cookies'))""")['output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
chrome_cookie_file_path = os.path.join(os.getenv('HOME'),
|
||||
'Library/Application Support/Google/Chrome/Default/Cookies')
|
||||
chrome_cookie_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Cookies'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_cookie_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies')
|
||||
chrome_cookie_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -70,13 +81,16 @@ def get_cookie_data(env, config: Dict[str, str]):
|
||||
def get_bookmarks(env, config: Dict[str, str]):
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
preference_file_path = os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Bookmarks')
|
||||
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Bookmarks'))""")['output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
preference_file_path = os.path.join(os.getenv('HOME'),
|
||||
'Library/Application Support/Google/Chrome/Default/Bookmarks')
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Bookmarks'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks')
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -98,13 +112,16 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
|
||||
"""Find the Chrome extensions directory based on the operating system."""
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
chrome_extension_dir = os.path.expanduser(
|
||||
'~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Darwin': # macOS
|
||||
chrome_extension_dir = os.path.expanduser(
|
||||
'~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_extension_dir = os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'""")['output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -124,3 +141,52 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Error reading {manifest_path}")
|
||||
return manifests
|
||||
|
||||
|
||||
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on
|
||||
# port info to allow remote debugging, see README.md for details
|
||||
|
||||
def get_open_tabs_info(env, config: Dict[str, str]):
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
tabs_info = []
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
title = page.title()
|
||||
url = page.url
|
||||
tabs_info.append({'title': title, 'url': url})
|
||||
|
||||
browser.close()
|
||||
return tabs_info
|
||||
|
||||
|
||||
def get_active_tab_info(env, config: Dict[str, str]):
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
active_tab_info = {}
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
if page.is_visible("body"): # check the visibility of the page body to determine the active status
|
||||
active_tab_info = {
|
||||
'title': page.title(),
|
||||
'url': page.url,
|
||||
'content': page.content() # get the HTML content of the page
|
||||
}
|
||||
break
|
||||
if active_tab_info:
|
||||
break
|
||||
|
||||
browser.close()
|
||||
return active_tab_info
|
||||
|
||||
@@ -2,6 +2,8 @@ import logging
|
||||
import os
|
||||
from typing import Dict
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.vlc")
|
||||
|
||||
|
||||
@@ -15,7 +17,14 @@ def get_vlc_playing_info(env, config: Dict[str, str]):
|
||||
password = 'password'
|
||||
|
||||
_path = os.path.join(env.cache_dir, config["dest"])
|
||||
content = env.controller.get_vlc_status(host, port, password)
|
||||
url = f'http://{host}:{port}/requests/status.xml'
|
||||
response = requests.get(url, auth=('', password))
|
||||
if response.status_code == 200:
|
||||
content = response.content
|
||||
else:
|
||||
logger.error("Failed to get vlc status. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
with open(_path, "wb") as f:
|
||||
f.write(content)
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from .chrome import is_expected_tabs, is_expected_bookmarks
|
||||
from .docs import compare_font_names, compare_subscript_contains, has_page_numbers_in_footers
|
||||
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
|
||||
compare_insert_equation
|
||||
@@ -9,8 +10,7 @@ from .table import check_sheet_list, check_xlsx_freeze, check_xlsx_zoom
|
||||
from .table import compare_table
|
||||
from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, compare_images, compare_audios, \
|
||||
compare_videos
|
||||
|
||||
from .gimp import increase_saturation, decrease_brightness
|
||||
from .general import check_csv, check_accessibility_tree, check_list
|
||||
from .gimp import increase_saturation, decrease_brightness, check_file_exists, compare_triangle_positions
|
||||
from .general import check_csv, check_accessibility_tree, check_list, run_sqlite3
|
||||
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter
|
||||
|
||||
|
||||
@@ -1,52 +1,39 @@
|
||||
import logging
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
from typing import Any, Dict, List
|
||||
from desktop_env.evaluators.metrics.utils import are_lists_equal, compare_urls
|
||||
|
||||
logger = logging.getLogger("desktopenv.metrics.chrome")
|
||||
|
||||
|
||||
def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> float:
|
||||
"""
|
||||
Checks if the expected tabs are open in Chrome.
|
||||
"""
|
||||
|
||||
print(open_tabs, rule)
|
||||
match_type = rule['type']
|
||||
|
||||
if match_type == "url":
|
||||
expected_urls = rule['urls']
|
||||
actual_urls = [tab['url'] for tab in open_tabs]
|
||||
return 1 if are_lists_equal(expected_urls, actual_urls, compare_urls) else 0
|
||||
else:
|
||||
logger.error(f"Unknown type: {match_type}")
|
||||
return 0
|
||||
|
||||
|
||||
# todo: move to getter module
|
||||
def is_expected_bookmarks(bookmarks: List[Dict[str, Any]], rule: Dict[str, Any]) -> float:
|
||||
"""
|
||||
Checks if the expected bookmarks are in Chrome.
|
||||
"""
|
||||
|
||||
# The following ones just need to load info from the files of software, no need to connect to the software
|
||||
# todo
|
||||
match_type = rule['type']
|
||||
|
||||
|
||||
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on port info to allow remote debugging, see README.md for details
|
||||
|
||||
def get_open_tabs_info(remote_debugging_url):
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
tabs_info = []
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
title = page.title()
|
||||
url = page.url
|
||||
tabs_info.append({'title': title, 'url': url})
|
||||
|
||||
browser.close()
|
||||
return tabs_info
|
||||
|
||||
|
||||
def get_active_tab_info(remote_debugging_url):
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
active_tab_info = {}
|
||||
for context in browser.contexts:
|
||||
for page in context.pages():
|
||||
if page.is_visible("body"): # check the visibility of the page body to determine the active status
|
||||
active_tab_info = {
|
||||
'title': page.title(),
|
||||
'url': page.url,
|
||||
'content': page.content() # get the HTML content of the page
|
||||
}
|
||||
break
|
||||
if active_tab_info:
|
||||
break
|
||||
|
||||
browser.close()
|
||||
return active_tab_info
|
||||
if match_type == "url":
|
||||
expected_urls = rule['urls']
|
||||
actual_urls = [bookmark['url'] for bookmark in bookmarks]
|
||||
return 1 if are_lists_equal(expected_urls, actual_urls, compare_urls) else 0
|
||||
else:
|
||||
logger.error(f"Unknown type: {match_type}")
|
||||
return 0
|
||||
|
||||
@@ -13,6 +13,8 @@ from rapidfuzz import fuzz
|
||||
|
||||
from .utils import _match_record
|
||||
|
||||
import sqlite3
|
||||
|
||||
def exact_match(result, rules) -> float:
|
||||
expect = rules["expected"]
|
||||
print(result, expect)
|
||||
@@ -146,3 +148,8 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
|
||||
|
||||
# def check_existence(result: str, *args) -> float:
|
||||
# return 1. - (result is None)
|
||||
|
||||
def run_sqlite3(result: str, rules: Dict[str, Any]) -> float:
|
||||
connection: sqlite3.Connection = sqlite3.connect(result)
|
||||
cursor: sqlite3.Cursor = connection.execute(rules["sql"])
|
||||
return float(cursor.fetchone()[0] or 0)
|
||||
|
||||
@@ -20,6 +20,10 @@ def get_gimp_export_path():
|
||||
# Handle the case where the configuration file is not found
|
||||
print("GIMP configuration file not found")
|
||||
return False
|
||||
|
||||
def check_file_exists(directory, filename):
|
||||
file_path = os.path.join(directory, filename)
|
||||
return 1 if os.path.isfile(file_path) else 0
|
||||
|
||||
def increase_saturation(image1_path: str, image2_path: str) -> float:
|
||||
def calculate_saturation(image):
|
||||
@@ -61,6 +65,49 @@ def decrease_brightness(image1_path: str, image2_path: str) -> float:
|
||||
brightness2 = calculate_brightness(image2)
|
||||
|
||||
return 1 if brightness1 > brightness2 else 0
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
def find_yellow_triangle(image):
|
||||
# Convert the image to RGBA
|
||||
rgba = cv2.cvtColor(image, cv2.COLOR_BGR2RGBA)
|
||||
|
||||
# define range of yellow color in HSV
|
||||
lower_yellow = np.array([0, 0, 0], dtype=np.uint8)
|
||||
upper_yellow = np.array([255, 255, 255], dtype=np.uint8)
|
||||
|
||||
# expand the dimensions of lower and upper yellow to match the image dimensions
|
||||
lower_yellow = np.reshape(lower_yellow, (1, 1, 3))
|
||||
upper_yellow = np.reshape(upper_yellow, (1, 1, 3))
|
||||
# build a mask for the yellow color
|
||||
mask = cv2.inRange(rgba, lower_yellow, upper_yellow)
|
||||
|
||||
# search for contours in the mask
|
||||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
# choose the largest contour
|
||||
max_contour = max(contours, key=cv2.contourArea)
|
||||
|
||||
# calculate the center of the contour
|
||||
M = cv2.moments(max_contour)
|
||||
cx = int(M['m10'] / M['m00'])
|
||||
cy = int(M['m01'] / M['m00'])
|
||||
|
||||
return cx, cy
|
||||
|
||||
def compare_triangle_positions(image1, image2):
|
||||
image1 = cv2.imread(image1, cv2.IMREAD_COLOR)
|
||||
image2 = cv2.imread(image2, cv2.IMREAD_COLOR)
|
||||
# find the center of the yellow triangle in each image
|
||||
cx1, cy1 = find_yellow_triangle(image1)
|
||||
cx2, cy2 = find_yellow_triangle(image2)
|
||||
|
||||
# calculate the distance between the center of the triangle and the center of the image
|
||||
center_distance1 = np.sqrt((cx1 - image1.shape[1] // 2)**2 + (cy1 - image1.shape[0] // 2)**2)
|
||||
center_distance2 = np.sqrt((cx2 - image2.shape[1] // 2)**2 + (cy2 - image2.shape[0] // 2)**2)
|
||||
|
||||
return 1 if center_distance1 > center_distance2 else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
image1_path = "../Downloads/1.png"
|
||||
|
||||
@@ -85,7 +85,7 @@ def check_thunderbird_prefs(result: str, rule: Dict[str, Dict[str, Dict[str, Any
|
||||
|
||||
_value_processor: Callable[[str], str] = lambda val: val.replace("\\\"", "\"").replace("\\\\", "\\")
|
||||
#_condition_pattern: Pattern[str] = re.compile(r'(?P<type>AND|OR) \((?P<key>[\w ]+),(?P<rel>[\w ' + '\'' + r']+),(?:"(?P<val2>(?:[^"]|\")+)"|(?P<val1>[^)]+))\)')
|
||||
_condition_pattern: Pattern[str] = re.compile(r'(?:AND|OR) \((?:[\w ]+),(?:[\w ' + '\'' + r']+),(?:"(?:(?:[^"]|\")+)"|(?:[^)]+))\)')
|
||||
_condition_pattern: Pattern[str] = re.compile(r'\b(?:AND|OR) \((?:[\w ]+),(?:[\w ' + '\'' + r']+),(?:"(?:(?:[^"]|\")+)"|(?:[^)]+))\)|\bALL\b')
|
||||
def check_thunderbird_filter(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
|
||||
"""
|
||||
Args:
|
||||
|
||||
@@ -1,26 +1,29 @@
|
||||
import zipfile
|
||||
import lxml.etree
|
||||
import lxml.cssselect
|
||||
from lxml.etree import _Element
|
||||
import xmltodict
|
||||
import openpyxl
|
||||
from openpyxl import Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
from openpyxl.chart._chart import ChartBase
|
||||
|
||||
from typing import Dict, List, Set
|
||||
from typing import Any
|
||||
|
||||
import logging
|
||||
import zipfile
|
||||
from typing import Any
|
||||
from typing import Dict, List, Set
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
import lxml.cssselect
|
||||
import lxml.etree
|
||||
import openpyxl
|
||||
import xmltodict
|
||||
from lxml.etree import _Element
|
||||
from openpyxl import Workbook
|
||||
from openpyxl.chart._chart import ChartBase
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
|
||||
logger = logging.getLogger("desktopenv.metrics.utils")
|
||||
|
||||
_xlsx_namespaces = [ ("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
|
||||
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
|
||||
]
|
||||
_xlsx_namespaces = [("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
|
||||
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
|
||||
]
|
||||
_xlsx_ns_mapping = dict(_xlsx_namespaces)
|
||||
_xlsx_ns_imapping = dict(map(lambda itm: (itm[1], itm[0]), _xlsx_namespaces))
|
||||
_sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
|
||||
#print(_sparklines_selector.css)
|
||||
|
||||
|
||||
# print(_sparklines_selector.css)
|
||||
def load_sparklines(xlsx_file: str) -> Dict[str, str]:
|
||||
"""
|
||||
This function modifies data_frame in-place
|
||||
@@ -44,13 +47,14 @@ def load_sparklines(xlsx_file: str) -> Dict[str, str]:
|
||||
sparklines_dict: Dict[str, str] = {}
|
||||
for sp_l in sparklines:
|
||||
sparkline_xml: str = lxml.etree.tostring(sp_l, encoding="unicode")
|
||||
sparkline: Dict[str, Dict[str, str]] = xmltodict.parse( sparkline_xml
|
||||
, process_namespaces=True
|
||||
, namespaces=_xlsx_ns_imapping
|
||||
)
|
||||
sparkline: Dict[str, Dict[str, str]] = xmltodict.parse(sparkline_xml
|
||||
, process_namespaces=True
|
||||
, namespaces=_xlsx_ns_imapping
|
||||
)
|
||||
sparklines_dict[sparkline["x14:sparkline"]["xm:sqref"]] = sparkline["x14:sparkline"]["xm:f"]
|
||||
return sparklines_dict
|
||||
|
||||
|
||||
# Available Chart Properties:
|
||||
# title: str
|
||||
# anchor: ["oneCell" | "twoCell" | "absolute", col0, row0, col1, row1]
|
||||
@@ -70,7 +74,7 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
Dict[str, Any]: information of charts
|
||||
"""
|
||||
|
||||
#workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file)
|
||||
# workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file)
|
||||
worksheet: Worksheet = xlsx_file.active
|
||||
charts: List[ChartBase] = worksheet._charts
|
||||
|
||||
@@ -79,22 +83,22 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
for ch in charts:
|
||||
series: List[str] = []
|
||||
for ser in ch.series:
|
||||
value_num = ser.val.numRef.f\
|
||||
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f")\
|
||||
else ""
|
||||
value_str = ser.val.strRef.f\
|
||||
if hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f")\
|
||||
else ""
|
||||
categ_num = ser.cat.numRef.f\
|
||||
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f")\
|
||||
else ""
|
||||
categ_str = ser.cat.strRef.f\
|
||||
if hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f")\
|
||||
else ""
|
||||
series.append( "{:},{:},{:},{:}".format( value_num, value_str
|
||||
value_num = ser.val.numRef.f \
|
||||
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f") \
|
||||
else ""
|
||||
value_str = ser.val.strRef.f \
|
||||
if hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f") \
|
||||
else ""
|
||||
categ_num = ser.cat.numRef.f \
|
||||
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f") \
|
||||
else ""
|
||||
categ_str = ser.cat.strRef.f \
|
||||
if hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f") \
|
||||
else ""
|
||||
series.append("{:},{:},{:},{:}".format(value_num, value_str
|
||||
, categ_num, categ_str
|
||||
)
|
||||
)
|
||||
)
|
||||
series: str = ";".join(series)
|
||||
|
||||
# TODO: maybe more aspects, like chart type
|
||||
@@ -103,10 +107,10 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
if "title" in chart_props:
|
||||
info["title"] = ch.title.tx.rich.p[0].r[0].t
|
||||
if "anchor" in chart_props:
|
||||
info["anchor"] = [ ch.anchor.editAs
|
||||
, ch.anchor._from.col, ch.anchor.to.row
|
||||
, ch.anchor.to.col, ch.anchor.to.row
|
||||
]
|
||||
info["anchor"] = [ch.anchor.editAs
|
||||
, ch.anchor._from.col, ch.anchor.to.row
|
||||
, ch.anchor.to.col, ch.anchor.to.row
|
||||
]
|
||||
if "width" in chart_props:
|
||||
info["width"] = ch.width
|
||||
if "height" in chart_props:
|
||||
@@ -125,43 +129,87 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
chart_set[series] = info
|
||||
return chart_set
|
||||
|
||||
|
||||
def _match_record(pattern: Dict[str, Any], item: Dict[str, Any]) -> bool:
|
||||
return all(k in item and item[k]==val for k, val in pattern.items())
|
||||
return all(k in item and item[k] == val for k, val in pattern.items())
|
||||
|
||||
|
||||
def are_lists_equal(list1, list2, comparison_func):
|
||||
# First check if both lists have the same length
|
||||
if len(list1) != len(list2):
|
||||
return False
|
||||
|
||||
# Now make sure each element in one list has an equal element in the other list
|
||||
for item1 in list1:
|
||||
# Use the supplied function to test for an equal item
|
||||
if not any(comparison_func(item1, item2) for item2 in list2):
|
||||
return False
|
||||
|
||||
# If all items match, the lists are equal
|
||||
return True
|
||||
|
||||
|
||||
def compare_urls(url1, url2):
|
||||
def normalize_url(url):
|
||||
# Parse the URL
|
||||
parsed_url = urlparse(url)
|
||||
|
||||
# If no scheme is present, assume 'http'
|
||||
scheme = parsed_url.scheme if parsed_url.scheme else 'http'
|
||||
|
||||
# Lowercase the scheme and netloc, remove 'www.', and handle trailing slash
|
||||
normalized_netloc = parsed_url.netloc.lower().replace("www.", "")
|
||||
normalized_path = parsed_url.path if parsed_url.path != '/' else ''
|
||||
|
||||
# Reassemble the URL with normalized components
|
||||
normalized_parsed_url = parsed_url._replace(scheme=scheme.lower(), netloc=normalized_netloc,
|
||||
path=normalized_path)
|
||||
normalized_url = urlunparse(normalized_parsed_url)
|
||||
|
||||
return normalized_url
|
||||
|
||||
# Normalize both URLs for comparison
|
||||
norm_url1 = normalize_url(url1)
|
||||
norm_url2 = normalize_url(url2)
|
||||
|
||||
# Compare the normalized URLs
|
||||
return norm_url1 == norm_url2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
path1 = "../../../../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold_line_scatter.xlsx"
|
||||
workbook1: Workbook = openpyxl.load_workbook(filename=path1)
|
||||
worksheet1: Worksheet = workbook1.active
|
||||
charts: List[ChartBase] = worksheet1._charts
|
||||
#print(len(charts))
|
||||
#print(type(charts[0]))
|
||||
#
|
||||
#print(len(charts[0].series))
|
||||
#print(type(charts[0].series[0]))
|
||||
#print(type(charts[0].series[0].val))
|
||||
# print(len(charts))
|
||||
# print(type(charts[0]))
|
||||
#
|
||||
# print(len(charts[0].series))
|
||||
# print(type(charts[0].series[0]))
|
||||
# print(type(charts[0].series[0].val))
|
||||
##print(charts[0].series[0].val)
|
||||
#print(charts[0].series[0].val.numRef.f)
|
||||
#
|
||||
#print(type(charts[0].series[0].cat))
|
||||
# print(charts[0].series[0].val.numRef.f)
|
||||
#
|
||||
# print(type(charts[0].series[0].cat))
|
||||
##print(charts[0].series[0].cat)
|
||||
#print(charts[0].series[0].cat.numRef)
|
||||
#print(charts[0].series[0].cat.strRef)
|
||||
#print(charts[0].series[0].cat.strRef.f)
|
||||
# print(charts[0].series[0].cat.numRef)
|
||||
# print(charts[0].series[0].cat.strRef)
|
||||
# print(charts[0].series[0].cat.strRef.f)
|
||||
|
||||
#print(type(charts[0].title.tx.strRef))
|
||||
#print(type(charts[0].title.tx.rich))
|
||||
#print(type(charts[0].title.txPr))
|
||||
#print(len(charts[0].title.tx.rich.p))
|
||||
#print(len(charts[0].title.tx.rich.p[0].r))
|
||||
#print(type(charts[0].title.tx.rich.p[0].r[0]))
|
||||
#print(type(charts[0].title.tx.rich.p[0].r[0].t))
|
||||
#print(charts[0].title.tx.rich.p[0].r[0].t)
|
||||
# print(type(charts[0].title.tx.strRef))
|
||||
# print(type(charts[0].title.tx.rich))
|
||||
# print(type(charts[0].title.txPr))
|
||||
# print(len(charts[0].title.tx.rich.p))
|
||||
# print(len(charts[0].title.tx.rich.p[0].r))
|
||||
# print(type(charts[0].title.tx.rich.p[0].r[0]))
|
||||
# print(type(charts[0].title.tx.rich.p[0].r[0].t))
|
||||
# print(charts[0].title.tx.rich.p[0].r[0].t)
|
||||
|
||||
#print(type(charts[0].anchor))
|
||||
#print(charts[0].anchor.editAs)
|
||||
#print(charts[0].anchor._from.col, charts[0].anchor.to.row)
|
||||
#print(charts[0].anchor.to.col, charts[0].anchor.to.row)
|
||||
# print(type(charts[0].anchor))
|
||||
# print(charts[0].anchor.editAs)
|
||||
# print(charts[0].anchor._from.col, charts[0].anchor.to.row)
|
||||
# print(charts[0].anchor.to.col, charts[0].anchor.to.row)
|
||||
|
||||
#df1 = pd.read_excel(path1)
|
||||
#print(df1)
|
||||
# df1 = pd.read_excel(path1)
|
||||
# print(df1)
|
||||
print(load_charts(path1, chart_props=["title", "xtitle", "ytitle", "type"]))
|
||||
|
||||
@@ -14,7 +14,7 @@ from pyatspi import Value as ATValue
|
||||
from pyatspi import Action as ATAction
|
||||
|
||||
from typing import List, Dict
|
||||
from typing import Any
|
||||
from typing import Any, Optional
|
||||
|
||||
import Xlib
|
||||
import pyautogui
|
||||
@@ -114,6 +114,39 @@ def capture_screen_with_cursor():
|
||||
|
||||
return send_file(file_path, mimetype='image/png')
|
||||
|
||||
|
||||
def _has_active_terminal(desktop: Accessible) -> bool:
|
||||
""" A quick check whether the terminal window is open and active.
|
||||
"""
|
||||
for app in desktop:
|
||||
if app.getRoleName() == "application" and app.name == "gnome-terminal-server":
|
||||
for frame in app:
|
||||
if frame.getRoleName() == "frame" and frame.getState().contains(pyatspi.STATE_ACTIVE):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@app.route('/terminal', methods=['GET'])
|
||||
def get_terminal_output():
|
||||
user_platform = platform.system()
|
||||
output: Optional[str] = None
|
||||
try:
|
||||
if user_platform == "Linux":
|
||||
desktop: Accessible = pyatspi.Registry.getDesktop(0)
|
||||
if _has_active_terminal(desktop):
|
||||
desktop_xml: _Element = _create_node(desktop)
|
||||
# 1. the terminal window (frame of application is st:active) is open and active
|
||||
# 2. the terminal tab (terminal status is st:focused) is focused
|
||||
xpath = '//application[@name="gnome-terminal-server"]/frame[@st:active="true"]//terminal[@st:focused="true"]'
|
||||
terminals: List[_Element] = desktop_xml.xpath(xpath, namespaces=_accessibility_ns_map)
|
||||
output = terminals[0].text.rstrip() if len(terminals) == 1 else None
|
||||
else: # windows and macos platform is not implemented currently
|
||||
raise NotImplementedError
|
||||
return jsonify({"output": output, "status": "success"})
|
||||
except:
|
||||
return jsonify({"output": None, "status": "error"})
|
||||
|
||||
|
||||
_accessibility_ns_map = { "st": "uri:deskat:state.at-spi.gnome.org"
|
||||
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
|
||||
, "cp": "uri:deskat:component.at-spi.gnome.org"
|
||||
|
||||
Reference in New Issue
Block a user