add multi_apps; update chrome utilities
This commit is contained in:
@@ -8,7 +8,9 @@ from typing import Any, Union, Optional
|
||||
from typing import Dict, List
|
||||
|
||||
import requests
|
||||
from playwright.sync_api import sync_playwright
|
||||
from pydrive.auth import GoogleAuth
|
||||
from pydrive.drive import GoogleDrive, GoogleDriveFile, GoogleDriveFileList
|
||||
from playwright.sync_api import sync_playwright, TimeoutError
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
||||
|
||||
from desktop_env.evaluators.metrics.utils import compare_urls
|
||||
@@ -46,7 +48,7 @@ class SetupController:
|
||||
# Assumes all the setup functions should follow this name
|
||||
# protocol
|
||||
setup_function: str = "_{:}_setup".format(config_type)
|
||||
assert hasattr(self, setup_function)
|
||||
assert hasattr(self, setup_function), f'Setup controller cannot find init function {setup_function}'
|
||||
getattr(self, setup_function)(**parameters)
|
||||
|
||||
logger.info("SETUP: %s(%s)", setup_function, str(parameters))
|
||||
@@ -416,3 +418,99 @@ class SetupController:
|
||||
|
||||
# Do not close the context or browser; they will remain open after script ends
|
||||
return browser, context
|
||||
|
||||
# google drive setup
|
||||
def _googledrive_setup(self, **config):
    """ Clean google drive space (eliminate the impact of previous experiments to reset the environment)
    @args:
        config(Dict[str, Any]): contain keys
            settings_file(str): path to google drive settings file, which will be loaded by pydrive.auth.GoogleAuth()
            operation(List[str]): each operation is chosen from ['delete', 'upload']
            args(List[Dict[str, Any]]): parameters for each operation, args[i] belongs to operation[i]
        different args dict for different operations:
            for delete:
                query(str): query pattern string to search files or folder in google drive to delete, please refer to
                    https://developers.google.com/drive/api/guides/search-files?hl=en about how to write query string.
                    By default ('trashed = false'), every file/folder that is not yet in the trash is selected.
                trash(bool): True (default) moves the matches to the trash (can be recovered),
                    False deletes them permanently.
            for upload:
                not implemented yet, the operation is accepted and does nothing.
    @raises:
        ValueError: if any entry of `operation` is not one of the supported operations. The whole list is
            checked before anything is sent to Google Drive, so a typo cannot leave the drive half-cleaned.
    TODO: other operations
    """
    operations = config['operation']

    # Fail fast: reject unknown operations before authenticating or deleting anything.
    for operation in operations:
        if operation not in ('delete', 'upload'):
            raise ValueError('[ERROR]: not implemented clean type!')

    settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.yml')
    gauth = GoogleAuth(settings_file=settings_file)
    drive = GoogleDrive(gauth)

    for oid, operation in enumerate(operations):
        if operation == 'delete':  # delete the files/folders matched by the query
            params = config['args'][oid]
            # query pattern string, by default, remove all files/folders not in the trash to the trash
            q = params.get('query', 'trashed = false')
            trash = params.get('trash', True)
            filelist: GoogleDriveFileList = drive.ListFile({'q': q}).GetList()
            for drive_file in filelist:
                # note that, if a folder is trashed/deleted, all files and folders in it will be trashed/deleted
                # this is the same for UnTrash
                if trash:
                    drive_file.Trash()
                else:
                    drive_file.Delete()
        # operation == 'upload': placeholder, nothing to do yet
|
||||
|
||||
|
||||
def _login_setup(self, **config):
    """ Login to a website with account and password information.
    @args:
        config(Dict[str, Any]): contain keys
            settings_file(str): path to the settings file, a JSON file providing the keys
                'account' and 'password'
            platform(str): platform to login, implemented platforms include:
                googledrive: https://drive.google.com/drive/my-drive
    @returns:
        (browser, context) once the login form has been submitted;
        None if one of the login page waits times out.
    @raises:
        NotImplementedError: if `platform` is not an implemented platform.
        Exception: the last connection error, if the browser cannot be reached after 15 attempts.
    """
    host = self.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        browser = None
        # Retry up to 15 times, one second apart, before giving up.
        for attempt in range(15):
            try:
                browser = p.chromium.connect_over_cdp(remote_debugging_url)
                break
            except Exception as e:
                if attempt < 14:
                    logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
                    time.sleep(1)
                else:
                    logger.error(f"Failed to connect after multiple attempts: {e}")
                    raise  # bare raise keeps the original traceback
        if not browser:  # defensive guard; the loop above either connects or raises
            return

        context = browser.contexts[0]
        platform = config['platform']

        if platform == 'googledrive':
            url = 'https://drive.google.com/drive/my-drive'
            page = context.new_page()  # Create a new page (tab) within the existing context
            page.goto(url)
            logger.info(f"Opened new page: {url}")
            # Close the settings file deterministically instead of leaking the handle.
            with open(config['settings_file']) as settings_fp:
                settings = json.load(settings_fp)
            email, password = settings['account'], settings['password']

            try:
                page.wait_for_selector('input[type="email"]', state="visible", timeout=3000)
                page.fill('input[type="email"]', email)
                page.click('#identifierNext > div > button')
                page.wait_for_selector('input[type="password"]', state="visible", timeout=5000)
                page.fill('input[type="password"]', password)
                page.click('#passwordNext > div > button')
                page.wait_for_load_state('load', timeout=5000)
            except TimeoutError:
                logger.info('[ERROR]: timeout when waiting for google drive login page to load!')
                return

        else:
            raise NotImplementedError

        # NOTE(review): the handles are returned from inside the sync_playwright() context;
        # presumably callers do not keep driving them after it exits - confirm.
        return browser, context
|
||||
@@ -1,7 +1,6 @@
|
||||
from .chrome import get_default_search_engine, get_cookie_data, get_bookmarks, get_open_tabs_info, get_pdf_from_url, \
|
||||
get_shortcuts_on_desktop, get_history, get_enabled_experiments, get_chrome_language, get_chrome_font_size, \
|
||||
get_profile_name, \
|
||||
get_number_of_search_results
|
||||
get_profile_name, get_number_of_search_results, get_googledrive_file
|
||||
from .file import get_cloud_file, get_vm_file, get_cache_file
|
||||
from .general import get_vm_command_line
|
||||
from .impress import get_audio_in_slide
|
||||
@@ -9,4 +8,4 @@ from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
|
||||
from .misc import get_rule, get_accessibility_tree
|
||||
from .replay import get_replay
|
||||
from .vlc import get_vlc_playing_info, get_vlc_config
|
||||
from .vscode import get_vscode_config
|
||||
from .vscode import get_vscode_config
|
||||
@@ -2,8 +2,9 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Dict
|
||||
|
||||
from typing import Dict, Any
|
||||
from pydrive.auth import GoogleAuth
|
||||
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.chrome")
|
||||
@@ -461,3 +462,49 @@ def get_number_of_search_results(env, config: Dict[str, str]):
|
||||
browser.close()
|
||||
|
||||
return actual_count
|
||||
|
||||
|
||||
def get_googledrive_file(env, config: Dict[str, Any]) -> str:
    """ Get the desired file from Google Drive based on config, return the downloaded local filepath.
    @args:
        env: environment object; only `env.cache_dir` is read, as the download directory
        config(Dict[str, Any]): contain keys
            settings_file(str): path to the google drive settings file handed to pydrive.auth.GoogleAuth(),
                default to 'evaluation_examples/settings/googledrive/settings.json'
            query(str): search query passed to drive.ListFile to locate the target file
            dest(str): file name (relative to env.cache_dir) to save the downloaded content to
    @returns:
        the local path of the downloaded file, or None if no file matches the query
        or the download fails.
    """
    settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.json')
    auth = GoogleAuth(settings_file=settings_file)
    drive = GoogleDrive(auth)

    q = config['query']
    filelist: GoogleDriveFileList = drive.ListFile({'q': q}).GetList()
    if len(filelist) == 0:  # target file not found
        return None

    file: GoogleDriveFile = filelist[0]  # HACK: if multiple candidates, just download the first one
    _path = os.path.join(env.cache_dir, config['dest'])

    try:
        file.GetContentFile(_path, mimetype=file.metadata['mimeType'])
    except Exception as e:
        # Catch Exception (not a bare except) so Ctrl-C / SystemExit still propagate,
        # and keep the cause in the log instead of discarding it.
        logger.info(f'[ERROR]: Failed to download the file from Google Drive: {e}')
        return None
    return _path
|
||||
@@ -1,5 +1,5 @@
|
||||
from .chrome import is_expected_tabs, is_expected_bookmarks, compare_pdfs, is_cookie_deleted, is_shortcut_on_desktop, check_font_size, \
|
||||
check_enabled_experiments, check_history_deleted
|
||||
check_enabled_experiments, check_history_deleted, is_expected_search_query
|
||||
from .docs import compare_font_names, compare_subscript_contains, has_page_numbers_in_footers, compare_docx_lines
|
||||
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
|
||||
compare_insert_equation, compare_highlighted_text
|
||||
@@ -20,4 +20,4 @@ from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter
|
||||
from .vlc import is_vlc_playing, is_vlc_recordings_folder, is_vlc_fullscreen, compare_images, compare_audios, \
|
||||
compare_videos, check_qt_bgcone, check_one_instance_when_started_from_file,check_qt_minimal_view, check_qt_max_volume, \
|
||||
check_qt_slider_colours, check_global_key_play_pause
|
||||
from .vscode import compare_text_file, compare_config, compare_answer, is_extension_installed, check_json_settings, check_json_keybindings
|
||||
from .vscode import compare_text_file, compare_config, compare_answer, is_extension_installed, check_json_settings, check_json_keybindings
|
||||
@@ -1,4 +1,4 @@
|
||||
import logging
|
||||
import logging, re
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import fitz # PyMuPDF
|
||||
@@ -44,6 +44,15 @@ def is_expected_bookmarks(bookmarks: List[str], rule: Dict[str, Any]) -> float:
|
||||
raise TypeError(f"{rule['type']} not support yet!")
|
||||
|
||||
|
||||
def is_expected_search_query(active_tab_info: Dict[str, str], rules: Dict[str, Any]) -> float:
    """ Check whether the URL of the active tab matches the expected pattern.
    @args:
        active_tab_info(Dict[str, str]): information of the active tab, only the 'url' entry is read
        rules(Dict[str, Any]): rules['expect']['pattern'] is the regular expression searched in the url
    @returns:
        1. if the pattern is found anywhere in the url, 0. otherwise
    """
    url_regex = rules['expect']['pattern']
    current_url = active_tab_info['url']
    return 1. if re.search(url_regex, current_url) else 0.
|
||||
|
||||
|
||||
def compare_pdfs(pdf1_path, pdf2_path):
|
||||
"""
|
||||
Compare two PDF files.
|
||||
@@ -56,11 +65,14 @@ def compare_pdfs(pdf1_path, pdf2_path):
|
||||
for page in pdf:
|
||||
text += page.get_text()
|
||||
return text.strip()
|
||||
try:
|
||||
text1 = extract_text_from_pdf(pdf1_path)
|
||||
text2 = extract_text_from_pdf(pdf2_path)
|
||||
|
||||
text1 = extract_text_from_pdf(pdf1_path)
|
||||
text2 = extract_text_from_pdf(pdf2_path)
|
||||
|
||||
return fuzz.ratio(text1, text2) / 100
|
||||
return fuzz.ratio(text1, text2) / 100
|
||||
except Exception as e:
|
||||
logger.info(f"[ERROR]: unexpected error occurred when comparing PDF files: {e}")
|
||||
return 0.0
|
||||
|
||||
|
||||
def is_cookie_deleted(cookie_data, rule):
|
||||
|
||||
@@ -61,6 +61,18 @@ def execute_command():
|
||||
}), 500
|
||||
|
||||
|
||||
def _get_machine_architecture() -> str:
    """ Classify the architecture reported by platform.machine() into a coarse family.
    @returns:
        'amd' for the x86 / x86-64 style names, 'arm' for the 64-bit ARM style names listed below,
        'unknown' for anything else. The comparison is case-insensitive.
    """
    families = {
        'amd': ('amd32', 'amd64', 'x86', 'x86_64', 'x86-64', 'x64', 'i386', 'i686'),
        'arm': ('arm64', 'aarch64', 'aarch32'),
    }
    reported = platform.machine().lower()
    for family, aliases in families.items():
        if reported in aliases:
            return family
    return 'unknown'
|
||||
|
||||
|
||||
@app.route('/setup/launch', methods=["POST"])
|
||||
def launch_app():
|
||||
data = request.json
|
||||
@@ -71,6 +83,9 @@ def launch_app():
|
||||
command = shlex.split(command)
|
||||
|
||||
try:
|
||||
if 'google-chrome' in command and _get_machine_architecture() == 'arm':
|
||||
index = command.index('google-chrome')
|
||||
command[index] = 'chromium-browser' # arm64 chrome is not available yet, can only use chromium
|
||||
subprocess.Popen(command, shell=shell)
|
||||
return "{:} launched successfully".format(command if shell else " ".join(command))
|
||||
except Exception as e:
|
||||
@@ -287,6 +302,11 @@ def _create_atspi_node(node: Accessible) -> _Element:
|
||||
)
|
||||
if "text" in locals() and len(text) > 0:
|
||||
xml_node.text = text
|
||||
|
||||
# HACK: libreoffice has a problem -> billions of children for parent with RoleName "document spreadsheet"
|
||||
if node.getRoleName() == "document spreadsheet":
|
||||
return xml_node
|
||||
|
||||
for ch in node:
|
||||
xml_node.append(_create_atspi_node(ch))
|
||||
return xml_node
|
||||
|
||||
Reference in New Issue
Block a user