Update on Chrome examples; Refactor on logic of controlling
This commit is contained in:
@@ -11,8 +11,9 @@ logger = logging.getLogger("desktopenv.pycontroller")
|
||||
|
||||
|
||||
class PythonController:
|
||||
def __init__(self, http_server: str, pkgs_prefix: str = "python -c \"import pyautogui; {command}\""):
|
||||
self.http_server = http_server
|
||||
def __init__(self, vm_ip: str, pkgs_prefix: str = "python -c \"import pyautogui; {command}\""):
|
||||
self.vm_ip = vm_ip
|
||||
self.http_server = f"http://{vm_ip}:5000"
|
||||
self.pkgs_prefix = pkgs_prefix # fixme: this is a hacky way to execute python commands. fix it and combine it with installation of packages
|
||||
|
||||
def get_screenshot(self):
|
||||
@@ -268,15 +269,3 @@ class PythonController:
|
||||
else:
|
||||
logger.error("Failed to get wallpaper. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
# VLC
|
||||
def get_vlc_status(self, host='localhost', port=8080, password='password'):
|
||||
url = f'http://{host}:{port}/requests/status.xml'
|
||||
|
||||
response = requests.get(url, auth=('', password))
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.content
|
||||
else:
|
||||
logger.error("Failed to get vlc status. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
@@ -1,22 +1,24 @@
|
||||
import requests
|
||||
import json
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
||||
|
||||
import uuid
|
||||
import os.path
|
||||
|
||||
from typing import Dict, List
|
||||
from typing import Any, Union
|
||||
|
||||
import logging
|
||||
import os.path
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
from typing import Any, Union
|
||||
from typing import Dict, List
|
||||
|
||||
import requests
|
||||
from playwright.sync_api import sync_playwright
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
||||
from desktop_env.evaluators.metrics.utils import compare_urls
|
||||
|
||||
logger = logging.getLogger("desktopenv.setup")
|
||||
|
||||
import traceback
|
||||
|
||||
class SetupController:
|
||||
def __init__(self, http_server: str, cache_dir: str):
|
||||
self.http_server: str = http_server
|
||||
self.http_server_setup_root = http_server + "/setup"
|
||||
def __init__(self, vm_ip: str, cache_dir: str):
|
||||
self.vm_ip: str = vm_ip
|
||||
self.http_server: str = f"http://{vm_ip}:5000"
|
||||
self.cache_dir: str = cache_dir
|
||||
|
||||
def reset_cache_dir(self, cache_dir: str):
|
||||
@@ -52,31 +54,31 @@ class SetupController:
|
||||
# can add other setup steps
|
||||
|
||||
# ZDY_COMMENT: merged with launch
|
||||
#def _command_setup(self, command: str):
|
||||
#"""
|
||||
#Directly send a command into the virtual machine os for setting up.
|
||||
#"""
|
||||
#payload = json.dumps({"command": command})
|
||||
#headers = {
|
||||
#'Content-Type': 'application/json'
|
||||
#}
|
||||
#timeout = 5
|
||||
#timout_whitelist = ["vlc"]
|
||||
#
|
||||
#try:
|
||||
#
|
||||
#response = requests.post(self.http_server + "/execute", headers=headers, data=payload, timeout=timeout)
|
||||
#if response.status_code == 200:
|
||||
#print("Command executed successfully:", response.text)
|
||||
#else:
|
||||
#print("Failed to execute command. Status code:", response.status_code)
|
||||
#except requests.exceptions.Timeout as e:
|
||||
#if command in timout_whitelist:
|
||||
#print("Command executed successfully:", command)
|
||||
#else:
|
||||
#print("An error occurred while trying to execute the command:", e)
|
||||
#except requests.exceptions.RequestException as e:
|
||||
#print("An error occurred while trying to execute the command:", e)
|
||||
# def _command_setup(self, command: str):
|
||||
# """
|
||||
# Directly send a command into the virtual machine os for setting up.
|
||||
# """
|
||||
# payload = json.dumps({"command": command})
|
||||
# headers = {
|
||||
# 'Content-Type': 'application/json'
|
||||
# }
|
||||
# timeout = 5
|
||||
# timout_whitelist = ["vlc"]
|
||||
#
|
||||
# try:
|
||||
#
|
||||
# response = requests.post(self.http_server + "/execute", headers=headers, data=payload, timeout=timeout)
|
||||
# if response.status_code == 200:
|
||||
# print("Command executed successfully:", response.text)
|
||||
# else:
|
||||
# print("Failed to execute command. Status code:", response.status_code)
|
||||
# except requests.exceptions.Timeout as e:
|
||||
# if command in timout_whitelist:
|
||||
# print("Command executed successfully:", command)
|
||||
# else:
|
||||
# print("An error occurred while trying to execute the command:", e)
|
||||
# except requests.exceptions.RequestException as e:
|
||||
# print("An error occurred while trying to execute the command:", e)
|
||||
|
||||
def _download_setup(self, files: List[Dict[str, str]]):
|
||||
"""
|
||||
@@ -138,8 +140,8 @@ class SetupController:
|
||||
|
||||
# send request to server to upload file
|
||||
try:
|
||||
logger.debug("REQUEST ADDRESS: %s", self.http_server_setup_root + "/upload")
|
||||
response = requests.post(self.http_server_setup_root + "/upload", headers=headers, data=form)
|
||||
logger.debug("REQUEST ADDRESS: %s", self.http_server + "/setup" + "/upload")
|
||||
response = requests.post(self.http_server + "/setup" + "/upload", headers=headers, data=form)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -164,7 +166,7 @@ class SetupController:
|
||||
|
||||
# send request to server to change wallpaper
|
||||
try:
|
||||
response = requests.post(self.http_server_setup_root + "/change_wallpaper", headers=headers, data=payload)
|
||||
response = requests.post(self.http_server + "/setup" + "/change_wallpaper", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -191,7 +193,7 @@ class SetupController:
|
||||
|
||||
# send request to server to open file
|
||||
try:
|
||||
response = requests.post(self.http_server_setup_root + "/open_file", headers=headers, data=payload)
|
||||
response = requests.post(self.http_server + "/setup" + "/open_file", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -211,7 +213,7 @@ class SetupController:
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
try:
|
||||
response = requests.post(self.http_server_setup_root + "/launch", headers=headers, data=payload)
|
||||
response = requests.post(self.http_server + "/setup" + "/launch", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
@@ -227,7 +229,7 @@ class SetupController:
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
try:
|
||||
response = requests.post(self.http_server_setup_root + "/execute", headers=headers, data=payload)
|
||||
response = requests.post(self.http_server + "/setup" + "/execute", headers=headers, data=payload)
|
||||
if response.status_code == 200:
|
||||
results: Dict[str, str] = response.json()
|
||||
if stdout:
|
||||
@@ -236,10 +238,10 @@ class SetupController:
|
||||
if stderr:
|
||||
with open(os.path.join(self.cache_dir, stderr), "w") as f:
|
||||
f.write(results["error"])
|
||||
logger.info( "Command executed successfully: %s -> %s"
|
||||
, " ".join(command)
|
||||
, response.text
|
||||
)
|
||||
logger.info("Command executed successfully: %s -> %s"
|
||||
, " ".join(command)
|
||||
, response.text
|
||||
)
|
||||
else:
|
||||
logger.error("Failed to launch application. Status code: %s", response.text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
@@ -252,6 +254,7 @@ class SetupController:
|
||||
def _act_setup(self, action_seq: List[Union[Dict[str, Any], str]]):
|
||||
# TODO
|
||||
raise NotImplementedError()
|
||||
|
||||
def _replay_setup(self, trajectory: str):
|
||||
"""
|
||||
Args:
|
||||
@@ -260,3 +263,84 @@ class SetupController:
|
||||
|
||||
# TODO
|
||||
raise NotImplementedError()
|
||||
|
||||
# Chrome setup
|
||||
def _chrome_open_tabs_setup(self, urls_to_open: List[str]):
|
||||
host = self.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
browser = None
|
||||
for attempt in range(15):
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
break
|
||||
except Exception as e:
|
||||
if attempt < 14:
|
||||
logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.error(f"Failed to connect after multiple attempts: {e}")
|
||||
raise e
|
||||
|
||||
if not browser:
|
||||
return
|
||||
|
||||
for i, url in enumerate(urls_to_open):
|
||||
# Use the first context (which should be the only one if using default profile)
|
||||
if i == 0:
|
||||
context = browser.contexts[0]
|
||||
|
||||
page = context.new_page() # Create a new page (tab) within the existing context
|
||||
page.goto(url)
|
||||
logger.info(f"Opened tab {i + 1}: {url}")
|
||||
|
||||
if i == 0:
|
||||
# clear the default tab
|
||||
default_page = context.pages[0]
|
||||
default_page.close()
|
||||
|
||||
# Do not close the context or browser; they will remain open after script ends
|
||||
return browser, context
|
||||
|
||||
def _chrome_close_tabs_setup(self, urls_to_close: List[str]):
|
||||
time.sleep(5) # Wait for Chrome to finish launching
|
||||
|
||||
host = self.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
browser = None
|
||||
for attempt in range(15):
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
break
|
||||
except Exception as e:
|
||||
if attempt < 14:
|
||||
logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.error(f"Failed to connect after multiple attempts: {e}")
|
||||
raise e
|
||||
|
||||
if not browser:
|
||||
return
|
||||
|
||||
for i, url in enumerate(urls_to_close):
|
||||
# Use the first context (which should be the only one if using default profile)
|
||||
if i == 0:
|
||||
context = browser.contexts[0]
|
||||
|
||||
for page in context.pages:
|
||||
|
||||
# if two urls are the same, close the tab
|
||||
if compare_urls(page.url, url):
|
||||
context.pages.pop(context.pages.index(page))
|
||||
page.close()
|
||||
logger.info(f"Closed tab {i + 1}: {url}")
|
||||
break
|
||||
|
||||
# Do not close the context or browser; they will remain open after script ends
|
||||
return browser, context
|
||||
|
||||
@@ -80,9 +80,8 @@ class DesktopEnv(gym.Env):
|
||||
logger.info("Initializing...")
|
||||
self._start_emulator()
|
||||
self.vm_ip = self._get_vm_ip()
|
||||
self.host = f"http://{self.vm_ip}:5000"
|
||||
self.controller = PythonController(http_server=self.host)
|
||||
self.setup_controller = SetupController(http_server=self.host, cache_dir=self.cache_dir)
|
||||
self.controller = PythonController(vm_ip=self.vm_ip)
|
||||
self.setup_controller = SetupController(vm_ip=self.vm_ip, cache_dir=self.cache_dir)
|
||||
|
||||
# Meta info of the VM
|
||||
self.vm_platform = self.controller.get_vm_platform()
|
||||
|
||||
@@ -3,4 +3,4 @@ from .misc import get_rule
|
||||
from .info import get_vm_screen_size, get_vm_window_size, get_vm_wallpaper
|
||||
from .misc import get_rule, get_accessibility_tree
|
||||
from .vlc import get_vlc_playing_info, get_vlc_config
|
||||
from .chrome import get_default_search_engine, get_bookmarks
|
||||
from .chrome import get_default_search_engine, get_bookmarks, get_open_tabs_info
|
||||
|
||||
@@ -4,6 +4,8 @@ import os
|
||||
import sqlite3
|
||||
from typing import Dict
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.chrome")
|
||||
|
||||
"""
|
||||
@@ -13,15 +15,20 @@ WARNING:
|
||||
"""
|
||||
|
||||
|
||||
# The following ones just need to load info from the files of software, no need to connect to the software
|
||||
def get_default_search_engine(env, config: Dict[str, str]):
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Preferences'))""")['output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")['output'].strip()
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = env.controller.execute_python_command("import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")['output'].strip()
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Preferences'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -41,12 +48,16 @@ def get_default_search_engine(env, config: Dict[str, str]):
|
||||
def get_cookie_data(env, config: Dict[str, str]):
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
chrome_cookie_file_path = os.path.join(os.getenv('LOCALAPPDATA'), 'Google\\Chrome\\User Data\\Default\\Cookies')
|
||||
chrome_cookie_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Cookies'))""")['output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
chrome_cookie_file_path = os.path.join(os.getenv('HOME'),
|
||||
'Library/Application Support/Google/Chrome/Default/Cookies')
|
||||
chrome_cookie_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Cookies'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_cookie_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies')
|
||||
chrome_cookie_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Cookies'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -70,13 +81,16 @@ def get_cookie_data(env, config: Dict[str, str]):
|
||||
def get_bookmarks(env, config: Dict[str, str]):
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
preference_file_path = os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Bookmarks')
|
||||
preference_file_path = env.controller.execute_python_command("""import os; print(os.path.join(os.getenv('LOCALAPPDATA'),
|
||||
'Google\\Chrome\\User Data\\Default\\Bookmarks'))""")['output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
preference_file_path = os.path.join(os.getenv('HOME'),
|
||||
'Library/Application Support/Google/Chrome/Default/Bookmarks')
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), 'Library/Application Support/Google/Chrome/Default/Bookmarks'))")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
preference_file_path = os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks')
|
||||
preference_file_path = env.controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config/google-chrome/Default/Bookmarks'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -98,13 +112,16 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
|
||||
"""Find the Chrome extensions directory based on the operating system."""
|
||||
os_type = env.vm_platform
|
||||
if os_type == 'Windows':
|
||||
chrome_extension_dir = os.path.expanduser(
|
||||
'~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Extensions\\'""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Darwin': # macOS
|
||||
chrome_extension_dir = os.path.expanduser(
|
||||
'~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '/Library/Application Support/Google/Chrome/Default/Extensions/'""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_extension_dir = os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'
|
||||
chrome_extension_dir = env.controller.execute_python_command(
|
||||
"""os.path.expanduser('~') + '/.config/google-chrome/Default/Extensions/'""")['output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
@@ -124,3 +141,52 @@ def get_extensions_installed_from_shop(env, config: Dict[str, str]):
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Error reading {manifest_path}")
|
||||
return manifests
|
||||
|
||||
|
||||
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on
|
||||
# port info to allow remote debugging, see README.md for details
|
||||
|
||||
def get_open_tabs_info(env, config: Dict[str, str]):
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
tabs_info = []
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
title = page.title()
|
||||
url = page.url
|
||||
tabs_info.append({'title': title, 'url': url})
|
||||
|
||||
browser.close()
|
||||
return tabs_info
|
||||
|
||||
|
||||
def get_active_tab_info(env, config: Dict[str, str]):
|
||||
host = env.vm_ip
|
||||
port = 9222 # fixme: this port is hard-coded, need to be changed from config file
|
||||
|
||||
remote_debugging_url = f"http://{host}:{port}"
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
active_tab_info = {}
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
if page.is_visible("body"): # check the visibility of the page body to determine the active status
|
||||
active_tab_info = {
|
||||
'title': page.title(),
|
||||
'url': page.url,
|
||||
'content': page.content() # get the HTML content of the page
|
||||
}
|
||||
break
|
||||
if active_tab_info:
|
||||
break
|
||||
|
||||
browser.close()
|
||||
return active_tab_info
|
||||
|
||||
@@ -2,6 +2,8 @@ import logging
|
||||
import os
|
||||
from typing import Dict
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger("desktopenv.getters.vlc")
|
||||
|
||||
|
||||
@@ -15,7 +17,14 @@ def get_vlc_playing_info(env, config: Dict[str, str]):
|
||||
password = 'password'
|
||||
|
||||
_path = os.path.join(env.cache_dir, config["dest"])
|
||||
content = env.controller.get_vlc_status(host, port, password)
|
||||
url = f'http://{host}:{port}/requests/status.xml'
|
||||
response = requests.get(url, auth=('', password))
|
||||
if response.status_code == 200:
|
||||
content = response.content
|
||||
else:
|
||||
logger.error("Failed to get vlc status. Status code: %d", response.status_code)
|
||||
return None
|
||||
|
||||
with open(_path, "wb") as f:
|
||||
f.write(content)
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from .chrome import is_expected_tabs, is_expected_bookmarks
|
||||
from .docs import compare_font_names, compare_subscript_contains, has_page_numbers_in_footers
|
||||
from .docs import find_default_font, contains_page_break, compare_docx_files, compare_docx_tables, compare_line_spacing, \
|
||||
compare_insert_equation
|
||||
|
||||
@@ -1,52 +1,39 @@
|
||||
import logging
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
from typing import Any, Dict, List
|
||||
from desktop_env.evaluators.metrics.utils import are_lists_equal, compare_urls
|
||||
|
||||
logger = logging.getLogger("desktopenv.metrics.chrome")
|
||||
|
||||
|
||||
def is_expected_tabs(open_tabs: List[Dict[str, str]], rule: Dict[str, Any]) -> float:
|
||||
"""
|
||||
Checks if the expected tabs are open in Chrome.
|
||||
"""
|
||||
|
||||
print(open_tabs, rule)
|
||||
match_type = rule['type']
|
||||
|
||||
if match_type == "url":
|
||||
expected_urls = rule['urls']
|
||||
actual_urls = [tab['url'] for tab in open_tabs]
|
||||
return 1 if are_lists_equal(expected_urls, actual_urls, compare_urls) else 0
|
||||
else:
|
||||
logger.error(f"Unknown type: {match_type}")
|
||||
return 0
|
||||
|
||||
|
||||
# todo: move to getter module
|
||||
def is_expected_bookmarks(bookmarks: List[Dict[str, Any]], rule: Dict[str, Any]) -> float:
|
||||
"""
|
||||
Checks if the expected bookmarks are in Chrome.
|
||||
"""
|
||||
|
||||
# The following ones just need to load info from the files of software, no need to connect to the software
|
||||
# todo
|
||||
match_type = rule['type']
|
||||
|
||||
|
||||
# The following ones require Playwright to be installed on the target machine, and the chrome needs to be pre-config on port info to allow remote debugging, see README.md for details
|
||||
|
||||
def get_open_tabs_info(remote_debugging_url):
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
tabs_info = []
|
||||
for context in browser.contexts:
|
||||
for page in context.pages:
|
||||
title = page.title()
|
||||
url = page.url
|
||||
tabs_info.append({'title': title, 'url': url})
|
||||
|
||||
browser.close()
|
||||
return tabs_info
|
||||
|
||||
|
||||
def get_active_tab_info(remote_debugging_url):
|
||||
with sync_playwright() as p:
|
||||
# connect to remote Chrome instance
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
|
||||
active_tab_info = {}
|
||||
for context in browser.contexts:
|
||||
for page in context.pages():
|
||||
if page.is_visible("body"): # check the visibility of the page body to determine the active status
|
||||
active_tab_info = {
|
||||
'title': page.title(),
|
||||
'url': page.url,
|
||||
'content': page.content() # get the HTML content of the page
|
||||
}
|
||||
break
|
||||
if active_tab_info:
|
||||
break
|
||||
|
||||
browser.close()
|
||||
return active_tab_info
|
||||
if match_type == "url":
|
||||
expected_urls = rule['urls']
|
||||
actual_urls = [bookmark['url'] for bookmark in bookmarks]
|
||||
return 1 if are_lists_equal(expected_urls, actual_urls, compare_urls) else 0
|
||||
else:
|
||||
logger.error(f"Unknown type: {match_type}")
|
||||
return 0
|
||||
|
||||
@@ -1,26 +1,29 @@
|
||||
import zipfile
|
||||
import lxml.etree
|
||||
import lxml.cssselect
|
||||
from lxml.etree import _Element
|
||||
import xmltodict
|
||||
import openpyxl
|
||||
from openpyxl import Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
from openpyxl.chart._chart import ChartBase
|
||||
|
||||
from typing import Dict, List, Set
|
||||
from typing import Any
|
||||
|
||||
import logging
|
||||
import zipfile
|
||||
from typing import Any
|
||||
from typing import Dict, List, Set
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
import lxml.cssselect
|
||||
import lxml.etree
|
||||
import openpyxl
|
||||
import xmltodict
|
||||
from lxml.etree import _Element
|
||||
from openpyxl import Workbook
|
||||
from openpyxl.chart._chart import ChartBase
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
|
||||
logger = logging.getLogger("desktopenv.metrics.utils")
|
||||
|
||||
_xlsx_namespaces = [ ("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
|
||||
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
|
||||
]
|
||||
_xlsx_namespaces = [("x14", "http://schemas.microsoft.com/office/spreadsheetml/2009/9/main")
|
||||
, ("xm", "http://schemas.microsoft.com/office/excel/2006/main")
|
||||
]
|
||||
_xlsx_ns_mapping = dict(_xlsx_namespaces)
|
||||
_xlsx_ns_imapping = dict(map(lambda itm: (itm[1], itm[0]), _xlsx_namespaces))
|
||||
_sparklines_selector = lxml.cssselect.CSSSelector("x14|sparkline", namespaces=_xlsx_ns_mapping)
|
||||
#print(_sparklines_selector.css)
|
||||
|
||||
|
||||
# print(_sparklines_selector.css)
|
||||
def load_sparklines(xlsx_file: str) -> Dict[str, str]:
|
||||
"""
|
||||
This function modifies data_frame in-place
|
||||
@@ -44,13 +47,14 @@ def load_sparklines(xlsx_file: str) -> Dict[str, str]:
|
||||
sparklines_dict: Dict[str, str] = {}
|
||||
for sp_l in sparklines:
|
||||
sparkline_xml: str = lxml.etree.tostring(sp_l, encoding="unicode")
|
||||
sparkline: Dict[str, Dict[str, str]] = xmltodict.parse( sparkline_xml
|
||||
, process_namespaces=True
|
||||
, namespaces=_xlsx_ns_imapping
|
||||
)
|
||||
sparkline: Dict[str, Dict[str, str]] = xmltodict.parse(sparkline_xml
|
||||
, process_namespaces=True
|
||||
, namespaces=_xlsx_ns_imapping
|
||||
)
|
||||
sparklines_dict[sparkline["x14:sparkline"]["xm:sqref"]] = sparkline["x14:sparkline"]["xm:f"]
|
||||
return sparklines_dict
|
||||
|
||||
|
||||
# Available Chart Properties:
|
||||
# title: str
|
||||
# anchor: ["oneCell" | "twoCell" | "absolute", col0, row0, col1, row1]
|
||||
@@ -70,7 +74,7 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
Dict[str, Any]: information of charts
|
||||
"""
|
||||
|
||||
#workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file)
|
||||
# workbook: Workbook = openpyxl.load_workbook(filename=xlsx_file)
|
||||
worksheet: Worksheet = xlsx_file.active
|
||||
charts: List[ChartBase] = worksheet._charts
|
||||
|
||||
@@ -79,22 +83,22 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
for ch in charts:
|
||||
series: List[str] = []
|
||||
for ser in ch.series:
|
||||
value_num = ser.val.numRef.f\
|
||||
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f")\
|
||||
else ""
|
||||
value_str = ser.val.strRef.f\
|
||||
if hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f")\
|
||||
else ""
|
||||
categ_num = ser.cat.numRef.f\
|
||||
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f")\
|
||||
else ""
|
||||
categ_str = ser.cat.strRef.f\
|
||||
if hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f")\
|
||||
else ""
|
||||
series.append( "{:},{:},{:},{:}".format( value_num, value_str
|
||||
value_num = ser.val.numRef.f \
|
||||
if hasattr(ser.val, "numRef") and hasattr(ser.val.numRef, "f") \
|
||||
else ""
|
||||
value_str = ser.val.strRef.f \
|
||||
if hasattr(ser.val, "strRef") and hasattr(ser.val.strRef, "f") \
|
||||
else ""
|
||||
categ_num = ser.cat.numRef.f \
|
||||
if hasattr(ser.cat, "numRef") and hasattr(ser.cat.numRef, "f") \
|
||||
else ""
|
||||
categ_str = ser.cat.strRef.f \
|
||||
if hasattr(ser.cat, "strRef") and hasattr(ser.cat.strRef, "f") \
|
||||
else ""
|
||||
series.append("{:},{:},{:},{:}".format(value_num, value_str
|
||||
, categ_num, categ_str
|
||||
)
|
||||
)
|
||||
)
|
||||
series: str = ";".join(series)
|
||||
|
||||
# TODO: maybe more aspects, like chart type
|
||||
@@ -103,10 +107,10 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
if "title" in chart_props:
|
||||
info["title"] = ch.title.tx.rich.p[0].r[0].t
|
||||
if "anchor" in chart_props:
|
||||
info["anchor"] = [ ch.anchor.editAs
|
||||
, ch.anchor._from.col, ch.anchor.to.row
|
||||
, ch.anchor.to.col, ch.anchor.to.row
|
||||
]
|
||||
info["anchor"] = [ch.anchor.editAs
|
||||
, ch.anchor._from.col, ch.anchor.to.row
|
||||
, ch.anchor.to.col, ch.anchor.to.row
|
||||
]
|
||||
if "width" in chart_props:
|
||||
info["width"] = ch.width
|
||||
if "height" in chart_props:
|
||||
@@ -125,40 +129,83 @@ def load_charts(xlsx_file: Workbook, **options) -> Dict[str, Any]:
|
||||
chart_set[series] = info
|
||||
return chart_set
|
||||
|
||||
|
||||
def are_lists_equal(list1, list2, comparison_func):
|
||||
# First check if both lists have the same length
|
||||
if len(list1) != len(list2):
|
||||
return False
|
||||
|
||||
# Now make sure each element in one list has an equal element in the other list
|
||||
for item1 in list1:
|
||||
# Use the supplied function to test for an equal item
|
||||
if not any(comparison_func(item1, item2) for item2 in list2):
|
||||
return False
|
||||
|
||||
# If all items match, the lists are equal
|
||||
return True
|
||||
|
||||
|
||||
def compare_urls(url1, url2):
|
||||
def normalize_url(url):
|
||||
# Parse the URL
|
||||
parsed_url = urlparse(url)
|
||||
|
||||
# If no scheme is present, assume 'http'
|
||||
scheme = parsed_url.scheme if parsed_url.scheme else 'http'
|
||||
|
||||
# Lowercase the scheme and netloc, remove 'www.', and handle trailing slash
|
||||
normalized_netloc = parsed_url.netloc.lower().replace("www.", "")
|
||||
normalized_path = parsed_url.path if parsed_url.path != '/' else ''
|
||||
|
||||
# Reassemble the URL with normalized components
|
||||
normalized_parsed_url = parsed_url._replace(scheme=scheme.lower(), netloc=normalized_netloc,
|
||||
path=normalized_path)
|
||||
normalized_url = urlunparse(normalized_parsed_url)
|
||||
|
||||
return normalized_url
|
||||
|
||||
# Normalize both URLs for comparison
|
||||
norm_url1 = normalize_url(url1)
|
||||
norm_url2 = normalize_url(url2)
|
||||
|
||||
# Compare the normalized URLs
|
||||
return norm_url1 == norm_url2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
path1 = "../../../../../任务数据/LibreOffice Calc/Create_column_charts_using_statistics_gold_line_scatter.xlsx"
|
||||
workbook1: Workbook = openpyxl.load_workbook(filename=path1)
|
||||
worksheet1: Worksheet = workbook1.active
|
||||
charts: List[ChartBase] = worksheet1._charts
|
||||
#print(len(charts))
|
||||
#print(type(charts[0]))
|
||||
#
|
||||
#print(len(charts[0].series))
|
||||
#print(type(charts[0].series[0]))
|
||||
#print(type(charts[0].series[0].val))
|
||||
# print(len(charts))
|
||||
# print(type(charts[0]))
|
||||
#
|
||||
# print(len(charts[0].series))
|
||||
# print(type(charts[0].series[0]))
|
||||
# print(type(charts[0].series[0].val))
|
||||
##print(charts[0].series[0].val)
|
||||
#print(charts[0].series[0].val.numRef.f)
|
||||
#
|
||||
#print(type(charts[0].series[0].cat))
|
||||
# print(charts[0].series[0].val.numRef.f)
|
||||
#
|
||||
# print(type(charts[0].series[0].cat))
|
||||
##print(charts[0].series[0].cat)
|
||||
#print(charts[0].series[0].cat.numRef)
|
||||
#print(charts[0].series[0].cat.strRef)
|
||||
#print(charts[0].series[0].cat.strRef.f)
|
||||
# print(charts[0].series[0].cat.numRef)
|
||||
# print(charts[0].series[0].cat.strRef)
|
||||
# print(charts[0].series[0].cat.strRef.f)
|
||||
|
||||
#print(type(charts[0].title.tx.strRef))
|
||||
#print(type(charts[0].title.tx.rich))
|
||||
#print(type(charts[0].title.txPr))
|
||||
#print(len(charts[0].title.tx.rich.p))
|
||||
#print(len(charts[0].title.tx.rich.p[0].r))
|
||||
#print(type(charts[0].title.tx.rich.p[0].r[0]))
|
||||
#print(type(charts[0].title.tx.rich.p[0].r[0].t))
|
||||
#print(charts[0].title.tx.rich.p[0].r[0].t)
|
||||
# print(type(charts[0].title.tx.strRef))
|
||||
# print(type(charts[0].title.tx.rich))
|
||||
# print(type(charts[0].title.txPr))
|
||||
# print(len(charts[0].title.tx.rich.p))
|
||||
# print(len(charts[0].title.tx.rich.p[0].r))
|
||||
# print(type(charts[0].title.tx.rich.p[0].r[0]))
|
||||
# print(type(charts[0].title.tx.rich.p[0].r[0].t))
|
||||
# print(charts[0].title.tx.rich.p[0].r[0].t)
|
||||
|
||||
#print(type(charts[0].anchor))
|
||||
#print(charts[0].anchor.editAs)
|
||||
#print(charts[0].anchor._from.col, charts[0].anchor.to.row)
|
||||
#print(charts[0].anchor.to.col, charts[0].anchor.to.row)
|
||||
# print(type(charts[0].anchor))
|
||||
# print(charts[0].anchor.editAs)
|
||||
# print(charts[0].anchor._from.col, charts[0].anchor.to.row)
|
||||
# print(charts[0].anchor.to.col, charts[0].anchor.to.row)
|
||||
|
||||
#df1 = pd.read_excel(path1)
|
||||
#print(df1)
|
||||
# df1 = pd.read_excel(path1)
|
||||
# print(df1)
|
||||
print(load_charts(path1, chart_props=["title", "xtitle", "ytitle", "type"]))
|
||||
|
||||
Reference in New Issue
Block a user