Merge branch 'main' into xiaochuanli/addChromeExtensions

This commit is contained in:
Tianbao Xie
2024-03-08 20:45:49 +08:00
committed by GitHub
109 changed files with 7196 additions and 172 deletions

View File

@@ -450,6 +450,8 @@ class SetupController:
query(str): query pattern string to search files or folder in google drive to delete, please refer to
https://developers.google.com/drive/api/guides/search-files?hl=en about how to write query string.
trash(bool): whether to delete files permanently or move to trash. By default, trash=false, completely delete it.
for mkdirs:
path(List[str]): the path in the google drive to create folder
for upload:
path(str): remote url to download file
dest(List[str]): the path in the google drive to store the downloaded file

View File

@@ -24,12 +24,13 @@ from .chrome import (
get_gotoRecreationPage_and_get_html_content,
get_url_dashPart,
get_active_url_from_accessTree,
get_find_installed_extension_name
get_find_installed_extension_name,
get_info_from_website
)
from .file import get_cloud_file, get_vm_file, get_cache_file
from .general import get_vm_command_line, get_vm_terminal_output
from .file import get_cloud_file, get_vm_file, get_cache_file, get_content_from_vm_file
from .general import get_vm_command_line, get_vm_terminal_output, get_vm_command_error
from .gimp import get_gimp_config_file
from .impress import get_audio_in_slide
from .impress import get_audio_in_slide, get_background_image_in_slide
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper, get_list_directory
from .misc import get_rule, get_accessibility_tree, get_rule_relativeTime, get_time_diff_range
from .replay import get_replay

View File

@@ -12,7 +12,7 @@ import lxml.etree
import requests
from lxml.cssselect import CSSSelector
from lxml.etree import _Element
from playwright.sync_api import sync_playwright
from playwright.sync_api import sync_playwright, expect
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
@@ -36,6 +36,89 @@ WARNING:
"""
def get_info_from_website(env, config: Dict[Any, Any]) -> Any:
    """ Get information from a website. Especially useful when the information may be updated through time.

    Args:
        env (Any): The environment object; only `env.vm_ip` is read here.
        config (Dict[Any, Any]): The configuration dictionary.
            - url (str): The URL of the website to visit
            - infos (List[Dict[str, str]]): The list of information to be extracted from the website.
              Each dictionary contains:
                - action (str): one of (defaults to 'inner_text')
                    - inner_text: extract the inner text of the element specified by the selector
                    - attribute: extract the attribute of the element specified by the selector
                    - click_and_inner_text: click elements following the selectors and then extract the
                      inner text of the last element
                    - click_and_attribute: click elements following the selectors and then extract the
                      attribute of the last element
                - selector (Union[str, List[str]]): The CSS selector(s) of the element(s). A single string
                  for 'inner_text'/'attribute', a list for the 'click_and_*' actions.
                - attribute (str): required for 'attribute' and 'click_and_attribute', the attribute to
                  be extracted.
            - backups (Any): The backup information to be returned if the extraction fails.

    Returns:
        List of extracted values (one per entry in `infos`), or `config['backups']` (None if absent)
        when anything goes wrong, including an unsupported action.
    """
    try:
        host = env.vm_ip
        port = 9222  # fixme: this port is hard-coded, need to be changed from config file
        remote_debugging_url = f"http://{host}:{port}"
        with sync_playwright() as p:
            # connect to remote Chrome instance
            try:
                browser = p.chromium.connect_over_cdp(remote_debugging_url)
            except Exception:
                # If the connection fails (e.g., the agent close the browser instance), start a new browser instance
                # NOTE(review): the browser is launched with port 1337 while we connect to 9222 --
                # presumably the VM forwards one to the other; confirm.
                app = 'chromium' if 'arm' in platform.machine() else 'google-chrome'
                payload = json.dumps({"command": [
                    app,
                    "--remote-debugging-port=1337"
                ], "shell": False})
                headers = {"Content-Type": "application/json"}
                requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
                time.sleep(5)
                browser = p.chromium.connect_over_cdp(remote_debugging_url)

            page = browser.contexts[0].new_page()
            page.goto(config["url"])
            page.wait_for_load_state('load')
            infos = []
            for info_dict in config.get('infos', []):
                # A previous click sequence may have navigated away; start each entry from the base URL.
                if page.url != config["url"]:
                    page.goto(config["url"])
                    page.wait_for_load_state('load')
                action = info_dict.get('action', 'inner_text')
                if action in ('inner_text', 'attribute'):
                    # Single-selector actions are a click chain of length one.
                    selectors = [info_dict['selector']]
                elif action in ('click_and_inner_text', 'click_and_attribute'):
                    selectors = info_dict['selector']
                else:
                    raise NotImplementedError(f'The action {action} is not supported yet.')

                wants_text = action.endswith('inner_text')
                last_idx = len(selectors) - 1
                for idx, sel in enumerate(selectors):
                    ele = page.locator(sel)
                    expect(ele).to_be_visible()
                    if idx != last_idx:
                        # Intermediate selectors are links to follow.
                        ele.click()
                        page.wait_for_load_state('load')
                    elif wants_text:
                        infos.append(ele.inner_text())
                    else:
                        infos.append(ele.get_attribute(info_dict['attribute']))
        return infos
    except Exception:
        # Best-effort getter: fall back to the backup values, but keep the traceback in the log
        # and never fail inside the handler itself when "url" is missing from config.
        logger.error('[ERROR]: failed to obtain information from the website: %s. Use backup results instead.',
                     config.get("url"), exc_info=True)
        return config.get('backups', None)
# The following ones just need to load info from the files of software, no need to connect to the software
def get_default_search_engine(env, config: Dict[str, str]):
os_type = env.vm_platform
@@ -507,6 +590,10 @@ def get_active_url_from_accessTree(env, config):
if len(elements) == 0:
print("no elements found")
return None
elif elements[-1].text is None:
print("no text found")
return None
active_tab_url = config["goto_prefix"] + elements[0].text if "goto_prefix" in config.keys() else "https://" + \
elements[0].text
print("active tab url now: {}".format(active_tab_url))
@@ -722,15 +809,20 @@ def get_number_of_search_results(env, config: Dict[str, str]):
def get_googledrive_file(env, config: Dict[str, Any]) -> str:
""" Get the desired file from Google Drive based on config, return the downloaded local filepath.
To retrieve the file, we provide two options in config dict:
1. query: a list of queries to search the file, each query is a string that follows the format of Google Drive search query
2. path: a list of path to the file, 'folder/subfolder/filename' -> ['folder', 'subfolder', 'filename']
3. query_list: query extends to list to download multiple files
4. path_list: path extends to list to download multiple files
dest: target file name or list. If *_list is used, dest should also be a list of the same length.
Return the downloaded filepath locally.
@args: keys in config dict
settings_file(str): target filepath to the settings file for Google Drive authentication, default is 'evaluation_examples/settings/googledrive/settings.yml'
query/path[_list](Union[str, List[str]]): the query or path [list] to the file(s) on Google Drive. To retrieve the file, we provide multiple key options to specify the filepath on drive in config dict:
1) query: a list of queries to search the file, each query is a string that follows the format of Google Drive search query. The documentation is available here: (support more complex search but too complicated to use)
https://developers.google.com/drive/api/guides/search-files?hl=en
2) path: a str list pointing to file path on googledrive, e.g., 'folder/subfolder/filename.txt' ->
config contain one key-value pair "path": ['folder', 'subfolder', 'filename.txt']
3) query_list: query extends to list to download multiple files
4) path_list: path extends to list to download multiple files, e.g.,
"path_list": [['folder', 'subfolder', 'filename1.txt'], ['folder', 'subfolder', 'filename2.txt']]
@return:
dest(Union[List[str], str]): target file name or list. If *_list is used in input config, dest should also be a list of the same length. Return the downloaded local filepath.
"""
settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.json')
settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.yml')
auth = GoogleAuth(settings_file=settings_file)
drive = GoogleDrive(auth)

View File

@@ -3,6 +3,26 @@ from typing import Dict, List, Set
from typing import Optional, Any, Union
from datetime import datetime
import requests
import pandas as pd
def get_content_from_vm_file(env, config: Dict[str, Any]) -> Any:
    """
    Fetch a file from the VM and return a selected part of its content.

    Config:
        path (str): absolute path on the VM to fetch
        file_type (str): type of the fetched file; only 'xlsx' is handled
        file_content (str): which part of the file to return; only 'last_row' is handled

    Returns:
        For 'xlsx' + 'last_row': the last row of the sheet read by pandas, every cell converted to str.
        For 'xlsx' with any other `file_content`: None.

    Raises:
        NotImplementedError: if `file_type` is not 'xlsx'.
    """
    vm_path = config["path"]
    local_path = get_vm_file(env, {"path": vm_path, "dest": os.path.basename(vm_path)})
    file_type = config['file_type']
    file_content = config['file_content']

    if file_type != 'xlsx':
        raise NotImplementedError(f"File type {file_type} not supported")

    if file_content == 'last_row':
        return pd.read_excel(local_path).iloc[-1].astype(str).tolist()
    return None
def get_cloud_file(env, config: Dict[str, Any]) -> Union[str, List[str]]:

View File

@@ -21,6 +21,22 @@ def get_vm_command_line(env, config: Dict[str, str]):
logger.error("Failed to get vm command line. Status code: %d", response.status_code)
return None
def get_vm_command_error(env, config: Dict[str, str]):
    """
    Run a command on the VM and return the "error" field of the execution result.

    Config:
        command: the command forwarded as-is to the VM's `/execute` endpoint
        shell (optional): forwarded as the `shell` flag, defaults to False

    Returns:
        The value stored under "error" in the JSON reply when the request succeeds
        (HTTP 200), otherwise None (the failure is logged).
    """
    vm_ip = env.vm_ip
    port = 5000
    command = config["command"]
    shell = config.get("shell", False)

    response = requests.post(f"http://{vm_ip}:{port}/execute", json={"command": command, "shell": shell})

    # Check the status before touching the body: an error reply is not guaranteed to be JSON,
    # and decoding it first would raise instead of reaching the error log below.
    if response.status_code != 200:
        logger.error("Failed to get vm command line error. Status code: %d", response.status_code)
        return None

    payload = response.json()
    print(payload)
    return payload["error"]
def get_vm_terminal_output(env, config: Dict[str, str]):
return env.controller.get_terminal_output()

View File

@@ -7,6 +7,67 @@ from typing import Dict
from desktop_env.evaluators.getters.file import get_vm_file
def get_background_image_in_slide(env, config: Dict[str, str]):
    """
    Locate the background image of one slide of a presentation stored on the VM.

    Config:
        ppt_file_path: path of the presentation on the VM
        slide_index: zero-based slide index (converted with int())
        dest: file name used when the image has to be fetched from the VM

    Returns:
        Local path of the background image, or None when the slide does not exist or
        no background image / matching relationship is found. An image embedded in the
        archive is extracted next to the downloaded presentation; an external
        ``file://`` image is fetched from the VM via get_vm_file.
    """
    ppt_file_path, slide_index, dest = config["ppt_file_path"], int(config["slide_index"]), config["dest"]
    image_id, image_file_path = None, None

    ppt_file_localhost_path = get_vm_file(env, {"path": ppt_file_path, "dest": os.path.split(ppt_file_path)[-1]})

    with zipfile.ZipFile(ppt_file_localhost_path, 'r') as myzip:
        slide1_xml_file = 'ppt/slides/slide{}.xml'.format(slide_index + 1)
        # firstly, check whether the background image is used in the slide
        if slide1_xml_file not in myzip.namelist():
            return None
        with myzip.open(slide1_xml_file) as f:
            # Parse the XML tree of the slide itself
            root = ET.parse(f).getroot()
        bg_tag = "{http://schemas.openxmlformats.org/presentationml/2006/main}bgPr"
        image_tag = "{http://schemas.openxmlformats.org/drawingml/2006/main}blip"
        attr_tag = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
        for child in root.iter(bg_tag):
            try:
                for element in child.iter(image_tag):
                    image_id = element.attrib[attr_tag]
                    break
            except KeyError:
                # a blip without an embed relationship id: try the next background element
                pass
            if image_id is not None:
                break
        if image_id is None:
            # no background element references an embedded image
            return None

        # next, extract the background image from the slide
        slide1_rels_file = 'ppt/slides/_rels/slide{}.xml.rels'.format(slide_index + 1)
        if slide1_rels_file in myzip.namelist():
            with myzip.open(slide1_rels_file) as f:
                # Parse the XML tree from the relationships file
                root = ET.parse(f).getroot()
            # Define the namespace used in the relationships file
            namespaces = {'r': 'http://schemas.openxmlformats.org/package/2006/relationships'}
            # Look for all relationship elements that have a type attribute for image
            for rel in root.findall('r:Relationship', namespaces):
                # Check if the relationship is for an image file
                if 'image' in rel.attrib['Type'] and rel.attrib['Id'] == image_id:
                    target = rel.attrib['Target']
                    if target.startswith('..'):
                        # Resolve the relative path to get the correct path within the zip file
                        image_file_path = os.path.normpath(os.path.join('ppt/slides', target))
                        # Replace backslashes with forward slashes for ZIP compatibility
                        image_file_path = image_file_path.replace('\\', '/')
                        tmpdirname = os.path.dirname(ppt_file_localhost_path)
                        myzip.extract(image_file_path, tmpdirname)
                        image_file_path = os.path.join(tmpdirname, image_file_path)
                        return image_file_path
                    else:  # absolute path
                        assert target.startswith("file://"), target
                        image_file_path = target[7:]  # strip the "file://" prefix
                    break

    if image_file_path is None:
        return None
    else:
        # Get the image file from vm and return the file path in the host
        return get_vm_file(env, {"path": image_file_path, "dest": dest})
def get_audio_in_slide(env, config: Dict[str, str]):
ppt_file_path, slide_index, dest = config["ppt_file_path"], int(config["slide_index"]), config["dest"]
@@ -40,20 +101,23 @@ def get_audio_in_slide(env, config: Dict[str, str]):
audio_file_path = audio_file_path.replace('\\', '/')
# Create a temporary directory to extract the audio file
with tempfile.TemporaryDirectory() as tmpdirname:
# Extract the audio file
myzip.extract(audio_file_path, tmpdirname)
# Get the full path of the extracted audio file
extracted_audio_path = os.path.join(tmpdirname, audio_file_path)
# Return the extracted audio file path
audio_file_path = extracted_audio_path
tmpdirname = os.path.dirname(ppt_file_localhost_path)
myzip.extract(audio_file_path, tmpdirname)
audio_file_path = os.path.join(tmpdirname, audio_file_path)
return audio_file_path
# with tempfile.TemporaryDirectory() as tmpdirname:
# # Extract the audio file
# myzip.extract(audio_file_path, tmpdirname)
# # Get the full path of the extracted audio file
# extracted_audio_path = os.path.join(tmpdirname, audio_file_path)
# # Return the extracted audio file path
# audio_file_path = extracted_audio_path
else:
# the audio file is external to the .pptx file
# Return the audio file path
assert target.startswith("file://"), target
audio_file_path = target[7:]
break
if audio_file_path is None:
return None

View File

@@ -1,5 +1,5 @@
import logging
from typing import TypeVar
from typing import TypeVar, Dict
from datetime import datetime, timedelta
logger = logging.getLogger("desktopenv.getters.misc")
@@ -74,13 +74,13 @@ relativeTime_to_IntDay = {
"first monday four months later": "special"
}
def get_rule(env, config: R) -> R:
def get_rule(env, config: Dict[str, R]) -> R:
"""
Returns the rule as-is.
"""
return config["rules"]
def get_rule_relativeTime(env, config: R) -> R:
def get_rule_relativeTime(env, config: Dict[str, R]) -> R:
"""
According to the rule defined in function "apply_rules_to_timeFormat", convert the relative time to absolute time.
config:

View File

@@ -21,6 +21,7 @@ from .chrome import (
is_expected_url_pattern_match,
is_added_to_steam_cart,
is_expected_installed_extensions
compare_pdf_images
)
from .docs import (
compare_font_names,
@@ -49,6 +50,9 @@ from .docs import (
check_tabstops,
compare_contains_image,
compare_docx_files_and_ignore_new_lines
compare_docx_images,
compare_image_text,
compare_references
)
from .general import (
check_csv,
@@ -69,12 +73,14 @@ from .general import (
compare_terminal_and_txt,
fuzzy_place_math,
compare_python_pure_text
diff_text_file,
literal_match
)
from .gimp import (
check_brightness_decrease_and_structure_sim,
check_contrast_increase_and_structure_sim,
check_saturation_increase_and_structure_sim,
check_image_size_and_structure_sim,
check_image_size,
check_image_mirror,
check_palette_and_structure_sim,
check_textbox_on_leftside,
@@ -87,7 +93,9 @@ from .gimp import (
increase_saturation,
decrease_brightness,
check_file_exists,
compare_triangle_positions
compare_triangle_positions,
check_sharper,
check_image_file_size
)
from .libreoffice import check_libre_locale
from .pdf import check_pdf_pages
@@ -131,11 +139,17 @@ from .vscode import (
compare_text_file,
compare_config,
compare_answer,
compare_result_files,
is_extension_installed,
check_json_settings,
check_json_keybindings
check_json_keybindings,
check_python_file_by_test_suite,
check_python_file_by_gold_file,
check_html_background_image,
compare_zip_files
)
from .calc import compare_conference_city_in_order
from .others import compare_epub, check_mp3_meta
def infeasible():
pass

View File

@@ -2,9 +2,9 @@ import logging
import os
import re
import shutil
from itertools import product
from typing import Any, Dict, List, Union
import fitz # PyMuPDF
import rapidfuzz.fuzz as fuzz
from bs4 import BeautifulSoup, Tag
@@ -97,6 +97,29 @@ def is_expected_bookmarks(bookmarks: List[str], rule: Dict[str, Any]) -> float:
bookmark_bar_websites_urls = [bookmark['url'] for bookmark in bookmarks['bookmark_bar']['children'] if
bookmark['type'] == 'url']
return 1. if set(bookmark_bar_websites_urls) == set(rule['urls']) else 0.
elif rule['type'] == "liked_authors_websites_urls":
# Check if "liked authors" folder exists
liked_authors_folder = next((bookmark for bookmark in bookmarks['bookmark_bar']['children'] if
bookmark['type'] == 'folder' and bookmark['name'] == 'Liked Authors'), None)
if liked_authors_folder:
# Check if it contains the specified URLs
liked_authors_urls = [bookmark['url'] for bookmark in liked_authors_folder['children'] if
bookmark['type'] == 'url']
urls = rule['urls']
for idx, url in enumerate(urls):
if isinstance(url, str):
urls[idx] = [url]
combinations = product(*urls)
for combination in combinations:
if set(combination) == set(liked_authors_urls):
return 1.
return 0.
else:
return 0.
else:
raise TypeError(f"{rule['type']} not support yet!")
@@ -136,6 +159,53 @@ def compare_pdfs(pdf1_path: Union[str, List[str]], pdf2_path: Union[str, List[st
return score / len(pdf2_path)
import fitz
from PIL import Image
from borb.pdf import Document
from borb.pdf import PDF
from pathlib import Path
import typing
def compare_pdf_images(pdf1_path: str, pdf2_path: str, **kwargs) -> float:
    """
    Compare two PDFs page by page on their rendered pixels.

    Both files are first loaded and re-written IN PLACE with borb (presumably to
    normalise/repair them before rendering -- confirm), then every page is rendered
    with PyMuPDF and the raw RGB bytes are compared.

    Args:
        pdf1_path: path to the first PDF (overwritten by the re-written version)
        pdf2_path: path to the second PDF (overwritten by the re-written version)
        **kwargs: accepted for interface compatibility, unused

    Returns:
        1. if both PDFs have the same number of pages and every rendered page is
        byte-identical, otherwise 0.
    """

    def extract_images_from_pdf(pdf_path):
        # Render each page to an RGB image; the document is closed once rendering is done.
        images = []
        with fitz.open(pdf_path) as pdf_document:
            for page_number in range(pdf_document.page_count):
                pixmap = pdf_document[page_number].get_pixmap()
                img = Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)
                images.append(img)
        return images

    def fix_pdf(in_path: Path, out_path: Path) -> None:
        # Round-trip the file through borb: load it, then dump it back out.
        with open(in_path, "rb") as fh:
            doc = PDF.loads(fh)
        with open(out_path, "wb") as fh:
            PDF.dumps(fh, doc)

    fix_pdf(Path(pdf1_path), Path(pdf1_path))
    fix_pdf(Path(pdf2_path), Path(pdf2_path))

    images1 = extract_images_from_pdf(pdf1_path)
    images2 = extract_images_from_pdf(pdf2_path)

    if len(images1) != len(images2):
        return 0.

    for img1, img2 in zip(images1, images2):
        if img1.tobytes() != img2.tobytes():
            return 0.

    return 1.
def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
"""
Compare two archives. Note that the files in the archives should be of the same type.

View File

@@ -11,6 +11,7 @@ from docx.shared import RGBColor
from odf.opendocument import load
from odf.text import P
from odf.text import Span
from rapidfuzz import fuzz
from skimage.color import deltaE_ciede2000
from skimage.color import rgb2lab
@@ -57,6 +58,10 @@ def contains_page_break(docx_file):
def compare_docx_files(file1, file2, **options):
ignore_blanks = options.get('ignore_blanks', True)
ignore_case = options.get('ignore_case', False)
ignore_order = options.get('ignore_order', False)
content_only = options.get('content_only', False)
def get_paragraph_texts_odt(document):
paragraphs = document.getElementsByType(P)
paragraph_texts = []
@@ -79,20 +84,37 @@ def compare_docx_files(file1, file2, **options):
doc2 = Document(file2)
doc1_paragraphs = [p.text for p in doc1.paragraphs]
doc2_paragraphs = [p.text for p in doc2.paragraphs]
if ignore_order:
doc1_paragraphs = sorted(doc1_paragraphs)
doc2_paragraphs = sorted(doc2_paragraphs)
elif file1.endswith('.odt') and file2.endswith('.odt'):
doc1 = load(file1)
doc2 = load(file2)
doc1_paragraphs = get_paragraph_texts_odt(doc1)
doc2_paragraphs = get_paragraph_texts_odt(doc2)
if ignore_order:
doc1_paragraphs = sorted(doc1_paragraphs)
doc2_paragraphs = sorted(doc2_paragraphs)
else:
# Unsupported file types or mismatch
print("Unsupported file types or mismatch between file types.")
return 0
if content_only:
# Compare the content of the documents
text1 = re.sub(r'\s+', ' ', '\n'.join(doc1_paragraphs)).strip()
text2 = re.sub(r'\s+', ' ', '\n'.join(doc2_paragraphs)).strip()
if ignore_case:
text1, text2 = text1.lower(), text2.lower()
similarity = fuzz.ratio(text1, text2) / 100.0
return similarity
# Process and compare documents
if ignore_blanks:
text1 = re.sub(r'\s+', ' ', '\n'.join(doc1_paragraphs)).strip()
text2 = re.sub(r'\s+', ' ', '\n'.join(doc2_paragraphs)).strip()
if ignore_case:
text1, text2 = text1.lower(), text2.lower()
if text1 != text2:
return 0
else:
@@ -106,6 +128,8 @@ def compare_docx_files(file1, file2, **options):
print("in compare")
# Compare each paragraph
for p1, p2 in zip(doc1_paragraphs, doc2_paragraphs):
if ignore_case:
p1, p2 = p1.lower(), p2.lower()
if p1 != p2:
print(p1)
print(p2)
@@ -157,6 +181,44 @@ def compare_docx_tables(docx_file1, docx_file2):
return 1
from io import BytesIO
from PIL import Image
def compare_docx_images(docx_file1, docx_file2):
    """
    Check that two .docx files carry the same images, in the same relationship order.

    Returns 1 when both documents have the same number of image relationships and
    each pair of images decodes to identical pixel bytes, otherwise 0.
    """

    def _image_streams(document):
        # One in-memory stream per relationship whose type mentions "image".
        return [
            BytesIO(relationship.target_part.blob)
            for relationship in document.part.rels.values()
            if "image" in relationship.reltype
        ]

    first_doc = Document(docx_file1)
    second_doc = Document(docx_file2)

    first_streams = _image_streams(first_doc)
    second_streams = _image_streams(second_doc)

    if len(first_streams) != len(second_streams):
        return 0

    mismatch = any(
        Image.open(left).tobytes() != Image.open(right).tobytes()
        for left, right in zip(first_streams, second_streams)
    )
    return 0 if mismatch else 1
import pytesseract
def compare_image_text(image_path, rule):
    """
    OCR an image with pytesseract and check the recognised text against a rule.

    Args:
        image_path: path of the image to read
        rule: dict with "type" (only 'text' is supported) and "text", the substring
            that must appear in the recognised text

    Returns:
        1 if rule['text'] occurs in the OCR output, else 0.

    Raises:
        ValueError: if rule['type'] is not 'text'.
    """
    recognised_text = pytesseract.image_to_string(Image.open(image_path))
    if rule['type'] != 'text':
        raise ValueError("Unsupported rule type")
    return 1 if rule['text'] in recognised_text else 0
def compare_line_spacing(docx_file1, docx_file2):
if not compare_docx_files(docx_file1, docx_file2):
return 0
@@ -263,7 +325,7 @@ def check_tabstops(docx_file1, docx_file2, **kwargs) -> float:
section = doc2.sections[0]
paragraph_width = section.page_width - section.left_margin - section.right_margin
ignore_tabs = lambda x: x.alignment == WD_TAB_ALIGNMENT.CLEAR or (
x.alignment == WD_TAB_ALIGNMENT.LEFT and x.position == 0)
x.alignment == WD_TAB_ALIGNMENT.LEFT and x.position == 0)
minus = .0
for p1, p2 in zip(para1, para2):
# filter CLEAR tabstop and default left-0 tabstop
@@ -566,3 +628,95 @@ def compare_highlighted_text(file1, file2):
return 1
else:
return 0
def compare_references(file1, file2, **options):
    """
    Fuzzy-compare the reference sections of two .docx documents.

    The reference section is every non-blank paragraph after the first paragraph
    whose text equals `reference_indicator`.

    Args:
        file1: path to the first .docx file
        file2: path to the second .docx file
        **options:
            reference_indicator (str): heading paragraph that starts the section,
                default 'References'
            reference_base_result (float): average similarity below which the score
                is 0, default 0.5

    Returns:
        0 for unsupported/mismatched file types, when only one document has a
        reference section, or when the number of entries differs;
        1 when neither document has a reference section (or both sections are empty);
        otherwise the average pairwise fuzz.ratio similarity rescaled from
        [reference_base_result, 1] to [0, 1] (0 below the base).
    """
    reference_indicator = options.get('reference_indicator', 'References')
    reference_base_result = options.get('reference_base_result', 0.5)

    # Determine file types and load documents
    if file1.endswith('.docx') and file2.endswith('.docx'):
        doc1 = Document(file1)
        doc2 = Document(file2)
        doc1_paragraphs = [p.text for p in doc1.paragraphs]
        doc2_paragraphs = [p.text for p in doc2.paragraphs]
    else:
        # Unsupported file types or mismatch
        print("Unsupported file types or mismatch between file types.")
        return 0

    # Find the references section: index of the first paragraph equal to reference_indicator
    ref1_idx = doc1_paragraphs.index(reference_indicator) if reference_indicator in doc1_paragraphs else -1
    ref2_idx = doc2_paragraphs.index(reference_indicator) if reference_indicator in doc2_paragraphs else -1

    if ref1_idx == -1 and ref2_idx == -1:
        return 1

    if ref1_idx == -1 or ref2_idx == -1:
        return 0

    # split the reference section into reference items, and remove the empty string items
    ref1 = [p for p in doc1_paragraphs[ref1_idx + 1:] if p.strip()]
    ref2 = [p for p in doc2_paragraphs[ref2_idx + 1:] if p.strip()]

    # Compare the references
    if len(ref1) != len(ref2):
        return 0

    if not ref1:
        # Both sections exist but are empty: identical, and averaging would divide by zero.
        return 1

    # fuzzy match the references pairwise and average
    total_similarity = sum(fuzz.ratio(r1, r2) / 100.0 for r1, r2 in zip(ref1, ref2))
    result = total_similarity / len(ref1)

    if result < reference_base_result:
        return 0
    if reference_base_result >= 1:
        # Rescaling would divide by zero; reaching the base already means a perfect match.
        return 1
    return (result - reference_base_result) / (1 - reference_base_result)
def compare_answer(file1, file2, **options):
    """
    Fuzzy-compare the section following an indicator paragraph in two .docx documents.

    NOTE(review): the body mirrors compare_references; confirm whether distinct
    logic was intended for this function.

    Args:
        file1: path to the first .docx file
        file2: path to the second .docx file
        **options:
            reference_indicator (str): paragraph text that starts the compared
                section, default 'References'
            reference_base_result (float): average similarity below which the score
                is 0, default 0.5

    Returns:
        0 for unsupported/mismatched file types, when only one document contains the
        indicator, or when the number of entries differs;
        1 when neither document contains the indicator (or both sections are empty);
        otherwise the average pairwise fuzz.ratio similarity rescaled from
        [reference_base_result, 1] to [0, 1] (0 below the base).
    """
    # These two names were used below without ever being defined (NameError at runtime);
    # read them from options with the same defaults as compare_references.
    reference_indicator = options.get('reference_indicator', 'References')
    reference_base_result = options.get('reference_base_result', 0.5)

    # Determine file types and load documents
    if file1.endswith('.docx') and file2.endswith('.docx'):
        doc1 = Document(file1)
        doc2 = Document(file2)
        doc1_paragraphs = [p.text for p in doc1.paragraphs]
        doc2_paragraphs = [p.text for p in doc2.paragraphs]
    else:
        # Unsupported file types or mismatch
        print("Unsupported file types or mismatch between file types.")
        return 0

    # Find the section start: index of the first paragraph equal to reference_indicator
    ref1_idx = doc1_paragraphs.index(reference_indicator) if reference_indicator in doc1_paragraphs else -1
    ref2_idx = doc2_paragraphs.index(reference_indicator) if reference_indicator in doc2_paragraphs else -1

    if ref1_idx == -1 and ref2_idx == -1:
        return 1

    if ref1_idx == -1 or ref2_idx == -1:
        return 0

    # split the section into items, and remove the empty string items
    ref1 = [p for p in doc1_paragraphs[ref1_idx + 1:] if p.strip()]
    ref2 = [p for p in doc2_paragraphs[ref2_idx + 1:] if p.strip()]

    # Compare the items
    if len(ref1) != len(ref2):
        return 0

    if not ref1:
        # Both sections exist but are empty: identical, and averaging would divide by zero.
        return 1

    # fuzzy match the items pairwise and average
    total_similarity = sum(fuzz.ratio(r1, r2) / 100.0 for r1, r2 in zip(ref1, ref2))
    result = total_similarity / len(ref1)

    if result < reference_base_result:
        return 0
    if reference_base_result >= 1:
        # Rescaling would divide by zero; reaching the base already means a perfect match.
        return 1
    return (result - reference_base_result) / (1 - reference_base_result)

View File

@@ -1,6 +1,7 @@
import csv
import functools
import json
import yaml
import operator
import re
import pdfplumber
@@ -15,9 +16,13 @@ from lxml.cssselect import CSSSelector
from lxml.etree import _Element
from rapidfuzz import fuzz
from docx import Document
import difflib
from .utils import _match_record, _match_value_to_rule
import logging
logger = logging.getLogger("desktopenv.metric.general")
def check_include_exclude(result: str, rules: Dict[str, List[str]]) -> float:
if result is None:
@@ -41,6 +46,24 @@ def exact_match(result, rules) -> float:
else:
return 0.
def literal_match(result: Any, expected: Any, **options) -> float:
    """
    Compare two literals after converting them (or their items) to strings.

    Args:
        result: the obtained value
        expected: the expected value
        **options:
            type (str): 'str' (default) compares str(result) with str(expected);
                'list' compares two lists/tuples item by item, in order
            ignore_case (bool): lower-case both sides before comparing, default False

    Returns:
        1.0 on a match, 0.0 otherwise. For type 'list', 0.0 is also returned when
        either side is not a list/tuple or the lengths differ.

    Raises:
        NotImplementedError: if `type` is neither 'str' nor 'list'.
    """
    literal_type = options.get('type', 'str')
    ignore_case = options.get('ignore_case', False)

    def normalise(value: Any) -> str:
        text = str(value)
        return text.lower() if ignore_case else text

    if literal_type == 'str':
        return float(normalise(result) == normalise(expected))

    if literal_type == 'list':
        if not isinstance(result, (list, tuple)) or not isinstance(expected, (list, tuple)) \
                or len(result) != len(expected):
            return .0
        return float([normalise(item) for item in result] == [normalise(item) for item in expected])

    # Report the requested literal type (the builtin `type` was interpolated here before).
    raise NotImplementedError(f"Type {literal_type} not supported")
def is_in_list(result, rules) -> float:
expect = rules["expected"]
if expect in result:
@@ -48,6 +71,15 @@ def is_in_list(result, rules) -> float:
else:
return 0.
def diff_text_file(result: str, expect: str) -> float:
    """
    Line-based similarity of two text files.

    Args:
        result: path to the produced file, or None
        expect: path to the expected file

    Returns:
        0. if `result` is None, otherwise the difflib.SequenceMatcher ratio computed
        over the two files' lists of lines.
    """
    if result is None:
        return 0.

    def _lines_of(path: str) -> List[str]:
        with open(path) as handle:
            return handle.read().splitlines()

    produced: List[str] = _lines_of(result)
    reference: List[str] = _lines_of(expect)
    return difflib.SequenceMatcher(a=produced, b=reference).ratio()
def fuzzy_match(result, rules) -> float:
expect = rules["expected"]
@@ -62,7 +94,7 @@ def fuzzy_place_math(result_file_path, rules) -> float:
words_list = []
for para in doc.paragraphs:
words_list.extend(para.text.split())
# 打印出提取的单词列表
# Print out the list of extracted words
print(words_list)
for word in words_list:
if not any(ans in word for ans in expect):
@@ -140,11 +172,11 @@ _accessibility_ns_map = {"st": "uri:deskat:state.at-spi.gnome.org"
}
def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
def check_accessibility_tree(result: str, rules: List[Dict[str, Any]]) -> float:
"""
Args:
result (str): XML of GNOME Accessibility Tree
rules (Dict[str, Any]): dict like
rules (List[Dict[str, Any]]): list of dict like
{
"selectors": list of str as CSS selectors, will be connected by ", "
to form a composite selector. Only one from `selectors` and
@@ -162,30 +194,33 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
"""
at: _Element = lxml.etree.fromstring(result)
if "xpath" in rules:
elements: List[_Element] = at.xpath(rules["xpath"], namespaces=_accessibility_ns_map)
elif "selectors" in rules:
selector = CSSSelector(", ".join(rules["selectors"]), namespaces=_accessibility_ns_map)
elements: List[_Element] = selector(at)
else:
raise ValueError("At least one of xpath and selectors is required")
total_match_score = 1.
for r in rules:
if "xpath" in r:
elements: List[_Element] = at.xpath(r["xpath"], namespaces=_accessibility_ns_map)
elif "selectors" in r:
selector = CSSSelector(", ".join(r["selectors"]), namespaces=_accessibility_ns_map)
elements: List[_Element] = selector(at)
else:
raise ValueError("At least one of xpath and selectors is required")
if len(elements) == 0:
print("no elements")
return 0.
if len(elements) == 0:
logger.info("No elements: %s", r["xpath"] if "xpath" in r else r["selectors"])
return 0.
if "text" in rules:
match_func: Callable[[str], Number] = functools.partial(operator.eq if rules["exact"] \
else (lambda a, b: fuzz.ratio(a, b) / 100.)
, rules["text"]
)
match_score: Number = 0
for elm in elements:
match_score = max(match_score, match_func(elm.text or None))
else:
match_score = 1.
if "text" in r:
match_func: Callable[[str], Number] = functools.partial( operator.eq if r["exact"] \
else (lambda a, b: fuzz.ratio(a, b) / 100.)
, r["text"]
)
match_score: Number = 0
for elm in elements:
match_score = max(match_score, match_func(elm.text or None))
else:
match_score = 1.
total_match_score *= match_score
return float(match_score)
return float(total_match_score)
# def check_existence(result: str, *args) -> float:
@@ -197,7 +232,7 @@ def run_sqlite3(result: str, rules: Dict[str, Any]) -> float:
return float(cursor.fetchone()[0] or 0)
def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str]]]]) -> float:
def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str]]]], is_yaml: bool = False) -> float:
"""
Args:
result (str): path to json file
@@ -212,6 +247,7 @@ def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str
],
"unexpect": <the same as `expect`
}
is_yaml (bool): yaml rather than json
Returns:
float
@@ -220,7 +256,10 @@ def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str
if result is None:
return 0.
with open(result) as f:
result: Dict[str, Any] = json.load(f)
if is_yaml:
result: Dict[str, Any] = yaml.load(f, Loader=yaml.Loader)
else:
result: Dict[str, Any] = json.load(f)
expect_rules = rules.get("expect", {})
unexpect_rules = rules.get("unexpect", {})
@@ -229,14 +268,21 @@ def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str
for r in expect_rules:
value = result
for k in r["key"]:
value = value[k]
try:
value = value[k]
except KeyError:
return 0.
metric = metric and _match_value_to_rule(value, r)
for r in unexpect_rules:
value = result
for k in r["key"]:
value = value[k]
try:
value = value[k]
except KeyError:
value = None
break
metric = metric and not _match_value_to_rule(value, r)
return metric
return float(metric)
def check_direct_json_object(result, rules)->float:
@@ -257,6 +303,7 @@ def check_direct_json_object(result, rules)->float:
print(rules["expected"])
if result is None:
return 0.
expect_in_result = rules.get("expect_in_result", False)
if not expect_in_result:
expected_json = rules["expected"]
@@ -374,8 +421,6 @@ def compare_python_pure_text(py_file_path, gold_file_path):
content1 = file1.read()
with open(gold_file_path, 'r') as file2:
content2 = file2.read()
# 移除文件内容中的所有空白字符
content1_no_whitespace = remove_whitespace(content1)
content2_no_whitespace = remove_whitespace(content2)
# 比较处理后的文件内容
return content1_no_whitespace == content2_no_whitespace

View File

@@ -5,7 +5,7 @@ from PIL import Image, ImageChops, ImageStat
def compare_image_list(pred_img_path_list: Union[str, List[str]],
gold_img_path_list: Union[str, List[str]]) -> float:
gold_img_path_list: Union[str, List[str]]) -> float:
""" Compare two image lists, only if all images are the same, return 1.0, otherwise return 0.0
"""
if type(pred_img_path_list) != list:
@@ -177,6 +177,16 @@ def calculate_contrast(image):
return np.std(pixels)
def calculate_image_sharpness(image_path):
    """Measure image sharpness as the variance of the Laplacian.

    Args:
        image_path (str): path to the image file to measure

    Returns:
        float: variance of the Laplacian of the grayscale image

    Raises:
        ValueError: if OpenCV cannot decode an image from `image_path`
    """
    # Load the image in grayscale
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread reports a missing or undecodable file by returning None rather
    # than raising; fail here with a readable message instead of letting
    # cv2.Laplacian choke on the None.
    if image is None:
        raise ValueError("Cannot read image: {!r}".format(image_path))

    # Apply the Laplacian operator and take the variance of its response
    laplacian = cv2.Laplacian(image, cv2.CV_64F)
    return float(np.var(laplacian))
def structure_check_by_mse(img1, img2, threshold=0.03):
"""Check if two images are approximately the same by MSE"""
mse = np.mean(
@@ -295,7 +305,8 @@ def check_triangle_position(tgt_path):
# We assume the triangle is a different color from the background
# Find the unique colors
unique_colors, counts = np.unique(img_array.reshape(-1, img_array.shape[2]), axis=0, return_counts=True)
unique_colors, counts = np.unique(img_array.reshape(-1, img_array.shape[2]), axis=0,
return_counts=True)
unique_colors_sorted = unique_colors[np.argsort(counts)]
# Assuming the background is the most common color and the triangle is a different color
@@ -337,6 +348,25 @@ def check_structure_sim(src_path, tgt_path):
return structure_same
def check_structure_sim_resized(src_path, tgt_path):
    """
    Check if the structure of the two images are similar after resizing.
    gimp:d16c99dc-2a1e-46f2-b350-d97c86c85c15

    Args:
        src_path (str): path to the image under test; it is resized to the
          target's size before the comparison
        tgt_path (str): path to the reference image

    Returns:
        0. if either path is None, otherwise whatever `structure_check_by_ssim`
        returns for the resized source and the target
    """
    if src_path is None or tgt_path is None:
        return 0.

    # Context managers make sure both image files are closed again
    with Image.open(src_path) as img_src, Image.open(tgt_path) as img_tgt:
        # Resize the images to the same size
        resized_src = img_src.resize(img_tgt.size)

        # Check if the structure is similar
        return structure_check_by_ssim(resized_src, img_tgt)
def check_contrast_increase_and_structure_sim(src_path, tgt_path):
"""
Check if the src image has higher contrast than the tgt image and the structures are similar
@@ -388,34 +418,28 @@ def check_config_status(actual_config_path, rule):
return 0.
def check_image_size(src_path, rule):
    """
    Check if the size of the src image is correct
    multi-apps:42f4d1c7-4521-4161-b646-0a8934e36081

    Args:
        src_path (str): path to the image to inspect
        rule (Dict[str, Optional[int]]): expected dimensions in pixels
          {
            "height": int or None,
            "width": int or None
          }
          A dimension that is None (or absent) is not checked.

    Returns:
        float: 1. if every requested dimension matches, 0. otherwise or if
          `src_path` is None
    """
    if src_path is None:
        return 0.

    # Load the image; the context manager closes the file again
    with Image.open(src_path) as img:
        width, height = img.size

    # Check the size
    expected_height = rule.get("height")
    expected_width = rule.get("width")
    height_same = expected_height is None or height == expected_height
    width_same = expected_width is None or width == expected_width

    return 1. if height_same and width_same else 0.
@@ -521,6 +545,31 @@ def check_green_background(src_path, tgt_path):
return 1.
def check_sharper(src_path, tgt_path):
    """
    Check if the source image is sharper than the target image.
    multi-app:bb7db4c2-30b5-4be7-8dd7-b8c4ec7d3108

    Args:
        src_path (str): path to the image expected to be sharper
        tgt_path (str): path to the image to compare against

    Returns:
        float: 1.0 if the source's sharpness is strictly greater than the
          target's, 0.0 otherwise or if either path is None
    """
    # A missing file cannot be judged sharper than anything
    if src_path is None or tgt_path is None:
        return 0.0

    sharpness_src = calculate_image_sharpness(src_path)
    sharpness_tgt = calculate_image_sharpness(tgt_path)
    return 1.0 if sharpness_src > sharpness_tgt else 0.0
def check_image_file_size(src_path, rule):
    """
    Check if the file size of the src image is below the limit given in the rule

    Args:
        src_path (str): path to the image file
        rule (Dict[str, int]): {"max_size": <exclusive upper bound in bytes>}

    Returns:
        float: 1.0 if the file is strictly smaller than rule["max_size"],
          0.0 otherwise, or if `src_path` is None or the file cannot be stat'ed
    """
    if src_path is None:
        return 0.0

    # Check the size
    try:
        file_size = os.path.getsize(src_path)
    except OSError:
        # A file that does not exist (or is inaccessible) cannot satisfy the limit
        return 0.0

    return 1.0 if file_size < rule["max_size"] else 0.0
if __name__ == "__main__":
actual_config_path = "../../../cache/sessionrc_test"
rule = {
@@ -550,3 +599,12 @@ if __name__ == "__main__":
tgt_path = "../../../cache/f4aec372-4fb0-4df5-a52b-79e0e2a5d6ce/Triangle_In_The_Middle.png"
print(check_triangle_position(tgt_path))
src_path = "../../../cache/bb7db4c2-30b5-4be7-8dd7-b8c4ec7d3108/anmi_sharper.png"
tgt_path = "../../../cache/bb7db4c2-30b5-4be7-8dd7-b8c4ec7d3108/anmi.png"
print(check_sharper(src_path, tgt_path))
src_path = "../../../cache/3c8f201a-009d-4bbe-8b65-a6f8b35bb57f/compressed.jpeg"
rule = {
"max_size": 500000
}
print(check_image_file_size(src_path, rule))

View File

@@ -0,0 +1,128 @@
import zipfile
import os.path
import os
import lxml.html
from lxml.html import HtmlElement
from typing import List, Dict
from typing import Union, TypeVar
from mutagen.easyid3 import EasyID3
from .general import diff_text_file
from .utils import _match_value_to_rule
import logging
logger = logging.getLogger("desktopenv.metric.others")
def _dump_filtered_lines(z_f: zipfile.ZipFile, member: str, base_dir: str, skip_marker: str) -> str:
    """Write archive `member` into `base_dir` without the lines containing `skip_marker`; return the written path."""
    out_path: str = os.path.join(base_dir, member)
    with z_f.open(member) as in_f, open(out_path, "w") as out_f:
        for line in in_f.read().decode().splitlines():
            if skip_marker not in line:
                out_f.write(line + "\n")
    return out_path


def process_epub(filename: str) -> List[str]:
    """
    Unpack the comparable parts of an epub into the folder `<filename>.dir`.

    Written files:
    * toc.ncx, without the lines containing "navPoint"
    * content.opf, without the lines containing "dc:identifier"
      (presumably both are dropped because they vary between otherwise equal
      books - TODO confirm)
    * every member ending with ".html", stripped of line breaks and
      re-serialized by lxml with pretty printing

    Args:
        filename (str): path to the epub file

    Returns:
        List[str]: sorted paths of the written files; an empty list if
          `filename` is not a valid zip archive

    Raises:
        KeyError: propagated from zipfile if the archive has no toc.ncx or
          content.opf member
    """

    file_list: List[str] = []

    base_dir: str = filename + ".dir"
    os.makedirs(base_dir, exist_ok=True)

    try:
        with zipfile.ZipFile(filename, "r") as z_f:
            file_list.append(_dump_filtered_lines(z_f, "toc.ncx", base_dir, "navPoint"))
            file_list.append(_dump_filtered_lines(z_f, "content.opf", base_dir, "dc:identifier"))

            for f_n in z_f.namelist():
                if not f_n.endswith(".html"):
                    continue
                out_path: str = os.path.join(base_dir, f_n)
                # Members may sit in sub-folders of the archive; the matching
                # folder has to exist before the output file can be opened.
                os.makedirs(os.path.dirname(out_path), exist_ok=True)
                with z_f.open(f_n) as in_f, open(out_path, "w") as out_f:
                    flattened: str = in_f.read().decode().replace("\n", "").replace("\r", "")
                    html: HtmlElement = lxml.html.fromstring(flattened.encode())
                    out_f.write(lxml.html.tostring(html, pretty_print=True, encoding="unicode"))
                file_list.append(out_path)
        logger.debug("%s: %s", filename, file_list)
        return sorted(file_list)
    except zipfile.BadZipFile:
        return []
def compare_epub(result: str, expected: str) -> float:
    """
    Compare two epub files through the text files extracted by `process_epub`.

    Args:
        result (str): path to the epub file under test
        expected (str): path to the gold epub file

    Returns:
        float: product of `diff_text_file` over the pairwise extracted files;
          0. if `result` is None, if nothing could be extracted from `result`,
          or if the two books yield a different number of files
    """

    if result is None:
        return 0.
    result_files: List[str] = process_epub(result)
    expected_files: List[str] = process_epub(expected)

    # process_epub returns [] for an unreadable archive. zip() below stops at
    # the shorter list, so without this guard a broken or truncated result
    # would be compared against nothing and keep the initial perfect score.
    if not result_files or len(result_files) != len(expected_files):
        return 0.

    metric: float = 1.
    for f1, f2 in zip(result_files, expected_files):
        current_metric: float = diff_text_file(f1, f2)
        logger.debug("%s vs %s: %f", f1, f2, current_metric)
        metric *= current_metric
    return metric
# The name passed to TypeVar has to equal the variable it is assigned to
V = TypeVar("V")


def check_mp3_meta(result: str, meta: Dict[str, Dict[str, Union[str, V]]]) -> float:
    """
    Check the ID3 tags of an mp3 file against a set of rules.

    Args:
        result (str): path to the mp3 file
        meta (Dict[str, Dict[str, Union[str, V]]]): maps a tag name to the rule
          its value has to satisfy; each rule is handed to
          `_match_value_to_rule`, e.g.
          {
            "title": {"method": "eq", "ref": "Missing You"}
          }

    Returns:
        float: 1. if every tag satisfies its rule, 0. otherwise or if `result`
          is None
    """
    if result is None:
        return 0.

    id3_dict = EasyID3(result)
    metric: bool = True
    for k, r in meta.items():
        # a missing tag is matched as the empty string
        value = id3_dict.get(k, "")
        if isinstance(value, list):
            # list values are flattened to one comma-separated string
            value = ",".join(value)
        logger.debug("%s.%s: %s", result, k, value)
        metric = metric and _match_value_to_rule(value, r)
    return float(metric)
if __name__ == "__main__":
    # Manual check of check_mp3_meta with verbose logging to stdout and to
    # timestamped files under logs/.
    import datetime
    import sys

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    timestamp: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
    formatter = logging.Formatter(fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")

    # (handler, level) pairs, registered on the root logger in this order
    handler_levels = [
        (logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(timestamp))), logging.INFO),
        (logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(timestamp))), logging.DEBUG),
        (logging.StreamHandler(sys.stdout), logging.INFO),
        (logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(timestamp))), logging.DEBUG),
    ]
    for handler, level in handler_levels:
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    expected_meta = {
        "title": {"method": "eq", "ref": "Missing You"},
        "artist": {"method": "eq", "ref": "Cheng Xiang"},
    }
    metric = check_mp3_meta(
        "snapshots/test/cache/3f05f3b9-29ba-4b6b-95aa-2204697ffc06/Cheng Xiang - Missing You - gt.mp3",
        expected_meta,
    )
    print(metric)

View File

@@ -182,7 +182,7 @@ def compare_pptx_files(file1_path, file2_path, **options):
else:
return None
if get_slide_notes(slide1) != get_slide_notes(slide2) and examine_note:
if get_slide_notes(slide1).strip() != get_slide_notes(slide2).strip() and examine_note:
return 0
# check if the shapes are the same
for shape1, shape2 in zip(slide1.shapes, slide2.shapes):
@@ -235,7 +235,7 @@ def compare_pptx_files(file1_path, file2_path, **options):
return 0
if hasattr(shape1, "text") and hasattr(shape2, "text"):
if shape1.text != shape2.text and examine_text:
if shape1.text.strip() != shape2.text.strip() and examine_text:
return 0
# check if the paragraphs are the same

View File

@@ -5,19 +5,21 @@ import os.path
# import operator
from numbers import Number
from typing import Any, Union, cast, Callable, Iterable
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Set
import openpyxl
import pandas as pd
from openpyxl import Workbook
from openpyxl.cell.cell import Cell
# from openpyxl.worksheet.cell_range import MultiCellRange
from openpyxl.worksheet.cell_range import MultiCellRange
from openpyxl.utils import get_column_letter
from openpyxl.worksheet.datavalidation import DataValidation
from openpyxl.worksheet.worksheet import Worksheet
from .utils import _match_value_to_rule, _read_cell_style, read_cell_value
from .utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles\
, load_filters, load_pivot_tables
from desktop_env.evaluators.metrics.utils import _match_value_to_rule, _read_cell_style, read_cell_value
from desktop_env.evaluators.metrics.utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles \
, load_filters, load_pivot_tables
from rapidfuzz import fuzz
# from openpyxl.utils import coordinate_to_tuple
@@ -122,7 +124,6 @@ def compare_table(result: str, expected: str = None, **options) -> float:
worksheetr_names: List[str] = pdworkbookr.sheet_names
if expected is not None:
xlworkbooke: Workbook = openpyxl.load_workbook(filename=expected)
pdworkbooke = pd.ExcelFile(expected)
worksheete_names: List[str] = pdworkbooke.sheet_names
@@ -158,8 +159,8 @@ def compare_table(result: str, expected: str = None, **options) -> float:
return 0.
sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke))
sheet1 = sheet1.round()
sheet2 = sheet2.round()
sheet1 = sheet1.round(error_limit)
sheet2 = sheet2.round(error_limit)
metric: bool = sheet1.equals(sheet2)
logger.debug("Sheet1: \n%s", str(sheet1))
logger.debug("Sheet2: \n%s", str(sheet2))
@@ -187,6 +188,63 @@ def compare_table(result: str, expected: str = None, **options) -> float:
logger.debug("Assertion: %s =p= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Sheet Data by Printed Value #
elif r["type"] == "sheet_fuzzy":
# Fuzzy Match for Ranges {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# rules: list of dict, each dict is like
# { "range": ["A1:B6", "C2:E5"],
# "type": "includes" | "included_by" | "fuzzy_match" | "exact_match", # 0 includes 1, 0 includes_by 1
# "threshold": 85, // for fuzzy match
# "ignore_case": true | false,
# "ignore_chars": " ()", # filtered out
# "trim_leadings": "+ ", # filtered by lstrip
# "trim_trailings": "", # filtered by rstrip
# "normalization": [["Rd", "Road"]], # filtered by replace
# }
sheet1: Tuple[BOOK, str] = parse_idx(r["sheet_idx0"], result, expected)
sheet2: Tuple[BOOK, str] = parse_idx(r["sheet_idx1"], result, expected)
total_metric = True
for rl in r["rules"]:
for rng in MultiCellRange(rl["range"]):
for cdn in rng.cells:
coordinate: str = "{:}{:d}".format(get_column_letter(cdn[1]), cdn[0])
value1: str = str(read_cell_value(*sheet1, coordinate))
value2: str = str(read_cell_value(*sheet2, coordinate))
logger.debug("%s: %s vs %s", cdn, value1, value2)
for rplc in rl.get("normalization", []):
value1 = value1.replace(rplc[0], rplc[1])
value2 = value2.replace(rplc[0], rplc[1])
if "trim_leadings" in rl:
value1 = value1.lstrip(rl["trim_leadings"])
value2 = value2.lstrip(rl["trim_leadings"])
if "trim_trailings" in rl:
value1 = value1.rstrip(rl["trim_trailings"])
value2 = value2.rstrip(rl["trim_trailings"])
if "ignore_chars" in rl:
ignore_chars: Set[str] = set(rl["ignore_chars"])
value1 = "".join(filter(lambda ch: ch not in ignore_chars, value1))
value2 = "".join(filter(lambda ch: ch not in ignore_chars, value2))
if rl.get("ignore_case", False):
value1 = value1.lower()
value2 = value2.lower()
if rl["type"]=="includes":
metric: bool = value2 in value1
elif rl["type"]=="included_by":
metric: bool = value1 in value2
elif rl["type"]=="fuzzy_match":
metric: bool = fuzz.ratio(value1, value2) >= rl.get("threshold", 85.)
elif rl["type"]=="exact_match":
metric: bool = value1==value2
total_metric = total_metric and metric
metric: bool = total_metric
logger.debug("Assertion: %s =~= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Fuzzy Match for Ranges #
elif r["type"] == "sparkline":
# Compare Sparklines {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"

View File

@@ -126,10 +126,14 @@ def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, An
value_str: str = ser.val.numRef.f
elif hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f"):
value_str: str = ser.val.strRef.f
else:
value_str: str = ""
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f"):
categ_str: str = ser.cat.numRef.f
elif hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f"):
categ_str: str = ser.cat.strRef.f
else:
categ_str: str = ""
series.append("{:},{:}".format(value_str, categ_str))
series: str = ";".join(series)
@@ -272,7 +276,8 @@ def load_pivot_tables(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[s
# }}} function load_pivot_tables #
_shared_str_selector = lxml.cssselect.CSSSelector("oo|sst>oo|si>oo|t", namespaces=_xlsx_ns_mapping)
_shared_str_selector = lxml.cssselect.CSSSelector("oo|sst>oo|si", namespaces=_xlsx_ns_mapping)
_shared_str_value_selector = lxml.cssselect.CSSSelector("oo|t", namespaces=_xlsx_ns_mapping)
def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
@@ -283,7 +288,9 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
with z_f.open("xl/sharedStrings.xml") as f:
shared_str_xml: _Element = lxml.etree.fromstring(f.read())
str_elements: List[_Element] = _shared_str_selector(shared_str_xml)
shared_strs: List[str] = [elm.text for elm in str_elements]
shared_strs: List[str] = [ "".join(t.text for t in _shared_str_value_selector(elm))\
for elm in str_elements
]
except:
logger.debug("Read shared strings error: %s", xlsx_file)
@@ -309,14 +316,15 @@ def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
, namespaces=_xlsx_ns_imapping
)
logger.debug("%s.%s[%s]: %s", xlsx_file, sheet_name, coordinate, repr(cell))
if "@t" not in cell["c"]:
try:
if "@t" not in cell["c"] or cell["c"]["@t"] == "n":
return float(cell["c"]["v"])
if cell["c"]["@t"] == "s":
return shared_strs[int(cell["c"]["v"])]
if cell["c"]["@t"] == "str":
return cell["c"]["v"]
except (KeyError, ValueError):
return None
if cell["c"]["@t"] == "s":
return shared_strs[int(cell["c"]["v"])]
if cell["c"]["@t"] == "n":
return float(cell["c"]["v"])
if cell["c"]["@t"] == "str":
return cell["c"]["v"]
# }}} read_cell_value #
@@ -589,7 +597,7 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
bool
"""
if rule["method"].startswith("re"):
if rule["method"].startswith("re"): # re.FLAGs
flags: List[str] = rule["method"].split(".")[1:]
flags: Iterable[re.RegexFlag] = (getattr(re, fl) for fl in flags)
flag: re.RegexFlag = functools.reduce(operator.or_, flags, re.RegexFlag(0))
@@ -602,7 +610,7 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
, "ge", "gt"
}:
return getattr(operator, rule["method"])(value, rule["ref"])
if rule["method"].startswith("approx"):
if rule["method"].startswith("approx"): # approx:THRESHOLD
threshold: float = float(rule["method"].split(":")[1])
logger.debug("Approx: TH%f, REF%f, VAL%s", threshold, rule["ref"], repr(value))
try:

View File

@@ -1,5 +1,10 @@
import copy
import importlib.util
import json
import sys
import re
from typing import Dict
import json, copy
def check_json_keybindings(actual: str, expected: str, **options) -> float:
"""
@@ -10,6 +15,7 @@ def check_json_keybindings(actual: str, expected: str, **options) -> float:
Return:
float: the score
"""
def direct_load_json(fp):
try:
with open(fp, 'r') as f:
@@ -17,7 +23,7 @@ def check_json_keybindings(actual: str, expected: str, **options) -> float:
return data
except:
return None
def skip_first_line_load_json(fp):
try:
with open(fp, 'r') as f:
@@ -54,7 +60,7 @@ def check_json_settings(actual: str, expected: str, **options) -> float:
with open(actual, 'r') as f:
data = json.load(f)
expect = expected['expected']
data_copy = copy.deepcopy(data)
data_copy.update(expect)
@@ -81,10 +87,51 @@ def compare_text_file(actual: str, expected: str, **options) -> float:
with open(expected) as f2:
expected_text = f2.read()
ignore_blanks = options.get('ignore_blanks', False)
if ignore_blanks:
actual_text = re.sub(r'[\t\n]', ' ', actual_text).strip()
actual_text = re.sub(r'\s+', ' ', actual_text)
expected_text = re.sub(r'[\t\n]', ' ', expected_text).strip()
expected_text = re.sub(r'\s+', ' ', expected_text)
ignore_case = options.get('ignore_case', False)
if ignore_case:
actual_text = actual_text.lower()
expected_text = expected_text.lower()
if actual_text == expected_text:
return 1.0
return 0.0
import zipfile
def compare_zip_files(actual: str, expected: str, **options) -> float:
    """
    Args:
        actual (str): path to result zip file
        expected (str): path to gold zip file
        options: accepted for a uniform metric signature; not read here

    Return:
        float: 1.0 if both archives hold the same member names with
          byte-identical contents; 0.0 otherwise, or if `actual` is empty,
          missing or not a valid zip archive
    """
    if not actual:
        return 0.

    # The result archive comes from the run under evaluation and may be absent
    # or corrupt; that is a failed check, not a crash. A problem with the gold
    # archive is still raised to the caller.
    try:
        zip_actual = zipfile.ZipFile(actual, 'r')
    except (zipfile.BadZipFile, OSError):
        return 0.0

    with zip_actual, zipfile.ZipFile(expected, 'r') as zip_expected:
        member_names = set(zip_actual.namelist())
        if member_names != set(zip_expected.namelist()):
            return 0.0

        for member in member_names:
            if zip_actual.read(member) != zip_expected.read(member):
                return 0.0
    return 1.0
def compare_config(actual: str, rules: Dict, **options) -> float:
if not actual:
@@ -128,3 +175,82 @@ def is_extension_installed(actual: str, rules: Dict, **options):
return 0.0
else:
raise NotImplementedError
def check_python_file_by_test_suite(actual_files, test_file, **options) -> float:
    """Check the python file by running the test suite in the given test file.

    Args:
        actual_files: not read here; presumably the test file locates the
          files under test itself - TODO confirm against the task configs
        test_file (str): path to a python file defining the test function
        options: `test_function_name` (str, default 'test') names the
          zero-argument function to call

    Return:
        float: 1.0 if the test function returns a truthy value; 0.0 if it
          returns a falsy value, raises, or the test file cannot be loaded

    NOTE: this executes the code in `test_file`; only use it on trusted files.
    """

    test_function_name = options.get('test_function_name', 'test')
    # Name under which the module is registered while it runs
    module_name = 'dynamic_module'

    try:
        # Load the module from the given file path
        spec = importlib.util.spec_from_file_location(module_name, test_file)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module  # Add the loaded module to sys.modules
        # Executing the module can already fail (e.g. a broken import in the
        # test file), which counts as a failed check just like a raising test.
        spec.loader.exec_module(module)

        # Retrieve the function by name from the loaded module and execute it
        test_function = getattr(module, test_function_name)
        return 1.0 if test_function() else 0.0
    except Exception:
        return 0.0
    finally:
        # Do not leave the temporary module behind for later calls
        sys.modules.pop(module_name, None)
def check_python_file_by_gold_file(actual_files, gold_file: str, **options) -> float:
    """Placeholder for checking python files against a gold file.

    Not implemented: the body is empty, so every call returns None rather than
    the float promised by the annotation.
    """
    pass
def check_html_background_image(src_path: str, rule: Dict = None) -> float:
    """
    Check if the background image is correctly set.
    multi-app:bb7db4c2-30b5-4be7-8dd7-b8c4ec7d3108

    Args:
        src_path (str): path to the html file
        rule (Dict): {"value": <image url expected inside url('...')>}

    Return:
        float: 1.0 if any <style> element contains the exact text
          background-image: url('<value>'); 0.0 otherwise, or if `src_path`
          or `rule` is None
    """
    # Nothing to inspect or nothing to look for
    if src_path is None or rule is None:
        return 0.0

    from bs4 import BeautifulSoup

    with open(src_path, 'r') as f:
        html_content = f.read()
    soup = BeautifulSoup(html_content, 'html.parser')

    # The declaration is matched literally, i.e. single quotes and one space
    # after the colon
    expected_declaration = f'background-image: url(\'{rule["value"]}\')'
    for style in soup.find_all('style'):
        if expected_declaration in style.text:
            return 1.0
    return 0.0
def compare_result_files(src_path, tgt_path):
    """
    Compare whether the content of two files are the same.
    multi-app:7f35355e-02a6-45b5-b140-f0be698bcf85

    Both contents are stripped of surrounding whitespace first. If the target
    is a number, the source matches when it contains the target text or when
    it is itself a number within 1e-4 of the target. Otherwise the two
    contents have to be equal.

    Args:
        src_path (str): path to the produced file
        tgt_path (str): path to the gold file

    Return:
        float: 1.0 on a match, 0.0 otherwise or if either path is None
    """
    if src_path is None or tgt_path is None:
        return 0.0

    with open(src_path, 'r') as f:
        src_content = f.read().strip()
    with open(tgt_path, 'r') as f:
        tgt_content = f.read().strip()

    try:
        tgt_content_num = float(tgt_content)
    except ValueError:
        # The gold content is not a number: plain text comparison
        return 1.0 if src_content == tgt_content else 0.0

    if tgt_content in src_content:
        # If the content of tgt is in src, return 1.0 since output src might be
        # a superset(language description+number) of tgt
        return 1.0

    try:
        # Compare the content as numbers
        src_content_num = float(src_content)
    except ValueError:
        # src is neither a number nor does it contain the gold text
        return 0.0
    return 1.0 if abs(src_content_num - tgt_content_num) < 1e-4 else 0.0
if __name__ == "__main__":
    # Manual check of check_html_background_image against a cached task file
    print(check_html_background_image(
        "../../../cache/bb7db4c2-30b5-4be7-8dd7-b8c4ec7d3108/index.html",
        {"type:": "value", "value": "anmi_sharper.png"},
    ))