diff --git a/desktop_env/evaluators/getters/chrome.py b/desktop_env/evaluators/getters/chrome.py
index 68728dc..129c460 100644
--- a/desktop_env/evaluators/getters/chrome.py
+++ b/desktop_env/evaluators/getters/chrome.py
@@ -1521,30 +1521,68 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
target_page = None
for context in browser.contexts:
for page in context.pages:
- page.wait_for_load_state("networkidle")
- # the accTree and playwright can get encoding(percent-encoding) characters, we need to convert them to normal characters
- # Normalize URLs by removing trailing slashes and decoding percent-encoding
- def normalize_url(url):
- return unquote(url).rstrip('/')
-
- if normalize_url(page.url) == normalize_url(active_tab_url):
- target_page = page
- print("\33[32mtartget page url: ", target_page.url, "\33[0m")
- print("\33[32mtartget page title: ", target_page.title(), "\33[0m")
- break
+ try:
+ # Wait for page to be stable before checking URL
+ page.wait_for_load_state("networkidle", timeout=10000)
+
+ # Check if page is still valid before accessing properties
+ if page.is_closed():
+ logger.debug(f"[DEBUG] Page is closed, skipping")
+ continue
+
+                    # The accessibility tree and Playwright can return percent-encoded URLs; decode them back to normal characters
+ # Normalize URLs by removing trailing slashes and decoding percent-encoding
+ def normalize_url(url):
+ return unquote(url).rstrip('/')
+
+ # Safely get page URL with error handling
+ try:
+ page_url = page.url
+ except Exception as url_error:
+ logger.debug(f"[DEBUG] Failed to get page URL: {url_error}")
+ continue
+
+ if normalize_url(page_url) == normalize_url(active_tab_url):
+ target_page = page
+                        print("\33[32mtarget page url: ", target_page.url, "\33[0m")
+
+ # Safely get page title with error handling
+ try:
+ page_title = target_page.title()
+                            print("\33[32mtarget page title: ", page_title, "\33[0m")
+ except Exception as title_error:
+ logger.debug(f"[DEBUG] Failed to get page title: {title_error}")
+                            print("\33[32mtarget page title: [Unable to get title - page may be navigating]", "\33[0m")
+ break
+
+ except Exception as page_error:
+ logger.debug(f"[DEBUG] Error processing page: {page_error}")
+ continue
+            if target_page:
+                break  # stop scanning remaining contexts once a match is found
if target_page is None:
logger.error("[DEBUG] Could not find target tab matching URL. Available tabs:")
for context in browser.contexts:
for page in context.pages:
- logger.error(f"[DEBUG] - Tab URL: {page.url}")
+ try:
+ if not page.is_closed():
+ page_url = page.url
+ logger.error(f"[DEBUG] - Tab URL: {page_url}")
+ except Exception as e:
+ logger.error(f"[DEBUG] - Tab URL: [Unable to get URL: {e}]")
logger.error(f"[DEBUG] Expected URL: {active_tab_url}")
return {}
return_json = {}
def safely_get_text_content(selector):
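+        # Defensive wrapper: return [] instead of raising if the page has closed
+        # or a Playwright call fails mid-query.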
- elements = target_page.query_selector_all(selector)
- return [element.text_content().strip() for element in elements if element]
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot get text content for selector: {selector}")
+ return []
+ elements = target_page.query_selector_all(selector)
+ return [element.text_content().strip() for element in elements if element]
+ except Exception as e:
+ logger.warning(f"[DEBUG] Error getting text content for selector '{selector}': {e}")
+ return []
def safely_get_direct_text_nodes_playwright(selector):
"""
@@ -1552,26 +1590,47 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
Returns a list of lists, each sublist contains the direct text nodes of one element.
Suitable for structures like:
"""
- elements = target_page.query_selector_all(selector)
- results = []
- for element in elements:
- texts = element.evaluate('''
- (node) => Array.from(node.childNodes)
- .filter(n => n.nodeType === Node.TEXT_NODE)
- .map(n => n.textContent.trim())
- .filter(Boolean)
- ''')
- results.append(texts)
- # Safety check: return empty list if no elements found
- return results[0] if results else []
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot get direct text nodes for selector: {selector}")
+ return []
+ elements = target_page.query_selector_all(selector)
+ results = []
+ for element in elements:
+ texts = element.evaluate('''
+ (node) => Array.from(node.childNodes)
+ .filter(n => n.nodeType === Node.TEXT_NODE)
+ .map(n => n.textContent.trim())
+ .filter(Boolean)
+ ''')
+ results.append(texts)
+                # Safety check: only the first matched element's text nodes are returned; [] if none found
+ return results[0] if results else []
+ except Exception as e:
+ logger.warning(f"[DEBUG] Error getting direct text nodes for selector '{selector}': {e}")
+ return []
def safely_get_direct_li_playwright(selector):
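+        # Collect the <span> text of each "li.catAllProducts" item under the
+        # selector; return [] on any failure.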
- elements = target_page.query_selector_all(selector + " li.catAllProducts")
- return [element.query_selector('span').inner_text().strip() for element in elements if element.query_selector('span')]
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot get li elements for selector: {selector}")
+ return []
+ elements = target_page.query_selector_all(selector + " li.catAllProducts")
+ return [element.query_selector('span').inner_text().strip() for element in elements if element.query_selector('span')]
+ except Exception as e:
+ logger.warning(f"[DEBUG] Error getting li elements for selector '{selector}': {e}")
+ return []
def safely_get_only_child_text_content(selector):
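+        # Return the <h3> text of each matched element; [] if the page is closed
+        # or a call raises.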
- elements = target_page.query_selector_all(selector)
- return [element.query_selector('h3').text_content().strip() for element in elements if element.query_selector('h3')]
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot get child text content for selector: {selector}")
+ return []
+ elements = target_page.query_selector_all(selector)
+ return [element.query_selector('h3').text_content().strip() for element in elements if element.query_selector('h3')]
+ except Exception as e:
+ logger.warning(f"[DEBUG] Error getting child text content for selector '{selector}': {e}")
+ return []
if config["category"] == "class":
class_multiObject = config.get("class_multiObject", {})
@@ -1649,9 +1708,21 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
# Assuming get_by_label is a custom function or part of the framework being used
labelObject = config.get("labelObject", {})
for labelSelector, key in labelObject.items():
- text = target_page.locator(f"text={labelSelector}").first.text_content().strip()
- if text:
- return_json[key] = text
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot process label for key '{key}'")
+ return_json[key] = ""
+ continue
+
+ text = target_page.locator(f"text={labelSelector}").first.text_content()
+ if text:
+ text = text.strip()
+ return_json[key] = text
+ else:
+ return_json[key] = ""
+ except Exception as e:
+ logger.error(f"[DEBUG] Error processing label for key '{key}': {e}")
+ return_json[key] = ""
elif config["category"] == "xpath":
xpathObject = config.get("xpathObject", {})
@@ -1659,70 +1730,94 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
for xpath, key in xpathObject.items():
logger.info(f"[DEBUG] Processing xpath: {xpath} -> key: {key}")
- elements = target_page.locator(f"xpath={xpath}")
- element_count = elements.count()
- logger.info(f"[DEBUG] Found {element_count} elements for xpath: {xpath}")
- if element_count > 0:
- try:
- text_content = elements.first.text_content()
- if text_content is not None:
- text_content = text_content.strip()
- logger.info(f"[DEBUG] Raw text content for key '{key}': '{text_content}' (type: {type(text_content)})")
+ # Check if page is still valid
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot process xpath for key '{key}'")
+ return_json[key] = ""
+ continue
- # 处理空文本内容的情况
- if text_content is None or text_content == "":
- logger.warning(f"[DEBUG] Element found but text content is empty for key '{key}' xpath: {xpath}")
- # 尝试获取更多信息
- element_html = elements.first.inner_html()
- element_text = elements.first.inner_text()
- logger.info(f"[DEBUG] Element innerHTML: '{element_html[:100]}...' innerText: '{element_text}'")
-
- return_json[key] = text_content if text_content else ""
- logger.info(f"[DEBUG] Final value for key '{key}': '{return_json[key]}'")
- except Exception as e:
- logger.error(f"[DEBUG] Error extracting text from element for key '{key}': {e}")
- return_json[key] = ""
- else:
- logger.warning(f"[DEBUG] No elements found for xpath: {xpath}")
- # 尝试一些备用的xpath查找方法
- try:
- # 尝试不使用xpath前缀
- fallback_elements = target_page.locator(xpath)
- fallback_count = fallback_elements.count()
- logger.info(f"[DEBUG] Fallback search (without xpath prefix) found {fallback_count} elements")
- if fallback_count > 0:
- text_content = fallback_elements.first.text_content()
- if text_content:
+ elements = target_page.locator(f"xpath={xpath}")
+ element_count = elements.count()
+ logger.info(f"[DEBUG] Found {element_count} elements for xpath: {xpath}")
+
+ if element_count > 0:
+ try:
+ text_content = elements.first.text_content()
+ if text_content is not None:
text_content = text_content.strip()
+ logger.info(f"[DEBUG] Raw text content for key '{key}': '{text_content}' (type: {type(text_content)})")
+
+                        # Handle the case where the text content is empty
+ if text_content is None or text_content == "":
+ logger.warning(f"[DEBUG] Element found but text content is empty for key '{key}' xpath: {xpath}")
+                            # Try to gather more diagnostic information
+ try:
+ element_html = elements.first.inner_html()
+ element_text = elements.first.inner_text()
+ logger.info(f"[DEBUG] Element innerHTML: '{element_html[:100]}...' innerText: '{element_text}'")
+ except Exception as inner_e:
+ logger.debug(f"[DEBUG] Failed to get inner content: {inner_e}")
+
return_json[key] = text_content if text_content else ""
- logger.info(f"[DEBUG] Fallback extraction successful for key '{key}': '{return_json[key]}'")
- else:
+ logger.info(f"[DEBUG] Final value for key '{key}': '{return_json[key]}'")
+ except Exception as e:
+ logger.error(f"[DEBUG] Error extracting text from element for key '{key}': {e}")
return_json[key] = ""
- except Exception as e:
- logger.info(f"[DEBUG] Fallback xpath search also failed: {e}")
- return_json[key] = ""
+ else:
+ logger.warning(f"[DEBUG] No elements found for xpath: {xpath}")
+                    # Try some fallback xpath lookup strategies
+                    try:
+                        # Try without the "xpath=" prefix
+ fallback_elements = target_page.locator(xpath)
+ fallback_count = fallback_elements.count()
+ logger.info(f"[DEBUG] Fallback search (without xpath prefix) found {fallback_count} elements")
+ if fallback_count > 0:
+ text_content = fallback_elements.first.text_content()
+ if text_content:
+ text_content = text_content.strip()
+ return_json[key] = text_content if text_content else ""
+ logger.info(f"[DEBUG] Fallback extraction successful for key '{key}': '{return_json[key]}'")
+ else:
+ return_json[key] = ""
+ except Exception as e:
+ logger.info(f"[DEBUG] Fallback xpath search also failed: {e}")
+ return_json[key] = ""
+
+ except Exception as e:
+ logger.error(f"[DEBUG] Error processing xpath for key '{key}': {e}")
+ return_json[key] = ""
elif config["category"] == "input":
inputObjects = config.get("inputObject", {})
logger.info(f"[DEBUG] Processing input category with inputObjects: {inputObjects}")
for xpath, key in inputObjects.items():
logger.info(f"[DEBUG] Processing input xpath: {xpath} -> key: {key}")
- inputs = target_page.locator(f"xpath={xpath}")
- input_count = inputs.count()
- logger.info(f"[DEBUG] Found {input_count} input elements for xpath: {xpath}")
- if input_count > 0:
- try:
- input_value = inputs.first.input_value()
- if input_value:
- input_value = input_value.strip()
- return_json[key] = input_value if input_value else ""
- logger.info(f"[DEBUG] Input value for key '{key}': '{return_json[key]}'")
- except Exception as e:
- logger.error(f"[DEBUG] Error getting input value for key '{key}': {e}")
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot process input for key '{key}'")
return_json[key] = ""
- else:
- logger.warning(f"[DEBUG] No input elements found for xpath: {xpath}")
+ continue
+
+ inputs = target_page.locator(f"xpath={xpath}")
+ input_count = inputs.count()
+ logger.info(f"[DEBUG] Found {input_count} input elements for xpath: {xpath}")
+ if input_count > 0:
+ try:
+ input_value = inputs.first.input_value()
+ if input_value:
+ input_value = input_value.strip()
+ return_json[key] = input_value if input_value else ""
+ logger.info(f"[DEBUG] Input value for key '{key}': '{return_json[key]}'")
+ except Exception as e:
+ logger.error(f"[DEBUG] Error getting input value for key '{key}': {e}")
+ return_json[key] = ""
+ else:
+ logger.warning(f"[DEBUG] No input elements found for xpath: {xpath}")
+ return_json[key] = ""
+ except Exception as e:
+ logger.error(f"[DEBUG] Error processing input for key '{key}': {e}")
return_json[key] = ""
elif config["category"] == "class&url":
@@ -1758,19 +1853,43 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
url_include_expected = config.get("url_include_expected", [])
for key in url_include_expected:
- if key.lower() in target_page.url.lower():
- if key.lower() not in return_json.keys():
- return_json[key.lower()] = True
- else:
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot check URL for key '{key}'")
+ if key.lower() not in return_json.keys():
+ return_json[key.lower()] = False
+ continue
+
+ page_url = target_page.url.lower()
+ if key.lower() in page_url:
+ if key.lower() not in return_json.keys():
+ return_json[key.lower()] = True
+ else:
+ if key.lower() not in return_json.keys():
+ return_json[key.lower()] = False
+ except Exception as e:
+ logger.error(f"[DEBUG] Error checking URL for key '{key}': {e}")
if key.lower() not in return_json.keys():
return_json[key.lower()] = False
url_include_expected_multichoice = config.get("url_include_expected_multichoice", {})
for key, value in url_include_expected_multichoice.items():
- if key.lower() in target_page.url.lower():
- if value.lower() not in return_json.keys():
- return_json[value.lower()] = True
- else:
+ try:
+ if target_page.is_closed():
+ logger.warning(f"[DEBUG] Target page is closed, cannot check URL for multichoice key '{key}'")
+ if value.lower() not in return_json.keys():
+ return_json[value.lower()] = False
+ continue
+
+ page_url = target_page.url.lower()
+ if key.lower() in page_url:
+ if value.lower() not in return_json.keys():
+ return_json[value.lower()] = True
+ else:
+ if value.lower() not in return_json.keys():
+ return_json[value.lower()] = False
+ except Exception as e:
+ logger.error(f"[DEBUG] Error checking URL for multichoice key '{key}': {e}")
if value.lower() not in return_json.keys():
return_json[value.lower()] = False
@@ -1799,101 +1918,307 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
"""
especially used for www.recreation.gov examples
"""
+ # Add logging for debugging
+ logger.info(f"[RECREATION_PAGE] Starting recreation.gov page processing")
+ logger.debug(f"[RECREATION_PAGE] Config: {config}")
+
host = env.vm_ip
port = env.chromium_port # fixme: this port is hard-coded, need to be changed from config file
server_port = env.server_port
+ use_proxy = env.current_use_proxy
remote_debugging_url = f"http://{host}:{port}"
- with sync_playwright() as p:
+ backend_url = f"http://{host}:{server_port}"
+
+ # Configuration for retry and timeout
+ max_retries = 3
+ timeout_ms = 60000 # Increase timeout to 60 seconds
+
+ # Test basic connectivity first
+ logger.info(f"[RECREATION_PAGE] Testing basic network connectivity...")
+ try:
+ import urllib.request
+ test_response = urllib.request.urlopen('http://www.google.com', timeout=10)
+ logger.info(f"[RECREATION_PAGE] Basic connectivity test passed (Google accessible)")
+ except Exception as e:
+ logger.warning(f"[RECREATION_PAGE] Basic connectivity test failed: {e}")
+ logger.info(f"[RECREATION_PAGE] Proceeding anyway...")
+
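+    # Retry the whole browser session: the CDP connection and recreation.gov
+    # page loads can fail transiently.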
+ for attempt in range(max_retries):
try:
- browser = p.chromium.connect_over_cdp(remote_debugging_url)
- except Exception as e:
- # If the connection fails, start a new browser instance
- platform.machine()
- if "arm" in platform.machine():
- # start a new browser instance if the connection fails
- payload = json.dumps({"command": [
- "chromium",
- "--remote-debugging-port=1337"
- ], "shell": False})
- else:
- payload = json.dumps({"command": [
- "google-chrome",
- "--remote-debugging-port=1337"
- ], "shell": False})
-
- headers = {"Content-Type": "application/json"}
- requests.post("http://" + host + ":" + server_port + "/setup" + "/launch", headers=headers, data=payload)
- time.sleep(5)
- browser = p.chromium.connect_over_cdp(remote_debugging_url)
- page = browser.new_page()
- page.goto("https://www.recreation.gov/")
- page.fill("input#hero-search-input", "Diamond")
- page.click("button.nav-search-button")
- print("after first click")
- time.sleep(10)
- # Assuming .search-result-highlight--success leads to a new page or requires page load
- with page.expect_popup() as popup_info:
- page.click(".search-result-highlight--success")
- time.sleep(30)
- print("after second click")
- newpage = popup_info.value
- newpage.wait_for_load_state()
- print("go to newpage: ")
- print(newpage.title())
- time.sleep(2)
-
- # Try to click the button with better error handling and longer timeout
- try:
- # Wait for the button to be available with a longer timeout
- newpage.wait_for_selector("button.next-available", timeout=60000)
- newpage.click("button.next-available", timeout=60000)
- print("after third click")
- except Exception as e:
- logger.error(f"Failed to click 'next-available' button: {e}")
- # Try alternative selectors if the main one fails
- try:
- newpage.wait_for_selector("button[class*='next']", timeout=30000)
- newpage.click("button[class*='next']", timeout=30000)
- print("after third click (alternative selector)")
- except Exception as e2:
- logger.error(f"Alternative selector also failed: {e2}")
- # Continue execution even if button click fails
- print("Continuing without clicking next-available button")
-
- return_json = {}
- return_json["expected"] = {}
- # find the text of elements in html with specific class name
- if config["selector"] == "class":
- if "order" in config.keys():
- className = config["class"]
+ logger.info(f"[RECREATION_PAGE] Attempt {attempt + 1}/{max_retries}")
+
+ with sync_playwright() as p:
+ # Connect to remote Chrome instance
try:
- elements = newpage.query_selector_all("." + className)
- order_index = int(config["order"])
- if len(elements) > order_index:
- return_json["expected"][className] = elements[order_index].text_content().strip()
- else:
- logger.warning(f"Element with class '{className}' at index {order_index} not found. Found {len(elements)} elements.")
- # For expected values, if we can't find the element, the evaluation cannot proceed
- # Return a structure that indicates failure to get expected value
- return_json["expected"][className] = "__EVALUATION_FAILED__"
+ browser = p.chromium.connect_over_cdp(remote_debugging_url)
+ logger.info(f"[RECREATION_PAGE] Successfully connected to existing Chrome instance")
except Exception as e:
- logger.error(f"Error accessing element with class '{className}': {e}")
- return_json["expected"][className] = "__EVALUATION_FAILED__"
- else:
- className = config["class"]
+ logger.warning(f"[RECREATION_PAGE] Failed to connect to existing Chrome instance: {e}")
+ logger.info(f"[RECREATION_PAGE] Starting new Chrome instance with enhanced options...")
+
+ # If the connection fails, start a new browser instance with better options
+ app = 'chromium' if 'arm' in platform.machine() else 'google-chrome'
+ command = [
+ app,
+ "--remote-debugging-port=1337",
+ "--no-sandbox",
+ "--disable-web-security",
+ "--disable-features=VizDisplayCompositor",
+ "--disable-dev-shm-usage",
+ "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
+ ]
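+                    # The relaxed sandbox/security flags and the desktop user agent
+                    # are meant to make the debug-launched browser behave like a
+                    # regular session; adjust them to your environment if needed.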
+
+ if use_proxy:
+ command.append(f"--proxy-server=127.0.0.1:18888")
+ logger.info(f"[RECREATION_PAGE] Using proxy server: 127.0.0.1:18888")
+
+ logger.info(f"[RECREATION_PAGE] Starting browser with command: {' '.join(command)}")
+ payload = json.dumps({"command": command, "shell": False})
+ headers = {"Content-Type": "application/json"}
+ requests.post(backend_url + "/setup/launch", headers=headers, data=payload)
+ time.sleep(8) # Give more time for browser to start
+ browser = p.chromium.connect_over_cdp(remote_debugging_url)
+ logger.info(f"[RECREATION_PAGE] Successfully connected to new Chrome instance")
+
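+            # Open a dedicated tab for this check rather than reusing one the
+            # agent may be driving.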
+ page = browser.new_page()
+
+ # Set longer timeout for all operations
+ page.set_default_timeout(timeout_ms)
+
+ # Set additional headers to appear more like a real browser
+ page.set_extra_http_headers({
+ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
+ 'Accept-Language': 'en-US,en;q=0.9',
+ 'Accept-Encoding': 'gzip, deflate, br',
+ 'DNT': '1',
+ 'Connection': 'keep-alive',
+ 'Upgrade-Insecure-Requests': '1',
+ })
+
+ # Try multiple URLs to test connectivity
+ test_urls = [
+ "https://www.recreation.gov/",
+ "http://www.recreation.gov/",
+ "https://recreation.gov/",
+ ]
+
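+            # Try each URL variant in turn; for each one, escalate through the
+            # wait strategies until a load succeeds.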
+ successful_url = None
+ for test_url in test_urls:
+ try:
+ # Step 1: Navigate to recreation.gov with better error handling
+ logger.info(f"[RECREATION_PAGE] Trying to navigate to: {test_url}")
+
+ # Try different wait strategies
+ wait_strategies = ['domcontentloaded', 'load', 'networkidle']
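+                    # 'networkidle' can hang on pages that keep connections open,
+                    # so it is tried last.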
+
+ for wait_until in wait_strategies:
+ try:
+ logger.debug(f"[RECREATION_PAGE] Trying wait strategy: {wait_until}")
+ page.goto(test_url, wait_until=wait_until, timeout=timeout_ms)
+ logger.info(f"[RECREATION_PAGE] Successfully loaded {test_url} with strategy {wait_until}")
+ logger.info(f"[RECREATION_PAGE] Page title: '{page.title()}'")
+ logger.info(f"[RECREATION_PAGE] Current URL: '{page.url}'")
+ successful_url = test_url
+ break
+ except Exception as strategy_error:
+ logger.debug(f"[RECREATION_PAGE] Wait strategy {wait_until} failed: {strategy_error}")
+ continue
+
+ if successful_url:
+ break
+
+ except Exception as url_error:
+ logger.warning(f"[RECREATION_PAGE] Failed to navigate to {test_url}: {url_error}")
+ continue
+
+ if not successful_url:
+ raise Exception("Failed to navigate to any recreation.gov URL variant")
+
+ # Additional wait to ensure page is fully loaded
try:
- element = newpage.query_selector("." + className)
- if element:
- return_json["expected"][className] = element.text_content().strip()
- else:
- logger.warning(f"Element with class '{className}' not found.")
- return_json["expected"][className] = "__EVALUATION_FAILED__"
+ page.wait_for_load_state('networkidle', timeout=30000)
+ logger.info(f"[RECREATION_PAGE] Page fully loaded and idle")
except Exception as e:
- logger.error(f"Error accessing element with class '{className}': {e}")
- return_json["expected"][className] = "__EVALUATION_FAILED__"
- browser.close()
- return return_json
+ logger.warning(f"[RECREATION_PAGE] NetworkIdle wait failed, continuing: {e}")
+
+ try:
+ # Step 2: Fill search input
+ logger.info(f"[RECREATION_PAGE] Filling search input with 'Diamond'...")
+ page.wait_for_selector("input#hero-search-input", state='visible', timeout=timeout_ms)
+ page.fill("input#hero-search-input", "Diamond")
+ logger.info(f"[RECREATION_PAGE] Successfully filled search input")
+
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Failed to fill search input: {e}")
+ raise e
+
+ try:
+ # Step 3: Click search button
+ logger.info(f"[RECREATION_PAGE] Clicking search button...")
+ page.wait_for_selector("button.nav-search-button", state='visible', timeout=timeout_ms)
+ page.click("button.nav-search-button")
+ logger.info(f"[RECREATION_PAGE] Successfully clicked search button")
+ print("after first click")
+
+ # Wait for search results to load
+ time.sleep(10)
+
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Failed to click search button: {e}")
+ raise e
+
+ try:
+ # Step 4: Click on search result
+ logger.info(f"[RECREATION_PAGE] Waiting for and clicking search result...")
+ page.wait_for_selector(".search-result-highlight--success", state='visible', timeout=timeout_ms)
+
+ with page.expect_popup() as popup_info:
+ page.click(".search-result-highlight--success")
+
+ time.sleep(30) # Wait for popup to fully load
+ print("after second click")
+ logger.info(f"[RECREATION_PAGE] Successfully clicked search result")
+
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Failed to click search result: {e}")
+ raise e
+
+ try:
+ # Step 5: Handle new page
+ newpage = popup_info.value
+ newpage.set_default_timeout(timeout_ms)
+ newpage.wait_for_load_state('networkidle', timeout=timeout_ms)
+
+ page_title = newpage.title()
+ logger.info(f"[RECREATION_PAGE] New page loaded successfully")
+ logger.info(f"[RECREATION_PAGE] New page title: '{page_title}'")
+ logger.info(f"[RECREATION_PAGE] New page URL: '{newpage.url}'")
+ print(f"go to newpage: {page_title}")
+ time.sleep(2)
+
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Failed to handle new page: {e}")
+ raise e
+
+ try:
+ # Step 6: Click next-available button with better error handling
+ logger.info(f"[RECREATION_PAGE] Looking for next-available button...")
+
+ # Try multiple selectors for the button
+ selectors_to_try = [
+ "button.next-available",
+ "button[class*='next-available']",
+ "button[class*='next']",
+ ".next-available",
+ "[data-testid*='next']"
+ ]
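+                    # Ordered from most to least specific so the exact class name
+                    # wins whenever it is present.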
+
+ button_clicked = False
+ for selector in selectors_to_try:
+ try:
+ logger.debug(f"[RECREATION_PAGE] Trying selector: {selector}")
+ newpage.wait_for_selector(selector, state='visible', timeout=30000)
+ newpage.click(selector, timeout=30000)
+ logger.info(f"[RECREATION_PAGE] Successfully clicked next-available button with selector: {selector}")
+ print("after third click")
+ button_clicked = True
+ break
+ except Exception as selector_error:
+ logger.debug(f"[RECREATION_PAGE] Selector '{selector}' failed: {selector_error}")
+ continue
+
+ if not button_clicked:
+ logger.warning(f"[RECREATION_PAGE] Could not find or click next-available button with any selector")
+ logger.info(f"[RECREATION_PAGE] Continuing without clicking next-available button")
+ print("Continuing without clicking next-available button")
+
+ except Exception as e:
+ logger.warning(f"[RECREATION_PAGE] Error with next-available button: {e}")
+ logger.info(f"[RECREATION_PAGE] Continuing execution despite button click failure")
+ print("Continuing without clicking next-available button")
+
+ # Step 7: Extract content based on config
+ return_json = {}
+ return_json["expected"] = {}
+
+ try:
+ logger.info(f"[RECREATION_PAGE] Extracting content based on config...")
+
+ if config["selector"] == "class":
+ className = config["class"]
+ logger.debug(f"[RECREATION_PAGE] Looking for elements with class: {className}")
+
+ if "order" in config.keys():
+ try:
+ elements = newpage.query_selector_all("." + className)
+ order_index = int(config["order"])
+ logger.info(f"[RECREATION_PAGE] Found {len(elements)} elements with class '{className}', looking for index {order_index}")
+
+                                    if len(elements) > order_index:
+                                        text_content = elements[order_index].text_content()
+                                        text_content = text_content.strip() if text_content else ""
+                                        return_json["expected"][className] = text_content
+                                        logger.info(f"[RECREATION_PAGE] Successfully extracted text from element at index {order_index}: '{text_content}'")
+                                    else:
+                                        logger.warning(f"[RECREATION_PAGE] Element with class '{className}' at index {order_index} not found. Found {len(elements)} elements.")
+                                        return_json["expected"][className] = "__EVALUATION_FAILED__"
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Error accessing element with class '{className}' at specific order: {e}")
+ return_json["expected"][className] = "__EVALUATION_FAILED__"
+ else:
+ try:
+ element = newpage.query_selector("." + className)
+                            if element:
+                                text_content = element.text_content()
+                                text_content = text_content.strip() if text_content else ""
+                                return_json["expected"][className] = text_content
+                                logger.info(f"[RECREATION_PAGE] Successfully extracted text from first element: '{text_content}'")
+                            else:
+                                logger.warning(f"[RECREATION_PAGE] Element with class '{className}' not found.")
+                                return_json["expected"][className] = "__EVALUATION_FAILED__"
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Error accessing element with class '{className}': {e}")
+ return_json["expected"][className] = "__EVALUATION_FAILED__"
+
+ logger.info(f"[RECREATION_PAGE] Content extraction completed successfully")
+ logger.info(f"[RECREATION_PAGE] Final result: {return_json}")
+
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Error during content extraction: {e}")
+ # Return a structure indicating extraction failed
+ return_json["expected"] = {"extraction_error": "__EVALUATION_FAILED__"}
+
+ browser.close()
+ return return_json
+
+ except Exception as e:
+ logger.error(f"[RECREATION_PAGE] Attempt {attempt + 1} failed: {str(e)}")
+ logger.error(f"[RECREATION_PAGE] Exception type: {type(e).__name__}")
+
+ if attempt < max_retries - 1:
+ logger.info(f"[RECREATION_PAGE] Retrying in 5 seconds...")
+ time.sleep(5)
+ else:
+ logger.error(f"[RECREATION_PAGE] All {max_retries} attempts failed. Returning error result.")
+ # Return a structure that indicates complete failure
+ return {
+ "expected": {
+ "error": "__EVALUATION_FAILED__",
+ "error_message": f"Failed after {max_retries} attempts: {str(e)}"
+ }
+ }
+
+ # This should never be reached, but just in case
+ logger.error(f"[RECREATION_PAGE] Unexpected code path reached")
+ return {
+ "expected": {
+ "error": "__EVALUATION_FAILED__",
+ "error_message": "Unexpected error in function execution"
+ }
+ }
def get_active_tab_url_parse(env, config: Dict[str, Any]):
diff --git a/desktop_env/evaluators/metrics/chrome.py b/desktop_env/evaluators/metrics/chrome.py
index 6c3811f..bc8467c 100644
--- a/desktop_env/evaluators/metrics/chrome.py
+++ b/desktop_env/evaluators/metrics/chrome.py
@@ -21,6 +21,30 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
if not active_tab_info:
return 0.
+    # Add a retry mechanism
+ max_retries = 3
+ retry_delay = 2 # seconds
+
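+    # Note: active_tab_info is not re-fetched inside this loop, so the retries
+    # only add delay unless the caller refreshes it (see the pseudocode below).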
+ for attempt in range(max_retries):
+        # Check the HTTP status code
+ if 'status' in active_tab_info and active_tab_info['status'] >= 400:
+ logger.warning(f"Page load failed (attempt {attempt+1}/{max_retries}), HTTP status: {active_tab_info['status']}")
+
+ if attempt < max_retries - 1:
+                # Refresh the page before retrying
+ logger.info(f"Refreshing page and retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+                # A page-refresh call is needed here (the actual implementation depends on your environment)
+                # Pseudocode: refresh_active_tab()
+                # Then re-fetch active_tab_info
+                # Pseudocode: active_tab_info = get_active_tab_info()
+ continue
+ else:
+ logger.error(f"Page load failed after {max_retries} attempts")
+ return 0.
+
+        break  # If the status code is OK, exit the retry loop
+
match_type = rule['type']
if match_type == "url":
@@ -44,6 +68,26 @@ def is_expected_active_tab_approximate(active_tab_info: Dict[str, str], rule: Di
if not active_tab_info:
return 0.
+    # Add the same retry mechanism
+ max_retries = 3
+ retry_delay = 2 # seconds
+
+ for attempt in range(max_retries):
+ if 'status' in active_tab_info and active_tab_info['status'] >= 400:
+ logger.warning(f"Page load failed (attempt {attempt+1}/{max_retries}), HTTP status: {active_tab_info['status']}")
+
+ if attempt < max_retries - 1:
+ logger.info(f"Refreshing page and retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+                # Pseudocode: refresh_active_tab()
+                # Pseudocode: active_tab_info = get_active_tab_info()
+ continue
+ else:
+ logger.error(f"Page load failed after {max_retries} attempts")
+ return 0.
+
+ break
+
match_type = rule['type']
if match_type == "url":
@@ -74,11 +118,25 @@ def is_expected_url_pattern_match(result, rules) -> float:
if not result:
return 0.
- if type(result) == dict:
- result_url = result["url"]
- logger.info("result url: {}".format(result_url))
- else:
- result_url = result
+    # Add the same retry mechanism
+ max_retries = 3
+ retry_delay = 2 # seconds
+
+ for attempt in range(max_retries):
+ if isinstance(result, dict) and 'status' in result and result['status'] >= 400:
+ logger.warning(f"Page load failed (attempt {attempt+1}/{max_retries}), HTTP status: {result['status']}")
+
+ if attempt < max_retries - 1:
+ logger.info(f"Refreshing page and retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+                # Pseudocode: refresh_active_tab()
+                # Pseudocode: result = get_active_tab_info()
+ continue
+ else:
+ logger.error(f"Page load failed after {max_retries} attempts")
+ return 0.
+
+ break
+    # Restore the URL extraction removed above; the pattern matching below
+    # still relies on result_url.
+    if isinstance(result, dict):
+        result_url = result["url"]
+        logger.info("result url: {}".format(result_url))
+    else:
+        result_url = result
# expect_regex = re.compile(rules["expected"])
patterns = rules["expected"]
logger.info("expected_regex: {}".format(patterns))
@@ -220,6 +278,7 @@ import imagehash
from pathlib import Path
import typing
+import time
def compare_pdf_images(pdf1_path: str, pdf2_path: str, **kwargs) -> float: