Files
sci-gui-agent-benchmark/evaluation_examples/extract_instructions_v2.py
lizhanyuan 9899d4a0c7 feat: 新增科研软件 benchmark 任务数据
- 新增 avogadro/imagej/jade/origin/ovito/pymol/vesta 等科研软件任务 JSON
- 修改 vllm_eval.py,修改图片文件名称为第x步
- desktop_env.py 添加额外数据参数 config 和 metadata
2026-02-25 15:19:36 +08:00

566 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import base64
import json
import logging
import os
import re
import shutil
import sys
import tempfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import aiohttp
# Configuration
# Paths are resolved relative to this script so the tool works from any CWD.
SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = SCRIPT_DIR.parent
# OpenAI-compatible chat-completions endpoint; override base via OPENAI_BASE_URL.
API_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
API_URL = f"{API_BASE_URL}/chat/completions"
API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_NAME = os.getenv("EXTRACT_MODEL", "gpt-4o") # Configurable via env var
# Upper bound on simultaneous in-flight API requests (asyncio.Semaphore size).
MAX_CONCURRENT_REQUESTS = 5
# Input folder where PDFs/Docs are stored, organized by software name
# e.g. evaluation_examples/inputs/vesta/tutorial.pdf
INPUT_FOLDER = PROJECT_ROOT / "evaluation_examples" / "inputs"
# Generated per-task JSON files land under EXAMPLES_FOLDER/<software>/.
EXAMPLES_FOLDER = PROJECT_ROOT / "evaluation_examples" / "examples"
# Master index mapping software name -> list of test ids.
TEST_ALL_JSON = PROJECT_ROOT / "evaluation_examples" / "test_all.json"
# Retry configuration
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 5
RETRY_BACKOFF = 2
# Image limit - keep low to avoid 413 payload too large errors
MAX_IMAGES_PER_REQUEST = 20
# Supported file extensions
SUPPORTED_EXTENSIONS = {'.docx', '.doc', '.ppt', '.pptx', '.pdf', '.mp4', '.avi', '.mov', '.mkv'}
# Software-specific launch config and snapshot mapping
# Maps software folder name -> {"snapshot": ..., "config": [...]}
# Each "config" list follows the benchmark's setup-step schema: launch the
# application binary inside the Windows VM, then sleep to let the GUI settle.
# The Windows paths here must match the software install locations in the
# corresponding VM snapshot.
SOFTWARE_CONFIG = {
"avogadro": {
"snapshot": "avogadro",
"config": [
{"type": "launch", "parameters": {"command": ["C:\\Avogadro2\\bin\\avogadro2.exe"]}},
{"type": "sleep", "parameters": {"seconds": 5}}
]
},
"imagej": {
"snapshot": "imagej",
"config": [
{"type": "launch", "parameters": {"command": ["C:\\ImageJ\\ImageJ.exe"]}},
{"type": "sleep", "parameters": {"seconds": 5}}
]
},
"origin": {
"snapshot": "origin",
"config": [
{"type": "launch", "parameters": {"command": ["C:\\OriginLab\\Origin2025b\\Origin64.exe"]}},
{"type": "sleep", "parameters": {"seconds": 5}}
]
},
"ovito": {
"snapshot": "ovito",
"config": [
{"type": "launch", "parameters": {"command": ["C:\\OVITO Basic\\ovito.exe"]}},
{"type": "sleep", "parameters": {"seconds": 5}}
]
},
"pymol": {
"snapshot": "pymol",
"config": [
{"type": "launch", "parameters": {"command": ["C:\\PYMOL\\PyMOLWin.exe"]}},
{"type": "sleep", "parameters": {"seconds": 5}}
]
},
"vesta": {
"snapshot": "vesta",
"config": [
{"type": "launch", "parameters": {"command": ["C:\\VESTA-win64\\VESTA.exe"]}},
{"type": "sleep", "parameters": {"seconds": 5}}
]
},
}
# Default config for unknown software
# Fallback when the input folder name has no entry above: generic snapshot,
# no launch steps (the agent is expected to start the app itself).
DEFAULT_SOFTWARE_CONFIG = {
"snapshot": "snapshot",
"config": []
}
# System prompt sent verbatim with every extraction request. It instructs the
# model (in Chinese, by design — the generated task goals are also Chinese) to
# extract concrete, verifiable GUI tasks and return them as strict JSON with
# "tasks": [{"task_goal", "input_files", "steps"}]. This is a runtime string
# consumed by the LLM; do not translate or reformat it.
SYSTEM_PROMPT = """你是一个科研软件 GUI 自动化测试专家。你的任务是从教程文档中提取出多个**具体的、可执行的、可验证的** GUI 操作任务。
## 核心要求
这些任务将被用于测试 AI Agent 操控桌面软件的能力。每个任务必须足够具体,让 Agent 明确知道要做什么,做完后能通过截图判断是否成功。
## 任务粒度要求(非常重要)
- **每个任务应该是 3-8 步 GUI 操作就能完成的小任务**
- **task_goal 必须包含具体的参数值、文件名、菜单路径等细节**
- **绝对不要写模糊的指令**
### ❌ 错误示例(太模糊):
- "Perform phase identification" — Agent 不知道用哪个文件、选什么参数
- "Export data" — 导出什么格式?保存到哪里?
- "Calculate crystallite size" — 选哪个峰?什么参数?
### ✅ 正确示例(具体可执行):
- "在 ImageJ 中,通过 File → Open 打开桌面上的 cell_image.tif 文件"
- "在 ImageJ 中,使用 Image → Adjust → Threshold 对当前图像进行阈值分割,选择 Default 方法并点击 Apply"
- "在 ImageJ 中,通过 Analyze → Measure 测量当前选区的面积和平均灰度值"
- "在 ImageJ 中,使用 Process → Filters → Gaussian Blur 对图像施加半径为 2.0 像素的高斯模糊"
- "在 Avogadro 2 中,通过 Build → Insert → Molecule 搜索并插入一个 benzene 分子"
- "在 VESTA 中通过 File → Open 打开桌面上的 Si.cif 文件,然后将视角旋转到 [110] 方向"
## 输出格式
返回严格的 JSON 对象:
{
"tasks": [
{
"task_goal": "一句话具体描述要做什么(包含软件名、菜单路径、文件名、参数值等具体信息)。用中文。",
"input_files": ["涉及的文件名列表,如 'sample.raw'。如果不需要输入文件则为空列表 []"],
"steps": "详细的 GUI 操作步骤,带编号,用换行分隔"
}
]
}
## 任务提取规则
1. **独立性**:每个任务都能独立完成(假设软件已打开或从头启动)
2. **具体性**task_goal 中必须包含教程中提到的具体文件名、参数值、菜单名称
3. **可验证性**:完成后应该能从屏幕截图看出任务是否成功(例如:文件已打开、图表已显示、对话框已出现等)
4. **忠实性**:只描述教程中实际出现的操作,不要编造功能
5. **数量**:从一份教程中提取 10-15 个不同的任务,覆盖教程的各个章节。优先选择最常用、最有代表性的操作
6. **软件名称**task_goal 必须以「在 XXX 中,」开头,明确指出软件名称
7. **难度分布**包含简单2-3步、中等4-5步、较难6-8步的任务各占三分之一
"""
# Logging configuration
# Single stdout handler so progress is visible when run under nohup/CI.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Module-wide ProcessingStats instance, created in main() before any
# process_file() coroutine runs.
stats = None # Will be initialized in main
@dataclass
class ProcessingStats:
    """Mutable counters tracking batch progress across all processed files.

    One instance is shared by every process_file() coroutine; asyncio runs
    them on a single thread, so plain attribute updates are safe here.
    """
    total_files: int = 0       # discovered input files (set once in main)
    completed_files: int = 0   # files that produced (or already had) tasks
    failed_files: int = 0      # files that failed conversion/API/parsing
    retried_files: int = 0     # API retry count (informational)
    generated_tasks: int = 0   # total task JSONs emitted or re-registered
    # default_factory gives each instance its own timestamp/list. The original
    # used None sentinels resolved in __post_init__; a factory is the idiomatic
    # dataclass form and avoids any shared-mutable-default hazard.
    start_time: datetime = field(default_factory=datetime.now)
    failed_list: List[tuple] = field(default_factory=list)

    def __post_init__(self):
        # Tolerate callers that explicitly pass None (the old sentinel API).
        if self.start_time is None:
            self.start_time = datetime.now()
        if self.failed_list is None:
            self.failed_list = []

    def add_completed(self, num_tasks=1):
        """Record one successfully processed file yielding `num_tasks` tasks."""
        self.completed_files += 1
        self.generated_tasks += num_tasks
        self._log_progress()

    def add_failed(self, file_path: str, error: str):
        """Record one failed file together with its error message."""
        self.failed_files += 1
        self.failed_list.append((file_path, error))
        self._log_progress()

    def add_retry(self):
        """Count an API retry (does not affect the progress line)."""
        self.retried_files += 1

    def _log_progress(self):
        # Emit a one-line progress summary after every terminal file event.
        processed = self.completed_files + self.failed_files
        percentage = (processed / self.total_files * 100) if self.total_files > 0 else 0
        # NOTE: the original also computed an elapsed-seconds value that was
        # never used; it has been removed.
        logger.info(f"Progress: {processed}/{self.total_files} ({percentage:.1f}%) | "
                    f"Tasks Gen: {self.generated_tasks} | Failed: {self.failed_files}")
# -----------------------------------------------------------------------------
# Dependency Checks & File Conversion (Copied & Adapted from original script)
# -----------------------------------------------------------------------------
def check_dependencies():
    """Probe optional conversion dependencies; return False when any is absent.

    LibreOffice absence only triggers a warning (it is needed just for
    .doc/.ppt conversion), while missing Python packages are fatal.
    """
    # (importable module, pip package name) pairs required for conversion.
    required = (
        ("pdf2image", "pdf2image"),
        ("PIL", "Pillow"),
        ("cv2", "opencv-python"),
    )
    missing = []
    for module_name, package_name in required:
        try:
            __import__(module_name)
        except ImportError:
            missing.append(package_name)
    if not (shutil.which("soffice") or shutil.which("libreoffice")):
        logger.warning("LibreOffice not detected (needed for .doc/.ppt)")
    if missing:
        logger.error(f"Missing dependencies: {', '.join(missing)}")
        return False
    return True
def convert_pdf_to_images(pdf_path: str) -> List[str]:
    """Render a PDF into base64-encoded JPEG page images.

    Documents with more than MAX_IMAGES_PER_REQUEST pages are sampled evenly
    at reduced DPI/quality to keep the API payload small; smaller documents
    are rendered in full at normal quality. Returns [] on any failure.
    """
    try:
        from pdf2image import convert_from_path
        import io

        def encode_jpeg(image, quality):
            # Serialize a PIL image to an in-memory JPEG, then base64-encode.
            buffer = io.BytesIO()
            image.save(buffer, format='JPEG', quality=quality)
            return base64.b64encode(buffer.getvalue()).decode('utf-8')

        # Cheap very-low-DPI pass whose only purpose is the page count.
        probe = convert_from_path(pdf_path, dpi=36, fmt='jpeg')
        page_count = len(probe)
        del probe

        if page_count > MAX_IMAGES_PER_REQUEST:
            # Large document: sample pages evenly at lower DPI and quality.
            dpi, quality = 100, 80
            stride = page_count / MAX_IMAGES_PER_REQUEST
            pages = [int(stride * i) + 1 for i in range(MAX_IMAGES_PER_REQUEST)]
            logger.info(f"Large PDF ({page_count} pages): sampling {len(pages)} pages at {dpi} DPI")
            encoded = []
            for page_number in pages:
                rendered = convert_from_path(pdf_path, dpi=dpi, fmt='jpeg',
                                             first_page=page_number, last_page=page_number)
                if rendered:
                    encoded.append(encode_jpeg(rendered[0], quality))
            return encoded

        # Small document: render every page at normal quality.
        dpi, quality = 150, 90
        logger.info(f"PDF ({page_count} pages) at {dpi} DPI")
        return [encode_jpeg(page, quality)
                for page in convert_from_path(pdf_path, dpi=dpi, fmt='jpeg')]
    except Exception as e:
        logger.error(f"PDF conversion failed: {e}")
        return []
def convert_office_to_pdf(input_path: str) -> Optional[str]:
    """Convert an Office document (.doc/.docx/.ppt/.pptx) to PDF via LibreOffice.

    Returns the converted PDF's path inside a fresh temporary directory —
    the caller is responsible for removing that directory — or None when
    LibreOffice is unavailable or the conversion fails.
    """
    import subprocess
    # Resolve a LibreOffice binary up front. The original fell back to the
    # literal "libreoffice" even when shutil.which found neither binary
    # (its `if not soffice_cmd` guard was dead code — the name was always a
    # non-empty string), relying on the subprocess failing later.
    soffice_cmd = shutil.which("soffice") or shutil.which("libreoffice")
    if soffice_cmd is None:
        return None
    temp_dir = tempfile.mkdtemp()
    try:
        cmd = [soffice_cmd, "--headless", "--convert-to", "pdf",
               "--outdir", temp_dir, input_path]
        subprocess.run(cmd, capture_output=True, timeout=60)
        pdf_path = os.path.join(temp_dir, Path(input_path).stem + ".pdf")
        if os.path.exists(pdf_path):
            return pdf_path
    except Exception:
        pass
    # Conversion failed: remove the temp dir (the original leaked it here).
    shutil.rmtree(temp_dir, ignore_errors=True)
    return None
def extract_video_frames(video_path: str, num_frames=10) -> List[str]:
    """Sample `num_frames` evenly spaced frames from a video as base64 JPEGs.

    Frames wider than 1280px are downscaled proportionally. Returns [] when
    OpenCV is unavailable, the video is unreadable, or any error occurs.
    """
    try:
        import cv2
    except ImportError:
        return []
    cap = cv2.VideoCapture(video_path)
    try:
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total == 0:
            return []
        # Skip the very start/end by dividing into num_frames + 1 intervals.
        indices = [int(total * i / (num_frames + 1)) for i in range(1, num_frames + 1)]
        frames = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                h, w = frame.shape[:2]
                if w > 1280:
                    scale = 1280 / w
                    frame = cv2.resize(frame, (1280, int(h * scale)))
                _, buf = cv2.imencode('.jpg', frame)
                frames.append(base64.b64encode(buf).decode('utf-8'))
        return frames
    except Exception:
        return []
    finally:
        # BUG FIX: the original leaked the capture handle on the early
        # `total == 0` return and on any exception; release unconditionally.
        cap.release()
def convert_document_to_images(file_path: str) -> List[str]:
    """Dispatch a tutorial file to the matching converter by extension.

    PDFs are rendered directly; Office documents go through a LibreOffice
    PDF conversion first (the temp dir is cleaned up afterwards); videos are
    frame-sampled. Unsupported extensions yield [].
    """
    suffix = Path(file_path).suffix.lower()
    if suffix == '.pdf':
        return convert_pdf_to_images(file_path)
    if suffix in ('.docx', '.doc', '.ppt', '.pptx'):
        pdf_path = convert_office_to_pdf(file_path)
        if not pdf_path:
            return []
        images = convert_pdf_to_images(pdf_path)
        # Remove the temporary directory created by convert_office_to_pdf.
        shutil.rmtree(os.path.dirname(pdf_path), ignore_errors=True)
        return images
    if suffix in ('.mp4', '.avi', '.mov', '.mkv'):
        return extract_video_frames(file_path)
    return []
# -----------------------------------------------------------------------------
# API Interaction
# -----------------------------------------------------------------------------
async def call_multimodal_api(images_b64: List[str], session: aiohttp.ClientSession) -> tuple[str, bool]:
    """POST tutorial page images to the chat-completions API.

    Returns (model content, True) on success or (error message, False) after
    MAX_RETRY_ATTEMPTS failures. Retries back off exponentially:
    RETRY_DELAY * RETRY_BACKOFF**(attempt-1).
    """
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    content = [{"type": "text", "text": "Analyze these tutorial pages and extract benchmark tasks as JSON."}]
    # Cap images to avoid huge payloads (HTTP 413).
    for img in images_b64[:MAX_IMAGES_PER_REQUEST]:
        content.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{img}"}
        })
    messages.append({"role": "user", "content": content})
    # Headers/payload are identical for every attempt — build them once
    # instead of inside the retry loop as the original did.
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    # Add site specific headers if using openrouter or others if needed
    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "max_tokens": 4096,
    }
    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        # BUG FIX: RETRY_BACKOFF was defined at module level but never used;
        # retries were a flat RETRY_DELAY. Apply exponential backoff as the
        # constant's existence clearly intends.
        delay = RETRY_DELAY * (RETRY_BACKOFF ** (attempt - 1))
        try:
            async with session.post(API_URL, headers=headers, json=payload, timeout=180) as response:
                if response.status == 200:
                    res_json = await response.json()
                    return res_json['choices'][0]['message']['content'], True
                err = await response.text()
                logger.warning(f"API Error ({response.status}): {err}")
                if attempt >= MAX_RETRY_ATTEMPTS:
                    return f"API Error: {err}", False
                await asyncio.sleep(delay)
        except Exception as e:
            logger.warning(f"Exception: {e}")
            if attempt >= MAX_RETRY_ATTEMPTS:
                return str(e), False
            await asyncio.sleep(delay)
    return "Max retries", False
# -----------------------------------------------------------------------------
# Main Logic
# -----------------------------------------------------------------------------
software_tests = {} # software name -> [test_ids]; flushed by save_test_all_json()
FORCE_REGENERATE = False # Set via --force flag; regenerates already-processed files
async def process_file(file_path: str, session: aiohttp.ClientSession, semaphore: asyncio.Semaphore):
    """Convert one tutorial file to images, extract tasks via the LLM, and
    write one benchmark JSON per task under EXAMPLES_FOLDER/<software>/.

    Concurrency is bounded by `semaphore`. Outcomes are recorded in the
    global `stats`; generated test ids are registered in `software_tests`
    for the test_all.json index.
    """
    def register(test_id: str):
        # Record a test id under its software for the test_all.json index.
        software_tests.setdefault(software_name, []).append(test_id)

    async with semaphore:
        file_path_obj = Path(file_path)
        file_stem = file_path_obj.stem
        # Software name is the first folder under INPUT_FOLDER
        # (e.g. inputs/vesta/tutorial.pdf -> "vesta").
        try:
            rel_path = file_path_obj.relative_to(INPUT_FOLDER)
            software_name = rel_path.parts[0] if len(rel_path.parts) > 1 else "unknown"
        except ValueError:
            software_name = "unknown"
        # Skip files already processed (task1 JSON exists) unless --force.
        existing_task1 = EXAMPLES_FOLDER / software_name / f"{file_stem}_task1.json"
        if existing_task1.exists() and not FORCE_REGENERATE:
            logger.info(f"Skipping (already processed): {file_path_obj.name} → use --force to regenerate")
            # Still register existing tasks so test_all.json stays complete.
            # (Path.glob replaces the original's ad-hoc `import glob as g`.)
            existing_tasks = list((EXAMPLES_FOLDER / software_name).glob(f"{file_stem}_task*.json"))
            for task_file in existing_tasks:
                register(task_file.stem)
            stats.add_completed(num_tasks=len(existing_tasks))
            return
        logger.info(f"Processing: {file_path_obj.name}")
        # 1. Convert document/video to base64 page images.
        images = convert_document_to_images(file_path)
        if not images:
            stats.add_failed(file_path, "No images extracted")
            return
        # 2. Ask the model to extract tasks.
        content, success = await call_multimodal_api(images, session)
        if not success:
            stats.add_failed(file_path, content)
            return
        # 3. Parse the model output (it may wrap the JSON in prose).
        try:
            json_match = re.search(r'\{.*\}', content, re.DOTALL)
            json_str = json_match.group(0) if json_match else content
            api_result = json.loads(json_str)
            tasks = api_result.get("tasks", [])
            if not tasks:
                logger.warning(f"No tasks found in JSON for {file_path}")
                # BUG FIX: the original returned here without touching stats,
                # so the processed counter could never reach total_files.
                stats.add_failed(file_path, "No tasks in model response")
                return
        except json.JSONDecodeError as e:
            stats.add_failed(file_path, f"JSON Parse Error: {e}")
            logger.error(f"Raw content: {content[:200]}...")
            return
        # 4. Emit one benchmark JSON file per extracted task.
        # The launch config is the same for every task — look it up once.
        sw_cfg = SOFTWARE_CONFIG.get(software_name, DEFAULT_SOFTWARE_CONFIG)
        for i, task in enumerate(tasks, 1):
            test_id = f"{file_stem}_task{i}"
            output_file = EXAMPLES_FOLDER / software_name / f"{test_id}.json"
            output_file.parent.mkdir(parents=True, exist_ok=True)
            # Construct the OSWorld/Jade Benchmark Standard JSON.
            task_json = {
                "id": test_id,
                "snapshot": sw_cfg["snapshot"],
                "instruction": task.get("task_goal", ""),
                "source": "custom",
                "config": sw_cfg["config"],
                "trajectory": "trajectories/",
                "related_apps": [software_name],
                "evaluator": {
                    "postconfig": [
                        {
                            "type": "sleep",
                            "parameters": {
                                "seconds": 3
                            }
                        }
                    ],
                    "func": "vllm_eval"
                    # "result" field is NOT needed for vllm_eval
                },
                "proxy": False,
                "fixed_ip": False,
                "possibility_of_env_change": "low",
                "metadata": {
                    "input_files": task.get("input_files", []),
                    "steps": task.get("steps", "")
                }
            }
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(task_json, f, ensure_ascii=False, indent=2)
            register(test_id)
        stats.add_completed(num_tasks=len(tasks))
        logger.info(f"Generated {len(tasks)} tasks for {file_path_obj.name}")
def save_test_all_json():
    """Merge newly generated test ids into test_all.json, and write
    test_custom.json restricted to the software folders present under
    INPUT_FOLDER (useful for running only the custom benchmarks without
    the OSWorld defaults).
    """
    test_all_meta_path = Path(TEST_ALL_JSON)
    existing_data = {}
    if test_all_meta_path.exists():
        try:
            with open(test_all_meta_path, 'r', encoding='utf-8') as f:
                existing_data = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            # Was a bare `except: pass` — at least log why we start fresh
            # instead of silently discarding the existing index.
            logger.warning(f"Could not read existing {test_all_meta_path}: {e}")
    # Merge new tests: de-duplicated and sorted for stable, diffable output.
    for software, test_ids in software_tests.items():
        current_list = existing_data.get(software, [])
        existing_data[software] = sorted(set(current_list + test_ids))
    with open(test_all_meta_path, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, ensure_ascii=False, indent=2)
    # Determine "our" software set by scanning the input folder's subdirs.
    custom_softwares = set()
    if INPUT_FOLDER.exists():
        for entry in INPUT_FOLDER.iterdir():
            if entry.is_dir():
                custom_softwares.add(entry.name)
    custom_data = {sw: existing_data[sw] for sw in custom_softwares if sw in existing_data}
    test_custom_path = PROJECT_ROOT / "evaluation_examples" / "test_custom.json"
    with open(test_custom_path, 'w', encoding='utf-8') as f:
        json.dump(custom_data, f, ensure_ascii=False, indent=2)
    logger.info(f"Custom test index saved to: {test_custom_path}")
async def main():
    """Entry point: discover tutorial files under INPUT_FOLDER and process
    them concurrently, then write the merged test indexes.

    Pass --force on the command line to regenerate already-processed files.
    """
    global stats, FORCE_REGENERATE
    stats = ProcessingStats()
    FORCE_REGENERATE = "--force" in sys.argv
    # Bail out early when the API key or the input folder is missing.
    if not API_KEY:
        logger.error("OPENAI_API_KEY environment variable not set.")
        return
    if not INPUT_FOLDER.exists():
        logger.warning(f"Input folder {INPUT_FOLDER} does not exist. Creating it.")
        INPUT_FOLDER.mkdir(parents=True, exist_ok=True)
        logger.info(f"Please put software PDF tutorials into subfolders in: {INPUT_FOLDER}")
        return
    # Collect every supported tutorial file beneath INPUT_FOLDER.
    files = [
        os.path.join(root, name)
        for root, _, filenames in os.walk(INPUT_FOLDER)
        for name in filenames
        if Path(name).suffix.lower() in SUPPORTED_EXTENSIONS
    ]
    stats.total_files = len(files)
    logger.info(f"Found {len(files)} files in {INPUT_FOLDER}")
    if not files:
        logger.info("No files to process.")
        return
    # Fan out over one shared HTTP session; the semaphore caps concurrency.
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(process_file(f, session, semaphore) for f in files))
    # Persist the merged test indexes.
    save_test_all_json()
    logger.info("Done.")
# Script entry point: run the async pipeline to completion.
if __name__ == "__main__":
    asyncio.run(main())