Merge branch 'zdy'

This commit is contained in:
David Chang
2024-01-17 22:48:30 +08:00
27 changed files with 818 additions and 151 deletions

View File

@@ -298,11 +298,11 @@ class SetupController:
# TODO
raise NotImplementedError()
def _activate_window_setup(self, window_name: str):
def _activate_window_setup(self, window_name: str, strict: bool = False, by_class: bool = False):
if not window_name:
raise Exception(f"Setup Open - Invalid path ({window_name}).")
payload = json.dumps({"window_name": window_name})
payload = json.dumps({"window_name": window_name, "strict": strict, "by_class": by_class})
headers = {
'Content-Type': 'application/json'
}
@@ -317,6 +317,25 @@ class SetupController:
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
def _close_window_setup(self, window_name: str, strict: bool = False, by_class: bool = False):
if not window_name:
raise Exception(f"Setup Open - Invalid path ({window_name}).")
payload = json.dumps({"window_name": window_name, "strict": strict, "by_class": by_class})
headers = {
'Content-Type': 'application/json'
}
# send request to server to open file
try:
response = requests.post(self.http_server + "/setup" + "/close_window", headers=headers, data=payload)
if response.status_code == 200:
logger.info("Command executed successfully: %s", response.text)
else:
logger.error(f"Failed to close window {window_name}. Status code: %s", response.text)
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
# Chrome setup
def _chrome_open_tabs_setup(self, urls_to_open: List[str]):
host = self.vm_ip

View File

@@ -1,50 +1,92 @@
import os
from typing import Dict
from typing import Optional
from typing import Dict, List, Set
from typing import Optional, Any, Union
import requests
def get_cloud_file(env, config: Dict[str, str]) -> str:
def get_cloud_file(env, config: Dict[str, Any]) -> Union[str, List[str]]:
"""
Config:
path (str): the url to download from
dest (str): file name of the downloaded file
path (str|List[str]): the url to download from
dest (str|List[str])): file name of the downloaded file
multi (bool) : optional. if path and dest are lists providing
information of multiple files. defaults to False
gives (List[int]): optional. defaults to [0]. which files are directly
returned to the metric. if len==1, str is returned; else, list is
returned.
"""
_path = os.path.join(env.cache_dir, config["dest"])
if os.path.exists(_path):
return _path
if not config.get("multi", False):
paths: List[str] = [config["path"]]
dests: List[str] = [config["dest"]]
else:
paths: List[str] = config["path"]
dests: List[str] = config["dest"]
cache_paths: List[str] = []
url = config["path"]
response = requests.get(url, stream=True)
response.raise_for_status()
gives: Set[int] = set(config.get("gives", [0]))
with open(_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
for i, (p, d) in enumerate(zip(paths, dests)):
_path = os.path.join(env.cache_dir, d)
if i in gives:
cache_paths.append(_path)
return _path
if os.path.exists(_path):
#return _path
continue
url = p
response = requests.get(url, stream=True)
response.raise_for_status()
with open(_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return cache_paths[0] if len(cache_paths)==1 else cache_paths
def get_vm_file(env, config: Dict[str, str]) -> Optional[str]:
def get_vm_file(env, config: Dict[str, Any]) -> Union[Optional[str], List[Optional[str]]]:
"""
Config:
path (str): absolute path on the VM to fetch
dest (str): file name of the downloaded file
multi (bool) : optional. if path and dest are lists providing
information of multiple files. defaults to False
gives (List[int]): optional. defaults to [0]. which files are directly
returned to the metric. if len==1, str is returned; else, list is
returned.
"""
_path = os.path.join(env.cache_dir, config["dest"])
if not config.get("multi", False):
paths: List[str] = [config["path"]]
dests: List[str] = [config["dest"]]
else:
paths: List[str] = config["path"]
dests: List[str] = config["dest"]
cache_paths: List[str] = []
file = env.controller.get_file(config["path"])
if file is None:
return None
# raise FileNotFoundError("File not found on VM: {:}".format(config["path"]))
with open(_path, "wb") as f:
f.write(file)
gives: Set[int] = set(config.get("gives", [0]))
return _path
for i, (p, d) in enumerate(zip(paths, dests)):
_path = os.path.join(env.cache_dir, d)
file = env.controller.get_file(p)
if file is None:
#return None
# raise FileNotFoundError("File not found on VM: {:}".format(config["path"]))
if i in gives:
cache_paths.append(None)
continue
if i in gives:
cache_paths.append(_path)
with open(_path, "wb") as f:
f.write(file)
return cache_paths[0] if len(cache_paths)==1 else cache_paths
def get_cache_file(env, config: Dict[str, str]) -> str:

View File

@@ -3,6 +3,8 @@ import operator
from numbers import Number
from typing import Any, Union
from typing import Dict, List
import os.path
import itertools
import openpyxl
import pandas as pd
@@ -26,6 +28,7 @@ def compare_table(actual: str, expected: str, **options) -> float:
* chart
* number_format
"chart_props": list of str, giving the converned chart properties
"as_shown": bool, TODO
}
Return:
@@ -35,10 +38,35 @@ def compare_table(actual: str, expected: str, **options) -> float:
if actual is None:
return 0.
df1 = pd.read_excel(expected)
df2 = pd.read_excel(actual)
metric: bool = df1.equals(df2)
logger.debug("Normal Contents Metric: {:}".format(metric))
if options.get("as_shown", False):
expected_csv: str = os.path.splitext(expected)[0] + ".csv"
actual_csv: str = os.path.splitext(actual)[0] + ".csv"
with open(expected_csv) as f:
expected_lines: List[str] = list( itertools.dropwhile( lambda l: len(l)==0
, map( lambda l: l.strip()
, reversed(f.read().splitlines())
)
)
)
if options.get("ignore_case", False):
expected_lines = [l.lower() for l in expected_lines]
with open(actual_csv) as f:
actual_lines: List[str] = list( itertools.dropwhile( lambda l: len(l)==0
, map( lambda l: l.strip()
, reversed(f.read().splitlines())
)
)
)
if options.get("ignore_case", False):
actual_lines = [l.lower() for l in actual_lines]
metric: bool = expected_lines==actual_lines
logger.debug("Content Metric just as shown: %s", metric)
else:
df1 = pd.read_excel(expected)
df2 = pd.read_excel(actual)
metric: bool = df1.equals(df2)
logger.debug("Normal Content Metric: {:}".format(metric))
features: List[str] = options.get("features", [])
for ftr in features:
@@ -219,6 +247,8 @@ if __name__ == '__main__':
# print(check_zoom(path1, {"relation": "lt", "ref_value": 100}))
# print(check_zoom(path2, {"relation": "lt", "ref_value": 100}))
path1 = "../../任务数据/LibreOffice Calc/Padding_Decimals_In_Formular_gold.xlsx"
data_frame: pd.DataFrame = pd.read_excel(path1)
print(data_frame)
path1 = "../../任务数据/LibreOffice Calc/Customers_New_7digit_Id.xlsx"
path2 = "../../任务数据/LibreOffice Calc/Customers_New_7digit_Id_gold.xlsx"
#data_frame: pd.DataFrame = pd.read_excel(path1)
#print(data_frame)
print(compare_table(path1, path2, as_shown=True))

View File

@@ -14,7 +14,7 @@ import pyautogui
import requests
from PIL import Image
from Xlib import display, X
from flask import Flask, request, jsonify, send_file, abort, send_from_directory
from flask import Flask, request, jsonify, send_file, abort #, send_from_directory
from lxml.etree import _Element
from pyatspi import Accessible, StateType
from pyatspi import Action as ATAction
@@ -579,39 +579,114 @@ def open_file():
def activate_window():
data = request.json
window_name = data.get('window_name', None)
if not window_name:
return "window_name required", 400
strict: bool = data.get("strict", False) # compare case-sensitively and match the whole string
by_class_name: bool = data.get("by_class", False)
os_name = platform.system()
if os_name == 'Windows':
import pygetwindow as gw
try:
# Find the VS Code window
vscode_window = gw.getWindowsWithTitle(window_name)[0]
# Activate the window, bringing it to the front
vscode_window.activate()
except IndexError:
return "VS Code window not found.", 404
if by_class_name:
return "Get window by class name is not supported on Windows currently.", 500
windows: List[gw.Window] = gw.getWindowsWithTitle(window_name)
window: Optional[gw.Window] = None
if len(windows)==0:
return "Window {:} not found (empty results)".format(window_name), 404
elif strict:
for wnd in windows:
if wnd.title==wnd:
window = wnd
if window is None:
return "Window {:} not found (strict mode).".format(window_name), 404
else:
window = windows[0]
window.activate()
elif os_name == 'Darwin':
import pygetwindow as gw
try:
# Find the VS Code window
vscode_window = gw.getWindowsWithTitle(window_name)[0]
# Un-minimize the window and then bring it to the front
vscode_window.unminimize()
vscode_window.activate()
except IndexError:
return "VS Code window not found.", 404
if by_class_name:
return "Get window by class name is not supported on macOS currently.", 500
# Find the VS Code window
windows = gw.getWindowsWithTitle(window_name)
window: Optional[gw.Window] = None
if len(windows)==0:
return "Window {:} not found (empty results)".format(window_name), 404
elif strict:
for wnd in windows:
if wnd.title==wnd:
window = wnd
if window is None:
return "Window {:} not found (strict mode).".format(window_name), 404
else:
window = windows[0]
# Un-minimize the window and then bring it to the front
window.unminimize()
window.activate()
elif os_name == 'Linux':
# Attempt to activate VS Code window using wmctrl
subprocess.Popen(["wmctrl", "-a", window_name])
subprocess.run( [ "wmctrl"
, "-{:}{:}a".format( "x" if by_class_name else ""
, "F" if strict else ""
)
, window_name
]
)
else:
return f"Operating system {os_name} not supported.", 400
return "File opened successfully", 200
return "Window activated successfully", 200
@app.route("/setup/close_window", methods=["POST"])
def close_window():
data = request.json
if "window_name" not in data:
return "window_name required", 400
window_name: str = data["window_name"]
strict: bool = data.get("strict", False) # compare case-sensitively and match the whole string
by_class_name: bool = data.get("by_class", False)
os_name: str = platform.system()
if os_name == "Windows":
import pygetwindow as gw
if by_class_name:
return "Get window by class name is not supported on Windows currently.", 500
windows: List[gw.Window] = gw.getWindowsWithTitle(window_name)
window: Optional[gw.Window] = None
if len(windows)==0:
return "Window {:} not found (empty results)".format(window_name), 404
elif strict:
for wnd in windows:
if wnd.title==wnd:
window = wnd
if window is None:
return "Window {:} not found (strict mode).".format(window_name), 404
else:
window = windows[0]
window.close()
elif os_name == "Linux":
subprocess.run( [ "wmctrl"
, "-{:}{:}c".format( "x" if by_class_name else ""
, "F" if strict else ""
)
, window_name
]
)
elif os_name=="Darwin":
import pygetwindow as gw
return "Currently not supported on macOS.", 500
else:
return "Not supported platform {:}".format(os_name), 500
return "Window closed successfully.", 200
@app.route('/start_recording', methods=['POST'])
def start_recording():
@@ -639,7 +714,7 @@ def end_recording():
recording_process.terminate()
recording_process.wait()
return_code = recording_process.returncode
#return_code = recording_process.returncode
output, error = recording_process.communicate()
recording_process = None