Compare commits

..

10 Commits

Author SHA1 Message Date
cui0711
231f7a8fbc feat(eval): add jade test case and update test categories 2026-01-30 16:29:05 +08:00
cui0711
716d82f4d1 feat: add flexible recording control and improve execution logging 2026-01-30 16:28:13 +08:00
cui0711
47bcfc0f0b feat(agent): add screenshot compression and dynamic resolution support 2026-01-30 16:28:02 +08:00
cui0711
7e9090e115 fix(prompts): fix template variable syntax and add dynamic resolution 2026-01-30 16:28:02 +08:00
cui0711
308282e830 feat(server): add cross-platform support and improve screenshot handling 2026-01-30 16:27:49 +08:00
cui0711
788b248dbc fix(logger): add Windows platform support for file locking 2026-01-30 16:27:49 +08:00
alexandruilie7
5463d3bb89 uipath v2 (#413)
* submission v2

* small updates
2026-01-09 08:47:20 +08:00
蘑菇先生
5ef8bdfa35 EvoCUA Update (2025.01.05) (#412)
* evocua init

* setup max_token

* evocua update

---------

Co-authored-by: xuetaofeng <xuetaofeng@meituan.com>
Co-authored-by: Tianbao Xie <47296835+Timothyxxx@users.noreply.github.com>
2026-01-05 16:14:53 +08:00
Bowen Yang
439e178a2e fix(os_symphony_evaluation) (#410)
* fix(os_symphony)

* Update desktop_env_os_symphony.py

* fix(os_symphony_desktop)

* fix(os_symphony_start)

* Add docstring to run_multienv_os_symphony.py

Added documentation header for the evaluation script.
2026-01-04 15:56:51 +08:00
Bowen Yang
951e1928c8 fix(desktop_os_symphony):support aws (#406)
* fix(os_symphony)

* Update desktop_env_os_symphony.py
2026-01-01 11:27:34 +08:00
27 changed files with 1632 additions and 1017 deletions

View File

@@ -101,7 +101,7 @@ class DesktopEnv(gym.Env):
provider_name: str = "vmware", provider_name: str = "vmware",
region: str = None, region: str = None,
path_to_vm: str = None, path_to_vm: str = None,
snapshot_name: str = "init_state", snapshot_name: str = "snapshot",
action_space: str = "pyautogui", action_space: str = "pyautogui",
cache_dir: str = "cache", cache_dir: str = "cache",
screen_size: Tuple[int] = (int(os.environ.get("SCREEN_WIDTH", 1920)), int(os.environ.get("SCREEN_HEIGHT", 1080))), screen_size: Tuple[int] = (int(os.environ.get("SCREEN_WIDTH", 1920)), int(os.environ.get("SCREEN_HEIGHT", 1080))),
@@ -117,7 +117,7 @@ class DesktopEnv(gym.Env):
provider_name (str): virtualization provider name, default to "vmware" provider_name (str): virtualization provider name, default to "vmware"
region (str): the region for allocate machines, work for cloud services, default to "us-east-1" region (str): the region for allocate machines, work for cloud services, default to "us-east-1"
path_to_vm (str): path to .vmx file path_to_vm (str): path to .vmx file
snapshot_name (str): snapshot name to revert to, default to "init_state" snapshot_name (str): snapshot name to revert to, default to "snapshot"
action_space (str): "computer_13" | "pyautogui" action_space (str): "computer_13" | "pyautogui"
cache_dir (str): cache directory to cache task-related stuffs like cache_dir (str): cache directory to cache task-related stuffs like
reference file for evaluation reference file for evaluation
@@ -265,7 +265,7 @@ class DesktopEnv(gym.Env):
self.current_use_proxy = task_use_proxy self.current_use_proxy = task_use_proxy
if self.is_environment_used: if self.is_environment_used:
logger.info("Environment has been used, reverting to snapshot {}...".format(self.snapshot_name)) logger.info("Environment has been used, reverting to snapshot: {}...".format(self.snapshot_name))
self._revert_to_snapshot() self._revert_to_snapshot()
logger.info("Starting emulator...") logger.info("Starting emulator...")
self._start_emulator() self._start_emulator()
@@ -402,6 +402,7 @@ class DesktopEnv(gym.Env):
if self.action_space == "computer_13": if self.action_space == "computer_13":
# the set of all possible actions defined in the action representation # the set of all possible actions defined in the action representation
logger.info(f"======executing here======{self.action_space}========================")
self.controller.execute_action(action) self.controller.execute_action(action)
elif self.action_space == "pyautogui" or self.action_space == "claude_computer_use": elif self.action_space == "pyautogui" or self.action_space == "claude_computer_use":
if action in ['WAIT', 'FAIL', 'DONE'] or (type(action) == dict and action.get('action_type') in ['WAIT', 'FAIL', 'DONE']): if action in ['WAIT', 'FAIL', 'DONE'] or (type(action) == dict and action.get('action_type') in ['WAIT', 'FAIL', 'DONE']):
@@ -411,6 +412,8 @@ class DesktopEnv(gym.Env):
if type(action) == str: if type(action) == str:
# Fix PyAutoGUI '<' character bug before execution # Fix PyAutoGUI '<' character bug before execution
fixed_command = _fix_pyautogui_less_than_bug(action) fixed_command = _fix_pyautogui_less_than_bug(action)
logger.info(f"======executing here======{self.action_space}========================")
logger.info(f"Fixed command: {fixed_command}")
self.controller.execute_python_command(fixed_command) self.controller.execute_python_command(fixed_command)
elif type(action) == dict: elif type(action) == dict:
# Fix PyAutoGUI '<' character bug before execution # Fix PyAutoGUI '<' character bug before execution

View File

@@ -151,10 +151,9 @@ class DesktopEnv(gym.Env):
# Initialize with default (no proxy) provider # Initialize with default (no proxy) provider
self.current_use_proxy = False self.current_use_proxy = False
# self.manager, self.provider = create_vm_manager_and_provider(provider_name, region, use_proxy=False)
self.manager, self.provider = None, None self.manager, self.provider = None, None
self.os_type = os_type self.os_type = os_type
self.path_to_vm = path_to_vm
# Track whether environment has been used (step/setup) to optimize snapshot revert # Track whether environment has been used (step/setup) to optimize snapshot revert
# docker, aws, gcp, azure are always unused as the emulator starts from a clean state # docker, aws, gcp, azure are always unused as the emulator starts from a clean state
# vmware, virtualbox are always used as the emulator starts from a dirty state # vmware, virtualbox are always used as the emulator starts from a dirty state
@@ -165,24 +164,12 @@ class DesktopEnv(gym.Env):
else: else:
raise ValueError(f"Invalid provider name: {self.provider_name}") raise ValueError(f"Invalid provider name: {self.provider_name}")
# Initialize environment variables
if path_to_vm:
self.path_to_vm = os.path.abspath(os.path.expandvars(os.path.expanduser(path_to_vm))) \
if provider_name in {"vmware", "virtualbox"} else path_to_vm
else:
self.path_to_vm = self.manager.get_vm_path(os_type=self.os_type, region=region, screen_size=(self.screen_width, self.screen_height))
self.snapshot_name = snapshot_name self.snapshot_name = snapshot_name
self.cache_dir_base: str = cache_dir self.cache_dir_base: str = cache_dir
# todo: add the logic to get the screen size from the VM
self.headless = headless self.headless = headless
self.require_a11y_tree = require_a11y_tree self.require_a11y_tree = require_a11y_tree
self.require_terminal = require_terminal self.require_terminal = require_terminal
# Initialize emulator and controller
# logger.info("Initializing...")
# self._start_emulator()
# mode: human or machine # mode: human or machine
self.instruction = None self.instruction = None
assert action_space in ["computer_13", "pyautogui", "claude_computer_use", "autoglm_computer_use"] assert action_space in ["computer_13", "pyautogui", "claude_computer_use", "autoglm_computer_use"]
@@ -199,11 +186,13 @@ class DesktopEnv(gym.Env):
if not self.manager and not self.provider: if not self.manager and not self.provider:
logger.info("Initializing...") logger.info("Initializing...")
self.manager, self.provider = create_vm_manager_and_provider(self.provider_name, self.region, use_proxy=False) self.manager, self.provider = create_vm_manager_and_provider(self.provider_name, self.region, use_proxy=False)
if self.path_to_vm: if self.path_to_vm:
self.path_to_vm = os.path.abspath(os.path.expandvars(os.path.expanduser(self.path_to_vm))) \ self.path_to_vm = os.path.abspath(os.path.expandvars(os.path.expanduser(self.path_to_vm))) \
if self.provider_name in {"vmware", "virtualbox"} else self.path_to_vm if self.provider_name in {"vmware", "virtualbox"} else self.path_to_vm
else: else:
self.path_to_vm = self.manager.get_vm_path(os_type=self.os_type, region=self.region, screen_size=(self.screen_width, self.screen_height)) self.path_to_vm = self.manager.get_vm_path(os_type=self.os_type, region=self.region, screen_size=(self.screen_width, self.screen_height))
self._start_emulator() self._start_emulator()
def _start_emulator(self): def _start_emulator(self):
@@ -344,6 +333,8 @@ class DesktopEnv(gym.Env):
def _set_evaluator_info(self, task_config: Dict[str, Any]): def _set_evaluator_info(self, task_config: Dict[str, Any]):
"""Set evaluator information from task config""" """Set evaluator information from task config"""
if "evaluator" not in task_config:
return
# evaluator dict # evaluator dict
# func -> metric function string, or list of metric function strings # func -> metric function string, or list of metric function strings
# conj -> conjunction of multiple metrics if func is a list with length > 1, "and"/"or" # conj -> conjunction of multiple metrics if func is a list with length > 1, "and"/"or"

View File

@@ -4,25 +4,27 @@ import platform
import shlex import shlex
import json import json
import subprocess, signal import subprocess, signal
import sys
import time import time
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Sequence from typing import Any, Optional, Sequence
from typing import List, Dict, Tuple, Literal from typing import List, Dict, Tuple, Literal
import concurrent.futures import concurrent.futures
import Xlib
import lxml.etree import lxml.etree
import pyautogui import pyautogui
import requests import requests
import re import re
from PIL import Image, ImageGrab from PIL import Image, ImageGrab
from Xlib import display, X
from flask import Flask, request, jsonify, send_file, abort # , send_from_directory from flask import Flask, request, jsonify, send_file, abort # , send_from_directory
from lxml.etree import _Element from lxml.etree import _Element
platform_name: str = platform.system() platform_name: str = platform.system()
if platform_name == "Linux": if platform_name == "Linux":
import Xlib
from Xlib import display, X
from pyxcursor import Xcursor
import pyatspi import pyatspi
from pyatspi import Accessible, StateType, STATE_SHOWING from pyatspi import Accessible, StateType, STATE_SHOWING
from pyatspi import Action as ATAction from pyatspi import Action as ATAction
@@ -39,9 +41,14 @@ elif platform_name == "Windows":
import win32ui, win32gui import win32ui, win32gui
Accessible = Any Accessible = Any
Xlib = None
display = None
X = None
Xcursor = None
elif platform_name == "Darwin": elif platform_name == "Darwin":
import plistlib import plistlib
from pyxcursor import Xcursor
import AppKit import AppKit
import ApplicationServices import ApplicationServices
@@ -51,13 +58,16 @@ elif platform_name == "Darwin":
Accessible = Any Accessible = Any
BaseWrapper = Any BaseWrapper = Any
Xlib = None
else: else:
# Platform not supported # Platform not supported
Accessible = None Accessible = None
BaseWrapper = Any BaseWrapper = Any
Xlib = None
from pyxcursor import Xcursor display = None
X = None
Xcursor = None
# todo: need to reformat and organize this whole file # todo: need to reformat and organize this whole file
@@ -89,6 +99,10 @@ def execute_command():
if arg.startswith("~/"): if arg.startswith("~/"):
command[i] = os.path.expanduser(arg) command[i] = os.path.expanduser(arg)
# Replace 'python' with sys.executable to use the same Python interpreter as the server
if len(command) > 0 and command[0] in ['python', 'python3', 'python.exe', 'python3.exe']:
command[0] = sys.executable
# Execute the command without any safety checks. # Execute the command without any safety checks.
try: try:
if platform_name == "Windows": if platform_name == "Windows":
@@ -262,15 +276,12 @@ def launch_app():
@app.route('/screenshot', methods=['GET']) @app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor(): def capture_screen_with_cursor():
# fixme: when running on virtual machines, the cursor is not captured, don't know why
file_path = os.path.join(os.path.dirname(__file__), "screenshots", "screenshot.png") file_path = os.path.join(os.path.dirname(__file__), "screenshots", "screenshot.png")
user_platform = platform.system() user_platform = platform.system()
# Ensure the screenshots directory exists # Ensure the screenshots directory exists
os.makedirs(os.path.dirname(file_path), exist_ok=True) os.makedirs(os.path.dirname(file_path), exist_ok=True)
# fixme: This is a temporary fix for the cursor not being captured on Windows and Linux
if user_platform == "Windows": if user_platform == "Windows":
def get_cursor(): def get_cursor():
hcursor = win32gui.GetCursorInfo()[1] hcursor = win32gui.GetCursorInfo()[1]
@@ -303,19 +314,53 @@ def capture_screen_with_cursor():
ratio = ctypes.windll.shcore.GetScaleFactorForDevice(0) / 100 ratio = ctypes.windll.shcore.GetScaleFactorForDevice(0) / 100
# get logical screen size
user32 = ctypes.windll.user32
logical_width = user32.GetSystemMetrics(0)
logical_height = user32.GetSystemMetrics(1)
# ===== Key fix: get cursor position before taking screenshot =====
# win32gui.GetCursorPos() returns logical coordinates (consistent with pyautogui)
pos_win = win32gui.GetCursorPos()
logger.info(f"Cursor position (logical coordinates): {pos_win}")
# Take screenshot immediately to reduce time difference
img = ImageGrab.grab(bbox=None, include_layered_windows=True) img = ImageGrab.grab(bbox=None, include_layered_windows=True)
# =============================================
# ===== DPI scaling fix =====
if ratio != 1.0:
physical_width, physical_height = img.size
logger.info(f"Detected DPI scaling: {ratio}x ({ratio*100}%)")
logger.info(f"Physical screenshot size: {physical_width}x{physical_height}")
logger.info(f"Logical resolution: {logical_width}x{logical_height}")
logger.info(f"Resizing screenshot to match logical resolution...")
img = img.resize((logical_width, logical_height), Image.Resampling.LANCZOS)
logger.info(f"Screenshot resized to: {img.size}")
# ==========================
try: try:
cursor, (hotspotx, hotspoty) = get_cursor() cursor, (hotspotx, hotspoty) = get_cursor()
pos_win = win32gui.GetCursorPos() # ===== Cursor position handling =====
pos = (round(pos_win[0]*ratio - hotspotx), round(pos_win[1]*ratio - hotspoty)) # win32gui.GetCursorPos() and pyautogui both use logical coordinates
# The screenshot has been resized to logical resolution, so use directly
logical_cursor_x = pos_win[0]
logical_cursor_y = pos_win[1]
pos = (logical_cursor_x - hotspotx, logical_cursor_y - hotspoty)
logger.info(f"Cursor position (logical coordinates): ({logical_cursor_x}, {logical_cursor_y})")
logger.info(f"Hotspot offset: ({hotspotx}, {hotspoty})")
logger.info(f"Final paste position: {pos}")
# ===================================
img.paste(cursor, pos, cursor) img.paste(cursor, pos, cursor)
except Exception as e: except Exception as e:
logger.warning(f"Failed to capture cursor on Windows, screenshot will not have a cursor. Error: {e}") logger.warning(f"Failed to capture cursor on Windows, screenshot will not include cursor. Error: {e}")
img.save(file_path) img.save(file_path)
elif user_platform == "Linux": elif user_platform == "Linux":
cursor_obj = Xcursor() cursor_obj = Xcursor()
imgarray = cursor_obj.getCursorImageArrayFast() imgarray = cursor_obj.getCursorImageArrayFast()
@@ -324,17 +369,19 @@ def capture_screen_with_cursor():
cursor_x, cursor_y = pyautogui.position() cursor_x, cursor_y = pyautogui.position()
screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img) screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img)
screenshot.save(file_path) screenshot.save(file_path)
elif user_platform == "Darwin": # (Mac OS) elif user_platform == "Darwin": # (Mac OS)
# Use the screencapture utility to capture the screen with the cursor
subprocess.run(["screencapture", "-C", file_path]) subprocess.run(["screencapture", "-C", file_path])
else: else:
logger.warning(f"The platform you're using ({user_platform}) is not currently supported") logger.warning(f"The platform you're using ({user_platform}) is not currently supported")
return send_file(file_path, mimetype='image/png') return send_file(file_path, mimetype='image/png')
def _has_active_terminal(desktop: Accessible) -> bool: def _has_active_terminal(desktop: Accessible) -> bool:
""" A quick check whether the terminal window is open and active. """ A quick check whether the terminal window is open and active (Linux only).
""" """
for app in desktop: for app in desktop:
if app.getRoleName() == "application" and app.name == "gnome-terminal-server": if app.getRoleName() == "application" and app.name == "gnome-terminal-server":
@@ -344,6 +391,87 @@ def _has_active_terminal(desktop: Accessible) -> bool:
return False return False
def _get_windows_terminal_output() -> Optional[str]:
    """Best-effort retrieval of the text shown in a terminal window on Windows.

    Visible, non-minimized top-level windows are enumerated through
    pywinauto's UIA backend. A window is treated as a terminal when its
    title contains one of a small set of keywords (Windows Terminal,
    PowerShell, Command Prompt, ConHost and similar). The first non-empty
    text found in an Edit, Document or Text control of such a window is
    returned; if none is found, the window text itself is used unless it is
    just a well-known bare window title.

    Returns:
        The terminal text with trailing whitespace stripped, or None when no
        terminal text could be obtained or any unexpected error occurred
        (including pywinauto not being importable).
    """
    try:
        # Imported lazily: pywinauto is only available on Windows hosts.
        from pywinauto import Desktop

        desktop = Desktop(backend="uia")

        # Substrings (lower-case) that mark a window title as a terminal.
        # NOTE(review): this is a title-based heuristic; 'cmd' and 'terminal'
        # will also match unrelated windows whose title contains them.
        terminal_keywords = (
            "windowsterminal.exe",  # Windows Terminal
            "powershell.exe",       # PowerShell
            "pwsh.exe",             # PowerShell Core
            "cmd.exe",              # Command Prompt
            "conhost.exe",          # Console Host
            "terminal",
            "powershell",
            "command prompt",
            "cmd",
        )

        # Window texts that are only a title and carry no terminal output.
        bare_titles = {
            'Windows PowerShell',
            'Command Prompt',
            'PowerShell',
            'Administrator: Windows PowerShell',
        }

        for window in desktop.windows():
            try:
                # Skip windows the user cannot currently see.
                if not window.is_visible() or window.is_minimized():
                    continue

                # element_info.name is the window title; it may be None.
                window_title = (window.element_info.name or "").lower()

                if not any(keyword in window_title for keyword in terminal_keywords):
                    continue

                try:
                    # Prefer controls that usually hold the console buffer,
                    # falling back to less specific control types.
                    text_controls = window.descendants(control_type="Edit")
                    if not text_controls:
                        text_controls = window.descendants(control_type="Document")
                    if not text_controls:
                        text_controls = window.descendants(control_type="Text")

                    for control in text_controls:
                        try:
                            text = control.window_text()
                        except Exception as e:
                            # A single unreadable control must not abort the scan.
                            logger.debug(f"Error reading text control in window {window_title}: {e}")
                            continue
                        if text and text.strip():
                            return text.rstrip()

                    # No usable control: fall back to the window's own text,
                    # ignoring values that are merely the window title.
                    window_text = window.window_text()
                    if window_text and window_text.strip() and window_text not in bare_titles:
                        return window_text.rstrip()
                except Exception as e:
                    logger.debug(f"Error getting text from window {window_title}: {e}")
                    continue

            except Exception as e:
                logger.debug(f"Error processing window: {e}")
                continue

        return None

    except Exception as e:
        logger.error(f"Error in _get_windows_terminal_output: {e}")
        return None
@app.route('/terminal', methods=['GET']) @app.route('/terminal', methods=['GET'])
def get_terminal_output(): def get_terminal_output():
user_platform = platform.system() user_platform = platform.system()
@@ -358,8 +486,10 @@ def get_terminal_output():
xpath = '//application[@name="gnome-terminal-server"]/frame[@st:active="true"]//terminal[@st:focused="true"]' xpath = '//application[@name="gnome-terminal-server"]/frame[@st:active="true"]//terminal[@st:focused="true"]'
terminals: List[_Element] = desktop_xml.xpath(xpath, namespaces=_accessibility_ns_map_ubuntu) terminals: List[_Element] = desktop_xml.xpath(xpath, namespaces=_accessibility_ns_map_ubuntu)
output = terminals[0].text.rstrip() if len(terminals) == 1 else None output = terminals[0].text.rstrip() if len(terminals) == 1 else None
else: # windows and macos platform is not implemented currently elif user_platform == "Windows":
# raise NotImplementedError output = _get_windows_terminal_output()
logger.debug(f"Terminal output retrieved: {output}")
else: # macOS platform is not implemented currently
return "Currently not implemented for platform {:}.".format(platform.platform()), 500 return "Currently not implemented for platform {:}.".format(platform.platform()), 500
return jsonify({"output": output, "status": "success"}) return jsonify({"output": output, "status": "success"})
except Exception as e: except Exception as e:
@@ -989,6 +1119,9 @@ def get_window_size():
else: else:
return jsonify({"error": "app_class_name is required"}), 400 return jsonify({"error": "app_class_name is required"}), 400
if platform_name != "Linux":
return jsonify({"error": "window_size is only supported on Linux"}), 501
d = display.Display() d = display.Display()
root = d.screen().root root = d.screen().root
window_ids = root.get_full_property(d.intern_atom('_NET_CLIENT_LIST'), X.AnyPropertyType).value window_ids = root.get_full_property(d.intern_atom('_NET_CLIENT_LIST'), X.AnyPropertyType).value
@@ -1505,11 +1638,19 @@ def start_recording():
logger.error(f"Error removing old recording file: {e}") logger.error(f"Error removing old recording file: {e}")
return jsonify({'status': 'error', 'message': f'Failed to remove old recording file: {e}'}), 500 return jsonify({'status': 'error', 'message': f'Failed to remove old recording file: {e}'}), 500
if platform_name == "Linux":
d = display.Display() d = display.Display()
screen_width = d.screen().width_in_pixels screen_width = d.screen().width_in_pixels
screen_height = d.screen().height_in_pixels screen_height = d.screen().height_in_pixels
start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}" start_command = f"ffmpeg -y -f x11grab -draw_mouse 1 -s {screen_width}x{screen_height} -i :0.0 -c:v libx264 -r 30 {recording_path}"
elif platform_name == "Windows":
user32 = ctypes.windll.user32
screen_width = user32.GetSystemMetrics(0)
screen_height = user32.GetSystemMetrics(1)
# Use gdigrab for Windows screen capture
start_command = f"ffmpeg -y -f gdigrab -draw_mouse 1 -framerate 30 -video_size {screen_width}x{screen_height} -i desktop -c:v libx264 -r 30 {recording_path}"
else:
return jsonify({'status': 'error', 'message': f'Recording not supported on {platform_name}'}), 501
# Use stderr=PIPE to capture potential errors from ffmpeg # Use stderr=PIPE to capture potential errors from ffmpeg
recording_process = subprocess.Popen(shlex.split(start_command), recording_process = subprocess.Popen(shlex.split(start_command),
@@ -1544,11 +1685,22 @@ def end_recording():
error_output = "" error_output = ""
try: try:
# Send SIGINT for a graceful shutdown, allowing ffmpeg to finalize the file. # Send SIGINT for a graceful shutdown, allowing ffmpeg to finalize the file.
# On Windows, use CTRL_C_EVENT; on Unix, use SIGINT
if platform_name == "Windows":
# On Windows, we need to terminate the process gracefully
# ffmpeg responds to standard input 'q' to quit gracefully
try:
recording_process.stdin.write(b'q')
recording_process.stdin.flush()
except:
# If stdin is not available, use terminate
recording_process.terminate()
else:
recording_process.send_signal(signal.SIGINT) recording_process.send_signal(signal.SIGINT)
# Wait for ffmpeg to terminate. communicate() gets output and waits. # Wait for ffmpeg to terminate. communicate() gets output and waits.
_, error_output = recording_process.communicate(timeout=15) _, error_output = recording_process.communicate(timeout=15)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
logger.error("ffmpeg did not respond to SIGINT, killing the process.") logger.error("ffmpeg did not respond to stop signal, killing the process.")
recording_process.kill() recording_process.kill()
# After killing, communicate to get any remaining output. # After killing, communicate to get any remaining output.
_, error_output = recording_process.communicate() _, error_output = recording_process.communicate()
@@ -1589,8 +1741,9 @@ def run_python():
f.write(code) f.write(code)
# Execute the file using subprocess to capture all output # Execute the file using subprocess to capture all output
# Use sys.executable to use the same Python interpreter as the Flask server
result = subprocess.run( result = subprocess.run(
['/usr/bin/python3', temp_filename], [sys.executable, temp_filename],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, text=True,

View File

@@ -0,0 +1,36 @@
{
"id": "jade_test",
"snapshot": "snapshot",
"instruction": "请打开桌面上的 JADE 6.5 软件",
"source": "custom",
"config": [],
"trajectory": "trajectories/",
"related_apps": [
"jade"
],
"evaluator": {
"postconfig": [
{
"type": "sleep",
"parameters": {
"seconds": 3
}
}
],
"func": "check_include_exclude",
"result": {
"type": "vm_command_line",
"command": "tasklist | findstr /i jade"
},
"expected": {
"type": "rule",
"rules": {
"include": ["jade"],
"exclude": []
}
}
},
"proxy": false,
"fixed_ip": false,
"possibility_of_env_change": "low"
}

View File

@@ -387,5 +387,8 @@
"dcbe20e8-647f-4f1d-8696-f1c5bbb570e3", "dcbe20e8-647f-4f1d-8696-f1c5bbb570e3",
"7c4cc09e-7a92-40dd-8338-b2286535c4ed", "7c4cc09e-7a92-40dd-8338-b2286535c4ed",
"971cbb5b-3cbf-4ff7-9e24-b5c84fcebfa6" "971cbb5b-3cbf-4ff7-9e24-b5c84fcebfa6"
],
"jade": [
"jade_test"
] ]
} }

View File

@@ -7,10 +7,19 @@ Appends task completion results to results.json in real-time.
import json import json
import os import os
import time import time
import fcntl import platform
from pathlib import Path from pathlib import Path
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
# Import fcntl only on Unix-like systems (Linux, macOS)
# On Windows, we'll use msvcrt for file locking
if platform.system() != "Windows":
import fcntl
HAS_FCNTL = True
else:
import msvcrt
HAS_FCNTL = False
def extract_domain_from_path(result_path: str) -> str: def extract_domain_from_path(result_path: str) -> str:
""" """
@@ -66,8 +75,12 @@ def append_task_result(
# Thread-safe JSON append with file locking # Thread-safe JSON append with file locking
try: try:
with open(results_file, 'a+') as f: with open(results_file, 'a+') as f:
# Lock the file for exclusive access # Lock the file for exclusive access (platform-specific)
if HAS_FCNTL:
fcntl.flock(f.fileno(), fcntl.LOCK_EX) fcntl.flock(f.fileno(), fcntl.LOCK_EX)
else:
# Windows file locking using msvcrt
msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1)
try: try:
# Move to beginning to read existing content # Move to beginning to read existing content
@@ -95,8 +108,12 @@ def append_task_result(
f.write('\n') # Add newline for readability f.write('\n') # Add newline for readability
finally: finally:
# Always unlock the file # Always unlock the file (platform-specific)
if HAS_FCNTL:
fcntl.flock(f.fileno(), fcntl.LOCK_UN) fcntl.flock(f.fileno(), fcntl.LOCK_UN)
else:
# Windows unlock using msvcrt
msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
print(f"📝 Logged result: {domain}/{task_id} -> {result_entry['status']} (score: {score})") print(f"📝 Logged result: {domain}/{task_id} -> {result_entry['status']} (score: {score})")

View File

@@ -13,29 +13,43 @@ def run_single_example(agent, env, example, max_steps, instruction, args, exampl
runtime_logger = setup_logger(example, example_result_dir) runtime_logger = setup_logger(example, example_result_dir)
# Reset environment first to get fresh VM IP # Reset environment first to get fresh VM IP
env.reset(task_config=example) # env.reset(task_config=example)
# logger.info("=======Environment reset completed=======")
# Reset agent with fresh VM IP (for snapshot reverts) # # Reset agent with fresh VM IP (for snapshot reverts)
try: # try:
agent.reset(runtime_logger, vm_ip=env.vm_ip) # agent.reset(runtime_logger, vm_ip=env.vm_ip)
except Exception as e: # except Exception as e:
agent.reset(vm_ip=env.vm_ip) # agent.reset(vm_ip=env.vm_ip)
time.sleep(60) # Wait for the environment to be ready # time.sleep(10) # Wait for the environment to be ready
# get initial observation
logger.info("Getting initial observation...")
obs = env._get_obs() # Get the initial observation obs = env._get_obs() # Get the initial observation
logger.info("Initial observation obtained.")
done = False done = False
step_idx = 0 step_idx = 0
if getattr(args, 'enable_recording', False):
env.controller.start_recording() env.controller.start_recording()
while not done and step_idx < max_steps: while not done and step_idx < max_steps:
logger.info(f"Step {step_idx + 1} prediction...")
response, actions = agent.predict( response, actions = agent.predict(
instruction, instruction,
obs obs
) )
logger.info(f"Response: {response}")
logger.info(f"Actions: {actions}")
logger.info(f"Executing actions...")
for action in actions: for action in actions:
# Capture the timestamp before executing the action # Capture the timestamp before executing the action
action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S%f") action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S%f")
logger.info("Step %d: %s", step_idx + 1, action) logger.info("Step %d: %s", step_idx + 1, action)
logger.info("执行动作中...")
obs, reward, done, info = env.step(action, args.sleep_after_execution) obs, reward, done, info = env.step(action, args.sleep_after_execution)
logger.info("动作执行完成。")
logger.info("Reward: %.2f", reward) logger.info("Reward: %.2f", reward)
logger.info("Done: %s", done) logger.info("Done: %s", done)
@@ -69,6 +83,7 @@ def run_single_example(agent, env, example, max_steps, instruction, args, exampl
# Log task completion to results.json # Log task completion to results.json
log_task_completion(example, result, example_result_dir, args) log_task_completion(example, result, example_result_dir, args)
if getattr(args, 'enable_recording', False):
env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4")) env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4"))

View File

@@ -49,6 +49,48 @@ def encode_image(image_content):
return base64.b64encode(image_content).decode('utf-8') return base64.b64encode(image_content).decode('utf-8')
def compress_screenshot(image_bytes, quality=75, resize_ratio=None):
    """
    Re-encode a screenshot as JPEG to reduce its size.

    Args:
        image_bytes: Raw encoded image bytes (typically PNG).
        quality: JPEG quality (1-100, default 75). Values outside this range are clamped.
        resize_ratio: Optional resize ratio (e.g., 0.5 for 50% size). None or 1.0 keeps
            the original resolution.

    Returns:
        Compressed image bytes in JPEG format. If the image cannot be decoded or
        encoded, the original ``image_bytes`` are returned unchanged, so the
        result is not guaranteed to be JPEG.
    """
    try:
        # Open image from bytes
        img = Image.open(BytesIO(image_bytes))

        # Optionally resize if ratio is provided
        if resize_ratio and resize_ratio != 1.0:
            # Keep each side at >= 1px so a very small ratio cannot yield an empty image
            new_size = (
                max(1, int(img.size[0] * resize_ratio)),
                max(1, int(img.size[1] * resize_ratio)),
            )
            img = img.resize(new_size, Image.Resampling.LANCZOS)

        # JPEG has no alpha channel: flatten transparent images onto white
        if img.mode in ('RGBA', 'LA', 'P', 'PA'):
            background = Image.new('RGB', img.size, (255, 255, 255))
            if img.mode in ('P', 'PA'):
                # Expand the palette so any transparency becomes a real alpha band
                img = img.convert('RGBA')
            background.paste(img, mask=img.split()[-1])
            img = background
        elif img.mode not in ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr'):
            # Remaining modes (e.g. 'I', 'F', 'I;16') cannot be written as JPEG
            img = img.convert('RGB')

        # Save as JPEG with compression
        jpeg_quality = max(1, min(100, int(quality)))
        output = BytesIO()
        img.save(output, format='JPEG', quality=jpeg_quality, optimize=True)
        compressed = output.getvalue()

        compressed_size = len(compressed)
        logger.debug(f"Screenshot compressed: original={len(image_bytes)/1024:.1f}KB, compressed={compressed_size/1024:.1f}KB, ratio={compressed_size/len(image_bytes):.2%}")

        return compressed
    except Exception as e:
        # Best effort: callers still get a usable image if compression fails
        logger.warning(f"Failed to compress screenshot: {e}, using original")
        return image_bytes
def encoded_img_to_pil_img(data_str): def encoded_img_to_pil_img(data_str):
base64_str = data_str.replace("data:image/png;base64,", "") base64_str = data_str.replace("data:image/png;base64,", "")
image_data = base64.b64decode(base64_str) image_data = base64.b64decode(base64_str)
@@ -236,7 +278,9 @@ class PromptAgent:
# observation_type can be in ["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"] # observation_type can be in ["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"]
max_trajectory_length=3, max_trajectory_length=3,
a11y_tree_max_tokens=10000, a11y_tree_max_tokens=10000,
client_password="password" client_password="password",
screen_width=1920,
screen_height=1080
): ):
self.platform = platform self.platform = platform
self.model = model self.model = model
@@ -248,6 +292,8 @@ class PromptAgent:
self.max_trajectory_length = max_trajectory_length self.max_trajectory_length = max_trajectory_length
self.a11y_tree_max_tokens = a11y_tree_max_tokens self.a11y_tree_max_tokens = a11y_tree_max_tokens
self.client_password = client_password self.client_password = client_password
self.screen_width = screen_width
self.screen_height = screen_height
self.thoughts = [] self.thoughts = []
self.actions = [] self.actions = []
@@ -284,7 +330,7 @@ class PromptAgent:
else: else:
raise ValueError("Invalid experiment type: " + observation_type) raise ValueError("Invalid experiment type: " + observation_type)
self.system_message = self.system_message.format(CLIENT_PASSWORD=self.client_password) self.system_message = self.system_message.format(CLIENT_PASSWORD=self.client_password, SCREEN_WIDTH=self.screen_width, SCREEN_HEIGHT=self.screen_height)
def predict(self, instruction: str, obs: Dict) -> List: def predict(self, instruction: str, obs: Dict) -> List:
""" """
@@ -342,8 +388,8 @@ class PromptAgent:
{ {
"type": "image_url", "type": "image_url",
"image_url": { "image_url": {
"url": f"data:image/png;base64,{_screenshot}", "url": f"data:image/jpeg;base64,{_screenshot}",
"detail": "high" "detail": "auto"
} }
} }
] ]
@@ -361,8 +407,8 @@ class PromptAgent:
{ {
"type": "image_url", "type": "image_url",
"image_url": { "image_url": {
"url": f"data:image/png;base64,{_screenshot}", "url": f"data:image/jpeg;base64,{_screenshot}",
"detail": "high" "detail": "auto"
} }
} }
] ]
@@ -380,8 +426,8 @@ class PromptAgent:
{ {
"type": "image_url", "type": "image_url",
"image_url": { "image_url": {
"url": f"data:image/png;base64,{_screenshot}", "url": f"data:image/jpeg;base64,{_screenshot}",
"detail": "high" "detail": "auto"
} }
} }
] ]
@@ -414,7 +460,9 @@ class PromptAgent:
# {{{1 # {{{1
if self.observation_type in ["screenshot", "screenshot_a11y_tree"]: if self.observation_type in ["screenshot", "screenshot_a11y_tree"]:
base64_image = encode_image(obs["screenshot"]) # Compress screenshot to JPEG (keep original resolution for accurate coordinates)
compressed_screenshot = compress_screenshot(obs["screenshot"], quality=75)
base64_image = encode_image(compressed_screenshot)
linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"], linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"],
platform=self.platform) if self.observation_type == "screenshot_a11y_tree" else None platform=self.platform) if self.observation_type == "screenshot_a11y_tree" else None
logger.debug("LINEAR AT: %s", linearized_accessibility_tree) logger.debug("LINEAR AT: %s", linearized_accessibility_tree)
@@ -447,8 +495,8 @@ class PromptAgent:
{ {
"type": "image_url", "type": "image_url",
"image_url": { "image_url": {
"url": f"data:image/png;base64,{base64_image}", "url": f"data:image/jpeg;base64,{base64_image}",
"detail": "high" "detail": "auto"
} }
} }
] ]
@@ -481,7 +529,9 @@ class PromptAgent:
# Add som to the screenshot # Add som to the screenshot
masks, drew_nodes, tagged_screenshot, linearized_accessibility_tree = tag_screenshot(obs["screenshot"], obs[ masks, drew_nodes, tagged_screenshot, linearized_accessibility_tree = tag_screenshot(obs["screenshot"], obs[
"accessibility_tree"], self.platform) "accessibility_tree"], self.platform)
base64_image = encode_image(tagged_screenshot) # Compress tagged screenshot (keep original resolution)
compressed_screenshot = compress_screenshot(tagged_screenshot, quality=75)
base64_image = encode_image(compressed_screenshot)
logger.debug("LINEAR AT: %s", linearized_accessibility_tree) logger.debug("LINEAR AT: %s", linearized_accessibility_tree)
if linearized_accessibility_tree: if linearized_accessibility_tree:
@@ -504,8 +554,8 @@ class PromptAgent:
{ {
"type": "image_url", "type": "image_url",
"image_url": { "image_url": {
"url": f"data:image/png;base64,{base64_image}", "url": f"data:image/jpeg;base64,{base64_image}",
"detail": "high" "detail": "auto"
} }
} }
] ]
@@ -523,7 +573,7 @@ class PromptAgent:
"model": self.model, "model": self.model,
"messages": messages, "messages": messages,
"max_tokens": self.max_tokens, "max_tokens": self.max_tokens,
"top_p": self.top_p, # "top_p": self.top_p,
"temperature": self.temperature "temperature": self.temperature
}) })
except Exception as e: except Exception as e:
@@ -691,8 +741,8 @@ class PromptAgent:
logger.debug("CLAUDE MESSAGE: %s", repr(claude_messages)) logger.debug("CLAUDE MESSAGE: %s", repr(claude_messages))
headers = { headers = {
"x-api-key": os.environ["ANTHROPIC_API_KEY"], "x-api-key": os.environ["OPENAI_API_KEY"],
"anthropic-version": "2023-06-01", # "anthropic-version": "2023-06-01",
"content-type": "application/json" "content-type": "application/json"
} }
@@ -705,7 +755,7 @@ class PromptAgent:
} }
response = requests.post( response = requests.post(
"https://api.anthropic.com/v1/messages", "https://api.apiyi.com/v1/messages",
headers=headers, headers=headers,
json=payload json=payload
) )

View File

@@ -317,7 +317,26 @@ Previous actions:
args = tool_call["arguments"] args = tool_call["arguments"]
action = args["action"] action = args["action"]
if action == "left_click": def _clean_keys(raw_keys):
keys = raw_keys if isinstance(raw_keys, list) else [raw_keys]
cleaned_keys = []
for key in keys:
if isinstance(key, str):
if key.startswith("keys=["):
key = key[6:]
if key.endswith("]"):
key = key[:-1]
if key.startswith("['") or key.startswith('["'):
key = key[2:] if len(key) > 2 else key
if key.endswith("']") or key.endswith('"]'):
key = key[:-2] if len(key) > 2 else key
key = key.strip()
cleaned_keys.append(key)
else:
cleaned_keys.append(key)
return cleaned_keys
if action == "left_click" or action == "click":
if "coordinate" in args: if "coordinate" in args:
x, y = args["coordinate"] x, y = args["coordinate"]
adj_x, adj_y = adjust_coordinates(x, y) adj_x, adj_y = adjust_coordinates(x, y)
@@ -355,6 +374,16 @@ Previous actions:
else: else:
pyautogui_code.append("pyautogui.doubleClick()") pyautogui_code.append("pyautogui.doubleClick()")
elif action == "triple_click":
if "coordinate" in args:
x, y = args["coordinate"]
adj_x, adj_y = adjust_coordinates(x, y)
pyautogui_code.append(
f"pyautogui.tripleClick({adj_x}, {adj_y})"
)
else:
pyautogui_code.append("pyautogui.tripleClick()")
elif action == "type": elif action == "type":
text = args.get("text", "") text = args.get("text", "")
@@ -383,24 +412,7 @@ Previous actions:
elif action == "key": elif action == "key":
keys = args.get("keys", []) keys = _clean_keys(args.get("keys", []))
if isinstance(keys, list):
cleaned_keys = []
for key in keys:
if isinstance(key, str):
if key.startswith("keys=["):
key = key[6:]
if key.endswith("]"):
key = key[:-1]
if key.startswith("['") or key.startswith('["'):
key = key[2:] if len(key) > 2 else key
if key.endswith("']") or key.endswith('"]'):
key = key[:-2] if len(key) > 2 else key
key = key.strip()
cleaned_keys.append(key)
else:
cleaned_keys.append(key)
keys = cleaned_keys
keys_str = ", ".join([f"'{key}'" for key in keys]) keys_str = ", ".join([f"'{key}'" for key in keys])
if len(keys) > 1: if len(keys) > 1:
@@ -408,6 +420,16 @@ Previous actions:
else: else:
pyautogui_code.append(f"pyautogui.press({keys_str})") pyautogui_code.append(f"pyautogui.press({keys_str})")
elif action == "key_down":
keys = _clean_keys(args.get("keys", []))
for k in keys:
pyautogui_code.append(f"pyautogui.keyDown('{k}')")
elif action == "key_up":
keys = _clean_keys(args.get("keys", []))
for k in reversed(keys):
pyautogui_code.append(f"pyautogui.keyUp('{k}')")
elif action == "scroll": elif action == "scroll":
pixels = args.get("pixels", 0) pixels = args.get("pixels", 0)
pyautogui_code.append(f"pyautogui.scroll({pixels})") pyautogui_code.append(f"pyautogui.scroll({pixels})")
@@ -416,6 +438,14 @@ Previous actions:
pyautogui_code.append("WAIT") pyautogui_code.append("WAIT")
elif action == "terminate": elif action == "terminate":
# Termination should respect status:
# - success -> DONE
# - failure -> FAIL
# Backward compatible: missing status defaults to success.
status = args.get("status", "success")
if str(status).lower() == "failure":
pyautogui_code.append("FAIL")
else:
pyautogui_code.append("DONE") pyautogui_code.append("DONE")
elif action == "mouse_move": elif action == "mouse_move":
@@ -481,7 +511,11 @@ Previous actions:
process_tool_call("\n".join(current_tool_call)) process_tool_call("\n".join(current_tool_call))
if not low_level_instruction and len(pyautogui_code) > 0: if not low_level_instruction and len(pyautogui_code) > 0:
action_type = pyautogui_code[0].split(".", 1)[1].split("(", 1)[0] first_action = pyautogui_code[0]
if "." in first_action:
action_type = first_action.split(".", 1)[1].split("(", 1)[0]
else:
action_type = first_action.lower()
low_level_instruction = f"Performing {action_type} action" low_level_instruction = f"Performing {action_type} action"
return low_level_instruction, pyautogui_code return low_level_instruction, pyautogui_code

View File

@@ -60,6 +60,8 @@ S1_ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n"
# S2 Prompts # S2 Prompts
S2_ACTION_DESCRIPTION = """ S2_ACTION_DESCRIPTION = """
* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order. * `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.
* `key_down`: Press and HOLD the specified key(s) down in order (no release). Use this for stateful holds like holding Shift while clicking.
* `key_up`: Release the specified key(s) in reverse order.
* `type`: Type a string of text on the keyboard. * `type`: Type a string of text on the keyboard.
* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen. * `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.
* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen. * `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.
@@ -67,7 +69,7 @@ S2_ACTION_DESCRIPTION = """
* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen. * `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.
* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen. * `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.
* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen. * `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.
* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action). * `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen.
* `scroll`: Performs a scroll of the mouse scroll wheel. * `scroll`: Performs a scroll of the mouse scroll wheel.
* `hscroll`: Performs a horizontal scroll (mapped to regular scroll). * `hscroll`: Performs a horizontal scroll (mapped to regular scroll).
* `wait`: Wait specified seconds for the change to happen. * `wait`: Wait specified seconds for the change to happen.
@@ -76,7 +78,7 @@ S2_ACTION_DESCRIPTION = """
""" """
S2_DESCRIPTION_PROMPT_TEMPLATE = """Use a mouse and keyboard to interact with a computer, and take screenshots. S2_DESCRIPTION_PROMPT_TEMPLATE = """Use a mouse and keyboard to interact with a computer, and take screenshots.
* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications. * This is an interface to a desktop GUI. You must click on desktop icons to start applications.
* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot. * Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot.
{resolution_info} {resolution_info}
* Whenever you intend to move the cursor to click on an element like an icon, you should consult a screenshot to determine the coordinates of the element before moving the cursor. * Whenever you intend to move the cursor to click on an element like an icon, you should consult a screenshot to determine the coordinates of the element before moving the cursor.
@@ -122,7 +124,8 @@ def build_s2_tools_def(description_prompt):
"action": { "action": {
"description": S2_ACTION_DESCRIPTION, "description": S2_ACTION_DESCRIPTION,
"enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag",
"right_click", "middle_click", "double_click", "scroll", "wait", "terminate"], "right_click", "middle_click", "double_click", "triple_click", "scroll",
"wait", "terminate", "key_down", "key_up"],
"type": "string" "type": "string"
}, },
"keys": {"description": "Required only by `action=key`.", "type": "array"}, "keys": {"description": "Required only by `action=key`.", "type": "array"},

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,13 @@
# UiPath Screen Agent # UiPath Screen Agent
### 23 Dec 2025
- Updated the planner model to [Claude 4.5 Opus](https://www.anthropic.com/news/claude-opus-4-5)
- Updated the grounder model to an internally fine-tuned version of [Qwen3-VL](https://github.com/QwenLM/Qwen3-VL), allowing it to predict "refusal" (similar to OSWorld-G) for elements that do not exist
- Added memory for storing relevant information across steps
- Improved utilization of the UI element detector for fine grained details (such as cell corners)
- Refactoring and various small fixes
### 18 Sep 2025
We propose a simple, yet effective implementation of a Computer Use Agent, which achieves a performance of **53.6%** on the **OSWorld** benchmark with 50 steps, demonstrating competitive results with a relatively lightweight setup and UI only actions. We propose a simple, yet effective implementation of a Computer Use Agent, which achieves a performance of **53.6%** on the **OSWorld** benchmark with 50 steps, demonstrating competitive results with a relatively lightweight setup and UI only actions.
Our system builds upon recent approaches in agentic computer use and follows the literature in adopting a two-stage architecture that separates high-level reasoning from low-level execution. Specifically, the system is composed of: Our system builds upon recent approaches in agentic computer use and follows the literature in adopting a two-stage architecture that separates high-level reasoning from low-level execution. Specifically, the system is composed of:
@@ -32,7 +40,7 @@ The interaction history is structured as a conversation: the user reports the ta
By combining the current state with this structured history, the Action Planner generates context-aware, informed predictions at every step: it can reconstruct the sequence of actions that led it to this point, notice any failures, and plan the subsequent steps. By combining the current state with this structured history, the Action Planner generates context-aware, informed predictions at every step: it can reconstruct the sequence of actions that led it to this point, notice any failures, and plan the subsequent steps.
We support a concise set of actions for interacting with the environment, focusing specifically on UI-related activities: We support a concise set of actions for interacting with the environment, focusing specifically on UI-related activities:
- Click (left, right, double click) - Click (left, right, double, triple click)
- Type - Type
- Scroll - Scroll
- Drag - Drag
@@ -68,4 +76,3 @@ This process gives the model multiple opportunities to predict within a relevant
## Conclusion ## Conclusion
Our method offers a clean and simple yet competitive pipeline for Computer Use tasks. It is cost-effective, minimizing token usage during planning, avoiding parallel planning and reliance on numerous past images, and incorporating only **direct UI actions** with refined grounding actions to improve accuracy. With this approach, we achieve **53.6%** accuracy on OSWorld with a 50-step horizon. Our method offers a clean and simple yet competitive pipeline for Computer Use tasks. It is cost-effective, minimizing token usage during planning, avoiding parallel planning and reliance on numerous past images, and incorporating only **direct UI actions** with refined grounding actions to improve accuracy. With this approach, we achieve **53.6%** accuracy on OSWorld with a 50-step horizon.

View File

@@ -1,7 +1,9 @@
import datetime import datetime
import json import json
from collections import OrderedDict
import time import time
from collections import OrderedDict
from copy import deepcopy
import mm_agents.uipath.llm_client as llm_client import mm_agents.uipath.llm_client as llm_client
from mm_agents.uipath.types_utils import ( from mm_agents.uipath.types_utils import (
PlanAction, PlanAction,
@@ -11,43 +13,54 @@ from mm_agents.uipath.types_utils import (
) )
from mm_agents.uipath.action_planner_prompt_builder import ( from mm_agents.uipath.action_planner_prompt_builder import (
ComputerUseAgentInterface, ComputerUseAgentInterface,
PlanerCoTSections, PlanerCoTSectionsType,
user_command_template, user_command_template_chat,
user_task_info_template, user_task_info_template,
PlannerOutput,
) )
from mm_agents.uipath.utils import ValidationException, parse_message_json from mm_agents.uipath.utils import ValidationException, parse_message_json, ExecutionInfo
from mm_agents.uipath.memory import ShortTermMemoryManager
class PlannerOutput(object):
    """Planner result: the chosen action plus its chain-of-thought sections.

    "thought" and "review" are lifted out of `additional_sections` into their
    own attributes; every other section is kept in `self.additional_sections`.
    """

    def __init__(self, plan_action: PlanAction, additional_sections: dict[str, str]):
        self.plan_action = plan_action
        # Both keys are mandatory; a missing one raises KeyError.
        self.thought = additional_sections["thought"]
        self.review = additional_sections["review"]

        remaining = {}
        for name, content in additional_sections.items():
            if name in ("review", "thought"):
                continue
            remaining[name] = content
        self.additional_sections = remaining
class ActionPlanner(object): class ActionPlanner(object):
def __init__(self): def __init__(self):
self.number_history_steps_with_images = 2 self.number_history_steps_with_images = 2
self.computer_use_agent_interface = ComputerUseAgentInterface() self.computer_use_agent_interface = ComputerUseAgentInterface()
self.short_term_memory_manager = ShortTermMemoryManager()
def build_message_output_format_info(self) -> str: def build_message_output_format_info(self) -> str:
output_dict = OrderedDict({}) output_dict = OrderedDict({})
for _, value in PlanerCoTSections.items(): cot_sections: dict[str, dict] = self.computer_use_agent_interface.get_planner_cot_sections()
for _, value in cot_sections.items():
display = value["display"] display = value["display"]
description = value["description"] description = value["description"]
output_dict[display] = description output_dict[display] = description
output_dict["action"] = ( output_dict["action"] = "<The action to perform in JSON format as specified in the system message>"
"<The action to perform in JSON format as specified in the system message>"
)
return json.dumps(output_dict, indent=4, ensure_ascii=False) return json.dumps(output_dict, indent=4, ensure_ascii=False)
def get_step_content( def get_step_content(self, step: dict, following_step: dict | None) -> tuple[str, str]:
self, step: dict, following_step: dict | None
) -> tuple[str, str]:
content_dict = OrderedDict({}) content_dict = OrderedDict({})
observation_dict = OrderedDict({}) observation_dict = OrderedDict({})
observation_dict["Performed actions"] = step["actions"] observation_dict["Performed actions"] = deepcopy(step["actions"])
if ( def remove_unused_fields(action: list[dict], keys: list[str]):
"extracted_data" in step["additional_parameters"] for act in action:
): # if the step was an extraction step add the dummy extraction action for key in keys:
if key in act:
del act[key]
remove_unused_fields(observation_dict["Performed actions"], ["id", "result", "execution_error_message", "detected_items", "description"])
if "extracted_data" in step["additional_parameters"]: # if the step was an extraction step add the dummy extraction action
extraction_action = { extraction_action = {
"type": PlanActionType.ExtractData, "type": PlanActionType.ExtractData,
"description": step["description"], "description": step["description"],
@@ -56,24 +69,22 @@ class ActionPlanner(object):
observation_dict["Performed actions"] = [extraction_action] observation_dict["Performed actions"] = [extraction_action]
if following_step: if following_step:
observation_dict["Observation"] = following_step[ observation_dict["Observation"] = following_step["additional_parameters"].get("review", None)
"additional_parameters"
].get("review", None)
for key, value in PlanerCoTSections.items(): cot_sections = self.computer_use_agent_interface.get_planner_cot_sections()
if key != "review": for key, value in cot_sections.items():
if key not in [PlanerCoTSectionsType.Review, PlanerCoTSectionsType.Memory]:
param_value = step["additional_parameters"].get(key, None) param_value = step["additional_parameters"].get(key, None)
display_name = value["display"] display_name = value["display"]
content_dict[display_name] = param_value content_dict[display_name] = param_value
content_dict["actions"] = json.loads( content_dict["action"] = json.loads(step["additional_parameters"]["plan_action"])
step["additional_parameters"]["plan_action"]
)
content_dict = json.dumps(content_dict, indent=4, ensure_ascii=False) content_dict = json.dumps(content_dict, indent=4, ensure_ascii=False)
observation_dict = json.dumps(observation_dict, indent=4, ensure_ascii=False) observation_dict = json.dumps(observation_dict, indent=4, ensure_ascii=False)
return content_dict, observation_dict return content_dict, observation_dict
def build_messages_chat(self, state: State, execution_info: dict) -> list[dict]: def build_messages_chat(self, state: State, execution_state: ExecutionState) -> list[dict]:
execution_info = execution_state.execution_info
messages = [] messages = []
system_message = { system_message = {
"role": "system", "role": "system",
@@ -82,42 +93,45 @@ class ActionPlanner(object):
messages.append(system_message) messages.append(system_message)
start_index = max(0, len(state.previous_steps) - self.number_history_steps_with_images)
end_index = len(state.previous_steps)
images_dict = {index: state.previous_steps[index]["image"] for index in range(start_index, end_index)}
# Don't set it for the first iteration as the history is empty anyway
user_messages = state.task
if end_index == 0:
user_task_with_ref_imgs = ""
user_messages = [{"type": "text", "text": state.task}]
else:
user_task_with_ref_imgs = state.task
user_messages = [{"type": "text", "text": "Recall the task again:"}, {"type": "text", "text": state.task}]
user_task_info_message = { user_task_info_message = {
"role": "user", "role": "user",
"content": [ "content": [
{ {
"type": "text", "type": "text",
"text": user_task_info_template.format( "text": user_task_info_template.format(
task=state.task, task=user_task_with_ref_imgs,
current_date=datetime.datetime.now().strftime("%Y-%m-%d"), current_date=datetime.datetime.now().strftime("%Y-%m-%d"),
), ),
} }
], ],
} }
messages.append(user_task_info_message) messages.append(user_task_info_message)
start_index = max(
0, len(state.previous_steps) - self.number_history_steps_with_images
)
end_index = len(state.previous_steps)
for index in range(0, end_index): for index in range(0, end_index):
step = state.previous_steps[index] step = state.previous_steps[index]
if index >= start_index: if index >= start_index:
assert step["image"] is not None and len(step["image"]) > 0, ( image = images_dict.get(index, None)
"Step image is empty"
) assert image is not None and len(image) > 0, "Step image is empty"
user_image_message = { user_image_message = {
"role": "user", "role": "user",
"content": [ "content": [
{ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image}"}},
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{step['image']}"
},
},
], ],
} }
messages.append(user_image_message) messages.append(user_image_message)
@@ -148,79 +162,98 @@ class ActionPlanner(object):
} }
messages.append(user_message_reply) messages.append(user_message_reply)
memory = json.loads(state.previous_steps[-1]["additional_parameters"].get("memory", "{}")) if len(state.previous_steps) > 0 else {}
memory_str = json.dumps(memory, indent=4, ensure_ascii=False) if len(memory) > 0 else "No memory."
last_user_message = { last_user_message = {
"role": "user", "role": "user",
"content": [ "content": user_messages
+ [
{ {
"type": "text", "type": "text",
"text": "Current screenshot:", "text": "Current screenshot:",
}, },
{ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{state.image_base64}"}},
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{state.image_base64}"
},
},
{ {
"type": "text", "type": "text",
"text": user_command_template.format( "text": user_command_template_chat.format(
task=state.task, execution_info_message=self.build_execution_info_message(execution_info),
execution_info_message=self.build_execution_info_message(
execution_info
),
json_output_format=self.build_message_output_format_info(), json_output_format=self.build_message_output_format_info(),
memory=memory_str,
), ),
}, },
], ],
} }
messages.append(last_user_message) messages.append(last_user_message)
for raw_response in execution_info.responses:
if raw_response.grounding_error is not None:
ai_message = {
"role": "assistant",
"content": [
{
"type": "text",
"text": raw_response.raw_planning_prediction,
}
],
}
messages.append(ai_message)
user_message = {
"role": "user",
"content": [
{
"type": "text",
"text": f"Grounder model error detected. Could not identify the element with description: '{raw_response.grounding_error.element_description}', error {raw_response.grounding_error.message}. Possible reasons:the description is not precise enough for the grounder or the element is not visible on the screenshot. If providing a new description does not work, try to complete the action through another path than using that specific button (either by changing the element to be clicked or providing another action such as a hotkey if any exist).",
}
],
}
messages.append(user_message)
return messages return messages
def extract_response( def extract_response(self, response_content: str) -> tuple[PlanAction, dict[str, str]]:
self, response_content: str
) -> tuple[PlanAction, dict[str, str]]:
cot_sections_lst = list(PlanerCoTSections.keys())
additional_sections = OrderedDict({}) additional_sections = OrderedDict({})
response_json = parse_message_json(response_content) response_json = parse_message_json(response_content)
cot_sections = self.computer_use_agent_interface.get_planner_cot_sections()
cot_sections_lst = list(cot_sections.keys())
for section in cot_sections_lst: for section in cot_sections_lst:
section_display = PlanerCoTSections[section]["display"] section_display = cot_sections[section]["display"]
if section_display not in response_json: if section_display not in response_json:
raise ValidationException( raise ValidationException(f"Invalid response format, '{section_display}' key not found: {response_content}")
f"Invalid response format, '{section}' key not found: {response_content}" additional_sections[section] = response_json.get(section_display)
)
additional_sections[section] = response_json.get(
PlanerCoTSections[section]["display"]
)
if "action" not in response_json: if "action" not in response_json:
raise ValidationException( raise ValidationException(f"Invalid response format, 'action' key not found: {response_content}")
f"Invalid response format, 'action' key not found: {response_content}"
)
action_dict = response_json["action"] action_dict = response_json["action"]
plan_action = PlanAction.from_dict(self.correct_action_type(action_dict)) plan_action = PlanAction.from_dict(ActionPlanner.correct_action_type(action_dict))
if plan_action is None:
raise ValidationException(f"Invalid action format: {response_content}")
if plan_action.action_type == PlanActionType.Drag: if plan_action.action_type == PlanActionType.Drag:
self.computer_use_agent_interface.validate_action(plan_action) self.computer_use_agent_interface.validate_action(plan_action)
return plan_action, additional_sections return plan_action, additional_sections
def build_execution_info_message(self, execution_info: dict) -> str: def build_execution_info_message(self, execution_info: ExecutionInfo) -> str:
execution_info_message = "" execution_info_message = ""
if "planner_action_review" in execution_info: if execution_info.planner_action_review is not None:
action_description = execution_info["planner_action_review"][ action_description = execution_info.planner_action_review["action_description"]
"action_description" error_message = execution_info.planner_action_review["error_message"]
] execution_info_message = f"You predicted this action: '{action_description}' but it is not valid because: {error_message}. If the target element is not visible/fully visible on the screenshot, scroll first to make the target element visible. If the target element is not correct, change the action description with more precise element description using nearby context."
error_message = execution_info["planner_action_review"]["error_message"] elif execution_info.responses and len(execution_info.responses) > 0 and execution_info.responses[-1].grounding_error is not None:
grounding_error = execution_info.responses[-1].grounding_error
execution_info_message = f"You predicted this action: '{action_description}' but it is not valid because: {error_message}. If the target element is not visible on the screenshot, scroll first to make the target element visible. If the target element is not correct, change the action description with more precise element description using nearby context." error_message = str(grounding_error)
execution_info_message = f"The predicted is not valid because of this {error_message}. If the target element is not visible/fully visible on the screenshot, scroll first to make the target element visible. If the target element is not correct, change the action description with more precise element description using nearby context."
return execution_info_message return execution_info_message
def correct_action_type(self, response_json: dict) -> dict: @staticmethod
def correct_action_type(response_json: dict) -> dict:
action_type = response_json.get("type", "").lower() action_type = response_json.get("type", "").lower()
if action_type in ("press", "key_press", "press_key"): if action_type in ("press", "key_press", "press_key"):
response_json["type"] = "key_press" response_json["type"] = "key_press"
@@ -234,11 +267,13 @@ class ActionPlanner(object):
response_json["type"] = "wait" response_json["type"] = "wait"
return response_json return response_json
def predict(self, state: State, execution_state: ExecutionState) -> PlannerOutput: async def predict(self, state: State, execution_state: ExecutionState) -> PlannerOutput:
messages = self.build_messages_chat(state, execution_state.execution_info) messages = self.build_messages_chat(state, execution_state)
llm_messages = [message for message in messages] llm_messages = [message for message in messages]
repeat_count = 2 repeat_count = 3
plan, response_content = None, None response_content = ""
plan_action = None
additional_sections = {}
while repeat_count > 0: while repeat_count > 0:
try: try:
payload = { payload = {
@@ -250,13 +285,14 @@ class ActionPlanner(object):
response_content = llm_client.send_messages(payload) response_content = llm_client.send_messages(payload)
if response_content is None or len(response_content.strip()) == 0: if response_content is None or len(response_content.strip()) == 0:
raise ValidationException("Planner response is None or empty") raise ValidationException("Planner response is None or empty")
plan_action, additional_sections = self.extract_response(
str(response_content) plan_action, additional_sections = self.extract_response(str(response_content))
) llm_memory_response = additional_sections.get("memory", None)
plan = PlannerOutput(plan_action, additional_sections) memory_operations = self.short_term_memory_manager.extract_memory_operations(llm_memory_response)
execution_state.execution_info.current_response.raw_planning_prediction = response_content
break break
except ValidationException as e: except ValidationException as e:
time.sleep(5)
repeat_count -= 1 repeat_count -= 1
ai_message = { ai_message = {
"role": "assistant", "role": "assistant",
@@ -280,9 +316,15 @@ class ActionPlanner(object):
llm_messages = messages + [ai_message, error_message] llm_messages = messages + [ai_message, error_message]
if repeat_count == 0: if repeat_count == 0:
raise ValueError( raise ValueError(f"Invalid planner response format: {response_content}")
f"Invalid planner response format: {response_content}, {str(e)}" if plan_action is None:
)
if plan is None:
raise ValueError("Planner response is not valid") raise ValueError("Planner response is not valid")
return plan planner_output = PlannerOutput(
plan_action=plan_action,
additional_sections=additional_sections,
)
updated_memory = await self.short_term_memory_manager.get_updated_memory(
state, memory_operations, execution_state=execution_state
)
planner_output.additional_sections["memory"] = json.dumps(updated_memory, indent=4, ensure_ascii=False)
return planner_output

View File

@@ -1,8 +1,11 @@
from collections import OrderedDict from collections import OrderedDict
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from enum import Enum
from mm_agents.uipath.types_utils import PlanAction, key_maps from mm_agents.uipath.types_utils import PlanAction, key_maps
from mm_agents.uipath.utils import ValidationException from mm_agents.uipath.utils import ValidationException
from mm_agents.uipath.memory import memory_system_template
system_template = """You are a computer use agent that perform computer-related tasks. system_template = """You are a computer use agent that perform computer-related tasks.
You will be given a task, a current screenshot, and a list of previous actions. You need to predict the next action. You will be given a task, a current screenshot, and a list of previous actions. You need to predict the next action.
@@ -31,85 +34,138 @@ Your action response must be a valid JSON with the following format:
## Action examples: example of valid actions: ## Action examples: example of valid actions:
{examples} {examples}
## Important Notes: ## Action Sequence Example:
- Close any cookies, ads, login or registration etc pop-ups if not needed. Here is an example of the correct sequence for typing text into an input field.
- Before typing, ensure the input box is focused by clicking on it.
Step 1: Scroll to make the 'Username' input field fully visible.
{{
"type": "scroll",
"description": "Scroll page to make the 'Username' input field fully visible."
"parameters": {{"element_description": "the main page", "direction": "down", "distance": 3}}
}}
Step 2: Click the input field to focus it.
{{
"type": "click",
"description": "Click the 'Username' input field."
}}
Step 3: Type the desired text.
{{
"type": "type",
"description": "Type 'testuser' into the focused 'Username' input field.",
"parameters": {{
"text": "testuser"
}}
}}
## Important Rules:
CRITICAL: Always click to focus an input field before using the type action if it is not focused already from a previous step. The model must predict a click on the element, and then in the next step, predict the type action.
Close any cookies, ads, login or registration pop-ups if they are not needed for the task.
Before finish action, ensure all necessary data entries or selections are committed by performing appropriate actions (e.g., pressing 'Enter'/ 'Tab', Ctrl+S for saving documents or clicking 'Save', changing focus, or blurring the input field).
- **Strict Adherence**: Only perform actions the user has explicitly requested; avoid unnecessary steps. E.g. For colors, ensure that if user requested to use "green" you use the color named green, not light green or other shades.
- CRITICAL: Make sure the modified files or settings are saved and if no file name is specified in the user task, use the default settings that appear.
- Dismiss "Authentication required" prompts by clicking "Cancel".
- Leave windows/applications open at task completion.
- **Completion Criteria**: Only finish when all user requirements are met in full and all running commands have finished.
- **Impossibility Handling**: Return failure if completion is blocked by environmental constraints.
- You must never logout/close the computer, otherwise you won't be able to interact with the environment, if an action requires this, mark it as failure
""" """
user_command_template = """Recall Task Again: {task} user_message_template = """Here are the current information:
Check if the task is finished. If not provide the next action to perform. The current date is (YYYY-MM-DD): {current_date}
Remember: Task: {task}
- Perform the task on provided application(s) or website(s). You are not allowed to use the browser "address bar".
- Close any cookies, ads, login or registration etc pop-ups if not needed.
- Only one action at a time (never "click and type", "click and drag", "type and press", "press shift and click", etc..). Think of how to combine them in two consecutive actions obtaining the intended result or use an available action that can obtain it.
- For any opening input combobox, dropdown menu options, you must select an option or press Enter key to select default one.
- Click on input box to ensure is focused before typing. Otherwise, the input box will not accept the text.
- Once focusing on an input box, if it has a default pre-typed value (not placeholder which is usually grayed-out), remove the existing value first by clicking on "X" icon or using "Ctrl A" + "Backspace" or "Backspace" if the value is already selected.
- For search input, if no search button or suggestions popup after typing, press 'Enter' to trigger search.
- Retry the drag action on slider control if needed to refine the slider values closer to expected values.
- Scroll / Pageup / Pagedown to explore or extract more content/data if needed (prefer 'key_press' action with key 'Pageup', 'Pagedown' for faster scrolling). Particularly when extraction data from table with hidden rows or columns.
- Scroll action must have a 'direction' parameter. Finish action must have a 'status' parameter.
- If you modify some settings remember to save/apply them. If button is not visible try to scroll for it.
Most importantly, never type or click on element not visible on screenshot. Use scroll or pageup/pagedown to make the element visible first. Previous actions:
{history}
{execution_info_message}
Answer in json format:
{json_output_format}
""" """
PlanerCoTSections = OrderedDict(
{
"review": {
"display": "previous_action_result",
"description": "Briefly describe the previous action result and UI change on the screenshot to see if is correctly performed.",
},
"thought": {
"display": "thought",
"description": "Reason briefly about the next action to perform if the task is not finished.",
},
"action_description": {
"display": "action_description",
"description": "Describe the action to perform in a single sentence. The description must be precise and not rely on specific information in the current screen.",
},
}
)
### for chat conversation ### for chat conversation
user_task_info_template = """## Task Information: user_task_info_template = """## Task Information:
The current date is (YYYY-MM-DD): {current_date} The current date is (YYYY-MM-DD): {current_date}
Task: {task} Task: {task}
""" """
user_command_template_chat = """Current Memory: {memory}
Check if the task is finished. If not provide the next action to perform.
Remember:
- Perform the task on provided application(s) or website(s). You are not allowed to use the browser "address bar".
- Close any cookies, ads, login or registration etc pop-ups if not needed.
- Only one action at a time (never "click and type", "click and drag", "type and press" etc..).
- For any opening input combobox, dropdown menu options, you must select an option or press Enter key to select default one.
- Caret is not always visible in input box even when the input box is focused
- CRITICAL: Scroll to make the target element fully visible on the screenshot before clicking or typing on it. Never click or type on an element not fully visible on the screenshot.
- CRITICAL: Before typing ensure the element is focused by first clicking it. Otherwise, the input box will not accept the text.
- Once focusing on an input box, if it has a default pre-typed value (not placeholder which is usually grayed-out), remove the existing value first by clicking on "X" icon or using "Ctrl A" + "Backspace" or "Backspace" if the value is already selected.
- For search input, if no search button or suggestions popup after typing, press 'Enter' to trigger search.
- Retry the drag action on slider control if needed to refine the slider values closer to expected values.
- Scroll / Pageup / Pagedown to explore or extract more content/data if needed (prefer 'key_press' action with key 'Pageup', 'Pagedown' for faster scrolling). Particularly when extraction data from table with hidden rows or columns.
- Scroll action must have a 'direction' parameter. Finish action must have a 'status' parameter.
MOST IMPORTANTLY, never type or click on element not visible on screenshot. Use scroll or pageup/pagedown to make the element visible first.
{execution_info_message}
Answer in json format:
{json_output_format}
"""
user_command_template = """Recall Task Again: {task}\n""" + user_command_template_chat
class PlanerCoTSectionsType(str, Enum):
Review = "review"
Thought = "thought"
ActionDescription = "action_description"
Memory = "memory"
PlanerCoTSections = OrderedDict(
{
PlanerCoTSectionsType.Review: {
"display": "previous_action_result",
"description": "Briefly describe the previous action result and UI change on the screenshot to see if is correctly performed.",
},
PlanerCoTSectionsType.Thought: {"display": "thought", "description": "Reason briefly about the next action to perform if the task is not finished."},
PlanerCoTSectionsType.ActionDescription: {
"display": "action_description",
"description": "Describe the action to perform in a single sentence. The description must be precise and not rely on specific information in the current screen.",
},
PlanerCoTSectionsType.Memory: {
"display": "update_memory",
"description": "<Proceed with a memory update considering the previous actions. Emit a list of memory operations. If no memory update is needed, emit an empty list>",
},
}
)
@dataclass @dataclass
class ActionDefinition: class ActionDefinition:
"""Simple action definition with description, parameters, and examples"""
type: str type: str
description: str description: str
parameters: Optional[Dict[str, str]] = None parameters: Optional[Dict[str, str]] = None
examples: List[Dict[str, Any]] = field(default_factory=list) examples: List[Dict[str, Any]] = field(default_factory=list)
class PlannerOutput(object):
def __init__(self, plan_action: PlanAction, additional_sections: dict[str, str]):
self.plan_action = plan_action
self.thought = additional_sections["thought"]
self.review = additional_sections["review"]
self.additional_sections = {
key: value
for key, value in additional_sections.items()
if key not in ["review", "thought"]
}
class ComputerUseAgentInterface: class ComputerUseAgentInterface:
"""Simple computer use agent with modular action definitions"""
def __init__(self): def __init__(self):
self.ui_actions = {} self.ui_actions = {}
self.special_actions = {} self.special_actions = {}
self._setup_default_actions() self._setup_default_actions()
def get_planner_cot_sections(self) -> OrderedDict:
cot_sections = PlanerCoTSections.copy()
return cot_sections
def _setup_default_actions(self): def _setup_default_actions(self):
"""Define all available actions"""
# Click action - no parameters
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="click", type="click",
@@ -120,124 +176,121 @@ class ComputerUseAgentInterface:
"type": "click", "type": "click",
"description": "Click the 'X' icon in the input box", "description": "Click the 'X' icon in the input box",
}, },
{ {"type": "click", "description": "Click the first name input box to focus on it."},
"type": "click",
"description": "Click the first name input box to focus on it.",
},
], ],
) )
) )
# Right click action - no parameters
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="right_click", type="right_click",
description="Right click on a UI element", description="Right click on a UI element",
examples=[ examples=[{"type": "right_click", "description": "Right click on the first row from the patient table to open the context menu."}],
{
"type": "right_click",
"description": "Right click on the first row from the patient table to open the context menu.",
}
],
) )
) )
# Double click action - no parameters
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="double_click", type="double_click",
description="Double click on a UI element", description="Double click on a UI element",
examples=[ examples=[
{ {"type": "double_click", "description": "Double click word app icon to open the application."},
"type": "double_click",
"description": "Double click word app icon to open the application.",
},
], ],
) )
) )
# Triple click action - no parameters
self.add_action(
ActionDefinition(
type="triple_click",
description="Triple click on a UI element",
examples=[
{"type": "triple_click", "description": "Triple click the second paragraph to select it."},
],
)
)
# Type action - with text parameter
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="type", type="type",
description="Type text into a focused input field. Ensure the input box is focused before typing. To focus the input box, you may need to click on it first.", description="Type text into a focused input field. Ensure the input box is focused before typing. To focus the input box, you may need to click on it first.",
parameters={"text": "str - the text to be typed"}, parameters={"text": "str - the text to be typed"},
examples=[ examples=[
{ {"type": "type", "description": "Type 'John' in the first name input box.", "parameters": {"text": "John"}},
"type": "type", {"type": "type", "description": "Type 'Doe' in the last name input box.", "parameters": {"text": "Doe"}},
"description": "Type 'John' in the first name input box.", {"type": "type", "description": "Type 'Hello, world!' in the text area.", "parameters": {"text": "Hello, world!"}},
"parameters": {"text": "John"},
},
{
"type": "type",
"description": "Type 'Doe' in the last name input box.",
"parameters": {"text": "Doe"},
},
{
"type": "type",
"description": "Type 'Hello, world!' in the text area.",
"parameters": {"text": "Hello, world!"},
},
], ],
) )
) )
# Scroll action - with direction parameter
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="scroll", type="scroll",
description="Scroll an UI element in a specified direction", description="Scroll an UI element in a specified direction",
parameters={ parameters={
"element_description": "str - description of the element to be scrolled such that the executor can locate it",
"direction": "str - 'up', 'down', 'left', or 'right'", "direction": "str - 'up', 'down', 'left', or 'right'",
"distance": "int - the number of scroll steps (wheel “clicks”) to send.", "distance": "int - number of 'clicks' to scroll, e.g. on windows, 1 click = 120 units of scroll internally",
}, },
examples=[ examples=[
{ {
"type": "scroll", "type": "scroll",
"description": "Scroll down to see more content.", "description": "Scroll down the user table to see more content.",
"parameters": {"direction": "down"}, "parameters": {"element_description": "Users table", "direction": "down", "distance": "6"},
}, },
{ {
"type": "scroll", "type": "scroll",
"description": "Scroll up to the top of the page.", "description": "Scroll up to the top of the page.",
"parameters": {"direction": "up"}, "parameters": {"element_description": "the main page", "direction": "up"},
}, },
], ],
) )
) )
# Drag action
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="drag", type="drag",
description="Drag an element or the mouse (with left click on) from one location to another. You must specify both start_description and end_description.", description="Drag an element or the mouse (with left click on) from one location to another.",
parameters={ parameters={"start_description": "description of the location to start dragging", "end_description": "description of the location to drag to"},
"start_description": "description of the location to start dragging",
"end_description": "description of the location to drag to",
},
examples=[ examples=[
{ {
"type": "drag", "type": "drag",
"description": "Drag the response.txt file to the responses folder", "description": "Drag the response.txt file to the responses folder",
"start_description": "Click the response.txt file", "parameters": {
"end_description": "Click the responses folder", "start_description": "the response.txt file",
"end_description": "the responses folder",
},
},
{
"type": "drag",
"description": "Drag the profile picture image into the upload box",
"parameters": {
"start_description": "the profile picture image",
"end_description": "the upload box",
},
}, },
], ],
) )
) )
# Mouse move action
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="mouse_move", type="mouse_move",
description="Move the mouse to a specific element", description="Move the mouse to a specific element",
examples=[ examples=[
{ {"type": "mouse_move", "description": "Move the mouse to the 'Submit' button."},
"type": "mouse_move", {"type": "mouse_move", "description": "Hover over the 'Settings' icon."},
"description": "Move the mouse to the 'Submit' button.",
},
{
"type": "mouse_move",
"description": "Hover over the 'Settings' icon.",
},
], ],
) )
) )
# Key press action - with key parameter
self.add_action( self.add_action(
ActionDefinition( ActionDefinition(
type="key_press", type="key_press",
@@ -246,50 +299,55 @@ class ComputerUseAgentInterface:
"key": f'str # the key or key combination (separated by space) to be pressed. Example of key combination "Ctrl A", "Shift Tab", "Ctrl C" etc. "<Key> + Click" is not a valid combination, use two separate actions. Beside normal keys like letters, numerics, punctuations etc.. here are special key list: {key_maps.keys()}.' "key": f'str # the key or key combination (separated by space) to be pressed. Example of key combination "Ctrl A", "Shift Tab", "Ctrl C" etc. "<Key> + Click" is not a valid combination, use two separate actions. Beside normal keys like letters, numerics, punctuations etc.. here are special key list: {key_maps.keys()}.'
}, },
examples=[ examples=[
{ {"type": "key_press", "description": "Press 'Ctrl A' to select all text.", "parameters": {"key": "Ctrl A"}},
"type": "key_press", {"type": "key_press", "description": "Press Pagedown key.", "parameters": {"key": "Pagedown"}},
"description": "Press 'Ctrl A' to select all text.",
"parameters": {"key": "Ctrl A"},
},
{
"type": "key_press",
"description": "Press Pagedown key.",
"parameters": {"key": "Pagedown"},
},
], ],
) )
) )
# Extract data action - with variable parameter
self.add_special_action( self.add_special_action(
ActionDefinition( ActionDefinition(
type="extract_data", type="extract_data",
description="Use to extract some data from the screen for the task. This data will be stored in memory and used in the next actions or returned in the final result.", description="Use to extract some data from the screen for the task. This data will be stored in memory and used in the next actions or returned in the final result.",
parameters={ parameters={"description": "str - short description of the data to be extracted", "data": "str|json - the data to be extracted"},
"description": "str - short description of the data to be extracted",
"data": "str|json - the data to be extracted",
},
examples=[ examples=[
{ {
"type": "extract_data", "type": "extract_data",
"description": "Extract the product name and price from the screen.", "description": "Extract the product name and price from the screen.",
"parameters": { "parameters": {"description": "Available product name and price", "data": "Product Name: iPhone 14, Price: $999"},
"description": "Available product name and price",
"data": "Product Name: iPhone 14, Price: $999",
},
}, },
], ],
) )
) )
# Wait action
self.add_special_action(
ActionDefinition(
type="wait",
description="Use it to wait for the completion of an event.",
examples=[
{"type": "wait", "description": "Wait for the running command to finish."},
],
)
)
# Finish action - with status parameter
self.add_special_action( self.add_special_action(
ActionDefinition( ActionDefinition(
type="finish", type="finish",
description=" Use it to finish the task with success or failure status. When you think the task was finished return success, while when you think can not be done, return failure, don't easily say failure, try your best to do the task.", description=(
"Use it to finish the task with success or failure. "
"Before finishing, ensure all necessary data entries or selections required by the task are committed by performing appropriate actions (e.g., pressing 'Enter'/ 'Tab', pressing CTRL + S to save the document or clicking 'Save', changing focus, or blurring the input field). After typing a value that should be set/submitted, perform a COMMIT action (Enter, Tab, click Save/Apply or blur) before using the finish action.",
"Do not use the finish action while any essential process or command (e.g., downloading data, running a script, loading results) is still in progress; wait for it (emmit wait action) to fully complete before finishing. ",
"Failure status is used when the task is impossible to complete or you are unable to complete it (e.g. stuck in a loop, etc)."
),
parameters={"status": "str - 'success' or 'failure'"}, parameters={"status": "str - 'success' or 'failure'"},
examples=[ examples=[
{"type": "finish", "description": "Task completed successfully.", "parameters": {"status": "success"}},
{ {
"type": "finish", "type": "finish",
"description": "Task completed successfully.", "description": "After typing 'John Doe' and pressing TAB to save the value, finish the task successfully.",
"parameters": {"status": "success"}, "parameters": {"status": "success"},
}, },
], ],
@@ -297,15 +355,19 @@ class ComputerUseAgentInterface:
) )
def add_action(self, action: ActionDefinition): def add_action(self, action: ActionDefinition):
"""Add a new action to the agent"""
self.ui_actions[action.type] = action self.ui_actions[action.type] = action
def add_special_action(self, action: ActionDefinition): def add_special_action(self, action: ActionDefinition):
"""Add a special action that is not part of the main UI actions"""
self.special_actions[action.type] = action self.special_actions[action.type] = action
def get_action_definition(self, action_type: str) -> Optional[ActionDefinition]: def get_action_definition(self, action_type: str) -> Optional[ActionDefinition]:
"""Get action definition by type"""
return self.ui_actions.get(action_type) or self.special_actions.get(action_type) return self.ui_actions.get(action_type) or self.special_actions.get(action_type)
def validate_action(self, action: PlanAction): def validate_action(self, action: PlanAction):
"""Validate if the action is valid and has all required parameters"""
action_definition = self.get_action_definition(action.action_type) action_definition = self.get_action_definition(action.action_type)
if action_definition is None: if action_definition is None:
raise ValidationException(f"Invalid action type: {action.action_type}") raise ValidationException(f"Invalid action type: {action.action_type}")
@@ -313,26 +375,25 @@ class ComputerUseAgentInterface:
if action_definition.parameters: if action_definition.parameters:
for parameter in action_definition.parameters: for parameter in action_definition.parameters:
if parameter not in action.parameters: if parameter not in action.parameters:
raise ValidationException( raise ValidationException(f"Missing parameter '{parameter}' in action: {action}")
f"Missing parameter '{parameter}' in action: {action}"
)
def get_system_prompt(self) -> str: def get_system_prompt(self) -> str:
"""Generate the complete prompt for the agent"""
indentation = " " indentation = " "
def get_action_definition(action: ActionDefinition) -> str: def get_action_definition(action: ActionDefinition) -> str:
"""Format action definitions for the prompt"""
action_prompt = f"- {action.type}: {action.description}" action_prompt = f"- {action.type}: {action.description}"
if action.parameters is not None and len(action.parameters) > 0: if action.parameters is not None and len(action.parameters) > 0:
params = (",\n" + 2 * indentation).join( params = (",\n" + 2 * indentation).join(f"{k}: {v}" for k, v in action.parameters.items())
f"{k}: {v}" for k, v in action.parameters.items() parameter_def = f"{indentation}parameters:\n{indentation}{indentation}{params}"
)
parameter_def = (
f"{indentation}parameters:\n{indentation}{indentation}{params}"
)
action_prompt += "\n" + parameter_def action_prompt += "\n" + parameter_def
return action_prompt return action_prompt
def get_examples(actions: List[ActionDefinition]) -> list[str]: def get_examples(actions: List[ActionDefinition]) -> list[str]:
"""Format action examples for the prompt"""
output_examples = [] output_examples = []
for action in actions: for action in actions:
for example in action.examples: for example in action.examples:
@@ -343,46 +404,21 @@ class ComputerUseAgentInterface:
example_parts = [type_str, description_str] example_parts = [type_str, description_str]
if "parameters" in example: if "parameters" in example:
params = (",\n" + 2 * indentation).join( params = (",\n" + 2 * indentation).join(f'"{k}": "{v}"' for k, v in example["parameters"].items())
f'"{k}": "{v}"' for k, v in example["parameters"].items() parameters_str = '"parameters"' + ": {\n" + 2 * indentation + params + "\n" + indentation + "}"
)
parameters_str = (
'"parameters"'
+ ": {\n"
+ 2 * indentation
+ params
+ "\n"
+ indentation
+ "}"
)
example_parts.append(parameters_str) example_parts.append(parameters_str)
example_json = ( example_json = "{\n" + indentation + (",\n" + indentation).join(example_parts) + "\n}"
"{\n"
+ indentation
+ (",\n" + indentation).join(example_parts)
+ "\n}"
)
output_examples.append(example_json) output_examples.append(example_json)
return output_examples return output_examples
available_actions = "\n\n".join( available_actions = "\n\n".join(get_action_definition(action) for action in self.ui_actions.values())
get_action_definition(action) for action in self.ui_actions.values() special_actions = "\n\n".join(get_action_definition(action) for action in self.special_actions.values())
) examples = "\n\n".join(get_examples(list(self.ui_actions.values()) + list(self.special_actions.values())))
special_actions = "\n\n".join(
get_action_definition(action) for action in self.special_actions.values()
)
examples = "\n\n".join(
get_examples(
list(self.ui_actions.values()) + list(self.special_actions.values())
)
)
return system_template.format( out = system_template.format(available_actions=available_actions, special_actions=special_actions, examples=examples)
available_actions=available_actions, out += "\n\n" + memory_system_template.format()
special_actions=special_actions, return out
examples=examples,
)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -19,113 +19,19 @@ class UiPathComputerUseV1(object):
self.planner = ActionPlanner() self.planner = ActionPlanner()
self.executor = GrounderClient() self.executor = GrounderClient()
async def predict_request( async def predict_request(self, request_body: dict, model_name: str) -> tuple[dict, dict]:
self, request_body: dict, model_name: str previous_steps = request_body['previousSteps'] if request_body['previousSteps'] else []
) -> tuple[dict, dict]:
state = State( state = State(
task=request_body["userTask"], task=request_body["userTask"],
image_base64=request_body["image"], image_base64=request_body["image"],
previous_steps=request_body.get("previousSteps", []), previous_steps=[step for step in previous_steps],
) )
execution_state = ExecutionState(model_name=model_name, execution_info={}) execution_state = ExecutionState(model_name=model_name)
output = await self.predict(state, execution_state) output = await self.predict(state, execution_state, max_retries=2)
return output return output
def process_grounding( def wrap_to_computer_use_action(self, plan_action: PlanAction, grounding_result: utils.GroundingOutput | None) -> ComputerUseAction:
self,
plan_action: PlanAction,
grounding_result: utils.GroundingOutput,
x: int,
y: int,
):
match plan_action.action_type:
case PlanActionType.Scroll:
# guess the scroll direction if missing in the plan output
if "direction" not in plan_action.parameters:
if "scroll up" in plan_action.description.lower():
scroll_direction = "up"
else:
scroll_direction = "down"
else:
scroll_direction = plan_action.parameters["direction"]
action = ComputerUseAction(
name=SupportedActions.Scroll,
description=plan_action.description,
parameters={"position": [x, y], "direction": scroll_direction},
)
if "distance" in plan_action.parameters:
match scroll_direction:
case "up":
action.parameters["offset"] = [
0,
plan_action.parameters["distance"],
]
case "down":
action.parameters["offset"] = [
0,
-plan_action.parameters["distance"],
]
case "left":
action.parameters["offset"] = [
plan_action.parameters["distance"],
0,
]
case "right":
action.parameters["offset"] = [
-plan_action.parameters["distance"],
0,
]
case PlanActionType.Drag:
assert grounding_result.end_position is not None, (
"End position must be provided for drag action"
)
x_end, y_end = grounding_result.end_position
action = ComputerUseAction(
name=SupportedActions.Drag,
description=plan_action.description,
parameters={
"path": [
{"x": x, "y": y},
{"x": x_end, "y": y_end},
]
},
)
case _:
action_name = plan_action.action_type
parameters = {"position": [x, y]}
if plan_action.action_type == PlanActionType.DoubleClick:
action_name = SupportedActions.Click
parameters["click_type"] = "double"
elif plan_action.action_type == PlanActionType.RightClick:
action_name = SupportedActions.Click
parameters["button"] = "right"
elif plan_action.action_type == PlanActionType.MouseMove:
action_name = SupportedActions.MouseMove # different names
assert action_name in [
SupportedActions.Click,
SupportedActions.MouseMove,
]
action = ComputerUseAction(
name=action_name,
description=plan_action.description,
parameters=parameters,
)
return action
async def predict(
self, state: State, execution_state: ExecutionState
) -> tuple[dict, dict]:
planer_output: PlannerOutput = self.planner.predict(state, execution_state)
plan_action = planer_output.plan_action
action: ComputerUseAction | None = None
step: ComputerUseStep | None = None
match plan_action.action_type: match plan_action.action_type:
case PlanActionType.KeyPress: case PlanActionType.KeyPress:
keys = plan_action.parameters["key"].split(" ") keys = plan_action.parameters["key"].split(" ")
@@ -142,6 +48,125 @@ class UiPathComputerUseV1(object):
description=plan_action.description, description=plan_action.description,
parameters={}, parameters={},
) )
case PlanActionType.Click | PlanActionType.DoubleClick | PlanActionType.TripleClick | PlanActionType.MouseMove | PlanActionType.RightClick:
action_name = plan_action.action_type
x, y = grounding_result.position
parameters = {"position": [int(x), int(y)]}
if plan_action.action_type == PlanActionType.DoubleClick:
action_name = SupportedActions.Click
parameters["click_type"] = "double"
elif plan_action.action_type == PlanActionType.TripleClick:
action_name = SupportedActions.Click
parameters["click_type"] = "triple"
elif plan_action.action_type == PlanActionType.RightClick:
action_name = SupportedActions.Click
parameters["button"] = "right"
elif plan_action.action_type == PlanActionType.MouseMove:
action_name = SupportedActions.MouseMove # different names
assert action_name in [SupportedActions.Click, SupportedActions.MouseMove]
action = ComputerUseAction(
name=action_name,
description=plan_action.description,
parameters=parameters,
)
case PlanActionType.Drag:
assert grounding_result.end_position is not None, "End position must be provided for drag action"
x, y = grounding_result.position
x_end, y_end = grounding_result.end_position
x, y = int(x), int(y)
x_end, y_end = int(x_end), int(y_end)
action = ComputerUseAction(
name=SupportedActions.Drag,
description=plan_action.description,
parameters={"path": [{"x": x, "y": y}, {"x": x_end, "y": y_end}]},
)
case PlanActionType.Scroll:
x, y = grounding_result.position
x, y = int(x), int(y)
# guess the scroll direction if missing in the plan output
if "direction" not in plan_action.parameters:
if "scroll up" in plan_action.description.lower():
scroll_direction = "up"
else:
scroll_direction = "down"
else:
scroll_direction = plan_action.parameters["direction"]
action = ComputerUseAction(
name=SupportedActions.Scroll, description=plan_action.description, parameters={"position": [x, y], "direction": scroll_direction}
)
if "distance" in plan_action.parameters:
match scroll_direction:
case "up":
action.parameters["offset"] = [0, plan_action.parameters["distance"]]
case "down":
action.parameters["offset"] = [0, -plan_action.parameters["distance"]]
case "left":
action.parameters["offset"] = [plan_action.parameters["distance"], 0]
case "right":
action.parameters["offset"] = [-plan_action.parameters["distance"], 0]
case PlanActionType.Type:
action = ComputerUseAction(
name=SupportedActions.TypeInto,
description=plan_action.description,
parameters={"value": plan_action.parameters["text"]},
)
return action
async def predict(
    self, state: State, execution_state: ExecutionState, max_retries: int = 0, planer_output: PlannerOutput | None = None
) -> dict:
    """Plan one step, ground it, and return the agent response dict.

    Args:
        state: Current task state; ``state.image_base64`` is stored in the
            returned history parameters.
        execution_state: Per-run state. Its ``execution_info.responses`` list
            is reset here and ``execution_info.current_response`` is replaced
            on every attempt.
        max_retries: Number of extra plan+ground attempts allowed after a
            grounding validation failure. Ignored when a plan is supplied.
        planer_output: Optional pre-computed plan. When given, the planner is
            not called and a grounding failure is raised immediately.

    Returns:
        A dict with keys ``"step"`` and ``"previous_steps_parameters"``.

    Raises:
        ValueError: Grounding failed and either the plan was fixed or the
            retry budget is exhausted. The original grounding exception is
            attached as ``__cause__``.
    """
    execute_planning = True
    is_planning_fixed = planer_output is not None
    execution_count = 0
    execution_state.execution_info.responses = []

    while execute_planning:
        try:
            execution_count += 1
            # Archive the response of the previous attempt before starting a new one.
            if execution_state.execution_info.current_response is not None:
                execution_state.execution_info.responses.append(execution_state.execution_info.current_response)
            execution_state.execution_info.current_response = utils.RawAgentResponse()

            if not is_planning_fixed:
                planer_output = await self.planner.predict(state, execution_state)
            plan_action = planer_output.plan_action

            # NOTE(review): retry_number receives the retry budget, not the
            # current attempt index -- confirm this is what the callee expects.
            step = await self.process_plan_and_ground(planer_output, state, execution_state, retry_number=max_retries)
            execute_planning = False
        except utils.GroundingOutputValidationException as e:
            execution_state.execution_info.current_response.grounding_error = e
            if is_planning_fixed or execution_count > max_retries:
                # Report the actual reason: a fixed plan cannot be re-planned,
                # otherwise the retry budget has been used up.
                reason = "with fixed plan" if is_planning_fixed else f"after {execution_count} attempt(s)"
                raise ValueError(
                    f"Grounding error {reason}: {e.message}, element description: {e.element_description}"
                ) from e

    # save additional data for history
    assert step is not None
    assert step.additional_parameters is not None
    step.additional_parameters["thought"] = planer_output.thought
    step.additional_parameters["review"] = planer_output.review
    step.additional_parameters.update(planer_output.additional_sections)
    step.additional_parameters["plan_action"] = json.dumps(plan_action.to_dict())

    history_image = state.image_base64
    previous_steps_parameters = {
        "max_chat_history_messages": 1000,
        "max_chat_history_images": 1,
        "image": history_image,
    }
    agent_response = {"step": step.to_response_dict(), "previous_steps_parameters": previous_steps_parameters}
    return agent_response
async def process_plan_and_ground(
self, planer_output: PlannerOutput, state: State, execution_state: ExecutionState, retry_number: int = 0
) -> ComputerUseStep:
plan_action = planer_output.plan_action
action: ComputerUseAction | None = None
step: ComputerUseStep | None = None
match plan_action.action_type:
case PlanActionType.ExtractData: case PlanActionType.ExtractData:
# return a step with no action, just to store the extracted data # return a step with no action, just to store the extracted data
step = ComputerUseStep( step = ComputerUseStep(
@@ -164,35 +189,29 @@ class UiPathComputerUseV1(object):
| PlanActionType.Scroll | PlanActionType.Scroll
| PlanActionType.Drag | PlanActionType.Drag
| PlanActionType.DoubleClick | PlanActionType.DoubleClick
| PlanActionType.TripleClick
| PlanActionType.RightClick | PlanActionType.RightClick
): ):
if plan_action.action_type != PlanActionType.Drag: if plan_action.action_type != PlanActionType.Drag:
element_description = plan_action.parameters.get("element_description", None)
grounding_result = await self.executor.predict( grounding_result = await self.executor.predict(
state.image_base64, state.image_base64,
plan_action.description, plan_action.description,
action=plan_action.action_type, action=plan_action.action_type,
element_description=element_description
) )
else: else:
grounding_result = await self.executor.predict( start_description = plan_action.parameters.get("start_description", None)
state.image_base64, end_description = plan_action.parameters.get("end_description", None)
plan_action.parameters["start_description"], drag_entire_description = plan_action.description
action=plan_action.action_type, drag_start_description = f"Drag Start point:{start_description}. [Full Drag Description:{drag_entire_description}]"
) drag_end_description = f"Drag End point:{end_description}. [Full Drag Description:{drag_entire_description}]"
grounding_result_end = await self.executor.predict( grounding_result = await self.executor.predict(state.image_base64, drag_start_description, action=plan_action.action_type)
state.image_base64, grounding_result_end = await self.executor.predict(state.image_base64, drag_end_description, action=plan_action.action_type)
plan_action.parameters["end_description"], grounding_result.end_position = grounding_result_end.get_point_location()
action=plan_action.action_type, action = self.wrap_to_computer_use_action(plan_action, grounding_result)
) case _:
grounding_result.end_position = grounding_result_end.position action = self.wrap_to_computer_use_action(plan_action, grounding_result=None)
x, y = grounding_result.position
action = self.process_grounding(plan_action, grounding_result, x, y)
case PlanActionType.Type:
action = ComputerUseAction(
name=SupportedActions.TypeInto,
description=plan_action.description,
parameters={"value": plan_action.parameters["text"]},
)
if step is None: if step is None:
assert action is not None assert action is not None
step = ComputerUseStep( step = ComputerUseStep(
@@ -202,22 +221,4 @@ class UiPathComputerUseV1(object):
thought=planer_output.thought, thought=planer_output.thought,
) )
# save additional data for history return step
assert step.additional_parameters is not None
step.additional_parameters["thought"] = planer_output.thought
step.additional_parameters["review"] = planer_output.review
step.additional_parameters.update(planer_output.additional_sections)
step.additional_parameters["plan_action"] = json.dumps(plan_action.to_dict())
history_image = state.image_base64
previous_steps_parameters = {
"max_chat_history_messages": 1000,
"max_chat_history_images": self.planner.number_history_steps_with_images,
"image": history_image,
}
agent_response = {
"step": step.to_response_dict(),
"previous_steps_parameters": previous_steps_parameters,
}
return agent_response

View File

@@ -4,21 +4,20 @@ import os
class GrounderClient(object): class GrounderClient(object):
def __init__(self): def __init__(self):
# Proxy for hosting UI-TARS + UiElementPredictor # Proxy for hosting finetuned Qwen3VL + UiElementPredictor
# Could be replaced with a VLLM server and grounder (UI-TARS) specific processing # Could be replaced with a VLLM server and grounder specific processing
# Or any other grounder
self.url = "" self.url = ""
async def predict( async def predict(
self, image_base64: str, action_description: str, action: str | None = None self, image_base64: str, action_description: str, action: str, element_description: str | None = None,
) -> utils.GroundingOutput: ) -> utils.GroundingOutput:
request = utils.GroundingRequest( request = utils.GroundingRequest(
description=action_description, description=action_description,
image_base64=image_base64, image_base64=image_base64,
action_type=action, action_type=action,
element_description=element_description
) )
api_key = os.getenv("SERVICE_KEY") api_key = os.getenv("SERVICE_KEY")
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
response = await client.post( response = await client.post(
self.url, self.url,
@@ -26,6 +25,7 @@ class GrounderClient(object):
"image_base64": request.image_base64, "image_base64": request.image_base64,
"action_description": request.description, "action_description": request.description,
"action": request.action_type, "action": request.action_type,
"element_description": request.element_description,
}, },
headers={ headers={
"X-API-KEY": api_key "X-API-KEY": api_key
@@ -37,6 +37,8 @@ class GrounderClient(object):
raise ValueError(f"Prediction failed: {response.text}") raise ValueError(f"Prediction failed: {response.text}")
data = response.json() data = response.json()
if tuple(data["position"]) == (-1, -1):
raise utils.GroundingOutputValidationException(f"Element {request.description} not found in image", request.description)
return utils.GroundingOutput( return utils.GroundingOutput(
description=data["description"], description=data["description"],
position=tuple(data["position"]), position=tuple(data["position"]),

View File

@@ -5,7 +5,6 @@ def send_messages(payload):
# URL to your proxy for calling LLMs # URL to your proxy for calling LLMs
proxy_url = "" proxy_url = ""
api_key = os.getenv("SERVICE_KEY") api_key = os.getenv("SERVICE_KEY")
# Can be directly replaced with code for calling Azure endpoint as in: # Can be directly replaced with code for calling Azure endpoint as in:
#.env config example : #.env config example :
# AZURE_OPENAI_API_BASE=YOUR_API_BASE # AZURE_OPENAI_API_BASE=YOUR_API_BASE
@@ -40,5 +39,5 @@ def send_messages(payload):
for attempt in range(retries): for attempt in range(retries):
response = requests.post(proxy_url, headers=headers, json=payload) response = requests.post(proxy_url, headers=headers, json=payload)
if response.status_code == 200: if response.status_code == 200:
return response.json()["choices"][0]["message"]["content"] return response.text
return None return None

105
mm_agents/uipath/memory.py Normal file
View File

@@ -0,0 +1,105 @@
import json
from enum import Enum
from mm_agents.uipath.utils import ValidationException, parse_message_json, ExecutionInfo
from mm_agents.uipath.types_utils import ExecutionState, State
# Prompt fragment that explains the short-term memory protocol to the model.
# The operation names and fields listed here (type, key, info_type, value,
# description) are the ones MemoryOperation.from_dict reads from each item.
# NOTE(review): literal braces are doubled ({{ }}), which suggests this text is
# passed through str.format() before use -- confirm at the call site.
memory_system_template = """You also have a SHORT TERM MEMORY that stores only data about the task. It is NOT a log of mechanical UI interactions. Use it to:
- Keep track of items that need to be processed as part of the task
- store only information that might be useful later in the task
- DO NOT store information which can be easily inferered from the task description
Never record: scrolling, mouse movement / hover, focusing an input (unless it results in a committed value change), transient pop-ups you just closed, partial / intermediate typed characters, pure navigation clicks that do not yield a new verifiable state.
Memory supports only the following operations emitted as a LIST of JSON objects (empty list if no update):
- store_info # add or update information related to the task in memory
{{
"key": str, # the info key, must be unique
"info_type": Literal["data_update", "queue_elements"],
# data_update: different data related to the task
# queue_elements: list of items to be processed in the task
"value": str|json,
"description": str # Short human-readable description of the update (what changed and why it matters)
}}
- delete_info {{"key": str, "description": str}} - delete information from memory by key
Example: [{{"type": "store_info", "info_type": "queue_elements", "key": "scripts_to_be_executed", "value": "[script.py, script2.py, script3.py]", "description": "List of scripts that need to be executed as part of the task"}}]
"""
class EnumMemoryOperationType(str, Enum):
    """Kinds of short-term memory operations.

    Members are also ``str`` instances, so they compare equal to their raw
    string values.
    """

    # Add or overwrite an entry.
    StoreInfo = "store_info"
    # Remove an entry by key.
    DeleteInfo = "delete_info"
    # Leave the memory untouched.
    NoOp = "no_op"
class MemoryOperation(object):
def __init__(
self,
operation_type: str,
key: str | None = None,
value: str | dict | None = None,
description: str | None = None,
info_type: str | None = None,
):
self.operation_type = operation_type
self.key = key
self.value = value
self.description = description
self.info_type = info_type
@staticmethod
def from_dict(data: dict) -> "MemoryOperation":
operation_type = data.get("type", "").lower()
if data.get("info_type", None) is not None or data.get("value", None) is not None:
operation_type = EnumMemoryOperationType.StoreInfo
if operation_type not in (EnumMemoryOperationType.StoreInfo, EnumMemoryOperationType.DeleteInfo, EnumMemoryOperationType.NoOp):
raise ValidationException(f"Invalid memory operation type: {operation_type}")
if operation_type == EnumMemoryOperationType.StoreInfo:
if "key" not in data or "value" not in data:
raise ValidationException("StoreInfo operation requires 'key' and 'value'")
key = data.get("key", None)
value = data.get("value", None)
description = data.get("description", None)
info_type = data.get("info_type", None)
return MemoryOperation(operation_type, key, value, description, info_type)
class ShortTermMemoryManager:
    """Applies memory operations to the memory carried in the step history."""

    async def get_updated_memory(
        self, state: State, memory_operations: list[MemoryOperation], execution_state: ExecutionState
    ) -> dict:
        """Return the memory after applying ``memory_operations``.

        The starting memory is the JSON stored under ``"memory"`` in the
        ``additional_parameters`` of the last previous step, or an empty dict
        when there is no previous step. Entries live under the ``"data"`` key.

        Raises:
            ValidationException: a store operation has no key or no value.
        """
        if state.previous_steps:
            current_memory = json.loads(state.previous_steps[-1]["additional_parameters"].get("memory", "{}"))
        else:
            current_memory = {}

        for memory_operation in memory_operations:
            if memory_operation.operation_type == EnumMemoryOperationType.StoreInfo:
                if memory_operation.key is None or memory_operation.value is None:
                    raise ValidationException("StoreInfo operation requires 'key' and 'value'")
                data_memory = current_memory.setdefault("data", {})
                # Update in place so unrelated fields of an existing entry survive.
                entry = data_memory.setdefault(memory_operation.key, {})
                entry["value"] = memory_operation.value
                entry["description"] = memory_operation.description
                entry["info_type"] = memory_operation.info_type
            elif memory_operation.operation_type == EnumMemoryOperationType.DeleteInfo:
                # Deleting an unknown key is a no-op.
                current_memory.get("data", {}).pop(memory_operation.key, None)
            elif memory_operation.operation_type == EnumMemoryOperationType.NoOp:
                pass

        return current_memory

    def extract_memory_operations(self, memory_response: str | None) -> list[MemoryOperation]:
        """Parse the model's memory section into operations.

        ``None`` (or JSON ``null``) means "no update" and yields an empty list.

        Raises:
            ValidationException: the text is not valid JSON, or the parsed
                value is not a list.
        """
        if isinstance(memory_response, str):
            try:
                memory_response = json.loads(memory_response)
            except Exception as e:
                raise ValidationException(f"Invalid memory format, cannot parse JSON: {memory_response}. Error: {e}") from e

        if memory_response is None:
            return []
        if not isinstance(memory_response, list):
            raise ValidationException(f"Invalid memory format, expected a list of operations: {memory_response}")

        return [MemoryOperation.from_dict(item) for item in memory_response]

View File

@@ -1,5 +1,6 @@
from typing import Optional, Union, List from typing import Optional, Union, List
from enum import Enum from enum import Enum
from mm_agents.uipath.utils import ExecutionInfo
key_maps = { key_maps = {
"Backspace": "Back", "Backspace": "Back",
@@ -21,6 +22,7 @@ key_maps = {
class PlanActionType(str, Enum): class PlanActionType(str, Enum):
Click = "click" Click = "click"
DoubleClick = "double_click" DoubleClick = "double_click"
TripleClick = "triple_click"
RightClick = "right_click" RightClick = "right_click"
Type = "type" Type = "type"
Scroll = "scroll" Scroll = "scroll"
@@ -189,6 +191,6 @@ class State(object):
class ExecutionState(object): class ExecutionState(object):
def __init__(self, model_name: str, execution_info: dict): def __init__(self, model_name: str):
self.model_name = model_name self.model_name = model_name
self.execution_info = execution_info self.execution_info = ExecutionInfo()

View File

@@ -1,14 +1,32 @@
import json import json
import re import re
from typing import Optional
from json_minify import json_minify from json_minify import json_minify
from json_repair import repair_json from json_repair import repair_json
from dataclasses import dataclass, field
class ValidationException(Exception): class ValidationException(Exception):
def __init__(self, message: str): def __init__(self, message: str):
self.message = message self.message = message
class GroundingOutputValidationException(ValidationException):
    """Validation failure raised for an unusable grounding result.

    Attributes:
        message: Human-readable error text (stored by the base initializer).
        element_description: Description of the element that was looked for.
        raw_response: Optional raw grounder response, if one was captured.
    """

    def __init__(self, message: str, element_description: str, raw_response: str | None = None):
        # ValidationException.__init__ already assigns self.message.
        super().__init__(message)
        self.raw_response = raw_response
        self.element_description = element_description
@dataclass
class RawAgentResponse:
    """Raw outputs collected during one plan-and-ground attempt."""

    # Unparsed planner output for this attempt, if recorded.
    raw_planning_prediction: Optional[str] = None
    # Grounding failure raised during this attempt, if any.
    grounding_error: Optional[GroundingOutputValidationException] = None
@dataclass
class ExecutionInfo:
    """Mutable bookkeeping shared across the attempts of one prediction.

    The ``@dataclass`` decorator is required: without it,
    ``field(default_factory=list)`` is left as a class attribute holding a
    ``dataclasses.Field`` object, so ``ExecutionInfo().responses`` is not a
    list.
    """

    planner_action_review: Optional[dict] = None
    # can contain both planning and grounding raw responses
    responses: list[RawAgentResponse] = field(default_factory=list)
    current_response: Optional[RawAgentResponse] = None
def parse_message_json(message: str) -> dict: def parse_message_json(message: str) -> dict:
message = message.strip() message = message.strip()
@@ -47,11 +65,19 @@ class GroundingOutput:
self.position = position self.position = position
self.end_position = end_position self.end_position = end_position
def get_point_location(self) -> tuple[int, int]:
    """Return the point to act on.

    Uses ``self.position`` when it is set; otherwise returns the centre of
    ``self.bbox`` (unpacked as x1, y1, x2, y2) using floor division.
    """
    if self.position is not None:
        point_x, point_y = self.position
        return point_x, point_y

    left, top, right, bottom = self.bbox
    return (left + right) // 2, (top + bottom) // 2
class GroundingRequest: class GroundingRequest:
def __init__( def __init__(
self, description: str, image_base64: str, action_type: str | None = None self, description: str, image_base64: str, action_type: str | None = None, element_description: str | None = None
): ):
self.description = description self.description = description
self.image_base64 = image_base64 self.image_base64 = image_base64
self.action_type = action_type self.action_type = action_type
self.element_description = element_description

View File

@@ -73,7 +73,7 @@ def map_uipath_agent_actions_to_osworld(actions):
if params["click_type"] == "double": if params["click_type"] == "double":
return {"action_type": "DOUBLE_CLICK", "x": x, "y": y} return {"action_type": "DOUBLE_CLICK", "x": x, "y": y}
elif params["click_type"] == "triple": elif params["click_type"] == "triple":
return {"action_type": "TRIPLE_CLICK", "x": x, "y": y} return {"action_type": "CLICK", "x": x, "y": y, "num_clicks": 3}
else: else:
raise ValueError(f"Unknown click type: {params['click_type']}") raise ValueError(f"Unknown click type: {params['click_type']}")
else: else:
@@ -165,23 +165,17 @@ class UipathBaseAgent:
{ {
"actions": rsp["step"]["actions"], "actions": rsp["step"]["actions"],
"description": rsp["step"]["description"], "description": rsp["step"]["description"],
"additional_parameters": { "additional_parameters": rsp['step']['additional_parameters'],
"review": rsp["step"]["additional_parameters"]["review"],
"thought": rsp["step"]["additional_parameters"]["thought"],
"action_description": rsp["step"]["additional_parameters"][
"action_description"
],
"plan_action": rsp["step"]["additional_parameters"]["plan_action"],
},
"image": img_base64, "image": img_base64,
} }
) )
def predict(self, instruction: str, obs: Dict, args, step_idx) -> List: def predict(self, instruction: str, obs: Dict, args, step_idx) -> List:
if step_idx == args.max_steps - 1: if step_idx >= args.max_steps - 1:
message = ( message = (
instruction instruction + """You have reached the final step of the process.
+ "The sudo password is password, if needed. This is the last step, you must return the finish actions with either success or failure, depending on the result. No further steps are allowed." At this point, no further actions can be taken - it may therefore be impossible to complete the task successfully.
Conclude by returning a finish action with success or failure, depending on what can be determined from the current state."""
) )
else: else:
message = instruction + "The sudo password is password, if needed." message = instruction + "The sudo password is password, if needed."

View File

@@ -1,5 +1,11 @@
from desktop_env.desktop_env import DesktopEnv
import argparse import argparse
import logging
from desktop_env.desktop_env import DesktopEnv
logging.basicConfig(
level=logging.INFO,
)
example = { example = {
"id": "94d95f96-9699-4208-98ba-3c3119edf9c2", "id": "94d95f96-9699-4208-98ba-3c3119edf9c2",

35
run.py
View File

@@ -86,6 +86,7 @@ def config() -> argparse.Namespace:
parser.add_argument("--screen_height", type=int, default=1080) parser.add_argument("--screen_height", type=int, default=1080)
parser.add_argument("--sleep_after_execution", type=float, default=0.0) parser.add_argument("--sleep_after_execution", type=float, default=0.0)
parser.add_argument("--max_steps", type=int, default=15) parser.add_argument("--max_steps", type=int, default=15)
parser.add_argument("--enable_recording", action="store_true", help="Enable video recording (disabled by default)")
# agent config # agent config
parser.add_argument("--max_trajectory_length", type=int, default=3) parser.add_argument("--max_trajectory_length", type=int, default=3)
@@ -94,10 +95,10 @@ def config() -> argparse.Namespace:
) )
# lm config # lm config
parser.add_argument("--model", type=str, default="gpt-4o") parser.add_argument("--model", type=str, default="gpt-4-vision-preview")
parser.add_argument("--temperature", type=float, default=1.0) parser.add_argument("--temperature", type=float, default=1.0)
parser.add_argument("--top_p", type=float, default=0.9) parser.add_argument("--top_p", type=float, default=0.9)
parser.add_argument("--max_tokens", type=int, default=1500) parser.add_argument("--max_tokens", type=int, default=16384)
parser.add_argument("--stop_token", type=str, default=None) parser.add_argument("--stop_token", type=str, default=None)
# example config # example config
@@ -147,6 +148,8 @@ def test(args: argparse.Namespace, test_all_meta: dict) -> None:
action_space=args.action_space, action_space=args.action_space,
observation_type=args.observation_type, observation_type=args.observation_type,
max_trajectory_length=args.max_trajectory_length, max_trajectory_length=args.max_trajectory_length,
screen_width=args.screen_width,
screen_height=args.screen_height,
) )
env = DesktopEnv( env = DesktopEnv(
@@ -155,11 +158,31 @@ def test(args: argparse.Namespace, test_all_meta: dict) -> None:
action_space=agent.action_space, action_space=agent.action_space,
screen_size=(args.screen_width, args.screen_height), screen_size=(args.screen_width, args.screen_height),
headless=args.headless, headless=args.headless,
os_type = "Ubuntu", os_type = "Windows",
require_a11y_tree=args.observation_type require_a11y_tree=args.observation_type
in ["a11y_tree", "screenshot_a11y_tree", "som"], in ["a11y_tree", "screenshot_a11y_tree", "som"],
) )
# get actual VM screen size after environment initialization
try:
actual_screen_size = env.vm_screen_size
if actual_screen_size and 'width' in actual_screen_size and 'height' in actual_screen_size:
actual_width = actual_screen_size['width']
actual_height = actual_screen_size['height']
logger.info(f"Actual VM screen size: {actual_width}x{actual_height}")
# update agent's screen size if different
if actual_width != args.screen_width or actual_height != args.screen_height:
logger.warning(f"Screen size mismatch! Expected: {args.screen_width}x{args.screen_height}, Actual: {actual_width}x{actual_height}")
agent.screen_width = actual_width
agent.screen_height = actual_height
# replace in system message as well
agent.system_message = agent.system_message.replace(
f"({args.screen_width}, {args.screen_height})",
f"({actual_width}, {actual_height})"
)
except Exception as e:
logger.warning(f"Unable to get actual VM screen size: {e}")
for domain in tqdm(test_all_meta, desc="Domain"): for domain in tqdm(test_all_meta, desc="Domain"):
for example_id in tqdm(test_all_meta[domain], desc="Example", leave=False): for example_id in tqdm(test_all_meta[domain], desc="Example", leave=False):
config_file = os.path.join( config_file = os.path.join(
@@ -204,8 +227,8 @@ def test(args: argparse.Namespace, test_all_meta: dict) -> None:
) )
except Exception as e: except Exception as e:
logger.error(f"Exception in {domain}/{example_id}: {e}") logger.error(f"Exception in {domain}/{example_id}: {e}")
# Only attempt to end recording if controller exists (not Docker provider) # Only attempt to end recording if controller exists (not Docker provider) and recording is enabled
if hasattr(env, 'controller') and env.controller is not None: if args.enable_recording and hasattr(env, 'controller') and env.controller is not None:
env.controller.end_recording( env.controller.end_recording(
os.path.join(example_result_dir, "recording.mp4") os.path.join(example_result_dir, "recording.mp4")
) )
@@ -217,7 +240,7 @@ def test(args: argparse.Namespace, test_all_meta: dict) -> None:
) )
f.write("\n") f.write("\n")
env.close() # env.close()
logger.info(f"Average score: {sum(scores) / len(scores) if scores else 0}") logger.info(f"Average score: {sum(scores) / len(scores) if scores else 0}")

View File

@@ -19,6 +19,7 @@
--test_all_meta_path evaluation_examples/test_nogdrive.json \ --test_all_meta_path evaluation_examples/test_nogdrive.json \
--max_steps 50 \ --max_steps 50 \
--num_envs 30 \ --num_envs 30 \
--temperature 0.01 \
--max_history_turns 4 \ --max_history_turns 4 \
--coordinate_type relative \ --coordinate_type relative \
--resize_factor 32 \ --resize_factor 32 \
@@ -63,6 +64,42 @@ active_environments = []
processes = [] processes = []
is_terminating = False is_terminating = False
# Thread-local storage for task context (works per-process in multiprocessing)
import threading
_task_context = threading.local()
def get_task_context():
    """Return the task context of the calling thread.

    Falls back to a dict with ``domain`` and ``example_id`` set to ``None``
    when no context has been set.
    """
    try:
        return _task_context.context
    except AttributeError:
        return {'domain': None, 'example_id': None}
def set_task_context(domain: str, example_id: str):
    """Record ``domain`` and ``example_id`` as the calling thread's task context."""
    context = {}
    context['domain'] = domain
    context['example_id'] = example_id
    _task_context.context = context
def clear_task_context():
    """Drop the calling thread's task context; do nothing if none is set."""
    try:
        del _task_context.context
    except AttributeError:
        pass
class TaskContextFilter(logging.Filter):
    """Logging filter that stamps records with the current task context.

    Every record gets ``domain`` and ``example_id`` attributes. When both are
    known, string messages are also prefixed with ``[domain/example_id]``.
    The filter never rejects a record.
    """

    def filter(self, record):
        context = get_task_context()
        domain = context.get('domain')
        example_id = context.get('example_id')

        if not (domain and example_id):
            # Incomplete context: fill in placeholders and leave the message alone.
            record.domain = domain or "N/A"
            record.example_id = example_id or "N/A"
            return True

        record.domain = domain
        record.example_id = example_id
        # Add prefix to message, once, even if several handlers share this filter.
        if hasattr(record, 'msg') and isinstance(record.msg, str):
            already_prefixed = record.msg.startswith(f"[{domain}/{example_id}]")
            if not already_prefixed:
                record.msg = f"[{domain}/{example_id}] {record.msg}"
        return True
# load the environment variables from .env file # load the environment variables from .env file
if os.path.exists(".env"): if os.path.exists(".env"):
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -169,6 +206,12 @@ file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter) debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter) stdout_handler.setFormatter(formatter)
# Add task context filter to all handlers
task_filter = TaskContextFilter()
file_handler.addFilter(task_filter)
debug_handler.addFilter(task_filter)
stdout_handler.addFilter(task_filter)
stdout_handler.addFilter(logging.Filter("desktopenv")) stdout_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler) logger.addHandler(file_handler)
@@ -213,6 +256,7 @@ def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: li
enable_proxy=True, enable_proxy=True,
client_password=args.client_password client_password=args.client_password
) )
active_environments.append(env) active_environments.append(env)
logger.info(f"Process {current_process().name} started.") logger.info(f"Process {current_process().name} started.")
@@ -222,6 +266,7 @@ def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: li
except Exception: except Exception:
break break
domain, example_id = item domain, example_id = item
set_task_context(domain, example_id)
try: try:
config_file = os.path.join( config_file = os.path.join(
args.test_config_base_dir, f"examples/{domain}/{example_id}.json" args.test_config_base_dir, f"examples/{domain}/{example_id}.json"
@@ -273,12 +318,14 @@ def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: li
import traceback import traceback
logger.error(f"Exception in {current_process().name} {domain}/{example_id}: {e}") logger.error(f"Exception in {current_process().name} {domain}/{example_id}: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
try: try:
env.controller.end_recording( env.controller.end_recording(
os.path.join(example_result_dir, "recording.mp4") os.path.join(example_result_dir, "recording.mp4")
) )
except Exception as rec_e: except Exception as rec_e:
logger.error(f"Failed to end recording: {rec_e}") logger.error(f"Failed to end recording: {rec_e}")
with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f: with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
f.write(json.dumps({"Error": f"{domain}/{example_id} - {e}"})) f.write(json.dumps({"Error": f"{domain}/{example_id} - {e}"}))
f.write("\n") f.write("\n")
@@ -286,6 +333,8 @@ def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: li
logger.error(f"Task-level error in {current_process().name}: {e}") logger.error(f"Task-level error in {current_process().name}: {e}")
import traceback import traceback
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
finally:
clear_task_context()
except Exception as e: except Exception as e:
logger.error(f"Process-level error in {current_process().name}: {e}") logger.error(f"Process-level error in {current_process().name}: {e}")
import traceback import traceback

View File

@@ -1,3 +1,16 @@
"""
OS-Symphony Official Evaluation Script
This script serves as the official evaluation entry point for OS-Symphony.
It handles the setup of the desktop environment, agent initialization, and
execution of evaluation tasks.
For detailed evaluation metrics, configuration options, and usage instructions,
please refer to the official repository:
https://github.com/OS-Copilot/OS-Symphony
"""
import argparse import argparse
import copy import copy
import datetime import datetime

View File

@@ -258,7 +258,11 @@ def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: li
except Exception as rec_e: except Exception as rec_e:
logger.error(f"Failed to end recording: {rec_e}") logger.error(f"Failed to end recording: {rec_e}")
with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f: with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
f.write(json.dumps({"Error": f"{domain}/{example_id} - {e}"})) tb = traceback.format_exc()
f.write(json.dumps({
"Error": f"{domain}/{example_id} - {e}",
"Traceback": tb
}))
f.write("\n") f.write("\n")
except Exception as e: except Exception as e:
logger.error(f"Task-level error in {current_process().name}: {e}") logger.error(f"Task-level error in {current_process().name}: {e}")

View File

@@ -1,57 +1,58 @@
EXP_NAME="xxx"
EXP_NAME="os-osworld-origin-nogdrive-gpt5-gta1-32b-step50-20251220-ybw" export AWS_SECRET_ACCESS_KEY="xxx"
# enable_rewrite_instruction export AWS_ACCESS_KEY_ID="xxx"
export AWS_REGION="us-east-1"
export AWS_SUBNET_ID="xxx"
export AWS_SECURITY_GROUP_ID="xxx"
# >> logs/${EXP_NAME}.log 2>&1
python run_multienv_os_symphony.py \ python run_multienv_os_symphony.py \
--provider_name "docker" \ --provider_name "aws" \
--path_to_vm "xxx" \ --region "us-east-1" \
--client_password "osworld-public-evaluation" \
--headless \ --headless \
--num_envs 1 \ --num_envs 7 \
--max_steps 50 \ --max_steps 50 \
--benchmark osworld \ --benchmark osworld \
--domain "all" \ --domain "all" \
--test_all_meta_path evaluation_examples/test_nogdrive.json \ --test_all_meta_path evaluation_examples/test_nogdrive.json \
--result_dir "results" \ --result_dir "results" \
--region "us-east-1" \
--tool_config mm_agents/os_symphony/tool/all_tool_config.yaml \ --tool_config mm_agents/os_symphony/tool/all_tool_config.yaml \
--orchestrator_provider "openai" \ --orchestrator_provider "openai" \
--orchestrator_model "gpt-5" \ --orchestrator_model "gpt-5" \
--orchestrator_url "https://api.boyuerichdata.opensphereai.com/v1" \ --orchestrator_url "xxx" \
--orchestrator_api_key "xxx" \ --orchestrator_api_key "xxx" \
--orchestrator_temperature 0.1 \ --orchestrator_temperature 0.1 \
--orchestrator_keep_first_image \ --orchestrator_keep_first_image \
--max_trajectory_length 8 \ --max_trajectory_length 8 \
--grounder_provider "vllm" \ --grounder_provider "vllm" \
--grounder_model "gta1_32b" \ --grounder_model "UI-TARS-1.5-7B" \
--grounder_api_key "none" \ --grounder_api_key "none" \
--grounder_url "https://h.pjlab.org.cn/kapi/workspace.kubebrain.io/ailab-intern11/dingzichen-7jzkt-932268-worker-0.dingzichen/18080/v1/" \ --grounder_url "xxx" \
--grounding_smart_resize \ --grounding_smart_resize \
--grounding_width 1280 \ --grounding_width 1920 \
--grounding_height 800 \ --grounding_height 1080 \
--coder_provider "openai" \ --coder_provider "openai" \
--coder_model "gpt-5" \ --coder_model "gpt-5" \
--coder_url "https://api.boyuerichdata.opensphereai.com/v1" \ --coder_url "xxx" \
--coder_api_key "xxx" \ --coder_api_key "xxx" \
--coder_temperature 0.1 \ --coder_temperature 0.1 \
--coder_budget 20 \ --coder_budget 20 \
--memoryer_provider "openai" \ --memoryer_provider "openai" \
--memoryer_model "gpt-5" \ --memoryer_model "gpt-5" \
--memoryer_url "https://api.boyuerichdata.opensphereai.com/v1" \ --memoryer_url "xxx" \
--memoryer_api_key "xxx" \ --memoryer_api_key "xxx" \
--memoryer_temperature 0.1 \ --memoryer_temperature 0.1 \
--memoryer_max_images 8 \ --memoryer_max_images 8 \
--searcher_provider "openai" \ --searcher_provider "openai" \
--searcher_model "gpt-5" \ --searcher_model "gpt-5" \
--searcher_url "https://api.boyuerichdata.opensphereai.com/v1" \ --searcher_url "xxx" \
--searcher_api_key "xxx" \ --searcher_api_key "xxx" \
--searcher_temperature 0.1 \ --searcher_temperature 0.1 \
--searcher_type "vlm" \ --searcher_type "vlm" \
--searcher_engine "duckduckgo" \ --searcher_engine "google" \
--searcher_budget 20 \ --searcher_budget 20 \
--searcher_screen_width 1920 \ --searcher_screen_width 1920 \
--searcher_screen_height 1080 \ --searcher_screen_height 1080 \
--searcher_path_to_vm "xxx" \
--sleep_after_execution 3 \ --sleep_after_execution 3 \
--exp_name ${EXP_NAME} \ --exp_name ${EXP_NAME} \
--enable_reflection --enable_reflection >> logs/${EXP_NAME}.log 2>&1
# bash scripts/remove_all_osworld_container.sh > logs/${EXP_NAME}.log 2>&1 --enable_rewrite_instruction --grounding_smart_resize