Merge remote-tracking branch 'upstream/main' into fix_chrome

This commit is contained in:
yuanmengqi
2025-07-08 10:45:57 +00:00
93 changed files with 1158 additions and 495 deletions

View File

@@ -177,28 +177,8 @@ class DesktopEnv(gym.Env):
logger.info("Task requires proxy but proxy is disabled at system level, ignoring proxy requirement.")
if task_use_proxy != self.current_use_proxy:
logger.info(f"Task proxy requirement changed: {self.current_use_proxy} -> {task_use_proxy}")
# Close current provider if it exists
if hasattr(self, 'provider') and self.provider:
try:
self.provider.stop_emulator(self.path_to_vm)
except Exception as e:
logger.warning(f"Failed to stop current provider: {e}")
# Create new provider with appropriate proxy setting
# keep because get_info_from_website depends on this
self.current_use_proxy = task_use_proxy
self.manager, self.provider = create_vm_manager_and_provider(
self.provider_name,
self.region,
use_proxy=task_use_proxy
)
if task_use_proxy:
logger.info("Using proxy-enabled AWS provider.")
else:
logger.info("Using regular AWS provider.")
# Only revert to snapshot if environment has been used (step/setup)
# This optimization is especially important for cloud providers like AWS
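# Illustrative sketch (not part of this commit): the proxy-switch protocol above,
# reduced to a standalone function. stop_fn/create_fn are hypothetical stand-ins
# for provider.stop_emulator and create_vm_manager_and_provider.
def switch_provider_if_needed(current_use_proxy, task_use_proxy, stop_fn, create_fn):
    if task_use_proxy == current_use_proxy:
        return current_use_proxy, None          # nothing to do
    try:
        stop_fn()                               # best-effort: a failed stop must not abort the switch
    except Exception:
        pass
    # recreate with the new proxy setting and remember it for later getters
    return task_use_proxy, create_fn(use_proxy=task_use_proxy)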

View File

@@ -26,7 +26,8 @@ from .chrome import (
get_active_url_from_accessTree,
get_find_installed_extension_name,
get_info_from_website,
get_url_path_parse
get_macys_product_url_parse,
get_url_path_parse # Alias for backward compatibility
)
from .file import get_cloud_file, get_vm_file, get_cache_file, get_content_from_vm_file
from .general import get_vm_command_line, get_vm_terminal_output, get_vm_command_error

View File

@@ -1153,14 +1153,19 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
}
"""
active_tab_url = get_active_url_from_accessTree(env, config)
logger.info(f"[DEBUG] get_active_url_from_accessTree returned: {active_tab_url} (type: {type(active_tab_url)})")
if not isinstance(active_tab_url, str):
logger.error("active_tab_url is not a string")
logger.error(f"[DEBUG] active_tab_url is not a string, got {type(active_tab_url)}: {active_tab_url}")
return None
host = env.vm_ip
port = env.chromium_port # fixme: this port is hard-coded and needs to be made configurable via the config file
server_port = env.server_port
remote_debugging_url = f"http://{host}:{port}"
# DEBUG: Add logging for configuration
logger.info(f"[DEBUG] get_active_tab_html_parse called with config: {config}")
with sync_playwright() as p:
# connect to remote Chrome instance
try:
@@ -1189,13 +1194,21 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
for page in context.pages:
page.wait_for_load_state("networkidle")
# the accessibility tree and Playwright may return percent-encoded characters, so we convert them back to normal characters
if unquote(page.url) == unquote(active_tab_url):
# Normalize URLs by removing trailing slashes and decoding percent-encoding
def normalize_url(url):
return unquote(url).rstrip('/')
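# e.g. (illustration only): normalize_url("https://host/a%20b/") == "https://host/a b"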
if normalize_url(page.url) == normalize_url(active_tab_url):
target_page = page
print("\33[32mtartget page url: ", target_page.url, "\33[0m")
print("\33[32mtartget page title: ", target_page.title(), "\33[0m")
break
if target_page is None:
logger.error("Your tab is not the target tab.")
logger.error("[DEBUG] Could not find target tab matching URL. Available tabs:")
for context in browser.contexts:
for page in context.pages:
logger.error(f"[DEBUG] - Tab URL: {page.url}")
logger.error(f"[DEBUG] Expected URL: {active_tab_url}")
return {}
return_json = {}
@@ -1220,7 +1233,8 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
.filter(Boolean)
''')
results.append(texts)
return results[0]
# Safety check: return empty list if no elements found
return results[0] if results else []
def safely_get_direct_li_playwright(selector):
elements = target_page.query_selector_all(selector + " li.catAllProducts")
@@ -1238,6 +1252,9 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
else:
logger.warning(f"[DEBUG] Element at index {index} not found for class '{class_name}'. Found {len(elements_texts)} elements.")
return_json[key] = "" # Return empty string instead of None
class_multiObject_child = config.get("class_multiObject_child", {})
for class_name, object_dict in class_multiObject_child.items():
@@ -1246,6 +1263,9 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
else:
logger.warning(f"[DEBUG] Child element at index {index} not found for class '{class_name}'. Found {len(elements_texts)} elements.")
return_json[key] = "" # Return empty string instead of None
class_multiObject_only_child = config.get("class_multiObject_only_child", {})
for class_name, object_dict in class_multiObject_only_child.items():
@@ -1254,10 +1274,16 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
index = int(order_key)
if len(elements_texts) > index:
return_json[key] = elements_texts[index]
else:
logger.warning(f"[DEBUG] Only child element at index {index} not found for class '{class_name}'. Found {len(elements_texts)} elements.")
return_json[key] = "" # Return empty string instead of None
class_multiObject_search_exist = config.get("class_multiObject_search_exist", {})
for class_name, object_list in class_multiObject_search_exist.items():
elements_texts = safely_get_text_content("." + class_name)
logger.info(f"[DEBUG] Found elements with class '{class_name}': {elements_texts}")
logger.info(f"[DEBUG] Expected elements: {[obj for obj in object_list if obj != 'is_other_exist']}")
for each_object in object_list:
if each_object == "is_other_exist":
continue
@@ -1266,10 +1292,15 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
else:
return_json[each_object] = False
if "is_other_exist" in object_list:
extra_elements = []
for each_element in elements_texts:
if each_element not in object_list:
extra_elements.append(each_element)
return_json["is_other_exist"] = True
break
if extra_elements:
logger.warning(f"[DEBUG] Found unexpected elements not in expected list: {extra_elements}")
else:
logger.info(f"[DEBUG] No unexpected elements found")
if "is_other_exist" not in return_json.keys():
return_json["is_other_exist"] = False
@@ -1277,8 +1308,13 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
class_singleObject = config.get("class_singleObject", {})
for class_name, key in class_singleObject.items():
element_text = safely_get_text_content("." + class_name)
logger.info(f"[DEBUG] Class '{class_name}' found {len(element_text)} elements")
if element_text:
return_json[key] = element_text[0]
logger.info(f"[DEBUG] Class extraction for key '{key}': '{element_text[0]}'")
else:
logger.warning(f"[DEBUG] No elements found for class: {class_name}")
return_json[key] = "" # Return empty string instead of None
elif config['category'] == "label":
# Assuming get_by_label is a custom function or part of the framework being used
@@ -1290,17 +1326,75 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
elif config["category"] == "xpath":
xpathObject = config.get("xpathObject", {})
logger.info(f"[DEBUG] Processing xpath category with xpathObject: {xpathObject}")
for xpath, key in xpathObject.items():
logger.info(f"[DEBUG] Processing xpath: {xpath} -> key: {key}")
elements = target_page.locator(f"xpath={xpath}")
if elements.count() > 0:
return_json[key] = elements.first.text_content().strip()
element_count = elements.count()
logger.info(f"[DEBUG] Found {element_count} elements for xpath: {xpath}")
if element_count > 0:
try:
text_content = elements.first.text_content()
if text_content is not None:
text_content = text_content.strip()
logger.info(f"[DEBUG] Raw text content for key '{key}': '{text_content}' (type: {type(text_content)})")
# Handle the case where the text content is empty
if text_content is None or text_content == "":
logger.warning(f"[DEBUG] Element found but text content is empty for key '{key}' xpath: {xpath}")
# Try to gather more diagnostic information
element_html = elements.first.inner_html()
element_text = elements.first.inner_text()
logger.info(f"[DEBUG] Element innerHTML: '{element_html[:100]}...' innerText: '{element_text}'")
return_json[key] = text_content if text_content else ""
logger.info(f"[DEBUG] Final value for key '{key}': '{return_json[key]}'")
except Exception as e:
logger.error(f"[DEBUG] Error extracting text from element for key '{key}': {e}")
return_json[key] = ""
else:
logger.warning(f"[DEBUG] No elements found for xpath: {xpath}")
# Try some fallback xpath lookup strategies
try:
# Try locating without the xpath= prefix
fallback_elements = target_page.locator(xpath)
fallback_count = fallback_elements.count()
logger.info(f"[DEBUG] Fallback search (without xpath prefix) found {fallback_count} elements")
if fallback_count > 0:
text_content = fallback_elements.first.text_content()
if text_content:
text_content = text_content.strip()
return_json[key] = text_content if text_content else ""
logger.info(f"[DEBUG] Fallback extraction successful for key '{key}': '{return_json[key]}'")
else:
return_json[key] = ""
except Exception as e:
logger.info(f"[DEBUG] Fallback xpath search also failed: {e}")
return_json[key] = ""
elif config["category"] == "input":
inputObjects = config.get("inputObject", {})
logger.info(f"[DEBUG] Processing input category with inputObjects: {inputObjects}")
for xpath, key in inputObjects.items():
logger.info(f"[DEBUG] Processing input xpath: {xpath} -> key: {key}")
inputs = target_page.locator(f"xpath={xpath}")
if inputs.count() > 0:
return_json[key] = inputs.first.input_value().strip()
input_count = inputs.count()
logger.info(f"[DEBUG] Found {input_count} input elements for xpath: {xpath}")
if input_count > 0:
try:
input_value = inputs.first.input_value()
if input_value:
input_value = input_value.strip()
return_json[key] = input_value if input_value else ""
logger.info(f"[DEBUG] Input value for key '{key}': '{return_json[key]}'")
except Exception as e:
logger.error(f"[DEBUG] Error getting input value for key '{key}': {e}")
return_json[key] = ""
else:
logger.warning(f"[DEBUG] No input elements found for xpath: {xpath}")
return_json[key] = ""
elif config["category"] == "class&url":
class_multiObject = config.get("class_multiObject", {})
@@ -1352,6 +1446,23 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
return_json[value.lower()] = False
browser.close()
# DEBUG: Add logging for final result and check for None values
logger.info(f"[DEBUG] get_active_tab_html_parse final result: {return_json}")
# Check for None values
none_keys = [key for key, value in return_json.items() if value is None]
if none_keys:
logger.warning(f"[DEBUG] Found None values for keys: {none_keys}")
# Check that all expected keys are present
if config["category"] == "xpath":
expected_keys = set(config.get("xpathObject", {}).values())
actual_keys = set(return_json.keys())
missing_keys = expected_keys - actual_keys
if missing_keys:
logger.warning(f"[DEBUG] Missing expected keys: {missing_keys}")
return return_json
@@ -1402,8 +1513,24 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
print("go to newpage: ")
print(newpage.title())
time.sleep(2)
newpage.click("button.next-available")
print("after third click")
# Try to click the button with better error handling and longer timeout
try:
# Wait for the button to be available with a longer timeout
newpage.wait_for_selector("button.next-available", timeout=60000)
newpage.click("button.next-available", timeout=60000)
print("after third click")
except Exception as e:
logger.error(f"Failed to click 'next-available' button: {e}")
# Try alternative selectors if the main one fails
try:
newpage.wait_for_selector("button[class*='next']", timeout=30000)
newpage.click("button[class*='next']", timeout=30000)
print("after third click (alternative selector)")
except Exception as e2:
logger.error(f"Alternative selector also failed: {e2}")
# Continue execution even if button click fails
print("Continuing without clicking next-available button")
return_json = {}
return_json["expected"] = {}
@@ -1411,11 +1538,31 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
if config["selector"] == "class":
if "order" in config.keys():
className = config["class"]
return_json["expected"][className] = newpage.query_selector_all("." + className)[
int(config["order"])].text_content().strip()
try:
elements = newpage.query_selector_all("." + className)
order_index = int(config["order"])
if len(elements) > order_index:
return_json["expected"][className] = elements[order_index].text_content().strip()
else:
logger.warning(f"Element with class '{className}' at index {order_index} not found. Found {len(elements)} elements.")
# For expected values, if we can't find the element, the evaluation cannot proceed
# Return a structure that indicates failure to get expected value
return_json["expected"][className] = "__EVALUATION_FAILED__"
except Exception as e:
logger.error(f"Error accessing element with class '{className}': {e}")
return_json["expected"][className] = "__EVALUATION_FAILED__"
else:
className = config["class"]
return_json["expected"][className] = newpage.query_selector("." + className).text_content().strip()
try:
element = newpage.query_selector("." + className)
if element:
return_json["expected"][className] = element.text_content().strip()
else:
logger.warning(f"Element with class '{className}' not found.")
return_json["expected"][className] = "__EVALUATION_FAILED__"
except Exception as e:
logger.error(f"Error accessing element with class '{className}': {e}")
return_json["expected"][className] = "__EVALUATION_FAILED__"
browser.close()
return return_json
@@ -1481,7 +1628,7 @@ def get_url_dashPart(env, config: Dict[str, str]):
return {config["key"]: dash_part}
def get_url_path_parse(env, config: Dict[str, str]):
def get_macys_product_url_parse(env, config: Dict[str, str]):
"""
Parse a Macy's product URL path and extract:
- mens_clothing: true if 'mens-clothing' in path, else None
@@ -1504,6 +1651,7 @@ def get_url_path_parse(env, config: Dict[str, str]):
path_parts = path.strip('/').split('/')
key_value_json = {}
shirts_flag = False
short_sleeve_flag = False # Initialize short_sleeve_flag to avoid UnboundLocalError
if "shirts" in path:
shirts_flag = True
if "short-sleeve" in path:
@@ -1531,10 +1679,20 @@ def get_url_path_parse(env, config: Dict[str, str]):
for key in config["parse_keys"]:
if key in key_value_json:
if key == "Price_discount_range":
if '50_PERCENT_ off & more' in key_value_json[key] and not '30_PERCENT_ off & more' in key_value_json[key] and not '20_PERCENT_ off & more' in key_value_json[key]:
# Check if key_value_json[key] is not None before using 'in' operator
if key_value_json[key] is not None and '50_PERCENT_ off & more' in key_value_json[key] and not '30_PERCENT_ off & more' in key_value_json[key] and not '20_PERCENT_ off & more' in key_value_json[key]:
result[key] = '50_PERCENT_ off & more'
else:
result[key] = 'not_50_PERCENT_ off & more'
else:
result[key] = key_value_json[key]
return result
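# Worked example (illustration only) of the Price_discount_range branch above:
# the value matches only when the 50% tier is present without the 30%/20% tiers.
value = "50_PERCENT_ off & more"
matches = (value is not None
           and '50_PERCENT_ off & more' in value
           and '30_PERCENT_ off & more' not in value
           and '20_PERCENT_ off & more' not in value)
print('50_PERCENT_ off & more' if matches else 'not_50_PERCENT_ off & more')  # -> 50_PERCENT_ off & more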
# Alias for backward compatibility - the old function name was too generic
def get_url_path_parse(env, config: Dict[str, str]):
"""
Alias for get_macys_product_url_parse to maintain backward compatibility.
This function name is kept for existing configurations that still use "url_path_parse" type.
"""
return get_macys_product_url_parse(env, config)

View File

@@ -1,6 +1,8 @@
import logging
from typing import TypeVar, Dict
from datetime import datetime, timedelta
import pytz
import requests
logger = logging.getLogger("desktopenv.getters.misc")
@@ -71,6 +73,11 @@ relativeTime_to_IntDay = {
"this Sunday": "special",
"next Monday": "special",
"next Friday": "special",
"next Saturday": "special",
"next Sunday": "special",
"next week Friday": "special",
"next week Saturday": "special",
"next week Sunday": "special",
"first monday four months later": "special",
"first monday eight months later": "special",
"next Monday split": "special",
@@ -93,68 +100,146 @@ def get_rule_relativeTime(env, config: Dict[str, R]) -> R:
}
If relativeTime only has key "from", then the key of time in "expected" dict must be "time".
If relativeTime has key "to", then the key of time in "expected" dict must be "from" and "to".
Optional 'timezone': timezone string like 'Europe/Zurich', 'America/New_York', etc.
If not specified, will try to get timezone from IP geolocation.
"""
logger.info(f"[DEBUG] get_rule_relativeTime called with config: {config}")
relativeRules = config["rules"]
relativeTime = relativeRules["relativeTime"] # int, "+" means future, "-" means past
# get the date now
now = datetime.now()
logger.info(f"[DEBUG] relativeTime: {relativeTime}")
# Get timezone configuration
timezone_str = get_timezone_from_config(config)
try:
timezone = pytz.timezone(timezone_str)
logger.info(f"Successfully loaded timezone: {timezone_str}")
except pytz.exceptions.UnknownTimeZoneError:
logger.error(f"Unknown timezone: {timezone_str}, falling back to UTC")
timezone = pytz.UTC
# Get current time in the specified timezone
now = datetime.now(timezone)
logger.info(f"Current time in {timezone_str}: {now.strftime('%Y-%m-%d %H:%M:%S %Z')}")
# calculate the relative time
if "to" not in relativeTime.keys():
start_relative_time = relativeTime["from"]
logger.info(f"Processing single time: '{start_relative_time}'")
if relativeTime_to_IntDay[start_relative_time] != "special":
# relativeTime can be represented by actual int days
start_relative_time_IntDat = relativeTime_to_IntDay[start_relative_time]
timediff = timedelta(days=start_relative_time_IntDat)
absoluteDay = now + timediff
logger.info(f"Simple calculation: {start_relative_time} = {start_relative_time_IntDat} days → {absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
# special case, you can add more special cases here
if start_relative_time == "5th next month":
next_year = now.year + 1 if now.month == 12 else now.year
next_month = now.month + 1 if now.month < 12 else 1
next_day = 5
absoluteDay = datetime(next_year, next_month, next_day)
absoluteDay = timezone.localize(datetime(next_year, next_month, next_day))
logger.info(f"5th next month: {absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif start_relative_time == "10th next month":
next_year = now.year + 1 if now.month == 12 else now.year
next_month = now.month + 1 if now.month < 12 else 1
next_day = 10
absoluteDay = datetime(next_year, next_month, next_day)
absoluteDay = timezone.localize(datetime(next_year, next_month, next_day))
logger.info(f"10th next month: {absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif start_relative_time == "this month":
absoluteDay = now
logger.info(f"This month: {absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif start_relative_time == "next Monday":
absoluteDay = now + timedelta(days=((6-now.weekday())+1))
days_until_monday = (6-now.weekday()) + 1
absoluteDay = now + timedelta(days=days_until_monday)
logger.info(f"Next Monday: current weekday={now.weekday()}, days to add={days_until_monday}{absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif start_relative_time == "first monday four months later":
next_year = now.year + 1 if now.month >=9 else now.year
next_month = (now.month + 4 - 1) % 12 + 1  # wrap into 1..12 (the old (month+4)%12 mapped August to an invalid month 0)
# get the first monday of the next_month
temp_date = datetime(next_year, next_month, 1)
absoluteDay = temp_date + timedelta(days=((6-temp_date.weekday())+1)%7)
temp_date = timezone.localize(datetime(next_year, next_month, 1))
days_to_monday = ((6-temp_date.weekday())+1)%7
absoluteDay = temp_date + timedelta(days=days_to_monday)
logger.info(f"First Monday 4 months later: {next_year}-{next_month:02d}{absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif start_relative_time == "first monday eight months later":
next_year = now.year + 1 if now.month >= 5 else now.year
next_month = (now.month + 8 - 1) % 12 + 1  # wrap into 1..12 (the old (month+8)%12 mapped April to an invalid month 0)
# get the first monday of the next_month
temp_date = datetime(next_year, next_month, 1)
absoluteDay = temp_date + timedelta(days=((6-temp_date.weekday())+1)%7)
temp_date = timezone.localize(datetime(next_year, next_month, 1))
days_to_monday = ((6-temp_date.weekday())+1)%7
absoluteDay = temp_date + timedelta(days=days_to_monday)
logger.info(f"First Monday 8 months later: {next_year}-{next_month:02d}{absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
regular_time = apply_rules_to_timeFormat(relativeRules["expected"]["time"], absoluteDay)
logger.info(f"Final formatted time: {regular_time}")
config["rules"]["expected"]["time"] = regular_time
else:
from_time = relativeTime["from"]
to_time = relativeTime["to"]
logger.info(f"Processing time range: from '{from_time}' to '{to_time}'")
# deal with from_time first
if relativeTime_to_IntDay[from_time] != "special":
from_time_IntDat = relativeTime_to_IntDay[from_time]
from_timediff = timedelta(days=from_time_IntDat)
from_absoluteDay = now + from_timediff
logger.info(f"From time calculation: {from_time} = {from_time_IntDat} days → {from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
if from_time == "this Saturday":
from_absoluteDay = now + timedelta(days=(5-now.weekday()))
days_until_saturday = (5-now.weekday())
from_absoluteDay = now + timedelta(days=days_until_saturday)
logger.info(f"This Saturday: current weekday={now.weekday()}, days to add={days_until_saturday}{from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif from_time == "10th next month":
next_year = now.year + 1 if now.month == 12 else now.year
next_month = now.month + 1 if now.month < 12 else 1
next_day = 10
from_absoluteDay = datetime(next_year, next_month, next_day)
from_absoluteDay = timezone.localize(datetime(next_year, next_month, next_day))
logger.info(f"10th next month (from): {from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif from_time == "next Monday" or from_time == "next Monday split":
from_absoluteDay = now + timedelta(days=((6-now.weekday())+1))
days_until_monday = (6-now.weekday()) + 1
from_absoluteDay = now + timedelta(days=days_until_monday)
logger.info(f"Next Monday (from): current weekday={now.weekday()}, days to add={days_until_monday}{from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif from_time == "next Friday":
# Next weekend Friday calculation
if now.weekday() < 4: # Monday to Thursday - use this weekend
days_until_friday = 4 - now.weekday()
elif now.weekday() == 4: # Today is Friday - use next weekend
days_until_friday = 7
else: # Saturday to Sunday - use next weekend
days_until_friday = (7 - now.weekday()) + 4 # Days to next Monday + 4 to get to Friday
from_absoluteDay = now + timedelta(days=days_until_friday)
logger.info(f"Next Friday (from): current weekday={now.weekday()}, days to add={days_until_friday}{from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif from_time == "next Saturday":
# Next weekend Saturday calculation
if now.weekday() < 5: # Monday to Friday - use this weekend
days_until_saturday = 5 - now.weekday()
elif now.weekday() == 5: # Today is Saturday - use next weekend
days_until_saturday = 7
else: # Sunday - use next weekend
days_until_saturday = 6 # 6 days to next Saturday
from_absoluteDay = now + timedelta(days=days_until_saturday)
logger.info(f"Next Saturday (from): current weekday={now.weekday()}, days to add={days_until_saturday}{from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif from_time == "next week Friday":
# Next week Friday - simple: go to next Monday, then +4 days
days_to_next_monday = 7 - now.weekday()
days_until_friday = days_to_next_monday + 4 # Monday + 4 = Friday
from_absoluteDay = now + timedelta(days=days_until_friday)
logger.info(f"Next week Friday (from): current weekday={now.weekday()}, days to add={days_until_friday}{from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif from_time == "next week Saturday":
# Next week Saturday - simple: go to next Monday, then +5 days
days_to_next_monday = 7 - now.weekday()
days_until_saturday = days_to_next_monday + 5 # Monday + 5 = Saturday
from_absoluteDay = now + timedelta(days=days_until_saturday)
logger.info(f"Next week Saturday (from): current weekday={now.weekday()}, days to add={days_until_saturday}{from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif from_time == "next week Sunday":
# Next week Sunday - simple: go to next Monday, then +6 days
days_to_next_monday = 7 - now.weekday()
days_until_sunday = days_to_next_monday + 6 # Monday + 6 = Sunday
from_absoluteDay = now + timedelta(days=days_until_sunday)
logger.info(f"Next week Sunday (from): current weekday={now.weekday()}, days to add={days_until_sunday}{from_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
pass # more rules here
if from_time == "next Monday split":
@@ -164,28 +249,75 @@ def get_rule_relativeTime(env, config: Dict[str, R]) -> R:
config["rules"]["expected"]["puMonth"] = pumonth
puyear = apply_rules_to_timeFormat(relativeRules["expected"]["puYear"], from_absoluteDay)
config["rules"]["expected"]["puYear"] = puyear
logger.info(f"Monday split formatting: puDay={puday}, puMonth={pumonth}, puYear={puyear}")
else:
regular_from_time = apply_rules_to_timeFormat(relativeRules["expected"]["from"], from_absoluteDay)
config["rules"]["expected"]["from"] = regular_from_time
logger.info(f"From time formatted: {regular_from_time}")
# deal with to_time
if relativeTime_to_IntDay[to_time] != "special":
to_time_IntDat = relativeTime_to_IntDay[to_time]
to_timediff = timedelta(days=to_time_IntDat)
to_absoluteDay = now + to_timediff
logger.info(f"To time calculation: {to_time} = {to_time_IntDat} days → {to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
if to_time == "this Sunday":
to_absoluteDay = now + timedelta(days=(6-now.weekday()))
days_until_sunday = (6-now.weekday())
to_absoluteDay = now + timedelta(days=days_until_sunday)
logger.info(f"This Sunday: current weekday={now.weekday()}, days to add={days_until_sunday}{to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif to_time == "11th next month":
next_year = now.year + 1 if now.month == 12 else now.year
next_month = now.month + 1 if now.month < 12 else 1
next_day = 11
to_absoluteDay = datetime(next_year, next_month, next_day)
to_absoluteDay = timezone.localize(datetime(next_year, next_month, next_day))
logger.info(f"11th next month (to): {to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif to_time == "next Friday" or to_time == "next Friday split":
if now.weekday() < 4 and from_time in ["next Monday"]:
to_absoluteDay = now + timedelta(days=((4-now.weekday())+7))
# Check if from_time is any variant of "next Monday"
if from_time in ["next Monday", "next Monday split"]:
# Calculate Friday of the same week as the Monday
# from_absoluteDay is already calculated as next Monday
# Friday is 4 days after Monday (Monday=0, Friday=4)
to_absoluteDay = from_absoluteDay + timedelta(days=4)
logger.info(f"Next Friday (same week as Monday): from Monday {from_absoluteDay.strftime('%Y-%m-%d')} + 4 days → {to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
to_absoluteDay = now + timedelta(days=((4-now.weekday()) if now.weekday() < 4 else (6-now.weekday()) + 5))
# Standalone "next Friday" calculation
if now.weekday() < 4: # Monday to Thursday
days_to_friday = 4 - now.weekday()
else: # Friday to Sunday
days_to_friday = (6 - now.weekday()) + 5
to_absoluteDay = now + timedelta(days=days_to_friday)
logger.info(f"Next Friday (standalone): current weekday={now.weekday()}, days to add={days_to_friday}{to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif to_time == "next Sunday":
# Next weekend Sunday calculation - should be the same weekend as the from_time
if from_time in ["next Friday", "next Saturday"]:
# Calculate Sunday of the same weekend as from_time
# from_absoluteDay is already calculated, get the Sunday of that week
days_to_add_for_sunday = 6 - from_absoluteDay.weekday() # Days from Friday/Saturday to Sunday
to_absoluteDay = from_absoluteDay + timedelta(days=days_to_add_for_sunday)
logger.info(f"Next Sunday (to, same weekend as {from_time}): from {from_absoluteDay.strftime('%Y-%m-%d %A')} + {days_to_add_for_sunday} days → {to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
# Standalone next Sunday calculation
if now.weekday() < 6: # Monday to Saturday - use this weekend
days_until_sunday = 6 - now.weekday()
else: # Sunday - use next weekend
days_until_sunday = 7
to_absoluteDay = now + timedelta(days=days_until_sunday)
logger.info(f"Next Sunday (to, standalone): current weekday={now.weekday()}, days to add={days_until_sunday}{to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
elif to_time == "next week Sunday":
# Next week Sunday calculation - should be the same week as from_time if from_time is also "next week"
if from_time in ["next week Friday", "next week Saturday"]:
# Calculate Sunday of the same week as from_time
# from_absoluteDay is already calculated, get the Sunday of that week
days_to_add_for_sunday = 6 - from_absoluteDay.weekday() # Days from Friday/Saturday to Sunday
to_absoluteDay = from_absoluteDay + timedelta(days=days_to_add_for_sunday)
logger.info(f"Next week Sunday (to, same week as {from_time}): from {from_absoluteDay.strftime('%Y-%m-%d %A')} + {days_to_add_for_sunday} days → {to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
# Standalone next week Sunday calculation - simple: go to next Monday, then +6 days
days_to_next_monday = 7 - now.weekday()
days_until_sunday = days_to_next_monday + 6 # Monday + 6 = Sunday
to_absoluteDay = now + timedelta(days=days_until_sunday)
logger.info(f"Next week Sunday (to, standalone): current weekday={now.weekday()}, days to add={days_until_sunday}{to_absoluteDay.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
pass # more rules here
if to_time == "next Friday split":
@@ -195,10 +327,14 @@ def get_rule_relativeTime(env, config: Dict[str, R]) -> R:
config["rules"]["expected"]["doMonth"] = to_month
to_year = apply_rules_to_timeFormat(relativeRules["expected"]["doYear"], to_absoluteDay)
config["rules"]["expected"]["doYear"] = to_year
logger.info(f"Friday split formatting: doDay={to_day}, doMonth={to_month}, doYear={to_year}")
else:
regular_to_time = apply_rules_to_timeFormat(relativeRules["expected"]["to"], to_absoluteDay)
config["rules"]["expected"]["to"] = regular_to_time
logger.info(f"To time formatted: {regular_to_time}")
logger.info(f"[DEBUG] Final config rules: {config['rules']}")
print(config["rules"])
return config["rules"]
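# Worked example (illustration only) of the "next week Friday" arithmetic above,
# pinned to a fixed date so the output is reproducible:
from datetime import datetime, timedelta
now = datetime(2025, 7, 8)                        # a Tuesday, now.weekday() == 1
days_to_next_monday = 7 - now.weekday()           # 6 -> Monday 2025-07-14
friday = now + timedelta(days=days_to_next_monday + 4)
print(friday.strftime("%Y-%m-%d %A"))             # 2025-07-18 Friday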
@@ -227,4 +363,44 @@ def get_time_diff_range(env, config) -> str:
return config["diff_range_in_minutes"]
except:
logger.error("diff_range_in_minutes not found in config.")
return None
return None
def get_timezone_from_ip() -> str:
"""
Get the timezone from the IP address using an IP geolocation API
Returns timezone string like 'Europe/Zurich' or 'UTC' as fallback
"""
try:
# Try ipapi.co first
response = requests.get('https://ipapi.co/json/', timeout=5)
if response.status_code == 200:
data = response.json()
timezone = data.get('timezone')
if timezone:
logger.info(f"Timezone from IP: {timezone}")
return timezone
except Exception as e:
logger.warning(f"Failed to get timezone from IP: {e}")
# Fallback to UTC
logger.info("Using UTC as fallback timezone")
return 'UTC'
def get_timezone_from_config(config: Dict, default_timezone: str = None) -> str:
"""
Get timezone from config, with fallback options
Priority: config timezone > default_timezone > IP-based timezone > UTC
"""
# Check if timezone is specified in config
if "timezone" in config.get("rules", {}):
timezone = config["rules"]["timezone"]
logger.info(f"Using timezone from config: {timezone}")
return timezone
# Use provided default
if default_timezone:
logger.info(f"Using provided default timezone: {default_timezone}")
return default_timezone
# Get from IP
return get_timezone_from_ip()
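# Minimal sketch (illustration only) of the resolution order implemented above:
# a timezone in the config wins, then the caller's default, then the IP lookup.
print(get_timezone_from_config({"rules": {"timezone": "Europe/Zurich"}}))  # -> Europe/Zurich
print(get_timezone_from_config({"rules": {}}, default_timezone="UTC"))     # -> UTC (no network call)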

View File

@@ -2,6 +2,7 @@ import logging
import os
import re
import shutil
import io
from itertools import product
from typing import Any, Dict, List, Union
@@ -200,6 +201,7 @@ import fitz
from PIL import Image
from borb.pdf import Document
from borb.pdf import PDF
import imagehash
from pathlib import Path
import typing
@@ -208,6 +210,9 @@ import typing
def compare_pdf_images(pdf1_path: str, pdf2_path: str, **kwargs) -> float:
if not pdf1_path or not pdf2_path:
return 0.
if not all(map(os.path.exists, [pdf1_path, pdf2_path])):
logger.warning(f"PDF file does not exist: {pdf1_path} or {pdf2_path}")
return 0.
def extract_images_from_pdf(pdf_path):
pdf_document = fitz.open(pdf_path)
@@ -215,35 +220,61 @@ def compare_pdf_images(pdf1_path: str, pdf2_path: str, **kwargs) -> float:
for page_number in range(pdf_document.page_count):
page = pdf_document[page_number]
pixmap = page.get_pixmap()
img = Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)
images.append(img)
for img_index, img in enumerate(page.get_images(full=True)):
xref = img[0]
base_image = pdf_document.extract_image(xref)
image_bytes = base_image["image"]
# convert to PIL Image
try:
pil_image = Image.open(io.BytesIO(image_bytes))
images.append(pil_image)
except Exception as e:
logger.error(f"Failed to process image in {pdf_path} on page {page_number}: {e}")
return images
temp_dir = Path(pdf1_path).parent / "temp_pdf_comparison"
os.makedirs(temp_dir, exist_ok=True)
temp_pdf1 = temp_dir / Path(pdf1_path).name
temp_pdf2 = temp_dir / Path(pdf2_path).name
def fix_pdf(in_path: Path, out_path: Path) -> None:
doc: typing.Optional[Document] = None
with open(in_path, "rb") as fh:
doc = PDF.loads(fh)
with open(out_path, "wb") as fh:
PDF.dumps(fh, doc)
shutil.copy(pdf1_path, temp_pdf1)
shutil.copy(pdf2_path, temp_pdf2)
fix_pdf(Path(pdf1_path), Path(pdf1_path))
fix_pdf(Path(pdf2_path), Path(pdf2_path))
try:
images1 = extract_images_from_pdf(str(temp_pdf1))
images2 = extract_images_from_pdf(str(temp_pdf2))
except Exception as e:
logger.error(f"Error extracting images from PDFs: {e}")
shutil.rmtree(temp_dir)
return 0.
finally:
shutil.rmtree(temp_dir)
images1 = extract_images_from_pdf(pdf1_path)
images2 = extract_images_from_pdf(pdf2_path)
if len(images1) != len(images2):
logger.info(f"Different number of images found. Gold: {len(images1)}, Pred: {len(images2)}")
return 0.
for img1, img2 in zip(images1, images2):
if img1.tobytes() != img2.tobytes():
return 0.
if not images1:
logger.info("No images found in either PDF. Considering it a match.")
return 1.0
return 1.
hash_threshold = 5
total_score = 0
for i, (img1, img2) in enumerate(zip(images1, images2)):
hash1 = imagehash.phash(img1)
hash2 = imagehash.phash(img2)
hash_diff = hash1 - hash2
logger.info(f"Image {i+1}: Gold hash: {hash1}, Pred hash: {hash2}, Hash difference: {hash_diff}")
if hash_diff <= hash_threshold:
total_score += 1
return total_score / len(images1)
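# Minimal sketch (illustration only) of the perceptual-hash check used above:
# identical images hash to the same value, so their Hamming distance is 0 <= hash_threshold.
from PIL import Image
import imagehash
img_a = Image.new("RGB", (64, 64), "white")
img_b = Image.new("RGB", (64, 64), "white")
print(imagehash.phash(img_a) - imagehash.phash(img_b))  # 0, i.e. within the threshold of 5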
def compare_archive(pred_path: str, gold_path: str, **kwargs) -> float:

View File

@@ -86,6 +86,7 @@ def compare_docx_files(file1, file2, **options):
ignore_case = options.get('ignore_case', False)
ignore_order = options.get('ignore_order', False)
content_only = options.get('content_only', False)
fuzzy_match = options.get('fuzzy_match', False)
delete_empty_lines = options.get('delete_empty_lines', False)
if not file1 or not file2:
@@ -158,29 +159,48 @@ def compare_docx_files(file1, file2, **options):
text2 = re.sub(r'\s+', ' ', '\n'.join(doc2_paragraphs)).strip()
if ignore_case:
text1, text2 = text1.lower(), text2.lower()
if text1 != text2:
return 0
if fuzzy_match:
similarity = fuzz.ratio(text1, text2) / 100.0
return similarity
else:
if text1 != text2:
return 0
else:
print("ignore_blanks=false")
if len(doc1_paragraphs) != len(doc2_paragraphs):
print(doc1_paragraphs)
print(doc2_paragraphs)
print(len(doc1_paragraphs))
print(len(doc2_paragraphs))
return 0
print("in compare")
# Compare each paragraph
for p1, p2 in zip(doc1_paragraphs, doc2_paragraphs):
if ignore_case:
p1, p2 = p1.lower(), p2.lower()
if p1 != p2:
# show the difference
print("=== First Paragraph ===")
print(f"\033[92m{repr(p1)}\033[0m") # Green color for p1, repr() shows hidden chars
print("=== Second Paragraph ===")
print(f"\033[91m{repr(p2)}\033[0m") # Red color for p2, repr() shows hidden chars
print("=" * 50) # Clear boundary
return 0
if fuzzy_match:
total_similarity = 0
if not doc1_paragraphs:
return 1.0
for p1, p2 in zip(doc1_paragraphs, doc2_paragraphs):
if ignore_case:
p1, p2 = p1.lower(), p2.lower()
total_similarity += fuzz.ratio(p1, p2) / 100.0
if len(doc1_paragraphs) == 0:
return 1.0 if len(doc2_paragraphs) == 0 else 0.0
avg_similarity = total_similarity / len(doc1_paragraphs)
return avg_similarity
else:
# Compare each paragraph
for p1, p2 in zip(doc1_paragraphs, doc2_paragraphs):
if ignore_case:
p1, p2 = p1.lower(), p2.lower()
if p1 != p2:
# show the difference
print("=== First Paragraph ===")
print(f"\033[92m{repr(p1)}\033[0m") # Green color for p1, repr() shows hidden chars
print("=== Second Paragraph ===")
print(f"\033[91m{repr(p2)}\033[0m") # Red color for p2, repr() shows hidden chars
print("=" * 50) # Clear boundary
return 0
return 1
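# Illustration only: how fuzzy_match scores text above. The import is an assumption;
# any library exposing fuzz.ratio on a 0-100 scale (e.g. rapidfuzz) behaves the same way.
from rapidfuzz import fuzz
print(fuzz.ratio("Hello World", "Hello Wrold") / 100.0)  # ~0.91 for a near-match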

View File

@@ -178,18 +178,43 @@ def check_list(result: str, rules: Dict[str, List[str]]) -> float:
return float(all(expect_metrics) and unexpect_metric)
_accessibility_ns_map = {"st": "uri:deskat:state.at-spi.gnome.org"
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
, "cp": "uri:deskat:component.at-spi.gnome.org"
, "doc": "uri:deskat:document.at-spi.gnome.org"
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
, "txt": "uri:deskat:text.at-spi.gnome.org"
, "val": "uri:deskat:value.at-spi.gnome.org"
, "act": "uri:deskat:action.at-spi.gnome.org"
}
_accessibility_ns_map = {
"ubuntu": {
"st": "https://accessibility.ubuntu.example.org/ns/state",
"attr": "https://accessibility.ubuntu.example.org/ns/attributes",
"cp": "https://accessibility.ubuntu.example.org/ns/component",
"doc": "https://accessibility.ubuntu.example.org/ns/document",
"docattr": "https://accessibility.ubuntu.example.org/ns/document/attributes",
"txt": "https://accessibility.ubuntu.example.org/ns/text",
"val": "https://accessibility.ubuntu.example.org/ns/value",
"act": "https://accessibility.ubuntu.example.org/ns/action",
},
"windows": {
"st": "https://accessibility.windows.example.org/ns/state",
"attr": "https://accessibility.windows.example.org/ns/attributes",
"cp": "https://accessibility.windows.example.org/ns/component",
"doc": "https://accessibility.windows.example.org/ns/document",
"docattr": "https://accessibility.windows.example.org/ns/document/attributes",
"txt": "https://accessibility.windows.example.org/ns/text",
"val": "https://accessibility.windows.example.org/ns/value",
"act": "https://accessibility.windows.example.org/ns/action",
"class": "https://accessibility.windows.example.org/ns/class"
},
"macos": {
"st": "https://accessibility.macos.example.org/ns/state",
"attr": "https://accessibility.macos.example.org/ns/attributes",
"cp": "https://accessibility.macos.example.org/ns/component",
"doc": "https://accessibility.macos.example.org/ns/document",
"txt": "https://accessibility.macos.example.org/ns/text",
"val": "https://accessibility.macos.example.org/ns/value",
"act": "https://accessibility.macos.example.org/ns/action",
"role": "https://accessibility.macos.example.org/ns/role",
}
}
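# Illustration only: callers pick the per-OS map added above via the new osname argument.
ns = _accessibility_ns_map["windows"]
print(ns["st"])   # -> https://accessibility.windows.example.org/ns/state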
def check_accessibility_tree(result: str, rules: List[Dict[str, Any]]) -> float:
def check_accessibility_tree(result: str, rules: List[Dict[str, Any]], osname: str = "ubuntu") -> float:
"""
Args:
result (str): XML of GNOME Accessibility Tree
@@ -205,18 +230,21 @@ def check_accessibility_tree(result: str, rules: List[Dict[str, Any]]) -> float:
"exact": bool specifying whether exact match or fuzzy match should
be performed. defaults to True.
}
osname (str): "ubuntu" | "windows" | "macos". "ubuntu" by default.
Returns:
float
"""
a11y_ns_map = _accessibility_ns_map[osname]
at: _Element = lxml.etree.fromstring(result)
total_match_score = 1.
for r in rules:
if "xpath" in r:
elements: List[_Element] = at.xpath(r["xpath"], namespaces=_accessibility_ns_map)
elements: List[_Element] = at.xpath(r["xpath"], namespaces=a11y_ns_map)
elif "selectors" in r:
selector = CSSSelector(", ".join(r["selectors"]), namespaces=_accessibility_ns_map)
selector = CSSSelector(", ".join(r["selectors"]), namespaces=a11y_ns_map)
elements: List[_Element] = selector(at)
else:
raise ValueError("At least one of xpath and selectors is required")
@@ -307,6 +335,9 @@ def check_direct_json_object(result, rules) -> float:
One of the most commonly used evaluation functions.
Compares two JSON objects directly.
"""
logger.info(f"[DEBUG] check_direct_json_object called with result: {result}")
logger.info(f"[DEBUG] check_direct_json_object called with rules: {rules}")
if isinstance(result, str):
# remove blanks before and after result
result = result.strip()
@@ -314,45 +345,84 @@ def check_direct_json_object(result, rules) -> float:
result = result.replace("'", '"')
# load json object
result = json.loads(result)
logger.info(f"[DEBUG] Processed result: {result}")
if result is None:
logger.info("[DEBUG] Result is None, returning 0.0")
return 0.
# Check if expected value contains evaluation failure indicator
try:
expected_json = rules.get("expected", {})
if expected_json:
for key, value in expected_json.items():
if value == "__EVALUATION_FAILED__":
logger.error(f"[DEBUG] Expected value for key '{key}' indicates evaluation failure, returning 0.0")
return 0.
except Exception as e:
logger.error(f"[DEBUG] Error checking for evaluation failure indicator: {e}")
return 0.
try:
expect_in_result = rules.get("expect_in_result", False)
logger.info(f"[DEBUG] expect_in_result: {expect_in_result}")
if not expect_in_result:
expected_json = rules["expected"]
logger.info(f"[DEBUG] Expected JSON: {expected_json}")
for key in expected_json.keys():
expected_value = expected_json.get(key)
actual_value = result.get(key)
logger.info(f"[DEBUG] Checking key '{key}': expected='{expected_value}', actual='{actual_value}'")
if expected_json.get("ignore_list_order", False):
expected_value = sorted(expected_value)
result_value = sorted(result.get(key))
logger.info(f"[DEBUG] Comparing lists (sorted): expected={expected_value}, actual={result_value}")
if expected_value != result_value:
logger.info(f"[DEBUG] List comparison failed for key '{key}', returning 0.0")
return 0.
else:
if expected_value != result.get(key):
if expected_value != actual_value:
logger.info(f"[DEBUG] Value comparison failed for key '{key}': expected='{expected_value}', actual='{actual_value}', returning 0.0")
return 0.
else:
logger.info(f"[DEBUG] Value comparison passed for key '{key}'")
logger.info("[DEBUG] All comparisons passed, returning 1.0")
return 1.0
else:
expected_json = rules["expected"]
logger.info(f"[DEBUG] Expected JSON (expect_in_result mode): {expected_json}")
for key in expected_json.keys():
if isinstance(expected_json.get(key), list):
flag = 0
expected_value_list = expected_json.get(key)
logger.info(f"[DEBUG] Checking list key '{key}': expected_list={expected_value_list}, actual='{result.get(key)}'")
for each_expected_value in expected_value_list:
if isinstance(result.get(key), list) and each_expected_value in result.get(key):
flag = 1
logger.info(f"[DEBUG] Found expected value '{each_expected_value}' in result list for key '{key}'")
break
if flag == 0:
logger.info(f"[DEBUG] No expected values found in result list for key '{key}', returning 0.0")
return 0.
elif isinstance(expected_json.get(key), str):
if expected_json.get(key) not in result.get(key):
expected_str = expected_json.get(key)
actual_str = result.get(key)
logger.info(f"[DEBUG] Checking string key '{key}': expected='{expected_str}', actual='{actual_str}'")
if expected_str not in actual_str:
logger.info(f"[DEBUG] Expected string '{expected_str}' not found in actual string '{actual_str}' for key '{key}', returning 0.0")
return 0.
else:
logger.debug("check_direct_json_object: expected value type not supported")
return 0.
logger.info("[DEBUG] All expect_in_result comparisons passed, returning 1.0")
return 1.0
except:
logger.debug("check_direct_json_object: result is not a valid json object")
except Exception as e:
logger.debug(f"check_direct_json_object: result is not a valid json object, error: {e}")
return 0.
@@ -361,7 +431,7 @@ def compare_time_in_speedtest_results(speedtest_result_path, time_diff):
return 0
# open the speedtest results file(csv)
date_col = None
#date_col = None
try:
with open(speedtest_result_path, 'r') as f:
for i, line in enumerate(f):
@@ -476,37 +546,66 @@ def compare_terminal_and_txt(txt_file_path, terminal_output):
def compare_python_pure_text(py_file_path, gold_file_path):
if not py_file_path or not gold_file_path:
return 0
return 0.0
# first, change the suffix of gold_file from .txt to .py
print("py_file_path: ")
print(py_file_path)
print("gold_file_path: ")
print(gold_file_path)
def _normalize(text):
"""
Minimal normalization - only handle basic formatting:
- Skip obvious file metadata (encoding, shebang) at the beginning
- Normalize whitespace and indentation
- Remove empty lines
This preserves any content that shouldn't be there (like markdown)
so it can be detected as an error.
"""
lines = text.splitlines()
result_lines = []
i = 0
# Only skip obvious metadata at the very beginning
while i < len(lines) and i < 3: # Check only first 3 lines
stripped = lines[i].strip()
if (stripped.startswith('#!') or
stripped.startswith('# -*- coding:') or
stripped.startswith('# coding:') or
stripped.startswith('# coding=')):
i += 1
continue
break
# Process all remaining lines with minimal filtering
while i < len(lines):
line = lines[i]
stripped = line.strip()
if stripped: # Keep all non-empty lines
normalized = line.expandtabs(4).rstrip()
result_lines.append(normalized)
i += 1
return '\n'.join(result_lines)
# gold_file_path = gold_file_path.replace('.txt', '.py')
def remove_whitespace(text):
return ''.join(text.split())
with open(py_file_path, 'r') as file1:
content1 = file1.read()
with open(gold_file_path, 'r') as file2:
content2 = file2.read()
content1_no_whitespace = remove_whitespace(content1)
content2_no_whitespace = remove_whitespace(content2)
if content1_no_whitespace == content2_no_whitespace:
return 1
else:
return 0
if __name__ == '__main__':
print(check_direct_json_object([], rules={
"relativeTime": {
"from": "5th next month"
},
"expected": {
"start": "SEA",
"end": "NYC",
"time": "{DoW}, {Month} {DayD}, {Year}",
"category": "Miles"
}}))
try:
with open(py_file_path, 'r', encoding='utf-8') as file1:
user_content = file1.read()
with open(gold_file_path, 'r', encoding='utf-8') as file2:
gold_content = file2.read()
# Apply different normalization strategies
user_normalized = _normalize(user_content)
gold_normalized = _normalize(gold_content)
if user_normalized == gold_normalized:
return 1.0
else:
return 0.0
except (FileNotFoundError, IOError, UnicodeDecodeError) as e:
logger.debug(f"compare_python_pure_text: Error reading files - {e}")
return 0.0
except Exception as e:
logger.debug(f"compare_python_pure_text: Unexpected error - {e}")
return 0.0
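# Illustration only (treating the nested _normalize above as if it were module-level):
# leading metadata is skipped, blank lines are dropped, and tabs are expanded.
sample = "#!/usr/bin/env python3\n# -*- coding: utf-8 -*-\n\ndef f():\n\treturn 1\n"
print(_normalize(sample))   # "def f():\n    return 1"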

View File

@@ -1,4 +1,5 @@
import os
import logging
from typing import List, Union
from skimage.metrics import structural_similarity as ssim
from PIL import Image, ImageChops, ImageStat
@@ -39,7 +40,7 @@ def get_gimp_export_path():
return current_path
except FileNotFoundError:
# Handle the case where the configuration file is not found
print("GIMP configuration file not found")
logging.debug("GIMP configuration file not found")
return False
@@ -193,18 +194,18 @@ def structure_check_by_mse(img1, img2, threshold=0.03):
(np.array(img1, dtype=np.float32) / 255
- np.array(img2, dtype=np.float32) / 255) ** 2)
structure_same = True if mse < threshold else False
print("MSE: ", mse)
logging.debug(f"MSE: {mse}, threshold: {threshold}")
return structure_same
def structure_check_by_ssim(img1, img2, threshold=0.9):
"""Check if two images are approximately the same by SSIM"""
similarity = ssim(np.array(img1), np.array(img2), multichannel=True, channel_axis=-1)
print("SSIM: ", similarity)
logging.debug("SSIM: %s", similarity)
return similarity >= threshold
def check_brightness_decrease_and_structure_sim(src_path, tgt_path):
def check_brightness_decrease_and_structure_sim(src_path, tgt_path, threshold=0.03):
"""
Check that the brightness of src is lower than that of tgt and that the structures are similar
gimp:7a4deb26-d57d-4ea9-9a73-630f66a7b568
@@ -219,13 +220,15 @@ def check_brightness_decrease_and_structure_sim(src_path, tgt_path):
brightness_src = calculate_brightness(img_src)
brightness_tgt = calculate_brightness(img_tgt)
brightness_reduced = brightness_tgt > brightness_src
# print(f"Brightness src: {brightness_src}, tgt: {brightness_tgt}, reduced: {brightness_reduced}")
# Normalize and compare images
target_brightness = 128
img_src_normalized = normalize_brightness(img_src, target_brightness)
img_tgt_normalized = normalize_brightness(img_tgt, target_brightness)
structure_same = structure_check_by_mse(img_src_normalized, img_tgt_normalized)
structure_same = structure_check_by_mse(img_src_normalized, img_tgt_normalized, threshold=threshold)
if brightness_reduced and structure_same:
return 1.
else:
@@ -362,11 +365,37 @@ def check_structure_sim_resized(src_path, tgt_path):
img_src = Image.open(src_path)
img_tgt = Image.open(tgt_path)
# Resize the images to the same size
img_src = img_src.resize(img_tgt.size)
# Check if source image has transparency and extract content area
if img_src.mode in ('RGBA', 'LA') or 'transparency' in img_src.info:
if img_src.mode != 'RGBA':
img_src = img_src.convert('RGBA')
# Get alpha channel and find bounding box of non-transparent pixels
alpha = img_src.split()[-1]
bbox = alpha.getbbox()
if bbox is None:
# Image is completely transparent
logging.debug("Source image is completely transparent")
return 0.
# Crop to content area only
img_src_content = img_src.crop(bbox)
logging.debug(f"Source image cropped from {img_src.size} to {img_src_content.size}")
# Convert to RGB for comparison
img_src_content = img_src_content.convert('RGB')
img_src_resized = img_src_content.resize(img_tgt.size)
else:
# No transparency, resize normally
img_src_resized = img_src.resize(img_tgt.size)
# Ensure target image is RGB for comparison
if img_tgt.mode != 'RGB':
img_tgt = img_tgt.convert('RGB')
# Check if the structure is similar
structure_same = structure_check_by_ssim(img_src, img_tgt)
structure_same = structure_check_by_ssim(img_src_resized, img_tgt)
return structure_same
@@ -431,20 +460,52 @@ def check_image_size(src_path, rule):
# Load the image
img = Image.open(src_path)
# Check if we should ignore transparent parts
ignore_transparent = rule.get("ignore_transparent", False)
if ignore_transparent and (img.mode in ('RGBA', 'LA') or 'transparency' in img.info):  # parenthesized so ignore_transparent gates both checks
# Calculate bounding box of non-transparent pixels
if img.mode != 'RGBA':
img = img.convert('RGBA')
# Get alpha channel
alpha = img.split()[-1]
# Find bounding box of non-transparent pixels
bbox = alpha.getbbox()
if bbox is None:
# Image is completely transparent
actual_width = 0
actual_height = 0
else:
# Calculate actual content size
actual_width = bbox[2] - bbox[0]
actual_height = bbox[3] - bbox[1]
logging.debug(f"Original size: {img.size}, Content size: {actual_width}x{actual_height}")
else:
# Use original image size
actual_width = img.size[0]
actual_height = img.size[1]
logging.debug(f"Image size: {img.size}")
# Check the size
if rule.get("height", None) is not None:
height_same = img.size[1] == rule["height"]
height_same = actual_height == rule["height"]
else:
height_same = True
if rule.get("width", None) is not None:
width_same = img.size[0] == rule["width"]
width_same = actual_width == rule["width"]
else:
width_same = True
if height_same and width_same:
logging.debug(f"height_same: {height_same}, width_same: {width_same}")
return 1.
else:
logging.debug(f"height_same: {height_same}, width_same: {width_same}")
return 0.
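# Minimal sketch (illustration only) of the transparent-content measurement above:
from PIL import Image
img = Image.new("RGBA", (100, 100), (0, 0, 0, 0))   # fully transparent canvas
img.paste((255, 0, 0, 255), (10, 20, 60, 80))       # opaque 50x60 block
bbox = img.split()[-1].getbbox()                    # alpha bbox -> (10, 20, 60, 80)
print(bbox[2] - bbox[0], bbox[3] - bbox[1])         # 50 60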

View File

@@ -63,11 +63,13 @@ def compare_epub(result: str, expected: str) -> float:
result_files: List[str] = process_epub(result)
expected_files: List[str] = process_epub(expected)
metric: float = 1.
metric: float = 0.
for f1, f2 in zip(result_files, expected_files):
current_metric: float = diff_text_file(f1, f2)
logger.debug("%s vs %s: %f", f1, f2, current_metric)
metric *= current_metric
metric += current_metric
if len(result_files) > 0:
metric /= len(result_files)
return metric
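# Worked example (illustration only): with per-file scores [1.0, 0.5, 0.0] the old
# multiplicative metric collapsed to 0.0, while the averaged metric above yields 0.5.
scores = [1.0, 0.5, 0.0]
print(sum(scores) / len(scores))  # 0.5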

View File

@@ -5,6 +5,7 @@ from math import sqrt
from pptx import Presentation
from pptx.util import Inches
from pptx.enum.shapes import MSO_SHAPE_TYPE
logger = logging.getLogger("desktopenv.metric.slides")
@@ -139,6 +140,17 @@ def compare_pptx_files(file1_path, file2_path, **options):
prs1 = Presentation(file1_path)
prs2 = Presentation(file2_path)
approximately_tolerance = options.get("approximately_tolerance", 0.005)
def is_approximately_equal(val1, val2, tolerance=approximately_tolerance):
"""Compare two values with a tolerance of 0.1% (0.005)"""
if val1 == val2:
return True
if val1 == 0 and val2 == 0:
return True
if val1 == 0 or val2 == 0:
return False
return abs(val1 - val2) / max(abs(val1), abs(val2)) <= tolerance
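# Illustration only (using the nested helper above as if it were module-level):
# EMU positions well inside the default 0.5% tolerance compare equal.
print(is_approximately_equal(3600000, 3610000))  # True  (~0.28% apart)
print(is_approximately_equal(3600000, 3700000))  # False (~2.70% apart)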
examine_number_of_slides = options.get("examine_number_of_slides", True)
examine_shape = options.get("examine_shape", True)
examine_text = options.get("examine_text", True)
@@ -212,14 +224,20 @@ def compare_pptx_files(file1_path, file2_path, **options):
if hasattr(shape1, "text") and hasattr(shape2, "text") and shape1.text == shape2.text:
if shape1.text == "Product Comparison" and (shape1.top <= shape2.top or shape1.top < 3600000):
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
elif (not is_approximately_equal(shape1.left, shape2.left) or
not is_approximately_equal(shape1.top, shape2.top) or
not is_approximately_equal(shape1.width, shape2.width) or
not is_approximately_equal(shape1.height, shape2.height)):
return 0
if examine_table_bottom_position:
if slide_idx == 3 and shape1.shape_type == 19 and shape2.shape_type == 19:
if shape1.top <= shape2.top or shape1.top < 3600000:
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
elif (not is_approximately_equal(shape1.left, shape2.left) or
not is_approximately_equal(shape1.top, shape2.top) or
not is_approximately_equal(shape1.width, shape2.width) or
not is_approximately_equal(shape1.height, shape2.height)):
return 0
if examine_right_position:
@@ -231,34 +249,62 @@ def compare_pptx_files(file1_path, file2_path, **options):
if slide_idx == 2 and shape1.shape_type == 13 and shape2.shape_type == 13:
if shape1.top >= shape2.top or shape1.top > 1980000:
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
elif (not is_approximately_equal(shape1.left, shape2.left) or
not is_approximately_equal(shape1.top, shape2.top) or
not is_approximately_equal(shape1.width, shape2.width) or
not is_approximately_equal(shape1.height, shape2.height)):
return 0
if examine_shape_for_shift_size:
if shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
if (not is_approximately_equal(shape1.left, shape2.left) or
not is_approximately_equal(shape1.top, shape2.top) or
not is_approximately_equal(shape1.width, shape2.width) or
not is_approximately_equal(shape1.height, shape2.height)):
if not (hasattr(shape1, "text") and hasattr(shape2,
"text") and shape1.text == shape2.text and shape1.text == "Elaborate on what you want to discuss."):
return 0
if (
shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height) and examine_shape:
not is_approximately_equal(shape1.left, shape2.left) or
not is_approximately_equal(shape1.top, shape2.top) or
not is_approximately_equal(shape1.width, shape2.width) or
not is_approximately_equal(shape1.height, shape2.height)) and examine_shape:
return 0
if examine_image_size:
if shape1.shape_type == 13 and shape2.shape_type == 13:
if shape1.width != shape2.width or shape1.height != shape2.height:
if not is_approximately_equal(shape1.width, shape2.width) or not is_approximately_equal(shape1.height, shape2.height):
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
elif (not is_approximately_equal(shape1.left, shape2.left) or
not is_approximately_equal(shape1.top, shape2.top) or
not is_approximately_equal(shape1.width, shape2.width) or
not is_approximately_equal(shape1.height, shape2.height)):
return 0
if examine_modify_height:
if not hasattr(shape1, "text") and not hasattr(shape2,
"text") or shape1.shape_type == 5 and shape2.shape_type == 5:
if shape1.height != shape2.height:
if not is_approximately_equal(shape1.height, shape2.height):
return 0
elif shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height:
elif (not is_approximately_equal(shape1.left, shape2.left) or
not is_approximately_equal(shape1.top, shape2.top) or
not is_approximately_equal(shape1.width, shape2.width) or
not is_approximately_equal(shape1.height, shape2.height)):
return 0
if shape1.shape_type == MSO_SHAPE_TYPE.TABLE:
table1 = shape1.table
table2 = shape2.table
for row_idx in range(len(table1.rows)):
for col_idx in range(len(table1.columns)):
cell1 = table1.cell(row_idx, col_idx)
cell2 = table2.cell(row_idx, col_idx)
for para1, para2 in zip(cell1.text_frame.paragraphs, cell2.text_frame.paragraphs):
for run1, run2 in zip(para1.runs, para2.runs):
if run1.font.color.rgb != run2.font.color.rgb:
return 0
if hasattr(shape1, "text") and hasattr(shape2, "text"):
if shape1.text.strip() != shape2.text.strip() and examine_text:
return 0
@@ -288,15 +334,19 @@ def compare_pptx_files(file1_path, file2_path, **options):
return 0
if run1.font.italic != run2.font.italic and examine_font_italic:
return 0
if run1.font.italic is not None and run2.font.italic is not None:
return 0
if hasattr(run1.font.color, "rgb") and hasattr(run2.font.color, "rgb"):
if run1.font.color.rgb != run2.font.color.rgb and examine_color_rgb:
return 0
if run1.font.underline != run2.font.underline and examine_font_underline:
return 0
if run1.font.underline is not None and run2.font.underline is not None:
return 0
if (run1.font.underline is None and run2.font.underline is True) or (run1.font.underline is True and run2.font.underline is None):
return 0
if run1.font._element.attrib.get('strike', 'noStrike') != run2.font._element.attrib.get(
'strike', 'noStrike') and examine_strike_through:
return 0
@@ -325,14 +375,20 @@ def compare_pptx_files(file1_path, file2_path, **options):
color = "No Color"
text = "".join(t.text for t in paragraph.findall('.//a:t', namespaces))
bullets.append((lvl, char, text, color))
# Only add non-empty paragraphs to bullets list
if text.strip():
bullets.append((lvl, char, text, color))
return bullets
if examine_bullets and _extract_bullets(run1.part.blob.decode('utf-8')) != _extract_bullets(
run2.part.blob.decode('utf-8')):
return 0
if examine_bullets:
bullets1 = _extract_bullets(run1.part.blob.decode('utf-8'))
bullets2 = _extract_bullets(run2.part.blob.decode('utf-8'))
# Compare only non-empty bullets
if bullets1 != bullets2:
return 0
# fixme: there are actually more properties to compare; we can add them later by parsing the XML data
@@ -446,17 +502,13 @@ def check_left_panel(accessibility_tree):
root = ET.fromstring(accessibility_tree)
for root_pane in root.iter('root-pane'):
for panel in root_pane.iter('panel'):
for split_pane in panel.iter('split-pane'):
# Get the left panel
if split_pane.attrib.get("{{{}}}parentcoord".format(namespaces['cp'])) == "(0, 0)":
# Get the visible attribute
visible = split_pane.attrib.get("{{{}}}visible".format(namespaces['st']))
if visible:
# decide if it is left panel
return 1.
# Iterate over all document-frame nodes
for doc_frame in root.iter('document-frame'):
if doc_frame.attrib.get("name") == "Slides View":
# Slides View exists, which means the left panel is open
return 1.
# Slides View not found, so the left panel is considered closed
return 0.

View File

@@ -308,7 +308,7 @@ def compare_videos(video_path1, video_path2, max_frames_to_check=100, threshold=
# If a video ends, then check if both ended to confirm they are of the same length
if not ret1 or not ret2:
return ret1 == ret2
return 1. if ret1 == ret2 else 0. # return float only
# Convert frames to PIL Images
frame1 = Image.fromarray(cv2.cvtColor(frame1, cv2.COLOR_BGR2RGB))

View File

@@ -36,7 +36,7 @@ DEFAULT_REGION = "us-east-1"
# todo: Add doc for the configuration of image, security group and network interface
# todo: public the AMI images
IMAGE_ID_MAP = {
"us-east-1": "ami-0cae20d2680c939d4",
"us-east-1": "ami-09138bff939f82bd8",
"ap-east-1": "ami-0c092a5b8be4116f5",
}

View File

@@ -1161,7 +1161,9 @@ def upload_file():
try:
# Ensure target directory exists
os.makedirs(os.path.dirname(file_path), exist_ok=True)
target_dir = os.path.dirname(file_path)
if target_dir: # Only create directory if it's not empty
os.makedirs(target_dir, exist_ok=True)
# Save file and get size for verification
file.save(file_path)

View File

@@ -1,16 +1,19 @@
[Unit]
Description=OSWorld Server
Description=osworld service
StartLimitIntervalSec=60
StartLimitBurst=4
After=network.target auditd.service
[Service]
ExecStart=/usr/bin/python3 /home/user/main.py
Type=simple
ExecStart=/usr/bin/python /home/user/server/main.py
User=user
WorkingDirectory=/home/user
Environment="DISPLAY=:0"
Environment="DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/1000/bus"
Environment="XDG_RUNTIME_DIR=/run/user/1000"
Restart=on-failure
RestartSec=1
Environment="DISPLAY=:1"
RestartSec=5s
[Install]
WantedBy=graphical.target
WantedBy=multi-user.target