finish the rest part of chrome examples and verify them on mac arm64
This commit is contained in:
@@ -16,14 +16,21 @@ from .chrome import (
|
||||
get_enable_do_not_track,
|
||||
get_enable_enhanced_safety_browsing,
|
||||
get_new_startup_page,
|
||||
get_find_unpacked_extension_path
|
||||
get_find_unpacked_extension_path,
|
||||
get_data_delete_automacally,
|
||||
get_active_tab_html_parse,
|
||||
get_active_tab_html_parse_accTree,
|
||||
get_active_tab_url_parse,
|
||||
get_gotoRecreationPage_and_get_html_content,
|
||||
get_url_dashPart,
|
||||
get_active_url_from_accessTree
|
||||
)
|
||||
from .file import get_cloud_file, get_vm_file, get_cache_file
|
||||
from .general import get_vm_command_line, get_vm_terminal_output
|
||||
from .gimp import get_gimp_config_file
|
||||
from .impress import get_audio_in_slide
|
||||
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper, get_list_directory
|
||||
from .misc import get_rule, get_accessibility_tree
|
||||
from .misc import get_rule, get_accessibility_tree, get_rule_relativeTime
|
||||
from .replay import get_replay
|
||||
from .vlc import get_vlc_playing_info, get_vlc_config, get_default_video_player
|
||||
from .vscode import get_vscode_config
|
||||
|
||||
@@ -1,11 +1,27 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import sqlite3
|
||||
from typing import Dict, Any
|
||||
from typing import Dict, Any, List
|
||||
from pydrive.auth import GoogleAuth
|
||||
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
|
||||
from playwright.sync_api import sync_playwright
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
import lxml.etree
|
||||
from lxml.cssselect import CSSSelector
|
||||
from lxml.etree import _Element
|
||||
|
||||
_accessibility_ns_map = {"st": "uri:deskat:state.at-spi.gnome.org"
|
||||
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
|
||||
, "cp": "uri:deskat:component.at-spi.gnome.org"
|
||||
, "doc": "uri:deskat:document.at-spi.gnome.org"
|
||||
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
|
||||
, "txt": "uri:deskat:text.at-spi.gnome.org"
|
||||
, "val": "uri:deskat:value.at-spi.gnome.org"
|
||||
, "act": "uri:deskat:action.at-spi.gnome.org"
|
||||
}
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.chrome")
|
||||
|
||||
@@ -340,7 +356,58 @@ def get_open_tabs_info(env, config: Dict[str, str]):
|
||||
return tabs_info
|
||||
|
||||
|
||||
def get_active_url_from_accessTree(env, config):
|
||||
"""
|
||||
Playwright cannot get the url of active tab directly,
|
||||
so we need to use accessibility tree to get the active tab info.
|
||||
This function is used to get the active tab url from the accessibility tree.
|
||||
config:
|
||||
Dict[str, str]{
|
||||
'xpath':
|
||||
the same as in metrics.general.accessibility_tree.
|
||||
'selectors':
|
||||
the same as in metrics.general.accessibility_tree.
|
||||
'goto_prefix':
|
||||
the prefix you want to add to the beginning of the url to be opened, default is "https://",
|
||||
(the url we get from accTree does not have prefix)
|
||||
...(other keys, not used in this function)
|
||||
}
|
||||
Return
|
||||
url: str
|
||||
"""
|
||||
accessibility_tree: str = env.controller.get_accessibility_tree()
|
||||
# download accessibility tree to "/home/user/Desktop"
|
||||
logger.debug("AT@eval: %s", accessibility_tree)
|
||||
# first, use accessibility API to get the active tab URL
|
||||
at: _Element = lxml.etree.fromstring(accessibility_tree)
|
||||
if "xpath" in config:
|
||||
elements: List[_Element] = at.xpath(config["xpath"], namespaces=_accessibility_ns_map)
|
||||
elif "selectors" in config:
|
||||
selector = CSSSelector(", ".join(config["selectors"]), namespaces=_accessibility_ns_map)
|
||||
elements: List[_Element] = selector(at)
|
||||
else:
|
||||
raise ValueError("At least one of xpath and selectors is required")
|
||||
|
||||
if len(elements) == 0:
|
||||
print("no elements found")
|
||||
return 0.
|
||||
active_tab_url = config["goto_prefix"]+elements[0].text if "goto_prefix" in config.keys() else "https://" + elements[0].text
|
||||
print("active tab url now: {}".format(active_tab_url))
|
||||
return active_tab_url
|
||||
|
||||
|
||||
def get_active_tab_info(env, config: Dict[str, str]):
|
||||
"""
|
||||
This function is used to get all info about active tab.
|
||||
Warning! This function will reload the target-url page
|
||||
If the tartget url has cache or cookie, this function may reload to another page.
|
||||
If you have tested the url will not pop up to another page (check in incongnito mode yourself first),
|
||||
you can use this function.
|
||||
config: Dict[str, str]{
|
||||
# Keys used in get_active_url_from_accessTree: "xpath", "selectors"
|
||||
}
|
||||
"""
|
||||
active_tab_url = get_active_url_from_accessTree(env, config)
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
@@ -348,24 +415,21 @@ def get_active_tab_info(env, config: Dict[str, str]):
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
active_tab_info = {}
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
if page.is_visible("body"): # check the visibility of the page body to determine the active status
|
||||
active_tab_info = {
|
||||
'title': page.title(),
|
||||
'url': page.url,
|
||||
'content': page.content() # get the HTML content of the page
|
||||
}
|
||||
break
|
||||
if active_tab_info:
|
||||
break
|
||||
# go to the target URL page
|
||||
page = browser.new_page()
|
||||
page.goto(active_tab_url)
|
||||
page.wait_for_load_state('load') # Wait for the 'load' event to complete
|
||||
active_tab_info = {
|
||||
'title': page.title(),
|
||||
'url': page.url,
|
||||
'content': page.content() # get the HTML content of the page
|
||||
}
|
||||
|
||||
browser.close()
|
||||
print("active_tab_title: {}".format(active_tab_info.get('title', 'None')))
|
||||
print("active_tab_url: {}".format(active_tab_info.get('url', 'None')))
|
||||
print("active_tab_content: {}".format(active_tab_info.get('content', 'None')))
|
||||
# print("active_tab_title: {}".format(active_tab_info.get('title', 'None')))
|
||||
# print("active_tab_url: {}".format(active_tab_info.get('url', 'None')))
|
||||
# print("active_tab_content: {}".format(active_tab_info.get('content', 'None')))
|
||||
return active_tab_info
|
||||
|
||||
|
||||
@@ -648,4 +712,209 @@ def get_find_unpacked_extension_path(env, config: Dict[str, str]):
|
||||
return all_extensions_path
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
return "Google"
|
||||
return "Google"
|
||||
|
||||
|
||||
def get_data_delete_automacally(env, config: Dict[str, str]):
|
||||
"""
|
||||
This function is used to open th "auto-delete" mode of chromium
|
||||
"""
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Preferences'))""")['output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
# preference_file_path = env.controller.execute_python_command(
|
||||
# "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
# 'output'].strip()
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
try:
|
||||
content = env.controller.get_file(preference_file_path)
|
||||
data = json.loads(content)
|
||||
data_delete_state = data["profile"]["exit_type"]
|
||||
return data_delete_state
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
return "Google"
|
||||
|
||||
|
||||
def get_active_tab_html_parse(env, config: Dict[str, Any]):
|
||||
"""
|
||||
This function is used to get the specific element's text content from the active tab's html.
|
||||
config:
|
||||
Dict[str, str]{
|
||||
# Keys used in get_active_url_from_accessTree: "xpath", "selectors"
|
||||
'category':
|
||||
choose from ["class", "label", "xpath", "input"], used to indicate how to find the element
|
||||
'labelObject':
|
||||
only exists when category is "label",
|
||||
a dict like { "labelSelector": "the key you want to store the text content of this label's ee=lement"}
|
||||
'class_singleObject':
|
||||
only exists when category is "class", a dict with keys as the class name,
|
||||
like { "class name" : "the key you want to store the text content of this element" }
|
||||
'class_multiObject':
|
||||
only exists when category is "class", used for elements with same class name.
|
||||
Two layer of dict, like
|
||||
( {
|
||||
"class name": {
|
||||
"rank in this class" : "the key you want to store the text content of this element"
|
||||
...
|
||||
}
|
||||
} )
|
||||
'xpathObject':
|
||||
only exists when category is "xpath", a dict with keys as the xpath,
|
||||
like { "full xpath" : "the key you want to store the text content of this element" }
|
||||
'inputObject':
|
||||
only exists when category is "input",
|
||||
a dict with keys as the input element's xpath, like { "full xpath" : "the key you want to store the text content of this element" }
|
||||
}
|
||||
"""
|
||||
active_tab_url = get_active_url_from_accessTree(env, config)
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
target_page = None
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
page.wait_for_load_state("load")
|
||||
if page.url == active_tab_url:
|
||||
target_page = page
|
||||
break
|
||||
return_json = {}
|
||||
if config["category"] == "class":
|
||||
#find the text of elements in html with specific class name
|
||||
class_multiObject = config["class_multiObject"]
|
||||
for key in class_multiObject.keys():
|
||||
object_dict = class_multiObject[key]
|
||||
for order_key in object_dict.keys():
|
||||
return_json[object_dict[order_key]] = target_page.query_selector_all("."+key)[int(order_key)].text_content().strip()
|
||||
class_singleObject = config["class_singleObject"]
|
||||
for key in class_singleObject.keys():
|
||||
return_json[class_singleObject[key]] = target_page.query_selector("."+key).text_content().strip()
|
||||
elif config['category'] == "label":
|
||||
#find the text of elements in html with specific label name
|
||||
labelObject = config["labelObject"]
|
||||
for key in labelObject.keys():
|
||||
return_json[labelObject[key]] = target_page.get_by_label(key).text_content().strip()
|
||||
elif config["category"] == "xpath":
|
||||
#find the text of elements in html with specific xpath
|
||||
xpathObject = config["xpathObject"]
|
||||
for key in xpathObject.keys():
|
||||
return_json[xpathObject[key]] = target_page.locator("xpath="+key).text_content().strip()
|
||||
elif config["category"] == "input":
|
||||
inputObject = config["inputObject"]
|
||||
for key in inputObject.keys():
|
||||
return_json[inputObject[key]] = target_page.locator("xpath="+key).input_value().strip()
|
||||
browser.close()
|
||||
return return_json
|
||||
|
||||
|
||||
def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
|
||||
"""
|
||||
especially used for www.recreation.gov examples
|
||||
"""
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
page = browser.new_page()
|
||||
page.goto("https://www.recreation.gov/")
|
||||
page.fill("input#hero-search-input", "Albion Basin")
|
||||
page.click("button.nav-search-button")
|
||||
print("after first click")
|
||||
time.sleep(2)
|
||||
# Assuming .search-result-highlight--success leads to a new page or requires page load
|
||||
with page.expect_popup() as popup_info:
|
||||
page.click(".search-result-highlight--success")
|
||||
print("after second click")
|
||||
newpage = popup_info.value
|
||||
newpage.wait_for_load_state()
|
||||
print("go to newpage: ")
|
||||
print(newpage.title())
|
||||
time.sleep(2)
|
||||
newpage.click("button.next-available")
|
||||
print("after third click")
|
||||
|
||||
|
||||
return_json = {}
|
||||
return_json["expected"]={}
|
||||
#find the text of elements in html with specific class name
|
||||
if config["selector"] == "class":
|
||||
if "order" in config.keys():
|
||||
className = config["class"]
|
||||
return_json["expected"][className]=newpage.query_selector_all("."+className)[int(config["order"])].text_content().strip()
|
||||
else:
|
||||
className = config["class"]
|
||||
return_json["expected"][className] = newpage.query_selector("."+className).text_content().strip()
|
||||
browser.close()
|
||||
return return_json
|
||||
|
||||
|
||||
def get_active_tab_url_parse(env, config:Dict[str, Any]):
|
||||
"""
|
||||
This function is used to parse the url according to config["parse_keys"].
|
||||
config:
|
||||
'parse_keys': must exist,
|
||||
a list of keys to extract from the query parameters of the url.
|
||||
'replace': optional,
|
||||
a dict, used to replace the original key with the new key.
|
||||
( { "original key": "new key" } )
|
||||
"""
|
||||
active_tab_url = get_active_url_from_accessTree(env, config)
|
||||
|
||||
# connect to remote Chrome instance
|
||||
# parse in a hard-coded way to find the specific info about task
|
||||
parsed_url = urlparse(active_tab_url)
|
||||
# Extract the query parameters
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
# Define the keys of interest
|
||||
keys_of_interest = [key for key in config["parse_keys"]]
|
||||
# Extract the parameters of interest
|
||||
extracted_params = {key: query_params.get(key, [''])[0] for key in keys_of_interest}
|
||||
if "replace" in config:
|
||||
for key in config["replace"].keys():
|
||||
# change original key to new key, keep value unchange
|
||||
value = extracted_params.pop(key)
|
||||
extracted_params[config["replace"][key]] = value
|
||||
return extracted_params
|
||||
|
||||
|
||||
def get_url_dashPart(env, config: Dict[str, str]):
|
||||
"""
|
||||
This function is used to extract one of the dash-separated part of the URL.
|
||||
config
|
||||
'partIndex': must exist,
|
||||
the index of the dash-separated part to extract, starting from 0.
|
||||
'needDeleteId': optional,
|
||||
a boolean, used to indicate whether to delete the "id" part ( an example: "/part-you-want?id=xxx" )
|
||||
'returnType': must exist,
|
||||
a string, used to indicate the return type, "string" or "json".
|
||||
"""
|
||||
active_tab_url = get_active_url_from_accessTree(env, config)
|
||||
|
||||
# extract the last dash-separated part of the URL, and delete all the characters after "id"
|
||||
dash_part = active_tab_url.split("/")[config["partIndex"]]
|
||||
if config["needDeleteId"]:
|
||||
dash_part = dash_part.split("?")[0]
|
||||
# print("active_tab_title: {}".format(active_tab_info.get('title', 'None')))
|
||||
# print("active_tab_url: {}".format(active_tab_info.get('url', 'None')))
|
||||
# print("active_tab_content: {}".format(active_tab_info.get('content', 'None')))
|
||||
if config["returnType"] == "string":
|
||||
return dash_part
|
||||
elif config["returnType"] == "json":
|
||||
return {config["key"]: dash_part}
|
||||
|
||||
@@ -1,10 +1,78 @@
|
||||
import logging
|
||||
from typing import TypeVar
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.misc")
|
||||
|
||||
R = TypeVar("Rule")
|
||||
|
||||
day_of_week_mapping = {
|
||||
0: 'Mon',
|
||||
1: 'Tue',
|
||||
2: 'Wed',
|
||||
3: 'Thu',
|
||||
4: 'Fri',
|
||||
5: 'Sat',
|
||||
6: 'Sun'
|
||||
}
|
||||
|
||||
month_mapping = {
|
||||
1: 'Jan',
|
||||
2: 'Feb',
|
||||
3: 'Mar',
|
||||
4: 'Apr',
|
||||
5: 'May',
|
||||
6: 'Jun',
|
||||
7: 'Jul',
|
||||
8: 'Aug',
|
||||
9: 'Sep',
|
||||
10: 'Oct',
|
||||
11: 'Nov',
|
||||
12: 'Dec'
|
||||
}
|
||||
|
||||
Month_Mapping_Full = {
|
||||
1: "January",
|
||||
2: "February",
|
||||
3: "March",
|
||||
4: "April",
|
||||
5: "May",
|
||||
6: "June",
|
||||
7: "July",
|
||||
8: "August",
|
||||
9: "September",
|
||||
10: "October",
|
||||
11: "November",
|
||||
12: "December"
|
||||
}
|
||||
|
||||
month_mapping_full = {
|
||||
1: 'january',
|
||||
2: 'february',
|
||||
3:'march',
|
||||
4: 'april',
|
||||
5:'may',
|
||||
6: 'june',
|
||||
7: 'july',
|
||||
8: 'august',
|
||||
9:'september',
|
||||
10: 'october',
|
||||
11: 'november',
|
||||
12: 'december'
|
||||
}
|
||||
|
||||
relativeTime_to_IntDay = {
|
||||
"tomorrow": 1,
|
||||
"5th next month": "special",
|
||||
"10th next month": "special",
|
||||
"11th next month": "special",
|
||||
"this month": "special",
|
||||
"this Saturday": "special",
|
||||
"this Sunday": "special",
|
||||
"next Monday": "special",
|
||||
"next Friday": "special",
|
||||
"first monday four months later": "special"
|
||||
}
|
||||
|
||||
def get_rule(env, config: R) -> R:
|
||||
"""
|
||||
@@ -12,6 +80,113 @@ def get_rule(env, config: R) -> R:
|
||||
"""
|
||||
return config["rules"]
|
||||
|
||||
def get_rule_relativeTime(env, config: R) -> R:
|
||||
"""
|
||||
According to the rule definded in funciton "apply_rules_to_timeFormat", convert the relative time to absolute time.
|
||||
config:
|
||||
'relativeTime': {
|
||||
"from": must exist; indicates the relativeTime.
|
||||
"to": optional; indicates the relativeTime.
|
||||
}
|
||||
If relativeTime only has key "from", then the key of time in "expected" dict must be "time".
|
||||
If relativeTime has key "to", then the key of time in "expected" dict must be "from" and "to".
|
||||
"""
|
||||
relativeRules = config["rules"]
|
||||
relativeTime = relativeRules["relativeTime"] # int, "+" means future, "-" means past
|
||||
# get the date now
|
||||
now = datetime.now()
|
||||
# calculate the relative time
|
||||
if "to" not in relativeTime.keys():
|
||||
start_relative_time = relativeTime["from"]
|
||||
if relativeTime_to_IntDay[start_relative_time] != "special":
|
||||
# relativeTime can be represented by actual int days
|
||||
start_relative_time_IntDat = relativeTime_to_IntDay[start_relative_time]
|
||||
timediff = timedelta(days=start_relative_time_IntDat)
|
||||
absoluteDay = now + timediff
|
||||
else:
|
||||
# special case, you can add more special cases here
|
||||
if start_relative_time == "5th next month":
|
||||
next_year = now.year + 1 if now.month == 12 else now.year
|
||||
next_month = now.month + 1 if now.month < 12 else 1
|
||||
next_day = 5
|
||||
absoluteDay = datetime(next_year, next_month, next_day)
|
||||
elif start_relative_time == "10th next month":
|
||||
next_year = now.year + 1 if now.month == 12 else now.year
|
||||
next_month = now.month + 1 if now.month < 12 else 1
|
||||
next_day = 10
|
||||
absoluteDay = datetime(next_year, next_month, next_day)
|
||||
elif start_relative_time == "this month":
|
||||
absoluteDay = now
|
||||
elif start_relative_time == "next Monday":
|
||||
absoluteDay = now + timedelta(days=((6-now.weekday())+1))
|
||||
elif start_relative_time == "first monday four months later":
|
||||
next_year = now.year + 1 if now.month >=9 else now.year
|
||||
next_month = (now.month + 4)%12
|
||||
# get the first monday of the next_month
|
||||
temp_date = datetime(next_year, next_month, 1)
|
||||
absoluteDay = temp_date + timedelta(days=((6-temp_date.weekday())+1)%7)
|
||||
regular_time = apply_rules_to_timeFormat(relativeRules["expected"]["time"], absoluteDay)
|
||||
config["rules"]["expected"]["time"] = regular_time
|
||||
|
||||
else:
|
||||
from_time = relativeTime["from"]
|
||||
to_time = relativeTime["to"]
|
||||
# deal with from_time first
|
||||
if relativeTime_to_IntDay[from_time] != "special":
|
||||
from_time_IntDat = relativeTime_to_IntDay[from_time]
|
||||
from_timediff = timedelta(days=from_time_IntDat)
|
||||
from_absoluteDay = now + from_timediff
|
||||
else:
|
||||
if from_time == "this Saturday":
|
||||
from_absoluteDay = now + timedelta(days=(5-now.weekday()))
|
||||
elif from_time == "10th next month":
|
||||
next_year = now.year + 1 if now.month == 12 else now.year
|
||||
next_month = now.month + 1 if now.month < 12 else 1
|
||||
next_day = 10
|
||||
from_absoluteDay = datetime(next_year, next_month, next_day)
|
||||
elif from_time == "next Monday":
|
||||
from_absoluteDay = now + timedelta(days=((6-now.weekday())+1))
|
||||
else:
|
||||
pass # more rules here
|
||||
regular_from_time = apply_rules_to_timeFormat(relativeRules["expected"]["from"], from_absoluteDay)
|
||||
config["rules"]["expected"]["from"] = regular_from_time
|
||||
|
||||
# deal with to_time
|
||||
if relativeTime_to_IntDay[to_time] != "special":
|
||||
to_time_IntDat = relativeTime_to_IntDay[to_time]
|
||||
to_timediff = timedelta(days=to_time_IntDat)
|
||||
to_absoluteDay = now + to_timediff
|
||||
else:
|
||||
if to_time == "this Sunday":
|
||||
to_absoluteDay = now + timedelta(days=(6-now.weekday()))
|
||||
elif to_time == "11th next month":
|
||||
next_year = now.year + 1 if now.month == 12 else now.year
|
||||
next_month = now.month + 1 if now.month < 12 else 1
|
||||
next_day = 11
|
||||
to_absoluteDay = datetime(next_year, next_month, next_day)
|
||||
elif to_time == "next Friday":
|
||||
to_absoluteDay = now + timedelta(days=((4-now.weekday()) if now.weekday() < 4 else (6-now.weekday()) + 5))
|
||||
else:
|
||||
pass # more rules here
|
||||
regular_to_time = apply_rules_to_timeFormat(relativeRules["expected"]["to"], to_absoluteDay)
|
||||
config["rules"]["expected"]["to"] = regular_to_time
|
||||
|
||||
return config["rules"]
|
||||
|
||||
|
||||
def apply_rules_to_timeFormat(timeFormat: str, absoluteDay: datetime):
|
||||
timeFormat = timeFormat.replace("{DoW}", day_of_week_mapping[absoluteDay.weekday()], 1)
|
||||
timeFormat = timeFormat.replace("{Month}", month_mapping[absoluteDay.month], 1)
|
||||
timeFormat = timeFormat.replace("{DayD}", str(absoluteDay.day), 1)
|
||||
timeFormat = timeFormat.replace("{Year}", str(absoluteDay.year), 1)
|
||||
timeFormat = timeFormat.replace("{Month0D}", "0"+str(absoluteDay.month) if absoluteDay.month < 10 else str(absoluteDay.month), 1)
|
||||
timeFormat = timeFormat.replace("{month}", month_mapping_full[absoluteDay.month], 1)
|
||||
timeFormat = timeFormat.replace("{MonthFull}", Month_Mapping_Full[absoluteDay.month], 1)
|
||||
timeFormat = timeFormat.replace("{Day0D}", "0"+str(absoluteDay.day) if absoluteDay.day < 10 else str(absoluteDay.day), 1)
|
||||
# you can add other replace rules here
|
||||
|
||||
return timeFormat
|
||||
|
||||
|
||||
def get_accessibility_tree(env, *args) -> str:
|
||||
accessibility_tree: str = env.controller.get_accessibility_tree()
|
||||
|
||||
@@ -16,7 +16,8 @@ from .chrome import (
|
||||
check_enabled_experiments,
|
||||
check_history_deleted,
|
||||
is_expected_search_query,
|
||||
is_expected_active_tab
|
||||
is_expected_active_tab,
|
||||
is_expected_url_pattern_match
|
||||
)
|
||||
from .docs import (
|
||||
compare_font_names,
|
||||
@@ -54,7 +55,8 @@ from .general import (
|
||||
exact_match,
|
||||
is_in_list,
|
||||
fuzzy_match,
|
||||
check_include_exclude
|
||||
check_include_exclude,
|
||||
check_direct_json_object
|
||||
)
|
||||
from .gimp import (
|
||||
check_brightness_decrease_and_structure_sim,
|
||||
|
||||
@@ -25,6 +25,27 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
|
||||
logger.error(f"Unknown type: {match_type}")
|
||||
return 0
|
||||
|
||||
# rules[expected] is a string-formatted regex
|
||||
def is_expected_url_pattern_match(result, rules) -> float:
|
||||
"""
|
||||
This function is used to search the expected pattern in the url using regex.
|
||||
result is the return value of function "activte_tab_info" or return value of function "get_active_url_from_accessTree"
|
||||
"""
|
||||
if type(result)== dict:
|
||||
result_url = result["url"]
|
||||
print("result url: {}".format(result_url))
|
||||
else:
|
||||
result_url = result
|
||||
# expect_regex = re.compile(rules["expected"])
|
||||
patterns = rules["expected"]
|
||||
print("expected_regex: {}".format(patterns))
|
||||
for pattern in patterns:
|
||||
match = re.search(pattern, result_url)
|
||||
print(match)
|
||||
if not match:
|
||||
return 0.
|
||||
return 1.
|
||||
|
||||
def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> float:
|
||||
"""
|
||||
Checks if the expected tabs are open in Chrome.
|
||||
|
||||
@@ -44,7 +44,9 @@ def is_in_list(result, rules) -> float:
|
||||
return 1.
|
||||
else:
|
||||
return 0.
|
||||
|
||||
|
||||
|
||||
|
||||
def fuzzy_match(result, rules) -> float:
|
||||
expect = rules["expected"]
|
||||
|
||||
@@ -135,7 +137,7 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
|
||||
needed. If both are present, `xpath` takes the priority.
|
||||
"text": str as the expected text content of the selected element.
|
||||
"exact": bool specifying whether exact match or fuzzy match should
|
||||
be performed. defaults to True
|
||||
be performed. defaults to True.
|
||||
}
|
||||
|
||||
Returns:
|
||||
@@ -152,6 +154,7 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
|
||||
raise ValueError("At least one of xpath and selectors is required")
|
||||
|
||||
if len(elements) == 0:
|
||||
print("no elements")
|
||||
return 0.
|
||||
|
||||
if "text" in rules:
|
||||
@@ -217,3 +220,19 @@ def check_json(result: str, rules: Dict[str, List[Dict[str, Union[List[str], str
|
||||
value = value[k]
|
||||
metric = metric and not _match_value_to_rule(value, r)
|
||||
return metric
|
||||
|
||||
|
||||
def check_direct_json_object(result, rules)->float:
|
||||
"""
|
||||
One of the most commonly used function to evalute.
|
||||
Compare two json objects directly.
|
||||
"""
|
||||
print("result: ")
|
||||
print(result)
|
||||
print("expected: ")
|
||||
print(rules["expected"])
|
||||
expected_json = rules["expected"]
|
||||
for key in expected_json.keys():
|
||||
if expected_json[key] != result[key]:
|
||||
return 0.
|
||||
return 1.0
|
||||
Reference in New Issue
Block a user