Files
sci-gui-agent-benchmark/desktop_env/evaluators/getters/chrome.py
2024-02-01 00:53:31 +08:00

519 lines
22 KiB
Python

import json
import logging
import os
import sqlite3
from typing import Dict, Any
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
from playwright.sync_api import sync_playwright
logger = logging.getLogger("desktopenv.getters.chrome")
"""
WARNING:
1. Functions from this script assume that no account is registered on Chrome, otherwise the default file path needs to be changed.
2. The functions are not tested on Windows and Mac, but they should work.
"""
# The following getters only read Chrome's on-disk files; they do not need to connect to a running browser.
def get_default_search_engine(env, config: Dict[str, str]):
    """Return the short name of Chrome's default search engine on the VM.

    The value is read from
    ``default_search_provider_data.template_url_data.short_name`` in the
    ``Preferences`` file of Chrome's ``Default`` profile.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``execute_python_command`` and ``get_file``.
        config: Getter configuration; not used by this getter.

    Returns:
        The short name of the search engine, or ``"Google"`` when the entry is
        missing or when fetching/parsing the file raises.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Pass the path components separately. A single
        # 'Google\Chrome\User Data\...' literal reaches the VM with bare
        # backslashes, where '\U' is an invalid string escape, so the command
        # would not even parse.
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
            "'Google', 'Chrome', 'User Data', 'Default', 'Preferences'))")['output'].strip()
    elif os_type == 'Darwin':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
            'output'].strip()
    elif os_type == 'Linux':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)
        # The location of this entry inside the JSON document might vary.
        search_engine = data.get('default_search_provider_data', {}).get('template_url_data', {}).get('short_name',
                                                                                                    'Google')
        return search_engine
    except Exception as e:
        # Best effort: any failure falls back to Chrome's stock search engine.
        logger.error(f"Error: {e}")
        return "Google"
def get_cookie_data(env, config: Dict[str, str]):
    """Fetch Chrome's cookie database from the VM and return all of its rows.

    The ``Cookies`` file of the ``Default`` profile is copied into
    ``env.cache_dir`` under the name ``config["dest"]`` and then queried with
    SQLite. Assumes the cookies are stored in the default location, are not
    encrypted and are not large in size.

    Args:
        env: Environment object providing ``vm_platform``, ``cache_dir`` and a
            ``controller`` with ``execute_python_command`` and ``get_file``.
        config: Must contain ``"dest"``, the file name for the local copy.

    Returns:
        A list of row tuples from the ``cookies`` table, or ``None`` when
        fetching, writing or querying the database raises.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Path components are passed separately: a 'Google\Chrome\User Data\...'
        # literal would reach the VM with bare backslashes and '\U' is an
        # invalid string escape there.
        chrome_cookie_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
            "'Google', 'Chrome', 'User Data', 'Default', 'Cookies'))")['output'].strip()
    elif os_type == 'Darwin':
        chrome_cookie_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Cookies'))")[
            'output'].strip()
    elif os_type == 'Linux':
        chrome_cookie_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(chrome_cookie_file_path)
        _path = os.path.join(env.cache_dir, config["dest"])

        with open(_path, "wb") as f:
            f.write(content)

        conn = sqlite3.connect(_path)
        try:
            cursor = conn.cursor()
            # Read every stored cookie; filtering is left to the caller.
            cursor.execute("SELECT * FROM cookies")
            return cursor.fetchall()
        finally:
            # Always release the database handle, even if the query fails.
            conn.close()
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
def get_history(env, config: Dict[str, str]):
    """Fetch Chrome's history database from the VM and return the visited URLs.

    The ``History`` file of the ``Default`` profile is copied into
    ``env.cache_dir`` under the name ``config["dest"]`` and then queried with
    SQLite.

    Args:
        env: Environment object providing ``vm_platform``, ``cache_dir`` and a
            ``controller`` with ``execute_python_command`` and ``get_file``.
        config: Must contain ``"dest"``, the file name for the local copy.

    Returns:
        A list of ``(url, title, last_visit_time)`` tuples from the ``urls``
        table, or ``None`` when fetching, writing or querying raises.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        chrome_history_path = env.controller.execute_python_command(
            """import os; print(os.path.join(os.getenv('USERPROFILE'), "AppData", "Local", "Google", "Chrome", "User Data", "Default", "History"))""")[
            'output'].strip()
    elif os_type == 'Darwin':
        chrome_history_path = env.controller.execute_python_command(
            """import os; print(os.path.join(os.getenv('HOME'), "Library", "Application Support", "Google", "Chrome", "Default", "History"))""")[
            'output'].strip()
    elif os_type == 'Linux':
        chrome_history_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(chrome_history_path)
        _path = os.path.join(env.cache_dir, config["dest"])

        with open(_path, "wb") as f:
            f.write(content)

        conn = sqlite3.connect(_path)
        try:
            cursor = conn.cursor()
            # One row per URL known to the history database.
            cursor.execute("SELECT url, title, last_visit_time FROM urls")
            return cursor.fetchall()
        finally:
            # Always release the database handle, even if the query fails.
            conn.close()
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
def get_enabled_experiments(env, config: Dict[str, str]):
    """Return the experiments (flags) enabled in Chrome on the VM.

    The value is read from ``browser.enabled_labs_experiments`` in Chrome's
    ``Local State`` file.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``execute_python_command`` and ``get_file``.
        config: Getter configuration; not used by this getter.

    Returns:
        The ``enabled_labs_experiments`` value, or ``[]`` when the entry is
        missing or when fetching/parsing the file raises.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Path components are passed separately: a 'Google\Chrome\User Data\...'
        # literal would reach the VM with bare backslashes and '\U' is an
        # invalid string escape there.
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
            "'Google', 'Chrome', 'User Data', 'Local State'))")['output'].strip()
    elif os_type == 'Darwin':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")[
            'output'].strip()
    elif os_type == 'Linux':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)
        # The location of this entry inside the JSON document might vary.
        enabled_labs_experiments = data.get('browser', {}).get('enabled_labs_experiments', [])
        return enabled_labs_experiments
    except Exception as e:
        logger.error(f"Error: {e}")
        return []
def get_profile_name(env, config: Dict[str, str]):
    """Return the name of Chrome's default profile on the VM.

    The value is read from ``profile.name`` in the ``Preferences`` file of
    Chrome's ``Default`` profile.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``execute_python_command`` and ``get_file``.
        config: Getter configuration; not used by this getter.

    Returns:
        The profile name, or ``None`` when the entry is missing or when
        fetching/parsing the file raises.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Path components are passed separately: a 'Google\Chrome\User Data\...'
        # literal would reach the VM with bare backslashes and '\U' is an
        # invalid string escape there.
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
            "'Google', 'Chrome', 'User Data', 'Default', 'Preferences'))")['output'].strip()
    elif os_type == 'Darwin':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
            'output'].strip()
    elif os_type == 'Linux':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)
        # The location of this entry inside the JSON document might vary.
        profile_name = data.get('profile', {}).get('name', None)
        return profile_name
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
def get_chrome_language(env, config: Dict[str, str]):
    """Return the UI locale configured for Chrome on the VM.

    The value is read from ``intl.app_locale`` in Chrome's ``Local State``
    file.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``execute_python_command`` and ``get_file``.
        config: Getter configuration; not used by this getter.

    Returns:
        The locale string, or ``"en-US"`` when the entry is missing or when
        fetching/parsing the file raises.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Path components are passed separately: a 'Google\Chrome\User Data\...'
        # literal would reach the VM with bare backslashes and '\U' is an
        # invalid string escape there.
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
            "'Google', 'Chrome', 'User Data', 'Local State'))")['output'].strip()
    elif os_type == 'Darwin':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")[
            'output'].strip()
    elif os_type == 'Linux':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)
        # The location of this entry inside the JSON document might vary.
        app_locale = data.get('intl', {}).get('app_locale', "en-US")
        return app_locale
    except Exception as e:
        logger.error(f"Error: {e}")
        return "en-US"
def get_chrome_font_size(env, config: Dict[str, str]):
    """Return Chrome's font-size preferences on the VM.

    The value is read from ``webkit.webprefs`` in the ``Preferences`` file of
    Chrome's ``Default`` profile.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``execute_python_command`` and ``get_file``.
        config: Getter configuration; not used by this getter.

    Returns:
        The ``webprefs`` mapping. When the entry is missing, or when
        fetching/parsing the file raises, a default mapping with
        ``default_fixed_font_size``, ``default_font_size`` and
        ``minimum_font_size`` is returned.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    # Single source for the fallback so the "missing entry" path and the
    # "error" path report exactly the same keys.
    default_webprefs = {
        "default_fixed_font_size": 13,
        "default_font_size": 16,
        "minimum_font_size": 13
    }

    os_type = env.vm_platform
    if os_type == 'Windows':
        # Path components are passed separately: a 'Google\Chrome\User Data\...'
        # literal would reach the VM with bare backslashes and '\U' is an
        # invalid string escape there.
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
            "'Google', 'Chrome', 'User Data', 'Default', 'Preferences'))")['output'].strip()
    elif os_type == 'Darwin':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
            'output'].strip()
    elif os_type == 'Linux':
        preference_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)
        # The location of this entry inside the JSON document might vary.
        return data.get('webkit', {}).get('webprefs', default_webprefs)
    except Exception as e:
        logger.error(f"Error: {e}")
        return default_webprefs
def get_bookmarks(env, config: Dict[str, str]):
    """Return the bookmark tree stored by Chrome on the VM.

    The value is the ``roots`` entry of the ``Bookmarks`` file of Chrome's
    ``Default`` profile.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``execute_python_command`` and ``get_file``.
        config: Getter configuration; not used by this getter.

    Returns:
        The ``roots`` mapping (``{}`` if the key is absent), or ``[]`` when
        the fetched file content is empty/falsy.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
        Any error from fetching or JSON-decoding the file propagates; this
        getter has no fallback for those.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Path components are passed separately: a 'Google\Chrome\User Data\...'
        # literal would reach the VM with bare backslashes and '\U' is an
        # invalid string escape there.
        bookmarks_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
            "'Google', 'Chrome', 'User Data', 'Default', 'Bookmarks'))")['output'].strip()
    elif os_type == 'Darwin':
        bookmarks_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Bookmarks'))")[
            'output'].strip()
    elif os_type == 'Linux':
        bookmarks_file_path = env.controller.execute_python_command(
            "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
            'output'].strip()
    else:
        raise Exception('Unsupported operating system')

    content = env.controller.get_file(bookmarks_file_path)
    if not content:
        # Nothing was fetched (e.g. no bookmarks file yet).
        return []
    data = json.loads(content)
    bookmarks = data.get('roots', {})
    return bookmarks
# todo: move this to the main.py
def get_extensions_installed_from_shop(env, config: Dict[str, str]):
    """Collect the manifests of the extensions installed in Chrome's default profile.

    The ``Extensions`` directory is resolved by running a Python snippet
    through ``env.controller.execute_python_command``. Every
    ``<extension id>/<version>/manifest.json`` below it is then parsed.

    NOTE(review): the directory path is computed by the controller, but the
    directory is listed with ``os.listdir``/``open`` in the process running
    this function. This only works if both see the same filesystem; confirm.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``execute_python_command``.
        config: Getter configuration; not used by this getter.

    Returns:
        A list with one parsed manifest (dict) per readable ``manifest.json``.
        Manifests that are not valid JSON are logged and skipped.

    Raises:
        Exception: If ``env.vm_platform`` is not Windows, Darwin or Linux.
        OSError: If the extensions directory cannot be listed.
    """
    os_type = env.vm_platform
    # The snippets must import ``os`` and ``print`` the result, otherwise the
    # command's 'output' never contains the path. Components are joined with
    # os.path.join so no backslash literal (and no '\U' escape) is sent.
    if os_type == 'Windows':
        chrome_extension_dir = env.controller.execute_python_command(
            "import os; print(os.path.join(os.path.expanduser('~'), "
            "'AppData', 'Local', 'Google', 'Chrome', 'User Data', 'Default', 'Extensions'))")[
            'output'].strip()
    elif os_type == 'Darwin':  # macOS
        chrome_extension_dir = env.controller.execute_python_command(
            "import os; print(os.path.join(os.path.expanduser('~'), "
            "'Library', 'Application Support', 'Google', 'Chrome', 'Default', 'Extensions'))")[
            'output'].strip()
    elif os_type == 'Linux':
        chrome_extension_dir = env.controller.execute_python_command(
            "import os; print(os.path.join(os.path.expanduser('~'), "
            "'.config', 'google-chrome', 'Default', 'Extensions'))")['output'].strip()
    else:
        raise Exception('Unsupported operating system')

    manifests = []
    for extension_id in os.listdir(chrome_extension_dir):
        extension_path = os.path.join(chrome_extension_dir, extension_id)
        if not os.path.isdir(extension_path):
            continue
        # Each extension keeps one sub-directory per installed version.
        for version_dir in os.listdir(extension_path):
            manifest_path = os.path.join(extension_path, version_dir, 'manifest.json')
            if not os.path.isfile(manifest_path):
                continue
            with open(manifest_path, 'r', encoding='utf-8') as file:
                try:
                    manifests.append(json.load(file))
                except json.JSONDecodeError:
                    logger.error(f"Error reading {manifest_path}")
    return manifests
# The following getters require Playwright to be installed on the target machine, and Chrome must be started with a
# remote-debugging port so that it can be attached to; see README.md for details.
def get_open_tabs_info(env, config: Dict[str, str]):
    """Return the title and URL of every tab open in the VM's Chrome.

    Connects to Chrome over the DevTools protocol at ``env.vm_ip`` on port
    9222 and walks all pages of all browser contexts.

    Args:
        env: Environment object providing ``vm_ip``.
        config: Getter configuration; not used by this getter.

    Returns:
        A list of ``{'title': ..., 'url': ...}`` dicts, one per page. The
        title is ``'Load timeout'`` when waiting for the page load timed out
        and ``'Error encountered'`` for any other error while reading a page.
    """
    # Playwright raises its own TimeoutError class, which is not the builtin
    # one; import it here so the timeout branch below can actually match.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        # connect to remote Chrome instance
        browser = p.chromium.connect_over_cdp(remote_debugging_url)

        tabs_info = []
        for context in browser.contexts:
            for page in context.pages:
                try:
                    # Wait for the page to finish loading, this prevents the
                    # "execution context was destroyed" issue.
                    page.wait_for_load_state('load')
                    title = page.title()
                    url = page.url
                    tabs_info.append({'title': title, 'url': url})
                except (PlaywrightTimeoutError, TimeoutError):
                    # Loading timed out: record what is known about the tab.
                    tabs_info.append({'title': 'Load timeout', 'url': page.url})
                except Exception as e:
                    # Any other failure while reading the page title.
                    logger.error(f'Error: {e}')
                    tabs_info.append({'title': 'Error encountered', 'url': page.url})

        browser.close()
        return tabs_info
def get_active_tab_info(env, config: Dict[str, str]):
    """Return title, URL and HTML of the first tab whose ``body`` is visible.

    Connects to Chrome over the DevTools protocol at ``env.vm_ip`` on port
    9222. Visibility of the page body is used as the indicator for the
    active tab.

    Args:
        env: Environment object providing ``vm_ip``.
        config: Getter configuration; not used by this getter.

    Returns:
        A dict with ``'title'``, ``'url'`` and ``'content'`` for the first
        matching page, or an empty dict when no page matches.
    """
    # fixme: this port is hard-coded, need to be changed from config file
    endpoint = "http://{}:{}".format(env.vm_ip, 9222)

    with sync_playwright() as playwright:
        # Attach to the Chrome instance running on the VM.
        browser = playwright.chromium.connect_over_cdp(endpoint)

        found = {}
        # Walk the pages lazily, context by context, stopping at the first hit.
        candidates = (tab for ctx in browser.contexts for tab in ctx.pages)
        for tab in candidates:
            if not tab.is_visible("body"):
                continue
            found = {
                'title': tab.title(),
                'url': tab.url,
                'content': tab.content()  # full HTML of the page
            }
            break

        browser.close()
        return found
def get_pdf_from_url(env, config: Dict[str, str]) -> str:
    """Open a URL in the VM's Chrome and save the page as a PDF.

    Args:
        env: Environment object providing ``cache_dir`` and ``vm_ip``.
        config: ``"path"`` is the URL to open; ``"dest"`` is the file name of
            the PDF inside ``env.cache_dir``.

    Returns:
        The local path of the written PDF.
    """
    target_url = config["path"]
    pdf_path = os.path.join(env.cache_dir, config["dest"])

    # fixme: this port is hard-coded, need to be changed from config file
    endpoint = "http://{}:{}".format(env.vm_ip, 9222)

    with sync_playwright() as playwright:
        browser = playwright.chromium.connect_over_cdp(endpoint)
        tab = browser.new_page()
        tab.goto(target_url)
        tab.pdf(path=pdf_path)
        browser.close()

    return pdf_path
# fixme: needs to be changed (maybe through post-processing) since it's not working
def get_chrome_saved_address(env, config: Dict[str, str]):
    """Return the HTML of Chrome's saved-addresses settings page on the VM.

    Connects to Chrome over the DevTools protocol at ``env.vm_ip`` on port
    9222, opens ``chrome://settings/addresses`` in a new page and reads its
    content.

    Args:
        env: Environment object providing ``vm_ip``.
        config: Getter configuration; not used by this getter.

    Returns:
        The HTML content of the settings page as returned by Playwright.
    """
    # fixme: this port is hard-coded, need to be changed from config file
    endpoint = "http://{}:{}".format(env.vm_ip, 9222)

    with sync_playwright() as playwright:
        # Attach to the Chrome instance running on the VM.
        browser = playwright.chromium.connect_over_cdp(endpoint)
        settings_tab = browser.new_page()

        # Chrome's autofill settings page for addresses.
        settings_tab.goto("chrome://settings/addresses")
        html = settings_tab.content()

        browser.close()

    return html
def get_shortcuts_on_desktop(env, config: Dict[str, str]):
    """Return the contents of all shortcut files on the VM's desktop.

    Shortcut files are recognised by extension: ``.lnk`` on Windows,
    ``.webloc`` on macOS and ``.desktop`` on Linux.

    Args:
        env: Environment object providing ``vm_platform`` and a ``controller``
            with ``get_vm_desktop_path``, ``get_vm_directory_tree``,
            ``execute_python_command`` and ``get_file``.
        config: Getter configuration; not used by this getter.

    Returns:
        A dict mapping each shortcut file name to its UTF-8 decoded content,
        or ``[]`` when the platform is not supported.
    """
    # Find out the operating system
    os_name = env.vm_platform

    # Depending on the OS, define the shortcut file extension
    if os_name == 'Windows':
        # Windows shortcuts are typically .url or .lnk files
        shortcut_extension = '.lnk'
    elif os_name == 'Darwin':
        # macOS's shortcuts are .webloc files
        shortcut_extension = '.webloc'
    elif os_name == 'Linux':
        # Linux (Ubuntu, etc.) shortcuts are typically .desktop files
        shortcut_extension = '.desktop'
    else:
        logger.error(f"Unsupported operating system: {os_name}")
        return []

    # Get the path to the desktop folder
    desktop_path = env.controller.get_vm_desktop_path()
    desktop_directory_tree = env.controller.get_vm_directory_tree(desktop_path)

    shortcuts_paths = [file['name'] for file in desktop_directory_tree['children'] if
                       file['name'].endswith(shortcut_extension)]

    short_cuts = {}
    for shortcut_path in shortcuts_paths:
        # Embed the file name as a Python literal via repr() so names that
        # contain quotes or backslashes cannot break the remote command.
        remote_path = env.controller.execute_python_command(
            f"import os; print(os.path.join(os.path.expanduser('~'), 'Desktop', {shortcut_path!r}))")[
            'output'].strip()
        short_cuts[shortcut_path] = env.controller.get_file(remote_path).decode('utf-8')

    return short_cuts
def get_number_of_search_results(env, config: Dict[str, str]):
    """Count the elements matching the result selector on a search page.

    Connects to Chrome over the DevTools protocol at ``env.vm_ip`` on port
    9222, opens the (currently hard-coded) search URL in a new page and
    counts the elements matching the (currently hard-coded) CSS selector.

    Args:
        env: Environment object providing ``vm_ip``.
        config: Getter configuration; not used by this getter.

    Returns:
        The number of matching elements.
    """
    # todo: move into the config file
    search_url = "https://google.com/search?q=query"
    css_selector = '.search-result'

    # fixme: this port is hard-coded, need to be changed from config file
    endpoint = "http://{}:{}".format(env.vm_ip, 9222)

    with sync_playwright() as playwright:
        browser = playwright.chromium.connect_over_cdp(endpoint)
        tab = browser.new_page()
        tab.goto(search_url)
        match_count = len(tab.query_selector_all(css_selector))
        browser.close()

    return match_count
def get_googledrive_file(env, config: Dict[str, Any]) -> str:
    """Download one or more files from Google Drive into ``env.cache_dir``.

    The file(s) to fetch are selected by exactly one of these config keys,
    checked in this order:

    1. ``query``: a list of Google Drive search queries. They are applied
       level by level, each one searching inside the item found by the
       previous one, starting at the Drive root.
    2. ``path``: the path to the file as a list,
       ``'folder/subfolder/filename' -> ['folder', 'subfolder', 'filename']``.
    3. ``query_list``: a list of ``query`` values, to download several files.
    4. ``path_list``: a list of ``path`` values, to download several files.

    ``dest`` is the target file name; with ``*_list`` it must be a list of
    the same length. ``settings_file`` optionally overrides the PyDrive
    settings file.

    Args:
        env: Environment object providing ``cache_dir``.
        config: See above.

    Returns:
        The local file path for ``query``/``path``, or a list of local file
        paths for ``query_list``/``path_list``. An entry is ``None`` when the
        file was not found or the download failed.

    Raises:
        AssertionError: If a ``*_list`` and ``dest`` differ in length.
        KeyError: If none of the four selector keys is present.
    """
    settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.json')
    auth = GoogleAuth(settings_file=settings_file)
    drive = GoogleDrive(auth)

    def path_to_queries(parts):
        # Every component except the last one must be a folder.
        last = len(parts) - 1
        return [
            f"title = '{fp}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false"
            if idx < last else f"title = '{fp}' and trashed = false"
            for idx, fp in enumerate(parts)
        ]

    def get_single_file(_query, _path):
        parent_id = 'root'
        try:
            for q in _query:
                search = f'( {q} ) and "{parent_id}" in parents'
                filelist: GoogleDriveFileList = drive.ListFile({'q': search}).GetList()
                if len(filelist) == 0:  # target file not found
                    return None
                file: GoogleDriveFile = filelist[0]  # HACK: if multiple candidates, just use the first one
                parent_id = file['id']

            # ``file`` is the item matched by the last query. With an empty
            # query list it is unbound and the error is handled below.
            file.GetContentFile(_path, mimetype=file['mimeType'])
        except Exception as e:
            # Pass the exception through a placeholder; an extra argument
            # without one makes the logging call itself fail to format.
            logger.error('[ERROR]: Failed to download the file from Google Drive: %s', e)
            return None
        return _path

    if 'query' in config:
        return get_single_file(config['query'], os.path.join(env.cache_dir, config['dest']))
    elif 'path' in config:
        return get_single_file(path_to_queries(config['path']), os.path.join(env.cache_dir, config['dest']))
    elif 'query_list' in config:
        assert len(config['query_list']) == len(config['dest'])
        return [
            get_single_file(query, os.path.join(env.cache_dir, dest))
            for query, dest in zip(config['query_list'], config['dest'])
        ]
    else:  # path_list in config
        assert len(config['path_list']) == len(config['dest'])
        return [
            get_single_file(path_to_queries(path), os.path.join(env.cache_dir, dest))
            for path, dest in zip(config['path_list'], config['dest'])
        ]