Merge branch 'main' into xiaochuanli/addChromeExtensions

This commit is contained in:
Tianbao Xie
2024-02-25 00:45:17 +08:00
committed by GitHub
152 changed files with 7174 additions and 420 deletions

View File

@@ -412,7 +412,7 @@ class SetupController:
except Exception as e:
if attempt < 14:
logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
time.sleep(1)
time.sleep(5)
else:
logger.error(f"Failed to connect after multiple attempts: {e}")
raise e
@@ -541,7 +541,7 @@ class SetupController:
except Exception as e:
if attempt < 14:
logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
time.sleep(1)
time.sleep(5)
else:
logger.error(f"Failed to connect after multiple attempts: {e}")
raise e
@@ -554,7 +554,10 @@ class SetupController:
if platform == 'googledrive':
url = 'https://drive.google.com/drive/my-drive'
page = context.new_page() # Create a new page (tab) within the existing context
page.goto(url)
try:
page.goto(url, timeout=60000)
except:
logger.warning("Opening %s exceeds time limit", url) # only for human test
logger.info(f"Opened new page: {url}")
settings = json.load(open(config['settings_file']))
email, password = settings['email'], settings['password']

View File

@@ -26,11 +26,19 @@ Getter = Callable[[gym.Env, Dict[str, Any]], Any]
def _execute_command(command: List[str]) -> "str | None":
    """Run an external command, handling ``vmrun -T ws start`` specially.

    Args:
        command: Argument vector handed to ``subprocess`` (no shell is used).

    Returns:
        The captured stdout text for ordinary commands. ``None`` for the
        ``vmrun ... start`` case, whose output is not captured.

    Raises:
        Exception: An ordinary command exited with a non-zero status; the
            message is its stdout + stderr wrapped in ANSI red escape codes.
        subprocess.TimeoutExpired: An ordinary command ran longer than 60 s.
    """

    def _is_contained_in(a, b):
        # Multiset containment: every item of `a` must occur in `b` at least
        # as often as it does in `a`. Order and position are ignored, so
        # extra arguments (e.g. a trailing "nogui") still match.
        return all(a.count(v) <= b.count(v) for v in set(a))

    # Specially handled for the `vmrun` command in Windows
    if _is_contained_in(["vmrun", "-T", "ws", "start"], command):
        # Output is not piped and the exit status is not checked here.
        p = subprocess.Popen(command)
        p.wait()
        return None

    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=60, text=True,
                            encoding="utf-8")
    if result.returncode != 0:
        raise Exception("\033[91m" + result.stdout + result.stderr + "\033[0m")
    return result.stdout
@@ -49,7 +57,8 @@ class DesktopEnv(gym.Env):
task_config: Dict[str, Any] = None,
tmp_dir: str = "tmp",
cache_dir: str = "cache",
screen_size: Tuple[int] = (1920, 1080)
screen_size: Tuple[int] = (1920, 1080),
headless: bool = False
):
"""
Args:
@@ -75,6 +84,7 @@ class DesktopEnv(gym.Env):
self.tmp_dir_base: str = tmp_dir
self.cache_dir_base: str = cache_dir
self.vm_screen_size = screen_size
self.headless = headless
os.makedirs(self.tmp_dir_base, exist_ok=True)
@@ -116,7 +126,8 @@ class DesktopEnv(gym.Env):
break
else:
logger.info("Starting VM...")
_execute_command(["vmrun", "-T", "ws", "start", self.path_to_vm])
_execute_command(["vmrun", "-T", "ws", "start", self.path_to_vm]) if not self.headless \
else _execute_command(["vmrun", "-T", "ws", "start", self.path_to_vm, "nogui"])
time.sleep(3)
except subprocess.CalledProcessError as e:
logger.error(f"Error executing command: {e.output.decode().strip()}")
@@ -126,10 +137,11 @@ class DesktopEnv(gym.Env):
logger.info("Getting IP Address...")
for _ in range(max_retries):
try:
output = _execute_command(["vmrun", "-T", "ws", "getGuestIPAddress", self.path_to_vm]).strip()
output = _execute_command(["vmrun", "-T", "ws", "getGuestIPAddress", self.path_to_vm, "-wait"]).strip()
logger.info(f"IP address: {output}")
return output
except:
except Exception as e:
print(e)
time.sleep(5)
logger.info("Retrying...")
raise Exception("Failed to get VM IP address!")
@@ -172,14 +184,14 @@ class DesktopEnv(gym.Env):
# even if one of the metrics does not need expected or options field, it should be included in the list with None
self.evaluator = task_config["evaluator"]
self.metric: Metric = [getattr(metrics, func) for func in self.evaluator["func"]] \
if isinstance(self.evaluator["func"], list) \
else getattr(metrics, self.evaluator["func"])
if isinstance(self.evaluator["func"], list) \
else getattr(metrics, self.evaluator["func"])
self.metric_conj: str = self.evaluator.get("conj", "and") # take conjunction of multiple metrics
if "result" in self.evaluator:
self.result_getter: Getter = [getattr(getters, "get_{:}".format(res["type"])) for res in
self.evaluator["result"]] \
if isinstance(self.evaluator["result"], list) \
else getattr(getters, "get_{:}".format(self.evaluator["result"]["type"]))
self.evaluator["result"]] \
if isinstance(self.evaluator["result"], list) \
else getattr(getters, "get_{:}".format(self.evaluator["result"]["type"]))
else:
self.result_getter = [None] * len(self.metric) \
if isinstance(self.metric, list) \
@@ -299,11 +311,14 @@ class DesktopEnv(gym.Env):
self.setup_controller.setup(self.evaluator.get("postconfig", []))
if self.metric == "infeasible":
if self.action_history[-1] == "FAIL":
if self.evaluator['func'] == "infeasible":
if len(self.action_history) > 0 and self.action_history[-1] == "FAIL":
return 1
else:
return 0
else:
if len(self.action_history) > 0 and self.action_history[-1] == "FAIL":
return 0
if type(self.metric) == list:
results = []

View File

@@ -6,6 +6,7 @@ from .chrome import (
get_pdf_from_url,
get_shortcuts_on_desktop,
get_history,
get_page_info,
get_enabled_experiments,
get_chrome_language,
get_chrome_font_size,

View File

@@ -326,6 +326,36 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
# The getters below require Playwright to be installed on the target machine, and Chrome must be pre-configured with a
# remote-debugging port; see README.md for details
def get_page_info(env, config: Dict[str, str]):
    """Open ``config["url"]`` in a new tab of the remote Chrome and describe the page.

    Connects over CDP to ``http://{env.vm_ip}:9222``, opens a new page in the
    first browser context, navigates to the URL and waits for the ``load``
    event.

    Args:
        env: Environment object; only ``env.vm_ip`` is read.
        config: Must contain the key ``"url"``.

    Returns:
        A dict with keys ``'title'``, ``'url'`` and ``'content'``. If loading
        raises ``TimeoutError`` the title is ``'Load timeout'``; on any other
        exception it is ``'Error encountered'``. In both cases ``'url'`` and
        ``'content'`` are whatever the page currently reports.
    """
    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    url = config["url"]

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        # connect to remote Chrome instance
        browser = p.chromium.connect_over_cdp(remote_debugging_url)
        try:
            page = browser.contexts[0].new_page()
            try:
                # Navigation is inside the try so that a navigation timeout is
                # reported through the handlers below instead of escaping.
                page.goto(url)
                # Wait for the page to finish loading, this prevents the
                # "execution context was destroyed" issue
                page.wait_for_load_state('load')
                page_info = {'title': page.title(), 'url': page.url, 'content': page.content()}
            except TimeoutError:
                # NOTE(review): which TimeoutError this names depends on the
                # file's imports (builtin vs. Playwright's) - confirm.
                page_info = {'title': 'Load timeout', 'url': page.url, 'content': page.content()}
            except Exception as e:
                # Any other failure while loading or reading the page
                logger.error(f"Error: {e}")
                page_info = {'title': 'Error encountered', 'url': page.url, 'content': page.content()}
        finally:
            # Always release the CDP connection, even if a handler itself raised.
            browser.close()
        return page_info
def get_open_tabs_info(env, config: Dict[str, str]):
host = env.vm_ip
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
@@ -551,9 +581,9 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
for q in _query:
search = f'( {q} ) and "{parent_id}" in parents'
filelist: GoogleDriveFileList = drive.ListFile({'q': search}).GetList()
if len(filelist) == 0: # target file not found
if len(filelist) == 0: # target file not found
return None
file: GoogleDriveFile = filelist[0] # HACK: if multiple candidates, just use the first one
file: GoogleDriveFile = filelist[0] # HACK: if multiple candidates, just use the first one
parent_id = file['id']
file.GetContentFile(_path, mimetype=file['mimeType'])
@@ -565,8 +595,9 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
if 'query' in config:
return get_single_file(config['query'], os.path.join(env.cache_dir, config['dest']))
elif 'path' in config:
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if idx < len(config['path']) - 1
else f"title = '{fp}' and trashed = false" for idx, fp in enumerate(config['path'])]
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if idx < len(
config['path']) - 1
else f"title = '{fp}' and trashed = false" for idx, fp in enumerate(config['path'])]
return get_single_file(query, os.path.join(env.cache_dir, config['dest']))
elif 'query_list' in config:
_path_list = []
@@ -575,12 +606,14 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
dest = config['dest'][idx]
_path_list.append(get_single_file(query, os.path.join(env.cache_dir, dest)))
return _path_list
else: # path_list in config
else: # path_list in config
_path_list = []
assert len(config['path_list']) == len(config['dest'])
for idx, path in enumerate(config['path_list']):
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if jdx < len(path) - 1
else f"title = '{fp}' and trashed = false" for jdx, fp in enumerate(path)]
query = [
f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if jdx < len(
path) - 1
else f"title = '{fp}' and trashed = false" for jdx, fp in enumerate(path)]
dest = config['dest'][idx]
_path_list.append(get_single_file(query, os.path.join(env.cache_dir, dest)))
return _path_list
@@ -596,12 +629,12 @@ def get_enable_do_not_track(env, config: Dict[str, str]):
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
'output'].strip()
elif os_type == 'Linux':
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
# 'output'].strip()
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
'output'].strip()
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
# 'output'].strip()
else:
raise Exception('Unsupported operating system')
@@ -609,11 +642,11 @@ def get_enable_do_not_track(env, config: Dict[str, str]):
content = env.controller.get_file(preference_file_path)
data = json.loads(content)
if_enable_do_not_track = data.get('enable_do_not_track', {}) # bool
if_enable_do_not_track = data.get('enable_do_not_track', {}) # bool
return "true" if if_enable_do_not_track else "false"
except Exception as e:
logger.error(f"Error: {e}")
return "Google"
return "false"
def get_enable_enhanced_safety_browsing(env, config: Dict[str, str]):
@@ -626,12 +659,12 @@ def get_enable_enhanced_safety_browsing(env, config: Dict[str, str]):
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
'output'].strip()
elif os_type == 'Linux':
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
# 'output'].strip()
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
'output'].strip()
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
# 'output'].strip()
else:
raise Exception('Unsupported operating system')
@@ -639,7 +672,7 @@ def get_enable_enhanced_safety_browsing(env, config: Dict[str, str]):
content = env.controller.get_file(preference_file_path)
data = json.loads(content)
if_enable_do_not_track = data.get('safebrowsing', {}).get('enhanced', {}) # bool
if_enable_do_not_track = data.get('safebrowsing', {}).get('enhanced', {}) # bool
return "true" if if_enable_do_not_track else "false"
except Exception as e:
logger.error(f"Error: {e}")
@@ -656,12 +689,12 @@ def get_new_startup_page(env, config: Dict[str, str]):
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
'output'].strip()
elif os_type == 'Linux':
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
# 'output'].strip()
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
'output'].strip()
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
# 'output'].strip()
else:
raise Exception('Unsupported operating system')
@@ -674,7 +707,7 @@ def get_new_startup_page(env, config: Dict[str, str]):
if "session" not in data.keys():
return "true"
else:
if_enable_do_not_track = data.get('session', {}).get('restore_on_startup', {}) # int, need to be 5
if_enable_do_not_track = data.get('session', {}).get('restore_on_startup', {}) # int, need to be 5
return "true" if if_enable_do_not_track == 5 else "false"
except Exception as e:
logger.error(f"Error: {e}")
@@ -691,12 +724,12 @@ def get_find_unpacked_extension_path(env, config: Dict[str, str]):
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
'output'].strip()
elif os_type == 'Linux':
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
# 'output'].strip()
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
'output'].strip()
# preference_file_path = env.controller.execute_python_command(
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
# 'output'].strip()
else:
raise Exception('Unsupported operating system')

View File

@@ -17,7 +17,8 @@ from .chrome import (
check_history_deleted,
is_expected_search_query,
is_expected_active_tab,
is_expected_url_pattern_match
is_expected_url_pattern_match,
is_added_to_steam_cart
)
from .docs import (
compare_font_names,
@@ -123,3 +124,7 @@ from .vscode import (
check_json_settings,
check_json_keybindings
)
def infeasible():
    """Placeholder metric: takes no arguments, does nothing, returns ``None``.

    NOTE(review): presumably defined so that the name ``infeasible`` resolves
    when metrics are looked up by name - confirm against the evaluator.
    """

View File

@@ -1,8 +1,12 @@
import logging, re, os, shutil
import logging
import os
import re
import shutil
from typing import Any, Dict, List, Union
from bs4 import BeautifulSoup, Tag
import fitz # PyMuPDF
import rapidfuzz.fuzz as fuzz
from bs4 import BeautifulSoup, Tag
from desktop_env.evaluators.metrics.utils import are_lists_equal, compare_urls
@@ -17,7 +21,7 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
if match_type == "url":
expected_url = rule['url']
actual_url = active_tab_info['url']
actual_url = active_tab_info.get('url', None)
print("expected_url: {}".format(expected_url))
print("actual_url: {}".format(actual_url))
return 1 if compare_urls(expected_url, actual_url) else 0
@@ -46,6 +50,7 @@ def is_expected_url_pattern_match(result, rules) -> float:
return 0.
return 1.
def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> float:
"""
Checks if the expected tabs are open in Chrome.
@@ -123,14 +128,14 @@ def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
pred_folder = os.path.splitext(pred_path)[0] + '_pred'
gold_folder = os.path.splitext(gold_path)[0] + '_gold'
if os.path.exists(pred_folder): # remove existing folder for new predictions
if os.path.exists(pred_folder): # remove existing folder for new predictions
shutil.rmtree(pred_folder, ignore_errors=True)
os.makedirs(pred_folder)
shutil.unpack_archive(pred_path, pred_folder)
if not os.path.exists(gold_folder): # use cache if exists
if not os.path.exists(gold_folder): # use cache if exists
os.makedirs(gold_folder)
shutil.unpack_archive(gold_path, gold_folder)
pred_files = sorted(os.listdir(pred_folder))
gold_files = sorted(os.listdir(gold_folder))
if pred_files != gold_files: return 0.
@@ -140,7 +145,8 @@ def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
if file_type == 'text':
from .vscode import compare_text_file
return compare_text_file
elif file_type == 'pdf': return compare_pdfs
elif file_type == 'pdf':
return compare_pdfs
elif file_type == 'docx':
from .docs import compare_docx_files
return compare_docx_files
@@ -162,7 +168,8 @@ def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
elif file_type == 'video':
from .vlc import compare_videos
return compare_videos
else: raise ValueError('[ERROR]: not support file type: %s' % file_type)
else:
raise ValueError('[ERROR]: not support file type: %s' % file_type)
score = 0
compare_function = get_compare_function()
@@ -181,7 +188,7 @@ def compare_htmls(html_path1: str, html_path2: str) -> float:
soup1 = BeautifulSoup(inf, 'lxml')
with open(html_path2, 'r', encoding='utf-8') as inf:
soup2 = BeautifulSoup(inf, 'lxml')
def compare_elements(elem1, elem2):
if not (isinstance(elem1, Tag) and isinstance(elem2, Tag)):
return elem1 == elem2
@@ -273,3 +280,18 @@ def check_font_size(font_size, rule):
return 1. if rule['min'] < default_font_size < rule['max'] else 0.
else:
raise TypeError(f"{rule['type']} not support yet!")
def is_added_to_steam_cart(active_tab_info, rule):
    """Check whether every expected item appears in the page content.

    Args:
        active_tab_info: Page-info mapping; only its ``'content'`` entry is
            read. ``None``, an empty mapping, or a missing/``None``
            ``'content'`` is treated as "nothing found".
        rule: Mapping whose ``'items'`` entry lists the strings that must all
            occur in the content.

    Returns:
        ``1.`` if every item in ``rule['items']`` is contained in the content,
        otherwise ``0.``.
    """
    items = rule['items']

    # Missing page info scores 0 instead of raising, mirroring the tolerant
    # ``.get`` lookup used by is_expected_active_tab.
    content = active_tab_info.get('content') if active_tab_info else None
    if content is None:
        return 0.

    return 1. if all(item in content for item in items) else 0.

View File

@@ -55,7 +55,8 @@ def contains_page_break(docx_file):
return 0
def compare_docx_files(file1, file2, ignore_blanks=True):
def compare_docx_files(file1, file2, **options):
ignore_blanks = options.get('ignore_blanks', True)
def get_paragraph_texts_odt(document):
paragraphs = document.getElementsByType(P)
paragraph_texts = []
@@ -250,11 +251,12 @@ def check_tabstops(docx_file1, docx_file2, **kwargs) -> float:
splits = p1.text.split('\t')
if len(splits) == 0: return .0
words = list(filter(lambda x: x.strip(), re.split(r'\s', splits[index])))
if len(words) != number: return .0
if len(words) != number: return .0
section = doc2.sections[0]
paragraph_width = section.page_width - section.left_margin - section.right_margin
ignore_tabs = lambda x: x.alignment == WD_TAB_ALIGNMENT.CLEAR or (x.alignment == WD_TAB_ALIGNMENT.LEFT and x.position == 0)
ignore_tabs = lambda x: x.alignment == WD_TAB_ALIGNMENT.CLEAR or (
x.alignment == WD_TAB_ALIGNMENT.LEFT and x.position == 0)
minus = .0
for p1, p2 in zip(para1, para2):
# filter CLEAR tabstop and default left-0 tabstop
@@ -282,18 +284,6 @@ def compare_contains_image(docx_file1, docx_file2):
return 1
# file1 = 'path/to/file1.docx'
# file2 = 'path/to/file2.docx'
# print(are_docx_files_same(file1, file2))
# Replace 'your_document.docx' with the path to your document
# result = contains_page_break('your_document.docx')
# print(result)
# config_path = "/home/[username]/.config/libreoffice/4/user/registrymodifications.xcu"
# print(find_default_font("Ani", config_path))
def evaluate_colored_words_in_tables(file_path1, file_path2, **kwargs):
if not compare_docx_files(file_path1, file_path2):
return 0
@@ -317,9 +307,12 @@ def evaluate_colored_words_in_tables(file_path1, file_path2, **kwargs):
if word:
first_letter = word[0].lower()
if first_letter in 'aeiou' and _calculate_color_difference(run.font.color.rgb, RGBColor(255, 0, 0)) > threshold:
if first_letter in 'aeiou' and _calculate_color_difference(run.font.color.rgb,
RGBColor(255, 0, 0)) > threshold:
return 0 # Vowel-colored words should be red
elif first_letter not in 'aeiou' and _calculate_color_difference(run.font.color.rgb, RGBColor(0, 0, 255)) > threshold:
elif first_letter not in 'aeiou' and _calculate_color_difference(run.font.color.rgb,
RGBColor(0, 0,
255)) > threshold:
return 0 # Non-vowel-colored words should be blue
return 1 # All words in tables are correctly colored
@@ -533,4 +526,3 @@ def compare_highlighted_text(file1, file2):
return 1
else:
return 0

View File

@@ -146,9 +146,13 @@ def compare_pptx_files(file1_path, file2_path, **options):
examine_font_underline = options.get("examine_font_underline", True)
examine_strike_through = options.get("examine_strike_through", True)
examine_alignment = options.get("examine_alignment", True)
examine_bottom_position = options.get("examine_bottom_position", False)
examine_title_bottom_position = options.get("examine_title_bottom_position", False)
examine_table_bottom_position = options.get("examine_table_bottom_position", False)
examine_right_position = options.get("examine_right_position", False)
examine_image_size = options.get("examine_image_size", True)
examine_top_position = options.get("examine_top_position", False)
examine_shape_for_shift_size = options.get("examine_shape_for_shift_size", False)
examine_image_size = options.get("examine_image_size", False)
examine_modify_height = options.get("examine_modify_height", False)
examine_bullets = options.get("examine_bullets", True)
examine_background_color = options.get("examine_background_color", True)
examine_note = options.get("examine_note", True)
@@ -157,8 +161,10 @@ def compare_pptx_files(file1_path, file2_path, **options):
if len(prs1.slides) != len(prs2.slides) and examine_number_of_slides:
return 0
slide_idx = 0
# compare the content of each slide
for slide1, slide2 in zip(prs1.slides, prs2.slides):
slide_idx += 1
def get_slide_background_color(slide):
background = slide.background
if background.fill.background():
@@ -180,16 +186,37 @@ def compare_pptx_files(file1_path, file2_path, **options):
return 0
# check if the shapes are the same
for shape1, shape2 in zip(slide1.shapes, slide2.shapes):
if examine_bottom_position and shape1.top != shape2.top:
if hasattr(shape1, "text") and hasattr(shape2, "text") and shape1.text == shape2.text and shape1.text == "Product Comparison":
if shape1.top >= shape2.top:
if examine_title_bottom_position:
if hasattr(shape1, "text") and hasattr(shape2, "text") and shape1.text == shape2.text:
if shape1.text == "Product Comparison" and (shape1.top <= shape2.top or shape1.top < 3600000):
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
return 0
if examine_table_bottom_position:
if slide_idx == 3 and shape1.shape_type == 19 and shape2.shape_type == 19:
if shape1.top <= shape2.top or shape1.top < 3600000:
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
return 0
if examine_right_position and shape1.left != shape2.left:
if not hasattr(shape1, "text") and not hasattr(shape2, "text"):
if shape1.left >= shape2.left:
if examine_right_position:
if slide_idx == 2 and not hasattr(shape1, "text") and not hasattr(shape2, "text"):
if shape1.left <= shape2.left or shape1.left < 4320000:
return 0
if examine_top_position:
if slide_idx == 2 and shape1.shape_type == 13 and shape2.shape_type == 13:
if shape1.top >= shape2.top or shape1.top > 1980000:
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
return 0
if examine_shape_for_shift_size:
if shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
if not (hasattr(shape1, "text") and hasattr(shape2, "text") and shape1.text == shape2.text and shape1.text == "Elaborate on what you want to discuss."):
return 0
if (shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height) and examine_shape:
return 0
@@ -199,6 +226,13 @@ def compare_pptx_files(file1_path, file2_path, **options):
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
return 0
if examine_modify_height:
if not hasattr(shape1, "text") and not hasattr(shape2, "text") or shape1.shape_type == 5 and shape2.shape_type == 5:
if shape1.height != shape2.height:
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
return 0
if hasattr(shape1, "text") and hasattr(shape2, "text"):
if shape1.text != shape2.text and examine_text:

View File

@@ -16,7 +16,8 @@ from openpyxl.worksheet.datavalidation import DataValidation
from openpyxl.worksheet.worksheet import Worksheet
from .utils import _match_value_to_rule, _read_cell_style, read_cell_value
from .utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles
from .utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles\
, load_filters, load_pivot_tables
# from openpyxl.utils import coordinate_to_tuple
@@ -116,7 +117,7 @@ def compare_table(result: str, expected: str = None, **options) -> float:
pdworkbooke = None
worksheete_names: List[str] = None
parse_idx: Callable[[Union[str, int], BOOK, BOOK], BOOK] = \
parse_idx: Callable[[Union[str, int], BOOK, BOOK], Tuple[BOOK, str]] = \
functools.partial(
_parse_sheet_idx,
result_sheet_names=worksheetr_names,
@@ -135,10 +136,18 @@ def compare_table(result: str, expected: str = None, **options) -> float:
# Compare Sheet Data by Internal Value {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# precision: int as number of decimal digits, default to 4
sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke))
sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke))
error_limit: int = r.get("precision", 4)
sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke)).round(error_limit)
sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke)).round(error_limit)
metric: bool = sheet1.equals(sheet2)
logger.debug("Sheet1: \n%s", str(sheet1))
logger.debug("Sheet2: \n%s", str(sheet2))
try:
logger.debug("Sheet1 =v= Sheet2: \n%s", str(sheet1==sheet2))
except:
logger.debug("Sheet1 =/v= Sheet2")
logger.debug("Assertion: %s =v= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Sheet Data by Internal Value #
@@ -186,8 +195,13 @@ def compare_table(result: str, expected: str = None, **options) -> float:
# sheet_idx1: as sheet_idx0
# props: list of str indicating concerned styles, see utils._read_cell_style
styles1: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
styles2: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
sheet_idx1: Tuple[Book, str] = parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
book_name1: str = parse_idx(r["sheet_idx0"], result, expected)[0]
styles1: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx1, book_name1, **r)
sheet_idx2: Tuple[Book, str] = parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
book_name2: str = parse_idx(r["sheet_idx1"], result, expected)[0]
styles2: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx2, book_name2, **r)
# number_formats1: List[str] = [c.number_format.lower() for col in sheet1.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
# number_formats2: List[str] = [c.number_format.lower() for col in sheet2.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
metric: bool = styles1 == styles2
@@ -303,6 +317,29 @@ def compare_table(result: str, expected: str = None, **options) -> float:
logger.debug("Assertion: %s[cols] == %s[cols] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Check Row Properties #
elif r["type"] == "filter":
# Compare Filters {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
filters1: Dict[str, Any] = load_filters(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
filters2: Dict[str, Any] = load_filters(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
metric: bool = filters1==filters2
logger.debug("Assertion: %s[filter] == %s[filter] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Filters #
elif r["type"] == "pivot_table":
# Compare Pivot Tables {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# pivot_props: list of str, see utils.load_pivot_tables
pivots1: Dict[str, Any] = load_pivot_tables(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
pivots2: Dict[str, Any] = load_pivot_tables(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
metric: bool = pivots1==pivots2
logger.debug("Assertion: %s[pivot]==%s[pivot] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Pivot Tables #
elif r["type"] == "check_cell":
# Check Cell Properties {{{ #
# sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"

View File

@@ -1,42 +1,47 @@
import builtins
import functools
import itertools
import logging
import operator
import re
import zipfile
from typing import Any, TypeVar, Union, Iterable, Optional, Callable
from typing import Dict, List, Set, Match
from typing import Dict, List, Set, Match, Tuple, Pattern
from urllib.parse import urlparse, urlunparse
import re
import functools
import operator
import builtins
import formulas
import lxml.cssselect
import lxml.etree
import openpyxl
import xmltodict
from lxml.etree import _Element
from openpyxl import Workbook
from openpyxl.chart._chart import ChartBase
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.worksheet.cell_range import MultiCellRange
from openpyxl.worksheet.dimensions import DimensionHolder
from openpyxl.formatting.formatting import ConditionalFormattingList
#from openpyxl.utils import get_column_letter
from openpyxl.cell.cell import Cell
from openpyxl.chart._chart import ChartBase
from openpyxl.formatting.formatting import ConditionalFormattingList
from openpyxl.pivot.cache import CacheSource as PivotCacheSource
from openpyxl.pivot.table import TableDefinition as PivotTableDefinition
from openpyxl.styles.differential import DifferentialStyle
import formulas
from openpyxl.utils import coordinate_to_tuple, get_column_letter
from openpyxl.worksheet.cell_range import MultiCellRange, CellRange
from openpyxl.worksheet.dimensions import DimensionHolder
from openpyxl.worksheet.filters import AutoFilter, SortState
from openpyxl.worksheet.worksheet import Worksheet
V = TypeVar("Value")
logger = logging.getLogger("desktopenv.metrics.utils")
_xlsx_namespaces = [ ("oo", "http://schemas.openxmlformats.org/spreadsheetml/2006/main")
, ("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
]
_xlsx_namespaces = [("oo", "http://schemas.openxmlformats.org/spreadsheetml/2006/main")
, ("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
]
_xlsx_ns_mapping = dict(_xlsx_namespaces)
_xlsx_ns_imapping = dict(map(lambda itm: (itm[1], itm[0]), _xlsx_namespaces))
_xlsx_ns_imapping["http://schemas.openxmlformats.org/spreadsheetml/2006/main"] = None
_sheet_name_selector = lxml.cssselect.CSSSelector("oo|sheets>oo|sheet", namespaces=_xlsx_ns_mapping)
_sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
def load_sparklines(xlsx_file: str, sheet_name: str) -> Dict[str, str]:
# function load_sparklines {{{ #
"""
@@ -79,6 +84,7 @@ def load_sparklines(xlsx_file: str, sheet_name: str) -> Dict[str, str]:
# Available Chart Properties:
# title: str
# anchor: ["oneCell" | "twoCell" | "absolute", col0, row0, col1, row1]
# legend: "b" | "tr" | "l" | "r" | "t"
# width: number
# height: number
# type: "scatterChart" | "lineChart" | "barChart"
@@ -103,7 +109,10 @@ def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, An
"""
# workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file)
worksheet: Worksheet = xlsx_file[sheet_name]
try:
worksheet: Worksheet = xlsx_file[sheet_name]
except KeyError:
return {}
charts: List[ChartBase] = worksheet._charts
chart_set: Dict[str, Any] = {}
@@ -111,29 +120,27 @@ def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, An
for ch in charts:
series: List[str] = []
for ser in ch.series:
value_num = ser.val.numRef.f \
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f") \
else ""
value_str = ser.val.strRef.f \
if hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f") \
else ""
categ_num = ser.cat.numRef.f \
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f") \
else ""
categ_str = ser.cat.strRef.f \
if hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f") \
else ""
series.append("{:},{:},{:},{:}".format(value_num, value_str
, categ_num, categ_str
)
)
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f"):
value_str: str = ser.val.numRef.f
elif hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f"):
value_str: str = ser.val.strRef.f
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f"):
categ_str: str = ser.cat.numRef.f
elif hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f"):
categ_str: str = ser.cat.strRef.f
series.append("{:},{:}".format(value_str, categ_str))
series: str = ";".join(series)
# TODO: maybe more aspects, like chart type
info: Dict[str, Any] = {}
if "title" in chart_props:
info["title"] = ch.title.tx.rich.p[0].r[0].t
try:
info["title"] = ch.title.tx.rich.p[0].r[0].t
except:
info["title"] = None
if "legend" in chart_props:
info["legend"] = ch.legend.position if ch.legend is not None else None
if "anchor" in chart_props:
info["anchor"] = [ch.anchor.editAs
, ch.anchor._from.col, ch.anchor.to.row
@@ -149,16 +156,123 @@ def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, An
info["direction"] = ch.barDir
if "xtitle" in chart_props:
info["xtitle"] = ch.x_axis.title.tx.rich.p[0].r[0].t
try:
info["xtitle"] = ch.x_axis.title.tx.rich.p[0].r[0].t
except:
info["xtitle"] = None
if "ytitle" in chart_props:
info["ytitle"] = ch.y_axis.title.tx.rich.p[0].r[0].t
try:
info["ytitle"] = ch.y_axis.title.tx.rich.p[0].r[0].t
except:
info["ytitle"] = None
if "ztitle" in chart_props:
info["ztitle"] = ch.z_axis.title.tx.rich.p[0].r[0].t
try:
info["ztitle"] = ch.z_axis.title.tx.rich.p[0].r[0].t
except:
info["ztitle"] = None
chart_set[series] = info
logger.debug(".[%s].charts: %s", sheet_name, repr(chart_set))
return chart_set
# }}} function load_charts #
# Available Pivot Properties:
# name: str
# show_total, show_empty_row, show_empty_col, show_headers: bool
# location: str
# selection: if the concrete item selection should be checked, a list of sets of tuples like (bool, index) will be returned; lists will be returned instead of sets if "ordered" is specified
# filter: if the filter fields should be checked; field indices will be returned in the `filter_fields` item
# col_fields: indices
# row_fields: indices
# data_fields: list of str representations. the str representation is like "index;name;subtotal_type;show_data_as"; name is optional and is only returned when `data_fields_name` is specified in `pivot_props`
def load_pivot_tables(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, Any]:
    #  function load_pivot_tables {{{ #
    """
    Collects the requested properties of every pivot table on one sheet.

    Args:
        xlsx_file (Workbook): the workbook to inspect
        sheet_name (str): name of the sheet holding the pivot tables
        options (Dict[str, List[str]]): dict like {"pivot_props": list of str}
          naming the pivot properties to collect

    Returns:
        Dict[str, Any]: pivot tables keyed by a string describing their data
          source, i.e.
          {
            <str representing data source>: {
                <str as property>: anything
            }
          }
          An empty dict is returned if the workbook has no sheet of that name.
    """

    try:
        sheet: Worksheet = xlsx_file[sheet_name]
    except KeyError:
        return {}
    pivot_tables: List[PivotTableDefinition] = sheet._pivots

    wanted: Set[str] = set(options.get("pivot_props", []))
    # requested property name -> attribute read verbatim from the pivot table
    plain_attributes: Tuple[Tuple[str, str], ...] = (
        ("name", "name"),
        ("show_total", "visualTotals"),
        ("show_empty_row", "showEmptyRow"),
        ("show_empty_col", "showEmptyCol"),
        ("show_headers", "showHeaders"),
        ("location", "location"),
    )

    result: Dict[str, Any] = {}
    for pivot in pivot_tables:
        # One list of (hidden flag, item index) per pivot field; items without
        # an index are left out.
        per_field: List[List[Tuple[Optional[bool], int]]] = [
            [(item.h, item.x) for item in field.items if item.x is not None]
            for field in pivot.pivotFields
        ]

        # Fields without any indexed item at either end are trimmed away; the
        # number trimmed on each side shifts the source range and the field
        # indices reported below.
        left_bias: int = sum(1 for _ in itertools.takewhile(lambda r: len(r) == 0, per_field))
        left_trimmed = per_field[left_bias:]
        right_bias: int = sum(1 for _ in itertools.takewhile(lambda r: len(r) == 0, reversed(left_trimmed)))
        selection = left_trimmed[:len(left_trimmed) - right_bias]

        cache_source: PivotCacheSource = pivot.cache.cacheSource
        first_ref, last_ref = cache_source.worksheetSource.ref.split(":")
        first_row, first_col = coordinate_to_tuple(first_ref)
        last_row, last_col = coordinate_to_tuple(last_ref)
        top_left: Tuple[int, int] = (first_row, first_col + left_bias)
        bottom_right: Tuple[int, int] = (last_row, last_col - right_bias)
        source_key: str = "{:};{:}:{:};{:}".format(
            cache_source.type, top_left, bottom_right, cache_source.worksheetSource.sheet
        )

        record: Dict[str, Any] = {}
        for prop_name, attribute in plain_attributes:
            if prop_name in wanted:
                record[prop_name] = getattr(pivot, attribute)

        if "filter" in wanted or "selection" in wanted:
            if "ordered" in wanted:
                record["selection"] = selection
            else:
                record["selection"] = [set(r) for r in selection]
        if "filter" in wanted:
            record["filter_fields"] = {f.fld for f in pivot.pageFields}
        if "col_fields" in wanted:
            record["col_fields"] = [f.x - left_bias for f in pivot.colFields]
        if "row_fields" in wanted:
            record["row_fields"] = [f.x - left_bias for f in pivot.rowFields]
        if "data_fields" in wanted:
            with_name: bool = "data_fields_name" in wanted
            record["data_fields"] = [
                "{:d};{:};{:};{:}".format(
                    f.fld - left_bias, f.name if with_name else "", f.subtotal, f.showDataAs
                )
                for f in pivot.dataFields
            ]

        result[source_key] = record

    logger.debug(".[%s].pivots: %s", sheet_name, repr(result))
    return result
    #  }}} function load_pivot_tables #
# Selects the <t> elements of shared-string items (sst > si > t) in the main
# spreadsheetml namespace (prefix "oo").
_shared_str_selector = lxml.cssselect.CSSSelector("oo|sst>oo|si>oo|t", namespaces=_xlsx_ns_mapping)
def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
# read_cell_value {{{ #
try:
@@ -178,20 +292,20 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
with z_f.open("xl/worksheets/sheet{:}.xml".format(sheet_names[sheet_name])) as f:
sheet: _Element = lxml.etree.fromstring(f.read())
cells: List[_Element] =\
lxml.cssselect.CSSSelector( 'oo|row>oo|c[r="{:}"]'.format(coordinate)
, namespaces=_xlsx_ns_mapping
)(sheet)
if len(cells)==0:
cells: List[_Element] = \
lxml.cssselect.CSSSelector('oo|row>oo|c[r="{:}"]'.format(coordinate)
, namespaces=_xlsx_ns_mapping
)(sheet)
if len(cells) == 0:
return None
cell: _Element = cells[0]
except zipfile.BadZipFile:
return None
cell: Dict[str, str] = xmltodict.parse( lxml.etree.tostring(cell, encoding="unicode")
, process_namespaces=True
, namespaces=_xlsx_ns_imapping
)
cell: Dict[str, str] = xmltodict.parse(lxml.etree.tostring(cell, encoding="unicode")
, process_namespaces=True
, namespaces=_xlsx_ns_imapping
)
logger.debug("%s.%s[%s]: %s", xlsx_file, sheet_name, coordinate, repr(cell))
if "@t" not in cell["c"]:
return None
@@ -203,6 +317,7 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
return cell["c"]["v"]
# }}} read_cell_value #
# Supported Styles:
# number_format
# font_name - str
@@ -210,38 +325,67 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
# font_color - in aRGB, e.g., FF000000 is black
# font_bold - bool
# font_italic - bool
# font_underline - "single" | "double" | "singleAccounting" | "doubleAccounting"
# font_size - float
# fill_type - "patternFill" | "gradientFill"
# bgcolor - in aRGB, e.g., FFFF0000 is red
# fgcolor - in aRGB, e.g., FFFFFF00 is yellow
# hyperlink - str
def _read_cell_style(style_name: str, cell: Cell, diff_style: Optional[DifferentialStyle] = None) -> Any:
    """
    Reads one style property of a cell.

    Args:
        style_name (str): "number_format", "font_name", "font_family",
          "font_color", "font_bold", "font_italic", "font_underline",
          "font_size", "fill_type", "bgcolor", "fgcolor" or "hyperlink"
        cell (Cell): the cell whose style is read
        diff_style (Optional[DifferentialStyle]): when given, font and fill
          properties are read from it instead of from the cell, and the
          number format is taken from its `numFmt`

    Returns:
        Any: the property value. Font properties and the hyperlink are None
          for cells without a value; the number format is None unless the
          cell holds a numeric value; fill properties are None when they
          cannot be read.

    Raises:
        NotImplementedError: if `style_name` is not one of the names above
    """

    # style name -> attribute of the font object
    font_attributes: Dict[str, str] = {
        "font_name": "name",
        "font_family": "family",
        "font_bold": "bold",
        "font_italic": "italic",
        "font_underline": "underline",
        "font_size": "size",
    }

    if style_name == "number_format":
        if cell.value is None or cell.data_type != "n":
            return None
        return cell.number_format if diff_style is None else diff_style.numFmt.formatCode

    if style_name in font_attributes:
        if cell.value is None:
            return None
        return getattr((diff_style or cell).font, font_attributes[style_name])

    if style_name == "font_color":
        if cell.value is None:
            return None
        return (diff_style or cell).font.color.rgb

    if style_name in ("fill_type", "bgcolor", "fgcolor"):
        # Best effort: a missing fill (or missing colour on it) yields None
        # instead of failing the whole style read. `Exception` rather than a
        # bare `except` so KeyboardInterrupt/SystemExit still propagate.
        try:
            fill = (diff_style or cell).fill
            if style_name == "fill_type":
                return fill.tagname
            if style_name == "bgcolor":
                return fill.bgColor.rgb
            return fill.fgColor.rgb
        except Exception:
            return None

    if style_name == "hyperlink":
        return (cell.hyperlink or "") if cell.value is not None else None

    raise NotImplementedError("Unsupported Style: {:}".format(style_name))
def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, List[Any]]:
# Matches an absolute cell reference such as `$A$1`, optionally followed by
# `:$B$2` to form an absolute range. The pattern is compiled in verbose mode
# (re.X), so the line breaks and the `# coord...` notes inside the literal are
# ignored by the regex engine.
# NOTE: the second coordinate is optional; with `findall()` the col2/row2
# groups are reported as empty strings (not None) when it is absent.
_absolute_range_pattern: Pattern[str] = re.compile(r"""\$(?P<col1>[A-Z]{1,3})\$(?P<row1>\d+) # coord1
(?::
\$(?P<col2>[A-Z]{1,3})\$(?P<row2>\d+) # coord2
)?
"""
, re.X
)
def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, book_name: str, **options) -> Dict[str, List[Any]]:
# function load_xlsx_styles {{{ #
"""
Args:
xlsx_file (Workbook): concerned excel book
sheet_name (str): sheet name
book_name (str): book name
options (Dict[str, List[str]]): dict like {"props": list of str} giving
the concerned styles
@@ -253,7 +397,10 @@ def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[st
}
"""
worksheet: Worksheet = xlsx_file[sheet_name]
try:
worksheet: Worksheet = xlsx_file[sheet_name]
except KeyError:
return {}
style_dict: Dict[str, List[Any]] = {}
concerned_styles: List[str] = options.get("props", [])
@@ -274,10 +421,35 @@ def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[st
active_cells: List[Cell] = []
if r.type == "expression":
condition: Callable[[str], bool] = formula_parser.ast("=" + r.formula[0])[1].compile()
logger.debug("Expression condition: %s", r.formula[0])
arguments: List[Any] = []
absolute_range_match: List[Tuple[str, str, str, str]] = _absolute_range_pattern.findall(r.formula[0])
for m in absolute_range_match:
logger.debug("Absolute ranges: %s", repr(m))
if m[2] is None and m[3] is None:
arguments.append(read_cell_value(book_name, sheet_name, coordinate="{:}{:}".format(m[0], m[1])))
else:
arguments.append([read_cell_value(book_name, sheet_name
, coordinate="{:}{:}".format(get_column_letter(c[1])
, c[0]
)
) \
for c in CellRange("{:}{:}:{:}{:}".format(m[0], m[1], m[2], m[3])).cells \
]
)
logger.debug("Absolute range arguments: %s", repr(arguments))
for rge in fmt.cells:
for c in rge.cells:
cell: Cell = worksheet.cell(row=c[0], column=c[1])
if condition(str(cell.value)):
cell_value = read_cell_value(book_name, sheet_name
, coordinate="{:}{:d}".format(get_column_letter(c[1])
, c[0]
)
)
if condition(cell_value, *arguments):
logger.debug("Active Cell %s(%s) for %s", repr(cell), str(cell_value), r.formula[0])
active_cells.append(cell)
else:
raise NotImplementedError("Not Implemented Condition Type: {:}".format(r.type))
@@ -285,9 +457,11 @@ def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[st
for c in active_cells:
style_dict[c.coordinate] = [_read_cell_style(st, c, r.dxf) for st in concerned_styles]
logger.debug(".[%s].styles: %s", sheet_name, repr(style_dict))
return style_dict
# }}} function load_xlsx_styles #
# Available Row Properties:
# hidden
# collapsed
@@ -300,7 +474,7 @@ def load_xlsx_styles(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[st
# collapsed
# min
# max
def load_rows_or_cols(xlsx_file: Workbook, sheet_name: str, **options)\
def load_rows_or_cols(xlsx_file: Workbook, sheet_name: str, **options) \
-> Dict[Union[int, str], Dict[str, Any]]:
# function load_rows_or_cols {{{ #
"""
@@ -315,7 +489,10 @@ def load_rows_or_cols(xlsx_file: Workbook, sheet_name: str, **options)\
Dict[Union[int, str], Dict[str, Any]]: row/column information
"""
worksheet: Worksheet = xlsx_file[sheet_name]
try:
worksheet: Worksheet = xlsx_file[sheet_name]
except KeyError:
return {}
objs: DimensionHolder = getattr(worksheet, "{:}_dimensions".format(options["obj"]))
obj_set: Dict[int, Any] = {}
@@ -328,11 +505,74 @@ def load_rows_or_cols(xlsx_file: Workbook, sheet_name: str, **options)\
return obj_set
# }}} function load_rows_or_cols #
def load_filters(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, Any]:
    #  function load_filters {{{ #
    """
    Summarises the auto filter of one sheet as plain Python containers.

    Args:
        xlsx_file (Workbook): the workbook to inspect
        sheet_name (str): name of the sheet whose auto filter is read
        options: accepted for signature parity with the other loaders; not
          read here

    Returns:
        Dict[str, Any]: dict with
          - "ref": the filter reference
          - "filter_column": one dict per filter column, sorted by "col_id"
          - "sort_state": only present when the filter has a sort state
          An empty dict is returned if the workbook has no sheet of that name.
    """

    try:
        sheet: Worksheet = xlsx_file[sheet_name]
    except KeyError:
        return {}

    auto_filter: AutoFilter = sheet.auto_filter
    summary: Dict[str, Any] = {"ref": auto_filter.ref}

    # filterColumn
    columns: List[Dict[str, Any]] = []
    for column in auto_filter.filterColumn:
        entry: Dict[str, Any] = {
            "col_id": column.colId,
            "hidden_button": column.hiddenButton,
            "show_button": column.showButton,
        }
        if column.filters is not None:
            entry["filters_blank"] = column.filters.blank
            entry["filters"] = set(column.filters.filter)
        if column.customFilters is not None:
            entry["custom_filters_op"] = column.customFilters._and
            entry["custom_filters"] = {
                (custom.operator, custom.val) for custom in column.customFilters.customFilter
            }
        columns.append(entry)
    # stable in-place sort, so columns sharing a col_id keep their file order
    columns.sort(key=lambda d: d["col_id"])
    summary["filter_column"] = columns

    # sortState
    sort_state: Optional[SortState] = auto_filter.sortState
    if sort_state is not None:
        conditions: List[Dict[str, Any]] = []
        for condition in sort_state.sortCondition:
            conditions.append({
                "descending": condition.descending,
                "key": condition.sortBy,
                "ref": condition.ref,
                "custom_list": condition.customList,
                "dxf_id": condition.dxfId,
                "icon": condition.iconSet,
                "iconid": condition.iconId,
            })
        summary["sort_state"] = {
            "sort": sort_state.columnSort,
            "case": sort_state.caseSensitive,
            "method": sort_state.sortMethod,
            "ref": sort_state.ref,
            "condition": conditions,
        }

    return summary
    #  }}} function load_filters #
def _match_record(pattern: Dict[str, Any], item: Dict[str, Any]) -> bool:
return all(k in item and item[k] == val for k, val in pattern.items())
def _multicellrange_containsby(subset_candidate: MultiCellRange, superset_candidate: MultiCellRange) -> bool:
    """
    Tells whether every range iterated from `subset_candidate` passes the
    `in` test against `superset_candidate`.

    An empty `subset_candidate` is contained by anything.
    """
    for cell_range in subset_candidate:
        if cell_range not in superset_candidate:
            return False
    return True
def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
"""
Args:
@@ -355,10 +595,10 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
match_: Optional[Match[str]] = re.search(rule["ref"], value, flag)
return match_ is not None
if rule["method"] in { "eq", "ne"
, "le", "lt"
, "ge", "gt"
}:
if rule["method"] in {"eq", "ne"
, "le", "lt"
, "ge", "gt"
}:
return getattr(operator, rule["method"])(value, rule["ref"])
if rule["method"].startswith("approx"):
threshold: float = float(rule["method"].split(":")[1])
@@ -368,26 +608,27 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
except (ValueError, TypeError):
return False
else:
return abs(value-rule["ref"])<=threshold
return abs(value - rule["ref"]) <= threshold
if rule["method"] == "spreadsheet_range":
subset_limit = MultiCellRange(rule["ref"][0])
superset_limit = MultiCellRange(rule["ref"][1])
return _multicellrange_containsby(subset_limit, value)\
and _multicellrange_containsby(value, superset_limit)
if rule["method"].startswith("range."): # e.g., range.te [0, 2] -> 0 < x <= 2
return _multicellrange_containsby(subset_limit, value) \
and _multicellrange_containsby(value, superset_limit)
if rule["method"].startswith("range."): # e.g., range.te [0, 2] -> 0 < x <= 2
left_et = rule["method"][6]
right_et = rule["method"][7]
return getattr(operator, "l" + left_et)(rule["ref"][0], value)\
and getattr(operator, "l" + right_et)(value, rule["ref"][1])
return getattr(operator, "l" + left_et)(rule["ref"][0], value) \
and getattr(operator, "l" + right_et)(value, rule["ref"][1])
if rule["method"] in {"str_list_eq", "str_set_eq"}:
container_type_str: str = rule["method"][4:-3]
container_type = getattr(builtins, container_type_str)
value: container_type = container_type(value.strip("\"'").split(","))
ref: container_type = container_type(rule["ref"])
return value==ref
return value == ref
raise NotImplementedError()
def are_lists_equal(list1, list2, comparison_func):
# First check if both lists have the same length
if len(list1) != len(list2):
@@ -404,6 +645,9 @@ def are_lists_equal(list1, list2, comparison_func):
def compare_urls(url1, url2):
if url1 is None or url2 is None:
return url1 == url2
def normalize_url(url):
# Parse the URL
parsed_url = urlparse(url)
@@ -428,42 +672,3 @@ def compare_urls(url1, url2):
# Compare the normalized URLs
return norm_url1 == norm_url2
if __name__ == "__main__":
    # Manual smoke test: print the chart information of the active sheet of a
    # local sample workbook.
    path1 = "../../../../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold_line_scatter.xlsx"
    workbook1: Workbook = openpyxl.load_workbook(filename=path1)
    worksheet1: Worksheet = workbook1.active

    # load_charts() expects the loaded workbook and a sheet name; handing it
    # the file path alone fails because the required sheet_name is missing.
    print(load_charts(workbook1, worksheet1.title, chart_props=["title", "xtitle", "ytitle", "type"]))

View File

@@ -33,7 +33,7 @@ def check_json_keybindings(actual: str, expected: str, **options) -> float:
break
else:
return 0.0
expected = expected['expect']
expected = expected['expected']
if expected in data:
return 1.0
else:
@@ -55,7 +55,7 @@ def check_json_settings(actual: str, expected: str, **options) -> float:
with open(actual, 'r') as f:
data = json.load(f)
expect = expected['expect']
expect = expected['expected']
data_copy = copy.deepcopy(data)
data_copy.update(expect)
if data == data_copy:
@@ -93,7 +93,7 @@ def compare_config(actual: str, rules: Dict, **options) -> float:
with open(actual) as f1:
actual_text = f1.read()
if actual_text == rules['expect']:
if actual_text == rules['expected']:
return 1.0
return 0.0
@@ -110,7 +110,7 @@ def compare_answer(actual: str, rules: Dict, **options) -> float:
if not actual:
return 0.
if actual == rules['expect']:
if actual == rules['expected']:
return 1.0
# TODO: can use text embedding to get non-zero return

View File

@@ -387,12 +387,15 @@ def _create_atspi_node(node: Accessible, depth: int = 0, flag: Optional[str] = N
index_base += MAXIMUN_COLUMN
return xml_node
else:
for i, ch in enumerate(node):
# HYPERPARAMETER
if i>=1025:
logger.warning("Max width reached")
break
xml_node.append(_create_atspi_node(ch, depth+1, flag))
try:
for i, ch in enumerate(node):
# HYPERPARAMETER
if i>=1025:
logger.warning("Max width reached")
break
xml_node.append(_create_atspi_node(ch, depth+1, flag))
except:
logger.warning("Error occurred during children traversing. Has Ignored. Node: %s", lxml.etree.tostring(xml_node, encoding="unicode"))
return xml_node
# }}} function _create_atspi_node #