Merge branch 'main' into zdy

This commit is contained in:
David Chang
2024-01-11 23:02:00 +08:00
39 changed files with 580 additions and 282 deletions

View File

@@ -95,6 +95,12 @@ with sync_playwright() as playwright:
## VLC Media Player
### Bugs fix
One thing on Ubuntu need to do, enter into the `meida`>`convert/save`>select files>`convert/save`
Then enter the profile of `Audio - MP3`, change the profile for mp3, section audiocodec from "MP3" to "MPEG Audio"
Otherwise the mp3 file will be created but with 0 bytes. It's a bug of VLC.
### Setting Up VLC's HTTP Interface
To enable and use the HTTP interface in VLC Media Player for remote control and status checks, follow these steps:

View File

@@ -1,3 +1,5 @@
from .file import get_cloud_file, get_vm_file, get_cache_file
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
from .misc import get_rule, get_accessibility_tree
from .vlc import get_vlc_playing_info, get_vlc_config
from .chrome import get_default_search_engine, get_bookmarks

View File

@@ -0,0 +1,126 @@
import json
import logging
import os
import sqlite3
from typing import Dict
logger = logging.getLogger("desktopenv.getters.chrome")
"""
WARNING:
1. Functions from this script assume that no account is registered on Chrome, otherwise the default file path needs to be changed.
2. The functions are not tested on Windows and Mac, but they should work.
"""
def get_default_search_engine(env, config: Dict[str, str]):
os_type = env.vm_platform
if os_type == 'Windows':
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Preferences'))""")['output'].strip()
elif os_type == 'Darwin':
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")['output'].strip()
elif os_type == 'Linux':
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")['output'].strip()
else:
raise Exception('Unsupported operating system')
try:
content = env.controller.get_file(preference_file_path)
data = json.loads(content)
# The path within the JSON data to the default search engine might vary
search_engine = data.get('default_search_provider_data', {}).get('template_url_data', {}).get('short_name',
'Google')
return search_engine
except Exception as e:
logger.error(f"Error: {e}")
return "Google"
def get_cookie_data(env, config: Dict[str, str]):
os_type = env.vm_platform
if os_type == 'Windows':
chrome_cookie_file_path = os.path.join(os.getenv('LOCALAPPDATA'), 'Google\\Chrome\\User Data\\Default\\Cookies')
elif os_type == 'Darwin':
chrome_cookie_file_path = os.path.join(os.getenv('HOME'),
'Library/Application Support/Google/Chrome/Default/Cookies')
elif os_type == 'Linux':
chrome_cookie_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies')
else:
raise Exception('Unsupported operating system')
# todo: add a new controller function to connect the cookie database
#############
try:
conn = sqlite3.connect(chrome_cookie_file_path)
cursor = conn.cursor()
# Query to check for OpenAI cookies
cursor.execute("SELECT * FROM cookies")
cookies = cursor.fetchall()
return cookies
except Exception as e:
logger.error(f"Error: {e}")
return None
#############
def get_bookmarks(env, config: Dict[str, str]):
os_type = env.vm_platform
if os_type == 'Windows':
preference_file_path = os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Bookmarks')
elif os_type == 'Darwin':
preference_file_path = os.path.join(os.getenv('HOME'),
'Library/Application Support/Google/Chrome/Default/Bookmarks')
elif os_type == 'Linux':
preference_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks')
else:
raise Exception('Unsupported operating system')
try:
content = env.controller.get_file(preference_file_path)
# make content json variable
data = json.load(content)
bookmarks = data.get('roots', {})
return bookmarks
except Exception as e:
logger.error(f"Error: {e}")
return None
# todo: move this to the main.py
def get_extensions_installed_from_shop(env, config: Dict[str, str]):
"""Find the Chrome extensions directory based on the operating system."""
os_type = env.vm_platform
if os_type == 'Windows':
chrome_extension_dir = os.path.expanduser(
'~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'
elif os_type == 'Darwin': # macOS
chrome_extension_dir = os.path.expanduser(
'~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'
elif os_type == 'Linux':
chrome_extension_dir = os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'
else:
raise Exception('Unsupported operating system')
manifests = []
for extension_id in os.listdir(chrome_extension_dir):
extension_path = os.path.join(chrome_extension_dir, extension_id)
if os.path.isdir(extension_path):
# Iterate through version-named subdirectories
for version_dir in os.listdir(extension_path):
version_path = os.path.join(extension_path, version_dir)
manifest_path = os.path.join(version_path, 'manifest.json')
if os.path.isfile(manifest_path):
with open(manifest_path, 'r') as file:
try:
manifest = json.load(file)
manifests.append(manifest)
except json.JSONDecodeError:
logger.error(f"Error reading {manifest_path}")
return manifests

View File

@@ -0,0 +1,20 @@
import os
from typing import Union
def get_vm_screen_size(env, config: dict) -> dict:
return env.controller.get_vm_screen_size()
def get_vm_window_size(env, config: dict) -> dict:
return env.controller.get_vm_window_size(app_class_name=config["app_class_name"])
def get_vm_wallpaper(env, config: dict) -> Union[str, bytes]:
_path = os.path.join(env.cache_dir, config["dest"])
content = env.controller.get_vm_wallpaper()
with open(_path, "wb") as f:
f.write(content)
return _path

View File

@@ -9,12 +9,12 @@ def get_vlc_playing_info(env, config: Dict[str, str]):
"""
Gets the current playing information from VLC's HTTP interface.
"""
_path = os.path.join(env.cache_dir, config["dest"])
host = env.vm_ip
port = 8080
password = 'password'
_path = os.path.join(env.cache_dir, config["dest"])
content = env.controller.get_vlc_status(host, port, password)
with open(_path, "wb") as f:
f.write(content)
@@ -26,22 +26,24 @@ def get_vlc_config(env, config: Dict[str, str]):
"""
Reads the VLC configuration file to check setting.
"""
_path = os.path.join(env.cache_dir, config["dest"])
os_type = env.controller.execute_python_command("import platform; print(platform.system())")['output'].strip()
os_type = env.vm_platform
# fixme: depends on how we config and install the vlc in virtual machine, need to be aligned and double-checked
if os_type == "Linux":
config_path = \
env.controller.execute_python_command("import os; print(os.path.expanduser('~/.config/vlc/vlcrc'))")[
'output'].strip()
env.controller.execute_python_command("import os; print(os.path.expanduser('~/.config/vlc/vlcrc'))")[
'output'].strip()
elif os_type == "Darwin":
config_path = env.controller.execute_python_command(
"import os; print(os.path.expanduser('~/Library/Preferences/org.videolan.vlc/vlcrc'))")['output'].strip()
elif os_type == "Windows":
config_path = env.controller.execute_python_command(
"import os; print(os.path.expanduser('~\\AppData\\Roaming\\vlc\\vlcrc'))")['output'].strip()
else:
raise Exception("Unsupported operating system", os_type)
_path = os.path.join(env.cache_dir, config["dest"])
content = env.controller.get_file(config_path)
with open(_path, "wb") as f:
f.write(content)

View File

@@ -1,10 +1,13 @@
from .table import compare_table
from .table import check_sheet_list, check_xlsx_freeze, check_xlsx_zoom
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, compare_insert_equation
from .docs import compare_font_names, compare_subscript_contains, has_page_numbers_in_footers
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
compare_insert_equation
from .docs import is_first_line_centered, check_file_exists, compare_contains_image
from .pdf import check_pdf_pages
from .general import exact_match, fuzzy_match, check_csv, check_accessibility_tree, check_list
from .libreoffice import check_libre_locale
from .vlc import is_vlc_playing, is_vlc_recordings_folder
from .pdf import check_pdf_pages
from .table import check_sheet_list, check_xlsx_freeze, check_xlsx_zoom
from .table import compare_table
from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, compare_images, compare_audios, \
compare_videos
from .general import check_csv, check_accessibility_tree, check_list
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter

View File

@@ -1,127 +1,15 @@
import json
import os
import platform
import sqlite3
import logging
from playwright.sync_api import sync_playwright
import logging
logger = logging.getLogger("desktopenv.metrics.chrome")
"""
WARNING:
1. Functions from this script assume that no account is registered on Chrome, otherwise the default file path needs to be changed.
2. The functions are not tested on Windows and Mac, but they should work.
"""
# todo: move to getter module
# The following ones just need to load info from the files of software, no need to connect to the software
def get_default_search_engine():
if platform.system() == 'Windows':
preference_file_path = os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Preferences')
elif platform.system() == 'Darwin':
preference_file_path = os.path.join(os.getenv('HOME'),
'Library/Application Support/Google/Chrome/Default/Preferences')
elif platform.system() == 'Linux':
preference_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences')
else:
raise Exception('Unsupported operating system')
try:
with open(preference_file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
# The path within the JSON data to the default search engine might vary
search_engine = data.get('default_search_provider_data', {}).get('template_url_data', {}).get('short_name',
'Google')
return search_engine
except Exception as e:
logger.error(f"Error: {e}")
return "Google"
def get_cookie_data():
if platform.system() == 'Windows':
chrome_cookie_file_path = os.path.join(os.getenv('LOCALAPPDATA'), 'Google\\Chrome\\User Data\\Default\\Cookies')
elif platform.system() == 'Darwin':
chrome_cookie_file_path = os.path.join(os.getenv('HOME'),
'Library/Application Support/Google/Chrome/Default/Cookies')
elif platform.system() == 'Linux':
chrome_cookie_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies')
else:
raise Exception('Unsupported operating system')
try:
conn = sqlite3.connect(chrome_cookie_file_path)
cursor = conn.cursor()
# Query to check for OpenAI cookies
cursor.execute("SELECT * FROM cookies")
cookies = cursor.fetchall()
return cookies
except Exception as e:
logger.error(f"Error: {e}")
return None
def get_bookmarks():
if platform.system() == 'Windows':
preference_file_path = os.path.join(os.getenv('LOCALAPPDATA'),
'Google\\Chrome\\User Data\\Default\\Bookmarks')
elif platform.system() == 'Darwin':
preference_file_path = os.path.join(os.getenv('HOME'),
'Library/Application Support/Google/Chrome/Default/Bookmarks')
elif platform.system() == 'Linux':
preference_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks')
else:
raise Exception('Unsupported operating system')
try:
with open(preference_file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
bookmarks = data.get('roots', {})
return bookmarks
except Exception as e:
logger.error(f"Error: {e}")
return None
def get_extensions_installed_from_shop():
"""Find the Chrome extensions directory based on the operating system."""
os_name = platform.system()
if os_name == 'Windows':
chrome_extension_dir = os.path.expanduser(
'~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'
elif os_name == 'Darwin': # macOS
chrome_extension_dir = os.path.expanduser(
'~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'
elif os_name == 'Linux':
chrome_extension_dir = os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'
else:
raise Exception('Unsupported operating system')
manifests = []
for extension_id in os.listdir(chrome_extension_dir):
extension_path = os.path.join(chrome_extension_dir, extension_id)
if os.path.isdir(extension_path):
# Iterate through version-named subdirectories
for version_dir in os.listdir(extension_path):
version_path = os.path.join(extension_path, version_dir)
manifest_path = os.path.join(version_path, 'manifest.json')
if os.path.isfile(manifest_path):
with open(manifest_path, 'r') as file:
try:
manifest = json.load(file)
manifests.append(manifest)
except json.JSONDecodeError:
logger.error(f"Error reading {manifest_path}")
return manifests
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on port info to allow remote debugging, see README.md for details

View File

@@ -1,22 +1,32 @@
import csv
import functools
import operator
import re
from numbers import Number
from typing import Callable, Any
from typing import Dict, List, Pattern
import lxml.etree
from lxml.etree import _Element
from lxml.cssselect import CSSSelector
from typing import Dict, List, Pattern
from typing import Callable, Any
from numbers import Number
import operator
from lxml.etree import _Element
from rapidfuzz import fuzz
import functools
import re
from .utils import _match_record
#def _match_record(pattern: Dict[str, str], item: Dict[str, str]) -> bool:
#return all(k in item and item[k]==val for k, val in pattern.items())
def exact_match(result, rules) -> float:
expect = rules["expected"]
print(result, expect)
if result == expect:
return 1.
else:
return 0.
def fuzzy_match(result, rules) -> float:
expect = rules["expected"]
return fuzz.ratio(result, expect) / 100.
def check_csv(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
"""
@@ -46,6 +56,7 @@ def check_csv(result: str, rules: Dict[str, List[Dict[str, str]]]) -> float:
unexpect_metric = unexpect_metric and not any(_match_record(r, rcd) for r in rules.get("unexpect", []))
return float(all(expect_metrics) and unexpect_metric)
def check_list(result: str, rules: Dict[str, List[str]]) -> float:
"""
Args:
@@ -75,15 +86,18 @@ def check_list(result: str, rules: Dict[str, List[str]]) -> float:
unexpect_metric = unexpect_metric and all(r.search(l) is None for r in unexpect_patterns)
return float(all(expect_metrics) and unexpect_metric)
_accessibility_ns_map = { "st": "uri:deskat:state.at-spi.gnome.org"
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
, "cp": "uri:deskat:component.at-spi.gnome.org"
, "doc": "uri:deskat:document.at-spi.gnome.org"
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
, "txt": "uri:deskat:text.at-spi.gnome.org"
, "val": "uri:deskat:value.at-spi.gnome.org"
, "act": "uri:deskat:action.at-spi.gnome.org"
}
_accessibility_ns_map = {"st": "uri:deskat:state.at-spi.gnome.org"
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
, "cp": "uri:deskat:component.at-spi.gnome.org"
, "doc": "uri:deskat:document.at-spi.gnome.org"
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
, "txt": "uri:deskat:text.at-spi.gnome.org"
, "val": "uri:deskat:value.at-spi.gnome.org"
, "act": "uri:deskat:action.at-spi.gnome.org"
}
def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
"""
Args:
@@ -114,11 +128,12 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
else:
raise ValueError("At least one of xpath and selectors is required")
if len(elements)==0:
if len(elements) == 0:
return 0.
if "text" in rules:
match_func: Callable[[str], Number] = functools.partial( operator.eq if rules["exact"] else fuzz.ratio
match_func: Callable[[str], Number] = functools.partial( operator.eq if rules["exact"]\
else (lambda a, b: fuzz.ratio(a, b)/100.)
, rules["text"]
)
match_score: Number = 0
@@ -129,5 +144,5 @@ def check_accessibility_tree(result: str, rules: Dict[str, Any]) -> float:
return float(match_score)
#def check_existence(result: str, *args) -> float:
#return 1. - (result is None)
# def check_existence(result: str, *args) -> float:
# return 1. - (result is None)

View File

@@ -7,7 +7,10 @@ from xml.etree import ElementTree
import acoustid
import cv2
import imagehash
import librosa
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
logger = logging.getLogger("desktopenv.metrics.vlc")
@@ -37,6 +40,7 @@ def is_vlc_playing(actual_status_path: str, rule: Dict[str, str]) -> float:
return 0
# fixme: part of this function can be moved to getters
def is_vlc_recordings_folder(actual_config_path: str, rule: Dict[str, str]) -> float:
"""
Checks if VLC's recording folder is set to the expected value.
@@ -57,24 +61,79 @@ def is_vlc_recordings_folder(actual_config_path: str, rule: Dict[str, str]) -> f
current_path = line.split('=')[-1].strip()
# Compare with the Desktop path
if current_path == expected_recording_file_path:
return True
return 1
else:
return False
return 0
# The configuration key was not found in the file
return False
return 0
except FileNotFoundError:
logger.error("VLC configuration file not found.")
return False
return 0
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
return 0
def is_vlc_fullscreen(actual_window_size, screen_size):
if actual_window_size['width'] == screen_size['width'] and actual_window_size['height'] == screen_size['height']:
return True
return 1
else:
return False
return 0
def compare_images(image1_path, image2_path):
# You would call this function with the paths to the two images you want to compare:
# score = compare_images('path_to_image1', 'path_to_image2')
# print("Similarity score:", score)
# Open the images and convert to grayscale
image1 = Image.open(image1_path).convert('L')
image2 = Image.open(image2_path).convert('L')
# Resize images to the smaller one's size for comparison
image1_size = image1.size
image2_size = image2.size
new_size = min(image1_size, image2_size)
image1 = image1.resize(new_size, Image.Resampling.LANCZOS)
image2 = image2.resize(new_size, Image.Resampling.LANCZOS)
# Convert images to numpy arrays
image1_array = np.array(image1)
image2_array = np.array(image2)
# Calculate SSIM between two images
similarity_index = ssim(image1_array, image2_array)
return similarity_index
def compare_audios(audio_path_1, audio_path_2, max_distance=1000):
"""
Compare two audio files and return a similarity score in the range [0, 1].
audio_path_1, audio_path_2: paths to the audio files to compare
max_distance: an empirically determined maximum expected DTW distance
"""
# Example Usage:
# similarity = compare_audios_simple('path_to_audio1.mp3', 'path_to_audio2.mp3')
# print(f'Similarity Score: {similarity}')
# Convert to common format if necessary and load audio
y1, sr1 = librosa.load(audio_path_1)
y2, sr2 = librosa.load(audio_path_2)
# Extract MFCC features
mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1)
mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2)
# Compute Dynamic Time Warping distance
distance, path = librosa.sequence.dtw(mfcc1.T, mfcc2.T)
# Normalize distance to get a similarity score
normalized_distance = np.mean(distance) / max_distance
similarity_score = 1 - min(normalized_distance, 1) # Ensure the score is within [0, 1]
return similarity_score
def compare_videos(video_path1, video_path2, max_frames_to_check=100, threshold=5):