* feat: add claude support * feat: add script for end-to-end evaluation with logging and task distribution * feat&fix: add tool result handling and update model default in evaluation script * chore: remove run_test_env.py script * feat&fix: implement action parsing for tool calls and update default action space * fix: update text formatting in action parsing and replace logger import * feat&fix: implement action parsing for tool calls and add screen size handling * feat: add setup instructions for Anthropic API integration * feat: add notice about image size limitations for Anthropic API * Delete test_env/logger.py * Delete test_env/utils.py
260 lines
9.3 KiB
Python
260 lines
9.3 KiB
Python
import asyncio
|
|
import base64
|
|
import os
|
|
import shlex
|
|
import shutil
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Literal, TypedDict, Optional, Tuple
|
|
from uuid import uuid4
|
|
|
|
from anthropic.types.beta import BetaToolComputerUse20241022Param
|
|
|
|
from .base import BaseAnthropicTool, ToolError, ToolResult
|
|
from .run import run
|
|
|
|
OUTPUT_DIR = "/tmp/outputs"
|
|
|
|
TYPING_DELAY_MS = 12
|
|
TYPING_GROUP_SIZE = 50
|
|
|
|
Action = Literal[
|
|
"key",
|
|
"type",
|
|
"mouse_move",
|
|
"left_click",
|
|
"left_click_drag",
|
|
"right_click",
|
|
"middle_click",
|
|
"double_click",
|
|
"screenshot",
|
|
"cursor_position",
|
|
]
|
|
|
|
|
|
class Resolution(TypedDict):
|
|
width: int
|
|
height: int
|
|
|
|
|
|
# sizes above XGA/WXGA are not recommended (see README.md)
|
|
# scale down to one of these targets if ComputerTool._scaling_enabled is set
|
|
MAX_SCALING_TARGETS: dict[str, Resolution] = {
|
|
"XGA": Resolution(width=1024, height=768), # 4:3
|
|
"WXGA": Resolution(width=1280, height=800), # 16:10
|
|
"FWXGA": Resolution(width=1366, height=768), # ~16:9
|
|
}
|
|
|
|
|
|
class ScalingSource(Enum):
|
|
COMPUTER = "computer"
|
|
API = "api"
|
|
|
|
|
|
class ComputerToolOptions(TypedDict):
|
|
display_height_px: int
|
|
display_width_px: int
|
|
display_number: Optional[int]
|
|
|
|
|
|
def chunks(s: str, chunk_size: int) -> list[str]:
|
|
return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)]
|
|
|
|
|
|
class ComputerTool(BaseAnthropicTool):
|
|
"""
|
|
A tool that allows the agent to interact with the screen, keyboard, and mouse of the current computer.
|
|
The tool parameters are defined by Anthropic and are not editable.
|
|
"""
|
|
|
|
name: Literal["computer"] = "computer"
|
|
api_type: Literal["computer_20241022"] = "computer_20241022"
|
|
width: int
|
|
height: int
|
|
display_num: Optional[int]
|
|
|
|
_screenshot_delay = 2.0
|
|
_scaling_enabled = True
|
|
|
|
@property
|
|
def options(self) -> ComputerToolOptions:
|
|
width, height = self.scale_coordinates(
|
|
ScalingSource.COMPUTER, self.width, self.height
|
|
)
|
|
return {
|
|
"display_width_px": width,
|
|
"display_height_px": height,
|
|
"display_number": self.display_num,
|
|
}
|
|
|
|
def to_params(self) -> BetaToolComputerUse20241022Param:
|
|
return {"name": self.name, "type": self.api_type, **self.options}
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
self.width = int(os.getenv("WIDTH") or 0)
|
|
self.height = int(os.getenv("HEIGHT") or 0)
|
|
assert self.width and self.height, "WIDTH, HEIGHT must be set"
|
|
if (display_num := os.getenv("DISPLAY_NUM")) is not None:
|
|
self.display_num = int(display_num)
|
|
self._display_prefix = f"DISPLAY=:{self.display_num} "
|
|
else:
|
|
self.display_num = None
|
|
self._display_prefix = ""
|
|
|
|
self.xdotool = f"{self._display_prefix}xdotool"
|
|
|
|
async def __call__(
|
|
self,
|
|
*,
|
|
action: Action,
|
|
text: Optional[str] = None,
|
|
coordinate: Optional[Tuple[int, int]] = None,
|
|
**kwargs,
|
|
):
|
|
if action in ("mouse_move", "left_click_drag"):
|
|
if coordinate is None:
|
|
raise ToolError(f"coordinate is required for {action}")
|
|
if text is not None:
|
|
raise ToolError(f"text is not accepted for {action}")
|
|
if not isinstance(coordinate, list) or len(coordinate) != 2:
|
|
raise ToolError(f"{coordinate} must be a tuple of length 2")
|
|
if not all(isinstance(i, int) and i >= 0 for i in coordinate):
|
|
raise ToolError(f"{coordinate} must be a tuple of non-negative ints")
|
|
|
|
x, y = self.scale_coordinates(
|
|
ScalingSource.API, coordinate[0], coordinate[1]
|
|
)
|
|
|
|
if action == "mouse_move":
|
|
return await self.shell(f"{self.xdotool} mousemove --sync {x} {y}")
|
|
elif action == "left_click_drag":
|
|
return await self.shell(
|
|
f"{self.xdotool} mousedown 1 mousemove --sync {x} {y} mouseup 1"
|
|
)
|
|
|
|
if action in ("key", "type"):
|
|
if text is None:
|
|
raise ToolError(f"text is required for {action}")
|
|
if coordinate is not None:
|
|
raise ToolError(f"coordinate is not accepted for {action}")
|
|
if not isinstance(text, str):
|
|
raise ToolError(output=f"{text} must be a string")
|
|
|
|
if action == "key":
|
|
return await self.shell(f"{self.xdotool} key -- {text}")
|
|
elif action == "type":
|
|
results: list[ToolResult] = []
|
|
for chunk in chunks(text, TYPING_GROUP_SIZE):
|
|
cmd = f"{self.xdotool} type --delay {TYPING_DELAY_MS} -- {shlex.quote(chunk)}"
|
|
results.append(await self.shell(cmd, take_screenshot=False))
|
|
screenshot_base64 = (await self.screenshot()).base64_image
|
|
return ToolResult(
|
|
output="".join(result.output or "" for result in results),
|
|
error="".join(result.error or "" for result in results),
|
|
base64_image=screenshot_base64,
|
|
)
|
|
|
|
if action in (
|
|
"left_click",
|
|
"right_click",
|
|
"double_click",
|
|
"middle_click",
|
|
"screenshot",
|
|
"cursor_position",
|
|
):
|
|
if text is not None:
|
|
raise ToolError(f"text is not accepted for {action}")
|
|
if coordinate is not None:
|
|
raise ToolError(f"coordinate is not accepted for {action}")
|
|
|
|
if action == "screenshot":
|
|
return await self.screenshot()
|
|
elif action == "cursor_position":
|
|
result = await self.shell(
|
|
f"{self.xdotool} getmouselocation --shell",
|
|
take_screenshot=False,
|
|
)
|
|
output = result.output or ""
|
|
x, y = self.scale_coordinates(
|
|
ScalingSource.COMPUTER,
|
|
int(output.split("X=")[1].split("\n")[0]),
|
|
int(output.split("Y=")[1].split("\n")[0]),
|
|
)
|
|
return result.replace(output=f"X={x},Y={y}")
|
|
else:
|
|
click_arg = {
|
|
"left_click": "1",
|
|
"right_click": "3",
|
|
"middle_click": "2",
|
|
"double_click": "--repeat 2 --delay 500 1",
|
|
}[action]
|
|
return await self.shell(f"{self.xdotool} click {click_arg}")
|
|
|
|
raise ToolError(f"Invalid action: {action}")
|
|
|
|
async def screenshot(self):
|
|
"""Take a screenshot of the current screen and return the base64 encoded image."""
|
|
output_dir = Path(OUTPUT_DIR)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
path = output_dir / f"screenshot_{uuid4().hex}.png"
|
|
|
|
# Try gnome-screenshot first
|
|
if shutil.which("gnome-screenshot"):
|
|
screenshot_cmd = f"{self._display_prefix}gnome-screenshot -f {path} -p"
|
|
else:
|
|
# Fall back to scrot if gnome-screenshot isn't available
|
|
screenshot_cmd = f"{self._display_prefix}scrot -p {path}"
|
|
|
|
result = await self.shell(screenshot_cmd, take_screenshot=False)
|
|
if self._scaling_enabled:
|
|
x, y = self.scale_coordinates(
|
|
ScalingSource.COMPUTER, self.width, self.height
|
|
)
|
|
await self.shell(
|
|
f"convert {path} -resize {x}x{y}! {path}", take_screenshot=False
|
|
)
|
|
|
|
if path.exists():
|
|
return result.replace(
|
|
base64_image=base64.b64encode(path.read_bytes()).decode()
|
|
)
|
|
raise ToolError(f"Failed to take screenshot: {result.error}")
|
|
|
|
async def shell(self, command: str, take_screenshot=True) -> ToolResult:
|
|
"""Run a shell command and return the output, error, and optionally a screenshot."""
|
|
_, stdout, stderr = await run(command)
|
|
base64_image = None
|
|
|
|
if take_screenshot:
|
|
# delay to let things settle before taking a screenshot
|
|
await asyncio.sleep(self._screenshot_delay)
|
|
base64_image = (await self.screenshot()).base64_image
|
|
|
|
return ToolResult(output=stdout, error=stderr, base64_image=base64_image)
|
|
|
|
def scale_coordinates(self, source: ScalingSource, x: int, y: int):
|
|
"""Scale coordinates to a target maximum resolution."""
|
|
if not self._scaling_enabled:
|
|
return x, y
|
|
ratio = self.width / self.height
|
|
target_dimension = None
|
|
for dimension in MAX_SCALING_TARGETS.values():
|
|
# allow some error in the aspect ratio - not ratios are exactly 16:9
|
|
if abs(dimension["width"] / dimension["height"] - ratio) < 0.02:
|
|
if dimension["width"] < self.width:
|
|
target_dimension = dimension
|
|
break
|
|
if target_dimension is None:
|
|
return x, y
|
|
# should be less than 1
|
|
x_scaling_factor = target_dimension["width"] / self.width
|
|
y_scaling_factor = target_dimension["height"] / self.height
|
|
if source == ScalingSource.API:
|
|
if x > self.width or y > self.height:
|
|
raise ToolError(f"Coordinates {x}, {y} are out of bounds")
|
|
# scale up
|
|
return round(x / x_scaling_factor), round(y / y_scaling_factor)
|
|
# scale down
|
|
return round(x * x_scaling_factor), round(y * y_scaling_factor) |