Improve efficiency of getting accessibility tree (#57)

* Initial commit

* Accelerate a11y tree acquisition

* Clean code

* Leave todos for Windows and macOS

* Prepare for support of win and macos

* Prepare for support of win and macos

* Minor fix

* add preliminary support for macos

* fix subtle bugs

* Clean the windows a11y tree getter code

* Clean the windows a11y tree getter code

* Intermediate version

* Intermediate version

* Update

* adding support for macos

* Delete dummy

* Delete dummy

* add bounding box for pruning

* Delete dummy

* FIX: enable a11y tree fetching on Windows

* Move the placement of the requirement dependency item

* FIX: remove "jsonify" from return value of get a11y tree

* FIX: change print into logger, decompose functions in _create_pywinauto_node

* Update

* remove redundant nodes on macos

* remove reliance on pywinauto

* Clean

* Fix bugs for pywinauto a11y_tree acquisition

* FEAT: only fetch active windows, skip repeated nodes

* CHORE: clean code, add comments on time-consuming part, add connection logger.info

* FEAT: 1. add attrs including class_name, id, count... 2. use multithread to accelerate

* FIX: add code of getting writable properties

* Clean, update the max_depth for macOS

* FIX: get all active windows

* Accelerate child nodes in macOS

* FEAT: get all active windows, add timing when testing

* CHORE: remove print

* Clean and finalize

* Clean and finalize

---------

Co-authored-by: Timothyxxx <384084775@qq.com>
Co-authored-by: Junli Wang <ltnsxdxd@gmail.com>
Co-authored-by: YangJL2003 <yangjl22@mails.tsinghua.edu.cn>
This commit is contained in:
HappySix
2024-07-30 14:07:29 +08:00
committed by GitHub
parent a156f8a3d6
commit a961d2276d
2 changed files with 467 additions and 302 deletions

View File

@@ -2,15 +2,18 @@ import ctypes
import os import os
import platform import platform
import shlex import shlex
import json
import subprocess, signal import subprocess, signal
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Sequence from typing import Any, Optional, Sequence
from typing import List, Dict, Tuple from typing import List, Dict, Tuple, Literal
import concurrent.futures
import Xlib import Xlib
import lxml.etree import lxml.etree
import pyautogui import pyautogui
import requests import requests
import re
from PIL import Image from PIL import Image
from Xlib import display, X from Xlib import display, X
from flask import Flask, request, jsonify, send_file, abort # , send_from_directory from flask import Flask, request, jsonify, send_file, abort # , send_from_directory
@@ -18,23 +21,44 @@ from lxml.etree import _Element
platform_name: str = platform.system() platform_name: str = platform.system()
if platform_name=="Linux": if platform_name == "Linux":
import pyatspi import pyatspi
from pyatspi import Accessible, StateType, STATE_SHOWING from pyatspi import Accessible, StateType, STATE_SHOWING
from pyatspi import Action as ATAction from pyatspi import Action as ATAction
from pyatspi import Component #, Document from pyatspi import Component # , Document
from pyatspi import Text as ATText from pyatspi import Text as ATText
from pyatspi import Value as ATValue from pyatspi import Value as ATValue
BaseWrapper = Any BaseWrapper = Any
elif platform_name=="Windows":
elif platform_name == "Windows":
from pywinauto import Desktop from pywinauto import Desktop
from pywinauto.base_wrapper import BaseWrapper from pywinauto.base_wrapper import BaseWrapper
import pywinauto.application
Accessible = Any Accessible = Any
elif platform_name == "Darwin":
import plistlib
import AppKit
import ApplicationServices
import Foundation
import Quartz
import oa_atomacos
Accessible = Any
BaseWrapper = Any
else:
# Platform not supported
Accessible = None
BaseWrapper = Any
from pyxcursor import Xcursor from pyxcursor import Xcursor
# todo: need to reformat and organize this whole file
app = Flask(__name__) app = Flask(__name__)
pyautogui.PAUSE = 0 pyautogui.PAUSE = 0
@@ -63,7 +87,8 @@ def execute_command():
# Execute the command without any safety checks. # Execute the command without any safety checks.
try: try:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True, timeout=120) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, text=True,
timeout=120)
return jsonify({ return jsonify({
'status': 'success', 'status': 'success',
'output': result.stdout, 'output': result.stdout,
@@ -106,7 +131,7 @@ def launch_app():
try: try:
if 'google-chrome' in command and _get_machine_architecture() == 'arm': if 'google-chrome' in command and _get_machine_architecture() == 'arm':
index = command.index('google-chrome') index = command.index('google-chrome')
command[index] = 'chromium-browser' # arm64 chrome is not available yet, can only use chromium command[index] = 'chromium-browser' # arm64 chrome is not available yet, can only use chromium
subprocess.Popen(command, shell=shell) subprocess.Popen(command, shell=shell)
return "{:} launched successfully".format(command if shell else " ".join(command)) return "{:} launched successfully".format(command if shell else " ".join(command))
except Exception as e: except Exception as e:
@@ -192,397 +217,347 @@ def get_terminal_output():
return jsonify({"status": "error", "message": str(e)}), 500 return jsonify({"status": "error", "message": str(e)}), 500
_accessibility_ns_map = { "st": "uri:deskat:state.at-spi.gnome.org" _accessibility_ns_map = {
, "attr": "uri:deskat:attributes.at-spi.gnome.org" "ubuntu": {
, "cp": "uri:deskat:component.at-spi.gnome.org" "st": "https://accessibility.ubuntu.example.org/ns/state",
, "doc": "uri:deskat:document.at-spi.gnome.org" "attr": "https://accessibility.ubuntu.example.org/ns/attributes",
, "docattr": "uri:deskat:attributes.document.at-spi.gnome.org" "cp": "https://accessibility.ubuntu.example.org/ns/component",
, "txt": "uri:deskat:text.at-spi.gnome.org" "doc": "https://accessibility.ubuntu.example.org/ns/document",
, "val": "uri:deskat:value.at-spi.gnome.org" "docattr": "https://accessibility.ubuntu.example.org/ns/document/attributes",
, "act": "uri:deskat:action.at-spi.gnome.org" "txt": "https://accessibility.ubuntu.example.org/ns/text",
, "win": "uri:deskat:uia.windows.microsoft.org" "val": "https://accessibility.ubuntu.example.org/ns/value",
} "act": "https://accessibility.ubuntu.example.org/ns/action",
},
"windows": {
"st": "https://accessibility.windows.example.org/ns/state",
"attr": "https://accessibility.windows.example.org/ns/attributes",
"cp": "https://accessibility.windows.example.org/ns/component",
"doc": "https://accessibility.windows.example.org/ns/document",
"docattr": "https://accessibility.windows.example.org/ns/document/attributes",
"txt": "https://accessibility.windows.example.org/ns/text",
"val": "https://accessibility.windows.example.org/ns/value",
"act": "https://accessibility.windows.example.org/ns/action",
"class": "https://accessibility.windows.example.org/ns/class"
},
"macos": {
"st": "https://accessibility.macos.example.org/ns/state",
"attr": "https://accessibility.macos.example.org/ns/attributes",
"cp": "https://accessibility.macos.example.org/ns/component",
"doc": "https://accessibility.macos.example.org/ns/document",
"txt": "https://accessibility.macos.example.org/ns/text",
"val": "https://accessibility.macos.example.org/ns/value",
"act": "https://accessibility.macos.example.org/ns/action",
"role": "https://accessibility.macos.example.org/ns/role",
}
}
_accessibility_ns_map_ubuntu = _accessibility_ns_map['ubuntu']
_accessibility_ns_map_windows = _accessibility_ns_map['windows']
_accessibility_ns_map_macos = _accessibility_ns_map['macos']
# A11y tree getter for Ubuntu
libreoffice_version_tuple: Optional[Tuple[int, ...]] = None
MAX_DEPTH = 50
MAX_WIDTH = 1024
MAX_CALLS = 5000
def _get_libreoffice_version() -> Tuple[int, ...]:
"""Function to get the LibreOffice version as a tuple of integers."""
result = subprocess.run("libreoffice --version", shell=True, text=True, stdout=subprocess.PIPE)
version_str = result.stdout.split()[1] # Assuming version is the second word in the command output
return tuple(map(int, version_str.split(".")))
def _create_atspi_node(node: Accessible, depth: int = 0, flag: Optional[str] = None) -> _Element: def _create_atspi_node(node: Accessible, depth: int = 0, flag: Optional[str] = None) -> _Element:
# function _create_atspi_node {{{ # node_name = node.name
if node.getRoleName() == "document spreadsheet": attribute_dict: Dict[str, Any] = {"name": node_name}
flag = "calc"
if node.getRoleName() == "application" and node.name=="Thunderbird":
flag = "thunderbird"
attribute_dict: Dict[str, Any] = {"name": node.name} # States
# States {{{ #
states: List[StateType] = node.getState().get_states() states: List[StateType] = node.getState().get_states()
for st in states: for st in states:
state_name: str = StateType._enum_lookup[st] state_name: str = StateType._enum_lookup[st]
state_name: str = state_name.split("_", maxsplit=1)[1].lower() state_name: str = state_name.split("_", maxsplit=1)[1].lower()
if len(state_name) == 0: if len(state_name) == 0:
continue continue
attribute_dict[ attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map_ubuntu["st"], state_name)] = "true"
"{{{:}}}{:}".format(_accessibility_ns_map["st"], state_name)] = "true"
# }}} States #
# Attributes {{{ # # Attributes
attributes: Dict[str, str] = node.get_attributes() attributes: Dict[str, str] = node.get_attributes()
for attribute_name, attribute_value in attributes.items(): for attribute_name, attribute_value in attributes.items():
if len(attribute_name) == 0: if len(attribute_name) == 0:
continue continue
attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map["attr"], attribute_name)] = attribute_value attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map_ubuntu["attr"], attribute_name)] = attribute_value
# }}} Attributes #
# Component {{{ # # Component
if attribute_dict.get("{{{:}}}visible".format(_accessibility_ns_map["st"]), "false") == "true"\ if attribute_dict.get("{{{:}}}visible".format(_accessibility_ns_map_ubuntu["st"]), "false") == "true" \
and attribute_dict.get("{{{:}}}showing".format(_accessibility_ns_map["st"]), "false") == "true": and attribute_dict.get("{{{:}}}showing".format(_accessibility_ns_map_ubuntu["st"]), "false") == "true":
try: try:
component: Component = node.queryComponent() component: Component = node.queryComponent()
except NotImplementedError: except NotImplementedError:
pass pass
else: else:
bbox: Sequence[int] = component.getExtents(pyatspi.XY_SCREEN) bbox: Sequence[int] = component.getExtents(pyatspi.XY_SCREEN)
attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] =\ attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map_ubuntu["cp"])] = \
str(tuple(bbox[0:2])) str(tuple(bbox[0:2]))
#attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = str( attribute_dict["{{{:}}}size".format(_accessibility_ns_map_ubuntu["cp"])] = str(tuple(bbox[2:]))
#component.getPosition(pyatspi.XY_SCREEN))
#attribute_dict["{{{:}}}windowcoord".format(_accessibility_ns_map["cp"])] = str(
#component.getPosition(pyatspi.XY_WINDOW))
#attribute_dict["{{{:}}}parentcoord".format(_accessibility_ns_map["cp"])] = str(
#component.getPosition(pyatspi.XY_PARENT))
attribute_dict["{{{:}}}size".format(_accessibility_ns_map["cp"])] = str(tuple(bbox[2:]))
# }}} Component #
# Document {{{ # text = ""
#try: # Text
#document: Document = node.queryDocument()
#except NotImplementedError:
#pass
#else:
#attribute_dict["{{{:}}}locale".format(_accessibility_ns_map["doc"])] = document.getLocale()
#attribute_dict["{{{:}}}pagecount".format(_accessibility_ns_map["doc"])] = str(document.getPageCount())
#attribute_dict["{{{:}}}currentpage".format(_accessibility_ns_map["doc"])] = str(document.getCurrentPageNumber())
#for attrbt in document.getAttributes():
#attribute_name: str
#attribute_value: str
#attribute_name, attribute_value = attrbt.split(":", maxsplit=1)
#if len(attribute_name) == 0:
#continue
#attribute_dict["{{{:}}}{:}".format(_accessibility_ns_map["docattr"], attribute_name)] = attribute_value
# }}} Document #
# Text {{{ #
try: try:
text_obj: ATText = node.queryText() text_obj: ATText = node.queryText()
except NotImplementedError:
pass
else:
# only text shown on current screen is available # only text shown on current screen is available
# attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount) # attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount)
text: str = text_obj.getText(0, text_obj.characterCount) text: str = text_obj.getText(0, text_obj.characterCount)
#if flag=="thunderbird": # if flag=="thunderbird":
# appeard in thunderbird (uFFFC) (not only in thunderbird), "Object # appeared in thunderbird (uFFFC) (not only in thunderbird), "Object
# Replacement Character" in Unicode, "used as placeholder in text for # Replacement Character" in Unicode, "used as placeholder in text for
# an otherwise unspecified object; uFFFD is another "Replacement # an otherwise unspecified object; uFFFD is another "Replacement
# Character", just in case # Character", just in case
text = text.replace("\ufffc", "").replace("\ufffd", "") text = text.replace("\ufffc", "").replace("\ufffd", "")
# }}} Text # except NotImplementedError:
pass
# Image {{{ # # Image, Selection, Value, Action
try: try:
node.queryImage() node.queryImage()
attribute_dict["image"] = "true"
except NotImplementedError: except NotImplementedError:
pass pass
else:
attribute_dict["image"] = "true"
# }}} Image #
# Selection {{{ #
try: try:
node.querySelection() node.querySelection()
attribute_dict["selection"] = "true"
except NotImplementedError: except NotImplementedError:
pass pass
else:
attribute_dict["selection"] = "true"
# }}} Selection #
# Value {{{ #
try: try:
value: ATValue = node.queryValue() value: ATValue = node.queryValue()
value_key = f"{{{_accessibility_ns_map_ubuntu['val']}}}"
for attr_name, attr_func in [
("value", lambda: value.currentValue),
("min", lambda: value.minimumValue),
("max", lambda: value.maximumValue),
("step", lambda: value.minimumIncrement)
]:
try:
attribute_dict[f"{value_key}{attr_name}"] = str(attr_func())
except:
pass
except NotImplementedError: except NotImplementedError:
pass pass
else:
try:
attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(value.currentValue)
except:
pass
try:
attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(value.minimumValue)
except:
pass
try:
attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(value.maximumValue)
except:
pass
try:
attribute_dict["{{{:}}}step".format(_accessibility_ns_map["val"])] = str(value.minimumIncrement)
except:
pass
# }}} Value #
# Action {{{ #
try: try:
action: ATAction = node.queryAction() action: ATAction = node.queryAction()
except NotImplementedError:
pass
else:
for i in range(action.nActions): for i in range(action.nActions):
action_name: str = action.getName(i).replace(" ", "-") action_name: str = action.getName(i).replace(" ", "-")
attribute_dict["{{{:}}}{:}_desc" \ attribute_dict[
.format(_accessibility_ns_map["act"] "{{{:}}}{:}_desc".format(_accessibility_ns_map_ubuntu["act"], action_name)] = action.getDescription(
, action_name i)
) attribute_dict[
] = action.getDescription(i) "{{{:}}}{:}_kb".format(_accessibility_ns_map_ubuntu["act"], action_name)] = action.getKeyBinding(i)
attribute_dict["{{{:}}}{:}_kb" \ except NotImplementedError:
.format(_accessibility_ns_map["act"] pass
, action_name
) # Add from here if we need more attributes in the future...
] = action.getKeyBinding(i)
# }}} Action #
raw_role_name: str = node.getRoleName().strip() raw_role_name: str = node.getRoleName().strip()
node_role_name = (raw_role_name or "unknown").replace(" ", "-") node_role_name = (raw_role_name or "unknown").replace(" ", "-")
if not flag:
if raw_role_name == "document spreadsheet":
flag = "calc"
if raw_role_name == "application" and node.name == "Thunderbird":
flag = "thunderbird"
xml_node = lxml.etree.Element( xml_node = lxml.etree.Element(
node_role_name, node_role_name,
attrib=attribute_dict, attrib=attribute_dict,
nsmap=_accessibility_ns_map nsmap=_accessibility_ns_map_ubuntu
) )
if "text" in locals() and len(text) > 0:
if len(text) > 0:
xml_node.text = text xml_node.text = text
# HYPERPARAMETER if depth == MAX_DEPTH:
if depth==50:
logger.warning("Max depth reached") logger.warning("Max depth reached")
return xml_node return xml_node
if flag=="calc" and node_role_name=="table": if flag == "calc" and node_role_name == "table":
# Maximum column: 1024 if ver<=7.3 else 16384 # Maximum column: 1024 if ver<=7.3 else 16384
# Maximum row: 104 8576 # Maximum row: 104 8576
# Maximun sheet: 1 0000 # Maximun sheet: 1 0000
version_str: str = subprocess.run("libreoffice --version", shell=True, text=True, stdout=subprocess.PIPE).stdout global libreoffice_version_tuple
version_str = version_str.split()[1] MAXIMUN_COLUMN = 1024 if libreoffice_version_tuple < (7, 4) else 16384
version_tuple: Tuple[int] = tuple(map(int, version_str.split(".")))
MAXIMUN_COLUMN = 1024 if version_tuple<(7, 4) else 16384
MAX_ROW = 104_8576 MAX_ROW = 104_8576
index_base = 0 index_base = 0
first_showing = False first_showing = False
column_base = None column_base = None
for r in range(MAX_ROW): for r in range(MAX_ROW):
#logger.warning(r)
for clm in range(column_base or 0, MAXIMUN_COLUMN): for clm in range(column_base or 0, MAXIMUN_COLUMN):
child_node: Accessible = node[index_base+clm] child_node: Accessible = node[index_base + clm]
showing: bool = child_node.getState().contains(STATE_SHOWING) showing: bool = child_node.getState().contains(STATE_SHOWING)
if showing: if showing:
child_node: _Element = _create_atspi_node(child_node, depth+1, flag) child_node: _Element = _create_atspi_node(child_node, depth + 1, flag)
if not first_showing: if not first_showing:
column_base = clm column_base = clm
first_showing = True first_showing = True
xml_node.append(child_node) xml_node.append(child_node)
elif first_showing and column_base is not None or clm>=500: elif first_showing and column_base is not None or clm >= 500:
break break
if first_showing and clm==column_base or not first_showing and r>=500: if first_showing and clm == column_base or not first_showing and r >= 500:
break break
index_base += MAXIMUN_COLUMN index_base += MAXIMUN_COLUMN
return xml_node return xml_node
else: else:
try: try:
for i, ch in enumerate(node): for i, ch in enumerate(node):
# HYPERPARAMETER if i == MAX_WIDTH:
if i>=1025:
logger.warning("Max width reached") logger.warning("Max width reached")
break break
xml_node.append(_create_atspi_node(ch, depth+1, flag)) xml_node.append(_create_atspi_node(ch, depth + 1, flag))
except: except:
logger.warning("Error occurred during children traversing. Has Ignored. Node: %s", lxml.etree.tostring(xml_node, encoding="unicode")) logger.warning("Error occurred during children traversing. Has Ignored. Node: %s",
lxml.etree.tostring(xml_node, encoding="unicode"))
return xml_node return xml_node
# }}} function _create_atspi_node #
def _create_pywinauto_node(node: BaseWrapper, depth: int = 0, flag: Optional[str] = None) -> _Element:
# function _create_pywinauto_node {{{ # # A11y tree getter for Windows
#element_info: ElementInfo = node.element_info def _create_pywinauto_node(node, nodes, depth: int = 0, flag: Optional[str] = None) -> _Element:
nodes = nodes or set()
if node in nodes:
return
nodes.add(node)
attribute_dict: Dict[str, Any] = {"name": node.element_info.name} attribute_dict: Dict[str, Any] = {"name": node.element_info.name}
# States {{{ # base_properties = {}
try: try:
attribute_dict["{{{:}}}enabled".format(_accessibility_ns_map["st"])] = str(node.is_enabled()).lower() base_properties.update(
node.get_properties()) # get all writable/not writable properties, but have bugs when landing on chrome and it's slower!
except: except:
pass logger.debug("Failed to call get_properties(), trying to get writable properites")
try:
_element_class = node.__class__
class TempElement(node.__class__):
writable_props = pywinauto.base_wrapper.BaseWrapper.writable_props
# Instantiate the subclass
node.__class__ = TempElement
# Retrieve properties using get_properties()
properties = node.get_properties()
node.__class__ = _element_class
base_properties.update(properties) # only get all writable properties
logger.debug("get writable properties")
except Exception as e:
logger.error(e)
pass
# Count-cnt
for attr_name in ["control_count", "button_count", "item_count", "column_count"]:
try:
attribute_dict[f"{{{_accessibility_ns_map_windows['cnt']}}}{attr_name}"] = base_properties[
attr_name].lower()
except:
pass
# Columns-cols
try: try:
attribute_dict["{{{:}}}visible".format(_accessibility_ns_map["st"])] = str(node.is_visible()).lower() attribute_dict[f"{{{_accessibility_ns_map_windows['cols']}}}columns"] = base_properties["columns"].lower()
except:
pass
try:
attribute_dict["{{{:}}}active".format(_accessibility_ns_map["st"])] = str(node.is_active()).lower()
except: except:
pass pass
if hasattr(node, "is_minimized"): # Id-id
for attr_name in ["control_id", "automation_id", "window_id"]:
try: try:
attribute_dict["{{{:}}}minimized".format(_accessibility_ns_map["st"])] = str(node.is_minimized()).lower() attribute_dict[f"{{{_accessibility_ns_map_windows['id']}}}{attr_name}"] = base_properties[attr_name].lower()
except:
pass
if hasattr(node, "is_maximized"):
try:
attribute_dict["{{{:}}}maximized".format(_accessibility_ns_map["st"])] = str(node.is_maximized()).lower()
except:
pass
if hasattr(node, "is_normal"):
try:
attribute_dict["{{{:}}}normal".format(_accessibility_ns_map["st"])] = str(node.is_normal()).lower()
except: except:
pass pass
if hasattr(node, "is_unicode"): # States
# 19 sec out of 20
for attr_name, attr_func in [
("enabled", lambda: node.is_enabled()),
("visible", lambda: node.is_visible()),
# ("active", lambda: node.is_active()), # occupied most of the time: 20s out of 21s for slack, 51.5s out of 54s for WeChat # maybe use for cutting branches
("minimized", lambda: node.is_minimized()),
("maximized", lambda: node.is_maximized()),
("normal", lambda: node.is_normal()),
("unicode", lambda: node.is_unicode()),
("collapsed", lambda: node.is_collapsed()),
("checkable", lambda: node.is_checkable()),
("checked", lambda: node.is_checked()),
("focused", lambda: node.is_focused()),
("keyboard_focused", lambda: node.is_keyboard_focused()),
("selected", lambda: node.is_selected()),
("selection_required", lambda: node.is_selection_required()),
("pressable", lambda: node.is_pressable()),
("pressed", lambda: node.is_pressed()),
("expanded", lambda: node.is_expanded()),
("editable", lambda: node.is_editable()),
("has_keyboard_focus", lambda: node.has_keyboard_focus()),
("is_keyboard_focusable", lambda: node.is_keyboard_focusable()),
]:
try: try:
attribute_dict["{{{:}}}unicode".format(_accessibility_ns_map["st"])] = str(node.is_unicode()).lower() attribute_dict[f"{{{_accessibility_ns_map_windows['st']}}}{attr_name}"] = str(attr_func()).lower()
except: except:
pass pass
if hasattr(node, "is_collapsed"): # Component
try: try:
attribute_dict["{{{:}}}collapsed".format(_accessibility_ns_map["st"])] = str(node.is_collapsed()).lower() rectangle = node.rectangle()
except: attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map_windows["cp"])] = \
pass "({:d}, {:d})".format(rectangle.left, rectangle.top)
if hasattr(node, "is_checkable"): attribute_dict["{{{:}}}size".format(_accessibility_ns_map_windows["cp"])] = \
try: "({:d}, {:d})".format(rectangle.width(), rectangle.height())
attribute_dict["{{{:}}}checkable".format(_accessibility_ns_map["st"])] = str(node.is_checkable()).lower()
except:
pass
if hasattr(node, "is_checked"):
try:
attribute_dict["{{{:}}}checked".format(_accessibility_ns_map["st"])] = str(node.is_checked()).lower()
except:
pass
if hasattr(node, "is_focused"):
try:
attribute_dict["{{{:}}}focused".format(_accessibility_ns_map["st"])] = str(node.is_focused()).lower()
except:
pass
if hasattr(node, "is_keyboard_focused"):
try:
attribute_dict["{{{:}}}keyboard_focused".format(_accessibility_ns_map["st"])] = str(node.is_keyboard_focused()).lower()
except:
pass
if hasattr(node, "is_selected"):
try:
attribute_dict["{{{:}}}selected".format(_accessibility_ns_map["st"])] = str(node.is_selected()).lower()
except:
pass
if hasattr(node, "is_selection_required"):
try:
attribute_dict["{{{:}}}selection_required".format(_accessibility_ns_map["st"])] = str(node.is_selection_required()).lower()
except:
pass
if hasattr(node, "is_pressable"):
try:
attribute_dict["{{{:}}}pressable".format(_accessibility_ns_map["st"])] = str(node.is_pressable()).lower()
except:
pass
if hasattr(node, "is_pressed"):
try:
attribute_dict["{{{:}}}pressed".format(_accessibility_ns_map["st"])] = str(node.is_pressed()).lower()
except:
pass
if hasattr(node, "is_expanded"): except Exception as e:
try: logger.error("Error accessing rectangle: ", e)
attribute_dict["{{{:}}}expanded".format(_accessibility_ns_map["st"])] = str(node.is_expanded()).lower()
except:
pass
if hasattr(node, "is_editable"):
try:
attribute_dict["{{{:}}}editable".format(_accessibility_ns_map["st"])] = str(node.is_editable()).lower()
except:
pass
# }}} States #
# Component {{{ # # Text
rectangle = node.rectangle()
attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map["cp"])] = "({:d}, {:d})".format(rectangle.left, rectangle.top)
attribute_dict["{{{:}}}size".format(_accessibility_ns_map["cp"])] = "({:d}, {:d})".format(rectangle.width(), rectangle.height())
# }}} Component #
# Text {{{ #
text: str = node.window_text() text: str = node.window_text()
if text==attribute_dict["name"]: if text == attribute_dict["name"]:
text = "" text = ""
#if hasattr(node, "texts"):
#texts: List[str] = node.texts()[1:]
#texts: Iterable[str] = map(lambda itm: itm if isinstance(itm, str) else "".join(itm), texts)
#text += "\n".join(texts)
#text = text.strip()
# }}} Text #
# Selection {{{ # # Selection
if hasattr(node, "select"): if hasattr(node, "select"):
attribute_dict["selection"] = "true" attribute_dict["selection"] = "true"
# }}} Selection #
# Value {{{ # # Value
if hasattr(node, "get_step"): for attr_name, attr_funcs in [
try: ("step", [lambda: node.get_step()]),
attribute_dict["{{{:}}}step".format(_accessibility_ns_map["val"])] = str(node.get_step()) ("value", [lambda: node.value(), lambda: node.get_value(), lambda: node.get_position()]),
except: ("min", [lambda: node.min_value(), lambda: node.get_range_min()]),
pass ("max", [lambda: node.max_value(), lambda: node.get_range_max()])
if hasattr(node, "value"): ]:
try: for attr_func in attr_funcs:
attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(node.value()) if hasattr(node, attr_func.__name__):
except: try:
pass attribute_dict[f"{{{_accessibility_ns_map_windows['val']}}}{attr_name}"] = str(attr_func())
if hasattr(node, "get_value"): break # exit once the attribute is set successfully
try: except:
attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(node.get_value()) pass
except:
pass
elif hasattr(node, "get_position"):
try:
attribute_dict["{{{:}}}value".format(_accessibility_ns_map["val"])] = str(node.get_position())
except:
pass
if hasattr(node, "min_value"):
try:
attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(node.min_value())
except:
pass
elif hasattr(node, "get_range_min"):
try:
attribute_dict["{{{:}}}min".format(_accessibility_ns_map["val"])] = str(node.get_range_min())
except:
pass
if hasattr(node, "max_value"):
try:
attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(node.max_value())
except:
pass
elif hasattr(node, "get_range_max"):
try:
attribute_dict["{{{:}}}max".format(_accessibility_ns_map["val"])] = str(node.get_range_max())
except:
pass
# }}} Value #
attribute_dict["{{{:}}}class".format(_accessibility_ns_map["win"])] = str(type(node)) attribute_dict["{{{:}}}class".format(_accessibility_ns_map_windows["class"])] = str(type(node))
# class_name
for attr_name in ["class_name", "friendly_class_name"]:
try:
attribute_dict[f"{{{_accessibility_ns_map_windows['class']}}}{attr_name}"] = base_properties[
attr_name].lower()
except:
pass
node_role_name: str = node.class_name().lower().replace(" ", "-") node_role_name: str = node.class_name().lower().replace(" ", "-")
node_role_name = "".join( map( lambda ch: ch if ch.isidentifier()\ node_role_name = "".join(
or ch in {"-"}\ map(lambda _ch: _ch if _ch.isidentifier() or _ch in {"-"} or _ch.isalnum() else "-", node_role_name))
or ch.isalnum()
else "-"
, node_role_name
)
)
if node_role_name.strip() == "": if node_role_name.strip() == "":
node_role_name = "unknown" node_role_name = "unknown"
if not node_role_name[0].isalpha(): if not node_role_name[0].isalpha():
@@ -591,26 +566,185 @@ def _create_pywinauto_node(node: BaseWrapper, depth: int = 0, flag: Optional[str
xml_node = lxml.etree.Element( xml_node = lxml.etree.Element(
node_role_name, node_role_name,
attrib=attribute_dict, attrib=attribute_dict,
nsmap=_accessibility_ns_map nsmap=_accessibility_ns_map_windows
) )
if text is not None and len(text)>0 and text!=attribute_dict["name"]:
if text is not None and len(text) > 0 and text != attribute_dict["name"]:
xml_node.text = text xml_node.text = text
# HYPERPARAMETER if depth == MAX_DEPTH:
if depth==50:
logger.warning("Max depth reached") logger.warning("Max depth reached")
#print("Max depth reached")
return xml_node return xml_node
for i, ch in enumerate(node.children()): # use multi thread to accelerate children fetching
# HYPERPARAMETER children = node.children()
if i>=2048: if children:
logger.warning("Max width reached") with concurrent.futures.ThreadPoolExecutor() as executor:
#print("Max width reached") future_to_child = [executor.submit(_create_pywinauto_node, ch, nodes, depth + 1, flag) for ch in
break children[:MAX_WIDTH]]
xml_node.append(_create_pywinauto_node(ch, depth+1, flag)) try:
xml_node.extend([future.result() for future in concurrent.futures.as_completed(future_to_child)])
except Exception as e:
logger.error(f"Exception occurred: {e}")
return xml_node return xml_node
# }}} function _create_pywinauto_node #
# A11y tree getter for macOS
def _create_axui_node(node, nodes: set = None, depth: int = 0, bbox: tuple = None):
nodes = nodes or set()
if node in nodes:
return
nodes.add(node)
reserved_keys = {
"AXEnabled": "st",
"AXFocused": "st",
"AXFullScreen": "st",
"AXTitle": "attr",
"AXChildrenInNavigationOrder": "attr",
"AXChildren": "attr",
"AXFrame": "attr",
"AXRole": "role",
"AXHelp": "attr",
"AXRoleDescription": "role",
"AXSubrole": "role",
"AXURL": "attr",
"AXValue": "val",
"AXDescription": "attr",
"AXDOMIdentifier": "attr",
"AXSelected": "st",
"AXInvalid": "st",
"AXRows": "attr",
"AXColumns": "attr",
}
attribute_dict = {}
if depth == 0:
bbox = (
node["kCGWindowBounds"]["X"],
node["kCGWindowBounds"]["Y"],
node["kCGWindowBounds"]["X"] + node["kCGWindowBounds"]["Width"],
node["kCGWindowBounds"]["Y"] + node["kCGWindowBounds"]["Height"]
)
app_ref = ApplicationServices.AXUIElementCreateApplication(node["kCGWindowOwnerPID"])
error_code, app_wins_ref = ApplicationServices.AXUIElementCopyAttributeValue(app_ref, "AXWindows", None)
if error_code:
logger.error("MacOS parsing %s encountered Error code: %d", app_ref, error_code)
attribute_dict["name"] = node["kCGWindowOwnerName"]
node = app_wins_ref[0]
error_code, attr_names = ApplicationServices.AXUIElementCopyAttributeNames(node, None)
if error_code:
# -25202: AXError.invalidUIElement
# The accessibility object received in this event is invalid.
return
value = None
if "AXFrame" in attr_names:
error_code, attr_val = ApplicationServices.AXUIElementCopyAttributeValue(node, "AXFrame", None)
rep = repr(attr_val)
x_value = re.search(r"x:(-?[\d.]+)", rep)
y_value = re.search(r"y:(-?[\d.]+)", rep)
w_value = re.search(r"w:(-?[\d.]+)", rep)
h_value = re.search(r"h:(-?[\d.]+)", rep)
type_value = re.search(r"type\s?=\s?(\w+)", rep)
value = {
"x": float(x_value.group(1)) if x_value else None,
"y": float(y_value.group(1)) if y_value else None,
"w": float(w_value.group(1)) if w_value else None,
"h": float(h_value.group(1)) if h_value else None,
"type": type_value.group(1) if type_value else None,
}
if not any(v is None for v in value.values()):
x_min = max(bbox[0], value["x"])
x_max = min(bbox[2], value["x"] + value["w"])
y_min = max(bbox[1], value["y"])
y_max = min(bbox[3], value["y"] + value["h"])
if x_min > x_max or y_min > y_max:
# No intersection
return
role = None
text = None
for attr_name, ns_key in reserved_keys.items():
if attr_name not in attr_names:
continue
if value and attr_name == "AXFrame":
bb = value
if not any(v is None for v in bb.values()):
attribute_dict["{{{:}}}screencoord".format(_accessibility_ns_map_macos["cp"])] = \
"({:d}, {:d})".format(int(bb["x"]), int(bb["y"]))
attribute_dict["{{{:}}}size".format(_accessibility_ns_map_macos["cp"])] = \
"({:d}, {:d})".format(int(bb["w"]), int(bb["h"]))
continue
error_code, attr_val = ApplicationServices.AXUIElementCopyAttributeValue(node, attr_name, None)
full_attr_name = f"{{{_accessibility_ns_map_macos[ns_key]}}}{attr_name}"
if attr_name == "AXValue" and not text:
text = str(attr_val)
continue
if attr_name == "AXRoleDescription":
role = attr_val
continue
# Set the attribute_dict
if not (isinstance(attr_val, ApplicationServices.AXUIElementRef)
or isinstance(attr_val, (AppKit.NSArray, list))):
if attr_val is not None:
attribute_dict[full_attr_name] = str(attr_val)
node_role_name = role.lower().replace(" ", "_") if role else "unknown_role"
xml_node = lxml.etree.Element(
node_role_name,
attrib=attribute_dict,
nsmap=_accessibility_ns_map_macos
)
if text is not None and len(text) > 0:
xml_node.text = text
if depth == MAX_DEPTH:
logger.warning("Max depth reached")
return xml_node
future_to_child = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for attr_name, ns_key in reserved_keys.items():
if attr_name not in attr_names:
continue
error_code, attr_val = ApplicationServices.AXUIElementCopyAttributeValue(node, attr_name, None)
if isinstance(attr_val, ApplicationServices.AXUIElementRef):
future_to_child.append(executor.submit(_create_axui_node, attr_val, nodes, depth + 1, bbox))
elif isinstance(attr_val, (AppKit.NSArray, list)):
for child in attr_val:
future_to_child.append(executor.submit(_create_axui_node, child, nodes, depth + 1, bbox))
try:
for future in concurrent.futures.as_completed(future_to_child):
result = future.result()
if result is not None:
xml_node.append(result)
except Exception as e:
logger.error(f"Exception occurred: {e}")
return xml_node
@app.route("/accessibility", methods=["GET"]) @app.route("/accessibility", methods=["GET"])
def get_accessibility_tree(): def get_accessibility_tree():
@@ -618,30 +752,61 @@ def get_accessibility_tree():
# AT-SPI works for KDE as well # AT-SPI works for KDE as well
if os_name == "Linux": if os_name == "Linux":
global libreoffice_version_tuple
libreoffice_version_tuple = _get_libreoffice_version()
desktop: Accessible = pyatspi.Registry.getDesktop(0) desktop: Accessible = pyatspi.Registry.getDesktop(0)
desktop_xml: _Element = _create_atspi_node(desktop, 0) xml_node = lxml.etree.Element("desktop-frame", nsmap=_accessibility_ns_map_ubuntu)
return jsonify({"AT": lxml.etree.tostring(desktop_xml, encoding="unicode")}) with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(_create_atspi_node, app_node, 1) for app_node in desktop]
for future in concurrent.futures.as_completed(futures):
xml_tree = future.result()
xml_node.append(xml_tree)
return jsonify({"AT": lxml.etree.tostring(xml_node, encoding="unicode")})
elif os_name == "Windows": elif os_name == "Windows":
# Windows AT may be read through `pywinauto` module, however, two different backends `win32` and `uia` are supported and different results may be returned # Attention: Windows a11y tree is implemented to be read through `pywinauto` module, however,
# two different backends `win32` and `uia` are supported and different results may be returned
desktop: Desktop = Desktop(backend="uia") desktop: Desktop = Desktop(backend="uia")
xml_node = lxml.etree.Element("desktop", nsmap=_accessibility_ns_map) xml_node = lxml.etree.Element("desktop", nsmap=_accessibility_ns_map_windows)
for wnd in desktop.windows(): with concurrent.futures.ThreadPoolExecutor() as executor:
logger.debug("Win UIA AT parsing: %s(%d)", wnd.element_info.name, len(wnd.children())) futures = [executor.submit(_create_pywinauto_node, wnd, {}, 1) for wnd in desktop.windows()]
node: _Element = _create_pywinauto_node(wnd, 1) for future in concurrent.futures.as_completed(futures):
xml_node.append(node) xml_tree = future.result()
xml_node.append(xml_tree)
return jsonify({"AT": lxml.etree.tostring(xml_node, encoding="unicode")}) return jsonify({"AT": lxml.etree.tostring(xml_node, encoding="unicode")})
elif os_name == "Darwin":
xml_node = lxml.etree.Element("desktop", nsmap=_accessibility_ns_map_macos)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(_create_axui_node, wnd, None, 0) for wnd in
[win for win in
Quartz.CGWindowListCopyWindowInfo(
(Quartz.kCGWindowListExcludeDesktopElements | Quartz.kCGWindowListOptionOnScreenOnly),
Quartz.kCGNullWindowID, ) if
win["kCGWindowLayer"] == 0 and win["kCGWindowOwnerName"] != "Window Server"
]]
for future in concurrent.futures.as_completed(futures):
xml_tree = future.result()
if xml_tree is not None:
xml_node.append(xml_tree)
return jsonify({"AT": lxml.etree.tostring(xml_node, encoding="unicode")})
else: else:
return "Currently not implemented for platform {:}.".format(platform.platform()), 500 return "Currently not implemented for platform {:}.".format(platform.platform()), 500
@app.route('/screen_size', methods=['POST']) @app.route('/screen_size', methods=['POST'])
def get_screen_size(): def get_screen_size():
if platform_name=="Linux": if platform_name == "Linux":
d = display.Display() d = display.Display()
screen_width = d.screen().width_in_pixels screen_width = d.screen().width_in_pixels
screen_height = d.screen().height_in_pixels screen_height = d.screen().height_in_pixels
elif platform_name=="Windows": elif platform_name == "Windows":
user32 = ctypes.windll.user32 user32 = ctypes.windll.user32
screen_width: int = user32.GetSystemMetrics(0) screen_width: int = user32.GetSystemMetrics(0)
screen_height: int = user32.GetSystemMetrics(1) screen_height: int = user32.GetSystemMetrics(1)
@@ -870,8 +1035,6 @@ def download_file():
data = request.json data = request.json
url = data.get('url', None) url = data.get('url', None)
path = data.get('path', None) path = data.get('path', None)
print(url, path)
print("*" * 100)
if not url or not path: if not url or not path:
return "Path or URL not supplied!", 400 return "Path or URL not supplied!", 400
@@ -1050,7 +1213,8 @@ def start_recording():
start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}" start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}"
recording_process = subprocess.Popen(shlex.split(start_command), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) recording_process = subprocess.Popen(shlex.split(start_command), stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return jsonify({'status': 'success', 'message': 'Started recording.'}) return jsonify({'status': 'success', 'message': 'Started recording.'})

View File

@@ -7,3 +7,4 @@ flask
numpy numpy
lxml lxml
pygame pygame
pywinauto