Add 'WAIT', 'FAIL', 'DONE' to the action space; Debug basic prompting-based GPT-4 and Gemini agents; Initialize experiments script;

Timothyxxx
2024-01-14 23:36:19 +08:00
parent d52b692ee5
commit f153a4c253
10 changed files with 482 additions and 144 deletions

View File

@@ -237,6 +237,9 @@ class PythonController:
keys_para_rep = "', '".join(keys)
self.execute_python_command(f"pyautogui.hotkey('{keys_para_rep}')")
elif action_type in ['WAIT', 'FAIL', 'DONE']:
pass
else:
raise Exception(f"Unknown action type: {action_type}")
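
At the controller level the three new actions are intentionally inert; a minimal sketch of that contract, using a hypothetical stand-in for PythonController's dispatch (the hotkey/click branches are omitted), could look like this:

```python
CONTROL_ACTIONS = {"WAIT", "FAIL", "DONE"}

def execute(action_type: str) -> None:
    # Control-flow actions issue no pyautogui command; DesktopEnv.step() gives them meaning.
    if action_type in CONTROL_ACTIONS:
        return
    raise Exception(f"Unknown action type: {action_type}")

execute("WAIT")  # returns without touching the VM
execute("DONE")  # likewise
```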

View File

@@ -186,5 +186,18 @@ ACTION_SPACE = [
"optional": False,
}
}
},
############################################################################################################
{
"action_type": "WAIT",
"note": "wait until the next action",
},
{
"action_type": "FAIL",
"note": "decide the task can not be performed",
},
{
"action_type": "DONE",
"note": "decide the task is done",
}
]
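
The three control actions are parameter-free entries. A quick sanity check against the list above might look like the following sketch; `is_known_action` is a hypothetical helper, and `ACTION_SPACE` refers to the list defined in this file:

```python
def is_known_action(action: dict, action_space: list) -> bool:
    # An action is valid if its type appears in the declared action space.
    return any(entry["action_type"] == action["action_type"] for entry in action_space)

assert is_known_action({"action_type": "WAIT"}, ACTION_SPACE)
assert is_known_action({"action_type": "FAIL"}, ACTION_SPACE)
assert is_known_action({"action_type": "DONE"}, ACTION_SPACE)
```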

View File

@@ -1,28 +1,30 @@
from __future__ import annotations
import logging
import os
import subprocess
import tempfile
import time
from typing import Callable, Any, Optional
# import uuid
# import platform
from typing import List, Dict
import gymnasium as gym
# import requests
from desktop_env.controllers.python import PythonController
from desktop_env.controllers.setup import SetupController
# from desktop_env.evaluators import eval_funcs
from desktop_env.evaluators import metrics, getters
logger = logging.getLogger("desktopenv.env")
Metric = Callable[[Any, Any], float]
Getter = Callable[[gym.Env, Dict[str, Any]], Any]
def _execute_command(command: List[str]) -> None:
if command[:4] == ["vmrun", "-T", "ws", "start"]:
p = subprocess.Popen(command)
@@ -84,8 +86,8 @@ class DesktopEnv(gym.Env):
self.setup_controller = SetupController(vm_ip=self.vm_ip, cache_dir=self.cache_dir)
# Meta info of the VM, move to the reset() function
self.vm_platform: str = "" # self.controller.get_vm_platform()
self.vm_screen_size = None # self.controller.get_vm_screen_size()
# mode: human or machine
assert action_space in ["computer_13", "pyautogui"]
@@ -164,7 +166,7 @@ class DesktopEnv(gym.Env):
self.evaluator["expected"]["type"])) if "expected" in self.evaluator else None
self.metric_options: Dict[str, Any] = self.evaluator.get("options", {})
def reset(self, task_config: Optional[Dict[str, Any]] = None, seed=None, options=None):
def reset(self, task_config: Optional[Dict[str, Any]] = None, seed=None, options=None) -> Dict[str, Any]:
logger.info("Resetting environment...")
logger.info("Switching task...")
@@ -202,11 +204,27 @@ class DesktopEnv(gym.Env):
time.sleep(5)
logger.info("Environment setup complete.")
observation = self._get_obs()
observation = {"screenshot": self._get_obs()}
return observation
def step(self, action, pause=0.5):
self._step_no += 1
self.action_history.append(action)
reward = 0 # todo: Define reward calculation for each example
done = False # todo: Define episode termination condition for each example
info = {}
# handle the special actions
if action in ['WAIT', 'FAIL', 'DONE']:
if action == 'WAIT':
time.sleep(pause)
elif action == 'FAIL':
done = True
info = {"fail": True}
elif action == 'DONE':
done = True
info = {"done": True}
# fixme: add a check here to decide whether the action is valid for the current action_space, and remind the agent if it is not
if self.action_space == "computer_13":
@@ -215,19 +233,14 @@ class DesktopEnv(gym.Env):
elif self.action_space == "pyautogui":
# the set of all possible python commands inside `pyautogui`
self.controller.execute_python_command(action)
self.action_history.append(action)
# todo: ideally, wait here until rendering is done instead of sleeping for a fixed time
time.sleep(pause)
observation = {
"screenshot": self._get_obs(),
"accessibility_tree": self.controller.get_accessibility_tree(),
"terminal": self.controller.get_terminal_output(),
"instruction": self.instruction
}
reward = 0 # todo: Define reward calculation for each example
done = False # todo: Define episode termination condition for each example
info = {}
return observation, reward, done, info
def evaluate(self):
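
Because the control strings fall through to the action-space dispatch shown above, a guarded dispatch along these lines may be needed so that, for example, the bare string "WAIT" is never sent to `execute_python_command` as Python source. This is a sketch under that assumption, not the code in this commit, and `execute_action` is assumed to be the controller's `computer_13` entry point:

```python
def dispatch_action(controller, action, action_space: str) -> None:
    """Guarded dispatch sketch: control actions never reach the VM."""
    if action in ("WAIT", "FAIL", "DONE"):
        return
    if action_space == "computer_13":
        controller.execute_action(action)
    elif action_space == "pyautogui":
        controller.execute_python_command(action)
    else:
        raise ValueError(f"Unsupported action space: {action_space}")
```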

experiment.py (new file, +104 lines)
View File

@@ -0,0 +1,104 @@
import datetime
import json
import logging
import os
import sys
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4v_agent import GPT4v_Agent
# Logger Configs {{{ #
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
os.makedirs("logs", exist_ok=True)  # the FileHandlers below fail if the logs directory is missing
file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8")
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
# }}} Logger Configs #
logger = logging.getLogger("desktopenv.experiment")
PATH_TO_VM = r"C:\Users\tianbaox\Documents\Virtual Machines\Ubuntu\Ubuntu.vmx"
def run_one_example(example, agent, max_steps=20, example_trajectory_dir="exp_trajectory"):
trajectory_recording_path = os.path.join(example_trajectory_dir, "trajectory.json")
env = DesktopEnv(
path_to_vm=PATH_TO_VM,
action_space=agent.action_space,
task_config=example
)
# reset the environment to certain snapshot
observation = env.reset()
observation['instruction'] = example['instruction']
done = False
step_num = 0
# todo: save the screenshots and actions to a folder
while not done and step_num < max_steps:
actions = agent.predict(observation)
for action in actions:
observation, reward, done, info = env.step(action)
observation['instruction'] = example['instruction']
step_num += 1
logger.info("Step %d", step_num)
logger.info("Action: %s", actions)
observation.pop("accessibility_tree")
logger.info("Observation: %s", observation)
logger.info("Reward: %.2f", reward)
logger.info("Info: %s", info)
logger.info("================================\n")
if done:
logger.info("The episode is done.")
break
result = env.evaluate()
logger.info("Result: %.2f", result)
# env.close()
logger.info("Environment closed.")
if __name__ == "__main__":
action_space = "pyautogui"
example_class = "vlc"
example_id = "8f080098-ddb1-424c-b438-4e96e5e4786e"
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
example = json.load(f)
example["snapshot"] = "chrome_setup"
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4v_Agent(api_key=api_key, action_space=action_space)
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, example_class, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)
run_one_example(example, agent, 20, example_trajectory_dir)
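
One way to address the "save the screenshots and actions to a folder" todo in the loop above is a small recording helper; `record_step` is hypothetical, and it assumes `observation["screenshot"]` is a local file path, which matches how the agents open it.

```python
import json
import os
import shutil

def record_step(traj_dir: str, step_num: int, action, observation: dict) -> None:
    """Save one step's action and screenshot under traj_dir/step_xxx/."""
    step_dir = os.path.join(traj_dir, "step_{:03d}".format(step_num))
    os.makedirs(step_dir, exist_ok=True)
    screenshot_path = observation.get("screenshot")
    if isinstance(screenshot_path, str) and os.path.exists(screenshot_path):
        shutil.copy(screenshot_path, os.path.join(step_dir, "screenshot.png"))
    with open(os.path.join(step_dir, "action.json"), "w") as f:
        json.dump({"step": step_num, "action": action}, f, default=str)
```

It could be invoked right after `env.step(action)` inside the loop, passing `example_trajectory_dir`, `step_num`, `action`, and `observation`.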

mm_agents/gemini_agent.py (new file, +84 lines)
View File

@@ -0,0 +1,84 @@
from typing import Dict
import PIL.Image
import google.generativeai as genai
from mm_agents.gpt_4v_agent import parse_actions_from_string, parse_code_from_string
from mm_agents.gpt_4v_prompt_action import SYS_PROMPT as SYS_PROMPT_ACTION
from mm_agents.gpt_4v_prompt_code import SYS_PROMPT as SYS_PROMPT_CODE
class GeminiPro_Agent:
def __init__(self, api_key, model='gemini-pro-vision', max_tokens=300, action_space="computer_13"):
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel(model)
self.max_tokens = max_tokens
self.action_space = action_space
self.trajectory = [
{
"role": "system",
"parts": [
{
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space]
]
}
]
def predict(self, obs: Dict):
"""
Predict the next action(s) based on the current observation.
"""
img = PIL.Image.open(obs["screenshot"])
self.trajectory.append({
"role": "user",
"parts": ["To accomplish the task '{}' and given the current screenshot, what's the next step?".format(
obs["instruction"]), img]
})
traj_to_show = []
for i in range(len(self.trajectory)):
traj_to_show.append(self.trajectory[i]["parts"][0])
if len(self.trajectory[i]["parts"]) > 1:
traj_to_show.append("screenshot_obs")
print("Trajectory:", traj_to_show)
response = self.model.generate_content(self.trajectory, generation_config={"max_output_tokens": self.max_tokens})
try:
actions = self.parse_actions(response.text)
except Exception:
# todo: add error handling
print("Failed to parse action from response:", response)
actions = None
return actions
def parse_actions(self, response: str):
# response example
"""
```json
{
"action_type": "CLICK",
"click_type": "RIGHT"
}
```
"""
# parse from the response
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
elif self.action_space == "pyautogui":
actions = parse_code_from_string(response)
# add action into the trajectory
self.trajectory.append({
"role": "assistant",
"parts": [response]
})
return actions
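
A hypothetical usage sketch, assuming a GOOGLE_API_KEY environment variable and an observation dict shaped like the one experiment.py builds:

```python
import os

from mm_agents.gemini_agent import GeminiPro_Agent

agent = GeminiPro_Agent(api_key=os.environ["GOOGLE_API_KEY"], action_space="pyautogui")
obs = {"screenshot": "screenshot.png", "instruction": "Open the file manager"}
actions = agent.predict(obs)
print(actions)
```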

View File

@@ -1,19 +0,0 @@
import PIL.Image
import google.generativeai as genai
genai.configure(api_key="AIzaSyANsETKHVo-D8jZu1SnTSaQgLOJEDgnj9Q")
# for m in genai.list_models():
# if 'generateContent' in m.supported_generation_methods:
# print(m.name)
model = genai.GenerativeModel('gemini-pro-vision')
img = PIL.Image.open('image.jpg')
messages = [
{'role':'user',
'parts': ["Explain this image.", img]}
]
response = model.generate_content(messages)

View File

@@ -1,12 +1,12 @@
# fixme: needs to be rewritten for the new action space
import os
import re
import base64
from desktop_env.envs.desktop_env import Action, MouseClick
import json
from typing import Dict
import requests
from mm_agents.gpt_4v_prompt import SYS_PROMPT
from mm_agents.gpt_4v_prompt_action import SYS_PROMPT as SYS_PROMPT_ACTION
from mm_agents.gpt_4v_prompt_code import SYS_PROMPT as SYS_PROMPT_CODE
# Function to encode the image
@@ -47,11 +47,26 @@ def parse_actions_from_string(input_string):
raise ValueError("Invalid response format: " + input_string)
def parse_code_from_string(input_string):
# This regular expression will match both ```code``` and ```python code```
# and capture the `code` part. It uses a non-greedy match for the content inside.
pattern = r"```(?:\w+\s+)?(.*?)```"
# Find all non-overlapping matches in the string
matches = re.findall(pattern, input_string, re.DOTALL)
# The regex above captures the content inside the triple backticks.
# The `re.DOTALL` flag allows the dot `.` to match newline characters as well,
# so the code inside backticks can span multiple lines.
# matches now contains all the captured code snippets
return matches
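# For instance, parse_code_from_string("```python\npyautogui.click(100, 200)\n```")
# returns ['pyautogui.click(100, 200)\n']; the optional language tag after the
# opening backticks is consumed by the (?:\w+\s+)? group.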
class GPT4v_Agent:
def __init__(self, api_key, instruction, model="gpt-4-vision-preview", max_tokens=300):
self.instruction = instruction
def __init__(self, api_key, model="gpt-4-vision-preview", max_tokens=300, action_space="computer_13"):
self.model = model
self.max_tokens = max_tokens
self.action_space = action_space
self.headers = {
"Content-Type": "application/json",
@@ -64,20 +79,27 @@ class GPT4v_Agent:
"content": [
{
"type": "text",
"text": SYS_PROMPT
"text": {
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space]
},
]
}
]
def predict(self, obs):
base64_image = encode_image(obs)
def predict(self, obs: Dict):
"""
Predict the next action(s) based on the current observation.
"""
base64_image = encode_image(obs["screenshot"])
self.trajectory.append({
"role": "user",
"content": [
{
"type": "text",
"text": "What's the next step for instruction '{}'?".format(self.instruction)
"text": "To accomplish the task '{}' and given the current screenshot, what's the next step?".format(
obs["instruction"])
},
{
"type": "image_url",
@@ -87,12 +109,15 @@ class GPT4v_Agent:
}
]
})
traj_to_show = []
for i in range(len(self.trajectory)):
traj_to_show.append(self.trajectory[i]["content"][0]["text"])
if len(self.trajectory[i]["content"]) > 1:
traj_to_show.append("screenshot_obs")
print("Trajectory:", traj_to_show)
payload = {
"model": self.model,
"messages": self.trajectory,
@@ -103,6 +128,7 @@ class GPT4v_Agent:
try:
actions = self.parse_actions(response.json()['choices'][0]['message']['content'])
except Exception:
# todo: add error handling
print("Failed to parse action from response:", response.json()['choices'][0]['message']['content'])
actions = None
@@ -120,7 +146,10 @@ class GPT4v_Agent:
"""
# parse from the response
actions = parse_actions_from_string(response)
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
elif self.action_space == "pyautogui":
actions = parse_code_from_string(response)
# add action into the trajectory
self.trajectory.append({
@@ -133,34 +162,4 @@ class GPT4v_Agent:
]
})
# parse action
parsed_actions = []
for action in actions:
parsed_action = {}
action_type = Action[action['action_type']].value
parsed_action["action_type"] = action_type
if action_type == Action.CLICK.value or action_type == Action.MOUSE_DOWN.value or action_type == Action.MOUSE_UP.value:
parsed_action["click_type"] = MouseClick[action['click_type']].value
if action_type == Action.MOUSE_MOVE.value:
parsed_action["x"] = action["x"]
parsed_action["y"] = action["y"]
if action_type == Action.KEY.value:
parsed_action["key"] = action["key"] # handle the condition of single key and multiple keys
if action_type == Action.TYPE.value:
parsed_action["text"] = action["text"]
parsed_actions.append(parsed_action)
return parsed_actions
if __name__ == '__main__':
# OpenAI API Key
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4v_Agent(api_key=api_key, instruction="Open Google Sheet")
print(agent.predict(obs="stackoverflow.png"))
return actions
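
A hypothetical usage sketch mirroring experiment.py; the screenshot path and instruction are illustrative, and OPENAI_API_KEY must be set:

```python
import os

from mm_agents.gpt_4v_agent import GPT4v_Agent

agent = GPT4v_Agent(api_key=os.environ["OPENAI_API_KEY"], action_space="computer_13")
obs = {"screenshot": "screenshot.png", "instruction": "Open Google Sheets"}
actions = agent.predict(obs)
print(actions)
```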

View File

@@ -1,52 +0,0 @@
You will act as an agent that follows my instructions and performs desktop computer tasks as instructed. You must have good knowledge of computers and a good internet connection.
For each step, you will receive an observation in the form of an image: a screenshot of the computer screen. Based on this image, you will predict the action to perform on the computer.
Here is the description of the action space:
Firstly you need to predict the class of your action, select from one below:
- **MOUSE_MOVE**: move the mouse to a specific position
- **CLICK**: click on the screen
- **MOUSE_DOWN**: press the mouse button
- **MOUSE_UP**: release the mouse button
- **KEY**: press a key on the keyboard
- **KEY_DOWN**: press a key on the keyboard
- **KEY_UP**: release a key on the keyboard
- **TYPE**: type a string on the keyboard
Then you need to predict the parameters of your action:
- For MOUSE_MOVE, you need to predict the x and y coordinate of the mouse cursor
for example, format as:
```
{
"action_type": "MOUSE_MOVE",
"x": 1319.11,
"y": 65.06
}
```
- For [CLICK, MOUSE_DOWN, MOUSE_UP], you need to specify the click_type as well, select from [LEFT, MIDDLE, RIGHT, WHEEL_UP, WHEEL_DOWN], which means you click the left button, middle button, right button, wheel up or wheel down of your mouse:
for example, format as:
```
{
"action_type": "CLICK",
"click_type": "LEFT"
}
```
- For [KEY, KEY_DOWN, KEY_UP, TYPE], you need to choose a(multiple) key(s) from the keyboard, select from [A-Z, 0-9, F1-F12, ESC, TAB, ENTER, SPACE, BACKSPACE, SHIFT, CTRL, ALT, UP, DOWN, LEFT, RIGHT, CAPSLOCK, NUMLOCK, SCROLLLOCK, INSERT, DELETE, HOME, END, PAGEUP, PAGEDOWN]:
for example, format as:
```
{
"action_type": "TYPE",
"text": [
"w",
"i",
"k",
"i",
"p",
"e",
"d",
"i",
"a"
]
}
```
For every step, you should only return the action_type and the parameters of your action as a dict, without any other things.

View File

@@ -1,19 +1,207 @@
SYS_PROMPT = """
You will act as an agent that follows my instructions and performs desktop computer tasks as instructed. You must have good knowledge of computers and a good internet connection.
For each step, you will receive an observation in the form of an image: a screenshot of the computer screen. Based on this image, you will predict the action to perform on the computer.
Here is the description of the action space:
Firstly you need to predict the class of your action, select from one below:
- **MOUSE_MOVE**: move the mouse to a specific position
- **CLICK**: click on the screen
- **MOUSE_DOWN**: press the mouse button
- **MOUSE_UP**: release the mouse button
- **KEY**: press a key on the keyboard
- **KEY_DOWN**: press a key on the keyboard
- **KEY_UP**: release a key on the keyboard
- **TYPE**: type a string on the keyboard
Then you need to predict the parameters of your action:
HERE is the description of the action space you need to predict, follow the format and choose the correct action type and parameters:
ACTION_SPACE = [
{
"action_type": "MOVE_TO",
"note": "move the cursor to the specified position",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": False,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": False,
}
}
},
{
"action_type": "CLICK",
"note": "click the left button if the button not specified, otherwise click the specified button; click at the current position if x and y are not specified, otherwise click at the specified position",
"parameters": {
"button": {
"type": str,
"range": ["left", "right", "middle"],
"optional": True,
},
"x": {
"type": float,
"range": [0, X_MAX],
"optional": True,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": True,
},
"num_clicks": {
"type": int,
"range": [1, 2, 3],
"optional": True,
},
}
},
{
"action_type": "MOUSE_DOWN",
"note": "press the left button if the button not specified, otherwise press the specified button",
"parameters": {
"button": {
"type": str,
"range": ["left", "right", "middle"],
"optional": True,
}
}
},
{
"action_type": "MOUSE_UP",
"note": "release the left button if the button not specified, otherwise release the specified button",
"parameters": {
"button": {
"type": str,
"range": ["left", "right", "middle"],
"optional": True,
}
}
},
{
"action_type": "RIGHT_CLICK",
"note": "right click at the current position if x and y are not specified, otherwise right click at the specified position",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": True,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": True,
}
}
},
{
"action_type": "DOUBLE_CLICK",
"note": "double click at the current position if x and y are not specified, otherwise double click at the specified position",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": True,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": True,
}
}
},
{
"action_type": "DRAG_TO",
"note": "drag the cursor to the specified position with the left button pressed",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": False,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": False,
}
}
},
{
"action_type": "SCROLL",
"note": "scroll the mouse wheel up or down",
"parameters": {
"dx": {
"type": int,
"range": None,
"optional": False,
},
"dy": {
"type": int,
"range": None,
"optional": False,
}
}
},
{
"action_type": "TYPING",
"note": "type the specified text",
"parameters": {
"text": {
"type": str,
"range": None,
"optional": False,
}
}
},
{
"action_type": "PRESS",
"note": "press the specified key and release it",
"parameters": {
"key": {
"type": str,
"range": KEYBOARD_KEYS,
"optional": False,
}
}
},
{
"action_type": "KEY_DOWN",
"note": "press the specified key",
"parameters": {
"key": {
"type": str,
"range": KEYBOARD_KEYS,
"optional": False,
}
}
},
{
"action_type": "KEY_UP",
"note": "release the specified key",
"parameters": {
"key": {
"type": str,
"range": KEYBOARD_KEYS,
"optional": False,
}
}
},
{
"action_type": "HOTKEY",
"note": "press the specified key combination",
"parameters": {
"keys": {
"type": list,
"range": [KEYBOARD_KEYS],
"optional": False,
}
}
},
############################################################################################################
{
"action_type": "WAIT",
"note": "wait until the next action",
},
{
"action_type": "FAIL",
"note": "decide the task can not be performed",
},
{
"action_type": "DONE",
"note": "decide the task is done",
}
]
Firstly you need to predict the class of your action, then you need to predict the parameters of your action:
- For MOVE_TO, you need to predict the x and y coordinates of the cursor; the top-left corner of the screen is (0, 0) and the bottom-right corner is (1920, 1080)
for example, format as:
```
@@ -48,7 +236,9 @@ for example, format as:
}
```
For every step, you should only return the action_type and the parameters of your action as a dict, without any other things. You MUST wrap the dict with backticks (\`).
You can predict multiple actions at one step, but you should only return one action for each step.
REMEMBER:
For every step, you should only return the action_type and the parameters of your action as a dict, without any other things.
You MUST wrap the dict with backticks (\`).
You MUST choose and ONLY CHOOSE from the action space above, otherwise your action will be considered as invalid and you will get a penalty.
You CAN predict multiple actions at one step, but you should only return one action for each step.
"""

View File

@@ -4,5 +4,8 @@ For each step, you will get an observation of an image, which is the screenshot
You are required to use `pyautogui` to perform the action.
Return one line or multiple lines of Python code to perform the action each time; be time efficient.
Return `None` if you cannot perform the action.
When you think you have to wait for some time, return `WAIT`.
When you think the task can not be done, return `FAIL`.
When you think the task is done, return `DONE`.
"""