Finish Aguvis eval on OSWorld (#107)

* Initialize Aguvis eval on OSWorld

* Debug

* Debug

* v1, internal version

* Add experiments script

* Fix minor bugs

* Update new endpoint

* Update ip

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Fix model name

* Fix docker close issues; update prompting

* Fix missed

* Fix the default port to avoid crashing on examples like '_update_browse_history_setup'

* Fix server and chromium ports in setup

* Revert and add missed dependency

* Add VLC port for docker

* Update

* Aguvis Grounding

* Add Aguvis as planner

* fix parse bug

* fix pause

* fix planner prompt

* Aguvis Grounding

* fix

* fix

* fix

* add logger for each example

* Modify Aguvis Planner Prompts

* fix logger setup

* fix absolute coordinates

* Finish Aguvis Evaluation on OSWorld

* Merge origin/main into junli/aguvis

* Remove screenshot

---------

Co-authored-by: Tianbao Xie <tianbaoxie@U-492FC39R-0217.local>
Co-authored-by: Timothyxxx <384084775@qq.com>
Co-authored-by: FredWuCZ <fredwucz@outlook.com>
This commit is contained in:
Junli Wang
2024-11-24 16:43:25 +08:00
committed by GitHub
parent 7d84a21962
commit 1503eb3994
6 changed files with 407 additions and 247 deletions

View File

@@ -223,7 +223,7 @@ class DesktopEnv(gym.Env):
or (len(self.metric) == len(self.result_getter) == len(self.expected_getter) == len(
self.metric_options)))
def step(self, action, pause=0.5):
def step(self, action, pause=2):
self._step_no += 1
self.action_history.append(action)
@@ -252,6 +252,7 @@ class DesktopEnv(gym.Env):
# the set of all possible python commands insides `pyautogui`
self.controller.execute_python_command(action)
time.sleep(pause)
observation = self._get_obs()
return observation, reward, done, info

View File

@@ -7,8 +7,10 @@ from wrapt_timeout_decorator import *
logger = logging.getLogger("desktopenv.experiment")
def run_single_example(agent, env, example, max_steps, instruction, args, example_result_dir, scores):
agent.reset()
runtime_logger = setup_logger(example, example_result_dir)
agent.reset(runtime_logger)
obs = env.reset(task_config=example)
done = False
step_idx = 0
@@ -51,3 +53,10 @@ def run_single_example(agent, env, example, max_steps, instruction, args, exampl
with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f:
f.write(f"{result}\n")
env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4"))
def setup_logger(example, example_result_dir):
    """Create (or fetch) a per-example logger writing DEBUG output to runtime.log.

    Args:
        example: task config dict; only ``example['id']`` is used to name the logger.
        example_result_dir: directory in which ``runtime.log`` is created.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    runtime_logger = logging.getLogger(f"desktopenv.example.{example['id']}")
    runtime_logger.setLevel(logging.DEBUG)
    log_path = os.path.abspath(os.path.join(example_result_dir, "runtime.log"))
    # logging.getLogger returns a cached instance, so re-running the same
    # example would stack duplicate FileHandlers and duplicate every record;
    # only attach a handler if one for this exact file is not already present.
    if not any(isinstance(h, logging.FileHandler) and h.baseFilename == log_path
               for h in runtime_logger.handlers):
        handler = logging.FileHandler(log_path)
        # The original handler had no formatter; add timestamps and levels.
        handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
        runtime_logger.addHandler(handler)
    return runtime_logger

View File

@@ -7,7 +7,7 @@ import tempfile
import time
from http import HTTPStatus
from io import BytesIO
from typing import Dict, List
from typing import Dict, List, Tuple
import backoff
import openai
@@ -15,8 +15,17 @@ import requests
from PIL import Image
from google.api_core.exceptions import InvalidArgument, ResourceExhausted, InternalServerError, BadRequest
from requests.exceptions import SSLError
from mm_agents.prompts import (
AGUVIS_PLANNER_SYS_PROMPT,
AGUVIS_SYS_PROMPT,
AGUVIS_PLANNING_PROMPT,
AGUVIS_INNER_MONOLOGUE_APPEND_PROMPT,
AGUVIS_GROUNDING_PROMPT,
AGUVIS_GROUNDING_APPEND_PROMPT
)
logger = None
logger = logging.getLogger("desktopenv.aguvis_agent")
# Function to encode the image
def encode_image(image_content):
@@ -41,48 +50,184 @@ def save_to_tmp_img_file(data_str):
return tmp_img_path
# TODO: hardcoded screen size, need to be fixed
SCREEN_LOGIC_SIZE = (1280, 800)
wait_func = {"name": "WAIT", "description": "wait for a moment"}
done_func = {"name": "DONE", "description": "done with the task"}
fail_func = {"name": "FAIL", "description": "fail to complete the task"}
# FIXME: hardcoded screen size and planner system message
SCREEN_LOGIC_SIZE = (1280, 720)
SYS_PROMPT = f"""You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.
"""
# TODO: let GPT not to predict non-atomic actions,
PLANNER_OUTPUT_FORMAT_SYS_PROMPT = """Your response should be formatted as follows:
Thought: *Describe your understanding of the current situation and consider what you need to do next.*
Action: *State the specific action you have decided to perform, described in natural language.*
def parse_code_from_planner_response(input_string: str) -> List[str]:
"""Parse the planner's response containing executable pyautogui code"""
**Note:** Please **do not** predict non-atomic actions. For example, for multi-step operations like "click then select the date," only predict the first atomic action (e.g., "click") at this time, and leave subsequent steps (like click for selecting the date) for the next planning phase.
input_string = "\n".join([line.strip() for line in input_string.split(';') if line.strip()])
if input_string.strip() in ['WAIT', 'DONE', 'FAIL']:
return [input_string.strip()]
**Example:**
Thought: To proceed with booking a hotel, I must first specify the check-in and check-out dates for the stay. Since the objective is to book a three-night stay starting from the 1st of June, I need to input these dates into the form to find available accommodations.
Action: Click on the "Choose date" button in the Check-in field to start selecting the stay dates.
# This regular expression will match both ```code``` and ```python code```
# and capture the `code` part. It uses a non-greedy match for the content inside.
pattern = r"```(?:\w+\s+)?(.*?)```"
# Find all non-overlapping matches in the string
matches = re.findall(pattern, input_string, re.DOTALL)
Addtionally, you can use the following functions:
- {json.dumps(wait_func)}
- {json.dumps(done_func)}
- {json.dumps(fail_func)}
# The regex above captures the content inside the triple backticks.
# The `re.DOTALL` flag allows the dot `.` to match newline characters as well,
# so the code inside backticks can span multiple lines.
**Example 1:**
Thought: I need to wait for a moment before proceeding.
Action: WAIT
# matches now contains all the captured code snippets
codes = []
**Example 2:**
Thought: I have completed the task.
Action: DONE
"""
for match in matches:
match = match.strip()
commands = ['WAIT', 'DONE', 'FAIL']
INSTRUCTION_PROMPT = """Please generate the next move according to the UI screenshot, instruction and previous actions.
if match in commands:
codes.append(match.strip())
elif match.split('\n')[-1] in commands:
if len(match.split('\n')) > 1:
codes.append("\n".join(match.split('\n')[:-1]))
codes.append(match.split('\n')[-1])
else:
codes.append(match)
Instruction: {instruction}
"""
return codes
def parse_aguvis_response(input_string, screen_logic_size=SCREEN_LOGIC_SIZE) -> Tuple[str, List[str]]:
    """Split an Aguvis reply into (low-level instruction, absolute-coordinate code).

    The special replies WAIT / DONE / FAIL are passed through unchanged as a
    pair of identical tokens. On any parse failure, (None, None) is returned.
    """
    lowered = input_string.lower()
    for sentinel in ("wait", "done", "fail"):
        if lowered.startswith(sentinel):
            token = sentinel.upper()
            return token, token
    try:
        lines = [ln for ln in input_string.strip().split("\n") if ln.strip() != ""]
        # The first non-empty line is the natural-language step description.
        instruction = lines[0]
        # Locate where the executable part starts: either the bare
        # "assistantos" marker or the first pyautogui call.
        start = next(
            (idx for idx, ln in enumerate(lines)
             if ln.strip() == "assistantos" or ln.strip().startswith("pyautogui")),
            -1,
        )
        if start == -1:
            print(f"Error: Could not parse response {input_string}")
            return None, None
        relative_code = "\n".join(lines[start:]).replace("assistantos", "").strip()
        # Normalize mis-named keyword args, then scale relative coordinates
        # up to the logical screen size.
        absolute_code = _pyautogui_code_to_absolute_coordinates(
            correct_pyautogui_arguments(relative_code), screen_logic_size
        )
        return instruction, absolute_code
    except Exception:
        print(f"Error: Could not parse response {input_string}")
        return None, None
def correct_pyautogui_arguments(code: str) -> str:
    """Normalize commonly mis-named keyword arguments in pyautogui calls.

    Rewrites ``write(text=...)`` to ``write(message=...)`` and strips the
    bogus keyword names from ``press``/``hotkey`` calls (passing the values
    positionally). Lines that are not corrected pyautogui calls are kept
    (stripped of surrounding whitespace) unchanged.
    """
    fixes = {
        'write': {
            'incorrect_args': ['text'],
            'correct_args': [],
            'keyword_arg': 'message'
        },
        'press': {
            'incorrect_args': ['key', 'button'],
            'correct_args': [],
            'keyword_arg': None
        },
        'hotkey': {
            'incorrect_args': ['key1', 'key2', 'keys'],
            'correct_args': [],
            'keyword_arg': None
        },
    }
    out = []
    for raw_line in code.strip().split('\n'):
        stripped = raw_line.strip()
        call = re.match(r'(pyautogui\.(\w+))\((.*)\)', stripped)
        if call is None or call.group(2) not in fixes:
            # Not a call we know how to fix — keep as-is.
            out.append(stripped)
            continue
        spec = fixes[call.group(2)]
        fixed_args = []
        for raw_arg in split_args(call.group(3)):
            raw_arg = raw_arg.strip()
            kw = re.match(r'(\w+)\s*=\s*(.*)', raw_arg)
            if kw is None:
                # Positional argument — pass through untouched.
                fixed_args.append(raw_arg)
            elif kw.group(1) in spec['incorrect_args']:
                if spec['keyword_arg']:
                    fixed_args.append(f"{spec['keyword_arg']}={kw.group(2)}")
                else:
                    fixed_args.append(kw.group(2))
            else:
                fixed_args.append(f"{kw.group(1)}={kw.group(2)}")
        out.append(f"{call.group(1)}({', '.join(fixed_args)})")
    return '\n'.join(out)
def split_args(args_str: str) -> List[str]:
    """Split a comma-separated argument string, honouring quoted strings.

    Commas inside single- or double-quoted strings (with backslash escapes)
    do not split; the separating commas themselves are dropped, surrounding
    whitespace is preserved.
    """
    pieces: List[str] = []
    buf = ''
    in_quote = False
    quote_char = ''
    last = ''
    for ch in args_str:
        if ch in ('"', "'"):
            if not in_quote:
                in_quote, quote_char = True, ch
            elif last != '\\' and ch == quote_char:
                in_quote = False
        if ch == ',' and not in_quote:
            pieces.append(buf)
            buf = ''
        else:
            buf += ch
        last = ch
    if buf:
        pieces.append(buf)
    return pieces
def extract_coordinates(text, logical_screen_size=SCREEN_LOGIC_SIZE) -> Tuple[int, int] | None:
    """Pull the first (x, y) pair out of *text* and scale it to pixel coordinates.

    Matches either "(x=0.1, y=0.2)" or "(0.1, 0.2)". Captured values are
    multiplied by the logical screen width/height and truncated to ints.
    Returns None when no complete pair is found.
    """
    # Pattern to match (x=0.1, y=0.2) or (0.1, 0.2) format
    text = text.strip()
    # NOTE(review): relies on the module-global `logger` being set (see reset()).
    logger.info(f"Extracting coordinates from: {text}")
    pattern = r'\((?:x=)?([-+]?\d*\.\d+|\d+)(?:,\s*(?:y=)?([-+]?\d*\.\d+|\d+))?\)'
    match = re.search(pattern, text)
    if match:
        x = int(float(match.group(1)) * logical_screen_size[0])
        y = int(float(match.group(2)) * logical_screen_size[1]) if match.group(2) else None
        # A lone "(x)" without a y component falls through and is reported as not found.
        if y is not None:
            return (x, y)
    logger.info(f"Error: No coordinates found in: {text}")
    return None
ACTION_PROMPT = """Previous actions:
"""
def _pyautogui_code_to_absolute_coordinates(pyautogui_code_relative_coordinates, logical_screen_size=SCREEN_LOGIC_SIZE):
"""
@@ -203,65 +348,6 @@ def _pyautogui_code_to_absolute_coordinates(pyautogui_code_relative_coordinates,
return new_code
def _parse(text, screen_logic_size=SCREEN_LOGIC_SIZE):
    """(Legacy) Parse an executor reply into pyautogui code with absolute coordinates.

    NOTE(review): the return shape is inconsistent — WAIT/DONE/FAIL return a
    2-tuple, the success path returns a single string, and the except path
    returns a bare None, while the not-found path returns (None, None).
    Callers that unpack two values will break on the success path. This
    function is superseded by parse_aguvis_response.
    """
    if text.lower().startswith("wait"):
        return "WAIT", "WAIT"
    elif text.lower().startswith("done"):
        return "DONE", "DONE"
    elif text.lower().startswith("fail"):
        return "FAIL", "FAIL"
    try:
        lines = text.strip().split("\n")
        lines = [line for line in lines if line.strip() != ""]  # Remove empty lines
        pyautogui_index = -1
        # Executable part begins at the "assistantos" marker or first pyautogui call.
        for i, line in enumerate(lines):
            if line.strip() == "assistantos" or line.strip().startswith("pyautogui"):
                pyautogui_index = i
                break
        if pyautogui_index == -1:
            print(f"Error: Could not parse response {text}")
            return None, None  # Return None or handle the error as needed
        pyautogui_code_relative_coordinates = "\n".join(lines[pyautogui_index:])
        # remove the assistantos prefix, ugly, fix later
        pyautogui_code_relative_coordinates = pyautogui_code_relative_coordinates.replace("assistantos", "")
        parsed_action = _pyautogui_code_to_absolute_coordinates(pyautogui_code_relative_coordinates, screen_logic_size)
        return parsed_action
    except Exception as e:
        print(f"Error: Could not parse response {text}")
        return None
def parse_planner_response(planner_response):
    """Extract the 'Thought:' and 'Action:' fields from a planner reply.

    Scans every line; if a prefix occurs more than once the last occurrence
    wins. Missing fields are returned as None; on an unexpected error the
    pair ("", "") is returned.
    """
    try:
        thought = None
        action_description = None
        for row in planner_response.splitlines():
            if row.startswith("Thought:"):
                thought = row[len("Thought: "):].strip()
            elif row.startswith("Action:"):
                action_description = row[len("Action: "):].strip()
        return thought, action_description
    except Exception:
        print(f"Error: Could not parse response {planner_response}")
        return "", ""
class AguvisAgent:
def __init__(
@@ -294,159 +380,177 @@ class AguvisAgent:
"""
Predict the next action(s) based on the current observation.
"""
# Prepare the payload for the API call
messages = []
masks = None
self.observations.append(obs["screenshot"])
messages.append({
"role": "system",
"content": [
{
"type": "text",
"text": SYS_PROMPT
},
]
})
instruction_prompt = INSTRUCTION_PROMPT.format(instruction=instruction)
history_actions_prompt = ACTION_PROMPT
# thought, or so called action description
for i, action_description in enumerate(self.action_descriptions):
history_actions_prompt += f"Step {i+1}: {action_description}\n"
if len(history_actions_prompt) > 0:
instruction_prompt += "\n\n" + history_actions_prompt
base64_img = encode_image(obs["screenshot"])
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": instruction_prompt
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_img}",
"detail": "high"
}
}
]
})
previous_actions = "\n".join([f"Step {i+1}: {action}" for i, action in enumerate(self.actions)]) if self.actions else "None"
if self.planner_model is None:
# For now, we call the same model twice, one for planner and one for executor,
# This can be improved later when the inference stop token fixed
messages.append({
"role": "assistant",
"content": [
{
"type": "text",
"text": """<|recipient|>all\nAction: """
}
]
aguvis_messages = []
aguvis_messages.append({
"role": "system",
"content": [{"type": "text", "text": AGUVIS_SYS_PROMPT}]
})
with open("messages_direct_executor.json", "w") as f:
f.write(json.dumps(messages, indent=4))
executor_response = self.call_llm({
"model": self.executor_model,
"messages": messages,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"temperature": self.temperature
}, self.executor_model)
logger.info("EXECUTOR RESPONSE: %s", executor_response)
pyautogui_action = _parse(executor_response)
thought, action_description = parse_planner_response("Action: " + executor_response)
self.thoughts.append(thought)
self.action_descriptions.append(action_description)
self.actions.append(pyautogui_action)
return executor_response, [pyautogui_action]
else:
# Planner stage
messages.append({
aguvis_messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": PLANNER_OUTPUT_FORMAT_SYS_PROMPT + "\nThought:"
"text": AGUVIS_PLANNING_PROMPT.format(
instruction=instruction,
previous_actions=previous_actions,
)
},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"}
}
]
],
})
planner_response = self.call_llm({
"model": self.planner_model,
"messages": messages,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"temperature": self.temperature
}, self.planner_model)
logger.info("PLANNER RESPONSE: %s", planner_response)
thought, action_description = parse_planner_response(planner_response)
self.thoughts.append(thought)
self.action_descriptions.append(action_description)
if "WAIT" in action_description:
self.actions.append("WAIT")
return planner_response, ["WAIT"]
elif "DONE" in action_description:
self.actions.append("DONE")
return planner_response, ["DONE"]
elif "FAIL" in action_description:
self.actions.append("FAIL")
return planner_response, ["FAIL"]
messages[1]["content"][0]["text"] = INSTRUCTION_PROMPT.format(instruction=action_description)
# pretend nothing happend with stronger planner model
messages[-1] = {
aguvis_messages.append({
"role": "assistant",
"content": [
{
"type": "text",
# "text": f"""<|recipient|>all\nAction: {action_description}<|im_end|>\n<|im_start|>assistant<|recipient|>os"""
"text": f"""<|recipient|>os"""
}
{"type": "text", "text": AGUVIS_INNER_MONOLOGUE_APPEND_PROMPT}
]
}
with open("messages_executor.json", "w") as f:
f.write(json.dumps(messages, indent=4))
# Executor stage
executor_response = self.call_llm({
})
aguvis_response = self.call_llm({
"model": self.executor_model,
"messages": messages,
"messages": aguvis_messages,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"temperature": self.temperature
}, self.executor_model)
logger.info(f"Aguvis Output: {aguvis_response}")
low_level_instruction, pyautogui_actions = parse_aguvis_response(aguvis_response)
logger.info("EXECUTOR RESPONSE: %s", executor_response)
self.actions.append(low_level_instruction)
return aguvis_response, [pyautogui_actions]
else:
# FIXME [junli]:
# Using an external planner (GPT-4o) requires relying on more
# detailed prompt to provide Aguvis with low level instructions.
# So we temporarily separate the planner prompt and aguvis prompt.
pyautogui_action = _parse(executor_response)
self.actions.append(pyautogui_action)
planner_messages = []
planner_system_message = AGUVIS_PLANNER_SYS_PROMPT
planner_messages.append({
"role": "system",
"content": [{"type": "text", "text": planner_system_message}]
})
planner_messages.append(
{
"role": "user",
"content": [
{
"type": "text",
"text": f"You are asked to complete the following task: {instruction}"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
"detail": "high"
}
}
]
}
)
planner_response = self.call_llm({
"model": self.planner_model,
"messages": planner_messages,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"temperature": self.temperature
}, self.planner_model)
logger.info(f"Planner output: {planner_response}")
code = parse_code_from_planner_response(planner_response)
pyautogui_actions = []
for line in code:
code = self.convert_action_to_grounding_model_instruction(
line,
obs,
instruction,
)
pyautogui_actions.append(code)
return planner_response + "\n\n" + executor_response, [pyautogui_action]
return "", pyautogui_actions
def convert_action_to_grounding_model_instruction(
    self, line: str, obs: Dict, instruction: str
) -> str:
    """Replace planner-proposed mouse actions with model-grounded coordinates.

    Finds "# comment\npyautogui.<moveTo|click|rightClick>(x, y[, duration=..])"
    pairs in *line*, asks the Aguvis grounding model where the commented UI
    element is on the current screenshot, and substitutes the model's
    coordinates into the action. Lines without such pairs are returned as-is.

    Args:
        line: one planner-generated code snippet (comment + pyautogui call).
        obs: observation dict; only obs['screenshot'] (raw image bytes) is used.
        instruction: the overall task instruction (currently unused in the body).

    Returns:
        The snippet with grounded coordinates substituted in.
    """
    pattern = r'(#.*?)\n(pyautogui\.(moveTo|click|rightClick)\((?:x=)?(\d+)(?:,\s*|\s*,\s*y=)(\d+)(?:,\s*duration=[\d.]+)?\))'
    matches = re.findall(pattern, line, re.DOTALL)
    if not matches:
        return line
    new_instruction = line
    for match in matches:
        # The comment text is the natural-language description of the target element.
        comment = match[0].split("#")[1].strip()
        original_action = match[1]
        func_name = match[2].strip()
        if "click()" in original_action.lower():
            continue  # Skip click() without coordinates
        aguvis_messages = []
        aguvis_messages.append({
            "role": "system",
            "content": [{"type": "text", "text": AGUVIS_SYS_PROMPT}]
        })
        aguvis_messages.append(
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": AGUVIS_GROUNDING_PROMPT.format(
                            instruction=comment,
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
                            "detail": "high",
                        },
                    },
                ],
            }
        )
        # Pre-fill the assistant turn so the model only completes the call arguments.
        aguvis_messages.append(
            {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": AGUVIS_GROUNDING_APPEND_PROMPT.format(function_name=func_name)}
                ],
            }
        )
        grounding_response = self.call_llm({
            "model": self.executor_model,
            "messages": aguvis_messages,
            "max_tokens": self.max_tokens,
            "top_p": self.top_p,
            "temperature": self.temperature
        }, self.executor_model)
        coordinates = extract_coordinates(grounding_response, SCREEN_LOGIC_SIZE)
        # NOTE(review): extract_coordinates may return None, which would raise a
        # TypeError on the subscripts below — confirm the grounding model always
        # emits a coordinate pair, or add a guard here.
        # FIXME [junli]: Use ast to reconstruct the action with coordinates
        action_parts = original_action.split('(')
        new_action = f"{action_parts[0]}({coordinates[0]}, {coordinates[1]}"
        if len(action_parts) > 1 and 'duration' in action_parts[1]:
            # The trailing fragment of the split still carries the closing ")".
            duration_part = action_parts[1].split(',')[-1]
            new_action += f", {duration_part}"
        elif len(action_parts) > 1 and 'button' in action_parts[1]:
            button_part = action_parts[1].split(',')[-1]
            new_action += f", {button_part}"
        else:
            new_action += ")"
        logger.info(new_action)
        new_instruction = new_instruction.replace(original_action, new_action)
    return new_instruction
@backoff.on_exception(
backoff.constant,
# here you should add more model exceptions as you want,
# but you are forbidden to add "Exception", that is, a common type of exception
# because we want to catch this kind of Exception in the outside to ensure each example won't exceed the time limit
# because we want to catch this kind of Exception in the outside to ensure
# each example won't exceed the time limit
(
# General exceptions
SSLError,
@@ -469,7 +573,6 @@ class AguvisAgent:
max_tries=10
)
def call_llm(self, payload, model):
if model.startswith("gpt"):
headers = {
"Content-Type": "application/json",
@@ -479,7 +582,6 @@ class AguvisAgent:
logger.info("Generating content with GPT model: %s", model)
response = requests.post(
"https://api.openai.com/v1/chat/completions",
# "http://47.88.8.18:8088/v1/chat/completions",
headers=headers,
json=payload
)
@@ -490,17 +592,26 @@ class AguvisAgent:
return ""
else:
return response.json()['choices'][0]['message']['content']
elif "aguvis" in model:
headers = {
"Content-Type": "application/json",
}
logger.info("Generating content with Aguvis model: %s", model)
response = requests.post(
"http://101.132.136.195:7908/v1/chat/completions",
headers=headers,
json=payload
)
if "7b" in model:
response = requests.post(
"http://101.132.136.195:7908/v1/chat/completions",
headers=headers,
json=payload
)
elif "72b" in model:
response = requests.post(
"http://123.57.10.166:7908/v1/chat/completions",
headers=headers,
json=payload
)
else:
raise Exception("Unsupported Aguvis model version")
if response.status_code != 200:
logger.error("Failed to call LLM: " + response.text)
@@ -509,22 +620,11 @@ class AguvisAgent:
else:
return response.json()['choices'][0]['message']['content']
def reset(self, _logger=None):
global logger
logger = _logger if _logger is not None else logging.getLogger("desktopenv.aguvis_agent")
def reset(self):
self.thoughts = []
self.action_descriptions = []
self.actions = []
self.observations = []
if __name__ == "__main__":
    # Ad-hoc smoke test: load a local screenshot and run one prediction.
    agent = AguvisAgent()
    with open("screenshot.png", "rb") as f:
        screenshot = f.read()
    agent.predict("Add a new paper to my list", {"screenshot": screenshot})
    # Kept for manual testing of the coordinate-conversion helper:
    # relative_code = """pyautogui.typewrite("Hello, world! I have a float number 0.172")
    # pyautogui.click(0, 1, n_click=1)
    # pyautogui.moveTo(0.5342, 0.5342)
    # """
    # absolute_code = _pyautogui_code_to_absolute_coordinates(relative_code, logical_screen_size=(1920, 1080))
    # print(absolute_code)

View File

@@ -1145,3 +1145,51 @@ When you think the task is done, return ```DONE```.
My computer's password is 'password', feel free to use it when you need sudo rights.
First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
"""
# System prompt for the external planner model (e.g. GPT-4o): one line of
# pyautogui code per step, plus the WAIT/FAIL/DONE special codes.
AGUVIS_PLANNER_SYS_PROMPT = """
You are an agent which follow my instruction and perform desktop computer tasks as instructed.
You have good knowledge of computer and good internet connection and assume your code will run on a computer for controlling the mouse and keyboard.
For each step, you will get an observation of an image, which is the screenshot of the computer screen and you will predict the action of the computer based on the image.
You are required to use `pyautogui` to perform the action grounded to the observation, but DONOT use the `pyautogui.locateCenterOnScreen` function to locate the element you want to operate with since we have no image of the element you want to operate with. DONOT USE `pyautogui.screenshot()` to make screenshot.
Return exactly ONE line of python code to perform the action each time. At each step, you MUST generate the corresponding instruction to the code before a # in a comment (example: # Click \"Yes, I trust the authors\" button\npyautogui.click(x=0, y=0, duration=1)\n)
You need to to specify the coordinates of by yourself based on your observation of current observation, but you should be careful to ensure that the coordinates are correct.
You ONLY need to return the code inside a code block, like this:
```python
# your code here
```
Specially, it is also allowed to return the following special code:
When you think you have to wait for some time, return ```WAIT```;
When you think the task can not be done, return ```FAIL```, don't easily say ```FAIL```, try your best to do the task;
When you think the task is done, return ```DONE```.
Here are some guidelines for you:
1. Remember to generate the corresponding instruction to the code before a # in a comment.
2. If a click action is needed, use only the following functions: pyautogui.click, pyautogui.rightClick or pyautogui.doubleClick.
3. Return ```Done``` when you think the task is done. Return ```Fail``` when you think the task can not be done.
My computer's password is 'password', feel free to use it when you need sudo rights.
First give the current screenshot and previous things we did a short reflection, then RETURN ME THE CODE OR SPECIAL CODE I ASKED FOR. NEVER EVER RETURN ME ANYTHING ELSE.
""".strip()
# System prompt for the Aguvis model itself.
AGUVIS_SYS_PROMPT = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.
"""
# User prompt when Aguvis does both planning and acting; expects
# {instruction} and a rendered {previous_actions} history.
AGUVIS_PLANNING_PROMPT = """Please generate the next move according to the UI screenshot, instruction and previous actions.
Instruction: {instruction}.
Previous actions:
{previous_actions}
"""
# Pre-filled assistant turn forcing the model to continue after "Action: ".
AGUVIS_INNER_MONOLOGUE_APPEND_PROMPT = """<|recipient|>all
Action: """
# User prompt when Aguvis only grounds a single low-level {instruction}.
AGUVIS_GROUNDING_PROMPT = """Please generate the next move according to the UI screenshot, instruction and previous actions.
Instruction: {instruction}
"""
# Pre-filled assistant turn so the model completes "pyautogui.{function_name}(...)".
AGUVIS_GROUNDING_APPEND_PROMPT = """<|recipient|>os
pyautogui.{function_name}"""

View File

@@ -59,3 +59,4 @@ azure-identity
azure-mgmt-compute
azure-mgmt-network
docker
loguru

View File

@@ -82,7 +82,7 @@ def config() -> argparse.Namespace:
)
parser.add_argument("--screen_width", type=int, default=1920)
parser.add_argument("--screen_height", type=int, default=1080)
parser.add_argument("--sleep_after_execution", type=float, default=0.0)
parser.add_argument("--sleep_after_execution", type=float, default=2.0)
parser.add_argument("--max_steps", type=int, default=15)
# agent config
@@ -91,8 +91,9 @@ def config() -> argparse.Namespace:
)
# lm config
parser.add_argument("--planner_model", type=str, default="gpt-4o")
parser.add_argument("--executor_model", type=str, default="/mnt/chuzhe.hby/hf_ckpts/qwen-aguvis-7b")
parser.add_argument("--planner_model", type=str, default=None)
parser.add_argument("--executor_model", type=str, default="aguvis-72b-415")
parser.add_argument("--temperature", type=float, default=0)
parser.add_argument("--top_p", type=float, default=0.9)
parser.add_argument("--max_tokens", type=int, default=1500)