Wxy/opencua (#260)

* OpenCUA Agent code base

* update url

* debug, modify url input

* debug opencua

* show result

* debug agent history overlap

* modify opencua agent; add comment lines
Xinyuan Wang
2025-07-16 17:53:12 +08:00
committed by GitHub
parent 5e5058c1f2
commit 0f2655249c
4 changed files with 497 additions and 197 deletions


@@ -163,7 +163,8 @@ def run_single_example_opencua(agent, env, example, max_steps, instruction, args
response, actions, info_dict = agent.predict(instruction, obs)
logger.info(f"Got Action: {actions}")
# Break if no actions
if not actions or len(actions)==0 or actions[0]=="" or actions[0].lower().startswith("error"):
break
for action in actions:


@@ -1,38 +1,45 @@
"""
OpenCUA Agent Implementation
This module implements an OpenCUA agent for desktop automation tasks, building upon
existing frameworks and integrating multiple coordinate mapping systems.
Framework and Implementation Sources:
- Main framework structure follows: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/agent.py
- Agent implementation adapted from: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/aguvis_agent.py
- Qwen2.5-VL coordinate mapping from: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
"""
import re
import os
import ast
import time
import json
import math
import copy
import httpx
import base64
import backoff
import logging  # used by reset() below as a fallback logger
from io import BytesIO
from loguru import logger
from PIL import Image
from typing import Dict, List, Tuple, Optional
AGNET_SYS_PROMPT_L1 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}""".strip()
AGNET_SYS_PROMPT_L1 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
AGNET_SYS_PROMPT_L2 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
AGNET_SYS_PROMPT_L3 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nObservation:\n - Describe the current computer state based on the full screenshot in detail. \n - Application Context:\n - The active application\n - The active window or page\n - Overall layout and visible interface\n - Key Elements:\n - Menu items and toolbars \n - Buttons and controls\n - Text fields and content\n - Dialog boxes or popups\n - Error messages or notifications\n - Loading states\n - Other key elements\n - Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}\n".strip()
AGNET_SYS_PROMPT_L0 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.
For each step, output the action as PyAutoGUI code or the following functions:
- {"name": "computer.triple_click", "description": "Triple click on the screen", "parameters": {"type": "object", "properties": {"x": {"type": "number", "description": "The x coordinate of the triple click"}, "y": {"type": "number", "description": "The y coordinate of the triple click"}}, "required": ["x", "y"]}}
- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}}, "required": ["status"]}}
""".strip()
STEP_TEMPLATE = "# Step {step_num}:\n"
INSTRUTION_TEMPLATE = "# Task Instruction:\n{instruction}\n\nPlease generate the next move according to the screenshot, task instruction and previous steps (if provided).\n"
STEP_TEMPLATE = "# Step {step_num}:\n"
ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n"
THOUGHT_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n"
OBSERVATION_HISTORY_TEMPLATE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n"
DETAIL_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n"
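# Illustration (not part of this commit): one rendered history step, composed
# from the templates above with step_num=2 and thought-level history:
#   STEP_TEMPLATE.format(step_num=2) + THOUGHT_HISTORY_TEMPLATE.format(
#       thought="The Save dialog is open.",
#       action="Click the 'Save' button in the dialog.")
# yields:
#   # Step 2:
#   ## Thought:
#   The Save dialog is open.
#
#   ## Action:
#   Click the 'Save' button in the dialog.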
# Function to encode the image
def encode_image(image_content):
"""Encode the image to base64"""
return base64.b64encode(image_content).decode('utf-8')
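# Usage sketch (not in this diff), assuming the observation dict carries the
# raw PNG bytes of the screenshot under "screenshot":
#   b64 = encode_image(obs["screenshot"])
#   data_url = f"data:image/png;base64,{b64}"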
def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) -> Tuple[str, List[str], dict]:
@@ -40,57 +47,61 @@ def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type)
try:
sections = {}
if "computer.terminate" in input_string.lower():
code_blocks = re.findall(r'```(?:code)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
if code_blocks:
last_code = code_blocks[-1].strip().lower()
if "fail" in last_code:
return "FAIL", ["FAIL"], {}
elif "success" in last_code:
return "DONE", ["DONE"], {}
return "DONE", ["DONE"], {}
obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if obs_match:
sections['observation'] = obs_match.group(1).strip()
# logger.warning(f"Extracted Observation: {sections.get('observation', 'None')}")
thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if thought_match:
sections['thought'] = thought_match.group(1).strip()
# logger.warning(f"Extracted Thought: {sections.get('thought', 'None')}")
action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if action_match:
action = action_match.group(1).strip()
sections['action'] = action.strip()
# logger.warning(f"Extracted Action: {sections.get('action', 'None')}")
if "computer.terminate" in input_string.lower():
# Look for code blocks that might contain terminate command
code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
if code_blocks:
last_code = code_blocks[-1].strip().lower()
if "fail" in last_code:
sections['code'] = "FAIL"
return "FAIL", ["FAIL"], sections
elif "success" in last_code:
sections['code'] = "DONE"
return "DONE", ["DONE"], sections
# Default to DONE if terminate is mentioned but no specific status
sections['code'] = "DONE"
return "DONE", ["DONE"], sections
code_blocks = re.findall(r'```(?:python)\s*(.*?)\s*```', input_string, re.DOTALL)
if code_blocks:
code = code_blocks[-1].strip()
sections['original_code'] = transform_agnet_action_to_code_block(code)
corrected_code = correct_pyautogui_arguments(code)
sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_size[0], screen_height=screen_size[1], coordinate_type=coordinate_type)
# logger.warning(f"Extracted Code: {sections.get('code', 'None')}")
else:
# No code blocks found
sections['code'] = "WAIT"
return "WAIT", ["WAIT"], sections
if 'code' not in sections:
logger.error("Missing required action or code section")
return None, None, {}
if 'action' not in sections:
sections['action'] = ""
return sections['action'], [sections['code']], sections
except Exception as e:
logger.exception(f"Error parsing response: {str(e)}\nInput string: {input_string}")
return None, None, {}
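# Parsing sketch (illustration only, not part of this commit), assuming a
# 1920x1080 screen and coordinate_type="relative". A model response such as:
#
#   ## Thought:
#   I need to open the File menu first.
#
#   ## Action:
#   Click the 'File' menu in the top-left corner.
#
#   ```python
#   pyautogui.click(x=0.013, y=0.047)
#   ```
#
# parses to (action_text, [projected_code], sections), with the relative
# coordinates projected to absolute pixels (roughly x=25, y=51).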
def correct_pyautogui_arguments(code: str) -> str:
"""Correct the pyautogui arguments"""
function_corrections = {
'write': {
'incorrect_args': ['text', 'content'],
@@ -154,6 +165,7 @@ def correct_pyautogui_arguments(code: str) -> str:
return corrected_code
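# Behavior sketch (illustration only): the correction table above lists 'text'
# and 'content' as incorrect keyword arguments for pyautogui.write, so a call
# like pyautogui.write(text='hello') is rewritten to use the keyword pyautogui
# actually accepts (assumed here, from the truncated table, to be 'message'):
#   correct_pyautogui_arguments("pyautogui.write(text='hello')")
#   # -> "pyautogui.write(message='hello')"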
def split_args(args_str: str) -> List[str]:
"""Split the arguments string into a list of arguments"""
args = []
current_arg = ''
within_string = False
@@ -185,13 +197,15 @@ def smart_resize(
max_aspect_ratio_allowed: Optional[float] = None,
size_can_be_smaller_than_factor: bool = False,
):
"""Rescales the image so that the following conditions are met:
"""
The function is modified from https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
1. Both dimensions (height and width) are divisible by 'factor'.
Qwen2.5-VL based model need this function to resize screenshots.
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
3. The aspect ratio of the image is maintained as closely as possible.
Rescales the image so that the following conditions are met:
1. Both dimensions (height and width) are divisible by 'factor'.
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
3. The aspect ratio of the image is maintained as closely as possible.
"""
if not size_can_be_smaller_than_factor and (height < factor or width < factor):
@@ -218,39 +232,29 @@ def smart_resize(
return h_bar, w_bar
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
"""Project the coordinates to the absolute scale"""
if coordinate_type == "relative":
    return int(round(x * screen_width)), int(round(y * screen_height))
elif coordinate_type == "absolute":
    return x, y
elif coordinate_type == "qwen25":
    if 0 <= x <= 1 and 0 <= y <= 1:
        # If already normalized, treat like "relative"
        return int(round(x * screen_width)), int(round(y * screen_height))
    height, width = smart_resize(
        height=screen_height,
        width=screen_width,
        factor=28,
        min_pixels=3136,
        max_pixels=12845056  # We use this max_pixels setting in our training data
    )
    return int(x / width * screen_width), int(y / height * screen_height)
else:
    raise ValueError(f"Unsupported coordinate type: {coordinate_type}")
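# Worked example (illustration only, not part of this commit): for a 1920x1080
# screen, smart_resize rounds each side to the nearest multiple of 28:
#   width  -> round(1920/28)*28 = 69*28 = 1932
#   height -> round(1080/28)*28 = 39*28 = 1092
# 1932*1092 = 2,109,744 pixels already lies within [3136, 12845056], so no
# further rescaling is applied. A qwen25 model coordinate (966, 546) then
# projects back to (966/1932*1920, 546/1092*1080) = (960, 540), the screen center.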
def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="relative"):
"""
Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.
"""
import re
import ast
"""Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size."""
if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]:
raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].")
@@ -426,8 +430,7 @@ def update_code_with_new_coordinates(code, updated_positions):
Returns:
str: The updated Python code.
"""
# TODO: the matching logic in 'update_code_with_new_coordinates'
# and 'extract_positions_and_instructions' is not exactly the same
lines = code.splitlines()
updated_code_lines = []
position_index = 0 # Tracks which position update to use
@@ -463,36 +466,51 @@ def update_code_with_new_coordinates(code, updated_positions):
return "\n".join(updated_code_lines)
def transform_agnet_action_to_code_block(action):
"""Transform the agent action to a code block: not used in agent, for logging only"""
if "computer.terminate" in action or "browser.select_option" in action or "browser.clear" in action:
return f"```code\n{action}\n```"
else:
return f"```python\n{action}\n```"
class OpenCUAAgent:
"""
OpenCUA Agent for desktop automation tasks.
This class implements an OpenCUA model based agent that can observe
desktop environments through screenshots and execute mouse/keyboard actions
via PyAutoGUI to complete automation tasks.
Attributes:
model (str): Name of the language model being used
history_type (str): Type of history recording mechanism
actions (list): History of executed actions
observations (list): History of environment observations
cots (list): Chain of thought reasoning records
"""
def __init__(
self,
model: str, # OpenCUA model name
history_type: str, # History step type: action_history, thought_history, observation_history
max_image_history_length: int = 3, # The max number of images in the history
platform: str = "ubuntu", # The platform of the computer
max_tokens: int = 1500, # The max number of tokens in the response
top_p: float = 0.9, # The top p value in the response
temperature: float = 0, # The temperature value in the response
action_space: str = "pyautogui", # The action space: pyautogui
observation_type: str = "screenshot", # The observation type: screenshot
cot_level: str = "l2", # The CoT level: l1, l2, l3
screen_size: Tuple[int, int] = (1920, 1080), # The screen size
coordinate_type: str = "relative", # The coordinate type: relative, absolute, qwen25
**kwargs
):
assert coordinate_type in ["relative", "absolute", "qwen25"]
assert action_space in ["pyautogui"], "Invalid action space"
assert observation_type in ["screenshot"], "Invalid observation type"
assert history_type in ["action_history", "thought_history", "observation_history"]
assert model is not None, "Model cannot be None"
self.model = model
self.platform = platform
self.max_tokens = max_tokens
self.top_p = top_p
self.temperature = temperature
@@ -500,19 +518,9 @@ class OpenCUAAgent:
self.observation_type = observation_type
self.history_type = history_type
self.coordinate_type = coordinate_type
self.cot_level = cot_level
self.screen_size = screen_size
self.max_image_history_length = max_image_history_length
if history_type == "action_history":
self.HISTORY_TEMPLATE = ACTION_HISTORY_TEMPLATE
@@ -522,15 +530,27 @@ class OpenCUAAgent:
self.HISTORY_TEMPLATE = OBSERVATION_HISTORY_TEMPLATE
else:
raise ValueError(f"Invalid history type: {history_type}")
if cot_level == "l3":
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L3
elif cot_level == "l2":
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L2
elif cot_level == "l1":
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L1
else:
raise ValueError(f"Invalid COT level: {cot_level}")
self.actions = []
self.observations = []
self.cots = []
def reset(self, _logger=None):
global logger
logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")
self.observations = []
self.thoughts = []
self.cots = []
self.actions = []
self.image_summaries = []
def _scale_scroll_for_windows(self, code: str, factor: int = 50) -> str:
""" pyautogui.scroll has a different scale on Ubuntu and Windows, multiple 'factor' when scrolling on Windows system"""
@@ -541,7 +561,7 @@ class OpenCUAAgent:
code = pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2))*factor})", code)
return code
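# Example (illustration only): with the default factor of 50, a generated
# pyautogui.scroll(3) becomes pyautogui.scroll(150) when running on Windows.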
def predict(self, instruction: str, obs: Dict, **kwargs) -> Tuple[str, List[str], Dict]:
"""
Predict the next action(s) based on the current observation.
"""
@@ -557,31 +577,10 @@ class OpenCUAAgent:
print("Logical screen size", self.screen_size)
messages = []
messages.append({
"role": "system",
"content": AGNET_SYS_PROMPT_L3
"content": self.SYSTEM_PROMPT
})
instruction_prompt = INSTRUTION_TEMPLATE.format(instruction=instruction)
history_step_texts = []
for i in range(len(self.actions)):
@@ -596,19 +595,11 @@ class OpenCUAAgent:
]
})
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
observation=self.cots[i].get('observation'),
thought=self.cots[i].get('thought'),
action=self.cots[i]['action']
)
messages.append({
"role": "assistant",
@@ -636,26 +627,11 @@ class OpenCUAAgent:
},
{
"type": "text",
"text": instruction_prompt
"text": INSTRUTION_TEMPLATE.format(instruction=instruction)
}
]
})
response = self.call_llm({
"model": self.model,
"messages": messages,
@@ -667,7 +643,7 @@ class OpenCUAAgent:
logger.info(f"Model Output: \n\n{response}")
if not response:
logger.error("No response found in the response.")
return "ERROR", [], {}
low_level_instruction, pyautogui_actions, other_cot = parse_response_to_cot_and_action(response, self.screen_size, self.coordinate_type)
if not pyautogui_actions:
@@ -683,13 +659,34 @@ class OpenCUAAgent:
logger.info(f"Parsed pyautogui Action: \n{pyautogui_actions}")
self.actions.append(low_level_instruction)
if 'action' not in other_cot or not other_cot['action'] or 'thought' not in other_cot or not other_cot['thought']:
logger.error("Error! no action/thought in cot")
logger.error(f"response: {response}")
logger.error(f"cot: {other_cot}")
self.cots.append(other_cot)
# Print message structure if needed
logger.info(f"\nInstruction: {instruction}")
messages_to_print = []
current_image = 1
for msg in messages:
msg_copy = copy.deepcopy(msg)
if isinstance(msg_copy['content'], list):
for content in msg_copy['content']:
if content['type'] == 'image_url':
content['image_url']['url'] = f'Image {current_image}'
current_image += 1
messages_to_print.append(msg_copy)
messages_to_print.append({
"new_step_cot": other_cot,
"response": response
})
logger.info(json.dumps(messages_to_print, indent=2))
return response, pyautogui_actions, {}
# return response, [parsed_action]
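# Call sketch (not part of this commit): the runner passes the raw screenshot
# bytes, and actions come back as executable pyautogui strings or a sentinel,
# assuming the observation dict carries the screenshot under "screenshot".
#   response, actions, _ = agent.predict("Open a terminal", {"screenshot": png_bytes})
#   # actions examples: ["pyautogui.click(x=960, y=540)"], ["DONE"], ["FAIL"], ["WAIT"]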
@backoff.on_exception(
backoff.constant,
# add more model exceptions here as needed,
@@ -703,6 +700,7 @@ class OpenCUAAgent:
max_tries=10
)
def call_llm(self, payload, model):
"""Call the LLM API"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}"


@@ -20,8 +20,6 @@ active_environments = []
processes = []
is_terminating = False
# load the environment variables from .env file
if os.path.exists(".env"):
from dotenv import load_dotenv
@@ -47,17 +45,8 @@ def config() -> argparse.Namespace:
default="screenshot",
help="Observation type",
)
parser.add_argument("--screen_width", type=int, default=1920)
parser.add_argument("--screen_height", type=int, default=1080)
parser.add_argument("--sleep_after_execution", type=float, default=0.0)
parser.add_argument("--max_steps", type=int, default=15)
# evaluation config
parser.add_argument(
@@ -71,6 +60,12 @@ def config() -> argparse.Namespace:
parser.add_argument("--max_tokens", type=int, default=1500)
parser.add_argument("--stop_token", type=str, default=None)
# OpenCUA agent config
parser.add_argument("--cot_level", type=str, default="l2", help="CoT version: l1, l2, l3. The default, l2, includes 'thought' and 'action'")
parser.add_argument("--history_type", type=str, default="action_history", help="Use action to represent history steps", choices=["action_history", "thought_history", "observation_history"])
parser.add_argument("--coordinate_type", type=str, default="relative", help="Type of coordinate: Qwen2-VL or Kimi-VL based models use 'relative'; Qwen2.5-VL based models use 'qwen25'", choices=["relative", "qwen25"])
parser.add_argument("--max_image_history_length", type=int, default=3, help="The max number of images in the history.")
# example config
parser.add_argument("--domain", type=str, default="all")
parser.add_argument(
@@ -86,6 +81,18 @@ def config() -> argparse.Namespace:
parser.add_argument(
"--region", type=str, default="us-east-1", help="AWS region for the VM"
)
parser.add_argument(
"--provider_name", type=str, default="aws", choices=["aws", "virtualbox", "vmware", "docker", "azure"], help="Provider name"
)
parser.add_argument(
"--client_password", type=str, default="", help="Client password"
)
parser.add_argument(
"--screen_width", type=int, default=1920, help="Screen width"
)
parser.add_argument(
"--screen_height", type=int, default=1080, help="Screen height"
)
args = parser.parse_args()
return args
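# Example invocation (sketch; the script name and model name are assumptions,
# not shown in this diff):
#   python run_multienv_opencua.py --model <opencua-model> --coordinate_type qwen25 \
#       --history_type action_history --max_image_history_length 3 \
#       --provider_name aws --screen_width 1920 --screen_height 1080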
@@ -187,36 +194,24 @@ def run_env_tasks(env_idx: int, env_tasks: dict, args: argparse.Namespace, share
signal.signal(signal.SIGTERM, lambda signum, frame: process_signal_handler(signum, frame, env_idx))
from desktop_env.providers.aws.manager import IMAGE_ID_MAP
REGION = "us-east-1"
REGION = args.region
screen_size = (args.screen_width, args.screen_height)
ami_id = IMAGE_ID_MAP[REGION].get(screen_size, IMAGE_ID_MAP[REGION][(1920, 1080)])
env = DesktopEnv(
path_to_vm=args.path_to_vm,
action_space=args.action_space,
provider_name="aws",
provider_name=args.provider_name,
region=REGION,
snapshot_name=ami_id,
screen_size=screen_size,
headless=args.headless,
os_type="Ubuntu",
require_a11y_tree=args.observation_type in ["a11y_tree", "screenshot_a11y_tree", "som"],
enable_proxy=True,
client_password=args.client_password
)
active_environments.append(env)
logger.info(f"Executing tasks in environment {env_idx + 1}/{args.num_envs}")
try:
@@ -242,6 +237,21 @@ def run_env_tasks(env_idx: int, env_tasks: dict, args: argparse.Namespace, share
)
os.makedirs(example_result_dir, exist_ok=True)
agent = OpenCUAAgent(
env=env,
model=args.model,
max_tokens=args.max_tokens,
top_p=args.top_p,
temperature=args.temperature,
action_space=args.action_space,
observation_type=args.observation_type,
cot_level=args.cot_level,
history_type=args.history_type,
screen_size=(args.screen_width, args.screen_height),
coordinate_type=args.coordinate_type,
max_image_history_length=args.max_image_history_length,
)
try:
lib_run_single.run_single_example_opencua(
agent,

show_result_opencua.py (new file, 291 lines)

@@ -0,0 +1,291 @@
from collections import defaultdict
import json
import os
import pandas as pd
import shutil
from loguru import logger
import prettytable
def synthesis(df: pd.DataFrame, domains: list[str], basic: bool = False):
valid_df = df[df["Domain"].isin(domains)]
success_rate = sum(valid_df['%Success Rate'] * valid_df['#Test']) / sum(valid_df['#Test']) if not valid_df.empty else None
if basic:
return {
"#Test": sum(valid_df["#Test"]),
"%Success Rate": success_rate,
}
avg_success_length = sum(valid_df["#Success Steps"]) / sum(valid_df["#Success"]) if sum(valid_df["#Success"]) > 0 else None
avg_failure_length = (sum(valid_df["#Total Steps"]) - sum(valid_df["#Success Steps"])) / (sum(valid_df["#Test"]) - sum(valid_df["#Success"])) if (sum(valid_df["#Test"]) - sum(valid_df["#Success"])) > 0 else None
return {
"#Test": sum(valid_df["#Test"]),
"#Success": sum(valid_df["#Success"]),
"%Success Rate": success_rate,
"#Success Steps": sum(valid_df["#Success Steps"]),
"#Total Steps": sum(valid_df["#Total Steps"]),
"Avg. Success Length": avg_success_length,
"Avg. Failure Length": avg_failure_length,
}
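# Worked example (illustration only): domains with (#Test, %Success Rate) of
# (10, 40.0) and (5, 80.0) combine to (40*10 + 80*5) / 15 = 53.33, i.e. a
# test-count-weighted average, matching the success_rate formula above.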
def prettytable_df(df: pd.DataFrame):
table = prettytable.PrettyTable()
table.field_names = df.columns
for _, row in df.iterrows():
table.add_row(row)
table.set_style(prettytable.TableStyle.SINGLE_BORDER)
table.float_format = ".2"
return table
def check_turn_folder_exsitence(folder_path: str):
for sub_folder in os.listdir(folder_path):
if sub_folder.startswith("turn_"):
return True
return False
def get_result_from_folder(target_dir, target_domain: str, print_details: bool, show_single_result: int, turn_id: int, version_id: int, task_file: str):
if not os.path.exists(target_dir):
print("?New experiment, no result yet.")
return None
if "windows" in target_dir.lower():
with open("evaluation_examples_windows/test_all_windows.json", "r") as f:
all_reference = json.load(f)
else:
with open("evaluation_examples/test_all.json", "r") as f:
all_reference = json.load(f)
if "rlrollout" in target_dir.lower():
with open("evaluation_examples/rl_tasks0612.json", "r") as f:
all_reference = json.load(f)
if task_file is not None:
with open(task_file, "r") as f:
all_reference = json.load(f)
try:
with open("evaluation_examples/bad_tests.json", "r") as f:
bad_tests = json.load(f)
except FileNotFoundError:
print("No 'bad_tests.json' found. Continue without bad tests.")
bad_tests = {}
all_result = []
domain_result = defaultdict(dict)
domain_length = defaultdict(dict)
domain_length_success = defaultdict(dict)
domain_length_failure = defaultdict(dict)
manifest = {"domains": []}
if check_turn_folder_exsitence(target_dir):
sub_folder=f"turn_{turn_id}"
if version_id > 0:
sub_folder+=f"_version_{version_id}"
target_turn_dir = os.path.join(target_dir, sub_folder)
if not os.path.exists(target_turn_dir):
print(f"Target directory {target_turn_dir} does not exist.")
return None
else:
target_turn_dir = target_dir
print(f"Check directory: {target_turn_dir}")
for domain in os.listdir(target_turn_dir):
if target_domain != "all" and domain != target_domain:
continue
domain_path = os.path.join(target_turn_dir, domain)
if not os.path.isdir(domain_path):
continue
manifest_domain = {"name": domain, "trajectories": []}
for example_id in all_reference[domain]:
if example_id in bad_tests.get(domain, []):
continue
example_path = os.path.join(domain_path, example_id)
if not os.path.exists(example_path):
continue
if os.listdir(example_path): # If the folder is not empty
manifest_domain["trajectories"].append(example_id)
if "result.txt" not in os.listdir(example_path):
if print_details:
print(f"{example_id}: ERROR, no result.txt")
continue
if "traj.jsonl" not in os.listdir(example_path):
if print_details:
print(f"{example_id}: ERROR, no traj.jsonl")
continue
result = open(os.path.join(example_path, "result.txt"), "r").read()
try:
result = float(result)
except:
if result.strip() in {"True", "true"}:
result = 1.0
elif result.strip() in {"False", "false"}:
result = 0.0
else:
logger.error(f"domain: {domain}, example_id: {example_id}, result: {result}")
logger.exception(f"Unknown result: {result}")
# raise ValueError("Unknown result:", result)
continue
if print_details:
print(f"{example_id}: {result}")
# if domain == "chrome" and result > 0.5:
# print(f"{turn_num}: {example_id}")
if example_id not in domain_result[domain]:
domain_result[domain][example_id] = result
else:
domain_result[domain][example_id] = max(domain_result[domain][example_id], result)
with open(os.path.join(example_path, "traj.jsonl"), "r") as f:
traj = [json.loads(line) for line in f]
step_num_line = -1
while "step_num" not in traj[step_num_line]:
step_num_line-=1
if example_id not in domain_length[domain] or result > 0.5:
domain_length[domain][example_id] = traj[step_num_line]["step_num"]
if result > 0.5: # The success threshold is temporarily 0.5
domain_length_success[domain][example_id] = traj[step_num_line]["step_num"]
else:
domain_length_failure[domain][example_id] = traj[step_num_line]["step_num"]
all_result.append(domain_result[domain][example_id])
if len(manifest_domain["trajectories"]) > 0:
manifest["domains"].append(manifest_domain)
with open(os.path.join(target_turn_dir, "manifest.json"), "w") as f:
json.dump(manifest, f, indent=2)
try:
shutil.copy("html/trajectory/single_exp/index.html", os.path.join(target_turn_dir, "index.html"))
shutil.copy("html/trajectory/single_exp/marked.min.js", os.path.join(target_turn_dir, "marked.min.js"))
except FileNotFoundError:
pass
if len(all_result) == 0:
print("New experiment, no result yet.")
return None
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
df = pd.DataFrame([
{
"Domain": domain,
"#Test": len(list(domain_result[domain].values())),
"#Success":len(domain_length_success[domain].values()),
"%Success Rate": sum(list(domain_result[domain].values())) / len(list(domain_result[domain].values())) * 100,
"#Success Steps": sum(domain_length_success[domain].values()),
"#Total Steps": sum(list(domain_length[domain].values())),
# "Avg. Length": sum(domain_length[domain].values()) / len(domain_length[domain].values()) if len(domain_length[domain].values()) > 0 else None,
"Avg. Success Length": sum(domain_length_success[domain].values()) / len(domain_length_success[domain].values()) if len(domain_length_success[domain].values()) > 0 else None,
"Avg. Failure Length": sum(domain_length_failure[domain].values()) / len(domain_length_failure[domain].values()) if len(domain_length_failure[domain].values()) > 0 else None,
} for domain in domain_result.keys()
])
print(prettytable_df(df))
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
if "windows" in target_dir.lower():
s1_df = pd.DataFrame([
# {"Domain": "OS", **synthesis(df, ["os"])},
{"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])},
{"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])},
{"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])},
# {"Domain": "Workflow", **synthesis(df, ["multi_apps"])},
### windows_specifed below
{"Domain": "Windows Calc", **synthesis(df, ["windows_calc"])},
{"Domain": "Clock", **synthesis(df, ["clock"])},
{"Domain": "File_Explorer", **synthesis(df, ["file_explorer"])},
{"Domain": "Microsoft_Paint", **synthesis(df, ["microsoft_paint"])},
{"Domain": "Msedge", **synthesis(df, ["msedge"])},
{"Domain": "Notepad", **synthesis(df, ["notepad"])},
{"Domain": "Settings", **synthesis(df, ["settings"])},
])
else:
s1_df = pd.DataFrame([
{"Domain": "OS", **synthesis(df, ["os"])},
{"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])},
{"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])},
{"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])},
{"Domain": "Workflow", **synthesis(df, ["multi_apps"])},
])
print(prettytable_df(s1_df))
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
print(f"Total: {len(all_result)}\t Steps: {sum(df['#Total Steps'])}")
print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}")
total_df = pd.DataFrame([
{"Domain": "Total", **synthesis(df, ["os", "libreoffice_calc", "libreoffice_impress", "libreoffice_writer",
"vlc", "thunderbird", "chrome", "gimp", "vs_code", "multi_apps","windows_calc", "clock", "file_explorer", "microsoft_paint", "msedge", "notepad", "settings"])}
])
print(prettytable_df(total_df))
return domain_result, all_result
def domain_results_union(drs: list):
union = defaultdict(dict)
domains = set()
for dr in drs:
domains.update(dr.keys())
for domain in domains:
tasks = set()
for dr in drs:
tasks.update(dr.get(domain, {}).keys())
for task in tasks:
scores = [dr.get(domain, {}).get(task, 0) for dr in drs]
union[domain][task] = max(scores)
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
df = pd.DataFrame([
{
"Domain": domain,
"#Test Cases": len(list(union[domain].values())),
"%Success Rate": sum(list(union[domain].values())) / len(list(union[domain].values())) * 100,
} for domain in union.keys()
])
print(prettytable_df(df))
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
s1_df = pd.DataFrame([
{"Domain": "OS", **synthesis(df, ["os"], basic=True)},
{"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"], basic=True)},
{"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"], basic=True)},
{"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"], basic=True)},
{"Domain": "Workflow", **synthesis(df, ["multi_apps"], basic=True)},
])
print(prettytable_df(s1_df))
all_result = []
for domain in union.keys():
all_result.extend(list(union[domain].values()))
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
print(f"Total: {len(all_result)}")
print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser("Show result of the experiment.")
parser.add_argument("override_path", nargs='+', type=str, help="One or more result directories.")
parser.add_argument("--task_file", type=str, default=None, help="The task file to use for the experiment.")
parser.add_argument("--show_single_result", type=int, default=0)
parser.add_argument("--domain", type=str, default="all")
parser.add_argument("--print_details", action="store_true")
parser.add_argument("--t",type=int, default=1, help="The turn id to show the result.")
parser.add_argument("--v", type=int, default=0, help="The version id to show the result. Just use for previous result, no need to use in the new experiment.")
args = parser.parse_args()
# print(args.override_path)
if len(args.override_path) == 1:
get_result_from_folder(args.override_path[0], args.domain, args.print_details, args.show_single_result, args.t, args.v, args.task_file)
else:
drs = []
for override_path in args.override_path:
dr, _ = get_result_from_folder(override_path, args.domain, args.print_details, args.show_single_result,args.t, args.v, args.task_file)
if dr is not None:
drs.append(dr)
domain_results_union(drs)
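# Example (result paths are hypothetical):
#   python show_result_opencua.py results/run_a --print_details --t 1
#   python show_result_opencua.py results/run_a results/run_b   # union of two runs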