"""Two-stage (planner + executor) GUI agent built around an Aguvis model.

A planner model (GPT by default) turns the task and the current screenshot
into a natural-language atomic action; the Aguvis executor model grounds that
action into pyautogui code whose coordinates are relative (0..1).  The helpers
in this module rescale those coordinates to the logical screen size.
"""
import ast
import base64
import json
import logging
import os
import re
import tempfile
import time
from http import HTTPStatus
from io import BytesIO
from typing import Dict, List, Optional, Tuple

import backoff
import openai
import requests
from PIL import Image
from google.api_core.exceptions import InvalidArgument, ResourceExhausted, InternalServerError, BadRequest
from requests.exceptions import SSLError

logger = logging.getLogger("desktopenv.aguvis_agent")

_PNG_DATA_URL_PREFIX = "data:image/png;base64,"


# Function to encode the image
def encode_image(image_content):
    """Return the base64 text (UTF-8 decoded) of the raw bytes *image_content*."""
    return base64.b64encode(image_content).decode('utf-8')


def encoded_img_to_pil_img(data_str):
    """Decode a base64 PNG string (optionally a ``data:`` URL) into a PIL image."""
    base64_str = data_str.replace(_PNG_DATA_URL_PREFIX, "")
    image_data = base64.b64decode(base64_str)
    return Image.open(BytesIO(image_data))


def save_to_tmp_img_file(data_str):
    """Decode a base64 PNG string and save it as ``tmp_img.png`` in a new temp dir.

    Returns the path of the written file.  The directory comes from
    ``tempfile.mkdtemp()`` and is not removed here; cleanup is left to the caller.
    """
    image = encoded_img_to_pil_img(data_str)
    tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
    image.save(tmp_img_path)
    return tmp_img_path


# TODO: hardcoded screen size, need to be fixed
SCREEN_LOGIC_SIZE = (1280, 800)

wait_func = {"name": "WAIT", "description": "wait for a moment"}
done_func = {"name": "DONE", "description": "done with the task"}
fail_func = {"name": "FAIL", "description": "fail to complete the task"}

SYS_PROMPT = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.
"""

# TODO: let GPT not to predict non-atomic actions,
# NOTE: this must be an f-string, otherwise the three ``json.dumps(...)``
# placeholders below are sent to the planner verbatim instead of being rendered.
PLANNER_OUTPUT_FORMAT_SYS_PROMPT = f"""Your response should be formatted as follows:
Thought: *Describe your understanding of the current situation and consider what you need to do next.*
Action: *State the specific action you have decided to perform, described in natural language.*

**Note:** Please **do not** predict non-atomic actions.
For example, for multi-step operations like "click then select the date," only predict the first atomic action (e.g., "click") at this time, and leave subsequent steps (like click for selecting the date) for the next planning phase.

**Example:**
Thought: To proceed with booking a hotel, I must first specify the check-in and check-out dates for the stay. Since the objective is to book a three-night stay starting from the 1st of June, I need to input these dates into the form to find available accommodations.
Action: Click on the "Choose date" button in the Check-in field to start selecting the stay dates.

Addtionally, you can use the following functions:
- {json.dumps(wait_func)}
- {json.dumps(done_func)}
- {json.dumps(fail_func)}

**Example 1:**
Thought: I need to wait for a moment before proceeding.
Action: WAIT

**Example 2:**
Thought: I have completed the task.
Action: DONE
"""

INSTRUCTION_PROMPT = """Please generate the next move according to the UI screenshot, instruction and previous actions.

Instruction: {instruction}
"""

ACTION_PROMPT = """Previous actions:
"""

# Matches one whole ``pyautogui.<name>(...)`` call.  The argument list may not
# contain ")", so calls with nested parentheses are not matched in full.
_PYAUTOGUI_CALL_RE = re.compile(r'(pyautogui\.\w+\([^\)]*\))')
# Splits a matched call into its dotted name and its raw argument string.
_PYAUTOGUI_NAME_ARGS_RE = re.compile(r'(pyautogui\.\w+)\((.*)\)', re.DOTALL)

# Positional parameter order of the pyautogui functions whose coordinates are rescaled.
_PYAUTOGUI_PARAMETERS = {
    'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
    'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
    'moveRel': ['xOffset', 'yOffset', 'duration', 'tween', 'pause'],
    'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
    'dragRel': ['xOffset', 'yOffset', 'duration', 'button', 'mouseDownUp', 'pause'],
    'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
}


def _pyautogui_code_to_absolute_coordinates(pyautogui_code_relative_coordinates, logical_screen_size=SCREEN_LOGIC_SIZE):
    """
    Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.

    Args:
        pyautogui_code_relative_coordinates: source text containing
            ``pyautogui.<func>(...)`` calls whose ``x``/``y``/``xOffset``/``yOffset``
            arguments are fractions of the screen size.
        logical_screen_size: ``(width, height)`` used as the scaling factors.

    Returns:
        The code with every rescalable call rewritten; ``x``/``xOffset`` are
        multiplied by the width and ``y``/``yOffset`` by the height, then rounded
        to ``int``.  Calls that cannot be parsed, whose arguments are not Python
        literals, or that carry no coordinate argument are left untouched.
    """
    width, height = logical_screen_size
    scale_by_param = (('x', width), ('y', height), ('xOffset', width), ('yOffset', height))

    new_code = pyautogui_code_relative_coordinates

    for full_call in _PYAUTOGUI_CALL_RE.findall(pyautogui_code_relative_coordinates):
        func_match = _PYAUTOGUI_NAME_ARGS_RE.match(full_call)
        if not func_match:
            continue
        func_name, args_str = func_match.groups()

        # Parse the argument list through a dummy call expression.
        try:
            call_node = ast.parse(f"func({args_str})").body[0].value
        except SyntaxError:
            continue

        param_names = _PYAUTOGUI_PARAMETERS.get(func_name.split('.')[-1], [])

        # Bind positional arguments to their parameter names (extra positionals
        # beyond the known parameter list are ignored), then overlay keywords.
        try:
            args = {
                param_name: ast.literal_eval(node)
                for param_name, node in zip(param_names, call_node.args)
            }
            for kw in call_node.keywords:
                args[kw.arg] = ast.literal_eval(kw.value)
        except (ValueError, TypeError, SyntaxError):
            # A non-literal argument (e.g. a variable name) cannot be evaluated
            # safely; leave this call exactly as the model wrote it.
            continue

        updated = False
        for param_name, extent in scale_by_param:
            if param_name not in args:
                continue
            try:
                args[param_name] = int(round(float(args[param_name]) * extent))
            except (TypeError, ValueError):
                # Not a number (e.g. None or a non-numeric string): keep as is.
                continue
            updated = True

        if not updated:
            continue

        # Emit the leading run of known parameters positionally ...
        reconstructed_args = []
        for param_name in param_names:
            if param_name not in args:
                break
            # repr() quotes/escapes strings correctly and matches str() for numbers.
            reconstructed_args.append(repr(args[param_name]))

        # ... and every remaining keyword argument as ``name=value``.
        used_params = set(param_names[:len(reconstructed_args)])
        for kw in call_node.keywords:
            if kw.arg not in used_params:
                reconstructed_args.append(f"{kw.arg}={args[kw.arg]!r}")

        new_full_call = f"{func_name}({', '.join(reconstructed_args)})"
        new_code = new_code.replace(full_call, new_full_call)

    return new_code


def _parse(text, screen_logic_size=SCREEN_LOGIC_SIZE):
    """Turn a raw executor response into a single executable action.

    Args:
        text: the executor model's response text.
        screen_logic_size: ``(width, height)`` used to rescale relative coordinates.

    Returns:
        ``"WAIT"``, ``"DONE"`` or ``"FAIL"`` when the response starts with that
        keyword (case-insensitive); otherwise the pyautogui code, taken from the
        first ``assistantos``/``pyautogui`` line onwards, with absolute
        coordinates.  ``None`` when the response cannot be parsed.  The result is
        always one value, never a tuple, because ``predict`` stores it as one action.
    """
    if not text:
        logger.error("Error: Could not parse response %s", text)
        return None

    lowered = text.strip().lower()
    for keyword in ("wait", "done", "fail"):
        if lowered.startswith(keyword):
            return keyword.upper()

    try:
        # Drop empty lines.
        lines = [line for line in text.strip().split("\n") if line.strip() != ""]

        pyautogui_index = next(
            (
                i for i, line in enumerate(lines)
                if line.strip() == "assistantos" or line.strip().startswith("pyautogui")
            ),
            -1,
        )
        if pyautogui_index == -1:
            logger.error("Error: Could not parse response %s", text)
            return None

        pyautogui_code_relative_coordinates = "\n".join(lines[pyautogui_index:])
        # remove the assistantos prefix, ugly, fix later
        pyautogui_code_relative_coordinates = pyautogui_code_relative_coordinates.replace("assistantos", "")
        return _pyautogui_code_to_absolute_coordinates(pyautogui_code_relative_coordinates, screen_logic_size)
    except Exception:
        # Best effort: a malformed response must not abort the episode.
        logger.exception("Error: Could not parse response %s", text)
        return None


def parse_planner_response(planner_response):
    """Extract the ``Thought:`` and ``Action:`` lines from a planner response.

    Only lines that *start* with the prefix are considered, and when a prefix
    occurs several times the last occurrence wins.

    Returns:
        ``(thought, action_description)``; an element is ``None`` when its line
        is missing.  ``("", "")`` when *planner_response* is not a string.
    """
    if not isinstance(planner_response, str):
        logger.error("Error: Could not parse response %s", planner_response)
        return "", ""

    thought = None
    action_description = None
    for line in planner_response.splitlines():
        if line.startswith("Thought:"):
            # Slice exactly the tested prefix; strip() handles the optional
            # space so "Thought:foo" no longer loses its first character.
            thought = line[len("Thought:"):].strip()
        elif line.startswith("Action:"):
            action_description = line[len("Action:"):].strip()

    return thought, action_description


class AguvisAgent:
    """GUI agent that plans with one model and grounds actions with an Aguvis executor.

    When ``planner_model`` is ``None`` the executor model is called directly;
    otherwise the planner first produces an action description that replaces the
    instruction given to the executor.
    """

    def __init__(
            self,
            platform="ubuntu",
            planner_model="gpt-4o",
            executor_model="qwen-aguvis-7b",
            max_tokens=1500,
            top_p=0.9,
            temperature=0.5,
            action_space="pyautogui",
            observation_type="screenshot",
    ):
        self.platform = platform
        self.planner_model = planner_model
        self.executor_model = executor_model
        assert self.executor_model is not None, "Executor model cannot be None"
        self.max_tokens = max_tokens
        self.top_p = top_p
        self.temperature = temperature
        self.action_space = action_space
        self.observation_type = observation_type
        assert action_space in ["pyautogui"], "Invalid action space"
        assert observation_type in ["screenshot"], "Invalid observation type"

        # Per-episode history.  ``action_descriptions`` must exist before the
        # first ``predict`` call; previously it was only created by ``reset``.
        self.thoughts = []
        self.action_descriptions = []
        self.actions = []
        self.observations = []

    def _build_payload(self, model, messages):
        """Assemble the chat-completions payload with this agent's sampling settings."""
        return {
            "model": model,
            "messages": messages,
            "max_tokens": self.max_tokens,
            "top_p": self.top_p,
            "temperature": self.temperature
        }

    def predict(self, instruction: str, obs: Dict) -> Tuple[str, List]:
        """
        Predict the next action(s) based on the current observation.

        Args:
            instruction: the task description.
            obs: observation dict; ``obs["screenshot"]`` holds the raw PNG bytes.

        Returns:
            ``(response_text, [action])`` where ``action`` is ``"WAIT"``,
            ``"DONE"``, ``"FAIL"``, pyautogui code, or ``None`` when the executor
            response could not be parsed.

        Side effects:
            Appends to the history lists and writes the outgoing messages to
            ``messages_direct_executor.json`` / ``messages_executor.json`` in the
            current working directory.
        """
        # Prepare the payload for the API call
        self.observations.append(obs["screenshot"])

        instruction_prompt = INSTRUCTION_PROMPT.format(instruction=instruction)
        # thought, or so called action description
        history_actions_prompt = ACTION_PROMPT + "".join(
            f"Step {i + 1}: {action_description}\n"
            for i, action_description in enumerate(self.action_descriptions)
        )
        instruction_prompt += "\n\n" + history_actions_prompt

        base64_img = encode_image(obs["screenshot"])
        messages = [
            {
                "role": "system",
                "content": [
                    {
                        "type": "text",
                        "text": SYS_PROMPT
                    },
                ]
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": instruction_prompt
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{base64_img}",
                            "detail": "high"
                        }
                    }
                ]
            },
        ]

        if self.planner_model is None:
            # For now, we call the same model twice, one for planner and one for executor,
            # This can be improved later when the inference stop token fixed
            messages.append({
                "role": "assistant",
                "content": [
                    {
                        "type": "text",
                        "text": "<|recipient|>all\nAction: "
                    }
                ]
            })

            with open("messages_direct_executor.json", "w") as f:
                f.write(json.dumps(messages, indent=4))

            executor_response = self.call_llm(
                self._build_payload(self.executor_model, messages), self.executor_model)
            logger.info("EXECUTOR RESPONSE: %s", executor_response)

            pyautogui_action = _parse(executor_response)
            thought, action_description = parse_planner_response("Action: " + executor_response)

            self.thoughts.append(thought)
            self.action_descriptions.append(action_description)
            self.actions.append(pyautogui_action)

            return executor_response, [pyautogui_action]

        # Planner stage
        messages.append({
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": PLANNER_OUTPUT_FORMAT_SYS_PROMPT + "\nThought:"
                }
            ]
        })

        planner_response = self.call_llm(
            self._build_payload(self.planner_model, messages), self.planner_model)
        logger.info("PLANNER RESPONSE: %s", planner_response)

        thought, action_description = parse_planner_response(planner_response)
        if action_description is None:
            # No "Action:" line in the planner output; normalise so the keyword
            # checks below do not raise TypeError on None.
            logger.warning("Planner response contains no action: %s", planner_response)
            action_description = ""
        self.thoughts.append(thought)
        self.action_descriptions.append(action_description)

        for keyword in ("WAIT", "DONE", "FAIL"):
            if keyword in action_description:
                self.actions.append(keyword)
                return planner_response, [keyword]

        if action_description:
            # The executor only sees the planner's atomic action as its instruction.
            messages[1]["content"][0]["text"] = INSTRUCTION_PROMPT.format(instruction=action_description)
        # else: keep the original instruction prompt so the executor still has a task.

        # pretend nothing happend with stronger planner model
        messages[-1] = {
            "role": "assistant",
            "content": [
                {
                    "type": "text",
                    # "text": f"""<|recipient|>all\nAction: {action_description}<|im_end|>\n<|im_start|>assistant<|recipient|>os"""
                    "text": "<|recipient|>os"
                }
            ]
        }

        with open("messages_executor.json", "w") as f:
            f.write(json.dumps(messages, indent=4))

        # Executor stage
        executor_response = self.call_llm(
            self._build_payload(self.executor_model, messages), self.executor_model)
        logger.info("EXECUTOR RESPONSE: %s", executor_response)

        pyautogui_action = _parse(executor_response)
        self.actions.append(pyautogui_action)

        return planner_response + "\n\n" + executor_response, [pyautogui_action]

    @staticmethod
    def _post_chat_completion(url, headers, payload):
        """POST *payload* to a chat-completions endpoint and return the reply text.

        Returns ``""`` (after logging and a 5 second pause) when the HTTP status
        is not 200.
        """
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code != HTTPStatus.OK:
            logger.error("Failed to call LLM: %s", response.text)
            time.sleep(5)
            return ""
        return response.json()['choices'][0]['message']['content']

    @backoff.on_exception(
        backoff.constant,
        # here you should add more model exceptions as you want,
        # but you are forbidden to add "Exception", that is, a common type of exception
        # because we want to catch this kind of Exception in the outside to ensure each example won't exceed the time limit
        (
                # General exceptions
                SSLError,

                # OpenAI exceptions
                openai.RateLimitError,
                openai.BadRequestError,
                openai.InternalServerError,

                # Google exceptions
                InvalidArgument,
                ResourceExhausted,
                InternalServerError,
                BadRequest,

                # Groq exceptions
                # todo: check
        ),
        interval=30,
        max_tries=10
    )
    def call_llm(self, payload, model):
        """Send *payload* to the backend selected by *model* and return the reply text.

        Models whose name starts with ``"gpt"`` go to the OpenAI API (requires the
        ``OPENAI_API_KEY`` environment variable); names containing ``"aguvis"``
        go to the Aguvis endpoint.  Returns ``""`` on a non-200 response or for an
        unsupported model name, so callers always receive a string.
        """
        if model.startswith("gpt"):
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"
                # "Authorization": f"Bearer {os.environ['MIT_SPIDER_TOKEN']}"
            }
            logger.info("Generating content with GPT model: %s", model)
            return self._post_chat_completion(
                "https://api.openai.com/v1/chat/completions",
                # "http://47.88.8.18:8088/v1/chat/completions",
                headers,
                payload,
            )

        if "aguvis" in model:
            headers = {
                "Content-Type": "application/json",
            }
            logger.info("Generating content with Aguvis model: %s", model)
            return self._post_chat_completion(
                "http://101.132.136.195:7908/v1/chat/completions",
                headers,
                payload,
            )

        # Previously this fell through and returned None, which broke the
        # string handling in predict(); mirror the HTTP-failure path instead.
        logger.error("Failed to call LLM: unsupported model %s", model)
        return ""

    def reset(self):
        """Clear all per-episode history."""
        self.thoughts = []
        self.action_descriptions = []
        self.actions = []
        self.observations = []


if __name__ == "__main__":
    agent = AguvisAgent()
    with open("screenshot.png", "rb") as f:
        screenshot = f.read()
    agent.predict("Add a new paper to my list", {"screenshot": screenshot})
    # relative_code = """pyautogui.typewrite("Hello, world! I have a float number 0.172")
    # pyautogui.click(0, 1, n_click=1)
    # pyautogui.moveTo(0.5342, 0.5342)
    # """
    # absolute_code = _pyautogui_code_to_absolute_coordinates(relative_code, logical_screen_size=(1920, 1080))
    # print(absolute_code)