Merge branch 'main' into zdy

David Chang
2024-01-16 22:48:35 +08:00
28 changed files with 1196 additions and 168 deletions

View File

@@ -23,4 +23,11 @@ todo
- [x] Error handling during file passing and file opening, etc.
- [x] Add accessibility tree from the OS into the observation space
- [ ] Add pre-process and post-process action support for benchmarking setup and evaluation
- [ ] Multiprocess support; this can make reinforcement learning more efficient
## Roadmap of benchmark, tools and resources (proposed)
- [ ] Improve the annotation tool based on DuckTrack; make it more robust and align it with the accessibility tree
- [ ] Annotate the steps taken to complete each task
- [ ] Build a website for the project
- [ ] Crawl all the resources we explored on the internet and make them easy to access
- [ ] Set up ways for the community to contribute new examples

View File

@@ -83,6 +83,8 @@ class PythonController:
"""
Executes an action on the server computer.
"""
if action in ['WAIT', 'FAIL', 'DONE']:
return
action_type = action["action_type"]
parameters = action["parameters"] if "parameters" in action else {}

View File

@@ -204,7 +204,10 @@ class DesktopEnv(gym.Env):
time.sleep(5)
logger.info("Environment setup complete.")
observation = {"screenshot": self._get_obs()}
observation = {
"screenshot": self._get_obs(),
"accessibility_tree": self.controller.get_accessibility_tree(),
}
return observation
def step(self, action, pause=0.5):
@@ -231,8 +234,11 @@ class DesktopEnv(gym.Env):
# the set of all possible actions defined in the action representation
self.controller.execute_action(action)
elif self.action_space == "pyautogui":
# the set of all possible python commands inside `pyautogui`
self.controller.execute_python_command(action)
if action in ['WAIT', 'FAIL', 'DONE']:
self.controller.execute_action(action)
else:
# the set of all possible python commands inside `pyautogui`
self.controller.execute_python_command(action)
observation = {
"screenshot": self._get_obs(),

View File

@@ -159,9 +159,19 @@ def get_open_tabs_info(env, config: Dict[str, str]):
tabs_info = []
for context in browser.contexts:
for page in context.pages:
title = page.title()
url = page.url
tabs_info.append({'title': title, 'url': url})
try:
# Wait for the page to finish loading, this prevents the "execution context was destroyed" issue
page.wait_for_load_state('load') # Wait for the 'load' event to complete
title = page.title()
url = page.url
tabs_info.append({'title': title, 'url': url})
except TimeoutError:
# If page loading times out, catch the exception and store the current information in the list
tabs_info.append({'title': 'Load timeout', 'url': page.url})
except Exception as e:
# Catch other potential exceptions that might occur while reading the page title
print(f'Error: {e}')
tabs_info.append({'title': 'Error encountered', 'url': page.url})
browser.close()
return tabs_info
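For reference, a minimal standalone sketch of the same wait-then-fallback pattern, assuming `browser` is an already-connected Playwright `Browser` (how it is obtained is outside this hunk); note that Playwright's timeout exception can be imported from `playwright.sync_api`.

```python
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

def collect_tabs_info(browser):
    # Assumption: `browser` is a connected playwright.sync_api.Browser instance.
    tabs_info = []
    for context in browser.contexts:
        for page in context.pages:
            try:
                # Waiting for the 'load' event avoids reading the title while the
                # page's execution context is being torn down during navigation.
                page.wait_for_load_state('load')
                tabs_info.append({'title': page.title(), 'url': page.url})
            except PlaywrightTimeoutError:
                # Fall back to the URL alone if the page never finishes loading.
                tabs_info.append({'title': 'Load timeout', 'url': page.url})
    return tabs_info
```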

View File

@@ -14,4 +14,4 @@ from .gimp import increase_saturation, decrease_brightness, check_file_exists, c
from .general import check_csv, check_accessibility_tree, check_list, run_sqlite3, check_json
from .thunderbird import check_thunderbird_prefs, check_thunderbird_filter
from .vscode import compare_text_file, compare_config, compare_answer, is_extension_installed
from .impress import check_slide_numbers_color, compare_pptx_files, check_for_two_lines
from .impress import check_slide_numbers_color, compare_pptx_files, check_for_two_lines, check_for_audio, check_formula_shape, check_file_exists

View File

@@ -4,7 +4,7 @@ import functools
import operator
import re
from numbers import Number
from typing import Callable, Any
from typing import Callable, Any, Union
from typing import Dict, List, Pattern
import lxml.etree

View File

@@ -71,6 +71,29 @@ def check_file_exists(directory, filename):
file_path = os.path.join(directory, filename)
return 1 if os.path.isfile(file_path) else 0
def has_audio_on_page(slide):
for shape in slide.shapes:
if shape.shape_type == 13:
return True
return False
def check_for_audio(prs):
prs = Presentation(prs)
for i, slide in enumerate(prs.slides):
if has_audio_on_page(slide):
return 1
return 0
def check_formula_shape(prs):
prs = Presentation(prs)
slide = prs.slides[13]
for shape in slide.shapes:
if shape.has_text_frame and shape.shape_type == 1:
return 1
return 0
if __name__ == "__main__":
path1 = "../../任务数据/LibreOffice Impress/Change_Color_Slide_Number_gold_textbox.pptx"
presentation = Presentation(path1)
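A hedged usage sketch of the two new checkers; the file name below is a placeholder, and both functions accept any path that python-pptx's `Presentation` can open.

```python
# Illustrative only: run the new Impress checkers against a local copy of the deck.
deck_path = "Ch5.pptx"  # placeholder path
print(check_for_audio(deck_path))      # 1 if any slide contains a shape with shape_type 13
print(check_formula_shape(deck_path))  # 1 if slide 14 (index 13) has a text-bearing shape of type 1
```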

View File

@@ -0,0 +1,37 @@
{
"id": "39478d4a-1049-456f-aa77-407811393add",
"snapshot": "libreoffice_impress",
"instruction": "Could you help me add a hat symbol to \"Y\" in text in page 14? I need to represent vector sign for my statistics class.",
"source": "https://www.reddit.com/r/libreoffice/comments/jul3o8/putting_cap_or_hat_or_carat_symbol_in_libre/",
"config": [
{
"type": "download",
"parameters": {
"files": [
{
"url": "https://drive.usercontent.google.com/download?id=1WT1-L0iiIlF2kuIK77IDxTfBaQ0X0BbX&export=download&authuser=0&confirm=t&uuid=0b69767e-1f3e-49ce-88a7-1036ef25bcaf&at=APZUnTXZ_sqEZUrHNx1edWep017b:1705337750065",
"path": "Desktop/Ch5.pptx"
}
]
}
},
{
"type": "open",
"parameters": {
"path": "Desktop/Ch5.pptx"
}
}
],
"trajectory": "trajectories/",
"related_apps": [
"libreoffice_impress"
],
"evaluator": {
"func": "check_formula_shape",
"result": {
"type": "vm_file",
"path": "Desktop/Ch5.pptx",
"dest": "Ch5.pptx"
}
}
}

View File

@@ -1,12 +1,42 @@
{
"id": "9ec204e4-f0a3-42f8-8458-b772a6797cab",
"snapshot": "libreoffice_impress",
"instruction": "Could you help me copy and paste the first 3 slides?",
"instruction": "Make a duplicate of the last two slides for me, please.",
"source": "https://www.tiktok.com/@lil.d1rt_/video/7247574148887629083",
"config": [],
"config": [
{
"type": "download",
"parameters": {
"files": [
{
"url": "https://drive.usercontent.google.com/download?id=1ad5vUXasdN2MypNap-pBUmgPg5FaxmDA&export=download&authuser=0&confirm=t&uuid=9dc069bb-edd3-4ae9-b356-4c6543778584&at=APZUnTXbgLHjv1MhMFy1IfZL3fQI:1705338128337",
"path": "Desktop/MLA_Workshop_061X_Works_Cited.pptx"
}
]
}
},
{
"type": "open",
"parameters": {
"path": "Desktop/MLA_Workshop_061X_Works_Cited.pptx"
}
}
],
"trajectory": "trajectories/",
"related_apps": [
""
"libreoffice_impress"
],
"evaluator": "evaluation_dir"
}
"evaluator": {
"func": "compare_pptx_files",
"expected": {
"type": "cloud_file",
"path": "https://drive.usercontent.google.com/download?id=1otbzscpOZ0tCXMvsMC0MmNWUC7Pv71of&export=download&authuser=0&confirm=t&uuid=faa0b0c1-6b14-4bce-a1fd-ccf824ee1e60&at=APZUnTXw6TlBOlrPPZ2OhfGnNPf0:1705338135842",
"dest": "MLA_Workshop_061X_Works_Cited_Gold.docx"
},
"result": {
"type": "vm_file",
"path": "Desktop/MLA_Workshop_061X_Works_Cited.pptx",
"dest": "MLA_Workshop_061X_Works_Cited.pptx"
}
}
}

View File

@@ -1,12 +1,42 @@
{
"id": "af23762e-2bfd-4a1d-aada-20fa8de9ce07",
"snapshot": "libreoffice_impress",
"instruction": "Please make a summary slide of the whole presentation for me",
"instruction": "I am making PPT on LibreOffice Impress for presentation tomorrow. I need to summarize contents on one slide. Could you make a summary slide for me?",
"source": "https://superuser.com/questions/1059080/how-to-make-a-summary-slide-in-impress-listing-the-titles-of-all-slides-autom",
"config": [],
"config": [
{
"type": "download",
"parameters": {
"files": [
{
"url": "https://drive.usercontent.google.com/download?id=1zmtomIzSgSjnYZbhgtH4n90L5mV9bS7L&export=download&authuser=0&confirm=t&uuid=1966ab03-1e17-447a-aeb4-71a753eca196&at=APZUnTVoWHV6z8LJi5VHADuQE6VG:1705319233167",
"path": "Desktop/Forests.pptx"
}
]
}
},
{
"type": "open",
"parameters": {
"path": "Desktop/Forests.pptx"
}
}
],
"trajectory": "trajectories/",
"related_apps": [
""
"libreoffice_impress"
],
"evaluator": "evaluation_dir"
}
"evaluator": {
"func": "compare_pptx_files",
"expected": {
"type": "cloud_file",
"path": "https://drive.usercontent.google.com/download?id=1nRwmFgYdskv3EiriZZFoT8TzM9CsG5B0&export=download&authuser=0&confirm=t&uuid=f2f919df-2867-4bc3-8bb9-dabd51108ebb&at=APZUnTWzw9LJWWXvH0cvdaWL-Ij-:1705319339474",
"dest": "Forests_Gold.docx"
},
"result": {
"type": "vm_file",
"path": "Desktop/Forests.pptx",
"dest": "Forests.pptx"
}
}
}

View File

@@ -1,12 +1,48 @@
{
"id": "c59742c0-4323-4b9d-8a02-723c251deaa0",
"snapshot": "libreoffice_impress",
"instruction": "Could you help me add video into the presentation file?",
"instruction": "I am making PPT about the history of baseball. I want to add an introduction audio named \"Baseball.mp3\" on the Desktop into my PPT, but I do not know how. Could you help me add audio into my presentation file?",
"source": "https://www.reddit.com/r/libreoffice/comments/17lcdrp/audio_not_supported_in_libreoffice_impress/",
"config": [],
"config": [
{
"type": "download",
"parameters": {
"files": [
{
"url": "https://drive.usercontent.google.com/download?id=1Oy5Zga6PnvpIwJ1OHMdFf3mSbm_YClHh&export=download&authuser=0&confirm=t&uuid=da1db839-da27-4bb5-a4fc-0358342f493b&at=APZUnTVsLyGsj8qI1rPyGAUTCX4F:1705324246149",
"path": "Desktop/Mady_and_Mia_Baseball.pptx"
}
]
}
},
{
"type": "download",
"parameters": {
"files": [
{
"url": "https://drive.usercontent.google.com/download?id=1a-DaT1LUuvh55GsjpJkhpjcDTh8CPzkl&export=download&authuser=0&confirm=t&uuid=2bfb7700-5222-47ad-9aee-e5c22b50dbfe&at=APZUnTW7RQR5HI9giEl-cirnSX3q:1705320977391",
"path": "Desktop/Baseball.mp3"
}
]
}
},
{
"type": "open",
"parameters": {
"path": "Desktop/Mady_and_Mia_Baseball.pptx"
}
}
],
"trajectory": "trajectories/",
"related_apps": [
""
"libreoffice_impress"
],
"evaluator": "evaluation_dir"
}
"evaluator": {
"func": "check_for_audio",
"result": {
"type": "vm_file",
"path": "Desktop/Mady_and_Mia_Baseball.pptx",
"dest": "Mady_and_Mia_Baseball.pptx"
}
}
}

View File

@@ -6,6 +6,7 @@ import sys
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4v_agent import GPT4v_Agent
from mm_agents.gemini_pro_agent import GeminiPro_Agent
# Logger Configs {{{ #
logger = logging.getLogger()
@@ -44,7 +45,7 @@ logger = logging.getLogger("desktopenv.experiment")
PATH_TO_VM = r"C:\Users\tianbaox\Documents\Virtual Machines\Ubuntu\Ubuntu.vmx"
def run_one_example(example, agent, max_steps=2, example_trajectory_dir="exp_trajectory", recording=True):
def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_trajectory", recording=True):
trajectory_recording_path = os.path.join(example_trajectory_dir, "trajectory.json")
env = DesktopEnv(
path_to_vm=PATH_TO_VM,
@@ -53,7 +54,6 @@ def run_one_example(example, agent, max_steps=2, example_trajectory_dir="exp_tra
)
# reset the environment to certain snapshot
observation = env.reset()
observation['instruction'] = example['instruction']
done = False
step_num = 0
@@ -63,17 +63,14 @@ def run_one_example(example, agent, max_steps=2, example_trajectory_dir="exp_tra
while not done and step_num < max_steps:
actions = agent.predict(observation)
step_num += 1
for action in actions:
step_num += 1
# Capture the timestamp before executing the action
action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
logger.info("Step %d: %s", step_num, action)
observation, reward, done, info = env.step(action)
observation['instruction'] = example['instruction']
# Logging
logger.info("Step %d: %s", step_num, action)
logger.info("Reward: %.2f", reward)
logger.info("Done: %s", done)
logger.info("Info: %s", info)
@@ -114,19 +111,22 @@ def run_one_example(example, agent, max_steps=2, example_trajectory_dir="exp_tra
if __name__ == "__main__":
action_space = "pyautogui"
example_class = "vlc"
example_id = "8f080098-ddb1-424c-b438-4e96e5e4786e"
example_class = "thunderbird"
example_id = "bb5e4c0d-f964-439c-97b6-bdb9747de3f4"
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
example = json.load(f)
example["snapshot"] = "exp_setup"
example["snapshot"] = "exp_setup2"
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4v_Agent(api_key=api_key, action_space=action_space)
# api_key = os.environ.get("OPENAI_API_KEY")
# agent = GPT4v_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space)
api_key = os.environ.get("GENAI_API_KEY")
agent = GeminiPro_Agent(api_key=api_key, instruction=example['instruction'], action_space=action_space)
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, example_class, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)
run_one_example(example, agent, 2, example_trajectory_dir)
run_one_example(example, agent, 10, example_trajectory_dir)

experiment_pure_text.py Normal file (135 lines)
View File

@@ -0,0 +1,135 @@
import datetime
import json
import logging
import os
import sys
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4_agent import GPT4_Agent
from mm_agents.gemini_pro_agent import GeminiPro_Agent
# Logger Configs {{{ #
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8")
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
# }}} Logger Configs #
logger = logging.getLogger("desktopenv.experiment")
PATH_TO_VM = r"C:\Users\tianbaox\Documents\Virtual Machines\Ubuntu\Ubuntu.vmx"
def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_trajectory", recording=True):
trajectory_recording_path = os.path.join(example_trajectory_dir, "trajectory.json")
env = DesktopEnv(
path_to_vm=PATH_TO_VM,
action_space=agent.action_space,
task_config=example
)
# reset the environment to certain snapshot
observation = env.reset()
done = False
step_num = 0
if recording:
# send a request to the server to start recording
env.controller.start_recording()
while not done and step_num < max_steps:
actions = agent.predict(observation)
step_num += 1
for action in actions:
# Capture the timestamp before executing the action
action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
logger.info("Step %d: %s", step_num, action)
observation, reward, done, info = env.step(action)
logger.info("Reward: %.2f", reward)
logger.info("Done: %s", done)
logger.info("Info: %s", info)
# Save screenshot and trajectory information
with open(os.path.join(example_trajectory_dir, f"step_{step_num}_{action_timestamp}.png"), "wb") as _f:
with open(observation['screenshot'], "rb") as __f:
screenshot = __f.read()
_f.write(screenshot)
with open(trajectory_recording_path, "a") as f:
f.write(json.dumps({
"step_num": step_num,
"action_timestamp": action_timestamp,
"action": action,
"reward": reward,
"done": done,
"info": info,
"screenshot_file": f"step_{step_num}_{action_timestamp}.png"
}))
f.write("\n")
if done:
logger.info("The episode is done.")
break
if recording:
# send a request to the server to stop recording
env.controller.end_recording(os.path.join(example_trajectory_dir, "recording.mp4"))
result = env.evaluate()
logger.info("Result: %.2f", result)
# env.close()
logger.info("Environment closed.")
if __name__ == "__main__":
action_space = "pyautogui"
example_class = "chrome"
example_id = "7a5a7856-f1b6-42a4-ade9-1ca81ca0f263"
gpt4_model = "gpt-4-1106-preview"
gemini_model = "gemini-pro-vision"
with open(f"evaluation_examples/examples/{example_class}/{example_id}.json", "r") as f:
example = json.load(f)
example["snapshot"] = "exp_setup4"
api_key = os.environ.get("OPENAI_API_KEY")
agent = GPT4_Agent(api_key=api_key, model=gpt4_model, instruction=example['instruction'], action_space=action_space)
# api_key = os.environ.get("GENAI_API_KEY")
# agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space)
root_trajectory_dir = "exp_trajectory"
example_trajectory_dir = os.path.join(root_trajectory_dir, "text", example_class, gpt4_model, example_id)
# example_trajectory_dir = os.path.join(root_trajectory_dir, "text", example_class, gemini_model, example_id)
os.makedirs(example_trajectory_dir, exist_ok=True)
run_one_example(example, agent, 15, example_trajectory_dir)

View File

@@ -0,0 +1,102 @@
import xml.etree.ElementTree as ET
from PIL import Image, ImageDraw, ImageFont
def find_leaf_nodes(xlm_file_str):
if not xlm_file_str:
return []
root = ET.fromstring(xlm_file_str)
# Recursive function to traverse the XML tree and collect leaf nodes
def collect_leaf_nodes(node, leaf_nodes):
# If the node has no children, it is a leaf node, add it to the list
if not list(node):
leaf_nodes.append(node)
# If the node has children, recurse on each child
for child in node:
collect_leaf_nodes(child, leaf_nodes)
# List to hold all leaf nodes
leaf_nodes = []
collect_leaf_nodes(root, leaf_nodes)
return leaf_nodes
def filter_nodes(nodes):
filtered_nodes = []
for node in nodes:
if not node.get('{uri:deskat:state.at-spi.gnome.org}visible', None) == 'true':
# Not visible
continue
# Check if the node is a 'panel'
if node.tag == 'panel':
# Check if the 'panel' represents an interactive element
# or if it has certain attributes that are of interest.
# Add your conditions here...
if node.get('{uri:deskat:state.at-spi.gnome.org}focusable', 'false') == 'true':
filtered_nodes.append(node)
elif node.tag == 'text':
continue
else:
coords = tuple(map(int, node.attrib.get('{uri:deskat:component.at-spi.gnome.org}screencoord').strip('()').split(', ')))
if coords[0] < 0 or coords[1] < 0:
continue
size = tuple(map(int, node.attrib.get('{uri:deskat:component.at-spi.gnome.org}size').strip('()').split(', ')))
if size[0] <= 0 or size[1] <= 0:
continue
# Node is not a 'panel', add to the list.
filtered_nodes.append(node)
return filtered_nodes
def draw_bounding_boxes(nodes, image_file_path, output_image_file_path):
# Load the screenshot image
image = Image.open(image_file_path)
draw = ImageDraw.Draw(image)
# Optional: Load a font. If you don't specify a font, a default one will be used.
try:
# Adjust the path to the font file you have or use a default one
font = ImageFont.truetype("arial.ttf", 20)
except IOError:
# Fallback to a basic font if the specified font can't be loaded
font = ImageFont.load_default()
# Loop over all the visible nodes and draw their bounding boxes
for index, _node in enumerate(nodes):
coords_str = _node.attrib.get('{uri:deskat:component.at-spi.gnome.org}screencoord')
size_str = _node.attrib.get('{uri:deskat:component.at-spi.gnome.org}size')
if coords_str and size_str:
try:
# Parse the coordinates and size from the strings
coords = tuple(map(int, coords_str.strip('()').split(', ')))
size = tuple(map(int, size_str.strip('()').split(', ')))
# Check for non-positive sizes
if size[0] <= 0 or size[1] <= 0:
raise ValueError(f"Size must be positive, got: {size}")
# Calculate the bottom-right corner of the bounding box
bottom_right = (coords[0] + size[0], coords[1] + size[1])
# Check that bottom_right > coords (x1 >= x0, y1 >= y0)
if bottom_right[0] < coords[0] or bottom_right[1] < coords[1]:
raise ValueError(f"Invalid coordinates or size, coords: {coords}, size: {size}")
# Draw rectangle on image
draw.rectangle([coords, bottom_right], outline="red", width=2)
# Draw index number at the bottom left of the bounding box
text_position = (coords[0], bottom_right[1]) # Adjust Y to be above the bottom right
draw.text(text_position, str(index), font=font, fill="purple")
except ValueError as e:
pass
# Save the result
image.save(output_image_file_path)
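A hedged end-to-end sketch of how the three helpers above chain together, assuming they are importable and that the file names below are placeholders for the accessibility-tree XML and screenshot returned by the environment.

```python
# Illustrative only: annotate a screenshot with the filtered accessibility-tree nodes.
with open("accessibility_tree.xml", "r", encoding="utf-8") as f:  # placeholder file
    tree_xml = f.read()

nodes = filter_nodes(find_leaf_nodes(tree_xml))
draw_bounding_boxes(nodes, "screenshot.png", "screenshot_annotated.png")
```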

View File

@@ -1,20 +0,0 @@
from transformers import FuyuProcessor, FuyuForCausalLM
from PIL import Image
image = Image.open("stackoverflow.png").convert("RGB")
# load model and processor
model_id = "adept/fuyu-8b"
processor = FuyuProcessor.from_pretrained(model_id)
model = FuyuForCausalLM.from_pretrained(model_id, device_map="cuda:0")
# prepare inputs for the model
text_prompt = "Description:\n"
inputs = processor(text=text_prompt, images=image, return_tensors="pt").to("cuda:0")
# autoregressively generate text
generation_output = model.generate(**inputs, max_new_tokens=100)
generation_text = processor.batch_decode(generation_output[:, -100:], skip_special_tokens=True)
print(generation_text)

View File

@@ -1,84 +0,0 @@
from typing import Dict
import PIL.Image
import google.generativeai as genai
from mm_agents.gpt_4v_agent import parse_actions_from_string, parse_code_from_string
from mm_agents.gpt_4v_prompt_action import SYS_PROMPT as SYS_PROMPT_ACTION
from mm_agents.gpt_4v_prompt_code import SYS_PROMPT as SYS_PROMPT_CODE
class GeminiPro_Agent:
def __init__(self, api_key, model='gemini-pro-vision', max_tokens=300, action_space="computer_13"):
genai.configure(api_key)
self.model = genai.GenerativeModel(model)
self.max_tokens = max_tokens
self.action_space = action_space
self.trajectory = [
{
"role": "system",
"parts": [
{
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space]
]
}
]
def predict(self, obs: Dict):
"""
Predict the next action(s) based on the current observation.
"""
img = PIL.Image.open(obs["screenshot"])
self.trajectory.append({
"role": "user",
"parts": ["To accomplish the task '{}' and given the current screenshot, what's the next step?".format(
obs["instruction"]), img]
})
traj_to_show = []
for i in range(len(self.trajectory)):
traj_to_show.append(self.trajectory[i]["parts"][0])
if len(self.trajectory[i]["parts"]) > 1:
traj_to_show.append("screenshot_obs")
print("Trajectory:", traj_to_show)
response = self.model.generate_content(self.trajectory, max_tokens=self.max_tokens)
try:
# fixme: change to fit the new response format from gemini pro
actions = self.parse_actions(response.json()['choices'][0]['message']['content'])
except:
# todo: add error handling
print("Failed to parse action from response:", response.json()['choices'][0]['message']['content'])
actions = None
return actions
def parse_actions(self, response: str):
# response example
"""
```json
{
"action_type": "CLICK",
"click_type": "RIGHT"
}
```
"""
# parse from the response
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
elif self.action_space == "pyautogui":
actions = parse_code_from_string(response)
# add action into the trajectory
self.trajectory.append({
"role": "assistant",
"parts": [response]
})
return actions

View File

@@ -0,0 +1,134 @@
import time
from typing import Dict, List
import google.generativeai as genai
from mm_agents.accessibility_tree_wrap.heuristic_retrieve import find_leaf_nodes, filter_nodes
from mm_agents.gpt_4_prompt_action import SYS_PROMPT as SYS_PROMPT_ACTION
from mm_agents.gpt_4_prompt_code import SYS_PROMPT as SYS_PROMPT_CODE
from mm_agents.gpt_4v_agent import parse_actions_from_string, parse_code_from_string
class GeminiPro_Agent:
def __init__(self, api_key, instruction, model='gemini-pro', max_tokens=300, temperature=0.0,
action_space="computer_13"):
genai.configure(api_key=api_key)
self.instruction = instruction
self.model = genai.GenerativeModel(model)
self.max_tokens = max_tokens
self.temperature = temperature
self.action_space = action_space
self.trajectory = [
{
"role": "system",
"parts": [
{
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space] + "\nHere is the instruction for the task: {}".format(self.instruction)
]
}
]
def predict(self, obs: Dict) -> List:
"""
Predict the next action(s) based on the current observation.
Only single-round conversation is supported; only the latest observation is filled in.
"""
accessibility_tree = obs["accessibility_tree"]
leaf_nodes = find_leaf_nodes(accessibility_tree)
filtered_nodes = filter_nodes(leaf_nodes)
linearized_accessibility_tree = "tag\ttext\tposition\tsize\n"
# Linearize the accessibility tree nodes into a table format
for node in filtered_nodes:
linearized_accessibility_tree += node.tag + "\t"
linearized_accessibility_tree += node.attrib.get('name') + "\t"
linearized_accessibility_tree += node.attrib.get(
'{uri:deskat:component.at-spi.gnome.org}screencoord') + "\t"
linearized_accessibility_tree += node.attrib.get('{uri:deskat:component.at-spi.gnome.org}size') + "\n"
self.trajectory.append({
"role": "user",
"parts": [
"Given the XML format of accessibility tree (convert and formatted into table) as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
linearized_accessibility_tree)]
})
# todo: Remove this step once Gemini supports multi-round conversations
all_message_str = ""
for i in range(len(self.trajectory)):
if i == 0:
all_message_template = "<|im_start|>system\n{}\n<|im_end|>\n"
elif i % 2 == 1:
all_message_template = "<|im_start|>user\n{}\n<|im_end|>\n"
else:
all_message_template = "<|im_start|>assistant\n{}\n<|im_end|>\n"
all_message_str += all_message_template.format(self.trajectory[i]["parts"][0])
print("All message: >>>>>>>>>>>>>>>> ")
print(
all_message_str
)
message_for_gemini = {
"role": "user",
"parts": [all_message_str]
}
traj_to_show = []
for i in range(len(self.trajectory)):
traj_to_show.append(self.trajectory[i]["parts"][0])
if len(self.trajectory[i]["parts"]) > 1:
traj_to_show.append("screenshot_obs")
print("Trajectory:", traj_to_show)
while True:
try:
response = self.model.generate_content(
message_for_gemini,
generation_config={
"max_output_tokens": self.max_tokens,
"temperature": self.temperature
}
)
break
except:
print("Failed to generate response, retrying...")
time.sleep(5)
pass
try:
response_text = response.text
except:
return []
try:
actions = self.parse_actions(response_text)
except:
print("Failed to parse action from response:", response_text)
actions = []
return actions
def parse_actions(self, response: str):
# parse from the response
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
elif self.action_space == "pyautogui":
actions = parse_code_from_string(response)
else:
raise ValueError("Invalid action space: " + self.action_space)
# add action into the trajectory
self.trajectory.append({
"role": "assistant",
"parts": [response]
})
return actions
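For reference, a made-up example of the tab-separated linearization the agent builds from the filtered accessibility-tree nodes; the attribute values below are invented purely for illustration.

```python
# Illustrative only: the linearized accessibility tree is a plain tab-separated
# table with one row per filtered node, for example:
linearized_accessibility_tree = (
    "tag\ttext\tposition\tsize\n"
    "push-button\tOpen\t(102, 38)\t(64, 24)\n"
    "menu-item\tFile\t(12, 8)\t(48, 20)\n"
)
```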

View File

@@ -0,0 +1,113 @@
import time
from typing import Dict, List
import PIL.Image
import google.generativeai as genai
from mm_agents.gpt_4v_agent import parse_actions_from_string, parse_code_from_string
from mm_agents.gpt_4v_prompt_action import SYS_PROMPT as SYS_PROMPT_ACTION
from mm_agents.gpt_4v_prompt_code import SYS_PROMPT as SYS_PROMPT_CODE
class GeminiProV_Agent:
def __init__(self, api_key, instruction, model='gemini-pro-vision', max_tokens=300, temperature=0.0,
action_space="computer_13"):
genai.configure(api_key=api_key)
self.instruction = instruction
self.model = genai.GenerativeModel(model)
self.max_tokens = max_tokens
self.temperature = temperature
self.action_space = action_space
self.trajectory = [
{
"role": "system",
"parts": [
{
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space] + "\nHere is the instruction for the task: {}".format(self.instruction)
]
}
]
def predict(self, obs: Dict) -> List:
"""
Predict the next action(s) based on the current observation.
Only single-round conversation is supported; only the latest desktop screenshot is filled in.
"""
img = PIL.Image.open(obs["screenshot"])
self.trajectory.append({
"role": "user",
"parts": ["What's the next step that you will do to help with the task?", img]
})
# todo: Remove this step once Gemini supports multi-round conversations
all_message_str = ""
for i in range(len(self.trajectory)):
if i == 0:
all_message_template = "<|im_start|>system\n{}\n<|im_end|>\n"
elif i % 2 == 1:
all_message_template = "<|im_start|>user\n{}\n<|im_end|>\n"
else:
all_message_template = "<|im_start|>assistant\n{}\n<|im_end|>\n"
all_message_str += all_message_template.format(self.trajectory[i]["parts"][0])
message_for_gemini = {
"role": "user",
"parts": [all_message_str, img]
}
traj_to_show = []
for i in range(len(self.trajectory)):
traj_to_show.append(self.trajectory[i]["parts"][0])
if len(self.trajectory[i]["parts"]) > 1:
traj_to_show.append("screenshot_obs")
print("Trajectory:", traj_to_show)
while True:
try:
response = self.model.generate_content(
message_for_gemini,
generation_config={
"max_output_tokens": self.max_tokens,
"temperature": self.temperature
}
)
break
except:
print("Failed to generate response, retrying...")
time.sleep(5)
pass
try:
response_text = response.text
except:
return []
try:
actions = self.parse_actions(response_text)
except:
print("Failed to parse action from response:", response_text)
actions = []
return actions
def parse_actions(self, response: str):
# parse from the response
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
elif self.action_space == "pyautogui":
actions = parse_code_from_string(response)
else:
raise ValueError("Invalid action space: " + self.action_space)
# add action into the trajectory
self.trajectory.append({
"role": "assistant",
"parts": [response]
})
return actions

mm_agents/gpt_4_agent.py Normal file (195 lines)
View File

@@ -0,0 +1,195 @@
import base64
import json
import re
import time
from typing import Dict, List
import requests
from mm_agents.accessibility_tree_wrap.heuristic_retrieve import find_leaf_nodes, filter_nodes
from mm_agents.gpt_4_prompt_action import SYS_PROMPT as SYS_PROMPT_ACTION
from mm_agents.gpt_4_prompt_code import SYS_PROMPT as SYS_PROMPT_CODE
# Function to encode the image
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def parse_actions_from_string(input_string):
# Search for a JSON string within the input string
actions = []
matches = re.findall(r'```json\s+(.*?)\s+```', input_string, re.DOTALL)
if matches:
# Assuming there's only one match, parse the JSON string into a dictionary
try:
for match in matches:
action_dict = json.loads(match)
actions.append(action_dict)
return actions
except json.JSONDecodeError as e:
return f"Failed to parse JSON: {e}"
else:
matches = re.findall(r'```\s+(.*?)\s+```', input_string, re.DOTALL)
if matches:
# Assuming there's only one match, parse the JSON string into a dictionary
try:
for match in matches:
action_dict = json.loads(match)
actions.append(action_dict)
return actions
except json.JSONDecodeError as e:
return f"Failed to parse JSON: {e}"
else:
try:
action_dict = json.loads(input_string)
return [action_dict]
except json.JSONDecodeError as e:
raise ValueError("Invalid response format: " + input_string)
def parse_code_from_string(input_string):
# This regular expression will match both ```code``` and ```python code```
# and capture the `code` part. It uses a non-greedy match for the content inside.
pattern = r"```(?:\w+\s+)?(.*?)```"
# Find all non-overlapping matches in the string
matches = re.findall(pattern, input_string, re.DOTALL)
# The regex above captures the content inside the triple backticks.
# The `re.DOTALL` flag allows the dot `.` to match newline characters as well,
# so the code inside backticks can span multiple lines.
# matches now contains all the captured code snippets
codes = []
for match in matches:
match = match.strip()
commands = ['WAIT', 'DONE', 'FAIL'] # fixme: update this part when we have more commands
if match in commands:
codes.append(match.strip())
elif match.split('\n')[-1] in commands:
if len(match.split('\n')) > 1:
codes.append("\n".join(match.split('\n')[:-1]))
codes.append(match.split('\n')[-1])
else:
codes.append(match)
return codes
class GPT4_Agent:
def __init__(self, api_key, instruction, model="gpt-4-1106-preview", max_tokens=600, action_space="computer_13"):
self.instruction = instruction
self.model = model
self.max_tokens = max_tokens
self.action_space = action_space
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
self.trajectory = [
{
"role": "system",
"content": [
{
"type": "text",
"text": {
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space] + "\nHere is the instruction for the task: {}".format(self.instruction)
},
]
}
]
def predict(self, obs: Dict) -> List:
"""
Predict the next action(s) based on the current observation.
"""
accessibility_tree = obs["accessibility_tree"]
leaf_nodes = find_leaf_nodes(accessibility_tree)
filtered_nodes = filter_nodes(leaf_nodes)
linearized_accessibility_tree = "tag\ttext\tposition\tsize\n"
# Linearize the accessibility tree nodes into a table format
for node in filtered_nodes:
linearized_accessibility_tree += node.tag + "\t"
linearized_accessibility_tree += node.attrib.get('name') + "\t"
linearized_accessibility_tree += node.attrib.get(
'{uri:deskat:component.at-spi.gnome.org}screencoord') + "\t"
linearized_accessibility_tree += node.attrib.get('{uri:deskat:component.at-spi.gnome.org}size') + "\n"
self.trajectory.append({
"role": "user",
"content": [
{
"type": "text",
"text": "Given the XML format of accessibility tree as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
linearized_accessibility_tree)
}
]
})
# print(
# "Given the XML format of accessibility tree as below:\n{}\nWhat's the next step that you will do to help with the task?".format(
# linearized_accessibility_tree)
# )
traj_to_show = []
for i in range(len(self.trajectory)):
traj_to_show.append(self.trajectory[i]["content"][0]["text"])
if len(self.trajectory[i]["content"]) > 1:
traj_to_show.append("screenshot_obs")
payload = {
"model": self.model,
"messages": self.trajectory,
"max_tokens": self.max_tokens
}
while True:
try:
response = requests.post("https://api.openai.com/v1/chat/completions", headers=self.headers,
json=payload)
break
except:
print("Failed to generate response, retrying...")
time.sleep(5)
pass
try:
actions = self.parse_actions(response.json()['choices'][0]['message']['content'])
except:
print("Failed to parse action from response:", response.json())
actions = None
return actions
def parse_actions(self, response: str):
# parse from the response
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
elif self.action_space == "pyautogui":
actions = parse_code_from_string(response)
else:
raise ValueError("Invalid action space: " + self.action_space)
# add action into the trajectory
self.trajectory.append({
"role": "assistant",
"content": [
{
"type": "text",
"text": response
},
]
})
return actions
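A small, hand-written check of the two parsing helpers defined at the top of this file; the model responses below are invented solely to show the expected return shapes.

```python
# Illustrative only: exercise the parsers on made-up model output.
code_response = (
    "I will open the file dialog first.\n"
    "```python\nimport pyautogui\npyautogui.hotkey('ctrl', 'o')\n```\n"
    "Then we are finished.\n"
    "```DONE```"
)
print(parse_code_from_string(code_response))
# -> ["import pyautogui\npyautogui.hotkey('ctrl', 'o')", 'DONE']

action_response = '```json\n{"action_type": "CLICK", "parameters": {"button": "left"}}\n```'
print(parse_actions_from_string(action_response))
# -> [{'action_type': 'CLICK', 'parameters': {'button': 'left'}}]
```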

View File

@@ -0,0 +1,244 @@
SYS_PROMPT = """
You will act as an agent which follows my instructions and performs desktop computer tasks as instructed. You have good knowledge of computers and a good internet connection.
For each step, you will get an observation of the desktop as an accessibility tree in XML format, based on the AT-SPI library, and you will predict the next computer action from it.
HERE is the description of the action space you need to predict from; follow the format and choose the correct action type and parameters:
ACTION_SPACE = [
{
"action_type": "MOVE_TO",
"note": "move the cursor to the specified position",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": False,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": False,
}
}
},
{
"action_type": "CLICK",
"note": "click the left button if the button not specified, otherwise click the specified button; click at the current position if x and y are not specified, otherwise click at the specified position",
"parameters": {
"button": {
"type": str,
"range": ["left", "right", "middle"],
"optional": True,
},
"x": {
"type": float,
"range": [0, X_MAX],
"optional": True,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": True,
},
"num_clicks": {
"type": int,
"range": [1, 2, 3],
"optional": True,
},
}
},
{
"action_type": "MOUSE_DOWN",
"note": "press the left button if the button not specified, otherwise press the specified button",
"parameters": {
"button": {
"type": str,
"range": ["left", "right", "middle"],
"optional": True,
}
}
},
{
"action_type": "MOUSE_UP",
"note": "release the left button if the button not specified, otherwise release the specified button",
"parameters": {
"button": {
"type": str,
"range": ["left", "right", "middle"],
"optional": True,
}
}
},
{
"action_type": "RIGHT_CLICK",
"note": "right click at the current position if x and y are not specified, otherwise right click at the specified position",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": True,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": True,
}
}
},
{
"action_type": "DOUBLE_CLICK",
"note": "double click at the current position if x and y are not specified, otherwise double click at the specified position",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": True,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": True,
}
}
},
{
"action_type": "DRAG_TO",
"note": "drag the cursor to the specified position with the left button pressed",
"parameters": {
"x": {
"type": float,
"range": [0, X_MAX],
"optional": False,
},
"y": {
"type": float,
"range": [0, Y_MAX],
"optional": False,
}
}
},
{
"action_type": "SCROLL",
"note": "scroll the mouse wheel up or down",
"parameters": {
"dx": {
"type": int,
"range": None,
"optional": False,
},
"dy": {
"type": int,
"range": None,
"optional": False,
}
}
},
{
"action_type": "TYPING",
"note": "type the specified text",
"parameters": {
"text": {
"type": str,
"range": None,
"optional": False,
}
}
},
{
"action_type": "PRESS",
"note": "press the specified key and release it",
"parameters": {
"key": {
"type": str,
"range": KEYBOARD_KEYS,
"optional": False,
}
}
},
{
"action_type": "KEY_DOWN",
"note": "press the specified key",
"parameters": {
"key": {
"type": str,
"range": KEYBOARD_KEYS,
"optional": False,
}
}
},
{
"action_type": "KEY_UP",
"note": "release the specified key",
"parameters": {
"key": {
"type": str,
"range": KEYBOARD_KEYS,
"optional": False,
}
}
},
{
"action_type": "HOTKEY",
"note": "press the specified key combination",
"parameters": {
"keys": {
"type": list,
"range": [KEYBOARD_KEYS],
"optional": False,
}
}
},
############################################################################################################
{
"action_type": "WAIT",
"note": "wait until the next action",
},
{
"action_type": "FAIL",
"note": "decide the task can not be performed",
},
{
"action_type": "DONE",
"note": "decide the task is done",
}
]
First you need to predict the type of your action, then you need to predict its parameters:
- For MOVE_TO, you need to predict the x and y coordinates of the mouse cursor; the top-left corner of the screen is (0, 0) and the bottom-right corner is (1920, 1080)
for example, format as:
```
{
"action_type": "MOUSE_MOVE",
"x": 1319.11,
"y": 65.06
}
```
- For [CLICK, MOUSE_DOWN, MOUSE_UP], you can also specify the button, selecting from ["left", "middle", "right"], which means you click or press the left, middle or right button of your mouse; for CLICK you may additionally give x, y and num_clicks:
for example, format as:
```
{
"action_type": "CLICK",
"click_type": "LEFT"
}
```
- For [PRESS, KEY_DOWN, KEY_UP], you need to choose a key from the keyboard; for HOTKEY, give the key combination as a list of keys
for example, format as:
```
{
"action_type": "KEY",
"key": "ctrl+c"
}
```
- For TYPING, you need to specify the text you want to type
for example, format as:
```
{
"action_type": "TYPE",
"text": "hello world"
}
```
REMEMBER:
For every step, you should only RETURN ME THE action_type AND parameters I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
You MUST wrap the dict with backticks (\`).
You MUST choose and ONLY CHOOSE from the action space above, otherwise your action will be considered as invalid and you will get a penalty.
You CAN predict multiple actions at one step, but you should only return one action for each step.
"""

View File

@@ -0,0 +1,18 @@
SYS_PROMPT = """
You are an agent which follows my instructions and performs desktop computer tasks as instructed.
You have good knowledge of computers and a good internet connection, and you can assume your code will run on a computer that controls the mouse and keyboard.
For each step, you will get an observation of the desktop as an accessibility tree in XML format, based on the AT-SPI library, and you will predict the next computer action from it.
You are required to use `pyautogui` to perform the action.
Return one line or multiple lines of python code to perform the action each time; be time efficient.
You ONLY need to return the code inside a code block, like this:
```python
# your code here
```
As a special case, you are also allowed to return the following special codes:
When you think you have to wait for some time, return ```WAIT```;
When you think the task cannot be done, return ```FAIL```; do not return ```FAIL``` lightly, try your best to do the task;
When you think the task is done, return ```DONE```.
First reflect on the current observation and the previous steps we took, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
"""

View File

@@ -1,7 +1,8 @@
import base64
import json
import re
from typing import Dict
import time
from typing import Dict, List
import requests
@@ -63,7 +64,8 @@ def parse_code_from_string(input_string):
class GPT4v_Agent:
def __init__(self, api_key, model="gpt-4-vision-preview", max_tokens=300, action_space="computer_13"):
def __init__(self, api_key, instruction, model="gpt-4-vision-preview", max_tokens=300, action_space="computer_13"):
self.instruction = instruction
self.model = model
self.max_tokens = max_tokens
self.action_space = action_space
@@ -80,15 +82,15 @@ class GPT4v_Agent:
{
"type": "text",
"text": {
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space]
"computer_13": SYS_PROMPT_ACTION,
"pyautogui": SYS_PROMPT_CODE
}[action_space] + "\nHere is the instruction for the task: {}".format(self.instruction)
},
]
}
]
def predict(self, obs: Dict):
def predict(self, obs: Dict) -> List:
"""
Predict the next action(s) based on the current observation.
"""
@@ -98,8 +100,7 @@ class GPT4v_Agent:
"content": [
{
"type": "text",
"text": "To accomplish the task '{}' and given the current screenshot, what's the next step?".format(
obs["instruction"])
"text": "What's the next step that you will do to help with the task?"
},
{
"type": "image_url",
@@ -123,33 +124,32 @@ class GPT4v_Agent:
"messages": self.trajectory,
"max_tokens": self.max_tokens
}
response = requests.post("https://api.openai.com/v1/chat/completions", headers=self.headers, json=payload)
while True:
try:
response = requests.post("https://api.openai.com/v1/chat/completions", headers=self.headers,
json=payload)
break
except:
print("Failed to generate response, retrying...")
time.sleep(5)
pass
try:
actions = self.parse_actions(response.json()['choices'][0]['message']['content'])
except:
# todo: add error handling
print("Failed to parse action from response:", response.json()['choices'][0]['message']['content'])
print("Failed to parse action from response:", response.json())
actions = None
return actions
def parse_actions(self, response: str):
# response example
"""
```json
{
"action_type": "CLICK",
"click_type": "RIGHT"
}
```
"""
# parse from the response
if self.action_space == "computer_13":
actions = parse_actions_from_string(response)
elif self.action_space == "pyautogui":
actions = parse_code_from_string(response)
else:
raise ValueError("Invalid action space: " + self.action_space)
# add action into the trajectory
self.trajectory.append({

View File

@@ -237,7 +237,7 @@ for example, format as:
```
REMEMBER:
For every step, you should only return the action_type and the parameters of your action as a dict, without any other things.
For every step, you should only RETURN ME THE action_type AND parameters I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
You MUST wrap the dict with backticks (\`).
You MUST choose and ONLY CHOOSE from the action space above, otherwise your action will be considered as invalid and you will get a penalty.
You CAN predict multiple actions at one step, but you should only return one action for each step.

View File

@@ -1,11 +1,18 @@
SYS_PROMPT = """
You will act as an agent which follow my instruction and perform desktop computer tasks as instructed. You must have good knowledge of computer and good internet connection.
For each step, you will get an observation of an image, which is the screenshot of the computer screen. And you will predict the action of the computer based on the image.
You are an agent which follows my instructions and performs desktop computer tasks as instructed.
You have good knowledge of computers and a good internet connection, and you can assume your code will run on a computer that controls the mouse and keyboard.
For each step, you will get an observation of an image, which is a screenshot of the computer screen, and you will predict the next computer action based on the image.
You are required to use `pyautogui` to perform the action.
Return one line or multiple lines of python code to perform the action each time, be time efficient.
You ONLY need to return the code inside a code block, like this:
```python
# your code here
```
As a special case, you are also allowed to return the following special codes:
When you think you have to wait for some time, return ```WAIT```;
When you think the task cannot be done, return ```FAIL```; do not return ```FAIL``` lightly, try your best to do the task;
When you think the task is done, return ```DONE```.
When you think you have to wait for some time, return `WAIT`.
When you think the task can not be done, return `FAIL`.
When you think the task is done, return `DONE`.
First reflect on the current screenshot and the previous steps we took, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
"""

View File

@@ -0,0 +1 @@
Deprecated since we found we can use `accelaerator` to do the same thing, but it can potentially be used in the future when only screen access is available.

View File

@@ -30,3 +30,5 @@ ImageHash
scikit-image
librosa
pymupdf
chardet
playwright