Merge branch 'main' of https://github.com/xlang-ai/DesktopEnv
This commit is contained in:
@@ -137,7 +137,7 @@ class DesktopEnv(gym.Env):
|
||||
logger.info("Getting IP Address...")
|
||||
for _ in range(max_retries):
|
||||
try:
|
||||
output = _execute_command(["vmrun", "-T", "ws", "getGuestIPAddress", self.path_to_vm]).strip()
|
||||
output = _execute_command(["vmrun", "-T", "ws", "getGuestIPAddress", self.path_to_vm, "-wait"]).strip()
|
||||
logger.info(f"IP address: {output}")
|
||||
return output
|
||||
except Exception as e:
|
||||
@@ -316,6 +316,9 @@ class DesktopEnv(gym.Env):
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
else:
|
||||
if len(self.action_history) > 0 and self.action_history[-1] == "FAIL":
|
||||
return 0
|
||||
|
||||
if type(self.metric) == list:
|
||||
results = []
|
||||
|
||||
@@ -6,6 +6,7 @@ from .chrome import (
|
||||
get_pdf_from_url,
|
||||
get_shortcuts_on_desktop,
|
||||
get_history,
|
||||
get_page_info,
|
||||
get_enabled_experiments,
|
||||
get_chrome_language,
|
||||
get_chrome_font_size,
|
||||
@@ -16,14 +17,20 @@ from .chrome import (
|
||||
get_enable_do_not_track,
|
||||
get_enable_enhanced_safety_browsing,
|
||||
get_new_startup_page,
|
||||
get_find_unpacked_extension_path
|
||||
get_find_unpacked_extension_path,
|
||||
get_data_delete_automacally,
|
||||
get_active_tab_html_parse,
|
||||
get_active_tab_url_parse,
|
||||
get_gotoRecreationPage_and_get_html_content,
|
||||
get_url_dashPart,
|
||||
get_active_url_from_accessTree
|
||||
)
|
||||
from .file import get_cloud_file, get_vm_file, get_cache_file
|
||||
from .general import get_vm_command_line, get_vm_terminal_output
|
||||
from .gimp import get_gimp_config_file
|
||||
from .impress import get_audio_in_slide
|
||||
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper, get_list_directory
|
||||
from .misc import get_rule, get_accessibility_tree
|
||||
from .misc import get_rule, get_accessibility_tree, get_rule_relativeTime
|
||||
from .replay import get_replay
|
||||
from .vlc import get_vlc_playing_info, get_vlc_config, get_default_video_player
|
||||
from .vscode import get_vscode_config
|
||||
|
||||
@@ -1,11 +1,30 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import sqlite3
|
||||
from typing import Dict, Any
|
||||
import time
|
||||
from typing import Dict, Any, List
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
import lxml.etree
|
||||
import requests
|
||||
from lxml.cssselect import CSSSelector
|
||||
from lxml.etree import _Element
|
||||
from playwright.sync_api import sync_playwright
|
||||
from pydrive.auth import GoogleAuth
|
||||
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
_accessibility_ns_map = {
|
||||
"st": "uri:deskat:state.at-spi.gnome.org",
|
||||
"attr": "uri:deskat:attributes.at-spi.gnome.org",
|
||||
"cp": "uri:deskat:component.at-spi.gnome.org",
|
||||
"doc": "uri:deskat:document.at-spi.gnome.org",
|
||||
"docattr": "uri:deskat:attributes.document.at-spi.gnome.org",
|
||||
"txt": "uri:deskat:text.at-spi.gnome.org",
|
||||
"val": "uri:deskat:value.at-spi.gnome.org",
|
||||
"act": "uri:deskat:action.at-spi.gnome.org"
|
||||
}
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.chrome")
|
||||
|
||||
@@ -27,9 +46,12 @@ def get_default_search_engine(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -60,9 +82,12 @@ def get_cookie_data(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Cookies'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_cookie_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
chrome_cookie_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -96,9 +121,12 @@ def get_history(env, config: Dict[str, str]):
|
||||
"""import os; print(os.path.join(os.getenv('HOME'), "Library", "Application Support", "Google", "Chrome", "Default", "History"))""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_history_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
chrome_history_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -132,9 +160,12 @@ def get_enabled_experiments(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -164,9 +195,12 @@ def get_profile_name(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -193,9 +227,12 @@ def get_chrome_language(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -222,9 +259,12 @@ def get_chrome_font_size(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -257,9 +297,12 @@ def get_bookmarks(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Bookmarks'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
|
||||
'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -284,8 +327,11 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
|
||||
"""os.path.expanduser('~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'""")['output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
raise NotImplementedError
|
||||
else:
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'""")['output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -310,6 +356,57 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
|
||||
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on
|
||||
# port info to allow remote debugging, see README.md for details
|
||||
|
||||
def get_page_info(env, config: Dict[str, str]):
    """Open ``config["url"]`` in the VM's Chrome and return basic page information.

    Connects to the browser over CDP at ``http://{env.vm_ip}:9222``. If that
    connection fails, a browser is launched through the VM's ``/setup/launch``
    endpoint and the connection is retried once after a 5 second pause.

    Args:
        env: environment object; only ``env.vm_ip`` is read here.
        config: must contain the key ``"url"`` (page to open in a new tab).

    Returns:
        dict with keys ``'title'``, ``'url'`` and ``'content'``. ``'title'`` is
        ``'Load timeout'`` when waiting for the load event timed out and
        ``'Error encountered'`` on any other error while reading the page.

    Raises:
        Whatever ``connect_over_cdp`` raises if the retry also fails, and
        whatever ``page.goto`` raises (neither is caught here).
    """
    # Playwright's TimeoutError does NOT inherit from the builtin TimeoutError,
    # so it has to be caught explicitly for the "Load timeout" branch to fire.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    url = config["url"]

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        # connect to remote Chrome instance
        try:
            browser = p.chromium.connect_over_cdp(remote_debugging_url)
        except Exception as e:
            # If the connection fails, start a new browser instance
            logger.warning("Failed to connect to %s (%s); launching a new browser", remote_debugging_url, e)
            # NOTE(review): platform.machine() describes the machine running this
            # code, which is presumably the same architecture as the VM - confirm.
            browser_binary = "chromium" if "arm" in platform.machine() else "google-chrome"
            # NOTE(review): the browser is started on port 1337 while we connect to
            # 9222 - presumably a port forward exists inside the VM; confirm.
            payload = json.dumps({"command": [
                browser_binary,
                "--remote-debugging-port=1337"
            ], "shell": False})

            headers = {"Content-Type": "application/json"}
            requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
            time.sleep(5)  # give the browser time to come up before reconnecting
            browser = p.chromium.connect_over_cdp(remote_debugging_url)

        page = browser.contexts[0].new_page()
        page.goto(url)

        try:
            # Wait for the page to finish loading, this prevents the "execution context was destroyed" issue
            page.wait_for_load_state('load')  # Wait for the 'load' event to complete
            title = page.title()
            url = page.url
            page_info = {'title': title, 'url': url, 'content': page.content()}
        except (PlaywrightTimeoutError, TimeoutError):
            # If page loading times out, still report whatever the page currently holds
            page_info = {'title': 'Load timeout', 'url': page.url, 'content': page.content()}
        except Exception as e:
            # Catch other potential exceptions that might occur while reading the page title
            logger.error(f"Error: {e}")
            page_info = {'title': 'Error encountered', 'url': page.url, 'content': page.content()}

        browser.close()
        return page_info
|
||||
|
||||
|
||||
def get_open_tabs_info(env, config: Dict[str, str]):
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
@@ -317,14 +414,37 @@ def get_open_tabs_info(env, config: Dict[str, str]):
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
except Exception as e:
|
||||
# If the connection fails, start a new browser instance
|
||||
platform.machine()
|
||||
if "arm" in platform.machine():
|
||||
# start a new browser instance if the connection fails
|
||||
payload = json.dumps({"command": [
|
||||
"chromium",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
else:
|
||||
payload = json.dumps({"command": [
|
||||
"google-chrome",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
||||
time.sleep(5)
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
except Exception as e:
|
||||
return []
|
||||
|
||||
tabs_info = []
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
try:
|
||||
# Wait for the page to finish loading, this prevents the "execution context was destroyed" issue
|
||||
page.wait_for_load_state('load') # Wait for the 'load' event to complete
|
||||
page.wait_for_load_state('networkidle') # Wait for the 'load' event to complete
|
||||
title = page.title()
|
||||
url = page.url
|
||||
tabs_info.append({'title': title, 'url': url})
|
||||
@@ -340,32 +460,103 @@ def get_open_tabs_info(env, config: Dict[str, str]):
|
||||
return tabs_info
|
||||
|
||||
|
||||
def get_active_url_from_accessTree(env, config):
    """Read the active tab's URL from the browser address bar via the accessibility tree.

    Playwright cannot report which tab is active, so the text of the
    "Address and search bar" entry in the accessibility tree is used instead.

    Args:
        env: environment object; ``env.controller.get_accessibility_tree()`` is
            called to obtain the tree as an XML string.
        config: Dict[str, str] with the optional key
            'goto_prefix': prefix added in front of the address-bar text,
                default is "https://" (the text from the tree has no scheme).
            Other keys are ignored; 'xpath' / 'selectors' are no longer
            needed because a default selector is used.

    Returns:
        The prefixed URL string, or ``None`` when the selector cannot be
        built, no address bar entry is found, or the entry has no text.
    """
    accessibility_tree: str = env.controller.get_accessibility_tree()
    logger.debug("AT@eval: %s", accessibility_tree)
    # first, use accessibility API to get the active tab URL
    at: _Element = lxml.etree.fromstring(accessibility_tree)
    arch = platform.machine()
    print("Your architecture is: {}".format(arch))
    # The application node is named "Chromium" on arm and "Google Chrome" otherwise;
    # the space has to be escaped inside the CSS attribute selector.
    application_name = "Chromium" if "arm" in arch else "Google\\ Chrome"
    try:
        selector = CSSSelector(
            "application[name=" + application_name + "] entry[name=Address\\ and\\ search\\ bar]",
            namespaces=_accessibility_ns_map)
    except Exception:
        logger.error("Failed to parse the selector for active tab URL")
        return None
    elements: List[_Element] = selector(at)
    if len(elements) == 0:
        print("no elements found")
        return None
    address_text = elements[0].text
    if address_text is None:
        # An empty address bar has no text node; concatenating None would raise TypeError.
        logger.error("Address bar entry has no text, cannot determine the active tab URL")
        return None
    active_tab_url = config.get("goto_prefix", "https://") + address_text
    print("active tab url now: {}".format(active_tab_url))
    return active_tab_url
|
||||
|
||||
|
||||
def get_active_tab_info(env, config: Dict[str, str]):
    """Return title, URL and HTML content for the URL shown in the active tab.

    Warning! This function re-opens the active tab's URL in a NEW page.
    If the target URL depends on cache or cookies, the reloaded page may end
    up somewhere else. Check the URL in incognito mode first; if it does not
    redirect, this function is safe to use.

    Args:
        env: environment object; ``env.vm_ip`` is read and ``env`` is passed on
            to ``get_active_url_from_accessTree``.
        config: Dict[str, str] passed on unchanged to
            ``get_active_url_from_accessTree`` (which reads the optional
            ``"goto_prefix"`` key).

    Returns:
        dict with keys ``'title'``, ``'url'`` and ``'content'``, or ``None``
        when the active URL cannot be determined, the CDP connection fails,
        or navigation to the URL fails.
    """
    active_tab_url = get_active_url_from_accessTree(env, config)
    if active_tab_url is None:
        logger.error("Failed to get the url of active tab")
        return None
    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        # connect to remote Chrome instance, since it is supposed to be the active one, we won't start a new one if failed
        try:
            browser = p.chromium.connect_over_cdp(remote_debugging_url)
        except Exception as e:
            logger.error("Failed to connect to %s: %s", remote_debugging_url, e)
            return None

        # go to the target URL page
        page = browser.new_page()
        try:
            page.goto(active_tab_url)
        except Exception:
            logger.error("Failed to go to the target URL page")
            return None
        page.wait_for_load_state('load')  # Wait for the 'load' event to complete
        active_tab_info = {
            'title': page.title(),
            'url': page.url,
            'content': page.content()  # get the HTML content of the page
        }

        browser.close()
        # Title and URL are enough for diagnostics; the full HTML is too noisy to emit.
        logger.debug("active_tab_title: %s", active_tab_info.get('title', 'None'))
        logger.debug("active_tab_url: %s", active_tab_info.get('url', 'None'))
        return active_tab_info
|
||||
|
||||
|
||||
@@ -382,7 +573,28 @@ def get_pdf_from_url(env, config: Dict[str, str]) -> str:
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
except Exception as e:
|
||||
# If the connection fails, start a new browser instance
|
||||
platform.machine()
|
||||
if "arm" in platform.machine():
|
||||
# start a new browser instance if the connection fails
|
||||
payload = json.dumps({"command": [
|
||||
"chromium",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
else:
|
||||
payload = json.dumps({"command": [
|
||||
"google-chrome",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
||||
time.sleep(5)
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
page = browser.new_page()
|
||||
page.goto(_url)
|
||||
page.pdf(path=_path)
|
||||
@@ -399,7 +611,27 @@ def get_chrome_saved_address(env, config: Dict[str, str]):
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
except Exception as e:
|
||||
# If the connection fails, start a new browser instance
|
||||
platform.machine()
|
||||
if "arm" in platform.machine():
|
||||
# start a new browser instance if the connection fails
|
||||
payload = json.dumps({"command": [
|
||||
"chromium",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
else:
|
||||
payload = json.dumps({"command": [
|
||||
"google-chrome",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
||||
time.sleep(5)
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
page = browser.new_page()
|
||||
|
||||
@@ -457,7 +689,27 @@ def get_number_of_search_results(env, config: Dict[str, str]):
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
except Exception as e:
|
||||
# If the connection fails, start a new browser instance
|
||||
platform.machine()
|
||||
if "arm" in platform.machine():
|
||||
# start a new browser instance if the connection fails
|
||||
payload = json.dumps({"command": [
|
||||
"chromium",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
else:
|
||||
payload = json.dumps({"command": [
|
||||
"google-chrome",
|
||||
"--remote-debugging-port=1337"
|
||||
], "shell": False})
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
||||
time.sleep(5)
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
page = browser.new_page()
|
||||
page.goto(url)
|
||||
search_results = page.query_selector_all(result_selector)
|
||||
@@ -487,9 +739,9 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
|
||||
for q in _query:
|
||||
search = f'( {q} ) and "{parent_id}" in parents'
|
||||
filelist: GoogleDriveFileList = drive.ListFile({'q': search}).GetList()
|
||||
if len(filelist) == 0: # target file not found
|
||||
if len(filelist) == 0: # target file not found
|
||||
return None
|
||||
file: GoogleDriveFile = filelist[0] # HACK: if multiple candidates, just use the first one
|
||||
file: GoogleDriveFile = filelist[0] # HACK: if multiple candidates, just use the first one
|
||||
parent_id = file['id']
|
||||
|
||||
file.GetContentFile(_path, mimetype=file['mimeType'])
|
||||
@@ -501,8 +753,9 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
|
||||
if 'query' in config:
|
||||
return get_single_file(config['query'], os.path.join(env.cache_dir, config['dest']))
|
||||
elif 'path' in config:
|
||||
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if idx < len(config['path']) - 1
|
||||
else f"title = '{fp}' and trashed = false" for idx, fp in enumerate(config['path'])]
|
||||
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if idx < len(
|
||||
config['path']) - 1
|
||||
else f"title = '{fp}' and trashed = false" for idx, fp in enumerate(config['path'])]
|
||||
return get_single_file(query, os.path.join(env.cache_dir, config['dest']))
|
||||
elif 'query_list' in config:
|
||||
_path_list = []
|
||||
@@ -511,12 +764,14 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
|
||||
dest = config['dest'][idx]
|
||||
_path_list.append(get_single_file(query, os.path.join(env.cache_dir, dest)))
|
||||
return _path_list
|
||||
else: # path_list in config
|
||||
else: # path_list in config
|
||||
_path_list = []
|
||||
assert len(config['path_list']) == len(config['dest'])
|
||||
for idx, path in enumerate(config['path_list']):
|
||||
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if jdx < len(path) - 1
|
||||
else f"title = '{fp}' and trashed = false" for jdx, fp in enumerate(path)]
|
||||
query = [
|
||||
f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if jdx < len(
|
||||
path) - 1
|
||||
else f"title = '{fp}' and trashed = false" for jdx, fp in enumerate(path)]
|
||||
dest = config['dest'][idx]
|
||||
_path_list.append(get_single_file(query, os.path.join(env.cache_dir, dest)))
|
||||
return _path_list
|
||||
@@ -532,12 +787,15 @@ def get_enable_do_not_track(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
# preference_file_path = env.controller.execute_python_command(
|
||||
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
# 'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -545,7 +803,7 @@ def get_enable_do_not_track(env, config: Dict[str, str]):
|
||||
content = env.controller.get_file(preference_file_path)
|
||||
data = json.loads(content)
|
||||
|
||||
if_enable_do_not_track = data.get('enable_do_not_track', {}) # bool
|
||||
if_enable_do_not_track = data.get('enable_do_not_track', {}) # bool
|
||||
return "true" if if_enable_do_not_track else "false"
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
@@ -562,12 +820,15 @@ def get_enable_enhanced_safety_browsing(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
# preference_file_path = env.controller.execute_python_command(
|
||||
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
# 'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -575,7 +836,7 @@ def get_enable_enhanced_safety_browsing(env, config: Dict[str, str]):
|
||||
content = env.controller.get_file(preference_file_path)
|
||||
data = json.loads(content)
|
||||
|
||||
if_enable_do_not_track = data.get('safebrowsing', {}).get('enhanced', {}) # bool
|
||||
if_enable_do_not_track = data.get('safebrowsing', {}).get('enhanced', {}) # bool
|
||||
return "true" if if_enable_do_not_track else "false"
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
@@ -592,12 +853,15 @@ def get_new_startup_page(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
# preference_file_path = env.controller.execute_python_command(
|
||||
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
# 'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -610,7 +874,7 @@ def get_new_startup_page(env, config: Dict[str, str]):
|
||||
if "session" not in data.keys():
|
||||
return "true"
|
||||
else:
|
||||
if_enable_do_not_track = data.get('session', {}).get('restore_on_startup', {}) # int, need to be 5
|
||||
if_enable_do_not_track = data.get('session', {}).get('restore_on_startup', {}) # int, need to be 5
|
||||
return "true" if if_enable_do_not_track == 5 else "false"
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
@@ -627,12 +891,15 @@ def get_find_unpacked_extension_path(env, config: Dict[str, str]):
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
# preference_file_path = env.controller.execute_python_command(
|
||||
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
# 'output'].strip()
|
||||
if "arm" in platform.machine():
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -648,4 +915,262 @@ def get_find_unpacked_extension_path(env, config: Dict[str, str]):
|
||||
return all_extensions_path
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
return "Google"
|
||||
return "Google"
|
||||
|
||||
|
||||
def get_data_delete_automacally(env, config: Dict[str, str]):
    """
    Read Chrome's ``profile.exit_type`` value from the VM's Preferences file.

    The location of the Preferences file is resolved inside the VM with
    ``env.controller.execute_python_command`` according to ``env.vm_platform``
    ('Windows', 'Darwin' or 'Linux'); the file is then fetched with
    ``env.controller.get_file`` and parsed as JSON.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not read by this function.

    Returns:
        The value stored under ``data["profile"]["exit_type"]``, or the string
        "Google" when the file cannot be fetched or parsed.

    Raises:
        Exception: if ``env.vm_platform`` is not one of the supported platforms.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # The path is passed to os.path.join as separate components: embedding
        # 'Google\Chrome\User Data\...' in the remote source makes the remote
        # interpreter reject it ("\U" starts a unicode escape in a str literal).
        locate_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                          "'Google', 'Chrome', 'User Data', 'Default', 'Preferences'))")
    elif os_type == 'Darwin':
        locate_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))"
    elif os_type == 'Linux':
        # NOTE(review): platform.machine() inspects the machine running this
        # code, presumably assumed to share the VM's architecture - confirm.
        if "arm" in platform.machine():
            locate_command = "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))"
        else:
            locate_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))"
    else:
        raise Exception('Unsupported operating system')

    preference_file_path = env.controller.execute_python_command(locate_command)['output'].strip()

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)
        return data["profile"]["exit_type"]
    except Exception as e:
        # Best effort: any fetch/parse failure is logged and mapped to the
        # same fallback value the sibling preference getters use.
        logger.error(f"Error: {e}")
        return "Google"
|
||||
|
||||
|
||||
def get_active_tab_html_parse(env, config: Dict[str, Any]):
    """
    Extract the text of selected elements from the active tab's HTML.

    The active tab's URL is obtained from get_active_url_from_accessTree(env,
    config); the page with that URL is then located through a CDP connection
    to the browser running in the VM and queried according to ``config``.

    config:
        Dict[str, Any]{
            # config is also forwarded to get_active_url_from_accessTree
            # (the original notes mention the keys "xpath" and "selectors").
            'category':
                choose from ["class", "label", "xpath", "input"], used to indicate how to find the element
            'labelObject':
                only used when category is "label",
                a dict like { "label text": "the key under which this element's text content is stored" }
            'class_singleObject':
                optional, only used when category is "class", a dict with class names as keys,
                like { "class name": "the key under which this element's text content is stored" }
            'class_multiObject':
                optional, only used when category is "class", for elements sharing a class name.
                Two layers of dict, like
                    { "class name": { "rank in this class": "the key under which the text content is stored", ... } }
            'xpathObject':
                only used when category is "xpath", a dict with xpaths as keys,
                like { "full xpath": "the key under which this element's text content is stored" }
            'inputObject':
                only used when category is "input", a dict with the input elements' xpaths as keys,
                like { "full xpath": "the key under which this input's value is stored" }
        }

    Returns:
        None when no active-tab URL string could be determined, an empty dict
        when no open page has that URL, otherwise a dict mapping the configured
        storage keys to the stripped text (or input value) of each element.
    """
    active_tab_url = get_active_url_from_accessTree(env, config)
    if not isinstance(active_tab_url, str):
        return None
    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        # connect to remote Chrome instance
        try:
            browser = p.chromium.connect_over_cdp(remote_debugging_url)
        except Exception:
            # The connection failed: ask the VM to launch a browser and retry once.
            # NOTE(review): the browser is launched with debugging port 1337 while
            # we reconnect on 9222 - presumably the VM forwards it; confirm.
            browser_binary = "chromium" if "arm" in platform.machine() else "google-chrome"
            payload = json.dumps({"command": [
                browser_binary,
                "--remote-debugging-port=1337"
            ], "shell": False})

            headers = {"Content-Type": "application/json"}
            requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
            time.sleep(5)
            browser = p.chromium.connect_over_cdp(remote_debugging_url)

        target_page = None
        for context in browser.contexts:
            for page in context.pages:
                page.wait_for_load_state("networkidle")
                if page.url == active_tab_url:
                    target_page = page
                    print("tartget page url: ", target_page.url)
                    print("tartget page title: ", target_page.title())
                    break
            if target_page is not None:
                # Stop at the first match instead of waiting on pages of the
                # remaining contexts.
                break
        if target_page is None:
            browser.close()
            return {}

        return_json = {}
        category = config["category"]
        if category == "class":
            # Either of the two class dicts may be omitted.
            for class_name, ranked_keys in config.get("class_multiObject", {}).items():
                elements = target_page.query_selector_all("." + class_name)
                for rank, store_key in ranked_keys.items():
                    return_json[store_key] = elements[int(rank)].text_content().strip()
            for class_name, store_key in config.get("class_singleObject", {}).items():
                return_json[store_key] = target_page.query_selector("." + class_name).text_content().strip()
        elif category == "label":
            # find the text of elements in html with specific label name
            for label, store_key in config["labelObject"].items():
                return_json[store_key] = target_page.get_by_label(label).text_content().strip()
        elif category == "xpath":
            # find the text of elements in html with specific xpath
            for xpath, store_key in config["xpathObject"].items():
                return_json[store_key] = target_page.locator("xpath=" + xpath).text_content().strip()
        elif category == "input":
            # read the current value of input elements located by xpath
            for xpath, store_key in config["inputObject"].items():
                return_json[store_key] = target_page.locator("xpath=" + xpath).input_value().strip()
        browser.close()
    return return_json
|
||||
|
||||
|
||||
def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
    """
    Drive www.recreation.gov to the "Albion Basin" result page and read one element.

    Especially used for the www.recreation.gov examples: a new page is opened in
    the VM's browser, "Albion Basin" is searched, the highlighted result is
    opened in a popup, "next available" is clicked there, and the element
    described by ``config`` is read from the popup.

    config:
        'selector': only "class" is handled; any other value yields an empty result.
        'class': the CSS class name of the element to read (also the result key).
        'order': optional; index of the element among all elements of that class.
                 When absent, the first match of the class is used.

    Returns:
        {"expected": {<class name>: <stripped text content>}}, or
        {"expected": {}} when ``config["selector"]`` is not "class".
    """
    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        try:
            browser = p.chromium.connect_over_cdp(remote_debugging_url)
        except Exception:
            # The connection failed: ask the VM to launch a browser and retry once.
            # NOTE(review): the browser is launched with debugging port 1337 while
            # we reconnect on 9222 - presumably the VM forwards it; confirm.
            browser_binary = "chromium" if "arm" in platform.machine() else "google-chrome"
            payload = json.dumps({"command": [
                browser_binary,
                "--remote-debugging-port=1337"
            ], "shell": False})

            headers = {"Content-Type": "application/json"}
            requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
            time.sleep(5)
            browser = p.chromium.connect_over_cdp(remote_debugging_url)

        page = browser.new_page()
        page.goto("https://www.recreation.gov/")
        page.fill("input#hero-search-input", "Albion Basin")
        page.click("button.nav-search-button")
        print("after first click")
        time.sleep(2)
        # Clicking the highlighted search result opens the detail page in a popup.
        with page.expect_popup() as popup_info:
            page.click(".search-result-highlight--success")
        print("after second click")
        newpage = popup_info.value
        newpage.wait_for_load_state()
        print("go to newpage: ")
        print(newpage.title())
        time.sleep(2)
        newpage.click("button.next-available")
        print("after third click")

        return_json = {"expected": {}}
        # find the text of elements in html with specific class name
        if config["selector"] == "class":
            className = config["class"]
            if "order" in config:
                element = newpage.query_selector_all("." + className)[int(config["order"])]
            else:
                element = newpage.query_selector("." + className)
            return_json["expected"][className] = element.text_content().strip()
        browser.close()
    return return_json
|
||||
|
||||
|
||||
def get_active_tab_url_parse(env, config: Dict[str, Any]):
    """
    Pick selected query parameters out of the active tab's URL.

    config:
        'parse_keys': must exist,
            a list of query-parameter names to extract from the url.
        'replace': optional,
            a dict used to rename extracted keys while keeping their values.
            ( { "original key": "new key" } )

    Returns:
        None when get_active_url_from_accessTree yields no URL; otherwise a dict
        holding, for every requested key, the first value of that query
        parameter ('' when the parameter is absent), with renames applied.
    """
    current_url = get_active_url_from_accessTree(env, config)
    if current_url is None:
        return None

    # Query string -> {name: [values, ...]}
    query = parse_qs(urlparse(current_url).query)

    # Keep only the first value of each parameter of interest.
    picked = {}
    for wanted in list(config["parse_keys"]):
        picked[wanted] = query.get(wanted, [''])[0]

    if "replace" in config:
        renames = config["replace"]
        for old_name in renames.keys():
            # move the value from the original key to the new key
            picked[renames[old_name]] = picked.pop(old_name)
    return picked
|
||||
|
||||
|
||||
def get_url_dashPart(env, config: Dict[str, str]):
    """
    Extract one "/"-separated part of the active tab's URL.

    config
        'partIndex': must exist,
            the index into ``url.split("/")`` of the part to extract
            (negative indices count from the end, e.g. -1 for the last part).
        'needDeleteId': optional, defaults to False,
            a boolean, used to indicate whether to drop everything from the
            first "?" on ( an example: "/part-you-want?id=xxx" )
        'returnType': must exist,
            a string, used to indicate the return type, "string" or "json".
        'key': must exist when returnType is "json",
            the key under which the extracted part is returned.

    Returns:
        None when get_active_url_from_accessTree yields no URL or when
        'returnType' is neither "string" nor "json"; the extracted part for
        "string"; ``{config["key"]: part}`` for "json".
    """
    active_tab_url = get_active_url_from_accessTree(env, config)
    if active_tab_url is None:
        return None

    # pick the requested "/"-separated part, optionally cutting off the query string
    dash_part = active_tab_url.split("/")[config["partIndex"]]
    if config.get("needDeleteId", False):
        dash_part = dash_part.split("?")[0]

    if config["returnType"] == "string":
        return dash_part
    elif config["returnType"] == "json":
        return {config["key"]: dash_part}
    return None
|
||||
|
||||
@@ -1,10 +1,78 @@
|
||||
import logging
|
||||
from typing import TypeVar
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.misc")
|
||||
|
||||
R = TypeVar("Rule")
|
||||
|
||||
# datetime.weekday() value (0 = Monday) -> abbreviated English day name
day_of_week_mapping = dict(enumerate(('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')))

# Month number (1-12) -> abbreviated English month name
month_mapping = dict(enumerate(
    ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'),
    start=1,
))

# Month number (1-12) -> capitalized full English month name
Month_Mapping_Full = dict(enumerate(
    ("January", "February", "March", "April", "May", "June",
     "July", "August", "September", "October", "November", "December"),
    start=1,
))

# Month number (1-12) -> lower-case full English month name
month_mapping_full = {number: name.lower() for number, name in Month_Mapping_Full.items()}

# Relative-time phrase -> day offset from today, or "special" when the phrase
# needs dedicated calendar logic in get_rule_relativeTime.
relativeTime_to_IntDay = {
    "tomorrow": 1,
    **dict.fromkeys(
        (
            "5th next month",
            "10th next month",
            "11th next month",
            "this month",
            "this Saturday",
            "this Sunday",
            "next Monday",
            "next Friday",
            "first monday four months later",
        ),
        "special",
    ),
}
|
||||
|
||||
def get_rule(env, config: R) -> R:
|
||||
"""
|
||||
@@ -12,6 +80,116 @@ def get_rule(env, config: R) -> R:
|
||||
"""
|
||||
return config["rules"]
|
||||
|
||||
def _shift_months(now: datetime, months: int, day: int) -> datetime:
    """Return midnight on ``day`` of the month that lies ``months`` months after ``now``'s month."""
    month_index = now.month - 1 + months  # zero-based month count, may run past December
    return datetime(now.year + month_index // 12, month_index % 12 + 1, day)


def _resolve_relative_day(relative_time: str, now: datetime, from_time=None) -> datetime:
    """Turn a phrase listed in relativeTime_to_IntDay into a concrete datetime relative to ``now``."""
    offset = relativeTime_to_IntDay[relative_time]
    if offset != "special":
        # relative time can be represented by an actual number of days
        return now + timedelta(days=offset)

    weekday = now.weekday()  # 0 = Monday ... 6 = Sunday
    if relative_time == "5th next month":
        return _shift_months(now, 1, 5)
    if relative_time == "10th next month":
        return _shift_months(now, 1, 10)
    if relative_time == "11th next month":
        return _shift_months(now, 1, 11)
    if relative_time == "this month":
        return now
    if relative_time == "this Saturday":
        return now + timedelta(days=5 - weekday)
    if relative_time == "this Sunday":
        return now + timedelta(days=6 - weekday)
    if relative_time == "next Monday":
        return now + timedelta(days=7 - weekday)
    if relative_time == "next Friday":
        if weekday < 4 and from_time == "next Monday":
            # The range starts next week, so its Friday is next week's as well.
            return now + timedelta(days=4 - weekday + 7)
        return now + timedelta(days=(4 - weekday) if weekday < 4 else (11 - weekday))
    if relative_time == "first monday four months later":
        first_of_month = _shift_months(now, 4, 1)
        return first_of_month + timedelta(days=(7 - first_of_month.weekday()) % 7)
    # special case without a rule; add more special cases above
    raise ValueError(f"No rule defined for relative time: {relative_time!r}")


def get_rule_relativeTime(env, config: R) -> R:
    """
    According to the rules defined in function "apply_rules_to_timeFormat", convert the relative time to absolute time.

    config["rules"]:
        'relativeTime': {
            "from": must exist; indicates the relativeTime.
            "to": optional; indicates the relativeTime.
        }
        'expected': holds the time-format template(s) to fill in.
            If relativeTime only has key "from", then the key of time in "expected" dict must be "time".
            If relativeTime has key "to", then the key of time in "expected" dict must be "from" and "to".

    The templates inside config["rules"]["expected"] are overwritten in place
    with the formatted dates, and config["rules"] is returned.

    Raises:
        KeyError: if a relative-time phrase is not listed in relativeTime_to_IntDay.
        ValueError: if a phrase is listed as "special" but has no rule.
    """
    relativeRules = config["rules"]
    relativeTime = relativeRules["relativeTime"]
    expected = relativeRules["expected"]
    # get the date now
    now = datetime.now()

    if "to" not in relativeTime.keys():
        absoluteDay = _resolve_relative_day(relativeTime["from"], now)
        expected["time"] = apply_rules_to_timeFormat(expected["time"], absoluteDay)
    else:
        from_time = relativeTime["from"]
        to_time = relativeTime["to"]
        # deal with from_time first
        from_absoluteDay = _resolve_relative_day(from_time, now)
        expected["from"] = apply_rules_to_timeFormat(expected["from"], from_absoluteDay)
        # deal with to_time; "next Friday" depends on where the range starts
        to_absoluteDay = _resolve_relative_day(to_time, now, from_time)
        expected["to"] = apply_rules_to_timeFormat(expected["to"], to_absoluteDay)

    return config["rules"]
|
||||
|
||||
|
||||
def apply_rules_to_timeFormat(timeFormat: str, absoluteDay: datetime):
    """
    Fill the date placeholders of ``timeFormat`` with the fields of ``absoluteDay``.

    Only the first occurrence of each placeholder is replaced:
        {DoW}       abbreviated day of week (day_of_week_mapping)
        {Month}     abbreviated month name (month_mapping)
        {DayD}      day of month without padding
        {Year}      year
        {Month0D}   month number, zero-padded to two digits
        {month}     lower-case full month name (month_mapping_full)
        {MonthFull} capitalized full month name (Month_Mapping_Full)
        {Day0D}     day of month, zero-padded to two digits
    """
    month_number = absoluteDay.month
    day_number = absoluteDay.day
    substitutions = (
        ("{DoW}", day_of_week_mapping[absoluteDay.weekday()]),
        ("{Month}", month_mapping[month_number]),
        ("{DayD}", str(day_number)),
        ("{Year}", str(absoluteDay.year)),
        ("{Month0D}", f"{month_number:02d}"),
        ("{month}", month_mapping_full[month_number]),
        ("{MonthFull}", Month_Mapping_Full[month_number]),
        ("{Day0D}", f"{day_number:02d}"),
        # you can add other replace rules here
    )
    for placeholder, value in substitutions:
        timeFormat = timeFormat.replace(placeholder, value, 1)
    return timeFormat
|
||||
|
||||
|
||||
def get_accessibility_tree(env, *args) -> str:
|
||||
accessibility_tree: str = env.controller.get_accessibility_tree()
|
||||
|
||||
@@ -16,7 +16,9 @@ from .chrome import (
|
||||
check_enabled_experiments,
|
||||
check_history_deleted,
|
||||
is_expected_search_query,
|
||||
is_expected_active_tab
|
||||
is_expected_active_tab,
|
||||
is_expected_url_pattern_match,
|
||||
is_added_to_steam_cart
|
||||
)
|
||||
from .docs import (
|
||||
compare_font_names,
|
||||
@@ -54,7 +56,8 @@ from .general import (
|
||||
exact_match,
|
||||
is_in_list,
|
||||
fuzzy_match,
|
||||
check_include_exclude
|
||||
check_include_exclude,
|
||||
check_direct_json_object
|
||||
)
|
||||
from .gimp import (
|
||||
check_brightness_decrease_and_structure_sim,
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import logging, re, os, shutil
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from typing import Any, Dict, List, Union
|
||||
from bs4 import BeautifulSoup, Tag
|
||||
|
||||
import fitz # PyMuPDF
|
||||
import rapidfuzz.fuzz as fuzz
|
||||
from bs4 import BeautifulSoup, Tag
|
||||
|
||||
from desktop_env.evaluators.metrics.utils import are_lists_equal, compare_urls
|
||||
|
||||
@@ -13,11 +17,17 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
|
||||
"""
|
||||
Checks if the expected active tab is open in Chrome.
|
||||
"""
|
||||
if not active_tab_info:
|
||||
return 0.
|
||||
|
||||
match_type = rule['type']
|
||||
|
||||
if match_type == "url":
|
||||
expected_url = rule['url']
|
||||
actual_url = active_tab_info['url']
|
||||
if isinstance(active_tab_info, Dict):
|
||||
actual_url = active_tab_info.get('url', None)
|
||||
else:
|
||||
actual_url = active_tab_info
|
||||
print("expected_url: {}".format(expected_url))
|
||||
print("actual_url: {}".format(actual_url))
|
||||
return 1 if compare_urls(expected_url, actual_url) else 0
|
||||
@@ -25,6 +35,32 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
|
||||
logger.error(f"Unknown type: {match_type}")
|
||||
return 0
|
||||
|
||||
|
||||
# rules["expected"] is a list of string-formatted regexes; every one of them must match the URL
|
||||
def is_expected_url_pattern_match(result, rules) -> float:
    """
    Search the expected regex pattern(s) in a URL.

    Args:
        result: either a URL string or a dict carrying the URL under "url"
            (the value returned by the active-tab getters).
        rules: dict whose "expected" entry is a regex string or a list of
            regex strings; every pattern must be found in the URL.

    Returns:
        1. when all patterns are found with re.search, otherwise 0.
        (including when ``result`` is empty or carries no URL string).
    """
    if not result:
        return 0.

    if isinstance(result, dict):
        result_url = result.get("url")
        print("result url: {}".format(result_url))
    else:
        result_url = result
    if not isinstance(result_url, str):
        # nothing to search in
        return 0.

    patterns = rules["expected"]
    if isinstance(patterns, str):
        # A single regex must be treated as one pattern, not iterated
        # character by character.
        patterns = [patterns]
    print("expected_regex: {}".format(patterns))
    for pattern in patterns:
        match = re.search(pattern, result_url)
        print(match)
        if not match:
            return 0.
    return 1.
|
||||
|
||||
|
||||
def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> float:
|
||||
"""
|
||||
Checks if the expected tabs are open in Chrome.
|
||||
@@ -102,14 +138,14 @@ def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
|
||||
pred_folder = os.path.splitext(pred_path)[0] + '_pred'
|
||||
gold_folder = os.path.splitext(gold_path)[0] + '_gold'
|
||||
|
||||
if os.path.exists(pred_folder): # remove existing folder for new predictions
|
||||
if os.path.exists(pred_folder): # remove existing folder for new predictions
|
||||
shutil.rmtree(pred_folder, ignore_errors=True)
|
||||
os.makedirs(pred_folder)
|
||||
shutil.unpack_archive(pred_path, pred_folder)
|
||||
if not os.path.exists(gold_folder): # use cache if exists
|
||||
if not os.path.exists(gold_folder): # use cache if exists
|
||||
os.makedirs(gold_folder)
|
||||
shutil.unpack_archive(gold_path, gold_folder)
|
||||
|
||||
|
||||
pred_files = sorted(os.listdir(pred_folder))
|
||||
gold_files = sorted(os.listdir(gold_folder))
|
||||
if pred_files != gold_files: return 0.
|
||||
@@ -119,7 +155,8 @@ def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
|
||||
if file_type == 'text':
|
||||
from .vscode import compare_text_file
|
||||
return compare_text_file
|
||||
elif file_type == 'pdf': return compare_pdfs
|
||||
elif file_type == 'pdf':
|
||||
return compare_pdfs
|
||||
elif file_type == 'docx':
|
||||
from .docs import compare_docx_files
|
||||
return compare_docx_files
|
||||
@@ -141,7 +178,8 @@ def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
|
||||
elif file_type == 'video':
|
||||
from .vlc import compare_videos
|
||||
return compare_videos
|
||||
else: raise ValueError('[ERROR]: not support file type: %s' % file_type)
|
||||
else:
|
||||
raise ValueError('[ERROR]: not support file type: %s' % file_type)
|
||||
|
||||
score = 0
|
||||
compare_function = get_compare_function()
|
||||
@@ -160,7 +198,7 @@ def compare_htmls(html_path1: str, html_path2: str) -> float:
|
||||
soup1 = BeautifulSoup(inf, 'lxml')
|
||||
with open(html_path2, 'r', encoding='utf-8') as inf:
|
||||
soup2 = BeautifulSoup(inf, 'lxml')
|
||||
|
||||
|
||||
def compare_elements(elem1, elem2):
|
||||
if not (isinstance(elem1, Tag) and isinstance(elem2, Tag)):
|
||||
return elem1 == elem2
|
||||
@@ -252,3 +290,18 @@ def check_font_size(font_size, rule):
|
||||
return 1. if rule['min'] < default_font_size < rule['max'] else 0.
|
||||
else:
|
||||
raise TypeError(f"{rule['type']} not support yet!")
|
||||
|
||||
|
||||
def is_added_to_steam_cart(active_tab_info, rule):
    """
    Check if the items are added to the Steam cart.

    Args:
        active_tab_info: dict describing the active tab; its "content" entry
            is searched. May be None/empty when the tab could not be read.
        rule: dict whose "items" entry lists the strings that must all occur
            in the tab content.

    Returns:
        1. when every item occurs in the content, otherwise 0.
    """
    items = rule['items']

    if not active_tab_info:
        # No tab information available: nothing can be confirmed.
        return 0.

    content = active_tab_info.get('content') or ''

    for item in items:
        if item not in content:
            return 0.

    return 1.
|
||||
|
||||
@@ -55,7 +55,8 @@ def contains_page_break(docx_file):
|
||||
return 0
|
||||
|
||||
|
||||
def compare_docx_files(file1, file2, ignore_blanks=True):
|
||||
def compare_docx_files(file1, file2, **options):
|
||||
ignore_blanks = options.get('ignore_blanks', True)
|
||||
def get_paragraph_texts_odt(document):
|
||||
paragraphs = document.getElementsByType(P)
|
||||
paragraph_texts = []
|
||||
@@ -250,11 +251,12 @@ def check_tabstops(docx_file1, docx_file2, **kwargs) -> float:
|
||||
splits = p1.text.split('\t')
|
||||
if len(splits) == 0: return .0
|
||||
words = list(filter(lambda x: x.strip(), re.split(r'\s', splits[index])))
|
||||
if len(words) != number: return .0
|
||||
|
||||
if len(words) != number: return .0
|
||||
|
||||
section = doc2.sections[0]
|
||||
paragraph_width = section.page_width - section.left_margin - section.right_margin
|
||||
ignore_tabs = lambda x: x.alignment == WD_TAB_ALIGNMENT.CLEAR or (x.alignment == WD_TAB_ALIGNMENT.LEFT and x.position == 0)
|
||||
ignore_tabs = lambda x: x.alignment == WD_TAB_ALIGNMENT.CLEAR or (
|
||||
x.alignment == WD_TAB_ALIGNMENT.LEFT and x.position == 0)
|
||||
minus = .0
|
||||
for p1, p2 in zip(para1, para2):
|
||||
# filter CLEAR tabstop and default left-0 tabstop
|
||||
@@ -282,18 +284,6 @@ def compare_contains_image(docx_file1, docx_file2):
|
||||
return 1
|
||||
|
||||
|
||||
# file1 = 'path/to/file1.docx'
|
||||
# file2 = 'path/to/file2.docx'
|
||||
|
||||
# print(are_docx_files_same(file1, file2))
|
||||
# Replace 'your_document.docx' with the path to your document
|
||||
# result = contains_page_break('your_document.docx')
|
||||
# print(result)
|
||||
|
||||
# config_path = "/home/[username]/.config/libreoffice/4/user/registrymodifications.xcu"
|
||||
# print(find_default_font("Ani", config_path))
|
||||
|
||||
|
||||
def evaluate_colored_words_in_tables(file_path1, file_path2, **kwargs):
|
||||
if not compare_docx_files(file_path1, file_path2):
|
||||
return 0
|
||||
@@ -317,9 +307,12 @@ def evaluate_colored_words_in_tables(file_path1, file_path2, **kwargs):
|
||||
if word:
|
||||
first_letter = word[0].lower()
|
||||
|
||||
if first_letter in 'aeiou' and _calculate_color_difference(run.font.color.rgb, RGBColor(255, 0, 0)) > threshold:
|
||||
if first_letter in 'aeiou' and _calculate_color_difference(run.font.color.rgb,
|
||||
RGBColor(255, 0, 0)) > threshold:
|
||||
return 0 # Vowel-colored words should be red
|
||||
elif first_letter not in 'aeiou' and _calculate_color_difference(run.font.color.rgb, RGBColor(0, 0, 255)) > threshold:
|
||||
elif first_letter not in 'aeiou' and _calculate_color_difference(run.font.color.rgb,
|
||||
RGBColor(0, 0,
|
||||
255)) > threshold:
|
||||
return 0 # Non-vowel-colored words should be blue
|
||||
|
||||
return 1 # All words in tables are correctly colored
|
||||
@@ -533,4 +526,3 @@ def compare_highlighted_text(file1, file2):
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
@@ -44,7 +44,9 @@ def is_in_list(result, rules) -> float:
|
||||
return 1.
|
||||
else:
|
||||
return 0.
|
||||
|
||||
|
||||
|
||||
|
||||
def fuzzy_match(result, rules) -> float:
|
||||
expect = rules["expected"]
|
||||
|
||||
@@ -135,7 +137,7 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
|
||||
needed. If both are present, `xpath` takes the priority.
|
||||
"text": str as the expected text content of the selected element.
|
||||
"exact": bool specifying whether exact match or fuzzy match should
|
||||
be performed. defaults to True
|
||||
be performed. defaults to True.
|
||||
}
|
||||
|
||||
Returns:
|
||||
@@ -152,6 +154,7 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
|
||||
raise ValueError("At least one of xpath and selectors is required")
|
||||
|
||||
if len(elements) == 0:
|
||||
print("no elements")
|
||||
return 0.
|
||||
|
||||
if "text" in rules:
|
||||
@@ -217,3 +220,22 @@ def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str
|
||||
value = value[k]
|
||||
metric = metric and not _match_value_to_rule(value, r)
|
||||
return metric
|
||||
|
||||
|
||||
def check_direct_json_object(result, rules)->float:
    """
    Compare a result object against the expected JSON object key by key.

    One of the most commonly used evaluation functions: every key of
    rules["expected"] must map to an equal value in ``result`` (looked up with
    ``result.get``); extra keys in ``result`` are ignored.

    Returns 0. when ``result`` is None or any expected value differs, else 1.0.
    """
    for line in ("result: ", result, "expected: ", rules["expected"]):
        print(line)

    if result is None:
        return 0.

    wanted = rules["expected"]
    mismatch = any(wanted.get(name) != result.get(name) for name in wanted.keys())
    return 0. if mismatch else 1.0
|
||||
@@ -33,16 +33,25 @@ def _parse_sheet_idx(sheet_idx: Union[int, str]
|
||||
) -> Tuple[BOOK, str]:
|
||||
# function _parse_sheet_idx {{{ #
|
||||
if isinstance(sheet_idx, int):
|
||||
index: str = result_sheet_names[sheet_idx]
|
||||
try:
|
||||
index: str = result_sheet_names[sheet_idx]
|
||||
except:
|
||||
index = ""
|
||||
book: BOOK = result
|
||||
elif sheet_idx.startswith("RI"):
|
||||
index: str = result_sheet_names[int(sheet_idx[2:])]
|
||||
try:
|
||||
index: str = result_sheet_names[int(sheet_idx[2:])]
|
||||
except:
|
||||
index = ""
|
||||
book: BOOK = result
|
||||
elif sheet_idx.startswith("RN"):
|
||||
index: str = sheet_idx[2:]
|
||||
book: BOOK = result
|
||||
elif sheet_idx.startswith("EI"):
|
||||
index: str = expected_sheet_names[int(sheet_idx[2:])]
|
||||
try:
|
||||
index: str = expected_sheet_names[int(sheet_idx[2:])]
|
||||
except:
|
||||
index = ""
|
||||
book: BOOK = expected
|
||||
elif sheet_idx.startswith("EN"):
|
||||
index: str = sheet_idx[2:]
|
||||
@@ -59,24 +68,29 @@ SHEET = Union[pd.DataFrame, Worksheet, List[str]]
|
||||
|
||||
def _load_sheet(book: BOOK, index: str) -> SHEET:
    # function _load_sheet {{{ #
    """
    Load a single sheet from a workbook given in one of the supported forms.

    Args:
        book (BOOK): one of
          * str: path of the workbook file; the sheet is read from the CSV
            file named "<path without extension>-<index>.csv"
          * pd.ExcelFile: the sheet is parsed through pd.read_excel
          * Workbook: the sheet is fetched as book[index]
        index (str): name of the sheet

    Returns:
        SHEET: for a str book, the stripped lines of the CSV file in reverse
          file order with the empty lines at the end of the file dropped; for
          a pd.ExcelFile, the result of pd.read_excel; for a Workbook, the
          worksheet. None if loading fails for any reason (missing file,
          unknown sheet name, ...).

    Raises:
        NotImplementedError: if `book` is of none of the supported types
    """

    try:
        if isinstance(book, str):
            book: str = cast(str, book)
            csv_name: str = "{:}-{:}.csv".format(os.path.splitext(book)[0], index)

            with open(csv_name) as f:
                # Walk the stripped lines backwards so that dropwhile removes
                # the empty lines at the end of the file; the list is left in
                # that reversed order.
                csv_lines: List[str] = list(itertools.dropwhile(lambda l: len(l) == 0
                                                                , map(lambda l: l.strip()
                                                                      , reversed(f.read().splitlines())
                                                                      )
                                                                )
                                            )
            return csv_lines
        if isinstance(book, pd.ExcelFile):
            return pd.read_excel(book, index)
        if isinstance(book, Workbook):
            return book[index]
        logger.error("Not supported workbook format")
        raise NotImplementedError("Not supported workbook format")
    except NotImplementedError:
        # An unsupported format is a usage error, not a missing sheet.
        raise
    except Exception:
        # Best effort: an unloadable sheet is reported as None so that the
        # callers can score it 0. `Exception` rather than a bare `except`
        # keeps KeyboardInterrupt and SystemExit propagating.
        logger.debug("Failed to load sheet %s", index, exc_info=True)
        return None
    # }}} function _load_sheet #
|
||||
|
||||
|
||||
@@ -139,8 +153,13 @@ def compare_table(result: str, expected: str = None, **options) -> float:
|
||||
# precision: int as number of decimal digits, default to 4
|
||||
|
||||
error_limit: int = r.get("precision", 4)
|
||||
sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke)).round(error_limit)
|
||||
sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke)).round(error_limit)
|
||||
sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke))
|
||||
if sheet1 is None:
|
||||
return 0.
|
||||
sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke))
|
||||
|
||||
sheet1 = sheet1.round()
|
||||
sheet2 = sheet2.round()
|
||||
metric: bool = sheet1.equals(sheet2)
|
||||
logger.debug("Sheet1: \n%s", str(sheet1))
|
||||
logger.debug("Sheet2: \n%s", str(sheet2))
|
||||
@@ -158,6 +177,8 @@ def compare_table(result: str, expected: str = None, **options) -> float:
|
||||
# ignore_case: optional, defaults to False
|
||||
|
||||
sheet1: List[str] = _load_sheet(*parse_idx(r["sheet_idx0"], result, expected))
|
||||
if sheet1 is None:
|
||||
return 0.
|
||||
sheet2: List[str] = _load_sheet(*parse_idx(r["sheet_idx1"], result, expected))
|
||||
if r.get("ignore_case", False):
|
||||
sheet1 = [l.lower() for l in sheet1]
|
||||
@@ -195,11 +216,11 @@ def compare_table(result: str, expected: str = None, **options) -> float:
|
||||
# sheet_idx1: as sheet_idx0
|
||||
# props: list of str indicating concerned styles, see utils._read_cell_style
|
||||
|
||||
sheet_idx1: Tuple[Book, str] = parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
|
||||
sheet_idx1: Tuple[BOOK, str] = parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
|
||||
book_name1: str = parse_idx(r["sheet_idx0"], result, expected)[0]
|
||||
styles1: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx1, book_name1, **r)
|
||||
|
||||
sheet_idx2: Tuple[Book, str] = parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
|
||||
sheet_idx2: Tuple[BOOK, str] = parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
|
||||
book_name2: str = parse_idx(r["sheet_idx1"], result, expected)[0]
|
||||
styles2: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx2, book_name2, **r)
|
||||
# number_formats1: List[str] = [c.number_format.lower() for col in sheet1.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
|
||||
@@ -214,6 +235,8 @@ def compare_table(result: str, expected: str = None, **options) -> float:
|
||||
# sheet_idx1: as sheet_idx0
|
||||
|
||||
sheet1: Worksheet = _load_sheet(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke))
|
||||
if sheet1 is None:
|
||||
return 0.
|
||||
sheet2: Worksheet = _load_sheet(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke))
|
||||
metric: bool = sheet1.freeze_panes == sheet2.freeze_panes
|
||||
logger.debug("Assertion: %s.freeze(%s) == %s.freeze(%s) - %s"
|
||||
@@ -230,6 +253,8 @@ def compare_table(result: str, expected: str = None, **options) -> float:
|
||||
# ref: value
|
||||
|
||||
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
|
||||
if sheet is None:
|
||||
return 0.
|
||||
zoom_scale: Number = sheet.sheet_view.zoomScale or 100.
|
||||
metric: bool = _match_value_to_rule(zoom_scale, r)
|
||||
logger.debug("Assertion: %s.zoom(%.1f) %s %.1f - %s", r["sheet_idx"], zoom_scale, r["method"], r["ref"],
|
||||
@@ -258,6 +283,8 @@ def compare_table(result: str, expected: str = None, **options) -> float:
|
||||
# * imeMode
|
||||
|
||||
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
|
||||
if sheet is None:
|
||||
return 0.
|
||||
data_validators: List[DataValidation] = sheet.data_validations.dataValidation
|
||||
|
||||
total_metric = len(data_validators) >= len(r["dv_props"])
|
||||
@@ -348,6 +375,8 @@ def compare_table(result: str, expected: str = None, **options) -> float:
|
||||
# supported attributes: value & those supported by utils._read_cell_style
|
||||
|
||||
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
|
||||
if sheet is None:
|
||||
return 0.
|
||||
# data_frame: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx"], pdworkbookr, pdworkbooke))
|
||||
cell: Cell = sheet[r["coordinate"]]
|
||||
metric: bool = True
|
||||
|
||||
@@ -1,46 +1,47 @@
|
||||
import builtins
|
||||
import functools
|
||||
import itertools
|
||||
import logging
|
||||
import operator
|
||||
import re
|
||||
import zipfile
|
||||
from typing import Any, TypeVar, Union, Iterable, Optional, Callable
|
||||
from typing import Dict, List, Set, Match, Tuple, Pattern
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
import re
|
||||
import functools
|
||||
import operator
|
||||
import builtins
|
||||
import itertools
|
||||
|
||||
import formulas
|
||||
import lxml.cssselect
|
||||
import lxml.etree
|
||||
import openpyxl
|
||||
import xmltodict
|
||||
from lxml.etree import _Element
|
||||
from openpyxl import Workbook
|
||||
from openpyxl.cell.cell import Cell
|
||||
from openpyxl.chart._chart import ChartBase
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
from openpyxl.worksheet.filters import AutoFilter, SortState
|
||||
from openpyxl.formatting.formatting import ConditionalFormattingList
|
||||
from openpyxl.pivot.cache import CacheSource as PivotCacheSource
|
||||
from openpyxl.pivot.table import TableDefinition as PivotTableDefinition
|
||||
from openpyxl.styles.differential import DifferentialStyle
|
||||
from openpyxl.utils import coordinate_to_tuple, get_column_letter
|
||||
from openpyxl.worksheet.cell_range import MultiCellRange, CellRange
|
||||
from openpyxl.worksheet.dimensions import DimensionHolder
|
||||
from openpyxl.formatting.formatting import ConditionalFormattingList
|
||||
from openpyxl.utils import coordinate_to_tuple, get_column_letter
|
||||
from openpyxl.cell.cell import Cell
|
||||
from openpyxl.styles.differential import DifferentialStyle
|
||||
from openpyxl.pivot.table import TableDefinition as PivotTableDefinition
|
||||
from openpyxl.pivot.cache import CacheSource as PivotCacheSource
|
||||
import formulas
|
||||
from openpyxl.worksheet.filters import AutoFilter, SortState
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
|
||||
# Type variable for the value annotations of _match_value_to_rule. The name
# passed to TypeVar has to equal the variable it is assigned to.
V = TypeVar("V")

logger = logging.getLogger("desktopenv.metrics.utils")

# (prefix, namespace URI) pairs used by the CSS selectors below.
_xlsx_namespaces = [("oo", "http://schemas.openxmlformats.org/spreadsheetml/2006/main")
    , ("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
    , ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
                    ]
# prefix -> URI
_xlsx_ns_mapping = dict(_xlsx_namespaces)
# URI -> prefix, passed as `namespaces` to xmltodict.parse. The main
# spreadsheetml namespace is mapped to None; presumably this makes xmltodict
# emit its elements without a prefix (read_cell_value looks up the bare "c"
# key) -- TODO confirm against the xmltodict documentation.
_xlsx_ns_imapping = {uri: prefix for prefix, uri in _xlsx_namespaces}
_xlsx_ns_imapping["http://schemas.openxmlformats.org/spreadsheetml/2006/main"] = None
_sheet_name_selector = lxml.cssselect.CSSSelector("oo|sheets>oo|sheet", namespaces=_xlsx_ns_mapping)
_sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
|
||||
|
||||
|
||||
def load_sparklines(xlsx_file: str, sheet_name: str) -> Dict[str, str]:
|
||||
# function load_sparklines {{{ #
|
||||
"""
|
||||
@@ -174,6 +175,7 @@ def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, An
|
||||
return chart_set
|
||||
# }}} function load_charts #
|
||||
|
||||
|
||||
# Available Pivot Properties:
|
||||
# name: str
|
||||
# show_total, show_empty_row, show_empty_col, show_headers: bool
|
||||
@@ -210,23 +212,26 @@ def load_pivot_tables(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[s
|
||||
pivot_set: Dict[str, Any] = {}
|
||||
pivot_props: Set[str] = set(options.get("pivot_props", []))
|
||||
for pvt in pivots:
|
||||
raw_selection: List[List[tuple[Optional[bool], int]]] =\
|
||||
[ [(itm.h, itm.x) for itm in f.items if itm.x is not None]\
|
||||
for f in pvt.pivotFields
|
||||
]
|
||||
raw__selection: List[List[tuple[Optional[bool], int]]] = list(itertools.dropwhile(lambda r: len(r)==0, raw_selection))
|
||||
left_bias = len(raw_selection)-len(raw__selection)
|
||||
selection: List[List[tuple[Optional[bool], int]]] = list((itertools.dropwhile(lambda r: len(r)==0, reversed(raw__selection))))[::-1]
|
||||
right_bias = len(raw__selection)-len(selection)
|
||||
raw_selection: List[List[tuple[Optional[bool], int]]] = \
|
||||
[[(itm.h, itm.x) for itm in f.items if itm.x is not None] \
|
||||
for f in pvt.pivotFields
|
||||
]
|
||||
raw__selection: List[List[tuple[Optional[bool], int]]] = list(
|
||||
itertools.dropwhile(lambda r: len(r) == 0, raw_selection))
|
||||
left_bias = len(raw_selection) - len(raw__selection)
|
||||
selection: List[List[tuple[Optional[bool], int]]] = list(
|
||||
(itertools.dropwhile(lambda r: len(r) == 0, reversed(raw__selection))))[::-1]
|
||||
right_bias = len(raw__selection) - len(selection)
|
||||
cache_source: PivotCacheSource = pvt.cache.cacheSource
|
||||
cell_range1: str
|
||||
cell_range2: str
|
||||
cell_range1, cell_range2 = cache_source.worksheetSource.ref.split(":")
|
||||
cell_range1: Tuple[int, int] = coordinate_to_tuple(cell_range1)
|
||||
cell_range1 = (cell_range1[0], cell_range1[1]+left_bias)
|
||||
cell_range1 = (cell_range1[0], cell_range1[1] + left_bias)
|
||||
cell_range2: Tuple[int, int] = coordinate_to_tuple(cell_range2)
|
||||
cell_range2 = (cell_range2[0], cell_range2[1]-right_bias)
|
||||
source: str = "{:};{:}:{:};{:}".format(cache_source.type, cell_range1, cell_range2, cache_source.worksheetSource.sheet)
|
||||
cell_range2 = (cell_range2[0], cell_range2[1] - right_bias)
|
||||
source: str = "{:};{:}:{:};{:}".format(cache_source.type, cell_range1, cell_range2,
|
||||
cache_source.worksheetSource.sheet)
|
||||
|
||||
info: Dict[str, Any] = {}
|
||||
if "name" in pivot_props:
|
||||
@@ -248,22 +253,26 @@ def load_pivot_tables(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[s
|
||||
if "filter" in pivot_props:
|
||||
info["filter_fields"] = set(f.fld for f in pvt.pageFields)
|
||||
if "col_fields" in pivot_props:
|
||||
info["col_fields"] = [f.x-left_bias for f in pvt.colFields]
|
||||
info["col_fields"] = [f.x - left_bias for f in pvt.colFields]
|
||||
if "row_fields" in pivot_props:
|
||||
info["row_fields"] = [f.x-left_bias for f in pvt.rowFields]
|
||||
info["row_fields"] = [f.x - left_bias for f in pvt.rowFields]
|
||||
if "data_fields" in pivot_props:
|
||||
info["data_fields"] = [ "{:d};{:};{:};{:}".format( f.fld-left_bias, f.name if "data_fields_name" in pivot_props else ""
|
||||
, f.subtotal, f.showDataAs
|
||||
)\
|
||||
for f in pvt.dataFields
|
||||
]
|
||||
info["data_fields"] = [
|
||||
"{:d};{:};{:};{:}".format(f.fld - left_bias, f.name if "data_fields_name" in pivot_props else ""
|
||||
, f.subtotal, f.showDataAs
|
||||
) \
|
||||
for f in pvt.dataFields
|
||||
]
|
||||
|
||||
pivot_set[source] = info
|
||||
logger.debug(".[%s].pivots: %s", sheet_name, repr(pivot_set))
|
||||
return pivot_set
|
||||
# }}} function load_pivot_tables #
|
||||
|
||||
|
||||
_shared_str_selector = lxml.cssselect.CSSSelector("oo|sst>oo|si>oo|t", namespaces=_xlsx_ns_mapping)
|
||||
|
||||
|
||||
def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
|
||||
# read_cell_value {{{ #
|
||||
try:
|
||||
@@ -283,20 +292,20 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
|
||||
|
||||
with z_f.open("xl/worksheets/sheet{:}.xml".format(sheet_names[sheet_name])) as f:
|
||||
sheet: _Element = lxml.etree.fromstring(f.read())
|
||||
cells: List[_Element] =\
|
||||
lxml.cssselect.CSSSelector( 'oo|row>oo|c[r="{:}"]'.format(coordinate)
|
||||
, namespaces=_xlsx_ns_mapping
|
||||
)(sheet)
|
||||
if len(cells)==0:
|
||||
cells: List[_Element] = \
|
||||
lxml.cssselect.CSSSelector('oo|row>oo|c[r="{:}"]'.format(coordinate)
|
||||
, namespaces=_xlsx_ns_mapping
|
||||
)(sheet)
|
||||
if len(cells) == 0:
|
||||
return None
|
||||
cell: _Element = cells[0]
|
||||
except zipfile.BadZipFile:
|
||||
return None
|
||||
|
||||
cell: Dict[str, str] = xmltodict.parse( lxml.etree.tostring(cell, encoding="unicode")
|
||||
, process_namespaces=True
|
||||
, namespaces=_xlsx_ns_imapping
|
||||
)
|
||||
cell: Dict[str, str] = xmltodict.parse(lxml.etree.tostring(cell, encoding="unicode")
|
||||
, process_namespaces=True
|
||||
, namespaces=_xlsx_ns_imapping
|
||||
)
|
||||
logger.debug("%s.%s[%s]: %s", xlsx_file, sheet_name, coordinate, repr(cell))
|
||||
if "@t" not in cell["c"]:
|
||||
return None
|
||||
@@ -308,6 +317,7 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
|
||||
return cell["c"]["v"]
|
||||
# }}} read_cell_value #
|
||||
|
||||
|
||||
# Supported Styles:
|
||||
# number_format
|
||||
# font_name - str
|
||||
@@ -322,50 +332,53 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
|
||||
# fgcolor - in aRGB, e.g., FF00FFFF is yellow
|
||||
# hyperlink - str
|
||||
def _read_cell_style(style_name: str, cell: Cell, diff_style: Optional[DifferentialStyle] = None) -> Any:
    """
    Read one style property of a cell.

    Args:
        style_name (str): one of number_format, font_name, font_family,
          font_color, font_bold, font_italic, font_underline, font_size,
          fill_type, bgcolor, fgcolor, hyperlink
        cell (Cell): the cell to inspect
        diff_style (Optional[DifferentialStyle]): if given (and truthy), the
          font, fill and number format are read from it instead of from
          `cell`; the hyperlink is always read from `cell`

    Returns:
        Any: the value of the property. None is returned
          * for number_format, if the cell is empty or its data type is not "n"
          * for the font properties and hyperlink, if the cell is empty
          * for fill_type, bgcolor and fgcolor, if the property cannot be read
          For hyperlink, a missing hyperlink of a non-empty cell gives "".

    Raises:
        NotImplementedError: if `style_name` is not supported
    """

    # Style names that map one-to-one onto an attribute of the font object.
    font_attributes: Dict[str, str] = {"font_name": "name"
        , "font_family": "family"
        , "font_bold": "bold"
        , "font_italic": "italic"
        , "font_underline": "underline"
        , "font_size": "size"
                                       }

    if style_name == "number_format":
        if cell.value is None or cell.data_type != "n":
            return None
        return cell.number_format if diff_style is None else diff_style.numFmt.formatCode
    if style_name in font_attributes:
        if cell.value is None:
            return None
        return getattr((diff_style or cell).font, font_attributes[style_name])
    if style_name == "font_color":
        return (diff_style or cell).font.color.rgb if cell.value is not None else None
    if style_name in {"fill_type", "bgcolor", "fgcolor"}:
        # The fill or its colours may be missing; report the property as None
        # instead of failing. `Exception` rather than a bare `except` keeps
        # KeyboardInterrupt and SystemExit propagating.
        try:
            fill = (diff_style or cell).fill
            if style_name == "fill_type":
                return fill.tagname
            if style_name == "bgcolor":
                return fill.bgColor.rgb
            return fill.fgColor.rgb
        except Exception:
            return None
    if style_name == "hyperlink":
        return (cell.hyperlink or "") if cell.value is not None else None
    raise NotImplementedError("Unsupported Style: {:}".format(style_name))
|
||||
|
||||
_absolute_range_pattern: Pattern[str] = re.compile( r"""\$(?P<col1>[A-Z]{1,3})\$(?P<row1>\d+) # coord1
|
||||
|
||||
_absolute_range_pattern: Pattern[str] = re.compile(r"""\$(?P<col1>[A-Z]{1,3})\$(?P<row1>\d+) # coord1
|
||||
(?::
|
||||
\$(?P<col2>[A-Z]{1,3})\$(?P<row2>\d+) # coord2
|
||||
)?
|
||||
"""
|
||||
, re.X
|
||||
)
|
||||
, re.X
|
||||
)
|
||||
|
||||
|
||||
def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, book_name: str, **options) -> Dict[str, List[Any]]:
|
||||
# function load_xlsx_styles {{{ #
|
||||
"""
|
||||
@@ -417,24 +430,24 @@ def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, book_name: str, **opt
|
||||
if m[2] is None and m[3] is None:
|
||||
arguments.append(read_cell_value(book_name, sheet_name, coordinate="{:}{:}".format(m[0], m[1])))
|
||||
else:
|
||||
arguments.append( [ read_cell_value( book_name, sheet_name
|
||||
, coordinate="{:}{:}".format( get_column_letter(c[1])
|
||||
arguments.append([read_cell_value(book_name, sheet_name
|
||||
, coordinate="{:}{:}".format(get_column_letter(c[1])
|
||||
, c[0]
|
||||
)
|
||||
)\
|
||||
for c in CellRange("{:}{:}:{:}{:}".format(m[0], m[1], m[2], m[3])).cells\
|
||||
) \
|
||||
for c in CellRange("{:}{:}:{:}{:}".format(m[0], m[1], m[2], m[3])).cells \
|
||||
]
|
||||
)
|
||||
)
|
||||
logger.debug("Absolute range arguments: %s", repr(arguments))
|
||||
|
||||
for rge in fmt.cells:
|
||||
for c in rge.cells:
|
||||
cell: Cell = worksheet.cell(row=c[0], column=c[1])
|
||||
cell_value = read_cell_value( book_name, sheet_name
|
||||
, coordinate="{:}{:d}".format( get_column_letter(c[1])
|
||||
, c[0]
|
||||
)
|
||||
)
|
||||
cell_value = read_cell_value(book_name, sheet_name
|
||||
, coordinate="{:}{:d}".format(get_column_letter(c[1])
|
||||
, c[0]
|
||||
)
|
||||
)
|
||||
if condition(cell_value, *arguments):
|
||||
logger.debug("Active Cell %s(%s) for %s", repr(cell), str(cell_value), r.formula[0])
|
||||
active_cells.append(cell)
|
||||
@@ -448,6 +461,7 @@ def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, book_name: str, **opt
|
||||
return style_dict
|
||||
# }}} function load_xlsx_styles #
|
||||
|
||||
|
||||
# Available Row Properties:
|
||||
# hidden
|
||||
# collapsed
|
||||
@@ -460,7 +474,7 @@ def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, book_name: str, **opt
|
||||
# collapsed
|
||||
# min
|
||||
# max
|
||||
def load_rows_or_cols(xlsx_file: Workbook, sheet_name: str, **options)\
|
||||
def load_rows_or_cols(xlsx_file: Workbook, sheet_name: str, **options) \
|
||||
-> Dict[Union[int, str], Dict[str, Any]]:
|
||||
# function load_rows_or_cols {{{ #
|
||||
"""
|
||||
@@ -491,6 +505,7 @@ def load_rows_or_cols(xlsx_file: Workbook, sheet_name: str, **options)\
|
||||
return obj_set
|
||||
# }}} function load_rows_or_cols #
|
||||
|
||||
|
||||
def load_filters(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, Any]:
|
||||
# function load_filters {{{ #
|
||||
try:
|
||||
@@ -514,16 +529,16 @@ def load_filters(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, A
|
||||
filter_column["filters"] = set(flt_clm.filters.filter)
|
||||
if flt_clm.customFilters is not None:
|
||||
filter_column["custom_filters_op"] = flt_clm.customFilters._and
|
||||
filter_column["custom_filters"] = set( ( flt.operator
|
||||
filter_column["custom_filters"] = set((flt.operator
|
||||
, flt.val
|
||||
)\
|
||||
for flt in flt_clm.customFilters.customFilter
|
||||
)
|
||||
) \
|
||||
for flt in flt_clm.customFilters.customFilter
|
||||
)
|
||||
filter_column_set.append(filter_column)
|
||||
filter_column_set = list( sorted( filter_column_set
|
||||
filter_column_set = list(sorted(filter_column_set
|
||||
, key=(lambda d: d["col_id"])
|
||||
)
|
||||
)
|
||||
)
|
||||
filter_dict["filter_column"] = filter_column_set
|
||||
|
||||
# sortState
|
||||
@@ -534,26 +549,30 @@ def load_filters(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, A
|
||||
sort_state_dict["case"] = sort_state.caseSensitive
|
||||
sort_state_dict["method"] = sort_state.sortMethod
|
||||
sort_state_dict["ref"] = sort_state.ref
|
||||
sort_state_dict["condition"] = list( { "descending": cdt.descending
|
||||
, "key": cdt.sortBy
|
||||
, "ref": cdt.ref
|
||||
, "custom_list": cdt.customList
|
||||
, "dxf_id": cdt.dxfId
|
||||
, "icon": cdt.iconSet
|
||||
, "iconid": cdt.iconId
|
||||
}\
|
||||
for cdt in sort_state.sortCondition
|
||||
)
|
||||
sort_state_dict["condition"] = list({"descending": cdt.descending
|
||||
, "key": cdt.sortBy
|
||||
, "ref": cdt.ref
|
||||
, "custom_list": cdt.customList
|
||||
, "dxf_id": cdt.dxfId
|
||||
, "icon": cdt.iconSet
|
||||
, "iconid": cdt.iconId
|
||||
} \
|
||||
for cdt in sort_state.sortCondition
|
||||
)
|
||||
filter_dict["sort_state"] = sort_state_dict
|
||||
|
||||
return filter_dict
|
||||
# }}} function load_filters #
|
||||
|
||||
|
||||
def _match_record(pattern: Dict[str, Any], item: Dict[str, Any]) -> bool:
|
||||
return all(k in item and item[k] == val for k, val in pattern.items())
|
||||
|
||||
|
||||
def _multicellrange_containsby(subset_candidate: MultiCellRange, superset_candidate: MultiCellRange) -> bool:
    """
    Check whether every range of `subset_candidate` is contained in
    `superset_candidate`.

    Args:
        subset_candidate (MultiCellRange): iterated range by range
        superset_candidate (MultiCellRange): tested with `in` for each of
          these ranges

    Returns:
        bool: True if all the ranges pass the `in` test; an empty
          `subset_candidate` is contained by anything
    """

    return all(r in superset_candidate for r in subset_candidate)
|
||||
|
||||
|
||||
def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
|
||||
"""
|
||||
Args:
|
||||
@@ -576,10 +595,10 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
|
||||
|
||||
match_: Optional[Match[str]] = re.search(rule["ref"], value, flag)
|
||||
return match_ is not None
|
||||
if rule["method"] in { "eq", "ne"
|
||||
, "le", "lt"
|
||||
, "ge", "gt"
|
||||
}:
|
||||
if rule["method"] in {"eq", "ne"
|
||||
, "le", "lt"
|
||||
, "ge", "gt"
|
||||
}:
|
||||
return getattr(operator, rule["method"])(value, rule["ref"])
|
||||
if rule["method"].startswith("approx"):
|
||||
threshold: float = float(rule["method"].split(":")[1])
|
||||
@@ -589,26 +608,27 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
else:
|
||||
return abs(value-rule["ref"])<=threshold
|
||||
return abs(value - rule["ref"]) <= threshold
|
||||
if rule["method"] == "spreadsheet_range":
|
||||
subset_limit = MultiCellRange(rule["ref"][0])
|
||||
superset_limit = MultiCellRange(rule["ref"][1])
|
||||
return _multicellrange_containsby(subset_limit, value)\
|
||||
and _multicellrange_containsby(value, superset_limit)
|
||||
if rule["method"].startswith("range."): # e.g., range.te [0, 2] -> 0 < x <= 2
|
||||
return _multicellrange_containsby(subset_limit, value) \
|
||||
and _multicellrange_containsby(value, superset_limit)
|
||||
if rule["method"].startswith("range."): # e.g., range.te [0, 2] -> 0 < x <= 2
|
||||
left_et = rule["method"][6]
|
||||
right_et = rule["method"][7]
|
||||
return getattr(operator, "l" + left_et)(rule["ref"][0], value)\
|
||||
and getattr(operator, "l" + right_et)(value, rule["ref"][1])
|
||||
return getattr(operator, "l" + left_et)(rule["ref"][0], value) \
|
||||
and getattr(operator, "l" + right_et)(value, rule["ref"][1])
|
||||
if rule["method"] in {"str_list_eq", "str_set_eq"}:
|
||||
container_type_str: str = rule["method"][4:-3]
|
||||
container_type = getattr(builtins, container_type_str)
|
||||
|
||||
value: container_type = container_type(value.strip("\"'").split(","))
|
||||
ref: container_type = container_type(rule["ref"])
|
||||
return value==ref
|
||||
return value == ref
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def are_lists_equal(list1, list2, comparison_func):
|
||||
# First check if both lists have the same length
|
||||
if len(list1) != len(list2):
|
||||
@@ -625,6 +645,9 @@ def are_lists_equal(list1, list2, comparison_func):
|
||||
|
||||
|
||||
def compare_urls(url1, url2):
|
||||
if url1 is None or url2 is None:
|
||||
return url1 == url2
|
||||
|
||||
def normalize_url(url):
|
||||
# Parse the URL
|
||||
parsed_url = urlparse(url)
|
||||
@@ -649,114 +672,3 @@ def compare_urls(url1, url2):
|
||||
|
||||
# Compare the normalized URLs
|
||||
return norm_url1 == norm_url2
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: print the pivot tables found on "Sheet2" of a sample
    # workbook. The path is relative to a local data checkout; adjust it
    # before running.
    path1 = "../../任务集/SheetCopilot/dataset/task_sheet_answers_v2/BoomerangSales/2_BoomerangSales/2_BoomerangSales_gt1.xlsx"
    workbook1: Workbook = openpyxl.load_workbook(filename=path1)
    print(load_pivot_tables(workbook1, "Sheet2", pivot_props=["col_fields"
        , "filter"
        , "row_fields"
        , "data_fields"
                                                              ]
                            )
          )
|
||||
|
||||
@@ -33,7 +33,7 @@ def check_json_keybindings(actual: str, expected: str, **options) -> float:
|
||||
break
|
||||
else:
|
||||
return 0.0
|
||||
expected = expected['expect']
|
||||
expected = expected['expected']
|
||||
if expected in data:
|
||||
return 1.0
|
||||
else:
|
||||
@@ -55,7 +55,7 @@ def check_json_settings(actual: str, expected: str, **options) -> float:
|
||||
with open(actual, 'r') as f:
|
||||
data = json.load(f)
|
||||
|
||||
expect = expected['expect']
|
||||
expect = expected['expected']
|
||||
data_copy = copy.deepcopy(data)
|
||||
data_copy.update(expect)
|
||||
if data == data_copy:
|
||||
@@ -93,7 +93,7 @@ def compare_config(actual: str, rules: Dict, **options) -> float:
|
||||
with open(actual) as f1:
|
||||
actual_text = f1.read()
|
||||
|
||||
if actual_text == rules['expect']:
|
||||
if actual_text == rules['expected']:
|
||||
return 1.0
|
||||
return 0.0
|
||||
|
||||
@@ -110,7 +110,7 @@ def compare_answer(actual: str, rules: Dict, **options) -> float:
|
||||
if not actual:
|
||||
return 0.
|
||||
|
||||
if actual == rules['expect']:
|
||||
if actual == rules['expected']:
|
||||
return 1.0
|
||||
|
||||
# TODO: can use text embedding to get non-zero return
|
||||
|
||||
Reference in New Issue
Block a user