Add AutoGLM-OS agent (#309)
* autoglm-os initialize * clean code * chore: use proxy for download setup * feat(autoglm-os): add parameter to toggle images * fix: use temporary directory for files pulled from the vm to prevent potential collision when running multiple instances of the same task in parallel * update * add client_password * update multienv * fix * fix prompt * fix prompt * fix prompt * fix sys prompt * feat: use proxy in file evaluator * fix client_password * fix note_prompt * fix autoglm agent cmd type * fix * revert: fix: use temporary directory for files pulled from the vm to prevent potential collision when running multiple instances of the same task in parallel reverts commit bab5473eea1de0e61b0e1d68b23ce324a5b0ee57 * feat(autoglm): setup tools * fix(autoglm): remove second time of get a11y tree * add osworld server restart * Revert "add osworld server restart" This reverts commit 7bd9d84122e246ce2a26de0e49c25494244c2b3d. * fix _launch_setup * fix autoglm agent tools & xml tree * fix desktop_env * fix bug for tool name capitalization * fix: always use proxy for setup download * add fail after exceeding max turns * fix(autoglm): avoid adding image to message when screenshot is empty * fix maximize_window * fix maximize_window * fix maximize_window * fix import browsertools module bug * fix task proxy config bug * restore setup * refactor desktop env * restore image in provider * restore file.py * refactor desktop_env * quick fix * refactor desktop_env.step * fix our env reset * add max truns constraint * clean run script * clean lib_run_single.py --------- Co-authored-by: hanyullai <hanyullai@outlook.com> Co-authored-by: JingBh <jingbohao@yeah.net>
This commit is contained in:
committed by
GitHub
parent
c833d03a4b
commit
aa05f6cc26
329
mm_agents/autoglm/prompt/accessibility_tree_handle.py
Normal file
329
mm_agents/autoglm/prompt/accessibility_tree_handle.py
Normal file
@@ -0,0 +1,329 @@
|
||||
import io
|
||||
import re
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import List, Tuple
|
||||
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
from .deduplicate_node import filter_similar_nodes
|
||||
|
||||
attributes_ns_ubuntu = "https://accessibility.windows.example.org/ns/attributes"
|
||||
attributes_ns_windows = "https://accessibility.windows.example.org/ns/attributes"
|
||||
state_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/state"
|
||||
state_ns_windows = "https://accessibility.windows.example.org/ns/state"
|
||||
component_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/component"
|
||||
component_ns_windows = "https://accessibility.windows.example.org/ns/component"
|
||||
value_ns_ubuntu = "https://accessibility.ubuntu.example.org/ns/value"
|
||||
value_ns_windows = "https://accessibility.windows.example.org/ns/value"
|
||||
class_ns_windows = "https://accessibility.windows.example.org/ns/class"
|
||||
|
||||
|
||||
def find_leaf_nodes(xlm_file_str):
|
||||
if not xlm_file_str:
|
||||
return []
|
||||
|
||||
root = ET.fromstring(xlm_file_str)
|
||||
|
||||
# Recursive function to traverse the XML tree and collect leaf nodes
|
||||
def collect_leaf_nodes(node, leaf_nodes):
|
||||
# If the node has no children, it is a leaf node, add it to the list
|
||||
if not list(node):
|
||||
leaf_nodes.append(node)
|
||||
# If the node has children, recurse on each child
|
||||
for child in node:
|
||||
collect_leaf_nodes(child, leaf_nodes)
|
||||
|
||||
# List to hold all leaf nodes
|
||||
leaf_nodes = []
|
||||
collect_leaf_nodes(root, leaf_nodes)
|
||||
return leaf_nodes
|
||||
|
||||
|
||||
def judge_node(node: ET, platform="Ubuntu", check_image=False) -> bool:
|
||||
if platform == "Ubuntu":
|
||||
_state_ns = state_ns_ubuntu
|
||||
_component_ns = component_ns_ubuntu
|
||||
elif platform == "Windows":
|
||||
_state_ns = state_ns_windows
|
||||
_component_ns = component_ns_windows
|
||||
else:
|
||||
raise ValueError("Invalid platform, must be 'Ubuntu' or 'Windows'")
|
||||
|
||||
keeps: bool = (
|
||||
node.tag.startswith("document")
|
||||
or node.tag.endswith("item")
|
||||
or node.tag.endswith("button")
|
||||
or node.tag.endswith("heading")
|
||||
or node.tag.endswith("label")
|
||||
or node.tag.endswith("scrollbar")
|
||||
or node.tag.endswith("searchbox")
|
||||
or node.tag.endswith("textbox")
|
||||
or node.tag.endswith("link")
|
||||
or node.tag.endswith("tabelement")
|
||||
or node.tag.endswith("textfield")
|
||||
or node.tag.endswith("textarea")
|
||||
or node.tag.endswith("menu")
|
||||
or node.tag
|
||||
in {
|
||||
"alert",
|
||||
"canvas",
|
||||
"check-box",
|
||||
"combo-box",
|
||||
"entry",
|
||||
"icon",
|
||||
"image",
|
||||
"paragraph",
|
||||
"scroll-bar",
|
||||
"section",
|
||||
"slider",
|
||||
"static",
|
||||
"table-cell",
|
||||
"terminal",
|
||||
"text",
|
||||
"netuiribbontab",
|
||||
"start",
|
||||
"trayclockwclass",
|
||||
"traydummysearchcontrol",
|
||||
"uiimage",
|
||||
"uiproperty",
|
||||
"uiribboncommandbar",
|
||||
}
|
||||
)
|
||||
keeps = (
|
||||
keeps
|
||||
and (
|
||||
platform == "Ubuntu"
|
||||
and node.get("{{{:}}}showing".format(_state_ns), "false") == "true"
|
||||
and node.get("{{{:}}}visible".format(_state_ns), "false") == "true"
|
||||
or platform == "Windows"
|
||||
and node.get("{{{:}}}visible".format(_state_ns), "false") == "true"
|
||||
)
|
||||
and (
|
||||
node.get("name", "") != ""
|
||||
or node.text is not None
|
||||
and len(node.text) > 0
|
||||
or check_image
|
||||
and node.get("image", "false") == "true"
|
||||
)
|
||||
)
|
||||
# and (
|
||||
# node.get("{{{:}}}enabled".format(_state_ns), "false") == "true"
|
||||
# or node.get("{{{:}}}editable".format(_state_ns), "false") == "true"
|
||||
# or node.get("{{{:}}}expandable".format(_state_ns), "false") == "true"
|
||||
# or node.get("{{{:}}}checkable".format(_state_ns), "false") == "true"
|
||||
# ) \
|
||||
|
||||
coordinates: Tuple[int, int] = eval(node.get("{{{:}}}screencoord".format(_component_ns), "(-1, -1)"))
|
||||
sizes: Tuple[int, int] = eval(node.get("{{{:}}}size".format(_component_ns), "(-1, -1)"))
|
||||
keeps = keeps and coordinates[0] >= 0 and coordinates[1] >= 0 and sizes[0] > 0 and sizes[1] > 0
|
||||
return keeps
|
||||
|
||||
|
||||
def filter_nodes(root: ET, platform="Ubuntu", check_image=False):
|
||||
filtered_nodes = []
|
||||
|
||||
for node in root.iter():
|
||||
if judge_node(node, platform, check_image):
|
||||
filtered_nodes.append(node)
|
||||
|
||||
return filtered_nodes
|
||||
|
||||
|
||||
def draw_bounding_boxes(nodes, image_file_content, down_sampling_ratio=1.0, platform="Ubuntu"):
|
||||
|
||||
if platform == "Ubuntu":
|
||||
_state_ns = state_ns_ubuntu
|
||||
_component_ns = component_ns_ubuntu
|
||||
_value_ns = value_ns_ubuntu
|
||||
elif platform == "Windows":
|
||||
_state_ns = state_ns_windows
|
||||
_component_ns = component_ns_windows
|
||||
_value_ns = value_ns_windows
|
||||
else:
|
||||
raise ValueError("Invalid platform, must be 'Ubuntu' or 'Windows'")
|
||||
|
||||
# Load the screenshot image
|
||||
image_stream = io.BytesIO(image_file_content)
|
||||
image = Image.open(image_stream)
|
||||
if float(down_sampling_ratio) != 1.0:
|
||||
image = image.resize((int(image.size[0] * down_sampling_ratio), int(image.size[1] * down_sampling_ratio)))
|
||||
draw = ImageDraw.Draw(image)
|
||||
marks = []
|
||||
drew_nodes = []
|
||||
text_informations: List[str] = ["index\ttag\tname\ttext"]
|
||||
|
||||
try:
|
||||
# Adjust the path to the font file you have or use a default one
|
||||
font = ImageFont.truetype("arial.ttf", 15)
|
||||
except IOError:
|
||||
# Fallback to a basic font if the specified font can't be loaded
|
||||
font = ImageFont.load_default()
|
||||
|
||||
index = 1
|
||||
|
||||
# Loop over all the visible nodes and draw their bounding boxes
|
||||
for _node in nodes:
|
||||
coords_str = _node.attrib.get("{{{:}}}screencoord".format(_component_ns))
|
||||
size_str = _node.attrib.get("{{{:}}}size".format(_component_ns))
|
||||
|
||||
if coords_str and size_str:
|
||||
try:
|
||||
# Parse the coordinates and size from the strings
|
||||
coords = tuple(map(int, coords_str.strip("()").split(", ")))
|
||||
size = tuple(map(int, size_str.strip("()").split(", ")))
|
||||
|
||||
import copy
|
||||
|
||||
original_coords = copy.deepcopy(coords)
|
||||
original_size = copy.deepcopy(size)
|
||||
|
||||
if float(down_sampling_ratio) != 1.0:
|
||||
# Downsample the coordinates and size
|
||||
coords = tuple(int(coord * down_sampling_ratio) for coord in coords)
|
||||
size = tuple(int(s * down_sampling_ratio) for s in size)
|
||||
|
||||
# Check for negative sizes
|
||||
if size[0] <= 0 or size[1] <= 0:
|
||||
raise ValueError(f"Size must be positive, got: {size}")
|
||||
|
||||
# Calculate the bottom-right corner of the bounding box
|
||||
bottom_right = (coords[0] + size[0], coords[1] + size[1])
|
||||
|
||||
# Check that bottom_right > coords (x1 >= x0, y1 >= y0)
|
||||
if bottom_right[0] < coords[0] or bottom_right[1] < coords[1]:
|
||||
raise ValueError(f"Invalid coordinates or size, coords: {coords}, size: {size}")
|
||||
|
||||
# Check if the area only contains one color
|
||||
cropped_image = image.crop((*coords, *bottom_right))
|
||||
if len(set(list(cropped_image.getdata()))) == 1:
|
||||
continue
|
||||
|
||||
# Draw rectangle on image
|
||||
draw.rectangle([coords, bottom_right], outline="red", width=1)
|
||||
|
||||
# Draw index number at the bottom left of the bounding box with black background
|
||||
text_position = (coords[0], bottom_right[1]) # Adjust Y to be above the bottom right
|
||||
text_bbox: Tuple[int, int, int, int] = draw.textbbox(text_position, str(index), font=font, anchor="lb")
|
||||
# offset: int = bottom_right[1]-text_bbox[3]
|
||||
# text_bbox = (text_bbox[0], text_bbox[1]+offset, text_bbox[2], text_bbox[3]+offset)
|
||||
|
||||
# draw.rectangle([text_position, (text_position[0] + 25, text_position[1] + 18)], fill='black')
|
||||
draw.rectangle(text_bbox, fill="black")
|
||||
draw.text(text_position, str(index), font=font, anchor="lb", fill="white")
|
||||
|
||||
# each mark is an x, y, w, h tuple
|
||||
marks.append([original_coords[0], original_coords[1], original_size[0], original_size[1]])
|
||||
drew_nodes.append(_node)
|
||||
|
||||
if _node.text:
|
||||
node_text = _node.text if '"' not in _node.text else '"{:}"'.format(_node.text.replace('"', '""'))
|
||||
elif _node.get("{{{:}}}class".format(class_ns_windows), "").endswith("EditWrapper") and _node.get(
|
||||
"{{{:}}}value".format(_value_ns)
|
||||
):
|
||||
node_text = _node.get("{{{:}}}value".format(_value_ns), "")
|
||||
node_text = node_text if '"' not in node_text else '"{:}"'.format(node_text.replace('"', '""'))
|
||||
else:
|
||||
node_text = '""'
|
||||
text_information: str = "{:d}\t{:}\t{:}\t{:}".format(index, _node.tag, _node.get("name", ""), node_text)
|
||||
text_informations.append(text_information)
|
||||
|
||||
index += 1
|
||||
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
output_image_stream = io.BytesIO()
|
||||
image.save(output_image_stream, format="PNG")
|
||||
image_content = output_image_stream.getvalue()
|
||||
|
||||
return marks, drew_nodes, "\n".join(text_informations), image_content
|
||||
|
||||
|
||||
def print_nodes_with_indent(nodes, indent=0):
|
||||
for node in nodes:
|
||||
print(" " * indent, node.tag, node.attrib)
|
||||
print_nodes_with_indent(node, indent + 2)
|
||||
|
||||
|
||||
def find_active_applications(tree, state_ns):
|
||||
apps_with_active_tag = []
|
||||
for application in list(tree.getroot()):
|
||||
app_name = application.attrib.get("name")
|
||||
for frame in application:
|
||||
is_active = frame.attrib.get("{{{:}}}active".format(state_ns), "false")
|
||||
if is_active == "true":
|
||||
apps_with_active_tag.append(app_name)
|
||||
if apps_with_active_tag:
|
||||
to_keep = apps_with_active_tag + ["gnome-shell"]
|
||||
else:
|
||||
to_keep = ["gjs", "gnome-shell"]
|
||||
return to_keep
|
||||
|
||||
|
||||
def linearize_accessibility_tree(accessibility_tree, platform="Ubuntu"):
|
||||
if platform == "Ubuntu":
|
||||
_attributes_ns = attributes_ns_ubuntu
|
||||
_state_ns = state_ns_ubuntu
|
||||
_component_ns = component_ns_ubuntu
|
||||
_value_ns = value_ns_ubuntu
|
||||
elif platform == "Windows":
|
||||
_attributes_ns = attributes_ns_windows
|
||||
_state_ns = state_ns_windows
|
||||
_component_ns = component_ns_windows
|
||||
_value_ns = value_ns_windows
|
||||
else:
|
||||
raise ValueError("Invalid platform, must be 'Ubuntu' or 'Windows'")
|
||||
|
||||
try:
|
||||
tree = ET.ElementTree(ET.fromstring(accessibility_tree))
|
||||
keep_apps = find_active_applications(tree, _state_ns)
|
||||
|
||||
# Remove inactive applications
|
||||
for application in list(tree.getroot()):
|
||||
if application.get("name") not in keep_apps:
|
||||
tree.getroot().remove(application)
|
||||
|
||||
filtered_nodes = filter_nodes(tree.getroot(), platform, check_image=True)
|
||||
linearized_accessibility_tree = ["tag\ttext\tposition (center x & y)\tsize (w & h)"]
|
||||
|
||||
# Linearize the accessibility tree nodes into a table format
|
||||
for node in filtered_nodes:
|
||||
try:
|
||||
text = node.text if node.text is not None else ""
|
||||
text = text.strip()
|
||||
name = node.get("name", "").strip()
|
||||
if text == "":
|
||||
text = name
|
||||
elif name != "" and text != name:
|
||||
text = f"{name} ({text})"
|
||||
|
||||
text = text.replace("\n", "\\n")
|
||||
pos = node.get("{{{:}}}screencoord".format(_component_ns), "")
|
||||
size = node.get("{{{:}}}size".format(_component_ns), "")
|
||||
|
||||
x, y = re.match(f"\((\d+), (\d+)\)", pos).groups()
|
||||
w, h = re.match(f"\((\d+), (\d+)\)", size).groups()
|
||||
x_mid, y_mid = int(x) + int(w) // 2, int(y) + int(h) // 2
|
||||
|
||||
linearized_accessibility_tree.append(
|
||||
"{:}\t{:}\t{:}\t{:}".format(node.tag, text, f"({x_mid}, {y_mid})", size)
|
||||
)
|
||||
except Exception as e:
|
||||
continue
|
||||
|
||||
# Filter out similar nodes
|
||||
linearized_accessibility_tree = filter_similar_nodes("\n".join(linearized_accessibility_tree))
|
||||
except Exception as e:
|
||||
print(f"Error in linearize_accessibility_tree: {e}")
|
||||
linearized_accessibility_tree = ""
|
||||
|
||||
return linearized_accessibility_tree
|
||||
|
||||
|
||||
def trim_accessibility_tree(linearized_accessibility_tree, max_items):
|
||||
lines = linearized_accessibility_tree.strip().split("\n")
|
||||
if len(lines) > max_items:
|
||||
lines = lines[:max_items]
|
||||
linearized_accessibility_tree = "\n".join(lines)
|
||||
linearized_accessibility_tree += "\n..."
|
||||
return linearized_accessibility_tree
|
||||
100
mm_agents/autoglm/prompt/deduplicate_node.py
Normal file
100
mm_agents/autoglm/prompt/deduplicate_node.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import re
|
||||
|
||||
|
||||
def parse_line(line):
|
||||
# 解析格式,如:label Google Chrome (191, 13) (104, 17)
|
||||
pattern = r"^(\S+)\s+(.+?)\s+\((\d+), (\d+)\)\s+\((\d+), (\d+)\)"
|
||||
m = re.match(pattern, line)
|
||||
if not m:
|
||||
return None
|
||||
node_type, text, cx, cy, w, h = m.groups()
|
||||
cx, cy, w, h = map(int, (cx, cy, w, h))
|
||||
# bounding box as (x1, y1, x2, y2)
|
||||
x1 = cx - w // 2
|
||||
y1 = cy - h // 2
|
||||
x2 = x1 + w
|
||||
y2 = y1 + h
|
||||
return {
|
||||
"type": node_type,
|
||||
"text": text.strip(),
|
||||
"bbox": (x1, y1, x2, y2),
|
||||
"center": (cx, cy),
|
||||
"size": (w, h),
|
||||
"raw": line,
|
||||
}
|
||||
|
||||
|
||||
def iou(box1, box2):
|
||||
# box: (x1, y1, x2, y2)
|
||||
xi1 = max(box1[0], box2[0])
|
||||
yi1 = max(box1[1], box2[1])
|
||||
xi2 = min(box1[2], box2[2])
|
||||
yi2 = min(box1[3], box2[3])
|
||||
inter_width = max(0, xi2 - xi1)
|
||||
inter_height = max(0, yi2 - yi1)
|
||||
inter_area = inter_width * inter_height
|
||||
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
|
||||
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
|
||||
union = area1 + area2 - inter_area
|
||||
if union == 0:
|
||||
return 0
|
||||
return inter_area / union
|
||||
|
||||
|
||||
def norm_text(s):
|
||||
# 归一化文本:小写、去空格等
|
||||
return re.sub(r"\s+", "", s.lower())
|
||||
|
||||
|
||||
def text_similarity(a, b):
|
||||
# 简单判定:完全一致为1,否则0
|
||||
na, nb = norm_text(a), norm_text(b)
|
||||
if na == nb:
|
||||
return 1.0
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
def filter_similar_nodes(linearized_accessibility_tree):
|
||||
lines = [ln for ln in linearized_accessibility_tree.split("\n") if ln.strip()]
|
||||
# parse all nodes
|
||||
nodes = []
|
||||
for ln in lines:
|
||||
node = parse_line(ln)
|
||||
if node:
|
||||
nodes.append(node)
|
||||
else:
|
||||
# 解析不了的保留
|
||||
nodes.append({"raw": ln, "invalid": True})
|
||||
filtered = []
|
||||
removed = [False] * len(nodes)
|
||||
# 阈值可自行调整
|
||||
IOU_THRESH = 0.2
|
||||
TEXT_THRESH = 0.9
|
||||
for i, ni in enumerate(nodes):
|
||||
if ni.get("invalid"):
|
||||
filtered.append(ni["raw"])
|
||||
continue
|
||||
if removed[i]:
|
||||
continue
|
||||
for j in range(i + 1, len(nodes)):
|
||||
nj = nodes[j]
|
||||
if nj.get("invalid"):
|
||||
continue
|
||||
iou_val = iou(ni["bbox"], nj["bbox"])
|
||||
text_sim = text_similarity(ni["text"], nj["text"])
|
||||
if iou_val > IOU_THRESH and text_sim > TEXT_THRESH:
|
||||
# 二者极其相似,移除后者
|
||||
removed[j] = True
|
||||
# print(f"移除: {nj['raw']} (与 {ni['raw']} 相似度高)")
|
||||
# 保留未被标记为移除的
|
||||
if not removed[i]:
|
||||
filtered.append(ni["raw"])
|
||||
return "\n".join(filtered)
|
||||
|
||||
|
||||
# 示例用法
|
||||
if __name__ == "__main__":
|
||||
linearized_accessibility_tree = "tag\ttext\tposition (center x & y)\tsize (w & h)\nicon\t\t(1853, 1001)\t(64, 64)\nlabel\tHome\t(1853, 1045)\t(40, 17)\nlabel\tActivities\t(49, 13)\t(63, 17)\ntext\tActivities\t(49, 13)\t(63, 17)\nlabel\tApr 17 17∶04\t(995, 13)\t(117, 27)\ntext\tApr 17 17∶04\t(995, 13)\t(87, 18)\nmenu\tSystem\t(1867, 13)\t(106, 27)\npush-button\tGoogle Chrome\t(35, 65)\t(70, 64)\npush-button\tThunderbird Mail\t(35, 133)\t(70, 64)\npush-button\tVisual Studio Code\t(35, 201)\t(70, 64)\npush-button\tVLC media player\t(35, 269)\t(70, 64)\npush-button\tLibreOffice Writer\t(35, 337)\t(70, 64)\npush-button\tLibreOffice Calc\t(35, 405)\t(70, 64)\npush-button\tLibreOffice Impress\t(35, 473)\t(70, 64)\npush-button\tGNU Image Manipulation Program\t(35, 541)\t(70, 64)\npush-button\tFiles\t(35, 609)\t(70, 64)\npush-button\tUbuntu Software\t(35, 677)\t(70, 64)\npush-button\tHelp\t(35, 745)\t(70, 64)\npush-button\tTrash\t(35, 816)\t(70, 64)\ntoggle-button\tShow Applications\t(35, 1045)\t(70, 70)"
|
||||
result = filter_similar_nodes(linearized_accessibility_tree)
|
||||
print(result)
|
||||
259
mm_agents/autoglm/prompt/grounding_agent.py
Normal file
259
mm_agents/autoglm/prompt/grounding_agent.py
Normal file
@@ -0,0 +1,259 @@
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger("desktopenv.agent")
|
||||
|
||||
|
||||
def agent_action(func):
|
||||
func.is_agent_action = True
|
||||
return func
|
||||
|
||||
|
||||
switch_window_code = """import subprocess;
|
||||
import pyautogui;
|
||||
pyautogui.press('escape');
|
||||
time.sleep(0.5);
|
||||
subprocess.run(['wmctrl', '-ia', 'WINDOW_ID'])
|
||||
subprocess.run(['wmctrl', '-ir', 'WINDOW_ID', '-b', 'add,maximized_vert,maximized_horz'])
|
||||
print('Switch to WINDOW_ID')"""
|
||||
|
||||
launch_app_commands = {
|
||||
# Web Browser
|
||||
"chrome": "google-chrome --remote-debugging-port=1337",
|
||||
# File Manager
|
||||
"files": "nautilus",
|
||||
# Terminal
|
||||
"terminal": 'export DBUS_SESSION_BUS_ADDRESS="unix:path=/run/user/1000/bus" && gnome-terminal',
|
||||
# Utilities
|
||||
"gedit": "gedit",
|
||||
# Office
|
||||
"libreoffice writer": "libreoffice --writer",
|
||||
"libreoffice calc": "libreoffice --calc",
|
||||
"libreoffice impress": "libreoffice --impress",
|
||||
# System
|
||||
"settings": 'export DBUS_SESSION_BUS_ADDRESS="unix:path=/run/user/1000/bus" && gnome-control-center',
|
||||
# Multimedia
|
||||
"vlc": "vlc",
|
||||
"gimp": "gimp",
|
||||
# IDE
|
||||
"vs code": "code",
|
||||
# Email
|
||||
"thunderbird": "thunderbird",
|
||||
}
|
||||
|
||||
|
||||
class GroundingAgent:
|
||||
|
||||
tool_list = {
|
||||
"libreoffice_calc": "CalcTools",
|
||||
"libreoffice_impress": "ImpressTools",
|
||||
"libreoffice_writer": "WriterTools",
|
||||
"code": "CodeTools",
|
||||
"vlc": "VLCTools",
|
||||
"google_chrome": "BrowserTools",
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def tool_commands(cls, code: str, tool_name: str):
|
||||
command = f"from {tool_name} import *; "
|
||||
command += code
|
||||
|
||||
tool_class = cls.tool_list[tool_name]
|
||||
command += f"; {tool_class}.print_result()"
|
||||
|
||||
return [
|
||||
command,
|
||||
]
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def click(
|
||||
cls,
|
||||
coordinates: List,
|
||||
num_clicks: int = 1,
|
||||
button_type: str = "left",
|
||||
):
|
||||
"""
|
||||
Click on the element.
|
||||
|
||||
Args:
|
||||
coordinates (List): [x, y], Coordinates of the element to click on
|
||||
num_clicks (int): number of times to click the element
|
||||
button_type (str): which mouse button to press can be "left", "middle", or "right"
|
||||
"""
|
||||
command = ""
|
||||
x, y = coordinates
|
||||
command += f"""pyautogui.click({x}, {y}, clicks={num_clicks}, button={repr(button_type)}); print("Click Success")""" # TODO: 最大化窗口需要一次调用
|
||||
return command
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def type(
|
||||
cls,
|
||||
coordinates: Optional[List] = None,
|
||||
text: str = "",
|
||||
overwrite: bool = False,
|
||||
enter: bool = False,
|
||||
):
|
||||
"""
|
||||
Type text into the element.
|
||||
|
||||
Args:
|
||||
coordinates (List): [x, y] Coordinates of the element to type into. If not provided, typing will start at the current cursor location.
|
||||
text (str): the text to type
|
||||
overwrite (bool): Assign it to True if the text should overwrite the existing text, otherwise assign it to False. Using this argument clears all text in an element.
|
||||
enter (bool): Assign it to True if the enter key should be pressed after typing the text, otherwise assign it to False.
|
||||
"""
|
||||
|
||||
command = ""
|
||||
|
||||
if coordinates is not None:
|
||||
# Start typing at the center of the element
|
||||
x, y = coordinates
|
||||
command += f"pyautogui.click({x}, {y}); "
|
||||
|
||||
if overwrite:
|
||||
command += f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); "
|
||||
|
||||
command += f"pyautogui.write({repr(text)}); "
|
||||
|
||||
if enter:
|
||||
command += "pyautogui.press('enter'); "
|
||||
|
||||
command += "print('Type Success')"
|
||||
|
||||
return command
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def drag_and_drop(cls, drag_from_coordinates: List, drop_on_coordinates: List):
|
||||
"""
|
||||
Drag element1 and drop it on element2.
|
||||
|
||||
Args:
|
||||
drag_from_coordinates (List): [x, y] Coordinates of element to drag
|
||||
drop_on_coordinates (List): [x, y] Coordinates of element to drop on
|
||||
"""
|
||||
x1, y1 = drag_from_coordinates
|
||||
x2, y2 = drop_on_coordinates
|
||||
|
||||
command = f"pyautogui.moveTo({x1}, {y1}); "
|
||||
# TODO: specified duration?
|
||||
command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); "
|
||||
|
||||
command += "print('Drag and Drop Success')"
|
||||
|
||||
return command
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def scroll(cls, coordinates: List, direction: str):
|
||||
"""
|
||||
Scroll the element in the specified direction.
|
||||
|
||||
Args:
|
||||
coordinates (List): [x, y] Coordinates of the element to scroll in
|
||||
direction (str): the direction to scroll can be "up" or "down".
|
||||
"""
|
||||
x, y = coordinates
|
||||
amount = 100 if direction == "up" else -100
|
||||
return f"import pyautogui; pyautogui.moveTo({x}, {y}); pyautogui.scroll({amount}); print('Scroll Success')"
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def open_app(cls, app_name: str):
|
||||
"""
|
||||
Open a specified application.
|
||||
|
||||
App List:
|
||||
- chrome
|
||||
- files
|
||||
- terminal
|
||||
- gedit
|
||||
- libreoffice writer
|
||||
- libreoffice calc
|
||||
- libreoffice impress
|
||||
- vs code
|
||||
- vlc
|
||||
- gimp
|
||||
- settings
|
||||
- thunderbird
|
||||
|
||||
Args:
|
||||
app_name (str): Name of the application to open
|
||||
"""
|
||||
|
||||
app_name = app_name.lower().strip()
|
||||
|
||||
if app_name not in launch_app_commands:
|
||||
command = f"print(f'{app_name} is not supported or recognized')"
|
||||
else:
|
||||
command = {
|
||||
"action_type": "OPEN_APP",
|
||||
"parameters": {"launch_app_command": launch_app_commands[app_name], "app_name": app_name},
|
||||
}
|
||||
|
||||
return command
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def switch_window(cls, window_id: str):
|
||||
"""
|
||||
Switch to the window with the given window id.
|
||||
|
||||
Args:
|
||||
window_id (str): the window id to switch to from the provided list of open windows
|
||||
"""
|
||||
return switch_window_code.replace("WINDOW_ID", window_id)
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def hotkey(cls, keys: List):
|
||||
"""
|
||||
Press a hotkey combination.
|
||||
|
||||
Args:
|
||||
keys (List): the keys to press in combination in a list format (e.g. ['ctrl', 'c'] for copy, ['prtsc'] for screenshot)
|
||||
"""
|
||||
# add quotes around the keys
|
||||
keys = [f"'{key}'" for key in keys]
|
||||
key_str = ", ".join(keys).replace("'", "\\'")
|
||||
return f"import pyautogui; pyautogui.hotkey({', '.join(keys)}); print(f'Press Hotkey: {key_str}')"
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def quote(cls, content: str):
|
||||
"""
|
||||
Quoting information from the current page for memory. Only you can see the quoted content.
|
||||
|
||||
Args:
|
||||
content (str): text summarized or copied from the page for later operation.
|
||||
"""
|
||||
return f'''print("""{content}""")'''
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def wait(cls):
|
||||
"""
|
||||
Wait for a while.
|
||||
|
||||
"""
|
||||
return "WAIT"
|
||||
|
||||
@classmethod
|
||||
@agent_action
|
||||
def exit(cls, success: bool):
|
||||
"""
|
||||
End the current task.
|
||||
|
||||
Args:
|
||||
success (bool): True if successfully finish a task, otherwise set it False
|
||||
"""
|
||||
if success:
|
||||
return "DONE"
|
||||
else:
|
||||
return "FAIL"
|
||||
202
mm_agents/autoglm/prompt/procedural_memory.py
Normal file
202
mm_agents/autoglm/prompt/procedural_memory.py
Normal file
@@ -0,0 +1,202 @@
|
||||
import inspect
|
||||
import json
|
||||
import os
|
||||
import textwrap
|
||||
|
||||
current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
|
||||
def generate_func(json_data):
|
||||
# 收集所有类名和它们的函数
|
||||
class_funcs = {}
|
||||
no_class_funcs = []
|
||||
cls_name = ""
|
||||
|
||||
for item in json_data:
|
||||
if item["type"] == "function":
|
||||
func = item["function"]
|
||||
func_parts = func["name"].split(".")
|
||||
|
||||
if len(func_parts) == 2:
|
||||
class_name, func_name = func_parts
|
||||
if class_name not in class_funcs:
|
||||
class_funcs[class_name] = []
|
||||
class_funcs[class_name].append(item)
|
||||
else:
|
||||
no_class_funcs.append(item)
|
||||
|
||||
code = ""
|
||||
|
||||
# 生成有类的函数
|
||||
for class_name, funcs in class_funcs.items():
|
||||
code += f"class {class_name}:\n"
|
||||
cls_name = class_name
|
||||
for item in funcs:
|
||||
func = item["function"]
|
||||
func_name = func["name"].split(".")[-1]
|
||||
description = func["description"]
|
||||
params = func["parameters"]["properties"]
|
||||
required = func["parameters"].get("required", [])
|
||||
|
||||
# 构建参数列表
|
||||
param_list = ["cls"]
|
||||
# 首先添加必需参数
|
||||
for param_name in required:
|
||||
param_list.append(f"{param_name}")
|
||||
# 然后添加可选参数
|
||||
for param_name in params:
|
||||
if param_name not in required:
|
||||
param_list.append(f"{param_name}") # 可选参数默认值设为None
|
||||
|
||||
# 构建函数定义
|
||||
func_def = f" def {func_name}({', '.join(param_list)}):\n"
|
||||
|
||||
# 构建文档字符串
|
||||
docstring = f' """\n {description}\n\n Args:\n'
|
||||
if len(param_list) == 1: # 只有cls参数
|
||||
docstring += " None\n"
|
||||
else:
|
||||
# 首先记录必需参数
|
||||
for param_name in required:
|
||||
param_type = params[param_name]["type"]
|
||||
param_desc = params[param_name].get("description", "")
|
||||
docstring += f" {param_name} ({param_type}): {param_desc}\n"
|
||||
# 然后记录可选参数
|
||||
for param_name in params:
|
||||
if param_name not in required:
|
||||
param_type = params[param_name]["type"]
|
||||
param_desc = params[param_name].get("description", "")
|
||||
docstring += f" {param_name} ({param_type}, optional): {param_desc}\n"
|
||||
|
||||
docstring += ' """\n'
|
||||
|
||||
code += func_def + docstring + "\n"
|
||||
|
||||
code += "\n"
|
||||
|
||||
# 生成没有类的函数
|
||||
for item in no_class_funcs:
|
||||
func = item["function"]
|
||||
func_name = func["name"]
|
||||
description = func["description"]
|
||||
params = func["parameters"]["properties"]
|
||||
required = func["parameters"].get("required", [])
|
||||
|
||||
# 构建参数列表
|
||||
param_list = []
|
||||
# 首先添加必需参数
|
||||
for param_name in required:
|
||||
param_list.append(f"{param_name}")
|
||||
# 然后添加可选参数
|
||||
for param_name in params:
|
||||
if param_name not in required:
|
||||
param_list.append(f"{param_name}")
|
||||
|
||||
# 构建函数定义
|
||||
func_def = f"def {func_name}({', '.join(param_list)}):\n"
|
||||
|
||||
# 构建文档字符串
|
||||
docstring = f' """\n {description}\n\n Args:\n'
|
||||
if not param_list:
|
||||
docstring += " None\n"
|
||||
else:
|
||||
# 首先记录必需参数
|
||||
for param_name in required:
|
||||
param_type = params[param_name]["type"]
|
||||
param_desc = params[param_name].get("description", "")
|
||||
docstring += f" {param_name} ({param_type}): {param_desc}\n"
|
||||
# 然后记录可选参数
|
||||
for param_name in params:
|
||||
if param_name not in required:
|
||||
param_type = params[param_name]["type"]
|
||||
param_desc = params[param_name].get("description", "")
|
||||
docstring += f" {param_name} ({param_type}, optional): {param_desc}\n"
|
||||
|
||||
docstring += ' """\n'
|
||||
|
||||
code += func_def + docstring + "\n"
|
||||
|
||||
return code.strip(), cls_name
|
||||
|
||||
|
||||
setup_prompt = """You are an agent which follow my instruction and perform desktop computer tasks as instructed.
|
||||
You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard.
|
||||
For each step, you will get an observation of the desktop by 1) screenshot; 2) current application name; 3) accessibility tree, which is based on AT-SPI library; 4) application info; 5) last action result.
|
||||
You should first generate a plan for completing the task, confirm the previous results, reflect on the current status, then generate operations to complete the task in python-style pseudo code using the predefined functions.
|
||||
|
||||
Your output should STRICTLY follow the format:
|
||||
<think>
|
||||
{**YOUR-PLAN-AND-THINKING**}
|
||||
</think>
|
||||
```python
|
||||
{**ONE-LINE-OF-CODE**}
|
||||
```"""
|
||||
|
||||
func_def_tool_template = """You will be provided access to the following methods to interact with the UI:
|
||||
1. class Agent, a grounding agent which provides basic action space to interact with desktop.
|
||||
2. class {tool_class_name}, which provides tools to interact with the current application {app_name}.
|
||||
|
||||
Here are the defination of the classes:
|
||||
```python
|
||||
{class_content}
|
||||
```"""
|
||||
|
||||
func_def_template = """You will be provided access to the following methods to interact with the UI:
|
||||
|
||||
```python
|
||||
{class_content}
|
||||
```"""
|
||||
|
||||
note_prompt = """* Note:
|
||||
- Your code should be wrapped in ```python```, and your plan and thinking should be wrapped in <think></think>.
|
||||
- Only **ONE-LINE-OF-CODE** at a time.
|
||||
- Each code block is context independent, and variables from the previous round cannot be used in the next round.
|
||||
- Do not put anything other than python code in ```python```.
|
||||
- You **can only use the above methods to interact with the UI**, do not invent new methods.
|
||||
- Return with `Agent.exit(success=True)` immediately after the task is completed.
|
||||
- If you think cannot complete the task, **DO NOT keep repeating actions, just return with `Agent.exit(success=False)`.**
|
||||
- The computer's environment is Linux, e.g., Desktop path is '/home/user/Desktop'
|
||||
- My computer's password is '{client_password}', feel free to use it when you need sudo rights"""
|
||||
|
||||
|
||||
class Prompt:
|
||||
@staticmethod
|
||||
def construct_procedural_memory(agent_class, app_name=None, client_password="password"):
|
||||
agent_class_content = "Class Agent:"
|
||||
for attr_name in dir(agent_class):
|
||||
attr = getattr(agent_class, attr_name)
|
||||
if callable(attr) and hasattr(attr, "is_agent_action"):
|
||||
# Use inspect to get the full function signature
|
||||
signature = inspect.signature(attr)
|
||||
agent_class_content += f"""
|
||||
def {attr_name}{signature}:
|
||||
'''{attr.__doc__}'''
|
||||
"""
|
||||
|
||||
if app_name is not None:
|
||||
tool_path = os.path.join(current_dir, "tools", "apis", f"{app_name.lower()}.json")
|
||||
with open(tool_path, "r") as f:
|
||||
json_data = json.load(f)
|
||||
|
||||
tool_class_content, tool_class_name = generate_func(json_data)
|
||||
|
||||
agent_class_content += "\n\n{}".format(tool_class_content)
|
||||
func_def_prompt = func_def_tool_template.format(
|
||||
class_content=agent_class_content.strip(),
|
||||
tool_class_name=tool_class_name,
|
||||
app_name=app_name,
|
||||
client_password=client_password,
|
||||
)
|
||||
else:
|
||||
func_def_prompt = func_def_template.format(class_content=agent_class_content.strip())
|
||||
note_prompt_formatted = note_prompt.format(client_password=client_password)
|
||||
|
||||
# procedural_memory = f"{setup_prompt}\n\n{func_def_prompt}\n\n{note_prompt}".strip()
|
||||
# return procedural_memory
|
||||
return setup_prompt, func_def_prompt, note_prompt_formatted
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from grounding_agent import GroundingAgent
|
||||
|
||||
print(Prompt.construct_procedural_memory(GroundingAgent, "vlc"))
|
||||
Reference in New Issue
Block a user