Check and fix Chrome tasks

- Added `pytz` dependency to `requirements.txt` for timezone handling.
- Introduced `get_macys_product_url_parse` to replace the overly generic `get_url_path_parse` name; the old name is kept as an alias to maintain backward compatibility.
- Enhanced logging throughout the `get_active_tab_html_parse` and `get_rule_relativeTime` functions for improved debugging and traceability.
- Updated JSON examples to reflect changes in expected keys and added new fields for better evaluation context.
- Removed deprecated execution commands from JSON examples to streamline the evaluation process.
This commit is contained in:
yuanmengqi
2025-07-06 07:52:37 +00:00
parent 1b40a458de
commit 9be6fcd688
20 changed files with 521 additions and 171 deletions

View File

@@ -1153,14 +1153,19 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
}
"""
active_tab_url = get_active_url_from_accessTree(env, config)
logger.info(f"[DEBUG] get_active_url_from_accessTree returned: {active_tab_url} (type: {type(active_tab_url)})")
if not isinstance(active_tab_url, str):
logger.error("active_tab_url is not a string")
logger.error(f"[DEBUG] active_tab_url is not a string, got {type(active_tab_url)}: {active_tab_url}")
return None
host = env.vm_ip
port = env.chromium_port # fixme: this port is hard-coded, need to be changed from config file
server_port = env.server_port
remote_debugging_url = f"http://{host}:{port}"
# DEBUG: Add logging for configuration
logger.info(f"[DEBUG] get_active_tab_html_parse called with config: {config}")
with sync_playwright() as p:
# connect to remote Chrome instance
try:
@@ -1189,13 +1194,21 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
for page in context.pages:
page.wait_for_load_state("networkidle")
# the accTree and playwright can get encoding(percent-encoding) characters, we need to convert them to normal characters
if unquote(page.url) == unquote(active_tab_url):
# Normalize URLs by removing trailing slashes and decoding percent-encoding
def normalize_url(url):
return unquote(url).rstrip('/')
if normalize_url(page.url) == normalize_url(active_tab_url):
target_page = page
print("\33[32mtartget page url: ", target_page.url, "\33[0m")
print("\33[32mtartget page title: ", target_page.title(), "\33[0m")
break
if target_page is None:
logger.error("Your tab is not the target tab.")
logger.error("[DEBUG] Could not find target tab matching URL. Available tabs:")
for context in browser.contexts:
for page in context.pages:
logger.error(f"[DEBUG] - Tab URL: {page.url}")
logger.error(f"[DEBUG] Expected URL: {active_tab_url}")
return {}
return_json = {}
@@ -1220,7 +1233,8 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
.filter(Boolean)
''')
results.append(texts)
return results[0]
# Safety check: return empty list if no elements found
return results[0] if results else []
def safely_get_direct_li_playwright(selector):
elements = target_page.query_selector_all(selector + " li.catAllProducts")
@@ -1238,6 +1252,9 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
else:
logger.warning(f"[DEBUG] Element at index {index} not found for class '{class_name}'. Found {len(elements_texts)} elements.")
return_json[key] = "" # Return empty string instead of None
class_multiObject_child = config.get("class_multiObject_child", {})
for class_name, object_dict in class_multiObject_child.items():
@@ -1246,6 +1263,9 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
else:
logger.warning(f"[DEBUG] Child element at index {index} not found for class '{class_name}'. Found {len(elements_texts)} elements.")
return_json[key] = "" # Return empty string instead of None
class_multiObject_only_child = config.get("class_multiObject_only_child", {})
for class_name, object_dict in class_multiObject_only_child.items():
@@ -1254,10 +1274,16 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
else:
logger.warning(f"[DEBUG] Only child element at index {index} not found for class '{class_name}'. Found {len(elements_texts)} elements.")
return_json[key] = "" # Return empty string instead of None
class_multiObject_search_exist = config.get("class_multiObject_search_exist", {})
for class_name, object_list in class_multiObject_search_exist.items():
elements_texts = safely_get_text_content("." + class_name)
logger.info(f"[DEBUG] Found elements with class '{class_name}': {elements_texts}")
logger.info(f"[DEBUG] Expected elements: {[obj for obj in object_list if obj != 'is_other_exist']}")
for each_object in object_list:
if each_object == "is_other_exist":
continue
@@ -1266,10 +1292,15 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
else:
return_json[each_object] = False
if "is_other_exist" in object_list:
extra_elements = []
for each_element in elements_texts:
if each_element not in object_list:
extra_elements.append(each_element)
return_json["is_other_exist"] = True
break
if extra_elements:
logger.warning(f"[DEBUG] Found unexpected elements not in expected list: {extra_elements}")
else:
logger.info(f"[DEBUG] No unexpected elements found")
if "is_other_exist" not in return_json.keys():
return_json["is_other_exist"] = False
@@ -1277,8 +1308,13 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
class_singleObject = config.get("class_singleObject", {})
for class_name, key in class_singleObject.items():
element_text = safely_get_text_content("." + class_name)
logger.info(f"[DEBUG] Class '{class_name}' found {len(element_text)} elements")
if element_text:
return_json[key] = element_text[0]
logger.info(f"[DEBUG] Class extraction for key '{key}': '{element_text[0]}'")
else:
logger.warning(f"[DEBUG] No elements found for class: {class_name}")
return_json[key] = "" # Return empty string instead of None
elif config['category'] == "label":
# Assuming get_by_label is a custom function or part of the framework being used
@@ -1290,17 +1326,75 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
elif config["category"] == "xpath":
xpathObject = config.get("xpathObject", {})
logger.info(f"[DEBUG] Processing xpath category with xpathObject: {xpathObject}")
for xpath, key in xpathObject.items():
logger.info(f"[DEBUG] Processing xpath: {xpath} -> key: {key}")
elements = target_page.locator(f"xpath={xpath}")
if elements.count() > 0:
return_json[key] = elements.first.text_content().strip()
element_count = elements.count()
logger.info(f"[DEBUG] Found {element_count} elements for xpath: {xpath}")
if element_count > 0:
try:
text_content = elements.first.text_content()
if text_content is not None:
text_content = text_content.strip()
logger.info(f"[DEBUG] Raw text content for key '{key}': '{text_content}' (type: {type(text_content)})")
# Handle the case where the text content is empty
if text_content is None or text_content == "":
logger.warning(f"[DEBUG] Element found but text content is empty for key '{key}' xpath: {xpath}")
# Try to gather more information about the element
element_html = elements.first.inner_html()
element_text = elements.first.inner_text()
logger.info(f"[DEBUG] Element innerHTML: '{element_html[:100]}...' innerText: '{element_text}'")
return_json[key] = text_content if text_content else ""
logger.info(f"[DEBUG] Final value for key '{key}': '{return_json[key]}'")
except Exception as e:
logger.error(f"[DEBUG] Error extracting text from element for key '{key}': {e}")
return_json[key] = ""
else:
logger.warning(f"[DEBUG] No elements found for xpath: {xpath}")
# Try some fallback xpath lookup methods
try:
# Try without the xpath prefix
fallback_elements = target_page.locator(xpath)
fallback_count = fallback_elements.count()
logger.info(f"[DEBUG] Fallback search (without xpath prefix) found {fallback_count} elements")
if fallback_count > 0:
text_content = fallback_elements.first.text_content()
if text_content:
text_content = text_content.strip()
return_json[key] = text_content if text_content else ""
logger.info(f"[DEBUG] Fallback extraction successful for key '{key}': '{return_json[key]}'")
else:
return_json[key] = ""
except Exception as e:
logger.info(f"[DEBUG] Fallback xpath search also failed: {e}")
return_json[key] = ""
elif config["category"] == "input":
inputObjects = config.get("inputObject", {})
logger.info(f"[DEBUG] Processing input category with inputObjects: {inputObjects}")
for xpath, key in inputObjects.items():
logger.info(f"[DEBUG] Processing input xpath: {xpath} -> key: {key}")
inputs = target_page.locator(f"xpath={xpath}")
if inputs.count() > 0:
return_json[key] = inputs.first.input_value().strip()
input_count = inputs.count()
logger.info(f"[DEBUG] Found {input_count} input elements for xpath: {xpath}")
if input_count > 0:
try:
input_value = inputs.first.input_value()
if input_value:
input_value = input_value.strip()
return_json[key] = input_value if input_value else ""
logger.info(f"[DEBUG] Input value for key '{key}': '{return_json[key]}'")
except Exception as e:
logger.error(f"[DEBUG] Error getting input value for key '{key}': {e}")
return_json[key] = ""
else:
logger.warning(f"[DEBUG] No input elements found for xpath: {xpath}")
return_json[key] = ""
elif config["category"] == "class&url":
class_multiObject = config.get("class_multiObject", {})
@@ -1352,6 +1446,23 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
return_json[value.lower()] = False
browser.close()
# DEBUG: Add logging for final result and check for None values
logger.info(f"[DEBUG] get_active_tab_html_parse final result: {return_json}")
# Check whether there are any None values
none_keys = [key for key, value in return_json.items() if value is None]
if none_keys:
logger.warning(f"[DEBUG] Found None values for keys: {none_keys}")
# Check whether all expected keys are present
if config["category"] == "xpath":
expected_keys = set(config.get("xpathObject", {}).values())
actual_keys = set(return_json.keys())
missing_keys = expected_keys - actual_keys
if missing_keys:
logger.warning(f"[DEBUG] Missing expected keys: {missing_keys}")
return return_json
@@ -1402,8 +1513,24 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
print("go to newpage: ")
print(newpage.title())
time.sleep(2)
newpage.click("button.next-available")
print("after third click")
# Try to click the button with better error handling and longer timeout
try:
# Wait for the button to be available with a longer timeout
newpage.wait_for_selector("button.next-available", timeout=60000)
newpage.click("button.next-available", timeout=60000)
print("after third click")
except Exception as e:
logger.error(f"Failed to click 'next-available' button: {e}")
# Try alternative selectors if the main one fails
try:
newpage.wait_for_selector("button[class*='next']", timeout=30000)
newpage.click("button[class*='next']", timeout=30000)
print("after third click (alternative selector)")
except Exception as e2:
logger.error(f"Alternative selector also failed: {e2}")
# Continue execution even if button click fails
print("Continuing without clicking next-available button")
return_json = {}
return_json["expected"] = {}
@@ -1411,11 +1538,31 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
if config["selector"] == "class":
if "order" in config.keys():
className = config["class"]
return_json["expected"][className] = newpage.query_selector_all("." + className)[
int(config["order"])].text_content().strip()
try:
elements = newpage.query_selector_all("." + className)
order_index = int(config["order"])
if len(elements) > order_index:
return_json["expected"][className] = elements[order_index].text_content().strip()
else:
logger.warning(f"Element with class '{className}' at index {order_index} not found. Found {len(elements)} elements.")
# For expected values, if we can't find the element, the evaluation cannot proceed
# Return a structure that indicates failure to get expected value
return_json["expected"][className] = "__EVALUATION_FAILED__"
except Exception as e:
logger.error(f"Error accessing element with class '{className}': {e}")
return_json["expected"][className] = "__EVALUATION_FAILED__"
else:
className = config["class"]
return_json["expected"][className] = newpage.query_selector("." + className).text_content().strip()
try:
element = newpage.query_selector("." + className)
if element:
return_json["expected"][className] = element.text_content().strip()
else:
logger.warning(f"Element with class '{className}' not found.")
return_json["expected"][className] = "__EVALUATION_FAILED__"
except Exception as e:
logger.error(f"Error accessing element with class '{className}': {e}")
return_json["expected"][className] = "__EVALUATION_FAILED__"
browser.close()
return return_json
@@ -1481,11 +1628,11 @@ def get_url_dashPart(env, config: Dict[str, str]):
return {config["key"]: dash_part}
def get_url_path_parse(env, config: Dict[str, str]):
def get_macys_product_url_parse(env, config: Dict[str, str]):
"""
Parse Macy's product url path, extract:
- mens_clothing: true if 'mens-clothing' in path, else None
- t_shirts: true if any key 'Top_style' or 'Product_department' value is 'T-shirts', else None
- shirts: true if any key 'Top_style' or 'Product_department' value is 'shirts', else None
- Men_regular_size_t, Price_discount_range (as list), Sleeve_length: as before, None if not found
All fields are None if not found for robustness.
"""
@@ -1503,9 +1650,12 @@ def get_url_path_parse(env, config: Dict[str, str]):
# key-value
path_parts = path.strip('/').split('/')
key_value_json = {}
tshirts_flag = False
if "mens-t-shirts" in path:
tshirts_flag = True
shirts_flag = False
short_sleeve_flag = False # Initialize short_sleeve_flag to avoid UnboundLocalError
if "shirts" in path:
shirts_flag = True
if "short-sleeve" in path:
short_sleeve_flag = True
for i in range(len(path_parts)-1):
if ',' in path_parts[i] and ',' in path_parts[i+1]:
keys = [k.strip() for k in path_parts[i].split(',')]
@@ -1515,21 +1665,34 @@ def get_url_path_parse(env, config: Dict[str, str]):
key_value_json[k] = [item.strip() for item in v.split('|')] if v else None
else:
key_value_json[k] = v if v else None
if (k == 'Top_style' or k == 'Product_department') and (v == 'T-shirts' or v == 'T-Shirts' or v == 'T-Shirt'):
tshirts_flag = True
if k == 'Product_department' and (v == 'shirts' or v == 'Shirts' or v == 'Shirt'):
shirts_flag = True
if k == 'Sleeve_length' and (v == 'short-sleeve' or v == 'Short Sleeve'):
short_sleeve_flag = True
break
for field in ['Men_regular_size_t', 'Price_discount_range', 'Sleeve_length']:
for field in ['Men_regular_size_t', 'Price_discount_range']:
if field not in key_value_json:
key_value_json[field] = None
result['t_shirts'] = tshirts_flag if tshirts_flag else None
result['shirts'] = shirts_flag if shirts_flag else None
result['short_sleeve'] = short_sleeve_flag if short_sleeve_flag else None
# parse_keys
for key in config["parse_keys"]:
if key in key_value_json:
if key == "Price_discount_range":
if '50_PERCENT_ off & more' in key_value_json[key] and not '30_PERCENT_ off & more' in key_value_json[key] and not '20_PERCENT_ off & more' in key_value_json[key]:
# Check if key_value_json[key] is not None before using 'in' operator
if key_value_json[key] is not None and '50_PERCENT_ off & more' in key_value_json[key] and not '30_PERCENT_ off & more' in key_value_json[key] and not '20_PERCENT_ off & more' in key_value_json[key]:
result[key] = '50_PERCENT_ off & more'
else:
result[key] = 'not_50_PERCENT_ off & more'
else:
result[key] = key_value_json[key]
return result
# Alias for backward compatibility - the old function name was too generic
def get_url_path_parse(env, config: Dict[str, str]):
    """
    Backward-compatible entry point that forwards to get_macys_product_url_parse.

    Both arguments are passed through unchanged and the delegate's result is
    returned as-is, so existing configurations that still use the
    "url_path_parse" type keep working.
    """
    parsed_result = get_macys_product_url_parse(env, config)
    return parsed_result