Files
sci-gui-agent-benchmark/desktop_env/evaluators/metrics/thunderbird.py
2024-02-01 00:53:31 +08:00

233 lines
10 KiB
Python

from typing import List, Pattern, Dict, Match
from typing import Union, Any, TypeVar, Callable
import re
import json
from .utils import _match_record
from .utils import _match_value_to_rule as _match_pref
import logging
logger = logging.getLogger("desktopenv.metric.thunderbird")
V = TypeVar("Value")
_pref_pattern: Pattern[str] = re.compile(r'^user_pref\("(?P<key>(?:[^"]|\\")+)\", (?P<val>.+)\);$');
def check_thunderbird_prefs(result: str, rule: Dict[str, Dict[str, Dict[str, Any]]]):
"""
Args:
result (str): path to result file
rule (Dict[str, Dict[str, Dict[str, Any]]]): dict like
{
"expect": {
str: {
"method": str
"ref": something
}
}
"unexpect": {
str: {
"method": str
"ref": something
}
}
}
Returns:
float
"""
if result is None:
return 0.
expect_rules = rule.get("expect", {})
unexpect_rules = rule.get("unexpect", {})
expect_metrics = {k: False for k in expect_rules}
unexpect_metric = True
with open(result) as f:
for l in f:
match_: Match[str] = _pref_pattern.match(l.strip())
if match_ is None:
continue
key: str = match_.group("key")
#value: str = match_.group("val")
#if value in {"true", "false"}:
#value = value.title()
#value: V = eval(value)
value = json.loads(match_.group("val"))
if key in expect_rules:
logger.debug("K: %s, V: %s", key, repr(value))
expect_metrics[key] = _match_pref(value, expect_rules[key])
elif key in unexpect_rules:
unexpect_metric = unexpect_metric and not _match_pref(value, unexpect_rules[key])
return float(all(expect_metrics.values()) and unexpect_metric)
_value_processor: Callable[[str], str] = lambda val: val.replace("\\\"", "\"").replace("\\\\", "\\")
#_condition_pattern: Pattern[str] = re.compile(r'(?P<type>AND|OR) \((?P<key>[\w ]+),(?P<rel>[\w ' + '\'' + r']+),(?:"(?P<val2>(?:[^"]|\")+)"|(?P<val1>[^)]+))\)')
_condition_pattern: Pattern[str] = re.compile(r'\b(?:AND|OR) \((?:[\w ]+),(?:[\w ' + '\'' + r']+),(?:"(?:(?:[^"]|\")+)"|(?:[^)]+))\)|\bALL\b')
def check_thunderbird_filter(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
"""
Args:
result (str): path to filter def file
rules (Dict[str, List[Dict[str, str]]]): dict like
{
"expect": [{key: value}]
"unexpect": [{key: value}]
}
Returns:
float
"""
if result is None:
return 0.
# read filter def file
# a filter:
# {
# "name": "Name",
# "enabled": "yes" | "no",
# "type": "17",
# "action": "Move to folder" | ...,
# "actionValue": ...,
# "condition": [...]
# }
filters: List[Dict[str, Union[str, List[str]]]] = []
with open(result) as f:
for l in f:
if l.startswith("name="):
filter_: Dict[str, Union[str, List[str]]] = {}
filter_["name"] = _value_processor(l[6:-2])
elif l.startswith("enabled="):
filter_["enabled"] = _value_processor(l[9:-2])
elif l.startswith("type="):
filter_["type"] = _value_processor(l[6:-2])
elif l.startswith("action="):
filter_["action"] = _value_processor(l[8:-2])
elif l.startswith("actionValue="):
filter_["actionValue"] = _value_processor(l[13:-2])
elif l.startswith("condition="):
condition_str: str = _value_processor(l[11:-2])
logger.debug("FILTER CONDITION: %s", condition_str)
conditions: List[str] =\
_condition_pattern.findall(condition_str)
logger.debug("FILTER CONDITIONS: %s", repr(conditions))
filter_["condition"] = conditions
logger.debug("FILTER %s", repr(filter_))
filters.append(filter_)
expect_metrics = [False] * len(rules.get("expect", []))
unexpect_metric = True
for flt in filters:
for i, r in enumerate(rules.get("expect", [])):
expect_metrics[i] = expect_metrics[i] or _match_record(r, flt)
unexpect_metric = unexpect_metric and not any(_match_record(r, flt) for r in rules.get("unexpect", []))
return float(all(expect_metrics) and unexpect_metric)
def check_thunderbird_folder(result: Union[str, List[str]], reference: Union[str, List[str]], **kwargs) -> float:
"""
Check the file or file_list that each text file contains all messages in a folder in Thunderbird. Each message is started with `FROM - `.
**kwargs:
ignore_status (bool): for comparison, ignore the status (X-Mozilla-Status: 0000) of each message. default: False
ignore_keys (bool): for comparison, ignore the keys (X-Mozilla-Keys: label) of each message. default: False
remove_deleted (bool): ignore deleted messages which has status code 0008 or 0009. default: True
remove_duplicate (bool): remove duplicate messages. default: True
"""
def normalize_msg(msg, options):
ignore_status = options.get('ignore_status', False)
ignore_keys = options.get('ignore_keys', False)
if ignore_status:
msg = re.sub(r'X-Mozilla-Status\d?:[\s\d]+', '', msg)
if ignore_keys:
msg = re.sub(r'(X-Mozilla-Keys:[^\n]*?)\n(MIME-Version)', r'\2', msg)
return msg.strip()
def read_thunderbird_folder_file(path: str) -> str:
with open(path, 'r') as inf:
data = inf.read().strip()
messages = []
for mail in data.split('FROM - '):
if mail.strip(): continue
if kwargs.get('remove_deleted', True) and re.search(r'X-Mozilla-Status: 000[89]', mail): continue
messages.append('FROM - ' + normalize_msg(mail, kwargs))
if kwargs.get('remove_duplicate', True):
messages = set(messages)
return '\n'.join(sorted(messages))
if type(reference) != list:
result, reference = [result], [reference]
for pred, gold in zip(result, reference):
if pred is None: return .0
mail1 = read_thunderbird_folder_file(pred)
mail2 = read_thunderbird_folder_file(gold)
if mail1 != mail2: return .0
return 1.0
if __name__ == "__main__":
#import lxml.etree
#from lxml.cssselect import CSSSelector
#from lxml.etree import _Element
#xml = "../../任务数据/Thunderbird/vertical-card-view.xml"
#xml = "../../任务数据/Thunderbird/vertical-table-view.xml"
#at: _Element = lxml.etree.parse(xml)
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] page-tab-list')(at) # page tab tags
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]')(at) # email tag page
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]>section:nth-child(3)')(at) # email tag page
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]>section[attr|id=threadPane]>section[attr|id="threadTree"]>table[attr|class="tree-table"]>section[attr|class~="tree-table-header"]>table-row>column-header[name=Subject]>push-button', namespaces={"attr": "uri:deskat:attributes.at-spi.gnome.org"})(at) # table view, column header
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]>section[attr|id=threadPane]>section[attr|id="threadTree"]>table[attr|class="tree-table"]>tree>tree-item>section[name="Subject"]>section>section', namespaces={"attr": "uri:deskat:attributes.at-spi.gnome.org"})(at) # table view, column header
#print(len(elements))
#for elm in elements:
#print(lxml.etree.tostring(elm, encoding="unicode", pretty_print=True))
import datetime
import os
import sys
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)))
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)))
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)))
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
print( check_thunderbird_filter( "../../任务数据/Thunderbird/msgFilterRules.dat"
, { "expect": [ { "enabled": "yes"
, "action": "Move to folder"
, "actionValue": "mailbox://nobody@Local%20Folders/Promotions"
, "condition": ["AND (subject,contains,discount)"]
}
]
}
)
)