From 0a5058342dfc0dcab4f19ffd6ffc4ce5bc25f06f Mon Sep 17 00:00:00 2001 From: Yan98 <291157173@qq.com> Date: Thu, 10 Jul 2025 00:29:42 +0800 Subject: [PATCH] init (#246) --- mm_agents/gat1_agent.py | 1387 +++++++++++++++++++++++++++++++++++++++ mm_agents/prompts.py | 179 +++++ 2 files changed, 1566 insertions(+) create mode 100644 mm_agents/gat1_agent.py diff --git a/mm_agents/gat1_agent.py b/mm_agents/gat1_agent.py new file mode 100644 index 0000000..4d621fc --- /dev/null +++ b/mm_agents/gat1_agent.py @@ -0,0 +1,1387 @@ +''' +The code is mainly based on: +- Jedi https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/jedi_7b_agent.py +- AgentS2 https://github.com/simular-ai/Agent-S +''' +import base64 +import json +import logging +import os +import re +import time +from io import BytesIO + +import backoff +import openai +import requests +from PIL import Image +from google.api_core.exceptions import ( + InvalidArgument, + ResourceExhausted, + InternalServerError, + BadRequest, +) +from requests.exceptions import SSLError +import os +from mm_agents.prompts import GTA1_PLANNER_SYSTEM_PROMPT, GTA1_GROUNDING_SYSTEM_PROMPT, GTA1_JUDGE_SYSTEM_PROMPT +from mm_agents.img_utils import smart_resize +from pytesseract import Output +import pytesseract +import inspect +import textwrap +import ast +import re +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple, Union +import numpy as np +from openai import OpenAI, APIConnectionError, APIError, RateLimitError +import cv2 + +logger = None + +OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY",None) #"Your OpenAI API Key" +GTA1_API_KEY = os.environ.get("GTA1_API_KEY",None) #"Your GTA1 API Key" +GTA1_MODEL_NMAE = os.environ.get("GTA1_API_KEY",None) #Your served model name +GTA1_SERVICE_URL = os.environ.get("GTA1_SERVICE_URL",None) #"Your GTA1 Service URL" +proxies = None # Your proxies + +def encode_image(image_content): + return base64.b64encode(image_content).decode("utf-8") + + +class LMMEngineOpenAI: + ''' + functions borrow from https://github.com/simular-ai/Agent-S/blob/main/gui_agents/s2/core/engine.py#L247 + ''' + def __init__( + self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs + ): + assert model is not None, "model must be provided" + self.model = model + + api_key = api_key or os.getenv("OPENAI_API_KEY") + if api_key is None: + raise ValueError( + "An API Key needs to be provided in either the api_key parameter or as an environment variable named OPENAI_API_KEY" + ) + + self.base_url = base_url + + self.api_key = api_key + self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit + + if not self.base_url: + self.llm_client = OpenAI(api_key=self.api_key) + else: + self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key) + + @backoff.on_exception( + backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60 + ) + def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs): + """Generate the next message based on previous messages""" + return ( + self.llm_client.chat.completions.create( + model=self.model, + messages=messages, + max_completion_tokens=max_new_tokens if max_new_tokens else 4096, + #temperature=temperature, + **kwargs, + ) + .choices[0] + .message.content + ) + +class LMMAgent: + ''' + functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/core/mllm.py#L16 + ''' + def __init__(self, engine_params=None, system_prompt=None, engine=None): + if engine is None: + if engine_params is not None: + engine_type = engine_params.get("engine_type") + if engine_type == "openai": + self.engine = LMMEngineOpenAI(**engine_params) + else: + raise ValueError("engine_type is not supported") + else: + raise ValueError("engine_params must be provided") + else: + self.engine = engine + + self.messages = [] + + if system_prompt: + self.add_system_prompt(system_prompt) + else: + self.add_system_prompt("You are a helpful assistant.") + + def encode_image(self, image_content): + # if image_content is a path to an image file, check type of the image_content to verify + if isinstance(image_content, str): + with open(image_content, "rb") as image_file: + return base64.b64encode(image_file.read()).decode("utf-8") + else: + return base64.b64encode(image_content).decode("utf-8") + + def reset( + self, + ): + + self.messages = [ + { + "role": "system", + "content": [{"type": "text", "text": self.system_prompt}], + } + ] + + def add_system_prompt(self, system_prompt): + self.system_prompt = system_prompt + if len(self.messages) > 0: + self.messages[0] = { + "role": "system", + "content": [{"type": "text", "text": self.system_prompt}], + } + else: + self.messages.append( + { + "role": "system", + "content": [{"type": "text", "text": self.system_prompt}], + } + ) + + def remove_message_at(self, index): + """Remove a message at a given index""" + if index < len(self.messages): + self.messages.pop(index) + + def replace_message_at( + self, index, text_content, image_content=None, image_detail="high" + ): + """Replace a message at a given index""" + if index < len(self.messages): + self.messages[index] = { + "role": self.messages[index]["role"], + "content": [{"type": "text", "text": text_content}], + } + if image_content: + base64_image = self.encode_image(image_content) + self.messages[index]["content"].append( + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{base64_image}", + "detail": image_detail, + }, + } + ) + + def add_message( + self, + text_content, + image_content=None, + role=None, + image_detail="high", + put_text_last=False, + ): + """Add a new message to the list of messages""" + + # API-style inference from OpenAI and AzureOpenAI + if isinstance( + self.engine, + ( + LMMEngineOpenAI, + ), + ): + # infer role from previous message + if role != "user": + if self.messages[-1]["role"] == "system": + role = "user" + elif self.messages[-1]["role"] == "user": + role = "assistant" + elif self.messages[-1]["role"] == "assistant": + role = "user" + + message = { + "role": role, + "content": [{"type": "text", "text": text_content}], + } + + if isinstance(image_content, np.ndarray) or image_content: + # Check if image_content is a list or a single image + if isinstance(image_content, list): + # If image_content is a list of images, loop through each image + for image in image_content: + base64_image = self.encode_image(image) + message["content"].append( + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{base64_image}", + "detail": image_detail, + }, + } + ) + else: + # If image_content is a single image, handle it directly + base64_image = self.encode_image(image_content) + message["content"].append( + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{base64_image}", + "detail": image_detail, + }, + } + ) + + # Rotate text to be the last message if desired + if put_text_last: + text_content = message["content"].pop(0) + message["content"].append(text_content) + + self.messages.append(message) + else: + raise ValueError("engine_type is not supported") + + def get_response( + self, + user_message=None, + messages=None, + temperature=0.0, + max_new_tokens=None, + **kwargs, + ): + """Generate the next response based on previous messages""" + if messages is None: + messages = self.messages + if user_message: + messages.append( + {"role": "user", "content": [{"type": "text", "text": user_message}]} + ) + + return self.engine.generate( + messages, + temperature=temperature, + max_new_tokens=max_new_tokens, + **kwargs, + ) + +def agent_action(func): + func.is_agent_action = True + return func + + +UBUNTU_APP_SETUP = f"""import subprocess; +import difflib; +import pyautogui; +pyautogui.press('escape'); +time.sleep(0.5); +output = subprocess.check_output(['wmctrl', '-lx']); +output = output.decode('utf-8').splitlines(); +window_titles = [line.split(None, 4)[2] for line in output]; +closest_matches = difflib.get_close_matches('APP_NAME', window_titles, n=1, cutoff=0.1); +if closest_matches: + closest_match = closest_matches[0]; + for line in output: + if closest_match in line: + window_id = line.split()[0] + break; +subprocess.run(['wmctrl', '-ia', window_id]) +subprocess.run(['wmctrl', '-ir', window_id, '-b', 'add,maximized_vert,maximized_horz']) +""" + + +SET_CELL_VALUES_CMD = """import uno +import subprocess + +def identify_document_type(component): + if component.supportsService("com.sun.star.sheet.SpreadsheetDocument"): + return "Calc" + + if component.supportsService("com.sun.star.text.TextDocument"): + return "Writer" + + if component.supportsService("com.sun.star.sheet.PresentationDocument"): + return "Impress" + + return None + +def cell_ref_to_indices(cell_ref): + column_letters = ''.join(filter(str.isalpha, cell_ref)) + row_number = ''.join(filter(str.isdigit, cell_ref)) + + col = sum((ord(char.upper()) - ord('A') + 1) * (26**idx) for idx, char in enumerate(reversed(column_letters))) - 1 + row = int(row_number) - 1 + return col, row + +def set_cell_values(new_cell_values: dict[str, str], app_name: str = "Untitled 1", sheet_name: str = "Sheet1"): + new_cell_values_idx = {{}} + for k, v in new_cell_values.items(): + try: + col, row = cell_ref_to_indices(k) + except: + col = row = None + + if col is not None and row is not None: + new_cell_values_idx[(col, row)] = v + + # Clean up previous TCP connections. + subprocess.run( + 'echo \"password\" | sudo -S ss --kill --tcp state TIME-WAIT sport = :2002', + shell=True, + check=True, + text=True, + capture_output=True + ) + + # Dynamically allow soffice to listen on port 2002. + subprocess.run( + [ + "soffice", + "--accept=socket,host=localhost,port=2002;urp;StarOffice.Service" + ] + ) + + local_context = uno.getComponentContext() + resolver = local_context.ServiceManager.createInstanceWithContext( + "com.sun.star.bridge.UnoUrlResolver", local_context + ) + context = resolver.resolve( + f"uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext" + ) + desktop = context.ServiceManager.createInstanceWithContext( + "com.sun.star.frame.Desktop", context + ) + + # Collect all LibreOffice-related opened windows. + documents = [] + for i, component in enumerate(desktop.Components): + title = component.Title + doc_type = identify_document_type(component) + documents.append((i, component, title, doc_type)) + + # Find the LibreOffice Calc app and the sheet of interest. + spreadsheet = [doc for doc in documents if doc[3] == "Calc"] + selected_spreadsheet = [doc for doc in spreadsheet if doc[2] == app_name] + if spreadsheet: + try: + if selected_spreadsheet: + spreadsheet = selected_spreadsheet[0][1] + else: + spreadsheet = spreadsheet[0][1] + + sheet = spreadsheet.Sheets.getByName(sheet_name) + except: + raise ValueError(f"Could not find sheet {{sheet_name}} in {{app_name}}.") + + for (col, row), value in new_cell_values_idx.items(): + cell = sheet.getCellByPosition(col, row) + + # Set the cell value. + if isinstance(value, (int, float)): + cell.Value = value + elif isinstance(value, str): + if value.startswith("="): + cell.Formula = value + else: + cell.String = value + elif isinstance(value, bool): + cell.Value = 1 if value else 0 + elif value is None: + cell.clearContents(0) + else: + raise ValueError(f"Unsupported cell value type: {{type(value)}}") + + else: + raise ValueError(f"Could not find LibreOffice Calc app corresponding to {{app_name}}.") + +set_cell_values(new_cell_values={cell_values}, app_name="{app_name}", sheet_name="{sheet_name}") +""" + + +class OSWorldACI: + ''' + classes borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/agents/grounding.py#L159 + ''' + PHRASE_TO_WORD_COORDS_PROMPT = textwrap.dedent( + """ + You are an expert in graphical user interfaces. Your task is to process a phrase of text, and identify the most relevant word on the computer screen. + You are provided with a phrase, a table with all the text on the screen, and a screenshot of the computer screen. You will identify the single word id that is best associated with the provided phrase. + This single word must be displayed on the computer screenshot, and its location on the screen should align with the provided phrase. + Each row in the text table provides 2 pieces of data in the following order. 1st is the unique word id. 2nd is the corresponding word. + + To be successful, it is very important to follow all these rules: + 1. First, think step by step and generate your reasoning about which word id to click on. + 2. Then, output the unique word id. Remember, the word id is the 1st number in each row of the text table. + 3. If there are multiple occurrences of the same word, use the surrounding context in the phrase to choose the correct one. Pay very close attention to punctuation and capitalization. + + """ + ) + def __init__( + self, + platform: 'linux', + width: int = 1920, + height: int = 1080, + ): + self.platform = ( + platform # Dictates how the switch_applications agent action works. + ) + + engine_params_for_generation = engine_params = { + "engine_type": 'openai', + "model": 'o3', + "base_url": '', + "api_key": os.environ.get("OPENAI_API_KEY", ""), + } + + # Configure scaling + self.width = width + self.height = height + + # Maintain state for save_to_knowledge + self.notes = [] + + # Coordinates used during ACI execution + self.coords1 = None + self.coords2 = None + + # Configure text grounding agent + self.text_span_agent = LMMAgent( + engine_params=engine_params_for_generation, + system_prompt=self.PHRASE_TO_WORD_COORDS_PROMPT, + ) + + # Given the state and worker's referring expression, use the grounding model to generate (x,y) + def generate_coords(self, ref_expr: str, obs: Dict, request_vllm) -> List[int]: + return request_vllm(image=obs["screenshot"], prompt=ref_expr) + + # Calls pytesseract to generate word level bounding boxes for text grounding + def get_ocr_elements(self, b64_image_data: str) -> Tuple[str, List]: + image = Image.open(BytesIO(b64_image_data)) + image_data = pytesseract.image_to_data(image, output_type=Output.DICT) + + # Clean text by removing leading and trailing spaces and non-alphabetical characters, but keeping punctuation + for i, word in enumerate(image_data["text"]): + image_data["text"][i] = re.sub( + r"^[^a-zA-Z\s.,!?;:\-\+]+|[^a-zA-Z\s.,!?;:\-\+]+$", "", word + ) + ocr_elements = [] + ocr_table = "Text Table:\nWord id\tText\n" + # Obtain the for each valid element + grouping_map = defaultdict(list) + ocr_id = 0 + for i in range(len(image_data["text"])): + block_num = image_data["block_num"][i] + if image_data["text"][i]: + grouping_map[block_num].append(image_data["text"][i]) + ocr_table += f"{ocr_id}\t{image_data['text'][i]}\n" + ocr_elements.append( + { + "id": ocr_id, + "text": image_data["text"][i], + "group_num": block_num, + "word_num": len(grouping_map[block_num]), + "left": image_data["left"][i], + "top": image_data["top"][i], + "width": image_data["width"][i], + "height": image_data["height"][i], + } + ) + ocr_id += 1 + + return ocr_table, ocr_elements + + # Given the state and worker's text phrase, generate the coords of the first/last word in the phrase + def generate_text_coords( + self, phrase: str, obs: Dict, alignment: str = "" + ) -> List[int]: + ocr_table, ocr_elements = self.get_ocr_elements(obs["screenshot"]) + + alignment_prompt = "" + if alignment == "start": + alignment_prompt = "**Important**: Output the word id of the FIRST word in the provided phrase.\n" + elif alignment == "end": + alignment_prompt = "**Important**: Output the word id of the LAST word in the provided phrase.\n" + + # Load LLM prompt + self.text_span_agent.reset() + self.text_span_agent.add_message( + alignment_prompt + "Phrase: " + phrase + "\n" + ocr_table, role="user" + ) + self.text_span_agent.add_message( + "Screenshot:\n", image_content=obs["screenshot"], role="user" + ) + + # Obtain the target element + response = call_llm_safe(self.text_span_agent) + #print("TEXT SPAN AGENT RESPONSE:", response) + numericals = re.findall(r"\d+", response) + if len(numericals) > 0: + text_id = int(numericals[-1]) + else: + text_id = 0 + elem = ocr_elements[text_id] + + # Compute the element coordinates + if alignment == "start": + coords = [elem["left"], elem["top"] + (elem["height"] // 2)] + elif alignment == "end": + coords = [elem["left"] + elem["width"], elem["top"] + (elem["height"] // 2)] + else: + coords = [ + elem["left"] + (elem["width"] // 2), + elem["top"] + (elem["height"] // 2), + ] + return coords + + # Takes a description based action and assigns the coordinates for any coordinate based action + # Raises an error if function can't be parsed + def assign_coordinates(self, plan: str, obs: Dict, request_vllm): + + # Reset coords from previous action generation + self.coords1, self.coords2 = None, None + + try: + # Extract the function name and args + action = parse_single_code_from_string(plan.split("Grounded Action")[-1]) + function_name = re.match(r"(\w+\.\w+)\(", action).group(1) + args = self.parse_function_args(action) + except Exception as e: + raise RuntimeError(f"Error in parsing grounded action: {e}") from e + + # arg0 is a description + if ( + function_name in ["agent.click", "agent.type", "agent.scroll"] + and len(args) >= 1 + and args[0] != None + ): + self.coords1 = self.generate_coords(args[0], obs, request_vllm) + # arg0 and arg1 are descriptions + elif function_name == "agent.drag_and_drop" and len(args) >= 2: + self.coords1 = self.generate_coords(args[0], obs, request_vllm) + self.coords2 = self.generate_coords(args[1], obs, request_vllm) + # arg0 and arg1 are text phrases + elif function_name == "agent.highlight_text_span" and len(args) >= 2: + self.coords1 = self.generate_text_coords(args[0], obs, alignment="start") + self.coords2 = self.generate_text_coords(args[1], obs, alignment="end") + + # Resize from grounding model dim into OSWorld dim (1920 * 1080) + def resize_coordinates(self, coordinates: List[int]) -> List[int]: + return [ + round(coordinates[0] * self.width), + round(coordinates[1] * self.height), + ] + + # Given a generated ACI function, returns a list of argument values, where descriptions are at the front of the list + def parse_function_args(self, function: str) -> List[str]: + tree = ast.parse(function) + call_node = tree.body[0].value + + def safe_eval(node): + if isinstance( + node, ast.Constant + ): # Handles literals like numbers, strings, etc. + return node.value + else: + return ast.unparse(node) # Return as a string if not a literal + + positional_args = [safe_eval(arg) for arg in call_node.args] + keyword_args = {kw.arg: safe_eval(kw.value) for kw in call_node.keywords} + + res = [] + + for key, val in keyword_args.items(): + if "description" in key: + res.append(val) + + for arg in positional_args: + res.append(arg) + + return res + + def click( + self, + instruction: str, + num_clicks: int = 1, + button_type: str = "left", + hold_keys: List = [], + ): + """Click on the element + Args: + instruction:str, decribe the element you want to interact with in detail including the visual description and function description. And make it clear and concise. For example you can describe what the element looks like, and what will be the expected result when you interact with it. + num_clicks:int, number of times to click the element + button_type:str, which mouse button to press can be "left", "middle", or "right" + hold_keys:List, list of keys to hold while clicking + """ + x, y = self.resize_coordinates(self.coords1) + command = "import pyautogui; " + + # TODO: specified duration? + for k in hold_keys: + command += f"pyautogui.keyDown({repr(k)}); " + command += f"""import pyautogui; pyautogui.click({x}, {y}, clicks={num_clicks}, button={repr(button_type)}); """ + for k in hold_keys: + command += f"pyautogui.keyUp({repr(k)}); " + # Return pyautoguicode to click on the element + return command + + def switch_applications(self, app_code): + """Switch to a different application that is already open + Args: + app_code:str the code name of the application to switch to from the provided list of open applications + """ + if self.platform == "darwin": + return f"import pyautogui; import time; pyautogui.hotkey('command', 'space', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)" + elif self.platform == "linux": + return UBUNTU_APP_SETUP.replace("APP_NAME", app_code) + elif self.platform == "windows": + return f"import pyautogui; import time; pyautogui.hotkey('win', 'd', interval=0.5); pyautogui.typewrite({repr(app_code)}); pyautogui.press('enter'); time.sleep(1.0)" + + def open(self, app_or_filename: str): + """Open any application or file with name app_or_filename. Use this action to open applications or files on the desktop, do not open manually. + Args: + app_or_filename:str, the name of the application or filename to open + """ + return f"import pyautogui; pyautogui.hotkey('win'); time.sleep(0.5); pyautogui.write({repr(app_or_filename)}); time.sleep(1.0); pyautogui.hotkey('enter'); time.sleep(0.5)" + + def type( + self, + element_description: Optional[str] = None, + text: str = "", + overwrite: bool = False, + enter: bool = False, + ): + """Type text into a specific element + Args: + element_description:str, a detailed description of which element to enter text in. This description should be at least a full sentence. + text:str, the text to type + overwrite:bool, Assign it to True if the text should overwrite the existing text, otherwise assign it to False. Using this argument clears all text in an element. + enter:bool, Assign it to True if the enter key should be pressed after typing the text, otherwise assign it to False. + """ + + if self.coords1 is not None: + # If a node is found, retrieve its coordinates and size + # Start typing at the center of the element + + x, y = self.resize_coordinates(self.coords1) + + command = "import pyautogui; " + command += f"pyautogui.click({x}, {y}); " + + if overwrite: + command += ( + f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); " + ) + + command += f"pyautogui.write({repr(text)}); " + + if enter: + command += "pyautogui.press('enter'); " + else: + # If no element is found, start typing at the current cursor location + command = "import pyautogui; " + + if overwrite: + command += ( + f"pyautogui.hotkey('ctrl', 'a'); pyautogui.press('backspace'); " + ) + + command += f"pyautogui.write({repr(text)}); " + + if enter: + command += "pyautogui.press('enter'); " + + return command + + def drag_and_drop( + self, starting_description: str, ending_description: str, hold_keys: List = [] + ): + """Drag from the starting description to the ending description + Args: + starting_description:str, a very detailed description of where to start the drag action. This description should be at least a full sentence. And make it clear and concise. + ending_description:str, a very detailed description of where to end the drag action. This description should be at least a full sentence. And make it clear and concise. + hold_keys:List list of keys to hold while dragging + """ + x1, y1 = self.resize_coordinates(self.coords1) + x2, y2 = self.resize_coordinates(self.coords2) + + command = "import pyautogui; " + + command += f"pyautogui.moveTo({x1}, {y1}); " + # TODO: specified duration? + for k in hold_keys: + command += f"pyautogui.keyDown({repr(k)}); " + command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); " + for k in hold_keys: + command += f"pyautogui.keyUp({repr(k)}); " + + # Return pyautoguicode to drag and drop the elements + + return command + + def highlight_text_span(self, starting_phrase: str, ending_phrase: str): + """Highlight a text span between a provided starting phrase and ending phrase. Use this to highlight words, lines, and paragraphs. + Args: + starting_phrase:str, the phrase that denotes the start of the text span you want to highlight. If you only want to highlight one word, just pass in that single word. + ending_phrase:str, the phrase that denotes the end of the text span you want to highlight. If you only want to highlight one word, just pass in that single word. + """ + + x1, y1 = self.coords1 + x2, y2 = self.coords2 + + command = "import pyautogui; " + command += f"pyautogui.moveTo({x1}, {y1}); " + command += f"pyautogui.dragTo({x2}, {y2}, duration=1.); pyautogui.mouseUp(); " + + # Return pyautoguicode to drag and drop the elements + return command + + def set_cell_values( + self, cell_values: Dict[str, Any], app_name: str, sheet_name: str + ): + """Use this to set individual cell values in a spreadsheet. For example, setting A2 to "hello" would be done by passing {"A2": "hello"} as cell_values. The sheet must be opened before this command can be used. + Args: + cell_values: Dict[str, Any], A dictionary of cell values to set in the spreadsheet. The keys are the cell coordinates in the format "A1", "B2", etc. + Supported value types include: float, int, string, bool, formulas. + app_name: str, The name of the spreadsheet application. For example, "Some_sheet.xlsx". + sheet_name: str, The name of the sheet in the spreadsheet. For example, "Sheet1". + """ + return SET_CELL_VALUES_CMD.format( + cell_values=cell_values, app_name=app_name, sheet_name=sheet_name + ) + + def scroll(self, instruction: str, clicks: int, shift: bool = False): + """Scroll the element in the specified direction + Args: + instruction:str, a very detailed description of which element to enter scroll in. This description should be at least a full sentence. And make it clear and concise. + clicks:int, the number of clicks to scroll can be positive (up) or negative (down). + shift:bool, whether to use shift+scroll for horizontal scrolling + """ + + x, y = self.resize_coordinates(self.coords1) + + if shift: + return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.hscroll({clicks})" + else: + return f"import pyautogui; import time; pyautogui.moveTo({x}, {y}); time.sleep(0.5); pyautogui.vscroll({clicks})" + + def hotkey(self, keys: List): + """Press a hotkey combination + Args: + keys:List the keys to press in combination in a list format (e.g. ['ctrl', 'c']) + """ + # add quotes around the keys + keys = [f"'{key}'" for key in keys] + return f"import pyautogui; pyautogui.hotkey({', '.join(keys)})" + + def hold_and_press(self, hold_keys: List, press_keys: List): + """Hold a list of keys and press a list of keys + Args: + hold_keys:List, list of keys to hold + press_keys:List, list of keys to press in a sequence + """ + + press_keys_str = "[" + ", ".join([f"'{key}'" for key in press_keys]) + "]" + command = "import pyautogui; " + for k in hold_keys: + command += f"pyautogui.keyDown({repr(k)}); " + command += f"pyautogui.press({press_keys_str}); " + for k in hold_keys: + command += f"pyautogui.keyUp({repr(k)}); " + + return command + + def wait(self, time: float): + """Wait for a specified amount of time + Args: + time:float the amount of time to wait in seconds + """ + return f"""import time; time.sleep({time})""" + + def done( + self, + return_value: Optional[Union[Dict, str, List, Tuple, int, float, bool]] = None, + ): + """End the current task with a success and the required return value""" + self.returned_info = return_value + return """DONE""" + + def fail(self): + """End the current task with a failure, and replan the whole task.""" + return """FAIL""" + +def call_llm_safe(agent): + ''' + functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L27 + ''' + # Retry if fails + max_retries = 3 # Set the maximum number of retries + attempt = 0 + response = "" + while attempt < max_retries: + try: + response = agent.get_response() + break # If successful, break out of the loop + except Exception as e: + attempt += 1 + print(f"Attempt {attempt} failed: {e}") + if attempt == max_retries: + print("Max retries reached. Handling failure.") + time.sleep(1.0) + return response + +def parse_single_code_from_string(input_string): + ''' + functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L129 + ''' + input_string = input_string.strip() + if input_string.strip() in ["WAIT", "DONE", "FAIL"]: + return input_string.strip() + + # This regular expression will match both ```code``` and ```python code``` + # and capture the `code` part. It uses a non-greedy match for the content inside. + pattern = r"```(?:\w+\s+)?(.*?)```" + # Find all non-overlapping matches in the string + matches = re.findall(pattern, input_string, re.DOTALL) + + # The regex above captures the content inside the triple backticks. + # The `re.DOTALL` flag allows the dot `.` to match newline characters as well, + # so the code inside backticks can span multiple lines. + + # matches now contains all the captured code snippets + + codes = [] + + for match in matches: + match = match.strip() + commands = [ + "WAIT", + "DONE", + "FAIL", + ] # fixme: updates this part when we have more commands + + if match in commands: + codes.append(match.strip()) + elif match.split("\n")[-1] in commands: + if len(match.split("\n")) > 1: + codes.append("\n".join(match.split("\n")[:-1])) + codes.append(match.split("\n")[-1]) + else: + codes.append(match) + + return codes[0] + +agent = OSWorldACI('linux') + +class GTA1Agent: + ''' + class based on https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/jedi_7b_agent.py + ''' + def __init__( + self, + platform="ubuntu", + planner_model="o3", + max_tokens=4096, + top_p=0.9, + temperature= 0.0, + action_space="pyautogui", + observation_type="screenshot", + max_steps=15, + max_image_history_length = 5, + N_SEQ = 4, + ): + self.platform = platform + self.max_tokens = max_tokens + self.top_p = top_p + self.temperature = temperature + self.action_space = action_space + self.observation_type = observation_type + assert action_space in ["pyautogui"], "Invalid action space" + assert observation_type in ["screenshot"], "Invalid observation type" + self.thoughts = [] + self.actions = [] + self.observations = [] + self.observation_captions = [] + self.max_steps = max_steps + self.planner_model=planner_model + self.current_step = 1 + self.max_image_history_length = max_image_history_length + self.N_SEQ=N_SEQ + + def predict(self, instruction: str, obs: Dict) -> List: + """ + Predict the next action(s) based on the current observation. + """ + + user_prompt = ( + f"""Please generate the next move according to the UI screenshot and instruction. And you can refer to the previous actions and observations for reflection.\n\nInstruction: {instruction}\n\n""") + + system_prompt = GTA1_PLANNER_SYSTEM_PROMPT + + messages = [{ + "role": "system", + "content": [{ + "type": "text", + "text": system_prompt.replace("{current_step}", str(self.current_step)).replace("{max_steps}", str(self.max_steps)) + }] + }] + + # Determine which observations to include images for (only most recent ones) + obs_start_idx = max(0, len(self.observations) - self.max_image_history_length) + + # Add all thought and action history + for i in range(len(self.thoughts)): + # For recent steps, include the actual screenshot + if i >= obs_start_idx: + messages.append({ + "role": "user", + "content": [{ + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{encode_image(self.observations[i]['screenshot'])}", + "detail": "high" + }, + }] + }) + + messages.append({ + "role": "user", + "content": [{ + "type": "text", + "text": f"Step {i+1} Observation:\n{self.observation_captions[i]}\n" + }] + }) + + thought_messages = f"Step {i+1} Thought:\n{self.thoughts[i]}" + + action_messages = f"Step {i+1} Action:" + for action in self.actions[i]: + action_messages += f"\n{action}" + messages.append({ + "role": "assistant", + "content": [{ + "type": "text", + "text": thought_messages + "\n" + action_messages + }] + }) + + messages.append({ + "role":"user", + "content": [ + { + "type":"image_url", + "image_url":{ + "url":f"data:image/png;base64,{encode_image(obs['screenshot'])}", + "detail": "high" + }, + }, + { + "type": "text", + "text": user_prompt + }, + ], + }) + + N = self.N_SEQ + + planner_response = [] + for bn in split_to_batches(N, batch_size=8): + planner_response_ = self.call_llm({ + "model": self.planner_model, + "messages": messages, + "n": bn, + "max_completion_tokens": self.max_tokens, + }, self.planner_model) + planner_response.extend(planner_response_) + + valid_responses = [response for response in planner_response if self.isvalid(response)] + N = N - len(valid_responses) + planner_response = [response for response in planner_response if not self.isvalid(response)] + if planner_response: + planner_response = planner_response[0] + retry_count = 0 + max_retries = 5 + while N > 0: + if retry_count >= max_retries: + break + + messages.append({ + "role": "user", + "content": [ + {"type": "text", "text": """You didn't generate a valid "Observation:\n(.*?)\n" section, a valid "Thought:\n(.*?)\n" section, or valid actions. Please try again."""} #"You didn't generate valid actions. Please try again."} + ] + }) + + planner_response = [] + for bn in split_to_batches(N, batch_size=8): + planner_response_ = self.call_llm({ + "model": self.planner_model, + "messages": messages, + "n": bn, + "max_completion_tokens": self.max_tokens * 4, + }, self.planner_model) + planner_response.extend(planner_response_) + + valid_responses_ = [response for response in planner_response if self.isvalid(response)] + N = N - len(valid_responses_) + planner_response = [response for response in planner_response if not self.isvalid(response)] + if planner_response: + planner_response = planner_response[0] + valid_responses.extend(valid_responses_) + retry_count += 1 + + assert len(valid_responses) > int(self.N_SEQ) * 0.5, f"Not enough valid responses generated {len(valid_responses)}" + + if self.N_SEQ > 1: + history_cache = [f"Observation:\n{o}\nThought:\n{t}\nAction:\n{a}" for a,t,o in zip(self.actions, self.thoughts, self.observations)] + planner_response = self.select(instruction, Image.open(BytesIO(obs['screenshot'])), valid_responses, history_cache) + else: + planner_response = valid_responses[0] + codes = self.parse_code_from_planner_response(planner_response) + + thought = self.parse_thought_from_planner_response(planner_response) + observation_caption = self.parse_observation_caption_from_planner_response(planner_response) + + def request_vllm(image, prompt): + if isinstance(image, bytes): + image = np.array(Image.open(BytesIO(image)).convert('RGB')) + H, W, C = image.shape + H, W = smart_resize( + H, + W, + factor=28, + min_pixels=1000, + max_pixels=1000000000000, + ) + assert C == 3 + if isinstance(image, np.ndarray): + image_base64 = encode_numpy_image_to_base64(image) + elif isinstance(image, bytes): + image_base64 = encode_image_bytes(image) + else: + raise ValueError(f"Invalid image type: {type(image)}") + messages=[ + {"role": "system", "content": GTA1_GROUNDING_SYSTEM_PROMPT.format(height=H, width=W)}, + { + "role": + "user", + "content": [ + { + "type": "text", + "text": prompt + }, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{image_base64}" + }, + }, + ], + }] + vllm_client = OpenAI( + base_url=GTA1_SERVICE_URL, + api_key=GTA1_API_KEY, + ) + response = vllm_client.chat.completions.create( + model=GTA1_MODEL_NMAE, + messages=messages, + max_tokens=100, + temperature=0, + n=1 + ) + result = response.choices[0].message.content + matches = re.findall(r"\((-?\d*\.?\d+),\s*(-?\d*\.?\d+)\)", result) + x,y = [tuple(map(int, match)) for match in matches][0] + x = x/W + y = y/H + return x,y + + agent.assign_coordinates(planner_response, obs, request_vllm) + + plan_code = extract_first_agent_function("\n".join(codes)) + pyautogui_actions = [eval(plan_code)] + + plan_code = [plan_code] + self.actions.append([plan_code]) + self.observations.append(obs) + self.thoughts.append(thought) + self.observation_captions.append(observation_caption) + self.current_step += 1 + + if self.current_step >= self.max_steps: + pyautogui_actions = ["FAIL"] + + return planner_response, pyautogui_actions + + def select(self, instruction, screenshot, response, history_cache): + height, width = screenshot.height, screenshot.width + height, width = smart_resize( + height, + width, + factor=28, + min_pixels=1000, + max_pixels=1000000000000, + ) + image = screenshot.resize((height, width)) + + system_promt = GTA1_JUDGE_SYSTEM_PROMPT.format(N_PLANNING=len(response), N_INDEX=len(response)-1,width=width,height=height) + lines = [ + f"The goal of the task is:\n{instruction}", + ] + if len(history_cache) == 0: + history_cache = ["No history available. The action just started"] + + lines = [ + f"The goal of the task is:\n{instruction}", + "Here are the past history:" + ] + lines += [ + f"### Past step {idx}:\n{step}" + for idx, step in enumerate(history_cache) + ] + + lines += ["Here are the different plans to compare:"] + lines += [ + f"### Index {idx}:\n{plan}" + for idx, plan in enumerate(response) + ] + user_message = "\n".join(lines) + + + messages = [ + { + "role": "system", + "content": [{"type": "text", "text": system_promt}] + }, + { + "role": "user", + "content": [{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{pil_to_base64(image)}"}}, {"type": "text", "text": user_message}] + } + ] + url = "https://api.openai.com/v1/chat/completions" + + headers = {"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}", "Content-Type":"application/json"} + payload = { + "model": "o3", + "messages": messages, + "max_completion_tokens": 4096 * 4, + } + + wait = 1 + for _ in range(10): + try: + prediction = requests.post(url, headers=headers, json=payload, proxies=proxies, timeout=180) + if prediction.status_code != 200: + continue + prediction = prediction.json()['choices'][0]['message']['content'] + prediction = extract_answer_from_response(prediction) + return response[prediction['index']] + except: + time.sleep(wait) + wait *=2 + wait = min(wait,32) + continue + return response[0] + + def isvalid(self,planner_response): + codes = self.parse_code_from_planner_response(planner_response) + thought = self.parse_thought_from_planner_response(planner_response) + observation_caption = self.parse_observation_caption_from_planner_response(planner_response) + return bool(codes and thought and observation_caption) + + def parse_code_from_planner_response(self, input_string: str) -> List[str]: + + input_string = "\n".join([line.strip() for line in input_string.split(';') if line.strip()]) + + pattern = r"```(?:\w+\s+)?(.*?)```" + matches = re.findall(pattern, input_string, re.DOTALL) + codes = [] + + for match in matches: + match = match.strip() + codes.append(match) + return codes + + def unsetonestep(self): + self.actions = self.actions[:-1] + self.observations = self.actions[:-1] + self.thoughts.append = self.actions[:-1] + self.observation_captions = self.actions[:-1] + self.current_step -= 1 + + def parse_observation_caption_from_planner_response(self, input_string: str) -> str: + pattern = r"Observation:\n(.*?)\n" + matches = re.findall(pattern, input_string, re.DOTALL) + if matches: + return matches[0].strip() + return "" + + def parse_thought_from_planner_response(self, input_string: str) -> str: + pattern = r"Thought:\n(.*?)\n" + matches = re.findall(pattern, input_string, re.DOTALL) + if matches: + return matches[0].strip() + return "" + + @backoff.on_exception( + backoff.constant, + # here you should add more model exceptions as you want, + # but you are forbidden to add "Exception", that is, a common type of exception + # because we want to catch this kind of Exception in the outside to ensure + # each example won't exceed the time limit + ( + # General exceptions + SSLError, + # OpenAI exceptions + openai.RateLimitError, + openai.BadRequestError, + openai.InternalServerError, + # Google exceptions + InvalidArgument, + ResourceExhausted, + InternalServerError, + BadRequest, + # Groq exceptions + # todo: check + ), + interval=30, + max_tries=10, + ) + def call_llm(self, payload, model): + if model.startswith("gpt") or "o3" in model: + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}" + } + response = requests.post( + "https://api.openai.com/v1/chat/completions", + headers=headers, + proxies=proxies, + json=payload, + ) + #print(response.status_code,"!!!") + #print(response.json(),"!!!") + if response.status_code != 200: + time.sleep(5) + return "" + else: + response = response.json() + return [response["choices"][i]["message"]["content"] for i in range(len(response["choices"]))] + else: + raise SystemExit + + def reset(self, _logger=None): + + self.thoughts = [] + self.action_descriptions = [] + self.actions = [] + self.observations = [] + self.observation_captions = [] + + + +def extract_first_agent_function(code_string): + ''' + functions borrow from https://github.com/simular-ai/Agent-S/blob/a0c5c9bf0c526119b1f023c8948563c780729428/gui_agents/s2/utils/common_utils.py#L189 + ''' + # Regular expression pattern to match 'agent' functions with any arguments, including nested parentheses + pattern = r'agent\.[a-zA-Z_]+\((?:[^()\'"]|\'[^\']*\'|"[^"]*")*\)' + + # Find all matches in the string + matches = re.findall(pattern, code_string) + + # Return the first match if found, otherwise return None + return matches[0] if matches else None + +def split_to_batches(n, batch_size=8): + batches = [batch_size] * (n // batch_size) + remainder = n % batch_size + if remainder: + batches.append(remainder) + return batches + +def extract_answer_from_response(response): + if not response or not isinstance(response, str): + raise ValueError("Response must be a non-empty string") + json_pattern = r'```json\s*(.*?)\s*```' + json_match = re.search(json_pattern, response, re.DOTALL) + + if json_match: + json_str = json_match.group(1) + try: + answer = json.loads(json_str) + if "explaining" in answer and "index" in answer: + answer["index"] = int(answer["index"]) + return answer + else: + raise ValueError("JSON missing required fields 'explaining' or 'index'") + + except json.JSONDecodeError: + pass + + direct_json_pattern = r'\{[\s\S]*?"explaining"[\s\S]*?"index"[\s\S]*?\}' + direct_match = re.search(direct_json_pattern, response) + + if direct_match: + try: + json_str = direct_match.group(0) + json_str = json_str.replace(''', "'").replace(''', "'").replace('"', '"').replace('"', '"') + answer = json.loads(json_str) + answer["index"] = int(answer["index"]) + return answer + except json.JSONDecodeError: + pass + index_pattern = r'"index"\s*:\s*(\d+)' + index_match = re.search(index_pattern, response) + + explaining_pattern = r'"explaining"\s*:\s*"(.*?)"(?=,|\s*})' + explaining_match = re.search(explaining_pattern, response, re.DOTALL) + + if not explaining_match: + explaining_pattern = r'"explaining"\s*:\s*(.*?)(?=,\s*"index"|\s*})' + explaining_match = re.search(explaining_pattern, response, re.DOTALL) + + if index_match and explaining_match: + return { + "index": int(index_match.group(1)), + "explaining": explaining_match.group(1).strip('" \t\n') + } + if index_match: + return { + "index": int(index_match.group(1)), + "explaining": "Explanation not found in response" + } + raise ValueError("Could not extract valid answer from response") + + +def pil_to_base64(image): + ''' + function borrow from https://github.com/xlang-ai/OSWorld/blob/7d0ad02706a7fe742fa1ad6a483782835e3d51e6/mm_agents/uitars_agent.py#L486 + ''' + buffer = BytesIO() + image.save(buffer, format="PNG") + return base64.b64encode(buffer.getvalue()).decode("utf-8") + +def encode_numpy_image_to_base64(image: np.ndarray) -> str: + """Converts a numpy array image to base64 string. + + Args: + image: Numpy array representing an image (height, width, channels) + + Returns: + Base64 encoded string of the image + """ + # Convert numpy array to bytes + success, buffer = cv2.imencode('.png', image) + if not success: + raise ValueError("Failed to encode image to png format") + + # Convert bytes to base64 string + image_bytes = buffer.tobytes() + base64_string = base64.b64encode(image_bytes).decode('utf-8') + + return base64_string + +def encode_image_bytes(image_content): + return base64.b64encode(image_content).decode('utf-8') \ No newline at end of file diff --git a/mm_agents/prompts.py b/mm_agents/prompts.py index e1e53a8..0055975 100644 --- a/mm_agents/prompts.py +++ b/mm_agents/prompts.py @@ -1338,3 +1338,182 @@ Here are some guidelines for you: My computer's password is 'password', feel free to use it when you need sudo rights. First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR NEVER EVER RETURN ME ANYTHING ELSE. """ + +GTA1_PLANNER_SYSTEM_PROMPT = """You are an agent which follow my instruction and perform desktop computer tasks as instructed. +You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard. +You are on Ubuntu operating system and the resolution of the screen is 1920x1080. +For each step, you will get: +- An observation of an image, which is the screenshot of the computer screen and you will predict the action of the computer based on the image. +- Access to the following class and methods to interact with the UI: +class Agent: + + def click(self, instruction: str, num_clicks: int = 1, button_type: str = 'left', hold_keys: List = []): + '''Click on the element + Args: + instruction:str, decribe the element you want to interact with in detail including the visual description and function description. And make it clear and concise. For example you can describe what the element looks like, and what will be the expected result when you interact with it. + num_clicks:int, number of times to click the element + button_type:str, which mouse button to press can be "left", "middle", or "right" + hold_keys:List, list of keys to hold while clicking + ''' + + def done(self, return_value: Union[Dict, str, List, Tuple, int, float, bool, NoneType] = None): + '''End the current task with a success and the required return value''' + + def drag_and_drop(self, starting_description: str, ending_description: str, hold_keys: List = []): + '''Drag from the starting description to the ending description + Args: + starting_description:str, a very detailed description of where to start the drag action. This description should be at least a full sentence. And make it clear and concise. + ending_description:str, a very detailed description of where to end the drag action. This description should be at least a full sentence. And make it clear and concise. + hold_keys:List list of keys to hold while dragging + ''' + + def fail(self): + '''End the current task with a failure, and replan the whole task.''' + + def highlight_text_span(self, starting_phrase: str, ending_phrase: str): + '''Highlight a text span between a provided starting phrase and ending phrase. Use this to highlight words, lines, and paragraphs. + Args: + starting_phrase:str, the phrase that denotes the start of the text span you want to highlight. If you only want to highlight one word, just pass in that single word. + ending_phrase:str, the phrase that denotes the end of the text span you want to highlight. If you only want to highlight one word, just pass in that single word. + ''' + + def hold_and_press(self, hold_keys: List, press_keys: List): + '''Hold a list of keys and press a list of keys + Args: + hold_keys:List, list of keys to hold + press_keys:List, list of keys to press in a sequence + ''' + + def hotkey(self, keys: List): + '''Press a hotkey combination + Args: + keys:List the keys to press in combination in a list format (e.g. ['ctrl', 'c']) + ''' + + def open(self, app_or_filename: str): + '''Open any application or file with name app_or_filename. Use this action to open applications or files on the desktop, do not open manually. + Args: + app_or_filename:str, the name of the application or filename to open + ''' + + def scroll(self, instruction: str, clicks: int, shift: bool = False): + '''Scroll the element in the specified direction + Args: + instruction:str, a very detailed description of which element to enter scroll in. This description should be at least a full sentence. And make it clear and concise. + clicks:int, the number of clicks to scroll can be positive (up) or negative (down). + shift:bool, whether to use shift+scroll for horizontal scrolling + ''' + + def set_cell_values(self, cell_values: Dict[str, Any], app_name: str, sheet_name: str): + '''Use this to set individual cell values in a spreadsheet. For example, setting A2 to "hello" would be done by passing {"A2": "hello"} as cell_values. The sheet must be opened before this command can be used. + Args: + cell_values: Dict[str, Any], A dictionary of cell values to set in the spreadsheet. The keys are the cell coordinates in the format "A1", "B2", etc. + Supported value types include: float, int, string, bool, formulas. + app_name: str, The name of the spreadsheet application. For example, "Some_sheet.xlsx". + sheet_name: str, The name of the sheet in the spreadsheet. For example, "Sheet1". + ''' + + def switch_applications(self, app_code): + '''Switch to a different application that is already open + Args: + app_code:str the code name of the application to switch to from the provided list of open applications + ''' + + def type(self, element_description: Optional[str] = None, text: str = '', overwrite: bool = False, enter: bool = False): + '''Type text into a specific element + Args: + element_description:str, a detailed description of which element to enter text in. This description should be at least a full sentence. + text:str, the text to type + overwrite:bool, Assign it to True if the text should overwrite the existing text, otherwise assign it to False. Using this argument clears all text in an element. + enter:bool, Assign it to True if the enter key should be pressed after typing the text, otherwise assign it to False. + ''' + + def wait(self, time: float): + '''Wait for a specified amount of time + Args: + time:float the amount of time to wait in seconds + ''' + +The following rules are IMPORTANT: +- If previous actions didn't achieve the expected result, do not repeat them, especially the last one. Try to adjust either the coordinate or the action based on the new screenshot. +- Do not predict multiple clicks at once. Base each action on the current screenshot; do not predict actions for elements or events not yet visible in the screenshot. +- You cannot complete the task by outputting text content in your response. You must use mouse and keyboard to interact with the computer. Call ```agent.fail()``` function when you think the task can not be done. +- You must use only the available methods provided above to interact with the UI, do not invent new methods. + +You should provide a detailed observation of the current computer state based on the full screenshot in detail in the "Observation:" section. +Provide any information that is possibly relevant to achieving the task goal and any elements that may affect the task execution, such as pop-ups, notifications, error messages, loading states, etc.. +You MUST return the observation before the thought. + +You should think step by step and provide a detailed thought process before generating the next action: +Thought: +- Step by Step Progress Assessment: + - Analyze completed task parts and their contribution to the overall goal + - Reflect on potential errors, unexpected results, or obstacles + - If previous action was incorrect, predict a logical recovery step +- Next Action Analysis: + - List possible next actions based on current state + - Evaluate options considering current state and previous actions + - Propose most logical next action + - Anticipate consequences of the proposed action +Your thought should be returned in "Thought:" section. You MUST return the thought before the code. + +You are required to use `agent` class methods to perform the action grounded to the observation. +Return exactly ONE line of python code to perform the action each time. At each step (example: ```agent.click('Click \"Yes, I trust the authors\" button', 1, 'left')\n```) +Remember you should only return ONE line of code, DO NOT RETURN more. You should return the code inside a code block, like this: +```python +agent.click('Click \"Yes, I trust the authors\" button', 1, "left") +``` + +For your reference, you have maximum of 100 steps, and current step is {current_step} out of {max_steps}. +If you are in the last step, you should return ```agent.done()``` or ```agent.fail()``` according to the result. + +Here are some guidelines for you: +1. Remember to generate the corresponding instruction to the code before a # in a comment and only return ONE line of code. +2. `agent.click` can have multiple clicks. For example, agent.click('Click \"Yes, I trust the authors\" button', 2, "left") is double click. +3. Return ```agent.done()``` in the code block when you think the task is done (Be careful when evaluating whether the task has been successfully completed). Return ```agent.fail()``` in the code block when you think the task can not be done. +4. Whenever possible, your grounded action should use hot-keys with the agent.hotkey() action instead of clicking or dragging. +5. Save modified files before returning ```agent.done()```. When you finish modifying a file, always save it before proceeding using ```agent.hotkey(['ctrl', 's'])``` or equivalent. Tasks may involve multiple files. Save each after finishing modification. +6. If you meet "Authentication required" prompt, you can continue to click "Cancel" to close it. + +My computer's password is 'password', feel free to use it when you need sudo rights. +First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE I ASKED FOR NEVER EVER RETURN ME ANYTHING ELSE.""" + +GTA1_GROUNDING_SYSTEM_PROMPT = ''' +You are an expert UI element locator. Given a GUI image and a user's element description, provide the coordinates of the specified element as a single (x,y) point. The image resolution is height {height} and width {width}. For elements with area, return the center point. + +Output the coordinate pair exactly: +(x,y) +'''.strip() + +GTA1_JUDGE_SYSTEM_PROMPT=''' +You are an expert at evaluating the planning and reasoning of UI agents working toward achieving a goal. + +My computer's password is 'password', feel free to use it when you need sudo rights or login. + +Each time, I will provide you with: +- The current screenshot of the UI of width {width} and height {height} +- The goal of the task +- Past histories of planning and actions that have been taken +- A list of {N_PLANNING} different planning approaches toward achieving the goal in the current state in this form: + Observation: + Thought: + Action: + +Your task is to select the single most effective planning approach that best advances toward the goal. +Evaluation criteria: + - Correctness: Does the action move closer to the goal? + - Effectiveness: Does it make meaningful progress immediately? + - Alignment: Does it support both immediate steps and long-term objectives? + - Planning quality: Is the thought process clear, concise, and logical? + - Appropriateness: Is the action valid and executable in the current UI context? + +Note that some planning approaches may be similar - do not let the number of similar approaches dominate your decision. Evaluate each planning on its own merits. + +Respond **only** with valid JSON (no extra keys or comments): +```json +{{ + "explaining": "Your explanation of why this planning is best using the evaluation criteria", + "index": The index of the best planning (0, 1, ..., {N_INDEX}) +}} +``` +'''.strip()