From 4514c32269aa106758aacf4dbe2515a1f0051550 Mon Sep 17 00:00:00 2001
From: David Chang
Date: Sun, 21 Jan 2024 22:55:52 +0800
Subject: [PATCH] ver Jan21st reconstructed calc metrics not updated the configs yet

---
 desktop_env/evaluators/metrics/table.py | 400 +++++++++++++-----------
 desktop_env/evaluators/metrics/utils.py |  29 +-
 2 files changed, 235 insertions(+), 194 deletions(-)

diff --git a/desktop_env/evaluators/metrics/table.py b/desktop_env/evaluators/metrics/table.py
index 47c6fe1..02c6de3 100644
--- a/desktop_env/evaluators/metrics/table.py
+++ b/desktop_env/evaluators/metrics/table.py
@@ -1,10 +1,11 @@
 import logging
 import operator
 from numbers import Number
-from typing import Any, Union
-from typing import Dict, List
+from typing import Any, Union, cast, Callable
+from typing import Dict, List, Tuple
 import os.path
 import itertools
+import functools
 
 import openpyxl
 import pandas as pd
@@ -17,206 +18,241 @@ from .utils import load_charts, load_sparklines, _match_value_to_rule
 
 logger = logging.getLogger("desktopenv.metric.table")
 
+BOOK = Union[pd.ExcelFile, Workbook, str]
+def _parse_sheet_idx( sheet_idx: Union[int, str]
+                    , result: BOOK, expected: BOOK
+                    , result_sheet_names: List[str]
+                    , expected_sheet_names: List[str]
+                    ) -> Tuple[BOOK, str]:
+    # function _parse_sheet_idx {{{ #
+    if isinstance(sheet_idx, int):
+        index: str = result_sheet_names[sheet_idx]
+        book: BOOK = result
+    elif sheet_idx.startswith("RI"):
+        index: str = result_sheet_names[int(sheet_idx[2:])]
+        book: BOOK = result
+    elif sheet_idx.startswith("RN"):
+        index: str = sheet_idx[2:]
+        book: BOOK = result
+    elif sheet_idx.startswith("EI"):
+        index: str = expected_sheet_names[int(sheet_idx[2:])]
+        book: BOOK = expected
+    elif sheet_idx.startswith("EN"):
+        index: str = sheet_idx[2:]
+        book: BOOK = expected
+    else:
+        logger.error("Unrecognized sheet index")
+        raise ValueError("Unrecognized sheet index")
+    return book, index
+    # }}} function _parse_sheet_idx #
 
-def compare_table(actual: str, expected: str, **options) -> float:
+SHEET = Union[pd.DataFrame, Worksheet, List[str]]
+def _load_sheet(book: BOOK, index: str) -> SHEET:
+    # function _load_sheet {{{ #
+    if isinstance(book, str):
+        book: str = cast(str, book)
+        csv_name: str = "{:}-{:}.csv".format(os.path.splitext(book)[0], index)
+
+        with open(csv_name) as f:
+            csv_lines: List[str] = list( itertools.dropwhile( lambda l: len(l)==0
+                                                            , map( lambda l: l.strip()
+                                                                 , reversed(f.read().splitlines())
+                                                                 )
+                                                            )
+                                       )
+        return csv_lines
+    if isinstance(book, pd.ExcelFile):
+        return pd.read_excel(book, index)
+    if isinstance(book, Workbook):
+        return book[index]
+    logger.error("Not supported workbook format")
+    raise NotImplementedError("Not supported workbook format")
+    # }}} function _load_sheet #
+
+def compare_table(result: str, expected: str, **options) -> float:
+    # function compare_table {{{ #
     """
     Args:
-        actual (str): path to result xlsx
-        expected (str): path to gold xlsx
-        options (Dict[str, List[str]]): dict like
+        result (str): path to result xlsx
+        expected (str): path to golden xlsx
+        rules (List[Dict[str, Any]]): list of dict like
           {
-            "features": list of str for other features, supports:
-              * sparkline
-              * chart
-              * number_format
-            "chart_props": list of str, giving the concerned chart properties
-            "as_shown": bool, TODO
+            "type": str,
+            <str as parameter>: anything
          }
+          as sequential rules
 
-    Return:
+    Returns:
         float: the score
     """
 
-    if actual is None:
-        return 0.
-
-    if options.get("as_shown", False):
-        expected_csv: str = os.path.splitext(expected)[0] + ".csv"
-        actual_csv: str = os.path.splitext(actual)[0] + ".csv"
-
-        with open(expected_csv) as f:
-            expected_lines: List[str] = list( itertools.dropwhile( lambda l: len(l)==0
-                                                                 , map( lambda l: l.strip()
-                                                                      , reversed(f.read().splitlines())
-                                                                      )
-                                                                 )
-                                            )
-        if options.get("ignore_case", False):
-            expected_lines = [l.lower() for l in expected_lines]
-        with open(actual_csv) as f:
-            actual_lines: List[str] = list( itertools.dropwhile( lambda l: len(l)==0
-                                                               , map( lambda l: l.strip()
-                                                                    , reversed(f.read().splitlines())
-                                                                    )
-                                                               )
-                                          )
-        if options.get("ignore_case", False):
-            actual_lines = [l.lower() for l in actual_lines]
-        metric: bool = expected_lines==actual_lines
-        logger.debug("Content Metric just as shown: %s", metric)
-    else:
-        df1 = pd.read_excel(expected)
-        df2 = pd.read_excel(actual)
-        metric: bool = df1.equals(df2)
-        logger.debug("Normal Content Metric: {:}".format(metric))
-
-    features: List[str] = options.get("features", [])
-    for ftr in features:
-        workbook1: Workbook = openpyxl.load_workbook(actual)
-        workbook2: Workbook = openpyxl.load_workbook(expected)
-
-        if ftr == "sparkline":
-            sp1 = load_sparklines(actual)
-            sp2 = load_sparklines(expected)
-            new_metric: bool = sp1 == sp2
-            logger.debug("Sparkline Metric: {:}".format(new_metric))
-        elif ftr == "chart":
-            charts1 = load_charts(workbook1, **options)
-            charts2 = load_charts(workbook2, **options)
-            new_metric: bool = charts1 == charts2
-            logger.debug("Chart Metric: {:}".format(new_metric))
-        elif ftr == "number_format":
-            number_formats1: List[str] = [c.number_format.lower() \
-                                          for col in workbook1.active.iter_cols() \
-                                          for c in col \
-                                          if c.data_type == "n"
-                                         ]
-            number_formats2: List[str] = [c.number_format.lower() \
-                                          for col in workbook2.active.iter_cols() \
-                                          for c in col \
-                                          if c.data_type == "n"
-                                         ]
-            new_metric: bool = number_formats1 == number_formats2
-            logger.debug("Number Format Metric: {:}".format(new_metric))
-        else:
-            raise NotImplementedError("Unsupported xlsx feature: {:}".format(ftr))
-        metric = metric and new_metric
-
-    return float(metric)
-
-
-def check_sheet_list(result: str, rules: List[Dict[str, Any]]) -> float:
     if result is None:
         return 0.
-    # workbook: Workbook = openpyxl.load_workbook(filename=result)
-    workbook = pd.ExcelFile(result)
-    worksheet_names: List[str] = workbook.sheet_names
+    xlworkbookr: Workbook = openpyxl.load_workbook(filename=result)
+    pdworkbookr = pd.ExcelFile(xlworkbookr, engine="openpyxl")
+    worksheetr_names: List[str] = pdworkbookr.sheet_names
+
+    xlworkbooke: Workbook = openpyxl.load_workbook(filename=expected)
+    pdworkbooke = pd.ExcelFile(xlworkbooke, engine="openpyxl")
+    worksheete_names: List[str] = pdworkbooke.sheet_names
+
+    parse_idx: Callable[[Union[str, int], BOOK, BOOK], Tuple[BOOK, str]] =\
+            functools.partial( _parse_sheet_idx
+                             , result_sheet_names=worksheetr_names
+                             , expected_sheet_names=worksheete_names
+                             )
 
     passes = True
-    for r in rules:
+    for r in options["rules"]:
         if r["type"] == "sheet_name":
-            expected_name: str = worksheet_names[r["sheet_idx"]]
-            actual_name: str = r["sheet_name"]
-            metric: bool = expected_name == actual_name
-            logger.debug("Assertion: {:d}.{:} is {:} - {:}".format(r["sheet_idx"], actual_name, expected_name, metric))
-            passes = passes and metric
+            # Compare Sheet Names {{{ #
+            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # sheet_idx1: as sheet_idx0
+
+            metric: bool = worksheetr_names==worksheete_names
+            logger.debug("Assertion: %s.sheet_names == %s.sheet_names - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
+            # }}} Compare Sheet Names #
+
         elif r["type"] == "sheet_data":
-            if isinstance(r["sheet_idx0"], int):
-                df1: pd.DataFrame = pd.read_excel(workbook, r["sheet_idx0"])
-            else:
-                file_name: str
-                sheet_idx: str
-                file_name, sheet_idx = r["sheet_idx0"].rsplit("@", maxsplit=1)
-                sheet_idx: int = int(sheet_idx)
-                df1: pd.DataFrame = pd.read_excel(file_name, sheet_idx)
-            if isinstance(r["sheet_idx1"], int):
-                df2: pd.DataFrame = pd.read_excel(workbook, r["sheet_idx1"])
-            else:
-                file_name: str
-                sheet_idx: str
-                file_name, sheet_idx = r["sheet_idx1"].rsplit("@", maxsplit=1)
-                sheet_idx: int = int(sheet_idx)
-                df2: pd.DataFrame = pd.read_excel(file_name, sheet_idx)
-            metric: bool = df1.equals(df2)
-            logger.debug("Assertion: {:} == {:} - {:}".format(r["sheet_idx0"], r["sheet_idx1"], metric))
-            passes = passes and metric
+            # Compare Sheet Data by Internal Value {{{ #
+            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # sheet_idx1: as sheet_idx0
+
+            sheet1: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx0"], pdworkbookr, pdworkbooke))
+            sheet2: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx1"], pdworkbookr, pdworkbooke))
+            metric: bool = sheet1.equals(sheet2)
+            logger.debug("Assertion: %s =v= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
+            # }}} Compare Sheet Data by Internal Value #
+
+        elif r["type"] == "sheet_print":
+            # Compare Sheet Data by Printed Value {{{ #
+            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # sheet_idx1: as sheet_idx0
+            # ignore_case: optional, defaults to False
+
+            sheet1: List[str] = _load_sheet(*parse_idx(r["sheet_idx0"], result, expected))
+            sheet2: List[str] = _load_sheet(*parse_idx(r["sheet_idx1"], result, expected))
+            if r.get("ignore_case", False):
+                sheet1 = [l.lower() for l in sheet1]
+                sheet2 = [l.lower() for l in sheet2]
+            metric: bool = sheet1 == sheet2
+            logger.debug("Assertion: %s =p= %s - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
+            # }}} Compare Sheet Data by Printed Value #
+
+        elif r["type"] == "sparkline":
+            # Compare Sparklines {{{ #
+            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # sheet_idx1: as sheet_idx0
+
+            sparkline1: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx0"], result, expected))
+            sparkline2: Dict[str, str] = load_sparklines(*parse_idx(r["sheet_idx1"], result, expected))
+            metric: bool = sparkline1 == sparkline2
+            logger.debug("Assertion: %s.sp == %s.sp - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
+            # }}} Compare Sparklines #
+
+        elif r["type"] == "chart":
+            # Compare Charts {{{ #
+            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # sheet_idx1: as sheet_idx0
+            # chart_props: list of str, see utils.load_charts
+
+            charts1: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
+            charts2: Dict[str, Any] = load_charts(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
+            metric: bool = charts1 == charts2
+            logger.debug("Assertion: %s[chart] == %s[chart] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
+            # }}} Compare Charts #
+
+        elif r["type"] == "number_format":
+            # Compare Number Formats {{{ #
+            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # sheet_idx1: as sheet_idx0
+
+            sheet1: Worksheet = _load_sheet(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke))
+            sheet2: Worksheet = _load_sheet(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke))
+            number_formats1: List[str] = [c.number_format.lower() for col in sheet1.iter_cols() for c in col if c.data_type=="n"]
+            number_formats2: List[str] = [c.number_format.lower() for col in sheet2.iter_cols() for c in col if c.data_type=="n"]
+            metric: bool = number_formats1 == number_formats2
+            logger.debug("Assertion: %s.nf == %s.nf - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
+            # }}} Compare Number Formats #
+
+        elif r["type"] == "freeze":
+            # Compare Freezing {{{ #
+            # sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # sheet_idx1: as sheet_idx0
+
+            sheet1: Worksheet = _load_sheet(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke))
+            sheet2: Worksheet = _load_sheet(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke))
+            metric: bool = sheet1.freeze_panes == sheet2.freeze_panes
+            logger.debug( "Assertion: %s.freeze(%s) == %s.freeze(%s) - %s"
+                        , r["sheet_idx0"], sheet1.freeze_panes
+                        , r["sheet_idx1"], sheet2.freeze_panes
+                        , metric
+                        )
+            # }}} Compare Freezing #
+
+        elif r["type"] == "zoom":
+            # Check Zooming {{{ #
+            # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # method: str
+            # ref: value
+
+            sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
+            zoom_scale: Number = sheet.sheet_view.zoomScale or 100.
+            metric: bool = _match_value_to_rule(zoom_scale, r)
+            logger.debug("Assertion: %s.zoom(%.1f) %s %.1f - %s", r["sheet_idx"], zoom_scale, r["method"], r["ref"], metric)
+            # }}} Check Zooming #
+
+        elif r["type"] == "data_validation":
+            # Check Data Validation {{{ #
+            # sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
+            # dv_props: list of dict like {<attribute>: {"method": str, "ref": anything}}
+            #   available attributes:
+            #   * ranges
+            #   * type
+            #   * formula1
+            #   * formula2
+            #   * operator
+            #   * allowBlank
+            #   * showDropDown
+            #   * showInputMessage
+            #   * showErrorMessage
+            #   * error
+            #   * errorTitle
+            #   * errorStyle
+            #   * prompt
+            #   * promptTitle
+            #   * imeMode
+
+            sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
+            data_validators: List[DataValidation] = sheet.data_validations.dataValidation
+
+            total_metric = True
+            for dat_vldt in data_validators:
+                metric = False
+                # iterate with dv_rule so the enclosing rule dict r is not shadowed
+                for dv_rule in r["dv_props"]:
+                    metric = metric or all( _match_value_to_rule( getattr(dat_vldt, attrbt)
+                                                                , mr
+                                                                )\
+                                                for attrbt, mr in dv_rule.items()
+                                          )
+                    if metric:
+                        break
+                total_metric = total_metric and metric
+                if not total_metric:
+                    break
+
+            logger.debug("Assertion: %s.data_validation - %s", r["sheet_idx"], total_metric)
+            metric: bool = total_metric
+            # }}} Check Data Validation #
+
         else:
             raise NotImplementedError("Unimplemented sheet check: {:}".format(r["type"]))
+
+        passes = passes and metric
+
     return float(passes)
-
-
-def check_xlsx_freeze(result: str, rules: Dict[str, str]) -> float:
-    if result is None:
-        return 0.
-
-    worksheet: Worksheet = openpyxl.load_workbook(filename=result).active
-    return float(worksheet.freeze_panes == rules["position"])
-
-
-def check_xlsx_zoom(result: str, rules: Dict[str, Union[str, Number]]) -> float:
-    if result is None:
-        return 0.
-
-    worksheet = openpyxl.load_workbook(filename=result).active
-    zoom_scale: Number = worksheet.sheet_view.zoomScale or 100.
-    return float(getattr(operator, rules["relation"])(zoom_scale
-                                                      , rules["ref_value"]
-                                                      )
-                 )
-
-def check_data_validations(result: str, rules: List[Dict[str, Dict[str, Any]]]) -> float:
-    """
-    Args:
-        result (str): path to the concerned xlsx file
-        rules (List[Dict[str, Dict[str, Any]]]): list of dict like
-          {
-            <attribute>: {
-                "method": str
-                "ref": something
-            }
-          }
-
-          Available attributes:
-          * ranges
-          * type
-          * formula1
-          * formula2
-          * operator
-          * allowBlank
-          * showDropDown
-          * showInputMessage
-          * showErrorMessage
-          * error
-          * errorTitle
-          * errorStyle
-          * prompt
-          * promptTitle
-          * imeMode
-
-    Returns:
-        float
-    """
-
-    workbook: Workbook = openpyxl.load_workbook(result)
-    worksheet: Worksheet = workbook.active
-    data_validators: List[DataValidation] = worksheet.data_validations.dataValidation
-
-    total_metric = True
-    for dat_vldt in data_validators:
-        metric = False
-        for r in rules:
-            metric = metric or all( _match_value_to_rule( getattr(dat_vldt, attrbt)
-                                                        , mr
-                                                        )\
-                                        for attrbt, mr in r.items()
-                                  )
-            if metric:
-                break
-        total_metric = total_metric and metric
-        if not total_metric:
-            break
-    return float(total_metric)
+    # }}} function compare_table #
 
 
 if __name__ == '__main__':
     # path1 = ""
diff --git a/desktop_env/evaluators/metrics/utils.py b/desktop_env/evaluators/metrics/utils.py
index 2362a0d..841222c 100644
--- a/desktop_env/evaluators/metrics/utils.py
+++ b/desktop_env/evaluators/metrics/utils.py
@@ -22,21 +22,21 @@ V = TypeVar("Value")
 
 logger = logging.getLogger("desktopenv.metrics.utils")
 
-_xlsx_namespaces = [("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
-                   , ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
-                   ]
+_xlsx_namespaces = [ ("oo", "http://schemas.openxmlformats.org/spreadsheetml/2006/main")
+                   , ("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
+                   , ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
+                   ]
 _xlsx_ns_mapping = dict(_xlsx_namespaces)
 _xlsx_ns_imapping = dict(map(lambda itm: (itm[1], itm[0]), _xlsx_namespaces))
+_sheet_name_selector = lxml.cssselect.CSSSelector("oo|sheets>oo|sheet", namespaces=_xlsx_ns_mapping)
 _sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
-
-
-# print(_sparklines_selector.css)
-def load_sparklines(xlsx_file: str) -> Dict[str, str]:
+def load_sparklines(xlsx_file: str, sheet_name: str) -> Dict[str, str]:
     """
     This function modifies data_frame in-place
 
     Args:
         xlsx_file (str): path to xlsx
+        sheet_name (str): sheet name
 
     Returns:
         List[Dict[str, str]]: sparkline definitions in form of
@@ -47,9 +47,13 @@
 
     # read xlsx
     with zipfile.ZipFile(xlsx_file, "r") as z_f:
-        with z_f.open("xl/worksheets/sheet1.xml") as f:
-            sheet1: _Element = lxml.etree.fromstring(f.read())
-    sparklines: List[_Element] = _sparklines_selector(sheet1)
+        with z_f.open("xl/workbook.xml") as f:
+            workbook_database: _Element = lxml.etree.fromstring(f.read())
+        sheets: List[_Element] = _sheet_name_selector(workbook_database)
+        sheet_names: Dict[str, str] = {sh.get("name"): sh.get("sheetId") for sh in sheets}
+        with z_f.open("xl/worksheets/sheet{:}.xml".format(sheet_names[sheet_name])) as f:
+            sheet: _Element = lxml.etree.fromstring(f.read())
+    sparklines: List[_Element] = _sparklines_selector(sheet)
 
     sparklines_dict: Dict[str, str] = {}
     for sp_l in sparklines:
@@ -70,10 +74,11 @@
 # type: "scatterChart" | "lineChart" | "barChart"
"col" (vert) # xtitle, ytitle, ztitle: str -def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]: +def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, Any]: """ Args: xlsx_file (Workbook): concerned excel book + sheet_name (str): sheet name options (Dict[str, List[str]]): dict like {"chart_props": list of str} giving the concerned chart properties @@ -82,7 +87,7 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]: """ # workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file) - worksheet: Worksheet = xlsx_file.active + worksheet: Worksheet = xlsx_file[sheet_name] charts: List[ChartBase] = worksheet._charts chart_set: Dict[str, Any] = {}