# sci-gui-agent-benchmark/evaluation_examples/extract_instructions.py
import os
import sys
import asyncio
import aiohttp
import base64
import logging
from pathlib import Path
from typing import List, Optional
import tempfile
import shutil
from dataclasses import dataclass
from datetime import datetime
import json
# Configuration
SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = SCRIPT_DIR.parent
API_BASE_URL = os.getenv("OPENAI_BASE_URL")
API_URL = f"{API_BASE_URL}/chat/completions" if API_BASE_URL else None
API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_NAME = "gemini-2.5-pro"
MAX_CONCURRENT_REQUESTS = 5
INPUT_FOLDER = "/Users/cuihang/Downloads/test_files"
EXAMPLES_FOLDER = PROJECT_ROOT / "evaluation_examples" / "examples"
TEST_ALL_JSON = PROJECT_ROOT / "evaluation_examples" / "test_all.json"
# Retry configuration
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 5
RETRY_BACKOFF = 2
# Image limit
MAX_IMAGES_PER_REQUEST = 50
# Supported file extensions
SUPPORTED_EXTENSIONS = {'.docx', '.doc', '.ppt', '.pptx', '.pdf', '.mp4', '.avi', '.mov', '.mkv'}
SYSTEM_PROMPT = """You are an AI assistant that generates precise, executable step-by-step instructions for desktop software operations.
Your task:
Convert the provided document information into precise operation instructions that can be executed step-by-step by an AI agent in a software GUI.
Output requirements (no additional explanatory text):
------------------------------------------------
[Task Goal]
Describe in one sentence the final task result to be achieved in the software.
[Input Files]
Specify the file names, types, and locations involved in this operation.
- If the document provides complete paths, record them as is
- If only file names are mentioned (e.g., data.xlsx), record the filename and note "complete path not specified in document"
- If no input files are mentioned, write "no input files required"
[Detailed Operation Steps (GUI Level)]
Break down the task into atomic GUI operation steps.
Each step must meet the following conditions:
- Contains only one explicit, indivisible GUI atomic action
- Must specify the menus, panels, buttons, or controls involved
- Must specify parameter names and option values involved
- Arranged in the actual operation order of the software
- Must include software launch steps (e.g., double-click desktop icon, launch from start menu, etc.)
Step format example:
1. Double-click the [Software Name] icon on the desktop to launch the software.
2. Click "File → Open" in the main menu bar.
3. In the file selection dialog, navigate to the specified directory and select file [filename].
4. Click the "Open" button to confirm.
5. ... (and so on)
------------------------------------------------
[Handling Uncertain Information]
- Generate operation steps strictly from the document content; do not add features or menus that are not mentioned
- If operation steps are unclear or ambiguous, infer them from common software operation flows
- If parameter values in the document are unclear, note "[set according to actual needs]" in the step
[Output Format]
Output in JSON format with the following fields:
{
"input_files": ["file1", "file2", "..."],
"task_goal": "...",
"steps": "A string containing all operation steps, arranged in order, with numbered prefix for each step, separated by newlines"
}
Note: Output must be strict JSON format, with no extra text or explanations."""
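# For reference, a response in the format the system prompt asks for might look like
# the following (hypothetical values, shown only to illustrate the expected JSON shape):
# {
#   "input_files": ["data.xlsx (complete path not specified in document)"],
#   "task_goal": "Create a bar chart from the monthly sales figures in data.xlsx.",
#   "steps": "1. Double-click the [LibreOffice Calc] icon on the desktop to launch the software.\n2. Click \"File → Open\" in the main menu bar.\n3. ..."
# }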
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
@dataclass
class ProcessingStats:
"""Processing statistics tracker"""
total_files: int = 0
completed_files: int = 0
failed_files: int = 0
retried_files: int = 0
    start_time: Optional[datetime] = None
    failed_list: Optional[List[tuple]] = None
def __post_init__(self):
if self.start_time is None:
self.start_time = datetime.now()
if self.failed_list is None:
self.failed_list = []
def add_completed(self):
self.completed_files += 1
self._log_progress()
def add_failed(self, file_path: str, error: str):
self.failed_files += 1
self.failed_list.append((file_path, error))
self._log_progress()
def add_retry(self):
self.retried_files += 1
def _log_progress(self):
processed = self.completed_files + self.failed_files
percentage = (processed / self.total_files * 100) if self.total_files > 0 else 0
elapsed = (datetime.now() - self.start_time).total_seconds()
if processed > 0:
avg_time = elapsed / processed
remaining = (self.total_files - processed) * avg_time
eta = f"{int(remaining // 60)}m{int(remaining % 60)}s"
else:
eta = "calculating..."
logger.info(f"Progress: {processed}/{self.total_files} ({percentage:.1f}%) | "
f"Success: {self.completed_files} | Failed: {self.failed_files} | "
f"Retried: {self.retried_files} | ETA: {eta}")
def print_summary(self):
elapsed = (datetime.now() - self.start_time).total_seconds()
logger.info("=" * 60)
logger.info("Processing Complete")
logger.info("=" * 60)
logger.info(f"Total files: {self.total_files}")
logger.info(f"Success: {self.completed_files}")
logger.info(f"Failed: {self.failed_files}")
logger.info(f"Total retries: {self.retried_files}")
logger.info(f"Total time: {int(elapsed // 60)}m{int(elapsed % 60)}s")
if self.failed_list:
logger.info("\nFailed files:")
for file_path, error in self.failed_list:
logger.info(f" - {file_path}")
logger.info(f" Error: {error}")
self._save_report()
def _save_report(self):
report = {
"total_files": self.total_files,
"completed": self.completed_files,
"failed": self.failed_files,
"retries": self.retried_files,
"start_time": self.start_time.isoformat(),
"end_time": datetime.now().isoformat(),
"elapsed_seconds": (datetime.now() - self.start_time).total_seconds(),
"failed_files": [{"file": f, "error": e} for f, e in self.failed_list]
}
report_file = Path(EXAMPLES_FOLDER) / "processing_report.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
logger.info(f"\nDetailed report saved to: {report_file}")
stats = ProcessingStats()
software_tests = {}
def check_dependencies():
"""Check and prompt for missing dependencies"""
missing = []
try:
import pdf2image
except ImportError:
missing.append("pdf2image")
try:
import PIL
except ImportError:
missing.append("Pillow")
try:
import cv2
except ImportError:
missing.append("opencv-python or opencv-python-headless")
if not shutil.which("soffice") and not shutil.which("libreoffice"):
logger.warning("LibreOffice not detected, cannot convert .doc and .ppt files")
logger.info("Install: sudo apt-get install libreoffice (Linux) or download from https://www.libreoffice.org/")
if missing:
logger.error(f"Missing dependencies: {', '.join(missing)}")
logger.info(f"Install with: pip install {' '.join(missing)}")
logger.info("Note: pdf2image also requires poppler")
logger.info(" - Ubuntu/Debian: sudo apt-get install poppler-utils")
logger.info(" - macOS: brew install poppler")
logger.info(" - Windows: download from https://github.com/oschwartz10612/poppler-windows/releases/")
return False
return True
def convert_pdf_to_images(pdf_path: str) -> List[str]:
"""Convert PDF to base64-encoded images"""
try:
from pdf2image import convert_from_path
from PIL import Image
import io
images = convert_from_path(pdf_path, dpi=150, fmt='jpeg')
base64_images = []
for img in images:
buffer = io.BytesIO()
img.save(buffer, format='JPEG', quality=100)
img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
base64_images.append(img_base64)
return base64_images
except Exception as e:
logger.error(f"PDF conversion failed for {pdf_path}: {str(e)}")
return []
def convert_office_to_pdf(input_path: str) -> Optional[str]:
"""Convert Office documents to PDF using LibreOffice"""
try:
import subprocess
temp_dir = tempfile.mkdtemp()
soffice_cmd = "soffice" if shutil.which("soffice") else "libreoffice"
cmd = [
soffice_cmd,
"--headless",
"--convert-to", "pdf",
"--outdir", temp_dir,
input_path
]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        if result.returncode == 0:
            pdf_name = Path(input_path).stem + ".pdf"
            pdf_path = os.path.join(temp_dir, pdf_name)
            if os.path.exists(pdf_path):
                return pdf_path
        logger.error(f"LibreOffice conversion failed: {result.stderr}")
        # Clean up the temporary directory if conversion did not produce a PDF
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None
except Exception as e:
logger.error(f"Office conversion failed for {input_path}: {str(e)}")
return None
def convert_document_to_images(file_path: str) -> List[str]:
"""Convert any supported document to base64-encoded images"""
file_ext = Path(file_path).suffix.lower()
if file_ext == '.pdf':
return convert_pdf_to_images(file_path)
elif file_ext in ['.docx', '.doc', '.ppt', '.pptx']:
pdf_path = convert_office_to_pdf(file_path)
if pdf_path:
images = convert_pdf_to_images(pdf_path)
            try:
                os.remove(pdf_path)
                os.rmdir(os.path.dirname(pdf_path))
            except OSError:
                pass
return images
return []
elif file_ext in ['.mp4', '.avi', '.mov', '.mkv']:
return extract_video_frames(file_path)
return []
def extract_video_frames(video_path: str, num_frames: int = 10) -> List[str]:
"""Extract key frames from video"""
try:
import cv2
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Could not open video: {video_path}")
            return []
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames == 0:
            cap.release()
            return []
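        # Sample frame indices evenly across the video, skipping the very start and end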
frame_indices = [int(total_frames * i / (num_frames + 1)) for i in range(1, num_frames + 1)]
base64_frames = []
for idx in frame_indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if ret:
height, width = frame.shape[:2]
if width > 1280:
scale = 1280 / width
frame = cv2.resize(frame, (1280, int(height * scale)))
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
frame_base64 = base64.b64encode(buffer).decode('utf-8')
base64_frames.append(frame_base64)
cap.release()
return base64_frames
except Exception as e:
logger.error(f"Video frame extraction failed for {video_path}: {str(e)}")
return []
async def call_api_single_batch(images_batch: List[str], file_type: str,
session: aiohttp.ClientSession, batch_num: int = 0) -> tuple[str, bool, int]:
"""
Call API to process a single batch of images
Returns: (content, success, status_code)
"""
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
batch_info = f" (batch {batch_num})" if batch_num > 0 else ""
content = [
{"type": "text", "text": f"Please analyze the following {file_type} pages/frames{batch_info} and extract the operation workflow:"}
]
for img_b64 in images_batch:
content.append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}
})
messages.append({"role": "user", "content": content})
try:
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": MODEL_NAME,
"messages": messages,
"max_tokens": 8192
}
async with session.post(API_URL, headers=headers, json=payload, timeout=180) as response:
status_code = response.status
if status_code == 200:
result = await response.json()
return result['choices'][0]['message']['content'], True, status_code
else:
error_text = await response.text()
return f"[API call failed: {status_code}]\n{error_text}", False, status_code
except asyncio.TimeoutError:
return "[API call timeout]", False, 0
except Exception as e:
return f"[API call error: {str(e)}]", False, 0
async def call_multimodal_api_with_retry(file_path: str, session: aiohttp.ClientSession) -> tuple[str, bool]:
"""
Call multimodal API to analyze document images with retry mechanism
Returns: (content, success)
"""
images_base64 = convert_document_to_images(file_path)
if not images_base64:
error_msg = f"[Document conversion failed: unable to convert {Path(file_path).name} to images]"
return error_msg, False
file_type = "video" if Path(file_path).suffix.lower() in ['.mp4', '.avi', '.mov', '.mkv'] else "document"
total_images = len(images_base64)
    if total_images > MAX_IMAGES_PER_REQUEST:
        logger.warning(f"{Path(file_path).name}: {total_images} pages/frames, truncating to {MAX_IMAGES_PER_REQUEST}")
        images_base64 = images_base64[:MAX_IMAGES_PER_REQUEST]
        total_images = MAX_IMAGES_PER_REQUEST
for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
try:
content, success, status_code = await call_api_single_batch(images_base64, file_type, session)
if success:
return content, True
            if status_code == 413:
                return "[File too large: server refused to process the file]", False
if attempt < MAX_RETRY_ATTEMPTS:
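                # Exponential backoff: with the default settings this waits 5s, then 10s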
delay = RETRY_DELAY * (RETRY_BACKOFF ** (attempt - 1))
logger.info(f"\nRetry {attempt}/{MAX_RETRY_ATTEMPTS}: {Path(file_path).name} (waiting {delay}s)")
stats.add_retry()
await asyncio.sleep(delay)
continue
return content, False
except asyncio.TimeoutError:
if attempt < MAX_RETRY_ATTEMPTS:
delay = RETRY_DELAY * (RETRY_BACKOFF ** (attempt - 1))
logger.info(f"\nRetry {attempt}/{MAX_RETRY_ATTEMPTS}: {Path(file_path).name} (timeout, waiting {delay}s)")
stats.add_retry()
await asyncio.sleep(delay)
continue
return "[API call timeout]", False
except Exception as e:
if attempt < MAX_RETRY_ATTEMPTS:
delay = RETRY_DELAY * (RETRY_BACKOFF ** (attempt - 1))
logger.info(f"\nRetry {attempt}/{MAX_RETRY_ATTEMPTS}: {Path(file_path).name} (error, waiting {delay}s)")
stats.add_retry()
await asyncio.sleep(delay)
continue
return f"[API call error: {str(e)}]", False
return "[Max retry attempts reached]", False
async def process_file(file_path: str, session: aiohttp.ClientSession,
semaphore: asyncio.Semaphore):
"""Process a single file"""
async with semaphore:
try:
content, success = await call_multimodal_api_with_retry(file_path, session)
file_path_obj = Path(file_path).resolve()
input_folder_obj = Path(INPUT_FOLDER).resolve()
try:
rel_path = file_path_obj.relative_to(input_folder_obj)
software_name = rel_path.parts[0] if len(rel_path.parts) > 1 else "unknown"
except ValueError:
software_name = "unknown"
file_stem = file_path_obj.stem
test_id = file_stem
output_file = Path(EXAMPLES_FOLDER) / software_name / f"{file_stem}.json"
output_file.parent.mkdir(parents=True, exist_ok=True)
import re
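            # Strip an optional ```json ... ``` code fence from the model output before parsing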
match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
content = match.group(1) if match else content
if success:
api_result = json.loads(content)
data = {
"id": test_id,
"snapshot": "snapshot",
"instruction": api_result.get("steps", ""),
"source": "custom",
"config": [],
"trajectory": "trajectories/",
"related_apps": [software_name],
"evaluator": {
"postconfig": [
{
"type": "sleep",
"parameters": {
"seconds": 3
}
}
],
"func": "vllm_eval"
},
"proxy": False,
"fixed_ip": False,
"possibility_of_env_change": "low",
"metadata": {
"input_files": api_result.get("input_files", []),
"task_goal": api_result.get("task_goal", "")
}
}
if software_name not in software_tests:
software_tests[software_name] = []
software_tests[software_name].append(test_id)
else:
data = {
"id": test_id,
"error": content,
"status": "failed"
}
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
if success:
stats.add_completed()
else:
stats.add_failed(file_path, content)
except Exception as e:
error_msg = str(e)
stats.add_failed(file_path, error_msg)
logger.error(f"\nError processing {file_path}: {error_msg}")
def find_all_files(input_folder: str) -> List[str]:
"""Recursively find all supported files"""
all_files = []
for root, dirs, files in os.walk(input_folder):
for file in files:
file_path = os.path.join(root, file)
if Path(file_path).suffix.lower() in SUPPORTED_EXTENSIONS:
all_files.append(file_path)
return all_files
def save_test_all_json():
"""Save aggregated test_all.json"""
test_all_path = Path(TEST_ALL_JSON)
if test_all_path.exists():
with open(test_all_path, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
else:
existing_data = {}
for software, test_ids in software_tests.items():
if software in existing_data:
existing_data[software] = list(set(existing_data[software] + test_ids))
else:
existing_data[software] = test_ids
test_all_path.parent.mkdir(parents=True, exist_ok=True)
with open(test_all_path, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, ensure_ascii=False, indent=2)
logger.info(f"\nTest index updated: {test_all_path}")
logger.info(f"Software included: {list(existing_data.keys())}")
async def main():
"""Main function"""
    if not check_dependencies():
        return
    if not API_URL or not API_KEY:
        logger.error("OPENAI_BASE_URL and OPENAI_API_KEY environment variables must be set")
        return
if not Path(INPUT_FOLDER).exists():
logger.error(f"Input directory does not exist: {INPUT_FOLDER}")
return
Path(EXAMPLES_FOLDER).mkdir(parents=True, exist_ok=True)
logger.info("Scanning files...")
logger.info(f"Input directory: {INPUT_FOLDER}")
logger.info(f"Output directory: {EXAMPLES_FOLDER}")
logger.info(f"Test index file: {TEST_ALL_JSON}\n")
files = find_all_files(INPUT_FOLDER)
stats.total_files = len(files)
logger.info(f"Found {len(files)} files")
logger.info(f"Configuration: max retries={MAX_RETRY_ATTEMPTS}, concurrency={MAX_CONCURRENT_REQUESTS}")
logger.info("=" * 60 + "\n")
if not files:
logger.warning("No supported files found")
return
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
async with aiohttp.ClientSession() as session:
tasks = [
process_file(file, session, semaphore)
for file in files
]
await asyncio.gather(*tasks, return_exceptions=True)
save_test_all_json()
stats.print_summary()
logger.info("\nCompleted!")
logger.info(f" - Test cases saved to: {EXAMPLES_FOLDER}")
logger.info(f" - Test index updated: {TEST_ALL_JSON}")
if __name__ == "__main__":
asyncio.run(main())
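# Example invocation, run from the repository root (the endpoint URL and key below are
# placeholders for an OpenAI-compatible API; "/chat/completions" is appended automatically,
# and input/output folders are taken from the constants defined at the top of this file):
#   export OPENAI_BASE_URL="https://api.example.com/v1"
#   export OPENAI_API_KEY="sk-..."
#   python evaluation_examples/extract_instructions.py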