Fix the width and height of the VM; make the agent perform more accurately

Timothyxxx
2023-11-30 12:10:41 +08:00
parent ecb62d7eb4
commit e52ba2ab13
4 changed files with 134 additions and 62 deletions

View File: desktop_env/envs/desktop_env.py

@@ -9,6 +9,7 @@ import gymnasium as gym
 from gymnasium import spaces
 import numpy as np
 import uuid
+from PIL import Image

 from desktop_env.controllers.mouse import MouseClick, AbstractMouseController, XDoToolMouseController, \
     PythonMouseController
@@ -39,8 +40,10 @@ class DesktopEnv(gym.Env):
                  username: str,
                  password: str = None,
                  host: str = "192.168.7.128:5000",
-                 snapshot_path: str = "initial_state_with_env_set",
-                 vm_os: VM_TYPE = "ubuntu"):
+                 snapshot_path: str = "base",
+                 vm_os: VM_TYPE = "ubuntu"
+                 ):

         # The path to the vmx file of your vm
         self.path_to_vm = path_to_vm
@@ -51,9 +54,13 @@ class DesktopEnv(gym.Env):
         self.host = host
         self.snapshot_path = snapshot_path  # todo: handling the logic of snapshot directory

-        # TODO: get the screen width and height from the vm, or standardize it
-        self.screen_width = 800
-        self.screen_height = 800
+        # Initialize emulator
+        print("Initializing...")
+        self._start_emulator()
+
+        # Get the screen size
+        self.screen_width, self.screen_height = self._get_screensize()

         # Define the action and observation space
         self.action_space = spaces.Dict({
             "action_type": spaces.Discrete(len(Action)),
@@ -70,13 +77,14 @@ class DesktopEnv(gym.Env):
         # Additional setup
         self.metadata = {'render.modes': ['rgb_array']}

-        # Initialize emulator
-        print("Initializing...")
-        self._start_emulator()
-
         # set up controllers
         self.mouse_controller, self.keyboard_controller = self._create_controllers(vm_os)

+    def _get_screensize(self):
+        screenshot_path = self._get_obs()
+        img = Image.open(screenshot_path)
+        return img.size
+
     def _create_controllers(self, vm_os: VM_TYPE) -> Tuple[AbstractMouseController, AbstractKeyboardController]:
         if vm_os == "ubuntu":
             ssh_connection = Connection(host=self.host, user=self.username, connect_kwargs={"password": self.password})
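In effect, the hard-coded 800x800 screen is replaced by measuring a real screenshot, which is why `_start_emulator()` now runs before the size is read instead of after the space definitions. A minimal standalone sketch of the probe (assuming, as in `_get_screensize`, that the observation is a path to a screenshot image on disk):

```python
from PIL import Image

def get_screensize(screenshot_path):
    # PIL reports size as (width, height) in pixels, so the VM's real
    # resolution is measured from a live screenshot instead of assumed.
    with Image.open(screenshot_path) as img:
        return img.size
```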
@@ -145,7 +153,18 @@
         return observation

     def step(self, action):
-        action_type = Action(action['action_type'])
+        if isinstance(action, list):
+            for a in action:
+                observation, reward, done, info = self.step(a)
+            return observation, reward, done, info
+
+        # todo: handle the case when the action is not a single action
+        try:
+            action_type = Action(action['action_type'])
+        except KeyError:
+            done = True
+            return self._get_obs(), 0, done, {}
+
         if action_type == Action.CLICK:
             click = MouseClick(action['click_type'])
             if click == MouseClick.LEFT:
@@ -185,17 +204,19 @@
         elif action_type == Action.MOUSE_MOVE:
             self.mouse_controller.mouse_move(x=action['x'], y=action['y'])
         elif action_type == Action.KEY:
-            key_sequence = ''.join(map(chr, action['key']))  # Convert integer array to string
-            self.keyboard_controller.key(key_sequence)
+            self.keyboard_controller.key(action['key'])
         elif action_type == Action.KEY_DOWN:
-            key_sequence = ''.join(map(chr, action['key']))  # Convert integer array to string
-            self.keyboard_controller.key_down(key_sequence)
+            self.keyboard_controller.key_down(action['key'])
         elif action_type == Action.KEY_UP:
-            key_sequence = ''.join(map(chr, action['key']))  # Convert integer array to string
-            self.keyboard_controller.key_up(key_sequence)
+            self.keyboard_controller.key_up(action['key'])
         elif action_type == Action.TYPE:
-            text = ''.join(map(chr, action['text']))  # Convert integer array to string
-            self.keyboard_controller.type(text)
+            for key in action['text']:
+                if key == "\r" or key == "\n":
+                    self.keyboard_controller.key("enter")
+                else:
+                    self.keyboard_controller.key(key)

         # sleep for 0.05 seconds with some random noise
         time.sleep(0.05 + np.random.normal(0, 0.01))

         # Capture new state
         observation = self._get_obs()
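With this hunk, `env.step` accepts either a single action dict or a list of them, and key/text payloads are plain strings instead of integer code points. A hypothetical call under the new interface, assuming `env` is a constructed `DesktopEnv` (exact key names such as "ctrl+a" depend on the controller backend):

```python
# One step that selects all existing text, then types a replacement;
# the trailing "\n" is translated to an "enter" key press by step().
env.step([
    {"action_type": Action.KEY.value, "key": "ctrl+a"},
    {"action_type": Action.TYPE.value, "text": "hello world\n"},
])
```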

View File

@@ -2,14 +2,30 @@ import os
 from pprint import pprint

 from desktop_env.envs.desktop_env import DesktopEnv, Action, MouseClick
 from mm_agents.gpt_4v_agent import GPT4v_Agent
+import uuid


 def gpt_4v_agent():
     api_key = os.environ.get("OPENAI_API_KEY")
-    agent = GPT4v_Agent(api_key=api_key, instruction="Clear the recycle bin.")
+
+    # meta_info = {
+    #     "instruction": "Open WSJ website to get latest news",
+    #     "task_name": "open_wsj",
+    #     "snapshot_path": "base",
+    # }
+
+    meta_info = {
+        "instruction": "Clear the recycle bin",
+        "task_name": "clean_recycle_bin",
+        "snapshot_path": "base",
+    }
+
+    agent = GPT4v_Agent(api_key=api_key, instruction=meta_info["instruction"])

     env = DesktopEnv(
-        path_to_vm=r"""C:\Users\tianbaox\Documents\Virtual Machines\Win10\Win10.vmx""",  # automitically load the snapshot and start the vm
+        path_to_vm=r"""C:\Users\tianbaox\Documents\Virtual Machines\Win10\Win10.vmx""",
+        # automatically load the snapshot and start the vm
         # path_to_vm="/home/yuri/vmware/Ubuntu 64-bit/Ubuntu 64-bit.vmx",
+        snapshot_path="base",
         username="tianbaox",
         password="951753",
         # host="192.168.7.128",
@@ -20,15 +36,34 @@ def gpt_4v_agent():
     # reset the environment to certain snapshot
     observation = env.reset()
     done = False
+    time_idx = 0
+
+    # create a file_dir for this agent
+    file_dir = os.path.join("observations", str(uuid.uuid4()))
+    os.makedirs(file_dir, exist_ok=True)
+
+    # save the meta_info
+    with open(os.path.join(file_dir, "meta_info.json"), "w") as f:
+        f.write(str(meta_info))
+        f.write("\n")
+
     while not done:
         # todo: action needs to be redesigned, need to support multiple actions at one step
-        action = agent.predict(obs=observation)
-        print("Action:", action)
+        actions = agent.predict(obs=observation)
+        print("Actions:", actions)
+
+        with open(os.path.join(file_dir, "obs_{}.png".format(time_idx)), "wb") as f:
+            # copy the image in the path of observation to the file
+            with open(observation, "rb") as image_file:
+                f.write(image_file.read())
+
+        # save the actions
+        with open(os.path.join(file_dir, "actions_{}.json".format(time_idx)), "w") as f:
+            f.write(str(actions))
+            f.write("\n")
+
+        time_idx += 1
+        observation, reward, done, info = env.step(actions)

-        # fixme: step not working
-        observation, reward, done, info = env.step(action)
         print("Observation:", observation)
         print("Reward:", reward)
         print("Info:", info)

View File: mm_agents/gpt_4v_agent.py

@@ -6,20 +6,24 @@ import json
 import requests

 from mm_agents.gpt_4v_prompt import SYS_PROMPT


 # Function to encode the image
 def encode_image(image_path):
     with open(image_path, "rb") as image_file:
         return base64.b64encode(image_file.read()).decode('utf-8')


-def parse_action_from_string(input_string):
+def parse_actions_from_string(input_string):
     # Search for a JSON string within the input string
+    actions = []
+
     matches = re.findall(r'```json\s+(.*?)\s+```', input_string, re.DOTALL)
     if matches:
-        # Assuming there's only one match, parse the JSON string into a dictionary
+        # Parse every matched JSON block into a dictionary
         try:
-            action_dict = json.loads(matches[0])
-            return action_dict
+            for match in matches:
+                action_dict = json.loads(match)
+                actions.append(action_dict)
+            return actions
         except json.JSONDecodeError as e:
             return f"Failed to parse JSON: {e}"
     else:
else:
@@ -27,17 +31,20 @@ def parse_action_from_string(input_string):
         if matches:
-            # Assuming there's only one match, parse the JSON string into a dictionary
+            # Parse every matched JSON block into a dictionary
             try:
-                action_dict = json.loads(matches[0])
-                return action_dict
+                for match in matches:
+                    action_dict = json.loads(match)
+                    actions.append(action_dict)
+                return actions
             except json.JSONDecodeError as e:
                 return f"Failed to parse JSON: {e}"
         else:
             try:
                 action_dict = json.loads(input_string)
-                return action_dict
+                return [action_dict]
             except json.JSONDecodeError as e:
                 raise ValueError("Invalid response format: " + input_string)


 class GPT4v_Agent:
     def __init__(self, api_key, instruction, model="gpt-4-vision-preview", max_tokens=300):
         self.instruction = instruction
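A hypothetical round-trip through the revised parser, showing that one model reply can now yield several actions (the fence characters are assembled in code so the snippet stays self-contained):

```python
fence = "`" * 3  # the ``` delimiter the parser's regex looks for
response = (
    "I will select everything, then type.\n"
    + fence + "json\n" + '{"action_type": "KEY", "key": "ctrl+a"}' + "\n" + fence + "\n"
    + fence + "json\n" + '{"action_type": "TYPE", "text": "hello world"}' + "\n" + fence
)
print(parse_actions_from_string(response))
# -> [{'action_type': 'KEY', 'key': 'ctrl+a'},
#     {'action_type': 'TYPE', 'text': 'hello world'}]
```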
@@ -78,6 +85,10 @@
                 }
             ]
         })

+        traj_to_show = []
+        for i in range(len(self.trajectory)):
+            traj_to_show.append(self.trajectory[i]["content"][0]["text"])
+        print("Trajectory:", traj_to_show)
+
         payload = {
             "model": self.model,
             "messages": self.trajectory,
@@ -85,11 +96,15 @@
         }

         response = requests.post("https://api.openai.com/v1/chat/completions", headers=self.headers, json=payload)
-        action = self.parse_action(response.json()['choices'][0]['message']['content'])
+        try:
+            actions = self.parse_actions(response.json()['choices'][0]['message']['content'])
+        except:
+            print("Failed to parse action from response:", response.json()['choices'][0]['message']['content'])
+            actions = None

-        return action
+        return actions

-    def parse_action(self, response: str):
+    def parse_actions(self, response: str):
         # response example
         """
         ```json
@@ -101,7 +116,7 @@
         """

         # parse from the response
-        action = parse_action_from_string(response)
+        actions = parse_actions_from_string(response)

         # add action into the trajectory
         self.trajectory.append({
@@ -115,25 +130,28 @@
         })

         # parse action
-        parsed_action = {}
-        action_type = Action[action['action_type']].value
-        parsed_action["action_type"] = action_type
+        parsed_actions = []
+        for action in actions:
+            parsed_action = {}
+            action_type = Action[action['action_type']].value
+            parsed_action["action_type"] = action_type

-        if action_type == Action.CLICK.value or action_type == Action.MOUSE_DOWN.value or action_type == Action.MOUSE_UP.value:
-            parsed_action["click_type"] = MouseClick[action['click_type']].value
+            if action_type == Action.CLICK.value or action_type == Action.MOUSE_DOWN.value or action_type == Action.MOUSE_UP.value:
+                parsed_action["click_type"] = MouseClick[action['click_type']].value

-        if action_type == Action.MOUSE_MOVE.value:
-            parsed_action["x"] = action["x"]
-            parsed_action["y"] = action["y"]
+            if action_type == Action.MOUSE_MOVE.value:
+                parsed_action["x"] = action["x"]
+                parsed_action["y"] = action["y"]

         # fixme: could these two actions be merged??
-        if action_type == Action.KEY.value:
-            parsed_action["key"] = [ord(c) for c in action["key"]]
+            if action_type == Action.KEY.value:
+                parsed_action["key"] = action["key"]  # handle the condition of single key and multiple keys

-        if action_type == Action.TYPE.value:
-            parsed_action["text"] = [ord(c) for c in action["text"]]
+            if action_type == Action.TYPE.value:
+                parsed_action["text"] = action["text"]

-        return parsed_action
+            parsed_actions.append(parsed_action)
+
+        return parsed_actions
@@ -142,4 +160,3 @@ if __name__ == '__main__':
     agent = GPT4v_Agent(api_key=api_key, instruction="Open Google Sheet")
     print(agent.predict(obs="stackoverflow.png"))
-
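Taken together, a hypothetical end-to-end call of the revised agent interface (the API key, instruction, and screenshot path are illustrative):

```python
agent = GPT4v_Agent(api_key="sk-...", instruction="Clear the recycle bin")
actions = agent.predict(obs="screenshot.png")  # now a list of action dicts, or None on parse failure
if actions:
    for action in actions:
        print(action)
```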

View File: mm_agents/gpt_4v_prompt.py

@@ -31,25 +31,24 @@ for example, format as:
 "click_type": "LEFT"
 }
 ```
-- For [KEY, KEY_DOWN, KEY_UP, TYPE], you need to choose a(multiple) key(s) from the keyboard
+- For [KEY, KEY_DOWN, KEY_UP], you need to choose a key (or multiple keys) from the keyboard
 for example, format as:
 ```
 {
 "action_type": "KEY",
 "key": "ctrl+c"
 }
 ```
 - For TYPE, you need to specify the text you want to type
 for example, format as:
 ```
 {
 "action_type": "TYPE",
-"text": [
-"w",
-"i",
-"k",
-"i",
-"p",
-"e",
-"d",
-"i",
-"a"
-]
+"text": "hello world"
 }
 ```

 For every step, you should only return the action_type and the parameters of your action as a dict, without any other things. You MUST wrap the dict with backticks (\`).
+You can predict multiple actions at one step, but return only one action dict per pair of backticks.

 You MUST choose and ONLY CHOOSE from the action space above, otherwise your action will be considered as invalid and you will get a penalty.
 """