Merge remote-tracking branch 'origin/main'

# Conflicts:
#	desktop_env/controllers/setup.py
#	desktop_env/evaluators/metrics/utils.py
This commit is contained in:
Timothyxxx
2024-01-12 17:30:15 +08:00
67 changed files with 4514 additions and 272 deletions

View File

@@ -1,17 +1,18 @@
import json
import logging
import os.path
import time
import os.path
import traceback
import uuid
from typing import Any, Union
from typing import Dict, List
from typing import Any, Union, Optional
import requests
from playwright.sync_api import sync_playwright
from requests_toolbelt.multipart.encoder import MultipartEncoder
from desktop_env.evaluators.metrics.utils import compare_urls
import logging
logger = logging.getLogger("desktopenv.setup")
@@ -47,6 +48,8 @@ class SetupController:
assert hasattr(self, setup_function)
getattr(self, setup_function)(**parameters)
logger.info("SETUP: %s(%s)", setup_function, str(parameters))
# self._download_setup(config)
# self._change_wallpaper(config)
# self._tidy_desktop(config) todo: implement this
@@ -54,31 +57,31 @@ class SetupController:
# can add other setup steps
# ZDY_COMMENT: merged with launch
# def _command_setup(self, command: str):
# """
# Directly send a command into the virtual machine os for setting up.
# """
# payload = json.dumps({"command": command})
# headers = {
# 'Content-Type': 'application/json'
# }
# timeout = 5
# timout_whitelist = ["vlc"]
#
# try:
#
# response = requests.post(self.http_server + "/execute", headers=headers, data=payload, timeout=timeout)
# if response.status_code == 200:
# print("Command executed successfully:", response.text)
# else:
# print("Failed to execute command. Status code:", response.status_code)
# except requests.exceptions.Timeout as e:
# if command in timout_whitelist:
# print("Command executed successfully:", command)
# else:
# print("An error occurred while trying to execute the command:", e)
# except requests.exceptions.RequestException as e:
# print("An error occurred while trying to execute the command:", e)
#def _command_setup(self, command: str):
#"""
#Directly send a command into the virtual machine os for setting up.
#"""
#payload = json.dumps({"command": command})
#headers = {
#'Content-Type': 'application/json'
#}
#timeout = 5
#timout_whitelist = ["vlc"]
#
#try:
#
#response = requests.post(self.http_server + "/execute", headers=headers, data=payload, timeout=timeout)
#if response.status_code == 200:
#print("Command executed successfully:", response.text)
#else:
#print("Failed to execute command. Status code:", response.status_code)
#except requests.exceptions.Timeout as e:
#if command in timout_whitelist:
#print("Command executed successfully:", command)
#else:
#print("An error occurred while trying to execute the command:", e)
#except requests.exceptions.RequestException as e:
#print("An error occurred while trying to execute the command:", e)
def _download_setup(self, files: List[Dict[str, str]]):
"""
@@ -221,35 +224,60 @@ class SetupController:
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
def _execute_setup(self, command: List[str], stdout: str = "", stderr: str = ""):
def _execute_setup( self, command: List[str]
, stdout: str = "", stderr: str = ""
, shell: bool = False, until: Optional[Dict[str, Any]] = None):
if not command:
raise Exception("Empty comman to launch.")
payload = json.dumps({"command": command})
until: Dict[str, Any] = until or {}
terminates: bool = False
nb_failings = 0
payload = json.dumps({"command": command, "shell": shell})
headers = {"Content-Type": "application/json"}
try:
response = requests.post(self.http_server + "/setup" + "/execute", headers=headers, data=payload)
if response.status_code == 200:
results: Dict[str, str] = response.json()
if stdout:
with open(os.path.join(self.cache_dir, stdout), "w") as f:
f.write(results["output"])
if stderr:
with open(os.path.join(self.cache_dir, stderr), "w") as f:
f.write(results["error"])
logger.info("Command executed successfully: %s -> %s"
, " ".join(command)
, response.text
)
else:
logger.error("Failed to launch application. Status code: %s", response.text)
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
traceback.print_exc()
while not terminates:
try:
response = requests.post(self.http_server_setup_root + "/execute", headers=headers, data=payload)
if response.status_code == 200:
results: Dict[str, str] = response.json()
if stdout:
with open(os.path.join(self.cache_dir, stdout), "w") as f:
f.write(results["output"])
if stderr:
with open(os.path.join(self.cache_dir, stderr), "w") as f:
f.write(results["error"])
logger.info( "Command executed successfully: %s -> %s"
, " ".join(command)
, response.text
)
else:
logger.error("Failed to launch application. Status code: %s", response.text)
results = None
nb_failings += 1
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
traceback.print_exc()
def _command_setup(self, command: List[str], stdout: str = "", stderr: str = ""):
self._execute_setup(command, stdout, stderr)
results = None
nb_failings += 1
if len(until)==0:
terminates = True
elif results is not None:
terminates = "returncode" in until and results["returncode"]==until["returncode"]\
or "stdout" in until and until["stdout"] in results["output"]\
or "stderr" in until and until["stderr"] in results["error"]
terminates = terminates or nb_failings>=5
if not terminates:
time.sleep(0.3)
def _command_setup(self, command: List[str], **kwargs):
self._execute_setup(command, **kwargs)
def _sleep_setup(self, seconds: float):
time.sleep(seconds)
def _act_setup(self, action_seq: List[Union[Dict[str, Any], str]]):
# TODO

View File

@@ -1,5 +1,4 @@
from .file import get_cloud_file, get_vm_file, get_cache_file
from .misc import get_rule
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
from .misc import get_rule, get_accessibility_tree
from .vlc import get_vlc_playing_info, get_vlc_config

View File

@@ -10,3 +10,8 @@ from .table import check_sheet_list, check_xlsx_freeze, check_xlsx_zoom
from .table import compare_table
from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, compare_images, compare_audios, \
compare_videos
from .gimp import increase_saturation, decrease_brightness
from .general import check_csv, check_accessibility_tree, check_list
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter

View File

@@ -11,6 +11,7 @@ from lxml.cssselect import CSSSelector
from lxml.etree import _Element
from rapidfuzz import fuzz
from .utils import _match_record
def exact_match(result, rules) -> float:
expect = rules["expected"]
@@ -27,11 +28,6 @@ def fuzzy_match(result, rules) -> float:
return fuzz.ratio(result, expect) / 100.
def _match_record(pattern: Dict[str, str], item: Dict[str, str]) -> float:
return all(k in item and item[k] == val for k, val in pattern.items())
def check_csv(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
"""
Args:
@@ -46,6 +42,9 @@ def check_csv(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
float
"""
if result is None:
return 0.
expect_metrics = [False] * len(rules.get("expect", []))
unexpect_metric = True
with open(result) as f:
@@ -72,6 +71,9 @@ def check_list(result: str, rules: Dict[str, List[str]]) -> float:
float
"""
if result is None:
return 0.
expect_patterns: List[Pattern[str]] = [re.compile(ptt) for ptt in rules.get("expect", [])]
unexpect_patterns: List[Pattern[str]] = [re.compile(ptt) for ptt in rules.get("unexpect", [])]
@@ -130,9 +132,10 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
return 0.
if "text" in rules:
match_func: Callable[[str], Number] = functools.partial(operator.eq if rules["exact"] else fuzz.ratio
, rules["text"]
)
match_func: Callable[[str], Number] = functools.partial( operator.eq if rules["exact"]\
else (lambda a, b: fuzz.ratio(a, b)/100.)
, rules["text"]
)
match_score: Number = 0
for elm in elements:
match_score = max(match_score, match_func(elm.text or None))

View File

@@ -1,5 +1,5 @@
import os
from PIL import Image, ImageChops, ImageStat
def get_gimp_export_path():
# Path to GIMP's configuration file. This example assumes GIMP version 2.10.
@@ -20,3 +20,52 @@ def get_gimp_export_path():
# Handle the case where the configuration file is not found
print("GIMP configuration file not found")
return False
def increase_saturation(image1_path: str, image2_path: str) -> float:
def calculate_saturation(image):
# convert the image to HSV mode
hsv_image = image.convert("HSV")
saturation_channel = hsv_image.split()[1]
# calculate the mean saturation level
stat = ImageStat.Stat(saturation_channel)
mean_saturation = stat.mean[0]
return mean_saturation
image1 = Image.open(image1_path)
image2 = Image.open(image2_path)
# calculate the saturation level of each image
saturation1 = calculate_saturation(image1)
saturation2 = calculate_saturation(image2)
return 1 if saturation1 < saturation2 else 0
def decrease_brightness(image1_path: str, image2_path: str) -> float:
def calculate_brightness(image):
# Convert the image to grayscale mode
grayscale_image = image.convert("L")
# Get the image data
pixels = list(grayscale_image.getdata())
brightness = sum(pixels) / len(pixels)
return brightness
image1 = Image.open(image1_path)
image2 = Image.open(image2_path)
brightness1 = calculate_brightness(image1)
brightness2 = calculate_brightness(image2)
return 1 if brightness1 > brightness2 else 0
if __name__ == "__main__":
image1_path = "../Downloads/1.png"
image2_path = "../Downloads/edited_darker.png"
decrease_brightness(image1_path, image2_path)
increase_saturation(image1_path, image2_path)

View File

@@ -1,53 +1,210 @@
#from playwright.sync_api import sync_playwright, Browser
#from marionette_driver.marionette import Marionette
#import marionette
#import pyatspi
from typing import List, Pattern, Dict, Match, Tuple
from typing import Union, Any, Optional, Iterable, TypeVar, Callable
import lxml.etree
from lxml.cssselect import CSSSelector
from lxml.etree import _Element
import re
import functools
import operator
import json
from .utils import _match_record
from typing import List
import logging
logger = logging.getLogger("desktopenv.metric.thunderbird")
V = TypeVar("Value")
def _match_pref(value: Any, rule: Dict[str, Union[str, Any]]) -> bool:
# function _match_pref {{{ #
if rule["method"].startswith("re"):
flags: List[str] = rule["method"].split(".")[1:]
flags: Iterable[re.RegexFlag] = (getattr(re, fl) for fl in flags)
flag: re.RegexFlag = functools.reduce(operator.or_, flags, re.RegexFlag(0))
logger.debug("REFLAG: %s", repr(flag))
match_: Optional[Match[str]] = re.search(rule["ref"], value, flag)
return match_ is not None
if rule["method"] in { "eq", "ne"
, "le", "lt"
, "ge", "gt"
}:
return getattr(operator, rule["method"])(value, rule["ref"])
raise NotImplementedError()
# }}} function _match_pref #
_pref_pattern: Pattern[str] = re.compile(r'^user_pref\("(?P<key>(?:[^"]|\\")+)\", (?P<val>.+)\);$');
def check_thunderbird_prefs(result: str, rule: Dict[str, Dict[str, Dict[str, Any]]]):
"""
Args:
result (str): path to result file
rule (Dict[str, Dict[str, Dict[str, Any]]]): dict like
{
"expect": {
str: {
"method": str
"ref": something
}
}
"unexpect": {
str: {
"method": str
"ref": something
}
}
}
Returns:
float
"""
if result is None:
return 0.
expect_rules = rule.get("expect", {})
unexpect_rules = rule.get("unexpect", {})
expect_metrics = {k: False for k in expect_rules}
unexpect_metric = True
with open(result) as f:
for l in f:
match_: Match[str] = _pref_pattern.match(l.strip())
if match_ is None:
continue
key: str = match_.group("key")
#value: str = match_.group("val")
#if value in {"true", "false"}:
#value = value.title()
#value: V = eval(value)
value = json.loads(match_.group("val"))
if key in expect_rules:
logger.debug("K: %s, V: %s", key, repr(value))
expect_metrics[key] = _match_pref(value, expect_rules[key])
elif key in unexpect_rules:
unexpect_metric = unexpect_metric and not _match_pref(value, unexpect_rules[key])
return float(all(expect_metrics.values()) and unexpect_metric)
_value_processor: Callable[[str], str] = lambda val: val.replace("\\\"", "\"").replace("\\\\", "\\")
#_condition_pattern: Pattern[str] = re.compile(r'(?P<type>AND|OR) \((?P<key>[\w ]+),(?P<rel>[\w ' + '\'' + r']+),(?:"(?P<val2>(?:[^"]|\")+)"|(?P<val1>[^)]+))\)')
_condition_pattern: Pattern[str] = re.compile(r'(?:AND|OR) \((?:[\w ]+),(?:[\w ' + '\'' + r']+),(?:"(?:(?:[^"]|\")+)"|(?:[^)]+))\)')
def check_thunderbird_filter(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
"""
Args:
result (str): path to filter def file
rules (Dict[str, List[Dict[str, str]]]): dict like
{
"expect": [{key: value}]
"unexpect": [{key: value}]
}
Returns:
float
"""
if result is None:
return 0.
# read filter def file
# a filter:
# {
# "name": "Name",
# "enabled": "yes" | "no",
# "type": "17",
# "action": "Move to folder" | ...,
# "actionValue": ...,
# "condition": [...]
# }
filters: List[Dict[str, Union[str, List[str]]]] = []
with open(result) as f:
for l in f:
if l.startswith("name="):
filter_: Dict[str, Union[str, List[str]]] = {}
filter_["name"] = _value_processor(l[6:-2])
elif l.startswith("enabled="):
filter_["enabled"] = _value_processor(l[9:-2])
elif l.startswith("type="):
filter_["type"] = _value_processor(l[6:-2])
elif l.startswith("action="):
filter_["action"] = _value_processor(l[8:-2])
elif l.startswith("actionValue="):
filter_["actionValue"] = _value_processor(l[13:-2])
elif l.startswith("condition="):
condition_str: str = _value_processor(l[11:-2])
logger.debug("FILTER CONDITION: %s", condition_str)
conditions: List[str] =\
_condition_pattern.findall(condition_str)
logger.debug("FILTER CONDITIONS: %s", repr(conditions))
filter_["condition"] = conditions
logger.debug("FILTER %s", repr(filter_))
filters.append(filter_)
expect_metrics = [False] * len(rules.get("expect", []))
unexpect_metric = True
for flt in filters:
for i, r in enumerate(rules.get("expect", [])):
expect_metrics[i] = expect_metrics[i] or _match_record(r, flt)
unexpect_metric = unexpect_metric and not any(_match_record(r, flt) for r in rules.get("unexpect", []))
return float(all(expect_metrics) and unexpect_metric)
if __name__ == "__main__":
#with sync_playwright() as plwr:
#while True:
##try:
#thunderbird: Browser = plwr.firefox.connect("http://127.0.0.1:6000", timeout=60)
#break
##except:
##pass
#for ctx in thunderbird.contexts:
#for p in ctx.pages:
#print(p.url)
#thunderbird = Marionette()
#print(thunderbird.start_session())
#print(thunderbird.chrome_window_handles)
#print(thunderbird.window_handles)
#print(thunderbird.current_chrome_window_handle)
#thunderbird.set_context(Marionette.CONTEXT_CONTENT)
#print(thunderbird.current_window_handle)
#thunderbird.switch_to_window(thunderbird.chrome_window_handles[0])
#thunderbird.switch_to_default_content()
#thunderbird.switch_to_frame()
#print(thunderbird.get_url())
#print(thunderbird.get_window_type())
#thunderbird.fullscreen()
#print(thunderbird.close())
#registry = pyatspi.Registry.get_default()
#registry
import lxml.etree
from lxml.cssselect import CSSSelector
from lxml.etree import _Element
#xml = "../../任务数据/Thunderbird/vertical-card-view.xml"
xml = "../../任务数据/Thunderbird/vertical-table-view.xml"
at: _Element = lxml.etree.parse(xml)
#xml = "../../任务数据/Thunderbird/vertical-table-view.xml"
#at: _Element = lxml.etree.parse(xml)
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] page-tab-list')(at) # page tab tags
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]')(at) # email tag page
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]>section:nth-child(3)')(at) # email tag page
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]>section[attr|id=threadPane]>section[attr|id="threadTree"]>table[attr|class="tree-table"]>section[attr|class~="tree-table-header"]>table-row>column-header[name=Subject]>push-button', namespaces={"attr": "uri:deskat:attributes.at-spi.gnome.org"})(at) # table view, column header
elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]>section[attr|id=threadPane]>section[attr|id="threadTree"]>table[attr|class="tree-table"]>tree>tree-item>section[name="Subject"]>section>section', namespaces={"attr": "uri:deskat:attributes.at-spi.gnome.org"})(at) # table view, column header
print(len(elements))
for elm in elements:
print(lxml.etree.tostring(elm, encoding="unicode", pretty_print=True))
#elements: List[_Element] = CSSSelector('application[name=Thunderbird] panel>scroll-pane>internal-frame>panel[name$="anonym-x2024@outlook.com"]>section[attr|id=threadPane]>section[attr|id="threadTree"]>table[attr|class="tree-table"]>tree>tree-item>section[name="Subject"]>section>section', namespaces={"attr": "uri:deskat:attributes.at-spi.gnome.org"})(at) # table view, column header
#print(len(elements))
#for elm in elements:
#print(lxml.etree.tostring(elm, encoding="unicode", pretty_print=True))
import datetime
import os
import sys
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)))
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)))
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)))
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
print( check_thunderbird_filter( "../../任务数据/Thunderbird/msgFilterRules.dat"
, { "expect": [ { "enabled": "yes"
, "action": "Move to folder"
, "actionValue": "mailbox://nobody@Local%20Folders/Promotions"
, "condition": ["AND (subject,contains,discount)"]
}
]
}
)
)

View File

@@ -130,6 +130,10 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
return chart_set
def _match_record(pattern: Dict[str, Any], item: Dict[str, Any]) -> bool:
return all(k in item and item[k] == val for k, val in pattern.items())
def are_lists_equal(list1, list2, comparison_func):
# First check if both lists have the same length
if len(list1) != len(list2):

View File

@@ -48,7 +48,8 @@ def execute_command():
return jsonify({
'status': 'success',
'output': result.stdout,
'error': result.stderr
'error': result.stderr,
'returncode': result.returncode
})
except Exception as e:
return jsonify({