diff --git a/evaluation_examples/extract_instructions.py b/evaluation_examples/extract_instructions.py
new file mode 100644
index 0000000..fb721b1
--- /dev/null
+++ b/evaluation_examples/extract_instructions.py
@@ -0,0 +1,604 @@
+import os
+import re
+import sys
+import asyncio
+import aiohttp
+import base64
+import logging
+from pathlib import Path
+from typing import List, Optional
+import tempfile
+import shutil
+from dataclasses import dataclass
+from datetime import datetime
+import json
+
+# Configuration
+SCRIPT_DIR = Path(__file__).parent
+PROJECT_ROOT = SCRIPT_DIR.parent
+
+API_BASE_URL = os.getenv("OPENAI_BASE_URL")
+API_URL = f"{API_BASE_URL}/chat/completions" if API_BASE_URL else None
+API_KEY = os.getenv("OPENAI_API_KEY")
+MODEL_NAME = "gemini-2.5-pro"
+MAX_CONCURRENT_REQUESTS = 5
+INPUT_FOLDER = "/Users/cuihang/Downloads/test_files"
+EXAMPLES_FOLDER = PROJECT_ROOT / "evaluation_examples" / "examples"
+TEST_ALL_JSON = PROJECT_ROOT / "evaluation_examples" / "test_all.json"
+
+# Retry configuration
+MAX_RETRY_ATTEMPTS = 3
+RETRY_DELAY = 5
+RETRY_BACKOFF = 2
+
+# Image limit
+MAX_IMAGES_PER_REQUEST = 50
+
+# Supported file extensions
+SUPPORTED_EXTENSIONS = {'.docx', '.doc', '.ppt', '.pptx', '.pdf', '.mp4', '.avi', '.mov', '.mkv'}
+
+SYSTEM_PROMPT = """You are an AI assistant that generates precise, executable step-by-step instructions for desktop software operations.
+
+Your task:
+Convert the provided document information into precise operation instructions that can be executed step-by-step by an AI agent in a software GUI.
+
+Output requirements (no additional explanatory text):
+------------------------------------------------
+
+[Task Goal]
+Describe in one sentence the final task result to be achieved in the software.
+
+[Input Files]
+Specify the file names, types, and locations involved in this operation.
+- If the document provides complete paths, record them as is
+- If only file names are mentioned (e.g., data.xlsx), record the filename and note "complete path not specified in document"
+- If no input files are mentioned, write "no input files required"
+
+[Detailed Operation Steps (GUI Level)]
+Break down the task into atomic GUI operation steps.
+Each step must meet the following conditions:
+- Contains only one explicit, indivisible GUI atomic action
+- Must specify the menus, panels, buttons, or controls involved
+- Must specify parameter names and option values involved
+- Arranged in the actual operation order of the software
+- Must include software launch steps (e.g., double-click desktop icon, launch from start menu, etc.)
+
+Step format example:
+1. Double-click the [Software Name] icon on the desktop to launch the software.
+2. Click "File → Open" in the main menu bar.
+3. In the file selection dialog, navigate to the specified directory and select file [filename].
+4. Click the "Open" button to confirm.
+5. ... (and so on)
+
+------------------------------------------------
+
+[Handling Uncertain Information]
+- Strictly generate operation steps based on document content, do not add features or menus not mentioned
+- If operation steps are unclear or ambiguous, infer based on common software operation flows
+- If parameter values in the document are unclear, note "[set according to actual needs]" in the step
+
+[Output Format]
+Output in JSON format with the following fields:
+{
+    "input_files": ["file1", "file2", "..."],
+    "task_goal": "...",
+    "steps": "A string containing all operation steps, arranged in order, with numbered prefix for each step, separated by newlines"
+}
+Note: Output must be strict JSON format, with no extra text or explanations."""
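+
+# A minimal illustration of the JSON shape the prompt above requests; the
+# values here are hypothetical, not taken from any real document:
+# {
+#     "input_files": ["data.xlsx (complete path not specified in document)"],
+#     "task_goal": "Sort the table in data.xlsx by the revenue column",
+#     "steps": "1. Double-click the LibreOffice Calc icon on the desktop...\n2. Click \"File → Open\"..."
+# }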
+
+
+# Logging configuration
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ProcessingStats:
+    """Processing statistics tracker"""
+    total_files: int = 0
+    completed_files: int = 0
+    failed_files: int = 0
+    retried_files: int = 0
+    start_time: Optional[datetime] = None
+    failed_list: Optional[List[tuple]] = None
+
+    def __post_init__(self):
+        if self.start_time is None:
+            self.start_time = datetime.now()
+        if self.failed_list is None:
+            self.failed_list = []
+
+    def add_completed(self):
+        self.completed_files += 1
+        self._log_progress()
+
+    def add_failed(self, file_path: str, error: str):
+        self.failed_files += 1
+        self.failed_list.append((file_path, error))
+        self._log_progress()
+
+    def add_retry(self):
+        self.retried_files += 1
+
+    def _log_progress(self):
+        processed = self.completed_files + self.failed_files
+        percentage = (processed / self.total_files * 100) if self.total_files > 0 else 0
+        elapsed = (datetime.now() - self.start_time).total_seconds()
+
+        # Estimate remaining time from the average per-file duration so far
+        if processed > 0:
+            avg_time = elapsed / processed
+            remaining = (self.total_files - processed) * avg_time
+            eta = f"{int(remaining // 60)}m{int(remaining % 60)}s"
+        else:
+            eta = "calculating..."
+
+        logger.info(f"Progress: {processed}/{self.total_files} ({percentage:.1f}%) | "
+                    f"Success: {self.completed_files} | Failed: {self.failed_files} | "
+                    f"Retried: {self.retried_files} | ETA: {eta}")
+
+    def print_summary(self):
+        elapsed = (datetime.now() - self.start_time).total_seconds()
+        logger.info("=" * 60)
+        logger.info("Processing Complete")
+        logger.info("=" * 60)
+        logger.info(f"Total files: {self.total_files}")
+        logger.info(f"Success: {self.completed_files}")
+        logger.info(f"Failed: {self.failed_files}")
+        logger.info(f"Total retries: {self.retried_files}")
+        logger.info(f"Total time: {int(elapsed // 60)}m{int(elapsed % 60)}s")
+
+        if self.failed_list:
+            logger.info("\nFailed files:")
+            for file_path, error in self.failed_list:
+                logger.info(f"  - {file_path}")
+                logger.info(f"    Error: {error}")
+
+        self._save_report()
+
+    def _save_report(self):
+        report = {
+            "total_files": self.total_files,
+            "completed": self.completed_files,
+            "failed": self.failed_files,
+            "retries": self.retried_files,
+            "start_time": self.start_time.isoformat(),
+            "end_time": datetime.now().isoformat(),
+            "elapsed_seconds": (datetime.now() - self.start_time).total_seconds(),
+            "failed_files": [{"file": f, "error": e} for f, e in self.failed_list]
+        }
+
+        report_file = EXAMPLES_FOLDER / "processing_report.json"
+        with open(report_file, 'w', encoding='utf-8') as f:
+            json.dump(report, f, ensure_ascii=False, indent=2)
+
+        logger.info(f"\nDetailed report saved to: {report_file}")
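+
+# For reference, _save_report() above writes a processing_report.json shaped
+# roughly like the following (values illustrative):
+# {
+#     "total_files": 12, "completed": 10, "failed": 2, "retries": 3,
+#     "start_time": "2024-01-01T10:00:00", "end_time": "2024-01-01T10:05:30",
+#     "elapsed_seconds": 330.0,
+#     "failed_files": [{"file": "/path/to/broken.pdf", "error": "[API call timeout]"}]
+# }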
+
+
+stats = ProcessingStats()
+software_tests = {}
+
+
+def check_dependencies():
+    """Check and prompt for missing dependencies"""
+    missing = []
+
+    try:
+        import pdf2image
+    except ImportError:
+        missing.append("pdf2image")
+
+    try:
+        import PIL
+    except ImportError:
+        missing.append("Pillow")
+
+    try:
+        import cv2
+    except ImportError:
+        missing.append("opencv-python or opencv-python-headless")
+
+    if not shutil.which("soffice") and not shutil.which("libreoffice"):
+        logger.warning("LibreOffice not detected, cannot convert .doc and .ppt files")
+        logger.info("Install: sudo apt-get install libreoffice (Linux) or download from https://www.libreoffice.org/")
+
+    if missing:
+        logger.error(f"Missing dependencies: {', '.join(missing)}")
+        logger.info(f"Install with: pip install {' '.join(missing)}")
+        logger.info("Note: pdf2image also requires poppler")
+        logger.info("  - Ubuntu/Debian: sudo apt-get install poppler-utils")
+        logger.info("  - macOS: brew install poppler")
+        logger.info("  - Windows: download from https://github.com/oschwartz10612/poppler-windows/releases/")
+        return False
+    return True
+
+
+def convert_pdf_to_images(pdf_path: str) -> List[str]:
+    """Convert PDF pages to base64-encoded JPEG images"""
+    try:
+        from pdf2image import convert_from_path
+        import io
+
+        images = convert_from_path(pdf_path, dpi=150, fmt='jpeg')
+        base64_images = []
+
+        for img in images:
+            buffer = io.BytesIO()
+            img.save(buffer, format='JPEG', quality=100)
+            img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
+            base64_images.append(img_base64)
+
+        return base64_images
+    except Exception as e:
+        logger.error(f"PDF conversion failed for {pdf_path}: {str(e)}")
+        return []
+
+
+def convert_office_to_pdf(input_path: str) -> Optional[str]:
+    """Convert Office documents to PDF using LibreOffice"""
+    try:
+        import subprocess
+
+        temp_dir = tempfile.mkdtemp()
+        soffice_cmd = "soffice" if shutil.which("soffice") else "libreoffice"
+
+        cmd = [
+            soffice_cmd,
+            "--headless",
+            "--convert-to", "pdf",
+            "--outdir",
+            temp_dir,
+            input_path
+        ]
+
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
+
+        if result.returncode == 0:
+            pdf_name = Path(input_path).stem + ".pdf"
+            pdf_path = os.path.join(temp_dir, pdf_name)
+
+            if os.path.exists(pdf_path):
+                return pdf_path
+
+        logger.error(f"LibreOffice conversion failed: {result.stderr}")
+        # Remove the temp dir on failure so repeated runs don't leak directories
+        shutil.rmtree(temp_dir, ignore_errors=True)
+        return None
+
+    except Exception as e:
+        logger.error(f"Office conversion failed for {input_path}: {str(e)}")
+        return None
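+
+# convert_office_to_pdf() is equivalent to invoking LibreOffice by hand, e.g.
+# (the output directory shown is illustrative):
+#   soffice --headless --convert-to pdf --outdir /tmp/tmpXXXX report.docx
+# which writes /tmp/tmpXXXX/report.pdf on success.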
+
+
+def convert_document_to_images(file_path: str) -> List[str]:
+    """Convert any supported document to base64-encoded images"""
+    file_ext = Path(file_path).suffix.lower()
+
+    if file_ext == '.pdf':
+        return convert_pdf_to_images(file_path)
+
+    elif file_ext in ['.docx', '.doc', '.ppt', '.pptx']:
+        pdf_path = convert_office_to_pdf(file_path)
+        if pdf_path:
+            images = convert_pdf_to_images(pdf_path)
+            # Best-effort cleanup of the intermediate PDF and its temp dir
+            try:
+                os.remove(pdf_path)
+                os.rmdir(os.path.dirname(pdf_path))
+            except OSError:
+                pass
+            return images
+        return []
+
+    elif file_ext in ['.mp4', '.avi', '.mov', '.mkv']:
+        return extract_video_frames(file_path)
+
+    return []
+
+
+def extract_video_frames(video_path: str, num_frames: int = 10) -> List[str]:
+    """Extract evenly spaced frames from a video as base64-encoded JPEGs"""
+    try:
+        import cv2
+
+        cap = cv2.VideoCapture(video_path)
+        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+
+        if total_frames == 0:
+            cap.release()
+            return []
+
+        # Sample num_frames indices evenly spaced inside the video, skipping both endpoints
+        frame_indices = [int(total_frames * i / (num_frames + 1)) for i in range(1, num_frames + 1)]
+        base64_frames = []
+
+        for idx in frame_indices:
+            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
+            ret, frame = cap.read()
+
+            if ret:
+                # Downscale wide frames to at most 1280px to keep the request payload small
+                height, width = frame.shape[:2]
+                if width > 1280:
+                    scale = 1280 / width
+                    frame = cv2.resize(frame, (1280, int(height * scale)))
+
+                _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
+                frame_base64 = base64.b64encode(buffer).decode('utf-8')
+                base64_frames.append(frame_base64)
+
+        cap.release()
+        return base64_frames
+
+    except Exception as e:
+        logger.error(f"Video frame extraction failed for {video_path}: {str(e)}")
+        return []
+
+
+async def call_api_single_batch(images_batch: List[str], file_type: str,
+                                session: aiohttp.ClientSession, batch_num: int = 0) -> tuple[str, bool, int]:
+    """
+    Call the API to process a single batch of images.
+    Returns: (content, success, status_code)
+    """
+    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
+
+    batch_info = f" (batch {batch_num})" if batch_num > 0 else ""
+    content = [
+        {"type": "text", "text": f"Please analyze the following {file_type} pages/frames{batch_info} and extract the operation workflow:"}
+    ]
+
+    for img_b64 in images_batch:
+        content.append({
+            "type": "image_url",
+            "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}
+        })
+
+    messages.append({"role": "user", "content": content})
+
+    try:
+        headers = {
+            "Authorization": f"Bearer {API_KEY}",
+            "Content-Type": "application/json"
+        }
+
+        payload = {
+            "model": MODEL_NAME,
+            "messages": messages,
+            "max_tokens": 8192
+        }
+
+        # Explicit total timeout for the whole request
+        async with session.post(API_URL, headers=headers, json=payload,
+                                timeout=aiohttp.ClientTimeout(total=180)) as response:
+            status_code = response.status
+            if status_code == 200:
+                result = await response.json()
+                return result['choices'][0]['message']['content'], True, status_code
+            else:
+                error_text = await response.text()
+                return f"[API call failed: {status_code}]\n{error_text}", False, status_code
+
+    except asyncio.TimeoutError:
+        return "[API call timeout]", False, 0
+    except Exception as e:
+        return f"[API call error: {str(e)}]", False, 0
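+
+# call_api_single_batch() assumes an OpenAI-compatible chat-completions response,
+# roughly:
+#   {"choices": [{"message": {"content": "..."}}], ...}
+# which is why the success path indexes result['choices'][0]['message']['content'].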
+
+
+async def call_multimodal_api_with_retry(file_path: str, session: aiohttp.ClientSession) -> tuple[str, bool]:
+    """
+    Call the multimodal API to analyze document images, with a retry mechanism.
+    Returns: (content, success)
+    """
+    images_base64 = convert_document_to_images(file_path)
+
+    if not images_base64:
+        error_msg = f"[Document conversion failed: unable to convert {Path(file_path).name} to images]"
+        return error_msg, False
+
+    file_type = "video" if Path(file_path).suffix.lower() in ['.mp4', '.avi', '.mov', '.mkv'] else "document"
+
+    # Truncate overly long documents to the first MAX_IMAGES_PER_REQUEST pages/frames
+    if len(images_base64) > MAX_IMAGES_PER_REQUEST:
+        images_base64 = images_base64[:MAX_IMAGES_PER_REQUEST]
+
+    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
+        try:
+            content, success, status_code = await call_api_single_batch(images_base64, file_type, session)
+
+            if success:
+                return content, True
+
+            # 413 means the payload is too large; retrying will not help
+            if status_code == 413:
+                return "[File too large: server refused to process the file]", False
+
+            if attempt < MAX_RETRY_ATTEMPTS:
+                delay = RETRY_DELAY * (RETRY_BACKOFF ** (attempt - 1))
+                logger.info(f"\nRetry {attempt}/{MAX_RETRY_ATTEMPTS}: {Path(file_path).name} (waiting {delay}s)")
+                stats.add_retry()
+                await asyncio.sleep(delay)
+                continue
+
+            return content, False
+
+        except asyncio.TimeoutError:
+            if attempt < MAX_RETRY_ATTEMPTS:
+                delay = RETRY_DELAY * (RETRY_BACKOFF ** (attempt - 1))
+                logger.info(f"\nRetry {attempt}/{MAX_RETRY_ATTEMPTS}: {Path(file_path).name} (timeout, waiting {delay}s)")
+                stats.add_retry()
+                await asyncio.sleep(delay)
+                continue
+            return "[API call timeout]", False
+
+        except Exception as e:
+            if attempt < MAX_RETRY_ATTEMPTS:
+                delay = RETRY_DELAY * (RETRY_BACKOFF ** (attempt - 1))
+                logger.info(f"\nRetry {attempt}/{MAX_RETRY_ATTEMPTS}: {Path(file_path).name} (error, waiting {delay}s)")
+                stats.add_retry()
+                await asyncio.sleep(delay)
+                continue
+            return f"[API call error: {str(e)}]", False
+
+    return "[Max retry attempts reached]", False
+
+
+async def process_file(file_path: str, session: aiohttp.ClientSession,
+                       semaphore: asyncio.Semaphore):
+    """Process a single file"""
+    async with semaphore:
+        try:
+            content, success = await call_multimodal_api_with_retry(file_path, session)
+
+            file_path_obj = Path(file_path).resolve()
+            input_folder_obj = Path(INPUT_FOLDER).resolve()
+
+            # The first directory level under INPUT_FOLDER names the target software
+            try:
+                rel_path = file_path_obj.relative_to(input_folder_obj)
+                software_name = rel_path.parts[0] if len(rel_path.parts) > 1 else "unknown"
+            except ValueError:
+                software_name = "unknown"
+
+            file_stem = file_path_obj.stem
+            test_id = file_stem
+            output_file = EXAMPLES_FOLDER / software_name / f"{file_stem}.json"
+            output_file.parent.mkdir(parents=True, exist_ok=True)
+
+            # Strip an optional ```json ... ``` fence around the model output
+            match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
+            content = match.group(1) if match else content
+
+            if success:
+                api_result = json.loads(content)
+
+                data = {
+                    "id": test_id,
+                    "snapshot": "snapshot",
+                    "instruction": api_result.get("steps", ""),
+                    "source": "custom",
+                    "config": [],
+                    "trajectory": "trajectories/",
+                    "related_apps": [software_name],
+                    "evaluator": {
+                        "postconfig": [
+                            {
+                                "type": "sleep",
+                                "parameters": {
+                                    "seconds": 3
+                                }
+                            }
+                        ],
+                        "func": "vllm_eval"
+                    },
+                    "proxy": False,
+                    "fixed_ip": False,
+                    "possibility_of_env_change": "low",
+                    "metadata": {
+                        "input_files": api_result.get("input_files", []),
+                        "task_goal": api_result.get("task_goal", "")
+                    }
+                }
+
+                if software_name not in software_tests:
+                    software_tests[software_name] = []
+                software_tests[software_name].append(test_id)
+
+            else:
+                data = {
+                    "id": test_id,
+                    "error": content,
+                    "status": "failed"
+                }
+
+            with open(output_file, 'w', encoding='utf-8') as f:
+                json.dump(data, f, ensure_ascii=False, indent=2)
+
+            if success:
+                stats.add_completed()
+            else:
+                stats.add_failed(file_path, content)
+
+        except Exception as e:
+            error_msg = str(e)
+            stats.add_failed(file_path, error_msg)
+            logger.error(f"\nError processing {file_path}: {error_msg}")
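+
+# Sketch of the directory mapping process_file() assumes (paths illustrative):
+#   INPUT_FOLDER/gimp/crop_image.mp4
+#     -> software_name "gimp", test_id "crop_image"
+#     -> EXAMPLES_FOLDER/gimp/crop_image.json
+# Files sitting directly in INPUT_FOLDER fall back to software_name "unknown".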
+
+
+def find_all_files(input_folder: str) -> List[str]:
+    """Recursively find all supported files"""
+    all_files = []
+
+    for root, dirs, files in os.walk(input_folder):
+        for file in files:
+            file_path = os.path.join(root, file)
+            if Path(file_path).suffix.lower() in SUPPORTED_EXTENSIONS:
+                all_files.append(file_path)
+
+    return all_files
+
+
+def save_test_all_json():
+    """Save the aggregated test_all.json index"""
+    test_all_path = Path(TEST_ALL_JSON)
+    if test_all_path.exists():
+        with open(test_all_path, 'r', encoding='utf-8') as f:
+            existing_data = json.load(f)
+    else:
+        existing_data = {}
+
+    # Merge new test ids into any existing index, deduplicating per software
+    for software, test_ids in software_tests.items():
+        if software in existing_data:
+            existing_data[software] = list(set(existing_data[software] + test_ids))
+        else:
+            existing_data[software] = test_ids
+
+    test_all_path.parent.mkdir(parents=True, exist_ok=True)
+    with open(test_all_path, 'w', encoding='utf-8') as f:
+        json.dump(existing_data, f, ensure_ascii=False, indent=2)
+
+    logger.info(f"\nTest index updated: {test_all_path}")
+    logger.info(f"Software included: {list(existing_data.keys())}")
+
+
+async def main():
+    """Main function"""
+    if not check_dependencies():
+        return
+
+    # Fail fast if the API endpoint is not configured; API_URL is None
+    # whenever OPENAI_BASE_URL is unset
+    if not API_URL or not API_KEY:
+        logger.error("OPENAI_BASE_URL and OPENAI_API_KEY must be set in the environment")
+        return
+
+    if not Path(INPUT_FOLDER).exists():
+        logger.error(f"Input directory does not exist: {INPUT_FOLDER}")
+        return
+
+    Path(EXAMPLES_FOLDER).mkdir(parents=True, exist_ok=True)
+
+    logger.info("Scanning files...")
+    logger.info(f"Input directory: {INPUT_FOLDER}")
+    logger.info(f"Output directory: {EXAMPLES_FOLDER}")
+    logger.info(f"Test index file: {TEST_ALL_JSON}\n")
+
+    files = find_all_files(INPUT_FOLDER)
+    stats.total_files = len(files)
+
+    logger.info(f"Found {len(files)} files")
+    logger.info(f"Configuration: max retries={MAX_RETRY_ATTEMPTS}, concurrency={MAX_CONCURRENT_REQUESTS}")
+    logger.info("=" * 60 + "\n")
+
+    if not files:
+        logger.warning("No supported files found")
+        return
+
+    semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
+
+    async with aiohttp.ClientSession() as session:
+        tasks = [
+            process_file(file, session, semaphore)
+            for file in files
+        ]
+        await asyncio.gather(*tasks, return_exceptions=True)
+
+    save_test_all_json()
+    stats.print_summary()
+
+    logger.info("\nCompleted!")
+    logger.info(f"  - Test cases saved to: {EXAMPLES_FOLDER}")
+    logger.info(f"  - Test index updated: {TEST_ALL_JSON}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
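+
+# Usage sketch (assumes the two environment variables the script reads are set;
+# the endpoint below is a placeholder, not a real gateway):
+#   export OPENAI_BASE_URL=https://your-gateway.example.com/v1
+#   export OPENAI_API_KEY=sk-...
+#   python evaluation_examples/extract_instructions.py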