Merge branch 'main' into zdy
This commit is contained in:
BIN
desktop_env/assets/history_empty.sqlite
Normal file
BIN
desktop_env/assets/history_empty.sqlite
Normal file
Binary file not shown.
@@ -11,7 +11,7 @@ logger = logging.getLogger("desktopenv.pycontroller")
|
||||
|
||||
|
||||
class PythonController:
|
||||
def __init__(self, vm_ip: str, pkgs_prefix: str = "import pyautogui; import time; {command}"):
|
||||
def __init__(self, vm_ip: str, pkgs_prefix: str = "import pyautogui; import time; pyautogui.FAILSAFE = False; {command}"):
|
||||
self.vm_ip = vm_ip
|
||||
self.http_server = f"http://{vm_ip}:5000"
|
||||
self.pkgs_prefix = pkgs_prefix # fixme: this is a hacky way to execute python commands. fix it and combine it with installation of packages
|
||||
|
||||
@@ -1,24 +1,29 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import os.path
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
import tempfile
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Union, Optional
|
||||
from typing import Dict, List
|
||||
import os
|
||||
|
||||
import shutil
|
||||
import requests
|
||||
from playwright.sync_api import sync_playwright, TimeoutError
|
||||
from pydrive.auth import GoogleAuth
|
||||
from pydrive.drive import GoogleDrive, GoogleDriveFile, GoogleDriveFileList
|
||||
from playwright.sync_api import sync_playwright, TimeoutError
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
||||
|
||||
from desktop_env.controllers.python import PythonController
|
||||
from desktop_env.evaluators.metrics.utils import compare_urls
|
||||
|
||||
logger = logging.getLogger("desktopenv.setup")
|
||||
|
||||
FILE_PATH = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
class SetupController:
|
||||
def __init__(self, vm_ip: str, cache_dir: str):
|
||||
@@ -130,7 +135,8 @@ class SetupController:
|
||||
break
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.error(f"Failed to download {url} caused by {e}. Retrying... ({max_retries - i - 1} attempts left)")
|
||||
logger.error(
|
||||
f"Failed to download {url} caused by {e}. Retrying... ({max_retries - i - 1} attempts left)")
|
||||
if not downloaded:
|
||||
raise requests.RequestException(f"Failed to download {url}. No retries left. Error: {e}")
|
||||
|
||||
@@ -349,18 +355,18 @@ class SetupController:
|
||||
logger.info("Connect to Chrome @: %s", remote_debugging_url)
|
||||
logger.debug("PLAYWRIGHT ENV: %s", repr(os.environ))
|
||||
for attempt in range(15):
|
||||
if attempt>0:
|
||||
if attempt > 0:
|
||||
time.sleep(5)
|
||||
|
||||
browser = None
|
||||
with sync_playwright() as p:
|
||||
try:
|
||||
browser = p.chromium.connect_over_cdp(remote_debugging_url)
|
||||
#break
|
||||
# break
|
||||
except Exception as e:
|
||||
if attempt < 14:
|
||||
logger.error(f"Attempt {attempt + 1}: Failed to connect, retrying. Error: {e}")
|
||||
#time.sleep(10)
|
||||
# time.sleep(10)
|
||||
continue
|
||||
else:
|
||||
logger.error(f"Failed to connect after multiple attempts: {e}")
|
||||
@@ -379,7 +385,7 @@ class SetupController:
|
||||
try:
|
||||
page.goto(url, timeout=60000)
|
||||
except:
|
||||
logger.warning("Opening %s exceeds time limit", url) # only for human test
|
||||
logger.warning("Opening %s exceeds time limit", url) # only for human test
|
||||
logger.info(f"Opened tab {i + 1}: {url}")
|
||||
|
||||
if i == 0:
|
||||
@@ -458,16 +464,17 @@ class SetupController:
|
||||
for p in paths:
|
||||
q = f'"{parent_id}" in parents and title = "{p}" and mimeType = "application/vnd.google-apps.folder" and trashed = false'
|
||||
folder = drive.ListFile({'q': q}).GetList()
|
||||
if len(folder) == 0: # not exists, create it
|
||||
if len(folder) == 0: # not exists, create it
|
||||
parents = {} if parent_id == 'root' else {'parents': [{'id': parent_id}]}
|
||||
file = drive.CreateFile({'title': p, 'mimeType':'application/vnd.google-apps.folder', **parents})
|
||||
file = drive.CreateFile({'title': p, 'mimeType': 'application/vnd.google-apps.folder', **parents})
|
||||
file.Upload()
|
||||
parent_id = file['id']
|
||||
else: parent_id = folder[0]['id']
|
||||
else:
|
||||
parent_id = folder[0]['id']
|
||||
return parent_id
|
||||
|
||||
for oid, operation in enumerate(config['operation']):
|
||||
if operation == 'delete': # delete a specific file
|
||||
if operation == 'delete': # delete a specific file
|
||||
# query pattern string, by default, remove all files/folders not in the trash to the trash
|
||||
params = config['args'][oid]
|
||||
q = params.get('query', '')
|
||||
@@ -476,15 +483,19 @@ class SetupController:
|
||||
filelist: GoogleDriveFileList = drive.ListFile({'q': q_file}).GetList()
|
||||
q_folder = f"( {q} ) and mimeType = 'application/vnd.google-apps.folder'" if q.strip() else "mimeType = 'application/vnd.google-apps.folder'"
|
||||
folderlist: GoogleDriveFileList = drive.ListFile({'q': q_folder}).GetList()
|
||||
for file in filelist: # first delete file, then folder
|
||||
for file in filelist: # first delete file, then folder
|
||||
file: GoogleDriveFile
|
||||
if trash: file.Trash()
|
||||
else: file.Delete()
|
||||
if trash:
|
||||
file.Trash()
|
||||
else:
|
||||
file.Delete()
|
||||
for folder in folderlist:
|
||||
folder: GoogleDriveFile
|
||||
# note that, if a folder is trashed/deleted, all files and folders in it will be trashed/deleted
|
||||
if trash: folder.Trash()
|
||||
else: folder.Delete()
|
||||
if trash:
|
||||
folder.Trash()
|
||||
else:
|
||||
folder.Delete()
|
||||
elif operation == 'mkdirs':
|
||||
params = config['args'][oid]
|
||||
mkdir_in_googledrive(params['path'])
|
||||
@@ -508,7 +519,6 @@ class SetupController:
|
||||
else:
|
||||
raise ValueError('[ERROR]: not implemented clean type!')
|
||||
|
||||
|
||||
def _login_setup(self, **config):
|
||||
""" Login to a website with account and password information.
|
||||
@args:
|
||||
@@ -568,3 +578,82 @@ class SetupController:
|
||||
raise NotImplementedError
|
||||
|
||||
return browser, context
|
||||
|
||||
def _update_browse_history_setup(self, **config):
|
||||
db_path = os.path.join("desktop_env", "assets", "history_empty.sqlite")
|
||||
|
||||
# copy a new history file in the tmp folder
|
||||
cache_path = os.path.join(self.cache_dir, "history_new.sqlite")
|
||||
shutil.copyfile(db_path, cache_path)
|
||||
db_path = cache_path
|
||||
|
||||
history = config['history']
|
||||
|
||||
for history_item in history:
|
||||
url = history_item['url']
|
||||
title = history_item['title']
|
||||
visit_time = datetime.now() - timedelta(seconds=history_item['visit_time_from_now_in_seconds'])
|
||||
|
||||
# Chrome use ms from 1601-01-01 as timestamp
|
||||
epoch_start = datetime(1601, 1, 1)
|
||||
chrome_timestamp = int((visit_time - epoch_start).total_seconds() * 1000000)
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
INSERT INTO urls (url, title, visit_count, typed_count, last_visit_time, hidden)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
''', (url, title, 1, 0, chrome_timestamp, 0))
|
||||
|
||||
url_id = cursor.lastrowid
|
||||
|
||||
cursor.execute('''
|
||||
INSERT INTO visits (url, visit_time, from_visit, transition, segment_id, visit_duration)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
''', (url_id, chrome_timestamp, 0, 805306368, 0, 0))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
logger.info('Fake browsing history added successfully.')
|
||||
|
||||
controller = PythonController(self.vm_ip)
|
||||
|
||||
# get the path of the history file according to the platform
|
||||
os_type = controller.get_vm_platform()
|
||||
|
||||
if os_type == 'Windows':
|
||||
chrome_history_path = controller.execute_python_command(
|
||||
"""import os; print(os.path.join(os.getenv('USERPROFILE'), "AppData", "Local", "Google", "Chrome", "User Data", "Default", "History"))""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Darwin':
|
||||
chrome_history_path = controller.execute_python_command(
|
||||
"""import os; print(os.path.join(os.getenv('HOME'), "Library", "Application Support", "Google", "Chrome", "Default", "History"))""")[
|
||||
'output'].strip()
|
||||
elif os_type == 'Linux':
|
||||
chrome_history_path = controller.execute_python_command(
|
||||
"import os; print(os.path.join(os.getenv('HOME'), '.config', 'google-chrome', 'Default', 'History'))")[
|
||||
'output'].strip()
|
||||
else:
|
||||
raise Exception('Unsupported operating system')
|
||||
|
||||
form = MultipartEncoder({
|
||||
"file_path": chrome_history_path,
|
||||
"file_data": (os.path.basename(chrome_history_path), open(db_path, "rb"))
|
||||
})
|
||||
headers = {"Content-Type": form.content_type}
|
||||
logger.debug(form.content_type)
|
||||
|
||||
# send request to server to upload file
|
||||
try:
|
||||
logger.debug("REQUEST ADDRESS: %s", self.http_server + "/setup" + "/upload")
|
||||
response = requests.post(self.http_server + "/setup" + "/upload", headers=headers, data=form)
|
||||
if response.status_code == 200:
|
||||
logger.info("Command executed successfully: %s", response.text)
|
||||
else:
|
||||
logger.error("Failed to upload file. Status code: %s", response.text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error("An error occurred while trying to send the request: %s", e)
|
||||
|
||||
self._execute_setup(["sudo chown -R user:user /home/user/.config/google-chrome/Default/History"], shell=True)
|
||||
|
||||
@@ -175,10 +175,16 @@ class DesktopEnv(gym.Env):
|
||||
if isinstance(self.evaluator["func"], list) \
|
||||
else getattr(metrics, self.evaluator["func"])
|
||||
self.metric_conj: str = self.evaluator.get("conj", "and") # take conjunction of multiple metrics
|
||||
self.result_getter: Getter = [getattr(getters, "get_{:}".format(res["type"])) for res in
|
||||
if "result" in self.evaluator:
|
||||
self.result_getter: Getter = [getattr(getters, "get_{:}".format(res["type"])) for res in
|
||||
self.evaluator["result"]] \
|
||||
if isinstance(self.evaluator["result"], list) \
|
||||
else getattr(getters, "get_{:}".format(self.evaluator["result"]["type"]))
|
||||
else:
|
||||
self.result_getter = [None] * len(self.metric) \
|
||||
if isinstance(self.metric, list) \
|
||||
else None
|
||||
|
||||
if "expected" in self.evaluator:
|
||||
self.expected_getter: Getter = [getattr(getters, "get_{:}".format(exp["type"])) if exp else None for exp in
|
||||
self.evaluator["expected"]] \
|
||||
@@ -293,6 +299,12 @@ class DesktopEnv(gym.Env):
|
||||
|
||||
self.setup_controller.setup(self.evaluator.get("postconfig", []))
|
||||
|
||||
if self.metric == "infeasible":
|
||||
if self.action_history[-1] == "FAIL":
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
if type(self.metric) == list:
|
||||
results = []
|
||||
for idx, metric in enumerate(self.metric):
|
||||
@@ -315,7 +327,8 @@ class DesktopEnv(gym.Env):
|
||||
return 0
|
||||
elif self.metric_conj == 'or' and float(metric) == 1.0:
|
||||
return 1
|
||||
else: results.append(metric)
|
||||
else:
|
||||
results.append(metric)
|
||||
return sum(results) / len(results) if self.metric_conj == 'and' else max(results)
|
||||
else:
|
||||
try:
|
||||
|
||||
@@ -13,6 +13,8 @@ def get_vm_command_line(env, config: Dict[str, str]):
|
||||
|
||||
response = requests.post(f"http://{vm_ip}:{port}/execute", json={"command": command, "shell": shell})
|
||||
|
||||
print(response.json())
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.json()["output"]
|
||||
else:
|
||||
|
||||
@@ -31,9 +31,7 @@ from .docs import (
|
||||
evaluate_alignment,
|
||||
get_unique_train_ids,
|
||||
check_no_duplicates,
|
||||
compare_init_lines
|
||||
)
|
||||
from .docs import (
|
||||
compare_init_lines,
|
||||
find_default_font,
|
||||
contains_page_break,
|
||||
compare_docx_files,
|
||||
@@ -43,6 +41,7 @@ from .docs import (
|
||||
compare_highlighted_text,
|
||||
is_first_line_centered,
|
||||
check_file_exists,
|
||||
check_tabstops,
|
||||
compare_contains_image
|
||||
)
|
||||
from .general import (
|
||||
@@ -86,7 +85,8 @@ from .slides import (
|
||||
evaluate_presentation_fill_to_rgb_distance,
|
||||
check_left_panel,
|
||||
check_transition,
|
||||
check_page_number_colors
|
||||
check_page_number_colors,
|
||||
check_auto_saving_time
|
||||
)
|
||||
from .table import (
|
||||
compare_table,
|
||||
|
||||
@@ -46,7 +46,6 @@ def check_text_enlarged(scaling_factor_str):
|
||||
|
||||
|
||||
def check_moved_jpgs(directory_list, rule):
|
||||
|
||||
expected_jpgs = rule["expected"]
|
||||
moved_jpgs = [node['name'] for node in directory_list['children']]
|
||||
|
||||
|
||||
@@ -6,11 +6,13 @@ import zipfile
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from docx import Document
|
||||
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
|
||||
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_TAB_ALIGNMENT
|
||||
from docx.shared import RGBColor
|
||||
from odf.opendocument import load
|
||||
from odf.text import P
|
||||
from odf.text import Span
|
||||
from skimage.color import deltaE_ciede2000
|
||||
from skimage.color import rgb2lab
|
||||
|
||||
logger = logging.getLogger("desktopenv.metric.docs")
|
||||
|
||||
@@ -141,7 +143,7 @@ def compare_docx_tables(docx_file1, docx_file2):
|
||||
# Compare each cell
|
||||
for i in range(len(table1.rows)):
|
||||
for j in range(len(table1.columns)):
|
||||
if table1.cell(i, j).text != table2.cell(i, j).text:
|
||||
if table1.cell(i, j).text.strip() != table2.cell(i, j).text.strip():
|
||||
return 0
|
||||
|
||||
return 1
|
||||
@@ -234,6 +236,40 @@ def check_file_exists(directory, filename):
|
||||
return 1 if os.path.isfile(file_path) else 0
|
||||
|
||||
|
||||
def check_tabstops(docx_file1, docx_file2, **kwargs) -> float:
|
||||
doc1: Document = Document(docx_file1)
|
||||
doc2: Document = Document(docx_file2)
|
||||
para1 = [p for p in doc1.paragraphs if p.text.strip()]
|
||||
para2 = [p for p in doc2.paragraphs if p.text.strip()]
|
||||
if len(para1) != len(para2): return .0
|
||||
|
||||
if kwargs.get('word_number_split_by_tabstop', None) is not None:
|
||||
number = kwargs['word_number_split_by_tabstop']
|
||||
index = kwargs.get('index', 0)
|
||||
for p1 in para1:
|
||||
splits = p1.text.split('\t')
|
||||
if len(splits) == 0: return .0
|
||||
words = list(filter(lambda x: x.strip(), re.split(r'\s', splits[index])))
|
||||
if len(words) != number: return .0
|
||||
|
||||
section = doc2.sections[0]
|
||||
paragraph_width = section.page_width - section.left_margin - section.right_margin
|
||||
ignore_tabs = lambda x: x.alignment == WD_TAB_ALIGNMENT.CLEAR or (x.alignment == WD_TAB_ALIGNMENT.LEFT and x.position == 0)
|
||||
minus = .0
|
||||
for p1, p2 in zip(para1, para2):
|
||||
# filter CLEAR tabstop and default left-0 tabstop
|
||||
tabs1 = [tst for tst in p1.paragraph_format.tab_stops if not ignore_tabs(tst)]
|
||||
tabs2 = [tst for tst in p2.paragraph_format.tab_stops if not ignore_tabs(tst)]
|
||||
if len(tabs1) != len(tabs2): return .0
|
||||
difference = .0
|
||||
for t1, t2 in zip(tabs1, tabs2):
|
||||
if t1.alignment != t2.alignment: return .0
|
||||
difference += abs(t1.position - t2.position)
|
||||
minus += difference / paragraph_width
|
||||
score = 1 - (minus / len(para1))
|
||||
return score
|
||||
|
||||
|
||||
def compare_contains_image(docx_file1, docx_file2):
|
||||
doc1 = Document(docx_file1)
|
||||
doc2 = Document(docx_file2)
|
||||
@@ -258,10 +294,18 @@ def compare_contains_image(docx_file1, docx_file2):
|
||||
# print(find_default_font("Ani", config_path))
|
||||
|
||||
|
||||
def evaluate_colored_words_in_tables(file_path1, file_path2):
|
||||
def evaluate_colored_words_in_tables(file_path1, file_path2, **kwargs):
|
||||
if not compare_docx_files(file_path1, file_path2):
|
||||
return 0
|
||||
document = Document(file_path1)
|
||||
threshold = kwargs.get('threshold', 3.5)
|
||||
|
||||
def _calculate_color_difference(rgb1, rgb2):
|
||||
srgb1 = [rgb1[0] / 255.0, rgb1[1] / 255.0, rgb1[2] / 255.0]
|
||||
srgb2 = [rgb2[0] / 255.0, rgb2[1] / 255.0, rgb2[2] / 255.0]
|
||||
lab1, lab2 = rgb2lab(srgb1), rgb2lab(srgb2)
|
||||
delta_e = deltaE_ciede2000(lab1, lab2)
|
||||
return delta_e
|
||||
|
||||
for table in document.tables:
|
||||
# Iterate through rows and cells in the table
|
||||
@@ -273,9 +317,9 @@ def evaluate_colored_words_in_tables(file_path1, file_path2):
|
||||
if word:
|
||||
first_letter = word[0].lower()
|
||||
|
||||
if first_letter in 'aeiou' and run.font.color.rgb != RGBColor(255, 0, 0):
|
||||
if first_letter in 'aeiou' and _calculate_color_difference(run.font.color.rgb, RGBColor(255, 0, 0)) > threshold:
|
||||
return 0 # Vowel-colored words should be red
|
||||
elif first_letter not in 'aeiou' and run.font.color.rgb != RGBColor(0, 0, 255):
|
||||
elif first_letter not in 'aeiou' and _calculate_color_difference(run.font.color.rgb, RGBColor(0, 0, 255)) > threshold:
|
||||
return 0 # Non-vowel-colored words should be blue
|
||||
|
||||
return 1 # All words in tables are correctly colored
|
||||
|
||||
@@ -139,6 +139,7 @@ def compare_pptx_files(file1_path, file2_path, **options):
|
||||
examine_number_of_slides = options.get("examine_number_of_slides", True)
|
||||
examine_shape = options.get("examine_shape", True)
|
||||
examine_text = options.get("examine_text", True)
|
||||
examine_indent = options.get("examine_indent", True)
|
||||
examine_font_name = options.get("examine_font_name", True)
|
||||
examine_font_size = options.get("examine_font_size", True)
|
||||
examine_font_bold = options.get("examine_font_bold", True)
|
||||
@@ -146,6 +147,9 @@ def compare_pptx_files(file1_path, file2_path, **options):
|
||||
examine_color_rgb = options.get("examine_color_rgb", True)
|
||||
examine_font_underline = options.get("examine_font_underline", True)
|
||||
examine_strike_through = options.get("examine_strike_through", True)
|
||||
examine_alignment = options.get("examine_alignment", True)
|
||||
examine_bottom_position = options.get("examine_bottom_position", False)
|
||||
examine_bullets = options.get("examine_bullets", True)
|
||||
|
||||
# compare the number of slides
|
||||
if len(prs1.slides) != len(prs2.slides) and examine_number_of_slides:
|
||||
@@ -153,23 +157,32 @@ def compare_pptx_files(file1_path, file2_path, **options):
|
||||
|
||||
# compare the content of each slide
|
||||
for slide1, slide2 in zip(prs1.slides, prs2.slides):
|
||||
|
||||
# check if the shapes are the same
|
||||
for shape1, shape2 in zip(slide1.shapes, slide2.shapes):
|
||||
if (
|
||||
shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height) and examine_shape:
|
||||
if examine_bottom_position and shape1.top != shape2.top:
|
||||
if hasattr(shape1, "text") and hasattr(shape2, "text") and shape1.text == shape2.text and shape1.text == "Product Comparison":
|
||||
if shape1.top >= shape2.top:
|
||||
return 0
|
||||
|
||||
if (shape1.left != shape2.left or shape1.top != shape2.top or shape1.width != shape2.width or shape1.height != shape2.height) and examine_shape:
|
||||
return 0
|
||||
|
||||
if hasattr(shape1, "text") and hasattr(shape2, "text"):
|
||||
if shape1.text != shape2.text and examine_text:
|
||||
return 0
|
||||
|
||||
return 0
|
||||
|
||||
# check if the paragraphs are the same
|
||||
for para1, para2 in zip(shape1.text_frame.paragraphs, shape2.text_frame.paragraphs):
|
||||
if para1.alignment != para2.alignment and examine_alignment:
|
||||
return 0
|
||||
# check if the runs are the same
|
||||
if para1.text != para2.text and examine_text:
|
||||
return 0
|
||||
|
||||
if para1.level != para2.level and examine_indent:
|
||||
return 0
|
||||
|
||||
for run1, run2 in zip(para1.runs, para2.runs):
|
||||
if run1.text != run2.text and examine_text:
|
||||
return 0
|
||||
|
||||
# check if the font properties are the same
|
||||
if run1.font.name != run2.font.name and examine_font_name:
|
||||
@@ -184,16 +197,51 @@ def compare_pptx_files(file1_path, file2_path, **options):
|
||||
if run1.font.italic != run2.font.italic and examine_font_italic:
|
||||
return 0
|
||||
|
||||
if run1.font.color.rgb != run2.font.color.rgb and examine_color_rgb:
|
||||
return 0
|
||||
if hasattr(run1.font.color, "rgb") and hasattr(run2.font.color, "rgb"):
|
||||
if run1.font.color.rgb != run2.font.color.rgb and examine_color_rgb:
|
||||
return 0
|
||||
|
||||
if run1.font.underline != run2.font.underline and examine_font_underline:
|
||||
return 0
|
||||
|
||||
if run1.font._element.attrib.get('strike', 'noStrike') != run2.font._element.attrib.get('strike', 'noStrike') and examine_strike_through:
|
||||
if run1.font._element.attrib.get('strike', 'noStrike') != run2.font._element.attrib.get(
|
||||
'strike', 'noStrike') and examine_strike_through:
|
||||
return 0
|
||||
|
||||
# fixme: Actually there are more properties to be compared, but we cannot get them through pptx
|
||||
def _extract_bullets(xml_data):
|
||||
root = ET.fromstring(xml_data)
|
||||
|
||||
namespaces = {
|
||||
'a': 'http://schemas.openxmlformats.org/drawingml/2006/main',
|
||||
'p': 'http://schemas.openxmlformats.org/presentationml/2006/main',
|
||||
}
|
||||
|
||||
bullets = []
|
||||
|
||||
for paragraph in root.findall('.//a:p', namespaces):
|
||||
pPr = paragraph.find('a:pPr', namespaces)
|
||||
if pPr is not None:
|
||||
lvl = pPr.get('lvl')
|
||||
buChar = pPr.find('a:buChar', namespaces)
|
||||
char = buChar.get('char') if buChar is not None else "No Bullet"
|
||||
buClr = pPr.find('a:buClr/a:srgbClr', namespaces)
|
||||
color = buClr.get('val') if buClr is not None else "No Color"
|
||||
else:
|
||||
lvl = "No Level"
|
||||
char = "No Bullet"
|
||||
color = "No Color"
|
||||
|
||||
text = "".join(t.text for t in paragraph.findall('.//a:t', namespaces))
|
||||
|
||||
bullets.append((lvl, char, text, color))
|
||||
|
||||
return bullets
|
||||
|
||||
if _extract_bullets(run1.part.blob.decode('utf-8')) != _extract_bullets(
|
||||
run2.part.blob.decode('utf-8')) and examine_bullets:
|
||||
return 0
|
||||
|
||||
# fixme: Actually there are more properties to be compared, we can add them later via parsing the xml data
|
||||
|
||||
return 1
|
||||
|
||||
@@ -371,6 +419,51 @@ def check_page_number_colors(pptx_file, rules):
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
def check_auto_saving_time(pptx_file, rules):
|
||||
minutes = rules["minutes"]
|
||||
|
||||
# open and parse xml file
|
||||
try:
|
||||
tree = ET.parse(pptx_file)
|
||||
root = tree.getroot()
|
||||
|
||||
# Traverse the XML tree to find the autosave time setting
|
||||
autosave_time = None
|
||||
for item in root.findall(".//item"):
|
||||
# Check the path attribute
|
||||
path = item.get('{http://openoffice.org/2001/registry}path')
|
||||
if path == "/org.openoffice.Office.Common/Save/Document":
|
||||
# Once the correct item is found, look for the prop element with the name "AutoSaveTimeIntervall"
|
||||
for prop in item.findall(".//prop"):
|
||||
name = prop.get('{http://openoffice.org/2001/registry}name')
|
||||
if name == "AutoSaveTimeIntervall":
|
||||
# Extract the value of the autosave time interval
|
||||
autosave_time = prop.find(".//value").text
|
||||
break
|
||||
|
||||
if autosave_time is None:
|
||||
return 0
|
||||
else:
|
||||
autosave_time = int(autosave_time)
|
||||
if autosave_time == minutes:
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
except ET.ParseError as e:
|
||||
logger.error(f"Error parsing XML: {e}")
|
||||
except FileNotFoundError:
|
||||
logger.error(f"File not found: {pptx_file}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(compare_pptx_files(r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\550ce7e7-747b-495f-b122-acdc4d0b8e54\New_Club_Spring_2018_Training_Gold.pptx", r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\550ce7e7-747b-495f-b122-acdc4d0b8e54\New_Club_Spring_2018_Training_Gold.pptx"))
|
||||
# print(evaluate_presentation_fill_to_rgb_distance(r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\3b27600c-3668-4abd-8f84-7bcdebbccbdb\lec17-gui-events.pptx", {"rgb": (0, 0, 255)}))
|
||||
# print(compare_pptx_files(
|
||||
# r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\550ce7e7-747b-495f-b122-acdc4d0b8e54\New_Club_Spring_2018_Training_Gold.pptx",
|
||||
# r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\550ce7e7-747b-495f-b122-acdc4d0b8e54\New_Club_Spring_2018_Training_Gold.pptx"))
|
||||
# print(evaluate_presentation_fill_to_rgb_distance(r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\3b27600c-3668-4abd-8f84-7bcdebbccbdb\lec17-gui-events.pptx", {"rgb": (0, 0, 255)}))
|
||||
# print(check_auto_saving_time(r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\2cd43775-7085-45d8-89fa-9e35c0a915cf\registrymodifications.xcu", {"minutes": 3}))
|
||||
print(compare_pptx_files(
|
||||
r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\a669ef01-ded5-4099-9ea9-25e99b569840\Writing-Outlines.pptx",
|
||||
r"C:\Users\tianbaox\Desktop\DesktopEnv\cache\a669ef01-ded5-4099-9ea9-25e99b569840\Writing-Outlines_Gold.pptx",
|
||||
examine_shape=False))
|
||||
|
||||
@@ -2,7 +2,7 @@ import ctypes
|
||||
import os
|
||||
import platform
|
||||
import shlex
|
||||
import subprocess
|
||||
import subprocess, signal
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
from typing import List, Dict, Tuple
|
||||
@@ -997,7 +997,7 @@ def start_recording():
|
||||
|
||||
start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}"
|
||||
|
||||
recording_process = subprocess.Popen(shlex.split(start_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
recording_process = subprocess.Popen(shlex.split(start_command), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
|
||||
return jsonify({'status': 'success', 'message': 'Started recording.'})
|
||||
|
||||
@@ -1009,10 +1009,8 @@ def end_recording():
|
||||
if not recording_process:
|
||||
return jsonify({'status': 'error', 'message': 'No recording in progress to stop.'}), 400
|
||||
|
||||
recording_process.terminate()
|
||||
recording_process.send_signal(signal.SIGINT)
|
||||
recording_process.wait()
|
||||
# return_code = recording_process.returncode
|
||||
output, error = recording_process.communicate()
|
||||
recording_process = None
|
||||
|
||||
# return recording video file
|
||||
|
||||
Reference in New Issue
Block a user