From 903ed3671547bc735d23e27154bac1b9c8e436d0 Mon Sep 17 00:00:00 2001 From: Qichen Fu Date: Thu, 13 Nov 2025 21:54:32 -0800 Subject: [PATCH] Add Claude Sonnet 4.5 support and improve action handling (#362) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit šŸ¤– Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude --- desktop_env/controllers/python.py | 7 +- desktop_env/desktop_env.py | 23 ++- desktop_env/providers/aws/manager.py | 6 +- lib_results_logger.py | 135 ++++++++++++ lib_run_single.py | 7 +- mm_agents/anthropic/main.py | 298 ++++++++++++++++++--------- mm_agents/anthropic/utils.py | 143 ++++++++++++- run_multienv_claude.py | 88 +++++++- 8 files changed, 578 insertions(+), 129 deletions(-) create mode 100644 lib_results_logger.py diff --git a/desktop_env/controllers/python.py b/desktop_env/controllers/python.py index d4391c8..0071173 100644 --- a/desktop_env/controllers/python.py +++ b/desktop_env/controllers/python.py @@ -238,12 +238,17 @@ class PythonController: "returncode": -1 } - def execute_action(self, action: Dict[str, Any]): + def execute_action(self, action): """ Executes an action on the server computer. 
""" + # Handle string actions if action in ['WAIT', 'FAIL', 'DONE']: return + + # Handle dictionary actions + if type(action) == dict and action.get('action_type') in ['WAIT', 'FAIL', 'DONE']: + return action_type = action["action_type"] parameters = action["parameters"] if "parameters" in action else {param: action[param] for param in action if param != 'action_type'} diff --git a/desktop_env/desktop_env.py b/desktop_env/desktop_env.py index 4f5836d..543c17d 100644 --- a/desktop_env/desktop_env.py +++ b/desktop_env/desktop_env.py @@ -391,12 +391,12 @@ class DesktopEnv(gym.Env): logger.info(f"Step {self._step_no} in trajectory {self._traj_no} with action: {action}") # handle the special actions if action in ['WAIT', 'FAIL', 'DONE'] or (type(action) == dict and action['action_type'] in ['WAIT', 'FAIL', 'DONE']): - if action == 'WAIT': + if action == 'WAIT' or (type(action) == dict and action.get('action_type') == 'WAIT'): time.sleep(pause) - elif action == 'FAIL': + elif action == 'FAIL' or (type(action) == dict and action.get('action_type') == 'FAIL'): done = True info = {"fail": True} - elif action == 'DONE': + elif action == 'DONE' or (type(action) == dict and action.get('action_type') == 'DONE'): done = True info = {"done": True} @@ -404,7 +404,7 @@ class DesktopEnv(gym.Env): # the set of all possible actions defined in the action representation self.controller.execute_action(action) elif self.action_space == "pyautogui" or self.action_space == "claude_computer_use": - if action in ['WAIT', 'FAIL', 'DONE']: + if action in ['WAIT', 'FAIL', 'DONE'] or (type(action) == dict and action.get('action_type') in ['WAIT', 'FAIL', 'DONE']): self.controller.execute_action(action) else: # the set of all possible python commands insides `pyautogui` @@ -434,13 +434,16 @@ class DesktopEnv(gym.Env): self.is_environment_used = True if self.evaluator['func'] == "infeasible": - if len(self.action_history) > 0 and self.action_history[-1] == "FAIL": - return 1 - else: - return 0 + if 
len(self.action_history) > 0: + last_action = self.action_history[-1] + if last_action == "FAIL" or (type(last_action) == dict and last_action.get('action_type') == 'FAIL'): + return 1 + return 0 else: - if len(self.action_history) > 0 and self.action_history[-1] == "FAIL": - return 0 + if len(self.action_history) > 0: + last_action = self.action_history[-1] + if last_action == "FAIL" or (type(last_action) == dict and last_action.get('action_type') == 'FAIL'): + return 0 if type(self.metric) == list: # Multiple metrics to evaluate whether the task is successfully completed diff --git a/desktop_env/providers/aws/manager.py b/desktop_env/providers/aws/manager.py index 57b33f6..5e10d3c 100644 --- a/desktop_env/providers/aws/manager.py +++ b/desktop_env/providers/aws/manager.py @@ -10,7 +10,7 @@ from desktop_env.providers.aws.config import ENABLE_TTL, DEFAULT_TTL_MINUTES, AW from desktop_env.providers.aws.scheduler_utils import schedule_instance_termination -INSTANCE_TYPE = "t3.medium" +INSTANCE_TYPE = "t3.xlarge" # Load environment variables from .env file dotenv.load_dotenv() @@ -40,9 +40,9 @@ DEFAULT_REGION = "us-east-1" # todo: public the AMI images IMAGE_ID_MAP = { "us-east-1": { - # (1920, 1080): "ami-0d23263edb96951d8" + (1920, 1080): "ami-0d23263edb96951d8", # For CoACT-1, uncomment to use the following AMI - (1920, 1080): "ami-0b505e9d0d99ba88c" + # (1920, 1080): "ami-0b505e9d0d99ba88c" }, "ap-east-1": { (1920, 1080): "ami-06850864d18fad836" diff --git a/lib_results_logger.py b/lib_results_logger.py new file mode 100644 index 0000000..a33df50 --- /dev/null +++ b/lib_results_logger.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +""" +Thread-safe results logging for OSWorld evaluations. +Appends task completion results to results.json in real-time. 
+""" + +import json +import os +import time +import fcntl +from pathlib import Path +from typing import Dict, Any, Optional + + +def extract_domain_from_path(result_path: str) -> str: + """ + Extract domain/application from result directory path. + Expected structure: results/{action_space}/{observation_type}/{model}/{domain}/{task_id}/ + """ + path_parts = Path(result_path).parts + if len(path_parts) >= 2: + return path_parts[-2] # Second to last part should be domain + return "unknown" + + +def append_task_result( + task_id: str, + domain: str, + score: float, + result_dir: str, + args: Any, + error_message: Optional[str] = None +) -> None: + """ + Thread-safely append a task result to results.json. + + Args: + task_id: UUID of the task + domain: Application domain (chrome, vlc, etc.) + score: Task score (0.0 or 1.0) + result_dir: Full path to the task result directory + args: Command line arguments object + error_message: Error message if task failed + """ + # Create result entry + result_entry = { + "application": domain, + "task_id": task_id, + "status": "error" if error_message else "success", + "score": score, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") + } + + if error_message: + result_entry["err_message"] = error_message + + # Determine summary directory and results file path + # Extract base result directory from args + base_result_dir = Path(args.result_dir) + summary_dir = base_result_dir / "summary" + results_file = summary_dir / "results.json" + + # Ensure summary directory exists + summary_dir.mkdir(parents=True, exist_ok=True) + + # Thread-safe JSON append with file locking + try: + with open(results_file, 'a+') as f: + # Lock the file for exclusive access + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + + try: + # Move to beginning to read existing content + f.seek(0) + content = f.read().strip() + + # Parse existing JSON array or create new one + if content: + try: + existing_results = json.loads(content) + if not isinstance(existing_results, 
list): + existing_results = [] + except json.JSONDecodeError: + existing_results = [] + else: + existing_results = [] + + # Add new result + existing_results.append(result_entry) + + # Write back the complete JSON array + f.seek(0) + f.truncate() + json.dump(existing_results, f, indent=2) + f.write('\n') # Add newline for readability + + finally: + # Always unlock the file + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + + print(f"šŸ“ Logged result: {domain}/{task_id} -> {result_entry['status']} (score: {score})") + + except Exception as e: + # Don't let logging errors break the main evaluation + print(f"āš ļø Failed to log result for {task_id}: {e}") + + +def log_task_completion(example: Dict, result: float, result_dir: str, args: Any) -> None: + """ + Convenience wrapper for logging successful task completion. + + Args: + example: Task configuration dictionary + result: Task score + result_dir: Path to task result directory + args: Command line arguments + """ + task_id = example.get('id', 'unknown') + domain = extract_domain_from_path(result_dir) + append_task_result(task_id, domain, result, result_dir, args) + + +def log_task_error(example: Dict, error_msg: str, result_dir: str, args: Any) -> None: + """ + Convenience wrapper for logging task errors. 
+ + Args: + example: Task configuration dictionary + error_msg: Error message + result_dir: Path to task result directory + args: Command line arguments + """ + task_id = example.get('id', 'unknown') + domain = extract_domain_from_path(result_dir) + append_task_result(task_id, domain, 0.0, result_dir, args, error_msg) \ No newline at end of file diff --git a/lib_run_single.py b/lib_run_single.py index c9d1fb3..e0e342e 100644 --- a/lib_run_single.py +++ b/lib_run_single.py @@ -4,6 +4,7 @@ import logging import os import time from wrapt_timeout_decorator import * +from lib_results_logger import log_task_completion logger = logging.getLogger("desktopenv.experiment") @@ -32,7 +33,7 @@ def run_single_example(agent, env, example, max_steps, instruction, args, exampl ) for action in actions: # Capture the timestamp before executing the action - action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S") + action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S%f") logger.info("Step %d: %s", step_idx + 1, action) obs, reward, done, info = env.step(action, args.sleep_after_execution) @@ -64,6 +65,10 @@ def run_single_example(agent, env, example, max_steps, instruction, args, exampl scores.append(result) with open(os.path.join(example_result_dir, "result.txt"), "w", encoding="utf-8") as f: f.write(f"{result}\n") + + # Log task completion to results.json + log_task_completion(example, result, example_result_dir, args) + env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4")) diff --git a/mm_agents/anthropic/main.py b/mm_agents/anthropic/main.py index 4ff838c..eb4f5c4 100644 --- a/mm_agents/anthropic/main.py +++ b/mm_agents/anthropic/main.py @@ -17,7 +17,7 @@ from anthropic.types.beta import ( BetaMessageParam, BetaTextBlockParam, ) -from .utils import COMPUTER_USE_BETA_FLAG, PROMPT_CACHING_BETA_FLAG,SYSTEM_PROMPT, SYSTEM_PROMPT_WINDOWS, APIProvider, PROVIDER_TO_DEFAULT_MODEL_NAME +from .utils import COMPUTER_USE_BETA_FLAG, 
PROMPT_CACHING_BETA_FLAG,SYSTEM_PROMPT, SYSTEM_PROMPT_WINDOWS, APIProvider, PROVIDER_TO_DEFAULT_MODEL_NAME, get_model_name from .utils import _response_to_params, _inject_prompt_caching, _maybe_filter_to_n_most_recent_images import logging @@ -30,14 +30,18 @@ API_RETRY_INTERVAL = 5 class AnthropicAgent: def __init__(self, platform: str = "Ubuntu", - model: str = "claude-3-5-sonnet-20241022", - provider: APIProvider = APIProvider.BEDROCK, + model: str = "claude-sonnet-4-5-20250929", + provider: APIProvider = APIProvider.ANTHROPIC, max_tokens: int = 4096, api_key: str = os.environ.get("ANTHROPIC_API_KEY", None), system_prompt_suffix: str = "", only_n_most_recent_images: Optional[int] = 10, action_space: str = "claude_computer_use", screen_size: tuple[int, int] = (1920, 1080), + no_thinking: bool = False, + use_isp: bool = False, + temperature: Optional[float] = None, + top_p: Optional[float] = None, *args, **kwargs ): self.platform = platform @@ -52,10 +56,24 @@ class AnthropicAgent: self.only_n_most_recent_images = only_n_most_recent_images self.messages: list[BetaMessageParam] = [] self.screen_size = screen_size + self.no_thinking = no_thinking + self.use_isp = use_isp + self.temperature = temperature + self.top_p = top_p + self.resize_factor = ( screen_size[0] / 1280, # Assuming 1280 is the base width screen_size[1] / 720 # Assuming 720 is the base height ) + + def _get_sampling_params(self): + """Get sampling parameters (temperature and/or top_p) - let API validate exclusivity""" + params = {} + if self.temperature is not None: + params['temperature'] = self.temperature + if self.top_p is not None: + params['top_p'] = self.top_p + return params def add_tool_result(self, tool_call_id: str, result: str, screenshot: bytes = None): """Add tool result to message history""" @@ -84,6 +102,21 @@ class AnthropicAgent: "content": tool_result_content }) + def _extract_raw_response_string(self, response) -> str: + """Extract and concatenate raw response content into a single 
string.""" + raw_response_str = "" + if response.content: + for block in response.content: + if hasattr(block, 'text') and block.text: + raw_response_str += f"[TEXT] {block.text}\n" + elif hasattr(block, 'thinking') and block.thinking: + raw_response_str += f"[THINKING] {block.thinking}\n" + elif hasattr(block, 'name') and hasattr(block, 'input'): + raw_response_str += f"[TOOL_USE] {block.name}: {block.input}\n" + else: + raw_response_str += f"[OTHER] {str(block)}\n" + return raw_response_str.strip() + def parse_actions_from_tool_call(self, tool_call: Dict) -> str: result = "" function_args = ( @@ -194,13 +227,23 @@ class AnthropicAgent: result += (f"pyautogui.keyUp('{key}')\n") expected_outcome = f"Key {key} pressed." elif action == "type": - result += ( - f"pyautogui.typewrite(\"\"\"{text}\"\"\", interval=0.01)\n" - ) + for char in text: + if char == '\n': + result += "pyautogui.press('enter')\n" + elif char == "'": + result += 'pyautogui.press("\'")\n' + elif char == '\\': + result += "pyautogui.press('\\\\')\n" + elif char == '"': + result += "pyautogui.press('\"')\n" + else: + result += f"pyautogui.press('{char}')\n" expected_outcome = f"Text {text} written." 
# Handle scroll actions elif action == "scroll": + if text is not None: + result += (f"pyautogui.keyDown('{text.lower()}')\n") if coordinate is None: if scroll_direction in ("up", "down"): result += ( @@ -221,6 +264,8 @@ class AnthropicAgent: result += ( f"pyautogui.hscroll({scroll_amount if scroll_direction == 'right' else -scroll_amount}, {x}, {y})\n" ) + if text is not None: + result += (f"pyautogui.keyUp('{text.lower()}')\n") expected_outcome = "Scroll action finished" # Handle click actions @@ -285,7 +330,7 @@ class AnthropicAgent: expected_outcome = "Call user" elif action == "screenshot": result += "pyautogui.sleep(0.1)\n" - expected_outcome = "Screenshot taken" + expected_outcome = "Screenshot taken" else: raise ValueError(f"Invalid action: {action}") @@ -303,6 +348,9 @@ class AnthropicAgent: screenshot_bytes = obs["screenshot"] screenshot_image = Image.open(io.BytesIO(screenshot_bytes)) + # Store original unresized screenshot for zoom processing + obs["screenshot_original"] = screenshot_bytes + # Calculate new size based on resize factor new_width, new_height = 1280, 720 @@ -334,23 +382,45 @@ class AnthropicAgent: ] }) - if self.messages and "tool_use" in [content_block["type"] for content_block in self.messages[-1]["content"]]: - self.add_tool_result( - self.messages[-1]["content"][-1]["id"], - f"Success", - screenshot=obs.get("screenshot") if obs else None - ) + # Add tool_result for ALL tool_use blocks in the last message + if self.messages: + last_message_content = self.messages[-1]["content"] + tool_use_blocks = [block for block in last_message_content if block.get("type") == "tool_use"] + + for i, tool_block in enumerate(tool_use_blocks): + tool_input = tool_block.get("input", {}) + action = tool_input.get("action") + is_last_tool = i == len(tool_use_blocks) - 1 + + include_screenshot = None + + if obs: + if action == "screenshot": + # Screenshot action always gets regular screenshot + include_screenshot = obs.get("screenshot") + elif is_last_tool: + 
# Auto-screenshot: last tool gets regular screenshot (unless it's zoom, handled above) + include_screenshot = obs.get("screenshot") + + self.add_tool_result( + tool_block["id"], + f"Success", + screenshot=include_screenshot + ) enable_prompt_caching = False - betas = ["computer-use-2025-01-24"] - if self.model_name == "claude-3-7-sonnet-20250219" or self.model_name == "claude-4-opus-20250514" or self.model_name == "claude-4-sonnet-20250514": - betas = ["computer-use-2025-01-24"] - elif self.model_name == "claude-3-5-sonnet-20241022": - betas = [COMPUTER_USE_BETA_FLAG] + betas = [COMPUTER_USE_BETA_FLAG] + + # Add interleaved thinking beta if ISP is requested + if self.use_isp: + betas.append("interleaved-thinking-2025-05-14") + logger.info(f"Added interleaved thinking beta. Betas: {betas}") image_truncation_threshold = 10 if self.provider == APIProvider.ANTHROPIC: - client = Anthropic(api_key=self.api_key, max_retries=4) + client = Anthropic(api_key=self.api_key, max_retries=4).with_options( + default_headers={"anthropic-beta": COMPUTER_USE_BETA_FLAG} + ) enable_prompt_caching = True elif self.provider == APIProvider.VERTEX: client = AnthropicVertex() @@ -368,7 +438,7 @@ class AnthropicAgent: if enable_prompt_caching: betas.append(PROMPT_CACHING_BETA_FLAG) _inject_prompt_caching(self.messages) - image_truncation_threshold = 50 + image_truncation_threshold = 20 system["cache_control"] = {"type": "ephemeral"} if self.only_n_most_recent_images: @@ -378,49 +448,65 @@ class AnthropicAgent: min_removal_threshold=image_truncation_threshold, ) - try: - if self.model_name == "claude-3-5-sonnet-20241022": - tools = [ - {'name': 'computer', 'type': 'computer_20241022', 'display_width_px': 1280, 'display_height_px': 720, 'display_number': 1}, - # {'type': 'bash_20241022', 'name': 'bash'}, - # {'name': 'str_replace_editor', 'type': 'text_editor_20241022'} - ] if self.platform == 'Ubuntu' else [ - {'name': 'computer', 'type': 'computer_20241022', 'display_width_px': 1280, 
'display_height_px': 720, 'display_number': 1}, - ] - elif self.model_name in ["claude-3-7-sonnet-20250219", "claude-4-opus-20250514", "claude-4-sonnet-20250514"]: - tools = [ - {'name': 'computer', 'type': 'computer_20250124', 'display_width_px': 1280, 'display_height_px': 720, 'display_number': 1}, - # {'type': 'bash_20250124', 'name': 'bash'}, - # {'name': 'str_replace_editor', 'type': 'text_editor_20250124'} - ] if self.platform == 'Ubuntu' else [ - {'name': 'computer', 'type': 'computer_20250124', 'display_width_px': 1280, 'display_height_px': 720, 'display_number': 1}, - ] + # Configure tool settings - use modern computer tool for all models + tool_config = { + 'name': 'computer', + 'type': 'computer_20250124', + 'display_width_px': 1280, + 'display_height_px': 720, + 'display_number': 1 + } + + tools = [ + tool_config, + ] if self.platform == 'Ubuntu' else [ + tool_config, + ] + + # Configure thinking mode based on user preferences + if self.no_thinking: + # Disable thinking mode - omit the thinking parameter + extra_body = {} + actual_max_tokens = self.max_tokens # Use default when no thinking + logger.info("Thinking mode: DISABLED") + else: + # Enable thinking mode (regular or interleaved) + # Use consistent 2048 budget for both regular and ISP thinking + budget_tokens = 2048 + + # For regular thinking: max_tokens > budget_tokens (API requirement) + # For ISP: budget_tokens can exceed max_tokens (represents total across all thinking blocks) + if self.max_tokens <= budget_tokens: + required_max_tokens = budget_tokens + 500 # Give some headroom + logger.warning(f"Regular thinking requires max_tokens > budget_tokens. 
Increasing max_tokens from {self.max_tokens} to {required_max_tokens}") + actual_max_tokens = required_max_tokens + else: + actual_max_tokens = self.max_tokens + extra_body = { - "thinking": {"type": "enabled", "budget_tokens": 1024} + "thinking": {"type": "enabled", "budget_tokens": budget_tokens} } + if self.use_isp: + logger.info("Thinking mode: INTERLEAVED SCRATCHPAD (ISP)") + else: + logger.info("Thinking mode: REGULAR SCRATCHPAD") + + try: response = None for attempt in range(API_RETRY_TIMES): try: - if self.model_name in ["claude-3-7-sonnet-20250219", "claude-4-opus-20250514", "claude-4-sonnet-20250514"]: - response = client.beta.messages.create( - max_tokens=self.max_tokens, - messages=self.messages, - model=PROVIDER_TO_DEFAULT_MODEL_NAME[self.provider, self.model_name], - system=[system], - tools=tools, - betas=betas, - extra_body=extra_body - ) - elif self.model_name == "claude-3-5-sonnet-20241022": - response = client.beta.messages.create( - max_tokens=self.max_tokens, - messages=self.messages, - model=PROVIDER_TO_DEFAULT_MODEL_NAME[self.provider, self.model_name], - system=[system], - tools=tools, - betas=betas, - ) + response = client.beta.messages.create( + max_tokens=actual_max_tokens, + messages=self.messages, + model=get_model_name(self.provider, self.model_name), + system=[system], + tools=tools, + betas=betas, + extra_body=extra_body, + **self._get_sampling_params() + ) + logger.info(f"Response: {response}") break except (APIError, APIStatusError, APIResponseValidationError) as e: @@ -450,26 +536,20 @@ class AnthropicAgent: try: logger.warning("Retrying with backup API key...") - backup_client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY_BACKUP"), max_retries=4) - if self.model_name in ["claude-3-7-sonnet-20250219", "claude-4-opus-20250514", "claude-4-sonnet-20250514"]: - response = backup_client.beta.messages.create( - max_tokens=self.max_tokens, - messages=self.messages, - model=PROVIDER_TO_DEFAULT_MODEL_NAME[APIProvider.ANTHROPIC, 
self.model_name], - system=[system], - tools=tools, - betas=betas, - extra_body=extra_body - ) - elif self.model_name == "claude-3-5-sonnet-20241022": - response = backup_client.beta.messages.create( - max_tokens=self.max_tokens, - messages=self.messages, - model=PROVIDER_TO_DEFAULT_MODEL_NAME[APIProvider.ANTHROPIC, self.model_name], - system=[system], - tools=tools, - betas=betas, - ) + backup_client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY_BACKUP"), max_retries=4).with_options( + default_headers={"anthropic-beta": COMPUTER_USE_BETA_FLAG} + ) + response = backup_client.beta.messages.create( + max_tokens=actual_max_tokens, + messages=self.messages, + model=get_model_name(self.provider, self.model_name), + system=[system], + tools=tools, + betas=betas, + extra_body=extra_body, + **self._get_sampling_params() + ) + logger.info("Successfully used backup API key") except Exception as backup_e: backup_error_msg = str(backup_e) @@ -497,9 +577,16 @@ class AnthropicAgent: logger.exception(f"Error in Anthropic API: {str(e)}") return None, None + if response is None: + logger.error("Response is None after API call - this should not happen") + return None, None + response_params = _response_to_params(response) logger.info(f"Received response params: {response_params}") + # Convert raw response to concatenated string for trajectory logging + raw_response_str = self._extract_raw_response_string(response) + # Store response in message history self.messages.append({ "role": "assistant", @@ -518,7 +605,8 @@ class AnthropicAgent: "input": cast(dict[str, Any], content_block["input"]), "id": content_block["id"], "action_type": content_block.get("type"), - "command": self.parse_actions_from_tool_call(content_block) + "command": self.parse_actions_from_tool_call(content_block), + "raw_response": raw_response_str # Add raw response to each action }) elif content_block["type"] == "text": reasonings.append(content_block["text"]) @@ -526,10 +614,23 @@ class AnthropicAgent: 
reasonings = reasonings[0] else: reasonings = "" + + # Check if the model indicated the task is infeasible + if raw_response_str and "[INFEASIBLE]" in raw_response_str: + logger.info("Detected [INFEASIBLE] pattern in response, triggering FAIL action") + # Override actions with FAIL + actions = [{ + "action_type": "FAIL", + "raw_response": raw_response_str + }] + logger.info(f"Received actions: {actions}") logger.info(f"Received reasonings: {reasonings}") if len(actions) == 0: - actions = ["DONE"] + actions = [{ + "action_type": "DONE", + "raw_response": raw_response_str + }] return reasonings, actions except Exception as e: logger.warning(f"parse_actions_from_tool_call parsing failed (attempt {parse_retry+1}/3), will retry API request: {e}") @@ -539,25 +640,17 @@ class AnthropicAgent: response = None for attempt in range(API_RETRY_TIMES): try: - if self.model_name in ["claude-3-7-sonnet-20250219", "claude-4-opus-20250514", "claude-4-sonnet-20250514"]: - response = client.beta.messages.create( - max_tokens=self.max_tokens, - messages=self.messages, - model=PROVIDER_TO_DEFAULT_MODEL_NAME[self.provider, self.model_name], - system=[system], - tools=tools, - betas=betas, - extra_body=extra_body - ) - elif self.model_name == "claude-3-5-sonnet-20241022": - response = client.beta.messages.create( - max_tokens=self.max_tokens, - messages=self.messages, - model=PROVIDER_TO_DEFAULT_MODEL_NAME[self.provider, self.model_name], - system=[system], - tools=tools, - betas=betas, - ) + response = client.beta.messages.create( + max_tokens=actual_max_tokens, + messages=self.messages, + model=get_model_name(self.provider, self.model_name), + system=[system], + tools=tools, + betas=betas, + extra_body=extra_body, + **self._get_sampling_params() + ) + logger.info(f"Response: {response}") break # Success, exit retry loop except (APIError, APIStatusError, APIResponseValidationError) as e2: @@ -569,13 +662,20 @@ class AnthropicAgent: raise response_params = _response_to_params(response) 
logger.info(f"Received response params: {response_params}") + + # Update raw response string for retry case (will be used in next loop iteration) + raw_response_str = self._extract_raw_response_string(response) + self.messages.append({ "role": "assistant", "content": response_params }) if parse_retry == max_parse_retry - 1: logger.error(f"parse_actions_from_tool_call parsing failed 3 times consecutively, terminating: {e}") - actions = ["FAIL"] + actions = [{ + "action_type": "FAIL", + "raw_response": f"Failed to parse actions from tool call after {max_parse_retry} attempts: {e}" + }] return reasonings, actions def reset(self, _logger = None, *args, **kwargs): """ diff --git a/mm_agents/anthropic/utils.py b/mm_agents/anthropic/utils.py index 195a82c..8597942 100644 --- a/mm_agents/anthropic/utils.py +++ b/mm_agents/anthropic/utils.py @@ -27,7 +27,7 @@ from datetime import datetime from .tools import ToolResult -COMPUTER_USE_BETA_FLAG = "computer-use-2024-10-22" +COMPUTER_USE_BETA_FLAG = "computer-use-2025-01-24" PROMPT_CACHING_BETA_FLAG = "prompt-caching-2024-07-31" @@ -47,12 +47,25 @@ PROVIDER_TO_DEFAULT_MODEL_NAME: dict[(APIProvider, str), str] = { (APIProvider.ANTHROPIC, "claude-4-opus-20250514"): "claude-4-opus-20250514", (APIProvider.BEDROCK, "claude-4-opus-20250514"): "us.anthropic.claude-opus-4-20250514-v1:0", (APIProvider.VERTEX, "claude-4-opus-20250514"): "claude-4-opus-v1@20250514", + # Add mapping for the alternative model name format + (APIProvider.ANTHROPIC, "claude-opus-4-20250514"): "claude-opus-4-20250514", + (APIProvider.ANTHROPIC, "claude-opus-4-1-20250805"): "claude-opus-4-1-20250805", (APIProvider.ANTHROPIC, "claude-4-sonnet-20250514"): "claude-4-sonnet-20250514", + (APIProvider.ANTHROPIC, "claude-sonnet-4-20250514"): "claude-sonnet-4-20250514", (APIProvider.BEDROCK, "claude-4-sonnet-20250514"): "us.anthropic.claude-sonnet-4-20250514-v1:0", (APIProvider.VERTEX, "claude-4-sonnet-20250514"): "claude-sonnet-4-v1@20250514", } +def 
get_model_name(provider: APIProvider, model_name: str) -> str: + """ + Get the actual model name to use for API calls. + + Simply returns the model name as-is for direct API usage. + """ + return model_name + + # This system prompt is optimized for the Docker environment in this repository and # specific tool combinations enabled. # We encourage modifying this system prompt to ensure the model has context for the @@ -67,8 +80,15 @@ SYSTEM_PROMPT = f""" * When viewing a page it can be helpful to zoom out so that you can see everything on the page. Either that, or make sure you scroll down to see everything before deciding something isn't available. * DO NOT ask users for clarification during task execution. DO NOT stop to request more information from users. Always take action using available tools. * When using your computer function calls, they take a while to run and send back to you. Where possible/feasible, try to chain multiple of these calls all into one function calls request. +* TASK FEASIBILITY: You can declare a task infeasible at any point during execution - whether at the beginning after taking a screenshot, or later after attempting some actions and discovering barriers. Carefully evaluate whether the task is feasible given the current system state, available applications, and task requirements. If you determine that a task cannot be completed due to: + - Missing required applications or dependencies that cannot be installed + - Insufficient permissions or system limitations + - Contradictory or impossible requirements + - Any other fundamental barriers that make completion impossible + Then you MUST output exactly "[INFEASIBLE]" (including the square brackets) anywhere in your response to trigger the fail action. The system will automatically detect this pattern and terminate the task appropriately. * The current date is {datetime.today().strftime('%A, %B %d, %Y')}. * Home directory of this Ubuntu system is '/home/user'. 
+* If you need a password for sudo, the password of the computer is 'osworld-public-evaluation'. @@ -82,6 +102,7 @@ SYSTEM_PROMPT_WINDOWS = f""" * The current date is {datetime.today().strftime('%A, %B %d, %Y')}. * Home directory of this Windows system is 'C:\\Users\\user'. * When you want to open some applications on Windows, please use Double Click on it instead of clicking once. +* If you need a password for sudo, The password of the computer is 'osworld-public-evaluation'. """ @@ -154,21 +175,30 @@ def _inject_prompt_caching( one cache breakpoint is left for tools/system prompt, to be shared across sessions """ - breakpoints_remaining = 3 + breakpoints_remaining = 2 # Use full budget for recent messages + messages_processed = 0 + for message in reversed(messages): if message["role"] == "user" and isinstance( content := message["content"], list ): - if breakpoints_remaining: - breakpoints_remaining -= 1 + messages_processed += 1 + # Check if this message would fit within the remaining budget + if breakpoints_remaining >= len(content): + # We have enough budget, spend it and add cache_control + breakpoints_remaining -= len(content) # Use type ignore to bypass TypedDict check until SDK types are updated content[-1]["cache_control"] = BetaCacheControlEphemeralParam( # type: ignore {"type": "ephemeral"} ) else: - content[-1].pop("cache_control", None) - # we'll only every have one extra turn per loop - break + # Check if this is the first message (contains image + text with task description) + is_first_message = messages_processed == len([msg for msg in messages if msg["role"] == "user"]) + + if not is_first_message: + # Not enough budget, remove any existing cache_control from this message + content[-1].pop("cache_control", None) + # Continue to clean up older messages that might have cache_control from previous turns def _maybe_filter_to_n_most_recent_images( @@ -220,6 +250,105 @@ def _maybe_filter_to_n_most_recent_images( tool_result["content"] = new_content +def 
validate_model_support(model_name: str, api_key: str = None, temperature: float = None, top_p: float = None, no_thinking: bool = False, use_isp: bool = False) -> bool: + """ + Validate model support with the same API call pattern as the main agent. + + Args: + model_name: The model name to validate + api_key: Optional API key, defaults to ANTHROPIC_API_KEY env var + temperature: Optional temperature parameter for testing + top_p: Optional top_p parameter for testing + no_thinking: Disable thinking mode (matches AnthropicAgent) + use_isp: Use interleaved scratchpad mode (matches AnthropicAgent) + + Returns: + True if model is supported and API call succeeds, False otherwise + """ + print(f"šŸ” Validating model support: {model_name}") + + try: + from anthropic import Anthropic + import os + import time + + # Same client setup as main agent but with manual retry (max_retries=1 for faster feedback) + client = Anthropic( + api_key=api_key or os.environ.get("ANTHROPIC_API_KEY"), + max_retries=4 + ).with_options(default_headers={"anthropic-beta": COMPUTER_USE_BETA_FLAG}) + + # Same message format as main agent - always use structured format with cache_control + messages = [{"role": "user", "content": [{"type": "text", "text": "Respond with 'OK'", "cache_control": {"type": "ephemeral"}}]}] + + # Same betas configuration as main agent + betas = [COMPUTER_USE_BETA_FLAG] + if use_isp: + betas.append("interleaved-thinking-2025-05-14") + + system = [{"type": "text", "text": "You are Claude. 
Respond with 'OK'."}] + + # Same tools configuration as main agent - use modern computer tool for all models + tools = [{"name": "computer", "type": "computer_20250124", + "display_width_px": 1280, "display_height_px": 720, "display_number": 1}] + + # Same thinking configuration as main agent + max_tokens = 50 # Base validation max_tokens + if no_thinking: + extra_body = {} + actual_max_tokens = max_tokens + else: + budget_tokens = 2048 + # Same logic as main agent: if max_tokens <= budget_tokens, increase it + if max_tokens <= budget_tokens: + actual_max_tokens = budget_tokens + 500 + else: + actual_max_tokens = max_tokens + extra_body = { + "thinking": {"type": "enabled", "budget_tokens": budget_tokens} + } + + # Sampling parameters (same logic as main agent) + sampling_params = {} + if temperature is not None: + sampling_params['temperature'] = temperature + if top_p is not None: + sampling_params['top_p'] = top_p + + # Retry logic with 5 attempts, 5 second delays + for attempt in range(5): + try: + # Same API call pattern as main agent + client.beta.messages.create( + max_tokens=actual_max_tokens, + messages=messages, + model=get_model_name(APIProvider.ANTHROPIC, model_name), + system=system, + tools=tools, + betas=betas, + extra_body=extra_body, + **sampling_params + ) + + print(f"āœ… Model {model_name} validated successfully") + return True + except Exception as e: + if attempt < 4: # Don't print error on final attempt + print(f"šŸ”„ Validation attempt {attempt + 1}/5 failed: {e}") + print(f"ā³ Retrying in 5 seconds...") + time.sleep(5) + else: + print(f"āŒ All validation attempts failed. 
Final error: {e}") + + return False + + except ValueError: + return False + except Exception as e: + print(f"āŒ API validation setup failed: {e}") + return False + + def _response_to_params( response: BetaMessage, ) -> list[BetaContentBlockParam]: diff --git a/run_multienv_claude.py b/run_multienv_claude.py index ca6aba2..7d1e84f 100644 --- a/run_multienv_claude.py +++ b/run_multienv_claude.py @@ -13,6 +13,7 @@ import time from typing import List from multiprocessing import Process, Manager, current_process import lib_run_single +from lib_results_logger import log_task_error from desktop_env.desktop_env import DesktopEnv from mm_agents.anthropic import AnthropicAgent @@ -67,17 +68,27 @@ def config() -> argparse.Namespace: ) # lm config - parser.add_argument("--model", type=str, default="claude-4-sonnet-20250514") - parser.add_argument("--temperature", type=float, default=1.0) - parser.add_argument("--top_p", type=float, default=0.9) - parser.add_argument("--max_tokens", type=int, default=1500) + parser.add_argument("--model", type=str, default="") + parser.add_argument("--temperature", type=float, default=None) + parser.add_argument("--top_p", type=float, default=None) + parser.add_argument("--max_tokens", type=int, default=3000) parser.add_argument("--stop_token", type=str, default=None) + + # thinking mode config + parser.add_argument("--no-thinking", action="store_true", + help="Disable thinking mode (no scratchpad)") + parser.add_argument("--use-isp", action="store_true", + help="Use interleaved scratchpad (ISP) mode") # example config parser.add_argument("--domain", type=str, default="all") parser.add_argument( "--test_all_meta_path", type=str, default="evaluation_examples/test_all.json" ) + parser.add_argument( + "--specific_task_id", type=str, default=None, + help="Run only a specific task ID (overrides domain filtering)" + ) # logging related parser.add_argument("--result_dir", type=str, default="./results") @@ -95,6 +106,37 @@ def config() -> 
argparse.Namespace: args = config()  # Get command line arguments first +# Validate that model is specified to prevent accidental usage with empty model +if not args.model or args.model.strip() == "": +    print("ERROR: Model must be specified. Use --model <model_name>") +    print("Example: --model claude-sonnet-4-5-20250929") +    sys.exit(1) + +# Validate model support before proceeding +from mm_agents.anthropic.utils import validate_model_support + +# Pass same temperature/top_p and thinking parameters as will be used by the agent +validation_kwargs = {} +if args.temperature is not None: +    validation_kwargs['temperature'] = args.temperature +if args.top_p is not None: +    validation_kwargs['top_p'] = args.top_p +validation_kwargs['no_thinking'] = args.no_thinking +validation_kwargs['use_isp'] = args.use_isp + +if not validate_model_support(args.model, **validation_kwargs): +    print(f"\nšŸ’„ Model '{args.model}' API validation failed") +    sys.exit(1) + +# Validate thinking mode options are mutually exclusive +if args.no_thinking and args.use_isp: +    print("ERROR: --no-thinking and --use-isp are mutually exclusive") +    print("Choose one of:") +    print("  (default): Regular scratchpad mode") +    print("  --no-thinking: Disable thinking/scratchpad") +    print("  --use-isp: Use interleaved scratchpad (ISP)") +    sys.exit(1) + logger = logging.getLogger() log_level = getattr(logging, args.log_level.upper()) logger.setLevel(log_level) @@ -182,7 +224,7 @@ def run_env_tasks(task_queue, args, shared_scores): headless=args.headless, os_type="Ubuntu", require_a11y_tree=args.observation_type in ["a11y_tree", "screenshot_a11y_tree", "som"], -        enable_proxy=True, +        enable_proxy=False, client_password=args.client_password ) active_environments.append(env) @@ -196,8 +238,9 @@ def run_env_tasks(task_queue, args, shared_scores): observation_type=args.observation_type, max_trajectory_length=args.max_trajectory_length, provider_name=args.provider_name, -        screen_width=args.screen_width, -        screen_height=args.screen_height, + 
screen_size=(args.screen_width, args.screen_height), + no_thinking=getattr(args, 'no_thinking', False), + use_isp=getattr(args, 'use_isp', False), ) logger.info(f"Process {current_process().name} started.") while True: @@ -239,6 +282,14 @@ def run_env_tasks(task_queue, args, shared_scores): import traceback logger.error(f"Exception in {current_process().name} {domain}/{example_id}: {e}") logger.error(traceback.format_exc()) + + # Log error to results.json + try: + example = {"id": example_id} # Create minimal example dict for error logging + log_task_error(example, str(e), example_result_dir, args) + except Exception as log_e: + logger.error(f"Failed to log error to results.json: {log_e}") + try: env.controller.end_recording( os.path.join(example_result_dir, "recording.mp4") @@ -479,7 +530,28 @@ if __name__ == "__main__": with open(args.test_all_meta_path, "r", encoding="utf-8") as f: test_all_meta = json.load(f) - if args.domain != "all": + # Filter for specific task ID if provided + if args.specific_task_id: + logger.info(f"Filtering for specific task ID: {args.specific_task_id}") + filtered_meta = {} + task_found = False + + for domain, task_ids in test_all_meta.items(): + for task_id in task_ids: + if task_id == args.specific_task_id: + filtered_meta[domain] = [task_id] + task_found = True + logger.info(f"Found task {args.specific_task_id} in domain: {domain}") + break + if task_found: + break + + if not task_found: + logger.error(f"Task ID {args.specific_task_id} not found in test file!") + sys.exit(1) + + test_all_meta = filtered_meta + elif args.domain != "all": test_all_meta = {args.domain: test_all_meta[args.domain]} test_file_list = get_unfinished(