diff --git a/lib_run_single.py b/lib_run_single.py index 4bc37e1..be4bc54 100644 --- a/lib_run_single.py +++ b/lib_run_single.py @@ -326,4 +326,60 @@ def run_single_example_mano(agent, env, example, max_steps, instruction, args, e scores.append(result) with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f: f.write(f"{result}\n") - env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4")) \ No newline at end of file + env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4")) + +def run_single_example_uipath(agent, env, example, max_steps, instruction, args, example_result_dir, scores): + runtime_logger = setup_logger(example, example_result_dir) + try: + agent.reset(runtime_logger) + except Exception as e: + agent.reset() + + env.reset(task_config=example) + + time.sleep(60) # Wait for the environment to be ready + obs = env._get_obs() # Get the initial observation + done = False + step_idx = 0 + env.controller.start_recording() + while not done and step_idx < max_steps: + response, actions = agent.predict( + instruction, + obs, + args, + step_idx + ) + for action in actions: + # Capture the timestamp before executing the action + action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S") + logger.info("Step %d: %s", step_idx + 1, action) + obs, reward, done, info = env.step(action, args.sleep_after_execution) + + logger.info("Reward: %.2f", reward) + logger.info("Done: %s", done) + # Save screenshot and trajectory information + with open(os.path.join(example_result_dir, f"step_{step_idx + 1}_{action_timestamp}.png"), + "wb") as _f: + _f.write(obs['screenshot']) + with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f: + f.write(json.dumps({ + "step_num": step_idx + 1, + "action_timestamp": action_timestamp, + "action": action, + "response": response, + "reward": reward, + "done": done, + "info": info, + "screenshot_file": f"step_{step_idx + 1}_{action_timestamp}.png" + })) + f.write("\n") + if done: + logger.info("The episode is done.") + break + step_idx += 1 + result = env.evaluate() + logger.info("Result: %.2f", result) + scores.append(result) + with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f: + f.write(f"{result}\n") + env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4")) diff --git a/mm_agents/uipath/README.md b/mm_agents/uipath/README.md new file mode 100644 index 0000000..db47b2d --- /dev/null +++ b/mm_agents/uipath/README.md @@ -0,0 +1,71 @@ +# UiPath Screen Agent + +We propose a simple, yet effective implementation of a Computer Use Agent, which achieves a performance of **53.6%** on the **OSWorld** benchmark with 50 steps, demonstrating competitive results with a relatively lightweight setup and UI only actions. + +Our system builds upon recent approaches in agentic computer use and follows the literature in adopting a two-stage architecture that separates high-level reasoning from low-level execution. Specifically, the system is composed of: + +- **Action Planner (GPT-5)**: Responsible for generating high-level action sequences, reasoning about task goals and observing modifications in the environment. +- **Grounder (UI-TARS 1.5 + Internal UI Predictor)**: This component translates abstract plans into concrete interactions with the user interface. 
UI-TARS 1.5 serves as the grounding mechanism, mapping planned actions to locations on screen, while the Internal UI Predictor helps resolve ambiguous predictions, increasing the robustness of the predictions and the likelihood that they fall within UI elements.
+
+![Schema](imgs/schema.png)
+
+## Run
+```
+python run_multienv_uipath.py \
+    --provider_name docker \
+    --observation_type screenshot \
+    --model uipath_gpt_5 \
+    --sleep_after_execution 3 \
+    --max_steps 50 \
+    --num_envs 10 \
+    --action_space computer_13 \
+    --client_password password \
+    --test_all_meta_path evaluation_examples/test_all.json \
+    --uipath_model_name gpt-5-2025-08-07
+```
+
+## Action Planner
+The Action Planner receives the current screenshot, a task description, and a history of previous steps - including past screenshots, observations, internal reasoning, and predicted actions. Its role is to plan the next steps toward the task goal, anticipate changes in the environment, and determine the next action, providing clear reasoning for each decision.
+
+The interaction history is structured as a conversation: the user messages report the task and the executed actions, supply recent screenshots (up to the last two), and note the outcomes of the agent's previous actions, while the assistant replies consist of the agent's previous responses. We adopt this conversational format because it mirrors the dialogue-style data the model was originally trained on, making the setup both natural and effective.
+
+By combining the current state with this structured history, the Action Planner generates context-aware, informed predictions at every step: it can reconstruct the sequence of actions that led to the current point, notice failures, and plan the subsequent steps.
+
+We support a concise set of actions for interacting with the environment, focusing specifically on UI-related activities:
+- Click (left, right, double click)
+- Type
+- Scroll
+- Drag
+- Mouse move
+- Key press
+- Extract data – a pseudo-action used to capture information for later steps
+- Finish
+
+To help the planner model understand how to apply these actions effectively, we provide them through few-shot examples.
+
+We intentionally exclude more complex actions to isolate and evaluate the capabilities of a UI-focused agent, since certain advanced actions may not be applicable across all applications.
+
+## Grounder
+The Grounder maps an action to a specific point on the screen when needed (for actions such as click, scroll, and drag). It receives the screenshot, a description of the action to be performed, and the action type, and returns a pair of integers representing screen coordinates.
+
+We use the `UI-TARS-1.5` model, which has strong screen understanding and grounding capabilities. However, to gain more confidence in the predicted coordinates, we employ a crop-and-refine method built around an internal UI element predictor.
+
+### Crop and refine
+We wrap the prediction of the grounding model with our internal UI element predictor. The goal of this step is not to guarantee that the prediction will always fall within an identified element, but to increase the likelihood of alignment and to give the model an opportunity to refine its output.
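+
+A minimal sketch of this wrapping logic (names such as `grounder`, `element_predictor`, `crop_around`, and `contains` are illustrative; the production predictor and grounder run behind an internal service):
+
+```python
+def ground_with_refine(image, action_description, grounder, element_predictor, crop_size=640):
+    # First shot: ask the grounding model for screen coordinates.
+    x, y = grounder.predict(image, action_description)
+
+    # Accept the prediction if it falls inside any detected UI element.
+    elements = element_predictor.predict(image)
+    if any(box.contains(x, y) for box in elements):
+        return x, y
+
+    # Second shot: crop around the original point (keeping nearby elements)
+    # and let the grounding model refine its answer within the crop.
+    crop, (offset_x, offset_y) = crop_around(image, x, y, crop_size)
+    refined_x, refined_y = grounder.predict(crop, action_description)
+
+    # Map the refined coordinates back to the full screenshot.
+    return offset_x + refined_x, offset_y + refined_y
+```
+
+The wrapper leaves the grounding model itself untouched; the element predictor only decides whether a second, cropped attempt is worthwhile.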
+
+The UI element predictor consists of a shared feature extractor backbone and multiple prediction towers for:
+- identifying UI elements or controls such as icons, input boxes, checkboxes, buttons, and radio buttons
+- tables and cells
+- a few other tasks not used in our approach, but employed in other use cases and needed during training to improve the feature extractor's performance
+
+![Element predictions](imgs/element_predictions.png)
+
+In most interfaces, actions are expected to interact directly with UI elements: buttons, fields, icons, or menus. When a prediction lands outside any element, this often signals a potential misprediction. While there are legitimate cases where a click outside any element makes sense (e.g., dismissing a modal, dragging to select text, or changing window focus), they are exceptions rather than the rule. By treating these situations as possible errors, we can give the model a structured way to reconsider its output.
+
+Our approach is to give the model a "second shot" when its initial prediction falls outside an identified element. We do this by cropping around the original prediction and running the grounding model again. This retry doesn't guarantee correctness, but it does give the model a chance to adjust and potentially recover from mistakes. The crop is centered on the original coordinates and includes nearby UI elements.
+
+This process gives the model multiple opportunities to predict within a relevant zone of the interface, reducing the overall number of mispredictions. In our experiments, the grounding model placed predictions outside any UI element about 11% of the time. After applying our refinement step, the second prediction was always different from the original, demonstrating that the model does reconsider and "changes its mind" when given this guided feedback.
+
+## Conclusion
+Our method offers a clean, simple, yet competitive pipeline for Computer Use tasks. It is cost-effective, minimizing token usage during planning, avoiding parallel planning and reliance on numerous past images, and it incorporates only **direct UI actions** with refined grounding to improve accuracy. With this approach, we achieve **53.6%** accuracy on OSWorld with a 50-step horizon.
+ diff --git a/mm_agents/uipath/action_planner.py b/mm_agents/uipath/action_planner.py new file mode 100644 index 0000000..217f32b --- /dev/null +++ b/mm_agents/uipath/action_planner.py @@ -0,0 +1,288 @@ +import datetime +import json +from collections import OrderedDict +import time +import mm_agents.uipath.llm_client as llm_client +from mm_agents.uipath.types_utils import ( + PlanAction, + ExecutionState, + State, + PlanActionType, +) +from mm_agents.uipath.action_planner_prompt_builder import ( + ComputerUseAgentInterface, + PlanerCoTSections, + user_command_template, + user_task_info_template, + PlannerOutput, +) +from mm_agents.uipath.utils import ValidationException, parse_message_json + + +class ActionPlanner(object): + def __init__(self): + self.number_history_steps_with_images = 2 + self.computer_use_agent_interface = ComputerUseAgentInterface() + + def build_message_output_format_info(self) -> str: + output_dict = OrderedDict({}) + for _, value in PlanerCoTSections.items(): + display = value["display"] + description = value["description"] + output_dict[display] = description + + output_dict["action"] = ( + "" + ) + + return json.dumps(output_dict, indent=4, ensure_ascii=False) + + def get_step_content( + self, step: dict, following_step: dict | None + ) -> tuple[str, str]: + content_dict = OrderedDict({}) + observation_dict = OrderedDict({}) + + observation_dict["Performed actions"] = step["actions"] + + if ( + "extracted_data" in step["additional_parameters"] + ): # if the step was an extraction step add the dummy extraction action + extraction_action = { + "type": PlanActionType.ExtractData, + "description": step["description"], + "status": "data extracted", + } + observation_dict["Performed actions"] = [extraction_action] + + if following_step: + observation_dict["Observation"] = following_step[ + "additional_parameters" + ].get("review", None) + + for key, value in PlanerCoTSections.items(): + if key != "review": + param_value = step["additional_parameters"].get(key, None) + display_name = value["display"] + content_dict[display_name] = param_value + content_dict["actions"] = json.loads( + step["additional_parameters"]["plan_action"] + ) + + content_dict = json.dumps(content_dict, indent=4, ensure_ascii=False) + observation_dict = json.dumps(observation_dict, indent=4, ensure_ascii=False) + return content_dict, observation_dict + + def build_messages_chat(self, state: State, execution_info: dict) -> list[dict]: + messages = [] + system_message = { + "role": "system", + "content": self.computer_use_agent_interface.get_system_prompt(), + } + + messages.append(system_message) + + user_task_info_message = { + "role": "user", + "content": [ + { + "type": "text", + "text": user_task_info_template.format( + task=state.task, + current_date=datetime.datetime.now().strftime("%Y-%m-%d"), + ), + } + ], + } + + messages.append(user_task_info_message) + + start_index = max( + 0, len(state.previous_steps) - self.number_history_steps_with_images + ) + end_index = len(state.previous_steps) + + for index in range(0, end_index): + step = state.previous_steps[index] + + if index >= start_index: + assert step["image"] is not None and len(step["image"]) > 0, ( + "Step image is empty" + ) + user_image_message = { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{step['image']}" + }, + }, + ], + } + messages.append(user_image_message) + + assistant_message_text, user_observation = self.get_step_content( + step, state.previous_steps[index + 
1] if index < end_index - 1 else None + ) + + assistant_message = { + "role": "assistant", + "content": [ + { + "type": "text", + "text": assistant_message_text, + }, + ], + } + messages.append(assistant_message) + + user_message_reply = { + "role": "user", + "content": [ + { + "type": "text", + "text": user_observation, + }, + ], + } + messages.append(user_message_reply) + + last_user_message = { + "role": "user", + "content": [ + { + "type": "text", + "text": "Current screenshot:", + }, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{state.image_base64}" + }, + }, + { + "type": "text", + "text": user_command_template.format( + task=state.task, + execution_info_message=self.build_execution_info_message( + execution_info + ), + json_output_format=self.build_message_output_format_info(), + ), + }, + ], + } + + messages.append(last_user_message) + return messages + + def extract_response( + self, response_content: str + ) -> tuple[PlanAction, dict[str, str]]: + cot_sections_lst = list(PlanerCoTSections.keys()) + + additional_sections = OrderedDict({}) + response_json = parse_message_json(response_content) + + for section in cot_sections_lst: + section_display = PlanerCoTSections[section]["display"] + if section_display not in response_json: + raise ValidationException( + f"Invalid response format, '{section}' key not found: {response_content}" + ) + additional_sections[section] = response_json.get( + PlanerCoTSections[section]["display"] + ) + + if "action" not in response_json: + raise ValidationException( + f"Invalid response format, 'action' key not found: {response_content}" + ) + + action_dict = response_json["action"] + + plan_action = PlanAction.from_dict(self.correct_action_type(action_dict)) + + if plan_action.action_type == PlanActionType.Drag: + self.computer_use_agent_interface.validate_action(plan_action) + + return plan_action, additional_sections + + def build_execution_info_message(self, execution_info: dict) -> str: + execution_info_message = "" + if "planner_action_review" in execution_info: + action_description = execution_info["planner_action_review"][ + "action_description" + ] + error_message = execution_info["planner_action_review"]["error_message"] + + execution_info_message = f"You predicted this action: '{action_description}' but it is not valid because: {error_message}. If the target element is not visible on the screenshot, scroll first to make the target element visible. If the target element is not correct, change the action description with more precise element description using nearby context." 
+ return execution_info_message + + def correct_action_type(self, response_json: dict) -> dict: + action_type = response_json.get("type", "").lower() + if action_type in ("press", "key_press", "press_key"): + response_json["type"] = "key_press" + elif action_type in ("mouse_move", "move_mouse"): + response_json["type"] = "move_mouse" + elif action_type in ("type_text", "type_into", "type"): + response_json["type"] = "type" + elif "scroll" in action_type: + response_json["type"] = "scroll" + elif "wait" in action_type: + response_json["type"] = "wait" + return response_json + + def predict(self, state: State, execution_state: ExecutionState) -> PlannerOutput: + messages = self.build_messages_chat(state, execution_state.execution_info) + llm_messages = [message for message in messages] + repeat_count = 2 + plan, response_content = None, None + while repeat_count > 0: + try: + payload = { + "model": execution_state.model_name, + "messages": llm_messages, + "max_completion_tokens": 5000, + "reasoning_effort": "medium", + } + response_content = llm_client.send_messages(payload) + if response_content is None or len(response_content.strip()) == 0: + raise ValidationException("Planner response is None or empty") + plan_action, additional_sections = self.extract_response( + str(response_content) + ) + plan = PlannerOutput(plan_action, additional_sections) + break + except ValidationException as e: + time.sleep(5) + repeat_count -= 1 + ai_message = { + "role": "assistant", + "content": [ + { + "type": "text", + "text": response_content, + }, + ], + } + error_message = { + "role": "user", + "content": [ + { + "type": "text", + "text": f"{e.message}. Please try again and output a valid response in the correct format.", + }, + ], + } + + llm_messages = messages + [ai_message, error_message] + + if repeat_count == 0: + raise ValueError( + f"Invalid planner response format: {response_content}, {str(e)}" + ) + if plan is None: + raise ValueError("Planner response is not valid") + return plan diff --git a/mm_agents/uipath/action_planner_prompt_builder.py b/mm_agents/uipath/action_planner_prompt_builder.py new file mode 100644 index 0000000..ea219da --- /dev/null +++ b/mm_agents/uipath/action_planner_prompt_builder.py @@ -0,0 +1,390 @@ +from collections import OrderedDict +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional +from mm_agents.uipath.types_utils import PlanAction, key_maps +from mm_agents.uipath.utils import ValidationException + +system_template = """You are a computer use agent that perform computer-related tasks. +You will be given a task, a current screenshot, and a list of previous actions. You need to predict the next action. + +## Available Actions: +{available_actions} + +In addition there are some special actions that are not part of the main UI actions: +{special_actions} + +Each action has a description and parameters. The action description is a single sentence which mentions the action and the control element to interact with. +This description will be used by the executor agent to locate the action's target element coordinates in the screen, so describe the element targeted by the action as detailed as possible. +Particularly for icons, you can describe their position, text on it, color, nearby elements etc... 
+Example of some action descriptions with more detailed information to help the executor agent locate the element: +- "Click on the Calendar icon with the text 'Thu 28'" +- "Click the 'Search' button on the top right corner next to the login button." +- "Click the 'First Name' input box from the UserInfo section to focus it before typing." + +Your action response must be a valid JSON with the following format: +{{ + "type": str # one of the valid action types + "description": # action description + "parameters": # optional, action parameters dictionary +}} + +## Action examples: example of valid actions: +{examples} + +## Important Notes: +- Close any cookies, ads, login or registration etc pop-ups if not needed. +- Before typing, ensure the input box is focused by clicking on it. +""" + +user_command_template = """Recall Task Again: {task} +Check if the task is finished. If not provide the next action to perform. +Remember: +- Perform the task on provided application(s) or website(s). You are not allowed to use the browser "address bar". +- Close any cookies, ads, login or registration etc pop-ups if not needed. +- Only one action at a time (never "click and type", "click and drag", "type and press", "press shift and click", etc..). Think of how to combine them in two consecutive actions obtaining the intended result or use an available action that can obtain it. +- For any opening input combobox, dropdown menu options, you must select an option or press Enter key to select default one. +- Click on input box to ensure is focused before typing. Otherwise, the input box will not accept the text. +- Once focusing on an input box, if it has a default pre-typed value (not placeholder which is usually grayed-out), remove the existing value first by clicking on "X" icon or using "Ctrl A" + "Backspace" or "Backspace" if the value is already selected. +- For search input, if no search button or suggestions popup after typing, press 'Enter' to trigger search. +- Retry the drag action on slider control if needed to refine the slider values closer to expected values. +- Scroll / Pageup / Pagedown to explore or extract more content/data if needed (prefer 'key_press' action with key 'Pageup', 'Pagedown' for faster scrolling). Particularly when extraction data from table with hidden rows or columns. +- Scroll action must have a 'direction' parameter. Finish action must have a 'status' parameter. +- If you modify some settings remember to save/apply them. If button is not visible try to scroll for it. + +Most importantly, never type or click on element not visible on screenshot. Use scroll or pageup/pagedown to make the element visible first. + +{execution_info_message} +Answer in json format: +{json_output_format} +""" + +PlanerCoTSections = OrderedDict( + { + "review": { + "display": "previous_action_result", + "description": "Briefly describe the previous action result and UI change on the screenshot to see if is correctly performed.", + }, + "thought": { + "display": "thought", + "description": "Reason briefly about the next action to perform if the task is not finished.", + }, + "action_description": { + "display": "action_description", + "description": "Describe the action to perform in a single sentence. 
The description must be precise and not rely on specific information in the current screen.", + }, + } +) + + +### for chat conversation +user_task_info_template = """## Task Information: +The current date is (YYYY-MM-DD): {current_date} +Task: {task} +""" + + +@dataclass +class ActionDefinition: + type: str + description: str + parameters: Optional[Dict[str, str]] = None + examples: List[Dict[str, Any]] = field(default_factory=list) + + +class PlannerOutput(object): + def __init__(self, plan_action: PlanAction, additional_sections: dict[str, str]): + self.plan_action = plan_action + self.thought = additional_sections["thought"] + self.review = additional_sections["review"] + self.additional_sections = { + key: value + for key, value in additional_sections.items() + if key not in ["review", "thought"] + } + + +class ComputerUseAgentInterface: + def __init__(self): + self.ui_actions = {} + self.special_actions = {} + self._setup_default_actions() + + def _setup_default_actions(self): + self.add_action( + ActionDefinition( + type="click", + description="Click on a UI element", + examples=[ + {"type": "click", "description": "Click the 'Next' button."}, + { + "type": "click", + "description": "Click the 'X' icon in the input box", + }, + { + "type": "click", + "description": "Click the first name input box to focus on it.", + }, + ], + ) + ) + + self.add_action( + ActionDefinition( + type="right_click", + description="Right click on a UI element", + examples=[ + { + "type": "right_click", + "description": "Right click on the first row from the patient table to open the context menu.", + } + ], + ) + ) + + self.add_action( + ActionDefinition( + type="double_click", + description="Double click on a UI element", + examples=[ + { + "type": "double_click", + "description": "Double click word app icon to open the application.", + }, + ], + ) + ) + + self.add_action( + ActionDefinition( + type="type", + description="Type text into a focused input field. Ensure the input box is focused before typing. To focus the input box, you may need to click on it first.", + parameters={"text": "str - the text to be typed"}, + examples=[ + { + "type": "type", + "description": "Type 'John' in the first name input box.", + "parameters": {"text": "John"}, + }, + { + "type": "type", + "description": "Type 'Doe' in the last name input box.", + "parameters": {"text": "Doe"}, + }, + { + "type": "type", + "description": "Type 'Hello, world!' in the text area.", + "parameters": {"text": "Hello, world!"}, + }, + ], + ) + ) + + self.add_action( + ActionDefinition( + type="scroll", + description="Scroll an UI element in a specified direction", + parameters={ + "direction": "str - 'up', 'down', 'left', or 'right'", + "distance": "int - the number of scroll steps (wheel “clicks”) to send.", + }, + examples=[ + { + "type": "scroll", + "description": "Scroll down to see more content.", + "parameters": {"direction": "down"}, + }, + { + "type": "scroll", + "description": "Scroll up to the top of the page.", + "parameters": {"direction": "up"}, + }, + ], + ) + ) + + self.add_action( + ActionDefinition( + type="drag", + description="Drag an element or the mouse (with left click on) from one location to another. 
You must specify both start_description and end_description.", + parameters={ + "start_description": "description of the location to start dragging", + "end_description": "description of the location to drag to", + }, + examples=[ + { + "type": "drag", + "description": "Drag the response.txt file to the responses folder", + "start_description": "Click the response.txt file", + "end_description": "Click the responses folder", + }, + ], + ) + ) + + self.add_action( + ActionDefinition( + type="mouse_move", + description="Move the mouse to a specific element", + examples=[ + { + "type": "mouse_move", + "description": "Move the mouse to the 'Submit' button.", + }, + { + "type": "mouse_move", + "description": "Hover over the 'Settings' icon.", + }, + ], + ) + ) + + self.add_action( + ActionDefinition( + type="key_press", + description="Press a specific key on the keyboard", + parameters={ + "key": f'str # the key or key combination (separated by space) to be pressed. Example of key combination "Ctrl A", "Shift Tab", "Ctrl C" etc. " + Click" is not a valid combination, use two separate actions. Beside normal keys like letters, numerics, punctuations etc.. here are special key list: {key_maps.keys()}.' + }, + examples=[ + { + "type": "key_press", + "description": "Press 'Ctrl A' to select all text.", + "parameters": {"key": "Ctrl A"}, + }, + { + "type": "key_press", + "description": "Press Pagedown key.", + "parameters": {"key": "Pagedown"}, + }, + ], + ) + ) + + self.add_special_action( + ActionDefinition( + type="extract_data", + description="Use to extract some data from the screen for the task. This data will be stored in memory and used in the next actions or returned in the final result.", + parameters={ + "description": "str - short description of the data to be extracted", + "data": "str|json - the data to be extracted", + }, + examples=[ + { + "type": "extract_data", + "description": "Extract the product name and price from the screen.", + "parameters": { + "description": "Available product name and price", + "data": "Product Name: iPhone 14, Price: $999", + }, + }, + ], + ) + ) + + self.add_special_action( + ActionDefinition( + type="finish", + description=" Use it to finish the task with success or failure status. 
When you think the task was finished return success, while when you think can not be done, return failure, don't easily say failure, try your best to do the task.", + parameters={"status": "str - 'success' or 'failure'"}, + examples=[ + { + "type": "finish", + "description": "Task completed successfully.", + "parameters": {"status": "success"}, + }, + ], + ) + ) + + def add_action(self, action: ActionDefinition): + self.ui_actions[action.type] = action + + def add_special_action(self, action: ActionDefinition): + self.special_actions[action.type] = action + + def get_action_definition(self, action_type: str) -> Optional[ActionDefinition]: + return self.ui_actions.get(action_type) or self.special_actions.get(action_type) + + def validate_action(self, action: PlanAction): + action_definition = self.get_action_definition(action.action_type) + if action_definition is None: + raise ValidationException(f"Invalid action type: {action.action_type}") + + if action_definition.parameters: + for parameter in action_definition.parameters: + if parameter not in action.parameters: + raise ValidationException( + f"Missing parameter '{parameter}' in action: {action}" + ) + + def get_system_prompt(self) -> str: + indentation = " " + + def get_action_definition(action: ActionDefinition) -> str: + action_prompt = f"- {action.type}: {action.description}" + if action.parameters is not None and len(action.parameters) > 0: + params = (",\n" + 2 * indentation).join( + f"{k}: {v}" for k, v in action.parameters.items() + ) + parameter_def = ( + f"{indentation}parameters:\n{indentation}{indentation}{params}" + ) + action_prompt += "\n" + parameter_def + return action_prompt + + def get_examples(actions: List[ActionDefinition]) -> list[str]: + output_examples = [] + for action in actions: + for example in action.examples: + example_type = example["type"] + example_description = example["description"] + type_str = f'"type": "{example_type}"' + description_str = f'"description": "{example_description}"' + example_parts = [type_str, description_str] + + if "parameters" in example: + params = (",\n" + 2 * indentation).join( + f'"{k}": "{v}"' for k, v in example["parameters"].items() + ) + parameters_str = ( + '"parameters"' + + ": {\n" + + 2 * indentation + + params + + "\n" + + indentation + + "}" + ) + example_parts.append(parameters_str) + example_json = ( + "{\n" + + indentation + + (",\n" + indentation).join(example_parts) + + "\n}" + ) + output_examples.append(example_json) + + return output_examples + + available_actions = "\n\n".join( + get_action_definition(action) for action in self.ui_actions.values() + ) + special_actions = "\n\n".join( + get_action_definition(action) for action in self.special_actions.values() + ) + examples = "\n\n".join( + get_examples( + list(self.ui_actions.values()) + list(self.special_actions.values()) + ) + ) + + return system_template.format( + available_actions=available_actions, + special_actions=special_actions, + examples=examples, + ) + + +if __name__ == "__main__": + agent = ComputerUseAgentInterface() + print(agent.get_system_prompt()) diff --git a/mm_agents/uipath/agent.py b/mm_agents/uipath/agent.py new file mode 100644 index 0000000..f9b2c67 --- /dev/null +++ b/mm_agents/uipath/agent.py @@ -0,0 +1,223 @@ +import json +from mm_agents.uipath.types_utils import ( + ComputerUseAction, + ComputerUseStep, + SupportedActions, + PlanActionType, + PlanAction, + key_maps, + ExecutionState, + State, +) +import mm_agents.uipath.utils as utils +from mm_agents.uipath.action_planner import 
ActionPlanner, PlannerOutput +from mm_agents.uipath.grounder_client import GrounderClient + + +class UiPathComputerUseV1(object): + def __init__(self): + self.planner = ActionPlanner() + self.executor = GrounderClient() + + async def predict_request( + self, request_body: dict, model_name: str + ) -> tuple[dict, dict]: + state = State( + task=request_body["userTask"], + image_base64=request_body["image"], + previous_steps=request_body.get("previousSteps", []), + ) + + execution_state = ExecutionState(model_name=model_name, execution_info={}) + output = await self.predict(state, execution_state) + return output + + def process_grounding( + self, + plan_action: PlanAction, + grounding_result: utils.GroundingOutput, + x: int, + y: int, + ): + match plan_action.action_type: + case PlanActionType.Scroll: + # guess the scroll direction if missing in the plan output + if "direction" not in plan_action.parameters: + if "scroll up" in plan_action.description.lower(): + scroll_direction = "up" + else: + scroll_direction = "down" + else: + scroll_direction = plan_action.parameters["direction"] + + action = ComputerUseAction( + name=SupportedActions.Scroll, + description=plan_action.description, + parameters={"position": [x, y], "direction": scroll_direction}, + ) + + if "distance" in plan_action.parameters: + match scroll_direction: + case "up": + action.parameters["offset"] = [ + 0, + plan_action.parameters["distance"], + ] + case "down": + action.parameters["offset"] = [ + 0, + -plan_action.parameters["distance"], + ] + case "left": + action.parameters["offset"] = [ + plan_action.parameters["distance"], + 0, + ] + case "right": + action.parameters["offset"] = [ + -plan_action.parameters["distance"], + 0, + ] + case PlanActionType.Drag: + assert grounding_result.end_position is not None, ( + "End position must be provided for drag action" + ) + x_end, y_end = grounding_result.end_position + action = ComputerUseAction( + name=SupportedActions.Drag, + description=plan_action.description, + parameters={ + "path": [ + {"x": x, "y": y}, + {"x": x_end, "y": y_end}, + ] + }, + ) + case _: + action_name = plan_action.action_type + parameters = {"position": [x, y]} + + if plan_action.action_type == PlanActionType.DoubleClick: + action_name = SupportedActions.Click + parameters["click_type"] = "double" + elif plan_action.action_type == PlanActionType.RightClick: + action_name = SupportedActions.Click + parameters["button"] = "right" + elif plan_action.action_type == PlanActionType.MouseMove: + action_name = SupportedActions.MouseMove # different names + + assert action_name in [ + SupportedActions.Click, + SupportedActions.MouseMove, + ] + action = ComputerUseAction( + name=action_name, + description=plan_action.description, + parameters=parameters, + ) + return action + + async def predict( + self, state: State, execution_state: ExecutionState + ) -> tuple[dict, dict]: + planer_output: PlannerOutput = self.planner.predict(state, execution_state) + plan_action = planer_output.plan_action + + action: ComputerUseAction | None = None + step: ComputerUseStep | None = None + + match plan_action.action_type: + case PlanActionType.KeyPress: + keys = plan_action.parameters["key"].split(" ") + keys = [key.strip() for key in keys] + keys = [key_maps.get(key, key) for key in keys] + action = ComputerUseAction( + name=SupportedActions.KeyPress, + description=plan_action.description, + parameters={"keys": keys}, + ) + case PlanActionType.Wait: + action = ComputerUseAction( + name=SupportedActions.Wait, + 
description=plan_action.description, + parameters={}, + ) + case PlanActionType.ExtractData: + # return a step with no action, just to store the extracted data + step = ComputerUseStep( + description=plan_action.description, + actions=[], + additional_parameters={ + "extracted_data": plan_action.parameters, + }, + thought=planer_output.thought, + ) + case PlanActionType.Finish: + action = ComputerUseAction( + name=SupportedActions.Finish, + description=plan_action.description, + parameters=plan_action.parameters, + ) + case ( + PlanActionType.Click + | PlanActionType.MouseMove + | PlanActionType.Scroll + | PlanActionType.Drag + | PlanActionType.DoubleClick + | PlanActionType.RightClick + ): + if plan_action.action_type != PlanActionType.Drag: + grounding_result = await self.executor.predict( + state.image_base64, + plan_action.description, + action=plan_action.action_type, + ) + else: + grounding_result = await self.executor.predict( + state.image_base64, + plan_action.parameters["start_description"], + action=plan_action.action_type, + ) + grounding_result_end = await self.executor.predict( + state.image_base64, + plan_action.parameters["end_description"], + action=plan_action.action_type, + ) + grounding_result.end_position = grounding_result_end.position + x, y = grounding_result.position + action = self.process_grounding(plan_action, grounding_result, x, y) + case PlanActionType.Type: + action = ComputerUseAction( + name=SupportedActions.TypeInto, + description=plan_action.description, + parameters={"value": plan_action.parameters["text"]}, + ) + + if step is None: + assert action is not None + step = ComputerUseStep( + description=plan_action.description, + actions=[action], + additional_parameters={}, + thought=planer_output.thought, + ) + + # save additional data for history + assert step.additional_parameters is not None + step.additional_parameters["thought"] = planer_output.thought + step.additional_parameters["review"] = planer_output.review + step.additional_parameters.update(planer_output.additional_sections) + step.additional_parameters["plan_action"] = json.dumps(plan_action.to_dict()) + + history_image = state.image_base64 + previous_steps_parameters = { + "max_chat_history_messages": 1000, + "max_chat_history_images": self.planner.number_history_steps_with_images, + "image": history_image, + } + agent_response = { + "step": step.to_response_dict(), + "previous_steps_parameters": previous_steps_parameters, + } + + return agent_response diff --git a/mm_agents/uipath/grounder_client.py b/mm_agents/uipath/grounder_client.py new file mode 100644 index 0000000..0fc5374 --- /dev/null +++ b/mm_agents/uipath/grounder_client.py @@ -0,0 +1,43 @@ +import httpx +import mm_agents.uipath.utils as utils +import os + +class GrounderClient(object): + def __init__(self): + # Proxy for hosting UI-TARS + UiElementPredictor + # Could be replaced with a VLLM server and grounder (UI-TARS) specific processing + # Or any other grounder + self.url = "" + + async def predict( + self, image_base64: str, action_description: str, action: str | None = None + ) -> utils.GroundingOutput: + request = utils.GroundingRequest( + description=action_description, + image_base64=image_base64, + action_type=action, + ) + api_key = os.getenv("SERVICE_KEY") + + async with httpx.AsyncClient() as client: + response = await client.post( + self.url, + json={ + "image_base64": request.image_base64, + "action_description": request.description, + "action": request.action_type, + }, + headers={ + "X-API-KEY": api_key + }, + 
timeout=100.0, + ) + + if response.status_code != 200: + raise ValueError(f"Prediction failed: {response.text}") + + data = response.json() + return utils.GroundingOutput( + description=data["description"], + position=tuple(data["position"]), + ) diff --git a/mm_agents/uipath/imgs/element_predictions.png b/mm_agents/uipath/imgs/element_predictions.png new file mode 100644 index 0000000..90227b1 Binary files /dev/null and b/mm_agents/uipath/imgs/element_predictions.png differ diff --git a/mm_agents/uipath/imgs/schema.png b/mm_agents/uipath/imgs/schema.png new file mode 100644 index 0000000..7b3f0e3 Binary files /dev/null and b/mm_agents/uipath/imgs/schema.png differ diff --git a/mm_agents/uipath/llm_client.py b/mm_agents/uipath/llm_client.py new file mode 100644 index 0000000..be0cd6f --- /dev/null +++ b/mm_agents/uipath/llm_client.py @@ -0,0 +1,44 @@ +import os +import requests + +def send_messages(payload): + # URL to your proxy for calling LLMs + proxy_url = "" + api_key = os.getenv("SERVICE_KEY") + + # Can be directly replaced with code for calling Azure endpoint as in: + #.env config example : + # AZURE_OPENAI_API_BASE=YOUR_API_BASE + # AZURE_OPENAI_DEPLOYMENT=YOUR_DEPLOYMENT + # AZURE_OPENAI_API_VERSION=YOUR_API_VERSION + # AZURE_OPENAI_MODEL=gpt-4o-mini + # AZURE_OPENAI_API_KEY={{YOUR_API_KEY}} + # AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_API_BASE}/openai/deployments/${AZURE_OPENAI_DEPLOYMENT}/chat/completions?api-version=${AZURE_OPENAI_API_VERSION} + + + # Load environment variables + # load_dotenv() + # api_key = os.getenv('AZURE_OPENAI_API_KEY') + # openai_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT') + # #logger.info("Openai endpoint: %s", openai_endpoint) + + # headers = { + # "Content-Type": "application/json", + # "api-key": api_key + # } + # response = requests.post( + # openai_endpoint, + # headers=headers, + # json=payload + # ) + + headers = { + "Content-Type": "application/json", + "X-API-KEY": api_key + } + retries = 3 + for attempt in range(retries): + response = requests.post(proxy_url, headers=headers, json=payload) + if response.status_code == 200: + return response.json()["choices"][0]["message"]["content"] + return None \ No newline at end of file diff --git a/mm_agents/uipath/types_utils.py b/mm_agents/uipath/types_utils.py new file mode 100644 index 0000000..e00dfa4 --- /dev/null +++ b/mm_agents/uipath/types_utils.py @@ -0,0 +1,194 @@ +from typing import Optional, Union, List +from enum import Enum + +key_maps = { + "Backspace": "Back", + "Ctrl": "Ctrl", + "Shift": "Shift", + "Tab": "Tab", + "Enter": "Enter", + "Escape": "Esc", + "Arrowleft": "Left", + "Arrowup": "Up", + "Arrowright": "Right", + "Arrowdown": "Down", + "Delete": "Del", + "Pageup": "PgUp", + "Pagedown": "PgDn", +} + + +class PlanActionType(str, Enum): + Click = "click" + DoubleClick = "double_click" + RightClick = "right_click" + Type = "type" + Scroll = "scroll" + Drag = "drag" + Wait = "wait" + KeyPress = "key_press" + MouseMove = "move_mouse" + ExtractData = "extract_data" + Finish = "finish" + + +VALID_PLAN_ACTIONS = [action.value for action in PlanActionType] + + +class PlanAction: + def __init__( + self, action_type: str, description: str, parameters: dict | None = None + ): + self.action_type = action_type + self.description = description + self.parameters = parameters if parameters is not None else {} + + def to_dict(self) -> dict: + return { + "type": self.action_type, + "description": self.description, + "parameters": self.parameters, + } + + @classmethod + def from_dict(cls, data: dict | 
None) -> Union["PlanAction", None]: + if data is None: + return None + + action_type = data.get("type", "").lower() + + if action_type not in VALID_PLAN_ACTIONS: + raise Exception(f"Invalid action type: {action_type}") + + target_element = data.get("target_element", None) + + action = PlanAction( + action_type=action_type, + description=data.get("description", ""), + parameters=data.get("parameters", {}), + ) + + if target_element is not None: + action.parameters["target_element"] = target_element + + return action + + +class SupportedActions(str, Enum): + Click = "click" + TypeInto = "type_into" + Scroll = "scroll" + Drag = "drag" + Wait = "wait_load_completed" + KeyPress = "keypress" + MouseMove = "mouse_move" + Finish = "finish" + + def __str__(self): + return self.value + + +class ComputerUseAction(object): + def __init__( + self, + name: str, + description: str, + parameters: dict, + action_id: str | None = None, + result: Optional[dict | str] = None, + ): + self.id = action_id + self.name = name + self.parameters = parameters + self.description = description + self.result = result + + @staticmethod + def from_dict(action_dict: dict): + result = action_dict.get("result") + if ( + result is not None + and isinstance(result, dict) + and "token_usage" in result + and "data" in result + ): + result = result["data"] + + return ComputerUseAction( + name=action_dict["name"], + description=action_dict["description"], + result=result, + parameters=action_dict.get("parameters", {}), + ) + + def to_response_dict(self): + action_dict = { + "description": self.description, + "method_type": self.name, + "parameters": self.parameters, + "id": self.id, + } + + if self.result is not None: + action_dict["result"] = self.result + + return action_dict + + +class ComputerUseStep(object): + def __init__( + self, + description: str, + actions: List[ComputerUseAction], + thought: str | None = None, + screen_info: dict | None = None, + image: str | None = None, + additional_parameters: dict | None = None, + ): + self.description = description + self.actions: List[ComputerUseAction] = actions + self.thought = thought + self.screen_info = screen_info + self.additional_parameters = additional_parameters + self.image = image + + @staticmethod + def from_dict(step_dict): + return ComputerUseStep( + description=step_dict["description"], + thought=step_dict.get("thought"), + actions=[ + ComputerUseAction.from_dict(action_dict) + for action_dict in step_dict["actions"] + ], + ) + + def to_response_dict(self): + response_step = { + "description": self.description, + "thought": self.thought, + "additional_parameters": self.additional_parameters, + } + response_actions = [] + + for action in self.actions: + action_dict = action.to_response_dict() + response_actions.append(action_dict) + if self.image is not None: + response_step["image"] = self.image + response_step["actions"] = response_actions + + return response_step + + +class State(object): + def __init__(self, task: str, image_base64: str, previous_steps: list): + self.task = task + self.image_base64 = image_base64 + self.previous_steps = previous_steps + + +class ExecutionState(object): + def __init__(self, model_name: str, execution_info: dict): + self.model_name = model_name + self.execution_info = execution_info diff --git a/mm_agents/uipath/utils.py b/mm_agents/uipath/utils.py new file mode 100644 index 0000000..2345ffc --- /dev/null +++ b/mm_agents/uipath/utils.py @@ -0,0 +1,57 @@ +import json +import re + +from json_minify import json_minify +from 
json_repair import repair_json + + +class ValidationException(Exception): + def __init__(self, message: str): + self.message = message + + +def parse_message_json(message: str) -> dict: + message = message.strip() + code_block_pattern = r"```json\s*([\s\S]+?)```" + code_block_match = re.search(code_block_pattern, message, re.DOTALL) + + if code_block_match: + json_str = code_block_match.group(1).strip() + else: + bracket_pattern = r"\{.*\}" + bracket_match = re.search(bracket_pattern, message, re.DOTALL) + if not bracket_match: + raise ValidationException("Response does not have correct json format") + json_str = bracket_match.group(0).strip() + + try: + json_str = json_minify(json_str) + data = json.loads(json_str) + except json.JSONDecodeError: + try: + json_str = repair_json(json_str) + data = json.loads(json_str) + except json.JSONDecodeError: + raise ValidationException("Response does not have correct json format") + return data + + +class GroundingOutput: + def __init__( + self, + description: str, + position: tuple[int, int], + end_position: tuple[int, int] = None, + ): + self.description = description + self.position = position + self.end_position = end_position + + +class GroundingRequest: + def __init__( + self, description: str, image_base64: str, action_type: str | None = None + ): + self.description = description + self.image_base64 = image_base64 + self.action_type = action_type diff --git a/mm_agents/uipath_agent.py b/mm_agents/uipath_agent.py new file mode 100644 index 0000000..ea6f608 --- /dev/null +++ b/mm_agents/uipath_agent.py @@ -0,0 +1,238 @@ +import base64 +import json +from typing import Dict, List +import re +import asyncio +import logging +from mm_agents.uipath.agent import UiPathComputerUseV1 + + +def parse_actions_from_string(input_string): + if input_string.strip() in ["WAIT", "DONE", "FAIL"]: + return [input_string.strip()] + actions = [] + matches = re.findall(r"```json\s+(.*?)\s+```", input_string, re.DOTALL) + if matches: + try: + for match in matches: + action_dict = json.loads(match) + actions.append(action_dict) + return actions + except json.JSONDecodeError as e: + return f"Failed to parse JSON: {e}" + else: + matches = re.findall(r"```\s+(.*?)\s+```", input_string, re.DOTALL) + if matches: + try: + for match in matches: + action_dict = json.loads(match) + actions.append(action_dict) + return actions + except json.JSONDecodeError as e: + return f"Failed to parse JSON: {e}" + else: + try: + action_dict = json.loads(input_string) + return [action_dict] + except json.JSONDecodeError: + raise ValueError("Invalid response format: " + input_string) + + +def map_key(key): + key = key.lower() + if key == "space": + key = " " + elif key == "back": + key = "backspace" + elif key == "super": + key = "win" + elif key == "arrowdown": + key = "down" + elif key == "arrowup": + key = "up" + elif key == "arrowright": + key = "right" + elif key == "arrowrleft": + key = "left" + return key + + +def map_uipath_agent_actions_to_osworld(actions): + results = [] + + def handle_click(params): + x, y = tuple(params["position"]) + if "button" in params: + if params["button"] == "right": + return {"action_type": "RIGHT_CLICK", "x": x, "y": y} + elif params["button"] == "left": + return {"action_type": "LEFT_CLICK", "x": x, "y": y} + else: + raise ValueError(f"Unknown click button: {params['button']}") + elif "click_type" in params: + if params["click_type"] == "double": + return {"action_type": "DOUBLE_CLICK", "x": x, "y": y} + elif params["click_type"] == "triple": + return 
{"action_type": "TRIPLE_CLICK", "x": x, "y": y} + else: + raise ValueError(f"Unknown click type: {params['click_type']}") + else: + return {"action_type": "CLICK", "x": x, "y": y} + + def handle_keypress(params): + keys = [map_key(k) for k in params["keys"]] + if len(keys) == 1: + return {"action_type": "PRESS", "key": keys[0]} + return {"action_type": "HOTKEY", "keys": keys} + + def handle_key_event(params, event_type): + key = map_key(params["keys"][0]) + return {"action_type": event_type, "key": key} + + for action in actions: + method = action["method_type"].lower() + params = action["parameters"] + + match method: + case "click": + result = handle_click(params) + case "type_into": + result = {"action_type": "TYPING", "text": params["value"]} + case "wait_load_completed": + result = "WAIT" + case "keypress": + result = handle_keypress(params) + case "keydown": + result = handle_key_event(params, "KEY_DOWN") + case "keypup": + result = handle_key_event(params, "KEY_UP") + case "finish": + status_map = {"failure": "FAIL", "success": "DONE"} + result = status_map.get(params.get("status"), "DONE") + case "scroll": + x, y = tuple(params["position"]) + if "offset" in params: + dx, dy = tuple(params["offset"]) + else: + dy = 5 if params["direction"] == "up" else -5 + dx = 5 if params["direction"] == "left" else -5 + result = [ + {"action_type": "MOVE_TO", "x": x, "y": y}, + {"action_type": "SCROLL", "dx": dx, "dy": dy}, + ] + case "mouse_move": + x, y = tuple(params["position"]) + result = {"action_type": "MOVE_TO", "x": x, "y": y} + case "drag": + path = params["path"] + x1, y1 = path[0]["x"], path[0]["y"] + x2, y2 = path[1]["x"], path[1]["y"] + result = [ + {"action_type": "MOVE_TO", "x": x1, "y": y1}, + {"action_type": "DRAG_TO", "x": x2, "y": y2}, + ] + case _: + raise ValueError(f"Unknown method type: {method}") + + results.append(result) + + return json.dumps(results) + + +class UipathBaseAgent: + def __init__( + self, + platform="ubuntu", + model="gpt-5-mini-2025-08-07", + action_space="computer_13", + observation_type="screenshot", + client_password="password", + ): + self.platform = platform + self.model = model + self.action_space = action_space + self.observation_type = observation_type + self.client_password = client_password + self.uipath_computer_use_model = UiPathComputerUseV1() + + self.thoughts = [] + self.actions = [] + self.observations = [] + self.uipath_hist = [] + + def update_history(self, rsp, img_base64): + self.uipath_hist.append( + { + "actions": rsp["step"]["actions"], + "description": rsp["step"]["description"], + "additional_parameters": { + "review": rsp["step"]["additional_parameters"]["review"], + "thought": rsp["step"]["additional_parameters"]["thought"], + "action_description": rsp["step"]["additional_parameters"][ + "action_description" + ], + "plan_action": rsp["step"]["additional_parameters"]["plan_action"], + }, + "image": img_base64, + } + ) + + def predict(self, instruction: str, obs: Dict, args, step_idx) -> List: + if step_idx == args.max_steps - 1: + message = ( + instruction + + "The sudo password is password, if needed. This is the last step, you must return the finish actions with either success or failure, depending on the result. No further steps are allowed." + ) + else: + message = instruction + "The sudo password is password, if needed." 
+ img_base64 = base64.b64encode(obs["screenshot"]).decode("utf-8") + payload = { + "previousSteps": self.uipath_hist, + "userTask": message, + "image": img_base64, + "model_name": args.uipath_model_name, + } + rsp = asyncio.run( + self.uipath_computer_use_model.predict_request( + payload, args.uipath_model_name + ) + ) + self.update_history(rsp, img_base64) + + uipath_actions = map_uipath_agent_actions_to_osworld(rsp["step"]["actions"]) + try: + actions = self.parse_actions(uipath_actions) + self.thoughts.append(rsp) + except ValueError as e: + print("Failed to parse action from response", e) + actions = None + self.thoughts.append("") + + if len(actions) != 0: + while actions and isinstance(actions[0], list): + actions = [ + action for multi_action in actions for action in multi_action + ] + return rsp["step"], actions + + def parse_actions(self, response: str, masks=None): + if self.observation_type in ["screenshot"]: + if self.action_space == "computer_13": + actions = parse_actions_from_string(response) + else: + raise ValueError("Invalid action space: " + self.action_space) + self.actions.append(actions) + return actions + else: + raise ValueError("Invalid observation type: " + self.action_space) + + def reset(self, _logger=None): + global logger + logger = ( + _logger if _logger is not None else logging.getLogger("desktopenv.agent") + ) + + self.thoughts = [] + self.actions = [] + self.observations = [] + self.uipath_hist = [] diff --git a/requirements.txt b/requirements.txt index 01ded6a..f0f6c86 100644 --- a/requirements.txt +++ b/requirements.txt @@ -68,3 +68,5 @@ anthropic alibabacloud_ecs20140526 alibabacloud_tea_openapi alibabacloud_tea_util +json_minify +json_repair \ No newline at end of file diff --git a/run_multienv_uipath.py b/run_multienv_uipath.py new file mode 100644 index 0000000..f7b9d58 --- /dev/null +++ b/run_multienv_uipath.py @@ -0,0 +1,560 @@ +from __future__ import annotations +import argparse +import datetime +import json +import logging +import os +import sys +import signal +import time +from typing import List +from multiprocessing import Process, Manager +from multiprocessing import current_process +import lib_run_single +from desktop_env.desktop_env import DesktopEnv +from mm_agents.uipath_agent import UipathBaseAgent +from queue import Queue + +# Global variables for signal handling +active_environments = [] +processes = [] +is_terminating = False + +# import wandb + +# load the environment variables from .env file +if os.path.exists(".env"): + from dotenv import load_dotenv + + load_dotenv() + + +# Logger Configs {{{ # +def config() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run end-to-end evaluation on the benchmark" + ) + + # environment config + parser.add_argument("--uipath_model_name", type=str, default="gpt-5-2025-08-07") + + parser.add_argument("--path_to_vm", type=str, default=None) + parser.add_argument( + "--headless", action="store_true", help="Run in headless machine" + ) + parser.add_argument( + "--action_space", type=str, default="pyautogui", help="Action type" + ) + parser.add_argument( + "--observation_type", + choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"], + default="screenshot", + help="Observation type", + ) + parser.add_argument("--sleep_after_execution", type=float, default=0.0) + parser.add_argument("--max_steps", type=int, default=15) + + # agent config + parser.add_argument("--max_trajectory_length", type=int, default=3) + parser.add_argument( + "--test_config_base_dir", type=str, 
default="evaluation_examples" + ) + + # lm config + parser.add_argument("--model", type=str, default="gpt-4o") + parser.add_argument("--temperature", type=float, default=1.0) + parser.add_argument("--top_p", type=float, default=0.9) + parser.add_argument("--max_tokens", type=int, default=1500) + parser.add_argument("--stop_token", type=str, default=None) + + # example config + parser.add_argument("--domain", type=str, default="all") + parser.add_argument( + "--test_all_meta_path", type=str, default="evaluation_examples/test_all.json" + ) + + # logging related + parser.add_argument("--result_dir", type=str, default="./results") + parser.add_argument( + "--num_envs", + type=int, + default=1, + help="Number of environments to run in parallel", + ) + parser.add_argument( + "--log_level", + type=str, + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="INFO", + help="Set the logging level", + ) + + # aws config + parser.add_argument( + "--region", type=str, default="us-east-1", help="AWS region for the VM" + ) + parser.add_argument( + "--provider_name", + type=str, + default="docker", + choices=["aws", "virtualbox", "vmware", "docker", "azure"], + help="Provider name", + ) + parser.add_argument( + "--client_password", type=str, default="", help="Client password" + ) + parser.add_argument("--screen_width", type=int, default=1920, help="Screen width") + parser.add_argument("--screen_height", type=int, default=1080, help="Screen height") + args = parser.parse_args() + return args + + +args = config() # Get command line arguments first + +logger = logging.getLogger() +log_level = getattr(logging, args.log_level.upper()) +logger.setLevel(log_level) + +datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S") + +file_handler = logging.FileHandler( + os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8" +) +debug_handler = logging.FileHandler( + os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8" +) +stdout_handler = logging.StreamHandler(sys.stdout) + +file_handler.setLevel(logging.INFO) +debug_handler.setLevel(logging.DEBUG) +stdout_handler.setLevel(log_level) + +formatter = logging.Formatter( + fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s" +) +file_handler.setFormatter(formatter) +debug_handler.setFormatter(formatter) +stdout_handler.setFormatter(formatter) + +stdout_handler.addFilter(logging.Filter("desktopenv")) + +logger.addHandler(file_handler) +logger.addHandler(debug_handler) +logger.addHandler(stdout_handler) +# }}} Logger Configs # + +logger = logging.getLogger("desktopenv.experiment") + + +def distribute_tasks(test_all_meta: dict) -> List[tuple]: + all_tasks = [] + for domain, examples in test_all_meta.items(): + for example_id in examples: + all_tasks.append((domain, example_id)) + return all_tasks + + +def process_signal_handler(signum, frame, env_idx): + """Signal handler for child processes to gracefully shut down their environments.""" + logger.info(f"Process {env_idx + 1} received signal {signum}. 
Shutting down...") + + # Get the active_environments from the caller's frame + local_vars = frame.f_locals + active_environments = local_vars.get("active_environments", []) + + # Close environment in the current process context + for env in active_environments: + if env is not None: + try: + logger.info(f"Process {env_idx + 1} closing environment...") + env.close() + logger.info(f"Process {env_idx + 1} environment closed successfully") + except Exception as e: + logger.error(f"Process {env_idx + 1} error closing environment: {e}") + + logger.info(f"Process {env_idx + 1} shutdown complete. Exiting.") + sys.exit(0) + + +def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: list): + active_environments = [] + env = None + try: + # from desktop_env.providers.aws.manager import IMAGE_ID_MAP + # REGION = args.region + screen_size = (args.screen_width, args.screen_height) + # ami_id = IMAGE_ID_MAP[REGION].get(screen_size, IMAGE_ID_MAP[REGION][(1920, 1080)]) + env = DesktopEnv( + path_to_vm=args.path_to_vm, + action_space=args.action_space, + provider_name=args.provider_name, + # region=REGION, + # snapshot_name=ami_id, + screen_size=screen_size, + headless=args.headless, + os_type="Ubuntu", + require_a11y_tree=args.observation_type + in ["a11y_tree", "screenshot_a11y_tree", "som"], + enable_proxy=False, + client_password=args.client_password, + ) + active_environments.append(env) + agent = UipathBaseAgent( + model=args.model, + action_space=args.action_space, + observation_type=args.observation_type, + client_password=args.client_password, + ) + + logger.info(f"Process {current_process().name} started.") + while True: + try: + item = task_queue.get(timeout=5) + except Exception: + break + domain, example_id = item + try: + config_file = os.path.join( + args.test_config_base_dir, f"examples/{domain}/{example_id}.json" + ) + with open(config_file, "r", encoding="utf-8") as f: + example = json.load(f) + logger.info(f"[{current_process().name}][Domain]: {domain}") + logger.info(f"[{current_process().name}][Example ID]: {example_id}") + logger.info( + f"[{current_process().name}][Instruction]: {example['instruction']}" + ) + example_result_dir = os.path.join( + args.result_dir, + args.action_space, + args.observation_type, + args.model, + domain, + example_id, + ) + os.makedirs(example_result_dir, exist_ok=True) + try: + lib_run_single.run_single_example_uipath( + agent, + env, + example, + args.max_steps, + example["instruction"], + args, + example_result_dir, + shared_scores, + ) + except Exception as e: + import traceback + + logger.error( + f"Exception in {current_process().name} {domain}/{example_id}: {e}" + ) + logger.error(traceback.format_exc()) + try: + env.controller.end_recording( + os.path.join(example_result_dir, "recording.mp4") + ) + except Exception as rec_e: + logger.error(f"Failed to end recording: {rec_e}") + with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f: + f.write(json.dumps({"Error": f"{domain}/{example_id} - {e}"})) + f.write("\n") + except Exception as e: + logger.error(f"Task-level error in {current_process().name}: {e}") + import traceback + + logger.error(traceback.format_exc()) + except Exception as e: + logger.error(f"Process-level error in {current_process().name}: {e}") + import traceback + + logger.error(traceback.format_exc()) + finally: + logger.info(f"{current_process().name} cleaning up environment...") + try: + if env: + env.close() + logger.info(f"{current_process().name} environment closed successfully") + except 
Exception as e: + logger.error( + f"{current_process().name} error during environment cleanup: {e}" + ) + + +def signal_handler(signum, frame): + """Handle termination signals (SIGINT, SIGTERM) to gracefully shutdown environments.""" + global is_terminating, active_environments, processes + + # Avoid duplicate handling + if is_terminating: + return + + is_terminating = True + logger.info(f"Received signal {signum}. Gracefully shutting down...") + + # Close all registered environments in the main process + for env in active_environments: + try: + logger.info("Closing environment...") + env.close() + logger.info("Environment closed successfully") + except Exception as e: + logger.error(f"Error closing environment: {e}") + + # Send termination signal to all child processes first + for p in processes: + if p.is_alive(): + try: + logger.info(f"Sending termination signal to process {p.name}...") + p.terminate() + except Exception as e: + logger.error(f"Error sending termination signal to process: {e}") + + # Allow a short time for processes to handle their own cleanup + time.sleep(1) + + # Forcefully terminate any processes that didn't exit + for p in processes: + if p.is_alive(): + try: + logger.info(f"Forcefully terminating process {p.name}...") + import signal as sig + + os.kill(p.pid, sig.SIGKILL) + except Exception as e: + logger.error(f"Error forcefully terminating process: {e}") + + logger.info("Shutdown complete. Exiting.") + sys.exit(0) + + +def test(args: argparse.Namespace, test_all_meta: dict) -> None: + global processes + logger.info("Args: %s", args) + all_tasks = distribute_tasks(test_all_meta) + logger.info(f"Total tasks: {len(all_tasks)}") + with Manager() as manager: + shared_scores = manager.list() + task_queue = manager.Queue() + for item in all_tasks: + task_queue.put(item) + num_envs = args.num_envs + processes = [] + for i in range(num_envs): + p = Process( + target=run_env_tasks, + args=(task_queue, args, shared_scores), + name=f"EnvProcess-{i + 1}", + ) + p.daemon = True + p.start() + processes.append(p) + logger.info(f"Started process {p.name} with PID {p.pid}") + try: + while True: + alive_count = 0 + for idx, p in enumerate(processes): + if not p.is_alive(): + logger.warning(f"Process {p.name} died, restarting...") + new_p = Process( + target=run_env_tasks, + args=(task_queue, args, shared_scores), + name=f"EnvProcess-Restart-{idx + 1}", + ) + new_p.daemon = True + new_p.start() + processes[idx] = new_p + logger.info( + f"Restarted process {new_p.name} with PID {new_p.pid}" + ) + else: + alive_count += 1 + if task_queue.empty(): + logger.info("All tasks finished.") + break + if alive_count == 0: + logger.error("All processes died, exiting.") + break + time.sleep(5) + for p in processes: + p.join() + except KeyboardInterrupt: + logger.info( + "Main process received KeyboardInterrupt. Initiating graceful shutdown..." 
+ ) + raise + except Exception as e: + logger.error( + f"Unexpected error while waiting for processes: {e}", exc_info=True + ) + for p in processes: + if p.is_alive(): + try: + logger.info(f"Terminating process {p.name} due to error...") + p.terminate() + except Exception as term_e: + logger.error(f"Error terminating process {p.name}: {term_e}") + raise + scores = list(shared_scores) + logger.info(f"Average score: {sum(scores) / len(scores) if scores else 0}") + + +def get_unfinished( + action_space, use_model, observation_type, result_dir, total_file_json +): + target_dir = os.path.join(result_dir, action_space, observation_type, use_model) + + if not os.path.exists(target_dir): + return total_file_json + + finished = {} + for domain in os.listdir(target_dir): + finished[domain] = [] + domain_path = os.path.join(target_dir, domain) + if os.path.isdir(domain_path): + for example_id in os.listdir(domain_path): + if example_id == "onboard": + continue + example_path = os.path.join(domain_path, example_id) + if os.path.isdir(example_path): + if "result.txt" not in os.listdir(example_path): + # empty all files under example_id + for file in os.listdir(example_path): + os.remove(os.path.join(example_path, file)) + else: + finished[domain].append(example_id) + + if not finished: + return total_file_json + + for domain, examples in finished.items(): + if domain in total_file_json: + total_file_json[domain] = [ + x for x in total_file_json[domain] if x not in examples + ] + + return total_file_json + + +def get_result(action_space, use_model, observation_type, result_dir, total_file_json): + target_dir = os.path.join(result_dir, action_space, observation_type, use_model) + if not os.path.exists(target_dir): + print("New experiment, no result yet.") + return None + + all_result = [] + + for domain in os.listdir(target_dir): + domain_path = os.path.join(target_dir, domain) + if os.path.isdir(domain_path): + for example_id in os.listdir(domain_path): + example_path = os.path.join(domain_path, example_id) + if os.path.isdir(example_path): + if "result.txt" in os.listdir(example_path): + # empty all files under example_id + try: + with open( + os.path.join(example_path, "result.txt"), "r" + ) as f: + all_result.append(float(f.read())) + except (FileNotFoundError, ValueError, OSError): + all_result.append(0.0) + + if not all_result: + print("New experiment, no result yet.") + return None + else: + print("Current Success Rate:", sum(all_result) / len(all_result) * 100, "%") + return all_result + + +if __name__ == "__main__": + ####### The complete version of the list of examples ####### + os.environ["TOKENIZERS_PARALLELISM"] = "false" + + # Register signal handlers for graceful termination + signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C + signal.signal(signal.SIGTERM, signal_handler) # Handle termination signal + + try: + args = config() + + # save args to json in result_dir/action_space/observation_type/model/args.json + path_to_args = os.path.join( + args.result_dir, + args.action_space, + args.observation_type, + args.model, + "args.json", + ) + os.makedirs(os.path.dirname(path_to_args), exist_ok=True) + with open(path_to_args, "w", encoding="utf-8") as f: + json.dump(vars(args), f, indent=4) + + with open(args.test_all_meta_path, "r", encoding="utf-8") as f: + test_all_meta = json.load(f) + + if args.domain != "all": + test_all_meta = {args.domain: test_all_meta[args.domain]} + + test_file_list = get_unfinished( + args.action_space, + args.model, + args.observation_type, + 
args.result_dir, + test_all_meta, + ) + left_info = "" + for domain in test_file_list: + left_info += f"{domain}: {len(test_file_list[domain])}\n" + logger.info(f"Left tasks:\n{left_info}") + + get_result( + args.action_space, + args.model, + args.observation_type, + args.result_dir, + test_all_meta, + ) + test(args, test_file_list) + except KeyboardInterrupt: + logger.info("Main process received KeyboardInterrupt.") + # Signal handler will take care of cleanup + except Exception as e: + logger.error(f"Unexpected error in main process: {e}", exc_info=True) + # Also trigger cleanup for unhandled exceptions + signal_handler(signal.SIGTERM, None) + finally: + # Final cleanup in case any environments or processes remain + logger.info("Main process final cleanup...") + for env in active_environments: + if env is not None: + try: + logger.info("Closing environment in final cleanup...") + env.close() + logger.info("Environment closed successfully in final cleanup") + except Exception as e: + logger.error(f"Error during final environment cleanup: {e}") + + # First try gentle termination + for p in processes: + if p is not None and p.is_alive(): + try: + logger.info(f"Terminating process {p.name}...") + p.terminate() + except Exception as e: + logger.error(f"Error terminating process: {e}") + + # Wait a moment for processes to terminate + time.sleep(1) + + # Then force kill if needed + for p in processes: + if p is not None and p.is_alive(): + try: + logger.info(f"Force killing process {p.name}...") + os.kill(p.pid, signal.SIGKILL) + logger.info(f"Process {p.name} force killed") + except Exception as e: + logger.error(f"Error force killing process: {e}")