From f88331416cec04d84dea72f9945510834e89ca9b Mon Sep 17 00:00:00 2001 From: Timothyxxx <384084775@qq.com> Date: Sat, 20 Jan 2024 18:55:21 +0800 Subject: [PATCH] Refactor baselines code implementations --- ...nt_pure_text.py => experiment_a11y_tree.py | 20 ++- experiment_screenshot.py | 16 +- .../heuristic_retrieve.py | 36 ++-- mm_agents/gemini_pro_agent.py | 2 + mm_agents/gemini_pro_vision_agent.py | 2 + mm_agents/gpt_4v_agent.py | 167 ++++++++++++++---- mm_agents/prompts.py | 26 +-- 7 files changed, 204 insertions(+), 65 deletions(-) rename experiment_pure_text.py => experiment_a11y_tree.py (88%) diff --git a/experiment_pure_text.py b/experiment_a11y_tree.py similarity index 88% rename from experiment_pure_text.py rename to experiment_a11y_tree.py index 4fd19b1..728d0de 100644 --- a/experiment_pure_text.py +++ b/experiment_a11y_tree.py @@ -5,8 +5,7 @@ import os import sys from desktop_env.envs.desktop_env import DesktopEnv -from mm_agents.gpt_4_agent import GPT4_Agent -from mm_agents.gemini_pro_agent import GeminiPro_Agent +from mm_agents.gpt_4v_agent import GPT4v_Agent # Logger Configs {{{ # logger = logging.getLogger() @@ -113,24 +112,29 @@ def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_tr if __name__ == "__main__": action_space = "pyautogui" example_class = "chrome" - example_id = "7a5a7856-f1b6-42a4-ade9-1ca81ca0f263" - gpt4_model = "gpt-4-1106-preview" + example_id = "7b6c7e24-c58a-49fc-a5bb-d57b80e5b4c3" + gpt4_model = "gpt-4-vision-preview" gemini_model = "gemini-pro-vision" + logger.info("Running example %s/%s", example_class, example_id) + logger.info("Using model %s", gpt4_model) + # logger.info("Using model %s", gemini_model) + with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f: example = json.load(f) example["snapshot"] = "exp_setup4" api_key = os.environ.get("OPENAI_API_KEY") - agent = GPT4_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'], action_space=action_space) + agent = GPT4v_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'], + action_space=action_space, exp="a11y_tree") # api_key = os.environ.get("GENAI_API_KEY") - # agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space) + # agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space, exp="a11y_tree") root_trajectory_dir = "exp_trajectory" - example_trajectory_dir = os.path.join(root_trajectory_dir, "text", example_class, gpt4_model, example_id) - # example_trajectory_dir = os.path.join(root_trajectory_dir, "text", example_class, gemini_model, example_id) + example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gpt4_model, example_id) + # example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gemini_model, example_id) os.makedirs(example_trajectory_dir, exist_ok=True) diff --git a/experiment_screenshot.py b/experiment_screenshot.py index 8e7f8b5..6d82730 100644 --- a/experiment_screenshot.py +++ b/experiment_screenshot.py @@ -113,20 +113,28 @@ if __name__ == "__main__": action_space = "pyautogui" example_class = "thunderbird" example_id = "bb5e4c0d-f964-439c-97b6-bdb9747de3f4" + gpt4_model = "gpt-4-vision-preview" + gemini_model = "gemini-pro-vision" + + logger.info("Running example %s/%s", example_class, example_id) + logger.info("Using model %s", gpt4_model) + # logger.info("Using model %s", gemini_model) 
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f: example = json.load(f) example["snapshot"] = "exp_setup2" # api_key = os.environ.get("OPENAI_API_KEY") - # agent = GPT4v_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space) + # agent = GPT4v_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space, exp="screenshot") api_key = os.environ.get("GENAI_API_KEY") - agent = GeminiPro_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space) + agent = GeminiPro_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space, exp="screenshot") root_trajectory_dir = "exp_trajectory" - example_trajectory_dir = os.path.join(root_trajectory_dir, example_class, example_id) + example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gpt4_model, example_id) + # example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gemini_model, example_id) + os.makedirs(example_trajectory_dir, exist_ok=True) - run_one_example(example, agent, 10, example_trajectory_dir) + run_one_example(example, agent, 15, example_trajectory_dir) diff --git a/mm_agents/accessibility_tree_wrap/heuristic_retrieve.py b/mm_agents/accessibility_tree_wrap/heuristic_retrieve.py index c59060c..47bbca0 100644 --- a/mm_agents/accessibility_tree_wrap/heuristic_retrieve.py +++ b/mm_agents/accessibility_tree_wrap/heuristic_retrieve.py @@ -60,19 +60,19 @@ def draw_bounding_boxes(nodes, image_file_path, output_image_file_path): image = Image.open(image_file_path) draw = ImageDraw.Draw(image) marks = [] + drew_nodes = [] - # todo: change the image tagger to align with SoM paper - - # Optional: Load a font. If you don't specify a font, a default one will be used. 
try: # Adjust the path to the font file you have or use a default one - font = ImageFont.truetype("arial.ttf", 20) + font = ImageFont.truetype("arial.ttf", 15) except IOError: # Fallback to a basic font if the specified font can't be loaded font = ImageFont.load_default() + index = 1 + # Loop over all the visible nodes and draw their bounding boxes - for index, _node in enumerate(nodes): + for _node in nodes: coords_str = _node.attrib.get('{uri:deskat:component.at-spi.gnome.org}screencoord') size_str = _node.attrib.get('{uri:deskat:component.at-spi.gnome.org}size') @@ -93,22 +93,30 @@ def draw_bounding_boxes(nodes, image_file_path, output_image_file_path): if bottom_right[0] < coords[0] or bottom_right[1] < coords[1]: raise ValueError(f"Invalid coordinates or size, coords: {coords}, size: {size}") - # Draw rectangle on image - draw.rectangle([coords, bottom_right], outline="red", width=2) + # Check if the area only contains one color + cropped_image = image.crop((*coords, *bottom_right)) + if len(set(list(cropped_image.getdata()))) == 1: + continue - # Draw index number at the bottom left of the bounding box + # Draw rectangle on image + draw.rectangle([coords, bottom_right], outline="red", width=1) + + # Draw index number at the bottom left of the bounding box with black background text_position = (coords[0], bottom_right[1]) # Adjust Y to be above the bottom right - draw.text(text_position, str(index), font=font, fill="purple") + draw.rectangle([text_position, (text_position[0] + 25, text_position[1] + 18)], fill='black') + draw.text(text_position, str(index), font=font, fill="white") + index += 1 # each mark is an x, y, w, h tuple marks.append([coords[0], coords[1], size[0], size[1]]) + drew_nodes.append(_node) except ValueError as e: pass # Save the result image.save(output_image_file_path) - return marks + return marks, drew_nodes def print_nodes_with_indent(nodes, indent=0): @@ -120,6 +128,10 @@ def print_nodes_with_indent(nodes, indent=0): if __name__ == '__main__': with open('chrome_desktop_example_1.xml', 'r', encoding='utf-8') as f: xml_file_str = f.read() + filtered_nodes = filter_nodes(find_leaf_nodes(xml_file_str)) + print(len(filtered_nodes)) + masks = draw_bounding_boxes(filtered_nodes, 'screenshot.png', + 'chrome_desktop_example_1_tagged_remove.png', ) - nodes = ET.fromstring(xml_file_str) - print_nodes_with_indent(nodes) + # print(masks) + print(len(masks)) diff --git a/mm_agents/gemini_pro_agent.py b/mm_agents/gemini_pro_agent.py index 26f9c0e..ce84488 100644 --- a/mm_agents/gemini_pro_agent.py +++ b/mm_agents/gemini_pro_agent.py @@ -1,3 +1,5 @@ +# todo: needs to be refactored + import time from typing import Dict, List diff --git a/mm_agents/gemini_pro_vision_agent.py b/mm_agents/gemini_pro_vision_agent.py index 2d5d365..4a537db 100644 --- a/mm_agents/gemini_pro_vision_agent.py +++ b/mm_agents/gemini_pro_vision_agent.py @@ -1,3 +1,5 @@ +# todo: needs to be refactored + import time from typing import Dict, List diff --git a/mm_agents/gpt_4v_agent.py b/mm_agents/gpt_4v_agent.py index 6e2000c..896ff7e 100644 --- a/mm_agents/gpt_4v_agent.py +++ b/mm_agents/gpt_4v_agent.py @@ -2,7 +2,6 @@ import base64 import json import os import re -import time import uuid from typing import Dict, List @@ -54,9 +53,9 @@ def tag_screenshot(screenshot, accessibility_tree): tagged_screenshot_file_path = os.path.join("tmp/images", uuid_str + ".png") nodes = filter_nodes(find_leaf_nodes(accessibility_tree)) # Make tag screenshot - marks = draw_bounding_boxes(nodes, screenshot, 
tagged_screenshot_file_path) + marks, drew_nodes = draw_bounding_boxes(nodes, screenshot, tagged_screenshot_file_path) - return marks, tagged_screenshot_file_path + return marks, drew_nodes, tagged_screenshot_file_path def parse_actions_from_string(input_string): @@ -123,11 +122,18 @@ def parse_code_from_string(input_string): def parse_code_from_som_string(input_string, masks): + # parse the output string by masks + mappings = [] for i, mask in enumerate(masks): x, y, w, h = mask - input_string = input_string.replace("tag#" + str(i), "{}, {}".format(int(x + w // 2), int(y + h // 2))) + mappings.append(("tag#" + str(i + 1), "{}, {}".format(int(x + w // 2), int(y + h // 2)))) - return parse_code_from_string(input_string) + # reverse the mappings + for mapping in mappings[::-1]: + input_string = input_string.replace(mapping[0], mapping[1]) + + actions = parse_code_from_string(input_string) + return actions class GPT4v_Agent: @@ -136,7 +142,7 @@ class GPT4v_Agent: api_key, instruction, model="gpt-4-vision-preview", - max_tokens=300, + max_tokens=500, action_space="computer_13", exp="screenshot_a11y_tree" # exp can be in ["screenshot", "a11y_tree", "screenshot_a11y_tree", "som", "seeact"] @@ -147,6 +153,7 @@ class GPT4v_Agent: self.max_tokens = max_tokens self.action_space = action_space self.exp = exp + self.max_trajectory_length = 3 self.headers = { "Content-Type": "application/json", @@ -194,8 +201,8 @@ class GPT4v_Agent: else: raise ValueError("Invalid experiment type: " + exp) - self.system_message = (self.system_message + - "\nHere is the instruction for the task: {}".format(self.instruction)) + self.system_message = self.system_message + "\nYou are asked to complete the following task: {}".format( + self.instruction) def predict(self, obs: Dict) -> List: """ @@ -204,28 +211,111 @@ class GPT4v_Agent: # Prepare the payload for the API call messages = [] - - if len(self.actions) > 0: - system_message = self.system_message + "\nHere are the actions you have done so far:\n" + "\n->\n".join( - self.actions) - else: - system_message = self.system_message + masks = None messages.append({ "role": "system", "content": [ { "type": "text", - "text": system_message + "text": self.system_message }, ] }) - masks = None + # Append trajectory + assert len(self.observations) == len(self.actions), "The number of observations and actions should be the same." + + if len(self.observations) > self.max_trajectory_length: + _observations = self.observations[-self.max_trajectory_length:] + _actions = self.actions[-self.max_trajectory_length:] + else: + _observations = self.observations + _actions = self.actions + + for previous_obs, previous_action in zip(_observations, _actions): + + if self.exp in ["both", "som", "seeact"]: + _screenshot = previous_obs["screenshot"] + _linearized_accessibility_tree = previous_obs["accessibility_tree"] + + messages.append({ + "role": "user", + "content": [ + { + "type": "text", + "text": "Given the info from the tagged screenshot as below:\n{}\nWhat's the next step that you will do to help with the task?".format( + _linearized_accessibility_tree) + }, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{_screenshot}", + "detail": "high" + } + } + ] + }) + elif self.exp == "screenshot": + _screenshot = previous_obs["screenshot"] + + messages.append({ + "role": "user", + "content": [ + { + "type": "text", + "text": "Given the screenshot as below. What's the next step that you will do to help with the task?" 
+ }, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{_screenshot}", + "detail": "high" + } + } + ] + }) + elif self.exp == "a11y_tree": + _linearized_accessibility_tree = previous_obs["accessibility_tree"] + + messages.append({ + "role": "user", + "content": [ + { + "type": "text", + "text": "Given the info from accessibility tree as below:\n{}\nWhat's the next step that you will do to help with the task?".format( + _linearized_accessibility_tree) + } + ] + }) + else: + raise ValueError("Invalid experiment type: " + self.exp) + + messages.append({ + "role": "assistant", + "content": [ + { + "type": "text", + "text": "\n".join(previous_action) if len(previous_action) > 0 else "No valid action" + }, + ] + }) if self.exp in ["screenshot", "both"]: base64_image = encode_image(obs["screenshot"]) linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"]) + + if self.exp == "both": + self.observations.append({ + "screenshot": base64_image, + "accessibility_tree": linearized_accessibility_tree + }) + else: + self.observations.append({ + "screenshot": base64_image, + "accessibility_tree": None + }) + messages.append({ "role": "user", "content": [ @@ -247,6 +337,12 @@ class GPT4v_Agent: }) elif self.exp == "a11y_tree": linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"]) + + self.observations.append({ + "screenshot": None, + "accessibility_tree": linearized_accessibility_tree + }) + messages.append({ "role": "user", "content": [ @@ -259,11 +355,15 @@ class GPT4v_Agent: }) elif self.exp == "som": # Add som to the screenshot - masks, tagged_screenshot = tag_screenshot(obs["screenshot"], obs["accessibility_tree"]) - + masks, drew_nodes, tagged_screenshot = tag_screenshot(obs["screenshot"], obs["accessibility_tree"]) base64_image = encode_image(tagged_screenshot) linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"]) + self.observations.append({ + "screenshot": base64_image, + "accessibility_tree": linearized_accessibility_tree + }) + messages.append({ "role": "user", "content": [ @@ -288,6 +388,11 @@ class GPT4v_Agent: base64_image = encode_image(tagged_screenshot) linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"]) + self.observations.append({ + "screenshot": base64_image, + "accessibility_tree": linearized_accessibility_tree + }) + messages.append({ "role": "user", "content": [ @@ -307,6 +412,9 @@ class GPT4v_Agent: else: raise ValueError("Invalid experiment type: " + self.exp) + with open("messages.json", "w") as f: + f.write(json.dumps(messages, indent=4)) + response = self.call_llm({ "model": self.model, "messages": messages, @@ -354,20 +462,17 @@ class GPT4v_Agent: (APIError, RateLimitError, APIConnectionError, ServiceUnavailableError, InvalidRequestError), ) def call_llm(self, payload): - while True: - try: - response = requests.post( - "https://api.openai.com/v1/chat/completions", - headers=self.headers, - json=payload - ) - break - except: - print("Failed to generate response, retrying...") - time.sleep(5) - pass + response = requests.post( + "https://api.openai.com/v1/chat/completions", + headers=self.headers, + json=payload + ) - return response.json()['choices'][0]['message']['content'] + if response.status_code != 200: + print("Failed to call LLM: " + response.text) + return "" + else: + return response.json()['choices'][0]['message']['content'] def 
parse_actions(self, response: str, masks=None): diff --git a/mm_agents/prompts.py b/mm_agents/prompts.py index dcc9a85..90ce22f 100644 --- a/mm_agents/prompts.py +++ b/mm_agents/prompts.py @@ -3,7 +3,7 @@ You are an agent which follow my instruction and perform desktop computer tasks You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard. For each step, you will get an observation of an image, which is the screenshot of the computer screen and you will predict the action of the computer based on the image. -You are required to use `pyautogui` to perform the action. +You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with. Return one line or multiple lines of python code to perform the action each time, be time efficient. You ONLY need to return the code inside a code block, like this: ```python @@ -14,7 +14,7 @@ When you think you have to wait for some time, return ```WAIT```; When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task; When you think the task is done, return ```DONE```. -First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. +First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. """.strip() SYS_PROMPT_IN_SCREENSHOT_OUT_ACTION = """ @@ -267,7 +267,7 @@ You are an agent which follow my instruction and perform desktop computer tasks You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard. For each step, you will get an observation of the desktop by accessibility tree, which is based on AT-SPI library. And you will predict the action of the computer based on the accessibility tree. -You are required to use `pyautogui` to perform the action. +You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with. Return one line or multiple lines of python code to perform the action each time, be time efficient. You ONLY need to return the code inside a code block, like this: ```python @@ -278,7 +278,7 @@ When you think you have to wait for some time, return ```WAIT```; When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task; When you think the task is done, return ```DONE```. -First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. +First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. """.strip() SYS_PROMPT_IN_A11Y_OUT_ACTION = """ @@ -532,7 +532,7 @@ You have good knowledge of computer and good internet connection and assume your For each step, you will get an observation of the desktop by 1) a screenshot; and 2) accessibility tree, which is based on AT-SPI library. 
And you will predict the action of the computer based on the screenshot and accessibility tree. -You are required to use `pyautogui` to perform the action. +You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with. Return one line or multiple lines of python code to perform the action each time, be time efficient. You ONLY need to return the code inside a code block, like this: ```python @@ -543,7 +543,7 @@ When you think you have to wait for some time, return ```WAIT```; When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task; When you think the task is done, return ```DONE```. -First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. +First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. """.strip() SYS_PROMPT_IN_BOTH_OUT_ACTION = """ @@ -797,12 +797,15 @@ You are an agent which follow my instruction and perform desktop computer tasks You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard. For each step, you will get an observation of the desktop by 1) a screenshot; and 2) accessibility tree, which is based on AT-SPI library. -You are required to use `pyautogui` to perform the action. But replace x, y in the code with the tag of the element you want to operate with. such as: +You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with. +You can replace x, y in the code with the tag of the element you want to operate with. such as: ```python pyautogui.moveTo(tag#3) pyautogui.click(tag#2) pyautogui.dragTo(tag#1, button='left') ``` +When you think you can directly output precise x and y coordinates or there is no tag on which you want to interact, you can also use them directly. +But you should be careful to ensure that the coordinates are correct. Return one line or multiple lines of python code to perform the action each time, be time efficient. You ONLY need to return the code inside a code block, like this: ```python @@ -813,7 +816,7 @@ When you think you have to wait for some time, return ```WAIT```; When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task; When you think the task is done, return ```DONE```. -First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. +First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. """.strip() SYS_PROMPT_SEEACT = """ @@ -842,12 +845,15 @@ Then, based on your analysis, in conjunction with human desktop using habits and """ ACTION_GROUNDING_PROMPT_SEEACT = """ -You are required to use `pyautogui` to perform the action. But replace x, y in the code with the tag of the element you want to operate with. 
such as: +You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with. +You can replace x, y in the code with the tag of the element you want to operate with. such as: ```python pyautogui.moveTo(tag#3) pyautogui.click(tag#2) pyautogui.dragTo(tag#1, button='left') ``` +When you think you can directly output precise x and y coordinates or there is no tag on which you want to interact, you can also use them directly. +But you should be careful to ensure that the coordinates are correct. Return one line or multiple lines of python code to perform the action each time, be time efficient. You ONLY need to return the code inside a code block, like this: ```python @@ -858,5 +864,5 @@ When you think you have to wait for some time, return ```WAIT```; When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task; When you think the task is done, return ```DONE```. -First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. +First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE. """
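
For reference, the change to `draw_bounding_boxes` in heuristic_retrieve.py now skips accessibility nodes whose on-screen region is a single flat colour before drawing and numbering them. The following is a minimal, standalone sketch of that heuristic using Pillow; the function name, box format, and file paths are placeholders for illustration, not the patch's exact API.

```python
# Isolated sketch of the "skip blank regions" heuristic; names and paths are
# placeholders, not the patch's exact code.

from PIL import Image, ImageDraw, ImageFont


def draw_marks(image_path, boxes, out_path):
    """boxes: iterable of (x, y, w, h); returns the boxes that were actually drawn."""
    image = Image.open(image_path)
    draw = ImageDraw.Draw(image)
    try:
        font = ImageFont.truetype("arial.ttf", 15)
    except IOError:
        font = ImageFont.load_default()

    drawn, index = [], 1
    for x, y, w, h in boxes:
        crop = image.crop((x, y, x + w, y + h))
        if len(set(crop.getdata())) == 1:   # uniform colour -> nothing visible, skip
            continue
        draw.rectangle([(x, y), (x + w, y + h)], outline="red", width=1)
        # Index label on a black background at the bottom-left corner of the box.
        draw.rectangle([(x, y + h), (x + 25, y + h + 18)], fill="black")
        draw.text((x, y + h), str(index), font=font, fill="white")
        drawn.append((x, y, w, h))
        index += 1

    image.save(out_path)
    return drawn
```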
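The SoM path in gpt_4v_agent.py swaps `tag#N` placeholders for mask centres before handing the code to `parse_code_from_string`. Below is a standalone sketch of that substitution; the function name, variable names, and example masks are illustrative. The replacement runs from the highest tag index down because `tag#1` is a prefix of `tag#10`, `tag#11`, and so on, so substituting low indices first would corrupt the higher-numbered tags.

```python
# Standalone sketch of the SoM tag -> coordinate substitution; names and the
# example masks are illustrative, not the patch's exact API.

def substitute_tags(code: str, masks: list) -> str:
    """Replace 'tag#N' placeholders (1-indexed) with the centre of mask N."""
    mappings = []
    for i, (x, y, w, h) in enumerate(masks):
        mappings.append(("tag#" + str(i + 1),
                         "{}, {}".format(int(x + w // 2), int(y + h // 2))))

    # Replace from the highest index down: "tag#1" is a prefix of "tag#10",
    # "tag#11", ..., so substituting low indices first would corrupt them.
    for placeholder, centre in reversed(mappings):
        code = code.replace(placeholder, centre)
    return code


if __name__ == "__main__":
    masks = [(10, 10, 100, 40), (200, 50, 60, 60)]   # (x, y, w, h) per mark
    print(substitute_tags("pyautogui.click(tag#2)", masks))   # pyautogui.click(230, 80)
```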
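The `call_llm` change replaces the bare `while True`/`except` loop with a backoff decorator plus an explicit non-200 check. A hedged sketch of that pattern follows; it retries on `requests.exceptions.RequestException`, which is what `requests.post` actually raises, rather than the OpenAI client exception classes named in the patch's decorator, and the endpoint and header layout simply mirror the patch.

```python
# Sketch of the retry pattern: exponential backoff on transport errors and an
# explicit non-200 check instead of a bare while-True loop. Retries use
# requests' own exception type; the OpenAI classes in the patch's decorator
# are not used here.

import os

import backoff
import requests

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + os.environ.get("OPENAI_API_KEY", ""),
}


@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_tries=5)
def call_llm(payload):
    response = requests.post("https://api.openai.com/v1/chat/completions",
                             headers=HEADERS, json=payload)
    if response.status_code != 200:
        # 4xx/5xx responses are logged and surfaced as an empty string, not retried.
        print("Failed to call LLM: " + response.text)
        return ""
    return response.json()["choices"][0]["message"]["content"]
```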