Files
issacdataengine/scripts/generate_pick_configs.py
2026-03-18 00:35:19 +08:00

227 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
Batch generate YAML config files for pick_and_place tasks.
Replaces asset names and USD paths based on available assets.
"""
import os
import shutil
from pathlib import Path
from typing import List, Dict, Optional
# Configuration
ASSETS_DIR = Path("workflows/simbox/assets/pick_and_place/pre-train-pick/assets")
FUNCTIONAL_ASSETS_DIR = Path("workflows/simbox/assets/pick_and_place/functional-pick-assets")
CONFIG_BASE_DIR = Path("workflows/simbox/core/configs/tasks/pick_and_place")
# Exclude these asset folders from pre-train-pick
EXCLUDE_ASSETS = ["google_scan-book", "google_scan-box", "omniobject3d-rubik_cube-old"]
# Template configs: (robot, task_type, arm_side, template_path, template_asset_name, assets_dir_type)
# task_type: "single_pick", "single_pnp", "single_func_pick", etc.
# arm_side: "left", "right", or None (for single-arm robots like franka)
# assets_dir_type: "pre-train" or "functional"
TEMPLATE_CONFIGS = [
# lift2 - single_pick (pre-train assets)
("lift2", "single_pick", "left", "lift2/single_pick/left/omniobject3d-banana.yaml", "omniobject3d-banana", "pre-train"),
("lift2", "single_pick", "right", "lift2/single_pick/right/omniobject3d-banana.yaml", "omniobject3d-banana", "pre-train"),
# split_aloha - single_pick (pre-train assets)
("split_aloha", "single_pick", "left", "split_aloha/single_pick/left/omniobject3d-banana.yaml", "omniobject3d-banana", "pre-train"),
("split_aloha", "single_pick", "right", "split_aloha/single_pick/right/omniobject3d-banana.yaml", "omniobject3d-banana", "pre-train"),
# franka - single_pick (pre-train assets, single arm, no left/right)
("franka", "single_pick", None, "franka/single_pick/omniobject3d-banana.yaml", "omniobject3d-banana", "pre-train"),
# genie1 - single_pick (pre-train assets)
("genie1", "single_pick", "left", "genie1/single_pick/left/omniobject3d-lemon.yaml", "omniobject3d-lemon", "pre-train"),
("genie1", "single_pick", "right", "genie1/single_pick/right/omniobject3d-lemon.yaml", "omniobject3d-lemon", "pre-train"),
# genie1 - single_pnp (pre-train assets, pick and place)
("genie1", "single_pnp", "left", "genie1/single_pnp/left/omniobject3d-banana.yaml", "omniobject3d-banana", "pre-train"),
("genie1", "single_pnp", "right", "genie1/single_pnp/right/omniobject3d-banana.yaml", "omniobject3d-banana", "pre-train"),
]
# Functional pick template configs (uses functional-pick-assets)
FUNCTIONAL_TEMPLATE_CONFIGS = [
# genie1 - single_func_pick (functional assets)
("genie1", "single_func_pick", "left", "genie1/single_func_pick/left/omniobject3d-hammer.yaml", "omniobject3d-hammer", "functional"),
("genie1", "single_func_pick", "right", "genie1/single_func_pick/right/omniobject3d-hammer.yaml", "omniobject3d-hammer", "functional"),
]
def get_all_assets(assets_dir: Path, exclude: List[str] = None) -> List[str]:
"""Get all asset folder names, excluding specified ones."""
if exclude is None:
exclude = []
assets = []
for item in sorted(assets_dir.iterdir()):
if item.is_dir() and item.name not in exclude:
assets.append(item.name)
return assets
def get_first_usd_path(assets_dir: Path, asset_name: str, assets_dir_type: str = "pre-train") -> Optional[str]:
"""Get the first USD file path for an asset."""
asset_folder = assets_dir / asset_name
if not asset_folder.exists():
return None
# Get all subfolders
subfolders = sorted([f for f in asset_folder.iterdir() if f.is_dir()])
if not subfolders:
return None
# Find the first subfolder with Aligned_obj.usd
for subfolder in subfolders:
usd_file = subfolder / "Aligned_obj.usd"
if usd_file.exists():
if assets_dir_type == "functional":
return f"pick_and_place/functional-pick-assets/{asset_name}/{subfolder.name}/Aligned_obj.usd"
else:
return f"pick_and_place/pre-train-pick/assets/{asset_name}/{subfolder.name}/Aligned_obj.usd"
return None
def replace_in_yaml(content: str, old_asset: str, new_asset: str, new_usd_path: str) -> str:
"""Replace asset name and USD path in YAML content.
Note: Does NOT modify distractors section - it should remain unchanged.
Only replaces paths that end with .usd (object paths, not directory paths).
"""
lines = content.split('\n')
new_lines = []
for line in lines:
# Replace path line ONLY if it's a USD file path (ends with .usd)
# This distinguishes object paths from distractor directory paths
# Support both pre-train-pick/assets and functional-pick-assets
if "path:" in line and "pick_and_place/" in line and ".usd" in line:
# Replace the USD path
indent = len(line) - len(line.lstrip())
new_lines.append(" " * indent + f"path: {new_usd_path}")
elif "category:" in line and old_asset in line:
# Replace category
indent = len(line) - len(line.lstrip())
new_lines.append(" " * indent + f'category: "{new_asset}"')
elif "task_dir:" in line and old_asset in line:
# Replace task_dir
indent = len(line) - len(line.lstrip())
# Extract the directory structure before the asset name
old_task_dir = line.split('"')[1]
new_task_dir = old_task_dir.replace(old_asset, new_asset)
new_lines.append(" " * indent + f'task_dir: "{new_task_dir}"')
else:
new_lines.append(line)
return '\n'.join(new_lines)
def generate_configs(
assets: List[str],
template_configs: List[tuple],
config_base_dir: Path,
assets_dir: Path,
dry_run: bool = True
):
"""Generate YAML config files for all assets."""
for robot, task_type, arm_side, template_rel_path, template_asset_name, assets_dir_type in template_configs:
template_path = config_base_dir / template_rel_path
if not template_path.exists():
print(f"[WARNING] Template not found: {template_path}")
continue
# Read template content
with open(template_path, 'r') as f:
template_content = f.read()
# Determine output directory based on task_type and arm_side
if arm_side is None:
# Single-arm robot (like franka)
output_dir = config_base_dir / robot / task_type
else:
# Dual-arm robot with left/right
output_dir = config_base_dir / robot / task_type / arm_side
output_dir.mkdir(parents=True, exist_ok=True)
for asset_name in assets:
# Get USD path for this asset
usd_path = get_first_usd_path(assets_dir, asset_name, assets_dir_type)
if usd_path is None:
print(f"[SKIP] No USD found for {asset_name}")
continue
# Generate new content
new_content = replace_in_yaml(
template_content,
template_asset_name,
asset_name,
usd_path
)
# Output file path
output_file = output_dir / f"{asset_name}.yaml"
if dry_run:
print(f"[DRY-RUN] Would create: {output_file}")
print(f" USD path: {usd_path}")
else:
with open(output_file, 'w') as f:
f.write(new_content)
print(f"[CREATED] {output_file}")
def main():
import argparse
parser = argparse.ArgumentParser(description="Batch generate YAML config files")
parser.add_argument("--dry-run", action="store_true", help="Show what would be done without creating files")
parser.add_argument("--assets", nargs="+", help="Specific assets to process (default: all)")
parser.add_argument("--robots", nargs="+", help="Specific robots to process (default: all)")
parser.add_argument("--task-types", nargs="+", help="Specific task types to process (default: all)")
parser.add_argument("--functional", action="store_true", help="Generate configs for functional pick assets")
args = parser.parse_args()
# Change to repo root
script_dir = Path(__file__).parent
os.chdir(script_dir.parent)
# Select template configs based on --functional flag
if args.functional:
template_configs = FUNCTIONAL_TEMPLATE_CONFIGS
assets_dir = FUNCTIONAL_ASSETS_DIR
exclude = [] # Don't exclude anything for functional assets
else:
template_configs = TEMPLATE_CONFIGS
assets_dir = ASSETS_DIR
exclude = EXCLUDE_ASSETS
# Get assets
if args.assets:
assets = args.assets
else:
assets = get_all_assets(assets_dir, exclude)
# Filter templates by robots and task_types if specified
if args.robots:
template_configs = [t for t in template_configs if t[0] in args.robots]
if args.task_types:
template_configs = [t for t in template_configs if t[1] in args.task_types]
print(f"Assets directory: {assets_dir}")
print(f"Found {len(assets)} assets to process")
print(f"Assets: {assets[:5]}..." if len(assets) > 5 else f"Assets: {assets}")
print(f"Templates: {[(t[0], t[1], t[2]) for t in template_configs]}")
print(f"Dry run: {args.dry_run}")
print("-" * 50)
generate_configs(
assets=assets,
template_configs=template_configs,
config_base_dir=CONFIG_BASE_DIR,
assets_dir=assets_dir,
dry_run=args.dry_run
)
if __name__ == "__main__":
main()