load libreoffice writer eval -batch 2

This commit is contained in:
tsuky_chen
2024-01-26 02:15:42 +08:00
99 changed files with 4318 additions and 109 deletions

View File

@@ -1,10 +1,13 @@
from .chrome import get_default_search_engine, get_cookie_data, get_bookmarks, get_open_tabs_info, get_pdf_from_url, \
get_shortcuts_on_desktop
get_shortcuts_on_desktop, get_history, get_enabled_experiments, get_chrome_language, get_chrome_font_size, \
get_profile_name, get_number_of_search_results, get_googledrive_file, get_active_tab_info
from .file import get_cloud_file, get_vm_file, get_cache_file
from .general import get_vm_command_line
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
from .impress import get_audio_in_slide
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper, get_list_directory
from .misc import get_rule, get_accessibility_tree
from .replay import get_replay
from .vlc import get_vlc_playing_info, get_vlc_config
from .vscode import get_vscode_config
# from .impress import get_audio_in_slide

View File

@@ -2,8 +2,9 @@ import json
import logging
import os
import sqlite3
from typing import Dict
from typing import Dict, Any
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
from playwright.sync_api import sync_playwright
logger = logging.getLogger("desktopenv.getters.chrome")
@@ -84,6 +85,168 @@ def get_cookie_data(env, config: Dict[str, str]):
return None
def get_history(env, config: Dict[str, str]):
    """Fetch Chrome's browsing history database from the VM and return its rows.

    The History SQLite file is copied from the platform-specific default
    profile location into ``env.cache_dir`` under ``config["dest"]`` and then
    queried.

    Returns:
        list of (url, title, last_visit_time) tuples, or None on any failure.
    Raises:
        Exception: if the VM platform is not Windows/Darwin/Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        chrome_history_path = env.controller.execute_python_command(
            """import os; print(os.path.join(os.getenv('USERPROFILE'), "AppData", "Local", "Google", "Chrome", "User Data", "Default", "History"))""")[
            'output'].strip()
    elif os_type == 'Darwin':
        chrome_history_path = env.controller.execute_python_command(
            """import os; print(os.path.join(os.getenv('HOME'), "Library", "Application Support", "Google", "Chrome", "Default", "History"))""")[
            'output'].strip()
    elif os_type == 'Linux':
        chrome_history_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(chrome_history_path)
        _path = os.path.join(env.cache_dir, config["dest"])
        with open(_path, "wb") as f:
            f.write(content)

        # Query every visited URL from the copied History database.
        conn = sqlite3.connect(_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT url, title, last_visit_time FROM urls")
            history_items = cursor.fetchall()
        finally:
            # Fixed: the connection was previously never closed (resource leak).
            conn.close()
        return history_items
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
def get_enabled_experiments(env, config: Dict[str, str]):
    """Return the list of enabled Chrome lab experiments from the Local State file.

    Reads the platform-specific "Local State" JSON from the VM and extracts
    ``browser.enabled_labs_experiments``; returns [] on any read/parse failure.
    """
    platform = env.vm_platform
    if platform == 'Windows':
        local_state_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
            'Google\\Chrome\\User Data\\Local State'))""")['output'].strip()
    elif platform == 'Darwin':
        local_state_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")['output'].strip()
    elif platform == 'Linux':
        local_state_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")['output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        raw = env.controller.get_file(local_state_path)
        state = json.loads(raw)
        return state.get('browser', {}).get('enabled_labs_experiments', [])
    except Exception as e:
        logger.error(f"Error: {e}")
        return []
def get_profile_name(env, config: Dict[str, str]):
    """Return the Chrome profile name from the Default profile's Preferences file.

    Assumes the preferences are in the default location, unencrypted JSON and
    small enough to fetch whole. Returns None when the value cannot be read.
    """
    platform = env.vm_platform
    if platform == 'Windows':
        preferences_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
            'Google\\Chrome\\User Data\\Default\\Preferences'))""")['output'].strip()
    elif platform == 'Darwin':
        preferences_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")['output'].strip()
    elif platform == 'Linux':
        preferences_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")['output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        raw = env.controller.get_file(preferences_path)
        preferences = json.loads(raw)
        return preferences.get('profile', {}).get('name', None)
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
def get_chrome_language(env, config: Dict[str, str]):
    """Return Chrome's UI locale (e.g. "en-US") from the Local State file.

    Falls back to "en-US" both when the key is absent and when the file
    cannot be read or parsed.
    """
    platform = env.vm_platform
    if platform == 'Windows':
        local_state_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
            'Google\\Chrome\\User Data\\Local State'))""")['output'].strip()
    elif platform == 'Darwin':
        local_state_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")['output'].strip()
    elif platform == 'Linux':
        local_state_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")['output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        raw = env.controller.get_file(local_state_path)
        state = json.loads(raw)
        return state.get('intl', {}).get('app_locale', "en-US")
    except Exception as e:
        logger.error(f"Error: {e}")
        return "en-US"
def get_chrome_font_size(env, config: Dict[str, str]):
    """Return Chrome's font-size preferences (``webkit.webprefs``).

    Reads the Default profile's Preferences JSON from the VM.

    Returns:
        dict with keys like ``default_fixed_font_size``, ``default_font_size``
        and ``minimum_font_size``; Chrome's stock defaults are returned when
        the setting is absent or the file cannot be read.
    Raises:
        Exception: if the VM platform is not Windows/Darwin/Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
            'Google\\Chrome\\User Data\\Default\\Preferences'))""")[
            'output'].strip()
    elif os_type == 'Darwin':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
            'output'].strip()
    elif os_type == 'Linux':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')
    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)
        # Fixed: local was misleadingly named `search_engine` (copy-paste from
        # get_default_search_engine); it holds the font preferences dict.
        font_prefs = data.get('webkit', {}).get('webprefs', {
            "default_fixed_font_size": 13,
            "default_font_size": 16,
            "minimum_font_size": 13
        })
        return font_prefs
    except Exception as e:
        logger.error(f"Error: {e}")
        # Fixed: the error fallback previously omitted "minimum_font_size",
        # inconsistent with the in-band default above.
        return {
            "default_fixed_font_size": 13,
            "default_font_size": 16,
            "minimum_font_size": 13
        }
def get_bookmarks(env, config: Dict[str, str]):
os_type = env.vm_platform
if os_type == 'Windows':
@@ -227,8 +390,7 @@ def get_pdf_from_url(env, config: Dict[str, str]) -> str:
# fixme: needs to be changed (maybe through post-processing) since it's not working
def get_chrome_saved_address(env, config: Dict[str, str]):
# host = env.vm_ip
host = "192.168.13.130"
host = env.vm_ip
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
remote_debugging_url = f"http://{host}:{port}"
@@ -278,6 +440,80 @@ def get_shortcuts_on_desktop(env, config: Dict[str, str]):
for shortcut_path in shortcuts_paths:
short_cuts[shortcut_path] = env.controller.get_file(env.controller.execute_python_command(
f"import os; print(os.path.join(os.path.expanduser('~'), 'Desktop', '{shortcut_path}'))")['output'].strip()).decode('utf-8')
f"import os; print(os.path.join(os.path.expanduser('~'), 'Desktop', '{shortcut_path}'))")[
'output'].strip()).decode('utf-8')
return short_cuts
def get_number_of_search_results(env, config: Dict[str, str]):
    """Count search-result elements on a results page in the VM's browser.

    Connects to the VM's Chrome over CDP, loads the (currently hard-coded)
    query URL and counts nodes matching the result selector.
    """
    # todo: move into the config file
    url, result_selector = "https://google.com/search?q=query", '.search-result'
    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    remote_debugging_url = f"http://{host}:{port}"

    with sync_playwright() as p:
        browser = p.chromium.connect_over_cdp(remote_debugging_url)
        page = browser.new_page()
        page.goto(url)
        matches = page.query_selector_all(result_selector)
        result_count = len(matches)
        browser.close()

    return result_count
def get_googledrive_file(env, config: Dict[str, Any]) -> str:
    """ Get the desired file from Google Drive based on config, return the downloaded local filepath.
    To retrieve the file, we provide two options in config dict:
    1. query: a list of queries to search the file, each query is a string that follows the format of Google Drive search query
    2. path: a list of path to the file, 'folder/subfolder/filename' -> ['folder', 'subfolder', 'filename']
    3. query_list: query extends to list to download multiple files
    4. path_list: path extends to list to download multiple files
    dest: target file name or list. If *_list is used, dest should also be a list of the same length.
    Return the downloaded filepath locally.
    """
    settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.json')
    auth = GoogleAuth(settings_file=settings_file)
    drive = GoogleDrive(auth)

    def get_single_file(_query, _path):
        # Walk the query chain, narrowing the parent folder at each step,
        # then download the final match to _path.
        parent_id = 'root'
        try:
            for q in _query:
                search = f'( {q} ) and "{parent_id}" in parents'
                filelist: GoogleDriveFileList = drive.ListFile({'q': search}).GetList()
                if len(filelist) == 0:  # target file not found
                    return None
                file: GoogleDriveFile = filelist[0]  # HACK: if multiple candidates, just use the first one
                parent_id = file['id']
            file.GetContentFile(_path, mimetype=file['mimeType'])
        except Exception:
            # Fixed: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit.
            logger.info('[ERROR]: Failed to download the file from Google Drive')
            return None
        return _path

    def _path_to_query(path):
        # Build one Drive query per path component; intermediate components
        # must be folders. Fixed: Drive query string values must be quoted
        # (title = 'name'); they were previously unquoted, which the Drive
        # API rejects.
        return [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false"
                if idx < len(path) - 1 else f"title = '{fp}' and trashed = false"
                for idx, fp in enumerate(path)]

    if 'query' in config:
        return get_single_file(config['query'], os.path.join(env.cache_dir, config['dest']))
    elif 'path' in config:
        return get_single_file(_path_to_query(config['path']), os.path.join(env.cache_dir, config['dest']))
    elif 'query_list' in config:
        _path_list = []
        assert len(config['query_list']) == len(config['dest'])
        for idx, query in enumerate(config['query_list']):
            dest = config['dest'][idx]
            _path_list.append(get_single_file(query, os.path.join(env.cache_dir, dest)))
        return _path_list
    else:  # path_list in config
        _path_list = []
        assert len(config['path_list']) == len(config['dest'])
        for idx, path in enumerate(config['path_list']):
            dest = config['dest'][idx]
            _path_list.append(get_single_file(_path_to_query(path), os.path.join(env.cache_dir, dest)))
        return _path_list

View File

@@ -9,8 +9,9 @@ def get_vm_command_line(env, config: Dict[str, str]):
vm_ip = env.vm_ip
port = 5000
command = config["command"]
shell = config.get("shell", False)
response = requests.post(f"http://{vm_ip}:{port}/execute", json={"command": command})
response = requests.post(f"http://{vm_ip}:{port}/execute", json={"command": command, "shell": shell})
if response.status_code == 200:
return response.json()["output"]

View File

@@ -0,0 +1,62 @@
import os
import tempfile
import xml.etree.ElementTree as ET
import zipfile
from typing import Dict
from desktop_env.evaluators.getters.file import get_vm_file
def get_audio_in_slide(env, config: Dict[str, str]):
    """Locate the audio attached to one slide of a .pptx on the VM.

    config keys:
        ppt_file_path: path of the .pptx inside the VM
        slide_index:   0-based slide index
        dest:          file name to cache the fetched audio under

    Returns a host-local file path to the audio, or None when the slide has
    no audio relationship.
    """
    ppt_file_path, slide_index, dest = config["ppt_file_path"], int(config["slide_index"]), config["dest"]
    # Open the .pptx file as a zip file, fixme: now we assume there is only one audio file in the slides
    audio_file_path = None
    # Fetch the .pptx from the VM to the host cache first.
    ppt_file_localhost_path = get_vm_file(env, {"path": ppt_file_path, "dest": os.path.split(ppt_file_path)[-1]})
    with zipfile.ZipFile(ppt_file_localhost_path, 'r') as myzip:
        # Find the relationships XML file for the first slide
        # (slide indices in the archive are 1-based, hence +1).
        slide1_rels_file = 'ppt/slides/_rels/slide{}.xml.rels'.format(slide_index + 1)
        if slide1_rels_file in myzip.namelist():
            with myzip.open(slide1_rels_file) as f:
                # Parse the XML tree from the relationships file
                tree = ET.parse(f)
                root = tree.getroot()
                # Define the namespace used in the relationships file
                namespaces = {'r': 'http://schemas.openxmlformats.org/package/2006/relationships'}
                # Look for all relationship elements that have a type attribute for audio
                for rel in root.findall('r:Relationship', namespaces):
                    # Check if the relationship is for an audio file
                    if 'audio' in rel.attrib['Type']:
                        # The audio can be embedded inside the file or linked to an external file
                        # Get the target attribute which contains the audio file path
                        target = rel.attrib['Target']
                        if target.startswith('..'):
                            # Resolve the relative path to get the correct path within the zip file
                            audio_file_path = os.path.normpath(os.path.join('ppt/slides', target))
                            # Replace backslashes with forward slashes for ZIP compatibility
                            audio_file_path = audio_file_path.replace('\\', '/')
                            # Create a temporary directory to extract the audio file
                            # NOTE(review): the extracted file lives inside this
                            # TemporaryDirectory, which is deleted when the `with`
                            # block exits — the path kept below may already be
                            # gone by the time it is used. Confirm whether
                            # extraction should target env.cache_dir instead.
                            with tempfile.TemporaryDirectory() as tmpdirname:
                                # Extract the audio file
                                myzip.extract(audio_file_path, tmpdirname)
                                # Get the full path of the extracted audio file
                                extracted_audio_path = os.path.join(tmpdirname, audio_file_path)
                                # Return the extracted audio file path
                                audio_file_path = extracted_audio_path
                        else:
                            # the audio file is external to the .pptx file
                            # Return the audio file path
                            assert target.startswith("file://"), target
                            audio_file_path = target[7:]
    if audio_file_path is None:
        return None
    else:
        # Get the audio file from vm and return the file path in the host
        # NOTE(review): in the embedded case audio_file_path is a host-side
        # path produced by the extraction above, yet it is passed to
        # get_vm_file as a VM path — verify this is intended.
        return get_vm_file(env, {"path": audio_file_path, "dest": dest})

View File

@@ -18,3 +18,7 @@ def get_vm_wallpaper(env, config: dict) -> Union[str, bytes]:
f.write(content)
return _path
def get_list_directory(env, config: dict) -> dict:
    """Return the VM's directory tree rooted at config["path"] as a nested dict."""
    return env.controller.get_vm_directory_tree(config["path"])

View File

@@ -1,22 +1,35 @@
from .chrome import is_expected_tabs, is_expected_bookmarks, compare_pdfs, is_cookie_deleted, is_shortcut_on_desktop
from .chrome import is_expected_tabs, is_expected_bookmarks, compare_pdfs, is_cookie_deleted, is_shortcut_on_desktop, \
check_font_size, \
check_enabled_experiments, check_history_deleted, is_expected_search_query
from .docs import compare_font_names, compare_subscript_contains, has_page_numbers_in_footers, compare_docx_lines
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
compare_insert_equation, compare_highlighted_text
from .docs import is_first_line_centered, check_file_exists, compare_contains_image
from .docs import evaluate_colored_words_in_tables, check_highlighted_words, evaluate_strike_through_last_paragraph, \
evaluate_conversion, evaluate_spacing, check_italic_font_size_14, evaluate_alignment, get_unique_train_ids, \
check_no_duplicates, compare_init_lines
from .general import exact_match, fuzzy_match
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
compare_insert_equation, compare_highlighted_text
from .docs import is_first_line_centered, check_file_exists, compare_contains_image
from .general import check_csv, check_accessibility_tree, run_sqlite3, check_json
from .general import exact_match, fuzzy_match, check_include_exclude
from .gimp import increase_saturation, decrease_brightness, check_file_exists, compare_triangle_positions
from .slides import check_presenter_console_disable, check_image_stretch_and_center, check_slide_numbers_color, compare_pptx_files, check_strikethrough, \
check_slide_orientation_Portrait, evaluate_presentation_fill_to_rgb_distance, check_left_panel
from .libreoffice import check_libre_locale
from .pdf import check_pdf_pages
#from .table import check_sheet_list, check_xlsx_freeze, check_xlsx_zoom, check_data_validations
from .slides import check_presenter_console_disable, check_image_stretch_and_center, check_slide_numbers_color, \
compare_pptx_files, check_strikethrough, \
check_slide_orientation_Portrait, evaluate_presentation_fill_to_rgb_distance, check_left_panel
# from .table import check_sheet_list, check_xlsx_freeze, check_xlsx_zoom, check_data_validations
from .table import compare_table
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter
# from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, compare_images, compare_audios, \
# compare_videos
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter
from .vscode import compare_text_file, compare_config, compare_answer, is_extension_installed, check_json_settings, check_json_keybindings
from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, compare_images, compare_audios, \
compare_videos, check_qt_bgcone, check_one_instance_when_started_from_file, check_qt_minimal_view, \
check_qt_max_volume, \
check_qt_slider_colours, check_global_key_play_pause
from .vscode import compare_text_file, compare_config, compare_answer, is_extension_installed, check_json_settings, \
check_json_keybindings
from .os import check_gnome_favorite_apps, is_utc_0, check_text_enlarged, check_moved_jpgs

View File

@@ -1,4 +1,4 @@
import logging
import logging, re
from typing import Any, Dict, List
import fitz # PyMuPDF
@@ -44,6 +44,15 @@ def is_expected_bookmarks(bookmarks: List[str], rule: Dict[str, Any]) -> float:
raise TypeError(f"{rule['type']} not support yet!")
def is_expected_search_query(active_tab_info: Dict[str, str], rules: Dict[str, Any]) -> float:
    """Score 1. when the active tab's URL matches the regex in rules['expect']['pattern'], else 0."""
    pattern = rules['expect']['pattern']
    return 1. if re.search(pattern, active_tab_info['url']) else 0.
def compare_pdfs(pdf1_path, pdf2_path):
"""
Compare two PDF files.
@@ -56,11 +65,14 @@ def compare_pdfs(pdf1_path, pdf2_path):
for page in pdf:
text += page.get_text()
return text.strip()
try:
text1 = extract_text_from_pdf(pdf1_path)
text2 = extract_text_from_pdf(pdf2_path)
text1 = extract_text_from_pdf(pdf1_path)
text2 = extract_text_from_pdf(pdf2_path)
return fuzz.ratio(text1, text2) / 100
return fuzz.ratio(text1, text2) / 100
except Exception as e:
logger.info(f"[ERROR]: unexpected error occurred when comparing PDF files: {e}")
return 0.0
def is_cookie_deleted(cookie_data, rule):
@@ -95,3 +107,45 @@ def is_shortcut_on_desktop(shortcuts: Dict[str, str], rule):
raise TypeError(f"{rule['type']} not support yet!")
else:
raise TypeError(f"{rule['type']} not support yet!")
def check_history_deleted(history_data, rule):
    """
    Check if the history is deleted.

    Scores 1. when no keyword from rule['keywords'] appears in any visited
    URL of history_data (rows of (url, title, last_visit_time)), else 0.
    """
    if rule['type'] != 'keywords':
        raise TypeError(f"{rule['type']} not support yet!")
    visited_urls = [entry[0] for entry in history_data]
    found = any(keyword in url for keyword in rule['keywords'] for url in visited_urls)
    return 0. if found else 1.
def check_enabled_experiments(enabled_experiments, rule):
    """
    Check if the enabled experiments are as expected.

    Experiment entries look like "name@1"; the "@N" suffix is stripped and the
    resulting name list must equal rule['names'] exactly (order-sensitive).
    """
    stripped_names = [entry.split("@")[0] for entry in enabled_experiments]
    if rule['type'] != 'names':
        raise TypeError(f"{rule['type']} not support yet!")
    return 1. if stripped_names == rule['names'] else 0.
def check_font_size(font_size, rule):
    """
    Check if the font size is as expected.

    rule['type'] == 'value': exact match against rule['value'].
    rule['type'] == 'range': strictly between rule['min'] and rule['max'].
    """
    actual = font_size['default_font_size']
    if rule['type'] == 'value':
        return 1. if actual == rule['value'] else 0.
    if rule['type'] == 'range':
        return 1. if rule['min'] < actual < rule['max'] else 0.
    raise TypeError(f"{rule['type']} not support yet!")

View File

@@ -1,8 +1,9 @@
import csv
import json
import functools
import json
import operator
import re
import sqlite3
from numbers import Number
from typing import Callable, Any, Union
from typing import Dict, List, Pattern
@@ -14,7 +15,13 @@ from rapidfuzz import fuzz
from .utils import _match_record, _match_value_to_rule
import sqlite3
def check_include_exclude(result: str, rules: Dict[str, List[str]]) -> float:
    """Score 1.0 when `result` contains every "include" string and none of the
    "exclude" strings (both lists optional, defaulting to empty), else 0.0.
    """
    # Fixed: removed leftover debug print(result, rules).
    include = rules.get("include", [])
    exclude = rules.get("exclude", [])
    ok = all(s in result for s in include) and all(s not in result for s in exclude)
    # Fixed: return a float as the annotation promises (was a bare bool).
    return 1. if ok else 0.
def exact_match(result, rules) -> float:
expect = rules["expected"]
@@ -31,6 +38,7 @@ def fuzzy_match(result, rules) -> float:
return fuzz.ratio(result, expect) / 100.
def check_csv(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
"""
Args:
@@ -135,10 +143,10 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
return 0.
if "text" in rules:
match_func: Callable[[str], Number] = functools.partial( operator.eq if rules["exact"]\
else (lambda a, b: fuzz.ratio(a, b)/100.)
, rules["text"]
)
match_func: Callable[[str], Number] = functools.partial(operator.eq if rules["exact"] \
else (lambda a, b: fuzz.ratio(a, b) / 100.)
, rules["text"]
)
match_score: Number = 0
for elm in elements:
match_score = max(match_score, match_func(elm.text or None))
@@ -147,6 +155,7 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
return float(match_score)
# def check_existence(result: str, *args) -> float:
# return 1. - (result is None)
@@ -155,6 +164,7 @@ def run_sqlite3(result: str, rules: Dict[str, Any]) -> float:
cursor: sqlite3.Cursor = connection.execute(rules["sql"])
return float(cursor.fetchone()[0] or 0)
def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str]]]]) -> float:
"""
Args:

View File

@@ -0,0 +1,59 @@
import subprocess
def check_gnome_favorite_apps(apps_str: str, rule):
    """Score 1 when the GNOME favourite apps equal rule["expected"]
    (order-insensitive, duplicates fail via the length check), else 0.

    `apps_str` is gsettings output like
    "['thunderbird.desktop', 'vim.desktop', 'google-chrome.desktop']".
    """
    import ast  # local import keeps this fix self-contained
    # Fixed: use ast.literal_eval instead of eval — the string comes from the
    # VM and eval would execute arbitrary expressions.
    apps = ast.literal_eval(apps_str)
    expected_apps = rule["expected"]
    if len(apps) != len(expected_apps):
        return 0
    return 1 if set(apps) == set(expected_apps) else 0
def is_utc_0(timedatectl_output):
    """Return 1 when `timedatectl` reports a UTC+0 time zone, else 0.

    Format as:
        Local time: Thu 2024-01-25 12:56:06 WET
        Universal time: Thu 2024-01-25 12:56:06 UTC
        RTC time: Thu 2024-01-25 12:56:05
        Time zone: Atlantic/Faroe (WET, +0000)
        System clock synchronized: yes
        NTP service: inactive
        RTC in local TZ: no
    """
    # Fixed: previously indexed split("\n")[3] — fragile if timedatectl adds
    # or drops a line. Search for the "Time zone" line instead.
    for line in timedatectl_output.split("\n"):
        if "Time zone" in line:
            return 1 if line.strip().endswith("+0000)") else 0
    return 0
def check_text_enlarged(scaling_factor_str):
    """Return 1 when the text scaling factor is above 1.0 (text enlarged), else 0."""
    return 1 if float(scaling_factor_str) > 1.0 else 0
def check_moved_jpgs(directory_list, rule):
    """Score 1 when the directory's children are exactly the expected file
    names (same count, same set — extras or duplicates fail), else 0."""
    expected = rule["expected"]
    actual = [child['name'] for child in directory_list['children']]
    if len(actual) != len(expected):
        return 0
    return 1 if set(actual) == set(expected) else 0

View File

@@ -1,7 +1,7 @@
import logging
#import operator
from numbers import Number
from typing import Any, Union, cast, Callable
from typing import Any, Union, cast, Callable, Iterable
from typing import Dict, List, Tuple
import os.path
import itertools
@@ -13,9 +13,11 @@ from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
#from openpyxl.worksheet.cell_range import MultiCellRange
from openpyxl.worksheet.datavalidation import DataValidation
from openpyxl.cell.cell import Cell
#from openpyxl.utils import coordinate_to_tuple
from .utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles
from .utils import _match_value_to_rule
from .utils import _match_value_to_rule, _read_cell_style, read_cell_value
logger = logging.getLogger("desktopenv.metric.table")
@@ -91,11 +93,11 @@ def compare_table(result: str, expected: str, **options) -> float:
return 0.
xlworkbookr: Workbook = openpyxl.load_workbook(filename=result)
pdworkbookr = pd.ExcelFile(xlworkbookr, engine="openpyxl")
pdworkbookr = pd.ExcelFile(result)
worksheetr_names: List[str] = pdworkbookr.sheet_names
xlworkbooke: Workbook = openpyxl.load_workbook(filename=expected)
pdworkbooke = pd.ExcelFile(xlworkbooke, engine="openpyxl")
pdworkbooke = pd.ExcelFile(expected)
worksheete_names: List[str] = pdworkbooke.sheet_names
parse_idx: Callable[[Union[str, int], BOOK, BOOK], BOOK] =\
@@ -165,7 +167,7 @@ def compare_table(result: str, expected: str, **options) -> float:
# Compare Style (Also Conditional Formatiing) {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# props: list of str indicating concerned styles
# props: list of str indicating concerned styles, see utils._read_cell_style
styles1: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
styles2: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
@@ -283,6 +285,31 @@ def compare_table(result: str, expected: str, **options) -> float:
logger.debug("Assertion: %s[cols] == %s[cols] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Check Row Properties #
elif r["type"] == "check_cell":
# Check Cell Properties {{{ #
# sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# coordinate: str, "E3"
# props: dict like {attribute: {"method": str, "ref": anything}}
# supported attributes: value & those supported by utils._read_cell_style
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
#data_frame: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx"], pdworkbookr, pdworkbooke))
cell: Cell = sheet[r["coordinate"]]
metric: bool = True
for prpt, rule in r["props"].items():
if prpt=="value":
val = read_cell_value(*parse_idx(r["sheet_idx"], result, expected), r["coordinate"])
else:
val = _read_cell_style(prpt, cell)
metric = metric and _match_value_to_rule(val, rule)
logger.debug( "Assertion: %s[%s] :%s - %s"
, r["sheet_idx"], r["coordinate"]
, repr(r["props"]), metric
)
# }}} Check Cell Properties #
else:
raise NotImplementedError("Unimplemented sheet check: {:}".format(r["type"]))
@@ -293,6 +320,24 @@ def compare_table(result: str, expected: str, **options) -> float:
return float(passes)
# }}} function compare_table #
def compare_csv(result: str, expected: str, **options) -> float:
    """Line-by-line comparison of two text files; 1.0 on equality, else 0.0.

    Options:
        strict (default True): when False, surrounding whitespace on each
            line is ignored.
        ignore_case (default False): when True, comparison is
            case-insensitive.
    Returns 0.0 immediately when `result` is None.
    """
    if result is None:
        return 0.

    def _read(path):
        with open(path) as fp:
            return fp.read().splitlines()

    left, right = _read(result), _read(expected)
    if not options.get("strict", True):
        left = [line.strip() for line in left]
        right = [line.strip() for line in right]
    if options.get("ignore_case", False):
        left = [line.lower() for line in left]
        right = [line.lower() for line in right]
    return float(left == right)
if __name__ == '__main__':
import datetime
import sys
@@ -326,16 +371,15 @@ if __name__ == '__main__':
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
path1 = "../../任务数据/LibreOffice Calc/Calendar_Highlight_Weekend_Days.xlsx"
path2 = "../../任务数据/LibreOffice Calc/Calendar_Highlight_Weekend_Days_gold.xlsx"
rules = [ { "type": "sheet_data"
, "sheet_idx0": 0
, "sheet_idx1": "EI0"
}
, { "type": "style"
, "sheet_idx0": 0
, "sheet_idx1": "EI0"
, "props": ["bgcolor"]
path1 = "../../任务数据/LibreOffice Calc/Multiply_Time_Number.xlsx"
path2 = "../../任务数据/LibreOffice Calc/Multiply_Time_Number_gold.xlsx"
rules = [ { "type": "check_cell"
, "sheet_idx": 0
, "coordinate": "E3"
, "props": { "value": { "method": "approx:0.001"
, "ref": 191.6667
}
}
}
]
print( compare_table( path1, path2

View File

@@ -34,6 +34,7 @@ _xlsx_namespaces = [ ("oo", "http://schemas.openxmlformats.org/spreadsheetml/200
]
_xlsx_ns_mapping = dict(_xlsx_namespaces)
_xlsx_ns_imapping = dict(map(lambda itm: (itm[1], itm[0]), _xlsx_namespaces))
_xlsx_ns_imapping["http://schemas.openxmlformats.org/spreadsheetml/2006/main"] = None
_sheet_name_selector = lxml.cssselect.CSSSelector("oo|sheets>oo|sheet", namespaces=_xlsx_ns_mapping)
_sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
def load_sparklines(xlsx_file: str, sheet_name: str) -> Dict[str, str]:
@@ -154,6 +155,48 @@ def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, An
return chart_set
# }}} function load_charts #
_shared_str_selector = lxml.cssselect.CSSSelector("oo|sst>oo|si>oo|t", namespaces=_xlsx_ns_mapping)
def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
    # read_cell_value {{{ #
    """Read the raw value of one cell (e.g. "E3") of `sheet_name` directly
    from the xlsx zip archive's XML, bypassing openpyxl.

    Returns a str for shared-string ("s") and inline-string ("str") cells,
    a float for numeric ("n") cells, and None when the cell is absent or its
    type attribute is missing/unrecognised.
    """
    with zipfile.ZipFile(xlsx_file, "r") as z_f:
        try:
            # Shared strings are stored once per workbook; "s"-typed cells
            # reference them by index.
            with z_f.open("xl/sharedStrings.xml") as f:
                shared_str_xml: _Element = lxml.etree.fromstring(f.read())
                str_elements: List[_Element] = _shared_str_selector(shared_str_xml)
                shared_strs: List[str] = [elm.text for elm in str_elements]
        except:
            # NOTE(review): on failure `shared_strs` stays undefined, so the
            # "s" branch below would raise NameError — confirm intended.
            logger.debug("Read shared strings error: %s", xlsx_file)
        # Map sheet name -> sheetId to locate the worksheet XML file.
        # NOTE(review): assumes sheetId matches the sheetN.xml file number —
        # verify for workbooks with deleted/reordered sheets.
        with z_f.open("xl/workbook.xml") as f:
            workbook_database: _Element = lxml.etree.fromstring(f.read())
            sheets: List[_Element] = _sheet_name_selector(workbook_database)
            sheet_names: Dict[str, str] = {sh.get("name"): sh.get("sheetId") for sh in sheets}
        with z_f.open("xl/worksheets/sheet{:}.xml".format(sheet_names[sheet_name])) as f:
            sheet: _Element = lxml.etree.fromstring(f.read())
            # Select the single <c> element whose r attribute is the coordinate.
            cells: List[_Element] =\
                    lxml.cssselect.CSSSelector( 'oo|row>oo|c[r="{:}"]'.format(coordinate)
                                              , namespaces=_xlsx_ns_mapping
                                              )(sheet)
            if len(cells)==0:
                return None
            cell: _Element = cells[0]
        # Convert the cell element to a plain dict for uniform access.
        cell: Dict[str, str] = xmltodict.parse( lxml.etree.tostring(cell, encoding="unicode")
                                              , process_namespaces=True
                                              , namespaces=_xlsx_ns_imapping
                                              )
        logger.debug("%s.%s[%s]: %s", xlsx_file, sheet_name, coordinate, repr(cell))
        # @t is the cell type: "s" shared string, "n" number, "str" formula string.
        if "@t" not in cell["c"]:
            return None
        if cell["c"]["@t"] == "s":
            return shared_strs[int(cell["c"]["v"])]
        if cell["c"]["@t"] == "n":
            return float(cell["c"]["v"])
        if cell["c"]["@t"] == "str":
            return cell["c"]["v"]
    # }}} read_cell_value #
# Supported Styles:
# number_format
# font_name - str
@@ -311,6 +354,15 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
, "ge", "gt"
}:
return getattr(operator, rule["method"])(value, rule["ref"])
if rule["method"].startswith("approx"):
threshold: float = float(rule["method"].split(":")[1])
logger.debug("Approx: TH%f, REF%f, VAL%s", threshold, rule["ref"], repr(value))
try:
value = float(value)
except (ValueError, TypeError):
return False
else:
return abs(value-rule["ref"])<=threshold
if rule["method"] == "spreadsheet_range":
subset_limit = MultiCellRange(rule["ref"][0])
superset_limit = MultiCellRange(rule["ref"][1])

View File

@@ -53,7 +53,7 @@ def is_vlc_recordings_folder(actual_config_path: str, rule: Dict[str, str]) -> f
expected_recording_file_path = rule['recording_file_path']
try:
for line in config_file:
for line in config_file.split("\n"):
# Skip comments and empty lines
if line.startswith('#') or not line.strip():
continue
@@ -216,3 +216,195 @@ def are_audio_files_similar(mp3_file_path, mp4_file_path):
return True
return False
def check_qt_bgcone(actual_config_path, rule):
    """Check VLC's qt-bgcone setting against rule['expected_qt_bgcone'].

    Returns 1 when the configured value (default "1", i.e. enabled, when the
    key is absent) equals the expected value, else 0; 0 on any read error.
    """
    expected_qt_bgcone = rule['expected_qt_bgcone']
    if isinstance(expected_qt_bgcone, int):
        expected_qt_bgcone = str(expected_qt_bgcone)
    try:
        # Fixed: the file was previously opened before the try, so the
        # FileNotFoundError handler below was dead code.
        with open(actual_config_path, 'rb') as file:
            config_file = file.read().decode('utf-8')
        # The default value of qt_bgcone is 1, which means it is enabled
        qt_bgcone = "1"
        for line in config_file.split("\n"):
            # Check if the line contains the recording path setting
            if 'qt-bgcone=' in line:
                # Extract the value of the recording path and remove surrounding whitespace
                qt_bgcone = line.split('=')[-1].strip()
        # The default remains if the configuration key was not found in the file
        if qt_bgcone == expected_qt_bgcone:
            return 1
        else:
            return 0
    except FileNotFoundError:
        logger.error("VLC configuration file not found.")
        return 0
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return 0
def check_qt_max_volume(actual_config_path, rule):
    """Check VLC's ``qt-max-volume`` setting against a rule.

    Args:
        actual_config_path: Path to the vlcrc file fetched from the VM.
        rule: Dict with key ``expected_qt_max_volume`` (int or str).

    Returns:
        1 if the effective value equals the expected one, 0 otherwise
        (also 0 when the config file is missing or unreadable).
    """
    expected_qt_max_volume = rule['expected_qt_max_volume']
    if isinstance(expected_qt_max_volume, int):
        expected_qt_max_volume = str(expected_qt_max_volume)
    try:
        # Read inside the try so the FileNotFoundError handler can actually
        # fire (the open used to precede the try, making the handler dead).
        with open(actual_config_path, 'rb') as file:
            config_file = file.read().decode('utf-8')
        # VLC's default maximum volume is 125 when the key is absent.
        qt_max_volume = "125"
        for line in config_file.split("\n"):
            # Skip commented-out defaults (vlcrc ships keys as "#key=value").
            if line.strip().startswith('#'):
                continue
            if 'qt-max-volume=' in line:
                qt_max_volume = line.split('=')[-1].strip()
        return 1 if qt_max_volume == expected_qt_max_volume else 0
    except FileNotFoundError:
        logger.error("VLC configuration file not found.")
        return 0
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return 0
def check_qt_minimal_view(actual_config_path, rule):
    """Check VLC's ``qt-minimal-view`` setting against a rule.

    Args:
        actual_config_path: Path to the vlcrc file fetched from the VM.
        rule: Dict with key ``expected_qt_minimal_view`` (int or str).

    Returns:
        1 if the effective value equals the expected one, 0 otherwise
        (also 0 when the config file is missing or unreadable).
    """
    expected_qt_minimal_view = rule['expected_qt_minimal_view']
    if isinstance(expected_qt_minimal_view, int):
        expected_qt_minimal_view = str(expected_qt_minimal_view)
    try:
        # Read inside the try so the FileNotFoundError handler can actually
        # fire (the open used to precede the try, making the handler dead).
        with open(actual_config_path, 'rb') as file:
            config_file = file.read().decode('utf-8')
        # Minimal view is disabled ("0") by default when the key is absent.
        qt_minimal_view = "0"
        for line in config_file.split("\n"):
            # Skip commented-out defaults (vlcrc ships keys as "#key=value").
            if line.strip().startswith('#'):
                continue
            if 'qt-minimal-view=' in line:
                qt_minimal_view = line.split('=')[-1].strip()
        return 1 if qt_minimal_view == expected_qt_minimal_view else 0
    except FileNotFoundError:
        logger.error("VLC configuration file not found.")
        return 0
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return 0
def check_qt_slider_colours(actual_config_path, rule):
    """Check VLC's ``qt-slider-colours`` setting against a rule.

    Args:
        actual_config_path: Path to the vlcrc file fetched from the VM.
        rule: Dict with key ``type``:
            - "match": compare the value to ``rule['expected_qt_slider_colours']``.
            - "blackish": require every RGB triple in the value to be dark
              (all channels below 100).

    Returns:
        1 on success, 0 otherwise (also 0 for an unknown rule type or when
        the config file is missing or unreadable).
    """
    try:
        # Read inside the try so the FileNotFoundError handler can actually
        # fire (the open used to precede the try, making the handler dead).
        with open(actual_config_path, 'rb') as file:
            config_file = file.read().decode('utf-8')
        # VLC's shipped default slider colour list.
        qt_slider_colours = "153;210;153;20;210;20;255;199;15;245;39;29"
        for line in config_file.split("\n"):
            # Skip commented-out defaults (vlcrc ships keys as "#key=value").
            if line.strip().startswith('#'):
                continue
            if 'qt-slider-colours' in line:
                qt_slider_colours = line.split('=')[-1].strip()
        if rule['type'] == 'match':
            expected_qt_slider_colours = rule['expected_qt_slider_colours']
            return 1 if qt_slider_colours == expected_qt_slider_colours else 0
        elif rule['type'] == 'blackish':
            def is_color_blackish(rgb_values, threshold=100):
                # A colour counts as blackish when every channel is below threshold.
                return all(value < threshold for value in rgb_values)

            def parse_qt_slider_colours(colours_string):
                # "r;g;b;r;g;b;..." -> [(r, g, b), ...]
                values = [int(x) for x in colours_string.split(';')]
                return list(zip(values[0::3], values[1::3], values[2::3]))

            colors = parse_qt_slider_colours(qt_slider_colours)
            # All triples must be blackish for the check to pass.
            return 1 if all(is_color_blackish(color) for color in colors) else 0
        # Unknown rule type: previously this fell through and implicitly
        # returned None; return 0 so the score is always an int.
        return 0
    except FileNotFoundError:
        logger.error("VLC configuration file not found.")
        return 0
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return 0
def check_global_key_play_pause(actual_config_path, rule):
    """Check whether a global Play/Pause hotkey is configured in VLC.

    In vlcrc the relevant entries look like::

        # Play/Pause (str)
        #global-key-play-pause=

        # Play/Pause (str)
        #key-play-pause=Space

    Args:
        actual_config_path: Path to the vlcrc file fetched from the VM.
        rule: Dict with key ``expected_global_key_play_pause`` (int or str);
            "1" means a global hotkey is bound, "0" means it is not.

    Returns:
        1 if the presence flag matches the expectation, 0 otherwise
        (also 0 when the config file is missing or unreadable).
    """
    expected_global_key_play_pause = rule['expected_global_key_play_pause']
    if isinstance(expected_global_key_play_pause, int):
        expected_global_key_play_pause = str(expected_global_key_play_pause)
    try:
        # Read inside the try so the FileNotFoundError handler can actually
        # fire (the open used to precede the try, making the handler dead).
        with open(actual_config_path, 'rb') as file:
            config_file = file.read().decode('utf-8')
        # "0" (no global hotkey) until an uncommented assignment proves otherwise.
        global_key_play_pause = "0"
        for line in config_file.split("\n"):
            # Skip commented-out defaults (vlcrc ships keys as "#key=value").
            if line.strip().startswith('#'):
                continue
            if 'global-key-play-pause=' in line:
                # A non-empty value means a hotkey is bound.
                global_key_play_pause = "0" if line.split('=')[-1].strip() == "" else "1"
        return 1 if global_key_play_pause == expected_global_key_play_pause else 0
    except FileNotFoundError:
        logger.error("VLC configuration file not found.")
        return 0
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return 0
def check_one_instance_when_started_from_file(actual_config_path, rule):
    """Check VLC's ``one-instance-when-started-from-file`` setting against a rule.

    Args:
        actual_config_path: Path to the vlcrc file fetched from the VM.
        rule: Dict with key ``expected_one_instance_when_started_from_file``
            (int or str).

    Returns:
        1 if the effective value equals the expected one, 0 otherwise
        (also 0 when the config file is missing or unreadable).
    """
    expected_one_instance = rule['expected_one_instance_when_started_from_file']
    if isinstance(expected_one_instance, int):
        expected_one_instance = str(expected_one_instance)
    try:
        # Read inside the try so the FileNotFoundError handler can actually
        # fire (the open used to precede the try, making the handler dead).
        with open(actual_config_path, 'rb') as file:
            config_file = file.read().decode('utf-8')
        # The option defaults to "1" (single instance) when the key is absent.
        one_instance_when_started_from_file = "1"
        for line in config_file.split("\n"):
            # Skip commented-out defaults (vlcrc ships keys as "#key=value").
            if line.strip().startswith('#'):
                continue
            if 'one-instance-when-started-from-file=' in line:
                one_instance_when_started_from_file = line.split('=')[-1].strip()
        return 1 if one_instance_when_started_from_file == expected_one_instance else 0
    except FileNotFoundError:
        logger.error("VLC configuration file not found.")
        return 0
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return 0