add multi-apps 5 examples by ruisheng 2024-03-06

This commit is contained in:
rhythmcao
2024-03-06 21:20:26 +08:00
parent 69ef653a7c
commit da0dafc32c
14 changed files with 645 additions and 15 deletions

View File

@@ -450,6 +450,8 @@ class SetupController:
query(str): query pattern string to search files or folder in google drive to delete, please refer to
https://developers.google.com/drive/api/guides/search-files?hl=en about how to write query string.
trash(bool): whether to move the matched files to the trash instead of deleting them permanently. By default, trash=false, i.e. the files are deleted permanently.
for mkdirs:
path(List[str]): the path in the google drive to create folder
for upload:
path(str): remote url to download file
dest(List[str]): the path in the google drive to store the downloaded file

View File

@@ -23,9 +23,10 @@ from .chrome import (
get_active_tab_url_parse,
get_gotoRecreationPage_and_get_html_content,
get_url_dashPart,
get_active_url_from_accessTree
get_active_url_from_accessTree,
get_info_from_website
)
from .file import get_cloud_file, get_vm_file, get_cache_file
from .file import get_cloud_file, get_vm_file, get_cache_file, get_content_from_vm_file
from .general import get_vm_command_line, get_vm_terminal_output
from .gimp import get_gimp_config_file
from .impress import get_audio_in_slide

View File

@@ -11,7 +11,7 @@ import lxml.etree
import requests
from lxml.cssselect import CSSSelector
from lxml.etree import _Element
from playwright.sync_api import sync_playwright
from playwright.sync_api import sync_playwright, expect
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive, GoogleDriveFileList, GoogleDriveFile
@@ -35,6 +35,89 @@ WARNING:
"""
def _locate_visible(page, selector: str):
    """Return the locator for ``selector`` after asserting that it is visible."""
    element = page.locator(selector)
    expect(element).to_be_visible()
    return element


def get_info_from_website(env, config: Dict[Any, Any]) -> Any:
    """ Get information from a website. Especially useful when the information may be updated through time.
    Args:
        env (Any): The environment object.
        config (Dict[Any, Any]): The configuration dictionary.
            - url (str): The URL of the website to visit
            - infos (List[Dict[str, str]]): The list of information to be extracted from the website. Each dictionary contains:
                - action (str): chosen from 'inner_text', 'attribute', 'click_and_inner_text', 'click_and_attribute', etc., concretely,
                    - inner_text: extract the inner text of the element specified by the selector
                    - attribute: extract the attribute of the element specified by the selector
                    - click_and_inner_text: click elements following the selector and then extract the inner text of the last element
                    - click_and_attribute: click elements following the selector and then extract the attribute of the last element
                  Defaults to 'inner_text' when omitted.
                - selector (Union[str, List[str]]): The CSS selector(s) of the element(s) to be extracted.
                  For the 'click_and_*' actions a single string is treated as a one-element list.
                - attribute (str): optional for 'attribute' and 'click_and_attribute', the attribute to be extracted.
            - backups (Any): The backup information to be returned if the extraction fails.
    Returns:
        List of extracted values, one per entry of ``infos``, or ``config['backups']``
        (``None`` when absent) if anything goes wrong, including an unsupported action.
    """
    supported_actions = ('inner_text', 'attribute', 'click_and_inner_text', 'click_and_attribute')
    try:
        host = env.vm_ip
        port = 9222  # fixme: this port is hard-coded, need to be changed from config file
        remote_debugging_url = f"http://{host}:{port}"
        with sync_playwright() as p:
            # connect to remote Chrome instance
            try:
                browser = p.chromium.connect_over_cdp(remote_debugging_url)
            except Exception:
                # If the connection fails (e.g., the agent closed the browser instance), start a new browser instance
                # NOTE(review): the browser is launched with port 1337 while we connect to 9222;
                # presumably something on the VM forwards between the two -- confirm.
                app = 'chromium' if 'arm' in platform.machine() else 'google-chrome'
                payload = json.dumps({"command": [
                    app,
                    "--remote-debugging-port=1337"
                ], "shell": False})
                headers = {"Content-Type": "application/json"}
                requests.post("http://" + host + ":5000/setup" + "/launch", headers=headers, data=payload)
                time.sleep(5)
                browser = p.chromium.connect_over_cdp(remote_debugging_url)

            page = browser.contexts[0].new_page()
            page.goto(config["url"])
            page.wait_for_load_state('load')
            infos = []
            for info_dict in config.get('infos', []):
                # A previous click-through may have navigated away; start every extraction from the base URL.
                if page.url != config["url"]:
                    page.goto(config["url"])
                    page.wait_for_load_state('load')

                action = info_dict.get('action', 'inner_text')
                if action not in supported_actions:
                    raise NotImplementedError(f'The action {action} is not supported yet.')

                if action.startswith('click_and_'):
                    selectors = info_dict['selector']
                    if isinstance(selectors, str):
                        # Without this a plain string would be iterated character by character.
                        selectors = [selectors]
                    # Click through every selector but the last one, waiting for each page to load.
                    for sel in selectors[:-1]:
                        link = _locate_visible(page, sel)
                        link.click()
                        page.wait_for_load_state('load')
                    if not selectors:
                        # Nothing to extract for an empty selector list.
                        continue
                    ele = _locate_visible(page, selectors[-1])
                else:
                    ele = _locate_visible(page, info_dict['selector'])

                if action.endswith('attribute'):
                    infos.append(ele.get_attribute(info_dict['attribute']))
                else:
                    infos.append(ele.inner_text())
        return infos
    except Exception as e:
        # config.get avoids a KeyError here (which would defeat the fallback) when 'url' itself is missing.
        logger.error(f'[ERROR]: failed to obtain information from the website: {config.get("url")}. Use backup results instead. Reason: {e!r}')
        return config.get('backups', None)
# The following ones just need to load info from the files of software, no need to connect to the software
def get_default_search_engine(env, config: Dict[str, str]):
os_type = env.vm_platform

View File

@@ -1,8 +1,27 @@
import os
from typing import Dict, List, Set
from typing import Optional, Any, Union
import requests
import pandas as pd
def get_content_from_vm_file(env, config: Dict[str, Any]) -> Any:
    """
    Fetch a file from the VM and return a selected part of its content.

    Config:
        path (str): absolute path on the VM to fetch
        file_type (str): type of the fetched file; only 'xlsx' is supported
        file_content (str): which part of the file to return; only 'last_row' is
            supported for 'xlsx'

    Returns:
        For 'xlsx' / 'last_row': the last row of the sheet loaded by
        ``pd.read_excel`` (default arguments), with every cell converted to ``str``.

    Raises:
        KeyError: if 'path', 'file_type' or 'file_content' is missing from config.
        NotImplementedError: if the file type or the requested content is not supported.
    """
    path = config["path"]
    # The file is stored locally under its base name; get_vm_file returns the local path.
    file_path = get_vm_file(env, {"path": path, "dest": os.path.basename(path)})
    file_type, file_content = config['file_type'], config['file_content']

    if file_type != 'xlsx':
        raise NotImplementedError(f"File type {file_type} not supported")
    if file_content != 'last_row':
        # Previously this case fell through and silently returned None.
        raise NotImplementedError(f"File content {file_content} not supported for file type {file_type}")

    df = pd.read_excel(file_path)
    return df.iloc[-1].astype(str).tolist()
def get_cloud_file(env, config: Dict[str, Any]) -> Union[str, List[str]]:

View File

@@ -40,20 +40,23 @@ def get_audio_in_slide(env, config: Dict[str, str]):
audio_file_path = audio_file_path.replace('\\', '/')
# Create a temporary directory to extract the audio file
with tempfile.TemporaryDirectory() as tmpdirname:
# Extract the audio file
myzip.extract(audio_file_path, tmpdirname)
# Get the full path of the extracted audio file
extracted_audio_path = os.path.join(tmpdirname, audio_file_path)
# Return the extracted audio file path
audio_file_path = extracted_audio_path
tmpdirname = os.path.dirname(ppt_file_localhost_path)
myzip.extract(audio_file_path, tmpdirname)
audio_file_path = os.path.join(tmpdirname, audio_file_path)
return audio_file_path
# with tempfile.TemporaryDirectory() as tmpdirname:
# # Extract the audio file
# myzip.extract(audio_file_path, tmpdirname)
# # Get the full path of the extracted audio file
# extracted_audio_path = os.path.join(tmpdirname, audio_file_path)
# # Return the extracted audio file path
# audio_file_path = extracted_audio_path
else:
# the audio file is external to the .pptx file
# Return the audio file path
assert target.startswith("file://"), target
audio_file_path = target[7:]
break
if audio_file_path is None:
return None

View File

@@ -60,7 +60,8 @@ from .general import (
fuzzy_match,
check_include_exclude,
check_direct_json_object,
diff_text_file
diff_text_file,
literal_match
)
from .gimp import (
check_brightness_decrease_and_structure_sim,

View File

@@ -58,6 +58,8 @@ def contains_page_break(docx_file):
def compare_docx_files(file1, file2, **options):
ignore_blanks = options.get('ignore_blanks', True)
ignore_case = options.get('ignore_case', False)
ignore_order = options.get('ignore_order', False)
content_only = options.get('content_only', False)
def get_paragraph_texts_odt(document):
@@ -82,11 +84,17 @@ def compare_docx_files(file1, file2, **options):
doc2 = Document(file2)
doc1_paragraphs = [p.text for p in doc1.paragraphs]
doc2_paragraphs = [p.text for p in doc2.paragraphs]
if ignore_order:
doc1_paragraphs = sorted(doc1_paragraphs)
doc2_paragraphs = sorted(doc2_paragraphs)
elif file1.endswith('.odt') and file2.endswith('.odt'):
doc1 = load(file1)
doc2 = load(file2)
doc1_paragraphs = get_paragraph_texts_odt(doc1)
doc2_paragraphs = get_paragraph_texts_odt(doc2)
if ignore_order:
doc1_paragraphs = sorted(doc1_paragraphs)
doc2_paragraphs = sorted(doc2_paragraphs)
else:
# Unsupported file types or mismatch
print("Unsupported file types or mismatch between file types.")
@@ -96,6 +104,8 @@ def compare_docx_files(file1, file2, **options):
# Compare the content of the documents
text1 = re.sub(r'\s+', ' ', '\n'.join(doc1_paragraphs)).strip()
text2 = re.sub(r'\s+', ' ', '\n'.join(doc2_paragraphs)).strip()
if ignore_case:
text1, text2 = text1.lower(), text2.lower()
similarity = fuzz.ratio(text1, text2) / 100.0
return similarity
@@ -103,6 +113,8 @@ def compare_docx_files(file1, file2, **options):
if ignore_blanks:
text1 = re.sub(r'\s+', ' ', '\n'.join(doc1_paragraphs)).strip()
text2 = re.sub(r'\s+', ' ', '\n'.join(doc2_paragraphs)).strip()
if ignore_case:
text1, text2 = text1.lower(), text2.lower()
if text1 != text2:
return 0
else:
@@ -111,6 +123,8 @@ def compare_docx_files(file1, file2, **options):
# Compare each paragraph
for p1, p2 in zip(doc1_paragraphs, doc2_paragraphs):
if ignore_case:
p1, p2 = p1.lower(), p2.lower()
if p1 != p2:
return 0

View File

@@ -39,6 +39,24 @@ def exact_match(result, rules) -> float:
else:
return 0.
def literal_match(result: Any, expected: Any, **options) -> float:
    """Compare ``result`` with ``expected`` after converting the values to strings.

    Options:
        type (str): 'str' (default) compares ``str(result)`` with ``str(expected)``;
            'list' compares two lists/tuples element by element, each element being
            converted with ``str``.
        ignore_case (bool): lower-case both sides before comparing. Defaults to False.

    Returns:
        1.0 on a match, 0.0 otherwise. In 'list' mode, 0.0 is also returned when
        either argument is not a list/tuple or when the lengths differ.

    Raises:
        NotImplementedError: if ``type`` is neither 'str' nor 'list'.
    """
    literal_type = options.get('type', 'str')
    ignore_case = options.get('ignore_case', False)

    def normalize(value: Any) -> str:
        text = str(value)
        return text.lower() if ignore_case else text

    if literal_type == 'str':
        return float(normalize(result) == normalize(expected))

    if literal_type == 'list':
        if not isinstance(result, (list, tuple)) or not isinstance(expected, (list, tuple)):
            return 0.0
        if len(result) != len(expected):
            return 0.0
        return float(all(normalize(r) == normalize(e) for r, e in zip(result, expected)))

    # Report the requested literal type, not the ``type`` builtin.
    raise NotImplementedError(f"Type {literal_type} not supported")
def is_in_list(result, rules) -> float:
expect = rules["expected"]
if expect in result: