Uitars/dev (#291)

* use aws pub ip

* os task fix: set the default dim screen time to be 300s

* add all the uitars agents:
1. run_multienv_uitars.py: Qwen2VL-based UITARS models
2. run_multienv_uitars15_v1.py: UITARS1.5-7B
3. run_multienv_uitars15_v2.py: SeedVL1.5 thining/non-thinking

---------

Co-authored-by: Jiaqi <dengjiaqi@moonshot.cn>
This commit is contained in:
Xinyuan Wang
2025-07-31 08:52:27 +08:00
committed by GitHub
parent dd488c7294
commit 3d32556085
9 changed files with 2155 additions and 643 deletions

View File

@@ -6,7 +6,7 @@ import re
import xml.etree.ElementTree as ET
from io import BytesIO
from typing import Dict, List
import os
import backoff
import numpy as np
from PIL import Image
@@ -28,22 +28,16 @@ from mm_agents.prompts import (
UITARS_CALL_USR_ACTION_SPACE,
UITARS_USR_PROMPT_NOTHOUGHT,
UITARS_USR_PROMPT_THOUGHT,
UITARS_NORMAL_ACTION_SPACE
)
logger = logging.getLogger("desktopenv.agent")
from loguru import logger
FINISH_WORD = "finished"
WAIT_WORD = "wait"
ENV_FAIL_WORD = "error_env"
CALL_USER = "call_user"
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200
pure_text_settings = ["a11y_tree"]
attributes_ns_ubuntu = "https://accessibility.windows.example.org/ns/attributes"
@@ -109,68 +103,8 @@ def escape_single_quotes(text):
pattern = r"(?<!\\)'"
return re.sub(pattern, r"\\'", text)
def round_by_factor(number: int, factor: int) -> int:
"""Returns the closest integer to 'number' that is divisible by 'factor'."""
return round(number / factor) * factor
def ceil_by_factor(number: int, factor: int) -> int:
"""Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
return math.ceil(number / factor) * factor
def floor_by_factor(number: int, factor: int) -> int:
"""Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
return math.floor(number / factor) * factor
def linear_resize(
height: int, width: int, factor: int = IMAGE_FACTOR, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS
) -> tuple[int, int]:
if width * height > max_pixels:
"""
如果图片超过/低于像素限制则计算一个缩放因子resize_factor使图片的像素数缩小到等于或小于max_pixels。这个缩放因子是通过开平方根计算的确保纵横比保持不变,这样原始的相对坐标可以不经转换直接复用
"""
resize_factor = math.sqrt(max_pixels / (width * height))
width, height = int(width * resize_factor), int(height * resize_factor)
if width * height < min_pixels:
resize_factor = math.sqrt(min_pixels / (width * height))
width, height = math.ceil(width * resize_factor), math.ceil(height * resize_factor)
return height, width
def smart_resize(
height: int, width: int, factor: int = IMAGE_FACTOR, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS
) -> tuple[int, int]:
"""
Rescales the image so that the following conditions are met:
1. Both dimensions (height and width) are divisible by 'factor'.
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
3. The aspect ratio of the image is maintained as closely as possible.
"""
if max(height, width) / min(height, width) > MAX_RATIO:
raise ValueError(
f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
)
h_bar = max(factor, round_by_factor(height, factor))
w_bar = max(factor, round_by_factor(width, factor))
if h_bar * w_bar > max_pixels:
beta = math.sqrt((height * width) / max_pixels)
h_bar = floor_by_factor(height / beta, factor)
w_bar = floor_by_factor(width / beta, factor)
elif h_bar * w_bar < min_pixels:
beta = math.sqrt(min_pixels / (height * width))
h_bar = ceil_by_factor(height * beta, factor)
w_bar = ceil_by_factor(width * beta, factor)
return h_bar, w_bar
def parse_action_to_structure_output(text, factor, origin_resized_height, origin_resized_width, model_type, max_pixels=16384*28*28, min_pixels=100*28*28):
def parse_action_qwen2vl(text, factor, image_height, image_width):
text = text.strip()
if model_type == "qwen25vl":
smart_resize_height, smart_resize_width = smart_resize(origin_resized_height, origin_resized_width, factor=IMAGE_FACTOR, min_pixels=min_pixels, max_pixels=max_pixels)
# 正则表达式匹配 Action 字符串
if text.startswith("Thought:"):
thought_pattern = r"Thought: (.+?)(?=\s*Action:|$)"
@@ -182,8 +116,10 @@ def parse_action_to_structure_output(text, factor, origin_resized_height, origin
thought_pattern = r"Action_Summary: (.+?)(?=\s*Action:|$)"
thought_hint = "Action_Summary: "
else:
thought_pattern = r"Thought: (.+?)(?=\s*Action:|$)"
thought_hint = "Thought: "
# 修复:当没有明确的"Thought:"标识时提取Action:之前的所有内容作为思考
thought_pattern = r"(.+?)(?=\s*Action:|$)"
thought_hint = ""
reflection, thought = None, None
thought_match = re.search(thought_pattern, text, re.DOTALL)
if thought_match:
@@ -218,7 +154,7 @@ def parse_action_to_structure_output(text, factor, origin_resized_height, origin
for action_instance, raw_str in zip(parsed_actions, all_action):
if action_instance == None:
print(f"Action can't parse: {raw_str}")
raise ValueError(f"Action can't parse: {raw_str}")
continue
action_type = action_instance["function"]
params = action_instance["args"]
@@ -236,18 +172,7 @@ def parse_action_to_structure_output(text, factor, origin_resized_height, origin
numbers = ori_box.replace("(", "").replace(")", "").split(",")
# Convert to float and scale by 1000
# Qwen2.5vl output absolute coordinates, qwen2vl output relative coordinates
if model_type == "qwen25vl":
float_numbers = []
for num_idx, num in enumerate(numbers):
num = float(num)
if (num_idx + 1) % 2 == 0:
float_numbers.append(float(num/smart_resize_height))
else:
float_numbers.append(float(num/smart_resize_width))
else:
float_numbers = [float(num) / factor for num in numbers]
float_numbers = [float(num) / factor for num in numbers]
if len(float_numbers) == 2:
float_numbers = [float_numbers[0], float_numbers[1], float_numbers[0], float_numbers[1]]
action_inputs[param_name.strip()] = str(float_numbers)
@@ -296,7 +221,7 @@ def parsing_response_to_pyautogui_code(responses, image_height: int, image_width
if response_id == 0:
pyautogui_code += f"'''\nObservation:\n{observation}\n\nThought:\n{thought}\n'''\n"
else:
pyautogui_code += f"\ntime.sleep(1)\n"
pyautogui_code += f"\ntime.sleep(3)\n"
action_dict = response
action_type = action_dict.get("action_type")
@@ -309,79 +234,25 @@ def parsing_response_to_pyautogui_code(responses, image_height: int, image_width
else:
hotkey = action_inputs.get("hotkey", "")
if hotkey == "arrowleft":
hotkey = "left"
elif hotkey == "arrowright":
hotkey = "right"
elif hotkey == "arrowup":
hotkey = "up"
elif hotkey == "arrowdown":
hotkey = "down"
if hotkey:
# Handle other hotkeys
keys = hotkey.split() # Split the keys by space
convert_keys = []
for key in keys:
if key == "space":
key = ' '
convert_keys.append(key)
pyautogui_code += f"\npyautogui.hotkey({', '.join([repr(k) for k in convert_keys])})"
pyautogui_code += f"\npyautogui.hotkey({', '.join([repr(k) for k in keys])})"
elif action_type == "press":
# Parsing press action
if "key" in action_inputs:
key_to_press = action_inputs.get("key", "")
else:
key_to_press = action_inputs.get("press", "")
if hotkey == "arrowleft":
hotkey = "left"
elif hotkey == "arrowright":
hotkey = "right"
elif hotkey == "arrowup":
hotkey = "up"
elif hotkey == "arrowdown":
hotkey = "down"
elif hotkey == "space":
hotkey = " "
if key_to_press:
# Simulate pressing a single key
pyautogui_code += f"\npyautogui.press({repr(key_to_press)})"
elif action_type == "keyup":
key_to_up = action_inputs.get("key", "")
pyautogui_code += f"\npyautogui.keyUp({repr(key_to_up)})"
elif action_type == "keydown":
key_to_down = action_inputs.get("key", "")
pyautogui_code += f"\npyautogui.keyDown({repr(key_to_down)})"
elif action_type == "type":
# Parsing typing action using clipboard
content = action_inputs.get("content", "")
content = escape_single_quotes(content)
stripped_content = content
if content.endswith("\n") or content.endswith("\\n"):
stripped_content = stripped_content.rstrip("\\n").rstrip("\n")
if content:
if input_swap:
pyautogui_code += f"\nimport pyperclip"
pyautogui_code += f"\npyperclip.copy('{stripped_content}')"
pyautogui_code += f"\npyperclip.copy('{content.strip()}')"
pyautogui_code += f"\npyautogui.hotkey('ctrl', 'v')"
pyautogui_code += f"\ntime.sleep(0.5)\n"
if content.endswith("\n") or content.endswith("\\n"):
pyautogui_code += f"\npyautogui.press('enter')"
else:
pyautogui_code += f"\npyautogui.write('{stripped_content}', interval=0.1)"
pyautogui_code += f"\npyautogui.write('{content.strip()}', interval=0.1)"
pyautogui_code += f"\ntime.sleep(0.5)\n"
if content.endswith("\n") or content.endswith("\\n"):
pyautogui_code += f"\npyautogui.press('enter')"
@@ -460,29 +331,6 @@ def parsing_response_to_pyautogui_code(responses, image_height: int, image_width
return pyautogui_code
def add_box_token(input_string):
# Step 1: Split the string into individual actions
if "Action: " in input_string and "start_box=" in input_string:
suffix = input_string.split("Action: ")[0] + "Action: "
actions = input_string.split("Action: ")[1:]
processed_actions = []
for action in actions:
action = action.strip()
# Step 2: Extract coordinates (start_box or end_box) using regex
coordinates = re.findall(r"(start_box|end_box)='\((\d+),\s*(\d+)\)'", action)
updated_action = action # Start with the original action
for coord_type, x, y in coordinates:
# Convert x and y to integers
updated_action = updated_action.replace(f"{coord_type}='({x},{y})'", f"{coord_type}='<|box_start|>({x},{y})<|box_end|>'")
processed_actions.append(updated_action)
# Step 5: Reconstruct the final string
final_string = suffix + "\n\n".join(processed_actions)
else:
final_string = input_string
return final_string
def pil_to_base64(image):
buffer = BytesIO()
image.save(buffer, format="PNG") # 你可以改成 "JPEG" 等格式
@@ -558,51 +406,48 @@ def trim_accessibility_tree(linearized_accessibility_tree, max_tokens):
class UITARSAgent:
def __init__(
self,
model: str,
platform="ubuntu",
max_tokens=1000,
top_p=0.9,
top_k=1.0,
temperature=0.0,
action_space="pyautogui",
observation_type="screenshot",
observation_type="screenshot_a11y_tree",
# observation_type can be in ["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"]
max_trajectory_length=50,
a11y_tree_max_tokens=10000,
model_type="qwen25vl",
runtime_conf: dict = {
"infer_mode": "qwen25vl_normal",
"prompt_style": "qwen25vl_normal",
"infer_mode": "qwen2vl_user",
"prompt_style": "qwen2vl_user",
"input_swap": True,
"language": "Chinese",
"max_steps": 50,
"history_n": 5,
"max_pixels": 16384*28*28,
"min_pixels": 100*28*28,
"callusr_tolerance": 3,
"temperature": 0.0,
"top_k": -1,
"top_p": 0.9,
"max_tokens": 500
"screen_height": 1080,
"screen_width": 1920
}
):
self.model = model
self.platform = platform
self.max_tokens = max_tokens
self.top_p = top_p
self.top_k = top_k
self.temperature = temperature
self.action_space = action_space
self.observation_type = observation_type
self.max_trajectory_length = max_trajectory_length
self.a11y_tree_max_tokens = a11y_tree_max_tokens
self.model_type = model_type
self.runtime_conf = runtime_conf
self.vlm = OpenAI(
base_url="http://127.0.0.1:8000/v1",
api_key="empty",
base_url=os.environ['DOUBAO_API_URL'],
api_key=os.environ['DOUBAO_API_KEY'],
) # should replace with your UI-TARS server api
self.temperature = self.runtime_conf["temperature"]
self.top_k = self.runtime_conf["top_k"]
self.top_p = self.runtime_conf["top_p"]
self.max_tokens = self.runtime_conf["max_tokens"]
self.infer_mode = self.runtime_conf["infer_mode"]
self.prompt_style = self.runtime_conf["prompt_style"]
self.input_swap = self.runtime_conf["input_swap"]
self.language = self.runtime_conf["language"]
self.max_pixels = self.runtime_conf["max_pixels"]
self.min_pixels = self.runtime_conf["min_pixels"]
self.callusr_tolerance = self.runtime_conf["callusr_tolerance"]
self.max_steps = max_trajectory_length
self.thoughts = []
self.actions = []
@@ -611,15 +456,14 @@ class UITARSAgent:
self.history_responses = []
self.prompt_action_space = UITARS_ACTION_SPACE
self.customize_action_parser = parse_action_qwen2vl
self.action_parse_res_factor = 1000
if self.infer_mode == "qwen2vl_user":
self.prompt_action_space = UITARS_CALL_USR_ACTION_SPACE
elif self.infer_mode == "qwen25vl_normal":
self.prompt_action_space = UITARS_NORMAL_ACTION_SPACE
self.prompt_template = UITARS_USR_PROMPT_THOUGHT
if self.prompt_style == "qwen2vl_user" or self.prompt_style == "qwen25vl_normal":
if self.prompt_style == "qwen2vl_user":
self.prompt_template = UITARS_USR_PROMPT_THOUGHT
elif self.prompt_style == "qwen2vl_no_thought":
@@ -630,8 +474,6 @@ class UITARSAgent:
self.history_n = self.runtime_conf["history_n"]
else:
self.history_n = 5
self.cur_callusr_count = 0
def predict(
self, instruction: str, obs: Dict, last_action_after_obs: Dict = None
@@ -660,18 +502,9 @@ class UITARSAgent:
_actions = self.actions
_thoughts = self.thoughts
for previous_obs, previous_action, previous_thought in zip(
_observations, _actions, _thoughts
):
# {{{1
if self.observation_type == "screenshot_a11y_tree":
_screenshot = previous_obs["screenshot"]
_linearized_accessibility_tree = previous_obs["accessibility_tree"]
else:
raise ValueError(
"Invalid observation_type type: " + self.observation_type
) # 1}}}
if last_action_after_obs is not None and self.infer_mode == "double_image":
self.history_images.append(last_action_after_obs["screenshot"])
self.history_images.append(obs["screenshot"])
@@ -712,7 +545,7 @@ class UITARSAgent:
"Invalid observation_type type: " + self.observation_type
) # 1}}}
if self.infer_mode == "qwen2vl_user" or self.infer_mode == "qwen25vl_normal":
if self.infer_mode == "qwen2vl_user":
user_prompt = self.prompt_template.format(
instruction=instruction,
action_space=self.prompt_action_space,
@@ -726,6 +559,8 @@ class UITARSAgent:
if len(self.history_images) > self.history_n:
self.history_images = self.history_images[-self.history_n:]
max_pixels = 2116800
min_pixels = 3136
messages, images = [], []
if isinstance(self.history_images, bytes):
self.history_images = [self.history_images]
@@ -735,24 +570,28 @@ class UITARSAgent:
pass
else:
raise TypeError(f"Unidentified images type: {type(self.history_images)}")
max_image_nums_under_32k = int(32768*0.75/max_pixels*28*28)
if len(self.history_images) > max_image_nums_under_32k:
num_of_images = min(5, len(self.history_images))
max_pixels = int(32768*0.75) // num_of_images
for turn, image in enumerate(self.history_images):
if len(images) >= self.history_n:
if len(images) >= 5:
break
try:
image = Image.open(BytesIO(image))
except Exception as e:
raise RuntimeError(f"Error opening image: {e}")
if image.width * image.height > self.max_pixels:
if image.width * image.height > max_pixels:
"""
如果图片超过/低于像素限制则计算一个缩放因子resize_factor使图片的像素数缩小到等于或小于max_pixels。这个缩放因子是通过开平方根计算的确保纵横比保持不变,这样原始的相对坐标可以不经转换直接复用
"""
resize_factor = math.sqrt(self.max_pixels / (image.width * image.height))
resize_factor = math.sqrt(max_pixels / (image.width * image.height))
width, height = int(image.width * resize_factor), int(image.height * resize_factor)
image = image.resize((width, height))
if image.width * image.height < self.min_pixels:
resize_factor = math.sqrt(self.min_pixels / (image.width * image.height))
if image.width * image.height < min_pixels:
resize_factor = math.sqrt(min_pixels / (image.width * image.height))
width, height = math.ceil(image.width * resize_factor), math.ceil(image.height * resize_factor)
image = image.resize((width, height))
@@ -788,7 +627,7 @@ class UITARSAgent:
messages.append({
"role": "assistant",
"content": [add_box_token(history_response)]
"content": history_response
})
cur_image = images[image_num]
@@ -809,79 +648,59 @@ class UITARSAgent:
image_num += 1
try_times = 3
origin_resized_height = images[-1].height
origin_resized_width = images[-1].width
temperature = self.temperature
top_k = self.top_k
while True:
if try_times <= 0:
print(f"Reach max retry times to fetch response from client, as error flag.")
return "client error", ["DONE"], []
return "client error", ["DONE"]
try:
response = self.vlm.chat.completions.create(
model="ui-tars",
model=self.model,
messages=messages,
frequency_penalty=1,
max_tokens=self.max_tokens,
temperature=temperature,
temperature=self.temperature,
top_p=self.top_p
)
# print(response.choices[0].message.content)
prediction = response.choices[0].message.content.strip()
except Exception as e:
print(f"Error when fetching response from client, with response: {response}")
prediction = None
try_times -= 1
try:
parsed_responses = parse_action_to_structure_output(
print("Response:")
print(response.choices[0].message.content)
prediction = response.choices[0].message.content
parsed_responses = self.customize_action_parser(
prediction,
self.action_parse_res_factor,
origin_resized_height,
origin_resized_width,
self.model_type,
self.max_pixels,
self.min_pixels
self.runtime_conf["screen_height"],
self.runtime_conf["screen_width"]
)
break
except Exception as e:
print(f"Error when parsing response from client, with response: {response}")
# If fail to parse the model response, we use sampling parameters to avoid it
logger.exception(f"Error when fetching response from client, with response: {e}")
prediction = None
try_times -= 1
temperature = 1
top_k = -1
if prediction is None:
return "client error", ["DONE"]
self.history_responses.append(prediction)
self.thoughts.append(prediction)
try:
parsed_responses = parse_action_to_structure_output(
parsed_responses = self.customize_action_parser(
prediction,
self.action_parse_res_factor,
origin_resized_height,
origin_resized_width,
self.model_type,
self.max_pixels,
self.min_pixels
self.runtime_conf["screen_height"],
self.runtime_conf["screen_width"]
)
except Exception as e:
print(f"Parsing action error: {prediction}, with error:\n{e}")
return f"Parsing action error: {prediction}, with error:\n{e}", ["DONE"]
actions = []
last_image = Image.open(BytesIO(self.history_images[-1]))
obs_image_height = last_image.height
obs_image_width = last_image.width
for parsed_response in parsed_responses:
if "action_type" in parsed_response:
if parsed_response["action_type"] == FINISH_WORD:
self.actions.append(actions)
return prediction, ["DONE"]
elif parsed_response["action_type"] == WAIT_WORD:
@@ -893,18 +712,13 @@ class UITARSAgent:
return prediction, ["FAIL"]
elif parsed_response["action_type"] == CALL_USER:
if self.callusr_tolerance > self.cur_callusr_count:
self.actions.append(actions)
self.cur_callusr_count += 1
return prediction, ["WAIT"]
else:
self.actions.append(actions)
return prediction, ["FAIL"]
self.actions.append(actions)
return prediction, ["FAIL"]
pyautogui_code = parsing_response_to_pyautogui_code(
parsed_response,
obs_image_height,
obs_image_width,
self.runtime_conf["screen_height"],
self.runtime_conf["screen_width"],
self.input_swap
)
actions.append(pyautogui_code)
@@ -917,7 +731,6 @@ class UITARSAgent:
return prediction, actions
@backoff.on_exception(
backoff.constant,
# here you should add more model exceptions as you want,
@@ -947,4 +760,4 @@ class UITARSAgent:
self.actions = []
self.observations = []
self.history_images = []
self.history_responses = []
self.history_responses = []