Merge branch 'zdy'
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import logging
|
||||
#import operator
|
||||
from numbers import Number
|
||||
from typing import Any, Union, cast, Callable
|
||||
from typing import Any, Union, cast, Callable, Iterable
|
||||
from typing import Dict, List, Tuple
|
||||
import os.path
|
||||
import itertools
|
||||
@@ -13,9 +13,11 @@ from openpyxl import Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
#from openpyxl.worksheet.cell_range import MultiCellRange
|
||||
from openpyxl.worksheet.datavalidation import DataValidation
|
||||
from openpyxl.cell.cell import Cell
|
||||
#from openpyxl.utils import coordinate_to_tuple
|
||||
|
||||
from .utils import load_charts, load_sparklines, load_rows_or_cols, load_xlsx_styles
|
||||
from .utils import _match_value_to_rule
|
||||
from .utils import _match_value_to_rule, _read_cell_style, read_cell_value
|
||||
|
||||
logger = logging.getLogger("desktopenv.metric.table")
|
||||
|
||||
@@ -91,11 +93,11 @@ def compare_table(result: str, expected: str, **options) -> float:
|
||||
return 0.
|
||||
|
||||
xlworkbookr: Workbook = openpyxl.load_workbook(filename=result)
|
||||
pdworkbookr = pd.ExcelFile(xlworkbookr, engine="openpyxl")
|
||||
pdworkbookr = pd.ExcelFile(result)
|
||||
worksheetr_names: List[str] = pdworkbookr.sheet_names
|
||||
|
||||
xlworkbooke: Workbook = openpyxl.load_workbook(filename=expected)
|
||||
pdworkbooke = pd.ExcelFile(xlworkbooke, engine="openpyxl")
|
||||
pdworkbooke = pd.ExcelFile(expected)
|
||||
worksheete_names: List[str] = pdworkbooke.sheet_names
|
||||
|
||||
parse_idx: Callable[[Union[str, int], BOOK, BOOK], BOOK] =\
|
||||
@@ -165,7 +167,7 @@ def compare_table(result: str, expected: str, **options) -> float:
|
||||
# Compare Style (Also Conditional Formatiing) {{{ #
|
||||
# sheet_idx0: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
|
||||
# sheet_idx1: as sheet_idx0
|
||||
# props: list of str indicating concerned styles
|
||||
# props: list of str indicating concerned styles, see utils._read_cell_style
|
||||
|
||||
styles1: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx0"], xlworkbookr, xlworkbooke), **r)
|
||||
styles2: Dict[str, List[Any]] = load_xlsx_styles(*parse_idx(r["sheet_idx1"], xlworkbookr, xlworkbooke), **r)
|
||||
@@ -283,6 +285,31 @@ def compare_table(result: str, expected: str, **options) -> float:
|
||||
logger.debug("Assertion: %s[cols] == %s[cols] - %s", r["sheet_idx0"], r["sheet_idx1"], metric)
|
||||
# }}} Check Row Properties #
|
||||
|
||||
elif r["type"] == "check_cell":
|
||||
# Check Cell Properties {{{ #
|
||||
# sheet_idx: 0 == "RI0" == "RNSheet1" | "EI0" == "ENSheet1"
|
||||
# coordinate: str, "E3"
|
||||
# props: dict like {attribute: {"method": str, "ref": anything}}
|
||||
# supported attributes: value & those supported by utils._read_cell_style
|
||||
|
||||
sheet: Worksheet = _load_sheet(*parse_idx(r["sheet_idx"], xlworkbookr, xlworkbooke))
|
||||
#data_frame: pd.DataFrame = _load_sheet(*parse_idx(r["sheet_idx"], pdworkbookr, pdworkbooke))
|
||||
cell: Cell = sheet[r["coordinate"]]
|
||||
metric: bool = True
|
||||
for prpt, rule in r["props"].items():
|
||||
if prpt=="value":
|
||||
val = read_cell_value(*parse_idx(r["sheet_idx"], result, expected), r["coordinate"])
|
||||
else:
|
||||
val = _read_cell_style(prpt, cell)
|
||||
|
||||
metric = metric and _match_value_to_rule(val, rule)
|
||||
|
||||
logger.debug( "Assertion: %s[%s] :%s - %s"
|
||||
, r["sheet_idx"], r["coordinate"]
|
||||
, repr(r["props"]), metric
|
||||
)
|
||||
# }}} Check Cell Properties #
|
||||
|
||||
else:
|
||||
raise NotImplementedError("Unimplemented sheet check: {:}".format(r["type"]))
|
||||
|
||||
@@ -293,6 +320,24 @@ def compare_table(result: str, expected: str, **options) -> float:
|
||||
return float(passes)
|
||||
# }}} function compare_table #
|
||||
|
||||
def compare_csv(result: str, expected: str, **options) -> float:
|
||||
if result is None:
|
||||
return 0.
|
||||
|
||||
with open(result) as f:
|
||||
result_lines: Iterable[str] = f.read().splitlines()
|
||||
with open(expected) as f:
|
||||
expected_lines: Iterable[str] = f.read().splitlines()
|
||||
if not options.get("strict", True):
|
||||
result_lines = map(str.strip, result_lines)
|
||||
expected_lines = map(str.strip, expected_lines)
|
||||
if options.get("ignore_case", False):
|
||||
result_lines = map(str.lower, result_lines)
|
||||
expected_lines = map(str.lower, expected_lines)
|
||||
|
||||
metric: bool = list(result_lines)==list(expected_lines)
|
||||
return float(metric)
|
||||
|
||||
if __name__ == '__main__':
|
||||
import datetime
|
||||
import sys
|
||||
@@ -326,16 +371,15 @@ if __name__ == '__main__':
|
||||
logger.addHandler(stdout_handler)
|
||||
logger.addHandler(sdebug_handler)
|
||||
|
||||
path1 = "../../任务数据/LibreOffice Calc/Calendar_Highlight_Weekend_Days.xlsx"
|
||||
path2 = "../../任务数据/LibreOffice Calc/Calendar_Highlight_Weekend_Days_gold.xlsx"
|
||||
rules = [ { "type": "sheet_data"
|
||||
, "sheet_idx0": 0
|
||||
, "sheet_idx1": "EI0"
|
||||
}
|
||||
, { "type": "style"
|
||||
, "sheet_idx0": 0
|
||||
, "sheet_idx1": "EI0"
|
||||
, "props": ["bgcolor"]
|
||||
path1 = "../../任务数据/LibreOffice Calc/Multiply_Time_Number.xlsx"
|
||||
path2 = "../../任务数据/LibreOffice Calc/Multiply_Time_Number_gold.xlsx"
|
||||
rules = [ { "type": "check_cell"
|
||||
, "sheet_idx": 0
|
||||
, "coordinate": "E3"
|
||||
, "props": { "value": { "method": "approx:0.001"
|
||||
, "ref": 191.6667
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
print( compare_table( path1, path2
|
||||
|
||||
@@ -34,6 +34,7 @@ _xlsx_namespaces = [ ("oo", "http://schemas.openxmlformats.org/spreadsheetml/200
|
||||
]
|
||||
_xlsx_ns_mapping = dict(_xlsx_namespaces)
|
||||
_xlsx_ns_imapping = dict(map(lambda itm: (itm[1], itm[0]), _xlsx_namespaces))
|
||||
_xlsx_ns_imapping["http://schemas.openxmlformats.org/spreadsheetml/2006/main"] = None
|
||||
_sheet_name_selector = lxml.cssselect.CSSSelector("oo|sheets>oo|sheet", namespaces=_xlsx_ns_mapping)
|
||||
_sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
|
||||
def load_sparklines(xlsx_file: str, sheet_name: str) -> Dict[str, str]:
|
||||
@@ -154,6 +155,48 @@ def load_charts(xlsx_file: Workbook, sheet_name: str, **options) -> Dict[str, An
|
||||
return chart_set
|
||||
# }}} function load_charts #
|
||||
|
||||
_shared_str_selector = lxml.cssselect.CSSSelector("oo|sst>oo|si>oo|t", namespaces=_xlsx_ns_mapping)
|
||||
def read_cell_value(xlsx_file: str, sheet_name: str, coordinate: str) -> Any:
|
||||
# read_cell_value {{{ #
|
||||
with zipfile.ZipFile(xlsx_file, "r") as z_f:
|
||||
try:
|
||||
with z_f.open("xl/sharedStrings.xml") as f:
|
||||
shared_str_xml: _Element = lxml.etree.fromstring(f.read())
|
||||
str_elements: List[_Element] = _shared_str_selector(shared_str_xml)
|
||||
shared_strs: List[str] = [elm.text for elm in str_elements]
|
||||
except:
|
||||
logger.debug("Read shared strings error: %s", xlsx_file)
|
||||
|
||||
with z_f.open("xl/workbook.xml") as f:
|
||||
workbook_database: _Element = lxml.etree.fromstring(f.read())
|
||||
sheets: List[_Element] = _sheet_name_selector(workbook_database)
|
||||
sheet_names: Dict[str, str] = {sh.get("name"): sh.get("sheetId") for sh in sheets}
|
||||
|
||||
with z_f.open("xl/worksheets/sheet{:}.xml".format(sheet_names[sheet_name])) as f:
|
||||
sheet: _Element = lxml.etree.fromstring(f.read())
|
||||
cells: List[_Element] =\
|
||||
lxml.cssselect.CSSSelector( 'oo|row>oo|c[r="{:}"]'.format(coordinate)
|
||||
, namespaces=_xlsx_ns_mapping
|
||||
)(sheet)
|
||||
if len(cells)==0:
|
||||
return None
|
||||
cell: _Element = cells[0]
|
||||
|
||||
cell: Dict[str, str] = xmltodict.parse( lxml.etree.tostring(cell, encoding="unicode")
|
||||
, process_namespaces=True
|
||||
, namespaces=_xlsx_ns_imapping
|
||||
)
|
||||
logger.debug("%s.%s[%s]: %s", xlsx_file, sheet_name, coordinate, repr(cell))
|
||||
if "@t" not in cell["c"]:
|
||||
return None
|
||||
if cell["c"]["@t"] == "s":
|
||||
return shared_strs[int(cell["c"]["v"])]
|
||||
if cell["c"]["@t"] == "n":
|
||||
return float(cell["c"]["v"])
|
||||
if cell["c"]["@t"] == "str":
|
||||
return cell["c"]["v"]
|
||||
# }}} read_cell_value #
|
||||
|
||||
# Supported Styles:
|
||||
# number_format
|
||||
# font_name - str
|
||||
@@ -311,6 +354,15 @@ def _match_value_to_rule(value: V, rule: Dict[str, Union[str, V]]) -> bool:
|
||||
, "ge", "gt"
|
||||
}:
|
||||
return getattr(operator, rule["method"])(value, rule["ref"])
|
||||
if rule["method"].startswith("approx"):
|
||||
threshold: float = float(rule["method"].split(":")[1])
|
||||
logger.debug("Approx: TH%f, REF%f, VAL%s", threshold, rule["ref"], repr(value))
|
||||
try:
|
||||
value = float(value)
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
else:
|
||||
return abs(value-rule["ref"])<=threshold
|
||||
if rule["method"] == "spreadsheet_range":
|
||||
subset_limit = MultiCellRange(rule["ref"][0])
|
||||
superset_limit = MultiCellRange(rule["ref"][1])
|
||||
|
||||
Reference in New Issue
Block a user