|
|
|
|
@@ -2,27 +2,29 @@ import json
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import platform
|
|
|
|
|
import time
|
|
|
|
|
import sqlite3
|
|
|
|
|
import time
|
|
|
|
|
from typing import Dict, Any, List
|
|
|
|
|
from pydrive.auth import GoogleAuth
|
|
|
|
|
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
|
|
|
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
|
|
|
|
|
|
import lxml.etree
|
|
|
|
|
import requests
|
|
|
|
|
from lxml.cssselect import CSSSelector
|
|
|
|
|
from lxml.etree import _Element
|
|
|
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
|
from pydrive.auth import GoogleAuth
|
|
|
|
|
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
|
|
|
|
|
|
|
|
|
|
_accessibility_ns_map = {"st": "uri:deskat:state.at-spi.gnome.org"
|
|
|
|
|
, "attr": "uri:deskat:attributes.at-spi.gnome.org"
|
|
|
|
|
, "cp": "uri:deskat:component.at-spi.gnome.org"
|
|
|
|
|
, "doc": "uri:deskat:document.at-spi.gnome.org"
|
|
|
|
|
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org"
|
|
|
|
|
, "txt": "uri:deskat:text.at-spi.gnome.org"
|
|
|
|
|
, "val": "uri:deskat:value.at-spi.gnome.org"
|
|
|
|
|
, "act": "uri:deskat:action.at-spi.gnome.org"
|
|
|
|
|
}
|
|
|
|
|
_accessibility_ns_map = {
|
|
|
|
|
"st": "uri:deskat:state.at-spi.gnome.org",
|
|
|
|
|
"attr": "uri:deskat:attributes.at-spi.gnome.org",
|
|
|
|
|
"cp": "uri:deskat:component.at-spi.gnome.org",
|
|
|
|
|
"doc": "uri:deskat:document.at-spi.gnome.org",
|
|
|
|
|
"docattr": "uri:deskat:attributes.document.at-spi.gnome.org",
|
|
|
|
|
"txt": "uri:deskat:text.at-spi.gnome.org",
|
|
|
|
|
"val": "uri:deskat:value.at-spi.gnome.org",
|
|
|
|
|
"act": "uri:deskat:action.at-spi.gnome.org"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger("desktopenv.getters.chrome")
|
|
|
|
|
|
|
|
|
|
@@ -44,9 +46,12 @@ def get_default_search_engine(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -77,9 +82,12 @@ def get_cookie_data(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Cookies'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
chrome_cookie_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
chrome_cookie_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -113,9 +121,12 @@ def get_history(env, config: Dict[str, str]):
|
|
|
|
|
"""import os; print(os.path.join(os.getenv('HOME'), "Library", "Application Support", "Google", "Chrome", "Default", "History"))""")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
chrome_history_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
chrome_history_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -149,9 +160,12 @@ def get_enabled_experiments(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -181,9 +195,12 @@ def get_profile_name(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -210,9 +227,12 @@ def get_chrome_language(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Local State'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Local State'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -239,9 +259,12 @@ def get_chrome_font_size(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -274,9 +297,12 @@ def get_bookmarks(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Bookmarks'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -301,8 +327,11 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
|
|
|
|
|
"""os.path.expanduser('~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'""")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
chrome_extension_dir = env.controller.execute_python_command(
|
|
|
|
|
"""os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'""")['output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
else:
|
|
|
|
|
chrome_extension_dir = env.controller.execute_python_command(
|
|
|
|
|
"""os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'""")['output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -335,7 +364,28 @@ def get_page_info(env, config: Dict[str, str]):
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
# connect to remote Chrome instance
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If the connection fails, start a new browser instance
|
|
|
|
|
platform.machine()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
# start a new browser instance if the connection fails
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"chromium",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
else:
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"google-chrome",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
|
|
|
|
|
page = browser.contexts[0].new_page()
|
|
|
|
|
page.goto(url)
|
|
|
|
|
|
|
|
|
|
@@ -364,7 +414,30 @@ def get_open_tabs_info(env, config: Dict[str, str]):
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
# connect to remote Chrome instance
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If the connection fails, start a new browser instance
|
|
|
|
|
platform.machine()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
# start a new browser instance if the connection fails
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"chromium",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
else:
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"google-chrome",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
tabs_info = []
|
|
|
|
|
for context in browser.contexts:
|
|
|
|
|
@@ -406,7 +479,7 @@ def get_active_url_from_accessTree(env, config):
|
|
|
|
|
}
|
|
|
|
|
Return
|
|
|
|
|
url: str
|
|
|
|
|
"""
|
|
|
|
|
"""
|
|
|
|
|
accessibility_tree: str = env.controller.get_accessibility_tree()
|
|
|
|
|
# download accessibility tree to "/home/user/Desktop"
|
|
|
|
|
logger.debug("AT@eval: %s", accessibility_tree)
|
|
|
|
|
@@ -414,9 +487,11 @@ def get_active_url_from_accessTree(env, config):
|
|
|
|
|
at: _Element = lxml.etree.fromstring(accessibility_tree)
|
|
|
|
|
arch = platform.machine()
|
|
|
|
|
if "arm" in arch:
|
|
|
|
|
selector = CSSSelector(", application[name=Chromium] entry[name=Address\\ and\\ search\\ bar]", namespaces=_accessibility_ns_map)
|
|
|
|
|
selector = CSSSelector(", application[name=Chromium] entry[name=Address\\ and\\ search\\ bar]",
|
|
|
|
|
namespaces=_accessibility_ns_map)
|
|
|
|
|
else:
|
|
|
|
|
selector = CSSSelector(", application[name=Google\\ Chrome] entry[name=Address\\ and\\ search\\ bar]", namespaces=_accessibility_ns_map)
|
|
|
|
|
selector = CSSSelector("application[name=Google\\ Chrome] entry[name=Address\\ and\\ search\\ bar]",
|
|
|
|
|
namespaces=_accessibility_ns_map)
|
|
|
|
|
elements: List[_Element] = selector(at)
|
|
|
|
|
# if "xpath" in config:
|
|
|
|
|
# elements: List[_Element] = at.xpath(config["xpath"], namespaces=_accessibility_ns_map)
|
|
|
|
|
@@ -425,8 +500,9 @@ def get_active_url_from_accessTree(env, config):
|
|
|
|
|
# elements: List[_Element] = selector(at)
|
|
|
|
|
if len(elements) == 0:
|
|
|
|
|
print("no elements found")
|
|
|
|
|
return 0.
|
|
|
|
|
active_tab_url = config["goto_prefix"]+elements[0].text if "goto_prefix" in config.keys() else "https://" + elements[0].text
|
|
|
|
|
return None
|
|
|
|
|
active_tab_url = config["goto_prefix"] + elements[0].text if "goto_prefix" in config.keys() else "https://" + \
|
|
|
|
|
elements[0].text
|
|
|
|
|
print("active tab url now: {}".format(active_tab_url))
|
|
|
|
|
return active_tab_url
|
|
|
|
|
|
|
|
|
|
@@ -448,8 +524,12 @@ def get_active_tab_info(env, config: Dict[str, str]):
|
|
|
|
|
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
# connect to remote Chrome instance
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
# connect to remote Chrome instance, since it is supposed to be the active one, we won't start a new one if failed
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
active_tab_info = {}
|
|
|
|
|
# go to the target URL page
|
|
|
|
|
page = browser.new_page()
|
|
|
|
|
@@ -481,7 +561,28 @@ def get_pdf_from_url(env, config: Dict[str, str]) -> str:
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If the connection fails, start a new browser instance
|
|
|
|
|
platform.machine()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
# start a new browser instance if the connection fails
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"chromium",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
else:
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"google-chrome",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
|
|
|
|
|
page = browser.new_page()
|
|
|
|
|
page.goto(_url)
|
|
|
|
|
page.pdf(path=_path)
|
|
|
|
|
@@ -498,7 +599,27 @@ def get_chrome_saved_address(env, config: Dict[str, str]):
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
# connect to remote Chrome instance
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If the connection fails, start a new browser instance
|
|
|
|
|
platform.machine()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
# start a new browser instance if the connection fails
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"chromium",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
else:
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"google-chrome",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
|
|
|
|
|
page = browser.new_page()
|
|
|
|
|
|
|
|
|
|
@@ -556,7 +677,27 @@ def get_number_of_search_results(env, config: Dict[str, str]):
|
|
|
|
|
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If the connection fails, start a new browser instance
|
|
|
|
|
platform.machine()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
# start a new browser instance if the connection fails
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"chromium",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
else:
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"google-chrome",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
page = browser.new_page()
|
|
|
|
|
page.goto(url)
|
|
|
|
|
search_results = page.query_selector_all(result_selector)
|
|
|
|
|
@@ -634,12 +775,15 @@ def get_enable_do_not_track(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
# preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
# 'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -664,12 +808,15 @@ def get_enable_enhanced_safety_browsing(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
# preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
# 'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -694,12 +841,15 @@ def get_new_startup_page(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
# preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
# 'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -729,12 +879,15 @@ def get_find_unpacked_extension_path(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
# preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
# "import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
# 'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -766,12 +919,14 @@ def get_data_delete_automacally(env, config: Dict[str, str]):
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
elif os_type == 'Linux':
|
|
|
|
|
# preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
# "import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
# 'output'].strip()
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), 'snap/chromium/common/chromium/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
preference_file_path = env.controller.execute_python_command(
|
|
|
|
|
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
|
|
|
|
'output'].strip()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unsupported operating system')
|
|
|
|
|
|
|
|
|
|
@@ -779,7 +934,7 @@ def get_data_delete_automacally(env, config: Dict[str, str]):
|
|
|
|
|
content = env.controller.get_file(preference_file_path)
|
|
|
|
|
data = json.loads(content)
|
|
|
|
|
data_delete_state = data["profile"]["exit_type"]
|
|
|
|
|
return data_delete_state
|
|
|
|
|
return data_delete_state
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Error: {e}")
|
|
|
|
|
return "Google"
|
|
|
|
|
@@ -823,7 +978,27 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
# connect to remote Chrome instance
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If the connection fails, start a new browser instance
|
|
|
|
|
platform.machine()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
# start a new browser instance if the connection fails
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"chromium",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
else:
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"google-chrome",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
target_page = None
|
|
|
|
|
for context in browser.contexts:
|
|
|
|
|
for page in context.pages:
|
|
|
|
|
@@ -831,31 +1006,34 @@ def get_active_tab_html_parse(env, config: Dict[str, Any]):
|
|
|
|
|
if page.url == active_tab_url:
|
|
|
|
|
target_page = page
|
|
|
|
|
break
|
|
|
|
|
if target_page is None:
|
|
|
|
|
return {}
|
|
|
|
|
return_json = {}
|
|
|
|
|
if config["category"] == "class":
|
|
|
|
|
#find the text of elements in html with specific class name
|
|
|
|
|
# find the text of elements in html with specific class name
|
|
|
|
|
class_multiObject = config["class_multiObject"]
|
|
|
|
|
for key in class_multiObject.keys():
|
|
|
|
|
object_dict = class_multiObject[key]
|
|
|
|
|
for order_key in object_dict.keys():
|
|
|
|
|
return_json[object_dict[order_key]] = target_page.query_selector_all("."+key)[int(order_key)].text_content().strip()
|
|
|
|
|
return_json[object_dict[order_key]] = target_page.query_selector_all("." + key)[
|
|
|
|
|
int(order_key)].text_content().strip()
|
|
|
|
|
class_singleObject = config["class_singleObject"]
|
|
|
|
|
for key in class_singleObject.keys():
|
|
|
|
|
return_json[class_singleObject[key]] = target_page.query_selector("."+key).text_content().strip()
|
|
|
|
|
return_json[class_singleObject[key]] = target_page.query_selector("." + key).text_content().strip()
|
|
|
|
|
elif config['category'] == "label":
|
|
|
|
|
#find the text of elements in html with specific label name
|
|
|
|
|
# find the text of elements in html with specific label name
|
|
|
|
|
labelObject = config["labelObject"]
|
|
|
|
|
for key in labelObject.keys():
|
|
|
|
|
return_json[labelObject[key]] = target_page.get_by_label(key).text_content().strip()
|
|
|
|
|
elif config["category"] == "xpath":
|
|
|
|
|
#find the text of elements in html with specific xpath
|
|
|
|
|
# find the text of elements in html with specific xpath
|
|
|
|
|
xpathObject = config["xpathObject"]
|
|
|
|
|
for key in xpathObject.keys():
|
|
|
|
|
return_json[xpathObject[key]] = target_page.locator("xpath="+key).text_content().strip()
|
|
|
|
|
return_json[xpathObject[key]] = target_page.locator("xpath=" + key).text_content().strip()
|
|
|
|
|
elif config["category"] == "input":
|
|
|
|
|
inputObject = config["inputObject"]
|
|
|
|
|
for key in inputObject.keys():
|
|
|
|
|
return_json[inputObject[key]] = target_page.locator("xpath="+key).input_value().strip()
|
|
|
|
|
return_json[inputObject[key]] = target_page.locator("xpath=" + key).input_value().strip()
|
|
|
|
|
browser.close()
|
|
|
|
|
return return_json
|
|
|
|
|
|
|
|
|
|
@@ -869,7 +1047,27 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
|
|
|
|
|
|
|
|
|
|
remote_debugging_url = f"http://{host}:{port}"
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
try:
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If the connection fails, start a new browser instance
|
|
|
|
|
platform.machine()
|
|
|
|
|
if "arm" in platform.machine():
|
|
|
|
|
# start a new browser instance if the connection fails
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"chromium",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
else:
|
|
|
|
|
payload = json.dumps({"command": [
|
|
|
|
|
"google-chrome",
|
|
|
|
|
"--remote-debugging-port=1337"
|
|
|
|
|
], "shell": False})
|
|
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
|
|
|
|
page = browser.new_page()
|
|
|
|
|
page.goto("https://www.recreation.gov/")
|
|
|
|
|
page.fill("input#hero-search-input", "Albion Basin")
|
|
|
|
|
@@ -888,22 +1086,22 @@ def get_gotoRecreationPage_and_get_html_content(env, config: Dict[str, Any]):
|
|
|
|
|
newpage.click("button.next-available")
|
|
|
|
|
print("after third click")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return_json = {}
|
|
|
|
|
return_json["expected"]={}
|
|
|
|
|
#find the text of elements in html with specific class name
|
|
|
|
|
return_json["expected"] = {}
|
|
|
|
|
# find the text of elements in html with specific class name
|
|
|
|
|
if config["selector"] == "class":
|
|
|
|
|
if "order" in config.keys():
|
|
|
|
|
className = config["class"]
|
|
|
|
|
return_json["expected"][className]=newpage.query_selector_all("."+className)[int(config["order"])].text_content().strip()
|
|
|
|
|
return_json["expected"][className] = newpage.query_selector_all("." + className)[
|
|
|
|
|
int(config["order"])].text_content().strip()
|
|
|
|
|
else:
|
|
|
|
|
className = config["class"]
|
|
|
|
|
return_json["expected"][className] = newpage.query_selector("."+className).text_content().strip()
|
|
|
|
|
return_json["expected"][className] = newpage.query_selector("." + className).text_content().strip()
|
|
|
|
|
browser.close()
|
|
|
|
|
return return_json
|
|
|
|
|
return return_json
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_active_tab_url_parse(env, config:Dict[str, Any]):
|
|
|
|
|
def get_active_tab_url_parse(env, config: Dict[str, Any]):
|
|
|
|
|
"""
|
|
|
|
|
This function is used to parse the url according to config["parse_keys"].
|
|
|
|
|
config:
|
|
|
|
|
@@ -914,6 +1112,8 @@ def get_active_tab_url_parse(env, config:Dict[str, Any]):
|
|
|
|
|
( { "original key": "new key" } )
|
|
|
|
|
"""
|
|
|
|
|
active_tab_url = get_active_url_from_accessTree(env, config)
|
|
|
|
|
if active_tab_url is None:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# connect to remote Chrome instance
|
|
|
|
|
# parse in a hard-coded way to find the specific info about task
|
|
|
|
|
@@ -944,6 +1144,8 @@ def get_url_dashPart(env, config: Dict[str, str]):
|
|
|
|
|
a string, used to indicate the return type, "string" or "json".
|
|
|
|
|
"""
|
|
|
|
|
active_tab_url = get_active_url_from_accessTree(env, config)
|
|
|
|
|
if active_tab_url is None:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# extract the last dash-separated part of the URL, and delete all the characters after "id"
|
|
|
|
|
dash_part = active_tab_url.split("/")[config["partIndex"]]
|
|
|
|
|
|