Merge branch 'main' into zdy

This commit is contained in:
David Chang
2024-01-13 17:15:32 +08:00
61 changed files with 4616 additions and 273 deletions

View File

@@ -2,4 +2,4 @@ from .file import get_cloud_file, get_vm_file, get_cache_file
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
from .misc import get_rule, get_accessibility_tree
from .vlc import get_vlc_playing_info, get_vlc_config
from .chrome import get_default_search_engine, get_bookmarks
from .chrome import get_default_search_engine, get_bookmarks, get_open_tabs_info

View File

@@ -4,6 +4,8 @@ import os
import sqlite3
from typing import Dict
from playwright.sync_api import sync_playwright
logger = logging.getLogger("desktopenv.getters.chrome")
"""
@@ -13,15 +15,20 @@ WARNING:
"""
# The following ones just need to load info from the files of software, no need to connect to the software
def get_default_search_engine(env, config: Dict[str, str]):
os_type = env.vm_platform
if os_type == 'Windows':
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Preferences'))""")['output'].strip()
elif os_type == 'Darwin':
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")['output'].strip()
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
'output'].strip()
elif os_type == 'Linux':
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")['output'].strip()
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
'output'].strip()
else:
raise Exception('Unsupported operating system')
@@ -41,12 +48,16 @@ def get_default_search_engine(env, config: Dict[str, str]):
def get_cookie_data(env, config: Dict[str, str]):
os_type = env.vm_platform
if os_type == 'Windows':
chrome_cookie_file_path = os.path.join(os.getenv('LOCALAPPDATA'), 'Google\\Chrome\\User Data\\Default\\Cookies')
chrome_cookie_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Cookies'))""")['output'].strip()
elif os_type == 'Darwin':
chrome_cookie_file_path = os.path.join(os.getenv('HOME'),
'Library/Application Support/Google/Chrome/Default/Cookies')
chrome_cookie_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Cookies'))")[
'output'].strip()
elif os_type == 'Linux':
chrome_cookie_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies')
chrome_cookie_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
'output'].strip()
else:
raise Exception('Unsupported operating system')
@@ -70,13 +81,16 @@ def get_cookie_data(env, config: Dict[str, str]):
def get_bookmarks(env, config: Dict[str, str]):
os_type = env.vm_platform
if os_type == 'Windows':
preference_file_path = os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Bookmarks')
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Bookmarks'))""")['output'].strip()
elif os_type == 'Darwin':
preference_file_path = os.path.join(os.getenv('HOME'),
'Library/Application Support/Google/Chrome/Default/Bookmarks')
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Bookmarks'))")[
'output'].strip()
elif os_type == 'Linux':
preference_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks')
preference_file_path = env.controller.execute_python_command(
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
'output'].strip()
else:
raise Exception('Unsupported operating system')
@@ -98,13 +112,16 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
"""Find the Chrome extensions directory based on the operating system."""
os_type = env.vm_platform
if os_type == 'Windows':
chrome_extension_dir = os.path.expanduser(
'~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'
chrome_extension_dir = env.controller.execute_python_command(
"""os.path.expanduser('~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'""")[
'output'].strip()
elif os_type == 'Darwin': # macOS
chrome_extension_dir = os.path.expanduser(
'~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'
chrome_extension_dir = env.controller.execute_python_command(
"""os.path.expanduser('~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'""")[
'output'].strip()
elif os_type == 'Linux':
chrome_extension_dir = os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'
chrome_extension_dir = env.controller.execute_python_command(
"""os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'""")['output'].strip()
else:
raise Exception('Unsupported operating system')
@@ -124,3 +141,52 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
except json.JSONDecodeError:
logger.error(f"Error reading {manifest_path}")
return manifests
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on
# port info to allow remote debugging, see README.md for details
def get_open_tabs_info(env, config: Dict[str, str]):
host = env.vm_ip
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
remote_debugging_url = f"http://{host}:{port}"
with sync_playwright() as p:
# connect to remote Chrome instance
browser = p.chromium.connect_over_cdp(remote_debugging_url)
tabs_info = []
for context in browser.contexts:
for page in context.pages:
title = page.title()
url = page.url
tabs_info.append({'title': title, 'url': url})
browser.close()
return tabs_info
def get_active_tab_info(env, config: Dict[str, str]):
host = env.vm_ip
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
remote_debugging_url = f"http://{host}:{port}"
with sync_playwright() as p:
# connect to remote Chrome instance
browser = p.chromium.connect_over_cdp(remote_debugging_url)
active_tab_info = {}
for context in browser.contexts:
for page in context.pages:
if page.is_visible("body"): # check the visibility of the page body to determine the active status
active_tab_info = {
'title': page.title(),
'url': page.url,
'content': page.content() # get the HTML content of the page
}
break
if active_tab_info:
break
browser.close()
return active_tab_info

View File

@@ -2,6 +2,8 @@ import logging
import os
from typing import Dict
import requests
logger = logging.getLogger("desktopenv.getters.vlc")
@@ -15,7 +17,14 @@ def get_vlc_playing_info(env, config: Dict[str, str]):
password = 'password'
_path = os.path.join(env.cache_dir, config["dest"])
content = env.controller.get_vlc_status(host, port, password)
url = f'http://{host}:{port}/requests/status.xml'
response = requests.get(url, auth=('', password))
if response.status_code == 200:
content = response.content
else:
logger.error("Failed to get vlc status. Status code: %d", response.status_code)
return None
with open(_path, "wb") as f:
f.write(content)

View File

@@ -1,3 +1,4 @@
from .chrome import is_expected_tabs, is_expected_bookmarks
from .docs import compare_font_names, compare_subscript_contains, has_page_numbers_in_footers
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
compare_insert_equation
@@ -9,5 +10,7 @@ from .table import check_sheet_list, check_xlsx_freeze, check_xlsx_zoom
from .table import compare_table
from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, compare_images, compare_audios, \
compare_videos
from .gimp import increase_saturation, decrease_brightness, check_file_exists, compare_triangle_positions
from .general import check_csv, check_accessibility_tree, check_list, run_sqlite3
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter

View File

@@ -1,52 +1,39 @@
import logging
from playwright.sync_api import sync_playwright
from typing import Any, Dict, List
from desktop_env.evaluators.metrics.utils import are_lists_equal, compare_urls
logger = logging.getLogger("desktopenv.metrics.chrome")
def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> float:
"""
Checks if the expected tabs are open in Chrome.
"""
print(open_tabs, rule)
match_type = rule['type']
if match_type == "url":
expected_urls = rule['urls']
actual_urls = [tab['url'] for tab in open_tabs]
return 1 if are_lists_equal(expected_urls, actual_urls, compare_urls) else 0
else:
logger.error(f"Unknown type: {match_type}")
return 0
# todo: move to getter module
def is_expected_bookmarks(bookmarks: List[Dict[str, Any]], rule: Dict[str, Any]) -> float:
"""
Checks if the expected bookmarks are in Chrome.
"""
# The following ones just need to load info from the files of software, no need to connect to the software
# todo
match_type = rule['type']
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on port info to allow remote debugging, see README.md for details
def get_open_tabs_info(remote_debugging_url):
with sync_playwright() as p:
# connect to remote Chrome instance
browser = p.chromium.connect_over_cdp(remote_debugging_url)
tabs_info = []
for context in browser.contexts:
for page in context.pages:
title = page.title()
url = page.url
tabs_info.append({'title': title, 'url': url})
browser.close()
return tabs_info
def get_active_tab_info(remote_debugging_url):
with sync_playwright() as p:
# connect to remote Chrome instance
browser = p.chromium.connect_over_cdp(remote_debugging_url)
active_tab_info = {}
for context in browser.contexts:
for page in context.pages():
if page.is_visible("body"): # check the visibility of the page body to determine the active status
active_tab_info = {
'title': page.title(),
'url': page.url,
'content': page.content() # get the HTML content of the page
}
break
if active_tab_info:
break
browser.close()
return active_tab_info
if match_type == "url":
expected_urls = rule['urls']
actual_urls = [bookmark['url'] for bookmark in bookmarks]
return 1 if are_lists_equal(expected_urls, actual_urls, compare_urls) else 0
else:
logger.error(f"Unknown type: {match_type}")
return 0

View File

@@ -1,5 +1,5 @@
import os
from PIL import Image, ImageChops, ImageStat
def get_gimp_export_path():
# Path to GIMP's configuration file. This example assumes GIMP version 2.10.
@@ -20,3 +20,99 @@ def get_gimp_export_path():
# Handle the case where the configuration file is not found
print("GIMP configuration file not found")
return False
def check_file_exists(directory, filename):
file_path = os.path.join(directory, filename)
return 1 if os.path.isfile(file_path) else 0
def increase_saturation(image1_path: str, image2_path: str) -> float:
def calculate_saturation(image):
# convert the image to HSV mode
hsv_image = image.convert("HSV")
saturation_channel = hsv_image.split()[1]
# calculate the mean saturation level
stat = ImageStat.Stat(saturation_channel)
mean_saturation = stat.mean[0]
return mean_saturation
image1 = Image.open(image1_path)
image2 = Image.open(image2_path)
# calculate the saturation level of each image
saturation1 = calculate_saturation(image1)
saturation2 = calculate_saturation(image2)
return 1 if saturation1 < saturation2 else 0
def decrease_brightness(image1_path: str, image2_path: str) -> float:
def calculate_brightness(image):
# Convert the image to grayscale mode
grayscale_image = image.convert("L")
# Get the image data
pixels = list(grayscale_image.getdata())
brightness = sum(pixels) / len(pixels)
return brightness
image1 = Image.open(image1_path)
image2 = Image.open(image2_path)
brightness1 = calculate_brightness(image1)
brightness2 = calculate_brightness(image2)
return 1 if brightness1 > brightness2 else 0
import cv2
import numpy as np
def find_yellow_triangle(image):
# Convert the image to RGBA
rgba = cv2.cvtColor(image, cv2.COLOR_BGR2RGBA)
# define range of yellow color in HSV
lower_yellow = np.array([0, 0, 0], dtype=np.uint8)
upper_yellow = np.array([255, 255, 255], dtype=np.uint8)
# expand the dimensions of lower and upper yellow to match the image dimensions
lower_yellow = np.reshape(lower_yellow, (1, 1, 3))
upper_yellow = np.reshape(upper_yellow, (1, 1, 3))
# build a mask for the yellow color
mask = cv2.inRange(rgba, lower_yellow, upper_yellow)
# search for contours in the mask
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# choose the largest contour
max_contour = max(contours, key=cv2.contourArea)
# calculate the center of the contour
M = cv2.moments(max_contour)
cx = int(M['m10'] / M['m00'])
cy = int(M['m01'] / M['m00'])
return cx, cy
def compare_triangle_positions(image1, image2):
image1 = cv2.imread(image1, cv2.IMREAD_COLOR)
image2 = cv2.imread(image2, cv2.IMREAD_COLOR)
# find the center of the yellow triangle in each image
cx1, cy1 = find_yellow_triangle(image1)
cx2, cy2 = find_yellow_triangle(image2)
# calculate the distance between the center of the triangle and the center of the image
center_distance1 = np.sqrt((cx1 - image1.shape[1] // 2)**2 + (cy1 - image1.shape[0] // 2)**2)
center_distance2 = np.sqrt((cx2 - image2.shape[1] // 2)**2 + (cy2 - image2.shape[0] // 2)**2)
return 1 if center_distance1 > center_distance2 else 0
if __name__ == "__main__":
image1_path = "../Downloads/1.png"
image2_path = "../Downloads/edited_darker.png"
decrease_brightness(image1_path, image2_path)
increase_saturation(image1_path, image2_path)

View File

@@ -1,26 +1,29 @@
import zipfile
import lxml.etree
import lxml.cssselect
from lxml.etree import _Element
import xmltodict
import openpyxl
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.chart._chart import ChartBase
from typing import Dict, List, Set
from typing import Any
import logging
import zipfile
from typing import Any
from typing import Dict, List, Set
from urllib.parse import urlparse, urlunparse
import lxml.cssselect
import lxml.etree
import openpyxl
import xmltodict
from lxml.etree import _Element
from openpyxl import Workbook
from openpyxl.chart._chart import ChartBase
from openpyxl.worksheet.worksheet import Worksheet
logger = logging.getLogger("desktopenv.metrics.utils")
_xlsx_namespaces = [ ("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
]
_xlsx_namespaces = [("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
]
_xlsx_ns_mapping = dict(_xlsx_namespaces)
_xlsx_ns_imapping = dict(map(lambda itm: (itm[1], itm[0]), _xlsx_namespaces))
_sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
#print(_sparklines_selector.css)
# print(_sparklines_selector.css)
def load_sparklines(xlsx_file: str) -> Dict[str, str]:
"""
This function modifies data_frame in-place
@@ -44,13 +47,14 @@ def load_sparklines(xlsx_file: str) -> Dict[str, str]:
sparklines_dict: Dict[str, str] = {}
for sp_l in sparklines:
sparkline_xml: str = lxml.etree.tostring(sp_l, encoding="unicode")
sparkline: Dict[str, Dict[str, str]] = xmltodict.parse( sparkline_xml
, process_namespaces=True
, namespaces=_xlsx_ns_imapping
)
sparkline: Dict[str, Dict[str, str]] = xmltodict.parse(sparkline_xml
, process_namespaces=True
, namespaces=_xlsx_ns_imapping
)
sparklines_dict[sparkline["x14:sparkline"]["xm:sqref"]] = sparkline["x14:sparkline"]["xm:f"]
return sparklines_dict
# Available Chart Properties:
# title: str
# anchor: ["oneCell" | "twoCell" | "absolute", col0, row0, col1, row1]
@@ -70,7 +74,7 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
Dict[str, Any]: information of charts
"""
#workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file)
# workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file)
worksheet: Worksheet = xlsx_file.active
charts: List[ChartBase] = worksheet._charts
@@ -79,22 +83,22 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
for ch in charts:
series: List[str] = []
for ser in ch.series:
value_num = ser.val.numRef.f\
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f")\
else ""
value_str = ser.val.strRef.f\
if hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f")\
else ""
categ_num = ser.cat.numRef.f\
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f")\
else ""
categ_str = ser.cat.strRef.f\
if hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f")\
else ""
series.append( "{:},{:},{:},{:}".format( value_num, value_str
value_num = ser.val.numRef.f \
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f") \
else ""
value_str = ser.val.strRef.f \
if hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f") \
else ""
categ_num = ser.cat.numRef.f \
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f") \
else ""
categ_str = ser.cat.strRef.f \
if hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f") \
else ""
series.append("{:},{:},{:},{:}".format(value_num, value_str
, categ_num, categ_str
)
)
)
series: str = ";".join(series)
# TODO: maybe more aspects, like chart type
@@ -103,10 +107,10 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
if "title" in chart_props:
info["title"] = ch.title.tx.rich.p[0].r[0].t
if "anchor" in chart_props:
info["anchor"] = [ ch.anchor.editAs
, ch.anchor._from.col, ch.anchor.to.row
, ch.anchor.to.col, ch.anchor.to.row
]
info["anchor"] = [ch.anchor.editAs
, ch.anchor._from.col, ch.anchor.to.row
, ch.anchor.to.col, ch.anchor.to.row
]
if "width" in chart_props:
info["width"] = ch.width
if "height" in chart_props:
@@ -125,43 +129,87 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
chart_set[series] = info
return chart_set
def _match_record(pattern: Dict[str, Any], item: Dict[str, Any]) -> bool:
return all(k in item and item[k]==val for k, val in pattern.items())
return all(k in item and item[k] == val for k, val in pattern.items())
def are_lists_equal(list1, list2, comparison_func):
# First check if both lists have the same length
if len(list1) != len(list2):
return False
# Now make sure each element in one list has an equal element in the other list
for item1 in list1:
# Use the supplied function to test for an equal item
if not any(comparison_func(item1, item2) for item2 in list2):
return False
# If all items match, the lists are equal
return True
def compare_urls(url1, url2):
def normalize_url(url):
# Parse the URL
parsed_url = urlparse(url)
# If no scheme is present, assume 'http'
scheme = parsed_url.scheme if parsed_url.scheme else 'http'
# Lowercase the scheme and netloc, remove 'www.', and handle trailing slash
normalized_netloc = parsed_url.netloc.lower().replace("www.", "")
normalized_path = parsed_url.path if parsed_url.path != '/' else ''
# Reassemble the URL with normalized components
normalized_parsed_url = parsed_url._replace(scheme=scheme.lower(), netloc=normalized_netloc,
path=normalized_path)
normalized_url = urlunparse(normalized_parsed_url)
return normalized_url
# Normalize both URLs for comparison
norm_url1 = normalize_url(url1)
norm_url2 = normalize_url(url2)
# Compare the normalized URLs
return norm_url1 == norm_url2
if __name__ == "__main__":
path1 = "../../../../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold_line_scatter.xlsx"
workbook1: Workbook = openpyxl.load_workbook(filename=path1)
worksheet1: Worksheet = workbook1.active
charts: List[ChartBase] = worksheet1._charts
#print(len(charts))
#print(type(charts[0]))
#
#print(len(charts[0].series))
#print(type(charts[0].series[0]))
#print(type(charts[0].series[0].val))
# print(len(charts))
# print(type(charts[0]))
#
# print(len(charts[0].series))
# print(type(charts[0].series[0]))
# print(type(charts[0].series[0].val))
##print(charts[0].series[0].val)
#print(charts[0].series[0].val.numRef.f)
#
#print(type(charts[0].series[0].cat))
# print(charts[0].series[0].val.numRef.f)
#
# print(type(charts[0].series[0].cat))
##print(charts[0].series[0].cat)
#print(charts[0].series[0].cat.numRef)
#print(charts[0].series[0].cat.strRef)
#print(charts[0].series[0].cat.strRef.f)
# print(charts[0].series[0].cat.numRef)
# print(charts[0].series[0].cat.strRef)
# print(charts[0].series[0].cat.strRef.f)
#print(type(charts[0].title.tx.strRef))
#print(type(charts[0].title.tx.rich))
#print(type(charts[0].title.txPr))
#print(len(charts[0].title.tx.rich.p))
#print(len(charts[0].title.tx.rich.p[0].r))
#print(type(charts[0].title.tx.rich.p[0].r[0]))
#print(type(charts[0].title.tx.rich.p[0].r[0].t))
#print(charts[0].title.tx.rich.p[0].r[0].t)
# print(type(charts[0].title.tx.strRef))
# print(type(charts[0].title.tx.rich))
# print(type(charts[0].title.txPr))
# print(len(charts[0].title.tx.rich.p))
# print(len(charts[0].title.tx.rich.p[0].r))
# print(type(charts[0].title.tx.rich.p[0].r[0]))
# print(type(charts[0].title.tx.rich.p[0].r[0].t))
# print(charts[0].title.tx.rich.p[0].r[0].t)
#print(type(charts[0].anchor))
#print(charts[0].anchor.editAs)
#print(charts[0].anchor._from.col, charts[0].anchor.to.row)
#print(charts[0].anchor.to.col, charts[0].anchor.to.row)
# print(type(charts[0].anchor))
# print(charts[0].anchor.editAs)
# print(charts[0].anchor._from.col, charts[0].anchor.to.row)
# print(charts[0].anchor.to.col, charts[0].anchor.to.row)
#df1 = pd.read_excel(path1)
#print(df1)
# df1 = pd.read_excel(path1)
# print(df1)
print(load_charts(path1, chart_props=["title", "xtitle", "ytitle", "type"]))