finished calc human evaluation updated calc configs with an extra sleep to guarantee the integrity of downloaded xlsx file
439 lines
19 KiB
Python
439 lines
19 KiB
Python
import functools
|
|
import itertools
|
|
import logging
|
|
import os.path
|
|
# import operator
|
|
from numbers import Number
|
|
from typing import Any, Union, cast, Callable, Iterable
|
|
from typing import Dict, List, Tuple
|
|
|
|
import openpyxl
|
|
import pandas as pd
|
|
from openpyxl import Workbook
|
|
from openpyxl.cell.cell import Cell
|
|
# from openpyxl.worksheet.cell_range import MultiCellRange
|
|
from openpyxl.worksheet.datavalidation import DataValidation
|
|
from openpyxl.worksheet.worksheet import Worksheet
|
|
|
|
from .utils import _match_value_to_rule, _read_cell_style, read_cell_value
|
|
from .utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles
|
|
|
|
# from openpyxl.utils import coordinate_to_tuple
|
|
|
|
logger = logging.getLogger("desktopenv.metric.table")
|
|
|
|
BOOK = Union[pd.ExcelFile, Workbook, str]
|
|
|
|
|
|
def _parse_sheet_idx(sheet_idx: Union[int, str],
                     result: BOOK, expected: BOOK,
                     result_sheet_names: List[str],
                     expected_sheet_names: List[str],
                     ) -> Tuple[BOOK, str]:
    # function _parse_sheet_idx {{{ #
    """
    Resolve a sheet specifier into a (workbook, sheet name) pair.

    Accepted specifiers:
      * int n      - n-th sheet of the result workbook (same as "RI<n>")
      * "RI<n>"    - n-th sheet of the result workbook
      * "RN<name>" - sheet called <name> in the result workbook
      * "EI<n>"    - n-th sheet of the expected workbook
      * "EN<name>" - sheet called <name> in the expected workbook

    Args:
        sheet_idx (Union[int, str]): the specifier described above
        result (BOOK): returned as the workbook for int / "RI" / "RN"
        expected (BOOK): returned as the workbook for "EI" / "EN"
        result_sheet_names (List[str]): sheet names used to resolve
          int / "RI" specifiers
        expected_sheet_names (List[str]): sheet names used to resolve
          "EI" specifiers

    Returns:
        Tuple[BOOK, str]: the selected workbook and the sheet name. Names
          given through "RN" / "EN" are returned as-is, without being
          looked up in the name lists.

    Raises:
        ValueError: if a str specifier has none of the known prefixes
    """

    # A bare integer is shorthand for an index into the result workbook.
    if isinstance(sheet_idx, int):
        return result, result_sheet_names[sheet_idx]

    # Everything after the two-letter prefix is the payload (index or name).
    if sheet_idx.startswith("RI"):
        return result, result_sheet_names[int(sheet_idx[2:])]
    if sheet_idx.startswith("RN"):
        return result, sheet_idx[2:]
    if sheet_idx.startswith("EI"):
        return expected, expected_sheet_names[int(sheet_idx[2:])]
    if sheet_idx.startswith("EN"):
        return expected, sheet_idx[2:]

    logger.error("Unrecognized sheet index")
    raise ValueError("Unrecognized sheet index")
    # }}} function _parse_sheet_idx #
|
|
|
|
|
|
SHEET = Union[pd.DataFrame, Worksheet, List[str]]
|
|
|
|
|
|
def _load_sheet(book: BOOK, index: str) -> SHEET:
    # function _load_sheet {{{ #
    """
    Load the sheet named `index` from `book`; the type of `book` selects
    the representation that is returned.

    Args:
        book (BOOK): one of
          * str: a file path. The sheet is read from the text file
            "<path without extension>-<index>.csv".
            NOTE(review): presumably a per-sheet CSV export placed next to
            the xlsx - confirm with whatever produces these files.
          * pd.ExcelFile: the sheet is parsed with pd.read_excel
          * Workbook: the sheet is fetched with book[index]
        index (str): sheet name

    Returns:
        SHEET:
          * List[str] for a str book: the CSV lines in file order, each
            stripped of surrounding whitespace, with trailing blank lines
            removed (blank lines in the middle are kept)
          * pd.DataFrame for a pd.ExcelFile
          * Worksheet for a Workbook

    Raises:
        NotImplementedError: if `book` is none of the supported types
    """

    if isinstance(book, str):
        csv_name: str = "{:}-{:}.csv".format(os.path.splitext(book)[0], index)

        with open(csv_name) as f:
            csv_lines: List[str] = [line.strip() for line in f.read().splitlines()]

        # Trim blank lines from the END only. The list is kept in file
        # order (earlier code left it reversed after trimming).
        while csv_lines and not csv_lines[-1]:
            csv_lines.pop()
        return csv_lines
    if isinstance(book, pd.ExcelFile):
        return pd.read_excel(book, index)
    if isinstance(book, Workbook):
        return book[index]
    logger.error("Not supported workbook format")
    raise NotImplementedError("Not supported workbook format")
    # }}} function _load_sheet #
|
|
|
|
|
|
def compare_table(result: str, expected: str = None, **options) -> float:
    # function compare_table {{{ #
    """
    Score a result workbook by running a list of rules against it and,
    where a rule asks for it, against a golden workbook.

    Args:
        result (str): path to result xlsx. None, or a file that cannot be
          loaded, gives a score of 0.
        expected (str): path to golden xlsx. May be None if no rule refers
          to the expected workbook.
        rules (List[Dict[str, Any]]): list of dict like
          {
            "type": str,
            <str as parameters>: anything
          }
          as sequential rules. Checking stops at the first failing rule.
          Supported types: sheet_name, sheet_data, sheet_print, sparkline,
          chart, style, freeze, zoom, data_validation, row_props,
          col_props, check_cell. The parameters of each type are described
          next to its branch in _check_table_rules.

    Returns:
        float: the score, 1. if every rule passes and 0. otherwise

    Raises:
        KeyError: if `rules` is not given
        NotImplementedError: if a rule has an unknown "type"
    """

    if result is None:
        return 0.

    try:
        xlworkbookr: Workbook = openpyxl.load_workbook(filename=result)
        pdworkbookr = pd.ExcelFile(result)
    except Exception as e:
        # An unreadable result is a failed task, not a crash of the
        # evaluator. Only Exception is caught so that KeyboardInterrupt
        # and SystemExit still propagate.
        logger.error("Failed to load result workbook %s: %s", result, e)
        return 0.

    pdworkbooke = None
    try:
        if expected is not None:
            xlworkbooke: Workbook = openpyxl.load_workbook(filename=expected)
            pdworkbooke = pd.ExcelFile(expected)
        else:
            xlworkbooke = None

        passes = _check_table_rules(options["rules"], result, expected,
                                    xlworkbookr, xlworkbooke,
                                    pdworkbookr, pdworkbooke)
        return float(passes)
    finally:
        # pd.ExcelFile keeps its underlying file open until closed.
        pdworkbookr.close()
        if pdworkbooke is not None:
            pdworkbooke.close()
    # }}} function compare_table #


def _check_table_rules(rules: Iterable[Dict[str, Any]],
                       result: str, expected: str,
                       xlworkbookr: "Workbook", xlworkbooke: "Workbook",
                       pdworkbookr: "pd.ExcelFile", pdworkbooke: "pd.ExcelFile",
                       ) -> bool:
    # function _check_table_rules {{{ #
    """
    Evaluate `rules` in order and stop at the first one that fails.

    Each rule picks the representation it needs: the file paths
    (`result` / `expected`), the openpyxl workbooks (`xlworkbook*`) or the
    pandas workbooks (`pdworkbook*`). The expected-side arguments are None
    when no golden workbook was given.

    Returns:
        bool: truthy iff every rule passed

    Raises:
        NotImplementedError: if a rule has an unknown "type"
    """

    worksheetr_names: List[str] = pdworkbookr.sheet_names
    worksheete_names: List[str] = pdworkbooke.sheet_names if pdworkbooke is not None else None

    # parse_idx(sheet_idx, result_book, expected_book) -> (book, sheet name)
    parse_idx: Callable[[Union[str, int], BOOK, BOOK], Tuple[BOOK, str]] = \
        functools.partial(
            _parse_sheet_idx,
            result_sheet_names=worksheetr_names,
            expected_sheet_names=worksheete_names
        )

    passes = True
    for r in rules:
        if r["type"] == "sheet_name":
            # Compare Sheet Names {{{ #
            metric: bool = worksheetr_names == worksheete_names
            logger.debug("Assertion: %s.sheet_names == %s.sheet_names - %s", result, expected, metric)
            # }}} Compare Sheet Names #

        elif r["type"] == "sheet_data":
            # Compare Sheet Data by Internal Value {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0

            sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke))
            sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke))
            metric: bool = sheet1.equals(sheet2)
            logger.debug("Assertion: %s =v= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
            # }}} Compare Sheet Data by Internal Value #

        elif r["type"] == "sheet_print":
            # Compare Sheet Data by Printed Value {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0
            # ignore_case: optional, defaults to False

            # File paths are passed here, so _load_sheet returns CSV lines.
            sheet1: List[str] = _load_sheet(*parse_idx(r["sheet_idx0"], result, expected))
            sheet2: List[str] = _load_sheet(*parse_idx(r["sheet_idx1"], result, expected))
            if r.get("ignore_case", False):
                sheet1 = [l.lower() for l in sheet1]
                sheet2 = [l.lower() for l in sheet2]
            metric: bool = sheet1 == sheet2
            logger.debug("Assertion: %s =p= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
            # }}} Compare Sheet Data by Printed Value #

        elif r["type"] == "sparkline":
            # Compare Sparklines {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0

            sparkline1: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx0"], result, expected))
            sparkline2: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx1"], result, expected))
            metric: bool = sparkline1 == sparkline2
            logger.debug("Assertion: %s.sp == %s.sp - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
            # }}} Compare Sparklines #

        elif r["type"] == "chart":
            # Compare Charts {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0
            # chart_props: list of str, see utils.load_charts

            charts1: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
            charts2: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
            metric: bool = charts1 == charts2
            logger.debug("Assertion: %s[chart] == %s[chart] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
            # }}} Compare Charts #

        elif r["type"] == "style":
            # Compare Style (Also Conditional Formatting) {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0
            # props: list of str indicating concerned styles, see utils._read_cell_style

            styles1: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
            styles2: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
            # number_formats1: List[str] = [c.number_format.lower() for col in sheet1.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
            # number_formats2: List[str] = [c.number_format.lower() for col in sheet2.iter_cols() for c in col if c.value is not None and c.data_type=="n"]
            metric: bool = styles1 == styles2
            logger.debug("Assertion: %s.style == %s.style - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
            # }}} Compare Style (Also Conditional Formatting) #

        elif r["type"] == "freeze":
            # Compare Freezing {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0

            sheet1: Worksheet = _load_sheet(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke))
            sheet2: Worksheet = _load_sheet(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke))
            metric: bool = sheet1.freeze_panes == sheet2.freeze_panes
            logger.debug("Assertion: %s.freeze(%s) == %s.freeze(%s) - %s"
                         , r["sheet_idx0"], sheet1.freeze_panes
                         , r["sheet_idx1"], sheet2.freeze_panes
                         , metric
                         )
            # }}} Compare Freezing #

        elif r["type"] == "zoom":
            # Check Zooming {{{ #
            # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # method: str
            # ref: value

            sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
            # A missing (falsy) zoomScale is treated as 100.
            zoom_scale: Number = sheet.sheet_view.zoomScale or 100.
            metric: bool = _match_value_to_rule(zoom_scale, r)
            logger.debug("Assertion: %s.zoom(%.1f) %s %.1f - %s", r["sheet_idx"], zoom_scale, r["method"], r["ref"],
                         metric)
            # }}} Check Zooming #

        elif r["type"] == "data_validation":
            # Check Data Validation {{{ #
            # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # dv_props: list of dict like {attribute: {"method": str, "ref": anything}}
            #   available attributes:
            #     * ranges
            #     * type
            #     * formula1
            #     * formula2
            #     * operator
            #     * allowBlank
            #     * showDropDown
            #     * showInputMessage
            #     * showErrorMessage
            #     * error
            #     * errorTitle
            #     * errorStyle
            #     * prompt
            #     * promptTitle
            #     * imeMode

            sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
            data_validators: List[DataValidation] = sheet.data_validations.dataValidation

            # Passes when the sheet has at least as many validators as
            # there are property dicts AND every validator on the sheet
            # satisfies at least one of the property dicts in full.
            total_metric = len(data_validators) >= len(r["dv_props"])
            for dat_vldt in data_validators:
                metric = False
                for prpt in r["dv_props"]:
                    metric = metric or all(_match_value_to_rule(getattr(dat_vldt, attrbt)
                                                                , mr
                                                                )
                                           for attrbt, mr in prpt.items()
                                           )
                    if metric:
                        break
                total_metric = total_metric and metric
                if not total_metric:
                    break

            logger.debug("Assertion: %s.data_validation - %s", r["sheet_idx"], total_metric)
            metric: bool = total_metric
            # }}} Check Data Validation #

        elif r["type"] == "row_props":
            # Check Row Properties {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0
            # props: list of str, see utils.load_rows_or_cols

            rows1: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
                                                      , obj="row"
                                                      , **r
                                                      )
            rows2: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
                                                      , obj="row"
                                                      , **r
                                                      )
            logger.debug("Rows1: %s", repr(rows1))
            logger.debug("Rows2: %s", repr(rows2))
            metric: bool = rows1 == rows2
            logger.debug("Assertion: %s[rows] == %s[rows] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
            # }}} Check Row Properties #

        elif r["type"] == "col_props":
            # Check Column Properties {{{ #
            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # sheet_idx1: as sheet_idx0
            # props: list of str, see utils.load_rows_or_cols

            cols1: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke)
                                                      , obj="column"
                                                      , **r
                                                      )
            cols2: Dict[str, Any] = load_rows_or_cols(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke)
                                                      , obj="column"
                                                      , **r
                                                      )
            metric: bool = cols1 == cols2
            logger.debug("Assertion: %s[cols] == %s[cols] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
            # }}} Check Column Properties #

        elif r["type"] == "check_cell":
            # Check Cell Properties {{{ #
            # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
            # coordinate: str, "E3"
            # props: dict like {attribute: {"method": str, "ref": anything}}
            #   supported attributes: value & those supported by utils._read_cell_style

            sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
            # data_frame: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx"], pdworkbookr, pdworkbooke))
            cell: Cell = sheet[r["coordinate"]]
            metric: bool = True
            for prpt, rule in r["props"].items():
                if prpt == "value":
                    # The value is read through the file path rather than
                    # from the openpyxl cell object.
                    val = read_cell_value(*parse_idx(r["sheet_idx"], result, expected), r["coordinate"])
                else:
                    val = _read_cell_style(prpt, cell)

                metric = metric and _match_value_to_rule(val, rule)

            logger.debug("Assertion: %s[%s] :%s - %s"
                         , r["sheet_idx"], r["coordinate"]
                         , repr(r["props"]), metric
                         )
            # }}} Check Cell Properties #

        else:
            raise NotImplementedError("Unimplemented sheet check: {:}".format(r["type"]))

        passes = passes and metric
        if not passes:
            break

    return passes
    # }}} function _check_table_rules #
|
|
|
|
|
|
def compare_csv(result: str, expected: str, **options) -> float:
    """
    Compare two text files line by line.

    Args:
        result (str): path to the result file; None gives a score of 0
        expected (str): path to the golden file
        strict (bool): optional, defaults to True. When false, leading and
          trailing whitespace of every line is ignored.
        ignore_case (bool): optional, defaults to False. When true, lines
          are lower-cased before the comparison.

    Returns:
        float: 1. if the (normalised) line lists are equal, 0. otherwise
    """

    if result is None:
        return 0.

    def _read_lines(path: str) -> List[str]:
        with open(path) as f:
            return f.read().splitlines()

    result_lines = _read_lines(result)
    expected_lines = _read_lines(expected)

    strip_lines = not options.get("strict", True)
    fold_case = options.get("ignore_case", False)

    def _normalise(line: str) -> str:
        # Stripping comes first, then case folding.
        if strip_lines:
            line = line.strip()
        if fold_case:
            line = line.lower()
        return line

    same = [_normalise(l) for l in result_lines] == [_normalise(l) for l in expected_lines]
    return float(same)
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: set up file/console logging, then score one
    # workbook against its golden copy and the golden copy against itself.
    import datetime
    import sys

    # The root logger gets its own name here. Rebinding the module-level
    # `logger` to it would make this module log under the name "root", and
    # the "desktopenv" filters below would then drop those records.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")

    # logging.FileHandler does not create missing directories.
    os.makedirs("logs", exist_ok=True)

    file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)))
    debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)))
    stdout_handler = logging.StreamHandler(sys.stdout)
    sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)))

    file_handler.setLevel(logging.INFO)
    debug_handler.setLevel(logging.DEBUG)
    stdout_handler.setLevel(logging.INFO)
    sdebug_handler.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
    file_handler.setFormatter(formatter)
    debug_handler.setFormatter(formatter)
    stdout_handler.setFormatter(formatter)
    sdebug_handler.setFormatter(formatter)

    # These two handlers only accept records from the "desktopenv" logger
    # hierarchy.
    stdout_handler.addFilter(logging.Filter("desktopenv"))
    sdebug_handler.addFilter(logging.Filter("desktopenv"))

    root_logger.addHandler(file_handler)
    root_logger.addHandler(debug_handler)
    root_logger.addHandler(stdout_handler)
    root_logger.addHandler(sdebug_handler)

    path1 = "snapshots/test/cache/4e6fcf72-daf3-439f-a232-c434ce416af6/Employee_Age_By_Birthday.xlsx"
    path2 = "snapshots/test/cache/4e6fcf72-daf3-439f-a232-c434ce416af6/Employee_Age_By_Birthday_gold.xlsx"
    rules = [
        {
            "type": "sheet_data",
            "sheet_idx0": 0,
            "sheet_idx1": "EI0",
        },
    ]
    print(compare_table(path1, path2, rules=rules))
    # Comparing the golden file with itself.
    print(compare_table(path2, path2, rules=rules))
|
|
|
|
# Row Properties
|
|
# path1 = "../../任务数据/LibreOffice Calc/Date_Budget_Variance_HideNA.xlsx"
|
|
# path2 = "../../任务数据/LibreOffice Calc/Date_Budget_Variance_HideNA_gold.xlsx"
|
|
# workbook: Workbook = openpyxl.load_workbook(filename=path1)
|
|
# worksheet: Worksheet = workbook.active
|
|
# for r_no, dms in worksheet.column_dimensions.items():
|
|
# print(r_no, type(r_no), type(dms), dms.hidden)
|
|
|
|
# Conditional Formats
|
|
# import formulas
|
|
# path1 = "../../任务数据/LibreOffice Calc/Calendar_Highlight_Weekend_Days.xlsx"
|
|
# path2 = "../../任务数据/LibreOffice Calc/Calendar_Highlight_Weekend_Days_gold.xlsx"
|
|
# path3 = "../../任务数据/LibreOffice Calc/Calendar_Highlight_Weekend_Days_gold_test.xlsx"
|
|
# workbook: Workbook = openpyxl.load_workbook(filename=path2)
|
|
# worksheet: Worksheet = workbook.active
|
|
# print(worksheet.conditional_formatting)
|
|
# for itm in worksheet.conditional_formatting:
|
|
# print(itm.cells)
|
|
# for r in itm.rules:
|
|
# print( r.type, r.formula, r.dxf.font.color.rgb
|
|
# , r.dxf.fill.fgColor.rgb, r.dxf.fill.bgColor.rgb
|
|
# )
|
|
# condition = formulas.Parser().ast("=" + r.formula[0])[1].compile()
|
|
##print(r.type, r.operator, r.dxfId, r.dxf)
|
|
# for r in itm.cells:
|
|
# for c in r.cells:
|
|
# value = worksheet.cell(row=c[0], column=c[1]).value
|
|
# print(value, condition(str(value)))
|