import os from typing import Optional, List, Dict, Any from dotenv import load_dotenv import logging import base64 import glob from io import BytesIO from PIL import Image logger = logging.getLogger("desktopenv.vllm_eval") load_dotenv() def _compress_image(img_b64: str, max_size: int = 800, quality: int = 85) -> str: """ Compress base64 encoded image to reduce size Args: img_b64: Base64 encoded image string max_size: Maximum dimension (width or height) in pixels quality: JPEG quality (1-100), lower means smaller file size Returns: Compressed base64 encoded image string """ try: # Decode base64 to image img_data = base64.b64decode(img_b64) img = Image.open(BytesIO(img_data)) # Convert to RGB if necessary (for PNG with transparency) if img.mode in ('RGBA', 'LA', 'P'): background = Image.new('RGB', img.size, (255, 255, 255)) if img.mode == 'P': img = img.convert('RGBA') background.paste(img, mask=img.split()[-1] if img.mode in ('RGBA', 'LA') else None) img = background # Resize if image is too large original_size = img.size if max(img.size) > max_size: ratio = max_size / max(img.size) new_size = tuple(int(dim * ratio) for dim in img.size) img = img.resize(new_size, Image.Resampling.LANCZOS) logger.info(f"Resized image from {original_size} to {new_size}") # Compress to JPEG buffer = BytesIO() img.save(buffer, format='JPEG', quality=quality, optimize=True) compressed_data = buffer.getvalue() # Encode back to base64 compressed_b64 = base64.b64encode(compressed_data).decode('utf-8') # Log compression ratio original_size_kb = len(img_b64) * 3 / 4 / 1024 # base64 to bytes to KB compressed_size_kb = len(compressed_b64) * 3 / 4 / 1024 compression_ratio = (1 - compressed_size_kb / original_size_kb) * 100 logger.info(f"Compressed image: {original_size_kb:.1f}KB -> {compressed_size_kb:.1f}KB ({compression_ratio:.1f}% reduction)") return compressed_b64 except Exception as e: logger.warning(f"Failed to compress image: {e}, using original") return img_b64 class UnifiedLLM: def __init__(self, model: str): if model.startswith("gpt"): self.provider = "openai" elif model.startswith("claude"): self.provider = "anthropic" elif model.startswith("gemini"): self.provider = "gemini" else: self.provider = "unknown" self.model = model self.client = self._init_client() def _init_client(self): """Initialize client""" if self.provider == "openai": from openai import OpenAI return OpenAI( base_url=os.getenv("OPENAI_BASE_URL"), api_key=os.getenv("OPENAI_API_KEY") ) elif self.provider == "anthropic": from anthropic import Anthropic return Anthropic( base_url=os.getenv("ANTHROPIC_BASE_URL"), api_key=os.getenv("ANTHROPIC_API_KEY") ) elif self.provider == "gemini": logger.warning("Using Google Gemini model, make sure your internet connection is working.") import google.generativeai as genai genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) return genai.GenerativeModel(self.model) else: logger.error(f"Unsupported LLM provider for model: {self.model}") raise ValueError(f"Unsupported LLM provider for model: {self.model}") def _get_supported_params(self, temperature: float, max_tokens: int, top_p: float) -> Dict[str, Any]: """Get supported parameters for each provider""" base_params = { "temperature": temperature, "max_tokens": max_tokens } # GPT-5.2 and newer models may not support top_p if self.provider == "openai": # Only add top_p for older models if not self.model.startswith("gpt-5"): base_params["top_p"] = top_p elif self.provider == "anthropic": base_params["top_p"] = top_p elif self.provider == "gemini": base_params["top_p"] = top_p return base_params def generate( self, prompt: str, temperature: float = 0.7, max_tokens: int = 16384, top_p: float = 1.0, **kwargs ) -> str: """ Args: prompt: Input prompt temperature: Temperature (0.0-2.0) max_tokens: Maximum number of tokens top_p: Top-p sampling (0.0-1.0) Returns: Generated text """ params = self._get_supported_params(temperature, max_tokens, top_p) if self.provider == "openai": try: response = self.client.chat.completions.create( model=self.model, messages=[{"role": "user", "content": prompt}], **params ) return response.choices[0].message.content except Exception as e: logger.error(f"OpenAI API error: {e}") raise e elif self.provider == "anthropic": try: response = self.client.messages.create( model=self.model, messages=[{"role": "user", "content": prompt}], **params ) return response.content[0].text except Exception as e: logger.error(f"Anthropic API error: {e}") raise e elif self.provider == "gemini": try: import google.generativeai as genai config = genai.GenerationConfig( temperature=params["temperature"], max_output_tokens=params["max_tokens"], top_p=params.get("top_p", 1.0) ) response = self.client.generate_content(prompt, generation_config=config) return response.text except Exception as e: logger.error(f"Gemini API error: {e}") raise e def generate_with_images( self, prompt: str, images_b64: List[str], temperature: float = 0.7, max_tokens: int = 16384, top_p: float = 1.0, **kwargs ) -> str: """ Generate with multiple images in a single request Args: prompt: Instruction prompt images_b64: List of base64 encoded images temperature: Temperature (0.0-2.0) max_tokens: Maximum number of tokens top_p: Top-p sampling (0.0-1.0) Returns: Generated text """ if not images_b64: logger.warning("No images provided, falling back to text-only generation") return self.generate(prompt, temperature, max_tokens, top_p, **kwargs) params = self._get_supported_params(temperature, max_tokens, top_p) if self.provider == "openai": # Build content with text and all images content = [{"type": "text", "text": prompt}] for img_b64 in images_b64: content.append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{img_b64}" } }) try: response = self.client.chat.completions.create( model=self.model, messages=[{"role": "user", "content": content}], **params ) return response.choices[0].message.content except Exception as e: logger.error(f"OpenAI API error: {e}") raise e elif self.provider == "anthropic": # Build content with text and all images content = [{"type": "text", "text": prompt}] for img_b64 in images_b64: content.append({ "type": "image", "source": { "type": "base64", "media_type": "image/jpeg", "data": img_b64 } }) try: response = self.client.messages.create( model=self.model, messages=[{"role": "user", "content": content}], **params ) return response.content[0].text except Exception as e: logger.error(f"Anthropic API error: {e}") raise e elif self.provider == "gemini": import google.generativeai as genai config = genai.GenerationConfig( temperature=params["temperature"], max_output_tokens=params["max_tokens"], top_p=params.get("top_p", 1.0) ) # Build content parts content_parts = [prompt] for img_b64 in images_b64: img_data = base64.b64decode(img_b64) img = Image.open(BytesIO(img_data)) content_parts.append(img) try: response = self.client.generate_content(content_parts, generation_config=config) return response.text except Exception as e: logger.error(f"Gemini API error: {e}") raise e else: raise ValueError(f"Unsupported provider: {self.provider}") def _load_screenshots_from_dir(result_dir: str, compress: bool = True, max_size: int = 800, quality: int = 85) -> tuple: """ Load all step screenshots from result directory and convert to base64 Args: result_dir: Path to result directory containing step_*.png files compress: Whether to compress images (default: True) max_size: Maximum dimension for compression (default: 800) quality: JPEG quality for compression (default: 85) Returns: Tuple of (list of base64 encoded screenshot strings, list of short filenames like 'step_1', 'step_2', ...) """ screenshots = [] filenames = [] # Find all step screenshot files (e.g., step_1_20240101@120000.png) pattern = os.path.join(result_dir, "step_*.png") screenshot_files = sorted(glob.glob(pattern)) if not screenshot_files: logger.warning(f"No screenshot files found in {result_dir}") return screenshots, filenames import re as _re for filepath in screenshot_files: try: with open(filepath, "rb") as f: img_data = f.read() img_b64 = base64.b64encode(img_data).decode('utf-8') # Compress if enabled if compress: img_b64 = _compress_image(img_b64, max_size=max_size, quality=quality) screenshots.append(img_b64) # Extract short name like 'step_1' from 'step_1_20240101@120000.png' basename = os.path.basename(filepath) match = _re.match(r'(step_\d+)', basename) short_name = match.group(1) if match else basename filenames.append(short_name) except Exception as e: logger.error(f"Error loading screenshot {filepath}: {e}") logger.info(f"Loaded {len(screenshots)} screenshots from {result_dir}: {filenames}") return screenshots, filenames def vllm_eval(result_state, **options) -> float: """ Evaluate task completion using vision-language model Args: result_state: Current state description **options: Additional options including: - result_dir: Path to result directory containing step screenshots (recommended) - screenshots: List of base64 encoded screenshots (deprecated, use result_dir instead) - instruction: Task instruction - eval_model: Model name to use - compress_images: Whether to compress images (default: True) - max_image_size: Maximum image dimension for compression (default: 800) - image_quality: JPEG quality for compression (default: 85) - temperature: Temperature parameter - max_tokens: Maximum tokens - top_p: Top-p parameter Returns: Score between 0.0 and 1.0 """ # Try to load screenshots from result_dir if provided result_dir = options.get("result_dir", None) screenshots = options.get("screenshots", []) # Image compression options compress_images = options.get("compress_images", True) max_image_size = options.get("max_image_size", 800) image_quality = options.get("image_quality", 85) screenshot_filenames = [] # Short names like 'step_1', 'step_2', ... if result_dir and not screenshots: screenshots, screenshot_filenames = _load_screenshots_from_dir( result_dir, compress=compress_images, max_size=max_image_size, quality=image_quality ) logger.info(f"Loaded {len(screenshots)} screenshots from result_dir: {result_dir}") elif screenshots: logger.info(f"Using {len(screenshots)} screenshots from options") screenshot_filenames = [f"step_{i+1}" for i in range(len(screenshots))] # Compress screenshots if needed if compress_images: logger.info("Compressing provided screenshots...") screenshots = [_compress_image(img, max_size=max_image_size, quality=image_quality) for img in screenshots] instruction = options.get("instruction", "") eval_model = options.get("eval_model", "gpt-4-vision-preview") config = options.get("config", []) metadata = options.get("metadata", {}) params = { "temperature": options.get("temperature", 0.7), "max_tokens": options.get("max_tokens", 16384), "top_p": options.get("top_p", 1.0) } llm = UnifiedLLM(eval_model) # Build pre-configured environment description from config preconfig_items = [] for cfg in config: if cfg.get("type") == "launch": cmds = cfg.get("parameters", {}).get("command", []) if cmds: app_name = os.path.basename(cmds[0]) if cmds else "unknown" preconfig_items.append(f"Application '{app_name}' was automatically launched before the agent started.") elif cfg.get("type") == "sleep": pass # not relevant to scoring elif cfg.get("type") == "open": path = cfg.get("parameters", {}).get("path", "") preconfig_items.append(f"File/URL '{path}' was automatically opened before the agent started.") preconfig_section = "" if preconfig_items: preconfig_desc = "\n".join(f" - {item}" for item in preconfig_items) preconfig_section = f""" PRE-CONFIGURED ENVIRONMENT (done BEFORE the agent started, NOT the agent's work): {preconfig_desc} IMPORTANT: The above actions were performed automatically as part of environment setup. The agent did NOT perform these actions. Do NOT give ANY credit for them. For example, if the application was pre-launched, the agent merely having the application open is worth 0 points - that was the starting state.""" # Build expected steps section from metadata expected_steps_section = "" if metadata.get("steps"): expected_steps_section = f""" EXPECTED STEPS for this task (use as reference for what the agent should have done): {metadata['steps']} NOTE: Evaluate the screenshots against these expected steps. Only give credit for steps that show VISIBLE evidence of completion BEYOND the pre-configured starting state.""" # Build image list description for the prompt if screenshot_filenames: img_list_str = ", ".join(screenshot_filenames) img_info = f"""\nYou are provided with exactly {len(screenshot_filenames)} screenshots in chronological order: {img_list_str} The FIRST screenshot is: {screenshot_filenames[0]} The LAST screenshot (final state): {screenshot_filenames[-1]} IMPORTANT: Only reference screenshots from the list above. Do NOT reference any screenshot that is not listed.""" else: img_info = "\nNo screenshots were provided." prompt = f"""You are a STRICT and RIGOROUS evaluator for desktop environment tasks. Your job is to score ONLY based on concrete, visible evidence of task completion in the screenshots. Task Instruction: {instruction} {preconfig_section} {expected_steps_section} {img_info} Analyze ONLY the FINAL screenshot ({screenshot_filenames[-1] if screenshot_filenames else 'N/A'}) to determine the end state, while using earlier screenshots for context. CRITICAL SCORING RULES: 1. Score ONLY based on what the AGENT actually accomplished. The pre-configured environment (application already launched, files already opened, etc.) is the STARTING STATE and worth 0 points. 2. Score ONLY based on what is ACTUALLY VISIBLE in the screenshots. Do NOT give credit for assumed or potential progress. 3. If the screenshots show NO meaningful action beyond the initial pre-configured state, the score MUST be 0. 4. Do NOT give partial credit for "having the system on", "desktop being visible", "the application being open" (if it was pre-launched), or "the application being installed". These are prerequisites or pre-configured state, NOT progress. 5. Each point must correspond to a SPECIFIC, VERIFIABLE action that was successfully completed BY THE AGENT toward the task goal. SCORING GUIDE (0-10): - 0: No progress beyond the pre-configured starting state. If the app was pre-launched, merely having it open is 0. If the screenshots only show the desktop or the initial app state without any agent action, score is 0. - 1-2: The agent performed one minor action (e.g., clicked on a menu) but did not make meaningful progress toward the task goal. - 3-4: Some initial steps toward the task have been taken but the task is far from complete. - 5-6: Significant progress - about half the required steps are completed with visible evidence. - 7-8: Most steps are completed but the final result is not fully achieved or has minor issues. - 9: The task is essentially complete with very minor cosmetic differences. - 10: The task is perfectly and completely finished with clear evidence in the final screenshot. IMPORTANT: You must respond with ONLY a valid JSON object (no additional text before or after). Use the following exact format: {{ "steps_analysis": [ {{"step": "Step description", "status": "Success/Fail", "evidence_img": "step_X.png", "reason": "Brief explanation of VISIBLE evidence"}}, {{"step": "Another step", "status": "Success/Fail", "evidence_img": "step_Y.png", "reason": "Brief explanation of VISIBLE evidence"}} ], "final_completion": "True/False", "score": 0-10 }} Where: - "steps_analysis": Array of steps you identified from the screenshots. Each step must cite VISIBLE evidence from a specific screenshot. Do NOT include pre-configured actions as agent steps. - "status": Either "Success" or "Fail" for each step - "evidence_img": The screenshot filename that shows evidence for this step (e.g., "step_2.png") - "reason": Explanation of what is VISUALLY observed in the screenshot as evidence - "final_completion": "True" ONLY if the overall task is fully completed with clear visual proof, "False" otherwise - "score": Integer from 0 to 10, following the strict scoring guide above Remember: Return ONLY the JSON object, no additional text. Be STRICT - when in doubt, score LOWER.""" try: result = llm.generate_with_images( prompt=prompt, images_b64=screenshots, **params ) # Parse score from result score = _parse_score(result) logger.info(f"Evaluation result: {result}") logger.info(f"Parsed score: {score}") # Save raw result to file for reference if result_dir: eval_output_path = os.path.join(result_dir, "vllm_evaluation_result.json") with open(eval_output_path, "w", encoding="utf-8") as f: f.write(result) logger.info(f"Saved evaluation result to {eval_output_path}") return score except Exception as e: logger.error(f"Error during evaluation: {e}") return 0.0 def _parse_evaluation_response(text: str) -> Dict[str, Any]: """ Parse the JSON evaluation response from the model Returns: Dictionary containing steps_analysis, final_completion, and score """ import re import json # Try to extract JSON from the response # Sometimes models wrap JSON in markdown code blocks text = text.strip() # Remove markdown code blocks if present if text.startswith("```"): # Extract content between ``` markers match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL) if match: text = match.group(1) else: # Try to remove opening and closing ``` text = re.sub(r'^```(?:json)?\s*', '', text) text = re.sub(r'\s*```$', '', text) try: result = json.loads(text) # Validate required fields if "steps_analysis" not in result: logger.warning("Missing 'steps_analysis' field in response") result["steps_analysis"] = [] if "final_completion" not in result: logger.warning("Missing 'final_completion' field in response") result["final_completion"] = "False" if "score" not in result: logger.warning("Missing 'score' field in response") result["score"] = 0 return result except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON response: {e}") logger.error(f"Response text: {text[:500]}") # Return a default structure return { "steps_analysis": [], "final_completion": "False", "score": 0 } def _parse_score(text: str) -> float: """ Parse score from model response and convert to 0.0-1.0 range Args: text: Raw model response (expected to be JSON format) Returns: Score between 0.0 and 1.0 """ result = _parse_evaluation_response(text) # Extract score (0-10) and convert to 0.0-1.0 score = result.get("score", 0) try: score = float(score) # Clamp to [0, 10] then normalize to [0.0, 1.0] score = max(0.0, min(10.0, score)) normalized_score = score / 10.0 logger.info(f"Final completion: {result.get('final_completion')}") logger.info(f"Raw score (0-10): {score}, Normalized score (0-1): {normalized_score}") # Log steps analysis if available steps = result.get("steps_analysis", []) if steps: logger.info(f"Steps analysis ({len(steps)} steps):") for i, step in enumerate(steps): logger.info(f" Step {i+1}: {step.get('step', 'N/A')} - {step.get('status', 'N/A')}") return normalized_score except (ValueError, TypeError) as e: logger.warning(f"Could not parse score: {e}") return 0.0