Enhance Chrome evaluator with improved error handling and retry mechanisms

- Added robust error handling for page processing, including checks for closed pages and HTTP status codes.
- Implemented retry logic for page loads and active tab checks to improve reliability.
- Enhanced logging throughout the process to capture detailed information about failures and successes.
- Preserved existing logic while ensuring better maintainability and robustness in the Chrome evaluator functions.
yuanmengqi
2025-07-18 07:13:13 +00:00
parent 0fb625e4fd
commit fcaefe7bb4
2 changed files with 567 additions and 183 deletions
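The change common to both files is a guarded-access idiom: never read a Playwright page property (url, title(), text or input values) without first checking page.is_closed() and wrapping the access in try/except with a safe default, because a tab can close or navigate between being enumerated and being read. A minimal sketch of the idiom, assuming Playwright's sync API (safe_page_attr is an illustrative name, not a helper added by this commit):

    # Sketch only: `safe_page_attr` is hypothetical, not part of the diff below.
    def safe_page_attr(page, getter, default=""):
        try:
            if page.is_closed():
                return default  # tab vanished between enumeration and access
            return getter(page)
        except Exception:
            return default  # e.g. execution context destroyed mid-navigation

    # Usage: url = safe_page_attr(page, lambda p: p.url)
    #        title = safe_page_attr(page, lambda p: p.title())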


@@ -1521,30 +1521,68 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
target_page = None
for context in browser.contexts:
for page in context.pages:
page.wait_for_load_state("networkidle")
# the accTree and playwright can get encoding(percent-encoding) characters, we need to convert them to normal characters
# Normalize URLs by removing trailing slashes and decoding percent-encoding
def normalize_url(url):
return unquote(url).rstrip('/')
if normalize_url(page.url) == normalize_url(active_tab_url):
target_page = page
print("\33[32mtartget page url: ", target_page.url, "\33[0m")
print("\33[32mtartget page title: ", target_page.title(), "\33[0m")
break
try:
# Wait for page to be stable before checking URL
page.wait_for_load_state("networkidle", timeout=10000)
# Check if page is still valid before accessing properties
if page.is_closed():
logger.debug(f"[DEBUG] Page is closed, skipping")
continue
# the accTree and playwright can get encoding(percent-encoding) characters, we need to convert them to normal characters
# Normalize URLs by removing trailing slashes and decoding percent-encoding
def normalize_url(url):
return unquote(url).rstrip('/')
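# Illustrative example: normalize_url("https://example.com/a%20b/") returns "https://example.com/a b"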
# Safely get page URL with error handling
try:
page_url = page.url
except Exception as url_error:
logger.debug(f"[DEBUG] Failed to get page URL: {url_error}")
continue
if normalize_url(page_url) == normalize_url(active_tab_url):
target_page = page
print("\33[32mtartget page url: ", target_page.url, "\33[0m")
# Safely get page title with error handling
try:
page_title = target_page.title()
print("\33[32mtartget page title: ", page_title, "\33[0m")
except Exception as title_error:
logger.debug(f"[DEBUG] Failed to get page title: {title_error}")
print("\33[32mtartget page title: [Unable to get title - page may be navigating]", "\33[0m")
break
except Exception as page_error:
logger.debug(f"[DEBUG] Error processing page: {page_error}")
continue
if target_page is None:
logger.error("[DEBUG] Could not find target tab matching URL. Available tabs:")
for context in browser.contexts:
for page in context.pages:
logger.error(f"[DEBUG] - Tab URL: {page.url}")
try:
if not page.is_closed():
page_url = page.url
logger.error(f"[DEBUG] - Tab URL: {page_url}")
except Exception as e:
logger.error(f"[DEBUG] - Tab URL: [Unable to get URL: {e}]")
logger.error(f"[DEBUG] Expected URL: {active_tab_url}")
return {}
return_json = {}
def safely_get_text_content(selector):
elements = target_page.query_selector_all(selector)
return [element.text_content().strip() for element in elements if element]
try:
if target_page.is_closed():
logger.warning(f"[DEBUG] Target page is closed, cannot get text content for selector: {selector}")
return []
elements = target_page.query_selector_all(selector)
return [element.text_content().strip() for element in elements if element]
except Exception as e:
logger.warning(f"[DEBUG] Error getting text content for selector '{selector}': {e}")
return []
def safely_get_direct_text_nodes_playwright(selector):
"""
@@ -1552,26 +1590,47 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
Returns a list of lists, each sublist contains the direct text nodes of one element.
Suitable for structures like: <div>SEA<div class="aura-separator"></div>NYC</div>
"""
elements = target_page.query_selector_all(selector)
results = []
for element in elements:
texts = element.evaluate('''
(node) => Array.from(node.childNodes)
.filter(n => n.nodeType === Node.TEXT_NODE)
.map(n => n.textContent.trim())
.filter(Boolean)
''')
results.append(texts)
# Safety check: return empty list if no elements found
return results[0] if results else []
try:
if target_page.is_closed():
logger.warning(f"[DEBUG] Target page is closed, cannot get direct text nodes for selector: {selector}")
return []
elements = target_page.query_selector_all(selector)
results = []
for element in elements:
texts = element.evaluate('''
(node) => Array.from(node.childNodes)
.filter(n => n.nodeType === Node.TEXT_NODE)
.map(n => n.textContent.trim())
.filter(Boolean)
''')
results.append(texts)
# Safety check: return empty list if no elements found
return results[0] if results else []
except Exception as e:
logger.warning(f"[DEBUG] Error getting direct text nodes for selector '{selector}': {e}")
return []
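# Worked example (illustrative): for a single matching element
# <div>SEA<div class="aura-separator"></div>NYC</div>, the evaluate() call
# keeps only the direct text nodes, so this helper returns ["SEA", "NYC"].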
def safely_get_direct_li_playwright(selector):
elements = target_page.query_selector_all(selector + " li.catAllProducts")
return [element.query_selector('span').inner_text().strip() for element in elements if element.query_selector('span')]
try:
if target_page.is_closed():
logger.warning(f"[DEBUG] Target page is closed, cannot get li elements for selector: {selector}")
return []
elements = target_page.query_selector_all(selector + " li.catAllProducts")
return [element.query_selector('span').inner_text().strip() for element in elements if element.query_selector('span')]
except Exception as e:
logger.warning(f"[DEBUG] Error getting li elements for selector '{selector}': {e}")
return []
def safely_get_only_child_text_content(selector):
elements = target_page.query_selector_all(selector)
return [element.query_selector('h3').text_content().strip() for element in elements if element.query_selector('h3')]
try:
if target_page.is_closed():
logger.warning(f"[DEBUG] Target page is closed, cannot get child text content for selector: {selector}")
return []
elements = target_page.query_selector_all(selector)
return [element.query_selector('h3').text_content().strip() for element in elements if element.query_selector('h3')]
except Exception as e:
logger.warning(f"[DEBUG] Error getting child text content for selector '{selector}': {e}")
return []
if config["category"] == "class":
class_multiObject = config.get("class_multiObject", {})
@@ -1649,9 +1708,21 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
# Assuming get_by_label is a custom function or part of the framework being used
labelObject = config.get("labelObject", {})
for labelSelector, key in labelObject.items():
text = target_page.locator(f"text={labelSelector}").first.text_content().strip()
if text:
return_json[key] = text
try:
if target_page.is_closed():
logger.warning(f"[DEBUG] Target page is closed, cannot process label for key '{key}'")
return_json[key] = ""
continue
text = target_page.locator(f"text={labelSelector}").first.text_content()
if text:
text = text.strip()
return_json[key] = text
else:
return_json[key] = ""
except Exception as e:
logger.error(f"[DEBUG] Error processing label for key '{key}': {e}")
return_json[key] = ""
elif config["category"] == "xpath":
xpathObject = config.get("xpathObject", {})
@@ -1659,70 +1730,94 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
for xpath, key in xpathObject.items():
logger.info(f"[DEBUG] Processing xpath: {xpath} -> key: {key}")
elements = target_page.locator(f"xpath={xpath}")
element_count = elements.count()
logger.info(f"[DEBUG] Found {element_count} elements for xpath: {xpath}")
if element_count > 0:
    try:
        text_content = elements.first.text_content()
        if text_content is not None:
            text_content = text_content.strip()
        logger.info(f"[DEBUG] Raw text content for key '{key}': '{text_content}' (type: {type(text_content)})")
        # Handle the case of empty text content
        if text_content is None or text_content == "":
            logger.warning(f"[DEBUG] Element found but text content is empty for key '{key}' xpath: {xpath}")
            # Try to gather more information
            element_html = elements.first.inner_html()
            element_text = elements.first.inner_text()
            logger.info(f"[DEBUG] Element innerHTML: '{element_html[:100]}...' innerText: '{element_text}'")
        return_json[key] = text_content if text_content else ""
        logger.info(f"[DEBUG] Final value for key '{key}': '{return_json[key]}'")
    except Exception as e:
        logger.error(f"[DEBUG] Error extracting text from element for key '{key}': {e}")
        return_json[key] = ""
else:
    logger.warning(f"[DEBUG] No elements found for xpath: {xpath}")
    # Try some fallback xpath lookups
    try:
        # Try without the xpath= prefix
        fallback_elements = target_page.locator(xpath)
        fallback_count = fallback_elements.count()
        logger.info(f"[DEBUG] Fallback search (without xpath prefix) found {fallback_count} elements")
        if fallback_count > 0:
            text_content = fallback_elements.first.text_content()
            if text_content:
                text_content = text_content.strip()
            return_json[key] = text_content if text_content else ""
            logger.info(f"[DEBUG] Fallback extraction successful for key '{key}': '{return_json[key]}'")
        else:
            return_json[key] = ""
    except Exception as e:
        logger.info(f"[DEBUG] Fallback xpath search also failed: {e}")
        return_json[key] = ""
# Check if page is still valid
try:
    if target_page.is_closed():
        logger.warning(f"[DEBUG] Target page is closed, cannot process xpath for key '{key}'")
        return_json[key] = ""
        continue
    elements = target_page.locator(f"xpath={xpath}")
    element_count = elements.count()
    logger.info(f"[DEBUG] Found {element_count} elements for xpath: {xpath}")
    if element_count > 0:
        try:
            text_content = elements.first.text_content()
            if text_content is not None:
                text_content = text_content.strip()
            logger.info(f"[DEBUG] Raw text content for key '{key}': '{text_content}' (type: {type(text_content)})")
            # Handle the case of empty text content
            if text_content is None or text_content == "":
                logger.warning(f"[DEBUG] Element found but text content is empty for key '{key}' xpath: {xpath}")
                # Try to gather more information
                try:
                    element_html = elements.first.inner_html()
                    element_text = elements.first.inner_text()
                    logger.info(f"[DEBUG] Element innerHTML: '{element_html[:100]}...' innerText: '{element_text}'")
                except Exception as inner_e:
                    logger.debug(f"[DEBUG] Failed to get inner content: {inner_e}")
            return_json[key] = text_content if text_content else ""
            logger.info(f"[DEBUG] Final value for key '{key}': '{return_json[key]}'")
        except Exception as e:
            logger.error(f"[DEBUG] Error extracting text from element for key '{key}': {e}")
            return_json[key] = ""
    else:
        logger.warning(f"[DEBUG] No elements found for xpath: {xpath}")
        # Try some fallback xpath lookups
        try:
            # Try without the xpath= prefix
            fallback_elements = target_page.locator(xpath)
            fallback_count = fallback_elements.count()
            logger.info(f"[DEBUG] Fallback search (without xpath prefix) found {fallback_count} elements")
            if fallback_count > 0:
                text_content = fallback_elements.first.text_content()
                if text_content:
                    text_content = text_content.strip()
                return_json[key] = text_content if text_content else ""
                logger.info(f"[DEBUG] Fallback extraction successful for key '{key}': '{return_json[key]}'")
            else:
                return_json[key] = ""
        except Exception as e:
            logger.info(f"[DEBUG] Fallback xpath search also failed: {e}")
            return_json[key] = ""
except Exception as e:
    logger.error(f"[DEBUG] Error processing xpath for key '{key}': {e}")
    return_json[key] = ""
elif config["category"] == "input":
inputObjects = config.get("inputObject", {})
logger.info(f"[DEBUG] Processing input category with inputObjects: {inputObjects}")
for xpath, key in inputObjects.items():
logger.info(f"[DEBUG] Processing input xpath: {xpath} -> key: {key}")
inputs = target_page.locator(f"xpath={xpath}")
input_count = inputs.count()
logger.info(f"[DEBUG] Found {input_count} input elements for xpath: {xpath}")
if input_count > 0:
    try:
        input_value = inputs.first.input_value()
        if input_value:
            input_value = input_value.strip()
        return_json[key] = input_value if input_value else ""
        logger.info(f"[DEBUG] Input value for key '{key}': '{return_json[key]}'")
    except Exception as e:
        logger.error(f"[DEBUG] Error getting input value for key '{key}': {e}")
        return_json[key] = ""
else:
    logger.warning(f"[DEBUG] No input elements found for xpath: {xpath}")
    return_json[key] = ""
try:
    if target_page.is_closed():
        logger.warning(f"[DEBUG] Target page is closed, cannot process input for key '{key}'")
        return_json[key] = ""
        continue
    inputs = target_page.locator(f"xpath={xpath}")
    input_count = inputs.count()
    logger.info(f"[DEBUG] Found {input_count} input elements for xpath: {xpath}")
    if input_count > 0:
        try:
            input_value = inputs.first.input_value()
            if input_value:
                input_value = input_value.strip()
            return_json[key] = input_value if input_value else ""
            logger.info(f"[DEBUG] Input value for key '{key}': '{return_json[key]}'")
        except Exception as e:
            logger.error(f"[DEBUG] Error getting input value for key '{key}': {e}")
            return_json[key] = ""
    else:
        logger.warning(f"[DEBUG] No input elements found for xpath: {xpath}")
        return_json[key] = ""
except Exception as e:
    logger.error(f"[DEBUG] Error processing input for key '{key}': {e}")
    return_json[key] = ""
elif config["category"] == "class&url":
@@ -1758,19 +1853,43 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
url_include_expected = config.get("url_include_expected", [])
for key in url_include_expected:
if key.lower() in target_page.url.lower():
if key.lower() not in return_json.keys():
return_json[key.lower()] = True
else:
try:
if target_page.is_closed():
logger.warning(f"[DEBUG] Target page is closed, cannot check URL for key '{key}'")
if key.lower() not in return_json.keys():
return_json[key.lower()] = False
continue
page_url = target_page.url.lower()
if key.lower() in page_url:
if key.lower() not in return_json.keys():
return_json[key.lower()] = True
else:
if key.lower() not in return_json.keys():
return_json[key.lower()] = False
except Exception as e:
logger.error(f"[DEBUG] Error checking URL for key '{key}': {e}")
if key.lower() not in return_json.keys():
return_json[key.lower()] = False
url_include_expected_multichoice = config.get("url_include_expected_multichoice", {})
for key, value in url_include_expected_multichoice.items():
if key.lower() in target_page.url.lower():
if value.lower() not in return_json.keys():
return_json[value.lower()] = True
else:
try:
if target_page.is_closed():
logger.warning(f"[DEBUG] Target page is closed, cannot check URL for multichoice key '{key}'")
if value.lower() not in return_json.keys():
return_json[value.lower()] = False
continue
page_url = target_page.url.lower()
if key.lower() in page_url:
if value.lower() not in return_json.keys():
return_json[value.lower()] = True
else:
if value.lower() not in return_json.keys():
return_json[value.lower()] = False
except Exception as e:
logger.error(f"[DEBUG] Error checking URL for multichoice key '{key}': {e}")
if value.lower() not in return_json.keys():
return_json[value.lower()] = False
@@ -1799,101 +1918,307 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
"""
especially used for www.recreation.gov examples
"""
# Add logging for debugging
logger.info(f"[RECREATION_PAGE] Starting recreation.gov page processing")
logger.debug(f"[RECREATION_PAGE] Config: {config}")
host = env.vm_ip
port = env.chromium_port # fixme: this port is hard-coded, need to be changed from config file
server_port = env.server_port
use_proxy = env.current_use_proxy
remote_debugging_url = f"http://{host}:{port}"
with sync_playwright() as p:
    try:
        browser = p.chromium.connect_over_cdp(remote_debugging_url)
    except Exception as e:
        # If the connection fails, start a new browser instance
        platform.machine()
        if "arm" in platform.machine():
            # start a new browser instance if the connection fails
            payload = json.dumps({"command": [
                "chromium",
                "--remote-debugging-port=1337"
            ], "shell": False})
        else:
            payload = json.dumps({"command": [
                "google-chrome",
                "--remote-debugging-port=1337"
            ], "shell": False})
        headers = {"Content-Type": "application/json"}
        requests.post("http://" + host + ":" + server_port + "/setup" + "/launch", headers=headers, data=payload)
        time.sleep(5)
        browser = p.chromium.connect_over_cdp(remote_debugging_url)
    page = browser.new_page()
    page.goto("https://www.recreation.gov/")
    page.fill("input#hero-search-input", "Diamond")
    page.click("button.nav-search-button")
    print("after first click")
    time.sleep(10)
    # Assuming .search-result-highlight--success leads to a new page or requires page load
    with page.expect_popup() as popup_info:
        page.click(".search-result-highlight--success")
        time.sleep(30)
    print("after second click")
    newpage = popup_info.value
    newpage.wait_for_load_state()
    print("go to newpage: ")
    print(newpage.title())
    time.sleep(2)
    # Try to click the button with better error handling and longer timeout
    try:
        # Wait for the button to be available with a longer timeout
        newpage.wait_for_selector("button.next-available", timeout=60000)
        newpage.click("button.next-available", timeout=60000)
        print("after third click")
    except Exception as e:
        logger.error(f"Failed to click 'next-available' button: {e}")
        # Try alternative selectors if the main one fails
        try:
            newpage.wait_for_selector("button[class*='next']", timeout=30000)
            newpage.click("button[class*='next']", timeout=30000)
            print("after third click (alternative selector)")
        except Exception as e2:
            logger.error(f"Alternative selector also failed: {e2}")
            # Continue execution even if button click fails
            print("Continuing without clicking next-available button")
    return_json = {}
    return_json["expected"] = {}
    # find the text of elements in html with specific class name
    if config["selector"] == "class":
        if "order" in config.keys():
            className = config["class"]
            try:
                elements = newpage.query_selector_all("." + className)
                order_index = int(config["order"])
                if len(elements) > order_index:
                    return_json["expected"][className] = elements[order_index].text_content().strip()
                else:
                    logger.warning(f"Element with class '{className}' at index {order_index} not found. Found {len(elements)} elements.")
                    # For expected values, if we can't find the element, the evaluation cannot proceed
                    # Return a structure that indicates failure to get expected value
                    return_json["expected"][className] = "__EVALUATION_FAILED__"
            except Exception as e:
                logger.error(f"Error accessing element with class '{className}': {e}")
                return_json["expected"][className] = "__EVALUATION_FAILED__"
        else:
            className = config["class"]
            try:
                element = newpage.query_selector("." + className)
                if element:
                    return_json["expected"][className] = element.text_content().strip()
                else:
                    logger.warning(f"Element with class '{className}' not found.")
                    return_json["expected"][className] = "__EVALUATION_FAILED__"
            except Exception as e:
                logger.error(f"Error accessing element with class '{className}': {e}")
                return_json["expected"][className] = "__EVALUATION_FAILED__"
    browser.close()
    return return_json
backend_url = f"http://{host}:{server_port}"
# Configuration for retry and timeout
max_retries = 3
timeout_ms = 60000  # Increase timeout to 60 seconds
# Test basic connectivity first
logger.info(f"[RECREATION_PAGE] Testing basic network connectivity...")
try:
    import urllib.request
    test_response = urllib.request.urlopen('http://www.google.com', timeout=10)
    logger.info(f"[RECREATION_PAGE] Basic connectivity test passed (Google accessible)")
except Exception as e:
    logger.warning(f"[RECREATION_PAGE] Basic connectivity test failed: {e}")
    logger.info(f"[RECREATION_PAGE] Proceeding anyway...")
for attempt in range(max_retries):
    try:
        logger.info(f"[RECREATION_PAGE] Attempt {attempt + 1}/{max_retries}")
        with sync_playwright() as p:
            # Connect to remote Chrome instance
            try:
                browser = p.chromium.connect_over_cdp(remote_debugging_url)
                logger.info(f"[RECREATION_PAGE] Successfully connected to existing Chrome instance")
            except Exception as e:
                logger.warning(f"[RECREATION_PAGE] Failed to connect to existing Chrome instance: {e}")
                logger.info(f"[RECREATION_PAGE] Starting new Chrome instance with enhanced options...")
                # If the connection fails, start a new browser instance with better options
                app = 'chromium' if 'arm' in platform.machine() else 'google-chrome'
                command = [
                    app,
                    "--remote-debugging-port=1337",
                    "--no-sandbox",
                    "--disable-web-security",
                    "--disable-features=VizDisplayCompositor",
                    "--disable-dev-shm-usage",
                    "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
                ]
                if use_proxy:
                    command.append(f"--proxy-server=127.0.0.1:18888")
                    logger.info(f"[RECREATION_PAGE] Using proxy server: 127.0.0.1:18888")
                logger.info(f"[RECREATION_PAGE] Starting browser with command: {' '.join(command)}")
                payload = json.dumps({"command": command, "shell": False})
                headers = {"Content-Type": "application/json"}
                requests.post(backend_url + "/setup/launch", headers=headers, data=payload)
                time.sleep(8)  # Give more time for browser to start
                browser = p.chromium.connect_over_cdp(remote_debugging_url)
                logger.info(f"[RECREATION_PAGE] Successfully connected to new Chrome instance")
            page = browser.new_page()
            # Set longer timeout for all operations
            page.set_default_timeout(timeout_ms)
            # Set additional headers to appear more like a real browser
            page.set_extra_http_headers({
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            })
            # Try multiple URLs to test connectivity
            test_urls = [
                "https://www.recreation.gov/",
                "http://www.recreation.gov/",
                "https://recreation.gov/",
            ]
            successful_url = None
            for test_url in test_urls:
                try:
                    # Step 1: Navigate to recreation.gov with better error handling
                    logger.info(f"[RECREATION_PAGE] Trying to navigate to: {test_url}")
                    # Try different wait strategies
                    wait_strategies = ['domcontentloaded', 'load', 'networkidle']
                    for wait_until in wait_strategies:
                        try:
                            logger.debug(f"[RECREATION_PAGE] Trying wait strategy: {wait_until}")
                            page.goto(test_url, wait_until=wait_until, timeout=timeout_ms)
                            logger.info(f"[RECREATION_PAGE] Successfully loaded {test_url} with strategy {wait_until}")
                            logger.info(f"[RECREATION_PAGE] Page title: '{page.title()}'")
                            logger.info(f"[RECREATION_PAGE] Current URL: '{page.url}'")
                            successful_url = test_url
                            break
                        except Exception as strategy_error:
                            logger.debug(f"[RECREATION_PAGE] Wait strategy {wait_until} failed: {strategy_error}")
                            continue
                    if successful_url:
                        break
                except Exception as url_error:
                    logger.warning(f"[RECREATION_PAGE] Failed to navigate to {test_url}: {url_error}")
                    continue
            if not successful_url:
                raise Exception("Failed to navigate to any recreation.gov URL variant")
            # Additional wait to ensure page is fully loaded
            try:
                page.wait_for_load_state('networkidle', timeout=30000)
                logger.info(f"[RECREATION_PAGE] Page fully loaded and idle")
            except Exception as e:
                logger.warning(f"[RECREATION_PAGE] NetworkIdle wait failed, continuing: {e}")
try:
# Step 2: Fill search input
logger.info(f"[RECREATION_PAGE] Filling search input with 'Diamond'...")
page.wait_for_selector("input#hero-search-input", state='visible', timeout=timeout_ms)
page.fill("input#hero-search-input", "Diamond")
logger.info(f"[RECREATION_PAGE] Successfully filled search input")
except Exception as e:
logger.error(f"[RECREATION_PAGE] Failed to fill search input: {e}")
raise e
try:
# Step 3: Click search button
logger.info(f"[RECREATION_PAGE] Clicking search button...")
page.wait_for_selector("button.nav-search-button", state='visible', timeout=timeout_ms)
page.click("button.nav-search-button")
logger.info(f"[RECREATION_PAGE] Successfully clicked search button")
print("after first click")
# Wait for search results to load
time.sleep(10)
except Exception as e:
logger.error(f"[RECREATION_PAGE] Failed to click search button: {e}")
raise e
try:
# Step 4: Click on search result
logger.info(f"[RECREATION_PAGE] Waiting for and clicking search result...")
page.wait_for_selector(".search-result-highlight--success", state='visible', timeout=timeout_ms)
with page.expect_popup() as popup_info:
page.click(".search-result-highlight--success")
time.sleep(30) # Wait for popup to fully load
print("after second click")
logger.info(f"[RECREATION_PAGE] Successfully clicked search result")
except Exception as e:
logger.error(f"[RECREATION_PAGE] Failed to click search result: {e}")
raise e
try:
# Step 5: Handle new page
newpage = popup_info.value
newpage.set_default_timeout(timeout_ms)
newpage.wait_for_load_state('networkidle', timeout=timeout_ms)
page_title = newpage.title()
logger.info(f"[RECREATION_PAGE] New page loaded successfully")
logger.info(f"[RECREATION_PAGE] New page title: '{page_title}'")
logger.info(f"[RECREATION_PAGE] New page URL: '{newpage.url}'")
print(f"go to newpage: {page_title}")
time.sleep(2)
except Exception as e:
logger.error(f"[RECREATION_PAGE] Failed to handle new page: {e}")
raise e
try:
# Step 6: Click next-available button with better error handling
logger.info(f"[RECREATION_PAGE] Looking for next-available button...")
# Try multiple selectors for the button
selectors_to_try = [
"button.next-available",
"button[class*='next-available']",
"button[class*='next']",
".next-available",
"[data-testid*='next']"
]
button_clicked = False
for selector in selectors_to_try:
try:
logger.debug(f"[RECREATION_PAGE] Trying selector: {selector}")
newpage.wait_for_selector(selector, state='visible', timeout=30000)
newpage.click(selector, timeout=30000)
logger.info(f"[RECREATION_PAGE] Successfully clicked next-available button with selector: {selector}")
print("after third click")
button_clicked = True
break
except Exception as selector_error:
logger.debug(f"[RECREATION_PAGE] Selector '{selector}' failed: {selector_error}")
continue
if not button_clicked:
logger.warning(f"[RECREATION_PAGE] Could not find or click next-available button with any selector")
logger.info(f"[RECREATION_PAGE] Continuing without clicking next-available button")
print("Continuing without clicking next-available button")
except Exception as e:
logger.warning(f"[RECREATION_PAGE] Error with next-available button: {e}")
logger.info(f"[RECREATION_PAGE] Continuing execution despite button click failure")
print("Continuing without clicking next-available button")
# Step 7: Extract content based on config
return_json = {}
return_json["expected"] = {}
try:
logger.info(f"[RECREATION_PAGE] Extracting content based on config...")
if config["selector"] == "class":
className = config["class"]
logger.debug(f"[RECREATION_PAGE] Looking for elements with class: {className}")
if "order" in config.keys():
try:
elements = newpage.query_selector_all("." + className)
order_index = int(config["order"])
logger.info(f"[RECREATION_PAGE] Found {len(elements)} elements with class '{className}', looking for index {order_index}")
if len(elements) > order_index:
text_content = elements[order_index].text_content()
if text_content:
text_content = text_content.strip()
return_json["expected"][className] = text_content
logger.info(f"[RECREATION_PAGE] Successfully extracted text from element at index {order_index}: '{text_content}'")
else:
logger.warning(f"[RECREATION_PAGE] Element with class '{className}' at index {order_index} not found. Found {len(elements)} elements.")
return_json["expected"][className] = "__EVALUATION_FAILED__"
except Exception as e:
logger.error(f"[RECREATION_PAGE] Error accessing element with class '{className}' at specific order: {e}")
return_json["expected"][className] = "__EVALUATION_FAILED__"
else:
try:
element = newpage.query_selector("." + className)
if element:
text_content = element.text_content()
if text_content:
text_content = text_content.strip()
return_json["expected"][className] = text_content
logger.info(f"[RECREATION_PAGE] Successfully extracted text from first element: '{text_content}'")
else:
logger.warning(f"[RECREATION_PAGE] Element with class '{className}' not found.")
return_json["expected"][className] = "__EVALUATION_FAILED__"
except Exception as e:
logger.error(f"[RECREATION_PAGE] Error accessing element with class '{className}': {e}")
return_json["expected"][className] = "__EVALUATION_FAILED__"
logger.info(f"[RECREATION_PAGE] Content extraction completed successfully")
logger.info(f"[RECREATION_PAGE] Final result: {return_json}")
except Exception as e:
logger.error(f"[RECREATION_PAGE] Error during content extraction: {e}")
# Return a structure indicating extraction failed
return_json["expected"] = {"extraction_error": "__EVALUATION_FAILED__"}
browser.close()
return return_json
except Exception as e:
logger.error(f"[RECREATION_PAGE] Attempt {attempt + 1} failed: {str(e)}")
logger.error(f"[RECREATION_PAGE] Exception type: {type(e).__name__}")
if attempt < max_retries - 1:
logger.info(f"[RECREATION_PAGE] Retrying in 5 seconds...")
time.sleep(5)
else:
logger.error(f"[RECREATION_PAGE] All {max_retries} attempts failed. Returning error result.")
# Return a structure that indicates complete failure
return {
"expected": {
"error": "__EVALUATION_FAILED__",
"error_message": f"Failed after {max_retries} attempts: {str(e)}"
}
}
# This should never be reached, but just in case
logger.error(f"[RECREATION_PAGE] Unexpected code path reached")
return {
"expected": {
"error": "__EVALUATION_FAILED__",
"error_message": "Unexpected error in function execution"
}
}
def get_active_tab_url_parse(env, config: Dict[str, Any]):
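The retry scaffolding above runs one full Playwright session per attempt and falls back to the __EVALUATION_FAILED__ sentinel once every attempt is exhausted. Condensed to its control flow, under the same max_retries and sentinel conventions as the diff (run_with_retries is a hypothetical name, not a function in this commit):

    import logging
    import time

    logger = logging.getLogger(__name__)

    def run_with_retries(task, max_retries=3, delay_s=5):
        # One full browser session per attempt; retry on any exception.
        for attempt in range(max_retries):
            try:
                return task()
            except Exception as e:
                logger.error(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(delay_s)
        # All attempts failed: return the sentinel the evaluator checks for.
        return {"expected": {"error": "__EVALUATION_FAILED__"}}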


@@ -21,6 +21,30 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
if not active_tab_info:
return 0.
# Add a retry mechanism
max_retries = 3
retry_delay = 2  # seconds
for attempt in range(max_retries):
    # Check the HTTP status code
    if 'status' in active_tab_info and active_tab_info['status'] >= 400:
        logger.warning(f"Page load failed (attempt {attempt+1}/{max_retries}), HTTP status: {active_tab_info['status']}")
        if attempt < max_retries - 1:
            # Refresh the page before retrying
            logger.info(f"Refreshing page and retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
            # A page-refresh call is needed here (the actual implementation depends on your environment)
            # pseudocode: refresh_active_tab()
            # then re-fetch active_tab_info
            # pseudocode: active_tab_info = get_active_tab_info()
            continue
        else:
            logger.error(f"Page load failed after {max_retries} attempts")
            return 0.
    break  # status code is OK; exit the retry loop
match_type = rule['type']
if match_type == "url":
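The refresh step above is deliberately left as pseudocode, since refresh_active_tab() and get_active_tab_info() depend on the harness. One way the refresh placeholder could be realized over the same CDP connection the evaluators already use (an assumption for illustration, not code from this commit):

    # Hypothetical realization of the refresh_active_tab() placeholder above;
    # the real implementation depends on the environment and is NOT in this commit.
    from playwright.sync_api import sync_playwright

    def refresh_active_tab(remote_debugging_url: str) -> None:
        with sync_playwright() as p:
            browser = p.chromium.connect_over_cdp(remote_debugging_url)
            page = browser.contexts[0].pages[0]  # assumes the first page is the active tab
            page.reload(wait_until="domcontentloaded")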
@@ -44,6 +68,26 @@ def is_expected_active_tab_approximate(active_tab_info: Dict[str, str], rule: Di
if not active_tab_info:
return 0.
# Add the same retry mechanism
max_retries = 3
retry_delay = 2  # seconds
for attempt in range(max_retries):
    if 'status' in active_tab_info and active_tab_info['status'] >= 400:
        logger.warning(f"Page load failed (attempt {attempt+1}/{max_retries}), HTTP status: {active_tab_info['status']}")
        if attempt < max_retries - 1:
            logger.info(f"Refreshing page and retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
            # pseudocode: refresh_active_tab()
            # pseudocode: active_tab_info = get_active_tab_info()
            continue
        else:
            logger.error(f"Page load failed after {max_retries} attempts")
            return 0.
    break
match_type = rule['type']
if match_type == "url":
@@ -74,11 +118,25 @@ def is_expected_url_pattern_match(result, rules) -> float:
if not result:
return 0.
if type(result) == dict:
result_url = result["url"]
logger.info("result url: {}".format(result_url))
else:
result_url = result
# Add the same retry mechanism
max_retries = 3
retry_delay = 2  # seconds
for attempt in range(max_retries):
    if isinstance(result, dict) and 'status' in result and result['status'] >= 400:
        logger.warning(f"Page load failed (attempt {attempt+1}/{max_retries}), HTTP status: {result['status']}")
        if attempt < max_retries - 1:
            logger.info(f"Refreshing page and retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
            # pseudocode: refresh_active_tab()
            # pseudocode: result = get_active_tab_info()
            continue
        else:
            logger.error(f"Page load failed after {max_retries} attempts")
            return 0.
    break
# expect_regex = re.compile(rules["expected"])
patterns = rules["expected"]
logger.info("expected_regex: {}".format(patterns))
@@ -220,6 +278,7 @@ import imagehash
from pathlib import Path
import typing
import time
def compare_pdf_images(pdf1_path: str, pdf2_path: str, **kwargs) -> float: