Merge remote-tracking branch 'upstream/main'

This commit is contained in:
yuanmengqi
2025-07-13 07:05:54 +00:00
11 changed files with 226 additions and 58 deletions

View File

@@ -32,7 +32,7 @@ class DesktopEnv(gym.Env):
snapshot_name: str = "init_state",
action_space: str = "computer_13",
cache_dir: str = "cache",
screen_size: Tuple[int] = (1920, 1080),
screen_size: Tuple[int] = (int(os.environ.get("SCREEN_WIDTH", 1920)), int(os.environ.get("SCREEN_HEIGHT", 1080))),
headless: bool = False,
require_a11y_tree: bool = True,
require_terminal: bool = False,

View File

@@ -52,6 +52,11 @@ def get_info_from_website(env, config: Dict[Any, Any]) -> Any:
- attribute (str): optional for 'attribute' and 'click_and_attribute', the attribute to be extracted.
- backups (Any): The backup information to be returned if the extraction fails.
"""
# 添加函数开始日志
logger.info(f"[INFO_FROM_WEBSITE] Starting to get information from website: {config.get('url', 'N/A')}")
logger.info(f"[INFO_FROM_WEBSITE] Total info operations to perform: {len(config.get('infos', []))}")
logger.debug(f"[INFO_FROM_WEBSITE] Full config: {config}")
try:
host = env.vm_ip
port = env.chromium_port # fixme: this port is hard-coded, need to be changed from config file
@@ -59,11 +64,18 @@ def get_info_from_website(env, config: Dict[Any, Any]) -> Any:
remote_debugging_url = f"http://{host}:{port}"
backend_url = f"http://{host}:{server_port}"
use_proxy = env.current_use_proxy
logger.info(f"[INFO_FROM_WEBSITE] Connecting to Chrome at {remote_debugging_url}")
with sync_playwright() as p:
# connect to remote Chrome instance
try:
browser = p.chromium.connect_over_cdp(remote_debugging_url)
logger.info(f"[INFO_FROM_WEBSITE] Successfully connected to existing Chrome instance")
except Exception as e:
logger.warning(f"[INFO_FROM_WEBSITE] Failed to connect to existing Chrome instance: {e}")
logger.info(f"[INFO_FROM_WEBSITE] Starting new Chrome instance...")
# If the connection fails (e.g., the agent close the browser instance), start a new browser instance
app = 'chromium' if 'arm' in platform.machine() else 'google-chrome'
command = [
@@ -72,52 +84,116 @@ def get_info_from_website(env, config: Dict[Any, Any]) -> Any:
]
if use_proxy:
command.append(f"--proxy-server=127.0.0.1:18888")
logger.info(f"[INFO_FROM_WEBSITE] Using proxy server: 127.0.0.1:18888")
logger.info(f"[INFO_FROM_WEBSITE] Starting browser with command: {' '.join(command)}")
payload = json.dumps({"command": command, "shell": False})
headers = {"Content-Type": "application/json"}
#requests.post("http://" + host + ":" + server_port + "/setup" + "/launch", headers=headers, data=payload)
requests.post(backend_url + "/setup" + "/launch", headers=headers, data=payload)
time.sleep(5)
browser = p.chromium.connect_over_cdp(remote_debugging_url)
logger.info(f"[INFO_FROM_WEBSITE] Successfully connected to new Chrome instance")
page = browser.contexts[0].new_page()
logger.info(f"[INFO_FROM_WEBSITE] Created new page, navigating to: {config['url']}")
page.goto(config["url"])
page.wait_for_load_state('load')
# 记录页面加载完成后的信息
logger.info(f"[INFO_FROM_WEBSITE] Page loaded successfully")
logger.info(f"[INFO_FROM_WEBSITE] Page title: '{page.title()}'")
logger.info(f"[INFO_FROM_WEBSITE] Current URL: '{page.url}'")
infos = []
for info_dict in config.get('infos', []):
for idx, info_dict in enumerate(config.get('infos', [])):
logger.info(f"[INFO_FROM_WEBSITE] Processing info operation {idx + 1}/{len(config.get('infos', []))}")
logger.debug(f"[INFO_FROM_WEBSITE] Info config: {info_dict}")
if page.url != config["url"]:
logger.info(f"[INFO_FROM_WEBSITE] Page URL changed, navigating back to: {config['url']}")
page.goto(config["url"])
page.wait_for_load_state('load')
logger.info(f"[INFO_FROM_WEBSITE] Back to original page")
action = info_dict.get('action', 'inner_text')
selector = info_dict.get('selector')
logger.info(f"[INFO_FROM_WEBSITE] Action: {action}, Selector: {selector}")
if action == "inner_text":
logger.debug(f"[INFO_FROM_WEBSITE] Waiting for element with selector: {selector}")
ele = page.wait_for_selector(info_dict['selector'], state='attached', timeout=10000)
infos.append(ele.inner_text())
extracted_text = ele.inner_text()
logger.info(f"[INFO_FROM_WEBSITE] Successfully extracted inner_text: '{extracted_text}'")
infos.append(extracted_text)
elif action == "attribute":
attribute = info_dict.get('attribute')
logger.debug(f"[INFO_FROM_WEBSITE] Waiting for element with selector: {selector}")
logger.debug(f"[INFO_FROM_WEBSITE] Extracting attribute: {attribute}")
ele = page.wait_for_selector(info_dict['selector'], state='attached', timeout=10000)
infos.append(ele.get_attribute(info_dict['attribute']))
extracted_attr = ele.get_attribute(info_dict['attribute'])
logger.info(f"[INFO_FROM_WEBSITE] Successfully extracted attribute '{attribute}': '{extracted_attr}'")
infos.append(extracted_attr)
elif action == 'click_and_inner_text':
logger.debug(f"[INFO_FROM_WEBSITE] Performing click_and_inner_text with {len(info_dict['selector'])} selectors")
for idx, sel in enumerate(info_dict['selector']):
logger.debug(f"[INFO_FROM_WEBSITE] Processing selector {idx + 1}/{len(info_dict['selector'])}: {sel}")
if idx != len(info_dict['selector']) - 1:
logger.debug(f"[INFO_FROM_WEBSITE] Clicking element with selector: {sel}")
link = page.wait_for_selector(sel, state='attached', timeout=10000)
link.click()
page.wait_for_load_state('load')
logger.info(f"[INFO_FROM_WEBSITE] Successfully clicked element, page loaded")
logger.debug(f"[INFO_FROM_WEBSITE] New page URL: {page.url}")
else:
logger.debug(f"[INFO_FROM_WEBSITE] Extracting inner_text from final element: {sel}")
ele = page.wait_for_selector(sel, state='attached', timeout=10000)
infos.append(ele.inner_text())
extracted_text = ele.inner_text()
logger.info(f"[INFO_FROM_WEBSITE] Successfully extracted inner_text after clicks: '{extracted_text}'")
infos.append(extracted_text)
elif action == 'click_and_attribute':
attribute = info_dict.get('attribute')
logger.debug(f"[INFO_FROM_WEBSITE] Performing click_and_attribute with {len(info_dict['selector'])} selectors")
logger.debug(f"[INFO_FROM_WEBSITE] Target attribute: {attribute}")
for idx, sel in enumerate(info_dict['selector']):
logger.debug(f"[INFO_FROM_WEBSITE] Processing selector {idx + 1}/{len(info_dict['selector'])}: {sel}")
if idx != len(info_dict['selector']) - 1:
logger.debug(f"[INFO_FROM_WEBSITE] Clicking element with selector: {sel}")
link = page.wait_for_selector(sel, state='attached', timeout=10000)
link.click()
page.wait_for_load_state('load')
logger.info(f"[INFO_FROM_WEBSITE] Successfully clicked element, page loaded")
logger.debug(f"[INFO_FROM_WEBSITE] New page URL: {page.url}")
else:
logger.debug(f"[INFO_FROM_WEBSITE] Extracting attribute from final element: {sel}")
ele = page.wait_for_selector(sel, state='attached')
infos.append(ele.get_attribute(info_dict['attribute']))
extracted_attr = ele.get_attribute(info_dict['attribute'])
logger.info(f"[INFO_FROM_WEBSITE] Successfully extracted attribute '{attribute}' after clicks: '{extracted_attr}'")
infos.append(extracted_attr)
else:
logger.error(f"[INFO_FROM_WEBSITE] Unsupported action: {action}")
raise NotImplementedError(f'The action {action} is not supported yet.')
logger.info(f"[INFO_FROM_WEBSITE] Completed info operation {idx + 1}")
# 记录最终提取的所有信息
logger.info(f"[INFO_FROM_WEBSITE] All operations completed successfully")
logger.info(f"[INFO_FROM_WEBSITE] Total extracted information count: {len(infos)}")
logger.info(f"[INFO_FROM_WEBSITE] Final extracted information: {infos}")
return infos
except Exception as e:
logger.error(f'[ERROR]: failed to obtain information from the website: {config["url"]}. Use backup results instead.')
return config.get('backups', None)
logger.error(f'[INFO_FROM_WEBSITE] ERROR: Failed to obtain information from website: {config.get("url", "N/A")}')
logger.error(f'[INFO_FROM_WEBSITE] Exception details: {str(e)}')
logger.error(f'[INFO_FROM_WEBSITE] Exception type: {type(e).__name__}')
logger.info(f'[INFO_FROM_WEBSITE] Using backup results instead')
backup_data = config.get('backups', None)
logger.info(f'[INFO_FROM_WEBSITE] Backup data: {backup_data}')
return backup_data
# The following ones just need to load info from the files of software, no need to connect to the software

View File

@@ -29,8 +29,8 @@ def is_expected_active_tab(active_tab_info: Dict[str, str], rule: Dict[str, Any]
actual_url = active_tab_info.get('url', None)
else:
actual_url = active_tab_info
print("expected_url: {}".format(expected_url))
print("actual_url: {}".format(actual_url))
logger.info("expected_url: {}".format(expected_url))
logger.info("actual_url: {}".format(actual_url))
return 1 if compare_urls(expected_url, actual_url) else 0
else:
logger.error(f"Unknown type: {match_type}")
@@ -76,23 +76,26 @@ def is_expected_url_pattern_match(result, rules) -> float:
if type(result) == dict:
result_url = result["url"]
print("result url: {}".format(result_url))
logger.info("result url: {}".format(result_url))
else:
result_url = result
# expect_regex = re.compile(rules["expected"])
patterns = rules["expected"]
print("expected_regex: {}".format(patterns))
logger.info("expected_regex: {}".format(patterns))
for pattern in patterns:
match = re.search(pattern, result_url)
print(match)
logger.info("match: {}".format(match))
if not match:
return 0.
return 1.
def is_expected_installed_extensions(installed_extensions, expected) -> float:
print("installed_extensions: ")
print(installed_extensions)
if not installed_extensions:
return 0.
logger.info("installed_extensions: ")
logger.info(installed_extensions)
expected_extensions = expected["expected"]
# whether the expected extensions are installed
@@ -109,6 +112,8 @@ def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> f
"""
Checks if the expected tabs are open in Chrome.
"""
if not open_tabs:
return 0.
match_type = rule['type']
@@ -146,8 +151,10 @@ def is_expected_bookmarks(bookmarks: List[str], rule: Dict[str, Any]) -> float:
bookmark['type'] == 'folder' and bookmark['name'] == 'Liked Authors'), None)
if liked_authors_folder:
# Check if it contains the specified URLs
logger.info("'Liked Authors' folder exists")
liked_authors_urls = [bookmark['url'] for bookmark in liked_authors_folder['children'] if
bookmark['type'] == 'url']
logger.info("Here is the 'Liked Authors' folder's urls: {}".format(liked_authors_urls))
urls = rule['urls']
@@ -168,6 +175,9 @@ def is_expected_bookmarks(bookmarks: List[str], rule: Dict[str, Any]) -> float:
def is_expected_search_query(active_tab_info: Dict[str, str], rules: Dict[str, Any]) -> float:
if not active_tab_info:
return 0.
expected = rules['expect']
pattern = expected['pattern']
matched = re.search(pattern, active_tab_info['url'])

View File

@@ -396,7 +396,10 @@ def check_structure_sim_resized(src_path, tgt_path):
# Check if the structure is similar
structure_same = structure_check_by_ssim(img_src_resized, img_tgt)
return structure_same
if structure_same:
return 1.
else:
return 0.
def check_contrast_increase_and_structure_sim(src_path, tgt_path):

View File

@@ -463,23 +463,60 @@ def compare_table(result: str, expected: str = None, **options) -> float:
# }}} function compare_table #
def compare_csv(result: str, expected: str, **options) -> float:
def compare_csv(result: str, expected: Union[str, List[str]], **options) -> float:
"""
Compare CSV files. If expected is a list, returns 1.0 if result matches any of the expected files.
Args:
result: Path to result CSV file
expected: Path to expected CSV file or list of paths to expected CSV files
options: Additional options (strict, ignore_case)
Returns:
1.0 if result matches expected (or any file in expected list), 0.0 otherwise
"""
if result is None:
return 0.
with open(result) as f:
result_lines: Iterable[str] = f.read().splitlines()
with open(expected) as f:
expected_lines: Iterable[str] = f.read().splitlines()
if not options.get("strict", True):
result_lines = map(str.strip, result_lines)
expected_lines = map(str.strip, expected_lines)
if options.get("ignore_case", False):
result_lines = map(str.lower, result_lines)
expected_lines = map(str.lower, expected_lines)
try:
with open(result) as f:
result_lines: Iterable[str] = f.read().splitlines()
except (FileNotFoundError, IOError):
return 0.
metric: bool = list(result_lines) == list(expected_lines)
return float(metric)
# Convert expected to list if it's a single string (for backward compatibility)
if isinstance(expected, str):
expected_files = [expected]
else:
expected_files = expected
# Try to match against each expected file
for expected_file in expected_files:
try:
with open(expected_file) as f:
expected_lines: Iterable[str] = f.read().splitlines()
# Process lines based on options
current_result_lines = result_lines
current_expected_lines = expected_lines
if not options.get("strict", True):
current_result_lines = map(str.strip, current_result_lines)
current_expected_lines = map(str.strip, current_expected_lines)
if options.get("ignore_case", False):
current_result_lines = map(str.lower, current_result_lines)
current_expected_lines = map(str.lower, current_expected_lines)
# Check if this expected file matches
if list(current_result_lines) == list(current_expected_lines):
return 1.0
except (FileNotFoundError, IOError):
# If this expected file doesn't exist, continue to next one
continue
# No match found
return 0.0
def compare_conference_city_in_order(actual_city_list_path, expected_city):