Merge remote-tracking branch 'origin/main'

Timothyxxx
2024-02-01 00:55:47 +08:00
13 changed files with 844 additions and 23 deletions


@@ -490,16 +490,16 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
parent_id = file['id']
file.GetContentFile(_path, mimetype=file['mimeType'])
except:
logger.info('[ERROR]: Failed to download the file from Google Drive')
except Exception as e:
logger.info('[ERROR]: Failed to download the file from Google Drive', e)
return None
return _path
if 'query' in config:
return get_single_file(config['query'], os.path.join(env.cache_dir, config['dest']))
elif 'path' in config:
query = [f"title = {fp} and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if idx < len(config['path']) - 1
else f'title = {fp} and trashed = false' for idx, fp in enumerate(config['path'])]
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if idx < len(config['path']) - 1
else f"title = '{fp}' and trashed = false" for idx, fp in enumerate(config['path'])]
return get_single_file(query, os.path.join(env.cache_dir, config['dest']))
elif 'query_list' in config:
_path_list = []
@@ -512,8 +512,8 @@ def get_googledrive_file(env, config: Dict[str, Any]) -> str:
_path_list = []
assert len(config['path_list']) == len(config['dest'])
for idx, path in enumerate(config['path_list']):
query = [f"title = {fp} and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if jdx < len(path) - 1
else f'title = {fp} and trashed = false' for jdx, fp in enumerate(path)]
query = [f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if jdx < len(path) - 1
else f"title = '{fp}' and trashed = false" for jdx, fp in enumerate(path)]
dest = config['dest'][idx]
_path_list.append(get_single_file(query, os.path.join(env.cache_dir, dest)))
return _path_list
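# Illustration (not part of the commit; the path below is made up): the corrected
# comprehension wraps each title in single quotes, so folder and file names with
# spaces no longer break the Google Drive query string.
path = ['Reports', 'summary.pdf']
query = [
    f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false"
    if idx < len(path) - 1 else f"title = '{fp}' and trashed = false"
    for idx, fp in enumerate(path)
]
# query == ["title = 'Reports' and mimeType = 'application/vnd.google-apps.folder' and trashed = false",
#           "title = 'summary.pdf' and trashed = false"]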


@@ -8,6 +8,8 @@ from .chrome import (
is_expected_tabs,
is_expected_bookmarks,
compare_pdfs,
compare_htmls,
compare_archive,
is_cookie_deleted,
is_shortcut_on_desktop,
check_font_size,
@@ -92,7 +94,8 @@ from .table import (
)
from .thunderbird import (
check_thunderbird_prefs,
check_thunderbird_filter
check_thunderbird_filter,
check_thunderbird_folder
)
from .vlc import (
is_vlc_playing,


@@ -1,6 +1,6 @@
import logging, re
from typing import Any, Dict, List
import logging, re, os, shutil
from typing import Any, Dict, List, Union
from bs4 import BeautifulSoup, Tag
import fitz # PyMuPDF
import rapidfuzz.fuzz as fuzz
@@ -14,7 +14,6 @@ def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> f
Checks if the expected tabs are open in Chrome.
"""
print(open_tabs, rule)
match_type = rule['type']
if match_type == "url":
@@ -53,10 +52,12 @@ def is_expected_search_query(active_tab_info: Dict[str, str], rules: Dict[str, A
return 0.
def compare_pdfs(pdf1_path, pdf2_path):
def compare_pdfs(pdf1_path: Union[str, List[str]], pdf2_path: Union[str, List[str]]):
"""
Compare two PDF files.
"""
if type(pdf2_path) != list:
pdf1_path, pdf2_path = [pdf1_path], [pdf2_path]
def extract_text_from_pdf(pdf_path):
"""Extract text from each page of the PDF."""
@@ -65,14 +66,100 @@ def compare_pdfs(pdf1_path, pdf2_path):
for page in pdf:
text += page.get_text()
return text.strip()
try:
text1 = extract_text_from_pdf(pdf1_path)
text2 = extract_text_from_pdf(pdf2_path)
return fuzz.ratio(text1, text2) / 100
except Exception as e:
logger.info(f"[ERROR]: unexpected error occurred when comparing PDF files: {e}")
return 0.0
score = 0.
for path1, path2 in zip(pdf1_path, pdf2_path):
try:
text1 = extract_text_from_pdf(path1)
text2 = extract_text_from_pdf(path2)
score += fuzz.ratio(text1, text2) / 100
except Exception as e:
logger.info(f"[ERROR]: unexpected error occurred when comparing PDF files: {e}")
return score / len(pdf2_path)
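# Illustration (not part of the commit; file paths are hypothetical): the reworked
# compare_pdfs accepts either a single pair of paths or two equal-length lists,
# scoring each pair with fuzz.ratio and returning the mean.
single_score = compare_pdfs('pred/report.pdf', 'gold/report.pdf')
batch_score = compare_pdfs(['pred/a.pdf', 'pred/b.pdf'],
                           ['gold/a.pdf', 'gold/b.pdf'])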
def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:
"""
Compare two archives. Note that the files in the archives should be of the same type.
"""
if not pred_path: return 0.
pred_folder = os.path.splitext(pred_path)[0] + '_pred'
gold_folder = os.path.splitext(gold_path)[0] + '_gold'
if os.path.exists(pred_folder): # remove existing folder for new predictions
shutil.rmtree(pred_folder, ignore_errors=True)
os.makedirs(pred_folder)
shutil.unpack_archive(pred_path, pred_folder)
if not os.path.exists(gold_folder): # use cache if exists
os.makedirs(gold_folder)
shutil.unpack_archive(gold_path, gold_folder)
pred_files = sorted(os.listdir(pred_folder))
gold_files = sorted(os.listdir(gold_folder))
if pred_files != gold_files: return 0.
def get_compare_function():
file_type = kwargs.pop('file_type', 'text')
if file_type == 'text':
from .vscode import compare_text_file
return compare_text_file
elif file_type == 'pdf': return compare_pdfs
elif file_type == 'docx':
from .docs import compare_docx_files
return compare_docx_files
elif file_type == 'ppt':
from .slides import compare_pptx_files
return compare_pptx_files
elif file_type == 'image':
from .vlc import compare_images
return compare_images
elif file_type == 'csv':
from .table import compare_csv
return compare_csv
elif file_type == 'table':
from .table import compare_table
return compare_table
elif file_type == 'audio':
from .vlc import compare_audios
return compare_audios
elif file_type == 'video':
from .vlc import compare_videos
return compare_videos
else: raise ValueError('[ERROR]: not support file type: %s' % file_type)
score = 0
compare_function = get_compare_function()
for f1, f2 in zip(pred_files, gold_files):
fp1 = os.path.join(pred_folder, f1)
fp2 = os.path.join(gold_folder, f2)
score += compare_function(fp1, fp2, **kwargs)
return score / len(pred_files)
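# Illustration (not part of the commit; paths and file_type are hypothetical):
# compare_archive unpacks both archives next to their source files, requires
# identical file listings, then averages the per-file scores from the comparator
# selected by file_type; any remaining kwargs are forwarded to that comparator.
archive_score = compare_archive('pred/output.zip', 'gold/output.zip', file_type='docx')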
def compare_htmls(html_path1: str, html_path2: str) -> float:
"""
Compare two HTML files.
"""
with open(html_path1, 'r', encoding='utf-8') as inf:
soup1 = BeautifulSoup(inf, 'lxml')
with open(html_path2, 'r', encoding='utf-8') as inf:
soup2 = BeautifulSoup(inf, 'lxml')
def compare_elements(elem1, elem2):
if not (isinstance(elem1, Tag) and isinstance(elem2, Tag)):
return elem1 == elem2
if elem1.name != elem2.name:
return False
if elem1.text.strip() != elem2.text.strip():
return False
if elem1.attrs != elem2.attrs:
return False
return True
for elem1, elem2 in zip(soup1.recursiveChildGenerator(), soup2.recursiveChildGenerator()):
if not compare_elements(elem1, elem2):
return .0
return 1.
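# Illustration (not part of the commit; paths are hypothetical): compare_htmls walks
# both parse trees in lockstep and returns 1.0 only if every paired node agrees on
# tag name, stripped text and attributes; note that zip() stops at the shorter tree.
html_score = compare_htmls('pred/page.html', 'gold/page.html')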
def is_cookie_deleted(cookie_data, rule):


@@ -128,6 +128,47 @@ def check_thunderbird_filter(result: str, rules: Dict[str, List[Dict[str, str]]]
unexpect_metric = unexpect_metric and not any(_match_record(r, flt) for r in rules.get("unexpect", []))
return float(all(expect_metrics) and unexpect_metric)
def check_thunderbird_folder(result: Union[str, List[str]], reference: Union[str, List[str]], **kwargs) -> float:
"""
Check that the given text file (or each file in a list) contains all messages of a folder in Thunderbird. Each message starts with `FROM - `.
**kwargs:
ignore_status (bool): for comparison, ignore the status (X-Mozilla-Status: 0000) of each message. default: False
ignore_keys (bool): for comparison, ignore the keys (X-Mozilla-Keys: label) of each message. default: False
remove_deleted (bool): ignore deleted messages, which have status code 0008 or 0009. default: True
remove_duplicate (bool): remove duplicate messages. default: True
"""
def normalize_msg(msg, options):
ignore_status = options.get('ignore_status', False)
ignore_keys = options.get('ignore_keys', False)
if ignore_status:
msg = re.sub(r'X-Mozilla-Status\d?:[\s\d]+', '', msg)
if ignore_keys:
msg = re.sub(r'(X-Mozilla-Keys:[^\n]*?)\n(MIME-Version)', r'\2', msg)
return msg.strip()
def read_thunderbird_folder_file(path: str) -> str:
with open(path, 'r') as inf:
data = inf.read().strip()
messages = []
for mail in data.split('FROM - '):
if not mail.strip(): continue
if kwargs.get('remove_deleted', True) and re.search(r'X-Mozilla-Status: 000[89]', mail): continue
messages.append('FROM - ' + normalize_msg(mail, kwargs))
if kwargs.get('remove_duplicate', True):
messages = set(messages)
return '\n'.join(sorted(messages))
if type(reference) != list:
result, reference = [result], [reference]
for pred, gold in zip(result, reference):
if pred is None: return .0
mail1 = read_thunderbird_folder_file(pred)
mail2 = read_thunderbird_folder_file(gold)
if mail1 != mail2: return .0
return 1.0
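# Illustration (not part of the commit; paths and options are hypothetical):
# each exported folder file is split on 'FROM - ', normalized according to the
# kwargs, deduplicated and sorted before an exact string comparison.
folder_score = check_thunderbird_folder('pred/Inbox', 'gold/Inbox',
                                         ignore_status=True, ignore_keys=True)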
if __name__ == "__main__":
#import lxml.etree
#from lxml.cssselect import CSSSelector