sci-gui-agent-benchmark/desktop_env/evaluators/metrics/vllm_eval.py
lizhanyuan 252d2f79ce fix(eval): fix vllm_eval screenshot-ordering bug and align with reeval logic
- Fix the bug in _load_screenshots_from_dir where sorting screenshots as strings made step_9 look like the final frame; sort numerically by step number instead
- Align with the prompt logic of reeval.py: explicitly instruct the model to examine the final screenshot first (STEP 1 EXAMINE FINAL SCREENSHOT FIRST)
- Lower the evaluation temperature from 0.7 to 0.2 for more consistent scoring
- Add batch_reeval.py: batch re-evaluation of existing trajectories based on test_final.json
- Add reeval.py: single-task re-evaluation script (final-frame-anchored evaluation)
- Add avogadro (11 tasks) and origin (8 tasks) to test_final.json
2026-03-27 14:34:32 +08:00

661 lines
24 KiB
Python

import os
import re
import json
import logging
import base64
import glob
from io import BytesIO
from typing import Optional, List, Dict, Any

from dotenv import load_dotenv
from PIL import Image

logger = logging.getLogger("desktopenv.vllm_eval")
load_dotenv()
def _compress_image(img_b64: str, max_size: int = 800, quality: int = 85) -> str:
    """
    Compress a base64-encoded image to reduce its size.

    Args:
        img_b64: Base64-encoded image string
        max_size: Maximum dimension (width or height) in pixels
        quality: JPEG quality (1-100); lower means smaller file size

    Returns:
        Compressed base64-encoded image string
    """
    try:
        # Decode base64 to image
        img_data = base64.b64decode(img_b64)
        img = Image.open(BytesIO(img_data))
        # Convert to RGB if necessary (for PNG with transparency)
        if img.mode in ('RGBA', 'LA', 'P'):
            background = Image.new('RGB', img.size, (255, 255, 255))
            if img.mode == 'P':
                img = img.convert('RGBA')
            background.paste(img, mask=img.split()[-1] if img.mode in ('RGBA', 'LA') else None)
            img = background
        # Resize if the image is too large
        original_size = img.size
        if max(img.size) > max_size:
            ratio = max_size / max(img.size)
            new_size = tuple(int(dim * ratio) for dim in img.size)
            img = img.resize(new_size, Image.Resampling.LANCZOS)
            logger.info(f"Resized image from {original_size} to {new_size}")
        # Compress to JPEG
        buffer = BytesIO()
        img.save(buffer, format='JPEG', quality=quality, optimize=True)
        compressed_data = buffer.getvalue()
        # Encode back to base64
        compressed_b64 = base64.b64encode(compressed_data).decode('utf-8')
        # Log the compression ratio (base64 length * 3/4 approximates raw bytes)
        original_size_kb = len(img_b64) * 3 / 4 / 1024
        compressed_size_kb = len(compressed_b64) * 3 / 4 / 1024
        compression_ratio = (1 - compressed_size_kb / original_size_kb) * 100
        logger.info(f"Compressed image: {original_size_kb:.1f}KB -> {compressed_size_kb:.1f}KB "
                    f"({compression_ratio:.1f}% reduction)")
        return compressed_b64
    except Exception as e:
        logger.warning(f"Failed to compress image: {e}, using original")
        return img_b64
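# A minimal usage sketch for _compress_image (hypothetical values; actual
# savings depend on image content):
#
#     buf = BytesIO()
#     Image.new("RGB", (1600, 1200), (200, 200, 200)).save(buf, format="PNG")
#     png_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
#     jpeg_b64 = _compress_image(png_b64, max_size=800, quality=85)
#     # The result decodes as an 800x600 JPEG.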
class UnifiedLLM:
    def __init__(self, model: str):
        if model.startswith("gpt"):
            self.provider = "openai"
        elif model.startswith("claude"):
            self.provider = "anthropic"
        elif model.startswith("gemini"):
            # If OPENAI_API_KEY is set but GOOGLE_API_KEY is not,
            # use an OpenAI-compatible proxy for Gemini models
            if os.getenv("OPENAI_API_KEY") and not os.getenv("GOOGLE_API_KEY"):
                self.provider = "openai"
                logger.info(f"Using OpenAI-compatible proxy for Gemini model: {model}")
            else:
                self.provider = "gemini"
        else:
            self.provider = "unknown"
        self.model = model
        self.client = self._init_client()

    def _init_client(self):
        """Initialize the provider client"""
        if self.provider == "openai":
            from openai import OpenAI
            return OpenAI(
                base_url=os.getenv("OPENAI_BASE_URL"),
                api_key=os.getenv("OPENAI_API_KEY")
            )
        elif self.provider == "anthropic":
            from anthropic import Anthropic
            return Anthropic(
                base_url=os.getenv("ANTHROPIC_BASE_URL"),
                api_key=os.getenv("ANTHROPIC_API_KEY")
            )
        elif self.provider == "gemini":
            logger.warning("Using Google Gemini model, make sure your internet connection is working.")
            import google.generativeai as genai
            genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
            return genai.GenerativeModel(self.model)
        else:
            logger.error(f"Unsupported LLM provider for model: {self.model}")
            raise ValueError(f"Unsupported LLM provider for model: {self.model}")
    def _get_supported_params(self, temperature: float, max_tokens: int, top_p: float) -> Dict[str, Any]:
        """Build the sampling parameters supported by the current provider"""
        base_params = {
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        # GPT-5-family models may not support top_p
        if self.provider == "openai":
            # Only add top_p for older models
            if not self.model.startswith("gpt-5"):
                base_params["top_p"] = top_p
        elif self.provider == "anthropic":
            base_params["top_p"] = top_p
        elif self.provider == "gemini":
            base_params["top_p"] = top_p
        return base_params
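    # For illustration (hypothetical call): with temperature=0.2,
    # max_tokens=1024, top_p=1.0, the "anthropic" and "gemini" providers yield
    #     {"temperature": 0.2, "max_tokens": 1024, "top_p": 1.0}
    # while an "openai" provider with a model named "gpt-5*" omits top_p.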
    def generate(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 16384,
        top_p: float = 1.0,
        **kwargs
    ) -> str:
        """
        Args:
            prompt: Input prompt
            temperature: Temperature (0.0-2.0)
            max_tokens: Maximum number of tokens
            top_p: Top-p sampling (0.0-1.0)

        Returns:
            Generated text
        """
        params = self._get_supported_params(temperature, max_tokens, top_p)
        if self.provider == "openai":
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}],
                    **params
                )
                return response.choices[0].message.content
            except Exception as e:
                logger.error(f"OpenAI API error: {e}")
                raise e
        elif self.provider == "anthropic":
            try:
                response = self.client.messages.create(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}],
                    **params
                )
                return response.content[0].text
            except Exception as e:
                logger.error(f"Anthropic API error: {e}")
                raise e
        elif self.provider == "gemini":
            try:
                import google.generativeai as genai
                config = genai.GenerationConfig(
                    temperature=params["temperature"],
                    max_output_tokens=params["max_tokens"],
                    top_p=params.get("top_p", 1.0)
                )
                response = self.client.generate_content(prompt, generation_config=config)
                return response.text
            except Exception as e:
                logger.error(f"Gemini API error: {e}")
                raise e
    def generate_with_images(
        self,
        prompt: str,
        images_b64: List[str],
        temperature: float = 0.7,
        max_tokens: int = 16384,
        top_p: float = 1.0,
        **kwargs
    ) -> str:
        """
        Generate with multiple images in a single request.

        Args:
            prompt: Instruction prompt
            images_b64: List of base64-encoded images
            temperature: Temperature (0.0-2.0)
            max_tokens: Maximum number of tokens
            top_p: Top-p sampling (0.0-1.0)

        Returns:
            Generated text
        """
        if not images_b64:
            logger.warning("No images provided, falling back to text-only generation")
            return self.generate(prompt, temperature, max_tokens, top_p, **kwargs)
        params = self._get_supported_params(temperature, max_tokens, top_p)
        if self.provider == "openai":
            # Build the content with the text and all images
            content = [{"type": "text", "text": prompt}]
            for img_b64 in images_b64:
                content.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{img_b64}"
                    }
                })
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{"role": "user", "content": content}],
                    **params
                )
                return response.choices[0].message.content
            except Exception as e:
                logger.error(f"OpenAI API error: {e}")
                raise e
        elif self.provider == "anthropic":
            # Build the content with the text and all images
            content = [{"type": "text", "text": prompt}]
            for img_b64 in images_b64:
                content.append({
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/jpeg",
                        "data": img_b64
                    }
                })
            try:
                response = self.client.messages.create(
                    model=self.model,
                    messages=[{"role": "user", "content": content}],
                    **params
                )
                return response.content[0].text
            except Exception as e:
                logger.error(f"Anthropic API error: {e}")
                raise e
        elif self.provider == "gemini":
            import google.generativeai as genai
            config = genai.GenerationConfig(
                temperature=params["temperature"],
                max_output_tokens=params["max_tokens"],
                top_p=params.get("top_p", 1.0)
            )
            # Build the content parts: prompt text followed by PIL images
            content_parts = [prompt]
            for img_b64 in images_b64:
                img_data = base64.b64decode(img_b64)
                img = Image.open(BytesIO(img_data))
                content_parts.append(img)
            try:
                response = self.client.generate_content(content_parts, generation_config=config)
                return response.text
            except Exception as e:
                logger.error(f"Gemini API error: {e}")
                raise e
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")
def _sample_key_frames(items: list, max_count: int) -> list:
    """
    Uniformly sample key frames while always keeping the first and last items.

    Args:
        items: List of items to sample from
        max_count: Maximum number of items to keep (must be >= 2)

    Returns:
        List of sampled indices (sorted)
    """
    n = len(items)
    if n <= max_count:
        return list(range(n))
    # Always keep first and last
    if max_count < 2:
        max_count = 2
    indices = [0]  # first frame
    # Uniformly sample (max_count - 2) frames from the middle
    middle_count = max_count - 2
    if middle_count > 0:
        step = (n - 2) / (middle_count + 1)
        for i in range(1, middle_count + 1):
            idx = int(round(i * step))
            indices.append(idx)
    indices.append(n - 1)  # last frame
    # Deduplicate and sort
    indices = sorted(set(indices))
    return indices
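# Worked example (hypothetical values): sampling 4 of 10 frames keeps the first
# and last plus two roughly evenly spaced middle frames:
#     _sample_key_frames(list(range(10)), 4)  ->  [0, 3, 5, 9]
# (step = (10 - 2) / 3 ≈ 2.67, so the middle indices are round(2.67) = 3 and
# round(5.33) = 5)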
def _load_screenshots_from_dir(result_dir: str, compress: bool = False, max_size: int = 800, quality: int = 85, max_images: int = 0) -> tuple:
    """
    Load step screenshots from a result directory and convert them to base64.

    When max_images > 0 and there are more screenshots than max_images,
    uniformly sample key frames (always keeping first and last).

    Args:
        result_dir: Path to the result directory containing step_*.png files
        compress: Whether to compress images (default: False)
        max_size: Maximum dimension for compression (default: 800)
        quality: JPEG quality for compression (default: 85)
        max_images: Maximum number of screenshots to load (0 = no limit)

    Returns:
        Tuple of (list of base64-encoded screenshot strings, list of short filenames like 'step_1', 'step_2', ...)
    """
    screenshots = []
    filenames = []
    # Find all step screenshot files (e.g., step_1_20240101@120000.png).
    # Sort numerically by step number to avoid lexicographic issues
    # ('step_10' < 'step_2' in string sort).
    pattern = os.path.join(result_dir, "step_*.png")
    screenshot_files = sorted(
        glob.glob(pattern),
        key=lambda p: int(re.search(r"step_(\d+)", os.path.basename(p)).group(1))
    )
    if not screenshot_files:
        logger.warning(f"No screenshot files found in {result_dir}")
        return screenshots, filenames
    # Key frame sampling: if max_images > 0 and we have more files than allowed,
    # keep first + last + uniformly sampled middle frames
    total_files = len(screenshot_files)
    if max_images > 0 and total_files > max_images:
        sampled_indices = _sample_key_frames(screenshot_files, max_images)
        screenshot_files_sampled = [screenshot_files[i] for i in sampled_indices]
        logger.info(f"Key frame sampling: {total_files} screenshots -> {len(screenshot_files_sampled)} "
                    f"(max_images={max_images}, kept indices: {sampled_indices})")
        screenshot_files = screenshot_files_sampled
    for filepath in screenshot_files:
        try:
            with open(filepath, "rb") as f:
                img_data = f.read()
            img_b64 = base64.b64encode(img_data).decode('utf-8')
            # Compress if enabled
            if compress:
                img_b64 = _compress_image(img_b64, max_size=max_size, quality=quality)
            screenshots.append(img_b64)
            # Extract a short name like 'step_1' from 'step_1_20240101@120000.png'
            basename = os.path.basename(filepath)
            match = re.match(r'(step_\d+)', basename)
            short_name = match.group(1) if match else basename
            filenames.append(short_name)
        except Exception as e:
            logger.error(f"Error loading screenshot {filepath}: {e}")
    logger.info(f"Loaded {len(screenshots)} screenshots from {result_dir}: {filenames}")
    return screenshots, filenames
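# The numeric sort above is the fix described in the commit message: with, say,
# 12 steps, plain string sorting would order the files as
#     step_1, step_10, step_11, step_12, step_2, ..., step_9
# so 'step_9' would land last and be mistaken for the final frame.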
def vllm_eval(result_state, **options) -> float:
    """
    Evaluate task completion using a vision-language model.

    Args:
        result_state: Current state description
        **options: Additional options, including:
            - result_dir: Path to result directory containing step screenshots (recommended)
            - screenshots: List of base64-encoded screenshots (deprecated, use result_dir instead)
            - instruction: Task instruction
            - eval_model: Model name to use
            - compress_images: Whether to compress images (default: False)
            - max_image_size: Maximum image dimension for compression (default: 800)
            - image_quality: JPEG quality for compression (default: 85)
            - max_eval_images: Max screenshots for evaluation (0 = no limit, default: 10).
              When exceeded, keeps first + last + uniformly sampled middle frames.
            - temperature: Temperature parameter
            - max_tokens: Maximum tokens
            - top_p: Top-p parameter

    Returns:
        Score between 0.0 and 1.0
    """
    # Try to load screenshots from result_dir if provided
    result_dir = options.get("result_dir", None)
    screenshots = options.get("screenshots", [])
    # Image compression options
    compress_images = options.get("compress_images", False)
    max_image_size = options.get("max_image_size", 800)
    image_quality = options.get("image_quality", 85)
    max_eval_images = options.get("max_eval_images", 10)
    screenshot_filenames = []  # Short names like 'step_1', 'step_2', ...
    if result_dir and not screenshots:
        screenshots, screenshot_filenames = _load_screenshots_from_dir(
            result_dir,
            compress=compress_images,
            max_size=max_image_size,
            quality=image_quality,
            max_images=max_eval_images
        )
        logger.info(f"Loaded {len(screenshots)} screenshots from result_dir: {result_dir}")
    elif screenshots:
        logger.info(f"Using {len(screenshots)} screenshots from options")
        screenshot_filenames = [f"step_{i+1}" for i in range(len(screenshots))]
        # Compress the provided screenshots if needed
        if compress_images:
            logger.info("Compressing provided screenshots...")
            screenshots = [_compress_image(img, max_size=max_image_size, quality=image_quality) for img in screenshots]
    instruction = options.get("instruction", "")
    eval_model = options.get("eval_model", "gpt-4-vision-preview")
    config = options.get("config", [])
    metadata = options.get("metadata", {})
    params = {
        "temperature": options.get("temperature", 0.2),
        "max_tokens": options.get("max_tokens", 16384),
        "top_p": options.get("top_p", 1.0)
    }
    llm = UnifiedLLM(eval_model)
    # Build the pre-configured environment description from config
    preconfig_items = []
    for cfg in config:
        if cfg.get("type") == "launch":
            cmds = cfg.get("parameters", {}).get("command", [])
            if cmds:
                app_name = os.path.basename(cmds[0]) if cmds else "unknown"
                preconfig_items.append(f"Application '{app_name}' was automatically launched before the agent started.")
        elif cfg.get("type") == "sleep":
            pass  # not relevant to scoring
        elif cfg.get("type") == "open":
            path = cfg.get("parameters", {}).get("path", "")
            preconfig_items.append(f"File/URL '{path}' was automatically opened before the agent started.")
    preconfig_section = ""
    if preconfig_items:
        preconfig_desc = "\n".join(f" - {item}" for item in preconfig_items)
        preconfig_section = f"""
PRE-CONFIGURED ENVIRONMENT (done BEFORE the agent started, NOT the agent's work):
{preconfig_desc}
IMPORTANT: The above actions were performed automatically as part of environment setup. The agent did NOT perform these actions. Do NOT give ANY credit for them. For example, if the application was pre-launched, the agent merely having the application open is worth 0 points - that was the starting state."""
    # Build the expected-steps section from metadata
    expected_steps_section = ""
    if metadata.get("steps"):
        expected_steps_section = f"""
EXPECTED STEPS for this task (use as reference for what the agent should have done):
{metadata['steps']}
NOTE: Evaluate the screenshots against these expected steps. Only give credit for steps that show VISIBLE evidence of completion BEYOND the pre-configured starting state."""
    # Build the image-list description for the prompt
    if screenshot_filenames:
        img_list_str = ", ".join(screenshot_filenames)
        img_info = f"""\nYou are provided with exactly {len(screenshot_filenames)} screenshots in chronological order: {img_list_str}
The FIRST screenshot is: {screenshot_filenames[0]}
The LAST screenshot (final state): {screenshot_filenames[-1]}
IMPORTANT: Only reference screenshots from the list above. Do NOT reference any screenshot that is not listed."""
    else:
        img_info = "\nNo screenshots were provided."
    final_name = screenshot_filenames[-1] if screenshot_filenames else "N/A"
    prompt = f"""You are a STRICT evaluator for desktop GUI agent tasks.
Task: {instruction}
{preconfig_section}
{expected_steps_section}
{img_info}
════════════════════════════════════════════════════
STEP 1 — EXAMINE THE FINAL SCREENSHOT FIRST
The LAST image provided is "{final_name}" — this is the FINAL STATE of the agent's session.
Look at this image carefully NOW before anything else. Ask yourself:
"Does this final screenshot show the task is complete?"
Only after answering that, use earlier screenshots to understand HOW the agent got there.
════════════════════════════════════════════════════
SCORING RULES:
1. Base your final_completion and score PRIMARILY on "{final_name}" (the last image).
2. Credit ONLY actions performed BY THE AGENT (not pre-configured setup).
3. Require VISIBLE evidence in a specific screenshot for each step.
4. If the final screenshot shows the task is done, score high even if earlier steps were messy.
SCORE GUIDE (0-10):
- 0: No agent progress; only pre-configured state visible.
- 1-3: Minor actions taken, far from goal.
- 4-6: Meaningful progress, roughly half done.
- 7-8: Most steps done, minor issues.
- 9: Essentially complete, cosmetic differences only.
- 10: Fully and perfectly complete with clear visual proof in the final screenshot.
Respond with ONLY valid JSON (no extra text):
{{
"final_screenshot": "{final_name}",
"final_screenshot_description": "Describe exactly what you see in {final_name}",
"task_complete_in_final": true/false,
"steps_analysis": [
{{"step": "...", "status": "Success/Fail", "evidence_img": "step_X", "reason": "..."}}
],
"final_completion": "True/False",
"score": 0-10
}}"""
    try:
        result = llm.generate_with_images(
            prompt=prompt,
            images_b64=screenshots,
            **params
        )
        # Parse the score from the result
        score = _parse_score(result)
        logger.info(f"Evaluation result: {result}")
        logger.info(f"Parsed score: {score}")
        # Save the raw result to a file for reference
        if result_dir:
            eval_output_path = os.path.join(result_dir, "vllm_evaluation_result.json")
            with open(eval_output_path, "w", encoding="utf-8") as f:
                f.write(result)
            logger.info(f"Saved evaluation result to {eval_output_path}")
        return score
    except Exception as e:
        logger.error(f"Error during evaluation: {e}")
        return 0.0
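# A minimal invocation sketch (the directory, instruction, and model name are
# hypothetical; assumes the matching API key is set in the environment):
#
#     score = vllm_eval(
#         None,
#         result_dir="results/some_task",
#         instruction="Plot column B of the opened CSV as a line chart",
#         eval_model="gpt-4o",
#         max_eval_images=10,
#     )
#     # score is a float in [0.0, 1.0], i.e. the model's 0-10 score / 10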
def _parse_evaluation_response(text: str) -> Dict[str, Any]:
    """
    Parse the JSON evaluation response from the model.

    Returns:
        Dictionary containing steps_analysis, final_completion, and score
    """
    # Try to extract JSON from the response.
    # Models sometimes wrap JSON in markdown code blocks.
    text = text.strip()
    # Remove markdown code blocks if present
    if text.startswith("```"):
        # Extract the content between ``` markers
        match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
        if match:
            text = match.group(1)
        else:
            # Fall back to stripping the opening and closing ```
            text = re.sub(r'^```(?:json)?\s*', '', text)
            text = re.sub(r'\s*```$', '', text)
    try:
        result = json.loads(text)
        # Validate required fields
        if "steps_analysis" not in result:
            logger.warning("Missing 'steps_analysis' field in response")
            result["steps_analysis"] = []
        if "final_completion" not in result:
            logger.warning("Missing 'final_completion' field in response")
            result["final_completion"] = "False"
        if "score" not in result:
            logger.warning("Missing 'score' field in response")
            result["score"] = 0
        return result
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON response: {e}")
        logger.error(f"Response text: {text[:500]}")
        # Return a default structure
        return {
            "steps_analysis": [],
            "final_completion": "False",
            "score": 0
        }
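# For illustration (hypothetical response): a markdown-fenced reply such as
#     '```json\n{"final_completion": "True", "score": 9, "steps_analysis": []}\n```'
# is unwrapped by the regex above and parses to
#     {"final_completion": "True", "score": 9, "steps_analysis": []}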
def _parse_score(text: str) -> float:
    """
    Parse the score from the model response and convert it to the 0.0-1.0 range.

    Args:
        text: Raw model response (expected to be in JSON format)

    Returns:
        Score between 0.0 and 1.0
    """
    result = _parse_evaluation_response(text)
    # Extract the score (0-10) and convert it to 0.0-1.0
    score = result.get("score", 0)
    try:
        score = float(score)
        # Clamp to [0, 10], then normalize to [0.0, 1.0]
        score = max(0.0, min(10.0, score))
        normalized_score = score / 10.0
        logger.info(f"Final completion: {result.get('final_completion')}")
        logger.info(f"Raw score (0-10): {score}, Normalized score (0-1): {normalized_score}")
        # Log the steps analysis if available
        steps = result.get("steps_analysis", [])
        if steps:
            logger.info(f"Steps analysis ({len(steps)} steps):")
            for i, step in enumerate(steps):
                logger.info(f"  Step {i+1}: {step.get('step', 'N/A')} - {step.get('status', 'N/A')}")
        return normalized_score
    except (ValueError, TypeError) as e:
        logger.warning(f"Could not parse score: {e}")
        return 0.0
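# Quick sanity checks (hypothetical inputs):
#     _parse_score('{"final_completion": "True", "score": 9}')  # -> 0.9
#     _parse_score("not json at all")                           # -> 0.0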