Merge branch 'main' of github.com:ztjhz/DesktopEnv

Siheng Zhao
2024-01-20 19:45:47 +08:00
11 changed files with 640 additions and 1474 deletions

experiment_a11y_tree.py (141 lines) Normal file
View File

@@ -0,0 +1,141 @@
import datetime
import json
import logging
import os
import sys
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4v_agent import GPT4v_Agent
# Logger Configs {{{ #
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8")
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
# }}} Logger Configs #
logger = logging.getLogger("desktopenv.experiment")
PATH_TO_VM = r"C:\Users\tianbaox\Documents\Virtual Machines\Ubuntu\Ubuntu.vmx"
def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_trajectory", recording=True):
trajectory_recording_path = os.path.join(example_trajectory_dir, "trajectory.json")
env = DesktopEnv(
path_to_vm=PATH_TO_VM,
action_space=agent.action_space,
task_config=example
)
# reset the environment to the specified snapshot
observation = env.reset()
done = False
step_num = 0
if recording:
# send a request to the server to start recording
env.controller.start_recording()
while not done and step_num < max_steps:
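        # dump the accessibility tree from the current observation to disk so it can be inspected outside the run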
with open("accessibility_tree.xml", "w", encoding="utf-8") as f:
f.write(observation["accessibility_tree"])
actions = agent.predict(observation)
step_num += 1
for action in actions:
# Capture the timestamp before executing the action
action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
logger.info("Step %d: %s", step_num, action)
observation, reward, done, info = env.step(action)
logger.info("Reward: %.2f", reward)
logger.info("Done: %s", done)
logger.info("Info: %s", info)
# Save screenshot and trajectory information
with open(os.path.join(example_trajectory_dir, f"step_{step_num}_{action_timestamp}.png"), "wb") as _f:
with open(observation['screenshot'], "rb") as __f:
screenshot = __f.read()
_f.write(screenshot)
with open(trajectory_recording_path, "a") as f:
f.write(json.dumps({
"step_num": step_num,
"action_timestamp": action_timestamp,
"action": action,
"reward": reward,
"done": done,
"info": info,
"screenshot_file": f"step_{step_num}_{action_timestamp}.png"
}))
f.write("\n")
if done:
logger.info("The episode is done.")
break
if recording:
# send a request to the server to stop recording
env.controller.end_recording(os.path.join(example_trajectory_dir, "recording.mp4"))
result = env.evaluate()
logger.info("Result: %.2f", result)
# env.close()
logger.info("Environment closed.")
if __name__ == "__main__":
action_space = "pyautogui"
example_class = "chrome"
example_id = "7b6c7e24-c58a-49fc-a5bb-d57b80e5b4c3"
gpt4_model = "gpt-4-vision-preview"
gemini_model = "gemini-pro-vision"
logger.info("Running example %s/%s", example_class, example_id)
logger.info("Using model %s", gpt4_model)
# logger.info("Using model %s", gemini_model)
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
example = json.load(f)
example["snapshot"] = "exp_setup4"
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4v_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'],
action_space=action_space, exp="a11y_tree")
# api_key = os.environ.get("GENAI_API_KEY")
# agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space, exp="a11y_tree")
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gpt4_model, example_id)
# example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gemini_model, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)
run_one_example(example, agent, 15, example_trajectory_dir)
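Each step is appended to trajectory.json as one JSON object per line (JSON Lines), so the trajectory can be read back with a few lines of Python. A minimal sketch, using the directory this script builds above:
```python
import json

# trajectory.json is JSON Lines: one record per executed action.
trajectory_path = ("exp_trajectory/a11y_tree/chrome/gpt-4-vision-preview/"
                   "7b6c7e24-c58a-49fc-a5bb-d57b80e5b4c3/trajectory.json")

with open(trajectory_path, "r", encoding="utf-8") as f:
    steps = [json.loads(line) for line in f if line.strip()]

for step in steps:
    print(step["step_num"], step["action"], step["reward"], step["done"], step["screenshot_file"])
```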

View File

@@ -113,20 +113,28 @@ if __name__ == "__main__":
action_space = "pyautogui"
example_class = "thunderbird"
example_id = "bb5e4c0d-f964-439c-97b6-bdb9747de3f4"
gpt4_model = "gpt-4-vision-preview"
gemini_model = "gemini-pro-vision"
logger.info("Running example %s/%s", example_class, example_id)
logger.info("Using model %s", gpt4_model)
# logger.info("Using model %s", gemini_model)
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
example = json.load(f)
example["snapshot"] = "exp_setup2"
# api_key = os.environ.get("OPENAI_API_KEY")
# agent = GPT4v_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space)
# agent = GPT4v_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space, exp="screenshot")
api_key = os.environ.get("GENAI_API_KEY")
agent = GeminiPro_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space)
agent = GeminiPro_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space, exp="screenshot")
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, example_class, example_id)
example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gpt4_model, example_id)
# example_trajectory_dir = os.path.join(root_trajectory_dir, "a11y_tree", example_class, gemini_model, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)
run_one_example(example, agent, 10, example_trajectory_dir)
run_one_example(example, agent, 15, example_trajectory_dir)

View File

@@ -0,0 +1,139 @@
import datetime
import json
import logging
import os
import sys
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4v_agent import GPT4v_Agent
# Logger Configs {{{ #
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8")
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
# }}} Logger Configs #
logger = logging.getLogger("desktopenv.experiment")
PATH_TO_VM = r"C:\Users\tianbaox\Documents\Virtual Machines\Ubuntu\Ubuntu.vmx"
def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_trajectory", recording=True):
trajectory_recording_path = os.path.join(example_trajectory_dir, "trajectory.json")
env = DesktopEnv(
path_to_vm=PATH_TO_VM,
action_space=agent.action_space,
task_config=example
)
# reset the environment to the specified snapshot
observation = env.reset()
done = False
step_num = 0
if recording:
# send a request to the server to start recording
env.controller.start_recording()
while not done and step_num < max_steps:
actions = agent.predict(observation)
step_num += 1
for action in actions:
# Capture the timestamp before executing the action
action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
logger.info("Step %d: %s", step_num, action)
observation, reward, done, info = env.step(action)
logger.info("Reward: %.2f", reward)
logger.info("Done: %s", done)
logger.info("Info: %s", info)
# Save screenshot and trajectory information
with open(os.path.join(example_trajectory_dir, f"step_{step_num}_{action_timestamp}.png"), "wb") as _f:
with open(observation['screenshot'], "rb") as __f:
screenshot = __f.read()
_f.write(screenshot)
with open(trajectory_recording_path, "a") as f:
f.write(json.dumps({
"step_num": step_num,
"action_timestamp": action_timestamp,
"action": action,
"reward": reward,
"done": done,
"info": info,
"screenshot_file": f"step_{step_num}_{action_timestamp}.png"
}))
f.write("\n")
if done:
logger.info("The episode is done.")
break
if recording:
# send a request to the server to stop recording
env.controller.end_recording(os.path.join(example_trajectory_dir, "recording.mp4"))
result = env.evaluate()
logger.info("Result: %.2f", result)
# env.close()
logger.info("Environment closed.")
if __name__ == "__main__":
action_space = "pyautogui"
example_class = "chrome"
example_id = "7b6c7e24-c58a-49fc-a5bb-d57b80e5b4c3"
gpt4_model = "gpt-4-vision-preview"
gemini_model = "gemini-pro-vision"
logger.info("Running example %s/%s", example_class, example_id)
logger.info("Using model %s", gpt4_model)
# logger.info("Using model %s", gemini_model)
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
example = json.load(f)
example["snapshot"] = "exp_setup4"
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4v_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'],
action_space=action_space, exp="both")
# api_key = os.environ.get("GENAI_API_KEY")
# agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space, exp="both")
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, "both", example_class, gpt4_model, example_id)
# example_trajectory_dir = os.path.join(root_trajectory_dir, "both", example_class, gemini_model, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)
run_one_example(example, agent, 15, example_trajectory_dir)

View File

@@ -5,8 +5,7 @@ import os
import sys
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4_agent import GPT4_Agent
from mm_agents.gemini_pro_agent import GeminiPro_Agent
from mm_agents.gpt_4v_agent import GPT4v_Agent
# Logger Configs {{{ #
logger = logging.getLogger()
@@ -62,8 +61,6 @@ def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_tr
env.controller.start_recording()
while not done and step_num < max_steps:
with open("accessibility_tree.xml", "w", encoding="utf-8") as f:
f.write(observation["accessibility_tree"])
actions = agent.predict(observation)
step_num += 1
for action in actions:
@@ -113,8 +110,8 @@ def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_tr
if __name__ == "__main__":
action_space = "pyautogui"
example_class = "chrome"
example_id = "7a5a7856-f1b6-42a4-ade9-1ca81ca0f263"
gpt4_model = "gpt-4-1106-preview"
example_id = "7b6c7e24-c58a-49fc-a5bb-d57b80e5b4c3"
gpt4_model = "gpt-4-vision-preview"
gemini_model = "gemini-pro-vision"
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
@@ -122,15 +119,16 @@ if __name__ == "__main__":
example["snapshot"] = "exp_setup4"
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'], action_space=action_space)
agent = GPT4v_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'],
action_space=action_space, exp="seeact")
# api_key = os.environ.get("GENAI_API_KEY")
# agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space)
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, "text", example_class, gpt4_model, example_id)
# example_trajectory_dir = os.path.join(root_trajectory_dir, "text", example_class, gemini_model, example_id)
example_trajectory_dir = os.path.join(root_trajectory_dir, "seeact", example_class, gpt4_model, example_id)
# example_trajectory_dir = os.path.join(root_trajectory_dir, "seeact", example_class, gemini_model, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)

View File

@@ -0,0 +1,135 @@
import datetime
import json
import logging
import os
import sys
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4v_agent import GPT4v_Agent
# Logger Configs {{{ #
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8")
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
# }}} Logger Configs #
logger = logging.getLogger("desktopenv.experiment")
PATH_TO_VM = r"C:\Users\tianbaox\Documents\Virtual Machines\Ubuntu\Ubuntu.vmx"
def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_trajectory", recording=True):
trajectory_recording_path = os.path.join(example_trajectory_dir, "trajectory.json")
env = DesktopEnv(
path_to_vm=PATH_TO_VM,
action_space=agent.action_space,
task_config=example
)
# reset the environment to the specified snapshot
observation = env.reset()
done = False
step_num = 0
if recording:
# send a request to the server to start recording
env.controller.start_recording()
while not done and step_num < max_steps:
actions = agent.predict(observation)
step_num += 1
for action in actions:
# Capture the timestamp before executing the action
action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
logger.info("Step %d: %s", step_num, action)
observation, reward, done, info = env.step(action)
logger.info("Reward: %.2f", reward)
logger.info("Done: %s", done)
logger.info("Info: %s", info)
# Save screenshot and trajectory information
with open(os.path.join(example_trajectory_dir, f"step_{step_num}_{action_timestamp}.png"), "wb") as _f:
with open(observation['screenshot'], "rb") as __f:
screenshot = __f.read()
_f.write(screenshot)
with open(trajectory_recording_path, "a") as f:
f.write(json.dumps({
"step_num": step_num,
"action_timestamp": action_timestamp,
"action": action,
"reward": reward,
"done": done,
"info": info,
"screenshot_file": f"step_{step_num}_{action_timestamp}.png"
}))
f.write("\n")
if done:
logger.info("The episode is done.")
break
if recording:
# send a request to the server to stop recording
env.controller.end_recording(os.path.join(example_trajectory_dir, "recording.mp4"))
result = env.evaluate()
logger.info("Result: %.2f", result)
# env.close()
logger.info("Environment closed.")
if __name__ == "__main__":
action_space = "pyautogui"
example_class = "chrome"
example_id = "7b6c7e24-c58a-49fc-a5bb-d57b80e5b4c3"
gpt4_model = "gpt-4-vision-preview"
gemini_model = "gemini-pro-vision"
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
example = json.load(f)
example["snapshot"] = "exp_setup4"
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4v_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'],
action_space=action_space, exp="som")
# api_key = os.environ.get("GENAI_API_KEY")
# agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space)
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, "som", example_class, gpt4_model, example_id)
# example_trajectory_dir = os.path.join(root_trajectory_dir, "som", example_class, gemini_model, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)
run_one_example(example, agent, 15, example_trajectory_dir)

View File

@@ -60,19 +60,19 @@ def draw_bounding_boxes(nodes, image_file_path, output_image_file_path):
image = Image.open(image_file_path)
draw = ImageDraw.Draw(image)
marks = []
drew_nodes = []
# todo: change the image tagger to align with SoM paper
# Optional: Load a font. If you don't specify a font, a default one will be used.
try:
# Adjust the path to the font file you have or use a default one
font = ImageFont.truetype("arial.ttf", 20)
font = ImageFont.truetype("arial.ttf", 15)
except IOError:
# Fallback to a basic font if the specified font can't be loaded
font = ImageFont.load_default()
index = 1
# Loop over all the visible nodes and draw their bounding boxes
for index, _node in enumerate(nodes):
for _node in nodes:
coords_str = _node.attrib.get('{uri:deskat:component.at-spi.gnome.org}screencoord')
size_str = _node.attrib.get('{uri:deskat:component.at-spi.gnome.org}size')
@@ -93,22 +93,30 @@ def draw_bounding_boxes(nodes, image_file_path, output_image_file_path):
if bottom_right[0] < coords[0] or bottom_right[1] < coords[1]:
raise ValueError(f"Invalid coordinates or size, coords: {coords}, size: {size}")
# Draw rectangle on image
draw.rectangle([coords, bottom_right], outline="red", width=2)
# Check if the area only contains one color
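            # (a crop that is a single flat color usually means the node is not actually visible, so it is left untagged)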
cropped_image = image.crop((*coords, *bottom_right))
if len(set(list(cropped_image.getdata()))) == 1:
continue
# Draw index number at the bottom left of the bounding box
# Draw rectangle on image
draw.rectangle([coords, bottom_right], outline="red", width=1)
# Draw index number at the bottom left of the bounding box with black background
text_position = (coords[0], bottom_right[1]) # Adjust Y to be above the bottom right
draw.text(text_position, str(index), font=font, fill="purple")
draw.rectangle([text_position, (text_position[0] + 25, text_position[1] + 18)], fill='black')
draw.text(text_position, str(index), font=font, fill="white")
index += 1
# each mark is an x, y, w, h tuple
marks.append([coords[0], coords[1], size[0], size[1]])
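            # keep the node alongside its mark so tag numbers can later be mapped back to accessibility nodes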
drew_nodes.append(_node)
except ValueError as e:
pass
# Save the result
image.save(output_image_file_path)
return marks
return marks, drew_nodes
def print_nodes_with_indent(nodes, indent=0):
@@ -120,6 +128,10 @@ def print_nodes_with_indent(nodes, indent=0):
if __name__ == '__main__':
with open('chrome_desktop_example_1.xml', 'r', encoding='utf-8') as f:
xml_file_str = f.read()
filtered_nodes = filter_nodes(find_leaf_nodes(xml_file_str))
print(len(filtered_nodes))
masks, drew_nodes = draw_bounding_boxes(filtered_nodes, 'screenshot.png',
'chrome_desktop_example_1_tagged_remove.png', )
nodes = ET.fromstring(xml_file_str)
print_nodes_with_indent(nodes)
# print(masks)
print(len(masks))

View File

@@ -1,3 +1,5 @@
# todo: needs to be refactored
import time
from typing import Dict, List

View File

@@ -1,3 +1,5 @@
# todo: needs to be refactored
import time
from typing import Dict, List

View File

@@ -2,7 +2,6 @@ import base64
import json
import os
import re
import time
import uuid
from typing import Dict, List
@@ -54,9 +53,9 @@ def tag_screenshot(screenshot, accessibility_tree):
tagged_screenshot_file_path = os.path.join("tmp/images", uuid_str + ".png")
nodes = filter_nodes(find_leaf_nodes(accessibility_tree))
# Render the tagged screenshot
marks = draw_bounding_boxes(nodes, screenshot, tagged_screenshot_file_path)
marks, drew_nodes = draw_bounding_boxes(nodes, screenshot, tagged_screenshot_file_path)
return marks, tagged_screenshot_file_path
return marks, drew_nodes, tagged_screenshot_file_path
def parse_actions_from_string(input_string):
@@ -123,11 +122,18 @@ def parse_code_from_string(input_string):
def parse_code_from_som_string(input_string, masks):
# parse the output string by masks
mappings = []
for i, mask in enumerate(masks):
x, y, w, h = mask
input_string = input_string.replace("tag#" + str(i), "{}, {}".format(int(x + w // 2), int(y + h // 2)))
mappings.append(("tag#" + str(i + 1), "{}, {}".format(int(x + w // 2), int(y + h // 2))))
return parse_code_from_string(input_string)
# apply the mappings from the highest tag number down, so replacing tag#1 cannot clobber the prefix of a longer tag like tag#12
for mapping in mappings[::-1]:
input_string = input_string.replace(mapping[0], mapping[1])
actions = parse_code_from_string(input_string)
return actions
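To make the tag#N handling concrete, here is a minimal, self-contained sketch of the substitution (the mask values below are made up; parse_code_from_string itself is not reproduced):
```python
# Each mask is (x, y, w, h); tags are numbered from 1, matching draw_bounding_boxes.
masks = [(100, 200, 50, 20), (400, 80, 120, 40)]  # hypothetical boxes

code = "pyautogui.click(tag#2)\npyautogui.moveTo(tag#1)"
mappings = [("tag#{}".format(i + 1), "{}, {}".format(x + w // 2, y + h // 2))
            for i, (x, y, w, h) in enumerate(masks)]

# Higher-numbered tags are replaced first, for the same reason as in the code above.
for tag, center in reversed(mappings):
    code = code.replace(tag, center)

print(code)
# pyautogui.click(460, 100)
# pyautogui.moveTo(125, 210)
```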
class GPT4v_Agent:
@@ -136,7 +142,7 @@ class GPT4v_Agent:
api_key,
instruction,
model="gpt-4-vision-preview",
max_tokens=300,
max_tokens=500,
action_space="computer_13",
exp="screenshot_a11y_tree"
# exp can be in ["screenshot", "a11y_tree", "screenshot_a11y_tree", "som", "seeact"]
@@ -147,6 +153,7 @@ class GPT4v_Agent:
self.max_tokens = max_tokens
self.action_space = action_space
self.exp = exp
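        # only the most recent max_trajectory_length observation/action pairs are replayed into the prompt (see predict below)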
self.max_trajectory_length = 3
self.headers = {
"Content-Type": "application/json",
@@ -194,8 +201,8 @@ class GPT4v_Agent:
else:
raise ValueError("Invalid experiment type: " + exp)
self.system_message = (self.system_message +
"\nHere is the instruction for the task: {}".format(self.instruction))
self.system_message = self.system_message + "\nYou are asked to complete the following task: {}".format(
self.instruction)
def predict(self, obs: Dict) -> List:
"""
@@ -204,28 +211,132 @@ class GPT4v_Agent:
# Prepare the payload for the API call
messages = []
if len(self.actions) > 0:
system_message = self.system_message + "\nHere are the actions you have done so far:\n" + "\n->\n".join(
self.actions)
else:
system_message = self.system_message
masks = None
messages.append({
"role": "system",
"content": [
{
"type": "text",
"text": system_message
"text": self.system_message
},
]
})
masks = None
# Append trajectory
assert len(self.observations) == len(self.actions), "The number of observations and actions should be the same."
if len(self.observations) > self.max_trajectory_length:
_observations = self.observations[-self.max_trajectory_length:]
_actions = self.actions[-self.max_trajectory_length:]
else:
_observations = self.observations
_actions = self.actions
for previous_obs, previous_action in zip(_observations, _actions):
if self.exp == "both":
_screenshot = previous_obs["screenshot"]
_linearized_accessibility_tree = previous_obs["accessibility_tree"]
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": "Given the screenshot and info from accessibility tree as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
_linearized_accessibility_tree)
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{_screenshot}",
"detail": "high"
}
}
]
})
elif self.exp in ["som", "seeact"]:
_screenshot = previous_obs["screenshot"]
_linearized_accessibility_tree = previous_obs["accessibility_tree"]
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": "Given the tagged screenshot and info from accessibility tree as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
_linearized_accessibility_tree)
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{_screenshot}",
"detail": "high"
}
}
]
})
elif self.exp == "screenshot":
_screenshot = previous_obs["screenshot"]
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": "Given the screenshot as below. What's the next step that you will do to help with the task?"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{_screenshot}",
"detail": "high"
}
}
]
})
elif self.exp == "a11y_tree":
_linearized_accessibility_tree = previous_obs["accessibility_tree"]
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": "Given the info from accessibility tree as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
_linearized_accessibility_tree)
}
]
})
else:
raise ValueError("Invalid experiment type: " + self.exp)
messages.append({
"role": "assistant",
"content": [
{
"type": "text",
"text": "\n".join(previous_action) if len(previous_action) > 0 else "No valid action"
},
]
})
if self.exp in ["screenshot", "both"]:
base64_image = encode_image(obs["screenshot"])
linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"])
if self.exp == "both":
self.observations.append({
"screenshot": base64_image,
"accessibility_tree": linearized_accessibility_tree
})
else:
self.observations.append({
"screenshot": base64_image,
"accessibility_tree": None
})
messages.append({
"role": "user",
"content": [
@@ -247,6 +358,12 @@ class GPT4v_Agent:
})
elif self.exp == "a11y_tree":
linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"])
self.observations.append({
"screenshot": None,
"accessibility_tree": linearized_accessibility_tree
})
messages.append({
"role": "user",
"content": [
@@ -259,17 +376,21 @@ class GPT4v_Agent:
})
elif self.exp == "som":
# Add som to the screenshot
masks, tagged_screenshot = tag_screenshot(obs["screenshot"], obs["accessibility_tree"])
masks, drew_nodes, tagged_screenshot = tag_screenshot(obs["screenshot"], obs["accessibility_tree"])
base64_image = encode_image(tagged_screenshot)
linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"])
self.observations.append({
"screenshot": base64_image,
"accessibility_tree": linearized_accessibility_tree
})
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": "Given the info from the tagged screenshot as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
"text": "Given the tagged screenshot and info from accessibility tree as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
linearized_accessibility_tree)
},
{
@@ -283,11 +404,15 @@ class GPT4v_Agent:
})
elif self.exp == "seeact":
# Add som to the screenshot
masks, tagged_screenshot = tag_screenshot(obs["screenshot"], obs["accessibility_tree"])
masks, drew_nodes, tagged_screenshot = tag_screenshot(obs["screenshot"], obs["accessibility_tree"])
base64_image = encode_image(tagged_screenshot)
linearized_accessibility_tree = linearize_accessibility_tree(accessibility_tree=obs["accessibility_tree"])
self.observations.append({
"screenshot": base64_image,
"accessibility_tree": linearized_accessibility_tree
})
messages.append({
"role": "user",
"content": [
@@ -307,12 +432,17 @@ class GPT4v_Agent:
else:
raise ValueError("Invalid experiment type: " + self.exp)
with open("messages.json", "w") as f:
f.write(json.dumps(messages, indent=4))
response = self.call_llm({
"model": self.model,
"messages": messages,
"max_tokens": self.max_tokens
})
print(response)
if self.exp == "seeact":
messages.append({
"role": "assistant",
@@ -340,6 +470,7 @@ class GPT4v_Agent:
"messages": messages,
"max_tokens": self.max_tokens
})
print(response)
try:
actions = self.parse_actions(response, masks)
@@ -354,20 +485,17 @@ class GPT4v_Agent:
(APIError, RateLimitError, APIConnectionError, ServiceUnavailableError, InvalidRequestError),
)
def call_llm(self, payload):
while True:
try:
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=self.headers,
json=payload
)
break
except:
print("Failed to generate response, retrying...")
time.sleep(5)
pass
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=self.headers,
json=payload
)
return response.json()['choices'][0]['message']['content']
if response.status_code != 200:
print("Failed to call LLM: " + response.text)
return ""
else:
return response.json()['choices'][0]['message']['content']
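The hand-rolled while/try retry loop removed above is superseded by a backoff-style decorator on call_llm (the exception tuple is visible just before the method). As a rough, standalone illustration of that pattern, with the retried exception and max_tries chosen here as assumptions rather than taken from the diff:
```python
import backoff
import requests

@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_tries=5)
def call_llm(payload, headers):
    # Retries with exponential backoff on transient network errors, then
    # returns an empty string on a non-200 response, mirroring the diff above.
    response = requests.post("https://api.openai.com/v1/chat/completions",
                             headers=headers, json=payload)
    if response.status_code != 200:
        print("Failed to call LLM: " + response.text)
        return ""
    return response.json()["choices"][0]["message"]["content"]
```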
def parse_actions(self, response: str, masks=None):

View File

@@ -3,7 +3,7 @@ You are an agent which follow my instruction and perform desktop computer tasks
You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard.
For each step, you will get an observation of an image, which is the screenshot of the computer screen and you will predict the action of the computer based on the image.
You are required to use `pyautogui` to perform the action.
You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with.
Return one line or multiple lines of python code to perform the action each time, be time efficient.
You ONLY need to return the code inside a code block, like this:
```python
@@ -14,7 +14,7 @@ When you think you have to wait for some time, return ```WAIT```;
When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task;
When you think the task is done, return ```DONE```.
First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
""".strip()
SYS_PROMPT_IN_SCREENSHOT_OUT_ACTION = """
@@ -267,7 +267,7 @@ You are an agent which follow my instruction and perform desktop computer tasks
You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard.
For each step, you will get an observation of the desktop by accessibility tree, which is based on AT-SPI library. And you will predict the action of the computer based on the accessibility tree.
You are required to use `pyautogui` to perform the action.
You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with.
Return one line or multiple lines of python code to perform the action each time, be time efficient.
You ONLY need to return the code inside a code block, like this:
```python
@@ -278,7 +278,7 @@ When you think you have to wait for some time, return ```WAIT```;
When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task;
When you think the task is done, return ```DONE```.
First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
""".strip()
SYS_PROMPT_IN_A11Y_OUT_ACTION = """
@@ -532,7 +532,7 @@ You have good knowledge of computer and good internet connection and assume your
For each step, you will get an observation of the desktop by 1) a screenshot; and 2) accessibility tree, which is based on AT-SPI library.
And you will predict the action of the computer based on the screenshot and accessibility tree.
You are required to use `pyautogui` to perform the action.
You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with.
Return one line or multiple lines of python code to perform the action each time, be time efficient.
You ONLY need to return the code inside a code block, like this:
```python
@@ -543,7 +543,7 @@ When you think you have to wait for some time, return ```WAIT```;
When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task;
When you think the task is done, return ```DONE```.
First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
""".strip()
SYS_PROMPT_IN_BOTH_OUT_ACTION = """
@@ -797,12 +797,15 @@ You are an agent which follow my instruction and perform desktop computer tasks
You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard.
For each step, you will get an observation of the desktop by 1) a screenshot; and 2) accessibility tree, which is based on AT-SPI library.
You are required to use `pyautogui` to perform the action. But replace x, y in the code with the tag of the element you want to operate with. such as:
You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with.
You can replace x, y in the code with the tag of the element you want to operate with. such as:
```python
pyautogui.moveTo(tag#3)
pyautogui.click(tag#2)
pyautogui.dragTo(tag#1, button='left')
```
When you think you can directly output precise x and y coordinates or there is no tag on which you want to interact, you can also use them directly.
But you should be careful to ensure that the coordinates are correct.
Return one line or multiple lines of python code to perform the action each time, be time efficient.
You ONLY need to return the code inside a code block, like this:
```python
@@ -813,7 +816,7 @@ When you think you have to wait for some time, return ```WAIT```;
When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task;
When you think the task is done, return ```DONE```.
First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
""".strip()
SYS_PROMPT_SEEACT = """
@@ -842,12 +845,15 @@ Then, based on your analysis, in conjunction with human desktop using habits and
"""
ACTION_GROUNDING_PROMPT_SEEACT = """
You are required to use `pyautogui` to perform the action. But replace x, y in the code with the tag of the element you want to operate with. such as:
You are required to use `pyautogui` to perform the action, but don't use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with.
You can replace x, y in the code with the tag of the element you want to operate with. such as:
```python
pyautogui.moveTo(tag#3)
pyautogui.click(tag#2)
pyautogui.dragTo(tag#1, button='left')
```
When you think you can directly output precise x and y coordinates or there is no tag on which you want to interact, you can also use them directly.
But you should be careful to ensure that the coordinates are correct.
Return one line or multiple lines of python code to perform the action each time, be time efficient.
You ONLY need to return the code inside a code block, like this:
```python
@@ -858,5 +864,5 @@ When you think you have to wait for some time, return ```WAIT```;
When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task;
When you think the task is done, return ```DONE```.
First give the current screenshot and previous things we did a reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
"""

File diff suppressed because it is too large