feat: add test tube pick task with custom assets and grasp annotations

- Add pick_test_tube task: USDC asset repackaging, grasp generation, task config
- Add tools: usdc_to_obj.py, repackage_test_tube.py, fix_test_tube_materials.py
- Add custom_task_guide.md: full Chinese documentation for creating custom tasks
- Add crawled InternDataEngine online docs (23 pages)
- Add grasp generation script (gen_tube_grasp.py) and pipeline config
This commit is contained in:
Tangger
2026-04-05 11:01:59 +08:00
parent 6314603676
commit 3d6b73753a
36 changed files with 18013 additions and 0 deletions

328
migrate/crawl_docs.py Normal file
View File

@@ -0,0 +1,328 @@
"""
Crawl InternDataEngine online docs and save as local markdown files.
Usage:
python migrate/crawl_docs.py
python migrate/crawl_docs.py --output docs_crawled
"""
import argparse
import os
import re
import time
import urllib.request
from html.parser import HTMLParser
BASE_URL = "https://internrobotics.github.io/InternDataEngine-Docs"
# All pages from the sitemap (extracted from VitePress hash map)
PAGES = [
# Getting Started
"/guides/installation.html",
"/guides/quickstart.html",
# Core Concepts
"/concepts/workflows.html",
"/concepts/skills.html",
"/concepts/skills/overview.html",
"/concepts/skills/pick.html",
"/concepts/skills/place.html",
"/concepts/skills/articulation.html",
"/concepts/objects.html",
"/concepts/cameras.html",
"/concepts/robots.html",
"/concepts/controllers.html",
# Configuration
"/config/yaml.html",
"/config/dr.html",
"/config/assets.html",
# Customization
"/custom/assets.html",
"/custom/robot.html",
"/custom/controller.html",
"/custom/skill.html",
"/custom/task.html",
# Policy
"/policy/training.html",
# API
"/api/controllers.html",
"/api/skills.html",
# Chinese versions
"/zh/guides/installation.html",
"/zh/guides/quickstart.html",
"/zh/concepts/workflows.html",
"/zh/concepts/skills.html",
"/zh/concepts/skills/overview.html",
"/zh/concepts/skills/pick.html",
"/zh/concepts/skills/place.html",
"/zh/concepts/skills/articulation.html",
"/zh/concepts/tasks.html",
"/zh/concepts/cameras.html",
"/zh/concepts/robots.html",
"/zh/concepts/controllers.html",
"/zh/config/yaml.html",
"/zh/config/dr.html",
"/zh/config/assets.html",
"/zh/custom/assets.html",
"/zh/custom/robot.html",
"/zh/custom/controller.html",
"/zh/custom/skill.html",
"/zh/custom/task.html",
]
class HTMLToMarkdown(HTMLParser):
"""Simple HTML to Markdown converter for VitePress content."""
def __init__(self):
super().__init__()
self.output = []
self.current_tag = None
self.in_content = False
self.in_code = False
self.code_lang = ""
self.skip_tags = {"script", "style", "nav", "header", "footer", "button"}
self.skip_depth = 0
self.heading_level = 0
self.in_li = False
self.in_pre = False
self.in_a = False
self.a_href = ""
self.a_text = ""
def handle_starttag(self, tag, attrs):
attrs_dict = dict(attrs)
classes = attrs_dict.get("class", "")
# Skip navigation, header, footer
if tag in self.skip_tags:
self.skip_depth += 1
return
if self.skip_depth > 0:
return
# Track content area
if "vp-doc" in classes or "VPDoc" in classes:
self.in_content = True
if not self.in_content:
return
if tag in ("h1", "h2", "h3", "h4", "h5", "h6"):
self.heading_level = int(tag[1])
self.output.append("\n" + "#" * self.heading_level + " ")
elif tag == "p":
self.output.append("\n\n")
elif tag == "pre":
self.in_pre = True
elif tag == "code":
if self.in_pre:
self.in_code = True
lang = attrs_dict.get("class", "")
lang_match = re.search(r"language-(\w+)", lang)
self.code_lang = lang_match.group(1) if lang_match else ""
self.output.append(f"\n```{self.code_lang}\n")
else:
self.output.append("`")
elif tag == "a":
self.in_a = True
self.a_href = attrs_dict.get("href", "")
self.a_text = ""
elif tag == "ul":
self.output.append("\n")
elif tag == "ol":
self.output.append("\n")
elif tag == "li":
self.in_li = True
self.output.append("- ")
elif tag == "strong" or tag == "b":
self.output.append("**")
elif tag == "em" or tag == "i":
self.output.append("*")
elif tag == "br":
self.output.append("\n")
elif tag == "img":
alt = attrs_dict.get("alt", "")
src = attrs_dict.get("src", "")
self.output.append(f"![{alt}]({src})")
elif tag == "table":
self.output.append("\n")
elif tag == "tr":
self.output.append("| ")
elif tag == "th" or tag == "td":
pass
elif tag == "blockquote":
self.output.append("\n> ")
def handle_endtag(self, tag):
if tag in self.skip_tags:
self.skip_depth -= 1
return
if self.skip_depth > 0:
return
if not self.in_content:
return
if tag in ("h1", "h2", "h3", "h4", "h5", "h6"):
self.output.append("\n")
self.heading_level = 0
elif tag == "pre":
self.in_pre = False
elif tag == "code":
if self.in_code:
self.in_code = False
self.output.append("\n```\n")
else:
self.output.append("`")
elif tag == "a":
self.in_a = False
if self.a_href and self.a_text:
self.output.append(f"[{self.a_text.strip()}]({self.a_href})")
self.a_text = ""
elif tag == "li":
self.in_li = False
self.output.append("\n")
elif tag == "strong" or tag == "b":
self.output.append("**")
elif tag == "em" or tag == "i":
self.output.append("*")
elif tag == "tr":
self.output.append("\n")
elif tag == "th" or tag == "td":
self.output.append(" | ")
elif tag == "p":
self.output.append("\n")
def handle_data(self, data):
if self.skip_depth > 0:
return
if self.in_a:
self.a_text += data
return
if self.in_content:
if self.in_code:
self.output.append(data)
else:
text = data.strip()
if text:
self.output.append(text + " ")
def get_markdown(self):
text = "".join(self.output)
# Clean up
text = re.sub(r"\n{3,}", "\n\n", text)
text = re.sub(r"[ \t]+\n", "\n", text)
return text.strip()
def fetch_page(url):
"""Fetch a page and return HTML content."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
return resp.read().decode("utf-8")
except Exception as e:
print(f" ERROR: {e}")
return None
def html_to_markdown(html_content):
"""Convert HTML to markdown."""
parser = HTMLToMarkdown()
parser.feed(html_content)
return parser.get_markdown()
def main():
parser = argparse.ArgumentParser(description="Crawl InternDataEngine docs")
parser.add_argument("--output", default="docs_crawled", help="Output directory")
parser.add_argument("--zh-only", action="store_true", help="Only crawl Chinese docs")
parser.add_argument("--en-only", action="store_true", help="Only crawl English docs")
args = parser.parse_args()
os.makedirs(args.output, exist_ok=True)
pages = PAGES
if args.zh_only:
pages = [p for p in PAGES if p.startswith("/zh/")]
elif args.en_only:
pages = [p for p in PAGES if not p.startswith("/zh/")]
print(f"Crawling {len(pages)} pages to {args.output}/\n")
for page_path in pages:
url = BASE_URL + page_path
# Convert path to filename: /guides/installation.html -> guides_installation.md
md_name = page_path.strip("/").replace("/", "_").replace(".html", ".md")
md_path = os.path.join(args.output, md_name)
print(f" {page_path} -> {md_name}", end=" ")
html = fetch_page(url)
if html is None:
print("FAILED")
continue
md = html_to_markdown(html)
if not md or len(md) < 50:
# VitePress SPA - content is loaded via JS, try fetching the raw .md source
# VitePress stores source at /InternDataEngine-Docs/page.html but the actual
# markdown might be accessible differently
print(f"(sparse content: {len(md)} chars, SPA rendering)")
else:
print(f"OK ({len(md)} chars)")
with open(md_path, "w") as f:
f.write(f"# Source: {url}\n\n")
f.write(md)
time.sleep(0.3)
# Also try fetching raw markdown from GitHub
print("\n\nAttempting raw markdown from GitHub...")
gh_base = "https://raw.githubusercontent.com/InternRobotics/InternDataEngine/master/docs"
# VitePress source files
raw_pages = {
"guides/installation.md": "guides_installation_raw.md",
"guides/quickstart.md": "guides_quickstart_raw.md",
"concepts/workflows.md": "concepts_workflows_raw.md",
"concepts/objects.md": "concepts_objects_raw.md",
"concepts/cameras.md": "concepts_cameras_raw.md",
"concepts/robots.md": "concepts_robots_raw.md",
"concepts/controllers.md": "concepts_controllers_raw.md",
"concepts/skills/overview.md": "concepts_skills_overview_raw.md",
"concepts/skills/pick.md": "concepts_skills_pick_raw.md",
"concepts/skills/place.md": "concepts_skills_place_raw.md",
"concepts/skills/articulation.md": "concepts_skills_articulation_raw.md",
"config/yaml.md": "config_yaml_raw.md",
"config/dr.md": "config_dr_raw.md",
"config/assets.md": "config_assets_raw.md",
"custom/assets.md": "custom_assets_raw.md",
"custom/robot.md": "custom_robot_raw.md",
"custom/controller.md": "custom_controller_raw.md",
"custom/skill.md": "custom_skill_raw.md",
"custom/task.md": "custom_task_raw.md",
"policy/training.md": "policy_training_raw.md",
"api/controllers.md": "api_controllers_raw.md",
"api/skills.md": "api_skills_raw.md",
}
for src, dst in raw_pages.items():
url = f"{gh_base}/{src}"
dst_path = os.path.join(args.output, dst)
print(f" {src}", end=" ")
content = fetch_page(url)
if content and len(content) > 50 and not content.strip().startswith("<!DOCTYPE"):
with open(dst_path, "w") as f:
f.write(content)
print(f"OK ({len(content)} chars)")
else:
print("NOT FOUND or HTML")
time.sleep(0.3)
print(f"\nDone. Files saved to {args.output}/")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,303 @@
# 自定义任务创建指南:从 USDC 资产到完整任务
本文档记录了将外部 USDC 资产(以试管为例)转换为 InternDataEngine 可用任务的完整流程。
## 前提条件
- InternDataEngine 项目已配置完成IS 5.0.0 + banana500 环境)
- 有现成的 USD/USDC 3D 模型文件
- conda 环境 `banana450`(用于 grasp 标注生成pyarmor 兼容)
## 完整流程
### Step 1: 准备资产目录
`workflows/simbox/example_assets/task/` 下创建任务目录:
```bash
mkdir -p workflows/simbox/example_assets/task/pick_test_tube/test_tube
mkdir -p workflows/simbox/example_assets/task/pick_test_tube/test_tube_rack
```
### Step 2: 从 USDC 导出三角化 OBJ
InternDataEngine 的 grasp 生成工具需要 OBJ 格式输入,且必须是纯三角面。使用 `migrate/usdc_to_obj.py` 从 USDC 提取并三角化:
```bash
conda activate banana500
python migrate/usdc_to_obj.py \
--input /path/to/your/model.usdc \
--output workflows/simbox/example_assets/task/pick_test_tube/test_tube/
```
输出文件:`Aligned_obj.obj`(三角化的 OBJ
**注意**:原始 USDC 中的面可能是四边形或多边形,脚本会自动进行 fan 三角化。
### Step 3: 重新打包 USD 结构
InternDataEngine 要求 USD 资产具有特定的层级结构:
```
/World (defaultPrim, Xform, 无物理属性)
├── /Looks (Scope, 材质)
│ ├── Material_0
│ └── Material_1
├── /Aligned (Xform, PhysicsRigidBodyAPI + PhysicsMassAPI)
│ └── /mesh (Mesh, PhysicsCollisionAPI + PhysicsMeshCollisionAPI)
└── /PhysicsMaterial (Material, PhysicsMaterialAPI)
```
**关键要求**
- `/World` 必须是 defaultPrim且不能有任何物理 schema
- `/Aligned``prim_path_child`,必须有 `PhysicsRigidBodyAPI`
- 碰撞体必须使用 `convexHull` 近似(不能用 triangle mesh
- `/Looks` 必须和 `/Aligned` 平级(在同一引用范围内)
使用 `migrate/repackage_test_tube.py` 自动重构:
```bash
python migrate/repackage_test_tube.py
```
脚本逻辑:
1. 创建新 USD定义 `/World` 为 defaultPrim纯净 Xform不挂引用
2.`/World/Aligned` 上添加引用,指向源 USD 的子节点(如 `/Test_Tube_AA_01/Test_Tube`
3.`/World/Looks` 上添加引用,指向源 USD 的材质(如 `/Test_Tube_AA_01/Looks`
4.`/World/Aligned` 添加 `PhysicsRigidBodyAPI``PhysicsMassAPI`
5. 给碰撞 mesh 设置 `convexHull` 近似
**重新绑定材质**:由于引用子节点后,原始材质绑定路径超出引用范围,需要手动重新绑定:
```bash
python migrate/fix_test_tube_materials.py
```
### Step 4: 生成 Grasp 抓取标注
使用项目自带的 grasp 生成工具(需要 `banana450` 环境,因为 pyarmor 加密对 Python 版本敏感):
```bash
conda activate banana450
python workflows/simbox/tools/grasp/gen_sparse_label.py \
--obj_path workflows/simbox/example_assets/task/pick_test_tube/test_tube/Aligned_obj.obj \
--unit m --sparse_num 3000 --max_widths 0.1
```
输出:`Aligned_grasp_sparse.npy`3000 x 17 的数组)
**可视化验证**
```bash
python workflows/simbox/tools/grasp/vis_grasp.py \
--obj_path workflows/simbox/example_assets/task/pick_test_tube/test_tube/Aligned_obj.obj \
--unit m --N 200
```
会弹出 Open3D 窗口,蓝色线条表示 gripper 姿态,应该合理地分布在物体周围。
**Grasp 标注格式N x 17**
| 列 | 维度 | 含义 |
|---|---|---|
| 0 | 1 | score质量分数0.1-1.0,越小越好) |
| 1 | 1 | width夹持器开口宽度 |
| 2 | 1 | height |
| 3 | 1 | depth |
| 4:13 | 9 | 旋转矩阵3x3row-major |
| 13:16 | 3 | 抓取中心点位置(物体坐标系) |
| 16 | 1 | obj_id通常为 -1 |
### Step 5: 创建任务配置 YAML
文件:`workflows/simbox/core/configs/tasks/example/pick_test_tube.yaml`
```yaml
tasks:
-
name: banana_base_task
asset_root: workflows/simbox/example_assets
task: BananaBaseTask
task_id: 0
offset: null
render: True
neglect_collision_names: ["table"]
arena_file: workflows/simbox/core/configs/arenas/example.yaml
env_map:
envmap_lib: envmap_lib
apply_randomization: False
intensity_range: [5000, 5000]
rotation_range: [0, 0]
robots:
-
name: "split_aloha"
robot_config_file: workflows/simbox/core/configs/robots/split_aloha.yaml
euler: [0.0, 0.0, 90.0]
ignore_substring: ["material", "table", "test_tube_rack"]
objects:
-
name: test_tube_right
path: task/pick_test_tube/test_tube/Aligned_obj.usd
target_class: RigidObject
dataset: custom
category: test_tube
prim_path_child: Aligned
translation: [0.0, 0.0, 0.0]
euler: [0.0, 0.0, 0.0]
scale: [1, 1, 1]
apply_randomization: False
orientation_mode: keep
regions:
-
object: test_tube_right
target: table
random_type: A_on_B_region_sampler
random_config:
pos_range: [[0.05, -0.05, 0.0], [0.15, 0.05, 0.0]]
yaw_rotation: [0.0, 0.0]
-
object: split_aloha
target: table
random_type: A_on_B_region_sampler
random_config:
pos_range: [[0.0, -0.86, -0.75], [0.0, -0.86, -0.75]]
yaw_rotation: [0.0, 0.0]
cameras:
# ... 复用 split_aloha 的相机配置(详见完整配置文件)
data:
task_dir: "pick_test_tube"
language_instruction: "Pick up the test tube from the table."
detailed_language_instruction: "Use the right arm to grasp the test tube."
collect_info: "Test tube picking task"
version: "v1.0"
update: True
max_episode_length: 2000
skills:
-
split_aloha:
-
right:
-
name: pick
objects: [test_tube_right]
npy_name: Aligned_grasp_sparse.npy
filter_y_dir: ["forward", 60]
filter_z_dir: ["downward", 150]
pre_grasp_offset: 0.05
gripper_change_steps: 10
t_eps: 0.025
o_eps: 1
process_valid: True
lift_th: 0.02
post_grasp_offset_min: 0.10
post_grasp_offset_max: 0.15
-
name: heuristic__skill
mode: home
gripper_state: 1.0
```
**Skill 名称注意**:使用 `register_skill` 装饰器注册的名称(类名转 snake_case例如
- `Pick``pick`
- `Heuristic_Skill``heuristic__skill`(双下划线)
- `Goto_Pose``goto__pose`(双下划线)
### Step 6: 创建 Pipeline 配置
文件:`configs/simbox/de_pick_test_tube.yaml`
```yaml
name: simbox_pick_test_tube
load_stage:
scene_loader:
type: env_loader
args:
workflow_type: SimBoxDualWorkFlow
cfg_path: workflows/simbox/core/configs/tasks/example/pick_test_tube.yaml
simulator:
physics_dt: 1/30
rendering_dt: 1/30
stage_units_in_meters: 1.0
headless: False # 调试时用 FalseGUI生产时改 True
renderer: "RayTracedLighting"
anti_aliasing: 0
layout_random_generator:
type: env_randomizer
args:
random_num: 1 # 调试时用 1生产时增大
strict_mode: true
plan_stage:
seq_planner:
type: env_planner
render_stage:
renderer:
type: env_renderer
store_stage:
writer:
type: env_writer
args:
batch_async: true
output_dir: output/${name}/
```
### Step 7: 运行测试
```bash
conda activate banana500
# GUI 模式调试headless: False
python launcher.py --config configs/simbox/de_pick_test_tube.yaml
# 确认后改 headless: True增大 random_num 批量生产
```
### Step 8: 检查输出
输出目录:`output/simbox_pick_test_tube/`
```
output/simbox_pick_test_tube/
├── pick_test_tube/split_aloha/<task>/right/
│ ├── <timestamp>/
│ │ ├── images.rgb.head/
│ │ ├── images.rgb.hand_right/
│ │ ├── images.rgb.hand_left/
│ │ └── lmdb/
│ └── ...
└── de_config.yaml
```
## 常见问题
### Q: `gen_sparse_label.py` 报 `_PyFloat_Pack8` 错误
A: pyarmor 加密对 Python 版本敏感,使用 `banana450` 环境Python 3.10 + 旧版依赖)。
### Q: USD 打开后材质丢失
A: 引用子节点时材质路径超出引用范围。需要用 `fix_test_tube_materials.py` 重新绑定。
### Q: `RigidContactView` 报 `sensor_count` 错误
A: USD 结构不对。确保 `/World` (defaultPrim) 没有 `PhysicsRigidBodyAPI`,物理属性只在 `/World/Aligned` 子节点上。
### Q: OBJ 导出后 grasp 生成为空0 条)
A: OBJ 文件包含非三角形面。使用 `migrate/usdc_to_obj.py` 导出时会自动三角化。
### Q: Plan 不收敛
A: 检查 grasp 标注质量(用 `vis_grasp.py` 可视化),确认姿态合理。调整 `filter_y_dir` / `filter_z_dir` 放宽过滤条件。
## 工具脚本一览
| 脚本 | 用途 |
|------|------|
| `migrate/usdc_to_obj.py` | USDC → 三角化 OBJ |
| `migrate/repackage_test_tube.py` | 重构 USD 层级结构 |
| `migrate/fix_test_tube_materials.py` | 修复材质绑定 |
| `workflows/simbox/tools/grasp/gen_sparse_label.py` | 生成 grasp 标注(需 banana450 |
| `workflows/simbox/tools/grasp/vis_grasp.py` | 可视化 grasp 标注 |

View File

@@ -0,0 +1,47 @@
"""
Fix material bindings on repackaged test tube USD.
The referenced meshes have bindings pointing to /Test_Tube_AA_01/Looks/...
which is outside the reference scope. This script overrides them to
point to /World/Looks/... instead.
Usage:
python migrate/fix_test_tube_materials.py
"""
from pxr import Usd, UsdShade
USD_PATH = "workflows/simbox/example_assets/task/pick_test_tube/test_tube/Aligned_obj.usd"
# Mapping: mesh prim path -> material path in new structure
MATERIAL_BINDINGS = {
"/World/Aligned/_______005": "/World/Looks/SimPBR_Translucent",
"/World/Aligned/tags/_______007": "/World/Looks/OmniPBR",
"/World/Aligned/Test_Tube_lid/_______006": "/World/Looks/OmniPBR_01",
}
def fix():
stage = Usd.Stage.Open(USD_PATH)
for mesh_path, mat_path in MATERIAL_BINDINGS.items():
mesh_prim = stage.GetPrimAtPath(mesh_path)
mat_prim = stage.GetPrimAtPath(mat_path)
if not mesh_prim.IsValid():
print(f" SKIP: {mesh_path} not found")
continue
if not mat_prim.IsValid():
print(f" SKIP: material {mat_path} not found")
continue
mat = UsdShade.Material(mat_prim)
UsdShade.MaterialBindingAPI.Apply(mesh_prim)
UsdShade.MaterialBindingAPI(mesh_prim).Bind(mat)
print(f" Bound {mesh_path} -> {mat_path}")
stage.GetRootLayer().Save()
print(f"\nSaved: {USD_PATH}")
if __name__ == "__main__":
fix()

51
migrate/gen_tube_grasp.py Normal file
View File

@@ -0,0 +1,51 @@
"""
Generate test tube grasp annotations by adapting working bottle grasps.
Strategy: Load verified working bottle grasps, scale positions to match
test tube dimensions. The rotation conventions are already correct.
Bottle: ~5cm radius, ~10cm height
Test tube: ~1.5cm radius, ~10.6cm height (similar height, 3x smaller radius)
Usage:
python migrate/gen_tube_grasp.py
"""
import numpy as np
import os
BOTTLE_GRASP = "workflows/simbox/example_assets/task/sort_the_rubbish/recyclable_garbage/bottle_0/Aligned_grasp_sparse.npy"
OUTPUT = "workflows/simbox/example_assets/task/pick_test_tube/test_tube/Aligned_grasp_sparse.npy"
def main():
# Load working bottle grasps
bottle = np.load(BOTTLE_GRASP)
print(f"Loaded {len(bottle)} bottle grasps")
print(f"Bottle pos range: X[{bottle[:,13].min():.4f},{bottle[:,13].max():.4f}] "
f"Y[{bottle[:,14].min():.4f},{bottle[:,14].max():.4f}] "
f"Z[{bottle[:,15].min():.4f},{bottle[:,15].max():.4f}]")
tube = bottle.copy()
# Scale XY positions (radius: bottle ~3cm -> tube ~1.5cm, factor ~0.5)
# Keep Z positions similar (both ~10cm height)
xy_scale = 0.5
tube[:, 13] *= xy_scale # X position
tube[:, 14] *= xy_scale # Y position
# Z positions stay the same (similar height)
# Reduce gripper width for smaller object
# Bottle width: 0.044-0.10, Tube needs: 0.02-0.06
tube[:, 1] = np.clip(tube[:, 1] * 0.6, 0.02, 0.06)
print(f"\nTube pos range: X[{tube[:,13].min():.4f},{tube[:,13].max():.4f}] "
f"Y[{tube[:,14].min():.4f},{tube[:,14].max():.4f}] "
f"Z[{tube[:,15].min():.4f},{tube[:,15].max():.4f}]")
print(f"Tube width range: [{tube[:,1].min():.3f},{tube[:,1].max():.3f}]")
np.save(OUTPUT, tube)
print(f"\nSaved {len(tube)} grasps to: {OUTPUT}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,140 @@
"""
Repackage test tube USD to match InternDataEngine's expected format.
Target structure (same as Aligned_obj.usd):
/World (Xform, defaultPrim, NO physics schemas)
/Looks (Scope) - materials referenced from source
/Aligned (Xform) - PhysicsRigidBodyAPI, PhysicsMassAPI
/mesh (first child mesh, PhysicsCollisionAPI)
/PhysicsMaterial (Material, PhysicsMaterialAPI)
Strategy:
- /World is a clean Xform (no reference, no inherited schemas)
- /World/Aligned references /Test_Tube_AA_01/Test_Tube from source
- /World/Looks references /Test_Tube_AA_01/Looks from source
- Physics applied as overrides on /World/Aligned
Usage:
python migrate/repackage_test_tube.py
"""
from pxr import Usd, UsdGeom, UsdPhysics, UsdShade, Sdf, Gf
import os
SRC_PATH = os.path.abspath(
"/home/tangger/LYT/maic_usd_assets_moudle/laboratory_equipment/Test_Tube/Test_Tube_AA_01.usdc"
)
DST_DIR = "workflows/simbox/example_assets/task/pick_test_tube/test_tube"
DST_PATH = os.path.join(DST_DIR, "Aligned_obj.usd")
def repackage():
# Read source to understand structure
src_stage = Usd.Stage.Open(SRC_PATH)
src_dp = src_stage.GetDefaultPrim()
print(f"Source: {SRC_PATH}")
print(f"Source defaultPrim: {src_dp.GetPath()}")
# Remove old output
if os.path.exists(DST_PATH):
os.remove(DST_PATH)
# Create new stage
dst_stage = Usd.Stage.CreateNew(DST_PATH)
UsdGeom.SetStageMetersPerUnit(dst_stage, 1.0)
UsdGeom.SetStageUpAxis(dst_stage, UsdGeom.Tokens.z)
# --- /World: clean Xform, no reference, no inherited schemas ---
world_xform = UsdGeom.Xform.Define(dst_stage, "/World")
dst_stage.SetDefaultPrim(world_xform.GetPrim())
# --- /World/Aligned: reference Test_Tube child from source ---
aligned_xform = UsdGeom.Xform.Define(dst_stage, "/World/Aligned")
aligned_prim = aligned_xform.GetPrim()
aligned_prim.GetReferences().AddReference(
SRC_PATH,
f"{src_dp.GetPath()}/Test_Tube"
)
# Add RigidBody physics
UsdPhysics.RigidBodyAPI.Apply(aligned_prim)
mass_api = UsdPhysics.MassAPI.Apply(aligned_prim)
mass_api.GetMassAttr().Set(0.005)
print("Created /World/Aligned with RigidBodyAPI + MassAPI (0.005 kg)")
# Set collision approximation on mesh children
for child_prim in aligned_prim.GetAllChildren():
if child_prim.HasAPI(UsdPhysics.CollisionAPI):
mesh_api = UsdPhysics.MeshCollisionAPI(child_prim)
if mesh_api:
mesh_api.GetApproximationAttr().Set("convexHull")
print(f" Set convexHull on {child_prim.GetPath()}")
# --- /World/Looks: reference Looks from source ---
looks_prim = dst_stage.DefinePrim("/World/Looks", "Scope")
looks_prim.GetReferences().AddReference(
SRC_PATH,
f"{src_dp.GetPath()}/Looks"
)
print("Created /World/Looks referencing source materials")
# --- Fix material bindings: remap from source paths to new paths ---
# Source materials are at /Test_Tube_AA_01/Looks/XXX
# In our new stage they are at /World/Looks/XXX
# The mesh bindings from the reference point to /Test_Tube_AA_01/Looks/XXX
# which is outside scope. We need to override the bindings.
src_material_map = {
"SimPBR_Translucent": "/World/Looks/SimPBR_Translucent",
"OmniPBR": "/World/Looks/OmniPBR",
"OmniPBR_01": "/World/Looks/OmniPBR_01",
}
# Find all mesh prims under /World/Aligned and rebind materials
for prim in dst_stage.Traverse():
if not str(prim.GetPath()).startswith("/World/Aligned"):
continue
binding = UsdShade.MaterialBindingAPI(prim)
if not binding:
continue
# Check if there's a direct binding
mat_path_rel = binding.GetDirectBinding().GetMaterialPath()
if mat_path_rel and str(mat_path_rel) != "":
mat_name = str(mat_path_rel).split("/")[-1]
if mat_name in src_material_map:
new_mat_path = src_material_map[mat_name]
new_mat = UsdShade.Material(dst_stage.GetPrimAtPath(new_mat_path))
if new_mat:
UsdShade.MaterialBindingAPI.Apply(prim)
UsdShade.MaterialBindingAPI(prim).Bind(new_mat)
print(f" Rebound material on {prim.GetPath()} -> {new_mat_path}")
# --- /World/PhysicsMaterial ---
phys_mat_prim = dst_stage.DefinePrim("/World/PhysicsMaterial", "Material")
UsdPhysics.MaterialAPI.Apply(phys_mat_prim)
phys_mat = UsdPhysics.MaterialAPI(phys_mat_prim)
phys_mat.GetStaticFrictionAttr().Set(0.5)
phys_mat.GetDynamicFrictionAttr().Set(0.5)
phys_mat.GetRestitutionAttr().Set(0.1)
print("Created /World/PhysicsMaterial")
# Save
dst_stage.GetRootLayer().Save()
print(f"\nSaved: {DST_PATH}")
# --- Verify ---
print(f"\n{'='*60}")
print("Verification:")
print(f"{'='*60}")
v_stage = Usd.Stage.Open(DST_PATH)
dp = v_stage.GetDefaultPrim()
print(f"DefaultPrim: {dp.GetPath()}")
print(f" Type: {dp.GetTypeName()}")
print(f" Schemas: {dp.GetAppliedSchemas()}")
for c in dp.GetChildren():
print(f" /{c.GetName()} [{c.GetTypeName()}] schemas={c.GetAppliedSchemas()}")
for gc in c.GetAllChildren():
schemas = gc.GetAppliedSchemas()
s = f" schemas={schemas}" if schemas else ""
print(f" /{gc.GetName()} [{gc.GetTypeName()}]{s}")
if __name__ == "__main__":
repackage()

64
migrate/usdc_to_obj.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Extract mesh from USD/USDC and export as OBJ file.
Usage:
python migrate/usdc_to_obj.py --input path/to/file.usdc --output path/to/output/
"""
import argparse
import os
import numpy as np
from pxr import Usd, UsdGeom
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True)
parser.add_argument("--output", default=".")
args = parser.parse_args()
os.makedirs(args.output, exist_ok=True)
obj_path = os.path.join(args.output, "Aligned_obj.obj")
stage = Usd.Stage.Open(args.input)
vert_offset = 0
with open(obj_path, "w") as f:
f.write("# Exported from USD\n\n")
for prim in stage.Traverse():
if prim.GetTypeName() != "Mesh":
continue
mesh = UsdGeom.Mesh(prim)
points = mesh.GetPointsAttr().Get()
if not points:
continue
face_counts = mesh.GetFaceVertexCountsAttr().Get()
face_indices = mesh.GetFaceVertexIndicesAttr().Get()
name = str(prim.GetPath()).replace("/", "_").strip("_")
f.write(f"o {name}\n")
print(f" Mesh: {prim.GetPath()} ({len(points)} verts)")
for p in points:
f.write(f"v {p[0]:.8f} {p[1]:.8f} {p[2]:.8f}\n")
if face_counts and face_indices:
idx = 0
for fc in face_counts:
verts = []
for j in range(fc):
verts.append(face_indices[idx] + 1 + vert_offset)
idx += 1
# Triangulate: fan from first vertex
for j in range(1, len(verts) - 1):
f.write(f"f {verts[0]} {verts[j]} {verts[j+1]}\n")
vert_offset += len(points)
print(f"\nSaved: {obj_path} ({vert_offset} total vertices)")
if __name__ == "__main__":
main()