From e52ba2ab13542c16db24de595634b78d5c87bc5d Mon Sep 17 00:00:00 2001 From: Timothyxxx <384084775@qq.com> Date: Thu, 30 Nov 2023 12:10:41 +0800 Subject: [PATCH] Fix the width and height of vm, make agent perform more accurate --- desktop_env/envs/desktop_env.py | 57 +++++++++++++++++++--------- gpt_4v_agent_exp.py | 49 ++++++++++++++++++++---- mm_agents/gpt_4v_agent.py | 67 +++++++++++++++++++++------------ mm_agents/gpt_4v_prompt.py | 23 ++++++----- 4 files changed, 134 insertions(+), 62 deletions(-) diff --git a/desktop_env/envs/desktop_env.py b/desktop_env/envs/desktop_env.py index bbf0620..8755fdc 100644 --- a/desktop_env/envs/desktop_env.py +++ b/desktop_env/envs/desktop_env.py @@ -9,6 +9,7 @@ import gymnasium as gym from gymnasium import spaces import numpy as np import uuid +from PIL import Image from desktop_env.controllers.mouse import MouseClick, AbstractMouseController, XDoToolMouseController, \ PythonMouseController @@ -39,8 +40,10 @@ class DesktopEnv(gym.Env): username: str, password: str = None, host: str = "192.168.7.128:5000", - snapshot_path: str = "initial_state_with_env_set", - vm_os: VM_TYPE = "ubuntu"): + snapshot_path: str = "base", + vm_os: VM_TYPE = "ubuntu" + ): + # The path to the vmx file of your vm self.path_to_vm = path_to_vm @@ -51,9 +54,13 @@ class DesktopEnv(gym.Env): self.host = host self.snapshot_path = snapshot_path # todo: handling the logic of snapshot directory - # TODO: get the screen width and height from the vm, or standardize it - self.screen_width = 800 - self.screen_height = 800 + # Initialize emulator + print("Initializing...") + self._start_emulator() + + # Get the screen size + self.screen_width, self.screen_height = self._get_screensize() + # Define the action and observation space self.action_space = spaces.Dict({ "action_type": spaces.Discrete(len(Action)), @@ -70,13 +77,14 @@ class DesktopEnv(gym.Env): # Additional setup self.metadata = {'render.modes': ['rgb_array']} - # Initialize emulator - print("Initializing...") - self._start_emulator() - # set up controllers self.mouse_controller, self.keyboard_controller = self._create_controllers(vm_os) + def _get_screensize(self): + screenshot_path = self._get_obs() + img = Image.open(screenshot_path) + return img.size + def _create_controllers(self, vm_os: VM_TYPE) -> Tuple[AbstractMouseController, AbstractKeyboardController]: if vm_os == "ubuntu": ssh_connection = Connection(host=self.host, user=self.username, connect_kwargs={"password": self.password}) @@ -145,7 +153,18 @@ class DesktopEnv(gym.Env): return observation def step(self, action): - action_type = Action(action['action_type']) + if isinstance(action, list): + for a in action: + observation, reward, done, info = self.step(a) + return observation, reward, done, info + + # todo: handle the case when the action is not a single action + try: + action_type = Action(action['action_type']) + except KeyError: + done = True + return self._get_obs(), 0, done, {} + if action_type == Action.CLICK: click = MouseClick(action['click_type']) if click == MouseClick.LEFT: @@ -185,17 +204,19 @@ class DesktopEnv(gym.Env): elif action_type == Action.MOUSE_MOVE: self.mouse_controller.mouse_move(x=action['x'], y=action['y']) elif action_type == Action.KEY: - key_sequence = ''.join(map(chr, action['key'])) # Convert integer array to string - self.keyboard_controller.key(key_sequence) + self.keyboard_controller.key(action['key']) elif action_type == Action.KEY_DOWN: - key_sequence = ''.join(map(chr, action['key'])) # Convert integer array to string - self.keyboard_controller.key_down(key_sequence) + self.keyboard_controller.key_down(action['key']) elif action_type == Action.KEY_UP: - key_sequence = ''.join(map(chr, action['key'])) # Convert integer array to string - self.keyboard_controller.key_up(key_sequence) + self.keyboard_controller.key_up(action['key']) elif action_type == Action.TYPE: - text = ''.join(map(chr, action['text'])) # Convert integer array to string - self.keyboard_controller.type(text) + for key in action['text']: + if key == "\r" or key == "\n": + self.keyboard_controller.key("enter") + else: + self.keyboard_controller.key(key) + # sleep for 0.05 seconds with some random noise + time.sleep(0.05 + np.random.normal(0, 0.01)) # Capture new state observation = self._get_obs() diff --git a/gpt_4v_agent_exp.py b/gpt_4v_agent_exp.py index fe78970..bae0446 100644 --- a/gpt_4v_agent_exp.py +++ b/gpt_4v_agent_exp.py @@ -2,14 +2,30 @@ import os from pprint import pprint from desktop_env.envs.desktop_env import DesktopEnv, Action, MouseClick from mm_agents.gpt_4v_agent import GPT4v_Agent +import uuid def gpt_4v_agent(): api_key = os.environ.get("OPENAI_API_KEY") - agent = GPT4v_Agent(api_key=api_key, instruction="Clear the recycle bin.") + + # meta_info = { + # "instruction": "Open WSJ website to get latest news", + # "task_name": "open_wsj", + # "snapshot_path": "base", + # } + + meta_info = { + "instruction": "Clear the recycle bin", + "task_name": "clean_recycle_bin", + "snapshot_path": "base", + } + + agent = GPT4v_Agent(api_key=api_key, instruction=meta_info["instruction"]) env = DesktopEnv( - path_to_vm=r"""C:\Users\tianbaox\Documents\Virtual Machines\Win10\Win10.vmx""", # automitically load the snapshot and start the vm + path_to_vm=r"""C:\Users\tianbaox\Documents\Virtual Machines\Win10\Win10.vmx""", + # automitically load the snapshot and start the vm # path_to_vm="/home/yuri/vmware/Ubuntu 64-bit/Ubuntu 64-bit.vmx", + snapshot_path="base", username="tianbaox", password="951753", # host="192.168.7.128", @@ -20,15 +36,34 @@ def gpt_4v_agent(): # reset the environment to certain snapshot observation = env.reset() done = False + time_idx = 0 + + # create a file_dir for this agent + file_dir = os.path.join("observations", str(uuid.uuid4())) + os.makedirs(file_dir, exist_ok=True) + + # save the meta_info + with open(os.path.join(file_dir, "meta_info.json"), "w") as f: + f.write(str(meta_info)) + f.write("\n") while not done: - # todo: action needs to be redesigned, need to support multiple actions at one step - action = agent.predict(obs=observation) - print("Action:", action) + actions = agent.predict(obs=observation) + print("Actions:", actions) + with open(os.path.join(file_dir, "obs_{}.png".format(time_idx)), "wb") as f: + # copy the image in the path of observation to the file + with open(observation, "rb") as image_file: + f.write(image_file.read()) + + # save the actions + with open(os.path.join(file_dir, "actions_{}.json".format(time_idx)), "w") as f: + f.write(str(actions)) + f.write("\n") + + time_idx += 1 + observation, reward, done, info = env.step(actions) - # fixme: step not working - observation, reward, done, info = env.step(action) print("Observation:", observation) print("Reward:", reward) print("Info:", info) diff --git a/mm_agents/gpt_4v_agent.py b/mm_agents/gpt_4v_agent.py index fdbf4c7..c52b9c9 100644 --- a/mm_agents/gpt_4v_agent.py +++ b/mm_agents/gpt_4v_agent.py @@ -6,20 +6,24 @@ import json import requests from mm_agents.gpt_4v_prompt import SYS_PROMPT + # Function to encode the image def encode_image(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') -def parse_action_from_string(input_string): +def parse_actions_from_string(input_string): # Search for a JSON string within the input string + actions = [] matches = re.findall(r'```json\s+(.*?)\s+```', input_string, re.DOTALL) if matches: # Assuming there's only one match, parse the JSON string into a dictionary try: - action_dict = json.loads(matches[0]) - return action_dict + for match in matches: + action_dict = json.loads(match) + actions.append(action_dict) + return actions except json.JSONDecodeError as e: return f"Failed to parse JSON: {e}" else: @@ -27,17 +31,20 @@ def parse_action_from_string(input_string): if matches: # Assuming there's only one match, parse the JSON string into a dictionary try: - action_dict = json.loads(matches[0]) - return action_dict + for match in matches: + action_dict = json.loads(match) + actions.append(action_dict) + return actions except json.JSONDecodeError as e: return f"Failed to parse JSON: {e}" else: try: action_dict = json.loads(input_string) - return action_dict + return [action_dict] except json.JSONDecodeError as e: raise ValueError("Invalid response format: " + input_string) + class GPT4v_Agent: def __init__(self, api_key, instruction, model="gpt-4-vision-preview", max_tokens=300): self.instruction = instruction @@ -78,6 +85,10 @@ class GPT4v_Agent: } ] }) + traj_to_show = [] + for i in range(len(self.trajectory)): + traj_to_show.append(self.trajectory[i]["content"][0]["text"]) + print("Trajectory:", traj_to_show) payload = { "model": self.model, "messages": self.trajectory, @@ -85,11 +96,15 @@ class GPT4v_Agent: } response = requests.post("https://api.openai.com/v1/chat/completions", headers=self.headers, json=payload) - action = self.parse_action(response.json()['choices'][0]['message']['content']) + try: + actions = self.parse_actions(response.json()['choices'][0]['message']['content']) + except: + print("Failed to parse action from response:", response.json()['choices'][0]['message']['content']) + actions = None - return action + return actions - def parse_action(self, response: str): + def parse_actions(self, response: str): # response example """ ```json @@ -101,7 +116,7 @@ class GPT4v_Agent: """ # parse from the response - action = parse_action_from_string(response) + actions = parse_actions_from_string(response) # add action into the trajectory self.trajectory.append({ @@ -115,25 +130,28 @@ class GPT4v_Agent: }) # parse action - parsed_action = {} - action_type = Action[action['action_type']].value - parsed_action["action_type"] = action_type + parsed_actions = [] + for action in actions: + parsed_action = {} + action_type = Action[action['action_type']].value + parsed_action["action_type"] = action_type - if action_type == Action.CLICK.value or action_type == Action.MOUSE_DOWN.value or action_type == Action.MOUSE_UP.value: - parsed_action["click_type"] = MouseClick[action['click_type']].value + if action_type == Action.CLICK.value or action_type == Action.MOUSE_DOWN.value or action_type == Action.MOUSE_UP.value: + parsed_action["click_type"] = MouseClick[action['click_type']].value - if action_type == Action.MOUSE_MOVE.value: - parsed_action["x"] = action["x"] - parsed_action["y"] = action["y"] + if action_type == Action.MOUSE_MOVE.value: + parsed_action["x"] = action["x"] + parsed_action["y"] = action["y"] - # fixme: could these two actions be merged?? - if action_type == Action.KEY.value: - parsed_action["key"] = [ord(c) for c in action["key"]] + if action_type == Action.KEY.value: + parsed_action["key"] = action["key"] # handle the condition of single key and multiple keys - if action_type == Action.TYPE.value: - parsed_action["text"] = [ord(c) for c in action["text"]] + if action_type == Action.TYPE.value: + parsed_action["text"] = action["text"] - return parsed_action + parsed_actions.append(parsed_action) + + return parsed_actions if __name__ == '__main__': @@ -142,4 +160,3 @@ if __name__ == '__main__': agent = GPT4v_Agent(api_key=api_key, instruction="Open Google Sheet") print(agent.predict(obs="stackoverflow.png")) - diff --git a/mm_agents/gpt_4v_prompt.py b/mm_agents/gpt_4v_prompt.py index bfe5430..11705e3 100644 --- a/mm_agents/gpt_4v_prompt.py +++ b/mm_agents/gpt_4v_prompt.py @@ -31,25 +31,24 @@ for example, format as: "click_type": "LEFT" } ``` -- For [KEY, KEY_DOWN, KEY_UP, TYPE], you need to choose a(multiple) key(s) from the keyboard +- For [KEY, KEY_DOWN, KEY_UP], you need to choose a(multiple) key(s) from the keyboard +for example, format as: +``` +{ + "action_type": "KEY", + "key": "ctrl+c" +} +``` +- For TYPE, you need to specify the text you want to type for example, format as: ``` { "action_type": "TYPE", - "text": [ - "w", - "i", - "k", - "i", - "p", - "e", - "d", - "i", - "a" - ] + "text": "hello world" } ``` For every step, you should only return the action_type and the parameters of your action as a dict, without any other things. You MUST wrap the dict with backticks (\`). +You can predict multiple actions at one step, but you should only return one action for each step. You MUST choose and ONLY CHOOSE from the action space above, otherwise your action will be considered as invalid and you will get a penalty. """ \ No newline at end of file