refactor: improve code readability and error handling in table.py

- Reformatted import statements for better organization.
- Enhanced error handling in file reading functions to provide clearer logging.
- Introduced a new `_safe_read_file` function to handle multiple encoding attempts when reading files.
- Updated the `compare_csv` function to utilize the new file reading method, improving robustness.
- Ensured consistent return values across functions, replacing `0.` with `0.0` for clarity.

These changes maintain existing logic while improving code maintainability and readability.
Author: yuanmengqi
Date: 2025-07-16 18:11:05 +00:00
Parent: a9c1c6135a
Commit: 2a48500691

View File

@@ -2,6 +2,7 @@ import functools
import itertools
import logging
import os.path
# import operator
from numbers import Number
from typing import Any, Union, cast, Callable, Iterable
@@ -17,9 +18,19 @@ from openpyxl.worksheet.datavalidation import DataValidation
from openpyxl.worksheet.worksheet import Worksheet
from rapidfuzz import fuzz
from desktop_env.evaluators.metrics.utils import _match_value_to_rule, _read_cell_style, read_cell_value
from desktop_env.evaluators.metrics.utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles \
, load_filters, load_pivot_tables
from desktop_env.evaluators.metrics.utils import (
_match_value_to_rule,
_read_cell_style,
read_cell_value,
)
from desktop_env.evaluators.metrics.utils import (
load_charts,
load_sparklines,
load_rows_or_cols,
load_xlsx_styles,
load_filters,
load_pivot_tables,
)
# from openpyxl.utils import coordinate_to_tuple
@@ -28,16 +39,20 @@ logger = logging.getLogger("desktopenv.metric.table")
BOOK = Union[pd.ExcelFile, Workbook, str]
def _parse_sheet_idx(sheet_idx: Union[int, str]
, result: BOOK, expected: BOOK
, result_sheet_names: List[str]
, expected_sheet_names: List[str]
) -> Tuple[BOOK, str]:
# function _parse_sheet_idx {{{ #
def _parse_sheet_idx(
sheet_idx: Union[int, str],
result: BOOK,
expected: BOOK,
result_sheet_names: List[str],
expected_sheet_names: List[str],
) -> Tuple[BOOK, str]:
# function _parse_sheet_idx {{{ #
if isinstance(sheet_idx, int):
try:
if not result_sheet_names or sheet_idx >= len(result_sheet_names):
logger.error(f"Sheet index {sheet_idx} out of range. Available sheets: {result_sheet_names}")
logger.error(
f"Sheet index {sheet_idx} out of range. Available sheets: {result_sheet_names}"
)
index = ""
else:
index: str = result_sheet_names[sheet_idx]
@@ -68,27 +83,31 @@ def _parse_sheet_idx(sheet_idx: Union[int, str]
logger.error("Unrecognized sheet index")
raise ValueError("Unrecognized sheet index")
return book, index
# }}} function _parse_sheet_idx #
# }}} function _parse_sheet_idx #
SHEET = Union[pd.DataFrame, Worksheet, List[str]]
def _load_sheet(book: BOOK, index: str) -> SHEET:
# function _load_sheet {{{ #
# function _load_sheet {{{ #
try:
if isinstance(book, str):
book: str = cast(str, book)
csv_name: str = "{:}-{:}.csv".format(os.path.splitext(book)[0], index)
with open(csv_name) as f:
csv_lines: List[str] = list(itertools.dropwhile(lambda l: len(l) == 0
, map(lambda l: l.strip()
, reversed(f.read().splitlines())
)
)
)
return csv_lines
try:
all_lines: List[str] = _safe_read_file(csv_name)
csv_lines: List[str] = list(
itertools.dropwhile(
lambda l: len(l) == 0,
map(lambda l: l.strip(), reversed(all_lines)),
)
)
return csv_lines
except (FileNotFoundError, IOError) as e:
logger.error(f"Failed to read CSV file {csv_name}: {e}")
return None
if isinstance(book, pd.ExcelFile):
return pd.read_excel(book, index)
if isinstance(book, Workbook):
@@ -99,11 +118,124 @@ def _load_sheet(book: BOOK, index: str) -> SHEET:
raise e
except:
return None
# }}} function _load_sheet #
# }}} function _load_sheet #
def _safe_read_file(file_path: str) -> List[str]:
"""
Safely read a file with multiple encoding attempts.
Args:
file_path: Path to the file to read
Returns:
List of lines from the file
Raises:
FileNotFoundError: If file doesn't exist
IOError: If file cannot be read with any encoding
"""
# Common encodings to try in order of preference
encodings = [
"utf-8", # Most common modern encoding
"utf-8-sig", # UTF-8 with BOM
"latin-1", # ISO-8859-1, works with any byte sequence
"windows-1252", # Common Windows encoding
"gbk", # Chinese encoding
"cp1251", # Cyrillic encoding
"iso-8859-1", # Alternative latin-1
]
last_error = None
for encoding in encodings:
try:
with open(file_path, "r", encoding=encoding) as f:
lines = f.read().splitlines()
logger.debug(
f"Successfully read file {file_path} with encoding {encoding}"
)
return lines
except UnicodeDecodeError as e:
last_error = e
logger.debug(f"Failed to read {file_path} with encoding {encoding}: {e}")
continue
except (FileNotFoundError, IOError) as e:
# These are non-encoding related errors, re-raise immediately
raise e
# If all encodings fail, try with error handling as last resort
try:
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
lines = f.read().splitlines()
logger.warning(f"Read file {file_path} with UTF-8 and error replacement")
return lines
except Exception as e:
logger.error(
f"Failed to read file {file_path} with any encoding. Last error: {last_error}"
)
raise IOError(
f"Cannot read file {file_path} with any supported encoding"
) from last_error
def compare_csv(result: str, expected: Union[str, List[str]], **options) -> float:
    """
    Compare a result CSV file against one or more expected CSV files.

    Args:
        result: path to the result CSV file
        expected: path to the expected CSV file, or a list of such paths
        options: "strict" (bool, default True) compares lines verbatim;
            when False, surrounding whitespace is stripped first.
            "ignore_case" (bool, default False) lowercases lines before
            comparing.

    Returns:
        1.0 when the result matches any expected file, otherwise 0.0.
    """
    if result is None:
        return 0.0

    try:
        result_lines: List[str] = _safe_read_file(result)
    except (FileNotFoundError, IOError) as e:
        logger.error(f"Failed to read result file {result}: {e}")
        return 0.0

    # Accept a bare path for backward compatibility.
    expected_files = [expected] if isinstance(expected, str) else expected

    def _normalize(lines: Iterable[str]) -> List[str]:
        # Apply the option-driven transforms in order: strip, then lowercase.
        out: Iterable[str] = lines
        if not options.get("strict", True):
            out = (line.strip() for line in out)
        if options.get("ignore_case", False):
            out = (line.lower() for line in out)
        return list(out)

    for candidate in expected_files:
        try:
            expected_lines: List[str] = _safe_read_file(candidate)
        except (FileNotFoundError, IOError):
            # A missing candidate is simply skipped, not an error.
            continue
        if _normalize(result_lines) == _normalize(expected_lines):
            return 1.0

    # No candidate matched.
    return 0.0
def compare_table(result: str, expected: str = None, **options) -> float:
# function compare_table {{{ #
# function compare_table {{{ #
"""
Args:
result (str): path to result xlsx
@@ -121,21 +253,23 @@ def compare_table(result: str, expected: str = None, **options) -> float:
if result is None:
logger.error("Result file path is None")
return 0.
return 0.0
# Check if result file exists
if not os.path.exists(result):
logger.error(f"Result file not found: {result}")
return 0.
return 0.0
try:
logger.info(f"Loading result file: {result}")
xlworkbookr: Workbook = openpyxl.load_workbook(filename=result)
pdworkbookr = pd.ExcelFile(result)
logger.info(f"Successfully loaded result file with sheets: {pdworkbookr.sheet_names}")
logger.info(
f"Successfully loaded result file with sheets: {pdworkbookr.sheet_names}"
)
except Exception as e:
logger.error(f"Failed to load result file {result}: {e}")
return 0.
return 0.0
worksheetr_names: List[str] = pdworkbookr.sheet_names
if expected is not None:
@@ -147,32 +281,42 @@ def compare_table(result: str, expected: str = None, **options) -> float:
pdworkbooke = None
worksheete_names: List[str] = None
parse_idx: Callable[[Union[str, int], BOOK, BOOK], Tuple[BOOK, str]] = \
parse_idx: Callable[[Union[str, int], BOOK, BOOK], Tuple[BOOK, str]] = (
functools.partial(
_parse_sheet_idx,
result_sheet_names=worksheetr_names,
expected_sheet_names=worksheete_names
expected_sheet_names=worksheete_names,
)
)
passes = True
for r in options["rules"]:
if r["type"] == "sheet_name":
# Compare Sheet Names {{{ #
# Compare Sheet Names {{{ #
metric: bool = worksheetr_names == worksheete_names
logger.debug("Assertion: %s.sheet_names == %s.sheet_names - %s", result, expected, metric)
# }}} Compare Sheet Names #
logger.debug(
"Assertion: %s.sheet_names == %s.sheet_names - %s",
result,
expected,
metric,
)
# }}} Compare Sheet Names #
elif r["type"] == "sheet_data":
# Compare Sheet Data by Internal Value {{{ #
# Compare Sheet Data by Internal Value {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# precision: int as number of decimal digits, default to 4
error_limit: int = r.get("precision", 4)
sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke))
sheet1: pd.DataFrame = _load_sheet(
*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke)
)
if sheet1 is None:
return 0.
sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke))
return 0.0
sheet2: pd.DataFrame = _load_sheet(
*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke)
)
sheet1 = sheet1.round(error_limit)
sheet2 = sheet2.round(error_limit)
@@ -183,28 +327,36 @@ def compare_table(result: str, expected: str = None, **options) -> float:
logger.debug("Sheet1 =v= Sheet2: \n%s", str(sheet1 == sheet2))
except:
logger.debug("Sheet1 =/v= Sheet2")
logger.debug("Assertion: %s =v= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Sheet Data by Internal Value #
logger.debug(
"Assertion: %s =v= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric
)
# }}} Compare Sheet Data by Internal Value #
elif r["type"] == "sheet_print":
# Compare Sheet Data by Printed Value {{{ #
# Compare Sheet Data by Printed Value {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# ignore_case: optional, defaults to False
sheet1: List[str] = _load_sheet(*parse_idx(r["sheet_idx0"], result, expected))
sheet1: List[str] = _load_sheet(
*parse_idx(r["sheet_idx0"], result, expected)
)
if sheet1 is None:
return 0.
sheet2: List[str] = _load_sheet(*parse_idx(r["sheet_idx1"], result, expected))
return 0.0
sheet2: List[str] = _load_sheet(
*parse_idx(r["sheet_idx1"], result, expected)
)
if r.get("ignore_case", False):
sheet1 = [l.lower() for l in sheet1]
sheet2 = [l.lower() for l in sheet2]
metric: bool = sheet1 == sheet2
logger.debug("Assertion: %s =p= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Sheet Data by Printed Value #
logger.debug(
"Assertion: %s =p= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric
)
# }}} Compare Sheet Data by Printed Value #
elif r["type"] == "sheet_fuzzy":
# Fuzzy Match for Ranges {{{ #
# Fuzzy Match for Ranges {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# rules: list of dict, each dict is like
@@ -224,7 +376,9 @@ def compare_table(result: str, expected: str = None, **options) -> float:
for rl in r["rules"]:
for rng in MultiCellRange(rl["range"]):
for cdn in rng.cells:
coordinate: str = "{:}{:d}".format(get_column_letter(cdn[1]), cdn[0])
coordinate: str = "{:}{:d}".format(
get_column_letter(cdn[1]), cdn[0]
)
value1: str = str(read_cell_value(*sheet1, coordinate))
value2: str = str(read_cell_value(*sheet2, coordinate))
logger.debug("%s: %s vs %s", cdn, value1, value2)
@@ -240,8 +394,12 @@ def compare_table(result: str, expected: str = None, **options) -> float:
value2 = value2.rstrip(rl["trim_trailings"])
if "ignore_chars" in rl:
ignore_chars: Set[str] = set(rl["ignore_chars"])
value1 = "".join(filter(lambda ch: ch not in ignore_chars, value1))
value2 = "".join(filter(lambda ch: ch not in ignore_chars, value2))
value1 = "".join(
filter(lambda ch: ch not in ignore_chars, value1)
)
value2 = "".join(
filter(lambda ch: ch not in ignore_chars, value2)
)
if rl.get("ignore_case", False):
value1 = value1.lower()
value2 = value2.lower()
@@ -251,91 +409,141 @@ def compare_table(result: str, expected: str = None, **options) -> float:
elif rl["type"] == "included_by":
metric: bool = value1 in value2
elif rl["type"] == "fuzzy_match":
metric: bool = fuzz.ratio(value1, value2) >= rl.get("threshold", 85.)
metric: bool = fuzz.ratio(value1, value2) >= rl.get(
"threshold", 85.0
)
elif rl["type"] == "exact_match":
metric: bool = value1 == value2
total_metric = total_metric and metric
metric: bool = total_metric
logger.debug("Assertion: %s =~= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Fuzzy Match for Ranges #
logger.debug(
"Assertion: %s =~= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric
)
# }}} Fuzzy Match for Ranges #
elif r["type"] == "sparkline":
# Compare Sparklines {{{ #
# Compare Sparklines {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
sparkline1: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx0"], result, expected))
sparkline2: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx1"], result, expected))
sparkline1: Dict[str, str] = load_sparklines(
*parse_idx(r["sheet_idx0"], result, expected)
)
sparkline2: Dict[str, str] = load_sparklines(
*parse_idx(r["sheet_idx1"], result, expected)
)
metric: bool = sparkline1 == sparkline2
logger.debug("Assertion: %s.sp == %.sp - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Sparklines #
logger.debug(
"Assertion: %s.sp == %.sp - %s",
r["sheet_idx0"],
r["sheet_idx1"],
metric,
)
# }}} Compare Sparklines #
elif r["type"] == "chart":
# Compare Charts {{{ #
# Compare Charts {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# chart_props: list of str, see utils.load_charts
charts1: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
charts2: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
charts1: Dict[str, Any] = load_charts(
*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r
)
charts2: Dict[str, Any] = load_charts(
*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r
)
metric: bool = charts1 == charts2
logger.debug("Assertion: %s[chart] == %s[chart] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Charts #
logger.debug(
"Assertion: %s[chart] == %s[chart] - %s",
r["sheet_idx0"],
r["sheet_idx1"],
metric,
)
# }}} Compare Charts #
elif r["type"] == "style":
# Compare Style (Also Conditional Formatiing) {{{ #
# Compare Style (Also Conditional Formatiing) {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# props: list of str indicating concerned styles, see utils._read_cell_style
sheet_idx1: Tuple[BOOK, str] = parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
sheet_idx1: Tuple[BOOK, str] = parse_idx(
r["sheet_idx0"], xlworkbookr, xlworkbooke
)
book_name1: str = parse_idx(r["sheet_idx0"], result, expected)[0]
styles1: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx1, book_name1, **r)
styles1: Dict[str, List[Any]] = load_xlsx_styles(
*sheet_idx1, book_name1, **r
)
sheet_idx2: Tuple[BOOK, str] = parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
sheet_idx2: Tuple[BOOK, str] = parse_idx(
r["sheet_idx1"], xlworkbookr, xlworkbooke
)
book_name2: str = parse_idx(r["sheet_idx1"], result, expected)[0]
styles2: Dict[str, List[Any]] = load_xlsx_styles(*sheet_idx2, book_name2, **r)
styles2: Dict[str, List[Any]] = load_xlsx_styles(
*sheet_idx2, book_name2, **r
)
# number_formats1: List[str] = [c.number_format.lower() for col in sheet1.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
# number_formats2: List[str] = [c.number_format.lower() for col in sheet2.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
metric: bool = styles1 == styles2
logger.debug("Assertion: %s.style == %s.style - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Style (Also Conditional Formatiing) #
logger.debug(
"Assertion: %s.style == %s.style - %s",
r["sheet_idx0"],
r["sheet_idx1"],
metric,
)
# }}} Compare Style (Also Conditional Formatiing) #
elif r["type"] == "freeze":
# Compare Freezing {{{ #
# Compare Freezing {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
sheet1: Worksheet = _load_sheet(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke))
sheet1: Worksheet = _load_sheet(
*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
)
if sheet1 is None:
return 0.
sheet2: Worksheet = _load_sheet(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke))
return 0.0
sheet2: Worksheet = _load_sheet(
*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
)
metric: bool = sheet1.freeze_panes == sheet2.freeze_panes
logger.debug("Assertion: %s.freeze(%s) == %s.freeze(%s) - %s"
, r["sheet_idx0"], sheet1.freeze_panes
, r["sheet_idx1"], sheet2.freeze_panes
, metric
)
# }}} Compare Freezing #
logger.debug(
"Assertion: %s.freeze(%s) == %s.freeze(%s) - %s",
r["sheet_idx0"],
sheet1.freeze_panes,
r["sheet_idx1"],
sheet2.freeze_panes,
metric,
)
# }}} Compare Freezing #
elif r["type"] == "zoom":
# Check Zooming {{{ #
# Check Zooming {{{ #
# sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# method: str
# ref: value
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
sheet: Worksheet = _load_sheet(
*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke)
)
if sheet is None:
return 0.
zoom_scale: Number = sheet.sheet_view.zoomScale or 100.
return 0.0
zoom_scale: Number = sheet.sheet_view.zoomScale or 100.0
metric: bool = _match_value_to_rule(zoom_scale, r)
logger.debug("Assertion: %s.zoom(%.1f) %s %.1f - %s", r["sheet_idx"], zoom_scale, r["method"], r["ref"],
metric)
# }}} Check Zooming #
logger.debug(
"Assertion: %s.zoom(%.1f) %s %.1f - %s",
r["sheet_idx"],
zoom_scale,
r["method"],
r["ref"],
metric,
)
# }}} Check Zooming #
elif r["type"] == "data_validation":
# Check Data Validation {{{ #
# Check Data Validation {{{ #
# sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# dv_props: list of dict like {attribute: {"method": str, "ref": anything}}
# available attributes:
@@ -355,103 +563,136 @@ def compare_table(result: str, expected: str = None, **options) -> float:
# * promptTitle
# * imeMode
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
sheet: Worksheet = _load_sheet(
*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke)
)
if sheet is None:
return 0.
data_validators: List[DataValidation] = sheet.data_validations.dataValidation
return 0.0
data_validators: List[DataValidation] = (
sheet.data_validations.dataValidation
)
total_metric = len(data_validators) >= len(r["dv_props"])
for dat_vldt in data_validators:
metric = False
for prpt in r["dv_props"]:
metric = metric or all(_match_value_to_rule(getattr(dat_vldt, attrbt)
, mr
) \
for attrbt, mr in prpt.items()
)
metric = metric or all(
_match_value_to_rule(getattr(dat_vldt, attrbt), mr)
for attrbt, mr in prpt.items()
)
if metric:
break
total_metric = total_metric and metric
if not total_metric:
break
logger.debug("Assertion: %s.data_validation - %s", r["sheet_idx"], total_metric)
logger.debug(
"Assertion: %s.data_validation - %s", r["sheet_idx"], total_metric
)
metric: bool = total_metric
# }}} Check Data Validation #
# }}} Check Data Validation #
elif r["type"] == "row_props":
# Check Row Properties {{{ #
# Check Row Properties {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# props: list of str, see utils.load_rows_or_cols
rows1: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
, obj="row"
, **r
)
rows2: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
, obj="row"
, **r
)
rows1: Dict[str, Any] = load_rows_or_cols(
*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), obj="row", **r
)
rows2: Dict[str, Any] = load_rows_or_cols(
*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), obj="row", **r
)
logger.debug("Rows1: %s", repr(rows1))
logger.debug("Rows2: %s", repr(rows2))
metric: bool = rows1 == rows2
logger.debug("Assertion: %s[rows] == %s[rows] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Check Row Properties #
logger.debug(
"Assertion: %s[rows] == %s[rows] - %s",
r["sheet_idx0"],
r["sheet_idx1"],
metric,
)
# }}} Check Row Properties #
elif r["type"] == "col_props":
# Check Row Properties {{{ #
# Check Row Properties {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# props: list of str, see utils.load_rows_or_cols
cols1: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
, obj="column"
, **r
)
cols2: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
, obj="column"
, **r
)
cols1: Dict[str, Any] = load_rows_or_cols(
*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), obj="column", **r
)
cols2: Dict[str, Any] = load_rows_or_cols(
*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), obj="column", **r
)
metric: bool = cols1 == cols2
logger.debug("Assertion: %s[cols] == %s[cols] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Check Row Properties #
logger.debug(
"Assertion: %s[cols] == %s[cols] - %s",
r["sheet_idx0"],
r["sheet_idx1"],
metric,
)
# }}} Check Row Properties #
elif r["type"] == "filter":
# Compare Filters {{{ #
# Compare Filters {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
filters1: Dict[str, Any] = load_filters(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
filters2: Dict[str, Any] = load_filters(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
filters1: Dict[str, Any] = load_filters(
*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r
)
filters2: Dict[str, Any] = load_filters(
*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r
)
metric: bool = filters1 == filters2
logger.debug("Assertion: %s[filter] == %s[filter] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Filters #
logger.debug(
"Assertion: %s[filter] == %s[filter] - %s",
r["sheet_idx0"],
r["sheet_idx1"],
metric,
)
# }}} Compare Filters #
elif r["type"] == "pivot_table":
# Compare Pivot Tables {{{ #
# Compare Pivot Tables {{{ #
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# sheet_idx1: as sheet_idx0
# pivot_props: list of str, see utils.load_pivot_tables
pivots1: Dict[str, Any] = load_pivot_tables(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
pivots2: Dict[str, Any] = load_pivot_tables(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
pivots1: Dict[str, Any] = load_pivot_tables(
*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r
)
pivots2: Dict[str, Any] = load_pivot_tables(
*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r
)
metric: bool = pivots1 == pivots2
logger.debug("Assertion: %s[pivot]==%s[pivot] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
# }}} Compare Pivot Tables #
logger.debug(
"Assertion: %s[pivot]==%s[pivot] - %s",
r["sheet_idx0"],
r["sheet_idx1"],
metric,
)
# }}} Compare Pivot Tables #
elif r["type"] == "check_cell":
# Check Cell Properties {{{ #
# Check Cell Properties {{{ #
# sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
# coordinate: str, "E3"
# props: dict like {attribute: {"method": str, "ref": anything}}
# supported attributes: value & those supported by utils._read_cell_style
try:
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
sheet: Worksheet = _load_sheet(
*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke)
)
if sheet is None:
logger.error(f"Failed to load sheet for sheet_idx: {r['sheet_idx']}")
return 0.
logger.error(
f"Failed to load sheet for sheet_idx: {r['sheet_idx']}"
)
return 0.0
# data_frame: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx"], pdworkbookr, pdworkbooke))
cell: Cell = sheet[r["coordinate"]]
metric: bool = True
@@ -463,91 +704,44 @@ def compare_table(result: str, expected: str = None, **options) -> float:
val = read_cell_value(*parsed_result, r["coordinate"])
logger.debug(f"Cell {r['coordinate']} value: {val}")
except Exception as e:
logger.error(f"Failed to read cell value at {r['coordinate']}: {e}")
logger.error(
f"Failed to read cell value at {r['coordinate']}: {e}"
)
val = None
else:
try:
val = _read_cell_style(prpt, cell)
except Exception as e:
logger.error(f"Failed to read cell style {prpt} at {r['coordinate']}: {e}")
logger.error(
f"Failed to read cell style {prpt} at {r['coordinate']}: {e}"
)
val = None
metric = metric and _match_value_to_rule(val, rule)
except Exception as e:
logger.error(f"Error in check_cell processing: {e}")
return 0.
return 0.0
logger.debug("Assertion: %s[%s] :%s - %s"
, r["sheet_idx"], r["coordinate"]
, repr(r["props"]), metric
)
# }}} Check Cell Properties #
logger.debug(
"Assertion: %s[%s] :%s - %s",
r["sheet_idx"],
r["coordinate"],
repr(r["props"]),
metric,
)
# }}} Check Cell Properties #
else:
raise NotImplementedError("Unimplemented sheet check: {:}".format(r["type"]))
raise NotImplementedError(
"Unimplemented sheet check: {:}".format(r["type"])
)
passes = passes and metric
if not passes:
break
return float(passes)
# }}} function compare_table #
def compare_csv(result: str, expected: Union[str, List[str]], **options) -> float:
"""
Compare CSV files. If expected is a list, returns 1.0 if result matches any of the expected files.
Args:
result: Path to result CSV file
expected: Path to expected CSV file or list of paths to expected CSV files
options: Additional options (strict, ignore_case)
Returns:
1.0 if result matches expected (or any file in expected list), 0.0 otherwise
"""
if result is None:
return 0.
try:
with open(result) as f:
result_lines: Iterable[str] = f.read().splitlines()
except (FileNotFoundError, IOError):
return 0.
# Convert expected to list if it's a single string (for backward compatibility)
if isinstance(expected, str):
expected_files = [expected]
else:
expected_files = expected
# Try to match against each expected file
for expected_file in expected_files:
try:
with open(expected_file) as f:
expected_lines: Iterable[str] = f.read().splitlines()
# Process lines based on options
current_result_lines = result_lines
current_expected_lines = expected_lines
if not options.get("strict", True):
current_result_lines = map(str.strip, current_result_lines)
current_expected_lines = map(str.strip, current_expected_lines)
if options.get("ignore_case", False):
current_result_lines = map(str.lower, current_result_lines)
current_expected_lines = map(str.lower, current_expected_lines)
# Check if this expected file matches
if list(current_result_lines) == list(current_expected_lines):
return 1.0
except (FileNotFoundError, IOError):
# If this expected file doesn't exist, continue to next one
continue
# No match found
return 0.0
# }}} function compare_table #
def compare_conference_city_in_order(actual_city_list_path, expected_city):
@@ -565,21 +759,31 @@ def compare_conference_city_in_order(actual_city_list_path, expected_city):
for i in range(len(actual_city_list)):
if isinstance(expected_city_list[i], str):
if expected_city_list[i] not in actual_city_list[i]:
logger.debug(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
print(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
return 0.
logger.debug(
f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
)
print(
f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
)
return 0.0
elif isinstance(expected_city_list[i], List):
if not any(possible_str in actual_city_list[i] for possible_str in expected_city_list[i]):
logger.debug(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
print(f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}")
return 0.
if not any(
possible_str in actual_city_list[i]
for possible_str in expected_city_list[i]
):
logger.debug(
f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
)
print(
f"Expected city {expected_city_list[i]}; Actual city {actual_city_list[i]}"
)
return 0.0
else:
raise TypeError("Expected city should be a string or a list of strings")
except:
return 0.
return 0.0
return 1.
return 1.0