Add ui agent (#343)

* add uipath agent

* readme update
alexandruilie7
2025-09-24 14:42:46 +03:00
committed by GitHub
parent 088e68798c
commit f59cf00cae
14 changed files with 2167 additions and 1 deletion


@@ -326,4 +326,60 @@ def run_single_example_mano(agent, env, example, max_steps, instruction, args, e
scores.append(result)
with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f:
f.write(f"{result}\n")
    env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4"))
def run_single_example_uipath(agent, env, example, max_steps, instruction, args, example_result_dir, scores):
runtime_logger = setup_logger(example, example_result_dir)
try:
agent.reset(runtime_logger)
    except Exception:
        # fall back to a reset without the runtime logger
        agent.reset()
env.reset(task_config=example)
time.sleep(60) # Wait for the environment to be ready
obs = env._get_obs() # Get the initial observation
done = False
step_idx = 0
env.controller.start_recording()
while not done and step_idx < max_steps:
response, actions = agent.predict(
instruction,
obs,
args,
step_idx
)
for action in actions:
# Capture the timestamp before executing the action
action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
logger.info("Step %d: %s", step_idx + 1, action)
obs, reward, done, info = env.step(action, args.sleep_after_execution)
logger.info("Reward: %.2f", reward)
logger.info("Done: %s", done)
# Save screenshot and trajectory information
with open(os.path.join(example_result_dir, f"step_{step_idx + 1}_{action_timestamp}.png"),
"wb") as _f:
_f.write(obs['screenshot'])
with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
f.write(json.dumps({
"step_num": step_idx + 1,
"action_timestamp": action_timestamp,
"action": action,
"response": response,
"reward": reward,
"done": done,
"info": info,
"screenshot_file": f"step_{step_idx + 1}_{action_timestamp}.png"
}))
f.write("\n")
if done:
logger.info("The episode is done.")
break
step_idx += 1
result = env.evaluate()
logger.info("Result: %.2f", result)
scores.append(result)
with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f:
f.write(f"{result}\n")
env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4"))


@@ -0,0 +1,71 @@
# UiPath Screen Agent
We propose a simple yet effective implementation of a Computer Use Agent, which achieves a performance of **53.6%** on the **OSWorld** benchmark with 50 steps, demonstrating competitive results with a relatively lightweight setup and UI-only actions.
Our system builds upon recent approaches in agentic computer use and follows the literature in adopting a two-stage architecture that separates high-level reasoning from low-level execution. Specifically, the system is composed of:
- **Action Planner (GPT-5)**: Responsible for generating high-level action sequences, reasoning about task goals, and observing changes in the environment.
- **Grounder (UI-TARS 1.5 + Internal UI Predictor)**: This component translates abstract plans into concrete interactions with the user interface. UI-TARS 1.5 serves as the grounding mechanism, mapping planned actions to locations on screen, while the Internal UI Predictor assists in resolving ambiguous predictions, increasing robustness and the likelihood that predictions fall within UI elements.
![Schema](imgs/schema.png)
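For orientation, here is a condensed sketch of how one step flows through the two components (simplified from `mm_agents/uipath/agent.py` in this PR; error handling and the remaining action types are omitted):
```python
from mm_agents.uipath.types_utils import PlanActionType

# `planner`, `grounder`, `state`, and `execution_state` are as in UiPathComputerUseV1.
planner_output = planner.predict(state, execution_state)  # GPT-5 plans the next action
plan_action = planner_output.plan_action
if plan_action.action_type in (PlanActionType.Click, PlanActionType.Scroll,
                               PlanActionType.Drag, PlanActionType.MouseMove):
    # The grounder maps the textual action description to screen coordinates.
    grounding = await grounder.predict(
        state.image_base64, plan_action.description, action=plan_action.action_type
    )
    x, y = grounding.position
```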
## Run
```
python run_multienv_uipath.py \
--provider_name docker \
--observation_type screenshot \
--model uipath_gpt_5 \
--sleep_after_execution 3 \
--max_steps 50 \
--num_envs 10 \
--action_space computer_13 \
--client_password password \
--test_all_meta_path evaluation_examples/test_all.json \
--uipath_model_name gpt-5-2025-08-07
```
## Action Planner
The Action Planner receives the current screenshot, a task description, and a history of previous steps - including past screenshots, observations, internal reasoning, and predicted actions. Its role is to plan the next steps toward achieving the task goal, anticipate changes in the environment, and determine the next action, providing clear reasoning for each decision.
The interaction history is structured as a conversation: the user messages report the task and the executed actions, supply recent screenshots (up to the last two), and note the observed outcomes of previous actions, while the assistant replies consist of the agent's earlier responses. We adopt this conversational format because it mirrors the dialogue-style data the model was originally trained on, making the setup both natural and effective.
By combining the current state with this structured history, the Action Planner generates context-aware, informed predictions at every step: it can reconstruct the sequence of actions that led to the current state, notice possible failures, and plan the subsequent steps.
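For illustration, the history for a single planning call is laid out roughly as follows (a minimal sketch with placeholder strings; the real prompts are assembled in `build_messages_chat` in `mm_agents/uipath/action_planner.py`):
```python
messages = [
    {"role": "system", "content": "<system prompt listing the available actions>"},
    {"role": "user", "content": "<task description and current date>"},
    # Repeated per previous step (screenshots kept only for the last two steps):
    {"role": "user", "content": [{"type": "image_url",
                                  "image_url": {"url": "data:image/jpeg;base64,<step screenshot>"}}]},
    {"role": "assistant", "content": "<earlier reasoning and predicted action as JSON>"},
    {"role": "user", "content": "<performed actions and observed outcome>"},
    # Current state plus the request for the next action:
    {"role": "user", "content": [
        {"type": "text", "text": "Current screenshot:"},
        {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,<current screenshot>"}},
        {"type": "text", "text": "<task recap, execution feedback, and output format>"},
    ]},
]
```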
We support a concise set of actions for interacting with the environment, focusing specifically on UI-related activities:
- Click (left, right, double click)
- Type
- Scroll
- Drag
- Mouse move
- Key press
- Extract data (a pseudo-action used to capture information for later steps)
- Finish
To help the planner model understand how to apply actions effectively, we provide them through few-shot examples.
We intentionally exclude more complex actions to isolate and evaluate the capabilities of a UI-focused agent, since certain advanced actions may not be applicable across all applications.
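For reference, a few of the few-shot examples provided to the planner (taken from `action_planner_prompt_builder.py` in this PR), written here as Python dicts:
```python
click_action = {
    "type": "click",
    "description": "Click the 'Search' button on the top right corner next to the login button.",
}
type_action = {
    "type": "type",
    "description": "Type 'John' in the first name input box.",
    "parameters": {"text": "John"},
}
finish_action = {
    "type": "finish",
    "description": "Task completed successfully.",
    "parameters": {"status": "success"},
}
```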
## Grounder
The Grounder maps an action to a specific point on the screen when needed (for actions such as click, scroll, and drag). It receives the screenshot, the description of the action to be performed, and the action type, and returns a pair of integers representing screen coordinates.
We use the `UI-TARS-1.5` model, which has strong screen understanding capabilities; however, to gain more confidence in the predicted coordinates, we employ a crop-and-refine method using an internal UI element predictor.
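A minimal usage sketch of the grounding call (assuming `GrounderClient.url` and the `SERVICE_KEY` environment variable are configured for your deployment):
```python
import asyncio
from mm_agents.uipath.grounder_client import GrounderClient

async def ground(image_base64: str) -> tuple[int, int]:
    grounder = GrounderClient()
    result = await grounder.predict(
        image_base64,
        "Click the 'Search' button on the top right corner",
        action="click",
    )
    return result.position  # (x, y) integer screen coordinates

# x, y = asyncio.run(ground(screenshot_base64))
```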
### Crop and refine
We wrap the prediction of the grounding model with our internal UI element predictor. The goal of this step is not to guarantee that the prediction will always fall within an identified element, but to increase the likelihood of alignment and to give the model an opportunity to refine its output.
The UI element predictor consists of a shared feature extractor backbone and multiple prediction towers for:
- identifying UI elements or controls such as icons, input boxes, checkboxes, buttons, and radio buttons
- tables and cells
- a few other tasks not used in our approach, but employed in other use cases and included in training to improve the feature extractor's performance
![Element predictions](imgs/element_predictions.png)
In most interfaces, actions are expected to interact directly with UI elements: buttons, fields, icons, or menus. When a prediction lands outside any element, this often signals a potential misprediction. While there are legitimate cases where a click outside elements makes sense (e.g., dismissing a modal, dragging to select text, or changing window focus), they are exceptions rather than the rule. By treating these situations as possible errors, we can provide the model with a structured way to reconsider its output.
Our approach is to give the model a “second shot” when its initial prediction falls outside an identified element. We do this by cropping around the initial prediction and running the model again. This retry doesn't guarantee correctness, but it does give the model a chance to adjust and potentially recover from mistakes. The crop is centered on the original coordinates and includes nearby UI elements.
This process gives the model multiple opportunities to predict within a relevant zone of the interface, reducing the overall number of mispredictions. In our experiments, the grounding model placed predictions outside any UI element about 11% of the time. After applying our refinement step, the second prediction was always different from the original, demonstrating that the model does reconsider and “changes its mind” when given this guided feedback.
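A rough sketch of the crop-and-refine retry, under the assumption that `predict_point` wraps the UI-TARS call and `elements_at` wraps the internal UI element predictor (both are hypothetical stand-ins, not part of this PR):
```python
def ground_with_refinement(image, description, crop_size=640):
    # First grounding attempt on the full screenshot.
    x, y = predict_point(image, description)
    if elements_at(image, x, y):
        return x, y  # landed inside a detected UI element
    # Second shot: re-run the model on a crop centered on the first
    # prediction, kept large enough to include nearby UI elements.
    left, top = max(0, x - crop_size // 2), max(0, y - crop_size // 2)
    crop = image.crop((left, top, left + crop_size, top + crop_size))  # PIL-style crop
    cx, cy = predict_point(crop, description)
    return left + cx, top + cy  # map back to full-image coordinates
```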
## Conclusion
Our method offers a clean and simple yet competitive pipeline for Computer Use tasks. It is cost-effective: it minimizes token usage during planning, avoids parallel planning and reliance on numerous past images, and incorporates only **direct UI actions** with refined grounding to improve accuracy. With this approach, we achieve **53.6%** accuracy on OSWorld with a 50-step horizon.


@@ -0,0 +1,288 @@
import datetime
import json
from collections import OrderedDict
import time
import mm_agents.uipath.llm_client as llm_client
from mm_agents.uipath.types_utils import (
PlanAction,
ExecutionState,
State,
PlanActionType,
)
from mm_agents.uipath.action_planner_prompt_builder import (
ComputerUseAgentInterface,
PlanerCoTSections,
user_command_template,
user_task_info_template,
PlannerOutput,
)
from mm_agents.uipath.utils import ValidationException, parse_message_json
class ActionPlanner(object):
def __init__(self):
self.number_history_steps_with_images = 2
self.computer_use_agent_interface = ComputerUseAgentInterface()
def build_message_output_format_info(self) -> str:
output_dict = OrderedDict({})
        for value in PlanerCoTSections.values():
display = value["display"]
description = value["description"]
output_dict[display] = description
output_dict["action"] = (
"<The action to perform in JSON format as specified in the system message>"
)
return json.dumps(output_dict, indent=4, ensure_ascii=False)
def get_step_content(
self, step: dict, following_step: dict | None
) -> tuple[str, str]:
content_dict = OrderedDict({})
observation_dict = OrderedDict({})
observation_dict["Performed actions"] = step["actions"]
if (
"extracted_data" in step["additional_parameters"]
): # if the step was an extraction step add the dummy extraction action
extraction_action = {
"type": PlanActionType.ExtractData,
"description": step["description"],
"status": "data extracted",
}
observation_dict["Performed actions"] = [extraction_action]
if following_step:
observation_dict["Observation"] = following_step[
"additional_parameters"
].get("review", None)
for key, value in PlanerCoTSections.items():
if key != "review":
param_value = step["additional_parameters"].get(key, None)
display_name = value["display"]
content_dict[display_name] = param_value
content_dict["actions"] = json.loads(
step["additional_parameters"]["plan_action"]
)
content_dict = json.dumps(content_dict, indent=4, ensure_ascii=False)
observation_dict = json.dumps(observation_dict, indent=4, ensure_ascii=False)
return content_dict, observation_dict
def build_messages_chat(self, state: State, execution_info: dict) -> list[dict]:
messages = []
system_message = {
"role": "system",
"content": self.computer_use_agent_interface.get_system_prompt(),
}
messages.append(system_message)
user_task_info_message = {
"role": "user",
"content": [
{
"type": "text",
"text": user_task_info_template.format(
task=state.task,
current_date=datetime.datetime.now().strftime("%Y-%m-%d"),
),
}
],
}
messages.append(user_task_info_message)
start_index = max(
0, len(state.previous_steps) - self.number_history_steps_with_images
)
end_index = len(state.previous_steps)
for index in range(0, end_index):
step = state.previous_steps[index]
if index >= start_index:
assert step["image"] is not None and len(step["image"]) > 0, (
"Step image is empty"
)
user_image_message = {
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{step['image']}"
},
},
],
}
messages.append(user_image_message)
assistant_message_text, user_observation = self.get_step_content(
step, state.previous_steps[index + 1] if index < end_index - 1 else None
)
assistant_message = {
"role": "assistant",
"content": [
{
"type": "text",
"text": assistant_message_text,
},
],
}
messages.append(assistant_message)
user_message_reply = {
"role": "user",
"content": [
{
"type": "text",
"text": user_observation,
},
],
}
messages.append(user_message_reply)
last_user_message = {
"role": "user",
"content": [
{
"type": "text",
"text": "Current screenshot:",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{state.image_base64}"
},
},
{
"type": "text",
"text": user_command_template.format(
task=state.task,
execution_info_message=self.build_execution_info_message(
execution_info
),
json_output_format=self.build_message_output_format_info(),
),
},
],
}
messages.append(last_user_message)
return messages
def extract_response(
self, response_content: str
) -> tuple[PlanAction, dict[str, str]]:
cot_sections_lst = list(PlanerCoTSections.keys())
additional_sections = OrderedDict({})
response_json = parse_message_json(response_content)
for section in cot_sections_lst:
section_display = PlanerCoTSections[section]["display"]
if section_display not in response_json:
raise ValidationException(
f"Invalid response format, '{section}' key not found: {response_content}"
)
additional_sections[section] = response_json.get(
PlanerCoTSections[section]["display"]
)
if "action" not in response_json:
raise ValidationException(
f"Invalid response format, 'action' key not found: {response_content}"
)
action_dict = response_json["action"]
plan_action = PlanAction.from_dict(self.correct_action_type(action_dict))
if plan_action.action_type == PlanActionType.Drag:
self.computer_use_agent_interface.validate_action(plan_action)
return plan_action, additional_sections
def build_execution_info_message(self, execution_info: dict) -> str:
execution_info_message = ""
if "planner_action_review" in execution_info:
action_description = execution_info["planner_action_review"][
"action_description"
]
error_message = execution_info["planner_action_review"]["error_message"]
execution_info_message = f"You predicted this action: '{action_description}' but it is not valid because: {error_message}. If the target element is not visible on the screenshot, scroll first to make the target element visible. If the target element is not correct, change the action description with more precise element description using nearby context."
return execution_info_message
def correct_action_type(self, response_json: dict) -> dict:
action_type = response_json.get("type", "").lower()
if action_type in ("press", "key_press", "press_key"):
response_json["type"] = "key_press"
elif action_type in ("mouse_move", "move_mouse"):
response_json["type"] = "move_mouse"
elif action_type in ("type_text", "type_into", "type"):
response_json["type"] = "type"
elif "scroll" in action_type:
response_json["type"] = "scroll"
elif "wait" in action_type:
response_json["type"] = "wait"
return response_json
def predict(self, state: State, execution_state: ExecutionState) -> PlannerOutput:
messages = self.build_messages_chat(state, execution_state.execution_info)
        llm_messages = list(messages)  # shallow copy; retry feedback is appended below
repeat_count = 2
plan, response_content = None, None
while repeat_count > 0:
try:
payload = {
"model": execution_state.model_name,
"messages": llm_messages,
"max_completion_tokens": 5000,
"reasoning_effort": "medium",
}
response_content = llm_client.send_messages(payload)
if response_content is None or len(response_content.strip()) == 0:
raise ValidationException("Planner response is None or empty")
plan_action, additional_sections = self.extract_response(
str(response_content)
)
plan = PlannerOutput(plan_action, additional_sections)
break
except ValidationException as e:
time.sleep(5)
repeat_count -= 1
ai_message = {
"role": "assistant",
"content": [
{
"type": "text",
"text": response_content,
},
],
}
error_message = {
"role": "user",
"content": [
{
"type": "text",
"text": f"{e.message}. Please try again and output a valid response in the correct format.",
},
],
}
llm_messages = messages + [ai_message, error_message]
if repeat_count == 0:
raise ValueError(
f"Invalid planner response format: {response_content}, {str(e)}"
)
if plan is None:
raise ValueError("Planner response is not valid")
return plan


@@ -0,0 +1,390 @@
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from mm_agents.uipath.types_utils import PlanAction, key_maps
from mm_agents.uipath.utils import ValidationException
system_template = """You are a computer use agent that perform computer-related tasks.
You will be given a task, a current screenshot, and a list of previous actions. You need to predict the next action.
## Available Actions:
{available_actions}
In addition there are some special actions that are not part of the main UI actions:
{special_actions}
Each action has a description and parameters. The action description is a single sentence which mentions the action and the control element to interact with.
This description will be used by the executor agent to locate the action's target element coordinates on the screen, so describe the element targeted by the action in as much detail as possible.
Particularly for icons, you can describe their position, the text on them, color, nearby elements, etc.
Example of some action descriptions with more detailed information to help the executor agent locate the element:
- "Click on the Calendar icon with the text 'Thu 28'"
- "Click the 'Search' button on the top right corner next to the login button."
- "Click the 'First Name' input box from the UserInfo section to focus it before typing."
Your action response must be a valid JSON with the following format:
{{
"type": str # one of the valid action types
"description": # action description
"parameters": # optional, action parameters dictionary
}}
## Action examples: example of valid actions:
{examples}
## Important Notes:
- Close any cookie, ad, login, registration, or similar pop-ups if they are not needed.
- Before typing, ensure the input box is focused by clicking on it.
"""
user_command_template = """Recall Task Again: {task}
Check if the task is finished. If not, provide the next action to perform.
Remember:
- Perform the task on provided application(s) or website(s). You are not allowed to use the browser "address bar".
- Close any cookie, ad, login, registration, or similar pop-ups if they are not needed.
- Only one action at a time (never "click and type", "click and drag", "type and press", "press shift and click", etc..). Think of how to combine them in two consecutive actions obtaining the intended result or use an available action that can obtain it.
- For any open input combobox or dropdown menu, you must select an option or press the Enter key to select the default one.
- Click on the input box to ensure it is focused before typing. Otherwise, the input box will not accept the text.
- Once focused on an input box, if it has a default pre-typed value (not a placeholder, which is usually grayed out), remove the existing value first by clicking on the "X" icon or using "Ctrl A" + "Backspace", or "Backspace" alone if the value is already selected.
- For search inputs, if no search button or suggestions popup appears after typing, press 'Enter' to trigger the search.
- Retry the drag action on slider control if needed to refine the slider values closer to expected values.
- Scroll / Pageup / Pagedown to explore or extract more content/data if needed (prefer 'key_press' action with key 'Pageup', 'Pagedown' for faster scrolling). Particularly when extracting data from tables with hidden rows or columns.
- Scroll action must have a 'direction' parameter. Finish action must have a 'status' parameter.
- If you modify some settings remember to save/apply them. If button is not visible try to scroll for it.
Most importantly, never type or click on an element not visible on the screenshot. Use scroll or pageup/pagedown to make the element visible first.
{execution_info_message}
Answer in json format:
{json_output_format}
"""
PlanerCoTSections = OrderedDict(
{
"review": {
"display": "previous_action_result",
"description": "Briefly describe the previous action result and UI change on the screenshot to see if is correctly performed.",
},
"thought": {
"display": "thought",
"description": "Reason briefly about the next action to perform if the task is not finished.",
},
"action_description": {
"display": "action_description",
"description": "Describe the action to perform in a single sentence. The description must be precise and not rely on specific information in the current screen.",
},
}
)
### for chat conversation
user_task_info_template = """## Task Information:
The current date is (YYYY-MM-DD): {current_date}
Task: {task}
"""
@dataclass
class ActionDefinition:
type: str
description: str
parameters: Optional[Dict[str, str]] = None
examples: List[Dict[str, Any]] = field(default_factory=list)
class PlannerOutput(object):
def __init__(self, plan_action: PlanAction, additional_sections: dict[str, str]):
self.plan_action = plan_action
self.thought = additional_sections["thought"]
self.review = additional_sections["review"]
self.additional_sections = {
key: value
for key, value in additional_sections.items()
if key not in ["review", "thought"]
}
class ComputerUseAgentInterface:
def __init__(self):
self.ui_actions = {}
self.special_actions = {}
self._setup_default_actions()
def _setup_default_actions(self):
self.add_action(
ActionDefinition(
type="click",
description="Click on a UI element",
examples=[
{"type": "click", "description": "Click the 'Next' button."},
{
"type": "click",
"description": "Click the 'X' icon in the input box",
},
{
"type": "click",
"description": "Click the first name input box to focus on it.",
},
],
)
)
self.add_action(
ActionDefinition(
type="right_click",
description="Right click on a UI element",
examples=[
{
"type": "right_click",
"description": "Right click on the first row from the patient table to open the context menu.",
}
],
)
)
self.add_action(
ActionDefinition(
type="double_click",
description="Double click on a UI element",
examples=[
{
"type": "double_click",
"description": "Double click word app icon to open the application.",
},
],
)
)
self.add_action(
ActionDefinition(
type="type",
description="Type text into a focused input field. Ensure the input box is focused before typing. To focus the input box, you may need to click on it first.",
parameters={"text": "str - the text to be typed"},
examples=[
{
"type": "type",
"description": "Type 'John' in the first name input box.",
"parameters": {"text": "John"},
},
{
"type": "type",
"description": "Type 'Doe' in the last name input box.",
"parameters": {"text": "Doe"},
},
{
"type": "type",
"description": "Type 'Hello, world!' in the text area.",
"parameters": {"text": "Hello, world!"},
},
],
)
)
self.add_action(
ActionDefinition(
type="scroll",
description="Scroll an UI element in a specified direction",
parameters={
"direction": "str - 'up', 'down', 'left', or 'right'",
"distance": "int - the number of scroll steps (wheel “clicks”) to send.",
},
examples=[
{
"type": "scroll",
"description": "Scroll down to see more content.",
"parameters": {"direction": "down"},
},
{
"type": "scroll",
"description": "Scroll up to the top of the page.",
"parameters": {"direction": "up"},
},
],
)
)
self.add_action(
ActionDefinition(
type="drag",
description="Drag an element or the mouse (with left click on) from one location to another. You must specify both start_description and end_description.",
parameters={
"start_description": "description of the location to start dragging",
"end_description": "description of the location to drag to",
},
examples=[
{
"type": "drag",
"description": "Drag the response.txt file to the responses folder",
"start_description": "Click the response.txt file",
"end_description": "Click the responses folder",
},
],
)
)
self.add_action(
ActionDefinition(
type="mouse_move",
description="Move the mouse to a specific element",
examples=[
{
"type": "mouse_move",
"description": "Move the mouse to the 'Submit' button.",
},
{
"type": "mouse_move",
"description": "Hover over the 'Settings' icon.",
},
],
)
)
self.add_action(
ActionDefinition(
type="key_press",
description="Press a specific key on the keyboard",
parameters={
"key": f'str # the key or key combination (separated by space) to be pressed. Example of key combination "Ctrl A", "Shift Tab", "Ctrl C" etc. "<Key> + Click" is not a valid combination, use two separate actions. Beside normal keys like letters, numerics, punctuations etc.. here are special key list: {key_maps.keys()}.'
},
examples=[
{
"type": "key_press",
"description": "Press 'Ctrl A' to select all text.",
"parameters": {"key": "Ctrl A"},
},
{
"type": "key_press",
"description": "Press Pagedown key.",
"parameters": {"key": "Pagedown"},
},
],
)
)
self.add_special_action(
ActionDefinition(
type="extract_data",
description="Use to extract some data from the screen for the task. This data will be stored in memory and used in the next actions or returned in the final result.",
parameters={
"description": "str - short description of the data to be extracted",
"data": "str|json - the data to be extracted",
},
examples=[
{
"type": "extract_data",
"description": "Extract the product name and price from the screen.",
"parameters": {
"description": "Available product name and price",
"data": "Product Name: iPhone 14, Price: $999",
},
},
],
)
)
self.add_special_action(
ActionDefinition(
type="finish",
description=" Use it to finish the task with success or failure status. When you think the task was finished return success, while when you think can not be done, return failure, don't easily say failure, try your best to do the task.",
parameters={"status": "str - 'success' or 'failure'"},
examples=[
{
"type": "finish",
"description": "Task completed successfully.",
"parameters": {"status": "success"},
},
],
)
)
def add_action(self, action: ActionDefinition):
self.ui_actions[action.type] = action
def add_special_action(self, action: ActionDefinition):
self.special_actions[action.type] = action
def get_action_definition(self, action_type: str) -> Optional[ActionDefinition]:
return self.ui_actions.get(action_type) or self.special_actions.get(action_type)
def validate_action(self, action: PlanAction):
action_definition = self.get_action_definition(action.action_type)
if action_definition is None:
raise ValidationException(f"Invalid action type: {action.action_type}")
if action_definition.parameters:
for parameter in action_definition.parameters:
if parameter not in action.parameters:
raise ValidationException(
f"Missing parameter '{parameter}' in action: {action}"
)
def get_system_prompt(self) -> str:
indentation = " "
def get_action_definition(action: ActionDefinition) -> str:
action_prompt = f"- {action.type}: {action.description}"
if action.parameters is not None and len(action.parameters) > 0:
params = (",\n" + 2 * indentation).join(
f"{k}: {v}" for k, v in action.parameters.items()
)
parameter_def = (
f"{indentation}parameters:\n{indentation}{indentation}{params}"
)
action_prompt += "\n" + parameter_def
return action_prompt
def get_examples(actions: List[ActionDefinition]) -> list[str]:
output_examples = []
for action in actions:
for example in action.examples:
example_type = example["type"]
example_description = example["description"]
type_str = f'"type": "{example_type}"'
description_str = f'"description": "{example_description}"'
example_parts = [type_str, description_str]
if "parameters" in example:
params = (",\n" + 2 * indentation).join(
f'"{k}": "{v}"' for k, v in example["parameters"].items()
)
parameters_str = (
'"parameters"'
+ ": {\n"
+ 2 * indentation
+ params
+ "\n"
+ indentation
+ "}"
)
example_parts.append(parameters_str)
example_json = (
"{\n"
+ indentation
+ (",\n" + indentation).join(example_parts)
+ "\n}"
)
output_examples.append(example_json)
return output_examples
available_actions = "\n\n".join(
get_action_definition(action) for action in self.ui_actions.values()
)
special_actions = "\n\n".join(
get_action_definition(action) for action in self.special_actions.values()
)
examples = "\n\n".join(
get_examples(
list(self.ui_actions.values()) + list(self.special_actions.values())
)
)
return system_template.format(
available_actions=available_actions,
special_actions=special_actions,
examples=examples,
)
if __name__ == "__main__":
agent = ComputerUseAgentInterface()
print(agent.get_system_prompt())

mm_agents/uipath/agent.py

@@ -0,0 +1,223 @@
import json
from mm_agents.uipath.types_utils import (
ComputerUseAction,
ComputerUseStep,
SupportedActions,
PlanActionType,
PlanAction,
key_maps,
ExecutionState,
State,
)
import mm_agents.uipath.utils as utils
from mm_agents.uipath.action_planner import ActionPlanner, PlannerOutput
from mm_agents.uipath.grounder_client import GrounderClient
class UiPathComputerUseV1(object):
def __init__(self):
self.planner = ActionPlanner()
self.executor = GrounderClient()
async def predict_request(
self, request_body: dict, model_name: str
    ) -> dict:
state = State(
task=request_body["userTask"],
image_base64=request_body["image"],
previous_steps=request_body.get("previousSteps", []),
)
execution_state = ExecutionState(model_name=model_name, execution_info={})
output = await self.predict(state, execution_state)
return output
def process_grounding(
self,
plan_action: PlanAction,
grounding_result: utils.GroundingOutput,
x: int,
y: int,
):
match plan_action.action_type:
case PlanActionType.Scroll:
# guess the scroll direction if missing in the plan output
if "direction" not in plan_action.parameters:
if "scroll up" in plan_action.description.lower():
scroll_direction = "up"
else:
scroll_direction = "down"
else:
scroll_direction = plan_action.parameters["direction"]
action = ComputerUseAction(
name=SupportedActions.Scroll,
description=plan_action.description,
parameters={"position": [x, y], "direction": scroll_direction},
)
if "distance" in plan_action.parameters:
match scroll_direction:
case "up":
action.parameters["offset"] = [
0,
plan_action.parameters["distance"],
]
case "down":
action.parameters["offset"] = [
0,
-plan_action.parameters["distance"],
]
case "left":
action.parameters["offset"] = [
plan_action.parameters["distance"],
0,
]
case "right":
action.parameters["offset"] = [
-plan_action.parameters["distance"],
0,
]
case PlanActionType.Drag:
assert grounding_result.end_position is not None, (
"End position must be provided for drag action"
)
x_end, y_end = grounding_result.end_position
action = ComputerUseAction(
name=SupportedActions.Drag,
description=plan_action.description,
parameters={
"path": [
{"x": x, "y": y},
{"x": x_end, "y": y_end},
]
},
)
case _:
action_name = plan_action.action_type
parameters = {"position": [x, y]}
if plan_action.action_type == PlanActionType.DoubleClick:
action_name = SupportedActions.Click
parameters["click_type"] = "double"
elif plan_action.action_type == PlanActionType.RightClick:
action_name = SupportedActions.Click
parameters["button"] = "right"
elif plan_action.action_type == PlanActionType.MouseMove:
                    action_name = SupportedActions.MouseMove  # plan uses "move_mouse", executor expects "mouse_move"
assert action_name in [
SupportedActions.Click,
SupportedActions.MouseMove,
]
action = ComputerUseAction(
name=action_name,
description=plan_action.description,
parameters=parameters,
)
return action
async def predict(
self, state: State, execution_state: ExecutionState
    ) -> dict:
planer_output: PlannerOutput = self.planner.predict(state, execution_state)
plan_action = planer_output.plan_action
action: ComputerUseAction | None = None
step: ComputerUseStep | None = None
match plan_action.action_type:
case PlanActionType.KeyPress:
keys = plan_action.parameters["key"].split(" ")
keys = [key.strip() for key in keys]
keys = [key_maps.get(key, key) for key in keys]
action = ComputerUseAction(
name=SupportedActions.KeyPress,
description=plan_action.description,
parameters={"keys": keys},
)
case PlanActionType.Wait:
action = ComputerUseAction(
name=SupportedActions.Wait,
description=plan_action.description,
parameters={},
)
case PlanActionType.ExtractData:
# return a step with no action, just to store the extracted data
step = ComputerUseStep(
description=plan_action.description,
actions=[],
additional_parameters={
"extracted_data": plan_action.parameters,
},
thought=planer_output.thought,
)
case PlanActionType.Finish:
action = ComputerUseAction(
name=SupportedActions.Finish,
description=plan_action.description,
parameters=plan_action.parameters,
)
case (
PlanActionType.Click
| PlanActionType.MouseMove
| PlanActionType.Scroll
| PlanActionType.Drag
| PlanActionType.DoubleClick
| PlanActionType.RightClick
):
if plan_action.action_type != PlanActionType.Drag:
grounding_result = await self.executor.predict(
state.image_base64,
plan_action.description,
action=plan_action.action_type,
)
else:
grounding_result = await self.executor.predict(
state.image_base64,
plan_action.parameters["start_description"],
action=plan_action.action_type,
)
grounding_result_end = await self.executor.predict(
state.image_base64,
plan_action.parameters["end_description"],
action=plan_action.action_type,
)
grounding_result.end_position = grounding_result_end.position
x, y = grounding_result.position
action = self.process_grounding(plan_action, grounding_result, x, y)
case PlanActionType.Type:
action = ComputerUseAction(
name=SupportedActions.TypeInto,
description=plan_action.description,
parameters={"value": plan_action.parameters["text"]},
)
if step is None:
assert action is not None
step = ComputerUseStep(
description=plan_action.description,
actions=[action],
additional_parameters={},
thought=planer_output.thought,
)
# save additional data for history
assert step.additional_parameters is not None
step.additional_parameters["thought"] = planer_output.thought
step.additional_parameters["review"] = planer_output.review
step.additional_parameters.update(planer_output.additional_sections)
step.additional_parameters["plan_action"] = json.dumps(plan_action.to_dict())
history_image = state.image_base64
previous_steps_parameters = {
"max_chat_history_messages": 1000,
"max_chat_history_images": self.planner.number_history_steps_with_images,
"image": history_image,
}
agent_response = {
"step": step.to_response_dict(),
"previous_steps_parameters": previous_steps_parameters,
}
return agent_response


@@ -0,0 +1,43 @@
import httpx
import mm_agents.uipath.utils as utils
import os
class GrounderClient(object):
def __init__(self):
# Proxy for hosting UI-TARS + UiElementPredictor
# Could be replaced with a VLLM server and grounder (UI-TARS) specific processing
# Or any other grounder
self.url = ""
async def predict(
self, image_base64: str, action_description: str, action: str | None = None
) -> utils.GroundingOutput:
request = utils.GroundingRequest(
description=action_description,
image_base64=image_base64,
action_type=action,
)
api_key = os.getenv("SERVICE_KEY")
async with httpx.AsyncClient() as client:
response = await client.post(
self.url,
json={
"image_base64": request.image_base64,
"action_description": request.description,
"action": request.action_type,
},
headers={
"X-API-KEY": api_key
},
timeout=100.0,
)
if response.status_code != 200:
raise ValueError(f"Prediction failed: {response.text}")
data = response.json()
return utils.GroundingOutput(
description=data["description"],
position=tuple(data["position"]),
)

Two binary image files added (not shown).


@@ -0,0 +1,44 @@
import os
import requests
def send_messages(payload):
# URL to your proxy for calling LLMs
proxy_url = ""
api_key = os.getenv("SERVICE_KEY")
# Can be directly replaced with code for calling Azure endpoint as in:
    # .env config example:
# AZURE_OPENAI_API_BASE=YOUR_API_BASE
# AZURE_OPENAI_DEPLOYMENT=YOUR_DEPLOYMENT
# AZURE_OPENAI_API_VERSION=YOUR_API_VERSION
# AZURE_OPENAI_MODEL=gpt-4o-mini
# AZURE_OPENAI_API_KEY={{YOUR_API_KEY}}
# AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_API_BASE}/openai/deployments/${AZURE_OPENAI_DEPLOYMENT}/chat/completions?api-version=${AZURE_OPENAI_API_VERSION}
# Load environment variables
# load_dotenv()
# api_key = os.getenv('AZURE_OPENAI_API_KEY')
# openai_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT')
# #logger.info("Openai endpoint: %s", openai_endpoint)
# headers = {
# "Content-Type": "application/json",
# "api-key": api_key
# }
# response = requests.post(
# openai_endpoint,
# headers=headers,
# json=payload
# )
headers = {
"Content-Type": "application/json",
"X-API-KEY": api_key
}
retries = 3
for attempt in range(retries):
response = requests.post(proxy_url, headers=headers, json=payload)
if response.status_code == 200:
return response.json()["choices"][0]["message"]["content"]
return None


@@ -0,0 +1,194 @@
from typing import Optional, Union, List
from enum import Enum
from mm_agents.uipath.utils import ValidationException
key_maps = {
"Backspace": "Back",
"Ctrl": "Ctrl",
"Shift": "Shift",
"Tab": "Tab",
"Enter": "Enter",
"Escape": "Esc",
"Arrowleft": "Left",
"Arrowup": "Up",
"Arrowright": "Right",
"Arrowdown": "Down",
"Delete": "Del",
"Pageup": "PgUp",
"Pagedown": "PgDn",
}
class PlanActionType(str, Enum):
Click = "click"
DoubleClick = "double_click"
RightClick = "right_click"
Type = "type"
Scroll = "scroll"
Drag = "drag"
Wait = "wait"
KeyPress = "key_press"
MouseMove = "move_mouse"
ExtractData = "extract_data"
Finish = "finish"
VALID_PLAN_ACTIONS = [action.value for action in PlanActionType]
class PlanAction:
def __init__(
self, action_type: str, description: str, parameters: dict | None = None
):
self.action_type = action_type
self.description = description
self.parameters = parameters if parameters is not None else {}
def to_dict(self) -> dict:
return {
"type": self.action_type,
"description": self.description,
"parameters": self.parameters,
}
@classmethod
def from_dict(cls, data: dict | None) -> Union["PlanAction", None]:
if data is None:
return None
action_type = data.get("type", "").lower()
        if action_type not in VALID_PLAN_ACTIONS:
            # raise ValidationException so the planner's retry loop can recover
            raise ValidationException(f"Invalid action type: {action_type}")
target_element = data.get("target_element", None)
action = PlanAction(
action_type=action_type,
description=data.get("description", ""),
parameters=data.get("parameters", {}),
)
if target_element is not None:
action.parameters["target_element"] = target_element
return action
class SupportedActions(str, Enum):
Click = "click"
TypeInto = "type_into"
Scroll = "scroll"
Drag = "drag"
Wait = "wait_load_completed"
KeyPress = "keypress"
MouseMove = "mouse_move"
Finish = "finish"
def __str__(self):
return self.value
class ComputerUseAction(object):
def __init__(
self,
name: str,
description: str,
parameters: dict,
action_id: str | None = None,
result: Optional[dict | str] = None,
):
self.id = action_id
self.name = name
self.parameters = parameters
self.description = description
self.result = result
@staticmethod
def from_dict(action_dict: dict):
result = action_dict.get("result")
if (
result is not None
and isinstance(result, dict)
and "token_usage" in result
and "data" in result
):
result = result["data"]
return ComputerUseAction(
name=action_dict["name"],
description=action_dict["description"],
result=result,
parameters=action_dict.get("parameters", {}),
)
def to_response_dict(self):
action_dict = {
"description": self.description,
"method_type": self.name,
"parameters": self.parameters,
"id": self.id,
}
if self.result is not None:
action_dict["result"] = self.result
return action_dict
class ComputerUseStep(object):
def __init__(
self,
description: str,
actions: List[ComputerUseAction],
thought: str | None = None,
screen_info: dict | None = None,
image: str | None = None,
additional_parameters: dict | None = None,
):
self.description = description
self.actions: List[ComputerUseAction] = actions
self.thought = thought
self.screen_info = screen_info
self.additional_parameters = additional_parameters
self.image = image
@staticmethod
def from_dict(step_dict):
return ComputerUseStep(
description=step_dict["description"],
thought=step_dict.get("thought"),
actions=[
ComputerUseAction.from_dict(action_dict)
for action_dict in step_dict["actions"]
],
)
def to_response_dict(self):
response_step = {
"description": self.description,
"thought": self.thought,
"additional_parameters": self.additional_parameters,
}
response_actions = []
for action in self.actions:
action_dict = action.to_response_dict()
response_actions.append(action_dict)
if self.image is not None:
response_step["image"] = self.image
response_step["actions"] = response_actions
return response_step
class State(object):
def __init__(self, task: str, image_base64: str, previous_steps: list):
self.task = task
self.image_base64 = image_base64
self.previous_steps = previous_steps
class ExecutionState(object):
def __init__(self, model_name: str, execution_info: dict):
self.model_name = model_name
self.execution_info = execution_info

mm_agents/uipath/utils.py

@@ -0,0 +1,57 @@
import json
import re
from json_minify import json_minify
from json_repair import repair_json
class ValidationException(Exception):
    def __init__(self, message: str):
        super().__init__(message)  # so str(exc) carries the message
        self.message = message
def parse_message_json(message: str) -> dict:
message = message.strip()
code_block_pattern = r"```json\s*([\s\S]+?)```"
code_block_match = re.search(code_block_pattern, message, re.DOTALL)
if code_block_match:
json_str = code_block_match.group(1).strip()
else:
bracket_pattern = r"\{.*\}"
bracket_match = re.search(bracket_pattern, message, re.DOTALL)
if not bracket_match:
raise ValidationException("Response does not have correct json format")
json_str = bracket_match.group(0).strip()
try:
json_str = json_minify(json_str)
data = json.loads(json_str)
except json.JSONDecodeError:
try:
json_str = repair_json(json_str)
data = json.loads(json_str)
except json.JSONDecodeError:
raise ValidationException("Response does not have correct json format")
return data
class GroundingOutput:
def __init__(
self,
description: str,
position: tuple[int, int],
        end_position: tuple[int, int] | None = None,
):
self.description = description
self.position = position
self.end_position = end_position
class GroundingRequest:
def __init__(
self, description: str, image_base64: str, action_type: str | None = None
):
self.description = description
self.image_base64 = image_base64
self.action_type = action_type

mm_agents/uipath_agent.py

@@ -0,0 +1,238 @@
import base64
import json
from typing import Dict, List
import re
import asyncio
import logging
from mm_agents.uipath.agent import UiPathComputerUseV1
def parse_actions_from_string(input_string):
if input_string.strip() in ["WAIT", "DONE", "FAIL"]:
return [input_string.strip()]
actions = []
matches = re.findall(r"```json\s+(.*?)\s+```", input_string, re.DOTALL)
if matches:
try:
for match in matches:
action_dict = json.loads(match)
actions.append(action_dict)
return actions
        except json.JSONDecodeError as e:
            raise ValueError(f"Failed to parse JSON: {e}")
else:
matches = re.findall(r"```\s+(.*?)\s+```", input_string, re.DOTALL)
if matches:
try:
for match in matches:
action_dict = json.loads(match)
actions.append(action_dict)
return actions
            except json.JSONDecodeError as e:
                raise ValueError(f"Failed to parse JSON: {e}")
else:
try:
action_dict = json.loads(input_string)
return [action_dict]
except json.JSONDecodeError:
raise ValueError("Invalid response format: " + input_string)
def map_key(key):
key = key.lower()
if key == "space":
key = " "
elif key == "back":
key = "backspace"
elif key == "super":
key = "win"
elif key == "arrowdown":
key = "down"
elif key == "arrowup":
key = "up"
elif key == "arrowright":
key = "right"
elif key == "arrowrleft":
key = "left"
return key
def map_uipath_agent_actions_to_osworld(actions):
results = []
def handle_click(params):
x, y = tuple(params["position"])
if "button" in params:
if params["button"] == "right":
return {"action_type": "RIGHT_CLICK", "x": x, "y": y}
elif params["button"] == "left":
return {"action_type": "LEFT_CLICK", "x": x, "y": y}
else:
raise ValueError(f"Unknown click button: {params['button']}")
elif "click_type" in params:
if params["click_type"] == "double":
return {"action_type": "DOUBLE_CLICK", "x": x, "y": y}
elif params["click_type"] == "triple":
return {"action_type": "TRIPLE_CLICK", "x": x, "y": y}
else:
raise ValueError(f"Unknown click type: {params['click_type']}")
else:
return {"action_type": "CLICK", "x": x, "y": y}
def handle_keypress(params):
keys = [map_key(k) for k in params["keys"]]
if len(keys) == 1:
return {"action_type": "PRESS", "key": keys[0]}
return {"action_type": "HOTKEY", "keys": keys}
def handle_key_event(params, event_type):
key = map_key(params["keys"][0])
return {"action_type": event_type, "key": key}
for action in actions:
method = action["method_type"].lower()
params = action["parameters"]
match method:
case "click":
result = handle_click(params)
case "type_into":
result = {"action_type": "TYPING", "text": params["value"]}
case "wait_load_completed":
result = "WAIT"
case "keypress":
result = handle_keypress(params)
case "keydown":
result = handle_key_event(params, "KEY_DOWN")
case "keypup":
result = handle_key_event(params, "KEY_UP")
case "finish":
status_map = {"failure": "FAIL", "success": "DONE"}
result = status_map.get(params.get("status"), "DONE")
case "scroll":
x, y = tuple(params["position"])
if "offset" in params:
dx, dy = tuple(params["offset"])
else:
dy = 5 if params["direction"] == "up" else -5
dx = 5 if params["direction"] == "left" else -5
result = [
{"action_type": "MOVE_TO", "x": x, "y": y},
{"action_type": "SCROLL", "dx": dx, "dy": dy},
]
case "mouse_move":
x, y = tuple(params["position"])
result = {"action_type": "MOVE_TO", "x": x, "y": y}
case "drag":
path = params["path"]
x1, y1 = path[0]["x"], path[0]["y"]
x2, y2 = path[1]["x"], path[1]["y"]
result = [
{"action_type": "MOVE_TO", "x": x1, "y": y1},
{"action_type": "DRAG_TO", "x": x2, "y": y2},
]
case _:
raise ValueError(f"Unknown method type: {method}")
results.append(result)
return json.dumps(results)
class UipathBaseAgent:
def __init__(
self,
platform="ubuntu",
model="gpt-5-mini-2025-08-07",
action_space="computer_13",
observation_type="screenshot",
client_password="password",
):
self.platform = platform
self.model = model
self.action_space = action_space
self.observation_type = observation_type
self.client_password = client_password
self.uipath_computer_use_model = UiPathComputerUseV1()
self.thoughts = []
self.actions = []
self.observations = []
self.uipath_hist = []
def update_history(self, rsp, img_base64):
self.uipath_hist.append(
{
"actions": rsp["step"]["actions"],
"description": rsp["step"]["description"],
"additional_parameters": {
"review": rsp["step"]["additional_parameters"]["review"],
"thought": rsp["step"]["additional_parameters"]["thought"],
"action_description": rsp["step"]["additional_parameters"][
"action_description"
],
"plan_action": rsp["step"]["additional_parameters"]["plan_action"],
},
"image": img_base64,
}
)
    def predict(self, instruction: str, obs: Dict, args, step_idx) -> tuple:
if step_idx == args.max_steps - 1:
            message = (
                instruction
                + " The sudo password is password, if needed. This is the last step, you must return the finish action with either success or failure, depending on the result. No further steps are allowed."
            )
        else:
            message = instruction + " The sudo password is password, if needed."
img_base64 = base64.b64encode(obs["screenshot"]).decode("utf-8")
payload = {
"previousSteps": self.uipath_hist,
"userTask": message,
"image": img_base64,
"model_name": args.uipath_model_name,
}
rsp = asyncio.run(
self.uipath_computer_use_model.predict_request(
payload, args.uipath_model_name
)
)
self.update_history(rsp, img_base64)
uipath_actions = map_uipath_agent_actions_to_osworld(rsp["step"]["actions"])
try:
actions = self.parse_actions(uipath_actions)
self.thoughts.append(rsp)
except ValueError as e:
print("Failed to parse action from response", e)
            actions = []  # use an empty list so the flattening below does not crash
self.thoughts.append("")
if len(actions) != 0:
while actions and isinstance(actions[0], list):
actions = [
action for multi_action in actions for action in multi_action
]
return rsp["step"], actions
def parse_actions(self, response: str, masks=None):
if self.observation_type in ["screenshot"]:
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
else:
raise ValueError("Invalid action space: " + self.action_space)
self.actions.append(actions)
return actions
else:
raise ValueError("Invalid observation type: " + self.action_space)
def reset(self, _logger=None):
global logger
logger = (
_logger if _logger is not None else logging.getLogger("desktopenv.agent")
)
self.thoughts = []
self.actions = []
self.observations = []
self.uipath_hist = []


@@ -68,3 +68,5 @@ anthropic
alibabacloud_ecs20140526
alibabacloud_tea_openapi
alibabacloud_tea_util
json_minify
json_repair

run_multienv_uipath.py

@@ -0,0 +1,560 @@
from __future__ import annotations
import argparse
import datetime
import json
import logging
import os
import sys
import signal
import time
from typing import List
from multiprocessing import Process, Manager
from multiprocessing import current_process
import lib_run_single
from desktop_env.desktop_env import DesktopEnv
from mm_agents.uipath_agent import UipathBaseAgent
from queue import Queue
# Global variables for signal handling
active_environments = []
processes = []
is_terminating = False
# import wandb
# load the environment variables from .env file
if os.path.exists(".env"):
from dotenv import load_dotenv
load_dotenv()
# Logger Configs {{{ #
def config() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run end-to-end evaluation on the benchmark"
)
# environment config
parser.add_argument("--uipath_model_name", type=str, default="gpt-5-2025-08-07")
parser.add_argument("--path_to_vm", type=str, default=None)
parser.add_argument(
"--headless", action="store_true", help="Run in headless machine"
)
parser.add_argument(
"--action_space", type=str, default="pyautogui", help="Action type"
)
parser.add_argument(
"--observation_type",
choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"],
default="screenshot",
help="Observation type",
)
parser.add_argument("--sleep_after_execution", type=float, default=0.0)
parser.add_argument("--max_steps", type=int, default=15)
# agent config
parser.add_argument("--max_trajectory_length", type=int, default=3)
parser.add_argument(
"--test_config_base_dir", type=str, default="evaluation_examples"
)
# lm config
parser.add_argument("--model", type=str, default="gpt-4o")
parser.add_argument("--temperature", type=float, default=1.0)
parser.add_argument("--top_p", type=float, default=0.9)
parser.add_argument("--max_tokens", type=int, default=1500)
parser.add_argument("--stop_token", type=str, default=None)
# example config
parser.add_argument("--domain", type=str, default="all")
parser.add_argument(
"--test_all_meta_path", type=str, default="evaluation_examples/test_all.json"
)
# logging related
parser.add_argument("--result_dir", type=str, default="./results")
parser.add_argument(
"--num_envs",
type=int,
default=1,
help="Number of environments to run in parallel",
)
parser.add_argument(
"--log_level",
type=str,
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="INFO",
help="Set the logging level",
)
# aws config
parser.add_argument(
"--region", type=str, default="us-east-1", help="AWS region for the VM"
)
parser.add_argument(
"--provider_name",
type=str,
default="docker",
choices=["aws", "virtualbox", "vmware", "docker", "azure"],
help="Provider name",
)
parser.add_argument(
"--client_password", type=str, default="", help="Client password"
)
parser.add_argument("--screen_width", type=int, default=1920, help="Screen width")
parser.add_argument("--screen_height", type=int, default=1080, help="Screen height")
args = parser.parse_args()
return args
args = config() # Get command line arguments first
logger = logging.getLogger()
log_level = getattr(logging, args.log_level.upper())
logger.setLevel(log_level)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
os.makedirs("logs", exist_ok=True)
file_handler = logging.FileHandler(
    os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8"
)
debug_handler = logging.FileHandler(
os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8"
)
stdout_handler = logging.StreamHandler(sys.stdout)
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(log_level)
formatter = logging.Formatter(
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
)
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
# }}} Logger Configs #
logger = logging.getLogger("desktopenv.experiment")
def distribute_tasks(test_all_meta: dict) -> List[tuple]:
all_tasks = []
for domain, examples in test_all_meta.items():
for example_id in examples:
all_tasks.append((domain, example_id))
return all_tasks
def process_signal_handler(signum, frame, env_idx):
"""Signal handler for child processes to gracefully shut down their environments."""
logger.info(f"Process {env_idx + 1} received signal {signum}. Shutting down...")
# Get the active_environments from the caller's frame
local_vars = frame.f_locals
active_environments = local_vars.get("active_environments", [])
# Close environment in the current process context
for env in active_environments:
if env is not None:
try:
logger.info(f"Process {env_idx + 1} closing environment...")
env.close()
logger.info(f"Process {env_idx + 1} environment closed successfully")
except Exception as e:
logger.error(f"Process {env_idx + 1} error closing environment: {e}")
logger.info(f"Process {env_idx + 1} shutdown complete. Exiting.")
sys.exit(0)
def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: list):
active_environments = []
env = None
try:
# from desktop_env.providers.aws.manager import IMAGE_ID_MAP
# REGION = args.region
screen_size = (args.screen_width, args.screen_height)
# ami_id = IMAGE_ID_MAP[REGION].get(screen_size, IMAGE_ID_MAP[REGION][(1920, 1080)])
env = DesktopEnv(
path_to_vm=args.path_to_vm,
action_space=args.action_space,
provider_name=args.provider_name,
# region=REGION,
# snapshot_name=ami_id,
screen_size=screen_size,
headless=args.headless,
os_type="Ubuntu",
require_a11y_tree=args.observation_type
in ["a11y_tree", "screenshot_a11y_tree", "som"],
enable_proxy=False,
client_password=args.client_password,
)
active_environments.append(env)
agent = UipathBaseAgent(
model=args.model,
action_space=args.action_space,
observation_type=args.observation_type,
client_password=args.client_password,
)
logger.info(f"Process {current_process().name} started.")
while True:
try:
item = task_queue.get(timeout=5)
except Exception:
break
domain, example_id = item
try:
config_file = os.path.join(
args.test_config_base_dir, f"examples/{domain}/{example_id}.json"
)
with open(config_file, "r", encoding="utf-8") as f:
example = json.load(f)
logger.info(f"[{current_process().name}][Domain]: {domain}")
logger.info(f"[{current_process().name}][Example ID]: {example_id}")
logger.info(
f"[{current_process().name}][Instruction]: {example['instruction']}"
)
example_result_dir = os.path.join(
args.result_dir,
args.action_space,
args.observation_type,
args.model,
domain,
example_id,
)
os.makedirs(example_result_dir, exist_ok=True)
try:
lib_run_single.run_single_example_uipath(
agent,
env,
example,
args.max_steps,
example["instruction"],
args,
example_result_dir,
shared_scores,
)
except Exception as e:
import traceback
logger.error(
f"Exception in {current_process().name} {domain}/{example_id}: {e}"
)
logger.error(traceback.format_exc())
try:
env.controller.end_recording(
os.path.join(example_result_dir, "recording.mp4")
)
except Exception as rec_e:
logger.error(f"Failed to end recording: {rec_e}")
with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
f.write(json.dumps({"Error": f"{domain}/{example_id} - {e}"}))
f.write("\n")
except Exception as e:
logger.error(f"Task-level error in {current_process().name}: {e}")
import traceback
logger.error(traceback.format_exc())
except Exception as e:
logger.error(f"Process-level error in {current_process().name}: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
logger.info(f"{current_process().name} cleaning up environment...")
try:
if env:
env.close()
logger.info(f"{current_process().name} environment closed successfully")
except Exception as e:
logger.error(
f"{current_process().name} error during environment cleanup: {e}"
)
def signal_handler(signum, frame):
"""Handle termination signals (SIGINT, SIGTERM) to gracefully shutdown environments."""
global is_terminating, active_environments, processes
# Avoid duplicate handling
if is_terminating:
return
is_terminating = True
logger.info(f"Received signal {signum}. Gracefully shutting down...")
# Close all registered environments in the main process
for env in active_environments:
try:
logger.info("Closing environment...")
env.close()
logger.info("Environment closed successfully")
except Exception as e:
logger.error(f"Error closing environment: {e}")
# Send termination signal to all child processes first
for p in processes:
if p.is_alive():
try:
logger.info(f"Sending termination signal to process {p.name}...")
p.terminate()
except Exception as e:
logger.error(f"Error sending termination signal to process: {e}")
# Allow a short time for processes to handle their own cleanup
time.sleep(1)
# Forcefully terminate any processes that didn't exit
for p in processes:
if p.is_alive():
try:
logger.info(f"Forcefully terminating process {p.name}...")
                os.kill(p.pid, signal.SIGKILL)
except Exception as e:
logger.error(f"Error forcefully terminating process: {e}")
logger.info("Shutdown complete. Exiting.")
sys.exit(0)


def test(args: argparse.Namespace, test_all_meta: dict) -> None:
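    """Distribute tasks across num_envs worker processes, restart any worker
    that dies mid-run, and log the average score once the queue is drained."""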
global processes
logger.info("Args: %s", args)
all_tasks = distribute_tasks(test_all_meta)
logger.info(f"Total tasks: {len(all_tasks)}")
with Manager() as manager:
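        # Manager-backed proxies let the task queue and score list be shared
        # safely across worker processes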
shared_scores = manager.list()
task_queue = manager.Queue()
for item in all_tasks:
task_queue.put(item)
num_envs = args.num_envs
processes = []
for i in range(num_envs):
p = Process(
target=run_env_tasks,
args=(task_queue, args, shared_scores),
name=f"EnvProcess-{i + 1}",
)
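            # Daemonic, so stray workers are killed if the main process exits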
p.daemon = True
p.start()
processes.append(p)
logger.info(f"Started process {p.name} with PID {p.pid}")
try:
while True:
alive_count = 0
for idx, p in enumerate(processes):
if not p.is_alive():
logger.warning(f"Process {p.name} died, restarting...")
new_p = Process(
target=run_env_tasks,
args=(task_queue, args, shared_scores),
name=f"EnvProcess-Restart-{idx + 1}",
)
new_p.daemon = True
new_p.start()
processes[idx] = new_p
                        logger.info(
                            f"Restarted process {new_p.name} with PID {new_p.pid}"
                        )
                        # Count the restarted worker as alive so the
                        # "all processes died" check below cannot fire spuriously
                        alive_count += 1
                    else:
                        alive_count += 1
                if task_queue.empty():
                    # An empty queue only means no tasks are left to claim;
                    # workers finish their current example, then exit on the
                    # next get() timeout and are joined below
                    logger.info("Task queue drained; waiting for workers to finish.")
                    break
if alive_count == 0:
logger.error("All processes died, exiting.")
break
time.sleep(5)
for p in processes:
p.join()
except KeyboardInterrupt:
logger.info(
"Main process received KeyboardInterrupt. Initiating graceful shutdown..."
)
raise
except Exception as e:
logger.error(
f"Unexpected error while waiting for processes: {e}", exc_info=True
)
for p in processes:
if p.is_alive():
try:
logger.info(f"Terminating process {p.name} due to error...")
p.terminate()
except Exception as term_e:
logger.error(f"Error terminating process {p.name}: {term_e}")
raise
scores = list(shared_scores)
logger.info(f"Average score: {sum(scores) / len(scores) if scores else 0}")


def get_unfinished(
action_space, use_model, observation_type, result_dir, total_file_json
):
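    """Return the subset of total_file_json that still lacks a result.txt.
    Partial outputs from interrupted examples are deleted so they rerun cleanly."""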
target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
if not os.path.exists(target_dir):
return total_file_json
finished = {}
for domain in os.listdir(target_dir):
finished[domain] = []
domain_path = os.path.join(target_dir, domain)
if os.path.isdir(domain_path):
for example_id in os.listdir(domain_path):
if example_id == "onboard":
continue
example_path = os.path.join(domain_path, example_id)
if os.path.isdir(example_path):
if "result.txt" not in os.listdir(example_path):
                        # Incomplete example: delete stale partial outputs so it reruns from scratch
for file in os.listdir(example_path):
os.remove(os.path.join(example_path, file))
else:
finished[domain].append(example_id)
if not finished:
return total_file_json
for domain, examples in finished.items():
if domain in total_file_json:
total_file_json[domain] = [
x for x in total_file_json[domain] if x not in examples
]
return total_file_json


def get_result(action_space, use_model, observation_type, result_dir, total_file_json):
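    """Read every result.txt under the result directory, print the running
    success rate, and return the list of scores (None if nothing finished)."""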
target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
if not os.path.exists(target_dir):
print("New experiment, no result yet.")
return None
all_result = []
for domain in os.listdir(target_dir):
domain_path = os.path.join(target_dir, domain)
if os.path.isdir(domain_path):
for example_id in os.listdir(domain_path):
example_path = os.path.join(domain_path, example_id)
if os.path.isdir(example_path):
if "result.txt" in os.listdir(example_path):
                        # Completed example: read its recorded score
try:
with open(
os.path.join(example_path, "result.txt"), "r"
) as f:
all_result.append(float(f.read()))
except (FileNotFoundError, ValueError, OSError):
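                            # An unreadable or malformed result.txt counts as a failed example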
all_result.append(0.0)
if not all_result:
print("New experiment, no result yet.")
return None
else:
print("Current Success Rate:", sum(all_result) / len(all_result) * 100, "%")
return all_result


if __name__ == "__main__":
####### The complete version of the list of examples #######
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Register signal handlers for graceful termination
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # Handle termination signal
try:
args = config()
# save args to json in result_dir/action_space/observation_type/model/args.json
path_to_args = os.path.join(
args.result_dir,
args.action_space,
args.observation_type,
args.model,
"args.json",
)
os.makedirs(os.path.dirname(path_to_args), exist_ok=True)
with open(path_to_args, "w", encoding="utf-8") as f:
json.dump(vars(args), f, indent=4)
with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
test_all_meta = json.load(f)
if args.domain != "all":
test_all_meta = {args.domain: test_all_meta[args.domain]}
test_file_list = get_unfinished(
args.action_space,
args.model,
args.observation_type,
args.result_dir,
test_all_meta,
)
left_info = ""
for domain in test_file_list:
left_info += f"{domain}: {len(test_file_list[domain])}\n"
logger.info(f"Left tasks:\n{left_info}")
get_result(
args.action_space,
args.model,
args.observation_type,
args.result_dir,
test_all_meta,
)
test(args, test_file_list)
except KeyboardInterrupt:
logger.info("Main process received KeyboardInterrupt.")
# Signal handler will take care of cleanup
except Exception as e:
logger.error(f"Unexpected error in main process: {e}", exc_info=True)
# Also trigger cleanup for unhandled exceptions
signal_handler(signal.SIGTERM, None)
finally:
# Final cleanup in case any environments or processes remain
logger.info("Main process final cleanup...")
for env in active_environments:
if env is not None:
try:
logger.info("Closing environment in final cleanup...")
env.close()
logger.info("Environment closed successfully in final cleanup")
except Exception as e:
logger.error(f"Error during final environment cleanup: {e}")
# First try gentle termination
for p in processes:
if p is not None and p.is_alive():
try:
logger.info(f"Terminating process {p.name}...")
p.terminate()
except Exception as e:
logger.error(f"Error terminating process: {e}")
# Wait a moment for processes to terminate
time.sleep(1)
# Then force kill if needed
for p in processes:
if p is not None and p.is_alive():
try:
logger.info(f"Force killing process {p.name}...")
os.kill(p.pid, signal.SIGKILL)
logger.info(f"Process {p.name} force killed")
except Exception as e:
logger.error(f"Error force killing process: {e}")