519 lines
22 KiB
Python
519 lines
22 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
from typing import Dict, Any
|
|
from pydrive.auth import GoogleAuth
|
|
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
logger = logging.getLogger("desktopenv.getters.chrome")
|
|
|
|
"""
|
|
WARNING:
|
|
1. Functions from this script assume that no account is registered on Chrome, otherwise the default file path needs to be changed.
|
|
2. The functions are not tested on Windows and Mac, but they should work.
|
|
"""
|
|
|
|
|
|
# The following ones just need to load info from the files of software, no need to connect to the software
|
|
def get_default_search_engine(env, config: Dict[str, str]):
    """Return the short name of Chrome's default search engine on the VM.

    The location of the default profile's ``Preferences`` file is resolved by
    running a small script on the VM, the file is fetched through
    ``env.controller.get_file`` and parsed as JSON.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        The ``short_name`` found under
        ``default_search_provider_data.template_url_data``, or ``"Google"``
        when that key is absent or the file cannot be fetched or parsed.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Path components are handed to os.path.join one by one: a literal
        # backslash path would place "\U" (from "\User Data") inside a string
        # of the remote script, which Python rejects as a truncated unicode
        # escape.
        path_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                        "'Google', 'Chrome', 'User Data', 'Default', 'Preferences'))")
    elif os_type == 'Darwin':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))"
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))"
    else:
        raise Exception('Unsupported operating system')

    preference_file_path = env.controller.execute_python_command(path_command)['output'].strip()

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)

        # The path within the JSON data to the default search engine might vary
        template_url_data = data.get('default_search_provider_data', {}).get('template_url_data', {})
        return template_url_data.get('short_name', 'Google')
    except Exception as e:
        # Best effort: any failure falls back to Chrome's stock engine name.
        logger.error(f"Error: {e}")
        return "Google"
|
|
|
|
|
|
def get_cookie_data(env, config: Dict[str, str]):
    """Return every row of the ``cookies`` table of Chrome's cookie database.

    The database file of the default profile is fetched from the VM, written
    to ``env.cache_dir`` under the name ``config["dest"]`` and queried locally
    with sqlite3. Assumes the cookies are stored in the default location, not
    encrypted and not large in size.

    Args:
        env: environment object exposing ``vm_platform``, ``controller`` and
            ``cache_dir``.
        config: must contain ``dest``, the file name of the local copy.

    Returns:
        The list of row tuples from ``SELECT * FROM cookies``, or ``None`` if
        fetching, writing or querying the database fails.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Components are joined remotely one by one; a literal backslash path
        # would contain "\U" (from "\User Data"), an invalid escape in the
        # remote script.
        path_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                        "'Google', 'Chrome', 'User Data', 'Default', 'Cookies'))")
    elif os_type == 'Darwin':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Cookies'))"
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))"
    else:
        raise Exception('Unsupported operating system')

    chrome_cookie_file_path = env.controller.execute_python_command(path_command)['output'].strip()

    try:
        content = env.controller.get_file(chrome_cookie_file_path)
        _path = os.path.join(env.cache_dir, config["dest"])

        with open(_path, "wb") as f:
            f.write(content)

        conn = sqlite3.connect(_path)
        try:
            cursor = conn.cursor()
            # Read the whole cookie table; filtering is left to the caller.
            cursor.execute("SELECT * FROM cookies")
            return cursor.fetchall()
        finally:
            # Always release the database handle, also when the query fails.
            conn.close()
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
|
|
|
|
|
|
def get_history(env, config: Dict[str, str]):
    """Return the browsing history stored in Chrome's ``History`` database.

    The database file of the default profile is fetched from the VM, written
    to ``env.cache_dir`` under the name ``config["dest"]`` and queried locally
    with sqlite3.

    Args:
        env: environment object exposing ``vm_platform``, ``controller`` and
            ``cache_dir``.
        config: must contain ``dest``, the file name of the local copy.

    Returns:
        A list of ``(url, title, last_visit_time)`` tuples from the ``urls``
        table, or ``None`` if fetching, writing or querying fails.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        path_command = """import os; print(os.path.join(os.getenv('USERPROFILE'), "AppData", "Local", "Google", "Chrome", "User Data", "Default", "History"))"""
    elif os_type == 'Darwin':
        path_command = """import os; print(os.path.join(os.getenv('HOME'), "Library", "Application Support", "Google", "Chrome", "Default", "History"))"""
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))"
    else:
        raise Exception('Unsupported operating system')

    chrome_history_path = env.controller.execute_python_command(path_command)['output'].strip()

    try:
        content = env.controller.get_file(chrome_history_path)
        _path = os.path.join(env.cache_dir, config["dest"])

        with open(_path, "wb") as f:
            f.write(content)

        conn = sqlite3.connect(_path)
        try:
            cursor = conn.cursor()
            # One row per known URL.
            cursor.execute("SELECT url, title, last_visit_time FROM urls")
            return cursor.fetchall()
        finally:
            # Always release the database handle, also when the query fails.
            conn.close()
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
|
|
|
|
|
|
def get_enabled_experiments(env, config: Dict[str, str]):
    """Return the experiments (flags) enabled in Chrome on the VM.

    The ``Local State`` file of the Chrome user-data directory is located by
    running a small script on the VM, fetched through
    ``env.controller.get_file`` and parsed as JSON.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        The value stored under ``browser.enabled_labs_experiments``, or an
        empty list when the key is absent or the file cannot be fetched or
        parsed.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Components are joined remotely one by one; a literal backslash path
        # would contain "\U" (from "\User Data"), an invalid escape in the
        # remote script.
        path_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                        "'Google', 'Chrome', 'User Data', 'Local State'))")
    elif os_type == 'Darwin':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))"
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))"
    else:
        raise Exception('Unsupported operating system')

    preference_file_path = env.controller.execute_python_command(path_command)['output'].strip()

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)

        # The path within the JSON data to the experiments list might vary
        return data.get('browser', {}).get('enabled_labs_experiments', [])
    except Exception as e:
        # Best effort: an unreadable state file is reported as "no experiments".
        logger.error(f"Error: {e}")
        return []
|
|
|
|
|
|
def get_profile_name(env, config: Dict[str, str]):
    """Return the name of Chrome's default profile on the VM.

    The default profile's ``Preferences`` file is located by running a small
    script on the VM, fetched through ``env.controller.get_file`` and parsed
    as JSON. Assumes the profile is stored in the default location.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        The value stored under ``profile.name``, or ``None`` when the key is
        absent or the file cannot be fetched or parsed.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Components are joined remotely one by one; a literal backslash path
        # would contain "\U" (from "\User Data"), an invalid escape in the
        # remote script.
        path_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                        "'Google', 'Chrome', 'User Data', 'Default', 'Preferences'))")
    elif os_type == 'Darwin':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))"
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))"
    else:
        raise Exception('Unsupported operating system')

    preference_file_path = env.controller.execute_python_command(path_command)['output'].strip()

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)

        # The path within the JSON data to the profile name might vary
        return data.get('profile', {}).get('name', None)
    except Exception as e:
        logger.error(f"Error: {e}")
        return None
|
|
|
|
|
|
def get_chrome_language(env, config: Dict[str, str]):
    """Return the UI locale configured for Chrome on the VM.

    The ``Local State`` file of the Chrome user-data directory is located by
    running a small script on the VM, fetched through
    ``env.controller.get_file`` and parsed as JSON.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        The value stored under ``intl.app_locale``, or ``"en-US"`` when the
        key is absent or the file cannot be fetched or parsed.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Components are joined remotely one by one; a literal backslash path
        # would contain "\U" (from "\User Data"), an invalid escape in the
        # remote script.
        path_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                        "'Google', 'Chrome', 'User Data', 'Local State'))")
    elif os_type == 'Darwin':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))"
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))"
    else:
        raise Exception('Unsupported operating system')

    preference_file_path = env.controller.execute_python_command(path_command)['output'].strip()

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)

        # The path within the JSON data to the locale might vary
        app_locale = data.get('intl', {}).get('app_locale', "en-US")
        return app_locale
    except Exception as e:
        logger.error(f"Error: {e}")
        return "en-US"
|
|
|
|
|
|
def get_chrome_font_size(env, config: Dict[str, str]):
    """Return the font-size preferences of Chrome's default profile on the VM.

    The default profile's ``Preferences`` file is located by running a small
    script on the VM, fetched through ``env.controller.get_file`` and parsed
    as JSON.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        The dict stored under ``webkit.webprefs``. When that key is absent a
        dict with ``default_fixed_font_size`` 13, ``default_font_size`` 16 and
        ``minimum_font_size`` 13 is returned; when the file cannot be fetched
        or parsed, a dict with only the first two of these entries.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Components are joined remotely one by one; a literal backslash path
        # would contain "\U" (from "\User Data"), an invalid escape in the
        # remote script.
        path_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                        "'Google', 'Chrome', 'User Data', 'Default', 'Preferences'))")
    elif os_type == 'Darwin':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))"
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))"
    else:
        raise Exception('Unsupported operating system')

    preference_file_path = env.controller.execute_python_command(path_command)['output'].strip()

    try:
        content = env.controller.get_file(preference_file_path)
        data = json.loads(content)

        # The path within the JSON data to the font settings might vary
        font_prefs = data.get('webkit', {}).get('webprefs', {
            "default_fixed_font_size": 13,
            "default_font_size": 16,
            "minimum_font_size": 13
        })
        return font_prefs
    except Exception as e:
        logger.error(f"Error: {e}")
        # NOTE(review): unlike the default above, this fallback carries no
        # "minimum_font_size" entry; kept as-is for callers -- confirm intent.
        return {
            "default_fixed_font_size": 13,
            "default_font_size": 16
        }
|
|
|
|
|
|
def get_bookmarks(env, config: Dict[str, str]):
    """Return the bookmark roots of Chrome's default profile on the VM.

    The default profile's ``Bookmarks`` file is located by running a small
    script on the VM, fetched through ``env.controller.get_file`` and parsed
    as JSON. Parsing errors are not caught here and propagate to the caller.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        The value stored under ``roots`` (an empty dict if the key is
        absent), or an empty list when the fetched content is empty.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    if os_type == 'Windows':
        # Components are joined remotely one by one; a literal backslash path
        # would contain "\U" (from "\User Data"), an invalid escape in the
        # remote script.
        path_command = ("import os; print(os.path.join(os.getenv('LOCALAPPDATA'), "
                        "'Google', 'Chrome', 'User Data', 'Default', 'Bookmarks'))")
    elif os_type == 'Darwin':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Bookmarks'))"
    elif os_type == 'Linux':
        path_command = "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))"
    else:
        raise Exception('Unsupported operating system')

    preference_file_path = env.controller.execute_python_command(path_command)['output'].strip()

    content = env.controller.get_file(preference_file_path)
    if not content:
        # Nothing could be fetched: report "no bookmarks".
        return []
    data = json.loads(content)
    bookmarks = data.get('roots', {})
    return bookmarks
|
|
|
|
|
|
# todo: move this to the main.py
|
|
def get_extensions_installed_from_shop(env, config: Dict[str, str]):
    """Collect the parsed ``manifest.json`` of every installed Chrome extension.

    The ``Extensions`` directory of the default profile is resolved by running
    a small script on the VM. Each entry of that directory is expected to hold
    version-named subdirectories, each containing a ``manifest.json``.

    NOTE(review): the directory is walked with ``os.listdir``/``open`` in the
    calling process, so the resolved path must be reachable from here --
    confirm this getter runs where the Chrome profile lives.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        A list with one parsed manifest (dict) per readable ``manifest.json``;
        manifests that are not valid JSON are logged and skipped.

    Raises:
        Exception: if ``env.vm_platform`` is not Windows, Darwin or Linux.
    """
    os_type = env.vm_platform
    # The remote script has to import os and print the path: only printed
    # output is read back from the 'output' field below. Components are joined
    # one by one so that no backslash escapes end up inside the remote string
    # literals.
    if os_type == 'Windows':
        path_command = ("import os; print(os.path.join(os.path.expanduser('~'), 'AppData', 'Local', "
                        "'Google', 'Chrome', 'User Data', 'Default', 'Extensions'))")
    elif os_type == 'Darwin':  # macOS
        path_command = ("import os; print(os.path.join(os.path.expanduser('~'), 'Library', "
                        "'Application Support', 'Google', 'Chrome', 'Default', 'Extensions'))")
    elif os_type == 'Linux':
        path_command = ("import os; print(os.path.join(os.path.expanduser('~'), '.config', "
                        "'google-chrome', 'Default', 'Extensions'))")
    else:
        raise Exception('Unsupported operating system')

    chrome_extension_dir = env.controller.execute_python_command(path_command)['output'].strip()

    manifests = []
    for extension_id in os.listdir(chrome_extension_dir):
        extension_path = os.path.join(chrome_extension_dir, extension_id)
        if not os.path.isdir(extension_path):
            continue
        # Iterate through version-named subdirectories
        for version_dir in os.listdir(extension_path):
            manifest_path = os.path.join(extension_path, version_dir, 'manifest.json')
            if not os.path.isfile(manifest_path):
                continue
            # JSON text is read as UTF-8 rather than the locale's encoding.
            with open(manifest_path, 'r', encoding='utf-8') as file:
                try:
                    manifests.append(json.load(file))
                except json.JSONDecodeError:
                    logger.error(f"Error reading {manifest_path}")
    return manifests
|
|
|
|
|
|
# The following ones require Playwright to be installed on the target machine, and Chrome needs to be
# pre-configured with a remote debugging port; see README.md for details.
|
|
|
|
def get_open_tabs_info(env, config: Dict[str, str]):
    """Return title and URL of every tab open in the VM's Chrome instance.

    Connects over CDP to ``http://<env.vm_ip>:9222`` and visits every page of
    every browser context.

    Args:
        env: environment object exposing ``vm_ip``.
        config: getter configuration; not used by this getter.

    Returns:
        A list of ``{'title': ..., 'url': ...}`` dicts, one per page. A page
        whose load wait times out is reported with the title
        ``'Load timeout'``; any other failure while reading a page yields the
        title ``'Error encountered'``.
    """
    # Playwright raises its own TimeoutError, which is not the builtin one, so
    # it has to be named explicitly for the timeout branch to be reachable.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    host = env.vm_ip
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file

    remote_debugging_url = f"http://{host}:{port}"
    with sync_playwright() as p:
        # connect to remote Chrome instance
        browser = p.chromium.connect_over_cdp(remote_debugging_url)

        tabs_info = []
        for context in browser.contexts:
            for page in context.pages:
                try:
                    # Wait for the page to finish loading, this prevents the "execution context was destroyed" issue
                    page.wait_for_load_state('load')  # Wait for the 'load' event to complete
                    title = page.title()
                    url = page.url
                    tabs_info.append({'title': title, 'url': url})
                except (PlaywrightTimeoutError, TimeoutError):
                    # If page loading times out, catch the exception and store the current information in the list
                    tabs_info.append({'title': 'Load timeout', 'url': page.url})
                except Exception as e:
                    # Catch other potential exceptions that might occur while reading the page title
                    logger.error(f'Error: {e}')
                    tabs_info.append({'title': 'Error encountered', 'url': page.url})

        browser.close()
        return tabs_info
|
|
|
|
|
|
def get_active_tab_info(env, config: Dict[str, str]):
    """Describe the first tab of the VM's Chrome whose ``body`` is visible.

    Connects over CDP to ``http://<env.vm_ip>:9222`` and scans the pages of
    every browser context in order.

    Args:
        env: environment object exposing ``vm_ip``.
        config: getter configuration; not used by this getter.

    Returns:
        A dict with ``title``, ``url`` and ``content`` (the page's HTML) for
        the first matching page, or an empty dict if no page matches.
    """
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    cdp_endpoint = f"http://{env.vm_ip}:{port}"

    with sync_playwright() as playwright:
        # attach to the Chrome instance that is already running
        chrome = playwright.chromium.connect_over_cdp(cdp_endpoint)

        found = {}
        every_tab = (tab for ctx in chrome.contexts for tab in ctx.pages)
        for tab in every_tab:
            # visibility of the page body is the signal for "active tab"
            if not tab.is_visible("body"):
                continue
            found = {
                'title': tab.title(),
                'url': tab.url,
                'content': tab.content(),  # HTML of the page
            }
            break

        chrome.close()
        return found
|
|
|
|
|
|
def get_pdf_from_url(env, config: Dict[str, str]) -> str:
    """Save the page at ``config["path"]`` as a PDF and return the local path.

    A new page is opened in the VM's Chrome over CDP
    (``http://<env.vm_ip>:9222``), navigated to the URL and written as a PDF
    to ``config["dest"]`` inside ``env.cache_dir``.

    Args:
        env: environment object exposing ``vm_ip`` and ``cache_dir``.
        config: must contain ``path`` (the URL) and ``dest`` (file name).

    Returns:
        The path of the written PDF file.
    """
    source_url = config["path"]
    target_file = os.path.join(env.cache_dir, config["dest"])

    port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    cdp_endpoint = f"http://{env.vm_ip}:{port}"

    with sync_playwright() as playwright:
        chrome = playwright.chromium.connect_over_cdp(cdp_endpoint)
        tab = chrome.new_page()
        tab.goto(source_url)
        tab.pdf(path=target_file)
        chrome.close()

    return target_file
|
|
|
|
|
|
# fixme: needs to be changed (maybe through post-processing) since it's not working
|
|
def get_chrome_saved_address(env, config: Dict[str, str]):
    """Return the HTML of Chrome's address-autofill settings page on the VM.

    A new page is opened in the VM's Chrome over CDP
    (``http://<env.vm_ip>:9222``) and navigated to
    ``chrome://settings/addresses``.

    Args:
        env: environment object exposing ``vm_ip``.
        config: getter configuration; not used by this getter.

    Returns:
        The HTML content of the settings page.
    """
    port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    cdp_endpoint = f"http://{env.vm_ip}:{port}"

    with sync_playwright() as playwright:
        # attach to the Chrome instance that is already running
        chrome = playwright.chromium.connect_over_cdp(cdp_endpoint)
        settings_tab = chrome.new_page()

        # Chrome's settings page listing the saved autofill addresses
        settings_tab.goto("chrome://settings/addresses")
        html = settings_tab.content()

        chrome.close()

    return html
|
|
|
|
|
|
def get_shortcuts_on_desktop(env, config: Dict[str, str]):
    """Return the content of every shortcut file on the VM's desktop.

    Shortcut files are recognised by extension: ``.lnk`` on Windows,
    ``.webloc`` on macOS and ``.desktop`` on Linux. Each file is fetched from
    the VM and decoded as UTF-8.

    Args:
        env: environment object exposing ``vm_platform`` and ``controller``.
        config: getter configuration; not used by this getter.

    Returns:
        A dict mapping shortcut file name to its decoded content, or an empty
        list when ``env.vm_platform`` is not supported.
    """
    # Find out the operating system
    os_name = env.vm_platform

    # Depending on the OS, define the shortcut file extension
    if os_name == 'Windows':
        # Windows shortcuts are typically .url or .lnk files; only .lnk is matched here
        # NOTE(review): .lnk files are presumably binary, so the UTF-8 decode
        # below may fail for them -- confirm against real data.
        shortcut_extension = '.lnk'
    elif os_name == 'Darwin':
        # macOS's shortcuts are .webloc files
        shortcut_extension = '.webloc'
    elif os_name == 'Linux':
        # Linux (Ubuntu, etc.) shortcuts are typically .desktop files
        shortcut_extension = '.desktop'
    else:
        logger.error(f"Unsupported operating system: {os_name}")
        return []

    # Get the path to the desktop folder
    desktop_path = env.controller.get_vm_desktop_path()
    desktop_directory_tree = env.controller.get_vm_directory_tree(desktop_path)

    shortcuts_paths = [file['name'] for file in desktop_directory_tree['children'] if
                       file['name'].endswith(shortcut_extension)]

    short_cuts = {}

    for shortcut_path in shortcuts_paths:
        # The file name is embedded with !r so that it always forms a valid
        # Python string literal in the remote script, even when it contains
        # quotes or backslashes.
        remote_path = env.controller.execute_python_command(
            f"import os; print(os.path.join(os.path.expanduser('~'), 'Desktop', {shortcut_path!r}))")[
            'output'].strip()
        short_cuts[shortcut_path] = env.controller.get_file(remote_path).decode('utf-8')

    return short_cuts
|
|
|
|
|
|
def get_number_of_search_results(env, config: Dict[str, str]):
    """Count the search-result elements on a fixed Google search page.

    A new page is opened in the VM's Chrome over CDP
    (``http://<env.vm_ip>:9222``), navigated to the hard-coded search URL, and
    the elements matching the hard-coded selector are counted.

    Args:
        env: environment object exposing ``vm_ip``.
        config: getter configuration; not used by this getter.

    Returns:
        The number of elements matching the result selector.
    """
    # todo: move into the config file
    search_url = "https://google.com/search?q=query"
    hit_selector = '.search-result'

    port = 9222  # fixme: this port is hard-coded, need to be changed from config file
    cdp_endpoint = f"http://{env.vm_ip}:{port}"

    with sync_playwright() as playwright:
        chrome = playwright.chromium.connect_over_cdp(cdp_endpoint)
        tab = chrome.new_page()
        tab.goto(search_url)
        hit_count = len(tab.query_selector_all(hit_selector))
        chrome.close()

    return hit_count
|
|
|
|
|
|
def get_googledrive_file(env, config: Dict[str, Any]) -> str:
    """Download the requested file(s) from Google Drive into ``env.cache_dir``.

    The file to retrieve is selected by one of these config keys, checked in
    this order:
        1. query: a list of Google Drive search queries, applied level by
           level starting at the Drive root; the match of each query becomes
           the parent for the next one and the last match is downloaded.
        2. path: a list of path components,
           'folder/subfolder/filename' -> ['folder', 'subfolder', 'filename'].
        3. query_list: a list of ``query`` values to download multiple files.
        4. path_list: a list of ``path`` values to download multiple files
           (assumed when none of the other keys is present).
    dest: target file name, or a list of names of the same length when a
        ``*_list`` key is used.
    settings_file: optional path of the PyDrive settings file.

    Returns:
        The local path of the downloaded file, or a list of local paths for
        the ``*_list`` variants. A file that cannot be found or downloaded is
        reported as ``None``.
    """
    settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.json')
    auth = GoogleAuth(settings_file=settings_file)
    drive = GoogleDrive(auth)

    def path_to_queries(parts):
        """Translate path components into one Drive query per path level."""
        queries = []
        last_idx = len(parts) - 1
        for idx, fp in enumerate(parts):
            # String values in a Drive query must be single-quoted, with
            # backslashes and single quotes inside the value escaped.
            title = str(fp).replace("\\", "\\\\").replace("'", "\\'")
            if idx < last_idx:
                # every component but the last has to be a folder
                queries.append(
                    f"title = '{title}' and mimeType = 'application/vnd.google-apps.folder' and trashed = false")
            else:
                queries.append(f"title = '{title}' and trashed = false")
        return queries

    def get_single_file(_query, _path):
        """Walk the queries from the Drive root and download the final match to _path."""
        parent_id = 'root'
        try:
            for q in _query:
                search = f'( {q} ) and "{parent_id}" in parents'
                filelist: GoogleDriveFileList = drive.ListFile({'q': search}).GetList()
                if len(filelist) == 0:  # target file not found
                    return None
                file: GoogleDriveFile = filelist[0]  # HACK: if multiple candidates, just use the first one
                parent_id = file['id']

            file.GetContentFile(_path, mimetype=file['mimeType'])
        except Exception as e:
            # Best effort: a failed lookup or download is reported as None.
            logger.error(f'[ERROR]: Failed to download the file from Google Drive: {e}')
            return None
        return _path

    if 'query' in config:
        return get_single_file(config['query'], os.path.join(env.cache_dir, config['dest']))
    elif 'path' in config:
        return get_single_file(path_to_queries(config['path']), os.path.join(env.cache_dir, config['dest']))
    elif 'query_list' in config:
        _path_list = []
        assert len(config['query_list']) == len(config['dest'])
        for idx, query in enumerate(config['query_list']):
            dest = config['dest'][idx]
            _path_list.append(get_single_file(query, os.path.join(env.cache_dir, dest)))
        return _path_list
    else:  # path_list in config
        _path_list = []
        assert len(config['path_list']) == len(config['dest'])
        for idx, path in enumerate(config['path_list']):
            dest = config['dest'][idx]
            _path_list.append(get_single_file(path_to_queries(path), os.path.join(env.cache_dir, dest)))
        return _path_list