From b25854edba55d21da0e9a64bbb1b1cc6e026d718 Mon Sep 17 00:00:00 2001 From: yuanmengqi Date: Sat, 26 Jul 2025 08:26:23 +0000 Subject: [PATCH] feat: introduce DummyAgent class for enhanced coordinate handling - Added DummyAgent class to facilitate coordinate generation and action assignment. - Updated GTA1Agent to utilize DummyAgent for improved planning and execution. - Increased max_steps and N_SEQ parameters for better performance. - Enhanced logging for planning and execution processes. - Maintained existing logic while integrating new functionality. --- mm_agents/gta1_agent.py | 337 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 332 insertions(+), 5 deletions(-) diff --git a/mm_agents/gta1_agent.py b/mm_agents/gta1_agent.py index aa34746..ecaba50 100644 --- a/mm_agents/gta1_agent.py +++ b/mm_agents/gta1_agent.py @@ -24,7 +24,7 @@ from google.api_core.exceptions import ( from requests.exceptions import SSLError import os from mm_agents.prompts import GTA1_PLANNER_SYSTEM_PROMPT, GTA1_GROUNDING_SYSTEM_PROMPT, GTA1_JUDGE_SYSTEM_PROMPT -from mm_agents.img_utils import smart_resize +from mm_agents.utils.qwen_vl_utils import smart_resize from pytesseract import Output import pytesseract import inspect @@ -451,6 +451,8 @@ class OSWorldACI: engine_params=engine_params_for_generation, system_prompt=self.PHRASE_TO_WORD_COORDS_PROMPT, ) + + self.dummy_agent = DummyAgent(platform=platform) # Given the state and worker's referring expression, use the grounding model to generate (x,y) def generate_coords(self, ref_expr: str, obs: Dict, request_vllm) -> List[int]: @@ -811,6 +813,314 @@ class OSWorldACI: """End the current task with a failure, and replan the whole task.""" return """FAIL""" +class DummyAgent: + def __init__( + self, + platform, + ): + self.platform = ( + platform # Dictates how the switch_applications agent action works. + ) + + self.width = 1 + self.height = 1 + + self.notes = [] + + self.coords1 = None + self.coords2 = None + + def generate_coords(self, ref_expr: str, obs: Dict) -> List[int]: + return 0,0 + + def generate_text_coords( + self, phrase: str, obs: Dict, alignment: str = "" + ) -> List[int]: + return 0,0 + + # Takes a description based action and assigns the coordinates for any coordinate based action + # Raises an error if function can't be parsed + def assign_coordinates(self, plan: str, obs: Dict): + + # Reset coords from previous action generation + self.coords1, self.coords2 = None, None + + try: + # Extract the function name and args + action = parse_single_code_from_string(plan.split("Grounded Action")[-1]) + function_name = re.match(r"(\w+\.\w+)\(", action).group(1) + args = self.parse_function_args(action) + except Exception as e: + raise RuntimeError(f"Error in parsing grounded action: {e}") from e + + # arg0 is a description + if ( + function_name in ["agent.click", "agent.type", "agent.scroll"] + and len(args) >= 1 + and args[0] != None + ): + self.coords1 = self.generate_coords(args[0], obs) + # arg0 and arg1 are descriptions + elif function_name == "agent.drag_and_drop" and len(args) >= 2: + self.coords1 = self.generate_coords(args[0], obs) + self.coords2 = self.generate_coords(args[1], obs) + # arg0 and arg1 are text phrases + elif function_name == "agent.highlight_text_span" and len(args) >= 2: + self.coords1 = self.generate_text_coords(args[0], obs, alignment="start") + self.coords2 = self.generate_text_coords(args[1], obs, alignment="end") + + # Resize from grounding model dim into OSWorld dim (1920 * 1080) + def resize_coordinates(self, coordinates: List[int]) -> List[int]: + return [ + round(coordinates[0] * self.width), + round(coordinates[1] * self.height), + ] + + # Given a generated ACI function, returns a list of argument values, where descriptions are at the front of the list + def parse_function_args(self, function: str) -> List[str]: + tree = ast.parse(function) + call_node = tree.body[0].value + + def safe_eval(node): + if isinstance( + node, ast.Constant + ): # Handles literals like numbers, strings, etc. + return node.value + else: + return ast.unparse(node) # Return as a string if not a literal + + positional_args = [safe_eval(arg) for arg in call_node.args] + keyword_args = {kw.arg: safe_eval(kw.value) for kw in call_node.keywords} + + res = [] + + for key, val in keyword_args.items(): + if "description" in key: + res.append(val) + + for arg in positional_args: + res.append(arg) + + return res + + def click( + self, + instruction: str, + num_clicks: int = 1, + button_type: str = "left", + hold_keys: List = [], + ): + """Click on the element + Args: + instruction:str, decribe the element you want to interact with in detail including the visual description and function description. And make it clear and concise. For example you can describe what the element looks like, and what will be the expected result when you interact with it. + num_clicks:int, number of times to click the element + button_type:str, which mouse button to press can be "left", "middle", or "right" + hold_keys:List, list of keys to hold while clicking + """ + x, y = self.resize_coordinates(self.coords1) + command = "import pyautogui; " + + # TODO: specified duration? + for k in hold_keys: + command += f"pyautogui.keyDown({repr(k)}); " + command += f"""import pyautogui; pyautogui.click({x}, {y}, clicks={num_clicks}, button={repr(button_type)}); """ + for k in hold_keys: + command += f"pyautogui.keyUp({repr(k)}); " + # Return pyautoguicode to click on the element + return command + + def switch_applications(self, app_code): + """Switch to a different application that is already open + Args: + app_code:str the code name of the application to switch to from the provided list of open applications + """ + if self.platform == "darwin": + return f"import pyautogui; import time; pyautogui.hotkey('command', 'space', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)" + elif self.platform == "linux": + return UBUNTU_APP_SETUP.replace("APP_NAME", app_code) + elif self.platform == "windows": + return f"import pyautogui; import time; pyautogui.hotkey('win', 'd', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)" + + def open(self, app_or_filename: str): + """Open any application or file with name app_or_filename. Use this action to open applications or files on the desktop, do not open manually. + Args: + app_or_filename:str, the name of the application or filename to open + """ + return f"import pyautogui; pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)" + + def type( + self, + element_description: Optional[str] = None, + text: str = "", + overwrite: bool = False, + enter: bool = False, + ): + """Type text into a specific element + Args: + element_description:str, a detailed description of which element to enter text in. This description should be at least a full sentence. + text:str, the text to type + overwrite:bool, Assign it to True if the text should overwrite the existing text, otherwise assign it to False. Using this argument clears all text in an element. + enter:bool, Assign it to True if the enter key should be pressed after typing the text, otherwise assign it to False. + """ + + if self.coords1 is not None: + # If a node is found, retrieve its coordinates and size + # Start typing at the center of the element + + x, y = self.resize_coordinates(self.coords1) + + command = "import pyautogui; " + command += f"pyautogui.click({x}, {y}); " + + if overwrite: + command += ( + f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); " + ) + + command += f"pyautogui.write({repr(text)}); " + + if enter: + command += "pyautogui.press('enter'); " + else: + # If no element is found, start typing at the current cursor location + command = "import pyautogui; " + + if overwrite: + command += ( + f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); " + ) + + command += f"pyautogui.write({repr(text)}); " + + if enter: + command += "pyautogui.press('enter'); " + + return command + + def drag_and_drop( + self, starting_description: str, ending_description: str, hold_keys: List = [] + ): + """Drag from the starting description to the ending description + Args: + starting_description:str, a very detailed description of where to start the drag action. This description should be at least a full sentence. And make it clear and concise. + ending_description:str, a very detailed description of where to end the drag action. This description should be at least a full sentence. And make it clear and concise. + hold_keys:List list of keys to hold while dragging + """ + x1, y1 = self.resize_coordinates(self.coords1) + x2, y2 = self.resize_coordinates(self.coords2) + + command = "import pyautogui; " + + command += f"pyautogui.moveTo({x1}, {y1}); " + # TODO: specified duration? + for k in hold_keys: + command += f"pyautogui.keyDown({repr(k)}); " + command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); " + for k in hold_keys: + command += f"pyautogui.keyUp({repr(k)}); " + + # Return pyautoguicode to drag and drop the elements + + return command + + def highlight_text_span(self, starting_phrase: str, ending_phrase: str): + """Highlight a text span between a provided starting phrase and ending phrase. Use this to highlight words, lines, and paragraphs. + Args: + starting_phrase:str, the phrase that denotes the start of the text span you want to highlight. If you only want to highlight one word, just pass in that single word. + ending_phrase:str, the phrase that denotes the end of the text span you want to highlight. If you only want to highlight one word, just pass in that single word. + """ + + x1, y1 = self.coords1 + x2, y2 = self.coords2 + + command = "import pyautogui; " + command += f"pyautogui.moveTo({x1}, {y1}); " + command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); " + + # Return pyautoguicode to drag and drop the elements + return command + + def set_cell_values( + self, cell_values: Dict[str, Any], app_name: str, sheet_name: str + ): + """Use this to set individual cell values in a spreadsheet. For example, setting A2 to "hello" would be done by passing {"A2": "hello"} as cell_values. The sheet must be opened before this command can be used. + Args: + cell_values: Dict[str, Any], A dictionary of cell values to set in the spreadsheet. The keys are the cell coordinates in the format "A1", "B2", etc. + Supported value types include: float, int, string, bool, formulas. + app_name: str, The name of the spreadsheet application. For example, "Some_sheet.xlsx". + sheet_name: str, The name of the sheet in the spreadsheet. For example, "Sheet1". + """ + return SET_CELL_VALUES_CMD.format( + cell_values=cell_values, app_name=app_name, sheet_name=sheet_name + ) + + def scroll(self, instruction: str, clicks: int, shift: bool = False): + """Scroll the element in the specified direction + Args: + instruction:str, a very detailed description of which element to enter scroll in. This description should be at least a full sentence. And make it clear and concise. + clicks:int, the number of clicks to scroll can be positive (up) or negative (down). + shift:bool, whether to use shift+scroll for horizontal scrolling + """ + + x, y = self.resize_coordinates(self.coords1) + + if shift: + return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.hscroll({clicks})" + else: + return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.vscroll({clicks})" + + def hotkey(self, keys: List): + """Press a hotkey combination + Args: + keys:List the keys to press in combination in a list format (e.g. ['ctrl', 'c']) + """ + # add quotes around the keys + keys = [f"'{key}'" for key in keys] + return f"import pyautogui; pyautogui.hotkey({', '.join(keys)})" + + def hold_and_press(self, hold_keys: List, press_keys: List): + """Hold a list of keys and press a list of keys + Args: + hold_keys:List, list of keys to hold + press_keys:List, list of keys to press in a sequence + """ + + press_keys_str = "[" + ", ".join([f"'{key}'" for key in press_keys]) + "]" + command = "import pyautogui; " + for k in hold_keys: + command += f"pyautogui.keyDown({repr(k)}); " + command += f"pyautogui.press({press_keys_str}); " + for k in hold_keys: + command += f"pyautogui.keyUp({repr(k)}); " + + return command + + def wait(self, time: float): + """Wait for a specified amount of time + Args: + time:float the amount of time to wait in seconds + """ + return f"""import time; time.sleep({time})""" + + def done( + self, + return_value: Optional[Union[Dict, str, List, Tuple, int, float, bool]] = None, + ): + """End the current task with a success and the required return value""" + self.returned_info = return_value + return """DONE""" + + def fail(self): + """End the current task with a failure, and replan the whole task.""" + return """FAIL""" + + def run_python(self,code): + return code + + def fast_open_terminal(self, *args,**kwargs): + app_or_filename='terminal' + return f"import time; import pyautogui; pyautogui.hotkey('ctrl', 's'); time.sleep(0.5); pyautogui.hotkey('alt', 'f4'); time.sleep(0.5); pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)" + def call_llm_safe(agent): ''' functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L27 @@ -887,9 +1197,9 @@ class GTA1Agent: temperature= 0.0, action_space="pyautogui", observation_type="screenshot", - max_steps=15, + max_steps=100, max_image_history_length = 5, - N_SEQ = 4, + N_SEQ = 8, ): self.platform = platform self.max_tokens = max_tokens @@ -985,6 +1295,7 @@ class GTA1Agent: N = self.N_SEQ + logger.info(f"Executing planning") planner_response = [] for bn in split_to_batches(N, batch_size=8): planner_response_ = self.call_llm({ @@ -1003,6 +1314,7 @@ class GTA1Agent: retry_count = 0 max_retries = 5 while N > 0: + logger.info(f"Executing planning {retry_count}") if retry_count >= max_retries: break @@ -1031,8 +1343,9 @@ class GTA1Agent: valid_responses.extend(valid_responses_) retry_count += 1 - assert len(valid_responses) > int(self.N_SEQ) * 0.5, f"Not enough valid responses generated {len(valid_responses)}" + assert len(valid_responses) > int(self.N_SEQ) * 0.8, f"Not enough valid responses generated {len(valid_responses)}" + logger.info(f"Executing selection") if self.N_SEQ > 1: history_cache = [f"Observation:\n{o}\nThought:\n{t}\nAction:\n{a}" for a,t,o in zip(self.actions, self.thoughts, self.observation_captions)] planner_response = self.select(instruction, Image.open(BytesIO(obs['screenshot'])), valid_responses, history_cache) @@ -1096,7 +1409,7 @@ class GTA1Agent: x = x/W y = y/H return x,y - + logger.info(f"Executing grounding") agent.assign_coordinates(planner_response, obs, request_vllm) plan_code = extract_first_agent_function("\n".join(codes)) @@ -1185,7 +1498,18 @@ class GTA1Agent: return response[0] def isvalid(self,planner_response): + try: + agent.dummy_agent.assign_coordinates(planner_response, {"screenshot": None}) + except: + return False codes = self.parse_code_from_planner_response(planner_response) + try: + test_code = extract_first_agent_function("\n".join(codes)) + test_code = "agent.dummy_agent." + test_code[6:] + eval(test_code) + except Exception as e: + #print("Invalid code:", [test_code], str(e), "!!!") + return False thought = self.parse_thought_from_planner_response(planner_response) observation_caption = self.parse_observation_caption_from_planner_response(planner_response) return bool(codes and thought and observation_caption) @@ -1272,12 +1596,15 @@ class GTA1Agent: raise SystemExit def reset(self, _logger=None): + global logger + logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent") self.thoughts = [] self.action_descriptions = [] self.actions = [] self.observations = [] self.observation_captions = [] + self.current_step = 1