diff --git a/desktop_env/evaluators/metrics/table.py b/desktop_env/evaluators/metrics/table.py
index 5a0f79d..4a1598c 100644
--- a/desktop_env/evaluators/metrics/table.py
+++ b/desktop_env/evaluators/metrics/table.py
@@ -2,6 +2,7 @@ import functools
 import itertools
 import logging
 import os.path
+
 # import operator
 from numbers import Number
 from typing import Any, Union, cast, Callable, Iterable
@@ -17,9 +18,19 @@ from openpyxl.worksheet.datavalidation import DataValidation
 from openpyxl.worksheet.worksheet import Worksheet
 from rapidfuzz import fuzz
 
-from desktop_env.evaluators.metrics.utils import _match_value_to_rule, _read_cell_style, read_cell_value
-from desktop_env.evaluators.metrics.utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles \
-    , load_filters, load_pivot_tables
+from desktop_env.evaluators.metrics.utils import (
+    _match_value_to_rule,
+    _read_cell_style,
+    read_cell_value,
+)
+from desktop_env.evaluators.metrics.utils import (
+    load_charts,
+    load_sparklines,
+    load_rows_or_cols,
+    load_xlsx_styles,
+    load_filters,
+    load_pivot_tables,
+)
 
 # from openpyxl.utils import coordinate_to_tuple
 
@@ -28,16 +39,20 @@ logger = logging.getLogger("desktopenv.metric.table")
 
 BOOK = Union[pd.ExcelFile, Workbook, str]
 
 
-def _parse_sheet_idx(sheet_idx: Union[int, str]
-                     , result: BOOK, expected: BOOK
-                     , result_sheet_names: List[str]
-                     , expected_sheet_names: List[str]
-                     ) -> Tuple[BOOK, str]:
-    # function _parse_sheet_idx {{{ # 
+def _parse_sheet_idx(
+    sheet_idx: Union[int, str],
+    result: BOOK,
+    expected: BOOK,
+    result_sheet_names: List[str],
+    expected_sheet_names: List[str],
+) -> Tuple[BOOK, str]:
+    # function _parse_sheet_idx {{{ #
     if isinstance(sheet_idx, int):
         try:
             if not result_sheet_names or sheet_idx >= len(result_sheet_names):
-                logger.error(f"Sheet index {sheet_idx} out of range. Available sheets: {result_sheet_names}")
+                logger.error(
+                    f"Sheet index {sheet_idx} out of range. Available sheets: {result_sheet_names}"
+                )
                 index = ""
             else:
                 index: str = result_sheet_names[sheet_idx]
@@ -68,27 +83,31 @@ def _parse_sheet_idx(sheet_idx: Union[int, str]
         logger.error("Unrecognized sheet index")
         raise ValueError("Unrecognized sheet index")
     return book, index
-    # }}} function _parse_sheet_idx # 
+    # }}} function _parse_sheet_idx #
 
 
 SHEET = Union[pd.DataFrame, Worksheet, List[str]]
 
 
 def _load_sheet(book: BOOK, index: str) -> SHEET:
-    # function _load_sheet {{{ # 
+    # function _load_sheet {{{ #
     try:
         if isinstance(book, str):
             book: str = cast(str, book)
             csv_name: str = "{:}-{:}.csv".format(os.path.splitext(book)[0], index)
-            with open(csv_name) as f:
-                csv_lines: List[str] = list(itertools.dropwhile(lambda l: len(l) == 0
-                                                                , map(lambda l: l.strip()
-                                                                      , reversed(f.read().splitlines())
-                                                                      )
-                                                                )
-                                            )
-            return csv_lines
+            try:
+                all_lines: List[str] = _safe_read_file(csv_name)
+                csv_lines: List[str] = list(
+                    itertools.dropwhile(
+                        lambda l: len(l) == 0,
+                        map(lambda l: l.strip(), reversed(all_lines)),
+                    )
+                )
+                return csv_lines
+            except (FileNotFoundError, IOError) as e:
+                logger.error(f"Failed to read CSV file {csv_name}: {e}")
+                return None
         if isinstance(book, pd.ExcelFile):
            return pd.read_excel(book, index)
         if isinstance(book, Workbook):
@@ -99,11 +118,124 @@ def _load_sheet(book: BOOK, index: str) -> SHEET:
             raise e
     except:
         return None
-    # }}} function _load_sheet # 
+    # }}} function _load_sheet #
+
+
+def _safe_read_file(file_path: str) -> List[str]:
+    """
+    Safely read a file with multiple encoding attempts.
+
+    Args:
+        file_path: Path to the file to read
+
+    Returns:
+        List of lines from the file
+
+    Raises:
+        FileNotFoundError: If file doesn't exist
+        IOError: If file cannot be read with any encoding
+    """
+    # Common encodings to try in order of preference
+    encodings = [
+        "utf-8",  # Most common modern encoding
+        "utf-8-sig",  # UTF-8 with BOM
+        "latin-1",  # ISO-8859-1, works with any byte sequence
+        "windows-1252",  # Common Windows encoding
+        "gbk",  # Chinese encoding
+        "cp1251",  # Cyrillic encoding
+        "iso-8859-1",  # Alternative latin-1
+    ]
+
+    last_error = None
+
+    for encoding in encodings:
+        try:
+            with open(file_path, "r", encoding=encoding) as f:
+                lines = f.read().splitlines()
+                logger.debug(
+                    f"Successfully read file {file_path} with encoding {encoding}"
+                )
+                return lines
+        except UnicodeDecodeError as e:
+            last_error = e
+            logger.debug(f"Failed to read {file_path} with encoding {encoding}: {e}")
+            continue
+        except (FileNotFoundError, IOError) as e:
+            # These are non-encoding related errors, re-raise immediately
+            raise e
+
+    # If all encodings fail, try with error handling as last resort
+    try:
+        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
+            lines = f.read().splitlines()
+            logger.warning(f"Read file {file_path} with UTF-8 and error replacement")
+            return lines
+    except Exception as e:
+        logger.error(
+            f"Failed to read file {file_path} with any encoding. Last error: {last_error}"
+        )
+        raise IOError(
+            f"Cannot read file {file_path} with any supported encoding"
+        ) from last_error
+
+
+def compare_csv(result: str, expected: Union[str, List[str]], **options) -> float:
+    """
+    Compare CSV files. If expected is a list, returns 1.0 if result matches any of the expected files.
+
+    Args:
+        result: Path to result CSV file
+        expected: Path to expected CSV file or list of paths to expected CSV files
+        options: Additional options (strict, ignore_case)
+
+    Returns:
+        1.0 if result matches expected (or any file in expected list), 0.0 otherwise
+    """
+    if result is None:
+        return 0.0
+
+    try:
+        result_lines: List[str] = _safe_read_file(result)
+    except (FileNotFoundError, IOError) as e:
+        logger.error(f"Failed to read result file {result}: {e}")
+        return 0.0
+
+    # Convert expected to list if it's a single string (for backward compatibility)
+    if isinstance(expected, str):
+        expected_files = [expected]
+    else:
+        expected_files = expected
+
+    # Try to match against each expected file
+    for expected_file in expected_files:
+        try:
+            expected_lines: List[str] = _safe_read_file(expected_file)
+
+            # Process lines based on options
+            current_result_lines = result_lines
+            current_expected_lines = expected_lines
+
+            if not options.get("strict", True):
+                current_result_lines = map(str.strip, current_result_lines)
+                current_expected_lines = map(str.strip, current_expected_lines)
+            if options.get("ignore_case", False):
+                current_result_lines = map(str.lower, current_result_lines)
+                current_expected_lines = map(str.lower, current_expected_lines)
+
+            # Check if this expected file matches
+            if list(current_result_lines) == list(current_expected_lines):
+                return 1.0
+
+        except (FileNotFoundError, IOError):
+            # If this expected file doesn't exist, continue to next one
+            continue
+
+    # No match found
+    return 0.0
 
 
 def compare_table(result: str, expected: str = None, **options) -> float:
-    # function compare_table {{{ # 
+    # function compare_table {{{ #
     """
     Args:
         result (str): path to result xlsx
@@ -121,21 +253,23 @@ def compare_table(result: str, expected: str = None, **options) -> float:
     if result is None:
         logger.error("Result file path is None")
-        return 0.
+        return 0.0
 
     # Check if result file exists
     if not os.path.exists(result):
         logger.error(f"Result file not found: {result}")
-        return 0.
+        return 0.0
 
     try:
         logger.info(f"Loading result file: {result}")
         xlworkbookr: Workbook = openpyxl.load_workbook(filename=result)
         pdworkbookr = pd.ExcelFile(result)
-        logger.info(f"Successfully loaded result file with sheets: {pdworkbookr.sheet_names}")
+        logger.info(
+            f"Successfully loaded result file with sheets: {pdworkbookr.sheet_names}"
+        )
     except Exception as e:
         logger.error(f"Failed to load result file {result}: {e}")
-        return 0.
+        return 0.0
 
     worksheetr_names: List[str] = pdworkbookr.sheet_names
 
     if expected is not None:
@@ -147,32 +281,42 @@ def compare_table(result: str, expected: str = None, **options) -> float:
         pdworkbooke = None
         worksheete_names: List[str] = None
 
-    parse_idx: Callable[[Union[str, int], BOOK, BOOK], Tuple[BOOK, str]] = \
+    parse_idx: Callable[[Union[str, int], BOOK, BOOK], Tuple[BOOK, str]] = (
         functools.partial(
             _parse_sheet_idx,
             result_sheet_names=worksheetr_names,
-            expected_sheet_names=worksheete_names
+            expected_sheet_names=worksheete_names,
         )
+    )
 
     passes = True
     for r in options["rules"]:
         if r["type"] == "sheet_name":
-            # Compare Sheet Names {{{ # 
+            # Compare Sheet Names {{{ #
             metric: bool = worksheetr_names == worksheete_names
-            logger.debug("Assertion: %s.sheet_names == %s.sheet_names - %s", result, expected, metric)
-            # }}} Compare Sheet Names # 
+            logger.debug(
+                "Assertion: %s.sheet_names == %s.sheet_names - %s",
+                result,
+                expected,
+                metric,
+            )
+            # }}} Compare Sheet Names #
 
         elif r["type"] == "sheet_data":
-            # Compare Sheet Data by Internal Value {{{ # 
+            # Compare Sheet Data by Internal Value {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # sheet_idx1: as sheet_idx0
             # precision: int as number of decimal digits, default to 4
 
             error_limit: int = r.get("precision", 4)
-            sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke))
+            sheet1: pd.DataFrame = _load_sheet(
+                *parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke)
+            )
             if sheet1 is None:
-                return 0.
-            sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke))
+                return 0.0
+            sheet2: pd.DataFrame = _load_sheet(
+                *parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke)
+            )
             sheet1 = sheet1.round(error_limit)
             sheet2 = sheet2.round(error_limit)
 
@@ -183,28 +327,36 @@ def compare_table(result: str, expected: str = None, **options) -> float:
                 logger.debug("Sheet1 =v= Sheet2: \n%s", str(sheet1 == sheet2))
             except:
                 logger.debug("Sheet1 =/v= Sheet2")
-            logger.debug("Assertion: %s =v= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
-            # }}} Compare Sheet Data by Internal Value # 
+            logger.debug(
+                "Assertion: %s =v= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric
+            )
+            # }}} Compare Sheet Data by Internal Value #
 
         elif r["type"] == "sheet_print":
-            # Compare Sheet Data by Printed Value {{{ # 
+            # Compare Sheet Data by Printed Value {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # sheet_idx1: as sheet_idx0
             # ignore_case: optional, defaults to False
 
-            sheet1: List[str] = _load_sheet(*parse_idx(r["sheet_idx0"], result, expected))
+            sheet1: List[str] = _load_sheet(
+                *parse_idx(r["sheet_idx0"], result, expected)
+            )
             if sheet1 is None:
-                return 0.
-            sheet2: List[str] = _load_sheet(*parse_idx(r["sheet_idx1"], result, expected))
+                return 0.0
+            sheet2: List[str] = _load_sheet(
+                *parse_idx(r["sheet_idx1"], result, expected)
+            )
             if r.get("ignore_case", False):
                 sheet1 = [l.lower() for l in sheet1]
                 sheet2 = [l.lower() for l in sheet2]
             metric: bool = sheet1 == sheet2
-            logger.debug("Assertion: %s =p= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
-            # }}} Compare Sheet Data by Printed Value # 
+            logger.debug(
+                "Assertion: %s =p= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric
+            )
+            # }}} Compare Sheet Data by Printed Value #
 
         elif r["type"] == "sheet_fuzzy":
-            # Fuzzy Match for Ranges {{{ # 
+            # Fuzzy Match for Ranges {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # sheet_idx1: as sheet_idx0
             # rules: list of dict, each dict is like
@@ -224,7 +376,9 @@ def compare_table(result: str, expected: str = None, **options) -> float:
             for rl in r["rules"]:
                 for rng in MultiCellRange(rl["range"]):
                     for cdn in rng.cells:
-                        coordinate: str = "{:}{:d}".format(get_column_letter(cdn[1]), cdn[0])
+                        coordinate: str = "{:}{:d}".format(
+                            get_column_letter(cdn[1]), cdn[0]
+                        )
                         value1: str = str(read_cell_value(*sheet1, coordinate))
                         value2: str = str(read_cell_value(*sheet2, coordinate))
                         logger.debug("%s: %s vs %s", cdn, value1, value2)
@@ -240,8 +394,12 @@ def compare_table(result: str, expected: str = None, **options) -> float:
                             value2 = value2.rstrip(rl["trim_trailings"])
                         if "ignore_chars" in rl:
                             ignore_chars: Set[str] = set(rl["ignore_chars"])
-                            value1 = "".join(filter(lambda ch: ch not in ignore_chars, value1))
-                            value2 = "".join(filter(lambda ch: ch not in ignore_chars, value2))
+                            value1 = "".join(
+                                filter(lambda ch: ch not in ignore_chars, value1)
+                            )
+                            value2 = "".join(
+                                filter(lambda ch: ch not in ignore_chars, value2)
+                            )
                         if rl.get("ignore_case", False):
                             value1 = value1.lower()
                             value2 = value2.lower()
@@ -251,91 +409,141 @@ def compare_table(result: str, expected: str = None, **options) -> float:
                         elif rl["type"] == "included_by":
                             metric: bool = value1 in value2
                         elif rl["type"] == "fuzzy_match":
-                            metric: bool = fuzz.ratio(value1, value2) >= rl.get("threshold", 85.)
+                            metric: bool = fuzz.ratio(value1, value2) >= rl.get(
+                                "threshold", 85.0
+                            )
                         elif rl["type"] == "exact_match":
                             metric: bool = value1 == value2
                         total_metric = total_metric and metric
 
             metric: bool = total_metric
-            logger.debug("Assertion: %s =~= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
-            # }}} Fuzzy Match for Ranges # 
+            logger.debug(
+                "Assertion: %s =~= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric
+            )
+            # }}} Fuzzy Match for Ranges #
 
         elif r["type"] == "sparkline":
-            # Compare Sparklines {{{ # 
+            # Compare Sparklines {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # sheet_idx1: as sheet_idx0
 
-            sparkline1: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx0"], result, expected))
-            sparkline2: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx1"], result, expected))
+            sparkline1: Dict[str, str] = load_sparklines(
+                *parse_idx(r["sheet_idx0"], result, expected)
+            )
+            sparkline2: Dict[str, str] = load_sparklines(
+                *parse_idx(r["sheet_idx1"], result, expected)
+            )
             metric: bool = sparkline1 == sparkline2
-            logger.debug("Assertion: %s.sp == %.sp - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
-            # }}} Compare Sparklines # 
+            logger.debug(
+                "Assertion: %s.sp == %s.sp - %s",
+                r["sheet_idx0"],
+                r["sheet_idx1"],
+                metric,
+            )
+            # }}} Compare Sparklines #
 
         elif r["type"] == "chart":
-            # Compare Charts {{{ # 
+            # Compare Charts {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # sheet_idx1: as sheet_idx0
             # chart_props: list of str, see utils.load_charts
 
-            charts1: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
-            charts2: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
+            charts1: Dict[str, Any] = load_charts(
+                *parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r
+            )
+            charts2: Dict[str, Any] = load_charts(
+                *parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r
+            )
             metric: bool = charts1 == charts2
-            logger.debug("Assertion: %s[chart] == %s[chart] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
-            # }}} Compare Charts # 
+            logger.debug(
+                "Assertion: %s[chart] == %s[chart] - %s",
+                r["sheet_idx0"],
+                r["sheet_idx1"],
+                metric,
+            )
+            # }}} Compare Charts #
 
         elif r["type"] == "style":
-            # Compare Style (Also Conditional Formatiing) {{{ # 
+            # Compare Style (Also Conditional Formatting) {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # sheet_idx1: as sheet_idx0
             # props: list of str indicating concerned styles, see utils._read_cell_style
 
-            sheet_idx1: Tuple[BOOK, str] = parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
+            sheet_idx1: Tuple[BOOK, str] = parse_idx(
+                r["sheet_idx0"], xlworkbookr, xlworkbooke
+            )
             book_name1: str = parse_idx(r["sheet_idx0"], result, expected)[0]
-            styles1: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx1, book_name1, **r)
+            styles1: Dict[str, List[Any]] = load_xlsx_styles(
+                *sheet_idx1, book_name1, **r
+            )
 
-            sheet_idx2: Tuple[BOOK, str] = parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
+            sheet_idx2: Tuple[BOOK, str] = parse_idx(
+                r["sheet_idx1"], xlworkbookr, xlworkbooke
+            )
             book_name2: str = parse_idx(r["sheet_idx1"], result, expected)[0]
-            styles2: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx2, book_name2, **r)
+            styles2: Dict[str, List[Any]] = load_xlsx_styles(
+                *sheet_idx2, book_name2, **r
+            )
             # number_formats1: List[str] = [c.number_format.lower() for col in sheet1.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
             # number_formats2: List[str] = [c.number_format.lower() for col in sheet2.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
             metric: bool = styles1 == styles2
-            logger.debug("Assertion: %s.style == %s.style - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
-            # }}} Compare Style (Also Conditional Formatiing) # 
+            logger.debug(
+                "Assertion: %s.style == %s.style - %s",
+                r["sheet_idx0"],
+                r["sheet_idx1"],
+                metric,
+            )
+            # }}} Compare Style (Also Conditional Formatting) #
 
         elif r["type"] == "freeze":
-            # Compare Freezing {{{ # 
+            # Compare Freezing {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # sheet_idx1: as sheet_idx0
 
-            sheet1: Worksheet = _load_sheet(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke))
+            sheet1: Worksheet = _load_sheet(
+                *parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
+            )
             if sheet1 is None:
-                return 0.
-            sheet2: Worksheet = _load_sheet(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke))
+                return 0.0
+            sheet2: Worksheet = _load_sheet(
+                *parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
+            )
             metric: bool = sheet1.freeze_panes == sheet2.freeze_panes
-            logger.debug("Assertion: %s.freeze(%s) == %s.freeze(%s) - %s"
-                         , r["sheet_idx0"], sheet1.freeze_panes
-                         , r["sheet_idx1"], sheet2.freeze_panes
-                         , metric
-                         )
-            # }}} Compare Freezing # 
+            logger.debug(
+                "Assertion: %s.freeze(%s) == %s.freeze(%s) - %s",
+                r["sheet_idx0"],
+                sheet1.freeze_panes,
+                r["sheet_idx1"],
+                sheet2.freeze_panes,
+                metric,
+            )
+            # }}} Compare Freezing #
 
         elif r["type"] == "zoom":
-            # Check Zooming {{{ # 
+            # Check Zooming {{{ #
            # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # method: str
             # ref: value
 
-            sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
+            sheet: Worksheet = _load_sheet(
+                *parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke)
+            )
             if sheet is None:
-                return 0.
-            zoom_scale: Number = sheet.sheet_view.zoomScale or 100.
+                return 0.0
+            zoom_scale: Number = sheet.sheet_view.zoomScale or 100.0
             metric: bool = _match_value_to_rule(zoom_scale, r)
-            logger.debug("Assertion: %s.zoom(%.1f) %s %.1f - %s", r["sheet_idx"], zoom_scale, r["method"], r["ref"],
-                         metric)
-            # }}} Check Zooming # 
+            logger.debug(
+                "Assertion: %s.zoom(%.1f) %s %.1f - %s",
+                r["sheet_idx"],
+                zoom_scale,
+                r["method"],
+                r["ref"],
+                metric,
+            )
+            # }}} Check Zooming #
 
         elif r["type"] == "data_validation":
-            # Check Data Validation {{{ # 
+            # Check Data Validation {{{ #
            # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
             # dv_props: list of dict like {attribute: {"method": str, "ref": anything}}
             # available attributes:
@@ -355,103 +563,136 @@ def compare_table(result: str, expected: str = None, **options) -> float:
             #   * promptTitle
             #   * imeMode
 
-            sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
+            sheet: Worksheet = _load_sheet(
+                *parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke)
+            )
             if sheet is None:
-                return 0.
- data_validators: List[DataValidation] = sheet.data_validations.dataValidation + return 0.0 + data_validators: List[DataValidation] = ( + sheet.data_validations.dataValidation + ) total_metric = len(data_validators) >= len(r["dv_props"]) for dat_vldt in data_validators: metric = False for prpt in r["dv_props"]: - metric = metric or all(_match_value_to_rule(getattr(dat_vldt, attrbt) - , mr - ) \ - for attrbt, mr in prpt.items() - ) + metric = metric or all( + _match_value_to_rule(getattr(dat_vldt, attrbt), mr) + for attrbt, mr in prpt.items() + ) if metric: break total_metric = total_metric and metric if not total_metric: break - logger.debug("Assertion: %s.data_validation - %s", r["sheet_idx"], total_metric) + logger.debug( + "Assertion: %s.data_validation - %s", r["sheet_idx"], total_metric + ) metric: bool = total_metric - # }}} Check Data Validation # + # }}} Check Data Validation # elif r["type"] == "row_props": - # Check Row Properties {{{ # + # Check Row Properties {{{ # # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1" # sheet_idx1: as sheet_idx0 # props: list of str, see utils.load_rows_or_cols - rows1: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke) - , obj="row" - , **r - ) - rows2: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke) - , obj="row" - , **r - ) + rows1: Dict[str, Any] = load_rows_or_cols( + *parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), obj="row", **r + ) + rows2: Dict[str, Any] = load_rows_or_cols( + *parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), obj="row", **r + ) logger.debug("Rows1: %s", repr(rows1)) logger.debug("Rows2: %s", repr(rows2)) metric: bool = rows1 == rows2 - logger.debug("Assertion: %s[rows] == %s[rows] - %s", r["sheet_idx0"], r["sheet_idx1"], metric) - # }}} Check Row Properties # + logger.debug( + "Assertion: %s[rows] == %s[rows] - %s", + r["sheet_idx0"], + r["sheet_idx1"], + metric, + ) + # }}} Check Row Properties # elif r["type"] == "col_props": - # Check Row Properties {{{ # + # Check Row Properties {{{ # # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1" # sheet_idx1: as sheet_idx0 # props: list of str, see utils.load_rows_or_cols - cols1: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke) - , obj="column" - , **r - ) - cols2: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke) - , obj="column" - , **r - ) + cols1: Dict[str, Any] = load_rows_or_cols( + *parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), obj="column", **r + ) + cols2: Dict[str, Any] = load_rows_or_cols( + *parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), obj="column", **r + ) metric: bool = cols1 == cols2 - logger.debug("Assertion: %s[cols] == %s[cols] - %s", r["sheet_idx0"], r["sheet_idx1"], metric) - # }}} Check Row Properties # + logger.debug( + "Assertion: %s[cols] == %s[cols] - %s", + r["sheet_idx0"], + r["sheet_idx1"], + metric, + ) + # }}} Check Row Properties # elif r["type"] == "filter": - # Compare Filters {{{ # + # Compare Filters {{{ # # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1" # sheet_idx1: as sheet_idx0 - filters1: Dict[str, Any] = load_filters(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r) - filters2: Dict[str, Any] = load_filters(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r) + filters1: Dict[str, Any] = load_filters( + *parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r + ) + filters2: Dict[str, Any] = 
load_filters( + *parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r + ) metric: bool = filters1 == filters2 - logger.debug("Assertion: %s[filter] == %s[filter] - %s", r["sheet_idx0"], r["sheet_idx1"], metric) - # }}} Compare Filters # + logger.debug( + "Assertion: %s[filter] == %s[filter] - %s", + r["sheet_idx0"], + r["sheet_idx1"], + metric, + ) + # }}} Compare Filters # elif r["type"] == "pivot_table": - # Compare Pivot Tables {{{ # + # Compare Pivot Tables {{{ # # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1" # sheet_idx1: as sheet_idx0 # pivot_props: list of str, see utils.load_pivot_tables - pivots1: Dict[str, Any] = load_pivot_tables(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r) - pivots2: Dict[str, Any] = load_pivot_tables(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r) + pivots1: Dict[str, Any] = load_pivot_tables( + *parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r + ) + pivots2: Dict[str, Any] = load_pivot_tables( + *parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r + ) metric: bool = pivots1 == pivots2 - logger.debug("Assertion: %s[pivot]==%s[pivot] - %s", r["sheet_idx0"], r["sheet_idx1"], metric) - # }}} Compare Pivot Tables # + logger.debug( + "Assertion: %s[pivot]==%s[pivot] - %s", + r["sheet_idx0"], + r["sheet_idx1"], + metric, + ) + # }}} Compare Pivot Tables # elif r["type"] == "check_cell": - # Check Cell Properties {{{ # + # Check Cell Properties {{{ # # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1" # coordinate: str, "E3" # props: dict like {attribute: {"method": str, "ref": anything}} # supported attributes: value & those supported by utils._read_cell_style try: - sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke)) + sheet: Worksheet = _load_sheet( + *parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke) + ) if sheet is None: - logger.error(f"Failed to load sheet for sheet_idx: {r['sheet_idx']}") - return 0. + logger.error( + f"Failed to load sheet for sheet_idx: {r['sheet_idx']}" + ) + return 0.0 # data_frame: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx"], pdworkbookr, pdworkbooke)) cell: Cell = sheet[r["coordinate"]] metric: bool = True @@ -463,91 +704,44 @@ def compare_table(result: str, expected: str = None, **options) -> float: val = read_cell_value(*parsed_result, r["coordinate"]) logger.debug(f"Cell {r['coordinate']} value: {val}") except Exception as e: - logger.error(f"Failed to read cell value at {r['coordinate']}: {e}") + logger.error( + f"Failed to read cell value at {r['coordinate']}: {e}" + ) val = None else: try: val = _read_cell_style(prpt, cell) except Exception as e: - logger.error(f"Failed to read cell style {prpt} at {r['coordinate']}: {e}") + logger.error( + f"Failed to read cell style {prpt} at {r['coordinate']}: {e}" + ) val = None metric = metric and _match_value_to_rule(val, rule) except Exception as e: logger.error(f"Error in check_cell processing: {e}") - return 0. 
+                return 0.0
 
-            logger.debug("Assertion: %s[%s] :%s - %s"
-                         , r["sheet_idx"], r["coordinate"]
-                         , repr(r["props"]), metric
-                         )
-            # }}} Check Cell Properties # 
+            logger.debug(
+                "Assertion: %s[%s] :%s - %s",
+                r["sheet_idx"],
+                r["coordinate"],
+                repr(r["props"]),
+                metric,
+            )
+            # }}} Check Cell Properties #
 
         else:
-            raise NotImplementedError("Unimplemented sheet check: {:}".format(r["type"]))
+            raise NotImplementedError(
+                "Unimplemented sheet check: {:}".format(r["type"])
+            )
 
         passes = passes and metric
         if not passes:
             break
 
     return float(passes)
-    # }}} function compare_table # 
-
-
-def compare_csv(result: str, expected: Union[str, List[str]], **options) -> float:
-    """
-    Compare CSV files. If expected is a list, returns 1.0 if result matches any of the expected files.
-
-    Args:
-        result: Path to result CSV file
-        expected: Path to expected CSV file or list of paths to expected CSV files
-        options: Additional options (strict, ignore_case)
-
-    Returns:
-        1.0 if result matches expected (or any file in expected list), 0.0 otherwise
-    """
-    if result is None:
-        return 0.
-
-    try:
-        with open(result) as f:
-            result_lines: Iterable[str] = f.read().splitlines()
-    except (FileNotFoundError, IOError):
-        return 0.
-
-    # Convert expected to list if it's a single string (for backward compatibility)
-    if isinstance(expected, str):
-        expected_files = [expected]
-    else:
-        expected_files = expected
-
-    # Try to match against each expected file
-    for expected_file in expected_files:
-        try:
-            with open(expected_file) as f:
-                expected_lines: Iterable[str] = f.read().splitlines()
-
-            # Process lines based on options
-            current_result_lines = result_lines
-            current_expected_lines = expected_lines
-
-            if not options.get("strict", True):
-                current_result_lines = map(str.strip, current_result_lines)
-                current_expected_lines = map(str.strip, current_expected_lines)
-            if options.get("ignore_case", False):
-                current_result_lines = map(str.lower, current_result_lines)
-                current_expected_lines = map(str.lower, current_expected_lines)
-
-            # Check if this expected file matches
-            if list(current_result_lines) == list(current_expected_lines):
-                return 1.0
-
-        except (FileNotFoundError, IOError):
-            # If this expected file doesn't exist, continue to next one
-            continue
-
-    # No match found
-    return 0.0
+    # }}} function compare_table #
 
 
 def compare_conference_city_in_order(actual_city_list_path, expected_city):
@@ -565,21 +759,31 @@ def compare_conference_city_in_order(actual_city_list_path, expected_city):
         for i in range(len(actual_city_list)):
             if isinstance(expected_city_list[i], str):
                 if expected_city_list[i] not in actual_city_list[i]:
-                    logger.debug(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
-                    print(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
-                    return 0.
-
+                    logger.debug(
+                        f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
+                    )
+                    print(
+                        f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
+                    )
+                    return 0.0
             elif isinstance(expected_city_list[i], List):
-                if not any(possible_str in actual_city_list[i] for possible_str in expected_city_list[i]):
-                    logger.debug(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
-                    print(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
-                    return 0.
+                if not any(
+                    possible_str in actual_city_list[i]
+                    for possible_str in expected_city_list[i]
+                ):
+                    logger.debug(
+                        f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
+                    )
+                    print(
+                        f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
+                    )
+                    return 0.0
             else:
                 raise TypeError("Expected city should be a string or a list of strings")
     except:
-        return 0.
+        return 0.0
 
-    return 1.
+    return 1.0
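A minimal usage sketch of the changed evaluators follows, for reviewers who want to smoke-test the patch locally. The file paths and the single rule shown are illustrative assumptions, not values taken from any benchmark config.

```python
# Hypothetical smoke test; the paths and the rule below are made up for illustration.
from desktop_env.evaluators.metrics.table import compare_csv, compare_table

# compare_csv now accepts either a single expected file or a list of candidates,
# and reads every file through _safe_read_file, so non-UTF-8 encodings no longer
# abort scoring with a UnicodeDecodeError.
csv_score = compare_csv(
    "result.csv",
    ["expected_a.csv", "expected_b.csv"],
    strict=False,
    ignore_case=True,
)

# compare_table evaluates its rules in order and short-circuits to 0.0 on the
# first failing rule; "EI0" addresses sheet 0 of the expected workbook.
xlsx_score = compare_table(
    "result.xlsx",
    "expected.xlsx",
    rules=[
        {"type": "sheet_data", "sheet_idx0": 0, "sheet_idx1": "EI0", "precision": 4},
    ],
)
print(csv_score, xlsx_score)
```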