feat: introduce DummyAgent class for enhanced coordinate handling

- Added DummyAgent class to facilitate coordinate generation and action assignment.
- Updated GTA1Agent to utilize DummyAgent for improved planning and execution.
- Increased max_steps (15 → 100) and N_SEQ (4 → 8) for better performance; the required share of valid planner responses also rises from 0.5 to 0.8 of N_SEQ.
- Enhanced logging for planning and execution processes.
- Maintained existing logic while integrating new functionality.
This commit is contained in:
yuanmengqi
2025-07-26 08:26:23 +00:00
parent d49ca9cc2d
commit b25854edba

View File

@@ -24,7 +24,7 @@ from google.api_core.exceptions import (
from requests.exceptions import SSLError
import os
from mm_agents.prompts import GTA1_PLANNER_SYSTEM_PROMPT, GTA1_GROUNDING_SYSTEM_PROMPT, GTA1_JUDGE_SYSTEM_PROMPT
from mm_agents.img_utils import smart_resize
from mm_agents.utils.qwen_vl_utils import smart_resize
from pytesseract import Output
import pytesseract
import inspect
@@ -451,6 +451,8 @@ class OSWorldACI:
engine_params=engine_params_for_generation,
system_prompt=self.PHRASE_TO_WORD_COORDS_PROMPT,
)
self.dummy_agent = DummyAgent(platform=platform)
# Given the state and worker's referring expression, use the grounding model to generate (x,y)
def generate_coords(self, ref_expr: str, obs: Dict, request_vllm) -> List[int]:
@@ -811,6 +813,314 @@ class OSWorldACI:
"""End the current task with a failure, and replan the whole task."""
return """FAIL"""
class DummyAgent:
    """Grounding-free agent used to dry-run planner actions.

    Every action method builds the same kind of pyautogui command string as a
    real agent would, but ``generate_coords`` / ``generate_text_coords`` return
    a fixed ``[0, 0]`` instead of querying a grounding model or OCR. This makes
    it possible to check that a generated ``agent.<action>(...)`` call parses
    and executes without touching the screen or the network.

    NOTE(review): the method set appears to mirror OSWorldACI's action API;
    keep the two in sync when actions are added or their signatures change.
    """

    def __init__(
        self,
        platform,
    ):
        """Create a dummy agent.

        Args:
            platform: platform name; ``switch_applications`` recognises
                "darwin", "linux" and "windows".
        """
        self.platform = (
            platform  # Dictates how the switch_applications agent action works.
        )
        # Scale factors used by resize_coordinates; 1 leaves coordinates as-is.
        self.width = 1
        self.height = 1
        self.notes = []
        # Coordinates assigned by assign_coordinates for the current action.
        self.coords1 = None
        self.coords2 = None
        # Value handed to done(); initialised here so the attribute always exists.
        self.returned_info = None

    def generate_coords(self, ref_expr: str, obs: Dict) -> List[int]:
        """Return placeholder coordinates; no grounding model is called."""
        return [0, 0]

    def generate_text_coords(
        self, phrase: str, obs: Dict, alignment: str = ""
    ) -> List[int]:
        """Return placeholder text coordinates; no OCR/model is called."""
        return [0, 0]

    # Takes a description based action and assigns the coordinates for any coordinate based action
    # Raises an error if function can't be parsed
    def assign_coordinates(self, plan: str, obs: Dict):
        """Parse the grounded action in ``plan`` and fill coords1/coords2.

        Args:
            plan: planner output; the text after the last "Grounded Action"
                marker is parsed as a single ``agent.<action>(...)`` call.
            obs: observation dict, forwarded to the coordinate generators.

        Raises:
            RuntimeError: if the action cannot be extracted or parsed.
        """
        # Reset coords from previous action generation
        self.coords1, self.coords2 = None, None

        try:
            # Extract the function name and args
            action = parse_single_code_from_string(plan.split("Grounded Action")[-1])
            function_name = re.match(r"(\w+\.\w+)\(", action).group(1)
            args = self.parse_function_args(action)
        except Exception as e:
            raise RuntimeError(f"Error in parsing grounded action: {e}") from e

        # arg0 is a description
        if (
            function_name in ["agent.click", "agent.type", "agent.scroll"]
            and len(args) >= 1
            and args[0] is not None
        ):
            self.coords1 = self.generate_coords(args[0], obs)
        # arg0 and arg1 are descriptions
        elif function_name == "agent.drag_and_drop" and len(args) >= 2:
            self.coords1 = self.generate_coords(args[0], obs)
            self.coords2 = self.generate_coords(args[1], obs)
        # arg0 and arg1 are text phrases
        elif function_name == "agent.highlight_text_span" and len(args) >= 2:
            self.coords1 = self.generate_text_coords(args[0], obs, alignment="start")
            self.coords2 = self.generate_text_coords(args[1], obs, alignment="end")

    # Resize from grounding model dim into OSWorld dim (1920 * 1080)
    def resize_coordinates(self, coordinates: List[int]) -> List[int]:
        """Scale ``coordinates`` by ``self.width`` / ``self.height`` and round.

        Raises TypeError when ``coordinates`` is None, i.e. when the action was
        invoked without coordinates having been assigned first.
        """
        return [
            round(coordinates[0] * self.width),
            round(coordinates[1] * self.height),
        ]

    # Given a generated ACI function, returns a list of argument values, where descriptions are at the front of the list
    def parse_function_args(self, function: str) -> List[str]:
        """Return the argument values of a single call expression.

        Keyword arguments whose name contains "description" come first,
        followed by all positional arguments in order; other keyword arguments
        are dropped. Literals are returned as their Python value, any other
        expression as its source text.
        """
        tree = ast.parse(function)
        call_node = tree.body[0].value

        def safe_eval(node):
            if isinstance(
                node, ast.Constant
            ):  # Handles literals like numbers, strings, etc.
                return node.value
            return ast.unparse(node)  # Return as a string if not a literal

        description_args = [
            safe_eval(kw.value)
            for kw in call_node.keywords
            # ``kw.arg`` is None for ``**kwargs`` unpacking; such entries still
            # raise TypeError here exactly as the original ``in`` test did.
            if "description" in kw.arg
        ]
        positional_args = [safe_eval(arg) for arg in call_node.args]
        return description_args + positional_args

    def click(
        self,
        instruction: str,
        num_clicks: int = 1,
        button_type: str = "left",
        hold_keys: Optional[List] = None,
    ):
        """Click on the element
        Args:
            instruction:str, describe the element you want to interact with in detail including the visual description and function description. And make it clear and concise. For example you can describe what the element looks like, and what will be the expected result when you interact with it.
            num_clicks:int, number of times to click the element
            button_type:str, which mouse button to press can be "left", "middle", or "right"
            hold_keys:List, list of keys to hold while clicking
        """
        # None instead of a shared mutable [] default.
        hold_keys = [] if hold_keys is None else hold_keys
        x, y = self.resize_coordinates(self.coords1)
        command = "import pyautogui; "

        # TODO: specified duration?
        for k in hold_keys:
            command += f"pyautogui.keyDown({repr(k)}); "
        command += f"pyautogui.click({x}, {y}, clicks={num_clicks}, button={repr(button_type)}); "
        for k in hold_keys:
            command += f"pyautogui.keyUp({repr(k)}); "
        # Return pyautoguicode to click on the element
        return command

    def switch_applications(self, app_code):
        """Switch to a different application that is already open
        Args:
            app_code:str the code name of the application to switch to from the provided list of open applications
        """
        if self.platform == "darwin":
            return f"import pyautogui; import time; pyautogui.hotkey('command', 'space', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)"
        elif self.platform == "linux":
            return UBUNTU_APP_SETUP.replace("APP_NAME", app_code)
        elif self.platform == "windows":
            return f"import pyautogui; import time; pyautogui.hotkey('win', 'd', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)"
        # Unrecognised platform: no command is produced (unchanged behaviour).
        return None

    def open(self, app_or_filename: str):
        """Open any application or file with name app_or_filename. Use this action to open applications or files on the desktop, do not open manually.
        Args:
            app_or_filename:str, the name of the application or filename to open
        """
        # The command calls time.sleep, so it has to import time itself.
        return f"import pyautogui; import time; pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)"

    def type(
        self,
        element_description: Optional[str] = None,
        text: str = "",
        overwrite: bool = False,
        enter: bool = False,
    ):
        """Type text into a specific element
        Args:
            element_description:str, a detailed description of which element to enter text in. This description should be at least a full sentence.
            text:str, the text to type
            overwrite:bool, Assign it to True if the text should overwrite the existing text, otherwise assign it to False. Using this argument clears all text in an element.
            enter:bool, Assign it to True if the enter key should be pressed after typing the text, otherwise assign it to False.
        """
        command = "import pyautogui; "
        if self.coords1 is not None:
            # An element was resolved: click it first so typing starts there.
            # Otherwise typing starts at the current cursor location.
            x, y = self.resize_coordinates(self.coords1)
            command += f"pyautogui.click({x}, {y}); "

        if overwrite:
            command += "pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); "
        command += f"pyautogui.write({repr(text)}); "
        if enter:
            command += "pyautogui.press('enter'); "
        return command

    def drag_and_drop(
        self,
        starting_description: str,
        ending_description: str,
        hold_keys: Optional[List] = None,
    ):
        """Drag from the starting description to the ending description
        Args:
            starting_description:str, a very detailed description of where to start the drag action. This description should be at least a full sentence. And make it clear and concise.
            ending_description:str, a very detailed description of where to end the drag action. This description should be at least a full sentence. And make it clear and concise.
            hold_keys:List list of keys to hold while dragging
        """
        # None instead of a shared mutable [] default.
        hold_keys = [] if hold_keys is None else hold_keys
        x1, y1 = self.resize_coordinates(self.coords1)
        x2, y2 = self.resize_coordinates(self.coords2)

        command = "import pyautogui; "
        command += f"pyautogui.moveTo({x1}, {y1}); "
        # TODO: specified duration?
        for k in hold_keys:
            command += f"pyautogui.keyDown({repr(k)}); "
        command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); "
        for k in hold_keys:
            command += f"pyautogui.keyUp({repr(k)}); "

        # Return pyautoguicode to drag and drop the elements
        return command

    def highlight_text_span(self, starting_phrase: str, ending_phrase: str):
        """Highlight a text span between a provided starting phrase and ending phrase. Use this to highlight words, lines, and paragraphs.
        Args:
            starting_phrase:str, the phrase that denotes the start of the text span you want to highlight. If you only want to highlight one word, just pass in that single word.
            ending_phrase:str, the phrase that denotes the end of the text span you want to highlight. If you only want to highlight one word, just pass in that single word.
        """
        # Text coordinates are used as-is (no resize_coordinates call).
        x1, y1 = self.coords1
        x2, y2 = self.coords2

        command = "import pyautogui; "
        command += f"pyautogui.moveTo({x1}, {y1}); "
        command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); "

        # Return pyautoguicode to drag across the text span
        return command

    def set_cell_values(
        self, cell_values: Dict[str, Any], app_name: str, sheet_name: str
    ):
        """Use this to set individual cell values in a spreadsheet. For example, setting A2 to "hello" would be done by passing {"A2": "hello"} as cell_values. The sheet must be opened before this command can be used.
        Args:
            cell_values: Dict[str, Any], A dictionary of cell values to set in the spreadsheet. The keys are the cell coordinates in the format "A1", "B2", etc.
                Supported value types include: float, int, string, bool, formulas.
            app_name: str, The name of the spreadsheet application. For example, "Some_sheet.xlsx".
            sheet_name: str, The name of the sheet in the spreadsheet. For example, "Sheet1".
        """
        return SET_CELL_VALUES_CMD.format(
            cell_values=cell_values, app_name=app_name, sheet_name=sheet_name
        )

    def scroll(self, instruction: str, clicks: int, shift: bool = False):
        """Scroll the element in the specified direction
        Args:
            instruction:str, a very detailed description of which element to enter scroll in. This description should be at least a full sentence. And make it clear and concise.
            clicks:int, the number of clicks to scroll can be positive (up) or negative (down).
            shift:bool, whether to use shift+scroll for horizontal scrolling
        """
        x, y = self.resize_coordinates(self.coords1)

        if shift:
            return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.hscroll({clicks})"
        else:
            return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.vscroll({clicks})"

    def hotkey(self, keys: List):
        """Press a hotkey combination
        Args:
            keys:List the keys to press in combination in a list format (e.g. ['ctrl', 'c'])
        """
        # repr() quotes and escapes each key, so keys such as "'" or "\\"
        # still yield valid Python in the generated command.
        quoted_keys = ", ".join(repr(str(key)) for key in keys)
        return f"import pyautogui; pyautogui.hotkey({quoted_keys})"

    def hold_and_press(self, hold_keys: List, press_keys: List):
        """Hold a list of keys and press a list of keys
        Args:
            hold_keys:List, list of keys to hold
            press_keys:List, list of keys to press in a sequence
        """
        # repr() rather than manual quoting, so special keys are escaped.
        press_keys_str = "[" + ", ".join(repr(str(key)) for key in press_keys) + "]"
        command = "import pyautogui; "
        for k in hold_keys:
            command += f"pyautogui.keyDown({repr(k)}); "
        command += f"pyautogui.press({press_keys_str}); "
        for k in hold_keys:
            command += f"pyautogui.keyUp({repr(k)}); "

        return command

    def wait(self, time: float):
        """Wait for a specified amount of time
        Args:
            time:float the amount of time to wait in seconds
        """
        return f"""import time; time.sleep({time})"""

    def done(
        self,
        return_value: Optional[Union[Dict, str, List, Tuple, int, float, bool]] = None,
    ):
        """End the current task with a success and the required return value"""
        self.returned_info = return_value
        return """DONE"""

    def fail(self):
        """End the current task with a failure, and replan the whole task."""
        return """FAIL"""

    def run_python(self, code):
        """Return ``code`` unchanged."""
        return code

    def fast_open_terminal(self, *args, **kwargs):
        """Return a command that saves, closes the active window and opens a terminal.

        All positional and keyword arguments are accepted and ignored.
        """
        app_or_filename = 'terminal'
        return f"import time; import pyautogui; pyautogui.hotkey('ctrl', 's'); time.sleep(0.5); pyautogui.hotkey('alt', 'f4'); time.sleep(0.5); pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)"
def call_llm_safe(agent):
'''
functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L27
@@ -887,9 +1197,9 @@ class GTA1Agent:
temperature= 0.0,
action_space="pyautogui",
observation_type="screenshot",
max_steps=15,
max_steps=100,
max_image_history_length = 5,
N_SEQ = 4,
N_SEQ = 8,
):
self.platform = platform
self.max_tokens = max_tokens
@@ -985,6 +1295,7 @@ class GTA1Agent:
N = self.N_SEQ
logger.info(f"Executing planning")
planner_response = []
for bn in split_to_batches(N, batch_size=8):
planner_response_ = self.call_llm({
@@ -1003,6 +1314,7 @@ class GTA1Agent:
retry_count = 0
max_retries = 5
while N > 0:
logger.info(f"Executing planning {retry_count}")
if retry_count >= max_retries:
break
@@ -1031,8 +1343,9 @@ class GTA1Agent:
valid_responses.extend(valid_responses_)
retry_count += 1
assert len(valid_responses) > int(self.N_SEQ) * 0.5, f"Not enough valid responses generated {len(valid_responses)}"
assert len(valid_responses) > int(self.N_SEQ) * 0.8, f"Not enough valid responses generated {len(valid_responses)}"
logger.info(f"Executing selection")
if self.N_SEQ > 1:
history_cache = [f"Observation:\n{o}\nThought:\n{t}\nAction:\n{a}" for a,t,o in zip(self.actions, self.thoughts, self.observation_captions)]
planner_response = self.select(instruction, Image.open(BytesIO(obs['screenshot'])), valid_responses, history_cache)
@@ -1096,7 +1409,7 @@ class GTA1Agent:
x = x/W
y = y/H
return x,y
logger.info(f"Executing grounding")
agent.assign_coordinates(planner_response, obs, request_vllm)
plan_code = extract_first_agent_function("\n".join(codes))
@@ -1185,7 +1498,18 @@ class GTA1Agent:
return response[0]
def isvalid(self, planner_response):
    """Return True when ``planner_response`` holds a usable, executable plan.

    A response is accepted only if all of the following hold:
      1. its grounded action can be parsed by the dummy agent's
         ``assign_coordinates``;
      2. the first ``agent.<action>(...)`` call extracted from its code runs
         without raising when redirected to ``agent.dummy_agent``;
      3. code, thought and observation caption are all non-empty.

    Args:
        planner_response: raw text of one planner completion.

    Returns:
        bool: False on any parsing/execution failure, otherwise whether all
        three parsed parts are present.
    """
    agent_prefix = "agent."

    try:
        agent.dummy_agent.assign_coordinates(planner_response, {"screenshot": None})
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        return False

    codes = self.parse_code_from_planner_response(planner_response)
    try:
        test_code = extract_first_agent_function("\n".join(codes))
        # Redirect "agent.<action>(...)" to "agent.dummy_agent.<action>(...)".
        test_code = "agent.dummy_agent." + test_code[len(agent_prefix):]
        # SECURITY NOTE(review): this eval()s model-generated text. It runs
        # against the dummy agent only to verify the call is well-formed, but
        # arbitrary expressions inside the arguments are still evaluated.
        eval(test_code)
    except Exception:
        return False

    thought = self.parse_thought_from_planner_response(planner_response)
    observation_caption = self.parse_observation_caption_from_planner_response(planner_response)
    return bool(codes and thought and observation_caption)
@@ -1272,12 +1596,15 @@ class GTA1Agent:
raise SystemExit
def reset(self, _logger=None):
global logger
logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")
self.thoughts = []
self.action_descriptions = []
self.actions = []
self.observations = []
self.observation_captions = []
self.current_step = 1