fix chrome tasks (#230)

* fix chrome

* fix: fix proxy setup

* feat&fix: add proxy support in setup and remove hardcoded proxy from example

* fix tasks

* fix chrome finished

* fix

* clean chrome_fix code

* clean chrome_fix code

---------

Co-authored-by: adlsdztony <zzl0712@connect.hku.hk>
This commit is contained in:
Yuan Mengqi
2025-07-03 21:32:41 +08:00
committed by GitHub
parent bdaf37e0e5
commit b2fb8b4222
26 changed files with 444 additions and 140 deletions

View File

@@ -25,7 +25,8 @@ from .chrome import (
get_url_dashPart,
get_active_url_from_accessTree,
get_find_installed_extension_name,
get_info_from_website
get_info_from_website,
get_url_path_parse
)
from .file import get_cloud_file, get_vm_file, get_cache_file, get_content_from_vm_file
from .general import get_vm_command_line, get_vm_terminal_output, get_vm_command_error

View File

@@ -1181,7 +1181,7 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
], "shell": False})
headers = {"Content-Type": "application/json"}
requests.post("http://" + host + ":" + server_port + "/setup" + "/launch", headers=headers, data=payload)
requests.post("http://" + host + ":" + str(server_port) + "/setup" + "/launch", headers=headers, data=payload)
time.sleep(5)
browser = p.chromium.connect_over_cdp(remote_debugging_url)
target_page = None
@@ -1204,6 +1204,32 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
elements = target_page.query_selector_all(selector)
return [element.text_content().strip() for element in elements if element]
def safely_get_direct_text_nodes_playwright(selector):
"""
Extract all direct text node contents under the specified selector element (excluding text inside child div, span, etc.).
Returns a list of lists, each sublist contains the direct text nodes of one element.
Suitable for structures like: <div>SEA<div class="aura-separator"></div>NYC</div>
"""
elements = target_page.query_selector_all(selector)
results = []
for element in elements:
texts = element.evaluate('''
(node) => Array.from(node.childNodes)
.filter(n => n.nodeType === Node.TEXT_NODE)
.map(n => n.textContent.trim())
.filter(Boolean)
''')
results.append(texts)
return results[0]
def safely_get_direct_li_playwright(selector):
elements = target_page.query_selector_all(selector + " li.catAllProducts")
return [element.query_selector('span').inner_text().strip() for element in elements if element.query_selector('span')]
def safely_get_only_child_text_content(selector):
elements = target_page.query_selector_all(selector)
return [element.query_selector('h3').text_content().strip() for element in elements if element.query_selector('h3')]
if config["category"] == "class":
class_multiObject = config.get("class_multiObject", {})
for class_name, object_dict in class_multiObject.items():
@@ -1212,6 +1238,41 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
class_multiObject_child = config.get("class_multiObject_child", {})
for class_name, object_dict in class_multiObject_child.items():
elements_texts = safely_get_direct_text_nodes_playwright("." + class_name)
for order_key, key in object_dict.items():
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
class_multiObject_only_child = config.get("class_multiObject_only_child", {})
for class_name, object_dict in class_multiObject_only_child.items():
elements_texts = safely_get_only_child_text_content("." + class_name)
for order_key, key in object_dict.items():
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
class_multiObject_search_exist = config.get("class_multiObject_search_exist", {})
for class_name, object_list in class_multiObject_search_exist.items():
elements_texts = safely_get_text_content("." + class_name)
for each_object in object_list:
if each_object == "is_other_exist":
continue
if each_object in elements_texts:
return_json[each_object] = True
else:
return_json[each_object] = False
if "is_other_exist" in object_list:
for each_element in elements_texts:
if each_element not in object_list:
return_json["is_other_exist"] = True
break
if "is_other_exist" not in return_json.keys():
return_json["is_other_exist"] = False
class_singleObject = config.get("class_singleObject", {})
for class_name, key in class_singleObject.items():
@@ -1240,6 +1301,55 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
inputs = target_page.locator(f"xpath={xpath}")
if inputs.count() > 0:
return_json[key] = inputs.first.input_value().strip()
elif config["category"] == "class&url":
class_multiObject = config.get("class_multiObject", {})
for class_name, object_list in class_multiObject.items():
elements_texts = safely_get_text_content("." + class_name)
for each_key in object_list:
if any(each_key.lower() == text.lower() for text in elements_texts):
return_json[each_key.lower()] = True
for each_key in elements_texts:
# each_key.lower() not in object_list.lower():
if all(each_key.lower() not in item.lower() for item in object_list):
return_json["is_other_exist"] = True
break
if "is_other_exist" not in return_json.keys():
return_json["is_other_exist"] = False
class_multiObject_li = config.get("class_multiObject_li", {})
for class_name, object_list in class_multiObject_li.items():
elements_texts = safely_get_direct_li_playwright("." + class_name)
for each_key in object_list:
if any(each_key.lower() == text.lower() for text in elements_texts):
return_json[each_key.lower()] = True
for each_key in elements_texts:
# each_key.lower() not in object_list.lower():
if all(each_key.lower() not in item.lower() for item in object_list):
return_json["is_other_exist"] = True
break
if "is_other_exist" not in return_json.keys():
return_json["is_other_exist"] = False
url_include_expected = config.get("url_include_expected", [])
for key in url_include_expected:
if key.lower() in target_page.url.lower():
if key.lower() not in return_json.keys():
return_json[key.lower()] = True
else:
if key.lower() not in return_json.keys():
return_json[key.lower()] = False
url_include_expected_multichoice = config.get("url_include_expected_multichoice", {})
for key, value in url_include_expected_multichoice.items():
if key.lower() in target_page.url.lower():
if value.lower() not in return_json.keys():
return_json[value.lower()] = True
else:
if value.lower() not in return_json.keys():
return_json[value.lower()] = False
browser.close()
return return_json
@@ -1278,13 +1388,14 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
browser = p.chromium.connect_over_cdp(remote_debugging_url)
page = browser.new_page()
page.goto("https://www.recreation.gov/")
page.fill("input#hero-search-input", "Albion Basin")
page.fill("input#hero-search-input", "Diamond")
page.click("button.nav-search-button")
print("after first click")
time.sleep(2)
time.sleep(10)
# Assuming .search-result-highlight--success leads to a new page or requires page load
with page.expect_popup() as popup_info:
page.click(".search-result-highlight--success")
time.sleep(30)
print("after second click")
newpage = popup_info.value
newpage.wait_for_load_state()
@@ -1337,6 +1448,8 @@ def get_active_tab_url_parse(env, config: Dict[str, Any]):
# change original key to new key, keep value unchange
value = extracted_params.pop(key)
extracted_params[config["replace"][key]] = value
if config.get("split_list", False):
extracted_params = {key: extracted_params[key].split(',') for key in extracted_params.keys()}
return extracted_params
@@ -1366,3 +1479,57 @@ def get_url_dashPart(env, config: Dict[str, str]):
return dash_part
elif config["returnType"] == "json":
return {config["key"]: dash_part}
def get_url_path_parse(env, config: Dict[str, str]):
"""
Parse Macy's product url path, extract:
- mens_clothing: true if 'mens-clothing' in path, else None
- t_shirts: true if any key 'Top_style' or 'Product_department' value is 'T-shirts', else None
- Men_regular_size_t, Price_discount_range (as list), Sleeve_length: as before, None if not found
All fields are None if not found for robustness.
"""
from urllib.parse import urlparse, unquote
result = {}
# 1. Parse URL
active_tab_url = get_active_url_from_accessTree(env, config)
if active_tab_url is None:
return None
parsed = urlparse(active_tab_url)
path = unquote(parsed.path)
result = {}
# mens_clothing
result['mens_clothing'] = True if 'mens-clothing' in path else None
# key-value
path_parts = path.strip('/').split('/')
key_value_json = {}
tshirts_flag = False
if "mens-t-shirts" in path:
tshirts_flag = True
for i in range(len(path_parts)-1):
if ',' in path_parts[i] and ',' in path_parts[i+1]:
keys = [k.strip() for k in path_parts[i].split(',')]
values = [v.strip() for v in path_parts[i+1].split(',')]
for k, v in zip(keys, values):
if k == "Price_discount_range":
key_value_json[k] = [item.strip() for item in v.split('|')] if v else None
else:
key_value_json[k] = v if v else None
if (k == 'Top_style' or k == 'Product_department') and (v == 'T-shirts' or v == 'T-Shirts' or v == 'T-Shirt'):
tshirts_flag = True
break
for field in ['Men_regular_size_t', 'Price_discount_range', 'Sleeve_length']:
if field not in key_value_json:
key_value_json[field] = None
result['t_shirts'] = tshirts_flag if tshirts_flag else None
# parse_keys
for key in config["parse_keys"]:
if key in key_value_json:
if key == "Price_discount_range":
if '50_PERCENT_ off & more' in key_value_json[key] and not '30_PERCENT_ off & more' in key_value_json[key] and not '20_PERCENT_ off & more' in key_value_json[key]:
result[key] = '50_PERCENT_ off & more'
else:
result[key] = 'not_50_PERCENT_ off & more'
else:
result[key] = key_value_json[key]
return result

View File

@@ -71,7 +71,10 @@ relativeTime_to_IntDay = {
"this Sunday": "special",
"next Monday": "special",
"next Friday": "special",
"first monday four months later": "special"
"first monday four months later": "special",
"first monday eight months later": "special",
"next Monday split": "special",
"next Friday split": "special"
}
def get_rule(env, config: Dict[str, R]) -> R:
@@ -125,6 +128,12 @@ def get_rule_relativeTime(env, config: Dict[str, R]) -> R:
# get the first monday of the next_month
temp_date = datetime(next_year, next_month, 1)
absoluteDay = temp_date + timedelta(days=((6-temp_date.weekday())+1)%7)
elif start_relative_time == "first monday eight months later":
next_year = now.year + 1 if now.month >= 5 else now.year
next_month = (now.month + 8)%12
# get the first monday of the next_month
temp_date = datetime(next_year, next_month, 1)
absoluteDay = temp_date + timedelta(days=((6-temp_date.weekday())+1)%7)
regular_time = apply_rules_to_timeFormat(relativeRules["expected"]["time"], absoluteDay)
config["rules"]["expected"]["time"] = regular_time
@@ -144,12 +153,20 @@ def get_rule_relativeTime(env, config: Dict[str, R]) -> R:
next_month = now.month + 1 if now.month < 12 else 1
next_day = 10
from_absoluteDay = datetime(next_year, next_month, next_day)
elif from_time == "next Monday":
elif from_time == "next Monday" or from_time == "next Monday split":
from_absoluteDay = now + timedelta(days=((6-now.weekday())+1))
else:
pass # more rules here
regular_from_time = apply_rules_to_timeFormat(relativeRules["expected"]["from"], from_absoluteDay)
config["rules"]["expected"]["from"] = regular_from_time
if from_time == "next Monday split":
puday = apply_rules_to_timeFormat(relativeRules["expected"]["puDay"], from_absoluteDay)
config["rules"]["expected"]["puDay"] = puday
pumonth = apply_rules_to_timeFormat(relativeRules["expected"]["puMonth"], from_absoluteDay)
config["rules"]["expected"]["puMonth"] = pumonth
puyear = apply_rules_to_timeFormat(relativeRules["expected"]["puYear"], from_absoluteDay)
config["rules"]["expected"]["puYear"] = puyear
else:
regular_from_time = apply_rules_to_timeFormat(relativeRules["expected"]["from"], from_absoluteDay)
config["rules"]["expected"]["from"] = regular_from_time
# deal with to_time
if relativeTime_to_IntDay[to_time] != "special":
@@ -164,15 +181,23 @@ def get_rule_relativeTime(env, config: Dict[str, R]) -> R:
next_month = now.month + 1 if now.month < 12 else 1
next_day = 11
to_absoluteDay = datetime(next_year, next_month, next_day)
elif to_time == "next Friday":
elif to_time == "next Friday" or to_time == "next Friday split":
if now.weekday() < 4 and from_time in ["next Monday"]:
to_absoluteDay = now + timedelta(days=((4-now.weekday())+7))
else:
to_absoluteDay = now + timedelta(days=((4-now.weekday()) if now.weekday() < 4 else (6-now.weekday()) + 5))
else:
pass # more rules here
regular_to_time = apply_rules_to_timeFormat(relativeRules["expected"]["to"], to_absoluteDay)
config["rules"]["expected"]["to"] = regular_to_time
if to_time == "next Friday split":
to_day = apply_rules_to_timeFormat(relativeRules["expected"]["doDay"], to_absoluteDay)
config["rules"]["expected"]["doDay"] = to_day
to_month = apply_rules_to_timeFormat(relativeRules["expected"]["doMonth"], to_absoluteDay)
config["rules"]["expected"]["doMonth"] = to_month
to_year = apply_rules_to_timeFormat(relativeRules["expected"]["doYear"], to_absoluteDay)
config["rules"]["expected"]["doYear"] = to_year
else:
regular_to_time = apply_rules_to_timeFormat(relativeRules["expected"]["to"], to_absoluteDay)
config["rules"]["expected"]["to"] = regular_to_time
return config["rules"]
@@ -186,6 +211,7 @@ def apply_rules_to_timeFormat(timeFormat: str, absoluteDay: datetime):
timeFormat = timeFormat.replace("{month}", month_mapping_full[absoluteDay.month], 1)
timeFormat = timeFormat.replace("{MonthFull}", Month_Mapping_Full[absoluteDay.month], 1)
timeFormat = timeFormat.replace("{Day0D}", "0"+str(absoluteDay.day) if absoluteDay.day < 10 else str(absoluteDay.day), 1)
timeFormat = timeFormat.replace("{MonthD}", str(absoluteDay.month), 1)
# you can add other replace rules here
return timeFormat

View File

@@ -21,7 +21,8 @@ from .chrome import (
is_expected_url_pattern_match,
is_added_to_steam_cart,
is_expected_installed_extensions,
compare_pdf_images
compare_pdf_images,
is_expected_active_tab_approximate
)
from .docs import (
compare_font_names,

View File

@@ -37,6 +37,34 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
return 0
def is_expected_active_tab_approximate(active_tab_info: Dict[str, str], rule: Dict[str, Any]) -> float:
"""
Checks if the expected active tab is open in Chrome, ignoring query parameters in the URL.
"""
if not active_tab_info:
return 0.
match_type = rule['type']
if match_type == "url":
expected_url = rule['url']
if isinstance(active_tab_info, Dict):
actual_url = active_tab_info.get('url', None)
else:
actual_url = active_tab_info
from urllib.parse import urlparse, urlunparse
def strip_query(url):
parsed = urlparse(url)
return urlunparse(parsed._replace(query=""))
if strip_query(expected_url) == strip_query(actual_url):
return 1
else:
return 0
else:
logger.error(f"Unknown type: {match_type}")
return 0
# rules[expected] is a string-formatted regex
def is_expected_url_pattern_match(result, rules) -> float:
"""
@@ -366,7 +394,12 @@ def is_shortcut_on_desktop(shortcuts: Dict[str, str], rule):
for shortcut_path, shortcut_content in shortcuts.items():
if "Name=" + rule['name'] + "\n" in shortcut_content:
return 1.
return 0.
return 0.0
elif rule['type'] == 'exec':
for shortcut_path, shortcut_content in shortcuts.items():
if "Exec=" + rule['exec'] + "\n" in shortcut_content:
return 1.
return 0.0
elif rule['type'] == 'url':
raise TypeError(f"{rule['type']} not support yet!")
elif rule['type'] == 'id':

View File

@@ -322,8 +322,14 @@ def check_direct_json_object(result, rules) -> float:
expected_json = rules["expected"]
for key in expected_json.keys():
expected_value = expected_json.get(key)
if expected_value != result.get(key):
return 0.
if expected_json.get("ignore_list_order", False):
expected_value = sorted(expected_value)
result_value = sorted(result.get(key))
if expected_value != result_value:
return 0.
else:
if expected_value != result.get(key):
return 0.
return 1.0
else:
expected_json = rules["expected"]