From a961d2276de8996586988871da07e7de0c4d3d9a Mon Sep 17 00:00:00 2001 From: HappySix <33394488+FredWuCZ@users.noreply.github.com> Date: Tue, 30 Jul 2024 14:07:29 +0800 Subject: [PATCH] Improve efficiency of getting accessibility tree (#57) * Initial commit * Accelarate a11y tree acquisition * Clean code * Leave todos for Windows and macOS * Prepare for support of win and macos * Prepare for support of win and macos * Minor fix * add preliminary support for macos * fix subtle bugs * Clean the windows a11y tree getter code * Clean the windows a11y tree getter code * Intermediate version * Intermediate version * Update * adding support for macos * Delete dummy * Delete dummy * add bounding box for pruning * Delete dummy * FIX: enable a11y tree fetching on Windows * Move the requirement depency item place * FIX: remove "jsonify" from return value of get a11y tree * FIX: change print into logger, decompose functions in _create_pywinauto_node * Update * remove redundant nodes on macos * remove reliance of pywinauto * Clean * Fix bugs for pywinauto a11y_tree acquisition * FEAT: only fetch active windows, skip repeated nodes * CHORE: clean code, add comments on time-consuming part, add connection logger.info * FEAT: 1. add attrs including class_name, id, count... 2. use multithread to accelerate * FIX: add code of getting writable properties * Clean, update the max_depth for macOS * FIX: get all active windows * Accelarate child nodes in macOS * FEAT: get all active windows, add timing when testing * CHORE: remove print * Clean and finalize * Clean and finalize --------- Co-authored-by: Timothyxxx <384084775@qq.com> Co-authored-by: Junli Wang Co-authored-by: YangJL2003 --- desktop_env/server/main.py | 768 +++++++++++++++++----------- desktop_env/server/requirements.txt | 1 + 2 files changed, 467 insertions(+), 302 deletions(-) diff --git a/desktop_env/server/main.py b/desktop_env/server/main.py index ab38350..b1e1c65 100644 --- a/desktop_env/server/main.py +++ b/desktop_env/server/main.py @@ -2,15 +2,18 @@ import ctypes import os import platform import shlex +import json import subprocess, signal from pathlib import Path from typing import Any, Optional, Sequence -from typing import List, Dict, Tuple +from typing import List, Dict, Tuple, Literal +import concurrent.futures import Xlib import lxml.etree import pyautogui import requests +import re from PIL import Image from Xlib import display, X from flask import Flask, request, jsonify, send_file, abort # , send_from_directory @@ -18,23 +21,44 @@ from lxml.etree import _Element platform_name: str = platform.system() -if platform_name=="Linux": +if platform_name == "Linux": import pyatspi from pyatspi import Accessible, StateType, STATE_SHOWING from pyatspi import Action as ATAction - from pyatspi import Component #, Document + from pyatspi import Component # , Document from pyatspi import Text as ATText from pyatspi import Value as ATValue BaseWrapper = Any -elif platform_name=="Windows": + +elif platform_name == "Windows": from pywinauto import Desktop from pywinauto.base_wrapper import BaseWrapper + import pywinauto.application Accessible = Any +elif platform_name == "Darwin": + import plistlib + + import AppKit + import ApplicationServices + import Foundation + import Quartz + import oa_atomacos + + Accessible = Any + BaseWrapper = Any + +else: + # Platform not supported + Accessible = None + BaseWrapper = Any + from pyxcursor import Xcursor +# todo: need to reformat and organize this whole file + app = Flask(__name__) pyautogui.PAUSE = 0 @@ -63,7 +87,8 @@ def execute_command(): # Execute the command without any safety checks. try: - result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True, timeout=120) + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True, + timeout=120) return jsonify({ 'status': 'success', 'output': result.stdout, @@ -106,7 +131,7 @@ def launch_app(): try: if 'google-chrome' in command and _get_machine_architecture() == 'arm': index = command.index('google-chrome') - command[index] = 'chromium-browser' # arm64 chrome is not available yet, can only use chromium + command[index] = 'chromium-browser' # arm64 chrome is not available yet, can only use chromium subprocess.Popen(command, shell=shell) return "{:} launched successfully".format(command if shell else " ".join(command)) except Exception as e: @@ -192,397 +217,347 @@ def get_terminal_output(): return jsonify({"status": "error", "message": str(e)}), 500 -_accessibility_ns_map = { "st": "uri:deskat:state.at-spi.gnome.org" - , "attr": "uri:deskat:attributes.at-spi.gnome.org" - , "cp": "uri:deskat:component.at-spi.gnome.org" - , "doc": "uri:deskat:document.at-spi.gnome.org" - , "docattr": "uri:deskat:attributes.document.at-spi.gnome.org" - , "txt": "uri:deskat:text.at-spi.gnome.org" - , "val": "uri:deskat:value.at-spi.gnome.org" - , "act": "uri:deskat:action.at-spi.gnome.org" - , "win": "uri:deskat:uia.windows.microsoft.org" - } +_accessibility_ns_map = { + "ubuntu": { + "st": "https://accessibility.ubuntu.example.org/ns/state", + "attr": "https://accessibility.ubuntu.example.org/ns/attributes", + "cp": "https://accessibility.ubuntu.example.org/ns/component", + "doc": "https://accessibility.ubuntu.example.org/ns/document", + "docattr": "https://accessibility.ubuntu.example.org/ns/document/attributes", + "txt": "https://accessibility.ubuntu.example.org/ns/text", + "val": "https://accessibility.ubuntu.example.org/ns/value", + "act": "https://accessibility.ubuntu.example.org/ns/action", + }, + "windows": { + "st": "https://accessibility.windows.example.org/ns/state", + "attr": "https://accessibility.windows.example.org/ns/attributes", + "cp": "https://accessibility.windows.example.org/ns/component", + "doc": "https://accessibility.windows.example.org/ns/document", + "docattr": "https://accessibility.windows.example.org/ns/document/attributes", + "txt": "https://accessibility.windows.example.org/ns/text", + "val": "https://accessibility.windows.example.org/ns/value", + "act": "https://accessibility.windows.example.org/ns/action", + "class": "https://accessibility.windows.example.org/ns/class" + }, + "macos": { + "st": "https://accessibility.macos.example.org/ns/state", + "attr": "https://accessibility.macos.example.org/ns/attributes", + "cp": "https://accessibility.macos.example.org/ns/component", + "doc": "https://accessibility.macos.example.org/ns/document", + "txt": "https://accessibility.macos.example.org/ns/text", + "val": "https://accessibility.macos.example.org/ns/value", + "act": "https://accessibility.macos.example.org/ns/action", + "role": "https://accessibility.macos.example.org/ns/role", + } + +} + +_accessibility_ns_map_ubuntu = _accessibility_ns_map['ubuntu'] +_accessibility_ns_map_windows = _accessibility_ns_map['windows'] +_accessibility_ns_map_macos = _accessibility_ns_map['macos'] + +# A11y tree getter for Ubuntu +libreoffice_version_tuple: Optional[Tuple[int, ...]] = None +MAX_DEPTH = 50 +MAX_WIDTH = 1024 +MAX_CALLS = 5000 + + +def _get_libreoffice_version() -> Tuple[int, ...]: + """Function to get the LibreOffice version as a tuple of integers.""" + result = subprocess.run("libreoffice --version", shell=True, text=True, stdout=subprocess.PIPE) + version_str = result.stdout.split()[1] # Assuming version is the second word in the command output + return tuple(map(int, version_str.split("."))) def _create_atspi_node(node: Accessible, depth: int = 0, flag: Optional[str] = None) -> _Element: - # function _create_atspi_node {{{ # - if node.getRoleName() == "document spreadsheet": - flag = "calc" - if node.getRoleName() == "application" and node.name=="Thunderbird": - flag = "thunderbird" + node_name = node.name + attribute_dict: Dict[str, Any] = {"name": node_name} - attribute_dict: Dict[str, Any] = {"name": node.name} - - # States {{{ # + # States states: List[StateType] = node.getState().get_states() for st in states: state_name: str = StateType._enum_lookup[st] state_name: str = state_name.split("_", maxsplit=1)[1].lower() if len(state_name) == 0: continue - attribute_dict[ - "{{{:}}}{:}".format(_accessibility_ns_map["st"], state_name)] = "true" - # }}} States # + attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map_ubuntu["st"], state_name)] = "true" - # Attributes {{{ # + # Attributes attributes: Dict[str, str] = node.get_attributes() for attribute_name, attribute_value in attributes.items(): if len(attribute_name) == 0: continue - attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map["attr"], attribute_name)] = attribute_value - # }}} Attributes # + attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map_ubuntu["attr"], attribute_name)] = attribute_value - # Component {{{ # - if attribute_dict.get("{{{:}}}visible".format(_accessibility_ns_map["st"]), "false") == "true"\ - and attribute_dict.get("{{{:}}}showing".format(_accessibility_ns_map["st"]), "false") == "true": + # Component + if attribute_dict.get("{{{:}}}visible".format(_accessibility_ns_map_ubuntu["st"]), "false") == "true" \ + and attribute_dict.get("{{{:}}}showing".format(_accessibility_ns_map_ubuntu["st"]), "false") == "true": try: component: Component = node.queryComponent() except NotImplementedError: pass else: bbox: Sequence[int] = component.getExtents(pyatspi.XY_SCREEN) - attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] =\ - str(tuple(bbox[0:2])) - #attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = str( - #component.getPosition(pyatspi.XY_SCREEN)) - #attribute_dict["{{{:}}}windowcoord".format(_accessibility_ns_map["cp"])] = str( - #component.getPosition(pyatspi.XY_WINDOW)) - #attribute_dict["{{{:}}}parentcoord".format(_accessibility_ns_map["cp"])] = str( - #component.getPosition(pyatspi.XY_PARENT)) - attribute_dict["{{{:}}}size".format(_accessibility_ns_map["cp"])] = str(tuple(bbox[2:])) - # }}} Component # + attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map_ubuntu["cp"])] = \ + str(tuple(bbox[0:2])) + attribute_dict["{{{:}}}size".format(_accessibility_ns_map_ubuntu["cp"])] = str(tuple(bbox[2:])) - # Document {{{ # - #try: - #document: Document = node.queryDocument() - #except NotImplementedError: - #pass - #else: - #attribute_dict["{{{:}}}locale".format(_accessibility_ns_map["doc"])] = document.getLocale() - #attribute_dict["{{{:}}}pagecount".format(_accessibility_ns_map["doc"])] = str(document.getPageCount()) - #attribute_dict["{{{:}}}currentpage".format(_accessibility_ns_map["doc"])] = str(document.getCurrentPageNumber()) - #for attrbt in document.getAttributes(): - #attribute_name: str - #attribute_value: str - #attribute_name, attribute_value = attrbt.split(":", maxsplit=1) - #if len(attribute_name) == 0: - #continue - #attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map["docattr"], attribute_name)] = attribute_value - # }}} Document # - - # Text {{{ # + text = "" + # Text try: text_obj: ATText = node.queryText() - except NotImplementedError: - pass - else: # only text shown on current screen is available # attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount) text: str = text_obj.getText(0, text_obj.characterCount) - #if flag=="thunderbird": - # appeard in thunderbird (uFFFC) (not only in thunderbird), "Object + # if flag=="thunderbird": + # appeared in thunderbird (uFFFC) (not only in thunderbird), "Object # Replacement Character" in Unicode, "used as placeholder in text for # an otherwise unspecified object; uFFFD is another "Replacement # Character", just in case text = text.replace("\ufffc", "").replace("\ufffd", "") - # }}} Text # + except NotImplementedError: + pass - # Image {{{ # + # Image, Selection, Value, Action try: node.queryImage() + attribute_dict["image"] = "true" except NotImplementedError: pass - else: - attribute_dict["image"] = "true" - # }}} Image # - # Selection {{{ # try: node.querySelection() + attribute_dict["selection"] = "true" except NotImplementedError: pass - else: - attribute_dict["selection"] = "true" - # }}} Selection # - # Value {{{ # try: value: ATValue = node.queryValue() + value_key = f"{{{_accessibility_ns_map_ubuntu['val']}}}" + + for attr_name, attr_func in [ + ("value", lambda: value.currentValue), + ("min", lambda: value.minimumValue), + ("max", lambda: value.maximumValue), + ("step", lambda: value.minimumIncrement) + ]: + try: + attribute_dict[f"{value_key}{attr_name}"] = str(attr_func()) + except: + pass except NotImplementedError: pass - else: - try: - attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(value.currentValue) - except: - pass - try: - attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(value.minimumValue) - except: - pass - try: - attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(value.maximumValue) - except: - pass - try: - attribute_dict["{{{:}}}step".format(_accessibility_ns_map["val"])] = str(value.minimumIncrement) - except: - pass - # }}} Value # - # Action {{{ # try: action: ATAction = node.queryAction() - except NotImplementedError: - pass - else: for i in range(action.nActions): action_name: str = action.getName(i).replace(" ", "-") - attribute_dict["{{{:}}}{:}_desc" \ - .format(_accessibility_ns_map["act"] - , action_name - ) - ] = action.getDescription(i) - attribute_dict["{{{:}}}{:}_kb" \ - .format(_accessibility_ns_map["act"] - , action_name - ) - ] = action.getKeyBinding(i) - # }}} Action # + attribute_dict[ + "{{{:}}}{:}_desc".format(_accessibility_ns_map_ubuntu["act"], action_name)] = action.getDescription( + i) + attribute_dict[ + "{{{:}}}{:}_kb".format(_accessibility_ns_map_ubuntu["act"], action_name)] = action.getKeyBinding(i) + except NotImplementedError: + pass + + # Add from here if we need more attributes in the future... raw_role_name: str = node.getRoleName().strip() node_role_name = (raw_role_name or "unknown").replace(" ", "-") + if not flag: + if raw_role_name == "document spreadsheet": + flag = "calc" + if raw_role_name == "application" and node.name == "Thunderbird": + flag = "thunderbird" + xml_node = lxml.etree.Element( node_role_name, attrib=attribute_dict, - nsmap=_accessibility_ns_map + nsmap=_accessibility_ns_map_ubuntu ) - if "text" in locals() and len(text) > 0: + + if len(text) > 0: xml_node.text = text - # HYPERPARAMETER - if depth==50: + if depth == MAX_DEPTH: logger.warning("Max depth reached") return xml_node - if flag=="calc" and node_role_name=="table": + if flag == "calc" and node_role_name == "table": # Maximum column: 1024 if ver<=7.3 else 16384 # Maximum row: 104 8576 # Maximun sheet: 1 0000 - version_str: str = subprocess.run("libreoffice --version", shell=True, text=True, stdout=subprocess.PIPE).stdout - version_str = version_str.split()[1] - version_tuple: Tuple[int] = tuple(map(int, version_str.split("."))) - MAXIMUN_COLUMN = 1024 if version_tuple<(7, 4) else 16384 + global libreoffice_version_tuple + MAXIMUN_COLUMN = 1024 if libreoffice_version_tuple < (7, 4) else 16384 MAX_ROW = 104_8576 index_base = 0 first_showing = False column_base = None for r in range(MAX_ROW): - #logger.warning(r) for clm in range(column_base or 0, MAXIMUN_COLUMN): - child_node: Accessible = node[index_base+clm] + child_node: Accessible = node[index_base + clm] showing: bool = child_node.getState().contains(STATE_SHOWING) if showing: - child_node: _Element = _create_atspi_node(child_node, depth+1, flag) + child_node: _Element = _create_atspi_node(child_node, depth + 1, flag) if not first_showing: column_base = clm first_showing = True xml_node.append(child_node) - elif first_showing and column_base is not None or clm>=500: + elif first_showing and column_base is not None or clm >= 500: break - if first_showing and clm==column_base or not first_showing and r>=500: + if first_showing and clm == column_base or not first_showing and r >= 500: break index_base += MAXIMUN_COLUMN return xml_node else: try: for i, ch in enumerate(node): - # HYPERPARAMETER - if i>=1025: + if i == MAX_WIDTH: logger.warning("Max width reached") break - xml_node.append(_create_atspi_node(ch, depth+1, flag)) + xml_node.append(_create_atspi_node(ch, depth + 1, flag)) except: - logger.warning("Error occurred during children traversing. Has Ignored. Node: %s", lxml.etree.tostring(xml_node, encoding="unicode")) + logger.warning("Error occurred during children traversing. Has Ignored. Node: %s", + lxml.etree.tostring(xml_node, encoding="unicode")) return xml_node - # }}} function _create_atspi_node # -def _create_pywinauto_node(node: BaseWrapper, depth: int = 0, flag: Optional[str] = None) -> _Element: - # function _create_pywinauto_node {{{ # - #element_info: ElementInfo = node.element_info + +# A11y tree getter for Windows +def _create_pywinauto_node(node, nodes, depth: int = 0, flag: Optional[str] = None) -> _Element: + nodes = nodes or set() + if node in nodes: + return + nodes.add(node) + attribute_dict: Dict[str, Any] = {"name": node.element_info.name} - # States {{{ # + base_properties = {} try: - attribute_dict["{{{:}}}enabled".format(_accessibility_ns_map["st"])] = str(node.is_enabled()).lower() + base_properties.update( + node.get_properties()) # get all writable/not writable properties, but have bugs when landing on chrome and it's slower! except: - pass + logger.debug("Failed to call get_properties(), trying to get writable properites") + try: + _element_class = node.__class__ + + class TempElement(node.__class__): + writable_props = pywinauto.base_wrapper.BaseWrapper.writable_props + + # Instantiate the subclass + node.__class__ = TempElement + # Retrieve properties using get_properties() + properties = node.get_properties() + node.__class__ = _element_class + + base_properties.update(properties) # only get all writable properties + logger.debug("get writable properties") + except Exception as e: + logger.error(e) + pass + + # Count-cnt + for attr_name in ["control_count", "button_count", "item_count", "column_count"]: + try: + attribute_dict[f"{{{_accessibility_ns_map_windows['cnt']}}}{attr_name}"] = base_properties[ + attr_name].lower() + except: + pass + + # Columns-cols try: - attribute_dict["{{{:}}}visible".format(_accessibility_ns_map["st"])] = str(node.is_visible()).lower() - except: - pass - try: - attribute_dict["{{{:}}}active".format(_accessibility_ns_map["st"])] = str(node.is_active()).lower() + attribute_dict[f"{{{_accessibility_ns_map_windows['cols']}}}columns"] = base_properties["columns"].lower() except: pass - if hasattr(node, "is_minimized"): + # Id-id + for attr_name in ["control_id", "automation_id", "window_id"]: try: - attribute_dict["{{{:}}}minimized".format(_accessibility_ns_map["st"])] = str(node.is_minimized()).lower() - except: - pass - if hasattr(node, "is_maximized"): - try: - attribute_dict["{{{:}}}maximized".format(_accessibility_ns_map["st"])] = str(node.is_maximized()).lower() - except: - pass - if hasattr(node, "is_normal"): - try: - attribute_dict["{{{:}}}normal".format(_accessibility_ns_map["st"])] = str(node.is_normal()).lower() + attribute_dict[f"{{{_accessibility_ns_map_windows['id']}}}{attr_name}"] = base_properties[attr_name].lower() except: pass - if hasattr(node, "is_unicode"): + # States + # 19 sec out of 20 + for attr_name, attr_func in [ + ("enabled", lambda: node.is_enabled()), + ("visible", lambda: node.is_visible()), + # ("active", lambda: node.is_active()), # occupied most of the time: 20s out of 21s for slack, 51.5s out of 54s for WeChat # maybe use for cutting branches + ("minimized", lambda: node.is_minimized()), + ("maximized", lambda: node.is_maximized()), + ("normal", lambda: node.is_normal()), + ("unicode", lambda: node.is_unicode()), + ("collapsed", lambda: node.is_collapsed()), + ("checkable", lambda: node.is_checkable()), + ("checked", lambda: node.is_checked()), + ("focused", lambda: node.is_focused()), + ("keyboard_focused", lambda: node.is_keyboard_focused()), + ("selected", lambda: node.is_selected()), + ("selection_required", lambda: node.is_selection_required()), + ("pressable", lambda: node.is_pressable()), + ("pressed", lambda: node.is_pressed()), + ("expanded", lambda: node.is_expanded()), + ("editable", lambda: node.is_editable()), + ("has_keyboard_focus", lambda: node.has_keyboard_focus()), + ("is_keyboard_focusable", lambda: node.is_keyboard_focusable()), + ]: try: - attribute_dict["{{{:}}}unicode".format(_accessibility_ns_map["st"])] = str(node.is_unicode()).lower() + attribute_dict[f"{{{_accessibility_ns_map_windows['st']}}}{attr_name}"] = str(attr_func()).lower() except: pass - if hasattr(node, "is_collapsed"): - try: - attribute_dict["{{{:}}}collapsed".format(_accessibility_ns_map["st"])] = str(node.is_collapsed()).lower() - except: - pass - if hasattr(node, "is_checkable"): - try: - attribute_dict["{{{:}}}checkable".format(_accessibility_ns_map["st"])] = str(node.is_checkable()).lower() - except: - pass - if hasattr(node, "is_checked"): - try: - attribute_dict["{{{:}}}checked".format(_accessibility_ns_map["st"])] = str(node.is_checked()).lower() - except: - pass - if hasattr(node, "is_focused"): - try: - attribute_dict["{{{:}}}focused".format(_accessibility_ns_map["st"])] = str(node.is_focused()).lower() - except: - pass - if hasattr(node, "is_keyboard_focused"): - try: - attribute_dict["{{{:}}}keyboard_focused".format(_accessibility_ns_map["st"])] = str(node.is_keyboard_focused()).lower() - except: - pass - if hasattr(node, "is_selected"): - try: - attribute_dict["{{{:}}}selected".format(_accessibility_ns_map["st"])] = str(node.is_selected()).lower() - except: - pass - if hasattr(node, "is_selection_required"): - try: - attribute_dict["{{{:}}}selection_required".format(_accessibility_ns_map["st"])] = str(node.is_selection_required()).lower() - except: - pass - if hasattr(node, "is_pressable"): - try: - attribute_dict["{{{:}}}pressable".format(_accessibility_ns_map["st"])] = str(node.is_pressable()).lower() - except: - pass - if hasattr(node, "is_pressed"): - try: - attribute_dict["{{{:}}}pressed".format(_accessibility_ns_map["st"])] = str(node.is_pressed()).lower() - except: - pass + # Component + try: + rectangle = node.rectangle() + attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map_windows["cp"])] = \ + "({:d}, {:d})".format(rectangle.left, rectangle.top) + attribute_dict["{{{:}}}size".format(_accessibility_ns_map_windows["cp"])] = \ + "({:d}, {:d})".format(rectangle.width(), rectangle.height()) - if hasattr(node, "is_expanded"): - try: - attribute_dict["{{{:}}}expanded".format(_accessibility_ns_map["st"])] = str(node.is_expanded()).lower() - except: - pass - if hasattr(node, "is_editable"): - try: - attribute_dict["{{{:}}}editable".format(_accessibility_ns_map["st"])] = str(node.is_editable()).lower() - except: - pass - # }}} States # + except Exception as e: + logger.error("Error accessing rectangle: ", e) - # Component {{{ # - rectangle = node.rectangle() - attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = "({:d}, {:d})".format(rectangle.left, rectangle.top) - attribute_dict["{{{:}}}size".format(_accessibility_ns_map["cp"])] = "({:d}, {:d})".format(rectangle.width(), rectangle.height()) - # }}} Component # - - # Text {{{ # + # Text text: str = node.window_text() - if text==attribute_dict["name"]: + if text == attribute_dict["name"]: text = "" - #if hasattr(node, "texts"): - #texts: List[str] = node.texts()[1:] - #texts: Iterable[str] = map(lambda itm: itm if isinstance(itm, str) else "".join(itm), texts) - #text += "\n".join(texts) - #text = text.strip() - # }}} Text # - # Selection {{{ # + # Selection if hasattr(node, "select"): attribute_dict["selection"] = "true" - # }}} Selection # - # Value {{{ # - if hasattr(node, "get_step"): - try: - attribute_dict["{{{:}}}step".format(_accessibility_ns_map["val"])] = str(node.get_step()) - except: - pass - if hasattr(node, "value"): - try: - attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(node.value()) - except: - pass - if hasattr(node, "get_value"): - try: - attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(node.get_value()) - except: - pass - elif hasattr(node, "get_position"): - try: - attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(node.get_position()) - except: - pass - if hasattr(node, "min_value"): - try: - attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(node.min_value()) - except: - pass - elif hasattr(node, "get_range_min"): - try: - attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(node.get_range_min()) - except: - pass - if hasattr(node, "max_value"): - try: - attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(node.max_value()) - except: - pass - elif hasattr(node, "get_range_max"): - try: - attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(node.get_range_max()) - except: - pass - # }}} Value # + # Value + for attr_name, attr_funcs in [ + ("step", [lambda: node.get_step()]), + ("value", [lambda: node.value(), lambda: node.get_value(), lambda: node.get_position()]), + ("min", [lambda: node.min_value(), lambda: node.get_range_min()]), + ("max", [lambda: node.max_value(), lambda: node.get_range_max()]) + ]: + for attr_func in attr_funcs: + if hasattr(node, attr_func.__name__): + try: + attribute_dict[f"{{{_accessibility_ns_map_windows['val']}}}{attr_name}"] = str(attr_func()) + break # exit once the attribute is set successfully + except: + pass - attribute_dict["{{{:}}}class".format(_accessibility_ns_map["win"])] = str(type(node)) + attribute_dict["{{{:}}}class".format(_accessibility_ns_map_windows["class"])] = str(type(node)) + + # class_name + for attr_name in ["class_name", "friendly_class_name"]: + try: + attribute_dict[f"{{{_accessibility_ns_map_windows['class']}}}{attr_name}"] = base_properties[ + attr_name].lower() + except: + pass node_role_name: str = node.class_name().lower().replace(" ", "-") - node_role_name = "".join( map( lambda ch: ch if ch.isidentifier()\ - or ch in {"-"}\ - or ch.isalnum() - else "-" - , node_role_name - ) - ) + node_role_name = "".join( + map(lambda _ch: _ch if _ch.isidentifier() or _ch in {"-"} or _ch.isalnum() else "-", node_role_name)) + if node_role_name.strip() == "": node_role_name = "unknown" if not node_role_name[0].isalpha(): @@ -591,26 +566,185 @@ def _create_pywinauto_node(node: BaseWrapper, depth: int = 0, flag: Optional[str xml_node = lxml.etree.Element( node_role_name, attrib=attribute_dict, - nsmap=_accessibility_ns_map + nsmap=_accessibility_ns_map_windows ) - if text is not None and len(text)>0 and text!=attribute_dict["name"]: + + if text is not None and len(text) > 0 and text != attribute_dict["name"]: xml_node.text = text - # HYPERPARAMETER - if depth==50: + if depth == MAX_DEPTH: logger.warning("Max depth reached") - #print("Max depth reached") return xml_node - for i, ch in enumerate(node.children()): - # HYPERPARAMETER - if i>=2048: - logger.warning("Max width reached") - #print("Max width reached") - break - xml_node.append(_create_pywinauto_node(ch, depth+1, flag)) + # use multi thread to accelerate children fetching + children = node.children() + if children: + with concurrent.futures.ThreadPoolExecutor() as executor: + future_to_child = [executor.submit(_create_pywinauto_node, ch, nodes, depth + 1, flag) for ch in + children[:MAX_WIDTH]] + try: + xml_node.extend([future.result() for future in concurrent.futures.as_completed(future_to_child)]) + except Exception as e: + logger.error(f"Exception occurred: {e}") return xml_node - # }}} function _create_pywinauto_node # + + +# A11y tree getter for macOS + +def _create_axui_node(node, nodes: set = None, depth: int = 0, bbox: tuple = None): + nodes = nodes or set() + if node in nodes: + return + nodes.add(node) + + reserved_keys = { + "AXEnabled": "st", + "AXFocused": "st", + "AXFullScreen": "st", + "AXTitle": "attr", + "AXChildrenInNavigationOrder": "attr", + "AXChildren": "attr", + "AXFrame": "attr", + "AXRole": "role", + "AXHelp": "attr", + "AXRoleDescription": "role", + "AXSubrole": "role", + "AXURL": "attr", + "AXValue": "val", + "AXDescription": "attr", + "AXDOMIdentifier": "attr", + "AXSelected": "st", + "AXInvalid": "st", + "AXRows": "attr", + "AXColumns": "attr", + } + attribute_dict = {} + + if depth == 0: + bbox = ( + node["kCGWindowBounds"]["X"], + node["kCGWindowBounds"]["Y"], + node["kCGWindowBounds"]["X"] + node["kCGWindowBounds"]["Width"], + node["kCGWindowBounds"]["Y"] + node["kCGWindowBounds"]["Height"] + ) + app_ref = ApplicationServices.AXUIElementCreateApplication(node["kCGWindowOwnerPID"]) + error_code, app_wins_ref = ApplicationServices.AXUIElementCopyAttributeValue(app_ref, "AXWindows", None) + if error_code: + logger.error("MacOS parsing %s encountered Error code: %d", app_ref, error_code) + + attribute_dict["name"] = node["kCGWindowOwnerName"] + + node = app_wins_ref[0] + + error_code, attr_names = ApplicationServices.AXUIElementCopyAttributeNames(node, None) + + if error_code: + # -25202: AXError.invalidUIElement + # The accessibility object received in this event is invalid. + return + + value = None + + if "AXFrame" in attr_names: + error_code, attr_val = ApplicationServices.AXUIElementCopyAttributeValue(node, "AXFrame", None) + rep = repr(attr_val) + x_value = re.search(r"x:(-?[\d.]+)", rep) + y_value = re.search(r"y:(-?[\d.]+)", rep) + w_value = re.search(r"w:(-?[\d.]+)", rep) + h_value = re.search(r"h:(-?[\d.]+)", rep) + type_value = re.search(r"type\s?=\s?(\w+)", rep) + value = { + "x": float(x_value.group(1)) if x_value else None, + "y": float(y_value.group(1)) if y_value else None, + "w": float(w_value.group(1)) if w_value else None, + "h": float(h_value.group(1)) if h_value else None, + "type": type_value.group(1) if type_value else None, + } + + if not any(v is None for v in value.values()): + x_min = max(bbox[0], value["x"]) + x_max = min(bbox[2], value["x"] + value["w"]) + y_min = max(bbox[1], value["y"]) + y_max = min(bbox[3], value["y"] + value["h"]) + + if x_min > x_max or y_min > y_max: + # No intersection + return + + role = None + text = None + + for attr_name, ns_key in reserved_keys.items(): + if attr_name not in attr_names: + continue + + if value and attr_name == "AXFrame": + bb = value + if not any(v is None for v in bb.values()): + attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map_macos["cp"])] = \ + "({:d}, {:d})".format(int(bb["x"]), int(bb["y"])) + attribute_dict["{{{:}}}size".format(_accessibility_ns_map_macos["cp"])] = \ + "({:d}, {:d})".format(int(bb["w"]), int(bb["h"])) + continue + + error_code, attr_val = ApplicationServices.AXUIElementCopyAttributeValue(node, attr_name, None) + + full_attr_name = f"{{{_accessibility_ns_map_macos[ns_key]}}}{attr_name}" + + if attr_name == "AXValue" and not text: + text = str(attr_val) + continue + + if attr_name == "AXRoleDescription": + role = attr_val + continue + + # Set the attribute_dict + if not (isinstance(attr_val, ApplicationServices.AXUIElementRef) + or isinstance(attr_val, (AppKit.NSArray, list))): + if attr_val is not None: + attribute_dict[full_attr_name] = str(attr_val) + + node_role_name = role.lower().replace(" ", "_") if role else "unknown_role" + + xml_node = lxml.etree.Element( + node_role_name, + attrib=attribute_dict, + nsmap=_accessibility_ns_map_macos + ) + + if text is not None and len(text) > 0: + xml_node.text = text + + if depth == MAX_DEPTH: + logger.warning("Max depth reached") + return xml_node + + future_to_child = [] + + with concurrent.futures.ThreadPoolExecutor() as executor: + for attr_name, ns_key in reserved_keys.items(): + if attr_name not in attr_names: + continue + + error_code, attr_val = ApplicationServices.AXUIElementCopyAttributeValue(node, attr_name, None) + if isinstance(attr_val, ApplicationServices.AXUIElementRef): + future_to_child.append(executor.submit(_create_axui_node, attr_val, nodes, depth + 1, bbox)) + + elif isinstance(attr_val, (AppKit.NSArray, list)): + for child in attr_val: + future_to_child.append(executor.submit(_create_axui_node, child, nodes, depth + 1, bbox)) + + try: + for future in concurrent.futures.as_completed(future_to_child): + result = future.result() + if result is not None: + xml_node.append(result) + except Exception as e: + logger.error(f"Exception occurred: {e}") + + return xml_node + @app.route("/accessibility", methods=["GET"]) def get_accessibility_tree(): @@ -618,30 +752,61 @@ def get_accessibility_tree(): # AT-SPI works for KDE as well if os_name == "Linux": + global libreoffice_version_tuple + libreoffice_version_tuple = _get_libreoffice_version() + desktop: Accessible = pyatspi.Registry.getDesktop(0) - desktop_xml: _Element = _create_atspi_node(desktop, 0) - return jsonify({"AT": lxml.etree.tostring(desktop_xml, encoding="unicode")}) + xml_node = lxml.etree.Element("desktop-frame", nsmap=_accessibility_ns_map_ubuntu) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(_create_atspi_node, app_node, 1) for app_node in desktop] + for future in concurrent.futures.as_completed(futures): + xml_tree = future.result() + xml_node.append(xml_tree) + return jsonify({"AT": lxml.etree.tostring(xml_node, encoding="unicode")}) elif os_name == "Windows": - # Windows AT may be read through `pywinauto` module, however, two different backends `win32` and `uia` are supported and different results may be returned + # Attention: Windows a11y tree is implemented to be read through `pywinauto` module, however, + # two different backends `win32` and `uia` are supported and different results may be returned desktop: Desktop = Desktop(backend="uia") - xml_node = lxml.etree.Element("desktop", nsmap=_accessibility_ns_map) - for wnd in desktop.windows(): - logger.debug("Win UIA AT parsing: %s(%d)", wnd.element_info.name, len(wnd.children())) - node: _Element = _create_pywinauto_node(wnd, 1) - xml_node.append(node) + xml_node = lxml.etree.Element("desktop", nsmap=_accessibility_ns_map_windows) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(_create_pywinauto_node, wnd, {}, 1) for wnd in desktop.windows()] + for future in concurrent.futures.as_completed(futures): + xml_tree = future.result() + xml_node.append(xml_tree) return jsonify({"AT": lxml.etree.tostring(xml_node, encoding="unicode")}) + + elif os_name == "Darwin": + xml_node = lxml.etree.Element("desktop", nsmap=_accessibility_ns_map_macos) + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(_create_axui_node, wnd, None, 0) for wnd in + [win for win in + Quartz.CGWindowListCopyWindowInfo( + (Quartz.kCGWindowListExcludeDesktopElements | Quartz.kCGWindowListOptionOnScreenOnly), + Quartz.kCGNullWindowID, ) if + win["kCGWindowLayer"] == 0 and win["kCGWindowOwnerName"] != "Window Server" + ]] + + for future in concurrent.futures.as_completed(futures): + xml_tree = future.result() + if xml_tree is not None: + xml_node.append(xml_tree) + + return jsonify({"AT": lxml.etree.tostring(xml_node, encoding="unicode")}) + else: return "Currently not implemented for platform {:}.".format(platform.platform()), 500 @app.route('/screen_size', methods=['POST']) def get_screen_size(): - if platform_name=="Linux": + if platform_name == "Linux": d = display.Display() screen_width = d.screen().width_in_pixels screen_height = d.screen().height_in_pixels - elif platform_name=="Windows": + elif platform_name == "Windows": user32 = ctypes.windll.user32 screen_width: int = user32.GetSystemMetrics(0) screen_height: int = user32.GetSystemMetrics(1) @@ -870,8 +1035,6 @@ def download_file(): data = request.json url = data.get('url', None) path = data.get('path', None) - print(url, path) - print("*" * 100) if not url or not path: return "Path or URL not supplied!", 400 @@ -1050,7 +1213,8 @@ def start_recording(): start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}" - recording_process = subprocess.Popen(shlex.split(start_command), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + recording_process = subprocess.Popen(shlex.split(start_command), stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) return jsonify({'status': 'success', 'message': 'Started recording.'}) diff --git a/desktop_env/server/requirements.txt b/desktop_env/server/requirements.txt index 1b5ecda..cfee21b 100644 --- a/desktop_env/server/requirements.txt +++ b/desktop_env/server/requirements.txt @@ -7,3 +7,4 @@ flask numpy lxml pygame +pywinauto