From 0f2655249ca54dc60bef3378450aaf7c6d9ff6bd Mon Sep 17 00:00:00 2001 From: Xinyuan Wang <109049819+XinyuanWangCS@users.noreply.github.com> Date: Wed, 16 Jul 2025 17:53:12 +0800 Subject: [PATCH] Wxy/opencua (#260) * OpenCUA Agent code base * update url * debug, modify url input * debug opencua * show result * debug agent history overlap * modify opencua agent; add comment lines --- lib_run_single.py | 3 +- mm_agents/opencua_agent.py | 326 ++++++++++++++++++------------------- run_multienv_opencua.py | 74 +++++---- show_result_opencua.py | 291 +++++++++++++++++++++++++++++++++ 4 files changed, 497 insertions(+), 197 deletions(-) create mode 100644 show_result_opencua.py diff --git a/lib_run_single.py b/lib_run_single.py index 1735d0e..dda5407 100644 --- a/lib_run_single.py +++ b/lib_run_single.py @@ -163,7 +163,8 @@ def run_single_example_opencua(agent, env, example, max_steps, instruction, args response, actions, info_dict = agent.predict(instruction, obs) logger.info(f"Got Action: {actions}") - if not actions or len(actions)==0 or actions[0]=="" or actions[0].lower().startswith("error"): # TODO: new added + # Breack if no actions + if not actions or len(actions)==0 or actions[0]=="" or actions[0].lower().startswith("error"): break for action in actions: diff --git a/mm_agents/opencua_agent.py b/mm_agents/opencua_agent.py index 512ac9c..c1ddb60 100644 --- a/mm_agents/opencua_agent.py +++ b/mm_agents/opencua_agent.py @@ -1,38 +1,45 @@ -import base64 -from loguru import logger +""" +OpenCUA Agent Implementation + +This module implements an OpenCUA agent for desktop automation tasks, building upon +existing frameworks and integrating multiple coordinate mapping systems. + +Framework and Implementation Sources: +- Main framework structure follows: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/agent.py +- Agent implementation adapted from: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/aguvis_agent.py +- Qwen2.5-VL coordinate mapping from: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py +""" + import re -import time -import math -import httpx -from io import BytesIO -from typing import Dict, List, Tuple, Optional -import backoff -from PIL import Image import os +import ast +import time +import json +import math +import copy +import httpx +import base64 +import backoff +from io import BytesIO +from loguru import logger +from PIL import Image +from typing import Dict, List, Tuple, Optional -AGNET_SYS_PROMPT_L1 = """You are a GUI agent. You are given a task and a screenshot of the screen. 
You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}""".strip() - +AGNET_SYS_PROMPT_L1 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip() AGNET_SYS_PROMPT_L2 = "You are a GUI agent. You are given a task and a screenshot of the screen. 
You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip() - AGNET_SYS_PROMPT_L3 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nObservation:\n - Describe the current computer state based on the full screenshot in detail. 
\n - Application Context:\n - The active application\n - The active window or page\n - Overall layout and visible interface\n - Key Elements:\n - Menu items and toolbars \n - Buttons and controls\n - Text fields and content\n - Dialog boxes or popups\n - Error messages or notifications\n - Loading states\n - Other key elements\n - Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}\n".strip() -AGNET_SYS_PROMPT_L0 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task. 
- -For each step, output the action as PyAutoGUI code or the following functions: -- {"name": "computer.triple_click", "description": "Triple click on the screen", "parameters": {"type": "object", "properties": {"x": {"type": "number", "description": "The x coordinate of the triple click"}, "y": {"type": "number", "description": "The y coordinate of the triple click"}}, "required": ["x", "y"]}} -- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}}, "required": ["status"]}} -""".strip() - +STEP_TEMPLATE = "# Step {step_num}:\n" INSTRUTION_TEMPLATE = "# Task Instruction:\n{instruction}\n\nPlease generate the next move according to the screenshot, task instruction and previous steps (if provided).\n" -STEP_TEMPLATE = "# Step {step_num}:\n" ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n" THOUGHT_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n" OBSERVATION_HISTORY_TEMPLATE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n" DETAIL_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n" -# Function to encode the image + def encode_image(image_content): + """Encode the image to base64""" return base64.b64encode(image_content).decode('utf-8') def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) -> Tuple[str, List[str], dict]: @@ -40,57 +47,61 @@ def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) try: sections = {} - if "computer.terminate" in input_string.lower(): - code_blocks = re.findall(r'```(?:code)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE) - if code_blocks: - last_code = code_blocks[-1].strip().lower() - if "fail" in last_code: - return "FAIL", ["FAIL"], {} - elif "success" in last_code: - return "DONE", ["DONE"], {} - - return "DONE", ["DONE"], {} - obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE) if obs_match: sections['observation'] = obs_match.group(1).strip() - # logger.warning(f"Extracted Observation: {sections.get('observation', 'None')}") thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE) if thought_match: sections['thought'] = thought_match.group(1).strip() - # logger.warning(f"Extracted Thought: {sections.get('thought', 'None')}") action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE) if action_match: action = action_match.group(1).strip() sections['action'] = action.strip() - # logger.warning(f"Extracted Action: {sections.get('action', 'None')}") - code_blocks = re.findall(r'```(?:python)?\s*(.*?)\s*```', input_string, re.DOTALL) + if "computer.terminate" in input_string.lower(): + # Look for code blocks that might contain terminate command + code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE) + if code_blocks: + last_code = code_blocks[-1].strip().lower() + if "fail" in last_code: + sections['code'] = "FAIL" + return "FAIL", ["FAIL"], sections + elif "success" in last_code: + sections['code'] = "DONE" + return "DONE", ["DONE"], sections + # Default to DONE if terminate is mentioned but no specific status + sections['code'] = "DONE" + return 
"DONE", ["DONE"], sections + + code_blocks = re.findall(r'```(?:python)\s*(.*?)\s*```', input_string, re.DOTALL) if code_blocks: code = code_blocks[-1].strip() sections['original_code'] = transform_agnet_action_to_code_block(code) corrected_code = correct_pyautogui_arguments(code) sections['code'] = corrected_code sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_size[0], screen_height=screen_size[1], coordinate_type=coordinate_type) - # logger.warning(f"Extracted Code: {sections.get('code', 'None')}") + else: + # No code blocks found + sections['code'] = "WAIT" + return "WAIT", ["WAIT"], sections if 'code' not in sections: logger.error("Missing required action or code section") return None, None, {} - if 'action' not in sections: # TODO: new added + if 'action' not in sections: sections['action'] = "" return sections['action'], [sections['code']], sections except Exception as e: logger.exception(f"Error parsing response: {str(e)}\nInput string: {input_string}") - return None, None, {} - + return None, None, {} def correct_pyautogui_arguments(code: str) -> str: + """Correct the pyautogui arguments""" function_corrections = { 'write': { 'incorrect_args': ['text', 'content'], @@ -154,6 +165,7 @@ def correct_pyautogui_arguments(code: str) -> str: return corrected_code def split_args(args_str: str) -> List[str]: + """Split the arguments string into a list of arguments""" args = [] current_arg = '' within_string = False @@ -185,13 +197,15 @@ def smart_resize( max_aspect_ratio_allowed: Optional[float] = None, size_can_be_smaller_than_factor: bool = False, ): - """Rescales the image so that the following conditions are met: + """ + The function is modified from https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py - 1. Both dimensions (height and width) are divisible by 'factor'. + Qwen2.5-VL based model need this function to resize screenshots. - 2. The total number of pixels is within the range ['min_pixels', 'max_pixels']. - - 3. The aspect ratio of the image is maintained as closely as possible. + Rescales the image so that the following conditions are met: + 1. Both dimensions (height and width) are divisible by 'factor'. + 2. The total number of pixels is within the range ['min_pixels', 'max_pixels']. + 3. The aspect ratio of the image is maintained as closely as possible. 
""" if not size_can_be_smaller_than_factor and (height < factor or width < factor): @@ -218,39 +232,29 @@ def smart_resize( return h_bar, w_bar def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type): - if coordinate_type == "relative": + """Project the coordinates to the absolute scale""" + if coordinate_type == "relative": + return int(round(x * screen_width)), int(round(y * screen_height)) + elif coordinate_type == "absolute": + return x, y + elif coordinate_type == "qwen25": + if 0 <= x <= 1 and 0 <= y <= 1: + # If already normalized, treat like "relative" return int(round(x * screen_width)), int(round(y * screen_height)) - elif coordinate_type == "absolute": - return x, y - elif coordinate_type == "qwen25": - if 0 <= x <= 1 and 0 <= y <= 1: - # If already normalized, treat like "relative" - return int(round(x * screen_width)), int(round(y * screen_height)) - height, width = smart_resize( - height=screen_height, - width=screen_width, - factor=28, - min_pixels=3136, - max_pixels=12845056 - ) - return int(x / width * screen_width), int(y / height * screen_height) - elif coordinate_type == "relative1000": - if screen_width == 0 or screen_height == 0: - raise ValueError("Screen width and height must be greater than zero for relative1000 coordinates.") - x_abs = int(round(x * screen_width / 1000)) - y_abs = int(round(y * screen_height / 1000)) - return x_abs, y_abs - else: - raise ValueError(f"Unsupported coordinate type: {coordinate_type}") + height, width = smart_resize( + height=screen_height, + width=screen_width, + factor=28, + min_pixels=3136, + max_pixels=12845056 # We use this max_pixels setting in our training data + ) + return int(x / width * screen_width), int(y / height * screen_height) + else: + raise ValueError(f"Unsupported coordinate type: {coordinate_type}") def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="relative"): - """ - Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size. - """ - import re - import ast - + """Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.""" if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]: raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].") @@ -426,8 +430,7 @@ def update_code_with_new_coordinates(code, updated_positions): Returns: str: The updated Python code. """ - # TODO: the matching logics in 'update_code_with_new_coordinates' - # and 'extract_positions_and_instructions' are not exactly the same + lines = code.splitlines() updated_code_lines = [] position_index = 0 # Tracks which position update to use @@ -463,36 +466,51 @@ def update_code_with_new_coordinates(code, updated_positions): return "\n".join(updated_code_lines) def transform_agnet_action_to_code_block(action): + """Transform the agent action to a code block: not used in agent, for logging only""" if "computer.terminate" in action or "browser.select_option" in action or "browser.clear" in action: return f"```code\n{action}\n```" else: return f"```python\n{action}\n```" class OpenCUAAgent: + """ + OpenCUA Agent for desktop automation tasks. + + This class implements a OpenCUA Model based agent that can observe + desktop environments through screenshots and execute mouse/keyboard actions + via PyAutoGUI to complete automation tasks. 
+ + Attributes: + model (str): Name of the language model being used + history_type (str): Type of history recording mechanism + actions (list): History of executed actions + observations (list): History of environment observations + cots (list): Chain of thought reasoning records + """ def __init__( self, - model, - history_type: str, - max_image_history_length: int, - - platform="ubuntu", - - max_tokens=1500, - top_p=0.9, - temperature=0, - action_space="pyautogui", - observation_type="screenshot", - cot_level: str = "l2", - - screen_size=(1920, 1080), - coordinate_type: str = "relative", # relative or qwen25 - - detail_history_length: int = 0, + model: str, # OpenCUA model name + history_type: str, # History step type: action_history, thought_history, observation_history + max_image_history_length: int = 3, # The max number of images in the history + platform: str = "ubuntu", # The platform of the computer + max_tokens: int = 1500, # The max number of tokens in the response + top_p: float = 0.9, # The top p value in the response + temperature: float = 0, # The temperature value in the response + action_space: str = "pyautogui", # The action space: pyautogui + observation_type: str = "screenshot", # The observation type: screenshot + cot_level: str = "l2", # The CoT level: l1, l2, l3 + screen_size: Tuple[int, int] = (1920, 1080), # The screen size + coordinate_type: str = "relative", # The coordinate type: relative, absolute, qwen25 **kwargs ): - self.platform = platform + assert coordinate_type in ["relative", "absolute", "qwen25"] + assert action_space in ["pyautogui"], "Invalid action space" + assert observation_type in ["screenshot"], "Invalid observation type" + assert history_type in ["action_history", "thought_history", "observation_history"] + assert model is not None, "Model cannot be None" + self.model = model - assert self.model is not None, "Executor model cannot be None" + self.platform = platform self.max_tokens = max_tokens self.top_p = top_p self.temperature = temperature @@ -500,19 +518,9 @@ class OpenCUAAgent: self.observation_type = observation_type self.history_type = history_type self.coordinate_type = coordinate_type - assert coordinate_type in ["relative", "relative1000", "absolute", "qwen25"] - assert action_space in ["pyautogui"], "Invalid action space" - assert observation_type in ["screenshot"], "Invalid observation type" - assert history_type in ["action_history", "thought_history", "observation_history"] - - self.actions = [] - self.observations = [] - self.cots = [] - self.cot_level = cot_level self.screen_size = screen_size self.max_image_history_length = max_image_history_length - self.detail_history_length = detail_history_length if history_type == "action_history": self.HISTORY_TEMPLATE = ACTION_HISTORY_TEMPLATE @@ -522,15 +530,27 @@ class OpenCUAAgent: self.HISTORY_TEMPLATE = OBSERVATION_HISTORY_TEMPLATE else: raise ValueError(f"Invalid history type: {history_type}") + + if cot_level == "l3": + self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L3 + elif cot_level == "l2": + self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L2 + elif cot_level == "l1": + self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L1 + else: + raise ValueError(f"Invalid COT level: {cot_level}") + + self.actions = [] + self.observations = [] + self.cots = [] def reset(self, _logger=None): global logger logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent") self.observations = [] - self.thoughts = [] + self.cots = [] self.actions = [] - self.image_summaries = [] def 
_scale_scroll_for_windows(self, code: str, factor: int = 50) -> str: """ pyautogui.scroll has a different scale on Ubuntu and Windows, multiple 'factor' when scrolling on Windows system""" @@ -541,7 +561,7 @@ class OpenCUAAgent: code = pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2))*factor})", code) return code - def predict(self, instruction: str, obs: Dict, **kwargs) -> List: + def predict(self, instruction: str, obs: Dict, **kwargs) -> Tuple[str, List[str], Dict]: """ Predict the next action(s) based on the current observation. """ @@ -557,31 +577,10 @@ class OpenCUAAgent: print("Logical screen size", self.screen_size) messages = [] - - if self.cot_level == "l3": - messages.append({ + messages.append({ "role": "system", - "content": AGNET_SYS_PROMPT_L3 + "content": self.SYSTEM_PROMPT }) - elif self.cot_level == "l2": - messages.append({ - "role": "system", - "content": AGNET_SYS_PROMPT_L2 - }) - elif self.cot_level == "l1": - messages.append({ - "role": "system", - "content": AGNET_SYS_PROMPT_L1 - }) - elif self.cot_level == "l0": - messages.append({ - "role": "system", - "content": AGNET_SYS_PROMPT_L0 - }) - else: - raise ValueError(f"Invalid COT level: {self.cot_level}") - - instruction_prompt = INSTRUTION_TEMPLATE.format(instruction=instruction) history_step_texts = [] for i in range(len(self.actions)): @@ -596,19 +595,11 @@ class OpenCUAAgent: ] }) - if self.detail_history_length > 0 and i >= len(self.actions) - self.detail_history_length: - history_content = STEP_TEMPLATE.format(step_num=i+1) + DETAIL_HISTORY_TEMPLATE.format( - observation=self.cots[i].get('observation'), - thought=self.cots[i].get('thought'), - action=self.cots[i]['action'], - code=self.cots[i]['original_code'] - ) - else: - history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format( - observation=self.cots[i].get('observation'), - thought=self.cots[i].get('thought'), - action=self.cots[i]['action'] - ) + history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format( + observation=self.cots[i].get('observation'), + thought=self.cots[i].get('thought'), + action=self.cots[i]['action'] + ) messages.append({ "role": "assistant", @@ -636,26 +627,11 @@ class OpenCUAAgent: }, { "type": "text", - "text": instruction_prompt + "text": INSTRUTION_TEMPLATE.format(instruction=instruction) } ] }) - # Print message structure if needed - # logger.info("\nMessages structure:") - # messages_to_print = [] - # current_image = 1 - # for msg in messages: - # msg_copy = copy.deepcopy(msg) - # if isinstance(msg_copy['content'], list): - # for content in msg_copy['content']: - # if content['type'] == 'image_url': - # content['image_url']['url'] = f'Image {current_image}' - # current_image += 1 - # messages_to_print.append(msg_copy) - - # logger.info(json.dumps(messages_to_print, indent=2)) - response = self.call_llm({ "model": self.model, "messages": messages, @@ -667,7 +643,7 @@ class OpenCUAAgent: logger.info(f"Model Output: \n\n{response}") if not response: logger.error("No response found in the response.") - return response, [], {} + return "ERROR", [], {} low_level_instruction, pyautogui_actions, other_cot = parse_response_to_cot_and_action(response, self.screen_size, self.coordinate_type) if not pyautogui_actions: @@ -683,13 +659,34 @@ class OpenCUAAgent: logger.info(f"Parsed pyautogui Action: \n{pyautogui_actions}") self.actions.append(low_level_instruction) + if 'action' not in other_cot or not other_cot['action'] or 'thought' not in other_cot or not other_cot['thought']: + 
logger.error("Error! no action/thought in cot") + logger.error(f"response: {response}") + logger.error(f"cot: {other_cot}") self.cots.append(other_cot) - + + # Print message structure if needed + logger.info(f"\nInstruction: {instruction}") + messages_to_print = [] + current_image = 1 + for msg in messages: + msg_copy = copy.deepcopy(msg) + if isinstance(msg_copy['content'], list): + for content in msg_copy['content']: + if content['type'] == 'image_url': + content['image_url']['url'] = f'Image {current_image}' + current_image += 1 + messages_to_print.append(msg_copy) + + messages_to_print.append({ + "new_step_cot": other_cot, + "response": response + }) + logger.info(json.dumps(messages_to_print, indent=2)) + return response, pyautogui_actions, {} - # return response, [parsed_action] - @backoff.on_exception( backoff.constant, # here you should add more model exceptions as you want, @@ -703,6 +700,7 @@ class OpenCUAAgent: max_tries=10 ) def call_llm(self, payload, model): + """Call the LLM API""" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}" diff --git a/run_multienv_opencua.py b/run_multienv_opencua.py index 8c61c09..1765976 100644 --- a/run_multienv_opencua.py +++ b/run_multienv_opencua.py @@ -20,8 +20,6 @@ active_environments = [] processes = [] is_terminating = False -# import wandb - # load the environment variables from .env file if os.path.exists(".env"): from dotenv import load_dotenv @@ -47,17 +45,8 @@ def config() -> argparse.Namespace: default="screenshot", help="Observation type", ) - parser.add_argument("--screen_width", type=int, default=1920) - parser.add_argument("--screen_height", type=int, default=1080) parser.add_argument("--sleep_after_execution", type=float, default=0.0) parser.add_argument("--max_steps", type=int, default=15) - - # agent config - parser.add_argument("--cot_level", type=str, default="l2", help="CoT version: l0, l1, l2, l3") - parser.add_argument("--history_type", type=str, default="action_history", help="History: action history") - parser.add_argument("--coordinate_type", type=str, default="relative", help="type of coordinate", choices=["relative", "qwen25"]) - parser.add_argument("--max_image_history_length", type=int, default=3) - parser.add_argument("--detail_history_length", type=int, default=0, help="length of detail history") # evaluation config parser.add_argument( @@ -71,6 +60,12 @@ def config() -> argparse.Namespace: parser.add_argument("--max_tokens", type=int, default=1500) parser.add_argument("--stop_token", type=str, default=None) + # OpenCUAagent config + parser.add_argument("--cot_level", type=str, default="l2", help="CoT version: l1, l2, l3. 
Default is l2 includes 'thought' and 'action'") + parser.add_argument("--history_type", type=str, default="action_history", help="Use action to represent history steps", choices=["action_history", "thought_history", "observation_history"]) + parser.add_argument("--coordinate_type", type=str, default="relative", help="Type of coordinate: Qwen2-VL or Kimi-VL based models use 'relative'; Qwen2.5-VL based models use 'qwen25'", choices=["relative", "qwen25"]) + parser.add_argument("--max_image_history_length", type=int, default=3, help="The max number of images in the history.") + # example config parser.add_argument("--domain", type=str, default="all") parser.add_argument( @@ -86,6 +81,18 @@ def config() -> argparse.Namespace: parser.add_argument( "--region", type=str, default="us-east-1", help="AWS region for the VM" ) + parser.add_argument( + "--provider_name", type=str, default="aws", choices=["aws", "virtualbox", "vmware", "docker", "azure"], help="Provider name" + ) + parser.add_argument( + "--client_password", type=str, default="", help="Client password" + ) + parser.add_argument( + "--screen_width", type=int, default=1920, help="Screen width" + ) + parser.add_argument( + "--screen_height", type=int, default=1080, help="Screen height" + ) args = parser.parse_args() return args @@ -187,36 +194,24 @@ def run_env_tasks(env_idx: int, env_tasks: dict, args: argparse.Namespace, share signal.signal(signal.SIGTERM, lambda signum, frame: process_signal_handler(signum, frame, env_idx)) from desktop_env.providers.aws.manager import IMAGE_ID_MAP - REGION = "us-east-1" + REGION = args.region + screen_size = (args.screen_width, args.screen_height) + ami_id = IMAGE_ID_MAP[REGION].get(screen_size, IMAGE_ID_MAP[REGION][(1920, 1080)]) env = DesktopEnv( path_to_vm=args.path_to_vm, action_space=args.action_space, - - provider_name="aws", + provider_name=args.provider_name, region=REGION, - snapshot_name=IMAGE_ID_MAP[REGION], - - screen_size=(args.screen_width, args.screen_height), + snapshot_name=ami_id, + screen_size=screen_size, headless=args.headless, os_type="Ubuntu", require_a11y_tree=args.observation_type in ["a11y_tree", "screenshot_a11y_tree", "som"], + enable_proxy=True, + client_password=args.client_password ) active_environments.append(env) - agent = OpenCUAAgent( - env=env, - model=args.model, - max_tokens=args.max_tokens, - top_p=args.top_p, - temperature=args.temperature, - action_space=args.action_space, - observation_type=args.observation_type, - cot_level=args.cot_level, - history_type=args.history_type, - screen_size=(args.screen_width, args.screen_height), - coordinate_type=args.coordinate_type, - max_image_history_length=args.max_image_history_length, - detail_history_length=args.detail_history_length, - ) + logger.info(f"Executing tasks in environment {env_idx + 1}/{args.num_envs}") try: @@ -242,6 +237,21 @@ def run_env_tasks(env_idx: int, env_tasks: dict, args: argparse.Namespace, share ) os.makedirs(example_result_dir, exist_ok=True) + agent = OpenCUAAgent( + env=env, + model=args.model, + max_tokens=args.max_tokens, + top_p=args.top_p, + temperature=args.temperature, + action_space=args.action_space, + observation_type=args.observation_type, + cot_level=args.cot_level, + history_type=args.history_type, + screen_size=(args.screen_width, args.screen_height), + coordinate_type=args.coordinate_type, + max_image_history_length=args.max_image_history_length, + ) + try: lib_run_single.run_single_example_opencua( agent, diff --git a/show_result_opencua.py b/show_result_opencua.py new file 
mode 100644 index 0000000..36ab4ad --- /dev/null +++ b/show_result_opencua.py @@ -0,0 +1,291 @@ +from collections import defaultdict +import json +import os +import pandas as pd +import shutil +from loguru import logger +import prettytable + + +def synthesis(df: pd.DataFrame, domains: list[str], basic: bool = False): + valid_df = df[df["Domain"].isin(domains)] + success_rate = sum(valid_df['%Success Rate'] * valid_df['#Test']) / sum(valid_df['#Test']) if not valid_df.empty else None + if basic: + return { + "#Test": sum(valid_df["#Test"]), + "%Success Rate": success_rate, + } + avg_success_length = sum(valid_df["#Success Steps"]) / sum(valid_df["#Success"]) if sum(valid_df["#Success"]) > 0 else None + avg_failure_length = (sum(valid_df["#Total Steps"]) - sum(valid_df["#Success Steps"])) / (sum(valid_df["#Test"]) - sum(valid_df["#Success"])) if (sum(valid_df["#Test"]) - sum(valid_df["#Success"])) > 0 else None + return { + "#Test": sum(valid_df["#Test"]), + "#Success": sum(valid_df["#Success"]), + "%Success Rate": success_rate, + "#Success Steps": sum(valid_df["#Success Steps"]), + "#Total Steps": sum(valid_df["#Total Steps"]), + "Avg. Success Length": avg_success_length, + "Avg. Failure Length": avg_failure_length, + } + + +def prettytable_df(df: pd.DataFrame): + table = prettytable.PrettyTable() + table.field_names = df.columns + for _, row in df.iterrows(): + table.add_row(row) + table.set_style(prettytable.TableStyle.SINGLE_BORDER) + table.float_format = ".2" + return table + +def check_turn_folder_exsitence(folder_path: str): + for sub_folder in os.listdir(folder_path): + if sub_folder.startswith("turn_"): + return True + return False + +def get_result_from_folder(target_dir, target_domain: str, print_details: bool, show_single_result:int,turn_id:int, version_id:int, task_file: str): + + if not os.path.exists(target_dir): + print("?New experiment, no result yet.") + return None + + if "windows" in target_dir.lower(): + with open("evaluation_examples_windows/test_all_windows.json", "r") as f: + all_reference = json.load(f) + else: + with open("evaluation_examples/test_all.json", "r") as f: + all_reference = json.load(f) + if "rlrollout" in target_dir.lower(): + with open("evaluation_examples/rl_tasks0612.json", "r") as f: + all_reference = json.load(f) + if task_file is not None: + with open(task_file, "r") as f: + all_reference = json.load(f) + try: + with open("evaluation_examples/bad_tests.json", "r") as f: + bad_tests = json.load(f) + except FileNotFoundError: + print("No 'bad_tests.json' found. 
Continue without bad tests.") + bad_tests = {} + + all_result = [] + domain_result = defaultdict(dict) + domain_length = defaultdict(dict) + domain_length_success = defaultdict(dict) + domain_length_failure = defaultdict(dict) + manifest = {"domains": []} + + if check_turn_folder_exsitence(target_dir): + sub_folder=f"turn_{turn_id}" + if version_id > 0: + sub_folder+=f"_version_{version_id}" + target_turn_dir = os.path.join(target_dir, sub_folder) + if not os.path.exists(target_turn_dir): + print(f"Target directory {target_turn_dir} does not exist.") + return None + else: + target_turn_dir = target_dir + + print(f"Check directory: {target_turn_dir}") + + for domain in os.listdir(target_turn_dir): + if target_domain != "all" and domain != target_domain: + continue + domain_path = os.path.join(target_turn_dir, domain) + if not os.path.isdir(domain_path): + continue + + manifest_domain = {"name": domain, "trajectories": []} + for example_id in all_reference[domain]: + if example_id in bad_tests.get(domain, []): + continue + example_path = os.path.join(domain_path, example_id) + if not os.path.exists(example_path): + continue + if os.listdir(example_path): # If the folder is not empty + manifest_domain["trajectories"].append(example_id) + if "result.txt" not in os.listdir(example_path): + if print_details: + print(f"{example_id}: ERROR, no result.txt") + continue + if "traj.jsonl" not in os.listdir(example_path): + if print_details: + print(f"{example_id}: ERROR, no traj.jsonl") + continue + result = open(os.path.join(example_path, "result.txt"), "r").read() + try: + result = float(result) + except: + if result.strip() in {"True", "true"}: + result = 1.0 + elif result.strip() in {"False", "false"}: + result = 0.0 + else: + logger.error(f"domain: {domain}, example_id: {example_id}, result: {result}") + logger.exception(f"Unknown result: {result}") + # raise ValueError("Unknown result:", result) + continue + if print_details: + print(f"{example_id}: {result}") + # if domain == "chrome" and result > 0.5: + # print(f"{turn_num}: {example_id}") + if example_id not in domain_result[domain]: + domain_result[domain][example_id] = result + else: + domain_result[domain][example_id] = max(domain_result[domain][example_id], result) + + with open(os.path.join(example_path, "traj.jsonl"), "r") as f: + traj = [json.loads(line) for line in f] + step_num_line = -1 + + while "step_num" not in traj[step_num_line]: + step_num_line-=1 + + if example_id not in domain_length[domain] or result > 0.5: + domain_length[domain][example_id] = traj[step_num_line]["step_num"] + + if result > 0.5: # The success threshold is temporarily 0.5 + domain_length_success[domain][example_id] = traj[step_num_line]["step_num"] + else: + domain_length_failure[domain][example_id] = traj[step_num_line]["step_num"] + + + all_result.append(domain_result[domain][example_id]) + + if len(manifest_domain["trajectories"]) > 0: + manifest["domains"].append(manifest_domain) + + with open(os.path.join(target_turn_dir, "manifest.json"), "w") as f: + json.dump(manifest, f, indent=2) + try: + shutil.copy("html/trajectory/single_exp/index.html", os.path.join(target_turn_dir, "index.html")) + shutil.copy("html/trajectory/single_exp/marked.min.js", os.path.join(target_turn_dir, "marked.min.js")) + except FileNotFoundError: + pass + + if len(all_result) == 0: + print("New experiment, no result yet.") + return None + + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + df = pd.DataFrame([ + { + "Domain": domain, + 
"#Test": len(list(domain_result[domain].values())), + "#Success":len(domain_length_success[domain].values()), + "%Success Rate": sum(list(domain_result[domain].values())) / len(list(domain_result[domain].values())) * 100, + "#Success Steps": sum(domain_length_success[domain].values()), + "#Total Steps": sum(list(domain_length[domain].values())), + # "Avg. Length": sum(domain_length[domain].values()) / len(domain_length[domain].values()) if len(domain_length[domain].values()) > 0 else None, + "Avg. Success Length": sum(domain_length_success[domain].values()) / len(domain_length_success[domain].values()) if len(domain_length_success[domain].values()) > 0 else None, + "Avg. Failure Length": sum(domain_length_failure[domain].values()) / len(domain_length_failure[domain].values()) if len(domain_length_failure[domain].values()) > 0 else None, + } for domain in domain_result.keys() + ]) + print(prettytable_df(df)) + + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + if "windows" in target_dir.lower(): + s1_df = pd.DataFrame([ + # {"Domain": "OS", **synthesis(df, ["os"])}, + {"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])}, + {"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])}, + {"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])}, + # {"Domain": "Workflow", **synthesis(df, ["multi_apps"])}, + ### windows_specifed below + {"Domain": "Windows Calc", **synthesis(df, ["windows_calc"])}, + {"Domain": "Clock", **synthesis(df, ["clock"])}, + {"Domain": "File_Explorer", **synthesis(df, ["file_explorer"])}, + {"Domain": "Microsoft_Paint", **synthesis(df, ["microsoft_paint"])}, + {"Domain": "Msedge", **synthesis(df, ["msedge"])}, + {"Domain": "Notepad", **synthesis(df, ["notepad"])}, + {"Domain": "Settings", **synthesis(df, ["settings"])}, + ]) + else: + s1_df = pd.DataFrame([ + {"Domain": "OS", **synthesis(df, ["os"])}, + {"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])}, + {"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])}, + {"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])}, + {"Domain": "Workflow", **synthesis(df, ["multi_apps"])}, + ]) + print(prettytable_df(s1_df)) + + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + print(f"Total: {len(all_result)}\t Steps: {sum(df['#Total Steps'])}") + print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}") + total_df = pd.DataFrame([ + {"Domain": "Total", **synthesis(df, ["os", "libreoffice_calc", "libreoffice_impress", "libreoffice_writer", + "vlc", "thunderbird", "chrome", "gimp", "vs_code", "multi_apps","windows_calc", "clock", "file_explorer", "microsoft_paint", "msedge", "notepad", "settings"])} + ]) + print(prettytable_df(total_df)) + return domain_result, all_result + + +def domain_results_union(drs: list): + union = defaultdict(dict) + + domains = set() + for dr in drs: + domains.update(dr.keys()) + for domain in domains: + tasks = set() + for dr in drs: + tasks.update(dr.get(domain, {}).keys()) + for task in tasks: + scores = [dr.get(domain, {}).get(task, 0) for dr in drs] + union[domain][task] = max(scores) + + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + df = pd.DataFrame([ + { + "Domain": domain, + "#Test Cases": len(list(union[domain].values())), + "%Success Rate": sum(list(union[domain].values())) / 
len(list(union[domain].values())) * 100, + } for domain in union.keys() + ]) + print(prettytable_df(df)) + + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + s1_df = pd.DataFrame([ + {"Domain": "OS", **synthesis(df, ["os"], basic=True)}, + {"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"], basic=True)}, + {"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"], basic=True)}, + {"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"], basic=True)}, + {"Domain": "Workflow", **synthesis(df, ["multi_apps"], basic=True)}, + ]) + print(prettytable_df(s1_df)) + + all_result = [] + for domain in union.keys(): + all_result.extend(list(union[domain].values())) + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + print(f"Total: {len(all_result)}") + print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}") + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser("Show result of the experiment.") + parser.add_argument("override_path", nargs='+', type=str, help="One or more result directories.") + parser.add_argument("--task_file", type=str, default=None, help="The task file to use for the experiment.") + parser.add_argument("--show_single_result", type=int, default=0) + parser.add_argument("--domain", type=str, default="all") + parser.add_argument("--print_details", action="store_true") + parser.add_argument("--t",type=int, default=1, help="The turn id to show the result.") + parser.add_argument("--v", type=int, default=0, help="The version id to show the result. Just use for previous result, no need to use in the new experiment.") + args = parser.parse_args() + + # print(args.override_path) + + + + if len(args.override_path) == 1: + get_result_from_folder(args.override_path[0], args.domain, args.print_details, args.show_single_result, args.t, args.v, args.task_file) + else: + drs = [] + for override_path in args.override_path: + dr, _ = get_result_from_folder(override_path, args.domain, args.print_details, args.show_single_result,args.t, args.v, args.task_file) + if dr is not None: + drs.append(dr) + domain_results_union(drs)
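# --------------------------------------------------------------------------
# Illustration (outside the patch, assumptions noted in comments): a minimal,
# self-contained sketch of the response format that
# parse_response_to_cot_and_action() in mm_agents/opencua_agent.py expects at
# cot_level "l2", using the same regular expressions as the patch. The
# example response text below is invented for illustration only.
import re

example_response = '''## Thought:
I need to open the File menu before saving the document under a new name.

## Action:
Click the "File" menu in the top menu bar.

```python
pyautogui.click(x=0.024, y=0.051)
```'''

# Same section regexes as the patch: "## Thought:" and "## Action:" headers.
thought = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)',
                    example_response, re.DOTALL | re.MULTILINE).group(1).strip()
# The Action section runs to the end of the response, so it still contains
# the fenced code block; the executable code is pulled out separately below.
action = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)',
                   example_response, re.DOTALL | re.MULTILINE).group(1).strip()
# The last ```python block is taken as the executable step; in the agent it
# is then passed through correct_pyautogui_arguments() and
# project_coordinate_to_absolute_scale() before execution.
code = re.findall(r'```(?:python)\s*(.*?)\s*```', example_response, re.DOTALL)[-1]

print(thought)
print(action)
print(code)  # pyautogui.click(x=0.024, y=0.051)
# --------------------------------------------------------------------------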
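# --------------------------------------------------------------------------
# Illustration (outside the patch): a simplified sketch of how the "qwen25"
# coordinate type is projected back to screen pixels. It mirrors
# smart_resize()/_coordinate_projection() from mm_agents/opencua_agent.py;
# the patch additionally treats already-normalized 0..1 coordinates as
# "relative", and the exact Qwen2.5-VL implementation also guards against
# extreme aspect ratios. Names and the example coordinates are assumptions.
import math

def smart_resize_sketch(height, width, factor=28, min_pixels=3136, max_pixels=12845056):
    # Round both sides to multiples of `factor`, then scale so the pixel
    # count stays inside [min_pixels, max_pixels].
    h_bar = round(height / factor) * factor
    w_bar = round(width / factor) * factor
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = math.floor(height / beta / factor) * factor
        w_bar = math.floor(width / beta / factor) * factor
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = math.ceil(height * beta / factor) * factor
        w_bar = math.ceil(width * beta / factor) * factor
    return h_bar, w_bar

def qwen25_to_absolute(x, y, screen_width=1920, screen_height=1080):
    # The model predicts pixel coordinates in the resized (model-input)
    # image; map them back onto the physical screen resolution.
    resized_h, resized_w = smart_resize_sketch(screen_height, screen_width)
    return int(x / resized_w * screen_width), int(y / resized_h * screen_height)

if __name__ == "__main__":
    # A 1920x1080 screen resizes to 1932x1092 for the model, so a predicted
    # click at (540, 380) lands at roughly (536, 375) on screen.
    print(qwen25_to_absolute(540, 380))
# --------------------------------------------------------------------------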